Skip to content

Commit

Permalink
[SPARK-38013][SQL][TEST] AQE can change bhj to smj if no extra shuffl…
Browse files Browse the repository at this point in the history
…e introduce

### What changes were proposed in this pull request?

Add a test case in `AdaptiveQueryExecSuite`.

### Why are the changes needed?

AQE can change bhj to smj, and it requires two conditions:
- no extra shuffle introduce, otherwise the built-in cost evaluator will ban it
- AQE does not think the join can be planned as broadcast join. That says the cost statistics in normal planner is not accurate.

It's counterintuitive, but it's an expected behavior as AQE designed.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Pass CI

Closes apache#35353 from ulysses-you/bhj-smj.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit dc2fd57)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 6fc9793)
  • Loading branch information
ulysses-you authored and huaxingao committed Feb 14, 2022
1 parent 47f6e4b commit 3f2a77d
Showing 1 changed file with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,29 @@ class AdaptiveQueryExecSuite
}
}

test("Change broadcast join to merge join") {
withTable("t1", "t2") {
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
sql("CREATE TABLE t1 USING PARQUET AS SELECT 1 c1")
sql("CREATE TABLE t2 USING PARQUET AS SELECT 1 c1")
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
"""
|SELECT * FROM (
| SELECT distinct c1 from t1
| ) tmp1 JOIN (
| SELECT distinct c1 from t2
| ) tmp2 ON tmp1.c1 = tmp2.c1
|""".stripMargin)
assert(findTopLevelBroadcastHashJoin(plan).size == 1)
assert(findTopLevelBroadcastHashJoin(adaptivePlan).isEmpty)
assert(findTopLevelSortMergeJoin(adaptivePlan).size == 1)
}
}
}

test("Reuse the parallelism of coalesced shuffle in local shuffle read") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
Expand Down

0 comments on commit 3f2a77d

Please sign in to comment.