In [None]:
"""
Make all imports here 
"""
import pandas as pd
import configparser
import glob
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
from pyspark.sql.types import TimestampType as tt

In [None]:
"""
Before processing the data, delete any old or lingering datasets from the directory. 
Not doing so might cause downstream issues if datasets are similarly or renamed. 
"""
import shutil
shutil.rmtree('cities', ignore_errors=True)
shutil.rmtree('game_spreads', ignore_errors=True)
shutil.rmtree('games', ignore_errors=True)
shutil.rmtree('teams', ignore_errors=True)
shutil.rmtree('stadiums', ignore_errors=True)
shutil.rmtree('player_stats_all', ignore_errors=True)
shutil.rmtree('player_stats_defensive_line', ignore_errors=True)
shutil.rmtree('player_stats_offensive_line', ignore_errors=True)
shutil.rmtree('player_stats_kicker', ignore_errors=True)
shutil.rmtree('player_stats_runningback', ignore_errors=True)
shutil.rmtree('player_stats_quarterback', ignore_errors=True)
shutil.rmtree('player_stats_receiver', ignore_errors=True)

### Step 1: Scope the Project and Gather Data

#### Scope 
The Fantasy Football Draft Aid will ingest raw football player, team, stadium, and expert sports betting data. Once in the data warehouse, the data will be transformed into a data model centered around a player performance fact table. Complementary dimension tables will also be available to perform analyses with player demographics, team demographics, and so on. 

The initial work and on-going ETL will use a framework comprised of S3 buckets with a Redshift cluster and database to store the data. The entire application will be run Apache Spark to handle the large amounts of data that are required for these analyses.

The end result will be a comprehensive data model to inform fantasty football team "owners" when making decisions during drafting and throughout the NFL season.  


#### Describe and Gather Data 
To build this data model, the following data sources are required:
1. Detailed NFL Play-by-Play Data from 2009 through 2018 - CSV containing information about players' performance on a play-by-play level https://www.kaggle.com/maxhorowitz/nflplaybyplay2009to2016#NFL%20Play%20by%20Play%202009-2018%20(v5).csv

2. NFL Scores and Betting Data - CSVs containing information about NFL betting spreads, NFL team stadiums, and NFL teams
https://www.kaggle.com/maxhorowitz/nflplaybyplay2009to2016#NFL%20Play%20by%20Play%202009-2018%20(v5).csv

3. NFL Statistics Data - CSVs containing information about NFL players, in aggregation, and broken out by position
https://www.kaggle.com/kendallgillies/nflstatistics/version/1#Game_Logs_Wide_Receiver_and_Tight_End.csv

4. US Cities Demographics Data - JSONs containing information about US cities, which will be used specifically to analyze cities with NFL teams and NFL stadiums 
https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
 

In [None]:
# read in the data, by json and by csv file formats
def get_json_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    return all_files

def get_csv_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.csv'))
        for f in files :
            all_files.append(os.path.abspath(f))
    return all_files

json_files = get_json_files('nfl_drafting/')
csv_files = get_csv_files('nfl_drafting/')

# create function to get every filepath below
def get_dataset(fileset, data):
    dataset_fp = [s for s in fileset if data in s]
    dataset = dataset_fp[0]
    return dataset

# get filepath for each data set 
cities_fp = get_dataset(json_files, 'cities') 
games_fp = get_dataset(csv_files, 'games') 
game_spreads_fp = get_dataset(csv_files, 'game_spreads') 
stadiums_fp = get_dataset(csv_files, 'stadiums') 
teams_fp = get_dataset(csv_files, 'teams') 
player_stats_defensive_line_fp = get_dataset(csv_files, 'defensive_line') 
player_stats_kicker_fp = get_dataset(csv_files, 'kicker') 
player_stats_offensive_line_fp = get_dataset(csv_files, 'offensive_line') 
player_stats_quarterback_fp = get_dataset(csv_files, 'quarterback') 
player_stats_receiver_fp = get_dataset(csv_files, 'receiver') 
player_stats_runningback_fp = get_dataset(csv_files, 'runningback') 
player_stats_all_fp = get_dataset(csv_files, 'all') 

In [None]:
# clean up columns as they are read in if they have white space
def clean_cols_names(df):
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')
    return df

