# CIS 545 Homework 3: Spark SQL

#### **Worth 100 points**

Welcome to CIS 545 Homework 3! In this homework you will master Spark SQL. By the end, you'll be a star (not that you aren't already one). Over the next few days you will run Spark on an EMR cluster to manipulate the `basic_stats2.json` and `defensive3.json` datasets, as well as `superbowl3.csv`.

The goal of this homework is to create a dataset for a time series model. Yes, we are training you for the portions of the class coming up in the not-so-distant future!!

## The Necessary Notes and Nags
Before we begin, here are some important notes to keep in mind:


1.   **IMPORTANT!** I said it twice, it's really important. In this homework, we will be using AWS resources. You are given a quota ($100) to use for the entirety of the homework. There is only a small chance you will use all of this money; however, it is important that at the end of every session you **shut down your EMR cluster**.
2.   **Be sure to use Google Colab for this homework**, since we must connect to the EMR cluster and a local Jupyter install will have trouble doing that. Using a Google Colab notebook with an EMR cluster has two important quirks:
    * The first line of any cell in which you will use the spark session must be `%%spark`. Notice that all cells below have this.
    * You will, unfortunately, not be able to stop a cell while it is running. If you wish to do so, you will need to restart your cluster. See the Setup EMR Document for reference.
3.   You are **required** to use Spark SQL queries to handle the data in the assignment. Mastering SQL is more beneficial than being able to use Spark commands (functions), as SQL shows up in many more areas of programming and data science/analytics than just Spark. Use the following [function list](https://spark.apache.org/docs/latest/api/sql/index.html#) to see all the SQL functions available in Spark.
4.   Throughout the homework you will be manipulating Spark dataframes (sdfs). We do not specify any ordering on the final output. You are welcome to order your final tables in whatever way you deem fit. We will conduct our own ordering when we grade.
5.   Based on the challenges you've faced in the previous homework, we are including information on the expected schema of your results. Apache Spark is very fiddly, but we hope this will help.
6.   There are portions of this homework that are _very_ hard. We urge you to start early and to come to office hours for help if you get stuck. But don't worry: I can see the future, and you all got this.

With that said, let's dive in.




In [None]:
# As always, run this, restart, rerun
!pip install pandas==1.1.5

Collecting pandas==1.1.5
  Downloading pandas-1.1.5-cp37-cp37m-manylinux1_x86_64.whl (9.5 MB)
Installing collected packages: pandas
  Attempting uninstall: pandas
    Found existing installation: pandas 1.3.5
    Uninstalling pandas-1.3.5:
      Successfully uninstalled pandas-1.3.5
Successfully installed pandas-1.1.5


## Step 0: Set up EMR

Follow the [AWS Academy Getting Started](https://docs.google.com/document/d/1JPitLGaorjTbXjGsaoIHcLTu2cj8rjm5UNr9bSpZ72k/edit?usp=sharing) instructions.

Move on to Step 0.1 after you have completed all the steps in the document.

![ACME GIANT RUBBER BAND](https://pbs.twimg.com/media/DRqbJh7UMAE2z4o?format=jpg&name=4096x4096)


### Step 0.1: The Superfluous Setup

Run the following two cells. These will allow your Colab notebook to connect to and use your EMR cluster.

In [None]:
%%capture
!apt update
!apt install -y gcc python-dev libkrb5-dev
!pip install sparkmagic
!pip install penngrader


In [None]:
%%capture
%load_ext sparkmagic.magics 

### Step 0.2: The Sharp Spark

Now, connect your notebook to the EMR cluster you created. In the first cell, copy the link to the Master Public DNS specified in the setup document. You will need to add `http://` to the beginning of the address and the auth details to the end.

For example, if my DNS (directly from the AWS EMR console) is `ec2-3-15-237-211.us-east-2.compute.amazonaws.com`, my address would be:

`http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com -a cis545-livy -p password1 -t Basic_Access`

Insert this in the cell below marked `# TODO`. For our example, the cell would read:

```
%spark add -s spark_session -l python -u http://ec2-3-15-237-211.us-east-2.compute.amazonaws.com -a cis545-livy -p password1 -t Basic_Access
```
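If you reconnect often, a tiny helper can keep the format straight. The sketch below is hypothetical (`build_spark_add_command` is not part of sparkmagic or the course code); it only reproduces the `%spark add` line shown above from a DNS name and a password, assuming the `cis545-livy` user and the `spark_session` name from the example.

```python
def build_spark_add_command(master_dns: str, password: str,
                            user: str = "cis545-livy",
                            session: str = "spark_session") -> str:
    """Hypothetical helper: assemble the sparkmagic `%spark add` line for an EMR Livy endpoint."""
    dns = master_dns.strip()
    # Prepend the scheme only if it is missing, so it never appears twice.
    if not dns.startswith("http://"):
        dns = "http://" + dns
    return (f"%spark add -s {session} -l python -u {dns} "
            f"-a {user} -p {password} -t Basic_Access")


print(build_spark_add_command(
    "ec2-3-15-237-211.us-east-2.compute.amazonaws.com", "password1"))
```

With the example DNS and `password1`, this prints exactly the line in the code block above; paste the output for your own cluster into the cell below.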

In [None]:
# TODO: Copy the line above, enter your Master Public DNS with the proper formatting and host, and update the password
%spark add -s spark_session -l python -u http://ec2-52-70-85-52.compute-1.amazonaws.com -a cis545-livy -p Lwl970310 -t Basic_Access

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1647874721696_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [None]:
# If you ever need to restart, you may need to...
# %spark delete -s spark_session
# OR just factory-reset the runtime under the Runtime tab

### Step 0.3: Cluster Log

In order to keep track of clusters you have created and terminated as well as give us information about time spent on this assignment, please enter each date and time you created a cluster and the date and time you terminated the cluster. This will not impact your score.

EX: 

03/15 05:41pm - 03/15 9:20pm

03/16 02:56pm - 03/16 3:08pm

03/16 09:08pm - 03/16 10:40pm

03/17 12:00 pm - 03/17 02:04am

03/17 11:54 pm - 03/18 12:59am

...
    

TODO: Create cluster log

Enter your 8-digit Penn ID as an integer in the cell below. This will be used by the autograder. **Please also update the Spark grader cell further below with the same ID!**

In [None]:
from penngrader.grader import *
STUDENT_ID = 999999999999

In [None]:
grader = PennGrader(homework_id = 'CIS545_Spring_2022_HW3', student_id = STUDENT_ID)

PennGrader initialized with Student ID: 999999999999

Make sure this correct or we will not be able to store your grade


**Please make sure you also update this one, so the grader can similarly be updated on Spark/EMR!**

In [None]:
%%spark
from penngrader.grader import *
STUDENT_ID = 999999999999
grader = PennGrader(homework_id = 'CIS545_Spring_2022_HW3', student_id = STUDENT_ID)

PennGrader initialized with Student ID: 999999999999

Make sure this correct or we will not be able to store your grade

Run the above cells to set up the autograder both locally and in the Spark session, and make sure you have set your 8-digit Penn ID in the cells above. They also import all the modules you need for the homework.

_Note_: Since we are using an EMR cluster, we will only have access to some of the modules that exist for Python, meaning libraries like `pandas`, `numpy`, etc. may not all be available. We have written the entire homework so that the solution does not require any of these.

## Step 1: Data Wrangling, Cleaning, and Shaping

It's football fever! We very recently witnessed Super Bowl LVI, in which the Los Angeles Rams trumped the Cincinnati Bengals to clinch the title at their brand-new home stadium. To keep the hype going, we thought it would be exciting for you to work with NFL data, and that's what we will be doing today.

<br>

![Winners](https://imagez.tmz.com/image/d6/4by3/2022/02/14/d62fc8c540c348c0aa75730b9c9a5b8d_md.jpg)

<br>

The data you will use is stored in an S3 bucket, a cloud storage service. You now need to download it onto the nodes of your [EMR cluster](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html). 

### Step 1.1: The Stupendous Schema

When loading data, Spark will try to infer its structure on its own. This inference is not reliable: Spark will sometimes infer a column's type incorrectly. You will therefore need to define a schema for `basic_stats2.json` and `defensive3.json`.
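As a rough intuition for why inference can go wrong: a reader that assigns one type per column has to fall back to a more general type whenever the values it sees disagree. The toy `infer_type` below is a hypothetical helper, much simpler than Spark's actual inference; it widens mixed values to a string, using made-up records shaped like the student example later in this step.

```python
def infer_type(values):
    """Toy inference: pick one column type from sample values, ignoring nulls (None)."""
    kinds = {type(v) for v in values if v is not None}
    if not kinds:
        return "null"
    if kinds == {int}:
        return "integer"
    if kinds <= {int, float}:
        return "double"
    # Anything mixed or textual widens to the most general type.
    return "string"


# Made-up records: GPA is numeric in one record and text in the other.
records = [
    {"student_name": "Data Wrangler", "GPA": 1.4, "grad_year": 2022},
    {"student_name": "Another Wrangler", "GPA": "N/A", "grad_year": 2023},
]
columns = sorted({key for rec in records for key in rec})
inferred = {col: infer_type([rec.get(col) for rec in records]) for col in columns}
print(inferred)  # {'GPA': 'string', 'grad_year': 'integer', 'student_name': 'string'}
```

A single stray value is enough to turn a numeric column into text, which is the kind of surprise an explicit schema avoids.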

A schema is a description of the structure of data. We have given you an example with `defensive3.json` and you will be defining an explicit schema for `basic_stats2.json`. 


In Spark, schemas are defined using a `StructType` object. This is a collection of `StructField`s, each of which specifies the name, data type, and nullability of one component of the dataset. For example, suppose we have the following simple JSON object:


```
{
 "student_name": "Data Wrangler",
 "GPA": 1.4,
 "courses": [
    {"department": "Computer and Information Science",
     "course_id": "CIS 545",
     "semester": "Fall 2021"},
    {"department": "Computer and Information Science",
     "course_id": "CIS 555",
     "semester": "Fall 2021"}
 ],
 "grad_year": 2022
 }
```

We would define its schema as follows,

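```
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType, IntegerType

schema = StructType([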
           StructField("student_name", StringType(), nullable=True),
           StructField("GPA", FloatType(), nullable=True),
           StructField("courses", ArrayType(
                StructType([
                  StructField("department", StringType(), nullable=True),
                  StructField("course_id", StringType(), nullable=True),
                  StructField("semester", StringType(), nullable=True)
                ])
           ), nullable=True),
           StructField("grad_year", IntegerType(), nullable=True)
         ])
```


Each `StructField` has the following structure: `(name, type, nullable)`. The `nullable` flag indicates whether the field may be null (empty). Your first task is to define the `schema` of `basic_stats2.json`; you can use the schema for `defensive3.json` as a reference. Smaller samples of both JSON datasets are available here: [defensive3.json](https://drive.google.com/file/d/1B75g3-GEdrXTiWn9069bDwvIia9xQIQR/view?usp=sharing), [basic_stats2.json](https://drive.google.com/file/d/1aF4fnH4JI_r8wWN27qFdyjiXWOs4mlg9/view?usp=sharing).




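There is no grading cell for this step, but a wrong schema will show up when you load the data: mismatched or misspelled fields typically come back as `null` (or the read fails), so inspecting the loaded rows gives you a way of testing it.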
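To see why a wrong schema tends to surface as `null` values rather than as an error, here is a toy pure-Python stand-in for a schema: a hypothetical `Field(name, type, nullable)` triple and a `conform` helper (neither is a Spark API) that casts each value and falls back to `None` when the cast fails or the field is missing. It is only an analogy for how a permissive reader treats mismatched fields, not Spark's implementation.

```python
from collections import namedtuple

# Hypothetical stand-in for StructField(name, type, nullable); not a Spark API.
Field = namedtuple("Field", ["name", "type", "nullable"])

schema = [
    Field("student_name", str, True),
    Field("GPA", float, True),
    Field("grad_year", int, True),
]


def conform(record, schema):
    """Cast each declared field; a missing or unparseable value becomes None."""
    row = {}
    for field in schema:
        value = record.get(field.name)  # a misspelled name also yields None
        try:
            row[field.name] = None if value is None else field.type(value)
        except (TypeError, ValueError):
            row[field.name] = None  # mismatch -> null instead of an error
        if row[field.name] is None and not field.nullable:
            raise ValueError(f"field {field.name!r} is not nullable")
    return row


print(conform({"student_name": "Data Wrangler", "GPA": 1.4, "grad_year": 2022}, schema))
# {'student_name': 'Data Wrangler', 'GPA': 1.4, 'grad_year': 2022}
print(conform({"student_name": "Data Wrangler", "GPA": "n/a"}, schema))
# {'student_name': 'Data Wrangler', 'GPA': None, 'grad_year': None}
```

If a whole column of your loaded dataframe is `null`, a misspelled field name or the wrong type is the first thing to check.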



In [None]:
%%spark

from pyspark.sql.types import *

# Schema for defensive3.json

schema_defensive = StructType([
                     
                     StructField("Player Id", StringType(), nullable = True),
                     StructField("Name", StringType(), nullable = True),
                     StructField("Position", StringType(), nullable = True),
                     StructField("Year", IntegerType(), nullable = True),
                     StructField("Team", StringType(), nullable = True),
                     StructField("Games Played", StringType(), nullable = True),
                     StructField("Total Tackles", StringType(), nullable = True),
                     StructField("Solo Tackles", StringType(), nullable = True),
                     StructField("Assisted Tackles", StringType(), nullable = True),
                     StructField("Sacks", StringType(), nullable = True),
                     StructField("Safties", StringType(), nullable = True),
                     StructField("Passes Defended", StringType(), nullable = True),
                     StructField("Ints", StringType(), nullable = True),
                     StructField("Ints for TDs", StringType(), nullable = True),
                     StructField("Int Yards", StringType(), nullable = True),
                     StructField("Yards Per Int", StringType(), nullable = True),
                     StructField("Longest Int Return", StringType(), nullable = True),            
                ])


#TODO: Create a schema for basic_stats2.json

schema_basic = StructType([
                     
                     StructField("Age", IntegerType(), nullable = True),
                     StructField("Birth Place", StringType(), nullable = True),
                     StructField("Birthday", StringType(), nullable = True),
                     StructField("College", StringType(), nullable = True),
                     StructField("Current Status", StringType(), nullable = True),
                     StructField("Current Team", StringType(), nullable = True),
                     StructField("Experience", StringType(), nullable = True),
                     StructField("Height (inches)", IntegerType(), nullable = True),
                     StructField("High School", StringType(), nullable = True),
                     StructField("High School Location", StringType(), nullable = True),
                     StructField("Name", StringType(), nullable = True),
                     StructField("Number", IntegerType(), nullable = True),
                     StructField("Player Id", StringType(), nullable = True),
                     StructField("Position", StringType(), nullable = True),
                     StructField("Weight (lbs)", IntegerType(), nullable = True),
                     StructField("Years Played", StringType(), nullable = True),          
                ])

                    

### Step 1.2: The Languorous Load

In the following cells, load the `basic_stats2.json` and `defensive3.json` datasets from the S3 bucket into Spark dataframes (sdfs) called `basic_stats_sdf` and `defensive_stats_sdf`, respectively. If you have constructed `schema_basic` and `schema_defensive` correctly, `spark.read.json()` will read in the datasets.
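The loading cells below pass `multiLine = True`. As we understand Spark's JSON reader, it expects JSON Lines (one complete object per line) by default, and `multiLine` tells it to parse each file as a single JSON document instead, such as one big array of records. The pure-Python sketch below, with made-up records, only illustrates the difference between the two layouts; it does not call Spark.

```python
import json

# Made-up "multi-line" file: one JSON array whose records span several lines.
multi_line_text = """[
  {"Player Id": "a/1", "Year": 2016},
  {"Player Id": "b/2", "Year": 2015}
]"""

# The same records as JSON Lines: exactly one complete object per line.
json_lines_text = '{"Player Id": "a/1", "Year": 2016}\n{"Player Id": "b/2", "Year": 2015}\n'

# The multi-line layout must be parsed as one whole document...
multi_line_records = json.loads(multi_line_text)
# ...whereas JSON Lines can be parsed independently, line by line.
json_lines_records = [json.loads(line) for line in json_lines_text.splitlines() if line.strip()]

print(multi_line_records == json_lines_records)  # True
```

Both layouts hold the same records; the flag only tells the reader which layout to expect.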

***You do not need to edit this cell***. If this doesn't work, go back to the prior cell and update your schema.

In [None]:
%%spark

defensive_stats_sdf = spark.read.json("s3://penn-cis545-files/defensive3.json", schema=schema_defensive, multiLine = True, primitivesAsString = True)


In [None]:
%%spark
# Let's print out the first few rows to see what the data looks like in tabular form
defensive_stats_sdf.show(5)

+------------------+--------------+--------+----+-----------------+------------+-------------+------------+----------------+-----+-------+---------------+----+------------+---------+-------------+------------------+
|         Player Id|          Name|Position|Year|             Team|Games Played|Total Tackles|Solo Tackles|Assisted Tackles|Sacks|Safties|Passes Defended|Ints|Ints for TDs|Int Yards|Yards Per Int|Longest Int Return|
+------------------+--------------+--------+----+-----------------+------------+-------------+------------+----------------+-----+-------+---------------+----+------------+---------+-------------+------------------+
|quinnjohnson/79593|Johnson, Quinn|        |2013| Tennessee Titans|           4|            0|          --|              --|   --|     --|             --|  --|          --|       --|            0|                --|
|quinnjohnson/79593|Johnson, Quinn|        |2012| Tennessee Titans|          16|            1|           1|               0|    0|     -

In [None]:
%%spark

#TODO: Create a spark dataframe for basic_stats2
basic_stats_sdf = spark.read.json("s3://penn-cis545-files/basic_stats2.json", schema=schema_basic, multiLine = True, primitivesAsString = True)
basic_stats_sdf.show(5)

+----+-----------------+----------+----------------+--------------+-------------------+----------+---------------+--------------+--------------------+--------------+------+-------------------+--------+------------+------------+
| Age|      Birth Place|  Birthday|         College|Current Status|       Current Team|Experience|Height (inches)|   High School|High School Location|          Name|Number|          Player Id|Position|Weight (lbs)|Years Played|
+----+-----------------+----------+----------------+--------------+-------------------+----------+---------------+--------------+--------------------+--------------+------+-------------------+--------+------------+------------+
|null|Grand Rapids , MI| 5/23/1921|      Notre Dame|       Retired|                   | 3 Seasons|             71|              |                    |   Evans, Fred|  null|  fredevans/2513736|        |         185| 1946 - 1948|
|null|      Dayton , OH|12/21/1930|          Dayton|       Retired|                   | 

The cell below shows how to run SQL commands on Spark tables. Use this as a template for all your SQL queries in this notebook. 
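For intuition you can try offline, here is the same register-then-query pattern using Python's built-in `sqlite3` module in place of Spark: creating a table plays the role of `createOrReplaceTempView`, and `conn.execute` plays the role of `spark.sql`. This is an analogy only (SQLite is not Spark SQL, and the rows are made up), though both accept backticks around identifiers containing spaces, such as `Player Id`.

```python
import sqlite3

# In-memory database standing in for the Spark session; rows are made up.
conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE basic_stats (`Player Id` TEXT, Name TEXT, Age INTEGER)')
conn.executemany(
    "INSERT INTO basic_stats VALUES (?, ?, ?)",
    [("b/2", "Beta, Bo", 30), ("a/1", "Alpha, Al", None), ("c/3", "Gamma, Gil", 25)],
)

# Backticks quote the identifier with a space, as in the Spark SQL cells.
query = """SELECT *
           FROM basic_stats ORDER BY `Player Id` LIMIT 2"""
rows = conn.execute(query).fetchall()
print(rows)  # [('a/1', 'Alpha, Al', None), ('b/2', 'Beta, Bo', 30)]
```

Note that a missing value comes back as `None`, just as it shows up as `null` in the Spark output.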

***You do not need to edit this cell***.

In [None]:
%%spark

import pandas as pd

# Create SQL-accessible table
basic_stats_sdf.createOrReplaceTempView("basic_stats")

# Declare SQL query to be executed
query = '''SELECT * 
           FROM basic_stats ORDER BY `Player Id` LIMIT 10'''

# Save the output sdf of spark.sql() as answer_basic_sdf and convert to Pandas
answer_basic_sdf = spark.sql(query)
answer_basic_sdf.show()

answer_basic_df = answer_basic_sdf.toPandas()
to_submit = pd.read_json(answer_basic_df.to_json())

+----+---------------+---------+--------------------+--------------+--------------------+----------+---------------+--------------------+--------------------+-----------------+------+--------------------+--------+------------+------------+
| Age|    Birth Place| Birthday|             College|Current Status|        Current Team|Experience|Height (inches)|         High School|High School Location|             Name|Number|           Player Id|Position|Weight (lbs)|Years Played|
+----+---------------+---------+--------------------+--------------+--------------------+----------+---------------+--------------------+--------------------+-----------------+------+--------------------+--------+------------+------------+
|  45|   Griffin , GA|10/8/1971|       Florida State|       Retired|                    | 2 Seasons|             73|                    |                    |   Ellison, 'Omar|  null|'omarellison/2500540|        |         200| 1995 - 1996|
|  22|Fort Worth , TX|3/21/1995|        

Next, we submit `to_submit`, the pandas copy of `answer_basic_sdf`, to PennGrader...

In [None]:
%%spark

## 4 points

## AUTOGRADER Step 1.2:   

grader.grade(test_case_id = 'first', answer = to_submit)

Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.

Now, implement the same thing yourself for `defensive_stats_sdf`.

In [None]:
%%spark

# TODO: Create SQL-accessible table

defensive_stats_sdf.createOrReplaceTempView("defensive_stats")

#TODO: Declare SQL query to be executed

query = '''SELECT * 
           FROM defensive_stats ORDER BY `Player Id` LIMIT 10'''

#TODO: Save the output sdf of spark.sql() as answer_defensive_sdf and convert to Pandas

answer_defensive_sdf = spark.sql(query)
answer_defensive_sdf.show()

answer_defensive_df = answer_defensive_sdf.toPandas()
to_submit = pd.read_json(answer_defensive_df.to_json())


+--------------------+-----------------+--------+----+--------------------+------------+-------------+------------+----------------+-----+-------+---------------+----+------------+---------+-------------+------------------+
|           Player Id|             Name|Position|Year|                Team|Games Played|Total Tackles|Solo Tackles|Assisted Tackles|Sacks|Safties|Passes Defended|Ints|Ints for TDs|Int Yards|Yards Per Int|Longest Int Return|
+--------------------+-----------------+--------+----+--------------------+------------+-------------+------------+----------------+-----+-------+---------------+----+------------+---------+-------------+------------------+
|a'shawnrobinson/2...|Robinson, A'Shawn|      DT|2016|       Detroit Lions|          16|           30|          22|               8|    2|     --|              7|  --|          --|       --|            0|                --|
|   a.j.bouye/2541162|      Bouye, A.J.|      CB|2016|      Houston Texans|          15|           63|  

In [None]:
%%spark

## 4 points

## AUTOGRADER Step 1.2:   

grader.grade(test_case_id = 'second', answer = to_submit)

Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.

In the next cell, create `step_1_2_sdf` from the `basic_stats` table above, returning rows with schema `(Player Id, Name)` in **lexicographical order** of `Name`. Limit your sdf to 10 rows, and submit it to PennGrader as demonstrated above.
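Lexicographical order compares strings character by character, so ties on a shared prefix are broken by the first differing character. The Python sketch below uses names taken from the output further down plus one made-up lowercase entry; Python compares by code point, which for plain ASCII names like these should agree with Spark's default string ordering, including the fact that every uppercase letter sorts before any lowercase one.

```python
# Code-point (lexicographical) order: "Abbott, Faye" < "Abbott, Vince",
# and the lowercase "abbey, joe" sorts after every capitalized name.
names = ["Abbott, Vince", "abbey, joe", "Abbott, Faye", "Aaitui, Isaako"]
ordered = sorted(names)
print(ordered[:3])  # ['Aaitui, Isaako', 'Abbott, Faye', 'Abbott, Vince']
print(ordered[-1])  # abbey, joe
```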

In [None]:
%%spark 

# TODO: create step_1_2_sdf
query = '''SELECT `Player Id`, Name
          FROM basic_stats
          ORDER BY Name ASC
          LIMIT 10'''
step_1_2_sdf = spark.sql(query)
step_1_2_sdf.show()

+--------------------+-----------------+
|           Player Id|             Name|
+--------------------+-----------------+
|isaakoaaitui/2531731|   Aaitui, Isaako|
|   jonabbate/2495524|      Abbate, Jon|
|    joeabbey/2508144|       Abbey, Joe|
|  fayeabbott/2508147|     Abbott, Faye|
| vinceabbott/2508148|    Abbott, Vince|
|jaredabbrederis/2...|Abbrederis, Jared|
|dukeabbruzzi/2508149|   Abbruzzi, Duke|
| naderabdallah/89680|  Abdallah, Nader|
|mehdiabdesmad/255...|  Abdesmad, Mehdi|
|isaabdul-quddus/2...|Abdul-Quddus, Isa|
+--------------------+-----------------+

In [None]:
%%spark

# 5 points
grader.grade(test_case_id = 'lex_10_ids_last_names', answer = step_1_2_sdf.toPandas())

Correct! You earned 5/5 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 1.3: Further Cleaning

As you can see, our data is a little ill-formatted, so let's do some cleaning on our two dataframes. Here is what you will do:

For Defensive:

1. Drop all the rows that have null values in `Total Tackles` or `Games Played`.
2. Keep only data from 2000 onward (inclusive).


Save this in `defensive_cleaned_sdf`.

Once you have done that, select the basic stats for only those players remaining in `defensive_cleaned_sdf` and join them into a combined dataframe called `defensive_player_stats_sdf`.
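The two cleaning rules and the join can be sketched offline with Python's built-in `sqlite3` standing in for Spark SQL. The rows below are made up, and the view name `defensive_cleaned` mirrors the temp view used in the cells below; this is an illustration of the logic, not the graded solution.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE defensive_stats "
             "(`Player Id` TEXT, Year INTEGER, `Games Played` TEXT, `Total Tackles` TEXT)")
conn.execute("CREATE TABLE basic_stats (`Player Id` TEXT, Experience TEXT)")
conn.executemany("INSERT INTO defensive_stats VALUES (?, ?, ?, ?)", [
    ("a/1", 2012, "16", "30"),
    ("a/1", 1999, "16", "40"),  # dropped: before 2000
    ("b/2", 2013, None, "5"),   # dropped: Games Played is NULL
    ("c/3", 2014, "10", "20"),  # dropped by the join: no basic_stats row
])
conn.executemany("INSERT INTO basic_stats VALUES (?, ?)",
                 [("a/1", "8 Seasons"), ("b/2", "3 Seasons")])

# Rule 1 (no NULLs in either column) and rule 2 (Year >= 2000).
conn.execute("""CREATE VIEW defensive_cleaned AS
                SELECT * FROM defensive_stats
                WHERE `Total Tackles` IS NOT NULL
                  AND `Games Played` IS NOT NULL
                  AND Year >= 2000""")

# The inner join keeps only players present in both tables.
rows = conn.execute("""SELECT `Player Id`, Experience, Year
                       FROM basic_stats INNER JOIN defensive_cleaned USING (`Player Id`)""").fetchall()
print(rows)  # [('a/1', '8 Seasons', 2012)]
```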

In [None]:
%%spark

#TODO: Create defensive_cleaned_sdf
query = '''SELECT *
          FROM defensive_stats
          WHERE `Total Tackles` IS NOT NULL AND `Games Played` IS NOT NULL AND Year >= 2000'''
defensive_cleaned_sdf = spark.sql(query)
defensive_cleaned_sdf.show(5)

+------------------+--------------+--------+----+-----------------+------------+-------------+------------+----------------+-----+-------+---------------+----+------------+---------+-------------+------------------+
|         Player Id|          Name|Position|Year|             Team|Games Played|Total Tackles|Solo Tackles|Assisted Tackles|Sacks|Safties|Passes Defended|Ints|Ints for TDs|Int Yards|Yards Per Int|Longest Int Return|
+------------------+--------------+--------+----+-----------------+------------+-------------+------------+----------------+-----+-------+---------------+----+------------+---------+-------------+------------------+
|quinnjohnson/79593|Johnson, Quinn|        |2013| Tennessee Titans|           4|            0|          --|              --|   --|     --|             --|  --|          --|       --|            0|                --|
|quinnjohnson/79593|Johnson, Quinn|        |2012| Tennessee Titans|          16|            1|           1|               0|    0|     -

In [None]:
%%spark

# 4 points
grader.grade(test_case_id = 'defensive_cleaned', answer = defensive_cleaned_sdf.toPandas())

Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.

In [None]:
%%spark

#TODO: Create defensive_player_stats_sdf
defensive_cleaned_sdf.createOrReplaceTempView("defensive_cleaned")
query = ''' SELECT *
            FROM basic_stats INNER JOIN defensive_cleaned using (`Player Id`)
            ORDER BY `Player Id`'''
defensive_player_stats_sdf = spark.sql(query)
defensive_player_stats_sdf.show(20)

+--------------------+---+---------------+---------+---------------+--------------+--------------------+----------+---------------+--------------------+--------------------+-----------------+------+--------+------------+------------+-----------------+--------+----+--------------------+------------+-------------+------------+----------------+-----+-------+---------------+----+------------+---------+-------------+------------------+
|           Player Id|Age|    Birth Place| Birthday|        College|Current Status|        Current Team|Experience|Height (inches)|         High School|High School Location|             Name|Number|Position|Weight (lbs)|Years Played|             Name|Position|Year|                Team|Games Played|Total Tackles|Solo Tackles|Assisted Tackles|Sacks|Safties|Passes Defended|Ints|Ints for TDs|Int Yards|Yards Per Int|Longest Int Return|
+--------------------+---+---------------+---------+---------------+--------------+--------------------+----------+---------------

In [None]:
%%spark

# 4 points
grader.grade(test_case_id = 'defensive_player_stats', answer = defensive_player_stats_sdf.toPandas())

Correct! You earned 4/4 points. You are a star!

Your submission has been successfully recorded in the gradebook.

## Step 2: Analysis!

### Step 2.1: The Robust Ratio

Now, we are interested in finding, for each team and year, the player with the highest ratio of total tackles to games played, along with that player's experience. This involves deriving data from both of the dataframes we just created.

Your task is to first find the player with the highest ratio for each team per year from `defensive_player_stats_sdf`, and then join that with `basic_stats_sdf` to find the experience of that player.

Create an sdf called `best_tackles_sdf` that contains the above information. Then, make sure to retain only the data pertaining to Name, Player Id, Year, tackle_ratio, and experience.

Remember we will sort the dataframe when grading so you can sort the elements however you wish (you don't need to if you don't want to). 
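"Best row per group" can be written in more than one way. The cell below uses a `(Team, Year, ratio) IN (subquery)` filter; the sketch here shows an alternative with a `RANK()` window function, which Spark SQL also supports. It runs offline on Python's built-in `sqlite3` with made-up rows; `RANK()` keeps ties, as the `IN` version does. One SQLite-specific detail: dividing two integers truncates there, hence the `CAST`, whereas Spark's `/` already returns a double.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE defensive_player_stats (`Player Id` TEXT, Team TEXT, Year INTEGER, "
             "`Games Played` INTEGER, `Total Tackles` INTEGER)")
conn.executemany("INSERT INTO defensive_player_stats VALUES (?, ?, ?, ?, ?)", [
    ("a/1", "Rams", 2012, 16, 96),    # ratio 6.0
    ("b/2", "Rams", 2012, 8, 56),     # ratio 7.0 -> best Rams 2012
    ("c/3", "Titans", 2012, 10, 65),  # ratio 6.5 -> best Titans 2012
])

query = """
WITH ratios AS (
    SELECT `Player Id` AS ID, Team, Year,
           -- CAST avoids SQLite integer division
           CAST(`Total Tackles` AS REAL) / `Games Played` AS Ratio
    FROM defensive_player_stats
), ranked AS (
    SELECT *, RANK() OVER (PARTITION BY Team, Year ORDER BY Ratio DESC) AS rnk
    FROM ratios
)
SELECT ID, Team, Year, Ratio FROM ranked WHERE rnk = 1 ORDER BY Team, Year
"""
best = conn.execute(query).fetchall()
print(best)  # [('b/2', 'Rams', 2012, 7.0), ('c/3', 'Titans', 2012, 6.5)]
```

The window version computes each ratio once, while the `IN` version recomputes it in the subquery; either should give the same rows.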


In [None]:
%%spark

# TODO: Create best_tackles_sdf


# Compute the values for total tackles / games played



# Find the player with the highest ratio for each team and year

defensive_player_stats_sdf.createOrReplaceTempView('defensive_player_stats')
player_query = '''
          SELECT Experience, `Player Id` AS ID, Year, `Total Tackles`/`Games Played` AS Ratio, Team   
          FROM defensive_player_stats
          WHERE (Team, Year, `Total Tackles`/`Games Played`) in
          (
          SELECT Team, Year, max(`Total Tackles`/`Games Played`)
          FROM defensive_player_stats
          GROUP BY Team, Year)
          '''
      
best_tackles_sdf = spark.sql(player_query)



In [None]:
%%spark
# Print out the first few rows to see if the dataframe looks reasonable
best_tackles_sdf.show(4)

+----------+--------------------+----+-----------------+----------------+
|Experience|                  ID|Year|            Ratio|            Team|
+----------+--------------------+----+-----------------+----------------+
| 8 Seasons|adamarchuleta/250...|2002|             7.25|  St. Louis Rams|
| 8 Seasons|adamarchuleta/250...|2003|6.076923076923077|  St. Louis Rams|
|7th season|  akeemayers/2495491|2012|              6.5|Tennessee Titans|
|5th season|alecogletree/2540143|2014|           6.9375|  St. Louis Rams|
+----------+--------------------+----+-----------------+----------------+
only showing top 4 rows

In [None]:
%%spark

# 10 points
grader.grade(test_case_id = 'tackle_ratio', answer = best_tackles_sdf.toPandas())

Correct! You earned 10/10 points. You are a star!

Your submission has been successfully recorded in the gradebook.


### Step 2.2: The Tremendous Tackles

Your task is to answer the following questions: 

1. Which player has the best ratio? Save their ID to the variable `highest_tackle_ratio_person`.
2. Which team has the highest ratio overall? Save this to the variable `highest_tackle_ratio_team`.
3. For each player who has played for more than one year, what is the percentage change in their ratio from the first year they played to the last year they played? Save this df in `percentage_change_sdf`, and make sure to order by ID. This df should contain just two columns, `Percentage Change` and `ID`, as shown below.

```
+-------------------+--------------------+
|  Percentage Change|                  ID|
+-------------------+--------------------+
|                   |                    |
+-------------------+--------------------+
```
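The percentage change is (last-year ratio minus first-year ratio) divided by the first-year ratio, times 100. The pure-Python sketch below applies that formula to made-up `(player, year, ratio)` rows, skipping single-season players and a zero first-year ratio (where the change is undefined). It assumes one ratio per player-season. In the real data a player can have several rows in one year (for example, one per team), and a join on player and year then pairs every first-year row with every last-year row, which is one plausible reason an ID such as `a.j.edds/496921` repeats in the output below.

```python
# Made-up (player, year, ratio) rows; one row per player-season is assumed.
rows = [
    ("a/1", 2012, 4.0), ("a/1", 2013, 5.0), ("a/1", 2014, 6.0),
    ("b/2", 2016, 3.0),                      # only one season: excluded
    ("c/3", 2010, 0.0), ("c/3", 2011, 2.0),  # first ratio is 0: undefined, excluded
]


def percentage_change(rows):
    """Return (percentage change, player) pairs ordered by player ID."""
    seasons = {}
    for player, year, ratio in rows:
        seasons.setdefault(player, {})[year] = ratio
    result = []
    for player in sorted(seasons):
        by_year = seasons[player]
        first, last = min(by_year), max(by_year)
        if first == last or by_year[first] in (None, 0):
            continue
        change = (by_year[last] - by_year[first]) / by_year[first] * 100
        result.append((change, player))
    return result


print(percentage_change(rows))  # [(50.0, 'a/1')]
```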

In [None]:
%%spark

#TODO: highest_tackle_ratio_person

#TODO: highest_tackle_ratio_team

best_tackles_sdf.createOrReplaceTempView('best_tackles')
query_person = '''SELECT ID
          FROM best_tackles
          WHERE Ratio = (
            SELECT MAX(Ratio)
            FROM best_tackles)'''
highest_tackle_ratio_person_sdf = spark.sql(query_person)
highest_tackle_ratio_person_sdf.show()
highest_tackle_ratio_person_df = highest_tackle_ratio_person_sdf.toPandas()
highest_tackle_ratio_person = highest_tackle_ratio_person_df.iloc[0,0]


query = '''
          SELECT Team
          FROM best_tackles
          GROUP BY Team
          HAVING MAX(Ratio) = 
          (SELECT MAX(Ratio)
          FROM best_tackles
          )
          '''

highest_tackle_ratio_team_sdf = spark.sql(query)
highest_tackle_ratio_team_sdf.show()
highest_tackle_ratio_team_df = highest_tackle_ratio_team_sdf.toPandas()
highest_tackle_ratio_team = highest_tackle_ratio_team_df.iloc[0,0]          

+----------------+
|              ID|
+----------------+
|raylewis/2501750|
+----------------+

+----------------+
|            Team|
+----------------+
|Baltimore Ravens|
+----------------+

In [None]:
%%spark

# 2 points
grader.grade(test_case_id = 'best_player', answer = highest_tackle_ratio_person)

Correct! You earned 2/2 points. You are a star!

Your submission has been successfully recorded in the gradebook.

In [None]:
%%spark

# 2 points
grader.grade(test_case_id = 'best_team', answer = highest_tackle_ratio_team)

Correct! You earned 2/2 points. You are a star!

Your submission has been successfully recorded in the gradebook.

In [None]:
%%spark

#TODO Create % change table

query = ''' 
            WITH Last AS(
            SELECT MAX(Year) AS Year, MAX(`Player Id`) AS `Player Id`
            FROM defensive_player_stats
            GROUP BY `Player Id`
            ),

            First AS(
            SELECT MIN(Year) AS Year, MIN(`Player Id`) AS `Player Id`
            FROM defensive_player_stats
            GROUP BY `Player Id`
            ),

            Player_team_years AS(
            SELECT Last.`Player Id`, Last.Year AS Last_year, First.Year AS First_Year
            FROM Last inner join First on Last.`Player Id` = First.`Player Id`),

            Ratio_base AS(
            SELECT Year, Team, `Player Id`, `Total Tackles`/ `Games Played` AS Ratio
            FROM defensive_player_stats
            )

            SELECT (R.Ratio - RR.Ratio)/RR.Ratio*100 AS `Percentage Change`, P.`Player Id` AS ID
            FROM Player_team_years P JOIN Ratio_base R on (P.`Player Id` = R.`Player Id`) and (P.Last_year = R.Year) 
            JOIN Ratio_base RR on (P.`Player Id` = RR.`Player Id`) and (P.First_Year = RR.Year)
            WHERE RR.Ratio is not NULL and RR.Ratio <> 0
            ORDER BY ID         
            '''
percentage_change_sdf = spark.sql(query)

In [None]:
%%spark

# Let's visualize the results
percentage_change_sdf.show(20)

+-------------------+--------------------+
|  Percentage Change|                  ID|
+-------------------+--------------------+
|                0.0|a'shawnrobinson/2...|
| 1160.0000000000002|   a.j.bouye/2541162|
|             -100.0|     a.j.edds/496921|
|              -50.0|     a.j.edds/496921|
|             -100.0|     a.j.edds/496921|
|             -81.25|     a.j.edds/496921|
|             -100.0|   a.j.green/2495450|
|                0.0|a.j.jefferson/494275|
| 57.460317460317476|   a.j.klein/2539982|
|                0.0| a.j.schable/2506734|
|                0.0| a.j.tarpley/2553605|
| -51.64835164835166|aaronbeasley/2499587|
|                0.0|aaronburbridge/25...|
| -51.11111111111111| aaroncolvin/2543501|
|               null| aarondobson/2539256|
| -2.083333333333333| aarondonald/2543485|
| -55.55555555555556| aaronelling/2504975|
|  672.6190476190476|aaronfrancisco/25...|
|-21.212121212121207|  aaronglenn/2500813|
|             -100.0|aaronkampman/2505138|
+----------

In [None]:
%%spark

## AUTOGRADER Step 2.2: ##
percentage_change_sdf.createOrReplaceTempView("test_2_2")
test_2_2_sdf = spark.sql("SELECT * FROM test_2_2 ORDER BY ID")

In [None]:
%%spark

# 12 points
grader.grade(test_case_id = 'percentage_change', answer = test_2_2_sdf.toPandas())

Correct! You earned 12/12 points. You are a star!

Your submission has been successfully recorded in the gradebook.

## 2.3 The Blessed Break

That last section was hard. And it's gonna get harder. Take a break. Sit back and relax for a minute. Listen to some music. Here's a [suggestion](https://www.youtube.com/watch?v=A3yCcXgbKrE).

In the cell below, set the boolean variable `whatd_you_think` to `True` if you liked it or `False` if you didn't. You will be graded on your response.

In [None]:
whatd_you_think = True

In [None]:
# 1 point
grader.grade(test_case_id = 'tunes', answer = whatd_you_think)

Correct! You earned 1/1 points. You are a star!

Your submission has been successfully recorded in the gradebook.


## Step 3: Formatting the Time Series Data


Our overarching goal is to create an annual time series model that uses each team's highest tackle ratio per year to predict that team's highest tackle ratio for the next season. The model is trained on a set of observations. Each observation contains the team name and the highest tackle ratio for each year from 2001 to 2016.

```
+------+------+-----+------+------+-----+------+
| Team | 2001 | ... | 2008 | 2009 | ... | 2016 |
+------+------+-----+------+------+-----+------+
| TEN  | ...  | ... | ...  | ...  | ... | ...  |
| SFF  | ...  | ... | ...  | ...  | ... | ...  |
| ...  | ...  | ... | ...  | ...  | ... | ...  |
+------+------+-----+------+------+-----+------+
```
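As a rough illustration of how such a table could feed a model, the sketch below turns one team's yearly series into (past seasons, next season) training pairs. The helper `make_windows`, the three-season window, and the ratio values are all hypothetical choices for illustration; the assignment does not prescribe a particular model or window length.

```python
def make_windows(series, window=3):
    """Split a yearly series into (past `window` values, next value) pairs.

    Hypothetical helper: each pair uses `window` consecutive seasons as
    features and the season right after them as the prediction target.
    """
    pairs = []
    for start in range(len(series) - window):
        features = series[start:start + window]
        target = series[start + window]
        pairs.append((features, target))
    return pairs


# Illustrative ratios for one team, 2001 through 2006 (made-up values)
ratios = [5.7, 7.6, 7.9, 8.6, 6.8, 6.3]
for features, target in make_windows(ratios):
    print(features, "->", target)
```

With six seasons and a window of three, this yields three training pairs; a longer window means fewer pairs per team but more history per prediction.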



### Step 3.1: The Annual Arrangement

Your first task is to create the time series table, i.e. the `2001` through `2016` columns. This will involve reshaping `best_tackles_sdf`. Currently, `best_tackles_sdf` has columns `ID`, `Name`, `Team`, `Ratio` and `Year`. We want to group the rows by `Team` and create a new column holding the Ratio for each year.

Create an sdf called `raw_tackles_time_series_sdf` that holds, for each team, the highest tackle ratio for each year from 2001 through 2016. It is fine if a given team has no data for some year. However, every year column must still have an entry for every team: if no Ratio value is present for that year, the entry should be `0`. The format of the sdf is shown below:
```
+---------------------+------+-----+------+------+-----+------+
| Team                | 2001 | ... | 2008 | 2009 | ... | 2016 |
+---------------------+------+-----+------+------+-----+------+
| Tennessee Titans    | ...  | ... | ...  | ...  | ... | ...  |
| San Francisco 49ers | ...  | ... | ...  | ...  | ... | ...  |
| ...                 | ...  | ... | ...  | ...  | ... | ...  |
+---------------------+------+-----+------+------+-----+------+
```
_Hint_: This is a **fiddly and somewhat difficult** question. The tricky part is creating the additional columns of annual ratios, specifically when there are missing years. 

We suggest you look into `CASE` and `WHEN` statements in the [function list](https://spark.apache.org/docs/2.3.0/api/sql/index.html), and use them to **either** fill in a number for the column (if appropriate) or put in a 0.
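To see the CASE/GROUP BY pattern in isolation, here is a small sketch that runs it with Python's built-in `sqlite3` module rather than Spark. The toy rows and the three-year range are made up, and the assumption is that `CASE WHEN ... THEN ... ELSE ... END`, being standard SQL, behaves the same way in Spark SQL. Note how the `ELSE 0` branch is what fills in years with no data.

```python
import sqlite3

# Toy stand-in for best_tackles: (Team, Year, Ratio); values are made up
rows = [
    ("Tennessee Titans", 2001, 6.5),
    ("Tennessee Titans", 2001, 7.0),
    ("Tennessee Titans", 2003, 8.0),
    ("San Francisco 49ers", 2002, 5.5),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE best_tackles (Team TEXT, Year INTEGER, Ratio REAL)")
conn.executemany("INSERT INTO best_tackles VALUES (?, ?, ?)", rows)

# One CASE per year: keep Ratio when the row belongs to that year, else 0.
# MAX then collapses each team's rows into a single value per year column,
# so a year with no data for a team comes out as 0.
query = """
SELECT Team,
       MAX(CASE WHEN Year = 2001 THEN Ratio ELSE 0 END) AS `2001`,
       MAX(CASE WHEN Year = 2002 THEN Ratio ELSE 0 END) AS `2002`,
       MAX(CASE WHEN Year = 2003 THEN Ratio ELSE 0 END) AS `2003`
FROM best_tackles
GROUP BY Team
ORDER BY Team
"""
pivoted = conn.execute(query).fetchall()
for row in pivoted:
    print(row)
```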

In [None]:
%%spark

# TODO: Create [raw_tackles_time_series_sdf]

# CASE statements are SQL's equivalent of if/else: CASE WHEN <condition>
# THEN <value> ELSE <other value> END evaluates to the first value when the
# condition holds and to the other value otherwise.

# The query is a GROUP BY: we group the rows by team, as in the previous step.
# Inside each group, one CASE expression per year picks out the rows whose
# 'Year' column matches that year. For a matching row, the expression yields
# the team's Ratio for that year, saved under a column named after the year.
# For every other row it yields 0, which is how years with no data end up as
# 0, as the question requires. Finally, MAX() collapses each group to a single
# value per year column.



# Define and save raw_tackles_time_series_sdf

query = '''SELECT Team,
          MAX(CASE WHEN Year = 2001 THEN Ratio ELSE 0 END) AS `2001`,
          MAX(CASE WHEN Year = 2002 THEN Ratio ELSE 0 END) AS `2002`,
          MAX(CASE WHEN Year = 2003 THEN Ratio ELSE 0 END) AS `2003`,
          MAX(CASE WHEN Year = 2004 THEN Ratio ELSE 0 END) AS `2004`,
          MAX(CASE WHEN Year = 2005 THEN Ratio ELSE 0 END) AS `2005`,
          MAX(CASE WHEN Year = 2006 THEN Ratio ELSE 0 END) AS `2006`,
          MAX(CASE WHEN Year = 2007 THEN Ratio ELSE 0 END) AS `2007`,
          MAX(CASE WHEN Year = 2008 THEN Ratio ELSE 0 END) AS `2008`,
          MAX(CASE WHEN Year = 2009 THEN Ratio ELSE 0 END) AS `2009`,
          MAX(CASE WHEN Year = 2010 THEN Ratio ELSE 0 END) AS `2010`,
          MAX(CASE WHEN Year = 2011 THEN Ratio ELSE 0 END) AS `2011`,
          MAX(CASE WHEN Year = 2012 THEN Ratio ELSE 0 END) AS `2012`,
          MAX(CASE WHEN Year = 2013 THEN Ratio ELSE 0 END) AS `2013`,
          MAX(CASE WHEN Year = 2014 THEN Ratio ELSE 0 END) AS `2014`,
          MAX(CASE WHEN Year = 2015 THEN Ratio ELSE 0 END) AS `2015`,
          MAX(CASE WHEN Year = 2016 THEN Ratio ELSE 0 END) AS `2016`
          FROM best_tackles
          GROUP BY Team'''

raw_tackles_time_series_sdf = spark.sql(query)
#raw_tackles_time_series_sdf.show()

In [None]:
%%spark

## AUTOGRADER Step 3.1: ##

raw_tackles_time_series_sdf.createOrReplaceTempView("test_3_1")
test_3_1_sdf = spark.sql("SELECT * FROM test_3_1 ORDER BY Team DESC LIMIT 10")

In [None]:
%%spark

# 14 points
grader.grade(test_case_id = 'arrangement', answer = test_3_1_sdf.toPandas())

Correct! You earned 14/14 points. You are a star!

Your submission has been successfully recorded in the gradebook.

### Step 3.2 The Clairvoyant Cleaning

We now want to format the `Team` names to an appropriate abbreviation. The formatting will consist of 2 steps.

#### Step 3.2.1 The Ubiquitous UDF

The teams in our dataset are identified by their full names. We think it would be *cleaner* to represent them by abbreviations. Often when using Spark, there is no built-in SQL function that performs the operation we want. Instead, we can write one ourselves as a user-defined function (udf).

A udf is defined as a normal Python function and then registered to be used as a Spark SQL function. Your task is to create a udf, `NAME_TO_ABBV()`, that converts the `Team` field in `raw_tackles_time_series_sdf` to the appropriate abbreviation, using the provided `name_to_abbv_dict` dictionary. We are only interested in the teams in that dictionary.

Fill out the function `name_to_abbv()` below. Then use `spark.udf.register()` to register it as a SQL function. The command is provided. ***You do not need to edit it***. Note, we have defined the udf as returning `StringType()`. Ensure that your function returns this. You must also deal with any potential `null` cases.
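The same idea, a plain Python function exposed to SQL, can be tried without a cluster using `sqlite3.Connection.create_function`, which plays roughly the role that `spark.udf.register()` plays in Spark. This is a sketch under that analogy, with a two-entry dictionary made up for brevity. It shows that `dict.get()` covers both the unknown-name case and the `null` input case, since Python's `None` maps to SQL `NULL`.

```python
import sqlite3

# A trimmed, illustrative copy of the lookup table
name_to_abbv_dict = {"Tennessee Titans": "TEN", "San Francisco 49ers": "SFF"}


def name_to_abbv(name):
    # dict.get returns None both for unknown names and for a None input,
    # and a None return value comes back from SQL as NULL
    return name_to_abbv_dict.get(name)


conn = sqlite3.connect(":memory:")
# Register the Python function under a SQL name, taking one argument
conn.create_function("NAME_TO_ABBV", 1, name_to_abbv)

conn.execute("CREATE TABLE teams (Team TEXT)")
conn.executemany(
    "INSERT INTO teams VALUES (?)",
    [("Tennessee Titans",), ("Washington Commanders",), (None,)],
)
result = conn.execute(
    "SELECT Team, NAME_TO_ABBV(Team) FROM teams ORDER BY rowid"
).fetchall()
print(result)
```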

In [None]:
%%spark

# Dictionary linking full team names to their abbreviations
name_to_abbv_dict = {'Tennessee Titans': 'TEN',
                     'Denver Broncos': 'DEN',
                     'Green Bay Packers': 'GBP',
                     'Pittsburgh Steelers': 'PIT',
                     'Chicago Bears': 'CHI',
                     'Philadelphia Eagles': 'PHI',
                     'Indianapolis Colts': 'IND',
                     'Arizona Cardinals': 'ARI',
                     'Seattle Seahawks': 'SEA',
                     'Baltimore Ravens': 'BAR',
                     'Carolina Panthers': 'CAR',
                     'Kansas City Chiefs': 'KAN',
                     'Oakland Raiders': 'OAK',
                     'St. Louis Rams': 'SLR',
                     'Atlanta Falcons': 'ATL',
                     'New Orleans Saints': 'NOS',
                     'San Francisco 49ers': 'SFF',
                     'New England Patriots': 'NEP',
                     'Buffalo Bills': 'BUF',
                     'Los Angeles Rams': 'LAR',
                     'Dallas Cowboys': 'DAL',
                     'Minnesota Vikings': 'MIN',
                     'Detroit Lions': 'DET',
                     'Washington Redskins': 'WAS',
                     'Jacksonville Jaguars': 'JAC',
                     'New York Giants': 'NYG',
                     'Tampa Bay Buccaneers': 'TBB',
                     'Cleveland Browns': 'CLE',
                     'Houston Texans': 'HOT',
                     'Los Angeles Raiders': 'LOS',
                     'Miami Dolphins': 'MIA',
                     'Houston Oilers': 'HOU',
                     'Cincinnati Bengals': 'CIN',
                     'San Diego Chargers': 'SDC',
                     'Tennessee Oilers': 'TEO',
                     'New York Jets': 'NYJ',
                     'Phoenix Cardinals': 'PHO',
                     'St. Louis Cardinals': 'SLC',
                     'Baltimore Colts': 'BAC'}

# TODO: Fill out [name_to_abbv()] and register it as a udf.
# Fill out name_to_abbv()

# A UDF must handle every possible input, or it will crash on the bad rows.
# Specifically, "name" may be null (None) or a team that is not in
# "name_to_abbv_dict". dict.get() returns None in both cases, which Spark
# stores as a null value.

from pyspark.sql.types import StringType  # used by the register call below

def name_to_abbv(name):
  return name_to_abbv_dict.get(name)


# Register udf as a SQL function. DO NOT EDIT
spark.udf.register("NAME_TO_ABBV", name_to_abbv, StringType())


<function name_to_abbv at 0x7f4ad1b864d0>

Submit a tuple to the autograder containing the abbreviations for the San Francisco 49ers and the Washington Commanders. If a name is not in the dictionary, use the string "None" for its entry.

In [None]:
%%spark

## AUTOGRADER Step 3.2.1: ##

to_submit = (str(name_to_abbv("San Francisco 49ers")), str(name_to_abbv("Washington Commanders")))
print(to_submit)

('SFF', 'None')

In [None]:
%%spark

# 2 points
grader.grade(test_case_id = 'clairvoyant', answer = to_submit)

Correct! You earned 2/2 points. You are a star!

Your submission has been successfully recorded in the gradebook.

#### Step 3.2.2: The Clean Conversion

With our new `NAME_TO_ABBV()` function we will begin to wrangle `raw_tackles_time_series_sdf`.

Create an sdf called `tackles_time_series_sdf` as follows. Convert all the team names in `raw_tackles_time_series_sdf` to the appropriate abbreviations and save the result as `Team`. Drop any teams whose names do not appear in `name_to_abbv_dict` (these come back as `null` from `NAME_TO_ABBV()`). Using `.dropna()` is acceptable instead of `IS NOT NULL`. The final sdf should be in the format shown below:

```
+------+------+-----+------+------+-----+------+
| Team | 2001 | ... | 2008 | 2009 | ... | 2016 |
+------+------+-----+------+------+-----+------+
| TEN  | ...  | ... | ...  | ...  | ... | ...  |
| SFF  | ...  | ... | ...  | ...  | ... | ...  |
| ...  | ...  | ... | ...  | ...  | ... | ...  |
+------+------+-----+------+------+-----+------+
```
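One alternative to calling `.dropna()` afterwards is to filter inside the query itself. The sketch below shows that pattern with `sqlite3` standing in for Spark SQL; the table contents and the two-team lookup are invented for illustration. The conversion happens in a subquery so the outer `WHERE` can refer to the converted `Team` column. Keep in mind that `.dropna()` with no arguments drops a row if *any* column is null, so the two approaches should only agree as long as the year columns themselves are never null, which the `ELSE 0` fill in Step 3.1 is meant to ensure.

```python
import sqlite3

# Two-entry stand-in for name_to_abbv_dict (illustrative only)
abbv = {"Tennessee Titans": "TEN", "San Francisco 49ers": "SFF"}

conn = sqlite3.connect(":memory:")
conn.create_function("NAME_TO_ABBV", 1, lambda name: abbv.get(name))
conn.execute("CREATE TABLE raw_tackles (Team TEXT, `2001` REAL, `2002` REAL)")
conn.executemany(
    "INSERT INTO raw_tackles VALUES (?, ?, ?)",
    [
        ("Tennessee Titans", 7.0, 0),
        ("Washington Commanders", 5.0, 6.0),  # not in the lookup -> NULL
        ("San Francisco 49ers", 0, 5.5),
    ],
)

# Convert in a subquery, then keep only rows whose abbreviation is not NULL
query = """
SELECT *
FROM (
    SELECT NAME_TO_ABBV(Team) AS Team, `2001`, `2002`
    FROM raw_tackles
) AS converted
WHERE Team IS NOT NULL
ORDER BY Team
"""
rows = conn.execute(query).fetchall()
print(rows)
```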

In [None]:
%%spark

# Format the "Team" column using our UDF, NAME_TO_ABBV.
# TODO: Create [tackles_time_series_sdf]



# Define and save tackles_time_series_sdf
raw_tackles_time_series_sdf.createOrReplaceTempView('raw_tackles')
query = '''
        SELECT NAME_TO_ABBV(Team) AS Team, `2001`, `2002`, `2003`, `2004`, `2005`, `2006`, `2007`, `2008`, `2009`, `2010`, `2011`, `2012`, `2013`, `2014`, `2015`, `2016`
        FROM raw_tackles
        '''
tackles_time_series_sdf = spark.sql(query)
tackles_time_series_sdf = tackles_time_series_sdf.dropna()
tackles_time_series_sdf.show(5)

+----+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------+------------------+-----------------+-----------------+-----------------+-----------------+
|Team|              2001|              2002|              2003|              2004|              2005|              2006|             2007|              2008|             2009|             2010|  2011|              2012|             2013|             2014|             2015|             2016|
+----+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------+------------------+-----------------+-----------------+-----------------+-----------------+
| NEP|            5.6875| 7.642857142857143|             7.875|             8.625|              6.75|6.2727272727272725|5.66

In [None]:
%%spark

## AUTOGRADER Step 3.2.2: ##

tackles_time_series_sdf.createOrReplaceTempView("test_3_2_2")
test_3_2_2_sdf = spark.sql("SELECT * FROM test_3_2_2 ORDER BY Team LIMIT 10")

In [None]:
%%spark

# 10 points
grader.grade(test_case_id = 'conversion', answer = test_3_2_2_sdf.toPandas())

Correct! You earned 10/10 points. You are a star!

Your submission has been successfully recorded in the gradebook.

And there you go! Now we have formatted a simple time series dataset for the highest tackle ratios of all teams.

## Step 4: The Super... Bacon?

Let's introduce a fun little concept called the Bacon Number! The Bacon number of an actor or actress is the number of degrees of separation they have from actor Kevin Bacon, as defined by the game known as Six Degrees of Kevin Bacon. For example, Kevin Bacon's Bacon number is 0. If an actor works in a movie with Kevin Bacon, the actor's Bacon number is 1. If an actor works with an actor who worked with Kevin Bacon in a movie, the first actor's Bacon number is 2, and so forth.

How do we implement this for the NFL, though? Let's use a dataset based on the history of the Super Bowl and find a number we're calling "The Super Bacon." We define it as follows: if team A has beaten team B, and team B has beaten team C, then the Super Bacon of C with respect to A is 2 (assuming A has never beaten C directly). In general, it is the length of the shortest chain of Super Bowl wins leading from A to that team.
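To pin the definition down, here is a small single-machine sketch: the Super Bacon of each team is its breadth-first distance from A in the directed "beat" graph. The helper `super_bacon` and the toy games are hypothetical, and this uses an ordinary in-memory queue rather than the distributed approach developed in this section. Teams that cannot be reached from the origin simply do not appear in the result.

```python
from collections import deque

# Toy results as (winner, loser) pairs: A beat B, B beat C
games = [("A", "B"), ("B", "C")]


def super_bacon(games, origin):
    """Return {team: Super Bacon w.r.t. origin} for every team reachable from it."""
    beaten = {}
    for winner, loser in games:
        beaten.setdefault(winner, []).append(loser)

    distance = {origin: 0}
    queue = deque([origin])
    while queue:
        team = queue.popleft()
        for loser in beaten.get(team, []):
            if loser not in distance:  # first visit is along a shortest chain
                distance[loser] = distance[team] + 1
                queue.append(loser)
    return distance


print(super_bacon(games, "A"))  # {'A': 0, 'B': 1, 'C': 2}
```

If A had also beaten C directly, C's Super Bacon would drop to 1, because BFS reaches it first through the shorter chain.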

Now to calculate this number, we'll use the concepts of graphs and BFS!


*For this section, we will be using PySpark locally in Colab instead of Spark on the EMR cluster. The syntax remains the same, except you don't need the `%%spark` call at the top of each code cell.
Run the PySpark setup cells below.*

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 37 kB/s
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 30.2 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=8dfc3f8962049c7da4d9205f92e885f43e86f737c73bf6a54c504640c0f685c2
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
from pyspark.sql import SparkSession

appName = "PySpark"

# Create Spark session
spark = SparkSession.builder.appName(appName).getOrCreate()


### 4.1 “Traversing” a Graph

Before we jump to finding the Super Bacons for the teams, let's review how BFS works.

In [None]:
import pandas as pd
from IPython.display import Image as I

bfsgif =\
'https://upload.wikimedia.org/wikipedia/commons/5/5d/Breadth-First-S'+\
'earch-Algorithm.gif'
dfsgif=\
'https://upload.wikimedia.org/wikipedia/commons/7/7f/Depth-First-Search.gif'



#### 4.1.1 Intro to Distributed Breadth-First Search


To start off, we will be implementing a graph traversal algorithm known as Breadth First Search. It works much like the way a stain spreads on a white t-shirt. Take a look at the graph below:

<p align = "center">
<img src = "https://imgur.com/WU3AUwg.png" width= "600" align ="center"/>
</p>

* Consider starting BFS from point A (green). This is the origin node, and it forms the starting frontier.
* The first round of BFS finds all the nodes directly reachable from A, namely B-F (blue circles). These blue nodes make up the next frontier, at depth 1 from our starting node A.
* The second round then identifies the red nodes, which are the neighbors of the blue nodes. The red nodes all belong to the frontier at depth 2 from A. Note that node A is also a neighbor of a blue node. However, since it has already been visited, it is not added to this frontier.

This process continues until all the nodes in the graph have been visited.
If you would like to learn more about BFS, we highly suggest looking [here](https://www.tutorialspoint.com/data_structures_algorithms/breadth_first_traversal.html).


We will now implement **spark_bfs(G, N, d)**, our Spark flavor of BFS that takes a graph **G**, a set of origin nodes **N**, and a max depth **d**.

In order to write a successful BFS function, you will need to figure out:
1. how to keep track of nodes that we have visited
2. how to properly find all the nodes at the next depth
3. how to avoid cycles and ensure that we do not constantly loop through the same edges (take a look at J-K in the graph)
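One way to address all three points is to expand the graph one level at a time, as the rounds described earlier do. The sketch below does this in plain Python; the function name `bfs_by_rounds` and the toy edge list (which includes a J <-> K cycle but is not the pictured graph) are assumptions for illustration. The `visited` dictionary doubles as the distance table, only the current frontier is expanded each round, and already-visited nodes are skipped so the cycle is never re-entered.

```python
def bfs_by_rounds(edges, origins, max_depth):
    """Level-by-level BFS over a directed edge list.

    visited maps node -> distance (point 1); each round expands only the
    current frontier (point 2); nodes already in visited are never re-added,
    so cycles cannot loop forever (point 3).
    """
    visited = {node: 0 for node in origins}
    frontier = set(origins)
    for depth in range(1, max_depth + 1):
        next_frontier = set()
        for src, dst in edges:
            if src in frontier and dst not in visited:
                next_frontier.add(dst)
        for node in next_frontier:
            visited[node] = depth
        if not next_frontier:  # nothing new is reachable; stop early
            break
        frontier = next_frontier
    return visited


# Toy graph with a J <-> K cycle (not the pictured graph)
edges = [("A", "B"), ("B", "J"), ("J", "K"), ("K", "J"), ("K", "L")]
print(bfs_by_rounds(edges, ["A"], max_depth=10))
# {'A': 0, 'B': 1, 'J': 2, 'K': 3, 'L': 4}
```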


#### [OPTIONAL/ADDITIONAL] BFS vs. DFS Animation 
Run the code cells below to understand the difference between depth and breadth first search! (Source: Wikimedia Commons)

In [None]:
# NOT GRADED
print('BFS:')
I(url=bfsgif)

BFS:


In [None]:
#NOT GRADED
print('DFS:')
I(url=dfsgif)

DFS:


#### 4.1.2 Implement One Traversal

To break down this process, let's think about how we would implement a single traversal step. That is, given the green node in the graph above, how do we get the blue nodes?


Consider the simple graph below **which is different from the graph in the image above**:

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")  # Spark 3.x name of the Arrow setting

simple = [('A', 'B'),
         ('A', 'C'),
         ('A', 'D'),
         ('C', 'F'),
         ('F', 'A'),
         ('B', 'G'),
         ('G', 'H'),
         ('D', 'E')]

simple_dict = {'from_node': ['A', 'A', 'A', 'C', 'F', 'B', 'G', 'D'],
       'to_node': ['B', 'C', 'D', 'F', 'A', 'G', 'H', 'E']}

simple_graph_df = pd.DataFrame.from_dict(simple_dict)
simple_graph_sdf = spark.createDataFrame(simple_graph_df)
simple_graph_sdf.show()

+---------+-------+
|from_node|to_node|
+---------+-------+
|        A|      B|
|        A|      C|
|        A|      D|
|        C|      F|
|        F|      A|
|        B|      G|
|        G|      H|
|        D|      E|
+---------+-------+



As you can see, each row of this dataframe represents an edge between two nodes. Here, we are looking at a **directed graph**, which means that A-->B does not represent the same edge as B-->A.
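Because the graph is directed, it is natural to index it by outgoing edges only. This small sketch rebuilds the simple graph's edges as (from_node, to_node) pairs in plain Python and groups them by source node; it is only an illustration, not part of the required solution.

```python
from collections import defaultdict

# Same edges as simple_dict, written as (from_node, to_node) pairs
edges = [("A", "B"), ("A", "C"), ("A", "D"), ("C", "F"),
         ("F", "A"), ("B", "G"), ("G", "H"), ("D", "E")]

out_neighbors = defaultdict(list)
for src, dst in edges:
    out_neighbors[src].append(dst)  # only the src -> dst direction is recorded

print(sorted(out_neighbors["A"]))  # ['B', 'C', 'D']
print(out_neighbors["F"])          # ['A']
print("F" in out_neighbors["A"])   # False: F -> A exists, A -> F does not
```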

Let's define our starting node as follows:

In [None]:
smallOrig = [{'node': 'A'}]

Then running BFS on graph G from smallOrig to depth 1, i.e. **spark_bfs(G, smallOrig, 1)**, would produce the following output:

In [None]:
simple_1_round_dict = {'node': ['B', 'D', 'C', 'A'],
       'distance': [1, 1, 1, 0]}
simple_1_round_bfs_df = pd.DataFrame.from_dict(simple_1_round_dict)
simple_1_round_bfs_sdf = spark.createDataFrame(simple_1_round_bfs_df)
simple_1_round_bfs_sdf.show()

+----+--------+
|node|distance|
+----+--------+
|   B|       1|
|   D|       1|
|   C|       1|
|   A|       0|
+----+--------+



As you can see, this dataframe logs each node with its corresponding distance away from A. Moreover, we also know that these nodes are **visited**. 

Hopefully, you can see how we can use our original graph and this new information to find the nodes at depth two. 

This is exactly what we will try to accomplish with **spark_bfs_1_round(visited_nodes)**, which will ultimately be the inner function of **spark_bfs** that performs exactly one traversal step of the graph.

**TODO**: Write **spark_bfs_1_round(visited_nodes)**, which takes the current dataframe of visited nodes, performs one round of BFS, and returns an updated visited-nodes dataframe. You should assume that a temporary view `G` of the graph already exists.
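Here is one possible shape for the single-round query, tried out with Python's built-in `sqlite3` on the simple graph so it can be checked without Spark. It assumes the same table names (`G` and `visited_nodes`) as the assignment and is a sketch rather than the reference solution. The frontier is taken to be the nodes at the current maximum distance, and `GROUP BY node` with `MIN(distance)` keeps one row per node with its smallest distance, so revisiting A through the F -> A edge cannot change A's distance. Starting from the one-round result shown above, it should reproduce the depth-2 table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE G (from_node TEXT, to_node TEXT)")
conn.executemany("INSERT INTO G VALUES (?, ?)", [
    ("A", "B"), ("A", "C"), ("A", "D"), ("C", "F"),
    ("F", "A"), ("B", "G"), ("G", "H"), ("D", "E"),
])
conn.execute("CREATE TABLE visited_nodes (node TEXT, distance INTEGER)")
conn.executemany("INSERT INTO visited_nodes VALUES (?, ?)",
                 [("A", 0), ("B", 1), ("C", 1), ("D", 1)])

# Expand only the frontier (nodes at the current maximum distance), merge the
# new candidates with what was already visited, and keep each node's smallest
# distance so a revisit through a cycle never overwrites an earlier distance.
one_round = """
SELECT node, MIN(distance) AS distance
FROM (
    SELECT G.to_node AS node, v.distance + 1 AS distance
    FROM visited_nodes v JOIN G ON v.node = G.from_node
    WHERE v.distance = (SELECT MAX(distance) FROM visited_nodes)
    UNION ALL
    SELECT node, distance FROM visited_nodes
) AS candidates
GROUP BY node
ORDER BY node
"""
result = conn.execute(one_round).fetchall()
print(result)
```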

In [None]:
def spark_bfs_1_round(visited_nodes):
  """
  :param visited_nodes: dataframe with columns node and distance
  :return: dataframe of updated visited nodes, with columns node and distance
  """
  # TODO: Complete this function to implement 1 round of BFS

  # Expose the current visited set to SQL (the graph view "G" must already exist)
  spark.catalog.dropTempView("visited_nodes")
  visited_nodes.createOrReplaceTempView("visited_nodes")

  # Expand only the frontier (nodes at the current maximum distance), union the
  # new candidates with the already-visited nodes, and keep each node's smallest
  # distance. GROUP BY + MIN removes duplicates deterministically, whereas
  # sorting and then calling drop_duplicates() does not guarantee which
  # duplicate row Spark keeps.
  query = '''SELECT node, MIN(distance) AS distance
             FROM (
               SELECT G.to_node AS node, v.distance + 1 AS distance
               FROM visited_nodes v JOIN G ON v.node = G.from_node
               WHERE v.distance = (SELECT MAX(distance) FROM visited_nodes)
               UNION ALL
               SELECT node, distance
               FROM visited_nodes
             ) AS candidates
             GROUP BY node'''
  return spark.sql(query)


Now, run the inner function on **simple_1_round_bfs_sdf** (i.e., the result of one round of BFS on the simple graph) and store the results in **simple_bfs_result**. This is what the output of BFS to depth 2 should look like.

In [None]:
# TODO: Run spark_bfs_1_round on simple_1_round_bfs_sdf
simple_graph_sdf.createOrReplaceTempView("G") 
simple_bfs_result = spark_bfs_1_round(simple_1_round_bfs_sdf)

Convert this result to Pandas, sorted by the node (ascending), and submit it to the autograder.

**HINT:** Make sure distance is formatted as a number!

In [None]:
# TODO: Convert simple_bfs_result to Pandas sorted by node
simple_bfs_test = (simple_bfs_result.toPandas()).sort_values(by=['node'])

In [None]:
simple_bfs_test

  node  distance
5    A         0
2    B         1
4    C         1
3    D         1
1    E         2
0    F         2
6    G         2


In [None]:
# 6 points
grader.grade(test_case_id = 'checksimpleBFS', answer = simple_bfs_test)

Correct! You earned 6/6 points. You are a star!

Your submission has been successfully recorded in the gradebook.


#### 4.1.3 Full BFS Implementation

Now, we will fully implement **spark_bfs**. This function should iteratively call your implemented version of **spark_bfs_1_round** and ultimately return the output of this function at **max_depth**.

You are also responsible for initializing the starting dataframe, that is, converting the list of origin nodes into a Spark dataframe with each node logged at distance 0.

Consider the following: 

```
schema = StructType([
    StructField("node", StringType(), True)
])

my_sdf = spark.createDataFrame(origins, schema)
```

The schema specifies the structure of the Spark DataFrame: a single string column `node`. The snippet then calls **spark.createDataFrame** to apply this schema to the **origins** list. You are also responsible for ensuring that a view of your graph is available within this function. (Note: you will also need to add a distance column.)
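Putting initialization and iteration together, a sketch of the full driver might look like the following, with `sqlite3` standing in for Spark so it runs anywhere. The helper `sqlite_bfs` is hypothetical. It seeds a `visited_nodes` table with every origin at distance 0 (a named placeholder reads the "node" key of each origin dict, mirroring the `{"node": nodeValue}` format), then applies one round per depth level.

```python
import sqlite3

# One BFS round: expand the frontier, merge with visited, keep min distance
ROUND_SQL = """
SELECT node, MIN(distance) AS distance
FROM (
    SELECT G.to_node AS node, v.distance + 1 AS distance
    FROM visited_nodes v JOIN G ON v.node = G.from_node
    WHERE v.distance = (SELECT MAX(distance) FROM visited_nodes)
    UNION ALL
    SELECT node, distance FROM visited_nodes
) AS candidates
GROUP BY node
"""


def sqlite_bfs(conn, origins, max_depth):
    """Hypothetical SQLite analogue of spark_bfs.

    Assumes conn already holds an edge table G(from_node, to_node);
    origins is a list of {"node": value} dicts, as in the assignment.
    """
    conn.execute("DROP TABLE IF EXISTS visited_nodes")
    conn.execute("CREATE TABLE visited_nodes (node TEXT, distance INTEGER)")
    # Initialization: every origin is logged at distance 0
    conn.executemany("INSERT INTO visited_nodes VALUES (:node, 0)", origins)
    for _ in range(max_depth):
        rows = conn.execute(ROUND_SQL).fetchall()
        # Replace the visited table with this round's result
        conn.execute("DELETE FROM visited_nodes")
        conn.executemany("INSERT INTO visited_nodes VALUES (?, ?)", rows)
    return sorted(conn.execute("SELECT node, distance FROM visited_nodes").fetchall())


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE G (from_node TEXT, to_node TEXT)")
conn.executemany("INSERT INTO G VALUES (?, ?)", [
    ("A", "B"), ("A", "C"), ("A", "D"), ("C", "F"),
    ("F", "A"), ("B", "G"), ("G", "H"), ("D", "E"),
])
print(sqlite_bfs(conn, [{"node": "A"}], 3))
print(sqlite_bfs(conn, [{"node": "C"}, {"node": "G"}], 1))
```

The second call shows why the whole origins list should be converted, not just its first element: both C and G start at distance 0.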

**TODO:** Implement **spark_bfs(G, origins, max_depth)** and run it on **teams_graph_sdf**, which is created in Step 4.2 (you will run it in Step 4.3). Note: you may want to test on the **simple_graph** example first, as `teams_graph_sdf` will take quite some time to run.

These imports might be useful: 
`from pyspark.sql.types import StructType, StructField, StringType, IntegerType`

In [None]:
# TODO: iterative search over directed graph
# Worth 5 points directly, but will be needed later

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def spark_bfs(G, origins, max_depth):
  """ runs distributed BFS to a specified max depth

  :param G: graph dataframe with columns from_node and to_node (e.g. teams_graph_sdf from 4.2)
  :param origins: list of origin nodes stored as {"node": nodeValue}
  :param max_depth: integer value of max depth to run BFS to
  :return: dataframe with columns node, distance of all visited nodes
  """
  # Start every origin node at distance 0. Building the pandas frame from the
  # whole list (not just origins[0]) keeps all origins when several are given.
  res_df = pd.DataFrame(origins)
  res_df['distance'] = 0
  res_sdf = spark.createDataFrame(res_df)

  # Make the graph available to spark_bfs_1_round under the view name "G"
  G.createOrReplaceTempView("G")
  for _ in range(max_depth):
    res_sdf = spark_bfs_1_round(res_sdf)

  return res_sdf

Test that this function works on the simple example. 

In [None]:
simple_bfs_iterative_result = spark_bfs(simple_graph_sdf, smallOrig, 3)
simple_bfs_iterative_result.show()

+----+--------+
|node|distance|
+----+--------+
|   F|       2|
|   E|       2|
|   B|       1|
|   D|       1|
|   C|       1|
|   A|       0|
|   G|       2|
|   H|       3|
+----+--------+



### Step 4.2: History of the Super Bowl

Time to look at the Super Bowl history dataset. Here we have data for all Super Bowls from 1967 to 2020. Let's load the data to **superbowl_sdf** and see how it looks.

Download the `superbowl.csv` dataset from [here](https://drive.google.com/file/d/1QFLg2hvrjOi_QpDnKb0rWiszvP-g6txo/view?usp=sharing), and upload it to the content folder of this Colab notebook.

In [None]:
superbowl_sdf = spark.read.csv("superbowl.csv", header=True)

In [None]:
superbowl_sdf.show()

+-----------+------------+--------------------+----------+--------------------+---------+---------------+--------------------+---------------+----------+
|       Date|          SB|              Winner|Winner Pts|               Loser|Loser Pts|            MVP|             Stadium|           City|     State|
+-----------+------------+--------------------+----------+--------------------+---------+---------------+--------------------+---------------+----------+
| Feb 2 2020|    LIV (54)|  Kansas City Chiefs|        31| San Francisco 49ers|       20|Patrick Mahomes|   Hard Rock Stadium|  Miami Gardens|   Florida|
| Feb 3 2019|   LIII (53)|New England Patriots|        13|    Los Angeles Rams|        3| Julian Edelman|Mercedes-Benz Sta...|        Atlanta|   Georgia|
| Feb 4 2018|    LII (52)| Philadelphia Eagles|        41|New England Patriots|       33|     Nick Foles|   U.S. Bank Stadium|    Minneapolis| Minnesota|
| Feb 5 2017|     LI (51)|New England Patriots|        34|     Atlanta Falco

Do you know which teams have been the most successful during this period? Let's find out!

**TODO:** Calculate the number of times each team appears in the *Winner* column, and store the result in **count_sdf**. This should have two columns: *Winner* and *win_count*.

In [None]:
# TODO
superbowl_sdf.createOrReplaceTempView("superbowl")
query = '''
        SELECT Winner, COUNT(Date) AS win_count
        FROM superbowl
        GROUP BY Winner
'''
count_sdf = spark.sql(query)

Convert the dataframe to pandas and pass it to the grader.

In [None]:
count_df = count_sdf.toPandas()

In [None]:
# 3 points
grader.grade(test_case_id = 'checkSuperBowlCount', answer = count_df)

Correct! You earned 3/3 points. You are a star!

Your submission has been successfully recorded in the gradebook.


Now that we have the Super Bowl dataset, let's convert it to a graph sdf just like the one we had in section 4.1 (P.S. it's not as hard as it sounds).

**TODO:** Create **teams_graph_sdf** that has the columns *from_node* and *to_node*. from_node has all the entries from the *Winner* column and to_node has all the entries from the *Loser* column.
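As a quick sanity check of the edge direction, the sketch below converts three complete (Winner, Loser) rows copied from the `superbowl_sdf.show()` output above into from_node/to_node edges using plain Python, and counts wins from the `from_node` side with `collections.Counter`. It is an illustration on a tiny sample, not a replacement for the Spark SQL queries.

```python
from collections import Counter

# Three (Winner, Loser) rows from the superbowl_sdf.show() output above
games = [
    ("Kansas City Chiefs", "San Francisco 49ers"),
    ("New England Patriots", "Los Angeles Rams"),
    ("Philadelphia Eagles", "New England Patriots"),
]

# Each game becomes one directed edge: winner (from_node) -> loser (to_node)
edges = [{"from_node": w, "to_node": l} for w, l in games]

# Win counts come from the from_node side only
win_count = Counter(edge["from_node"] for edge in edges)
print(win_count["New England Patriots"])  # 1 (its loss to PHI is an incoming edge)
```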

In [None]:
teams_graph_sdf = spark.sql("SELECT Winner AS from_node, Loser AS to_node FROM superbowl")

In [None]:
# 2 points
grader.grade(test_case_id = 'checkSuperBowlGraph', answer = teams_graph_sdf.toPandas())

Correct! You earned 2/2 points. You are a star!

Your submission has been successfully recorded in the gradebook.


We have our graph ready! All that's left is running the full BFS on it to find the Super Bacons.

### Step 4.3: The Super Bacon Search

In the previous section, we found that the New England Patriots and the Pittsburgh Steelers have been the most successful teams over these 54 years. So let's find the Super Bacons of the other teams with respect to the New England Patriots.

First, we'll create an origin node using the New England Patriots.

In [None]:
orig = [{'node': 'New England Patriots'}]

Now, run the **spark_bfs()** function on **teams_graph_sdf** using this origin node and a max depth of 5. Store the result in **bfs_5**.

In [None]:
bfs_5 = spark_bfs(teams_graph_sdf, orig, 5)

In [None]:
bfs_5.show(5)

+--------------------+--------+
|                node|distance|
+--------------------+--------+
|    Tennessee Titans|       2|
| San Francisco 49ers|       5|
|   Arizona Cardinals|       5|
|New England Patriots|       0|
|  Kansas City Chiefs|       4|
+--------------------+--------+
only showing top 5 rows



Convert the result to pandas, store it in answer_df, and pass it to the grader.

In [None]:
# TODO
answer_df = bfs_5.toPandas()

In [None]:
# 15 points
grader.grade(test_case_id = 'checkBFS', answer = answer_df)

Correct! You earned 15/15 points. You are a star!

Your submission has been successfully recorded in the gradebook.


Voila! We're all done. Just like this NFL season, you've completed the homework.

One last thing, as we predicted before, you're a star.

# HW Submission

Before you submit on Gradescope (you must submit your notebook to receive credit):


1. Restart and Run-All to make sure there's nothing wrong with your notebook.
2. **Double check that you have the correct PennID (all numbers) in the autograder**.
3. Make sure you've run all the PennGrader cells.
4. Go to the "File" tab at the top left, click "Download .ipynb" and then "Download .py". **Rename** the files to "homework3.ipynb" and "homework3.py" respectively and upload them to Gradescope.

**Let the course staff know ASAP if you have any issues submitting, but otherwise best of luck!**