 # Skill Drill: Putting it ALL together
 ## Overview

 A VP at your company has written a SQL for a report that shows how much time is being lost due to flight delays by Airport and Carrier.  It is performing below expectations.  Using all of the optimizing skills you have learned in Unit 8, get this query to run as fast as possible.


---

---




 Hint: Initial query takes between 15-20 seconds.  Final query should be <2 seconds


In [2]:
# activate Spark in our Colab notebook.
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.2'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Waiting for headers] [1 InRelease 0 B/3,626 B 0%] [Wa0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
                                                                               Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [3 InRelease 11.3

In [3]:
#import packages

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
import pandas as pd

# we are going to use this to time our queries.
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [5]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/DelayedFlights.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)
url_cities='https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/cities500.txt'
spark.sparkContext.addFile(url_cities)
df_lookup_geo = spark.read.csv(SparkFiles.get("cities500.txt"), sep="\t", header=True)

# we are going to do a lookup here as well so upload the airportCodes.csv file from you Resources directory 
df_lookup_city_name=spark.read.csv('/content/airportCodes.csv', sep=',', header=True)


In [6]:
#Create temporary views for each of our dataframes
# We are going to filter the data to US only as we create the Temp Views.

df.createOrReplaceTempView('delayed')
df_lookup_city_name.createOrReplaceTempView('lookup_city')
df_lookup_geo.createOrReplaceTempView('lookup_geo')

In [7]:
# Here is the  initial query presented to you for optimization
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [8]:
#partition the largest table
df.write.partitionBy('UniqueCarrier').mode('overwrite').parquet('partitioned_dataframe')

In [10]:
# read the new parquet formatted data
par_df = spark.read.parquet('partitioned_dataframe')

In [11]:
# create a view (same name as before so we don't have change our SQL)
par_df.createOrReplaceTempView('delayed')

In [12]:
# run 2 after storing the data more appropriately and partitioning

# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [13]:
# Recall that the default shuffle partitions is 200.  We want to bring that down to a reasonable size for both our data and our Spark cluster
# 4 is reasonable for a free Colab 
spark.conf.set("spark.sql.shuffle.partitions",4)

In [14]:
# Run 3 after setting the shuffle partitions to a more appropriate number
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|  Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           OO|Allentown, PA|       40.60843|       -75.49018|     33.52066|     -86.80249|   11.0|              null|
|   ALB|           OH|   Albany, NY|       42.65258|       -73.75623|     42.35843|     -71.05977|   55.0|               0.0|
|   ALB|           WN|   Albany, NY|       42.65258|       -73.75623|     39.29038|     -76.61219|   99.0| 8.203647416413373|
|   ASE|           OO|    Aspen, CO|        39.1911|      -106.81754|       33.749|     -84.38798|   82.0|               6.0|
|   ATL|           9E|  Atlanta, GA|         33.749|       -84.38798|     33.52066|     -86.80249|   94.0|15.083333333

In [18]:
# cache your largest temporary view
# Note: when we use SparkSQL to cache a table, the table is immediately cached (no lazy evaluation), when using Pyspark it will not be cached until an action is ran.
spark.catalog.cacheTable("delayed")

In [19]:
# check that your table is cached 
spark.catalog.isCached("delayed")

True

In [20]:
# Run 4 - after caching driver table
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|  Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           OO|Allentown, PA|       40.60843|       -75.49018|     33.52066|     -86.80249|   11.0|              null|
|   ALB|           OH|   Albany, NY|       42.65258|       -73.75623|     42.35843|     -71.05977|   55.0|               0.0|
|   ALB|           WN|   Albany, NY|       42.65258|       -73.75623|     39.29038|     -76.61219|   99.0| 8.203647416413373|
|   ASE|           OO|    Aspen, CO|        39.1911|      -106.81754|       33.749|     -84.38798|   82.0|               6.0|
|   ATL|           9E|  Atlanta, GA|         33.749|       -84.38798|     33.52066|     -86.80249|   94.0|15.083333333

In [24]:
# you can even cache a large lookup table.
spark.catalog.cacheTable("lookup_geo")

In [25]:
spark.catalog.isCached("lookup_geo")

True

In [26]:
# Run 5 - caching one of the lookups
#Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|  Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+-------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ABE|           OO|Allentown, PA|       40.60843|       -75.49018|     33.52066|     -86.80249|   11.0|              null|
|   ALB|           OH|   Albany, NY|       42.65258|       -73.75623|     42.35843|     -71.05977|   55.0|               0.0|
|   ALB|           WN|   Albany, NY|       42.65258|       -73.75623|     39.29038|     -76.61219|   99.0| 8.203647416413373|
|   ASE|           OO|    Aspen, CO|        39.1911|      -106.81754|       33.749|     -84.38798|   82.0|               6.0|
|   ATL|           9E|  Atlanta, GA|         33.749|       -84.38798|     33.52066|     -86.80249|   94.0|15.083333333

In [32]:
# run 6 - remove unnecesary columns from the SQL
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.UniqueCarrier,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.CarrierDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select UniqueCarrier, Origin_City, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by UniqueCarrier, Origin_City, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-------------+---------------+-------------+--------------+-------+------------------+
|UniqueCarrier|    Origin_City|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+-------------+---------------+-------------+--------------+-------+------------------+
|           9E|    Atlanta, GA|     33.52066|     -86.80249|   94.0|15.083333333333334|
|           9E|  Baltimore, MD|       33.749|     -84.38798|    8.0| 7.466666666666667|
|           9E|     Bangor, ME|       33.749|     -84.38798|    8.0|              null|
|           9E| Birmingham, AL|       33.749|     -84.38798|   99.0| 10.23076923076923|
|           AS|     Barrow, AK|     61.21806|    -149.90028|   97.0| 4.483516483516484|
|           DL|    Atlanta, GA|     41.08144|     -81.51901|    7.0| 17.11111111111111|
|           DL|  Baltimore, MD|       33.749|     -84.38798|   99.0|25.518234165067177|
|           DL|    Bozeman, MT|       33.749|     -84.38798|   81.0|            18.125|
|           EV|      Akron, OH| 

