Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: voldemort/voldemort
base: fd_integration_test
...
head fork: voldemort/voldemort
compare: release-1.4.2
  • 8 commits
  • 10 files changed
  • 0 commit comments
  • 2 contributors
Commits on Jun 28, 2013
Jay Wylie jayjwylie Tweaks to PartitionBalance and RebalancePlan
PartitionBalance
- calculate partition-stores per zone.
- This measure provides more context to evaluate the size of any plans to rebalance the cluster.

RebalancePlanCLI
- fix typo in verbose usage message
a26a558
Jay Wylie jayjwylie Initial hack at new rebalance scheduler.
Added RebalanceController.scheduler
- limits each node to participating in a single task as either a stealer or donor.
- randomizes the order in which tasks are attempted to be scheduled
- not a clean implementation, but enough to evaluate.
34e849e
Jay Wylie jayjwylie Tweak new rebalance scheduler
Randomized the order of rebalance tasks in each stealer's list. This
will avoid biasing the rebalance based on the order tasks were
generated.
d7356df
Jay Wylie jayjwylie Addressed review feedback on rebalance scheduler
RebalanceController
- Changed default parallelism to "infinite" since scheduler throttles parallelism
- Added TODOs for cleanup of scheduler
- Added javadoc to document scheduler and its methods
- Catch exceptions, log them, re-throw as VoldemortRebalancingException

