
Fixes after rebase/dropped propagateCluster method
After the rebase onto the atomic cluster/stores update branch, there was
some cleanup to do. Got the code running and unit tests passing again. Note
that VAdminProto.java had to be regenerated after the merge.

Noted that RebalanceUtils.propagateCluster was unnecessary (and
potentially dangerous) and so dropped it from the code.
jayjwylie committed Jun 20, 2013
1 parent c228e5f commit 8bbf529
Showing 3 changed files with 37 additions and 92 deletions.
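For orientation before the per-file diffs: after this change, RebalanceController no longer pushes cluster metadata to all nodes up front. A minimal sketch of the resulting control flow, paraphrased from the RebalanceController diff below rather than copied verbatim:

    // Sketch of the new rebalance() flow after this commit: no cluster-wide
    // metadata push up front; cluster (and, per the TODOs, eventually store)
    // metadata now travels with each batch via rebalanceStateChange(...).
    public void rebalance(final RebalancePlan rebalancePlan) {
        validateRebalancePlanState(rebalancePlan);  // plan is internally consistent
        validateClusterForRebalance(rebalancePlan); // servers NORMAL, RO stores at v2
        executePlan(rebalancePlan);                 // per-batch state change + data movement
    }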
1 change: 1 addition & 0 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlan.java
@@ -41,6 +41,7 @@
* This batch plan is execution-agnostic, i.e., a plan is generated and later
* stealer- versus donor-based execution of that plan is decided.
*/
+ // TODO: atomic : add current/target store defs here
public class RebalanceBatchPlan {

private final Cluster targetCluster;
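The TODO above suggests RebalanceBatchPlan will eventually carry current and target store definitions alongside the two clusters. A hypothetical sketch of that shape, with field and accessor names that are illustrative only (they are not part of this commit):

    import java.util.List;

    import voldemort.cluster.Cluster;
    import voldemort.store.StoreDefinition;

    // Hypothetical shape of RebalanceBatchPlan once the TODO is addressed;
    // the store-def fields and accessors are assumptions, not committed API.
    public class RebalanceBatchPlan {

        private final Cluster currentCluster;
        private final Cluster targetCluster;
        private final List<StoreDefinition> currentStoreDefs; // assumed new field
        private final List<StoreDefinition> targetStoreDefs;  // assumed new field

        public RebalanceBatchPlan(Cluster currentCluster,
                                  Cluster targetCluster,
                                  List<StoreDefinition> currentStoreDefs,
                                  List<StoreDefinition> targetStoreDefs) {
            this.currentCluster = currentCluster;
            this.targetCluster = targetCluster;
            this.currentStoreDefs = currentStoreDefs;
            this.targetStoreDefs = targetStoreDefs;
        }

        public List<StoreDefinition> getCurrentStoreDefs() {
            return currentStoreDefs;
        }

        public List<StoreDefinition> getTargetStoreDefs() {
            return targetStoreDefs;
        }
    }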
53 changes: 36 additions & 17 deletions src/java/voldemort/client/rebalance/RebalanceController.java
@@ -148,18 +148,14 @@ public RebalancePlan getPlan(Cluster finalCluster, int batchSize) {
return getPlan(finalCluster, currentStoreDefs, batchSize);
}

/**
* Validate the plan, validate the cluster, and then kick off rebalance.
*
* @param rebalancePlan
*/
public void rebalance(final RebalancePlan rebalancePlan) {
- Cluster finalCluster = rebalancePlan.getFinalCluster();
- List<StoreDefinition> finalStores = rebalancePlan.getFinalStores();
- validatePlan(rebalancePlan);
- prepareForRebalance(finalCluster, finalStores);
- logger.info("Propagating cluster " + finalCluster + " to all nodes");
- // TODO: (atomic cluster/stores update) Add finalStores here so that
- // cluster & stores can be updated atomically. Need to rebase first.
- RebalanceUtils.propagateCluster(adminClient, finalCluster);
+ validateRebalancePlanState(rebalancePlan);
+ validateClusterForRebalance(rebalancePlan);
+ executePlan(rebalancePlan);
}

@@ -168,8 +164,8 @@ public void rebalance(final RebalancePlan rebalancePlan) {
*
* @param rebalancePlan
*/
- private void validatePlan(RebalancePlan rebalancePlan) {
- logger.info("Validating plan state.");
+ private void validateRebalancePlanState(RebalancePlan rebalancePlan) {
+ logger.info("Validating rebalance plan state.");

Cluster currentCluster = rebalancePlan.getCurrentCluster();
List<StoreDefinition> currentStores = rebalancePlan.getCurrentStores();
@@ -194,15 +190,17 @@ private void validatePlan(RebalancePlan rebalancePlan) {
* @param finalCluster
* @param finalStores
*/
- private void prepareForRebalance(Cluster finalCluster, List<StoreDefinition> finalStores) {
- logger.info("Validating state of deployed cluster.");
+ private void validateClusterForRebalance(RebalancePlan rebalancePlan) {
+ logger.info("Validating state of deployed cluster to confirm its ready for rebalance.");
+ Cluster finalCluster = rebalancePlan.getFinalCluster();
+ List<StoreDefinition> finalStoreDefs = rebalancePlan.getFinalStores();

// Reset the cluster that the admin client points at
adminClient.setAdminClientCluster(finalCluster);
// Validate that all the nodes ( new + old ) are in normal state
RebalanceUtils.checkEachServerInNormalState(finalCluster, adminClient);
// Verify all old RO stores exist at version 2
- RebalanceUtils.validateReadOnlyStores(finalCluster, finalStores, adminClient);
+ RebalanceUtils.validateReadOnlyStores(finalCluster, finalStoreDefs, adminClient);
}

/**
@@ -212,7 +210,7 @@ private void prepareForRebalance(Cluster finalCluster, List<StoreDefinition> fin
* @param rebalancePlan
*/
private void executePlan(RebalancePlan rebalancePlan) {
logger.info("Starting rebalancing!");
logger.info("Starting to execute rebalance Plan!");

int batchCount = 0;
int partitionStoreCount = 0;
@@ -299,6 +297,7 @@ private void batchStatusLog(int batchCount,
* @param batchPlan The batch plan...
*/
private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
+ // TODO: Add current/final store defs here.
final Cluster batchCurrentCluster = batchPlan.getCurrentCluster();
final Cluster batchFinalCluster = batchPlan.getFinalCluster();
final List<StoreDefinition> batchStoreDefs = batchPlan.getStoreDefs();
@@ -314,6 +313,8 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
// new cluster xml.
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
+ batchStoreDefs,
+ batchStoreDefs,
rebalancePartitionsInfoList,
false,
true,
@@ -342,6 +343,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
rebalanceStateChange(batchCount,
batchCurrentCluster,
batchFinalCluster,
+ batchStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
@@ -351,6 +353,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
if(hasReadOnlyStores) {
executeSubBatch(batchCount,
batchCurrentCluster,
+ batchStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
@@ -365,6 +368,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
rebalanceStateChange(batchCount,
batchCurrentCluster,
batchFinalCluster,
+ batchStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
@@ -374,6 +378,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
if(hasReadWriteStores) {
executeSubBatch(batchCount,
batchCurrentCluster,
+ batchStoreDefs,
filteredRebalancePartitionPlanList,
hasReadOnlyStores,
hasReadWriteStores,
@@ -421,6 +426,7 @@ private void executeBatch(int batchCount, final RebalanceBatchPlan batchPlan) {
private void rebalanceStateChange(final int taskId,
Cluster batchCurrentCluster,
Cluster batchFinalCluster,
+ List<StoreDefinition> batchStoreDefs,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean hasReadOnlyStores,
boolean hasReadWriteStores,
@@ -441,6 +447,8 @@ private void rebalanceStateChange(final int taskId,
"Cluster metadata change + rebalance state change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
+ batchStoreDefs,
+ batchStoreDefs,
rebalancePartitionPlanList,
false,
true,
@@ -452,6 +460,8 @@ private void rebalanceStateChange(final int taskId,
RebalanceUtils.printLog(taskId, logger, "Rebalance state change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
+ batchStoreDefs,
+ batchStoreDefs,
rebalancePartitionPlanList,
false,
false,
@@ -463,6 +473,8 @@ private void rebalanceStateChange(final int taskId,
RebalanceUtils.printLog(taskId, logger, "Swap + Cluster metadata change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
+ batchStoreDefs,
+ batchStoreDefs,
rebalancePartitionPlanList,
true,
true,
@@ -476,6 +488,8 @@ private void rebalanceStateChange(final int taskId,
"Swap + Cluster metadata change + rebalance state change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
+ batchStoreDefs,
+ batchStoreDefs,
rebalancePartitionPlanList,
true,
true,
@@ -529,6 +543,7 @@ private void rebalanceStateChange(final int taskId,
*/
private void executeSubBatch(final int taskId,
final Cluster batchCurrentCluster,
+ final List<StoreDefinition> batchStoreDefs,
final List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean hasReadOnlyStores,
boolean hasReadWriteStores,
@@ -604,6 +619,8 @@ private void executeSubBatch(final int taskId,
adminClient.rebalanceOps.rebalanceStateChange(null,
batchCurrentCluster,
null,
+ batchStoreDefs,
+ null,
true,
true,
false,
@@ -614,6 +631,8 @@ private void executeSubBatch(final int taskId,
adminClient.rebalanceOps.rebalanceStateChange(null,
batchCurrentCluster,
null,
+ batchStoreDefs,
+ null,
false,
true,
false,
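Note that every updated rebalanceStateChange(...) call above passes batchStoreDefs in both the current and final store-definition slots: the batch plan tracks only one store-def list so far (see the TODO in executeBatch), so the two arguments are necessarily identical in this commit. An illustrative fragment of how a call site could change once the two lists are tracked separately, with accessor names that are assumptions rather than committed API:

    // Illustrative only (not committed code): with separate current/final
    // store defs on the batch plan, the duplicated batchStoreDefs argument
    // in each rebalanceStateChange(...) call would split into two lists.
    final List<StoreDefinition> batchCurrentStoreDefs = batchPlan.getCurrentStoreDefs(); // hypothetical accessor
    final List<StoreDefinition> batchFinalStoreDefs = batchPlan.getFinalStoreDefs();     // hypothetical accessor
    // ...then pass batchCurrentStoreDefs/batchFinalStoreDefs where
    // batchStoreDefs currently appears twice in the calls above.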
75 changes: 0 additions & 75 deletions src/java/voldemort/utils/RebalanceUtils.java
@@ -530,81 +530,6 @@ public static Cluster createUpdatedCluster(Cluster currentCluster,
return updatedCluster;
}

- /**
- * Attempt to propagate a cluster definition to all nodes. Also rollback is
- * in place in case one of them fails
- *
- * @param adminClient {@link voldemort.client.protocol.admin.AdminClient}
- *        instance to use.
- * @param cluster Cluster definition to propagate
- */
- public static void propagateCluster(AdminClient adminClient, Cluster cluster) {
-
- // Contains a mapping of node id to the existing cluster definition
- HashMap<Integer, Cluster> currentClusters = Maps.newHashMap();
-
- Versioned<Cluster> latestCluster = new Versioned<Cluster>(cluster);
- ArrayList<Versioned<Cluster>> clusterList = new ArrayList<Versioned<Cluster>>();
- clusterList.add(latestCluster);
-
- for(Node node: cluster.getNodes()) {
- try {
- Versioned<Cluster> versionedCluster = adminClient.metadataMgmtOps.getRemoteCluster(node.getId());
- VectorClock newClock = (VectorClock) versionedCluster.getVersion();
-
- // Update the current cluster information
- currentClusters.put(node.getId(), versionedCluster.getValue());
-
- if(null != newClock && !clusterList.contains(versionedCluster)) {
- // check no two clocks are concurrent.
- checkNotConcurrent(clusterList, newClock);
-
- // add to clock list
- clusterList.add(versionedCluster);
-
- // update latestClock
- Occurred occurred = newClock.compare(latestCluster.getVersion());
- if(Occurred.AFTER.equals(occurred))
- latestCluster = versionedCluster;
- }
-
- } catch(Exception e) {
- throw new VoldemortException("Failed to get cluster version from node "
- + node.getId(), e);
- }
- }
-
- // Vector clock to propagate
- VectorClock latestClock = ((VectorClock) latestCluster.getVersion()).incremented(0,
- System.currentTimeMillis());
-
- // Alright, now try updating the values...
- Set<Integer> completedNodeIds = Sets.newHashSet();
- try {
- for(Node node: cluster.getNodes()) {
- logger.info("Updating cluster definition on remote node " + node);
- adminClient.metadataMgmtOps.updateRemoteCluster(node.getId(), cluster, latestClock);
- logger.info("Updated cluster definition " + cluster + " on remote node "
- + node.getId());
- completedNodeIds.add(node.getId());
- }
- } catch(VoldemortException e) {
- // Fail early...
- for(Integer completedNodeId: completedNodeIds) {
- try {
- adminClient.metadataMgmtOps.updateRemoteCluster(completedNodeId,
- currentClusters.get(completedNodeId),
- latestClock);
- } catch(VoldemortException exception) {
- logger.error("Could not revert cluster metadata back on node "
- + completedNodeId);
- }
- }
- throw e;
- }
-
- }

/**
* For a particular stealer node find all the "primary" <replica, partition>
* tuples it will steal. In other words, expect the "replica" part to be 0
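Why "potentially dangerous": the deleted loop incremented a single vector clock and wrote the new cluster to each node in turn, and its rollback reused that same incremented clock while merely logging any revert failure. A sketch of the failure window this leaves open, assuming a two-node cluster for illustration (the calls are taken from the deleted method; the scenario and the newCluster/oldClusterForNode0 variables are hypothetical):

    // If the second update throws, the catch block reverts node 0 with the
    // already-incremented latestClock; if that revert also fails it is only
    // logged, leaving node 0 on the new cluster.xml and node 1 on the old
    // one, with stores.xml never updated in step with either.
    adminClient.metadataMgmtOps.updateRemoteCluster(0, newCluster, latestClock); // succeeds
    adminClient.metadataMgmtOps.updateRemoteCluster(1, newCluster, latestClock); // throws VoldemortException
    adminClient.metadataMgmtOps.updateRemoteCluster(0, oldClusterForNode0, latestClock); // best-effort revert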
