<a href="https://colab.research.google.com/github/mascee/Spark_Exercises/blob/main/cache_broadcast_solution_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
# Find the latest version of spark 3.x  from https://downloads.apache.org/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.4'
spark_version = 'spark-3.5.4'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com] [Waiting for headers] [1 InRelease 3,626 B/3,626 B 100%] [Conn0% [Connecting to archive.ubuntu.com] [Waiting for headers] [Connected to r2u.stat.illinois.edu (192                                                                                                    Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [Connecting to archive.ubuntu.com (91.189.91.81)] [2 InRelease 12.7 kB/129 kB 10%] [Waiting for h0% [Waiting for headers] [2 InRelease 15.6 kB/129 kB 12%] [Waiting for headers] [Waiting for headers                                                                                                    Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [61.9 kB]
0% [Waiting for headers] [2 InRelease 15.6 kB/129 kB 12%] [Waiting for headers] [Waiting for headers0% [3 Packages store

In [3]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [4]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
flights_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/DelayedFlights.csv"
spark.sparkContext.addFile(flights_url)
flights_df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)

# Show the delayed flight data.
flights_df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
| id|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4|   2003|      1955|   2211|      2225|       

In [5]:
# Read in the airport codes from an S3 Bucket
airportCodes_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/airportCodes.csv"
spark.sparkContext.addFile(airportCodes_url)

# The airport codes will be a lookup DataFrame
airportCodes_lookup = spark.read.csv(SparkFiles.get("airportCodes.csv"), sep=',', header=True)

#  Show the lookup table data.
airportCodes_lookup.show()

+--------------+--------------------+-----------+
|          City|             country|airportCode|
+--------------+--------------------+-----------+
|       Aalborg|             Denmark|        AAL|
|      Aalesund|              Norway|        AES|
|        Aarhus|             Denmark|        AAR|
|Abbotsford, BC|              Canada|        YXX|
|Abbotsford, BC|              Canada|        YXX|
|      Aberdeen|            Scotland|        ABZ|
|  Aberdeen, SD|                 USA|        ABR|
|       Abidjan|         Ivory Coast|        ABJ|
|   Abilene, TX|                 USA|        ABI|
|     Abu Dhabi|United Arab Emirates|        AUH|
|         Abuja|             Nigeria|        ABV|
|      Acapulco|              Mexico|        ACA|
|         Accra|               Ghana|        ACC|
|         Adana|              Turkey|        ADA|
|   Addis Ababa|            Ethiopia|        ADD|
|Adelaide, S.A.|           Australia|        ADL|
|          Aden|               Yemen|        ADE|


In [6]:
# Recall that the default shuffle partitions is 200.
# We want to bring that down to a reasonable size for both our data and our Spark cluster
# A good rule of thumb is two times the number of cores.
spark.conf.set("spark.sql.shuffle.partitions", 8)

In [7]:
# Create temporary views for each of our dataframes
flights_df.createOrReplaceTempView('delayed')

airportCodes_lookup.createOrReplaceTempView('lookup')

In [8]:
# This first query joins our airport code lookup data to our delayed fligts table
# By default Spark does a broadcast join when the Join table is < 10MB.  This is configurable
# but since our table is VERY small, it will auto-broadcast.

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [10]:
# Here we have added the hint to Broadcast the lookup table.
start_time = time.time()

spark.sql("""
select /*+ BROADCAST(lookup) */
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dep_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|       Dep_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [11]:
# In this query we are trying to give the cluster some work to do.
# We are creating a a common table expression (CTE) (with allColumns) that joins the two tables together
# and then an aggregation by averaging the delays.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup) */
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dep_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|     Amarillo, TX|              63.3|
|    Allentown, PA| 50.94957983193277|
|    Asheville, NC|58.392405063291136|
|  Bloomington, IL| 46.89930555555556|
|    Anchorage, AK| 37.14423076923077|
|      Atlanta, GA| 37.90491635370434|
|       Bangor, ME| 50.27329192546584|
|      Augusta, GA| 55.65714285714286|
|      Bozeman, MT| 38.42857142857143|
|  Albuquerque, NM| 32.54347826086956|
|       Austin, TX|       37.19140625|
|  Baton Rouge, LA|          65.50625|
|    Baltimore, MD|  39.0767004341534|
|       Albany, GA| 50.25352112676056|
|        Aspen, CO|              85.0|
|       Albany, NY| 39.14365671641791|
|       Boston, MA| 47.71041369472183|
|        Akron, OH|55.928196147110334|
|Atlantic City, NJ| 64.64285714285714|
|   Birmingham, AL| 43.17042606516291|
+-----------------+------------------+
only showing top 20 rows

--- 7.455744743347168 seconds ---


In [12]:
# Next, we are use SparkSQL to cache our table
# Note: when we use SparkSQL to cache a table, the table is immediately cached (no lazy evaluation).
# When using PySpark it will not be cached until an action is run.
spark.sql("cache table delayed")

DataFrame[]

In [13]:
# This command checks that our table is cached
# It will return True if it is cached.
spark.catalog.isCached("delayed")

True

In [14]:
# Using the cached data, run the same query with the common table expression (CTE).
# The performance time should improve.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup) */
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dep_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))


+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|     Amarillo, TX|              63.3|
|    Allentown, PA| 50.94957983193277|
|    Asheville, NC|58.392405063291136|
|  Bloomington, IL| 46.89930555555556|
|    Anchorage, AK| 37.14423076923077|
|      Atlanta, GA| 37.90491635370434|
|       Bangor, ME| 50.27329192546584|
|      Augusta, GA| 55.65714285714286|
|      Bozeman, MT| 38.42857142857143|
|  Albuquerque, NM| 32.54347826086956|
|       Austin, TX|       37.19140625|
|  Baton Rouge, LA|          65.50625|
|    Baltimore, MD|  39.0767004341534|
|       Albany, GA| 50.25352112676056|
|        Aspen, CO|              85.0|
|       Albany, NY| 39.14365671641791|
|       Boston, MA| 47.71041369472183|
|        Akron, OH|55.928196147110334|
|Atlantic City, NJ| 64.64285714285714|
|   Birmingham, AL| 43.17042606516291|
+-----------------+------------------+
only showing top 20 rows

--- 2.2040069103240967 seconds ---


In [15]:
# Remember to uncache the table as soon as you are done.
spark.sql("uncache table delayed")

DataFrame[]

In [16]:
#Verify that the table is no longer cached
spark.catalog.isCached("delayed")

False