In [37]:
######### spark session

# Ref : https://github.com/subhamkharwal/pyspark-zero-to-hero/blob/master/05_sort_union_aggregation.ipynb

In [1]:
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("Spark Session")
         .master("local[*]")
         .getOrCreate()
        )

25/06/04 23:17:52 WARN Utils: Your hostname, prashan resolves to a loopback address: 127.0.1.1; using 192.168.29.201 instead (on interface wlp0s20f3)
25/06/04 23:17:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/04 23:17:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark #print session

In [21]:
#Create Dataframe
data = [[1, "ABC", 2000], [2, "DEF", 4000], [3, "GGGG", 3000], [4, 'HHHH', 7000], [5, 'IIIII', 4500]]
df = spark.createDataFrame(data=data, schema="id int, name string, salary int")

In [22]:
df.show() #action

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  ABC|  2000|
|  2|  DEF|  4000|
|  3| GGGG|  3000|
|  4| HHHH|  7000|
|  5|IIIII|  4500|
+---+-----+------+



In [23]:
#filter
df.where("salary > 5000").show() 

+---+----+------+
| id|name|salary|
+---+----+------+
|  4|HHHH|  7000|
+---+----+------+



In [25]:
df.rdd.getNumPartitions() # num of partti

8

In [29]:
df.write.format('csv').save("test.csv") #action

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/prashant/pivot/personal/tech/pyspark_samples/test.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [38]:
# Get Active Session and Rename

spark_new = spark.getActiveSession()
spark_new

In [39]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)



In [40]:
df.schema

StructType([StructField('id', IntegerType(), True), StructField('name', StringType(), True), StructField('salary', IntegerType(), True)])

In [41]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('id', IntegerType(), False),
    StructField('name', StringType(), True),
    StructField('salary', IntegerType(), True)
]
)

In [42]:
df1 = spark_new.createDataFrame(data=data, schema=schema)

In [43]:
df1.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  ABC|  2000|
|  2|  DEF|  4000|
|  3| GGGG|  3000|
|  4| HHHH|  7000|
|  5|IIIII|  4500|
+---+-----+------+



In [44]:
schema_str = "id int, name string, salary int"
df1 = spark_new.createDataFrame(data=data, schema=schema_str)

In [45]:
df1.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  ABC|  2000|
|  2|  DEF|  4000|
|  3| GGGG|  3000|
|  4| HHHH|  7000|
|  5|IIIII|  4500|
+---+-----+------+



In [46]:
from pyspark.sql.functions import expr, col
df1.select(col("id"), df1.name, expr("salary")).show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  ABC|  2000|
|  2|  DEF|  4000|
|  3| GGGG|  3000|
|  4| HHHH|  7000|
|  5|IIIII|  4500|
+---+-----+------+



In [48]:
df1.selectExpr("cast(id as int) as id", "name", "salary").show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  ABC|  2000|
|  2|  DEF|  4000|
|  3| GGGG|  3000|
|  4| HHHH|  7000|
|  5|IIIII|  4500|
+---+-----+------+



In [52]:
df2 = df1.withColumn("tax", col("salary") * 0.2)

In [54]:
df2.withColumnRenamed("tax", "taxx").show()

+---+-----+------+------+
| id| name|salary|  taxx|
+---+-----+------+------+
|  1|  ABC|  2000| 400.0|
|  2|  DEF|  4000| 800.0|
|  3| GGGG|  3000| 600.0|
|  4| HHHH|  7000|1400.0|
|  5|IIIII|  4500| 900.0|
+---+-----+------+------+



In [56]:
df1.drop("tax").show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|  ABC|  2000|
|  2|  DEF|  4000|
|  3| GGGG|  3000|
|  4| HHHH|  7000|
|  5|IIIII|  4500|
+---+-----+------+



In [58]:
from pyspark.sql.functions import lit
columns = {
    "col1" : lit(1),
    "tax" : expr("salary") * 0.2,
    "col2" : lit("col2")
}

In [69]:
df2.withColumns(columns).show()

+---+-----+------+------+----+------+----+
| id| name|salary| bonus|col1|   tax|col2|
+---+-----+------+------+----+------+----+
|  1|  ABC|  2000| 400.0|   1| 400.0|col2|
|  2|  DEF|  4000| 800.0|   1| 800.0|col2|
|  3| GGGG|  3000| 600.0|   1| 600.0|col2|
|  4| HHHH|  7000|1050.0|   1|1400.0|col2|
|  5|IIIII|  4500| 675.0|   1| 900.0|col2|
+---+-----+------+------+----+------+----+



In [70]:
from pyspark.sql.functions import when

In [72]:
df2 = df2.withColumn("bonus", 
               when(col("salary") <= 4000, col("salary") * 0.2)
               .when(col("salary")>4000, col("salary") * 0.15)
               .otherwise(col("salary")*1)
              )

In [73]:
df2.show()

+---+-----+------+------+
| id| name|salary| bonus|
+---+-----+------+------+
|  1|  ABC|  2000| 400.0|
|  2|  DEF|  4000| 800.0|
|  3| GGGG|  3000| 600.0|
|  4| HHHH|  7000|1050.0|
|  5|IIIII|  4500| 675.0|
+---+-----+------+------+



In [78]:
df2.withColumn("tax", expr("case when salary <= 4000 then (salary * 0.1) when salary > 4000 then (salary * 0.2) else 0 end")).show()

+---+-----+------+------+------+
| id| name|salary| bonus|   tax|
+---+-----+------+------+------+
|  1|  ABC|  2000| 400.0| 200.0|
|  2|  DEF|  4000| 800.0| 400.0|
|  3| GGGG|  3000| 600.0| 300.0|
|  4| HHHH|  7000|1050.0|1400.0|
|  5|IIIII|  4500| 675.0| 900.0|
+---+-----+------+------+------+



In [None]:
# UNION 
'''
Column names, datatypes and sequence of columns of both the dataframes should be same.
union all - keeps duplicate rows
union - keeps distinct records
unionByName - works with diff column seq but number of cols and datatype should be same
'''

In [3]:
# Emp Data & Schema

emp_data_1 = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"]
]

