Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fix issue #90: Simplify RoutedStore.get and remove usage of synchronized

collections.
  • Loading branch information...
commit 09d6d268ed9dc6dac7ab5f6465bed0ae7aefd06b 1 parent f232537
@ijuma ijuma authored
Showing with 92 additions and 47 deletions.
  1. +92 −47 src/java/voldemort/store/routed/RoutedStore.java
View
139 src/java/voldemort/store/routed/RoutedStore.java
@@ -25,7 +25,6 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -431,64 +430,60 @@ public void run() {
// quickly fail if there aren't enough nodes to meet the requirement
checkRequiredReads(nodes);
- final List<Versioned<byte[]>> retrieved = Collections.synchronizedList(new ArrayList<Versioned<byte[]>>());
- final List<NodeValue<ByteArray, byte[]>> nodeValues = Collections.synchronizedList(new ArrayList<NodeValue<ByteArray, byte[]>>());
+ final List<Versioned<byte[]>> retrieved = Lists.newArrayList();
+ final List<NodeValue<ByteArray, byte[]>> nodeValues = Lists.newArrayList();
// A count of the number of successful operations
- final AtomicInteger successes = new AtomicInteger();
+ int successes = 0;
// A list of thrown exceptions, indicating the number of failures
- final List<Exception> failures = Collections.synchronizedList(new LinkedList<Exception>());
+ final List<Throwable> failures = Lists.newArrayListWithCapacity(3);
// Do the preferred number of reads in parallel
int attempts = Math.min(this.storeDef.getPreferredReads(), nodes.size());
- final CountDownLatch latch = new CountDownLatch(attempts);
int nodeIndex = 0;
+ List<GetCallable> callables = Lists.newArrayListWithCapacity(attempts);
for(; nodeIndex < attempts; nodeIndex++) {
final Node node = nodes.get(nodeIndex);
- this.executor.execute(new Runnable() {
-
- public void run() {
- try {
- if(logger.isTraceEnabled())
- logger.trace("Attempting get operation on node " + node.getId()
- + " for key '" + ByteUtils.toHexString(key.get()) + "'.");
- List<Versioned<byte[]>> fetched = innerStores.get(node.getId()).get(key);
- retrieved.addAll(fetched);
- if(repairReads) {
- for(Versioned<byte[]> f: fetched)
- nodeValues.add(new NodeValue<ByteArray, byte[]>(node.getId(),
- key,
- f));
- }
- successes.incrementAndGet();
- node.getStatus().setAvailable();
- } catch(UnreachableStoreException e) {
- failures.add(e);
- markUnavailable(node, e);
- } catch(Exception e) {
- logger.warn("Error in GET on node " + node.getId() + "(" + node.getHost()
- + ")", e);
- failures.add(e);
- } finally {
- // signal that the operation is complete
- latch.countDown();
- }
- }
- });
+ callables.add(new GetCallable(node, key));
}
- // Wait for those operations to complete or timeout
+ List<Future<GetResult>> futures;
try {
- boolean succeeded = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
- if(!succeeded)
- logger.warn("Get operation timed out after " + timeoutMs + " ms.");
+ futures = executor.invokeAll(callables, timeoutMs, TimeUnit.MILLISECONDS);
} catch(InterruptedException e) {
throw new InsufficientOperationalNodesException("Get operation interrupted!", e);
}
+ for(Future<GetResult> f: futures) {
+ if(f.isCancelled()) {
+ logger.warn("Get operation timed out after " + timeoutMs + " ms.");
+ continue;
+ }
+ try {
+ GetResult getResult = f.get();
+ if(getResult.exception != null) {
+ failures.add(getResult.exception);
+ continue;
+ }
+ ++successes;
+ nodeValues.addAll(getResult.nodeValues);
+ retrieved.addAll(getResult.retrieved);
+ } catch(InterruptedException e) {
+ throw new InsufficientOperationalNodesException("Get operation interrupted!", e);
+ } catch(ExecutionException e) {
+ // We catch all Throwable subclasses apart from Error in the
+ // callable, so the else
+ // part should never happen.
+ if(e.getCause() instanceof Error)
+ throw (Error) e.getCause();
+ else
+ logger.error(e.getMessage(), e);
+ }
+ }
+
// Now if we had any failures we will be short a few reads. Do serial
// reads to make up for these.
- while(successes.get() < this.storeDef.getPreferredReads() && nodeIndex < nodes.size()) {
+ while(successes < this.storeDef.getPreferredReads() && nodeIndex < nodes.size()) {
Node node = nodes.get(nodeIndex);
try {
List<Versioned<byte[]>> fetched = innerStores.get(node.getId()).get(key);
@@ -497,7 +492,7 @@ public void run() {
for(Versioned<byte[]> f: fetched)
nodeValues.add(new NodeValue<ByteArray, byte[]>(node.getId(), key, f));
}
- successes.incrementAndGet();
+ ++successes;
node.getStatus().setAvailable();
} catch(UnreachableStoreException e) {
failures.add(e);
@@ -516,14 +511,12 @@ public void run() {
if(repairReads && retrieved.size() > 1)
repairReads(nodeValues);
- if(successes.get() >= this.storeDef.getRequiredReads())
+ if(successes >= this.storeDef.getRequiredReads())
return retrieved;
else
throw new InsufficientOperationalNodesException(this.storeDef.getRequiredReads()
- + " reads required, but "
- + successes.get()
- + " succeeded.",
- failures);
+ + " reads required, but " + successes
+ + " succeeded.", failures);
}
private void repairReads(final List<NodeValue<ByteArray, byte[]>> nodeValues) {
@@ -761,6 +754,58 @@ public Object getCapability(StoreCapabilityType capability) {
}
}
+ private final class GetCallable implements Callable<GetResult> {
+
+ private final Node node;
+ private final ByteArray key;
+
+ public GetCallable(Node node, ByteArray key) {
+ this.node = node;
+ this.key = key;
+ }
+
+ public GetResult call() throws Exception {
+ List<Versioned<byte[]>> fetched = Collections.emptyList();
+ List<NodeValue<ByteArray, byte[]>> nodeValues = Lists.newArrayList();
+ Throwable exception = null;
+ try {
+ if(logger.isTraceEnabled())
+ logger.trace("Attempting get operation on node " + node.getId() + " for key '"
+ + ByteUtils.toHexString(key.get()) + "'.");
+ fetched = innerStores.get(node.getId()).get(key);
+ if(repairReads) {
+ for(Versioned<byte[]> f: fetched)
+ nodeValues.add(new NodeValue<ByteArray, byte[]>(node.getId(), key, f));
+ }
+ node.getStatus().setAvailable();
+ } catch(UnreachableStoreException e) {
+ exception = e;
+ markUnavailable(node, e);
+ } catch(Throwable e) {
+ if(e instanceof Error)
+ throw (Error) e;
+ logger.warn("Error in GET on node " + node.getId() + "(" + node.getHost() + ")", e);
+ exception = e;
+ }
+ return new GetResult(fetched, nodeValues, exception);
+ }
+ }
+
+ private final static class GetResult {
+
+ final List<Versioned<byte[]>> retrieved;
+ final List<NodeValue<ByteArray, byte[]>> nodeValues;
+ final Throwable exception;
+
+ public GetResult(List<Versioned<byte[]>> retrieved,
+ List<NodeValue<ByteArray, byte[]>> nodeValues,
+ Throwable exception) {
+ this.retrieved = retrieved;
+ this.nodeValues = nodeValues;
+ this.exception = exception;
+ }
+ }
+
private final class GetAllCallable implements Callable<GetAllResult> {
private final Node node;
Please sign in to comment.
Something went wrong with that request. Please try again.