diff --git a/src/java/voldemort/client/rebalance/RebalancePlan.java b/src/java/voldemort/client/rebalance/RebalancePlan.java
index 1e34bd317c..580ed8795f 100644
--- a/src/java/voldemort/client/rebalance/RebalancePlan.java
+++ b/src/java/voldemort/client/rebalance/RebalancePlan.java
@@ -33,6 +33,7 @@
 import voldemort.utils.MoveMap;
 import voldemort.utils.PartitionBalance;
 import voldemort.utils.RebalanceUtils;
+import voldemort.utils.RepartitionUtils;
 import voldemort.utils.Utils;
 import voldemort.xml.ClusterMapper;
 
@@ -97,6 +98,11 @@ public RebalancePlan(final Cluster currentCluster,
         logger.info("Final cluster : " + finalCluster);
         logger.info("Batch size : " + batchSize);
 
+        logger.info(RepartitionUtils.dumpInvalidMetadataRate(currentCluster,
+                                                             currentStores,
+                                                             finalCluster,
+                                                             finalStores));
+
         // Initialize the plan
         batchPlans = new ArrayList<RebalanceBatchPlan>();
 
diff --git a/src/java/voldemort/cluster/Cluster.java b/src/java/voldemort/cluster/Cluster.java
index e9e3ec2023..4fcc84d4db 100644
--- a/src/java/voldemort/cluster/Cluster.java
+++ b/src/java/voldemort/cluster/Cluster.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +52,8 @@ public class Cluster implements Serializable {
     private final Map<Integer, Zone> zonesById;
     private final Map<Zone, List<Integer>> nodesPerZone;
     private final Map<Zone, List<Integer>> partitionsPerZone;
+    private final Map<Integer, Zone> partitionIdToZone;
+    private final Map<Integer, Node> partitionIdToNode;
 
     public Cluster(String name, List<Node> nodes) {
         this(name, nodes, new ArrayList<Zone>());
     }
 
@@ -60,6 +63,8 @@ public Cluster(String name, List<Node> nodes, List<Zone> zones) {
         this.name = Utils.notNull(name);
         this.partitionsPerZone = new LinkedHashMap<Zone, List<Integer>>();
         this.nodesPerZone = new LinkedHashMap<Zone, List<Integer>>();
+        this.partitionIdToZone = new HashMap<Integer, Zone>();
+        this.partitionIdToNode = new HashMap<Integer, Node>();
         if(zones.size() != 0) {
             zonesById = new LinkedHashMap<Integer, Zone>(zones.size());
@@ -93,6 +98,10 @@ public Cluster(String name, List<Node> nodes, List<Zone> zones) {
             }
             nodesPerZone.get(nodesZone).add(node.getId());
             partitionsPerZone.get(nodesZone).addAll(node.getPartitionIds());
+            for(Integer partitionId: node.getPartitionIds()) {
+                this.partitionIdToZone.put(partitionId, nodesZone);
+                this.partitionIdToNode.put(partitionId, node);
+            }
         }
         this.numberOfTags = getNumberOfTags(nodes);
     }
@@ -192,6 +201,14 @@ public Set<Integer> getPartitionIdsInZone(Integer zoneId) {
         return new TreeSet<Integer>(partitionsPerZone.get(getZoneById(zoneId)));
     }
 
+    public Zone getZoneForPartitionId(int partitionId) {
+        return partitionIdToZone.get(partitionId);
+    }
+
+    public Node getNodeForPartitionId(int partitionId) {
+        return partitionIdToNode.get(partitionId);
+    }
+
     public Node getNodeById(int id) {
         Node node = nodesById.get(id);
         if(node == null)
diff --git a/src/java/voldemort/routing/StoreRoutingPlan.java b/src/java/voldemort/routing/StoreRoutingPlan.java
index 332bae5c80..efb9b7aa93 100644
--- a/src/java/voldemort/routing/StoreRoutingPlan.java
+++ b/src/java/voldemort/routing/StoreRoutingPlan.java
@@ -34,8 +34,6 @@
 
 import com.google.common.collect.Lists;
 
-// TODO: Add StoreInstanceTest unit test for these helper methods.
-
 /**
 * This class wraps up a Cluster object and a StoreDefinition. The methods are
 * effectively helper or util style methods for querying the routing plan that
 * the combination of cluster and store defines.
 */
@@ -48,6 +46,7 @@ public class StoreRoutingPlan {
 
     private final StoreDefinition storeDefinition;
     private final Map<Integer, Integer> partitionIdToNodeIdMap;
     private final Map<Integer, List<Integer>> nodeIdToNaryPartitionMap;
+    private final Map<Integer, List<Integer>> nodeIdToZonePrimaryMap;
     private final RoutingStrategy routingStrategy;
 
     public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
@@ -57,14 +56,20 @@ public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
         this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition,
                                                                                   cluster);
         this.nodeIdToNaryPartitionMap = new HashMap<Integer, List<Integer>>();
+        this.nodeIdToZonePrimaryMap = new HashMap<Integer, List<Integer>>();
         for(int nodeId: cluster.getNodeIds()) {
             this.nodeIdToNaryPartitionMap.put(nodeId, new ArrayList<Integer>());
+            this.nodeIdToZonePrimaryMap.put(nodeId, new ArrayList<Integer>());
         }
         for(int masterPartitionId = 0; masterPartitionId < cluster.getNumberOfPartitions(); ++masterPartitionId) {
             List<Integer> naryPartitionIds = getReplicatingPartitionList(masterPartitionId);
             for(int naryPartitionId: naryPartitionIds) {
                 int naryNodeId = getNodeIdForPartitionId(naryPartitionId);
                 nodeIdToNaryPartitionMap.get(naryNodeId).add(masterPartitionId);
+                int naryZoneId = cluster.getNodeById(naryNodeId).getZoneId();
+                if(getZoneReplicaType(naryZoneId, naryNodeId, naryPartitionId) == 0) {
+                    nodeIdToZonePrimaryMap.get(naryNodeId).add(masterPartitionId);
+                }
             }
         }
     }
@@ -87,8 +92,6 @@ public List<Integer> getReplicatingPartitionList(int masterPartitionId) {
         return this.routingStrategy.getReplicatingPartitionList(masterPartitionId);
     }
 
-    // TODO: Add test for this method (if this method is still required after
-    // the RebalanceController is updated to use RebalancePlan).
     /**
     * 
     * @param nodeId
     * @return all nary partition IDs hosted on the node.
     */
    public List<Integer> getNaryPartitionIds(int nodeId) {
        return nodeIdToNaryPartitionMap.get(nodeId);
    }
 
@@ -98,6 +101,15 @@ public List<Integer> getNaryPartitionIds(int nodeId) {
+    /**
+     * 
+     * @param nodeId
+     * @return all zone-primary partition IDs hosted on the node.
+     */
+    public List<Integer> getZonePrimaryPartitionIds(int nodeId) {
+        return nodeIdToZonePrimaryMap.get(nodeId);
+    }
+
     /**
     * Determines list of partition IDs that replicate the key.
     *
@@ -387,7 +399,6 @@ public int getZoneReplicaNodeId(int zoneId, int zoneReplicaType, int partitionId
     // TODO: (refactor) Move from static methods to non-static methods that use
     // this object's cluster and storeDefinition member for the various
     // check*BelongsTo* methods.
-
     /**
     * Check that the key belongs to one of the partitions in the map of replica
     * type to partitions
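Taken together, Cluster can now answer "which node or zone owns partition p?" in O(1) from the maps built in its constructor, and StoreRoutingPlan precomputes, per node, the partitions whose first in-zone replica lands on that node (zone replica type 0). A minimal usage sketch; the file names, IDs, and wrapper class are hypothetical, and only accessors visible in this patch are assumed:

    import java.io.File;
    import java.util.List;

    import voldemort.cluster.Cluster;
    import voldemort.cluster.Node;
    import voldemort.cluster.Zone;
    import voldemort.routing.StoreRoutingPlan;
    import voldemort.store.StoreDefinition;
    import voldemort.xml.ClusterMapper;
    import voldemort.xml.StoreDefinitionsMapper;

    public class LookupSketch {
        public static void main(String[] args) throws Exception {
            Cluster cluster = new ClusterMapper().readCluster(new File("cluster.xml"));
            List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File("stores.xml"));

            // O(1) lookups from the new partitionIdToNode / partitionIdToZone
            // maps, replacing ClusterUtils.getNodeByPartitionId's linear scan.
            Node owner = cluster.getNodeForPartitionId(7);
            Zone zone = cluster.getZoneForPartitionId(7);

            // Partitions for which node 0 is "zone primary", i.e., the first
            // replica within node 0's zone (zone replica type 0 above).
            StoreRoutingPlan plan = new StoreRoutingPlan(cluster, storeDefs.get(0));
            List<Integer> zonePrimaries = plan.getZonePrimaryPartitionIds(0);

            System.out.println(owner.getId() + " / " + zone.getId() + " / " + zonePrimaries);
        }
    }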
diff --git a/src/java/voldemort/tools/RepartitionCLI.java b/src/java/voldemort/tools/RepartitionCLI.java
index 85340240d0..ef8202b981 100644
--- a/src/java/voldemort/tools/RepartitionCLI.java
+++ b/src/java/voldemort/tools/RepartitionCLI.java
@@ -30,6 +30,7 @@
 import voldemort.cluster.Cluster;
 import voldemort.store.StoreDefinition;
 import voldemort.utils.CmdUtils;
+import voldemort.utils.RebalanceUtils;
 import voldemort.utils.RepartitionUtils;
 import voldemort.utils.Utils;
 import voldemort.xml.ClusterMapper;
@@ -41,13 +42,7 @@ public class RepartitionCLI {
 
     private final static Logger logger = Logger.getLogger(RepartitionCLI.class);
 
-    private final static int DEFAULT_REPARTITION_ATTEMPTS = 5;
-    private final static int DEFAULT_RANDOM_SWAP_ATTEMPTS = 100;
-    private final static int DEFAULT_RANDOM_SWAP_SUCCESSES = 100;
-    private final static int DEFAULT_GREEDY_SWAP_ATTEMPTS = 5;
-    private final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE = 5;
-    private final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE = 25;
-    private final static int DEFAULT_MAX_CONTIGUOUS_PARTITIONS = 0;
+    // TODO: (review) explain these default values.
 
     private static OptionParser parser;
 
@@ -70,7 +65,7 @@ private static void setupParser() {
              .describedAs("stores.xml");
         parser.accepts("attempts",
                        "Number of attempts at repartitioning. [ Default: "
-                               + DEFAULT_REPARTITION_ATTEMPTS + " ]")
+                               + RepartitionUtils.DEFAULT_REPARTITION_ATTEMPTS + " ]")
              .withRequiredArg()
              .ofType(Integer.class)
              .describedAs("num-attempts");
@@ -87,13 +82,13 @@ private static void setupParser() {
                        "Enable attempts to improve balance by random partition swaps within a zone. [Default: disabled]");
         parser.accepts("random-swap-attempts",
                        "Number of random swaps to attempt. [Default:"
-                               + DEFAULT_RANDOM_SWAP_ATTEMPTS + " ]")
+                               + RepartitionUtils.DEFAULT_RANDOM_SWAP_ATTEMPTS + " ]")
              .withRequiredArg()
              .ofType(Integer.class)
              .describedAs("num-attempts");
         parser.accepts("random-swap-successes",
                        "Number of successful random swaps to permit exit before completing all swap attempts. [Default:"
-                               + DEFAULT_RANDOM_SWAP_SUCCESSES + " ]")
+                               + RepartitionUtils.DEFAULT_RANDOM_SWAP_SUCCESSES + " ]")
              .withRequiredArg()
              .ofType(Integer.class)
              .describedAs("num-successes");
@@ -101,25 +96,26 @@ private static void setupParser() {
                        "Enable attempts to improve balance by greedily swapping (random) partitions within a zone. [Default: disabled]");
         parser.accepts("greedy-swap-attempts",
                        "Number of greedy (random) swaps to attempt. [Default:"
-                               + DEFAULT_GREEDY_SWAP_ATTEMPTS + " ]")
+                               + RepartitionUtils.DEFAULT_GREEDY_SWAP_ATTEMPTS + " ]")
              .withRequiredArg()
              .ofType(Integer.class)
              .describedAs("num-attempts");
         parser.accepts("greedy-max-partitions-per-node",
                        "Max number of partitions per-node to evaluate swapping with other partitions within the zone. [Default:"
-                               + DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE + " ]")
+                               + RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE + " ]")
              .withRequiredArg()
              .ofType(Integer.class)
              .describedAs("max-partitions-per-node");
         parser.accepts("greedy-max-partitions-per-zone",
                        "Max number of (random) partitions per-zone to evaluate swapping with partitions from node being evaluated. [Default:"
-                               + DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE + " ]")
+                               + RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE + " ]")
              .withRequiredArg()
              .ofType(Integer.class)
              .describedAs("max-partitions-per-zone");
         parser.accepts("max-contiguous-partitions",
                        "Limit the number of contiguous partition IDs allowed within a zone. [Default:"
-                               + DEFAULT_MAX_CONTIGUOUS_PARTITIONS + " (indicating no limit)]")
+                               + RepartitionUtils.DEFAULT_MAX_CONTIGUOUS_PARTITIONS
+                               + " (indicating no limit)]")
              .withRequiredArg()
              .ofType(Integer.class)
              .describedAs("num-contiguous");
@@ -205,14 +201,18 @@ public static void main(String[] args) throws Exception {
         Cluster currentCluster = new ClusterMapper().readCluster(new File(currentClusterXML));
         List<StoreDefinition> currentStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(currentStoresXML));
+        RebalanceUtils.validateClusterStores(currentCluster, currentStoreDefs);
+
         Cluster targetCluster = new ClusterMapper().readCluster(new File(targetClusterXML));
         List<StoreDefinition> targetStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(targetStoresXML));
+        RebalanceUtils.validateClusterStores(targetCluster, targetStoreDefs);
 
-        // TODO: more verification of arguments:
-        // RebalanceUtils.validateTargetCluster(currentCluster,targetCluster);
+        RebalanceUtils.validateCurrentTargetCluster(currentCluster, targetCluster);
 
         // Optional administrivia args
-        int attempts = CmdUtils.valueOf(options, "attempts", DEFAULT_REPARTITION_ATTEMPTS);
+        int attempts = CmdUtils.valueOf(options,
+                                        "attempts",
+                                        RepartitionUtils.DEFAULT_REPARTITION_ATTEMPTS);
         String outputDir = null;
         if(options.has("output-dir")) {
             outputDir = (String) options.valueOf("output-dir");
@@ -224,23 +224,23 @@ public static void main(String[] args) throws Exception {
         boolean enableRandomSwaps = options.has("enable-random-swaps");
         int randomSwapAttempts = CmdUtils.valueOf(options,
                                                   "random-swap-attempts",
-                                                  DEFAULT_RANDOM_SWAP_ATTEMPTS);
+                                                  RepartitionUtils.DEFAULT_RANDOM_SWAP_ATTEMPTS);
         int randomSwapSuccesses = CmdUtils.valueOf(options,
                                                    "random-swap-successes",
-                                                   DEFAULT_RANDOM_SWAP_SUCCESSES);
+                                                   RepartitionUtils.DEFAULT_RANDOM_SWAP_SUCCESSES);
         boolean enableGreedySwaps = options.has("enable-greedy-swaps");
         int greedySwapAttempts = CmdUtils.valueOf(options,
                                                   "greedy-swap-attempts",
-                                                  DEFAULT_GREEDY_SWAP_ATTEMPTS);
+                                                  RepartitionUtils.DEFAULT_GREEDY_SWAP_ATTEMPTS);
         int greedyMaxPartitionsPerNode = CmdUtils.valueOf(options,
                                                           "greedy-max-partitions-per-node",
-                                                          DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE);
+                                                          RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE);
         int greedyMaxPartitionsPerZone = CmdUtils.valueOf(options,
                                                           "greedy-max-partitions-per-zone",
-                                                          DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE);
+                                                          RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE);
         int maxContiguousPartitionsPerZone = CmdUtils.valueOf(options,
                                                               "max-contiguous-partitions",
-                                                              DEFAULT_MAX_CONTIGUOUS_PARTITIONS);
+                                                              RepartitionUtils.DEFAULT_MAX_CONTIGUOUS_PARTITIONS);
 
         // Sanity check optional repartitioning args
         if(disableNodeBalancing && !enableRandomSwaps && !enableGreedySwaps
@@ -255,10 +255,9 @@ public static void main(String[] args) throws Exception {
            && !enableGreedySwaps) {
             printUsageAndDie("Provided arguments for generate greedy swaps but did not enable the feature");
         }
-        // TODO: If necessary, add options to choose each zone in cluster, all
-        // nodes across cluster, or a list of zone IDs. For now though, just
-        // offer the behavior of greedily swapping among all nodes in cluster.
-        List<Integer> greedyZoneIds = null;
+        // In the future, can add options to choose each zone in cluster, all
+        // nodes across cluster, or a list of zone IDs.
+        List<Integer> greedyZoneIds = RepartitionUtils.DEFAULT_GREEDY_ZONE_IDS;
 
         RepartitionUtils.repartition(currentCluster,
                                      currentStoreDefs,
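With the defaults now public on RepartitionUtils, a programmatic caller can mirror the CLI without copying magic numbers. A hedged fragment (variable names invented; constants are the ones this patch introduces):

    int attempts = RepartitionUtils.DEFAULT_REPARTITION_ATTEMPTS;             // 5
    int randomSwapAttempts = RepartitionUtils.DEFAULT_RANDOM_SWAP_ATTEMPTS;   // 100
    int randomSwapSuccesses = RepartitionUtils.DEFAULT_RANDOM_SWAP_SUCCESSES; // 100
    // Documented below to mean "greedily swap across all zones" when null.
    List<Integer> greedyZoneIds = RepartitionUtils.DEFAULT_GREEDY_ZONE_IDS;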
diff --git a/src/java/voldemort/utils/ClusterUtils.java b/src/java/voldemort/utils/ClusterUtils.java
index e0bad5b3bc..dc38f242c7 100644
--- a/src/java/voldemort/utils/ClusterUtils.java
+++ b/src/java/voldemort/utils/ClusterUtils.java
@@ -90,7 +90,7 @@ public static boolean containsPreferenceList(Cluster cluster, int nodeId) {
 
         for(int partition: preferenceList) {
-            if(getNodeByPartitionId(cluster, partition).getId() == nodeId) {
+            if(cluster.getNodeForPartitionId(partition).getId() == nodeId) {
                 return true;
             }
         }
@@ -124,22 +124,6 @@ public static Map<Integer, Node> getCurrentPartitionMapping(Cluster currentCl
 
         return partitionToNode;
     }
 
-    /**
-     * Returns the Node associated to the provided partition.
-     *
-     * @param cluster The cluster in which to find the node
-     * @param partitionId Partition id for which we want the corresponding node
-     * @return Node that owns the partition
-     */
-    public static Node getNodeByPartitionId(Cluster cluster, int partitionId) {
-        for(Node node: cluster.getNodes()) {
-            if(node.getPartitionIds().contains(partitionId)) {
-                return node;
-            }
-        }
-        return null;
-    }
-
     /**
     * Compress contiguous partitions into format "e-i" instead of
     * "e, f, g, h, i". This helps illustrate contiguous partitions within a
@@ -150,56 +134,42 @@ public static Node getNodeByPartitionId(Cluster cluster, int partitionId) {
     * @return
     */
    public static String compressedListOfPartitionsInZone(final Cluster cluster, int zoneId) {
-        Set<Integer> partitionIds = cluster.getPartitionIdsInZone(zoneId);
-        if(partitionIds.size() == 0) {
-            return "[]";
-        }
-        int curLastPartitionId = -1;
-        int curInitPartitionId = -1;
-
-        String compressedList = "[";
-        for(int partitionId: partitionIds) {
-            // Handle initial condition
-            if(curInitPartitionId == -1) {
-                curInitPartitionId = partitionId;
-                curLastPartitionId = partitionId;
-                continue;
-            }
-            // Contiguous partition Id
-            if(partitionId == curLastPartitionId + 1) {
-                curLastPartitionId = partitionId;
-                continue;
+        Map<Integer, Integer> idToRunLength = ClusterUtils.getMapOfContiguousPartitions(cluster,
+                                                                                        zoneId);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        boolean first = true;
+        Set<Integer> sortedInitPartitionIds = new TreeSet<Integer>(idToRunLength.keySet());
+        for(int initPartitionId: sortedInitPartitionIds) {
+            if(!first) {
+                sb.append(", ");
+            } else {
+                first = false;
             }
-            // End of (possibly) contiguous partition Ids
-            if(curInitPartitionId == curLastPartitionId) {
-                compressedList += curLastPartitionId + ", ";
+            int runLength = idToRunLength.get(initPartitionId);
+            if(runLength == 1) {
+                sb.append(initPartitionId);
             } else {
-                compressedList += curInitPartitionId + "-" + curLastPartitionId + ", ";
+                int endPartitionId = (initPartitionId + runLength - 1)
+                                     % cluster.getNumberOfPartitions();
+                sb.append(initPartitionId).append("-").append(endPartitionId);
             }
-            curInitPartitionId = partitionId;
-            curLastPartitionId = partitionId;
-        }
-        // Handle end condition
-        if(curInitPartitionId == curLastPartitionId) {
-            compressedList += curLastPartitionId + "]";
-        } else {
-            compressedList += curInitPartitionId + "-" + curLastPartitionId + "]";
         }
+        sb.append("]");
 
-        return compressedList;
+        return sb.toString();
     }
 
     /**
-     * Determines run length for each 'initial' partition ID
-     *
-     * Does not correctly address "wrap around" of partition IDs (i.e., the fact
-     * that partition ID 0 is "next" to partition ID 'max')
+     * Determines run length for each 'initial' partition ID. Note that a
+     * contiguous run may "wrap around" the end of the ring.
     *
     * @param cluster
     * @param zoneId
-     * @return map of length of contiguous run of partitions to count of number
-     * of such runs.
+     * @return map of initial partition Id to length of contiguous run of
+     * partition IDs within the same zone.
     */
    public static Map<Integer, Integer> getMapOfContiguousPartitions(final Cluster cluster,
                                                                     int zoneId) {
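To make the run-length bookkeeping concrete, a worked example under the wrap-around behavior described in the new javadoc (cluster contents invented): 16 partitions total, with zone 0 owning {0, 1, 2, 5, 14, 15}.

    // getMapOfContiguousPartitions(cluster, 0)  => {5=1, 14=5}
    //   (the run 14, 15, 0, 1, 2 wraps around the ring)
    // compressedListOfPartitionsInZone(cluster, 0)  => "[5, 14-2]"
    //   since endPartitionId = (14 + 5 - 1) % 16 == 2
    // getHotPartitionsDueToContiguity(cluster, 4) flags partition (14 + 5) % 16 == 3:
    //   replica placement walks the ring, so the partition just past a long
    //   same-zone run tends to pick up replicas for many partitions in that run.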
@@ -311,41 +281,22 @@ public static String getPrettyMapOfContiguousPartitionRunLengths(final Cluster c
     */
    public static String getHotPartitionsDueToContiguity(final Cluster cluster,
                                                         int hotContiguityCutoff) {
        StringBuilder sb = new StringBuilder();
-        for(Integer zoneId: cluster.getZoneIds()) {
-            List<Integer> partitionIds = new ArrayList<Integer>(cluster.getPartitionIdsInZone(zoneId));
-
-            // Skip zones without any partition IDs.
-            if(partitionIds.size() == 0) {
-                continue;
-            }
-
-            int lastPartitionId = partitionIds.get(0);
-            int initPartitionId = lastPartitionId;
-
-            for(int offset = 1; offset < partitionIds.size(); offset++) {
-                int partitionId = partitionIds.get(offset);
-                if(partitionId == lastPartitionId + 1) {
-                    lastPartitionId = partitionId;
+        for(int zoneId: cluster.getZoneIds()) {
+            Map<Integer, Integer> idToRunLength = ClusterUtils.getMapOfContiguousPartitions(cluster,
+                                                                                            zoneId);
+            for(Integer initialPartitionId: idToRunLength.keySet()) {
+                int runLength = idToRunLength.get(initialPartitionId);
+                if(runLength < hotContiguityCutoff)
                     continue;
-                }
-                int runLength = lastPartitionId - initPartitionId + 1;
-                if(runLength > hotContiguityCutoff) {
-                    int hotPartitionId = lastPartitionId + 1;
-                    for(Node node: cluster.getNodes()) {
-                        if(node.getPartitionIds().contains(hotPartitionId)) {
-                            sb.append("\tNode " + node.getId() + " (" + node.getHost()
-                                      + ") has hot primary partition " + hotPartitionId
-                                      + " that follows contiguous run of length " + runLength
-                                      + "\n");
-                        }
-                    }
-                }
-                initPartitionId = partitionId;
-                lastPartitionId = initPartitionId;
+                int hotPartitionId = (initialPartitionId + runLength)
+                                     % cluster.getNumberOfPartitions();
+                Node hotNode = cluster.getNodeForPartitionId(hotPartitionId);
+                sb.append("\tNode " + hotNode.getId() + " (" + hotNode.getHost()
+                          + ") has hot primary partition " + hotPartitionId
+                          + " that follows contiguous run of length " + runLength
+                          + Utils.NEWLINE);
             }
         }
diff --git a/src/java/voldemort/utils/PartitionBalance.java b/src/java/voldemort/utils/PartitionBalance.java
index d367172198..0ce9d206b9 100644
--- a/src/java/voldemort/utils/PartitionBalance.java
+++ b/src/java/voldemort/utils/PartitionBalance.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 import voldemort.cluster.Cluster;
 import voldemort.cluster.Node;
@@ -31,6 +32,17 @@
 
 public class PartitionBalance {
 
+    /**
+     * Multiplier in utility method to weight the balance of "IOPS" (get QPS &
+     * pseudo-master put QPS) relative to "CAPACITY".
+     */
+    private final static int UTILITY_MULTIPLIER_IOPS = 2;
+    /**
+     * Multiplier in utility method to weight the balance of "CAPACITY" (put QPS
+     * and therefore amount of data stored) relative to "IOPS".
+     */
+    private final static int UTILITY_MULTIPLIER_CAPACITY = 1;
+
     private final Cluster cluster;
 
     private final double primaryMaxMin;
@@ -50,7 +62,6 @@ public PartitionBalance(Cluster cluster, List<StoreDefinition> storeDefs) {
         HashMap<StoreDefinition, Integer> uniqueStores = KeyDistributionGenerator.getUniqueStoreDefinitionsWithCounts(storeDefs);
         Set<Integer> nodeIds = cluster.getNodeIds();
-        Set<Integer> zoneIds = cluster.getZoneIds();
 
         builder.append("PARTITION DUMP\n");
         this.primaryAggNodeIdToPartitionCount = Maps.newHashMap();
@@ -71,107 +82,51 @@ public PartitionBalance(Cluster cluster, List<StoreDefinition> storeDefs) {
         for(StoreDefinition storeDefinition: uniqueStores.keySet()) {
             StoreRoutingPlan storeRoutingPlan = new StoreRoutingPlan(cluster, storeDefinition);
 
-            builder.append("\n");
-            builder.append("Store exemplar: " + storeDefinition.getName() + "\n");
-            builder.append("\tReplication factor: " + storeDefinition.getReplicationFactor() + "\n");
-            builder.append("\tRouting strategy: " + storeDefinition.getRoutingStrategyType() + "\n");
-            builder.append("\tThere are " + uniqueStores.get(storeDefinition)
-                           + " other similar stores.\n");
-
-            // Map of node Id to Sets of pairs. Pairs of Integers are of
-            // <replica type, partition id>
-            Map<Integer, Set<Pair<Integer, Integer>>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(cluster,
                                                                                                                      storeDefinition,
                                                                                                                      true);
-            Map<Integer, Integer> primaryNodeIdToPartitionCount = Maps.newHashMap();
-            Map<Integer, Integer> nodeIdToZonePrimaryCount = Maps.newHashMap();
-            Map<Integer, Integer> allNodeIdToPartitionCount = Maps.newHashMap();
-
-            // Print out all partitions, by replica type, per node
-            builder.append("\n");
-            builder.append("\tDetailed Dump:\n");
-            for(Integer nodeId: nodeIds) {
-                builder.append("\tNode ID: " + nodeId + "in zone "
-                               + cluster.getNodeById(nodeId).getZoneId() + "\n");
-                primaryNodeIdToPartitionCount.put(nodeId, 0);
-                nodeIdToZonePrimaryCount.put(nodeId, 0);
-                allNodeIdToPartitionCount.put(nodeId, 0);
-                Set<Pair<Integer, Integer>> partitionPairs = nodeIdToAllPartitions.get(nodeId);
-
-                int replicaType = 0;
-                while(partitionPairs.size() > 0) {
-                    List<Pair<Integer, Integer>> replicaPairs = new ArrayList<Pair<Integer, Integer>>();
-                    for(Pair<Integer, Integer> pair: partitionPairs) {
-                        if(pair.getFirst() == replicaType) {
-                            replicaPairs.add(pair);
-                        }
-                    }
-                    List<Integer> partitions = new ArrayList<Integer>();
-                    for(Pair<Integer, Integer> pair: replicaPairs) {
-                        partitionPairs.remove(pair);
-                        partitions.add(pair.getSecond());
-                    }
-                    java.util.Collections.sort(partitions);
-
-                    builder.append("\t\t" + replicaType);
-                    for(int zoneId: zoneIds) {
-                        builder.append(" : z" + zoneId + " : ");
-                        List<Integer> zonePartitions = new ArrayList<Integer>();
-                        for(int partitionId: partitions) {
-                            if(cluster.getPartitionIdsInZone(zoneId).contains(partitionId)) {
-                                zonePartitions.add(partitionId);
-                            }
-                        }
-                        builder.append(zonePartitions.toString());
-
-                    }
-                    builder.append("\n");
-                    if(replicaType == 0) {
-                        primaryNodeIdToPartitionCount.put(nodeId,
-                                                          primaryNodeIdToPartitionCount.get(nodeId)
-                                                                  + partitions.size());
-                    }
-
-                    allNodeIdToPartitionCount.put(nodeId, allNodeIdToPartitionCount.get(nodeId)
-                                                          + partitions.size());
-                    replicaType++;
-                }
-            }
-
-            // Go through all partition IDs and determine which node is
-            // "first" in the replicating node list for every zone. This
-            // determines the number of "zone primaries" each node hosts.
-            for(int partitionId = 0; partitionId < cluster.getNumberOfPartitions(); partitionId++) {
-                for(int zoneId: zoneIds) {
-                    for(int nodeId: storeRoutingPlan.getReplicationNodeList(partitionId)) {
-                        if(cluster.getNodeById(nodeId).getZoneId() == zoneId) {
-                            nodeIdToZonePrimaryCount.put(nodeId,
-                                                         nodeIdToZonePrimaryCount.get(nodeId) + 1);
-                            break;
-                        }
-                    }
-                }
-            }
-
-            builder.append("\n");
-            builder.append("\tSummary Dump:\n");
+            // High level information about the store def exemplar
+            builder.append(Utils.NEWLINE)
+                   .append("Store exemplar: " + storeDefinition.getName())
+                   .append(Utils.NEWLINE)
+                   .append("\tReplication factor: " + storeDefinition.getReplicationFactor())
+                   .append(Utils.NEWLINE)
+                   .append("\tRouting strategy: " + storeDefinition.getRoutingStrategyType())
+                   .append(Utils.NEWLINE)
+                   .append("\tThere are " + uniqueStores.get(storeDefinition)
+                           + " other similar stores.")
+                   .append(Utils.NEWLINE)
+                   .append(Utils.NEWLINE);
+
+            // Detailed dump of partitions on nodes
+            builder.append(dumpZoneNAryDetails(storeRoutingPlan));
+            builder.append(Utils.NEWLINE);
+
+            builder.append(dumpReplicaTypeDetails(cluster, storeDefinition));
+            builder.append(Utils.NEWLINE);
+
+            // Per-node counts of various partition types (primary,
+            // zone-primary, and n-ary)
+            Map<Integer, Integer> nodeIdToPrimaryCount = getNodeIdToPrimaryCount(cluster);
+            Map<Integer, Integer> nodeIdToZonePrimaryCount = getNodeIdToZonePrimaryCount(cluster,
+                                                                                         storeRoutingPlan);
+            Map<Integer, Integer> nodeIdToNaryCount = getNodeIdToNaryCount(cluster,
+                                                                           storeRoutingPlan);
+
+            builder.append("\tSummary of NAry counts:").append(Utils.NEWLINE);
             for(Integer nodeId: nodeIds) {
-                builder.append("\tNode ID: " + nodeId + " : "
-                               + allNodeIdToPartitionCount.get(nodeId) + "\n");
+                builder.append("\tNode ID: " + nodeId + " : " + nodeIdToNaryCount.get(nodeId)
+                               + "\n");
                 primaryAggNodeIdToPartitionCount.put(nodeId,
                                                      primaryAggNodeIdToPartitionCount.get(nodeId)
-                                                             + (primaryNodeIdToPartitionCount.get(nodeId) * uniqueStores.get(storeDefinition)));
+                                                             + (nodeIdToPrimaryCount.get(nodeId) * uniqueStores.get(storeDefinition)));
                 aggNodeIdToZonePrimaryCount.put(nodeId, aggNodeIdToZonePrimaryCount.get(nodeId)
                                                         + nodeIdToZonePrimaryCount.get(nodeId)
                                                         * uniqueStores.get(storeDefinition));
                 allAggNodeIdToPartitionCount.put(nodeId,
                                                  allAggNodeIdToPartitionCount.get(nodeId)
-                                                         + (allNodeIdToPartitionCount.get(nodeId) * uniqueStores.get(storeDefinition)));
+                                                         + (nodeIdToNaryCount.get(nodeId) * uniqueStores.get(storeDefinition)));
             }
         }
 
-        builder.append("\n");
-        builder.append("\n");
+        builder.append(Utils.NEWLINE).append(Utils.NEWLINE);
 
         Pair<Double, String> summary = summarizeBalance(primaryAggNodeIdToPartitionCount,
                                                         "AGGREGATE PRIMARY-PARTITION COUNT (across all stores)");
@@ -191,6 +146,149 @@ public PartitionBalance(Cluster cluster, List<StoreDefinition> storeDefs) {
         this.verbose = builder.toString();
     }
 
+    /**
+     * Go through all nodes and determine how many primary partition IDs each
+     * node hosts.
+     *
+     * @param cluster
+     * @return map of nodeId to number of primary partitions hosted on node.
+     */
+    private Map<Integer, Integer> getNodeIdToPrimaryCount(Cluster cluster) {
+        Map<Integer, Integer> nodeIdToPrimaryCount = Maps.newHashMap();
+        for(Node node: cluster.getNodes()) {
+            nodeIdToPrimaryCount.put(node.getId(), node.getPartitionIds().size());
+        }
+
+        return nodeIdToPrimaryCount;
+    }
+
+    /**
+     * Go through all partition IDs and determine which node is "first" in the
+     * replicating node list for every zone. This determines the number of
+     * "zone primaries" each node hosts.
+     *
+     * @return map of nodeId to number of zone-primaries hosted on node.
+     */
+    private Map<Integer, Integer> getNodeIdToZonePrimaryCount(Cluster cluster,
+                                                              StoreRoutingPlan storeRoutingPlan) {
+        Map<Integer, Integer> nodeIdToZonePrimaryCount = Maps.newHashMap();
+        for(Integer nodeId: cluster.getNodeIds()) {
+            nodeIdToZonePrimaryCount.put(nodeId,
+                                         storeRoutingPlan.getZonePrimaryPartitionIds(nodeId).size());
+        }
+
+        return nodeIdToZonePrimaryCount;
+    }
+
+    /**
+     * Go through all node IDs and determine how many n-ary partition IDs each
+     * node hosts.
+     *
+     * @param cluster
+     * @param storeRoutingPlan
+     * @return map of nodeId to number of n-ary partitions hosted on node.
+     */
+    private Map<Integer, Integer> getNodeIdToNaryCount(Cluster cluster,
+                                                       StoreRoutingPlan storeRoutingPlan) {
+        Map<Integer, Integer> nodeIdToNaryCount = Maps.newHashMap();
+
+        for(int nodeId: cluster.getNodeIds()) {
+            nodeIdToNaryCount.put(nodeId, storeRoutingPlan.getNaryPartitionIds(nodeId).size());
+        }
+
+        return nodeIdToNaryCount;
+    }
+
+    // TODO: (refactor) When/if "replica type" is exorcised from the code base,
+    // this detailed dump method should be removed.
+    /**
+     * Dumps the partition IDs per node in terms of "replica type".
+     *
+     * @param cluster
+     * @param storeDefinition
+     * @return pretty printed string of detailed replica type dump.
+     */
+    private String dumpReplicaTypeDetails(Cluster cluster, StoreDefinition storeDefinition) {
+        StringBuilder sb = new StringBuilder();
+        Map<Integer, Set<Pair<Integer, Integer>>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(cluster,
+                                                                                                                  storeDefinition,
+                                                                                                                  true);
+        sb.append("\tDetailed Dump (Replica Types):").append(Utils.NEWLINE);
+        for(Integer nodeId: cluster.getNodeIds()) {
+            sb.append("\tNode ID: " + nodeId + " in zone "
+                      + cluster.getNodeById(nodeId).getZoneId()).append(Utils.NEWLINE);
+            Set<Pair<Integer, Integer>> partitionPairs = nodeIdToAllPartitions.get(nodeId);
+
+            int replicaType = 0;
+            while(partitionPairs.size() > 0) {
+                List<Pair<Integer, Integer>> replicaPairs = new ArrayList<Pair<Integer, Integer>>();
+                for(Pair<Integer, Integer> pair: partitionPairs) {
+                    if(pair.getFirst() == replicaType) {
+                        replicaPairs.add(pair);
+                    }
+                }
+                List<Integer> partitions = new ArrayList<Integer>();
+                for(Pair<Integer, Integer> pair: replicaPairs) {
+                    partitionPairs.remove(pair);
+                    partitions.add(pair.getSecond());
+                }
+                java.util.Collections.sort(partitions);
+
+                sb.append("\t\t" + replicaType);
+                for(int zoneId: cluster.getZoneIds()) {
+                    sb.append(" : z" + zoneId + " : ");
+                    List<Integer> zonePartitions = new ArrayList<Integer>();
+                    for(int partitionId: partitions) {
+                        if(cluster.getPartitionIdsInZone(zoneId).contains(partitionId)) {
+                            zonePartitions.add(partitionId);
+                        }
+                    }
+                    sb.append(zonePartitions.toString());
+
+                }
+                sb.append(Utils.NEWLINE);
+
+                replicaType++;
+            }
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Dumps the partition IDs per node in terms of zone n-ary type.
+     *
+     * @param storeRoutingPlan
+     * @return pretty printed string of detailed zone n-ary type dump.
+     */
+    private String dumpZoneNAryDetails(StoreRoutingPlan storeRoutingPlan) {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("\tDetailed Dump (Zone N-Aries):").append(Utils.NEWLINE);
+        for(Node node: storeRoutingPlan.getCluster().getNodes()) {
+            int zoneId = node.getZoneId();
+            int nodeId = node.getId();
+            sb.append("\tNode ID: " + nodeId + " in zone " + zoneId).append(Utils.NEWLINE);
+            List<Integer> naries = storeRoutingPlan.getNaryPartitionIds(nodeId);
+            Map<Integer, List<Integer>> zoneNaryTypeToPartitionIds = new HashMap<Integer, List<Integer>>();
+            for(int nary: naries) {
+                int zoneReplicaType = storeRoutingPlan.getZoneReplicaType(zoneId, nodeId, nary);
+                if(!zoneNaryTypeToPartitionIds.containsKey(zoneReplicaType)) {
+                    zoneNaryTypeToPartitionIds.put(zoneReplicaType, new ArrayList<Integer>());
+                }
+                zoneNaryTypeToPartitionIds.get(zoneReplicaType).add(nary);
+            }
+
+            for(int replicaType: new TreeSet<Integer>(zoneNaryTypeToPartitionIds.keySet())) {
+                sb.append("\t\t" + replicaType + " : ");
+                sb.append(zoneNaryTypeToPartitionIds.get(replicaType).toString());
+                sb.append(Utils.NEWLINE);
+            }
+        }
+
+        return sb.toString();
+    }
+
     public double getPrimaryMaxMin() {
         return primaryMaxMin;
     }
@@ -234,7 +332,8 @@ public int getNaryPartitionCount(int nodeId) {
     */
    public double getUtility() {
 
-        return 2 * getZonePrimaryMaxMin() + getNaryMaxMin();
+        return (UTILITY_MULTIPLIER_IOPS * getZonePrimaryMaxMin())
+               + (UTILITY_MULTIPLIER_CAPACITY * getNaryMaxMin());
     }
 
     @Override
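For orientation, the zone n-ary dump renders per node roughly as below (values invented), and getUtility() is the quantity the repartitioner minimizes; per the multipliers above, IOPS balance is weighted twice as heavily as capacity balance:

    //  Detailed Dump (Zone N-Aries):
    //  Node ID: 0 in zone 0
    //      0 : [0, 4, 8]      <- zone-primaries (serve the zone-local get/put QPS)
    //      1 : [3, 7, 11]     <- zone-secondaries
    //
    //  e.g. zone-primary max/min = 1.50 and n-ary max/min = 1.20 gives
    //  utility = 2 * 1.50 + 1 * 1.20 = 4.20   (lower is better)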
diff --git a/src/java/voldemort/utils/RebalanceUtils.java b/src/java/voldemort/utils/RebalanceUtils.java
index 6092187f3a..798eb1b178 100644
--- a/src/java/voldemort/utils/RebalanceUtils.java
+++ b/src/java/voldemort/utils/RebalanceUtils.java
@@ -538,7 +538,7 @@ public static Cluster createUpdatedCluster(Cluster currentCluster,
         for(int donatedPartition: donatedPartitions) {
 
             // Gets the donor Node that owns this donated partition
-            Node donorNode = ClusterUtils.getNodeByPartitionId(updatedCluster, donatedPartition);
+            Node donorNode = updatedCluster.getNodeForPartitionId(donatedPartition);
             Node stealerNode = updatedCluster.getNodeById(stealerNodeId);
 
             if(donorNode == stealerNode) {
diff --git a/src/java/voldemort/utils/RepartitionUtils.java b/src/java/voldemort/utils/RepartitionUtils.java
index c234def2f4..74fba2f56a 100644
--- a/src/java/voldemort/utils/RepartitionUtils.java
+++ b/src/java/voldemort/utils/RepartitionUtils.java
@@ -30,12 +30,16 @@
 
 import voldemort.cluster.Cluster;
 import voldemort.cluster.Node;
+import voldemort.routing.StoreRoutingPlan;
 import voldemort.store.StoreDefinition;
+import voldemort.store.StoreUtils;
 import voldemort.xml.ClusterMapper;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+// TODO: (review) change this from a static helper class to a full-blown class
+// "Repartitioner"?
 /**
 * RepartitionUtils provides functions that balance the distribution of
 * partitions across a cluster.
@@ -45,6 +49,48 @@ public class RepartitionUtils {
 
     private static Logger logger = Logger.getLogger(RepartitionUtils.class);
 
+    /**
+     * Recommended (default) number of times to attempt repartitioning.
+     */
+    public final static int DEFAULT_REPARTITION_ATTEMPTS = 5;
+    /**
+     * Default number of random partition ID swaps to attempt, if random swaps
+     * are enabled.
+     */
+    public final static int DEFAULT_RANDOM_SWAP_ATTEMPTS = 100;
+    /**
+     * Default number of successful random swaps (i.e., the random swap
+     * improves balance) after which repartitioning stops, if random swaps are
+     * enabled.
+     */
+    public final static int DEFAULT_RANDOM_SWAP_SUCCESSES = 100;
+    /**
+     * Default number of greedy partition ID swaps to perform, if greedy swaps
+     * are enabled. Each greedy partition ID swap considers (some number of
+     * partitions per node) X (some number of partitions from rest of cluster)
+     * and selects the best such swap.
+     */
+    public final static int DEFAULT_GREEDY_SWAP_ATTEMPTS = 5;
+    /**
+     * Default setting for which zone IDs the greedy swap algorithm runs over.
+     * null implies greedily swapping across all zones.
+     */
+    public final static List<Integer> DEFAULT_GREEDY_ZONE_IDS = null;
+    /**
+     * Default (max) number of partition IDs per node to consider, if greedy
+     * swaps are enabled.
+     */
+    public final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE = 5;
+    /**
+     * Default (max) number of partition IDs from all the other nodes in the
+     * cluster to consider, if greedy swaps are enabled.
+     */
+    public final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE = 25;
+    /**
+     * Default limit on length of contiguous partition ID run within a zone. 0
+     * implies no limit on such runs.
+     */
+    public final static int DEFAULT_MAX_CONTIGUOUS_PARTITIONS = 0;
+
     /**
     * Runs a number of distinct algorithms over the specified clusters/store
     * defs to better balance partition IDs over nodes such that all nodes have
     * similar iops and capacity workload.
     *
     * This method is used for three key use cases:
     * <ul>
-     * <li> Rebalancing : Distribute partition IDs better for an existing
-     * cluster.
+     * <li> Shuffling : Distribute partition IDs better for an existing cluster.
     * <li> Cluster expansion : Distribute partition IDs to take advantage of new
     * nodes (added to some of the zones).
     * <li> Zone expansion : Distribute partition IDs into a new zone.
     * </ul>
@@ -76,14 +121,19 @@ public class RepartitionUtils {
     * expansion, otherwise pass in same as currentCluster.
     * @param targetStoreDefs target store defs; needed for zone expansion,
     *        otherwise pass in same as currentStores.
-     * @param outputDir
-     * @param attempts
-     * @param disableNodeBalancing
-     * @param disableZoneBalancing
-     * @param enableRandomSwaps
+     * @param outputDir Directory in which to dump cluster xml and analysis
+     *        files.
+     * @param attempts Number of distinct repartitionings to attempt, the best
+     *        of which is returned.
+     * @param disableNodeBalancing Disables the core algorithm that balances
+     *        primaries among nodes within each zone.
+     * @param disableZoneBalancing For the core algorithm that balances
+     *        primaries among nodes in each zone, disable balancing primaries
+     *        among zones.
+     * @param enableRandomSwaps Enables random swap optimization.
     * @param randomSwapAttempts
     * @param randomSwapSuccesses
-     * @param enableGreedySwaps
+     * @param enableGreedySwaps Enables greedy swap optimization.
     * @param greedyZoneIds
     * @param greedySwapAttempts
     * @param greedySwapMaxPartitionsPerNode
@@ -156,6 +206,11 @@ public static Cluster repartition(final Cluster currentCluster,
             double currentUtility = partitionBalance.getUtility();
             System.out.println("Optimization number " + attempt + ": " + currentUtility
                                + " max/min ratio");
+            System.out.println("-------------------------\n");
+            System.out.println(dumpInvalidMetadataRate(targetCluster,
+                                                       currentStoreDefs,
+                                                       nextCluster,
+                                                       currentStoreDefs));
 
             if(currentUtility <= minUtility) {
                 minUtility = currentUtility;
@@ -254,7 +309,14 @@ public static Pair<HashMap<Node, Integer>, HashMap<Node, Integer>> getDonorsAndS
         return new Pair<HashMap<Node, Integer>, HashMap<Node, Integer>>(donorNodes, stealerNodes);
     }
 
+    // TODO: (refactor) rename targetCluster -> interimCluster
     /**
+     * This method balances primary partitions among nodes within a zone, and
+     * optionally primary partitions among zones. The balancing is done at the
+     * level of partition IDs. Such partition ID movement may, or may not,
+     * result in data movement during a rebalancing. See RebalancePlan for the
+     * object responsible for determining which partition-stores move where for
+     * a specific repartitioning.
     *
     * @param targetCluster
     * @param balanceZones indicates whether or not number of primary partitions
@@ -299,7 +361,24 @@ public static Cluster balancePrimaryPartitions(final Cluster targetCluster, bool
         HashMap<Node, Integer> stealerNodes = donorsAndStealers.getSecond();
         List<Node> stealerNodeKeys = new ArrayList<Node>(stealerNodes.keySet());
 
-        // Go over every stealerNode and steal partitions from donor nodes
+        /*
+         * There is no "intelligence" here about which partition IDs are moved
+         * where. The RebalancePlan object owns determining how to move data
+         * around to meet a specific repartitioning. That said, a little bit of
+         * intelligence here may go a long way. For example, for zone expansion
+         * data movement could be minimized by:
+         *
+         * (1) Selecting a minimal # of partition IDs for the new zone to
+         * minimize how much the ring in existing zones is perturbed;
+         *
+         * (2) Selecting partitions for the new zone from contiguous runs of
+         * partition IDs in other zones that are not currently n-ary partitions
+         * for other primary partitions;
+         *
+         * (3) Some combination of (1) and (2)...
+         */
+
+        // Go over every stealerNode and steal partition Ids from donor nodes
         Cluster returnCluster = ClusterUtils.copyCluster(targetCluster);
 
         Collections.shuffle(stealerNodeKeys, new Random(System.currentTimeMillis()));
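A worked reading of the per-zone rate reported by dumpInvalidMetadataRate (defined in the next hunk; numbers invented): for each existing zone, take every partition that will be zone-primary on some node in the target layout, and count those the node does not already host as an n-ary in the current layout. A client routing with stale metadata would error on exactly those partitions.

    //  Zone 0 : total zone primaries 32, # that trigger invalid metadata 8 => 0.25
    //  i.e., roughly a quarter of zone 0's zone-primary requests would bounce
    //  once before the client refreshes its routing metadata.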
@@ -774,6 +853,59 @@ public static boolean validateClusterUpdate(final Cluster before, final Cluster
         return true;
     }
 
+    // TODO: move to some other util class since it is called by
+    // RepartitionUtils and by RebalancePlan.
+    /**
+     * Compares current and target layouts and, per zone, reports what fraction
+     * of zone-primary partitions would trigger invalid metadata.
+     *
+     * @param currentCluster
+     * @param currentStoreDefs
+     * @param targetCluster
+     * @param targetStoreDefs
+     * @return pretty printed string of invalid metadata rates per zone.
+     */
+    public static String dumpInvalidMetadataRate(final Cluster currentCluster,
+                                                 List<StoreDefinition> currentStoreDefs,
+                                                 final Cluster targetCluster,
+                                                 List<StoreDefinition> targetStoreDefs) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Dump of invalid metadata rates per zone").append(Utils.NEWLINE);
+
+        HashMap<StoreDefinition, Integer> uniqueStores = KeyDistributionGenerator.getUniqueStoreDefinitionsWithCounts(currentStoreDefs);
+
+        for(StoreDefinition currentStoreDef: uniqueStores.keySet()) {
+            sb.append("Store exemplar: " + currentStoreDef.getName())
+              .append(Utils.NEWLINE)
+              .append("\tThere are " + uniqueStores.get(currentStoreDef) + " other similar stores.")
+              .append(Utils.NEWLINE);
+
+            StoreRoutingPlan currentSRP = new StoreRoutingPlan(currentCluster, currentStoreDef);
+            StoreDefinition targetStoreDef = StoreUtils.getStoreDef(targetStoreDefs,
+                                                                    currentStoreDef.getName());
+            StoreRoutingPlan targetSRP = new StoreRoutingPlan(targetCluster, targetStoreDef);
+
+            // Only care about existing zones
+            for(int zoneId: currentCluster.getZoneIds()) {
+                int zoneLocalPrimaries = 0;
+                int invalidMetadata = 0;
+                // Examine nodes in current cluster in existing zone.
+                for(int nodeId: currentCluster.getNodeIdsInZone(zoneId)) {
+                    for(int partitionId: targetSRP.getZonePrimaryPartitionIds(nodeId)) {
+                        zoneLocalPrimaries++;
+                        if(!currentSRP.getNaryPartitionIds(nodeId).contains(partitionId)) {
+                            invalidMetadata++;
+                        }
+                    }
+                }
+                float rate = invalidMetadata / (float) zoneLocalPrimaries;
+                sb.append("\tZone " + zoneId)
+                  .append(" : total zone primaries " + zoneLocalPrimaries)
+                  .append(", # that trigger invalid metadata " + invalidMetadata)
+                  .append(" => " + rate)
+                  .append(Utils.NEWLINE);
+            }
+        }
+
+        return sb.toString();
+    }
+
     // TODO: Move to some more generic util class?
     /**
     * This method breaks the inputList into distinct lists that are no longer
@@ -820,6 +952,7 @@ public static List<Integer> removeItemsToSplitListEvenly(final List<Integer> inp
         return itemsToRemove;
     }
 
+    // TODO: (review) rename the methods... "evenlyDistribute"
     // TODO: Move to some more generic util class?
     /**
     * This method returns a list that "evenly" (within one) distributes some
     * number of elements (peanut butter) over some number of buckets (bread
     * slices).
     *
@@ -829,8 +962,8 @@ public static List<Integer> removeItemsToSplitListEvenly(final List<Integer> inp
     * @param breadSlices The number of buckets over which to evenly distribute
     *        the elements.
     * @param peanutButter The number of elements to distribute.
-     * @return A list of size breadSlices each integer entry of which indicates
-     *         the number of elements
+     * @return A list of size breadSlices, each integer entry of which indicates
+     *         the number of elements.
     */
    public static List<Integer> peanutButterList(int breadSlices, int peanutButter) {
        if(breadSlices < 1) {
@@ -855,13 +988,14 @@ public static List<Integer> peanutButterList(int breadSlices, int peanutButter)
     }
 
     // TODO: Move to some more generic util class?
+    // TODO: (review) Rename from peanutButter.
     /**
     * This method returns a map that "evenly" (within one) distributes some
     * number of elements (peanut butter) over some number of buckets (bread
     * slices).
     *
-     * @param set The collection of objects over which which to evenly
-     *        distribute the elements.
+     * @param set The keys of the map over which to evenly distribute the
+     *        elements.
     * @param peanutButter The number of elements to distribute.
     * @return A Map with keys specified by breadSlices each integer entry of
     *         which indicates the number of elements
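A hedged example of the "even within one" contract (the exact placement of the remainder is an implementation detail this diff does not pin down):

    // peanutButterList(4, 10): 10 elements over 4 buckets; every entry is
    // floor(10/4) = 2 or ceil(10/4) = 3, and the entries sum to 10, e.g.:
    //     [3, 3, 2, 2]
    // The map-returning variant documented above spreads counts the same way,
    // keyed by the provided set.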