Skip to content

Commit

Permalink
Limit RLE to direct expansion when partitioning
Browse files Browse the repository at this point in the history
Flushes PositionsAppenderPageBuilder entries if the cost of converting
all RLE channels into direct channels would exceed the maximum page size
by a factor of 8x.

Previously, page builders could buffer a very large number of RLE
positions without being considered full and then suddenly expand to huge
sizes when forced to transition to a direct representation as a result
of the RLE input value changing across one or more columns. In
particular, this can easily happen when pages are produced from CROSS
JOIN UNNEST operations.
  • Loading branch information
pettyjamesm committed Nov 16, 2023
1 parent 3504e87 commit 6952664
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.operator.output;

import com.google.common.annotations.VisibleForTesting;
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
Expand All @@ -21,31 +22,46 @@

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class PositionsAppenderPageBuilder
{
// Initial expected entry count handed to every channel appender created by the constructor
private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
// Multiplier applied to the maximum page size to derive the default limit at which the builder
// is considered full if all RLE channels were converted to direct
private static final int MAXIMUM_DIRECT_SIZE_MULTIPLIER = 8;
// One appender per output channel, created from the source types in the constructor
private final UnnestingPositionsAppender[] channelAppenders;
// Limit compared against getSizeInBytes() in isFull()
private final int maxPageSizeInBytes;
// Limit compared against getDirectSizeInBytes() in isFull()
private final int maxDirectPageSizeInBytes;
// Running total of positions declared so far; compared against PageProcessor.MAX_BATCH_SIZE in isFull()
private int declaredPositions;

/**
 * Creates a page builder whose direct-size limit is derived from the page-size limit.
 *
 * <p>The direct-size limit is {@code maxPageBytes * MAXIMUM_DIRECT_SIZE_MULTIPLIER}, capped at
 * {@link Integer#MAX_VALUE} so that large page sizes cannot overflow into a negative or
 * unexpectedly small limit.
 *
 * @param maxPageBytes limit on the builder's current size in bytes before it reports full
 * @param sourceTypes types of the channels, one appender is created per type
 * @param positionsAppenderFactory factory used to create the per-channel appenders
 * @return a new, empty page builder
 */
public static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
{
    // Multiply in long arithmetic and saturate: an int multiplication silently wraps for
    // page sizes above Integer.MAX_VALUE / MAXIMUM_DIRECT_SIZE_MULTIPLIER
    int maxDirectSizeInBytes = (int) Math.min((long) maxPageBytes * MAXIMUM_DIRECT_SIZE_MULTIPLIER, Integer.MAX_VALUE);
    return withMaxPageSize(maxPageBytes, maxDirectSizeInBytes, sourceTypes, positionsAppenderFactory);
}

/**
 * Creates a page builder with an explicitly chosen direct-size limit.
 *
 * <p>Both limits are passed unchanged to the private constructor, together with
 * {@code DEFAULT_INITIAL_EXPECTED_ENTRIES}.
 *
 * @param maxPageBytes limit on the builder's current size in bytes
 * @param maxDirectSizeInBytes limit on the builder's direct size in bytes
 * @param sourceTypes types of the channels
 * @param positionsAppenderFactory factory used to create the per-channel appenders
 * @return a new page builder
 */
@VisibleForTesting
static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, int maxDirectSizeInBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
{
    PositionsAppenderPageBuilder pageBuilder = new PositionsAppenderPageBuilder(
            DEFAULT_INITIAL_EXPECTED_ENTRIES,
            maxPageBytes,
            maxDirectSizeInBytes,
            sourceTypes,
            positionsAppenderFactory);
    return pageBuilder;
}

private PositionsAppenderPageBuilder(
int initialExpectedEntries,
int maxPageSizeInBytes,
int maxDirectPageSizeInBytes,
List<? extends Type> types,
PositionsAppenderFactory positionsAppenderFactory)
{
requireNonNull(types, "types is null");
requireNonNull(positionsAppenderFactory, "positionsAppenderFactory is null");
checkArgument(maxPageSizeInBytes > 0, "maxPageSizeInBytes is negative: %s", maxPageSizeInBytes);
checkArgument(maxDirectPageSizeInBytes > 0, "maxDirectPageSizeInBytes is negative: %s", maxDirectPageSizeInBytes);
checkArgument(maxDirectPageSizeInBytes >= maxPageSizeInBytes, "maxDirectPageSizeInBytes (%s) must be >= maxPageSizeInBytes (%s)", maxDirectPageSizeInBytes, maxPageSizeInBytes);

this.maxPageSizeInBytes = maxPageSizeInBytes;
this.maxDirectPageSizeInBytes = maxDirectPageSizeInBytes;
channelAppenders = new UnnestingPositionsAppender[types.size()];
for (int i = 0; i < channelAppenders.length; i++) {
channelAppenders[i] = positionsAppenderFactory.create(types.get(i), initialExpectedEntries, maxPageSizeInBytes);
Expand Down Expand Up @@ -92,14 +108,23 @@ public long getSizeInBytes()
return sizeInBytes;
}

/**
 * Sums the direct size reported by every channel appender.
 *
 * @return the total of {@code getDirectSizeInBytes()} across all channels, or 0 when there are no channels
 */
public long getDirectSizeInBytes()
{
    long total = 0;
    for (int channel = 0; channel < channelAppenders.length; channel++) {
        total += channelAppenders[channel].getDirectSizeInBytes();
    }
    return total;
}

/**
 * Adds the given number of positions to the running declared-position count.
 *
 * @param positions number of positions to add
 */
private void declarePositions(int positions)
{
    declaredPositions = declaredPositions + positions;
}

/**
 * Reports whether the builder has reached any of its limits.
 *
 * <p>The builder is full when the declared position count reaches
 * {@code PageProcessor.MAX_BATCH_SIZE}, when the current size reaches {@code maxPageSizeInBytes},
 * or when the direct size reaches {@code maxDirectPageSizeInBytes}.
 *
 * @return {@code true} if any limit has been reached
 */
public boolean isFull()
{
    // Conditions short-circuit in this order: the position count is a plain field read,
    // while both size checks iterate over the channel appenders
    return declaredPositions >= PageProcessor.MAX_BATCH_SIZE
            || getSizeInBytes() >= maxPageSizeInBytes
            || getDirectSizeInBytes() >= maxDirectPageSizeInBytes;
}

public boolean isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ public long getSizeInBytes()
(rleValue != null ? rleValue.getSizeInBytes() : 0);
}

/**
 * Returns the size this appender reports for a direct representation.
 *
 * @return the RLE value's size multiplied by {@code rlePositionCount} when an RLE value is held,
 * otherwise the result of {@code getSizeInBytes()}
 */
public long getDirectSizeInBytes()
{
    if (rleValue != null) {
        return rleValue.getSizeInBytes() * rlePositionCount;
    }
    return getSizeInBytes();
}

private static class DictionaryIdsBuilder
{
private static final int INSTANCE_SIZE = instanceSize(DictionaryIdsBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.output;

import io.airlift.slice.Slices;
import io.trino.operator.project.PageProcessor;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.type.BlockTypeOperators;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.junit.jupiter.api.Test;

import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Verifies when {@link PositionsAppenderPageBuilder} reports itself full while it is fed
 * run-length encoded VARCHAR input.
 */
public class TestPositionsAppenderPageBuilder
{
    // Number of positions in the RLE input block, all of which are appended on every call
    private static final int RLE_POSITION_COUNT = 10;

    @Test
    public void testFullOnPositionCountLimit()
    {
        int maxPageBytes = 1024 * 1024;
        int maxDirectSize = maxPageBytes * 10;
        PositionsAppenderPageBuilder pageBuilder = createVarcharPageBuilder(maxPageBytes, maxDirectSize);

        Block rleBlock = createRleBlock();
        Page inputPage = new Page(rleBlock);
        IntArrayList positions = allPositions();

        // 819 appends of 10 positions each give 8190 positions, just under PageProcessor.MAX_BATCH_SIZE
        assertEquals(8192, PageProcessor.MAX_BATCH_SIZE, "expected constant to be 8192");
        int appends = 0;
        while (appends < 819) {
            pageBuilder.appendToOutputPartition(inputPage, positions);
            appends++;
        }
        assertFalse(pageBuilder.isFull(), "pageBuilder should still not be full");

        // One more append of 10 positions crosses the position count threshold
        pageBuilder.appendToOutputPartition(inputPage, positions);
        assertTrue(pageBuilder.isFull(), "pageBuilder should be full");
        assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes());
        assertTrue(pageBuilder.getDirectSizeInBytes() < maxDirectSize, "direct size should still be below threshold");
    }

    @Test
    public void testFullOnDirectSizeInBytes()
    {
        int maxPageBytes = 100;
        int maxDirectSize = 1000;
        PositionsAppenderPageBuilder pageBuilder = createVarcharPageBuilder(maxPageBytes, maxDirectSize);

        // A fresh builder is empty in both size measures and is not full
        assertEquals(0L, pageBuilder.getSizeInBytes());
        assertEquals(0L, pageBuilder.getDirectSizeInBytes());
        assertFalse(pageBuilder.isFull());

        Block rleBlock = createRleBlock();
        Page inputPage = new Page(rleBlock);
        IntArrayList positions = allPositions();

        pageBuilder.appendToOutputPartition(inputPage, positions);
        // After 10 positions the size in bytes matches the RLE block, while the direct size is 10x that
        assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes());
        assertEquals(rleBlock.getSizeInBytes() * RLE_POSITION_COUNT, pageBuilder.getDirectSizeInBytes());
        assertFalse(pageBuilder.isFull());

        // Keep appending until the direct size limit is reached
        while (pageBuilder.getDirectSizeInBytes() < maxDirectSize) {
            pageBuilder.appendToOutputPartition(inputPage, positions);
        }

        // The size in bytes has not changed
        assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes());
        // The builder now reports full because the direct size limit was reached
        assertTrue(pageBuilder.isFull());

        Page result = pageBuilder.build();
        assertEquals(120, result.getPositionCount(), "result positions should be below the 8192 maximum");
        assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded");
    }

    // Builds a single-channel VARCHAR page builder with the given size limits
    private static PositionsAppenderPageBuilder createVarcharPageBuilder(int maxPageBytes, int maxDirectSize)
    {
        return PositionsAppenderPageBuilder.withMaxPageSize(
                maxPageBytes,
                maxDirectSize,
                List.of(VARCHAR),
                new PositionsAppenderFactory(new BlockTypeOperators()));
    }

    // Creates an RLE block that repeats the VARCHAR value "test" for RLE_POSITION_COUNT positions
    private static Block createRleBlock()
    {
        return RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), RLE_POSITION_COUNT);
    }

    // Returns the positions 0..RLE_POSITION_COUNT-1 in ascending order
    private static IntArrayList allPositions()
    {
        IntArrayList positions = new IntArrayList();
        for (int position = 0; position < RLE_POSITION_COUNT; position++) {
            positions.add(position);
        }
        return positions;
    }
}

0 comments on commit 6952664

Please sign in to comment.