[SPARK-30722][DOCS][FOLLOW-UP] Explicitly mention the same entire input/output length restriction of Series Iterator UDF

### What changes were proposed in this pull request?

This PR explicitly mentions the length requirement of Iterator of Series to Iterator of Series and Iterator of Multiple Series to Iterator of Series UDFs (previously the Scalar Iterator pandas UDF).

The actual limitation of these UDFs is that the _entire input and output_ must have the same length, rather than each individual series. Namely, you can do something like the following:

```python
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf

pandas_udf("long")
def func(
        iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    return iter([pd.concat(iterator)])

spark.range(100).select(func("id")).show()
```

This characteristic allows you to prefetch data from the iterator to speed up execution, compared to the regular Scalar to Scalar UDF (previously the Scalar pandas UDF).
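For example, a UDF roughly like the sketch below could consume input batches ahead of time in a background thread; the threading approach and names are illustrative only, and it assumes an active SparkSession bound to `spark`:

```python
from typing import Iterator
import queue
import threading

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def plus_one_prefetch(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Prefetch up to two batches in a background thread so that reading the
    # next batch overlaps with processing the current one. This is only
    # possible because the length restriction applies to the entire input
    # and output, not to each individual batch.
    buffer = queue.Queue(maxsize=2)
    sentinel = object()

    def fill():
        for batch in iterator:
            buffer.put(batch)
        buffer.put(sentinel)

    threading.Thread(target=fill, daemon=True).start()
    while True:
        batch = buffer.get()
        if batch is sentinel:
            return
        yield batch + 1

spark.range(100).select(plus_one_prefetch("id")).show()
```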

### Why are the changes needed?

To document the correct restriction and characteristics of a feature.

### Does this PR introduce any user-facing change?

Yes, in the documentation, but only in unreleased branches.

### How was this patch tested?

GitHub Actions should test the documentation build.

Closes apache#28160 from HyukjinKwon/SPARK-30722-followup.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon authored and Seongjin Cho committed Apr 14, 2020
1 parent 2c9e79a commit 471b812
Showing 3 changed files with 31 additions and 21 deletions.
35 changes: 20 additions & 15 deletions docs/sql-pyspark-pandas-with-arrow.md
@@ -98,7 +98,7 @@ In the following sections, it describes the combinations of the supported type h

The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.

-By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the given
+By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF where the given
function takes one or more `pandas.Series` and outputs one `pandas.Series`. The output of the function should
always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting
columns into batches and calling the function for each batch as a subset of the data, then concatenating
@@ -118,13 +118,15 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p

The type hint can be expressed as `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`.

-By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the given
-function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. The output of each
-series from the function should always be of the same length as the input. In this case, the created
-Pandas UDF requires one input column when the Pandas UDF is called. To use multiple input columns,
-a different type hint is required. See Iterator of Multiple Series to Iterator of Series.
+By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF where the given
+function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. The
+length of the entire output from the function should be the same length of the entire input; therefore, it can
+prefetch the data from the input iterator as long as the lengths are the same.
+In this case, the created Pandas UDF requires one input column when the Pandas UDF is called. To use
+multiple input columns, a different type hint is required. See Iterator of Multiple Series to Iterator
+of Series.

-It is useful when the UDF execution requires initializing some states although internally it works
+It is also useful when the UDF execution requires initializing some states although internally it works
identically as Series to Series case. The pseudocode below illustrates the example.

{% highlight python %}
@@ -153,10 +155,11 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p

The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`.

-By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the
+By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF where the
given function takes an iterator of a tuple of multiple `pandas.Series` and outputs an iterator of `pandas.Series`.
In this case, the created pandas UDF requires multiple input columns as many as the series in the tuple
-when the Pandas UDF is called. It works identically as Iterator of Series to Iterator of Series case except the parameter difference.
+when the Pandas UDF is called. Otherwise, it has the same characteristics and restrictions as Iterator of Series
+to Iterator of Series case.

The following example shows how to create this Pandas UDF:
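The example itself is collapsed in this view; a minimal sketch of such a UDF could look like the following (illustrative names only, not the exact snippet in the file; it assumes an active SparkSession named `spark`):

```python
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # Each element of the iterator is a tuple holding one pandas.Series
    # per input column passed to the UDF.
    for a, b in iterator:
        yield a * b

df = spark.range(10).withColumn("x", col("id") + 1)
df.select(multiply_two_cols("id", "x")).show()
```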

@@ -172,7 +175,7 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p

The type hint can be expressed as `pandas.Series`, ... -> `Any`.

-By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF similar
+By using `pandas_udf` with the function having such type hints above, it creates a Pandas UDF similar
to PySpark's aggregate functions. The given function takes `pandas.Series` and returns a scalar value.
The return type should be a primitive data type, and the returned scalar can be either a python
primitive type, e.g., `int` or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
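As an illustration of this Series to Scalar style, a minimal sketch (not part of the diff; it assumes an active SparkSession named `spark`) could look like:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    # Reduces a whole column (or one group of it) to a single scalar value.
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))
df.groupby("id").agg(mean_udf(df["v"])).show()
```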
@@ -198,12 +201,14 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p

## Pandas Function APIs

-Pandas function APIs can directly apply a Python native function against the whole DataFrame by
-using Pandas instances. Internally it works similarly with Pandas UDFs by Spark using Arrow to transfer
-data and Pandas to work with the data, which allows vectorized operations. A Pandas function API behaves
-as a regular API under PySpark `DataFrame` in general.
+Pandas Function APIs can directly apply a Python native function against the whole `DataFrame` by
+using Pandas instances. Internally it works similarly with Pandas UDFs by using Arrow to transfer
+data and Pandas to work with the data, which allows vectorized operations. However, A Pandas Function
+API behaves as a regular API under PySpark `DataFrame` instead of `Column`, and Python type hints in Pandas
+Functions APIs are optional and do not affect how it works internally at this moment although they
+might be required in the future.

-From Spark 3.0, Grouped map pandas UDF is now categorized as a separate Pandas Function API,
+From Spark 3.0, grouped map pandas UDF is now categorized as a separate Pandas Function API,
`DataFrame.groupby().applyInPandas()`. It is still possible to use it with `PandasUDFType`
and `DataFrame.groupby().apply()` as it was; however, it is preferred to use
`DataFrame.groupby().applyInPandas()` directly. Using `PandasUDFType` will be deprecated
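As an illustration of the `DataFrame.groupby().applyInPandas()` API mentioned above (a sketch, not part of this patch; it assumes an active SparkSession named `spark`):

```python
import pandas as pd

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 10.0)], ("id", "v"))

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # Receives all rows of one group as a single pandas DataFrame and
    # returns a pandas DataFrame matching the declared schema.
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
```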
12 changes: 6 additions & 6 deletions python/pyspark/sql/pandas/functions.py
@@ -150,10 +150,11 @@ def pandas_udf(f=None, returnType=None, functionType=None):
The function takes an iterator of `pandas.Series` and outputs an iterator of
`pandas.Series`. In this case, the created pandas UDF instance requires one input
-column when this is called as a PySpark column. The output of each series from
-the function should always be of the same length as the input.
+column when this is called as a PySpark column. The length of the entire output from
+the function should be the same length of the entire input; therefore, it can
+prefetch the data from the input iterator as long as the lengths are the same.
-It is useful when the UDF execution
+It is also useful when the UDF execution
requires initializing some states although internally it works identically as
Series to Series case. The pseudocode below illustrates the example.
@@ -194,9 +195,8 @@ def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
The function takes an iterator of a tuple of multiple `pandas.Series` and outputs an
iterator of `pandas.Series`. In this case, the created pandas UDF instance requires
input columns as many as the series when this is called as a PySpark column.
-It works identically as Iterator of Series to Iterator of Series case except
-the parameter difference. The output of each series from the function should always
-be of the same length as the input.
+Otherwise, it has the same characteristics and restrictions as Iterator of Series
+to Iterator of Series case.
>>> from typing import Iterator, Tuple
>>> from pyspark.sql.functions import struct, col
5 changes: 5 additions & 0 deletions python/pyspark/worker.py
@@ -357,6 +357,11 @@ def map_batch(batch):
num_output_rows = 0
for result_batch, result_type in result_iter:
num_output_rows += len(result_batch)
+# This assert is for Scalar Iterator UDF to fail fast.
+# The length of the entire input can only be explicitly known
+# by consuming the input iterator in user side. Therefore,
+# it's very unlikely the output length is higher than
+# input length.
assert is_map_iter or num_output_rows <= num_input_rows[0], \
"Pandas SCALAR_ITER UDF outputted more rows than input rows."
yield (result_batch, result_type)
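For illustration only (not part of the patch), a SCALAR_ITER UDF along the lines of the sketch below would trip the assertion above; it assumes an active SparkSession named `spark`:

```python
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def duplicate_rows(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in iterator:
        # Emits twice as many rows as it has consumed so far, so the running
        # output count exceeds the running input count and the assertion
        # fails fast instead of silently producing a corrupt result.
        yield pd.concat([batch, batch])

# spark.range(10).select(duplicate_rows("id")).show()
# -> fails with "Pandas SCALAR_ITER UDF outputted more rows than input rows."
```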

