Skip to content

Commit

Permalink
Refactor all PerformParallel*Request classes.
Browse files Browse the repository at this point in the history
- got rid of anonymous call back classes
- factored out waitForResponses logic and processResponses logic for most of these classes. GetAll stands out as being fairly different from the others.
- did not refactor to the point of sharing common code across classes, just refactored within each class.
- added many TODOs to the code for further refactoring.
  • Loading branch information
jayjwylie committed Jan 15, 2013
1 parent 551b96d commit 3162ba2
Show file tree
Hide file tree
Showing 4 changed files with 388 additions and 367 deletions.
273 changes: 132 additions & 141 deletions src/java/voldemort/store/routed/action/PerformParallelDeleteRequests.java
Expand Up @@ -71,74 +71,32 @@ public PerformParallelDeleteRequests(PD pipelineData,
this.hintedHandoff = hintedHandoff;
}

// TODO: This is almost identical to PerformParallelPutRequests.execute
@Override
public void execute(final Pipeline pipeline) {
List<Node> nodes = pipelineData.getNodes();
final Map<Integer, Response<ByteArray, Object>> responses = new ConcurrentHashMap<Integer, Response<ByteArray, Object>>();
int attempts = nodes.size();
int blocks = Math.min(preferred, attempts);

final Map<Integer, Response<ByteArray, Object>> responses = new ConcurrentHashMap<Integer, Response<ByteArray, Object>>();
final CountDownLatch attemptsLatch = new CountDownLatch(attempts);
final CountDownLatch blocksLatch = new CountDownLatch(blocks);

if(logger.isTraceEnabled())
logger.trace("Attempting " + attempts + " " + pipeline.getOperation().getSimpleName()
+ " operations in parallel");

long beginTime = System.nanoTime();
long startTimeNs = System.nanoTime();

for(int i = 0; i < attempts; i++) {
final Node node = nodes.get(i);
pipelineData.incrementNodeIndex();

NonblockingStoreCallback callback = new NonblockingStoreCallback() {

public void requestComplete(Object result, long requestTime) {
if(logger.isTraceEnabled())
logger.trace(pipeline.getOperation().getSimpleName()
+ " response received (" + requestTime + " ms.) from node "
+ node.getId());

Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
key,
result,
requestTime);
responses.put(node.getId(), response);
if(enableHintedHandoff && pipeline.isFinished()
&& response.getValue() instanceof UnreachableStoreException) {
Slop slop = new Slop(pipelineData.getStoreName(),
Slop.Operation.DELETE,
key,
null,
null,
node.getId(),
new Date());
pipelineData.addFailedNode(node);
hintedHandoff.sendHintSerial(node, version, slop);
}

attemptsLatch.countDown();
blocksLatch.countDown();

if(logger.isTraceEnabled())
logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
+ " for " + blocksLatch.getCount() + " more ");

// Note errors that come in after the pipeline has finished.
// These will *not* get a chance to be called in the loop of
// responses below.
if(pipeline.isFinished() && response.getValue() instanceof Exception
&& !(response.getValue() instanceof ObsoleteVersionException)) {
if(response.getValue() instanceof InvalidMetadataException) {
pipelineData.reportException((InvalidMetadataException) response.getValue());
logger.warn("Received invalid metadata problem after a successful "
+ pipeline.getOperation().getSimpleName()
+ " call on node " + node.getId() + ", store '"
+ pipelineData.getStoreName() + "'");
} else {
handleResponseError(response, pipeline, failureDetector);
}
}
}
};
NonblockingStoreCallback callback = new Callback(pipeline,
node,
responses,
attemptsLatch,
blocksLatch);

if(logger.isTraceEnabled())
logger.info("Submitting " + pipeline.getOperation().getSimpleName()
Expand All @@ -148,74 +106,13 @@ public void requestComplete(Object result, long requestTime) {
store.submitDeleteRequest(key, version, callback, timeoutMs);
}

try {
long ellapsedNs = System.nanoTime() - beginTime;
long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
if(remainingNs > 0) {
blocksLatch.await(remainingNs, TimeUnit.NANOSECONDS);
}
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(response.getValue() instanceof ObsoleteVersionException) {
// ignore this completely here
// this means that a higher version was able
// to write on this node and should be termed as
// clean success.
responses.remove(responseEntry.getKey());
} else if(handleResponseError(response, pipeline, failureDetector)) {
return;
}
} else {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);
responses.remove(responseEntry.getKey());
}
}
waitForResponses(startTimeNs, blocksLatch, responses, pipeline);

