In [1]:
import numpy as np
import pandas as pd
import pickle

# Notebook display settings: never truncate columns, show at most 125 rows.
pd.options.display.max_columns = None
pd.options.display.max_rows = 125

In [2]:
# Load the raw play-by-play sample; the path is relative to the notebook's working directory.
data = pd.read_csv(filepath_or_buffer="data/Sample_PFF_Data.csv")

In [3]:
# Work on an independent copy (DataFrame.copy is deep by default) so `data` stays untouched.
dataset = data.copy()

In [4]:
# Open question: should SHOTGUN and PISTOL be NaN for run plays and 0 only for passes without
# the formation? For now: 1 when the raw column holds any value, 0 when it is missing.
# The column is assigned back explicitly instead of calling `where`/`fillna` with
# inplace=True on `dataset.SHOTGUN`: chained in-place calls on an attribute-accessed column
# are not guaranteed to write through to `dataset` (pandas Copy-on-Write).
dataset['SHOTGUN'] = dataset['SHOTGUN'].notna().astype(int)
dataset['PISTOL'] = dataset['PISTOL'].notna().astype(int)

# Encode 'O' as 1 and 'C' as 0; `replace` leaves any other value (including NaN) unchanged.
dataset['MOFO_PLAYED'] = dataset['MOFOCPLAYED'].replace({'O': 1, 'C': 0})
# MOFO_SHOWN was previously derived from MOFOCPLAYED, making it an exact duplicate of
# MOFO_PLAYED. It is now read from the "shown" column.
# NOTE(review): assumes the raw column is named MOFOCSHOWN — confirm against the CSV header.
dataset['MOFO_SHOWN'] = dataset['MOFOCSHOWN'].replace({'O': 1, 'C': 0})


In [5]:
# convert string time (2:00) into seconds as int (120)
def convert_time(time_str):
    """Convert a 'minutes:seconds' clock string into a total number of seconds.

    The string is split on the first ':' rather than sliced at fixed positions, so both
    zero-padded ('02:00', '15:00') and unpadded ('2:00') minute values are handled.

    Parameters
    ----------
    time_str : str
        Clock value such as '2:00' or '14:37'.

    Returns
    -------
    int
        minutes * 60 + seconds.

    Raises
    ------
    ValueError
        If either side of the ':' is not an integer (including a missing ':').
    """
    minutes, _, seconds = time_str.partition(":")
    return int(minutes) * 60 + int(seconds)
# remove * and +Q to simplify features
# decided to group all 3-RB sets together since had similar Run vs Pass rates (and all had low sample sizes)
def convert_off_personnel(personnel_str):
    """Reduce an offensive personnel label to its first two characters.

    Parameters
    ----------
    personnel_str : str or missing value
        Raw personnel label (e.g. '11', '12*', '21+Q').

    Returns
    -------
    str or float
        * the first two characters of the label, which drops suffixes such as '*' / '+Q';
        * '3+' for any label whose first character is '3' (all such sets are pooled);
        * np.nan when the label starts with 'Un' (presumably "Unknown" — confirm against
          the data), is an empty string, or is not a string at all (e.g. NaN), so the
          function can be applied to a column containing missing values without raising.
    """
    # Missing / empty labels carry no personnel information.
    if not isinstance(personnel_str, str) or not personnel_str:
        return np.nan

    new_alignment = personnel_str[0:2]
    if new_alignment[0] == '3':
        return '3+'
    if new_alignment == 'Un':
        return np.nan
    return new_alignment
# just return nans for values like (10 men, X-X-X)
def convert_def_personnel(personnel_str):
    """Split a defensive personnel label into three integer counts.

    The characters at positions 0, 2 and 4 of the label are read as single digits
    (e.g. '4-3-4' -> 4, 3, 4); the caller stores them as linemen, linebackers and
    defensive backs respectively.

    Parameters
    ----------
    personnel_str : str or missing value
        Raw defensive personnel label.

    Returns
    -------
    pandas.Series
        Three values. If any of the three positions cannot be parsed (label too short,
        non-digit character, or a non-string value such as NaN), all three are np.nan.
    """
    try:
        counts = [int(personnel_str[position]) for position in (0, 2, 4)]
    except (TypeError, ValueError, IndexError):
        # Only parsing failures are treated as "unknown personnel"; anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate instead of being swallowed.
        counts = [np.nan, np.nan, np.nan]
    return pd.Series(counts)

