
Use non partitioned dynamic filter limits for FTE #17831

Merged
merged 3 commits into from Jun 15, 2023

Conversation

arhimondr
Contributor

@arhimondr arhimondr commented Jun 9, 2023

Description

This PR does several things:

Drops the word "broadcast" from the dynamic filter limit properties. Instead there are two types of limits: standard and partitioned. Standard limits are applied when collection happens over a raw data stream (a broadcast join in streaming mode, or any join in FTE mode). Partitioned limits are applied when collection happens over a pre-partitioned data source. Partitioned limits are expected to be lower, as each operator processes only a subset of keys.

Additionally, this PR changes FTE to use the "standard" limits instead of the "partitioned" ones, because dynamic filter collection in FTE happens over the raw data source (each operator sees the entire set of keys, similar to collecting dynamic filters for a broadcast join in streaming mode).

It also further increases the "standard" limits. The increase doesn't produce any visible CPU regression (tested with the standard TPC-DS benchmark over a partitioned sf10000 schema).

Additional context and related issues

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text:

# Section
* Deprecate the `dynamic-filtering.small-broadcast.*` and `dynamic-filtering.large-broadcast.*` configuration properties in favor of `dynamic-filtering.small.*` and `dynamic-filtering.large.*`. These limits are now considered "standard" and are applied whenever dynamic filters are collected over a raw, non-partitioned data source (which happens when running with FTE enabled, or when a join is executed as a broadcast join). The `dynamic-filtering.small-partitioned.*` and `dynamic-filtering.large-partitioned.*` properties remain unchanged; the limits configured with these properties are applied when dynamic filters are collected over a pre-partitioned data source (when executing a partitioned join in streaming mode). The `*-partitioned*` limits are expected to be lower, as they are applied per operator, and when the data set is partitioned each operator observes only a subset of the join key space.
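As an illustration of the rename, here is a sketch in `config.properties` form. The `max-distinct-values-per-driver` suffix is inferred from the `smallMaxDistinctValuesPerDriver` config field shown in the diff; treat the exact suffixes and values as illustrative rather than shipped defaults:

```properties
# Deprecated name (pre-PR):
# dynamic-filtering.small-broadcast.max-distinct-values-per-driver=1000

# New "standard" limit, applied for broadcast joins in streaming mode
# and for any join in FTE mode:
dynamic-filtering.small.max-distinct-values-per-driver=1000

# Partitioned limits keep their existing names and apply only to
# pre-partitioned collection (partitioned joins in streaming mode):
# dynamic-filtering.small-partitioned.max-distinct-values-per-driver=...
```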

@@ -41,7 +41,7 @@
     private boolean enableCoordinatorDynamicFiltersDistribution = true;
     private boolean enableLargeDynamicFilters;

-    private int smallMaxDistinctValuesPerDriver = 1_000;
+    private int smallMaxDistinctValuesPerDriver = 10_000;
Member

Could you add benchmark results for streaming mode at 1k scale factor?
In general, rather than increasing the config by 10X because it didn't significantly regress some TPC workloads, I would prefer that we increase this only as much as is necessary for good TPC results. I already 5X'ed this number very recently.
fyi @sopel39

Contributor Author

This only increases the limits for BROADCAST joins, where the build side is usually small. Do you think it may still be significant?

I would prefer that we increase this only as much as is necessary for good TPC results

There are 5000+ partitions in the partitioned tables of a TPC-DS sf10000 schema. There are also plenty of joins in TPC-DS based on the date dimension. This limit increase ensures the filters are always available.

@@ -51,7 +51,7 @@
     private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
     private DataSize smallMaxSizePerFilter = DataSize.of(5, MEGABYTE);

-    private int largeMaxDistinctValuesPerDriver = 10_000;
+    private int largeMaxDistinctValuesPerDriver = 100_000;
Member

Given that this is a substantial increase, has it been benchmarked separately from the "small" one? IIUC we only benchmark the "small" DF limits in our usual perf benchmarks.

Contributor Author

Could you please elaborate on why we would see a performance regression when increasing the limits? The values are collected with a hash set, and its performance shouldn't vary dramatically with an increase in its size. The complexity is still O(1) assuming the right fill ratio; perhaps only cache misses may increase (though the total memory footprint is limited?).
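The argument above can be sketched in a few lines. The `BoundedDistinctCollector` class here is hypothetical, not Trino's actual operator code: it just shows that hash-set inserts stay O(1) expected cost regardless of the configured limit, and that the limit only caps how many distinct values are retained before collection of that filter is abandoned.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: collect distinct build-side join keys into a
// hash set until the configured limit is exceeded, then drop the
// collected values and stop collecting (the filter is abandoned).
class BoundedDistinctCollector
{
    private final int maxDistinctValues;
    private final Set<Long> values = new HashSet<>();
    private boolean abandoned;

    BoundedDistinctCollector(int maxDistinctValues)
    {
        this.maxDistinctValues = maxDistinctValues;
    }

    // O(1) expected per insert; the limit bounds retained memory,
    // not the per-insert complexity.
    void add(long value)
    {
        if (abandoned) {
            return;
        }
        values.add(value);
        if (values.size() > maxDistinctValues) {
            values.clear();
            abandoned = true;
        }
    }

    boolean isAbandoned()
    {
        return abandoned;
    }

    int distinctCount()
    {
        return values.size();
    }
}
```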

Replace the `*.small-broadcast.*` and `*.large-broadcast.*` configuration parameters
with simply `*.small.*` and `*.large.*`.

This commit also adjusts the documentation accordingly. The "partitioned" limits
no longer map one-to-one to "partitioned" joins, but merely indicate
limits applied to the collection of dynamic filters over
"partitioned" data (such as build sides of partitioned joins in non-FTE
mode).
@arhimondr
Contributor Author

@raunaqmorarka I may need to take a closer look into the limits and run additional benchmarks. I decided to drop the last commit (increasing the limits) for now.

@arhimondr arhimondr merged commit ae3e1eb into trinodb:master Jun 15, 2023
88 checks passed
@arhimondr arhimondr deleted the dynamic-filter-limits branch June 15, 2023 13:37
@github-actions github-actions bot added this to the 420 milestone Jun 15, 2023
@colebow
Member

colebow commented Jun 21, 2023

@arhimondr I think a lot of this proposed release note should be added to docs as an explanation for these config properties...

@arhimondr
Contributor Author

I updated the docs (https://github.com/trinodb/trino/pull/17831/files#diff-f7929648d722198d4cb41322270ed5016db39e42312b14f60853cf92396f84a1R214). Do you think it is insufficient and should be expanded?

@colebow
Member

colebow commented Jun 23, 2023

> I updated the docs (https://github.com/trinodb/trino/pull/17831/files#diff-f7929648d722198d4cb41322270ed5016db39e42312b14f60853cf92396f84a1R214). Do you think it is insufficient and should be expanded?

I somehow missed the docs changes for whatever reason. Yeah, that should be good.
