# User Defined Functions

Recall that the `pandas` `Series.apply` and `Series.map` methods let us apply a function to each individual value in a column. We would like to do the same in `pyspark`, but because Spark's engine runs on the JVM (it is written in Scala), we need some extra machinery to apply/map a Python function to the data. We have two solutions:

- Define a Python `udf` column function that will run Python code on each data value.
- Define a `pandas_udf` that is faster and uses existing pandas methods to perform [vectorized operations](https://en.wikipedia.org/wiki/Array_programming).



### The Problem: pure Python `udf`s are slow

When using a pure Python `udf`, the runtime needs to

1. Serialize the rows and send them from the JVM to a separate Python worker process.
2. Unpack all values, converting each to a Python object.
3. Call the `udf` once for each row.
4. Serialize each result and send it back to the JVM.
5. Convert each result back to a Java object.

This is a **lot** of overhead.
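The core difference is one Python function call per value versus one call per batch of values. Here is a minimal pure-`pandas` sketch of that contrast (no Spark involved; the birth years are made up):

```python
import pandas as pd

# Made-up birth years standing in for one batch of a column
years = pd.Series([1930, 1936, 1941, 1861])

# Row-at-a-time: the lambda is called once per value,
# much like a plain Python udf is called once per row
row_at_a_time = years.map(lambda y: y // 100 * 100)

# Vectorized: one expression processes the whole Series at once,
# which is how a pandas_udf handles each batch it receives
vectorized = years // 100 * 100

print(row_at_a_time.tolist())  # [1900, 1900, 1900, 1800]
print(vectorized.tolist())     # [1900, 1900, 1900, 1800]
```

Both give the same values; the vectorized version simply avoids the per-value Python call overhead.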

### The Solution: Apache Arrow and `pandas_udf`

When using a `pandas_udf`,

1. Apache Arrow provides a columnar, in-memory data format that Spark can write and `pandas` can read with little conversion work
    * The data still goes to a Python worker, but in large column batches rather than as individually converted rows
2. The Apache Arrow data structure allows `pandas` vectorized operations
    * Process chunks of data instead of individual values
    * Allows us to leverage the familiar `pandas` interface
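Because Arrow hands a `pandas_udf` its column in batches (Spark's `spark.sql.execution.arrow.maxRecordsPerBatch` setting caps the batch size, 10,000 rows by default), the function should be element-wise: its result for a value should not depend on which other values share the batch. The sketch below simulates that batching in plain `pandas`; `apply_in_batches` is a hypothetical helper, not a Spark API, and the data is made up.

```python
import pandas as pd

def apply_in_batches(series: pd.Series, func, batch_size: int) -> pd.Series:
    """Hypothetical helper (not a Spark API): call func on consecutive
    chunks of series, mimicking Arrow batches, and stitch the results together."""
    chunks = [series.iloc[i:i + batch_size] for i in range(0, len(series), batch_size)]
    return pd.concat([func(chunk) for chunk in chunks])

def double(s: pd.Series) -> pd.Series:
    # Element-wise: each output depends only on its own input value
    return s * 2

def center(s: pd.Series) -> pd.Series:
    # Whole-Series: each output depends on the mean of whatever batch it lands in
    return s - s.mean()

heights = pd.Series([10.0, 20.0, 30.0, 40.0])  # made-up data

print(apply_in_batches(heights, double, 2).tolist())  # [20.0, 40.0, 60.0, 80.0]
print(center(heights).tolist())                       # [-15.0, -5.0, 5.0, 15.0]
print(apply_in_batches(heights, center, 2).tolist())  # [-5.0, 5.0, -5.0, 5.0]
```

The element-wise function is unaffected by the chunking, while `center` silently changes its answer once the column is split.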

## Before we start

Make sure you have Apache Arrow installed (the `pyarrow` package).

In [2]:
!pip install pyarrow



### Testing `pyarrow` using `toPandas`

In [3]:
from pyspark.sql import SparkSession
from more_pyspark import get_spark_types, to_pandas

spark = SparkSession.builder.appName('Ops').getOrCreate()

artists = spark.read.csv("./data/Artists.csv", inferSchema=True, header=True)


In [4]:
artists.printSchema()

root
 |-- ConstituentID: integer (nullable = true)
 |-- DisplayName: string (nullable = true)
 |-- ArtistBio: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- BeginDate: integer (nullable = true)
 |-- EndDate: integer (nullable = true)
 |-- Wiki QID: string (nullable = true)
 |-- ULAN: integer (nullable = true)



#### Careful! `toPandas` collects the WHOLE DataFrame into the driver's memory

In [5]:
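# toPandas only uses Arrow when this option is enabled (it is off by default)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

artists.toPandas()  # transfers ALL of the data to a pandas DataFrame on the driver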

| | ConstituentID | DisplayName | ArtistBio | Nationality | Gender | BeginDate | EndDate | Wiki QID | ULAN |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Robert Arneson | American, 1930–1992 | American | Male | 1930 | 1992 | | |
| 1 | 2 | Doroteo Arnaiz | Spanish, born 1936 | Spanish | Male | 1936 | 0 | | |
| 2 | 3 | Bill Arnold | American, born 1941 | American | Male | 1941 | 0 | | |
| 3 | 4 | Charles Arnoldi | American, born 1946 | American | Male | 1946 | 0 | Q1063584 | 500027998.0 |
| 4 | 5 | Per Arnoldi | Danish, born 1941 | Danish | Male | 1941 | 0 | | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 15217 | 133006 | Andrew Chesnutt | American, 1861–1934 | American | Male | 1861 | 1934 | | |
| 15218 | 133007 | Lewis Chesnutt | American, 1860–1933 | American | Male | 1860 | 1933 | | |
| 15219 | 133026 | Alfred Tritschler | German, 1905 – 1970 | German | | 1905 | 1970 | | |
| 15220 | 133027 | Studio of Dr. Paul Wolff & Tritschler | | | | 0 | 0 | | |


#### MoMA artwork

In [6]:
from MoMA_schema import artwork_schema

artwork = spark.read.csv("./data/Artworks.csv", schema=artwork_schema, header=True)

In [7]:
artwork.take(2) >> to_pandas



| | Title | Artist | ConstituentID | ArtistBio | Nationality | BeginDate | EndDate | Gender | Date | Medium | ... | ThumbnailURL | Circumference (cm) | Depth (cm) | Diameter (cm) | Height (cm) | Length (cm) | Weight (kg) | Width (cm) | Seat Height (cm) | Duration (sec.) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Ferdinandsbrücke Project, Vienna, Austria (Ele... | Otto Wagner | 6210 | (Austrian, 1841–1918) | (Austrian) | (1841) | (1918) | (Male) | 1896 | Ink and cut-and-pasted painted pages on paper | ... | http://www.moma.org/media/W1siZiIsIjU5NDA1Il0s... | | | | 48.6 | | | 168.9 | | |
| 1 | City of Music, National Superior Conservatory ... | Christian de Portzamparc | 7470 | (French, born 1944) | (French) | (1944) | (0) | (Male) | 1987 | Paint and colored pencil on print | ... | http://www.moma.org/media/W1siZiIsIjk3Il0sWyJw... | | | | 40.6401 | | | 29.8451 | | |


## Creating and applying a `pandas_udf` in `pyspark`

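* **udf**: <b>U</b>ser <b>D</b>efined <b>F</b>unction
* Use `pyspark.sql.functions.pandas_udf(func, pyspark_type)` to define the function, where `pyspark_type` is the Spark type of the result
* Inside `func`, work with each batch of the column as a `pandas.Series`, just as you would in `pandas`; the function must return a `pandas.Series` of the same length
* Use the `pandas_udf` inside `withColumn` (or `select`) to create or modify columns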
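Since the function is just `pandas.Series` in, `pandas.Series` out, it can be tried on a small Series before Spark is involved. A minimal sketch; `check_series_udf` is a hypothetical helper, not part of `pyspark`. The `pd.Series -> pd.Series` type hints are optional here, but they match the style Spark 3.0+ uses to recognize a Series-to-Series `pandas_udf` written with the decorator form.

```python
import pandas as pd

def century(s: pd.Series) -> pd.Series:
    # Series in, Series of the same length out
    return s // 100 * 100

def check_series_udf(func, sample: pd.Series) -> pd.Series:
    """Hypothetical helper: run func on a sample and check the
    Series-to-Series contract that a pandas_udf expects."""
    out = func(sample)
    if not isinstance(out, pd.Series):
        raise TypeError(f"expected a pandas.Series, got {type(out).__name__}")
    if len(out) != len(sample):
        raise ValueError(f"expected {len(sample)} values, got {len(out)}")
    return out

print(check_series_udf(century, pd.Series([1930, 1861, 0])).tolist())  # [1900, 1800, 0]
```

If the check passes, the function can be wrapped the same way as in the examples below, e.g. `pandas_udf(century, IntegerType())`.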

### Example 1 - Compute the Century of Birth

#### Step 1 -- Figure out how to do it with `pandas.Series`

In [8]:
import pandas as pd

artist_pandas = pd.read_csv('./data/Artists.csv')
artist_pandas.head()

| | ConstituentID | DisplayName | ArtistBio | Nationality | Gender | BeginDate | EndDate | Wiki QID | ULAN |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Robert Arneson | American, 1930–1992 | American | Male | 1930 | 1992 | | |
| 1 | 2 | Doroteo Arnaiz | Spanish, born 1936 | Spanish | Male | 1936 | 0 | | |
| 2 | 3 | Bill Arnold | American, born 1941 | American | Male | 1941 | 0 | | |
| 3 | 4 | Charles Arnoldi | American, born 1946 | American | Male | 1946 | 0 | Q1063584 | 500027998.0 |
| 4 | 5 | Per Arnoldi | Danish, born 1941 | Danish | Male | 1941 | 0 | | |


In [9]:
artist_pandas.dtypes

ConstituentID      int64
DisplayName       object
ArtistBio         object
Nationality       object
Gender            object
BeginDate          int64
EndDate            int64
Wiki QID          object
ULAN             float64
dtype: object

In [10]:
artist_pandas.BeginDate//100

0        19
1        19
2        19
3        19
4        19
         ..
15217    18
15218    18
15219    19
15220     0
15221     0
Name: BeginDate, Length: 15222, dtype: int64

In [11]:
artist_pandas.BeginDate//100*100

0        1900
1        1900
2        1900
3        1900
4        1900
         ... 
15217    1800
15218    1800
15219    1900
15220       0
15221       0
Name: BeginDate, Length: 15222, dtype: int64
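`BeginDate//100*100` labels each year with the start of its hundred-year block, so 1930 becomes 1900 ("the 1900s"). If the ordinal century is wanted instead (1930 falls in the 20th century, which runs from 1901 to 2000), a small variation of the same vectorized arithmetic works. A sketch on made-up years; `ordinal_century` is a hypothetical name:

```python
import pandas as pd

def ordinal_century(years: pd.Series) -> pd.Series:
    # Centuries run 1-100, 101-200, ..., 1901-2000,
    # so shift by one before floor-dividing
    return (years - 1) // 100 + 1

years = pd.Series([1930, 1900, 1861, 0])  # made-up years
# 0 still maps to 0 because floor division gives (0 - 1) // 100 == -1
print(ordinal_century(years).tolist())  # [20, 19, 19, 0]
```

This could be wrapped with `pandas_udf(ordinal_century, IntegerType())` in the same way as Step 2.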

#### Step 2 -- Package in `pandas_udf`

In [12]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import IntegerType

century = pandas_udf(lambda s: (s // 100) * 100,  # s is a pandas.Series batch of the column
                     IntegerType())

In [13]:
century(artists.BeginDate) # lazy column expression

Column<'<lambda>(BeginDate)'>

In [14]:
(artists
.select('BeginDate')
.withColumn('Century of Birth', century(artists.BeginDate))
.take(3)
) >> to_pandas

                                                                                

| | BeginDate | Century of Birth |
|---|---|---|
| 0 | 1930 | 1900 |
| 1 | 1936 | 1900 |
| 2 | 1941 | 1900 |


### Example 2 - Clean up `Gender` column

#### Step 1 -- Figure out how to do it with `pandas.Series`

In [15]:
artwork_pandas = pd.read_csv('./data/Artworks.csv')

In [16]:
artwork_pandas.Gender.str.replace('[()]', '', regex=True)

0               Male
1               Male
2               Male
3               Male
4               Male
             ...    
138146     Male Male
138147        Female
138148        Female
138149        Female
138150        Female
Name: Gender, Length: 138151, dtype: object
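Removing the parentheses turns multi-artist entries into one space-separated string (see `Male Male` above). If you would rather keep one gender per artist, `str.findall` can pull out each parenthesized value. A sketch on made-up values, assuming multi-artist entries look like `(Male) (Female)`:

```python
import pandas as pd

# Made-up values in an assumed format: one "(...)" group per artist
gender = pd.Series(["(Male)", "(Male) (Female)", "()"])

# Capture whatever sits between each pair of parentheses (possibly nothing)
genders = gender.str.findall(r"\(([^)]*)\)")

print(genders.tolist())  # [['Male'], ['Male', 'Female'], ['']]
```

To use this inside Spark, the `pandas_udf` return type would have to be an array of strings, such as `ArrayType(StringType())` from `pyspark.sql.types`; check that your Spark version supports array return types for `pandas_udf`.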

#### Step 2 -- Package in `pandas_udf`

In [17]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

remove_paren = pandas_udf(lambda s: s.str.replace('[()]', '', regex=True),  # drop "(" and ")"
                          StringType())

In [18]:
(artwork
.select(artwork.Gender)
.withColumn('Clean Gender', remove_paren(artwork.Gender))
.take(3)
) >> to_pandas

                                                                                

| | Gender | Clean Gender |
|---|---|---|
| 0 | (Male) | Male |
| 1 | (Male) | Male |
| 2 | (Male) | Male |


## <font color="red"> Exercise 6.6.2 </font>

Solve each of the following tasks by creating and applying a `pandas_udf`.

1. Clean up and split the `artwork.ArtistBio` column, keeping the nationality.
2. Perform the log + 1 transformation on the `artwork['Height (cm)']` column.

In both cases, make sure you perform the task as a single chain of `pandas.Series` operations.

In [19]:
# Hint: NumPy's vectorized log function handles missing values (NaN stays NaN).
import numpy as np

np.log(artwork_pandas['Height (cm)'] + 1)

0         3.903991
1         3.729064
2         3.563883
3         3.947390
4         3.673766
            ...   
138146    2.467678
138147    2.674149
138148    2.674149
138149    2.484907
138150    2.674149
Name: Height (cm), Length: 138151, dtype: float64
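NumPy also provides `np.log1p`, which computes log(1 + x) directly and stays accurate when x is tiny, where forming `x + 1` first throws away most of x's digits. Both versions leave NaN as NaN. A quick check on made-up heights:

```python
import math

import numpy as np
import pandas as pd

# Made-up heights in cm, including a missing value and a tiny one
heights = pd.Series([48.6, 40.6401, np.nan, 1e-12])

naive = np.log(heights + 1)   # the hint's approach
stable = np.log1p(heights)    # log(1 + x) without forming 1 + x first

# Ordinary values agree, and NaN stays NaN in both
print(np.allclose(naive.iloc[:2], stable.iloc[:2]))  # True
print(stable.isna().tolist())                        # [False, False, True, False]

# For a tiny x, the rounding in 1 + x shows up in the naive result
print(math.isclose(stable.iloc[3], 1e-12, rel_tol=1e-9))  # True
print(math.isclose(naive.iloc[3], 1e-12, rel_tol=1e-9))   # False
```

For heights in centimetres the two agree to many digits, so either is fine for this exercise.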

In [25]:
# Your code here
import numpy as np
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType, DoubleType

# "(Austrian, 1841–1918)" -> "Austrian": drop the parentheses first, so bios
# without a comma such as "(American)" do not keep a trailing ")"
extr_Nati = pandas_udf(lambda s: (s.str.replace('[()]', '', regex=True)
                                   .str.split(',')
                                   .str.get(0)
                                   .str.strip()),
                       StringType())

# log(x + 1); missing (NaN) heights stay NaN
logplus1 = pandas_udf(lambda s: np.log(s + 1),
                      DoubleType())

(artwork
.select(artwork.ArtistBio, col('Height (cm)'))
.withColumn('Nationality', extr_Nati(artwork.ArtistBio))
.withColumn('log+1', logplus1(col('Height (cm)')))
.take(3)
) >> to_pandas


| | ArtistBio | Height (cm) | Nationality | log+1 |
|---|---|---|---|---|
| 0 | (Austrian, 1841–1918) | 48.6 | Austrian | 3.903991 |
| 1 | (French, born 1944) | 40.6401 | French | 3.729064 |
| 2 | (Austrian, 1876–1957) | 34.3 | Austrian | 3.563883 |
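The nationality logic can be checked on a plain `pandas.Series` before it is wrapped in a `pandas_udf`. A sketch with made-up bios (the comma-free `(American)` format is an assumption): removing the parentheses *before* splitting handles a bio with no comma, whereas splitting first and stripping only `(` leaves a stray `)` in that case. Both function names are hypothetical.

```python
import pandas as pd

def extract_nationality(bio: pd.Series) -> pd.Series:
    # Drop "(" and ")", keep the text before the first comma, trim spaces
    return (bio.str.replace("[()]", "", regex=True)
               .str.split(",")
               .str.get(0)
               .str.strip())

def strip_open_paren_only(bio: pd.Series) -> pd.Series:
    # Splits first, then removes only "(" from the first piece
    return bio.str.split(",").str.get(0).str.replace("(", "", regex=False)

# Made-up bios; the last one has no comma
bios = pd.Series(["(Austrian, 1841–1918)", "(French, born 1944)", "(American)"])

print(extract_nationality(bios).tolist())    # ['Austrian', 'French', 'American']
print(strip_open_paren_only(bios).tolist())  # ['Austrian', 'French', 'American)']
```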