# create pandas dataframes to clean if need be; all others will be directly created as Spark dataframes
player_stats_defensive_line_df = pd.read_csv(player_stats_defensive_line_fp)
player_stats_defensive_line_df = clean_cols_names(player_stats_defensive_line_df)
player_stats_kicker_df = pd.read_csv(player_stats_kicker_fp)
player_stats_kicker_df = clean_cols_names(player_stats_kicker_df)
player_stats_offensive_line_df = pd.read_csv(player_stats_offensive_line_fp)
player_stats_offensive_line_df = clean_cols_names(player_stats_offensive_line_df)
player_stats_quarterback_df = pd.read_csv(player_stats_quarterback_fp)
player_stats_quarterback_df = clean_cols_names(player_stats_quarterback_df)
player_stats_receiver_df = pd.read_csv(player_stats_receiver_fp)
player_stats_receiver_df = clean_cols_names(player_stats_receiver_df)
player_stats_runningback_df = pd.read_csv(player_stats_runningback_fp)
player_stats_runningback_df = clean_cols_names(player_stats_runningback_df)
player_stats_all_df = pd.read_csv(player_stats_all_fp)
player_stats_all_df = clean_cols_names(player_stats_all_df)

In [None]:
# begin Spark Session with Hadoop
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [None]:
# create Spark dataframes
cities = spark.read.json(cities_fp)
games = spark.read.format("csv").option("header", "true").load(games_fp)
game_spreads = spark.read.format("csv").option("header", "true").load(game_spreads_fp)
stadiums = spark.read.format("csv").option("header", "true").load(stadiums_fp)
teams = spark.read.format("csv").option("header", "true").load(teams_fp)
player_stats_defensive_line = spark.createDataFrame(player_stats_defensive_line_df.astype(str))
player_stats_kicker = spark.createDataFrame(player_stats_kicker_df.astype(str))
player_stats_offensive_line = spark.createDataFrame(player_stats_offensive_line_df.astype(str))
player_stats_quarterback = spark.createDataFrame(player_stats_quarterback_df.astype(str))
player_stats_receiver = spark.createDataFrame(player_stats_receiver_df.astype(str))
player_stats_runningback = spark.createDataFrame(player_stats_runningback_df.astype(str))
player_stats_all = spark.createDataFrame(player_stats_all_df.astype(str))

In [None]:
# write to parquet
cities.write.parquet("cities")
cities=spark.read.parquet("cities")

games.write.parquet("games")
games=spark.read.parquet("games")

game_spreads.write.parquet("game_spreads")
game_spreads=spark.read.parquet("game_spreads")

stadiums.write.parquet("stadiums")
stadiums=spark.read.parquet("stadiums")

teams.write.parquet("teams")
teams=spark.read.parquet("teams")

player_stats_defensive_line.write.parquet("player_stats_defensive_line")
player_stats_defensive_line=spark.read.parquet("player_stats_defensive_line")

player_stats_kicker.write.parquet("player_stats_kicker")
player_stats_kicker=spark.read.parquet("player_stats_kicker")

player_stats_offensive_line.write.parquet("player_stats_offensive_line")
player_stats_offensive_line=spark.read.parquet("player_stats_offensive_line")

player_stats_quarterback.write.parquet("player_stats_quarterback")
player_stats_quarterback=spark.read.parquet("player_stats_quarterback")

player_stats_receiver.write.parquet("player_stats_receiver")
player_stats_receiver=spark.read.parquet("player_stats_receiver")

player_stats_runningback.write.parquet("player_stats_runningback")
player_stats_runningback=spark.read.parquet("player_stats_runningback")

player_stats_all.write.parquet("player_stats_all")
player_stats_all=spark.read.parquet("player_stats_all")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [None]:
# Check for duplicates

def check_duplicates(df, cols):
    if df.count() > df.dropDuplicates(cols).count():
        print("Error, there are duplicates")
        print("Here is a sample of them: ")
        df.groupBy(cols)\
            .count()\
            .where(sf.col('count') > 1)\
            .show()
    else: 
        print("No duplicates")
    return

cities = cities.select('datasetid', 'fields.*') # cols are stored in struct; break them out 
cities_df_cols = ['city', 'state', 'race']
print("Checking cities")
check_duplicates(cities, cities_df_cols)

