Skip to content

Commit

Permalink
more commits on slop fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiewu committed May 16, 2013
1 parent 204615f commit ffe892d
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 67 deletions.
6 changes: 1 addition & 5 deletions src/java/voldemort/store/memory/InMemoryStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public class InMemoryStorageEngine<K, V, T> extends AbstractStorageEngine<K, V, T> {

private static final Logger logger = Logger.getLogger(InMemoryStorageEngine.class);
private final ConcurrentMap<K, List<Versioned<V>>> map;
protected final ConcurrentMap<K, List<Versioned<V>>> map;

public InMemoryStorageEngine(String name) {
super(name);
Expand All @@ -67,10 +67,6 @@ public boolean delete(K key) {
return delete(key, null);
}

public ConcurrentMap<K, List<Versioned<V>>> getInnerMap() {
return map;
}

@Override
public synchronized boolean delete(K key, Version version) {
StoreUtils.assertValidKey(key);
Expand Down
5 changes: 1 addition & 4 deletions src/java/voldemort/store/routed/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,7 @@ public void addEventAction(Event event, Action action) {
*/

public void abort() {
if(isHintedHandoffEnabled())
addEvent(Event.ABORTED);
else
addEvent(Event.ERROR);
addEvent(Event.ERROR);
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/java/voldemort/store/routed/PipelineRoutedStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import voldemort.common.VoldemortOpCode;
import voldemort.routing.RoutingStrategyType;
import voldemort.store.CompositeVoldemortRequest;
import voldemort.store.PersistenceFailureException;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreRequest;
import voldemort.store.StoreUtils;
import voldemort.store.UnreachableStoreException;
import voldemort.store.nonblockingstore.NonblockingStore;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.Pipeline.Operation;
Expand Down Expand Up @@ -904,4 +906,9 @@ public boolean delete(CompositeVoldemortRequest<ByteArray, byte[]> request)
throws VoldemortException {
return delete(request.getKey(), request.getVersion(), request.getRoutingTimeoutInMs());
}

public static boolean isSlopableFailure(Object response) {
return response instanceof UnreachableStoreException
|| response instanceof PersistenceFailureException;
}
}
1 change: 0 additions & 1 deletion src/java/voldemort/store/routed/action/ConfigureNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,4 @@ public void execute(Pipeline pipeline) {
pipelineData.setNodes(nodes);
pipeline.addEvent(completeEvent);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@
import voldemort.store.InsufficientOperationalNodesException;
import voldemort.store.InsufficientZoneResponsesException;
import voldemort.store.InvalidMetadataException;
import voldemort.store.PersistenceFailureException;
import voldemort.store.UnreachableStoreException;
import voldemort.store.nonblockingstore.NonblockingStore;
import voldemort.store.nonblockingstore.NonblockingStoreCallback;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.PipelineRoutedStore;
import voldemort.store.routed.PutPipelineData;
import voldemort.store.routed.Response;
import voldemort.store.slop.HintedHandoff;
Expand Down Expand Up @@ -139,7 +138,12 @@ public void requestComplete(Object result, long requestTime) {
}

if(!responseHandledByMaster) {
if(isSlopableFailure(response.getValue())) {
if(logger.isDebugEnabled()) {
logger.debug("PUT {key:"
+ key
+ "} Master thread did not accept the response: will handle in worker thread");
}
if(PipelineRoutedStore.isSlopableFailure(response.getValue())) {
if(logger.isDebugEnabled())
logger.debug("PUT {key:" + key + "} failed on node={id:"
+ node.getId() + ",host:" + node.getHost() + "}");
Expand Down Expand Up @@ -176,6 +180,8 @@ public void requestComplete(Object result, long requestTime) {
}
}
} else {
// did not slop because either it's not exception or
// the exception is ignorable
if(logger.isDebugEnabled()) {
if(result instanceof Exception) {
logger.debug("PUT {key:"
Expand All @@ -184,7 +190,7 @@ public void requestComplete(Object result, long requestTime) {
+ result.getClass().toString());
} else {
logger.debug("PUT {key:" + key
+ "} will not send hint. Response is normal");
+ "} will not send hint. Response is success");
}
}
}
Expand Down Expand Up @@ -365,7 +371,7 @@ private void processResponse(Response<ByteArray, Object> response, Pipeline pipe

return;
}
if(isSlopableFailure(response.getValue())) {
if(PipelineRoutedStore.isSlopableFailure(response.getValue())) {
pipelineData.getSynchronizer().tryDelegateSlop(response.getNode());
}

Expand All @@ -380,9 +386,4 @@ private void processResponse(Response<ByteArray, Object> response, Pipeline pipe
}
}
}

private boolean isSlopableFailure(Object object) {
return object instanceof UnreachableStoreException
|| object instanceof PersistenceFailureException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.InsufficientOperationalNodesException;
import voldemort.store.InsufficientZoneResponsesException;
import voldemort.store.PersistenceFailureException;
import voldemort.store.Store;
import voldemort.store.UnreachableStoreException;
import voldemort.store.routed.Pipeline;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.PipelineRoutedStore;
import voldemort.store.routed.PutPipelineData;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
Expand Down Expand Up @@ -127,8 +126,7 @@ public void execute(Pipeline pipeline) {
+ (System.nanoTime() - start) + " ns" + " (keyRef: "
+ System.identityHashCode(key) + ")");

if(e instanceof UnreachableStoreException
|| e instanceof PersistenceFailureException) {
if(PipelineRoutedStore.isSlopableFailure(e)) {
pipelineData.getSynchronizer().tryDelegateSlop(node);
}
if(handleResponseError(e, node, requestTime, pipeline, failureDetector))
Expand Down
30 changes: 21 additions & 9 deletions src/java/voldemort/store/slop/HintedHandoff.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public HintedHandoff(FailureDetector failureDetector,
public void sendHintParallel(Node failedNode, Version version, Slop slop) {
List<Node> nodes = new LinkedList<Node>();
nodes.addAll(handoffStrategy.routeHint(failedNode));
if(logger.isTraceEnabled()) {
if(logger.isDebugEnabled()) {
List<Integer> nodeIds = new ArrayList<Integer>();
for(Node node: nodes) {
nodeIds.add(node.getId());
Expand All @@ -114,20 +114,32 @@ public void sendHintParallel(Node failedNode, Version version, Slop slop) {
version), nodes);
}

/**
* A callback that handles requestComplete event from NIO selector manager
* Will try any possible nodes and pass itself as callback util all nodes
* are exhausted
*
* @param slopKey
* @param slopVersioned
* @param nodesToTry List of nodes to try to contact. Will become shorter
* after each callback
*/
private void sendOneAsyncHint(final ByteArray slopKey,
final Versioned<byte[]> slopVersioned,
final List<Node> routeNodes) {
final List<Node> nodesToTry) {
Node nodeToHostHint = null;
while(routeNodes.size() > 0) {
nodeToHostHint = routeNodes.remove(0);
boolean foundNode = false;
while(nodesToTry.size() > 0) {
nodeToHostHint = nodesToTry.remove(0);
if(!failedNodes.contains(nodeToHostHint) && failureDetector.isAvailable(nodeToHostHint)) {
foundNode = true;
break;
} else {
nodeToHostHint = null;
}
}
if(nodeToHostHint == null) {
logger.error("trying to send an async hint but used up all nodes");
if(!foundNode) {
Slop slop = slopSerializer.toObject(slopVersioned.getValue());
logger.error("Trying to send an async hint but used up all nodes. key: "
+ slop.getKey() + " version: " + slopVersioned.getVersion().toString());
return;
}
final Node node = nodeToHostHint;
Expand Down Expand Up @@ -166,7 +178,7 @@ public void requestComplete(Object result, long requestTime) {
failureDetector.recordException(node, (System.nanoTime() - startNs)
/ Time.NS_PER_MS, use);
}
sendOneAsyncHint(slopKey, slopVersioned, routeNodes);
sendOneAsyncHint(slopKey, slopVersioned, nodesToTry);
}

if(loggerDebugEnabled)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package voldemort.store.memory;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -11,6 +12,16 @@
import voldemort.store.StoreUtils;
import voldemort.versioning.Versioned;

/**
* . This class is used to assert puts on keys and to examine what key put
* assertions has been fulfilled and what are not This is particularly useful
* for cases where there are a large number of puts and the their values do not
* matter and they are not read
*
* @param <K> Key Type
* @param <V> Value Type
* @param <T> Transformation Type
*/
public class InMemoryPutAssertionStorageEngine<K, V, T> extends InMemoryStorageEngine<K, V, T> {

private static final Logger logger = Logger.getLogger(InMemoryPutAssertionStorageEngine.class);
Expand All @@ -26,7 +37,7 @@ public synchronized void assertPut(K key) throws VoldemortException {
StoreUtils.assertValidKey(key);

// delete if exist
List<Versioned<V>> result = super.getInnerMap().remove(key);
List<Versioned<V>> result = map.remove(key);
if(result == null || result.size() == 0) {
// if non-exist, record as assertion
assertionMap.put(key, true); // use synchronized to avoid race
Expand Down Expand Up @@ -64,6 +75,6 @@ public synchronized void put(K key, Versioned<V> value, T transforms) throws Vol
}

public Set<K> getFailedAssertions() {
return assertionMap.keySet();
return Collections.unmodifiableSet(assertionMap.keySet());
}
}
Loading

0 comments on commit ffe892d

Please sign in to comment.