Navigation Menu

Skip to content

Commit

Permalink
Address minor review feedback
Browse files Browse the repository at this point in the history
Mostly, made variable names more clear

AdminClient
- cleaned up comments & TODO

RebalanceController
- fix subtle bug in when proxyPause invoked

RepartitoinerTest
- changed zone expansion test to cover two use cases: when invoked from
  RepartitoinerCLI with a current cluster versus with an interim
  cluster.
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent f816900 commit 412088d
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 126 deletions.
19 changes: 12 additions & 7 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -2393,17 +2393,17 @@ public Versioned<RebalancerState> getRemoteRebalancerState(int nodeId) {
value.getVersion());
}

// TODO: The "Order" column makes no sense to me. This is very, very
// hard to grok. I want to prefix each entry with in the order column
// with a number (e.g., '0: rebalance', '1: cluster->Swap', ...) but
// have no idea what the order is. Are they in order in the table? what
// do the arrows mean in the order column?

/**
* Used in rebalancing to indicate change in states. Groups the
* partition plans on the basis of stealer nodes and sends them over.
*
* The various combinations and their order of execution is given below
* where:
* <ul>
* <li>'cluster' means cluster state is updated
* <li>'rebalance' means rebalance flag is set.
* <li>'swap' means stores are swapped.
* </ul>
*
* <pre>
* | swapRO | changeClusterMetadata | changeRebalanceState | Order |
Expand All @@ -2414,7 +2414,12 @@ public Versioned<RebalancerState> getRemoteRebalancerState(int nodeId) {
* </pre>
*
*
* Similarly for rollback:
* Similarly for rollback, order means the following:
* <ul>
* <li>'remove from rebalance' means set rebalance flag false
* <li>'cluster' means cluster is rolled back
* <li>'swap' means stores are swapped
* </ul>
*
* <pre>
* | swapRO | changeClusterMetadata | changeRebalanceState | Order |
Expand Down
Expand Up @@ -31,25 +31,33 @@ public class RebalanceBatchPlanProgressBar {
private static final DecimalFormat decimalFormatter = new DecimalFormat("#.##");

private final int batchId;
private final int taskCount;
private final int partitionStoreCount;
private final int totalTaskCount;
private final int totalPartitionStoreCount;

private final long startTimeMs;

private Set<Integer> tasksInFlight;
private int numTasks;
private int numPartitionStores;
private int numTasksCompleted;
private int numPartitionStoresMigrated;

RebalanceBatchPlanProgressBar(int batchId, int taskCount, int partitionStoreCount) {
/**
* Construct a progress bar object to track rebalance tasks completed and
* partition-stores migrated.
*
* @param batchId
* @param totalTaskCount
* @param totalPartitionStoreCount
*/
RebalanceBatchPlanProgressBar(int batchId, int totalTaskCount, int totalPartitionStoreCount) {
this.batchId = batchId;
this.taskCount = taskCount;
this.partitionStoreCount = partitionStoreCount;
this.totalTaskCount = totalTaskCount;
this.totalPartitionStoreCount = totalPartitionStoreCount;

this.startTimeMs = System.currentTimeMillis();

this.tasksInFlight = new HashSet<Integer>();
this.numTasks = 0;
this.numPartitionStores = 0;
this.numTasksCompleted = 0;
this.numPartitionStoresMigrated = 0;
}

