In [2]:
import findspark
# Runs before any pyspark import in the cells below.
# NOTE(review): presumably this locates the local Spark installation and makes
# pyspark importable — confirm against the findspark documentation.
findspark.init()

In [3]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Spark configuration for this notebook: master "local", application name "UDF".
# Further properties can be chained with .set("property", "value").
config = SparkConf().setMaster("local").setAppName("UDF")

# SparkSession is the entry point for Spark SQL and the DataFrame API.
# A single driver (notebook / Spark application) can hold many sessions,
# but only one SparkContext.
spark = (
    SparkSession.builder
    .config(conf=config)
    .getOrCreate()
)

# SparkContext covers the Spark core operations (RDDs, partitions, actions, ...).
# The session plans queries with the Catalyst engine, which in turn relies on
# the context for low-level code execution.
sc = spark.sparkContext

22/05/18 00:07:37 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.174.129 instead (on interface ens33)
22/05/18 00:07:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/05/18 00:07:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/18 00:07:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
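# UDF - User Defined Functions: custom functions written in Scala, Java or Python
# that can be used in Spark SQL and in Spark DataFrame expressions
# Python UDFs - slow (rows have to be passed between the JVM and a Python worker)
# Scala/Java UDFs - fast (they run inside the JVM)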

In [5]:
# Databricks notebook source
# UDF - User Defined Functions
# useful to extend spark sql functions with custom code

def power(n):
    """Return ``n`` multiplied by itself (the square of ``n``).

    Despite the name, this takes a single argument and always squares it;
    the name is kept because the UDF built from it is registered as "power".

    Args:
        n: A number, or None.

    Returns:
        ``n * n``, or None when ``n`` is None.
    """
    # A SQL NULL is handed to a Python UDF as None; propagate it instead of
    # letting ``None * None`` raise TypeError and fail the whole query.
    if n is None:
        return None
    return n * n

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Wrap the Python callable as a Spark UDF and declare its return type.
powerUdf = udf(f=power, returnType=LongType())

# A UDF has to be registered in the Spark session before SQL can call it.
# Registration is private to that session: a UDF registered in one Spark
# session is not available in another one.
# "power" is the function name to use in SQL statements.
# NOTE(review): Spark logs "The function power replaced a previously registered
# function" for this call — possibly the name collides with Spark's built-in
# power(); confirm before relying on the built-in in this session.
spark.udf.register(name="power", f=powerUdf)


22/05/18 00:13:18 WARN SimpleFunctionRegistry: The function power replaced a previously registered function.


<function __main__.<lambda>(n)>

In [6]:

# Call the UDF registered under the name "power" from a Spark SQL statement;
# for the literal 5 the result shown is 25.
spark.sql("SELECT power(5)").show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------+
|power(5)|
+--------+
|      25|
+--------+



                                                                                

In [8]:
# Databricks notebook source
# Sample order lines. Each tuple is laid out as
# (product_id, product_name, brand_id, price, qty, discount, taxp).
orders = [
    (1, 'iPhone', 100, 1000, 2, 5, 18),
    (2, 'Galaxy', 200, 800, 1, 8, 22),
]

# Only column names are given here; the column types are inferred from the data.
orderDf = spark.createDataFrame(
    data=orders,
    schema=[
        "product_id",
        "product_name",
        "brand_id",
        "price",
        "qty",
        "discount",
        "taxp",
    ],
)
orderDf.show()

+----------+------------+--------+-----+---+--------+----+
|product_id|product_name|brand_id|price|qty|discount|taxp|
+----------+------------+--------+-----+---+--------+----+
|         1|      iPhone|     100| 1000|  2|       5|  18|
|         2|      Galaxy|     200|  800|  1|       8|  22|
+----------+------------+--------+-----+---+--------+----+



In [9]:
# UDF to calculate the order amount
# amount = (price * qty), reduced by the discount percentage, then increased by the tax percentage

def calculateAmount(price, qty, discount, taxp):
    """Return the payable amount for one order line.

    The gross value ``price * qty`` is reduced by ``discount`` percent, and the
    discounted value is then increased by ``taxp`` percent.

    Args:
        price: Unit price.
        qty: Number of units.
        discount: Discount as a percentage (5 means 5%).
        taxp: Tax as a percentage, applied to the discounted value.

    Returns:
        The amount (a float for int/float inputs, because of true division),
        or None when any argument is None.
    """
    # A SQL NULL in any input column reaches a Python UDF as None. Without this
    # guard the arithmetic below raises TypeError and the whole query fails.
    if any(value is None for value in (price, qty, discount, taxp)):
        return None

    gross = price * qty
    discounted = gross - (gross * discount / 100)  # discounted price
    amount = discounted + discounted * taxp / 100  # with tax
    # NOTE: when used as a UDF this print runs once per row inside the Python
    # worker. It is kept for the demo output; drop it for real workloads.
    print("amount is", amount)
    return amount

# Sanity check of the plain Python function before it is wrapped as a UDF;
# for these inputs the result is 2242.0.
print(calculateAmount(1000, 2, 5, 18))

amount is 2242.0
2242.0


In [10]:

from pyspark.sql.types import DoubleType

# Wrap calculateAmount as a Spark UDF whose result column is a double.
calculate = udf(f=calculateAmount, returnType=DoubleType())

# Register it under the SQL name "calculate", so that Spark SQL can run
# SELECT calculate(...).
spark.udf.register(name="calculate", f=calculate)

<function __main__.calculateAmount(price, qty, discount, taxp)>

In [11]:
# DataFrame API usage: add an "amount" column computed by the calculate UDF
# from the price, qty, discount and taxp columns.
from pyspark.sql.functions import col

df = orderDf.withColumn(
    "amount",
    calculate(*[col(name) for name in ("price", "qty", "discount", "taxp")]),
)
df.printSchema()
df.show()

root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand_id: long (nullable = true)
 |-- price: long (nullable = true)
 |-- qty: long (nullable = true)
 |-- discount: long (nullable = true)
 |-- taxp: long (nullable = true)
 |-- amount: double (nullable = true)



amount is 2242.0
amount is 897.92
                                                                                

+----------+------------+--------+-----+---+--------+----+------+
|product_id|product_name|brand_id|price|qty|discount|taxp|amount|
+----------+------------+--------+-----+---+--------+----+------+
|         1|      iPhone|     100| 1000|  2|       5|  18|2242.0|
|         2|      Galaxy|     200|  800|  1|       8|  22|897.92|
+----------+------------+--------+-----+---+--------+----+------+



In [15]:
# Expose orderDf to Spark SQL as a temporary view named "orders"
# (the "orders" name is what SQL statements use in their FROM clause).
orderDf.createOrReplaceTempView("orders")

In [16]:
# Spark SQL usage: call the registered "calculate" UDF inside a SELECT over
# the "orders" temp view and expose the result as the "amount" column.
df = spark.sql(
    "SELECT *, calculate(price, qty, discount, taxp) as amount from orders"
)
df.printSchema()
df.show()

root
 |-- product_id: long (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand_id: long (nullable = true)
 |-- price: long (nullable = true)
 |-- qty: long (nullable = true)
 |-- discount: long (nullable = true)
 |-- taxp: long (nullable = true)
 |-- amount: double (nullable = true)

+----------+------------+--------+-----+---+--------+----+------+
|product_id|product_name|brand_id|price|qty|discount|taxp|amount|
+----------+------------+--------+-----+---+--------+----+------+
|         1|      iPhone|     100| 1000|  2|       5|  18|2242.0|
|         2|      Galaxy|     200|  800|  1|       8|  22|897.92|
+----------+------------+--------+-----+---+--------+----+------+



amount is 2242.0
amount is 897.92


In [None]:
# DIY: spylon in SCALA
# DIY: udf for power, calculate
