forked from voldemort/voldemort
/
RebalanceTask.java
166 lines (141 loc) · 5.53 KB
/
RebalanceTask.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/*
* Copyright 2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.client.rebalance.task;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalanceBatchPlanProgressBar;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.utils.RebalanceUtils;
public abstract class RebalanceTask implements Runnable {
protected final int batchId;
protected final int taskId;
protected final List<RebalancePartitionsInfo> stealInfos;
protected final Semaphore donorPermit;
protected final AdminClient adminClient;
protected final RebalanceBatchPlanProgressBar progressBar;
protected final Logger loggerToUse;
protected Exception exception;
protected final AtomicBoolean isComplete;
protected final int partitionStoreCount;
protected long permitAcquisitionTimeMs;
protected long taskCompletionTimeMs;
protected final static int INVALID_REBALANCE_ID = -1;
public RebalanceTask(final int batchId,
final int taskId,
final List<RebalancePartitionsInfo> stealInfos,
final Semaphore donorPermit,
final AdminClient adminClient,
final RebalanceBatchPlanProgressBar progressBar,
final Logger logger) {
this.batchId = batchId;
this.taskId = taskId;
this.stealInfos = stealInfos;
this.donorPermit = donorPermit;
this.adminClient = adminClient;
this.progressBar = progressBar;
this.loggerToUse = logger;
this.exception = null;
this.isComplete = new AtomicBoolean(false);
this.partitionStoreCount = RebalanceUtils.countPartitionStores(stealInfos);
this.permitAcquisitionTimeMs = -1;
this.taskCompletionTimeMs = -1;
taskLog(toString());
}
public List<RebalancePartitionsInfo> getStealInfos() {
return this.stealInfos;
}
public boolean isComplete() {
return this.isComplete.get();
}
public boolean hasException() {
return exception != null;
}
public Exception getError() {
return exception;
}
protected void acquirePermit(int nodeId) throws InterruptedException {
permitStart(nodeId);
donorPermit.acquire();
permitAcquired(nodeId);
}
@Override
public String toString() {
return "Rebalance task " + taskId + "from batch " + batchId + " : " + getStealInfos();
}
/**
* Helper method to log updates in uniform manner that includes batch & task
* ID.
*
* @param message
*/
protected void taskLog(String message) {
RebalanceUtils.printBatchTaskLog(batchId, taskId, loggerToUse, message);
}
/**
* Helper method to pretty print progress and timing info.
*
* @param nodeId node ID for which donor permit is required
*/
protected void permitStart(int nodeId) {
permitAcquisitionTimeMs = System.currentTimeMillis();
taskLog("Acquiring donor permit for node " + nodeId + ".");
}
/**
* Helper method to pretty print progress and timing info.
*
* @param nodeId node ID for which donor permit is required
*/
protected void permitAcquired(int nodeId) {
String durationString = "";
if(permitAcquisitionTimeMs >= 0) {
long durationMs = System.currentTimeMillis() - permitAcquisitionTimeMs;
permitAcquisitionTimeMs = -1;
durationString = " in " + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds.";
}
taskLog("Acquired donor permit for node " + nodeId + durationString);
}
/**
* Helper method to pretty print progress and timing info.
*
* @param rebalanceAsyncId ID of the async rebalancing task
*/
protected void taskStart(int rebalanceAsyncId) {
taskCompletionTimeMs = System.currentTimeMillis();
taskLog("Starting rebalance of " + partitionStoreCount
+ " partition-stores for async operation id " + rebalanceAsyncId + ".");
progressBar.beginTask(taskId);
}
/**
* Helper method to pretty print progress and timing info.
*
* @param rebalanceAsyncId ID of the async rebalancing task
*/
protected void taskDone(int rebalanceAsyncId) {
String durationString = "";
if(taskCompletionTimeMs >= 0) {
long durationMs = System.currentTimeMillis() - taskCompletionTimeMs;
taskCompletionTimeMs = -1;
durationString = " in " + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds.";
}
taskLog("Successfully finished rebalance of " + partitionStoreCount
+ " for async operation id " + rebalanceAsyncId + durationString);
progressBar.completeTask(taskId, partitionStoreCount);
}
}