Skip to content

Commit

Permalink
Address review comments for RebalancePlan
Browse files Browse the repository at this point in the history
RebalanceBatchPlan
- factored logic to decide which donor to steal from out of constructBatchPlan.
- huge header comment about other policies to consider

RebalanceController
- dropped comment about historic (arbitrary) sleep command

RebalancePlan
- added helper hacky method to "clone" cluster

RebalanceCluster
- TODO to add a .clone() method

StoreRoutingPlan
- cleaned up javadoc, TODOs, and method names

DonorBasedRebalancePusherSlave
- set sleep to 30 seconds. Made comments and logger messages consistent with code.

StorageEngine
- cleaned up javadoc for isPartitionAware()

RebalanceUtils
- fix validation method to use safe(r) comparison.

RebalanceBatchPlanTest
- fixed tests to (mostly) only test plan invariants. Prior test code focussed on exact plan details and so was hard-coded to the implementation, rather than the interface.
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 4fdeaf2 commit 5244216
Show file tree
Hide file tree
Showing 16 changed files with 383 additions and 558 deletions.
124 changes: 89 additions & 35 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlan.java
Expand Up @@ -72,7 +72,7 @@ public RebalanceBatchPlan(final Cluster targetCluster,
RebalanceUtils.validateClusterStores(targetCluster, storeDefs);
RebalanceUtils.validateClusterStores(finalCluster, storeDefs);

this.batchPlan = batchPlan();
this.batchPlan = constructBatchPlan();

}

