In [1]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("app2").getOrCreate()
sc = spark.sparkContext
spark
sc


Create a DataFrame

In [2]:

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)


+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



Create a Python Function

In [3]:

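def convertCase(s):
    # Capitalize the first letter of each space-separated word.
    # Each word is followed by a space, so the result ends with a trailing space.
    resStr = ""
    arr = s.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:] + " "
    return resStr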
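
Before wrapping the function in a UDF, it can help to call it as plain Python. The sketch below repeats the same logic and compares it with a hypothetical variant, `convert_case_joined` (not part of the original notebook), that uses `join()` to avoid the trailing space. That trailing space is why the UDF output column later in this article is one character wider than the names themselves.

```python
def convertCase(s):
    # Same logic as above: capitalize the first letter of each word.
    resStr = ""
    for x in s.split(" "):
        resStr = resStr + x[0:1].upper() + x[1:] + " "
    return resStr  # ends with a trailing space


def convert_case_joined(s):
    # Hypothetical variant: join() puts spaces only between words.
    return " ".join(x[0:1].upper() + x[1:] for x in s.split(" "))


print(repr(convertCase("john jones")))          # 'John Jones '
print(repr(convert_case_joined("john jones")))  # 'John Jones'
```

Either version can be passed to udf(); the rest of this article keeps the original behavior so that the outputs shown match.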


Convert a Python function to PySpark UDF



Now convert the convertCase() function to a UDF by passing it to the PySpark SQL udf() function, which is available in the pyspark.sql.functions module. Make sure you import it before using it.

In PySpark, udf() returns a callable that wraps a UserDefinedFunction object (the Scala counterpart is org.apache.spark.sql.expressions.UserDefinedFunction).

In [6]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())


Note: The default return type of udf() is StringType, so you can also write the above statement without specifying a return type.

In [7]:

""" Converting function to UDF 
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z)) 


3. Using UDF with DataFrame
3.1 Using UDF with PySpark DataFrame select()
* Now you can use convertUDF() on a DataFrame column like a regular built-in function.

In [8]:

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)


+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



3.2 Using UDF with PySpark DataFrame withColumn()
* You can also use a UDF with the DataFrame withColumn() function. To demonstrate this, I will create another function, upperCase(), which converts the input string to upper case.

In [9]:

def upperCase(str):
    return str.upper()


Let’s convert the upperCase() Python function to a UDF and then use it with DataFrame withColumn(). The example below converts the values of the “Name” column to upper case and stores them in a new column, “Curated Name”.

In [10]:

upperCaseUDF = udf(lambda z:upperCase(z),StringType())

df.withColumn("Curated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)


+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



3.3 Registering PySpark UDF & use it on SQL
In order to use the convertCase() function in PySpark SQL, you need to register it with PySpark by using ```spark.udf.register()```.

In [11]:

""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)


+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



4. Creating UDF using annotation
In the previous sections, you learned that creating a UDF is a two-step process: first, you create a Python function; second, you convert that function to a UDF using the SQL udf() function. You can combine both steps into one by applying udf() as an annotation (a decorator, in Python terms) directly on the function definition.

In [12]:

@udf(returnType=StringType())
def upperCase(s):
    return s.upper()

df.withColumn("Curated Name", upperCase(col("Name"))) \
  .show(truncate=False)


+-----+------------+------------+
|Seqno|Name        |Curated Name|
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



5. Special Handling
5.1 Execution order
One thing to be aware of is that PySpark/Spark does not guarantee the order of evaluation of subexpressions, meaning expressions are not guaranteed to be evaluated left-to-right or in any other fixed order. PySpark reorders execution during query optimization and planning, so you cannot rely on one condition in an AND, OR, WHERE, or HAVING expression being evaluated before another.

So when you are designing and using a UDF, you have to be very careful, especially with null handling, as unhandled nulls result in runtime exceptions.



In [15]:

""" 
There is no guarantee that "Name is not null" is evaluated first.
If convertUDF(Name) like '%John%' is evaluated first on a null Name,
you will get a runtime error.
"""
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE "+ \
          "where Name is not null and convertUDF(Name) like '%John%'") \
     .show(truncate=False)  


+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



5.2 Handling null check
UDFs are error-prone when not designed carefully. For example, consider a column that contains null on some records: calling convertUDF() on it fails, as the following cell shows.



In [16]:

""" null check """

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

spark.sql("select convertUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)


+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\KAUNTEYA\AppData\Local\Temp/ipykernel_17600/4075634363.py", line 3, in convertCase
AttributeError: 'NoneType' object has no attribute 'split'


Note that in the above snippet, the record with “Seqno 4” has the value None for the “Name” column. Since the UDF does not handle null, using it on this DataFrame raises the above error. Note that Python’s None is treated as null in PySpark.

Points to remember:

* It’s always best practice to check for null inside the UDF itself rather than checking for null outside it.
* If you can’t do a null check inside the UDF, at least use IF or CASE WHEN to check for null and call the UDF conditionally.

In [17]:
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "", StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%John%'") \
     .show(truncate=False)    


+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+
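
As a complement to the lambda above, the null check can also live inside the function itself, which is the first point in the list of things to remember. The sketch below is plain Python so it can be tried without a Spark session; `convert_case_nullsafe` is a hypothetical name, and returning None instead of an empty string is an assumption made here so that a null input stays null rather than becoming "".

```python
def convert_case_nullsafe(s):
    # Hypothetical helper: pass None through unchanged instead of failing on .split().
    if s is None:
        return None
    # Capitalize the first letter of each word, without a trailing space.
    return " ".join(word[0:1].upper() + word[1:] for word in s.split(" "))


names = ["john jones", None, "amy sanders"]
print([convert_case_nullsafe(n) for n in names])
# ['John Jones', None, 'Amy Sanders']
```

A function written this way can be wrapped with udf() or registered with spark.udf.register() exactly like convertCase(); a Python UDF that returns None produces null in the resulting column.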



5.3 Performance concern using UDF
UDFs are a black box to PySpark, so it can’t optimize them, and you lose the optimizations PySpark applies to DataFrame/Dataset operations. When possible, use Spark SQL built-in functions, as these can be optimized. Consider creating a UDF only when no existing built-in SQL function does what you need.

Conclusion
In this article, you have learned the following:

* A PySpark UDF is a User Defined Function that is used to create a reusable function in Spark.
* Once a UDF is created, it can be reused on multiple DataFrames and in SQL (after registering).
* The default return type of udf() is StringType.
* You need to handle nulls explicitly; otherwise you will see runtime errors.