Skip to content

Commit

Permalink
RebalanceController uses atomic cluster/store
Browse files Browse the repository at this point in the history
RebalancePlan
RebalanceBatchPlan
- split storeDefs into currentStoreDefs & finalStoreDefs
- clean up some variable naming

*RoutingStrategy
- marked all override methods with @OverRide

RouteToAllStrategy
- Implemented getPartitionList/getReplicatingPartitionList/getMasterPartition
- Use a single imaginary parition id ("0") to implement these methods
- This allows StoreRoutingPlan to be constructed for this strategy

AbstractZonedRebalanceTest
- fix proxy put test
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent 8bbf529 commit d4c314f
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 147 deletions.
83 changes: 53 additions & 30 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlan.java
Expand Up @@ -36,57 +36,73 @@
// target or spec or expanded, and final. 'target' has historically been
// overloaded to mean spec/expanded or final depending on context.
/**
* Constructs a batch plan that goes from targetCluster to finalCluster. The
* Constructs a batch plan that goes from currentCluster to finalCluster. The
* partition-stores included in the move are based on those listed in storeDefs.
* This batch plan is execution-agnostic, i.e., a plan is generated and later
* stealer- versus donor-based execution of that plan is decided.
*/
// TODO: atomic : add current/target store defs here
public class RebalanceBatchPlan {

private final Cluster targetCluster;
private final Cluster currentCluster;
private final List<StoreDefinition> currentStoreDefs;
private final Cluster finalCluster;
private final List<StoreDefinition> storeDefs;
private final List<StoreDefinition> finalStoreDefs;

protected final List<RebalancePartitionsInfo> batchPlan;

/**
* Compares the targetCluster configuration with the desired finalClsuter
* and builds a map of Target node-id to map of source node-ids and
* partitions desired to be stolen/fetched.
* Develops a batch plan to go from current cluster/stores to final
* cluster/stores.
*
* @param targetCluster The current cluster definition
* @param finalCluster The target cluster definition
* @param storeDefList The list of store definitions to rebalance
* @param enabledDeletePartition Delete the RW partition on the donor side
* after rebalance
* @param isStealerBased Do we want to generate the final plan based on the
* stealer node or the donor node?
* @param currentCluster
* @param currentStoreDefs
* @param finalCluster
* @param finalStoreDefs
*/
public RebalanceBatchPlan(final Cluster targetCluster,
public RebalanceBatchPlan(final Cluster currentCluster,
final List<StoreDefinition> currentStoreDefs,
final Cluster finalCluster,
final List<StoreDefinition> storeDefs) {
this.targetCluster = targetCluster;
final List<StoreDefinition> finalStoreDefs) {
this.currentCluster = currentCluster;
this.currentStoreDefs = currentStoreDefs;
this.finalCluster = finalCluster;
this.storeDefs = storeDefs;
RebalanceUtils.validateTargetFinalCluster(targetCluster, finalCluster);
RebalanceUtils.validateClusterStores(targetCluster, storeDefs);
RebalanceUtils.validateClusterStores(finalCluster, storeDefs);
this.finalStoreDefs = finalStoreDefs;
RebalanceUtils.validateCurrentFinalCluster(currentCluster, finalCluster);
RebalanceUtils.validateClusterStores(currentCluster, currentStoreDefs);
RebalanceUtils.validateClusterStores(finalCluster, finalStoreDefs);

this.batchPlan = constructBatchPlan();

}

/**
* Develops a batch plan to go from current cluster to final cluster for
* given stores. (Stores is common for current and final cluster.)
*
* @param currentCluster
* @param finalCluster
* @param commonStoreDefs
*/
public RebalanceBatchPlan(final Cluster currentCluster,
final Cluster finalCluster,
final List<StoreDefinition> commonStoreDefs) {
this(currentCluster, commonStoreDefs, finalCluster, commonStoreDefs);
}

public Cluster getCurrentCluster() {
return targetCluster;
return currentCluster;
}

public List<StoreDefinition> getCurrentStoreDefs() {
return currentStoreDefs;
}

public Cluster getFinalCluster() {
return finalCluster;
}

public List<StoreDefinition> getStoreDefs() {
return storeDefs;
public List<StoreDefinition> getFinalStoreDefs() {
return finalStoreDefs;
}

public List<RebalancePartitionsInfo> getBatchPlan() {
Expand Down Expand Up @@ -115,6 +131,11 @@ public MoveMap getNodeMoveMap() {
return moveMap;
}

/**
* Determines total number of partition-stores moved across zones.
*
* @return number of cross zone partition-store moves
*/
public int getCrossZonePartitionStoreMoves() {
int xzonePartitionStoreMoves = 0;
for(RebalancePartitionsInfo info: batchPlan) {
Expand Down Expand Up @@ -191,7 +212,7 @@ public List<RebalancePartitionsInfo> buildRebalancePartitionsInfos() {
result.add(new RebalancePartitionsInfo(stealerDonor.getFirst(),
stealerDonor.getSecond(),
stealerDonorToStoreToStealPartition.get(stealerDonor),
targetCluster));
currentCluster));
}
return result;
}
Expand All @@ -215,10 +236,12 @@ public List<RebalancePartitionsInfo> buildRebalancePartitionsInfos() {
private List<RebalancePartitionsInfo> constructBatchPlan() {
// Construct all store routing plans once.
HashMap<String, StoreRoutingPlan> targetStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
HashMap<String, StoreRoutingPlan> finalStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
for(StoreDefinition storeDef: storeDefs) {
targetStoreRoutingPlans.put(storeDef.getName(), new StoreRoutingPlan(targetCluster,
for(StoreDefinition storeDef: currentStoreDefs) {
targetStoreRoutingPlans.put(storeDef.getName(), new StoreRoutingPlan(currentCluster,
storeDef));
}
HashMap<String, StoreRoutingPlan> finalStoreRoutingPlans = new HashMap<String, StoreRoutingPlan>();
for(StoreDefinition storeDef: finalStoreDefs) {
finalStoreRoutingPlans.put(storeDef.getName(), new StoreRoutingPlan(finalCluster,
storeDef));
}
Expand All @@ -230,7 +253,7 @@ private List<RebalancePartitionsInfo> constructBatchPlan() {
int stealerNodeId = stealerNode.getId();

// Consider all store definitions ...
for(StoreDefinition storeDef: storeDefs) {
for(StoreDefinition storeDef: finalStoreDefs) {
StoreRoutingPlan targetSRP = targetStoreRoutingPlans.get(storeDef.getName());
StoreRoutingPlan finalSRP = finalStoreRoutingPlans.get(storeDef.getName());
for(int stealerPartitionId: finalSRP.getZoneNAryPartitionIds(stealerNodeId)) {
Expand Down Expand Up @@ -329,7 +352,7 @@ protected int getDonorId(StoreRoutingPlan targetSRP,
} else {
// Steal from zone that hosts primary partition Id.
int targetMasterNodeId = targetSRP.getNodeIdForPartitionId(stealerPartitionId);
donorZoneId = targetCluster.getNodeById(targetMasterNodeId).getZoneId();
donorZoneId = currentCluster.getNodeById(targetMasterNodeId).getZoneId();
}

return targetSRP.getNodeIdForZoneNary(donorZoneId, stealerZoneNAry, stealerPartitionId);
Expand Down
59 changes: 31 additions & 28 deletions src/java/voldemort/client/rebalance/RebalanceController.java
Expand Up @@ -297,10 +297,10 @@ private void batchStatusLog(int batchCount,
* @param batchPlan The batch plan...
*/
private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
// TODO: Add current/final store defs here.
final Cluster batchCurrentCluster = batchPlan.getCurrentCluster();
final List<StoreDefinition> batchCurrentStoreDefs = batchPlan.getCurrentStoreDefs();
final Cluster batchFinalCluster = batchPlan.getFinalCluster();
final List<StoreDefinition> batchStoreDefs = batchPlan.getStoreDefs();
final List<StoreDefinition> batchFinalStoreDefs = batchPlan.getFinalStoreDefs();

try {
final List<RebalancePartitionsInfo> rebalancePartitionsInfoList = batchPlan.getBatchPlan();
Expand All @@ -313,8 +313,8 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
// new cluster xml.
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
batchStoreDefs,
batchStoreDefs,
batchCurrentStoreDefs,
batchFinalStoreDefs,
rebalancePartitionsInfoList,
false,
true,
Expand All @@ -327,9 +327,9 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
RebalanceUtils.printLog(batchCount, logger, "Starting batch " + batchCount + ".");

// Split the store definitions
List<StoreDefinition> readOnlyStoreDefs = StoreDefinitionUtils.filterStores(batchStoreDefs,
List<StoreDefinition> readOnlyStoreDefs = StoreDefinitionUtils.filterStores(batchFinalStoreDefs,
true);
List<StoreDefinition> readWriteStoreDefs = StoreDefinitionUtils.filterStores(batchStoreDefs,
List<StoreDefinition> readWriteStoreDefs = StoreDefinitionUtils.filterStores(batchFinalStoreDefs,
false);
boolean hasReadOnlyStores = readOnlyStoreDefs != null && readOnlyStoreDefs.size() > 0;
boolean hasReadWriteStores = readWriteStoreDefs != null
Expand All @@ -342,8 +342,9 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {

rebalanceStateChange(batchCount,
batchCurrentCluster,
batchCurrentStoreDefs,
batchFinalCluster,
batchStoreDefs,
batchFinalStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
Expand All @@ -353,7 +354,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
if(hasReadOnlyStores) {
executeSubBatch(batchCount,
batchCurrentCluster,
batchStoreDefs,
batchCurrentStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
Expand All @@ -367,8 +368,9 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {

rebalanceStateChange(batchCount,
batchCurrentCluster,
batchCurrentStoreDefs,
batchFinalCluster,
batchStoreDefs,
batchFinalStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
Expand All @@ -378,7 +380,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
if(hasReadWriteStores) {
executeSubBatch(batchCount,
batchCurrentCluster,
batchStoreDefs,
batchCurrentStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
Expand Down Expand Up @@ -425,8 +427,9 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
*/
private void rebalanceStateChange(final int taskId,
Cluster batchCurrentCluster,
List<StoreDefinition> batchCurrentStoreDefs,
Cluster batchFinalCluster,
List<StoreDefinition> batchStoreDefs,
List<StoreDefinition> batchFinalStoreDefs,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean hasReadOnlyStores,
boolean hasReadWriteStores,
Expand All @@ -447,8 +450,8 @@ private void rebalanceStateChange(final int taskId,
"Cluster metadata change + rebalance state change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
batchStoreDefs,
batchStoreDefs,
batchCurrentStoreDefs,
batchFinalStoreDefs,
rebalancePartitionPlanList,
false,
true,
Expand All @@ -460,8 +463,8 @@ private void rebalanceStateChange(final int taskId,
RebalanceUtils.printLog(taskId, logger, "Rebalance state change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
batchStoreDefs,
batchStoreDefs,
batchCurrentStoreDefs,
batchFinalStoreDefs,
rebalancePartitionPlanList,
false,
false,
Expand All @@ -473,8 +476,8 @@ private void rebalanceStateChange(final int taskId,
RebalanceUtils.printLog(taskId, logger, "Swap + Cluster metadata change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
batchStoreDefs,
batchStoreDefs,
batchCurrentStoreDefs,
batchFinalStoreDefs,
rebalancePartitionPlanList,
true,
true,
Expand All @@ -488,8 +491,8 @@ private void rebalanceStateChange(final int taskId,
"Swap + Cluster metadata change + rebalance state change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
batchStoreDefs,
batchStoreDefs,
batchCurrentStoreDefs,
batchFinalStoreDefs,
rebalancePartitionPlanList,
true,
true,
Expand Down Expand Up @@ -534,16 +537,16 @@ private void rebalanceStateChange(final int taskId,
* </pre>
*
* @param taskId Rebalance task id
* @param batchCurrentCluster Cluster to rollback to if we have a problem
* @param batchRollbackCluster Cluster to rollback to if we have a problem
* @param rebalancePartitionPlanList The list of rebalance partition plans
* @param hasReadOnlyStores Are we rebalancing any read-only stores?
* @param hasReadWriteStores Are we rebalancing any read-write stores?
* @param finishedReadOnlyStores Have we finished rebalancing of read-only
* stores?
*/
private void executeSubBatch(final int taskId,
final Cluster batchCurrentCluster,
final List<StoreDefinition> batchStoreDefs,
final Cluster batchRollbackCluster,
final List<StoreDefinition> batchRollbackStoreDefs,
final List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean hasReadOnlyStores,
boolean hasReadWriteStores,
Expand All @@ -558,8 +561,8 @@ private void executeSubBatch(final int taskId,
final List<RebalanceTask> incompleteTasks = Lists.newArrayList();

// Semaphores for donor nodes - To avoid multiple disk sweeps
Semaphore[] donorPermits = new Semaphore[batchCurrentCluster.getNumberOfNodes()];
for(Node node: batchCurrentCluster.getNodes()) {
Semaphore[] donorPermits = new Semaphore[batchRollbackCluster.getNumberOfNodes()];
for(Node node: batchRollbackCluster.getNodes()) {
donorPermits[node.getId()] = new Semaphore(1);
}

Expand Down Expand Up @@ -617,9 +620,9 @@ private void executeSubBatch(final int taskId,
if(hasReadOnlyStores && hasReadWriteStores && finishedReadOnlyStores) {
// Case 0
adminClient.rebalanceOps.rebalanceStateChange(null,
batchCurrentCluster,
batchRollbackCluster,
null,
batchStoreDefs,
batchRollbackStoreDefs,
null,
true,
true,
Expand All @@ -629,9 +632,9 @@ private void executeSubBatch(final int taskId,
} else if(hasReadWriteStores && finishedReadOnlyStores) {
// Case 4
adminClient.rebalanceOps.rebalanceStateChange(null,
batchCurrentCluster,
batchRollbackCluster,
null,
batchStoreDefs,
batchRollbackStoreDefs,
null,
false,
true,
Expand Down

0 comments on commit d4c314f

Please sign in to comment.