Addressing reviews of balance analysis & repartitioner CLI
RebalancePlan
- print out invalid metadata rate analysis

Cluster
- added members that allow reverse lookup from a partition ID to its Zone or Node (see the usage sketch below)

StoreRoutingPlan
- Added reverse lookup from a node ID to the zone-primary partitions it hosts.

RepartitionCLI
- moved constants into RepartitionUtils
- Added validation of clusters & storedefs passed in.

ClusterUtils
- use lookup from Cluster object
- changed all contiguous partition ID methods to call the one method that does all the complicated work.

PartitionBalance
- Added constants for use in the utility function
- Broke the core, complicated method into small, reasonably-sized pieces.
- Added verbose print out of ZoneReplicaTypes (to complement old "replica type" print out)

RepartitionUtils
- Document the recommended default constants for repartitioning
- Dump invalid metadata rate
- Clean up comments and javadoc
- dumpInvalidMetadataRate analyzes and pretty-prints how many pseudo-masters per zone, per unique store def, lead to an invalid metadata exception.
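
For reviewers, a minimal usage sketch of the new lookups and the analysis dump, assuming the signatures shown in the diffs below. The XML file paths, the chosen partition ID, and the ReverseLookupSketch class are illustrative placeholders, and Zone#getId is assumed to be an existing accessor:

import java.io.File;
import java.util.List;

import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.Zone;
import voldemort.routing.StoreRoutingPlan;
import voldemort.store.StoreDefinition;
import voldemort.utils.RepartitionUtils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

public class ReverseLookupSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder paths; substitute real cluster.xml / stores.xml files.
        Cluster currentCluster = new ClusterMapper().readCluster(new File("current-cluster.xml"));
        List<StoreDefinition> currentStores = new StoreDefinitionsMapper().readStoreList(new File("current-stores.xml"));
        Cluster finalCluster = new ClusterMapper().readCluster(new File("final-cluster.xml"));
        List<StoreDefinition> finalStores = new StoreDefinitionsMapper().readStoreList(new File("final-stores.xml"));

        // New Cluster reverse lookups: partition ID -> Zone / Node.
        int partitionId = 0;
        Zone zone = currentCluster.getZoneForPartitionId(partitionId);
        Node node = currentCluster.getNodeForPartitionId(partitionId);
        System.out.println("Partition " + partitionId + " is hosted in zone " + zone.getId()
                           + " on node " + node.getId());

        // New StoreRoutingPlan reverse lookup: node ID -> zone-primary partition IDs.
        StoreRoutingPlan routingPlan = new StoreRoutingPlan(currentCluster, currentStores.get(0));
        List<Integer> zonePrimaries = routingPlan.getZonePrimaryPartitionIds(node.getId());
        System.out.println("Node " + node.getId() + " is zone primary for partitions " + zonePrimaries);

        // Invalid metadata rate analysis, also logged by RebalancePlan below.
        System.out.println(RepartitionUtils.dumpInvalidMetadataRate(currentCluster,
                                                                    currentStores,
                                                                    finalCluster,
                                                                    finalStores));
    }
}
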
jayjwylie committed Jun 20, 2013
1 parent 80dc13a commit fdc0eb3
Showing 8 changed files with 441 additions and 224 deletions.
6 changes: 6 additions & 0 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
@@ -33,6 +33,7 @@
import voldemort.utils.MoveMap;
import voldemort.utils.PartitionBalance;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.RepartitionUtils;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;

@@ -97,6 +98,11 @@ public RebalancePlan(final Cluster currentCluster,
logger.info("Final cluster : " + finalCluster);
logger.info("Batch size : " + batchSize);

logger.info(RepartitionUtils.dumpInvalidMetadataRate(currentCluster,
currentStores,
finalCluster,
finalStores));

// Initialize the plan
batchPlans = new ArrayList<RebalanceClusterPlan>();

17 changes: 17 additions & 0 deletions src/java/voldemort/cluster/Cluster.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,8 @@ public class Cluster implements Serializable {
private final Map<Integer, Zone> zonesById;
private final Map<Zone, List<Integer>> nodesPerZone;
private final Map<Zone, List<Integer>> partitionsPerZone;
private final Map<Integer, Zone> partitionIdToZone;
private final Map<Integer, Node> partitionIdToNode;

public Cluster(String name, List<Node> nodes) {
this(name, nodes, new ArrayList<Zone>());
@@ -60,6 +63,8 @@ public Cluster(String name, List<Node> nodes, List<Zone> zones) {
this.name = Utils.notNull(name);
this.partitionsPerZone = new LinkedHashMap<Zone, List<Integer>>();
this.nodesPerZone = new LinkedHashMap<Zone, List<Integer>>();
this.partitionIdToZone = new HashMap<Integer, Zone>();
this.partitionIdToNode = new HashMap<Integer, Node>();

if(zones.size() != 0) {
zonesById = new LinkedHashMap<Integer, Zone>(zones.size());
@@ -93,6 +98,10 @@ public Cluster(String name, List<Node> nodes, List<Zone> zones) {
}
nodesPerZone.get(nodesZone).add(node.getId());
partitionsPerZone.get(nodesZone).addAll(node.getPartitionIds());
for(Integer partitionId: node.getPartitionIds()) {
this.partitionIdToZone.put(partitionId, nodesZone);
this.partitionIdToNode.put(partitionId, node);
}
}
this.numberOfTags = getNumberOfTags(nodes);
}
@@ -192,6 +201,14 @@ public Set<Integer> getPartitionIdsInZone(Integer zoneId) {
return new TreeSet<Integer>(partitionsPerZone.get(getZoneById(zoneId)));
}

public Zone getZoneForPartitionId(int partitionId) {
return partitionIdToZone.get(partitionId);
}

public Node getNodeForPartitionId(int partitionId) {
return partitionIdToNode.get(partitionId);
}

public Node getNodeById(int id) {
Node node = nodesById.get(id);
if(node == null)
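
The constructor above now precomputes partitionIdToZone and partitionIdToNode once. A hedged sketch of the per-call scan those maps replace, using the existing Cluster#getNodes and Node#getPartitionIds accessors; the class and method names are illustrative only:

import voldemort.cluster.Cluster;
import voldemort.cluster.Node;

public class PartitionLookupSketch {

    // Equivalent lookup without the new maps: walk every node's partition list.
    // Cluster#getNodeForPartitionId turns this O(#nodes) scan into a single
    // HashMap get, paid for once when the maps are built in the constructor.
    public static Node findNodeForPartition(Cluster cluster, int partitionId) {
        for(Node node: cluster.getNodes()) {
            if(node.getPartitionIds().contains(partitionId)) {
                return node;
            }
        }
        return null; // partition ID not hosted by any node in this cluster
    }
}
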
21 changes: 16 additions & 5 deletions src/java/voldemort/routing/StoreRoutingPlan.java
@@ -34,8 +34,6 @@

import com.google.common.collect.Lists;

// TODO: Add StoreInstanceTest unit test for these helper methods.

/**
* This class wraps up a Cluster object and a StoreDefinition. The methods are
* effectively helper or util style methods for querying the routing plan that
@@ -48,6 +46,7 @@ public class StoreRoutingPlan {
private final StoreDefinition storeDefinition;
private final Map<Integer, Integer> partitionIdToNodeIdMap;
private final Map<Integer, List<Integer>> nodeIdToNaryPartitionMap;
private final Map<Integer, List<Integer>> nodeIdToZonePrimaryMap;
private final RoutingStrategy routingStrategy;

public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
@@ -57,14 +56,20 @@ public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition,
cluster);
this.nodeIdToNaryPartitionMap = new HashMap<Integer, List<Integer>>();
this.nodeIdToZonePrimaryMap = new HashMap<Integer, List<Integer>>();
for(int nodeId: cluster.getNodeIds()) {
this.nodeIdToNaryPartitionMap.put(nodeId, new ArrayList<Integer>());
this.nodeIdToZonePrimaryMap.put(nodeId, new ArrayList<Integer>());
}
for(int masterPartitionId = 0; masterPartitionId < cluster.getNumberOfPartitions(); ++masterPartitionId) {
List<Integer> naryPartitionIds = getReplicatingPartitionList(masterPartitionId);
for(int naryPartitionId: naryPartitionIds) {
int naryNodeId = getNodeIdForPartitionId(naryPartitionId);
nodeIdToNaryPartitionMap.get(naryNodeId).add(masterPartitionId);
int naryZoneId = cluster.getNodeById(naryNodeId).getZoneId();
if(getZoneReplicaType(naryZoneId, naryNodeId, naryPartitionId) == 0) {
nodeIdToZonePrimaryMap.get(naryNodeId).add(masterPartitionId);
}
}
}
}
@@ -87,8 +92,6 @@ public List<Integer> getReplicatingPartitionList(int masterPartitionId) {
return this.routingStrategy.getReplicatingPartitionList(masterPartitionId);
}

// TODO: Add test for this method (if this method is still required after
// the RebalanceController is updated to use RebalancePlan).
/**
*
* @param nodeId
@@ -98,6 +101,15 @@ public List<Integer> getNaryPartitionIds(int nodeId) {
return nodeIdToNaryPartitionMap.get(nodeId);
}

/**
*
* @param nodeId
* @return all zone-primary partition IDs hosted on the node.
*/
public List<Integer> getZonePrimaryPartitionIds(int nodeId) {
return nodeIdToZonePrimaryMap.get(nodeId);
}

/**
* Determines list of partition IDs that replicate the key.
*
@@ -387,7 +399,6 @@ public int getZoneReplicaNodeId(int zoneId, int zoneReplicaType, int partitionId
// TODO: (refactor) Move from static methods to non-static methods that use
// this object's cluster and storeDefinition member for the various
// check*BelongsTo* methods.

/**
* Check that the key belongs to one of the partitions in the map of replica
* type to partitions
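
A rough sketch of the relationship the new map captures: a node's zone-primary partitions are the subset of its hosted n-aries for which it holds zone replica type 0 within its own zone, so getZonePrimaryPartitionIds is always contained in getNaryPartitionIds. The class and the print statement are illustrative only:

import java.util.List;

import voldemort.cluster.Cluster;
import voldemort.routing.StoreRoutingPlan;
import voldemort.store.StoreDefinition;

public class ZonePrimarySketch {

    // Counts, per node, how many of its hosted n-ary partitions it is the
    // zone primary for (zone replica type 0 within its own zone).
    public static void printZonePrimaries(Cluster cluster, StoreDefinition storeDef) {
        StoreRoutingPlan routingPlan = new StoreRoutingPlan(cluster, storeDef);
        for(int nodeId: cluster.getNodeIds()) {
            List<Integer> naries = routingPlan.getNaryPartitionIds(nodeId);
            List<Integer> zonePrimaries = routingPlan.getZonePrimaryPartitionIds(nodeId);
            System.out.println("Node " + nodeId + ": zone primary for " + zonePrimaries.size()
                               + " of " + naries.size() + " hosted n-ary partitions");
        }
    }
}
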
53 changes: 26 additions & 27 deletions src/java/voldemort/tools/RepartitionCLI.java
@@ -30,6 +30,7 @@
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.utils.CmdUtils;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.RepartitionUtils;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;
@@ -41,13 +42,7 @@ public class RepartitionCLI {

private final static Logger logger = Logger.getLogger(RepartitionCLI.class);

private final static int DEFAULT_REPARTITION_ATTEMPTS = 5;
private final static int DEFAULT_RANDOM_SWAP_ATTEMPTS = 100;
private final static int DEFAULT_RANDOM_SWAP_SUCCESSES = 100;
private final static int DEFAULT_GREEDY_SWAP_ATTEMPTS = 5;
private final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE = 5;
private final static int DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE = 25;
private final static int DEFAULT_MAX_CONTIGUOUS_PARTITIONS = 0;
// TODO: (review) explain these default values.

private static OptionParser parser;

@@ -70,7 +65,7 @@ private static void setupParser() {
.describedAs("stores.xml");
parser.accepts("attempts",
"Number of attempts at repartitioning. [ Default: "
+ DEFAULT_REPARTITION_ATTEMPTS + " ]")
+ RepartitionUtils.DEFAULT_REPARTITION_ATTEMPTS + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-attempts");
@@ -87,39 +82,40 @@ private static void setupParser() {
"Enable attempts to improve balance by random partition swaps within a zone. [Default: disabled]");
parser.accepts("random-swap-attempts",
"Number of random swaps to attempt. [Default:"
+ DEFAULT_RANDOM_SWAP_ATTEMPTS + " ]")
+ RepartitionUtils.DEFAULT_RANDOM_SWAP_ATTEMPTS + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-attempts");
parser.accepts("random-swap-successes",
"Number of successful random swaps to permit exit before completing all swap attempts. [Default:"
+ DEFAULT_RANDOM_SWAP_SUCCESSES + " ]")
+ RepartitionUtils.DEFAULT_RANDOM_SWAP_SUCCESSES + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-successes");
parser.accepts("enable-greedy-swaps",
"Enable attempts to improve balance by greedily swapping (random) partitions within a zone. [Default: disabled]");
parser.accepts("greedy-swap-attempts",
"Number of greedy (random) swaps to attempt. [Default:"
+ DEFAULT_GREEDY_SWAP_ATTEMPTS + " ]")
+ RepartitionUtils.DEFAULT_GREEDY_SWAP_ATTEMPTS + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-attempts");
parser.accepts("greedy-max-partitions-per-node",
"Max number of partitions per-node to evaluate swapping with other partitions within the zone. [Default:"
+ DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE + " ]")
+ RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("max-partitions-per-node");
parser.accepts("greedy-max-partitions-per-zone",
"Max number of (random) partitions per-zone to evaluate swapping with partitions from node being evaluated. [Default:"
+ DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE + " ]")
+ RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE + " ]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("max-partitions-per-zone");
parser.accepts("max-contiguous-partitions",
"Limit the number of contiguous partition IDs allowed within a zone. [Default:"
+ DEFAULT_MAX_CONTIGUOUS_PARTITIONS + " (indicating no limit)]")
+ RepartitionUtils.DEFAULT_MAX_CONTIGUOUS_PARTITIONS
+ " (indicating no limit)]")
.withRequiredArg()
.ofType(Integer.class)
.describedAs("num-contiguous");
@@ -205,14 +201,18 @@ public static void main(String[] args) throws Exception {

Cluster currentCluster = new ClusterMapper().readCluster(new File(currentClusterXML));
List<StoreDefinition> currentStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(currentStoresXML));
RebalanceUtils.validateClusterStores(currentCluster, currentStoreDefs);

Cluster targetCluster = new ClusterMapper().readCluster(new File(targetClusterXML));
List<StoreDefinition> targetStoreDefs = new StoreDefinitionsMapper().readStoreList(new File(targetStoresXML));
RebalanceUtils.validateClusterStores(targetCluster, targetStoreDefs);

// TODO: more verification of arguments:
// RebalanceUtils.validateTargetCluster(currentCluster,targetCluster);
RebalanceUtils.validateCurrentTargetCluster(currentCluster, targetCluster);

// Optional administrivia args
int attempts = CmdUtils.valueOf(options, "attempts", DEFAULT_REPARTITION_ATTEMPTS);
int attempts = CmdUtils.valueOf(options,
"attempts",
RepartitionUtils.DEFAULT_REPARTITION_ATTEMPTS);
String outputDir = null;
if(options.has("output-dir")) {
outputDir = (String) options.valueOf("output-dir");
@@ -224,23 +224,23 @@ public static void main(String[] args) throws Exception {
boolean enableRandomSwaps = options.has("enable-random-swaps");
int randomSwapAttempts = CmdUtils.valueOf(options,
"random-swap-attempts",
DEFAULT_RANDOM_SWAP_ATTEMPTS);
RepartitionUtils.DEFAULT_RANDOM_SWAP_ATTEMPTS);
int randomSwapSuccesses = CmdUtils.valueOf(options,
"random-swap-successes",
DEFAULT_RANDOM_SWAP_SUCCESSES);
RepartitionUtils.DEFAULT_RANDOM_SWAP_SUCCESSES);
boolean enableGreedySwaps = options.has("enable-greedy-swaps");
int greedySwapAttempts = CmdUtils.valueOf(options,
"greedy-swap-attempts",
DEFAULT_GREEDY_SWAP_ATTEMPTS);
RepartitionUtils.DEFAULT_GREEDY_SWAP_ATTEMPTS);
int greedyMaxPartitionsPerNode = CmdUtils.valueOf(options,
"greedy-max-partitions-per-node",
DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE);
RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_NODE);
int greedyMaxPartitionsPerZone = CmdUtils.valueOf(options,
"greedy-max-partitions-per-zone",
DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE);
RepartitionUtils.DEFAULT_GREEDY_MAX_PARTITIONS_PER_ZONE);
int maxContiguousPartitionsPerZone = CmdUtils.valueOf(options,
"max-contiguous-partitions",
DEFAULT_MAX_CONTIGUOUS_PARTITIONS);
RepartitionUtils.DEFAULT_MAX_CONTIGUOUS_PARTITIONS);

// Sanity check optional repartitioning args
if(disableNodeBalancing && !enableRandomSwaps && !enableGreedySwaps
@@ -255,10 +255,9 @@ public static void main(String[] args) throws Exception {
&& !enableGreedySwaps) {
printUsageAndDie("Provided arguments for generate greedy swaps but did not enable the feature");
}
// TODO: If necessary, add options to choose each zone in cluster, all
// nodes across cluster, or a list of zone IDs. For now though, just
// offer the behavior of greedily swapping among all nodes in cluster.
List<Integer> greedyZoneIds = null;
// In the future, can add options to choose each zone in cluster, all
// nodes across cluster, or a list of zone IDs.
List<Integer> greedyZoneIds = RepartitionUtils.DEFAULT_GREEDY_ZONE_IDS;

RepartitionUtils.repartition(currentCluster,
currentStoreDefs,
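
A small sketch of the same input handling outside the CLI, using the validation calls and shared defaults referenced in the diff above; the XML file paths and class name are placeholders:

import java.io.File;
import java.util.List;

import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.RepartitionUtils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

public class RepartitionInputsSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder paths; substitute real cluster.xml / stores.xml files.
        Cluster current = new ClusterMapper().readCluster(new File("current-cluster.xml"));
        List<StoreDefinition> currentStores = new StoreDefinitionsMapper().readStoreList(new File("current-stores.xml"));
        Cluster target = new ClusterMapper().readCluster(new File("target-cluster.xml"));
        List<StoreDefinition> targetStores = new StoreDefinitionsMapper().readStoreList(new File("target-stores.xml"));

        // Same sanity checks the CLI now performs before repartitioning.
        RebalanceUtils.validateClusterStores(current, currentStores);
        RebalanceUtils.validateClusterStores(target, targetStores);
        RebalanceUtils.validateCurrentTargetCluster(current, target);

        // Defaults now live in RepartitionUtils rather than being private to the CLI.
        System.out.println("Default repartition attempts: "
                           + RepartitionUtils.DEFAULT_REPARTITION_ATTEMPTS);
    }
}
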