In [6]:
# Game clock as an integer number of seconds, computed element-wise from the CLOCK strings.
dataset['CLOCK_INT'] = dataset['CLOCK'].map(convert_time)

In [7]:
# Store these discrete game-state columns with the pandas "category" dtype.
for categorical_col in (
    "QUARTER",
    "DOWN",
    "OFFTIMEOUTSREMAINING",
    "DEFTIMEOUTSREMAINING",
    "HASH",
):
    dataset[categorical_col] = dataset[categorical_col].astype("category")

In [8]:
# Keep only plays labelled as a pass ('P') or a run ('R') ...
dataset = dataset[dataset['RUNPASS'].isin(['P', 'R'])]
# ... drop rows whose DOWN is 0, then renumber the rows from 0.
dataset = dataset.query("DOWN != 0").reset_index(drop=True)
# The target is stored as a categorical.
dataset['RUNPASS'] = dataset['RUNPASS'].astype("category")

In [9]:
# Simplified offensive personnel label (one value per play).
dataset['OFFPERSONNEL_SIMPLIFIED'] = dataset['OFFPERSONNELBASIC'].map(convert_off_personnel)

# convert_def_personnel returns a 3-element Series per play, so `apply` yields a
# 3-column frame that is written into the three count columns in order.
def_personnel_count_cols = [
    'DEFPERSONNEL_num_linemen',
    'DEFPERSONNEL_num_linebackers',
    'DEFPERSONNEL_num_defensivebacks',
]
dataset[def_personnel_count_cols] = dataset['DEFPERSONNEL'].apply(convert_def_personnel)

In [11]:
# Names of the team-level yards-per-play features computed further down the notebook.
misc_col_names = [
    'historical_yards_per_carry',
    'historical_yards_per_pass_attempt',
    'historical_yards_allowed_per_carry',
    'historical_yards_allowed_per_pass_attempt',
]

# Numeric columns for which "previous play" / cumulative-mean features are built.
# The order matters: the derived prev_/game_prev_/historical_prev_ name lists follow it.
col_names_for_numeric_previous = [
    'FORCEDFUMBLE', 'HIT', 'HURRY', 'GAINLOSSNET', 'INTERCEPTION',
    'NOHUDDLE', 'PENALTY', 'PASSDEPTH', 'PASSBREAKUP',
    'DROPBACKDEPTH',
    # columns engineered earlier in this notebook
    'MOFO_PLAYED', 'MOFO_SHOWN',
    'DEFPERSONNEL_num_linemen',
    'DEFPERSONNEL_num_linebackers',
    'DEFPERSONNEL_num_defensivebacks',
    'PISTOL', 'PLAYACTION', 'SACK', 'SCREEN', 'SHIFTMOTION', 'SHOTGUN',
    'QBMOVEDOFFSPOT', 'QBPRESSURE', 'TIMETOPRESSURE', 'TIMETOTHROW',
    'YARDSAFTERCATCH', 'YARDSAFTERCONTACT',
]

In [12]:
# Feature-name lists, one per numeric source column, using the prefixes
# prev_ (value from the previous row of the group), game_prev_ (cumulative mean over the
# earlier plays of the same game) and historical_prev_ (cumulative mean over all of the
# team's earlier plays).
prev_names = [f'prev_{name}' for name in col_names_for_numeric_previous]
game_prev_names = [f'game_prev_{name}' for name in col_names_for_numeric_previous]
historical_prev_names = [f'historical_prev_{name}' for name in col_names_for_numeric_previous]

