Use heuristic to size stream decompression buffers
Instead of always allocating the maximum amount of memory, use a
heuristic to size the ORC stream decompression buffer. For wide tables
with small stripes (~100 rows, ~30 KB) this can provide a 50x speedup.
cberner committed Mar 9, 2015
1 parent cd3fdc4 commit be658b1
Showing 1 changed file with 41 additions and 21 deletions.
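The heuristic sizes the first decompression buffer from the compressed chunk length and a fixed expected compression ratio instead of always allocating the configured maximum. A minimal sketch of that initial-guess arithmetic, using a hypothetical helper class; EXPECTED_COMPRESSION_RATIO = 5 is taken from the diff below, while the 256 KB cap in main is only an example value:

// Hypothetical sketch of the sizing heuristic; not part of the commit.
public class BufferSizeHeuristic
{
    private static final int EXPECTED_COMPRESSION_RATIO = 5;

    // First allocation: a multiple of the compressed chunk size, never
    // larger than the configured maximum buffer size.
    public static int initialBufferSize(int compressedSize, int maxBufferSize)
    {
        long guess = (long) compressedSize * EXPECTED_COMPRESSION_RATIO;
        return (int) Math.min(guess, maxBufferSize);
    }

    public static void main(String[] args)
    {
        // A ~30 KB compressed chunk with a 256 KB cap starts at 150 KB
        // instead of the full 256 KB; tiny chunks start even smaller.
        System.out.println(initialBufferSize(30 * 1024, 256 * 1024)); // 153600
    }
}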
@@ -24,6 +24,7 @@

 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;

@@ -41,15 +42,16 @@
 public final class OrcInputStream
         extends InputStream
 {
+    public static final int EXPECTED_COMPRESSION_RATIO = 5;
     private final String source;
     private final FixedLengthSliceInput compressedSliceInput;
     private final CompressionKind compressionKind;
-    private final int bufferSize;
+    private final int maxBufferSize;

     private int currentCompressedBlockOffset;
     private FixedLengthSliceInput current;

-    private Slice buffer;
+    private byte[] buffer;

     public OrcInputStream(String source, FixedLengthSliceInput sliceInput, CompressionKind compressionKind, int bufferSize)
     {
@@ -58,7 +60,7 @@ public OrcInputStream(String source, FixedLengthSliceInput sliceInput, CompressionKind compressionKind, int bufferSize)
         checkNotNull(sliceInput, "sliceInput is null");

         this.compressionKind = checkNotNull(compressionKind, "compressionKind is null");
-        this.bufferSize = bufferSize;
+        this.maxBufferSize = bufferSize;

         if (compressionKind == UNCOMPRESSED) {
             this.current = sliceInput;
@@ -159,7 +161,7 @@ public boolean seekToCheckpoint(long checkpoint)

         if (decompressedOffset != current.position()) {
             current.setPosition(0);
-            if (current.remaining() < decompressedOffset) {
+            if (current.remaining() < decompressedOffset) {
                 decompressedOffset -= current.remaining();
                 advance();
             }
@@ -211,19 +213,15 @@ private void advance()
             current = chunk.getInput();
         }
         else {
-            if (buffer == null) {
-                buffer = Slices.allocate(bufferSize);
-            }
-
             int uncompressedSize;
             if (compressionKind == ZLIB) {
-                uncompressedSize = decompressZip(chunk, buffer);
+                uncompressedSize = decompressZip(chunk);
             }
             else {
-                uncompressedSize = decompressSnappy(chunk, buffer);
+                uncompressedSize = decompressSnappy(chunk);
             }

-            current = buffer.slice(0, uncompressedSize).getInput();
+            current = Slices.wrappedBuffer(buffer, 0, uncompressedSize).getInput();
         }
     }

@@ -239,17 +237,25 @@ public String toString()
     }

     // This comes from the Apache Hive ORC code
-    private static int decompressZip(Slice in, Slice buffer)
+    private int decompressZip(Slice in)
             throws IOException
     {
         Inflater inflater = new Inflater(true);
         try {
             inflater.setInput((byte[]) in.getBase(), (int) (in.getAddress() - ARRAY_BYTE_BASE_OFFSET), in.length());
-            byte[] outArray = (byte[]) buffer.getBase();
-            int outOffset = (int) (buffer.getAddress() - ARRAY_BYTE_BASE_OFFSET);
-            int outLength = buffer.length();
-
-            int uncompressedLength = inflater.inflate(outArray, outOffset, outLength);
+            allocateOrGrowBuffer(in.length() * EXPECTED_COMPRESSION_RATIO, false);
+            int uncompressedLength = 0;
+            while (true) {
+                uncompressedLength += inflater.inflate(buffer, uncompressedLength, buffer.length - uncompressedLength);
+                if (inflater.finished() || buffer.length >= maxBufferSize) {
+                    break;
+                }
+                int oldBufferSize = buffer.length;
+                allocateOrGrowBuffer(buffer.length * 2, true);
+                if (buffer.length <= oldBufferSize) {
+                    throw new IllegalStateException(String.format("Buffer failed to grow. Old size %d, current size %d", oldBufferSize, buffer.length));
+                }
+            }

             if (!inflater.finished()) {
                 throw new OrcCorruptionException("Could not decompress all input (output buffer too small?)");
@@ -265,15 +271,29 @@ private static int decompressZip(Slice in, Slice buffer)
         }
     }

-    private static int decompressSnappy(Slice in, Slice buffer)
+    private int decompressSnappy(Slice in)
             throws IOException
     {
-        byte[] outArray = (byte[]) buffer.getBase();
-
         byte[] inArray = (byte[]) in.getBase();
         int inOffset = (int) (in.getAddress() - ARRAY_BYTE_BASE_OFFSET);
         int inLength = in.length();

-        return Snappy.uncompress(inArray, inOffset, inLength, outArray, 0);
+        int uncompressedLength = Snappy.getUncompressedLength(inArray, inOffset);
+        checkArgument(uncompressedLength <= maxBufferSize, "Snappy requires buffer (%d) larger than max size (%d)", uncompressedLength, maxBufferSize);
+        allocateOrGrowBuffer(uncompressedLength, false);
+
+        return Snappy.uncompress(inArray, inOffset, inLength, buffer, 0);
     }
+
+    private void allocateOrGrowBuffer(int size, boolean copyExistingData)
+    {
+        if (buffer == null || buffer.length < size) {
+            if (copyExistingData && buffer != null) {
+                buffer = Arrays.copyOfRange(buffer, 0, Math.min(size, maxBufferSize));
+            }
+            else {
+                buffer = new byte[Math.min(size, maxBufferSize)];
+            }
+        }
+    }
 }
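For ZLIB the uncompressed size is not known in advance, so decompressZip above inflates into the heuristic-sized buffer and doubles it (capped at the maximum) until the Inflater reports it is finished. A standalone sketch of that inflate-and-grow pattern, using only java.util.zip and hypothetical names rather than the Presto classes:

import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class GrowingInflateExample
{
    // Decompress raw-deflate data, starting from a small guess and doubling
    // the output buffer (up to maxSize) until the inflater is finished.
    static byte[] inflateWithGrowth(byte[] compressed, int initialSize, int maxSize)
            throws DataFormatException
    {
        Inflater inflater = new Inflater(true);
        try {
            inflater.setInput(compressed);
            byte[] buffer = new byte[Math.min(initialSize, maxSize)];
            int length = 0;
            while (true) {
                length += inflater.inflate(buffer, length, buffer.length - length);
                if (inflater.finished() || buffer.length >= maxSize) {
                    break;
                }
                // Grow geometrically, preserving the bytes already written.
                buffer = Arrays.copyOf(buffer, Math.min(buffer.length * 2, maxSize));
            }
            if (!inflater.finished()) {
                throw new IllegalStateException("Output did not fit in " + maxSize + " bytes");
            }
            return Arrays.copyOf(buffer, length);
        }
        finally {
            inflater.end();
        }
    }

    public static void main(String[] args)
            throws DataFormatException
    {
        // Round-trip 64 KB of highly compressible data through raw deflate.
        byte[] input = new byte[64 * 1024];
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        deflater.setInput(input);
        deflater.finish();
        byte[] compressed = new byte[input.length];
        int compressedLength = deflater.deflate(compressed);
        deflater.end();

        byte[] output = inflateWithGrowth(Arrays.copyOf(compressed, compressedLength), 4 * 1024, 1 << 20);
        System.out.println(output.length); // 65536
    }
}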
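allocateOrGrowBuffer takes a copyExistingData flag because the two codecs grow the buffer at different times: the ZLIB loop grows it while it already holds partial output, so those bytes must be carried over, whereas the Snappy path learns the exact output size up front from Snappy.getUncompressedLength and can allocate fresh. A self-contained sketch of that copy-versus-fresh distinction, with hypothetical names rather than the Presto code:

import java.util.Arrays;

public class GrowableBuffer
{
    private final int maxSize;
    private byte[] buffer;

    public GrowableBuffer(int maxSize)
    {
        this.maxSize = maxSize;
    }

    // copyExistingData preserves bytes already written (needed when growing
    // mid-decompression); a fresh allocation suffices when the final size is
    // known before anything is written. Requests are capped at maxSize.
    public void allocateOrGrow(int size, boolean copyExistingData)
    {
        if (buffer == null || buffer.length < size) {
            int newSize = Math.min(size, maxSize);
            if (copyExistingData && buffer != null) {
                buffer = Arrays.copyOf(buffer, newSize);
            }
            else {
                buffer = new byte[newSize];
            }
        }
    }

    public byte[] buffer()
    {
        return buffer;
    }

    public static void main(String[] args)
    {
        GrowableBuffer b = new GrowableBuffer(1 << 20);
        b.allocateOrGrow(4 * 1024, false); // exact-size path (Snappy-style)
        b.buffer()[0] = 42;
        b.allocateOrGrow(8 * 1024, true);  // grow mid-stream, keep partial output
        System.out.println(b.buffer().length + " " + b.buffer()[0]); // 8192 42
    }
}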
