In [1]:
# understand cluster

In [2]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Cluster Execution")
    .master("local[4]")
    .config("spark.executor.instances", 4)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/07 22:16:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Create a sample data frame

df = spark.range(10)

In [7]:
# Write the data of the data frame

df.write.format("csv").option("header", True).save("data/output/15/3/range.csv")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/Users/niyantmehta/Spiced/pyspark project venv/myvenv/data/output/15/3/range.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [8]:
# Stop Spark Settion

spark.stop()

In [9]:
# User_Defined_Functions

In [10]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("User Defined Functions")
    .master("local[*]")
    .config("spark.executor.cores", 2)
    .config("spark.cores.max", 6)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [11]:
# Read employee data

emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"

emp = spark.read.format("csv").option("header", True).schema(emp_schema).load("data/input/emp.csv")

emp.rdd.getNumPartitions()

1

In [12]:
# Create new column as bonus without UDF
from pyspark.sql.functions import expr

emp.withColumn("bonus", expr("salary * 0.1")).show()

+-----------+-------------+-------------+---+------+------+----------+------+
|employee_id|department_id|         name|age|gender|salary| hire_date| bonus|
+-----------+-------------+-------------+---+------+------+----------+------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|5000.0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|4500.0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|5500.0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|4800.0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|6000.0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|5200.0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|7000.0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|5100.0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|5800.0|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-

In [13]:
# Create a function to generate 10% of Salary as Bonus

def bonus(salary):
    return int(salary) * 0.1

In [14]:
# Register as UDF
from pyspark.sql.functions import udf

bonus_udf = udf(bonus)

spark.udf.register("bonus_sql_udf", bonus, "double")

<function __main__.bonus(salary)>

In [15]:
# Create new column as bonus using UDF
from pyspark.sql.functions import expr

emp.withColumn("bonus", expr("bonus_sql_udf(salary)")).show()

+-----------+-------------+-------------+---+------+------+----------+------+
|employee_id|department_id|         name|age|gender|salary| hire_date| bonus|
+-----------+-------------+-------------+---+------+------+----------+------+
|        001|          101|     John Doe| 30|  Male| 50000|2015-01-01|5000.0|
|        002|          101|   Jane Smith| 25|Female| 45000|2016-02-15|4500.0|
|        003|          102|    Bob Brown| 35|  Male| 55000|2014-05-01|5500.0|
|        004|          102|    Alice Lee| 28|Female| 48000|2017-09-30|4800.0|
|        005|          103|    Jack Chan| 40|  Male| 60000|2013-04-01|6000.0|
|        006|          103|    Jill Wong| 32|Female| 52000|2018-07-01|5200.0|
|        007|          101|James Johnson| 42|  Male| 70000|2012-03-15|7000.0|
|        008|          102|     Kate Kim| 29|Female| 51000|2019-10-01|5100.0|
|        009|          103|      Tom Tan| 33|  Male| 58000|2016-06-01|5800.0|
|        010|          104|     Lisa Lee| 27|Female| 47000|2018-

In [16]:
spark.stop()

In [17]:
# Understand dag plan

In [18]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Understand Plans and DAG")
    .master("local[*]")
    .getOrCreate()
)

In [19]:
spark

In [20]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)

In [21]:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)

In [22]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [23]:
# Check default Parallism

spark.sparkContext.defaultParallelism

8

In [24]:
# Create dataframes

df_new1 = spark.range(4, 200, 2)
df_new2 = spark.range(2, 200, 4)

In [25]:
df_new2.rdd.getNumPartitions()

8

In [26]:
# Re-partition data

df_new3 = df_new1.repartition(5)
df_new4 = df_new2.repartition(7)

In [27]:
df_new4.rdd.getNumPartitions()

7

In [28]:
# Join the dataframes

df_joined = df_new3.join(df_new4, on="id")

In [29]:
# Get the sum of ids

df_sum = df_joined.selectExpr("sum(id) as total_sum")

In [30]:
# View data
df_sum.show()

+---------+
|total_sum|
+---------+
|     4998|
+---------+



                                                                                

In [31]:
# Explain plan

df_sum.explain()

== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[sum(id#129L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=250]
   +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#129L)])
      +- *(5) Project [id#129L]
         +- *(5) SortMergeJoin [id#129L], [id#131L], Inner
            :- *(2) Sort [id#129L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#129L, 200), ENSURE_REQUIREMENTS, [plan_id=234]
            :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=233]
            :        +- *(1) Range (4, 200, step=2, splits=8)
            +- *(4) Sort [id#131L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#131L, 200), ENSURE_REQUIREMENTS, [plan_id=241]
                  +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=240]
                     +- *(3) Range (2, 200, step=4, splits=8)




In [32]:
# Union the data again to see the skipped stages

df_union = df_sum.union(df_new4)

In [33]:
df_union.show()

+---------+
|total_sum|
+---------+
|     4998|
|        2|
|       46|
|       58|
|       90|
|      102|
|      138|
|      158|
|      194|
|       18|
|       38|
|       74|
|      166|
|      174|
|       30|
|       66|
|       98|
|      122|
|      146|
|      162|
+---------+
only showing top 20 rows



In [34]:
# Explain plan

df_union.explain()

== Physical Plan ==
Union
:- *(6) HashAggregate(keys=[], functions=[sum(id#129L)])
:  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=488]
:     +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#129L)])
:        +- *(5) Project [id#129L]
:           +- *(5) SortMergeJoin [id#129L], [id#131L], Inner
:              :- *(2) Sort [id#129L ASC NULLS FIRST], false, 0
:              :  +- Exchange hashpartitioning(id#129L, 200), ENSURE_REQUIREMENTS, [plan_id=472]
:              :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=471]
:              :        +- *(1) Range (4, 200, step=2, splits=8)
:              +- *(4) Sort [id#131L ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(id#131L, 200), ENSURE_REQUIREMENTS, [plan_id=479]
:                    +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=478]
:                       +- *(3) Range (2, 200, step=4, splits=8)
+- ReusedExchange [id#150L], Exchange Roun

In [35]:
# DataFrame to RDD

df_new1.rdd

MapPartitionsRDD[70] at javaToPython at NativeMethodAccessorImpl.java:0

In [36]:
spark.stop()

In [37]:
# optimizing_shuffles

In [38]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Optimizing Shuffles")
    .master("local[*]")
    .config("spark.cores.max", 16)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [39]:
# Check Spark defaultParallelism

spark.sparkContext.defaultParallelism

8

In [40]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [41]:
# Read EMP CSV file with 10M records

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

# Read the EMP CSV file (10M records)
emp = (
    spark.read.format("csv")
    .schema(_schema)
    .option("header", True)
    .load("data/input/employee_records.csv")
)


In [42]:
# cd path_to_venv folder
# ls -ld data/output/ --->  drwxr-xr-x
# chmod -R 777 data/output/
# ls -ld data/output/ ---> drwxrwxrwx
# Partition the data by department_id and save it
emp.write.partitionBy("department_id").format("csv").option("header", True).save("data/output/emp_partitioned.csv")


                                                                                

In [43]:
# Find out avg salary as per dept
from pyspark.sql.functions import avg

emp_avg = emp.groupBy("department_id").agg(avg("salary").alias("avg_sal"))

In [44]:
# Write data for performance Benchmarking
# noop = "no operation"
# "noop" is used in performance testing to measure query execution time without actually writing to disk.
# Spark executes the query but skips the actual data write operation.
emp_avg.write.format("noop").mode("overwrite").save()

In [45]:
# Check Spark Shuffle Partition setting

spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [46]:
spark.conf.set("spark.sql.shuffle.partitions", 200)

In [47]:
from pyspark.sql.functions import spark_partition_id

emp.withColumn("partition_id", spark_partition_id()).where("partition_id = 0").show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|partition_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|           0|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|           0|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|           0|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|           0|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffany

In [48]:
# Read the partitioned data
emp_part = (
    spark.read.format("csv")
    .schema(_schema)
    .option("header", True)
    .load("data/output/emp_partitioned.csv")
)

In [49]:
emp_avg = emp_part.groupBy("department_id").agg(avg("salary").alias("avg_sal"))

In [50]:
emp_avg.write.format("noop").mode("overwrite").save()

In [51]:
spark.stop()

In [52]:
# spark_caching_techiniques

In [53]:
# Spark Session
# Mac : sysctl -n hw.ncpu : Check Available CPU Cores - System's Available Resources (CPU and Memory)
# .master("local[*]")  # Use all available cores / .master("local[8]")  # Use all 8 cores

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Understand Caching")
    .master("local[4]") # Use only 2 cores, or adjust based on your available resources
    .config("spark.executor.memory", "512M")
    .config("spark.ui.enabled", "true") 
    .config("spark.ui.port", "4041") 
    .getOrCreate()
)

In [54]:
spark

In [55]:
# Read Sales CSV Data - 752MB Size ~ 7.2M Records

_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

df = spark.read.format("csv").schema(_schema).option("header", True).load("data/input/new_sales.csv")

In [56]:
df.where("amount > 300").show()

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947146|  511877722|unkn     ccd id: ...|1915.35|1698762556|
|2017-11-24T19:00:...|2076947113| 1996661856|AutoZone  arc id:...| 1523.6|1759612211|
|2017-11-24T19:00:...|2076946994| 1898522855|Target    ppd id:...|2589.93|2074005445|
|2017-11-24T19:00:...|2076946121|  562903918|unkn    ccd id: 5...| 315.86|1773943669|
|2017-11-24T19:00:...|2076946063| 1070485878|Amazon.co

In [57]:
# Cache DataFrame (cache or persist)

df_cache = df.where("amount > 100").cache()

In [58]:
df_cache.count()

                                                                                

2549058

In [59]:
df.where("amount > 50").show()

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1995601912| 2077350195|Walgreen       11-25| 197.23| 216510442|
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117089| 1898522855| Target        11-25|  66.33|1855530529|
|2017-11-24T19:00:...|1734117117|  997626433|Sears  ppd id: 85...| 298.87| 957346984|
|2017-11-24T19:00:...|1734117153|  847200066|unkn        Kings...|2907.57|1483931123|
|2017-11-24T19:00:...|1734117212| 1996661856|unkn    ppd id: 4...| 140.38| 336763936|
|2017-11-24T19:00:...|1734117241|  486576507|              iTunes|2912.67|1663872965|
|2017-11-24T19:00:...|2076947148|  847200066|Wal-Mart 

In [60]:
# MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK_2
import pyspark

In [61]:
df_persist = df.persist(pyspark.StorageLevel.MEMORY_ONLY_2)

In [62]:
df_persist.show()

25/04/07 22:17:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/04/07 22:17:51 WARN BlockManager: Block rdd_24_1 replicated to only 0 peer(s) instead of 1 peers
25/04/07 22:17:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/04/07 22:17:51 WARN BlockManager: Block rdd_24_2 replicated to only 0 peer(s) instead of 1 peers
25/04/07 22:17:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/04/07 22:17:51 WARN BlockManager: Block rdd_24_0 replicated to only 0 peer(s) instead of 1 peers
25/04/07 22:17:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/04/07 22:17:51 WARN BlockManager: Block rdd_24_3 replicated to only 0 peer(s) instead of 1 peers
25/04/07 22:17:54 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/04/07 22:17:54 WARN BlockManager: Block rdd_24_5 replicated to only 0 peer(s) instead of 1 peers

+--------------------+----------+-----------+--------------------+-------+----------+
|       transacted_at|    trx_id|retailer_id|         description| amount|   city_id|
+--------------------+----------+-----------+--------------------+-------+----------+
|2017-11-24T19:00:...|1995601912| 2077350195|Walgreen       11-25| 197.23| 216510442|
|2017-11-24T19:00:...|1734117021|  644879053|unkn    ppd id: 7...|   8.58| 930259917|
|2017-11-24T19:00:...|1734117022|  847200066|Wal-Mart  ppd id:...|1737.26|1646415505|
|2017-11-24T19:00:...|1734117030| 1953761884|Home Depot     pp...|  384.5| 287177635|
|2017-11-24T19:00:...|1734117089| 1898522855| Target        11-25|  66.33|1855530529|
|2017-11-24T19:00:...|1734117117|  997626433|Sears  ppd id: 85...| 298.87| 957346984|
|2017-11-24T19:00:...|1734117123| 1953761884|unkn   ppd id: 15...|  19.55|  45522086|
|2017-11-24T19:00:...|1734117152| 1429095612|Ikea     arc id: ...|   9.39|1268541279|
|2017-11-24T19:00:...|1734117153|  847200066|unkn     

25/04/07 22:17:55 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/04/07 22:17:55 WARN BlockManager: Block rdd_24_4 replicated to only 0 peer(s) instead of 1 peers
                                                                                

In [63]:
df_persist.write.format("noop").mode("overwrite").save()

                                                                                

In [64]:
# Remove Cache

spark.catalog.clearCache()

In [65]:
spark.stop()

In [66]:
# distributed_shared_variables

In [67]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Advanced Spark")
    .master("local[*]")
    .config("spark.cores.max", 16)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [68]:
# Read EMP CSV data

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/input/employee_records.csv")


In [69]:
# Variable (Lookup)
dept_names = {1 : 'Department 1', 
              2 : 'Department 2', 
              3 : 'Department 3', 
              4 : 'Department 4',
              5 : 'Department 5', 
              6 : 'Department 6', 
              7 : 'Department 7', 
              8 : 'Department 8', 
              9 : 'Department 9', 
              10 : 'Department 10'}

In [70]:
# Broadcast the variable

broadcast_dept_names = spark.sparkContext.broadcast(dept_names)

In [71]:
# Check the value of the variable
broadcast_dept_names.value

{1: 'Department 1',
 2: 'Department 2',
 3: 'Department 3',
 4: 'Department 4',
 5: 'Department 5',
 6: 'Department 6',
 7: 'Department 7',
 8: 'Department 8',
 9: 'Department 9',
 10: 'Department 10'}

In [72]:
# Create UDF to return Department name

from pyspark.sql.functions import udf, col

@udf
def get_dept_names(dept_id):
    return broadcast_dept_names.value.get(dept_id)

In [73]:
emp_final = emp.withColumn("dept_name", get_dept_names(col("department_id")))

In [74]:
emp_final.show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|    dept_name|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+-------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8| Department 8|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7| Department 7|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|Department 10|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1| Department 1|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|

In [75]:
# Calculate total salary of Department 6

from pyspark.sql.functions import sum

emp.where("department_id = 6").groupBy("department_id").agg(sum("salary").cast("long")).show()

+-------------+---------------------------+
|department_id|CAST(sum(salary) AS BIGINT)|
+-------------+---------------------------+
|            6|                50294510721|
+-------------+---------------------------+



                                                                                

In [None]:
# Accumulators

dept_sal = spark.sparkContext.accumulator(0)

In [None]:
# Use foreach

def calculate_salary(department_id, salary):
    if department_id == 6:
        dept_sal.add(salary)

emp.foreach(lambda row : calculate_salary(row.department_id, row.salary))

In [None]:
# View total value

dept_sal.value

In [None]:
spark.stop()

In [None]:
# optimizing_joins

In [139]:
# Run Spark with Hive Support
# If using saveAsTable(), ensure your SparkSession is configured to use Hive:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Sales Bucketing") \
    .master("local[*]") \
    .config("spark.cores.max", 16) \
    .config("spark.executor.cores", 4) \
    .config("spark.executor.memory", "512M") \
    .getOrCreate()

In [140]:
spark

In [141]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [142]:
## Join Big and Small table - SortMerge vs BroadCast Join

In [143]:
# Read EMP CSV data

_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/input/employee_records.csv")
emp.show()

+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|first_name| last_name|           job_title|       dob|               email|               phone|  salary|department_id|
+----------+----------+--------------------+----------+--------------------+--------------------+--------+-------------+
|   Richard|  Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|       (699)525-4827|512653.0|            8|
|     Bobby|  Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|  (750)846-1602x7458|999836.0|            7|
|    Dennis|    Norman|Land/geomatics su...|1990-06-24| jturner@example.net|    873.820.0518x825|131900.0|           10|
|      John|    Monroe|        Retail buyer|1968-06-16|  erik33@example.net|    820-813-0557x624|485506.0|            1|
|  Michelle|   Elliott|      Air cabin crew|1975-03-31|tiffanyjohnston@e...|       (705)900-5337|604738.0|            8|
|    Ashley|   Montoya|        C

In [144]:
# Read DEPT CSV data

_dept_schema = "department_id int, department_name string, description string, city string, state string, country string"

dept = spark.read.format("csv").schema(_dept_schema).option("header", True).load("data/input/department_data.csv")
dept.show()

+-------------+--------------------+--------------------+--------------------+-----+-------------------+
|department_id|     department_name|         description|                city|state|            country|
+-------------+--------------------+--------------------+--------------------+-----+-------------------+
|            1|         Bryan-James|Optimized disinte...|        Melissaburgh|   FM|Trinidad and Tobago|
|            2|Smith, Craig and ...|Digitized empower...|          Morrisside|   DE|          Sri Lanka|
|            3|Pittman, Hess and...|Multi-channeled c...|         North David|   SC|       Turkmenistan|
|            4|Smith, Snyder and...|Reactive neutral ...|       Lake Jennifer|   TX|         Madagascar|
|            5|          Hardin Inc|Re-contextualized...|           Hayestown|   WA|               Fiji|
|            6|         Sanders LLC|Innovative multim...|         Phamchester|   TN|         Micronesia|
|            7|         Ward-Gordon|Progressive logis..

In [145]:
# Join Datasets
from pyspark.sql.functions import broadcast

df_joined = emp.join(broadcast(dept), on=emp.department_id==dept.department_id, how="left_outer")

In [146]:
df_joined.write.format("noop").mode("overwrite").save()

In [147]:
df_joined.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [department_id#1875], [department_id#1926], LeftOuter, BuildRight, false
:- FileScan csv [first_name#1868,last_name#1869,job_title#1870,dob#1871,email#1872,phone#1873,salary#1874,department_id#1875] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/niyantmehta/Spiced/pyspark project venv/myvenv/data/input/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,job_title:string,dob:string,email:string,phone:string,s...
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=1666]
   +- *(1) Filter isnotnull(department_id#1926)
      +- FileScan csv [department_id#1926,department_name#1927,description#1928,city#1929,state#1930,country#1931] Batched: false, DataFilters: [isnotnull(department_id#1926)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/niyantmehta/Spiced/pyspark project venv/myve

In [148]:
df_joined.show(5)

+----------+---------+--------------------+----------+--------------------+------------------+--------+-------------+-------------+---------------+--------------------+--------------------+-----+-------------------+
|first_name|last_name|           job_title|       dob|               email|             phone|  salary|department_id|department_id|department_name|         description|                city|state|            country|
+----------+---------+--------------------+----------+--------------------+------------------+--------+-------------+-------------+---------------+--------------------+--------------------+-----+-------------------+
|   Richard| Morrison|Public relations ...|1973-05-05|melissagarcia@exa...|     (699)525-4827|512653.0|            8|            8|     Parker PLC|Assimilated multi...|         Barnettside|   AL|   Marshall Islands|
|     Bobby| Mccarthy|   Barrister's clerk|1974-04-25|   llara@example.net|(750)846-1602x7458|999836.0|            7|            7|    W

In [149]:
## Join Big and Big table - SortMerge without Buckets

In [150]:
# Read Sales data

sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.format("csv").schema(sales_schema).option("header", True).load("data/input/new_sales.csv")

In [151]:
# Read City data

city_schema = "city_id string, city string, state string, state_abv string, country string"

city = spark.read.format("csv").schema(city_schema).option("header", True).load("data/input/cities.csv")

In [152]:
# Join Data

df_sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")

In [153]:
df_sales_joined.write.format("noop").mode("overwrite").save()

                                                                                

In [154]:
# Explain Plan


In [155]:
## Write Sales and City data in Buckets

In [158]:
# Write Sales data in Buckets

sales.write.format("parquet").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "data/output/sales_bucket.parquet").saveAsTable("sales_bucket")


                                                                                

In [159]:
# Write City data in Buckets

city.write.format("parquet").mode("overwrite").bucketBy(4, "city_id").option("header", True).option("path", "./data/output/city_bucket.parquet").saveAsTable("city_bucket")

                                                                                

In [160]:
# Check tables

spark.sql("show tables in default").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  default| city_bucket|      false|
|  default|sales_bucket|      false|
+---------+------------+-----------+



In [161]:
## Join Sales and City data - SortMerge with Bucket

In [162]:
# Read Sales table

sales_bucket = spark.read.table("sales_bucket")

In [163]:
# Read City table

city_bucket = spark.read.table("city_bucket")

In [164]:
    # Join datasets

df_joined_bucket = sales_bucket.join(city_bucket, on=sales_bucket.city_id==city_bucket.city_id, how="left_outer")

In [165]:
# Write dataset

df_joined_bucket.write.format("noop").mode("overwrite").save()

                                                                                

In [166]:
df_joined_bucket.explain()

== Physical Plan ==
*(3) SortMergeJoin [city_id#2281], [city_id#2288], LeftOuter
:- *(1) Sort [city_id#2281 ASC NULLS FIRST], false, 0
:  +- *(1) ColumnarToRow
:     +- FileScan parquet spark_catalog.default.sales_bucket[transacted_at#2276,trx_id#2277,retailer_id#2278,description#2279,amount#2280,city_id#2281] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/niyantmehta/Spiced/pyspark%20project%20venv/myvenv/spark-w..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transacted_at:string,trx_id:string,retailer_id:string,description:string,amount:double,cit..., SelectedBucketsCount: 4 out of 4
+- *(2) Sort [city_id#2288 ASC NULLS FIRST], false, 0
   +- *(2) Filter isnotnull(city_id#2288)
      +- *(2) ColumnarToRow
         +- FileScan parquet spark_catalog.default.city_bucket[city_id#2288,city#2289,state#2290,state_abv#2291,country#2292] Batched: true, Bucketed: true, DataFilters: [isnotnull(city_id#2288)], Fo

In [167]:
## View how tasks are reading Bucket data

In [168]:
# dynamic_allocation

In [169]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Dynamic Allocation")
    .master("local[*]")
    .config("spark.executor.cores", 2)
    .config("spark.executor.memory", "512M")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.minExecutors", 0)
    .config("spark.dynamicAllocation.maxExecutors", 5)
    .config("spark.dynamicAllocation.initialExecutors", 1)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "60s")
    .getOrCreate()
)

spark

25/04/07 22:52:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [171]:
# Read Sales data

sales_schema = "transacted_at string, trx_id string, retailer_id string, description string, amount double, city_id string"

sales = spark.read.format("csv").schema(sales_schema).option("header", True).load("data/input/new_sales.csv")

In [173]:
# Read City data

city_schema = "city_id string, city string, state string, state_abv string, country string"

city = spark.read.format("csv").schema(city_schema).option("header", True).load("data/input/cities.csv")


In [174]:
# Join Data

df_sales_joined = sales.join(city, on=sales.city_id==city.city_id, how="left_outer")

In [175]:
df_sales_joined.write.format("noop").mode("overwrite").save()

                                                                                

In [176]:
## # Difference between Scale UP in Databricks and Dynamic Allocation

In [177]:
# 20. pending- skewness_and_spillage

In [178]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Optimizing Skewness and Spillage")
    .master("spark://197e20b418a6:7077")
    .config("spark.cores.max", 8)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

25/04/07 22:54:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [179]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [180]:
# Read Employee data
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/input/employee_records_skewed.csv")


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/niyantmehta/Spiced/pyspark project venv/myvenv/data/input/employee_records_skewed.csv.

In [181]:
spark.stop()

In [182]:
# 21. pending - AQE in Spark

In [183]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("AQE in Spark")
    .master("local[*]")
    .config("spark.cores.max", 8)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

spark

In [184]:
# Disable AQE and Broadcast join

spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [185]:
# Read Employee data
_schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"

emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/input/employee_records_skewed.csv")

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/data/input/employee_records_skewed.csv.

In [None]:
# 22. pending - spark_sql

In [191]:
# Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Spark SQL")
    .master("local[*]")
    .enableHiveSupport()
    .config("spark.sql.warehouse.dir", "/data/output/spark-warehouse")
    .getOrCreate()
)

spark

In [189]:
# delta lake

In [192]:
# Spark Session
spark

In [193]:
# Default Catalog for Databricks
spark.conf.get("spark.sql.catalogImplementation")

'hive'

In [195]:
%sql
show databases

SyntaxError: invalid syntax (1399782929.py, line 2)

In [196]:
# Read Sales parquet data
df_sales = spark.read.parquet("data/input/sales_data.parquet")

In [197]:

# View data
#display(df_sales)
display(df_sales.limit(5))

DataFrame[transacted_at: timestamp, trx_id: int, retailer_id: int, description: string, amount: double, city_id: int]

In [199]:
# Read a particular version - pyspark api
df_sales_delta = spark.read.table("sales_delta@v1")



ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near '@'.(line 1, pos 11)

== SQL ==
sales_delta@v1
-----------^^^


25/04/08 00:14:59 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1009700 ms exceeds timeout 120000 ms
25/04/08 00:14:59 WARN SparkContext: Killing executors is not supported by current scheduler.
25/04/08 00:15:01 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$