In [0]:
# Example data: three government departments, one row per department.
# Each row is (name, head count, year created), in the order given by `cols`.
cols = ('Department', 'Employees', 'Creation_Year')
rows = (
    ('Department of Defense', 3_000_000, 1947),
    ('Department of Education', 4_227, 1980),
    ('Department of Justice', 117_047, 1870),
)

# Base Python
Let's start by defining a function in base Python.  For this example, we will write a function to create an acronym from a given string (`'user defined function'` -> `'UDF'`)

In [0]:
def make_acronym(text):
    """Build an upper-case acronym from the first letter of each word.

    Example: ``'user defined function'`` -> ``'UDF'``.

    Parameters
    ----------
    text : str or None
        Text to abbreviate. Words are split on any run of whitespace.

    Returns
    -------
    str or None
        The acronym (``''`` for blank text), or ``None`` when ``text`` is
        ``None`` (e.g. a null cell handed to the function by a PySpark UDF).
    """
    # Propagate missing values instead of raising AttributeError on None.
    if text is None:
        return None

    # str.split() with no argument never yields empty words, so word[0] is safe.
    return ''.join(word[0].upper() for word in text.split())

# test function
text = 'user defined function'
acronym = make_acronym(text)
print(f'{text} -> {acronym}')

user defined function -> UDF


# Pandas

In [0]:
import pandas as pd

# Build a Pandas DataFrame from the example rows, labelling columns with `cols`
pandas_df = pd.DataFrame(rows, columns=cols)
pandas_df.head()



Unnamed: 0,Department,Employees,Creation_Year
0,Department of Defense,3000000,1947
1,Department of Education,4227,1980
2,Department of Justice,117047,1870


To use our function in Pandas, we must use either `map` or `apply`.

In [0]:
# `map` calls make_acronym once per value of the Department series
pandas_df['Acronym_map'] = pandas_df['Department'].map(make_acronym)

# `apply` with axis=1 calls the lambda once per row, so it could read any column
pandas_df['Acronym_apply'] = pandas_df.apply(
    lambda row: make_acronym(row['Department']),
    axis=1,
)

pandas_df.head()

Unnamed: 0,Department,Employees,Creation_Year,Acronym_map,Acronym_apply
0,Department of Defense,3000000,1947,DOD,DOD
1,Department of Education,4227,1980,DOE,DOE
2,Department of Justice,117047,1870,DOJ,DOJ


The `map` method calls our function once for each value of a single Pandas series. In contrast, `apply` with `axis = 1` passes each whole row to the function, so it can access multiple columns. We would likely choose `map` in this situation; however, more complex functions (such as the example below) require `apply`:

In [0]:
def classify_dept(creation_year, num_employees, old_before=1900, large_from=10000):
    """Classify a department by age and by size.

    Parameters
    ----------
    creation_year : int
        Year the department was created.
    num_employees : int
        Number of employees.
    old_before : int, default 1900
        Departments created strictly before this year are ``'old'``;
        all others are ``'new'``.
    large_from : int, default 10000
        Departments with at least this many employees are ``'large'``;
        smaller ones are ``'small'``.

    Returns
    -------
    tuple of (str, str)
        ``(age, size)``, e.g. ``('old', 'large')``.
    """
    age = 'old' if creation_year < old_before else 'new'
    size = 'small' if num_employees < large_from else 'large'
    return (age, size)

# apply row-wise; result_type='expand' splits each returned (age, size)
# tuple into two columns that are assigned to 'age' and 'size'
pandas_df[['age', 'size']] = pandas_df.apply(
    lambda row: classify_dept(row['Creation_Year'], row['Employees']),
    axis=1,
    result_type='expand',
)
pandas_df.head()

Unnamed: 0,Department,Employees,Creation_Year,Acronym_map,Acronym_apply,age,size
0,Department of Defense,3000000,1947,DOD,DOD,new,large
1,Department of Education,4227,1980,DOE,DOE,new,small
2,Department of Justice,117047,1870,DOJ,DOJ,old,large


# PySpark

In [0]:
# Build a Spark DataFrame from the same rows; `cols` supplies the column names
spark_df = spark.createDataFrame(rows, schema=cols)
display(spark_df)

Department,Employees,Creation_Year
Department of Defense,3000000,1947
Department of Education,4227,1980
Department of Justice,117047,1870


Before we can use our function in PySpark, we need to convert it to a PySpark UDF using the `udf` function imported from `pyspark.sql.functions`.

In [0]:
# import `udf` function
from pyspark.sql.functions import udf

# convert Python function to PySpark UDF; make_acronym is passed directly
# (a wrapper lambda adds nothing) and its string result type is made explicit
acronym_udf = udf(make_acronym, returnType='string')

