Skip to content

Commit

Permalink
refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbansal committed Jan 8, 2010
1 parent 5f1300a commit fb948b2
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 154 deletions.
4 changes: 3 additions & 1 deletion src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -702,11 +702,13 @@ public void waitForCompletion(int nodeId, int requestId, long maxWait, TimeUnit
long delay = INITIAL_DELAY;
long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);

String description = null;
while(System.currentTimeMillis() < waitUntil) {
try {
AsyncOperationStatus status = getAsyncRequestStatus(nodeId, requestId);
logger.debug("Status for async task " + requestId + " at node " + nodeId + " is "
+ status);
description = status.getDescription();
if(status.isComplete())
return;
if(status.hasException())
Expand All @@ -721,7 +723,7 @@ public void waitForCompletion(int nodeId, int requestId, long maxWait, TimeUnit
Thread.currentThread().interrupt();
}
} catch(Exception e) {
throw new VoldemortException("Failed while waiting for async task " + requestId
throw new VoldemortException("Failed while waiting for async task " + description
+ " at node " + nodeId + " to finish", e);
}
}
Expand Down
14 changes: 5 additions & 9 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -30,20 +30,17 @@ public class RebalanceController {

private static Logger logger = Logger.getLogger(RebalanceController.class);

private final ExecutorService executor;
private final AdminClient adminClient;
RebalanceClientConfig rebalanceConfig;

public RebalanceController(String bootstrapUrl, RebalanceClientConfig rebalanceConfig) {
this.adminClient = new AdminClient(bootstrapUrl, rebalanceConfig);
this.rebalanceConfig = rebalanceConfig;
this.executor = createExecutors(rebalanceConfig.getMaxParallelRebalancing());
}

public RebalanceController(Cluster cluster, RebalanceClientConfig config) {
this.adminClient = new AdminClient(cluster, config);
this.rebalanceConfig = config;
this.executor = createExecutors(rebalanceConfig.getMaxParallelRebalancing());
}

private ExecutorService createExecutors(int numThreads) {
Expand Down Expand Up @@ -98,9 +95,11 @@ public void rebalance(final Cluster targetCluster) {
System.currentTimeMillis()),
new ArrayList<Integer>());

ExecutorService executor = createExecutors(rebalanceConfig.getMaxParallelRebalancing());

// start all threads
for(int nThreads = 0; nThreads < this.rebalanceConfig.getMaxParallelRebalancing(); nThreads++) {
this.executor.execute(new Runnable() {
executor.execute(new Runnable() {

public void run() {
// pick one node to rebalance from queue
Expand All @@ -124,7 +123,6 @@ public void run() {
try {
commitClusterChanges(adminClient.getAdminClientCluster()
.getNodeById(stealerNodeId),
targetCluster,
rebalanceSubTask);
} catch(Exception e) {
if(-1 != rebalanceAsyncId) {
Expand Down Expand Up @@ -205,7 +203,6 @@ public AdminClient getAdminClient() {

public void stop() {
adminClient.stop();
executor.shutdownNow();
}

/* package level function to ease of unit testing */
Expand All @@ -224,9 +221,8 @@ public void stop() {
* @param rebalanceStealInfo
* @throws Exception
*/
void commitClusterChanges(Node stealerNode,
Cluster targetCluster,
RebalancePartitionsInfo rebalanceStealInfo) throws Exception {
void commitClusterChanges(Node stealerNode, RebalancePartitionsInfo rebalanceStealInfo)
throws Exception {
synchronized(adminClient) {
Cluster currentCluster = adminClient.getAdminClientCluster();
Node donorNode = currentCluster.getNodeById(rebalanceStealInfo.getDonorId());
Expand Down

0 comments on commit fb948b2

Please sign in to comment.