From 44f1737e99e5c6e650d94e98c00d3d1e4c211663 Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Thu, 7 Feb 2013 18:24:22 -0800 Subject: [PATCH] Complete implementation of consistency fixer. src/java/voldemort/utils/ConsistencyFix.java - added Execute method that orchestrates all the threads - switched pattern of thread execution: - one thread for reading bad keys file that submits to ... - a thread pool of workers that enqueues badkeys for the ... - one thread writing still bad keys - note: construct thread pool with a blocking queue - switched to logging (rather than System.out/err) - added Stats tracking - moved methods that do complicated work (getting & repairing keys) out src/java/voldemort/utils/ConsistencyFixWorker.java - moved methods that do complicated work (getting & repairing keys) in src/java/voldemort/utils/ConsistencyFixCLI.java - added 'progress-bar' option - got rid of 'verbose' option - moved all thread orchestration to ConsistencyFix --- src/java/voldemort/utils/ConsistencyFix.java | 498 ++++++------------ .../voldemort/utils/ConsistencyFixCLI.java | 103 +--- .../utils/ConsistencyFixKeyGetter.java | 96 ---- .../utils/ConsistencyFixRepairPutter.java | 49 -- .../voldemort/utils/ConsistencyFixWorker.java | 286 ++++++++++ 5 files changed, 467 insertions(+), 565 deletions(-) delete mode 100644 src/java/voldemort/utils/ConsistencyFixKeyGetter.java delete mode 100644 src/java/voldemort/utils/ConsistencyFixRepairPutter.java create mode 100644 src/java/voldemort/utils/ConsistencyFixWorker.java diff --git a/src/java/voldemort/utils/ConsistencyFix.java b/src/java/voldemort/utils/ConsistencyFix.java index 0a00bb82ab..ef25c53ddd 100644 --- a/src/java/voldemort/utils/ConsistencyFix.java +++ b/src/java/voldemort/utils/ConsistencyFix.java @@ -21,50 +21,66 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; -import java.util.HashMap; +import java.text.DecimalFormat; import java.util.List; -import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; -import voldemort.VoldemortException; import voldemort.client.protocol.admin.AdminClient; import voldemort.client.protocol.admin.AdminClientConfig; -import voldemort.client.protocol.admin.QueryKeyResult; import voldemort.cluster.Cluster; import voldemort.store.StoreDefinition; -import voldemort.store.routed.NodeValue; -import voldemort.store.routed.ReadRepairer; -import voldemort.versioning.ObsoleteVersionException; -import voldemort.versioning.VectorClock; import voldemort.versioning.Versioned; -import com.google.common.collect.Lists; - public class ConsistencyFix { + private static final Logger logger = Logger.getLogger(ConsistencyFix.class); + private final String storeName; private final AdminClient adminClient; private final StoreInstance storeInstance; + private final Stats stats; - ConsistencyFix(String url, String storeName) { + ConsistencyFix(String url, String storeName, long progressBar) { this.storeName = storeName; - System.out.println("Connecting to bootstrap server: " + url); + logger.info("Connecting to bootstrap server: " + url); this.adminClient = new AdminClient(url, new AdminClientConfig(), 0); Cluster cluster = adminClient.getAdminClientCluster(); - System.out.println("Cluster determined to be: " + cluster.getName()); + logger.info("Cluster determined to be: " + cluster.getName()); - System.out.println("Determining store definition for store: " + storeName); Versioned> storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0); List storeDefs = storeDefinitions.getValue(); StoreDefinition storeDefinition = StoreDefinitionUtils.getStoreDefinitionWithName(storeDefs, storeName); - System.out.println("Store definition determined."); + logger.info("Store definition for store " + storeName + " has been determined."); storeInstance = new StoreInstance(cluster, storeDefinition); + + stats = new Stats(progressBar); } - public void stop() { - adminClient.stop(); + public String getStoreName() { + return storeName; + } + + public StoreInstance getStoreInstance() { + return storeInstance; + } + + public AdminClient getAdminClient() { + return adminClient; + } + + public Stats getStats() { + return stats; } /** @@ -88,38 +104,63 @@ public String toString() { } } - /** - * Type with which to wrap "bad keys" read from input file. Has a "poison" - * value to effectively signal EOF. - */ - public class BadKeyInput { - - private final String keyInHexFormat; - private final boolean poison; - - /** - * Common case constructor. - */ - BadKeyInput(String keyInHexFormat) { - this.keyInHexFormat = keyInHexFormat; - this.poison = false; - } - - /** - * Constructs a "poison" object. - */ - BadKeyInput() { - this.keyInHexFormat = null; - this.poison = true; - } - - boolean isPoison() { - return poison; - } + public String execute(int parallelism, String badKeyFileIn, String badKeyFileOut) { + ExecutorService badKeyReaderService; + ExecutorService badKeyWriterService; + ExecutorService consistencyFixWorkers; + + // Create BadKeyWriter thread + BlockingQueue badKeyQOut = new ArrayBlockingQueue(parallelism * 10); + badKeyWriterService = Executors.newSingleThreadExecutor(); + badKeyWriterService.submit(new BadKeyWriter(badKeyFileOut, badKeyQOut)); + logger.info("Created badKeyWriter."); + + // Create ConsistencyFixWorker thread pool + BlockingQueue blockingQ = new ArrayBlockingQueue(parallelism); + RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + consistencyFixWorkers = new ThreadPoolExecutor(parallelism, + parallelism, + 0L, + TimeUnit.MILLISECONDS, + blockingQ, + rejectedExecutionHandler); + logger.info("Created ConsistencyFixWorker pool."); + + // Create BadKeyReader thread + CountDownLatch allBadKeysReadLatch = new CountDownLatch(1); + badKeyReaderService = Executors.newSingleThreadExecutor(); + badKeyReaderService.submit(new BadKeyReader(allBadKeysReadLatch, + badKeyFileIn, + this, + consistencyFixWorkers, + badKeyQOut)); + logger.info("Created badKeyReader."); - String getKey() { - return keyInHexFormat; + try { + allBadKeysReadLatch.await(); + + badKeyReaderService.shutdown(); + badKeyReaderService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + logger.info("Bad key reader service has shutdown."); + + consistencyFixWorkers.shutdown(); + consistencyFixWorkers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + logger.info("All workers have shutdown."); + + // Poison the bad key writer to have it exit. + badKeyQOut.put(new BadKeyResult()); + badKeyWriterService.shutdown(); + badKeyWriterService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + logger.info("Bad key writer service has shutdown."); + } catch(InterruptedException e) { + logger.warn("InterruptedException caught."); + if(logger.isDebugEnabled()) { + e.printStackTrace(); + } + } finally { + adminClient.stop(); } + return stats.summary(); } /** @@ -164,19 +205,29 @@ public Status getResult() { } } - // TODO: Should either move BadKeyReader and BadKeyWriter thread definitions - // out of this file (as has been done with ConsistencyFixKeyGetter and - // ConsistencyFixRepairPutter), or move those thread definitions (back) into - // this file. - class BadKeyReader implements Runnable { + public class BadKeyReader implements Runnable { + private final CountDownLatch latch; private final String badKeyFileIn; - private final BlockingQueue badKeyQIn; + + private final ConsistencyFix consistencyFix; + private final ExecutorService consistencyFixWorkers; + private final BlockingQueue badKeyQOut; + private BufferedReader fileReader; - BadKeyReader(String badKeyFileIn, BlockingQueue badKeyQIn) { + BadKeyReader(CountDownLatch latch, + String badKeyFileIn, + ConsistencyFix consistencyFix, + ExecutorService consistencyFixWorkers, + BlockingQueue badKeyQOut) { + this.latch = latch; this.badKeyFileIn = badKeyFileIn; - this.badKeyQIn = badKeyQIn; + + this.consistencyFix = consistencyFix; + this.consistencyFixWorkers = consistencyFixWorkers; + this.badKeyQOut = badKeyQOut; + try { fileReader = new BufferedReader(new FileReader(badKeyFileIn)); } catch(IOException e) { @@ -191,31 +242,28 @@ public void run() { for(String line = fileReader.readLine(); line != null; line = fileReader.readLine()) { if(!line.isEmpty()) { counter++; - System.out.println("BadKeyReader read line: key (" + line - + ") and counter (" + counter + ")"); - badKeyQIn.put(new BadKeyInput(line)); + logger.debug("BadKeyReader read line: key (" + line + ") and counter (" + + counter + ")"); + consistencyFixWorkers.submit(new ConsistencyFixWorker(line, + consistencyFix, + badKeyQOut)); } } - System.out.println("BadKeyReader poisoning the pipeline"); - badKeyQIn.put(new BadKeyInput()); } catch(IOException ioe) { - System.err.println("IO exception reading badKeyFile " + badKeyFileIn + " : " - + ioe.getMessage()); - } catch(InterruptedException ie) { - System.err.println("Interrupted exception during reading of badKeyFile " - + badKeyFileIn + " : " + ie.getMessage()); + logger.warn("IO exception reading badKeyFile " + badKeyFileIn + " : " + + ioe.getMessage()); } finally { + latch.countDown(); try { fileReader.close(); } catch(IOException ioe) { - System.err.println("IOException during fileReader.close in BadKeyReader thread."); + logger.warn("IOException during fileReader.close in BadKeyReader thread."); } } - System.out.println("BadKeyReader is done."); } } - class BadKeyWriter implements Runnable { + public class BadKeyWriter implements Runnable { private final String badKeyFileOut; private final BlockingQueue badKeyQOut; @@ -238,304 +286,80 @@ public void run() { try { BadKeyResult badKeyResult = badKeyQOut.take(); while(!badKeyResult.isPoison()) { - System.out.println("BadKeyWriter write key (" + badKeyResult.keyInHexFormat); + logger.debug("BadKeyWriter write key (" + badKeyResult.keyInHexFormat + ")"); fileWriter.write("BADKEY," + badKeyResult.keyInHexFormat + "," + badKeyResult.fixKeyResult.name() + "\n"); badKeyResult = badKeyQOut.take(); } } catch(IOException ioe) { - System.err.println("IO exception reading badKeyFile " + badKeyFileOut + " : " - + ioe.getMessage()); + logger.warn("IO exception reading badKeyFile " + badKeyFileOut + " : " + + ioe.getMessage()); } catch(InterruptedException ie) { - System.err.println("Interrupted exception during writing of badKeyFile " - + badKeyFileOut + " : " + ie.getMessage()); + logger.warn("Interrupted exception during writing of badKeyFile " + badKeyFileOut + + " : " + ie.getMessage()); } finally { try { fileWriter.close(); } catch(IOException ioe) { - System.err.println("Interrupted exception during fileWriter.close:" - + ioe.getMessage()); + logger.warn("Interrupted exception during fileWriter.close:" + ioe.getMessage()); } } } } - // TODO: Make a type to handle Status + nodeIdToKeyValues so that - // nodeIdToKeyValues is not an "out" parameter. - /** - * - * @param nodeIdList - * @param keyInBytes - * @param keyInHexFormat - * @param verbose - * @param nodeIdToKeyValues Effectively the output of this method. Must pass - * in a non-null object to be populated by this method. - * @return - */ - private ConsistencyFix.Status doRead(final List nodeIdList, - final byte[] keyInBytes, - final String keyInHexFormat, - boolean verbose, - Map nodeIdToKeyValues) { - if(nodeIdToKeyValues == null) { - if(verbose) { - System.out.println("Aborting doRead due to bad init."); - } - return Status.BAD_INIT; - } + public class Stats { - if(verbose) { - System.out.println("Reading key-values for specified key: " + keyInHexFormat); - } - ByteArray key = new ByteArray(keyInBytes); - for(int nodeId: nodeIdList) { - List> values = null; - try { - values = adminClient.storeOps.getNodeKey(storeName, nodeId, key); - nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); - } catch(VoldemortException ve) { - nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); - } - } + final long progressBar; + long count; + long failures; + long lastTimeMs; + final long startTimeMs; - return Status.SUCCESS; - } - - // TODO: Make a type to handle Status + nodeValues so that nodeValue is not - // an "out" parameter. - /** - * - * @param nodeIdList - * @param keyAsByteArray - * @param keyInHexFormat - * @param verbose - * @param nodeIdToKeyValues - * @param nodeValues Effectively the output of this method. Must pass in a - * non-null object to be populated by this method. - * @return - */ - private ConsistencyFix.Status processReadReplies(final List nodeIdList, - final ByteArray keyAsByteArray, - final String keyInHexFormat, - boolean verbose, - final Map nodeIdToKeyValues, - List> nodeValues) { - if(nodeValues == null) { - if(verbose) { - System.out.println("Aborting processReadReplies due to bad init."); - } - return Status.BAD_INIT; + Stats(long progressBar) { + this.progressBar = progressBar; + this.count = 0; + this.failures = 0; + this.lastTimeMs = System.currentTimeMillis(); + this.startTimeMs = lastTimeMs; } - if(verbose) { - System.out.println("Confirming all nodes (" + nodeIdList - + ") responded with key-values for specified key: " + keyInHexFormat); + private synchronized String getPrettyQPS(long count, long ms) { + long periodS = TimeUnit.MILLISECONDS.toSeconds(ms); + double qps = (count * 1.0 / periodS); + DecimalFormat df = new DecimalFormat("0.##"); + return df.format(qps); } - boolean exceptionsEncountered = false; - for(int nodeId: nodeIdList) { - if(verbose) { - System.out.println("\t Processing response from node with id:" + nodeId); - } - QueryKeyResult keyValue; - if(nodeIdToKeyValues.containsKey(nodeId)) { - if(verbose) { - System.out.println("\t... There was a key-value returned from node with id:" - + nodeId); - } - keyValue = nodeIdToKeyValues.get(nodeId); - if(keyValue.hasException()) { - if(verbose) { - System.out.println("\t... Exception encountered while fetching key " - + keyInHexFormat + " from node with nodeId " + nodeId - + " : " + keyValue.getException().getMessage()); - } - exceptionsEncountered = true; - } else { - if(keyValue.getValues().isEmpty()) { - if(verbose) { - System.out.println("\t... Adding null version to nodeValues"); - } - Versioned versioned = new Versioned(null); - nodeValues.add(new NodeValue(nodeId, - keyValue.getKey(), - versioned)); - - } else { - for(Versioned value: keyValue.getValues()) { - if(verbose) { - System.out.println("\t... Adding following version to nodeValues: " - + value.getVersion()); - } - nodeValues.add(new NodeValue(nodeId, - keyValue.getKey(), - value)); - } - } - } - } else { - if(verbose) { - System.out.println("\t... No key-value returned from node with id:" + nodeId); - System.out.println("\t... Adding null version to nodeValues"); - } - Versioned versioned = new Versioned(null); - nodeValues.add(new NodeValue(nodeId, keyAsByteArray, versioned)); - } - } - if(exceptionsEncountered) { - if(verbose) { - System.out.println("Aborting fixKey because exceptions were encountered when fetching key-values."); + public synchronized void incrementCount() { + count++; + if(count % progressBar == 0) { + long nowTimeMs = System.currentTimeMillis(); + logger.info("Bad keys attempted to be processed count = " + count + " (" + + getPrettyQPS(progressBar, lastTimeMs - nowTimeMs) + " keys/second)"); + lastTimeMs = nowTimeMs; } - return Status.FETCH_EXCEPTION; } - return Status.SUCCESS; - } - - /** - * Decide on the specific key-value to write everywhere. - * - * @param verbose - * @param nodeValues - * @return The subset of entries from nodeValues that need to be repaired. - */ - private List> resolveReadConflicts(boolean verbose, - final List> nodeValues) { - - // Some cut-paste-and-modify coding from - // store/routed/action/AbstractReadRepair.java and - // store/routed/ThreadPoolRoutedStore.java - if(verbose) { - System.out.println("Resolving conflicts in responses."); - } - ReadRepairer readRepairer = new ReadRepairer(); - List> toReadRepair = Lists.newArrayList(); - for(NodeValue v: readRepairer.getRepairs(nodeValues)) { - Versioned versioned = Versioned.value(v.getVersioned().getValue(), - ((VectorClock) v.getVersion()).clone()); - if(verbose) { - System.out.println("\tAdding toReadRepair: key (" + v.getKey() + "), version (" - + versioned.getVersion() + ")"); - } - toReadRepair.add(new NodeValue(v.getNodeId(), v.getKey(), versioned)); - } - - if(verbose) { - System.out.println("Repair work to be done:"); - for(NodeValue nodeKeyValue: toReadRepair) { - System.out.println("\tRepair key " + nodeKeyValue.getKey() + "on node with id " - + nodeKeyValue.getNodeId() + " for version " - + nodeKeyValue.getVersion()); + public synchronized void incrementFailures() { + failures++; + if(failures % progressBar == 0) { + logger.info("Bad key failed to process count = " + failures); } } - return toReadRepair; - } - public class doKeyGetStatus { - - public final Status status; - public final List> nodeValues; - - doKeyGetStatus(Status status, List> nodeValues) { - this.status = status; - this.nodeValues = nodeValues; - } + public synchronized String summary() { + StringBuilder summary = new StringBuilder(); + summary.append("\n\n"); + summary.append("Consistency Fix Summary\n"); + summary.append("-----------------------\n"); + summary.append("Total keys processed: " + count + "\n"); + summary.append("Total keys processed that were not corrected: " + failures + "\n"); + long nowTimeMs = System.currentTimeMillis(); - doKeyGetStatus(Status status) { - this.status = status; - this.nodeValues = null; - } - } - - public ConsistencyFix.doKeyGetStatus doKeyGet(String keyInHexFormat, boolean verbose) { - if(verbose) { - System.out.println("Performing consistency fix of key: " + keyInHexFormat); - } - - // Initialization. - byte[] keyInBytes; - List nodeIdList = null; - int masterPartitionId = -1; - try { - keyInBytes = ByteUtils.fromHexString(keyInHexFormat); - masterPartitionId = storeInstance.getMasterPartitionId(keyInBytes); - nodeIdList = storeInstance.getReplicationNodeList(masterPartitionId); - } catch(Exception exception) { - if(verbose) { - System.out.println("Aborting fixKey due to bad init."); - exception.printStackTrace(); - } - return new doKeyGetStatus(Status.BAD_INIT); - } - ByteArray keyAsByteArray = new ByteArray(keyInBytes); - - // Read - Map nodeIdToKeyValues = new HashMap(); - Status fixKeyResult = doRead(nodeIdList, - keyInBytes, - keyInHexFormat, - verbose, - nodeIdToKeyValues); - if(fixKeyResult != Status.SUCCESS) { - return new doKeyGetStatus(fixKeyResult); - } - - // Process read replies - List> nodeValues = Lists.newArrayList(); - fixKeyResult = processReadReplies(nodeIdList, - keyAsByteArray, - keyInHexFormat, - verbose, - nodeIdToKeyValues, - nodeValues); - if(fixKeyResult != Status.SUCCESS) { - return new doKeyGetStatus(fixKeyResult); - } - - // Resolve conflicts - List> toReadRepair = resolveReadConflicts(verbose, nodeValues); - - return new doKeyGetStatus(Status.SUCCESS, toReadRepair); - } - - /** - * - * @param toReadRepair Effectively the output of this method. Must pass in a - * non-null object to be populated by this method. - * @param verbose - * @param vInstance - * @return - */ - public Status doRepairPut(final List> toReadRepair, boolean verbose) { - if(verbose) { - System.out.println("Performing repair work:"); - } - - boolean allRepairsSuccessful = true; - for(NodeValue nodeKeyValue: toReadRepair) { - if(verbose) { - System.out.println("\tDoing repair for node with id:" + nodeKeyValue.getNodeId()); - } - try { - adminClient.storeOps.putNodeKeyValue(storeName, nodeKeyValue); - } catch(ObsoleteVersionException ove) { - // NOOP. Treat OVE as success. - } catch(VoldemortException ve) { - allRepairsSuccessful = false; - System.out.println("\t... Repair of key " + nodeKeyValue.getKey() - + "on node with id " + nodeKeyValue.getNodeId() - + " for version " + nodeKeyValue.getVersion() - + " failed because of exception : " + ve.getMessage()); - } - } - if(!allRepairsSuccessful) { - if(verbose) { - System.err.println("Aborting fixKey because exceptions were encountered when reparing key-values."); - System.out.println("Fix failed..."); - } - return Status.REPAIR_EXCEPTION; + summary.append("Keys per second processed: " + + getPrettyQPS(count, nowTimeMs - startTimeMs) + "\n"); + return summary.toString(); } - return Status.SUCCESS; } } diff --git a/src/java/voldemort/utils/ConsistencyFixCLI.java b/src/java/voldemort/utils/ConsistencyFixCLI.java index 4cd942a954..9d4a47e395 100644 --- a/src/java/voldemort/utils/ConsistencyFixCLI.java +++ b/src/java/voldemort/utils/ConsistencyFixCLI.java @@ -17,17 +17,9 @@ package voldemort.utils; import java.io.IOException; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import joptsimple.OptionParser; import joptsimple.OptionSet; -import voldemort.utils.ConsistencyFix.BadKeyInput; -import voldemort.utils.ConsistencyFix.BadKeyResult; public class ConsistencyFixCLI { @@ -45,14 +37,15 @@ public static void printUsage(String errMessage) { private static class Options { - public final static int defaultParallelism = 2; + public final static int defaultParallelism = 8; + public final static long defaultProgressBar = 1000; public String url = null; public String storeName = null; public String badKeyFileIn = null; public String badKeyFileOut = null; - public int parallelism = 0; - public boolean verbose = false; + public int parallelism = defaultParallelism; + public long progressBar = defaultProgressBar; } /** @@ -84,13 +77,15 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) { + "Keys that are not mae consistent are output to this file.") .ofType(String.class); parser.accepts("parallelism") - .withOptionalArg() - .describedAs("Number of read and to repair in parallel. " - + "Up to 2X this value requests outstanding simultaneously. " + .withRequiredArg() + .describedAs("Number of consistency fix messages outstanding in parallel. " + "[Default value: " + Options.defaultParallelism + "]") .ofType(Integer.class); - - parser.accepts("verbose", "verbose"); + parser.accepts("progress-bar") + .withRequiredArg() + .describedAs("Number of operations between 'info' progress messages. " + + "[Default value: " + Options.defaultProgressBar + "]") + .ofType(Long.class); OptionSet optionSet = parser.parse(args); if(optionSet.hasArgument("help")) { @@ -121,85 +116,27 @@ private static ConsistencyFixCLI.Options parseArgs(String[] args) { options.storeName = (String) optionSet.valueOf("store"); options.badKeyFileOut = (String) optionSet.valueOf("bad-key-file-out"); options.badKeyFileIn = (String) optionSet.valueOf("bad-key-file-in"); - options.parallelism = Options.defaultParallelism; if(optionSet.has("parallelism")) { options.parallelism = (Integer) optionSet.valueOf("parallelism"); } - - if(optionSet.has("verbose")) { - options.verbose = true; + if(optionSet.has("progress-bar")) { + options.progressBar = (Long) optionSet.valueOf("progress-bar"); } return options; } - private static ExecutorService badKeyReaderService; - private static ExecutorService badKeyWriterService; - private static ExecutorService badKeyGetters; - private static ExecutorService repairPutters; - - // TODO: Should all of this executor service stuff be in this class? Or, in - // ConsistencyFix? - - // TODO: Did I do anything stupid to parallelize this work? I have much more - // machinery than I expected (four executor services!) and I am not - // particularly fond of the "poison" types used to tear down the threads - // that depend on BlockingQueues (BadKeyREader and BadKeyWriter). public static void main(String[] args) throws Exception { Options options = parseArgs(args); - ConsistencyFix consistencyFix = new ConsistencyFix(options.url, options.storeName); - System.out.println("Constructed the consistency fixer.."); - - BlockingQueue badKeyQIn = new ArrayBlockingQueue(1000); - badKeyReaderService = Executors.newSingleThreadExecutor(); - badKeyReaderService.submit(consistencyFix.new BadKeyReader(options.badKeyFileIn, badKeyQIn)); - System.out.println("Created badKeyReader."); - - BlockingQueue badKeyQOut = new ArrayBlockingQueue(1000); - badKeyWriterService = Executors.newSingleThreadExecutor(); - badKeyWriterService.submit(consistencyFix.new BadKeyWriter(options.badKeyFileOut, - badKeyQOut)); - System.out.println("Created badKeyWriter."); - - CountDownLatch latch = new CountDownLatch(options.parallelism); - badKeyGetters = Executors.newFixedThreadPool(options.parallelism); - repairPutters = Executors.newFixedThreadPool(options.parallelism); - System.out.println("Created getters & putters."); - - for(int i = 0; i < options.parallelism; i++) { - badKeyGetters.submit(new ConsistencyFixKeyGetter(latch, - consistencyFix, - repairPutters, - badKeyQIn, - badKeyQOut, - options.verbose)); - } + ConsistencyFix consistencyFix = new ConsistencyFix(options.url, + options.storeName, + options.progressBar); - latch.await(); - System.out.println("All badKeyGetters have completed."); - - badKeyReaderService.shutdown(); - badKeyReaderService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - System.out.println("Bad key reader service has shutdown."); - - badKeyGetters.shutdown(); - badKeyGetters.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - System.out.println("All badKeyGetters have shutdown."); - - repairPutters.shutdown(); - repairPutters.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - System.out.println("All repairPutters have shutdown."); - - // Poison the bad key writer. - badKeyQOut.put(consistencyFix.new BadKeyResult()); - badKeyWriterService.shutdown(); - badKeyWriterService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - System.out.println("Bad key writer service has shutdown."); - - consistencyFix.stop(); - System.out.println("Stopped the consistency fixer.."); + String summary = consistencyFix.execute(options.parallelism, + options.badKeyFileIn, + options.badKeyFileOut); + System.out.println(summary); } - } diff --git a/src/java/voldemort/utils/ConsistencyFixKeyGetter.java b/src/java/voldemort/utils/ConsistencyFixKeyGetter.java deleted file mode 100644 index 3b9ef09c21..0000000000 --- a/src/java/voldemort/utils/ConsistencyFixKeyGetter.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2013 LinkedIn, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package voldemort.utils; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; - -import voldemort.utils.ConsistencyFix.BadKeyInput; -import voldemort.utils.ConsistencyFix.BadKeyResult; -import voldemort.utils.ConsistencyFix.Status; - -// TODO: Move (back) into ConsistencyFix? -class ConsistencyFixKeyGetter implements Runnable { - - // TODO: Add stats shared across all getters (invocations, successes, - // etc.) - - private final CountDownLatch latch; - private final ConsistencyFix consistencyFix; - private final ExecutorService repairPuttersService; - private final BlockingQueue badKeyQIn; - private final BlockingQueue badKeyQOut; - private final boolean verbose; - - ConsistencyFixKeyGetter(CountDownLatch latch, - ConsistencyFix consistencyFix, - ExecutorService repairPuttersService, - BlockingQueue badKeyQIn, - BlockingQueue badKeyQOut, - boolean verbose) { - this.latch = latch; - this.consistencyFix = consistencyFix; - this.repairPuttersService = repairPuttersService; - this.badKeyQIn = badKeyQIn; - this.badKeyQOut = badKeyQOut; - this.verbose = verbose; - } - - private String myName() { - return Thread.currentThread().getName(); - } - - @Override - public void run() { - int counter = 0; - BadKeyInput badKeyInput = null; - - try { - badKeyInput = badKeyQIn.take(); - - while(!badKeyInput.isPoison()) { - counter++; - ConsistencyFix.doKeyGetStatus doKeyGetStatus = consistencyFix.doKeyGet(badKeyInput.getKey(), - verbose); - - if(doKeyGetStatus.status == Status.SUCCESS) { - repairPuttersService.submit(new ConsistencyFixRepairPutter(consistencyFix, - badKeyInput.getKey(), - badKeyQOut, - doKeyGetStatus.nodeValues, - verbose)); - } else { - badKeyQOut.put(consistencyFix.new BadKeyResult(badKeyInput.getKey(), - doKeyGetStatus.status)); - } - - badKeyInput = badKeyQIn.take(); - } - - // Done. Poison other KeyGetters! - badKeyQIn.put(consistencyFix.new BadKeyInput()); - } catch(InterruptedException ie) { - System.err.println("KeyGetter thread " + myName() + " interruped."); - } finally { - latch.countDown(); - } - System.err.println("Thread " + myName() + " has swallowed poison and has counter = " - + counter); - } - -} \ No newline at end of file diff --git a/src/java/voldemort/utils/ConsistencyFixRepairPutter.java b/src/java/voldemort/utils/ConsistencyFixRepairPutter.java deleted file mode 100644 index ef6a01de17..0000000000 --- a/src/java/voldemort/utils/ConsistencyFixRepairPutter.java +++ /dev/null @@ -1,49 +0,0 @@ -package voldemort.utils; - -import java.util.List; -import java.util.concurrent.BlockingQueue; - -import voldemort.store.routed.NodeValue; -import voldemort.utils.ConsistencyFix.BadKeyResult; -import voldemort.utils.ConsistencyFix.Status; - -// TODO: Move (back) into ConsistencyFix? -class ConsistencyFixRepairPutter implements Runnable { - - // TODO: Add stats shared across all putters (invocations, successes, - // etc.) - private final ConsistencyFix consistencyFix; - private final String keyInHexFormat; - private final BlockingQueue badKeyQOut; - private final List> nodeValues; - private final boolean verbose; - - ConsistencyFixRepairPutter(ConsistencyFix consistencyFix, - String keyInHexFormat, - BlockingQueue badKeyQOut, - List> nodeValues, - boolean verbose) { - this.consistencyFix = consistencyFix; - this.keyInHexFormat = keyInHexFormat; - this.badKeyQOut = badKeyQOut; - this.nodeValues = nodeValues; - this.verbose = verbose; - } - - private String myName() { - return Thread.currentThread().getName(); - } - - @Override - public void run() { - Status consistencyFixStatus = consistencyFix.doRepairPut(nodeValues, verbose); - if(consistencyFixStatus != Status.SUCCESS) { - try { - badKeyQOut.put(consistencyFix.new BadKeyResult(keyInHexFormat, consistencyFixStatus)); - } catch(InterruptedException ie) { - System.err.println("RepairPutter thread " + myName() + " interruped."); - } - } - } - -} \ No newline at end of file diff --git a/src/java/voldemort/utils/ConsistencyFixWorker.java b/src/java/voldemort/utils/ConsistencyFixWorker.java new file mode 100644 index 0000000000..a0adc5d73e --- /dev/null +++ b/src/java/voldemort/utils/ConsistencyFixWorker.java @@ -0,0 +1,286 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.client.protocol.admin.QueryKeyResult; +import voldemort.store.routed.NodeValue; +import voldemort.store.routed.ReadRepairer; +import voldemort.utils.ConsistencyFix.BadKeyResult; +import voldemort.utils.ConsistencyFix.Status; +import voldemort.versioning.ObsoleteVersionException; +import voldemort.versioning.VectorClock; +import voldemort.versioning.Versioned; + +import com.google.common.collect.Lists; + +class ConsistencyFixWorker implements Runnable { + + private static final Logger logger = Logger.getLogger(ConsistencyFixWorker.class); + + private final String keyInHexFormat; + private final ConsistencyFix consistencyFix; + private final BlockingQueue badKeyQOut; + + ConsistencyFixWorker(String keyInHexFormat, + ConsistencyFix consistencyFix, + BlockingQueue badKeyQOut) { + this.keyInHexFormat = keyInHexFormat; + this.consistencyFix = consistencyFix; + this.badKeyQOut = badKeyQOut; + } + + private String myName() { + return Thread.currentThread().getName() + "-" + ConsistencyFixWorker.class.getName(); + } + + @Override + public void run() { + logger.trace("About to process key " + keyInHexFormat + " (" + myName() + ")"); + Status status = doConsistencyFix(keyInHexFormat); + logger.trace("Finished processing key " + keyInHexFormat + " (" + myName() + ")"); + consistencyFix.getStats().incrementCount(); + + if(status != Status.SUCCESS) { + try { + badKeyQOut.put(consistencyFix.new BadKeyResult(keyInHexFormat, status)); + } catch(InterruptedException ie) { + logger.warn("Worker thread " + myName() + " interrupted."); + } + consistencyFix.getStats().incrementFailures(); + } + } + + public Status doConsistencyFix(String keyInHexFormat) { + + // Initialization. + byte[] keyInBytes; + List nodeIdList = null; + int masterPartitionId = -1; + try { + keyInBytes = ByteUtils.fromHexString(keyInHexFormat); + masterPartitionId = consistencyFix.getStoreInstance().getMasterPartitionId(keyInBytes); + nodeIdList = consistencyFix.getStoreInstance() + .getReplicationNodeList(masterPartitionId); + } catch(Exception exception) { + logger.info("Aborting fixKey due to bad init."); + if(logger.isDebugEnabled()) { + exception.printStackTrace(); + } + return Status.BAD_INIT; + } + ByteArray keyAsByteArray = new ByteArray(keyInBytes); + + // Do the reads + Map nodeIdToKeyValues = doReads(nodeIdList, + keyInBytes, + keyInHexFormat); + + // Process read replies (i.e., nodeIdToKeyValues) + ProcessReadRepliesResult result = processReadReplies(nodeIdList, + keyAsByteArray, + keyInHexFormat, + nodeIdToKeyValues); + if(result.status != Status.SUCCESS) { + return result.status; + } + + // Resolve conflicts indicated in nodeValues + List> toReadRepair = resolveReadConflicts(result.nodeValues); + + // Do the repairs + Status status = doRepairPut(toReadRepair); + + // return status of last operation (success or otherwise) + return status; + } + + /** + * + * @param nodeIdList + * @param keyInBytes + * @param keyInHexFormat + * @return + */ + private Map doReads(final List nodeIdList, + final byte[] keyInBytes, + final String keyInHexFormat) { + Map nodeIdToKeyValues = new HashMap(); + + ByteArray key = new ByteArray(keyInBytes); + for(int nodeId: nodeIdList) { + List> values = null; + try { + values = consistencyFix.getAdminClient().storeOps.getNodeKey(consistencyFix.getStoreName(), + nodeId, + key); + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, values)); + } catch(VoldemortException ve) { + nodeIdToKeyValues.put(nodeId, new QueryKeyResult(key, ve)); + } + } + + return nodeIdToKeyValues; + } + + /** + * Result of an invocation of processReadReplies + */ + private class ProcessReadRepliesResult { + + public final Status status; + public final List> nodeValues; + + /** + * Constructor for error status + */ + ProcessReadRepliesResult(Status status) { + this.status = status; + this.nodeValues = null; + } + + /** + * Constructor for success + */ + ProcessReadRepliesResult(List> nodeValues) { + this.status = Status.SUCCESS; + this.nodeValues = nodeValues; + } + } + + /** + * + * @param nodeIdList + * @param keyAsByteArray + * @param keyInHexFormat + * @param nodeIdToKeyValues + * @param nodeValues Effectively the output of this method. Must pass in a + * non-null object to be populated by this method. + * @return + */ + private ProcessReadRepliesResult processReadReplies(final List nodeIdList, + final ByteArray keyAsByteArray, + final String keyInHexFormat, + final Map nodeIdToKeyValues) { + List> nodeValues = new ArrayList>(); + boolean exceptionsEncountered = false; + for(int nodeId: nodeIdList) { + QueryKeyResult keyValue; + if(nodeIdToKeyValues.containsKey(nodeId)) { + keyValue = nodeIdToKeyValues.get(nodeId); + + if(keyValue.hasException()) { + logger.debug("Exception encountered while fetching key " + keyInHexFormat + + " from node with nodeId " + nodeId + " : " + + keyValue.getException().getMessage()); + exceptionsEncountered = true; + } else { + if(keyValue.getValues().isEmpty()) { + Versioned versioned = new Versioned(null); + nodeValues.add(new NodeValue(nodeId, + keyValue.getKey(), + versioned)); + + } else { + for(Versioned value: keyValue.getValues()) { + nodeValues.add(new NodeValue(nodeId, + keyValue.getKey(), + value)); + } + } + } + } else { + logger.debug("No key-value returned from node with id:" + nodeId); + Versioned versioned = new Versioned(null); + nodeValues.add(new NodeValue(nodeId, keyAsByteArray, versioned)); + } + } + if(exceptionsEncountered) { + logger.info("Aborting fixKey because exceptions were encountered when fetching key-values."); + return new ProcessReadRepliesResult(Status.FETCH_EXCEPTION); + } + + return new ProcessReadRepliesResult(nodeValues); + } + + /** + * Decide on the specific key-value to write everywhere. + * + * @param nodeValues + * @return The subset of entries from nodeValues that need to be repaired. + */ + private List> resolveReadConflicts(final List> nodeValues) { + + // Some cut-paste-and-modify coding from + // store/routed/action/AbstractReadRepair.java and + // store/routed/ThreadPoolRoutedStore.java + ReadRepairer readRepairer = new ReadRepairer(); + List> toReadRepair = Lists.newArrayList(); + for(NodeValue v: readRepairer.getRepairs(nodeValues)) { + Versioned versioned = Versioned.value(v.getVersioned().getValue(), + ((VectorClock) v.getVersion()).clone()); + toReadRepair.add(new NodeValue(v.getNodeId(), v.getKey(), versioned)); + } + + if(logger.isDebugEnabled()) { + for(NodeValue nodeKeyValue: toReadRepair) { + logger.debug("\tRepair key " + nodeKeyValue.getKey() + "on node with id " + + nodeKeyValue.getNodeId() + " for version " + + nodeKeyValue.getVersion()); + } + } + return toReadRepair; + } + + /** + * + * @param toReadRepair Effectively the output of this method. Must pass in a + * non-null object to be populated by this method. + * @return + */ + public Status doRepairPut(final List> toReadRepair) { + + boolean allRepairsSuccessful = true; + for(NodeValue nodeKeyValue: toReadRepair) { + try { + consistencyFix.getAdminClient().storeOps.putNodeKeyValue(consistencyFix.getStoreName(), + nodeKeyValue); + } catch(ObsoleteVersionException ove) { + // NOOP. Treat OVE as success. + } catch(VoldemortException ve) { + allRepairsSuccessful = false; + logger.debug("Repair of key " + nodeKeyValue.getKey() + "on node with id " + + nodeKeyValue.getNodeId() + " for version " + + nodeKeyValue.getVersion() + " failed because of exception : " + + ve.getMessage()); + } + } + if(!allRepairsSuccessful) { + logger.info("Aborting fixKey because exceptions were encountered when reparing key-values."); + return Status.REPAIR_EXCEPTION; + } + return Status.SUCCESS; + } +} \ No newline at end of file