# UDF (USER DEFINE FUNCTION)
Spark stores data in dataframes or RDDs—resilient distributed datasets. Think of these like databases. As with a traditional SQL database, e.g. mySQL, you cannot create your own custom function and run that against the database directly. You have to register the function first. That is, save it to the database as if it were one of the built-in database functions, like sum(), average, count(),etc.

Three approaches to UDFs
There are three ways to create UDFs:

df = df.withColumn
df = sqlContext.sql(“sql statement from <df>”)

    
    Below, we create a simple dataframe and RDD. We write a function to convert the only text field in the data structure to an integer. That is something you might do if, for example, you are working with machine learning where all the data must be converted to numbers before you plug that into an algorithm.

Notice the imports below. Refer to those in each example, so you know what object to import for each of the three approaches.

Below is the complete code for Approach 1. First, we look at key sections. Create a dataframe using the usual approach:

# Approach 1: withColumn()


In [20]:
import pyspark
from pyspark import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
from pyspark.sql.functions import udf
from pyspark.sql import Row


conf = pyspark.SparkConf() 

 
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)

schema = StructType([
    StructField("sales", FloatType(),True),    
    StructField("employee", StringType(),True),
    StructField("ID", IntegerType(),True)
])

data = [[ 10.2, "Fred",123]]

df = spark.createDataFrame(data,schema=schema)



# Now we do two things. First, we create a function colsInt and register it. That registered function calls another function toInt(), which we don’t need to register. The first argument in udf.register(“colsInt”, colsInt) is the name we’ll use to refer to the function. The second is the function we want to register.



In [21]:
colsInt = udf(lambda z: toInt(z), IntegerType())
spark.udf.register("colsInt", colsInt)

def toInt(s):
    if isinstance(s, str) == True:
        st = [str(ord(i)) for i in s]
        return(int(''.join(st)))
    else:
         return Null


df2 = df.withColumn( 'semployee',colsInt('employee'))
df2.show()

+-----+--------+---+----------+
|sales|employee| ID| semployee|
+-----+--------+---+----------+
| 10.2|    Fred|123|1394624364|
+-----+--------+---+----------+



# Approach 2: Using SQL
The first step here is to register the dataframe as a table, so we can run SQL statements against it. df is the dataframe and dftab is the temporary table we create.



In [22]:
spark.registerDataFrameAsTable(df, "dftab")


# Now we create a new dataframe df3 from the existing on df and apply the colsInt function to the employee column.



In [23]:
df3 = spark.sql("select sales, employee, ID, colsInt(employee) as iemployee from dftab")

In [24]:
df3.show()

+-----+--------+---+----------+
|sales|employee| ID| iemployee|
+-----+--------+---+----------+
| 10.2|    Fred|123|1394624364|
+-----+--------+---+----------+

