Skip to content

Commit

Permalink
Fix and tests for Issue 211: Unnecessary read repairs during getAll w…
Browse files Browse the repository at this point in the history
…ith more than one key.

ReadRepairer now groups the values by key before performing the algorithm.
  • Loading branch information
ijuma committed Feb 3, 2010
1 parent 3ad9a5a commit 4ade916
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 1 deletion.
24 changes: 24 additions & 0 deletions src/java/voldemort/store/routed/ReadRepairer.java
Expand Up @@ -20,13 +20,16 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import voldemort.annotations.concurrency.Threadsafe;
import voldemort.versioning.Occured;
import voldemort.versioning.Version;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;

/**
Expand Down Expand Up @@ -54,6 +57,27 @@ public List<NodeValue<K, V>> getRepairs(List<NodeValue<K, V>> nodeValues) {
if(size <= 1)
return Collections.emptyList();

Map<K, List<NodeValue<K, V>>> keyToNodeValues = Maps.newHashMap();
for(NodeValue<K, V> nodeValue: nodeValues) {
List<NodeValue<K, V>> keyNodeValues = keyToNodeValues.get(nodeValue.getKey());
if(keyNodeValues == null) {
keyNodeValues = Lists.newArrayListWithCapacity(5);
keyToNodeValues.put(nodeValue.getKey(), keyNodeValues);
}
keyNodeValues.add(nodeValue);
}

List<NodeValue<K, V>> result = Lists.newArrayList();
for(List<NodeValue<K, V>> keyNodeValues: keyToNodeValues.values())
result.addAll(singleKeyGetRepairs(keyNodeValues));
return result;
}

private List<NodeValue<K, V>> singleKeyGetRepairs(List<NodeValue<K, V>> nodeValues) {
int size = nodeValues.size();
if(size <= 1)
return Collections.emptyList();

// A list of obsolete nodes that need to be repaired
Set<Integer> obsolete = new HashSet<Integer>(3);

Expand Down
16 changes: 16 additions & 0 deletions test/unit/voldemort/store/routed/ReadRepairerTest.java
Expand Up @@ -58,6 +58,7 @@
import voldemort.versioning.Versioned;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

Expand Down Expand Up @@ -292,6 +293,21 @@ public void assertVariationsEqual(List<NodeValue<String, Integer>> expected,
}
}

/**
* See Issue #211: Unnecessary read repairs during getAll with more than one
* key
*/
@Test
public void testMultipleKeys() {
List<NodeValue<String, Integer>> nodeValues = Lists.newArrayList();
nodeValues.add(getValue(0, 1, new int[2]));
nodeValues.add(getValue(0, 2, new int[0]));
nodeValues.add(getValue(1, 2, new int[0]));
nodeValues.add(getValue(2, 1, new int[2]));
List<NodeValue<String, Integer>> repairs = repairer.getRepairs(nodeValues);
assertEquals("There should be no repairs.", 0, repairs.size());
}

private NodeValue<String, Integer> getValue(int nodeId, int value, int[] version) {
return new NodeValue<String, Integer>(nodeId,
Integer.toString(value),
Expand Down
50 changes: 49 additions & 1 deletion test/unit/voldemort/store/routed/RoutedStoreTest.java
Expand Up @@ -59,6 +59,8 @@
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.UnreachableStoreException;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.store.stats.StatTrackingStore;
import voldemort.store.stats.Tracked;
import voldemort.store.versioned.InconsistencyResolvingStore;
import voldemort.utils.ByteArray;
import voldemort.utils.Utils;
Expand Down Expand Up @@ -685,6 +687,53 @@ public void testPutTimeout() throws Exception {
}
}

/**
* See Issue #211: Unnecessary read repairs during getAll with more than one
* key
*/
@Test
public void testNoReadRepair() throws Exception {
cluster = VoldemortTestConstants.getThreeNodeCluster();
StoreDefinition storeDef = ServerTestUtils.getStoreDef("test",
3,
2,
1,
3,
2,
RoutingStrategyType.CONSISTENT_STRATEGY);

Map<Integer, Store<ByteArray, byte[]>> subStores = Maps.newHashMap();

/* We just need to keep a store from one node */
StatTrackingStore<ByteArray, byte[]> statTrackingStore = null;
for(int i = 0; i < 3; ++i) {
statTrackingStore = new StatTrackingStore<ByteArray, byte[]>(new InMemoryStorageEngine<ByteArray, byte[]>("test"),
null);
subStores.put(Iterables.get(cluster.getNodes(), i).getId(), statTrackingStore);
}
setFailureDetector(subStores);
RoutedStore routedStore = new RoutedStore("test",
subStores,
cluster,
storeDef,
1,
true,
1000L,
failureDetector);
ByteArray key1 = aKey;
routedStore.put(key1, Versioned.value("value1".getBytes()));
ByteArray key2 = TestUtils.toByteArray("voldemort");
routedStore.put(key2, Versioned.value("value2".getBytes()));

long putCount = statTrackingStore.getStats().getCount(Tracked.PUT);
routedStore.getAll(Arrays.asList(key1, key2));
/* Read repair happens asynchronously, so we wait a bit */
Thread.sleep(500);
assertEquals("put count should remain the same if there are no read repairs",
putCount,
statTrackingStore.getStats().getCount(Tracked.PUT));
}

private void assertOperationalNodes(int expected) {
int found = 0;
for(Node n: cluster.getNodes())
Expand All @@ -706,5 +755,4 @@ private void setFailureDetector(Map<Integer, Store<ByteArray, byte[]>> subStores
.setStoreVerifier(create(subStores));
failureDetector = create(failureDetectorConfig, false);
}

}

0 comments on commit 4ade916

Please sign in to comment.