
Borrowing from Todd Lipcon's great work, added a distributed lzo indexer.

It indexes lzo files in a map reduce job, one mapper per file, using custom input and output formats. Moving the indexing computation away from a local process is a big step forward. Thanks Todd.
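
For reference, the indexer is driven through Hadoop's ToolRunner (see main() in DistributedLzoIndexer below). A minimal sketch of invoking it programmatically; the wrapper class name and input paths here are hypothetical placeholders, not part of this commit:

  import org.apache.hadoop.util.ToolRunner;
  import com.hadoop.compression.lzo.DistributedLzoIndexer;

  public class IndexLzoFiles {
    public static void main(String[] unused) throws Exception {
      // Hypothetical inputs: each argument may be a single .lzo file or a directory to walk recursively.
      String[] args = { "/logs/2010/01/11", "/data/part-00000.lzo" };
      // ToolRunner handles generic Hadoop options, then calls DistributedLzoIndexer.run(args).
      System.exit(ToolRunner.run(new DistributedLzoIndexer(), args));
    }
  }

The equivalent command-line invocation is the one printed by printUsage(): hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...].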
1 parent f20f81a, commit f706102ffd1c22b2b9960afe80650a53e52ac0fe, committed by kevinweil on Jan 11, 2010
111 src/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java
@@ -0,0 +1,111 @@
+package com.hadoop.compression.lzo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.hadoop.mapreduce.LzoIndexOutputFormat;
+import com.hadoop.mapreduce.LzoSplitInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class DistributedLzoIndexer extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class);
+ private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();
+
+ private final PathFilter nonTemporaryFilter = new PathFilter() {
+ public boolean accept(Path path) {
+ return !path.toString().endsWith("/_temporary");
+ }
+ };
+
+ private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
+ try {
+ FileSystem fs = path.getFileSystem(getConf());
+ FileStatus fileStatus = fs.getFileStatus(path);
+
+ if (fileStatus.isDir()) {
+ FileStatus[] children = fs.listStatus(path, pathFilter);
+ for (FileStatus childStatus : children) {
+ walkPath(childStatus.getPath(), pathFilter, accumulator);
+ }
+ } else if (path.toString().endsWith(LZO_EXTENSION)) {
+ Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
+ if (fs.exists(lzoIndexPath)) {
+ // If the index exists and is of nonzero size, we're already done.
+ // We re-index a file with a zero-length index, because every file has at least one block.
+ if (fs.getFileStatus(lzoIndexPath).getLen() > 0) {
+ LOG.info("[SKIP] LZO index file already exists for " + path);
+ return;
+ } else {
+ LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
+ accumulator.add(path);
+ }
+ } else {
+ // If no index exists, we need to index the file.
+ LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
+ accumulator.add(path);
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Error walking path: " + path, ioe);
+ }
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
+ printUsage();
+ ToolRunner.printGenericCommandUsage(System.err);
+ return -1;
+ }
+
+ List<Path> inputPaths = new ArrayList<Path>();
+ for (String strPath: args) {
+ walkPath(new Path(strPath), nonTemporaryFilter, inputPaths);
+ }
+
+ Configuration conf = new Configuration();
+ Job job = new Job(conf);
+ job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args));
+
+ job.setOutputKeyClass(Path.class);
+ job.setOutputValueClass(LongWritable.class);
+
+ job.setJarByClass(DistributedLzoIndexer.class);
+ job.setInputFormatClass(LzoSplitInputFormat.class);
+ job.setOutputFormatClass(LzoIndexOutputFormat.class);
+ job.setNumReduceTasks(0);
+ job.setMapperClass(Mapper.class);
+
+ for (Path p : inputPaths) {
+ FileInputFormat.addInputPath(job, p);
+ }
+
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new DistributedLzoIndexer(), args);
+ System.exit(exitCode);
+ }
+
+ public static void printUsage() {
+ System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
+ }
+}
40 src/java/com/hadoop/mapreduce/LzoIndexOutputFormat.java
@@ -0,0 +1,40 @@
+package com.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class LzoIndexOutputFormat extends OutputFormat<Path, LongWritable> {
+ @Override
+ public RecordWriter<Path, LongWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return new LzoIndexRecordWriter(taskAttemptContext);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {}
+
+ // A totally no-op output committer, because the LzoIndexRecordWriter opens a file on the side
+ // and writes to that instead.
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ return new OutputCommitter() {
+ @Override public void setupJob(JobContext jobContext) throws IOException {}
+ @Override public void cleanupJob(JobContext jobContext) throws IOException {}
+ @Override public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {}
+ @Override public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {}
+ @Override public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {}
+ @Override public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+ return false;
+ }
+ };
+ }
+}
67 src/java/com/hadoop/mapreduce/LzoIndexRecordWriter.java
@@ -0,0 +1,67 @@
+package com.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import com.hadoop.compression.lzo.LzoIndex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class LzoIndexRecordWriter extends RecordWriter<Path, LongWritable> {
+ private static final Log LOG = LogFactory.getLog(LzoIndexRecordWriter.class);
+
+ private FSDataOutputStream outputStream;
+ private final TaskAttemptContext context;
+
+ private FileSystem fs;
+ private Path inputPath;
+ private Path tmpIndexPath;
+ private Path realIndexPath;
+
+ public LzoIndexRecordWriter(TaskAttemptContext taskAttemptContext) {
+ context = taskAttemptContext;
+ }
+
+ @Override
+ public void write(Path path, LongWritable offset) throws IOException, InterruptedException {
+ if (outputStream == null) {
+ // Set up the output file on the first record.
+ LOG.info("Setting up output stream to write index file for " + path);
+ outputStream = setupOutputFile(path);
+ }
+ offset.write(outputStream);
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ if (outputStream != null) {
+ // Close the output stream so that the tmp file is synced, then move it.
+ outputStream.close();
+
+ LOG.info("In close, now renaming " + tmpIndexPath + " to final location " + realIndexPath);
+ // Rename, indexing completed.
+ fs.rename(tmpIndexPath, realIndexPath);
+ }
+ }
+
+ private FSDataOutputStream setupOutputFile(Path path) throws IOException {
+ fs = path.getFileSystem(context.getConfiguration());
+ inputPath = path;
+
+ // For /a/b/c.lzo, tmpIndexPath = /a/b/c.lzo.index.tmp,
+ // and it is moved to realIndexPath = /a/b/c.lzo.index upon completion.
+ tmpIndexPath = path.suffix(LzoIndex.LZO_TMP_INDEX_SUFFIX);
+ realIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
+
+ // Delete the old index files if they exist.
+ fs.delete(tmpIndexPath, false);
+ fs.delete(realIndexPath, false);
+
+ return fs.create(tmpIndexPath, false);
+ }
+}
27 src/java/com/hadoop/mapreduce/LzoSplitInputFormat.java
@@ -0,0 +1,27 @@
+package com.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+public class LzoSplitInputFormat extends FileInputFormat<Path, LongWritable> {
+
+ @Override
+ public RecordReader<Path, LongWritable> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ return new LzoSplitRecordReader();
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path filename) {
+ // Force the files to be unsplittable, because indexing requires seeing all the
+ // compressed blocks in succession.
+ return false;
+ }
+}
123 src/java/com/hadoop/mapreduce/LzoSplitRecordReader.java
@@ -0,0 +1,123 @@
+package com.hadoop.mapreduce;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import com.hadoop.compression.lzo.LzopCodec;
+import com.hadoop.compression.lzo.LzopDecompressor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class LzoSplitRecordReader extends RecordReader<Path, LongWritable> {
+ private static final Log LOG = LogFactory.getLog(LzoSplitRecordReader.class);
+
+ private final int LOG_EVERY_N_BLOCKS = 1000;
+
+ private final LongWritable curValue = new LongWritable(-1);
+ private FSDataInputStream rawInputStream;
+ private TaskAttemptContext context;
+
+ private int numBlocksRead = 0;
+ private int numChecksums = -1;
+ private long totalFileSize = 0;
+ private Path lzoFile;
+
+ @Override
+ public void initialize(InputSplit genericSplit, TaskAttemptContext taskAttemptContext) throws IOException {
+ context = taskAttemptContext;
+ FileSplit fileSplit = (FileSplit)genericSplit;
+ lzoFile = fileSplit.getPath();
+ // The LzoSplitInputFormat is not splittable, so the split length is the whole file.
+ totalFileSize = fileSplit.getLength();
+
+ // Jump through some hoops to create the lzo codec.
+ Configuration conf = context.getConfiguration();
+ CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+ CompressionCodec codec = factory.getCodec(lzoFile);
+ ((Configurable)codec).setConf(conf);
+
+ LzopDecompressor lzopDecompressor = (LzopDecompressor)codec.createDecompressor();
+ FileSystem fs = lzoFile.getFileSystem(conf);
+ rawInputStream = fs.open(lzoFile);
+
+ // Creating the LzopInputStream here just reads the lzo header for us, nothing more.
+ // We do the rest of our input directly off the raw stream itself.
+ codec.createInputStream(rawInputStream, lzopDecompressor);
+
+ // This must be called AFTER createInputStream is called, because createInputStream
+ // is what reads the header, which has the checksum information. Otherwise getChecksumsCount
+ // erroneously returns zero, and all block offsets will be wrong.
+ numChecksums = lzopDecompressor.getChecksumsCount();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ int uncompressedBlockSize = rawInputStream.readInt();
+ if (uncompressedBlockSize == 0) {
+ // An uncompressed block size of zero means end of file.
+ return false;
+ } else if (uncompressedBlockSize < 0) {
+ throw new EOFException("Could not read uncompressed block size at position " +
+ rawInputStream.getPos() + " in file " + lzoFile);
+ }
+
+ int compressedBlockSize = rawInputStream.readInt();
+ if (compressedBlockSize <= 0) {
+ throw new EOFException("Could not read compressed block size at position " +
+ rawInputStream.getPos() + " in file " + lzoFile);
+ }
+
+ // Get the current position. Since we've read two ints, the current block started 8 bytes ago.
+ long pos = rawInputStream.getPos();
+ curValue.set(pos - 8);
+ // Seek beyond the checksums and beyond the block data to the beginning of the next block.
+ rawInputStream.seek(pos + compressedBlockSize + 4 * numChecksums);
+ ++numBlocksRead;
+
+ // Log some progress every so often.
+ if (numBlocksRead % LOG_EVERY_N_BLOCKS == 0) {
+ LOG.info("Reading block " + numBlocksRead + " at pos " + pos + " of " + totalFileSize + ". Read is " +
+ (100.0 * getProgress()) + "% done. ");
+ }
+
+ return true;
+ }
+
+ @Override
+ public Path getCurrentKey() {
+ return lzoFile;
+ }
+
+ @Override
+ public LongWritable getCurrentValue() {
+ return curValue;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (totalFileSize == 0) {
+ return 0.0f;
+ } else {
+ return (float)rawInputStream.getPos() / totalFileSize;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ LOG.info("Closing input stream after reading " + numBlocksRead + " blocks from " + lzoFile);
+ rawInputStream.close();
+ }
+}
+
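
The offset arithmetic in nextKeyValue() above relies on the standard lzop block framing: a 4-byte uncompressed size, a 4-byte compressed size, one 4-byte checksum per checksum type reported by the decompressor, and then the compressed payload. A small sketch of the two offsets the reader computes (method names are illustrative only):

  // Stream position right after the two 4-byte size fields have been read.
  static long blockStartOffset(long posAfterSizeInts) {
    return posAfterSizeInts - 8;  // the value recorded in the .index file
  }

  // Start of the next block header: skip the per-block checksums and the compressed data.
  static long nextBlockOffset(long posAfterSizeInts, int compressedBlockSize, int numChecksums) {
    return posAfterSizeInts + 4L * numChecksums + compressedBlockSize;
  }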
