From 185a6c62d2b9dc3aaa211d163f109224ae03ed5c Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Thu, 21 Jul 2011 13:44:05 -0700 Subject: [PATCH 1/4] Add an option to write an lzo index file along with the lzo file. --- .../com/hadoop/compression/lzo/LzopCodec.java | 16 +++- .../lzo/LzopIndexedOutputStream.java | 77 +++++++++++++++++++ 2 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java diff --git a/src/java/com/hadoop/compression/lzo/LzopCodec.java b/src/java/com/hadoop/compression/lzo/LzopCodec.java index fbbb58d1..66eb7b82 100644 --- a/src/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/java/com/hadoop/compression/lzo/LzopCodec.java @@ -18,6 +18,7 @@ package com.hadoop.compression.lzo; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -48,16 +49,27 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc return createOutputStream(out, createCompressor()); } + public CompressionOutputStream createIndexedOutputStream(OutputStream out, + DataOutputStream indexOut) + throws IOException { + return createIndexedOutputStream(out, indexOut, createCompressor()); + } + @Override public CompressionOutputStream createOutputStream(OutputStream out, - Compressor compressor) throws IOException { + Compressor compressor) throws IOException { + return createIndexedOutputStream(out, null, compressor); + } + + public CompressionOutputStream createIndexedOutputStream(OutputStream out, + DataOutputStream indexOut, Compressor compressor) throws IOException { if (!isNativeLzoLoaded(getConf())) { throw new RuntimeException("native-lzo library not available"); } LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.valueOf( getConf().get(LZO_COMPRESSOR_KEY, LzoCompressor.CompressionStrategy.LZO1X_1.name())); int bufferSize = getConf().getInt(LZO_BUFFER_SIZE_KEY, 
DEFAULT_LZO_BUFFER_SIZE); - return new LzopOutputStream(out, compressor, bufferSize, strategy); + return new LzopIndexedOutputStream(out, indexOut, compressor, bufferSize, strategy); } @Override diff --git a/src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java b/src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java new file mode 100644 index 00000000..9cb74268 --- /dev/null +++ b/src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java @@ -0,0 +1,77 @@ +package com.hadoop.compression.lzo; + +import java.io.DataOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.io.compress.Compressor; + +import com.hadoop.compression.lzo.LzoCompressor.CompressionStrategy; + +/** + * An {@link LzopOutputStream} that also writes lzop block indices + * to an output stream. + */ +public class LzopIndexedOutputStream extends LzopOutputStream { + + // This class can be easily folded into LzoOutputStream + + DataOutputStream indexOut; + CountingOutputStream cout; + + // keeps count of number of bytes written. 
+ private static class CountingOutputStream extends FilterOutputStream { + public CountingOutputStream(OutputStream out) { + super(out); + } + + long bytesWritten = 0; + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + if (len > 0) + bytesWritten += len; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + bytesWritten++; + } + } + + public LzopIndexedOutputStream(OutputStream out, + DataOutputStream indexOut, + Compressor compressor, + int bufferSize, + CompressionStrategy strategy) + throws IOException { + super(new CountingOutputStream(out), compressor, bufferSize, strategy); + + this.indexOut = indexOut; + this.cout = (CountingOutputStream) this.out; + } + + @Override + public void close() throws IOException { + super.close(); + if (indexOut != null) { + indexOut.close(); + indexOut = null; + } + } + + @Override + protected void compress() throws IOException { + long start = cout.bytesWritten; + super.compress(); + + if ( cout.bytesWritten > start && indexOut != null ) { + // new block is written. write the start pos + indexOut.writeLong(start); + } + } + +} From febf96b1808c102fda818dddb542aea8d68d1f93 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Tue, 26 Jul 2011 14:49:34 -0700 Subject: [PATCH 2/4] pull #23: merge LzoIndexdOutputFormat into LzoOutputFormat. Updated outputformat tests to verify the index. 
--- .../lzo/LzopIndexedOutputStream.java | 77 ------------------- 1 file changed, 77 deletions(-) delete mode 100644 src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java diff --git a/src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java b/src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java deleted file mode 100644 index 9cb74268..00000000 --- a/src/java/com/hadoop/compression/lzo/LzopIndexedOutputStream.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.hadoop.compression.lzo; - -import java.io.DataOutputStream; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hadoop.io.compress.Compressor; - -import com.hadoop.compression.lzo.LzoCompressor.CompressionStrategy; - -/** - * An {@link LzopOutputStream} that also writes lzop block indices - * to an output stream. - */ -public class LzopIndexedOutputStream extends LzopOutputStream { - - // This class can be easily folded into LzoOutputStream - - DataOutputStream indexOut; - CountingOutputStream cout; - - // keeps count of number of bytes written. 
- private static class CountingOutputStream extends FilterOutputStream { - public CountingOutputStream(OutputStream out) { - super(out); - } - - long bytesWritten = 0; - - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - if (len > 0) - bytesWritten += len; - } - - @Override - public void write(int b) throws IOException { - out.write(b); - bytesWritten++; - } - } - - public LzopIndexedOutputStream(OutputStream out, - DataOutputStream indexOut, - Compressor compressor, - int bufferSize, - CompressionStrategy strategy) - throws IOException { - super(new CountingOutputStream(out), compressor, bufferSize, strategy); - - this.indexOut = indexOut; - this.cout = (CountingOutputStream) this.out; - } - - @Override - public void close() throws IOException { - super.close(); - if (indexOut != null) { - indexOut.close(); - indexOut = null; - } - } - - @Override - protected void compress() throws IOException { - long start = cout.bytesWritten; - super.compress(); - - if ( cout.bytesWritten > start && indexOut != null ) { - // new block is written. write the start pos - indexOut.writeLong(start); - } - } - -} From f011c246878b3fc4f0b71e1c01052add3b3fca12 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 27 Jul 2011 12:06:35 -0700 Subject: [PATCH 3/4] previous commit was incomplete. This should be part of it. 
--- .../com/hadoop/compression/lzo/LzopCodec.java | 2 +- .../compression/lzo/LzopOutputStream.java | 48 +++++++++++++++---- .../compression/lzo/TestLzopOutputStream.java | 29 ++++++++++- 3 files changed, 67 insertions(+), 12 deletions(-) diff --git a/src/java/com/hadoop/compression/lzo/LzopCodec.java b/src/java/com/hadoop/compression/lzo/LzopCodec.java index 66eb7b82..88e11d80 100644 --- a/src/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/java/com/hadoop/compression/lzo/LzopCodec.java @@ -69,7 +69,7 @@ public CompressionOutputStream createIndexedOutputStream(OutputStream out, LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.valueOf( getConf().get(LZO_COMPRESSOR_KEY, LzoCompressor.CompressionStrategy.LZO1X_1.name())); int bufferSize = getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE); - return new LzopIndexedOutputStream(out, indexOut, compressor, bufferSize, strategy); + return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy); } @Override diff --git a/src/java/com/hadoop/compression/lzo/LzopOutputStream.java b/src/java/com/hadoop/compression/lzo/LzopOutputStream.java index e57aa3eb..4f6d4be8 100644 --- a/src/java/com/hadoop/compression/lzo/LzopOutputStream.java +++ b/src/java/com/hadoop/compression/lzo/LzopOutputStream.java @@ -18,6 +18,8 @@ package com.hadoop.compression.lzo; +import java.io.DataOutputStream; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.zip.Adler32; @@ -26,13 +28,11 @@ import org.apache.hadoop.io.compress.CompressorStream; import org.apache.hadoop.io.compress.Compressor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - public class LzopOutputStream extends CompressorStream { - private static final Log LOG = LogFactory.getLog(LzopOutputStream.class); final int MAX_INPUT_SIZE; + protected DataOutputStream indexOut; + private CountingOutputStream cout; /** * Write an 
lzop-compatible header to the OutputStream provided. @@ -76,16 +76,19 @@ protected static void writeLzopHeader(OutputStream out, } } - public LzopOutputStream(OutputStream out, Compressor compressor, - int bufferSize, LzoCompressor.CompressionStrategy strategy) - throws IOException { - super(out, compressor, bufferSize); + public LzopOutputStream(OutputStream out, DataOutputStream indexOut, + Compressor compressor, int bufferSize, + LzoCompressor.CompressionStrategy strategy) + throws IOException { + super(new CountingOutputStream(out), compressor, bufferSize); + this.cout = (CountingOutputStream) this.out; + this.indexOut = indexOut; int overhead = strategy.name().contains("LZO1") ? (bufferSize >> 4) + 64 + 3 : (bufferSize >> 3) + 128 + 3; MAX_INPUT_SIZE = bufferSize - overhead; - writeLzopHeader(out, strategy); + writeLzopHeader(this.out, strategy); } /** @@ -97,6 +100,9 @@ public void close() throws IOException { finish(); out.write(new byte[]{ 0, 0, 0, 0 }); out.close(); + if (indexOut != null) { + indexOut.close(); + } closed = true; } } @@ -171,6 +177,11 @@ public void finish() throws IOException { protected void compress() throws IOException { int len = compressor.compress(buffer, 0, buffer.length); if (len > 0) { + // new lzo block. write current position to index file. + if (indexOut != null) { + indexOut.writeLong(cout.bytesWritten); + } + rawWriteInt((int)compressor.getBytesRead()); // If the compressed buffer is actually larger than the uncompressed buffer, @@ -196,4 +207,23 @@ private void rawWriteInt(int v) throws IOException { out.write((v >>> 8) & 0xFF); out.write((v >>> 0) & 0xFF); } + + /* keeps count of number of bytes written. 
*/ + private static class CountingOutputStream extends FilterOutputStream { + public CountingOutputStream(OutputStream out) { + super(out); + } + + long bytesWritten = 0; + + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + bytesWritten += len; + } + + public void write(int b) throws IOException { + out.write(b); + bytesWritten++; + } + } } diff --git a/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java b/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java index 98c084f1..9df8b815 100644 --- a/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java +++ b/src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java @@ -20,11 +20,11 @@ import java.io.BufferedWriter; import java.io.BufferedReader; +import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FileInputStream; import java.io.IOException; -import java.io.OutputStreamWriter; import java.io.InputStreamReader; import java.security.NoSuchAlgorithmException; @@ -32,6 +32,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; /** * Test the LzoOutputFormat, make sure that it can write files of different sizes and read them back in @@ -47,11 +50,15 @@ public class TestLzopOutputStream extends TestCase { private final String mediumFile = "1000.txt"; private final String smallFile = "100.txt"; private final String issue20File = "issue20-lzop.txt"; + private FileSystem localFs; @Override protected void setUp() throws Exception { super.setUp(); inputDataPath = System.getProperty("test.build.data", "data"); + Configuration conf = new Configuration(); + conf.set("io.compression.codecs", LzopCodec.class.getName()); + localFs = FileSystem.getLocal(conf).getRaw(); } /** @@ -121,6 +128,7 @@ private void runTest(String filename) throws IOException, File 
textFile = new File(inputDataPath, filename); File lzoFile = new File(inputDataPath, filename + new LzopCodec().getDefaultExtension()); File lzoOutFile = new File(inputDataPath, "output_" + filename + new LzopCodec().getDefaultExtension()); + File lzoIndexFile = new File(lzoOutFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX); if (lzoOutFile.exists()) { lzoOutFile.delete(); } @@ -136,7 +144,9 @@ private void runTest(String filename) throws IOException, int lzoBufferSize = 256 * 1024; LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1; LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize); - LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile.getAbsolutePath()), lzoCompressor, lzoBufferSize, strategy); + LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile), + new DataOutputStream(new FileOutputStream(lzoIndexFile)), + lzoCompressor, lzoBufferSize, strategy); // Now read line by line and stream out.. String textLine; @@ -178,5 +188,20 @@ private void runTest(String filename) throws IOException, lzoBr.close(); textBr2.close(); + + // verify the index file: + + Path lzoOutPath = new Path(lzoOutFile.getAbsolutePath()); + LzoIndex lzoIndex = LzoIndex.readIndex(localFs, lzoOutPath); + + // create offline index to compare. + assertTrue(lzoIndexFile.delete()); + LzoIndex.createIndex(localFs, lzoOutPath); + LzoIndex expectedIndex = LzoIndex.readIndex(localFs, lzoOutPath); + + assertEquals(lzoIndex.getNumberOfBlocks(), expectedIndex.getNumberOfBlocks()); + for (int i=0; i Date: Wed, 27 Jul 2011 14:54:18 -0700 Subject: [PATCH 4/4] review comments : restore the old constructor for LzoOutputStream. 
--- src/java/com/hadoop/compression/lzo/LzopCodec.java | 2 +- src/java/com/hadoop/compression/lzo/LzopOutputStream.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/java/com/hadoop/compression/lzo/LzopCodec.java b/src/java/com/hadoop/compression/lzo/LzopCodec.java index 88e11d80..0ff82256 100644 --- a/src/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/java/com/hadoop/compression/lzo/LzopCodec.java @@ -57,7 +57,7 @@ public CompressionOutputStream createIndexedOutputStream(OutputStream out, @Override public CompressionOutputStream createOutputStream(OutputStream out, - Compressor compressor) throws IOException { + Compressor compressor) throws IOException { return createIndexedOutputStream(out, null, compressor); } diff --git a/src/java/com/hadoop/compression/lzo/LzopOutputStream.java b/src/java/com/hadoop/compression/lzo/LzopOutputStream.java index 4f6d4be8..789c7be9 100644 --- a/src/java/com/hadoop/compression/lzo/LzopOutputStream.java +++ b/src/java/com/hadoop/compression/lzo/LzopOutputStream.java @@ -76,6 +76,12 @@ protected static void writeLzopHeader(OutputStream out, } } + public LzopOutputStream(OutputStream out, Compressor compressor, + int bufferSize, LzoCompressor.CompressionStrategy strategy) + throws IOException { + this(out, null, compressor, bufferSize, strategy); + } + public LzopOutputStream(OutputStream out, DataOutputStream indexOut, Compressor compressor, int bufferSize, LzoCompressor.CompressionStrategy strategy)