In [13]:
def rolling_mean_func(g, K=10):
    """Cumulative mean of all rows strictly before each row.

    Despite the name, this uses an expanding (not fixed-width rolling) window.

    Parameters
    ----------
    g : object exposing ``.expanding`` (e.g. a pandas Series or DataFrame)
        Values in the order they should be accumulated.
    K : int, default 10
        Passed as ``min_periods`` to the expanding window.

    Returns
    -------
    Same type as ``g``: the expanding mean, shifted down by one row so the current
    row is excluded from its own mean.
    """
    cumulative_mean = g.expanding(min_periods=K).mean()
    # shift(1) moves every mean one row later, so row i only reflects rows before i.
    return cumulative_mean.shift(1)

In [14]:
# Sort by offense / game / play first: the grouped results below are written back after
# reset_index(drop=True), i.e. by row position, so they must come out in this same order.
dataset = dataset.sort_values(by=["OffTeam", "GAMEID", "PLAYID"]).reset_index(drop=True)

# prev_*: value from the preceding row within the same (offense, game, drive).
# sort=False is fine because the frame is already sorted.
previous_in_drive = dataset.groupby(
    ["OffTeam", "GAMEID", "DRIVE"], sort=False
)[col_names_for_numeric_previous].shift(1)
dataset[prev_names] = previous_in_drive

# game_prev_*: cumulative mean within the (offense, game);
# rolling_mean_func shifts by one row to exclude the current row from its own mean.
game_cumulative_means = dataset.groupby(
    ["OffTeam", "GAMEID"], sort=False
)[col_names_for_numeric_previous].apply(rolling_mean_func)
dataset[game_prev_names] = game_cumulative_means.reset_index(drop=True)

# historical_prev_*: same calculation over all of the offense's rows, with K=100.
team_cumulative_means = dataset.groupby(
    ["OffTeam"], sort=False
)[col_names_for_numeric_previous].apply(rolling_mean_func, K=100)
dataset[historical_prev_names] = team_cumulative_means.reset_index(drop=True)


In [15]:
# Net gain restricted to one play type: the indicator is 1 for that type and NaN for the
# other, so the product is GAINLOSSNET on matching plays and NaN elsewhere.
run_indicator = dataset['RUNPASS'].map({"R": 1, "P": np.nan})
dataset['running_gain'] = run_indicator * dataset['GAINLOSSNET']
pass_indicator = dataset['RUNPASS'].map({"P": 1, "R": np.nan})
dataset['passing_gain'] = pass_indicator * dataset['GAINLOSSNET']

# Per-offense cumulative means (K=100), written back by row position.
for gain_col, feature_col in (
    ('running_gain', 'historical_yards_per_carry'),
    ('passing_gain', 'historical_yards_per_pass_attempt'),
):
    offense_means = dataset.groupby(["OffTeam"])[gain_col].apply(rolling_mean_func, K=100)
    dataset[feature_col] = offense_means.reset_index(drop=True)

In [16]:
# Re-sort by defense so the per-defense grouped results line up with the row order
# when they are written back by position.
dataset = dataset.sort_values(by=["DefTeam", "GAMEID", "PLAYID"]).reset_index(drop=True)

# Per-defense cumulative means (K=100) of the run-only / pass-only gains.
for gain_col, feature_col in (
    ('running_gain', 'historical_yards_allowed_per_carry'),
    ('passing_gain', 'historical_yards_allowed_per_pass_attempt'),
):
    defense_means = dataset.groupby(["DefTeam"])[gain_col].apply(rolling_mean_func, K=100)
    dataset[feature_col] = defense_means.reset_index(drop=True)

In [17]:
# Categorical columns that get "previous play" and dummy-based cumulative features.
col_names_for_categorical_previous = ['OFFPERSONNEL_SIMPLIFIED', 'CENTERPASSBLOCKDIRECTION']

# Cast both columns to the "category" dtype in one call.
dataset = dataset.astype(dict.fromkeys(col_names_for_categorical_previous, "category"))

prev_categorical_names = [f'prev_{name}' for name in col_names_for_categorical_previous]

# Previous-play value within the same (offense, game, drive).
previous_categories = dataset.groupby(
    ["OffTeam", "GAMEID", "DRIVE"]
)[col_names_for_categorical_previous].shift(1)
dataset[prev_categorical_names] = previous_categories

