From 5e72e526b9ec1987ff73d160f947e88515702e06 Mon Sep 17 00:00:00 2001
From: Jay J Wylie
Date: Wed, 1 May 2013 09:26:56 -0700
Subject: [PATCH] RebalancePlanCLI working for 2-to-3 zone expansion

Stealers steal from the first donor with the desired partition-store. This
currently biases all work towards donors with the lowest node IDs. This is a
truly imbalanced plan!

RebalanceClusterPlan
- Decorated with more TODOs...
- Changed stealer to pick first donor with desired partition-store
- Added commented-out code section that outlines the next step for better
  planning

RebalancePartitionsInfo
- Decorated with more TODOs

RebalancePlan
- Much cleanup and refactoring. Construction now generates the plan.
- Methods added to get the plan (getPlan) and to print the plan (toString)

RebalanceUtils
- Helper method that derives targetCluster from initialCluster and
  finalCluster.
---
 .../rebalance/RebalanceClusterPlan.java      |  76 +++++++++-
 .../rebalance/RebalancePartitionsInfo.java   |   3 +-
 .../client/rebalance/RebalancePlan.java      | 143 +++++++++++-------
 .../voldemort/tools/RebalancePlanCLI.java    |  15 +-
 src/java/voldemort/utils/RebalanceUtils.java |  23 +++
 5 files changed, 190 insertions(+), 70 deletions(-)

diff --git a/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java b/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java
index e4106a7dd6..be6804e3f7 100644
--- a/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java
+++ b/src/java/voldemort/client/rebalance/RebalanceClusterPlan.java
@@ -25,7 +25,7 @@
 // TODO: (refactor) Rename RebalanceClusterPlan to RebalanceBatchPlan
 // TODO: (refactor) Rename targetCluster -> finalCluster
-// TODO: (refactor) Rename currentCluster -> targetCluster
+// TODO: (refactor) Rename currentCluster -> targetCluster?
 // TODO: (refactor) Fix cluster nomenclature in general: make sure there are
 // exactly three prefixes used to distinguish cluster xml: initial or current,
 // target or spec or expanded, and final. 'target' is overloaded to mean
@@ -248,6 +248,9 @@ private void prioritizeBatchPlan() {
         */
     }

+    // TODO: Revisit these "two principles". I am not sure about the second one,
+    // either because I don't like delete being mixed with this code or because
+    // we probably want to copy from a to-be-deleted partition-store.
     /**
      * Generate the list of partition movement based on 2 principles:
      *
@@ -304,9 +307,33 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
                 continue;
             }

+            /*-
+             * Outline of code to change this method...
+            StoreRoutingPlan currentStoreRoutingPlan = new StoreRoutingPlan(currentCluster,
+                                                                            storeDef);
+            StoreRoutingPlan targetStoreRoutingPlan = new StoreRoutingPlan(targetCluster, storeDef);
+            final Set<Pair<Integer, Integer>> trackStealPartitionsTuples = new HashSet<Pair<Integer, Integer>>();
+            final Set<Pair<Integer, Integer>> haveToStealTuples = Sets.newHashSet(stealerNodeIdToStolenPartitionTuples.get(stealerNodeId));
+            for(Pair<Integer, Integer> replicaTypePartitionId: haveToStealTuples) {
+
+                // TODO: Do not steal something you already have!
+                // TODO: Steal from zone local n-ary if possible
+                // TODO: Steal appropriate n-ary from zone that currently hosts
+                // true
+                // primary.
+            }
+            if(trackStealPartitionsTuples.size() > 0) {
+                addPartitionsToPlan(trackStealPartitionsTuples,
+                                    donorNodeToStoreToStealPartition,
+                                    donorNode.getId(),
+                                    storeDef.getName());
+
+            }
+            */
             // Now we find out which donor can donate partitions to this stealer
+
             final Set<Pair<Integer, Integer>> haveToStealTuples = Sets.newHashSet(stealerNodeIdToStolenPartitionTuples.get(stealerNodeId));

             for(Node donorNode: currentCluster.getNodes()) {

                 // The same node can't donate
@@ -321,6 +348,10 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
                 final Set<Pair<Integer, Integer>> trackStealPartitionsTuples = new HashSet<Pair<Integer, Integer>>();
                 final Set<Pair<Integer, Integer>> trackDeletePartitionsTuples = new HashSet<Pair<Integer, Integer>>();

+                // TODO: donatePartitionTuple ought to take all donor nodes
+                // as input and select "best" partition to donate. E.g., from
+                // within the zone!
+
                 // Checks if this donor node can donate any tuples
                 donatePartitionTuple(donorNode,
                                      haveToStealTuples,
@@ -350,8 +381,8 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
                                         donorNode.getId(),
                                         storeDef.getName());
                 }
