diff --git a/src/java/voldemort/client/rebalance/RebalanceController.java b/src/java/voldemort/client/rebalance/RebalanceController.java index 9620311acc..7c781e57c9 100644 --- a/src/java/voldemort/client/rebalance/RebalanceController.java +++ b/src/java/voldemort/client/rebalance/RebalanceController.java @@ -17,9 +17,15 @@ package voldemort.client.rebalance; import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -676,6 +682,118 @@ private void executeSubBatch(final int batchId, } } + public class Scheduler { + + final private ExecutorService service; + + private Map> tasksByStealer; + private int numTasksExecuting; + private Set nodeIdsWithWork; + private CountDownLatch doneSignal; + + Scheduler(ExecutorService service) { + this.service = service; + } + + public void run(List sbTaskList) { + // Setup for this run. + this.tasksByStealer = new HashMap>(); + for(StealerBasedRebalanceTask task: sbTaskList) { + if(task.getStealInfos().size() != 1) { + throw new VoldemortException("StealerBasedRebalanceTasks should have a list of RebalancePartitionsInfo of length 1."); + } + + RebalancePartitionsInfo stealInfo = task.getStealInfos().get(0); + int stealerId = stealInfo.getStealerId(); + if(!this.tasksByStealer.containsKey(stealerId)) { + this.tasksByStealer.put(stealerId, new ArrayList()); + } + this.tasksByStealer.get(stealerId).add(task); + } + + if(tasksByStealer.isEmpty()) { + return; + } + + this.numTasksExecuting = 0; + this.nodeIdsWithWork = new HashSet(); + doneSignal = new CountDownLatch(sbTaskList.size()); + + scheduleMoreTasks(); + + try { + doneSignal.await(); + } catch(InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private synchronized void scheduleMoreTasks() { + RebalanceTask scheduledTask = scheduleNextTask(); + while(scheduledTask != null) { + scheduledTask = scheduleNextTask(); + } + } + + private synchronized StealerBasedRebalanceTask scheduleNextTask() { + // Make sure there is work left to do. + if(doneSignal.getCount() == 0) { + return null; + } + + // Limit number of tasks outstanding. + if(this.numTasksExecuting == maxParallelRebalancing) { + return null; + } + + // Should probably round-robin among stealerIds. But, its easier to + // randomly shuffle list of stealer IDs each time a new task to + // schedule needs to be found. In theory, either round-robin'ing or + // shuffling will avoid prioritizing one specific stealers work + // ahead of all others. + List stealerIds = new ArrayList(tasksByStealer.keySet()); + Collections.shuffle(stealerIds); + for(int stealerId: stealerIds) { + if(nodeIdsWithWork.contains(stealerId)) { + continue; + } + for(StealerBasedRebalanceTask sbTask: tasksByStealer.get(stealerId)) { + int donorId = sbTask.getStealInfos().get(0).getDonorId(); + if(nodeIdsWithWork.contains(donorId)) { + continue; + } + + // Bookkeeping for task about to execute: + nodeIdsWithWork.add(stealerId); + nodeIdsWithWork.add(donorId); + numTasksExecuting++; + // Remove this task from list thus destroying list + // being iterated over. This is safe because returning + // directly out of this branch. + tasksByStealer.get(stealerId).remove(sbTask); + + service.execute(sbTask); + return sbTask; + } + } + + return null; + } + + // There should only be one task per stealer-donor and so these ids + // uniquely identify a stealer-based rebalancing task. + public synchronized void doneTask(int stealerId, int donorId) { + nodeIdsWithWork.remove(stealerId); + nodeIdsWithWork.remove(donorId); + numTasksExecuting--; + + doneSignal.countDown(); + + scheduleMoreTasks(); + } + } + private List executeTasks(final int batchId, RebalanceBatchPlanProgressBar progressBar, final ExecutorService service, @@ -684,6 +802,8 @@ private List executeTasks(final int batchId, List taskList = Lists.newArrayList(); int taskId = 0; if(stealerBasedRebalancing) { + Scheduler scheduler = new Scheduler(service); + List sbTaskList = Lists.newArrayList(); for(RebalancePartitionsInfo partitionsInfo: rebalancePartitionPlanList) { StealerBasedRebalanceTask rebalanceTask = new StealerBasedRebalanceTask(batchId, taskId, @@ -691,11 +811,14 @@ private List executeTasks(final int batchId, maxTriesRebalancing, donorPermits[partitionsInfo.getDonorId()], adminClient, - progressBar); + progressBar, + scheduler); taskList.add(rebalanceTask); - service.execute(rebalanceTask); + sbTaskList.add(rebalanceTask); + // service.execute(rebalanceTask); taskId++; } + scheduler.run(sbTaskList); } else { // Group by donor nodes HashMap> donorNodeBasedPartitionsInfo = RebalanceUtils.groupPartitionsInfoByNode(rebalancePartitionPlanList, diff --git a/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java b/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java index 37da1b90a9..e4026ab5d3 100644 --- a/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java +++ b/src/java/voldemort/client/rebalance/task/StealerBasedRebalanceTask.java @@ -23,6 +23,7 @@ import voldemort.VoldemortException; import voldemort.client.protocol.admin.AdminClient; import voldemort.client.rebalance.RebalanceBatchPlanProgressBar; +import voldemort.client.rebalance.RebalanceController; import voldemort.client.rebalance.RebalancePartitionsInfo; import voldemort.server.rebalance.AlreadyRebalancingException; import voldemort.store.UnreachableStoreException; @@ -42,6 +43,7 @@ public class StealerBasedRebalanceTask extends RebalanceTask { private static final Logger logger = Logger.getLogger(StealerBasedRebalanceTask.class); private final int stealerNodeId; + private final int donorNodeId; // TODO: What is the use of maxTries for stealer-based tasks? Need to // validate reason for existence or remove. // NOTES FROM VINOTH: @@ -61,13 +63,16 @@ public class StealerBasedRebalanceTask extends RebalanceTask { // done. private final int maxTries; + private final RebalanceController.Scheduler scheduler; + public StealerBasedRebalanceTask(final int batchId, final int taskId, final RebalancePartitionsInfo stealInfo, final int maxTries, final Semaphore donorPermit, final AdminClient adminClient, - final RebalanceBatchPlanProgressBar progressBar) { + final RebalanceBatchPlanProgressBar progressBar, + final RebalanceController.Scheduler scheduler) { super(batchId, taskId, Lists.newArrayList(stealInfo), @@ -78,6 +83,9 @@ public StealerBasedRebalanceTask(final int batchId, this.maxTries = maxTries; this.stealerNodeId = stealInfo.getStealerId(); + this.donorNodeId = stealInfo.getDonorId(); + + this.scheduler = scheduler; taskLog(toString()); } @@ -136,6 +144,7 @@ public void run() { } finally { donorPermit.release(); isComplete.set(true); + scheduler.doneTask(stealerNodeId, donorNodeId); } }