# User Defined Functions in PySpark

In [None]:
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType

# Load the office employee CSV: the first row is treated as the header and
# column types are inferred from the data.
df = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .csv('/content/sample_data/OfficeData.csv')
)
df.show()

# ------------------------ first UDF

def get_total_salary(salary):
    """Return ``salary`` increased by a flat 100.

    Args:
        salary: Numeric salary value, or ``None`` when the source column is
            NULL (Spark hands SQL NULLs to Python UDFs as ``None``).

    Returns:
        ``salary + 100``, or ``None`` when ``salary`` is ``None`` so that the
        resulting column is NULL instead of the task failing with a TypeError.
    """
    if salary is None:
        return None
    return salary + 100


# Wrap the plain Python function as a Spark UDF producing an integer column.
totalSalaryUDF = udf(get_total_salary, returnType=IntegerType())

# Append the computed column to df and display the result.
df.withColumn("total_salary", totalSalaryUDF(df["salary"])).show()






+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+------------------+
|employee_name|department|state|salary|age|bonus|total_bonus_salary|
+-------------+----------+-----+------+---+-----+------------------+
|        James|     Sales|   NY| 90000| 34|10000|            100000|
|      Michael|     Sales|   NY| 86000| 56|20000|            106000|
|   

In [None]:
# ------------------------------ another UDF

def get_salary_and_bonus(salary, bonus):
    """Return the sum of ``salary`` and ``bonus``.

    Args:
        salary: Numeric salary value, or ``None`` for a NULL column value.
        bonus: Numeric bonus value, or ``None`` for a NULL column value.

    Returns:
        ``salary + bonus``, or ``None`` when either input is ``None`` so that
        a NULL input yields a NULL result instead of raising a TypeError.
    """
    if salary is None or bonus is None:
        return None
    return salary + bonus

# Two-argument UDF: Spark passes one value per input column, in order.
totalSalaryUDF_bonus = udf(get_salary_and_bonus, returnType=IntegerType())

# Append salary + bonus as a new column and display the result.
df.withColumn(
    "total_bonus_salary",
    totalSalaryUDF_bonus(df["salary"], df["bonus"]),
).show()

In [None]:
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [None]:
from pyspark.sql.types import DoubleType


def get_incr(state, salary, bonus):
    """Return the salary increment for an employee, based on their state.

    Rates:
        * ``"NY"``: 10% of salary plus 5% of bonus.
        * ``"CA"``: 12% of salary plus 3% of bonus.
        * any other state (including ``None``): no increment.

    Args:
        state: State code of the employee.
        salary: Numeric salary, or ``None`` for a NULL column value.
        bonus: Numeric bonus, or ``None`` for a NULL column value.

    Returns:
        The increment as a ``float``. States without a rate get ``0.0``
        rather than the int ``0``: this function is registered as a
        ``DoubleType`` UDF, and a standard Python UDF does not coerce a
        Python ``int`` to double (it typically comes back as NULL).
        Returns ``None`` when a rate applies but ``salary`` or ``bonus`` is
        ``None``, so NULL inputs give a NULL result instead of a TypeError.
    """
    if state == "NY":
        salary_rate, bonus_rate = 0.10, 0.05
    elif state == "CA":
        salary_rate, bonus_rate = 0.12, 0.03
    else:
        return 0.0

    if salary is None or bonus is None:
        return None
    return salary * salary_rate + bonus * bonus_rate

# Register the increment calculation as a UDF returning a double column.
incrUDF = udf(get_incr, returnType=DoubleType())

# Compute the increment from the state, salary and bonus columns and show it.
df.withColumn(
    "increment",
    incrUDF(df["state"], df["salary"], df["bonus"]),
).show()

+-------------+----------+-----+------+---+-----+---------+
|employee_name|department|state|salary|age|bonus|increment|
+-------------+----------+-----+------+---+-----+---------+
|        James|     Sales|   NY| 90000| 34|10000|   9500.0|
|      Michael|     Sales|   NY| 86000| 56|20000|   9600.0|
|       Robert|     Sales|   CA| 81000| 30|23000|  10410.0|
|        Maria|   Finance|   CA| 90000| 24|23000|  11490.0|
|        Raman|   Finance|   CA| 99000| 40|24000|  12600.0|
|        Scott|   Finance|   NY| 83000| 36|19000|   9250.0|
|          Jen|   Finance|   NY| 79000| 53|15000|   8650.0|
|         Jeff| Marketing|   CA| 80000| 25|18000|  10140.0|
|        Kumar| Marketing|   NY| 91000| 50|21000|  10150.0|
+-------------+----------+-----+------+---+-----+---------+



# cache() in PySpark

In PySpark, cache() is used to persist a DataFrame or RDD in memory, so that future actions on that DataFrame are faster. Normally, when you perform transformations (like filter, select, or groupBy) on a DataFrame in PySpark, those transformations are lazy — meaning they are not executed until an action (like count() or show()) is called. Without caching, each time an action is triggered, all the transformations are recalculated from the original data source.

Why use cache()?
When you call df.cache(), PySpark stores the DataFrame (or RDD) in memory, meaning it keeps the intermediate result in memory after the first action is executed. Subsequent actions on this DataFrame can reuse the cached data, instead of recalculating the entire sequence of transformations. This can result in significant performance improvements, especially when you plan to reuse the DataFrame multiple times.

Example (Python):
# Caching the DataFrame
df.cache()

# First action - triggers the caching
df.count()

# Subsequent actions will be faster as they use the cached data
df.show()

Key Points:
- When to use: Use cache() when you have a DataFrame that you are going to use multiple times in your workflow.
- Storage: For a DataFrame, cache() uses a memory-and-disk storage level: data is kept in memory, and partitions that do not fit are spilled to disk. For an RDD, cache() is memory-only: partitions that do not fit are not stored and are recomputed when they are needed.
- Persistence levels: If you need more control over where the data is cached (memory or disk), you can use the persist() method, which allows you to specify the storage level (e.g., memory-only, memory-and-disk, etc.).

Benefits:
- Improved performance for repeated operations on the same DataFrame.
- Avoids recomputation of expensive transformations.

In [None]:
# cache() is lazy: it only marks df for caching. The data is actually
# materialized by the next action run on df (e.g. show() or count()).
df.cache()

DataFrame[employee_name: string, department: string, state: string, salary: int, age: int, bonus: int]

In [None]:
# COMMAND ----------

df.show()

# COMMAND ----------

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



# Convert data from dataframe to RDD

In [None]:
df.show()

# COMMAND ----------

type(df)



+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [None]:
# COMMAND ----------

# Get the RDD view of the DataFrame; each element is a Row.
rdd = df.rdd

# COMMAND ----------

# Inspect the Python type of the converted object.
type(rdd)



In [None]:
# COMMAND ----------

# Bring every Row of the RDD back to the driver as a Python list.
rdd.collect()

# COMMAND ----------

# Keep only the employees located in NY. The field is looked up by name
# rather than by position so the filter keeps working if the column order
# of the source data changes.
rdd.filter(lambda row: row["state"] == 'NY').collect()




[Row(employee_name='James', department='Sales', state='NY', salary=90000, age=34, bonus=10000),
 Row(employee_name='Michael', department='Sales', state='NY', salary=86000, age=56, bonus=20000),
 Row(employee_name='Scott', department='Finance', state='NY', salary=83000, age=36, bonus=19000),
 Row(employee_name='Jen', department='Finance', state='NY', salary=79000, age=53, bonus=15000),
 Row(employee_name='Kumar', department='Marketing', state='NY', salary=91000, age=50, bonus=21000)]

In [None]:
# COMMAND ----------

# Keep only the employees whose salary exceeds 87000.
rdd.filter(lambda row: row.salary > 87000).collect()

[Row(employee_name='James', department='Sales', state='NY', salary=90000, age=34, bonus=10000),
 Row(employee_name='Maria', department='Finance', state='CA', salary=90000, age=24, bonus=23000),
 Row(employee_name='Raman', department='Finance', state='CA', salary=99000, age=40, bonus=24000),
 Row(employee_name='Kumar', department='Marketing', state='NY', salary=91000, age=50, bonus=21000)]

# Spark SQL

In [None]:

# Load the student CSV: the first row is the header, column types are inferred.
df = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .csv('/content/sample_data/StudentData_.csv')
)
df.show()

# COMMAND ----------

# Register df under the view name "Student" so it can be queried with spark.sql.
df.createOrReplaceTempView("Student")

# COMMAND ----------


+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [None]:
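# Student.collect()   ----> Not running
# Student.show()      ----> Not running
# "Student" is the name of a temp view, not a Python variable. Query it
# through the session instead, e.g. spark.sql("select * from Student").show()
# or spark.table("Student").show().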

In [None]:

# Count students per (course, gender) with Spark SQL against the "Student"
# temp view; the count column is named "count(1)" in the output.
spark.sql("select course, gender, count(*) from Student group by course, gender").show()
# Same aggregation through the DataFrame API; here the column is named "count".
df.groupBy("course", "gender").count().show()

+------+------+--------+
|course|gender|count(1)|
+------+------+--------+
|   OOP|  Male|      70|
|    DB|  Male|      82|
| Cloud|Female|     106|
|  NULL|  NULL|       1|
|   MVC|  Male|      86|
|   DSA|Female|      98|
|   DSA| Male |       1|
|    PF|  Male|      97|
|   MVC|Female|      71|
| Cloud|  Male|      86|
|    PF|Female|      69|
|   DSA|  Male|      78|
|    DB|Female|      75|
|   OOP|Female|      82|
+------+------+--------+

+------+------+-----+
|course|gender|count|
+------+------+-----+
|   OOP|  Male|   70|
|    DB|  Male|   82|
| Cloud|Female|  106|
|  NULL|  NULL|    1|
|   MVC|  Male|   86|
|   DSA|Female|   98|
|   DSA| Male |    1|
|    PF|  Male|   97|
|   MVC|Female|   71|
| Cloud|  Male|   86|
|    PF|Female|   69|
|   DSA|  Male|   78|
|    DB|Female|   75|
|   OOP|Female|   82|
+------+------+-----+



# Write a DataFrame to a file

In [None]:
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 

In [None]:
# Write df as CSV with a header row, replacing any existing output at this path.
(
    df.write
    .mode("overwrite")
    .option('header', 'True')
    .csv('/content/sample_data/output_sachin')
)

# COMMAND ----------


In [None]:

# Read back the CSV output written earlier to confirm the round trip.
df = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .csv('/content/sample_data/output_sachin')
)
df.show()

+---+------+----------------+------+------+-----+--------------------+
|age|gender|            name|course|  roll|marks|               email|
+---+------+----------------+------+------+-----+--------------------+
| 28|Female| Hubert Oliveras|    DB|  2984|   59|Annika Hoffman_Na...|
| 29|Female|Toshiko Hillyard| Cloud| 12899|   62|Margene Moores_Ma...|
| 28|  Male|  Celeste Lollis|    PF| 21267|   45|Jeannetta Golden_...|
| 29|Female|    Elenore Choy|    DB| 32877|   29|Billi Clore_Mitzi...|
| 28|  Male|  Sheryll Towler|   DSA| 41487|   41|Claude Panos_Judi...|
| 28|  Male|  Margene Moores|   MVC| 52771|   32|Toshiko Hillyard_...|
| 28|  Male|     Neda Briski|   OOP| 61973|   69|Alberta Freund_El...|
| 28|Female|    Claude Panos| Cloud| 72409|   85|Sheryll Towler_Al...|
| 28|  Male|  Celeste Lollis|   MVC| 81492|   64|Nicole Harwood_Cl...|
| 29|  Male|  Cordie Harnois|   OOP| 92882|   51|Judie Chipps_Clem...|
| 29|Female|       Kena Wild|   DSA|102285|   35|Dustin Feagins_Ma...|
| 29| 