This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Add Alias column support in join condition for JoinIndex rule #25

Closed
apoorvedave1 opened this issue Jun 23, 2020 · 6 comments
Labels: enhancement (New feature or request), good first issue (Good for newcomers)
Milestone: 0.3.0

Comments

@apoorvedave1
Contributor

apoorvedave1 commented Jun 23, 2020

Describe the issue
Add support for org.apache.spark.sql.catalyst.expressions.Alias as column type in JoinIndexRule.scala. Currently, this rule only supports Join Conditions on org.apache.spark.sql.catalyst.expressions.Attributes . Consider a case where a column c from the base table has an alias aliasC. A Join query with c will be able to utilize indexes but a query with aliasC won't be.

To Reproduce
Create a query plan of the form

SELECT T1.A, T2Temp.aliasC
FROM T1
INNER JOIN (
   SELECT c AS aliasC
   FROM T2
) AS T2Temp
WHERE T1.C = T2Temp.aliasC

Expected behavior
Query utilizes index


@apoorvedave1 apoorvedave1 added the enhancement (New feature or request) label and removed the untriaged (default tag for a newly created issue) label on Jun 23, 2020
@rapoth rapoth added this to the 0.3.0 milestone Jun 26, 2020
@rapoth rapoth added the good first issue Good for newcomers label Jul 8, 2020
@rapoth
Contributor

rapoth commented Jul 9, 2020

CC: @sezruby

@rapoth
Contributor

rapoth commented Jul 16, 2020

CC: @imback82

@imback82
Contributor

imback82 commented Jul 17, 2020

I think this should have been fixed by apache/spark#26943 in Spark 3.0, and I don't think it's worthwhile to update the rule to support this scenario for Spark 2.4.

For the following:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
val df1 = (0 until 100).map(i => (i % 5, i % 11)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(8, "i1").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "i2").saveAsTable("t2")
sql("SELECT t1.i1, t2Temp.aliasC FROM t1 INNER JOIN (SELECT i2 as aliasC from t2) as t2Temp WHERE t1.i1 = t2Temp.aliasC").explain

Spark 2.4.5 outputs:

== Physical Plan ==
*(5) SortMergeJoin [i1#31], [aliasC#37], Inner
:- *(2) Sort [i1#31 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#31, 200)
:     +- *(1) Project [i1#31]
:        +- *(1) Filter isnotnull(i1#31)
:           +- *(1) FileScan parquet default.t1[i1#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [aliasC#37 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(aliasC#37, 200)
      +- *(3) Project [i2#33 AS aliasC#37]
         +- *(3) Filter isnotnull(i2#33)
            +- *(3) FileScan parquet default.t2[i2#33] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(i2)], ReadSchema: struct<i2:int>, SelectedBucketsCount: 8 out of 8

whereas Spark 3.0 prints:

== Physical Plan ==
*(3) SortMergeJoin [i1#27], [aliasC#33], Inner
:- *(1) Sort [i1#27 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i1#27]
:     +- *(1) Filter isnotnull(i1#27)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[i1#27] Batched: true, DataFilters: [isnotnull(i1#27)], Format: Parquet, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [aliasC#33 ASC NULLS FIRST], false, 0
   +- *(2) Project [i2#29 AS aliasC#33]
      +- *(2) Filter isnotnull(i2#29)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t2[i2#29] Batched: true, DataFilters: [isnotnull(i2#29)], Format: Parquet, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(i2)], ReadSchema: struct<i2:int>, SelectedBucketsCount: 8 out of 8

@rapoth
Contributor

rapoth commented Jul 17, 2020

Great, thanks! I'm closing this issue.

@rapoth rapoth closed this as completed Jul 17, 2020
@imback82
Contributor

@apoorvedave1 Can you confirm that this is no longer needed in the join rule? I see that we clean up the aliases, so matching should be OK, right?

@apoorvedave1
Contributor Author

Yes, this looks like a solved issue now. Thanks for checking, @imback82!
