In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\Installation\\spark-3.3.2-bin-hadoop3'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (
            SparkSession
                .builder
                .appName("SparkJoinsApp")
                .master("local[4]")
    
                .config("spark.dynamicAllocation.enabled", "false")
                .config("spark.sql.adaptive.enabled", "false")
    
                .enableHiveSupport()
                
                .getOrCreate()
        )

sc = spark.sparkContext

spark

In [3]:
from IPython.display import *
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Change Auto Broadcast setting

In real-life scenario, only disable if there is a specific use case, otherwise keep it enabled

In [4]:
# Check default value

spark.conf.get( "spark.sql.autoBroadcastJoinThreshold" )

'10485760b'

In [5]:
# Disable auto broadcast by setting value to -1

spark.conf.set( "spark.sql.autoBroadcastJoinThreshold", "-1" )

### Read files

In [6]:
# Create schema for Yellow Taxi Data

yellowTaxiSchema = (
                        StructType
                        ([ 
                            StructField("VendorId"               , IntegerType()   , True),
                            StructField("PickupTime"             , TimestampType() , True),
                            StructField("DropTime"               , TimestampType() , True),                            
                            StructField("PassengerCount"         , DoubleType()    , True),
                            StructField("TripDistance"           , DoubleType()    , True),
                            StructField("RateCodeId"             , DoubleType()    , True),                            
                            StructField("StoreAndFwdFlag"        , StringType()    , True),
                            StructField("PickupLocationId"       , IntegerType()   , True),
                            StructField("DropLocationId"         , IntegerType()   , True),                            
                            StructField("PaymentType"            , IntegerType()   , True),                            
                            StructField("FareAmount"             , DoubleType()    , True),
                            StructField("Extra"                  , DoubleType()    , True),
                            StructField("MtaTax"                 , DoubleType()    , True),
                            StructField("TipAmount"              , DoubleType()    , True),
                            StructField("TollsAmount"            , DoubleType()    , True),
                            StructField("ImprovementSurcharge"   , DoubleType()    , True),
                            StructField("TotalAmount"            , DoubleType()    , True),
                            StructField("CongestionSurcharge"    , DoubleType()    , True),
                            StructField("AirportFee"             , DoubleType()    , True)
                        ])
                   )

yellowTaxiDF = (
                  spark
                    .read
                    .option("header", "true")    
                    .schema(yellowTaxiSchema)    
                    .csv("C:\SparkCourse\DataFiles\Raw\YellowTaxis_202210.csv")
               )

yellowTaxiDF.count()

3675412

In [7]:
taxiZonesSchema = "PickupLocationId INT, Borough STRING, Zone STRING, ServiceZone STRING"

taxiZonesDF = (
                  spark
                    .read                    
                    .schema(taxiZonesSchema)
                    .csv("C:\SparkCourse\DataFiles\Raw\TaxiZones.csv")
              )

taxiZonesDF.count()

265

### Join large and small datasets

Auto-broadcast is already disabled

In [8]:
joinedDF = (
                yellowTaxiDF
                    .join
                    (
                        taxiZonesDF,
                        
                        yellowTaxiDF.PickupLocationId == taxiZonesDF.PickupLocationId,
                        
                        "inner"
                    )
            )

joinedDF.show()

+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+----------------+---------+---------------+-----------+
|VendorId|         PickupTime|           DropTime|PassengerCount|TripDistance|RateCodeId|StoreAndFwdFlag|PickupLocationId|DropLocationId|PaymentType|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|TotalAmount|CongestionSurcharge|AirportFee|PickupLocationId|  Borough|           Zone|ServiceZone|
+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+----------------+---------+---------------+-----------+
|       1|2022-10-01 06:09:18|2022-10-01 06:23:51|     

### Option 1 - Broadcast Hash Join: Set join threshold

In [None]:
# Will not run. Will manually perform broadcast

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760b")


### Option 2 - Broadcast Hash Join: Manually broadcast small dataset

In case below, Taxi Zones is smaller dataset, so manually broadcast it. <br/>

