Skip to content

Commit

Permalink
Added "progress bar" for rebalance batch plan
Browse files Browse the repository at this point in the history
Added RebalanceBatchPlanProgressBar
- progress tracking object for each rebalancing batch
- integrated with RebalanceController, AsyncRebalanceTask, and
  (Donor|Stealer)BasedRebalanceTask

Did other general clean up of logging during rebalance to make it
appropriately verbose (more verbose in some places, less verbose in
others...)
  • Loading branch information
jayjwylie committed Jun 20, 2013
1 parent c9b370e commit 4c276b8
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 99 deletions.
3 changes: 2 additions & 1 deletion src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -698,9 +698,10 @@ public String waitForCompletion(int nodeId,
while(System.currentTimeMillis() < waitUntil) {
try {
AsyncOperationStatus status = getAsyncRequestStatus(nodeId, requestId);
if(!status.getStatus().equalsIgnoreCase(oldStatus))
if(!status.getStatus().equalsIgnoreCase(oldStatus)) {
logger.info("Status from node " + nodeId + " (" + status.getDescription()
+ ") - " + status.getStatus());
}
oldStatus = status.getStatus();

if(higherStatus != null) {
Expand Down
13 changes: 13 additions & 0 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlan.java
Expand Up @@ -119,6 +119,10 @@ public List<RebalancePartitionsInfo> getBatchPlan() {
return batchPlan;
}

public RebalanceBatchPlanProgressBar getProgressBar(int batchId) {
return new RebalanceBatchPlanProgressBar(batchId, getTaskCount(), getPartitionStoreMoves());
}

public MoveMap getZoneMoveMap() {
MoveMap moveMap = new MoveMap(finalCluster.getZoneIds());

Expand Down Expand Up @@ -175,6 +179,15 @@ public int getPartitionStoreMoves() {
return partitionStoreMoves;
}

/**
* Returns the number of rebalance tasks in this batch.
*
* @return number of rebalance tasks in this batch
*/
public int getTaskCount() {
return batchPlan.size();
}

// TODO: (replicaType) As part of dropping replicaType and
// RebalancePartitionsInfo from code, simplify this object.
/**
Expand Down
144 changes: 144 additions & 0 deletions src/java/voldemort/client/rebalance/RebalanceBatchPlanProgressBar.java
@@ -0,0 +1,144 @@
/*
* Copyright 2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.client.rebalance;

import java.text.DecimalFormat;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import voldemort.utils.Utils;

public class RebalanceBatchPlanProgressBar {

private static final Logger logger = Logger.getLogger(RebalanceBatchPlanProgressBar.class);
private static final DecimalFormat decimalFormatter = new DecimalFormat("#.##");

private final int batchId;
private final int taskCount;
private final int partitionStoreCount;

private final long startTimeMs;

private Set<Integer> tasksInFlight;
private int numTasks;
private int numPartitionStores;

RebalanceBatchPlanProgressBar(int batchId, int taskCount, int partitionStoreCount) {
this.batchId = batchId;
this.taskCount = taskCount;
this.partitionStoreCount = partitionStoreCount;

this.startTimeMs = System.currentTimeMillis();

this.tasksInFlight = new HashSet<Integer>();
this.numTasks = 0;
this.numPartitionStores = 0;
}

/**
* Called whenever a rebalance task starts.
*
* @param taskId
*/
synchronized public void beginTask(int taskId) {
tasksInFlight.add(taskId);

updateProgressBar();
}

/**
* Called whenever a rebalance task completes. This means one task is done
* and some number of partition stores have been moved.
*
* @param taskId
* @param partitionStoreCount Number of partition stores moved by this
* completed task.
*/
synchronized public void completeTask(int taskId, int taskPartitionStores) {
tasksInFlight.remove(taskId);

numTasks++;
numPartitionStores += taskPartitionStores;

updateProgressBar();
}

/**
* Construct a pretty string documenting progress for this batch plan thus
* far.
*
* @return
*/
synchronized public String getPrettyProgressBar() {
StringBuilder sb = new StringBuilder();

double taskRate = numTasks / (double) taskCount;
double partitionStoreRate = numPartitionStores / (double) partitionStoreCount;

long deltaTimeMs = System.currentTimeMillis() - startTimeMs;
long taskTimeRemainingMs = Long.MAX_VALUE;
if(taskRate > 0) {
taskTimeRemainingMs = (long) (deltaTimeMs * ((1.0 / taskRate) - 1.0));
}
long partitionStoreTimeRemainingMs = Long.MAX_VALUE;
if(partitionStoreRate > 0) {
partitionStoreTimeRemainingMs = (long) (deltaTimeMs * ((1.0 / partitionStoreRate) - 1.0));
}

// Title line
sb.append("Progess update on rebalancing batch " + batchId).append(Utils.NEWLINE);
// Tasks in flight update
sb.append("There are currently " + tasksInFlight.size() + " rebalance tasks executing: ")
.append(tasksInFlight)
.append(".")
.append(Utils.NEWLINE);
// Tasks completed update
sb.append("\t" + numTasks + " out of " + taskCount + " rebalance tasks complete.")
.append(Utils.NEWLINE)
.append("\t")
.append(decimalFormatter.format(taskRate * 100.0))
.append("% done, estimate ")
.append(taskTimeRemainingMs)
.append(" ms (")
.append(TimeUnit.MILLISECONDS.toMinutes(taskTimeRemainingMs))
.append(" minutes) remaining.")
.append(Utils.NEWLINE);
// Partition-stores migrated update
sb.append("\t" + numPartitionStores + " out of " + partitionStoreCount
+ " partition-stores migrated.")
.append(Utils.NEWLINE)
.append("\t")
.append(decimalFormatter.format(partitionStoreRate * 100.0))
.append("% done, estimate ")
.append(partitionStoreTimeRemainingMs)
.append(" ms (")
.append(TimeUnit.MILLISECONDS.toMinutes(partitionStoreTimeRemainingMs))
.append(" minutes) remaining.")
.append(Utils.NEWLINE);
return sb.toString();
}

public void updateProgressBar() {
if(logger.isInfoEnabled()) {
String progressBar = getPrettyProgressBar();
logger.info(progressBar);
}
}
}

0 comments on commit 4c276b8

Please sign in to comment.