A ton of white space changes due to futzing with eclipse code formatter preferences. Sorry.
f8765a3
Jay Wylie jayjwylie Refactor StoreRoutingPlan. Break Base from rest to expedite construction.
606f6fa
Jay Wylie jayjwylie Adding BaseStoreRoutingPlan. bf7daad
Jay Wylie jayjwylie Making StoreRoutingPlan even lighter-weight 5459942
vinoth chandar vinothchandar Update release_notes and build.properties for 1.4.2 release 4a95638
2  build.properties
View
@@ -42,5 +42,5 @@ tomcat.context=/voldemort
javac.version=1.5
## Release
-curr.release=1.4.0
+curr.release=1.4.2
3  release_notes.txt
View
@@ -1,6 +1,9 @@
Note: Server changes are not backwards compatible. To use new
rebalancing tooling, servers must be upgraded before hand.
+Release 1.4.2 on 06/27/2013
+* Fixing costly StoreRoutingPlan object construction
+
Release 1.4.0 on 06/21/2013
* Zone expansion release
445 src/java/voldemort/client/rebalance/RebalanceController.java
View
@@ -17,10 +17,17 @@
package voldemort.client.rebalance;
import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -66,7 +73,7 @@
private final Cluster currentCluster;
private final List<StoreDefinition> currentStoreDefs;
- public final static int MAX_PARALLEL_REBALANCING = 1;
+ public final static int MAX_PARALLEL_REBALANCING = Integer.MAX_VALUE;
public final static int MAX_TRIES_REBALANCING = 2;
public final static boolean STEALER_BASED_REBALANCING = true;
public final static long PROXY_PAUSE_IN_SECONDS = TimeUnit.MINUTES.toSeconds(5);
@@ -117,7 +124,8 @@ public RebalanceController(String bootstrapUrl,
* the current cluster/stores & configuration of the RebalanceController.
*
* @param finalCluster
- * @param finalStoreDefs Needed for zone expansion/shrinking.
+ * @param finalStoreDefs
+ * Needed for zone expansion/shrinking.
* @param batchSize
* @return
*/
@@ -203,7 +211,9 @@ private void validateClusterForRebalance(RebalancePlan rebalancePlan) {
// Validate that all the nodes ( new + old ) are in normal state
RebalanceUtils.checkEachServerInNormalState(finalCluster, adminClient);
// Verify all old RO stores exist at version 2
- RebalanceUtils.validateReadOnlyStores(finalCluster, finalStoreDefs, adminClient);
+ RebalanceUtils.validateReadOnlyStores(finalCluster,
+ finalStoreDefs,
+ adminClient);
}
/**
@@ -223,9 +233,12 @@ private void executePlan(RebalancePlan rebalancePlan) {
int numBatches = entirePlan.size();
int numPartitionStores = rebalancePlan.getPartitionStoresMoved();
- for(RebalanceBatchPlan batchPlan: entirePlan) {
- logger.info("======== REBALANCING BATCH " + (batchCount + 1) + " ========");
- RebalanceUtils.printBatchLog(batchCount, logger, batchPlan.toString());
+ for (RebalanceBatchPlan batchPlan : entirePlan) {
+ logger.info("======== REBALANCING BATCH " + (batchCount + 1)
+ + " ========");
+ RebalanceUtils.printBatchLog(batchCount,
+ logger,
+ batchPlan.toString());
long startTimeMs = System.currentTimeMillis();
// ACTUALLY DO A BATCH OF REBALANCING!
@@ -246,11 +259,16 @@ private void executePlan(RebalancePlan rebalancePlan) {
/**
* Pretty print a progress update after each batch complete.
*
- * @param batchCount current batch
- * @param numBatches total number of batches
- * @param partitionStoreCount partition stores migrated
- * @param numPartitionStores total number of partition stores to migrate
- * @param totalTimeMs total time, in milliseconds, of execution thus far.
+ * @param batchCount
+ * current batch
+ * @param numBatches
+ * total number of batches
+ * @param partitionStoreCount
+ * partition stores migrated
+ * @param numPartitionStores
+ * total number of partition stores to migrate
+ * @param totalTimeMs
+ * total time, in milliseconds, of execution thus far.
*/
private void batchStatusLog(int batchCount,
int numBatches,
@@ -260,7 +278,7 @@ private void batchStatusLog(int batchCount,
// Calculate the estimated end time and pretty print stats
double rate = 1;
long estimatedTimeMs = 0;
- if(numPartitionStores > 0) {
+ if (numPartitionStores > 0) {
rate = partitionStoreCount / numPartitionStores;
estimatedTimeMs = (long) (totalTimeMs / rate) - totalTimeMs;
}
@@ -292,11 +310,13 @@ private void batchStatusLog(int batchCount,
/**
* Executes a batch plan.
*
- * @param batchId Used as the ID of the batch plan. This allows related
- * tasks on client- & server-side to pretty print messages in a
- * manner that debugging can track specific batch plans across the
- * cluster.
- * @param batchPlan The batch plan...
+ * @param batchId
+ * Used as the ID of the batch plan. This allows related tasks on
+ * client- & server-side to pretty print messages in a manner
+ * that debugging can track specific batch plans across the
+ * cluster.
+ * @param batchPlan
+ * The batch plan...
*/
private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
final Cluster batchCurrentCluster = batchPlan.getCurrentCluster();
@@ -307,9 +327,9 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
try {
final List<RebalancePartitionsInfo> rebalancePartitionsInfoList = batchPlan.getBatchPlan();
- if(rebalancePartitionsInfoList.isEmpty()) {
- RebalanceUtils.printBatchLog(batchId, logger, "Skipping batch " + batchId
- + " since it is empty.");
+ if (rebalancePartitionsInfoList.isEmpty()) {
+ RebalanceUtils.printBatchLog(batchId, logger, "Skipping batch "
+ + batchId + " since it is empty.");
// Even though there is no rebalancing work to do, cluster
// metadata must be updated so that the server is aware of the
// new cluster xml.
@@ -326,16 +346,18 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
return;
}
- RebalanceUtils.printBatchLog(batchId, logger, "Starting batch " + batchId + ".");
+ RebalanceUtils.printBatchLog(batchId, logger, "Starting batch "
+ + batchId + ".");
// Split the store definitions
List<StoreDefinition> readOnlyStoreDefs = StoreDefinitionUtils.filterStores(batchFinalStoreDefs,
true);
List<StoreDefinition> readWriteStoreDefs = StoreDefinitionUtils.filterStores(batchFinalStoreDefs,
false);
- boolean hasReadOnlyStores = readOnlyStoreDefs != null && readOnlyStoreDefs.size() > 0;
+ boolean hasReadOnlyStores = readOnlyStoreDefs != null
+ && readOnlyStoreDefs.size() > 0;
boolean hasReadWriteStores = readWriteStoreDefs != null
- && readWriteStoreDefs.size() > 0;
+ && readWriteStoreDefs.size() > 0;
// STEP 1 - Cluster state change
boolean finishedReadOnlyPhase = false;
@@ -353,7 +375,7 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
finishedReadOnlyPhase);
// STEP 2 - Move RO data
- if(hasReadOnlyStores) {
+ if (hasReadOnlyStores) {
RebalanceBatchPlanProgressBar progressBar = batchPlan.getProgressBar(batchId);
executeSubBatch(batchId,
progressBar,
@@ -381,7 +403,7 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
finishedReadOnlyPhase);
// STEP 4 - Move RW data
- if(hasReadWriteStores) {
+ if (hasReadWriteStores) {
proxyPause();
RebalanceBatchPlanProgressBar progressBar = batchPlan.getProgressBar(batchId);
executeSubBatch(batchId,
@@ -394,15 +416,16 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
finishedReadOnlyPhase);
}
- RebalanceUtils.printBatchLog(batchId, logger, "Successfully terminated batch "
- + batchId + ".");
-
- } catch(Exception e) {
- RebalanceUtils.printErrorLog(batchId,
+ RebalanceUtils.printBatchLog(batchId,
logger,
- "Error in batch " + batchId + " - " + e.getMessage(),
+ "Successfully terminated batch "
+ + batchId + ".");
+
+ } catch (Exception e) {
+ RebalanceUtils.printErrorLog(batchId, logger, "Error in batch "
+ + batchId + " - " + e.getMessage(), e);
+ throw new VoldemortException("Rebalance failed on batch " + batchId,
e);
- throw new VoldemortException("Rebalance failed on batch " + batchId, e);
}
}
@@ -412,10 +435,12 @@ private void executeBatch(int batchId, final RebalanceBatchPlan batchPlan) {
*/
private void proxyPause() {
logger.info("Pausing after cluster state has changed to allow proxy bridges to be established. "
- + "Will start rebalancing work on servers in " + proxyPauseSec + " seconds.");
+ + "Will start rebalancing work on servers in "
+ + proxyPauseSec
+ + " seconds.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(proxyPauseSec));
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
logger.warn("Sleep interrupted in proxy pause.");
}
}
@@ -439,34 +464,43 @@ private void proxyPause() {
*
* Truth table, FTW!
*
- * @param batchId Rebalancing batch id
- * @param batchCurrentCluster Current cluster
- * @param batchFinalCluster Transition cluster to propagate
- * @param rebalancePartitionPlanList List of partition plan list
- * @param hasReadOnlyStores Boolean indicating if read-only stores exist
- * @param hasReadWriteStores Boolean indicating if read-write stores exist
- * @param finishedReadOnlyStores Boolean indicating if we have finished RO
- * store migration
+ * @param batchId
+ * Rebalancing batch id
+ * @param batchCurrentCluster
+ * Current cluster
+ * @param batchFinalCluster
+ * Transition cluster to propagate
+ * @param rebalancePartitionPlanList
+ * List of partition plan list
+ * @param hasReadOnlyStores
+ * Boolean indicating if read-only stores exist
+ * @param hasReadWriteStores
+ * Boolean indicating if read-write stores exist
+ * @param finishedReadOnlyStores
+ * Boolean indicating if we have finished RO store migration
*/
- private void rebalanceStateChange(final int batchId,
- Cluster batchCurrentCluster,
- List<StoreDefinition> batchCurrentStoreDefs,
- Cluster batchFinalCluster,
- List<StoreDefinition> batchFinalStoreDefs,
- List<RebalancePartitionsInfo> rebalancePartitionPlanList,
- boolean hasReadOnlyStores,
- boolean hasReadWriteStores,
- boolean finishedReadOnlyStores) {
+ private void
+ rebalanceStateChange(final int batchId,
+ Cluster batchCurrentCluster,
+ List<StoreDefinition> batchCurrentStoreDefs,
+ Cluster batchFinalCluster,
+ List<StoreDefinition> batchFinalStoreDefs,
+ List<RebalancePartitionsInfo> rebalancePartitionPlanList,
+ boolean hasReadOnlyStores,
+ boolean hasReadWriteStores,
+ boolean finishedReadOnlyStores) {
try {
- if(!hasReadOnlyStores && !hasReadWriteStores) {
+ if (!hasReadOnlyStores && !hasReadWriteStores) {
// Case 6 / 7 - no stores, exception
throw new VoldemortException("Cannot get this state since it means there are no stores");
- } else if(!hasReadOnlyStores && hasReadWriteStores && !finishedReadOnlyStores) {
+ } else if (!hasReadOnlyStores && hasReadWriteStores
+ && !finishedReadOnlyStores) {
// Case 5 - ignore
RebalanceUtils.printBatchLog(batchId,
logger,
"Ignoring state change since there are no read-only stores");
- } else if(!hasReadOnlyStores && hasReadWriteStores && finishedReadOnlyStores) {
+ } else if (!hasReadOnlyStores && hasReadWriteStores
+ && finishedReadOnlyStores) {
// Case 4 - cluster change + rebalance state change
RebalanceUtils.printBatchLog(batchId,
logger,
@@ -481,9 +515,11 @@ private void rebalanceStateChange(final int batchId,
true,
true,
true);
- } else if(hasReadOnlyStores && !finishedReadOnlyStores) {
+ } else if (hasReadOnlyStores && !finishedReadOnlyStores) {
// Case 1 / 3 - rebalance state change
- RebalanceUtils.printBatchLog(batchId, logger, "Rebalance state change");
+ RebalanceUtils.printBatchLog(batchId,
+ logger,
+ "Rebalance state change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
batchCurrentStoreDefs,
@@ -494,9 +530,12 @@ private void rebalanceStateChange(final int batchId,
true,
true,
true);
- } else if(hasReadOnlyStores && !hasReadWriteStores && finishedReadOnlyStores) {
+ } else if (hasReadOnlyStores && !hasReadWriteStores
+ && finishedReadOnlyStores) {
// Case 2 - swap + cluster change
- RebalanceUtils.printBatchLog(batchId, logger, "Swap + Cluster metadata change");
+ RebalanceUtils.printBatchLog(batchId,
+ logger,
+ "Swap + Cluster metadata change");
adminClient.rebalanceOps.rebalanceStateChange(batchCurrentCluster,
batchFinalCluster,
batchCurrentStoreDefs,
@@ -524,7 +563,7 @@ private void rebalanceStateChange(final int batchId,
true);
}
- } catch(VoldemortRebalancingException e) {
+ } catch (VoldemortRebalancingException e) {
RebalanceUtils.printErrorLog(batchId,
logger,
"Failure while changing rebalancing state",
@@ -559,23 +598,31 @@ private void rebalanceStateChange(final int batchId,
* | 7 | f | f | f | won't be triggered |
* </pre>
*
- * @param batchId Rebalance batch id
- * @param batchRollbackCluster Cluster to rollback to if we have a problem
- * @param rebalancePartitionPlanList The list of rebalance partition plans
- * @param hasReadOnlyStores Are we rebalancing any read-only stores?
- * @param hasReadWriteStores Are we rebalancing any read-write stores?
- * @param finishedReadOnlyStores Have we finished rebalancing of read-only
- * stores?
+ * @param batchId
+ * Rebalance batch id
+ * @param batchRollbackCluster
+ * Cluster to rollback to if we have a problem
+ * @param rebalancePartitionPlanList
+ * The list of rebalance partition plans
+ * @param hasReadOnlyStores
+ * Are we rebalancing any read-only stores?
+ * @param hasReadWriteStores
+ * Are we rebalancing any read-write stores?
+ * @param finishedReadOnlyStores
+ * Have we finished rebalancing of read-only stores?
*/
- private void executeSubBatch(final int batchId,
- RebalanceBatchPlanProgressBar progressBar,
- final Cluster batchRollbackCluster,
- final List<StoreDefinition> batchRollbackStoreDefs,
- final List<RebalancePartitionsInfo> rebalancePartitionPlanList,
- boolean hasReadOnlyStores,
- boolean hasReadWriteStores,
- boolean finishedReadOnlyStores) {
- RebalanceUtils.printBatchLog(batchId, logger, "Submitting rebalance tasks ");
+ private void
+ executeSubBatch(final int batchId,
+ RebalanceBatchPlanProgressBar progressBar,
+ final Cluster batchRollbackCluster,
+ final List<StoreDefinition> batchRollbackStoreDefs,
+ final List<RebalancePartitionsInfo> rebalancePartitionPlanList,
+ boolean hasReadOnlyStores,
+ boolean hasReadWriteStores,
+ boolean finishedReadOnlyStores) {
+ RebalanceUtils.printBatchLog(batchId,
+ logger,
+ "Submitting rebalance tasks ");
// Get an ExecutorService in place used for submitting our tasks
ExecutorService service = RebalanceUtils.createExecutors(maxParallelRebalancing);
@@ -586,7 +633,7 @@ private void executeSubBatch(final int batchId,
// Semaphores for donor nodes - To avoid multiple disk sweeps
Semaphore[] donorPermits = new Semaphore[batchRollbackCluster.getNumberOfNodes()];
- for(Node node: batchRollbackCluster.getNodes()) {
+ for (Node node : batchRollbackCluster.getNodes()) {
donorPermits[node.getId()] = new Semaphore(1);
}
@@ -597,25 +644,29 @@ private void executeSubBatch(final int batchId,
service,
rebalancePartitionPlanList,
donorPermits);
- RebalanceUtils.printBatchLog(batchId, logger, "All rebalance tasks submitted");
+ RebalanceUtils.printBatchLog(batchId,
+ logger,
+ "All rebalance tasks submitted");
// Wait and shutdown after (infinite) timeout
RebalanceUtils.executorShutDown(service, Long.MAX_VALUE);
- RebalanceUtils.printBatchLog(batchId, logger, "Finished waiting for executors");
+ RebalanceUtils.printBatchLog(batchId,
+ logger,
+ "Finished waiting for executors");
// Collects all failures + incomplete tasks from the rebalance
// tasks.
List<Exception> failures = Lists.newArrayList();
- for(RebalanceTask task: allTasks) {
- if(task.hasException()) {
+ for (RebalanceTask task : allTasks) {
+ if (task.hasException()) {
failedTasks.add(task);
failures.add(task.getError());
- } else if(!task.isComplete()) {
+ } else if (!task.isComplete()) {
incompleteTasks.add(task);
}
}
- if(failedTasks.size() > 0) {
+ if (failedTasks.size() > 0) {
throw new VoldemortRebalancingException("Rebalance task terminated unsuccessfully on tasks "
+ failedTasks,
failures);
@@ -627,16 +678,18 @@ private void executeSubBatch(final int batchId,
// VoldemortRebalancingException ( which will start reverting
// metadata ). The operator may want to manually then resume the
// process.
- if(incompleteTasks.size() > 0) {
+ if (incompleteTasks.size() > 0) {
throw new VoldemortException("Rebalance tasks are still incomplete / running "
- + incompleteTasks);
+ + incompleteTasks);
}
- } catch(VoldemortRebalancingException e) {
+ } catch (VoldemortRebalancingException e) {
- logger.error("Failure while migrating partitions for rebalance task " + batchId);
+ logger.error("Failure while migrating partitions for rebalance task "
+ + batchId);
- if(hasReadOnlyStores && hasReadWriteStores && finishedReadOnlyStores) {
+ if (hasReadOnlyStores && hasReadWriteStores
+ && finishedReadOnlyStores) {
// Case 0
adminClient.rebalanceOps.rebalanceStateChange(null,
batchRollbackCluster,
@@ -648,7 +701,7 @@ private void executeSubBatch(final int batchId,
false,
false,
false);
- } else if(hasReadWriteStores && finishedReadOnlyStores) {
+ } else if (hasReadWriteStores && finishedReadOnlyStores) {
// Case 4
adminClient.rebalanceOps.rebalanceStateChange(null,
batchRollbackCluster,
@@ -665,7 +718,7 @@ private void executeSubBatch(final int batchId,
throw e;
} finally {
- if(!service.isShutdown()) {
+ if (!service.isShutdown()) {
RebalanceUtils.printErrorLog(batchId,
logger,
"Could not shutdown service cleanly for rebalance task "
@@ -676,31 +729,225 @@ private void executeSubBatch(final int batchId,
}
}
- private List<RebalanceTask> executeTasks(final int batchId,
- RebalanceBatchPlanProgressBar progressBar,
- final ExecutorService service,
- List<RebalancePartitionsInfo> rebalancePartitionPlanList,
- Semaphore[] donorPermits) {
+ // TODO: (refactor) Move RebalanceController scheduler into its own class.
+ // TODO: Add unit tests for RebalanceController.scheduler.
+ // TODO: This scheduler deprecates the need for donor permits. Consider
+ // removing them.
+ /**
+ * Scheduler for rebalancing tasks. There is at most one rebalancing task
+ * per stealer-donor pair. This scheduler ensures the following invariant is
+ * obeyed:
+ *
+ * A node works on no more than one rebalancing task at a time.
+ *
+ * Note that a node working on a rebalancing task may be either a stealer or
+ * a donor. This invariant should somewhat isolate the foreground workload
+ * against the work a server must do for rebalancing. Because of this
+ * isolation, it is safe to attempt "infinite" parallelism since no more
+ * than floor(number of nodes / 2) rebalancing tasks can possibly be
+ * scheduled to execute while obeying the invariant.
+ *
+ * The order of tasks are randomized within this class. The intent is to
+ * "spread" rebalancing work smoothly out over the cluster and avoid
+ * "long tails" of straggler rebalancing tasks. Only experience will tell us
+ * if we need to do anything smarter.
+ */
+ public class Scheduler {
+
+ final private ExecutorService service;
+
+ private Map<Integer, List<StealerBasedRebalanceTask>> tasksByStealer;
+ private int numTasksExecuting;
+ private Set<Integer> nodeIdsWithWork;
+ private CountDownLatch doneSignal;
+
+ Scheduler(ExecutorService service) {
+ this.service = service;
+ }
+
+ /**
+ * Set up scheduling structures and then start scheduling tasks to
+ * execute. Blocks until all tasks have been scheduled. (For all tasks
+ * to be scheduled, most tasks must have completed.)
+ *
+ * @param sbTaskList
+ * List of all stealer-based rebalancing tasks to be
+ * scheduled.
+ */
+ public void run(List<StealerBasedRebalanceTask> sbTaskList) {
+ // Setup mapping of stealers to work for this run.
+ this.tasksByStealer = new HashMap<Integer, List<StealerBasedRebalanceTask>>();
+ for (StealerBasedRebalanceTask task : sbTaskList) {
+ if (task.getStealInfos().size() != 1) {
+ throw new VoldemortException("StealerBasedRebalanceTasks should have a list of RebalancePartitionsInfo of length 1.");
+ }
+
+ RebalancePartitionsInfo stealInfo = task.getStealInfos().get(0);
+ int stealerId = stealInfo.getStealerId();
+ if (!this.tasksByStealer.containsKey(stealerId)) {
+ this.tasksByStealer.put(stealerId,
+ new ArrayList<StealerBasedRebalanceTask>());
+ }
+ this.tasksByStealer.get(stealerId).add(task);
+ }
+
+ if (tasksByStealer.isEmpty()) {
+ return;
+ }
+
+ // Shuffle order of each stealer's work list. This randomization
+ // helps to get rid of any "patterns" in how rebalancing tasks were
+ // added to the task list passed in.
+ for (List<StealerBasedRebalanceTask> taskList : tasksByStealer.values()) {
+ Collections.shuffle(taskList);
+ }
+
+ // Prepare to execute the rebalance
+ this.numTasksExecuting = 0;
+ this.nodeIdsWithWork = new HashSet<Integer>();
+ doneSignal = new CountDownLatch(sbTaskList.size());
+
+ // Start scheduling tasks to execute!
+ scheduleMoreTasks();
+
+ try {
+ doneSignal.await();
+ } catch (InterruptedException e) {
+ logger.error("RebalancController scheduler interrupted while waiting for rebalance tasks to be scheduled.",
+ e);
+ throw new VoldemortRebalancingException("RebalancController scheduler interrupted while waiting for rebalance tasks to be scheduled.");
+
+ }
+ }
+
+ /**
+ * Schedule as many tasks as possible.
+ *
+ * To schedule a task is to make it available to the executor service to
+ * run.
+ *
+ * "As many tasks as possible" is limited by (i) the parallelism level
+ * permitted and (ii) the invariant that a node shall be part of at most
+ * one rebalancing task at a time (as either stealer or donor).
+ */
+ private synchronized void scheduleMoreTasks() {
+ RebalanceTask scheduledTask = scheduleNextTask();
+ while (scheduledTask != null) {
+ scheduledTask = scheduleNextTask();
+ }
+ }
+
+ /**
+ * Schedule at most one task.
+ *
+ * The scheduled task *must* invoke 'doneTask()' upon
+ * completion/termination.
+ *
+ * @return The task scheduled or null if not possible to schedule a task
+ * at this time.
+ */
+ private synchronized StealerBasedRebalanceTask scheduleNextTask() {
+ // Make sure there is work left to do.
+ if (doneSignal.getCount() == 0) {
+ return null;
+ }
+
+ // Limit number of tasks outstanding.
+ if (this.numTasksExecuting == maxParallelRebalancing) {
+ return null;
+ }
+
+ // Shuffle list of stealer IDs each time a new task to schedule
+ // needs to be found. Randomizing the order should avoid
+ // prioritizing one specific stealer's work ahead of all others.
+ List<Integer> stealerIds = new ArrayList<Integer>(tasksByStealer.keySet());
+ Collections.shuffle(stealerIds);
+ for (int stealerId : stealerIds) {
+ if (nodeIdsWithWork.contains(stealerId)) {
+ continue;
+ }
+ for (StealerBasedRebalanceTask sbTask : tasksByStealer.get(stealerId)) {
+ int donorId = sbTask.getStealInfos().get(0).getDonorId();
+ if (nodeIdsWithWork.contains(donorId)) {
+ continue;
+ }
+
+ // Bookkeeping for task about to execute:
+ nodeIdsWithWork.add(stealerId);
+ nodeIdsWithWork.add(donorId);
+ numTasksExecuting++;
+ // Remove this task from list thus destroying list
+ // being iterated over. This is safe because returning
+ // directly out of this branch.
+ tasksByStealer.get(stealerId).remove(sbTask);
+
+ try {
+ service.execute(sbTask);
+ } catch (RejectedExecutionException ree) {
+ logger.error("Rebalancing task rejected by executor service.",
+ ree);
+ throw new VoldemortRebalancingException("Rebalancing task rejected by executor service.");
+ }
+
+ return sbTask;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Method must be invoked upon completion of a rebalancing task. It is
+ * the task's responsibility to do so.
+ *
+ * @param stealerId
+ * @param donorId
+ */
+ public synchronized void doneTask(int stealerId, int donorId) {
+ // Bookkeeping for completed task :
+ nodeIdsWithWork.remove(stealerId);
+ nodeIdsWithWork.remove(donorId);
+ numTasksExecuting--;
+
+ doneSignal.countDown();
+
+ // Try and schedule more tasks now that resources may be available
+ // to do so.
+ scheduleMoreTasks();
+ }
+ }
+
+ private List<RebalanceTask>
+ executeTasks(final int batchId,
+ RebalanceBatchPlanProgressBar progressBar,
+ final ExecutorService service,
+ List<RebalancePartitionsInfo> rebalancePartitionPlanList,
+ Semaphore[] donorPermits) {
List<RebalanceTask> taskList = Lists.newArrayList();
int taskId = 0;
- if(stealerBasedRebalancing) {
- for(RebalancePartitionsInfo partitionsInfo: rebalancePartitionPlanList) {
+ if (stealerBasedRebalancing) {
+ Scheduler scheduler = new Scheduler(service);
+ List<StealerBasedRebalanceTask> sbTaskList = Lists.newArrayList();
+ for (RebalancePartitionsInfo partitionsInfo : rebalancePartitionPlanList) {
StealerBasedRebalanceTask rebalanceTask = new StealerBasedRebalanceTask(batchId,
taskId,
partitionsInfo,
maxTriesRebalancing,
donorPermits[partitionsInfo.getDonorId()],
adminClient,
- progressBar);
+ progressBar,
+ scheduler);
taskList.add(rebalanceTask);
- service.execute(rebalanceTask);
+ sbTaskList.add(rebalanceTask);
+ // service.execute(rebalanceTask);
taskId++;
}
+ scheduler.run(sbTaskList);
} else {
// Group by donor nodes
HashMap<Integer, List<RebalancePartitionsInfo>> donorNodeBasedPartitionsInfo = RebalanceUtils.groupPartitionsInfoByNode(rebalancePartitionPlanList,
false);
- for(Entry<Integer, List<RebalancePartitionsInfo>> entries: donorNodeBasedPartitionsInfo.entrySet()) {
+ for (Entry<Integer, List<RebalancePartitionsInfo>> entries : donorNodeBasedPartitionsInfo.entrySet()) {
DonorBasedRebalanceTask rebalanceTask = new DonorBasedRebalanceTask(batchId,
taskId,
entries.getValue(),
39 src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java
View
@@ -23,6 +23,7 @@
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.rebalance.RebalanceBatchPlanProgressBar;
+import voldemort.client.rebalance.RebalanceController;
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.server.rebalance.AlreadyRebalancingException;
import voldemort.store.UnreachableStoreException;
@@ -42,6 +43,7 @@
private static final Logger logger = Logger.getLogger(StealerBasedRebalanceTask.class);
private final int stealerNodeId;
+ private final int donorNodeId;
// TODO: What is the use of maxTries for stealer-based tasks? Need to
// validate reason for existence or remove.
// NOTES FROM VINOTH:
@@ -61,13 +63,16 @@
// done.
private final int maxTries;
+ private final RebalanceController.Scheduler scheduler;
+
public StealerBasedRebalanceTask(final int batchId,
final int taskId,
final RebalancePartitionsInfo stealInfo,
final int maxTries,
final Semaphore donorPermit,
final AdminClient adminClient,
- final RebalanceBatchPlanProgressBar progressBar) {
+ final RebalanceBatchPlanProgressBar progressBar,
+ final RebalanceController.Scheduler scheduler) {
super(batchId,
taskId,
Lists.newArrayList(stealInfo),
@@ -78,6 +83,8 @@ public StealerBasedRebalanceTask(final int batchId,
this.maxTries = maxTries;
this.stealerNodeId = stealInfo.getStealerId();
+ this.donorNodeId = stealInfo.getDonorId();
+ this.scheduler = scheduler;
taskLog(toString());
}
@@ -86,17 +93,18 @@ private int startNodeRebalancing() {
int nTries = 0;
AlreadyRebalancingException rebalanceException = null;
- while(nTries < maxTries) {
+ while (nTries < maxTries) {
nTries++;
try {
-
- taskLog("Trying to start async rebalance task on stealer node " + stealerNodeId);
+ taskLog("Trying to start async rebalance task on stealer node "
+ + stealerNodeId);
int asyncOperationId = adminClient.rebalanceOps.rebalanceNode(stealInfos.get(0));
- taskLog("Started async rebalance task on stealer node " + stealerNodeId);
+ taskLog("Started async rebalance task on stealer node "
+ + stealerNodeId);
return asyncOperationId;
- } catch(AlreadyRebalancingException e) {
+ } catch (AlreadyRebalancingException e) {
taskLog("Node " + stealerNodeId
+ " is currently rebalancing. Waiting till completion");
adminClient.rpcOps.waitForCompletion(stealerNodeId,
@@ -106,8 +114,8 @@ private int startNodeRebalancing() {
}
}
- throw new VoldemortException("Failed to start rebalancing with plan: " + getStealInfos(),
- rebalanceException);
+ throw new VoldemortException("Failed to start rebalancing with plan: "
+ + getStealInfos(), rebalanceException);
}
@Override
@@ -121,27 +129,30 @@ public void run() {
rebalanceAsyncId = startNodeRebalancing();
taskStart(rebalanceAsyncId);
- adminClient.rpcOps.waitForCompletion(stealerNodeId, rebalanceAsyncId);
+ adminClient.rpcOps.waitForCompletion(stealerNodeId,
+ rebalanceAsyncId);
taskDone(rebalanceAsyncId);
- } catch(UnreachableStoreException e) {
+ } catch (UnreachableStoreException e) {
exception = e;
- logger.error("Stealer node " + stealerNodeId
+ logger.error("Stealer node "
+ + stealerNodeId
+ " is unreachable, please make sure it is up and running : "
+ e.getMessage(),
e);
- } catch(Exception e) {
+ } catch (Exception e) {
exception = e;
logger.error("Rebalance failed : " + e.getMessage(), e);
} finally {
donorPermit.release();
isComplete.set(true);
+ scheduler.doneTask(stealerNodeId, donorNodeId);
}
}
@Override
public String toString() {
- return "Stealer based rebalance task on stealer node " + stealerNodeId + " : "
- + getStealInfos();
+ return "Stealer based rebalance task on stealer node " + stealerNodeId
+ + " : " + getStealInfos();
}
}
143 src/java/voldemort/routing/BaseStoreRoutingPlan.java
View
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2013 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.routing;
+
+import java.util.List;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.cluster.Node;
+import voldemort.store.StoreDefinition;
+import voldemort.utils.ByteUtils;
+import voldemort.utils.NodeUtils;
+
+/**
+ * This class wraps up a Cluster object and a StoreDefinition. The methods are
+ * effectively helper or util style methods for querying the routing plan that
+ * will be generated for a given routing strategy upon store and cluster
+ * topology information.
+ */
+public class BaseStoreRoutingPlan {
+
+ protected final Cluster cluster;
+ protected final StoreDefinition storeDefinition;
+ protected final RoutingStrategy routingStrategy;
+
+ public BaseStoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
+ this.cluster = cluster;
+ this.storeDefinition = storeDefinition;
+ this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition,
+ cluster);
+ }
+
+ public Cluster getCluster() {
+ return cluster;
+ }
+
+ public StoreDefinition getStoreDefinition() {
+ return storeDefinition;
+ }
+
+ /**
+ * Given a key that belong to a given node, returns a number n (< zone
+ * replication factor), such that the given node holds the key as the nth
+ * replica of the given zone
+ *
+ * eg: if the method returns 1, then given node hosts the key as the zone
+ * secondary in the given zone
+ *
+ * @param zoneId
+ * @param nodeId
+ * @param key
+ * @return zone n-ary level for key hosted on node id in zone id.
+ */
+ // TODO: add unit test.
+ public int getZoneNAry(int zoneId, int nodeId, byte[] key) {
+ if(cluster.getNodeById(nodeId).getZoneId() != zoneId) {
+ throw new VoldemortException("Node " + nodeId + " is not in zone " + zoneId
+ + "! The node is in zone "
+ + cluster.getNodeById(nodeId).getZoneId());
+ }
+
+ List<Node> replicatingNodes = this.routingStrategy.routeRequest(key);
+ int zoneNAry = -1;
+ for(Node node: replicatingNodes) {
+ // bump up the replica number once you encounter a node in the given
+ // zone
+ if(node.getZoneId() == zoneId) {
+ zoneNAry++;
+ }
+ // we are done when we find the given node
+ if(node.getId() == nodeId) {
+ return zoneNAry;
+ }
+ }
+ if(zoneNAry > -1) {
+ throw new VoldemortException("Node " + nodeId + " not a replica for the key "
+ + ByteUtils.toHexString(key) + " in given zone " + zoneId);
+ } else {
+ throw new VoldemortException("Could not find any replicas for the key "
+ + ByteUtils.toHexString(key) + " in given zone " + zoneId);
+ }
+ }
+
+ /**
+ * Given a key and a replica type n (< zone replication factor), figure out
+ * the node that contains the key as the nth replica in the given zone.
+ *
+ * @param zoneId
+ * @param zoneNary
+ * @param key
+ * @return node id that hosts zone n-ary replica for the key
+ */
+ // TODO: add unit test.
+ public int getNodeIdForZoneNary(int zoneId, int zoneNary, byte[] key) {
+ List<Node> replicatingNodes = this.routingStrategy.routeRequest(key);
+ int zoneNAry = -1;
+ for(Node node: replicatingNodes) {
+ // bump up the counter if we encounter a replica in the given zone;
+ // return current node if counter now matches requested
+ if(node.getZoneId() == zoneId) {
+ zoneNAry++;
+
+ if(zoneNAry == zoneNary) {
+ return node.getId();
+ }
+ }
+ }
+ if(zoneNAry == -1) {
+ throw new VoldemortException("Could not find any replicas for the key "
+ + ByteUtils.toHexString(key) + " in given zone " + zoneId);
+ } else {
+ throw new VoldemortException("Could not find " + (zoneNary + 1)
+ + " replicas for the key " + ByteUtils.toHexString(key)
+ + " in given zone " + zoneId + ". Only found "
+ + (zoneNAry + 1));
+ }
+ }
+
+ /**
+ * Determines the list of nodes that the key replicates to
+ *
+ * @param key
+ * @return list of nodes that key replicates to
+ */
+ public List<Integer> getReplicationNodeList(final byte[] key) {
+ return NodeUtils.getNodeIds(this.routingStrategy.routeRequest(key));
+ }
+
+}
240 src/java/voldemort/routing/StoreRoutingPlan.java
View
@@ -27,9 +27,7 @@
import voldemort.cluster.Node;
import voldemort.store.StoreDefinition;
import voldemort.store.system.SystemStoreConstants;
-import voldemort.utils.ByteUtils;
import voldemort.utils.ClusterUtils;
-import voldemort.utils.NodeUtils;
import voldemort.utils.Pair;
import voldemort.utils.Utils;
@@ -41,23 +39,18 @@
* will be generated for a given routing strategy upon store and cluster
* topology information.
*/
-public class StoreRoutingPlan {
+public class StoreRoutingPlan extends BaseStoreRoutingPlan {
- private final Cluster cluster;
- private final StoreDefinition storeDefinition;
private final Map<Integer, Integer> partitionIdToNodeIdMap;
private final Map<Integer, List<Integer>> nodeIdToNaryPartitionMap;
private final Map<Integer, List<Integer>> nodeIdToZonePrimaryMap;
- private final RoutingStrategy routingStrategy;
public StoreRoutingPlan(Cluster cluster, StoreDefinition storeDefinition) {
- this.cluster = cluster;
- this.storeDefinition = storeDefinition;
+ super(cluster, storeDefinition);
verifyClusterStoreDefinition();
this.partitionIdToNodeIdMap = ClusterUtils.getCurrentPartitionMapping(cluster);
- this.routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDefinition,
- cluster);
+
this.nodeIdToNaryPartitionMap = new HashMap<Integer, List<Integer>>();
this.nodeIdToZonePrimaryMap = new HashMap<Integer, List<Integer>>();
for(int nodeId: cluster.getNodeIds()) {
@@ -127,14 +120,6 @@ private void verifyClusterStoreDefinition() {
}
}
- public Cluster getCluster() {
- return cluster;
- }
-
- public StoreDefinition getStoreDefinition() {
- return storeDefinition;
- }
-
/**
* Determines list of partition IDs that replicate the master partition ID.
*
@@ -146,29 +131,6 @@ public StoreDefinition getStoreDefinition() {
}
/**
- * Returns all (zone n-ary) partition IDs hosted on the node.
- *
- * @param nodeId
- * @return all zone n-ary partition IDs hosted on the node in an unordered
- * list.
- */
- public List<Integer> getZoneNAryPartitionIds(int nodeId) {
- return nodeIdToNaryPartitionMap.get(nodeId);
- }
-
- /**
- * Returns all zone-primary partition IDs on node. A zone-primary means zone
- * n-ary==0. Zone-primary nodes are generally pseudo-masters in the zone and
- * receive get traffic for some partition Id.
- *
- * @param nodeId
- * @return all primary partition IDs (zone n-ary == 0) hosted on the node.
- */
- public List<Integer> getZonePrimaryPartitionIds(int nodeId) {
- return nodeIdToZonePrimaryMap.get(nodeId);
- }
-
- /**
* Determines list of partition IDs that replicate the key.
*
* @param key
@@ -179,16 +141,6 @@ public StoreDefinition getStoreDefinition() {
}
/**
- * Determines the list of nodes that the key replicates to
- *
- * @param key
- * @return list of nodes that key replicates to
- */
- public List<Integer> getReplicationNodeList(final byte[] key) {
- return NodeUtils.getNodeIds(this.routingStrategy.routeRequest(key));
- }
-
- /**
* Determines master partition ID for the key.
*
* @param key
@@ -268,46 +220,26 @@ public Integer getNodesPartitionIdForKey(int nodeId, final byte[] key) {
}
/**
- * Given a key that belong to a given node, returns a number n (< zone
- * replication factor), such that the given node holds the key as the nth
- * replica of the given zone
- *
- * eg: if the method returns 1, then given node hosts the key as the zone
- * secondary in the given zone
+ * Returns all (zone n-ary) partition IDs hosted on the node.
*
- * @param zoneId
* @param nodeId
- * @param key
- * @return zone n-ary level for key hosted on node id in zone id.
+ * @return all zone n-ary partition IDs hosted on the node in an unordered
+ * list.
*/
- // TODO: add unit test.
- public int getZoneNAry(int zoneId, int nodeId, byte[] key) {
- if(cluster.getNodeById(nodeId).getZoneId() != zoneId) {
- throw new VoldemortException("Node " + nodeId + " is not in zone " + zoneId
- + "! The node is in zone "
- + cluster.getNodeById(nodeId).getZoneId());
- }
+ public List<Integer> getZoneNAryPartitionIds(int nodeId) {
+ return nodeIdToNaryPartitionMap.get(nodeId);
+ }
- List<Node> replicatingNodes = this.routingStrategy.routeRequest(key);
- int zoneNAry = -1;
- for(Node node: replicatingNodes) {
- // bump up the replica number once you encounter a node in the given
- // zone
- if(node.getZoneId() == zoneId) {
- zoneNAry++;
- }
- // we are done when we find the given node
- if(node.getId() == nodeId) {
- return zoneNAry;
- }
- }
- if(zoneNAry > -1) {
- throw new VoldemortException("Node " + nodeId + " not a replica for the key "
- + ByteUtils.toHexString(key) + " in given zone " + zoneId);
- } else {
- throw new VoldemortException("Could not find any replicas for the key "
- + ByteUtils.toHexString(key) + " in given zone " + zoneId);
- }
+ /**
+ * Returns all zone-primary partition IDs on node. A zone-primary means zone
+ * n-ary==0. Zone-primary nodes are generally pseudo-masters in the zone and
+ * receive get traffic for some partition Id.
+ *
+ * @param nodeId
+ * @return all primary partition IDs (zone n-ary == 0) hosted on the node.
+ */
+ public List<Integer> getZonePrimaryPartitionIds(int nodeId) {
+ return nodeIdToZonePrimaryMap.get(nodeId);
}
/**
@@ -402,77 +334,6 @@ public int getReplicaType(int nodeId, int partitionId) {
}
}
- /**
- * Given a key and a replica type n (< zone replication factor), figure out
- * the node that contains the key as the nth replica in the given zone.
- *
- * @param zoneId
- * @param zoneNary
- * @param key
- * @return node id that hosts zone n-ary replica for the key
- */
- // TODO: add unit test.
- public int getNodeIdForZoneNary(int zoneId, int zoneNary, byte[] key) {
- List<Node> replicatingNodes = this.routingStrategy.routeRequest(key);
- int zoneNAry = -1;
- for(Node node: replicatingNodes) {
- // bump up the counter if we encounter a replica in the given zone;
- // return current node if counter now matches requested
- if(node.getZoneId() == zoneId) {
- zoneNAry++;
-
- if(zoneNAry == zoneNary) {
- return node.getId();
- }
- }
- }
- if(zoneNAry == -1) {
- throw new VoldemortException("Could not find any replicas for the key "
- + ByteUtils.toHexString(key) + " in given zone " + zoneId);
- } else {
- throw new VoldemortException("Could not find " + (zoneNary + 1)
- + " replicas for the key " + ByteUtils.toHexString(key)
- + " in given zone " + zoneId + ". Only found "
- + (zoneNAry + 1));
- }
- }
-
- /**
- * Determines which node hosts partition id with specified n-ary level in
- * specified zone.
- *
- * @param zoneId
- * @param zoneNary
- * @param partitionId
- * @return node ID that hosts zone n-ary replica of partition.
- */
- // TODO: add unit test.
- public int getNodeIdForZoneNary(int zoneId, int zoneNary, int partitionId) {
- List<Integer> replicatingNodeIds = getReplicationNodeList(partitionId);
-
- int zoneNAry = -1;
- for(int replicatingNodeId: replicatingNodeIds) {
- Node replicatingNode = cluster.getNodeById(replicatingNodeId);
- // bump up the counter if we encounter a replica in the given zone
- if(replicatingNode.getZoneId() == zoneId) {
- zoneNAry++;
- }
- // when the counter matches up with the replicaNumber we need, we
- // are done.
- if(zoneNAry == zoneNary) {
- return replicatingNode.getId();
- }
- }
- if(zoneNAry == 0) {
- throw new VoldemortException("Could not find any replicas for the partition "
- + partitionId + " in given zone " + zoneId);
- } else {
- throw new VoldemortException("Could not find " + zoneNary
- + " replicas for the partition " + partitionId
- + " in given zone " + zoneId + ". Only found " + zoneNAry);
- }
- }
-
// TODO: (refactor) Move from static methods to non-static methods that use
// this object's cluster and storeDefinition member for the various
// check*BelongsTo* methods. Also, tweak internal members to make these
@@ -486,9 +347,10 @@ public int getNodeIdForZoneNary(int zoneId, int zoneNary, int partitionId) {
* @param replicaToPartitionList Mapping of replica type to partition list
* @return Returns a boolean to indicate if this belongs to the map
*/
- public static boolean checkKeyBelongsToPartition(List<Integer> keyPartitions,
- List<Integer> nodePartitions,
- HashMap<Integer, List<Integer>> replicaToPartitionList) {
+ public static boolean
+ checkKeyBelongsToPartition(List<Integer> keyPartitions,
+ List<Integer> nodePartitions,
+ HashMap<Integer, List<Integer>> replicaToPartitionList) {
// Check for null
replicaToPartitionList = Utils.notNull(replicaToPartitionList);
@@ -519,11 +381,12 @@ public static boolean checkKeyBelongsToPartition(List<Integer> keyPartitions,
* @param storeDef The store definition
* @return Returns a boolean to indicate if this belongs to the map
*/
- public static boolean checkKeyBelongsToPartition(int nodeId,
- byte[] key,
- HashMap<Integer, List<Integer>> replicaToPartitionList,
- Cluster cluster,
- StoreDefinition storeDef) {
+ public static boolean
+ checkKeyBelongsToPartition(int nodeId,
+ byte[] key,
+ HashMap<Integer, List<Integer>> replicaToPartitionList,
+ Cluster cluster,
+ StoreDefinition storeDef) {
boolean checkResult = false;
if(storeDef.getRoutingStrategyType().equals(RoutingStrategyType.TO_ALL_STRATEGY)
|| storeDef.getRoutingStrategyType()
@@ -577,10 +440,11 @@ public static boolean checkPartitionBelongsToNode(int partitionId,
* @param storeDef Store definitions
* @return List of node ids
*/
- public static List<Integer> checkKeyBelongsToPartition(byte[] key,
- Set<Pair<Integer, HashMap<Integer, List<Integer>>>> stealerNodeToMappingTuples,
- Cluster cluster,
- StoreDefinition storeDef) {
+ public static List<Integer>
+ checkKeyBelongsToPartition(byte[] key,
+ Set<Pair<Integer, HashMap<Integer, List<Integer>>>> stealerNodeToMappingTuples,
+ Cluster cluster,
+ StoreDefinition storeDef) {
List<Integer> keyPartitions = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
cluster)
.getPartitionList(key);
@@ -643,4 +507,40 @@ public static boolean checkKeyBelongsToNode(byte[] key,
return replicatingPartitions.size() > 0;
}
+ /**
+ * Determines which node hosts partition id with specified n-ary level in
+ * specified zone.
+ *
+ * @param zoneId
+ * @param zoneNary
+ * @param partitionId
+ * @return node ID that hosts zone n-ary replica of partition.
+ */
+ // TODO: add unit test.
+ public int getNodeIdForZoneNary(int zoneId, int zoneNary, int partitionId) {
+ List<Integer> replicatingNodeIds = getReplicationNodeList(partitionId);
+
+ int zoneNAry = -1;
+ for(int replicatingNodeId: replicatingNodeIds) {
+ Node replicatingNode = cluster.getNodeById(replicatingNodeId);
+ // bump up the counter if we encounter a replica in the given zone
+ if(replicatingNode.getZoneId() == zoneId) {
+ zoneNAry++;
+ }
+ // when the counter matches up with the replicaNumber we need, we
+ // are done.
+ if(zoneNAry == zoneNary) {
+ return replicatingNode.getId();
+ }
+ }
+ if(zoneNAry == 0) {
+ throw new VoldemortException("Could not find any replicas for the partition "
+ + partitionId + " in given zone " + zoneId);
+ } else {
+ throw new VoldemortException("Could not find " + zoneNary
+ + " replicas for the partition " + partitionId
+ + " in given zone " + zoneId + ". Only found " + zoneNAry);
+ }
+ }
+
}
313 src/java/voldemort/store/rebalancing/RedirectingStore.java
View
@@ -30,7 +30,7 @@
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.FailureDetector;
-import voldemort.routing.StoreRoutingPlan;
+import voldemort.routing.BaseStoreRoutingPlan;
import voldemort.server.RequestRoutingType;
import voldemort.server.StoreRepository;
import voldemort.store.DelegatingStore;
@@ -80,7 +80,8 @@
* :) :) :)
*
*/
-public class RedirectingStore extends DelegatingStore<ByteArray, byte[], byte[]> {
+public class RedirectingStore extends
+ DelegatingStore<ByteArray, byte[], byte[]> {
private final static Logger logger = Logger.getLogger(RedirectingStore.class);
private final MetadataStore metadata;
@@ -114,14 +115,16 @@ public RedirectingStore(Store<ByteArray, byte[], byte[]> innerStore,
}
@JmxSetter(name = "setRedirectingStoreEnabled", description = "Enable the redirecting store for this store")
- public void setIsRedirectingStoreEnabled(boolean isRedirectingStoreEnabled) {
+ public void
+ setIsRedirectingStoreEnabled(boolean isRedirectingStoreEnabled) {
logger.info("Setting redirecting store flag for " + getName() + " to "
- + isRedirectingStoreEnabled);
+ + isRedirectingStoreEnabled);
this.isRedirectingStoreEnabled.set(isRedirectingStoreEnabled);
}
@JmxGetter(name = "isRedirectingStoreEnabled", description = "Get the redirecting store state for this store")
- public boolean getIsRedirectingStoreEnabled() {
+ public boolean
+ getIsRedirectingStoreEnabled() {
return this.isRedirectingStoreEnabled.get();
}
@@ -134,19 +137,19 @@ public boolean getIsRedirectingStoreEnabled() {
* @return
* @throws VoldemortException
*/
- private List<Versioned<byte[]>> redirectingGet(ByteArray key, byte[] transforms)
- throws VoldemortException {
+ private List<Versioned<byte[]>>
+ redirectingGet(ByteArray key, byte[] transforms) throws VoldemortException {
/**
* If I am rebalancing for this key, try to do remote get(), put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
Integer redirectNode = getProxyNode(key.get());
- if(redirectNode != null) {
+ if (redirectNode != null) {
// First, attempt a local get
List<Versioned<byte[]>> vals = getInnerStore().get(key, transforms);
// If found, return
- if(!vals.isEmpty()) {
+ if (!vals.isEmpty()) {
/*
* There is a subtle race here if the underlying storage does
* not implement multiVersionPut(), since the we could read some
@@ -158,9 +161,10 @@ public boolean getIsRedirectingStoreEnabled() {
return vals;
}
- if(logger.isTraceEnabled()) {
- logger.trace("Proxying GET on stealer:" + metadata.getNodeId() + " for key "
- + ByteUtils.toHexString(key.get()) + " to node:" + redirectNode);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Proxying GET on stealer:" + metadata.getNodeId()
+ + " for key " + ByteUtils.toHexString(key.get())
+ + " to node:" + redirectNode);
}
proxyGetAndLocalPut(key, redirectNode, transforms);
}
@@ -183,20 +187,21 @@ public boolean getIsRedirectingStoreEnabled() {
* {@link ObsoleteVersionException}.
*/
Integer redirectNode = getProxyNode(key.get());
- if(redirectNode != null) {
+ if (redirectNode != null) {
// First, attempt a local getVersions()
List<Version> versions = getInnerStore().getVersions(key);
// If found some versions, return
- if(!versions.isEmpty()) {
+ if (!versions.isEmpty()) {
// Same caveat here as in redirectingGet(). Need multiVersionPut
// support in storage to avoid seeing partial versions
return versions;
}
- if(logger.isTraceEnabled()) {
- logger.trace("Proxying GETVERSIONS on stealer:" + metadata.getNodeId()
- + " for key " + ByteUtils.toHexString(key.get()) + " to node:"
- + redirectNode);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Proxying GETVERSIONS on stealer:"
+ + metadata.getNodeId() + " for key "
+ + ByteUtils.toHexString(key.get()) + " to node:"
+ + redirectNode);
}
proxyGetAndLocalPut(key, redirectNode, null);
}
@@ -213,17 +218,18 @@ public boolean getIsRedirectingStoreEnabled() {
* @return
* @throws VoldemortException
*/
- private Map<ByteArray, List<Versioned<byte[]>>> redirectingGetAll(Iterable<ByteArray> keys,
- Map<ByteArray, byte[]> transforms)
- throws VoldemortException {
+ private Map<ByteArray, List<Versioned<byte[]>>>
+ redirectingGetAll(Iterable<ByteArray> keys,
+ Map<ByteArray, byte[]> transforms) throws VoldemortException {
// first determine how many keys are already present locally.
- Map<ByteArray, List<Versioned<byte[]>>> localVals = getInnerStore().getAll(keys, transforms);
+ Map<ByteArray, List<Versioned<byte[]>>> localVals = getInnerStore().getAll(keys,
+ transforms);
Map<ByteArray, Integer> keyToProxyNodeMap = Maps.newHashMapWithExpectedSize(Iterables.size(keys));
- for(ByteArray key: keys) {
+ for (ByteArray key : keys) {
// Relies on inner getAll() to not return an entry for the key in
// the result hashmap, in case the key does not exist on storage
- if(localVals.containsKey(key)) {
+ if (localVals.containsKey(key)) {
// if you have it locally, move to next key
continue;
}
@@ -235,7 +241,7 @@ public boolean getIsRedirectingStoreEnabled() {
* this is a non-existent key. We can't really confirm key does not
* exist, without going to the proxy node..
*/
- if(redirectNode != null) {
+ if (redirectNode != null) {
/*
* If we are indeed rebalancing for the key, then a proxy fetch
* will make things certain.
@@ -245,13 +251,13 @@ public boolean getIsRedirectingStoreEnabled() {
}
// If all keys were present locally, return. If not, do proxy fetch
- if(!keyToProxyNodeMap.isEmpty()) {
- if(logger.isTraceEnabled()) {
+ if (!keyToProxyNodeMap.isEmpty()) {
+ if (logger.isTraceEnabled()) {
String keyStr = "";
- for(ByteArray key: keys)
+ for (ByteArray key : keys)
keyStr += key + " ";
- logger.trace("Proxying GETALL on stealer:" + metadata.getNodeId() + " for keys "
- + keyStr);
+ logger.trace("Proxying GETALL on stealer:"
+ + metadata.getNodeId() + " for keys " + keyStr);
}
// Issue proxy fetches for non-rebalancing keys that did not exist
// locally
@@ -260,7 +266,7 @@ public boolean getIsRedirectingStoreEnabled() {
Map<ByteArray, List<Versioned<byte[]>>> proxyFetchedVals = getInnerStore().getAll(keyToProxyNodeMap.keySet(),
transforms);
// Merge the results
- for(Map.Entry<ByteArray, List<Versioned<byte[]>>> entry: proxyFetchedVals.entrySet()) {
+ for (Map.Entry<ByteArray, List<Versioned<byte[]>>> entry : proxyFetchedVals.entrySet()) {
localVals.put(entry.getKey(), entry.getValue());
}
}
@@ -277,8 +283,9 @@ public boolean getIsRedirectingStoreEnabled() {
* @param transforms
* @throws VoldemortException
*/
- private void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] transforms)
- throws VoldemortException {
+ private void redirectingPut(ByteArray key,
+ Versioned<byte[]> value,
+ byte[] transforms) throws VoldemortException {
Cluster currentCluster = metadata.getCluster();
// TODO:refactor O(n) linear lookup of storedef here. Ideally should be
// a hash lookup.
@@ -288,29 +295,34 @@ private void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] trans
* doing puts against it. We don't to do extra work and fill the log
* with errors in that case.
*/
- if(storeDef.getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0) {
+ if (storeDef.getType()
+ .compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0) {
throw new UnsupportedOperationException("put() not supported on read-only store");
}
- StoreRoutingPlan currentRoutingPlan = new StoreRoutingPlan(currentCluster, storeDef);
- Integer redirectNode = getProxyNode(currentRoutingPlan, storeDef, key.get());
+ BaseStoreRoutingPlan currentRoutingPlan = new BaseStoreRoutingPlan(currentCluster,
+ storeDef);
+ Integer redirectNode = getProxyNode(currentRoutingPlan,
+ storeDef,
+ key.get());
/**
* If I am rebalancing for this key, try to do remote get() if this node
* does not have the key , put it locally first to get the correct
* version ignoring any {@link ObsoleteVersionException}
*/
- if(redirectNode != null) {
+ if (redirectNode != null) {
/*
* first check if the key exists locally. If so, it means, it has
* been moved over (either by a proxy fetch or background fetch) and
* we are good simply issuing the put on top of that.
*/
List<Versioned<byte[]>> vals = getInnerStore().get(key, transforms);
- if(vals.isEmpty()) {
+ if (vals.isEmpty()) {
// if not, then go proxy fetch it
- if(logger.isTraceEnabled()) {
- logger.trace("Proxying GET (before PUT) on stealer:" + metadata.getNodeId()
- + " for key " + ByteUtils.toHexString(key.get()) + " to node:"
- + redirectNode);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Proxying GET (before PUT) on stealer:"
+ + metadata.getNodeId() + " for key "
+ + ByteUtils.toHexString(key.get()) + " to node:"
+ + redirectNode);
}
proxyGetAndLocalPut(key, redirectNode, transforms);
}
@@ -326,8 +338,10 @@ private void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] trans
// could have a situation where the online replicated write could lose
// out to the proxy put and hence fail the client operation with an
// OVE). So do not send proxy puts in those cases.
- if(isProxyPutEnabled && redirectNode != null
- && !currentRoutingPlan.getReplicationNodeList(key.get()).contains(redirectNode)) {
+ if (isProxyPutEnabled
+ && redirectNode != null
+ && !currentRoutingPlan.getReplicationNodeList(key.get())
+ .contains(redirectNode)) {
AsyncProxyPutTask asyncProxyPutTask = new AsyncProxyPutTask(this,
key,
value,
@@ -339,8 +353,9 @@ private void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] trans
}
@Override
- public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
- if(isServerRebalancing()) {
+ public List<Versioned<byte[]>>
+ get(ByteArray key, byte[] transforms) throws VoldemortException {
+ if (isServerRebalancing()) {
return redirectingGet(key, transforms);
} else {
return getInnerStore().get(key, transforms);
@@ -349,7 +364,7 @@ private void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] trans
@Override
public List<Version> getVersions(ByteArray key) {
- if(isServerRebalancing()) {
+ if (isServerRebalancing()) {
return redirectingGetVersions(key);
} else {
return getInnerStore().getVersions(key);
@@ -357,10 +372,9 @@ private void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] trans
}
@Override
- public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
- Map<ByteArray, byte[]> transforms)
- throws VoldemortException {
- if(isServerRebalancing()) {
+ public Map<ByteArray, List<Versioned<byte[]>>>
+ getAll(Iterable<ByteArray> keys, Map<ByteArray, byte[]> transforms) throws VoldemortException {
+ if (isServerRebalancing()) {
return redirectingGetAll(keys, transforms);
} else {
return getInnerStore().getAll(keys, transforms);
@@ -368,9 +382,9 @@ private void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] trans
}
@Override
- public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
- throws VoldemortException {
- if(isServerRebalancing()) {
+ public void
+ put(ByteArray key, Versioned<byte[]> value, byte[] transforms) throws VoldemortException {
+ if (isServerRebalancing()) {
redirectingPut(key, value, transforms);
} else {
getInnerStore().put(key, value, transforms);
@@ -395,7 +409,8 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
* </ol>
*/
@Override
- public boolean delete(ByteArray key, Version version) throws VoldemortException {
+ public boolean
+ delete(ByteArray key, Version version) throws VoldemortException {
StoreUtils.assertValidKey(key);
return getInnerStore()