diff --git a/src/java/com/hadoop/compression/lzo/LzopCodec.java b/src/java/com/hadoop/compression/lzo/LzopCodec.java index fbbb58d1..0ff82256 100644 --- a/src/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/java/com/hadoop/compression/lzo/LzopCodec.java @@ -18,6 +18,7 @@ package com.hadoop.compression.lzo; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -48,16 +49,27 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc return createOutputStream(out, createCompressor()); } + public CompressionOutputStream createIndexedOutputStream(OutputStream out, + DataOutputStream indexOut) + throws IOException { + return createIndexedOutputStream(out, indexOut, createCompressor()); + } + @Override public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException { + return createIndexedOutputStream(out, null, compressor); + } + + public CompressionOutputStream createIndexedOutputStream(OutputStream out, + DataOutputStream indexOut, Compressor compressor) throws IOException { if (!isNativeLzoLoaded(getConf())) { throw new RuntimeException("native-lzo library not available"); } LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.valueOf( getConf().get(LZO_COMPRESSOR_KEY, LzoCompressor.CompressionStrategy.LZO1X_1.name())); int bufferSize = getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE); - return new LzopOutputStream(out, compressor, bufferSize, strategy); + return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy); } @Override diff --git a/src/java/com/hadoop/compression/lzo/LzopOutputStream.java b/src/java/com/hadoop/compression/lzo/LzopOutputStream.java index e57aa3eb..789c7be9 100644 --- a/src/java/com/hadoop/compression/lzo/LzopOutputStream.java +++ b/src/java/com/hadoop/compression/lzo/LzopOutputStream.java @@ -18,6 +18,8 @@ package com.hadoop.compression.lzo; +import java.io.DataOutputStream; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.zip.Adler32; @@ -26,13 +28,11 @@ import org.apache.hadoop.io.compress.CompressorStream; import org.apache.hadoop.io.compress.Compressor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - public class LzopOutputStream extends CompressorStream { - private static final Log LOG = LogFactory.getLog(LzopOutputStream.class); final int MAX_INPUT_SIZE; + protected DataOutputStream indexOut; + private CountingOutputStream cout; /** * Write an lzop-compatible header to the OutputStream provided. @@ -79,13 +79,22 @@ protected static void writeLzopHeader(OutputStream out, public LzopOutputStream(OutputStream out, Compressor compressor, int bufferSize, LzoCompressor.CompressionStrategy strategy) throws IOException { - super(out, compressor, bufferSize); + this(out, null, compressor, bufferSize, strategy); + } + public LzopOutputStream(OutputStream out, DataOutputStream indexOut, + Compressor compressor, int bufferSize, + LzoCompressor.CompressionStrategy strategy) + throws IOException { + super(new CountingOutputStream(out), compressor, bufferSize); + + this.cout = (CountingOutputStream) this.out; + this.indexOut = indexOut; int overhead = strategy.name().contains("LZO1") ? (bufferSize >> 4) + 64 + 3 : (bufferSize >> 3) + 128 + 3; MAX_INPUT_SIZE = bufferSize - overhead; - writeLzopHeader(out, strategy); + writeLzopHeader(this.out, strategy); } /** @@ -97,6 +106,9 @@ public void close() throws IOException { finish(); out.write(new byte[]{ 0, 0, 0, 0 }); out.close(); + if (indexOut != null) { + indexOut.close(); + } closed = true; } } @@ -171,6 +183,11 @@ public void finish() throws IOException { protected void compress() throws IOException { int len = compressor.compress(buffer, 0, buffer.length); if (len > 0) { + // new lzo block. write current position to index file. + if (indexOut != null) { + indexOut.writeLong(cout.bytesWritten); + } + rawWriteInt((int)compressor.getBytesRead()); // If the compressed buffer is actually larger than the uncompressed buffer, @@ -196,4 +213,23 @@ private void rawWriteInt(int v) throws IOException { out.write((v >>> 8) & 0xFF); out.write((v >>> 0) & 0xFF); } + + /* keeps count of number of bytes written. */ + private static class CountingOutputStream extends FilterOutputStream { + public CountingOutputStream(OutputStream out) { + super(out); + } + + long bytesWritten = 0; + + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + bytesWritten += len; + } + + public void write(int b) throws IOException { + out.write(b); + bytesWritten++; + } + } } diff --git a/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java b/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java index 98c084f1..9df8b815 100644 --- a/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java +++ b/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java @@ -20,11 +20,11 @@ import java.io.BufferedWriter; import java.io.BufferedReader; +import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FileInputStream; import java.io.IOException; -import java.io.OutputStreamWriter; import java.io.InputStreamReader; import java.security.NoSuchAlgorithmException; @@ -32,6 +32,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; /** * Test the LzoOutputFormat, make sure that it can write files of different sizes and read them back in @@ -47,11 +50,15 @@ public class TestLzopOutputStream extends TestCase { private final String mediumFile = "1000.txt"; private final String smallFile = "100.txt"; private final String issue20File = "issue20-lzop.txt"; + private FileSystem localFs; @Override protected void setUp() throws Exception { super.setUp(); inputDataPath = System.getProperty("test.build.data", "data"); + Configuration conf = new Configuration(); + conf.set("io.compression.codecs", LzopCodec.class.getName()); + localFs = FileSystem.getLocal(conf).getRaw(); } /** @@ -121,6 +128,7 @@ private void runTest(String filename) throws IOException, File textFile = new File(inputDataPath, filename); File lzoFile = new File(inputDataPath, filename + new LzopCodec().getDefaultExtension()); File lzoOutFile = new File(inputDataPath, "output_" + filename + new LzopCodec().getDefaultExtension()); + File lzoIndexFile = new File(lzoOutFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX); if (lzoOutFile.exists()) { lzoOutFile.delete(); } @@ -136,7 +144,9 @@ private void runTest(String filename) throws IOException, int lzoBufferSize = 256 * 1024; LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1; LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize); - LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile.getAbsolutePath()), lzoCompressor, lzoBufferSize, strategy); + LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile), + new DataOutputStream(new FileOutputStream(lzoIndexFile)), + lzoCompressor, lzoBufferSize, strategy); // Now read line by line and stream out.. String textLine; @@ -178,5 +188,20 @@ private void runTest(String filename) throws IOException, lzoBr.close(); textBr2.close(); + + // verify the index file: + + Path lzoOutPath = new Path(lzoOutFile.getAbsolutePath()); + LzoIndex lzoIndex = LzoIndex.readIndex(localFs, lzoOutPath); + + // create offline index to compare. + assertTrue(lzoIndexFile.delete()); + LzoIndex.createIndex(localFs, lzoOutPath); + LzoIndex expectedIndex = LzoIndex.readIndex(localFs, lzoOutPath); + + assertEquals(lzoIndex.getNumberOfBlocks(), expectedIndex.getNumberOfBlocks()); + for (int i=0; i