# Lab 01: PySpark UDF 

* A User-Defined Function (UDF) is one of the most useful features of Spark SQL and the DataFrame API.
* It is used to extend PySpark's built-in capabilities.

The idea is the same as in an RDBMS, where you can extend the server by adding a new function that behaves just like a built-in function such as abs() or concat().

Such functions need to be registered first; after that they can be used in SQL like regular functions.

## Why do we need UDF?

Spark processes data in parallel. If we used a normal Python for-loop, we would have to iterate over all records one at a time, in sequence.

With a UDF, the function is applied to each row independently of the others, so Spark can distribute the work across partitions.

For example: we need to create a new column holding the string values of another column in reverse order. (Spark SQL does provide a built-in reverse function for this simple case; it is used in this lab only as an easy illustration.)

Before you create any UDF, check whether a similar function is already available in the Spark SQL functions.

UDFs need to be designed carefully; otherwise you will run into optimization and performance issues.
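
As a sketch of that check, the snippet below relies only on functions that ship with `pyspark.sql.functions` (`initcap`, `reverse`, `upper`) instead of UDFs. The helper name `add_builtin_columns` and the output column names are assumptions made for illustration, and `df` is assumed to be any DataFrame with a string column `Name`. Note that `initcap` also lowercases the remaining letters of each word, so its result can differ from a hand-written function.

```python
def add_builtin_columns(df):
    """Return df with three extra columns computed by Spark built-ins (no UDF)."""
    # Imported inside the function so the helper can be defined
    # before a Spark session has been set up.
    from pyspark.sql import functions as F

    name = F.col("Name")
    return (
        df.withColumn("Name_Initcap", F.initcap(name))    # first letter of each word in upper case
          .withColumn("Name_Reversed", F.reverse(name))   # characters in reverse order
          .withColumn("Name_Upper", F.upper(name))        # all letters in upper case
    )
```

Once a DataFrame with a `Name` column exists, `add_builtin_columns(df).show(truncate=False)` displays the result. Built-in functions are evaluated by Spark itself rather than in a Python worker, which is the main reason to prefer them when one fits.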

## Create UDF

In [1]:
# importing libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession

### 1. Create a DataFrame

In [2]:
spark = SparkSession.builder.appName('Lab01').getOrCreate()

21/09/25 22:30:33 WARN Utils: Your hostname, fou4d-ubuntu resolves to a loopback address: 127.0.0.1; using 192.168.1.156 instead (on interface wlp8s0)
21/09/25 22:30:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/09/25 22:30:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
columns = ["ID","Name"]
data = [("1", "Ali mohamed  "),
    ("2", "saadya Mahmoud"),
    ("3", "Mohamed mahmoud"),
    ("4", "aliaa eldemery")
       ]

df = spark.createDataFrame(data=data,schema=columns)

df.printSchema()
df.show(truncate=False)


root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)




+---+---------------+
|ID |Name           |
+---+---------------+
|1  |Ali mohamed    |
|2  |saadya Mahmoud |
|3  |Mohamed mahmoud|
|4  |aliaa eldemery |
+---+---------------+



                                                                                

We need to bring the names into a proper format by capitalizing the first letter of each word.

### 2. Create a Python Function

In [4]:
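def convertCase(s):
    # Capitalize the first letter of every space-separated word.
    # The parameter is named s to avoid shadowing the built-in str type.
    resultingStr = ""
    arr = s.split(" ")
    for x in arr:
        resultingStr = resultingStr + x[0:1].upper() + x[1:] + " "
    # Remove surrounding whitespace, including the space appended by the loop.
    resultingStr = resultingStr.strip()
    return resultingStr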

In [5]:
# it's just a normal simple python function!
convertCase("small letters")

'Small Letters'
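
The same logic can be written more compactly. The following is a minimal sketch, assuming the behavior of convertCase() above is what we want; the name `convert_case_compact` is hypothetical and not part of the lab.

```python
def convert_case_compact(text):
    """Capitalize the first letter of each space-separated word."""
    # w[:1] is safe for the empty strings produced by consecutive spaces.
    words = (w[:1].upper() + w[1:] for w in text.split(" "))
    return " ".join(words).strip()


print(convert_case_compact("small letters"))   # Small Letters
print(convert_case_compact("Ali mohamed  "))   # Ali Mohamed
```

Testing the function on plain Python strings first, as done here, is an easy way to catch mistakes before wrapping it in a UDF.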

### 3. Convert a Python function to PySpark UDF

Now convert the function convertCase() to a UDF by passing it to the PySpark SQL udf() function, which is available in the pyspark.sql.functions module. Make sure you import it before using it.

In PySpark, udf() returns a user-defined function object (the *UserDefinedFunction* class in pyspark.sql.udf; its Scala counterpart is *org.apache.spark.sql.expressions.UserDefinedFunction*) that can be applied to DataFrame columns.

In [6]:
# import udf() and the return type used in this lab
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


#### PySpark udf() function

It takes two arguments: the custom function and the return data type, i.e. the data type of the value returned by the custom function. The return type is **StringType()** by default.
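
To illustrate the second argument, here is a small sketch of a UDF whose return type is not the default. The names `name_length` and `make_name_length_udf` are hypothetical; the point is only that the declared type (`IntegerType()`) should match what the Python function actually returns.

```python
def name_length(name):
    """Number of characters in name after trimming spaces; None stays None."""
    if name is None:
        return None
    return len(name.strip())


def make_name_length_udf():
    """Wrap name_length as a UDF whose declared return type is IntegerType."""
    # Imported inside the function so name_length can be tested without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    return udf(name_length, IntegerType())
```

With a DataFrame `df` that has a `Name` column, a possible use is `df.withColumn("Name_Length", make_name_length_udf()(df["Name"]))`.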

In [7]:
convertUDF = udf(lambda z: convertCase(z))
# equivalent, with the return type stated explicitly:
#convertUDF = udf(lambda z: convertCase(z), StringType())


### Using UDF with DataFrame

#### 3.1 Using UDF with PySpark DataFrame select()

In [8]:
# import sql functions for col and other functions we might need later
from pyspark.sql import functions as F
# using select
df.select(F.col("ID"), \
    convertUDF(F.col("Name")).alias("Name") ) \
   .show(truncate=False)


+---+---------------+
|ID |Name           |
+---+---------------+
|1  |Ali Mohamed    |
|2  |Saadya Mahmoud |
|3  |Mohamed Mahmoud|
|4  |Aliaa Eldemery |
+---+---------------+



#### 3.2 Using UDF with PySpark DataFrame withColumn()

Let's create a new function that reverses a string.

In [9]:
def reverseOrder(str):
    reStr= str[::-1]
    return reStr

In [10]:
reverseOrderUDF = udf(lambda z:reverseOrder(z))   

df.withColumn("Reversed Name", reverseOrderUDF(F.col("Name"))) \
  .show(truncate=False)

+---+---------------+---------------+
|ID |Name           |Reversed Name  |
+---+---------------+---------------+
|1  |Ali mohamed    |  demahom ilA  |
|2  |saadya Mahmoud |duomhaM aydaas |
|3  |Mohamed mahmoud|duomham demahoM|
|4  |aliaa eldemery |yremedle aaila |
+---+---------------+---------------+



#### 3.3 Registering a PySpark UDF and using it in SQL

In order to use the convertCase() function in PySpark SQL, you need to register it with PySpark by using spark.udf.register().

In [11]:
spark.udf.register("convertUDF2", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select ID, convertUDF2(Name) as Name_Corrected from NAME_TABLE") \
     .show(truncate=False)

+---+---------------+
|ID |Name_Corrected |
+---+---------------+
|1  |Ali Mohamed    |
|2  |Saadya Mahmoud |
|3  |Mohamed Mahmoud|
|4  |Aliaa Eldemery |
+---+---------------+
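
In recent PySpark versions, spark.udf.register() also returns the registered UDF, so a single registration can serve both SQL and DataFrame code. The sketch below assumes an existing `spark` session and a DataFrame `df` with `ID` and `Name` columns; the helper name `register_and_use` and the SQL name `convertUDF3` are hypothetical.

```python
def register_and_use(spark, df, func):
    """Register func for SQL and reuse the returned UDF in the DataFrame API."""
    from pyspark.sql.types import StringType

    # register() makes func callable from SQL under the given name and
    # returns a UDF object that can be applied to DataFrame columns.
    registered = spark.udf.register("convertUDF3", func, StringType())

    df.createOrReplaceTempView("NAME_TABLE")
    via_sql = spark.sql(
        "select ID, convertUDF3(Name) as Name_Corrected from NAME_TABLE"
    )
    via_api = df.select("ID", registered(df["Name"]).alias("Name_Corrected"))
    return via_sql, via_api
```

Calling `register_and_use(spark, df, convertCase)` should give two DataFrames with the same content, one built through SQL and one through select().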



### 4. Creating a UDF using an annotation (decorator)

A UDF can also be created in a single step by decorating the function with @udf.

In [16]:
@udf(returnType=StringType()) 
def upperCase(str):
    return str.upper()

df.withColumn("Curated Name", upperCase(F.col("Name"))) \
  .show(truncate=False)

+---+---------------+---------------+
|ID |Name           |Curated Name   |
+---+---------------+---------------+
|1  |Ali mohamed    |ALI MOHAMED    |
|2  |saadya Mahmoud |SAADYA MAHMOUD |
|3  |Mohamed mahmoud|MOHAMED MAHMOUD|
|4  |aliaa eldemery |ALIAA ELDEMERY |
+---+---------------+---------------+



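To see why the decorator is needed, we can remove it and observe the error: without it, lowCase() is a plain Python function that receives a Column object instead of a string.

In [19]:
# comment out the next line (the decorator) and check the result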
@udf(returnType=StringType()) 
def lowCase(str):
    return str.lower()

df.withColumn("Curated Name", lowCase(F.col("Name"))) \
  .show(truncate=False)

+---+---------------+---------------+
|ID |Name           |Curated Name   |
+---+---------------+---------------+
|1  |Ali mohamed    |ali mohamed    |
|2  |saadya Mahmoud |saadya mahmoud |
|3  |Mohamed mahmoud|mohamed mahmoud|
|4  |aliaa eldemery |aliaa eldemery |
+---+---------------+---------------+



## Special Handling

### 1. Execution order

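Spark does not guarantee the order of evaluation of subexpressions: the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. Spark may reorder expressions during query planning and optimization, so AND and OR expressions have no left-to-right short-circuit semantics, and it is unsafe to rely on side effects or on the evaluation order of conditions in WHERE and HAVING clauses.

So when you design and use a UDF, you have to be very careful, especially with null handling: a null check elsewhere in the query does not guarantee that the UDF never receives a null, which can result in runtime exceptions.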

### 2. Handling null check

In [13]:
columns = ["ID","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

+---+------------+
|ID |Name        |
+---+------------+
|1  |john jones  |
|2  |tracey smith|
|3  |amy sanders |
|4  |null        |
+---+------------+



In [14]:
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "UNKNOWN", StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

spark.sql("select ID, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null") \
     .show(truncate=False)    

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|UNKNOWN           |
+------------------+

+---+------------+
|ID |Name        |
+---+------------+
|1  |John Jones  |
|2  |Tracey Smith|
|3  |Amy Sanders |
+---+------------+
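
The null check can also be factored out so that it does not have to be repeated in every lambda. The following is a minimal sketch in plain Python; the decorator `null_safe` and the function `capitalize_words` are hypothetical names, and the default value "UNKNOWN" simply mirrors the cell above.

```python
import functools


def null_safe(default=None):
    """Decorator factory: return `default` instead of calling the function on None."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(value):
            if value is None:
                return default
            return func(value)
        return wrapper
    return decorator


@null_safe(default="UNKNOWN")
def capitalize_words(text):
    """Same idea as convertCase: capitalize each space-separated word."""
    return " ".join(w[:1].upper() + w[1:] for w in text.split(" ")).strip()


print(capitalize_words("john jones"))  # John Jones
print(capitalize_words(None))          # UNKNOWN
```

The decorated function is still an ordinary Python function, so it should be usable with udf() or spark.udf.register() in the same way as convertCase(). Keeping the null check inside the function means the result does not depend on the order in which Spark evaluates the rest of the query.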



__________
thank you

Mohamed Fakhruldeen,

fakhruldeen@gmail.com
