Misc. read-only store improvements. Some cleanup in hadoop contrib.

1 parent 44d1cba, commit d2274da1d66fd262b3064275cd5ae9efaf4487d4, committed by @jkreps
Showing with 1,077 additions and 1,708 deletions.
  1. +3 −4 .classpath
  2. +5 −0 bin/run-class.sh
  3. +15 −0 build.xml
  4. +0 −104 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexMapper.java
  5. +0 −24 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexPartitioner.java
  6. +0 −103 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexReducer.java
  7. +0 −132 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexer.java
  8. +0 −90 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/AbstractSwapperMapper.java
  9. +0 −52 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/NonSplitableDummyFileInputFormat.java
  10. +0 −126 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/ReadOnlyBatchIndexHadoopSwapper.java
  11. +0 −136 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/ReadOnlyBatchIndexSwapper.java
  12. +0 −71 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/SwapperUtils.java
  13. +0 −43 contrib/batch-indexer/src/java/voldemort/contrib/fetcher/HdfsFetcher.java
  14. +0 −141 contrib/batch-indexer/test/voldemort/contrib/batchindexer/ReadOnlyBatchIndexerTest.java
  15. +0 −188 contrib/batch-indexer/test/voldemort/contrib/batchswapper/ReadOnlyHadoopSwapperTest.java
  16. +0 −182 contrib/batch-indexer/test/voldemort/contrib/batchswapper/ReadOnlySimpleSwapperTest.java
  17. +0 −73 contrib/batch-indexer/test/voldemort/contrib/batchswapper/ReadOnlySwapperTestUtils.java
  18. +0 −39 contrib/batch-indexer/test/voldemort/contrib/fetcher/HdfsFetcherTest.java
  19. 0 contrib/{batch-indexer → hadoop-store-builder}/lib/commons-cli-2.0-SNAPSHOT.jar
  20. 0 contrib/{batch-indexer → hadoop-store-builder}/lib/hadoop-0.18.1-core.jar
  21. 0 ...adoop-store-builder/perf}/voldemort/contrib/batchindexer/performance/BdbBuildPerformanceTest.java
  22. 0 ...oop-store-builder/perf}/voldemort/contrib/batchindexer/performance/MysqlBuildPerformanceTest.java
  23. +80 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/benchmark/BuildTestStore.java
  24. +97 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/benchmark/GenerateData.java
  25. +105 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
  26. +74 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/IoThrottler.java
  27. +83 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractStoreBuilderMapper.java
  28. +113 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java
  29. +51 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderBase.java
  30. +22 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java
  31. +114 −0 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderReducer.java
  32. +72 −0 contrib/hadoop-store-builder/test/voldemort/store/readonly/fetcher/HdfsFetcherTest.java
  33. +140 −0 contrib/hadoop-store-builder/test/voldemort/store/readonly/mr/StoreBuilderTest.java
  34. +0 −65 contrib/utils/src/java/test/voldemort/contrib/utils/ContribUtilsTest.java
  35. +0 −72 contrib/utils/src/java/voldemort/contrib/utils/ContribUtils.java
  36. +22 −4 src/java/voldemort/server/http/gui/ReadOnlyStoreManagementServlet.java
  37. +4 −4 src/java/voldemort/server/http/gui/templates/read-only-mgmt.vm
  38. +2 −1 src/java/voldemort/store/ErrorCodeMapper.java
  39. +29 −16 src/java/voldemort/store/readonly/StoreSwapper.java
  40. +4 −0 src/java/voldemort/utils/SystemTime.java
  41. +2 −0 src/java/voldemort/utils/Time.java
  42. +20 −27 src/java/voldemort/utils/Utils.java
  43. +20 −11 test/common/voldemort/MockTime.java
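Taken together, the file list amounts to a module rename plus consolidation: the old contrib/batch-indexer sources and tests are removed, and the equivalent read-only store building code reappears under contrib/hadoop-store-builder. HadoopStoreBuilder, AbstractStoreBuilderMapper, HadoopStoreBuilderPartitioner and HadoopStoreBuilderReducer appear to take over the roles of the ReadOnlyBatchIndexer classes, and HdfsFetcher moves from voldemort.contrib.fetcher to voldemort.store.readonly.fetcher.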
7 .classpath
@@ -1,12 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/java"/>
+ <classpathentry kind="src" path="contrib/hadoop-store-builder/test"/>
+ <classpathentry kind="src" path="contrib/hadoop-store-builder/src/java"/>
<classpathentry kind="src" path="contrib/mongodb/example"/>
- <classpathentry kind="src" path="contrib/batch-indexer/test"/>
<classpathentry kind="src" path="contrib/mongodb/src/java"/>
<classpathentry kind="src" path="contrib/mongodb/test"/>
<classpathentry kind="src" path="contrib/utils/src/java"/>
- <classpathentry kind="src" path="contrib/batch-indexer/src/java"/>
<classpathentry kind="src" path="test/unit"/>
<classpathentry kind="src" path="test/integration"/>
<classpathentry kind="src" path="test/common"/>
@@ -32,12 +32,11 @@
<classpathentry kind="lib" path="lib/velocity-1.5.jar"/>
<classpathentry kind="lib" path="lib/xerces.jar"/>
<classpathentry kind="lib" path="lib/colt-1.2.0.jar"/>
- <classpathentry kind="lib" path="contrib/batch-indexer/lib/hadoop-0.18.1-core.jar" sourcepath="/Users/bbansal/work/linkedin/downloads/hadoop/hadoop-0.18.1/src"/>
- <classpathentry kind="lib" path="contrib/batch-indexer/lib/commons-cli-2.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="lib/protobuf-java-2.0.3.jar"/>
<classpathentry kind="lib" path="lib/libthrift-20080411p1.jar"/>
<classpathentry kind="lib" path="lib/google-collect-snapshot-20090211.jar"/>
<classpathentry kind="lib" path="contrib/mongodb/lib/mongo-xjdm.jar"/>
+ <classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.18.1-core.jar"/>
<classpathentry kind="lib" path="lib/je-3.3.75.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
5 bin/run-class.sh
@@ -16,6 +16,11 @@
# limitations under the License.
#
+if [ $# -lt 1 ]; then
+ echo $0 java-class-name [options]
+ exit 1
+fi
+
base_dir=$(dirname $0)/..
for file in $base_dir/lib/*.jar;
15 build.xml
@@ -129,6 +129,21 @@
</jar>
</target>
+ <target name="alljar" depends="build, contrib-build" description="Build a jar file that includes all contrib code.">
+ <jar destfile="${dist.dir}/${name}-${curr.release}-all.jar">
+ <fileset dir="${classes.dir}">
+ <include name="**/*.*" />
+ </fileset>
+ <fileset dir="${contrib.classes.dir}">
+ <include name="**/*.*" />
+ </fileset>
+ <!-- include xsds -->
+ <fileset dir="${java.dir}">
+ <include name="**/*.xsd" />
+ </fileset>
+ </jar>
+ </target>
+
<target name="war" depends="build" description="Build server war file">
<war destfile="${dist.dir}/${name}.war" webxml="web.xml" basedir="${classes.dir}">
<classes dir="${classes.dir}"/>
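With the new target in place, running ant alljar from the project root should produce a single jar under ${dist.dir} named ${name}-${curr.release}-all.jar (presumably dist/voldemort-<release>-all.jar with the default property values) containing the core classes, the contrib classes, and the XSD files.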
104 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexMapper.java
@@ -1,104 +0,0 @@
-package voldemort.contrib.batchindexer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import voldemort.cluster.Cluster;
-import voldemort.cluster.Node;
-import voldemort.contrib.utils.ContribUtils;
-import voldemort.routing.ConsistentRoutingStrategy;
-import voldemort.serialization.DefaultSerializerFactory;
-import voldemort.serialization.Serializer;
-import voldemort.store.StoreDefinition;
-import voldemort.utils.ByteUtils;
-
-/**
- * Mapper code for Read-Only store batch Indexer Reads following properties from
- * JobConf
- * <p>
- * <li><strong>voldemort.store.name </strong></li>
- * <li><strong>voldemort.store.version </strong></li>
- * <p>
- * Assumes Distributed cache have files with names
- * <li><strong>cluster.xml </strong></li>
- * <li><strong>stores.xml</strong></li>
- *
- * @author bbansal
- *
- */
-public abstract class ReadOnlyBatchIndexMapper<K, V> implements
- Mapper<K, V, BytesWritable, BytesWritable> {
-
- private Cluster cluster = null;
- private StoreDefinition storeDef = null;
- private ConsistentRoutingStrategy routingStrategy = null;
- private Serializer<Object> keySerializer;
- private Serializer<Object> valueSerializer;
-
- public abstract Object getKeyBytes(K key, V value);
-
- public abstract Object getValueBytes(K key, V value);
-
- public void map(K key,
- V value,
- OutputCollector<BytesWritable, BytesWritable> output,
- Reporter reporter) throws IOException {
- byte[] keyBytes = keySerializer.toBytes(getKeyBytes(key, value));
- byte[] valBytes = valueSerializer.toBytes(getValueBytes(key, value));
-
- List<Node> nodes = routingStrategy.routeRequest(keyBytes);
- for(Node node: nodes) {
- ByteArrayOutputStream versionedValue = new ByteArrayOutputStream();
- DataOutputStream valueDin = new DataOutputStream(versionedValue);
- valueDin.writeInt(node.getId());
- valueDin.write(valBytes);
- valueDin.close();
- BytesWritable outputKey = new BytesWritable(ByteUtils.md5(keyBytes));
- BytesWritable outputVal = new BytesWritable(versionedValue.toByteArray());
-
- output.collect(outputKey, outputVal);
- }
- }
-
- @SuppressWarnings("unchecked")
- public void configure(JobConf conf) {
-
- try {
-
- // get the voldemort cluster.xml and store.xml files.
- String clusterFilePath = ContribUtils.getFileFromCache(conf, "cluster.xml");
- String storeFilePath = ContribUtils.getFileFromCache(conf, "store.xml");
-
- if(null == clusterFilePath || null == storeFilePath) {
- throw new RuntimeException("Mapper expects cluster.xml / stores.xml passed through Distributed cache.");
- }
-
- // get Cluster and Store details
- cluster = ContribUtils.getVoldemortClusterDetails(clusterFilePath);
- storeDef = ContribUtils.getVoldemortStoreDetails(storeFilePath,
- conf.get("voldemort.store.name"));
-
- keySerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(storeDef.getKeySerializer());
- valueSerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(storeDef.getValueSerializer());
-
- routingStrategy = new ConsistentRoutingStrategy(cluster.getNodes(),
- storeDef.getReplicationFactor());
-
- if(routingStrategy == null) {
- throw new RuntimeException("Failed to create routing strategy");
- }
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() throws IOException {}
-}
24 .../batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexPartitioner.java
@@ -1,24 +0,0 @@
-package voldemort.contrib.batchindexer;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.lib.HashPartitioner;
-
-public class ReadOnlyBatchIndexPartitioner extends HashPartitioner<BytesWritable, BytesWritable> {
-
- @Override
- public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
- // The partition id is first 4 bytes in the value.
- DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(value.get()));
- int nodeId = -2;
- try {
- nodeId = buffer.readInt();
- } catch(IOException e) {
- throw new RuntimeException("Failed to parse nodeId from buffer.", e);
- }
- return (nodeId) % numReduceTasks;
- }
-}
103 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexReducer.java
@@ -1,103 +0,0 @@
-package voldemort.contrib.batchindexer;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import voldemort.utils.ByteUtils;
-
-public class ReadOnlyBatchIndexReducer implements Reducer<BytesWritable, BytesWritable, Text, Text> {
-
- private DataOutputStream indexFileStream = null;
- private DataOutputStream valueFileStream = null;
-
- private long position = 0;
-
- private JobConf conf = null;
- private String taskId = null;
- private int nodeId = -1;
-
- Path taskIndexFileName;
- Path taskValueFileName;
-
- /**
- * Reduce should get sorted MD5 keys here with a single value (appended in
- * begining with 4 bits of nodeId)
- */
- public void reduce(BytesWritable key,
- Iterator<BytesWritable> values,
- OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException {
- byte[] keyBytes = ByteUtils.copy(key.get(), 0, key.getSize());
-
- while(values.hasNext()) {
- BytesWritable value = values.next();
- byte[] valBytes = ByteUtils.copy(value.get(), 0, value.getSize());
-
- if(nodeId == -1) {
- DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(valBytes));
- nodeId = buffer.readInt();
- }
- // strip first 4 bytes as node_id
- byte[] value1 = ByteUtils.copy(valBytes, 4, valBytes.length);
-
- // Write Index Key/ position
- indexFileStream.write(keyBytes);
- indexFileStream.writeLong(position);
- valueFileStream.writeInt(value1.length);
- valueFileStream.write(value1);
- position += value1.length + 4;
-
- if(position < 0) {
- throw new RuntimeException("Position bigger than Integer size, split input files.");
- }
- }
-
- }
-
- public void configure(JobConf job) {
- try {
- position = 0;
- conf = job;
-
- taskId = job.get("mapred.task.id");
-
- taskIndexFileName = new Path(FileOutputFormat.getOutputPath(conf),
- conf.get("voldemort.index.filename") + "_" + taskId);
- taskValueFileName = new Path(FileOutputFormat.getOutputPath(conf),
- conf.get("voldemort.data.filename") + "_" + taskId);
-
- FileSystem fs = taskIndexFileName.getFileSystem(job);
-
- indexFileStream = fs.create(taskIndexFileName, (short) 1);
- valueFileStream = fs.create(taskValueFileName, (short) 1);
- } catch(IOException e) {
- throw new RuntimeException("Failed to open Input/OutputStream", e);
- }
- }
-
- public void close() throws IOException {
-
- indexFileStream.close();
- valueFileStream.close();
-
- Path hdfsIndexFile = new Path(FileOutputFormat.getOutputPath(conf), nodeId + ".index");
- Path hdfsValueFile = new Path(FileOutputFormat.getOutputPath(conf), nodeId + ".data");
-
- FileSystem fs = hdfsIndexFile.getFileSystem(conf);
- fs.rename(taskIndexFileName, hdfsIndexFile);
- fs.rename(taskValueFileName, hdfsValueFile);
- }
-}
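For reference, here is a minimal sketch (not part of this commit) of the on-disk layout the removed reducer produced, reconstructed from its write calls: each .index entry is the 16-byte MD5 of the key followed by an 8-byte position, and each .data entry is a 4-byte length prefix followed by the value bytes. The class and method names below are illustrative only; the replacement HadoopStoreBuilderReducer presumably emits a compatible layout.

import java.io.DataOutputStream;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative helper mirroring the writes in the removed ReadOnlyBatchIndexReducer.
public class ReadOnlyFormatSketch {

    // Appends one entry and returns the new write position in the .data file.
    // Index entry: <16-byte MD5(key)><8-byte position>
    // Data entry:  <4-byte value length><value bytes>
    public static long append(DataOutputStream indexFile,
                              DataOutputStream dataFile,
                              long position,
                              byte[] keyBytes,
                              byte[] valueBytes) throws IOException, NoSuchAlgorithmException {
        indexFile.write(MessageDigest.getInstance("MD5").digest(keyBytes));
        indexFile.writeLong(position);
        dataFile.writeInt(valueBytes.length);
        dataFile.write(valueBytes);
        return position + 4 + valueBytes.length;
    }
}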
132 contrib/batch-indexer/src/java/voldemort/contrib/batchindexer/ReadOnlyBatchIndexer.java
@@ -1,132 +0,0 @@
-package voldemort.contrib.batchindexer;
-
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.log4j.Logger;
-
-import voldemort.cluster.Cluster;
-import voldemort.contrib.utils.ContribUtils;
-
-/**
- * Creates a simple Read-Only Voldemort store for easy batch update.
- * <p>
- * Creates two files
- * <ul>
- * <li><strong>Index</strong> File: Keeps the position Index for each key
- * sorted by MD5(key) Tuple: <KEY_HASH_SIZE(16 bytes)><POSITION_SIZE(8 bytes)>
- * </li>
- * <li><strong>Values</strong> file: Keeps the variable length values Tuple:
- * <SIZE_OF_VALUE(4 bytes)><VALUE(byte[])> </li>
- * <ul>
- * <p>
- * Required Properties
- * <ul>
- * <li>job.name</li>
- * <li>voldemort.cluster.local.filePath</li>
- * <li>voldemort.store.local.filePath</li>
- * <li>voldemort.store.name</li>
- * <li>voldemort.store.version</li>
- * <li>input.data.check.percent</li>
- * </ul>
- *
- * @author bbansal
- */
-public abstract class ReadOnlyBatchIndexer extends Configured implements Tool, JobConfigurable {
-
- private static Logger logger = Logger.getLogger(ReadOnlyBatchIndexer.class);
-
- public int run(String[] args) throws Exception {
-
- JobConf conf = new JobConf(ReadOnlyBatchIndexer.class);
- configure(conf);
-
- try {
- // get the voldemort cluster definition
- Cluster cluster = ContribUtils.getVoldemortClusterDetails(conf.get("voldemort.cluster.local.filePath"));
- conf.setNumReduceTasks(cluster.getNumberOfNodes());
- } catch(Exception e) {
- logger.error("Failed to read Voldemort cluster details", e);
- throw new RuntimeException("", e);
- }
-
- // set the partitioner
- conf.setPartitionerClass(ReadOnlyBatchIndexPartitioner.class);
-
- // set mapper Outputclasses
- conf.setMapOutputKeyClass(BytesWritable.class);
- conf.setMapOutputValueClass(BytesWritable.class);
-
- // set reducer classes
- conf.setReducerClass(ReadOnlyBatchIndexReducer.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
- conf.setOutputKeyClass(BytesWritable.class);
- conf.setOutputValueClass(BytesWritable.class);
-
- // get the store information
- String storeName = conf.get("voldemort.store.name");
- conf.setStrings("voldemort.index.filename", storeName + ".index");
- conf.setStrings("voldemort.data.filename", storeName + ".data");
-
- // get Local config files.
- Path clusterFile = new Path(conf.get("voldemort.cluster.local.filePath"));
- Path storeFile = new Path(conf.get("voldemort.store.local.filePath"));
-
- // move files to HDFS if Hadoop run is not local
- if(!conf.get("mapred.job.tracker").equals("local")) {
-
- // set temp HDFS paths
- Path clusterHdfs = new Path("/tmp/" + conf.getJobName() + "/" + "cluster.xml");
- Path storeHdfs = new Path("/tmp/" + conf.getJobName() + "/" + "store.xml");
-
- // get FileSystem & copy files to HDFS
- // TODO LOW: Distributed cache should take care of this
- FileSystem fs = clusterFile.getFileSystem(conf);
- fs.copyFromLocalFile(clusterFile, clusterHdfs);
- fs.copyFromLocalFile(storeFile, storeHdfs);
-
- // add HDFS files to distributed cache
- DistributedCache.addCacheFile(new URI(clusterHdfs.toString() + "#cluster.xml"), conf);
- DistributedCache.addCacheFile(new URI(storeHdfs.toString() + "#store.xml"), conf);
- } else {
- // Add local files to distributed cache
- DistributedCache.addCacheFile(new URI(clusterFile.toString() + "#cluster.xml"), conf);
- DistributedCache.addCacheFile(new URI(storeFile.toString() + "#store.xml"), conf);
- }
-
- // run(conf);
- JobClient.runJob(conf);
- return 0;
- }
-
- /**
- * <strong>configure must set:</strong>
- * <ul>
- * <li>Input Path List</li>
- * <li>Output Path</li>
- * <li>Mapper class <? extends {@link ReadOnlyBatchIndexMapper}></li>
- * <li>Input Format class</li>
- * </ul>
- * <p>
- * <strong>configure must set these properties.</strong>
- * <ul>
- * <li>job.name: String</li>
- * <li>voldemort.cluster.local.filePath: String</li>
- * <li>voldemort.store.local.filePath: String</li>
- * <li>voldemort.store.name: String</li>
- * </ul>
- *
- *
- * @return
- */
- public abstract void configure(JobConf conf);
-}
90 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/AbstractSwapperMapper.java
@@ -1,90 +0,0 @@
-package voldemort.contrib.batchswapper;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import voldemort.cluster.Cluster;
-import voldemort.cluster.Node;
-import voldemort.contrib.utils.ContribUtils;
-
-public abstract class AbstractSwapperMapper implements Mapper<LongWritable, Text, Text, Text> {
-
- JobConf conf;
- Node node;
- String sourcefileName;
- boolean isIndexFile;
- boolean doNothing = false;
-
- /**
- * copy local source file to remote destination
- *
- * @param hostname
- * @param source
- * @param destination
- * @return
- */
- public abstract boolean copyRemoteFile(String hostname, String source, String destination);
-
- public void map(LongWritable key,
- Text value,
- OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException {
-
- if(!doNothing) {
- String targetFileName;
- String destinationDir = conf.get("destination.path");
-
- if(isIndexFile) {
- targetFileName = SwapperUtils.getIndexDestinationFile(node.getId(), destinationDir);
- } else {
- targetFileName = SwapperUtils.getDataDestinationFile(node.getId(), destinationDir);
- }
-
- // copy remote File
- if(!copyRemoteFile(node.getHost(), sourcefileName, targetFileName)) {
- throw new RuntimeException("Failed to copy file host:" + node.getHost()
- + " sourcePath:" + sourcefileName + " targetFileName:"
- + targetFileName);
- }
- }
- }
-
- public void configure(JobConf job) {
- conf = job;
- sourcefileName = new Path(conf.get("map.input.file")).toUri().getPath();
-
- if(sourcefileName.contains("index") || sourcefileName.contains("data")) {
- System.out.println("mapper got file:" + sourcefileName);
-
- isIndexFile = new Path(sourcefileName).getName().contains("index");
-
- // split on '.' character names expected are 0.index , 0.data
- String[] tempSplits = new Path(sourcefileName).getName().split("\\.");
- int fileNodeId = Integer.parseInt(tempSplits[0]);
-
- try {
- // get the voldemort cluster.xml
- String clusterFilePath = ContribUtils.getFileFromCache(conf, "cluster.xml");
-
- if(null == clusterFilePath) {
- throw new RuntimeException("Mapper expects cluster.xml / stores.xml passed through Distributed cache.");
- }
- // create cluster
- Cluster cluster = ContribUtils.getVoldemortClusterDetails(clusterFilePath);
-
- node = cluster.getNodeById(fileNodeId);
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public void close() throws IOException {}
-}
52 ...tch-indexer/src/java/voldemort/contrib/batchswapper/NonSplitableDummyFileInputFormat.java
@@ -1,52 +0,0 @@
-package voldemort.contrib.batchswapper;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.LineRecordReader;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-public class NonSplitableDummyFileInputFormat extends TextInputFormat {
-
- @Override
- protected boolean isSplitable(FileSystem fs, Path filename) {
- return false;
- }
-
- @Override
- public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
- JobConf job,
- Reporter reporter) throws IOException {
-
- reporter.setStatus(genericSplit.toString());
- return new DummyRecordReader(job, (FileSplit) genericSplit);
- }
-
- class DummyRecordReader extends LineRecordReader {
-
- private int counter = 0;
-
- public DummyRecordReader(Configuration conf, FileSplit split) throws IOException {
- super(conf, split);
- }
-
- @Override
- public boolean next(LongWritable key, Text value) throws IOException {
- if(counter >= 1) {
- return false;
- }
- counter++;
- return true;
- }
- }
-
-}
126 ...atch-indexer/src/java/voldemort/contrib/batchswapper/ReadOnlyBatchIndexHadoopSwapper.java
@@ -1,126 +0,0 @@
-package voldemort.contrib.batchswapper;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.util.Tool;
-
-import voldemort.cluster.Cluster;
-import voldemort.cluster.Node;
-import voldemort.contrib.utils.ContribUtils;
-
-/**
- * voldemort swap index job. rsync and swap index/data files for Read-Only store
- * in voldemort cluster. Reads from local filesystem and writes to remote node.
- * <p>
- * {@link ReadOnlyBatchIndexSwapper} have few problems
- * <ul>
- * <li> Bringing all files to one nodes at one time is not practical</li>
- * <li> Hadoop mappers can use hdfs data locality (mapper is smart)</li>
- * </ul>
- * <p>
- * We will use customize inputformat to only give is single line input
- * {@link NonSplitableDummyFileInputFormat}
- *
- * @author bbansal
- * <p>
- * Required Properties
- * <ul>
- * <li>voldemort.cluster.local.filePath: String</li>
- * <li>voldemort.store.local.filePath: String</li>
- * <li>voldemort.store.name: String</li>
- * <li>source.path: String HDFS Path</li>
- * <li>destination.path: String Remote machine temp directory</li>
- * </ul>
- */
-public abstract class ReadOnlyBatchIndexHadoopSwapper extends Configured implements Tool,
- JobConfigurable {
-
- public int run(String[] args) {
- JobConf conf = new JobConf(ReadOnlyBatchIndexHadoopSwapper.class);
- configure(conf);
-
- Cluster cluster = null;
- try {
- // get the voldemort cluster definition
- cluster = ContribUtils.getVoldemortClusterDetails(conf.get("voldemort.cluster.local.filePath"));
-
- // No Reducer needed for this Job
- conf.setNumReduceTasks(0);
-
- // set FileInputFormat
- conf.setMapperClass(getSwapperMapperClass());
- conf.setInputFormat(NonSplitableDummyFileInputFormat.class);
-
- // get the store information
- String storeName = conf.get("voldemort.store.name");
-
- // move files to Distributed Cache for mapper to use
- moveMetaDatatoHDFS(conf, new Path(conf.get("voldemort.cluster.local.filePath")));
-
- // set Input Output Path
- FileInputFormat.setInputPaths(conf, new Path(conf.get("source.path")));
-
- // Set Output File to a dummy temp dir
- String tempHadoopDir = conf.get("hadoop.tmp.dir") + File.separatorChar
- + (int) (Math.random() * 1000000);
- FileOutputFormat.setOutputPath(conf, new Path(tempHadoopDir));
-
- // run(conf);
- JobClient.runJob(conf);
-
- // delete tempHdoopDir
- new Path(tempHadoopDir).getFileSystem(conf).delete(new Path(tempHadoopDir), true);
-
- // lets try to swap only the successful nodes
- for(Node node: cluster.getNodes()) {
- SwapperUtils.doSwap(storeName, node, conf.get("destination.path"));
- }
- } catch(Exception e) {
- throw new RuntimeException("Swap Job Failed", e);
- }
- return 0;
- }
-
- private void moveMetaDatatoHDFS(JobConf conf, Path clusterFile) throws IOException,
- URISyntaxException {
- if(!conf.get("mapred.job.tracker").equals("local")) {
- // make temp hdfs path and add to distributed cache
- Path clusterHdfs = new Path("/tmp/" + conf.getJobName() + "/" + "cluster.xml");
- FileSystem fs = clusterFile.getFileSystem(conf);
- fs.copyFromLocalFile(clusterFile, clusterHdfs);
- DistributedCache.addCacheFile(new URI(clusterHdfs.toString() + "#cluster.xml"), conf);
- } else {
- // Add local files to distributed cache
- DistributedCache.addCacheFile(new URI(clusterFile.toString() + "#cluster.xml"), conf);
- }
- }
-
- /**
- * <strong>configure must set these properties.</strong>
- * <ul>
- * <li>voldemort.cluster.local.filePath: String</li>
- * <li>voldemort.store.local.filePath: String</li>
- * <li>voldemort.store.name: String</li>
- * <li>source.path: String HDFS Path</li>
- * <li>destination.path: String Remote machine temp directory</li>
- * </ul>
- *
- *
- * @return
- */
- public abstract void configure(JobConf conf);
-
- public abstract Class<? extends AbstractSwapperMapper> getSwapperMapperClass();
-}
136 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/ReadOnlyBatchIndexSwapper.java
@@ -1,136 +0,0 @@
-package voldemort.contrib.batchswapper;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import voldemort.cluster.Cluster;
-import voldemort.cluster.Node;
-import voldemort.contrib.utils.ContribUtils;
-import voldemort.utils.Props;
-
-/**
- * voldemort swap index job. rsync and swap index/data files for Read-Only store
- * in voldemort cluster. Reads from local filesystem and writes to remote node.
- *
- * @author bbansal
- * <p>
- * Required Properties
- * <ul>
- * <li>voldemort.cluster.local.filePath: String</li>
- * <li>voldemort.store.name: String</li>
- * <li>source.local.path: String</li>
- * <li>destination.remote.path: String</li>
- * </ul>>
- */
-public abstract class ReadOnlyBatchIndexSwapper {
-
- private static Logger logger = Logger.getLogger(ReadOnlyBatchIndexSwapper.class);
-
- public void run() throws Throwable {
-
- final Props props = new Props();
-
- // get user settings.
- configure(props);
-
- Cluster cluster = ContribUtils.getVoldemortClusterDetails(props.get("voldemort.cluster.local.filePath"));
- String storeName = props.get("voldemort.store.name");
- final Path inputDir = new Path(props.get("source.local.path"));
-
- ExecutorService executors = Executors.newFixedThreadPool(cluster.getNumberOfNodes());
- final Semaphore semaphore = new Semaphore(0, false);
- final boolean[] succeeded = new boolean[cluster.getNumberOfNodes()];
- final String destinationDir = props.get("destination.remote.path");
-
- for(final Node node: cluster.getNodes()) {
-
- executors.execute(new Runnable() {
-
- public void run() {
- int id = node.getId();
- String indexFile = inputDir + "/" + Integer.toString(id) + ".index";
- String dataFile = inputDir + "/" + Integer.toString(id) + ".data";
-
- if(!(new File(indexFile).exists())) {
- logger.warn("IndexFile for node " + id + " not available path:" + indexFile);
- }
- if(!(new File(dataFile).exists())) {
- logger.warn("DataFile for node " + id + " not available path:" + dataFile);
- }
-
- if(new File(indexFile).exists() && new File(indexFile).exists()) {
- String host = node.getHost();
-
- boolean index = false;
- boolean data = false;
-
- try {
- index = copyRemoteFile(host,
- indexFile,
- SwapperUtils.getIndexDestinationFile(node.getId(),
- destinationDir));
- data = copyRemoteFile(host,
- dataFile,
- SwapperUtils.getDataDestinationFile(node.getId(),
- destinationDir));
- } catch(IOException e) {
- logger.error("copy to Remote node failed for node:" + node.getId(), e);
- }
-
- if(index && data) {
- succeeded[node.getId()] = true;
- }
- }
- semaphore.release();
- }
- });
- }
-
- // wait for all operations to complete
- semaphore.acquire(cluster.getNumberOfNodes());
-
- int counter = 0;
- // lets try to swap only the successful nodes
- for(Node node: cluster.getNodes()) {
- // data refresh succeeded
- if(succeeded[node.getId()]) {
- SwapperUtils.doSwap(storeName, node, destinationDir);
- counter++;
- }
- }
- logger.info(counter + " node out of " + cluster.getNumberOfNodes()
- + " refreshed with fresh index/data for store '" + storeName + "'");
- }
-
- /**
- * <strong>configure must set these properties.</strong>
- * <ul>
- * <li>voldemort.cluster.local.filePath: String</li>
- * <li>voldemort.store.name: String</li>
- * <li>source.local.path: String</li>
- * <li>destination.remote.path: String</li>
- * </ul>
- *
- *
- * @return
- */
- public abstract void configure(Props props);
-
- /**
- * copy local source file to remote destination
- *
- * @param hostname
- * @param source
- * @param destination
- * @return
- * @throws IOException
- */
- public abstract boolean copyRemoteFile(String hostname, String source, String destination)
- throws IOException;
-}
71 contrib/batch-indexer/src/java/voldemort/contrib/batchswapper/SwapperUtils.java
@@ -1,71 +0,0 @@
-package voldemort.contrib.batchswapper;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
-
-import voldemort.cluster.Node;
-
-public class SwapperUtils {
-
- public static void doSwap(String storeName, Node node, String destinationDir)
- throws IOException {
- // construct data operation = swap
- String data = URLEncoder.encode("operation", "UTF-8") + "="
- + URLEncoder.encode("swap", "UTF-8");
- // add index = indexFileName
- data += "&" + URLEncoder.encode("index", "UTF-8") + "="
- + URLEncoder.encode(getIndexDestinationFile(node.getId(), destinationDir), "UTF-8");
- // add data = dataFileName
- data += "&" + URLEncoder.encode("data", "UTF-8") + "="
- + URLEncoder.encode(getDataDestinationFile(node.getId(), destinationDir), "UTF-8");
- // add store= storeName
- data += "&" + URLEncoder.encode("store", "UTF-8") + "="
- + URLEncoder.encode(storeName, "UTF-8");
-
- // Send data
- URL url = new URL("http://" + node.getHost() + ":" + node.getHttpPort() + "/read-only/mgmt");
- System.out.println("swapping node:" + node.getId() + " with url:" + url.toString()
- + " data:" + data);
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
-
- connection.setRequestProperty("Content-Length", ""
- + Integer.toString(data.getBytes().length));
- connection.setRequestProperty("Content-Language", "en-US");
-
- connection.setUseCaches(false);
- connection.setDoInput(true);
- connection.setDoOutput(true);
-
- OutputStreamWriter wr = new OutputStreamWriter(connection.getOutputStream());
- wr.write(data);
- wr.flush();
- wr.close();
- // Get Response
- InputStream is = connection.getInputStream();
- BufferedReader rd = new BufferedReader(new InputStreamReader(is));
- String line;
- StringBuffer response = new StringBuffer();
- while((line = rd.readLine()) != null) {
- response.append(line);
- response.append('\r');
- }
- System.out.println("doSwap Completed for " + node + " Response:" + response.toString());
- rd.close();
- }
-
- public static String getIndexDestinationFile(int nodeId, String destinationDir) {
- return destinationDir + "/" + "node-" + nodeId + ".index";
- }
-
- public static String getDataDestinationFile(int nodeId, String destinationDir) {
- return destinationDir + "/" + "node-" + nodeId + ".data";
- }
-}
43 contrib/batch-indexer/src/java/voldemort/contrib/fetcher/HdfsFetcher.java
@@ -1,43 +0,0 @@
-package voldemort.contrib.fetcher;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import voldemort.store.readonly.FileFetcher;
-
-/**
- * Fetch the store from HDFS
- *
- * @author jay
- *
- */
-public class HdfsFetcher implements FileFetcher {
-
- public File fetchFile(String fileUrl) throws IOException {
- Path filePath = new Path(fileUrl);
- FileSystem fs = filePath.getFileSystem(new Configuration());
-
- // copy index file
- FSDataInputStream input = null;
- OutputStream output = null;
- try {
- input = fs.open(filePath);
- File outputFile = File.createTempFile("fetcher-", ".dat");
- output = new FileOutputStream(outputFile);
- IOUtils.copyLarge(input, output);
- return outputFile;
- } finally {
- IOUtils.closeQuietly(output);
- IOUtils.closeQuietly(input);
- }
- }
-
-}
141 contrib/batch-indexer/test/voldemort/contrib/batchindexer/ReadOnlyBatchIndexerTest.java
@@ -1,141 +0,0 @@
-/*
- * Copyright 2008-2009 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package voldemort.contrib.batchindexer;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.io.FileDeleteStrategy;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.ToolRunner;
-
-import voldemort.serialization.DefaultSerializerFactory;
-import voldemort.serialization.Serializer;
-import voldemort.serialization.SerializerDefinition;
-import voldemort.store.Store;
-import voldemort.store.readonly.RandomAccessFileStore;
-import voldemort.store.serialized.SerializingStore;
-import voldemort.versioning.Versioned;
-
-/**
- * Unit test to check Read-Only Batch Indexer <strong>in Local mode numReduce
- * will be only one hence we will see only one node files irrespective of
- * cluster details.</strong>
- *
- * @author bbansal
- *
- */
-public class ReadOnlyBatchIndexerTest extends TestCase {
-
- @SuppressWarnings("unchecked")
- public void testCSVFileBatchIndexer() throws Exception {
-
- // rename Files
- File dataDir = new File("contrib/batch-indexer/temp-output/text");
- if(dataDir.exists()) {
- FileDeleteStrategy.FORCE.delete(dataDir);
- }
-
- ToolRunner.run(new Configuration(), new TextBatchIndexer(), null);
-
- // rename Files
- new File(dataDir, "0.index").renameTo(new File(dataDir, "users.index"));
- new File(dataDir, "0.data").renameTo(new File(dataDir, "users.data"));
-
- // open Store
- SerializerDefinition serDef = new SerializerDefinition("string", "UTF-8");
- Serializer<Object> Keyserializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(serDef);
- Serializer<Object> Valueserializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(new SerializerDefinition("java-serialization"));
-
- Store<Object, Object> store = new SerializingStore<Object, Object>(new RandomAccessFileStore("users",
- dataDir,
- 1,
- 3,
- 1000,
- 100 * 1000 * 1000),
- Keyserializer,
- Valueserializer);
-
- // query all keys and check for value
- BufferedReader reader = new BufferedReader(new FileReader(new File("contrib/common/test-data/usersCSV.txt")));
- String line;
- while(null != (line = reader.readLine())) {
-
- // correct Query
- String[] tokens = line.split("\\|");
- List<Versioned<Object>> found = store.get(tokens[0]);
- String result = (String) found.get(0).getValue();
- assertEquals("Value for key should match for set value", tokens[1], result);
-
- // wrong query
- int changeIndex = (int) (Math.random() * tokens[0].length());
- found = store.get(tokens[0].replace(tokens[0].charAt(changeIndex), '|'));
- // found size should be 0 or not match the original value.
- if(found.size() > 0) {
- result = (String) found.get(0).getValue();
- assertNotSame("Value for key should not match for set value", tokens[1], result);
- }
- }
-
- }
-}
-
-class TextBatchMapper extends ReadOnlyBatchIndexMapper<LongWritable, Text> {
-
- @Override
- public Object getKeyBytes(LongWritable key, Text value) {
- String[] tokens = value.toString().split("\\|");
- return tokens[0];
- }
-
- @Override
- public Object getValueBytes(LongWritable key, Text value) {
- String[] tokens = value.toString().split("\\|");
- return tokens[1];
- }
-
-}
-
-class TextBatchIndexer extends ReadOnlyBatchIndexer {
-
- @Override
- public void configure(JobConf conf) {
-
- conf.set("job.name", "testCSVBatchIndexer");
- conf.set("voldemort.cluster.local.filePath", "contrib/common/config/nine-node-cluster.xml");
- conf.set("voldemort.store.local.filePath", "contrib/common/config/stores.xml");
- conf.set("voldemort.store.name", "users");
-
- // set inset/outset path
- FileInputFormat.addInputPaths(conf, "contrib/common/test-data/usersCSV.txt");
- FileOutputFormat.setOutputPath(conf, new Path("contrib/batch-indexer/temp-output/text"));
-
- conf.setMapperClass(TextBatchMapper.class);
- conf.setInputFormat(TextInputFormat.class);
- }
-}
188 contrib/batch-indexer/test/voldemort/contrib/batchswapper/ReadOnlyHadoopSwapperTest.java
@@ -1,188 +0,0 @@
-/*
- * Copyright 2008-2009 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package voldemort.contrib.batchswapper;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.io.FileDeleteStrategy;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.mapred.JobConf;
-
-import voldemort.ServerTestUtils;
-import voldemort.TestUtils;
-import voldemort.cluster.Cluster;
-import voldemort.serialization.DefaultSerializerFactory;
-import voldemort.serialization.Serializer;
-import voldemort.serialization.SerializerDefinition;
-import voldemort.server.VoldemortConfig;
-import voldemort.server.VoldemortServer;
-import voldemort.store.Store;
-import voldemort.store.StoreUtils;
-import voldemort.utils.ByteArray;
-import voldemort.xml.ClusterMapper;
-
-public class ReadOnlyHadoopSwapperTest extends TestCase {
-
- private static final String baseDir = TestUtils.createTempDir().getAbsolutePath();
-
- private static final String clusterFile = "contrib/common/config/two-node-cluster.xml";
- private static final String storerFile = "contrib/common/config/testSwapStore.xml";
- private static final String storeName = "swapTestStore";
-
- VoldemortServer server1;
- VoldemortServer server2;
-
- @Override
- public void setUp() throws Exception {
- // clean baseDir to be sure
- FileDeleteStrategy.FORCE.delete(new File(baseDir));
-
- // First make the readOnlyIndex and copy the index to start Read-Only
- // store cleanly
- String indexDir = makeReadOnlyIndex(1, 1000);
-
- VoldemortConfig config = ServerTestUtils.createServerConfig(0,
- baseDir,
- clusterFile,
- storerFile);
- server1 = new VoldemortServer(config);
- // copy read-only index before starting
- FileUtils.copyFile(new File(indexDir, "0.index"),
- new File(config.getReadOnlyDataStorageDirectory(), storeName + ".index"));
- FileUtils.copyFile(new File(indexDir, "0.data"),
- new File(config.getReadOnlyDataStorageDirectory(), storeName + ".data"));
- server1.start();
-
- config = ServerTestUtils.createServerConfig(1, baseDir, clusterFile, storerFile);
- server2 = new VoldemortServer(config);
- // copy read-only index before starting
- FileUtils.copyFile(new File(indexDir, "1.index"),
- new File(config.getReadOnlyDataStorageDirectory(), storeName + ".index"));
- FileUtils.copyFile(new File(indexDir, "1.data"),
- new File(config.getReadOnlyDataStorageDirectory(), storeName + ".data"));
- server2.start();
- }
-
- @Override
- public void tearDown() throws IOException, InterruptedException {
- server1.stop();
- server2.stop();
- FileDeleteStrategy.FORCE.delete(new File(baseDir));
- }
-
- private String makeReadOnlyIndex(int minKey, int maxKey) throws Exception {
-
- Map<String, String> entryMap = new HashMap<String, String>();
- for(int i = minKey; i <= maxKey; i++) {
- entryMap.put("key" + i, "value-" + i);
- }
-
- Cluster cluster = new ClusterMapper().readCluster(new FileReader(new File("contrib/common/config/two-node-cluster.xml")));
- return ReadOnlySwapperTestUtils.createReadOnlyIndex(cluster, entryMap, baseDir);
- }
-
- public void testswap() throws Throwable {
- // assert that read-only store is working
- Store<ByteArray, byte[]> store1 = server1.getStoreRepository().getStorageEngine(storeName);
- Store<ByteArray, byte[]> store2 = server2.getStoreRepository().getStorageEngine(storeName);
-
- SerializerDefinition serDef = new SerializerDefinition("json", "'string'");
- Serializer<Object> serializer = StoreUtils.unsafeGetSerializer(new DefaultSerializerFactory(),
- serDef);
-
- // initial keys are from 1 to 1000
- for(int i = 1; i < 1000; i++) {
-
- ByteArray key = new ByteArray(serializer.toBytes("key" + i));
- byte[] value = serializer.toBytes("value-" + i);
-
- assertEquals("either store1 or store2 will have the key:'key-" + i + "'",
- true,
- store1.get(key).size() > 0 || store2.get(key).size() > 0);
-
- assertEquals("value should match",
- new String(value),
- new String((store1.get(key).size() > 0) ? store1.get(key)
- .get(0)
- .getValue()
- : store2.get(key).get(0).getValue()));
- }
-
- // lets create new index files
- final String newIndexDir = makeReadOnlyIndex(2000, 3000);
-
- ReadOnlyBatchIndexHadoopSwapper indexSwapper = new ReadOnlyBatchIndexHadoopSwapper() {
-
- @Override
- public void configure(JobConf conf) {
- conf.set("voldemort.cluster.local.filePath", clusterFile);
- conf.set("voldemort.store.name", storeName);
- conf.set("source.path", newIndexDir);
- conf.set("destination.path", baseDir + File.separatorChar
- + (int) (Math.random() * 1000));
- }
-
- @Override
- public Class<? extends SwapperMapper> getSwapperMapperClass() {
- return SwapperMapper.class;
- }
- };
-
- // do Index Swap
- indexSwapper.run(null);
-
- // check that only new keys can be seen
- for(int i = 1; i < 1000; i++) {
- ByteArray key = new ByteArray(serializer.toBytes("key" + i));
- assertEquals("store 1 get for key:" + i + " should be empty", 0, store1.get(key).size());
- assertEquals("store 2 get for key:" + i + " should be empty", 0, store2.get(key).size());
- }
-
- for(int i = 2000; i < 3000; i++) {
- ByteArray key = new ByteArray(serializer.toBytes("key" + i));
- assertEquals("either store1 or store2 will have the key:'key-" + i + "'",
- true,
- store1.get(key).size() > 0 || store2.get(key).size() > 0);
- }
-
- }
-
- static class SwapperMapper extends AbstractSwapperMapper {
-
- @Override
- public boolean copyRemoteFile(String hostname, String source, String destination) {
- // for test both files are local just
- System.out.println("copy Remote Files called host:" + hostname + " source:" + source
- + " destination:" + destination);
- assertEquals("source file should be present", true, new File(source).exists());
- try {
- FileUtils.copyFile(new File(source), new File(destination));
- } catch(IOException e) {
- System.out.println("copy call Failed");
- e.printStackTrace();
- }
-
- return new File(destination).exists();
- }
- }
-}
182 contrib/batch-indexer/test/voldemort/contrib/batchswapper/ReadOnlySimpleSwapperTest.java
@@ -1,182 +0,0 @@
-/*
- * Copyright 2008-2009 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package voldemort.contrib.batchswapper;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.io.FileDeleteStrategy;
-import org.apache.commons.io.FileUtils;
-
-import voldemort.ServerTestUtils;
-import voldemort.TestUtils;
-import voldemort.cluster.Cluster;
-import voldemort.serialization.DefaultSerializerFactory;
-import voldemort.serialization.Serializer;
-import voldemort.serialization.SerializerDefinition;
-import voldemort.server.VoldemortConfig;
-import voldemort.server.VoldemortServer;
-import voldemort.store.Store;
-import voldemort.store.StoreUtils;
-import voldemort.utils.ByteArray;
-import voldemort.utils.Props;
-import voldemort.xml.ClusterMapper;
-
-public class ReadOnlySimpleSwapperTest extends TestCase {
-
- private static final String baseDir = TestUtils.createTempDir().getAbsolutePath();
-
- private static final String clusterFile = "contrib/common/config/two-node-cluster.xml";
- private static final String storerFile = "contrib/common/config/testSwapStore.xml";
- private static final String storeName = "swapTestStore";
-
- VoldemortServer server1;
- VoldemortServer server2;
-
- @Override
- public void setUp() throws Exception {
- // clean baseDir to be sure
- FileDeleteStrategy.FORCE.delete(new File(baseDir));
-
- String indexDir = makeReadOnlyIndex(1, 1000);
- server1 = startServer(0, indexDir);
- server2 = startServer(1, indexDir);
- }
-
- private VoldemortServer startServer(int nodeId, String indexDir) throws Exception {
- VoldemortConfig config = ServerTestUtils.createServerConfig(nodeId,
- baseDir,
- clusterFile,
- storerFile);
- VoldemortServer server = new VoldemortServer(config);
- // copy read-only index before starting
- FileUtils.copyFile(new File(indexDir, nodeId + ".index"),
- new File(config.getReadOnlyDataStorageDirectory(), storeName + ".index"));
- FileUtils.copyFile(new File(indexDir, nodeId + ".data"),
- new File(config.getReadOnlyDataStorageDirectory(), storeName + ".data"));
- server.start();
- return server;
- }
-
- @Override
- public void tearDown() throws IOException, InterruptedException {
- server1.stop();
- server2.stop();
- FileDeleteStrategy.FORCE.delete(new File(baseDir));
- }
-
- private String makeReadOnlyIndex(int minKey, int maxKey) throws Exception {
-
- Map<String, String> entryMap = new HashMap<String, String>();
- for(int i = minKey; i <= maxKey; i++) {
- entryMap.put("key" + i, "value-" + i);
- }
-
- Cluster cluster = new ClusterMapper().readCluster(new FileReader(new File("contrib/common/config/two-node-cluster.xml")));
- return ReadOnlySwapperTestUtils.createReadOnlyIndex(cluster, entryMap, baseDir);
- }
-
- public void testswap() throws Throwable {
- // assert that read-only store is working
- Store<ByteArray, byte[]> store1 = server1.getStoreRepository().getStorageEngine(storeName);
- Store<ByteArray, byte[]> store2 = server2.getStoreRepository().getStorageEngine(storeName);
-
- SerializerDefinition serDef = new SerializerDefinition("json", "'string'");
- Serializer<Object> serializer = StoreUtils.unsafeGetSerializer(new DefaultSerializerFactory(),
- serDef);
-
- // initial keys are from 1 to 1000
- for(int i = 1; i < 1000; i++) {
-
- ByteArray key = new ByteArray(serializer.toBytes("key" + i));
- byte[] value = serializer.toBytes("value-" + i);
-
- assertEquals("either store1 or store2 will have the key:'key-" + i + "'",
- true,
- store1.get(key).size() > 0 || store2.get(key).size() > 0);
- assertEquals("value should match",
- new String(value),
- new String((store1.get(key).size() > 0) ? store1.get(key)
- .get(0)
- .getValue()
- : store2.get(key).get(0).getValue()));
- }
-
- // lets create new index files
- final String newIndexDir = makeReadOnlyIndex(2000, 3000);
-
- ReadOnlyBatchIndexSwapper indexSwapper = new ReadOnlyBatchIndexSwapper() {
-
- @Override
- public void configure(Props props) {
- props.put("voldemort.cluster.local.filePath", clusterFile);
- props.put("voldemort.store.name", storeName);
- props.put("source.local.path", newIndexDir);
- props.put("destination.remote.path", baseDir + File.separatorChar
- + (int) (Math.random() * 1000));
- }
-
- @Override
- public boolean copyRemoteFile(String hostname, String source, String destination) {
- // for test both files are local just
- int i = 0;
- while(i++ < 5)
- try {
- FileUtils.copyFile(new File(source), new File(destination));
- if(new File(destination).exists()) {
- return true;
- }
-
- } catch(IOException e) {
- // ignore
- }
-
- return false;
- }
- };
-
- // do Index Swap
- indexSwapper.run();
-
- // check that only new keys can be seen
- for(int i = 1; i < 1000; i++) {
- ByteArray key = new ByteArray(serializer.toBytes("key" + i));
- assertEquals("store 1 get for key:" + i + " should be empty", 0, store1.get(key).size());
- assertEquals("store 2 get for key:" + i + " should be empty", 0, store2.get(key).size());
- }
-
- for(int i = 2000; i < 3000; i++) {
- ByteArray key = new ByteArray(serializer.toBytes("key" + i));
- byte[] value = serializer.toBytes("value-" + i);
- assertEquals("either store1 or store2 will have the key:'key-" + i + "'",
- true,
- store1.get(key).size() > 0 || store2.get(key).size() > 0);
- assertEquals("value should match",
- new String(value),
- new String((store1.get(key).size() > 0) ? store1.get(key)
- .get(0)
- .getValue()
- : store2.get(key).get(0).getValue()));
- }
-
- }
-}
73 contrib/batch-indexer/test/voldemort/contrib/batchswapper/ReadOnlySwapperTestUtils.java
@@ -1,73 +0,0 @@
-package voldemort.contrib.batchswapper;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.Map;
-
-import voldemort.client.RoutingTier;
-import voldemort.cluster.Cluster;
-import voldemort.routing.ConsistentRoutingStrategy;
-import voldemort.routing.RoutingStrategy;
-import voldemort.serialization.SerializerDefinition;
-import voldemort.serialization.json.JsonReader;
-import voldemort.store.StoreDefinition;
-import voldemort.store.readonly.JsonStoreBuilder;
-import voldemort.store.readonly.RandomAccessFileStorageConfiguration;
-
-public class ReadOnlySwapperTestUtils {
-
- /**
- *
- * @param cluster
- * @param data
- * @param baseDir
- * @param TEST_SIZE
- * @return the directory where the index is created
- * @throws Exception
- */
- public static String createReadOnlyIndex(Cluster cluster,
- Map<String, String> data,
- String baseDir) throws Exception {
- // write data to file
- File dataFile = File.createTempFile("test", ".txt");
- dataFile.deleteOnExit();
- BufferedWriter writer = new BufferedWriter(new FileWriter(dataFile));
- for(Map.Entry<String, String> entry: data.entrySet())
- writer.write("\"" + entry.getKey() + "\"\t\"" + entry.getValue() + "\"\n");
- writer.close();
- BufferedReader reader = new BufferedReader(new FileReader(dataFile));
- JsonReader jsonReader = new JsonReader(reader);
-
- SerializerDefinition serDef = new SerializerDefinition("json", "'string'");
- StoreDefinition storeDef = new StoreDefinition("test",
- RandomAccessFileStorageConfiguration.TYPE_NAME,
- serDef,
- serDef,
- RoutingTier.CLIENT,
- 1,
- 1,
- 1,
- 1,
- 1,
- 1);
- RoutingStrategy router = new ConsistentRoutingStrategy(cluster.getNodes(), 1);
-
- // make a temp dir
- File dataDir = new File(baseDir + File.separatorChar + "read-only-temp-index-"
- + new Integer((int) (Math.random() * 1000)));
- // build and open store
- JsonStoreBuilder storeBuilder = new JsonStoreBuilder(jsonReader,
- cluster,
- storeDef,
- router,
- dataDir,
- 100,
- 1);
- storeBuilder.build();
-
- return dataDir.getAbsolutePath();
- }
-}
39 contrib/batch-indexer/test/voldemort/contrib/fetcher/HdfsFetcherTest.java
@@ -1,39 +0,0 @@
-package voldemort.contrib.fetcher;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Random;
-
-import org.apache.commons.io.IOUtils;
-
-import voldemort.utils.ByteUtils;
-
-import junit.framework.TestCase;
-
-public class HdfsFetcherTest extends TestCase {
-
- private File testFile;
- private HdfsFetcher fetcher;
-
- @Override
- public void setUp() throws IOException {
- this.testFile = File.createTempFile("test", ".dat");
- Random random = new Random();
- byte[] buffer = new byte[1000];
- random.nextBytes(buffer);
- this.fetcher = new HdfsFetcher();
- }
-
- public void testFetch() throws IOException {
- File fetchedFile = this.fetcher.fetchFile(testFile.getAbsolutePath());
- InputStream orig = new FileInputStream(testFile);
- byte[] origBytes = IOUtils.toByteArray(orig);
- InputStream fetched = new FileInputStream(fetchedFile);
- byte[] fetchedBytes = IOUtils.toByteArray(fetched);
- assertTrue("Fetched bytes not equal to original bytes.",
- 0 == ByteUtils.compare(origBytes, fetchedBytes));
- }
-
-}
0 ...-indexer/lib/commons-cli-2.0-SNAPSHOT.jar → ...-builder/lib/commons-cli-2.0-SNAPSHOT.jar
File renamed without changes.
0 .../batch-indexer/lib/hadoop-0.18.1-core.jar → ...-store-builder/lib/hadoop-0.18.1-core.jar
File renamed without changes.
0 .../performance/BdbBuildPerformanceTest.java → .../performance/BdbBuildPerformanceTest.java
File renamed without changes.
0 ...erformance/MysqlBuildPerformanceTest.java → ...erformance/MysqlBuildPerformanceTest.java
File renamed without changes.
80 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/benchmark/BuildTestStore.java
@@ -0,0 +1,80 @@
+package voldemort.store.readonly.benchmark;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import voldemort.cluster.Cluster;
+import voldemort.store.StoreDefinition;
+import voldemort.store.readonly.mr.AbstractStoreBuilderMapper;
+import voldemort.store.readonly.mr.HadoopStoreBuilder;
+import voldemort.utils.Utils;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+/**
+ * Build a test store from the generated data
+ *
+ * @author jay
+ *
+ */
+public class BuildTestStore extends Configured implements Tool {
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new BuildTestStore(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if(args.length != 5)
+ Utils.croak("Expected arguments store_name config_dir temp_dir input_path output_path");
+ String storeName = args[0];
+ String configDir = args[1];
+ String tempDir = args[2];
+ String inputDir = args[3];
+ String outputDir = args[4];
+
+ List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(configDir,
+ "stores.xml"));
+ StoreDefinition def = null;
+ for(StoreDefinition d: storeDefs)
+ if(d.getName().equals(storeName))
+ def = d;
+ Cluster cluster = new ClusterMapper().readCluster(new File(configDir, "cluster.xml"));
+
+ Configuration config = this.getConf();
+ HadoopStoreBuilder builder = new HadoopStoreBuilder(config,
+ BuildTestStoreMapper.class,
+ SequenceFileInputFormat.class,
+ cluster,
+ def,
+ 2,
+ new Path(tempDir),
+ new Path(outputDir),
+ new Path(inputDir));
+ builder.build();
+ return 0;
+ }
+
+ public static class BuildTestStoreMapper extends AbstractStoreBuilderMapper<Text, Text> {
+
+ @Override
+ public Object makeKey(Text key, Text value) {
+ return key.toString();
+ }
+
+ @Override
+ public Object makeValue(Text key, Text value) {
+ return value.toString();
+ }
+
+ }
+
+}
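
For reference, a minimal sketch of driving BuildTestStore through Hadoop's ToolRunner; the class name, store name, and all paths below are placeholders, not part of the commit. The five positional arguments map to store_name, config_dir, temp_dir, input_path, and output_path, exactly as parsed in run() above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import voldemort.store.readonly.benchmark.BuildTestStore;

public class BuildTestStoreExample {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new BuildTestStore(), new String[] {
                "test-store",            // store_name: must match an entry in stores.xml
                "/etc/voldemort/config", // config_dir: contains cluster.xml and stores.xml
                "/tmp/store-build",      // temp_dir: scratch space for the map/reduce output
                "/data/keys-and-values", // input_path: SequenceFile<Text, Text> records
                "/data/store-output" }); // output_path: where the finished store files land
        System.exit(exitCode);
    }
}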
97 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/benchmark/GenerateData.java
@@ -0,0 +1,97 @@
+package voldemort.store.readonly.benchmark;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import voldemort.utils.Utils;
+
+/**
+ * A test harness that takes as input a text file of keys and generates random
+ * data as values. This data is output as a SequenceFile where the key is the
+ * given key, and the value is the produced value.
+ *
+ * @author jay
+ *
+ */
+public class GenerateData extends Configured implements Tool {
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new GenerateData(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if(args.length != 3)
+ Utils.croak("USAGE: GenerateData input-file output-dir value-size");
+ JobConf conf = new JobConf(getConf(), GenerateData.class);
+ conf.setJobName("generate-data");
+
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(GenerateDataMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ conf.setNumReduceTasks(0);
+
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(Text.class);
+
+ Path inputPath = new Path(args[0]);
+ FileInputFormat.setInputPaths(conf, inputPath);
+ Path outputPath = new Path(args[1]);
+ // delete output path if it already exists
+ FileSystem fs = outputPath.getFileSystem(conf);
+ if(fs.exists(outputPath))
+ fs.delete(outputPath, true);
+ FileOutputFormat.setOutputPath(conf, outputPath);
+ conf.setInt("value.size", Integer.parseInt(args[2]));
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static class GenerateDataMapper extends MapReduceBase implements
+ Mapper<LongWritable, Text, Text, Text> {
+
+ private String string;
+
+ public void map(LongWritable lineNumber,
+ Text line,
+ OutputCollector<Text, Text> collector,
+ Reporter reporter) throws IOException {
+ collector.collect(line, new Text(string));
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ StringBuilder builder = new StringBuilder();
+ int size = job.getInt("value.size", -1);
+ for(int i = 0; i < size; i++)
+ builder.append('a');
+ this.string = builder.toString();
+ }
+ }
+
+}
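
A hedged invocation sketch for GenerateData (paths are hypothetical): with a value size of 8, every line of the input file becomes one SequenceFile record whose key is the line and whose value is the eight-character string "aaaaaaaa".

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import voldemort.store.readonly.benchmark.GenerateData;

public class GenerateDataExample {
    public static void main(String[] args) throws Exception {
        // Arguments are input-file, output-dir, and value-size, as checked in run() above.
        System.exit(ToolRunner.run(new Configuration(),
                                   new GenerateData(),
                                   new String[] { "/data/keys", "/data/keys-and-values", "8" }));
    }
}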
105 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
@@ -0,0 +1,105 @@
+package voldemort.store.readonly.fetcher;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import voldemort.store.readonly.FileFetcher;
+import voldemort.utils.Props;
+import voldemort.utils.Time;
+import voldemort.utils.Utils;
+
+/**
+ * A fetcher that fetches the store files from HDFS
+ *
+ * @author jay
+ *
+ */
+public class HdfsFetcher implements FileFetcher {
+
+ private static final Logger logger = Logger.getLogger(HdfsFetcher.class);
+ private static final int BUFFER_SIZE = 64 * 1024;
+ private static final int REPORTING_INTERVAL_BYTES = 100 * 1024 * 1024;
+
+ private final Long maxBytesPerSecond;
+
+ public HdfsFetcher(Props props) {
+ this(props.getBytes("fetcher.max.bytes.per.sec"));
+ }
+
+ public HdfsFetcher() {
+ this((Long) null);
+ }
+
+ public HdfsFetcher(Long maxBytesPerSecond) {
+ this.maxBytesPerSecond = maxBytesPerSecond;
+ }
+
+ public File fetchFile(String fileUrl) throws IOException {
+ Path filePath = new Path(fileUrl);
+ Configuration config = new Configuration();
+ config.setInt("io.file.buffer.size", 64 * 1024);
+ FileSystem fs = filePath.getFileSystem(config);
+ IoThrottler throttler = null;
+ if(maxBytesPerSecond != null)
+ throttler = new IoThrottler(maxBytesPerSecond);
+
+ // copy file
+ long bytesCopied = 0;
+ long bytesSinceLastReport = 0;
+ FSDataInputStream input = null;
+ OutputStream output = null;
+ try {
+ input = fs.open(filePath);
+ File outputFile = File.createTempFile("fetcher-", ".dat");
+ output = new FileOutputStream(outputFile);
+ byte[] buffer = new byte[BUFFER_SIZE];
+ while(true) {
+ int read = input.read(buffer);
+ if(read < 0)
+ break;
+ output.write(buffer, 0, read);
+ if(throttler != null)
+ throttler.maybeThrottle(read);
+ bytesSinceLastReport += read;
+ bytesCopied += read;
+ if(bytesSinceLastReport > REPORTING_INTERVAL_BYTES) {
+ logger.info(bytesCopied / (1024 * 1024) + " MB copied");
+ bytesSinceLastReport = 0;
+ }
+
+ }
+ return outputFile;
+ } finally {
+ IOUtils.closeQuietly(output);
+ IOUtils.closeQuietly(input);
+ }
+ }
+
+ /*
+ * Main method for testing fetching
+ */
+ public static void main(String[] args) throws Exception {
+ if(args.length != 2)
+ Utils.croak("USAGE: java " + HdfsFetcher.class.getName() + " url maxBytesPerSec");
+ String url = args[0];
+ long maxBytesPerSec = Long.parseLong(args[1]);
+ Path p = new Path(url);
+ FileStatus status = p.getFileSystem(new Configuration()).getFileStatus(p);
+ long size = status.getLen();
+ HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec);
+ long start = System.currentTimeMillis();
+ fetcher.fetchFile(url);
+ double rate = size * Time.MS_PER_SECOND / (double) (System.currentTimeMillis() - start);
+ System.out.println("Fetch completed: " + rate + " bytes/sec.");
+ }
+}
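
A sketch of using the fetcher programmatically; the HDFS URL and throttle rate are placeholder values. fetchFile() copies the remote file into a local temp file and returns it, logging progress every 100 MB.

import java.io.File;
import voldemort.store.readonly.fetcher.HdfsFetcher;

public class HdfsFetcherExample {
    public static void main(String[] args) throws Exception {
        // Cap the copy at roughly 10 MB/sec; the no-arg constructor gives an unthrottled fetch.
        HdfsFetcher fetcher = new HdfsFetcher(10L * 1024 * 1024);
        File localCopy = fetcher.fetchFile("hdfs://namenode:9000/voldemort/stores/test-store/node-0.index");
        System.out.println("Fetched to " + localCopy.getAbsolutePath());
    }
}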
74 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/IoThrottler.java
@@ -0,0 +1,74 @@
+package voldemort.store.readonly.fetcher;
+
+import org.apache.log4j.Logger;
+
+import voldemort.VoldemortException;
+import voldemort.annotations.concurrency.NotThreadsafe;
+import voldemort.utils.SystemTime;
+import voldemort.utils.Time;
+
+/**
+ * A class to throttle IO to a certain rate
+ *
+ * This class takes a maximum rate in bytes/sec and a minimum interval in ms at
+ * which to check the rate. The rate is checked every time the interval
+ * ellapses, and if the IO rate exceeds the maximum, the call will block long
+ * enough to equalize it.
+ *
+ * @author jay
+ *
+ */
+@NotThreadsafe
+public class IoThrottler {
+
+ private final static Logger logger = Logger.getLogger(IoThrottler.class);
+ private final static long DEFAULT_CHECK_INTERVAL_MS = 50;
+
+ private final Time time;
+ private final long maxBytesPerSecond;
+ private final long intervalMs;
+ private long startTime;
+ private long bytesReadInInterval;
+
+ public IoThrottler(long maxBytesPerSecond) {
+ this(SystemTime.INSTANCE, maxBytesPerSecond, DEFAULT_CHECK_INTERVAL_MS);
+ }
+
+ public IoThrottler(Time time, long maxBytesPerSecond, long intervalMs) {
+ this.time = time;
+ this.intervalMs = intervalMs;
+ this.maxBytesPerSecond = maxBytesPerSecond;
+ this.bytesReadInInterval = 0L;
+ this.startTime = 0L;
+ }
+
+ public void maybeThrottle(int bytesRead) {
+ bytesReadInInterval += bytesRead;
+ long now = time.getNanoseconds();
+ long ellapsedNs = now - startTime;
+ // if we have completed an interval AND we have read some bytes, maybe
+ // we should take a little nap
+ if(ellapsedNs > intervalMs * Time.NS_PER_MS && bytesReadInInterval > 0) {
+ long bytesPerSec = (bytesReadInInterval * Time.NS_PER_SECOND) / ellapsedNs;
+ if(bytesPerSec > maxBytesPerSecond) {
+ // solve for the amount of time to sleep to make us hit the
+ // correct i/o rate
+ double maxBytesPerMs = maxBytesPerSecond / (double) Time.MS_PER_SECOND;
+ long ellapsedMs = ellapsedNs / Time.NS_PER_MS;
+ long sleepTime = Math.round(bytesReadInInterval / maxBytesPerMs - ellapsedMs);
+ if(logger.isDebugEnabled())
+ logger.debug("Natural I/O rate is " + bytesPerSec + " bytes/sec, sleeping for "
+ + sleepTime + " ms to compensate.");
+ if(sleepTime > 0) {
+ try {
+ time.sleep(sleepTime);
+ } catch(InterruptedException e) {
+ throw new VoldemortException(e);
+ }
+ }
+ }
+ startTime = now;
+ bytesReadInInterval = 0;
+ }
+ }
+}
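
To make the sleep computation concrete: with maxBytesPerSecond = 1,000,000 and 500,000 bytes read in a 100 ms check interval, the natural rate is 5,000,000 bytes/sec, so the throttler sleeps for 500,000 / 1,000 bytes-per-ms minus 100 ms, i.e. about 400 ms. A minimal sketch of wrapping a copy loop, along the same lines as HdfsFetcher.fetchFile() above (the stream parameters are placeholders):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import voldemort.store.readonly.fetcher.IoThrottler;

public class ThrottledCopy {
    // Copies input to output at no more than maxBytesPerSecond, checking the
    // observed rate after every read and napping when it runs too hot.
    public static void copy(InputStream input, OutputStream output, long maxBytesPerSecond)
            throws IOException {
        IoThrottler throttler = new IoThrottler(maxBytesPerSecond);
        byte[] buffer = new byte[64 * 1024];
        int read;
        while((read = input.read(buffer)) >= 0) {
            output.write(buffer, 0, read);
            throttler.maybeThrottle(read);
        }
    }
}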
83 ...hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractStoreBuilderMapper.java
@@ -0,0 +1,83 @@
+package voldemort.store.readonly.mr;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import voldemort.cluster.Node;
+import voldemort.routing.ConsistentRoutingStrategy;
+import voldemort.serialization.DefaultSerializerFactory;
+import voldemort.serialization.Serializer;
+import voldemort.utils.ByteUtils;
+
+/**
+ * Mapper reads input data and translates it into data serialized with the
+ * appropriate Serializer for the given store. Override makeKey() and
+ * makeValue() to create the appropriate objects to pass into the Serializer.
+ *
+ * This mapper expects the store name to be defined by the property
+ * voldemort.store.name, and it expects to find distributed cache files
+ * cluster.xml and stores.xml.
+ *
+ * @author bbansal, jay
+ *
+ */
+public abstract class AbstractStoreBuilderMapper<K, V> extends HadoopStoreBuilderBase implements
+ Mapper<K, V, BytesWritable, BytesWritable> {
+
+ private ConsistentRoutingStrategy routingStrategy;
+ private Serializer<Object> keySerializer;
+ private Serializer<Object> valueSerializer;
+
+ public abstract Object makeKey(K key, V value);
+
+ public abstract Object makeValue(K key, V value);
+
+ /**
+ * Create the voldemort key and value from the input key and value and map
+ * it out for each of the responsible voldemort nodes
+ *
+ * The output key is the md5 of the serialized key returned by makeKey().
+ * The output value is the nodeid of the responsible node followed by
+ * serialized value returned by makeValue().
+ */
+ public void map(K key,
+ V value,
+ OutputCollector<BytesWritable, BytesWritable> output,
+ Reporter reporter) throws IOException {
+ byte[] keyBytes = keySerializer.toBytes(makeKey(key, value));
+ byte[] valBytes = valueSerializer.toBytes(makeValue(key, value));
+
+ List<Node> nodes = routingStrategy.routeRequest(keyBytes);
+ for(Node node: nodes) {
+ ByteArrayOutputStream versionedValue = new ByteArrayOutputStream();
+ DataOutputStream valueDin = new DataOutputStream(versionedValue);
+ valueDin.writeInt(node.getId());
+ valueDin.write(valBytes);
+ valueDin.close();
+ BytesWritable outputKey = new BytesWritable(ByteUtils.md5(keyBytes));
+ BytesWritable outputVal = new BytesWritable(versionedValue.toByteArray());
+
+ output.collect(outputKey, outputVal);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void configure(JobConf conf) {
+ super.configure(conf);
+ keySerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(getStoreDef().getKeySerializer());
+ valueSerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(getStoreDef().getValueSerializer());
+
+ routingStrategy = new ConsistentRoutingStrategy(getCluster().getNodes(),
+ getStoreDef().getReplicationFactor());
+ }
+
+}
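
A hedged sketch of a concrete subclass over plain text input; the tab-separated layout is purely illustrative and assumes every line contains a tab and that the store's key and value serializers accept strings.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import voldemort.store.readonly.mr.AbstractStoreBuilderMapper;

// Maps lines of the form "<id>\t<value>" into the store; usable with TextInputFormat.
public class TabSeparatedStoreMapper extends AbstractStoreBuilderMapper<LongWritable, Text> {

    @Override
    public Object makeKey(LongWritable offset, Text line) {
        return line.toString().split("\t", 2)[0];
    }

    @Override
    public Object makeValue(LongWritable offset, Text line) {
        return line.toString().split("\t", 2)[1];
    }
}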
113 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java
@@ -0,0 +1,113 @@
+package voldemort.store.readonly.mr;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import voldemort.VoldemortException;
+import voldemort.cluster.Cluster;
+import voldemort.store.StoreDefinition;
+import voldemort.utils.Utils;
+import voldemort.xml.ClusterMapper;
+import voldemort.xml.StoreDefinitionsMapper;
+
+/**
+ * Builds a read-only Voldemort store, suitable for simple batch updates.
+ * <p>
+ * The store is built from the specified input data via a single map/reduce job.
+ *
+ * @author bbansal, jay
+ */
+public class HadoopStoreBuilder {
+
+ private final Configuration config;
+ private final Class<? extends AbstractStoreBuilderMapper<?, ?>> mapperClass;
+ @SuppressWarnings("unchecked")
+ private final Class<? extends InputFormat> inputFormatClass;
+ private final Cluster cluster;
+ private final StoreDefinition storeDef;
+ private final int replicationFactor;
+ private final Path inputPath;
+ private final Path outputDir;
+ private final Path tempDir;
+
+ /**
+ * Create the store builder
+ *
+ * @param conf A base configuration to start with
+ * @param mapperClass The class to use as the mapper
+ * @param inputFormatClass The input format to use for reading values
+ * @param cluster The voldemort cluster for which the stores are being built
+ * @param storeDef The store definition of the store
+ * @param replicationFactor The replication factor to use for the built store
+ * @param tempDir The temporary directory to use for the map/reduce job output
+ * @param outputDir The directory into which the final store files are written
+ * @param inputPath The path to the input data
+ */
+ @SuppressWarnings("unchecked")
+ public HadoopStoreBuilder(Configuration conf,
+ Class<? extends AbstractStoreBuilderMapper<?, ?>> mapperClass,
+ Class<? extends InputFormat> inputFormatClass,
+ Cluster cluster,
+ StoreDefinition storeDef,
+ int replicationFactor,
+ Path tempDir,
+ Path outputDir,
+ Path inputPath) {
+ super();
+ this.config = conf;
+ this.mapperClass = Utils.notNull(mapperClass);
+ this.inputFormatClass = Utils.notNull(inputFormatClass);
+ this.inputPath = inputPath;
+ this.cluster = Utils.notNull(cluster);
+ this.storeDef = Utils.notNull(storeDef);
+ this.replicationFactor = replicationFactor;
+ this.tempDir = tempDir;
+ this.outputDir = Utils.notNull(outputDir);
+ }
+
+ public void build() {
+ JobConf conf = new JobConf(config);
+ conf.setInt("io.file.buffer.size", 64 * 1024);
+ conf.set("cluster.xml", new ClusterMapper().writeCluster(cluster));
+ conf.set("stores.xml",
+ new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef)));
+ conf.setInt("store.output.replication.factor", replicationFactor);
+ conf.setNumReduceTasks(cluster.getNumberOfNodes());
+ conf.setPartitionerClass(HadoopStoreBuilderPartitioner.class);
+ conf.setMapperClass(mapperClass);
+ conf.setMapOutputKeyClass(BytesWritable.class);
+ conf.setMapOutputValueClass(BytesWritable.class);
+ conf.setReducerClass(HadoopStoreBuilderReducer.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setOutputKeyClass(BytesWritable.class);
+ conf.setOutputValueClass(BytesWritable.class);
+ conf.setInputFormat(inputFormatClass);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileInputFormat.setInputPaths(conf, inputPath);
+ conf.set("final.output.dir", outputDir.toString());
+ FileOutputFormat.setOutputPath(conf, tempDir);
+
+ try {
+ // delete output dir if it already exists
+ FileSystem fs = tempDir.getFileSystem(conf);
+ fs.delete(tempDir, true);
+
+ // run job
+ JobClient.runJob(conf);
+ } catch(IOException e) {
+ throw new VoldemortException(e);
+ }