## Row-like data (RDD), user-defined functions (UDFs), and Pandas in PySpark

In [None]:
# If you haven't installed pandas or pyarrow uncomment the line below or run it from the cli
# pip install pandas pyarrow

In [None]:
# cell for imports
import doctest
import math
import re
import reprlib
from dataclasses import dataclass, field
from functools import partial, reduce
from numbers import Number
from operator import add, mul, sub
from typing import Any, Callable, Final, Iterator, Tuple, Generator

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pandas.core.frame import DataFrame as PandasDataFrame
from pandas.core.series import Series
from py4j.protocol import Py4JJavaError
from pyspark.broadcast import Broadcast
from pyspark.rdd import RDD
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import ArrayType

In [None]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("error")

#### RDD

PySpark's DataFrame object can be manipulated with a host of methods. Spark takes those manipulations and creates an optimised query plan for execution. However, a DataFrame has constraints; we cannot put arbitrary values in a column, for instance. PySpark dictates that all entries in a column are of the same data type. But what if our data has different types? What if we want to write our own Python functions to manipulate data?

We can use the resilient distributed dataset (RDD). Though I presume that the DataFrame will be enough for most of your data manipulation, there are two use cases for the RDD:

1. You have an unordered collection of Python objects that can be pickled.
2. You have unordered key-value pairs like a Python dictionary.

Now, I would not be surprised if you had never heard of [pickling](https://docs.python.org/3/library/pickle.html). Pickling is Python's way of serialising objects. Serialising is the process of translating an object's state into a format that can be stored, transmitted, and reconstructed. If you operate in a distributed environment (multiple computers at possibly multiple locations) and you want to manipulate state (remember, Python is an imperative programming language; it manipulates state; the object in Python is the representation of the state), you need to ensure that all computers that perform operations do so on the same object.

I say all of this as useful background information, but it hardly gives you any insight into an RDD. To understand an RDD, we should go back to the DataFrame. The DataFrame is primarily about columns; the vast majority of the `pyspark.sql` API is about manipulating columns. One of the most important attributes of a column is that the elements in that column have to have the same type (int, string, etc.). The RDD does not have this constraint; it is basically a set-theoretic bag*. You can mix types; you can have multiplicity in a bag. I use the set theory example because set theory is the basis upon which all relational algebra (SQL) is built. You can consider an RDD to be row-like, for in tabular data the rows can have multiple types. A Python list is akin to a set-theoretic bag; let's use an example with a list.

*The term "multiset" was coined by the Dutch mathematician N. G. (Dick) de Bruijn.

In [None]:
bag: list[Any] = [1, "two", 3.0, ("four", 4), {"five": 5}]
for e in bag:
    print(type(e))

With this list, we can create an RDD using the `parallelize` method. This is an excellent name, because Spark is all about parallel computing (not to be confused with concurrency). So even if you did not know you had this skill, you are now doing parallel computing. The hallmark of parallel computing is that you use multiple CPUs, most likely on different computers and perhaps in multiple locations.

In [None]:
bag_rdd: RDD = sc.parallelize(bag)
bag_rdd

## MAP, FILTER, REDUCE
With the RDD, we sort of leave Python and enter Scala territory, where we manipulate the RDD data structure using three typical concepts from functional programming (Scala supports the functional programming style):
1. Map: You map a function to all elements in a collection.
2. Filter: You apply a predicate to all elements of a collection and filter those for which the predicate is true.
3. Reduce (fold): You apply a combining operator to the elements of a collection and recursively recombine the parts.

These functions are enough to understand how to manipulate an RDD. You should understand that they are higher-order functions: they take other functions as their arguments. Let me give you some Python examples.

In [None]:
ex: tuple[int, ...] = *range(1, 5), 5


def mult2(n: float | int) -> float | int:
    """
    desc: Function that multiplies the input by two
    tests:
    >>> mult2(4)
    8
    >>> mult2(4.0)
    8.0
    """

    return n * 2


doctest.testmod()

In [None]:
# the map
list(map(mult2, ex))

In [None]:
# the filter
list(filter(lambda x: x % 2 == 1, ex))

In [None]:
# the reduce is not a built-in in Python 3; it needs to be imported from functools
reduce(add, ex, 0)

Strangely enough, Python didn't have a product function similar to the sum function until `math.prod` arrived in Python 3.8. I believe it is because Guido van Rossum (Python's former benevolent dictator for life) thought nobody would use it. Or perhaps he thought you could easily build one yourself, of course without using the reduce function, as Guido doesn't like that one either. I, however, will use the reduce.

In [None]:
def product(numbers: list[int | float]) -> int | float:
    """
    desc:Function that returns the product of a list of numbers
    tests:
    >>> product([1,2,3])
    6
    >>> product([100,1000,10_000,0])
    0
    >>> product([100])
    100
    >>> product([2, 2.5])
    5.0
    """
    return reduce(mul, numbers, 1)


doctest.testmod()

In [None]:
product(ex)


As you can see, each of the three higher-order functions takes another function as its argument. For `map`, that is `mult2`, the somewhat silly function I quickly wrote. For `filter`, it is a construct you often see in data science and engineering: the anonymous $\lambda$ (lambda) function. Finally, for `reduce`, there are the functions `add` and `mul`, which are the function forms of the operators `+` and `*`.

In functional style Python we can even apply a function partially. 
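
A minimal sketch of partial application in plain Python, before we use it on the RDD. `partial(add, 1)` fixes the first argument of `add` and leaves a function of one argument; the name `add_one` is only there for illustration.

```python
from functools import partial
from operator import add

# Fix the first argument of add; the result takes the one remaining argument
add_one = partial(add, 1)

print(add_one(41))                   # the same as add(1, 41)
print(list(map(add_one, [1, 2.5])))  # usable wherever a one-argument function is expected
```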

In [None]:
bag_rdd: RDD = bag_rdd.map(partial(add, 1))

In [None]:
try:
    bag_rdd.collect()
except Py4JJavaError:
    ...

What happened? We were trying to apply an operation that is not defined for certain types! If we read the stack trace, we find the following error: `TypeError: unsupported operand type(s) for +:`. We are trying to add two values together, and addition is not defined for that combination of types.

Our RDD has the following elements:
1. 1
2. 'two'
3. 3.0
4. ('four', 4)
5. {'five': 5}

For 3 of the 5 elements, we get a type error if we try our function add on them.

#### Lazy evaluation
Maybe you are a bit surprised that we only get an error after we call `collect` on the RDD. The reason is that `map` is a transformation, and Spark only evaluates transformations once we perform an action such as `collect`. This is called lazy evaluation, also known as call by need: the expression is only evaluated when its value is needed. Python's regular evaluation strategy is eager evaluation, where the arguments of a function are evaluated before the function is called.

If you define a function as:

```
def eager(a:Any, b:Any) -> bool | Any:
    if a == 0:
        return True
    else:
        return b
```
and call it like so: `eager(0, 1/0)`, Python raises a `ZeroDivisionError`, because it evaluates both arguments before calling the function. In a lazy language like Haskell, the equivalent call would simply return `True`: since `b` is never needed, it is never evaluated. PySpark is lazy in a similar sense: it does not evaluate a transformation until an action needs its result.
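
The difference between the two strategies can be imitated in plain Python by wrapping the second argument in a zero-argument function (a thunk), so that it is only evaluated when called. The function `lazy` is hypothetical and mirrors `eager` above; it only illustrates call by need and says nothing about how PySpark implements it.

```python
from typing import Any, Callable


def eager(a: Any, b: Any) -> Any:
    """Both arguments are evaluated before the body runs."""
    return True if a == 0 else b


def lazy(a: Any, b: Callable[[], Any]) -> Any:
    """b is a zero-argument function that is only called when needed."""
    return True if a == 0 else b()


try:
    eager(0, 1 / 0)  # 1 / 0 is evaluated first, so this raises
except ZeroDivisionError:
    print("eager: ZeroDivisionError")

print("lazy:", lazy(0, lambda: 1 / 0))  # the division never runs
```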

In [None]:
def safer_add(value: Any) -> Any:
    """
    desc:function that only adds if that operation is defined for the type
    tests:
    >>> safer_add(1)
    2
    >>> safer_add(1.0)
    2.0
    >>> safer_add({1: 'one'})
    {1: 'one'}
    """
    if isinstance(value, (int, float, complex)):
        return value + 1
    return value


doctest.testmod()

This is not the only way we can implement a safer add. As an exercise, you should write one that
uses exception handling (`try`/`except`) instead of a type check.

In [None]:
bag_rdd: RDD = sc.parallelize(bag)

bag_rdd.map(safer_add).collect()

In [None]:
bag_rdd.filter(lambda e: isinstance(e, Number)).collect()

## Anonymous or $\lambda$ functions
In data science and engineering, you quite often see expressions like `lambda e: isinstance(e, Number)`. This is known as the anonymous or $\lambda$ (lambda) function. The expression consists of three parts:

1. `lambda`: This is the anonymous part, equivalent to `def isnumber`.
2. `e`: The argument to the $\lambda$ function; here, an element of the collection.
3. After the colon `:` comes the body of the function, a single expression whose value is returned implicitly.

$\lambda$ functions derive from functional programming, and they have one main use: sparing you from writing a named function for a small, one-off operation and thereby avoiding code bloat.

However, if writing $\lambda$ functions confuses you or is not your style, then write proper functions. Still, I find $\lambda$ functions are often clearer and more concise. It is often said that developers spend far more time reading code than writing it (Robert C. Martin puts the ratio at well over 10:1), so writing code that is concise and readable is important. You will see $\lambda$ functions often!

As a rule, you test and comment on all properly defined functions. With lambda functions, you do not need to do that.

In [None]:
def isnumber(e: Any) -> bool:
    """
    desc: Function that checks if e is a number, here defined as any instance of numbers.Number
    tests:
    >>> isnumber('a')
    False
    >>> isnumber(1)
    True
    >>> isnumber(3.14)
    True
    >>> isnumber(True)
    True
    """
    return isinstance(e, Number)


doctest.testmod()

#### False is 0
You might be surprised that `isnumber(True)` passes the test. A boolean is, after all, not a number. That a boolean is regarded as a number in Python is a legacy of the programming language C, in which much of Python is implemented. C originally had no separate Boolean type; instead, 0 meant false and 1 meant true. In Python, `bool` is a subclass of `int`, with `False == 0` and `True == 1`. Thus, you will see Python code like:
```
num, denom = frac
if denom:  # equivalent to: if denom != 0
    gcd = math.gcd(num, denom)
    return num // gcd, denom // gcd
return None
```
Python even expanded that idea with truthy values, so instead of saying that the truth needs to be 1, in Python, as long as a number isn't 0, it will evaluate to True. This idea seems silly to me. Sure, 0/1 could stand for False/True and we should be able to say $T\ne F$, but I prefer not to say that -5 is True per se.

In [None]:
x: int = -5
if x:
    print("madness")
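
A short plain-Python sketch of this behaviour: `bool` is a subclass of `int`, which is why `True` passes a `Number` check and can take part in arithmetic. The list of sample values is arbitrary.

```python
from numbers import Number

print(issubclass(bool, int))     # True
print(isinstance(True, Number))  # True
print(True + True)               # 2
print(sum([True, False, True]))  # 2, a common idiom for counting matches

# Truthiness: zero and empty containers are falsy, everything else here is truthy
samples = (0, -5, 0.0, "", "a", [], [0])
print([bool(v) for v in samples])
```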

In [None]:
bag_rdd.filter(isnumber).collect()

#### Reduce 
When using reduce, it is imperative that the function you pass to reduce is defined for all elements of the collection it operates on, and for the intermediate results. Our bag_rdd cannot be reduced. The following example can be reduced:

In [None]:
bag2: list[Any] = [1, 2.3, 3, 87.65, 10_000_000]
bag2_rdd: RDD = sc.parallelize(bag2)

bag2_rdd.reduce(mul)

#### `reduce` in a distributed world.
There is an important limit to using reduce in a distributed world: I cannot do the following in PySpark:

In [None]:
reduce(sub, bag2, 0)

Doing the same in PySpark will return silly answers, because the result depends on how the data happens to be partitioned.

In [None]:
bag2_rdd.reduce(sub)

The functions you pass to reduce need to be both 
[commutative and associative](https://en.wikipedia.org/wiki/Commutative_property). `add`, `mul`, `min`, and `max` are both commutative and associative. Subtraction and division are not. The reason behind this is simple: we distribute the workload over several nodes; therefore, we need to know that when we combine the partial results, it does not matter in what order they come in or how they are grouped. Obviously, PySpark's reduce does not check that the functions have these properties. This would take some serious math; you would have to prove these properties for every function you give as an argument to reduce.
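
To see why order and grouping matter, here is a plain-Python sketch that imitates a distributed reduce: it reduces each partition separately and then reduces the partial results. The helper `reduce_partitions` and the way the list is split are assumptions for illustration; real Spark partitioning works differently.

```python
from functools import reduce
from operator import add, sub
from typing import Callable

data: list[int] = [1, 2, 3, 4, 5, 6]


def reduce_partitions(func: Callable, partitions: list[list[int]]) -> int:
    """Reduce every partition on its own, then combine the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


two_parts = [data[:3], data[3:]]
three_parts = [data[:2], data[2:4], data[4:]]

# add is commutative and associative: the partitioning does not matter
print(reduce_partitions(add, two_parts), reduce_partitions(add, three_parts))

# sub is neither: each partitioning gives a different answer
print(
    reduce(sub, data),
    reduce_partitions(sub, two_parts),
    reduce_partitions(sub, three_parts),
)
```

With `add`, both partitionings give 21. With `sub`, the sequential result is -19, while the two simulated partitionings give 3 and 1.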

#### DataFrame == RDD?

In [None]:
df: DataFrame = spark.createDataFrame([[1], [2], [6]], schema=["column"])
df.rdd

As you can see, a DataFrame is backed by an RDD of `Row` objects. You can think of each row as a dictionary:
the key is the column name, the value is the element in the row.

In [None]:
df.rdd.collect()

#### RDD conclusion
You should work with the DataFrame as the data structure; it has a more intuitive API, and when working with persistent data, it makes more sense to take the column approach. Only when you have one of the two use cases of an RDD should you use it:
1. You have an unordered collection of Python objects that can be pickled.
2. You have unordered key-value pairs, like a Python dictionary.

## User-defined functions (UDFs)
There is another option if you want to extend PySpark with your own Python code: the UDF (user-defined function). You can call `pyspark.sql.functions.udf` with your own function as an argument, and PySpark will promote your function to work on columns.

There are a few things you should know about UDFs:

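1. Spark does not guarantee that conditional expressions around a UDF are respected. If you wrap a UDF call in a column expression such as `F.when(condition, my_udf(col))`, the UDF may still be executed for rows where the condition is false. Note that this concerns Spark column expressions; inside your own Python function you can use `if` statements and conditional expressions (`x if condition else y`) freely.
2. For the same reason, you cannot rely on short-circuiting in boolean column expressions. In plain Python, `a is not None and a.get_something()` never evaluates the right-hand side when `a` is None. Spark may evaluate both sides of a boolean column expression, so if your function can fail on special rows (nulls, for instance), put the check inside the function itself.
3. A UDF traditionally cannot be called with keyword arguments. You can use keyword arguments in your definition, but you cannot enforce their use: a keyword-only signature such as `def wht(*, arg1: int, arg2: str)` will not work. Newer PySpark releases relax this restriction, so check the documentation for your version.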

Let us create a type that is not available in PySpark: the fraction.

In modern Python, I think we have three options to create a fraction object:

1. We create a class. The advantage of that is that we can group all methods together in that class. We can also use properties to ensure the denominator does not equal 0.
2. We can use a NamedTuple to create the Fraction and write functions instead of methods.
3. We can use a type alias. 

I would opt for the NamedTuple because we are writing PySpark scripts and not object-oriented applications. However, as we promote our Python function to PySpark, we need to ensure that we work with types that can be translated to PySpark types and vice versa, so a type alias will have to do.
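
For comparison, a minimal sketch of the second option. The names `FractionNT` and `simplify` are hypothetical and are not used elsewhere in this notebook; because a `NamedTuple` is still a tuple, it unpacks and compares just like a plain tuple does.

```python
import math
from typing import NamedTuple, Optional


class FractionNT(NamedTuple):
    """Hypothetical NamedTuple variant of a fraction."""

    numerator: int
    denominator: int


def simplify(frac: FractionNT) -> Optional[FractionNT]:
    """Return the fraction in lowest terms, or None if the denominator is 0."""
    if frac.denominator == 0:
        return None
    gcd = math.gcd(frac.numerator, frac.denominator)
    return FractionNT(frac.numerator // gcd, frac.denominator // gcd)


half = simplify(FractionNT(3, 6))
print(half)                        # FractionNT(numerator=1, denominator=2)
print(half == (1, 2))              # True: it still compares equal to a plain tuple
print(simplify(FractionNT(2, 0)))  # None
```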

In [None]:
Fraction = Tuple[int, int]

fractions: list[tuple[int, int]] = [(x, y) for x in range(10) for y in range(1, 10)]

frac_df: DataFrame = spark.createDataFrame(fractions, ["numerator", "denominator"])

frac_df.show(n=5, truncate=False)

In [None]:
def py_simplify_fraction(frac: Fraction) -> Fraction | None:
    """
    desc: Function to simplify fractions
    tests:
    >>> py_simplify_fraction((3,6))
    (1, 2)
    >>> py_simplify_fraction((2,5))
    (2, 5)
    >>> py_simplify_fraction((2,0))

    """
    num, denom = frac
    if denom:
        gcd = math.gcd(num, denom)
        return num // gcd, denom // gcd  # // -> floor division
    return None


def py_fraction_to_float(frac: Fraction) -> float | None:
    """
    desc: Function to retrieve a float rounded to two decimals from a fraction
    tests:
    >>> py_fraction_to_float((1,2))
    0.5
    >>> py_fraction_to_float((2,3))
    0.67
    >>> py_fraction_to_float((2,0))
    """
    num, denom = frac
    try:
        return round(num / denom, 2)
    except ZeroDivisionError:
        return None


doctest.testmod()

#### The importance of typing

I cannot stress enough the importance of using typing, both in Python and in PySpark scripts. Typing gives you instant documentation of your code. This is important for others who read your code, but also for you. Often, you will have forgotten that it was you who wrote something, and suddenly you find yourself wondering why you made the choices that you did. You need your own code to be well documented. Furthermore, typing prevents errors, especially if used in conjunction with a type checker such as [MyPy](https://www.mypy-lang.org/).

Now let's use these functions on the DataFrame.

In [None]:
# a reusable Spark type for our fractions: an array of longs
SparkFrac: ArrayType = T.ArrayType(T.LongType())

simplify_fraction: Callable = F.udf(py_simplify_fraction, SparkFrac)

df: DataFrame = frac_df.select(F.array(F.col("numerator"), F.col("denominator")).alias("fraction"))

df: DataFrame = df.withColumn("simplified_fraction", simplify_fraction(F.col("fraction")))
df.show(n=5, truncate=False)

#### Typing again
Typing in Python is sometimes a bit messy. If I ask for the type of `simplify_fraction`, the answer is `function`. However, if I annotate it as `simplify_fraction: function`, I will get an error, because `function` is not a built-in name. In Python, we type a function with its abstract base class, `Callable`.

Typing in Python will remain awkward because, at its core, Python is a dynamic language, yet we need to use typing for the reasons I mentioned before. 
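
`Callable` can also carry the argument and return types, which tells a type checker more than a bare `Callable` does. A small sketch in plain Python; the alias `FractionToFloat` and the functions `to_float` and `apply_to_all` are made up for illustration.

```python
from typing import Callable, Optional, Tuple

Fraction = Tuple[int, int]
# A function that takes a Fraction and returns a float or None
FractionToFloat = Callable[[Fraction], Optional[float]]


def to_float(frac: Fraction) -> Optional[float]:
    """Divide numerator by denominator, or return None for a zero denominator."""
    num, denom = frac
    return num / denom if denom else None


def apply_to_all(func: FractionToFloat, fracs: list[Fraction]) -> list[Optional[float]]:
    """Higher-order function whose parameter is typed with Callable."""
    return [func(f) for f in fracs]


print(apply_to_all(to_float, [(1, 2), (3, 0)]))  # [0.5, None]
print(callable(to_float))                         # True
```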

In [None]:
issubclass(type(simplify_fraction), Callable)

In [None]:
fraction_to_float: Callable = F.udf(py_fraction_to_float, T.DoubleType())

df: DataFrame = df.withColumn(
    "fraction_to_float", fraction_to_float(F.col("fraction")).alias("float")
)
df.show(n=50, truncate=False)

#### Using a udf decorator

In [None]:
@F.udf(T.DoubleType())
def reverse_fraction_to_float(frac: Fraction) -> float | None:
    """
    desc: invert the fraction (swap numerator and denominator) and return it as a float
    tests: No tests because of the decorator
    """
    num, denom = frac
    if num:
        return py_fraction_to_float((denom, num))
    return None

In [None]:
df: DataFrame = df.withColumn(
    "reverse_fraction_to_float",
    reverse_fraction_to_float(F.col("fraction")).alias("reverse"),
)

In [None]:
df.show(n=50, truncate=False)

## Conclusions
1. The RDD allows for more flexibility compared to the DataFrame.
2. The RDD is the most low-level and flexible way of running code in the distributed PySpark environment. You will need to be careful with your types to prevent type errors.
3. The API for RDD is heavily inspired by Google's MapReduce. Apache Hadoop contains an open-source implementation of MapReduce.
4. Using the `udf` function, we can promote Python functions to work with DataFrames. It is like mapping the Python function to the DataFrame.
5. You have to be careful with your data design; the input and output types of your function must be convertible to PySpark types.
6. If you use `udf` as a decorator, you cannot simply test your functions using doctest or assert (logically). 

#### Remark
In the book, there are a few exercises. Amongst them is one to write a temperature converter, exercise 8.3. Below is my version of this. Unfortunately, Jonathan Rioux's version is old Python, not the kind of Python I would promote. In this example we use a Parquet file. [Parquet](https://en.wikipedia.org/wiki/Apache_Parquet) is a column-based file format from the Hadoop ecosystem.

Here I show you the code as I think it should be. In general, I think it is better to write helper functions outside the body of a function. Furthermore, use pattern matching with the `match/case` statement, as it is much clearer than a myriad of ifs. I recommend you compare the two code examples.

In [None]:
# I am not commenting any further on these helpers; I feel the names and the types are sufficient
def celsius_to_fahrenheit(degree: float | int) -> float:
    """
    >>> celsius_to_fahrenheit(30)
    86.0
    """
    return degree * 9 / 5 + 32


def fahrenheit_to_celsius(degree: float | int) -> float:
    """
    >>> fahrenheit_to_celsius(86)
    30.0
    """
    return (degree - 32) * 5 / 9


doctest.testmod()

In [None]:
def temp_to_temp_converter(
    value: float | int, domain: str, image: str
) -> float | int | None:
    """
    desc: Function to convert temperatures from domain to image.
    The accepted temperature scales are: (C)elsius, (F)ahrenheit, (K)elvin, and (R)ankine
    tests:
    >>> temp_to_temp_converter(30, "C", "F")
    86.0
    >>> temp_to_temp_converter(30, "C", "C")
    30
    >>> temp_to_temp_converter(32, "F", "C")
    0.0
    >>> temp_to_temp_converter(86, "F", "K")
    303.15
    >>> temp_to_temp_converter(0, "C", "R")
    491.67
    >>> temp_to_temp_converter(30, "C", "G")
    >>> temp_to_temp_converter(30, "Q", "R")
    """
    match domain:
        case "C":
            match image:
                case "C":
                    return value
                case "F":
                    return celsius_to_fahrenheit(value)
                case "K":
                    return value + 273.15
                case "R":
                    return celsius_to_fahrenheit(value) + 459.67
                case _:
                    return None
        case "F":
            match image:
                case "C":
                    return fahrenheit_to_celsius(value)
                case "F":
                    return value
                case "K":
                    return fahrenheit_to_celsius(value) + 273.15
                case "R":
                    return value + 459.67
                case _:
                    return None
        case "K":
            match image:
                case "C":
                    return value - 273.15
                case "F":
                    return celsius_to_fahrenheit(value - 273.15)
                case "K":
                    return value
                case "R":
                    return value * 1.8
                case _:
                    return None
        case "R":
            match image:
                case "C":
                    return fahrenheit_to_celsius(value - 459.67)
                case "F":
                    return value - 459.67
                case "K":
                    return value / 1.8
                case "R":
                    return value
                case _:
                    return None
        case _:
            return None


doctest.testmod()

In [None]:
path: str = "./ProgrammingProjects/SparkTest/DataAnalysisWithPythonAndPySpark-Data-trunk/"

gsod: DataFrame = (
    reduce(
        lambda x, y: x.unionByName(y, allowMissingColumns=True),
        [
            spark.read.parquet(f"{path}gsod_noaa/gsod{year}.parquet")
            for year in range(2010, 2021)
        ],
    )
    .dropna(subset=["year", "mo", "da", "temp"])
    .where(F.col("temp") != 9999.9)
    .drop("date")
)

In [None]:
gsod: DataFrame = gsod.select(
    "stn",
    "year",
    "mo",
    "da",
    "temp",
)
conv: DataFrame = gsod.withColumn(
    "converted_temp", temp_to_temp_converter(gsod.temp, "F", "C")
).select("*", F.expr("round(converted_temp, 1)").alias("celsius")).drop("converted_temp")
conv.show(n=50)

## Treating big data as a bunch of small data, a.k.a. using Pandas

With Pandas we can start treating big data as nothing more than a bunch of small data, using Series UDFs for column transformations. The simplest of the Pandas UDFs are the Series UDFs. A Pandas Series is a one-dimensional labelled array backed by a NumPy ndarray. A NumPy ndarray is actually an N-dimensional array, based upon the Fortran and C contiguous arrays. An instance of class ndarray consists of a contiguous one-dimensional segment of computer memory (owned by the array or by some other object) combined with an indexing scheme that maps N integers into the location of an item in the block. This construct makes the NumPy ndarray very fast for numerical work, much faster than a Python collection such as a list or tuple.

A Series UDF takes one or more Column objects as input and returns a Column object as output. In PySpark, it is mostly used to make use of the Pandas library, but also of all those libraries that work well with Pandas. Libraries such as NumPy (obviously), scikit-learn, and statsmodels all operate seamlessly with Pandas and thus in PySpark.

You can basically promote a Pandas (or friends) function to the distributed world of PySpark. If you are like me, you will straight away wonder how. After all, the NumPy ndarray is that fast because it is contiguous in memory, ergo on one computer. This is solved by using [Apache Arrow](https://arrow.apache.org/), with its in-memory columnar data format, to transfer the data in batches between Spark and Pandas.

There are four Series UDFs we will look at:

1. Series to Series
2. Iterator of Series to Iterator of Series
3. Iterator of Multiple Series to Iterator of Series
4. Series to Scalar

I will use some examples from Databricks to illuminate the UDF, then show how to apply them to real data. I will start off with a **Series to Series** example.

In [None]:
# the function we want to promote
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    """
    desc: function that multiplies two columns together element-wise
    test:
    >>> multiply_func(pd.Series([1, 2, 3]),pd.Series([1, 2, 3]))
    0    1
    1    4
    2    9
    dtype: int64
    """
    return a * b


# the promotion
multiply: Callable = F.pandas_udf(multiply_func, returnType=T.LongType())

col: Series = pd.Series([1, 2, 3])


df: DataFrame = spark.createDataFrame(pd.DataFrame(col, columns=["col"]))

# Execute function as a Spark vectorized UDF
df.select(
    multiply(F.col("col"), F.col("col")).alias("element_wise_multiplication")
).show()

#### Vectorization
If you remember any linear algebra, this is a simple element-wise (Hadamard) product of two vectors.

$\begin{bmatrix}1 \\ 2 \\ 3 \end{bmatrix} \odot \begin{bmatrix}1 \\ 2 \\ 3 \end{bmatrix} = \begin{bmatrix}1 \\ 4 \\ 9 \end{bmatrix}$

In Python, we call this vectorization, replacing explicit loops with array expressions. In general, vectorized array operations will often be one or two (or more) orders of magnitude faster than their pure Python equivalents, with the biggest impact seen in any kind of numerical computation. Vectorized operations in NumPy use highly optimised C and Fortran functions, resulting in cleaner and faster Python code. 

In [None]:
# using the same technique on actual data with gsod and a decorator
@F.pandas_udf("double")
def fahrenheit_to_celsius(degrees: pd.Series) -> pd.Series:
    '''converts degrees in Fahrenheit to Celsius using vectorization'''
    return round((degrees - 32) * 5 / 9, 1)


gsod: DataFrame = gsod.withColumn("temp_in_celsius", fahrenheit_to_celsius(gsod["temp"]))
gsod.select(F.col("temp"), F.col("temp_in_celsius")).distinct().show(n=20)

But what if I want to use `fahrenheit_to_celsius` on a Pandas DataFrame? I cannot use the decorated function, as the decorator elevates the function for use on a Spark DataFrame. It does not seem very `DRY` (don't repeat yourself) to write the same function again without the decorator. Luckily, you do not need to. If you inspect `__dict__` on the function `fahrenheit_to_celsius`, you will see there is a `func` key that holds the original function. We can use `fahrenheit_to_celsius.func` to call the undecorated function on a Pandas DataFrame.

In [None]:
fahrenheit_to_celsius.__dict__

In [None]:
gsod_local: PandasDataFrame = gsod.filter(
    "year='2018' and mo='08' and stn='710920'"
).toPandas()
gsod_local.assign(temp_in_c_again = fahrenheit_to_celsius.func(gsod_local['temp'])).tail(5)

#### `functools.wraps`
What is nice is that the PySpark people have made the decorated functions easily inspectable by using [`functools.wraps`](https://docs.python.org/3/library/functools.html) when writing the decorator. The `wraps` function copies the metadata of the decorated function onto the wrapper that replaces it. An example of such metadata is the function's documentation.
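
What `functools.wraps` does can be shown with a small decorator of our own. This is a generic sketch, not PySpark's actual decorator code; the decorator `logged` and the function `double` are hypothetical.

```python
from functools import wraps
from typing import Any, Callable


def logged(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator that prints the name of the function it calls."""

    @wraps(func)  # copy __name__, __doc__, and friends from func onto wrapper
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)

    return wrapper


@logged
def double(n: int) -> int:
    """Return n multiplied by two."""
    return n * 2


print(double(4))
print(double.__name__, "|", double.__doc__)
print(double.__wrapped__(4))  # wraps also keeps a reference to the original function
```

Without `@wraps(func)`, `double.__name__` would be `wrapper` and the docstring would be lost.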

In [None]:
fahrenheit_to_celsius.__doc__

In [None]:
help(fahrenheit_to_celsius)

#### Iterator

Before we move on to the Iterator of Series to Iterator of Series UDF, we need to answer the question: What is an iterator? An iterator provides a way to access the elements of an aggregated object sequentially without exposing its underlying representation. New traversal operations should be defined for an aggregate object without changing its interface. This is worded a bit opaquely, but it isn't that complicated.

When you define an iterator in any programming language, you have to answer two questions:

1. How do I get to the next element in the collection? 
2. Is there a next element?

In Python, you can make a class iterable by including a definition of [`__getitem__`](https://docs.python.org/3/reference/datamodel.html#object.__getitem__); this way, you implement the sequence protocol. Alternatively, you can implement the `__iter__` method in the iterable class and create a separate iterator class to perform the actual iteration. By doing so, you follow [the design pattern](https://en.wikipedia.org/wiki/Iterator_pattern#) strictly. The Pythonic way is to use a generator function; you write one by using the `yield` keyword in the body of the function. A function that yields can suspend and later pick up where it left off.

Using a generator, we can make a sentence iterable and then iterate over it using Python's built-in `next()` function. Python has no `hasNext()` function; instead, `next()` raises a `StopIteration` exception when the generator has no more elements to yield.
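
To make the two questions concrete, here is a sketch of the strict design-pattern version next to the generator version. The classes `Countdown` and `CountdownIterator` and the function `countdown` are hypothetical examples, not part of any library.

```python
from typing import Iterator


class Countdown:
    """Hypothetical iterable that counts down from start to 1."""

    def __init__(self, start: int) -> None:
        self.start = start

    def __iter__(self) -> "CountdownIterator":
        return CountdownIterator(self.start)


class CountdownIterator:
    """The iterator: it knows the next element and when to stop."""

    def __init__(self, current: int) -> None:
        self.current = current

    def __iter__(self) -> "CountdownIterator":
        return self

    def __next__(self) -> int:
        if self.current <= 0:
            raise StopIteration  # the answer to "is there a next element?"
        value = self.current
        self.current -= 1
        return value


def countdown(start: int) -> Iterator[int]:
    """The same behaviour written as a generator function."""
    while start > 0:
        yield start
        start -= 1


print(list(Countdown(3)), list(countdown(3)))

it = iter(Countdown(1))
print(next(it))
print(next(it, "exhausted"))  # passing a default avoids the StopIteration
```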

In [None]:
@dataclass
class Sentence:
    text: str
    words: list[str] = field(default_factory=list)

    def __post_init__(self):
        RE_WORD = re.compile(r"\w+")
        self.words = RE_WORD.findall(self.text)

    def __repr__(self) -> str:
        return f"Sentence({reprlib.repr(self.text)})"

    def __iter__(self):
        for word in self.words:
            yield word

In [None]:
sentence: Sentence = Sentence("Croc is peckish!")
it: Generator = iter(sentence)
# next() fetches one word at a time; StopIteration signals the end
while True:
    try:
        print(next(it))
    except StopIteration:
        break

In [None]:
@F.pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

col: Series = pd.Series([1, 2, 3])
df: DataFrame = spark.createDataFrame(pd.DataFrame(col, columns=["col"]))

df.select(F.col("col"), plus_one(F.col("col")).alias("added_one")).show()

Now I can imagine that you question the usefulness of this example. This kind of Pandas UDF is useful when the UDF execution requires initialising some state, for example, loading a machine learning model file to apply inference to every input batch. This is called an expensive cold start, meaning you have to do some computationally expensive operation, for instance, loading that machine learning model, before you can start your manipulation. With an iterator UDF, you pay that cost once per iterator rather than once per batch.

A simple example of using some initial state is a broadcast. `SparkContext.broadcast` sends a read-only variable to the cluster and returns a `Broadcast` object that distributed functions, such as our UDF, can read.

In [None]:
y_bc: Broadcast = spark.sparkContext.broadcast(3)


@F.pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # all nodes in the cluster should have y_bc
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # this is where you would release any resources you acquired


df.select(
    F.col("col"),
    plus_one(F.col("col")).alias("added_one"),
    plus_y(F.col("col")).alias("added_y"),
).show()

**Iterator of Multiple Series to Iterator of Series**

In [None]:
@F.pandas_udf("long")
def multiply_two_cols(
    iterator: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    '''function that returns an element wise multiplied column'''
    for a, b in iterator:
        yield a * b


df.select(
    F.col("col"),
    plus_one(F.col("col")).alias("added_one"),
    plus_y(F.col("col")).alias("added_y"),
    multiply_two_cols(F.col("added_one"), F.col("added_y")).alias("added_one_times_added_y"),
).show()

As you can see, we use the [yield](https://docs.python.org/3/reference/simple_stmts.html#grammar-token-python-grammar-yield_stmt) keyword, which hands back one result each time the caller asks for the next one.

A function that contains `yield` is a generator function; calling it returns a generator. A generator is a special kind of iterator: it remembers where it stopped and resumes from there on the next request, so it controls the iterating behaviour of a loop. The idea goes back to the iterators in Barbara Liskov's CLU language. In Python, generators are efficient because they are lazy; they only produce a value when one is needed.
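
The laziness can be observed directly. Below is a small sketch in plain Python; `count_up` and `log` are names I made up for the illustration. The log records the moment each value is actually produced.

```python
from typing import Iterator

log: list[str] = []  # records when each value is actually produced


def count_up(limit: int) -> Iterator[int]:
    """Generator function: the body only runs when a value is requested."""
    for n in range(limit):
        log.append(f"producing {n}")
        yield n


numbers = count_up(3)        # builds the generator; the body has not run yet
log_after_call = list(log)   # snapshot: still empty
first = next(numbers)        # runs the body up to the first yield
log_after_next = list(log)   # snapshot: exactly one value has been produced
rest = list(numbers)         # drains the remaining values

print(log_after_call, first, log_after_next, rest)
```

Nothing is computed when `count_up(3)` is called; each value is produced only when `next` or `list` asks for it.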

Let's see this pattern in conjunction with some actual data and write a function I know you have written in SQL.

In [None]:
@F.pandas_udf("date")
def create_date(
    ymd: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """create a date from a tuple containing year, month and day"""
    for year, month, day in ymd:
        yield pd.to_datetime(pd.DataFrame(dict(year=year, month=month, day=day)))


gsod.select(
    "year", "mo", "da", create_date(gsod["year"], gsod["mo"], gsod["da"]).alias("date")
).distinct().show(n=5)
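
The heavy lifting in `create_date` is done by `pd.to_datetime`, which can assemble dates from a DataFrame whose columns are named `year`, `month` and `day`. That is why the UDF renames `mo` and `da` through the `dict(...)` call. A small pandas-only sketch with made-up values:

```python
import pandas as pd

# made-up sample values; the column names year/month/day are what pandas looks for
parts = pd.DataFrame({"year": [2019, 2020], "month": [1, 12], "day": [31, 5]})

dates = pd.to_datetime(parts)  # one Timestamp per row
print(dates)
```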

**Series-to-Scalar**    
This is akin to a PySpark aggregate function. A Series-to-Scalar pandas UDF defines an aggregation from one or more pandas Series to a scalar value, where each pandas Series represents a Spark column. 

In [None]:
df: DataFrame = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "value")
)


@F.pandas_udf("double")
def mean_udf(value: pd.Series) -> float:
    """returns the mean of the column value"""
    return value.mean()


df.select(mean_udf(F.col("value"))).show()

#### Distributed Pandas?

With this technique, we are free to use the wealth of functions in the Pandas API. However, there is a caveat: as PySpark is a distributed environment and Pandas is not, we might need to take the composition of batches into account with certain user-defined functions, e.g.,

- Group aggregate UDFs
- Group map UDFs

These two UDFs are PySpark's solution to the split-apply-combine pattern:

1. Split the data into groups based on some criteria, using `groupBy`.
2. Apply a function to each group/batch independently. Examples are aggregate functions like sum, filter functions based upon some predicate, and transformations like plus_one.
3. Combine the batches into a unified data set.
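
The three steps can be spelled out by hand in plain pandas. This is only a sketch of the pattern, not of how Spark executes it; the numbers are the same small `id`/`value` pairs used for `mean_udf`.

```python
import pandas as pd

data = pd.DataFrame({"id": [1, 1, 2, 2, 2], "value": [1.0, 2.0, 3.0, 5.0, 10.0]})

# 1. split: one sub-frame per id
groups = {key: group for key, group in data.groupby("id")}

# 2. apply: the same function on every group, independently of the others
means = {key: group["value"].mean() for key, group in groups.items()}

# 3. combine: gather the per-group results into one object
combined = pd.Series(means, name="mean_value").rename_axis("id")
print(combined)
```

In everyday pandas you would write `data.groupby("id")["value"].mean()`; the long form only makes the three steps visible.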

By default, PySpark puts at most 10,000 rows in an Arrow batch; this is a good size, but there might be circumstances (you run this code on an old Toshiba laptop) where you might want to reduce the batch size. This can be done by lowering `spark.sql.execution.arrow.maxRecordsPerBatch`.
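
In the Spark releases I am aware of, the setting that caps the number of rows per Arrow batch is `spark.sql.execution.arrow.maxRecordsPerBatch`, and it can be changed on a running session. The value 5000 below is an arbitrary illustration, not a recommendation.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# lower the maximum number of rows per Arrow batch (5000 is an arbitrary example)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
print(spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch"))
```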

Finally, to make use of the computing power, you need to make sure each batch/group can be loaded into memory. The batch-size setting is not applied to grouped data: every group is loaded in full, so if one of your groups cannot be placed in memory, you will get an out-of-memory exception.

A Pandas Series-to-Scalar UDF used after `groupby` is the split-apply-combine pattern in action.

In [None]:
df.groupby("id").agg(mean_udf(df["value"])).show()

#### Group map UDF
A group map UDF returns a `pd.DataFrame`, which PySpark converts into a PySpark DataFrame using the schema you pass to `applyInPandas`.

In [None]:
def scale_temperature(temp_by_day: pd.DataFrame) -> pd.DataFrame:
    """
    function that returns a normalization of temperature for a site.
    If the temperature is constant for the whole period, defaults to 0.5
    """
    temp = temp_by_day.temp
    answer = temp_by_day[["stn", "year", "mo", "da", "temp"]]
    if temp.min() == temp.max():
        return answer.assign(temp_norm=0.5)  # pandas method: assigns a new column to a DataFrame
    return answer.assign(
        temp_norm=((temp - temp.min()) / (temp.max() - temp.min())).round(1)  # the normalisation
    )

In [None]:
gsod_map: DataFrame = gsod.groupBy("stn", "year", "mo").applyInPandas(
    scale_temperature,
    schema="stn string, year string, mo string, da string, temp double, temp_norm double"
)

In [None]:
gsod_map.show(n=5, truncate=False)

As you can see, because `applyInPandas` applies a plain pandas function to each group, we do not need to decorate it with `pandas_udf`.
See the documentation on [`applyInPandas`](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html).
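
A pleasant side effect is that such a function can be checked locally on a hand-made pandas DataFrame before it is handed to Spark. The sketch below uses `min_max_scale`, a hypothetical and simplified cousin of `scale_temperature`, with made-up temperatures.

```python
import pandas as pd


def min_max_scale(group: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical, simplified cousin of scale_temperature: scales `temp` to [0, 1]."""
    temp = group["temp"]
    spread = temp.max() - temp.min()
    if spread == 0:
        return group.assign(temp_norm=0.5)  # constant group: same default as above
    return group.assign(temp_norm=((temp - temp.min()) / spread).round(1))


# two made-up groups: one with varying and one with constant temperature
varying_group = pd.DataFrame({"stn": ["A", "A", "A"], "temp": [10.0, 15.0, 20.0]})
constant_group = pd.DataFrame({"stn": ["B", "B"], "temp": [7.0, 7.0]})

scaled = min_max_scale(varying_group)
flat = min_max_scale(constant_group)
print(scaled)
print(flat)
```

Each call mimics what `applyInPandas` does for a single group, so both branches of the function are exercised without starting a Spark job.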

#### Revisiting temp_to_temp_converter
The `temp_to_temp_converter` I wrote above can be easily adapted to use a `pandas_udf`.

In [None]:
def f_to_c(degrees: pd.Series) -> pd.Series:
    '''converts degrees in Fahrenheit to Celsius'''
    return (degrees - 32) * 5 / 9

def c_to_f(degrees: pd.Series) -> pd.Series:
    '''converts degrees in Celsius to Fahrenheit'''
    return degrees * 9 / 5 + 32

# these are constants; change their values and the conversions will be wrong
rankine: Final = 459.67
kelvin: Final = 273.15

def temp_to_temp_converter(
    value: pd.Series, domain: str, image: str
) -> pd.Series:
        match domain:
            case "C":
                match image:
                    case "C":
                        return value
                    case "F":
                        return c_to_f(value)
                    case "K":
                        return value + kelvin
                    case "R": 
                        return c_to_f(value) + rankine
                    case _: 
                        return value.apply(lambda _: None)
            case "F":
                match image:
                    case "C": 
                        return f_to_c(value)
                    case "F": 
                        return value 
                    case "K": 
                        return f_to_c(value) + kelvin
                    case "R": 
                        return value + rankine
                    case _:
                        return value.apply(lambda _: None)
            case "K":  
                match image:
                    case "C":
                        return value - kelvin
                    case "F": 
                        return c_to_f(value - kelvin) 
                    case "K":
                        return value 
                    case "R": 
                        return value * 1.8 
                    case _: 
                        return value.apply(lambda _: None)
            case "R": 
                match image: 
                    case "C": 
                        return f_to_c(value - rankine)
                    case "F": 
                        return value - rankine 
                    case "K": 
                        return value / 1.8 
                    case "R": 
                        return value 
                    case _: 
                        return value.apply(lambda _: None)
            case _:
                return value.apply(lambda _: None)

#### Code comment

1. As in the previous code, and unlike Rioux's code, I treat the conversion functions as general functions and keep them out of the larger function. This prevents clutter in your code.
2. This does not apply to $\lambda$ functions. I use a lambda here because I fill a whole Pandas Series object with None values.
3. Python does not have constants the way other programming languages do. For instance, Java has the `final` keyword: `public static final double kelvin = 273.15`. Python's `typing` module provides the qualifier `Final`, which marks a name as a constant. If you use MyPy, you will get an error if the constant is reassigned somewhere. The interpreter will not throw an error and will simply make the change. I opt for leaving these constants out of the function; they are general constants. If I put them in the function `temp_to_temp_converter`, I narrow the scope of these constants to just that function, so I cannot use them outside of it, though I might want to. There are ways to prevent such random access in Python, but that is out of the scope of this notebook; therefore, I just wrote some comments.
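
The difference between the type checker and the interpreter is easy to demonstrate. In this sketch, `KELVIN_OFFSET` is a hypothetical name standing in for the `kelvin` constant above.

```python
from typing import Final

KELVIN_OFFSET: Final = 273.15  # hypothetical name, same idea as `kelvin` above

before = KELVIN_OFFSET
KELVIN_OFFSET = 0.0  # a type checker such as MyPy flags this line; the interpreter does not
after = KELVIN_OFFSET

print(before, after)
```

Running the snippet raises no exception; only a static check catches the reassignment.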

In [None]:
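@F.pandas_udf("double")
def fahrenheit_to_celsius(temp: pd.Series) -> pd.Series:
    return temp_to_temp_converter(temp, "F", "C")  # pandas runs this on every Arrow batch

gsod: DataFrame = gsod.withColumn("converted_temp", fahrenheit_to_celsius(gsod["temp"]))
gsod.select(
    F.col("temp"), F.round(F.col("converted_temp"), 1).alias("converted_temp")
).distinct().show(n=20)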