emp_data_2 = [
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

In [4]:
emp_df1 = spark_new.createDataFrame(data=emp_data_1, schema = emp_schema)
emp_df2 = spark_new.createDataFrame(data=emp_data_2, schema = emp_schema)

NameError: name 'spark_new' is not defined

In [85]:
emp_df = emp_df1.union(emp_df2)

In [86]:
emp_df.show()

+-----------+-------------+-------------+---+------+------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|
+-----------+-------------+-------------+---+------+------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|
|        011|          104|   David Park| 38|  Male| 65000|2015-11-01|
|     

In [90]:
from pyspark.sql.functions import coalesce, lit
emp_df_new = emp_df.withColumn("new_gender", coalesce("gender", lit("O")))

In [91]:
emp_df_new.show()

+-----------+-------------+-------------+---+------+------+----------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|new_gender|
+-----------+-------------+-------------+---+------+------+----------+----------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|      Male|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|    Female|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|      Male|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|    Female|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|      Male|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|    Female|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|      Male|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|    Female|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|      Male|
|        010|   

In [94]:
from pyspark.sql.functions import count
emp_df_new.groupBy("department_id").agg(count("employee_id")).alias("employee_count").show()

+-------------+------------------+
|department_id|count(employee_id)|
+-------------+------------------+
|          101|                 3|
|          102|                 4|
|          103|                 4|
|          104|                 3|
|          105|                 2|
|          106|                 2|
|          107|                 2|
+-------------+------------------+



In [98]:
# distinct / unique 
emp_df = emp_df.distinct()
emp_df.select("department_id").distinct().show()

+-------------+
|department_id|
+-------------+
|          101|
|          102|
|          103|
|          104|
|          105|
|          106|
|          107|
+-------------+



In [102]:
# window function
from pyspark.sql.window import Window
from pyspark.sql.functions import max, desc, col

window_func = Window.partitionBy(col("department_id")).orderBy(col("salary").desc())
max_func = max(col("salary")).over(window_func)

emp_df.withColumn("max_salary", max_func).show()

+-----------+-------------+-------------+---+------+------+----------+----------+
|employee_id|department_id|         name|age|gender|salary| hire_date|max_salary|
+-----------+-------------+-------------+---+------+------+----------+----------+
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|     70000|
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|     70000|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|     70000|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|     55000|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|     55000|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|     55000|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|     55000|
|        019|          103|  Steven Chen| 36|  Male| 62000|2015-08-01|     62000|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|     62000|
|        009|   

In [111]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, desc

window_func = Window.partitionBy(col("department_id")).orderBy(col("salary").desc())
rn_func = row_number().over(window_func)
emp_df.withColumn("rn", rn_func).where("rn==2").show()

+-----------+-------------+-----------+---+------+------+----------+---+
|employee_id|department_id|       name|age|gender|salary| hire_date| rn|
+-----------+-------------+-----------+---+------+------+----------+---+
|        001|          101|   John Doe| 30|  Male| 50000|2015-01-01|  2|
|        020|          102|  Grace Kim| 32|Female| 53000|2018-11-01|  2|
|        005|          103|  Jack Chan| 40|  Male| 60000|2013-04-01|  2|
|        018|          104|  Nancy Liu| 29|      | 50000|2017-06-01|  2|
|        012|          105| Susan Chen| 31|Female| 54000|2017-02-15|  2|
|        015|          106|Michael Lee| 37|  Male| 63000|2014-09-30|  2|
|        014|          107|  Emily Lee| 26|Female| 46000|2019-01-01|  2|
+-----------+-------------+-----------+---+------+------+----------+---+



In [122]:
# alternative to window function is expr

emp_df.withColumn("rn", expr("row_number() over(partition by department_id order by salary desc)")).where("rn == 2").show()

+-----------+-------------+-----------+---+------+------+----------+---+
|employee_id|department_id|       name|age|gender|salary| hire_date| rn|
+-----------+-------------+-----------+---+------+------+----------+---+
|        001|          101|   John Doe| 30|  Male| 50000|2015-01-01|  2|
|        020|          102|  Grace Kim| 32|Female| 53000|2018-11-01|  2|
|        005|          103|  Jack Chan| 40|  Male| 60000|2013-04-01|  2|
|        018|          104|  Nancy Liu| 29|      | 50000|2017-06-01|  2|
|        012|          105| Susan Chen| 31|Female| 54000|2017-02-15|  2|
|        015|          106|Michael Lee| 37|  Male| 63000|2014-09-30|  2|
|        014|          107|  Emily Lee| 26|Female| 46000|2019-01-01|  2|
+-----------+-------------+-----------+---+------+------+----------+---+



In [132]:
# repartition vs coalesce
'''
repartition : 
can increase or decrease num of partitions, 
full shuffle of data
coalesce : 
can only decrease num of partitions, 
No full shuffle of data merges adjacent partitions
'''

from pyspark.sql.functions import spark_partition_id

In [128]:
emp_df.rdd.getNumPartitions()


1

In [129]:
emp_df.repartition(100)

DataFrame[employee_id: string, department_id: string, name: string, age: string, gender: string, salary: string, hire_date: string]

In [130]:
emp_df.rdd.getNumPartitions()

1

In [135]:
emp_df.withColumn("partition_id", spark_partition_id()).show()

+-----------+-------------+-------------+---+------+------+----------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_id|
+-----------+-------------+-------------+---+------+------+----------+------------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|           0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|           0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|           0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|           0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|           0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|           0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|           0|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|      

In [137]:
emp_df.repartition(4, "department_id").withColumn("partition_id", spark_partition_id()).show()

+-----------+-------------+-------------+---+------+------+----------+------------+
|employee_id|department_id|         name|age|gender|salary| hire_date|partition_id|
+-----------+-------------+-------------+---+------+------+----------+------------+
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|           0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|           0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|           0|
|        014|          107|    Emily Lee| 26|Female| 46000|2019-01-01|           0|
|        016|          107|  Kelly Zhang| 30|Female| 49000|2018-04-01|           0|
|        020|          102|    Grace Kim| 32|Female| 53000|2018-11-01|           0|
|        012|          105|   Susan Chen| 31|Female| 54000|2017-02-15|           1|
|        017|          105|  George Wang| 34|  Male| 57000|2016-03-15|           1|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-08-01|      

In [None]:
#cascade join
'''
emp.alias("e").join(dept.alias(d), 
                    how = 'left_outer', 
                    on = (emp.department_id == dept.department_id) & ((emp.department_id == "101") | (emp.department_id == "102"))
                   )
'''

In [138]:
emp_df.write.format("csv").save("employee.csv")

In [3]:
#creates 1 job in bg as it reads only 1 record to read metadata 
emp_df = spark.read.format("csv").load("employee.csv") 

In [4]:
emp_df.schema

StructType([StructField('_c0', StringType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', StringType(), True), StructField('_c5', StringType(), True), StructField('_c6', StringType(), True)])

In [5]:
#creates 1 job in bg as it reads only 1 record to read metadata 
emp_df = spark.read.format("csv").option("header", True).load("employee.csv")

In [10]:
#creates 2 jobs in bg 1 to read header and 2nd to load entire data to get metadata
emp_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("employee_data.csv")

In [11]:
emp_df.schema

StructType([StructField('id', IntegerType(), True), StructField('department_id', IntegerType(), True), StructField('_c2', StringType(), True), StructField('age', IntegerType(), True), StructField('gender', StringType(), True), StructField('salary', IntegerType(), True), StructField('joining_date', DateType(), True)])

In [15]:
#creates 0 job as metadata is already provided as schema
schema_str = "id int, department_id int, name string, age int, gender string, salary int, joining_date date"
emp_df = spark.read.format("csv").schema(schema_str).load("employee_data.csv")

In [17]:
emp_df = spark.read.format("csv").option("header", True).schema(schema_str).load("employee_data.csv")

In [18]:
emp_df.show()

+---+-------------+-------------+---+------+------+------------+
| id|department_id|          _c2|age|gender|salary|joining_date|
+---+-------------+-------------+---+------+------+------------+
|  1|          101|     John Doe| 30|  Male| 50000|  2015-01-01|
|  2|          101|   Jane Smith| 25|Female| 45000|  2016-02-15|
|  3|          102|    Bob Brown| 35|  Male| 55000|  2014-05-01|
|  5|          103|    Jack Chan| 40|  Male| 60000|  2013-04-01|
|  4|          102|    Alice Lee| 28|Female| 48000|  2017-09-30|
|  6|          103|    Jill Wong| 32|Female| 52000|  2018-07-01|
|  7|          101|James Johnson| 42|  Male| 70000|  2012-03-15|
|  8|          102|     Kate Kim| 29|Female| 51000|  2019-10-01|
| 10|          104|     Lisa Lee| 27|Female| 47000|  2018-08-01|
|  9|          103|      Tom Tan| 33|  Male| 58000|  2016-06-01|
| 11|          104|   David Park| 38|  Male| 65000|  2015-11-01|
| 12|          105|   Susan Chen| 31|Female| 54000|  2017-02-15|
| 13|          106|    Br

25/05/29 23:07:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, department_id, , age, gender, salary, joining_date
 Schema: id, department_id, _c2, age, gender, salary, joining_date
Expected: _c2 but found: 
CSV file: file:///home/prashant/pivot/personal/tech/pyspark_samples/employee_data.csv


In [19]:
'''
PERMISSIVE mode - mark corrupt record under _corrupt_record column
FAIL
'''
schema_str = "id int, department_id int, name string, age int, gender string, salary int, joining_date date, _corrupt_record string"
emp_df = spark.read.format("csv").option("header", True).option("mode", "PERMISSIVE").schema(schema_str).load("employee_data.csv")

In [20]:
emp_df.show()

+---+-------------+-------------+---+------+------+------------+---------------+
| id|department_id|         name|age|gender|salary|joining_date|_corrupt_record|
+---+-------------+-------------+---+------+------+------------+---------------+
|  1|          101|     John Doe| 30|  Male| 50000|  2015-01-01|           NULL|
|  2|          101|   Jane Smith| 25|Female| 45000|  2016-02-15|           NULL|
|  3|          102|    Bob Brown| 35|  Male| 55000|  2014-05-01|           NULL|
|  5|          103|    Jack Chan| 40|  Male| 60000|  2013-04-01|           NULL|
|  4|          102|    Alice Lee| 28|Female| 48000|  2017-09-30|           NULL|
|  6|          103|    Jill Wong| 32|Female| 52000|  2018-07-01|           NULL|
|  7|          101|James Johnson| 42|  Male| 70000|  2012-03-15|           NULL|
|  8|          102|     Kate Kim| 29|Female| 51000|  2019-10-01|           NULL|
| 10|          104|     Lisa Lee| 27|Female| 47000|  2018-08-01|           NULL|
|  9|          103|      Tom

25/05/29 23:52:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, department_id, , age, gender, salary, joining_date
 Schema: id, department_id, name, age, gender, salary, joining_date
Expected: name but found: 
CSV file: file:///home/prashant/pivot/personal/tech/pyspark_samples/employee_data.csv


In [6]:
''' use different name to mark bad_record '''
schema_str = "id int, department_id int, name string, age int, gender string, salary int, joining_date date, bad_record string"
emp_df = spark.read.format("csv").option("header", True).option("columnNameOfCorruptRecord", "bad_record").schema(schema_str).load("employee_data.csv")

In [12]:
emp_df.show()

+---+-------------+-------------+---+------+------+------------+
| id|department_id|         name|age|gender|salary|joining_date|
+---+-------------+-------------+---+------+------+------------+
|  1|          101|     John Doe| 30|  Male| 50000|  2015-01-01|
|  2|          101|   Jane Smith| 25|Female| 45000|  2016-02-15|
|  3|          102|    Bob Brown| 35|  Male| 55000|  2014-05-01|
|  5|          103|    Jack Chan| 40|  Male| 60000|  2013-04-01|
|  4|          102|    Alice Lee| 28|Female| 48000|  2017-09-30|
|  6|          103|    Jill Wong| 32|Female| 52000|  2018-07-01|
|  7|          101|James Johnson| 42|  Male| 70000|  2012-03-15|
|  8|          102|     Kate Kim| 29|Female| 51000|  2019-10-01|
| 10|          104|     Lisa Lee| 27|Female| 47000|  2018-08-01|
|  9|          103|      Tom Tan| 33|  Male| 58000|  2016-06-01|
| 11|          104|   David Park| 38|  Male| 65000|  2015-11-01|
| 12|          105|   Susan Chen| 31|Female| 54000|  2017-02-15|
| 13|          106|    Br

25/05/30 13:37:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, department_id, , age, gender, salary, joining_date
 Schema: id, department_id, name, age, gender, salary, joining_date
Expected: name but found: 
CSV file: file:///home/prashant/pivot/personal/tech/pyspark_samples/employee_data.csv


In [7]:
''' DROPMALFORMED  drop corrupt records '''

emp_df = spark.read.format("csv").option("header", True).option("mode", "DROPMALFORMED").schema(schema_str).load("employee_data.csv")

In [10]:
''' FAILFAST : fail data read if any corrupt record is found '''
schema_str = "id int, department_id int, name string, age int, gender string, salary int, joining_date date"
emp_df = spark.read.format("csv").option("header", True).option("mode", "FAILFAST").schema(schema_str).load("employee_data.csv")

In [11]:
emp_df.show()

25/05/30 06:07:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, department_id, , age, gender, salary, joining_date
 Schema: id, department_id, name, age, gender, salary, joining_date
Expected: name but found: 
CSV file: file:///home/prashant/pivot/personal/tech/pyspark_samples/employee_data.csv


+---+-------------+-------------+---+------+------+------------+
| id|department_id|         name|age|gender|salary|joining_date|
+---+-------------+-------------+---+------+------+------------+
|  1|          101|     John Doe| 30|  Male| 50000|  2015-01-01|
|  2|          101|   Jane Smith| 25|Female| 45000|  2016-02-15|
|  3|          102|    Bob Brown| 35|  Male| 55000|  2014-05-01|
|  5|          103|    Jack Chan| 40|  Male| 60000|  2013-04-01|
|  4|          102|    Alice Lee| 28|Female| 48000|  2017-09-30|
|  6|          103|    Jill Wong| 32|Female| 52000|  2018-07-01|
|  7|          101|James Johnson| 42|  Male| 70000|  2012-03-15|
|  8|          102|     Kate Kim| 29|Female| 51000|  2019-10-01|
| 10|          104|     Lisa Lee| 27|Female| 47000|  2018-08-01|
|  9|          103|      Tom Tan| 33|  Male| 58000|  2016-06-01|
| 11|          104|   David Park| 38|  Male| 65000|  2015-11-01|
| 12|          105|   Susan Chen| 31|Female| 54000|  2017-02-15|
| 13|          106|    Br

In [6]:
spark.sparkContext.defaultParallelism

8

In [5]:
emp_data_1 = [
    ["001","101","John Doe","30","Male","50000","2015-01-01"],
    ["002","101","Jane Smith","25","Female","45000","2016-02-15"],
    ["003","102","Bob Brown","35","Male","55000","2014-05-01"],
    ["004","102","Alice Lee","28","Female","48000","2017-09-30"],
    ["005","103","Jack Chan","40","Male","60000","2013-04-01"],
    ["006","103","Jill Wong","32","Female","52000","2018-07-01"],
    ["007","101","James Johnson","42","Male","70000","2012-03-15"],
    ["008","102","Kate Kim","29","Female","51000","2019-10-01"],
    ["009","103","Tom Tan","33","Male","58000","2016-06-01"],
    ["010","104","Lisa Lee","27","Female","47000","2018-08-01"]
]

emp_data_2 = [
    ["011","104","David Park","38","Male","65000","2015-11-01"],
    ["012","105","Susan Chen","31","Female","54000","2017-02-15"],
    ["013","106","Brian Kim","45","Male","75000","2011-07-01"],
    ["014","107","Emily Lee","26","Female","46000","2019-01-01"],
    ["015","106","Michael Lee","37","Male","63000","2014-09-30"],
    ["016","107","Kelly Zhang","30","Female","49000","2018-04-01"],
    ["017","105","George Wang","34","Male","57000","2016-03-15"],
    ["018","104","Nancy Liu","29","","50000","2017-06-01"],
    ["019","103","Steven Chen","36","Male","62000","2015-08-01"],
    ["020","102","Grace Kim","32","Female","53000","2018-11-01"]
]
emp_data_1.extend(emp_data_2)

In [6]:
schema_str = "id string, department_id string, name string, age string, gender string, salary string, joining_date string"
emp_df = spark.createDataFrame(data=emp_data_1, schema=schema_str)

In [13]:
emp_df.rdd.getNumPartitions()

8

In [14]:
'''
8 files got created in the emp_data.csv folder, as each partition wrote its file independently at destination location'''
emp_df.write.format("csv").save("emp_data.csv")

                                                                                

In [16]:
'''
8 cores holds 8 partitions ie. dataframe is divided in 8 parts and then groupby is applied 
each core applies groupby clause for its partitioned data and writes to folders partitioned by department_id

department_id=101
department_id=102
.
.
department_id=108   each folder may have multiple files 
if 1 core has multiple department data then that core would write its data to multiple folders
'''
emp_df.write.format("csv").option("header", True).partitionBy("department_id").save("test_partitioned_data.csv")

In [None]:
'''
write data has 4 modes
append : add files to location
overwrite : overwrite files even if data is present at same location
ignore : ignore if data is available at the same location
error : raise an error if data is present at the destination location
'''

In [None]:
'''
How Spark executes a job in cluster 
2 types of deployment :
1. client mode : standalone mode (spark cluster mode), yarn mode, mesos, kubernetes
2. cluster mode : where driver is part of cluster 


cluster = 1 driver (spark session) + 1 resource manager (cluster mannager) + 2 nodes

step 1 : driver will share the request to resource manager to create resources 4 executors (JVM), 2 cores each = 8 cores in total
step 2 : resource manager creates resources
step 3 : resource manager will share req info
step 4 : driver will copy python code to all the executors 
step 5 : driver will assign task to each core ie 8 task, 1/core
step 6 : once executors completes their task they share the result to driver
step 7 : driver will inform resource manager to terminate the resources
step 8 : resource manager will destroy the executors which it has created
'''


In [None]:
'''
UDF :

driver +  2 nodes (2 jvm each)

driver : copies python udf code to each node 

Node : each node creates separate python process which is outside of JVM
JVM does not have control over this python process
JVM does data serialization and submits data to python process
Python processes data row by row and submits result to JVM 
JVM does deserialization and shares result with driver (master)

Disadvantage of UDF : 
1. serialization - deserialization
2. python process 
3. data row by row

Solution : 
1. Use Higher Order Functions
2. Use Java or Scala UDF which can be run inside JVM and can be called from python itself
'''

In [17]:
def bonus(salary):
    return int(salary) * 0.1


In [19]:
from pyspark.sql.functions import udf

bonus_udf = udf(bonus)

In [16]:
emp_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- department_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- joining_date: string (nullable = true)



In [20]:
emp_df.withColumn("bonus", bonus_udf("salary")).show() #creates python process on worker node and process data row by row

+---+-------------+-------------+---+------+------+------------+------+
| id|department_id|         name|age|gender|salary|joining_date| bonus|
+---+-------------+-------------+---+------+------+------------+------+
|001|          101|     John Doe| 30|  Male| 50000|  2015-01-01|5000.0|
|002|          101|   Jane Smith| 25|Female| 45000|  2016-02-15|4500.0|
|003|          102|    Bob Brown| 35|  Male| 55000|  2014-05-01|5500.0|
|004|          102|    Alice Lee| 28|Female| 48000|  2017-09-30|4800.0|
|005|          103|    Jack Chan| 40|  Male| 60000|  2013-04-01|6000.0|
|006|          103|    Jill Wong| 32|Female| 52000|  2018-07-01|5200.0|
|007|          101|James Johnson| 42|  Male| 70000|  2012-03-15|7000.0|
|008|          102|     Kate Kim| 29|Female| 51000|  2019-10-01|5100.0|
|009|          103|      Tom Tan| 33|  Male| 58000|  2016-06-01|5800.0|
|010|          104|     Lisa Lee| 27|Female| 47000|  2018-08-01|4700.0|
|011|          104|   David Park| 38|  Male| 65000|  2015-11-01|

In [None]:
emp_df.withColumn("bonus", bonus_udf("salary")).show()

In [23]:
spark.udf.register("bonus_sql_udf", bonus, "double")

25/06/03 15:29:13 WARN SimpleFunctionRegistry: The function bonus_sql_udf replaced a previously registered function.


<function __main__.bonus(salary)>

In [25]:
from pyspark.sql.functions import expr
emp_df.withColumn("bonus", expr("bonus_sql_udf(salary)")).show() #spark sql udf

+---+-------------+-------------+---+------+------+------------+------+
| id|department_id|         name|age|gender|salary|joining_date| bonus|
+---+-------------+-------------+---+------+------+------------+------+
|001|          101|     John Doe| 30|  Male| 50000|  2015-01-01|5000.0|
|002|          101|   Jane Smith| 25|Female| 45000|  2016-02-15|4500.0|
|003|          102|    Bob Brown| 35|  Male| 55000|  2014-05-01|5500.0|
|004|          102|    Alice Lee| 28|Female| 48000|  2017-09-30|4800.0|
|005|          103|    Jack Chan| 40|  Male| 60000|  2013-04-01|6000.0|
|006|          103|    Jill Wong| 32|Female| 52000|  2018-07-01|5200.0|
|007|          101|James Johnson| 42|  Male| 70000|  2012-03-15|7000.0|
|008|          102|     Kate Kim| 29|Female| 51000|  2019-10-01|5100.0|
|009|          103|      Tom Tan| 33|  Male| 58000|  2016-06-01|5800.0|
|010|          104|     Lisa Lee| 27|Female| 47000|  2018-08-01|4700.0|
|011|          104|   David Park| 38|  Male| 65000|  2015-11-01|

In [26]:
emp_df.withColumn("bonus", expr("salary * 0.1")).show() #Soln1 : higher order function

+---+-------------+-------------+---+------+------+------------+------+
| id|department_id|         name|age|gender|salary|joining_date| bonus|
+---+-------------+-------------+---+------+------+------------+------+
|001|          101|     John Doe| 30|  Male| 50000|  2015-01-01|5000.0|
|002|          101|   Jane Smith| 25|Female| 45000|  2016-02-15|4500.0|
|003|          102|    Bob Brown| 35|  Male| 55000|  2014-05-01|5500.0|
|004|          102|    Alice Lee| 28|Female| 48000|  2017-09-30|4800.0|
|005|          103|    Jack Chan| 40|  Male| 60000|  2013-04-01|6000.0|
|006|          103|    Jill Wong| 32|Female| 52000|  2018-07-01|5200.0|
|007|          101|James Johnson| 42|  Male| 70000|  2012-03-15|7000.0|
|008|          102|     Kate Kim| 29|Female| 51000|  2019-10-01|5100.0|
|009|          103|      Tom Tan| 33|  Male| 58000|  2016-06-01|5800.0|
|010|          104|     Lisa Lee| 27|Female| 47000|  2018-08-01|4700.0|
|011|          104|   David Park| 38|  Male| 65000|  2015-11-01|

In [28]:
'''
Understand DAG, Explain Plans & Spark Shuffle with Tasks
'''

'\nUnderstand DAG, Explain Plans & Spark Shuffle with Tasks\n'

In [29]:
df_1 = spark.range(4, 200, 2)     #stage 1--> 8 
df_2 = spark.range(2, 200, 4)     #stage 1--> 8

In [31]:
df_1.rdd.getNumPartitions(), df_2.rdd.getNumPartitions()

(8, 8)

In [32]:
df_3 = df_1.repartition(5) #stage 1--> 5
df_4 = df_1.repartition(7) #stage 1--> 7

In [33]:
df_3.rdd.getNumPartitions(), df_4.rdd.getNumPartitions() 

(5, 7)

In [34]:
df_joined = df_3.join(df_4, on="id") # stage 1--> 200 

In [40]:
df_sum = df_joined.selectExpr("sum(id) as total_sum") #stage 1--> 1

In [41]:
df_sum.show() #total stages = 6, total_tasks = 8 + 8 + 5 + 7 + 200 + 1 = 229 tasks

+---------+
|total_sum|
+---------+
|     9898|
+---------+



In [42]:
df_sum.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum(id#271L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=568]
      +- HashAggregate(keys=[], functions=[partial_sum(id#271L)])
         +- Project [id#271L]
            +- BroadcastHashJoin [id#271L], [id#279L], Inner, BuildRight, false
               :- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=555]
               :  +- Range (4, 200, step=2, splits=8)
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=563]
                  +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=557]
                     +- Range (4, 200, step=2, splits=8)




In [None]:
'''
why shuffle writes data ?
because if it writes data to node then if any future task fails then previous stages need not to be executed again
it will read that data written (prev stage in which data was written to node) to node and resume the next stages 
'''

In [43]:
df_union = df_sum.union(df_4)

In [44]:
df_union.show()

+---------+
|total_sum|
+---------+
|     9898|
|       10|
|       12|
|       46|
|       34|
|       64|
|       90|
|       84|
|      120|
|      136|
|      164|
|      166|
|      182|
|      174|
|        6|
|       50|
|       36|
|       56|
|       98|
|       80|
+---------+
only showing top 20 rows



In [45]:
df_union.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- HashAggregate(keys=[], functions=[sum(id#271L)])
   :  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=892]
   :     +- HashAggregate(keys=[], functions=[partial_sum(id#271L)])
   :        +- Project [id#271L]
   :           +- BroadcastHashJoin [id#271L], [id#279L], Inner, BuildRight, false
   :              :- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=876]
   :              :  +- Range (4, 200, step=2, splits=8)
   :              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=887]
   :                 +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=878]
   :                    +- Range (4, 200, step=2, splits=8)
   +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=884]
      +- Range (4, 200, step=2, splits=8)




In [None]:
'''
Dataframes are abstractions of RDD (Resilient Distributed Data) sparkcore
1.SparkSQL and DataFrames
2.Pandas API on spark
3.Structured streaming
4.Machine Learning

RDD is recommended when you need to distribtue data physcially by code
'''

In [46]:
'''
Optimize Shuffle

Narrow transformations involves single partitions 
Wide transformations involves multiple partitions

Spark pipeline keep on adding multiple narrow partitions to single pipeline until it encounters wide transformation.
and creates again new pipeline with other narrow transformations.
When spark pipeline encounters any wide transformation it involves shuffle stage.
Hence shuffle stage breaks spark pipeline and creates separate stage.

First Pipeline
Spark pipeline includes all possible narrow tx and in the end writes shuffle files
Shuffles files - are serialized in tungsten binary format (unsafe row) can be read directly in memory heance improving read performance

Second Pipeline
will read shuffle files in first step and then will be processing remaining narrow transformations.

Shuffle files are written to disk and it involves io operation and are transferred to other executors which involves network operation.
Which inturn slow downs the process. Hence try to avoid shuffle operation wherever possible.




'''

In [50]:
#check default parallelism
spark.sparkContext.defaultParallelism

8

In [58]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Optimizing Shuffles")
    .master("spark://17e348267994:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

25/06/04 07:12:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [59]:
spark.sparkContext.defaultParallelism

8

In [52]:
# Disable AQE and broadcast join
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.adaptive.autoBroadCastJoinThreshold", -1)

In [53]:
# Read EMP CSV file with 10M records

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("employee_records.csv")


In [55]:
# Find out avg salary as per dept
from pyspark.sql.functions import avg

emp_avg = emp.groupBy("department_id").agg(avg("salary").alias("avg_sal"))

In [56]:
# Write data for performance Benchmarking

emp_avg.write.format("noop").mode("overwrite").save()


                                                                                

In [57]:
# Check Spark Shuffle Partition setting

spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 16)


In [None]:
from pyspark.sql.functions import spark_partition_id

emp.withColumn("partition_id", spark_partition_id()).where("partition_id = 0").show()

In [None]:
'''
Set shuffle partitions to appropriate value
which can avoid overkilling of cpu
lower shuffle partitons can cause memory error
higher shuffle partition can increase network time and overkill
'''

In [None]:
# Read the partitioned data

emp_part = spark.read.format("csv").schema(_schema).option("header", True).load("/data/input/emp_partitioned.csv/")


In [None]:
emp_avg = emp_part.groupBy("department_id").agg(avg("salary").alias("avg_sal"))


In [None]:
emp_avg.write.format("noop").mode("overwrite").save() #simulate write operation without actually writing data with noop


In [None]:
'''
Reading partitioned data improves shuffle operation

Good shuffle - avoid unnecessary shuffle
repartition data - reduces shuffle
filter data - before aggregation
'''

In [None]:
'''
Reading 752 mb of csv file and caching it 

cache in spark uses memory and disk
cache in rdd uses only memory
'''

In [9]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Dynamic Allocation")
    .master("spark://197e20b418a6:7077")
    .config("spark.executor.cores", 2)
    .config("spark.executor.memory", "512M")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.minExecutors", 0)
    .config("spark.dynamicAllocation.maxExecutors", 5)
    .config("spark.dynamicAllocation.initialExecutors", 1)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "60s")
    .getOrCreate()
)

spark

In [24]:
# Read Sales data

sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

df = spark.read.format("csv").schema(sales_schema).option("delimiter", ',').option("header", True).load("pyspark-zero-to-hero/datasets/new_sales-Copy1.csv")


In [25]:
df.show() #whenerver we do show on dataframe everytime it will hit the csv scan to get the data 

+--------------------+------+-----------+-----------+------+-------+
|       transacted_at|trx_id|retailer_id|description|amount|city_id|
+--------------------+------+-----------+-----------+------+-------+
|oid sha256:5ce2c1...|  NULL|       NULL|       NULL|  NULL|   NULL|
|      size 787780307|  NULL|       NULL|       NULL|  NULL|   NULL|
+--------------------+------+-----------+-----------+------+-------+



25/06/04 23:36:24 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 6
CSV file: file:///home/prashant/pivot/personal/tech/pyspark_samples/pyspark-zero-to-hero/datasets/new_sales-Copy1.csv


In [22]:
df.where("amount > 50.0").show() #everytime it will call csv scan

+-------------+------+-----------+-----------+------+-------+
|transacted_at|trx_id|retailer_id|description|amount|city_id|
+-------------+------+-----------+-----------+------+-------+
+-------------+------+-----------+-----------+------+-------+



In [12]:
df_cached = df.cache() #check storage on spark UI it will store data to MEMORY AND DISK as data size is greater than executor memory 

In [14]:
df_cached.count() #count and write operation will load entire dataframe in memory and not the partial records 

25/06/04 13:08:37 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 6
CSV file: file:///home/prashant/pivot/personal/tech/pyspark_samples/pyspark-zero-to-hero/datasets/new_sales.csv


+--------------------+------+-----------+-----------+------+-------+
|       transacted_at|trx_id|retailer_id|description|amount|city_id|
+--------------------+------+-----------+-----------+------+-------+
|oid sha256:5ce2c1...|  NULL|       NULL|       NULL|  NULL|   NULL|
|      size 787780307|  NULL|       NULL|       NULL|  NULL|   NULL|
+--------------------+------+-----------+-----------+------+-------+



In [None]:
df_cached.where("amount > 50.0") #it will not hit the csv scan, it will read data from memory
#spark maintains data lineage and identifies whether to hit the csv or not

In [None]:
df_cached.unpersist() #delete cached data
spark.catlog.clearCache() #remove all cached data

In [None]:
df_cached = df.where("amount > 100.0").cache() #partial cache

In [None]:
df_cached.where("amount > 50") #this will hit the csv scan as partial data is cached

In [26]:
'''
Different storage levels involved in caching
cahce: default is MEMORY AND DISK and data is deserialized

Persist:
MEMORY_ONLY : fits data in MEMORY and data is serialized
MEMORY_AND_DISK : fits data in MEMORY and disk and data is serialized
MEMORY_ONLY_SER : this option is for scala and java 
MEMORY_AND_DISK_SER : this option is for scala and java
DISK_ONLY : 
MEMORY_ONLY_2 : fits data in MEMORY and data is serialized and it is replicated twice in executor memory
MEMORY_AND_DISK_2 : 

'''
import pyspark
df.persist()


DataFrame[transacted_at: string, trx_id: string, retailer_id: string, description: string, amount: double, city_id: string]

In [27]:
'''
Distributed Shared variables
Broadcast variables
Accumulators

            Mutable        Readable in executors                Purpose
Broadcast	No	            Yes	                            Share large read-only data
Accumulator	Yes (add only)	No (write-only in tasks         Aggregate metrics from workers
                            and readable in driver)	


'''

# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Distributed Shared Variables")
    .master("spark://17e348267994:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark


25/06/05 07:26:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [29]:
# Read EMP CSV data

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("pyspark-zero-to-hero/datasets/employee_records.csv")

In [30]:
# Variable (Lookup)
dept_names = {1 : 'Department 1', 
              2 : 'Department 2', 
              3 : 'Department 3', 
              4 : 'Department 4',
              5 : 'Department 5', 
              6 : 'Department 6', 
              7 : 'Department 7', 
              8 : 'Department 8', 
              9 : 'Department 9', 
              10 : 'Department 10'}

In [31]:
# Broadcast the variable

broadcast_dept_names = spark.sparkContext.broadcast(dept_names) 
''' It will broadcast this variable to each of the executor and executor will cache this data 
Each executor will use this broadcast variable locally and no need to shuffle the data
'''

In [None]:
# Check the value of the variable
broadcast_dept_names.value

In [None]:
# Create UDF to return Department name

from pyspark.sql.functions import udf, col

@udf
def get_dept_names(dept_id):
    return broadcast_dept_names.value.get(dept_id)

In [None]:
emp_final = emp.withColumn("dept_name", get_dept_names(col("department_id")))


In [None]:
emp_final.show()


In [None]:
# Calculate total salary of Department 6

from pyspark.sql.functions import sum

emp.where("department_id = 6").groupBy("department_id").agg(sum("salary").cast("long")).show()

'''
this operation need to bring department 6 data to one executor which will then perform sum operation
which involves shuffle

To avoid this accumulators are used
accumulators will be processed row by row in each of the executors and once the each row of each executor is executed then will get 
the final value of the accumulator which provides sum

'''

In [None]:
# Accumulators

dept_sal = spark.sparkContext.accumulator(0)

In [None]:
# Use foreach

def calculate_salary(department_id, salary):
    if department_id == 6:
        dept_sal.add(salary)

emp.foreach(lambda row : calculate_salary(row.department_id, row.salary))


In [None]:
# View total value

dept_sal.value

In [None]:
# Stop Spark Session

spark.stop()

In [None]:
'''
Joins in Spark

1. Shuffle Hash Join - Big table and Small Table
2. Sort Merge Join - Big Table Big Table
3. Broadcast Hash Join - Small Table Small Table
'''oin

In [None]:
'''
Sales table : fact table,  City table : dimensions table join on city_id

Normal Join In Spark
1. Read Data in Partitions              2. Shuffle data         
exec  sales.city_id  city.city_id    
1      121                              111       1
2      334             12               334       34
3      15              34               5         5
4      2               5                22        2

3. Join
4. Write / Count


Shuffle Hash Join :
1. Shuffle data
2. Hash small table
3. Match hash table with big table
4. Join


Sort Merge Join :
1. Shuffle data
2. Sort based on joining key
3. Merge

Broadcast Hash Join
1. small dataset would be broadcasted to each executor
2. Joining based on hash key

small table default broadcast size : 10 MB can be increased to 8 gb

'''

In [1]:
'''
Bucketing Strategy before join

Use Murmur hash to create buckets of joining key for both the tables (sales and city table)


Points To Note : 
1.Bucketing can only work when we save data as table
2.Joining columns different than bucket column, same bucket size - shuffle on both table
3.Joining column same, one table in bucket, shuffle on non bucket table
4.Joining column same, different bucket size, shuffle on smaller bucket side
5.Joining column same, same bucket size, No shuffle (faster join)


1.So its very important to choose correct bucket column and bucket size
2.Decide effectively on number of buckets, as too any buckets with not enough data can lead to small file issue.
3.Datasets are small - you can prefer shuffle hash join
'''

'\nBucketing Strategy before join\n\nUse Murmur hash to create buckets of joining key for both the tables (sales and city table)\n\n\n\n\n'

In [2]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Optimizing Joins")
    .master("spark://17e348267994:7077")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark


25/06/07 10:31:55 WARN Utils: Your hostname, prashan resolves to a loopback address: 127.0.1.1; using 192.168.29.201 instead (on interface wlp0s20f3)
25/06/07 10:31:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/07 10:31:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/07 10:31:56 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 17e348267994:7077
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spar

Py4JJavaError: An error occurred while calling o31.conf.
: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:115)
	at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
	at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:185)
	at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:185)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)


<pyspark.sql.session.SparkSession at 0x714938445180>

In [None]:
# Read Sales CSV Data - 752MB Size ~ 7.2M Records

_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

df = spark.read.format("csv").schema(_schema).option("header", True).load("data/input/new_sales.csv")

In [None]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
# Read EMP CSV data

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/data/input/datasets/employee_records.csv")


In [None]:
# Read DEPT CSV data

_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("/data/input/datasets/department_data.csv")


In [None]:
# Join Datasets
from pyspark.sql.functions import broadcast

df_joined = emp.join(broadcast(dept), on=emp.department_id==dept.department_id, how="left_outer")
df_joined.write.format("noop").mode("overwrite").save()
df_joined.explain()

In [None]:
# Read Sales data

sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.format("csv").schema(sales_schema).option("header", True).load("/data/input/datasets/new_sales.csv")
# Read City data

city_schema = "city_id string, city string, state string, state_abv string, country string"

city = spark.read.format("csv").schema(city_schema).option("header", True).load("/data/input/datasets/cities.csv")
# Join Data

df_sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")
df_sales_joined.write.format("noop").mode("overwrite").save()


In [None]:
# Explain Plan
#Write Sales and City data in Buckets
# Write Sales data in Buckets

sales.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "/data/input/datasets/sales_bucket.csv").saveAsTable("sales_bucket")
# Write City data in Buckets

city.write.format("csv").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "/data/input/datasets/city_bucket.csv").saveAsTable("city_bucket")
# Check tables

spark.sql("show tables in default").show()

In [None]:
# Read Sales table

sales_bucket = spark.read.table("sales_bucket")
# Read City table

city_bucket = spark.read.table("city_bucket")
# Join datasets

df_joined_bucket = sales_bucket.join(city_bucket, on=sales_bucket.city_id==city_bucket.city_id, how="left_outer")
# Write dataset

df_joined_bucket.write.format("noop").mode("overwrite").save()
df_joined_bucket.explain()

In [None]:
'''
Static vs Dynamic Allocation in Spark

Static Allocation :  Single app is running on cluster

    Cluster - 2 worker nodes --> w1 and w2
    w1 = 8 core 8 gb
    w2 = 8 core 8 gb

    total = 16 core 16 gb
    
    user1 submits app1 on cluster
    app1 requirement - 6 core 6gb on each node
    total requirement - 12 core 12 gb

    In case of static allocation, app1 is running on cluster and eventhough some resources ( 6 cores and 6 gb) 
    are not in use or app is not using any resource it still holds those resources

Dynamic Allocation : multiple user / multiple apps running on same cluster
    Whenever any resource is free app running on cluster uses its resources by scaling up 
    and once execution is completed it scales down or releases resources


Dynamic vs Databricks Scale Up Scale Down

Dynamic Allocation : executor scale up scale down happens in fixed cluster
Databricks : worker node gets added to scale up and removed for scale down 
'''




In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Dynamic Allocation")
    .master("spark://197e20b418a6:7077")
    .config("spark.executor.cores", 2)
    .config("spark.executor.memory", "512M")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.minExecutors", 0)
    .config("spark.dynamicAllocation.maxExecutors", 5)
    .config("spark.dynamicAllocation.initialExecutors", 1)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "60s")
    .getOrCreate()
)

spark

# Read Sales data

sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.format("csv").schema(sales_schema).option("header", True).load("/data/input/new_sales.csv")
# Read City data

city_schema = "city_id string, city string, state string, state_abv string, country string"

city = spark.read.format("csv").schema(city_schema).option("header", True).load("/data/input/cities.csv")
# Join Data

df_sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")

df_sales_joined.write.format("noop").mode("overwrite").save()
# Difference between Scale UP in Databricks and Dynamic Allocation


In [None]:
'''
Data Skewness 

Data skewness causes spill memory and spill disk issue
spill memory --> deserialized data --> 128 mb
spill disk --> serialized data --> 70 mb

Use salting technique to handle data skewness before joining data

emp.dept_id                            dept.dept_id
1     0 (randomly assigned)                1     0
1     1                                    2     0 
1     0                                    3     0  
2     1                                    1     1   
3     0                                    2     1 
3     1                                    3     1 
3     0
3     1
3
3
3


'''


In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Optimizing Skewness and Spillage")
    .master("spark://197e20b418a6:7077")
    .config("spark.cores.max", 8)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [None]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Read Employee data
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/data/input/employee_records_skewed.csv")
# Read DEPT CSV data
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("/data/input/department_data.csv")
# Join Datasets

df_joined = emp.join(dept, on=emp.department_id==dept.department_id, how="left_outer")
df_joined.write.format("noop").mode("overwrite").save()
#Explain Plan

df_joined.explain()
# Check the partition details to understand distribution
from pyspark.sql.functions import spark_partition_id, count, lit

part_df = df_joined.withColumn("partition_num", spark_partition_id()).groupBy("partition_num").agg(count(lit(1)).alias("count"))

part_df.show()

In [None]:
# Verify Employee data based on department_id
from pyspark.sql.functions import count, lit, desc, col

emp.groupBy("department_id").agg(count(lit(1))).show()

In [None]:
# Set shuffle partitions to a lesser number - 16

spark.conf.set("spark.sql.shuffle.partitions", 32)
# Let prepare the salt
import random
from pyspark.sql.functions import udf

# UDF to return a random number every time and add to Employee as salt
@udf
def salt_udf():
    return random.randint(0, 32)

# Salt Data Frame to add to department
salt_df = spark.range(0, 32)
salt_df.show()

In [None]:
# Salted Employee
from pyspark.sql.functions import lit, concat

salted_emp = emp.withColumn("salted_dept_id", concat("department_id", lit("_"), salt_udf()))

salted_emp.show()    

In [None]:
# Salted Department

salted_dept = dept.join(salt_df, how="cross").withColumn("salted_dept_id", concat("department_id", lit("_"), "id"))

salted_dept.where("department_id = 9").show()

In [None]:
# Lets make the salted join now
salted_joined_df = salted_emp.join(salted_dept, on=salted_emp.salted_dept_id==salted_dept.salted_dept_id, how="left_outer")
 
salted_joined_df.write.format("noop").mode("overwrite").save()
# Check the partition details to understand distribution
from pyspark.sql.functions import spark_partition_id, count

part_df = salted_joined_df.withColumn("partition_num", spark_partition_id()).groupBy("partition_num").agg(count(lit(1)).alias("count"))

part_df.show()

In [None]:
'''
AQE 
'''

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("AQE in Spark")
    .master("spark://197e20b418a6:7077")
    .config("spark.cores.max", 8)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [None]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Read Employee data
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/data/input/employee_records_skewed.csv")
# Read DEPT CSV data
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("/data/input/department_data.csv")
# Join Datasets

df_joined = emp.join(dept, on=emp.department_id==dept.department_id, how="left_outer")
df_joined.write.format("noop").mode("overwrite").save()
#Explain Plan

df_joined.explain()

In [None]:
# Coalescing post-shuffle partitions - remove un-necessary shuffle partitions
# Skewed join optimization (balance partitions size) - join smaller partitions and split bigger partition

spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
# Fix partition sizes to avoid Skew

spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "8MB") #Default value: 64MB
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "10MB") #Default value: 256MB
# Converting sort-merge join to broadcast join

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
# Join Datasets - without specifying specific broadcast table

