Skip to content

Commit

Permalink
refactored PerformParallelPutAction and related to ensure a slop is r…
Browse files Browse the repository at this point in the history
…egistered
  • Loading branch information
zhongjiewu committed May 16, 2013
1 parent cbd4dbd commit 1ba83a1
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 178 deletions.
24 changes: 24 additions & 0 deletions src/java/voldemort/store/routed/PutPipelineData.java
Expand Up @@ -16,6 +16,9 @@

package voldemort.store.routed;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import voldemort.cluster.Node;
import voldemort.store.routed.action.PerformSerialPutRequests;
import voldemort.versioning.Versioned;
Expand All @@ -33,6 +36,8 @@ public class PutPipelineData extends BasicPipelineData<Void> {

private long startTimeNs;

final private List<Node> slopOwnerNodes = new CopyOnWriteArrayList<Node>();

/**
* Returns the previously determined "master" node. This is the first node
* in the preference list that succeeded in "putting" the value.
Expand Down Expand Up @@ -95,4 +100,23 @@ public long getStartTimeNs() {
return this.startTimeNs;
}

/**
* Add a node to the slop owner list for registering slops in hintedHandoff
* stage
*
* @param node
*/
public void addSlopOwnerNode(Node node) {
slopOwnerNodes.add(node);
}

/**
* Get list of nodes to register slop for
*
* @return list of nodes to register slop for
*/
public List<Node> getSlopOwnerNodes() {
return slopOwnerNodes;
}

}
Expand Up @@ -25,6 +25,7 @@
import voldemort.store.InsufficientOperationalNodesException;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.PipelineData;
import voldemort.store.routed.PutPipelineData;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;

Expand Down Expand Up @@ -61,6 +62,9 @@ protected List<Node> getNodes(ByteArray key) {
if(failureDetector.isAvailable(node))
nodes.add(node);
else {
if(pipelineData instanceof PutPipelineData) {
((PutPipelineData) pipelineData).addSlopOwnerNode(node);
}
pipelineData.addFailedNode(node);
if(logger.isDebugEnabled()) {
logger.debug("Key " + ByteUtils.toHexString(key.get()) + " Node "
Expand Down
370 changes: 201 additions & 169 deletions src/java/voldemort/store/routed/action/PerformParallelPutRequests.java

Large diffs are not rendered by default.

Expand Up @@ -53,8 +53,8 @@ public PerformPutHintedHandoff(PutPipelineData pipelineData,
@Override
public void execute(Pipeline pipeline) {
Versioned<byte[]> versionedCopy = pipelineData.getVersionedCopy();
for(Node failedNode: failedNodes) {
int failedNodeId = failedNode.getId();
for(Node slopOwnerNode: pipelineData.getSlopOwnerNodes()) {
int failedNodeId = slopOwnerNode.getId();
if(versionedCopy == null) {
VectorClock clock = (VectorClock) versioned.getVersion();
versionedCopy = new Versioned<byte[]>(versioned.getValue(),
Expand All @@ -64,8 +64,9 @@ public void execute(Pipeline pipeline) {

Version version = versionedCopy.getVersion();
if(logger.isTraceEnabled())
logger.trace("Performing hinted handoff for node " + failedNode + ", store "
+ pipelineData.getStoreName() + " key " + key + ", version " + version);
logger.trace("Performing parallel hinted handoff for node " + slopOwnerNode
+ ", store " + pipelineData.getStoreName() + " key " + key
+ ", version " + version);

Slop slop = new Slop(pipelineData.getStoreName(),
Slop.Operation.PUT,
Expand All @@ -74,7 +75,7 @@ public void execute(Pipeline pipeline) {
transforms,
failedNodeId,
new Date());
hintedHandoff.sendHintParallel(failedNode, version, slop);
hintedHandoff.sendHintParallel(slopOwnerNode, version, slop);
}
pipeline.addEvent(completeEvent);
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import voldemort.store.InsufficientOperationalNodesException;
import voldemort.store.InsufficientZoneResponsesException;
import voldemort.store.Store;
import voldemort.store.UnreachableStoreException;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.PutPipelineData;
Expand Down Expand Up @@ -125,6 +126,9 @@ public void execute(Pipeline pipeline) {
+ (System.nanoTime() - start) + " ns" + " (keyRef: "
+ System.identityHashCode(key) + ")");

if(e instanceof UnreachableStoreException) {
pipelineData.addSlopOwnerNode(node);
}
if(handleResponseError(e, node, requestTime, pipeline, failureDetector))
return;
}
Expand Down
6 changes: 5 additions & 1 deletion src/java/voldemort/store/slop/HintedHandoff.java
Expand Up @@ -216,9 +216,13 @@ public boolean sendHintSerial(Node failedNode, Version version, Slop slop) {
} catch(UnreachableStoreException e) {
failureDetector.recordException(node, (System.nanoTime() - startNs)
/ Time.NS_PER_MS, e);
logger.warn("Error during hinted handoff", e);
logger.warn("Error during hinted handoff. Will try another node", e);
} catch(IllegalStateException e) {
logger.warn("Error during hinted handoff. Will try another node", e);
} catch(ObsoleteVersionException e) {
logger.debug(e, e);
} catch(Exception e) {
logger.error("Unknown exception. Will try another node" + e);
}

if(logger.isDebugEnabled())
Expand Down
7 changes: 6 additions & 1 deletion src/java/voldemort/store/socket/SocketStore.java
Expand Up @@ -284,7 +284,12 @@ private <T> T request(ClientRequest<T> delegate, String operationName) {
clientRequestExecutor.addClientRequest(blockingClientRequest,
timeoutMs,
System.nanoTime() - startTimeNs);
blockingClientRequest.await();

boolean awaitResult = blockingClientRequest.await();

if(awaitResult == false) {
blockingClientRequest.timeOut();
}

if(logger.isDebugEnabled())
debugMsgStr += "success";
Expand Down
Expand Up @@ -57,8 +57,8 @@ public boolean isComplete() {
return delegate.isComplete() && latch.getCount() == 0;
}

public void await() throws InterruptedException {
latch.await(timeoutMs, TimeUnit.MILLISECONDS);
public boolean await() throws InterruptedException {
return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
}

public T getResult() throws VoldemortException, IOException {
Expand Down

0 comments on commit 1ba83a1

Please sign in to comment.