Skip to content
Browse files

Fix for issue 96: using preferredReads instead of attempted reads. Th…

…is can cause us to block on a non-existant read attempt if there are less than preferredReads nodes available.
  • Loading branch information...
1 parent 9120d14 commit ec050be54a59bb4b2df5fece488074f586db0a1f @jkreps jkreps committed Apr 22, 2009
Showing with 90 additions and 83 deletions.
  1. +83 −80 src/java/voldemort/store/routed/RoutedStore.java
  2. +7 −3 src/java/voldemort/store/socket/SocketStore.java
View
163 src/java/voldemort/store/routed/RoutedStore.java
@@ -165,7 +165,7 @@ public RoutedStore(String name,
public boolean delete(final ByteArray key, final Version version) throws VoldemortException {
StoreUtils.assertValidKey(key);
- final List<Node> nodes = routingStrategy.routeRequest(key.get());
+ final List<Node> nodes = availableNodes(routingStrategy.routeRequest(key.get()));
// quickly fail if there aren't enough live nodes to meet the
// requirements
@@ -210,6 +210,7 @@ public void run() {
});
}
+ int attempts = Math.min(preferredWrites, numNodes);
if(this.preferredWrites <= 0) {
return true;
} else {
@@ -221,7 +222,7 @@ public void run() {
+ "to complete after waiting " + timeoutMs + " ms.");
// okay, at least the required number of operations have
// completed, were they successful?
- if(successes.get() >= this.preferredWrites)
+ if(successes.get() >= attempts)
return deletedSomething.get();
} catch(InterruptedException e) {
throw new InsufficientOperationalNodesException("Delete operation interrupted!",
@@ -230,9 +231,9 @@ public void run() {
}
}
- // If we get to here, that means we couldn't hit the preferred number of
- // writes,
- // throw an exception if you can't even hit the required number
+ // If we get to here, that means we couldn't hit the preferred number
+ // of writes, throw an exception if you can't even hit the required
+ // number
if(successes.get() < requiredWrites)
throw new InsufficientOperationalNodesException(this.requiredWrites
+ " deletes required, but "
@@ -259,7 +260,7 @@ public void run() {
*/
public List<Versioned<byte[]>> get(final ByteArray key) throws VoldemortException {
StoreUtils.assertValidKey(key);
- final List<Node> nodes = routingStrategy.routeRequest(key.get());
+ final List<Node> nodes = availableNodes(routingStrategy.routeRequest(key.get()));
// quickly fail if there aren't enough nodes to meet the requirement
if(nodes.size() < this.requiredReads)
@@ -277,39 +278,37 @@ public void run() {
final List<Exception> failures = Collections.synchronizedList(new LinkedList<Exception>());
// Do the preferred number of reads in parallel
- final CountDownLatch latch = new CountDownLatch(this.preferredReads);
+ int attempts = Math.min(this.preferredReads, nodes.size());
+ final CountDownLatch latch = new CountDownLatch(attempts);
int nodeIndex = 0;
- for(; nodeIndex < this.preferredReads; nodeIndex++) {
+ for(; nodeIndex < attempts; nodeIndex++) {
final Node node = nodes.get(nodeIndex);
- if(isAvailable(node)) {
- this.executor.execute(new Runnable() {
+ this.executor.execute(new Runnable() {
- public void run() {
- try {
- List<Versioned<byte[]>> fetched = innerStores.get(node.getId())
- .get(key);
- retrieved.addAll(fetched);
- if(repairReads) {
- for(Versioned<byte[]> f: fetched)
- nodeValues.add(new NodeValue<ByteArray, byte[]>(node.getId(),
- key,
- f));
- }
- successes.incrementAndGet();
- node.getStatus().setAvailable();
- } catch(UnreachableStoreException e) {
- failures.add(e);
- markUnavailable(node, e);
- } catch(Exception e) {
- logger.debug("Error in get.", e);
- failures.add(e);
- } finally {
- // signal that the operation is complete
- latch.countDown();
+ public void run() {
+ try {
+ List<Versioned<byte[]>> fetched = innerStores.get(node.getId()).get(key);
+ retrieved.addAll(fetched);
+ if(repairReads) {
+ for(Versioned<byte[]> f: fetched)
+ nodeValues.add(new NodeValue<ByteArray, byte[]>(node.getId(),
+ key,
+ f));
}
+ successes.incrementAndGet();
+ node.getStatus().setAvailable();
+ } catch(UnreachableStoreException e) {
+ failures.add(e);
+ markUnavailable(node, e);
+ } catch(Exception e) {
+ logger.debug("Error in get.", e);
+ failures.add(e);
+ } finally {
+ // signal that the operation is complete
+ latch.countDown();
}
- });
- }
+ }
+ });
}
// Wait for those operations to complete or timeout
@@ -407,7 +406,7 @@ public String getName() {
public void put(final ByteArray key, final Versioned<byte[]> versioned)
throws VoldemortException {
StoreUtils.assertValidKey(key);
- final List<Node> nodes = routingStrategy.routeRequest(key.get());
+ final List<Node> nodes = availableNodes(routingStrategy.routeRequest(key.get()));
// quickly fail if there aren't enough nodes to meet the requirement
final int numNodes = nodes.size();
@@ -424,32 +423,29 @@ public void put(final ByteArray key, final Versioned<byte[]> versioned)
final Map<Integer, Exception> failures = Collections.synchronizedMap(new HashMap<Integer, Exception>(1));
// If requiredWrites > 0 then do a single blocking write to the first
- // live node in the
- // preference list if this node throws an ObsoleteVersionException allow
- // it to propagate
+ // live node in the preference list if this node throws an
+ // ObsoleteVersionException allow it to propagate
Node master = null;
int currentNode = 0;
Versioned<byte[]> versionedCopy = null;
for(; currentNode < numNodes; currentNode++) {
Node current = nodes.get(currentNode);
- if(isAvailable(nodes.get(currentNode))) {
- try {
- versionedCopy = incremented(versioned, current.getId());
- innerStores.get(current.getId()).put(key, versionedCopy);
- successes.getAndIncrement();
- current.getStatus().setAvailable();
- master = current;
- break;
- } catch(UnreachableStoreException e) {
- markUnavailable(current, e);
- failures.put(current.getId(), e);
- } catch(ObsoleteVersionException e) {
- // if this version is obsolete on the master, then bail out
- // of this operation
- throw e;
- } catch(Exception e) {
- failures.put(currentNode, e);
- }
+ try {
+ versionedCopy = incremented(versioned, current.getId());
+ innerStores.get(current.getId()).put(key, versionedCopy);
+ successes.getAndIncrement();
+ current.getStatus().setAvailable();
+ master = current;
+ break;
+ } catch(UnreachableStoreException e) {
+ markUnavailable(current, e);
+ failures.put(current.getId(), e);
+ } catch(ObsoleteVersionException e) {
+ // if this version is obsolete on the master, then bail out
+ // of this operation
+ throw e;
+ } catch(Exception e) {
+ failures.put(currentNode, e);
}
}
@@ -467,48 +463,47 @@ public void put(final ByteArray key, final Versioned<byte[]> versioned)
final Versioned<byte[]> finalVersionedCopy = versionedCopy;
final Semaphore semaphore = new Semaphore(0, false);
// Add the operations to the pool
+ int attempts = 0;
for(; currentNode < numNodes; currentNode++) {
+ attempts++;
final Node node = nodes.get(currentNode);
- if(isAvailable(node)) {
- this.executor.execute(new Runnable() {
+ this.executor.execute(new Runnable() {
- public void run() {
- try {
- innerStores.get(node.getId()).put(key, finalVersionedCopy);
- successes.incrementAndGet();
- node.getStatus().setAvailable();
- } catch(UnreachableStoreException e) {
- markUnavailable(node, e);
- failures.put(node.getId(), e);
- } catch(Exception e) {
- logger.debug("Error in get.", e);
- failures.put(node.getId(), e);
- } finally {
- // signal that the operation is complete
- semaphore.release();
- }
+ public void run() {
+ try {
+ innerStores.get(node.getId()).put(key, finalVersionedCopy);
+ successes.incrementAndGet();
+ node.getStatus().setAvailable();
+ } catch(UnreachableStoreException e) {
+ markUnavailable(node, e);
+ failures.put(node.getId(), e);
+ } catch(Exception e) {
+ logger.debug("Error in put:", e);
+ failures.put(node.getId(), e);
+ } finally {
+ // signal that the operation is complete
+ semaphore.release();
}
- });
- }
+ }
+ });
}
- // Block until at least requiredwrites have accumulated
- for(int i = 0; i < preferredWrites - 1; i++) {
+ // Block until we get enough completions
+ int blockCount = Math.min(preferredWrites - 1, attempts);
+ for(int i = 0; i < blockCount; i++) {
try {
boolean acquired = semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
if(!acquired)
logger.warn("Timed out waiting for put to succeed.");
// okay, at least the required number of operations have
- // completed, where they successful?
+ // completed, were they successful?
if(successes.get() >= this.preferredWrites)
break;
} catch(InterruptedException e) {
throw new InsufficientOperationalNodesException("Put operation interrupted", e);
}
}
- // if we don't have enough writes, then do some slop writes
- // these are seperate because we only want to do them if we have to
if(successes.get() < requiredWrites) {
throw new InsufficientOperationalNodesException(successes.get()
+ " writes succeeded, but "
@@ -538,6 +533,14 @@ private void markUnavailable(Node node, Exception e) {
time.getMilliseconds()));
}
+ private List<Node> availableNodes(List<Node> list) {
+ List<Node> available = new ArrayList<Node>(list.size());
+ for(Node node: list)
+ if(isAvailable(node))
+ available.add(node);
+ return available;
+ }
+
public void close() {
this.executor.shutdown();
try {
View
10 src/java/voldemort/store/socket/SocketStore.java
@@ -31,6 +31,7 @@
import voldemort.store.ErrorCodeMapper;
import voldemort.store.Store;
import voldemort.store.StoreUtils;
+import voldemort.store.UnreachableStoreException;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Utils;
@@ -82,7 +83,8 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
return inputStream.readBoolean();
} catch(IOException e) {
close(sands.getSocket());
- throw new VoldemortException(e);
+ throw new UnreachableStoreException("Failure in delete on " + destination + ": "
+ + e.getMessage(), e);
} finally {
pool.checkin(destination, sands);
}
@@ -122,7 +124,8 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
return results;
} catch(IOException e) {
close(sands.getSocket());
- throw new VoldemortException(e);
+ throw new UnreachableStoreException("Failure in get on " + destination + ": "
+ + e.getMessage(), e);
} finally {
pool.checkin(destination, sands);
}
@@ -146,7 +149,8 @@ public void put(ByteArray key, Versioned<byte[]> value) throws VoldemortExceptio
checkException(inputStream);
} catch(IOException e) {
close(sands.getSocket());
- throw new VoldemortException(e);
+ throw new UnreachableStoreException("Failure in put on " + destination + ": "
+ + e.getMessage(), e);
} finally {
pool.checkin(destination, sands);
}

0 comments on commit ec050be

Please sign in to comment.
Something went wrong with that request. Please try again.