Skip to content

Commit

Permalink
Do not allow RLE value to be a dictionary or RLE block
Browse files Browse the repository at this point in the history
Bypass RLE when 0 or 1 positions are used.
  • Loading branch information
dain committed Sep 18, 2022
1 parent aa225ae commit 52eb874
Show file tree
Hide file tree
Showing 79 changed files with 401 additions and 351 deletions.
Expand Up @@ -656,7 +656,7 @@ public GroupByIdBlock getResult()

return new GroupByIdBlock(
nextGroupId,
new RunLengthEncodedBlock(
RunLengthEncodedBlock.create(
BIGINT.createFixedSizeBlockBuilder(1).writeLong(groupId).build(),
block.getPositionCount()));
}
Expand Down
Expand Up @@ -79,7 +79,7 @@ public Page transformPage(Page inputPage)
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
builder.add(operationChannelBlock);
builder.add(inputPage.getBlock(rowIdChannel));
builder.add(new RunLengthEncodedBlock(INSERT_FROM_UPDATE_BLOCK, positionCount));
builder.add(RunLengthEncodedBlock.create(INSERT_FROM_UPDATE_BLOCK, positionCount));

Page result = new Page(builder.toArray(Block[]::new));

Expand Down
Expand Up @@ -178,14 +178,14 @@ private Page generateNextPage()

for (int i = 0; i < groupingSetInputs[currentGroupingSet].length; i++) {
if (groupingSetInputs[currentGroupingSet][i] == -1) {
outputBlocks[i] = new RunLengthEncodedBlock(nullBlocks[i], currentPage.getPositionCount());
outputBlocks[i] = RunLengthEncodedBlock.create(nullBlocks[i], currentPage.getPositionCount());
}
else {
outputBlocks[i] = currentPage.getBlock(groupingSetInputs[currentGroupingSet][i]);
}
}

outputBlocks[outputBlocks.length - 1] = new RunLengthEncodedBlock(groupIdBlocks[currentGroupingSet], currentPage.getPositionCount());
outputBlocks[outputBlocks.length - 1] = RunLengthEncodedBlock.create(groupIdBlocks[currentGroupingSet], currentPage.getPositionCount());
currentGroupingSet = (currentGroupingSet + 1) % groupingSetInputs.length;
Page outputPage = new Page(currentPage.getPositionCount(), outputBlocks);

Expand Down
Expand Up @@ -67,12 +67,12 @@ private Block processNextGroupIds(GroupByIdBlock ids)
// must have > 1 positions to benefit from using a RunLengthEncoded block
if (nextDistinctId == ids.getGroupCount()) {
// no new distinct positions
return new RunLengthEncodedBlock(BooleanType.createBlockForSingleNonNullValue(false), positions);
return RunLengthEncodedBlock.create(BooleanType.createBlockForSingleNonNullValue(false), positions);
}
if (nextDistinctId + positions == ids.getGroupCount()) {
// all positions are distinct
nextDistinctId = ids.getGroupCount();
return new RunLengthEncodedBlock(BooleanType.createBlockForSingleNonNullValue(true), positions);
return RunLengthEncodedBlock.create(BooleanType.createBlockForSingleNonNullValue(true), positions);
}
}
byte[] distinctMask = new byte[positions];
Expand Down
Expand Up @@ -972,7 +972,7 @@ public GroupByIdBlock getResult()

