forked from voldemort/voldemort
/
RebalanceAsyncOperation.java
61 lines (49 loc) · 2.15 KB
/
RebalanceAsyncOperation.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package voldemort.server.rebalance.async;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.admin.AsyncOperation;
import voldemort.server.rebalance.Rebalancer;
import voldemort.store.metadata.MetadataStore;
public abstract class RebalanceAsyncOperation extends AsyncOperation {
protected final static Logger logger = Logger.getLogger(RebalanceAsyncOperation.class);
protected final VoldemortConfig voldemortConfig;
protected final MetadataStore metadataStore;
protected AdminClient adminClient;
protected final ExecutorService executors;
protected Rebalancer rebalancer;
protected ExecutorService createExecutors(int numThreads) {
return Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(r.getClass().getName());
return thread;
}
});
}
public RebalanceAsyncOperation(Rebalancer rebalancer,
VoldemortConfig voldemortConfig,
MetadataStore metadataStore,
int requestId,
String operationString) {
super(requestId, operationString);
this.voldemortConfig = voldemortConfig;
this.metadataStore = metadataStore;
this.adminClient = null;
this.executors = createExecutors(voldemortConfig.getMaxParallelStoresRebalancing());
this.rebalancer = rebalancer;
}
protected void waitForShutdown() {
try {
executors.shutdown();
executors.awaitTermination(voldemortConfig.getRebalancingTimeoutSec(), TimeUnit.SECONDS);
} catch(InterruptedException e) {
logger.error("Interrupted while awaiting termination for executors.", e);
}
}
}