game_spreads_df_cols = ['schedule_date', 'schedule_season', 'schedule_week', 'stadium', 'team_home', 'team_away']
print("Checking game_spreads")
check_duplicates(game_spreads, game_spreads_df_cols)

games_df_cols = ['play_id', 'game_id', 'home_team', 'away_team', 'drive']
print("Checking games")
check_duplicates(games, games_df_cols)
# returns duplicates with this combo of columns and upon investigation the duplicates are across all columns
# using distinct on ALL columns in the following step to clean and then check again
games = games.distinct()
check_duplicates(games, games_df_cols)

teams_df_cols = ['team_name']
print("Checking teams")
check_duplicates(teams, teams_df_cols)

stadiums_df_cols = ['stadium_name']
print("Checking stadiums")
check_duplicates(stadiums, stadiums_df_cols)

player_stats_all_df_cols = ['name', 'player_id']
print("Checking player_stats_all")
check_duplicates(player_stats_all, player_stats_all_df_cols)

player_stats_defensive_line_df_cols = ['name', 'player_id', 'year','game_date']
print("Checking player_stats_defensive_line")
check_duplicates(player_stats_defensive_line, player_stats_defensive_line_df_cols) 
# returns 1 "duplicate" and claims the player played on two teams in one day
# this is likely not accurate, but we don't want to hardcode team assignments yet
# will build this into the ETL 

player_stats_offensive_line_df_cols = ['name', 'player_id', 'year','game_date']
print("Checking player_stats_offensive_line")
check_duplicates(player_stats_offensive_line, player_stats_offensive_line_df_cols)

player_stats_kicker_df_cols = ['name', 'player_id', 'year','game_date']
print("Checking player_stats_kicker")
check_duplicates(player_stats_kicker, player_stats_kicker_df_cols)

player_stats_receiver_df_cols = ['name', 'player_id', 'year','game_date']
print("Checking player_stats_receiver")
check_duplicates(player_stats_receiver, player_stats_receiver_df_cols)

player_stats_quarterback_df_cols = ['name', 'player_id', 'year','game_date']
print("Checking player_stats_quarterback")
check_duplicates(player_stats_quarterback, player_stats_quarterback_df_cols)

player_stats_runningback_df_cols = ['name', 'player_id', 'year','game_date']
print("Checking player_stats_runningback")
check_duplicates(player_stats_runningback, player_stats_runningback_df_cols)

In [None]:
# Perform string cleaning
def blank_as_null(df, cols):
    for col in cols:
        sf.when(((df[col] != "" ) | df[col] == "NA" ), df[col]).otherwise(None)
    return df

# since we are reading from strings, we went to set any rogue empty strings or NA's to null 
cities = blank_as_null(cities, cities_df_cols)
game_spreads = blank_as_null(game_spreads, game_spreads_df_cols)
games = blank_as_null(games, games_df_cols)
teams = blank_as_null(teams, teams_df_cols)
stadiums = blank_as_null(stadiums, stadiums_df_cols)
player_stats_receiver = blank_as_null(player_stats_receiver, player_stats_receiver_df_cols)
player_stats_runningback = blank_as_null(player_stats_runningback, player_stats_runningback_df_cols)
player_stats_quarterback = blank_as_null(player_stats_quarterback, player_stats_quarterback_df_cols)
player_stats_kicker = blank_as_null(player_stats_kicker, player_stats_kicker_df_cols)
player_stats_defensive_line = blank_as_null(player_stats_defensive_line, player_stats_defensive_line_df_cols)
player_stats_offensive_line = blank_as_null(player_stats_offensive_line, player_stats_offensive_line_df_cols)
player_stats_all = blank_as_null(player_stats_all, player_stats_all_df_cols)

In [None]:
# check for nulls in the columns which will become keys 

def check_nulls(df, cols):
    for col in cols:
        null_count = df.filter(df[col].isNull()).count()
        print("{} has {} nulls".format(col, null_count))
    return 

