From ceb7de8bfe09f287bb60975723d8079e301c0991 Mon Sep 17 00:00:00 2001 From: Zhongjie Wu Date: Wed, 3 Apr 2013 15:35:57 -0700 Subject: [PATCH 01/10] make run-class.sh working in Mac per ctasada https://github.com/voldemort/voldemort/issues/132 --- bin/run-class.sh | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/bin/run-class.sh b/bin/run-class.sh index 465637dd4a..5e7f801429 100755 --- a/bin/run-class.sh +++ b/bin/run-class.sh @@ -21,8 +21,14 @@ if [ $# -lt 1 ]; then exit 1 fi -script_path=$(readlink -f "$0") -script_dir=`dirname "$script_path"` +if [ $(uname) == 'Darwin' ]; then + pushd `dirname $0` > /dev/null + script_dir=$(pwd) + popd > /dev/null +else + script_path=$(readlink -f "$0") + script_dir=`dirname "$script_path"` +fi base_dir=`dirname "$script_dir"` From e9f07768d243de96574b7d31d4409ee8b9d17ee0 Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Wed, 3 Apr 2013 17:19:02 -0700 Subject: [PATCH 02/10] Added unit tests to ensure that slops are registered for different asynchronous put operation failures --- .../store/routed/PipelineRoutedStore.java | 10 +- .../voldemort/store/SleepyForceFailStore.java | 60 ++ .../routed/HintedHandoffFailureTest.java | 535 ++++++++++++++++++ 3 files changed, 600 insertions(+), 5 deletions(-) create mode 100644 test/common/voldemort/store/SleepyForceFailStore.java create mode 100644 test/unit/voldemort/store/routed/HintedHandoffFailureTest.java diff --git a/src/java/voldemort/store/routed/PipelineRoutedStore.java b/src/java/voldemort/store/routed/PipelineRoutedStore.java index 11b794aab5..d0181c6dc4 100644 --- a/src/java/voldemort/store/routed/PipelineRoutedStore.java +++ b/src/java/voldemort/store/routed/PipelineRoutedStore.java @@ -29,11 +29,11 @@ import voldemort.cluster.failuredetector.FailureDetector; import voldemort.common.VoldemortOpCode; import voldemort.routing.RoutingStrategyType; +import voldemort.store.CompositeVoldemortRequest; import voldemort.store.Store; import voldemort.store.StoreDefinition; import voldemort.store.StoreRequest; import voldemort.store.StoreUtils; -import voldemort.store.CompositeVoldemortRequest; import voldemort.store.nonblockingstore.NonblockingStore; import voldemort.store.routed.Pipeline.Event; import voldemort.store.routed.Pipeline.Operation; @@ -75,10 +75,10 @@ */ public class PipelineRoutedStore extends RoutedStore { - private final Map nonblockingStores; - private final Map> slopStores; - private final Map nonblockingSlopStores; - private final HintedHandoffStrategy handoffStrategy; + protected final Map nonblockingStores; + protected final Map> slopStores; + protected final Map nonblockingSlopStores; + protected final HintedHandoffStrategy handoffStrategy; private Zone clientZone; private boolean zoneRoutingEnabled; private PipelineRoutedStats stats; diff --git a/test/common/voldemort/store/SleepyForceFailStore.java b/test/common/voldemort/store/SleepyForceFailStore.java new file mode 100644 index 0000000000..81e9955bbf --- /dev/null +++ b/test/common/voldemort/store/SleepyForceFailStore.java @@ -0,0 +1,60 @@ +package voldemort.store; + +import java.util.List; +import java.util.Map; + +import voldemort.VoldemortException; +import voldemort.versioning.Version; +import voldemort.versioning.Versioned; + +public class SleepyForceFailStore extends ForceFailStore { + + private long sleepTimeMs; + + public SleepyForceFailStore(Store innerStore, VoldemortException e, long sleepTimeInMs) { + super(innerStore, e); + this.sleepTimeMs = sleepTimeInMs; + } + + @Override + public boolean delete(K key, Version version) throws VoldemortException { + try { + Thread.sleep(sleepTimeMs); + return super.delete(key, version); + } catch(InterruptedException e) { + throw new VoldemortException(e); + } + } + + @Override + public List> get(K key, T transforms) throws VoldemortException { + try { + Thread.sleep(sleepTimeMs); + return super.get(key, transforms); + } catch(InterruptedException e) { + throw new VoldemortException(e); + } + } + + @Override + public Map>> getAll(Iterable keys, Map transforms) + throws VoldemortException { + try { + Thread.sleep(sleepTimeMs); + return super.getAll(keys, transforms); + } catch(InterruptedException e) { + throw new VoldemortException(e); + } + } + + @Override + public void put(K key, Versioned value, T transforms) throws VoldemortException { + try { + Thread.sleep(sleepTimeMs); + super.put(key, value, transforms); + } catch(InterruptedException e) { + throw new VoldemortException(e); + } + } + +} diff --git a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java new file mode 100644 index 0000000000..bbeb4927e7 --- /dev/null +++ b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java @@ -0,0 +1,535 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.store.routed; + +import static org.junit.Assert.fail; +import static voldemort.VoldemortTestConstants.getTwoNodeCluster; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import voldemort.ServerTestUtils; +import voldemort.TestUtils; +import voldemort.VoldemortException; +import voldemort.client.RoutingTier; +import voldemort.client.TimeoutConfig; +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.cluster.Zone; +import voldemort.cluster.failuredetector.FailureDetector; +import voldemort.cluster.failuredetector.FailureDetectorConfig; +import voldemort.cluster.failuredetector.FailureDetectorUtils; +import voldemort.cluster.failuredetector.MutableStoreVerifier; +import voldemort.cluster.failuredetector.ThresholdFailureDetector; +import voldemort.routing.RoutingStrategyFactory; +import voldemort.routing.RoutingStrategyType; +import voldemort.serialization.SerializerDefinition; +import voldemort.server.StoreRepository; +import voldemort.server.scheduler.slop.StreamingSlopPusherJob; +import voldemort.server.storage.ScanPermitWrapper; +import voldemort.store.SleepyForceFailStore; +import voldemort.store.StorageEngine; +import voldemort.store.Store; +import voldemort.store.StoreDefinition; +import voldemort.store.StoreDefinitionBuilder; +import voldemort.store.UnreachableStoreException; +import voldemort.store.logging.LoggingStore; +import voldemort.store.memory.InMemoryStorageConfiguration; +import voldemort.store.memory.InMemoryStorageEngine; +import voldemort.store.metadata.MetadataStore; +import voldemort.store.nonblockingstore.NonblockingStore; +import voldemort.store.routed.Pipeline.Event; +import voldemort.store.routed.Pipeline.Operation; +import voldemort.store.routed.action.AbstractAction; +import voldemort.store.routed.action.AbstractConfigureNodes; +import voldemort.store.routed.action.ConfigureNodesDefault; +import voldemort.store.routed.action.IncrementClock; +import voldemort.store.routed.action.PerformParallelPutRequests; +import voldemort.store.routed.action.PerformPutHintedHandoff; +import voldemort.store.routed.action.PerformSerialPutRequests; +import voldemort.store.slop.HintedHandoff; +import voldemort.store.slop.Slop; +import voldemort.store.slop.SlopStorageEngine; +import voldemort.store.slop.strategy.HintedHandoffStrategyType; +import voldemort.utils.ByteArray; +import voldemort.utils.ByteUtils; +import voldemort.versioning.Versioned; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Test class to ensure that slops are registered for different asynchronous put + * failures. + */ +public class HintedHandoffFailureTest { + + private final static String STORE_NAME = "test"; + private final static String SLOP_STORE_NAME = "slop"; + private final static int REPLICATION_FACTOR = 2; + private final static int P_READS = 1; + private final static int R_READS = 1; + private final static int P_WRITES = 2; + private final static int R_WRITES = 1; + private static final int NUM_THREADS = 3; + private static final int NUM_NODES_TOTAL = 2; + private static final int FAILED_NODE_ID = 0; + + private Cluster cluster; + private FailureDetector failureDetector; + private StoreDefinition storeDef; + private ExecutorService routedStoreThreadPool; + private RoutedStoreFactory routedStoreFactory; + private RoutedStore store; + + private final static long routingTimeoutInMs = 1000; + private final static long sleepBeforeFailingInMs = 2000; + private static long delayBeforeHintedHandoff = 3000; + + private final Map> subStores = new ConcurrentHashMap>(); + private final Map> slopStores = new ConcurrentHashMap>(); + private final List slopPusherJobs = Lists.newLinkedList(); + + private StoreDefinition getStoreDef(String storeName, + int replicationFactor, + int preads, + int rreads, + int pwrites, + int rwrites, + String strategyType) { + SerializerDefinition serDef = new SerializerDefinition("string"); + return new StoreDefinitionBuilder().setName(storeName) + .setType(InMemoryStorageConfiguration.TYPE_NAME) + .setKeySerializer(serDef) + .setValueSerializer(serDef) + .setRoutingPolicy(RoutingTier.SERVER) + .setRoutingStrategyType(strategyType) + .setReplicationFactor(replicationFactor) + .setPreferredReads(preads) + .setRequiredReads(rreads) + .setPreferredWrites(pwrites) + .setRequiredWrites(rwrites) + .setHintedHandoffStrategy(HintedHandoffStrategyType.PROXIMITY_STRATEGY) + .build(); + } + + private void setFailureDetector(Map> subStores) + throws Exception { + if(failureDetector != null) + failureDetector.destroy(); + + // Using Threshold FD + FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig(); + failureDetectorConfig.setImplementationClassName(ThresholdFailureDetector.class.getName()); + failureDetectorConfig.setCluster(cluster); + failureDetectorConfig.setStoreVerifier(MutableStoreVerifier.create(subStores)); + + failureDetector = FailureDetectorUtils.create(failureDetectorConfig, false); + } + + /** + * Setup a cluster with 2 nodes, with the following characteristics: + * + * - Node 0: Sleepy force failing store (will throw an exception after a + * delay) + * + * - Node 1: Standard In-memory store (wrapped by Logging store) + * + * - In memory slop stores + * + * - A custom Put pipeline with a delay between parallel puts and doing the + * handoff + * + * @throws Exception + */ + @Before + public void setUp() throws Exception { + + cluster = getTwoNodeCluster(); + storeDef = getStoreDef(STORE_NAME, + REPLICATION_FACTOR, + P_READS, + R_READS, + P_WRITES, + R_WRITES, + RoutingStrategyType.CONSISTENT_STRATEGY); + + VoldemortException e = new UnreachableStoreException("Node down"); + + InMemoryStorageEngine inMemoryStorageEngine = new InMemoryStorageEngine(STORE_NAME); + LoggingStore loggingStore = new LoggingStore(inMemoryStorageEngine); + + // Set node 1 as a regular store + subStores.put(1, loggingStore); + + // Set node 0 as the force failing store + SleepyForceFailStore failureStore = new SleepyForceFailStore(loggingStore, + e, + sleepBeforeFailingInMs); + failureStore.setFail(true); + subStores.put(0, failureStore); + + setFailureDetector(subStores); + + routedStoreThreadPool = Executors.newFixedThreadPool(NUM_THREADS); + routedStoreFactory = new RoutedStoreFactory(true, + routedStoreThreadPool, + new TimeoutConfig(routingTimeoutInMs, false)); + new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster); + + Map nonblockingSlopStores = Maps.newHashMap(); + for(Node node: cluster.getNodes()) { + int nodeId = node.getId(); + StoreRepository storeRepo = new StoreRepository(); + storeRepo.addLocalStore(subStores.get(nodeId)); + + for(int i = 0; i < NUM_NODES_TOTAL; i++) + storeRepo.addNodeStore(i, subStores.get(i)); + + SlopStorageEngine slopStorageEngine = new SlopStorageEngine(new InMemoryStorageEngine(SLOP_STORE_NAME), + cluster); + StorageEngine storageEngine = slopStorageEngine.asSlopStore(); + storeRepo.setSlopStore(slopStorageEngine); + nonblockingSlopStores.put(nodeId, + routedStoreFactory.toNonblockingStore(slopStorageEngine)); + slopStores.put(nodeId, storageEngine); + + MetadataStore metadataStore = ServerTestUtils.createMetadataStore(cluster, + Lists.newArrayList(storeDef)); + StreamingSlopPusherJob pusher = new StreamingSlopPusherJob(storeRepo, + metadataStore, + failureDetector, + ServerTestUtils.createServerConfigWithDefs(false, + nodeId, + TestUtils.createTempDir() + .getAbsolutePath(), + cluster, + Lists.newArrayList(storeDef), + new Properties()), + new ScanPermitWrapper(1)); + slopPusherJobs.add(pusher); + } + + Map nonblockingStores = Maps.newHashMap(); + for(Map.Entry> entry: subStores.entrySet()) + nonblockingStores.put(entry.getKey(), + routedStoreFactory.toNonblockingStore(entry.getValue())); + + store = new DelayedPutPipelineRoutedStore(subStores, + nonblockingStores, + slopStores, + nonblockingSlopStores, + cluster, + storeDef, + failureDetector); + } + + @After + public void tearDown() throws Exception { + if(failureDetector != null) { + failureDetector.destroy(); + } + + if(routedStoreThreadPool != null) { + routedStoreThreadPool.shutdown(); + } + } + + /** + * Function to create a set of slop keys for the FAILED_NODE_ID for PUT + * operation + * + * @param failedKeys Set of keys that the put should've failed for + * @return Set of slop keys based on the failed keys and the FAILED_NODE_ID + */ + private Set makeSlopKeys(Set failedKeys) { + Set slopKeys = Sets.newHashSet(); + + for(ByteArray failedKey: failedKeys) { + byte[] opCode = new byte[] { Slop.Operation.PUT.getOpCode() }; + byte[] spacer = new byte[] { (byte) 0 }; + byte[] storeName = ByteUtils.getBytes(STORE_NAME, "UTF-8"); + byte[] nodeIdBytes = new byte[ByteUtils.SIZE_OF_INT]; + ByteUtils.writeInt(nodeIdBytes, FAILED_NODE_ID, 0); + ByteArray slopKey = new ByteArray(ByteUtils.cat(opCode, + spacer, + storeName, + spacer, + nodeIdBytes, + spacer, + failedKey.get())); + slopKeys.add(slopKey); + } + return slopKeys; + } + + /** + * Test to ensure that when an asynchronous put completes (with a failure) + * after PerformParallelPut has finished processing the responses and before + * the hinted handoff actually begins, a slop is still registered for the + * same. + */ + @Test + public void testSlopOnDelayedFailingAsyncPut() { + + // The following key will be routed to node 1 (pseudo master). We've set + // node 0 to be the sleepy failing node + String key = "a"; + String val = "xyz"; + Versioned versionedVal = new Versioned(val.getBytes()); + ByteArray keyByteArray = new ByteArray(key.getBytes()); + this.store.put(keyByteArray, versionedVal, null); + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(failedKeys); + + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + System.out.println(slop); + } + } + + if(registeredSlops.size() == 0) { + fail("Should have seen some slops. But could not find any."); + } + } + + /** + * Test to ensure that when an asynchronous put completes (with a failure) + * after the pipeline completes, a slop is still registered (via a serial + * hint). + */ + @Test + public void testSlopViaSerialHint() { + + // The following key will be routed to node 1 (pseudo master). We've set + // node 0 to be the sleepy failing node + String key = "a"; + String val = "xyz"; + Versioned versionedVal = new Versioned(val.getBytes()); + ByteArray keyByteArray = new ByteArray(key.getBytes()); + + // We remove the delay in the pipeline so that the pipeline will finish + // before the failing async put returns. At this point it should do a + // serial hint. + delayBeforeHintedHandoff = 0; + + this.store.put(keyByteArray, versionedVal, null); + + // Give enough time for the serial hint to work. + try { + System.out.println("Sleeping for 5 seconds to wait for the serial hint to finish"); + Thread.sleep(5000); + } catch(Exception e) {} + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(failedKeys); + + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + System.out.println(slop); + } + } + + if(registeredSlops.size() == 0) { + fail("Should have seen some slops. But could not find any."); + } + } + + /** + * An action within a pipeline which sleeps for the specified time duration. + * + */ + private class DelayAction extends AbstractAction { + + private long sleepTimeInMs; + + protected DelayAction(PutPipelineData pipelineData, Event completeEvent, long sleepTimeInMs) { + super(pipelineData, completeEvent); + this.sleepTimeInMs = sleepTimeInMs; + } + + @Override + public void execute(Pipeline pipeline) { + try { + System.out.println("Delayed pipeline action now sleeping for : " + sleepTimeInMs); + Thread.sleep(sleepTimeInMs); + System.out.println("Now moving on to doing actual hinted handoff. Current time = " + + new Date(System.currentTimeMillis())); + } catch(Exception e) {} + pipeline.addEvent(completeEvent); + } + + } + + /** + * A custom implementation of the PipelineRoutedStore with an extra (sleep) + * action between PerformParallelPutRequests stage and + * PerformPutHintedHandoff stage + * + */ + private class DelayedPutPipelineRoutedStore extends PipelineRoutedStore { + + public DelayedPutPipelineRoutedStore(Map> innerStores, + Map nonblockingStores, + Map> slopStores, + Map nonblockingSlopStores, + Cluster cluster, + StoreDefinition storeDef, + FailureDetector failureDetector) { + super(storeDef.getName(), + innerStores, + nonblockingStores, + slopStores, + nonblockingSlopStores, + cluster, + storeDef, + false, + Zone.DEFAULT_ZONE_ID, + new TimeoutConfig(routingTimeoutInMs, false), + failureDetector, + false, + 0); + + } + + /** + * A custom put implementation. Here we add an extra action to the + * pipeline to sleep before doing the actual handoff + */ + @Override + public void put(ByteArray key, Versioned versioned, byte[] transforms) + throws VoldemortException { + PutPipelineData pipelineData = new PutPipelineData(); + pipelineData.setZonesRequired(null); + pipelineData.setStartTimeNs(System.nanoTime()); + pipelineData.setStoreName(getName()); + + long putOpTimeoutInMs = routingTimeoutInMs; + Pipeline pipeline = new Pipeline(Operation.PUT, putOpTimeoutInMs, TimeUnit.MILLISECONDS); + pipeline.setEnableHintedHandoff(true); + HintedHandoff hintedHandoff = null; + + AbstractConfigureNodes configureNodes = new ConfigureNodesDefault(pipelineData, + Event.CONFIGURED, + failureDetector, + storeDef.getRequiredWrites(), + routingStrategy, + key); + + hintedHandoff = new HintedHandoff(failureDetector, + slopStores, + nonblockingSlopStores, + handoffStrategy, + pipelineData.getFailedNodes(), + putOpTimeoutInMs); + + pipeline.addEventAction(Event.STARTED, configureNodes); + + pipeline.addEventAction(Event.CONFIGURED, + new PerformSerialPutRequests(pipelineData, + isHintedHandoffEnabled() ? Event.RESPONSES_RECEIVED + : Event.COMPLETED, + key, + transforms, + failureDetector, + innerStores, + storeDef.getRequiredWrites(), + versioned, + time, + Event.MASTER_DETERMINED)); + pipeline.addEventAction(Event.MASTER_DETERMINED, + new PerformParallelPutRequests(pipelineData, + Event.RESPONSES_RECEIVED, + key, + transforms, + failureDetector, + storeDef.getPreferredWrites(), + storeDef.getRequiredWrites(), + putOpTimeoutInMs, + nonblockingStores, + hintedHandoff)); + + pipeline.addEventAction(Event.ABORTED, new PerformPutHintedHandoff(pipelineData, + Event.ERROR, + key, + versioned, + transforms, + hintedHandoff, + time)); + + // We use INSUFFICIENT_SUCCESSES as the next event (since there is + // no specific delay event) + pipeline.addEventAction(Event.RESPONSES_RECEIVED, + new DelayAction(pipelineData, + Event.INSUFFICIENT_SUCCESSES, + delayBeforeHintedHandoff)); + + pipeline.addEventAction(Event.INSUFFICIENT_SUCCESSES, + new PerformPutHintedHandoff(pipelineData, + Event.HANDOFF_FINISHED, + key, + versioned, + transforms, + hintedHandoff, + time)); + pipeline.addEventAction(Event.HANDOFF_FINISHED, new IncrementClock(pipelineData, + Event.COMPLETED, + versioned, + time)); + + pipeline.addEvent(Event.STARTED); + if(logger.isDebugEnabled()) { + logger.debug("Operation " + pipeline.getOperation().getSimpleName() + " Key " + + ByteUtils.toHexString(key.get())); + } + try { + pipeline.execute(); + } catch(VoldemortException e) { + throw e; + } + + if(pipelineData.getFatalError() != null) + throw pipelineData.getFatalError(); + } + } + +} From 7aa96cc6ecbefcc9fe02cbb6e76326ef02fe1169 Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Thu, 4 Apr 2013 18:52:58 -0700 Subject: [PATCH 03/10] Bug fixes to HintedHandoffFailureTest and added more tests to handle 3-2-2 config. Removed SleepyForceFailStore --- .../voldemort/store/SleepyForceFailStore.java | 60 ----- .../routed/HintedHandoffFailureTest.java | 251 ++++++++++++++---- 2 files changed, 206 insertions(+), 105 deletions(-) delete mode 100644 test/common/voldemort/store/SleepyForceFailStore.java diff --git a/test/common/voldemort/store/SleepyForceFailStore.java b/test/common/voldemort/store/SleepyForceFailStore.java deleted file mode 100644 index 81e9955bbf..0000000000 --- a/test/common/voldemort/store/SleepyForceFailStore.java +++ /dev/null @@ -1,60 +0,0 @@ -package voldemort.store; - -import java.util.List; -import java.util.Map; - -import voldemort.VoldemortException; -import voldemort.versioning.Version; -import voldemort.versioning.Versioned; - -public class SleepyForceFailStore extends ForceFailStore { - - private long sleepTimeMs; - - public SleepyForceFailStore(Store innerStore, VoldemortException e, long sleepTimeInMs) { - super(innerStore, e); - this.sleepTimeMs = sleepTimeInMs; - } - - @Override - public boolean delete(K key, Version version) throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - return super.delete(key, version); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - - @Override - public List> get(K key, T transforms) throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - return super.get(key, transforms); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - - @Override - public Map>> getAll(Iterable keys, Map transforms) - throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - return super.getAll(keys, transforms); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - - @Override - public void put(K key, Versioned value, T transforms) throws VoldemortException { - try { - Thread.sleep(sleepTimeMs); - super.put(key, value, transforms); - } catch(InterruptedException e) { - throw new VoldemortException(e); - } - } - -} diff --git a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java index bbeb4927e7..bc3458e533 100644 --- a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java +++ b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java @@ -17,7 +17,7 @@ package voldemort.store.routed; import static org.junit.Assert.fail; -import static voldemort.VoldemortTestConstants.getTwoNodeCluster; +import static voldemort.VoldemortTestConstants.getThreeNodeCluster; import java.util.Date; import java.util.List; @@ -29,8 +29,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; import org.junit.After; -import org.junit.Before; import org.junit.Test; import voldemort.ServerTestUtils; @@ -46,13 +46,15 @@ import voldemort.cluster.failuredetector.FailureDetectorUtils; import voldemort.cluster.failuredetector.MutableStoreVerifier; import voldemort.cluster.failuredetector.ThresholdFailureDetector; +import voldemort.routing.RoutingStrategy; import voldemort.routing.RoutingStrategyFactory; import voldemort.routing.RoutingStrategyType; import voldemort.serialization.SerializerDefinition; import voldemort.server.StoreRepository; import voldemort.server.scheduler.slop.StreamingSlopPusherJob; import voldemort.server.storage.ScanPermitWrapper; -import voldemort.store.SleepyForceFailStore; +import voldemort.store.ForceFailStore; +import voldemort.store.SleepyStore; import voldemort.store.StorageEngine; import voldemort.store.Store; import voldemort.store.StoreDefinition; @@ -90,23 +92,24 @@ */ public class HintedHandoffFailureTest { - private final static String STORE_NAME = "test"; private final static String SLOP_STORE_NAME = "slop"; - private final static int REPLICATION_FACTOR = 2; - private final static int P_READS = 1; - private final static int R_READS = 1; - private final static int P_WRITES = 2; - private final static int R_WRITES = 1; + private static int REPLICATION_FACTOR = 2; + private static int P_READS = 1; + private static int R_READS = 1; + private static int P_WRITES = 1; + private static int R_WRITES = 1; private static final int NUM_THREADS = 3; - private static final int NUM_NODES_TOTAL = 2; - private static final int FAILED_NODE_ID = 0; + private static final int NUM_NODES_TOTAL = 3; + private static int FAILED_NODE_ID = 0; + private final String STORE_NAME = "test"; private Cluster cluster; private FailureDetector failureDetector; private StoreDefinition storeDef; private ExecutorService routedStoreThreadPool; private RoutedStoreFactory routedStoreFactory; private RoutedStore store; + private RoutingStrategy strategy; private final static long routingTimeoutInMs = 1000; private final static long sleepBeforeFailingInMs = 2000; @@ -116,6 +119,8 @@ public class HintedHandoffFailureTest { private final Map> slopStores = new ConcurrentHashMap>(); private final List slopPusherJobs = Lists.newLinkedList(); + private final Logger logger = Logger.getLogger(getClass()); + private StoreDefinition getStoreDef(String storeName, int replicationFactor, int preads, @@ -154,24 +159,23 @@ private void setFailureDetector(Map> s } /** - * Setup a cluster with 2 nodes, with the following characteristics: + * Setup a cluster with 3 nodes, with the following characteristics: * - * - Node 0: Sleepy force failing store (will throw an exception after a - * delay) + * - 1st replica node: Sleepy force failing store (will throw an exception + * after a delay) * - * - Node 1: Standard In-memory store (wrapped by Logging store) + * - Pseudo master and other replicas: Standard In-memory store (wrapped by + * Logging store) * * - In memory slop stores * - * - A custom Put pipeline with a delay between parallel puts and doing the - * handoff + * @param key The ByteArray representation of the key * * @throws Exception */ - @Before - public void setUp() throws Exception { + public void customSetup(ByteArray key) throws Exception { - cluster = getTwoNodeCluster(); + cluster = getThreeNodeCluster(); storeDef = getStoreDef(STORE_NAME, REPLICATION_FACTOR, P_READS, @@ -180,28 +184,37 @@ public void setUp() throws Exception { R_WRITES, RoutingStrategyType.CONSISTENT_STRATEGY); - VoldemortException e = new UnreachableStoreException("Node down"); + strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster); InMemoryStorageEngine inMemoryStorageEngine = new InMemoryStorageEngine(STORE_NAME); LoggingStore loggingStore = new LoggingStore(inMemoryStorageEngine); - // Set node 1 as a regular store - subStores.put(1, loggingStore); - - // Set node 0 as the force failing store - SleepyForceFailStore failureStore = new SleepyForceFailStore(loggingStore, - e, - sleepBeforeFailingInMs); + VoldemortException e = new UnreachableStoreException("Node down"); + ForceFailStore failureStore = new ForceFailStore(loggingStore, + e); + SleepyStore sleepyFailureStore = new SleepyStore(sleepBeforeFailingInMs, + failureStore); failureStore.setFail(true); - subStores.put(0, failureStore); + // Get the first replica node for the given key + // This will act as the sleepy failing node + Node failingNode = strategy.routeRequest(key.get()).get(1); + FAILED_NODE_ID = failingNode.getId(); + + subStores.clear(); + for(int i = 0; i < NUM_NODES_TOTAL; i++) { + if(i == FAILED_NODE_ID) { + subStores.put(i, sleepyFailureStore); + } else { + subStores.put(i, loggingStore); + } + } setFailureDetector(subStores); routedStoreThreadPool = Executors.newFixedThreadPool(NUM_THREADS); routedStoreFactory = new RoutedStoreFactory(true, routedStoreThreadPool, new TimeoutConfig(routingTimeoutInMs, false)); - new RoutingStrategyFactory().updateRoutingStrategy(storeDef, cluster); Map nonblockingSlopStores = Maps.newHashMap(); for(Node node: cluster.getNodes()) { @@ -209,8 +222,9 @@ public void setUp() throws Exception { StoreRepository storeRepo = new StoreRepository(); storeRepo.addLocalStore(subStores.get(nodeId)); - for(int i = 0; i < NUM_NODES_TOTAL; i++) + for(int i = 0; i < NUM_NODES_TOTAL; i++) { storeRepo.addNodeStore(i, subStores.get(i)); + } SlopStorageEngine slopStorageEngine = new SlopStorageEngine(new InMemoryStorageEngine(SLOP_STORE_NAME), cluster); @@ -274,12 +288,12 @@ private Set makeSlopKeys(Set failedKeys) { for(ByteArray failedKey: failedKeys) { byte[] opCode = new byte[] { Slop.Operation.PUT.getOpCode() }; byte[] spacer = new byte[] { (byte) 0 }; - byte[] storeName = ByteUtils.getBytes(STORE_NAME, "UTF-8"); + byte[] storeNameBytes = ByteUtils.getBytes(STORE_NAME, "UTF-8"); byte[] nodeIdBytes = new byte[ByteUtils.SIZE_OF_INT]; ByteUtils.writeInt(nodeIdBytes, FAILED_NODE_ID, 0); ByteArray slopKey = new ByteArray(ByteUtils.cat(opCode, spacer, - storeName, + storeNameBytes, spacer, nodeIdBytes, spacer, @@ -294,16 +308,30 @@ private Set makeSlopKeys(Set failedKeys) { * after PerformParallelPut has finished processing the responses and before * the hinted handoff actually begins, a slop is still registered for the * same. + * + * This is for the 2-1-1 configuration. */ @Test - public void testSlopOnDelayedFailingAsyncPut() { + public void testSlopOnDelayedFailingAsyncPut_2_1_1() { - // The following key will be routed to node 1 (pseudo master). We've set - // node 0 to be the sleepy failing node String key = "a"; String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + + // Set the correct replication config + REPLICATION_FACTOR = 2; + P_READS = 1; + R_READS = 1; + P_WRITES = 1; + R_WRITES = 1; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + this.store.put(keyByteArray, versionedVal, null); // Check the slop stores @@ -317,12 +345,67 @@ public void testSlopOnDelayedFailingAsyncPut() { for(Map.Entry>> entry: res.entrySet()) { Slop slop = entry.getValue().get(0).getValue(); registeredSlops.add(slop); - System.out.println(slop); + logger.info(slop); } } if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); + } + } + + /** + * Test to ensure that when an asynchronous put completes (with a failure) + * after PerformParallelPut has finished processing the responses and before + * the hinted handoff actually begins, a slop is still registered for the + * same. + * + * This is for the 3-2-2 configuration. + */ + @Test + public void testSlopOnDelayedFailingAsyncPut_3_2_2() { + + String key = "a"; + String val = "xyz"; + Versioned versionedVal = new Versioned(val.getBytes()); + ByteArray keyByteArray = new ByteArray(key.getBytes()); + + // Set the correct replication config + REPLICATION_FACTOR = 3; + P_READS = 2; + R_READS = 2; + P_WRITES = 2; + R_WRITES = 2; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + + this.store.put(keyByteArray, versionedVal, null); + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(failedKeys); + + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + logger.info(slop); + } + } + + if(registeredSlops.size() == 0) { + fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); } } @@ -330,17 +413,93 @@ public void testSlopOnDelayedFailingAsyncPut() { * Test to ensure that when an asynchronous put completes (with a failure) * after the pipeline completes, a slop is still registered (via a serial * hint). + * + * This is for the 2-1-1 configuration */ @Test - public void testSlopViaSerialHint() { + public void testSlopViaSerialHint_2_1_1() { - // The following key will be routed to node 1 (pseudo master). We've set - // node 0 to be the sleepy failing node String key = "a"; String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + // Set the correct replication config + REPLICATION_FACTOR = 2; + P_READS = 1; + R_READS = 1; + P_WRITES = 1; + R_WRITES = 1; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + + // We remove the delay in the pipeline so that the pipeline will finish + // before the failing async put returns. At this point it should do a + // serial hint. + delayBeforeHintedHandoff = 0; + + this.store.put(keyByteArray, versionedVal, null); + + // Give enough time for the serial hint to work. + try { + logger.info("Sleeping for 5 seconds to wait for the serial hint to finish"); + Thread.sleep(5000); + } catch(Exception e) {} + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(failedKeys); + + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + logger.info(slop); + } + } + + if(registeredSlops.size() == 0) { + fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); + } + } + + /** + * Test to ensure that when an asynchronous put completes (with a failure) + * after the pipeline completes, a slop is still registered (via a serial + * hint). + * + * This is for the 3-2-2 configuration + */ + @Test + public void testSlopViaSerialHint_3_2_2() { + + String key = "a"; + String val = "xyz"; + Versioned versionedVal = new Versioned(val.getBytes()); + ByteArray keyByteArray = new ByteArray(key.getBytes()); + + // Set the correct replication config + REPLICATION_FACTOR = 3; + P_READS = 2; + R_READS = 2; + P_WRITES = 2; + R_WRITES = 2; + + try { + customSetup(keyByteArray); + } catch(Exception e) { + fail("Error in setup."); + } + // We remove the delay in the pipeline so that the pipeline will finish // before the failing async put returns. At this point it should do a // serial hint. @@ -350,7 +509,7 @@ public void testSlopViaSerialHint() { // Give enough time for the serial hint to work. try { - System.out.println("Sleeping for 5 seconds to wait for the serial hint to finish"); + logger.info("Sleeping for 5 seconds to wait for the serial hint to finish"); Thread.sleep(5000); } catch(Exception e) {} @@ -365,12 +524,14 @@ public void testSlopViaSerialHint() { for(Map.Entry>> entry: res.entrySet()) { Slop slop = entry.getValue().get(0).getValue(); registeredSlops.add(slop); - System.out.println(slop); + logger.info(slop); } } if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); + } else if(registeredSlops.size() != 1) { + fail("Number of slops registered != 1"); } } @@ -390,10 +551,10 @@ protected DelayAction(PutPipelineData pipelineData, Event completeEvent, long sl @Override public void execute(Pipeline pipeline) { try { - System.out.println("Delayed pipeline action now sleeping for : " + sleepTimeInMs); + logger.info("Delayed pipeline action now sleeping for : " + sleepTimeInMs); Thread.sleep(sleepTimeInMs); - System.out.println("Now moving on to doing actual hinted handoff. Current time = " - + new Date(System.currentTimeMillis())); + logger.info("Now moving on to doing actual hinted handoff. Current time = " + + new Date(System.currentTimeMillis())); } catch(Exception e) {} pipeline.addEvent(completeEvent); } From 7234b65bba50c2604f39c69125935ab7836fd645 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 9 Apr 2013 18:05:00 -0700 Subject: [PATCH 04/10] Tool to forklift data over for store migrations --- .../protocol/admin/StreamingClient.java | 11 +- src/java/voldemort/store/StoreUtils.java | 10 + .../utils/AbstractConsistencyFixer.java | 337 ++++++++++ .../voldemort/utils/ClusterForkLiftTool.java | 574 ++++++++++++++++++ .../voldemort/utils/ConsistencyFixWorker.java | 271 +-------- 5 files changed, 935 insertions(+), 268 deletions(-) create mode 100644 src/java/voldemort/utils/AbstractConsistencyFixer.java create mode 100644 src/java/voldemort/utils/ClusterForkLiftTool.java diff --git a/src/java/voldemort/client/protocol/admin/StreamingClient.java b/src/java/voldemort/client/protocol/admin/StreamingClient.java index 4de4dda835..50430d2c40 100644 --- a/src/java/voldemort/client/protocol/admin/StreamingClient.java +++ b/src/java/voldemort/client/protocol/admin/StreamingClient.java @@ -105,8 +105,8 @@ public class StreamingClient { protected EventThrottler throttler; - AdminClient adminClient; - AdminClientConfig adminClientConfig; + private AdminClient adminClient; + private AdminClientConfig adminClientConfig; String bootstrapURL; @@ -135,7 +135,12 @@ public StreamingClient(StreamingClientConfig config) { this.bootstrapURL = config.getBootstrapURL(); CHECKPOINT_COMMIT_SIZE = config.getBatchSize(); THROTTLE_QPS = config.getThrottleQPS(); + adminClientConfig = new AdminClientConfig(); + adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig()); + } + public AdminClient getAdminClient() { + return adminClient; } public synchronized void updateThrottleLimit(int throttleQPS) { @@ -297,8 +302,6 @@ public synchronized void initStreamingSessions(List stores, List blackListedNodes) { logger.info("Initializing a streaming session"); - adminClientConfig = new AdminClientConfig(); - adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig()); this.checkpointCallback = checkpointCallback; this.recoveryCallback = recoveryCallback; this.allowMerge = allowMerge; diff --git a/src/java/voldemort/store/StoreUtils.java b/src/java/voldemort/store/StoreUtils.java index d4122bad1f..7cb2cdb6bf 100644 --- a/src/java/voldemort/store/StoreUtils.java +++ b/src/java/voldemort/store/StoreUtils.java @@ -215,4 +215,14 @@ public static List getStoreNames(List list, boolean ign storeNameSet.add(def.getName()); return storeNameSet; } + + public static HashMap getStoreDefsAsMap(List storeDefs) { + if(storeDefs == null) + return null; + + HashMap storeDefMap = new HashMap(); + for(StoreDefinition def: storeDefs) + storeDefMap.put(def.getName(), def); + return storeDefMap; + } } diff --git a/src/java/voldemort/utils/AbstractConsistencyFixer.java b/src/java/voldemort/utils/AbstractConsistencyFixer.java new file mode 100644 index 0000000000..b14da40e3a --- /dev/null +++ b/src/java/voldemort/utils/AbstractConsistencyFixer.java @@ -0,0 +1,337 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.QueryKeyResult; +import voldemort.store.routed.NodeValue; +import voldemort.store.routed.ReadRepairer; +import voldemort.utils.ConsistencyFix.BadKey; +import voldemort.utils.ConsistencyFix.Status; +import voldemort.versioning.VectorClock; +import voldemort.versioning.Versioned; + +import com.google.common.collect.Lists; + +abstract class AbstractConsistencyFixer { + + private static final Logger logger = Logger.getLogger(AbstractConsistencyFixer.class); + private static final int fakeNodeID = Integer.MIN_VALUE; + + protected final BadKey badKey; + protected final StoreInstance storeInstance; + protected final AdminClient adminClient; + protected final QueryKeyResult orphanedValues; + + /** + * Normal use case constructor. + * + * @param keyInHexFormat + * @param consistencyFix + * @param badKeyQOut + */ + AbstractConsistencyFixer(BadKey badKey, StoreInstance storeInstance, AdminClient adminClient) { + this(badKey, storeInstance, adminClient, null); + } + + /** + * Constructor for "orphaned values" use case. I.e., there are values for + * the specific key that exist somewhere and may need to be written to the + * nodes which actually host the key. + * + * @param keyInHexFormat + * @param consistencyFix + * @param badKeyQOut + * @param orphanedValues Set to null if no orphaned values to be included. + */ + AbstractConsistencyFixer(BadKey badKey, + StoreInstance storeInstance, + AdminClient adminClient, + QueryKeyResult orphanedValues) { + this.badKey = badKey; + this.storeInstance = storeInstance; + this.adminClient = adminClient; + this.orphanedValues = orphanedValues; + } + + public Status doConsistencyFix() { + // Initialization. + byte[] keyInBytes; + List nodeIdList = null; + int masterPartitionId = -1; + try { + keyInBytes = ByteUtils.fromHexString(badKey.getKeyInHexFormat()); + masterPartitionId = this.storeInstance.getMasterPartitionId(keyInBytes); + nodeIdList = this.storeInstance.getReplicationNodeList(masterPartitionId); + } catch(Exception exception) { + logger.info("Aborting fixKey due to bad init."); + if(logger.isDebugEnabled()) { + exception.printStackTrace(); + } + return Status.BAD_INIT; + } + ByteArray keyAsByteArray = new ByteArray(keyInBytes); + + // Do the reads + Map nodeIdToKeyValues = doReads(nodeIdList, + keyInBytes, + badKey.getKeyInHexFormat()); + + // Process read replies (i.e., nodeIdToKeyValues) + ProcessReadRepliesResult result = processReadReplies(nodeIdList, + keyAsByteArray, + badKey.getKeyInHexFormat(), + nodeIdToKeyValues); + if(result.status != Status.SUCCESS) { + return result.status; + } + + // Resolve conflicts indicated in nodeValues + List> toReadRepair = resolveReadConflicts(result.nodeValues); + if(logger.isTraceEnabled()) { + if(toReadRepair.size() == 0) { + logger.trace("Nothing to repair"); + } + for(NodeValue nodeValue: toReadRepair) { + logger.trace(nodeValue.getNodeId() + " --- " + nodeValue.getKey().toString()); + } + } + + // Do the repairs + Status status = doRepairPut(toReadRepair); + + // return status of last operation (success or otherwise) + return status; + } + + /** + * + * @param nodeIdList + * @param keyInBytes + * @param keyInHexFormat + * @return + */ + private Map doReads(final List nodeIdList, + final byte[] keyInBytes, + final String keyInHexFormat) { + Map nodeIdToKeyValues = new HashMap(); + + ByteArray key = new ByteArray(keyInBytes); + for(int nodeId: nodeIdList) { + List> values = null; + try { + values = this.adminClient.storeOps.getNodeKey(this.storeInstance.getStoreDefinition() + .getName(), + nodeId, + key); + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); + } catch(VoldemortException ve) { + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); + } + } + + return nodeIdToKeyValues; + } + + /** + * Result of an invocation of processReadReplies + */ + private class ProcessReadRepliesResult { + + public final Status status; + public final List> nodeValues; + + /** + * Constructor for error status + */ + ProcessReadRepliesResult(Status status) { + this.status = status; + this.nodeValues = null; + } + + /** + * Constructor for success + */ + ProcessReadRepliesResult(List> nodeValues) { + this.status = Status.SUCCESS; + this.nodeValues = nodeValues; + } + } + + /** + * + * @param nodeIdList + * @param keyAsByteArray + * @param keyInHexFormat + * @param nodeIdToKeyValues + * @param nodeValues Effectively the output of this method. Must pass in a + * non-null object to be populated by this method. + * @return + */ + private ProcessReadRepliesResult processReadReplies(final List nodeIdList, + final ByteArray keyAsByteArray, + final String keyInHexFormat, + final Map nodeIdToKeyValues) { + List> nodeValues = new ArrayList>(); + boolean exceptionsEncountered = false; + for(int nodeId: nodeIdList) { + QueryKeyResult keyValue; + if(nodeIdToKeyValues.containsKey(nodeId)) { + keyValue = nodeIdToKeyValues.get(nodeId); + + if(keyValue.hasException()) { + logger.debug("Exception encountered while fetching key " + keyInHexFormat + + " from node with nodeId " + nodeId + " : " + + keyValue.getException().getMessage()); + exceptionsEncountered = true; + } else { + if(keyValue.getValues().isEmpty()) { + Versioned versioned = new Versioned(null); + nodeValues.add(new NodeValue(nodeId, + keyValue.getKey(), + versioned)); + + } else { + for(Versioned value: keyValue.getValues()) { + nodeValues.add(new NodeValue(nodeId, + keyValue.getKey(), + value)); + } + } + } + } else { + logger.debug("No key-value returned from node with id:" + nodeId); + Versioned versioned = new Versioned(null); + nodeValues.add(new NodeValue(nodeId, keyAsByteArray, versioned)); + } + } + if(exceptionsEncountered) { + logger.info("Aborting fixKey because exceptions were encountered when fetching key-values."); + return new ProcessReadRepliesResult(Status.FETCH_EXCEPTION); + } + + if(logger.isDebugEnabled()) { + for(NodeValue nkv: nodeValues) { + logger.debug("\tRead NodeKeyValue : " + ByteUtils.toHexString(nkv.getKey().get()) + + " on node with id " + nkv.getNodeId() + " for version " + + nkv.getVersion()); + } + } + + return new ProcessReadRepliesResult(nodeValues); + } + + /** + * Decide on the specific key-value to write everywhere. + * + * @param nodeValues + * @return The subset of entries from nodeValues that need to be repaired. + */ + private List> resolveReadConflicts(final List> nodeValues) { + + if(logger.isTraceEnabled()) { + logger.trace("NodeValues passed into resolveReadConflicts."); + if(nodeValues.size() == 0) { + logger.trace("Empty nodeValues passed to resolveReadConflicts"); + } + for(NodeValue nodeValue: nodeValues) { + logger.trace("\t" + nodeValue.getNodeId() + " - " + nodeValue.getKey().toString() + + " - " + nodeValue.getVersion().toString()); + } + } + + // If orphaned values exist, add them to fake nodes to be processed by + // "getRepairs" + int currentFakeNodeId = fakeNodeID; + if(this.orphanedValues != null) { + for(Versioned value: this.orphanedValues.getValues()) { + nodeValues.add(new NodeValue(currentFakeNodeId, + this.orphanedValues.getKey(), + value)); + currentFakeNodeId++; + } + } + + // Some cut-paste-and-modify coding from + // store/routed/action/AbstractReadRepair.java and + // store/routed/ThreadPoolRoutedStore.java + ReadRepairer readRepairer = new ReadRepairer(); + List> nodeKeyValues = readRepairer.getRepairs(nodeValues); + + if(logger.isTraceEnabled()) { + if(nodeKeyValues.size() == 0) { + logger.trace("\treadRepairer returned an empty list."); + } + for(NodeValue nodeKeyValue: nodeKeyValues) { + logger.trace("\tNodeKeyValue result from readRepairer.getRepairs : " + + ByteUtils.toHexString(nodeKeyValue.getKey().get()) + + " on node with id " + nodeKeyValue.getNodeId() + " for version " + + nodeKeyValue.getVersion()); + } + } + + List> toReadRepair = Lists.newArrayList(); + for(NodeValue v: nodeKeyValues) { + if(v.getNodeId() > currentFakeNodeId) { + // Only copy repairs intended for real nodes. + Versioned versioned = Versioned.value(v.getVersioned().getValue(), + ((VectorClock) v.getVersion()).clone()); + toReadRepair.add(new NodeValue(v.getNodeId(), + v.getKey(), + versioned)); + } else { + if(logger.isDebugEnabled()) { + logger.debug("\tIgnoring repair to fake node: " + + ByteUtils.toHexString(v.getKey().get()) + " on node with id " + + v.getNodeId() + " for version " + v.getVersion()); + } + } + + } + + if(logger.isTraceEnabled()) { + if(toReadRepair.size() == 0) { + logger.trace("\ttoReadRepair is empty."); + } + for(NodeValue nodeKeyValue: toReadRepair) { + logger.trace("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get()) + + " on node with id " + nodeKeyValue.getNodeId() + " for version " + + nodeKeyValue.getVersion()); + + } + } + return toReadRepair; + } + + /** + * Override this method to place whatever logic you have to handle the + * resolved value + * + * @param toReadRepair Effectively the output of this method. Must pass in a + * non-null object to be populated by this method. + * @return + */ + public abstract Status doRepairPut(final List> toReadRepair); +} \ No newline at end of file diff --git a/src/java/voldemort/utils/ClusterForkLiftTool.java b/src/java/voldemort/utils/ClusterForkLiftTool.java new file mode 100644 index 0000000000..827a859e41 --- /dev/null +++ b/src/java/voldemort/utils/ClusterForkLiftTool.java @@ -0,0 +1,574 @@ +package voldemort.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; + +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.client.ClientConfig; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.AdminClientConfig; +import voldemort.client.protocol.admin.QueryKeyResult; +import voldemort.client.protocol.admin.StreamingClient; +import voldemort.client.protocol.admin.StreamingClientConfig; +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.store.StoreDefinition; +import voldemort.store.StoreUtils; +import voldemort.versioning.ChainedResolver; +import voldemort.versioning.ObsoleteVersionException; +import voldemort.versioning.TimeBasedInconsistencyResolver; +import voldemort.versioning.VectorClock; +import voldemort.versioning.VectorClockInconsistencyResolver; +import voldemort.versioning.Versioned; + +import com.google.common.collect.Lists; + +/** + * Tool to fork lift data over from a source cluster to a destination cluster. + * When used in conjunction with a client that "double writes" to both the + * clusters, this can be a used as a feasible store migration tool to move an + * existing store to a new cluster. + * + * There are two modes around how the divergent versions of a key are + * consolidated from the source cluster. : + * + * 1) Primary only Resolution ({@link SinglePartitionForkLiftTask}: The entries + * on the primary partition are moved over to the destination cluster with empty + * vector clocks. if any key has multiple versions on the primary, they are + * resolved. This approach is fast and is best suited if you deem the replicas + * being very much in sync with each other. This is the DEFAULT mode + * + * 2) Global Resolution ({@link SinglePartitionGloballyResolvingForkLiftTask} : + * The keys belonging to a partition are fetched out of the primary replica, and + * for each such key, the corresponding values are obtained from all other + * replicas, using get(..) operations. These versions are then resolved and + * written back to the destination cluster as before. This approach is slow + * since it involves several roundtrips to the server for each key (some + * potentially cross colo) and hence should be used when thorough version + * resolution is neccessary or the admin deems the replicas being fairly + * out-of-sync + * + * + * In both mode, the default chained resolver ( + * {@link VectorClockInconsistencyResolver} + + * {@link TimeBasedInconsistencyResolver} is used to determine a final resolved + * version. + * + * NOTES: + * + * 1) If the tool fails for some reason in the middle, the admin can restart the + * tool for the failed partitions alone. The keys that were already written in + * the failed partitions, will all experience {@link ObsoleteVersionException} + * and the un-inserted keys will be inserted. + * + * 2) Since the forklift writes are issued with empty vector clocks, they will + * always yield to online writes happening on the same key, before or during the + * forklift window. Of course, after the forklift window, the destination + * cluster resumes normal operation. + * + */ +public class ClusterForkLiftTool implements Runnable { + + private static Logger logger = Logger.getLogger(ClusterForkLiftTool.class); + private static final int DEFAULT_MAX_PUTS_PER_SEC = 500; + private static final int DEFAULT_PROGRESS_PERIOD_OPS = 100000; + private static final int DEFAULT_PARTITION_PARALLELISM = 8; + private static final int DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS = 5; + + private final AdminClient srcAdminClient; + private final StreamingClient dstStreamingClient; + private final List storesList; + private final ExecutorService workerPool; + private final int progressOps; + private final HashMap srcStoreDefMap; + private final List partitionList; + private final boolean globalResolution; + + public ClusterForkLiftTool(String srcBootstrapUrl, + String dstBootstrapUrl, + int maxPutsPerSecond, + int partitionParallelism, + int progressOps, + List storesList, + List partitions, + boolean globalResolution) { + // set up AdminClient on source cluster + this.srcAdminClient = new AdminClient(srcBootstrapUrl, + new AdminClientConfig(), + new ClientConfig()); + + // set up streaming client to the destination cluster + Props property = new Props(); + property.put("streaming.platform.bootstrapURL", dstBootstrapUrl); + property.put("streaming.platform.throttle.qps", maxPutsPerSecond); + StreamingClientConfig config = new StreamingClientConfig(property); + this.dstStreamingClient = new StreamingClient(config); + + // determine and verify final list of stores to be forklifted over + if(storesList != null) { + this.storesList = storesList; + } else { + this.storesList = StoreUtils.getStoreNames(srcAdminClient.metadataMgmtOps.getRemoteStoreDefList(0) + .getValue(), + true); + } + this.srcStoreDefMap = checkStoresOnBothSides(); + + // determine the partitions to be fetched + if(partitions != null) { + this.partitionList = partitions; + } else { + this.partitionList = new ArrayList(srcAdminClient.getAdminClientCluster() + .getNumberOfPartitions()); + for(Node node: srcAdminClient.getAdminClientCluster().getNodes()) + this.partitionList.addAll(node.getPartitionIds()); + if(this.partitionList.size() > srcAdminClient.getAdminClientCluster() + .getNumberOfPartitions()) { + throw new VoldemortException("Incorrect partition mapping in source cluster"); + } + } + + // set up thread pool to parallely forklift partitions + this.workerPool = Executors.newFixedThreadPool(partitionParallelism); + this.progressOps = progressOps; + this.globalResolution = globalResolution; + } + + private HashMap checkStoresOnBothSides() { + List srcStoreDefs = srcAdminClient.metadataMgmtOps.getRemoteStoreDefList(0) + .getValue(); + HashMap srcStoreDefMap = StoreUtils.getStoreDefsAsMap(srcStoreDefs); + List dstStoreDefs = dstStreamingClient.getAdminClient().metadataMgmtOps.getRemoteStoreDefList(0) + .getValue(); + HashMap dstStoreDefMap = StoreUtils.getStoreDefsAsMap(dstStoreDefs); + + Set storesToSkip = new HashSet(); + for(String store: storesList) { + if(!srcStoreDefMap.containsKey(store)) { + logger.warn("Store " + store + " does not exist in source cluster "); + storesToSkip.add(store); + } + if(!dstStoreDefMap.containsKey(store)) { + logger.warn("Store " + store + " does not exist in destination cluster "); + storesToSkip.add(store); + } + } + logger.warn("List of stores that will be skipped :" + storesToSkip); + storesList.removeAll(storesToSkip); + return srcStoreDefMap; + } + + abstract class SinglePartitionForkLiftTask { + + protected int partitionId; + protected CountDownLatch latch; + protected StoreInstance storeInstance; + protected String workName; + + SinglePartitionForkLiftTask(StoreInstance storeInstance, + int partitionId, + CountDownLatch latch) { + this.partitionId = partitionId; + this.latch = latch; + this.storeInstance = storeInstance; + workName = "[Store: " + storeInstance.getStoreDefinition().getName() + ", Partition: " + + this.partitionId + "] "; + } + } + + /** + * Fetches keys belonging the primary partition, and then fetches values for + * that key from all replicas in a non-streaming fashion, applies the + * default resolver and writes it back to the destination cluster + * + * TODO a streaming N way merge is the more efficient & correct solution. + * Without this, the resolving can be very slow due to cross data center + * get(..) + */ + class SinglePartitionGloballyResolvingForkLiftTask extends SinglePartitionForkLiftTask + implements Runnable { + + SinglePartitionGloballyResolvingForkLiftTask(StoreInstance storeInstance, + int partitionId, + CountDownLatch latch) { + super(storeInstance, partitionId, latch); + } + + /** + * For now, we will fallback to fetching the key from the primary + * replica, fetch the values out manually, resolve and write it back. + * PitFalls : primary somehow does not have the key. + * + * Two scenarios. + * + * 1) Key active after double writes: the situation is the result of + * slop not propagating to the primary. But double writes would write + * the key back to destination cluster anyway. We are good. + * + * 2) Key inactive after double writes: This indicates a problem + * elsewhere. This is a base guarantee voldemort should offer. + * + */ + public void run() { + String storeName = this.storeInstance.getStoreDefinition().getName(); + long entriesForkLifted = 0; + try { + logger.info(workName + "Starting processing"); + ChainedResolver> resolver = new ChainedResolver>(new VectorClockInconsistencyResolver(), + new TimeBasedInconsistencyResolver()); + Iterator keyItr = srcAdminClient.bulkFetchOps.fetchKeys(storeInstance.getNodeIdForPartitionId(this.partitionId), + storeName, + Lists.newArrayList(this.partitionId), + null, + true); + List nodeList = storeInstance.getReplicationNodeList(this.partitionId); + while(keyItr.hasNext()) { + ByteArray keyToResolve = keyItr.next(); + Map valuesMap = doReads(nodeList, keyToResolve.get()); + List> values = new ArrayList>(valuesMap.size()); + for(Map.Entry entry: valuesMap.entrySet()) { + int nodeId = entry.getKey(); + QueryKeyResult result = entry.getValue(); + + if(result.hasException()) { + logger.error(workName + "key fetch failed for key " + + ByteUtils.toHexString(keyToResolve.get()) + + " on node " + nodeId, + result.getException()); + break; + } + values.addAll(result.getValues()); + } + + List> resolvedVersions = resolver.resolveConflicts(values); + // after timestamp based resolving there should be only one + // version. Insert that to the destination cluster with + // empty vector clock + if(resolvedVersions.size() > 1) { + throw new VoldemortException("More than one resolved versions, key: " + + ByteUtils.toHexString(keyToResolve.get()) + + " vals:" + resolvedVersions); + } + dstStreamingClient.streamingPut(keyToResolve, + new Versioned(resolvedVersions.get(0) + .getValue())); + + entriesForkLifted++; + if(entriesForkLifted % progressOps == 0) { + logger.info(workName + " fork lifted " + entriesForkLifted + + " entries successfully"); + } + } + logger.info(workName + "Completed processing " + entriesForkLifted + " records"); + } catch(Exception e) { + // all work should stop if we get here + logger.error(workName + "Error forklifting data ", e); + } finally { + latch.countDown(); + } + } + + /** + * + * @param nodeIdList + * @param keyInBytes + * @return + */ + private Map doReads(final List nodeIdList, + final byte[] keyInBytes) { + Map nodeIdToKeyValues = new HashMap(); + + ByteArray key = new ByteArray(keyInBytes); + for(int nodeId: nodeIdList) { + List> values = null; + try { + values = srcAdminClient.storeOps.getNodeKey(storeInstance.getStoreDefinition() + .getName(), + nodeId, + key); + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); + } catch(VoldemortException ve) { + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); + } + } + return nodeIdToKeyValues; + } + } + + /** + * Simply fetches the data for the partition from the primary replica and + * writes it into the destination cluster. Works well when the replicas are + * fairly consistent. + * + */ + class SinglePartitionPrimaryResolvingForkLiftTask extends SinglePartitionForkLiftTask implements + Runnable { + + SinglePartitionPrimaryResolvingForkLiftTask(StoreInstance storeInstance, + int partitionId, + CountDownLatch latch) { + super(storeInstance, partitionId, latch); + } + + @Override + public void run() { + String storeName = this.storeInstance.getStoreDefinition().getName(); + long entriesForkLifted = 0; + ChainedResolver> resolver = new ChainedResolver>(new VectorClockInconsistencyResolver(), + new TimeBasedInconsistencyResolver()); + try { + logger.info(workName + "Starting processing"); + Iterator>> entryItr = srcAdminClient.bulkFetchOps.fetchEntries(storeInstance.getNodeIdForPartitionId(this.partitionId), + storeName, + Lists.newArrayList(this.partitionId), + null, + true); + ByteArray prevKey = null; + List> vals = new ArrayList>(); + + while(entryItr.hasNext()) { + Pair> record = entryItr.next(); + ByteArray key = record.getFirst(); + Versioned versioned = record.getSecond(); + + if(prevKey != null && !prevKey.equals(key)) { + // resolve and write, if you see a new key + List> resolvedVersions = resolver.resolveConflicts(vals); + if(resolvedVersions.size() > 1) { + throw new VoldemortException("More than one resolved versions, key: " + + ByteUtils.toHexString(prevKey.get()) + + " vals:" + resolvedVersions); + } + Versioned resolvedVersioned = resolvedVersions.get(0); + // an empty vector clock will ensure, online traffic + // will always win over the forklift writes + Versioned newEntry = new Versioned(resolvedVersioned.getValue(), + new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp())); + + dstStreamingClient.streamingPut(prevKey, newEntry); + entriesForkLifted++; + if(entriesForkLifted % progressOps == 0) { + logger.info(workName + " fork lifted " + entriesForkLifted + + " entries successfully"); + } + vals = new ArrayList>(); + } + vals.add(versioned); + prevKey = key; + } + + // process the last record + if(vals.size() > 0) { + List> resolvedVals = resolver.resolveConflicts(vals); + assert resolvedVals.size() == 1; + Versioned resolvedVersioned = resolvedVals.get(0); + Versioned newEntry = new Versioned(resolvedVersioned.getValue(), + new VectorClock(((VectorClock) resolvedVersioned.getVersion()).getTimestamp())); + dstStreamingClient.streamingPut(prevKey, newEntry); + entriesForkLifted++; + } + + logger.info(workName + "Completed processing " + entriesForkLifted + " records"); + } catch(Exception e) { + // if for some reason this partition fails, we will have retry + // again for those partitions alone. + logger.error(workName + "Error forklifting data ", e); + } finally { + latch.countDown(); + } + } + } + + @Override + public void run() { + final Cluster srcCluster = srcAdminClient.getAdminClientCluster(); + try { + // process stores one-by-one + for(String store: storesList) { + logger.info("Processing store " + store); + dstStreamingClient.initStreamingSession(store, new Callable() { + + @Override + public Object call() throws Exception { + + return null; + } + }, new Callable() { + + @Override + public Object call() throws Exception { + + return null; + } + }, true); + + final CountDownLatch latch = new CountDownLatch(srcCluster.getNumberOfPartitions()); + StoreInstance storeInstance = new StoreInstance(srcCluster, + srcStoreDefMap.get(store)); + + // submit work on every partition that is to be forklifted + for(Integer partitionId: partitionList) { + if(this.globalResolution) { + // do thorough global resolution across replicas + SinglePartitionGloballyResolvingForkLiftTask work = new SinglePartitionGloballyResolvingForkLiftTask(storeInstance, + partitionId, + latch); + workerPool.submit(work); + } else { + // do the less cleaner, but much faster route + SinglePartitionPrimaryResolvingForkLiftTask work = new SinglePartitionPrimaryResolvingForkLiftTask(storeInstance, + partitionId, + latch); + workerPool.submit(work); + } + } + + // wait till all the partitions are processed + latch.await(); + dstStreamingClient.closeStreamingSession(); + logger.info("Finished processing store " + store); + } + } catch(Exception e) { + logger.error("Exception running forklift tool", e); + } finally { + workerPool.shutdown(); + try { + workerPool.awaitTermination(DEFAULT_WORKER_POOL_SHUTDOWN_WAIT_MINS, + TimeUnit.MINUTES); + } catch(InterruptedException ie) { + logger.error("InterruptedException while waiting for worker pool to shutdown", ie); + } + srcAdminClient.close(); + dstStreamingClient.getAdminClient().close(); + // TODO cleanly shut down the threadpool + System.exit(0); + } + } + + /** + * Return args parser + * + * @return program parser + * */ + private static OptionParser getParser() { + OptionParser parser = new OptionParser(); + parser.accepts("help", "print help information"); + parser.accepts("src-url", "[REQUIRED] bootstrap URL of source cluster") + .withRequiredArg() + .describedAs("source-bootstrap-url") + .ofType(String.class); + parser.accepts("dst-url", "[REQUIRED] bootstrap URL of destination cluster") + .withRequiredArg() + .describedAs("destination-bootstrap-url") + .ofType(String.class); + parser.accepts("stores", + "Store names to forklift. Comma delimited list or singleton. [Default: ALL SOURCE STORES]") + .withRequiredArg() + .describedAs("stores") + .withValuesSeparatedBy(',') + .ofType(String.class); + parser.accepts("partitions", + "partitions to forklift. Comma delimited list or singleton. [Default: ALL SOURCE PARTITIONS]") + .withRequiredArg() + .describedAs("partitions") + .withValuesSeparatedBy(',') + .ofType(Integer.class); + parser.accepts("max-puts-per-second", + "Maximum number of put(...) operations issued against destination cluster per second. [Default: " + + DEFAULT_MAX_PUTS_PER_SEC + " ]") + .withRequiredArg() + .describedAs("maxPutsPerSecond") + .ofType(Integer.class); + parser.accepts("progress-period-ops", + "Number of operations between progress info is displayed. [Default: " + + DEFAULT_PROGRESS_PERIOD_OPS + " ]") + .withRequiredArg() + .describedAs("progressPeriodOps") + .ofType(Integer.class); + parser.accepts("parallelism", + "Number of partitions to fetch in parallel. [Default: " + + DEFAULT_PARTITION_PARALLELISM + " ]") + .withRequiredArg() + .describedAs("partitionParallelism") + .ofType(Integer.class); + parser.accepts("global-resolution", + "Determines if a thorough global resolution needs to be done, by comparing all replicas. [Default: Fetch from primary alone ]"); + return parser; + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + OptionParser parser = null; + OptionSet options = null; + try { + parser = getParser(); + options = parser.parse(args); + } catch(Exception oe) { + logger.error("Exception processing command line options", oe); + parser.printHelpOn(System.out); + return; + } + + /* validate options */ + if(options.has("help")) { + parser.printHelpOn(System.out); + return; + } + + if(!options.has("src-url") || !options.has("dst-url")) { + logger.error("Both 'src-url' and 'dst-url' options are mandatory"); + parser.printHelpOn(System.out); + return; + } + + String srcBootstrapUrl = (String) options.valueOf("src-url"); + String dstBootstrapUrl = (String) options.valueOf("dst-url"); + int maxPutsPerSecond = DEFAULT_MAX_PUTS_PER_SEC; + if(options.has("max-puts-per-second")) + maxPutsPerSecond = (Integer) options.valueOf("max-puts-per-second"); + List storesList = null; + if(options.has("stores")) { + storesList = (List) options.valuesOf("stores"); + } + List partitions = null; + if(options.has("partitions")) { + partitions = (List) options.valuesOf("partitions"); + } + + int partitionParallelism = DEFAULT_PARTITION_PARALLELISM; + if(options.has("parallelism")) { + partitionParallelism = (Integer) options.valueOf("parallelism"); + } + int progressOps = DEFAULT_PROGRESS_PERIOD_OPS; + if(options.has("progress-period-ops")) { + progressOps = (Integer) options.valueOf("progress-period-ops"); + } + + ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootstrapUrl, + dstBootstrapUrl, + maxPutsPerSecond, + partitionParallelism, + progressOps, + storesList, + partitions, + options.has("global-resolution")); + forkLiftTool.run(); + } +} diff --git a/src/java/voldemort/utils/ConsistencyFixWorker.java b/src/java/voldemort/utils/ConsistencyFixWorker.java index 63b6a0b97c..f04c4fe14d 100644 --- a/src/java/voldemort/utils/ConsistencyFixWorker.java +++ b/src/java/voldemort/utils/ConsistencyFixWorker.java @@ -16,10 +16,7 @@ package voldemort.utils; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import org.apache.log4j.Logger; @@ -27,25 +24,17 @@ import voldemort.VoldemortException; import voldemort.client.protocol.admin.QueryKeyResult; import voldemort.store.routed.NodeValue; -import voldemort.store.routed.ReadRepairer; import voldemort.utils.ConsistencyFix.BadKey; import voldemort.utils.ConsistencyFix.BadKeyStatus; import voldemort.utils.ConsistencyFix.Status; import voldemort.versioning.ObsoleteVersionException; -import voldemort.versioning.VectorClock; -import voldemort.versioning.Versioned; -import com.google.common.collect.Lists; - -class ConsistencyFixWorker implements Runnable { +class ConsistencyFixWorker extends AbstractConsistencyFixer implements Runnable { private static final Logger logger = Logger.getLogger(ConsistencyFixWorker.class); - private static final int fakeNodeID = Integer.MIN_VALUE; - private final BadKey badKey; private final ConsistencyFix consistencyFix; private final BlockingQueue badKeyQOut; - private final QueryKeyResult orphanedValues; /** * Normal use case constructor. @@ -74,10 +63,12 @@ class ConsistencyFixWorker implements Runnable { ConsistencyFix consistencyFix, BlockingQueue badKeyQOut, QueryKeyResult orphanedValues) { - this.badKey = badKey; + super(badKey, + consistencyFix.getStoreInstance(), + consistencyFix.getAdminClient(), + orphanedValues); this.consistencyFix = consistencyFix; this.badKeyQOut = badKeyQOut; - this.orphanedValues = orphanedValues; } private String myName() { @@ -87,7 +78,7 @@ private String myName() { @Override public void run() { logger.trace("About to process key " + badKey + " (" + myName() + ")"); - Status status = doConsistencyFix(badKey); + Status status = doConsistencyFix(); logger.trace("Finished processing key " + badKey + " (" + myName() + ")"); consistencyFix.getStats().incrementFixCount(); @@ -101,261 +92,13 @@ public void run() { } } - public Status doConsistencyFix(BadKey badKey) { - // Initialization. - byte[] keyInBytes; - List nodeIdList = null; - int masterPartitionId = -1; - try { - keyInBytes = ByteUtils.fromHexString(badKey.getKeyInHexFormat()); - masterPartitionId = consistencyFix.getStoreInstance().getMasterPartitionId(keyInBytes); - nodeIdList = consistencyFix.getStoreInstance() - .getReplicationNodeList(masterPartitionId); - } catch(Exception exception) { - logger.info("Aborting fixKey due to bad init."); - if(logger.isDebugEnabled()) { - exception.printStackTrace(); - } - return Status.BAD_INIT; - } - ByteArray keyAsByteArray = new ByteArray(keyInBytes); - - // Do the reads - Map nodeIdToKeyValues = doReads(nodeIdList, - keyInBytes, - badKey.getKeyInHexFormat()); - - // Process read replies (i.e., nodeIdToKeyValues) - ProcessReadRepliesResult result = processReadReplies(nodeIdList, - keyAsByteArray, - badKey.getKeyInHexFormat(), - nodeIdToKeyValues); - if(result.status != Status.SUCCESS) { - return result.status; - } - - // Resolve conflicts indicated in nodeValues - List> toReadRepair = resolveReadConflicts(result.nodeValues); - if(logger.isTraceEnabled()) { - if(toReadRepair.size() == 0) { - logger.trace("Nothing to repair"); - } - for(NodeValue nodeValue: toReadRepair) { - logger.trace(nodeValue.getNodeId() + " --- " + nodeValue.getKey().toString()); - } - } - - // Do the repairs - Status status = doRepairPut(toReadRepair); - - // return status of last operation (success or otherwise) - return status; - } - - /** - * - * @param nodeIdList - * @param keyInBytes - * @param keyInHexFormat - * @return - */ - private Map doReads(final List nodeIdList, - final byte[] keyInBytes, - final String keyInHexFormat) { - Map nodeIdToKeyValues = new HashMap(); - - ByteArray key = new ByteArray(keyInBytes); - for(int nodeId: nodeIdList) { - List> values = null; - try { - values = consistencyFix.getAdminClient().storeOps.getNodeKey(consistencyFix.getStoreName(), - nodeId, - key); - nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); - } catch(VoldemortException ve) { - nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); - } - } - - return nodeIdToKeyValues; - } - - /** - * Result of an invocation of processReadReplies - */ - private class ProcessReadRepliesResult { - - public final Status status; - public final List> nodeValues; - - /** - * Constructor for error status - */ - ProcessReadRepliesResult(Status status) { - this.status = status; - this.nodeValues = null; - } - - /** - * Constructor for success - */ - ProcessReadRepliesResult(List> nodeValues) { - this.status = Status.SUCCESS; - this.nodeValues = nodeValues; - } - } - - /** - * - * @param nodeIdList - * @param keyAsByteArray - * @param keyInHexFormat - * @param nodeIdToKeyValues - * @param nodeValues Effectively the output of this method. Must pass in a - * non-null object to be populated by this method. - * @return - */ - private ProcessReadRepliesResult processReadReplies(final List nodeIdList, - final ByteArray keyAsByteArray, - final String keyInHexFormat, - final Map nodeIdToKeyValues) { - List> nodeValues = new ArrayList>(); - boolean exceptionsEncountered = false; - for(int nodeId: nodeIdList) { - QueryKeyResult keyValue; - if(nodeIdToKeyValues.containsKey(nodeId)) { - keyValue = nodeIdToKeyValues.get(nodeId); - - if(keyValue.hasException()) { - logger.debug("Exception encountered while fetching key " + keyInHexFormat - + " from node with nodeId " + nodeId + " : " - + keyValue.getException().getMessage()); - exceptionsEncountered = true; - } else { - if(keyValue.getValues().isEmpty()) { - Versioned versioned = new Versioned(null); - nodeValues.add(new NodeValue(nodeId, - keyValue.getKey(), - versioned)); - - } else { - for(Versioned value: keyValue.getValues()) { - nodeValues.add(new NodeValue(nodeId, - keyValue.getKey(), - value)); - } - } - } - } else { - logger.debug("No key-value returned from node with id:" + nodeId); - Versioned versioned = new Versioned(null); - nodeValues.add(new NodeValue(nodeId, keyAsByteArray, versioned)); - } - } - if(exceptionsEncountered) { - logger.info("Aborting fixKey because exceptions were encountered when fetching key-values."); - return new ProcessReadRepliesResult(Status.FETCH_EXCEPTION); - } - - if(logger.isDebugEnabled()) { - for(NodeValue nkv: nodeValues) { - logger.debug("\tRead NodeKeyValue : " + ByteUtils.toHexString(nkv.getKey().get()) - + " on node with id " + nkv.getNodeId() + " for version " - + nkv.getVersion()); - } - } - - return new ProcessReadRepliesResult(nodeValues); - } - - /** - * Decide on the specific key-value to write everywhere. - * - * @param nodeValues - * @return The subset of entries from nodeValues that need to be repaired. - */ - private List> resolveReadConflicts(final List> nodeValues) { - - if(logger.isTraceEnabled()) { - logger.trace("NodeValues passed into resolveReadConflicts."); - if(nodeValues.size() == 0) { - logger.trace("Empty nodeValues passed to resolveReadConflicts"); - } - for(NodeValue nodeValue: nodeValues) { - logger.trace("\t" + nodeValue.getNodeId() + " - " + nodeValue.getKey().toString() - + " - " + nodeValue.getVersion().toString()); - } - } - - // If orphaned values exist, add them to fake nodes to be processed by - // "getRepairs" - int currentFakeNodeId = fakeNodeID; - if(this.orphanedValues != null) { - for(Versioned value: this.orphanedValues.getValues()) { - nodeValues.add(new NodeValue(currentFakeNodeId, - this.orphanedValues.getKey(), - value)); - currentFakeNodeId++; - } - } - - // Some cut-paste-and-modify coding from - // store/routed/action/AbstractReadRepair.java and - // store/routed/ThreadPoolRoutedStore.java - ReadRepairer readRepairer = new ReadRepairer(); - List> nodeKeyValues = readRepairer.getRepairs(nodeValues); - - if(logger.isTraceEnabled()) { - if(nodeKeyValues.size() == 0) { - logger.trace("\treadRepairer returned an empty list."); - } - for(NodeValue nodeKeyValue: nodeKeyValues) { - logger.trace("\tNodeKeyValue result from readRepairer.getRepairs : " - + ByteUtils.toHexString(nodeKeyValue.getKey().get()) - + " on node with id " + nodeKeyValue.getNodeId() + " for version " - + nodeKeyValue.getVersion()); - } - } - - List> toReadRepair = Lists.newArrayList(); - for(NodeValue v: nodeKeyValues) { - if(v.getNodeId() > currentFakeNodeId) { - // Only copy repairs intended for real nodes. - Versioned versioned = Versioned.value(v.getVersioned().getValue(), - ((VectorClock) v.getVersion()).clone()); - toReadRepair.add(new NodeValue(v.getNodeId(), - v.getKey(), - versioned)); - } else { - if(logger.isDebugEnabled()) { - logger.debug("\tIgnoring repair to fake node: " - + ByteUtils.toHexString(v.getKey().get()) + " on node with id " - + v.getNodeId() + " for version " + v.getVersion()); - } - } - - } - - if(logger.isTraceEnabled()) { - if(toReadRepair.size() == 0) { - logger.trace("\ttoReadRepair is empty."); - } - for(NodeValue nodeKeyValue: toReadRepair) { - logger.trace("\tRepair key " + ByteUtils.toHexString(nodeKeyValue.getKey().get()) - + " on node with id " + nodeKeyValue.getNodeId() + " for version " - + nodeKeyValue.getVersion()); - - } - } - return toReadRepair; - } - /** * * @param toReadRepair Effectively the output of this method. Must pass in a * non-null object to be populated by this method. * @return */ + @Override public Status doRepairPut(final List> toReadRepair) { if(this.consistencyFix.isDryRun()) { logger.debug("Returning success from ConsistencyFixWorker.doRepairPut because this is a dry run."); From 560bd67828e328631ffec002b66adad7e88d95fb Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 10 Apr 2013 18:41:30 -0700 Subject: [PATCH 05/10] Adding unit test for fork lift tool --- .../utils/AbstractConsistencyFixer.java | 10 + .../voldemort/utils/ClusterForkLiftTool.java | 56 ++-- .../config/two-stores-replicated.xml | 39 +++ .../utils/ClusterForkLiftToolTest.java | 302 ++++++++++++++++++ 4 files changed, 386 insertions(+), 21 deletions(-) create mode 100644 test/common/voldemort/config/two-stores-replicated.xml create mode 100644 test/unit/voldemort/utils/ClusterForkLiftToolTest.java diff --git a/src/java/voldemort/utils/AbstractConsistencyFixer.java b/src/java/voldemort/utils/AbstractConsistencyFixer.java index b14da40e3a..c13dc1529a 100644 --- a/src/java/voldemort/utils/AbstractConsistencyFixer.java +++ b/src/java/voldemort/utils/AbstractConsistencyFixer.java @@ -35,6 +35,16 @@ import com.google.common.collect.Lists; +/** + * Base class that contains logic to read the state of a key on a cluster and + * resolve it using read repair + * + * NOTE : For now, there is only one subclass extending this class, to perform + * Consistency fixing by writing the resolved versions back to the cluster. Any + * future tool that needs similar functionality can extend this class and + * implement logic to do whatever it wants to do with the resolved versions. + * + */ abstract class AbstractConsistencyFixer { private static final Logger logger = Logger.getLogger(AbstractConsistencyFixer.class); diff --git a/src/java/voldemort/utils/ClusterForkLiftTool.java b/src/java/voldemort/utils/ClusterForkLiftTool.java index 827a859e41..67a7e42538 100644 --- a/src/java/voldemort/utils/ClusterForkLiftTool.java +++ b/src/java/voldemort/utils/ClusterForkLiftTool.java @@ -81,6 +81,29 @@ * forklift window. Of course, after the forklift window, the destination * cluster resumes normal operation. * + * 3) For now, we will fallback to fetching the key from the primary replica, + * fetch the values out manually, resolve and write it back. PitFalls : primary + * somehow does not have the key. + * + * Two scenarios. + * + * 1) Key active after double writes: the situation is the result of slop not + * propagating to the primary. But double writes would write the key back to + * destination cluster anyway. We are good. + * + * 2) Key inactive after double writes: This indicates a problem elsewhere. This + * is a base guarantee voldemort should offer. + * + * 4) Zoned <-> Non Zoned forklift implications. + * + * When forklifting data from a non-zoned to zoned cluster, both destination + * zones will be populated with data, by simply running the tool once with the + * respective bootstrap urls. If you need to forklift data from zoned to + * non-zoned clusters (i.e your replication between datacenters is not handled + * by Voldemort), then you need to run the tool twice for each destination + * non-zoned cluster. Zoned -> Zoned and Non-Zoned -> Non-Zoned forklifts are + * trivial. + * */ public class ClusterForkLiftTool implements Runnable { @@ -113,10 +136,10 @@ public ClusterForkLiftTool(String srcBootstrapUrl, new ClientConfig()); // set up streaming client to the destination cluster - Props property = new Props(); - property.put("streaming.platform.bootstrapURL", dstBootstrapUrl); - property.put("streaming.platform.throttle.qps", maxPutsPerSecond); - StreamingClientConfig config = new StreamingClientConfig(property); + Props props = new Props(); + props.put("streaming.platform.bootstrapURL", dstBootstrapUrl); + props.put("streaming.platform.throttle.qps", maxPutsPerSecond); + StreamingClientConfig config = new StreamingClientConfig(props); this.dstStreamingClient = new StreamingClient(config); // determine and verify final list of stores to be forklifted over @@ -173,6 +196,12 @@ private HashMap checkStoresOnBothSides() { return srcStoreDefMap; } + /** + * TODO this base class can potentially provide some framework of execution + * for the subclasses, to yield a better objected oriented design (progress + * tracking etc) + * + */ abstract class SinglePartitionForkLiftTask { protected int partitionId; @@ -209,21 +238,6 @@ class SinglePartitionGloballyResolvingForkLiftTask extends SinglePartitionForkLi super(storeInstance, partitionId, latch); } - /** - * For now, we will fallback to fetching the key from the primary - * replica, fetch the values out manually, resolve and write it back. - * PitFalls : primary somehow does not have the key. - * - * Two scenarios. - * - * 1) Key active after double writes: the situation is the result of - * slop not propagating to the primary. But double writes would write - * the key back to destination cluster anyway. We are good. - * - * 2) Key inactive after double writes: This indicates a problem - * elsewhere. This is a base guarantee voldemort should offer. - * - */ public void run() { String storeName = this.storeInstance.getStoreDefinition().getName(); long entriesForkLifted = 0; @@ -455,8 +469,6 @@ public Object call() throws Exception { } srcAdminClient.close(); dstStreamingClient.getAdminClient().close(); - // TODO cleanly shut down the threadpool - System.exit(0); } } @@ -570,5 +582,7 @@ public static void main(String[] args) throws Exception { partitions, options.has("global-resolution")); forkLiftTool.run(); + // TODO cleanly shut down the hanging threadpool + System.exit(0); } } diff --git a/test/common/voldemort/config/two-stores-replicated.xml b/test/common/voldemort/config/two-stores-replicated.xml new file mode 100644 index 0000000000..3e15423fc0 --- /dev/null +++ b/test/common/voldemort/config/two-stores-replicated.xml @@ -0,0 +1,39 @@ + + + + test + bdb + client + 2 + 1 + 1 + 1 + 1 + + string + UTF-8 + + + string + UTF-8 + + + + best + bdb + client + 3 + 2 + 2 + 2 + 2 + + string + UTF-8 + + + string + UTF-8 + + + diff --git a/test/unit/voldemort/utils/ClusterForkLiftToolTest.java b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java new file mode 100644 index 0000000000..be59902376 --- /dev/null +++ b/test/unit/voldemort/utils/ClusterForkLiftToolTest.java @@ -0,0 +1,302 @@ +package voldemort.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import voldemort.ServerTestUtils; +import voldemort.client.ClientConfig; +import voldemort.client.SocketStoreClientFactory; +import voldemort.client.StoreClient; +import voldemort.client.StoreClientFactory; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.AdminClientConfig; +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.server.VoldemortServer; +import voldemort.store.StoreDefinition; +import voldemort.store.StoreUtils; +import voldemort.store.routed.NodeValue; +import voldemort.store.socket.SocketStoreFactory; +import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; +import voldemort.versioning.ClockEntry; +import voldemort.versioning.VectorClock; +import voldemort.versioning.Versioned; +import voldemort.xml.StoreDefinitionsMapper; + +import com.google.common.collect.Lists; + +public class ClusterForkLiftToolTest { + + final static String STORES_XML = "test/common/voldemort/config/two-stores-replicated.xml"; + final static String PRIMARY_RESOLVING_STORE_NAME = "test"; + final static String GLOBALLY_RESOLVING_STORE_NAME = "best"; + + private String srcBootStrapUrl; + private String dstBootStrapUrl; + private Cluster srcCluster; + private Cluster dstCluster; + private VoldemortServer[] srcServers; + private VoldemortServer[] dstServers; + + private StoreDefinition primaryResolvingStoreDef; + private StoreDefinition globallyResolvingStoreDef; + + private HashMap kvPairs; + private String firstKey; + private String lastKey; + private String conflictKey; + + private AdminClient srcAdminClient; + + private StoreClientFactory srcfactory; + private StoreClientFactory dstfactory; + private StoreClient srcPrimaryResolvingStoreClient; + private StoreClient dstPrimaryResolvingStoreClient; + private StoreClient srcGloballyResolvingStoreClient; + private StoreClient dstGloballyResolvingStoreClient; + + @Before + public void setUpClusters() { + // setup four nodes with one store and one partition + final SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, + 10000, + 100000, + 32 * 1024); + + try { + int srcPartitionMap[][] = { { 0 }, { 1 }, { 2 }, { 3 } }; + srcServers = new VoldemortServer[4]; + srcCluster = ServerTestUtils.startVoldemortCluster(4, + srcServers, + srcPartitionMap, + socketStoreFactory, + true, + null, + STORES_XML, + new Properties()); + Node node = srcCluster.getNodeById(0); + srcBootStrapUrl = "tcp://" + node.getHost() + ":" + node.getSocketPort(); + + int dstPartitionMap[][] = { { 0 }, { 1 }, { 2 } }; + dstServers = new VoldemortServer[3]; + dstCluster = ServerTestUtils.startVoldemortCluster(3, + dstServers, + dstPartitionMap, + socketStoreFactory, + true, + null, + STORES_XML, + new Properties()); + node = dstCluster.getNodeById(0); + dstBootStrapUrl = "tcp://" + node.getHost() + ":" + node.getSocketPort(); + + kvPairs = ServerTestUtils.createRandomKeyValueString(100); + int keyCount = 0; + for(String key: kvPairs.keySet()) { + if(keyCount == 0) + firstKey = key; + if(keyCount == kvPairs.size() - 1) + lastKey = key; + if(keyCount == kvPairs.size() / 2) + conflictKey = key; + keyCount++; + } + + srcAdminClient = new AdminClient(srcCluster, + new AdminClientConfig(), + new ClientConfig()); + + List storeDefs = new StoreDefinitionsMapper().readStoreList(new File(STORES_XML)); + + primaryResolvingStoreDef = StoreUtils.getStoreDef(storeDefs, + PRIMARY_RESOLVING_STORE_NAME); + globallyResolvingStoreDef = StoreUtils.getStoreDef(storeDefs, + GLOBALLY_RESOLVING_STORE_NAME); + + srcfactory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(srcBootStrapUrl) + .setSelectors(1) + .setRoutingTimeout(1000, + java.util.concurrent.TimeUnit.MILLISECONDS) + .setSocketTimeout(1000, + java.util.concurrent.TimeUnit.MILLISECONDS) + .setConnectionTimeout(1000, + java.util.concurrent.TimeUnit.MILLISECONDS) + .setMaxConnectionsPerNode(1)); + srcPrimaryResolvingStoreClient = srcfactory.getStoreClient(PRIMARY_RESOLVING_STORE_NAME); + srcGloballyResolvingStoreClient = srcfactory.getStoreClient(GLOBALLY_RESOLVING_STORE_NAME); + + dstfactory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(dstBootStrapUrl) + .setSelectors(1) + .setRoutingTimeout(1000, + java.util.concurrent.TimeUnit.MILLISECONDS) + .setSocketTimeout(1000, + java.util.concurrent.TimeUnit.MILLISECONDS) + .setConnectionTimeout(1000, + java.util.concurrent.TimeUnit.MILLISECONDS) + .setMaxConnectionsPerNode(1)); + dstPrimaryResolvingStoreClient = dstfactory.getStoreClient(PRIMARY_RESOLVING_STORE_NAME); + dstGloballyResolvingStoreClient = dstfactory.getStoreClient(GLOBALLY_RESOLVING_STORE_NAME); + + } catch(IOException e) { + e.printStackTrace(); + fail("Unexpected exception"); + } + } + + @Test + public void testPrimaryResolvingForkLift() throws Exception { + + StoreInstance srcStoreInstance = new StoreInstance(srcCluster, primaryResolvingStoreDef); + + // populate data on the source cluster.. + for(Map.Entry entry: kvPairs.entrySet()) { + srcPrimaryResolvingStoreClient.put(entry.getKey(), entry.getValue()); + } + + // generate a conflict on the master partition + int masterNode = srcStoreInstance.getNodeIdForPartitionId(srcStoreInstance.getMasterPartitionId(conflictKey.getBytes("UTF-8"))); + VectorClock losingClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 0, 5)), + System.currentTimeMillis()); + VectorClock winningClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 1, 5)), + losingClock.getTimestamp() + 1); + srcAdminClient.storeOps.putNodeKeyValue(PRIMARY_RESOLVING_STORE_NAME, + new NodeValue(masterNode, + new ByteArray(conflictKey.getBytes("UTF-8")), + new Versioned("losing value".getBytes("UTF-8"), + losingClock))); + srcAdminClient.storeOps.putNodeKeyValue(PRIMARY_RESOLVING_STORE_NAME, + new NodeValue(masterNode, + new ByteArray(conflictKey.getBytes("UTF-8")), + new Versioned("winning value".getBytes("UTF-8"), + winningClock))); + + // do a write to destination cluster + dstPrimaryResolvingStoreClient.put(firstKey, "before forklift"); + + // perform the forklifting.. + ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl, + dstBootStrapUrl, + 10000, + 1, + 1000, + Lists.newArrayList(PRIMARY_RESOLVING_STORE_NAME), + null, + false); + forkLiftTool.run(); + + // do a write to destination cluster + dstPrimaryResolvingStoreClient.put(lastKey, "after forklift"); + + // verify data on the destination is as expected + for(Map.Entry entry: kvPairs.entrySet()) { + if(entry.getKey().equals(firstKey)) { + assertEquals("Online write overwritten", + dstPrimaryResolvingStoreClient.get(firstKey).getValue(), + "before forklift"); + } else if(entry.getKey().equals(lastKey)) { + assertEquals("Online write overwritten", + dstPrimaryResolvingStoreClient.get(lastKey).getValue(), + "after forklift"); + } else if(entry.getKey().equals(conflictKey)) { + assertEquals("Conflict resolution incorrect", + dstPrimaryResolvingStoreClient.get(conflictKey).getValue(), + "winning value"); + } else { + assertEquals("fork lift data missing", + dstPrimaryResolvingStoreClient.get(entry.getKey()).getValue(), + entry.getValue()); + } + } + } + + @Test + public void testGloballyResolvingForkLift() throws Exception { + + StoreInstance srcStoreInstance = new StoreInstance(srcCluster, globallyResolvingStoreDef); + + // populate data on the source cluster.. + for(Map.Entry entry: kvPairs.entrySet()) { + srcGloballyResolvingStoreClient.put(entry.getKey(), entry.getValue()); + } + + // generate a conflict on the primary and a secondary + List nodeList = srcStoreInstance.getReplicationNodeList(srcStoreInstance.getMasterPartitionId(conflictKey.getBytes("UTF-8"))); + VectorClock losingClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 0, 5)), + System.currentTimeMillis()); + VectorClock winningClock = new VectorClock(Lists.newArrayList(new ClockEntry((short) 1, 5)), + losingClock.getTimestamp() + 1); + srcAdminClient.storeOps.putNodeKeyValue(GLOBALLY_RESOLVING_STORE_NAME, + new NodeValue(nodeList.get(0), + new ByteArray(conflictKey.getBytes("UTF-8")), + new Versioned("losing value".getBytes("UTF-8"), + losingClock))); + srcAdminClient.storeOps.putNodeKeyValue(GLOBALLY_RESOLVING_STORE_NAME, + new NodeValue(nodeList.get(1), + new ByteArray(conflictKey.getBytes("UTF-8")), + new Versioned("winning value".getBytes("UTF-8"), + winningClock))); + + // do a write to destination cluster + dstGloballyResolvingStoreClient.put(firstKey, "before forklift"); + + // perform the forklifting.. + ClusterForkLiftTool forkLiftTool = new ClusterForkLiftTool(srcBootStrapUrl, + dstBootStrapUrl, + 10000, + 1, + 1000, + Lists.newArrayList(GLOBALLY_RESOLVING_STORE_NAME), + null, + true); + forkLiftTool.run(); + + // do a write to destination cluster + dstGloballyResolvingStoreClient.put(lastKey, "after forklift"); + + // verify data on the destination is as expected + for(Map.Entry entry: kvPairs.entrySet()) { + if(entry.getKey().equals(firstKey)) { + assertEquals("Online write overwritten", + dstGloballyResolvingStoreClient.get(firstKey).getValue(), + "before forklift"); + } else if(entry.getKey().equals(lastKey)) { + assertEquals("Online write overwritten", + dstGloballyResolvingStoreClient.get(lastKey).getValue(), + "after forklift"); + } else if(entry.getKey().equals(conflictKey)) { + assertEquals("Conflict resolution incorrect", + dstGloballyResolvingStoreClient.get(conflictKey).getValue(), + "winning value"); + } else { + assertEquals("fork lift data missing", + dstGloballyResolvingStoreClient.get(entry.getKey()).getValue(), + entry.getValue()); + } + } + } + + @After + public void tearDownClusters() { + + srcAdminClient.close(); + + srcfactory.close(); + dstfactory.close(); + + for(VoldemortServer server: srcServers) + server.stop(); + for(VoldemortServer server: dstServers) + server.stop(); + } +} From 74bdfde547e7916b92c3674d695bdfb8d2a1cd51 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Thu, 11 Apr 2013 12:40:30 -0700 Subject: [PATCH 06/10] Clarifying arbitrary choice to return BEFORE for equal vector clocks. --- src/java/voldemort/versioning/VectorClock.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/java/voldemort/versioning/VectorClock.java b/src/java/voldemort/versioning/VectorClock.java index de38a009ea..50ab8699ab 100644 --- a/src/java/voldemort/versioning/VectorClock.java +++ b/src/java/voldemort/versioning/VectorClock.java @@ -325,7 +325,11 @@ else if(ver2.getVersion() > ver1.getVersion()) else if(p2 < v2.versions.size()) v2Bigger = true; - /* This is the case where they are equal, return BEFORE arbitrarily */ + /* + * This is the case where they are equal. Consciously return BEFORE, so + * that the we would throw back an ObsoleteVersionException for online + * writes with the same clock. + */ if(!v1Bigger && !v2Bigger) return Occurred.BEFORE; /* This is the case where v1 is a successor clock to v2 */ From b8e6f784abe0d13930196f5189e848d445f5c3d5 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Fri, 12 Apr 2013 16:49:39 -0700 Subject: [PATCH 07/10] Forklift tool fix to equally spread fetches --- src/java/voldemort/utils/ClusterForkLiftTool.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/java/voldemort/utils/ClusterForkLiftTool.java b/src/java/voldemort/utils/ClusterForkLiftTool.java index 67a7e42538..5f98e36aae 100644 --- a/src/java/voldemort/utils/ClusterForkLiftTool.java +++ b/src/java/voldemort/utils/ClusterForkLiftTool.java @@ -1,6 +1,7 @@ package voldemort.utils; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -160,6 +161,9 @@ public ClusterForkLiftTool(String srcBootstrapUrl, .getNumberOfPartitions()); for(Node node: srcAdminClient.getAdminClientCluster().getNodes()) this.partitionList.addAll(node.getPartitionIds()); + // shuffle the partition list so the fetching will equally spread + // across the source cluster + Collections.shuffle(this.partitionList); if(this.partitionList.size() > srcAdminClient.getAdminClientCluster() .getNumberOfPartitions()) { throw new VoldemortException("Incorrect partition mapping in source cluster"); @@ -557,7 +561,7 @@ public static void main(String[] args) throws Exception { maxPutsPerSecond = (Integer) options.valueOf("max-puts-per-second"); List storesList = null; if(options.has("stores")) { - storesList = (List) options.valuesOf("stores"); + storesList = new ArrayList((List) options.valuesOf("stores")); } List partitions = null; if(options.has("partitions")) { From 18c2931aff0ea65cc582d9df3bbd3e5a6768a45e Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Thu, 11 Apr 2013 18:20:57 -0700 Subject: [PATCH 08/10] Adding another Hinted handoff failure test to ensure main thread returns with failure when all replicas dont respond --- .../routed/HintedHandoffFailureTest.java | 252 +++++++++++++----- 1 file changed, 187 insertions(+), 65 deletions(-) diff --git a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java index bc3458e533..28629b6f77 100644 --- a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java +++ b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java @@ -100,7 +100,6 @@ public class HintedHandoffFailureTest { private static int R_WRITES = 1; private static final int NUM_THREADS = 3; private static final int NUM_NODES_TOTAL = 3; - private static int FAILED_NODE_ID = 0; private final String STORE_NAME = "test"; private Cluster cluster; @@ -111,8 +110,8 @@ public class HintedHandoffFailureTest { private RoutedStore store; private RoutingStrategy strategy; - private final static long routingTimeoutInMs = 1000; - private final static long sleepBeforeFailingInMs = 2000; + private static long routingTimeoutInMs = 1000; + private static long sleepBeforeFailingInMs = 2000; private static long delayBeforeHintedHandoff = 3000; private final Map> subStores = new ConcurrentHashMap>(); @@ -121,6 +120,11 @@ public class HintedHandoffFailureTest { private final Logger logger = Logger.getLogger(getClass()); + private enum FAILURE_MODE { + FAIL_FIRST_REPLICA_NODE, + FAIL_ALL_REPLICAS + } + private StoreDefinition getStoreDef(String storeName, int replicationFactor, int preads, @@ -158,22 +162,37 @@ private void setFailureDetector(Map> s failureDetector = FailureDetectorUtils.create(failureDetectorConfig, false); } + /** + * A wrapper for the actual customSetup method with the default failure mode + * as FAIL_FIRST_REPLICA_NODE + * + * @param key The ByteArray representation of the key + * + * @throws Exception + */ + public List customSetup(ByteArray key) throws Exception { + return customSetup(key, FAILURE_MODE.FAIL_FIRST_REPLICA_NODE); + } + /** * Setup a cluster with 3 nodes, with the following characteristics: * - * - 1st replica node: Sleepy force failing store (will throw an exception - * after a delay) + * If FAILURE_MODE is FAIL_FIRST_REPLICA_NODE set the first replica store to + * a sleepy force failing store * - * - Pseudo master and other replicas: Standard In-memory store (wrapped by - * Logging store) + * If FAILURE_MODE is FAIL_ALL_REPLICAS: set all replicas to sleepy force + * failing store * - * - In memory slop stores + * Pseudo master : Standard In-memory store (wrapped by Logging store) + * + * In memory slop stores * * @param key The ByteArray representation of the key + * @param failureMode The Failure mode for the replicas * * @throws Exception */ - public void customSetup(ByteArray key) throws Exception { + public List customSetup(ByteArray key, FAILURE_MODE failureMode) throws Exception { cluster = getThreeNodeCluster(); storeDef = getStoreDef(STORE_NAME, @@ -196,14 +215,24 @@ public void customSetup(ByteArray key) throws Exception { failureStore); failureStore.setFail(true); - // Get the first replica node for the given key - // This will act as the sleepy failing node - Node failingNode = strategy.routeRequest(key.get()).get(1); - FAILED_NODE_ID = failingNode.getId(); + List failingNodeIdList = Lists.newArrayList(); + List replicaList = strategy.routeRequest(key.get()); + + switch(failureMode) { + case FAIL_FIRST_REPLICA_NODE: + failingNodeIdList.add(replicaList.get(1).getId()); + break; + + case FAIL_ALL_REPLICAS: + for(int nodeId = 1; nodeId < replicaList.size(); nodeId++) { + failingNodeIdList.add(nodeId); + } + break; + } subStores.clear(); for(int i = 0; i < NUM_NODES_TOTAL; i++) { - if(i == FAILED_NODE_ID) { + if(failingNodeIdList.contains(i)) { subStores.put(i, sleepyFailureStore); } else { subStores.put(i, loggingStore); @@ -262,6 +291,7 @@ public void customSetup(ByteArray key) throws Exception { cluster, storeDef, failureDetector); + return failingNodeIdList; } @After @@ -282,15 +312,15 @@ public void tearDown() throws Exception { * @param failedKeys Set of keys that the put should've failed for * @return Set of slop keys based on the failed keys and the FAILED_NODE_ID */ - private Set makeSlopKeys(Set failedKeys) { + private Set makeSlopKeys(ByteArray failedKey, List failingNodeIdList) { Set slopKeys = Sets.newHashSet(); - for(ByteArray failedKey: failedKeys) { + for(int failingNodeId: failingNodeIdList) { byte[] opCode = new byte[] { Slop.Operation.PUT.getOpCode() }; byte[] spacer = new byte[] { (byte) 0 }; byte[] storeNameBytes = ByteUtils.getBytes(STORE_NAME, "UTF-8"); byte[] nodeIdBytes = new byte[ByteUtils.SIZE_OF_INT]; - ByteUtils.writeInt(nodeIdBytes, FAILED_NODE_ID, 0); + ByteUtils.writeInt(nodeIdBytes, failingNodeId, 0); ByteArray slopKey = new ByteArray(ByteUtils.cat(opCode, spacer, storeNameBytes, @@ -303,6 +333,25 @@ private Set makeSlopKeys(Set failedKeys) { return slopKeys; } + /** + * A function to fetch all the registered slops + * + * @param slopKeys Keys for the registered slops in the slop store + * @return Set of all the registered Slops + */ + public Set getAllSlops(Iterable slopKeys) { + Set registeredSlops = Sets.newHashSet(); + for(Store slopStore: slopStores.values()) { + Map>> res = slopStore.getAll(slopKeys, null); + for(Map.Entry>> entry: res.entrySet()) { + Slop slop = entry.getValue().get(0).getValue(); + registeredSlops.add(slop); + logger.info(slop); + } + } + return registeredSlops; + } + /** * Test to ensure that when an asynchronous put completes (with a failure) * after PerformParallelPut has finished processing the responses and before @@ -318,6 +367,7 @@ public void testSlopOnDelayedFailingAsyncPut_2_1_1() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 2; @@ -327,8 +377,9 @@ public void testSlopOnDelayedFailingAsyncPut_2_1_1() { R_WRITES = 1; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -337,17 +388,8 @@ public void testSlopOnDelayedFailingAsyncPut_2_1_1() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -371,6 +413,7 @@ public void testSlopOnDelayedFailingAsyncPut_3_2_2() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 3; @@ -380,8 +423,9 @@ public void testSlopOnDelayedFailingAsyncPut_3_2_2() { R_WRITES = 2; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -390,17 +434,8 @@ public void testSlopOnDelayedFailingAsyncPut_3_2_2() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -423,6 +458,7 @@ public void testSlopViaSerialHint_2_1_1() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 2; @@ -432,8 +468,9 @@ public void testSlopViaSerialHint_2_1_1() { R_WRITES = 1; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -453,17 +490,8 @@ public void testSlopViaSerialHint_2_1_1() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -486,6 +514,7 @@ public void testSlopViaSerialHint_3_2_2() { String val = "xyz"; Versioned versionedVal = new Versioned(val.getBytes()); ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; // Set the correct replication config REPLICATION_FACTOR = 3; @@ -495,8 +524,9 @@ public void testSlopViaSerialHint_3_2_2() { R_WRITES = 2; try { - customSetup(keyByteArray); + failingNodeIdList = customSetup(keyByteArray); } catch(Exception e) { + logger.info(e.getMessage()); fail("Error in setup."); } @@ -516,17 +546,8 @@ public void testSlopViaSerialHint_3_2_2() { // Check the slop stores Set failedKeys = Sets.newHashSet(); failedKeys.add(keyByteArray); - Set slopKeys = makeSlopKeys(failedKeys); - - Set registeredSlops = Sets.newHashSet(); - for(Store slopStore: slopStores.values()) { - Map>> res = slopStore.getAll(slopKeys, null); - for(Map.Entry>> entry: res.entrySet()) { - Slop slop = entry.getValue().get(0).getValue(); - registeredSlops.add(slop); - logger.info(slop); - } - } + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); if(registeredSlops.size() == 0) { fail("Should have seen some slops. But could not find any."); @@ -535,6 +556,107 @@ public void testSlopViaSerialHint_3_2_2() { } } + /** + * Test to do a put with a 3-2-2 config such that both the replica nodes do + * not respond at all. This test is to make sure that the main thread + * returns with an error and that no slops are registered. + */ + @Test + public void testNoSlopsOnAllReplicaFailures() { + + String key = "a"; + String val = "xyz"; + final Versioned versionedVal = new Versioned(val.getBytes()); + final ByteArray keyByteArray = new ByteArray(key.getBytes()); + List failingNodeIdList = null; + + // Set the correct replication config + REPLICATION_FACTOR = 3; + R_READS = 2; + R_WRITES = 2; + + // Large sleep time for the replica nodes + sleepBeforeFailingInMs = 10000; + + // 0 artificial delay for the put pipeline + delayBeforeHintedHandoff = 0; + + try { + failingNodeIdList = customSetup(keyByteArray, FAILURE_MODE.FAIL_ALL_REPLICAS); + } catch(Exception e) { + logger.info(e.getMessage()); + fail("Error in setup."); + } + + PerformAsyncPut asyncPutThread = new PerformAsyncPut(this.store, keyByteArray, versionedVal); + Executors.newFixedThreadPool(1).submit(asyncPutThread); + + // Sleep for the routing timeout with some headroom + try { + logger.info("Sleeping for " + (routingTimeoutInMs + 2000) / 1000 + + " seconds to wait for the put to finish"); + Thread.sleep(routingTimeoutInMs + 2000); + + if(!asyncPutThread.isDone) { + fail("The main thread for put did not finish."); + } + } catch(Exception e) { + fail("Unknown error while doing a put: " + e); + } + + // Check the slop stores + Set failedKeys = Sets.newHashSet(); + failedKeys.add(keyByteArray); + Set slopKeys = makeSlopKeys(keyByteArray, failingNodeIdList); + Set registeredSlops = getAllSlops(slopKeys); + + if(registeredSlops.size() != 0) { + fail("Should not have seen any slops."); + } + } + + /** + * A runnable class to do a Voldemort Put operation. This becomes important + * in the scenario that the put operation might hang / deadlock. + * + */ + private class PerformAsyncPut implements Runnable { + + private Versioned versionedVal = null; + private ByteArray keyByteArray = null; + private RoutedStore asyncPutStore = null; + private boolean isDone = false; + + public PerformAsyncPut(RoutedStore asyncPutStore, + ByteArray keyByteArray, + Versioned versionedVal) { + this.asyncPutStore = asyncPutStore; + this.keyByteArray = keyByteArray; + this.versionedVal = versionedVal; + } + + @Override + public void run() { + try { + asyncPutStore.put(keyByteArray, versionedVal, null); + fail("A put with required writes 2 should've failed for this setup"); + } catch(Exception ve) { + // This is expected. Nothing to do. + logger.info("Error occured as expected : " + ve.getMessage()); + } + markAsDone(true); + } + + @SuppressWarnings("unused") + public boolean isDone() { + return isDone; + } + + public void markAsDone(boolean isDone) { + this.isDone = isDone; + } + } + /** * An action within a pipeline which sleeps for the specified time duration. * From 4b1557b5255a6ca3e93f25075a5fcf42f6bcb077 Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Mon, 15 Apr 2013 11:33:41 -0700 Subject: [PATCH 09/10] Added bigger timeout to testNoSlopsOnAllReplicaFailures test and doing better exception handling in HintedHandoffFailureTest --- .../voldemort/store/routed/HintedHandoffFailureTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java index 28629b6f77..581d71b4b7 100644 --- a/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java +++ b/test/unit/voldemort/store/routed/HintedHandoffFailureTest.java @@ -593,9 +593,8 @@ public void testNoSlopsOnAllReplicaFailures() { // Sleep for the routing timeout with some headroom try { - logger.info("Sleeping for " + (routingTimeoutInMs + 2000) / 1000 - + " seconds to wait for the put to finish"); - Thread.sleep(routingTimeoutInMs + 2000); + logger.info("Sleeping for 5 seconds to wait for the put to finish"); + Thread.sleep(5000); if(!asyncPutThread.isDone) { fail("The main thread for put did not finish."); @@ -640,9 +639,11 @@ public void run() { try { asyncPutStore.put(keyByteArray, versionedVal, null); fail("A put with required writes 2 should've failed for this setup"); - } catch(Exception ve) { + } catch(VoldemortException ve) { // This is expected. Nothing to do. logger.info("Error occured as expected : " + ve.getMessage()); + } catch(Exception e) { + fail("Should've got a Voldemort exception. But got this: " + e); } markAsDone(true); } From db6ac447895255c84ef0e0cbd0303ffa6b45e05b Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Mon, 15 Apr 2013 13:41:59 -0700 Subject: [PATCH 10/10] Added Null pointer check in teardown of ReadOnlyStorageEngineTest --- .../readonly/ReadOnlyStorageEngineTest.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/test/unit/voldemort/store/readonly/ReadOnlyStorageEngineTest.java b/test/unit/voldemort/store/readonly/ReadOnlyStorageEngineTest.java index 3174015c50..e0eee3051f 100644 --- a/test/unit/voldemort/store/readonly/ReadOnlyStorageEngineTest.java +++ b/test/unit/voldemort/store/readonly/ReadOnlyStorageEngineTest.java @@ -117,7 +117,9 @@ public ReadOnlyStorageEngineTest(SearchStrategy strategy, ReadOnlyStorageFormat @After public void tearDown() { - Utils.rm(dir); + if(dir != null) { + Utils.rm(dir); + } } /** @@ -613,10 +615,12 @@ private void createStoreFiles(File dir, int indexBytes, int dataBytes, Node node case READONLY_V1: { for(Integer partitionId: node.getPartitionIds()) { for(int chunkId = 0; chunkId < numChunks; chunkId++) { - File index = createFile(dir, Integer.toString(partitionId) + "_" - + Integer.toString(chunkId) + ".index"); - File data = createFile(dir, Integer.toString(partitionId) + "_" - + Integer.toString(chunkId) + ".data"); + File index = createFile(dir, + Integer.toString(partitionId) + "_" + + Integer.toString(chunkId) + ".index"); + File data = createFile(dir, + Integer.toString(partitionId) + "_" + + Integer.toString(chunkId) + ".data"); // write some random crap for index and data FileOutputStream dataOs = new FileOutputStream(data); for(int i = 0; i < dataBytes; i++) @@ -637,8 +641,9 @@ private void createStoreFiles(File dir, int indexBytes, int dataBytes, Node node for(int chunkId = 0; chunkId < numChunks; chunkId++) { File index = createFile(dir, Integer.toString(partitionId) + "_0_" + Integer.toString(chunkId) + ".index"); - File data = createFile(dir, Integer.toString(partitionId) + "_0_" - + Integer.toString(chunkId) + ".data"); + File data = createFile(dir, + Integer.toString(partitionId) + "_0_" + + Integer.toString(chunkId) + ".data"); // write some random crap for index and data FileOutputStream dataOs = new FileOutputStream(data); for(int i = 0; i < dataBytes; i++)