Address review of initial RebalanceController work
- Removed 'timeout' option for rebalancing since always setting timeouts
  to infinite (Long.MAX_VALUE) is the recommended practice.

- Cleaned up some TODOs and added some javadoc in response to code
  review from Lei
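
For context, a minimal sketch of what constructing the controller looks like after this change, based on the new constructor signature and constants visible in the RebalanceController.java diff below. The bootstrap URL is an illustrative placeholder, not taken from the commit:

    import voldemort.client.rebalance.RebalanceController;

    public class RebalanceControllerSketch {
        public static void main(String[] args) {
            // Hypothetical bootstrap URL; only the constructor shape comes from this commit.
            String bootstrapUrl = "tcp://localhost:6666";
            RebalanceController controller =
                    new RebalanceController(bootstrapUrl,
                                            RebalanceController.MAX_PARALLEL_REBALANCING,
                                            RebalanceController.MAX_TRIES_REBALANCING,
                                            RebalanceController.STEALER_BASED_REBALANCING);
            // The former 'timeout' argument is gone; waits inside the controller and
            // AdminClient are now effectively infinite (Long.MAX_VALUE).
        }
    }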
jayjwylie committed Jun 20, 2013
1 parent 92faebd commit 45903e0
Showing 11 changed files with 95 additions and 68 deletions.
6 changes: 6 additions & 0 deletions release_notes.txt
@@ -1,3 +1,9 @@
Pending release notes...

Note: Server changes are not backwards compatible. To use the new
rebalancing tooling, servers must be upgraded beforehand.


Release 1.3.3 on 04/24/2013
* VoldemortBuildandPush
- Fixed bug with schema check
33 changes: 33 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -750,6 +750,24 @@ public String waitForCompletion(int nodeId, int requestId, long maxWait, TimeUni
return waitForCompletion(nodeId, requestId, maxWait, timeUnit, null);
}

/**
* Wait for the async task at (remote) nodeId to complete, using
* exponential backoff to poll the task completion status. Effectively
* waits forever.
* <p>
*
* <i>Logs the status at each status check if debug is enabled.</i>
*
* @param nodeId Id of the node to poll
* @param requestId Id of the request to check
* @return the final description attached to the response
* @throws VoldemortException if the task fails to complete
*/
public String waitForCompletion(int nodeId, int requestId) {
return waitForCompletion(nodeId, requestId, Long.MAX_VALUE, TimeUnit.SECONDS, null);
}

/**
* Wait till the passed value matches with the metadata value returned
* by the remote node for the passed key.
@@ -794,6 +812,21 @@ public void waitForCompletion(int nodeId,
+ maxWait + " " + timeUnit.toString() + " time.");
}

/**
* Wait until the passed value matches the metadata value returned
* by the remote node for the passed key. Effectively waits forever.
* <p>
*
* <i>Logs the status at each status check if debug is enabled.</i>
*
* @param nodeId Id of the node to poll
* @param key metadata key to keep checking for current value
* @param value metadata value that must match before returning
*/
public void waitForCompletion(int nodeId, String key, String value) {
waitForCompletion(nodeId, key, value, Long.MAX_VALUE, TimeUnit.SECONDS);
}

}

/**
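
A usage sketch of the two new no-timeout overloads, modeled on the call sites further down in this commit (the node id and async request id parameters are illustrative; MetadataStore.SERVER_STATE_KEY and VoldemortState.NORMAL_SERVER appear in the StealerBasedRebalanceTask diff):

    import voldemort.client.protocol.admin.AdminClient;
    import voldemort.store.metadata.MetadataStore;

    public class WaitForCompletionSketch {
        // Not part of the commit; shows how callers are expected to use the new overloads.
        static void waitExamples(AdminClient adminClient, int nodeId, int asyncRequestId) {
            // Poll the async task until it finishes, with no practical deadline.
            String description = adminClient.rpcOps.waitForCompletion(nodeId, asyncRequestId);
            System.out.println(description);

            // Block until the node reports that it is back in the NORMAL server state.
            adminClient.rpcOps.waitForCompletion(nodeId,
                                                 MetadataStore.SERVER_STATE_KEY,
                                                 MetadataStore.VoldemortState.NORMAL_SERVER.toString());
        }
    }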
10 changes: 10 additions & 0 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlan.java
@@ -40,6 +40,16 @@
* partition-stores included in the move are based on those listed in storeDefs.
* This batch plan is execution-agnostic, i.e., a plan is generated and later
* stealer- versus donor-based execution of that plan is decided.
*
* Long term, it's unclear whether the notion of a RebalanceBatchPlan separate
* from RebalancePlan is needed. Batching tends to increase the overall cost of
* rebalancing and has historically been error-prone. (I.e., the transition
* between batches has had intermittent failures.) Its value, if any, lies in
* allowing long-running (days or weeks) rebalancing jobs to have interim
* checkpoints so that single-node failures don't force a restart from the
* initial state. Consider deprecating batching after zone expansion and zone
* shrinking have been done successfully as short (less than a day or two),
* single-batch rebalances.
*/
public class RebalanceBatchPlan {

31 changes: 10 additions & 21 deletions src/java/voldemort/client/rebalance/RebalanceController.java
@@ -68,18 +68,15 @@ public class RebalanceController {

public final static int MAX_PARALLEL_REBALANCING = 1;
public final static int MAX_TRIES_REBALANCING = 2;
public final static long REBALANCING_CLIENT_TIMEOUT_SEC = TimeUnit.DAYS.toSeconds(30);
public final static boolean STEALER_BASED_REBALANCING = true;

private final int maxParallelRebalancing;
private final int maxTriesRebalancing;
private final long rebalancingClientTimeoutSeconds;
private final boolean stealerBasedRebalancing;

public RebalanceController(String bootstrapUrl,
int maxParallelRebalancing,
int maxTriesRebalancing,
long rebalancingClientTimeoutSeconds,
boolean stealerBased) {
this.adminClient = new AdminClient(bootstrapUrl,
new AdminClientConfig(),
@@ -90,7 +87,6 @@ public RebalanceController(String bootstrapUrl,

this.maxParallelRebalancing = maxParallelRebalancing;
this.maxTriesRebalancing = maxTriesRebalancing;
this.rebalancingClientTimeoutSeconds = rebalancingClientTimeoutSeconds;
this.stealerBasedRebalancing = stealerBased;
}

@@ -240,15 +236,16 @@ private void executePlan(RebalancePlan rebalancePlan) {
}
}

// TODO: Add interim progress reporting of some sort. Whenever a task
// completes, a sane intra-batch progress report should be provided.
/**
* Pretty print a progress update after each batch completes.
*
* @param id
* @param batchCount
* @param numBatches
* @param partitionStoreCount
* @param numPartitionStores
* @param totalTimeMs
* @param batchCount current batch
* @param numBatches total number of batches
* @param partitionStoreCount partition stores migrated
* @param numPartitionStores total number of partition stores to migrate
* @param totalTimeMs total time, in milliseconds, of execution thus far.
*/
private void batchStatusLog(int batchCount,
int numBatches,
@@ -572,16 +569,10 @@ private void executeSubBatch(final int taskId,
service,
rebalancePartitionPlanList,
donorPermits);
RebalanceUtils.printLog(taskId, logger, "All rebalance tasks submitted");

// All tasks submitted.
RebalanceUtils.printLog(taskId,
logger,
"All rebalance tasks were submitted ( shutting down in "
+ this.rebalancingClientTimeoutSeconds + " sec )");

// Wait and shutdown after timeout
RebalanceUtils.executorShutDown(service, this.rebalancingClientTimeoutSeconds);

// Wait and shutdown after (infinite) timeout
RebalanceUtils.executorShutDown(service, Long.MAX_VALUE);
RebalanceUtils.printLog(taskId, logger, "Finished waiting for executors");

// Collects all failures + incomplete tasks from the rebalance
@@ -666,7 +657,6 @@ private List<RebalanceTask> executeTasks(final int taskId,
for(RebalancePartitionsInfo partitionsInfo: rebalancePartitionPlanList) {
StealerBasedRebalanceTask rebalanceTask = new StealerBasedRebalanceTask(taskId,
partitionsInfo,
rebalancingClientTimeoutSeconds,
maxTriesRebalancing,
donorPermits[partitionsInfo.getDonorId()],
adminClient);
@@ -680,7 +670,6 @@ for(Entry<Integer, List<RebalancePartitionsInfo>> entries: donorNodeBasedPartitionsInfo.entrySet()) {
for(Entry<Integer, List<RebalancePartitionsInfo>> entries: donorNodeBasedPartitionsInfo.entrySet()) {
DonorBasedRebalanceTask rebalanceTask = new DonorBasedRebalanceTask(taskId,
entries.getValue(),
rebalancingClientTimeoutSeconds,
donorPermits[entries.getValue()
.get(0)
.getDonorId()],
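
The executeSubBatch change above replaces a configurable executor timeout with Long.MAX_VALUE. A rough equivalent of what an "infinite timeout" shutdown amounts to in plain JDK terms (not the actual RebalanceUtils.executorShutDown code, which may differ in details such as exception handling):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ExecutorShutdownSketch {
        // Stop accepting new tasks, then wait "forever" for submitted tasks to finish.
        static void shutDownAndWait(ExecutorService service) throws InterruptedException {
            service.shutdown();
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        }
    }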
@@ -332,8 +332,12 @@ public synchronized String toString() {
return sb.toString();
}

// TODO: Add javadoc
// TODO: move to rebalanceUtils? Or leave here?
/**
* Pretty prints a list of rebalancing tasks.
*
* @param infos list of rebalancing tasks (RebalancePartitionsInfo)
* @return pretty-printed string
*/
public static String taskListToString(List<RebalancePartitionsInfo> infos) {
StringBuffer sb = new StringBuffer();
for(RebalancePartitionsInfo info: infos) {
26 changes: 22 additions & 4 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
@@ -36,9 +36,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.TreeMultimap;

// TODO: Add a header comment.
// TODO: Remove stealerBased from the constructor once RebalanceController is
// switched over to use RebalancePlan. (Or sooner)
/**
* RebalancePlan encapsulates all aspects of planning a shuffle, cluster
* expansion, or zone expansion.
*/
public class RebalancePlan {

private static final Logger logger = Logger.getLogger(RebalancePlan.class);
@@ -67,7 +68,24 @@ public class RebalancePlan {
private final MoveMap nodeMoveMap;
private final MoveMap zoneMoveMap;

// TODO: Add javadoc
/**
* Constructs a plan for the specified change from currentCluster/StoreDefs
* to finalCluster/StoreDefs.
*
* finalStoreDefs are needed for the zone expansion use case since store
* definitions depend on the number of zones.
*
* In theory, since current & final StoreDefs are passed in, this plan could
* be used to transform deployed store definitions. In practice, this use
* case has not been tested.
*
* @param currentCluster current deployed cluster
* @param currentStoreDefs current deployed store defs
* @param finalCluster desired deployed cluster
* @param finalStoreDefs desired deployed store defs
* @param batchSize number of primary partitions to move in each batch.
* @param outputDir directory in which to dump metadata files for the plan
*/
public RebalancePlan(final Cluster currentCluster,
final List<StoreDefinition> currentStoreDefs,
final Cluster finalCluster,
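
A sketch of how a plan might be constructed from already-parsed metadata, following the parameter list in the javadoc above. The full constructor signature is truncated in this diff and may take additional arguments (the removed TODO mentions a stealerBased flag), so treat the call below as an assumption rather than the exact API:

    import java.util.List;

    import voldemort.client.rebalance.RebalancePlan;
    import voldemort.cluster.Cluster;
    import voldemort.store.StoreDefinition;

    public class RebalancePlanSketch {
        // Hypothetical helper; batchSize and outputDir values are illustrative.
        static RebalancePlan buildPlan(Cluster currentCluster,
                                       List<StoreDefinition> currentStoreDefs,
                                       Cluster finalCluster,
                                       List<StoreDefinition> finalStoreDefs) {
            int batchSize = 16;               // primary partitions to move per batch
            String outputDir = "plan-output"; // where plan metadata files are dumped
            return new RebalancePlan(currentCluster, currentStoreDefs,
                                     finalCluster, finalStoreDefs,
                                     batchSize, outputDir);
        }
    }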
@@ -2,7 +2,6 @@

import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

@@ -25,10 +24,9 @@ public class DonorBasedRebalanceTask extends RebalanceTask {

public DonorBasedRebalanceTask(final int taskId,
final List<RebalancePartitionsInfo> stealInfos,
final long timeoutSeconds,
final Semaphore donorPermit,
final AdminClient adminClient) {
super(taskId, stealInfos, timeoutSeconds, donorPermit, adminClient);
super(taskId, stealInfos, donorPermit, adminClient);
RebalanceUtils.assertSameDonor(stealInfos, -1);
this.donorNodeId = stealInfos.get(0).getDonorId();
}
@@ -46,11 +44,7 @@ public void run() {
+ " rebalancing task " + stealInfos);
rebalanceAsyncId = adminClient.rebalanceOps.rebalanceNode(stealInfos);

// Wait for the task to get over
adminClient.rpcOps.waitForCompletion(donorNodeId,
rebalanceAsyncId,
timeoutSeconds,
TimeUnit.SECONDS);
adminClient.rpcOps.waitForCompletion(donorNodeId, rebalanceAsyncId);
RebalanceUtils.printLog(taskId,
logger,
"Succesfully finished rebalance for async operation id "
3 changes: 0 additions & 3 deletions src/java/voldemort/client/rebalance/task/RebalanceTask.java
@@ -11,7 +11,6 @@ public abstract class RebalanceTask implements Runnable {

protected final int taskId;
protected Exception exception;
protected final long timeoutSeconds;
protected final AdminClient adminClient;
protected final Semaphore donorPermit;
protected final AtomicBoolean isComplete;
@@ -21,12 +20,10 @@ public abstract class RebalanceTask implements Runnable {

public RebalanceTask(final int taskId,
final List<RebalancePartitionsInfo> stealInfos,
final long timeoutSeconds,
final Semaphore donorPermit,
final AdminClient adminClient) {
this.stealInfos = stealInfos;
this.taskId = taskId;
this.timeoutSeconds = timeoutSeconds;
this.adminClient = adminClient;
this.donorPermit = donorPermit;
this.exception = null;
@@ -1,7 +1,6 @@
package voldemort.client.rebalance.task;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

@@ -42,15 +41,15 @@ public class StealerBasedRebalanceTask extends RebalanceTask {
// submits "work" to the server and servers are mature enough to throttle
// and process them as fast as they can. Since that looks like changing all
// the server execution frameworks, let's stick with this for now..
// TODO: Talk with Lei and Vinoth and decide on the fate of ma
private final int maxTries;

public StealerBasedRebalanceTask(final int taskId,
final RebalancePartitionsInfo stealInfo,
final long timeoutSeconds,
final int maxTries,
final Semaphore donorPermit,
final AdminClient adminClient) {
super(taskId, Lists.newArrayList(stealInfo), timeoutSeconds, donorPermit, adminClient);
super(taskId, Lists.newArrayList(stealInfo), donorPermit, adminClient);
this.maxTries = maxTries;
this.stealerNodeId = stealInfo.getStealerId();
}
@@ -77,9 +76,7 @@ private int startNodeRebalancing() {
+ " is currently rebalancing. Waiting till completion");
adminClient.rpcOps.waitForCompletion(stealerNodeId,
MetadataStore.SERVER_STATE_KEY,
VoldemortState.NORMAL_SERVER.toString(),
timeoutSeconds,
TimeUnit.SECONDS);
VoldemortState.NORMAL_SERVER.toString());
rebalanceException = e;
}
}
@@ -99,12 +96,7 @@ public void run() {
donorPermit.acquire();

rebalanceAsyncId = startNodeRebalancing();

// Wait for the task to get over
adminClient.rpcOps.waitForCompletion(stealerNodeId,
rebalanceAsyncId,
timeoutSeconds,
TimeUnit.SECONDS);
adminClient.rpcOps.waitForCompletion(stealerNodeId, rebalanceAsyncId);
RebalanceUtils.printLog(taskId,
logger,
"Succesfully finished rebalance for async operation id "
15 changes: 2 additions & 13 deletions src/java/voldemort/tools/RebalanceControllerCLI.java
@@ -59,13 +59,6 @@ private static void setupParser() {
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-tries");
// TODO: Can this option be deprecated?
parser.accepts("timeout",
"Time-out in seconds for rebalancing of a single task ( stealer - donor tuple ) [ Default : "
+ RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC + " ]")
.withRequiredArg()
.ofType(Long.class)
.describedAs("sec");
// TODO: Can this option be described better?
parser.accepts("parallelism",
"Number of servers running stealer- or donor-based tasks in parallel [ Default:"
@@ -105,6 +98,8 @@ private static void printUsage() {
help.append(" --final-cluster <clusterXML>\n");
help.append(" Optional:\n");
help.append(" --final-stores <storesXML> [ Needed for zone expansion ]\n");
help.append(" --parallelism <parallelism> [ Number of rebalancing tasks to run in parallel ]");
help.append(" --tries <tries> [ Number of times to try starting an async rebalancing task on a node ");
help.append(" --output-dir [ Output directory in which plan is stored ]\n");
help.append(" --batch <batch> [ Number of primary partitions to move in each rebalancing batch. ]\n");
help.append(" --output-dir <outputDir> [ Directory in which cluster metadata is dumped for each batch of the plan. ]\n");
@@ -167,15 +162,9 @@ public static void main(String[] args) throws Exception {
tries = (Integer) options.valueOf("tries");
}

long timeout = RebalanceController.REBALANCING_CLIENT_TIMEOUT_SEC;
if(options.has("timeout")) {
timeout = (Integer) options.valueOf("timeout");
}

RebalanceController rebalanceController = new RebalanceController(bootstrapURL,
parallelism,
tries,
timeout,
stealerBased);

Cluster currentCluster = rebalanceController.getCurrentCluster();
