Browse files

reuse compressors;

simplify decompressor reuse (reuse decompressors for both cases)
  • Loading branch information...
1 parent 88b5a82 commit c03cd51f9c1f88b1c3b613a11d775cab287c395c @yuxutw yuxutw committed Apr 25, 2012
View
16 src/java/com/hadoop/compression/lzo/LzopCodec.java
@@ -48,13 +48,17 @@
@Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
- return createOutputStream(out, createCompressor());
+ //get a compressor which will be returned to the pool when the output stream
+ //is closed.
+ return createOutputStream(out, CodecPool.getCompressor(this, getConf()));
}
public CompressionOutputStream createIndexedOutputStream(OutputStream out,
DataOutputStream indexOut)
throws IOException {
- return createIndexedOutputStream(out, indexOut, createCompressor());
+ //get a compressor which will be returned to the pool when the output stream
+ //is closed.
+ return createIndexedOutputStream(out, indexOut,CodecPool.getCompressor(this, getConf()));
}
@Override
@@ -87,11 +91,9 @@ public CompressionInputStream createInputStream(InputStream in,
@Override
public CompressionInputStream createInputStream(InputStream in) throws IOException {
- /* create a decompressor and tell LzoInputStream to reuse it
- * (return it to the pool when LzoInputStream is closed).
- */
- return new LzopInputStream(in, CodecPool.getDecompressor(this),
- getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE), true);
+ // get a decompressor from a pool which will be returned to the pool
+ // when LzoInputStream is closed
+ return createInputStream(in, CodecPool.getDecompressor(this));
}
@Override
View
12 src/java/com/hadoop/compression/lzo/LzopInputStream.java
@@ -48,21 +48,13 @@
private int noUncompressedBytes = 0;
private int noCompressedBytes = 0;
private int uncompressedBlockSize = 0;
- private boolean reuseDecompressor;
public LzopInputStream(InputStream in, Decompressor decompressor,
int bufferSize) throws IOException {
- this(in, decompressor, bufferSize, false);
- }
-
-
- public LzopInputStream(InputStream in, Decompressor decompressor, int bufferSize, boolean reuseDecompressor) throws IOException {
super(in, decompressor, bufferSize);
readHeader(in);
- this.reuseDecompressor = reuseDecompressor;
}
-
/**
* Reads len bytes in a loop.
*
@@ -346,8 +338,8 @@ public void close() throws IOException {
// the file didn't. It's not critical, though, so log and eat it in this case.
LOG.warn("Incorrect LZO file format: file did not end with four trailing zeroes.", e);
} finally{
- if(reuseDecompressor)
- CodecPool.returnDecompressor(decompressor);
+ //return the decompressor to the pool, the function itself handles null.
+ CodecPool.returnDecompressor(decompressor);
}
}
}
View
4 src/java/com/hadoop/compression/lzo/LzopOutputStream.java
@@ -25,6 +25,7 @@
import java.util.zip.Adler32;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.Compressor;
@@ -111,6 +112,9 @@ public void close() throws IOException {
}
closed = true;
}
+ //return the compressor to the pool for later reuse;
+ //the returnCompressor handles nulls.
+ CodecPool.returnCompressor(compressor);
}
@Override

0 comments on commit c03cd51

Please sign in to comment.