
Read-only store improvements: (1) Break store into chunks so that multiple hadoop reducers can do the build in parallel, (2) move to mmap instead of RandomAccessFile to avoid read-ahead, (3) rename everything to ReadOnlyStorageEngine instead of RandomAccessFileStore for consistency.
1 parent 83fed1e commit f77aff653637650ca891083336dd420d2384cd29 jkreps committed May 15, 2009
Showing with 1,063 additions and 933 deletions.
  1. +20 −0 build.xml
  2. +4 −2 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/benchmark/BuildTestStore.java
  3. +84 −21 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
  4. +7 −7 ...emort/store/readonly/mr/{AbstractStoreBuilderMapper.java → AbstractHadoopStoreBuilderMapper.java}
  5. +13 −5 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java
  6. +22 −7 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java
  7. +49 −31 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderReducer.java
  8. +1 −1 contrib/hadoop-store-builder/test/voldemort/store/readonly/fetcher/HdfsFetcherTest.java
  9. +10 −10 ...re-builder/test/voldemort/store/readonly/mr/{StoreBuilderTest.java → HadoopStoreBuilderTest.java}
  10. +2 −8 src/java/voldemort/server/VoldemortConfig.java
  11. +20 −33 src/java/voldemort/server/http/gui/ReadOnlyStoreManagementServlet.java
  12. +4 −12 src/java/voldemort/server/http/gui/templates/read-only-mgmt.vm
  13. +170 −0 src/java/voldemort/store/readonly/ChunkedFileSet.java
  14. +1 −1 src/java/voldemort/store/readonly/FileFetcher.java
  15. +44 −29 src/java/voldemort/store/readonly/JsonStoreBuilder.java
  16. +0 −604 src/java/voldemort/store/readonly/RandomAccessFileStore.java
  17. +5 −8 ...rt/store/readonly/{RandomAccessFileStorageConfiguration.java → ReadOnlyStorageConfiguration.java}
  18. +421 −0 src/java/voldemort/store/readonly/ReadOnlyStorageEngine.java
  19. +11 −0 src/java/voldemort/store/readonly/ReadOnlyUtils.java
  20. +22 −35 src/java/voldemort/store/readonly/StoreSwapper.java
  21. +17 −0 src/java/voldemort/utils/Utils.java
  22. +4 −3 test/common/voldemort/TestUtils.java
  23. +0 −60 test/integration/voldemort/CatRandomAccessFileStore.java
  24. +66 −0 test/integration/voldemort/CatReadOnlyStore.java
  25. +2 −2 test/integration/voldemort/performance/ReadOnlyStorePerformanceTest.java
  26. +13 −21 test/unit/voldemort/store/readonly/RandomAccessStoreTestInstance.java
  27. +51 −33 test/unit/voldemort/store/readonly/{RandomAccessFileStoreTest.java → ReadOnlyStorageEngineTest.java}
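
Point (2) of the commit message, replacing RandomAccessFile with mmap to avoid read-ahead, lives mostly in the new ReadOnlyStorageEngine and ChunkedFileSet, whose diffs are not rendered below. As a rough illustration of the idea only (the class and method names here are hypothetical, not the committed code), a chunk's data file can be memory-mapped once and point lookups served straight from the mapped buffer:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Illustrative only: map a chunk's data file into memory once and serve
// reads from the mapped buffer instead of seek()/read() on a RandomAccessFile.
public class MmapChunkReaderSketch {

    private final MappedByteBuffer buffer;

    public MmapChunkReaderSketch(File dataFile) throws IOException {
        RandomAccessFile file = new RandomAccessFile(dataFile, "r");
        try {
            FileChannel channel = file.getChannel();
            // READ_ONLY mapping: the OS pages data in on demand, so a point
            // lookup does not trigger stream-style read-ahead.
            this.buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
        } finally {
            file.close();
        }
    }

    /** Copy length bytes starting at offset out of the mapped region. */
    public byte[] read(int offset, int length) {
        byte[] result = new byte[length];
        // duplicate() gives an independent position, so concurrent readers
        // do not interfere with each other.
        ByteBuffer view = buffer.duplicate();
        view.position(offset);
        view.get(result);
        return result;
    }
}

Because the chunk files are immutable once built, a read-only mapping can be shared safely across request threads this way.
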
build.xml
@@ -219,6 +219,26 @@
</tarfileset>
</tar>
</target>
+
+ <target name="hadoop-benchmark-jar" depends="build, contrib-build" description="Build a jar file that includes all contrib code.">
+ <jar destfile="${dist.dir}/${name}-${curr.release}-all.jar">
+ <fileset dir="${classes.dir}">
+ <include name="**/*.*" />
+ </fileset>
+ <fileset dir="${contrib.classes.dir}">
+ <include name="**/*.*" />
+ </fileset>
+ <!-- include xsds -->
+ <fileset dir="${java.dir}">
+ <include name="**/*.xsd" />
+ </fileset>
+ <fileset dir="">
+ <include name="lib/jdom.jar"/>
+ <include name="lib/google-collect*.jar"/>
+ <include name="lib/commons-lang*.jar"/>
+ </fileset>
+ </jar>
+ </target>
<target name="junit" depends="build, buildtest" description="Run junit tests.">
<replace-dir dir="${testreport.dir}" />
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/benchmark/BuildTestStore.java
@@ -13,7 +13,7 @@
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
-import voldemort.store.readonly.mr.AbstractStoreBuilderMapper;
+import voldemort.store.readonly.mr.AbstractHadoopStoreBuilderMapper;
import voldemort.store.readonly.mr.HadoopStoreBuilder;
import voldemort.utils.Utils;
import voldemort.xml.ClusterMapper;
@@ -50,20 +50,22 @@ public int run(String[] args) throws Exception {
Cluster cluster = new ClusterMapper().readCluster(new File(configDir, "cluster.xml"));
Configuration config = this.getConf();
+ config.set("mapred.job.name", "test-store-builder");
HadoopStoreBuilder builder = new HadoopStoreBuilder(config,
BuildTestStoreMapper.class,
SequenceFileInputFormat.class,
cluster,
def,
2,
+ 512 * 1024 * 1024,
new Path(tempDir),
new Path(outputDir),
new Path(inputDir));
builder.build();
return 0;
}
- public static class BuildTestStoreMapper extends AbstractStoreBuilderMapper<Text, Text> {
+ public static class BuildTestStoreMapper extends AbstractHadoopStoreBuilderMapper<Text, Text> {
@Override
public Object makeKey(Text key, Text value) {
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
@@ -30,38 +30,75 @@
private static final int BUFFER_SIZE = 64 * 1024;
private static final int REPORTING_INTERVAL_BYTES = 100 * 1024 * 1024;
+ private File tempDir;
private final Long maxBytesPerSecond;
public HdfsFetcher(Props props) {
- this(props.getBytes("fetcher.max.bytes.per.sec"));
+ this(props.getBytes("fetcher.max.bytes.per.sec"),
+ new File(props.getString("hdfs.fetcher.tmp.dir", null)));
}
public HdfsFetcher() {
- this((Long) null);
+ this((Long) null, null);
}
- public HdfsFetcher(Long maxBytesPerSecond) {
+ public HdfsFetcher(Long maxBytesPerSecond, File tempDir) {
+ if(tempDir == null)
+ this.tempDir = new File(System.getProperty("java.io.tmpdir"), "hdfs-fetcher");
+ else
+ this.tempDir = Utils.notNull(new File(tempDir, "hdfs-fetcher"));
this.maxBytesPerSecond = maxBytesPerSecond;
+ this.tempDir.mkdirs();
}
- public File fetchFile(String fileUrl) throws IOException {
- Path filePath = new Path(fileUrl);
+ public File fetch(String fileUrl) throws IOException {
+ Path path = new Path(fileUrl);
Configuration config = new Configuration();
config.setInt("io.file.buffer.size", 64 * 1024);
- FileSystem fs = filePath.getFileSystem(config);
+ FileSystem fs = path.getFileSystem(config);
IoThrottler throttler = null;
if(maxBytesPerSecond != null)
throttler = new IoThrottler(maxBytesPerSecond);
// copy file
- long bytesCopied = 0;
- long bytesSinceLastReport = 0;
+ CopyStats stats = new CopyStats();
+ File destination = new File(this.tempDir, path.getName());
+ fetch(fs, path, destination, throttler, stats);
+ return destination;
+ }
+
+ private void fetch(FileSystem fs, Path source, File dest, IoThrottler throttler, CopyStats stats)
+ throws IOException {
+ if(fs.isFile(source)) {
+ copyFile(fs, source, dest, throttler, stats);
+ } else {
+ dest.mkdirs();
+ FileStatus[] statuses = fs.listStatus(source);
+ if(statuses != null) {
+ for(FileStatus status: statuses) {
+ if(!status.getPath().getName().startsWith(".")) {
+ fetch(fs,
+ status.getPath(),
+ new File(dest, status.getPath().getName()),
+ throttler,
+ stats);
+ }
+ }
+ }
+ }
+ }
+
+ private void copyFile(FileSystem fs,
+ Path source,
+ File dest,
+ IoThrottler throttler,
+ CopyStats stats) throws IOException {
+ logger.info("Starting copy of " + source + " to " + dest);
FSDataInputStream input = null;
OutputStream output = null;
try {
- input = fs.open(filePath);
- File outputFile = File.createTempFile("fetcher-", ".dat");
- output = new FileOutputStream(outputFile);
+ input = fs.open(source);
+ output = new FileOutputStream(dest);
byte[] buffer = new byte[BUFFER_SIZE];
while(true) {
int read = input.read(buffer);
@@ -70,21 +107,47 @@ public File fetchFile(String fileUrl) throws IOException {
output.write(buffer, 0, read);
if(throttler != null)
throttler.maybeThrottle(read);
- bytesSinceLastReport += read;
- bytesCopied += read;
- if(bytesSinceLastReport > REPORTING_INTERVAL_BYTES) {
- logger.info(bytesCopied / (1024 * 1024) + " MB copied");
- bytesSinceLastReport = 0;
+ stats.recordBytes(read);
+ if(stats.getBytesSinceLastReport() > REPORTING_INTERVAL_BYTES) {
+ logger.info(stats.getBytesCopied() / (1024 * 1024) + " MB copied");
+ stats.resetBytesSinceLastReport();
}
-
}
- return outputFile;
+ logger.info("Completed copy of " + source + " to " + dest);
} finally {
IOUtils.closeQuietly(output);
IOUtils.closeQuietly(input);
}
}
+ private static class CopyStats {
+
+ private long bytesSinceLastReport;
+ private long bytesCopied;
+
+ public CopyStats() {
+ this.bytesCopied = 0;
+ this.bytesSinceLastReport = 0;
+ }
+
+ public void recordBytes(long bytes) {
+ this.bytesCopied += bytes;
+ this.bytesSinceLastReport += bytes;
+ }
+
+ public void resetBytesSinceLastReport() {
+ this.bytesSinceLastReport = 0;
+ }
+
+ public long getBytesSinceLastReport() {
+ return bytesSinceLastReport;
+ }
+
+ public long getBytesCopied() {
+ return bytesCopied;
+ }
+ }
+
/*
* Main method for testing fetching
*/
@@ -96,10 +159,10 @@ public static void main(String[] args) throws Exception {
Path p = new Path(url);
FileStatus status = p.getFileSystem(new Configuration()).getFileStatus(p);
long size = status.getLen();
- HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec);
+ HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec, null);
long start = System.currentTimeMillis();
- fetcher.fetchFile(url);
+ File location = fetcher.fetch(url);
double rate = size * Time.MS_PER_SECOND / (double) (System.currentTimeMillis() - start);
- System.out.println("Fetch completed: " + rate + " bytes/sec.");
+ System.out.println("Fetch to " + location + " completed: " + rate + " bytes/sec.");
}
}
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/AbstractHadoopStoreBuilderMapper.java
@@ -29,8 +29,8 @@
* @author bbansal, jay
*
*/
-public abstract class AbstractStoreBuilderMapper<K, V> extends HadoopStoreBuilderBase implements
- Mapper<K, V, BytesWritable, BytesWritable> {
+public abstract class AbstractHadoopStoreBuilderMapper<K, V> extends HadoopStoreBuilderBase
+ implements Mapper<K, V, BytesWritable, BytesWritable> {
private ConsistentRoutingStrategy routingStrategy;
private Serializer<Object> keySerializer;
@@ -57,11 +57,11 @@ public void map(K key,
List<Node> nodes = routingStrategy.routeRequest(keyBytes);
for(Node node: nodes) {
- ByteArrayOutputStream versionedValue = new ByteArrayOutputStream();
- DataOutputStream valueDin = new DataOutputStream(versionedValue);
- valueDin.writeInt(node.getId());
- valueDin.write(valBytes);
- valueDin.close();
+ ByteArrayOutputStream versionedValue = new ByteArrayOutputStream(keyBytes.length + 4);
+ DataOutputStream valueStream = new DataOutputStream(versionedValue);
+ valueStream.writeInt(node.getId());
+ valueStream.write(valBytes);
+ valueStream.close();
BytesWritable outputKey = new BytesWritable(ByteUtils.md5(keyBytes));
BytesWritable outputVal = new BytesWritable(versionedValue.toByteArray());
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilder.java
@@ -4,6 +4,7 @@
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -31,12 +32,13 @@
public class HadoopStoreBuilder {
private final Configuration config;
- private final Class<? extends AbstractStoreBuilderMapper<?, ?>> mapperClass;
+ private final Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass;
@SuppressWarnings("unchecked")
private final Class<? extends InputFormat> inputFormatClass;
private final Cluster cluster;
private final StoreDefinition storeDef;
private final int replicationFactor;
+ private final int chunkSizeBytes;
private final Path inputPath;
private final Path outputDir;
private final Path tempDir;
@@ -50,17 +52,19 @@
* @param cluster The voldemort cluster for which the stores are being built
* @param storeDef The store definition of the store
* @param replicationFactor
+ * @param chunkSizeBytes
* @param tempDir
* @param outputDir
* @param path
*/
@SuppressWarnings("unchecked")
public HadoopStoreBuilder(Configuration conf,
- Class<? extends AbstractStoreBuilderMapper<?, ?>> mapperClass,
+ Class<? extends AbstractHadoopStoreBuilderMapper<?, ?>> mapperClass,
Class<? extends InputFormat> inputFormatClass,
Cluster cluster,
StoreDefinition storeDef,
int replicationFactor,
+ int chunkSizeBytes,
Path tempDir,
Path outputDir,
Path inputPath) {
@@ -72,6 +76,7 @@ public HadoopStoreBuilder(Configuration conf,
this.cluster = Utils.notNull(cluster);
this.storeDef = Utils.notNull(storeDef);
this.replicationFactor = replicationFactor;
+ this.chunkSizeBytes = chunkSizeBytes;
this.tempDir = tempDir;
this.outputDir = Utils.notNull(outputDir);
}
@@ -83,7 +88,6 @@ public void build() {
conf.set("stores.xml",
new StoreDefinitionsMapper().writeStoreList(Collections.singletonList(storeDef)));
conf.setInt("store.output.replication.factor", replicationFactor);
- conf.setNumReduceTasks(cluster.getNumberOfNodes());
conf.setPartitionerClass(HadoopStoreBuilderPartitioner.class);
conf.setMapperClass(mapperClass);
conf.setMapOutputKeyClass(BytesWritable.class);
@@ -94,6 +98,7 @@ public void build() {
conf.setOutputValueClass(BytesWritable.class);
conf.setInputFormat(inputFormatClass);
conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setJarByClass(getClass());
FileInputFormat.setInputPaths(conf, inputPath);
conf.set("final.output.dir", outputDir.toString());
FileOutputFormat.setOutputPath(conf, tempDir);
@@ -103,11 +108,14 @@ public void build() {
FileSystem fs = tempDir.getFileSystem(conf);
fs.delete(tempDir, true);
- // run job
+ FileStatus status = fs.getFileStatus(inputPath);
+ int numChunks = Math.max((int) status.getLen() / chunkSizeBytes, 1);
+ conf.setInt("num.chunks", numChunks);
+ conf.setNumReduceTasks(cluster.getNumberOfNodes() * numChunks);
+
JobClient.runJob(conf);
} catch(IOException e) {
throw new VoldemortException(e);
}
}
-
}
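
To make the sizing logic in build() concrete, here is a small self-contained example that applies the same idea with made-up numbers (10 GB of input, 512 MB chunks as passed in BuildTestStore, and a hypothetical 4-node cluster); it uses long arithmetic rather than the exact int cast in the diff:

// Worked example of the chunk / reducer sizing used in build() above,
// with made-up input and cluster sizes.
public class ChunkSizingExample {

    public static void main(String[] args) {
        long inputSizeBytes = 10L * 1024 * 1024 * 1024; // hypothetical: 10 GB of input
        long chunkSizeBytes = 512L * 1024 * 1024;       // 512 MB, as passed in BuildTestStore
        int numberOfNodes = 4;                          // hypothetical cluster size

        // At least one chunk, otherwise roughly one chunk per chunkSizeBytes of input.
        int numChunks = (int) Math.max(inputSizeBytes / chunkSizeBytes, 1);

        // One reduce task per (node, chunk) pair, matching the partitioner's contract.
        int numReduceTasks = numberOfNodes * numChunks;

        System.out.println("numChunks = " + numChunks);           // prints 20
        System.out.println("numReduceTasks = " + numReduceTasks); // prints 80
    }
}
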
contrib/hadoop-store-builder/src/java/voldemort/store/readonly/mr/HadoopStoreBuilderPartitioner.java
@@ -1,22 +1,37 @@
package voldemort.store.readonly.mr;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import voldemort.VoldemortException;
+import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.ByteUtils;
/**
- * A Partitioner that pulls the voldemort node id from the first four bytes of
- * the value
+ * A Partitioner that splits data so that all data for the same nodeId, chunkId
+ * combination ends up in the same reduce
*
- * @author bbansal
+ * @author bbansal, jay
*
*/
-public class HadoopStoreBuilderPartitioner extends HashPartitioner<BytesWritable, BytesWritable> {
+public class HadoopStoreBuilderPartitioner implements Partitioner<BytesWritable, BytesWritable> {
+
+ private int numChunks;
- @Override
public int getPartition(BytesWritable key, BytesWritable value, int numReduceTasks) {
int nodeId = ByteUtils.readInt(value.get(), 0);
- return (nodeId) % numReduceTasks;
+ int chunkId = ReadOnlyUtils.chunk(key.get(), numChunks);
+ System.out.println("nodeId = " + nodeId + ", chunkId = " + chunkId + ", partition = "
+ + (chunkId * numChunks + nodeId) % numReduceTasks
+ + ", numReduceTasks = " + numReduceTasks);
+ return (chunkId * numChunks + nodeId) % numReduceTasks;
+ }
+
+ public void configure(JobConf job) {
+ this.numChunks = job.getInt("num.chunks", -1);
+ if(this.numChunks < 1)
+ throw new VoldemortException("num.chunks not specified in the job conf.");
}
+
}
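
The partitioner depends on ReadOnlyUtils.chunk, which this commit adds (+11 −0 in the file list above) but whose diff is not shown. A minimal sketch of what such a bucketing function could look like, assuming it simply maps the already-MD5'd key bytes onto one of numChunks buckets (an assumption, not the committed implementation):

// Hypothetical sketch of ReadOnlyUtils.chunk(byte[], int); the committed
// implementation is not shown in this diff, so treat this as an assumption.
public class ReadOnlyUtilsSketch {

    public static int chunk(byte[] md5OfKey, int numChunks) {
        // The mapper already emits md5(key) as the map output key, so these
        // bytes are uniformly distributed; take the first four bytes as an
        // int and bucket the key into [0, numChunks).
        int hash = ((md5OfKey[0] & 0xff) << 24)
                 | ((md5OfKey[1] & 0xff) << 16)
                 | ((md5OfKey[2] & 0xff) << 8)
                 | (md5OfKey[3] & 0xff);
        return Math.abs(hash % numChunks);
    }
}
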