-            }
+            }

         // Now combine the plans generated individually into the actual
@@ -376,7 +407,7 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
     private void addPartitionsToPlan(Set<Pair<Integer, Integer>> trackPartitionsTuples,
                                      HashMap<Integer, HashMap<String, HashMap<Integer, List<Integer>>>> donorNodeToStoreToPartitionTuples,
                                      int donorNodeId,
-                                     String name) {
+                                     String storeName) {
         HashMap<String, HashMap<Integer, List<Integer>>> storeToStealPartitionTuples = null;
         if(donorNodeToStoreToPartitionTuples.containsKey(donorNodeId)) {
             storeToStealPartitionTuples = donorNodeToStoreToPartitionTuples.get(donorNodeId);
@@ -384,10 +415,15 @@ private void addPartitionsToPlan(Set<Pair<Integer, Integer>> trackPartitionsTupl
             storeToStealPartitionTuples = Maps.newHashMap();
             donorNodeToStoreToPartitionTuples.put(donorNodeId, storeToStealPartitionTuples);
         }
-        storeToStealPartitionTuples.put(name,
+        storeToStealPartitionTuples.put(storeName,
                                         RebalanceUtils.flattenPartitionTuples(trackPartitionsTuples));
     }

+    // TODO: (refactor) trackStealPartitionsTuples is updated in this method.
+    // I.e., the 'void' return type is misleading. AND, only specific
+    // partitionId:replicaType's are actually stolen. AND, 'haveToStealTuples'
+    // is also modified. Clean up this method!
+    // TODO: (refactor): At least remove commented-out historic code...
     /**
      * Given a donor node and a set of tuples that need to be stolen, checks if
      * the donor can contribute any
@@ -398,6 +434,7 @@ private void addPartitionsToPlan(Set<Pair<Integer, Integer>> trackPartitionsTupl
      * @param trackStealPartitionsTuples Set of partitions tuples already stolen
      * @param donorPartitionTuples All partition tuples on donor node
      */
+    /*-
     private void donatePartitionTuple(final Node donorNode,
                                       Set<Pair<Integer, Integer>> haveToStealTuples,
                                       Set<Pair<Integer, Integer>> trackStealPartitionsTuples,
                                       Set<Pair<Integer, Integer>> donorPartitionTuples) {
         final Iterator<Pair<Integer, Integer>> iter = haveToStealTuples.iterator();
@@ -416,6 +453,37 @@ private void donatePartitionTuple(final Node donorNode,
             }
         }
     }
+    */
+    private void donatePartitionTuple(final Node donorNode,
+                                      Set<Pair<Integer, Integer>> haveToStealTuples,
+                                      Set<Pair<Integer, Integer>> trackStealPartitionsTuples,
+                                      Set<Pair<Integer, Integer>> donorPartitionTuples) {
+        final Iterator<Pair<Integer, Integer>> iter = haveToStealTuples.iterator();
+
+        // Iterate over the partition tuples to steal and check if this node can
+        // donate it
+        while(iter.hasNext()) {
+            Pair<Integer, Integer> partitionTupleToSteal = iter.next();
+
+            // TODO: HACK to steal from ANY node that has the desired partition.
+            // Totally ignoring the replicaType.
+            for(Pair<Integer, Integer> rt: donorPartitionTuples) {
+                if(rt.getSecond() == partitionTupleToSteal.getSecond()) {
+                    // TODO: passing in 'rt' instead of 'partitionTupleToSteal'
+                    // is a one-line change that circumvents server-side checks
+                    // during execution that the "correct" replicaType is being
+                    // stolen from a donor. This change is fragile and should be
+                    // hardened by removing such replicaType checks from the
+                    // code path.
+                    trackStealPartitionsTuples.add(rt);
+                    // trackStealPartitionsTuples.add(partitionTupleToSteal);
+
+                    // This partition has been donated, remove it
+                    iter.remove();
+                }
+            }
+        }
+    }

     /**
      * We do not delete if this donor node is also the stealer node for another
diff --git a/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java b/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java
index 647f624648..3a345bfd53 100644
--- a/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java
+++ b/src/java/voldemort/client/rebalance/RebalancePartitionsInfo.java
@@ -53,7 +53,7 @@ public class RebalancePartitionsInfo {
     // Except for MetadataStoreTest which passes in a random value, the value
     // passed to the constructor for this value is always 0.
     private int attempt;
-    // TODO: (refactor) Unclear what value the Hashmap provides. It maps
+    // TODO: (refactor) Unclear what value the inner Hashmap provides. It maps
     // "replica type" to lists of partition IDs. A list of partition IDs (per
     // store) seems sufficient for all purposes. The replica type is a
     // distraction.
@@ -263,6 +263,7 @@ public synchronized int getPartitionStoreMoves() {
             }
         }

+        // TODO: Confirm not counting deletes is the correct action.
         /*-
         * Do not count deletes.
        for(HashMap<Integer, List<Integer>> storeDeletes: storeToReplicaToDeletePartitionList.values()) {
diff --git a/src/java/voldemort/client/rebalance/RebalancePlan.java b/src/java/voldemort/client/rebalance/RebalancePlan.java
index 2a1703a9ce..1fbf1d77d3 100644
--- a/src/java/voldemort/client/rebalance/RebalancePlan.java
+++ b/src/java/voldemort/client/rebalance/RebalancePlan.java
@@ -33,6 +33,7 @@
 import voldemort.utils.MoveMap;
 import voldemort.utils.PartitionBalance;
 import voldemort.utils.RebalanceUtils;
+import voldemort.utils.Utils;
 import voldemort.xml.ClusterMapper;

 import com.google.common.collect.Lists;
@@ -47,17 +48,24 @@ public class RebalancePlan {

     private final Cluster currentCluster;
     private final List<StoreDefinition> currentStores;
-    // TODO: (refactor) Better name than targetCluster? expandedCluster?
-    // specCluster?
-    private final Cluster targetCluster;
     private final Cluster finalCluster;
     private final List<StoreDefinition> finalStores;
     private final boolean stealerBased;
     private final int batchSize;
     private final String outputDir;

+    // TODO: (refactor) Better name than targetCluster? expandedCluster?
+    // specCluster?
+    private final Cluster targetCluster;

     private List<RebalanceClusterPlan> batchPlans;

+    // Aggregate stats
+    private int numPrimaryPartitionMoves;
+    private int numPartitionStoreMoves;
+    private int numXZonePartitionStoreMoves;
+    private final MoveMap nodeMoveMap;
+    private final MoveMap zoneMoveMap;
+
     public RebalancePlan(final Cluster currentCluster,
                          final List<StoreDefinition> currentStores,
                          final Cluster finalCluster,
@@ -73,13 +81,32 @@ public RebalancePlan(final Cluster currentCluster,
         this.batchSize = batchSize;
         this.outputDir = outputDir;

+        // Derive the targetCluster from current & final cluster xml
         RebalanceUtils.validateFinalCluster(this.currentCluster, this.finalCluster);
-        this.targetCluster = RebalanceUtils.getClusterWithNewNodes(this.currentCluster,
-                                                                   this.finalCluster);
+        this.targetCluster = RebalanceUtils.getTargetCluster(this.currentCluster, this.finalCluster);

+        // Verify each cluster/storedefs pair
         RebalanceUtils.validateClusterStores(this.currentCluster, this.currentStores);
         RebalanceUtils.validateClusterStores(this.finalCluster, this.finalStores);
         RebalanceUtils.validateClusterStores(this.targetCluster, this.finalStores);
+
+        // Log key arguments
+        logger.info("Current cluster : " + currentCluster);
+        logger.info("Target cluster : " + targetCluster);
+        logger.info("Final cluster : " + finalCluster);
+        logger.info("Batch size : " + batchSize);
+
+        // Initialize the plan
+        batchPlans = new ArrayList<RebalanceClusterPlan>();
+
+        // Initialize aggregate statistics
+        numPrimaryPartitionMoves = 0;
+        numPartitionStoreMoves = 0;
+        numXZonePartitionStoreMoves = 0;
+        nodeMoveMap = new MoveMap(targetCluster.getNodeIds());
+        zoneMoveMap = new MoveMap(targetCluster.getZoneIds());
+
+        plan();
     }

     public RebalancePlan(final Cluster currentCluster,
@@ -98,49 +125,18 @@ public RebalancePlan(final Cluster currentCluster,
     }

     /**
-     * Create a plan
-     */
-    public List<RebalanceClusterPlan> plan() {
-        logger.info("Current cluster : " + currentCluster);
-        logger.info("Target cluster : " + targetCluster);
-        logger.info("Final cluster : " + finalCluster);
-        logger.info("Batch size : " + batchSize);
-
-        batchPlans = new ArrayList<RebalanceClusterPlan>();
-        rebalancePerClusterTransition();
-        return batchPlans;
-    }
-
-    // TODO: (refactor) rename this method to something sane and/or flatten into
-    // plan() method. And/or break into helper mehtods.
-    /**
-    * Rebalance on a step-by-step transitions from cluster.xml to
-    * target-cluster.xml
+     * Create a plan. The plan consists of batches. Each batch involves the
+     * movement of no more than batchSize primary partitions. The movement of a
+     * single primary partition may require migration of other n-ary replicas,
+     * and potentially deletions. Migrating a primary or n-ary partition
+     * requires migrating one partition-store for every store hosted at that
+     * partition.
      *
-    *
-    *
-    * Each transition represents the migration of one primary partition (
-    * {@link #rebalancePerPartitionTransition(int, OrderedClusterTransition)} )
-    * along with all its side effect ( i.e. migration of replicas + deletions
-    * ).
-    *
-    *
-    * @param currentCluster The normalized cluster. This cluster contains new
-    * nodes with empty partitions as well
-    * @param finalCluster The desired cluster after rebalance
-    * @param storeDefs Stores to rebalance
      */
-    private void rebalancePerClusterTransition() {
+    private void plan() {
         // Mapping of stealer node to list of primary partitions being moved
         final TreeMultimap<Integer, Integer> stealerToStolenPrimaryPartitions = TreeMultimap.create();

-        // Various counts for progress bar
-        int numPrimaryPartitionMoves = 0;
-        int numPartitionStoreMoves = 0;
-        int numXZonePartitionStoreMoves = 0;
-        MoveMap nodeMoveMap = new MoveMap(targetCluster.getNodeIds());
-        MoveMap zoneMoveMap = new MoveMap(targetCluster.getZoneIds());
-
         // Used for creating clones
         ClusterMapper mapper = new ClusterMapper();
@@ -212,17 +208,7 @@ private void rebalancePerClusterTransition() {
             batchTargetCluster = mapper.readCluster(new StringReader(mapper.writeCluster(batchFinalCluster)));
         }

-        logger.info("Total number of primary partition moves : " + numPrimaryPartitionMoves);
-        logger.info("Total number of partition-store moves : " + numPartitionStoreMoves);
-        logger.info("Total number of cross-zone partition-store moves :"
-                    + numXZonePartitionStoreMoves);
-        logger.info("Zone move map (partition-stores):\n"
-                    + "(zone id) -> (zone id) = # of partition-stores moving from the former zone to the latter\n"
-                    + zoneMoveMap);
-        logger.info("Node flow map (partition-stores):\n"
-                    + "# partitions-stores stealing into -> (node id) -> # of partition-stores donating out of\n"
-                    + nodeMoveMap.toFlowString());
-        logger.info(storageOverhead(nodeMoveMap.groupByTo()));
+        logger.info(this);
     }

     /**
@@ -236,7 +222,7 @@ private String storageOverhead(Map<Integer, Integer> finalNodeToOverhead) {
         double maxOverhead = Double.MIN_VALUE;
         PartitionBalance pb = new PartitionBalance(currentCluster, currentStores);
         StringBuilder sb = new StringBuilder();
-        sb.append("Per-node store-overhead:\n");
+        sb.append("Per-node store-overhead:").append(Utils.NEWLINE);
         DecimalFormat doubleDf = new DecimalFormat("####.##");
         for(int nodeId: finalCluster.getNodeIds()) {
             Node node = finalCluster.getNodeById(nodeId);
@@ -258,9 +244,52 @@ private String storageOverhead(Map<Integer, Integer> finalNodeToOverhead) {
                              + String.format("%6d", toLoad) + " -> "
                              + String.format("%6d", initialLoad + toLoad) + " ("
                              + doubleDf.format(overhead) + " X)";
-            sb.append(nodeTag + " : " + loadTag + "\n");
+            sb.append(nodeTag + " : " + loadTag).append(Utils.NEWLINE);
         }
-        sb.append("\n\tMax per-node storage overhead: " + doubleDf.format(maxOverhead) + " X.\n");
+        sb.append(Utils.NEWLINE)
+          .append("**** Max per-node storage overhead: " + doubleDf.format(maxOverhead) + " X.")
+          .append(Utils.NEWLINE);
         return (sb.toString());
     }
+
+    /**
+     *
+     * @return The plan!
+     */
+    public List<RebalanceClusterPlan> getPlan() {
+        return batchPlans;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        // Dump entire plan batch-by-batch, partition info-by-partition info...
+        for(RebalanceClusterPlan batchPlan: batchPlans) {
+            sb.append(batchPlan).append(Utils.NEWLINE);
+        }
+        // Dump aggregate stats of the plan
+        sb.append("Total number of primary partition moves : " + numPrimaryPartitionMoves)
+          .append(Utils.NEWLINE)
+          .append("Total number of partition-store moves : " + numPartitionStoreMoves)
+          .append(Utils.NEWLINE)
+          .append("Total number of cross-zone partition-store moves :"
+                  + numXZonePartitionStoreMoves)
+          .append(Utils.NEWLINE)
+          .append("Zone move map (partition-stores):")
+          .append(Utils.NEWLINE)
+          .append("(zone id) -> (zone id) = # of partition-stores moving from the former zone to the latter")
+          .append(Utils.NEWLINE)
+          .append(zoneMoveMap)
+          .append(Utils.NEWLINE)
+          .append("Node flow map (partition-stores):")
+          .append(Utils.NEWLINE)
+          .append("# partitions-stores stealing into -> (node id) -> # of partition-stores donating out of")
+          .append(Utils.NEWLINE)
+          .append(nodeMoveMap.toFlowString())
+          .append(Utils.NEWLINE)
+          .append(storageOverhead(nodeMoveMap.groupByTo()))
+          .append(Utils.NEWLINE);
+
+        return sb.toString();
+    }
 }
diff --git a/src/java/voldemort/tools/RebalancePlanCLI.java b/src/java/voldemort/tools/RebalancePlanCLI.java
index e437c03971..6d04d7cb8d 100644
--- a/src/java/voldemort/tools/RebalancePlanCLI.java
+++ b/src/java/voldemort/tools/RebalancePlanCLI.java
@@ -173,14 +173,13 @@ public static void main(String[] args) throws Exception {
         RebalanceClientConfig config = new RebalanceClientConfig();
         config.setPrimaryPartitionBatchSize(batchSize);

-        RebalancePlan rebalancePlanner = new RebalancePlan(currentCluster,
-                                                           currentStoreDefs,
-                                                           targetCluster,
-                                                           targetStoreDefs,
-                                                           stealerBased,
-                                                           batchSize,
-                                                           outputDir);
-        rebalancePlanner.plan();
+        new RebalancePlan(currentCluster,
+                          currentStoreDefs,
+                          targetCluster,
+                          targetStoreDefs,
+                          stealerBased,
+                          batchSize,
+                          outputDir);
     }
 }
diff --git a/src/java/voldemort/utils/RebalanceUtils.java b/src/java/voldemort/utils/RebalanceUtils.java
index fde5e5d041..95eba3ad81 100644
--- a/src/java/voldemort/utils/RebalanceUtils.java
+++ b/src/java/voldemort/utils/RebalanceUtils.java
@@ -361,6 +361,29 @@ public static void validateTargetCluster(final Cluster currentCluster,
         return;
     }

+    /**
+     * Given the current cluster and final cluster, generates a target cluster
+     * with empty new nodes (and zones).
+     *
+     * @param currentCluster Current cluster metadata
+     * @param finalCluster Final cluster metadata
+     * @return Returns a new target cluster that contains the nodes and zones of
+     *         the final cluster, but with empty partition lists for nodes that
+     *         were not present in the current cluster.
+     */
+    public static Cluster getTargetCluster(Cluster currentCluster, Cluster finalCluster) {
+        List<Node> newNodeList = new ArrayList<Node>(currentCluster.getNodes());
+        for(Node node: finalCluster.getNodes()) {
+            if(!ClusterUtils.containsNode(currentCluster, node.getId())) {
+                newNodeList.add(NodeUtils.updateNode(node, new ArrayList<Integer>()));
+            }
+        }
+        Collections.sort(newNodeList);
+        return new Cluster(currentCluster.getName(),
+                           newNodeList,
+                           Lists.newArrayList(finalCluster.getZones()));
+    }
+
     /**
      * Given the current cluster and a target cluster, generates a cluster with
      * new nodes ( which in turn contain empty partition lists )