-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Limit RLE to direct expansion when partitioning
Flushes PositionsAppenderPageBuilder entries if the cost of converting all RLE channels into direct channels would exceed the maximum page size by a factor of 8x. Previously, page builders could buffer a very large number of RLE positions without being considered full and then suddenly expand to huge sizes when forced to transition to a direct representation as a result of the RLE input value changing across one or more columns. In particular, this is can easily happen when pages are produced from CROSS JOIN UNNEST operations.
- Loading branch information
1 parent
cdbb607
commit 13a64f8
Showing
3 changed files
with
129 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
98 changes: 98 additions & 0 deletions
98
core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.operator.output; | ||
|
||
import io.airlift.slice.Slices; | ||
import io.trino.spi.Page; | ||
import io.trino.spi.block.Block; | ||
import io.trino.spi.block.RunLengthEncodedBlock; | ||
import io.trino.type.BlockTypeOperators; | ||
import it.unimi.dsi.fastutil.ints.IntArrayList; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.util.List; | ||
|
||
import static io.trino.spi.type.VarcharType.VARCHAR; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
||
public class TestPositionsAppenderPageBuilder | ||
{ | ||
@Test | ||
public void testFullOnPositionCountLimit() | ||
{ | ||
int maxPageBytes = 1024 * 1024; | ||
int maxDirectSize = maxPageBytes * 10; | ||
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize( | ||
maxPageBytes, | ||
maxDirectSize, | ||
List.of(VARCHAR), | ||
new PositionsAppenderFactory(new BlockTypeOperators())); | ||
|
||
Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10); | ||
Page inputPage = new Page(rleBlock); | ||
|
||
IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); | ||
// Append 32760 positions, just less than MAX_POSITION_COUNT | ||
assertEquals(32768, PositionsAppenderPageBuilder.MAX_POSITION_COUNT, "expected MAX_POSITION_COUNT to be 32768"); | ||
for (int i = 0; i < 3276; i++) { | ||
pageBuilder.appendToOutputPartition(inputPage, positions); | ||
} | ||
assertFalse(pageBuilder.isFull(), "pageBuilder should still not be full"); | ||
// Append 10 more positions, crossing the threshold on position count | ||
pageBuilder.appendToOutputPartition(inputPage, positions); | ||
assertTrue(pageBuilder.isFull(), "pageBuilder should be full"); | ||
assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes()); | ||
assertTrue(pageBuilder.getDirectSizeInBytes() < maxDirectSize, "direct size should still be below threshold"); | ||
} | ||
|
||
@Test | ||
public void testFullOnDirectSizeInBytes() | ||
{ | ||
int maxPageBytes = 100; | ||
int maxDirectSize = 1000; | ||
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize( | ||
maxPageBytes, | ||
maxDirectSize, | ||
List.of(VARCHAR), | ||
new PositionsAppenderFactory(new BlockTypeOperators())); | ||
|
||
assertEquals(0L, pageBuilder.getSizeInBytes()); | ||
assertEquals(0L, pageBuilder.getDirectSizeInBytes()); | ||
assertFalse(pageBuilder.isFull()); | ||
|
||
Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10); | ||
Page inputPage = new Page(rleBlock); | ||
|
||
IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); | ||
pageBuilder.appendToOutputPartition(inputPage, positions); | ||
// 10 positions inserted, size in bytes is still the same since we're in RLE mode but direct size is 10x | ||
assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes()); | ||
assertEquals(rleBlock.getSizeInBytes() * 10, pageBuilder.getDirectSizeInBytes()); | ||
assertFalse(pageBuilder.isFull()); | ||
|
||
// Keep inserting until the direct size limit is reached | ||
while (pageBuilder.getDirectSizeInBytes() < maxDirectSize) { | ||
pageBuilder.appendToOutputPartition(inputPage, positions); | ||
} | ||
// size in bytes is unchanged | ||
assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes()); | ||
// builder reports full due to maximum size in bytes reached | ||
assertTrue(pageBuilder.isFull()); | ||
Page result = pageBuilder.build(); | ||
assertEquals(120, result.getPositionCount(), "result positions should be below the 8192 maximum"); | ||
assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded"); | ||
} | ||
} |