Fix multi-join dynamic filtering #2659

Merged: 1 commit, merged on Jan 31, 2020

rzeyde-varada (Contributor) commented on Jan 28, 2020

It seems that dynamic filters may get "stuck" in a join node's filter (when they can't be pushed down to scan nodes); see, for example, the "lower" join node in the following plan:

> EXPLAIN SELECT k1 FROM t0, t1, t2 WHERE (k0 = k1) AND (k0 = k2) AND (v0 + v1 = v2)

Output[k1]
│   Layout: [k0:integer]
│   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│   k1 := k0
└─ RemoteExchange[GATHER]
   │   Layout: [k0:integer]
   │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
   └─ InnerJoin[("k0" = "k2") AND ("expr_3" = "v2")][$hashvalue_7, $hashvalue_8]
      │   Layout: [k0:integer]
      │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
      │   Distribution: REPLICATED
      │   dynamicFilterAssignments = {k2 -> 421}
      ├─ Project[]
      │  │   Layout: [expr_3:real, k0:integer, $hashvalue_7:bigint]
      │  │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
      │  │   $hashvalue_7 := "combine_hash"("combine_hash"(bigint '0', COALESCE("$operator$hash_code"("k0"), 0)), COALESCE("$operator$hash_code"("expr_3"), 0))
      │  └─ Project[]
      │     │   Layout: [expr_3:real, k0:integer]
      │     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
      │     │   expr_3 := ("v0" + "v1")
      │     └─ InnerJoin[("k0" = "k1") AND (("$internal$dynamic_filter_function"(("v0" + "v1"), 'EQUAL', '123') AND "$internal$dynamic_filter_function"(("v0" + "v1"), 'EQUAL', '422')) AND "$internal$dynamic_filter_function"(("v0" + "v1"), 'EQUAL', '254'))][$hashvalue, $hashvalue_4]
      │        │   Layout: [k0:integer, v0:real, v1:real]
      │        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
      │        │   Distribution: REPLICATED
      │        │   dynamicFilterAssignments = {k1 -> 424}
      │        ├─ ScanFilterProject[table = memory:0, filterPredicate = ("@$internal$dynamic_filter_function|scalar|boolean|integer|varchar|varchar@$internal$dynamic_filter_function<t>(t,varchar,varchar):boolean"("k0", 'EQUAL', '421') AND "@$internal$dynamic_filter_function|scalar|boolean|integer|varchar|varchar@$internal$dynamic_filter_function<t>(t,varchar,varchar):boolean"("k0", 'EQUAL', '424'))]
      │        │      Layout: [k0:integer, v0:real, $hashvalue:bigint]
      │        │      Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
      │        │      $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("k0"), 0))
      │        │      k0 := 0
      │        │      v0 := 1
      │        └─ LocalExchange[HASH][$hashvalue_4] ("k1")
      │           │   Layout: [k1:integer, v1:real, $hashvalue_4:bigint]
      │           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
      │           └─ RemoteExchange[REPLICATE]
      │              │   Layout: [k1:integer, v1:real, $hashvalue_5:bigint]
      │              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
      │              └─ ScanFilterProject[table = memory:1, filterPredicate = "@$internal$dynamic_filter_function|scalar|boolean|integer|varchar|varchar@$internal$dynamic_filter_function<t>(t,varchar,varchar):boolean"("k1", 'EQUAL', '421')]
      │                     Layout: [k1:integer, v1:real, $hashvalue_6:bigint]
      │                     Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
      │                     $hashvalue_6 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("k1"), 0))
      │                     k1 := 0
      │                     v1 := 1
      └─ LocalExchange[HASH][$hashvalue_8] ("k2", "v2")
         │   Layout: [k2:integer, v2:real, $hashvalue_8:bigint]
         │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
         └─ RemoteExchange[REPLICATE]
            │   Layout: [k2:integer, v2:real, $hashvalue_9:bigint]
            │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
            └─ ScanProject[table = memory:2]
                   Layout: [k2:integer, v2:real, $hashvalue_10:bigint]
                   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                   $hashvalue_10 := "combine_hash"("combine_hash"(bigint '0', COALESCE("$operator$hash_code"("k2"), 0)), COALESCE("$operator$hash_code"("v2"), 0))
                   k2 := 0
                   v2 := 1
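
Why do the `v0 + v1` filters stay in the lower join? They appear to come from the upper join's `expr_3 = v2` clause, and the Project above the lower join defines `expr_3 := v0 + v1`. The sketch below assumes a simplified pushdown rule (a conjunct may move into a join source only if that source produces every symbol it references); `PushdownSketch`, `Placement`, and `place` are hypothetical names, not Presto planner code.

```java
import java.util.Set;

// Hypothetical, simplified pushdown rule for one join node (not Presto's planner code).
public class PushdownSketch {

    // Where a conjunct ends up relative to the join.
    enum Placement { LEFT_SOURCE, RIGHT_SOURCE, JOIN_FILTER }

    // A conjunct moves into a source only if that source produces every symbol it references.
    static Placement place(Set<String> referenced, Set<String> leftOutputs, Set<String> rightOutputs) {
        if (leftOutputs.containsAll(referenced)) {
            return Placement.LEFT_SOURCE;
        }
        if (rightOutputs.containsAll(referenced)) {
            return Placement.RIGHT_SOURCE;
        }
        return Placement.JOIN_FILTER;
    }

    public static void main(String[] args) {
        // Symbols produced by the lower join's sources in the plan above.
        Set<String> t0 = Set.of("k0", "v0");
        Set<String> t1 = Set.of("k1", "v1");

        // A filter on k0 (like 421 or 424) reaches the t0 scan.
        System.out.println(place(Set.of("k0"), t0, t1)); // LEFT_SOURCE
        // A filter on v0 + v1 needs both sources, so it stays in the join filter.
        System.out.println(place(Set.of("v0", "v1"), t0, t1)); // JOIN_FILTER
    }
}
```

Under this rule, the dynamic filters on `k0` and `k1` land in the `ScanFilterProject` nodes, while the ones on `v0 + v1` have nowhere to go but the join's own filter.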

In this case, Presto fails during execution, since applying dynamic filters is implemented only for ScanFilterAndProject operators (and not for LookupJoin operators):

[ERROR] Tests run: 17, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 47.153 s <<< FAILURE! - in io.prestosql.plugin.memory.TestMemorySmoke
[ERROR] testJoinDynamicFilteringMultiJoin(io.prestosql.plugin.memory.TestMemorySmoke)  Time elapsed: 1.321 s  <<< FAILURE!
java.lang.AssertionError: Execution of 'actual' query failed: SELECT k0, k1, k2 FROM t0, t1, t2 WHERE (k0 = k1) AND (k0 = k2) AND (v0 + v1 = v2)
	at org.testng.Assert.fail(Assert.java:83)
	at io.prestosql.testing.QueryAssertions.assertQuery(QueryAssertions.java:147)
	at io.prestosql.testing.QueryAssertions.assertQuery(QueryAssertions.java:103)
	at io.prestosql.testing.AbstractTestQueryFramework.assertQuery(AbstractTestQueryFramework.java:135)
	at io.prestosql.plugin.memory.TestMemorySmoke.testJoinDynamicFilteringMultiJoin(TestMemorySmoke.java:156)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: java.lang.UnsupportedOperationException
	at io.prestosql.testing.AbstractTestingPrestoClient.execute(AbstractTestingPrestoClient.java:114)
	at io.prestosql.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:400)
	at io.prestosql.testing.QueryAssertions.assertQuery(QueryAssertions.java:144)
	... 16 more
Caused by: java.lang.UnsupportedOperationException
	at io.prestosql.sql.DynamicFilters$Function.dynamicFilter(DynamicFilters.java:206)
	at io.prestosql.$gen.JoinFilterFunction_20200128_223248_88.filter(Unknown Source)
	at io.prestosql.operator.StandardJoinFilterFunction.filter(StandardJoinFilterFunction.java:49)
	at io.prestosql.operator.JoinHash.isJoinPositionEligible(JoinHash.java:118)
	at io.prestosql.operator.PartitionedLookupSource.isJoinPositionEligible(PartitionedLookupSource.java:169)
	at io.prestosql.operator.LookupJoinOperator$JoinProcessor.joinCurrentPosition(LookupJoinOperator.java:616)
	at io.prestosql.operator.LookupJoinOperator$JoinProcessor.processProbe(LookupJoinOperator.java:547)
	at io.prestosql.operator.LookupJoinOperator$JoinProcessor.lambda$processProbe$3(LookupJoinOperator.java:480)
	at io.prestosql.operator.PartitionedLookupSourceFactory$SpillAwareLookupSourceProvider.withLease(PartitionedLookupSourceFactory.java:437)
	at io.prestosql.operator.LookupJoinOperator$JoinProcessor.processProbe(LookupJoinOperator.java:477)
	at io.prestosql.operator.LookupJoinOperator$JoinProcessor.getOutput(LookupJoinOperator.java:392)
	at io.prestosql.operator.LookupJoinOperator$JoinProcessor.process(LookupJoinOperator.java:688)
	at io.prestosql.operator.LookupJoinOperator$JoinProcessor.process(LookupJoinOperator.java:139)
	at io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:319)
	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
	at io.prestosql.operator.WorkProcessorOperatorAdapter.getOutput(WorkProcessorOperatorAdapter.java:90)
	at io.prestosql.operator.Driver.processInternal(Driver.java:379)
	at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
	at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
	at io.prestosql.operator.Driver.processFor(Driver.java:276)
	at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.prestosql.$gen.Presto_null__testversion____20200128_223220_4.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
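
The innermost frames show where it breaks: the join filter is compiled into a generated `JoinFilterFunction` class, which calls `DynamicFilters$Function.dynamicFilter` directly, and that call throws `UnsupportedOperationException`. The sketch below models this, assuming the dynamic filter function is a placeholder that only the scan path recognizes and replaces with the collected build-side values; `PlaceholderSketch`, `DynamicFilterPlaceholder`, `scanFilter`, and `joinFilter` are hypothetical names.

```java
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical model of the failure: a placeholder predicate that only the scan path handles.
public class PlaceholderSketch {

    // Stand-in for a dynamic filter call; evaluating it directly is unsupported.
    static final class DynamicFilterPlaceholder implements IntPredicate {
        final String id;

        DynamicFilterPlaceholder(String id) {
            this.id = id;
        }

        @Override
        public boolean test(int value) {
            throw new UnsupportedOperationException("dynamic filter " + id + " evaluated directly");
        }
    }

    // Scan path: the placeholder is replaced by a membership test on collected build-side values.
    static IntPredicate scanFilter(IntPredicate filter, Set<Integer> collected) {
        if (filter instanceof DynamicFilterPlaceholder) {
            return value -> collected.contains(value);
        }
        return filter;
    }

    // Join path before the fix: the filter is evaluated as written.
    static IntPredicate joinFilter(IntPredicate filter) {
        return filter;
    }

    public static void main(String[] args) {
        IntPredicate dynamicFilter = new DynamicFilterPlaceholder("422");
        System.out.println(scanFilter(dynamicFilter, Set.of(1, 2)).test(2)); // true
        try {
            joinFilter(dynamicFilter).test(2);
        }
        catch (UnsupportedOperationException e) {
            System.out.println("join filter failed: " + e.getMessage());
        }
    }
}
```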

IIUC, the fix may be to ignore dynamic filters that are left in join nodes, since they are only an optimization and should not affect correctness.
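
A minimal sketch of the proposed fix, assuming the join filter is held as a list of conjuncts: dynamic-filter conjuncts are dropped before the filter is compiled, and if nothing remains the join needs no filter at all. `StripDynamicFilters`, `Conjunct`, and `removeDynamicFilters` are hypothetical, not Presto's expression classes, and the conjunct strings are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the proposed fix: strip dynamic-filter conjuncts from a join filter.
public class StripDynamicFilters {

    // One conjunct of a join filter; `dynamic` marks a planner-generated dynamic filter.
    static final class Conjunct {
        final String text;
        final boolean dynamic;

        Conjunct(String text, boolean dynamic) {
            this.text = text;
            this.dynamic = dynamic;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    // Keeps only regular conjuncts; an empty result means the join needs no filter.
    static Optional<List<Conjunct>> removeDynamicFilters(List<Conjunct> conjuncts) {
        List<Conjunct> kept = new ArrayList<>();
        for (Conjunct conjunct : conjuncts) {
            if (!conjunct.dynamic) {
                kept.add(conjunct);
            }
        }
        return kept.isEmpty() ? Optional.empty() : Optional.of(kept);
    }

    public static void main(String[] args) {
        // Shaped like the lower join's filter above: nothing but dynamic filters.
        List<Conjunct> lowerJoinFilter = List.of(
                new Conjunct("dynamic_filter(v0 + v1, '123')", true),
                new Conjunct("dynamic_filter(v0 + v1, '422')", true),
                new Conjunct("dynamic_filter(v0 + v1, '254')", true));
        System.out.println(removeDynamicFilters(lowerJoinFilter)); // Optional.empty

        // A regular conjunct survives; only the dynamic filter is dropped.
        List<Conjunct> mixed = List.of(
                new Conjunct("v0 < v1", false),
                new Conjunct("dynamic_filter(v0 + v1, '422')", true));
        System.out.println(removeDynamicFilters(mixed)); // Optional[[v0 < v1]]
    }
}
```

Dropping these conjuncts is safe because a dynamic filter only discards probe rows that the join which produced it would discard anyway; the cost is losing that early pruning at the lower join.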

cla-bot added the cla-signed label on Jan 28, 2020
raunaqmorarka added the bug (Something isn't working) label on Jan 29, 2020

sopel39 (Member) left a comment

lgtm % @raunaqmorarka comments

rzeyde-varada force-pushed the fix-multijoin-df branch 2 times, most recently from 066ea1f to 6d4a8f0 on January 29, 2020 22:09
rzeyde-varada force-pushed the fix-multijoin-df branch 2 times, most recently from 2d80c7b to d58e74a on January 30, 2020 20:19

rzeyde-varada (Contributor, Author) commented:

Rebased and force-pushed (to re-run the checks, which had failed on a seemingly unrelated test).

sopel39 merged commit f1113ef into trinodb:master on Jan 31, 2020
sopel39 mentioned this pull request on Jan 31, 2020

sopel39 (Member) commented on Jan 31, 2020

merged, thanks!

rzeyde-varada deleted the fix-multijoin-df branch on February 2, 2020 07:28
findepi added this to the 330 milestone on May 6, 2020
kewang1024 added a commit to kewang1024/presto that referenced this pull request Aug 21, 2020
cherry pick from trinodb/trino#2659

Co-Authored-By: Roman Zeyde <zeyde@varada.io>
highker pushed a commit to prestodb/presto that referenced this pull request Aug 21, 2020
cherry pick from trinodb/trino#2659

Co-Authored-By: Roman Zeyde <zeyde@varada.io>
shangxinli pushed a commit to shangxinli/presto that referenced this pull request Nov 18, 2020
cherry pick from trinodb/trino#2659

Co-Authored-By: Roman Zeyde <zeyde@varada.io>

Labels: bug (Something isn't working), cla-signed
Participants: 4