
Merge remote branch 'kw/master'

commit ceb643f15057b0b5c2664233db33062bf49cb8c6 (parents: 47d4714, 3d19b14)
Authored by @toddlipcon
1  .archive-version
@@ -0,0 +1 @@
+$Format:%H$
1  .gitattributes
@@ -0,0 +1 @@
+.archive-version export-subst
2  .gitignore
@@ -15,11 +15,13 @@
*~
.classpath
+.idea
.project
.settings
.svn
build/
bin/
+out/
*.ipr
*.iml
*.iws
2  README.md
@@ -5,7 +5,7 @@ Hadoop-LZO is a project to bring splittable LZO compression to Hadoop. LZO is a
### Origins
-This project builds off the great work done at [code.google.com/p/hadoop-gpl-compression](code.google.com/p/hadoop-gpl-compression). As of issue 41, the differences in this codebase are the following.
+This project builds off the great work done at [http://code.google.com/p/hadoop-gpl-compression](http://code.google.com/p/hadoop-gpl-compression). As of issue 41, the differences in this codebase are the following.
- it fixes a few bugs in hadoop-gpl-compression -- notably, it allows the decompressor to read small or uncompressible lzo files, and also fixes the compressor to follow the lzo standard when compressing small or uncompressible chunks. it also fixes a number of inconsistently caught and thrown exception cases that can occur when the lzo writer gets killed mid-stream, plus some other smaller issues (see commit log).
- it adds the ability to work with Hadoop streaming via the com.hadoop.mapred.DeprecatedLzoTextInputFormat class
2  build.xml
@@ -28,7 +28,7 @@
<property name="Name" value="Hadoop GPL Compression"/>
<property name="name" value="hadoop-lzo"/>
- <property name="version" value="0.4.10"/>
+ <property name="version" value="0.4.13"/>
<property name="final.name" value="${name}-${version}"/>
<property name="year" value="2008"/>
28 src/get_build_revision.sh
@@ -1,7 +1,27 @@
-#!/bin/sh
+#!/bin/bash
-if [ -z "$BUILD_REVISION" ]; then
- git rev-parse HEAD
-else
+# Allow user to specify - this is done by packages
+if [ -n "$BUILD_REVISION" ]; then
echo $BUILD_REVISION
+ exit
fi
+
+# If we're in git, use that
+BUILD_REVISION=$(git rev-parse HEAD 2>/dev/null)
+if [ -n "$BUILD_REVISION" ]; then
+ echo $BUILD_REVISION
+ exit
+fi
+
+# Otherwise try to use the .archive-version file which
+# is filled in by git exports (eg github downloads)
+BIN=$(dirname ${BASH_SOURCE:-0})
+BUILD_REVISION=$(cat $BIN/../.archive-version 2>/dev/null)
+
+if [[ "$BUILD_REVISION" != *Format* ]]; then
+ echo "$BUILD_REVISION"
+ exit
+fi
+
+# Give up
+echo "Unknown build revision"
31 src/java/com/hadoop/compression/lzo/LzoCodec.java
@@ -46,17 +46,21 @@
public static final String LZO_COMPRESSOR_KEY = "io.compression.codec.lzo.compressor";
public static final String LZO_DECOMPRESSOR_KEY = "io.compression.codec.lzo.decompressor";
+ public static final String LZO_COMPRESSION_LEVEL_KEY = "io.compression.codec.lzo.compression.level";
public static final String LZO_BUFFER_SIZE_KEY = "io.compression.codec.lzo.buffersize";
public static final int DEFAULT_LZO_BUFFER_SIZE = 256 * 1024;
public static final int MAX_BLOCK_SIZE = 64*1024*1024;
+ public static final int UNDEFINED_COMPRESSION_LEVEL = -999; // Constant from LzoCompressor.c
private Configuration conf;
+ @Override
public void setConf(Configuration conf) {
this.conf = conf;
}
+ @Override
public Configuration getConf() {
return conf;
}
@@ -86,6 +90,7 @@ public Configuration getConf() {
* else <code>false</code>
*/
public static boolean isNativeLzoLoaded(Configuration conf) {
+ assert conf != null : "Configuration cannot be null!";
return nativeLzoLoaded && conf.getBoolean("hadoop.native.lib", true);
}
@@ -100,10 +105,12 @@ public static String getRevisionHash() {
}
}
+ @Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
return createOutputStream(out, createCompressor());
}
+ @Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
// Ensure native-lzo library is loaded & initialized
@@ -141,6 +148,7 @@ public CompressionOutputStream createOutputStream(OutputStream out,
compressionOverhead);
}
+ @Override
public Class<? extends Compressor> getCompressorType() {
// Ensure native-lzo library is loaded & initialized
if (!isNativeLzoLoaded(conf)) {
@@ -149,8 +157,10 @@ public CompressionOutputStream createOutputStream(OutputStream out,
return LzoCompressor.class;
}
+ @Override
public Compressor createCompressor() {
// Ensure native-lzo library is loaded & initialized
+ assert conf != null : "Configuration cannot be null! You must call setConf() before creating a compressor.";
if (!isNativeLzoLoaded(conf)) {
throw new RuntimeException("native-lzo library not available");
}
@@ -158,11 +168,13 @@ public Compressor createCompressor() {
return new LzoCompressor(conf);
}
+ @Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return createInputStream(in, createDecompressor());
}
+ @Override
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
@@ -174,6 +186,7 @@ public CompressionInputStream createInputStream(InputStream in,
conf.getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE));
}
+ @Override
public Class<? extends Decompressor> getDecompressorType() {
// Ensure native-lzo library is loaded & initialized
if (!isNativeLzoLoaded(conf)) {
@@ -182,6 +195,7 @@ public CompressionInputStream createInputStream(InputStream in,
return LzoDecompressor.class;
}
+ @Override
public Decompressor createDecompressor() {
// Ensure native-lzo library is loaded & initialized
if (!isNativeLzoLoaded(conf)) {
@@ -197,37 +211,54 @@ public Decompressor createDecompressor() {
* Get the default filename extension for this kind of compression.
* @return the extension including the '.'
*/
+ @Override
public String getDefaultExtension() {
return ".lzo_deflate";
}
static LzoCompressor.CompressionStrategy getCompressionStrategy(Configuration conf) {
+ assert conf != null : "Configuration cannot be null!";
return LzoCompressor.CompressionStrategy.valueOf(
conf.get(LZO_COMPRESSOR_KEY,
LzoCompressor.CompressionStrategy.LZO1X_1.name()));
}
static LzoDecompressor.CompressionStrategy getDecompressionStrategy(Configuration conf) {
+ assert conf != null : "Configuration cannot be null!";
return LzoDecompressor.CompressionStrategy.valueOf(
conf.get(LZO_DECOMPRESSOR_KEY,
LzoDecompressor.CompressionStrategy.LZO1X.name()));
}
+ static int getCompressionLevel(Configuration conf) {
+ assert conf != null : "Configuration cannot be null!";
+ return conf.getInt(LZO_COMPRESSION_LEVEL_KEY, UNDEFINED_COMPRESSION_LEVEL);
+ }
+
static int getBufferSize(Configuration conf) {
+ assert conf != null : "Configuration cannot be null!";
return conf.getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
}
public static void setCompressionStrategy(Configuration conf,
LzoCompressor.CompressionStrategy strategy) {
+ assert conf != null : "Configuration cannot be null!";
conf.set(LZO_COMPRESSOR_KEY, strategy.name());
}
public static void setDecompressionStrategy(Configuration conf,
LzoDecompressor.CompressionStrategy strategy) {
+ assert conf != null : "Configuration cannot be null!";
conf.set(LZO_DECOMPRESSOR_KEY, strategy.name());
}
+ public static void setCompressionLevel(Configuration conf, int compressionLevel) {
+ assert conf != null : "Configuration cannot be null!";
+ conf.setInt(LZO_COMPRESSION_LEVEL_KEY, compressionLevel);
+ }
+
public static void setBufferSize(Configuration conf, int bufferSize) {
+ assert conf != null : "Configuration cannot be null!";
conf.setInt(LZO_BUFFER_SIZE_KEY, bufferSize);
}
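
The new io.compression.codec.lzo.compression.level key lets a job pass an explicit compression level through to the native compressor. A minimal sketch of how a caller might use the new static setters (the level and buffer size values are illustrative; when the key is left unset, LzoCodec falls back to UNDEFINED_COMPRESSION_LEVEL and the native default):

    // Assumed imports: org.apache.hadoop.conf.Configuration,
    // org.apache.hadoop.io.compress.Compressor, com.hadoop.compression.lzo.*
    Configuration conf = new Configuration();
    LzoCodec.setCompressionStrategy(conf, LzoCompressor.CompressionStrategy.LZO1X_1);
    LzoCodec.setCompressionLevel(conf, 7);   // stored under io.compression.codec.lzo.compression.level
    LzoCodec.setBufferSize(conf, 256 * 1024);

    LzoCodec codec = new LzoCodec();
    codec.setConf(conf);                     // required before createCompressor(), per the new assert
    Compressor compressor = codec.createCompressor();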
9 src/java/com/hadoop/compression/lzo/LzoCompressor.java
@@ -57,6 +57,7 @@
private int workingMemoryBufLen = 0; // The length of 'working memory' buf.
@SuppressWarnings("unused")
private ByteBuffer workingMemoryBuf; // The 'working memory' for lzo.
+ private int lzoCompressionLevel;
/**
* Used when the user doesn't specify a configuration. We cache a single
@@ -209,9 +210,10 @@ public void reinit(Configuration conf) {
conf = defaultConfiguration;
}
LzoCompressor.CompressionStrategy strategy = LzoCodec.getCompressionStrategy(conf);
+ int compressionLevel = LzoCodec.getCompressionLevel(conf);
int bufferSize = LzoCodec.getBufferSize(conf);
- init(strategy, bufferSize);
+ init(strategy, compressionLevel, bufferSize);
}
/**
@@ -221,7 +223,7 @@ public void reinit(Configuration conf) {
* @param directBufferSize size of the direct buffer to be used.
*/
public LzoCompressor(CompressionStrategy strategy, int directBufferSize) {
- init(strategy, directBufferSize);
+ init(strategy, LzoCodec.UNDEFINED_COMPRESSION_LEVEL, directBufferSize);
}
/**
@@ -254,8 +256,9 @@ private ByteBuffer realloc(ByteBuffer buf, int newSize) {
return ByteBuffer.allocateDirect(newSize);
}
- private void init(CompressionStrategy strategy, int directBufferSize) {
+ private void init(CompressionStrategy strategy, int compressionLevel, int directBufferSize) {
this.strategy = strategy;
+ this.lzoCompressionLevel = compressionLevel;
this.directBufferSize = directBufferSize;
uncompressedDirectBuf = realloc(uncompressedDirectBuf, directBufferSize);
75 src/java/com/hadoop/compression/lzo/LzoInputFormatCommon.java
@@ -0,0 +1,75 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression. If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.compression.lzo;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.hadoop.compression.lzo.LzoIndexer;
+import com.hadoop.compression.lzo.LzopCodec;
+
+public class LzoInputFormatCommon {
+ /**
+ * The boolean property <code>lzo.text.input.format.ignore.nonlzo</code> tells
+ * the LZO text input format whether it should silently ignore non-LZO input
+ * files. When the property is true (which is the default), non-LZO files will
+ * be silently ignored. When the property is false, non-LZO files will be
+ * processed using the standard TextInputFormat.
+ */
+ public static final String IGNORE_NONLZO_KEY = "lzo.text.input.format.ignore.nonlzo";
+ /**
+ * Default value of the <code>lzo.text.input.format.ignore.nonlzo</code>
+ * property.
+ */
+ public static final boolean DEFAULT_IGNORE_NONLZO = true;
+ /**
+ * Full extension for LZO index files (".lzo.index").
+ */
+ public static final String FULL_LZO_INDEX_SUFFIX =
+ LzopCodec.DEFAULT_LZO_EXTENSION + LzoIndex.LZO_INDEX_SUFFIX;
+
+ /**
+ * @param conf the Configuration object
+ * @return the value of the <code>lzo.text.input.format.ignore.nonlzo</code>
+ * property in <code>conf</code>, or <code>DEFAULT_IGNORE_NONLZO</code>
+ * if the property is not set.
+ */
+ public static boolean getIgnoreNonLzoProperty(Configuration conf) {
+ return conf.getBoolean(IGNORE_NONLZO_KEY, DEFAULT_IGNORE_NONLZO);
+ }
+
+ /**
+ * Checks if the given filename ends in ".lzo".
+ *
+ * @param filename filename to check.
+ * @return true if the filename ends in ".lzo"
+ */
+ public static boolean isLzoFile(String filename) {
+ return filename.endsWith(LzopCodec.DEFAULT_LZO_EXTENSION);
+ }
+
+ /**
+ * Checks if the given filename ends in ".lzo.index".
+ *
+ * @param filename filename to check.
+ * @return true if the filename ends in ".lzo.index"
+ */
+ public static boolean isLzoIndexFile(String filename) {
+ return filename.endsWith(FULL_LZO_INDEX_SUFFIX);
+ }
+}
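
This new helper class centralizes the lzo.text.input.format.ignore.nonlzo switch and the filename checks that both input formats now share. A short usage sketch (file names are illustrative):

    // Assumed imports: org.apache.hadoop.conf.Configuration,
    // com.hadoop.compression.lzo.LzoInputFormatCommon
    Configuration conf = new Configuration();
    // Keep non-LZO files in the input instead of silently dropping them.
    conf.setBoolean(LzoInputFormatCommon.IGNORE_NONLZO_KEY, false);

    LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);            // false
    LzoInputFormatCommon.isLzoFile("part-r-00000.lzo");            // true
    LzoInputFormatCommon.isLzoIndexFile("part-r-00000.lzo.index"); // true
    LzoInputFormatCommon.isLzoFile("part-r-00001");                // false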
17 src/java/com/hadoop/compression/lzo/LzopCodec.java
@@ -18,6 +18,7 @@
package com.hadoop.compression.lzo;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -42,22 +43,34 @@
public static final int LZOP_VERSION = 0x1010;
/** Latest version of lzop this should be compatible with */
public static final int LZOP_COMPAT_VERSION = 0x0940;
+ public static final String DEFAULT_LZO_EXTENSION = ".lzo";
@Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
return createOutputStream(out, createCompressor());
}
+ public CompressionOutputStream createIndexedOutputStream(OutputStream out,
+ DataOutputStream indexOut)
+ throws IOException {
+ return createIndexedOutputStream(out, indexOut, createCompressor());
+ }
+
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
+ return createIndexedOutputStream(out, null, compressor);
+ }
+
+ public CompressionOutputStream createIndexedOutputStream(OutputStream out,
+ DataOutputStream indexOut, Compressor compressor) throws IOException {
if (!isNativeLzoLoaded(getConf())) {
throw new RuntimeException("native-lzo library not available");
}
LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.valueOf(
getConf().get(LZO_COMPRESSOR_KEY, LzoCompressor.CompressionStrategy.LZO1X_1.name()));
int bufferSize = getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
- return new LzopOutputStream(out, compressor, bufferSize, strategy);
+ return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy);
}
@Override
@@ -95,6 +108,6 @@ public Decompressor createDecompressor() {
@Override
public String getDefaultExtension() {
- return ".lzo";
+ return DEFAULT_LZO_EXTENSION;
}
}
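
createIndexedOutputStream writes the block index alongside the compressed data in a single pass, rather than indexing the finished .lzo file afterwards. A hedged sketch (the paths and FileSystem handle are illustrative, and the native-lzo library must be loadable):

    // Assumed imports: org.apache.hadoop.fs.*, java.io.DataOutputStream,
    // org.apache.hadoop.io.compress.CompressionOutputStream
    LzopCodec codec = new LzopCodec();
    codec.setConf(conf);

    DataOutputStream data  = fs.create(new Path("/data/events.lzo"));
    DataOutputStream index = fs.create(new Path("/data/events.lzo.index"));

    CompressionOutputStream out = codec.createIndexedOutputStream(data, index);
    try {
      out.write("some text\n".getBytes());
      // ... each finished LZO block appends one offset to the index
    } finally {
      out.close();   // also closes the index stream (see LzopOutputStream.close below)
    }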
50 src/java/com/hadoop/compression/lzo/LzopOutputStream.java
@@ -18,6 +18,8 @@
package com.hadoop.compression.lzo;
+import java.io.DataOutputStream;
+import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Adler32;
@@ -26,13 +28,11 @@
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.Compressor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
public class LzopOutputStream extends CompressorStream {
- private static final Log LOG = LogFactory.getLog(LzopOutputStream.class);
final int MAX_INPUT_SIZE;
+ protected DataOutputStream indexOut;
+ private CountingOutputStream cout;
/**
* Write an lzop-compatible header to the OutputStream provided.
@@ -79,13 +79,22 @@ protected static void writeLzopHeader(OutputStream out,
public LzopOutputStream(OutputStream out, Compressor compressor,
int bufferSize, LzoCompressor.CompressionStrategy strategy)
throws IOException {
- super(out, compressor, bufferSize);
+ this(out, null, compressor, bufferSize, strategy);
+ }
+ public LzopOutputStream(OutputStream out, DataOutputStream indexOut,
+ Compressor compressor, int bufferSize,
+ LzoCompressor.CompressionStrategy strategy)
+ throws IOException {
+ super(new CountingOutputStream(out), compressor, bufferSize);
+
+ this.cout = (CountingOutputStream) this.out;
+ this.indexOut = indexOut;
int overhead = strategy.name().contains("LZO1") ?
(bufferSize >> 4) + 64 + 3 : (bufferSize >> 3) + 128 + 3;
MAX_INPUT_SIZE = bufferSize - overhead;
- writeLzopHeader(out, strategy);
+ writeLzopHeader(this.out, strategy);
}
/**
@@ -97,6 +106,9 @@ public void close() throws IOException {
finish();
out.write(new byte[]{ 0, 0, 0, 0 });
out.close();
+ if (indexOut != null) {
+ indexOut.close();
+ }
closed = true;
}
}
@@ -171,13 +183,18 @@ public void finish() throws IOException {
protected void compress() throws IOException {
int len = compressor.compress(buffer, 0, buffer.length);
if (len > 0) {
+ // new lzo block. write current position to index file.
+ if (indexOut != null) {
+ indexOut.writeLong(cout.bytesWritten);
+ }
+
rawWriteInt((int)compressor.getBytesRead());
// If the compressed buffer is actually larger than the uncompressed buffer,
// the LZO specification says that we should write the uncompressed bytes rather
// than the compressed bytes. The decompressor understands this because both sizes
// get written to the stream.
- if (compressor.getBytesRead() < compressor.getBytesWritten()) {
+ if (compressor.getBytesRead() <= compressor.getBytesWritten()) {
// Compression actually increased the size of the buffer, so write the uncompressed bytes.
byte[] uncompressed = ((LzoCompressor)compressor).uncompressedBytes();
rawWriteInt(uncompressed.length);
@@ -196,4 +213,23 @@ private void rawWriteInt(int v) throws IOException {
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
}
+
+ /* keeps count of number of bytes written. */
+ private static class CountingOutputStream extends FilterOutputStream {
+ public CountingOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ long bytesWritten = 0;
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ bytesWritten += len;
+ }
+
+ public void write(int b) throws IOException {
+ out.write(b);
+ bytesWritten++;
+ }
+ }
}
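
With this change, the .lzo.index side file is simply the sequence of byte offsets at which each compressed block begins, written as big-endian longs by DataOutputStream.writeLong. LzoIndex.readIndex is the supported reader; the sketch below only illustrates the on-disk layout (the file name is illustrative):

    // Assumed imports: java.io.*, java.util.*
    DataInputStream in = new DataInputStream(new FileInputStream("events.lzo.index"));
    List<Long> blockOffsets = new ArrayList<Long>();
    try {
      while (true) {
        blockOffsets.add(in.readLong());   // start offset of the next LZO block
      }
    } catch (EOFException endOfIndex) {
      // all offsets read
    } finally {
      in.close();
    }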
57 src/java/com/hadoop/mapred/DeprecatedLzoTextInputFormat.java
@@ -31,14 +31,16 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
import com.hadoop.compression.lzo.LzopCodec;
/**
@@ -50,27 +52,40 @@
* com.hadoop.mapred.DeprecatedLzoTextInputFormat, not
* com.hadoop.mapreduce.LzoTextInputFormat. The classes attempt to be alike in
* every other respect.
+ *
+ * Note that to use this input format properly with hadoop-streaming, you should
+ * also set the property <code>stream.map.input.ignoreKey=true</code>. That will
+ * replicate the behavior of the default TextInputFormat by stripping off the byte
+ * offset keys from the input lines that get piped to the mapper process.
+ *
+ * See {@link LzoInputFormatCommon} for a description of the boolean property
+ * <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
+ * behavior of this input format.
*/
@SuppressWarnings("deprecation")
-public class DeprecatedLzoTextInputFormat extends FileInputFormat<LongWritable, Text> {
- public static final String LZO_INDEX_SUFFIX = ".index";
+public class DeprecatedLzoTextInputFormat extends TextInputFormat {
private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
@Override
protected FileStatus[] listStatus(JobConf conf) throws IOException {
List<FileStatus> files = new ArrayList<FileStatus>(Arrays.asList(super.listStatus(conf)));
- String fileExtension = new LzopCodec().getDefaultExtension();
+ boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);
Iterator<FileStatus> it = files.iterator();
while (it.hasNext()) {
FileStatus fileStatus = it.next();
Path file = fileStatus.getPath();
- if (!file.toString().endsWith(fileExtension)) {
- // Get rid of non-LZO files.
- it.remove();
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // Get rid of non-LZO files, unless the conf explicitly tells us to
+ // keep them.
+ // However, always skip over files that end with ".lzo.index", since
+ // they are not part of the input.
+ if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
+ it.remove();
+ }
} else {
FileSystem fs = file.getFileSystem(conf);
LzoIndex index = LzoIndex.readIndex(fs, file);
@@ -83,8 +98,13 @@
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
- LzoIndex index = indexes.get(filename);
- return !index.isEmpty();
+ if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
+ LzoIndex index = indexes.get(filename);
+ return !index.isEmpty();
+ } else {
+ // Delegate non-LZO files to the TextInputFormat base class.
+ return super.isSplitable(fs, filename);
+ }
}
@Override
@@ -97,6 +117,14 @@ protected boolean isSplitable(FileSystem fs, Path filename) {
for (FileSplit fileSplit: splits) {
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
+
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // non-LZO file, keep the input split as is.
+ result.add(fileSplit);
+ continue;
+ }
+
+ // LZO file, try to split if the .index file was found
LzoIndex index = indexes.get(file);
if (index == null) {
throw new IOException("Index not found for " + file);
@@ -124,8 +152,13 @@ protected boolean isSplitable(FileSystem fs, Path filename) {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {
- reporter.setStatus(split.toString());
- return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
+ FileSplit fileSplit = (FileSplit) split;
+ if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
+ reporter.setStatus(split.toString());
+ return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
+ } else {
+ // delegate non-LZO files to the TextInputFormat base class.
+ return super.getRecordReader(split, conf, reporter);
+ }
}
-
}
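
For the old mapred API, a job whose input directory mixes .lzo and plain text files can now be set up like this (a hedged sketch; the driver class and input path are illustrative, and the property names are the ones documented above):

    // Assumed imports: org.apache.hadoop.mapred.*, org.apache.hadoop.fs.Path,
    // com.hadoop.mapred.DeprecatedLzoTextInputFormat
    JobConf conf = new JobConf(MyDriver.class);   // MyDriver is a placeholder
    conf.setInputFormat(DeprecatedLzoTextInputFormat.class);
    // Process non-LZO files with plain TextInputFormat behavior instead of skipping them.
    conf.setBoolean("lzo.text.input.format.ignore.nonlzo", false);
    // For hadoop-streaming, additionally pass -D stream.map.input.ignoreKey=true (see note above).
    FileInputFormat.setInputPaths(conf, new Path("/logs/2011-05-01"));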
52 src/java/com/hadoop/mapreduce/LzoTextInputFormat.java
@@ -36,36 +36,45 @@
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
import com.hadoop.compression.lzo.LzopCodec;
/**
* An {@link InputFormat} for lzop compressed text files. Files are broken into
* lines. Either linefeed or carriage-return are used to signal end of line.
* Keys are the position in the file, and values are the line of text.
+ *
+ * See {@link LzoInputFormatCommon} for a description of the boolean property
+ * <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
+ * behavior of this input format.
*/
-public class LzoTextInputFormat extends FileInputFormat<LongWritable, Text> {
-
+public class LzoTextInputFormat extends TextInputFormat {
private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
List<FileStatus> files = super.listStatus(job);
- String fileExtension = new LzopCodec().getDefaultExtension();
Configuration conf = job.getConfiguration();
+ boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);
for (Iterator<FileStatus> iterator = files.iterator(); iterator.hasNext();) {
FileStatus fileStatus = iterator.next();
Path file = fileStatus.getPath();
FileSystem fs = file.getFileSystem(conf);
- if (!file.toString().endsWith(fileExtension)) {
- //get rid of non lzo files
- iterator.remove();
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // Get rid of non-LZO files, unless the conf explicitly tells us to
+ // keep them.
+ // However, always skip over files that end with ".lzo.index", since
+ // they are not part of the input.
+ if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
+ iterator.remove();
+ }
} else {
//read the index file
LzoIndex index = LzoIndex.readIndex(fs, file);
@@ -78,8 +87,13 @@
@Override
protected boolean isSplitable(JobContext context, Path filename) {
- LzoIndex index = indexes.get(filename);
- return !index.isEmpty();
+ if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
+ LzoIndex index = indexes.get(filename);
+ return !index.isEmpty();
+ } else {
+ // Delegate non-LZO files to the TextInputFormat base class.
+ return super.isSplitable(context, filename);
+ }
}
@Override
@@ -92,10 +106,17 @@ protected boolean isSplitable(JobContext context, Path filename) {
List<InputSplit> result = new ArrayList<InputSplit>();
for (InputSplit genericSplit : splits) {
- // load the index
FileSplit fileSplit = (FileSplit) genericSplit;
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
+
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // non-LZO file, keep the input split as is.
+ result.add(fileSplit);
+ continue;
+ }
+
+ // LZO file, try to split if the .index file was found
LzoIndex index = indexes.get(file);
if (index == null) {
throw new IOException("Index not found for " + file);
@@ -123,8 +144,13 @@ protected boolean isSplitable(JobContext context, Path filename) {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
-
- return new LzoLineRecordReader();
+ TaskAttemptContext taskAttempt) {
+ FileSplit fileSplit = (FileSplit) split;
+ if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
+ return new LzoLineRecordReader();
+ } else {
+ // Delegate non-LZO files to the TextInputFormat base class.
+ return super.createRecordReader(split, taskAttempt);
+ }
}
}
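
The new-API LzoTextInputFormat gets the same delegation behavior. A hedged sketch of a Job setup (job name and input path are illustrative):

    // Assumed imports: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
    // org.apache.hadoop.mapreduce.Job, org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
    // com.hadoop.mapreduce.LzoTextInputFormat, com.hadoop.compression.lzo.LzoInputFormatCommon
    Configuration conf = new Configuration();
    conf.setBoolean(LzoInputFormatCommon.IGNORE_NONLZO_KEY, false);

    Job job = new Job(conf, "lzo-text-example");
    job.setInputFormatClass(LzoTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/logs/2011-05-01"));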
19 src/native/impl/lzo/LzoCompressor.c
@@ -35,6 +35,7 @@ typedef struct {
#define UNDEFINED_COMPRESSION_LEVEL -999
+// Default compression level used when user supplies no value.
static lzo_compressor lzo_compressors[] = {
/** lzo1 compressors */
/* 0 */ {"lzo1_compress", LZO1_MEM_COMPRESS, UNDEFINED_COMPRESSION_LEVEL},
@@ -112,6 +113,7 @@ static jfieldID LzoCompressor_uncompressedDirectBufLen;
static jfieldID LzoCompressor_compressedDirectBuf;
static jfieldID LzoCompressor_directBufferSize;
static jfieldID LzoCompressor_lzoCompressor;
+static jfieldID LzoCompressor_lzoCompressionLevel;
static jfieldID LzoCompressor_workingMemoryBufLen;
static jfieldID LzoCompressor_workingMemoryBuf;
@@ -144,6 +146,8 @@ Java_com_hadoop_compression_lzo_LzoCompressor_initIDs(
"directBufferSize", "I");
LzoCompressor_lzoCompressor = (*env)->GetFieldID(env, class,
"lzoCompressor", "J");
+ LzoCompressor_lzoCompressionLevel = (*env)->GetFieldID(env, class,
+ "lzoCompressionLevel", "I");
LzoCompressor_workingMemoryBufLen = (*env)->GetFieldID(env, class,
"workingMemoryBufLen", "I");
LzoCompressor_workingMemoryBuf = (*env)->GetFieldID(env, class,
@@ -189,7 +193,6 @@ Java_com_hadoop_compression_lzo_LzoCompressor_init(
// Save the compressor-function into LzoCompressor_lzoCompressor
(*env)->SetIntField(env, this, LzoCompressor_workingMemoryBufLen,
lzo_compressors[compressor].wrkmem);
-
return;
}
@@ -218,6 +221,13 @@ Java_com_hadoop_compression_lzo_LzoCompressor_compressBytesDirect(
lzo_uint compressed_direct_buf_len = (*env)->GetIntField(env, this,
LzoCompressor_directBufferSize);
+ // Prefer the user defined compression level.
+ int compression_level = (*env)->GetIntField(env, this,
+ LzoCompressor_lzoCompressionLevel);
+ if (UNDEFINED_COMPRESSION_LEVEL == compression_level) {
+ compression_level = lzo_compressors[compressor].compression_level;
+ }
+
jobject working_memory_buf = (*env)->GetObjectField(env, this,
LzoCompressor_workingMemoryBuf);
@@ -256,12 +266,17 @@ Java_com_hadoop_compression_lzo_LzoCompressor_compressBytesDirect(
// Compress
lzo_uint no_compressed_bytes = compressed_direct_buf_len;
int rv = 0;
- int compression_level = lzo_compressors[compressor].compression_level;
if (compression_level == UNDEFINED_COMPRESSION_LEVEL) {
lzo_compress_t fptr = (lzo_compress_t) FUNC_PTR(lzo_compressor_funcptr);
rv = fptr(uncompressed_bytes, uncompressed_direct_buf_len,
compressed_bytes, &no_compressed_bytes,
workmem);
+ } else if (strstr(lzo_compressor_function, "lzo1x_999")
+ || strstr(lzo_compressor_function, "lzo1y_999")) {
+ // Compression levels are only available in these codecs.
+ rv = lzo1x_999_compress_level(uncompressed_bytes, uncompressed_direct_buf_len,
+ compressed_bytes, &no_compressed_bytes,
+ workmem, NULL, 0, 0, compression_level);
} else {
lzo_compress2_t fptr = (lzo_compress2_t) FUNC_PTR(lzo_compressor_funcptr);
rv = fptr(uncompressed_bytes, uncompressed_direct_buf_len,
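
On the native side the user-supplied level only changes behavior for the lzo1x_999 and lzo1y_999 compressors; every other strategy falls through to the plain function-pointer path and the level is ignored. A hedged end-to-end sketch, assuming the Java CompressionStrategy enum exposes an LZO1X_999 value (as the lzo1x_999 branch above implies):

    // Raw key names taken from LzoCodec above; the level value 3 is illustrative.
    Configuration conf = new Configuration();
    conf.set("io.compression.codec.lzo.compressor",
             LzoCompressor.CompressionStrategy.LZO1X_999.name());   // assumed enum constant
    conf.setInt("io.compression.codec.lzo.compression.level", 3);

    LzoCodec codec = new LzoCodec();
    codec.setConf(conf);
    Compressor c = codec.createCompressor();   // compressBytesDirect will call lzo1x_999_compress_level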
49 src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java
@@ -20,11 +20,11 @@
import java.io.BufferedWriter;
import java.io.BufferedReader;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.io.InputStreamReader;
import java.security.NoSuchAlgorithmException;
@@ -32,6 +32,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/**
* Test the LzoOutputFormat, make sure that it can write files of different sizes and read them back in
@@ -46,11 +49,16 @@
private final String bigFile = "100000.txt";
private final String mediumFile = "1000.txt";
private final String smallFile = "100.txt";
+ private final String issue20File = "issue20-lzop.txt";
+ private FileSystem localFs;
@Override
protected void setUp() throws Exception {
super.setUp();
inputDataPath = System.getProperty("test.build.data", "data");
+ Configuration conf = new Configuration();
+ conf.set("io.compression.codecs", LzopCodec.class.getName());
+ localFs = FileSystem.getLocal(conf).getRaw();
}
/**
@@ -85,6 +93,25 @@ public void testSmallFile() throws NoSuchAlgorithmException, IOException,
}
/**
+ * The LZO specification says that we should write the uncompressed bytes
+ * rather than the compressed bytes if the compressed buffer is actually
+ * larger ('&gt;') than the uncompressed buffer.
+ *
+ * To conform to the standard, this means we have to write the uncompressed
+ * bytes also when they have exactly the same size as the compressed bytes.
+ * (the '==' in '&lt;=').
+ *
+ * The input data of this test is known to compress to the same size as the
+ * uncompressed data. Hence we verify that we handle the boundary condition
+ * correctly.
+ *
+ */
+ public void testIssue20File() throws NoSuchAlgorithmException, IOException,
+ InterruptedException {
+ runTest(issue20File);
+ }
+
+ /**
* Test that reading an lzo-compressed file produces the same lines as reading the equivalent
* flat file. The test opens both the compressed and flat file, successively reading each
* line by line and comparing.
@@ -101,6 +128,7 @@ private void runTest(String filename) throws IOException,
File textFile = new File(inputDataPath, filename);
File lzoFile = new File(inputDataPath, filename + new LzopCodec().getDefaultExtension());
File lzoOutFile = new File(inputDataPath, "output_" + filename + new LzopCodec().getDefaultExtension());
+ File lzoIndexFile = new File(lzoOutFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX);
if (lzoOutFile.exists()) {
lzoOutFile.delete();
}
@@ -116,7 +144,9 @@ private void runTest(String filename) throws IOException,
int lzoBufferSize = 256 * 1024;
LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1;
LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize);
- LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile.getAbsolutePath()), lzoCompressor, lzoBufferSize, strategy);
+ LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile),
+ new DataOutputStream(new FileOutputStream(lzoIndexFile)),
+ lzoCompressor, lzoBufferSize, strategy);
// Now read line by line and stream out..
String textLine;
@@ -158,5 +188,20 @@ private void runTest(String filename) throws IOException,
lzoBr.close();
textBr2.close();
+
+ // verify the index file:
+
+ Path lzoOutPath = new Path(lzoOutFile.getAbsolutePath());
+ LzoIndex lzoIndex = LzoIndex.readIndex(localFs, lzoOutPath);
+
+ // create offline index to compare.
+ assertTrue(lzoIndexFile.delete());
+ LzoIndex.createIndex(localFs, lzoOutPath);
+ LzoIndex expectedIndex = LzoIndex.readIndex(localFs, lzoOutPath);
+
+ assertEquals(lzoIndex.getNumberOfBlocks(), expectedIndex.getNumberOfBlocks());
+ for (int i=0; i<lzoIndex.getNumberOfBlocks(); i++) {
+ assertEquals(lzoIndex.getPosition(i), expectedIndex.getPosition(i));
+ }
}
}
120 src/test/com/hadoop/mapreduce/TestLzoTextInputFormat.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@@ -31,6 +32,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -41,11 +43,13 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.hadoop.compression.lzo.GPLNativeCodeLoader;
import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
import com.hadoop.compression.lzo.LzopCodec;
/**
@@ -156,7 +160,7 @@ private void runTest(boolean testWithIndex, int charsToOutput) throws IOExceptio
localFs.delete(outputDir, true);
localFs.mkdirs(outputDir);
- Job job = new Job(conf);
+ Job job = new Job(conf);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
TextOutputFormat.setOutputPath(job, outputDir);
@@ -263,4 +267,118 @@ private int fillText(char[] chars, Random r, int charsMax, Text text) {
return stringLength;
}
+ public void testIgnoreNonLzoTrue()
+ throws IOException, InterruptedException, NoSuchAlgorithmException {
+ runTestIgnoreNonLzo(true, OUTPUT_BIG, true);
+ runTestIgnoreNonLzo(true, OUTPUT_SMALL, true);
+ runTestIgnoreNonLzo(false, OUTPUT_BIG, true);
+ runTestIgnoreNonLzo(false, OUTPUT_SMALL, true);
+ }
+
+ public void testIgnoreNonLzoFalse()
+ throws IOException, InterruptedException, NoSuchAlgorithmException {
+ runTestIgnoreNonLzo(true, OUTPUT_BIG, false);
+ runTestIgnoreNonLzo(true, OUTPUT_SMALL, false);
+ runTestIgnoreNonLzo(false, OUTPUT_BIG, false);
+ runTestIgnoreNonLzo(false, OUTPUT_SMALL, false);
+ }
+
+ private void runTestIgnoreNonLzo(boolean testWithIndex, int charsToOutput,
+ boolean ignoreNonLzo) throws IOException, InterruptedException, NoSuchAlgorithmException {
+ if (!GPLNativeCodeLoader.isNativeCodeLoaded()) {
+ LOG.warn("Cannot run this test without the native lzo libraries");
+ return;
+ }
+
+ Configuration conf = new Configuration();
+ conf.setLong("fs.local.block.size", charsToOutput / 2);
+ // reducing block size to force a split of the tiny file
+ conf.set("io.compression.codecs", LzopCodec.class.getName());
+ conf.setBoolean(LzoInputFormatCommon.IGNORE_NONLZO_KEY, ignoreNonLzo);
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(outputDir, true);
+ localFs.mkdirs(outputDir);
+
+ // Create a non-LZO input file and put it alongside the LZO files.
+ Path nonLzoFile = new Path(outputDir, "part-r-00001");
+ localFs.createNewFile(nonLzoFile);
+ FSDataOutputStream outputStream = localFs.create(nonLzoFile);
+ outputStream.writeBytes("key1\tvalue1\nkey2\tvalue2\nkey3\tvalue3\n");
+ outputStream.close();
+
+ Job job = new Job(conf);
+ TextOutputFormat.setCompressOutput(job, true);
+ TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
+ TextOutputFormat.setOutputPath(job, outputDir);
+
+ TaskAttemptContext attemptContext = new TaskAttemptContext(job.getConfiguration(),
+ new TaskAttemptID("123", 0, false, 1, 2));
+
+ // create some input data
+ byte[] expectedMd5 = createTestInput(outputDir, localFs, attemptContext, charsToOutput);
+
+ if (testWithIndex) {
+ Path lzoFile = new Path(outputDir, lzoFileName);
+ LzoIndex.createIndex(localFs, lzoFile);
+ }
+
+ LzoTextInputFormat inputFormat = new LzoTextInputFormat();
+ TextInputFormat.setInputPaths(job, outputDir);
+
+ // verify we have the right number of input splits
+ List<InputSplit> is = inputFormat.getSplits(job);
+ int numExpectedLzoSplits = 0;
+ int numExpectedNonLzoSplits = 0;
+ int numActualLzoSplits = 0;
+ int numActualNonLzoSplits = 0;
+ if (!ignoreNonLzo) {
+ numExpectedNonLzoSplits += 1;
+ }
+ if (testWithIndex && OUTPUT_BIG == charsToOutput) {
+ numExpectedLzoSplits += 3;
+ } else {
+ numExpectedLzoSplits += 1;
+ }
+ assertEquals(numExpectedLzoSplits + numExpectedNonLzoSplits, is.size());
+
+ // Verify that we have the right number of each kind of split and the right
+ // data inside the splits.
+ List<String> expectedNonLzoLines = new ArrayList<String>();
+ if (!ignoreNonLzo) {
+ expectedNonLzoLines.add("key1\tvalue1");
+ expectedNonLzoLines.add("key2\tvalue2");
+ expectedNonLzoLines.add("key3\tvalue3");
+ }
+ List<String> actualNonLzoLines = new ArrayList<String>();
+ for (InputSplit inputSplit : is) {
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ Path file = fileSplit.getPath();
+ RecordReader<LongWritable, Text> rr = inputFormat.createRecordReader(
+ inputSplit, attemptContext);
+ rr.initialize(inputSplit, attemptContext);
+ if (LzoInputFormatCommon.isLzoFile(file.toString())) {
+ numActualLzoSplits += 1;
+
+ while (rr.nextKeyValue()) {
+ Text value = rr.getCurrentValue();
+
+ md5.update(value.getBytes(), 0, value.getLength());
+ }
+
+ rr.close();
+ } else {
+ numActualNonLzoSplits += 1;
+
+ while (rr.nextKeyValue()) {
+ actualNonLzoLines.add(rr.getCurrentValue().toString());
+ }
+ }
+ }
+ localFs.close();
+ assertEquals(numExpectedLzoSplits, numActualLzoSplits);
+ assertEquals(numExpectedNonLzoSplits, numActualNonLzoSplits);
+ assertTrue(Arrays.equals(expectedMd5, md5.digest()));
+ assertEquals(expectedNonLzoLines, actualNonLzoLines);
+ }
}
6 src/test/data/issue20-lzop.txt
@@ -0,0 +1,6 @@
+0.5 74 25425
+0.9 200 25384
+0.95 203 4
+0.98 211 2
+0.99 219 3
+0.995 240 5