In [None]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Import packages
from pyspark.sql import SparkSession
import time

# Build (or reuse) the SparkSession that the rest of the notebook queries through.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .config("spark.sql.debug.maxToStringFields", 2000)
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/16 17:01:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Load the delayed-flights CSV from the S3 bucket.
from pyspark import SparkFiles

flights_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/DelayedFlights.csv"

# Register the remote file with Spark, then read it back by file name.
spark.sparkContext.addFile(flights_url)
flights_df = spark.read.csv(
    SparkFiles.get("DelayedFlights.csv"),
    sep=",",
    header=True,
)

# Preview the delayed flight data.
flights_df.show()

                                                                                

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|id |Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|0  |2008|1    |3         |4        |2003   |1955      |2211   |2225      |WN     

In [None]:
# Load the airport codes CSV from the S3 bucket.
airportCodes_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/airportCodes.csv"

# Register the remote file with Spark so it can be read back by file name.
spark.sparkContext.addFile(airportCodes_url)

# This DataFrame serves as the lookup table (city / country / airport code).
airportCodes_lookup = spark.read.csv(
    SparkFiles.get("airportCodes.csv"),
    sep=",",
    header=True,
)

# Preview the lookup table data.
airportCodes_lookup.show()

+--------------+--------------------+-----------+
|          City|             country|airportCode|
+--------------+--------------------+-----------+
|       Aalborg|             Denmark|        AAL|
|      Aalesund|              Norway|        AES|
|        Aarhus|             Denmark|        AAR|
|Abbotsford, BC|              Canada|        YXX|
|Abbotsford, BC|              Canada|        YXX|
|      Aberdeen|            Scotland|        ABZ|
|  Aberdeen, SD|                 USA|        ABR|
|       Abidjan|         Ivory Coast|        ABJ|
|   Abilene, TX|                 USA|        ABI|
|     Abu Dhabi|United Arab Emirates|        AUH|
|         Abuja|             Nigeria|        ABV|
|      Acapulco|              Mexico|        ACA|
|         Accra|               Ghana|        ACC|
|         Adana|              Turkey|        ADA|
|   Addis Ababa|            Ethiopia|        ADD|
|Adelaide, S.A.|           Australia|        ADL|
|          Aden|               Yemen|        ADE|


In [None]:
# Spark SQL uses 200 shuffle partitions by default, which is far more than this
# small dataset and cluster need. A common rule of thumb is twice the number of
# cores, so the setting is lowered to 8 here.
spark.conf.set(
    "spark.sql.shuffle.partitions",
    8,
)

In [None]:
# Create temporary views for each of our dataframes


In [None]:
# This first query joins our airport code lookup data to our delayed flights table.
# By default Spark does a broadcast join when the join table is < 10MB. This threshold is
# configurable (spark.sql.autoBroadcastJoinThreshold), but since our lookup table is VERY
# small, it will auto-broadcast.
# TODO: replace the "SQL code here" placeholder with the join query before running this cell.

# Record the wall-clock start so the elapsed time of the query can be reported below.
start_time = time.time()

spark.sql(""" 

SQL code here

""").show()

# Elapsed seconds covers building the query and the .show() action.
print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [None]:
# Here we add a hint to broadcast the lookup table.
# TODO: replace the "SQL code here" placeholder with the hinted join query before running this cell.
# NOTE(review): the recorded output for this cell shows Spark warning that it could not find
# relation 'lookup' for the BROADCAST hint; presumably the hint has to use the name or alias
# that the table actually has in the FROM clause -- confirm when writing the query.

# Record the wall-clock start so the elapsed time of the query can be reported below.
start_time = time.time()

spark.sql("""

SQL code here


""").show()

# Elapsed seconds covers building the query and the .show() action.
print("--- %s seconds ---" % (time.time() - start_time))

22/11/16 17:13:00 WARN HintErrorLogger: Count not find relation 'lookup' specified in hint 'BROADCAST(lookup)'.


