Skip to content

Commit

Permalink
Add LZ4 Compression Codec (apache#6804) (apache#7035)
Browse files Browse the repository at this point in the history
  • Loading branch information
GSharayu authored and wuwenw committed Jun 18, 2021
1 parent fa21ba5 commit f50a817
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 83 deletions.
4 changes: 4 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,17 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
private static final String SNAPPY_STRING = "SNAPPY_STRING";
private static final String PASS_THROUGH_STRING = "PASS_THROUGH_STRING";
private static final String ZSTANDARD_STRING = "ZSTANDARD_STRING";
private static final String LZ4_STRING = "LZ4_STRING";

private static final String SNAPPY_LONG = "SNAPPY_LONG";
private static final String PASS_THROUGH_LONG = "PASS_THROUGH_LONG";
private static final String ZSTANDARD_LONG = "ZSTANDARD_LONG";
private static final String LZ4_LONG = "LZ4_LONG";

private static final String SNAPPY_INTEGER = "SNAPPY_INTEGER";
private static final String PASS_THROUGH_INTEGER = "PASS_THROUGH_INTEGER";
private static final String ZSTANDARD_INTEGER = "ZSTANDARD_INTEGER";
private static final String LZ4_INTEGER = "LZ4_INTEGER";

private static final List<String> RAW_SNAPPY_INDEX_COLUMNS = Arrays
.asList(SNAPPY_STRING, SNAPPY_LONG, SNAPPY_INTEGER);
Expand All @@ -85,6 +88,9 @@ public class NoDictionaryCompressionQueriesTest extends BaseQueriesTest {
private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS = Arrays
.asList(PASS_THROUGH_STRING, PASS_THROUGH_LONG, PASS_THROUGH_INTEGER);

private static final List<String> RAW_LZ4_INDEX_COLUMNS = Arrays
.asList(LZ4_STRING, LZ4_LONG, LZ4_INTEGER);

private final List<GenericRow> _rows = new ArrayList<>();

private IndexSegment _indexSegment;
Expand Down Expand Up @@ -118,6 +124,7 @@ public void setUp()
indexColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
indexColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
indexColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
indexColumns.addAll(RAW_LZ4_INDEX_COLUMNS);

indexLoadingConfig.getNoDictionaryColumns().addAll(indexColumns);
ImmutableSegment immutableSegment =
Expand All @@ -136,7 +143,9 @@ private void buildSegment()
throws Exception {
rows = createTestData();

List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size());
List<FieldConfig> fieldConfigs = new ArrayList<>(RAW_SNAPPY_INDEX_COLUMNS.size()
+ RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size() + RAW_LZ4_INDEX_COLUMNS.size());

for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
fieldConfigs
.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.SNAPPY, null));
Expand All @@ -152,10 +161,16 @@ private void buildSegment()
.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.PASS_THROUGH, null));
}

for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
fieldConfigs
.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, null, FieldConfig.CompressionCodec.LZ4, null));
}

List<String> _noDictionaryColumns = new ArrayList<>();
_noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
_noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);

TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setNoDictionaryColumns(_noDictionaryColumns)
Expand All @@ -164,12 +179,15 @@ private void buildSegment()
.addSingleValueDimension(SNAPPY_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(ZSTANDARD_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(LZ4_STRING, FieldSpec.DataType.STRING)
.addSingleValueDimension(SNAPPY_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(LZ4_INTEGER, FieldSpec.DataType.INT)
.addSingleValueDimension(SNAPPY_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(ZSTANDARD_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
.addSingleValueDimension(LZ4_LONG, FieldSpec.DataType.LONG)
.build();
SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
config.setOutDir(INDEX_DIR.getPath());
Expand Down Expand Up @@ -213,12 +231,15 @@ private List<GenericRow> createTestData()
row.putValue(SNAPPY_STRING, tempStringRows[i]);
row.putValue(ZSTANDARD_STRING, tempStringRows[i]);
row.putValue(PASS_THROUGH_STRING, tempStringRows[i]);
row.putValue(LZ4_STRING, tempStringRows[i]);
row.putValue(SNAPPY_INTEGER, tempIntRows[i]);
row.putValue(ZSTANDARD_INTEGER, tempIntRows[i]);
row.putValue(PASS_THROUGH_INTEGER, tempIntRows[i]);
row.putValue(LZ4_INTEGER, tempIntRows[i]);
row.putValue(SNAPPY_LONG, tempLongRows[i]);
row.putValue(ZSTANDARD_LONG, tempLongRows[i]);
row.putValue(PASS_THROUGH_LONG, tempLongRows[i]);
row.putValue(LZ4_LONG, tempLongRows[i]);
rows.add(row);
}
return rows;
Expand All @@ -232,15 +253,15 @@ public void testQueriesWithCompressionCodec()
throws Exception {

String query =
"SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, "
+ "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG FROM MyTable LIMIT 1000";
"SELECT SNAPPY_STRING, ZSTANDARD_STRING, PASS_THROUGH_STRING, LZ4_STRING, SNAPPY_INTEGER, ZSTANDARD_INTEGER, PASS_THROUGH_INTEGER, LZ4_INTEGER, "
+ "SNAPPY_LONG, ZSTANDARD_LONG, PASS_THROUGH_LONG, LZ4_LONG FROM MyTable LIMIT 1000";
ArrayList<Serializable[]> expected = new ArrayList<>();

for(GenericRow row: rows) {
expected.add(new Serializable[]{
String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(row.getValue(PASS_THROUGH_STRING)),
(Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER),
(Long) row.getValue(SNAPPY_LONG), (Long)row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG),
String.valueOf(row.getValue(SNAPPY_STRING)), String.valueOf(row.getValue(ZSTANDARD_STRING)), String.valueOf(row.getValue(PASS_THROUGH_STRING)), String.valueOf(row.getValue(LZ4_STRING)),
(Integer) row.getValue(SNAPPY_INTEGER), (Integer) row.getValue(ZSTANDARD_INTEGER), (Integer) row.getValue(PASS_THROUGH_INTEGER), (Integer) row.getValue(LZ4_INTEGER),
(Long) row.getValue(SNAPPY_LONG), (Long)row.getValue(ZSTANDARD_LONG), (Long) row.getValue(PASS_THROUGH_LONG), (Long) row.getValue(LZ4_LONG)
});
}
testSelectQueryHelper(query, expected.size(), expected);
Expand All @@ -267,6 +288,27 @@ public void testZstandardIntegerFilterQueriesWithCompressionCodec()
testSelectQueryHelper(query, expected.size(), expected);
}

/**
* Tests for filter over integer values LZ4 compression codec queries.
*/
@Test
public void testLZ4IntegerFilterQueriesWithCompressionCodec()
throws Exception {

String query =
"SELECT LZ4_INTEGER FROM MyTable "
+ "WHERE LZ4_INTEGER > 1000 LIMIT 1000";
ArrayList<Serializable[]> expected = new ArrayList<>();

for(GenericRow row: rows) {
int value = (Integer) row.getValue(LZ4_INTEGER);
if(value > 1000) {
expected.add(new Serializable[]{value});
}
}
testSelectQueryHelper(query, expected.size(), expected);
}

/**
* Tests for filter over integer values compression codec queries.
*/
Expand Down Expand Up @@ -328,6 +370,25 @@ public void testZstandardStringFilterQueriesWithCompressionCodec()
testSelectQueryHelper(query, expected.size(), expected);
}

/**
* Tests for filter over string values LZ4 compression codec queries.
*/
@Test
public void testLZ4StringFilterQueriesWithCompressionCodec()
throws Exception {
String query =
"SELECT LZ4_STRING FROM MyTable WHERE LZ4_STRING = 'hello_world_123' LIMIT 1000";
ArrayList<Serializable[]> expected = new ArrayList<>();

for(GenericRow row: rows) {
String value = String.valueOf(row.getValue(LZ4_STRING));
if(value.equals("hello_world_123")) {
expected.add(new Serializable[]{value});
}
}
testSelectQueryHelper(query, expected.size(), expected);
}

/**
* Tests for filter over string values snappy compression codec queries.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pinot.segment.local.io.compression.SnappyCompressor;
import org.apache.pinot.segment.local.io.compression.SnappyDecompressor;
import org.apache.pinot.segment.local.io.compression.ZstandardCompressor;
import org.apache.pinot.segment.local.io.compression.ZstandardDecompressor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -41,14 +38,15 @@
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.xerial.snappy.Snappy;


@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
// Test to get memory statistics for snappy and zstandard integer compression techniques
// Test to get memory statistics for snappy, zstandard and lz4 integer compression techniques
public class BenchmarkNoDictionaryIntegerCompression {

@Param({"500000", "1000000", "2000000", "3000000", "4000000", "5000000"})
Expand All @@ -58,16 +56,18 @@ public class BenchmarkNoDictionaryIntegerCompression {
public static class BenchmarkNoDictionaryIntegerCompressionState {

private static ByteBuffer _uncompressedInt;
private static ByteBuffer _snappyIntegerIntegerInput;
private static ByteBuffer _snappyCompressedIntegerInput;
private static ByteBuffer _zstandardCompressedIntegerInput;
private static ByteBuffer _snappyCompressedIntegerOutput;
private static ByteBuffer _zstdCompressedIntegerOutput;
private static ByteBuffer _snappyIntegerDecompressed;
private static ByteBuffer _zstdIntegerDecompressed;
private static SnappyCompressor snappyCompressor;
private static SnappyDecompressor snappyDecompressor;
private static ZstandardCompressor zstandardCompressor;
private static ZstandardDecompressor zstandardDecompressor;

private static ByteBuffer _lz4CompressedIntegerOutput;
private static ByteBuffer _lz4CompressedIntegerInput;
private static ByteBuffer _lz4IntegerDecompressed;

private static LZ4Factory factory;

@Setup(Level.Invocation)
public void setUp()
Expand All @@ -77,10 +77,13 @@ public void setUp()
generateRandomIntegerBuffer();
allocateBufferMemory();

snappyCompressor.compress(_uncompressedInt,_snappyIntegerIntegerInput);
Snappy.compress(_uncompressedInt, _snappyCompressedIntegerInput);
Zstd.compress(_zstandardCompressedIntegerInput, _uncompressedInt);
// ZSTD compressor with change the position of _uncompressedInt, a flip() operation over input to reset position for lz4 is required
_uncompressedInt.flip();
factory.fastCompressor().compress(_uncompressedInt, _lz4CompressedIntegerInput);

_zstdIntegerDecompressed.flip();_zstandardCompressedIntegerInput.flip();_uncompressedInt.flip();_snappyIntegerDecompressed.flip();
_zstdIntegerDecompressed.rewind();_zstandardCompressedIntegerInput.flip();_uncompressedInt.flip();_snappyIntegerDecompressed.rewind();_lz4CompressedIntegerInput.flip();
}

private void generateRandomIntegerBuffer() {
Expand All @@ -90,26 +93,24 @@ private void generateRandomIntegerBuffer() {
_uncompressedInt.putInt(RandomUtils.nextInt());
}
_uncompressedInt.flip();

_snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
}

private void initializeCompressors() {
//Initialize compressors and decompressors for snappy
snappyCompressor = new SnappyCompressor();
snappyDecompressor = new SnappyDecompressor();

//Initialize compressors and decompressors for zstandard
zstandardCompressor = new ZstandardCompressor();
zstandardDecompressor = new ZstandardDecompressor();
//Initialize compressors and decompressors for lz4
factory = LZ4Factory.fastestInstance();
}

private void allocateBufferMemory() {
_snappyIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_zstdIntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_snappyIntegerIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_snappyCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_zstandardCompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_lz4IntegerDecompressed = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_lz4CompressedIntegerInput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_lz4CompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_snappyCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
_zstdCompressedIntegerOutput = ByteBuffer.allocateDirect(_uncompressedInt.capacity()*2);
}

@TearDown(Level.Invocation)
Expand All @@ -119,9 +120,12 @@ public void tearDown()
_snappyIntegerDecompressed.clear();
_zstdCompressedIntegerOutput.clear();
_zstdIntegerDecompressed.clear();
_lz4CompressedIntegerOutput.clear();
_lz4IntegerDecompressed.clear();

_uncompressedInt.rewind();
_zstandardCompressedIntegerInput.rewind();
_lz4CompressedIntegerInput.rewind();
}
}

Expand All @@ -130,7 +134,7 @@ public void tearDown()
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
int size = state.snappyCompressor.compress(state._uncompressedInt, state._snappyCompressedIntegerOutput);
int size = Snappy.compress(state._uncompressedInt, state._snappyCompressedIntegerOutput);
return size;
}

Expand All @@ -139,7 +143,7 @@ public int benchmarkSnappyIntegerCompression(BenchmarkNoDictionaryIntegerCompres
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkSnappyIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
int size = state.snappyDecompressor.decompress(state._snappyIntegerIntegerInput, state._snappyIntegerDecompressed);
int size = Snappy.uncompress(state._snappyCompressedIntegerInput, state._snappyIntegerDecompressed);
return size;
}

Expand All @@ -148,7 +152,7 @@ public int benchmarkSnappyIntegerDecompression(BenchmarkNoDictionaryIntegerCompr
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkZstandardIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
int size = state.zstandardCompressor.compress(state._zstdCompressedIntegerOutput, state._uncompressedInt);
int size = Zstd.compress(state._zstdCompressedIntegerOutput, state._uncompressedInt);
return size;
}

Expand All @@ -157,10 +161,46 @@ public int benchmarkZstandardIntegerCompression(BenchmarkNoDictionaryIntegerComp
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkZstandardIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
int size = state.zstandardDecompressor.decompress(state._zstdIntegerDecompressed, state._zstandardCompressedIntegerInput);
int size = Zstd.decompress(state._zstdIntegerDecompressed, state._zstandardCompressedIntegerInput);
return size;
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkLZ4IntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
state.factory.fastCompressor().compress(state._uncompressedInt, state._lz4CompressedIntegerOutput);
return state._lz4CompressedIntegerOutput.position();
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkLZ4IntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
state.factory.safeDecompressor().decompress(state._lz4CompressedIntegerInput, state._lz4IntegerDecompressed);
return state._lz4IntegerDecompressed.position();
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkLZ4HCIntegerCompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
state.factory.highCompressor().compress(state._uncompressedInt, state._lz4CompressedIntegerOutput);
return state._lz4CompressedIntegerOutput.position();
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public int benchmarkLZ4HCIntegerDecompression(BenchmarkNoDictionaryIntegerCompressionState state)
throws IOException {
state.factory.safeDecompressor().decompress(state._lz4CompressedIntegerInput, state._lz4IntegerDecompressed);
return state._lz4IntegerDecompressed.position();
}

public static void main(String[] args)
throws Exception {
new Runner(new OptionsBuilder().include(BenchmarkNoDictionaryIntegerCompression.class.getSimpleName()).build()).run();
Expand Down

0 comments on commit f50a817

Please sign in to comment.