From 464df36183f7ceabf2c5e325088bb07019b2552f Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Thu, 9 May 2013 08:39:43 -0700 Subject: [PATCH] Initial commit of new rebalance controller. This is an interim commit. The new RebalanceControllerCLI has been added. Portions of the RebalanceController have been updated to use the RebalancePlan. All the tests that should pass, still pass for the old RebalanceController. About to switch tests to use new RebalanceController that works with a RebalancePlan. --- .../rebalance/OrderedClusterTransition.java | 10 + .../client/rebalance/RebalanceCLI.java | 52 +-- .../rebalance/RebalanceClientConfig.java | 4 + .../rebalance/RebalanceClusterPlan.java | 6 +- .../client/rebalance/RebalanceController.java | 314 +++++++++++++++++- .../client/rebalance/RebalancePlan.java | 16 + .../tools/RebalanceControllerCLI.java | 214 ++++++++++++ .../voldemort/tools/RebalancePlanCLI.java | 8 +- src/java/voldemort/utils/RebalanceUtils.java | 3 +- .../AbstractNonZonedRebalanceTest.java | 1 + .../rebalance/AbstractRebalanceTest.java | 1 + .../rebalance/AbstractZonedRebalanceTest.java | 1 + 12 files changed, 582 insertions(+), 48 deletions(-) create mode 100644 src/java/voldemort/tools/RebalanceControllerCLI.java diff --git a/src/java/voldemort/client/rebalance/OrderedClusterTransition.java b/src/java/voldemort/client/rebalance/OrderedClusterTransition.java index c5eec5cebd..7734df0cad 100644 --- a/src/java/voldemort/client/rebalance/OrderedClusterTransition.java +++ b/src/java/voldemort/client/rebalance/OrderedClusterTransition.java @@ -36,6 +36,7 @@ public class OrderedClusterTransition { private String printedContent; private final int id; + @Deprecated public OrderedClusterTransition(final Cluster currentCluster, final Cluster targetCluster, List storeDefs, @@ -48,6 +49,15 @@ public OrderedClusterTransition(final Cluster currentCluster, this.orderedRebalancePartitionsInfoList = orderedClusterPlan(rebalanceClusterPlan); } + public OrderedClusterTransition(final RebalanceClusterPlan rebalanceClusterPlan) { + this.id = idGen.incrementAndGet(); + this.currentCluster = rebalanceClusterPlan.getCurrentCluster(); + this.targetCluster = rebalanceClusterPlan.getFinalCluster(); + this.storeDefs = rebalanceClusterPlan.getStoreDefs(); + this.rebalanceClusterPlan = rebalanceClusterPlan; + this.orderedRebalancePartitionsInfoList = orderedClusterPlan(rebalanceClusterPlan); + } + public List getStoreDefs() { return this.storeDefs; } diff --git a/src/java/voldemort/client/rebalance/RebalanceCLI.java b/src/java/voldemort/client/rebalance/RebalanceCLI.java index 2a409b2cfe..e2a36dec2c 100644 --- a/src/java/voldemort/client/rebalance/RebalanceCLI.java +++ b/src/java/voldemort/client/rebalance/RebalanceCLI.java @@ -153,6 +153,7 @@ public static void main(String[] args) throws Exception { } if(options.has("url")) { + // Old rebalancing controller if(!options.has("target-cluster")) { System.err.println("Missing required arguments: target-cluster"); @@ -169,8 +170,12 @@ public static void main(String[] args) throws Exception { rebalanceController.rebalance(targetCluster); } else { - - Set missing = CmdUtils.missing(options, "current-cluster", "current-stores"); + // Entropy tool + Set missing = CmdUtils.missing(options, + "entropy", + "output-dir", + "current-cluster", + "current-stores"); if(missing.size() > 0) { System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing)); @@ -184,45 +189,20 @@ public static void main(String[] args) throws Exception { Cluster currentCluster = new ClusterMapper().readCluster(new File(currentClusterXML)); List storeDefs = new StoreDefinitionsMapper().readStoreList(new File(currentStoresXML)); - // TODO: Remove this option. - if(options.has("entropy")) { - - if(!config.hasOutputDirectory()) { - System.err.println("Missing arguments output-dir"); - printHelp(System.err, parser); - System.exit(ERROR_EXIT_CODE); - } - - boolean entropy = (Boolean) options.valueOf("entropy"); - boolean verbose = options.has("verbose-logging"); - long numKeys = CmdUtils.valueOf(options, "keys", Entropy.DEFAULT_NUM_KEYS); - Entropy generator = new Entropy(-1, numKeys, verbose); - generator.generateEntropy(currentCluster, - storeDefs, - new File(config.getOutputDirectory()), - entropy); - return; - - } - - if(!options.has("target-cluster")) { - System.err.println("Missing required arguments: target-cluster"); - printHelp(System.err, parser); - System.exit(ERROR_EXIT_CODE); - } - - String targetClusterXML = (String) options.valueOf("target-cluster"); - Cluster targetCluster = new ClusterMapper().readCluster(new File(targetClusterXML)); - - rebalanceController = new RebalanceController(currentCluster, config); - rebalanceController.rebalance(currentCluster, targetCluster, storeDefs); - + boolean entropy = (Boolean) options.valueOf("entropy"); + boolean verbose = options.has("verbose-logging"); + long numKeys = CmdUtils.valueOf(options, "keys", Entropy.DEFAULT_NUM_KEYS); + Entropy generator = new Entropy(-1, numKeys, verbose); + generator.generateEntropy(currentCluster, + storeDefs, + new File(config.getOutputDirectory()), + entropy); } - exitCode = SUCCESS_EXIT_CODE; if(logger.isInfoEnabled()) { logger.info("Successfully terminated rebalance all tasks"); } + exitCode = SUCCESS_EXIT_CODE; } catch(VoldemortException e) { logger.error("Unsuccessfully terminated rebalance operation - " + e.getMessage(), e); diff --git a/src/java/voldemort/client/rebalance/RebalanceClientConfig.java b/src/java/voldemort/client/rebalance/RebalanceClientConfig.java index 5ee7a4ee62..1ad869f0c0 100644 --- a/src/java/voldemort/client/rebalance/RebalanceClientConfig.java +++ b/src/java/voldemort/client/rebalance/RebalanceClientConfig.java @@ -21,6 +21,10 @@ import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.utils.Props; +// TODO: This class mixes configuration of RebalancePlan with configuration of +// RebalanceController. It needs to be deprecated so that a clean way of setting +// the parameters/configs for each of these classes is put in place. +@Deprecated public class RebalanceClientConfig extends AdminClientConfig { public final static int MAX_PARALLEL_REBALANCING = 1; diff --git a/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java b/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java index 437fcc8e73..a721d208c3 100644 --- a/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java +++ b/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java @@ -171,7 +171,7 @@ public RebalanceClusterPlan(final Cluster targetCluster, // TODO: (end) Remove ... } - public Cluster getTargetCluster() { + public Cluster getCurrentCluster() { return targetCluster; } @@ -179,6 +179,10 @@ public Cluster getFinalCluster() { return finalCluster; } + public List getStoreDefs() { + return storeDefs; + } + @Deprecated public Queue getRebalancingTaskQueue() { return rebalanceTaskQueue; diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java index ff5fb5355e..ddbf70246a 100644 --- a/src/java/voldemort/client/rebalance/RebalanceController.java +++ b/src/java/voldemort/client/rebalance/RebalanceController.java @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; @@ -31,6 +32,7 @@ import voldemort.client.ClientConfig; import voldemort.client.protocol.RequestFormatType; import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.client.rebalance.task.DonorBasedRebalanceTask; import voldemort.client.rebalance.task.RebalanceTask; import voldemort.client.rebalance.task.StealerBasedRebalanceTask; @@ -39,9 +41,11 @@ import voldemort.server.rebalance.VoldemortRebalancingException; import voldemort.store.StoreDefinition; import voldemort.utils.NodeUtils; +import voldemort.utils.Pair; import voldemort.utils.RebalanceUtils; import voldemort.utils.StoreDefinitionUtils; import voldemort.utils.Time; +import voldemort.utils.Utils; import voldemort.versioning.Versioned; import voldemort.xml.ClusterMapper; @@ -55,18 +59,77 @@ public class RebalanceController { private static final DecimalFormat decimalFormatter = new DecimalFormat("#.##"); private final AdminClient adminClient; + private final Cluster currentCluster; + private final List currentStoreDefs; + + @Deprecated private final RebalanceClientConfig rebalanceConfig; + public final static int MAX_PARALLEL_REBALANCING = 1; + public final static int MAX_TRIES_REBALANCING = 2; + public final static long REBALANCING_CLIENT_TIMEOUT_SEC = TimeUnit.DAYS.toSeconds(30); + public final static boolean STEALER_BASED_REBALANCING = true; + public final static boolean DELETE_AFTER_REBALANCING = false; + + private final int maxParallelRebalancing; + private final int maxTriesRebalancing; + private final long rebalancingClientTimeoutSeconds; + private final boolean stealerBasedRebalancing; + private final boolean deleteAfterRebalancingEnabled; + + public RebalanceController(String bootstrapUrl, + int maxParallelRebalancing, + int maxTriesRebalancing, + long rebalancingClientTimeoutSeconds, + boolean stealerBased, + boolean deleteAfter) { + this.adminClient = new AdminClient(bootstrapUrl, + new AdminClientConfig(), + new ClientConfig()); + Pair> pair = getCurrentClusterState(); + this.currentCluster = pair.getFirst(); + this.currentStoreDefs = pair.getSecond(); + + this.rebalanceConfig = null; + this.maxParallelRebalancing = maxParallelRebalancing; + this.maxTriesRebalancing = maxTriesRebalancing; + this.rebalancingClientTimeoutSeconds = rebalancingClientTimeoutSeconds; + this.stealerBasedRebalancing = stealerBased; + this.deleteAfterRebalancingEnabled = deleteAfter; + } + + @Deprecated public RebalanceController(String bootstrapUrl, RebalanceClientConfig rebalanceConfig) { this.adminClient = new AdminClient(bootstrapUrl, rebalanceConfig, new ClientConfig().setRequestFormatType(RequestFormatType.PROTOCOL_BUFFERS)); + this.currentCluster = null; + this.currentStoreDefs = null; + this.rebalanceConfig = rebalanceConfig; + maxParallelRebalancing = rebalanceConfig.getMaxParallelRebalancing(); + maxTriesRebalancing = rebalanceConfig.getMaxTriesRebalancing(); + rebalancingClientTimeoutSeconds = rebalanceConfig.getRebalancingClientTimeoutSeconds(); + stealerBasedRebalancing = rebalanceConfig.isStealerBasedRebalancing(); + deleteAfterRebalancingEnabled = rebalanceConfig.isDeleteAfterRebalancingEnabled(); } - public RebalanceController(Cluster cluster, RebalanceClientConfig config) { - this.adminClient = new AdminClient(cluster, config, new ClientConfig()); - this.rebalanceConfig = config; + /** + * Probe the existing cluster to retrieve the current cluster xml and stores + * xml. + * + * @return Pair of Cluster and List from current cluster. + */ + private Pair> getCurrentClusterState() { + + // Retrieve the latest cluster metadata from the existing nodes + Versioned currentVersionedCluster = RebalanceUtils.getLatestCluster(NodeUtils.getNodeIds(Lists.newArrayList(adminClient.getAdminClientCluster() + .getNodes())), + adminClient); + Cluster cluster = currentVersionedCluster.getValue(); + List storeDefs = RebalanceUtils.getCurrentStoreDefinitions(currentCluster, + adminClient); + return new Pair>(cluster, storeDefs); } /** @@ -100,12 +163,237 @@ public void rebalance(Cluster currentCluster, final Cluster targetCluster) { adminClient.setAdminClientCluster(targetCluster); // Retrieve list of stores + check for that all are consistent - List storeDefs = RebalanceUtils.getStoreDefinition(targetCluster, - adminClient); + List storeDefs = RebalanceUtils.getCurrentStoreDefinitions(targetCluster, + adminClient); rebalance(currentCluster, targetCluster, storeDefs); } + public void rebalance(final RebalancePlan rebalancePlan) { + Cluster finalCluster = rebalancePlan.getFinalCluster(); + List finalStores = rebalancePlan.getFinalStores(); + + validatePlan(rebalancePlan); + validateCluster(finalCluster, finalStores); + + logger.info("Propagating cluster " + finalCluster + " to all nodes"); + // TODO: Add finalStores here so that cluster & stores can be updated + // atomically. + RebalanceUtils.propagateCluster(adminClient, finalCluster); + + executePlan(rebalancePlan); + } + + private void validatePlan(RebalancePlan rebalancePlan) { + logger.info("Validating plan state."); + + Cluster currentCluster = rebalancePlan.getCurrentCluster(); + List currentStores = rebalancePlan.getCurrentStores(); + Cluster finalCluster = rebalancePlan.getFinalCluster(); + List finalStores = rebalancePlan.getFinalStores(); + + RebalanceUtils.validateClusterStores(currentCluster, currentStores); + RebalanceUtils.validateClusterStores(finalCluster, finalStores); + RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster); + RebalanceUtils.validateRebalanceStore(currentStores); + RebalanceUtils.validateRebalanceStore(finalStores); + } + + private void validateCluster(Cluster finalCluster, List finalStores) { + logger.info("Validating state of deployed cluster."); + + // Reset the cluster that the admin client points at + adminClient.setAdminClientCluster(finalCluster); + // Validate that all the nodes ( new + old ) are in normal state + RebalanceUtils.validateProdClusterStateIsNormal(finalCluster, adminClient); + // Verify all old RO stores exist at version 2 + RebalanceUtils.validateReadOnlyStores(finalCluster, finalStores, adminClient); + } + + private void executePlan(RebalancePlan rebalancePlan) { + logger.info("Starting rebalancing!"); + + int batchCount = 0; + int partitionStoreCount = 0; + long totalTimeMs = 0; + + List plan = rebalancePlan.getPlan(); + int numBatches = plan.size(); + int numPartitionStores = rebalancePlan.getPartitionStoresMoved(); + + for(RebalanceClusterPlan batch: plan) { + logger.info("======== REBALANCING BATCH " + (batchCount + 1) + " ========"); + // TODO: Any way to deprecate/remove OrderedClusterTransition? + final OrderedClusterTransition orderedClusterTransition = new OrderedClusterTransition(batch); + + RebalanceUtils.printLog(orderedClusterTransition.getId(), + logger, + orderedClusterTransition.toString()); + + long startTimeMs = System.currentTimeMillis(); + executeBatch(orderedClusterTransition); + totalTimeMs += (System.currentTimeMillis() - startTimeMs); + + // Bump up the statistics + batchCount++; + partitionStoreCount += batch.getPartitionStoreMoves(); + batchStatusLog(orderedClusterTransition.getId(), + batchCount, + numBatches, + partitionStoreCount, + numPartitionStores, + totalTimeMs); + } + } + + /** + * Pretty print a progress update after each batch complete. + * + * @param id + * @param batchCount + * @param numBatches + * @param partitionStoreCount + * @param numPartitionStores + * @param totalTimeMs + */ + private void batchStatusLog(int id, + int batchCount, + int numBatches, + int partitionStoreCount, + int numPartitionStores, + long totalTimeMs) { + // Calculate the estimated end time and pretty print stats + double rate = partitionStoreCount / numPartitionStores; + long estimatedTimeMs = (long) (totalTimeMs / rate) - totalTimeMs; + + StringBuilder sb = new StringBuilder(); + sb.append("Batch Complete!") + .append(Utils.NEWLINE) + .append("\tbatches moved: ") + .append(batchCount) + .append(" out of ") + .append(numBatches) + .append(Utils.NEWLINE) + .append("\tPartition stores moved: ") + .append(partitionStoreCount) + .append(" out of ") + .append(numPartitionStores) + .append(Utils.NEWLINE) + .append("\tPercent done: ") + .append(decimalFormatter.format(rate * 100.0)) + .append(Utils.NEWLINE) + .append("\tEstimated time left: ") + .append(estimatedTimeMs) + .append(" ms (") + .append(TimeUnit.MILLISECONDS.toHours(estimatedTimeMs)) + .append(" hours)"); + RebalanceUtils.printLog(id, logger, sb.toString()); + } + + private void executeBatch(final OrderedClusterTransition orderedClusterTransition) { + try { + final List rebalancePartitionsInfoList = orderedClusterTransition.getOrderedRebalancePartitionsInfoList(); + + if(rebalancePartitionsInfoList.isEmpty()) { + RebalanceUtils.printLog(orderedClusterTransition.getId(), + logger, + "Skipping rebalance task id " + + orderedClusterTransition.getId() + + " since it is empty."); + // Even though there is no rebalancing work to do, cluster + // metadata must be updated so that the server is aware of the + // new cluster xml. + adminClient.rebalanceOps.rebalanceStateChange(orderedClusterTransition.getCurrentCluster(), + orderedClusterTransition.getTargetCluster(), + rebalancePartitionsInfoList, + false, + true, + false, + false, + true); + return; + } + + RebalanceUtils.printLog(orderedClusterTransition.getId(), + logger, + "Starting rebalance task id " + + orderedClusterTransition.getId()); + + // Flatten the node plans to partition plans + List rebalancePartitionPlanList = rebalancePartitionsInfoList; + + // Split the store definitions + List readOnlyStoreDefs = StoreDefinitionUtils.filterStores(orderedClusterTransition.getStoreDefs(), + true); + List readWriteStoreDefs = StoreDefinitionUtils.filterStores(orderedClusterTransition.getStoreDefs(), + false); + boolean hasReadOnlyStores = readOnlyStoreDefs != null && readOnlyStoreDefs.size() > 0; + boolean hasReadWriteStores = readWriteStoreDefs != null + && readWriteStoreDefs.size() > 0; + + // STEP 1 - Cluster state change + boolean finishedReadOnlyPhase = false; + List filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList, + readOnlyStoreDefs); + + rebalanceStateChange(orderedClusterTransition.getId(), + orderedClusterTransition.getCurrentCluster(), + orderedClusterTransition.getTargetCluster(), + filteredRebalancePartitionPlanList, + hasReadOnlyStores, + hasReadWriteStores, + finishedReadOnlyPhase); + + // STEP 2 - Move RO data + if(hasReadOnlyStores) { + rebalancePerTaskTransition(orderedClusterTransition.getId(), + orderedClusterTransition.getCurrentCluster(), + filteredRebalancePartitionPlanList, + hasReadOnlyStores, + hasReadWriteStores, + finishedReadOnlyPhase); + } + + // STEP 3 - Cluster change state + finishedReadOnlyPhase = true; + filteredRebalancePartitionPlanList = RebalanceUtils.filterPartitionPlanWithStores(rebalancePartitionPlanList, + readWriteStoreDefs); + + rebalanceStateChange(orderedClusterTransition.getId(), + orderedClusterTransition.getCurrentCluster(), + orderedClusterTransition.getTargetCluster(), + filteredRebalancePartitionPlanList, + hasReadOnlyStores, + hasReadWriteStores, + finishedReadOnlyPhase); + + // STEP 4 - Move RW data + if(hasReadWriteStores) { + rebalancePerTaskTransition(orderedClusterTransition.getId(), + orderedClusterTransition.getCurrentCluster(), + filteredRebalancePartitionPlanList, + hasReadOnlyStores, + hasReadWriteStores, + finishedReadOnlyPhase); + } + + RebalanceUtils.printLog(orderedClusterTransition.getId(), + logger, + "Successfully terminated rebalance task id " + + orderedClusterTransition.getId()); + + } catch(Exception e) { + RebalanceUtils.printErrorLog(orderedClusterTransition.getId(), + logger, + "Error in rebalance task id " + + orderedClusterTransition.getId() + " - " + + e.getMessage(), + e); + throw new VoldemortException("Rebalance failed on rebalance task id " + + orderedClusterTransition.getId(), e); + } + } + /** * Does basic verification of the metadata + server state. Finally starts * the rebalancing. @@ -114,6 +402,8 @@ public void rebalance(Cluster currentCluster, final Cluster targetCluster) { * @param targetCluster The desired cluster after rebalance * @param storeDefs Stores to rebalance */ + // TODO: replaced by rebalance(final RebalancePlan rebalancePlan) + @Deprecated public void rebalance(Cluster currentCluster, final Cluster targetCluster, List storeDefs) { @@ -169,6 +459,8 @@ public void rebalance(Cluster currentCluster, * @param targetCluster The desired cluster after rebalance * @param storeDefs Stores to rebalance */ + // TODO: Replaced by executePlan() + @Deprecated private void rebalancePerClusterTransition(Cluster currentCluster, final Cluster targetCluster, final List storeDefs) { @@ -351,6 +643,8 @@ private void rebalancePerClusterTransition(Cluster currentCluster, } + // TODO: Rename this method to make it stand out as being the key trigger + // for a batch of rebalancing. /** * Rebalance per partition transition - This does the actual rebalancing * work required for a single primary partition move. @@ -364,6 +658,8 @@ private void rebalancePerClusterTransition(Cluster currentCluster, * @param orderedClusterTransition The ordered cluster transition we are * going to run */ + // TODO: Replaced by executeBatch(); + @Deprecated private void rebalancePerPartitionTransition(final OrderedClusterTransition orderedClusterTransition) { try { final List rebalancePartitionsInfoList = orderedClusterTransition.getOrderedRebalancePartitionsInfoList(); @@ -800,6 +1096,14 @@ public AdminClient getAdminClient() { return adminClient; } + public Cluster getCurrentCluster() { + return currentCluster; + } + + public List getCurrentStoreDefs() { + return currentStoreDefs; + } + public void stop() { adminClient.close(); } diff --git a/src/java/voldemort/client/rebalance/RebalancePlan.java b/src/java/voldemort/client/rebalance/RebalancePlan.java index 538541369f..7e725c2428 100644 --- a/src/java/voldemort/client/rebalance/RebalancePlan.java +++ b/src/java/voldemort/client/rebalance/RebalancePlan.java @@ -257,6 +257,22 @@ private String storageOverhead(Map finalNodeToOverhead) { return (sb.toString()); } + public Cluster getCurrentCluster() { + return currentCluster; + } + + public List getCurrentStores() { + return currentStores; + } + + public Cluster getFinalCluster() { + return finalCluster; + } + + public List getFinalStores() { + return finalStores; + } + /** * * @return The plan! diff --git a/src/java/voldemort/tools/RebalanceControllerCLI.java b/src/java/voldemort/tools/RebalanceControllerCLI.java new file mode 100644 index 0000000000..68dfc0265e --- /dev/null +++ b/src/java/voldemort/tools/RebalanceControllerCLI.java @@ -0,0 +1,214 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.tools; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; + +import org.apache.log4j.Logger; + +import voldemort.client.rebalance.RebalanceClientConfig; +import voldemort.client.rebalance.RebalanceController; +import voldemort.client.rebalance.RebalancePlan; +import voldemort.cluster.Cluster; +import voldemort.store.StoreDefinition; +import voldemort.utils.CmdUtils; +import voldemort.utils.RebalanceUtils; +import voldemort.utils.Utils; +import voldemort.xml.ClusterMapper; +import voldemort.xml.StoreDefinitionsMapper; + +import com.google.common.base.Joiner; + +public class RebalanceControllerCLI { + + private final static Logger logger = Logger.getLogger(RebalanceControllerCLI.class); + + private static OptionParser parser; + + private static void setupParser() { + parser = new OptionParser(); + parser.accepts("help", "Print usage information"); + parser.accepts("url", "Url to bootstrap from ").withRequiredArg().describedAs("url"); + parser.accepts("donor-based", "Execute donor-based rebalancing."); + parser.accepts("stealer-based", "Execute stealer-based rebalancing (default)."); + + // TODO: WTF + parser.accepts("tries", + "(1) Tries during rebalance [ Default: " + + RebalanceClientConfig.MAX_TRIES_REBALANCING + + " ] (2) Number of tries while generating new metadata") + .withRequiredArg() + .ofType(Integer.class) + .describedAs("num-tries"); + // TODO: WTF + parser.accepts("delete", + "Delete after rebalancing (Valid only for RW Stores) [ Default : false ] "); + // TODO: WTF + parser.accepts("timeout", + "Time-out in seconds for rebalancing of a single task ( stealer - donor tuple ) [ Default : " + + RebalanceClientConfig.REBALANCING_CLIENT_TIMEOUT_SEC + " ]") + .withRequiredArg() + .ofType(Long.class) + .describedAs("sec"); + // TODO: WTF + parser.accepts("parallelism", + "Number of rebalances to run in parallel [ Default:" + + RebalanceClientConfig.MAX_PARALLEL_REBALANCING + " ]") + .withRequiredArg() + .ofType(Integer.class) + .describedAs("parallelism"); + + parser.accepts("final-cluster", "Path to target cluster xml") + .withRequiredArg() + .describedAs("cluster.xml"); + parser.accepts("final-stores", + "Path to target store definition xml. Needed for zone expansion.") + .withRequiredArg() + .describedAs("stores.xml"); + + // TODO: These options are common with RebalancePlanCLI. How to share? + // TODO: Switch default for batch size to infinite. + parser.accepts("batch", + "Number of primary partitions to move together [ Default : " + + RebalanceClientConfig.PRIMARY_PARTITION_BATCH_SIZE + " ]") + .withRequiredArg() + .ofType(Integer.class) + .describedAs("num-primary-partitions"); + parser.accepts("output-dir", "Output directory in which to dump per-batch metadata") + .withRequiredArg() + .ofType(String.class) + .describedAs("path"); + } + + private static void printUsage() { + StringBuilder help = new StringBuilder(); + help.append("RebalanceControllerCLI\n"); + help.append(" Executes a rebalance plan.\n"); + help.append("Options:\n"); + help.append(" Required:\n"); + help.append(" --url \n"); + help.append(" --final-cluster \n"); + help.append(" Optional:\n"); + help.append(" --final-stores [ Needed for zone expansion ]\n"); + help.append(" --output-dir [ Output directory is where we store the optimized cluster ]\n"); + help.append(" --batch [ Number of primary partitions to move in each rebalancing batch. ]\n"); + help.append(" --output-dir [ Directory in which cluster metadata is dumped for each batch of the plan. ]\n"); + help.append(" --stealer-based or --donor-based [ Defaults to stealer-based. ]\n"); + // TODO: Add in WTF members: parallelism, tries, timeout, delete, other? + + try { + parser.printHelpOn(System.out); + } catch(IOException e) { + e.printStackTrace(); + } + System.out.print(help.toString()); + } + + private static void printUsageAndDie(String errMessage) { + printUsage(); + Utils.croak("\n" + errMessage); + } + + private static OptionSet getValidOptions(String[] args) { + OptionSet options = null; + try { + options = parser.parse(args); + } catch(OptionException oe) { + printUsageAndDie("Exception when parsing arguments : " + oe.getMessage()); + } + + if(options.has("help")) { + printUsage(); + System.exit(0); + } + + Set missing = CmdUtils.missing(options, "url", "target-cluster"); + if(missing.size() > 0) { + printUsageAndDie("Missing required arguments: " + Joiner.on(", ").join(missing)); + } + + return options; + } + + public static void main(String[] args) throws Exception { + setupParser(); + OptionSet options = getValidOptions(args); + + // Bootstrap & fetch current cluster/stores + String bootstrapURL = (String) options.valueOf("url"); + + // TODO: Process optional controller args + RebalanceController rebalanceController = new RebalanceController(bootstrapURL, + 1, + 2, + TimeUnit.DAYS.toSeconds(30), + true, + false); + + Cluster currentCluster = rebalanceController.getCurrentCluster(); + List currentStoreDefs = rebalanceController.getCurrentStoreDefs(); + // If this test doesn't pass, something is wrong in prod! + RebalanceUtils.validateClusterStores(currentCluster, currentStoreDefs); + + // Deterimine final cluster/stores and validate them + String finalClusterXML = (String) options.valueOf("final-cluster"); + Cluster finalCluster = new ClusterMapper().readCluster(new File(finalClusterXML)); + + List finalStoreDefs = currentStoreDefs; + if(options.has("final-stores")) { + String storesXML = (String) options.valueOf("final-stores"); + finalStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(storesXML)); + } + RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs); + RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster); + + // Process optional planning args + int batchSize = CmdUtils.valueOf(options, + "batch", + RebalanceClientConfig.PRIMARY_PARTITION_BATCH_SIZE); + + String outputDir = null; + if(options.has("output-dir")) { + outputDir = (String) options.valueOf("output-dir"); + } + + boolean stealerBased = true; + if(options.has("donor-based")) { + stealerBased = false; + } + + // Plan rebalancing + // TODO: Figure out when/how stealerBased flag should be used. + RebalancePlan rebalancePlan = new RebalancePlan(currentCluster, + currentStoreDefs, + finalCluster, + finalStoreDefs, + stealerBased, + batchSize, + outputDir); + // Execute rebalancing plan. + rebalanceController.rebalance(rebalancePlan); + } +} diff --git a/src/java/voldemort/tools/RebalancePlanCLI.java b/src/java/voldemort/tools/RebalancePlanCLI.java index 6d04d7cb8d..86cc161c8f 100644 --- a/src/java/voldemort/tools/RebalancePlanCLI.java +++ b/src/java/voldemort/tools/RebalancePlanCLI.java @@ -79,8 +79,8 @@ private static void setupParser() { private static void printUsage() { StringBuilder help = new StringBuilder(); - help.append("RepartitionCLI\n"); - help.append(" Moves partitions to achieve better balance. This can be done for rebalancing (improve balance among existing nodes)," + help.append("RebalancePlanCLI\n"); + help.append(" Moves partitions to achieve better balance. This can be done for shuffling (improve balance among existing nodes)," + " cluster expansion (adding nodes to some zones), and zone expansion (adding an entire new zone).\n"); help.append("Options:\n"); help.append(" Required:\n"); @@ -133,6 +133,7 @@ private static OptionSet getValidOptions(String[] args) { return options; } + // TODO: Rename target-cluster target-stores to final-* public static void main(String[] args) throws Exception { setupParser(); OptionSet options = getValidOptions(args); @@ -170,9 +171,6 @@ public static void main(String[] args) throws Exception { stealerBased = false; } - RebalanceClientConfig config = new RebalanceClientConfig(); - config.setPrimaryPartitionBatchSize(batchSize); - new RebalancePlan(currentCluster, currentStoreDefs, targetCluster, diff --git a/src/java/voldemort/utils/RebalanceUtils.java b/src/java/voldemort/utils/RebalanceUtils.java index e0ef97c363..4ea607f502 100644 --- a/src/java/voldemort/utils/RebalanceUtils.java +++ b/src/java/voldemort/utils/RebalanceUtils.java @@ -849,7 +849,8 @@ public static AdminClient createTempAdminClient(VoldemortConfig voldemortConfig, * definitions * @return List of store definitions */ - public static List getStoreDefinition(Cluster cluster, AdminClient adminClient) { + public static List getCurrentStoreDefinitions(Cluster cluster, + AdminClient adminClient) { List storeDefs = null; for(Node node: cluster.getNodes()) { List storeDefList = adminClient.metadataMgmtOps.getRemoteStoreDefList(node.getId()) diff --git a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java index 2395178db8..79a46ae8fa 100644 --- a/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractNonZonedRebalanceTest.java @@ -1050,6 +1050,7 @@ public void run() { @Override public void run() { try { + // TODO: Switch test to RebalancePlan-based testing rebalanceClient.rebalance(updatedTargetCluster); } catch(Exception e) { logger.error("Error in rebalancing... ", e); diff --git a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java index 073a7ea973..34c12a86fa 100644 --- a/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java @@ -181,6 +181,7 @@ protected void rebalanceAndCheck(Cluster currentCluster, List storeDefs, RebalanceController rebalanceClient, List nodeCheckList) { + // TODO: Switch test to RebalancePlan-based testing rebalanceClient.rebalance(targetCluster); checkEntriesPostRebalance(currentCluster, targetCluster, diff --git a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java index 36e7a035f5..50f7f6fd65 100644 --- a/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java +++ b/test/unit/voldemort/client/rebalance/AbstractZonedRebalanceTest.java @@ -739,6 +739,7 @@ public void run() { @Override public void run() { try { + // TODO: Switch test to RebalancePlan-based testing rebalanceClient.rebalance(updatedTargetCluster); } catch(Exception e) { logger.error("Error in rebalancing... ", e);