From 18c2931aff0ea65cc582d9df3bbd3e5a6768a45e Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Thu, 11 Apr 2013 18:20:57 -0700 Subject: [PATCH] Adding another Hinted handoff failure test to ensure main thread returns with failure when all replicas dont respond --- .../routed/HintedHandoffFailureTest.java | 252 +++++++++++++----- 1 file changed, 187 insertions(+), 65 deletions(-) diff --git a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java index bc3458e533..28629b6f77 100644 --- a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java +++ b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java @@ -100,7 +100,6 @@ public class HintedHandoffFailureTest { private static int R_WRITES = 1; private static final int NUM_THREADS = 3; private static final int NUM_NODES_TOTAL = 3; - private static int FAILED_NODE_ID = 0; private final String STORE_NAME = "test"; private Cluster cluster; @@ -111,8 +110,8 @@ public class HintedHandoffFailureTest { private RoutedStore store; private RoutingStrategy strategy; - private final static long routingTimeoutInMs = 1000; - private final static long sleepBeforeFailingInMs = 2000; + private static long routingTimeoutInMs = 1000; + private static long sleepBeforeFailingInMs = 2000; private static long delayBeforeHintedHandoff = 3000; private final Map> subStores = new ConcurrentHashMap>(); @@ -121,6 +120,11 @@ public class HintedHandoffFailureTest { private final Logger logger = Logger.getLogger(getClass()); + private enum FAILURE_MODE { + FAIL_FIRST_REPLICA_NODE, + FAIL_ALL_REPLICAS + } + private StoreDefinition getStoreDef(String storeName, int replicationFactor, int preads, @@ -158,22 +162,37 @@ private void setFailureDetector(Map> s failureDetector = FailureDetectorUtils.create(failureDetectorConfig, false); } + /** + * A wrapper for the actual customSetup method with the default failure mode + * as FAIL_FIRST_REPLICA_NODE + * + * @param key The ByteArray representation of the key + * + * @throws Exception + */ + public List customSetup(ByteArray key) throws Exception { + return customSetup(key, FAILURE_MODE.FAIL_FIRST_REPLICA_NODE); + } + /** * Setup a cluster with 3 nodes, with the following characteristics: * - * - 1st replica node: Sleepy force failing store (will throw an exception - * after a delay) + * If FAILURE_MODE is FAIL_FIRST_REPLICA_NODE set the first replica store to + * a sleepy force failing store * - * - Pseudo master and other replicas: Standard In-memory store (wrapped by - * Logging store) + * If FAILURE_MODE is FAIL_ALL_REPLICAS: set all replicas to sleepy force + * failing store * - * - In memory slop stores + * Pseudo master : Standard In-memory store (wrapped by Logging store) + * + * In memory slop stores * * @param key The ByteArray representation of the key + * @param failureMode The Failure mode for the replicas * * @throws Exception */ - public void customSetup(ByteArray key) throws Exception { + public List customSetup(ByteArray key, FAILURE_MODE failureMode) throws Exception { cluster = getThreeNodeCluster(); storeDef = getStoreDef(STORE_NAME, @@ -196,14 +215,24 @@ public void customSetup(ByteArray key) throws Exception { failureStore); failureStore.setFail(true); - // Get the first replica node for the given key - // This will act as the sleepy failing node - Node failingNode = strategy.routeRequest(key.get()).get(1); - FAILED_NODE_ID = failingNode.getId(); + List failingNodeIdList = Lists.newArrayList(); + List replicaList = strategy.routeRequest(key.get()); + + switch(failureMode) { + case FAIL_FIRST_REPLICA_NODE: + failingNodeIdList.add(replicaList.get(1).getId()); + break; + + case FAIL_ALL_REPLICAS: + for(int nodeId = 1; nodeId < replicaList.size(); nodeId++) { + failingNodeIdList.add(nodeId); + } + break; + } subStores.clear(); for(int i = 0; i < NUM_NODES_TOTAL; i++) { - if(i == FAILED_NODE_ID) { + if(failingNodeIdList.contains(i)) { subStores.put(i, sleepyFailureStore); } else { subStores.put(i, loggingStore); @@ -262,6 +291,7 @@ public void customSetup(ByteArray key) throws Exception { cluster, storeDef, failureDetector); + return failingNodeIdList; } @After @@ -282,15 +312,15 @@ public void tearDown() throws Exception { * @param failedKeys Set of keys that the put should've failed for * @return Set of slop keys based on the failed keys and the FAILED_NODE_ID */ - private Set makeSlopKeys(Set failedKeys) { + private Set makeSlopKeys(ByteArray failedKey, List failingNodeIdList) { Set slopKeys = Sets.newHashSet(); - for(ByteArray failedKey: failedKeys) { + for(int failingNodeId: failingNodeIdList) { byte[] opCode = new byte[] { Slop.Operation.PUT.getOpCode() }; byte[] spacer = new byte[] { (byte) 0 }; byte[] storeNameBytes = ByteUtils.getBytes(STORE_NAME, "UTF-8"); byte[] nodeIdBytes = new byte[ByteUtils.SIZE_OF_INT]; - ByteUtils.writeInt(nodeIdBytes, FAILED_NODE_ID, 0); + ByteUtils.writeInt(nodeIdBytes, failingNodeId, 0); ByteArray slopKey = new ByteArray(ByteUtils.cat(opCode, spacer, storeNameBytes, @@ -303,6 +333,25 @@ private Set makeSlopKeys(Set failedKeys) { return slopKeys; } + /** + * A function to fetch all the registered slops + * + * @param slopKeys Keys for the registered slops in the slop store + * @return Set of all the registered Slops + */ + public Set getAllSlops(Iterable slopKeys) { + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + logger.info(slop); + } + } + return registeredSlops; + } + /** * Test to ensure that when an asynchronous put completes (with a failure) * after PerformParallelPut has finished processing the responses and before @@ -318,6 +367,7 @@ public void testSlopOnDelayedFailingAsyncPut_2_1_1() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 2; @@ -327,8 +377,9 @@ public void testSlopOnDelayedFailingAsyncPut_2_1_1() { R_WRITES = 1; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -337,17 +388,8 @@ public void testSlopOnDelayedFailingAsyncPut_2_1_1() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -371,6 +413,7 @@ public void testSlopOnDelayedFailingAsyncPut_3_2_2() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 3; @@ -380,8 +423,9 @@ public void testSlopOnDelayedFailingAsyncPut_3_2_2() { R_WRITES = 2; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -390,17 +434,8 @@ public void testSlopOnDelayedFailingAsyncPut_3_2_2() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -423,6 +458,7 @@ public void testSlopViaSerialHint_2_1_1() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 2; @@ -432,8 +468,9 @@ public void testSlopViaSerialHint_2_1_1() { R_WRITES = 1; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -453,17 +490,8 @@ public void testSlopViaSerialHint_2_1_1() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -486,6 +514,7 @@ public void testSlopViaSerialHint_3_2_2() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 3; @@ -495,8 +524,9 @@ public void testSlopViaSerialHint_3_2_2() { R_WRITES = 2; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -516,17 +546,8 @@ public void testSlopViaSerialHint_3_2_2() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -535,6 +556,107 @@ public void testSlopViaSerialHint_3_2_2() { } } + /** + * Test to do a put with a 3-2-2 config such that both the replica nodes do + * not respond at all. This test is to make sure that the main thread + * returns with an error and that no slops are registered. + */ + @Test + public void testNoSlopsOnAllReplicaFailures() { + + String key = "a"; + String val = "xyz"; + final Versioned versionedVal = new Versioned(val.getBytes()); + final ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; + + // Set the correct replication config + REPLICATION_FACTOR = 3; + R_READS = 2; + R_WRITES = 2; + + // Large sleep time for the replica nodes + sleepBeforeFailingInMs = 10000; + + // 0 artificial delay for the put pipeline + delayBeforeHintedHandoff = 0; + + try { + failingNodeIdList = customSetup(keyByteArray, FAILURE_MODE.FAIL_ALL_REPLICAS); + } catch(Exception e) { + logger.info(e.getMessage()); + fail("Error in setup."); + } + + PerformAsyncPut asyncPutThread = new PerformAsyncPut(this.store, keyByteArray, versionedVal); + Executors.newFixedThreadPool(1).submit(asyncPutThread); + + // Sleep for the routing timeout with some headroom + try { + logger.info("Sleeping for " + (routingTimeoutInMs + 2000) / 1000 + + " seconds to wait for the put to finish"); + Thread.sleep(routingTimeoutInMs + 2000); + + if(!asyncPutThread.isDone) { + fail("The main thread for put did not finish."); + } + } catch(Exception e) { + fail("Unknown error while doing a put: " + e); + } + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); + + if(registeredSlops.size() != 0) { + fail("Should not have seen any slops."); + } + } + + /** + * A runnable class to do a Voldemort Put operation. This becomes important + * in the scenario that the put operation might hang / deadlock. + * + */ + private class PerformAsyncPut implements Runnable { + + private Versioned versionedVal = null; + private ByteArray keyByteArray = null; + private RoutedStore asyncPutStore = null; + private boolean isDone = false; + + public PerformAsyncPut(RoutedStore asyncPutStore, + ByteArray keyByteArray, + Versioned versionedVal) { + this.asyncPutStore = asyncPutStore; + this.keyByteArray = keyByteArray; + this.versionedVal = versionedVal; + } + + @Override + public void run() { + try { + asyncPutStore.put(keyByteArray, versionedVal, null); + fail("A put with required writes 2 should've failed for this setup"); + } catch(Exception ve) { + // This is expected. Nothing to do. + logger.info("Error occured as expected : " + ve.getMessage()); + } + markAsDone(true); + } + + @SuppressWarnings("unused") + public boolean isDone() { + return isDone; + } + + public void markAsDone(boolean isDone) { + this.isDone = isDone; + } + } + /** * An action within a pipeline which sleeps for the specified time duration. *