In [18]:
# Names of the dummy columns for every value observed in each categorical feature,
# formatted "<column>_<value>" (a missing value formats as "<column>_nan").
dummy_categorical_names = [
    f"{col_name}_{val}"
    for col_name in col_names_for_categorical_previous
    for val in dataset[col_name].unique()
]
game_prev_dummy_categorical_names = [f'game_prev_{name}' for name in dummy_categorical_names]

# One-hot encode the categorical features (with an explicit NaN indicator column),
# then restore the offense / game / play ordering used for the grouped calculations.
dummy_dataset = pd.get_dummies(
    dataset,
    columns=col_names_for_categorical_previous,
    prefix_sep="_",
    dummy_na=True,
    drop_first=False,
)
dummy_dataset = dummy_dataset.sort_values(by=["OffTeam", "GAMEID", "PLAYID"]).reset_index(drop=True)


In [19]:
# Per (offense, game) cumulative share of each dummy over the earlier rows (K=10),
# written back by row position.
game_dummy_means = dummy_dataset.groupby(
    ["OffTeam", "GAMEID"]
)[dummy_categorical_names].apply(rolling_mean_func, K=10)
dummy_dataset[game_prev_dummy_categorical_names] = game_dummy_means.reset_index(drop=True)

In [20]:
# Same cumulative share, but over all of the offense's earlier rows (K=100).
historical_prev_dummy_categorical_names = [
    f'historical_prev_{name}' for name in dummy_categorical_names
]
team_dummy_means = dummy_dataset.groupby(
    ["OffTeam"]
)[dummy_categorical_names].apply(rolling_mean_func, K=100)
dummy_dataset[historical_prev_dummy_categorical_names] = team_dummy_means.reset_index(drop=True)

In [21]:
# Columns taken directly from the play-by-play data (plus CLOCK_INT derived above).
base_feature_names = [
    'WEEK', 'QUARTER', 'SCOREDIFFERENTIAL', 'SCORE', 'DISTANCE', 'DOWN',
    'FIELDPOSITION', 'DRIVE', 'DRIVEPLAY',
    'OFFTIMEOUTSREMAINING', 'DEFTIMEOUTSREMAINING',
    'HASH', 'SPOTLEFT', '2MINUTE', 'CLOCK_INT',
]

# All engineered features: numeric groups first, then categorical groups, then the
# yards-per-play features.
engineered_features_names = (
    prev_names
    + game_prev_names
    + historical_prev_names
    + prev_categorical_names
    + game_prev_dummy_categorical_names
    + historical_prev_dummy_categorical_names
    + misc_col_names
)

target_names = ['RUNPASS']

In [23]:
# Final ordering by game then play, with a fresh 0..n-1 index, so row positions align
# with the indices used in model fitting.
dummy_dataset = (
    dummy_dataset
    .sort_values(by=["GAMEID", "PLAYID"])
    .reset_index(drop=True)
)

In [24]:
# Identifiers + raw features + target only.
base_columns = ['GAMEID', 'PLAYID'] + base_feature_names + target_names
base_dataset = dummy_dataset[base_columns]
#base_dataset.to_pickle("datasets/base_dataset.pkl")   # use pickle to keep data dtypes

In [25]:
# Identifiers + raw features + numeric engineered features + yards features + target
# (no categorical-derived columns).
non_categorical_columns = (
    ['GAMEID', 'PLAYID']
    + base_feature_names
    + prev_names
    + game_prev_names
    + historical_prev_names
    + misc_col_names
    + target_names
)
non_categorical_dataset = dummy_dataset[non_categorical_columns]
#non_categorical_dataset.to_pickle("datasets/non_categorical_dataset.pkl")   # use pickle to keep data dtypes

In [26]:
# Identifiers + raw features + every engineered feature + target.
all_feature_columns = (
    ['GAMEID', 'PLAYID'] + base_feature_names + engineered_features_names + target_names
)
dummy_dataset_filtered = dummy_dataset[all_feature_columns]
#dummy_dataset_filtered.to_pickle("datasets/dummy_dataset.pkl")   # use pickle to keep data dtypes