The PySpark UDF can be applied to the dataframe using `withColumn` or `select`.

In [0]:
# `withColumn` keeps every existing column and appends 'Acronym'
df_wc = spark_df.withColumn(
    'Acronym',
    acronym_udf('Department'),
)

display(df_wc)

Department,Employees,Creation_Year,Acronym
Department of Defense,3000000,1947,DOD
Department of Education,4227,1980,DOE
Department of Justice,117047,1870,DOJ


In [0]:
# `select` keeps only the listed columns: the name and its acronym
acronym_col = acronym_udf('Department').alias('Acronym')
df_sel = spark_df.select('Department', acronym_col)

display(df_sel)

Department,Acronym
Department of Defense,DOD
Department of Education,DOE
Department of Justice,DOJ


# Pandas UDFs in PySpark

In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *

### Method #1: Converting to PySpark UDF using `pandas_udf`

In [0]:
def get_decade(year_series: pd.Series) -> pd.Series:
    """Label each year with the decade it falls in (1947 -> '1940s').

    Usable on a plain Pandas series and as the body of a Pandas UDF.

    Parameters
    ----------
    year_series : pd.Series
        Years as integers, floats, or numeric strings. Missing values are
        allowed.

    Returns
    -------
    pd.Series
        Decade labels such as ``'1940s'``, on the same index; missing years
        stay missing.
    """
    years = pd.to_numeric(year_series)

    # Round down numerically rather than slicing the string form: an integer
    # column that contains nulls typically arrives as float64, and slicing
    # str(1947.0) would produce '1947.0s'.
    decades = (years // 10 * 10).astype('Int64')
    labels = decades.astype(str) + 's'

    # Keep missing years missing instead of emitting '<NA>s'.
    return labels.mask(years.isna())

# add the decade labels as a new column and show them next to the source year
pandas_df['Decade'] = get_decade(pandas_df['Creation_Year'])
pandas_df.loc[:, ['Creation_Year', 'Decade']].head()

Unnamed: 0,Creation_Year,Decade
0,1947,1940s
1,1980,1980s
2,1870,1870s


In [0]:
# wrap the Pandas function as a vectorized (Pandas) UDF that returns strings
decade_udf = pandas_udf(get_decade, returnType='string')

decade_df = spark_df.select(
    'Creation_Year',
    decade_udf('Creation_Year').alias('Decade'),
)

display(decade_df)

Creation_Year,Decade
1947,1940s
1980,1980s
1870,1870s


### Method #2: Converting to PySpark UDF using Decorator (@)

The same function from above could have been declared using `pandas_udf` as a decorator and specifying the return type.

In [0]:
@pandas_udf("string")
def get_decade_decorator(year_series: pd.Series) -> pd.Series:
    """Pandas UDF labelling each year with its decade (1947 -> '1940s').

    Parameters
    ----------
    year_series : pd.Series
        A batch of years as integers, floats, or numeric strings. Missing
        values are allowed.

    Returns
    -------
    pd.Series
        Decade labels such as ``'1940s'``; missing years stay missing.
    """
    years = pd.to_numeric(year_series)

    # Round down numerically rather than slicing the string form: an integer
    # column that contains nulls typically arrives as float64, and slicing
    # str(1947.0) would produce '1947.0s'.
    decades = (years // 10 * 10).astype('Int64')
    labels = decades.astype(str) + 's'

    # Keep missing years missing instead of emitting '<NA>s'.
    return labels.mask(years.isna())

Using this method, there is no need to create a separate variable for the PySpark UDF. The decorator replaces `get_decade_decorator` with the UDF itself, so it can be called directly on Spark columns.

In [0]:
# the decorated function is already a Pandas UDF, so it is called directly
decade_col = get_decade_decorator('Creation_Year').alias('Decade')
decorator_df = spark_df.select('Creation_Year', decade_col)

display(decorator_df)

Creation_Year,Decade
1947,1940s
1980,1980s
1870,1870s


# PySpark SQL

UDFs must be registered with `spark.udf.register` before they can be used in SQL queries.

In [0]:
spark.udf.register(name='make_acronym', f=make_acronym, returnType='string')

Out[15]: <function __main__.make_acronym(text)>

In [0]:
# expose spark_df to SQL under a temporary view name
spark_df.createOrReplaceTempView("gov_departments")

# Query the view by the exact name it was registered under: view lookups are
# only case-insensitive while spark.sql.caseSensitive is false.
sql_df = spark.sql(
    '''
    SELECT Department, make_acronym(Department) AS Acronym
    FROM gov_departments
    '''
)

display(sql_df)

Department,Acronym
Department of Defense,DOD
Department of Education,DOE
Department of Justice,DOJ