df_joined = emp.join(dept, on=emp.department_id==dept.department_id, how="left_outer")
df_joined.write.format("noop").mode("overwrite").save()

In [None]:
'''
Spark SQL
'''

In [None]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Spark SQL")
    .master("local[*]")
    .enableHiveSupport()     #to store metadata to metastore permanent storage 
    .config("spark.sql.warehouse.dir", "/data/output/spark-warehouse")
    .getOrCreate()
)

spark


In [None]:
# Read Employee data
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("/data/input/employee_records_skewed.csv")
# Read DEPT CSV data
_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("/data/input/department_data.csv")
# Spark Catalog (Metadata) - in-memory/hive

spark.conf.get("spark.sql.catalogImplementation")


In [None]:
# Show databases
db = spark.sql("show databases")
db.show()


spark.sql("show tables in default").show()

# Register dataframes are temp views

emp.createOrReplaceTempView("emp_view")

dept.createOrReplaceTempView("dept_view")
# Show tables/view in catalog
# View data from table

emp_filtered = spark.sql("""
    select * from emp_view
    where department_id = 1
""")
emp_filtered.show()

In [None]:
# Create a new column dob_year and register as temp view

emp_temp = spark.sql("""
    select e.*, date_format(dob, 'yyyy') as dob_year from emp_view e
""")
emp_temp.createOrReplaceTempView("emp_temp_view")
spark.sql("select * from emp_temp_view").show()

