Skip to content

Commit

Permalink
Reverted refactoring of PerformParallel* classes committed in f37b25e…
Browse files Browse the repository at this point in the history
…cdd14ac27480a8ca353cae919464df3af
  • Loading branch information
jayjwylie committed Jan 15, 2013
1 parent eecaba0 commit b120f45
Show file tree
Hide file tree
Showing 4 changed files with 369 additions and 386 deletions.
267 changes: 141 additions & 126 deletions src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java
Expand Up @@ -87,31 +87,74 @@ public PerformParallelDeleteRequests(PD pipelineData,
this.hintedHandoff = hintedHandoff;
}

@Override
public void execute(final Pipeline pipeline) {
List<Node> nodes = pipelineData.getNodes();
final Map<Integer, Response<ByteArray, Object>> responses = new ConcurrentHashMap<Integer, Response<ByteArray, Object>>();
int attempts = nodes.size();
int blocks = Math.min(preferred, attempts);

final Map<Integer, Response<ByteArray, Object>> responses = new ConcurrentHashMap<Integer, Response<ByteArray, Object>>();
final CountDownLatch attemptsLatch = new CountDownLatch(attempts);
final CountDownLatch blocksLatch = new CountDownLatch(blocks);

if(logger.isTraceEnabled())
logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName()
+ " operations in parallel");

long startTimeNs = System.nanoTime();
long beginTime = System.nanoTime();

for(int i = 0; i < attempts; i++) {
final Node node = nodes.get(i);
pipelineData.incrementNodeIndex();

NonblockingStoreCallback callback = new Callback(pipeline,
node,
responses,
attemptsLatch,
blocksLatch);
NonblockingStoreCallback callback = new NonblockingStoreCallback() {

public void requestComplete(Object result, long requestTime) {
if(logger.isTraceEnabled())
logger.trace(pipeline.getOperation().getSimpleName()
+ " response received (" + requestTime + " ms.) from node "
+ node.getId());

Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
key,
result,
requestTime);
responses.put(node.getId(), response);
if(enableHintedHandoff && pipeline.isFinished()
&& response.getValue() instanceof UnreachableStoreException) {
Slop slop = new Slop(pipelineData.getStoreName(),
Slop.Operation.DELETE,
key,
null,
null,
node.getId(),
new Date());
pipelineData.addFailedNode(node);
hintedHandoff.sendHintSerial(node, version, slop);
}

attemptsLatch.countDown();
blocksLatch.countDown();

if(logger.isTraceEnabled())
logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
+ " for " + blocksLatch.getCount() + " more ");

// Note errors that come in after the pipeline has finished.
// These will *not* get a chance to be called in the loop of
// responses below.
if(pipeline.isFinished() && response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
if(response.getValue() instanceof InvalidMetadataException) {
pipelineData.reportException((InvalidMetadataException) response.getValue());
logger.warn("Received invalid metadata problem after a successful "
+ pipeline.getOperation().getSimpleName()
+ " call on node " + node.getId() + ", store '"
+ pipelineData.getStoreName() + "'");
} else {
handleResponseError(response, pipeline, failureDetector);
}
}
}
};

if(logger.isTraceEnabled())
logger.info("Submitting " + pipeline.getOperation().getSimpleName()
Expand All @@ -121,13 +164,74 @@ public void execute(final Pipeline pipeline) {
store.submitDeleteRequest(key, version, callback, timeoutMs);
}

waitForResponses(startTimeNs, blocksLatch, responses, pipeline);
try {
long ellapsedNs = System.nanoTime() - beginTime;
long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
if(remainingNs > 0) {
blocksLatch.await(remainingNs, TimeUnit.NANOSECONDS);
}
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(response.getValue() instanceof ObsoleteVersionException) {
// ignore this completely here
// this means that a higher version was able
// to write on this node and should be termed as
// clean success.
responses.remove(responseEntry.getKey());
} else if(handleResponseError(response, pipeline, failureDetector)) {
return;
}
} else {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);
responses.remove(responseEntry.getKey());
}
}

// wait for more responses in case we did not have enough successful
// response to achieve the required count
boolean quorumSatisfied = true;
if(pipelineData.getSuccesses() < required) {
waitForResponses(startTimeNs, attemptsLatch, responses, pipeline);
long ellapsedNs = System.nanoTime() - beginTime;
long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
if(remainingNs > 0) {
try {
attemptsLatch.await(remainingNs, TimeUnit.NANOSECONDS);
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(response.getValue() instanceof ObsoleteVersionException) {
// ignore this completely here
// this means that a higher version was able
// to write on this node and should be termed as
// clean success.
responses.remove(responseEntry.getKey());
} else if(handleResponseError(response, pipeline, failureDetector))
return;
} else {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);
responses.remove(responseEntry.getKey());
}
}
}

