diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java index 6ae971f86c..c9173f323e 100644 --- a/src/java/voldemort/client/rebalance/RebalanceController.java +++ b/src/java/voldemort/client/rebalance/RebalanceController.java @@ -153,7 +153,7 @@ public void rebalance(final RebalancePlan rebalancePlan) { List finalStores = rebalancePlan.getFinalStores(); validatePlan(rebalancePlan); - validateCluster(finalCluster, finalStores); + prepareForRebalance(finalCluster, finalStores); logger.info("Propagating cluster " + finalCluster + " to all nodes"); // TODO: (atomic cluster/stores update) Add finalStores here so that @@ -163,6 +163,11 @@ public void rebalance(final RebalancePlan rebalancePlan) { executePlan(rebalancePlan); } + /** + * Validates all aspects of the plan (i.e., all config files) + * + * @param rebalancePlan + */ private void validatePlan(RebalancePlan rebalancePlan) { logger.info("Validating plan state."); @@ -178,7 +183,18 @@ private void validatePlan(RebalancePlan rebalancePlan) { RebalanceUtils.validateRebalanceStore(finalStores); } - private void validateCluster(Cluster finalCluster, List finalStores) { + /** + * Validates deployed cluster: + * + * + * @param finalCluster + * @param finalStores + */ + private void prepareForRebalance(Cluster finalCluster, List finalStores) { logger.info("Validating state of deployed cluster."); // Reset the cluster that the admin client points at @@ -309,9 +325,6 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) { RebalanceUtils.printLog(batchCount, logger, "Starting batch " + batchCount + "."); - // Flatten the node plans to partition plans - List rebalancePartitionPlanList = rebalancePartitionsInfoList; - // Split the store definitions List readOnlyStoreDefs = StoreDefinitionUtils.filterStores(batchStoreDefs, true); @@ -323,7 +336,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) { // STEP 1 - Cluster state change boolean finishedReadOnlyPhase = false; - List filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList, + List filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionsInfoList, readOnlyStoreDefs); rebalanceStateChange(batchCount, @@ -346,7 +359,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) { // STEP 3 - Cluster change state finishedReadOnlyPhase = true; - filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList, + filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionsInfoList, readWriteStoreDefs); rebalanceStateChange(batchCount, diff --git a/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java b/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java index 1bba84e2e1..658fbc694f 100644 --- a/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java +++ b/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java @@ -114,7 +114,7 @@ public RebalancePartitionsInfo(int stealerNodeId, this.initialCluster = Utils.notNull(initialCluster); } - private void flattenStoreToReplicaToAddPartitionListTOStoreToPartitionIds() { + private void flattenStoreToReplicaTypeToAddPartitionListTOStoreToPartitionIds() { this.storeToPartitionIds = new HashMap>(); for(Entry>> entry: storeToReplicaToAddPartitionList.entrySet()) { if(!this.storeToPartitionIds.containsKey(entry.getKey())) { @@ -270,7 +270,7 @@ public synchronized HashMap> getReplicaToAddPartitionList public synchronized void setStoreToReplicaToAddPartitionList(HashMap>> storeToReplicaToAddPartitionList) { this.storeToReplicaToAddPartitionList = storeToReplicaToAddPartitionList; findMaxReplicaType(storeToReplicaToAddPartitionList); - flattenStoreToReplicaToAddPartitionListTOStoreToPartitionIds(); + flattenStoreToReplicaTypeToAddPartitionListTOStoreToPartitionIds(); } public synchronized void removeStore(String storeName) { diff --git a/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java b/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java index 7f2df14efe..e3c23a8051 100644 --- a/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java +++ b/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java @@ -29,6 +29,19 @@ public class StealerBasedRebalanceTask extends RebalanceTask { private final int stealerNodeId; // TODO: What is the use of maxTries for stealer-based tasks? Need to // validate reason for existence or remove. + // NOTES FROM VINOTH: + // I traced the code down and it seems like this is basically used to + // reissue StealerBasedRebalanceTask when it encounters an + // AlreadyRebalancingException (which is tied to obtaining a rebalance + // permit for the donor node) .. In general, I vote for removing this + // parameter.. I think we should have the controller wait/block with a + // decent log message if it truly blocked on other tasks to complete... But, + // we need to check how likely this retry is saving us grief today and + // probably stick to it for sometime, as we stabliize the code base with the + // new planner/controller et al...Right way to do this.. Controller simply + // submits "work" to the server and servers are mature enough to throttle + // and process them as fast as they can. Since that looks like changing all + // the server execution frameworks, let's stick with this for now.. private final int maxTries; public StealerBasedRebalanceTask(final int taskId, diff --git a/src/java/voldemort/tools/RebalanceControllerCLI.java b/src/java/voldemort/tools/RebalanceControllerCLI.java index 0c3fefec9a..bcef1647e1 100644 --- a/src/java/voldemort/tools/RebalanceControllerCLI.java +++ b/src/java/voldemort/tools/RebalanceControllerCLI.java @@ -66,8 +66,9 @@ private static void setupParser() { .withRequiredArg() .ofType(Long.class) .describedAs("sec"); + // TODO: Can this option be described better? parser.accepts("parallelism", - "Number of rebalances to run in parallel [ Default:" + "Number of servers running stealer- or donor-based tasks in parallel [ Default:" + RebalanceController.MAX_PARALLEL_REBALANCING + " ]") .withRequiredArg() .ofType(Integer.class) @@ -81,7 +82,7 @@ private static void setupParser() { .withRequiredArg() .describedAs("stores.xml"); - parser.accepts("batch", + parser.accepts("batch-size", "Number of primary partitions to move together [ RebalancePlan parameter; Default : " + RebalancePlan.BATCH_SIZE + " ]") .withRequiredArg() @@ -195,7 +196,7 @@ public static void main(String[] args) throws Exception { RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster); // Process optional "planning" arguments - int batchSize = CmdUtils.valueOf(options, "batch", RebalancePlan.BATCH_SIZE); + int batchSize = CmdUtils.valueOf(options, "batch-size", RebalancePlan.BATCH_SIZE); String outputDir = null; if(options.has("output-dir")) { diff --git a/src/java/voldemort/tools/RebalancePlanCLI.java b/src/java/voldemort/tools/RebalancePlanCLI.java index 29f60ae978..647d50ca95 100644 --- a/src/java/voldemort/tools/RebalancePlanCLI.java +++ b/src/java/voldemort/tools/RebalancePlanCLI.java @@ -60,7 +60,7 @@ private static void setupParser() { "Path to target store definition xml. Needed for zone expansion.") .withRequiredArg() .describedAs("stores.xml"); - parser.accepts("batch", + parser.accepts("batch-size", "Number of primary partitions to move together [ Default : " + RebalancePlan.BATCH_SIZE + " ]") .withRequiredArg() @@ -149,7 +149,7 @@ public static void main(String[] args) throws Exception { List targetStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(targetStoresXML)); // Optional args - int batchSize = CmdUtils.valueOf(options, "batch", RebalancePlan.BATCH_SIZE); + int batchSize = CmdUtils.valueOf(options, "batch-size", RebalancePlan.BATCH_SIZE); String outputDir = null; if(options.has("output-dir")) { diff --git a/src/java/voldemort/utils/RebalanceUtils.java b/src/java/voldemort/utils/RebalanceUtils.java index 48c327a6dc..75b849749e 100644 --- a/src/java/voldemort/utils/RebalanceUtils.java +++ b/src/java/voldemort/utils/RebalanceUtils.java @@ -241,11 +241,13 @@ public static void checkEachServerInNormalState(final Cluster cluster, */ public static void validateClusterStores(final Cluster cluster, final List storeDefs) { - // Constructing a PartitionBalance object has the (desirable in this + // Constructing a StoreRoutingPlan has the (desirable in this // case) side-effect of verifying that the store definition is congruent // with the cluster definition. If there are issues, exceptions are // thrown. - new PartitionBalance(cluster, storeDefs); + for(StoreDefinition storeDefinition: storeDefs) { + new StoreRoutingPlan(cluster, storeDefinition); + } return; } @@ -621,8 +623,13 @@ public static List getStolenPrimaryPartitions(final Cluster currentClus .getPartitionIds()); List currentList = new ArrayList(); - if(ClusterUtils.containsNode(currentCluster, stealNodeId)) + if(ClusterUtils.containsNode(currentCluster, stealNodeId)) { currentList = currentCluster.getNodeById(stealNodeId).getPartitionIds(); + } else { + // TODO: Is throwing exception desirable here? + throw new VoldemortException("Current cluster does not contain stealer node (cluster : [[[" + + currentCluster + "]]], node id " + stealNodeId + ")"); + } // remove all current partitions from targetList targetList.removeAll(currentList);