Skip to content

Commit

Permalink
Rebalancing tested for all failures modes locally.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbansal committed Dec 10, 2009
1 parent 6c1f77f commit db4c0fa
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 51 deletions.
2 changes: 1 addition & 1 deletion bin/run-class.sh
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ if [ -z $VOLD_OPTS ]; then
fi fi


export CLASSPATH export CLASSPATH
java $VOLD_OPTS -cp $CLASSPATH $@ java -Dlog4j.configuration=src/java/log4j.properties $VOLD_OPTS -cp $CLASSPATH $@
4 changes: 2 additions & 2 deletions bin/voldemort-rebalance-shell.sh
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
# limitations under the License. # limitations under the License.
# #


if [ $# -lt 2 ]; if [ $# -lt 3 ];
then then
echo 'USAGE: bin/voldemort-shell.sh currentCluster.xml targetCluster.xml stores.xml' echo 'USAGE: bin/voldemort-shell.sh currentCluster.xml targetCluster.xml stores.xml numParallelRebalancing'
exit 1 exit 1
fi fi


Expand Down
2 changes: 1 addition & 1 deletion bin/voldemort-server.sh
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ if [ -z $VOLD_OPTS ]; then
VOLD_OPTS="-Xmx2G -server -Dcom.sun.management.jmxremote" VOLD_OPTS="-Xmx2G -server -Dcom.sun.management.jmxremote"
fi fi


java $VOLD_OPTS -cp $CLASSPATH voldemort.server.VoldemortServer $@ java -Dlog4j.configuration=src/java/log4j.properties $VOLD_OPTS -cp $CLASSPATH voldemort.server.VoldemortServer $@
7 changes: 4 additions & 3 deletions src/java/log4j.properties
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ log4j.logger.httpclient.wire=INFO
log4j.logger.org.mortbay.log=WARN log4j.logger.org.mortbay.log=WARN
log4j.logger.voldemort.store.routed=INFO log4j.logger.voldemort.store.routed=INFO
log4j.logger.voldemort.server.niosocket=INFO log4j.logger.voldemort.server.niosocket=INFO
log4j.logger.voldemort.utils=DEBUG log4j.logger.voldemort.utils=INFO
log4j.logger.voldemort.client.rebalance=DEBUG log4j.logger.voldemort.client.rebalance=INFO
log4j.logger.voldemort.server.protocol.admin=DEBUG log4j.logger.voldemort.server.protocol.admin=INFO
log4j.logger.voldemort.server=INFO
8 changes: 4 additions & 4 deletions src/java/voldemort/client/rebalance/RebalanceClient.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public void run() {


while(rebalanceSubTaskList.size() > 0) { while(rebalanceSubTaskList.size() > 0) {
RebalanceStealInfo rebalanceSubTask = rebalanceSubTaskList.remove(0); RebalanceStealInfo rebalanceSubTask = rebalanceSubTaskList.remove(0);
logger.debug("Starting rebalancing for stealerNode:" + stealerNode logger.info("Starting rebalancing for stealerNode:" + stealerNode
+ " rebalanceInfo:" + rebalanceSubTask); + " rebalanceInfo:" + rebalanceSubTask);


try { try {


Expand All @@ -101,8 +101,8 @@ public void run() {
// attempt to rebalance for all stores. // attempt to rebalance for all stores.
attemptRebalanceSubTask(rebalanceSubTask); attemptRebalanceSubTask(rebalanceSubTask);


logger.debug("Successfully finished RebalanceSubTask attempt:" logger.info("Successfully finished RebalanceSubTask attempt:"
+ rebalanceSubTask); + rebalanceSubTask);
} catch(Exception e) { } catch(Exception e) {
logger.warn("rebalancing task (" + rebalanceSubTask logger.warn("rebalancing task (" + rebalanceSubTask
+ ") failed with exception:", e); + ") failed with exception:", e);
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/server/VoldemortConfig.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public VoldemortConfig(Props props) {
// rebalancing parameters // rebalancing parameters
this.maxRebalancingAttempt = props.getInt("max.rebalancing.attempts", 3); this.maxRebalancingAttempt = props.getInt("max.rebalancing.attempts", 3);
this.rebalancingTimeoutInSeconds = props.getInt("rebalancing.timeout.seconds", 60 * 60); this.rebalancingTimeoutInSeconds = props.getInt("rebalancing.timeout.seconds", 60 * 60);
this.rebalancingServicePeriod = props.getInt("rebalancing.service.period.ms", 5 * 60 * 1000); this.rebalancingServicePeriod = props.getInt("rebalancing.service.period.ms", 1000);


// network class loader disable by default. // network class loader disable by default.
this.enableNetworkClassLoader = props.getBoolean("enable.network.classloader", false); this.enableNetworkClassLoader = props.getBoolean("enable.network.classloader", false);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -264,7 +264,13 @@ public void handleUpdatePartitionEntries(VAdminProto.UpdatePartitionEntriesReque
Versioned<byte[]> value = ProtoUtils.decodeVersioned(partitionEntry.getVersioned()); Versioned<byte[]> value = ProtoUtils.decodeVersioned(partitionEntry.getVersioned());


if(filter.accept(key, value)) { if(filter.accept(key, value)) {
storageEngine.put(key, value); try {
storageEngine.put(key, value);

} catch(ObsoleteVersionException e) {
// log and ignore
logger.debug("updateEntries (Streaming put) threw ObsoleteVersionException .. Ignoring.");
}


if(throttler != null) { if(throttler != null) {
throttler.maybeThrottle(entrySize(Pair.create(key, value))); throttler.maybeThrottle(entrySize(Pair.create(key, value)));
Expand Down Expand Up @@ -298,12 +304,6 @@ public VAdminProto.AsyncOperationStatusResponse handleRebalanceNode(VAdminProto.
throw new VoldemortException("Rebalance service is not enabled for node:" throw new VoldemortException("Rebalance service is not enabled for node:"
+ metadataStore.getNodeId()); + metadataStore.getNodeId());


if(!rebalancer.acquireRebalancingPermit()) {
throw new VoldemortException("Node:"
+ metadataStore.getNodeId()
+ " is already rebalancing cannot start new rebalancing request.");
}

RebalanceStealInfo rebalanceStealInfo = new RebalanceStealInfo(request.getStealerId(), RebalanceStealInfo rebalanceStealInfo = new RebalanceStealInfo(request.getStealerId(),
request.getDonorId(), request.getDonorId(),
request.getPartitionsList(), request.getPartitionsList(),
Expand All @@ -313,6 +313,12 @@ public VAdminProto.AsyncOperationStatusResponse handleRebalanceNode(VAdminProto.
int requestId = rebalancer.rebalanceLocalNode(request.getCurrentStore(), int requestId = rebalancer.rebalanceLocalNode(request.getCurrentStore(),
rebalanceStealInfo); rebalanceStealInfo);


if(-1 == requestId) {
throw new VoldemortException("Node:"
+ metadataStore.getNodeId()
+ " is already rebalancing cannot start new rebalancing request.");
}

response.setRequestId(requestId) response.setRequestId(requestId)
.setDescription(rebalanceStealInfo.toString()) .setDescription(rebalanceStealInfo.toString())
.setStatus("started") .setStatus("started")
Expand Down Expand Up @@ -363,7 +369,7 @@ public void operate() {
entry.getSecond()); entry.getSecond());
} catch(ObsoleteVersionException e) { } catch(ObsoleteVersionException e) {
// log and ignore // log and ignore
logger.warn("FetchAndUpdate threw ObsoleteVersionException .. Ignoring."); logger.debug("FetchAndUpdate threw ObsoleteVersionException .. Ignoring.");
} }


throttler.maybeThrottle(entrySize(entry)); throttler.maybeThrottle(entrySize(entry));
Expand Down
82 changes: 51 additions & 31 deletions src/java/voldemort/server/rebalance/Rebalancer.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class Rebalancer implements Runnable {


private final AtomicBoolean rebalancePermit = new AtomicBoolean(false); private final AtomicBoolean rebalancePermit = new AtomicBoolean(false);
private final MetadataStore metadataStore; private final MetadataStore metadataStore;
private final AdminClient adminClient;
private final AsyncOperationRunner asyncRunner; private final AsyncOperationRunner asyncRunner;
private final VoldemortConfig config; private final VoldemortConfig config;


Expand All @@ -36,53 +35,52 @@ public Rebalancer(MetadataStore metadataStore,
this.metadataStore = metadataStore; this.metadataStore = metadataStore;
this.asyncRunner = asyncRunner; this.asyncRunner = asyncRunner;
this.config = config; this.config = config;
this.adminClient = RebalanceUtils.createTempAdminClient(config, metadataStore.getCluster());
} }


public void start() { public void start() {
// add startup time stuff here. // add startup time stuff here.
} }


/** public void stop() {}
* After the current operation finishes, no longer gossip.
*/
public void stop() {
try {
adminClient.stop();
} catch(Exception e) {
logger.error("Error while closing adminClient.", e);
}
}


public boolean acquireRebalancingPermit() { private boolean acquireRebalancingPermit() {
if(rebalancePermit.compareAndSet(false, true)) if(rebalancePermit.compareAndSet(false, true))
return true; return true;


return false; return false;
} }


public void releaseRebalancingPermit() { private void releaseRebalancingPermit() {
if(!rebalancePermit.compareAndSet(true, false)) { if(!rebalancePermit.compareAndSet(true, false)) {
throw new VoldemortException("Invalid state rebalancePermit must be true here."); throw new VoldemortException("Invalid state rebalancePermit must be true here.");
} }
} }


public void run() { public void run() {
logger.debug("rebalancer run() called.");
if(VoldemortState.REBALANCING_MASTER_SERVER.equals(metadataStore.getServerState()) if(VoldemortState.REBALANCING_MASTER_SERVER.equals(metadataStore.getServerState())
&& acquireRebalancingPermit()) { && acquireRebalancingPermit()) {

// free permit here for rebalanceLocalNode to acquire.
releaseRebalancingPermit();

RebalanceStealInfo stealInfo = metadataStore.getRebalancingStealInfo(); RebalanceStealInfo stealInfo = metadataStore.getRebalancingStealInfo();
logger.warn("Rebalance server found incomplete rebalancing attempt restarting "
+ stealInfo);

if(stealInfo.getAttempt() < config.getMaxRebalancingAttempt()) {
attemptRebalance(stealInfo);
} else {
logger.warn("Rebalancing for rebalancing task:" + stealInfo
+ " failed multiple times, Aborting more trials...");
}


// clean all rebalancing state try {
metadataStore.cleanAllRebalancingState(); logger.warn("Rebalance server found incomplete rebalancing attempt " + stealInfo
+ " restarting ...");

if(stealInfo.getAttempt() < config.getMaxRebalancingAttempt()) {
attemptRebalance(stealInfo);
} else {
logger.warn("Rebalancing for rebalancing task:" + stealInfo
+ " failed multiple times, Aborting more trials...");
metadataStore.cleanAllRebalancingState();
}
} catch(Exception e) {
logger.error("RebalanceService rebalancing attempt " + stealInfo
+ " failed with exception", e);
}
} }
} }


Expand All @@ -91,8 +89,14 @@ private void attemptRebalance(RebalanceStealInfo stealInfo) {
List<String> unbalanceStoreList = ImmutableList.copyOf(stealInfo.getUnbalancedStoreList()); List<String> unbalanceStoreList = ImmutableList.copyOf(stealInfo.getUnbalancedStoreList());


for(String storeName: unbalanceStoreList) { for(String storeName: unbalanceStoreList) {
AdminClient adminClient = RebalanceUtils.createTempAdminClient(config,
metadataStore.getCluster());
try { try {
int rebalanceAsyncId = rebalanceLocalNode(storeName, stealInfo); int rebalanceAsyncId = rebalanceLocalNode(storeName, stealInfo);
if(-1 == rebalanceAsyncId) {
logger.warn("rebalancer is already running, aborting this rebalanceService run() ..");
return;
}


adminClient.waitForCompletion(stealInfo.getStealerId(), adminClient.waitForCompletion(stealInfo.getStealerId(),
rebalanceAsyncId, rebalanceAsyncId,
Expand All @@ -102,6 +106,8 @@ private void attemptRebalance(RebalanceStealInfo stealInfo) {
stealInfo.getUnbalancedStoreList().remove(storeName); stealInfo.getUnbalancedStoreList().remove(storeName);
} catch(Exception e) { } catch(Exception e) {
logger.warn("rebalanceSubTask:" + stealInfo + " failed for store:" + storeName, e); logger.warn("rebalanceSubTask:" + stealInfo + " failed for store:" + storeName, e);
} finally {
adminClient.stop();
} }
} }
} }
Expand All @@ -121,6 +127,10 @@ private void attemptRebalance(RebalanceStealInfo stealInfo) {
* @return taskId for asynchronous task. * @return taskId for asynchronous task.
*/ */
public int rebalanceLocalNode(final String storeName, final RebalanceStealInfo stealInfo) { public int rebalanceLocalNode(final String storeName, final RebalanceStealInfo stealInfo) {

if(!acquireRebalancingPermit())
return -1;

int requestId = asyncRunner.getUniqueRequestId(); int requestId = asyncRunner.getUniqueRequestId();


asyncRunner.submitOperation(requestId, new AsyncOperation(requestId, stealInfo.toString()) { asyncRunner.submitOperation(requestId, new AsyncOperation(requestId, stealInfo.toString()) {
Expand All @@ -129,7 +139,10 @@ public int rebalanceLocalNode(final String storeName, final RebalanceStealInfo s


@Override @Override
public void operate() throws Exception { public void operate() throws Exception {
AdminClient adminClient = RebalanceUtils.createTempAdminClient(config,
metadataStore.getCluster());
try { try {
logger.info("Rebalancer: rebalance " + stealInfo + " starting.");


checkCurrentState(metadataStore, stealInfo); checkCurrentState(metadataStore, stealInfo);
setRebalancingState(metadataStore, stealInfo); setRebalancingState(metadataStore, stealInfo);
Expand All @@ -139,32 +152,39 @@ public void operate() throws Exception {
storeName, storeName,
stealInfo.getPartitionList(), stealInfo.getPartitionList(),
null); null);
logger.debug("rebalance internal async Id:" + fetchAndUpdateAsyncId);
adminClient.waitForCompletion(metadataStore.getNodeId(), adminClient.waitForCompletion(metadataStore.getNodeId(),
fetchAndUpdateAsyncId, fetchAndUpdateAsyncId,
24 * 60 * 60, 24 * 60 * 60,
TimeUnit.SECONDS); TimeUnit.SECONDS);
logger.info("rebalance " + stealInfo + " completed successfully.");
logger.info("Rebalancer: rebalance " + stealInfo + " completed successfully.");


// clean state only if successfull. // clean state only if successfull.
metadataStore.cleanAllRebalancingState(); metadataStore.cleanAllRebalancingState();


} finally { } finally {

// free the permit in all cases.
releaseRebalancingPermit();
adminClient.stop();
fetchAndUpdateAsyncId = -1;
} }
} }


@Override @Override
@JmxGetter(name = "asyncTaskStatus") @JmxGetter(name = "asyncTaskStatus")
public AsyncOperationStatus getStatus() { public AsyncOperationStatus getStatus() {
if(-1 != fetchAndUpdateAsyncId && !asyncRunner.isComplete(fetchAndUpdateAsyncId)) if(-1 != fetchAndUpdateAsyncId)
updateStatus(asyncRunner.getStatus(fetchAndUpdateAsyncId)); try {
updateStatus(asyncRunner.getStatus(fetchAndUpdateAsyncId));
} catch(Exception e) {
// ignore : handle race condition between asyncRunner
// removing value and fetchAndUpdate setting to -1
}


return super.getStatus(); return super.getStatus();
} }
}); });


logger.debug("rebalance node request_id:" + requestId);
return requestId; return requestId;
} }


Expand Down

0 comments on commit db4c0fa

Please sign in to comment.