 # Skill Drill: Putting it ALL together
 ## Overview

 A VP at your company has written a SQL for a report that shows how much time is being lost due to flight delays by Airport and Carrier.  It is performing below expectations.  Using all of the optimizing skills you have learned in Unit 8, get this query to run as fast as possible.


---

---




 Hint: Initial query takes between 15-20 seconds.  Final query should be <2 seconds


In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.1'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-3.0.1-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 htt

In [None]:
#import packages

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType
import pandas as pd

# we are going to use this to time our queries.
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [None]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/DelayedFlights.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)
url_cities='https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/cities500.txt'
spark.sparkContext.addFile(url_cities)
df_lookup_geo = spark.read.csv(SparkFiles.get("cities500.txt"), sep="\t", header=True)

# we are going to do a lookup here as well so upload the airportCodes.csv file from you Resources directory 
df_lookup_city_name=spark.read.csv('airportCodes.csv', sep=',', header=True)


In [None]:
#Create temporary views for each of our dataframes
# We are going to filter the data to US only as we create the Temp Views.

df.createOrReplaceTempView('delayed')
df_lookup_city_name.createOrReplaceTempView('lookup_city')
df_lookup_geo.createOrReplaceTempView('lookup_geo')

In [None]:
# Here is the  initial query presented to you for optimization
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [None]:
#partition the largest table
df.write.parquet('parquet_delayed',mode='overwrite')

In [None]:
# read the new parquet formatted data
df=spark.read.parquet('parquet_delayed')

In [None]:
# create a view (same name as before so we don't have change our SQL)
df.createOrReplaceTempView('delays')

In [None]:
# run 2 after storing the data more appropriately and partitioning

# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [None]:
# Recall that the default shuffle partitions is 200.  We want to bring that down to a reasonable size for both our data and our Spark cluster
# 4 is reasonable for a free Colab 
start_time =time.time()
spark.sql("""select UniqueCarrier, count(*)from delays group by 1""")
print("---%s seconds ---"% (time.time()-start_time))

---0.03239154815673828 seconds ---


In [None]:
# Run 3 after setting the shuffle partitions to a more appropriate number
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [None]:
# cache your largest temporary view
# Note: when we use SparkSQL to cache a table, the table is immediately cached (no lazy evaluation), when using Pyspark it will not be cached until an action is ran.
spark.sql("cache table delayed")


DataFrame[]

In [None]:
# check that your table is cached 
spark.catalog.isCached("delayed")

True

In [None]:
# Run 4 - after caching driver table
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [None]:
# you can even cache a large lookup table.
spark.sql("cache table delayed")

DataFrame[]

In [None]:
spark.catalog.isCached("delayed")

True

In [None]:
# Run 5 - caching one of the lookups
#Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude, max(DepDelay) as delayed, avg(CarrierDelay) avgCarrierDelay 
from allColumns 
group by Origin, UniqueCarrier, Origin_City, Origin_latitude, Origin_Longitude, Dest_latitude, Dest_longitude
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|Origin|UniqueCarrier|    Origin_City|Origin_latitude|Origin_Longitude|Dest_latitude|Dest_longitude|delayed|   avgCarrierDelay|
+------+-------------+---------------+---------------+----------------+-------------+--------------+-------+------------------+
|   ATL|           EV|    Atlanta, GA|         33.749|       -84.38798|     31.31129|     -92.44514|   99.0| 28.81025641025641|
|   ABQ|           DL|Albuquerque, NM|       35.08449|      -106.65114|       33.749|     -84.38798|   99.0|40.401869158878505|
|   ATW|           EV|   Appleton, WI|       44.26193|       -88.41538|       33.749|     -84.38798|   99.0|30.546666666666667|
|   BWI|           WN|  Baltimore, MD|       39.29038|       -76.61219|     30.26715|     -97.74306|   95.0|12.741935483870968|
|   CAK|           EV|      Akron, OH|       41.08144|       -81.51901|       33.749|     -84.38798|   9

In [None]:
# run 6 - remove unnecesary columns from the SQL
# Note the runtime
start_time = time.time()

spark.sql("""select UniqueCarrier, sum(CRSElapsedTime), count(*) from delays group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-------------+-----------------------------------+--------+
|UniqueCarrier|sum(CAST(CRSElapsedTime AS DOUBLE))|count(1)|
+-------------+-----------------------------------+--------+
|           UA|                        2.4361525E7|  141426|
|           AA|                        3.2826567E7|  191865|
|           NW|                        1.1290359E7|   79108|
|           EV|                          8059597.0|   81877|
|           B6|                        1.0088034E7|   55315|
|           DL|                        1.8642864E7|  114238|
|           OO|                        1.2431166E7|  132433|
|           F9|                          4092210.0|   28269|
|           YV|                          6117806.0|   67063|
|           US|                        1.5893179E7|   98425|
|           AQ|                            99698.0|     750|
|           MQ|                        1.3302061E7|  141920|
|           OH|                          5970707.0|   52657|
|           HA|         

In [None]:
# run 7 - filter the lookup tables in the SQL
# Note the runtime
start_time = time.time()

spark.sql("""select UniqueCarrier, count(*) from delays group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-------------+--------+
|UniqueCarrier|count(1)|
+-------------+--------+
|           UA|  141426|
|           AA|  191865|
|           NW|   79108|
|           EV|   81877|
|           B6|   55315|
|           DL|  114238|
|           OO|  132433|
|           F9|   28269|
|           YV|   67063|
|           US|   98425|
|           AQ|     750|
|           MQ|  141920|
|           OH|   52657|
|           HA|    7490|
|           XE|  103663|
|           AS|   39293|
|           FL|   71284|
|           CO|  100195|
|           WN|  377602|
|           9E|   51885|
+-------------+--------+

--- 1.785923957824707 seconds ---


In [None]:
#recreate the dataframes selecting only the columns you need, filtering the data before creating the view, then caching the views.
# columns needed are 'UniqueCarrier','DepDelay','Origin','CarrierDelay','Dest' from the main table
#<df.select here>
# filter the df_lookup_city data prior to creating the view to only contain USA data

# filter the df_lookup_geo data prior to creating the view to only contain US data and select only the columns you need to perform the lookup
# fields from geo ('name','latitude','longitude','admin1_code')

df_lookup_city_name.createOrReplaceTempView('lookup_city')
df_lookup_geo.createOrReplaceTempView('lookup_geo')
df.createOrReplaceTempView('delayed')
spark.sql("cache table delayed")
spark.sql("cache table lookup_geo")

DataFrame[]

In [None]:
# run 8 - filtered lookup dataframes
# Note the runtime
start_time = time.time()

#final query should return the same data as the first query, but much faster>

print("--- %s seconds ---" % (time.time() - start_time))

--- 0.00011920928955078125 seconds ---


In [None]:
# Remember to uncache the table as soon as you are done.
spark.sql("uncache table delayed")
spark.sql("uncache table lookup_geo")

DataFrame[]

In [None]:
#Verify that the table is no longer cached
if spark.catalog.isCached("delayed") or spark.catalog.isCached("lookup_geo"):
  print("a table is till cached")
else:
  print("all clear")

all clear