/**
Expand All @@ -65,17 +73,17 @@ synchronized public void beginTask(int taskId) {

/**
* Called whenever a rebalance task completes. This means one task is done
* and some number of partition stores have been moved.
* and some number of partition stores have been migrated.
*
* @param taskId
* @param partitionStoreCount Number of partition stores moved by this
* @param totalPartitionStoreCount Number of partition stores moved by this
* completed task.
*/
synchronized public void completeTask(int taskId, int taskPartitionStores) {
synchronized public void completeTask(int taskId, int partitionStoresMigrated) {
tasksInFlight.remove(taskId);

numTasks++;
numPartitionStores += taskPartitionStores;
numTasksCompleted++;
numPartitionStoresMigrated += partitionStoresMigrated;

updateProgressBar();
}
Expand All @@ -89,8 +97,8 @@ synchronized public void completeTask(int taskId, int taskPartitionStores) {
synchronized public String getPrettyProgressBar() {
StringBuilder sb = new StringBuilder();

double taskRate = numTasks / (double) taskCount;
double partitionStoreRate = numPartitionStores / (double) partitionStoreCount;
double taskRate = numTasksCompleted / (double) totalTaskCount;
double partitionStoreRate = numPartitionStoresMigrated / (double) totalPartitionStoreCount;

long deltaTimeMs = System.currentTimeMillis() - startTimeMs;
long taskTimeRemainingMs = Long.MAX_VALUE;
Expand All @@ -110,7 +118,8 @@ synchronized public String getPrettyProgressBar() {
.append(".")
.append(Utils.NEWLINE);
// Tasks completed update
sb.append("\t" + numTasks + " out of " + taskCount + " rebalance tasks complete.")
sb.append("\t" + numTasksCompleted + " out of " + totalTaskCount
+ " rebalance tasks complete.")
.append(Utils.NEWLINE)
.append("\t")
.append(decimalFormatter.format(taskRate * 100.0))
Expand All @@ -121,7 +130,7 @@ synchronized public String getPrettyProgressBar() {
.append(" minutes) remaining.")
.append(Utils.NEWLINE);
// Partition-stores migrated update
sb.append("\t" + numPartitionStores + " out of " + partitionStoreCount
sb.append("\t" + numPartitionStoresMigrated + " out of " + totalPartitionStoreCount
+ " partition-stores migrated.")
.append(Utils.NEWLINE)
.append("\t")
Expand Down
19 changes: 8 additions & 11 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -74,13 +74,13 @@ public class RebalanceController {
private final int maxParallelRebalancing;
private final int maxTriesRebalancing;
private final boolean stealerBasedRebalancing;
private final long proxyPauseS;
private final long proxyPauseSec;

public RebalanceController(String bootstrapUrl,
int maxParallelRebalancing,
int maxTriesRebalancing,
boolean stealerBased,
long proxyPauseS) {
long proxyPauseSec) {
this.adminClient = new AdminClient(bootstrapUrl,
new AdminClientConfig(),
new ClientConfig());
Expand All @@ -91,7 +91,7 @@ public RebalanceController(String bootstrapUrl,
this.maxParallelRebalancing = maxParallelRebalancing;
this.maxTriesRebalancing = maxTriesRebalancing;
this.stealerBasedRebalancing = stealerBased;
this.proxyPauseS = proxyPauseS;
this.proxyPauseSec = proxyPauseSec;
}

/**
Expand Down Expand Up @@ -354,7 +354,6 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {

// STEP 2 - Move RO data
if(hasReadOnlyStores) {
proxyPause();
RebalanceBatchPlanProgressBar progressBar = batchPlan.getProgressBar(batchId);
executeSubBatch(batchId,
progressBar,
Expand Down Expand Up @@ -383,9 +382,7 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {

// STEP 4 - Move RW data
if(hasReadWriteStores) {
if(!hasReadOnlyStores) {
proxyPause();
}
proxyPause();
RebalanceBatchPlanProgressBar progressBar = batchPlan.getProgressBar(batchId);
executeSubBatch(batchId,
progressBar,
Expand Down Expand Up @@ -415,9 +412,9 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
*/
private void proxyPause() {
logger.info("Pausing after cluster state has changed to allow proxy bridges to be established. "
+ "Will start rebalancing work on servers in " + proxyPauseS + " seconds.");
+ "Will start rebalancing work on servers in " + proxyPauseSec + " seconds.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(proxyPauseS));
Thread.sleep(TimeUnit.SECONDS.toMillis(proxyPauseSec));
} catch(InterruptedException e) {
logger.warn("Sleep interrupted in proxy pause.");
}
Expand All @@ -442,7 +439,7 @@ private void proxyPause() {
*
* Truth table, FTW!
*
* @param batchId Rebalancing task id
* @param batchId Rebalancing batch id
* @param batchCurrentCluster Current cluster
* @param batchFinalCluster Transition cluster to propagate
* @param rebalancePartitionPlanList List of partition plan list
Expand Down Expand Up @@ -562,7 +559,7 @@ private void rebalanceStateChange(final int batchId,
* | 7 | f | f | f | won't be triggered |
* </pre>
*
* @param batchId Rebalance task id
* @param batchId Rebalance batch id
* @param batchRollbackCluster Cluster to rollback to if we have a problem
* @param rebalancePartitionPlanList The list of rebalance partition plans
* @param hasReadOnlyStores Are we rebalancing any read-only stores?
Expand Down
32 changes: 20 additions & 12 deletions src/java/voldemort/client/rebalance/task/RebalanceTask.java
Expand Up @@ -42,7 +42,8 @@ public abstract class RebalanceTask implements Runnable {
protected final AtomicBoolean isComplete;

protected final int partitionStoreCount;
protected long timeMs;
protected long permitAcquisitionTimeMs;
protected long taskCompletionTimeMs;

protected final static int INVALID_REBALANCE_ID = -1;

Expand All @@ -65,7 +66,8 @@ public RebalanceTask(final int batchId,
this.isComplete = new AtomicBoolean(false);

this.partitionStoreCount = RebalanceUtils.countPartitionStores(stealInfos);
this.timeMs = 0;
this.permitAcquisitionTimeMs = -1;
this.taskCompletionTimeMs = -1;

taskLog(toString());
}
Expand Down Expand Up @@ -113,7 +115,7 @@ protected void taskLog(String message) {
* @param nodeId node ID for which donor permit is required
*/
protected void permitStart(int nodeId) {
timeMs = System.currentTimeMillis();
permitAcquisitionTimeMs = System.currentTimeMillis();
taskLog("Acquiring donor permit for node " + nodeId + ".");
}

Expand All @@ -123,10 +125,13 @@ protected void permitStart(int nodeId) {
* @param nodeId node ID for which donor permit is required
*/
protected void permitAcquired(int nodeId) {
long durationMs = System.currentTimeMillis() - timeMs;
timeMs = 0;
taskLog("Acquired donor permit for node " + nodeId + " in "
+ TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds.");
String durationString = "";
if(permitAcquisitionTimeMs >= 0) {
long durationMs = System.currentTimeMillis() - permitAcquisitionTimeMs;
permitAcquisitionTimeMs = -1;
durationString = " in " + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds.";
}
taskLog("Acquired donor permit for node " + nodeId + durationString);
}

/**
Expand All @@ -135,7 +140,7 @@ protected void permitAcquired(int nodeId) {
* @param rebalanceAsyncId ID of the async rebalancing task
*/
protected void taskStart(int rebalanceAsyncId) {
timeMs = System.currentTimeMillis();
taskCompletionTimeMs = System.currentTimeMillis();
taskLog("Starting rebalance of " + partitionStoreCount
+ " partition-stores for async operation id " + rebalanceAsyncId + ".");
progressBar.beginTask(taskId);
Expand All @@ -147,11 +152,14 @@ protected void taskStart(int rebalanceAsyncId) {
* @param rebalanceAsyncId ID of the async rebalancing task
*/
protected void taskDone(int rebalanceAsyncId) {
long durationMs = System.currentTimeMillis() - timeMs;
timeMs = 0;
String durationString = "";
if(taskCompletionTimeMs >= 0) {
long durationMs = System.currentTimeMillis() - taskCompletionTimeMs;
taskCompletionTimeMs = -1;
durationString = " in " + TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds.";
}
taskLog("Successfully finished rebalance of " + partitionStoreCount
+ " for async operation id " + rebalanceAsyncId + " in "
+ TimeUnit.MILLISECONDS.toSeconds(durationMs) + " seconds.");
+ " for async operation id " + rebalanceAsyncId + durationString);

progressBar.completeTask(taskId, partitionStoreCount);
}
Expand Down
8 changes: 4 additions & 4 deletions src/java/voldemort/tools/RebalanceControllerCLI.java
Expand Up @@ -103,7 +103,7 @@ private static void printUsage() {
help.append(" Optional:\n");
help.append(" --final-stores <storesXML> [ Needed for zone expansion ]\n");
help.append(" --parallelism <parallelism> [ Number of rebalancing tasks to run in parallel ]");
help.append(" --proxy-pause <proxyPause> [ Seconds to pause between cluster change and server-side rebalancing tasks ]");
help.append(" --proxy-pause <proxyPauseSec> [ Seconds to pause between cluster change and server-side rebalancing tasks ]");
help.append(" --tries <tries> [ Number of times to try starting an async rebalancing task on a node ");
help.append(" --output-dir [ Output directory in which plan is stored ]\n");
help.append(" --batch <batch> [ Number of primary partitions to move in each rebalancing batch. ]\n");
Expand Down Expand Up @@ -167,16 +167,16 @@ public static void main(String[] args) throws Exception {
tries = (Integer) options.valueOf("tries");
}

long proxyPauseS = RebalanceController.PROXY_PAUSE_IN_SECONDS;
long proxyPauseSec = RebalanceController.PROXY_PAUSE_IN_SECONDS;
if(options.has("proxy-pause")) {
proxyPauseS = (Long) options.valueOf("proxy-pause");
proxyPauseSec = (Long) options.valueOf("proxy-pause");
}

RebalanceController rebalanceController = new RebalanceController(bootstrapURL,
parallelism,
tries,
stealerBased,
proxyPauseS);
proxyPauseSec);

Cluster currentCluster = rebalanceController.getCurrentCluster();
List<StoreDefinition> currentStoreDefs = rebalanceController.getCurrentStoreDefs();
Expand Down

0 comments on commit 412088d

Please sign in to comment.