+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|       Dep_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [None]:
# In this query we are trying to give the cluster some work to do.
# We create a common table expression (CTE), named allColumns, that joins the two tables
# together, and then aggregate over it by averaging the delays.
# TODO: replace the "<SQL code here>" placeholder with the body of the CTE and the aggregation.

# Record the wall-clock start so this uncached run can be compared with the cached run later.
start_time = time.time()

spark.sql("""
with allColumns
<SQL code here>

""").show()

# Elapsed seconds covers building the query and the .show() action.
print("--- %s seconds ---" % (time.time() - start_time))

22/11/16 17:13:35 WARN HintErrorLogger: Count not find relation 'lookup' specified in hint 'BROADCAST(lookup)'.

+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|       Boston, MA| 47.71041369472183|
|     Amarillo, TX|              63.3|
|   Birmingham, AL| 43.17042606516291|
|     Appleton, WI| 42.99324324324324|
|       Austin, TX|       37.19140625|
|  Albuquerque, NM| 32.54347826086956|
|    Baltimore, MD|  39.0767004341534|
|   Alexandria, LA|50.947712418300654|
|  Baton Rouge, LA|          65.50625|
|      Augusta, GA| 55.65714285714286|
|Atlantic City, NJ| 64.64285714285714|
|  Bloomington, IL| 46.89930555555556|
|        Akron, OH|55.928196147110334|
|       Bangor, ME| 50.27329192546584|
|      Atlanta, GA| 37.90491635370434|
|    Asheville, NC|58.392405063291136|
|       Albany, GA| 50.25352112676056|
|    Allentown, PA| 50.94957983193277|
|       Albany, NY| 39.14365671641791|
|       Bethel, AK|31.984848484848484|
+-----------------+------------------+
only showing top 20 rows

--- 3.463031053543091 seconds ---



                                                                                

In [None]:
# Next, we use SparkSQL to cache our table.
# Note: when we use SparkSQL to cache a table, the table is cached immediately (no lazy evaluation).
# When using PySpark, it will not be cached until an action is run.



                                                                                

DataFrame[]

In [None]:
# This command checks that our table is cached
# It will return True if it is cached.


True

In [None]:
# Using the cached data, run the same query with the common table expression (CTE).
# The performance time should improve compared with the uncached run of this query.
# TODO: replace the "SQL code here" placeholder with the same CTE query used before caching.

# Record the wall-clock start so the elapsed time of the query can be reported below.
start_time = time.time()

spark.sql("""

SQL code here

""").show()

# Elapsed seconds covers building the query and the .show() action.
print("--- %s seconds ---" % (time.time() - start_time))


22/11/16 17:13:52 WARN HintErrorLogger: Count not find relation 'lookup' specified in hint 'BROADCAST(lookup)'.


+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|       Boston, MA| 47.71041369472183|
|     Amarillo, TX|              63.3|
|   Birmingham, AL| 43.17042606516291|
|     Appleton, WI| 42.99324324324324|
|       Austin, TX|       37.19140625|
|  Albuquerque, NM| 32.54347826086956|
|    Baltimore, MD|  39.0767004341534|
|   Alexandria, LA|50.947712418300654|
|  Baton Rouge, LA|          65.50625|
|      Augusta, GA| 55.65714285714286|
|Atlantic City, NJ| 64.64285714285714|
|  Bloomington, IL| 46.89930555555556|
|        Akron, OH|55.928196147110334|
|       Bangor, ME| 50.27329192546584|
|      Atlanta, GA| 37.90491635370434|
|    Asheville, NC|58.392405063291136|
|       Albany, GA| 50.25352112676056|
|    Allentown, PA| 50.94957983193277|
|       Albany, NY| 39.14365671641791|
|       Bethel, AK|31.984848484848484|
+-----------------+------------------+
only showing top 20 rows

--- 0.7122211456298828 seconds ---


In [None]:
# Remember to uncache the table as soon as you are done.


DataFrame[]

In [None]:
# Verify that the table is no longer cached (the check should now return False).


False