In [26]:
''' 
Tests for filtering 
'''
# The checks below assume rows are ordered by offense, game and play, so they run on a
# re-sorted deep copy rather than on dummy_dataset itself.
test_df = (
    dummy_dataset
    .copy(deep=True)
    .sort_values(by=["OffTeam", "GAMEID", "PLAYID"])
    .reset_index(drop=True)
)


In [28]:
# tests that we are calculating the rolling means correctly for a given team in a given game
# ASSUMES df is sorted how it should be sorted
def test_game_prev_calculation(df, game_id, off_team, col_name = 'HURRY'):
    slice = df.query("GAMEID == @game_id and OffTeam == @off_team")

    #### manually calculate some means #####
    hurry_first_10_play_avg = slice[col_name][0:10].mean()
    hurry_first_11_play_avg = slice[col_name][0:11].mean()
    hurry_first_12_play_avg = slice[col_name][0:12].mean()

    assert np.isnan(slice[f'game_prev_{col_name}'].iloc[0])
    assert np.isnan(slice[f'game_prev_{col_name}'].iloc[9])
    assert hurry_first_10_play_avg == slice[f'game_prev_{col_name}'].iloc[10]
    assert hurry_first_11_play_avg == slice[f'game_prev_{col_name}'].iloc[11]
    assert hurry_first_12_play_avg == slice[f'game_prev_{col_name}'].iloc[12]
    assert slice[col_name][0:-1].mean() == slice[f'game_prev_{col_name}'].iloc[-1]
    print(f"Successful")

In [29]:
# (game_id, off_team, col_name) combinations to verify; 'HURRY' is the function's default.
game_prev_cases = [
    (19752, 'Team_25', 'HURRY'),
    (19752, 'Team_25', 'GAINLOSSNET'),
    (19752, 'Team_25', 'HIT'),
    (19746, 'Team_28', 'HURRY'),
    (19663, 'Team_22', 'HURRY'),
    # check categorical features
    (18548, 'Team_3', 'CENTERPASSBLOCKDIRECTION_C'),
    (18548, 'Team_3', 'CENTERPASSBLOCKDIRECTION_L'),
    (18548, 'Team_3', 'CENTERPASSBLOCKDIRECTION_R'),
    (18548, 'Team_3', 'CENTERPASSBLOCKDIRECTION_nan'),
]
for case_game_id, case_off_team, case_col_name in game_prev_cases:
    test_game_prev_calculation(
        df=test_df, game_id=case_game_id, off_team=case_off_team, col_name=case_col_name
    )

Successful
Successful
Successful
Successful
Successful
Successful
Successful
Successful
Successful


In [40]:
# tests that we are calculating the rolling means correctly for a given team
# ASSUMES df is sorted how it should be sorted
def test_historical_prev_calculation(df, off_team, col_name = 'CENTERPASSBLOCKDIRECTION_C'):
    slice = df.query("OffTeam == @off_team")
    max_index = slice.shape[0]

    assert np.isnan(slice[f'historical_prev_{col_name}'].iloc[0])
    assert np.isnan(slice[f'historical_prev_{col_name}'].iloc[50])
    assert np.isnan(slice[f'historical_prev_{col_name}'].iloc[100-1])
    assert slice[col_name][0:100].mean() == slice[f'historical_prev_{col_name}'].iloc[100]
    assert slice[col_name][0:(max_index//2)].mean()  == slice[f'historical_prev_{col_name}'].iloc[(max_index//2)]
    assert slice[col_name][0:-1].mean() == slice[f'historical_prev_{col_name}'].iloc[-1]
    print(f"Successful")


In [41]:
# Verify the historical feature for every CENTERPASSBLOCKDIRECTION dummy of one team.
for direction_suffix in ('C', 'L', 'R', 'nan'):
    test_historical_prev_calculation(
        df=test_df,
        off_team='Team_21',
        col_name='CENTERPASSBLOCKDIRECTION_' + direction_suffix,
    )

Successful
Successful
Successful
Successful
