Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix partitioned output page flushing #19762

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,59 @@
*/
package io.trino.operator.output;

import com.google.common.annotations.VisibleForTesting;
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.type.Type;
import it.unimi.dsi.fastutil.ints.IntArrayList;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class PositionsAppenderPageBuilder
{
private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
@VisibleForTesting
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used in testing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests do assert on this value, since the test logic requires knowing what the value is in order to trigger the flushing based on position count without reaching the size limit.

static final int MAX_POSITION_COUNT = PageProcessor.MAX_BATCH_SIZE * 4;
// Maximum page size before being considered full based on current direct appender size and if RLE channels were converted to direct. Currently,
// dictionary mode appenders still under-report because computing their equivalent size if converted to direct is prohibitively expensive.
private static final int MAXIMUM_DIRECT_SIZE_MULTIPLIER = 8;

private final UnnestingPositionsAppender[] channelAppenders;
private final int maxPageSizeInBytes;
private final int maxDirectPageSizeInBytes;
private int declaredPositions;

public static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
{
return new PositionsAppenderPageBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxPageBytes, sourceTypes, positionsAppenderFactory);
return withMaxPageSize(maxPageBytes, maxPageBytes * MAXIMUM_DIRECT_SIZE_MULTIPLIER, sourceTypes, positionsAppenderFactory);
}

@VisibleForTesting
static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, int maxDirectSizeInBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
{
return new PositionsAppenderPageBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxPageBytes, maxDirectSizeInBytes, sourceTypes, positionsAppenderFactory);
}

