-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix partitioned output page flushing #19762
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.operator.output; | ||
|
||
final class PositionsAppenderSizeAccumulator | ||
{ | ||
private long sizeInBytes; | ||
private long directSizeInBytes; | ||
|
||
public long getSizeInBytes() | ||
{ | ||
return sizeInBytes; | ||
} | ||
|
||
public long getDirectSizeInBytes() | ||
{ | ||
return directSizeInBytes; | ||
} | ||
|
||
public void accumulate(long sizeInBytes, long directSizeInBytes) | ||
{ | ||
this.sizeInBytes += sizeInBytes; | ||
this.directSizeInBytes += directSizeInBytes; | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "PositionsAppenderSizeAccumulator{sizeInBytes=" + sizeInBytes + " directSizeInBytes=" + directSizeInBytes + "}"; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -211,6 +211,14 @@ public long getSizeInBytes() | |
(rleValue != null ? rleValue.getSizeInBytes() : 0); | ||
} | ||
|
||
sopel39 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this could just return a record (and possibly There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don’t think the extra allocation per appender per check is worthwhile for the small bit of extra clarity is worthwhile here. At that point, you can’t box the direct size into an optional because RLE’s have both a “size” and a separate “direct size” value that need to be summed independently from each other. |
||
{ | ||
long sizeInBytes = getSizeInBytes(); | ||
// dictionary size is not included due to the expense of the calculation, so this will under-report for dictionaries | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It’s easy to count for the retained size, but not for which positions are actually in the output and the size of those positions without expensive book keeping. |
||
long directSizeInBytes = (rleValue == null) ? sizeInBytes : (rleValue.getSizeInBytes() * rlePositionCount); | ||
accumulator.accumulate(sizeInBytes, directSizeInBytes); | ||
} | ||
|
||
private static class DictionaryIdsBuilder | ||
{ | ||
private static final int INSTANCE_SIZE = instanceSize(DictionaryIdsBuilder.class); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,8 +34,10 @@ public class TestPositionsAppenderPageBuilder | |
public void testFullOnPositionCountLimit() | ||
pettyjamesm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
int maxPageBytes = 1024 * 1024; | ||
int maxDirectSize = maxPageBytes * 10; | ||
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize( | ||
maxPageBytes, | ||
maxDirectSize, | ||
List.of(VARCHAR), | ||
new PositionsAppenderFactory(new BlockTypeOperators())); | ||
|
||
|
@@ -52,6 +54,52 @@ public void testFullOnPositionCountLimit() | |
// Append 10 more positions, crossing the threshold on position count | ||
pageBuilder.appendToOutputPartition(inputPage, positions); | ||
assertTrue(pageBuilder.isFull(), "pageBuilder should be full"); | ||
assertEquals(rleBlock.getSizeInBytes(), pageBuilder.getSizeInBytes()); | ||
PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes(); | ||
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes()); | ||
assertTrue(sizeAccumulator.getDirectSizeInBytes() < maxDirectSize, "direct size should still be below threshold"); | ||
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value"); | ||
} | ||
|
||
@Test | ||
public void testFullOnDirectSizeInBytes() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we test with both RLE and dirct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be honest, I'm not sure what we would really be asserting there. |
||
{ | ||
int maxPageBytes = 100; | ||
int maxDirectSize = 1000; | ||
PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize( | ||
maxPageBytes, | ||
maxDirectSize, | ||
List.of(VARCHAR), | ||
new PositionsAppenderFactory(new BlockTypeOperators())); | ||
|
||
PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes(); | ||
assertEquals(0L, sizeAccumulator.getSizeInBytes()); | ||
assertEquals(0L, sizeAccumulator.getDirectSizeInBytes()); | ||
assertFalse(pageBuilder.isFull()); | ||
|
||
Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10); | ||
Page inputPage = new Page(rleBlock); | ||
|
||
IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); | ||
pageBuilder.appendToOutputPartition(inputPage, positions); | ||
// 10 positions inserted, size in bytes is still the same since we're in RLE mode but direct size is 10x | ||
sizeAccumulator = pageBuilder.computeAppenderSizes(); | ||
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes()); | ||
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value"); | ||
assertEquals(rleBlock.getSizeInBytes() * 10, sizeAccumulator.getDirectSizeInBytes()); | ||
assertFalse(pageBuilder.isFull()); | ||
|
||
// Keep inserting until the direct size limit is reached | ||
while (pageBuilder.computeAppenderSizes().getDirectSizeInBytes() < maxDirectSize) { | ||
pageBuilder.appendToOutputPartition(inputPage, positions); | ||
} | ||
// size in bytes is unchanged | ||
sizeAccumulator = pageBuilder.computeAppenderSizes(); | ||
assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes(), "sizeInBytes must still report the RLE block size only"); | ||
assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value"); | ||
// builder reports full due to maximum size in bytes reached | ||
assertTrue(pageBuilder.isFull()); | ||
Page result = pageBuilder.build(); | ||
assertEquals(120, result.getPositionCount(), "result positions should be below the 8192 maximum"); | ||
assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used in testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests do assert on this value, since the test logic requires knowing what the value is in order to trigger the flushing based on position count without reaching the size limit.