@channel **Hello Everyone,**

**2024-02-12 `22.3-Big-Data-Optimizing Spark: Storage, Partitioning, and Caching`**

**Objectives**

* Compare the file storage types (other than tabular) that work the best for Spark.
* Understand how partitioning affects Spark performance.
* Explain the cause of shuffling and limit it when possible.
* Identify when caching is the best option.
* Explain how to broadcast a lookup table, and force it when it doesn't happen automatically.
* Set the shuffle partitions to an appropriate value and demonstrate how to cache data.


**Presentation**
* [22.3-Optimizing Spark: Storage, Partitioning, and Caching](https://git.bootcampcontent.com/University-of-California---Berkeley/UCB-VIRT-DATA-PT-08-2023-U-LOLC/-/blob/main/Slides/Data-22.3-Storage_Partitioning_and_Caching.pdf)

**Best wishes.**

**Install**
* [Download WinRAR](https://www.win-rar.com/start.html?&L=0)
* [Download JDK Development Kit](https://www.oracle.com/java/technologies/downloads/)
* [Download Apache Spark](https://spark.apache.org/downloads.html)
   - Extract Spark to a folder, example `C:/spark-3.5.0-bin-hadoop3`
   - Restart your `terminal`/`git bash`
   - Add to your env `HADOOP_HOME=C:\spark-3.5.0-bin-hadoop3`
   - Add to your env `SPARK_HOME=C:\spark-3.5.0-bin-hadoop3`
   - Add to your path `%HADOOP_HOME%\bin`
   - Copy [winutils](https://github.com/steveloughran/winutils/tree/master/hadoop-3.0.0/bin) to `C:\spark-3.5.0-bin-hadoop3\bin`
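
A quick way to confirm the steps above took effect is to check the environment from Python. This is a minimal sketch, not part of the official setup: the function name `check_spark_env` is made up for illustration, and it assumes the Windows layout described above, with `winutils.exe` inside `%HADOOP_HOME%\bin`.

```python
from pathlib import Path


def check_spark_env(env, winutils_name="winutils.exe"):
    """Return a list of problems found in an environment mapping (empty if none)."""
    problems = []
    for var in ("SPARK_HOME", "HADOOP_HOME"):
        value = env.get(var)
        if not value:
            problems.append(f"{var} is not set")
        elif not Path(value).is_dir():
            problems.append(f"{var} points to a missing folder: {value}")

    # winutils is only checked when HADOOP_HOME itself is usable.
    hadoop_home = env.get("HADOOP_HOME")
    if hadoop_home and Path(hadoop_home).is_dir():
        winutils = Path(hadoop_home) / "bin" / winutils_name
        if not winutils.is_file():
            problems.append(f"{winutils_name} not found in {winutils.parent}")
    return problems
```

After restarting your terminal, call `check_spark_env(os.environ)` (with `import os`); an empty list means both variables point at existing folders and `winutils.exe` was found.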
   

```
pip install pyspark findspark
pip install MRJob
pip install plotly
```

# ==========================================

### 3.01 Instructor Do: Introducing Parquet (20 min)

* An RDD (resilient distributed dataset) is similar to a table in a database and can hold any type of data. Spark stores RDD data as objects in memory, spread across partitions and reused across jobs, which reduces computation time compared to storing the data on a hard disk.

* When Spark loads data from a CSV file, it will distribute the data through different nodes. Each node will then assign rows of the CSV file to a partition. When we query the data using a Spark API, every value has to be read from each partition.

* Computation times for datasets smaller than a few gigabytes are not greatly affected, since the maximum size of a partition is 128 MB by default. But when you are storing terabyte-scale or larger datasets, it is better to store the data in Parquet format to improve query performance.
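
To get a feel for the 128 MB limit, the arithmetic can be sketched as below. This is a simplification under stated assumptions: Spark's real split calculation also depends on other settings, so treat the result as a rough lower bound. The name `estimate_partitions` is made up for illustration.

```python
import math

DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024  # 128 MB, the default mentioned above


def estimate_partitions(file_size_bytes, max_partition_bytes=DEFAULT_MAX_PARTITION_BYTES):
    """Rough lower bound on the partitions needed to hold a file of this size."""
    if file_size_bytes <= 0:
        return 0
    return math.ceil(file_size_bytes / max_partition_bytes)


for label, size in [("200 MB", 200 * 1024**2), ("2 GB", 2 * 1024**3), ("1 TB", 1024**4)]:
    print(f"{label}: at least {estimate_partitions(size)} partitions")
```

A file of a few hundred megabytes fits in a handful of partitions, while a terabyte needs thousands, which is where the storage format starts to matter.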

---
Parquet stores a table in columnar format, split across files, and works well in distributed storage such as HDFS, AWS S3, and Azure Data Lake Storage (ADLS).

* **Columnar** refers to how the data is stored. A columnar format stores the values of each column together, instead of storing all the values of each row together.

* This allows you to query and filter a single column and return only the selected columns in your query with great efficiency. It also reduces the amount of reading Spark performs.

* Parquet is widely used in data analytics with Spark, because it greatly reduces query runtime.

* Parquet will also help when **inferring the schema**. A Parquet file stores its schema in the file metadata, so Spark can read the schema without scanning the data, while inferring a schema from CSV or JSON requires a pass over *all* of the data.

    *  **Remember:** Best practice is to state your schema before you load the data.
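
Stating the schema up front can be sketched as follows. The column names come from the NYC building violations CSV used in this lesson. The types are assumptions chosen for illustration (everything is a string except `BORO`), and the helper names `to_ddl` and `read_violations` are hypothetical. The sketch relies on `spark.read.csv` accepting a DDL-formatted string for its `schema` argument.

```python
# Column names from the NYC_Building_Violations.csv header.
# The types are assumptions for illustration; adjust them after inspecting the data.
VIOLATION_COLUMNS = {
    "ISN_DOB_BIS_VIOL": "STRING",
    "BORO": "INT",
    "BIN": "STRING",
    "BLOCK": "STRING",
    "LOT": "STRING",
    "ISSUE_DATE": "STRING",
    "VIOLATION_TYPE_CODE": "STRING",
    "VIOLATION_NUMBER": "STRING",
    "HOUSE_NUMBER": "STRING",
    "STREET": "STRING",
    "DISPOSITION_DATE": "STRING",
    "DISPOSITION_COMMENTS": "STRING",
    "DEVICE_NUMBER": "STRING",
    "DESCRIPTION": "STRING",
    "ECB_NUMBER": "STRING",
    "NUMBER": "STRING",
    "VIOLATION_CATEGORY": "STRING",
    "VIOLATION_TYPE": "STRING",
}


def to_ddl(columns):
    """Build a DDL schema string; names are backtick-quoted to avoid keyword clashes."""
    return ", ".join(f"`{name}` {dtype}" for name, dtype in columns.items())


def read_violations(spark, path):
    """Read the CSV with an explicit schema so Spark does not have to infer one."""
    return spark.read.csv(path, sep=",", header=True, schema=to_ddl(VIOLATION_COLUMNS))
```

With an existing session, `read_violations(spark, SparkFiles.get("NYC_Building_Violations.csv"))` would return a DataFrame whose `BORO` column is already an integer.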

In [14]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [16]:
# Import packages
from pyspark.sql import SparkSession
# Import the time module so we can time our queries.
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").config("spark.driver.memory", "2g").getOrCreate()

In [17]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/NYC_Building_Violations.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("NYC_Building_Violations.csv"), sep=",", header=True)
df.show()

+----------------+----+-------+-----+-----+----------+-------------------+----------------+------------+--------------------+----------------+--------------------+-------------+--------------------+----------+--------------------+--------------------+--------------------+
|ISN_DOB_BIS_VIOL|BORO|    BIN|BLOCK|  LOT|ISSUE_DATE|VIOLATION_TYPE_CODE|VIOLATION_NUMBER|HOUSE_NUMBER|              STREET|DISPOSITION_DATE|DISPOSITION_COMMENTS|DEVICE_NUMBER|         DESCRIPTION|ECB_NUMBER|              NUMBER|  VIOLATION_CATEGORY|      VIOLATION_TYPE|
+----------------+----+-------+-----+-----+----------+-------------------+----------------+------------+--------------------+----------------+--------------------+-------------+--------------------+----------+--------------------+--------------------+--------------------+
|         2286033|   1|1009713|00577|00019|  20180507|                  E|     9027/627971|          34|        WEST 14TH ST|        20220509|PPN203 AOC SUB 05...|      1P13420|    

In [18]:
# Get a summary of the data. 
df.summary().show()

+-------+------------------+------------------+-----------------+------------------+------------------+-------------------+-------------------+----------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+
|summary|  ISN_DOB_BIS_VIOL|              BORO|              BIN|             BLOCK|               LOT|         ISSUE_DATE|VIOLATION_TYPE_CODE|VIOLATION_NUMBER|      HOUSE_NUMBER|            STREET|    DISPOSITION_DATE|DISPOSITION_COMMENTS|       DEVICE_NUMBER|         DESCRIPTION|          ECB_NUMBER|            NUMBER|  VIOLATION_CATEGORY|      VIOLATION_TYPE|
+-------+------------------+------------------+-----------------+------------------+------------------+-------------------+-------------------+----------------+------------------+------------------+--------------------+--------------------+--------------------+---------

In [22]:
# Let's create a view with our DataFrame and run SQL that will sum up the boroughs by the type of violation.
# We can output the time this step runs in seconds.
# Because we are timing the executions, remember to run twice to eliminate the "load time" from the discussion.

df.createOrReplaceTempView('violations')
start_time = time.time()

spark.sql("""select VIOLATION_TYPE, sum(BORO) from violations group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+--------------------+---------+
|      VIOLATION_TYPE|sum(BORO)|
+--------------------+---------+
|LL10/80-LOCAL LAW...|   3609.0|
|LL11/98-LOCAL LAW...|   9285.0|
|HVIOS-NYCHA ELEV ...|    969.0|
|P-PLUMBING       ...|  29480.0|
|ACH1-(NYCHA) - EL...|   4949.0|
|LANDMRK-LANDMARK ...|   5599.0|
|LL5-LOCAL LAW 5/7...|   1363.0|
|B-BOILER         ...|  17042.0|
|FISP-FACADE SAFET...|   6889.0|
|EGNCY-EMERGENCY  ...|  12607.0|
|ES-ELECTRIC SIGNS...|  18378.0|
|                NULL|    148.0|
|L1198-LOCAL LAW 1...|  10656.0|
|HBLVIO-HIGH PRESS...|  14628.0|
|BENCH-FAILURE TO ...| 110285.0|
|RWNRF-RETAINING W...|   4007.0|
|FISPNRF-NO REPORT...|  21017.0|
|LL2604-PHOTOLUMIN...|    679.0|
|LL2604S-SPRINKLER...|   1513.0|
|ACJ1-(PRIVATE RES...|   2125.0|
+--------------------+---------+
only showing top 20 rows

--- 6.765611886978149 seconds ---


In [23]:
# Write out the data in parquet format
# Note that this is pretty much the same as writing out a csv to your local directory.
# We are telling Spark to overwrite all of the data if it already exists
df.write.parquet('parquet_violations', mode='overwrite')



*   Click the folder icon on the left of the notebook to expose the folders and files stored in your Colab environment. Notice that a new folder is present with the same name as your parquet output (`parquet_violations`).
*   Inside of it you will find `part-*.parquet` files and a `_SUCCESS` file.
*   The `_SUCCESS` file is created when Spark finishes writing the folder successfully.
*   The `part-*` files are binary files that store your compressed data in columnar format.
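
If you are not working in Colab, the same inspection can be done from Python. The sketch below uses only the standard library. The function name `describe_output_folder` and the demo file names are made up for illustration, and the demo runs against a throwaway folder rather than real Spark output.

```python
import tempfile
from pathlib import Path


def describe_output_folder(folder):
    """Summarize a Spark output folder: its part files and whether _SUCCESS exists."""
    folder = Path(folder)
    part_files = sorted(p.name for p in folder.glob("part-*") if p.is_file())
    return {
        "part_files": part_files,
        "has_success_marker": (folder / "_SUCCESS").is_file(),
    }


# Demo on a temporary folder that mimics the layout described above.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("part-00000-demo.snappy.parquet", "part-00001-demo.snappy.parquet", "_SUCCESS"):
        (Path(tmp) / name).write_text("")
    print(describe_output_folder(tmp))
```

In the notebook, `describe_output_folder("parquet_violations")` would list the actual part file names, which is handy when a later step needs the exact name of one part file.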





In [24]:
# Read in our new parquet formatted data
p_df=spark.read.parquet('parquet_violations')

In [25]:
# A parquet formatted DataFrame has all the same methods as a row-based DataFrame
# We can convert the DataFrame to a view.
p_df.createOrReplaceTempView('p_violations')

In [26]:
# Run the same sql as above.  (Note: If you have small datasets it IS possible that times may be very close.)
# Because we are timing the executions, remember to run twice to eliminate the "load time" from the discussion.

start_time = time.time()
spark.sql("""select VIOLATION_TYPE, sum(BORO) from p_violations group by 1""").show()
print("--- %s seconds ---" % (time.time() - start_time))

+--------------------+---------+
|      VIOLATION_TYPE|sum(BORO)|
+--------------------+---------+
|LL10/80-LOCAL LAW...|   3609.0|
|LL11/98-LOCAL LAW...|   9285.0|
|HVIOS-NYCHA ELEV ...|    969.0|
|P-PLUMBING       ...|  29480.0|
|ACH1-(NYCHA) - EL...|   4949.0|
|LANDMRK-LANDMARK ...|   5599.0|
|LL5-LOCAL LAW 5/7...|   1363.0|
|FISP-FACADE SAFET...|   6889.0|
|B-BOILER         ...|  17042.0|
|EGNCY-EMERGENCY  ...|  12607.0|
|ES-ELECTRIC SIGNS...|  18378.0|
|                NULL|    148.0|
|L1198-LOCAL LAW 1...|  10656.0|
|HBLVIO-HIGH PRESS...|  14628.0|
|BENCH-FAILURE TO ...| 110285.0|
|RWNRF-RETAINING W...|   4007.0|
|FISPNRF-NO REPORT...|  21017.0|
|LL2604-PHOTOLUMIN...|    679.0|
|LL2604S-SPRINKLER...|   1513.0|
|ACJ1-(PRIVATE RES...|   2125.0|
+--------------------+---------+
only showing top 20 rows

--- 3.2563915252685547 seconds ---


In [27]:
# Writing out a csv file from Spark will also create a folder with "part" files.
# These files are not binary or compressed and in reality are just normal csv files broken into partitions.
# You can see the folder 'out_violations.csv' in your local directory.
df.write.csv('out_violations.csv', mode='overwrite')

### Read a parquet file into a Pandas DataFrame

In [28]:
import pandas as pd

In [31]:
# Open the parquet_violations folder and get the name of a file and edit the path to the parquet file.  
# Check for the correct file name, since the file number will change.
parquet_file = "parquet_violations/part-00000-e6999205-aaba-460b-8a0c-01959c0c7047-c000.snappy.parquet"

# Convert the parquet file to a Pandas DataFrame. 
part_00000_df = pd.read_parquet(parquet_file, engine='auto')
part_00000_df.head()

Unnamed: 0,ISN_DOB_BIS_VIOL,BORO,BIN,BLOCK,LOT,ISSUE_DATE,VIOLATION_TYPE_CODE,VIOLATION_NUMBER,HOUSE_NUMBER,STREET,DISPOSITION_DATE,DISPOSITION_COMMENTS,DEVICE_NUMBER,DESCRIPTION,ECB_NUMBER,NUMBER,VIOLATION_CATEGORY,VIOLATION_TYPE
0,2286033,1,1009713,577,19,20180507,E,9027/627971,34,WEST 14TH ST,20220509,PPN203 AOC SUB 050322 BY BP ELEV CO ...,1P13420,,,V*050718E9027/627971,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
1,2533639,1,1082666,333,1,20210629,E,9027/705433,77,COLUMBIA STREET,20220509,PPN203 AOC SUB 050222 BY MIDTOWN ELEV CO INC ...,1P27474,,,V*062921E9027/705433,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
2,2347979,1,1083846,1130,1,20190423,E,9028/648125,200,CENTRAL PARK WEST,20220509,PPN203 AOC SUBMITTED ON 050522 BY CENTENNIAL E...,1P40861,,,V*042319E9028/648125,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
3,2566336,1,1057155,1889,7502,20211123,E,9028/710097,845,WEST END AVE,20220509,PPN203 AOC SUB 050322 BY BP ELEV CO ...,1P14972,,,V*112321E9028/710097,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...
4,2487351,1,1041456,1387,21,20200925,E,9028/689200,31,E 72 ST,20220509,PPN203 AOC SUB 050322 BY BP ELEV CO ...,1P10910,,,V*092520E9028/689200,V*-DOB VIOLATION - Resolved,E-ELEVATOR ...


# ==========================================

### 3.02 Students Do: Practicing Parquet (15 min)

# Practicing Parquet
In this activity you'll practice storing data in Parquet format and executing queries on Parquet data using Spark.

**Instructions:** 
1. Import the `Austin 311 Public Dataset` to a Spark DataFrame.
2. Create a temporary view and write a SparkSQL query that gets a count of each unique name in `description`.
    * Record the execution time of the SparkSQL query using the `time.time()` method. 
    * **Note:** You may want to run this query twice to eliminate initial load time.
3. Write your Spark DataFrame containing the Austin 311 data to parquet format.
4. Read your parquet data into a new Spark DataFrame.
5. Using your new parquet DataFrame, create a new temporary view and write a SparkSQL query that gets a count of each unique name in `description`.
    * Be sure to record the execution time using the new parquet DataFrame temporary view.
6. Compare the runtime of the parquet SparkSQL query versus the traditional Spark DataFrame query. 
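
The "run it twice" advice in the steps above can be wrapped in a small helper. This is one possible sketch rather than the expected solution. The name `time_runs` is hypothetical, and the demo times a plain Python computation as a stand-in for a Spark query.

```python
import time


def time_runs(action, runs=2):
    """Call action() `runs` times and return each elapsed time in seconds."""
    timings = []
    for _ in range(runs):
        start = time.time()
        action()
        timings.append(time.time() - start)
    return timings


# Stand-in workload; in the notebook the action could be
# lambda: spark.sql("select description, count(*) from 311description group by 1").show()
first, second = time_runs(lambda: sum(range(100_000)))
print("first run: %.4f s, second run: %.4f s" % (first, second))
```

Comparing the second timing of each query keeps the initial load time out of the comparison.
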
## Data Source:
[Austin 311 Public Dataset](https://data.austintexas.gov/Utilities-and-City-Services/Austin-311-Public-Data/xwdj-i9he)

---

In [32]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [33]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").config("spark.driver.memory", "2g").getOrCreate()

In [34]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/Austin_311_Public_Data.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Austin_311_Public_Data.csv"), sep=",", header=True)
df.show()


+----------------------+--------------------+------------------+------+--------------+--------------+--------------------+-------------+--------------------+------+--------+----------+----------------+
|service_request_number|         description|   method_received|status|  created_date|   closed_date|            location|street_number|         street_name|  city|zip_code|    county|council_dsitrict|
+----------------------+--------------------+------------------+------+--------------+--------------+--------------------+-------------+--------------------+------+--------+----------+----------------+
|           19-00090956|Animal - Proper Care|             Phone|Closed| 3/14/19 15:02|11/23/20 13:41|4609 RIBBECKE AVE...|         4609|            RIBBECKE|AUSTIN|   78721|    TRAVIS|               1|
|           20-00135805|      Tree Issue ROW|     Mobile Device|Closed|  4/7/20 19:06| 11/23/20 0:02|3521 WEST AVE, AU...|         3521|                WEST|AUSTIN|   78705|    TRAVIS|        

In [35]:
# Let's create a view with our DataFrame that counts the occurrences of the service request description of the 311 issues. 
# Record the runtime using time.time() method.

df.createOrReplaceTempView('311description')

start_time = time.time()

spark.sql("""select description, count(*) from 311description group by 1""").show(truncate=False)

print("--- %s seconds ---" % (time.time() - start_time))

+---------------------------------------------------+--------+
|description                                        |count(1)|
+---------------------------------------------------+--------+
|Tree Issue ROW/Maintenance (PW)                    |4688    |
|Austin Code - Short Term Rental (STR) Appointment  |848     |
|Park Maintenance - Grounds Plumbing Issues         |2690    |
|Austin Code - Short Term Rental Complaint SR       |9865    |
|Lane/Road Closure Notification                     |9836    |
|ATD - Booting Complaint                            |245     |
|Obstruction in ROW                                 |9453    |
|Park Maintenance - Grounds Electrical Issues       |1646    |
|Parking Machine Issue                              |10756   |
|School Zone Flasher - Timing/Maintenance           |2956    |
|ATD - Shared Micromobility                         |717     |
|Residential Parking Permit Enforcement             |153     |
|AW - Water Conservation Violation                  |58

In [36]:
# Write out the data in parquet format
df.write.parquet('parquet_descriptions',mode='overwrite')



*   Click the folder icon on the left and notice that a new folder is present with the same name as your parquet output (`parquet_descriptions`).
*   Inside of it you will find `part-*.parquet` files.
*   These are binary files that store your compressed data in columnar format.
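
To see why a columnar layout suits a query like the description count, here is a toy sketch in plain Python. It is not how Parquet encodes data on disk (there is no compression or file structure here), and the rows are made-up values in the shape of the Austin 311 data.

```python
from collections import Counter

# A few made-up rows in the shape of the Austin 311 data (values are illustrative).
rows = [
    {"description": "Bat Complaint", "status": "Closed", "city": "AUSTIN"},
    {"description": "Parking Machine Issue", "status": "Open", "city": "AUSTIN"},
    {"description": "Bat Complaint", "status": "Closed", "city": "AUSTIN"},
]


def to_columnar(rows):
    """Regroup row dictionaries into one list of values per column."""
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columnar(rows)
# Counting descriptions touches only the 'description' list;
# the 'status' and 'city' values are never read.
description_counts = Counter(columns["description"])
print(description_counts)
```

In the row layout every query has to walk through whole rows, while in the columnar layout a query reads only the columns it names.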





In [37]:
# Read in our new parquet formatted data
p_df=spark.read.parquet('parquet_descriptions')

In [38]:
# A parquet formatted DataFrame has all the same methods as a row-based DataFrame
# we can convert the DataFrame to a view.
p_df.createOrReplaceTempView('p_descriptions')

In [39]:
# Run the same sql as above.  (Note: Due to small datasets it IS possible that times may be very close, however this is not the case in larger sets.)
start_time = time.time()
spark.sql("""select description, count(*) from p_descriptions group by 1 """).show(truncate=False)
print("--- %s seconds ---" % (time.time() - start_time))

+---------------------------------------------------+--------+
|description                                        |count(1)|
+---------------------------------------------------+--------+
|Tree Issue ROW/Maintenance (PW)                    |4688    |
|Austin Code - Short Term Rental (STR) Appointment  |848     |
|Park Maintenance - Grounds Plumbing Issues         |2690    |
|Austin Code - Short Term Rental Complaint SR       |9865    |
|Lane/Road Closure Notification                     |9836    |
|Obstruction in ROW                                 |9453    |
|Park Maintenance - Grounds Electrical Issues       |1646    |
|Parking Machine Issue                              |10756   |
|School Zone Flasher - Timing/Maintenance           |2956    |
|ATD - Shared Micromobility                         |717     |
|Bat Complaint                                      |1074    |
|Street Light Issue- Multiple poles/multiple streets|2052    |
|Austin Code - Construction Ordinance SR            |12

# ==========================================

### 3.03 Instructor Do: All the data must play its "part" (20 min)

**The partition is the main unit of parallelism in Apache Spark.** In other words, without partitioning, Spark is very limited in its ability to perform tasks concurrently.

The size of the partitions plays a significant role in the performance of our Spark queries.

   *  Many small partitions will perform poorly, while well-distributed partitions will yield the best results.

By default, the size of a partition will max out at 128 MB when Spark reads files.

   * We can change this limit for DataFrame reads by altering the Spark config setting `spark.sql.files.maxPartitionBytes`.

While Spark can partition our data automatically, ideally we want to partition our data on a well-distributed key, based on our intended usage.

It is important to try to keep partitions close to the same size to avoid skew (more data in one partition).
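
Skew can be illustrated without Spark. The sketch below assigns keys to partitions with a hash and compares the largest partition with the average. CRC32 is a stand-in chosen because it is deterministic in Python, and Spark uses its own hash function, so the exact assignment will differ. The carrier counts are taken from the DelayedFlights query output in this section, and the function names are made up for illustration.

```python
import zlib


def assign_partition(key, num_partitions):
    """Map a key to a partition index with a deterministic hash (CRC32 stand-in)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_sizes(key_counts, num_partitions):
    """Add up how many records land in each partition, given records per key."""
    sizes = [0] * num_partitions
    for key, count in key_counts.items():
        sizes[assign_partition(key, num_partitions)] += count
    return sizes


def skew_ratio(sizes):
    """Largest partition divided by the mean partition size (1.0 means perfectly even)."""
    mean = sum(sizes) / len(sizes)
    return max(sizes) / mean if mean else 0.0


# Records per carrier, from the DelayedFlights output in this section.
carrier_counts = {"WN": 214624, "AA": 103120, "UA": 82022, "HA": 2597, "AQ": 750}
sizes = partition_sizes(carrier_counts, num_partitions=4)
print("partition sizes:", sizes)
print("skew ratio: %.2f" % skew_ratio(sizes))
```

Because one carrier (`WN`) holds far more rows than the others, whichever partition receives it will be much larger than the rest, which is the skew described above.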

In [40]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [41]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [74]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/DelayedFlights.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)
df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
| id|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4|   2003|      1955|   2211|      2225|       

In [75]:
# Create a temporary view
df.createOrReplaceTempView('delays')

In [76]:
# Run a sql query that groups the data on UniqueCarrier
# Note: the time functions will track the time it takes to load and run the data.
# We are only interested in the time it takes to run, so run this cell twice.
start_time = time.time()

spark.sql("""select UniqueCarrier,sum(CRSElapsedTime), count(*) from delays group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-------------+-------------------+--------+
|UniqueCarrier|sum(CRSElapsedTime)|count(1)|
+-------------+-------------------+--------+
|           OO|          6883377.0|   73680|
|           YV|          3216400.0|   34890|
|           OH|          3318613.0|   29152|
|           XE|          7386620.0|   62539|
|           WN|        2.4182455E7|  214624|
|           UA|        1.3998834E7|   82022|
|           EV|          4284049.0|   42782|
|           DL|          8245701.0|   48888|
|           F9|          2338358.0|   16006|
|           US|          8759953.0|   53873|
|           AA|        1.7721836E7|  103120|
|           NW|          6761017.0|   48410|
|           AQ|            99698.0|     750|
|           MQ|          7710479.0|   82505|
|           HA|           345580.0|    2597|
|           AS|          2527656.0|   16553|
|           FL|          4807695.0|   37201|
|           9E|          3255692.0|   31833|
|           B6|          4169064.0|   22868|
|         

In [77]:
# Write out the data in parquet format
df.write.parquet('parquet_delayed', mode='overwrite')

In [78]:
# Read in our new parquet formatted data
p_df=spark.read.parquet('parquet_delayed')

In [79]:
# A parquet formatted DataFrame has all the same methods as a row-based dataframe
# We can convert the dataframe to a view.
p_df.createOrReplaceTempView('p_delays')

In [80]:
start_time = time.time()

spark.sql("""select UniqueCarrier, count(*) from p_delays group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-------------+--------+
|UniqueCarrier|count(1)|
+-------------+--------+
|           UA|   82022|
|           NW|   48410|
|           OO|   73680|
|           YV|   34890|
|           US|   53873|
|           OH|   29152|
|           XE|   62539|
|           EV|   42782|
|           DL|   48888|
|           WN|  214624|
|           F9|   16006|
|           B6|   22868|
|           AS|   16553|
|           CO|   44282|
|           FL|   37201|
|           AA|  103120|
|           MQ|   82505|
|           HA|    2597|
|           9E|   31833|
|           AQ|     750|
+-------------+--------+

--- 5.75992226600647 seconds ---


In [93]:
# Here is another sample
start_time = time.time()

# spark.sql("""select UniqueCarrier,sum(CRSElapsedTime), count(*) from p_delays group by 1""").show()
spark.sql("""select count(*) from p_delays WHERE UniqueCarrier='UA'""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+--------+
|count(1)|
+--------+
|   82022|
+--------+

--- 0.549628496170044 seconds ---


In [82]:
# Partition our data by UniqueCarrier
df.write.partitionBy("UniqueCarrier").mode("overwrite").parquet("delayed_partitioned")

In [83]:
# Read in our new parquet formatted data
p_df_p=spark.read.parquet('delayed_partitioned')

In [84]:
# Convert the DataFrame to a view.
p_df_p.createOrReplaceTempView('p_delays_p')

In [91]:
# Query the partitioned data on the Partition key.
start_time = time.time()

# spark.sql("""select UniqueCarrier, count(*) from p_delays_p group by 1""").show()
spark.sql("""select count(*) from p_delays_p WHERE UniqueCarrier='UA'""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+--------+
|count(1)|
+--------+
|   82022|
+--------+

--- 0.7977452278137207 seconds ---


In [92]:
# Filtering on the partition key and aggregating data.
start_time = time.time()

# spark.sql("""select UniqueCarrier,sum(CRSElapsedTime) from p_delays_p group by 1""").show()
spark.sql("""select sum(CRSElapsedTime) from p_delays_p WHERE UniqueCarrier='UA'""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-------------------+
|sum(CRSElapsedTime)|
+-------------------+
|        1.3998834E7|
+-------------------+

--- 0.6959357261657715 seconds ---


In [94]:
# Another query filtering on the key.
start_time = time.time()
spark.sql("""Select UniqueCarrier, sum(DepDelay) as total_delayed from p_delays_p where UniqueCarrier='US' group by 1""").show()
print("--- %s seconds ---" % (time.time() - start_time))

+-------------+-------------+
|UniqueCarrier|total_delayed|
+-------------+-------------+
|           US|    2077273.0|
+-------------+-------------+

--- 0.5311038494110107 seconds ---


In [95]:
# Same query as above against the parquet (non-partitioned) data.
start_time = time.time()
spark.sql("""Select UniqueCarrier, sum(DepDelay) as total_delayed from p_delays where UniqueCarrier='US' group by 1""").show()
print("--- %s seconds ---" % (time.time() - start_time))

+-------------+-------------+
|UniqueCarrier|total_delayed|
+-------------+-------------+
|           US|    2077273.0|
+-------------+-------------+

--- 0.5035645961761475 seconds ---


In [96]:
# Here is a query that doesn't use the partition key at all (against the parquet data)
start_time = time.time()
spark.sql("""Select distinct UniqueCarrier, TailNum from p_delays where TailNum='N712SW' """).show()
print("--- %s seconds ---" % (time.time() - start_time))

+-------------+-------+
|UniqueCarrier|TailNum|
+-------------+-------+
|           WN| N712SW|
+-------------+-------+

--- 0.536790132522583 seconds ---


In [97]:
# Here is a query that doesn't use the partition key at all (against the partitioned parquet data)
start_time = time.time()
spark.sql("""Select distinct UniqueCarrier, TailNum from p_delays_p where TailNum='N712SW' """).show()
print("--- %s seconds ---" % (time.time() - start_time))

+-------------+-------+
|UniqueCarrier|TailNum|
+-------------+-------+
|           WN| N712SW|
+-------------+-------+

--- 0.825035810470581 seconds ---
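
Writing with `partitionBy("UniqueCarrier")` produces one subfolder per carrier, named in the form `UniqueCarrier=UA`. The toy sketch below shows the idea behind skipping folders when a query filters on the partition key. It is plain Python and not Spark's implementation, and the helper names are made up for illustration.

```python
def partition_dir(column, value):
    """Folder name used for one value of a partitionBy column."""
    return f"{column}={value}"


def prune(partition_dirs, column, wanted_value):
    """Keep only the folders that a filter `column = wanted_value` needs to read."""
    target = partition_dir(column, wanted_value)
    return [d for d in partition_dirs if d == target]


carriers = ["UA", "NW", "OO", "YV", "US", "WN", "AA"]
dirs = [partition_dir("UniqueCarrier", carrier) for carrier in carriers]

# Filter on the partition key: only one folder is relevant.
print(prune(dirs, "UniqueCarrier", "UA"))  # ['UniqueCarrier=UA']

# Filter on another column (for example TailNum): no folder can be skipped.
print(len(dirs), "folders must be scanned for a TailNum filter")
```

In the timings above, the partitioned data was not faster on this dataset (about 0.80 s versus 0.55 s for the `UA` count, and 0.83 s versus 0.54 s for the `TailNum` filter). One plausible reason is that splitting a dataset of this size into many small files adds more overhead than the skipped folders save.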


# ==========================================

### 3.04 Students Do: Writing to Parquet (15 min)

In this activity, you'll determine the differences in query execution time between a temporary view of an original Spark DataFrame, a parquet DataFrame, and a partitioned-parquet Spark DataFrame.

**Instructions:**
1. Using the starter code provided, start a Spark session, and then import the `DelayedFlights.csv` file to a Spark DataFrame.
2. Create a temp view of your Spark DataFrame, and create a SparkSQL query that determines the total distance and flight count for each unique “Origin” and “Dest” combination.
    * Record the execution time of the SparkSQL query using the `time.time()` method.
    * **Note:** You may want to run this query twice to eliminate initial load time.
3. Save your DataFrame in parquet format, and reload the dataset into a new Spark DataFrame.
4. Create a new temp view from the parquet Spark DataFrame, and rerun the SparkSQL query that determines the total distance and flight count for each unique “Origin” and “Dest” combination.
    * Record the execution time of the SparkSQL query using the `time.time()` method.
5. Now, save your original Spark DataFrame again, but this time, save it in parquet format, using Origin as the column to partition by.
6. Reload your partitioned parquet dataset into a new Spark DataFrame.
7. Create another new temp view from the partitioned parquet Spark DataFrame, and rerun the SparkSQL query that determines the total distance and flight count for each unique Origin and Dest combination.
    * Record the execution time of the SparkSQL query using the `time.time()` method.
8. Compare the performance of all three SparkSQL queries based upon the underlying data format.
     * Did the partitioned parquet format perform better or worse than the non-partitioned format? Why or why not?
9. Try creating a SparkSQL query that filters the partitioned data where `TailNum='N712SW'`. Show the results for the Origin and TailNum fields.
    * Record the execution time of the SparkSQL query using the `time.time()` method.
    * **Note:** This query returns your partition column, Origin, but filters on `TailNum`.
10. Create a new SparkSQL query that filters the partitioned data where `TailNum='N712SW'`. Show the results for the Dest and TailNum fields.
    * Record the execution time of the SparkSQL query using the `time.time()` method.
    * **Note:** This query does not use your partition column, Origin, at all.
11. Compare the performance of your filter queries from a partitioned versus non-partitioned dataset.
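
For the comparison steps, it can help to put the recorded runtimes side by side. The sketch below is one possible way to do that. The timings in it are hypothetical placeholders, so substitute the values you recorded, and `compare_timings` is a made-up helper name.

```python
def compare_timings(timings, baseline):
    """Return (label, seconds, speedup vs. baseline) tuples, fastest first."""
    base = timings[baseline]
    rows = [(label, secs, base / secs) for label, secs in timings.items()]
    return sorted(rows, key=lambda row: row[1])


# Hypothetical timings in seconds; replace them with your own measurements.
recorded = {"csv": 4.4, "parquet": 3.5, "partitioned parquet": 5.0}
for label, secs, speedup in compare_timings(recorded, baseline="csv"):
    print(f"{label:<20} {secs:6.2f} s   {speedup:4.2f}x vs csv")
```

A speedup below 1.0 means that format was slower than the baseline for that query.
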
## Data Source:

[Airlines Delay](https://www.kaggle.com/datasets/giovamata/airlinedelaycauses)

---

In [59]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [60]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [61]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/DelayedFlights.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)
df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
| id|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4|   2003|      1955|   2211|      2225|       

In [62]:
# Create temp view named "delays"
df.createOrReplaceTempView('delays')

# Start the runtime
start_time = time.time()

# Using spark.sql write a query that gives you the total distance 
# and the count of every unique Origin, Dest combination.
spark.sql("""select Origin, Dest ,sum(Distance), count(*) from delays group by 1,2""").show()

# Print out the runtime.
print("--- %s seconds ---" % (time.time() - start_time))

+------+----+-------------+--------+
|Origin|Dest|sum(Distance)|count(1)|
+------+----+-------------+--------+
|   LAS| LIT|      72520.0|      56|
|   PHL| MCO|     986706.0|    1146|
|   SMF| BUR|     178284.0|     498|
|   SNA| PHX|     218010.0|     645|
|   MCI| IAH|     156249.0|     243|
|   BFL| SAN|       4515.0|      21|
|   ROC| CLE|      39935.0|     163|
|   SPI| ORD|      34104.0|     196|
|   ATL| GSP|      54621.0|     357|
|   SFO| PMD|      26860.0|      85|
|   LAX| OXR|       6958.0|     142|
|   SJC| ONT|     145521.0|     437|
|   AUS| ELP|     112992.0|     214|
|   ICT| IAH|      97560.0|     180|
|   CLE| MCI|     129084.0|     186|
|   CPR| DEN|      31510.0|     137|
|   CVG| BDL|     109065.0|     165|
|   TPA| CVG|      90441.0|     117|
|   JFK| ORD|     494320.0|     668|
|   SFO| TUS|      51068.0|      68|
+------+----+-------------+--------+
only showing top 20 rows

--- 4.374485015869141 seconds ---
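The timing pattern repeated in these cells (`start_time = time.time()` followed by a `print`) can be wrapped in a small helper so that each measurement becomes a single `with` block. The following is a minimal sketch, not part of the course starter code: `timed` and the `runtimes` dictionary are hypothetical names, and the sketch uses `time.perf_counter()`, which is intended for measuring intervals.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results=None):
    """Time the enclosed block and print the elapsed seconds.

    `results` is an optional dict (a hypothetical convenience) that collects
    elapsed times by label so that runs can be compared afterwards.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if results is not None:
            results[label] = elapsed
        print("--- %s: %.3f seconds ---" % (label, elapsed))

# Usage: time any block of work. In the notebook, the body of the `with`
# block would be the spark.sql(...).show() call being measured.
runtimes = {}
with timed("sum of squares", runtimes):
    total = sum(i * i for i in range(100_000))
```

Collecting the runtimes in one dictionary makes it easier to compare the CSV, Parquet, and partitioned Parquet runs side by side instead of scrolling between cell outputs.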


In [63]:
# Write out the data in parquet format
df.write.parquet('parquet_delay_basic', mode='overwrite')

In [64]:
# Read in our new parquet formatted data
p_df=spark.read.parquet('parquet_delay_basic')

In [65]:
# Convert the DataFrame to a view.
p_df.createOrReplaceTempView('p_delays')

In [66]:
# Start the runtime
start_time = time.time()

# Run the same query here
spark.sql("""select Origin, Dest ,sum(Distance), count(*) from p_delays group by 1,2""").show()

# Print out the runtime
print("--- %s seconds ---" % (time.time() - start_time))

+------+----+-------------+--------+
|Origin|Dest|sum(Distance)|count(1)|
+------+----+-------------+--------+
|   ROC| CLE|      39935.0|     163|
|   MCI| IAH|     156249.0|     243|
|   SNA| PHX|     218010.0|     645|
|   SPI| ORD|      34104.0|     196|
|   LAX| OXR|       6958.0|     142|
|   SFO| PMD|      26860.0|      85|
|   ATL| GSP|      54621.0|     357|
|   ORD| PDX|     909497.0|     523|
|   PBI| DCA|      99412.0|     116|
|   PHL| MCO|     986706.0|    1146|
|   CLE| MCI|     129084.0|     186|
|   ICT| IAH|      97560.0|     180|
|   CPR| DEN|      31510.0|     137|
|   CVG| BDL|     109065.0|     165|
|   JFK| ORD|     494320.0|     668|
|   TPA| CVG|      90441.0|     117|
|   SFO| TUS|      51068.0|      68|
|   ORD| FWA|      89961.0|     573|
|   MYR| CLT|      18720.0|     120|
|   BOS| CVG|     145888.0|     194|
+------+----+-------------+--------+
only showing top 20 rows

--- 3.522430419921875 seconds ---
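One reason the Parquet query can run faster is that Parquet is a columnar format: the query needs only `Origin`, `Dest`, and `Distance`, so the other columns do not have to be read. The sketch below illustrates the idea in plain Python with toy data. It is not how Parquet is implemented; the row values and the helper names are made up for illustration, and only the column names come from the dataset.

```python
# Toy data: three rows with five of the flight columns (values are made up).
rows = [
    {"Origin": "LAS", "Dest": "LIT", "Distance": 100.0, "TailNum": "N1", "Year": 2008},
    {"Origin": "PHL", "Dest": "MCO", "Distance": 200.0, "TailNum": "N2", "Year": 2008},
    {"Origin": "SMF", "Dest": "BUR", "Distance": 300.0, "TailNum": "N3", "Year": 2008},
]

def values_read_row_layout(rows):
    """Row-oriented layout (like CSV): every field of every row is read."""
    return sum(len(row) for row in rows)

def to_columns(rows):
    """Column-oriented layout (like Parquet): each column is stored together."""
    return {name: [row[name] for row in rows] for name in rows[0]}

def values_read_column_layout(columns, needed):
    """Only the requested columns are read."""
    return sum(len(columns[name]) for name in needed)

columns = to_columns(rows)
needed = ["Origin", "Dest", "Distance"]

row_reads = values_read_row_layout(rows)                 # 3 rows * 5 fields
col_reads = values_read_column_layout(columns, needed)   # 3 rows * 3 columns
total_distance = sum(columns["Distance"])
print(row_reads, col_reads, total_distance)
```

With the 30 columns of the real dataset, the gap between the two layouts is much wider than in this five-column toy example.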


In [67]:
# Write out your parquet data, partitioning on the Origin column
df.write.partitionBy("Origin").mode("overwrite").parquet("delayed_partitioned")

In [68]:
# Read in our new parquet formatted data
p_df_p=spark.read.parquet('delayed_partitioned')

In [69]:
# Convert the dataframe to a view.
p_df_p.createOrReplaceTempView('p_delays_p')

In [70]:
# Start the runtime
start_time = time.time()

# Run your query against your partitioned data one more time.
spark.sql("""select Origin, Dest ,sum(Distance), count(*) from p_delays_p group by 1,2""").show()

# Print out the runtime
print("--- %s seconds ---" % (time.time() - start_time))

+------+----+-------------+--------+
|Origin|Dest|sum(Distance)|count(1)|
+------+----+-------------+--------+
|   ATL| GSP|      54621.0|     357|
|   ORD| PDX|     909497.0|     523|
|   ATL| HDN|      30820.0|      23|
|   DFW| PNS|     228916.0|     379|
|   DFW| SDF|     142202.0|     194|
|   ORD| FWA|      89961.0|     573|
|   DTW| MKE|      62118.0|     261|
|   DFW| HOU|     102258.0|     414|
|   ORD| BUF|     332992.0|     704|
|   ORD| CAE|     225108.0|     338|
|   DEN| ANC|     235788.0|      98|
|   DEN| RAP|      69230.0|     230|
|   DEN| ABQ|     225105.0|     645|
|   DTW| ANC|     104510.0|      35|
|   IAH| LIT|      72930.0|     195|
|   DFW| VPS|     222230.0|     355|
|   ATL| ALB|     238560.0|     280|
|   DEN| CLE|     150125.0|     125|
|   DEN| SDF|      90024.0|      88|
|   IAH| GSP|     110616.0|     132|
+------+----+-------------+--------+
only showing top 20 rows

--- 8.91665506362915 seconds ---


In [71]:
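# Start the runtime
start_time = time.time()

# Select the partition column (Origin) while filtering on TailNum.
# Note: this cell queries `p_delays`, the unpartitioned Parquet view; point it at `p_delays_p` to time the partitioned data.
spark.sql("""Select distinct Origin, TailNum from p_delays where TailNum='N712SW' """).show()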

# Print out the runtime.
print("--- %s seconds ---" % (time.time() - start_time))

+------+-------+
|Origin|TailNum|
+------+-------+
|   BWI| N712SW|
|   PVD| N712SW|
|   LAS| N712SW|
|   MDW| N712SW|
|   PIT| N712SW|
|   SNA| N712SW|
|   SMF| N712SW|
|   AUS| N712SW|
|   PHX| N712SW|
|   DAL| N712SW|
|   MSY| N712SW|
|   ABQ| N712SW|
|   BUR| N712SW|
|   MCO| N712SW|
|   TPA| N712SW|
|   BNA| N712SW|
|   JAN| N712SW|
|   SFO| N712SW|
|   SAN| N712SW|
|   SLC| N712SW|
+------+-------+
only showing top 20 rows

--- 0.638530969619751 seconds ---


In [72]:
# Start the runtime
start_time = time.time()

# Filter the data on something that has nothing to do with your partition choice.
spark.sql("""Select distinct Dest, TailNum from p_delays_p where TailNum='N712SW' """).show()

# Print out the runtime.
print("--- %s seconds ---" % (time.time() - start_time))

+----+-------+
|Dest|TailNum|
+----+-------+
| RNO| N712SW|
| MDW| N712SW|
| STL| N712SW|
| MCO| N712SW|
| TPA| N712SW|
| BNA| N712SW|
| SNA| N712SW|
| OMA| N712SW|
| SFO| N712SW|
| GEG| N712SW|
| ONT| N712SW|
| TUL| N712SW|
| SAN| N712SW|
| OAK| N712SW|
| SEA| N712SW|
| LAS| N712SW|
| LIT| N712SW|
| LAX| N712SW|
| PDX| N712SW|
| MSY| N712SW|
+----+-------+
only showing top 20 rows

--- 3.794306755065918 seconds ---
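Writing with `partitionBy("Origin")` produces one subdirectory per `Origin` value, named in the `column=value` style (for example `Origin=ATL`). A filter on the partition column, such as `where Origin='ATL'`, lets Spark skip every other directory, while a filter on `TailNum` alone cannot skip any. The sketch below mimics that pruning step in plain Python. The file names and the helpers `partition_value` and `files_to_scan` are hypothetical and only illustrate the idea.

```python
# Hypothetical file listing for a dataset written with partitionBy("Origin").
# The directory names follow the `column=value` convention; the file names
# are invented for illustration.
files = [
    "delayed_partitioned/Origin=ATL/part-0000.parquet",
    "delayed_partitioned/Origin=ATL/part-0001.parquet",
    "delayed_partitioned/Origin=DEN/part-0000.parquet",
    "delayed_partitioned/Origin=ORD/part-0000.parquet",
]

def partition_value(path, column):
    """Return the value encoded in the path for `column`, or None."""
    prefix = column + "="
    for part in path.split("/"):
        if part.startswith(prefix):
            return part[len(prefix):]
    return None

def files_to_scan(files, column=None, value=None):
    """Keep only files whose partition directory matches the filter.

    With no filter on the partition column, every file must be scanned.
    """
    if column is None:
        return list(files)
    return [f for f in files if partition_value(f, column) == value]

pruned = files_to_scan(files, "Origin", "ATL")   # filter on the partition column
unpruned = files_to_scan(files)                  # filter on TailNum only: no pruning
print(len(pruned), "of", len(files), "files scanned with an Origin filter")
print(len(unpruned), "of", len(files), "files scanned without one")
```

The same layout also suggests why the unfiltered `group by` on the partitioned data can be slower: the data is spread over many more, smaller files, and all of them still have to be opened.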


# ==========================================

### BREAK (0:10)

# ==========================================

### 3.05 Instructor Do: Shuffling Performance Costs (15 min)

In this activity, we want to take a few minutes to caution students about the performance costs of shuffling in Spark. Although shuffling cannot be avoided entirely, we can make students more aware of the conditions that cause it so that they are more mindful when designing their queries.

Shuffling occurs when the data required for processing, transforming, joining, or aggregating resides on different partitions and has to be brought together.

When shuffling, Spark writes the data from memory to disk, then copies it from one partition to another.

* Oftentimes these partitions exist on different nodes causing both disk traffic and network traffic.
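To make the cost concrete, the sketch below simulates a shuffle in plain Python: rows are redistributed so that equal keys end up in the same partition, and the rows that change partition are counted. This is a simplified model under stated assumptions: `zlib.crc32` stands in for Spark's own hash function, the helper names are hypothetical, and the distances are made up.

```python
import zlib
from collections import defaultdict

def target_partition(key, num_partitions):
    """Deterministic hash partitioner (crc32 stands in for Spark's own hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle_by_key(partitions, key_fn):
    """Redistribute rows so that equal keys end up in the same partition.

    Returns the new partitions and the number of rows that changed partition.
    """
    n = len(partitions)
    new_partitions = [[] for _ in range(n)]
    moved = 0
    for source, rows in enumerate(partitions):
        for row in rows:
            dest = target_partition(key_fn(row), n)
            new_partitions[dest].append(row)
            if dest != source:
                moved += 1
    return new_partitions, moved

# Two input partitions; rows are (Origin, Distance) pairs with made-up distances.
partitions = [
    [("ATL", 100.0), ("DEN", 200.0), ("ATL", 50.0)],
    [("DEN", 25.0), ("ORD", 300.0), ("ATL", 10.0)],
]
shuffled, moved = shuffle_by_key(partitions, key_fn=lambda row: row[0])

# After the shuffle, each key lives in exactly one partition, so a per-key
# sum can be computed independently inside each partition.
totals = defaultdict(float)
for rows in shuffled:
    for origin, distance in rows:
        totals[origin] += distance
print(moved, dict(totals))
```

Because `ATL` and `DEN` each start out in both partitions, at least some of their rows have to move. In a real cluster, each moved row is disk and network traffic.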

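Shuffling is impossible to avoid completely, but there are ways to keep it to a minimum.

First, aggregations on the column that the data is already partitioned by (i.e., `group by <partitioned column>`) can avoid shuffling data, because all rows for a given value already sit in the same partition.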

Next, if a table is small enough, we can "broadcast" it to every node. Broadcasting a smaller table will also eliminate shuffling when joining with the broadcasted table.

* Broadcasting copies the smaller table to every node, allowing the join to happen on the node and not forcing the data to shuffle to a new node.

* Broadcasting will happen on any table less than 10MB by default.

  * This can be changed by setting the `spark.sql.autoBroadcastJoinThreshold` variable to a different size.

  * Changing this setting to -1 will stop all automatic broadcasting.
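The broadcast idea can be sketched in plain Python: every partition of the large table gets its own copy of the small lookup table and joins locally, so no row of the large table has to move. This is a simplified model, not Spark's implementation. The helpers `broadcast_join` and `would_auto_broadcast` are hypothetical names, the delay values are made up, and `XXX` is an invented code with no match in the lookup.

```python
# Small lookup table: airport code -> city (codes taken from the airport
# codes lookup data used in this lesson).
lookup = {"ABR": "Aberdeen, SD", "ABI": "Abilene, TX", "CAK": "Akron, OH"}

# Large table split across two partitions; rows are (Origin, ArrDelay)
# with made-up delay values. "XXX" is a made-up code with no lookup match.
flight_partitions = [
    [("ABR", 12.0), ("CAK", 45.0)],
    [("ABI", 7.0), ("XXX", 30.0), ("ABR", 3.0)],
]

def broadcast_join(partitions, lookup):
    """Join each partition against its own copy of the lookup table.

    No flight row leaves its partition; only the small table is copied.
    """
    joined = []
    for rows in partitions:
        local_lookup = dict(lookup)  # the "broadcast" copy for this partition
        joined.append([
            (origin, local_lookup[origin], delay)
            for origin, delay in rows
            if origin in local_lookup  # inner join: drop unmatched rows
        ])
    return joined

def would_auto_broadcast(size_bytes, threshold_bytes=10 * 1024 * 1024):
    """Simplified model of the size check; a threshold of -1 disables it."""
    return 0 <= size_bytes <= threshold_bytes

joined = broadcast_join(flight_partitions, lookup)
print(joined)
print(would_auto_broadcast(50_000), would_auto_broadcast(50_000, threshold_bytes=-1))
```

The trade-off is memory: every node holds a full copy of the lookup table, which is why broadcasting only makes sense for small tables.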

We can also reduce shuffling if we filter input data earlier in the program rather than later, because fewer rows then need to be moved between partitions.
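The effect of filtering early can be sketched by counting how many rows reach the shuffle in each case. This is a toy model in plain Python with hypothetical helper names; the rows are made up apart from the tail number used in the lesson's queries. Spark's optimizer often pushes simple filters down on its own, so the sketch shows the principle, not what Spark does for any particular query.

```python
# Rows are (Origin, TailNum); values are made up apart from the tail number
# used in the lesson's queries.
rows = [
    ("BWI", "N712SW"), ("PVD", "N712SW"), ("BWI", "N712SW"),
    ("ATL", "N100AA"), ("DEN", "N200BB"), ("ORD", "N300CC"),
]

def distinct_origins_filter_late(rows, tail):
    """Send every row through the (simulated) shuffle, then filter."""
    shuffled = list(rows)  # all rows are redistributed
    result = {origin for origin, t in shuffled if t == tail}
    return result, len(shuffled)

def distinct_origins_filter_early(rows, tail):
    """Filter first, so only matching rows reach the (simulated) shuffle."""
    shuffled = [(origin, t) for origin, t in rows if t == tail]
    result = {origin for origin, _ in shuffled}
    return result, len(shuffled)

late_result, late_rows = distinct_origins_filter_late(rows, "N712SW")
early_result, early_rows = distinct_origins_filter_early(rows, "N712SW")
print(late_result == early_result, late_rows, early_rows)
```

Both versions return the same answer; the only difference is how many rows had to be moved to get it.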

Lastly, when Spark shuffles, it creates new partitions based on specific settings. Therefore, if we reduce the number of shuffle partitions, we reduce the disk and network burden.

* Spark has a setting, `spark.sql.shuffle.partitions`, that defaults to 200 partitions. That is too many for smaller workloads and should be reduced using the following code:

  * `spark.conf.set("spark.sql.shuffle.partitions", num)`


---
### 3.05.2 Instructor Do: Caching (25 min)

In [1]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [3]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
flights_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/DelayedFlights.csv"
spark.sparkContext.addFile(flights_url)
flights_df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)

# Show the delayed flight data.
flights_df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
| id|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4|   2003|      1955|   2211|      2225|       

In [4]:
# Read in the airport codes from an S3 Bucket
airportCodes_url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/airportCodes.csv"
spark.sparkContext.addFile(airportCodes_url)

# The airport codes will be a lookup DataFrame
airportCodes_lookup = spark.read.csv(SparkFiles.get("airportCodes.csv"), sep=',', header=True)

#  Show the lookup table data.
airportCodes_lookup.show()

+--------------+--------------------+-----------+
|          City|             country|airportCode|
+--------------+--------------------+-----------+
|       Aalborg|             Denmark|        AAL|
|      Aalesund|              Norway|        AES|
|        Aarhus|             Denmark|        AAR|
|Abbotsford, BC|              Canada|        YXX|
|Abbotsford, BC|              Canada|        YXX|
|      Aberdeen|            Scotland|        ABZ|
|  Aberdeen, SD|                 USA|        ABR|
|       Abidjan|         Ivory Coast|        ABJ|
|   Abilene, TX|                 USA|        ABI|
|     Abu Dhabi|United Arab Emirates|        AUH|
|         Abuja|             Nigeria|        ABV|
|      Acapulco|              Mexico|        ACA|
|         Accra|               Ghana|        ACC|
|         Adana|              Turkey|        ADA|
|   Addis Ababa|            Ethiopia|        ADD|
|Adelaide, S.A.|           Australia|        ADL|
|          Aden|               Yemen|        ADE|


In [5]:
# Recall that the default shuffle partitions is 200.  
# We want to bring that down to a reasonable size for both our data and our Spark cluster
# A good rule of thumb is two times the number of cores. 
spark.conf.set("spark.sql.shuffle.partitions", 8)
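The "two times the number of cores" rule of thumb can be computed instead of hard-coded. The helper below is a minimal sketch with a hypothetical name; it assumes Spark is running locally, so that `os.cpu_count()` on the notebook machine reflects the cores available to Spark. On a real cluster, the total number of executor cores would be the number to use.

```python
import os

def suggested_shuffle_partitions(cores=None, factor=2):
    """Apply the 'two times the number of cores' rule of thumb."""
    if cores is None:
        cores = os.cpu_count() or 1  # os.cpu_count() can return None
    return max(1, cores * factor)

# With 4 cores, the rule of thumb gives the value used in this notebook.
print(suggested_shuffle_partitions(cores=4))
```

In the notebook, the result could be passed to `spark.conf.set("spark.sql.shuffle.partitions", ...)` in place of the literal `8`.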

In [6]:
# Create temporary views for each of our dataframes
flights_df.createOrReplaceTempView('delayed')

airportCodes_lookup.createOrReplaceTempView('lookup')

In [7]:
# This first query joins our airport code lookup data to our delayed flights table.
# By default, Spark does a broadcast join when the joined table is < 10MB. This is configurable,
# but since our table is VERY small, it will auto-broadcast.

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [8]:
# Here we have added the hint to Broadcast the lookup table.  
start_time = time.time()

spark.sql("""
select /*+ BROADCAST(lookup) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [10]:
# In this query we are trying to give the cluster some work to do.
# We create a common table expression (CTE), `allColumns`, that joins the two tables together
# and then aggregate by averaging the delays.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|     Amarillo, TX|              63.3|
|    Allentown, PA| 50.94957983193277|
|      Atlanta, GA| 37.90491635370434|
|       Bangor, ME| 50.27329192546584|
|  Albuquerque, NM| 32.54347826086956|
|       Austin, TX|       37.19140625|
|  Baton Rouge, LA|          65.50625|
|    Baltimore, MD|  39.0767004341534|
|       Albany, NY| 39.14365671641791|
|       Boston, MA| 47.71041369472183|
|   Birmingham, AL| 43.17042606516291|
|    Asheville, NC|58.392405063291136|
|  Bloomington, IL| 46.89930555555556|
|      Augusta, GA| 55.65714285714286|
|       Albany, GA| 50.25352112676056|
|        Akron, OH|55.928196147110334|
|Atlantic City, NJ| 64.64285714285714|
|     Appleton, WI| 42.99324324324324|
|   Alexandria, LA|50.947712418300654|
|       Barrow, AK| 25.80851063829787|
+-----------------+------------------+
only showing top 20 rows

--- 3.955842971801758 seconds ---


In [11]:
# Next, we use SparkSQL to cache our table.
# Note: when we use SparkSQL to cache a table, the table is immediately cached (no lazy evaluation).
# When using the PySpark DataFrame API (for example, `flights_df.cache()`), the data is not cached until an action is run.
spark.sql("cache table delayed")

DataFrame[]

In [12]:
# This command checks that our table is cached
# It will return True if it is cached.
spark.catalog.isCached("delayed")

True
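The difference between the eager SQL `cache table` statement and the lazy DataFrame `cache()` call can be illustrated with a small plain-Python model. The `CachedTable` class below is hypothetical and is not part of the Spark API. It only shows when the data actually gets loaded: at construction for the eager case, and at the first action for the lazy case.

```python
class CachedTable:
    """Toy model of a cached table; `load_fn` stands in for reading the data."""

    def __init__(self, load_fn, eager=False):
        self._load_fn = load_fn
        self._data = None
        self.load_count = 0
        if eager:
            self._materialize()  # like SQL `cache table`: load immediately

    def _materialize(self):
        if self._data is None:
            self._data = self._load_fn()
            self.load_count += 1

    @property
    def is_materialized(self):
        return self._data is not None

    def count(self):
        """An 'action': forces the data to be loaded if it is not yet cached."""
        self._materialize()
        return len(self._data)

def load_rows():
    # Made-up rows standing in for the delayed flights table.
    return [("ATL", 10.0), ("DEN", 20.0), ("ORD", 30.0)]

eager_table = CachedTable(load_rows, eager=True)
lazy_table = CachedTable(load_rows, eager=False)

print(eager_table.is_materialized, lazy_table.is_materialized)
print(lazy_table.count(), lazy_table.count(), lazy_table.load_count)
```

In both cases, the second and later actions reuse the stored data instead of loading it again, which is where the runtime savings come from.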

In [13]:
# Using the cached data, run the same query with the common table expression (CTE).
# The performance time should improve.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup b
    on a.Origin=b.airportCode
  inner join lookup c
    on a.Dest=c.airportCode
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))


+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|     Amarillo, TX|              63.3|
|    Allentown, PA| 50.94957983193277|
|      Atlanta, GA| 37.90491635370434|
|       Bangor, ME| 50.27329192546584|
|  Albuquerque, NM| 32.54347826086956|
|       Austin, TX|       37.19140625|
|  Baton Rouge, LA|          65.50625|
|    Baltimore, MD|  39.0767004341534|
|       Albany, NY| 39.14365671641791|
|       Boston, MA| 47.71041369472183|
|   Birmingham, AL| 43.17042606516291|
|    Asheville, NC|58.392405063291136|
|  Bloomington, IL| 46.89930555555556|
|      Augusta, GA| 55.65714285714286|
|       Albany, GA| 50.25352112676056|
|        Akron, OH|55.928196147110334|
|Atlantic City, NJ| 64.64285714285714|
|     Appleton, WI| 42.99324324324324|
|   Alexandria, LA|50.947712418300654|
|       Barrow, AK| 25.80851063829787|
+-----------------+------------------+
only showing top 20 rows

--- 1.8904163837432861 seconds ---
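The two runtimes reported in this notebook for the CTE query can be turned into a speedup factor and a percentage reduction. The sketch below uses the numbers printed above. It is a single measurement on one machine, so your own numbers will differ and a few repeated runs would give a more reliable comparison.

```python
uncached_seconds = 3.955842971801758   # runtime reported before caching
cached_seconds = 1.8904163837432861    # runtime reported after caching

# How many times faster the cached run was.
speedup = uncached_seconds / cached_seconds

# How much of the original runtime was saved, as a percentage.
reduction_pct = (1 - cached_seconds / uncached_seconds) * 100

print("speedup: %.2fx, runtime reduction: %.1f%%" % (speedup, reduction_pct))
```

For this run, the cached query was roughly twice as fast as the uncached one.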


In [15]:
# Remember to uncache the table as soon as you are done.
spark.sql("uncache table delayed")

DataFrame[]

In [16]:
# Verify that the table is no longer cached
spark.catalog.isCached("delayed")

False

# ==========================================

### 3.06 Everyone Do: Caching Flight Delays (30 min)

In this activity, you'll import three datasets, practice creating partitions, filter the data, and create temporary views of the three datasets. Then, you'll write queries to join three datasets and determine the execution times of the queries. Finally, you'll cache your data and compare query execution times with the cached data.

**Instructions:**
1. Using the starter code provided, import packages, create a Spark session, and set the shuffle partitions to 4 or 8.
2. Import the datasets into your Spark session.
    * **NOTE:** The `DelayedFlights.csv` file is comma-delimited, while the `cities500.txt` file is tab-delimited. Be sure to declare the appropriate separator when importing each dataset.
3. Filter the airport codes Spark DataFrame to only contain those whose `country` equals `USA`.
4. Filter the 500 city latitude and longitude DataFrame to only contain the `name`,`latitude`,`longitude`,`admin1_code` fields and rows whose `country_code` equals `US`.
5. Create temporary views for all three Spark DataFrames. Name the temporary views as follows:
    * The delayed flight temporary view as, `delayed`
    * The latitude and longitude temporary view as, `lookup_geo`
    * The airport codes temporary view as, `lookup_city`
6. Modify the provided SparkSQL query that was used in the instructor demonstration to add `origin_latitude`, `origin_longitude`, `dest_latitude` and `dest_longitude` fields from the joined temporary views.
    * **NOTE**: The two lookup tables do not have matching columns, so you must be mindful of which names are used when joining both views.
7. Use the same SQL query as in step 6, but this time add a "Broadcast" hint for either the `lookup_geo` or `lookup_city` temporary views and run your query again.
    * Was there any change in the runtime of the query? Why or why not?
8. Use the same SQL query as in step 6, but this time use an aggregate function (e.g., `avg()`, `sum()`, etc.) on some of the fields from the `delayed` flights temporary view.
    * Add enough aggregate functions to increase the runtime of your SparkSQL query.
9. Use SparkSQL to cache the `delayed` temporary view, since it is the largest table.
10. Verify that you successfully cached your table using the `spark.catalog.isCached()` method.
11. Once again, rerun your aggregating SparkSQL query from step 8 and note the runtime.
    * Did caching decrease the runtime of your SparkSQL query? Why or why not?
12. Cache one of the lookup tables.
13. Run the query from step 8 again. You should continue to see improvement.
14. Uncache anything that you have previously cached.
15. Verify that nothing is cached using the `spark.catalog.isCached()` method.

## Data Source:
[Airlines Delay](https://www.kaggle.com/datasets/giovamata/airlinedelaycauses)

---

In [1]:
# Import findspark and initialize. 
import findspark
findspark.init()

In [2]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

# Set the partitions to 4 or 8. 
spark.conf.set("spark.sql.shuffle.partitions", 8)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/08 13:19:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read in data from S3 Bucket
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/DelayedFlights.csv"
spark.sparkContext.addFile(url)
flights_df = spark.read.csv(SparkFiles.get("DelayedFlights.csv"), sep=",", header=True)

# Create a lookup table for the 500 cities. 
url_cities="https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/cities500.txt"
spark.sparkContext.addFile(url_cities)
df_lookup_geo = spark.read.csv(SparkFiles.get("cities500.txt"), sep="\t", header=True)

# Create a lookup table for the airport codes. 
url_airportCodes ="https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/3/airportCodes.csv"
spark.sparkContext.addFile(url_airportCodes)
df_lookup_codes = spark.read.csv(SparkFiles.get("airportCodes.csv"), sep=",", header=True)

In [4]:
# Look over the delayed flight data.
flights_df.show()

+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
| id|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+---+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  0|2008|    1|         3|        4|   2003|      1955|   2211|      2225|       

In [5]:
# Look over the data of the 500 cities.
df_lookup_geo.show()

+---------+-------------------+-------------------+--------------------+--------+---------+-------------+------------+------------+----+-----------+-----------+-----------+-----------+----------+---------+----+--------------+-----------------+
|geonameid|               name|          asciiname|      alternatenames|latitude|longitude|feature_class|feature_code|country_code| cc2|admin1_code|admin2_code|admin3_code|admin4_code|population|elevation| dem|      timezone|modification_date|
+---------+-------------------+-------------------+--------------------+--------+---------+-------------+------------+------------+----+-----------+-----------+-----------+-----------+----------+---------+----+--------------+-----------------+
|  3038999|             Soldeu|             Soldeu|                null|42.57688|  1.66769|            P|         PPL|          AD|null|         02|       null|       null|       null|       602|     null|1832|Europe/Andorra|       2017-11-06|
|  3039154|          El 

In [6]:
# Look over the airport codes.
df_lookup_codes.show()

+--------------+--------------------+-----------+
|          City|             country|airportCode|
+--------------+--------------------+-----------+
|       Aalborg|             Denmark|        AAL|
|      Aalesund|              Norway|        AES|
|        Aarhus|             Denmark|        AAR|
|Abbotsford, BC|              Canada|        YXX|
|Abbotsford, BC|              Canada|        YXX|
|      Aberdeen|            Scotland|        ABZ|
|  Aberdeen, SD|                 USA|        ABR|
|       Abidjan|         Ivory Coast|        ABJ|
|   Abilene, TX|                 USA|        ABI|
|     Abu Dhabi|United Arab Emirates|        AUH|
|         Abuja|             Nigeria|        ABV|
|      Acapulco|              Mexico|        ACA|
|         Accra|               Ghana|        ACC|
|         Adana|              Turkey|        ADA|
|   Addis Ababa|            Ethiopia|        ADD|
|Adelaide, S.A.|           Australia|        ADL|
|          Aden|               Yemen|        ADE|


In [7]:
# Filter the airport codes to only contain rows whose `country` equals `USA`
df_lookup_city_name=df_lookup_codes.filter("country='USA'")
df_lookup_city_name.show(5)

+------------+-------+-----------+
|        City|country|airportCode|
+------------+-------+-----------+
|Aberdeen, SD|    USA|        ABR|
| Abilene, TX|    USA|        ABI|
|   Akron, OH|    USA|        CAK|
| Alamosa, CO|    USA|        ALS|
|  Albany, GA|    USA|        ABY|
+------------+-------+-----------+
only showing top 5 rows



In [8]:
# Filter the latitude and longitude DataFrame to only contain the 'name','latitude','longitude','admin1_code' fields and rows whose `country_code` equals `US`
df_lookup_geo=df_lookup_geo.select('name','latitude','longitude','admin1_code').filter("country_code='US'")
df_lookup_geo.show(5)

+--------------+--------+---------+-----------+
|          name|latitude|longitude|admin1_code|
+--------------+--------+---------+-----------+
|   Bay Minette|30.88296|-87.77305|         AL|
|          Edna|28.97859|-96.64609|         TX|
|Bayou La Batre|30.40352|-88.24852|         AL|
|     Henderson|32.15322|-94.79938|         TX|
|       Natalia|29.18968|-98.86253|         TX|
+--------------+--------+---------+-----------+
only showing top 5 rows



In [9]:
# Create temporary views for each of our DataFrames
flights_df.createOrReplaceTempView('delayed')
df_lookup_city_name.createOrReplaceTempView('lookup_city')
df_lookup_geo.createOrReplaceTempView('lookup_geo')


In [10]:
# First, join the airport codes lookup table to the delayed flight DataFrame 
# and add the city of origin and destination like we did in the instructor demonstration.  

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------

In [11]:
# Add the `origin_latitude` and `origin_longitude` fields by joining the `lookup_geo` view 
# to the `lookup_city` view and the delayed flight DataFrame.
# Note: The two lookup views do not have matching columns, so we must be mindful of which names are used when joining both views together.

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
    on split(b.City,',')[0]=geo.name and trim(split(b.City,',')[1])=geo.admin1_code
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Origin_latitude|Origin_longitude|Dest|      Dest_City|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+--------+------+-------+---
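The join condition in this query derives two keys from the `City` column: the part before the comma is matched against `name`, and the trimmed part after the comma against `admin1_code`. The plain-Python sketch below mimics that key derivation. The helper `split_city` and the `geo_keys` set are hypothetical, and which `(name, admin1_code)` pairs exist in the geo lookup is assumed here for illustration.

```python
def split_city(city):
    """Mimic split(City, ',')[0] and trim(split(City, ',')[1])."""
    parts = city.split(",")
    name = parts[0]
    state = parts[1].strip() if len(parts) > 1 else None
    return name, state

# Assumed (name, admin1_code) pairs standing in for the lookup_geo view.
geo_keys = {("Abilene", "TX"), ("Akron", "OH")}

# City values in the format used by the airport codes lookup table.
cities = ["Aberdeen, SD", "Abilene, TX", "Akron, OH", "Abu Dhabi"]

for city in cities:
    key = split_city(city)
    print(city, "->", key, "match" if key in geo_keys else "no match")
```

Because this is an inner join, any flight whose origin city has no matching `(name, admin1_code)` pair is dropped from the result, so the joined output can contain fewer rows than the `delayed` view.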

In [12]:
# Finally, add the `dest_latitude` and `dest_longitude` fields by joining the `lookup_geo` view again as another alias, `geo_dest`.

start_time = time.time()

spark.sql("""
select a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+-------------+--------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Origin_latitude|Origin_longitude|Dest|      Dest_City|Dest_latitude|Dest_longitude|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+-------

In [13]:
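# Run the same query with a BROADCAST hint on the lookup table.
# Note: the hint below names the view (`lookup_geo`), but the query aliases that view as `geo` and `geo_dest`.
# Spark matches the hint against the alias, so it logs the HintErrorLogger warning shown in the output and ignores the hint.
# Naming the aliases instead, `/*+ BROADCAST(geo, geo_dest) */`, lets the hint take effect.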
 
start_time = time.time()

spark.sql("""
select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
""").show()
print("--- %s seconds ---" % (time.time() - start_time))

23/02/08 13:19:41 WARN HintErrorLogger: Count not find relation 'lookup_geo' specified in hint 'BROADCAST(lookup_geo)'.
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+---------------+---------------+----------------+----+---------------+-------------+--------------+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|    Origin_City|Origin_latitude|Origin_longitude|Dest|      Dest_City|Dest_latitude|Dest_longitude|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---
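
The `HintErrorLogger` warning above indicates that Spark could not match `lookup_geo` to any relation in the query. The sketch below shows one way to write the hint against the aliases instead. It is a shortened version of the notebook query (fewer columns, same joins), and `show_plan` is a hypothetical helper that assumes an existing `SparkSession` with the `delayed`, `lookup_city`, and `lookup_geo` views already registered.

```python
# Shortened version of the notebook query: fewer columns, same joins.
# The hint names the aliases `geo` and `geo_dest`, not the view `lookup_geo`.
BROADCAST_QUERY = """
select /*+ BROADCAST(geo, geo_dest) */
  a.Origin,
  b.City as Origin_City,
  geo.latitude as Origin_latitude,
  geo.longitude as Origin_longitude,
  a.Dest,
  c.City as Dest_City,
  geo_dest.latitude as Dest_latitude,
  geo_dest.longitude as Dest_longitude
from delayed a
  inner join lookup_city b
    on a.Origin = b.airportCode
  inner join lookup_city c
    on a.Dest = c.airportCode
  inner join lookup_geo geo
    on split(b.City, ',')[0] = geo.name
     and trim(split(b.City, ',')[1]) = geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City = concat(geo_dest.name, ', ', geo_dest.admin1_code)
"""


def show_plan(spark, query=BROADCAST_QUERY):
    """Print the physical plan for `query` using an existing SparkSession."""
    spark.sql(query).explain()
```

Calling `show_plan(spark)` prints the plan without running the full query. If the hint is applied, the plan should list broadcast joins for `geo` and `geo_dest`, and the warning should no longer appear.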

In [14]:
# Run a SQL query using a CTE here that does some aggregations on the new data.  
# The purpose of this SQL is to add some processing time.
# Note the runtime
start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin_City, avg(CarrierDelay) avgCarrierDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

23/02/08 13:19:42 WARN HintErrorLogger: Count not find relation 'lookup_geo' specified in hint 'BROADCAST(lookup_geo)'.

+-----------------+------------------+
|      Origin_City|   avgCarrierDelay|
+-----------------+------------------+
|     Amarillo, TX|             29.75|
|    Allentown, PA| 45.73529411764706|
|    Asheville, NC| 37.00854700854701|
|  Bloomington, IL| 26.61576354679803|
|      Atlanta, GA| 26.44772910507928|
|       Bangor, ME| 38.35087719298246|
|      Augusta, GA|30.810526315789474|
|  Albuquerque, NM|14.058823529411764|
|       Austin, TX|24.712374581939798|
|  Baton Rouge, LA| 50.08943089430894|
|    Baltimore, MD| 13.77671451355662|
|       Albany, GA| 37.48543689320388|
|       Albany, NY|17.555900621118013|
|       Boston, MA|14.291417165668662|
|        Akron, OH| 20.93778801843318|
|Atlantic City, NJ|             106.3|
|   Birmingham, AL|17.817460317460316|
|     Appleton, WI|36.542857142857144|
|   Alexandria, LA| 37.52252252252252|
|    Anchorage, AK| 24.08955223880597|
+-----------------+------------------+
only showing top 20 rows

--- 2.2673869132995605 seconds ---
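
Every timed cell in this notebook repeats the same `start_time = time.time()` and `print(...)` pair. As a sketch, that pattern can be wrapped in a small context manager. The name `timed` is hypothetical, and the stand-in workload below is only there so the example runs without Spark.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label="query"):
    """Print the wall-clock time spent inside the `with` block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"--- {label}: {elapsed:.3f} seconds ---")


# Stand-in workload; in the notebook this would be spark.sql("...").show()
with timed("demo"):
    sum(range(1_000_000))
```

`time.perf_counter()` is used here because it is intended for measuring short durations. Note that a timing wrapped around `.show()` covers only the work Spark needs to display the first rows.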

In [15]:
# Cache your largest temporary view
# Note: caching a table with Spark SQL (`cache table`) is eager, so the table is cached immediately. With the PySpark DataFrame API, caching is lazy: nothing is cached until an action runs.
spark.sql("cache table delayed")

                                                                                

DataFrame[]

In [16]:
# Check that your table is cached 
spark.catalog.isCached("delayed")

True
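
The difference between eager SQL caching and lazy DataFrame caching can be sketched as two small helpers. The function names are hypothetical, and both assume an existing `SparkSession` passed in as `spark` with the named view already registered.

```python
def cache_with_sql(spark, view_name):
    """Eager: Spark SQL's CACHE TABLE caches the data right away."""
    spark.sql(f"CACHE TABLE {view_name}")


def cache_with_dataframe_api(spark, view_name):
    """Lazy: DataFrame.cache() only marks the data for caching.

    The cache is filled the first time an action runs, so count()
    is called here to trigger it.
    """
    df = spark.table(view_name).cache()
    df.count()  # action that materializes the cached data
    return df
```

Spark SQL also accepts `CACHE LAZY TABLE <name>` for cases where the SQL form should defer caching until the table is first used.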

In [17]:
# Run the CTE query again with `delayed` cached (this time averaging ArrDelay instead of CarrierDelay). Caching should reduce the run time.
# Keep in mind that this dataset is not particularly large, so the improvement may not be dramatic.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))


23/02/08 13:19:49 WARN HintErrorLogger: Count not find relation 'lookup_geo' specified in hint 'BROADCAST(lookup_geo)'.
+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|     Amarillo, TX|              63.3|
|    Allentown, PA| 50.94957983193277|
|    Asheville, NC|58.392405063291136|
|  Bloomington, IL| 46.89930555555556|
|      Atlanta, GA| 37.90491635370434|
|       Bangor, ME| 50.27329192546584|
|      Augusta, GA| 55.65714285714286|
|  Albuquerque, NM| 32.54347826086956|
|       Austin, TX|       37.19140625|
|  Baton Rouge, LA|          65.50625|
|    Baltimore, MD|  39.0767004341534|
|       Albany, GA| 50.25352112676056|
|       Albany, NY| 39.14365671641791|
|       Boston, MA| 47.71041369472183|
|        Akron, OH|55.928196147110334|
|Atlantic City, NJ| 64.64285714285714|
|   Birmingham, AL| 43.17042606516291|
|     Appleton, WI| 42.99324324324324|
|   Alexandria, LA|50.947712418300654|
|    Anchorage, AK| 37

In [18]:
# Cache the lookup table.
spark.sql("cache table lookup_geo")

DataFrame[]

In [19]:
# Run the same query again, now with both `delayed` and `lookup_geo` cached. This should improve the run time further.

start_time = time.time()

spark.sql("""
with allColumns
(select /*+ BROADCAST(lookup_geo) */ 
a.Year,
a.Month,
a.DayofMonth,
a.DayOfWeek,
a.DepTime,
a.CRSDepTime,
a.ArrTime,
a.CRSArrTime,
a.UniqueCarrier,
a.FlightNum,
a.TailNum,
a.ActualElapsedTime,
a.CRSElapsedTime,
a.AirTime,
a.ArrDelay,
a.DepDelay,
a.Origin,
b.City as Origin_City,
geo.latitude as Origin_latitude,
geo.longitude as Origin_longitude,
a.Dest,
c.City as Dest_City,
geo_dest.latitude as Dest_latitude,
geo_dest.longitude as Dest_longitude,
a.Distance,
a.TaxiIn,
a.TaxiOut,
a.Cancelled,
a.CancellationCode,
a.Diverted,
a.CarrierDelay,
a.WeatherDelay,
a.NASDelay,
a.SecurityDelay,
a.LateAircraftDelay from  delayed a 
  inner join lookup_city b
    on a.Origin=b.airportCode
  inner join lookup_city c
    on a.Dest=c.airportCode
  inner join lookup_geo geo
on split(b.City,',')[0]=geo.name
     and trim(split(b.City,',')[1])=geo.admin1_code
  inner join lookup_geo geo_dest
    on c.City=concat(geo_dest.name,', ',geo_dest.admin1_code)
)
select Origin_City, avg(ArrDelay) avgDelay from allColumns group by 1
""").show()

print("--- %s seconds ---" % (time.time() - start_time))

23/02/08 13:19:51 WARN HintErrorLogger: Count not find relation 'lookup_geo' specified in hint 'BROADCAST(lookup_geo)'.
+-----------------+------------------+
|      Origin_City|          avgDelay|
+-----------------+------------------+
|     Amarillo, TX|              63.3|
|    Allentown, PA| 50.94957983193277|
|    Asheville, NC|58.392405063291136|
|  Bloomington, IL| 46.89930555555556|
|      Atlanta, GA| 37.90491635370434|
|       Bangor, ME| 50.27329192546584|
|      Augusta, GA| 55.65714285714286|
|  Albuquerque, NM| 32.54347826086956|
|       Austin, TX|       37.19140625|
|  Baton Rouge, LA|          65.50625|
|    Baltimore, MD|  39.0767004341534|
|       Albany, GA| 50.25352112676056|
|       Albany, NY| 39.14365671641791|
|       Boston, MA| 47.71041369472183|
|        Akron, OH|55.928196147110334|
|Atlantic City, NJ| 64.64285714285714|
|   Birmingham, AL| 43.17042606516291|
|     Appleton, WI| 42.99324324324324|
|   Alexandria, LA|50.947712418300654|
|    Anchorage, AK| 37

In [20]:
# Remember to uncache the tables as soon as you are done.
spark.sql("uncache table delayed")
spark.sql("uncache table lookup_geo")

DataFrame[]

In [21]:
# Verify that the tables are no longer cached
if spark.catalog.isCached("delayed") or spark.catalog.isCached("lookup_geo"):
    print("a table is still cached")
else:
    print("all clear")

all clear
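
Because forgetting to uncache leaves memory tied up, one option is to scope the cache with a context manager so the cleanup always runs. This is a sketch only: `cached_tables` is a hypothetical helper, and it assumes an existing `SparkSession` passed in as `spark`.

```python
from contextlib import contextmanager


@contextmanager
def cached_tables(spark, *names):
    """Cache the named tables for the duration of the `with` block."""
    cached = []
    try:
        for name in names:
            spark.sql(f"CACHE TABLE {name}")
            cached.append(name)
        yield
    finally:
        # Uncache only what was actually cached, even if the block raised.
        for name in cached:
            spark.catalog.uncacheTable(name)
```

In the notebook this would look like `with cached_tables(spark, "delayed", "lookup_geo"):` followed by the timed queries. `spark.catalog.clearCache()` is an alternative when every cached table in the session should be released at once.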


# ==========================================

### Rating Class Objectives

* Rate your understanding of each objective on a scale of 1 to 5.

In [None]:
title = "22.3-Big-Data-Optimizing Spark: Storage, Partitioning, and Caching"
objectives = [
    "Compare the file storage types (other than tabular) that work the best for Spark",
    "Understand how partitioning affects Spark performance",
    "Explain the cause of shuffling and limit it when possible",
    "Identify when caching is the best option",
    "Explain how to broadcast a lookup table, and force it when it doesn't happen automatically",
    "Set the shuffle partitions to an appropriate value and demonstrate how to cache data",
]
ratings = []
total = 0
for objective in objectives:
    # Each answer is expected to be a whole number from 1 to 5.
    rate = input(objective + "? ")
    total += int(rate)
    ratings.append(f"{objective}. ({rate}/5)")
print("=" * 96)
print(f"Self Evaluation for: {title}")
print("-" * 24)
for line in ratings:
    print(line)
print("-" * 64)
print(f"Average: {total / len(objectives)}")