Expand Down Expand Up @@ -202,23 +202,16 @@ public List<RebalancePartitionsInfo> buildRebalancePartitionsInfos() {
*
* 1) A stealer node does not steal any partition-stores it already hosts.
*
* 2) If possible, a stealer node that is the n-ary zone replica in the
* finalCluster steals from the n-ary zone replica in the targetCluster in
* the same zone.
* 2) Use current policy to decide which node to steal from: see getDonorId
* method.
*
* 3) If there are no partitoin-stores to steal in the same zone (i.e., this
* is the "zone expansion" use case), then the stealer node that is the
* n-ary zone replica in the finalCluster determines which pre-existing zone
* in the targetCluster hosts the primary partitionId for the
* partition-store and steals the n-ary zone replica from that zone.
*
* In summary, this batch plan avoids all unnecessary cross zone moves,
* Currently, this batch plan avoids all unnecessary cross zone moves,
* distributes cross zone moves into new zones evenly across existing zones,
* and copies replicaFactor partition-stores into any new zone.
*
* @return the batch plan
*/
private List<RebalancePartitionsInfo> batchPlan() {
private List<RebalancePartitionsInfo> constructBatchPlan() {
// Construct all store routing plans once.
HashMap<String, StoreRoutingPlan> targetStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
HashMap<String, StoreRoutingPlan> finalStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
Expand All @@ -239,36 +232,26 @@ private List<RebalancePartitionsInfo> batchPlan() {
for(StoreDefinition storeDef: storeDefs) {
StoreRoutingPlan targetSRP = targetStoreRoutingPlans.get(storeDef.getName());
StoreRoutingPlan finalSRP = finalStoreRoutingPlans.get(storeDef.getName());
for(int stealerPartitionId: finalSRP.getNaryPartitionIds(stealerNodeId)) {
// ... and all nary partition-stores
// steal what is needed!
for(int stealerPartitionId: finalSRP.getZoneNAryPartitionIds(stealerNodeId)) {
// ... and all nary partition-stores,
// now steal what is needed

// Do not steal a partition-store you host
// Optimization: Do not steal a partition-store you already
// host!
if(targetSRP.getReplicationNodeList(stealerPartitionId).contains(stealerNodeId)) {
continue;
}

int stealerZoneReplicaType = finalSRP.getZoneReplicaType(stealerZoneId,
stealerNodeId,
stealerPartitionId);

int donorZoneId;
if(targetSRP.hasZoneReplicaType(stealerZoneId, stealerPartitionId)) {
// Steal from local n-ary (since one exists).
donorZoneId = stealerZoneId;
} else {
// Steal from zone that hosts primary partition Id.
// TODO: Add option to steal from specific
// donorZoneId.
int targetMasterNodeId = targetSRP.getNodeIdForPartitionId(stealerPartitionId);
donorZoneId = targetCluster.getNodeById(targetMasterNodeId).getZoneId();
}
// Determine which node to steal from.
int donorNodeId = getDonorId(targetSRP,
finalSRP,
stealerZoneId,
stealerNodeId,
stealerPartitionId);

int donorNodeId = targetSRP.getZoneReplicaNodeId(donorZoneId,
stealerZoneReplicaType,
stealerPartitionId);
// Add this specific partition-store steal to the overall
// plan
int donorReplicaType = targetSRP.getReplicaType(donorNodeId, stealerPartitionId);

rpiBuilder.addPartitionStoreMove(stealerNodeId,
donorNodeId,
storeDef.getName(),
Expand All @@ -281,6 +264,77 @@ private List<RebalancePartitionsInfo> batchPlan() {
return rpiBuilder.buildRebalancePartitionsInfos();
}

/**
* Decide which donor node to steal from. This is a policy implementation.
* I.e., in the future, additional policies could be considered. At that
* time, this method should be overridden in a sub-class, or a policy object
* ought to implement this algorithm.
*
* Current policy:
*
* 1) If possible, a stealer node that is the zone n-ary in the finalCluster
* steals from the zone n-ary in the targetCluster in the same zone.
*
* 2) If there are no partition-stores to steal in the same zone (i.e., this
* is the "zone expansion" use case), then a different policy must be used.
* The stealer node that is the zone n-ary in the finalCluster determines
* which pre-existing zone in the targetCluster hosts the primary partition
* id for the partition-store. The stealer then steals the zone n-ary from
* that pre-existing zone.
*
* This policy avoids unnecessary cross-zone moves and distributes the load
* of cross-zone moves approximately-uniformly across pre-existing zones.
*
* Other policies to consider:
*
* - For zone expansion, steal all partition-stores from one specific
* pre-existing zone.
*
* - Replace heuristic to approximately uniformly distribute load among
* existing zones to something more concrete (i.e. track steals from each
* pre-existing zone and forcibly balance them).
*
* - Select a single donor for all replicas in a new zone. This will require
* donor-based rebalancing to be run (at least for this specific part of the
* plan). This would reduce the number of donor-side scans of data. (But
* still send replication factor copies over the WAN.) This would require
* apparatus in the RebalanceController to work.
*
* - Set up some sort of chain-replication in which a single stealer in the
* new zone steals some replica from a pre-existing zone, and then other
* n-aries in the new zone steal from the single cross-zone stealer in the
* zone. This would require apparatus in the RebalanceController to work.
*
* @param targetSRP
* @param finalSRP
* @param stealerZoneId
* @param stealerNodeId
* @param stealerPartitionId
* @return the node id of the donor for this partition Id.
*/
protected int getDonorId(StoreRoutingPlan targetSRP,
                         StoreRoutingPlan finalSRP,
                         int stealerZoneId,
                         int stealerNodeId,
                         int stealerPartitionId) {
    // Step 1: choose the zone that will act as donor.
    final int sourceZoneId;
    if(!targetSRP.zoneHasReplica(stealerZoneId, stealerPartitionId)) {
        // The stealer's zone has no replica in the target plan, so use the
        // zone of the node that the target plan maps this partition id to.
        final int primaryOwnerNodeId = targetSRP.getNodeIdForPartitionId(stealerPartitionId);
        sourceZoneId = targetCluster.getNodeById(primaryOwnerNodeId).getZoneId();
    } else {
        // A replica exists in the stealer's own zone; steal locally.
        sourceZoneId = stealerZoneId;
    }

    // Step 2: look up the stealer's zone n-ary in the final plan, then
    // return the node holding that same zone n-ary in the donor zone of
    // the target plan.
    final int zoneNAry = finalSRP.getZoneNaryForNodesPartition(stealerZoneId,
                                                               stealerNodeId,
                                                               stealerPartitionId);
    return targetSRP.getNodeIdForZoneNary(sourceZoneId, zoneNAry, stealerPartitionId);
}

@Override
public String toString() {
if(batchPlan == null || batchPlan.isEmpty()) {
Expand Down
7 changes: 1 addition & 6 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -184,7 +184,7 @@ private void validateCluster(Cluster finalCluster, List<StoreDefinition> finalSt
// Reset the cluster that the admin client points at
adminClient.setAdminClientCluster(finalCluster);
// Validate that all the nodes ( new + old ) are in normal state
RebalanceUtils.validateProdClusterStateIsNormal(finalCluster, adminClient);
RebalanceUtils.checkEachServerInNormalState(finalCluster, adminClient);
// Verify all old RO stores exist at version 2
RebalanceUtils.validateReadOnlyStores(finalCluster, finalStores, adminClient);
}
Expand Down Expand Up @@ -643,11 +643,6 @@ private List<RebalanceTask> executeTasks(final int taskId,
HashMap<Integer, List<RebalancePartitionsInfo>> donorNodeBasedPartitionsInfo = RebalanceUtils.groupPartitionsInfoByNode(rebalancePartitionPlanList,
false);
for(Entry<Integer, List<RebalancePartitionsInfo>> entries: donorNodeBasedPartitionsInfo.entrySet()) {
// At some point, a 10 second sleep was added here to help with
// a race condition. Leaving this comment here in case, at some
// point in the future, we need to hack around some race
// condition:
// Thread.sleep(10000);
DonorBasedRebalanceTask rebalanceTask = new DonorBasedRebalanceTask(taskId,
entries.getValue(),
rebalancingClientTimeoutSeconds,
Expand Down
23 changes: 16 additions & 7 deletions src/java/voldemort/client/rebalance/RebalancePlan.java
Expand Up @@ -127,7 +127,7 @@ public RebalancePlan(final Cluster currentCluster,

/**
* Create a plan. The plan consists of batches. Each batch involves the
* movement of nor more than batchSize primary partitions. The movement of a
* movement of no more than batchSize primary partitions. The movement of a
* single primary partition may require migration of other n-ary replicas,
* and potentially deletions. Migrating a primary or n-ary partition
* requires migrating one partition-store for every store hosted at that
Expand All @@ -138,9 +138,6 @@ private void plan() {
// Mapping of stealer node to list of primary partitions being moved
final TreeMultimap<Integer, Integer> stealerToStolenPrimaryPartitions = TreeMultimap.create();

// Used for creating clones
ClusterMapper mapper = new ClusterMapper();

// Output initial and final cluster
if(outputDir != null)
RebalanceUtils.dumpClusters(targetCluster, finalCluster, outputDir);
Expand All @@ -159,11 +156,11 @@ private void plan() {

// Determine plan batch-by-batch
int batches = 0;
Cluster batchTargetCluster = mapper.readCluster(new StringReader(mapper.writeCluster(targetCluster)));
Cluster batchTargetCluster = cloneCluster(targetCluster);
while(!stealerToStolenPrimaryPartitions.isEmpty()) {

// Generate a batch of partitions to move
Cluster batchFinalCluster = mapper.readCluster(new StringReader(mapper.writeCluster(batchTargetCluster)));
Cluster batchFinalCluster = cloneCluster(batchTargetCluster);
int partitions = 0;
List<Entry<Integer, Integer>> partitionsMoved = Lists.newArrayList();
for(Entry<Integer, Integer> stealerToPartition: stealerToStolenPrimaryPartitions.entries()) {
Expand Down Expand Up @@ -202,12 +199,24 @@ private void plan() {
zoneMoveMap.add(RebalanceBatchPlan.getZoneMoveMap());

batches++;
batchTargetCluster = mapper.readCluster(new StringReader(mapper.writeCluster(batchFinalCluster)));
batchTargetCluster = cloneCluster(batchFinalCluster);
}

logger.info(this);
}

/**
* In the absence of a Cluster.clone() operation, hack a clone method for
* local use.
*
* @param cluster
* @return clone of Cluster cluster.
*/
private Cluster cloneCluster(Cluster cluster) {
    // Round-trip through the mapper: write the cluster out as text, then
    // read that text back in to obtain a separate Cluster object.
    final ClusterMapper clusterMapper = new ClusterMapper();
    final String serialized = clusterMapper.writeCluster(cluster);
    final StringReader serializedReader = new StringReader(serialized);
    return clusterMapper.readCluster(serializedReader);
}

/**
* Determines storage overhead and returns pretty printed summary.
*
Expand Down
Expand Up @@ -33,6 +33,7 @@ public DonorBasedRebalanceTask(final int taskId,
this.donorNodeId = stealInfos.get(0).getDonorId();
}

@Override
public void run() {
int rebalanceAsyncId = INVALID_REBALANCE_ID;

Expand Down
4 changes: 4 additions & 0 deletions src/java/voldemort/cluster/Cluster.java
Expand Up @@ -259,6 +259,10 @@ public String toString(boolean isDetailed) {
return builder.toString();
}

// TODO: Add a .clone() implementation. See hacked method in
// RebalancePlan.cloneCluster for example of current approach to cloning
// (use ClusterMapper to serde via XML...)

@Override
public boolean equals(Object second) {
if(this == second)
Expand Down

0 comments on commit 5244216

Please sign in to comment.