Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Previous commit was incomplete. This should have been part of it.
  • Loading branch information
Raghu Angadi committed Jul 27, 2011
1 parent febf96b commit f011c24
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/java/com/hadoop/compression/lzo/LzopCodec.java
Expand Up @@ -69,7 +69,7 @@ public CompressionOutputStream createIndexedOutputStream(OutputStream out,
LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.valueOf(
getConf().get(LZO_COMPRESSOR_KEY, LzoCompressor.CompressionStrategy.LZO1X_1.name()));
int bufferSize = getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
return new LzopIndexedOutputStream(out, indexOut, compressor, bufferSize, strategy);
return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy);
}

@Override
Expand Down
48 changes: 39 additions & 9 deletions src/java/com/hadoop/compression/lzo/LzopOutputStream.java
Expand Up @@ -18,6 +18,8 @@

package com.hadoop.compression.lzo;

import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Adler32;
Expand All @@ -26,13 +28,11 @@
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.Compressor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LzopOutputStream extends CompressorStream {
private static final Log LOG = LogFactory.getLog(LzopOutputStream.class);

final int MAX_INPUT_SIZE;
protected DataOutputStream indexOut;
private CountingOutputStream cout;

/**
* Write an lzop-compatible header to the OutputStream provided.
Expand Down Expand Up @@ -76,16 +76,19 @@ protected static void writeLzopHeader(OutputStream out,
}
}

public LzopOutputStream(OutputStream out, Compressor compressor,
int bufferSize, LzoCompressor.CompressionStrategy strategy)
throws IOException {
super(out, compressor, bufferSize);
public LzopOutputStream(OutputStream out, DataOutputStream indexOut,
Compressor compressor, int bufferSize,
LzoCompressor.CompressionStrategy strategy)
throws IOException {
super(new CountingOutputStream(out), compressor, bufferSize);

this.cout = (CountingOutputStream) this.out;
this.indexOut = indexOut;
int overhead = strategy.name().contains("LZO1") ?
(bufferSize >> 4) + 64 + 3 : (bufferSize >> 3) + 128 + 3;
MAX_INPUT_SIZE = bufferSize - overhead;

writeLzopHeader(out, strategy);
writeLzopHeader(this.out, strategy);
}

/**
Expand All @@ -97,6 +100,9 @@ public void close() throws IOException {
finish();
out.write(new byte[]{ 0, 0, 0, 0 });
out.close();
if (indexOut != null) {
indexOut.close();
}
closed = true;
}
}
Expand Down Expand Up @@ -171,6 +177,11 @@ public void finish() throws IOException {
protected void compress() throws IOException {
int len = compressor.compress(buffer, 0, buffer.length);
if (len > 0) {
// new lzo block. write current position to index file.
if (indexOut != null) {
indexOut.writeLong(cout.bytesWritten);
}

rawWriteInt((int)compressor.getBytesRead());

// If the compressed buffer is actually larger than the uncompressed buffer,
Expand All @@ -196,4 +207,23 @@ private void rawWriteInt(int v) throws IOException {
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
}

/**
 * Pass-through stream that tallies how many bytes have been handed to the
 * wrapped stream. The running total is readable via {@link #bytesWritten}.
 */
private static class CountingOutputStream extends FilterOutputStream {
  /** Total number of bytes forwarded to the wrapped stream so far. */
  long bytesWritten = 0;

  /**
   * @param out the stream that receives every byte written here
   */
  public CountingOutputStream(OutputStream out) {
    super(out);
  }

  /**
   * Forwards a single byte, then advances the tally by one. The tally is
   * only updated once the underlying write has returned.
   */
  @Override
  public void write(int b) throws IOException {
    out.write(b);
    bytesWritten += 1;
  }

  /**
   * Forwards {@code len} bytes of {@code b} starting at {@code off} in one
   * bulk call, then advances the tally by {@code len}.
   */
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
    bytesWritten = bytesWritten + len;
  }
}
}
29 changes: 27 additions & 2 deletions src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java
Expand Up @@ -20,18 +20,21 @@

import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.InputStreamReader;
import java.security.NoSuchAlgorithmException;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
* Test the LzoOutputFormat, make sure that it can write files of different sizes and read them back in
Expand All @@ -47,11 +50,15 @@ public class TestLzopOutputStream extends TestCase {
private final String mediumFile = "1000.txt";
private final String smallFile = "100.txt";
private final String issue20File = "issue20-lzop.txt";
private FileSystem localFs;

@Override
protected void setUp() throws Exception {
super.setUp();
inputDataPath = System.getProperty("test.build.data", "data");
Configuration conf = new Configuration();
conf.set("io.compression.codecs", LzopCodec.class.getName());
localFs = FileSystem.getLocal(conf).getRaw();
}

/**
Expand Down Expand Up @@ -121,6 +128,7 @@ private void runTest(String filename) throws IOException,
File textFile = new File(inputDataPath, filename);
File lzoFile = new File(inputDataPath, filename + new LzopCodec().getDefaultExtension());
File lzoOutFile = new File(inputDataPath, "output_" + filename + new LzopCodec().getDefaultExtension());
File lzoIndexFile = new File(lzoOutFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX);
if (lzoOutFile.exists()) {
lzoOutFile.delete();
}
Expand All @@ -136,7 +144,9 @@ private void runTest(String filename) throws IOException,
int lzoBufferSize = 256 * 1024;
LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1;
LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize);
LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile.getAbsolutePath()), lzoCompressor, lzoBufferSize, strategy);
LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile),
new DataOutputStream(new FileOutputStream(lzoIndexFile)),
lzoCompressor, lzoBufferSize, strategy);

// Now read line by line and stream out..
String textLine;
Expand Down Expand Up @@ -178,5 +188,20 @@ private void runTest(String filename) throws IOException,

lzoBr.close();
textBr2.close();

// verify the index file:

Path lzoOutPath = new Path(lzoOutFile.getAbsolutePath());
LzoIndex lzoIndex = LzoIndex.readIndex(localFs, lzoOutPath);

// create offline index to compare.
assertTrue(lzoIndexFile.delete());
LzoIndex.createIndex(localFs, lzoOutPath);
LzoIndex expectedIndex = LzoIndex.readIndex(localFs, lzoOutPath);

assertEquals(lzoIndex.getNumberOfBlocks(), expectedIndex.getNumberOfBlocks());
for (int i=0; i<lzoIndex.getNumberOfBlocks(); i++) {
assertEquals(lzoIndex.getPosition(i), expectedIndex.getPosition(i));
}
}
}

0 comments on commit f011c24

Please sign in to comment.