In [None]:
# Join emp and dept - HINTs

emp_final = spark.sql("""
    select /*+ BROADCAST(d) */
    e.* , d.department_name
    from emp_view e left outer join dept_view d
    on e.department_id = d.department_id
""")
# Show emp data

emp_final.show()

In [None]:
# Write the data as Table

emp_final.write.format("parquet").saveAsTable("emp_final")
# Read the data from Table

emp_new = spark.sql("select * from emp_final")
emp_new.show()

In [None]:
# Persist metadata
# Show details of metadata

spark.sql("describe extended emp_final").show()

In [None]:
'''
Read optimizations

1. partitioning : 
    Hive-style partitioning groups similar data in the same directory in storage. 
    Can be used to partition countries, department etc
    


2. Z ordering :
Z Ordering groups similar data in the same files without creating directories.
Z Ordering your data reorganizes the data in storage and allows certain queries to read less data, 
so they run faster. When your data is appropriately ordered, more files can be skipped.

Z Order is particularly important for the ordering of multiple columns. 
If you only need to order by a single column, then simple sorting suffices. 
If there are multiple columns, but we always/only query a common prefix of those columns, then hierarchical sorting suffices. 
Z Ordering is good when querying on one or multiple columns.

3. partitioning + z ordering : 
You can partition a Delta table in storage and Z Order the data within a given partition.

For example, you can partition by ingestion_date and Z Order by user_id. 
This design would be a great way to run queries on user activity over time.\


Delete Optimizations

Deletion vectors:
Deletion vectors are a storage optimization feature that can be enabled on Delta Lake tables. 
By default, when a single row in a data file is deleted, the entire Parquet file containing the record must be rewritten.
With deletion vectors enabled for the table, some Delta operations use deletion vectors to mark existing rows
as removed without rewriting the Parquet file. Subsequent reads on the table resolve current table state by applying
the deletions noted by deletion vectors to the most recent table version.

Deletion vectors indicate changes to rows as soft-deletes that logically modify existing Parquet data files in the Delta Lake tables.
These changes are applied physically when data files are rewritten, as triggered by one of the following events:

A DML command with deletion vectors disabled (by a command flag or a table property) is run on the table.
An OPTIMIZE command is run on the table.
REORG TABLE ... APPLY (PURGE) is run against the table.




Liquid clustering for Delta tables

Liquid clustering improves the existing partitioning and ZORDER techniques by simplifying data layout decisions in order to optimize
query performance. Liquid clustering provides flexibility to redefine clustering columns without rewriting existing data, 
allowing data layout to evolve alongside analytic needs over time.


What is liquid clustering used for?
The following are examples of scenarios that benefit from clustering:

Tables often filtered by high cardinality columns.
Tables with significant skew in data distribution.
Tables that grow quickly and require maintenance and tuning effort.
Tables with access patterns that change over time.
Tables where a typical partition column could leave the table with too many or too few partitions.


'''