return new GroupByIdBlock(
nextGroupId,
new RunLengthEncodedBlock(
RunLengthEncodedBlock.create(
BIGINT.createFixedSizeBlockBuilder(1).writeLong(groupId).build(),
page.getPositionCount()));
}
Expand Down
Expand Up @@ -392,7 +392,7 @@ public Page next()
// For the page with less rows, create RLE blocks and add them to the blocks array
for (int i = 0; i < smallPageOutputBlocks.length; i++) {
Block block = smallPageOutputBlocks[i].getSingleValueBlock(rowIndex);
resultBlockBuffer[indexForRleBlocks + i] = new RunLengthEncodedBlock(block, largePagePositionCount);
resultBlockBuffer[indexForRleBlocks + i] = RunLengthEncodedBlock.create(block, largePagePositionCount);
}
// Page constructor will create a copy of the block buffer (and must for correctness)
return new Page(largePagePositionCount, resultBlockBuffer);
Expand Down
Expand Up @@ -123,7 +123,7 @@ public Block build()
result = new ByteArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Expand Up @@ -136,7 +136,7 @@ public Block build()
result = new Int128ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Expand Up @@ -131,7 +131,7 @@ public Block build()
result = new Int96ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), high, low);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Expand Up @@ -123,7 +123,7 @@ public Block build()
result = new IntArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Expand Up @@ -123,7 +123,7 @@ public Block build()
result = new LongArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Expand Up @@ -459,7 +459,7 @@ private Page getPartitionFunctionArguments(Page page)
for (int i = 0; i < blocks.length; i++) {
int channel = partitionChannels[i];
if (channel < 0) {
blocks[i] = new RunLengthEncodedBlock(partitionConstantBlocks[i], page.getPositionCount());
blocks[i] = RunLengthEncodedBlock.create(partitionConstantBlocks[i], page.getPositionCount());
}
else {
blocks[i] = page.getBlock(channel);
Expand Down
Expand Up @@ -94,7 +94,7 @@ public Block build()
{
Block result;
if (rleValue != null) {
result = new RunLengthEncodedBlock(rleValue, rlePositionCount);
result = RunLengthEncodedBlock.create(rleValue, rlePositionCount);
}
else {
result = delegate.build();
Expand Down
Expand Up @@ -131,7 +131,7 @@ public Block build()
}
else {
Block nullRowBlock = fromFieldBlocks(1, Optional.of(new boolean[] {true}), fieldBlocks);
result = new RunLengthEncodedBlock(nullRowBlock, positionCount);
result = RunLengthEncodedBlock.create(nullRowBlock, positionCount);
}

reset();
Expand Down
Expand Up @@ -123,7 +123,7 @@ public Block build()
result = new ShortArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Expand Up @@ -153,7 +153,7 @@ public Block build()
hasNullValue ? Optional.of(valueIsNull) : Optional.empty());
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Expand Up @@ -63,6 +63,6 @@ public InputChannels getInputChannels()
@Override
public Work<Block> project(ConnectorSession session, DriverYieldSignal yieldSignal, Page page, SelectedPositions selectedPositions)
{
return new CompletedWork<>(new RunLengthEncodedBlock(value, selectedPositions.size()));
return new CompletedWork<>(RunLengthEncodedBlock.create(value, selectedPositions.size()));
}
}
Expand Up @@ -162,7 +162,7 @@ private boolean processInternal()
if (block instanceof RunLengthEncodedBlock) {
// single value block is always considered effective, but the processing could have thrown
// in that case we fallback and process again so the correct error message sent
result = new RunLengthEncodedBlock(dictionaryOutput.get(), selectedPositions.size());
result = RunLengthEncodedBlock.create(dictionaryOutput.get(), selectedPositions.size());
return true;
}

Expand Down
Expand Up @@ -244,7 +244,7 @@ public Block getRawBlock(int channel, int position)
// projection always creates a single row block, and will not align with the blocks from the pages index,
// so we use an RLE block of the same length as the raw block
int rawBlockPositionCount = pagesIndex.getRawBlock(0, position(position)).getPositionCount();
return new RunLengthEncodedBlock(compute, rawBlockPositionCount);
return RunLengthEncodedBlock.create(compute, rawBlockPositionCount);
}

@Override
Expand Down
Expand Up @@ -134,8 +134,8 @@ public static Block createRandomDictionaryBlock(Block dictionary, int positionCo

public static RunLengthEncodedBlock createRandomRleBlock(Block block, int positionCount)
{
checkArgument(block.getPositionCount() > 0, format("block positions %d is less than or equal to 0", block.getPositionCount()));
return new RunLengthEncodedBlock(block.getSingleValueBlock(random().nextInt(block.getPositionCount())), positionCount);
checkArgument(block.getPositionCount() >= 2, format("block positions %d is less 2", block.getPositionCount()));
return (RunLengthEncodedBlock) RunLengthEncodedBlock.create(block.getSingleValueBlock(random().nextInt(block.getPositionCount())), positionCount);
}

public static Block createRandomBlockForType(Type type, int positionCount, float nullRate)
Expand Down Expand Up @@ -893,18 +893,18 @@ public static Block createColorSequenceBlock(int start, int end)
return builder.build();
}

