- Title: User-defined Function (UDF) in PySpark
- Slug: pyspark-udf
- Date: 2020-11-27 22:55:16
- Category: Computer Science
- Tags: programming, Python, HPC, high performance computing, PySpark, UDF, pandas, pandas_udf, pandas UDF
- Author: Ben Du
- Modified: 2021-12-10 19:51:12


## Tips and Traps

1. The easiest way to define a UDF in PySpark is to use the `@udf` decorator,
    and similarly the easiest way to define a pandas UDF in PySpark is to use the `@pandas_udf` decorator.
    Pandas UDFs are preferred to UDFs for several reasons.
    First, pandas UDFs are typically much faster than UDFs.
    Second, pandas UDFs are more flexible than UDFs on parameter passing.
    Both UDFs and pandas UDFs can take multiple columns as parameters.
    In addition, 
    pandas UDFs can take a DataFrame as parameter 
    (when passed to the `apply` function after `groupBy` is called).

2. You need to specify a value for the parameter `returnType`
    (the type of elements in the PySpark DataFrame Column)
    when creating a (pandas) UDF. 
    Both type objects (e.g., `StringType()`)
    and names of types (e.g., `"string"`) are accepted.
    Specifying names of types is simpler (as you do not have to import the corresponding types
    and names are short to type)
    but at the cost of losing the ability to do static type checking (e.g., using pylint) on the used return types. 
   
3. When invoking a (pandas) UDF, 
    you can either pass column expressions (e.g., `col("name")`)
    or names of columns (e.g., `"name"`) to it.
    It is suggested that you always use the explicit way (`col("name")`)
    as it avoids confusion in certain situations.
    
4. UDFs created using the decorators `@udf` and `@pandas_udf` can only be used in DataFrame APIs but not in Spark SQL. 
    To use a UDF or pandas UDF in Spark SQL, 
    you have to register it using `spark.udf.register`.
    Notice that `spark.udf.register` can register not only UDFs and pandas UDFs but also regular Python functions 
    (in which case you have to specify return types).

5. BinaryType is supported in versions earlier than Spark 2.4. 
    However, 
    conversion between a Spark DataFrame which contains BinaryType columns 
    and a pandas DataFrame (via PyArrow) is not supported until Spark 2.4.
    
6. Pandas UDFs leveraging PyArrow (>=0.15) cause `java.lang.IllegalArgumentException` in PySpark 2.4 
    (PySpark 3 has fixed the issue completely). 
    Listed below are 3 ways to fix this issue. 
    For more discussion, please refer to 
    [Apache Arrow in PySpark](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html),
    [PySpark pandas_udfs java.lang.IllegalArgumentException error](https://www.nuomiphp.com/eplan/en/52878.html)
    and
    [pandas udf not working with latest pyarrow release (0.15.0)](https://issues.apache.org/jira/browse/SPARK-29367).
    
    1. Downgrade PyArrow to 0.14.1 (if you have to stick to PySpark 2.4).
    2. Set the environment variable `ARROW_PRE_0_15_IPC_FORMAT` to be `1` (if you have to stick to PySpark 2.4).
        You can do this using `spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT=1`
        and `spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1`.
    3. Use PySpark 3.
    

7. If your (pandas) UDF needs a non-Column parameter,
    there are 3 ways to achieve it. 
    
    - Use a global variable in your pandas UDF. 
    - Use a curried function which takes non-Column parameter(s)
        and returns a (pandas) UDF (which then takes Columns as parameters).
        
            def comparator_udf(n):
                return udf(lambda c: c == n, BooleanType())

            df.where(comparator_udf("Bonsanto")(col("name")))
            
    - Simply treat a non-Column parameter as a Column parameter
        and wrap the parameter into `lit` when invoking the (pandas) UDF. 
        
    The 1st way is error-prone and inflexible. 
    Both the 2nd and the last approaches are solid.
    However,
    the last way is preferred 
    as it is universal and also more flexible 
    (if you want to use Column parameters to replace non-Column parameters later).
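
The curried approach and the `lit` approach can be compared side by side in a minimal sketch. 
The names `add_n`, `add_n_udf`, `add_udf` and `demo` are made up for illustration, 
and `df` is assumed to be a Spark DataFrame with a string column `name` and an integer column `age`. 
The logic is kept in a plain Python function so that it can be checked without a Spark session.

```python
def add_n(age, n):
    """Plain Python logic: add a constant to a single value."""
    return age + n


def demo(df):
    """Apply add_n to `df` (assumed to have `name` and `age` columns) in two ways."""
    # Imported here so that add_n itself can be tested without Spark installed.
    from pyspark.sql.functions import col, lit, udf

    # Curried approach: n is fixed when the UDF is created.
    def add_n_udf(n):
        return udf(lambda age: add_n(age, n), "long")

    # lit approach: n is passed to the UDF like any other Column.
    add_udf = udf(add_n, "long")

    return df.select(
        col("name"),
        add_n_udf(5)(col("age")).alias("curried"),
        add_udf(col("age"), lit(5)).alias("with_lit"),
    )
```

With the `lit` version, replacing `lit(5)` by `col("id")` later requires no change to the UDF itself.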
    

In [1]:
from pathlib import Path
import pandas as pd
import findspark
findspark.init(str(next(Path("/opt/").glob("spark-*"))))
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, StructType
spark = SparkSession.builder.appName("PySpark UDF").enableHiveSupport().getOrCreate()


In [4]:
df_p = pd.DataFrame(
    data=[
        ["Ben", 2, 30],
        ["Dan", 4, 25],
        ["Will", 1, 26],
    ],
    columns=["name", "id", "age"]
)
df = spark.createDataFrame(df_p)
df.show()

+----+---+---+
|name| id|age|
+----+---+---+
| Ben|  2| 30|
| Dan|  4| 25|
|Will|  1| 26|
+----+---+---+



In [4]:
df.createOrReplaceTempView("table_df")

## Pandas UDF 

In [None]:
dir(PandasUDFType)

### Return a `pandas.Series`

The following pandas UDFs take a `pandas.Series` (converted from a PySpark DataFrame Column on one partition) as parameter
and return a `pandas.Series` of the same length.
Failure to satisfy this constraint causes runtime errors.


In [6]:
@pandas_udf("integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return age + 1

In [5]:
df.withColumn("age1", age_plus_one("age")).show()

+----+---+---+----+
|name| id|age|age1|
+----+---+---+----+
| Ben|  2| 30|  31|
| Dan|  4| 25|  26|
|Will|  1| 26|  27|
+----+---+---+----+



In [6]:
@pandas_udf("integer")
def age_plus_two(age: pd.Series) -> pd.Series:
    return pd.Series(a + 2 for a in age)

In [7]:
df.withColumn("age2", age_plus_two("age")).show()

+----+---+---+----+
|name| id|age|age2|
+----+---+---+----+
| Ben|  2| 30|  32|
| Dan|  4| 25|  27|
|Will|  1| 26|  28|
+----+---+---+----+



In [8]:
@pandas_udf("string")
def concat(name: pd.Series, age: pd.Series) -> pd.Series:
    return name + " is " + age.astype(str) + " years old."

In [9]:
df.withColumn("intro", concat("name", "age")).show()

+----+---+---+--------------------+
|name| id|age|               intro|
+----+---+---+--------------------+
| Ben|  2| 30|Ben is 30 years old.|
| Dan|  4| 25|Dan is 25 years old.|
|Will|  1| 26|Will is 26 years ...|
+----+---+---+--------------------+



In [10]:
@pandas_udf("string")
def concat2(name: pd.Series, age: pd.Series) -> pd.Series:
    return pd.Series(f"{name} is {age} years old." for name, age in zip(name, age))

In [11]:
df.withColumn("intro", concat2("name", "age")).show()

+----+---+---+--------------------+
|name| id|age|               intro|
+----+---+---+--------------------+
| Ben|  2| 30|Ben is 30 years old.|
| Dan|  4| 25|Dan is 25 years old.|
|Will|  1| 26|Will is 26 years ...|
+----+---+---+--------------------+



### Return a `pandas.DataFrame`

The following pandas UDF takes a pandas DataFrame as input and returns a pandas DataFrame. 
It can only be applied after `groupBy`.

In [None]:
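# `model` (a fitted model exposing `predict_proba`) and `features` (a list of
# column names) are assumed to be defined elsewhere.
@pandas_udf(
    returnType="id long, prob double",
    functionType=PandasUDFType.GROUPED_MAP
)
def predict_udf(df):
    df["prob"] = model.predict_proba(df[features])[:, 1]
    return df[["id", "prob"]]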

In [None]:
df.groupBy(spark_partition_id()).apply(predict_udf)
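
In Spark 3.0 and later, 
`groupBy(...).applyInPandas` is an alternative to a `PandasUDFType.GROUPED_MAP` pandas UDF passed to `apply`. 
Below is a minimal sketch of that style, 
assuming a Spark DataFrame `df` with columns `name`, `id` and `age`. 
The function `center_age` and the output column `age_centered` are made up for illustration.

```python
import pandas as pd


def center_age(pdf: pd.DataFrame) -> pd.DataFrame:
    """Receive all rows of one group and return rows matching the declared schema."""
    out = pdf[["id", "age"]].copy()
    # Subtract the group mean from each age in the group.
    out["age_centered"] = out["age"] - out["age"].mean()
    return out[["id", "age_centered"]]


def apply_per_group(df):
    """`df` is assumed to be a Spark DataFrame with `name`, `id` and `age` columns."""
    # The schema string describes the columns of the DataFrame returned by center_age.
    return df.groupBy("name").applyInPandas(
        center_age, schema="id long, age_centered double"
    )
```

Since `center_age` is a plain function on a pandas DataFrame, 
it can be tried on a small pandas DataFrame before it is run on Spark.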

## Pandas UDFs in Spark SQL

Pandas UDFs created using `@pandas_udf` can only be used in DataFrame APIs but not in Spark SQL.
To use a Pandas UDF in Spark SQL, 
you have to register it using `spark.udf.register`.
The same holds for UDFs. 
Notice that `spark.udf.register` can register not only pandas UDFs and UDFs but also regular Python functions 
(in which case you have to specify return types).

In [13]:
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()

AnalysisException: Undefined function: 'age_plus_one'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 10

In [18]:
spark.udf.register("age_plus_one", age_plus_one)

<function __main__.age_plus_one(age: pandas.core.series.Series) -> pandas.core.series.Series>

In [19]:
spark.sql("select *, age_plus_one(age) as age1 from table_df").show()

+----+---+---+----+
|name| id|age|age1|
+----+---+---+----+
| Ben|  2| 30|  31|
| Dan|  4| 25|  26|
|Will|  1| 26|  27|
+----+---+---+----+



In [20]:
df.withColumn("age1", age_plus_one("age")).show()

+----+---+---+----+
|name| id|age|age1|
+----+---+---+----+
| Ben|  2| 30|  31|
| Dan|  4| 25|  26|
|Will|  1| 26|  27|
+----+---+---+----+



## Non-Column Parameters

If your (pandas) UDF needs a non-Column parameter,
simply treat it as a Column parameter
and wrap the parameter into `lit` when invoking the (pandas) UDF. 
Another possibility is to define a regular Python function which takes non-Column parameters 
and returns a (pandas) UDF.
The first approach is simpler, universal and also more flexible if you later want to replace the non-Column parameters with Column parameters.

In [7]:
@pandas_udf("integer")
def age_plus_one(age: pd.Series) -> pd.Series:
    return age + 1

In [9]:
df.withColumn("age0", age_plus_one(lit(0))).show()

+----+---+---+----+
|name| id|age|age0|
+----+---+---+----+
| Ben|  2| 30|   1|
| Dan|  4| 25|   1|
|Will|  1| 26|   1|
+----+---+---+----+
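
A pandas UDF that combines a real column with a `lit` parameter might look like the sketch below. 
The names `age_plus_n` and `with_age_plus_n` are made up for illustration, 
and `df` is assumed to be a Spark DataFrame with an integer column `age`. 
Inside the pandas UDF the literal arrives as a `pandas.Series` too, 
holding the same value for every row.

```python
import pandas as pd


def age_plus_n(age: pd.Series, n: pd.Series) -> pd.Series:
    # `n` is a Series as well: the literal repeated once per row.
    return age + n


def with_age_plus_n(df, n):
    """Add a column `age_n` to `df` (assumed to have an integer `age` column)."""
    # Imported here so that age_plus_n can be tested with pandas alone.
    from pyspark.sql.functions import col, lit, pandas_udf

    age_plus_n_udf = pandas_udf(age_plus_n, returnType="long")
    return df.withColumn("age_n", age_plus_n_udf(col("age"), lit(n)))
```

Calling `with_age_plus_n(df, 5)` is the intended usage; 
passing `col("id")` instead of `lit(n)` would work without touching `age_plus_n`.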



## UDF Taking One Column as Parameter

In [19]:
@udf(StringType())
def say_hello(name: str) -> str:
    return f"Hello {name}"

In [21]:
df.withColumn("greetings", say_hello(col("name"))).show()

+----+---+---+----------+
|name| id|age| greetings|
+----+---+---+----------+
| Ben|  2| 30| Hello Ben|
| Dan|  4| 25| Hello Dan|
|Will|  1| 26|Hello Will|
+----+---+---+----------+



In [21]:
def say_hello_2(name: str) -> str:
    return f"Hello {name}"

In [25]:
spark.udf.register("say_hello_udf", say_hello_2, StringType())


<function __main__.say_hello_2(name: str) -> str>

In [26]:
spark.sql("select *, say_hello_udf(name) as hello from table_df").show()

+----+---+---+----------+
|name| id|age|     hello|
+----+---+---+----------+
| Ben|  2| 30| Hello Ben|
| Dan|  4| 25| Hello Dan|
|Will|  1| 26|Hello Will|
+----+---+---+----------+



In [27]:
df.withColumn(say_hello_udf("name")).show()

NameError: name 'say_hello_udf' is not defined
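
The name passed to `spark.udf.register` only exists in Spark SQL, 
which is why the Python name `say_hello_udf` is undefined above. 
`spark.udf.register` returns the registered UDF, 
so one way to use it in DataFrame APIs as well is to keep the return value. 
Below is a minimal sketch, 
assuming an active SparkSession `spark` and a DataFrame `df` with a `name` column 
(the helper `register_and_use` is made up for illustration).

```python
def say_hello_2(name: str) -> str:
    return f"Hello {name}"


def register_and_use(spark, df):
    """`spark` is an active SparkSession and `df` a DataFrame with a `name` column."""
    # Keep the returned UDF in a Python variable; the string "say_hello_udf"
    # is only the name under which Spark SQL knows the function.
    say_hello_udf = spark.udf.register("say_hello_udf", say_hello_2, "string")
    return df.withColumn("hello", say_hello_udf("name"))
```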

## UDF Taking Two Columns as Parameters

In [31]:
@udf("string")
def concat(name: str, age: int) -> str:
    return f"{name} is {age} years old."

In [32]:
df.withColumn("greetings", concat(col("name"), col("age"))).show()

+----+---+---+--------------------+
|name| id|age|           greetings|
+----+---+---+--------------------+
| Ben|  2| 30|Ben is 30 years old.|
| Dan|  4| 25|Dan is 25 years old.|
|Will|  1| 26|Will is 26 years ...|
+----+---+---+--------------------+



In [24]:
df.withColumn("greetings", concat("name", "age")).show()

+----+---+---+--------------------+
|name| id|age|           greetings|
+----+---+---+--------------------+
| Ben|  2| 30|Ben is 30 years old.|
| Dan|  4| 25|Dan is 25 years old.|
|Will|  1| 26|Will is 26 years ...|
+----+---+---+--------------------+