In [None]:
'''
Spark Memory

1. Spark Memory Distribution
2. Storage and Execution Memory
3. Why spark runs into OOM errors, even if it can spill data ?
4. Understand different reasons for OOM errors.
5. See one practical example of OOM error.


Spark Memory Management :

Driver --> 2 executors --> 512 MB each --> 4 cores

2 worker node 
worker node 1 ---> jvm --> 1 executor --> 2 cores --> 512 mb
worker node 2 ---> jvm --> 1 executor --> 2 cores --> 512 mb

JVM Memory (JVM heap)

On-Heap (default)            Off-Heap(disabled)
512mb                           OS
managed by jvm

512 mb = 89% (available for use 455.69mb) + 11% (GC)

455.69 mb = 300 mb (reserved memory for spark internals) + 155.69 mb ( for use) 

Min Memory to set should be 
1.5 * 300 mb (reserved memory)
400 mb (min memory to set)

155.69 mb = 60% (unified spark memory)+ 40% (user memory)  (spark.memory.fraction = 0.6)
                93.4mb         +        62.28mb (udf)


93.4mb = storage (cache) + execution (hash, agg, shuffle)
                                priority

1. storage memory can borrow space from execution memory only if execution memory is free
2. Execution memory can borrow space from storage memory if its empty 
and has not reached its storage fraction limit (immune to eviction) spark.memory.storageFraction = 0.5
3. Execution memory is used by storage memory and execution needs more memory
it can forcefully evict the memory occupied by storage memory at execution side.
4. storage needs more memory, it cannot forcefully evict the excess block occupied by
Execution memory. It has to wait for execution.


OOM errors 
1. storage memory is full and execution mem is also full 
and in conf memory mode is MEMORY_ONLY not MEMORY_AND_DISK is set 
in this scenario we get OOM error

memory --> java objects (deserialized) disk --> byte stream (serialized) --> resource and time

2. execution memory full
assume 
Execution memory = 40 mb and 2 cores
20mb /core

first core has data of 25mb then it wont fit in execution memory spillage to disk
other core is having 5 mb record this is known as data skewness

lets say there are 3 records after deserialization size is 25 mb each
then one record itself wont fit in memory
as it cannot split a record

3. wide shuffle scenario 

4. broadcast variable

5. data explosion - cross join, explode, compressed data when u uncompress it

6. compress

7. GC is taking more time to free up in that case we get OOM

Off Heap Memory

need to enable to use but GC need to handle explicitly

'''         