[SPARK-31903][SQL][PYSPARK][2.4] Fix toPandas with Arrow enabled to show metrics in Query UI

### What changes were proposed in this pull request?

This is a backport of apache#28730.

In `Dataset.collectAsArrowToPython`, the code block passed to `serveToStream` runs in a separate thread, so `withAction` finishes as soon as it starts that thread. As a result, it doesn't collect the metrics of the actual action, and the Query UI shows the plan graph without metrics.

We should call `serveToStream` first, then call `withAction` inside it.
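
The ordering problem can be reproduced outside Spark. The following is a minimal Python sketch, not Spark's code: `with_action` and `serve_to_stream` are hypothetical stand-ins that only mimic "time the wrapped block" and "run the block on a separate thread", and the 0.2-second sleep is an assumed placeholder for running the query and writing Arrow batches.

```py
import threading
import time


def with_action(metrics, name, body):
    """Hypothetical stand-in for withAction: time `body` and record the duration."""
    start = time.monotonic()
    result = body()
    metrics[name] = time.monotonic() - start
    return result


def serve_to_stream(write):
    """Hypothetical stand-in for serveToStream: run `write` on a separate thread."""
    thread = threading.Thread(target=write)
    thread.start()
    return thread


def do_work():
    # Assumed placeholder for executing the plan and writing Arrow batches.
    time.sleep(0.2)


# Before the fix: the measured block only starts the thread, then returns.
before = {}
thread = with_action(before, "collect", lambda: serve_to_stream(do_work))
thread.join()

# After the fix: the measured block runs inside the serving thread.
after = {}
thread = serve_to_stream(lambda: with_action(after, "collect", do_work))
thread.join()

print("before: %.3fs, after: %.3fs" % (before["collect"], after["collect"]))
```

In the first ordering the recorded duration covers only thread startup, which corresponds to the missing metrics and incorrect duration described in this PR. In the second it covers the work itself.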

### Why are the changes needed?

When calling `toPandas`, the Query UI usually shows each plan node's metrics:

```py
>>> df = spark.createDataFrame([(1, 10, 'abc'), (2, 20, 'def')], schema=['x', 'y', 'z'])
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

![Screen Shot 2020-06-05 at 10 58 30 AM](https://user-images.githubusercontent.com/506656/83914110-6f3b3080-a725-11ea-88c0-de83a833b05c.png)

but if Arrow execution is enabled, it shows only the plan nodes, without metrics, and the duration is incorrect:

```py
>>> spark.conf.set('spark.sql.execution.arrow.enabled', True)
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

![Screen Shot 2020-06-05 at 10 58 42 AM](https://user-images.githubusercontent.com/506656/83914127-782c0200-a725-11ea-84e4-74d861d5c20a.png)

### Does this PR introduce _any_ user-facing change?

Yes, the Query UI will show the plan with the correct metrics.

### How was this patch tested?

I checked it manually in my local environment.

![Screen Shot 2020-06-05 at 11 29 48 AM](https://user-images.githubusercontent.com/506656/83914142-7e21e300-a725-11ea-8925-edc22df16388.png)

Closes apache#28740 from ueshin/issues/SPARK-31903/2.4/to_pandas_with_arrow_query_ui.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
ueshin authored and HyukjinKwon committed Jun 6, 2020
1 parent bf01280 commit 476010a
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3283,8 +3283,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToPython(): Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone

-    withAction("collectAsArrowToPython", queryExecution) { plan =>
-      PythonRDD.serveToStreamWithSync("serve-Arrow") { out =>
+    PythonRDD.serveToStreamWithSync("serve-Arrow") { out =>
+      withAction("collectAsArrowToPython", queryExecution) { plan =>
         val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
         val arrowBatchRdd = toArrowBatchRdd(plan)
         val numPartitions = arrowBatchRdd.partitions.length
