Skip to content

Commit

Permalink
Addressed review feedback on new RebalanceController.
Browse files Browse the repository at this point in the history
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent ad4b726 commit 91c0b18
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 17 deletions.
27 changes: 20 additions & 7 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -153,7 +153,7 @@ public void rebalance(final RebalancePlan rebalancePlan) {
List<StoreDefinition> finalStores = rebalancePlan.getFinalStores();

validatePlan(rebalancePlan);
validateCluster(finalCluster, finalStores);
prepareForRebalance(finalCluster, finalStores);

logger.info("Propagating cluster " + finalCluster + " to all nodes");
// TODO: (atomic cluster/stores update) Add finalStores here so that
Expand All @@ -163,6 +163,11 @@ public void rebalance(final RebalancePlan rebalancePlan) {
executePlan(rebalancePlan);
}

/**
* Validates all aspects of the plan (i.e., all config files)
*
* @param rebalancePlan
*/
private void validatePlan(RebalancePlan rebalancePlan) {
logger.info("Validating plan state.");

Expand All @@ -178,7 +183,18 @@ private void validatePlan(RebalancePlan rebalancePlan) {
RebalanceUtils.validateRebalanceStore(finalStores);
}

private void validateCluster(Cluster finalCluster, List<StoreDefinition> finalStores) {
/**
* Validates deployed cluster:
* <ul>
* <li>sets local admin client to finalCluster
* <li>checks that all servers are currently in normal state
* <li>confirms read-only stores can be rebalanced.
* </ul>
*
* @param finalCluster
* @param finalStores
*/
private void prepareForRebalance(Cluster finalCluster, List<StoreDefinition> finalStores) {
logger.info("Validating state of deployed cluster.");

// Reset the cluster that the admin client points at
Expand Down Expand Up @@ -309,9 +325,6 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {

RebalanceUtils.printLog(batchCount, logger, "Starting batch " + batchCount + ".");

// Flatten the node plans to partition plans
List<RebalancePartitionsInfo> rebalancePartitionPlanList = rebalancePartitionsInfoList;

// Split the store definitions
List<StoreDefinition> readOnlyStoreDefs = StoreDefinitionUtils.filterStores(batchStoreDefs,
true);
Expand All @@ -323,7 +336,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {

// STEP 1 - Cluster state change
boolean finishedReadOnlyPhase = false;
List<RebalancePartitionsInfo> filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList,
List<RebalancePartitionsInfo> filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionsInfoList,
readOnlyStoreDefs);

rebalanceStateChange(batchCount,
Expand All @@ -346,7 +359,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {

// STEP 3 - Cluster change state
finishedReadOnlyPhase = true;
filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList,
filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionsInfoList,
readWriteStoreDefs);

rebalanceStateChange(batchCount,
Expand Down
Expand Up @@ -114,7 +114,7 @@ public RebalancePartitionsInfo(int stealerNodeId,
this.initialCluster = Utils.notNull(initialCluster);
}

private void flattenStoreToReplicaToAddPartitionListTOStoreToPartitionIds() {
private void flattenStoreToReplicaTypeToAddPartitionListTOStoreToPartitionIds() {
this.storeToPartitionIds = new HashMap<String, List<Integer>>();
for(Entry<String, HashMap<Integer, List<Integer>>> entry: storeToReplicaToAddPartitionList.entrySet()) {
if(!this.storeToPartitionIds.containsKey(entry.getKey())) {
Expand Down Expand Up @@ -270,7 +270,7 @@ public synchronized HashMap<Integer, List<Integer>> getReplicaToAddPartitionList
public synchronized void setStoreToReplicaToAddPartitionList(HashMap<String, HashMap<Integer, List<Integer>>> storeToReplicaToAddPartitionList) {
this.storeToReplicaToAddPartitionList = storeToReplicaToAddPartitionList;
findMaxReplicaType(storeToReplicaToAddPartitionList);
flattenStoreToReplicaToAddPartitionListTOStoreToPartitionIds();
flattenStoreToReplicaTypeToAddPartitionListTOStoreToPartitionIds();
}

public synchronized void removeStore(String storeName) {
Expand Down
Expand Up @@ -29,6 +29,19 @@ public class StealerBasedRebalanceTask extends RebalanceTask {
private final int stealerNodeId;
// TODO: What is the use of maxTries for stealer-based tasks? Need to
// validate reason for existence or remove.
// NOTES FROM VINOTH:
// I traced the code down and it seems like this is basically used to
// reissue StealerBasedRebalanceTask when it encounters an
// AlreadyRebalancingException (which is tied to obtaining a rebalance
// permit for the donor node) .. In general, I vote for removing this
// parameter.. I think we should have the controller wait/block with a
// decent log message if it truly blocked on other tasks to complete... But,
// we need to check how likely this retry is saving us grief today and
// probably stick to it for sometime, as we stabilize the code base with the
// new planner/controller et al... The right way to do this: the controller simply
// submits "work" to the server and servers are mature enough to throttle
// and process them as fast as they can. Since that looks like changing all
// the server execution frameworks, let's stick with this for now..
private final int maxTries;

public StealerBasedRebalanceTask(final int taskId,
Expand Down
7 changes: 4 additions & 3 deletions src/java/voldemort/tools/RebalanceControllerCLI.java
Expand Up @@ -66,8 +66,9 @@ private static void setupParser() {
.withRequiredArg()
.ofType(Long.class)
.describedAs("sec");
// TODO: Can this option be described better?
parser.accepts("parallelism",
"Number of rebalances to run in parallel [ Default:"
"Number of servers running stealer- or donor-based tasks in parallel [ Default:"
+ RebalanceController.MAX_PARALLEL_REBALANCING + " ]")
.withRequiredArg()
.ofType(Integer.class)
Expand All @@ -81,7 +82,7 @@ private static void setupParser() {
.withRequiredArg()
.describedAs("stores.xml");

parser.accepts("batch",
parser.accepts("batch-size",
"Number of primary partitions to move together [ RebalancePlan parameter; Default : "
+ RebalancePlan.BATCH_SIZE + " ]")
.withRequiredArg()
Expand Down Expand Up @@ -195,7 +196,7 @@ public static void main(String[] args) throws Exception {
RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster);

// Process optional "planning" arguments
int batchSize = CmdUtils.valueOf(options, "batch", RebalancePlan.BATCH_SIZE);
int batchSize = CmdUtils.valueOf(options, "batch-size", RebalancePlan.BATCH_SIZE);

String outputDir = null;
if(options.has("output-dir")) {
Expand Down
4 changes: 2 additions & 2 deletions src/java/voldemort/tools/RebalancePlanCLI.java
Expand Up @@ -60,7 +60,7 @@ private static void setupParser() {
"Path to target store definition xml. Needed for zone expansion.")
.withRequiredArg()
.describedAs("stores.xml");
parser.accepts("batch",
parser.accepts("batch-size",
"Number of primary partitions to move together [ Default : "
+ RebalancePlan.BATCH_SIZE + " ]")
.withRequiredArg()
Expand Down Expand Up @@ -149,7 +149,7 @@ public static void main(String[] args) throws Exception {
List<StoreDefinition> targetStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(targetStoresXML));

// Optional args
int batchSize = CmdUtils.valueOf(options, "batch", RebalancePlan.BATCH_SIZE);
int batchSize = CmdUtils.valueOf(options, "batch-size", RebalancePlan.BATCH_SIZE);

String outputDir = null;
if(options.has("output-dir")) {
Expand Down
13 changes: 10 additions & 3 deletions src/java/voldemort/utils/RebalanceUtils.java
Expand Up @@ -241,11 +241,13 @@ public static void checkEachServerInNormalState(final Cluster cluster,
*/
public static void validateClusterStores(final Cluster cluster,
final List<StoreDefinition> storeDefs) {
// Constructing a PartitionBalance object has the (desirable in this
// Constructing a StoreRoutingPlan has the (desirable in this
// case) side-effect of verifying that the store definition is congruent
// with the cluster definition. If there are issues, exceptions are
// thrown.
new PartitionBalance(cluster, storeDefs);
for(StoreDefinition storeDefinition: storeDefs) {
new StoreRoutingPlan(cluster, storeDefinition);
}
return;
}

Expand Down Expand Up @@ -621,8 +623,13 @@ public static List<Integer> getStolenPrimaryPartitions(final Cluster currentClus
.getPartitionIds());

List<Integer> currentList = new ArrayList<Integer>();
if(ClusterUtils.containsNode(currentCluster, stealNodeId))
if(ClusterUtils.containsNode(currentCluster, stealNodeId)) {
currentList = currentCluster.getNodeById(stealNodeId).getPartitionIds();
} else {
// TODO: Is throwing exception desirable here?
throw new VoldemortException("Current cluster does not contain stealer node (cluster : [[["
+ currentCluster + "]]], node id " + stealNodeId + ")");
}

// remove all current partitions from targetList
targetList.removeAll(currentList);
Expand Down

0 comments on commit 91c0b18

Please sign in to comment.