Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fix two concurrency bugs during the termination of donor-based rebalancing
  • Loading branch information...
commit 1f057d7e3c68dc4d67387a5485ac6793d3feb8b7 1 parent f4edf7e
Lei Gao authored
View
102 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -268,70 +268,74 @@ public StreamRequestHandler handleRequest(final DataInputStream inputStream,
private VAdminProto.DeleteStoreRebalanceStateResponse handleDeleteStoreRebalanceState(VAdminProto.DeleteStoreRebalanceStateRequest request) {
VAdminProto.DeleteStoreRebalanceStateResponse.Builder response = VAdminProto.DeleteStoreRebalanceStateResponse.newBuilder();
+ synchronized(rebalancer) {
+ try {
- try {
-
- int nodeId = request.getNodeId();
- String storeName = request.getStoreName();
+ int nodeId = request.getNodeId();
+ String storeName = request.getStoreName();
- logger.info("Removing rebalancing state for donor node " + nodeId + " and store "
- + storeName);
- RebalancePartitionsInfo info = metadataStore.getRebalancerState().find(nodeId);
- if(info == null) {
- throw new VoldemortException("Could not find state for donor node " + nodeId);
- }
+ logger.info("Removing rebalancing state for donor node " + nodeId + " and store "
+ + storeName + " from stealer node " + metadataStore.getNodeId());
+ RebalancePartitionsInfo info = metadataStore.getRebalancerState().find(nodeId);
+ if(info == null) {
+ throw new VoldemortException("Could not find state for donor node " + nodeId);
+ }
- HashMap<Integer, List<Integer>> replicaToPartition = info.getReplicaToAddPartitionList(storeName);
- if(replicaToPartition == null) {
- throw new VoldemortException("Could not find state for donor node " + nodeId
- + " and store " + storeName);
- }
+ HashMap<Integer, List<Integer>> replicaToPartition = info.getReplicaToAddPartitionList(storeName);
+ if(replicaToPartition == null) {
+ throw new VoldemortException("Could not find state for donor node " + nodeId
+ + " and store " + storeName);
+ }
- info.removeStore(storeName);
- logger.info("Removed rebalancing state for donor node " + nodeId + " and store "
- + storeName);
+ info.removeStore(storeName);
+ logger.info("Removed rebalancing state for donor node " + nodeId + " and store "
+ + storeName + " from stealer node " + metadataStore.getNodeId());
- if(info.getUnbalancedStoreList().isEmpty()) {
- metadataStore.deleteRebalancingState(info);
- logger.info("Removed entire rebalancing state for donor node " + nodeId);
+ if(info.getUnbalancedStoreList().isEmpty()) {
+ metadataStore.deleteRebalancingState(info);
+ logger.info("Removed entire rebalancing state for donor node " + nodeId
+ + " from stealer node " + metadataStore.getNodeId());
+ }
+ } catch(VoldemortException e) {
+ response.setError(ProtoUtils.encodeError(errorCodeMapper, e));
+ logger.error("handleDeleteStoreRebalanceState failed for request("
+ + request.toString() + ")", e);
}
- } catch(VoldemortException e) {
- response.setError(ProtoUtils.encodeError(errorCodeMapper, e));
- logger.error("handleDeleteStoreRebalanceState failed for request(" + request.toString()
- + ")", e);
}
return response.build();
}
public VAdminProto.RebalanceStateChangeResponse handleRebalanceStateChange(VAdminProto.RebalanceStateChangeRequest request) {
-
VAdminProto.RebalanceStateChangeResponse.Builder response = VAdminProto.RebalanceStateChangeResponse.newBuilder();
- try {
- // Retrieve all values first
- List<RebalancePartitionsInfo> rebalancePartitionsInfo = Lists.newArrayList();
- for(RebalancePartitionInfoMap map: request.getRebalancePartitionInfoListList()) {
- rebalancePartitionsInfo.add(ProtoUtils.decodeRebalancePartitionInfoMap(map));
- }
-
- Cluster cluster = new ClusterMapper().readCluster(new StringReader(request.getClusterString()));
-
- boolean swapRO = request.getSwapRo();
- boolean changeClusterMetadata = request.getChangeClusterMetadata();
- boolean changeRebalanceState = request.getChangeRebalanceState();
- boolean rollback = request.getRollback();
+ synchronized(rebalancer) {
+ try {
+ // Retrieve all values first
+ List<RebalancePartitionsInfo> rebalancePartitionsInfo = Lists.newArrayList();
+ for(RebalancePartitionInfoMap map: request.getRebalancePartitionInfoListList()) {
+ rebalancePartitionsInfo.add(ProtoUtils.decodeRebalancePartitionInfoMap(map));
+ }
- rebalancer.rebalanceStateChange(cluster,
- rebalancePartitionsInfo,
- swapRO,
- changeClusterMetadata,
- changeRebalanceState,
- rollback);
- } catch(VoldemortException e) {
- response.setError(ProtoUtils.encodeError(errorCodeMapper, e));
- logger.error("handleRebalanceStateChange failed for request(" + request.toString()
- + ")", e);
+ Cluster cluster = new ClusterMapper().readCluster(new StringReader(request.getClusterString()));
+
+ boolean swapRO = request.getSwapRo();
+ boolean changeClusterMetadata = request.getChangeClusterMetadata();
+ boolean changeRebalanceState = request.getChangeRebalanceState();
+ boolean rollback = request.getRollback();
+
+ rebalancer.rebalanceStateChange(cluster,
+ rebalancePartitionsInfo,
+ swapRO,
+ changeClusterMetadata,
+ changeRebalanceState,
+ rollback);
+ } catch(VoldemortException e) {
+ response.setError(ProtoUtils.encodeError(errorCodeMapper, e));
+ logger.error("handleRebalanceStateChange failed for request(" + request.toString()
+ + ")", e);
+ }
}
+
return response.build();
}
View
2  src/java/voldemort/server/rebalance/Rebalancer.java
@@ -350,7 +350,7 @@ public int rebalanceNodeOnDonor(final List<RebalancePartitionsInfo> stealInfos)
// Get a lock for the stealer node
if(!acquireRebalancingPermit(stealerNodeId)) {
throw new VoldemortException("Node " + metadataStore.getNodeId()
- + " is already trying to steal from "
+ + " is already trying to push to stealer node "
+ stealerNodeId);
}
View
57 src/java/voldemort/server/rebalance/async/DonorBasedRebalanceAsyncOperation.java
@@ -21,11 +21,15 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import voldemort.VoldemortException;
@@ -34,7 +38,6 @@
import voldemort.cluster.Cluster;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
-import voldemort.server.protocol.admin.AsyncOperationService;
import voldemort.server.rebalance.Rebalancer;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.StorageEngine;
@@ -59,9 +62,12 @@
*/
public class DonorBasedRebalanceAsyncOperation extends RebalanceAsyncOperation {
- public final static Pair<ByteArray, Versioned<byte[]>> END = Pair.create(null, null);
+ public static final Pair<ByteArray, Versioned<byte[]>> END = Pair.create(new ByteArray("END".getBytes()),
+ new Versioned<byte[]>("END".getBytes()));
+ public static final Pair<ByteArray, Versioned<byte[]>> BREAK = Pair.create(new ByteArray("BREAK".getBytes()),
+ new Versioned<byte[]>("BREAK".getBytes()));
- // Batch 500 entries for each fetchUpdate call.
+ // Batch 1000 entries for each fetchUpdate call.
private static final int FETCHUPDATE_BATCH_SIZE = 1000;
// Print scanned entries every 100k
private static final int SCAN_PROGRESS_COUNT = 100000;
@@ -75,7 +81,7 @@
private final HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> storeToNodePartitionMapping;
- private final AsyncOperationService pushSlavesExecutor;
+ private final ExecutorService pushSlavesExecutor;
private Map<String, List<DonorBasedRebalancePusherSlave>> updatePushSlavePool;
private HashMultimap<String, Pair<Integer, HashMap<Integer, List<Integer>>>> groupByStores(List<RebalancePartitionsInfo> stealInfos) {
@@ -107,7 +113,14 @@ public DonorBasedRebalanceAsyncOperation(Rebalancer rebalancer,
// Group the plans by the store names
this.storeToNodePartitionMapping = groupByStores(stealInfos);
- pushSlavesExecutor = rebalancer.getAsyncOperationService();
+ pushSlavesExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setName(r.getClass().getName());
+ return thread;
+ }
+ });
updatePushSlavePool = Collections.synchronizedMap(new HashMap<String, List<DonorBasedRebalancePusherSlave>>());
}
@@ -268,18 +281,15 @@ private void rebalanceStore(final String storeName,
final SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>> queue = new SynchronousQueue<Pair<ByteArray, Versioned<byte[]>>>();
nodeToQueue.put(tuple.getFirst(), queue);
- int jobId = pushSlavesExecutor.getUniqueRequestId();
String jobName = "DonorBasedRebalancePusherSlave for store " + storeName
+ " on node " + tuple.getFirst();
- DonorBasedRebalancePusherSlave updatePushSlave = new DonorBasedRebalancePusherSlave(jobId,
- jobName,
- tuple.getFirst(),
+ DonorBasedRebalancePusherSlave updatePushSlave = new DonorBasedRebalancePusherSlave(tuple.getFirst(),
queue,
storeName,
adminClient);
storePushSlaves.add(updatePushSlave);
- pushSlavesExecutor.submitOperation(jobId, updatePushSlave);
- logger.info("Submitted donor-based pusher job: id=" + jobId + " name=" + jobName);
+ pushSlavesExecutor.execute(updatePushSlave);
+ logger.info("Started a thread for " + jobName);
}
fetchEntriesForStealers(storageEngine,
@@ -339,7 +349,7 @@ private void putAll(List<Integer> dests,
fetched[nodeId]++;
nodeToQueue.get(nodeId).put(Pair.create(key, value));
if(0 == fetched[nodeId] % FETCHUPDATE_BATCH_SIZE) {
- nodeToQueue.get(nodeId).put(END);
+ nodeToQueue.get(nodeId).put(BREAK);
}
}
}
@@ -371,18 +381,27 @@ private void terminateAllSlaves(List<DonorBasedRebalancePusherSlave> updatePushS
it.next().requestCompletion();
}
- // wait for all async slave to finish
- for(Iterator<DonorBasedRebalancePusherSlave> it = updatePushSlavePool.iterator(); it.hasNext();) {
- it.next().waitCompletion();
+ // signal and wait for all slaves to finish
+ pushSlavesExecutor.shutdown();
+ try {
+ if(pushSlavesExecutor.awaitTermination(30, TimeUnit.MINUTES)) {
+ logger.info("All DonorBasedRebalancePushSlaves terminated successfully.");
+ } else {
+ logger.warn("Timed out while waiting for pusher slaves to shutdown!!!");
+ }
+ } catch(InterruptedException e) {
+ logger.warn("Interrupted while waiting for pusher slaves to shutdown!!!");
}
- logger.info("All DonorBasedRebalancePushSlaves terminated successfully.");
+ logger.info("DonorBasedRebalancingOperation existed.");
}
private void terminateAllSlavesAsync(List<DonorBasedRebalancePusherSlave> updatePushSlavePool) {
logger.info("Terminating DonorBasedRebalancePushSlaves asynchronously");
for(Iterator<DonorBasedRebalancePusherSlave> it = updatePushSlavePool.iterator(); it.hasNext();) {
- it.next().setCompletion();
+ it.next().requestCompletion();
}
+ pushSlavesExecutor.shutdownNow();
+ logger.info("DonorBasedRebalancingAsyncOperation existed.");
}
@Override
@@ -395,4 +414,4 @@ public void stop() {
}
executors.shutdownNow();
}
-}
+}
View
121 src/java/voldemort/server/rebalance/async/DonorBasedRebalancePusherSlave.java
@@ -4,13 +4,11 @@
import java.util.ArrayList;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
-import voldemort.server.protocol.admin.AsyncOperation;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
@@ -18,7 +16,7 @@
import com.google.common.collect.Lists;
-public class DonorBasedRebalancePusherSlave extends AsyncOperation {
+public class DonorBasedRebalancePusherSlave implements Runnable {
protected final static Logger logger = Logger.getLogger(DonorBasedRebalancePusherSlave.class);
@@ -26,28 +24,24 @@
private BlockingQueue<Pair<ByteArray, Versioned<byte[]>>> queue;
private String storeName;
private AdminClient adminClient;
- private ResumableIterator<Pair<ByteArray, Versioned<byte[]>>> nodeIterator = new ResumableIterator<Pair<ByteArray, Versioned<byte[]>>>();
- private AtomicBoolean stopRequested;
+ private ResumableIterator<Pair<ByteArray, Versioned<byte[]>>> nodeIterator;
- public DonorBasedRebalancePusherSlave(int id,
- String description,
- int nodeId,
+ public DonorBasedRebalancePusherSlave(int nodeId,
BlockingQueue<Pair<ByteArray, Versioned<byte[]>>> queue,
String storeName,
AdminClient adminClient) {
- super(id, description);
this.nodeId = nodeId;
this.queue = queue;
this.storeName = storeName;
this.adminClient = adminClient;
- this.stopRequested = new AtomicBoolean(false);
+ nodeIterator = new ResumableIterator<Pair<ByteArray, Versioned<byte[]>>>();
}
- @Override
- public void operate() throws Exception {
+ public void run() throws VoldemortException {
+ boolean needWait = false;
logger.info("DonorBasedRebalancePusherSlave begains to send partitions for store "
+ storeName + " to node " + nodeId);
- while(!isStopRequest()) {
+ while(!nodeIterator.done) {
try {
nodeIterator.reset();
adminClient.updateEntries(nodeId, storeName, nodeIterator, null);
@@ -61,41 +55,38 @@ public void operate() throws Exception {
+ " to remote node " + nodeId
+ ". Will retry again after 5 minutes");
logger.error(e.getCause());
- Thread.sleep(30000);
+ needWait = true;
} else {
throw e;
}
}
+
+ if(needWait) {
+ try {
+ // sleep for 5 minutes if exception occur while communicate
+ // with remote node
+ logger.info("waiting for 5 minutes for the remote node to recover");
+ Thread.sleep(30000);
+ needWait = false;
+ } catch(InterruptedException e) {
+ // continue
+ }
+ }
}
- setCompletion();
+
logger.info("DonorBasedRebalancePusherSlave finished sending partitions for store "
+ storeName + " to node " + nodeId);
}
- public void setStopRequest() {
- stopRequested.set(true);
- }
-
- public boolean isStopRequest() {
- return stopRequested.get();
- }
-
- @Override
- public void stop() {
- requestCompletion();
- }
-
/**
- * This function will set the request for stop first; Then insert 'END' into
- * the queue so slave will return from updateEntries. Noted that this order
- * shall not be changed or the slave will enter updateEntries again.
+ * This function inserts 'END' into the queue so slave will return from
+ * updateEntries.
*
* @param immediateTerminate
* @param notifySlave
*/
- public synchronized void requestCompletion() {
+ public void requestCompletion() {
try {
- setStopRequest();
queue.put(DonorBasedRebalanceAsyncOperation.END);
} catch(InterruptedException e) {
logger.info("Unable to send termination message to pusher slave for node " + nodeId
@@ -103,28 +94,10 @@ public synchronized void requestCompletion() {
}
}
- public synchronized void setCompletion() {
- getStatus().setComplete(true);
- notifyAll();
- }
-
- public synchronized void waitCompletion() {
- while(!getStatus().isComplete()) {
- try {
- logger.info("Waiting for the completion, with 10s timeout, of pusher slave for "
- + getStatus().getDescription() + " with id=" + getStatus().getId());
- // check for status every 10 seconds
- wait(10000);
- } catch(InterruptedException e) {
- logger.info("Existing wait loop due to interrupt.");
- break;
- }
- }
- }
-
// It will always Iterator through 'tentativeList' before iterating 'queue'
class ResumableIterator<T> implements ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {
+ private boolean done = false;
private boolean recoveryModeOn = false;
private int recoveryPosition = 0;
private Pair<ByteArray, Versioned<byte[]>> currentElem = null;
@@ -153,19 +126,30 @@ public void reset() {
this.currentElem = null;
}
- // return when something is available, blocked otherwise
public boolean hasNext() {
boolean hasNext = false;
- while(null == currentElem) {
- try {
- currentElem = getNextElem();
- } catch(InterruptedException e) {
- logger.info("hasNext is interrupted while waiting for the next elem, existing...");
- break;
+ if(!done) {
+ while(null == currentElem) {
+ try {
+ currentElem = getNextElem();
+ } catch(InterruptedException e) {
+ logger.info("hasNext is interrupted while waiting for the next elem, existing...");
+ break;
+ }
+ }
+
+ // regular event
+ if(null != currentElem
+ && !currentElem.equals(DonorBasedRebalanceAsyncOperation.END)
+ && !currentElem.equals(DonorBasedRebalanceAsyncOperation.BREAK)) {
+ hasNext = true;
+ }
+
+ // this is the last element returned by this iterator
+ if(currentElem != null && currentElem.equals(DonorBasedRebalanceAsyncOperation.END)) {
+ done = true;
+ hasNext = false;
}
- }
- if(null != currentElem && !currentElem.equals(DonorBasedRebalanceAsyncOperation.END)) {
- hasNext = true;
}
return hasNext;
}
@@ -173,6 +157,10 @@ public boolean hasNext() {
// return the element when one or more is available, blocked
// otherwise
public Pair<ByteArray, Versioned<byte[]>> next() {
+ if(done) {
+ throw new NoSuchElementException();
+ }
+
while(null == currentElem) {
try {
currentElem = getNextElem();
@@ -180,10 +168,17 @@ public boolean hasNext() {
logger.info("next is interrupted while waiting for the next elem, existing...");
break;
}
- if(null == currentElem || currentElem.equals(DonorBasedRebalanceAsyncOperation.END)) {
+ if(null == currentElem || currentElem.equals(DonorBasedRebalanceAsyncOperation.END)
+ || currentElem.equals(DonorBasedRebalanceAsyncOperation.BREAK)) {
throw new NoSuchElementException();
}
}
+
+ // this is the last element returned by this iterator
+ if(currentElem != null && currentElem.equals(DonorBasedRebalanceAsyncOperation.END)) {
+ done = true;
+ }
+
Pair<ByteArray, Versioned<byte[]>> returnValue = currentElem;
currentElem = null;
return returnValue;
View
1  test/unit/voldemort/client/rebalance/AbstractRebalanceTest.java
@@ -666,6 +666,7 @@ public void testRWRebalanceFourNodes() throws Exception {
config.setDeleteAfterRebalancingEnabled(true);
config.setStealerBasedRebalancing(!useDonorBased());
config.setPrimaryPartitionBatchSize(100);
+ config.setMaxParallelRebalancing(5);
RebalanceController rebalanceClient = new RebalanceController(getBootstrapUrl(currentCluster,
0),
config);
Please sign in to comment.
Something went wrong with that request. Please try again.