In [1]:
import findspark
findspark.init('c:/Spark')
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.appName("UDFs").getOrCreate()

In [5]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]
df = spark.createDataFrame(data = data, schema =columns)
df.show()

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



In [6]:
# Create a python function to convert the first letter of a word to upcase.
def convertCase(str):
    resStr =''
    arr = str.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + ' '
    return resStr

In [7]:
# Convert function to a udf.  StringType is the default type so you don't have to specify it but did her anyways.
convertUDF = udf(lambda x: convertCase(x),StringType())

In [21]:
# Now use the UDF on the dataframe to fix the name column
df.select(col('Seqno'), convertUDF(col('Name')).alias('Name'))
df.show()

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



In [31]:
# Use UDP with pySpark Dataframe and withColumn - UPCASE everything.
def upperCase(str):
    return str.upper()

In [34]:
upperCaseUDF = udf(lambda x: upperCase(x), StringType())
df.withColumn('New Name', upperCaseUDF(col('Name'))).show()

+-----+------------+------------+
|Seqno|        Name|    New Name|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



In [39]:
# Registered a PySpark UDF ahd use it on a SQL
spark.udf.register('convertUDF',convertCase)
spark.udf.register('uppercaseUDF',upperCase)
df.createOrReplaceTempView('people')
spark.sql("Select SEQNO, Name, convertUDF(Name) AS NewName1, uppercaseUDF(Name) AS NewName2 FROM people").show()

+-----+------------+-------------+------------+
|SEQNO|        Name|     NewName1|    NewName2|
+-----+------------+-------------+------------+
|    1|  john jones|  John Jones |  JOHN JONES|
|    2|tracey smith|Tracey Smith |TRACEY SMITH|
|    3| amy sanders| Amy Sanders | AMY SANDERS|
+-----+------------+-------------+------------+



In [43]:
# Using annotation to create UDF.  Skip the 2 step process used previously.
@udf(returnType=StringType())
def upperCase(str):
    return str.upper()

df.withColumn('CureatedName', upperCase(col('Name'))).show()

+-----+------------+------------+
|Seqno|        Name|CureatedName|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



In [44]:
#Special Handling
#One thing to aware is in PySpark/Spark does
#not guarantee the order of evaluation of subexpressions meaning expressions are not guarantee to evaluated left-to-right or in any other 
#fixed order. PySpark reorders the execution for query optimization and planning hence, AND, OR, WHERE and HAVING expression will have side effects.
#So when you are designing and using UDF, you have to be very careful especially with null handling as these results runtime exceptions.

In [49]:
""" 
No guarantee Name is not null will execute first
If convertUDF(Name) like '%John%' execute first then 
you will get runtime error
"""
spark.sql("select Seqno, convertUDF(Name) as Name from people " + \
         "where Name is not null and convertUDF(Name) like '%John%'").show()

+-----+-----------+
|Seqno|       Name|
+-----+-----------+
|    1|John Jones |
+-----+-----------+



In [55]:
""" null check """
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]
df2 = spark.createDataFrame(data = data, schema = columns)
df2.show()
df2.createOrReplaceTempView('People2')

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
|    4|        null|
+-----+------------+



In [51]:
# Example of having to be careful when nulls exist.
spark.sql('Select Name, convertUDF(Name) from People2').show()

PythonException: 
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "c:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
  File "c:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
  File "c:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "c:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "c:\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 212, in _batched
    for item in iterator:
  File "c:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 450, in mapper
  File "c:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 450, in <genexpr>
  File "c:\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 90, in <lambda>
  File "c:\Spark\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-6-55a1ceb51a60>", line 4, in convertCase
AttributeError: 'NoneType' object has no attribute 'split'


In [57]:
# Always best practice to check for a null inside a UDF function.
spark.udf.register('_nullsafeUDF', lambda x: convertCase(x) if not x is None else "", StringType())

<function __main__.<lambda>(x)>

In [58]:
spark.sql("select _nullsafeUDF(Name) from People2") \
     .show(truncate=False)

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+



In [59]:
spark.sql("select Seqno, _nullsafeUDF(Name) as Name from People2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%John%'") \
     .show(truncate=False)

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+