Perform broadcast manually if "smaller" dataset size is bigger than auto threshold size.

In [9]:
# Manually broadcast Taxi Zones

joinedDF = (
                yellowTaxiDF
                    .join
                    (
                        broadcast(taxiZonesDF),
                        
                        yellowTaxiDF.PickupLocationId == taxiZonesDF.PickupLocationId,
                        "inner"
                    )
            )

joinedDF.show()

+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+----------------+---------+--------------------+-----------+
|VendorId|         PickupTime|           DropTime|PassengerCount|TripDistance|RateCodeId|StoreAndFwdFlag|PickupLocationId|DropLocationId|PaymentType|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|TotalAmount|CongestionSurcharge|AirportFee|PickupLocationId|  Borough|                Zone|ServiceZone|
+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+----------------+---------+--------------------+-----------+
|       1|2022-10-01 05:33:41|2022-10-01

### Optimize shuffle sort merge join performance with Bucketing

### Create two views on a large DataFrame

Note using same DataFrame for joining is for demo purpose only. <br/>
In real-life scenario, you would be (likely) joining two different datasets.

In [10]:

yellowTaxiDF.createOrReplaceTempView("YellowTaxis1_Unbucketed")

yellowTaxiDF.createOrReplaceTempView("YellowTaxis2_Unbucketed")

### Join two unbucketed datasets

This should perform Shuffle Sort Merge Join

In [11]:
spark.sql("""

SELECT *

FROM YellowTaxis1_Unbucketed b1

    JOIN YellowTaxis2_Unbucketed b2 ON b1.PickupLocationId = b2.DropLocationId

""").show()

+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+
|VendorId|         PickupTime|           DropTime|PassengerCount|TripDistance|RateCodeId|StoreAndFwdFlag|PickupLocationId|DropLocationId|PaymentType|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|TotalAmount|CongestionSurcharge|AirportFee|VendorId|         PickupTime|           DropTime|PassengerCount|TripDistance|RateCodeId|StoreAndFwdFlag|PickupLocationId|DropLocationId|PaymentType|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|TotalAmo

### Write datasets as tables with bucketing

1. Bucketing should be on columns that would be used in joins <br/>
2. Both datasets should be stored with same number of buckets

In [15]:
(
    yellowTaxiDF
            .write
    
            .bucketBy(4, "PickupLocationId")
    
            #.sortBy("PickupLocationId")      Optionally sort the data
            
            .option("header", "true")
            .option("dateFormat", "yyyy-MM-dd HH:mm:ss.S")    
            .mode("overwrite")    
            .format("csv")
    
            .saveAsTable("YellowTaxis_PickupBucket")
)

(
    yellowTaxiDF
            .write
    
            .bucketBy(4, "DropLocationId")
    
            #.sortBy("DropLocationId")       Optionally sort the data
    
            .option("header", "true")
            .option("dateFormat", "yyyy-MM-dd HH:mm:ss.S")    
            .mode("overwrite")    
            .format("csv")
    
            .saveAsTable("YellowTaxis_DropBucket")
)

### Join two bucketed tables

1. Bucketing is on columns being used in joins <br/>
2. Both tables have same number of buckets

In [16]:
spark.sql("""

SELECT *

FROM YellowTaxis_PickupBucket b1

    JOIN YellowTaxis_DropBucket b2 ON b1.PickupLocationId = b2.DropLocationId

""").show()

+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+--------+-------------------+-------------------+--------------+------------+----------+---------------+----------------+--------------+-----------+----------+-----+------+---------+-----------+--------------------+-----------+-------------------+----------+
|VendorId|         PickupTime|           DropTime|PassengerCount|TripDistance|RateCodeId|StoreAndFwdFlag|PickupLocationId|DropLocationId|PaymentType|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|TotalAmount|CongestionSurcharge|AirportFee|VendorId|         PickupTime|           DropTime|PassengerCount|TripDistance|RateCodeId|StoreAndFwdFlag|PickupLocationId|DropLocationId|PaymentType|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|TotalAmo