Skip to content
Browse files

Addressed review feedback and TODOs for KeyVersionSamplerCLI (and ren…

…amed it to KeyVersionFetcherCLI).

- mostly usability changes about command line options...
- one copyright fix
  • Loading branch information...
1 parent f5e8f5a commit 92b80c0488d839692ac5c9aa43545413cebbae78 @jayjwylie jayjwylie committed Mar 14, 2013
View
2 src/java/voldemort/client/protocol/pb/ProtoUtils.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2008-2009 LinkedIn, Inc
+ * Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
View
126 ...voldemort/utils/KeyVersionSamplerCLI.java → ...voldemort/utils/KeyVersionFetcherCLI.java
@@ -31,6 +31,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import joptsimple.OptionException;
import joptsimple.OptionParser;
@@ -46,20 +48,19 @@
import voldemort.store.StoreDefinition;
import voldemort.versioning.Versioned;
-// TODO: Rename KeyValueFetcher
-
/**
- * The KeyVersionSamplerCLI is a rudimentary tool that outputs a sampling of
+ * The KeyVersionFetcherCLI is a rudimentary tool that outputs a sampling of
* existing keys from a cluster. For each store in the cluster, a distinct file
* of keys to sample is expected. And, for each of these, a distint file of
* key-versions is generated.
*
*/
-public class KeyVersionSamplerCLI {
+public class KeyVersionFetcherCLI {
- private static Logger logger = Logger.getLogger(KeyVersionSamplerCLI.class);
+ private static Logger logger = Logger.getLogger(KeyVersionFetcherCLI.class);
- private final static int KEY_PARALLELISM = 4;
+ private final static int DEFAULT_KEY_PARALLELISM = 4;
+ private final static int DEFAULT_PROGRESS_PERIOD_OPS = 1000;
private final AdminClient adminClient;
private final Cluster cluster;
@@ -69,9 +70,18 @@
private final String inDir;
private final String outDir;
- private final ExecutorService kvSamplerService;
+ private final ExecutorService kvFetcherService;
+ private final int progressPeriodOps;
+
+ private final long startTimeMs;
+ private static AtomicInteger fetches = new AtomicInteger(0);
- public KeyVersionSamplerCLI(String url, String inDir, String outDir, int keyParallelism) {
+ public KeyVersionFetcherCLI(String url,
+ String inDir,
+ String outDir,
+ List<String> storeNames,
+ int keyParallelism,
+ int progressPeriodOps) {
if(logger.isInfoEnabled()) {
logger.info("Connecting to bootstrap server: " + url);
}
@@ -80,30 +90,69 @@ public KeyVersionSamplerCLI(String url, String inDir, String outDir, int keyPara
this.storeDefinitions = adminClient.metadataMgmtOps.getRemoteStoreDefList(0).getValue();
this.storeNameToKeyStringsMap = new HashMap<String, StringBuilder>();
for(StoreDefinition storeDefinition: storeDefinitions) {
- this.storeNameToKeyStringsMap.put(storeDefinition.getName(), new StringBuilder());
+ String storeName = storeDefinition.getName();
+ if(storeNames != null) {
+ if(!storeNames.contains(storeName)) {
+ logger.debug("Will not sample store "
+ + storeName
+ + " since it is not in list of storeNames provided on command line.");
+ continue;
+ }
+ }
+ this.storeNameToKeyStringsMap.put(storeName, new StringBuilder());
+ }
+
+ if(storeNames != null) {
+ List<String> badStoreNames = new LinkedList<String>();
+ for(String storeName: storeNames) {
+ if(!this.storeNameToKeyStringsMap.keySet().contains(storeName)) {
+ badStoreNames.add(storeName);
+ }
+ }
+ if(badStoreNames.size() > 0) {
+ Utils.croak("Some storeNames provided on the command line were not found on this cluster: "
+ + badStoreNames);
+ }
}
this.inDir = inDir;
this.outDir = outDir;
- this.kvSamplerService = Executors.newFixedThreadPool(keyParallelism);
+ this.kvFetcherService = Executors.newFixedThreadPool(keyParallelism);
+
+ this.progressPeriodOps = progressPeriodOps;
+ this.startTimeMs = System.currentTimeMillis();
}
public boolean sampleStores() {
for(StoreDefinition storeDefinition: storeDefinitions) {
- if(!sampleStore(storeDefinition)) {
- return false;
+ if(storeNameToKeyStringsMap.keySet().contains(storeDefinition.getName())) {
+ if(!sampleStore(storeDefinition)) {
+ return false;
+ }
}
}
return true;
}
- public class KeyVersionSampler implements Callable<String> {
+ public void updateFetchProgress() {
+ int curFetches = fetches.incrementAndGet();
+
+ if(0 == curFetches % progressPeriodOps) {
+ if(logger.isInfoEnabled()) {
+ long durationS = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()
+ - startTimeMs);
+ logger.info("Fetched " + curFetches + " in " + durationS + " seconds.");
+ }
+ }
+ }
+
+ public class KeyVersionFetcher implements Callable<String> {
private final StoreInstance storeInstance;
private final byte[] key;
- KeyVersionSampler(StoreInstance storeInstance, byte[] key) {
+ KeyVersionFetcher(StoreInstance storeInstance, byte[] key) {
this.storeInstance = storeInstance;
this.key = key;
}
@@ -127,6 +176,7 @@ public String call() throws Exception {
sb.append("\n");
replicationOffset++;
}
+ updateFetchProgress();
return sb.toString();
}
}
@@ -159,8 +209,8 @@ public boolean sampleStore(StoreDefinition storeDefinition) {
for(String keyLine = keyReader.readLine(); keyLine != null; keyLine = keyReader.readLine()) {
byte[] keyInBytes = ByteUtils.fromHexString(keyLine.trim());
- KeyVersionSampler kvSampler = new KeyVersionSampler(storeInstance, keyInBytes);
- Future<String> future = kvSamplerService.submit(kvSampler);
+ KeyVersionFetcher kvFetcher = new KeyVersionFetcher(storeInstance, keyInBytes);
+ Future<String> future = kvFetcherService.submit(kvFetcher);
futureKVs.add(future);
}
@@ -215,7 +265,7 @@ public void stop() {
if(adminClient != null) {
adminClient.close();
}
- kvSamplerService.shutdown();
+ kvFetcherService.shutdown();
}
/**
@@ -231,7 +281,7 @@ private static OptionParser getParser() {
.describedAs("bootstrap-url")
.ofType(String.class);
parser.accepts("in-dir",
- "[REQUIRED] Directory in which to find the input key files (named \"{storeName}.kvs\", generated by KeySamplerCLI.")
+ "[REQUIRED] Directory in which to find the input key files (named \"{storeName}.kvs\", generated by KeyFetcherCLI.")
.withRequiredArg()
.describedAs("inputDirectory")
.ofType(String.class);
@@ -240,12 +290,24 @@ private static OptionParser getParser() {
.withRequiredArg()
.describedAs("outputDirectory")
.ofType(String.class);
+ parser.accepts("store-names",
+ "Store names to sample. Comma delimited list or singleton. [Default: ALL]")
+ .withRequiredArg()
+ .describedAs("storeNames")
+ .withValuesSeparatedBy(',')
+ .ofType(String.class);
parser.accepts("parallelism",
- "Number of key-versions to sample in parallel. [Default: " + KEY_PARALLELISM
- + " ]")
+ "Number of key-versions to sample in parallel. [Default: "
+ + DEFAULT_KEY_PARALLELISM + " ]")
.withRequiredArg()
.describedAs("storeParallelism")
.ofType(Integer.class);
+ parser.accepts("progress-period-ops",
+ "Number of operations between progress info is displayed. [Default: "
+ + DEFAULT_PROGRESS_PERIOD_OPS + " ]")
+ .withRequiredArg()
+ .describedAs("progressPeriodOps")
+ .ofType(Integer.class);
return parser;
}
@@ -254,15 +316,17 @@ private static OptionParser getParser() {
*/
private static void printUsage() {
StringBuilder help = new StringBuilder();
- help.append("KeySamplerCLI Tool\n");
+ help.append("KeyFetcherCLI Tool\n");
help.append(" Find one key from each store-partition. Output keys per store.\n");
help.append("Options:\n");
help.append(" Required:\n");
help.append(" --url <bootstrap-url>\n");
help.append(" --in-dir <inputDirectory>\n");
help.append(" --out-dir <outputDirectory>\n");
help.append(" Optional:\n");
+ help.append(" --store-names <storeName>[,<storeName>...]\n");
help.append(" --parallelism <keyParallelism>\n");
+ help.append(" --progress-period-ops <progressPeriodOps>\n");
help.append(" --help\n");
System.out.print(help.toString());
}
@@ -309,16 +373,30 @@ public static void main(String[] args) throws Exception {
String outDir = (String) options.valueOf("out-dir");
Utils.mkdirs(new File(outDir));
- Integer keyParallelism = KEY_PARALLELISM;
+ List<String> storeNames = null;
+ if(options.hasArgument("store-names")) {
+ @SuppressWarnings("unchecked")
+ List<String> list = (List<String>) options.valuesOf("store-names");
+ storeNames = list;
+ }
+
+ Integer keyParallelism = DEFAULT_KEY_PARALLELISM;
if(options.hasArgument("parallelism")) {
keyParallelism = (Integer) options.valueOf("parallelism");
}
+ Integer progressPeriodOps = DEFAULT_PROGRESS_PERIOD_OPS;
+ if(options.hasArgument("progress-period-ops")) {
+ progressPeriodOps = (Integer) options.valueOf("progress-period-ops");
+ }
+
try {
- KeyVersionSamplerCLI sampler = new KeyVersionSamplerCLI(url,
+ KeyVersionFetcherCLI sampler = new KeyVersionFetcherCLI(url,
inDir,
outDir,
- keyParallelism);
+ storeNames,
+ keyParallelism,
+ progressPeriodOps);
try {
if(!sampler.sampleStores()) {

0 comments on commit 92b80c0

Please sign in to comment.
Something went wrong with that request. Please try again.