In [1]:
# Two types of user-defined functions (UDFs)

# UDF with the standard API

# Installing the Pyspark Library
!pip install -q findspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=ec5e94a66d365a98a91832b68ca9b6461fb2c58170161df99d66e40b4050d1e5
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [42]:
# Import Important libraries for creating SparkSession.
from pyspark.sql import SparkSession, window
from pyspark import SparkConf
from pyspark.sql.functions import *


In [43]:
# Build the Spark configuration used to create the session.
my_conf = SparkConf()
my_conf.set("spark.app.name", "my_udf")
# Property keys are case-sensitive: the master must be set under "spark.master"
# (lower-case), otherwise the setting is stored under an unrelated key.
# local[*] runs Spark locally; this works on your local machine and also on platforms like Google Colab.
my_conf.set("spark.master", "local[*]")

<pyspark.conf.SparkConf at 0x7f397cddc430>

In [44]:
# Create a SparkSession (or reuse an existing one) from the my_conf configuration.
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

In [45]:
# Read the CSV data at /content/dataset1 into a DataFrame (no header or schema options are set).
inputDF = spark.read.format("csv").load("/content/dataset1")

In [46]:
inputDF.show()

+-------+---+---------+
|    _c0|_c1|      _c2|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+



In [47]:
# The columns were loaded as _c0/_c1/_c2, so give them meaningful names.
# Note: toDF only renames the columns here; every column is still a string.

SchemaDF = inputDF.toDF("name", "age", "city")

In [48]:
SchemaDF.show()

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+



In [49]:
# Printing the Schema
SchemaDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- city: string (nullable = true)



In [50]:
# Goal: find out who the adults in the dataset are, along with their name and city.
# To do that we define a user-defined function: if age > 18 return 'Yes', otherwise 'No'.
# Note: every time we create a UDF we need to register it.

def ageCheck(age):
  """Return 'Yes' if ``age`` is numerically greater than 18, else 'No'.

  The age column is loaded as a string, so the value is converted to an int
  before comparing. Comparing the raw strings would be lexicographic
  ('9' > '18' is true, '100' > '18' is false) and give wrong answers.

  Args:
    age: the age as a numeric string (an int is accepted as well).

  Returns:
    'Yes' or 'No', or None when ``age`` is None or cannot be parsed as an
    integer.
  """
  try:
    years = int(age)
  except (TypeError, ValueError):
    # Null or non-numeric age: there is no meaningful answer.
    return None
  return 'Yes' if years > 18 else 'No'

In [51]:
# Register the UDF under a name so it can be called by that name inside a string expression.
# register() is given 3 arguments here: (name, function, return type)
spark.udf.register("parseAgeCheck", ageCheck, StringType())

<function __main__.ageCheck(age)>

In [54]:
# Call the registered UDF by name inside a string expression and add its result
# as a new "Adult" column (Yes/No) alongside the existing columns.

functionDF = SchemaDF.withColumn("Adult",expr("parseAgeCheck(age)"))

In [59]:
# Using grouped Aggregations 
groupedDF = functionDF.groupBy("city").count()

In [60]:
groupedDF.show()

+---------+-----+
|     city|count|
+---------+-----+
|  chennai|    1|
|bangalore|    2|
|hyderabad|    2|
|   mysore|    1|
+---------+-----+



In [61]:
functionDF.select("name","Adult", "city").show()

+-------+-----+---------+
|   name|Adult|     city|
+-------+-----+---------+
|  sumit|  Yes|bangalore|
|  kapil|  Yes|hyderabad|
|sathish|   No|  chennai|
|   ravi|  Yes|bangalore|
| kavita|   No|hyderabad|
|  kavya|  Yes|   mysore|
+-------+-----+---------+



In [63]:
# Let's also see the other type of user-defined function: one wrapped with udf() and applied directly to DataFrame columns.
# We will reuse the function created above, with small changes.

In [64]:
# here the age is of string type i want to make to int type and apply to function
SchemaDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- city: string (nullable = true)



In [65]:
# Changing the type of a column:
# withColumn with an existing column name replaces "age" with its value cast to an integer.
Changed_type = SchemaDF.withColumn("age",col("age").cast("Integer"))

In [66]:
Changed_type.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



In [67]:
# age is now an integer, so this version of the function compares numbers directly.

def ageCheck2(age):
  """Return 'Y' if the integer ``age`` is greater than 18, else 'N'.

  The age column is nullable, so a null age arrives as None; it is passed
  through as None instead of raising TypeError on ``None > 18``.

  Args:
    age: the age as an int, or None.

  Returns:
    'Y', 'N', or None when ``age`` is None.
  """
  if age is None:
    return None
  return 'Y' if age > 18 else 'N'

In [69]:
# Wrap ageCheck2 with udf() to get a function that is applied directly to a column
# (see the withColumn call in the next cell); the declared return type is a string.
parseAgeCheck2 = udf(ageCheck2,StringType())

In [73]:
# withColumn returns a new DataFrame and leaves Changed_type untouched,
# so keep the result in a variable instead of discarding it.
adultDF = Changed_type.withColumn("Adult", parseAgeCheck2("age"))
adultDF

DataFrame[name: string, age: int, city: string, Adult: string]

In [74]:
# Changed_type has no "Adult" column: the withColumn call in the previous cell
# returned a new DataFrame and did not modify this one.
Changed_type.show()

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+



In [108]:
# Let's look at another example: making the first letter of each name a capital.

def capitalize(name):
  """Return ``name`` with its first character upper-cased.

  The rest of the string is left unchanged. Slicing (``name[:1]``) is used
  instead of indexing so an empty string returns '' rather than raising
  IndexError.

  Args:
    name: the string to capitalize, or None.

  Returns:
    The capitalized string, or None when ``name`` is None.
  """
  if name is None:
    return None
  return name[:1].upper() + name[1:]
  

In [111]:
# Wrap capitalize with udf() so it can be applied to a DataFrame column; the declared return type is a string.

capitalized = udf(capitalize, StringType())

In [101]:
Changed_type.select(capitalized(col('name')).alias("name"),col("age"),col("city")).show()

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|  Sumit| 30|bangalore|
|  Kapil| 32|hyderabad|
|Sathish| 16|  chennai|
|   Ravi| 39|bangalore|
| Kavita| 12|hyderabad|
|  Kavya| 19|   mysore|
+-------+---+---------+



In [102]:
# You can also query the same data with Spark SQL by exposing the DataFrame as a view using createOrReplaceTempView().

Changed_type.createOrReplaceTempView("Person")

In [113]:
# Query the "Person" view with plain SQL.
# This query only selects the raw columns; no UDF is applied, so the names stay lower-case.
spark.sql("select name, age, city from Person").show()

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+

