## Common Uses of Pandas UDFs

Pandas UDFs are vectorized user-defined functions. Spark uses [Arrow](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html) to move data to and from the Pandas format, and the function then works with the Pandas library on whole Pandas objects (Series or DataFrames) rather than on one row at a time.
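
"Vectorized" has a concrete meaning here. Spark does not call a Series-to-Series pandas UDF once per row, and it does not call it once for the whole column. It splits each partition into Arrow record batches (at most `spark.sql.execution.arrow.maxRecordsPerBatch` rows, 10,000 by default) and calls the function once per batch. The sketch below simulates that behaviour in plain pandas. It is not Spark's actual code path, and the helper `run_in_batches` and the batch size of 3 are hypothetical choices for illustration. It shows two consequences: the output must have the same length as the input, and an aggregate computed inside such a UDF (here, a mean) is only a per-batch value.

```python
import pandas as pd


def run_in_batches(func, s: pd.Series, batch_size: int) -> pd.Series:
    """Hypothetical simulation of Spark feeding a UDF one Arrow batch at a time."""
    outputs = []
    for start in range(0, len(s), batch_size):
        batch = s.iloc[start:start + batch_size]
        result = func(batch)
        # Spark likewise rejects output batches whose length differs from the input
        assert len(result) == len(batch), "Series -> Series UDFs must preserve length"
        outputs.append(result)
    return pd.concat(outputs, ignore_index=True)


def square(s: pd.Series) -> pd.Series:
    # Element-wise: the result does not depend on how rows are batched
    return s ** 2


def demean(s: pd.Series) -> pd.Series:
    # NOT batch-safe: s.mean() is the mean of the current batch only
    return s - s.mean()


values = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])

print(run_in_batches(square, values, batch_size=3).tolist())  # [1.0, 4.0, 9.0, 16.0, 25.0, 36.0]
print(run_in_batches(demean, values, batch_size=3).tolist())  # [-1.0, 0.0, 1.0, -1.0, 0.0, 1.0]
print((values - values.mean()).tolist())                      # [-2.5, -1.5, -0.5, 0.5, 1.5, 2.5]
```

Element-wise logic such as `square` is safe in a Series-to-Series UDF. Logic that needs the whole column or group should use a Spark aggregation or a grouped API instead.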

In [1]:
import seaborn as sns
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

iris = spark.createDataFrame(sns.load_dataset('iris'))
iris.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


## Vectorized operation on a single column

### Series to Series

In [30]:
# Return type is a struct, so the DDL string also names its fields
@f.pandas_udf("square float, cube float")
def square_cube(df_col: pd.Series) -> pd.DataFrame:  # Series in, struct (DataFrame) out
    # DataFrame columns are named after the struct fields declared above
    return pd.DataFrame({"square": df_col ** 2, "cube": df_col ** 3})


@f.pandas_udf("float")  # return type
def square(df_col: pd.Series) -> pd.Series:  # Series in, Series out
    return df_col ** 2


iris_op = (
    iris
    .withColumn('square_cube_sepal_length', square_cube('sepal_length'))
    .withColumn('square_sepal_length', square('sepal_length'))
)

iris_op.show(5)

+------------+-----------+------------+-----------+-------+------------------------+-------------------+
|sepal_length|sepal_width|petal_length|petal_width|species|square_cube_sepal_length|square_sepal_length|
+------------+-----------+------------+-----------+-------+------------------------+-------------------+
|         5.1|        3.5|         1.4|        0.2| setosa|        {26.01, 132.651}|              26.01|
|         4.9|        3.0|         1.4|        0.2| setosa|        {24.01, 117.649}|              24.01|
|         4.7|        3.2|         1.3|        0.2| setosa|        {22.09, 103.823}|              22.09|
|         4.6|        3.1|         1.5|        0.2| setosa|         {21.16, 97.336}|              21.16|
|         5.0|        3.6|         1.4|        0.2| setosa|           {25.0, 125.0}|               25.0|
+------------+-----------+------------+-----------+-------+------------------------+-------------------+
only showing top 5 rows



#### Accessing individual components of struct columns

In [32]:
iris_op.select(f.col('square_cube_sepal_length').getField('square')).show(5)

+-------------------------------+
|square_cube_sepal_length.square|
+-------------------------------+
|                          26.01|
|                          24.01|
|                          22.09|
|                          21.16|
|                           25.0|
+-------------------------------+
only showing top 5 rows



### Series to Scalar

In [35]:
@f.pandas_udf("float")  # return type
def median(df_col: pd.Series) -> float:  # Series in, scalar out
    # pandas skips NaN by default; fillna(0) would drag the median toward 0
    return df_col.median()

iris_op.groupBy('species').agg(median('sepal_length')).show()

+----------+--------------------+
|   species|median(sepal_length)|
+----------+--------------------+
|    setosa|                 5.0|
|versicolor|                 5.9|
| virginica|                 6.5|
+----------+--------------------+



## [GroupedData.applyInPandas](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html#pyspark.sql.GroupedData.applyInPandas)

## Windowing Functions