teams_check = check_nulls(teams, teams_df_cols)
game_spreads_check = check_nulls(game_spreads, game_spreads_df_cols)
games_check = check_nulls(games, games_df_cols)
stadiums_check = check_nulls(stadiums, stadiums_df_cols)
player_stats_receiver_check = check_nulls(player_stats_receiver, player_stats_receiver_df_cols)
player_stats_runningback_check = check_nulls(player_stats_runningback, player_stats_runningback_df_cols)
player_stats_quarterback_check = check_nulls(player_stats_quarterback, player_stats_quarterback_df_cols)
player_stats_kicker_check = check_nulls(player_stats_kicker, player_stats_kicker_df_cols)
player_stats_defensive_line_check = check_nulls(player_stats_defensive_line, player_stats_defensive_line_df_cols)
player_stats_offensive_line_check = check_nulls(player_stats_offensive_line, player_stats_offensive_line_df_cols)
player_stats_all_check = check_nulls(player_stats_all, player_stats_all_df_cols)
cities_check = check_nulls(cities, cities_df_cols)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

The data model will be comprised of one main fact table, one minor fact table, and ten supporting dimension tables. 

The main fact table - games - will contain all available information about players' performance within the context of a game. Gametime performance is the most critical piece of information to a fantasy football league participant because it directly translates into how the participant performs, as a result. All of the data is largely the "business facts" of the model, from the perspective of this football industry. 

The minor fact table can be considered an alternative to the main games table, because it is also descriptive of players' performance at a game-level. The difference is that this minor table - game_spreads - is the predictive performance and outcome, based upon sports betting professionals. When fantasy football leage participants make in-season decisions about which players to rest and to play, they often consult these data points. 

The supporting dimension tables will allow users of this data model to cut game and game_spread data by the grain named in the tables:
cities - for participants who consider the signifiance of sports in the home city as a decision factor 
teams - for participants who consider the players' context within their teams
stadiums - for participants who consider the characteristics and location of a stadium (or the traveled distance between a hometown and an away game) 
player_stats (broken out by most positions) - for participants who consider the high school, experience, and some aggregate statistical information about players 


#### 3.2 Mapping Out Data Pipelines
To make this data model possible the following work will be the most critical:

1. New ID fields must be made for each table which can be joined upon (current IDs are from the source data and are not necessarily linked in any way). In many cases these IDs will be a combination of unique values. 
2. Columns must be dropped, particularly from the games and game_spreads data sets. The width of the tables in their current state is excessive, costly to performance, and difficult for users to use in a reasonable way. 
3. Field types in all tables must be modified. They were read in as primarly strings and this will not be necessary for numeric data. 

To accomplish this we will follow these steps:
1. Create the table structure for the primary and secondary fact tables and all dimension tables
2. In doing 1, create the ID fields, implement correct field types and constraints, and select the necessary columns to include
3. Build INSERT statements for each of the 12 tables, while transform the data to meet each of the criteria set forth in step 2
4. Write execute statements to begin loading the data and evaluate whether the same statements can be used for going-forward ETL or if more lightweight logic is appropriate after this initial backfill 

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
"""
Grab the AWS Access Key ID and Secret Access Key to read and write our data 
"""

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']

output_data = "s3://paulosky_nfldrafting/"


In [None]:
# cities
# extract columns   
cities_table = cities.select(city
                             ,state
                             ,state_code
                             ,sum(male_population).alias('male_population')
                             ,sum(female_population).alias('female_population')
                            ).distinct().groupBy(city, state) # since we removing race 
# write parquet files
cities_table.write.mode('overwrite').parquet(output_data + "cities")

In [None]:
# teams
# extract columns   
teams_table = teams.select(team_name
                           ,sf.col(team_name_short).alias('team_short_name')
                           ,sf.col(team_id).alias('team_code')
                           ,sf.col(team_conference).alias('team_conference_code')
                           ,sf.col(team_division).alias('team_division_short_name')
                           ,sf.col(team_conference_pre2002).alias('team_conference_pre2002_code')
                           ,sf.col(team_division_pre2002).alias('team_division_pre2002_short_name')
                          )
# write parquet files
teams_table.write.mode('overwrite').parquet(output_data + "teams")

In [None]:
# stadiums
# extract columns   
stadiums_table = stadiums.select(stadium_name
                                 ,stadium_location
                                 ,stadium_open
                                 ,stadium_close 
                                 ,stadium_type 
                                 ,stadium_weather_type
                                 ,stadium_surface 
                                 ,sf.col(elevation).alias('stadium_elevation_feet')
                            )
