Merge branch 'master' of github.com:voldemort/voldemort

2 parents 994c51b + 3ba4e35 · commit 683873e458603c4a44c8eb152239dd3153db8969 · @afeinberg committed Aug 13, 2010
Showing with 448 additions and 352 deletions.
  1. +2 −2 build.xml
  2. +1 −1 contrib/ec2-testing/src/java/voldemort/utils/app/VoldemortApp.java
  3. +0 −84 contrib/ec2-testing/test/voldemort/utils/Ec2InstanceRemoteTestUtils.java
  4. +1 −1 contrib/hadoop-store-builder/src/java/voldemort/store/readwrite/benchmark/BuildTestRWStore.java
  5. +52 −7 ...adoop-store-builder/src/java/voldemort/store/readwrite/mr/AbstractRWHadoopStoreBuilderMapper.java
  6. +25 −43 contrib/hadoop-store-builder/src/java/voldemort/store/readwrite/mr/HadoopRWStoreBuilder.java
  7. +38 −24 contrib/hadoop-store-builder/src/java/voldemort/store/readwrite/mr/HadoopRWStoreBuilderReducer.java
  8. +29 −19 contrib/hadoop-store-builder/src/java/voldemort/store/readwrite/mr/HadoopRWStoreJobRunner.java
  9. +1 −1 contrib/hadoop-store-builder/test/voldemort/store/readwrite/mr/HadoopRWStoreBuilderTest.java
  10. +212 −131 src/java/voldemort/VoldemortAdminTool.java
  11. +0 −4 src/java/voldemort/server/scheduler/SchedulerService.java
  12. +5 −0 src/java/voldemort/server/storage/StorageService.java
  13. +48 −1 src/java/voldemort/store/readonly/swapper/AdminStoreSwapper.java
  14. +0 −1 src/java/voldemort/store/rebalancing/RedirectingStore.java
  15. +1 −1 src/java/voldemort/utils/RebalanceUtils.java
  16. +1 −0 src/java/voldemort/xml/StoreDefinitionsMapper.java
  17. +16 −15 test/integration/voldemort/performance/benchmark/Benchmark.java
  18. +1 −1 test/integration/voldemort/performance/benchmark/Measurement.java
  19. +4 −6 test/integration/voldemort/performance/benchmark/VoldemortWrapper.java
  20. +7 −0 test/unit/voldemort/client/AdminServiceBasicTest.java
  21. +0 −2 test/unit/voldemort/routing/ZoneRoutingStrategyTest.java
  22. +0 −1 test/unit/voldemort/server/gossip/GossiperTest.java
  23. +1 −3 test/unit/voldemort/store/readonly/ReadOnlyUtilsTest.java
  24. +3 −3 test/unit/voldemort/store/routed/NodeValueTest.java
  25. +0 −1 test/unit/voldemort/utils/RebalanceUtilsTest.java
4 build.xml
@@ -231,7 +231,7 @@
</junitreport>
</target>
- <target name="ec2testing-junit" depends="contrib-jar" description="Run EC2 testing contrib junit tests.">
+ <target name="ec2testing-junit" depends="all" description="Run EC2 testing contrib junit tests.">
<copy todir="${testclasses.dir}">
<fileset dir="${contrib.root.dir}/ec2-testing/resources" />
</copy>
@@ -245,7 +245,7 @@
<formatter type="xml" />
<batchtest fork="yes" todir="${contribtestreport.dir}">
<fileset dir="${contrib.classes.dir}">
- <include name="**/Ec2*Test.class" />
+ <include name="**/Ec2SmokeTest.class" />
</fileset>
</batchtest>
</junit>
2 contrib/ec2-testing/src/java/voldemort/utils/app/VoldemortApp.java
@@ -41,6 +41,7 @@
import com.xerox.amazonws.ec2.RegionInfo;
+@SuppressWarnings("unchecked")
public abstract class VoldemortApp {
protected final OptionParser parser = new OptionParser();
@@ -77,7 +78,6 @@ protected OptionSet parse(String[] args) {
}
}
- @SuppressWarnings("unchecked")
protected void setLogging(OptionSet options) {
// "Options are \"debug\", \"info\" (default), \"warn\", \"error\", or \"off\"")
String levelString = CmdUtils.valueOf(options, "logging", "info");
84 contrib/ec2-testing/test/voldemort/utils/Ec2InstanceRemoteTestUtils.java
@@ -1,84 +0,0 @@
-/*
- * Copyright 2009 LinkedIn, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package voldemort.utils;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.FileUtils;
-
-import voldemort.utils.impl.TypicaEc2Connection;
-
-public class Ec2InstanceRemoteTestUtils {
-
- public static List<HostNamePair> createInstances(String ec2AccessId,
- String ec2SecretKey,
- String ec2Ami,
- String ec2KeyPairId,
- int count) throws Exception {
- Ec2Connection ec2 = new TypicaEc2Connection(ec2AccessId, ec2SecretKey);
- // TODO : fix me
- // return ec2.create(ec2Ami, ec2KeyPairId,
- // Ec2Connection.Ec2InstanceType.DEFAULT, count);
-
- return null;
- }
-
- public static void destroyInstances(String ec2AccessId,
- String ec2SecretKey,
- List<String> hostNames) throws Exception {
- Ec2Connection ec2 = new TypicaEc2Connection(ec2AccessId, ec2SecretKey);
- // TODO : fix me
- // ec2.delete(hostNames);
- }
-
- public static List<HostNamePair> listInstances(String ec2AccessId, String ec2SecretKey)
- throws Exception {
- Ec2Connection ec2 = new TypicaEc2Connection(ec2AccessId, ec2SecretKey);
- return ec2.list();
- }
-
- public static Map<String, Integer> generateClusterDescriptor(List<HostNamePair> hostNamePairs,
- String clusterName,
- String path) throws Exception {
- List<String> hostNames = new ArrayList<String>();
-
- for(HostNamePair hostNamePair: hostNamePairs)
- hostNames.add(hostNamePair.getInternalHostName());
-
- ClusterGenerator clusterGenerator = new ClusterGenerator();
- List<ClusterNodeDescriptor> nodes = clusterGenerator.createClusterNodeDescriptors(hostNames,
- 3);
- String clusterXml = clusterGenerator.createClusterDescriptor(clusterName, nodes);
- FileUtils.writeStringToFile(new File(path), clusterXml);
- Map<String, Integer> nodeIds = new HashMap<String, Integer>();
-
- for(ClusterNodeDescriptor node: nodes) {
- // OK, yeah, super-inefficient...
- for(HostNamePair hostNamePair: hostNamePairs) {
- if(node.getHostName().equals(hostNamePair.getInternalHostName()))
- nodeIds.put(hostNamePair.getExternalHostName(), node.getId());
- }
- }
-
- return nodeIds;
- }
-
-}
2 ...b/hadoop-store-builder/src/java/voldemort/store/readwrite/benchmark/BuildTestRWStore.java
@@ -90,7 +90,7 @@ public int run(String[] args) throws Exception {
SequenceFileInputFormat.class,
cluster,
def,
- (long) (1.5 * 1024 * 1024 * 1024),
+ 1,
new Path(tempDir),
new Path(inputDir));
builder.build();
59 ...ore-builder/src/java/voldemort/store/readwrite/mr/AbstractRWHadoopStoreBuilderMapper.java
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.security.MessageDigest;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.BytesWritable;
@@ -38,6 +39,8 @@
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.readonly.mr.AbstractStoreBuilderConfigurable;
import voldemort.utils.ByteUtils;
+import voldemort.versioning.ClockEntry;
+import voldemort.versioning.VectorClock;
/**
* A base class that can be used for building voldemort read-only stores. To use
@@ -61,8 +64,14 @@
private CompressionStrategy keyCompressor;
private SerializerDefinition keySerializerDefinition;
private SerializerDefinition valueSerializerDefinition;
- private int sizeInt = ByteUtils.SIZE_OF_INT;
+ private int sizeInt = ByteUtils.SIZE_OF_INT, vectorNodeId;
+ private long vectorNodeVersion, jobStartTime;
+ private List<ClockEntry> versions = new ArrayList<ClockEntry>();
+ private VectorClock vectorClock;
+ /**
+ * Can return a null in which case the record will be ignored
+ */
public abstract Object makeKey(K key, V value);
public abstract Object makeValue(K key, V value);
@@ -78,8 +87,14 @@ public void map(K key,
V value,
OutputCollector<BytesWritable, BytesWritable> output,
Reporter reporter) throws IOException {
- byte[] keyBytes = keySerializer.toBytes(makeKey(key, value));
- byte[] valBytes = valueSerializer.toBytes(makeValue(key, value));
+ Object keyObject = makeKey(key, value);
+ Object valueObject = makeValue(key, value);
+
+ if(keyObject == null || valueObject == null)
+ return;
+
+ byte[] keyBytes = keySerializer.toBytes(keyObject);
+ byte[] valBytes = valueSerializer.toBytes(valueObject);
// compress key and values if required
if(keySerializerDefinition.hasCompression()) {
@@ -90,8 +105,24 @@ public void map(K key,
valBytes = valueCompressor.deflate(valBytes);
}
- // Generate value
- byte[] outputValBytes = new byte[keyBytes.length + sizeInt + valBytes.length + sizeInt];
+ List<Node> nodeList = routingStrategy.routeRequest(keyBytes);
+
+ // Generate vector clock
+ versions.clear();
+ if(vectorNodeId < 0) {
+ // Use master node
+ versions.add(0, new ClockEntry((short) nodeList.get(0).getId(), vectorNodeVersion));
+ } else {
+ // Use node id specified
+ versions.add(0, new ClockEntry((short) vectorNodeId, vectorNodeVersion));
+ }
+ vectorClock = new VectorClock(versions, jobStartTime);
+ byte[] vectorClockBytes = vectorClock.toBytes();
+
+ // Generate mapper value
+ byte[] outputValBytes = new byte[(3 * sizeInt) + keyBytes.length + valBytes.length
+ + vectorClockBytes.length];
+
ByteUtils.writeInt(outputValBytes, keyBytes.length, 0);
System.arraycopy(keyBytes, 0, outputValBytes, sizeInt, keyBytes.length);
ByteUtils.writeInt(outputValBytes, valBytes.length, sizeInt + keyBytes.length);
@@ -100,11 +131,17 @@ public void map(K key,
outputValBytes,
sizeInt + sizeInt + keyBytes.length,
valBytes.length);
+ ByteUtils.writeInt(outputValBytes, vectorClockBytes.length, (2 * sizeInt) + keyBytes.length
+ + valBytes.length);
+ System.arraycopy(vectorClockBytes,
+ 0,
+ outputValBytes,
+ (3 * sizeInt) + keyBytes.length + valBytes.length,
+ vectorClockBytes.length);
BytesWritable outputVal = new BytesWritable(outputValBytes);
- // Generate key
+ // Generate mapper key & output
int chunkId = ReadOnlyUtils.chunk(md5er.digest(keyBytes), getNumChunks());
- List<Node> nodeList = routingStrategy.routeRequest(keyBytes);
for(Node node: nodeList) {
byte[] outputKeyBytes = new byte[sizeInt + sizeInt];
ByteUtils.writeInt(outputKeyBytes, node.getId(), 0);
@@ -143,5 +180,13 @@ public void configure(JobConf conf) {
RoutingStrategyFactory factory = new RoutingStrategyFactory();
routingStrategy = factory.updateRoutingStrategy(getStoreDef(), getCluster());
+
+ vectorNodeId = conf.getInt("vector.node.id", -1);
+ vectorNodeVersion = conf.getLong("vector.node.version", 1L);
+
+ jobStartTime = conf.getLong("job.start.time.ms", -1);
+ if(jobStartTime < 0) {
+ throw new RuntimeException("Incorrect job start time");
+ }
}
}
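
The mapper's output value now carries the vector clock alongside the key and value bytes. Below is a minimal, self-contained sketch of that layout — [keyLen][key][valLen][val][clockLen][clock], each length a 4-byte int — using java.nio.ByteBuffer in place of Voldemort's ByteUtils; the class and method names are illustrative.

    import java.nio.ByteBuffer;

    public class MapperValueLayoutDemo {

        // Packs the three fields in exactly the order the mapper writes them.
        static byte[] pack(byte[] key, byte[] val, byte[] clock) {
            ByteBuffer buf = ByteBuffer.allocate(12 + key.length + val.length + clock.length);
            buf.putInt(key.length).put(key);
            buf.putInt(val.length).put(val);
            buf.putInt(clock.length).put(clock);
            return buf.array();
        }

        public static void main(String[] args) {
            byte[] packed = pack(new byte[] { 1, 2 }, new byte[] { 3 }, new byte[] { 4, 5, 6 });
            System.out.println(packed.length); // 3 ints (12 bytes) + 6 payload bytes = 18
        }
    }
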
68 contrib/hadoop-store-builder/src/java/voldemort/store/readwrite/mr/HadoopRWStoreBuilder.java
@@ -16,11 +16,9 @@
package voldemort.store.readwrite.mr;
-import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -61,37 +59,39 @@
private final StoreDefinition storeDef;
private final Path inputPath;
private final Path tempPath;
- private final long chunkSizeBytes, hadoopPushVersion;
- private final int hadoopNodeId;
+ private final int reducersPerNode, vectorNodeId;
+ private final long vectorNodeVersion;
+ @SuppressWarnings("unchecked")
public HadoopRWStoreBuilder(Configuration conf,
Class<? extends AbstractRWHadoopStoreBuilderMapper<?, ?>> mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
- long chunkSizeBytes,
+ int reducersPerNode,
Path tempPath,
Path inputPath) {
this(conf,
mapperClass,
inputFormatClass,
cluster,
storeDef,
- chunkSizeBytes,
- cluster.getNumberOfNodes(),
+ reducersPerNode,
+ -1,
1L,
tempPath,
inputPath);
}
+ @SuppressWarnings("unchecked")
public HadoopRWStoreBuilder(Configuration conf,
Class<? extends AbstractRWHadoopStoreBuilderMapper<?, ?>> mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
- long chunkSizeBytes,
- int hadoopNodeId,
- long hadoopPushVersion,
+ int reducersPerNode,
+ int vectorNodeId,
+ long vectorNodeVersion,
Path tempPath,
Path inputPath) {
this.config = conf;
@@ -101,12 +101,11 @@ public HadoopRWStoreBuilder(Configuration conf,
this.cluster = Utils.notNull(cluster);
this.storeDef = Utils.notNull(storeDef);
this.tempPath = Utils.notNull(tempPath);
- this.hadoopNodeId = hadoopNodeId;
- this.hadoopPushVersion = hadoopPushVersion;
- this.chunkSizeBytes = chunkSizeBytes;
- if(chunkSizeBytes > MAX_CHUNK_SIZE || chunkSizeBytes < MIN_CHUNK_SIZE)
- throw new VoldemortException("Invalid chunk size, chunk size must be in the range "
- + MIN_CHUNK_SIZE + "..." + MAX_CHUNK_SIZE);
+ this.vectorNodeId = vectorNodeId;
+ this.vectorNodeVersion = vectorNodeVersion;
+ this.reducersPerNode = reducersPerNode;
+ if(reducersPerNode < 0)
+ throw new VoldemortException("Number of reducers cannot be negative");
}
/**
@@ -118,8 +117,8 @@ public void build() {
conf.set("cluster.xml", new ClusterMapper().writeCluster(cluster));
conf.set("stores.xml",
new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef)));
- conf.setInt("hadoop.node.id", this.hadoopNodeId);
- conf.setLong("hadoop.push.version", this.hadoopPushVersion);
+ conf.setInt("vector.node.id", this.vectorNodeId);
+ conf.setLong("vector.node.version", this.vectorNodeVersion);
conf.setLong("job.start.time.ms", System.currentTimeMillis());
conf.setPartitionerClass(HadoopRWStoreBuilderPartitioner.class);
@@ -133,6 +132,7 @@ public void build() {
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(BytesWritable.class);
conf.setOutputValueClass(BytesWritable.class);
+ conf.setReduceSpeculativeExecution(false);
conf.setJarByClass(getClass());
FileInputFormat.setInputPaths(conf, inputPath);
@@ -143,18 +143,13 @@ public void build() {
FileSystem tempFs = tempPath.getFileSystem(conf);
tempFs.delete(tempPath, true);
- long size = sizeOfPath(tempFs, inputPath);
- int numChunks = Math.max((int) (storeDef.getReplicationFactor() * size
- / cluster.getNumberOfNodes() / chunkSizeBytes), 1);
- logger.info("Data size = " + size + ", replication factor = "
- + storeDef.getReplicationFactor() + ", numNodes = "
- + cluster.getNumberOfNodes() + ", chunk size = " + chunkSizeBytes
- + ", num.chunks = " + numChunks);
- conf.setInt("num.chunks", numChunks);
- int numReduces = cluster.getNumberOfNodes() * numChunks;
- conf.setNumReduceTasks(numReduces);
-
- logger.info("Number of reduces: " + numReduces);
+ conf.setInt("num.chunks", reducersPerNode);
+ int numReducers = cluster.getNumberOfNodes() * reducersPerNode;
+ logger.info("Replication factor = " + storeDef.getReplicationFactor() + ", numNodes = "
+ + cluster.getNumberOfNodes() + ", reducers per node = " + reducersPerNode
+ + ", numReducers = " + numReducers);
+ conf.setNumReduceTasks(numReducers);
+
logger.info("Building RW store...");
JobClient.runJob(conf);
@@ -164,17 +159,4 @@ public void build() {
}
- private long sizeOfPath(FileSystem fs, Path path) throws IOException {
- long size = 0;
- FileStatus[] statuses = fs.listStatus(path);
- if(statuses != null) {
- for(FileStatus status: statuses) {
- if(status.isDir())
- size += sizeOfPath(fs, status.getPath());
- else
- size += status.getLen();
- }
- }
- return size;
- }
}
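
The builder no longer sizes chunks from the input data: num.chunks is set directly to reducersPerNode, and the reduce-task count is simply nodes × reducersPerNode. A tiny sketch of that arithmetic with hypothetical numbers:

    // Hypothetical: a 6-node cluster built with 2 reducers per node.
    public class ReducerCountDemo {

        public static void main(String[] args) {
            int numNodes = 6, reducersPerNode = 2;
            int numReducers = numNodes * reducersPerNode;
            // conf.setInt("num.chunks", reducersPerNode);
            // conf.setNumReduceTasks(numReducers);
            System.out.println("numReducers = " + numReducers); // 12
        }
    }
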
62 ...doop-store-builder/src/java/voldemort/store/readwrite/mr/HadoopRWStoreBuilderReducer.java
@@ -17,9 +17,7 @@
package voldemort.store.readwrite.mr;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -34,7 +32,6 @@
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;
-import voldemort.versioning.ClockEntry;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
@@ -52,12 +49,15 @@
private long totalBytes;
private long transferStartTime;
private int sizeInt = ByteUtils.SIZE_OF_INT;
- private VectorClock vectorClock;
+
+ protected static enum RecordCounter {
+ RECORDS_STREAMED;
+ }
public void reduce(BytesWritable key,
final Iterator<BytesWritable> values,
OutputCollector<BytesWritable, BytesWritable> output,
- Reporter reporter) throws IOException {
+ final Reporter reporter) throws IOException {
this.transferStartTime = System.nanoTime();
this.iterator = new AbstractIterator<Pair<ByteArray, Versioned<byte[]>>>() {
@@ -66,33 +66,54 @@ public void reduce(BytesWritable key,
protected Pair<ByteArray, Versioned<byte[]>> computeNext() {
while(values.hasNext()) {
BytesWritable keyValue = values.next();
- byte[] tempKeyValueBytes = new byte[keyValue.get().length];
- System.arraycopy(keyValue.get(), 0, tempKeyValueBytes, 0, keyValue.get().length);
+ byte[] keyValueBytes = new byte[keyValue.get().length];
+ System.arraycopy(keyValue.get(), 0, keyValueBytes, 0, keyValue.get().length);
// Reading key
- int keyBytesLength = ByteUtils.readInt(tempKeyValueBytes, 0);
+ int keyBytesLength = ByteUtils.readInt(keyValueBytes, 0);
byte[] keyBytes = new byte[keyBytesLength];
- System.arraycopy(tempKeyValueBytes, sizeInt, keyBytes, 0, keyBytesLength);
+ System.arraycopy(keyValueBytes, sizeInt, keyBytes, 0, keyBytesLength);
// Reading value
- int valueBytesLength = ByteUtils.readInt(tempKeyValueBytes, sizeInt
- + keyBytesLength);
+ int valueBytesLength = ByteUtils.readInt(keyValueBytes, sizeInt
+ + keyBytesLength);
byte[] valueBytes = new byte[valueBytesLength];
- System.arraycopy(tempKeyValueBytes,
+ System.arraycopy(keyValueBytes,
sizeInt + sizeInt + keyBytesLength,
valueBytes,
0,
valueBytesLength);
- totalBytes += (keyBytesLength + valueBytesLength);
+ // Reading vector clock
+ int vectorClockBytesLength = ByteUtils.readInt(keyValueBytes,
+ sizeInt + sizeInt
+ + keyBytesLength
+ + valueBytesLength);
+ byte[] vectorClockBytes = new byte[vectorClockBytesLength];
+ System.arraycopy(keyValueBytes,
+ sizeInt + sizeInt + sizeInt + keyBytesLength
+ + valueBytesLength,
+ vectorClockBytes,
+ 0,
+ vectorClockBytesLength);
+ VectorClock vectorClock = new VectorClock(vectorClockBytes);
+
+ totalBytes += (keyBytesLength + valueBytesLength + vectorClockBytesLength);
+
+ // Generating output
ByteArray key = new ByteArray(keyBytes);
Versioned<byte[]> versioned = Versioned.value(valueBytes, vectorClock);
+
+ reporter.incrCounter(RecordCounter.RECORDS_STREAMED, 1);
return new Pair<ByteArray, Versioned<byte[]>>(key, versioned);
}
return endOfData();
}
};
+ logger.info("Connecting to admin client on " + this.nodeId + " - chunk id - "
+ + this.chunkId);
this.client.updateEntries(this.nodeId, getStoreName(), this.iterator, null);
+ logger.info("Completed transfer of chunk id " + this.chunkId + " to node " + this.nodeId);
}
@Override
@@ -101,21 +122,14 @@ public void configure(JobConf job) {
this.totalBytes = 0;
this.chunkId = job.getInt("mapred.task.partition", -1); // http://www.mail-archive.com/core-user@hadoop.apache.org/msg05749.html
- int hadoopNodeId = job.getInt("hadoop.node.id", -1);
- long hadoopPushVersion = job.getLong("hadoop.push.version", -1L);
- long jobStartTime = job.getLong("job.start.time.ms", -1);
- if(this.chunkId < 0 || hadoopPushVersion < 0 || hadoopNodeId < 0 || jobStartTime < 0) {
- throw new RuntimeException("Incorrect chunk id / hadoop push version / hadoop node id / job start time");
+
+ if(this.chunkId < 0) {
+ throw new RuntimeException("Incorrect chunk id ");
}
this.nodeId = this.chunkId / getNumChunks();
- List<ClockEntry> versions = new ArrayList<ClockEntry>();
- versions.add(0, new ClockEntry((short) hadoopNodeId, hadoopPushVersion));
- vectorClock = new VectorClock(versions, jobStartTime);
-
- logger.info("Working on Node id - " + this.nodeId + " and chunk id - " + this.chunkId);
this.client = new AdminClient(getCluster(), new AdminClientConfig());
- logger.info("Opening partition admin client " + this.nodeId);
+ logger.info("Reducer for Node id - " + this.nodeId + " and chunk id - " + this.chunkId);
}
@Override
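
The reducer inverts the mapper's packing and derives its target node from the reduce partition. A minimal sketch, again substituting ByteBuffer for ByteUtils for self-containment; names are illustrative:

    import java.nio.ByteBuffer;

    public class ReducerUnpackDemo {

        // mapred.task.partition runs 0..(numNodes * numChunks - 1), so
        // integer division by numChunks recovers the target node id.
        static int nodeIdFor(int chunkId, int numChunks) {
            return chunkId / numChunks;
        }

        // Reads [keyLen][key][valLen][val][clockLen][clock] back out.
        static byte[][] unpack(byte[] packed) {
            ByteBuffer buf = ByteBuffer.wrap(packed);
            byte[] key = new byte[buf.getInt()];
            buf.get(key);
            byte[] val = new byte[buf.getInt()];
            buf.get(val);
            byte[] clock = new byte[buf.getInt()];
            buf.get(clock);
            return new byte[][] { key, val, clock };
        }

        public static void main(String[] args) {
            System.out.println(nodeIdFor(7, 2)); // chunk 7, 2 chunks per node -> node 3
        }
    }
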
48 ...ib/hadoop-store-builder/src/java/voldemort/store/readwrite/mr/HadoopRWStoreJobRunner.java
@@ -53,7 +53,7 @@
import com.google.common.collect.ImmutableCollection;
/**
- * A runner class to facitilate the launching of HadoopStoreBuilder from the
+ * A runner class to facilitate the launching of HadoopStoreBuilder from the
* command-line.
*/
@SuppressWarnings("deprecation")
@@ -82,11 +82,16 @@ private static OptionParser configureParser() {
parser.accepts("storedefinitions", "local path to stores.xml.").withRequiredArg();
parser.accepts("storename", "store name from store definition.").withRequiredArg();
parser.accepts("inputformat", "JavaClassName (default=text).").withRequiredArg();
- parser.accepts("chunksize", "maximum size of a chunk in bytes.").withRequiredArg();
+ parser.accepts("reducerspernode", "number of reducers per node (default=1)")
+ .withRequiredArg()
+ .ofType(Integer.class);
parser.accepts("jar", "mapper class jar if not in $HADOOP_CLASSPATH.").withRequiredArg();
- parser.accepts("hadoopnodeid", "node id for hadoop (default=num_nodes+1)")
- .withRequiredArg();
- parser.accepts("pushversion", "version of push (default=1)").withRequiredArg();
+ parser.accepts("vectornodeid", "node id whose vector clock to set (default=master)")
+ .withRequiredArg()
+ .ofType(Integer.class);
+ parser.accepts("vectorversion", "version of vector clock (default=1)")
+ .withRequiredArg()
+ .ofType(Long.class);
parser.accepts("help", "print usage information");
return parser;
}
@@ -108,8 +113,7 @@ public int run(String[] args) throws Exception {
"mapper",
"cluster",
"storedefinitions",
- "storename",
- "chunksize");
+ "storename");
if(missing.size() > 0) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing)
@@ -124,7 +128,6 @@ public int run(String[] args) throws Exception {
String storeName = (String) options.valueOf("storename");
Path inputPath = new Path((String) options.valueOf("input"));
Path tempPath = new Path((String) options.valueOf("temp"));
- long chunkSizeBytes = Long.parseLong((String) options.valueOf("chunksize"));
List<StoreDefinition> stores;
stores = new StoreDefinitionsMapper().readStoreList(new BufferedReader(new FileReader(storeDefFile)));
@@ -166,18 +169,25 @@ public int run(String[] args) throws Exception {
inputFormatClass = TextInputFormat.class;
}
- int hadoopNodeId;
- if(options.has("hadoopnodeid")) {
- hadoopNodeId = Integer.parseInt((String) options.valueOf("hadoopnodeid"));
+ int vectorNodeId;
+ if(options.has("vectornodeid")) {
+ vectorNodeId = (Integer) options.valueOf("vectornodeid");
} else {
- hadoopNodeId = (short) cluster.getNumberOfNodes();
+ vectorNodeId = -1; // To denote master
}
- long pushVersion;
- if(options.has("pushversion")) {
- pushVersion = Long.parseLong((String) options.valueOf("pushversion"));
+ long vectorNodeVersion;
+ if(options.has("vectorversion")) {
+ vectorNodeVersion = (Long) options.valueOf("vectorversion");
} else {
- pushVersion = 1L;
+ vectorNodeVersion = 1L;
+ }
+
+ int reducersPerNode;
+ if(options.has("reducerspernode")) {
+ reducersPerNode = (Integer) options.valueOf("reducerspernode");
+ } else {
+ reducersPerNode = 1;
}
Configuration conf = getConf();
@@ -194,9 +204,9 @@ public int run(String[] args) throws Exception {
inputFormatClass,
cluster,
storeDef,
- chunkSizeBytes,
- hadoopNodeId,
- pushVersion,
+ reducersPerNode,
+ vectorNodeId,
+ vectorNodeVersion,
tempPath,
inputPath);
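
The runner's new options are typed via jopt-simple's ofType(...), so values come back as Integer/Long and the manual Integer.parseInt/Long.parseLong calls go away. A minimal standalone sketch of that pattern, with an illustrative class name:

    import joptsimple.OptionParser;
    import joptsimple.OptionSet;

    public class TypedOptionDemo {

        public static void main(String[] args) {
            OptionParser parser = new OptionParser();
            parser.accepts("reducerspernode", "number of reducers per node (default=1)")
                  .withRequiredArg()
                  .ofType(Integer.class);

            OptionSet options = parser.parse(args);
            // valueOf returns the converted object; a cast replaces parseInt.
            int reducersPerNode = options.has("reducerspernode")
                    ? (Integer) options.valueOf("reducerspernode")
                    : 1;
            System.out.println("reducersPerNode = " + reducersPerNode);
        }
    }
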
2 contrib/hadoop-store-builder/test/voldemort/store/readwrite/mr/HadoopRWStoreBuilderTest.java
@@ -132,7 +132,7 @@ public void testHadoopBuild() throws Exception {
TextInputFormat.class,
cluster,
storeDef,
- 60 * 1000,
+ 1,
hadoopNodeId,
hadoopPushVersion,
new Path(tempDir.getAbsolutePath()),
343 src/java/voldemort/VoldemortAdminTool.java
@@ -28,6 +28,7 @@
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.List;
@@ -64,10 +65,12 @@
import com.google.common.collect.Maps;
/**
- * Provides a command line interface to the {@link voldemort.client.protocol.admin.AdminClient}
+ * Provides a command line interface to the
+ * {@link voldemort.client.protocol.admin.AdminClient}
*/
public class VoldemortAdminTool {
- public static void main (String [] args) throws Exception {
+
+ public static void main(String[] args) throws Exception {
OptionParser parser = new OptionParser();
parser.accepts("help", "print help information");
parser.accepts("url", "[REQUIRED] bootstrap URL")
@@ -94,7 +97,7 @@ public static void main (String [] args) throws Exception {
.describedAs("partition-ids")
.withValuesSeparatedBy(',')
.ofType(Integer.class);
- parser.accepts("fetch-entries", "[EXPERIMENTAL] Fetch full entries")
+ parser.accepts("fetch-entries", "Fetch full entries")
.withRequiredArg()
.describedAs("partition-ids")
.withValuesSeparatedBy(',')
@@ -120,20 +123,22 @@ public static void main (String [] args) throws Exception {
.withRequiredArg()
.describedAs("input-directory")
.ofType(String.class);
-
+ parser.accepts("get-metadata", "retreive metadata information [stores.xml | cluster.xml]")
+ .withRequiredArg()
+ .describedAs("metadata-key")
+ .ofType(String.class);
+
OptionSet options = parser.parse(args);
- if (options.has("help")) {
+ if(options.has("help")) {
parser.printHelpOn(System.out);
System.exit(0);
}
- Set<String> missing = CmdUtils.missing(options,
- "url",
- "node");
- if (missing.size() > 0) {
+ Set<String> missing = CmdUtils.missing(options, "url", "node");
+ if(missing.size() > 0) {
// Not the most elegant way to do this
- if (!(missing.equals(ImmutableSet.of("node")) && (options.has("add-stores") || options.has("delete-store")))) {
+ if(!(missing.equals(ImmutableSet.of("node")) && (options.has("add-stores") || options.has("delete-store")))) {
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
parser.printHelpOn(System.err);
System.exit(1);
@@ -147,58 +152,62 @@ public static void main (String [] args) throws Exception {
AdminClient adminClient = new AdminClient(url, new AdminClientConfig());
String ops = "";
- if (options.has("delete-partitions")) {
+ if(options.has("delete-partitions")) {
ops += "d";
}
- if (options.has("fetch-keys")) {
+ if(options.has("fetch-keys")) {
ops += "k";
}
- if (options.has("fetch-entries")) {
+ if(options.has("fetch-entries")) {
ops += "v";
}
- if (options.has("restore")) {
+ if(options.has("restore")) {
ops += "r";
}
- if (options.has("add-stores")) {
+ if(options.has("add-stores")) {
ops += "a";
}
- if (options.has("update-entries")) {
+ if(options.has("update-entries")) {
ops += "u";
}
- if (options.has("delete-store")) {
+ if(options.has("delete-store")) {
ops += "s";
}
+ if(options.has("get-metadata")) {
+ ops += "g";
+ }
if(ops.length() < 1) {
- Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, fetch-keys, add-stores, delete-store, update-entries) must be specified");
+ Utils.croak("At least one of (delete-partitions, restore, add-node, fetch-entries, fetch-keys, add-stores, delete-store, update-entries, get-metadata) must be specified");
}
List<String> storeNames = null;
- if (options.has("stores")) {
- // For some reason one can't just do @SuppressWarnings without identifier following it
+ if(options.has("stores")) {
+ // For some reason one can't just do @SuppressWarnings without
+ // identifier following it
@SuppressWarnings("unchecked")
List<String> temp = (List<String>) options.valuesOf("stores");
storeNames = temp;
}
try {
- if (ops.contains("d")) {
+ if(ops.contains("d")) {
System.out.println("Starting delete-partitions");
@SuppressWarnings("unchecked")
List<Integer> partitionIdList = (List<Integer>) options.valuesOf("delete-partitions");
executeDeletePartitions(nodeId, adminClient, partitionIdList, storeNames);
System.out.println("Finished delete-partitions");
}
- if (ops.contains("r")) {
+ if(ops.contains("r")) {
System.out.println("Starting restore");
adminClient.restoreDataFromReplications(nodeId, parallelism);
System.err.println("Finished restore");
}
- if (ops.contains("k")) {
- if (!options.has("outdir")) {
- Utils.croak("Directory name (outdir) must be specified for fetch-keys");
+ if(ops.contains("k")) {
+ String outputDir = null;
+ if(options.has("outdir")) {
+ outputDir = (String) options.valueOf("outdir");
}
- String outputDir = (String) options.valueOf("outdir");
boolean useAscii = options.has("ascii");
System.out.println("Starting fetch keys");
@SuppressWarnings("unchecked")
@@ -210,41 +219,58 @@ public static void main (String [] args) throws Exception {
storeNames,
useAscii);
}
- if (ops.contains("v")) {
- if (!options.has("outdir")) {
- Utils.croak("Directory name (outdir) must be specified for fetch-values");
+ if(ops.contains("v")) {
+ String outputDir = null;
+ if(options.has("outdir")) {
+ outputDir = (String) options.valueOf("outdir");
}
- String outputDir = (String) options.valueOf("outdir");
boolean useAscii = options.has("ascii");
@SuppressWarnings("unchecked")
- List<Integer> partitionIdList = (List<Integer>) options.valuesOf("fetch-values");
+ List<Integer> partitionIdList = (List<Integer>) options.valuesOf("fetch-entries");
executeFetchEntries(nodeId,
- adminClient,
- partitionIdList,
- outputDir,
- storeNames,
- useAscii);
+ adminClient,
+ partitionIdList,
+ outputDir,
+ storeNames,
+ useAscii);
}
- if (ops.contains("a")) {
+ if(ops.contains("a")) {
String storesXml = (String) options.valueOf("add-stores");
executeAddStores(adminClient, storesXml, storeNames);
}
- if (ops.contains("u")) {
+ if(ops.contains("u")) {
String inputDir = (String) options.valueOf("update-entries");
boolean useAscii = options.has("ascii");
executeUpdateEntries(nodeId, adminClient, storeNames, inputDir, useAscii);
-
}
if(ops.contains("s")) {
String storeName = (String) options.valueOf("delete-store");
executeDeleteStore(adminClient, storeName);
}
+ if(ops.contains("g")) {
+ String metadataKey = (String) options.valueOf("get-metadata");
+ executeGetMetadata(nodeId, adminClient, metadataKey);
+ }
} catch(Exception e) {
e.printStackTrace();
Utils.croak(e.getMessage());
}
}
+ public static void executeGetMetadata(Integer nodeId,
+ AdminClient adminClient,
+ String metadataKey) {
+ Versioned<String> versioned = adminClient.getRemoteMetadata(nodeId, metadataKey);
+ if(versioned == null) {
+ System.out.println("null");
+ } else {
+ System.out.println(versioned.getVersion());
+ System.out.print(": ");
+ System.out.println(versioned.getValue());
+ System.out.println();
+ }
+ }
+
public static void executeDeleteStore(AdminClient adminClient, String storeName) {
System.out.println("Deleting " + storeName);
adminClient.deleteStore(storeName);
@@ -255,15 +281,15 @@ public static void executeAddStores(AdminClient adminClient,
List<String> storeNames) throws IOException {
List<StoreDefinition> storeDefinitionList = new StoreDefinitionsMapper().readStoreList(new File(storesXml));
Map<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
- for (StoreDefinition storeDefinition: storeDefinitionList) {
+ for(StoreDefinition storeDefinition: storeDefinitionList) {
storeDefinitionMap.put(storeDefinition.getName(), storeDefinition);
}
List<String> stores = storeNames;
- if (stores == null) {
+ if(stores == null) {
stores = Lists.newArrayList();
stores.addAll(storeDefinitionMap.keySet());
}
- for (String store: stores) {
+ for(String store: stores) {
System.out.println("Adding " + store);
adminClient.addStore(storeDefinitionMap.get(store));
}
@@ -276,35 +302,48 @@ public static void executeFetchEntries(Integer nodeId,
String outputDir,
List<String> storeNames,
boolean useAscii) throws IOException {
- List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId).getValue();
+
+ List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
+ .getValue();
Map<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
- for (StoreDefinition storeDefinition: storeDefinitionList) {
+ for(StoreDefinition storeDefinition: storeDefinitionList) {
storeDefinitionMap.put(storeDefinition.getName(), storeDefinition);
}
- File directory = new File(outputDir);
- if (directory.exists() || directory.mkdir()) {
- List<String> stores = storeNames;
- if (stores == null) {
- stores = Lists.newArrayList();
- stores.addAll(storeDefinitionMap.keySet());
+ File directory = null;
+ if(outputDir != null) {
+ directory = new File(outputDir);
+ if(!(directory.exists() || directory.mkdir())) {
+ Utils.croak("Can't find or create directory " + outputDir);
+ }
+ }
+ List<String> stores = storeNames;
+ if(stores == null) {
+ stores = Lists.newArrayList();
+ stores.addAll(storeDefinitionMap.keySet());
+ }
+ for(String store: stores) {
+ System.out.println("Fetching entries in partitions "
+ + Joiner.on(", ").join(partitionIdList) + " of " + store);
+ Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(nodeId,
+ store,
+ partitionIdList,
+ null,
+ false);
+ File outputFile = null;
+ if(directory != null) {
+ outputFile = new File(directory, store + ".entries");
}
- for (String store: stores) {
- System.out.println("Fetching entries in partitions " + Joiner.on(", ").join(partitionIdList) + " of " + store);
- Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = adminClient.fetchEntries(nodeId,
- store,
- partitionIdList,
- null,
- false);
- File outputFile = new File(directory, store + ".entries");
- if (useAscii) {
- StoreDefinition storeDefinition = storeDefinitionMap.get(store);
- writeEntriesAscii(entriesIterator, outputFile, storeDefinition);
- } else {
- writeEntriesBinary(entriesIterator, outputFile);
- }
+ if(useAscii) {
+ StoreDefinition storeDefinition = storeDefinitionMap.get(store);
+ writeEntriesAscii(entriesIterator, outputFile, storeDefinition);
+ } else {
+ writeEntriesBinary(entriesIterator, outputFile);
}
+
+ if(outputFile != null)
+ System.out.println("Fetched keys from " + store + " to " + outputFile);
}
}
@@ -314,56 +353,57 @@ private static void executeUpdateEntries(Integer nodeId,
List<String> storeNames,
String inputDirPath,
boolean useAscii) throws IOException {
- List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId).getValue();
+ List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
+ .getValue();
Map<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
- for (StoreDefinition storeDefinition: storeDefinitionList) {
+ for(StoreDefinition storeDefinition: storeDefinitionList) {
storeDefinitionMap.put(storeDefinition.getName(), storeDefinition);
}
File inputDir = new File(inputDirPath);
- if (!inputDir.exists()) {
+ if(!inputDir.exists()) {
throw new FileNotFoundException("input directory " + inputDirPath + " doesn't exist");
}
- if (storeNames == null) {
+ if(storeNames == null) {
storeNames = Lists.newArrayList();
- for (File storeFile: inputDir.listFiles()) {
+ for(File storeFile: inputDir.listFiles()) {
String fileName = storeFile.getName();
- if (fileName.endsWith(".entries")) {
+ if(fileName.endsWith(".entries")) {
int extPosition = fileName.lastIndexOf(".entries");
storeNames.add(fileName.substring(0, extPosition));
}
}
}
- for (String storeName: storeNames) {
+ for(String storeName: storeNames) {
Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator;
- if (useAscii) {
+ if(useAscii) {
StoreDefinition storeDefinition = storeDefinitionMap.get(storeName);
- if (storeDefinition == null) {
+ if(storeDefinition == null) {
throw new IllegalArgumentException("No definition found for " + storeName);
}
- iterator = readEntriesAscii(inputDir,storeName,storeDefinition);
+ iterator = readEntriesAscii(inputDir, storeName, storeDefinition);
} else {
- iterator = readEntriesBinary(inputDir,storeName);
-
+ iterator = readEntriesBinary(inputDir, storeName);
}
adminClient.updateEntries(nodeId, storeName, iterator, null);
}
}
// TODO: implement this
- private static Iterator<Pair<ByteArray,Versioned<byte[]>>> readEntriesAscii(File inputDir,
- String storeName,
- StoreDefinition storeDefinition)
- throws IOException {
+ private static Iterator<Pair<ByteArray, Versioned<byte[]>>> readEntriesAscii(File inputDir,
+ String storeName,
+ StoreDefinition storeDefinition)
+ throws IOException {
File inputFile = new File(inputDir, storeName + ".entries");
if(!inputFile.exists()) {
- throw new FileNotFoundException("File " + inputFile.getAbsolutePath() + " does not exist!");
+ throw new FileNotFoundException("File " + inputFile.getAbsolutePath()
+ + " does not exist!");
}
- return new AbstractIterator<Pair<ByteArray, Versioned<byte[]>>> () {
+ return new AbstractIterator<Pair<ByteArray, Versioned<byte[]>>>() {
@Override
protected Pair<ByteArray, Versioned<byte[]>> computeNext() {
@@ -373,15 +413,18 @@ private static void executeUpdateEntries(Integer nodeId,
};
}
- private static Iterator<Pair<ByteArray,Versioned<byte[]>>> readEntriesBinary(File inputDir, String storeName)
- throws IOException {
+ private static Iterator<Pair<ByteArray, Versioned<byte[]>>> readEntriesBinary(File inputDir,
+ String storeName)
+ throws IOException {
File inputFile = new File(inputDir, storeName + ".entries");
- if (!inputFile.exists()) {
- throw new FileNotFoundException("File " + inputFile.getAbsolutePath() + " does not exist!");
+ if(!inputFile.exists()) {
+ throw new FileNotFoundException("File " + inputFile.getAbsolutePath()
+ + " does not exist!");
}
final DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(inputFile)));
- return new AbstractIterator<Pair<ByteArray, Versioned<byte[]>>> () {
+ return new AbstractIterator<Pair<ByteArray, Versioned<byte[]>>>() {
+
@Override
protected Pair<ByteArray, Versioned<byte[]>> computeNext() {
try {
@@ -400,17 +443,17 @@ private static void executeUpdateEntries(Integer nodeId,
Versioned<byte[]> value = new Versioned<byte[]>(valueBytes, version);
return new Pair<ByteArray, Versioned<byte[]>>(key, value);
- } catch (EOFException e) {
+ } catch(EOFException e) {
try {
dis.close();
- } catch (IOException ie) {
+ } catch(IOException ie) {
ie.printStackTrace();
}
return endOfData();
- } catch (IOException e) {
+ } catch(IOException e) {
try {
dis.close();
- } catch (IOException ie) {
+ } catch(IOException ie) {
ie.printStackTrace();
}
throw new VoldemortException("Error reading from input file ", e);
@@ -422,7 +465,12 @@ private static void executeUpdateEntries(Integer nodeId,
private static void writeEntriesAscii(Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator,
File outputFile,
StoreDefinition storeDefinition) throws IOException {
- BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile));
+ BufferedWriter writer = null;
+ if(outputFile != null) {
+ writer = new BufferedWriter(new FileWriter(outputFile));
+ } else {
+ writer = new BufferedWriter(new OutputStreamWriter(System.out));
+ }
SerializerFactory serializerFactory = new DefaultSerializerFactory();
StringWriter stringWriter = new StringWriter();
JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);
@@ -433,7 +481,7 @@ private static void writeEntriesAscii(Iterator<Pair<ByteArray, Versioned<byte[]>
Serializer<Object> valueSerializer = (Serializer<Object>) serializerFactory.getSerializer(storeDefinition.getValueSerializer());
try {
- while (iterator.hasNext()) {
+ while(iterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> kvPair = iterator.next();
byte[] keyBytes = kvPair.getFirst().get();
VectorClock version = (VectorClock) kvPair.getSecond().getVersion();
@@ -448,7 +496,7 @@ private static void writeEntriesAscii(Iterator<Pair<ByteArray, Versioned<byte[]>
generator.writeObject(valueObject);
StringBuffer buf = stringWriter.getBuffer();
- if (buf.charAt(0) == ' ') {
+ if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
writer.write(buf.toString());
@@ -462,9 +510,14 @@ private static void writeEntriesAscii(Iterator<Pair<ByteArray, Versioned<byte[]>
private static void writeEntriesBinary(Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator,
File outputFile) throws IOException {
- DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+ DataOutputStream dos = null;
+ if(outputFile != null) {
+ dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+ } else {
+ dos = new DataOutputStream(new BufferedOutputStream(System.out));
+ }
try {
- while (iterator.hasNext()) {
+ while(iterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> kvPair = iterator.next();
byte[] keyBytes = kvPair.getFirst().get();
byte[] versionBytes = ((VectorClock) kvPair.getSecond().getVersion()).toBytes();
@@ -480,61 +533,81 @@ private static void writeEntriesBinary(Iterator<Pair<ByteArray, Versioned<byte[]
dos.close();
}
}
-
+
public static void executeFetchKeys(Integer nodeId,
AdminClient adminClient,
List<Integer> partitionIdList,
String outputDir,
List<String> storeNames,
boolean useAscii) throws IOException {
- List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId).getValue();
+ List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
+ .getValue();
Map<String, StoreDefinition> storeDefinitionMap = Maps.newHashMap();
- for (StoreDefinition storeDefinition: storeDefinitionList) {
+ for(StoreDefinition storeDefinition: storeDefinitionList) {
storeDefinitionMap.put(storeDefinition.getName(), storeDefinition);
}
- File directory = new File(outputDir);
- if (directory.exists() || directory.mkdir()) {
- List<String> stores = storeNames;
- if (stores == null) {
- stores = Lists.newArrayList();
- stores.addAll(storeDefinitionMap.keySet());
+ File directory = null;
+ if(outputDir != null) {
+ directory = new File(outputDir);
+ if(!(directory.exists() || directory.mkdir())) {
+ Utils.croak("Can't find or create directory " + outputDir);
}
- for (String store: stores) {
- System.out.println("Fetching keys in partitions " + Joiner.on(", ").join(partitionIdList) + " of " + store);
- Iterator<ByteArray> keyIterator = adminClient.fetchKeys(nodeId, store, partitionIdList, null, false);
- File outputFile = new File(directory, store + ".keys");
- if (useAscii) {
- StoreDefinition storeDefinition = storeDefinitionMap.get(store);
- writeKeysAscii(keyIterator, outputFile, storeDefinition);
- } else {
- writeKeysBinary(keyIterator, outputFile);
- }
+ }
- System.out.println("Fetched keys from " + store + " to " + outputFile);
+ List<String> stores = storeNames;
+ if(stores == null) {
+ stores = Lists.newArrayList();
+ stores.addAll(storeDefinitionMap.keySet());
+ }
+ for(String store: stores) {
+ System.out.println("Fetching keys in partitions "
+ + Joiner.on(", ").join(partitionIdList) + " of " + store);
+ Iterator<ByteArray> keyIterator = adminClient.fetchKeys(nodeId,
+ store,
+ partitionIdList,
+ null,
+ false);
+ File outputFile = null;
+ if(directory != null) {
+ outputFile = new File(directory, store + ".keys");
}
- } else {
- Utils.croak("Can't find or create directory " + outputDir);
+
+ if(useAscii) {
+ StoreDefinition storeDefinition = storeDefinitionMap.get(store);
+ writeKeysAscii(keyIterator, outputFile, storeDefinition);
+ } else {
+ writeKeysBinary(keyIterator, outputFile);
+ }
+
+ if(outputFile != null)
+ System.out.println("Fetched keys from " + store + " to " + outputFile);
}
}
private static void writeKeysAscii(Iterator<ByteArray> keyIterator,
File outputFile,
StoreDefinition storeDefinition) throws IOException {
- BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile));
+ BufferedWriter writer = null;
+ if(outputFile != null) {
+ writer = new BufferedWriter(new FileWriter(outputFile));
+ } else {
+ writer = new BufferedWriter(new OutputStreamWriter(System.out));
+ }
+
SerializerFactory serializerFactory = new DefaultSerializerFactory();
StringWriter stringWriter = new StringWriter();
JsonGenerator generator = new JsonFactory(new ObjectMapper()).createJsonGenerator(stringWriter);
@SuppressWarnings("unchecked")
Serializer<Object> serializer = (Serializer<Object>) serializerFactory.getSerializer(storeDefinition.getKeySerializer());
try {
- while (keyIterator.hasNext()) {
+ while(keyIterator.hasNext()) {
// Ugly hack to be able to separate text by newlines vs. spaces
byte[] keyBytes = keyIterator.next().get();
Object keyObject = serializer.toObject(keyBytes);
generator.writeObject(keyObject);
StringBuffer buf = stringWriter.getBuffer();
- if (buf.charAt(0) == ' ') {
+ if(buf.charAt(0) == ' ') {
buf.setCharAt(0, '\n');
}
writer.write(buf.toString());
@@ -545,12 +618,18 @@ private static void writeKeysAscii(Iterator<ByteArray> keyIterator,
writer.close();
}
}
-
- private static void writeKeysBinary(Iterator<ByteArray> keyIterator, File outputFile) throws IOException {
- DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+
+ private static void writeKeysBinary(Iterator<ByteArray> keyIterator, File outputFile)
+ throws IOException {
+ DataOutputStream dos = null;
+ if(outputFile != null) {
+ dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile)));
+ } else {
+ dos = new DataOutputStream(new BufferedOutputStream(System.out));
+ }
try {
- while (keyIterator.hasNext()) {
+ while(keyIterator.hasNext()) {
byte[] keyBytes = keyIterator.next().get();
dos.writeInt(keyBytes.length);
dos.write(keyBytes);
@@ -565,16 +644,18 @@ public static void executeDeletePartitions(Integer nodeId,
List<Integer> partitionIdList,
List<String> storeNames) {
List<String> stores = storeNames;
- if (stores == null) {
+ if(stores == null) {
stores = Lists.newArrayList();
- List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId).getValue();
- for (StoreDefinition storeDefinition: storeDefinitionList) {
+ List<StoreDefinition> storeDefinitionList = adminClient.getRemoteStoreDefList(nodeId)
+ .getValue();
+ for(StoreDefinition storeDefinition: storeDefinitionList) {
stores.add(storeDefinition.getName());
}
}
-
- for (String store: stores) {
- System.out.println("Deleting partitions " + Joiner.on(", ").join(partitionIdList) + " of " + store);
+
+ for(String store: stores) {
+ System.out.println("Deleting partitions " + Joiner.on(", ").join(partitionIdList)
+ + " of " + store);
adminClient.deletePartitions(nodeId, store, partitionIdList, null);
}
}
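
Both fetch paths now treat outdir as optional: with no directory given, the tool streams to standard out instead of croaking. A minimal sketch of that selection (openWriter and the demo class are illustrative names):

    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;

    public class OutputTargetDemo {

        // A null outputFile means "no --outdir given": fall back to stdout.
        static BufferedWriter openWriter(File outputFile) throws IOException {
            if (outputFile != null) {
                return new BufferedWriter(new FileWriter(outputFile));
            }
            return new BufferedWriter(new OutputStreamWriter(System.out));
        }

        public static void main(String[] args) throws IOException {
            BufferedWriter writer = openWriter(null);
            writer.write("fetched entries would stream here\n");
            writer.flush();
        }
    }
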
4 src/java/voldemort/server/scheduler/SchedulerService.java
@@ -21,8 +21,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Logger;
-
import voldemort.annotations.jmx.JmxManaged;
import voldemort.server.AbstractService;
import voldemort.server.ServiceType;
@@ -36,8 +34,6 @@
@JmxManaged(description = "A service that runs scheduled jobs.")
public class SchedulerService extends AbstractService {
- private static final Logger logger = Logger.getLogger(SchedulerService.class.getName());
-
private final ScheduledThreadPoolExecutor scheduler;
private final Time time;
5 src/java/voldemort/server/storage/StorageService.java
@@ -256,6 +256,11 @@ public void unregisterEngine(StoreDefinition storeDef, StorageEngine<ByteArray,
}
}
+ if(voldemortConfig.isServerRoutingEnabled()) {
+ this.storeRepository.removeRoutedStore(engineName);
+ for(Node node: metadata.getCluster().getNodes())
+ this.storeRepository.removeNodeStore(storeDef.getName(), node.getId());
+ }
}
storeRepository.removeStorageEngine(engineName);
49 src/java/voldemort/store/readonly/swapper/AdminStoreSwapper.java
@@ -16,6 +16,10 @@
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
+import voldemort.utils.RebalanceUtils;
+import voldemort.versioning.Occured;
+import voldemort.versioning.VectorClock;
+import voldemort.versioning.Versioned;
public class AdminStoreSwapper extends StoreSwapper {
@@ -104,4 +108,47 @@ protected void invokeSwap(String storeName, List<String> fetchFiles) {
if(exception != null)
throw new VoldemortException(exception);
}
-}
+
+ public void invokeUpdateClusterMetadata() {
+ Versioned<Cluster> latestCluster = new Versioned<Cluster>(adminClient.getAdminClientCluster());
+ List<Integer> requiredNodeIds = new ArrayList<Integer>();
+
+ ArrayList<Versioned<Cluster>> clusterList = new ArrayList<Versioned<Cluster>>();
+ clusterList.add(latestCluster);
+
+ boolean sameCluster = true;
+ for(Node node: adminClient.getAdminClientCluster().getNodes()) {
+ requiredNodeIds.add(node.getId());
+ try {
+ Versioned<Cluster> versionedCluster = adminClient.getRemoteCluster(node.getId());
+ VectorClock newClock = (VectorClock) versionedCluster.getVersion();
+ if(null != newClock && !clusterList.contains(versionedCluster)) {
+
+ if(sameCluster
+ && !adminClient.getAdminClientCluster().equals(versionedCluster.getValue())) {
+ sameCluster = false;
+ }
+ // check no two clocks are concurrent.
+ RebalanceUtils.checkNotConcurrent(clusterList, newClock);
+
+ // add to clock list
+ clusterList.add(versionedCluster);
+
+ // update latestClock
+ Occured occured = newClock.compare(latestCluster.getVersion());
+ if(Occured.AFTER.equals(occured))
+ latestCluster = versionedCluster;
+ }
+ } catch(Exception e) {
+ throw new VoldemortException("Failed to get cluster metadata from node:" + node, e);
+ }
+ }
+
+ if(!sameCluster) {
+ VectorClock latestClock = (VectorClock) latestCluster.getVersion();
+ latestClock.incrementVersion(cluster.getNodes().iterator().next().getId(),
+ System.currentTimeMillis());
+ RebalanceUtils.propagateCluster(adminClient, cluster, latestClock, requiredNodeIds);
+ }
+ }
+}
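
invokeUpdateClusterMetadata picks the winning cluster.xml by vector-clock order: a clock that dominates compares AFTER and its cluster becomes latestCluster. A minimal sketch of that comparison using the same VectorClock API the diff exercises (node ids and timestamps are illustrative):

    import voldemort.versioning.VectorClock;

    public class ClockOrderDemo {

        public static void main(String[] args) {
            VectorClock older = new VectorClock();
            older.incrementVersion(0, System.currentTimeMillis()); // node 0 at version 1

            VectorClock newer = new VectorClock();
            newer.incrementVersion(0, System.currentTimeMillis());
            newer.incrementVersion(0, System.currentTimeMillis()); // node 0 at version 2

            // newer dominates older, so it compares AFTER and would be kept
            // as latestCluster in the swapper's loop.
            System.out.println(newer.compare(older)); // AFTER
        }
    }
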
1 src/java/voldemort/store/rebalancing/RedirectingStore.java
@@ -16,7 +16,6 @@
package voldemort.store.rebalancing;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
2 src/java/voldemort/utils/RebalanceUtils.java
@@ -191,7 +191,7 @@ public static Node updateNode(Node node, List<Integer> partitionsList) {
return latestCluster;
}
- private static void checkNotConcurrent(ArrayList<Versioned<Cluster>> clockList,
+ public static void checkNotConcurrent(ArrayList<Versioned<Cluster>> clockList,
VectorClock newClock) {
for(Versioned<Cluster> versionedCluster: clockList) {
VectorClock clock = (VectorClock) versionedCluster.getVersion();
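
checkNotConcurrent is promoted to public here so AdminStoreSwapper can reuse it. A sketch of the condition it guards against — two clocks that each carry a write the other has not seen — assuming Voldemort's Occured enum exposes CONCURRENTLY:

    import voldemort.versioning.Occured;
    import voldemort.versioning.VectorClock;

    public class ConcurrentClockDemo {

        public static void main(String[] args) {
            VectorClock a = new VectorClock();
            a.incrementVersion(0, System.currentTimeMillis()); // write seen only by clock a

            VectorClock b = new VectorClock();
            b.incrementVersion(1, System.currentTimeMillis()); // write seen only by clock b

            // Neither clock dominates the other, so the versions are
            // concurrent: conflicting cluster metadata.
            if (a.compare(b) == Occured.CONCURRENTLY) {
                System.out.println("conflicting cluster.xml versions detected");
            }
        }
    }
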
1 src/java/voldemort/xml/StoreDefinitionsMapper.java
@@ -156,6 +156,7 @@ public StoreDefinition readStore(Reader input) {
}
}
+ @SuppressWarnings("unchecked")
private StoreDefinition readStore(Element store) {
String name = store.getChildText(STORE_NAME_ELMT);
String storeType = store.getChildText(STORE_PERSISTENCE_ELMT);
31 test/integration/voldemort/performance/benchmark/Benchmark.java
@@ -160,55 +160,56 @@ public void run() {
private VoldemortWrapper db;
private boolean runBenchmark;
- private boolean verbose;
- private Workload workLoad;
- private int opsCount;
+ private boolean isVerbose;
+ private Workload clientWorkLoad;
+ private int operationsCount;
private double targetThroughputPerMs;
private int opsDone;
private final WorkloadPlugin plugin;
public ClientThread(VoldemortWrapper db,
boolean runBenchmark,
- Workload workLoad,
- int opsCount,
+ Workload w,
+ int operationsCount,
double targetThroughputPerMs,
- boolean verbose,
+ boolean isVerbose,
WorkloadPlugin plugin) {
this.db = db;
this.runBenchmark = runBenchmark;
- this.workLoad = workLoad;
- this.opsCount = opsCount;
+ this.clientWorkLoad = w;
+ this.operationsCount = operationsCount;
this.opsDone = 0;
this.targetThroughputPerMs = targetThroughputPerMs;
- this.verbose = verbose;
+ this.isVerbose = isVerbose;
this.plugin = plugin;
}
public int getOpsDone() {
return this.opsDone;
}
+ @Override
public void run() {
long startTime = System.currentTimeMillis();
- while(opsDone < this.opsCount) {
+ while(opsDone < this.operationsCount) {
try {
if(runBenchmark) {
- if(!workLoad.doTransaction(this.db, plugin)) {
+ if(!clientWorkLoad.doTransaction(this.db, plugin)) {
break;
}
} else {
- if(!workLoad.doWrite(this.db, plugin)) {
+ if(!clientWorkLoad.doWrite(this.db, plugin)) {
break;
}
}
} catch(Exception e) {
- if(this.verbose)
+ if(this.isVerbose)
e.printStackTrace();
}
opsDone++;
if(targetThroughputPerMs > 0) {
- while(System.currentTimeMillis() - startTime < ((double) opsDone)
- / targetThroughputPerMs) {
+ double timePerOp = ((double) opsDone) / targetThroughputPerMs;
+ while(System.currentTimeMillis() - startTime < timePerOp) {
try {
sleep(1);
} catch(InterruptedException e) {}
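
The refactored throttling loop computes the wall-clock time by which opsDone operations should have completed and sleeps until the clock catches up. A standalone sketch with a hypothetical target rate:

    // Sketch of the benchmark client's rate limiting; 0.5 ops/ms = 500 ops/s.
    public class ThrottleDemo {

        public static void main(String[] args) throws InterruptedException {
            double targetThroughputPerMs = 0.5;
            long startTime = System.currentTimeMillis();

            for (int opsDone = 1; opsDone <= 10; opsDone++) {
                // ... one benchmark operation would run here ...
                double timePerOp = opsDone / targetThroughputPerMs;
                while (System.currentTimeMillis() - startTime < timePerOp) {
                    Thread.sleep(1); // ~2 ms per op at this rate
                }
            }
            System.out.println("elapsed ms: " + (System.currentTimeMillis() - startTime));
        }
    }
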
2 test/integration/voldemort/performance/benchmark/Measurement.java
@@ -69,7 +69,7 @@ public String getName() {
public Measurement(String name, boolean summaryOnly) {
this.name = name;
- this.buckets = 1000;
+ this.buckets = 3000; // Default bucket size of 3000 milliseconds
this.histogram = new int[buckets];
this.histogramOverflow = 0;
this.operations = 0;
10 test/integration/voldemort/performance/benchmark/VoldemortWrapper.java
@@ -85,15 +85,13 @@ public void mixed(final Object key, final Object newValue) {
@Override
public void update(StoreClient<Object, Object> storeClient) {
long startNs = System.nanoTime();
- Versioned<Object> v = storeClient.get(key);
- if(v != null) {
- voldemortStore.put(key, newValue);
- }
+ storeClient.get(key);
+ storeClient.put(key, newValue);
long endNs = System.nanoTime();
measurement.recordLatency(Operations.Mixed.getOpString(),
(int) ((endNs - startNs) / Time.NS_PER_MS));
}
- }, 3);
+ });
ReturnCode res = ReturnCode.Error;
if(updated) {
@@ -115,7 +113,7 @@ public void update(StoreClient<Object, Object> storeClient) {
measurement.recordLatency(Operations.Write.getOpString(),
(int) ((endNs - startNs) / Time.NS_PER_MS));
}
- }, 3);
+ });
ReturnCode res = ReturnCode.Error;
if(written) {
7 test/unit/voldemort/client/AdminServiceBasicTest.java
@@ -265,6 +265,13 @@ public void testDeleteStore() throws Exception {
if(!(e instanceof BootstrapFailureException))
throw e;
}
+ // try adding the store again
+ adminClient.addStore(definition);
+
+ client = factory.getStoreClient("deleteTest");
+ client.put("abc", "123");
+ String s = (String) client.get("abc").getValue();
+ assertEquals(s, "123");
}
@Test
2 test/unit/voldemort/routing/ZoneRoutingStrategyTest.java
@@ -27,8 +27,6 @@
public class ZoneRoutingStrategyTest extends TestCase {
- private final byte[] key = new byte[0];
-
private ZoneRoutingStrategy getRouter(int... zonesRepFactor) {
int totalZoneRepFactor = 0;
for(int i = 0; i < zonesRepFactor.length; i++) {
1 test/unit/voldemort/server/gossip/GossiperTest.java
@@ -48,7 +48,6 @@
10000,
100000,
32 * 1024);
- private static String testStoreName = "test-replication-memory";
private static String storesXmlfile = "test/common/voldemort/config/stores.xml";
private final boolean useNio;
4 test/unit/voldemort/store/readonly/ReadOnlyUtilsTest.java
@@ -16,8 +16,6 @@
package voldemort.store.readonly;
-import java.io.IOException;
-
import junit.framework.TestCase;
import voldemort.utils.ByteUtils;
@@ -28,7 +26,7 @@
*/
public class ReadOnlyUtilsTest extends TestCase {
- public void testMinIntegerBug() throws IOException {
+ public void testMinIntegerBug() {
byte[] keyBytes = new byte[4];
ByteUtils.writeInt(keyBytes, Integer.MIN_VALUE, 0);
assertEquals(0, ReadOnlyUtils.chunk(keyBytes, 15));
6 test/unit/voldemort/store/routed/NodeValueTest.java
@@ -47,9 +47,9 @@ public void testHashCodeEquals() {
assertFalse(v1.equals(v3));
assertFalse(v1.hashCode() == v3.hashCode());
- NodeValue nv1 = createNodeValue(1, v1);
- NodeValue nv2 = createNodeValue(1, v2);
- NodeValue nv3 = createNodeValue(1, v3);
+ NodeValue<ByteArray, byte[]> nv1 = createNodeValue(1, v1);
+ NodeValue<ByteArray, byte[]> nv2 = createNodeValue(1, v2);
+ NodeValue<ByteArray, byte[]> nv3 = createNodeValue(1, v3);
logger.info("nv1 = " + nv1);
logger.info("nv2 = " + nv2);
1 test/unit/voldemort/utils/RebalanceUtilsTest.java
@@ -35,7 +35,6 @@
public class RebalanceUtilsTest extends TestCase {
- private static String testStoreName = "test-replication-memory";
private static String storeDefFile = "test/common/voldemort/config/stores.xml";
private Cluster currentCluster;
