Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Test and fix for Issue 89: Sequential retrieval in RoutedStore.get do…

…esn't consider repairReads.
  • Loading branch information...
commit c113bb9b8bf18d74cdf4b413fd86ac71811f109b 1 parent bfd828f
@ijuma ijuma authored
View
7 src/java/voldemort/store/routed/RoutedStore.java
@@ -326,7 +326,12 @@ public void run() {
while(successes.get() < this.preferredReads && nodeIndex < nodes.size()) {
Node node = nodes.get(nodeIndex);
try {
- retrieved.addAll(innerStores.get(node.getId()).get(key));
+ List<Versioned<byte[]>> fetched = innerStores.get(node.getId()).get(key);
+ retrieved.addAll(fetched);
+ if(repairReads) {
+ for(Versioned<byte[]> f: fetched)
+ nodeValues.add(new NodeValue<ByteArray, byte[]>(node.getId(), key, f));
+ }
successes.incrementAndGet();
node.getStatus().setAvailable();
} catch(UnreachableStoreException e) {
View
50 test/unit/voldemort/store/routed/RoutedStoreTest.java
@@ -42,6 +42,7 @@
import voldemort.versioning.VectorClockInconsistencyResolver;
import voldemort.versioning.Versioned;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
/**
@@ -302,6 +303,55 @@ public void testOnlyNodeFailuresDisableNode() {
assertOperationalNodes(cluster, 0);
}
+ /**
+ * See Issue #89: Sequential retrieval in RoutedStore.get doesn't consider
+ * repairReads.
+ */
+ public void testReadRepairWithFailures() throws InterruptedException {
+ Cluster cluster = getNineNodeCluster();
+ // Disable node 1 so that the first put also goes to the last node
+ Iterables.get(cluster.getNodes(), 1).getStatus().setUnavailable();
+
+ RoutedStore routedStore = getStore(cluster,
+ cluster.getNumberOfNodes() - 1,
+ cluster.getNumberOfNodes() - 1,
+ 1,
+ 0);
+ Store<ByteArray, byte[]> store = new InconsistencyResolvingStore<ByteArray, byte[]>(routedStore,
+ new VectorClockInconsistencyResolver<byte[]>());
+ store.put(aKey, new Versioned<byte[]>(aValue));
+
+ byte[] anotherValue = "john".getBytes();
+
+ // Disable the last node and enable node 1 to prevent the last node from
+ // getting the new version
+ Iterables.getLast(cluster.getNodes()).getStatus().setUnavailable();
+ Iterables.get(cluster.getNodes(), 1).getStatus().setAvailable();
+ VectorClock clock = getClock(1);
+ store.put(aKey, new Versioned<byte[]>(anotherValue, clock));
+
+ // Enable last node and disable node 1, the following get should cause a
+ // read repair on the last node in the code path that is only executed
+ // if there are failures.
+ Iterables.get(cluster.getNodes(), 1).getStatus().setUnavailable();
+ Iterables.getLast(cluster.getNodes()).getStatus().setUnavailable();
+ store.get(aKey);
+
+ // Read repairs are done asynchronously, so we sleep for a short period.
+ // It may be a good idea to use a synchronous executor service.
+ Thread.sleep(100);
+
+ List<Versioned<byte[]>> versioneds = store.get(aKey);
+ assertEquals(1, versioneds.size());
+ assertEquals(new ByteArray(anotherValue), new ByteArray(versioneds.get(0).getValue()));
+ for(Store<ByteArray, byte[]> innerStore: routedStore.getInnerStores().values()) {
+ List<Versioned<byte[]>> innerVersioneds = innerStore.get(aKey);
+ assertEquals(1, versioneds.size());
+ assertEquals(new ByteArray(anotherValue), new ByteArray(innerVersioneds.get(0)
+ .getValue()));
+ }
+ }
+
public void testStoreTimeouts() {
/*
* Cluster cluster = getThreeNodeThreePartitionCluster(); RoutingStrategy
Please sign in to comment.
Something went wrong with that request. Please try again.