forked from apache/spark
[SPARK-47070] Fix invalid aggregation after subquery rewrite
## What changes were proposed in this pull request?

**tl;dr**

This PR fixes a bug in which an `exists` variable is lost after an incorrect subquery rewrite, when `exists` is used neither in grouping expressions nor in aggregate functions. We wrap such a variable in the `first()` aggregate function so that the reference to it is not lost.

**Motivation**

Imagine we had a plan with a subquery:

```
Aggregate [a1#0] [CASE WHEN a1#0 IN (list#3999 []) THEN Hello ELSE Hi END AS strCol#13]
:  +- LocalRelation <empty>, [b1#3, b2#4, b3#5]
+- LocalRelation <empty>, [a1#0, a2#1, a3#2]
```

During the correlated subquery rewrite, the rule `RewritePredicateSubquery` rewrites the expression `a1#0 IN (list#3999 [])` into `exists#12` and replaces the subquery with an `ExistenceJoin`, like so:

```
Aggregate [a1#0] [CASE WHEN exists#12 THEN Hello ELSE Hi END AS strCol#13]
+- Join ExistenceJoin(exists#12), (a1#0 = b1#3)
   +- LocalRelation <empty>, [a1#0, a2#1, a3#2]
   +- LocalRelation <empty>, [b1#3, b2#4, b3#5]
```

Note that `exists#12` neither appears in the grouping expressions nor is part of any aggregate function. This is an invalid aggregation. In particular, the aggregate pushdown rule rewrites this plan into:

```
Project [CASE WHEN exists#12 THEN Hello WHEN true THEN Hi END AS strCol#13]
+- AggregatePart [a1#0], true
   +- AggregatePart [a1#0], false
      +- Join ExistenceJoin(exists#12), (a1#0 = b1#3)
         :- AggregatePart [a1#0], false
         :  +- LocalRelation <empty>, [a1#0, a2#1, a3#2]
         +- AggregatePart [b1#3], false
            +- LocalRelation <empty>, [b1#3, b2#4, b3#5]
```

**Solution**

We fix the problem by wrapping such `exists` attributes in the `first()` function, which is Spark's executable implementation of `any_value()`. Such an `exists` attribute is always functionally determined by the grouping keys, so wrapping it in any aggregate function that preserves its unique value is safe. Specifically, we only wrap `exists` attributes that are referenced among the aggregate expressions but NOT within an aggregate function or its filter. Note that a new `exists` attribute cannot appear in `groupingExpressions`.

**Original proposal (NOT used)**

The original idea was to fix the bug in `RewritePredicateSubquery` by enforcing the condition that newly introduced variables, if referenced among aggregate expressions, must either participate in aggregate functions or appear in the grouping keys. With that fix, the plan after `RewritePredicateSubquery` would look like:

```
Aggregate [a1#0, exists#12] [CASE WHEN exists#12 THEN Hello ELSE Hi END AS strCol#13]
+- Join ExistenceJoin(exists#12), (a1#0 = b1#3)
   +- LocalRelation <empty>, [a1#0, a2#1, a3#2]
   +- LocalRelation <empty>, [b1#3, b2#4, b3#5]
```

**NOTE:** It is still possible to manually construct an `ExistenceJoin` (e.g. via the DSL) and an `Aggregate` on top of it that violate the condition.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Query tests

Closes apache#45133 from anton5798/subquery-exists-agg.

Authored-by: Anton Lykov <anton.lykov@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
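
For readers who want to see the wrapping rule from the **Solution** section in miniature, here is a rough, self-contained sketch in Python. It is not Spark's Catalyst code: the classes `Attr`, `Lit`, `CaseWhen`, and `AggCall`, and the helper `wrap_exists`, are hypothetical names invented for this illustration. The sketch only models the stated condition, namely that a newly introduced `exists` attribute is wrapped in `first()` when it is referenced in an aggregate expression outside of any aggregate function or its filter.

```python
from dataclasses import dataclass
from typing import Optional


# Toy expression nodes (hypothetical, NOT Spark's Catalyst classes).
@dataclass(frozen=True)
class Attr:
    name: str


@dataclass(frozen=True)
class Lit:
    value: str


@dataclass(frozen=True)
class CaseWhen:
    cond: object
    then: object
    otherwise: object


@dataclass(frozen=True)
class AggCall:
    func: str
    arg: object
    filter: Optional[object] = None


def wrap_exists(expr, new_exists):
    """Wrap attributes from `new_exists` in first(), unless they already
    sit inside an aggregate function call or its filter."""
    if isinstance(expr, AggCall):
        # Anything inside an aggregate function (or its filter) is left as-is.
        return expr
    if isinstance(expr, Attr):
        # A bare reference to a new exists attribute gets wrapped.
        return AggCall("first", expr) if expr in new_exists else expr
    if isinstance(expr, CaseWhen):
        # Recurse into the children of non-aggregate expressions.
        return CaseWhen(
            wrap_exists(expr.cond, new_exists),
            wrap_exists(expr.then, new_exists),
            wrap_exists(expr.otherwise, new_exists),
        )
    # Literals and anything else are returned unchanged.
    return expr


# Mirrors: CASE WHEN exists#12 THEN Hello ELSE Hi END
exists = Attr("exists#12")
agg_expr = CaseWhen(exists, Lit("Hello"), Lit("Hi"))
rewritten = wrap_exists(agg_expr, {exists})

# An exists reference that is already inside an aggregate function.
already_aggregated = AggCall("count", exists)
untouched = wrap_exists(already_aggregated, {exists})

# A grouping key such as a1#0 is not a new exists attribute.
grouping_key = wrap_exists(Attr("a1#0"), {exists})
```

In this toy model, `rewritten` carries `first(exists#12)` in the `CASE WHEN` condition, while `untouched` and `grouping_key` come back unchanged. The choice of `first()` follows the commit message's argument that `exists` is functionally determined by the grouping keys, so any aggregate that preserves its single value would do.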
Showing 7 changed files with 736 additions and 7 deletions.