// wait for more responses in case we did not have enough successful
// response to achieve the required count
boolean quorumSatisfied = true;
if(pipelineData.getSuccesses() < required) {
long ellapsedNs = System.nanoTime() - beginTime;
long remainingNs = (timeoutMs * Time.NS_PER_MS) - ellapsedNs;
if(remainingNs > 0) {
try {
attemptsLatch.await(remainingNs, TimeUnit.NANOSECONDS);
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(response.getValue() instanceof ObsoleteVersionException) {
// ignore this completely here
// this means that a higher version was able
// to write on this node and should be termed as
// clean success.
responses.remove(responseEntry.getKey());
} else if(handleResponseError(response, pipeline, failureDetector))
return;
} else {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);
responses.remove(responseEntry.getKey());
}
}
}
waitForResponses(startTimeNs, attemptsLatch, responses, pipeline);

if(pipelineData.getSuccesses() < required) {
pipelineData.setFatalError(new InsufficientOperationalNodesException(required
Expand All @@ -240,32 +137,7 @@ public void requestComplete(Object result, long requestTime) {
if(zonesSatisfied >= (pipelineData.getZonesRequired() + 1)) {
pipeline.addEvent(completeEvent);
} else {
long timeMs = (System.nanoTime() - beginTime) / Time.NS_PER_MS;

if((timeoutMs - timeMs) > 0) {
try {
attemptsLatch.await(timeoutMs - timeMs, TimeUnit.MILLISECONDS);
} catch(InterruptedException e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
}

for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
Response<ByteArray, Object> response = responseEntry.getValue();
if(response.getValue() instanceof Exception) {
if(handleResponseError(response, pipeline, failureDetector))
return;
} else {
pipelineData.incrementSuccesses();
failureDetector.recordSuccess(response.getNode(),
response.getRequestTime());
pipelineData.getZoneResponses().add(response.getNode().getZoneId());
Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
pipelineData.getResponses().add(rCast);
responses.remove(responseEntry.getKey());
}
}
}
waitForResponses(startTimeNs, attemptsLatch, responses, pipeline);

if(pipelineData.getZoneResponses().size() >= (pipelineData.getZonesRequired() + 1)) {
pipeline.addEvent(completeEvent);
Expand All @@ -287,4 +159,123 @@ public void requestComplete(Object result, long requestTime) {
}
}

// TODO: except for start time, this is same as
// PerformParallelPutRequests.waitForResponses
/**
 * Blocks on the given latch for whatever remains of this operation's
 * timeout budget, then folds all responses received so far into
 * pipelineData via processResponses.
 *
 * @param startTimeNs System.nanoTime() captured when the parallel requests
 *        were submitted; used to compute the remaining slice of timeoutMs
 * @param latch latch counted down by Callback as responses arrive (either
 *        the blocks latch or the attempts latch)
 * @param responses per-node responses accumulated by Callback; consumed
 *        entries are removed by processResponses
 * @param pipeline pipeline against which response errors are reported
 */
private void waitForResponses(long startTimeNs,
                              CountDownLatch latch,
                              final Map<Integer, Response<ByteArray, Object>> responses,
                              final Pipeline pipeline) {
    long elapsedNs = System.nanoTime() - startTimeNs;
    long remainingNs = (timeoutMs * Time.NS_PER_MS) - elapsedNs;
    if(remainingNs > 0) {
        try {
            latch.await(remainingNs, TimeUnit.NANOSECONDS);
        } catch(InterruptedException e) {
            if(logger.isEnabledFor(Level.WARN))
                logger.warn(e, e);
            // Restore the interrupt status so callers further up the
            // stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    // Process whatever has arrived even when the timeout budget is already
    // exhausted. Before this refactoring, execute()'s first pass processed
    // responses unconditionally after the blocking wait; keeping this call
    // inside the remainingNs > 0 branch would silently drop responses that
    // raced in after the deadline.
    processResponses(responses, pipeline);
}

// TODO: except for two lines, this is same as
// PerformParallelPutRequests.processResponses
/**
 * Folds each received response into pipelineData, removing consumed
 * entries from the responses map. Successful responses (including
 * ObsoleteVersionExceptions) update the success count, failure detector,
 * and zone bookkeeping; other exceptions are routed through
 * handleResponseError.
 *
 * @param responses per-node responses accumulated by Callback; iterated
 *        and pruned in place (safe: ConcurrentHashMap's entrySet iterator
 *        is weakly consistent, so concurrent removal is permitted)
 * @param pipeline pipeline against which response errors are reported
 * @return true if handleResponseError terminated the pipeline and the
 *         caller should stop all further processing; false otherwise.
 *         Before this refactoring the early 'return' here exited
 *         execute() itself — a void return would let execute() continue
 *         after a fatal response error, so the abort is now surfaced to
 *         callers.
 */
private boolean processResponses(final Map<Integer, Response<ByteArray, Object>> responses,
                                 final Pipeline pipeline) {
    for(Entry<Integer, Response<ByteArray, Object>> responseEntry: responses.entrySet()) {
        Response<ByteArray, Object> response = responseEntry.getValue();
        // Treat ObsoleteVersionExceptions as success since such an
        // exception means that a higher version was able to write on the
        // node.
        // NOTE(review): the pre-refactoring code removed an
        // ObsoleteVersionException response WITHOUT incrementing the
        // success count or recording it in pipelineData; here it falls
        // into the success branch — confirm this change is intended.
        if(response.getValue() instanceof Exception
           && !(response.getValue() instanceof ObsoleteVersionException)) {
            if(handleResponseError(response, pipeline, failureDetector))
                return true;
        } else {
            pipelineData.incrementSuccesses();
            failureDetector.recordSuccess(response.getNode(), response.getRequestTime());
            pipelineData.getZoneResponses().add(response.getNode().getZoneId());

            // TODO: Are the next two lines necessary!?!?!?
            Response<ByteArray, V> rCast = Utils.uncheckedCast(response);
            pipelineData.getResponses().add(rCast);

            responses.remove(responseEntry.getKey());
        }
    }
    return false;
}

// TODO: Almost identical to PerformParallelPutRequests.Callback. Any way to
// refactor into common code?
/**
 * Per-node completion callback passed to submitDeleteRequest. Records the
 * node's response into the shared responses map, optionally hands off a
 * DELETE slop for unreachable nodes, and counts down both latches so the
 * waiting execute() thread can proceed.
 *
 * Note on ordering: responses.put() must happen before the latches are
 * counted down, so that the thread released from latch.await() in
 * waitForResponses observes this response when it iterates the map.
 */
public class Callback implements NonblockingStoreCallback {

    final Pipeline pipeline;
    // Node this callback's request was sent to.
    final Node node;
    // Shared map of node id -> response, consumed by processResponses.
    final Map<Integer, Response<ByteArray, Object>> responses;
    // Counts down once per submitted request, regardless of outcome.
    final CountDownLatch attemptsLatch;
    // Counts down the first 'preferred' completions execute() blocks on.
    final CountDownLatch blocksLatch;

    Callback(Pipeline pipeline,
             Node node,
             Map<Integer, Response<ByteArray, Object>> responses,
             CountDownLatch attemptsLatch,
             CountDownLatch blocksLatch) {
        this.pipeline = pipeline;
        this.node = node;
        this.responses = responses;
        this.attemptsLatch = attemptsLatch;
        this.blocksLatch = blocksLatch;
    }

    /**
     * Invoked by the nonblocking store when the delete request completes.
     *
     * @param result either the delete's result or an Exception describing
     *        the failure (checked via instanceof below)
     * @param requestTime observed request duration in milliseconds
     */
    @Override
    public void requestComplete(Object result, long requestTime) {
        if(logger.isTraceEnabled())
            logger.trace(pipeline.getOperation().getSimpleName() + " response received ("
                         + requestTime + " ms.) from node " + node.getId());

        Response<ByteArray, Object> response = new Response<ByteArray, Object>(node,
                                                                               key,
                                                                               result,
                                                                               requestTime);
        responses.put(node.getId(), response);

        // TODO: Must move heavy-weight ops out of callback
        // If the pipeline already finished without this node, persist the
        // delete as a hint (slop) so the unreachable node is repaired later.
        if(enableHintedHandoff && pipeline.isFinished()
           && response.getValue() instanceof UnreachableStoreException) {
            Slop slop = new Slop(pipelineData.getStoreName(),
                                 Slop.Operation.DELETE,
                                 key,
                                 null,
                                 null,
                                 node.getId(),
                                 new Date());
            pipelineData.addFailedNode(node);
            hintedHandoff.sendHintSerial(node, version, slop);
        }

        // Count down only after the response is published to the map (see
        // ordering note on the class).
        attemptsLatch.countDown();
        blocksLatch.countDown();

        if(logger.isTraceEnabled())
            logger.trace(attemptsLatch.getCount() + " attempts remaining. Will block "
                         + " for " + blocksLatch.getCount() + " more ");

        // TODO: Must move heavy-weight ops out of callback
        // Note errors that come in after the pipeline has finished.
        // These will *not* get a chance to be called in the loop of
        // responses below.
        if(pipeline.isFinished() && response.getValue() instanceof Exception
           && !(response.getValue() instanceof ObsoleteVersionException)) {
            if(response.getValue() instanceof InvalidMetadataException) {
                pipelineData.reportException((InvalidMetadataException) response.getValue());
                logger.warn("Received invalid metadata problem after a successful "
                            + pipeline.getOperation().getSimpleName() + " call on node "
                            + node.getId() + ", store '" + pipelineData.getStoreName() + "'");
            } else {
                handleResponseError(response, pipeline, failureDetector);
            }
        }
    }
}
}

0 comments on commit 3162ba2

Please sign in to comment.