From 5f796e8c6742ea101dc2d530925dc5e1b629f988 Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Wed, 19 Jun 2013 15:56:56 -0700 Subject: [PATCH] Added 3 zone tests to RoutedStoreTest.java, fixed a small bug in GetAllConfigureNodes --- .../routed/action/GetAllConfigureNodes.java | 6 +- .../voldemort/VoldemortTestConstants.java | 24 + .../config/nine-node-cluster-with-3-zones.xml | 97 ++ .../config/six-node-cluster-with-3-zones.xml | 70 ++ .../three-node-cluster-with-3-zones.xml | 43 + .../store/routed/RoutedStoreTest.java | 964 ++++++++++++++++-- 6 files changed, 1135 insertions(+), 69 deletions(-) create mode 100644 test/common/voldemort/config/nine-node-cluster-with-3-zones.xml create mode 100644 test/common/voldemort/config/six-node-cluster-with-3-zones.xml create mode 100644 test/common/voldemort/config/three-node-cluster-with-3-zones.xml diff --git a/src/java/voldemort/store/routed/action/GetAllConfigureNodes.java b/src/java/voldemort/store/routed/action/GetAllConfigureNodes.java index c923e25b1b..a06bac9899 100644 --- a/src/java/voldemort/store/routed/action/GetAllConfigureNodes.java +++ b/src/java/voldemort/store/routed/action/GetAllConfigureNodes.java @@ -117,7 +117,11 @@ public void execute(Pipeline pipeline) { } // Add the rest - nodes.addAll(zoneIdToNode.get(this.clientZone.getId())); + List zoneIDNodeList = zoneIdToNode.get(this.clientZone.getId()); + if(zoneIDNodeList != null) { + nodes.addAll(zoneIDNodeList); + } + for(int index = 0; index < proximityList.size(); index++) { List zoneNodes = zoneIdToNode.get(proximityList.get(index)); if(zoneNodes != null) diff --git a/test/common/voldemort/VoldemortTestConstants.java b/test/common/voldemort/VoldemortTestConstants.java index 3f74f99c3b..1272ed5d6e 100644 --- a/test/common/voldemort/VoldemortTestConstants.java +++ b/test/common/voldemort/VoldemortTestConstants.java @@ -58,6 +58,10 @@ public static String getTwoNodeClusterXml() { return readString("config/two-node-cluster.xml"); } + public static String 
getThreeNodeClusterWith3ZonesXml() { + return readString("config/three-node-cluster-with-3-zones.xml"); + } + public static String getStoreWithTwoKeyVersions() { return readString("config/store-with-two-key-versions.xml"); } @@ -66,6 +70,18 @@ public static Cluster getTwoNodeCluster() { return new ClusterMapper().readCluster(new StringReader(getTwoNodeClusterXml())); } + public static Cluster getThreeNodeClusterWith3Zones() { + return new ClusterMapper().readCluster(new StringReader(getThreeNodeClusterWith3ZonesXml())); + } + + public static String getSixNodeClusterWith3ZonesXml() { + return readString("config/six-node-cluster-with-3-zones.xml"); + } + + public static Cluster getSixNodeClusterWith3Zones() { + return new ClusterMapper().readCluster(new StringReader(getSixNodeClusterWith3ZonesXml())); + } + public static String getTenNodeClusterXml() { return readString("config/ten-node-cluster.xml"); } @@ -86,6 +102,10 @@ public static String getEightNodeClusterWithZonesXml() { return readString("config/eight-node-cluster-with-zones.xml"); } + public static String getNineNodeClusterWith3ZonesXml() { + return readString("config/nine-node-cluster-with-3-zones.xml"); + } + public static String getSingleStoreWithZonesXml() { return readString("config/single-store-with-zones.xml"); } @@ -114,6 +134,10 @@ public static Cluster getEightNodeClusterWithZones() { return new ClusterMapper().readCluster(new StringReader(getEightNodeClusterWithZonesXml())); } + public static Cluster getNineNodeClusterWith3Zones() { + return new ClusterMapper().readCluster(new StringReader(getNineNodeClusterWith3ZonesXml())); + } + private static String readString(String filename) { try { return IOUtils.toString(VoldemortTestConstants.class.getResourceAsStream(filename)); diff --git a/test/common/voldemort/config/nine-node-cluster-with-3-zones.xml b/test/common/voldemort/config/nine-node-cluster-with-3-zones.xml new file mode 100644 index 0000000000..032ac9aa10 --- /dev/null +++ 
b/test/common/voldemort/config/nine-node-cluster-with-3-zones.xml @@ -0,0 +1,97 @@ + + + mycluster + + 0 + 1,2 + + + 1 + 0,2 + + + 2 + 0,1 + + + 0 + localhost + 8080 + 6666 + 7666 + 16, 3, 21 + 0 + + + 1 + localhost + 8081 + 6667 + 7667 + 23, 18, 13 + 0 + + + 2 + localhost + 8082 + 6668 + 7668 + 22, 12 + 0 + + + 3 + localhost + 8083 + 6669 + 7669 + 2, 17, 24 + 1 + + + 4 + localhost + 8084 + 6670 + 7670 + 11, 25, 10 + 1 + + + 5 + localhost + 8085 + 6671 + 7671 + 4, 19, 5 + 1 + + + 6 + localhost + 8086 + 6672 + 7672 + 20, 0, 14 + 2 + + + 7 + localhost + 8087 + 6673 + 7673 + 1, 9, 6 + 2 + + + 8 + localhost + 8088 + 6674 + 7674 + 8, 7, 15 + 2 + + \ No newline at end of file diff --git a/test/common/voldemort/config/six-node-cluster-with-3-zones.xml b/test/common/voldemort/config/six-node-cluster-with-3-zones.xml new file mode 100644 index 0000000000..0746efb1a4 --- /dev/null +++ b/test/common/voldemort/config/six-node-cluster-with-3-zones.xml @@ -0,0 +1,70 @@ + + + mycluster + + 0 + 1,2 + + + 1 + 0,2 + + + 2 + 0,1 + + + 0 + localhost + 8080 + 6666 + 7666 + 16, 3, 7, 20, 21 + 0 + + + 1 + localhost + 8081 + 6667 + 7667 + 0, 15, 23, 18, 13 + 0 + + + 2 + localhost + 8082 + 6668 + 7668 + 8, 14, 22, 12 + 1 + + + 3 + localhost + 8083 + 6669 + 7669 + 1, 2, 17, 24 + 1 + + + 4 + localhost + 8084 + 6670 + 7670 + 9, 11, 25, 10 + 2 + + + 5 + localhost + 8085 + 6671 + 7671 + 4, 6, 19, 5 + 2 + + \ No newline at end of file diff --git a/test/common/voldemort/config/three-node-cluster-with-3-zones.xml b/test/common/voldemort/config/three-node-cluster-with-3-zones.xml new file mode 100644 index 0000000000..2ec6966e09 --- /dev/null +++ b/test/common/voldemort/config/three-node-cluster-with-3-zones.xml @@ -0,0 +1,43 @@ + + + mycluster + + 0 + 1,2 + + + 1 + 0,2 + + + 2 + 0,1 + + + 0 + localhost + 8080 + 6666 + 7666 + 0, 3 + 0 + + + 1 + localhost + 8080 + 6667 + 7667 + 1, 4 + 1 + + + 2 + localhost + 8080 + 6668 + 7668 + 2, 5 + 2 + + diff --git 
a/test/unit/voldemort/store/routed/RoutedStoreTest.java b/test/unit/voldemort/store/routed/RoutedStoreTest.java index 46bc5e3f56..636ec97652 100644 --- a/test/unit/voldemort/store/routed/RoutedStoreTest.java +++ b/test/unit/voldemort/store/routed/RoutedStoreTest.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -49,6 +50,7 @@ import voldemort.client.TimeoutConfig; import voldemort.cluster.Cluster; import voldemort.cluster.Node; +import voldemort.cluster.Zone; import voldemort.cluster.failuredetector.BannagePeriodFailureDetector; import voldemort.cluster.failuredetector.FailureDetector; import voldemort.cluster.failuredetector.FailureDetectorConfig; @@ -56,6 +58,7 @@ import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; import voldemort.routing.RoutingStrategyType; +import voldemort.routing.StoreRoutingPlan; import voldemort.serialization.SerializerDefinition; import voldemort.store.AbstractByteArrayStoreTest; import voldemort.store.FailingReadsStore; @@ -100,6 +103,7 @@ public class RoutedStoreTest extends AbstractByteArrayStoreTest { public static final int OPERATION_TIMEOUT = 60; private Cluster cluster; + private StoreDefinition storeDef; private final ByteArray aKey = TestUtils.toByteArray("jay"); private final byte[] aValue = "kreps".getBytes(); private final byte[] aTransform = "transform".getBytes(); @@ -157,7 +161,7 @@ private RoutedStore getStore(Cluster cluster, int reads, int writes, int threads threads, failing, 0, - RoutingStrategyType.TO_ALL_STRATEGY, + RoutingStrategyType.CONSISTENT_STRATEGY, new VoldemortException()); } @@ -194,13 +198,13 @@ else if(count < failing + sleepy) } setFailureDetector(subStores); - StoreDefinition storeDef = ServerTestUtils.getStoreDef("test", - reads + writes, - reads, - reads, - writes, - writes, - strategy); + this.storeDef = 
ServerTestUtils.getStoreDef("test", + reads + writes, + reads, + reads, + writes, + writes, + strategy); routedStoreThreadPool = Executors.newFixedThreadPool(threads); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(isPipelineRoutedStoreEnabled, routedStoreThreadPool, @@ -210,6 +214,46 @@ else if(count < failing + sleepy) return routedStoreFactory.create(cluster, storeDef, subStores, true, failureDetector); } + public Store getZonedStore() throws Exception { + cluster = VoldemortTestConstants.getNineNodeClusterWith3Zones(); + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + return new InconsistencyResolvingStore(getStore(cluster, + cluster.getNumberOfNodes(), + cluster.getNumberOfNodes(), + cluster.getNumberOfZones() - 1, + cluster.getNumberOfZones() - 1, + 4, + zoneReplicationFactor), + new VectorClockInconsistencyResolver()); + } + + private RoutedStore getStore(Cluster cluster, + int reads, + int writes, + int zonereads, + int zonewrites, + int threads, + HashMap zoneReplicationFactor) throws Exception { + return getStore(cluster, + reads, + writes, + zonereads, + zonewrites, + threads, + null, + null, + zoneReplicationFactor, + RoutingStrategyType.ZONE_STRATEGY, + 0, + BANNAGE_PERIOD, + null); + + } + private RoutedStore getStore(Cluster cluster, int reads, int writes, @@ -224,7 +268,6 @@ private RoutedStore getStore(Cluster cluster, long timeOutMs, VoldemortException e) throws Exception { Map> subStores = Maps.newHashMap(); - int count = 0; for(Node n: cluster.getNodes()) { Store subStore = null; @@ -237,21 +280,19 @@ else if(sleepy != null && sleepy.contains(n.getId())) subStore = new InMemoryStorageEngine("test"); subStores.put(n.getId(), subStore); - - count += 1; } setFailureDetector(subStores); - StoreDefinition storeDef = 
ServerTestUtils.getStoreDef("test", - reads, - reads, - writes, - writes, - zonereads, - zonewrites, - zoneReplicationFactor, - HintedHandoffStrategyType.PROXIMITY_STRATEGY, - strategy); + this.storeDef = ServerTestUtils.getStoreDef("test", + reads, + reads, + writes, + writes, + zonereads, + zonewrites, + zoneReplicationFactor, + HintedHandoffStrategyType.PROXIMITY_STRATEGY, + strategy); routedStoreThreadPool = Executors.newFixedThreadPool(threads); RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, @@ -292,51 +333,266 @@ private void assertNOrMoreEqual(RoutedStore routedStore, + value + "', but found " + count + ".", expected <= count); } - private void testBasicOperations(int reads, int writes, int failures, int threads) - throws Exception { - RoutedStore routedStore = getStore(cluster, reads, writes, threads, failures); + /** + * In case of Zoned cluster, there is a non trivial time required for the + * delete. The custom timeout is used to account for this delay. 
+ */ + private void waitForOperationToComplete(long customSleepTime) { + if(customSleepTime > 0) { + try { + Thread.sleep(customSleepTime); + } catch(Exception e) {} + } + + } + + private void testBasicOperations(int reads, + int writes, + int failures, + int threads, + RoutedStore customRoutedStore, + long customSleepTime) throws Exception { + + RoutedStore routedStore = null; + if(customRoutedStore == null) { + routedStore = getStore(cluster, reads, writes, threads, failures); + } else { + routedStore = customRoutedStore; + } + Store store = new InconsistencyResolvingStore(routedStore, new VectorClockInconsistencyResolver()); + VectorClock clock = getClock(1); Versioned versioned = new Versioned(aValue, clock); routedStore.put(aKey, versioned, aTransform); + + waitForOperationToComplete(customSleepTime); assertNOrMoreEqual(routedStore, cluster.getNumberOfNodes() - failures, aKey, versioned); + List> found = store.get(aKey, aTransform); assertEquals(1, found.size()); assertEquals(versioned, found.get(0)); + + waitForOperationToComplete(customSleepTime); assertNOrMoreEqual(routedStore, cluster.getNumberOfNodes() - failures, aKey, versioned); + assertTrue(routedStore.delete(aKey, versioned.getVersion())); + + waitForOperationToComplete(customSleepTime); assertNEqual(routedStore, 0, aKey, versioned); + assertTrue(!routedStore.delete(aKey, versioned.getVersion())); } @Test public void testBasicOperationsSingleThreaded() throws Exception { - testBasicOperations(cluster.getNumberOfNodes(), cluster.getNumberOfNodes(), 0, 1); + testBasicOperations(cluster.getNumberOfNodes(), cluster.getNumberOfNodes(), 0, 1, null, 0); + } + + /** + * Test to ensure that the basic operations can be performed successfully + * against a 3 zone cluster with a single thread. 
+ * + * @throws Exception + */ + @Test + public void testBasicOperationsZZZSingleThreaded() throws Exception { + cluster = VoldemortTestConstants.getNineNodeClusterWith3Zones(); + + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // PR = RR = #nodes in a zone + // PW = RW = #nodes in a zone + // Zone Reads = # Zones - 1 + // Zone Writes = # Zones - 1 + // Threads = 1 + RoutedStore routedStore = getStore(cluster, + cluster.getNumberOfNodesInZone(0), + cluster.getNumberOfNodesInZone(0), + cluster.getNumberOfZones() - 1, + cluster.getNumberOfZones() - 1, + 1, + zoneReplicationFactor); + + testBasicOperations(cluster.getNumberOfNodes(), + cluster.getNumberOfNodes(), + 0, + 0, + routedStore, + 500); } @Test public void testBasicOperationsMultiThreaded() throws Exception { - testBasicOperations(cluster.getNumberOfNodes(), cluster.getNumberOfNodes(), 0, 4); + testBasicOperations(cluster.getNumberOfNodes(), cluster.getNumberOfNodes(), 0, 4, null, 0); + } + + /** + * Test to ensure that the basic operations can be performed successfully + * against a 3 zone cluster with multiple threads. 
+ * + * @throws Exception + */ + + @Test + public void testBasicOperationsZZZMultiThreaded() throws Exception { + cluster = VoldemortTestConstants.getNineNodeClusterWith3Zones(); + + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // PR = RR = #nodes in a zone + // PW = RW = #nodes in a zone + // Zone Reads = # Zones - 1 + // Zone Writes = # Zones - 1 + // Threads = 4 + RoutedStore routedStore = getStore(cluster, + cluster.getNumberOfNodesInZone(0), + cluster.getNumberOfNodesInZone(0), + cluster.getNumberOfZones() - 1, + cluster.getNumberOfZones() - 1, + 4, + zoneReplicationFactor); + + testBasicOperations(cluster.getNumberOfNodes(), + cluster.getNumberOfNodes(), + 0, + 0, + routedStore, + 500); } @Test public void testBasicOperationsMultiThreadedWithFailures() throws Exception { - testBasicOperations(cluster.getNumberOfNodes() - 2, cluster.getNumberOfNodes() - 2, 2, 4); + testBasicOperations(cluster.getNumberOfNodes() - 2, + cluster.getNumberOfNodes() - 2, + 2, + 4, + null, + 0); } - private void testBasicOperationFailure(int reads, int writes, int failures, int threads) - throws Exception { - VectorClock clock = getClock(1); - Versioned versioned = new Versioned(aValue, clock); + /** + * Test to ensure that the basic operations can be performed successfully + * against a 3 zone cluster in the presence of failures. 
+ * + * @throws Exception + */ + + @Test + public void testBasicOperationsZZZMultiThreadedWithFailures() throws Exception { + cluster = VoldemortTestConstants.getNineNodeClusterWith3Zones(); + + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // This configuration uses zone reads = 0 in order to avoid getting + // InsufficientZoneResponsesException. Please check the comment at the + // end of PerformSerialRequests.java to understand this. + + // PR = RR = #nodes in a zone - 1 + // PW = RW = #nodes in a zone - 1 + // Zone Reads = 0 + // Zone Writes = # Zones - 1 + // Failing nodes = 1 from each zone + // Threads = 4 RoutedStore routedStore = getStore(cluster, - reads, - writes, - threads, - failures, + cluster.getNumberOfNodesInZone(0) - 1, + cluster.getNumberOfNodesInZone(0) - 1, 0, - RoutingStrategyType.TO_ALL_STRATEGY, - new UnreachableStoreException("no go")); + cluster.getNumberOfZones() - 1, + 4, + Sets.newHashSet(1, 5, 6), + null, + zoneReplicationFactor, + RoutingStrategyType.ZONE_STRATEGY, + 0, + BANNAGE_PERIOD, + new VoldemortException()); + + testBasicOperations(cluster.getNumberOfNodes(), + cluster.getNumberOfNodes(), + 3, + 0, + routedStore, + 1000); + } + + /** + * Test to ensure that the basic operations occur correctly, in the presence + * of some sleepy nodes. NOTE: For some selection of the sleepy nodes, it is + * possible that the operation will timeout. This particular set of sleepy + * nodes is designed for the chosen key and the chosen global timeout. 
+ * + * @throws Exception + */ + @Test + public void testBasicOperationsZZZMultiThreadedWithDelays() throws Exception { + cluster = VoldemortTestConstants.getNineNodeClusterWith3Zones(); + + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // PR = RR = #nodes in a zone - 1 + // PW = RW = #nodes in a zone - 1 + // Zone Reads = 1 + // Zone Writes = 1 + // Sleepy nodes = 1 from each zone + // Threads = 4 + RoutedStore routedStore = getStore(cluster, + cluster.getNumberOfNodesInZone(0) - 1, + cluster.getNumberOfNodesInZone(0) - 1, + 1, + 1, + 4, + null, + Sets.newHashSet(2, 4, 8), + zoneReplicationFactor, + RoutingStrategyType.ZONE_STRATEGY, + SLEEPY_TIME, + 100, + new VoldemortException()); + + testBasicOperations(cluster.getNumberOfNodes(), + cluster.getNumberOfNodes(), + 3, + 0, + routedStore, + 1000); + } + + private void testBasicOperationFailure(int reads, + int writes, + int failures, + int threads, + RoutedStore customRoutedStore) throws Exception { + VectorClock clock = getClock(1); + Versioned versioned = new Versioned(aValue, clock); + + RoutedStore routedStore = null; + if(customRoutedStore == null) { + routedStore = getStore(cluster, + reads, + writes, + threads, + failures, + 0, + RoutingStrategyType.TO_ALL_STRATEGY, + new UnreachableStoreException("no go")); + } else { + routedStore = customRoutedStore; + } + try { routedStore.put(aKey, versioned, aTransform); fail("Put succeeded with too few operational nodes."); @@ -362,7 +618,98 @@ public void testBasicOperationFailureMultiThreaded() throws Exception { testBasicOperationFailure(cluster.getNumberOfNodes() - 2, cluster.getNumberOfNodes() - 2, 4, - 4); + 4, + null); + } + + /** + * Test to ensure that the basic operations fail in the presence of bad + * nodes in a 3 zone cluster. 
+ * + * @throws Exception + */ + @Test + public void testBasicOperationFailureZZZMultiThreaded() throws Exception { + cluster = VoldemortTestConstants.getNineNodeClusterWith3Zones(); + + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // PR = RR = 7 + // PW = RW = 7 + // Zone Reads = # Zones - 1 + // Zone Writes = # Zones - 1 + // Failing nodes = 1 from each zone + // Threads = 4 + RoutedStore zonedRoutedStore = getStore(cluster, + 7, + 7, + cluster.getNumberOfZones() - 1, + cluster.getNumberOfZones() - 1, + 4, + Sets.newHashSet(1, 5, 6), + null, + zoneReplicationFactor, + RoutingStrategyType.ZONE_STRATEGY, + 0, + BANNAGE_PERIOD, + new UnreachableStoreException("no go")); + + testBasicOperationFailure(cluster.getNumberOfNodes() - 2, + cluster.getNumberOfNodes() - 2, + 0, + 0, + zonedRoutedStore); + } + + /** + * Test to ensure that the basic operations fail, in the presence of some + * sleepy nodes and zone count reads and writes = 2 in a 3 zone cluster. 
+ * + * @throws Exception + */ + @Test + public void testBasicOperationsFailureZZZMultiThreadedWithDelays() throws Exception { + cluster = VoldemortTestConstants.getNineNodeClusterWith3Zones(); + + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // PR = RR = #nodes in a zone - 1 + // PW = RW = #nodes in a zone - 1 + // Zone Reads = # Zones - 1 + // Zone Writes = # Zones - 1 + // Sleepy nodes = 1 from each zone + // Threads = 4 + RoutedStore routedStore = getStore(cluster, + cluster.getNumberOfNodesInZone(0) - 1, + cluster.getNumberOfNodesInZone(0) - 1, + cluster.getNumberOfZones() - 1, + cluster.getNumberOfZones() - 1, + 4, + null, + Sets.newHashSet(2, 4, 8), + zoneReplicationFactor, + RoutingStrategyType.ZONE_STRATEGY, + SLEEPY_TIME, + OPERATION_TIMEOUT, + new VoldemortException()); + + try { + testBasicOperations(cluster.getNumberOfNodes(), + cluster.getNumberOfNodes(), + 3, + 0, + routedStore, + 1000); + fail("Too few successful zone responses. 
Should've failed."); + } catch(InsufficientZoneResponsesException ize) { + // Expected + } } @Test @@ -378,6 +725,19 @@ public void testPutIncrementsVersion() throws Exception { copy.compare(found.get(0).getVersion())); } + @Test + public void testPutIncrementsVersionZZZ() throws Exception { + Store store = getZonedStore(); + VectorClock clock = new VectorClock(); + VectorClock copy = clock.clone(); + store.put(aKey, new Versioned(getValue(), clock), aTransform); + List> found = store.get(aKey, aTransform); + assertEquals("Invalid number of items found.", 1, found.size()); + assertEquals("Version not incremented properly", + Occurred.BEFORE, + copy.compare(found.get(0).getVersion())); + } + @Test public void testObsoleteMasterFails() { // write me @@ -752,16 +1112,30 @@ public void testGetVersions2() throws Exception { assertEquals(0, store.getVersions(keys.get(1)).size()); } - /** - * Tests that getAll works correctly with a node down in a two node cluster. - */ @Test - public void testGetAllWithNodeDown() throws Exception { - cluster = VoldemortTestConstants.getTwoNodeCluster(); + public void testGetVersions2ZZZ() throws Exception { + List keys = getKeys(2); + ByteArray key = keys.get(0); + byte[] value = getValue(); + Store store = getZonedStore(); + store.put(key, Versioned.value(value), null); + List> versioneds = store.get(key, null); + List versions = store.getVersions(key); + assertEquals(1, versioneds.size()); + assertEquals(9, versions.size()); + for(int i = 0; i < versions.size(); i++) + assertEquals(versioneds.get(0).getVersion(), versions.get(i)); - RoutedStore routedStore = getStore(cluster, 1, 2, 1, 0); - Store store = new InconsistencyResolvingStore(routedStore, - new VectorClockInconsistencyResolver()); + assertEquals(0, store.getVersions(keys.get(1)).size()); + } + + /** + * Util function to test getAll with one node down + * + * @param store The Routed store object used to perform the put and getall + * @throws Exception + */ + private void 
getAllWithNodeDown(Store store) throws Exception { Map expectedValues = Maps.newHashMap(); for(byte i = 1; i < 11; ++i) { @@ -781,6 +1155,50 @@ public void testGetAllWithNodeDown() throws Exception { } } + /** + * Tests that getAll works correctly with a node down in a two node cluster. + */ + @Test + public void testGetAllWithNodeDown() throws Exception { + cluster = VoldemortTestConstants.getTwoNodeCluster(); + + RoutedStore routedStore = getStore(cluster, 1, 2, 1, 0); + Store store = new InconsistencyResolvingStore(routedStore, + new VectorClockInconsistencyResolver()); + getAllWithNodeDown(store); + } + + /** + * Tests that getAll works correctly with a node down in a three node three + * zone cluster. + */ + @Test + public void testGetAllWithNodeDownZZZ() throws Exception { + cluster = VoldemortTestConstants.getThreeNodeClusterWith3Zones(); + + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // PR = RR = 2 + // PW = RW = 3 + // Zone Reads = 0 + // Zone Writes = 0 + // Threads = 1 + RoutedStore routedStore = getStore(cluster, + cluster.getNumberOfNodes() - 1, + cluster.getNumberOfNodes(), + 0, + 0, + 1, + zoneReplicationFactor); + + Store store = new InconsistencyResolvingStore(routedStore, + new VectorClockInconsistencyResolver()); + getAllWithNodeDown(store); + } + /** * Tests that getAll returns partial results */ @@ -850,6 +1268,101 @@ public void testPartialGetAll() throws Exception { } } + /** + * Tests that getAll returns partial results in a 3 zone cluster (with a + * node down). 
+ */ + @Test + public void testPartialGetAllZZZ() throws Exception { + + // Set replication factors for a 3 zone cluster + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, 1); + zoneReplicationFactor.put(1, 1); + zoneReplicationFactor.put(2, 1); + + // Create a store with RF=3, Required reads = 3 and zone count reads = 2 + // This ensures that a GET operation requires a response from all 3 + // nodes (from the respective 3 zones) + StoreDefinition definition = new StoreDefinitionBuilder().setName("test") + .setType("foo") + .setKeySerializer(new SerializerDefinition("test")) + .setValueSerializer(new SerializerDefinition("test")) + .setRoutingPolicy(RoutingTier.CLIENT) + .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY) + .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY) + .setReplicationFactor(3) + .setPreferredReads(3) + .setRequiredReads(3) + .setPreferredWrites(1) + .setRequiredWrites(1) + .setZoneCountReads(2) + .setZoneCountWrites(1) + .setZoneReplicationFactor(zoneReplicationFactor) + .build(); + + Map> stores = new HashMap>(); + List nodes = new ArrayList(); + // create nodes with varying speeds - 100ms, 200ms, 300ms + for(int i = 0; i < 3; i++) { + Store store = new SleepyStore(100 * (i + 1), + new InMemoryStorageEngine("test")); + stores.put(i, store); + List partitions = Arrays.asList(i); + + // Create zoned nodes - one in each zone (0, 1, 2) + nodes.add(new Node(i, "none", 0, 0, 0, i, partitions)); + } + setFailureDetector(stores); + + routedStoreThreadPool = Executors.newFixedThreadPool(3); + + TimeoutConfig timeoutConfig = new TimeoutConfig(1500, true); + // This means, the getall will only succeed on two of the nodes + timeoutConfig.setOperationTimeout(VoldemortOpCode.GET_ALL_OP_CODE, 250); + RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, + routedStoreThreadPool, + timeoutConfig); + + List zones = Lists.newArrayList(); + + for(int i = 0; i < 3; i++) { + LinkedList 
zoneProximityList = Lists.newLinkedList(); + Set zoneIds = Sets.newHashSet(0, 1, 2); + zoneIds.remove(i); + zoneProximityList.addAll(zoneIds); + zones.add(new Zone(i, zoneProximityList)); + } + + RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes, zones), + definition, + stores, + true, + failureDetector); + /* do some puts so we have some data to test getalls */ + Map expectedValues = Maps.newHashMap(); + for(byte i = 1; i < 11; ++i) { + ByteArray key = new ByteArray(new byte[] { i }); + byte[] value = new byte[] { (byte) (i + 50) }; + routedStore.put(key, Versioned.value(value), null); + expectedValues.put(key, value); + } + + /* 1. positive test; if partial is on, should get something back */ + Map>> all = routedStore.getAll(expectedValues.keySet(), + null); + assert (expectedValues.size() > all.size()); + + /* 2. negative test; if partial is off, should fail the whole operation */ + timeoutConfig.setPartialGetAllAllowed(false); + try { + all = routedStore.getAll(expectedValues.keySet(), null); + fail("Should have failed"); + } catch(Exception e) { + + } + } + @Test public void testGetAllWithFailingStore() throws Exception { cluster = VoldemortTestConstants.getTwoNodeCluster(); @@ -901,6 +1414,78 @@ public void testGetAllWithFailingStore() throws Exception { } } + /** + * Test to ensure that get all works in a 3 zone cluster with 2 nodes per + * zone and 1 node down in each zone. 
+ */ + @Test + public void testGetAllWithFailingStoreZZZ() throws Exception { + cluster = VoldemortTestConstants.getSixNodeClusterWith3Zones(); + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(0)); + + // PR = RR = 3 + // PW = RW = 6 + // Zone Reads = 2 + // Zone Writes = 2 + StoreDefinition storeDef = ServerTestUtils.getStoreDef("test", + 3, + 3, + 6, + 6, + 2, + 2, + zoneReplicationFactor, + HintedHandoffStrategyType.PROXIMITY_STRATEGY, + RoutingStrategyType.ZONE_STRATEGY); + + Map> subStores = Maps.newHashMap(); + + for(int id = 0; id < 6; id++) { + // Mark all the even nodes as normal and odd as read-failing + // This ensures that one node in each zone is read-failing and the + // other is normal + if(id % 2 == 0) { + subStores.put(id, new InMemoryStorageEngine("test")); + } else { + subStores.put(id, new FailingReadsStore("test")); + } + } + + setFailureDetector(subStores); + routedStoreThreadPool = Executors.newFixedThreadPool(1); + RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, + routedStoreThreadPool, + new TimeoutConfig(BANNAGE_PERIOD, + false)); + + RoutedStore routedStore = routedStoreFactory.create(cluster, + storeDef, + subStores, + true, + failureDetector); + + Store store = new InconsistencyResolvingStore(routedStore, + new VectorClockInconsistencyResolver()); + + Map expectedValues = Maps.newHashMap(); + for(byte i = 1; i < 11; ++i) { + ByteArray key = new ByteArray(new byte[] { i }); + byte[] value = new byte[] { (byte) (i + 50) }; + store.put(key, Versioned.value(value), null); + expectedValues.put(key, value); + } + + Map>> all = store.getAll(expectedValues.keySet(), null); + assertEquals(expectedValues.size(), all.size()); + for(Map.Entry>> mapEntry: all.entrySet()) { + byte[] value = expectedValues.get(mapEntry.getKey()); + 
assertEquals(new ByteArray(value), new ByteArray(mapEntry.getValue().get(0).getValue())); + } + } + /** * One node up, two preferred reads and one required read. See: * @@ -958,43 +1543,147 @@ public void testGetAllWithMorePreferredReadsThanNodes() throws Exception { public void testReadRepairWithFailures() throws Exception { cluster = getNineNodeCluster(); - RoutedStore routedStore = getStore(cluster, - cluster.getNumberOfNodes() - 1, - cluster.getNumberOfNodes() - 1, - 1, - 0); - // Disable node 1 so that the first put also goes to the last node - recordException(failureDetector, Iterables.get(cluster.getNodes(), 1)); + RoutedStore routedStore = getStore(cluster, 2, 2, 1, 0); + StoreRoutingPlan routingPlan = new StoreRoutingPlan(cluster, this.storeDef); + List replicatingNodes = routingPlan.getReplicationNodeList(aKey.get()); + // This is node 1 + Node primaryNode = Iterables.get(cluster.getNodes(), replicatingNodes.get(0)); + // This is node 6 + Node secondaryNode = Iterables.get(cluster.getNodes(), replicatingNodes.get(1)); + + // Disable primary node so that the first put happens with 6 as the + // pseudo master + recordException(failureDetector, primaryNode); Store store = new InconsistencyResolvingStore(routedStore, new VectorClockInconsistencyResolver()); store.put(aKey, new Versioned(aValue), null); byte[] anotherValue = "john".getBytes(); - // Disable the last node and enable node 1 to prevent the last node from - // getting the new version - recordException(failureDetector, Iterables.getLast(cluster.getNodes())); - recordSuccess(failureDetector, Iterables.get(cluster.getNodes(), 1)); - VectorClock clock = getClock(1); + /* + * Disable the secondary node and enable primary node to prevent the + * secondary from getting the new version + */ + recordException(failureDetector, secondaryNode); + recordSuccess(failureDetector, primaryNode); + // Generate the clock based off secondary so that the resulting clock + // will be [1:1, 6:1] across the replicas, 
except for the secondary + // which will be [6:1] + VectorClock clock = getClock(6); store.put(aKey, new Versioned(anotherValue, clock), null); - // Enable last node and disable node 1, the following get should cause a - // read repair on the last node in the code path that is only executed - // if there are failures. - recordException(failureDetector, Iterables.get(cluster.getNodes(), 1)); - recordSuccess(failureDetector, Iterables.getLast(cluster.getNodes())); + // Enable secondary and disable primary, the following get should cause + // a read repair on the secondary in the code path that is only executed + // if there are failures. This should repair the secondary with the + // superseding clock [1:1,6:1] + recordException(failureDetector, primaryNode); + recordSuccess(failureDetector, secondaryNode); List> versioneds = store.get(aKey, null); assertEquals(1, versioneds.size()); assertEquals(new ByteArray(anotherValue), new ByteArray(versioneds.get(0).getValue())); // Read repairs are done asynchronously, so we sleep for a short period. // It may be a good idea to use a synchronous executor service. - Thread.sleep(100); - for(Store innerStore: routedStore.getInnerStores().values()) { - List> innerVersioneds = innerStore.get(aKey, null); + Thread.sleep(500); + for(Map.Entry> innerStoreEntry: routedStore.getInnerStores() + .entrySet()) { + // Only look at the nodes in the pref list + if(replicatingNodes.contains(innerStoreEntry.getKey())) { + List> innerVersioneds = innerStoreEntry.getValue() + .get(aKey, null); + assertEquals(1, innerVersioneds.size()); + assertEquals(new ByteArray(anotherValue), new ByteArray(innerVersioneds.get(0) + .getValue())); + } + } + } + + /** + * Test to ensure that read repair happens correctly across zones in case of + * inconsistent writes in a 3 zone cluster.
+ * + * @throws Exception + */ + @Test + public void testReadRepairWithFailuresZZZ() throws Exception { + cluster = VoldemortTestConstants.getSixNodeClusterWith3Zones(); + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(1)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(2)); + + // PR = RR = 6 + // PW = RW = 4 + // Zone Reads = # Zones - 1 + // Zone Writes = 1 + // Threads = 1 + RoutedStore routedStore = getStore(cluster, + 6, + 4, + cluster.getNumberOfZones() - 1, + 1, + 1, + zoneReplicationFactor); + + Store store = new InconsistencyResolvingStore(routedStore, + new VectorClockInconsistencyResolver()); + + StoreRoutingPlan routingPlan = new StoreRoutingPlan(cluster, this.storeDef); + List replicatingNodes = routingPlan.getReplicationNodeList(aKey.get()); + + try { + // Do the initial put with all nodes up + store.put(aKey, new Versioned(aValue), null); + + List initialVersions = store.getVersions(aKey); + assertEquals(6, initialVersions.size()); + + Version mainVersion = initialVersions.get(0); + for(int i = 1; i < initialVersions.size(); i++) { + assertEquals(mainVersion, initialVersions.get(i)); + } + + // Do another put with all nodes in the zone 0 marked as + // unavailable. This will force the put to use a different pseudo + // master than before. + byte[] anotherValue = "john".getBytes(); + + // In this cluster, nodes 0 and 1 are in Zone 0. Mark them + // unavailable + recordException(failureDetector, cluster.getNodeById(0)); + recordException(failureDetector, cluster.getNodeById(1)); + Version newVersion = ((VectorClock) mainVersion).clone(); + store.put(aKey, new Versioned(anotherValue, newVersion), null); + + waitForOperationToComplete(500); + + // Mark the nodes in Zone 0 as available and do a get.
The Required + // reads = 6 and Zone count reads = 2 will force the client to read + // from all the zones and do the essential read repairs. + recordSuccess(failureDetector, cluster.getNodeById(0)); + recordSuccess(failureDetector, cluster.getNodeById(1)); + List> versioneds = store.get(aKey, null); assertEquals(1, versioneds.size()); - assertEquals(new ByteArray(anotherValue), new ByteArray(innerVersioneds.get(0) - .getValue())); + assertEquals(new ByteArray(anotherValue), new ByteArray(versioneds.get(0).getValue())); + + // Read repairs are done asynchronously, so we sleep for a short + // period. It may be a good idea to use a synchronous executor + // service. + Thread.sleep(500); + for(Map.Entry> innerStoreEntry: routedStore.getInnerStores() + .entrySet()) { + // Only look at the nodes in the pref list + if(replicatingNodes.contains(innerStoreEntry.getKey())) { + List> innerVersioneds = innerStoreEntry.getValue().get(aKey, + null); + assertEquals(1, innerVersioneds.size()); + assertEquals(new ByteArray(anotherValue), + new ByteArray(innerVersioneds.get(0).getValue())); + } + } + + } catch(VoldemortException ve) { + fail("Unexpected error occurred : " + ve); } } @@ -1052,6 +1741,55 @@ public void testPutWithOneNodeDownAndOneNodeSlow() throws Exception { store.put(aKey, new Versioned(aValue), aTransform); } + /** + * See issue #134: RoutedStore put() doesn't wait for enough attempts to + * succeed + * + * This issue would only happen with one node down and another that was slow + * to respond in a 3 zone cluster.
+ */ + @Test + public void testPutWithOneNodeDownAndOneNodeSlowZZZ() throws Exception { + cluster = VoldemortTestConstants.getSixNodeClusterWith3Zones(); + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, cluster.getNumberOfNodesInZone(0)); + zoneReplicationFactor.put(1, cluster.getNumberOfNodesInZone(1)); + zoneReplicationFactor.put(2, cluster.getNumberOfNodesInZone(2)); + + // The replication set for aKey is [1, 2, 0, 3, 5, 4] + // As per problem statement, set node 2 as the failing node and node 0 + // as the sleepy node + + // PR = RR = 4 + // PW = RW = 4 + // Zone Reads = 0 + // Zone Writes = 0 + // Failing nodes = Node 2 + // Sleepy node = Node 0 + // Threads = 4 + RoutedStore routedStore = getStore(cluster, + 4, + 4, + 0, + 0, + 4, + Sets.newHashSet(2), + Sets.newHashSet(0), + zoneReplicationFactor, + RoutingStrategyType.ZONE_STRATEGY, + 0, + BANNAGE_PERIOD, + new VoldemortException()); + Store store = new InconsistencyResolvingStore(routedStore, + new VectorClockInconsistencyResolver()); + + try { + store.put(aKey, new Versioned(aValue), aTransform); + } catch(VoldemortException ve) { + fail("Unknown exception occurred : " + ve); + } + } + @Test public void testPutTimeout() throws Exception { int timeout = 50; @@ -1158,6 +1896,94 @@ public void testGetTimeout() throws Exception { } } + @Test + public void testGetAndPutTimeoutZZZ() throws Exception { + int timeout = 50; + + // Set replication factors for a 3 zone cluster + HashMap zoneReplicationFactor = Maps.newHashMap(); + zoneReplicationFactor.put(0, 1); + zoneReplicationFactor.put(1, 1); + zoneReplicationFactor.put(2, 1); + + // Create a store with RF=3, Required reads = 3 and zone count reads = 2 + // This ensures that a GET operation requires a response from all 3 + // nodes (from the respective 3 zones) + StoreDefinition definition = new StoreDefinitionBuilder().setName("test") + .setType("foo") + .setKeySerializer(new SerializerDefinition("test")) +
.setValueSerializer(new SerializerDefinition("test")) + .setRoutingPolicy(RoutingTier.CLIENT) + .setRoutingStrategyType(RoutingStrategyType.ZONE_STRATEGY) + .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY) + .setReplicationFactor(3) + .setPreferredReads(3) + .setRequiredReads(3) + .setPreferredWrites(3) + .setRequiredWrites(3) + .setZoneCountReads(2) + .setZoneCountWrites(2) + .setZoneReplicationFactor(zoneReplicationFactor) + .build(); + + Map> stores = new HashMap>(); + List nodes = new ArrayList(); + int totalDelay = 0; + for(int i = 0; i < 3; i++) { + int delay = 4 + i * timeout; + totalDelay += delay; + Store store = new SleepyStore(delay, + new InMemoryStorageEngine("test")); + stores.put(i, store); + List partitions = Arrays.asList(i); + nodes.add(new Node(i, "none", 0, 0, 0, i, partitions)); + } + + setFailureDetector(stores); + + routedStoreThreadPool = Executors.newFixedThreadPool(3); + RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(true, + routedStoreThreadPool, + new TimeoutConfig(timeout, + false)); + + List zones = Lists.newArrayList(); + + for(int i = 0; i < 3; i++) { + LinkedList zoneProximityList = Lists.newLinkedList(); + Set zoneIds = Sets.newHashSet(0, 1, 2); + zoneIds.remove(i); + zoneProximityList.addAll(zoneIds); + zones.add(new Zone(i, zoneProximityList)); + } + + RoutedStore routedStore = routedStoreFactory.create(new Cluster("test", nodes, zones), + definition, + stores, + true, + failureDetector); + + long start = System.nanoTime(); + try { + routedStore.get(new ByteArray("test".getBytes()), null); + fail("Should have thrown"); + } catch(InsufficientOperationalNodesException e) { + long elapsed = (System.nanoTime() - start) / Time.NS_PER_MS; + assertTrue(elapsed + " < " + totalDelay, elapsed < totalDelay); + } + + start = System.nanoTime(); + try { + routedStore.put(new ByteArray("test".getBytes()), + new Versioned(new byte[] { 1 }), + null); + fail("Should have thrown"); + } 
catch(InsufficientOperationalNodesException e) { + long elapsed = (System.nanoTime() - start) / Time.NS_PER_MS; + assertTrue(elapsed + " < " + totalDelay, elapsed < totalDelay); + } + } + @Test public void testOperationSpecificTimeouts() throws Exception { StoreDefinition definition = new StoreDefinitionBuilder().setName("test") @@ -1478,6 +2304,8 @@ private void setFailureDetector(Map> s if(failureDetector != null) failureDetector.destroy(); + // Bannage is not supported/recommended anymore. But makes sense for the + // purpose of this test. FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig().setImplementationClassName(failureDetectorClass.getName()) .setBannagePeriod(BANNAGE_PERIOD) .setCluster(cluster)