DatasourceScanExec uses runtime sparksession #93
Conversation
@robert3005 can you explain why DataSources should use the SparkSession from the time of execution instead of the time of creation? Independent of any changes we make internally, I want to make sure this is the right design for Spark directly.
This lets you change settings at runtime instead of needing to create a new dataset. Looking around the code, most physical plans use the active session at the time of execution rather than at creation. I think this makes sense for IO-related work, since it interacts with Hadoop and lets you configure Hadoop settings at runtime instead of at creation/first-execution time. There's definitely a bug here where HadoopFsRelation uses the SparkSession from the logical plan, which I haven't seen anywhere else and seems wrong.
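The creation-time vs execution-time distinction can be sketched outside Spark. Below is a toy plain-Scala stand-in (an analogy, not Spark code) for `SparkSession.setActiveSession`/`getActiveSession`, using `scala.util.DynamicVariable`: a node that snapshots the "session" at construction misses later config changes, while one that resolves it per call sees them.

```scala
import scala.util.DynamicVariable

// Toy stand-in for SparkSession.setActiveSession / getActiveSession.
// The "session" here is just a config map; names are hypothetical.
object ActiveSession {
  private val current = new DynamicVariable[Map[String, String]](Map.empty)
  def withSession[A](conf: Map[String, String])(body: => A): A =
    current.withValue(conf)(body)
  def get: Map[String, String] = current.value
}

// Captures the active "session" once, when the node is constructed
// (the behavior this PR moves away from).
class CapturedAtCreation {
  private val conf = ActiveSession.get // snapshot taken here
  def vectorized: Boolean =
    conf.getOrElse("vectorizedReader", "true").toBoolean
}

// Resolves the active "session" on every call, i.e. at execution time
// (the behavior this PR moves toward).
class ResolvedAtExecution {
  def vectorized: Boolean =
    ActiveSession.get.getOrElse("vectorizedReader", "true").toBoolean
}
```

With this model, changing the active "session" between creation and execution only affects the node that resolves the session lazily.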
```diff
-  val supportsBatch = relation.fileFormat.supportBatch(
-    relation.sparkSession, StructType.fromAttributes(output))
+  def supportsBatch: Boolean = relation.fileFormat.supportBatch(
+    SparkSession.getActiveSession.get, StructType.fromAttributes(output))
```
`SparkSession.getActiveSession.get` is used often enough -- maybe make it a private def on this class called `activeSparkSession`?
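A plain-Scala sketch of that suggestion (no Spark types; `SessionRegistry` and `ScanExec` are hypothetical names standing in for the real registry and plan node): fold the repeated `Option#get` into one private def so the lookup and its failure mode live in a single place.

```scala
// Hypothetical stand-in for the registry behind SparkSession.getActiveSession.
object SessionRegistry {
  private var active: Option[String] = None
  def setActive(name: String): Unit = active = Some(name)
  def getActive: Option[String] = active
}

class ScanExec {
  // One private helper instead of repeating `getActive.get` at each call
  // site; getOrElse replaces a bare NoSuchElementException with a message.
  private def activeSession: String =
    SessionRegistry.getActive.getOrElse(
      sys.error("No active session set on this thread"))

  def describe: String = s"scan using session '$activeSession'"
}
```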
```scala
val supportsBatchInitially = scan1.supportsBatch

val newSession = spark.newSession()
newSession.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "false")
```
Are you relying on this being true beforehand? Explicitly set that key on the old SparkSession so this test keeps working even if the default changes.
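The hazard can be shown with a plain-Scala stand-in for a layered conf (built-in defaults plus per-session overrides; all names here are illustrative): an assertion that only exercises the default breaks silently if the default ever changes, so the test should pin the key first.

```scala
import scala.collection.mutable

// Built-in defaults; these can change between releases.
val defaults = Map("vectorizedReader" -> "true")
// Per-session overrides, like conf.set on a single session.
val sessionConf = mutable.Map.empty[String, String]

def effective(key: String): String =
  sessionConf.getOrElse(key, defaults(key))

// Fragile: asserting effective("vectorizedReader").toBoolean directly
// assumes the default is "true". Robust: pin the value first.
sessionConf("vectorizedReader") = "true"
assert(effective("vectorizedReader").toBoolean)
```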
```scala
}

assert(scan1 == scan2)
assert(fileScanRDD1 != fileScanRDD2)
```
Can we test more explicitly that the active session is different by comparing the sessions directly, rather than relying on a side effect of the session being different?
It's pretty difficult in this place. This test is generic over all datasources, so you can't access source-specific properties; the SparkSession used is pretty much an implementation detail. Maybe we can delete this and keep one in ParquetQuerySuite?
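For what the direct assertion would look like, here is a plain-Scala sketch (class name hypothetical): compare the sessions themselves with reference equality instead of inferring the difference from objects derived from them.

```scala
// Sessions deliberately have no custom equals, so `eq`/`ne` (reference
// equality) is the direct way to assert two are distinct instances.
final class Session(val name: String)

val original   = new Session("original")
val newSession = new Session("newSession")

// Indirect: two derived values differ *because* the sessions differ.
// Direct: assert on the sessions themselves.
assert(original ne newSession)
```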
Do we also need to modify the See
Seems reasonable, though I really don't know what's going on here.
What do you think of filing a SPARK ticket and sending this PR upstream for review? It's a long shot, but we could get a response in the next 24 hours.
Yeah, will do.
Force-pushed from 6746d9c to 2f273cc
Will close in favour of upstream
* Trigger scalatest plugin in the integration-test phase
* Clean up unnecessary config section
…tions

### What changes were proposed in this pull request?

In order to avoid frequently changing the value of `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions`, we usually set it much larger than `spark.sql.shuffle.partitions` after enabling adaptive execution, which causes some bucket map joins to lose efficacy and adds more `ShuffleExchange`s.

How to reproduce:

```scala
val bucketedTableName = "bucketed_table"
spark.range(10000).write.bucketBy(500, "id").sortBy("id").mode(org.apache.spark.sql.SaveMode.Overwrite).saveAsTable(bucketedTableName)
val bucketedTable = spark.table(bucketedTableName)
val df = spark.range(8)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Spark 2.4. spark.sql.adaptive.enabled=false
// We set spark.sql.shuffle.partitions <= 500 every time based on our data in this case.
spark.conf.set("spark.sql.shuffle.partitions", 500)
bucketedTable.join(df, "id").explain()

// Since 3.0. We enabled adaptive execution and set spark.sql.adaptive.shuffle.maxNumPostShufflePartitions to a larger value to fit more cases.
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 1000)
bucketedTable.join(df, "id").explain()
```

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
*(4) Project [id#5L]
+- *(4) SortMergeJoin [id#5L], [id#7L], Inner
   :- *(1) Sort [id#5L ASC NULLS FIRST], false, 0
   :  +- *(1) Project [id#5L]
   :     +- *(1) Filter isnotnull(id#5L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
   +- *(3) Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 500), true, [id=palantir#49]
         +- *(2) Range (0, 8, step=1, splits=16)
```

vs

```
scala> bucketedTable.join(df, "id").explain()
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Project [id#5L]
   +- SortMergeJoin [id#5L], [id#7L], Inner
      :- Sort [id#5L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#5L, 1000), true, [id=palantir#93]
      :     +- Project [id#5L]
      :        +- Filter isnotnull(id#5L)
      :           +- FileScan parquet default.bucketed_table[id#5L] Batched: true, DataFilters: [isnotnull(id#5L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/opensource/apache-spark/spark-3.0.0-SNAPSHOT-bin-3.2.0/spark-warehou..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 500 out of 500
      +- Sort [id#7L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#7L, 1000), true, [id=palantir#92]
            +- Range (0, 8, step=1, splits=16)
```

This PR makes reading bucketed tables always obey `spark.sql.shuffle.partitions`, even when adaptive execution is enabled and `spark.sql.adaptive.shuffle.maxNumPostShufflePartitions` is set, to avoid adding more `ShuffleExchange`s.

### Why are the changes needed?

Do not degrade performance after enabling adaptive execution.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test.

Closes apache#26409 from wangyum/SPARK-29655.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…f-join (#93)

* [SPARK-34354][SQL] Fix failure when apply CostBasedJoinReorder on self-join

This PR introduces a new analysis rule, `DeduplicateRelations`, which first deduplicates any duplicate relations in a plan and then deduplicates conflicting attributes (reusing the `dedupRight` of `ResolveReferences`). `CostBasedJoinReorder` could fail when applied to a self-join, e.g.,

```scala
// test in JoinReorderSuite
test("join reorder with self-join") {
  val plan = t2.join(t1, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
    .select(nameToAttr("t1.v-1-10"))
    .join(t2, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t2.k-1-5")))

  // this can fail
  Optimize.execute(plan.analyze)
}
```

Besides, with the new rule `DeduplicateRelations`, we're able to enable some optimizations, e.g., LeftSemiAnti pushdown and redundant project removal, as reflected in the updated unit tests.

Added and updated unit tests.

Closes apache#32027 from Ngone51/join-reorder-3.

Lead-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* Undo tpcds-plan-stability changes
* Resolve conflicts in list of rules in 'Resolution' batch
* Resolve conflicts in #collectConflictPlans
* Resolve conflicts in #dedupRight
* Resolve conflicts in ResolveReferences#apply
* Resolve conflicts around #newAliases and #findAliases
* Resolve conflicts in AnalysisSuite

* [SPARK-34178][SQL] Copy tags for the new node created by MultiInstanceRelation.newInstance

Call `copyTagsFrom` for the new node created by `MultiInstanceRelation.newInstance()`.

```scala
val df = spark.range(2)
df.join(df, df("id") <=> df("id")).show()
```

For this query, it's supposed to be a non-ambiguous join by the rule `DetectAmbiguousSelfJoin` because of the same attribute reference in the condition:

https://github.com/apache/spark/blob/537a49fc0966b0b289b67ac9c6ea20093165b0da/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala#L125

However, `DetectAmbiguousSelfJoin` cannot apply this prediction because the right-side plan doesn't contain the dataset_id TreeNodeTag, which is missing after `MultiInstanceRelation.newInstance`. That's why we should preserve the tag info for the copied node.

Fortunately, the query is still considered a non-ambiguous join because `DetectAmbiguousSelfJoin` only checks the left-side plan and the reference is the same as the left-side plan. However, this is not the expected behavior, only a coincidence.

No.

Updated a unit test.

Closes apache#31260 from Ngone51/fix-missing-tags.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: yi.wu <yi.wu@databricks.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Very annoying behavior of datasources: they use the SparkSession from the time of creation. This changes them to use the one that's active when executing. @pwoody @ash211 @mccheah
Will send upstream once we iron out the tests