From 476010aedd101e1a807c202d71328415109660ae Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Sat, 6 Jun 2020 16:50:40 +0900
Subject: [PATCH] [SPARK-31903][SQL][PYSPARK][2.4] Fix toPandas with Arrow enabled to show metrics in Query UI

### What changes were proposed in this pull request?

This is a backport of #28730.

In `Dataset.collectAsArrowToPython`, the code block passed to `serveToStream` runs in a separate thread, so `withAction` finishes as soon as it starts that thread. As a result, it doesn't collect the metrics of the actual action, and the Query UI shows the plan graph without metrics.

We should call `serveToStream` first, then `withAction` inside it.

### Why are the changes needed?

When calling toPandas, the Query UI usually shows each plan node's metrics:

```py
>>> df = spark.createDataFrame([(1, 10, 'abc'), (2, 20, 'def')], schema=['x', 'y', 'z'])
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

![Screen Shot 2020-06-05 at 10 58 30 AM](https://user-images.githubusercontent.com/506656/83914110-6f3b3080-a725-11ea-88c0-de83a833b05c.png)

but if Arrow execution is enabled, it shows only the plan nodes, and the duration is not correct:

```py
>>> spark.conf.set('spark.sql.execution.arrow.enabled', True)
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

![Screen Shot 2020-06-05 at 10 58 42 AM](https://user-images.githubusercontent.com/506656/83914127-782c0200-a725-11ea-84e4-74d861d5c20a.png)

### Does this PR introduce _any_ user-facing change?

Yes, the Query UI will show the plan with the correct metrics.

### How was this patch tested?

I checked it manually in my local environment.

![Screen Shot 2020-06-05 at 11 29 48 AM](https://user-images.githubusercontent.com/506656/83914142-7e21e300-a725-11ea-8925-edc22df16388.png)

Closes #28740 from ueshin/issues/SPARK-31903/2.4/to_pandas_with_arrow_query_ui.

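The ordering problem described above can be illustrated with a toy model in plain Python. This is only a sketch, not Spark's API: `with_action`, `serve_to_stream`, and `collect` are hypothetical stand-ins for `withAction`, `serveToStream`, and the Spark job, and the only "metric" recorded is wall-clock duration. It assumes, as the description says, that the serving block runs on a separate thread.

```python
import threading
import time


def with_action(name, metrics, body):
    """Hypothetical stand-in for withAction: records how long `body` ran."""
    start = time.perf_counter()
    try:
        return body()
    finally:
        metrics[name] = time.perf_counter() - start


def serve_to_stream(write):
    """Hypothetical stand-in for serveToStream: runs `write` on another thread."""
    thread = threading.Thread(target=write)
    thread.start()
    return thread


def collect(seconds=0.2):
    """Stands in for the actual job whose metrics we want to capture."""
    time.sleep(seconds)


# Old order: the measuring wrapper is outside, so it returns as soon as the
# serving thread has been started and misses the real work.
before = {}
with_action("before", before, lambda: serve_to_stream(collect)).join()

# New order: the wrapper runs inside the serving thread and spans the work.
after = {}
serve_to_stream(lambda: with_action("after", after, collect)).join()

print(f"before: {before['before']:.3f}s, after: {after['after']:.3f}s")
```

In this model the "before" duration covers only thread startup, while the "after" duration covers the simulated job, which mirrors the incorrect duration seen in the Query UI.
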
Authored-by: Takuya UESHIN
Signed-off-by: HyukjinKwon
---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6bc0d0bfaa3f4..a755a6f5e0371 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3283,8 +3283,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToPython(): Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
-    withAction("collectAsArrowToPython", queryExecution) { plan =>
-      PythonRDD.serveToStreamWithSync("serve-Arrow") { out =>
+    PythonRDD.serveToStreamWithSync("serve-Arrow") { out =>
+      withAction("collectAsArrowToPython", queryExecution) { plan =>
         val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
         val arrowBatchRdd = toArrowBatchRdd(plan)
         val numPartitions = arrowBatchRdd.partitions.length