Support LZ4 and ZSTD when writing Hive tables #910

Merged
merged 3 commits on Jun 5, 2019
@@ -462,7 +462,7 @@
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>aircompressor</artifactId>
-            <version>0.13</version>
+            <version>0.14</version>
         </dependency>

         <dependency>
@@ -16,7 +16,9 @@
 import io.prestosql.orc.metadata.CompressionKind;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.ZStandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;

 import java.util.Optional;
@@ -27,6 +29,8 @@
 {
     NONE(null, CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED),
     SNAPPY(SnappyCodec.class, CompressionKind.SNAPPY, CompressionCodecName.SNAPPY),
+    LZ4(Lz4Codec.class, CompressionKind.LZ4, CompressionCodecName.LZ4),
+    ZSTD(ZStandardCodec.class, CompressionKind.ZSTD, CompressionCodecName.ZSTD),
     GZIP(GzipCodec.class, CompressionKind.ZLIB, CompressionCodecName.GZIP);

     private final Optional<Class<? extends CompressionCodec>> codec;
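
The two new constants wire each codec into all three writer paths at once: a Hadoop CompressionCodec class for text-based formats, an ORC CompressionKind, and a Parquet CompressionCodecName. A minimal usage sketch, assuming this enum is Presto's HiveCompressionCodec and that the getters mirror the three constructor arguments shown above (the package path and getter names are assumptions, not confirmed by the diff):

import io.prestosql.plugin.hive.HiveCompressionCodec; // package path assumed

public class CodecMappings
{
    public static void main(String[] args)
    {
        // Assumed getters matching the three constructor arguments above
        HiveCompressionCodec codec = HiveCompressionCodec.ZSTD;
        System.out.println(codec.getCodec());                       // Optional[class ...ZStandardCodec] for text formats
        System.out.println(codec.getOrcCompressionKind());          // CompressionKind.ZSTD for ORC writers
        System.out.println(codec.getParquetCompressionCodecName()); // CompressionCodecName.ZSTD for Parquet writers
    }
}
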
@@ -102,9 +102,11 @@
 import static org.apache.parquet.hadoop.ParquetOutputFormat.ENABLE_DICTIONARY;
 import static org.apache.parquet.hadoop.ParquetOutputFormat.WRITER_VERSION;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZO;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -137,7 +139,7 @@ public static ParquetTester quickParquetTester()
 public static ParquetTester fullParquetTester()
 {
     ParquetTester parquetTester = new ParquetTester();
-    parquetTester.compressions = ImmutableSet.of(GZIP, UNCOMPRESSED, SNAPPY, LZO);
+    parquetTester.compressions = ImmutableSet.of(GZIP, UNCOMPRESSED, SNAPPY, LZO, LZ4, ZSTD);
     parquetTester.versions = ImmutableSet.copyOf(WriterVersion.values());
     parquetTester.sessions = ImmutableSet.of(SESSION, SESSION_USE_NAME);
     return parquetTester;
@@ -17,12 +17,12 @@
 import io.airlift.compress.Compressor;
 import io.airlift.compress.lz4.Lz4Compressor;
 import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
 import io.airlift.slice.SizeOf;
 import io.airlift.slice.Slice;
 import io.airlift.slice.SliceOutput;
 import io.prestosql.orc.checkpoint.InputStreamCheckpoint;
 import io.prestosql.orc.metadata.CompressionKind;
-import io.prestosql.orc.zstd.ZstdJniCompressor;
 import org.openjdk.jol.info.ClassLayout;

 import javax.annotation.Nullable;
@@ -99,7 +99,7 @@ else if (compression == CompressionKind.LZ4) {
             this.compressor = new Lz4Compressor();
         }
         else if (compression == CompressionKind.ZSTD) {
-            this.compressor = new ZstdJniCompressor();
+            this.compressor = new ZstdCompressor();
         }
         else {
             throw new IllegalArgumentException("Unsupported compression " + compression);
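
Paired with the aircompressor bump to 0.14 in the first hunk, the ORC output buffer now uses aircompressor's pure-Java ZstdCompressor in place of the JNI-backed ZstdJniCompressor. A round-trip sketch of that API (illustration only, not part of the PR), using just the Compressor and Decompressor interface methods:

import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ZstdRoundTrip
{
    public static void main(String[] args)
    {
        byte[] input = "presto zstd round trip".getBytes(StandardCharsets.UTF_8);

        // Compress: maxCompressedLength sizes the worst-case output buffer
        ZstdCompressor compressor = new ZstdCompressor();
        byte[] compressed = new byte[compressor.maxCompressedLength(input.length)];
        int compressedLength = compressor.compress(input, 0, input.length, compressed, 0, compressed.length);

        // Decompress back into an exactly-sized buffer
        ZstdDecompressor decompressor = new ZstdDecompressor();
        byte[] output = new byte[input.length];
        decompressor.decompress(compressed, 0, compressedLength, output, 0, output.length);

        System.out.println(Arrays.equals(input, output)); // true
    }
}
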
@@ -14,8 +14,10 @@
 package io.prestosql.parquet;

 import io.airlift.compress.Decompressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
 import io.airlift.compress.lzo.LzoDecompressor;
 import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import io.airlift.slice.DynamicSliceOutput;
 import io.airlift.slice.Slice;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -56,6 +58,10 @@ public static Slice decompress(CompressionCodecName codec, Slice input, int uncompressedSize)
                 return input;
             case LZO:
                 return decompressLZO(input, uncompressedSize);
+            case LZ4:
+                return decompressLz4(input, uncompressedSize);
+            case ZSTD:
+                return decompressZstd(input, uncompressedSize);
             default:
                 throw new ParquetCorruptionException("Codec not supported in Parquet: " + codec);
         }
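
Both new cases delegate to a private decompress(...) helper whose body is outside this diff. A simplified sketch of such a helper, assuming it merely bridges a Slice region to aircompressor's Decompressor interface; the class and method here are stand-ins, not the PR's code, and the real helper likely reads the slice's backing array directly instead of copying:

import io.airlift.compress.Decompressor;
import io.airlift.slice.Slice;

final class DecompressHelperSketch
{
    private DecompressHelperSketch() {}

    // Simplified stand-in for the private helper the new cases call
    static int decompress(Decompressor decompressor, Slice input, int inputOffset, int inputLength, byte[] output, int outputOffset)
    {
        byte[] inputBytes = input.getBytes(inputOffset, inputLength); // copies the region
        return decompressor.decompress(inputBytes, 0, inputLength, output, outputOffset, output.length - outputOffset);
    }
}
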
@@ -68,6 +74,13 @@ private static Slice decompressSnappy(Slice input, int uncompressedSize)
         return wrappedBuffer(buffer);
     }

+    private static Slice decompressZstd(Slice input, int uncompressedSize)
+    {
+        byte[] buffer = new byte[uncompressedSize];
+        decompress(new ZstdDecompressor(), input, 0, input.length(), buffer, 0);
+        return wrappedBuffer(buffer);
+    }
+
     private static Slice decompressGzip(Slice input, int uncompressedSize)
             throws IOException
     {
@@ -86,6 +99,32 @@ private static Slice decompressGzip(Slice input, int uncompressedSize)
         }
     }

+    private static Slice decompressLz4(Slice input, int uncompressedSize)
+    {
+        Lz4Decompressor decompressor = new Lz4Decompressor();
+        long totalDecompressedCount = 0;
+        // over allocate buffer which makes decompression easier
+        byte[] output = new byte[uncompressedSize + SIZE_OF_LONG];
+        int outputOffset = 0;
+        int inputOffset = 0;
+        int cumulativeUncompressedBlockLength = 0;
+
+        while (totalDecompressedCount < uncompressedSize) {
+            if (totalDecompressedCount == cumulativeUncompressedBlockLength) {
+                cumulativeUncompressedBlockLength += Integer.reverseBytes(input.getInt(inputOffset));
+                inputOffset += SIZE_OF_INT;
+            }
+            int compressedChunkLength = Integer.reverseBytes(input.getInt(inputOffset));
+            inputOffset += SIZE_OF_INT;
+            int decompressionSize = decompress(decompressor, input, inputOffset, compressedChunkLength, output, outputOffset);
+            totalDecompressedCount += decompressionSize;
+            outputOffset += decompressionSize;
+            inputOffset += compressedChunkLength;
+        }
+        checkArgument(outputOffset == uncompressedSize);
+        return wrappedBuffer(output, 0, uncompressedSize);
+    }
+
     private static Slice decompressLZO(Slice input, int uncompressedSize)
     {
         LzoDecompressor lzoDecompressor = new LzoDecompressor();
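
decompressLz4 above parses Hadoop's block-compressed framing rather than a raw LZ4 stream: each block carries a big-endian uncompressed-length prefix, followed by one or more chunks, each carrying a big-endian compressed-length prefix. Slice.getInt reads little-endian, so Integer.reverseBytes recovers the big-endian values. A hypothetical sketch of writing that framing for one block with one chunk (illustration only, not part of the PR) makes the inverse concrete:

import io.airlift.compress.lz4.Lz4Compressor;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class Lz4FramingSketch
{
    private Lz4FramingSketch() {}

    // Frame a single block holding a single compressed chunk. DataOutputStream
    // writes big-endian ints, matching the Integer.reverseBytes reads above.
    static byte[] frameLz4(byte[] uncompressed) throws IOException
    {
        Lz4Compressor compressor = new Lz4Compressor();
        byte[] chunk = new byte[compressor.maxCompressedLength(uncompressed.length)];
        int chunkLength = compressor.compress(uncompressed, 0, uncompressed.length, chunk, 0, chunk.length);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(uncompressed.length); // uncompressed length of the block
        data.writeInt(chunkLength);         // compressed length of the chunk
        data.write(chunk, 0, chunkLength);
        data.flush();
        return out.toByteArray();
    }
}
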