Permalink
Browse files

The main changes. Many are (unfortunately) whitespace caused by Eclip…

…se's code formatter. Diff with no whitespace to remove that part. Much of the rest is a reorganization of the files because I kept getting tired of looking through long files for nested inner classes. I broke things out into their own classes and files. I also added a com.hadoop.mapred.DeprecatedLzoTextInputFormat to go along with the com.hadoop.mapreduce.LzoTextInputFormat -- applications like streaming still need a class derived from org.apache.hadoop.mapred.InputFormat, so this is one way to make streaming work until it's fixed. In the process of doing that, I ran into some code that was (a) repeated and (b) looked like it should be part of the LzoIndex class anyway, like createIndex and readIndex. I added that and another couple simple functions to the LzoIndex class.
  • Loading branch information...
1 parent 63eedfa commit a756a429809278a5c7f8ac4d9af57493c6b59aa5 @kevinweil kevinweil committed Sep 21, 2009
View
@@ -19,3 +19,4 @@
.settings
.svn
build/
+bin/
@@ -0,0 +1,24 @@
+package com.hadoop.compression.lzo;
+
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+/**
+ * Checksums on compressed block data with header bitmask, Checksum class.
+ */
+public enum CChecksum {
+ F_ADLER32C(0x02, Adler32.class), F_CRC32C(0x200, CRC32.class);
+ private final int mask;
+ private final Class<? extends Checksum> clazz;
+ CChecksum(int mask, Class<? extends Checksum> clazz) {
+ this.mask = mask;
+ this.clazz = clazz;
+ }
+ public int getHeaderMask() {
+ return mask;
+ }
+ public Class<? extends Checksum> getChecksumClass() {
+ return clazz;
+ }
+}
@@ -0,0 +1,24 @@
+package com.hadoop.compression.lzo;
+
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+/**
+ * Checksums on decompressed block data with header bitmask, Checksum class.
+ */
+public enum DChecksum {
+ F_ADLER32D(0x01, Adler32.class), F_CRC32D(0x100, CRC32.class);
+ private final int mask;
+ private final Class<? extends Checksum> clazz;
+ DChecksum(int mask, Class<? extends Checksum> clazz) {
+ this.mask = mask;
+ this.clazz = clazz;
+ }
+ public int getHeaderMask() {
+ return mask;
+ }
+ public Class<? extends Checksum> getChecksumClass() {
+ return clazz;
+ }
+}
@@ -24,7 +24,7 @@
private static final Log LOG = LogFactory.getLog(GPLNativeCodeLoader.class);
private static boolean nativeLibraryLoaded = false;
-
+
static {
try {
//try to load the lib
@@ -36,7 +36,7 @@
nativeLibraryLoaded = false;
}
}
-
+
/**
* Are the native gpl libraries loaded?
* @return true if loaded, otherwise false
@@ -19,8 +19,8 @@
package com.hadoop.compression.lzo;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.InputStream;
+import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,26 +41,26 @@
*
*/
public class LzoCodec implements Configurable, CompressionCodec {
-
+
private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName());
private Configuration conf;
public void setConf(Configuration conf) {
this.conf = conf;
}
-
+
public Configuration getConf() {
return conf;
}
private static boolean nativeLzoLoaded = false;
-
+
static {
if (GPLNativeCodeLoader.isNativeCodeLoaded()) {
nativeLzoLoaded = LzoCompressor.isNativeLzoLoaded() &&
- LzoDecompressor.isNativeLzoLoaded();
-
+ LzoDecompressor.isNativeLzoLoaded();
+
if (nativeLzoLoaded) {
LOG.info("Successfully loaded & initialized native-lzo library");
} else {
@@ -83,17 +83,17 @@ public static boolean isNativeLzoLoaded(Configuration conf) {
}
public CompressionOutputStream createOutputStream(OutputStream out)
- throws IOException {
+ throws IOException {
return createOutputStream(out, createCompressor());
}
-
+
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
// Ensure native-lzo library is loaded & initialized
if (!isNativeLzoLoaded(conf)) {
throw new RuntimeException("native-lzo library not available");
}
-
+
/**
* <b>http://www.oberhumer.com/opensource/lzo/lzofaq.php</b>
*
@@ -118,15 +118,15 @@ public CompressionOutputStream createOutputStream(OutputStream out,
LzoCompressor.CompressionStrategy strategy =
LzoCompressor.CompressionStrategy.valueOf(
conf.get("io.compression.codec.lzo.compressor",
- LzoCompressor.CompressionStrategy.LZO1X_1.name()));
+ LzoCompressor.CompressionStrategy.LZO1X_1.name()));
int bufferSize =
conf.getInt("io.compression.codec.lzo.buffersize", 64*1024);
int compressionOverhead = strategy.name().contains("LZO1")
- ? (bufferSize >> 4) + 64 + 3
- : (bufferSize >> 3) + 128 + 3;
+ ? (bufferSize >> 4) + 64 + 3
+ : (bufferSize >> 3) + 128 + 3;
return new BlockCompressorStream(out, compressor, bufferSize,
- compressionOverhead);
+ compressionOverhead);
}
public Class<? extends Compressor> getCompressorType() {
@@ -142,24 +142,24 @@ public Compressor createCompressor() {
if (!isNativeLzoLoaded(conf)) {
throw new RuntimeException("native-lzo library not available");
}
-
+
LzoCompressor.CompressionStrategy strategy =
LzoCompressor.CompressionStrategy.valueOf(
conf.get("io.compression.codec.lzo.compressor",
- LzoCompressor.CompressionStrategy.LZO1X_1.name()));
+ LzoCompressor.CompressionStrategy.LZO1X_1.name()));
int bufferSize =
conf.getInt("io.compression.codec.lzo.buffersize", 64*1024);
return new LzoCompressor(strategy, bufferSize);
}
public CompressionInputStream createInputStream(InputStream in)
- throws IOException {
+ throws IOException {
return createInputStream(in, createDecompressor());
}
public CompressionInputStream createInputStream(InputStream in,
- Decompressor decompressor)
+ Decompressor decompressor)
throws IOException {
// Ensure native-lzo library is loaded & initialized
if (!isNativeLzoLoaded(conf)) {
@@ -182,11 +182,11 @@ public Decompressor createDecompressor() {
if (!isNativeLzoLoaded(conf)) {
throw new RuntimeException("native-lzo library not available");
}
-
+
LzoDecompressor.CompressionStrategy strategy =
LzoDecompressor.CompressionStrategy.valueOf(
conf.get("io.compression.codec.lzo.decompressor",
- LzoDecompressor.CompressionStrategy.LZO1X.name()));
+ LzoDecompressor.CompressionStrategy.LZO1X.name()));
int bufferSize =
conf.getInt("io.compression.codec.lzo.buffersize", 64*1024);
@@ -38,15 +38,15 @@
// HACK - Use this as a global lock in the JNI layer
@SuppressWarnings({ "unchecked", "unused" })
private static Class clazz = LzoDecompressor.class;
-
+
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
private Buffer uncompressedDirectBuf = null;
private int uncompressedDirectBufLen = 0;
private Buffer compressedDirectBuf = null;
private boolean finish, finished;
-
+
private long bytesread = 0L;
private long byteswritten = 0L;
@@ -56,7 +56,7 @@
private int workingMemoryBufLen = 0; // The length of 'working memory' buf.
@SuppressWarnings("unused")
private Buffer workingMemoryBuf; // The 'working memory' for lzo.
-
+
/**
* The compression algorithm for lzo library.
*/
@@ -66,13 +66,13 @@
*/
LZO1 (0),
LZO1_99 (1),
-
+
/**
* lzo1a algorithms.
*/
LZO1A (2),
LZO1A_99 (3),
-
+
/**
* lzo1b algorithms.
*/
@@ -108,13 +108,13 @@
LZO1C_9 (29),
LZO1C_99 (30),
LZO1C_999 (31),
-
+
/**
* lzo1f algorithms.
*/
LZO1F_1 (32),
LZO1F_999 (33),
-
+
/**
* lzo1x algorithms.
*/
@@ -123,37 +123,37 @@
LZO1X_12 (36),
LZO1X_15 (37),
LZO1X_999 (38),
-
+
/**
* lzo1y algorithms.
*/
LZO1Y_1 (39),
LZO1Y_999 (40),
-
+
/**
* lzo1z algorithms.
*/
LZO1Z_999 (41),
-
+
/**
* lzo2a algorithms.
*/
LZO2A_999 (42);
-
+
private final int compressor;
private CompressionStrategy(int compressor) {
this.compressor = compressor;
}
-
+
int getCompressor() {
return compressor;
}
}; // CompressionStrategy
private static boolean nativeLzoLoaded;
public static final int LZO_LIBRARY_VERSION;
-
+
static {
if (GPLNativeCodeLoader.isNativeCodeLoaded()) {
// Initialize the native library
@@ -166,15 +166,15 @@ int getCompressor() {
nativeLzoLoaded = false;
}
LZO_LIBRARY_VERSION = (nativeLzoLoaded) ? 0xFFFF & getLzoLibraryVersion()
- : -1;
+ : -1;
} else {
LOG.error("Cannot load " + LzoCompressor.class.getName() +
- " without native-hadoop library!");
+ " without native-hadoop library!");
nativeLzoLoaded = false;
LZO_LIBRARY_VERSION = -1;
- }
- }
-
+ }
+ }
+
/**
* Check if lzo compressors are loaded and initialized.
*
@@ -197,21 +197,21 @@ public LzoCompressor(CompressionStrategy strategy, int directBufferSize) {
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
compressedDirectBuf.position(directBufferSize);
-
+
/**
* Initialize {@link #lzoCompress} and {@link #workingMemoryBufLen}
*/
init(this.strategy.getCompressor());
workingMemoryBuf = ByteBuffer.allocateDirect(workingMemoryBufLen);
}
-
+
/**
* Creates a new compressor with the default lzo1x_1 compression.
*/
public LzoCompressor() {
this(CompressionStrategy.LZO1X_1, 64*1024);
}
-
+
public synchronized void setInput(byte[] b, int off, int len) {
if (b== null) {
throw new NullPointerException();
@@ -246,7 +246,7 @@ synchronized void setInputFromSavedData() {
uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize);
((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,
- uncompressedDirectBufLen);
+ uncompressedDirectBufLen);
// Note how much data is being fed to lzo
userBufOff += uncompressedDirectBufLen;
@@ -267,15 +267,15 @@ public boolean needsInput() {
public synchronized void finish() {
finish = true;
}
-
+
public synchronized boolean finished() {
// Check if 'lzo' says its 'finished' and
// all compressed data has been consumed
return (finish && finished && compressedDirectBuf.remaining() == 0);
}
public synchronized int compress(byte[] b, int off, int len)
- throws IOException {
+ throws IOException {
if (b == null) {
throw new NullPointerException();
}
@@ -354,7 +354,7 @@ public synchronized long getBytesWritten() {
public synchronized void end() {
// nop
}
-
+
private native static void initIDs();
private native static int getLzoLibraryVersion();
private native void init(int compressor);
Oops, something went wrong.

0 comments on commit a756a42

Please sign in to comment.