RebalancePlanCLI working for 2-to-3 zone expansion
Stealers steal from the first donor that has the desired partition-store. This currently biases all work towards donors with the lowest node IDs. The result is a truly imbalanced plan!

RebalanceClusterPlan
- Decorated with more TODOs...
- Changed stealer to pick first donor with desired partition-store
- Added commented-out code section that outlines next step for better planning

RebalancePartitionsInfo
- Decorated with more TODOs

RebalancePlan
- Much cleanup and refactoring. Construction now generates the plan.
- Methods added to get the plan (getPlan) and to print the plan (toString)

RebalanceUtils
- Helper method that derives targetCluster from initialCluster and finalCluster.
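
The targetCluster derivation this helper performs can be illustrated with a small, self-contained sketch. This is an assumption-laden stand-in, not the real RebalanceUtils.getTargetCluster: clusters are modeled as plain node-id-to-partition-ids maps, and every name below is illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: the target cluster keeps the current cluster's
// partition layout and adds any node that exists only in the final
// cluster, with an empty partition list.
public class TargetClusterSketch {

    static Map<Integer, List<Integer>> deriveTarget(Map<Integer, List<Integer>> currentLayout,
                                                    Map<Integer, List<Integer>> finalLayout) {
        Map<Integer, List<Integer>> targetLayout = new TreeMap<Integer, List<Integer>>(currentLayout);
        for(Integer nodeId: finalLayout.keySet()) {
            if(!targetLayout.containsKey(nodeId)) {
                targetLayout.put(nodeId, new ArrayList<Integer>()); // new node starts empty
            }
        }
        return targetLayout;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> current = new TreeMap<Integer, List<Integer>>();
        current.put(0, Arrays.asList(0, 2, 4));
        current.put(1, Arrays.asList(1, 3, 5));

        Map<Integer, List<Integer>> fin = new TreeMap<Integer, List<Integer>>();
        fin.put(0, Arrays.asList(0, 4));
        fin.put(1, Arrays.asList(1, 5));
        fin.put(2, Arrays.asList(2, 3)); // node 2 is new in the expanded zone

        // Prints {0=[0, 2, 4], 1=[1, 3, 5], 2=[]}: the current layout plus an empty node 2.
        System.out.println(deriveTarget(current, fin));
    }
}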
jayjwylie committed Jun 20, 2013
1 parent 857aa57 commit 5e72e52
Showing 5 changed files with 190 additions and 70 deletions.
76 changes: 72 additions & 4 deletions src/java/voldemort/client/rebalance/RebalanceClusterPlan.java
@@ -25,7 +25,7 @@

// TODO: (refactor) Rename RebalanceClusterPlan to RebalanceBatchPlan
// TODO: (refactor) Rename targetCluster -> finalCluster
// TODO: (refactor) Rename currentCluster -> targetCluster
// TODO: (refactor) Rename currentCluster -> targetCluster?
// TODO: (refactor) Fix cluster nomenclature in general: make sure there are
// exactly three prefixes used to distinguish cluster xml: initial or current,
// target or spec or expanded, and final. 'target' is overloaded to mean
@@ -248,6 +248,9 @@ private void prioritizeBatchPlan() {
*/
}

// TODO: Revisit these "two principles". I am not sure about the second one.
// Either because I don't like delete being mixed with this code or because
// we probably want to copy from a to-be-deleted partition-store.
/**
* Generate the list of partition movement based on 2 principles:
*
@@ -304,9 +307,33 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
continue;
}

/*-
* Outline of code to change this method...
StoreRoutingPlan currentStoreRoutingPlan = new StoreRoutingPlan(currentCluster,
storeDef);
StoreRoutingPlan targetStoreRoutingPlan = new StoreRoutingPlan(targetCluster, storeDef);
final Set<Pair<Integer, Integer>> trackStealPartitionsTuples = new HashSet<Pair<Integer, Integer>>();
final Set<Pair<Integer, Integer>> haveToStealTuples = Sets.newHashSet(stealerNodeIdToStolenPartitionTuples.get(stealerNodeId));
for(Pair<Integer, Integer> replicaTypePartitionId: haveToStealTuples) {
// TODO: Do not steal something you already have!
// TODO: Steal from zone local n-ary if possible
// TODO: Steal appropriate n-ary from zone that currently hosts the
// true primary.
}
if(trackStealPartitionsTuples.size() > 0) {
addPartitionsToPlan(trackStealPartitionsTuples,
donorNodeToStoreToStealPartition,
donorNode.getId(),
storeDef.getName());
}
*/

// Now we find out which donor can donate partitions to this stealer
final Set<Pair<Integer, Integer>> haveToStealTuples = Sets.newHashSet(stealerNodeIdToStolenPartitionTuples.get(stealerNodeId));
for(Node donorNode: currentCluster.getNodes()) {

// The same node can't donate
@@ -321,6 +348,10 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
final Set<Pair<Integer, Integer>> trackStealPartitionsTuples = new HashSet<Pair<Integer, Integer>>();
final Set<Pair<Integer, Integer>> trackDeletePartitionsTuples = new HashSet<Pair<Integer, Integer>>();

// TODO: donatePartitionTuple ought to take all donor nodes
// as input and select "best" partition to donate. E.g., from
// within the zone!

// Checks if this donor node can donate any tuples
donatePartitionTuple(donorNode,
haveToStealTuples,
@@ -350,8 +381,8 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
donorNode.getId(),
storeDef.getName());
}

}

}

// Now combine the plans generated individually into the actual
@@ -376,18 +407,23 @@ private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster c
private void addPartitionsToPlan(Set<Pair<Integer, Integer>> trackPartitionsTuples,
HashMap<Integer, HashMap<String, HashMap<Integer, List<Integer>>>> donorNodeToStoreToPartitionTuples,
int donorNodeId,
String name) {
String storeName) {
HashMap<String, HashMap<Integer, List<Integer>>> storeToStealPartitionTuples = null;
if(donorNodeToStoreToPartitionTuples.containsKey(donorNodeId)) {
storeToStealPartitionTuples = donorNodeToStoreToPartitionTuples.get(donorNodeId);
} else {
storeToStealPartitionTuples = Maps.newHashMap();
donorNodeToStoreToPartitionTuples.put(donorNodeId, storeToStealPartitionTuples);
}
storeToStealPartitionTuples.put(name,
storeToStealPartitionTuples.put(storeName,
RebalanceUtils.flattenPartitionTuples(trackPartitionsTuples));
}

// TODO: (refactor) trackStealPartitionsTuples is updated in this method.
// I.e., the 'void' return code is misleading. AND, only specific
// partitionId:replicaType's are actually stolen. AND, 'haveToStealTuples'
// is also modified. Clean up this method!
// TODO: (refactor): At least remove commented out historic code...
/**
* Given a donor node and a set of tuples that need to be stolen, checks if
* the donor can contribute any
@@ -398,6 +434,7 @@ private void addPartitionsToPlan(Set<Pair<Integer, Integer>> trackPartitionsTupl
* @param trackStealPartitionsTuples Set of partitions tuples already stolen
* @param donorPartitionTuples All partition tuples on donor node
*/
/*-
private void donatePartitionTuple(final Node donorNode,
Set<Pair<Integer, Integer>> haveToStealTuples,
Set<Pair<Integer, Integer>> trackStealPartitionsTuples,
@@ -416,6 +453,37 @@ private void donatePartitionTuple(final Node donorNode,
}
}
}
*/
private void donatePartitionTuple(final Node donorNode,
Set<Pair<Integer, Integer>> haveToStealTuples,
Set<Pair<Integer, Integer>> trackStealPartitionsTuples,
Set<Pair<Integer, Integer>> donorPartitionTuples) {
final Iterator<Pair<Integer, Integer>> iter = haveToStealTuples.iterator();

// Iterate over the partition tuples to steal and check if this node can
// donate it
while(iter.hasNext()) {
Pair<Integer, Integer> partitionTupleToSteal = iter.next();

// TODO: HACK to steal from ANY node that has the desired partition.
// Totally ignoring the replicaType.
for(Pair<Integer, Integer> rt: donorPartitionTuples) {
if(rt.getSecond() == partitionTupleToSteal.getSecond()) {
// TODO: passing in 'rt' instead of 'partitionTupleToSteal'
// is a one-line change that circumvents server-side checks
// during execution that the "correct" replicaType is being
// stolen from a donor. This change is fragile and should be
// hardened by removing such replicaType checks from the
// code path.
trackStealPartitionsTuples.add(rt);
// trackStealPartitionsTuples.add(partitionTupleToSteal);

// This partition has been donated, remove it
iter.remove();
}
}
}
}

/**
* We do not delete if this donor node is also the stealer node for another
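The TODOs in this file note that donatePartitionTuple currently hands work to whichever donor is examined first and ignores replicaType and zone. A hedged sketch of the "select the best donor" idea those TODOs describe, preferring a zone-local donor when one hosts the wanted partition, using simplified stand-in types rather than the real Node/StoreRoutingPlan API:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of zone-aware donor selection: among all donors that
// host the wanted partition, prefer one in the stealer's zone; otherwise
// fall back to any donor. All types and names here are illustrative.
public class DonorSelectionSketch {

    /** Returns the chosen donor node id, or -1 if no donor hosts the partition. */
    static int pickDonor(int wantedPartitionId,
                         int stealerZoneId,
                         Map<Integer, Integer> nodeToZone,
                         Map<Integer, List<Integer>> nodeToPartitions) {
        int fallback = -1;
        for(Map.Entry<Integer, List<Integer>> entry: nodeToPartitions.entrySet()) {
            int donorId = entry.getKey();
            if(!entry.getValue().contains(wantedPartitionId)) {
                continue; // this node does not host the wanted partition-store
            }
            if(nodeToZone.get(donorId) == stealerZoneId) {
                return donorId; // zone-local donor wins immediately
            }
            if(fallback == -1) {
                fallback = donorId; // remember a cross-zone donor just in case
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> nodeToZone = new TreeMap<Integer, Integer>();
        nodeToZone.put(0, 0);
        nodeToZone.put(2, 1);
        Map<Integer, List<Integer>> nodeToPartitions = new TreeMap<Integer, List<Integer>>();
        nodeToPartitions.put(0, Arrays.asList(7));
        nodeToPartitions.put(2, Arrays.asList(7));
        // A stealer in zone 1 prefers node 2 (zone-local) over node 0: prints 2.
        System.out.println(pickDonor(7, 1, nodeToZone, nodeToPartitions));
    }
}

Compared with the current first-donor-wins loop, this at least keeps partition-store movement zone-local whenever a zone-local replica exists.
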
@@ -53,7 +53,7 @@ public class RebalancePartitionsInfo {
// Except for MetadataStoreTest which passes in a random value, the value
// passed to the constructor for this value is always 0.
private int attempt;
// TODO: (refactor) Unclear what value the Hashmap provides. It maps
// TODO: (refactor) Unclear what value the inner Hashmap provides. It maps
// "replica type" to lists of partition IDs. A list of partition IDs (per
// store) seems sufficient for all purposes. The replica type is a
// distraction.
@@ -263,6 +263,7 @@ public synchronized int getPartitionStoreMoves() {
}
}

// TODO: Confirm not counting deletes is the correct action.
/*-
* Do not count deletes.
for(HashMap<Integer, List<Integer>> storeDeletes: storeToReplicaToDeletePartitionList.values()) {
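For context on the count being adjusted here: getPartitionStoreMoves() walks the store-to-replicaType-to-partitionIds maps and totals every stolen partition-store, while (per the TODO above) deletes are deliberately left out. A minimal hedged sketch of that tally over plain maps, not the actual RebalancePartitionsInfo fields:

import java.util.HashMap;
import java.util.List;

// Hedged sketch: partition-store moves = number of partition ids summed
// across every store and every replica type; deletes are not counted.
public class MoveCountSketch {

    static int countPartitionStoreMoves(HashMap<String, HashMap<Integer, List<Integer>>> storeToReplicaToPartitions) {
        int moves = 0;
        for(HashMap<Integer, List<Integer>> replicaToPartitions: storeToReplicaToPartitions.values()) {
            for(List<Integer> partitionIds: replicaToPartitions.values()) {
                moves += partitionIds.size();
            }
        }
        return moves;
    }
}
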
143 changes: 86 additions & 57 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
@@ -33,6 +33,7 @@
import voldemort.utils.MoveMap;
import voldemort.utils.PartitionBalance;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;

import com.google.common.collect.Lists;
@@ -47,17 +48,24 @@ public class RebalancePlan {

private final Cluster currentCluster;
private final List<StoreDefinition> currentStores;
// TODO: (refactor) Better name than targetCluster? expandedCluster?
// specCluster?
private final Cluster targetCluster;
private final Cluster finalCluster;
private final List<StoreDefinition> finalStores;
private final boolean stealerBased;
private final int batchSize;
private final String outputDir;

// TODO: (refactor) Better name than targetCluster? expandedCluster?
// specCluster?
private final Cluster targetCluster;
private List<RebalanceClusterPlan> batchPlans;

// Aggregate stats
private int numPrimaryPartitionMoves;
private int numPartitionStoreMoves;
private int numXZonePartitionStoreMoves;
private final MoveMap nodeMoveMap;
private final MoveMap zoneMoveMap;

public RebalancePlan(final Cluster currentCluster,
final List<StoreDefinition> currentStores,
final Cluster finalCluster,
@@ -73,13 +81,32 @@ public RebalancePlan(final Cluster currentCluster,
this.batchSize = batchSize;
this.outputDir = outputDir;

// Derive the targetCluster from current & final cluster xml
RebalanceUtils.validateFinalCluster(this.currentCluster, this.finalCluster);
this.targetCluster = RebalanceUtils.getClusterWithNewNodes(this.currentCluster,
this.finalCluster);
this.targetCluster = RebalanceUtils.getTargetCluster(this.currentCluster, this.finalCluster);

// Verify each cluster/storedefs pair
RebalanceUtils.validateClusterStores(this.currentCluster, this.currentStores);
RebalanceUtils.validateClusterStores(this.finalCluster, this.finalStores);
RebalanceUtils.validateClusterStores(this.targetCluster, this.finalStores);

// Log key arguments
logger.info("Current cluster : " + currentCluster);
logger.info("Target cluster : " + targetCluster);
logger.info("Final cluster : " + finalCluster);
logger.info("Batch size : " + batchSize);

// Initialize the plan
batchPlans = new ArrayList<RebalanceClusterPlan>();

// Initialize aggregate statistics
numPrimaryPartitionMoves = 0;
numPartitionStoreMoves = 0;
numXZonePartitionStoreMoves = 0;
nodeMoveMap = new MoveMap(targetCluster.getNodeIds());
zoneMoveMap = new MoveMap(targetCluster.getZoneIds());

plan();
}

public RebalancePlan(final Cluster currentCluster,
Expand All @@ -98,49 +125,18 @@ public RebalancePlan(final Cluster currentCluster,
}

/**
* Create a plan
*/
public List<RebalanceClusterPlan> plan() {
logger.info("Current cluster : " + currentCluster);
logger.info("Target cluster : " + targetCluster);
logger.info("Final cluster : " + finalCluster);
logger.info("Batch size : " + batchSize);

batchPlans = new ArrayList<RebalanceClusterPlan>();
rebalancePerClusterTransition();
return batchPlans;
}

// TODO: (refactor) rename this method to something sane and/or flatten into
// plan() method. And/or break into helper methods.
/**
* Rebalance on a step-by-step transitions from cluster.xml to
* target-cluster.xml
* Create a plan. The plan consists of batches. Each batch involves the
* movement of no more than batchSize primary partitions. The movement of a
* single primary partition may require migration of other n-ary replicas,
* and potentially deletions. Migrating a primary or n-ary partition
* requires migrating one partition-store for every store hosted at that
* partition.
*
* <br>
*
* Each transition represents the migration of one primary partition (
* {@link #rebalancePerPartitionTransition(int, OrderedClusterTransition)} )
* along with all its side effect ( i.e. migration of replicas + deletions
* ).
*
*
* @param currentCluster The normalized cluster. This cluster contains new
* nodes with empty partitions as well
* @param finalCluster The desired cluster after rebalance
* @param storeDefs Stores to rebalance
*/
private void rebalancePerClusterTransition() {
private void plan() {
// Mapping of stealer node to list of primary partitions being moved
final TreeMultimap<Integer, Integer> stealerToStolenPrimaryPartitions = TreeMultimap.create();

// Various counts for progress bar
int numPrimaryPartitionMoves = 0;
int numPartitionStoreMoves = 0;
int numXZonePartitionStoreMoves = 0;
MoveMap nodeMoveMap = new MoveMap(targetCluster.getNodeIds());
MoveMap zoneMoveMap = new MoveMap(targetCluster.getZoneIds());

// Used for creating clones
ClusterMapper mapper = new ClusterMapper();

@@ -212,17 +208,7 @@ private void rebalancePerClusterTransition() {
batchTargetCluster = mapper.readCluster(new StringReader(mapper.writeCluster(batchFinalCluster)));
}

logger.info("Total number of primary partition moves : " + numPrimaryPartitionMoves);
logger.info("Total number of partition-store moves : " + numPartitionStoreMoves);
logger.info("Total number of cross-zone partition-store moves :"
+ numXZonePartitionStoreMoves);
logger.info("Zone move map (partition-stores):\n"
+ "(zone id) -> (zone id) = # of partition-stores moving from the former zone to the latter\n"
+ zoneMoveMap);
logger.info("Node flow map (partition-stores):\n"
+ "# partitions-stores stealing into -> (node id) -> # of partition-stores donating out of\n"
+ nodeMoveMap.toFlowString());
logger.info(storageOverhead(nodeMoveMap.groupByTo()));
logger.info(this);
}

/**
Expand All @@ -236,7 +222,7 @@ private String storageOverhead(Map<Integer, Integer> finalNodeToOverhead) {
double maxOverhead = Double.MIN_VALUE;
PartitionBalance pb = new PartitionBalance(currentCluster, currentStores);
StringBuilder sb = new StringBuilder();
sb.append("Per-node store-overhead:\n");
sb.append("Per-node store-overhead:").append(Utils.NEWLINE);
DecimalFormat doubleDf = new DecimalFormat("####.##");
for(int nodeId: finalCluster.getNodeIds()) {
Node node = finalCluster.getNodeById(nodeId);
@@ -258,9 +244,52 @@ private String storageOverhead(Map<Integer, Integer> finalNodeToOverhead) {
+ String.format("%6d", toLoad) + " -> "
+ String.format("%6d", initialLoad + toLoad) + " ("
+ doubleDf.format(overhead) + " X)";
sb.append(nodeTag + " : " + loadTag + "\n");
sb.append(nodeTag + " : " + loadTag).append(Utils.NEWLINE);
}
sb.append("\n\tMax per-node storage overhead: " + doubleDf.format(maxOverhead) + " X.\n");
sb.append(Utils.NEWLINE)
.append("**** Max per-node storage overhead: " + doubleDf.format(maxOverhead) + " X.")
.append(Utils.NEWLINE);
return (sb.toString());
}

/**
*
* @return The plan!
*/
public List<RebalanceClusterPlan> getPlan() {
return batchPlans;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
// Dump entire plan batch-by-batch, partition info-by-partition info...
for(RebalanceClusterPlan batchPlan: batchPlans) {
sb.append(batchPlan).append(Utils.NEWLINE);
}
// Dump aggregate stats of the plan
sb.append("Total number of primary partition moves : " + numPrimaryPartitionMoves)
.append(Utils.NEWLINE)
.append("Total number of partition-store moves : " + numPartitionStoreMoves)
.append(Utils.NEWLINE)
.append("Total number of cross-zone partition-store moves :"
+ numXZonePartitionStoreMoves)
.append(Utils.NEWLINE)
.append("Zone move map (partition-stores):")
.append(Utils.NEWLINE)
.append("(zone id) -> (zone id) = # of partition-stores moving from the former zone to the latter")
.append(Utils.NEWLINE)
.append(zoneMoveMap)
.append(Utils.NEWLINE)
.append("Node flow map (partition-stores):")
.append(Utils.NEWLINE)
.append("# partitions-stores stealing into -> (node id) -> # of partition-stores donating out of")
.append(Utils.NEWLINE)
.append(nodeMoveMap.toFlowString())
.append(Utils.NEWLINE)
.append(storageOverhead(nodeMoveMap.groupByTo()))
.append(Utils.NEWLINE);

return sb.toString();
}
}
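
The new plan() javadoc above states that each batch moves no more than batchSize primary partitions, with the remainder spilling into subsequent batches. A simplified, hedged sketch of that chunking (the real plan() also derives intermediate batch clusters between batches, which is omitted here):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hedged sketch: split the ordered list of primary-partition moves into
// consecutive batches of at most batchSize moves each.
public class BatchingSketch {

    static List<List<Integer>> batchPrimaryMoves(List<Integer> primariesToMove, int batchSize) {
        List<List<Integer>> batches = new ArrayList<List<Integer>>();
        for(int start = 0; start < primariesToMove.size(); start += batchSize) {
            int end = Math.min(start + batchSize, primariesToMove.size());
            batches.add(new ArrayList<Integer>(primariesToMove.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // 5 primary moves with batchSize 2 -> prints [[10, 11], [12, 13], [14]]
        System.out.println(batchPrimaryMoves(Arrays.asList(10, 11, 12, 13, 14), 2));
    }
}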
15 changes: 7 additions & 8 deletions src/java/voldemort/tools/RebalancePlanCLI.java
@@ -173,14 +173,13 @@ public static void main(String[] args) throws Exception {
RebalanceClientConfig config = new RebalanceClientConfig();
config.setPrimaryPartitionBatchSize(batchSize);

RebalancePlan rebalancePlanner = new RebalancePlan(currentCluster,
currentStoreDefs,
targetCluster,
targetStoreDefs,
stealerBased,
batchSize,
outputDir);
rebalancePlanner.plan();
new RebalancePlan(currentCluster,
currentStoreDefs,
targetCluster,
targetStoreDefs,
stealerBased,
batchSize,
outputDir);
}

}
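
Since construction now generates the plan, a caller that wants to inspect the result would simply keep the object; a hedged fragment (reusing the locals already parsed in main() above) might read:

// Hypothetical follow-on, not part of this commit: keep the planner around
// so the plan can be retrieved and printed.
RebalancePlan rebalancePlan = new RebalancePlan(currentCluster,
                                                currentStoreDefs,
                                                targetCluster,
                                                targetStoreDefs,
                                                stealerBased,
                                                batchSize,
                                                outputDir);
List<RebalanceClusterPlan> batches = rebalancePlan.getPlan(); // one RebalanceClusterPlan per batch
System.out.println(rebalancePlan); // toString() dumps the batches plus aggregate move statistics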
