New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize execution for output duplicates insensitive joins #5981
Conversation
cc: @martint |
542e1b0
to
62ccb84
Compare
presto-main/src/main/java/io/prestosql/sql/planner/plan/JoinNode.java
Outdated
Show resolved
Hide resolved
...n/src/main/java/io/prestosql/sql/planner/iterative/rule/PushAggregationThroughOuterJoin.java
Outdated
Show resolved
Hide resolved
presto-main/src/main/java/io/prestosql/sql/planner/plan/JoinNode.java
Outdated
Show resolved
Hide resolved
62ccb84
to
c3a703d
Compare
...java/io/prestosql/sql/planner/iterative/rule/TestOptimizeCardinalityInsensitiveJoinRule.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few questions:
- Does this change affect any TPCH / DS queries? If so, how much do they benefit?
- Could we have similar rules for ExceptNode distinct and IntersectNode distinct? Also, could the rule's Visitor support those?
- How about removing the AggregationNode in case when it has single grouping set over all outputs and deduplication is supported for the particular join?
presto-main/src/main/java/io/prestosql/sql/planner/LocalExecutionPlanner.java
Outdated
Show resolved
Hide resolved
presto-testing/src/main/java/io/prestosql/testing/AbstractTestJoinQueries.java
Outdated
Show resolved
Hide resolved
c3a703d
to
e342417
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kasiafi ac
Does this change affect any TPCH / DS queries? If so, how much do they benefit?
This is to be benchmarked. I've seen lot's of queries that changed
Could we have similar rules for ExceptNode distinct and IntersectNode distinct? Also, could the rule's Visitor support those?
Sure we could use it in more places. To name few:
- semi join
- correlated
EXISTS
with non-eq conditions instead of streaming aggregations
Could you help to identify more places where it can be used? This doesn't have to be part of this PR though.
How about removing the AggregationNode in case when it has single grouping set over all outputs and deduplication is supported for the particular join?
That doesn't work because only matches are deduplicated, but not join input rows.
...n/src/main/java/io/prestosql/sql/planner/iterative/rule/PushAggregationThroughOuterJoin.java
Outdated
Show resolved
Hide resolved
a595002
to
82830cf
Compare
77d6b11
to
3bbbf33
Compare
@@ -336,8 +342,13 @@ private boolean joinCurrentPosition(LookupSource lookupSource, DriverYieldSignal | |||
joinSourcePositions++; | |||
} | |||
|
|||
// get next position on lookup side for this probe row | |||
joinPosition = lookupSource.getNextJoinPosition(joinPosition, probe.getPosition(), probe.getPage()); | |||
if (!matchSingleBuildRow || !currentProbePositionProducedRow) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we could make pass this flag (matchSingleBuildRow) so when we build the hashTable we could make sure that only one element exists ? Or override getNextJoinPosition
to return -1. Not sure if it would affect other places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe a simple PositionalLink
that could return -1 for PositionalLink#next
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would work when there are no extra filters in JoinNode#filter
. It's a good idea. Could it be a follow up?
Here's a summary: #6005 |
This looks like a semijoin in disguise. Have we looked at turning these joins into semijoins instead? |
@martint we purposefully convert filtering semi-joins into joins + aggregation because:
|
They could be turned into a semijoin after the CBO rules run
That's something we should improve for semijoin operator, regardless
Yes, parts would be similar, but semi-join doesn't need to worry about producing the values from the other side so that's a lot of code that's not relevant. My concern is about turning the join operator into a "kitchen sink" operator. |
I'm not convinced semi-join is that beneficial from perf point of view. It might be necessity for projected semi-join, but for filtering semi-join pruning rows within join has better performance then
@martint resemblance of semi-join superficial. In order to achieve similar level of support for filtering semi-join as for
At this point you pretty much have another implementation of lookup join with a caveat that the new implementation cannot output build columns (which lookup join very easily can do as well). |
Additionally, rule is this PR works when join sides are flipped, so it's another difference from semi join. |
That wouldn't matter, no? For cases where we want "first match" and the columns being output matter, the order can't be flipped without losing the first match semantics. |
@@ -302,6 +306,12 @@ public PlanNode getRight() | |||
return spillable; | |||
} | |||
|
|||
@JsonProperty("canSkipOutputDuplicates") | |||
public boolean isCanSkipOutputDuplicates() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will affect the computed estimates for the join, but I don't see anything in this PR that adjusts that logic. The resulting cardinality should be no larger than that of the left side of the join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, OptimizeOutputDuplicatesInsensitiveJoinRule
is executed after ReorderJoins
since ReorderJoins
creates new JoinNodes
without canSkipOutputDuplicates
property.
I think, we could improve ReorderJoins
to understand that entire join subtree matches canSkipOutputDuplicates
, but this is not part of this PR.
@@ -98,6 +100,7 @@ public JoinNode( | |||
this.criteria = ImmutableList.copyOf(criteria); | |||
this.leftOutputSymbols = ImmutableList.copyOf(leftOutputSymbols); | |||
this.rightOutputSymbols = ImmutableList.copyOf(rightOutputSymbols); | |||
this.canSkipOutputDuplicates = canSkipOutputDuplicates; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
canSkipOutputDuplicates = true
only makes sense if the only columns being output come from the left side of the join (or are equi-join keys in the case of an inner join). Also, this doesn't work for RIGHT or FULL join, so we should validate we're not creating an inconsistent node.
Also, given the semantics of this property, I'm not sure canSkipOutputDuplicates
is the best name. It'd be more accurate to call it "outputsFirstMatchOnly" or something similar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this doesn't work for RIGHT or FULL join, so we should validate we're not creating an inconsistent node.
@martint Initially I thought about making JoinNode#canSkipOutputDuplicates
implementation specific (to model how LookupJoinOperator
and cross join work). However, this complicates reasoning and is more prone to rune ordering. For example, with current approach following scenario would work:
OptimizeOutputDuplicatesInsensitiveJoinRule
could markRIGHT
join ascanSkipOutputDuplicates
- later some other rule could flip join sides, but
canSkipOutputDuplicates
property is preserved.
Similarly, other rules like PPD, unalias or pruning can fire but canSkipOutputDuplicates
property is preserved. If I connected canSkipOutputDuplicates
with some concrete join implementation, that would require additional checks if canSkipOutputDuplicates
can be preserved. In worst case, canSkipOutputDuplicates
property is lost during planning.
By making canSkipOutputDuplicates
property of the join algebra itself (and not join implementation) planning logic get simpler and is more stable.
It seems that eventually we should have different concrete plan nodes for different join implementations (e.g LookupJoin
, SortJoin
, CrossJoin
). That would have additional benefit of being able to compare plans with different join implementations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like I suggested in #6005, we need to sort it out before we start optimizing other plans based on canSkipOutputDuplicates
property.
In the case captured by the OptimizeOutputDuplicatesInsensitiveJoinRule
, the aggregation remains in the plan, so the result is the same, no matter if the join is or is not eventually optimized. However, before we add other transformations depending on join dropping duplicates, we need to find a way to ensure that the execution will follow - that is, that the join will not flip.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, before we add other transformations depending on join dropping duplicates, we need to find a way to ensure that the execution will follow - that is, that the join will not flip.
This looks like committing to some concrete implementation of join that has extra properties (e.g cannot be flipped). IIRC some planners actually plan first with abstract node types and only then have rules that choose concrete implementation.
On one hand committing to some operator implementation can open new optimization possibilities, but on the other hand it can close others. Memo based optimizer would be helpful in such case.
With Potentially we could even insert (final) aggregations upstream, but that should be CBO based since such aggregations might not always be beneficial. |
Can you explain the precise semantics of |
It means that duplicated rows produced by join are irrelevant (e.g because of aggregation) and can be skipped (not produced by join) |
Ok, I see. And I also see that whether the physical join implementation can take advantage of this attribute depends on the type of join and which columns are being output. It's a weird concept, since it's some kind of "preference" that's being recorded in the node itself. I'm not sure we have anything like that today, so I need to think about it a bit more. |
For join node we have |
Benchmarks comparison-optimize-joins.pdf Here are benchmark results. |
I wouldn't call it preference, but rather trait of output data. Potentially, we could keep such output data traits along with output symbols. This particular output trait is not specific to join only (it's also valid for other node types below aggregation). However, only join can take advantage of it with this PR. |
A trait is a characteristic of the operation. The reason it is a preference is that it's a signal of what the downstream operation is ok with or desires (i.e., doesn't care about duplicates). If it were a trait, it would be a description what the operation actually does, which would make this not a regular join, but a different kind of physical join operation and "can skip duplicate" would not be the proper name. |
Indeed, downstream operator can have preferences and upstream operator can produce data with traits. In this case preference is turned (via rule) into property of a join operation. @martint Have you give a though about this approach? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rename the attribute to "maySkipDuplicates". That conveys the meaning of "is allowed to" better, since the other form can also be interpreted as "is capable of".
// Must run before AddExchanges | ||
new PushDeleteIntoConnector(metadata), | ||
// Must run before AddExchanges and after join reordering | ||
new OptimizeOutputDuplicatesInsensitiveJoinRule(metadata)))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this agnostic about where it runs relative to join reordering? What kind of issue does it cause for join reordering that forces this to run after?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added comment here:
// Must run after join reordering because join reordering creates
// new join nodes without JoinNode.maySkipOutputDuplicates flag set
...ava/io/prestosql/sql/planner/iterative/rule/OptimizeOutputDuplicatesInsensitiveJoinRule.java
Outdated
Show resolved
Hide resolved
@Override | ||
public Result apply(AggregationNode aggregation, Captures captures, Context context) | ||
{ | ||
return aggregation.getSource().accept(new Rewriter(metadata, context.getLookup()), null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should avoid visitor-based rewrites as much as possible. We're trying to move away from them, as they make it hard to have a fully exploratory optimizer that considers multiple plans simultaneously.
What we need to do this generically is a description of the properties (traits) of the data coming out of the source of the aggregation, possibly, involving functional dependencies to be able to infer whether certain columns are guaranteed to be unique based on whether they are derived from other unique columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. In this case, there should be a way to tell optimizer that I want an input plan node which is producing given trait. Optimizer should try to satisfy such requirement (if possible).
We had some ideas around how this can be done in: https://docs.google.com/presentation/d/1rbtJLG89GxEYLZYA09TH5cD4S6UvYXtq0OmdNqQnmws/edit#slide=id.g1fce08ab98_0_4
3bbbf33
to
9a4b113
Compare
fbc677f
to
09e4edc
Compare
core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/LookupJoinOperator.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/LookupJoinOperator.java
Outdated
Show resolved
Hide resolved
...rc/main/java/io/trino/sql/planner/iterative/rule/OptimizeJoinsBelowEmptyAggregationRule.java
Outdated
Show resolved
Hide resolved
...rc/main/java/io/trino/sql/planner/iterative/rule/OptimizeJoinsBelowEmptyAggregationRule.java
Outdated
Show resolved
Hide resolved
...rc/main/java/io/trino/sql/planner/iterative/rule/OptimizeJoinsBelowEmptyAggregationRule.java
Outdated
Show resolved
Hide resolved
@@ -844,6 +844,28 @@ MatchResult detailMatches(PlanNode node, StatsProvider stats, Session session, M | |||
return match(newAliases.build()); | |||
} | |||
|
|||
public <T extends PlanNode> PlanMatchPattern with(Class<T> clazz, Predicate<T> predicate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe call this matching
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's simplification of with
method below that accepts full Matcher
. Also withXXX
seems to be convention in this class.
09e4edc
to
4fbf5ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ac @martint
...rc/main/java/io/trino/sql/planner/iterative/rule/OptimizeJoinsBelowEmptyAggregationRule.java
Outdated
Show resolved
Hide resolved
...rc/main/java/io/trino/sql/planner/iterative/rule/OptimizeJoinsBelowEmptyAggregationRule.java
Outdated
Show resolved
Hide resolved
@@ -844,6 +844,28 @@ MatchResult detailMatches(PlanNode node, StatsProvider stats, Session session, M | |||
return match(newAliases.build()); | |||
} | |||
|
|||
public <T extends PlanNode> PlanMatchPattern with(Class<T> clazz, Predicate<T> predicate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's simplification of with
method below that accepts full Matcher
. Also withXXX
seems to be convention in this class.
For empty aggregations duplicate input rows can be skipped. Upstream joins can take advantage of this fact and skip producing of duplicate output rows.
4fbf5ef
to
da74bbe
Compare
No description provided.