From b0a000da5a8bf8d6d967b41e2ec04fe7aa2d03b7 Mon Sep 17 00:00:00 2001 From: Jay J Wylie Date: Thu, 7 Mar 2013 10:48:19 -0800 Subject: [PATCH] Actually adding files KeySamplerCLI and KeyVersionSamplerCLI. --- src/java/voldemort/utils/KeySamplerCLI.java | 407 ++++++++++++++++++ .../voldemort/utils/KeyVersionSamplerCLI.java | 324 ++++++++++++++ 2 files changed, 731 insertions(+) create mode 100644 src/java/voldemort/utils/KeySamplerCLI.java create mode 100644 src/java/voldemort/utils/KeyVersionSamplerCLI.java diff --git a/src/java/voldemort/utils/KeySamplerCLI.java b/src/java/voldemort/utils/KeySamplerCLI.java new file mode 100644 index 0000000000..f061abb25d --- /dev/null +++ b/src/java/voldemort/utils/KeySamplerCLI.java @@ -0,0 +1,407 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package voldemort.utils; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; + +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.AdminClientConfig; +import voldemort.cluster.Cluster; +import voldemort.cluster.Node; +import voldemort.store.StoreDefinition; + +/** + * The KeySamplerCLI tool samples keys for every partition for every store on a + * cluster. A distinct file of sampled keys is generated for each store. + * + * By default, the "first" key of each partition is sampled. Optional arguments + * control sampling more keys per partition, and skipping some keys on the + * server while sampling. + */ +public class KeySamplerCLI { + + private static Logger logger = Logger.getLogger(ConsistencyCheck.class); + + private final static int NODE_PARALLELISM = 8; + private final static int MAX_RECORDS = 1; + private final static int SKIP_RECORDS = 0; + + private final AdminClient adminClient; + private final Cluster cluster; + private final List storeDefinitions; + private final Map storeNameToKeyStringsMap; + + private final String outDir; + private final ExecutorService nodeSamplerService; + + private final int maxRecords; + private final int skipRecords; + + public KeySamplerCLI(String url, + String outDir, + int nodeParallelism, + int maxRecords, + int skipRecords) { + if(logger.isInfoEnabled()) { + logger.info("Connecting to bootstrap server: " + url); + } + this.adminClient = new AdminClient(url, new AdminClientConfig()); + this.cluster = adminClient.getAdminClientCluster(); + this.storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0).getValue(); + this.storeNameToKeyStringsMap = new HashMap(); + for(StoreDefinition storeDefinition: storeDefinitions) { + this.storeNameToKeyStringsMap.put(storeDefinition.getName(), new StringBuilder()); + } + + this.outDir = outDir; + + this.nodeSamplerService = Executors.newFixedThreadPool(nodeParallelism); + + this.maxRecords = maxRecords; + this.skipRecords = skipRecords; + } + + public boolean sampleStores() { + for(StoreDefinition storeDefinition: storeDefinitions) { + boolean success = sampleStore(storeDefinition); + if(!success) { + return false; + } + } + return true; + } + + public static class NodeSampleResult { + + public final boolean success; + public final String keyString; + + NodeSampleResult(boolean success, String keyString) { + this.success = success; + this.keyString = keyString; + } + } + + public class NodeSampler implements Callable { + + private final Node node; + private final StoreDefinition storeDefinition; + + NodeSampler(Node node, StoreDefinition storeDefinition) { + this.node = node; + this.storeDefinition = storeDefinition; + } + + @Override + public NodeSampleResult call() throws Exception { + boolean success = false; + + String storeName = storeDefinition.getName(); + StringBuilder hexKeyStrings = new StringBuilder(); + + for(int partitionId: node.getPartitionIds()) { + success = false; + + // Simple, lame throttling since thread is going at same node + // repeatedly + try { + Thread.sleep(100); + } catch(InterruptedException e) { + logger.warn("Sleep throttling interrupted : " + e.getMessage()); + e.printStackTrace(); + } + + String infoTag = "store " + storeName + ", partitionID " + partitionId + + " on node " + node.getId() + " [" + node.getHost() + "]"; + logger.info("Starting sample --- " + infoTag); + + List singlePartition = new ArrayList(); + singlePartition.add(partitionId); + + // Wrap fetchKeys in backoff-and-retry loop + int attempts = 0; + int backoffMs = 1000; + while(attempts < 5 && !success) { + try { + Iterator fetchIterator; + // TODO: should fetchMasterEntries be true? + fetchIterator = adminClient.bulkFetchOps.fetchKeys(node.getId(), + storeName, + singlePartition, + null, + false, + skipRecords, + maxRecords); + int keyCount = 0; + while(fetchIterator.hasNext()) { + ByteArray key = fetchIterator.next(); + String hexKeyString = ByteUtils.toHexString(key.get()); + hexKeyStrings.append(hexKeyString + "\n"); + keyCount++; + } + if(keyCount < maxRecords) { + logger.warn("Fewer keys (" + keyCount + ") than requested (" + + maxRecords + ") returned --- " + infoTag); + } + success = true; + } catch(VoldemortException ve) { + logger.warn("Caught VoldemortException and will retry (" + infoTag + "): " + + ve.getMessage() + " --- " + ve.getCause().getMessage()); + try { + Thread.sleep(backoffMs); + backoffMs *= 2; + } catch(InterruptedException e) { + logger.warn("Backoff-and-retry sleep interrupted : " + e.getMessage()); + e.printStackTrace(); + break; + } + } + } + if(!success) { + logger.error("Failed to sample --- " + infoTag); + break; + } + } + return new NodeSampleResult(success, hexKeyStrings.toString()); + } + } + + public boolean sampleStore(StoreDefinition storeDefinition) { + String storeName = storeDefinition.getName(); + String fileName = outDir + System.getProperty("file.separator") + storeName + ".keys"; + + File file = new File(fileName); + if(file.exists()) { + logger.warn("Key file " + fileName + " already exists. Skipping sampling store " + + storeName + "."); + return true; + } + + Writer keyWriter = null; + try { + keyWriter = new FileWriter(file); + + Map> results = new HashMap>(); + for(Node node: cluster.getNodes()) { + Future future = nodeSamplerService.submit(new NodeSampler(node, + storeDefinition)); + results.put(node, future); + } + + boolean success = true; + for(Node node: cluster.getNodes()) { + Future future = results.get(node); + if(!success) { + future.cancel(true); + continue; + } + + try { + NodeSampleResult nodeSampleResult = future.get(); + if(nodeSampleResult.success) { + keyWriter.write(nodeSampleResult.keyString); + } else { + success = false; + logger.error("Sampling on node " + node.getHost() + " of store " + + storeDefinition.getName() + " failed."); + } + } catch(ExecutionException ee) { + success = false; + logger.error("Encountered an execution exception on node " + node.getHost() + + " while sampling " + storeName + ": " + ee.getMessage()); + ee.printStackTrace(); + } catch(InterruptedException ie) { + success = false; + logger.error("Waiting for node " + node.getHost() + " to be sampled for store " + + storeName + ", but was interrupted: " + ie.getMessage()); + } + } + return success; + } catch(IOException e) { + logger.error("IOException encountered for store " + storeName + " : " + e.getMessage()); + return false; + } finally { + try { + keyWriter.close(); + } catch(IOException e) { + logger.error("IOException caught while trying to close keyWriter for store " + + storeName + " : " + e.getMessage()); + } + } + } + + public void stop() { + if(adminClient != null) { + adminClient.stop(); + } + nodeSamplerService.shutdown(); + } + + /** + * Return args parser + * + * @return program parser + * */ + private static OptionParser getParser() { + OptionParser parser = new OptionParser(); + parser.accepts("help", "print help information"); + parser.accepts("url", "[REQUIRED] bootstrap URL") + .withRequiredArg() + .describedAs("bootstrap-url") + .ofType(String.class); + parser.accepts("out-dir", + "[REQUIRED] Directory in which to output the key files (named \"{storeName}.keys\".") + .withRequiredArg() + .describedAs("outputDirectory") + .ofType(String.class); + parser.accepts("parallelism", + "Number of nodes to sample in parallel. [Default: " + NODE_PARALLELISM + + " ]") + .withRequiredArg() + .describedAs("storeParallelism") + .ofType(Integer.class); + parser.accepts("max-records", + "Number of keys sampled per partitoin. [Default: " + MAX_RECORDS + " ]") + .withRequiredArg() + .describedAs("maxRecords") + .ofType(Integer.class); + parser.accepts("skip-records", + "Number of keys to skip between samples (per partition). [Default: " + + SKIP_RECORDS + " ]") + .withRequiredArg() + .describedAs("maxRecords") + .ofType(Integer.class); + return parser; + } + + /** + * Print Usage to STDOUT + */ + private static void printUsage() { + StringBuilder help = new StringBuilder(); + help.append("KeySamplerCLI Tool\n"); + help.append(" Find one key from each store-partition. Output keys per store.\n"); + help.append("Options:\n"); + help.append(" Required:\n"); + help.append(" --url \n"); + help.append(" --out-dir \n"); + help.append(" Optional:\n"); + help.append(" --parallelism \n"); + help.append(" --max-records \n"); + help.append(" --skip-records \n"); + help.append(" --help\n"); + System.out.print(help.toString()); + } + + private static void printUsageAndDie(String errMessage) { + printUsage(); + Utils.croak("\n" + errMessage); + } + + // TODO: Add a "stores" option so that a subset of stores can be done + // instead of all stores one-by-one. + + // TODO: Add a "partitions" option so that a subset of partitions can be + // done instead of all partitions. + + public static void main(String[] args) throws Exception { + OptionSet options = null; + try { + options = getParser().parse(args); + } catch(OptionException oe) { + printUsageAndDie("Exception when parsing arguments : " + oe.getMessage()); + return; + } + + /* validate options */ + if(options.hasArgument("help")) { + printUsage(); + return; + } + if(!options.hasArgument("url") || !options.hasArgument("out-dir")) { + printUsageAndDie("Missing a required argument."); + return; + } + + String url = (String) options.valueOf("url"); + + String outDir = (String) options.valueOf("out-dir"); + Utils.mkdirs(new File(outDir)); + + Integer nodeParallelism = NODE_PARALLELISM; + if(options.hasArgument("parallelism")) { + nodeParallelism = (Integer) options.valueOf("parallelism"); + } + + Integer maxRecords = MAX_RECORDS; + if(options.hasArgument("max-records")) { + maxRecords = (Integer) options.valueOf("max-records"); + } + + Integer skipRecords = SKIP_RECORDS; + if(options.hasArgument("skip-records")) { + skipRecords = (Integer) options.valueOf("skip-records"); + } + + // TODO: Add a '--pid-server' and a '--unordered-server' option and + // require exactly one of them to be set. This forces the person + // invoking the command to determine if the servers can do per-partition + // sampling directly, or if many keys must be explicitly sampled so that + // determination of partition coverage is done client-side. + logger.warn("This tool is hard-coded to take advantage of servers that " + + "use PID style layout of data in BDB. " + + "Use fo this tool against other types of servers is undefined."); + + try { + KeySamplerCLI sampler = new KeySamplerCLI(url, + outDir, + nodeParallelism, + maxRecords, + skipRecords); + try { + if(!sampler.sampleStores()) { + logger.error("Some stores were not successfully sampled."); + } + } finally { + sampler.stop(); + } + + } catch(Exception e) { + Utils.croak("Exception during key sampling: " + e.getMessage()); + } + + } +} diff --git a/src/java/voldemort/utils/KeyVersionSamplerCLI.java b/src/java/voldemort/utils/KeyVersionSamplerCLI.java new file mode 100644 index 0000000000..9ad132687d --- /dev/null +++ b/src/java/voldemort/utils/KeyVersionSamplerCLI.java @@ -0,0 +1,324 @@ +/* + * Copyright 2013 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package voldemort.utils; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; + +import org.apache.commons.codec.DecoderException; +import org.apache.log4j.Logger; + +import voldemort.client.protocol.admin.AdminClient; +import voldemort.client.protocol.admin.AdminClientConfig; +import voldemort.cluster.Cluster; +import voldemort.store.StoreDefinition; +import voldemort.versioning.Versioned; + +/** + * The KeyVersionSamplerCLI is a rudimentary tool that outputs a sampling of + * existing keys from a cluster. For each store in the cluster, a distinct file + * of keys to sample is expected. And, for each of these, a distint file of + * key-versions is generated. + * + */ +public class KeyVersionSamplerCLI { + + private static Logger logger = Logger.getLogger(ConsistencyCheck.class); + + private final static int KEY_PARALLELISM = 4; + + private final AdminClient adminClient; + private final Cluster cluster; + private final List storeDefinitions; + private final Map storeNameToKeyStringsMap; + + private final String inDir; + private final String outDir; + + private final ExecutorService kvSamplerService; + + public KeyVersionSamplerCLI(String url, String inDir, String outDir, int keyParallelism) { + if(logger.isInfoEnabled()) { + logger.info("Connecting to bootstrap server: " + url); + } + this.adminClient = new AdminClient(url, new AdminClientConfig()); + this.cluster = adminClient.getAdminClientCluster(); + this.storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0).getValue(); + this.storeNameToKeyStringsMap = new HashMap(); + for(StoreDefinition storeDefinition: storeDefinitions) { + this.storeNameToKeyStringsMap.put(storeDefinition.getName(), new StringBuilder()); + } + + this.inDir = inDir; + this.outDir = outDir; + + this.kvSamplerService = Executors.newFixedThreadPool(keyParallelism); + } + + public boolean sampleStores() { + for(StoreDefinition storeDefinition: storeDefinitions) { + if(!sampleStore(storeDefinition)) { + return false; + } + } + return true; + } + + public class KeyVersionSampler implements Callable { + + private final StoreInstance storeInstance; + private final byte[] key; + + KeyVersionSampler(StoreInstance storeInstance, byte[] key) { + this.storeInstance = storeInstance; + this.key = key; + } + + @Override + public String call() throws Exception { + String storeName = storeInstance.getStoreDefinition().getName(); + int masterPartitionId = storeInstance.getMasterPartitionId(key); + List replicatingNodeIds = storeInstance.getReplicationNodeList(masterPartitionId); + + int replicationOffset = 0; + StringBuilder sb = new StringBuilder(); + for(int replicatingNodeId: replicatingNodeIds) { + List> values = adminClient.storeOps.getNodeKey(storeName, + replicatingNodeId, + new ByteArray(key)); + sb.append(replicationOffset + " : " + ByteUtils.toHexString(key) + "\t"); + for(Versioned value: values) { + sb.append(value.getVersion().toString() + "\t"); + } + sb.append("\n"); + replicationOffset++; + } + return sb.toString(); + } + } + + public boolean sampleStore(StoreDefinition storeDefinition) { + String storeName = storeDefinition.getName(); + + String keysFileName = inDir + System.getProperty("file.separator") + storeName + ".keys"; + File keysFile = new File(keysFileName); + if(!keysFile.exists()) { + logger.error("Keys file " + keysFileName + "does not exist!"); + return false; + } + + String kvFileName = outDir + System.getProperty("file.separator") + storeName + ".kvs"; + File kvFile = new File(kvFileName); + if(kvFile.exists()) { + logger.info("Key-Version file " + kvFileName + + " exists, so will not sample keys from file " + keysFileName + "."); + return true; + } + + StoreInstance storeInstance = new StoreInstance(cluster, storeDefinition); + BufferedReader keyReader = null; + BufferedWriter kvWriter = null; + try { + keyReader = new BufferedReader(new FileReader(keysFileName)); + + Queue> futureKVs = new LinkedList>(); + for(String keyLine = keyReader.readLine(); keyLine != null; keyLine = keyReader.readLine()) { + byte[] keyInBytes = ByteUtils.fromHexString(keyLine.trim()); + + KeyVersionSampler kvSampler = new KeyVersionSampler(storeInstance, keyInBytes); + Future future = kvSamplerService.submit(kvSampler); + futureKVs.add(future); + } + + kvWriter = new BufferedWriter(new FileWriter(kvFileName)); + while(!futureKVs.isEmpty()) { + Future future = futureKVs.poll(); + String keyVersions = future.get(); + kvWriter.append(keyVersions); + } + + return true; + } catch(DecoderException de) { + logger.error("Could not decode key to sample for store " + storeName + " : " + + de.getMessage()); + return false; + } catch(IOException ioe) { + logger.error("IOException caught while sampling store " + storeName + " : " + + ioe.getMessage()); + return false; + } catch(InterruptedException ie) { + logger.error("InterruptedException caught while sampling store " + storeName + " : " + + ie.getMessage()); + return false; + } catch(ExecutionException ee) { + logger.error("Encountered an execution exception while sampling " + storeName + ": " + + ee.getMessage()); + ee.printStackTrace(); + return false; + } finally { + if(keyReader != null) { + try { + keyReader.close(); + } catch(IOException e) { + logger.error("IOException caught while trying to close keyReader for store " + + storeName + " : " + e.getMessage()); + e.printStackTrace(); + } + } + if(kvWriter != null) { + try { + kvWriter.close(); + } catch(IOException e) { + logger.error("IOException caught while trying to close kvWriter for store " + + storeName + " : " + e.getMessage()); + e.printStackTrace(); + } + } + } + } + + public void stop() { + if(adminClient != null) { + adminClient.stop(); + } + kvSamplerService.shutdown(); + } + + /** + * Return args parser + * + * @return program parser + * */ + private static OptionParser getParser() { + OptionParser parser = new OptionParser(); + parser.accepts("help", "print help information"); + parser.accepts("url", "[REQUIRED] bootstrap URL") + .withRequiredArg() + .describedAs("bootstrap-url") + .ofType(String.class); + parser.accepts("in-dir", + "[REQUIRED] Directory in which to find the input key files (named \"{storeName}.kvs\", generated by KeySamplerCLI.") + .withRequiredArg() + .describedAs("inputDirectory") + .ofType(String.class); + parser.accepts("out-dir", + "[REQUIRED] Directory in which to output the key files (named \"{storeName}.kvs\".") + .withRequiredArg() + .describedAs("outputDirectory") + .ofType(String.class); + parser.accepts("parallelism", + "Number of key-versions to sample in parallel. [Default: " + KEY_PARALLELISM + + " ]") + .withRequiredArg() + .describedAs("storeParallelism") + .ofType(Integer.class); + return parser; + } + + /** + * Print Usage to STDOUT + */ + private static void printUsage() { + StringBuilder help = new StringBuilder(); + help.append("KeySamplerCLI Tool\n"); + help.append(" Find one key from each store-partition. Output keys per store.\n"); + help.append("Options:\n"); + help.append(" Required:\n"); + help.append(" --url \n"); + help.append(" --in-dir \n"); + help.append(" --out-dir \n"); + help.append(" Optional:\n"); + help.append(" --parallelism \n"); + help.append(" --help\n"); + System.out.print(help.toString()); + } + + private static void printUsageAndDie(String errMessage) { + printUsage(); + Utils.croak("\n" + errMessage); + } + + public static void main(String[] args) throws Exception { + OptionSet options = null; + try { + options = getParser().parse(args); + } catch(OptionException oe) { + printUsageAndDie("Exception when parsing arguments : " + oe.getMessage()); + return; + } + + /* validate options */ + if(options.hasArgument("help")) { + printUsage(); + return; + } + if(!options.hasArgument("url") || !options.hasArgument("in-dir") + || !options.hasArgument("out-dir")) { + printUsageAndDie("Missing a required argument."); + return; + } + + String url = (String) options.valueOf("url"); + + String inDir = (String) options.valueOf("in-dir"); + Utils.mkdirs(new File(inDir)); + + String outDir = (String) options.valueOf("out-dir"); + Utils.mkdirs(new File(outDir)); + + Integer keyParallelism = KEY_PARALLELISM; + if(options.hasArgument("parallelism")) { + keyParallelism = (Integer) options.valueOf("parallelism"); + } + + try { + KeyVersionSamplerCLI sampler = new KeyVersionSamplerCLI(url, + inDir, + outDir, + keyParallelism); + + try { + if(!sampler.sampleStores()) { + logger.error("Key-versions were not successfully sampled from some stores."); + } + } finally { + sampler.stop(); + } + + } catch(Exception e) { + Utils.croak("Exception during key-version sampling: " + e.getMessage()); + } + + } +}