# UDF: user-defined functions

- User-defined functions are a way to bring the flexibility of the RDD to the data frame. They mimic the `.map()` part of working with RDDs.
- `F.udf(f, returnType)`, or the `@F.udf(returnType=...)` decorator, promotes a Python function to operate on columns of a data frame like the functions from `F`.
- The original Python function is accessible through the `.func` attribute of a UDF.
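Because a Python UDF is called once per value, it also receives SQL `NULL`s as `None`. The sketch below shows a null-safe function and the per-value mapping a UDF performs. `reverse_string_safe` is a hypothetical name, and the list comprehension only stands in for Spark calling the function on each row.

```python
from typing import Optional


def reverse_string_safe(string: Optional[str]) -> Optional[str]:
    """Reverse a string, passing missing values through unchanged."""
    if string is None:
        return None
    return string[::-1]


# Stand-in for a column: Spark would hand these values to the UDF one by one.
column = ["1_main", None, "3_zp"]
reversed_column = [reverse_string_safe(value) for value in column]
print(reversed_column)  # ['niam_1', None, 'pz_3']
```

Wrapped with `F.udf(reverse_string_safe, returnType=T.StringType())`, the same function could be applied to a column, and the plain function would stay reachable through `.func` for local testing like the above.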

# Pandas UDF

## Series UDF

- Familiar column-first approach.
- The default batch size is 10,000.
- `spark.sql.execution.arrow.maxRecordsPerBatch` config option changes the maximum batch size.
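To make the batch size concrete, here is a minimal local simulation of how one partition of a column might be cut into batches. It uses plain Pandas and no Spark; `split_into_batches` is a hypothetical helper, and the real splitting is done by Spark through Arrow.

```python
import pandas as pd


def split_into_batches(column: pd.Series, max_records: int = 10_000):
    """Yield consecutive slices of at most `max_records` rows."""
    for start in range(0, len(column), max_records):
        yield column.iloc[start:start + max_records]


# A hypothetical partition of 25,000 values.
partition = pd.Series(range(25_000))
batch_sizes = [len(batch) for batch in split_into_batches(partition)]
print(batch_sizes)  # [10000, 10000, 5000]
```

On a live session the limit could be changed with something like `spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")`, assuming `spark` is the active session.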

### *Series to Series* [aka Scalar]

- Takes in a Column, converts it to a Series, returns a Series that is converted back to a Column.
- Column functions, but in Pandas.
- Almost the same as usual Python UDFs, but works in batches instead of one value at a time.
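The difference between the two kinds of UDF can be sketched locally. The function names are hypothetical; `Series.map` stands in for a Python UDF being called once per value, and the direct call stands in for a Series to Series UDF receiving a whole batch.

```python
import pandas as pd


def to_fahrenheit_value(celsius: float) -> float:
    """Row-at-a-time version, as a plain Python UDF would run it."""
    return celsius * 9 / 5 + 32


def to_fahrenheit_batch(celsius: pd.Series) -> pd.Series:
    """Vectorised version, as a Series to Series UDF would run it."""
    return celsius * 9 / 5 + 32


batch = pd.Series([35.0, 39.0, 42.0])
one_by_one = batch.map(to_fahrenheit_value)  # one Python call per value
vectorised = to_fahrenheit_batch(batch)      # one call for the whole batch
print(pd.DataFrame({"one_by_one": one_by_one, "vectorised": vectorised}))
```

Both routes produce the same numbers. The batch version simply replaces many Python calls with one vectorised operation.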

### *Iterator of Series to Iterator of Series*

- Exactly the same as the previous, but with explicit iteration over batches of Series.
- Also takes a Column and returns a Column.
- Improves performance when there is an *expensive cold start* (e.g., loading an ML model), because the setup runs once before the loop over batches instead of once per batch.
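A minimal local sketch of the cold-start idea, without Spark: the setup runs before the loop, so it happens once for the whole iterator of batches. `load_model` is a hypothetical stand-in for an expensive initialisation, and the counter exists only to make the effect visible.

```python
from typing import Iterator

import pandas as pd

setup_calls = 0


def load_model() -> float:
    """Hypothetical expensive setup; here it only returns a scaling factor."""
    global setup_calls
    setup_calls += 1
    return 2.0


def scale(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Iterator of Series to Iterator of Series: set up once, then stream."""
    factor = load_model()  # runs once per call, not once per batch
    for batch in batches:
        yield batch * factor


batches = [pd.Series([1.0, 2.0]), pd.Series([3.0]), pd.Series([4.0, 5.0])]
results = list(scale(iter(batches)))
print(setup_calls)  # 1
```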

### *Iterator of Multiple Series to Iterator of Series*

- Can take multiple Columns as inputs.
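With several input columns, each element of the iterator is a tuple holding one Series per column. The sketch below runs such a function locally on hand-made batches; `absolute_gap` and the sample values are illustrative assumptions.

```python
from typing import Iterator, Tuple

import pandas as pd


def absolute_gap(
    pairs: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """Yield the absolute difference of two columns, batch by batch."""
    for left, right in pairs:
        yield (left - right).abs()


# Two batches, each a tuple of aligned Series (one per input column).
batches = [
    (pd.Series([0.5, 0.25]), pd.Series([0.25, 0.75])),
    (pd.Series([1.0]), pd.Series([0.5])),
]
gaps = pd.concat(list(absolute_gap(iter(batches))), ignore_index=True)
print(gaps.tolist())  # [0.25, 0.5, 0.5]
```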

## UDFs on grouped data

- Split-apply-combine pattern: split the data using `.groupby()`, apply the function to each group independently, combine the results.
- Each group has to be able to fit into a Pandas DataFrame on the executor.
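The split-apply-combine pattern can be sketched in plain Pandas. This is only a local analogue of what Spark does across executors; `demean` and the toy data are illustrative assumptions.

```python
import pandas as pd


def demean(group: pd.DataFrame) -> pd.DataFrame:
    """Subtract the group mean from `value` (the "apply" step)."""
    return group.assign(value=group["value"] - group["value"].mean())


df = pd.DataFrame({"key": ["a", "a", "b", "b"], "value": [1.0, 3.0, 10.0, 20.0]})

# Split by key, apply the function to each piece, combine the pieces.
pieces = [demean(group) for _, group in df.groupby("key")]
combined = pd.concat(pieces, ignore_index=True)
print(combined["value"].tolist())  # [-1.0, 1.0, -5.0, 5.0]
```

Each piece is handled as one in-memory Pandas data frame, which is why every group has to fit on a single executor.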

### *Group aggregate UDFs* [aka Series to Scalar]

- Distills the Series into a single value.
- Basically a custom aggregate function.
- Applied in `.groupby().agg()`. Each group becomes a batch.
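A local sketch of a Series to Scalar function, assuming toy data. It uses `numpy.polyfit` for the slope instead of a scikit-learn model, purely to keep the example small; `slope` is a hypothetical name.

```python
import numpy as np
import pandas as pd


def slope(x: pd.Series, y: pd.Series) -> float:
    """Series to scalar: least-squares slope of y against x."""
    return float(np.polyfit(x, y, deg=1)[0])


df = pd.DataFrame({
    "key": ["a", "a", "a", "b", "b", "b"],
    "x": [0.0, 1.0, 2.0, 0.0, 1.0, 2.0],
    "y": [1.0, 3.0, 5.0, 4.0, 3.0, 2.0],
})

# One scalar per group, as `.groupby().agg()` would produce in Spark.
rates = {key: slope(group["x"], group["y"]) for key, group in df.groupby("key")}
print({key: round(rate, 6) for key, rate in rates.items()})
```

Group `a` lies on a line with slope 2 and group `b` on a line with slope -1, so the printed rates should be close to those values.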

### *Group map UDFs*

- The function receives each group as a Pandas data frame and returns a Pandas data frame; the results are combined into a Spark data frame.
- Do not need to be decorated, but are applied using `.groupby().applyInPandas()`.
- The schema of the output has to be supplied as an argument.
- If something needs to be applied to the entire data frame without splitting into groups, use `.mapInPandas()`.
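The two entry points expect differently shaped functions: `.applyInPandas()` calls the function once per group with a single Pandas data frame, while `.mapInPandas()` passes an iterator of data frames and expects an iterator back. Below is a minimal local sketch of the second shape, with hand-made batches standing in for Spark's batching; `add_batch_size` is a hypothetical name.

```python
from typing import Iterator

import pandas as pd


def add_batch_size(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """`mapInPandas`-style function: batches in, batches out."""
    for batch in batches:
        yield batch.assign(n=len(batch))


df = pd.DataFrame({"id": range(5), "score": [0.1, 0.2, 0.3, 0.4, 0.5]})

# Stand-in for Spark's batching: cut the frame into pieces of two rows.
batches = (df.iloc[start:start + 2] for start in range(0, len(df), 2))
result = pd.concat(list(add_batch_size(batches)), ignore_index=True)
print(result["n"].tolist())  # [2, 2, 2, 2, 1]
```

Note that `n` reflects the size of each batch, not the size of the whole data frame: the function never sees more than one batch at a time.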

## Choosing which Pandas UDF to use
- Do I need control over the splits of data into batches?
  - *YES:* What will the UDF return?
    - *A single aggregate value:* `Group Aggregate UDF`.
    - *A transformed data frame:* `Group Map UDF`.
  - *NO:* Do I need the whole data or only some columns?
    - *Complete data:* `mapInPandas()`
    - *Columns are enough:* Do I need a cold start?
      - *NO:* `Series to Series UDF`
      - *YES:* How many columns do I need to use?
        - *1:* `Iterator of Series to Iterator of Series UDF`
        - *2+:* `Iterator of Multiple Series to Iterator of Series UDF`
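The decision tree above can also be written as a small helper. This is only a sketch: the function and its parameter names are assumptions made for illustration, not part of any library.

```python
def choose_pandas_udf(
    control_over_batches: bool,
    returns_aggregate: bool = False,
    needs_whole_frame: bool = False,
    cold_start: bool = False,
    n_columns: int = 1,
) -> str:
    """Walk the decision tree and name the matching tool."""
    if control_over_batches:
        return "Group Aggregate UDF" if returns_aggregate else "Group Map UDF"
    if needs_whole_frame:
        return "mapInPandas()"
    if not cold_start:
        return "Series to Series UDF"
    if n_columns == 1:
        return "Iterator of Series to Iterator of Series UDF"
    return "Iterator of Multiple Series to Iterator of Series UDF"


print(choose_pandas_udf(control_over_batches=False, cold_start=True, n_columns=2))
# Iterator of Multiple Series to Iterator of Series UDF
```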

# Examples

In [1]:
import datetime
import pathlib
import sys
import time
from typing import Iterator, Tuple
from sklearn.linear_model import LinearRegression

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

if (root := str(pathlib.Path().absolute().parent)) not in sys.path:
    sys.path.append(root)

import src
import utilities

utilities.setup_environment_for_spark()

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [3]:
spark = utilities.get_spark_session(r"пе_пе", max_executors=35)
spark.sparkContext.setLogLevel("ERROR")
spark

In [4]:
_hdfs_dir = "/arnsdpsbx/user/team/team_ss/ds_projects/mvs-key-channel-prediction/"
_hdfs_csv = f"{_hdfs_dir}/interim/vsp_freeze_stats.csv"

In [6]:
schema = T.StructType(fields=[
    T.StructField("epk_id", T.LongType()),
    T.StructField("hold_type", T.StringType()),
    T.StructField("score_sbp", T.FloatType()),
    T.StructField("score_dkm", T.FloatType()),
    T.StructField("sd_age_yrs_comp_nv", T.FloatType()),
])

stats = spark.read.csv(_hdfs_csv, header=True, schema=schema)
stats.printSchema()
stats.show(3)

root
 |-- epk_id: long (nullable = true)
 |-- hold_type: string (nullable = true)
 |-- score_sbp: float (nullable = true)
 |-- score_dkm: float (nullable = true)
 |-- sd_age_yrs_comp_nv: float (nullable = true)

+-------------------+---------+----------+-----------+------------------+
|             epk_id|hold_type| score_sbp|  score_dkm|sd_age_yrs_comp_nv|
+-------------------+---------+----------+-----------+------------------+
|1126087828843062099|   1_main| 0.4078902|  0.2693926|              39.0|
|1126099244876979588|   1_main|0.11111562|0.064469524|              42.0|
|1126100365869525512|   1_main| 0.2480534|  0.1110391|              49.0|
+-------------------+---------+----------+-----------+------------------+
only showing top 3 rows



## Python UDF

In [7]:
@F.udf(returnType=T.StringType())
def reverse_string(string: str) -> str:
    """Reverse a string."""
    return string[::-1]

In [9]:
stats.withColumn("hold_type_r", reverse_string("hold_type")).show(7)

+-------------------+---------+----------+-----------+------------------+-----------+
|             epk_id|hold_type| score_sbp|  score_dkm|sd_age_yrs_comp_nv|hold_type_r|
+-------------------+---------+----------+-----------+------------------+-----------+
|1126087828843062099|   1_main| 0.4078902|  0.2693926|              39.0|     niam_1|
|1126099244876979588|   1_main|0.11111562|0.064469524|              42.0|     niam_1|
|1126100365869525512|   1_main| 0.2480534|  0.1110391|              49.0|     niam_1|
|1126116274430575580|   1_main|0.33973783|  0.3550213|              42.0|     niam_1|
|1126117223621958551|   1_main|0.48102745|  0.2877174|              35.0|     niam_1|
|1126118383268436038|   1_main|0.35070643| 0.51856196|              62.0|     niam_1|
|1126119341050728845|     3_zp|0.25619835|  0.2162769|              33.0|       pz_3|
+-------------------+---------+----------+-----------+------------------+-----------+
only showing top 7 rows

## Series UDF

### *Series*

In [10]:
@F.pandas_udf(returnType=T.DoubleType())
def age_to_fahrenheit(age: pd.Series) -> pd.Series:
    """Transforms age to Fahrenheit (as if it was in Celsius)."""
    return age * 9 / 5 + 32


@F.pandas_udf(returnType=T.DoubleType())
def shuffle_column(col: pd.Series) -> pd.Series:
    """Shuffle the values of the column for a permutation test."""
    return col.sample(frac=1, replace=False)

In [12]:
stats.withColumn("age_f", age_to_fahrenheit(F.col("sd_age_yrs_comp_nv"))).show(7)

+-------------------+---------+----------+-----------+------------------+------------------+
|             epk_id|hold_type| score_sbp|  score_dkm|sd_age_yrs_comp_nv|             age_f|
+-------------------+---------+----------+-----------+------------------+------------------+
|1126087828843062099|   1_main| 0.4078902|  0.2693926|              39.0|102.19999694824219|
|1126099244876979588|   1_main|0.11111562|0.064469524|              42.0| 107.5999984741211|
|1126100365869525512|   1_main| 0.2480534|  0.1110391|              49.0|120.19999694824219|
|1126116274430575580|   1_main|0.33973783|  0.3550213|              42.0| 107.5999984741211|
|1126117223621958551|   1_main|0.48102745|  0.2877174|              35.0|              95.0|
|1126118383268436038|   1_main|0.35070643| 0.51856196|              62.0|143.60000610351562|
|1126119341050728845|     3_zp|0.25619835|  0.2162769|              33.0|  91.4000015258789|
+-------------------+---------+----------+-----------+------------------+------------------+

In [13]:
sample = stats.limit(5).toPandas()
age_to_fahrenheit.func(sample.sd_age_yrs_comp_nv)

0    102.199997
1    107.599998
2    120.199997
3    107.599998
4     95.000000
Name: sd_age_yrs_comp_nv, dtype: float32

### *Iterator of Series*

In [14]:
@F.pandas_udf(returnType=T.DoubleType())
def age_to_fahrenheit(age: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Transforms age to Fahrenheit (as if it was in Celsius)."""

    # An expensive cold start can go here: it runs once per iterator of
    # batches (one call of this function) rather than once for every batch.
    time.sleep(7)

    for batch in age:
        yield batch * 9 / 5 + 32

In [15]:
stats.withColumn("age_f", age_to_fahrenheit(F.col("sd_age_yrs_comp_nv"))).show(7)

+-------------------+---------+----------+-----------+------------------+------------------+
|             epk_id|hold_type| score_sbp|  score_dkm|sd_age_yrs_comp_nv|             age_f|
+-------------------+---------+----------+-----------+------------------+------------------+
|1126087828843062099|   1_main| 0.4078902|  0.2693926|              39.0|102.19999694824219|
|1126099244876979588|   1_main|0.11111562|0.064469524|              42.0| 107.5999984741211|
|1126100365869525512|   1_main| 0.2480534|  0.1110391|              49.0|120.19999694824219|
|1126116274430575580|   1_main|0.33973783|  0.3550213|              42.0| 107.5999984741211|
|1126117223621958551|   1_main|0.48102745|  0.2877174|              35.0|              95.0|
|1126118383268436038|   1_main|0.35070643| 0.51856196|              62.0|143.60000610351562|
|1126119341050728845|     3_zp|0.25619835|  0.2162769|              33.0|  91.4000015258789|
+-------------------+---------+----------+-----------+------------------+------------------+

### *Iterator of Multiple Series*

In [16]:
@F.pandas_udf(returnType=T.DoubleType())
def harmonic_mean(scores: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    """Compute the harmonic mean of two columns."""

    for sbp, dkm in scores:
        yield 2 * sbp * dkm / (sbp + dkm)

In [17]:
stats.withColumn("hm", harmonic_mean(F.col("score_sbp"), F.col("score_dkm"))).show(7)

+-------------------+---------+----------+-----------+------------------+-------------------+
|             epk_id|hold_type| score_sbp|  score_dkm|sd_age_yrs_comp_nv|                 hm|
+-------------------+---------+----------+-----------+------------------+-------------------+
|1126087828843062099|   1_main| 0.4078902|  0.2693926|              39.0|0.32448071241378784|
|1126099244876979588|   1_main|0.11111562|0.064469524|              42.0|0.08159654587507248|
|1126100365869525512|   1_main| 0.2480534|  0.1110391|              49.0|0.15340685844421387|
|1126116274430575580|   1_main|0.33973783|  0.3550213|              42.0|0.34721145033836365|
|1126117223621958551|   1_main|0.48102745|  0.2877174|              35.0| 0.3600673973560333|
|1126118383268436038|   1_main|0.35070643| 0.51856196|              62.0| 0.4184277355670929|
|1126119341050728845|     3_zp|0.25619835|  0.2162769|              33.0| 0.2345510721206665|
+-------------------+---------+----------+-----------+------------------+-------------------+

## Group UDF

### Group aggregate

In [18]:
@F.pandas_udf(returnType=T.DoubleType())
def rate_of_change(age: pd.Series, score: pd.Series) -> float:
    """Fit a linear regression within the group and return the slope of the line."""

    reg = LinearRegression().fit(X=age.values.reshape(-1, 1), y=score)

    return reg.coef_[0]

In [19]:
(
    stats.groupBy("hold_type")
    .agg(rate_of_change("sd_age_yrs_comp_nv", "score_sbp").alias("rate"))
    .show(7)
)

+---------+--------------------+
|hold_type|                rate|
+---------+--------------------+
|   1_main|-8.99054866749793...|
|    2_lpr|-2.25336276344023...|
|     3_zp|-0.00277369585819...|
+---------+--------------------+

### Group map

#### `applyInPandas`

In [20]:
def min_max_scale_age(df: pd.DataFrame) -> pd.DataFrame:
    """Min-max scale the age column."""

    age = df.sd_age_yrs_comp_nv
    age_min, age_max = age.min(), age.max()
    out = (age - age_min) / (age_max - age_min)
    n = df.shape[0]

    return df.drop("sd_age_yrs_comp_nv", axis="columns").assign(age_scaled=out, n=n)

In [22]:
sample = stats.limit(5).toPandas()
min_max_scale_age(sample)

                epk_id hold_type  score_sbp  score_dkm  age_scaled  n
0  1126087828843062099    1_main   0.407890   0.269393    0.285714  5
1  1126099244876979588    1_main   0.111116   0.064470    0.500000  5
2  1126100365869525512    1_main   0.248053   0.111039    1.000000  5
3  1126116274430575580    1_main   0.339738   0.355021    0.500000  5
4  1126117223621958551    1_main   0.481027   0.287717    0.000000  5


In [23]:
schema = T.StructType(fields=[
    T.StructField("epk_id", T.LongType()),
    T.StructField("hold_type", T.StringType()),
    T.StructField("score_sbp", T.DoubleType()),
    T.StructField("score_dkm", T.DoubleType()),
    T.StructField("age_scaled", T.DoubleType()),
    T.StructField("n", T.IntegerType()),
])

(
    stats.groupby("hold_type")
    .applyInPandas(min_max_scale_age, schema=schema)
    .show(7)
)

+-------------------+---------+-------------------+-------------------+-------------------+------+
|             epk_id|hold_type|          score_sbp|          score_dkm|         age_scaled|     n|
+-------------------+---------+-------------------+-------------------+-------------------+------+
|1131576990673220011|   1_main|0.24425622820854187| 0.2018217295408249|0.40909090638160706|270000|
|1131578133139453629|   1_main| 0.3485866189002991|0.14937470853328705| 0.5454545617103577|270000|
|1131578889056964978|   1_main|0.27660831809043884|0.06328414380550385| 0.7272727489471436|270000|
|1131579954213543423|   1_main| 0.2329607456922531| 0.2735936939716339| 0.8863636255264282|270000|
|1131580607051412559|   1_main| 0.3482333719730377|0.38330310583114624| 0.3636363744735718|270000|
|1131581006485048709|   1_main|0.18145854771137238|0.25156131386756897| 0.7045454382896423|270000|
|1131581023664976379|   1_main| 0.6668580770492554|0.44185078144073486| 0.5454545617103577|270000|
+-------------------+---------+-------------------+-------------------+-------------------+------+

In [24]:
stats.groupby("hold_type").count().show()

+---------+------+
|hold_type| count|
+---------+------+
|   1_main|270000|
|     3_zp|  9810|
|    2_lpr| 19143|
+---------+------+

#### `mapInPandas`

In [25]:
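def sum_scores(df: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Compute a total score per row and record the size of each batch."""

    # `mapInPandas` passes an iterator of batches, not a single data frame.
    for batch_df in df:
        # NOTE: score_sbp is added to itself here, which is what the output
        # below reflects; score_sbp + score_dkm was probably intended.
        batch_df["total_score"] = batch_df["score_sbp"] + batch_df["score_sbp"]
        batch_df["n"] = batch_df.shape[0]
        yield batch_df[["epk_id", "total_score", "n"]]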

In [26]:
schema = T.StructType(fields=[
    T.StructField("epk_id", T.LongType()),
    T.StructField("total_score", T.DoubleType()),
    T.StructField("n", T.IntegerType()),
])

stats.mapInPandas(sum_scores, schema=schema).show(7)

+-------------------+-------------------+-----+
|             epk_id|        total_score|    n|
+-------------------+-------------------+-----+
|1126087828843062099| 0.8157804012298584|10000|
|1126099244876979588|0.22223123908042908|10000|
|1126100365869525512| 0.4961068034172058|10000|
|1126116274430575580| 0.6794756650924683|10000|
|1126117223621958551| 0.9620549082756042|10000|
|1126118383268436038| 0.7014128565788269|10000|
|1126119341050728845| 0.5123966932296753|10000|
+-------------------+-------------------+-----+
only showing top 7 rows