# write parquet files
stadiums_table.write.mode('overwrite').parquet(output_data + "stadiums")

In [None]:
# games
# extract columns   
games_table = games.select( play_id, 
                            col(desc).alias('play_descr'),
                            game_id, 
                            game_date, 
                            sf.col(home_team).alias('home_team_id'), 
                            sf.col(away_team).alias('away_team_id)', 
                            sf.col(posteam).alias('poss_team_id'),
                            sf.col(defteam).alias('def_team_id'),
                            side_of_field,
                            sf.col(yardline_100).alias(yardline),
                            game_seconds_remaining, 
                            drive,
                            qtr,
                            down,
                            time,
                            sf.col(ydstogo).alias('yards_to_go'),
                            play_type,
                            sf.col(epa).alias('expected_points_added'),
                            punt_blocked,
                            first_down_rush,
                            first_down_pass,
                            first_down_penalty,
                            third_down_converted,
                            third_down_failed,
                            fourth_down_converted,
                            fourth_down_failed,
                            incomplete_pass,
                            interception,
                            punt_inside_twenty,
                            punt_in_endzone,
                            punt_out_of_bounds,
                            punt_downed,
                            punt_fair_catch,
                            kickoff_inside_twenty,
                            kickoff_in_endzone,
                            kickoff_out_of_bounds,
                            kickoff_downed,
                            kickoff_fair_catch,
                            fumble_forced,
                            fumble_not_forced,
                            fumble_out_of_bounds
                            solo_tackle,
                            safety,
                            penalty,
                            tackled_for_loss,                       
                            fumble_lost,
                            qb_hit
                            rush_attempt,
                            pass_attempt,
                            sack,
                            touchdown,
                            pass_touchdown,
                            rush_touchdown,
                            return_touchdown,
                            extra_point_attempt,
                            two_point_attempt,
                            field_goal_attempt,
                            kickoff_attempt
                            punt_attempt,
                            fumble,
                            complete_pass,
                            assist_tackle,
                            lateral_reception,
                            lateral_rush, 
                            lateral_return,
                            lateral_recovery
                            passer_player_name,
                            receiver_player_name,
                            rusher_player_name,
                            lateral_receiver_player_name,
                            lateral_rusher_player_name,
                            lateral_sack_player_name,
                            interception_player_name,
                            lateral_interception_player_name,
                            punt_returner_player_name,
                            lateral_punt_returner_player_name,
                            kickoff_returner_player_name,
                            lateral_kickoff_returner_player_name,
                            punter_player_name,
                            kicker_player_name,
                            own_kickoff_recovery_player_name,
                            blocked_player_name,
                            tackle_for_loss_1_player_name,
                            tackle_for_loss_2_player_name,
                            qb_hit_1_player_name,
                            qb_hit_2_player_name,
                            forced_fumble_player_1_player_name,
                            forced_fumble_player_2_player_name,
                            solo_tackle_1_player_name,
                            solo_tackle_2_player_name,
                            assist_tackle_1_player_name	
                            assist_tackle_2_player_name	
                            assist_tackle_3_player_name,
                            assist_tackle_4_player_name,
                            pass_defense_1_player_name,
                            pass_defense_2_player_name,
                            fumbled_1_player_name,
                            fumbled_2_player_name,
                            fumble_recovery_1_yards,
                            fumble_recovery_1_player_name,
                            fumble_recovery_2_yards,
                            fumble_recovery_2_player_name,
                            return_yards,
                            penalty_player_name,
                            penalty_yards,
                            replay_or_challenge,
                            replay_or_challenge_result,
                            penalty_type,
                            defensive_two_point_attempt,
                            defensive_two_point_conv,
                            defensive_extra_point_attempt,
                            defensive_extra_point_conv
                            )
# write parquet files
games_table.write.mode('overwrite').parquet(output_data + "games")

In [None]:
# game_spreads
# extract columns   
game_spreads_table = game_spreads.select(sf.col(schedule_date).alias('game_date')
                                         ,sf.col(schedule_season).alias('season_year')
                                         ,sf.col(schedule_week).alias('season_week')
                                         ,sf.col(schedule_playoff).alias('playoff_game_flag')
                                         ,sf.col(team_home).alias('home_team_name')
                                         ,sf.col(score_home).alias('home_team_score')
                                         ,sf.col(score_away).alias('away_team_score')
                                         ,sf.col(team_away).alias('away_team_name')
                                         ,sf.col(team_favorite_id).alias('favorite_team_name')
                                         ,sf.col(spread_favorite).alias('spread_favorite_team_name')
                                         ,sf.col(over_under_line).alias('over_under_line')
                                         ,sf.col(stadium).alias('stadium_name')
                                         ,sf.col(stadium_neutral).alias('neutral_stadium_flag')
                                         ,sf.col(weather_temperature).alias('weather_temperature_f')
                                         ,sf.col(weather_wind_mph).alias('weather_wind_mph')
                                         ,sf.col(weather_humidity).alias('weather_humidity_perc')
                                         

# write parquet files
game_spreads_table.write.mode('overwrite').parquet(output_data + "game_spreads")

In [None]:
# player_stats_all
# extract columns   
player_stats_all_table = player_stats_all.select(player_id
                                                ,sf.col(name).split(', ')[0].alias('last_name')
                                                ,sf.col(name).split(', ')[1].alias('first_name')
                                                ,sf.col(birthday).alias('birth_date')
                                                ,sf.col(birth_place).split(', ')[0].alias('birth_city')
                                                ,sf.col(birth_place).split(', ')[1].alias('birth_state')
                                                ,sf.col(current_status).alias('status')
                                                ,sf.col(high_school).alias('high_school_name')
                                                ,sf.col(college).alias('college_name')
                                                ,sf.col(current_team).alias('current_team_name')
                                                ,experience
                                                ,height_inches
                                                ,weight_pounds
                                                ,sf.col(years_played).split(' - ')[0].alias('year_start')
                                                ,sf.col(years_played).split(' - ')[1].alias('year_end')
                            )
# write parquet files
player_stats_all_table.write.mode('overwrite').parquet(output_data + "player_stats_all")

In [None]:
# player_stats_defensive_line
# extract columns   
player_stats_defensive_line_table = player_stats_defensive_line.select(player_id
                                                                       ,sf.col(name).split(', ')[0].alias('last_name')
                                                                       ,sf.col(name).split(', ')[1].alias('first_name')
                                                                       ,sf.col(position).alias('position_code')
                                                                       ,sf.col(year).alias('season_year')
                                                                       ,sf.col(week).alias('season_week')
                                                                       ,game_date
                                                                       ,sf.col(season).alias('season_phase')
                                                                       ,home_or_away
                                                                       ,sf.col(opponent).alias('opposing_team_code')
                                                                       ,outcome
                                                                       ,score
                                                                       ,games_played
                                                                       ,games_started
                                                                       ,total_tackles
                                                                       ,solo_tackles
                                                                       ,assisted_tackles
                                                                       ,sacks
                                                                       ,sf.col(safties).alias('safeties')
                                                                       ,passes_defended
                                                                       ,ints
                                                                       ,int_yards
                                                                       ,yards_per_int
                                                                       ,longest_int_return
                                                                       ,ints_for_tds
                                                                       ,forced_fumbles
                            )
# write parquet files
player_stats_defensive_line_table.write.mode('overwrite').parquet(output_data + "player_stats_defensive_line")

In [None]:
# player_stats_offensive_line
# extract columns   
player_stats_offensive_line_table = player_stats_offensive_line.select(player_id
                                                                       ,sf.col(name).split(', ')[0].alias('last_name')
                                                                       ,sf.col(name).split(', ')[1].alias('first_name')
                                                                       ,sf.col(position).alias('position_code')
                                                                       ,sf.col(year).alias('season_year')
                                                                       ,sf.col(week).alias('season_week')
                                                                       ,game_date
                                                                       ,sf.col(season).alias('season_phase')
                                                                       ,home_or_away
                                                                       ,sf.col(opponent).alias('opposing_team_code')
                                                                       ,outcome
                                                                       ,score
                                                                       ,games_played
                                                                       ,games_started
                            )
# write parquet files
player_stats_offensive_line_table.write.mode('overwrite').parquet(output_data + "player_stats_offensive_line")

In [None]:
# player_stats_kicker
# extract columns   
player_stats_kicker_table = player_stats_kicker.select(player_id
                                                       ,sf.col(name).split(', ')[0].alias('last_name')
                                                       ,sf.col(name).split(', ')[1].alias('first_name')
                                                       ,sf.col(position).alias('position_code')
                                                       ,sf.col(year).alias('season_year')
                                                       ,sf.col(week).alias('season_week')
                                                       ,game_date
                                                       ,sf.col(season).alias('season_phase')
                                                       ,home_or_away
                                                       ,sf.col(opponent).alias('opposing_team_code')
                                                       ,outcome
                                                       ,score
                                                       ,games_played
                                                       ,games_started
                                                       ,kicks_blocked
                                                       ,longest_fg_made
                                                       ,fgs_attempted
                                                       ,fgs_made
                                                       ,extra_points_made
                                                       ,extra_points_attempted
                                                       ,extra_points_blocked
                                                       ,kickoffs
                                                       ,yards_per_kickoff
                                                       ,touchbacks
                                                       ,kickoffs_returned
                            )
# write parquet files
player_stats_kicker_table.write.mode('overwrite').parquet(output_data + "player_stats_kicker")

In [None]:
# player_stats_quarterback
# extract columns   
player_stats_quarterback_table = player_stats_quarterback.select(player_id
                                                                 ,sf.col(name).split(', ')[0].alias('last_name')
                                                                 ,sf.col(name).split(', ')[1].alias('first_name')
                                                                 ,sf.col(position).alias('position_code')
                                                                 ,sf.col(year).alias('season_year')
                                                                 ,sf.col(week).alias('season_week')
                                                                 ,game_date
                                                                 ,sf.col(season).alias('season_phase')
                                                                 ,home_or_away
                                                                 ,sf.col(opponent).alias('opposing_team_code')
                                                                 ,outcome
                                                                 ,score
                                                                 ,games_played
                                                                 ,games_started
                                                                 ,passes_completed
                                                                 ,passes_attempted
                                                                 ,passing_yards
                                                                 ,passing_yards_per_attempt
                                                                 ,td_passes
                                                                 ,ints
                                                                 ,sacks
                                                                 ,sacked_yards_lost
                                                                 ,passer_rating
                                                                 ,rushing_attempts
                                                                 ,rushing_yards
                                                                 ,yards_per_carry
                                                                 ,rushing_tds
                                                                 ,fumbles
                                                                 ,fumbles_lost
                            )
# write parquet files
player_stats_quarterback_table.write.mode('overwrite').parquet(output_data + "player_stats_quarterback")

In [None]:
# player_stats_receiver
# extract columns   
player_stats_receiver_table = player_stats_receiver.select(player_id
                                                           ,sf.col(name).split(', ')[0].alias('last_name')
                                                           ,sf.col(name).split(', ')[1].alias('first_name')
                                                           ,sf.col(position).alias('position_code')
                                                           ,sf.col(year).alias('season_year')
                                                           ,sf.col(week).alias('season_week')
                                                           ,game_date
                                                           ,sf.col(season).alias('season_phase')
                                                           ,home_or_away
                                                           ,sf.col(opponent).alias('opposing_team_code')
                                                           ,outcome
                                                           ,score
                                                           ,games_played
                                                           ,games_started
                                                           ,receptions
                                                           ,receiving_yards
                                                           ,yards_per_reception
                                                           ,longest_reception
                                                           ,receiving_tds
                                                           ,rushing_attempts
                                                           ,rushing_yards
                                                           ,yards_per_carry
                                                           ,longest_rushing_run
                                                           ,rushing_tds
                                                           ,fumbles
                                                           ,fumbles_lost
                            )
# write parquet files
player_stats_receiver_table.write.mode('overwrite').parquet(output_data + "player_stats_receiver")

In [None]:
# player_stats_runningback
# extract columns   
player_stats_runningback_table = player_stats_runningback.select(player_id
                                                                 ,sf.col(name).split(', ')[0].alias('last_name')
                                                                 ,sf.col(name).split(', ')[1].alias('first_name')
                                                                 ,sf.col(position).alias('position_code')
                                                                 ,sf.col(year).alias('season_year')
                                                                 ,sf.col(week).alias('season_week')
                                                                 ,game_date
                                                                 ,sf.col(season).alias('season_phase')
                                                                 ,home_or_away
                                                                 ,sf.col(opponent).alias('opposing_team_code')
                                                                 ,outcome
                                                                 ,score
                                                                 ,games_played
                                                                 ,games_started
                                                                 ,receptions
                                                                 ,receiving_yards
                                                                 ,yards_per_reception
                                                                 ,longest_reception
                                                                 ,receiving_tds
                                                                 ,rushing_attempts
                                                                 ,rushing_yards
                                                                 ,yards_per_carry
                                                                 ,longest_rushing_run
                                                                 ,rushing_tds
                                                                 ,fumbles
                                                                 ,fumbles_lost
                            )
# write parquet files
player_stats_runningback_table.write.mode('overwrite').parquet(output_data + "player_stats_runningback")

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks 

check_nulls(teams_table, teams_df_cols)
check_nulls(game_spreads_table, game_spreads_df_cols)
check_nulls(games_table, games_df_cols)
check_nulls(stadiums_table, stadiums_df_cols)
check_nulls(player_stats_receiver_table, player_stats_receiver_df_cols)
check_nulls(player_stats_runningback_table, player_stats_runningback_df_cols)
check_nulls(player_stats_quarterback_table, player_stats_quarterback_df_cols)
check_nulls(player_stats_kicker_table, player_stats_kicker_df_cols)
check_nulls(player_stats_defensive_line_table, player_stats_defensive_line_df_cols)
check_nulls(player_stats_offensive_line_table, player_stats_offensive_line_df_cols)
check_nulls(player_stats_all_table, player_stats_all_df_cols)
check_nulls(cities_table, cities_df_cols)

check_duplicates(cities_table, cities_df_cols)
check_duplicates(game_spreads_table, game_spreads_df_cols)
check_duplicates(games_table, games_df_cols)
check_duplicates(teams_table, teams_df_cols)
check_duplicates(stadiums_table, stadiums_df_cols)
check_duplicates(player_stats_all_table, player_stats_all_df_cols)
check_duplicates(player_stats_defensive_line_table, player_stats_defensive_line_df_cols) 
check_duplicates(player_stats_offensive_line_table, player_stats_offensive_line_df_cols)
check_duplicates(player_stats_kicker_table, player_stats_kicker_df_cols)
check_duplicates(player_stats_receiver_table, player_stats_receiver_df_cols)
check_duplicates(player_stats_quarterback_table, player_stats_quarterback_df_cols)
check_duplicates(player_stats_runningback_table, player_stats_runningback_df_cols)

#### 4.3 Data dictionary 
Please see data_dictionary.txt

#### Step 5: Complete Project Write Up
* Rationale for selected tools and technologies
  Given the size and variety of data in this project, Spark was used to process the data in a parquet columnar storage format. 
  A Redshift cluster with S3 buckets is used to store the data for future querying and analyses and player performance. 
* Latency
  This data should be updated differently based on time-of-year. 
  The data should be processed every morning. At first, it would seem this data only needs to be updated on a weekly basis, after all games have been played. However, we do see variance in sports betting each day, as well as, trades which impact which team a player is on, who is teammates are, and whether he is injured. The stadiums data will likely not change much at all, but given the very small size of data in this table, it is trivial to keep its schedule in line with the other tables. 
* Forward-thinking stratagies:
 * If the volume of data increased 100x (which will not take long given the amount of data gathered during each season of play), we will likely need to create sub-data models for each skillset. For example, player_stats_defensive_line can be broken down into individual positions, or specific data pointss per table, i.e. tackles vs. sacks. This will help to alleviate run time of queries that do not need to pull all data points to answer a single question about a skillset. 
 * If this data populated a dashboard with an SLA of 7am daily, we would begin processing the data at 2AM UTC each evening. This would allow the final data, posted by the NFL, to accumulate post-game. At this time, run time for this entire notebook is under 30 minutes. 
 * If 100 people needed to access this database, we could consider how this data model might benefit different use cases. For example, some individuals would only be concerned with play conditions, which means they would not need a data model comprised of each play or each player's performance. They would be more interested in information about the stadium, the weather of each game, and the general outcome of each game. Other use cases might include understanding the demographic of each city which has an NFL team to better serve the fans or incrase fan participation at games. These users would need to see stadium, city, team, and sometimes player information (at an aggregate level). For all of these use cases and more, we could create more tables with more defined subject-areas to reduce table access and limit to only data which the users need.