public static RunLengthEncodedBlock createRLEBlock(double value, int positionCount)
public static Block createRepeatedValuesBlock(double value, int positionCount)
{
BlockBuilder blockBuilder = DOUBLE.createBlockBuilder(null, 1);
DOUBLE.writeDouble(blockBuilder, value);
return new RunLengthEncodedBlock(blockBuilder.build(), positionCount);
return RunLengthEncodedBlock.create(blockBuilder.build(), positionCount);
}

public static RunLengthEncodedBlock createRLEBlock(long value, int positionCount)
public static Block createRepeatedValuesBlock(long value, int positionCount)
{
BlockBuilder blockBuilder = BIGINT.createBlockBuilder(null, 1);
BIGINT.writeLong(blockBuilder, value);
return new RunLengthEncodedBlock(blockBuilder.build(), positionCount);
return RunLengthEncodedBlock.create(blockBuilder.build(), positionCount);
}

private static <T> Block createBlock(Type type, ValueWriter<T> valueWriter, Iterable<T> values)
Expand Down
Expand Up @@ -157,6 +157,6 @@ public static <T> T[] createTestRleExpectedValues(T[] expectedValues, int positi

public static RunLengthEncodedBlock createTestRleBlock(Block block, int position)
{
return new RunLengthEncodedBlock(block.getRegion(position, 1), 10);
return (RunLengthEncodedBlock) RunLengthEncodedBlock.create(block.getRegion(position, 1), 10);
}
}
Expand Up @@ -42,7 +42,7 @@ public void test()
private void assertRleBlock(int positionCount)
{
Slice expectedValue = createExpectedValue(0);
Block block = new RunLengthEncodedBlock(createSingleValueBlock(expectedValue), positionCount);
Block block = RunLengthEncodedBlock.create(createSingleValueBlock(expectedValue), positionCount);
Slice[] expectedValues = new Slice[positionCount];
for (int position = 0; position < positionCount; position++) {
expectedValues[position] = expectedValue;
Expand All @@ -66,7 +66,7 @@ private static BlockBuilder createBlockBuilder()
public void testPositionsSizeInBytes()
{
Block valueBlock = createSingleValueBlock(createExpectedValue(10));
Block rleBlock = new RunLengthEncodedBlock(valueBlock, 10);
Block rleBlock = RunLengthEncodedBlock.create(valueBlock, 10);
// Size in bytes is not fixed per position
assertTrue(rleBlock.fixedSizeInBytesPerPosition().isEmpty());
// Accepts specific position selection
Expand Down Expand Up @@ -119,7 +119,7 @@ public void testEstimatedDataSizeForStats()
{
int positionCount = 10;
Slice expectedValue = createExpectedValue(5);
Block block = new RunLengthEncodedBlock(createSingleValueBlock(expectedValue), positionCount);
Block block = RunLengthEncodedBlock.create(createSingleValueBlock(expectedValue), positionCount);
for (int postition = 0; postition < positionCount; postition++) {
assertEquals(block.getEstimatedDataSizeForStats(postition), expectedValue.length());
}
Expand Down
Expand Up @@ -83,7 +83,7 @@ public void testSplitPageNonDecreasingPageSize()
Slice expectedValue = wrappedBuffer("test".getBytes(UTF_8));
BlockBuilder blockBuilder = VARCHAR.createBlockBuilder(null, 1, expectedValue.length());
blockBuilder.writeBytes(expectedValue, 0, expectedValue.length()).closeEntry();
Block rleBlock = new RunLengthEncodedBlock(blockBuilder.build(), positionCount);
Block rleBlock = RunLengthEncodedBlock.create(blockBuilder.build(), positionCount);
Page initialPage = new Page(rleBlock);
List<Page> pages = splitPage(initialPage, maxPageSizeInBytes);

Expand Down
Expand Up @@ -68,7 +68,7 @@ public void testBigintSerializedSize()
// empty page
Page page = new Page(builder.build());
int pageSize = serializedSize(ImmutableList.of(BIGINT), page);
assertEquals(pageSize, 52); // page overhead ideally 35 but since a 0 sized block will be a RLEBlock we have an overhead of 17
assertEquals(pageSize, 40);

// page with one value
BIGINT.writeLong(builder, 123);
Expand All @@ -92,7 +92,7 @@ public void testVarcharSerializedSize()
// empty page
Page page = new Page(builder.build());
int pageSize = serializedSize(ImmutableList.of(VARCHAR), page);
assertEquals(pageSize, 60); // page overhead ideally 44 but since a 0 sized block will be a RLEBlock we have an overhead of 16
assertEquals(pageSize, 44);

// page with one value
VARCHAR.writeString(builder, "alice");
Expand Down
Expand Up @@ -255,7 +255,7 @@ else if (pageCount % 3 == 1) {
// rle page
Block[] blocks = new Block[page.getChannelCount()];
for (int channel = 0; channel < blocks.length; ++channel) {
blocks[channel] = new RunLengthEncodedBlock(page.getBlock(channel).getSingleValueBlock(0), page.getPositionCount());
blocks[channel] = RunLengthEncodedBlock.create(page.getBlock(channel).getSingleValueBlock(0), page.getPositionCount());
}
pages.add(new Page(blocks));
}
Expand Down
Expand Up @@ -132,7 +132,7 @@ public void testDistinctMaskWithNulls()
Optional.of(new boolean[] {true, true, true, true}), /* all positions are null */
new byte[] {1, 1, 1, 1}); /* non-zero value is true, all masks are true */

Block trueNullRleMask = new RunLengthEncodedBlock(trueMaskAllNull.getSingleValueBlock(0), 4);
Block trueNullRleMask = RunLengthEncodedBlock.create(trueMaskAllNull.getSingleValueBlock(0), 4);

List<Page> nullTrueMaskInput = ImmutableList.of(
new Page(4, createLongsBlock(1, 2, 3, 4), trueMaskAllNull),
Expand Down
Expand Up @@ -144,8 +144,8 @@ public void testRunLengthEncodedInputPage(GroupByHashType groupByHashType)
Block block = BlockAssertions.createLongsBlock(0L);
Block hashBlock = TypeTestUtils.getHashBlock(ImmutableList.of(BIGINT), block);
Page page = new Page(
new RunLengthEncodedBlock(block, 2),
new RunLengthEncodedBlock(hashBlock, 2));
RunLengthEncodedBlock.create(block, 2),
RunLengthEncodedBlock.create(hashBlock, 2));

groupByHash.addPage(page).process();

Expand Down Expand Up @@ -658,10 +658,10 @@ public void testProperWorkTypesSelected()
{
Block bigintBlock = BlockAssertions.createLongsBlock(1, 2, 3, 4, 5, 6, 7, 8);
Block bigintDictionaryBlock = BlockAssertions.createLongDictionaryBlock(0, 8);
Block bigintRleBlock = BlockAssertions.createRLEBlock(42, 8);
Block bigintRleBlock = BlockAssertions.createRepeatedValuesBlock(42, 8);
Block varcharBlock = BlockAssertions.createStringsBlock("1", "2", "3", "4", "5", "6", "7", "8");
Block varcharDictionaryBlock = BlockAssertions.createStringDictionaryBlock(1, 8);
Block varcharRleBlock = new RunLengthEncodedBlock(new VariableWidthBlock(1, Slices.EMPTY_SLICE, new int[] {0, 1}, Optional.empty()), 8);
Block varcharRleBlock = RunLengthEncodedBlock.create(new VariableWidthBlock(1, Slices.EMPTY_SLICE, new int[] {0, 1}, Optional.empty()), 8);
Block bigintBigDictionaryBlock = BlockAssertions.createLongDictionaryBlock(1, 8, 1000);
Block bigintSingletonDictionaryBlock = BlockAssertions.createLongDictionaryBlock(1, 500000, 1);
Block bigintHugeDictionaryBlock = BlockAssertions.createLongDictionaryBlock(1, 500000, 66000); // Above Short.MAX_VALUE
Expand Down

0 comments on commit 52eb874

Please sign in to comment.