diff --git a/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java b/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java index d5fa5a5a30..2eb799e630 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java @@ -87,13 +87,11 @@ public PerformParallelDeleteRequests(PD pipelineData, this.hintedHandoff = hintedHandoff; } - @Override public void execute(final Pipeline pipeline) { List nodes = pipelineData.getNodes(); + final Map> responses = new ConcurrentHashMap>(); int attempts = nodes.size(); int blocks = Math.min(preferred, attempts); - - final Map> responses = new ConcurrentHashMap>(); final CountDownLatch attemptsLatch = new CountDownLatch(attempts); final CountDownLatch blocksLatch = new CountDownLatch(blocks); @@ -101,17 +99,62 @@ public void execute(final Pipeline pipeline) { logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName() + " operations in parallel"); - long startTimeNs = System.nanoTime(); + long beginTime = System.nanoTime(); for(int i = 0; i < attempts; i++) { final Node node = nodes.get(i); pipelineData.incrementNodeIndex(); - NonblockingStoreCallback callback = new Callback(pipeline, - node, - responses, - attemptsLatch, - blocksLatch); + NonblockingStoreCallback callback = new NonblockingStoreCallback() { + + public void requestComplete(Object result, long requestTime) { + if(logger.isTraceEnabled()) + logger.trace(pipeline.getOperation().getSimpleName() + + " response received (" + requestTime + " ms.) from node " + + node.getId()); + + Response response = new Response(node, + key, + result, + requestTime); + responses.put(node.getId(), response); + if(enableHintedHandoff && pipeline.isFinished() + && response.getValue() instanceof UnreachableStoreException) { + Slop slop = new Slop(pipelineData.getStoreName(), + Slop.Operation.DELETE, + key, + null, + null, + node.getId(), + new Date()); + pipelineData.addFailedNode(node); + hintedHandoff.sendHintSerial(node, version, slop); + } + + attemptsLatch.countDown(); + blocksLatch.countDown(); + + if(logger.isTraceEnabled()) + logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block " + + " for " + blocksLatch.getCount() + " more "); + + // Note errors that come in after the pipeline has finished. + // These will *not* get a chance to be called in the loop of + // responses below. + if(pipeline.isFinished() && response.getValue() instanceof Exception + && !(response.getValue() instanceof ObsoleteVersionException)) { + if(response.getValue() instanceof InvalidMetadataException) { + pipelineData.reportException((InvalidMetadataException) response.getValue()); + logger.warn("Received invalid metadata problem after a successful " + + pipeline.getOperation().getSimpleName() + + " call on node " + node.getId() + ", store '" + + pipelineData.getStoreName() + "'"); + } else { + handleResponseError(response, pipeline, failureDetector); + } + } + } + }; if(logger.isTraceEnabled()) logger.info("Submitting " + pipeline.getOperation().getSimpleName() @@ -121,13 +164,74 @@ public void execute(final Pipeline pipeline) { store.submitDeleteRequest(key, version, callback, timeoutMs); } - waitForResponses(startTimeNs, blocksLatch, responses, pipeline); + try { + long ellapsedNs = System.nanoTime() - beginTime; + long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs; + if(remainingNs > 0) { + blocksLatch.await(remainingNs, TimeUnit.NANOSECONDS); + } + } catch(InterruptedException e) { + if(logger.isEnabledFor(Level.WARN)) + logger.warn(e, e); + } + + for(Entry> responseEntry: responses.entrySet()) { + Response response = responseEntry.getValue(); + if(response.getValue() instanceof Exception) { + if(response.getValue() instanceof ObsoleteVersionException) { + // ignore this completely here + // this means that a higher version was able + // to write on this node and should be termed as + // clean success. + responses.remove(responseEntry.getKey()); + } else if(handleResponseError(response, pipeline, failureDetector)) { + return; + } + } else { + pipelineData.incrementSuccesses(); + failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); + Response rCast = Utils.uncheckedCast(response); + pipelineData.getResponses().add(rCast); + responses.remove(responseEntry.getKey()); + } + } // wait for more responses in case we did not have enough successful // response to achieve the required count boolean quorumSatisfied = true; if(pipelineData.getSuccesses() < required) { - waitForResponses(startTimeNs, attemptsLatch, responses, pipeline); + long ellapsedNs = System.nanoTime() - beginTime; + long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs; + if(remainingNs > 0) { + try { + attemptsLatch.await(remainingNs, TimeUnit.NANOSECONDS); + } catch(InterruptedException e) { + if(logger.isEnabledFor(Level.WARN)) + logger.warn(e, e); + } + + for(Entry> responseEntry: responses.entrySet()) { + Response response = responseEntry.getValue(); + if(response.getValue() instanceof Exception) { + if(response.getValue() instanceof ObsoleteVersionException) { + // ignore this completely here + // this means that a higher version was able + // to write on this node and should be termed as + // clean success. + responses.remove(responseEntry.getKey()); + } else if(handleResponseError(response, pipeline, failureDetector)) + return; + } else { + pipelineData.incrementSuccesses(); + failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); + Response rCast = Utils.uncheckedCast(response); + pipelineData.getResponses().add(rCast); + responses.remove(responseEntry.getKey()); + } + } + } if(pipelineData.getSuccesses() < required) { pipelineData.setFatalError(new InsufficientOperationalNodesException(required @@ -152,7 +256,32 @@ public void execute(final Pipeline pipeline) { if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) { pipeline.addEvent(completeEvent); } else { - waitForResponses(startTimeNs, attemptsLatch, responses, pipeline); + long timeMs = (System.nanoTime() - beginTime) / Time.NS_PER_MS; + + if((timeoutMs - timeMs) > 0) { + try { + attemptsLatch.await(timeoutMs - timeMs, TimeUnit.MILLISECONDS); + } catch(InterruptedException e) { + if(logger.isEnabledFor(Level.WARN)) + logger.warn(e, e); + } + + for(Entry> responseEntry: responses.entrySet()) { + Response response = responseEntry.getValue(); + if(response.getValue() instanceof Exception) { + if(handleResponseError(response, pipeline, failureDetector)) + return; + } else { + pipelineData.incrementSuccesses(); + failureDetector.recordSuccess(response.getNode(), + response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); + Response rCast = Utils.uncheckedCast(response); + pipelineData.getResponses().add(rCast); + responses.remove(responseEntry.getKey()); + } + } + } if(pipelineData.getZoneResponses().size() >= (pipelineData.getZonesRequired() + 1)) { pipeline.addEvent(completeEvent); @@ -173,118 +302,4 @@ public void execute(final Pipeline pipeline) { } } } - - private void waitForResponses(long startTimeNs, - CountDownLatch latch, - final Map> responses, - final Pipeline pipeline) { - long elapsedNs = System.nanoTime() - startTimeNs; - long remainingNs = (timeoutMs * Time.NS_PER_MS) - elapsedNs; - if(remainingNs > 0) { - try { - latch.await(remainingNs, TimeUnit.NANOSECONDS); - } catch(InterruptedException e) { - if(logger.isEnabledFor(Level.WARN)) - logger.warn(e, e); - } - - processResponses(responses, pipeline); - } - } - - private void processResponses(final Map> responses, - final Pipeline pipeline) { - for(Entry> responseEntry: responses.entrySet()) { - Response response = responseEntry.getValue(); - // Treat ObsoleteVersionExceptions as success since such an - // exception means that a higher version was able to write on the - // node. - if(response.getValue() instanceof Exception - && !(response.getValue() instanceof ObsoleteVersionException)) { - if(handleResponseError(response, pipeline, failureDetector)) - return; - } else { - pipelineData.incrementSuccesses(); - failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); - pipelineData.getZoneResponses().add(response.getNode().getZoneId()); - - Response rCast = Utils.uncheckedCast(response); - pipelineData.getResponses().add(rCast); - - responses.remove(responseEntry.getKey()); - } - } - } - - public class Callback implements NonblockingStoreCallback { - - final Pipeline pipeline; - final Node node; - final Map> responses; - final CountDownLatch attemptsLatch; - final CountDownLatch blocksLatch; - - Callback(Pipeline pipeline, - Node node, - Map> responses, - CountDownLatch attemptsLatch, - CountDownLatch blocksLatch) { - this.pipeline = pipeline; - this.node = node; - this.responses = responses; - this.attemptsLatch = attemptsLatch; - this.blocksLatch = blocksLatch; - } - - @Override - public void requestComplete(Object result, long requestTime) { - if(logger.isTraceEnabled()) - logger.trace(pipeline.getOperation().getSimpleName() + " response received (" - + requestTime + " ms.) from node " + node.getId()); - - Response response = new Response(node, - key, - result, - requestTime); - responses.put(node.getId(), response); - - if(enableHintedHandoff && pipeline.isFinished() - && response.getValue() instanceof UnreachableStoreException) { - Slop slop = new Slop(pipelineData.getStoreName(), - Slop.Operation.DELETE, - key, - null, - null, - node.getId(), - new Date()); - pipelineData.addFailedNode(node); - // TODO: Should not have blocking operation in callback - hintedHandoff.sendHintSerial(node, version, slop); - } - - attemptsLatch.countDown(); - blocksLatch.countDown(); - - if(logger.isTraceEnabled()) - logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block " - + " for " + blocksLatch.getCount() + " more "); - - // Note errors that come in after the pipeline has finished. - // These will *not* get a chance to be called in the loop of - // responses below. - if(pipeline.isFinished() && response.getValue() instanceof Exception - && !(response.getValue() instanceof ObsoleteVersionException)) { - if(response.getValue() instanceof InvalidMetadataException) { - pipelineData.reportException((InvalidMetadataException) response.getValue()); - logger.warn("Received invalid metadata problem after a successful " - + pipeline.getOperation().getSimpleName() + " call on node " - + node.getId() + ", store '" + pipelineData.getStoreName() + "'"); - } else { - // TODO: Should not have operation that acquires locks and - // may do blocking operations in callback - handleResponseError(response, pipeline, failureDetector); - } - } - } - } } diff --git a/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java b/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java index abc9dc20cc..e44cfdc9d3 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelGetAllRequests.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2012 LinkedIn, Inc + * Copyright 2010 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -37,7 +37,6 @@ import voldemort.store.routed.Pipeline.Event; import voldemort.store.routed.Response; import voldemort.utils.ByteArray; -import voldemort.utils.Utils; import voldemort.versioning.Versioned; import com.google.common.collect.Lists; @@ -63,12 +62,11 @@ public PerformParallelGetAllRequests(GetAllPipelineData pipelineData, this.nonblockingStores = nonblockingStores; } - @Override + @SuppressWarnings("unchecked") public void execute(final Pipeline pipeline) { int attempts = pipelineData.getNodeToKeysMap().size(); - final Map, Object>> responses = new ConcurrentHashMap, Object>>(); - final CountDownLatch attemptsLatch = new CountDownLatch(attempts); + final CountDownLatch latch = new CountDownLatch(attempts); if(logger.isTraceEnabled()) logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName() @@ -80,11 +78,37 @@ public void execute(final Pipeline pipeline) { final Node node = entry.getKey(); final Collection keys = entry.getValue(); - NonblockingStoreCallback callback = new Callback(pipeline, - node, - keys, - responses, - attemptsLatch); + NonblockingStoreCallback callback = new NonblockingStoreCallback() { + + public void requestComplete(Object result, long requestTime) { + if(logger.isTraceEnabled()) + logger.trace(pipeline.getOperation().getSimpleName() + + " response received (" + requestTime + " ms.) from node " + + node.getId()); + + Response, Object> response = new Response, Object>(node, + keys, + result, + requestTime); + responses.put(node.getId(), response); + latch.countDown(); + + // Note errors that come in after the pipeline has finished. + // These will *not* get a chance to be called in the loop of + // responses below. + if(pipeline.isFinished() && response.getValue() instanceof Exception) + if(response.getValue() instanceof InvalidMetadataException) { + pipelineData.reportException((InvalidMetadataException) response.getValue()); + logger.warn("Received invalid metadata problem after a successful " + + pipeline.getOperation().getSimpleName() + + " call on node " + node.getId() + ", store '" + + pipelineData.getStoreName() + "'"); + } else { + handleResponseError(response, pipeline, failureDetector); + } + } + + }; if(logger.isTraceEnabled()) logger.trace("Submitting " + pipeline.getOperation().getSimpleName() @@ -95,7 +119,7 @@ public void execute(final Pipeline pipeline) { } try { - attemptsLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + latch.await(timeoutMs, TimeUnit.MILLISECONDS); } catch(InterruptedException e) { if(logger.isEnabledFor(Level.WARN)) logger.warn(e, e); @@ -106,7 +130,7 @@ public void execute(final Pipeline pipeline) { if(handleResponseError(response, pipeline, failureDetector)) return; } else { - Map>> values = Utils.uncheckedCast(response.getValue()); + Map>> values = (Map>>) response.getValue(); for(ByteArray key: response.getKey()) { MutableInt successCount = pipelineData.getSuccessCount(key); @@ -147,55 +171,4 @@ public void execute(final Pipeline pipeline) { pipeline.addEvent(completeEvent); } - - public class Callback implements NonblockingStoreCallback { - - final Pipeline pipeline; - final Node node; - final Collection keys; - final Map, Object>> responses; - final CountDownLatch attemptsLatch; - - Callback(Pipeline pipeline, - Node node, - Collection keys, - Map, Object>> responses, - CountDownLatch attemptsLatch) { - this.pipeline = pipeline; - this.node = node; - this.keys = keys; - this.responses = responses; - this.attemptsLatch = attemptsLatch; - } - - @Override - public void requestComplete(Object result, long requestTime) { - if(logger.isTraceEnabled()) - logger.trace(pipeline.getOperation().getSimpleName() + " response received (" - + requestTime + " ms.) from node " + node.getId()); - - Response, Object> response = new Response, Object>(node, - keys, - result, - requestTime); - responses.put(node.getId(), response); - attemptsLatch.countDown(); - - // Note errors that come in after the pipeline has finished. - // These will *not* get a chance to be called in the loop of - // responses below. - if(pipeline.isFinished() && response.getValue() instanceof Exception) - if(response.getValue() instanceof InvalidMetadataException) { - pipelineData.reportException((InvalidMetadataException) response.getValue()); - logger.warn("Received invalid metadata problem after a successful " - + pipeline.getOperation().getSimpleName() + " call on node " - + node.getId() + ", store '" + pipelineData.getStoreName() + "'"); - } else { - // TODO: Should not have operation that acquires locks and - // may do blocking operations in callback - handleResponseError(response, pipeline, failureDetector); - } - - } - } } \ No newline at end of file diff --git a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java index 69938ad5c0..1cf13c308d 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelPutRequests.java @@ -91,7 +91,6 @@ public boolean isHintedHandoffEnabled() { return enableHintedHandoff; } - @Override public void execute(final Pipeline pipeline) { Node master = pipelineData.getMaster(); final Versioned versionedCopy = pipelineData.getVersionedCopy(); @@ -117,12 +116,65 @@ public void execute(final Pipeline pipeline) { final Node node = nodes.get(i); pipelineData.incrementNodeIndex(); - NonblockingStoreCallback callback = new Callback(pipeline, - node, - versionedCopy, - responses, - attemptsLatch, - blocksLatch); + NonblockingStoreCallback callback = new NonblockingStoreCallback() { + + public void requestComplete(Object result, long requestTime) { + if(logger.isTraceEnabled()) + logger.trace(pipeline.getOperation().getSimpleName() + + " response received (" + requestTime + " ms.) from node " + + node.getId()); + + Response response = new Response(node, + key, + result, + requestTime); + responses.put(node.getId(), response); + + if(logger.isDebugEnabled()) + logger.debug("Finished secondary PUT for key " + + ByteUtils.toHexString(key.get()) + " (keyRef: " + + System.identityHashCode(key) + "); took " + requestTime + + " ms on node " + node.getId() + "(" + node.getHost() + ")"); + + if(isHintedHandoffEnabled() && pipeline.isFinished()) { + if(response.getValue() instanceof UnreachableStoreException) { + Slop slop = new Slop(pipelineData.getStoreName(), + Slop.Operation.PUT, + key, + versionedCopy.getValue(), + transforms, + node.getId(), + new Date()); + pipelineData.addFailedNode(node); + hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop); + } + } + + attemptsLatch.countDown(); + blocksLatch.countDown(); + + if(logger.isTraceEnabled()) + logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block " + + " for " + blocksLatch.getCount() + " more "); + + // Note errors that come in after the pipeline has finished. + // These will *not* get a chance to be called in the loop of + // responses below. + if(pipeline.isFinished() && response.getValue() instanceof Exception + && !(response.getValue() instanceof ObsoleteVersionException)) { + if(response.getValue() instanceof InvalidMetadataException) { + pipelineData.reportException((InvalidMetadataException) response.getValue()); + logger.warn("Received invalid metadata problem after a successful " + + pipeline.getOperation().getSimpleName() + + " call on node " + node.getId() + ", store '" + + pipelineData.getStoreName() + "'"); + } else { + handleResponseError(response, pipeline, failureDetector); + } + } + } + + }; if(logger.isTraceEnabled()) logger.trace("Submitting " + pipeline.getOperation().getSimpleName() @@ -131,11 +183,63 @@ public void execute(final Pipeline pipeline) { NonblockingStore store = nonblockingStores.get(node.getId()); store.submitPutRequest(key, versionedCopy, transforms, callback, timeoutMs); } - waitForResponses(blocksLatch, responses, pipeline); + + try { + long ellapsedNs = System.nanoTime() - pipelineData.getStartTimeNs(); + long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs; + if(remainingNs > 0) + blocksLatch.await(remainingNs, TimeUnit.NANOSECONDS); + } catch(InterruptedException e) { + if(logger.isEnabledFor(Level.WARN)) + logger.warn(e, e); + } + + for(Entry> responseEntry: responses.entrySet()) { + Response response = responseEntry.getValue(); + // Treat ObsoleteVersionExceptions as success since such an + // exception means that a higher version was able to write on the + // node. + if(response.getValue() instanceof Exception + && !(response.getValue() instanceof ObsoleteVersionException)) { + if(handleResponseError(response, pipeline, failureDetector)) + return; + } else { + pipelineData.incrementSuccesses(); + failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); + responses.remove(responseEntry.getKey()); + } + } boolean quorumSatisfied = true; if(pipelineData.getSuccesses() < required) { - waitForResponses(attemptsLatch, responses, pipeline); + long ellapsedNs = System.nanoTime() - pipelineData.getStartTimeNs(); + long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs; + if(remainingNs > 0) { + try { + attemptsLatch.await(remainingNs, TimeUnit.NANOSECONDS); + } catch(InterruptedException e) { + if(logger.isEnabledFor(Level.WARN)) + logger.warn(e, e); + } + + for(Entry> responseEntry: responses.entrySet()) { + Response response = responseEntry.getValue(); + // Treat ObsoleteVersionExceptions as success since such an + // exception means that a higher version was able to write + // on the node. + if(response.getValue() instanceof Exception + && !(response.getValue() instanceof ObsoleteVersionException)) { + if(handleResponseError(response, pipeline, failureDetector)) + return; + } else { + pipelineData.incrementSuccesses(); + failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); + responses.remove(responseEntry.getKey()); + } + } + } if(pipelineData.getSuccesses() < required) { pipelineData.setFatalError(new InsufficientOperationalNodesException(required @@ -156,12 +260,36 @@ public void execute(final Pipeline pipeline) { if(quorumSatisfied) { if(pipelineData.getZonesRequired() != null) { - int zonesSatisfied = pipelineData.getZoneResponses().size(); + int zonesSatisfied = pipelineData.getZoneResponses().size(); if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) { pipeline.addEvent(completeEvent); } else { - waitForResponses(attemptsLatch, responses, pipeline); + long timeMs = (System.nanoTime() - pipelineData.getStartTimeNs()) + / Time.NS_PER_MS; + + if((timeoutMs - timeMs) > 0) { + try { + attemptsLatch.await(timeoutMs - timeMs, TimeUnit.MILLISECONDS); + } catch(InterruptedException e) { + if(logger.isEnabledFor(Level.WARN)) + logger.warn(e, e); + } + + for(Entry> responseEntry: responses.entrySet()) { + Response response = responseEntry.getValue(); + if(response.getValue() instanceof Exception) { + if(handleResponseError(response, pipeline, failureDetector)) + return; + } else { + pipelineData.incrementSuccesses(); + failureDetector.recordSuccess(response.getNode(), + response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); + responses.remove(responseEntry.getKey()); + } + } + } if(pipelineData.getZoneResponses().size() >= (pipelineData.getZonesRequired() + 1)) { pipeline.addEvent(completeEvent); @@ -177,130 +305,10 @@ public void execute(final Pipeline pipeline) { pipeline.abort(); } } - } else { - pipeline.addEvent(completeEvent); - } - } - } - private void waitForResponses(CountDownLatch latch, - final Map> responses, - final Pipeline pipeline) { - long elapsedNs = System.nanoTime() - pipelineData.getStartTimeNs(); - long remainingNs = (timeoutMs * Time.NS_PER_MS) - elapsedNs; - if(remainingNs > 0) { - try { - latch.await(remainingNs, TimeUnit.NANOSECONDS); - } catch(InterruptedException e) { - if(logger.isEnabledFor(Level.WARN)) - logger.warn(e, e); - } - - processResponses(responses, pipeline); - } - } - - private void processResponses(final Map> responses, - final Pipeline pipeline) { - for(Entry> responseEntry: responses.entrySet()) { - Response response = responseEntry.getValue(); - // Treat ObsoleteVersionExceptions as success since such an - // exception means that a higher version was able to write on the - // node. - if(response.getValue() instanceof Exception - && !(response.getValue() instanceof ObsoleteVersionException)) { - if(handleResponseError(response, pipeline, failureDetector)) - return; } else { - pipelineData.incrementSuccesses(); - failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); - pipelineData.getZoneResponses().add(response.getNode().getZoneId()); - - responses.remove(responseEntry.getKey()); - } - } - } - - public class Callback implements NonblockingStoreCallback { - - final Pipeline pipeline; - final Node node; - final Versioned versionedCopy; - final Map> responses; - final CountDownLatch attemptsLatch; - final CountDownLatch blocksLatch; - - Callback(Pipeline pipeline, - Node node, - Versioned versionedCopy, - Map> responses, - CountDownLatch attemptsLatch, - CountDownLatch blocksLatch) { - this.pipeline = pipeline; - this.node = node; - this.versionedCopy = versionedCopy; - this.responses = responses; - this.attemptsLatch = attemptsLatch; - this.blocksLatch = blocksLatch; - } - - @Override - public void requestComplete(Object result, long requestTime) { - if(logger.isTraceEnabled()) - logger.trace(pipeline.getOperation().getSimpleName() + " response received (" - + requestTime + " ms.) from node " + node.getId()); - - Response response = new Response(node, - key, - result, - requestTime); - responses.put(node.getId(), response); - - if(logger.isDebugEnabled()) - logger.debug("Finished secondary PUT for key " + ByteUtils.toHexString(key.get()) - + " (keyRef: " + System.identityHashCode(key) + "); took " - + requestTime + " ms on node " + node.getId() + "(" + node.getHost() - + ")"); - - if(isHintedHandoffEnabled() && pipeline.isFinished()) { - if(response.getValue() instanceof UnreachableStoreException) { - Slop slop = new Slop(pipelineData.getStoreName(), - Slop.Operation.PUT, - key, - versionedCopy.getValue(), - transforms, - node.getId(), - new Date()); - pipelineData.addFailedNode(node); - // TODO: Should not have blocking operation in callback - hintedHandoff.sendHintSerial(node, versionedCopy.getVersion(), slop); - } - } - - attemptsLatch.countDown(); - blocksLatch.countDown(); - - if(logger.isTraceEnabled()) - logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block " - + " for " + blocksLatch.getCount() + " more "); - - // Note errors that come in after the pipeline has finished. - // These will *not* get a chance to be called in the loop of - // responses below. - if(pipeline.isFinished() && response.getValue() instanceof Exception - && !(response.getValue() instanceof ObsoleteVersionException)) { - if(response.getValue() instanceof InvalidMetadataException) { - pipelineData.reportException((InvalidMetadataException) response.getValue()); - logger.warn("Received invalid metadata problem after a successful " - + pipeline.getOperation().getSimpleName() + " call on node " - + node.getId() + ", store '" + pipelineData.getStoreName() + "'"); - } else { - // TODO: Should not have operation that acquires locks and - // may do blocking operations in callback - handleResponseError(response, pipeline, failureDetector); - } + pipeline.addEvent(completeEvent); } } - } } diff --git a/src/java/voldemort/store/routed/action/PerformParallelRequests.java b/src/java/voldemort/store/routed/action/PerformParallelRequests.java index b2aaa7abbf..f0a6c4f1e1 100644 --- a/src/java/voldemort/store/routed/action/PerformParallelRequests.java +++ b/src/java/voldemort/store/routed/action/PerformParallelRequests.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2012 LinkedIn, Inc + * Copyright 2010 LinkedIn, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -82,13 +82,11 @@ public PerformParallelRequests(PD pipelineData, this.insufficientZonesEvent = insufficientZonesEvent; } - @Override public void execute(final Pipeline pipeline) { List nodes = pipelineData.getNodes(); int attempts = Math.min(preferred, nodes.size()); - final Map> responses = new ConcurrentHashMap>(); - final CountDownLatch attemptsLatch = new CountDownLatch(attempts); + final CountDownLatch latch = new CountDownLatch(attempts); if(logger.isTraceEnabled()) logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName() @@ -98,16 +96,54 @@ public void execute(final Pipeline pipeline) { final Node node = nodes.get(i); pipelineData.incrementNodeIndex(); - NonblockingStoreCallback callback = new Callback(pipeline, - node, - responses, - attemptsLatch); + final long startMs = logger.isDebugEnabled() ? System.currentTimeMillis() : -1; + + NonblockingStoreCallback callback = new NonblockingStoreCallback() { + + public void requestComplete(Object result, long requestTime) { + if(logger.isTraceEnabled()) + logger.trace(pipeline.getOperation().getSimpleName() + + " response received (" + requestTime + " ms.) from node " + + node.getId()); + + Response response = new Response(node, + key, + result, + requestTime); + if(logger.isDebugEnabled()) + logger.debug("Finished " + pipeline.getOperation().getSimpleName() + + " for key " + ByteUtils.toHexString(key.get()) + + " (keyRef: " + System.identityHashCode(key) + + "); started at " + startMs + " took " + requestTime + + " ms on node " + node.getId() + "(" + node.getHost() + ")"); + + responses.put(node.getId(), response); + latch.countDown(); + + // Note errors that come in after the pipeline has finished. + // These will *not* get a chance to be called in the loop of + // responses below. + if(pipeline.isFinished() && response.getValue() instanceof Exception) { + if(response.getValue() instanceof InvalidMetadataException) { + pipelineData.reportException((InvalidMetadataException) response.getValue()); + logger.warn("Received invalid metadata problem after a successful " + + pipeline.getOperation().getSimpleName() + + " call on node " + node.getId() + ", store '" + + pipelineData.getStoreName() + "'"); + } else { + handleResponseError(response, pipeline, failureDetector); + } + } + } + + }; if(logger.isTraceEnabled()) logger.trace("Submitting " + pipeline.getOperation().getSimpleName() + " request on node " + node.getId()); NonblockingStore store = nonblockingStores.get(node.getId()); + if(pipeline.getOperation() == Operation.GET) store.submitGetRequest(key, transforms, callback, timeoutMs); else if(pipeline.getOperation() == Operation.GET_VERSIONS) @@ -119,7 +155,7 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS) } try { - attemptsLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + latch.await(timeoutMs, TimeUnit.MILLISECONDS); } catch(InterruptedException e) { if(logger.isEnabledFor(Level.WARN)) logger.warn(e, e); @@ -131,11 +167,11 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS) return; } else { pipelineData.incrementSuccesses(); - failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); - pipelineData.getZoneResponses().add(response.getNode().getZoneId()); Response rCast = Utils.uncheckedCast(response); pipelineData.getResponses().add(rCast); + failureDetector.recordSuccess(response.getNode(), response.getRequestTime()); + pipelineData.getZoneResponses().add(response.getNode().getZoneId()); } } @@ -163,8 +199,11 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS) pipeline.abort(); } + } else { + if(pipelineData.getZonesRequired() != null) { + int zonesSatisfied = pipelineData.getZoneResponses().size(); if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) { pipeline.addEvent(completeEvent); @@ -186,64 +225,12 @@ else if(pipeline.getOperation() == Operation.GET_VERSIONS) + zonesSatisfied + " succeeded")); } + } + } else { pipeline.addEvent(completeEvent); } } } - - public class Callback implements NonblockingStoreCallback { - - final Pipeline pipeline; - final Node node; - final Map> responses; - final CountDownLatch attemptsLatch; - - Callback(Pipeline pipeline, - Node node, - Map> responses, - CountDownLatch attemptsLatch) { - this.pipeline = pipeline; - this.node = node; - this.responses = responses; - this.attemptsLatch = attemptsLatch; - } - - @Override - public void requestComplete(Object result, long requestTime) { - if(logger.isTraceEnabled()) - logger.trace(pipeline.getOperation().getSimpleName() + " response received (" - + requestTime + " ms.) from node " + node.getId()); - - Response response = new Response(node, - key, - result, - requestTime); - if(logger.isDebugEnabled()) - logger.debug("Finished " + pipeline.getOperation().getSimpleName() + " for key " - + ByteUtils.toHexString(key.get()) + " (keyRef: " - + System.identityHashCode(key) + "); took " + requestTime - + " ms on node " + node.getId() + "(" + node.getHost() + ")"); - - responses.put(node.getId(), response); - attemptsLatch.countDown(); - - // Note errors that come in after the pipeline has finished. - // These will *not* get a chance to be called in the loop of - // responses below. - if(pipeline.isFinished() && response.getValue() instanceof Exception) { - if(response.getValue() instanceof InvalidMetadataException) { - pipelineData.reportException((InvalidMetadataException) response.getValue()); - logger.warn("Received invalid metadata problem after a successful " - + pipeline.getOperation().getSimpleName() + " call on node " - + node.getId() + ", store '" + pipelineData.getStoreName() + "'"); - } else { - // TODO: Should not have operation that acquires locks and - // may do blocking operations in callback - handleResponseError(response, pipeline, failureDetector); - } - } - } - } }