
Fix partitioned output page flushing #19762

Merged

Conversation

@pettyjamesm (Member) commented Nov 15, 2023

Description

Fixes a number of issues with the flushing behavior of PositionsAppenderPageBuilder and related classes. The fixes are:

  1. Consider PositionsAppenderPageBuilder full after at least 4x PageProcessor.MAX_BATCH_SIZE (4 * 8192 = 32,768) positions have been inserted, regardless of the current size in bytes. The previous behavior of only considering the builder full when declaredPositions == Integer.MAX_VALUE was unsafe, since PositionsAppenderPageBuilder#isFull() is only checked after inserting an entire page, not after each row, so integer overflows could occur.
  2. Accounting for the size of at least the current dictionary ids in UnnestingPositionsAppender when in dictionary mode, even though the size of the actually referenced entries is still not tracked. Previously, repeated insertions that used the same underlying dictionary would report no increase in the size of the builder and never flush. If at some later point a page were inserted with a different dictionary, the builder's transition to direct mode could then create very large memory allocations due to how many positions were buffered.
  3. Setting an upper bound on the resulting size of an UnnestingPositionsAppender in RLE mode if it were to be transitioned to direct mode. Before this change, RLE mode appenders could accumulate an unbounded number of positions and report no size changes so long as the same value kept being inserted. At the point at which a different value was inserted, the transition to direct mode could cause huge spikes in memory usage and cause query failures and/or worker node heap OOMs.

In particular, issues 2 and 3 are exceedingly common when performing CROSS JOIN UNNEST queries since the UnnestOperator will emit RLE and dictionary blocks that repeat the same value or use the same dictionary for extended periods before suddenly transitioning to new values or dictionaries.
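The interplay of the three fixes can be sketched as a single fullness check. This is a hypothetical simplification, not the actual Trino code: only PageProcessor.MAX_BATCH_SIZE (8192), the 4x position factor, and the 8x direct-size factor come from this PR; the class, method, and limit names below are illustrative.

```java
// Hypothetical sketch of the combined fullness check described above.
class FlushCheckSketch
{
    static final int MAX_BATCH_SIZE = 8192;                   // PageProcessor.MAX_BATCH_SIZE
    static final int MAX_POSITION_COUNT = 4 * MAX_BATCH_SIZE; // fix 1: 32,768 positions
    static final long MAX_SIZE_IN_BYTES = 1024 * 1024;        // illustrative byte limit
    static final long MAX_DIRECT_SIZE_IN_BYTES = 8 * MAX_SIZE_IN_BYTES; // fix 3: 8x bound

    static boolean isFull(int declaredPositions, long sizeInBytes, long directSizeInBytes)
    {
        // this check runs only after appending a whole page, so the position
        // bound must sit far below Integer.MAX_VALUE to leave overflow headroom
        return declaredPositions >= MAX_POSITION_COUNT
                || sizeInBytes >= MAX_SIZE_IN_BYTES
                || directSizeInBytes >= MAX_DIRECT_SIZE_IN_BYTES;
    }

    public static void main(String[] args)
    {
        // 40,000 buffered RLE positions: tiny in bytes, but over the position cap
        System.out.println(FlushCheckSketch.isFull(40_000, 1024, 1024));
    }
}
```

The third disjunct is what bounds RLE-mode appenders: even when the current size stays flat, the projected size after a forced transition to direct mode triggers a flush.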

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Section
* Fix an issue that could cause sudden increases in PagePartitioner memory consumption in some scenarios

@cla-bot cla-bot bot added the cla-signed label Nov 15, 2023
@pettyjamesm pettyjamesm changed the title Fix positions appender flushing Fix partitioned output page flushing Nov 15, 2023
@pettyjamesm pettyjamesm marked this pull request as ready for review November 15, 2023 22:09
@lukasz-stec left a comment

lgtm % tests

@sopel39 left a comment

lgtm % comments

@sopel39 left a comment

lgtm % comments % benchmarks (will have results soon)

@@ -26,6 +28,9 @@
public class PositionsAppenderPageBuilder
{
private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
@VisibleForTesting
not used in testing

pettyjamesm (Author) replied:
The tests do assert on this value, since the test logic requires knowing what the value is in order to trigger the flushing based on position count without reaching the size limit.

}

@Test
public void testFullOnDirectSizeInBytes()
could we test with both RLE and direct?

pettyjamesm (Author) replied:
To be honest, I'm not sure what we would really be asserting there. maxDirectSizeInBytes must be >= maxSizeInBytes so, we would always be considered full based on maxSizeInBytes and wouldn't be able to tell the difference except when appending in RLE mode.

@pettyjamesm pettyjamesm force-pushed the fix-positions-appender-flushing branch 2 times, most recently from 8d691e5 to d75e68b Compare November 21, 2023 14:53
@sopel39 (Member) commented Nov 21, 2023

benchmarks look good

| | label | TPCH wall time | TPC-DS wall time | TPCH CPU time | TPC-DS CPU time | TPCH peak mem | TPC-DS peak mem | TPCH network | TPC-DS network | TPCH input | TPC-DS input |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | baseline | 477.684667 | 731.906833 | 44121.5 | 67848.357333 | 1.217579e+09 | 1.032452e+09 | 1.441352e+09 | 1.401036e+09 | 1.383788e+09 | 1.294938e+09 |
| 1 | baseline | 480.248667 | 725.994167 | 43740.0 | 67811.382833 | 1.218880e+09 | 1.028290e+09 | 1.441439e+09 | 1.397992e+09 | 1.383787e+09 | 1.292503e+09 |
| 2 | after | 479.253667 | 741.316167 | 44653.8 | 68609.902667 | 1.209194e+09 | 1.025864e+09 | 1.431125e+09 | 1.396864e+09 | 1.383787e+09 | 1.292680e+09 |
| 3 | after | 492.526167 | 730.921333 | 43796.9 | 67022.969000 | 1.212940e+09 | 1.025419e+09 | 1.431126e+09 | 1.395533e+09 | 1.383787e+09 | 1.292576e+09 |

@@ -211,6 +211,11 @@ public long getSizeInBytes()
(rleValue != null ? rleValue.getSizeInBytes() : 0);
}

public long getDirectSizeInBytes()
@sopel39 commented Nov 21, 2023:
this should most likely return OptionalLong and non-empty only for DIRECT, RLE

pettyjamesm (Author) replied:
I think we still want to count the current size in bytes for direct appenders towards the total direct size, but with a special case for RLE where we want to report the size "as if it were flattened". Dictionaries of course are still unavoidably under-reported here.
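The semantics described in this reply can be sketched as follows. This is a hypothetical simplification of UnnestingPositionsAppender, with assumed state names and parameters, not the actual implementation:

```java
// Sketch: RLE appenders report size "as if flattened"; direct and dictionary
// appenders report their current size (dictionary entries stay under-reported).
class DirectSizeSketch
{
    enum State { DIRECT, RLE, DICTIONARY }

    static long directSizeInBytes(State state, long currentSizeInBytes,
            long rleValueSizeInBytes, int positionCount)
    {
        if (state == State.RLE) {
            // one copy of the single RLE value per buffered position
            return rleValueSizeInBytes * positionCount;
        }
        return currentSizeInBytes;
    }

    public static void main(String[] args)
    {
        // an 8-byte RLE value repeated 100,000 times flattens to ~800 KB
        System.out.println(DirectSizeSketch.directSizeInBytes(State.RLE, 8, 8, 100_000));
    }
}
```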

@pettyjamesm pettyjamesm force-pushed the fix-positions-appender-flushing branch from d75e68b to ee7ce59 Compare November 21, 2023 15:46
Flushes PositionsAppenderPageBuilder after reaching 4x the
PageProcessor.MAX_BATCH_SIZE positions. The previous limit of
Integer.MAX_VALUE positions could easily overflow since append
operations can insert more than a single row and isFull() is only
checked after inserting an entire page into the builder. Additionally,
buffering too many RLE rows in the builder can prevent it from reaching
the size limit and starve downstream tasks from receiving any input at
all.
Although the exact total size in bytes of UnnestingPositionsAppender
dictionaries is expensive to track, we should at least account for the
dictionary ids size. Otherwise, repeated dictionary insertions don't
increase the reported size at all.
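The ids-only accounting in this commit can be illustrated with a toy calculation (hypothetical method, not the actual UnnestingPositionsAppender code): each appended position buffers one int dictionary id, so reported size now grows with every insertion even though the shared dictionary itself is never re-measured.

```java
class DictionaryIdsSizeSketch
{
    // Illustrative: each buffered position holds one 4-byte int id.
    // Referenced dictionary entries are still excluded, so this remains an
    // under-estimate, but it is no longer zero growth on repeated inserts.
    static long dictionaryIdsSizeInBytes(int positionCount)
    {
        return (long) positionCount * Integer.BYTES;
    }

    public static void main(String[] args)
    {
        System.out.println(DictionaryIdsSizeSketch.dictionaryIdsSizeInBytes(8192)); // 32768
    }
}
```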
@pettyjamesm pettyjamesm force-pushed the fix-positions-appender-flushing branch from ee7ce59 to 00f4269 Compare November 21, 2023 17:04
Flushes PositionsAppenderPageBuilder entries if the cost of converting
all RLE channels into direct channels would exceed the maximum page size
by a factor of 8x.

Previously, page builders could buffer a very large number of RLE
positions without being considered full and then suddenly expand to huge
sizes when forced to transition to a direct representation as a result
of the RLE input value changing across one or more columns. In
particular, this can easily happen when pages are produced from CROSS
JOIN UNNEST operations.
@pettyjamesm pettyjamesm force-pushed the fix-positions-appender-flushing branch from 00f4269 to a3a6a25 Compare November 21, 2023 19:24
@sopel39 left a comment

lgtm % comments

@@ -211,6 +211,14 @@ public long getSizeInBytes()
(rleValue != null ? rleValue.getSizeInBytes() : 0);
}

void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator)
@sopel39 commented Nov 22, 2023:

I think this could just return a record (and possibly OptionalLong for directSizeInBytes). IMO it would be cleaner and easier to understand than accumulator.

pettyjamesm (Author) replied:
I don’t think the extra allocation per appender per check is worthwhile for the small bit of extra clarity here. At that point, you can’t box the direct size into an OptionalLong because RLEs have both a “size” and a separate “direct size” value that need to be summed independently of each other.
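The accumulator shape under discussion can be sketched like this (hypothetical, simplified): one mutable instance visits every channel appender per check, avoiding a record allocation per appender, and each appender contributes to the two running totals independently.

```java
class PositionsAppenderSizeAccumulatorSketch
{
    private long sizeInBytes;
    private long directSizeInBytes;

    // Each appender adds both values: for a direct appender the two are equal,
    // for an RLE appender the direct size is the larger flattened estimate.
    void accumulate(long size, long directSize)
    {
        sizeInBytes += size;
        directSizeInBytes += directSize;
    }

    long getSizeInBytes()
    {
        return sizeInBytes;
    }

    long getDirectSizeInBytes()
    {
        return directSizeInBytes;
    }

    public static void main(String[] args)
    {
        PositionsAppenderSizeAccumulatorSketch acc = new PositionsAppenderSizeAccumulatorSketch();
        acc.accumulate(100, 100);      // a direct appender
        acc.accumulate(8, 80_000);     // an RLE appender: one 8-byte value, 10,000 positions
        System.out.println(acc.getDirectSizeInBytes()); // 80100
    }
}
```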

void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator)
{
long sizeInBytes = getSizeInBytes();
// dictionary size is not included due to the expense of the calculation, so this will under-report for dictionaries
> dictionary size is not included due to the expense of the calculation

nit: actually there is io.trino.operator.output.UnnestingPositionsAppender#dictionary, so it's easy to account for it.

pettyjamesm (Author) replied:
It’s easy to account for the retained size, but not for which positions are actually in the output or the size of those positions, without expensive bookkeeping.

@pettyjamesm pettyjamesm merged commit ddae2ec into trinodb:master Nov 22, 2023
90 checks passed
@pettyjamesm pettyjamesm deleted the fix-positions-appender-flushing branch November 22, 2023 15:22
@github-actions github-actions bot added this to the 434 milestone Nov 22, 2023