In [37]:
# run 7 - filter the lookup tables in the SQL
# Note the runtime
start_time = time.time()

#re-write your sql applying a filter to each lookup table

spark.sql("""
with lookup_city_filter as (
  select
  City,
  airportCode
  from lookup_city where country = "USA"
),
lookup_geo_filter as (
    select
    name,
    latitude,
    longitude,
    admin1_code
    from lookup_geo where country_code = "US"),
allColumns as
(select
a.UniqueCarrier,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.CarrierDelay,
a.LateAircraftDelay from delayed a
  inner join lookup_city_filter b
    on a.Origin=b.airportCode
  inner join lookup_city_filter c
    on a.Dest=c.airportCode
  inner join lookup_geo_filter geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo_filter geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select UniqueCarrier, Origin_City, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay
from allColumns
group by UniqueCarrier, Origin_City, Dest_latitude, Dest_longitude
""").show()


print("--- %s seconds ---" % (time.time() - start_time))

+-------------+---------------+-------------+--------------+-------+------------------+
|UniqueCarrier|    Origin_City|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+-------------+---------------+-------------+--------------+-------+------------------+
|           9E|    Atlanta, GA|     33.52066|     -86.80249|   94.0|15.083333333333334|
|           9E|  Baltimore, MD|       33.749|     -84.38798|    8.0| 7.466666666666667|
|           9E|     Bangor, ME|       33.749|     -84.38798|    8.0|              null|
|           9E| Birmingham, AL|       33.749|     -84.38798|   99.0| 10.23076923076923|
|           AS|     Barrow, AK|     61.21806|    -149.90028|   97.0| 4.483516483516484|
|           DL|    Atlanta, GA|     41.08144|     -81.51901|    7.0| 17.11111111111111|
|           DL|  Baltimore, MD|       33.749|     -84.38798|   99.0|25.518234165067177|
|           DL|    Bozeman, MT|       33.749|     -84.38798|   81.0|            18.125|
|           EV|      Akron, OH| 

In [41]:
#recreate the dataframes selecting only the columns you need, filtering the data before creating the view, then caching the views.
# columns needed are 'UniqueCarrier','DepDelay','Origin','CarrierDelay','Dest' from the main table
dataframe_filter = df.select(df.UniqueCarrier, df.DepDelay, df.Origin, df.CarrierDelay, df.Dest)
# filter the df_lookup_city data prior to creating the view to only contain USA data
dataframe_lookup_city_filter = df_lookup_city_name.where(df_lookup_city_name.country == 'USA')
# filter the df_lookup_geo data prior to creating the view to only contain US data and select only the columns you need to perform the lookup
# fields from geo ('name','latitude','longitude','admin1_code')
dataframe_lookup_geo_filter = df_lookup_geo.select(df_lookup_geo.name, df_lookup_geo.latitude, 
                                              df_lookup_geo.longitude, df_lookup_geo.admin1_code).where (df_lookup_geo.country_code == 'US')
dataframe_lookup_city_filter.createOrReplaceTempView('lookup_city')
dataframe_lookup_geo_filter.createOrReplaceTempView('lookup_geo')
dataframe_filter.createOrReplaceTempView('delayed')

spark.catalog.cacheTable('delayed')


spark.catalog.cacheTable('lookup_geo')

In [42]:
spark.catalog.isCached('lookup_geo')

True

In [46]:
# run 8 - filtered lookup dataframes
# Note the runtime
start_time = time.time()

#<final query should return the same data as the first query, but much faster>

spark.sql("""
with allColumns
(select 
a.UniqueCarrier,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.CarrierDelay
from delayed a 
inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select UniqueCarrier, Origin_City, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by UniqueCarrier, Origin_City, Dest_latitude, Dest_longitude
""").show()


print("--- %s seconds ---" % (time.time() - start_time))

+-------------+---------------+-------------+--------------+-------+------------------+
|UniqueCarrier|    Origin_City|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+-------------+---------------+-------------+--------------+-------+------------------+
|           9E|    Atlanta, GA|     33.52066|     -86.80249|   94.0|15.083333333333334|
|           9E|  Baltimore, MD|       33.749|     -84.38798|    8.0| 7.466666666666667|
|           9E|     Bangor, ME|       33.749|     -84.38798|    8.0|              null|
|           9E| Birmingham, AL|       33.749|     -84.38798|   99.0| 10.23076923076923|
|           AS|     Barrow, AK|     61.21806|    -149.90028|   97.0| 4.483516483516484|
|           DL|    Atlanta, GA|     41.08144|     -81.51901|    7.0| 17.11111111111111|
|           DL|  Baltimore, MD|       33.749|     -84.38798|   99.0|25.518234165067177|
|           DL|    Bozeman, MT|       33.749|     -84.38798|   81.0|            18.125|
|           EV|      Akron, OH| 

In [47]:
# Remember to uncache the table as soon as you are done.
spark.sql("uncache table delayed")
spark.sql("uncache table lookup_geo")

DataFrame[]

In [48]:
#Verify that the table is no longer cached
if spark.catalog.isCached("delayed") or spark.catalog.isCached("lookup_geo"):
  print("a table is till cached")
else:
  print("all clear")

all clear
