Skip to content

Commit

Permalink
Limit RLE to direct expansion when partitioning
Browse files Browse the repository at this point in the history
Flushes PositionsAppenderPageBuilder entries if the cost of converting
all RLE channels into direct channels would exceed the maximum page size
by a factor of 8x.

Previously, page builders could buffer a very large number of RLE
positions without being considered full and then suddenly expand to huge
sizes when forced to transition to a direct representation as a result
of the RLE input value changing across one or more columns. In
particular, this is can easily happen when pages are produced from CROSS
JOIN UNNEST operations.
  • Loading branch information
pettyjamesm committed Nov 21, 2023
1 parent 82dd85d commit 00f4269
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

Expand All @@ -30,26 +31,41 @@ public class PositionsAppenderPageBuilder
private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
@VisibleForTesting
static final int MAX_POSITION_COUNT = PageProcessor.MAX_BATCH_SIZE * 4;
// Maximum page size before being considered full based on current direct appender size and if RLE channels were converted to direct. Currently,
// dictionary mode appenders still under-report because computing their equivalent size if converted to direct is prohibitively expensive.
private static final int MAXIMUM_DIRECT_SIZE_MULTIPLIER = 8;

private final UnnestingPositionsAppender[] channelAppenders;
private final int maxPageSizeInBytes;
private final int maxDirectPageSizeInBytes;
private int declaredPositions;

public static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
{
return new PositionsAppenderPageBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxPageBytes, sourceTypes, positionsAppenderFactory);
return withMaxPageSize(maxPageBytes, maxPageBytes * MAXIMUM_DIRECT_SIZE_MULTIPLIER, sourceTypes, positionsAppenderFactory);
}

@VisibleForTesting
static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, int maxDirectSizeInBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
{
return new PositionsAppenderPageBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxPageBytes, maxDirectSizeInBytes, sourceTypes, positionsAppenderFactory);
}

private PositionsAppenderPageBuilder(
int initialExpectedEntries,
int maxPageSizeInBytes,
int maxDirectPageSizeInBytes,
List<? extends Type> types,
PositionsAppenderFactory positionsAppenderFactory)
{
requireNonNull(types, "types is null");
requireNonNull(positionsAppenderFactory, "positionsAppenderFactory is null");
checkArgument(maxPageSizeInBytes > 0, "maxPageSizeInBytes is negative: %s", maxPageSizeInBytes);
checkArgument(maxDirectPageSizeInBytes > 0, "maxDirectPageSizeInBytes is negative: %s", maxDirectPageSizeInBytes);
checkArgument(maxDirectPageSizeInBytes >= maxPageSizeInBytes, "maxDirectPageSizeInBytes (%s) must be >= maxPageSizeInBytes (%s)", maxDirectPageSizeInBytes, maxPageSizeInBytes);

this.maxPageSizeInBytes = maxPageSizeInBytes;
this.maxDirectPageSizeInBytes = maxDirectPageSizeInBytes;
channelAppenders = new UnnestingPositionsAppender[types.size()];
for (int i = 0; i < channelAppenders.length; i++) {
channelAppenders[i] = positionsAppenderFactory.create(types.get(i), initialExpectedEntries, maxPageSizeInBytes);
Expand Down Expand Up @@ -103,7 +119,21 @@ private void declarePositions(int positions)

public boolean isFull()
{
return declaredPositions >= MAX_POSITION_COUNT || getSizeInBytes() >= maxPageSizeInBytes;
if (declaredPositions >= MAX_POSITION_COUNT) {
return true;
}
PositionsAppenderSizeAccumulator accumulator = computeAppenderSizes();
return accumulator.getSizeInBytes() >= maxPageSizeInBytes || accumulator.getDirectSizeInBytes() >= maxDirectPageSizeInBytes;
}

@VisibleForTesting
PositionsAppenderSizeAccumulator computeAppenderSizes()
{
PositionsAppenderSizeAccumulator accumulator = new PositionsAppenderSizeAccumulator();
for (UnnestingPositionsAppender positionsAppender : channelAppenders) {
positionsAppender.addSizesToAccumulator(accumulator);
}
return accumulator;
}

public boolean isEmpty()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.output;

final class PositionsAppenderSizeAccumulator
{
private long sizeInBytes;
private long directSizeInBytes;

public long getSizeInBytes()
{
return sizeInBytes;
}

public long getDirectSizeInBytes()
{
return directSizeInBytes;
}

public void accumulate(long sizeInBytes, long directSizeInBytes)
{
this.sizeInBytes += sizeInBytes;
this.directSizeInBytes += directSizeInBytes;
}

@Override
public String toString()
{
return "PositionsAppenderSizeAccumulator{sizeInBytes=" + sizeInBytes + " directSizeInBytes=" + directSizeInBytes + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ public long getSizeInBytes()
(rleValue != null ? rleValue.getSizeInBytes() : 0);
}

void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator)
{
long sizeInBytes = getSizeInBytes();
// dictionary size is not included due to the expense of the calculation, so this will under-report for dictionaries
long directSizeInBytes = (rleValue == null) ? sizeInBytes : (rleValue.getSizeInBytes() * rlePositionCount);
accumulator.accumulate(sizeInBytes, directSizeInBytes);
}

private static class DictionaryIdsBuilder
{
private static final int INSTANCE_SIZE = instanceSize(DictionaryIdsBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class TestPositionsAppenderPageBuilder
public void testFullOnPositionCountLimit()
{
int maxPageBytes = 1024 * 1024;
int maxDirectSize = maxPageBytes * 10;
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
maxPageBytes,
maxDirectSize,
List.of(VARCHAR),
new PositionsAppenderFactory(new BlockTypeOperators()));

Expand All @@ -52,6 +54,52 @@ public void testFullOnPositionCountLimit()
// Append 10 more positions, crossing the threshold on position count
pageBuilder.appendToOutputPartition(inputPage, positions);
assertTrue(pageBuilder.isFull(), "pageBuilder should be full");
assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes());
PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes());
assertTrue(sizeAccumulator.getDirectSizeInBytes() < maxDirectSize, "direct size should still be below threshold");
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
}

@Test
public void testFullOnDirectSizeInBytes()
{
int maxPageBytes = 100;
int maxDirectSize = 1000;
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
maxPageBytes,
maxDirectSize,
List.of(VARCHAR),
new PositionsAppenderFactory(new BlockTypeOperators()));

PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(0L, sizeAccumulator.getSizeInBytes());
assertEquals(0L, sizeAccumulator.getDirectSizeInBytes());
assertFalse(pageBuilder.isFull());

Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10);
Page inputPage = new Page(rleBlock);

IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
pageBuilder.appendToOutputPartition(inputPage, positions);
// 10 positions inserted, size in bytes is still the same since we're in RLE mode but direct size is 10x
sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes());
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
assertEquals(rleBlock.getSizeInBytes() * 10, sizeAccumulator.getDirectSizeInBytes());
assertFalse(pageBuilder.isFull());

// Keep inserting until the direct size limit is reached
while (pageBuilder.computeAppenderSizes().getDirectSizeInBytes() < maxDirectSize) {
pageBuilder.appendToOutputPartition(inputPage, positions);
}
// size in bytes is unchanged
sizeAccumulator = pageBuilder.computeAppenderSizes();
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes(), "sizeInBytes must still report the RLE block size only");
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
// builder reports full due to maximum size in bytes reached
assertTrue(pageBuilder.isFull());
Page result = pageBuilder.build();
assertEquals(120, result.getPositionCount(), "result positions should be below the 8192 maximum");
assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded");
}
}

0 comments on commit 00f4269

Please sign in to comment.