if(pipelineData.getSuccesses() < required) {
pipelineData.setFatalError(new InsufficientOperationalNodesException(required
Expand All @@ -152,7 +256,32 @@ public void execute(final Pipeline pipeline) {
if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) {
pipeline.addEvent(completeEvent);
} else {
waitForResponses(startTimeNs, attemptsLatch, responses, pipeline);
long timeMs = (System.nanoTime() - beginTime) / Time.NS_PER_MS;

if((timeoutMs - timeMs) > 0) {
try {
attemptsLatch.await(timeoutMs - timeMs, TimeUnit.MILLISECONDS);
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(handleResponseError(response, pipeline, failureDetector))
return;
} else {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(),
response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);
responses.remove(responseEntry.getKey());
}
}
}

if(pipelineData.getZoneResponses().size() >= (pipelineData.getZonesRequired() + 1)) {
pipeline.addEvent(completeEvent);
Expand All @@ -173,118 +302,4 @@ public void execute(final Pipeline pipeline) {
}
}
}

private void waitForResponses(long startTimeNs,
CountDownLatch latch,
final Map<Integer, Response<ByteArray, Object>> responses,
final Pipeline pipeline) {
long elapsedNs = System.nanoTime() - startTimeNs;
long remainingNs = (timeoutMs * Time.NS_PER_MS) - elapsedNs;
if(remainingNs > 0) {
try {
latch.await(remainingNs, TimeUnit.NANOSECONDS);
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

processResponses(responses, pipeline);
}
}

private void processResponses(final Map<Integer, Response<ByteArray, Object>> responses,
final Pipeline pipeline) {
for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
// Treat ObsoleteVersionExceptions as success since such an
// exception means that a higher version was able to write on the
// node.
if(response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
if(handleResponseError(response, pipeline, failureDetector))
return;
} else {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());

Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);

responses.remove(responseEntry.getKey());
}
}
}

public class Callback implements NonblockingStoreCallback {

final Pipeline pipeline;
final Node node;
final Map<Integer, Response<ByteArray, Object>> responses;
final CountDownLatch attemptsLatch;
final CountDownLatch blocksLatch;

Callback(Pipeline pipeline,
Node node,
Map<Integer, Response<ByteArray, Object>> responses,
CountDownLatch attemptsLatch,
CountDownLatch blocksLatch) {
this.pipeline = pipeline;
this.node = node;
this.responses = responses;
this.attemptsLatch = attemptsLatch;
this.blocksLatch = blocksLatch;
}

@Override
public void requestComplete(Object result, long requestTime) {
if(logger.isTraceEnabled())
logger.trace(pipeline.getOperation().getSimpleName() + " response received ("
+ requestTime + " ms.) from node " + node.getId());

Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
key,
result,
requestTime);
responses.put(node.getId(), response);

if(enableHintedHandoff && pipeline.isFinished()
&& response.getValue() instanceof UnreachableStoreException) {
Slop slop = new Slop(pipelineData.getStoreName(),
Slop.Operation.DELETE,
key,
null,
null,
node.getId(),
new Date());
pipelineData.addFailedNode(node);
// TODO: Should not have blocking operation in callback
hintedHandoff.sendHintSerial(node, version, slop);
}

attemptsLatch.countDown();
blocksLatch.countDown();

if(logger.isTraceEnabled())
logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
+ " for " + blocksLatch.getCount() + " more ");

// Note errors that come in after the pipeline has finished.
// These will *not* get a chance to be called in the loop of
// responses below.
if(pipeline.isFinished() && response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
if(response.getValue() instanceof InvalidMetadataException) {
pipelineData.reportException((InvalidMetadataException) response.getValue());
logger.warn("Received invalid metadata problem after a successful "
+ pipeline.getOperation().getSimpleName() + " call on node "
+ node.getId() + ", store '" + pipelineData.getStoreName() + "'");
} else {
// TODO: Should not have operation that acquires locks and
// may do blocking operations in callback
handleResponseError(response, pipeline, failureDetector);
}
}
}
}
}

0 comments on commit b120f45

Please sign in to comment.