private PositionsAppenderPageBuilder(
int initialExpectedEntries,
int maxPageSizeInBytes,
int maxDirectPageSizeInBytes,
List<? extends Type> types,
PositionsAppenderFactory positionsAppenderFactory)
{
requireNonNull(types, "types is null");
requireNonNull(positionsAppenderFactory, "positionsAppenderFactory is null");
checkArgument(maxPageSizeInBytes > 0, "maxPageSizeInBytes is negative: %s", maxPageSizeInBytes);
checkArgument(maxDirectPageSizeInBytes > 0, "maxDirectPageSizeInBytes is negative: %s", maxDirectPageSizeInBytes);
checkArgument(maxDirectPageSizeInBytes >= maxPageSizeInBytes, "maxDirectPageSizeInBytes (%s) must be >= maxPageSizeInBytes (%s)", maxDirectPageSizeInBytes, maxPageSizeInBytes);

this.maxPageSizeInBytes = maxPageSizeInBytes;
this.maxDirectPageSizeInBytes = maxDirectPageSizeInBytes;
channelAppenders = new UnnestingPositionsAppender[types.size()];
for (int i = 0; i < channelAppenders.length; i++) {
channelAppenders[i] = positionsAppenderFactory.create(types.get(i), initialExpectedEntries, maxPageSizeInBytes);
Expand Down Expand Up @@ -98,7 +119,24 @@ private void declarePositions(int positions)

public boolean isFull()
{
return declaredPositions == Integer.MAX_VALUE || getSizeInBytes() >= maxPageSizeInBytes;
if (declaredPositions == 0) {
return false;
}
if (declaredPositions >= MAX_POSITION_COUNT) {
return true;
}
PositionsAppenderSizeAccumulator accumulator = computeAppenderSizes();
return accumulator.getSizeInBytes() >= maxPageSizeInBytes || accumulator.getDirectSizeInBytes() >= maxDirectPageSizeInBytes;
}

@VisibleForTesting
PositionsAppenderSizeAccumulator computeAppenderSizes()
{
PositionsAppenderSizeAccumulator accumulator = new PositionsAppenderSizeAccumulator();
for (UnnestingPositionsAppender positionsAppender : channelAppenders) {
positionsAppender.addSizesToAccumulator(accumulator);
}
return accumulator;
}

public boolean isEmpty()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.output;

final class PositionsAppenderSizeAccumulator
{
private long sizeInBytes;
private long directSizeInBytes;

public long getSizeInBytes()
{
return sizeInBytes;
}

public long getDirectSizeInBytes()
{
return directSizeInBytes;
}

public void accumulate(long sizeInBytes, long directSizeInBytes)
{
this.sizeInBytes += sizeInBytes;
this.directSizeInBytes += directSizeInBytes;
}

@Override
public String toString()
{
return "PositionsAppenderSizeAccumulator{sizeInBytes=" + sizeInBytes + " directSizeInBytes=" + directSizeInBytes + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,19 @@ public long getRetainedSizeInBytes()
public long getSizeInBytes()
{
return delegate.getSizeInBytes() +
// dictionary size is not included due to the expense of the calculation
// dictionary size is not included due to the expense of the calculation, but we can account for the ids size
(dictionaryIdsBuilder.size() * (long) Integer.BYTES) +
(rleValue != null ? rleValue.getSizeInBytes() : 0);
}

sopel39 marked this conversation as resolved.
Show resolved Hide resolved
void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator)
Copy link
Member

@sopel39 sopel39 Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could just return a record (and possibly OptionalLong for directSizeInBytes). IMO it would be cleaner and easier to understand than accumulator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think the extra allocation per appender per check is worthwhile for the small bit of extra clarity is worthwhile here. At that point, you can’t box the direct size into an optional because RLE’s have both a “size” and a separate “direct size” value that need to be summed independently from each other.

{
long sizeInBytes = getSizeInBytes();
// dictionary size is not included due to the expense of the calculation, so this will under-report for dictionaries
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dictionary size is not included due to the expense of the calculation
nit: actually there is io.trino.operator.output.UnnestingPositionsAppender#dictionary so it's easy to account for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s easy to count for the retained size, but not for which positions are actually in the output and the size of those positions without expensive book keeping.

long directSizeInBytes = (rleValue == null) ? sizeInBytes : (rleValue.getSizeInBytes() * rlePositionCount);
accumulator.accumulate(sizeInBytes, directSizeInBytes);
}

private static class DictionaryIdsBuilder
{
private static final int INSTANCE_SIZE = instanceSize(DictionaryIdsBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.output;

import io.airlift.slice.Slices;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.type.BlockTypeOperators;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.junit.jupiter.api.Test;

import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestPositionsAppenderPageBuilder
{
@Test
public void testFullOnPositionCountLimit()
pettyjamesm marked this conversation as resolved.
Show resolved Hide resolved
{
int maxPageBytes = 1024 * 1024;
int maxDirectSize = maxPageBytes * 10;
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
maxPageBytes,
maxDirectSize,
List.of(VARCHAR),
new PositionsAppenderFactory(new BlockTypeOperators()));

Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10);
Page inputPage = new Page(rleBlock);

IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
// Append 32760 positions, just less than MAX_POSITION_COUNT
assertEquals(32768, PositionsAppenderPageBuilder.MAX_POSITION_COUNT, "expected MAX_POSITION_COUNT to be 32768");
for (int i = 0; i < 3276; i++) {
pageBuilder.appendToOutputPartition(inputPage, positions);
}
assertFalse(pageBuilder.isFull(), "pageBuilder should still not be full");
// Append 10 more positions, crossing the threshold on position count
pageBuilder.appendToOutputPartition(inputPage, positions);
assertTrue(pageBuilder.isFull(), "pageBuilder should be full");
PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes());
assertTrue(sizeAccumulator.getDirectSizeInBytes() < maxDirectSize, "direct size should still be below threshold");
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
}

@Test
public void testFullOnDirectSizeInBytes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we test with both RLE and dirct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I'm not sure what we would really be asserting there. maxDirectSizeInBytes must be >= maxSizeInBytes so, we would always be considered full based on maxSizeInBytes and wouldn't be able to tell the difference except when appending in RLE mode.

{
int maxPageBytes = 100;
int maxDirectSize = 1000;
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
maxPageBytes,
maxDirectSize,
List.of(VARCHAR),
new PositionsAppenderFactory(new BlockTypeOperators()));

PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(0L, sizeAccumulator.getSizeInBytes());
assertEquals(0L, sizeAccumulator.getDirectSizeInBytes());
assertFalse(pageBuilder.isFull());

Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10);
Page inputPage = new Page(rleBlock);

IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
pageBuilder.appendToOutputPartition(inputPage, positions);
// 10 positions inserted, size in bytes is still the same since we're in RLE mode but direct size is 10x
sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes());
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
assertEquals(rleBlock.getSizeInBytes() * 10, sizeAccumulator.getDirectSizeInBytes());
assertFalse(pageBuilder.isFull());

// Keep inserting until the direct size limit is reached
while (pageBuilder.computeAppenderSizes().getDirectSizeInBytes() < maxDirectSize) {
pageBuilder.appendToOutputPartition(inputPage, positions);
}
// size in bytes is unchanged
sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes(), "sizeInBytes must still report the RLE block size only");
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
// builder reports full due to maximum size in bytes reached
assertTrue(pageBuilder.isFull());
Page result = pageBuilder.build();
assertEquals(120, result.getPositionCount(), "result positions should be below the 8192 maximum");
assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded");
}
}