Skip to content

Commit

Permalink
Replace direct usage of RleBlocks in PageAppenders
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Sep 18, 2022
1 parent 0a0e628 commit aa225ae
Show file tree
Hide file tree
Showing 14 changed files with 34 additions and 56 deletions.
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down
Expand Up @@ -101,9 +101,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down
Expand Up @@ -98,9 +98,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down
Expand Up @@ -14,20 +14,18 @@
package io.trino.operator.output;

import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import it.unimi.dsi.fastutil.ints.IntArrayList;

public interface PositionsAppender
{
void append(IntArrayList positions, Block source);

/**
* Appends value from the {@code rleBlock} to this appender {@link RunLengthEncodedBlock#getPositionCount()} times.
* Appends the specified value positionCount times.
* The result is the same as with using {@link PositionsAppender#append(IntArrayList, Block)} with
* positions list [0...{@link RunLengthEncodedBlock#getPositionCount()} -1]
* but with possible performance optimizations for {@link RunLengthEncodedBlock}.
* positions list [0...positionCount -1] but with possible performance optimizations.
*/
void appendRle(RunLengthEncodedBlock rleBlock);
void appendRle(Block value, int rlePositionCount);

/**
* Creates the block from the appender data.
Expand Down
Expand Up @@ -60,31 +60,32 @@ public void append(IntArrayList positions, Block source)
}

@Override
public void appendRle(RunLengthEncodedBlock source)
public void appendRle(Block value, int positionCount)
{
if (source.getPositionCount() == 0) {
if (positionCount == 0) {
return;
}
checkArgument(value.getPositionCount() == 1, "Expected value to contain a single position but has %d positions".formatted(value.getPositionCount()));

if (rlePositionCount == 0) {
// initial empty state, switch to RLE state
rleValue = source.getValue();
rlePositionCount = source.getPositionCount();
rleValue = value;
rlePositionCount = positionCount;
}
else if (rleValue != null) {
// we are in the RLE state
if (equalOperator.equalNullSafe(rleValue, 0, source.getValue(), 0)) {
if (equalOperator.equalNullSafe(rleValue, 0, value, 0)) {
// the values match. we can just add positions.
this.rlePositionCount += source.getPositionCount();
this.rlePositionCount += positionCount;
return;
}
// RLE values do not match. switch to flat state
switchToFlat();
delegate.appendRle(source);
delegate.appendRle(value, positionCount);
}
else {
// flat state
delegate.appendRle(source);
delegate.appendRle(value, positionCount);
}
}

Expand Down Expand Up @@ -127,7 +128,7 @@ private void switchToFlat()
{
if (rleValue != null) {
// we are in the RLE state, flatten all RLE blocks
delegate.appendRle(new RunLengthEncodedBlock(rleValue, rlePositionCount));
delegate.appendRle(rleValue, rlePositionCount);
rleValue = null;
}
rlePositionCount = NO_RLE;
Expand Down
Expand Up @@ -96,11 +96,10 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock rleBlock)
public void appendRle(Block value, int rlePositionCount)
{
int rlePositionCount = rleBlock.getPositionCount();
ensureCapacity(rlePositionCount);
AbstractRowBlock sourceRowBlock = (AbstractRowBlock) rleBlock.getValue();
AbstractRowBlock sourceRowBlock = (AbstractRowBlock) value;
if (sourceRowBlock.isNull(0)) {
// append rlePositionCount nulls
Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
Expand All @@ -111,7 +110,7 @@ public void appendRle(RunLengthEncodedBlock rleBlock)
List<Block> fieldBlocks = sourceRowBlock.getChildren();
int fieldPosition = sourceRowBlock.getFieldBlockOffset(0);
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].appendRle(new RunLengthEncodedBlock(fieldBlocks.get(i).getSingleValueBlock(fieldPosition), rlePositionCount));
fieldAppenders[i].appendRle(fieldBlocks.get(i).getSingleValueBlock(fieldPosition), rlePositionCount);
}
hasNonNullRow = true;
}
Expand Down
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down
Expand Up @@ -121,9 +121,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down
Expand Up @@ -15,7 +15,6 @@

import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.Type;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;
Expand Down Expand Up @@ -53,9 +52,9 @@ public void append(IntArrayList positions, Block source)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
for (int i = 0; i < block.getPositionCount(); i++) {
for (int i = 0; i < rlePositionCount; i++) {
type.appendTo(block, 0, blockBuilder);
}
}
Expand Down
Expand Up @@ -19,14 +19,10 @@
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
* Dispatches the {@link #append} and {@link #appendRle} methods to the {@link #delegate} depending on the input {@link Block} class.
* The {@link Block} is flattened if necessary so that the {@link #delegate} {@link PositionsAppender#append(IntArrayList, Block)}
* always gets flat {@link Block} and {@link PositionsAppender#appendRle(RunLengthEncodedBlock)} always gets {@link RunLengthEncodedBlock}
* with {@link RunLengthEncodedBlock#getValue()} being flat {@link Block}.
*/
public class UnnestingPositionsAppender
implements PositionsAppender
Expand All @@ -47,7 +43,7 @@ public void append(IntArrayList positions, Block source)
return;
}
if (source instanceof RunLengthEncodedBlock) {
delegate.appendRle(flatten((RunLengthEncodedBlock) source, positions.size()));
delegate.appendRle(((RunLengthEncodedBlock) source).getValue(), positions.size());
}
else if (source instanceof DictionaryBlock) {
appendDictionary(positions, (DictionaryBlock) source);
Expand All @@ -58,12 +54,12 @@ else if (source instanceof DictionaryBlock) {
}

@Override
public void appendRle(RunLengthEncodedBlock source)
public void appendRle(Block block, int rlePositionCount)
{
if (source.getPositionCount() == 0) {
if (rlePositionCount == 0) {
return;
}
delegate.appendRle(flatten(source, source.getPositionCount()));
delegate.appendRle(block, rlePositionCount);
}

@Override
Expand All @@ -89,14 +85,6 @@ private void appendDictionary(IntArrayList positions, DictionaryBlock source)
delegate.append(mapPositions(positions, source), source.getDictionary());
}

private RunLengthEncodedBlock flatten(RunLengthEncodedBlock source, int positionCount)
{
checkArgument(positionCount > 0);
Block value = source.getValue().getSingleValueBlock(0);
checkArgument(!(value instanceof DictionaryBlock) && !(value instanceof RunLengthEncodedBlock), "value must be flat but got %s", value);
return new RunLengthEncodedBlock(value, positionCount);
}

private IntArrayList mapPositions(IntArrayList positions, DictionaryBlock block)
{
int[] positionArray = new int[positions.size()];
Expand Down
Expand Up @@ -244,11 +244,11 @@ public void testSliceRle()
PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(VARCHAR, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);

// first append some not empty value to avoid RleAwarePositionsAppender for the empty value
positionsAppender.appendRle(new RunLengthEncodedBlock(singleValueBlock("some value"), 1));
positionsAppender.appendRle(singleValueBlock("some value"), 1);
// append empty value multiple times to trigger jit compilation
Block emptyStringBlock = singleValueBlock("");
for (int i = 0; i < 1000; i++) {
positionsAppender.appendRle(new RunLengthEncodedBlock(emptyStringBlock, 2000));
positionsAppender.appendRle(emptyStringBlock, 2000);
}
}

Expand Down
Expand Up @@ -38,12 +38,12 @@ public void testAppendEmptySliceRle()
{
// test SlicePositionAppender.appendRle with empty value (Slice with length 0)
PositionsAppender positionsAppender = new SlicePositionsAppender(1, 100);
RunLengthEncodedBlock rleBlock = new RunLengthEncodedBlock(createStringsBlock(""), 10);
positionsAppender.appendRle(rleBlock);
Block value = createStringsBlock("");
positionsAppender.appendRle(value, 10);

Block actualBlock = positionsAppender.build();

assertBlockEquals(VARCHAR, actualBlock, rleBlock);
assertBlockEquals(VARCHAR, actualBlock, new RunLengthEncodedBlock(value, 10));
}

// test append with VariableWidthBlock using Slice not backed by byte array
Expand Down

0 comments on commit aa225ae

Please sign in to comment.