diff --git a/src/java/voldemort/store/routed/PutPipelineData.java b/src/java/voldemort/store/routed/PutPipelineData.java index ebcc30e648..af34ec9b0d 100644 --- a/src/java/voldemort/store/routed/PutPipelineData.java +++ b/src/java/voldemort/store/routed/PutPipelineData.java @@ -16,10 +16,8 @@ package voldemort.store.routed; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - import voldemort.cluster.Node; +import voldemort.store.routed.action.AsyncPutSynchronizer; import voldemort.store.routed.action.PerformSerialPutRequests; import voldemort.versioning.Versioned; @@ -36,7 +34,7 @@ public class PutPipelineData extends BasicPipelineData { private long startTimeNs; - final private List slopOwnerNodes = new CopyOnWriteArrayList(); + final private AsyncPutSynchronizer synchronizer = new AsyncPutSynchronizer(); /** * Returns the previously determined "master" node. This is the first node @@ -100,23 +98,7 @@ public long getStartTimeNs() { return this.startTimeNs; } - /** - * Add a node to the slop owner list for registering slops in hintedHandoff - * stage - * - * @param node - */ - public void addSlopOwnerNode(Node node) { - slopOwnerNodes.add(node); - } - - /** - * Get list of nodes to register slop for - * - * @return list of nodes to register slop for - */ - public List getSlopOwnerNodes() { - return slopOwnerNodes; + public AsyncPutSynchronizer getSynchronizer() { + return synchronizer; } - } diff --git a/src/java/voldemort/store/routed/action/AbstractConfigureNodes.java b/src/java/voldemort/store/routed/action/AbstractConfigureNodes.java index 84576eb187..ff5dffa280 100644 --- a/src/java/voldemort/store/routed/action/AbstractConfigureNodes.java +++ b/src/java/voldemort/store/routed/action/AbstractConfigureNodes.java @@ -63,7 +63,7 @@ protected List getNodes(ByteArray key) { nodes.add(node); else { if(pipelineData instanceof PutPipelineData) { - ((PutPipelineData) pipelineData).addSlopOwnerNode(node); + ((PutPipelineData) 
pipelineData).getSynchronizer().tryDelegateSlop(node); } pipelineData.addFailedNode(node); if(logger.isDebugEnabled()) {
diff --git a/src/java/voldemort/store/routed/action/AsyncPutSynchronizer.java b/src/java/voldemort/store/routed/action/AsyncPutSynchronizer.java
new file mode 100644
index 0000000000..4a915e9cdb
--- /dev/null
+++ b/src/java/voldemort/store/routed/action/AsyncPutSynchronizer.java
@@ -0,0 +1,146 @@
+package voldemort.store.routed.action;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import voldemort.cluster.Node;
+import voldemort.store.routed.Response;
+import voldemort.utils.ByteArray;
+
+/**
+ * The AsyncPutSynchronizer class synchronizes the operations inside the
+ * PerformParallelPutRequests action. More specifically, it coordinates the
+ * exception handling and hinted handoff responsibility between the master
+ * thread and the async put callback threads.
+ *
+ * Every method synchronizes on this object, so the mutable state below is
+ * only read or written while holding its monitor.
+ */
+public class AsyncPutSynchronizer {
+
+    private final static Logger logger = Logger.getLogger(AsyncPutSynchronizer.class);
+
+    // true once the master no longer accepts slop responsibility
+    private boolean asyncCallbackShouldSendHint;
+    // true once the master no longer accepts responses from async callbacks
+    private boolean responseHandlingCutoff;
+    // used as a set of nodes; the value in the map is not used
+    private final ConcurrentMap<Node, Boolean> slopDestinations;
+    // responses handed over by async callbacks, consumed by the master
+    private final Queue<Response<ByteArray, Object>> responseQueue;
+
+    public AsyncPutSynchronizer() {
+        asyncCallbackShouldSendHint = false;
+        responseHandlingCutoff = false;
+        slopDestinations = new ConcurrentHashMap<Node, Boolean>();
+        responseQueue = new LinkedList<Response<ByteArray, Object>>();
+    }
+
+    /**
+     * Get the nodes to register slops for
+     *
+     * @return unmodifiable view of the nodes whose slop responsibility was
+     *         delegated to the master
+     */
+    public synchronized Set<Node> getDelegatedSlopDestinations() {
+        return Collections.unmodifiableSet(slopDestinations.keySet());
+    }
+
+    /**
+     * Stop accepting delegated slop responsibility by master; afterwards
+     * {@link #tryDelegateSlop(Node)} returns false
+     */
+    public synchronized void disallowDelegateSlop() {
+        asyncCallbackShouldSendHint = true;
+    }
+
+    /**
+     * Try to delegate the responsibility of sending slops to master
+     *
+     * @param node The node that slop should eventually be pushed to
+     * @return true if master accepts the responsibility; false if master
+     *         does not accept
+     */
+    public synchronized boolean tryDelegateSlop(Node node) {
+        if(asyncCallbackShouldSendHint) {
+            return false;
+        }
+        slopDestinations.put(node, true);
+        return true;
+    }
+
+    /**
+     * Master stops accepting new responses (from async callbacks); afterwards
+     * {@link #tryDelegateResponseHandling(Response)} returns false
+     */
+    public synchronized void cutoffHandling() {
+        responseHandlingCutoff = true;
+    }
+
+    /**
+     * Try to delegate the master to handle the response
+     *
+     * @param response The response received by an async callback
+     * @return true if the master accepted the response; false if the master
+     *         didn't accept
+     */
+    public synchronized boolean tryDelegateResponseHandling(Response<ByteArray, Object> response) {
+        if(responseHandlingCutoff) {
+            return false;
+        }
+        responseQueue.offer(response);
+        // wake up a master blocked in responseQueuePoll
+        this.notifyAll();
+        return true;
+    }
+
+    /**
+     * Poll the response queue, waiting up to the given timeout for a response
+     * to arrive
+     *
+     * @param timeout timeout amount; zero or negative polls without waiting
+     * @param timeUnit timeUnit of timeout
+     * @return the head of the queue, or null if the queue is still empty
+     *         once the timeout has elapsed
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public synchronized Response<ByteArray, Object> responseQueuePoll(long timeout,
+                                                                      TimeUnit timeUnit)
+            throws InterruptedException {
+        long timeoutMs = timeUnit.toMillis(timeout);
+        long remainingNs = timeUnit.toNanos(timeout);
+        // System.nanoTime() is monotonic, so wall clock adjustments cannot
+        // stretch or shorten the wait
+        long deadlineNs = System.nanoTime() + remainingNs;
+        while(responseQueue.isEmpty() && remainingNs > 0) {
+            if(logger.isDebugEnabled()) {
+                logger.debug("Start waiting for response queue with timeoutMs: " + timeoutMs);
+            }
+            // Object.wait(0) blocks until notified regardless of time, so a
+            // remainder rounded down to zero must never reach wait(); the
+            // loop guard keeps remainingNs positive, which is a timed wait
+            TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
+            if(logger.isDebugEnabled()) {
+                logger.debug("End waiting for response queue with timeoutMs: " + timeoutMs);
+            }
+            remainingNs = deadlineNs - System.nanoTime();
+        }
+        return responseQueue.poll();
+    }
+
+    /**
+     * To see if the response queue is empty
+     *
+     * @return true if response queue is empty; false if not empty.
+     */
+    public synchronized boolean responseQueueIsEmpty() {
+        return responseQueue.isEmpty();
+    }
+}
diff --git a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java index edf2646a88..28711e84e6 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java @@ -21,10 +21,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Level; @@ -67,6 +64,11 @@ public class PerformParallelPutRequests extends public boolean enableHintedHandoff; + private boolean quorumSatisfied = false; + private boolean zonesSatisfied = false; + private Integer numResponsesGot = 0; + private Integer numNodesPendingResponse = 0; + public PerformParallelPutRequests(PutPipelineData pipelineData, Event completeEvent, ByteArray key, @@ -98,16 +100,11 @@ public void execute(final Pipeline pipeline) { final List nodes = pipelineData.getNodes(); final Versioned versionedCopy = pipelineData.getVersionedCopy(); final Integer numNodesTouchedInSerialPut = nodes.indexOf(masterNode) + 1; - Integer numNodesToWait = nodes.size() - numNodesTouchedInSerialPut; - Integer numResponsedGot = 0; - - final BlockingQueue> responseQueue = new LinkedBlockingQueue>(); - final AtomicBoolean parallelPutSuccess = new AtomicBoolean(false); - final AtomicBoolean parallelPutFinish = new AtomicBoolean(false); + numNodesPendingResponse = nodes.size() - numNodesTouchedInSerialPut; if(logger.isDebugEnabled()) logger.debug("PUT {key:" + key + "} MasterNode={id:" + masterNode.getId() - + "} totalNodesToAsyncPut=" +
numNodesToWait); + + "} totalNodesToAsyncPut=" + numNodesPendingResponse); // initiate parallel puts for(int i = numNodesTouchedInSerialPut; i < nodes.size(); i++) { @@ -118,7 +115,7 @@ public void execute(final Pipeline pipeline) { @Override public void requestComplete(Object result, long requestTime) { - boolean willSendSlop = false; + boolean responseHandledByMaster = false; if(logger.isDebugEnabled()) logger.debug("PUT {key:" + key + "} response received from node={id:" + node.getId() + "} in " + requestTime + " ms)"); @@ -126,70 +123,73 @@ public void requestComplete(Object result, long requestTime) { Response response; response = new Response(node, key, result, requestTime); - responseQueue.add(response); - if(logger.isDebugEnabled()) { - logger.debug("PUT {key:" + key - + "} Parallel put thread returned result to main thread"); + logger.debug("PUT {key:" + + key + + "} Parallel put thread trying to return result to main thread"); } - Object responseObject = response.getValue(); - if(responseObject instanceof Exception) { - if(responseObject instanceof InvalidMetadataException) { - - pipelineData.reportException((InvalidMetadataException) response.getValue()); - logger.warn("PUT {key:" + key - + "} Received invalid metadata problem after a successful " - + pipeline.getOperation().getSimpleName() - + " call on node " + node.getId() + ", store '" - + pipelineData.getStoreName() + "'"); - } else if(responseObject instanceof UnreachableStoreException) { - // decide to delegate main thread to send slop or do - // it by myself - synchronized(parallelPutFinish) { - boolean finished = parallelPutFinish.get(); - boolean success = parallelPutSuccess.get(); - if(!finished) { - pipelineData.addSlopOwnerNode(node); - } else { - if(success) { - willSendSlop = true; - } - } - } + responseHandledByMaster = pipelineData.getSynchronizer() + .tryDelegateResponseHandling(response); + if(logger.isDebugEnabled()) { + logger.debug("PUT {key:" + key + "} Master thread accepted the 
response: " + + responseHandledByMaster); + } + + if(!responseHandledByMaster) { + if(result instanceof UnreachableStoreException) { if(logger.isDebugEnabled()) logger.debug("PUT {key:" + key + "} failed on node={id:" + node.getId() + ",host:" + node.getHost() + "}"); - if(isHintedHandoffEnabled() && willSendSlop) { - Slop slop = new Slop(pipelineData.getStoreName(), - Slop.Operation.PUT, - key, - versionedCopy.getValue(), - transforms, - node.getId(), - new Date()); - pipelineData.addFailedNode(node); - if(logger.isDebugEnabled()) - logger.debug("PUT {key:" + key - + "} Start registering Slop(node:" + node.getId() - + ",host:" + node.getHost() + ")"); - hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop); - if(logger.isDebugEnabled()) - logger.debug("PUT {key:" + key - + "} Finished registering Slop(node:" - + node.getId() + ",host:" + node.getHost() + ")"); + if(isHintedHandoffEnabled()) { + boolean triedDelegateSlop = pipelineData.getSynchronizer() + .tryDelegateSlop(node); + if(logger.isDebugEnabled()) { + logger.debug("PUT {key:" + key + "} triedDelegateSlop: " + + triedDelegateSlop); + } + if(!triedDelegateSlop) { + Slop slop = new Slop(pipelineData.getStoreName(), + Slop.Operation.PUT, + key, + versionedCopy.getValue(), + transforms, + node.getId(), + new Date()); + pipelineData.addFailedNode(node); + if(logger.isDebugEnabled()) + logger.debug("PUT {key:" + key + + "} Start registering Slop(node:" + + node.getId() + ",host:" + node.getHost() + + ")"); + hintedHandoff.sendHintSerial(node, + versionedCopy.getVersion(), + slop); + if(logger.isDebugEnabled()) + logger.debug("PUT {key:" + key + + "} Finished registering Slop(node:" + + node.getId() + ",host:" + node.getHost() + + ")"); + } } - handleResponseError(response, pipeline, failureDetector); - } else if(responseObject instanceof ObsoleteVersionException) { + } - } else { - handleResponseError(response, pipeline, failureDetector); + if(result instanceof Exception + && !(result instanceof 
ObsoleteVersionException)) { + if(response.getValue() instanceof InvalidMetadataException) { + pipelineData.reportException((InvalidMetadataException) response.getValue()); + logger.warn("Received invalid metadata problem after a successful " + + pipeline.getOperation().getSimpleName() + + " call on node " + node.getId() + ", store '" + + pipelineData.getStoreName() + "'"); + } else { + handleResponseError(response, pipeline, failureDetector); + } } } } - }; if(logger.isTraceEnabled()) @@ -201,91 +201,75 @@ public void requestComplete(Object result, long requestTime) { } try { - boolean preferredSatisfied = false; - boolean quorumSatisfied = false; - boolean zonesSatisfied = false; while(true) { long ellapsedNs = System.nanoTime() - pipelineData.getStartTimeNs(); long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs; remainingNs = Math.max(0, remainingNs); // preferred check - if(!preferredSatisfied) { - if(numResponsedGot >= preferred - 1) { - preferredSatisfied = true; - } + if(numResponsesGot >= preferred - 1) { + preferredSatisfied = true; } // quorum check - if(!quorumSatisfied) { - if(pipelineData.getSuccesses() >= required) { - quorumSatisfied = true; - } + if(pipelineData.getSuccesses() >= required) { + quorumSatisfied = true; } + // zone check - if(!zonesSatisfied) { - if(pipelineData.getZonesRequired() == null) { + if(pipelineData.getZonesRequired() == null) { + zonesSatisfied = true; + } else { + int numZonesSatisfied = pipelineData.getZoneResponses().size(); + if(numZonesSatisfied >= (pipelineData.getZonesRequired() + 1)) { zonesSatisfied = true; - } else { - int numZonesSatisfied = pipelineData.getZoneResponses().size(); - if(numZonesSatisfied >= (pipelineData.getZonesRequired() + 1)) { - zonesSatisfied = true; - } } } if(quorumSatisfied && zonesSatisfied && preferredSatisfied || remainingNs <= 0 - || numNodesToWait <= 0) { + || numNodesPendingResponse <= 0) { + pipelineData.getSynchronizer().cutoffHandling(); break; } else { - Response response 
= responseQueue.poll(remainingNs, - TimeUnit.NANOSECONDS); - if(response != null) { - numNodesToWait--; - numResponsedGot++; - if(response.getValue() instanceof Exception - && !(response.getValue() instanceof ObsoleteVersionException)) { - if(logger.isDebugEnabled()) { - logger.debug("PUT {key:" + key + "} handling async put error"); - } - if(handleResponseError(response, pipeline, failureDetector)) { - if(logger.isDebugEnabled()) { - logger.debug("PUT {key:" - + key - + "} severe async put error, exiting parallel put stage"); - } - - return; - } - - if(logger.isDebugEnabled()) { - logger.debug("PUT {key:" + key + "} handled async put error"); - } - - } else { - pipelineData.incrementSuccesses(); - failureDetector.recordSuccess(response.getNode(), - response.getRequestTime()); - pipelineData.getZoneResponses().add(response.getNode().getZoneId()); - } - } else { - logger.warn("RoutingTimedout on waiting for async ops; parellelResponseToWait: " - + numNodesToWait - + "; preferred-1: " - + (preferred - 1) - + "; quromOK: " - + quorumSatisfied - + "; zoneOK: " - + zonesSatisfied); + if(logger.isTraceEnabled()) { + logger.trace("PUT {key:" + key + "} trying to poll from queue"); + } + Response response = pipelineData.getSynchronizer() + .responseQueuePoll(remainingNs, + TimeUnit.NANOSECONDS); + processResponse(response, pipeline); + if(logger.isTraceEnabled()) { + logger.trace("PUT {key:" + key + "} tried to poll from queue. 
Null?: " + + (response == null) + " numResponsesGot:" + numResponsesGot + + " parellelResponseToWait: " + numNodesPendingResponse + + "; preferred-1: " + (preferred - 1) + "; preferredOK: " + + preferredSatisfied + " quromOK: " + quorumSatisfied + + "; zoneOK: " + zonesSatisfied); } + } } + // clean leftovers + // a) The main thread did a processResponse, due to which the + // criteria (quorum) was satisfied + // b) After this, the main thread cuts off adding responses to the + // queue by the async callbacks + + // An async callback can be invoked between a and b (this is the + // leftover) + while(!pipelineData.getSynchronizer().responseQueueIsEmpty()) { + Response response = pipelineData.getSynchronizer() + .responseQueuePoll(0, + TimeUnit.NANOSECONDS); + processResponse(response, pipeline); + } + if(quorumSatisfied && zonesSatisfied) { if(logger.isDebugEnabled()) { - logger.debug("PUT {key:" + key + "} successed at parellel put stage"); + logger.debug("PUT {key:" + key + "} succeeded at parellel put stage"); } - parallelPutSuccess.set(true); + pipelineData.getSynchronizer().disallowDelegateSlop(); pipeline.addEvent(completeEvent); } else { VoldemortException fatalError; @@ -324,7 +308,6 @@ public void requestComplete(Object result, long requestTime) { + pipelineData.getFailedNodes()); pipelineData.setFatalError(fatalError); } - parallelPutSuccess.set(false); pipeline.abort(); } } catch(InterruptedException e) { @@ -338,8 +321,49 @@ public void requestComplete(Object result, long requestTime) { if(logger.isDebugEnabled()) { logger.debug("PUT {key:" + key + "} marking parallel put stage finished"); } - synchronized(parallelPutFinish) { - parallelPutFinish.set(true); + } + } + + /** + * Process the response by reporting proper log and feeding failure + * detectors + * + * @param response + * @param pipeline + */ + private void processResponse(Response response, Pipeline pipeline) { + if(response == null) { + logger.warn("RoutingTimedout on waiting for async ops; 
parellelResponseToWait: " + + numNodesPendingResponse + "; preferred-1: " + (preferred - 1) + + "; quromOK: " + quorumSatisfied + "; zoneOK: " + zonesSatisfied); + } else { + numNodesPendingResponse = numNodesPendingResponse - 1; + numResponsesGot = numResponsesGot + 1; + if(response.getValue() instanceof Exception + && !(response.getValue() instanceof ObsoleteVersionException)) { + if(logger.isDebugEnabled()) { + logger.debug("PUT {key:" + key + "} handling async put error"); + } + if(handleResponseError(response, pipeline, failureDetector)) { + if(logger.isDebugEnabled()) { + logger.debug("PUT {key:" + key + + "} severe async put error, exiting parallel put stage"); + } + + return; + } + if(response.getValue() instanceof UnreachableStoreException) { + pipelineData.getSynchronizer().tryDelegateSlop(response.getNode()); + } + + if(logger.isDebugEnabled()) { + logger.debug("PUT {key:" + key + "} handled async put error"); + } + + } else { + pipelineData.incrementSuccesses(); + failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); } } } diff --git a/src/java/voldemort/store/routed/action/PerformPutHintedHandoff.java b/src/java/voldemort/store/routed/action/PerformPutHintedHandoff.java index 0660459850..a7f770ab0c 100644 --- a/src/java/voldemort/store/routed/action/PerformPutHintedHandoff.java +++ b/src/java/voldemort/store/routed/action/PerformPutHintedHandoff.java @@ -53,8 +53,8 @@ public PerformPutHintedHandoff(PutPipelineData pipelineData, @Override public void execute(Pipeline pipeline) { Versioned versionedCopy = pipelineData.getVersionedCopy(); - for(Node slopOwnerNode: pipelineData.getSlopOwnerNodes()) { - int failedNodeId = slopOwnerNode.getId(); + for(Node slopFinalDestinationNode: pipelineData.getSynchronizer().getDelegatedSlopDestinations()) { + int failedNodeId = slopFinalDestinationNode.getId(); if(versionedCopy == null) { VectorClock clock = (VectorClock) 
versioned.getVersion(); versionedCopy = new Versioned(versioned.getValue(), @@ -64,7 +64,7 @@ public void execute(Pipeline pipeline) { Version version = versionedCopy.getVersion(); if(logger.isTraceEnabled()) - logger.trace("Performing parallel hinted handoff for node " + slopOwnerNode + logger.trace("Performing parallel hinted handoff for node " + slopFinalDestinationNode + ", store " + pipelineData.getStoreName() + " key " + key + ", version " + version); @@ -75,7 +75,7 @@ public void execute(Pipeline pipeline) { transforms, failedNodeId, new Date()); - hintedHandoff.sendHintParallel(slopOwnerNode, version, slop); + hintedHandoff.sendHintParallel(slopFinalDestinationNode, version, slop); } pipeline.addEvent(completeEvent); } diff --git a/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java b/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java index 561da2cadf..034f945745 100644 --- a/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java +++ b/src/java/voldemort/store/routed/action/PerformSerialPutRequests.java @@ -127,7 +127,7 @@ public void execute(Pipeline pipeline) { + System.identityHashCode(key) + ")"); if(e instanceof UnreachableStoreException) { - pipelineData.addSlopOwnerNode(node); + pipelineData.getSynchronizer().tryDelegateSlop(node); } if(handleResponseError(e, node, requestTime, pipeline, failureDetector)) return;