In [1]:
# !pip install -U kaleido

In [2]:
## Rush Play Data

import numpy as np
import pandas as pd
pd.set_option('display.max_columns', 500)
pd.options.mode.chained_assignment = None

import matplotlib.pyplot as plt

def calc_run_success(row):
    yards_to_go = row['yardsToGo']
    yards_gained = row['yardsGained']
    if yards_gained<=0:
        return 0
    elif row['down'] == 1:
        return 1 if (yards_gained / yards_to_go) >=.4 else 0
    elif row['down'] == 2:
        return 1 if (yards_gained / yards_to_go) >=.6 else 0 
    else:
        return 1 if yards_to_go<=yards_gained else 0

play_df = pd.read_csv('/kaggle/input/nfl-big-data-bowl-2025/plays.csv')
games_df = pd.read_csv('/kaggle/input/nfl-big-data-bowl-2025/games.csv')

play_df = play_df.merge(games_df[['gameId', 'week']], how='left', on=['gameId'])

play_cols = [
    'gameId',
    'playId',
    'week',
    'playDescription',
    'possessionTeam',
    'quarter',
    'down',
    'yardsToGo',
    'absoluteYardlineNumber',
    'rushLocationType',
    'yardsGained',
    'pff_runConceptPrimary',
    'pff_runConceptSecondary'
]

# DF with run plays, no penaltys no kneels
rush_df = (
    play_df[
    (play_df.playNullifiedByPenalty == 'N')&
    (play_df.isDropback == False)&
    (play_df.qbKneel == 0)&
    (pd.isnull(play_df.penaltyYards))]
)[play_cols]
rush_df['runSuccess'] = rush_df.apply(calc_run_success, axis=1)
rush_df = rush_df.sort_values(by=['gameId', 'playId']).reset_index(drop=True)
# rush_df.head()

In [3]:
# create table of team performance -- wins, ppg, Run Success %

def get_winner(row):
    return row['homeTeamAbbr'] if row['homeFinalScore'] > row['visitorFinalScore'] else row['visitorTeamAbbr']

games_df['winner'] = games_df.apply(get_winner, axis=1)

point_dict = {}
win_dict = {}
for team in games_df.homeTeamAbbr.unique():
    games_played_df = games_df[(games_df.homeTeamAbbr==team)|(games_df.visitorTeamAbbr==team)]
    point_dict[team]=0
    point_dict[team]+=games_df[games_df.homeTeamAbbr==team].homeFinalScore.sum()
    point_dict[team]+=games_df[games_df.visitorTeamAbbr==team].visitorFinalScore.sum()
    point_dict[team]/= len(games_played_df)
    point_dict[team] = round(point_dict[team],1)

    win_dict[team] = round(len(games_played_df[games_played_df.winner==team]) / len(games_played_df),2)
    
team_points_df = pd.DataFrame({
    'team':list(point_dict),
    'ppg':list(point_dict.values())
})

team_wins_df = pd.DataFrame({
    'team':list(win_dict),
    'win_%':list(win_dict.values())
})
# team_wins_df.head(2)

In [4]:
import plotly.express as px
import numpy as np

def px_team_scatter(metric):
    team_runsuccess_df = rush_df[~rush_df.playDescription.str.contains('FUMBLE|TOUCHDOWN')].groupby('possessionTeam')[[metric]].mean().reset_index().rename(columns={'possessionTeam':'team'})
    team_df = team_runsuccess_df.merge(team_wins_df).merge(team_points_df)

    # trend line
    z = np.polyfit(team_df[metric], team_df['ppg'], 1)
    p = np.poly1d(z)
    team_df['trendline'] = p(team_df[metric])
    
    fig = px.scatter(
        team_df, 
        x=metric, 
        y='ppg', 
        size='win_%', 
        hover_name='team',
        size_max=15,
        title='Team Performance by Run Success',
        labels={
            metric: 'Run Success %',
            'ppg': 'Points Per Game'
        }
    )
    
    # trendline
    fig.add_scatter(
        x=team_df[metric], 
        y=team_df['trendline'], 
        mode='lines', 
        name=f'Trend Line', 
        line=dict(dash='dash', color='red')
    )
    
    # Customize legend and layout
    fig.update_traces(marker=dict(opacity=0.8, line=dict(width=1, color='black')))
    fig.update_layout(
        annotations=[
            dict(
                xref="paper", 
                yref="paper", 
                x=.05, 
                y=-0.2, 
                showarrow=False, 
                text="Bubble size represents Win Percentage"
            )
        ]
    )
    fig.update_layout(showlegend=False)
    fig.show()
    # fig.write_image("/kaggle/working/Ex1_Team_Perf_by_RunSuccess.png")
    # fig.write_html("/kaggle/working/Ex1_Team_Perf_by_RunSuccess.html")
px_team_scatter('runSuccess')

In [5]:
# Create table to show EPA differences if play is Run Success or not
    # group by YPG
    # filter out plays where EPA would be significantly imapcted for reasons other than yards gained
        # turnovers, touchdowns, 4th down reached

tmp_df = rush_df[['gameId', 'playId', 'playDescription', 'down', 'yardsGained', 'runSuccess']].merge(play_df[['gameId', 'playId','expectedPointsAdded']])
filtered_df = (tmp_df[
    (~tmp_df.playDescription.str.contains('FUMBLE|TOUCHDOWN'))&
    (tmp_df.yardsGained.between(1,10))&
    (tmp_df.down.isin([1,2]))
    ])
run_success_by_yards_gained = (
    filtered_df
    .groupby(['yardsGained', 'runSuccess'])[['expectedPointsAdded']]
    .mean()
    .reset_index()
    .pivot(index='yardsGained', columns='runSuccess', values='expectedPointsAdded')
    .reset_index()
)
run_success_by_yards_gained.columns = ['Yards Gained', 'Run Fail', 'Run Success']
# filtered_df
# run_success_by_yards_gained.head(2)

In [6]:
# Plotly table for Run Success

from plotly import graph_objects as go

header_values = list(run_success_by_yards_gained.columns)  # Column names
cell_values = [round(run_success_by_yards_gained[col], 3).tolist() for col in run_success_by_yards_gained.columns]  # Data for each column

# Create Plotly Table
fig = go.Figure(data=[go.Table(
    header=dict(values=header_values),
    cells=dict(values=cell_values)
)])
fig.update_layout(
    annotations=[
    dict(
        text="Excludes 3rd down, 4th down, and any play resulting in a turnover or touchdown",
        x=0.5,
        y=-0.1,
        xref="paper",
        yref="paper",
        showarrow=False,
        font=dict(size=12)
    )],
    title=dict(
        text="Avg. Expected Point Added by Rush Yard Gained",
        x=0.5,
        xanchor='center',
        yanchor='top'
    )
)
# fig.write_image("/kaggle/working/Ex2_EPA_YPR_RunSuccess.png")
# fig.write_html("/kaggle/working/Ex2_EPA_YPR_RunSuccess.html")
fig.show()

In [7]:
## Blocks Script -- Code to create Blocks Data

import geopandas as gpd

def o_math(num, val):
    ''' adding and subtracting to an orientation value while ensuring result is [0,360] '''
    min_, max_ = num-val, num+val
    if max_ > 360:
        max_ = max_-360
    if min_ < 0:
        min_ = 360+min_
    return min_, max_

def filter_orientation(df, col1, col2, val):
    ''' 
    use to filter gpd intersection table two players are 'facing' each other 

    col1: player1
    col2: player2
    val: orientation range to consider in one direction 
    '''
    filtered_df = df.copy()
    for index, row in df.iterrows():
        min_, max_ = o_math(row[col1], val)
        if min_ <= max_:
            if not (min_ <= row[col2] <= max_):
                filtered_df.drop(index, inplace=True)
        else:
            if not (row[col2] >= min_ or row[col2] <= max_):
                filtered_df.drop(index, inplace=True)
    return filtered_df

def players_engaged_df(gdf, distance, poss_team):
    '''
    create table intersection of players via coordinates.  
    '''
    play_direction = list(gdf.playDirection)[0]
    # poss_team = list(gdf.possessionTeam)[0]
    
    if isinstance(gdf, pd.DataFrame):
        gdf = gpd.GeoDataFrame(gdf, geometry=gpd.points_from_xy(gdf.x, gdf.y))
        
    gdf = gdf.copy()
    gdf['geometry'] = gdf.geometry.buffer(distance)
    gdf = gpd.sjoin(gdf, gdf, how='inner', predicate='intersects') # point intersection with self
    
    #defending team always on one side of resulting table
    filter_poss = (gdf.club_right==poss_team) if play_direction == 'left' else (gdf.club_left==poss_team)
    gdf = (
        gdf[filter_poss& # only want one pair of the combo
           (gdf.displayName_right!=gdf.displayName_left)&
           (gdf.club_right!=gdf.club_left)&
           (gdf.frameId_right == gdf.frameId_left)&
           (gdf.displayName_left!='football')&
           (gdf.displayName_right!='football')#&
        ]).reset_index(drop=True)
    gdf = filter_orientation(gdf, 'o_left', 'o_right', 270).reset_index(drop=True)
    return gdf.rename(columns={'gameId_left':'gameId', 'playId_left':'playId'})

def get_blocks_from_engaged_df(df:pd.DataFrame, poss_team):
    '''
    Loop through frames in dataframe of engaged_players_df. Find seqential frames where defender blocked by one or two players

    NOTE: did NOT factor in runner of the football here. this is addressed later in analysis portion of code
    '''
    df = df.reset_index(drop=True)
    # poss_team = df.possessionTeam_left.iloc[0]
    # lazy way to determine who has ball:
    if poss_team == df.club_right.iloc[0]:
        poss_id = 'nflId_right'
        poss_name = 'displayName_right'
        def_id = 'nflId_left'
        def_name = 'displayName_left'
    elif poss_team == df.club_left.iloc[0]:
        poss_id = 'nflId_left'
        poss_name = 'displayName_left'
        def_id = 'nflId_right'
        def_name = 'displayName_right'
    else:
        raise ValueError(f'poss_team is invalid name. got {poss_team}')
    
    single_block_dict={}
    double_block_dict={}
    for f_id in df.sort_values(by=['frameId_left']).frameId_left.unique(): # left and right are the same value
        
        # filter by frame, sort by defender
        df_1 = df[df.frameId_left==f_id].sort_values(by=[def_id])

        # loop through player
        for p_id in df_1[def_id].unique():   

            # filter by player, sort by blocker (poss_id)
            df_ = df_1[df_1[def_id]==p_id].reset_index().sort_values(by=[poss_id])

            # dont need engagements with more than 2 offensive players
            if len(df_)>2: # only want 1 rusher, 1-2 defenders
                continue

            # NOTE: dict usage not best practice but ok for now
            if len(df_)==1:
                # (defender_id, defender_name, blocker_id, blocker_name)
                key = (
                    int(df_.iloc[0][def_id]),
                    df_.iloc[0][def_name],
                    int(df_.iloc[0][poss_id]),
                    df_.iloc[0][poss_name],
                )
                # if the key exists, add to value, otherwise create list of one
                if key in single_block_dict:
                    single_block_dict[key].append(f_id) 
                else:
                    single_block_dict[key] = [f_id]
                continue

            # ^ similar pattern for above, but with 2 defenders
            # (defender_id, defender_name, (blocker1_id, blocker1_name, blocker2_id, blocker2_name))
            if len(df_)==2:
                key = (
                    int(df_.iloc[0][def_id]),
                    df_.iloc[0][def_name],
                    (
                        int(df_.iloc[0][poss_id]),
                        df_.iloc[0][poss_name],
                        int(df_.iloc[1][poss_id]),
                        df_.iloc[1][poss_name],
                    )
                )
                if key in double_block_dict:
                    double_block_dict[key].append(f_id) 
                else:
                    double_block_dict[key] = [f_id]
                continue
    return single_block_dict, double_block_dict

def find_consecutive_groups(lst):
    '''
    Blocking dicts have keys where players are engaged, and values as lists of frame_ids.

    This function focuses on the lists: want to account for successive (within 2) engagements between players
    output will be the start and end frame_id of the block
    '''
    groups = []
    start = lst[0] # start at 1st pos
    end = lst[0] # ^ 

    for i in range(1, len(lst)):
        if lst[i] <= end + 2: # if next value is < two more than current end
            end = lst[i] # set end as the next value
        else: # otherwise, we are end of conesecutive groups determine if worthy to add to list
            if end != start: # if not length of one
                groups.append([start, end]) # append to groups
            start = lst[i] # reset to next value in loop
            end = lst[i] # ^ 

    # for the last values (which wont hit else block)
    if end != start:
        groups.append([start, end])

    return groups

def create_blocks_df(single_block_dict, double_block_dict):
    '''
    Blocking dicts have keys where players are engaged, and values as lists of frame_ids.

    This function creates a dataframe with defender & blocker(s) info + the start and end of the block by frame_id
    '''
    records = []
    for k,s in single_block_dict.items():
        rusherId, rusherName, blocker1Id, blocker1Name = k # unpack key
        groups = find_consecutive_groups(list(s)) # find start and end of engagement 
        for group in groups: # append records
            records.append(
                [group[0], group[1], rusherId, rusherName, blocker1Id, blocker1Name, None, None]
            )
    for k,d in double_block_dict.items(): # same as above with added consideration for double block key
        rusherId, rusherName, blockers = k
        blocker1Id, blocker1Name, blocker2Id, blocker2Name = blockers
        groups = find_consecutive_groups(list(d))
        for group in groups:
            records.append(
                [group[0], group[1], rusherId, rusherName, blocker1Id, blocker1Name, blocker2Id, blocker2Name]
            )

    # will rename columns
    col_names = [
        'frameIdStart',
        'frameIdEnd', 
        'defenderId',
        'defenderName',
        'blocker1Id',
        'blocker1Name',
        'blocker2Id',
        'blocker2Name'
    ]
    
    return pd.DataFrame(records, columns=col_names)

def get_def_start_end_tracking_data(df:pd.DataFrame, p_track_df:pd.DataFrame):
    ''' attach the defender starting & ending tracking data to the engagement data '''
    
    df = df.merge(
        p_track_df[['gameId', 'playId', 'nflId', 'frameId', 'playDirection', 'x', 'y', 's', 'a', 'dis', 'o', 'dir']],
        how='left',
        left_on=['gameId', 'playId', 'defenderId', 'frameIdStart'],
        right_on=['gameId', 'playId', 'nflId', 'frameId'],
        suffixes=('_left', '_right')
    )
    df = df.rename(columns={c:f'start_{c}' for c in ['x', 'y', 's', 'a', 'dis', 'o', 'dir']})
    df = df.merge(
        p_track_df[['gameId', 'playId', 'nflId', 'frameId', 'x', 'y']],
        how='left',
        left_on=['gameId', 'playId', 'defenderId', 'frameIdEnd'],
        right_on=['gameId', 'playId', 'nflId', 'frameId'],
        suffixes=('_left', '_right')
    )
    df = df.rename(columns={c:f'end_{c}' for c in ['x', 'y']})

    drop_columns = [c for c in df if c.endswith('_left')] + [c for c in df if c.endswith('_right')]
    # print(drop_columns)
    return df.drop(drop_columns,axis=1)
  

In [8]:
## Data processing script -- code to load data and run functions to create blocks (engagement) table

import pandas as pd
import polars as pl

# wrapper object, helpful in dev for organization and to not have to reload data each time a bug was identified
class PlayerTracking:
    def __init__(self):
        self._game_table = self._get_game_table()

    @staticmethod
    def _get_game_table():
        ''' 
        table with the game and the path to the correspoding tracking file

        (not sure why i did this and not using the week in the game table... wouldve been much quicker i think)
        '''
        df_list = []
        for i in range(1,10):
            path = f'/kaggle/input/nfl-big-data-bowl-2025/tracking_week_{i}.csv'
            df_list.append(
                pl
                .scan_csv(path)
                .select('gameId')
                .unique()
                .with_columns(pl.lit(path).alias("path"))
            )
        return pl.concat(df_list).collect().to_pandas()

    @staticmethod
    def _get_play_from_ids(path, g_id, p_id):
        return (
            pl
            .scan_csv(path, ignore_errors=True)
            .filter((pl.col('gameId')==g_id)&(pl.col('playId')==p_id)&(pl.col('frameType')!='BEFORE_SNAP'))
            .collect()
        )

    def get_game(self, g_id) -> pl.DataFrame:
        path = self._game_table[self._game_table.gameId==g_id]['path'].item()
        return pl.read_csv(path, ignore_errors=True).filter((pl.col('gameId')==g_id))
        
    def get_play(self, p_id, g_ind:pl.DataFrame|int|None) -> pd.DataFrame:
        ''' 
        return frames from tracking data post snap 

        in 'real' processing, the dataframe for all games is passed, but in dev i used to test chosen games
        '''
        if g_ind is None:
            raise ValueError

        if isinstance(g_ind, pl.DataFrame): # used in processing (entire df of games)
            _play_filters = [
                (pl.col('playId')==p_id), 
                (pl.col('frameType')!='BEFORE_SNAP')
            ]
            return g_ind.filter(_play_filters).to_pandas()
            
        if isinstance(g_ind, int): # used in dev (single game, by id)
            path = self._game_table[self._game_table.gameId==g_ind]['path'].item()
            _play_filters = [
                (pl.col('gameId')==g_ind), 
                (pl.col('playId')==p_id), 
                (pl.col('frameType')!='BEFORE_SNAP')
            ]
            return pl.scan_csv(path, ignore_errors=True).filter(_play_filters).to_pandas()

def get_run_plays_dict(df, weeks:list|int|None=None):
    ''' return dictionary of running plays for a week {game_id:[play_id]}'''
    if weeks:
        if isinstance(weeks, int):
            weeks = [weeks]
        df = df[df.week.isin(weeks)]
        
    run_plays_tups = list(zip(df.gameId, df.playId))

    d = {}
    for tup in run_plays_tups:
        gid, pid = tup
        if gid in d:
            d[gid].append(pid)
        else:
            d[gid]=[pid]
    return d


def get_possession_team(df:pd.DataFrame, gid, pid):
    return df[(df['gameId']==gid)&(df['playId']==pid)]['possessionTeam'].iloc[0]

def get_play_end(df):
    ''' consider end of play for engagement consideration at one of these '''
    play_end_events = ['out_of_bounds', 'tackle', 'fumble', 'fumble_offense_recovered', 'touchdown']
    df_ = df[df.event.isin(play_end_events)].fillna(0).copy()
    frame = list(df_.frameId)[0] if len(df_)>0 else df.frameId.max()
    return df[df.frameId<=frame]

In [9]:
# %%time

# code takes <2 hours to run if you run in batches of 3 by adjusting the range accordingly

# PT = PlayerTracking()

# for i in range(1,10):
#     run_plays_dict = get_run_plays_dict(rush_df, weeks=[i])
#     df_list = []
#     for gid, pids in run_plays_dict.items():
        
#         game_df = PT.get_game(gid)

#         for pid in pids:
#             # print(gid, pid)
#             _p_track_df = get_play_end(PT.get_play(pid, game_df))

#             poss_team = get_possession_team(rush_df, gid, pid)

#             _engaged_df = players_engaged_df(_p_track_df, .75, poss_team)
#             if len(_engaged_df) == 0:
#                 continue
    
#             single_block_dict, double_block_dict = get_blocks_from_engaged_df(_engaged_df, poss_team)
            
#             _blocks_df = create_blocks_df(single_block_dict, double_block_dict)
#             _blocks_df['gameId'] = gid
#             _blocks_df['playId'] = pid
        
#             _blocks_df = get_def_start_end_tracking_data(_blocks_df, _p_track_df)
    
#             df_list.append(_blocks_df)
            
#     df = pd.concat(df_list)

#     ### NOTE: forgot to account for tackles.. will adjust data after loading 
#     df.to_parquet(f'/kaggle/working/blocks_week_{i}.parquet')

In [10]:
%%time 

# code to create the following tacking related data:
    # ball position during play (used to calc distance from engagement to ball)
    # player starting position (left, right, center, lineman, box)

# if player on right/left side of football
def get_player_side(row):
    if row['fbStart_y']-2 < row['y'] < row['fbStart_y']+2:
        return 'center'
        
    if row['playDirection'] == 'left':
        if row['y'] > row['fbStart_y']:
            return 'right'
        else:
            return 'left'
    elif row['playDirection'] == 'right':
        if row['y'] > row['fbStart_y']:
            return 'left'
        else:
            return 'right'
    else:
        raise ValueError('bad playDirection')

def get_player_on_los(row):
    if row['fbStart_x']+2 >= row['x'] >= row['fbStart_x']-2:
        return 1
    else:
        return 0

def get_player_in_box(row):
    if row['fbStart_y']+3.5 >= row['y'] >= row['fbStart_y']-3.5: # width = 3.5 yards
        if row['fbStart_x']+5.5 >= row['x'] >= row['fbStart_x']-5.5: # depth = 5.5 yards
            return 1
    else:
        return 0

def get_lineman(row):
    if row['playerStartLOS'] == 1 and row['playerStartBox'] == 1:
        return 1
    else:
        return 0

# code takes ~ 4 minutes to run 

# ball_df_list = []
# player_df_list = []
# for i in range(1, 10):
#     p_track_df = pd.read_csv(f'/kaggle/input/nfl-big-data-bowl-2025/tracking_week_{i}.csv')

#     # create football location df 
#     _football_loc_df = (
#         p_track_df[
#         (p_track_df.frameType!='BEFORE_SNAP')&
#         (p_track_df.displayName=='football')
#         ][['gameId', 'playId', 'frameId', 'frameType', 'x', 'y']]
#     )

#     _player_loc_df = (
#         rush_df[rush_df.week==int(i)][['gameId', 'playId', 'week', 'possessionTeam']]
#         .merge(
#             (
#                 p_track_df[
#                 (p_track_df.frameType=='SNAP')&
#                 (p_track_df.displayName!='football')]
#                 .drop(['frameId', 'frameType'],axis=1) # to avoid dup when merging w fb_loc
#             ), # players location at the snap
#             how='left',
#             on=['gameId', 'playId'])
#         .merge(
#             (
#                 _football_loc_df[_football_loc_df.frameType=='SNAP']
#                 .rename(columns={'x':'fbStart_x', 'y':'fbStart_y'})
#             ),
#             how='left',
#             on=['gameId', 'playId'])
#         .query('possessionTeam != club')
#     )
    
#     # add dfs to list for concat later
#     ball_df_list.append(_football_loc_df)
#     player_df_list.append(_player_loc_df)
    
# player_loc_df = pd.concat(player_df_list)
# ball_loc_df = pd.concat(ball_df_list)
# ball_snap_df = ball_loc_df[ball_loc_df.frameType=='SNAP'] # used in the past but might not need anymore (probably?)

# add the player positioning to df
# player_loc_df['playerStartSide'] = player_loc_df.apply(get_player_side, axis=1)
# player_loc_df['playerStartLOS'] = player_loc_df.apply(get_player_on_los, axis=1)
# player_loc_df['playerStartBox'] = player_loc_df.apply(get_player_in_box, axis=1)
# player_loc_df['playerStartLineman'] = player_loc_df.apply(get_lineman, axis=1)

# ball_loc_df.to_parquet('/kaggle/working/ball_position.parquet')
# player_loc_df.to_parquet('/kaggle/working/player_position.parquet')

ball_loc_df = pd.read_parquet('/kaggle/input/nfl-2022-position-tracking/ball_position.parquet')
player_loc_df = pd.read_parquet('/kaggle/input/nfl-2022-position-tracking/player_position.parquet')

player_loc_df['playerStartSide'] = player_loc_df.apply(get_player_side, axis=1)
player_loc_df['playerStartLOS'] = player_loc_df.apply(get_player_on_los, axis=1)
player_loc_df['playerStartBox'] = player_loc_df.apply(get_player_in_box, axis=1)
player_loc_df['playerStartLineman'] = player_loc_df.apply(get_lineman, axis=1)

CPU times: user 3.76 s, sys: 378 ms, total: 4.14 s
Wall time: 4.1 s


In [11]:
# calculate the backwards distance of blocks
# also remove ball carrier from engagement data

# import pandas as pd
# import numpy as np

# Define the function
def get_backward_distance(row):
    # Extract data from the row
    start_x = row['start_x']
    start_y = row['start_y']
    start_o = row['start_o']
    end_x = row['end_x']
    end_y = row['end_y']
    
    # convert angle to radians
    theta = np.radians(start_o)
    
    # calculate displacement
    dx = end_x - start_x
    dy = end_y - start_y
    
    # calculate backward distance using the backward unit vector
    backward_distance = dx * (-np.sin(theta)) + dy * (-np.cos(theta))
    
    return backward_distance

def get_distance(c1, c2):
    return np.sqrt(
        (c1[0]-c2[0])**2 + (c1[1]-c2[1])**2
    )

blocks_df = pd.read_parquet('/kaggle/input/nfl-2022-blocks-v2')

# calculate the backward distance of defender for each block
blocks_df['blockYardsCeded'] = blocks_df.apply(get_backward_distance, axis=1)

# calculate the length of the block
blocks_df['blockLength'] = blocks_df['frameIdEnd'] - blocks_df['frameIdStart']

# merge blocks with ball location
blocks_df = blocks_df.merge(
    (
        ball_loc_df[['gameId', 'playId', 'frameId','x','y']]
        .rename(columns={'x':'fbLoc_x', 'y':'fbLoc_y'})
    ),
    how='left',
    left_on=['gameId', 'playId','frameIdStart'],
    right_on=['gameId', 'playId', 'frameId']
)

# calculate distance from the ball to the defender
blocks_df['distFromBall'] = blocks_df.apply(lambda x: get_distance((x['start_x'],x['start_y']),(x['fbLoc_x'],x['fbLoc_y'])), axis=1)

# going to assume that any 'block' with .75 yds from ball is with ball carrier
blocks_df = blocks_df[blocks_df.distFromBall > .75]

print(blocks_df.shape)
display(blocks_df.head(2))
display(blocks_df.tail(2))

(121029, 26)


Unnamed: 0,frameIdStart,frameIdEnd,defenderId,defenderName,blocker1Id,blocker1Name,blocker2Id,blocker2Name,gameId,playId,playDirection,start_x,start_y,start_s,start_a,start_dis,start_o,start_dir,end_x,end_y,blockYardsCeded,blockLength,frameId,fbLoc_x,fbLoc_y,distFromBall
0,106,113,47917,Greg Gaines,42392,Mitch Morse,,,2022090800,101,left,71.09,29.03,0.03,0.04,0.02,109.88,328.72,71.2,29.27,-0.021832,7,106,72.040001,29.52,1.068926
1,113,115,41239,Aaron Donald,44875,Dion Dawkins,,,2022090800,101,left,71.56,26.99,2.2,2.7,0.23,87.82,22.8,71.78,27.56,-0.241523,2,113,73.580002,29.91,3.550606


Unnamed: 0,frameIdStart,frameIdEnd,defenderId,defenderName,blocker1Id,blocker1Name,blocker2Id,blocker2Name,gameId,playId,playDirection,start_x,start_y,start_s,start_a,start_dis,start_o,start_dir,end_x,end_y,blockYardsCeded,blockLength,frameId,fbLoc_x,fbLoc_y,distFromBall
128409,95,97,52578,Broderick Washington,44844,Ryan Ramczyk,55125.0,Lewis Kidd,2022110700,3787,right,21.25,20.72,0.72,0.97,0.07,308.98,182.4,21.3,20.63,0.095483,2,95,17.530001,23.01,4.368352
128410,101,108,52578,Broderick Washington,43525,Dwayne Washington,55125.0,Lewis Kidd,2022110700,3787,right,21.61,20.72,1.14,1.35,0.11,317.37,64.69,22.81,20.6,0.901003,7,101,20.790001,21.870001,1.412409


In [12]:
# group positions

players_df = pd.read_csv('/kaggle/input/nfl-big-data-bowl-2025/players.csv')

pos_groups_map = {
    'DE':'DL', # probably should consider EDGE as its own group.. but wasnt drastically different from DL, so kept it simple
    'NT':'DL',
    'SS':'DB',
    'FS':'DB',
    'OLB':'LB',
    'DT':'DL',
    'CB':'DB',
    'ILB':'LB',
    'MLB':'LB',
    'DB':'DB',
    'LB':'LB'
}

players_df['positionGroup'] = players_df['position'].map(pos_groups_map)

In [13]:
# creating metric
    # only want to create metric on 7 weeks of data to ensure no data leakage
    # i kept the other formulas i was experimenting with but the uncommented out one is the formula i used

# filter to 7 weeks of training data
blocks_df_wk_1_7 = (
    games_df[games_df.week<=7][['gameId']]
    .merge(blocks_df, how='left')
)

# to be used to add starting positions to base table
player_snap_df = (
    blocks_df_wk_1_7[['defenderId', 'defenderName', 'playId']]
    .drop_duplicates()
    .groupby(['defenderId', 'defenderName'])[['playId']]
    .agg(n_snaps=('playId', 'count'))
    .reset_index()
)

# create table of single and double blocks. will adjust double block dislocations
s_block_df = (
    blocks_df_wk_1_7[blocks_df_wk_1_7.blocker2Id.isna()]
    .groupby(['defenderId' ,'defenderName'])[['blockYardsCeded', 'blockLength']]
    .agg(
        s_block_yc = ('blockYardsCeded', 'sum'),
        s_block_length = ('blockLength', 'sum'),
        s_block_count = ('blockLength', 'count')
    )
    .reset_index()
)
d_block_df = (
    blocks_df_wk_1_7[~blocks_df_wk_1_7.blocker2Id.isna()]
    .assign(d_block_yca=lambda x: -np.log(1 + (np.e ** (1-x['blockYardsCeded'])))) # apply log formula to adjust double blocks dilocation
    .groupby(['defenderId' ,'defenderName'])[['blockYardsCeded', 'd_block_yca', 'blockLength']]
    .agg(
        d_block_yc = ('blockYardsCeded', 'sum'),
        d_block_yca = ('d_block_yca', 'sum'),
        d_block_length = ('blockLength', 'sum'),
        d_block_count = ('blockLength', 'count'),
    )
    .reset_index()
)

# combine the single and double blocks
player_sd_blocks_df = (
    s_block_df
    .merge(d_block_df, on=['defenderId', 'defenderName'], how='left')
    .merge(player_snap_df, how='left')
    .merge(
        players_df[['nflId', 'position', 'positionGroup']],
        how='left',
        left_on=['defenderId'],
        right_on=['nflId'],
    )
)

# calculate the time spent on each block, and number of total blocks
player_sd_blocks_df['%_time_d_blocked'] = player_sd_blocks_df['d_block_length'] / (player_sd_blocks_df['s_block_length'] + player_sd_blocks_df['d_block_length'])
player_sd_blocks_df['n_blocks'] = player_sd_blocks_df['s_block_count'] + player_sd_blocks_df['d_block_count']

# 0 -> S + D
# 1 -> (S + D) / Nb
# 2 -> (S + D) / Ns
# 3 -> S + Da
# 4 -> (S + Da) / Nb
# 5 -> (S + Da) / Ns
# 6 -> Sw + Dw
# 7 -> (Sw + Dw) / Nb
# 8-> (Sw + Dw) / Ns
# 9 -> S / Nbs + D / Nbd

# player_sd_blocks_df['yca0'] = player_sd_blocks_df['s_block_yc'] + player_sd_blocks_df['d_block_yc']

# player_sd_blocks_df['yca1'] = (player_sd_blocks_df['s_block_yc'] + player_sd_blocks_df['d_block_yc']) / player_sd_blocks_df['n_blocks']

# player_sd_blocks_df['yca2'] = (player_sd_blocks_df['s_block_yc'] + player_sd_blocks_df['d_block_yc']) / player_sd_blocks_df['n_snaps']

# player_sd_blocks_df['yca3'] = player_sd_blocks_df['s_block_yc'] + player_sd_blocks_df['d_block_yca']

# player_sd_blocks_df['yca4'] = (player_sd_blocks_df['s_block_yc'] + player_sd_blocks_df['d_block_yca']) / player_sd_blocks_df['n_blocks']

# player_sd_blocks_df['yca5'] = (player_sd_blocks_df['s_block_yc'] + player_sd_blocks_df['d_block_yca']) / player_sd_blocks_df['n_snaps']

player_sd_blocks_df['yca6'] = (
    (player_sd_blocks_df['s_block_yc'] * (1-player_sd_blocks_df['%_time_d_blocked'])+ 
     player_sd_blocks_df['d_block_yca'] * player_sd_blocks_df['%_time_d_blocked'] )
)

# player_sd_blocks_df['yca7'] = (
#     (player_sd_blocks_df['s_block_yc'] * (1-player_sd_blocks_df['%_time_d_blocked'])+ 
#      player_sd_blocks_df['d_block_yca'] * player_sd_blocks_df['%_time_d_blocked'] ) / 
#     player_sd_blocks_df['n_blocks']
# )

# player_sd_blocks_df['yca8'] = (
#     (player_sd_blocks_df['s_block_yc'] * (1-player_sd_blocks_df['%_time_d_blocked'])+ 
#      player_sd_blocks_df['d_block_yca'] * player_sd_blocks_df['%_time_d_blocked'] ) / 
#     player_sd_blocks_df['n_snaps']
# )

# player_sd_blocks_df['yca9'] = (
#     (player_sd_blocks_df['s_block_yc'] / player_sd_blocks_df['s_block_count']) + (player_sd_blocks_df['d_block_yca'] / player_sd_blocks_df['d_block_count'])
# )

player_sd_blocks_df.head(2)

# only consider players with adequate number of blocks
min_blocks = 30

In [14]:
# show graph % of double blocks by avg. block dislocation

import plotly.express as px

# tmp_df = player_metric_df.merge(player_sd_blocks_df).groupby('metric')[['%_time_d_blocked']].mean().reset_index()
# plt.plot(tmp_df['metric'], tmp_df['%_time_d_blocked'], marker='o')
_tmp_df = player_sd_blocks_df[player_sd_blocks_df.n_blocks>=min_blocks]
_tmp_df['t_block_yc'] = (_tmp_df.s_block_yc + _tmp_df.d_block_yc) / _tmp_df.n_snaps
    
# trend line
z = np.polyfit(_tmp_df.t_block_yc, _tmp_df['%_time_d_blocked'], 1)
p = np.poly1d(z)
_tmp_df['trendline'] = p(_tmp_df.t_block_yc)
    
fig = px.scatter(
    _tmp_df, 
    x='t_block_yc', 
    y='%_time_d_blocked', 
    hover_name='defenderName',
    title='Average Defender Block Dislocation by Double Team Blocked %',
    labels={
        't_block_yc': 'Defender Block Dislocation',
        '%_time_d_blocked': 'Double Teamed %'
    }
)
    
# trendline
fig.add_scatter(
    x=_tmp_df.t_block_yc, 
    y=_tmp_df['trendline'], 
    mode='lines', 
    name=f'Trend Line', 
    line=dict(dash='dash', color='red')
)
    
# Customize legend and layout
fig.update_traces(marker=dict(opacity=0.8, line=dict(width=1, color='black')))
fig.update_layout(showlegend=False)
fig.show()

# fig.write_image("/kaggle/working/Ex3_AvgBD_by_PctDoubleTeamd.png")
# fig.write_html("/kaggle/working/Ex3_AvgBD_by_PctDoubleTeamd.html")

In [15]:
# see range of dislocation metric
var = 'yca6'

# display(player_sd_blocks_df[player_sd_blocks_df.n_blocks>=min_blocks][var].describe())
# player_sd_blocks_df[player_sd_blocks_df.n_blocks>=min_blocks][var].min(), player_sd_blocks_df[player_sd_blocks_df.n_blocks>=min_blocks][var].max()

In [16]:
# see what players are rated highly
test = (
    player_sd_blocks_df[player_sd_blocks_df.n_blocks>=min_blocks]
    .sort_values(by=var)[['defenderName', 'positionGroup', 's_block_yc', 'd_block_yca', var]]
)
# test.head(50)

In [17]:
# create table with players, raw dislocation value, and index (masked as 'metric')
metric = 'metric'
_tmp_df = player_sd_blocks_df[player_sd_blocks_df.n_blocks>=30]
_tmp_df[metric] = pd.qcut(_tmp_df[var], q=12, labels=False)+1
player_metric_df = (
    player_sd_blocks_df[['nflId' ,'defenderName', 'positionGroup', 'n_blocks']]
    .drop_duplicates()
    .merge(_tmp_df[['nflId', 'defenderName', var, metric]], how='left')
)

In [18]:
# show stacked bar of BDI by position group

graph_index_df = player_metric_df[player_metric_df.n_blocks>min_blocks]
graph_index_df['metric'] = graph_index_df['metric'].astype('O')
fig = px.bar(
    graph_index_df,
    x="metric",
    color="positionGroup",
    title="Block Dislocation Index by Position Group",
    labels={"count": "Count", "metric": "Index"},
    hover_name='defenderName',
    barmode="stack"
)
# fig.write_html("/kaggle/working/Ex4_BD_by_PosGrp.html")
# fig.write_image("/kaggle/working/Ex4_BD_by_PosGrp.png")

fig.show()

In [19]:
# show component tables of base dataframe
    # at defender level
    # includes starting position relative to ball and in defense
    # includes BDI

# print(rush_df.shape)
# display(rush_df.head(2))

# print(player_loc_df.shape)
# display(player_loc_df.head(2))

# print(player_metric_df.shape)
# display(player_metric_df.head(2))

In [20]:
# determine if defender is playside (if run is directed towards their starting position)
    # if inside run, we consider players in the 'center' lined between the guards
    # map BDI to a simplified value
    # apply the count to each player

def get_playside_ind(row):
    rush_typ = row['rushLocationType'].split('_')[0]
    rush_dir = row['rushLocationType'].split('_')[-1].lower()
    if row['playerStartSide'] == rush_dir:
        return 1
    if rush_typ=='INSIDE':
        if row['playerStartSide'] == 'center':
            return 1
    return 0
    # return 1 if row['playerStartSide'] == row['rushLocationType'].split('_')[-1].lower() else 0

def get_counter(row, lower, upper, metric):
    ''' arbitarily setting the 'counter' (yes reference to blackjack counting) '''
    if row[metric] > upper:
        return 1
    elif row[metric] < lower:
        return -1
    else:
        return 0

loc_drop_cols = [
    'week', 
    'possessionTeam', 
    'displayName', 
    'time', 
    'jerseyNumber',
    'x',
    'y',
    's', 
    'a',
    'dis',
    'o',
    'dir',
    'event',
    'frameId',
    'frameType'
]
base_df = (
    rush_df
    .merge(
        (
            player_loc_df
            .drop(loc_drop_cols, axis=1)
        ),
        how='left',
        on=['gameId', 'playId']
    )
    .merge(
        (
            player_metric_df
            # .drop(['nflId'], axis=1)
        ),
        how='left',
        # left_on='nflId',
        # right_on='defenderId'
    )
)

base_df['playerStartRushLocationSide'] = base_df.apply(get_playside_ind, axis=1) # great name

# setting the count
metric_count = f'{metric}_count'
base_df[metric_count] = base_df['metric'].map({
    1:0,
    2:0,
    3:1,
    4:1,
    5:1,
    6:1,
    7:2,
    8:2,
    9:2,
    10:2,
    11:2,
    12:2,
})

play_pos_grp_df = (
    base_df
    .groupby(['gameId', 'playId', 'positionGroup'])
    .size()
    .reset_index(name='count')
    .pivot(index=['gameId', 'playId'], columns='positionGroup', values='count')
    .reset_index()
)

# print(base_df.shape)
# base_df.head(10)

In [21]:
## this is ugly... surely theres a better way.....

# group the base data to play data in order to aggregate the count

base_df[f'{metric}_allPlayerAvg'] = base_df.groupby(['gameId', 'playId'])[metric].transform('mean')
base_df[f'{metric}_boxPlayerAvg'] = base_df[base_df.playerStartBox==1].groupby(['gameId', 'playId'])[metric].transform('mean')
base_df[f'{metric}_losPlayerAvg'] = base_df[base_df.playerStartLOS==1].groupby(['gameId', 'playId'])[metric].transform('mean')
base_df[f'{metric}_linemanPlayerAvg'] = base_df[base_df.playerStartLineman==1].groupby(['gameId', 'playId'])[metric].transform('mean')

base_df[f'{metric}_allPlayerCnt'] = base_df.groupby(['gameId', 'playId'])[metric_count].transform('sum')
base_df[f'{metric}_boxPlayerCnt'] = base_df[base_df.playerStartBox==1].groupby(['gameId', 'playId'])[metric_count].transform('sum')
base_df[f'{metric}_losPlayerCnt'] = base_df[base_df.playerStartLOS==1].groupby(['gameId', 'playId'])[metric_count].transform('sum')
base_df[f'{metric}_linemanPlayerCnt'] = base_df[base_df.playerStartLineman==1].groupby(['gameId', 'playId'])[metric_count].transform('sum')

base_df[f'{metric}_allPlaysidePlayerAvg'] = base_df[base_df.playerStartRushLocationSide==1].groupby(['gameId', 'playId'])[metric].transform('mean')
base_df[f'{metric}_boxPlaysidePlayerAvg'] = base_df[(base_df.playerStartBox==1)&(base_df.playerStartRushLocationSide==1)].groupby(['gameId', 'playId'])[metric].transform('mean')
base_df[f'{metric}_losPlaysidePlayerAvg'] = base_df[(base_df.playerStartLOS==1)&(base_df.playerStartRushLocationSide==1)].groupby(['gameId', 'playId'])[metric].transform('mean')
base_df[f'{metric}_linemanPlaysidePlayerAvg'] = base_df[(base_df.playerStartLineman==1)&(base_df.playerStartRushLocationSide==1)].groupby(['gameId', 'playId'])[metric].transform('mean')

base_df[f'{metric}_allPlaysidePlayerCnt'] = base_df[base_df.playerStartRushLocationSide==1].groupby(['gameId', 'playId'])[metric_count].transform('sum')
base_df[f'{metric}_boxPlaysidePlayerCnt'] = base_df[(base_df.playerStartBox==1)&(base_df.playerStartRushLocationSide==1)].groupby(['gameId', 'playId'])[metric_count].transform('sum')
base_df[f'{metric}_losPlaysidePlayerCnt'] = base_df[(base_df.playerStartLOS==1)&(base_df.playerStartRushLocationSide==1)].groupby(['gameId', 'playId'])[metric_count].transform('sum')
base_df[f'{metric}_linemanPlaysidePlayerCnt'] = base_df[(base_df.playerStartLineman==1)&(base_df.playerStartRushLocationSide==1)].groupby(['gameId', 'playId'])[metric_count].transform('sum')

grp_cols = ['gameId', 'playId']
first_cols = [
    'quarter',
    'down',
    'week',
    'possessionTeam',
    'yardsToGo',
    'rushLocationType',
    'yardsGained',
    'pff_runConceptPrimary',
    'pff_runConceptSecondary',
    'runSuccess'
]
avg_cols = list(base_df.columns[-16:])

agg_dict = {**{c:'first' for c in first_cols}, **{c:'mean' for c in avg_cols}}

psp_df = base_df.groupby(grp_cols).agg(agg_dict).reset_index().merge(play_pos_grp_df, how='left')

# psp_df.head(2)

In [22]:
# play visual script - used to animate plays
def get_track_play(df, gid, pid):
    return df[(df['gameId']==gid)&(df['playId']==pid)]

def animate_play(df, yards_to_go):

    # Determine the line of scrimmage
    line_of_scrimmage = list(df[(df.displayName=='football')&(df.frameId==1)]['x'])[0]
    line_first_down = round(abs(line_of_scrimmage - yards_to_go)) if df['playDirection'].iloc[0] == 'left' else round(abs(line_of_scrimmage + yards_to_go))

    snap_id = list(df[df.frameType=='SNAP']['frameId'])[0]
    df=df[df.frameId>snap_id-20]
    
    # Define the field dimensions
    min_x = max(0, df["x"].min() - 10)
    max_x = df["x"].max() + 10
    field_height = [0, 53.3]
    
    fig = px.scatter(
        df,
        x="x",
        y="y",
        color="club",
        hover_name="displayName",
        animation_frame="frameId",
        text="jerseyNumber", 
        # size_max=50 # not working
    )

    # fig.update_traces(marker=dict(size=10))

    # Style the text on the markers
    fig.update_traces(
        marker=dict(size=15),
        textfont=dict(size=12, color="white"),  # Adjust text size and color
        textposition="middle center",  # Position text in the center of each marker
    )
    
    # Add a green background for the field
    fig.update_layout(
        plot_bgcolor="green",
        xaxis=dict(range=[min_x, max_x], title="Yards", zeroline=False, showgrid=False),
        # xaxis=dict(range=[10, 110], title="Yards", zeroline=False, showgrid=False),
        yaxis=dict(range=field_height, title="Yards", zeroline=False, showgrid=False),
    )

    # Add the line of scrimmage (blue) and first down line (yellow)
    fig.add_shape(
        type="line",
        x0=line_of_scrimmage,
        x1=line_of_scrimmage,
        y0=field_height[0],
        y1=field_height[1],
        line=dict(color="blue", width=2),
        name="Line of Scrimmage"
    )
    fig.add_shape(
        type="line",
        x0=line_first_down,
        x1=line_first_down,
        y0=field_height[0],
        y1=field_height[1],
        line=dict(color="yellow", width=2),
        name="First Down"
    )

    # yard to go -- not working
    # fig.add_annotation(
    #     x=(line_of_scrimmage + line_first_down) / 2,
    #     y=field_height[1] + 1,
    #     text=f"Yards to Gain: {yards_to_go}",
    #     showarrow=True,
    #     font=dict(color="black", size=16),
    #     # bgcolor="rgba(0,0,0,0.7)",
    #     # borderpad=5,
    # )

    # Add yard lines (10-yard increments)
    for x in range(int(min_x), int(max_x) + 1, 10):
    # for x in range(10, 110 + 1, 10):
        fig.add_shape(
            type="line",
            x0=x,
            x1=x,
            y0=field_height[0],
            y1=field_height[1],
            line=dict(color="white", width=1, dash="dot"),
        )
    
    fig.layout.updatemenus[0].buttons[0].args[1]["frame"]["duration"] = 150
    
    return fig
    
# p_track_df = pd.read_csv('/kaggle/input/nfl-big-data-bowl-2025/tracking_week_1.csv')
# play_df = get_track_play(p_track_df, 2022091100, 2353)

# animate_play(play_df, 10)#.write_html("/kaggle/working/play_animation_2022091100_2353.html")

In [23]:
# play visual script - show pre-snap defensive count

def show_count(df, yards_to_go):
    line_of_scrimmage = list(df[df.displayName=='football']['x'])[0]
    line_first_down = round(abs(line_of_scrimmage - yards_to_go)) if df['playDirection'].iloc[0] == 'left' else round(abs(line_of_scrimmage + yards_to_go))
    
    # Define the field dimensions
    min_x = max(0, df["x"].min() - 10)
    max_x = df["x"].max() + 10
    field_height = [0, 53.3]
    
    fig = px.scatter(
        df,
        x="x",
        y="y",
        color="playerStartSide",
        hover_name="displayName",
        text="metric"
    )
    
    # Style the text on the markers
    fig.update_traces(
        marker=dict(size=15),
        textfont=dict(size=12, color="white"),  # Adjust text size and color
        textposition="middle center",  # Position text in the center of each marker
    )
    
    # Add a green background for the field
    fig.update_layout(
        plot_bgcolor="green",
        xaxis=dict(range=[min_x, max_x], title="Yards", zeroline=False, showgrid=False),
        # xaxis=dict(range=[10, 110], title="Yards", zeroline=False, showgrid=False),
        yaxis=dict(range=field_height, title="Yards", zeroline=False, showgrid=False),
    )

    # Add the line of scrimmage (blue) and first down line (yellow)
    fig.add_shape(
        type="line",
        x0=line_of_scrimmage,
        x1=line_of_scrimmage,
        y0=field_height[0],
        y1=field_height[1],
        line=dict(color="blue", width=2),
        name="Line of Scrimmage"
    )
    fig.add_shape(
        type="line",
        x0=line_first_down,
        x1=line_first_down,
        y0=field_height[0],
        y1=field_height[1],
        line=dict(color="yellow", width=2),
        name="First Down"
    )

    for x in range(round(int(min_x), -1), round(int(max_x) + 1, -1), 10):
    # for x in range(10, 110 + 1, 10):
        fig.add_shape(
            type="line",
            x0=x,
            x1=x,
            y0=field_height[0],
            y1=field_height[1],
            line=dict(color="white", width=1, dash="dot"),
        )

    football_x = list(df[df.displayName == 'football']['x'])[0]
    football_y = list(df[df.displayName == 'football']['y'])[0]

    # Define the bounds of the dotted box
    box_x_min = football_x
    box_x_max = football_x + 5.5
    box_y_min = football_y - 3.5
    box_y_max = football_y + 3.5

    # Add the dotted box to the plot
    fig.add_shape(
        type="rect",
        x0=box_x_min,
        x1=box_x_max,
        y0=box_y_min,
        y1=box_y_max,
        line=dict(color="red", width=2, dash="dot"),
    )
    
    return fig

#list(play_df[play_df.frameType=='SNAP'].frameId)[0]-1 # -1
# tmp_df = play_df[play_df.frameId==151].merge(base_df, how='left')[['displayName', 'club', 'jerseyNumber', 'playDirection', 'x', 'y','playerStartSide', 'metric']]
# tmp_df['metric'] = tmp_df['metric'].map({
#     1:0,
#     2:0,
#     3:1,
#     4:1,
#     5:1,
#     6:1,
#     7:2,
#     8:2,
#     9:2,
#     10:2,
#     11:2,
#     12:2,
# })
# tmp_df['playerStartSide'] = tmp_df['playerStartSide'].fillna('offense')
# tmp_df.loc[22, 'playerStartSide'] = 'football'
# tmp_df['metric'] = tmp_df['metric'].fillna(99).astype(int).astype('O').replace(99,'')
# fig = show_count(tmp_df, 10)
# tmp_df
# fig.write_html(f"/kaggle/working/Ex9_BDIC_presnap.html")
# fig.write_image(f"/kaggle/working/Ex9_BDIC_presnap.png")

In [24]:
# snapshot of the count

# psp_df[psp_df.columns[-16:]].describe()
# psp_df[['runSuccess']+[c for c in psp_df if c.startswith('metric')]].corr()

In [25]:
## Show pattern of avoiding the top players on DL and in box is beneficial - Table

def get_run_success_matrix(var):
    df_list = []
    for ds in ['train', 'holdout']:
        filter_ = (~psp_df.week.isin([8,9])) if ds=='train' else (psp_df.week.isin([8,9]))
        tmp_df = (
            psp_df[filter_]
            .groupby(var)[['runSuccess']]
            .agg(
                count=('runSuccess', 'count'), 
                avg_success=('runSuccess', 'mean')
            )
            .reset_index()
            .rename(columns={'count':f'{ds}_count', 'avg_success':f'{ds}_avg_success'})
        )
        df_list.append(tmp_df)
    df = df_list[0].merge(df_list[1], on=var)
    return df

# display(get_run_success_matrix('metric_boxPlaysidePlayerCnt'))

psp_df['count_range'] = np.where(psp_df['metric_boxPlaysidePlayerCnt'] < 2, '0-2', '3+')
psp_df['partition'] = np.where(psp_df['week'] <8, 'training', 'testing')
grp_df = psp_df.groupby(['count_range', 'partition'])['runSuccess'].mean().unstack()
# grp_df.reset_index()

In [26]:
## Show pattern of avoiding the top players on DL and in box is beneficial - Graph

import plotly.graph_objects as go
import numpy as np
import pandas as pd

def get_run_success_matrix(var, psp_df, max_=6):
    title = 'Run Success by Play BDI Count'

    # Split data into train and test sets
    filter_train = (~psp_df.week.isin([8, 9]))
    filter_test = (psp_df.week.isin([8, 9]))

    # Calculate standard error for each group
    tmp_df_train = (
        psp_df[filter_train]
        .groupby(var)[['runSuccess']]
        .agg(
            count=('runSuccess', 'count'),
            avg_success=('runSuccess', 'mean'),
            std_error=('runSuccess', lambda x: np.std(x) / np.sqrt(len(x)))  # Standard error
        )
        .reset_index()
    )
    tmp_df_test = (
        psp_df[filter_test]
        .groupby(var)[['runSuccess']]
        .agg(
            count=('runSuccess', 'count'),
            avg_success=('runSuccess', 'mean'),
            std_error=('runSuccess', lambda x: np.std(x) / np.sqrt(len(x)))  # Standard error
        )
        .reset_index()
    )

    # Filter by the max_ value
    tmp_df_train = tmp_df_train[tmp_df_train[var] <= max_]
    tmp_df_test = tmp_df_test[tmp_df_test[var] <= max_]

    # Create the figure
    fig = go.Figure()

    # Add mean + error bars for the training dataset
    fig.add_trace(go.Scatter(
        x=[x-.2 for x in tmp_df_train[var]],
        y=tmp_df_train['avg_success'],
        error_y=dict(
            type='data',
            array=tmp_df_train['std_error'],
            visible=True
        ),
        mode='markers',
        name='Train',
        marker=dict(color='blue')
    ))

    # Add mean + error bars for the validation dataset
    fig.add_trace(go.Scatter(
        x=[x+.2 for x in tmp_df_test[var]],
        y=tmp_df_test['avg_success'],
        error_y=dict(
            type='data',
            array=tmp_df_test['std_error'],
            visible=True
        ),
        mode='markers',
        name='Test',
        marker=dict(color='red')
    ))

    # Add count bars for the training dataset
    fig.add_trace(go.Bar(
        x=[x - 0.2 for x in tmp_df_train[var]],  # Shift bars slightly to the left
        y=tmp_df_train['count'],
        name='Train Count',
        marker=dict(color='blue', opacity=0.3),
        yaxis='y2',
        showlegend=False
    ))

    # Add count bars for the validation dataset
    fig.add_trace(go.Bar(
        x=[x + 0.2 for x in tmp_df_test[var]],  # Shift bars slightly to the right
        y=tmp_df_test['count'],
        name='Test Count',
        marker=dict(color='red', opacity=0.3),
        yaxis='y2',
        showlegend=False
    ))

    # Update layout for dual y-axes and legend
    fig.update_layout(
        title=title,
        xaxis=dict(title='BDI Count in Box on Playside'),
        yaxis=dict(title='Avg. Run Success', side='left'),
        yaxis2=dict(
            title='Plays',
            overlaying='y',
            side='right'
        ),
        barmode='group',  # Group bars side by side
        legend=dict(title='Legend'),
    )

    return fig, title
fig, title = get_run_success_matrix('metric_boxPlaysidePlayerCnt', psp_df)
# fig.show()
# fig.write_html(f"/kaggle/working/Ex5_{'_'.join(title.split(' '))}.html")
# fig.write_image(f"/kaggle/working/Ex5_{'_'.join(title.split(' '))}.png")

In [27]:
# EDA modeling

import lightgbm as lgb

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
from sklearn.inspection import PartialDependenceDisplay
from sklearn.ensemble import HistGradientBoostingClassifier
from skopt import BayesSearchCV
from sklearn.model_selection import StratifiedKFold

def pdp_feature(model, df, feature_name, feature_names):
    if isinstance(feature_name, str):
        feature_name = [feature_name]
    features = [feature_names.index(f) for f in feature_name]
    fig, ax = plt.subplots(figsize=(12, 6))
    PartialDependenceDisplay.from_estimator(model, df, features, ax=ax)
    plt.show()

In [28]:
def data_prep(drop_cols, m=True):
    ''' curate data for modeling. toggle between metric included or not '''
    df = psp_df[psp_df.down.isin([1,2,3,4])].drop(['count_range', 'partition'], axis=1)
    
    all_m = [c for c in psp_df if c.startswith('metric')]
    
    if m:
        df = df.drop([c for c in all_m if c!='metric_boxPlaysidePlayerCnt'], axis=1)
    else:
        df = df.drop(all_m, axis=1)
    
    label_encoder = LabelEncoder()
    for cat_col in ['rushLocationType', 'possessionTeam', 'pff_runConceptPrimary', 'pff_runConceptSecondary']:
        df[cat_col] = label_encoder.fit_transform(df[cat_col])
    
    train_df = df[df.week<=7]
    test_df = df[df.week>7]
 
    X_train = train_df[[c for c in df if c not in drop_cols]]
    y_train = train_df['runSuccess']

    X_test = test_df[[c for c in df if c not in drop_cols]]
    y_test = test_df['runSuccess']

    return X_train, X_test, y_test, y_train


In [29]:
# ensure target is consitent in train/test
# print(y_train.mean())
# print(y_test.mean())

In [30]:
# optimize model

# drop_cols = [
#     'gameId',
#     'playId',
#     'week',
#     'runSuccess',
#     'yardsGained',
#     'possessionTeam',
#     'DB',
#     'DL',
#     'LB',
#     'pff_runConceptPrimary',
#     # 'pff_runConceptSecondary',
#     ]

# X_train, X_test, y_test, y_train = data_prep(drop_cols, True)

# param_space = {
#     'num_leaves': (4, 70),
#     'max_depth': (3, 20),
#     'learning_rate': (0.005, 0.3, 'log-uniform'),
#     'n_estimators': (20, 200)
# }

# opt = BayesSearchCV(
#     estimator=lgb.LGBMClassifier(),
#     search_spaces=param_space,
#     cv=StratifiedKFold(n_splits=5),
#     n_iter=32,
#     scoring='accuracy',
#     n_jobs=-1,
#     verbose=0
# ).fit(X_train, y_train)

# # Best parameters
# print(opt.best_params_) # params
# print( opt.best_score_) # score

In [31]:
# build single model -- with metric

drop_cols = [
    'gameId',
    'playId',
    'week',
    'quarter',
    'runSuccess',
    'yardsGained',
    'possessionTeam',
    'DB',
    'DL',
    'LB',
    # 'pff_runConceptPrimary',
    'pff_runConceptSecondary',
    ]

X_train, X_test, y_test, y_train = data_prep(drop_cols, True)

model_a_params = {
    'learning_rate':0.01,
    'max_depth':4,
    'n_estimators':163,
    'num_leaves':69,
    'verbose':-1
}
model_a = lgb.LGBMClassifier(**model_a_params).fit(X_train, y_train)

display(dict(zip(model_a.feature_name_, model_a.feature_importances_)))

y_pred = model_a.predict(X_test)
# print(y_pred.mean())
print(f'Accuracy: {accuracy_score(y_test, y_pred)}')

{'down': 284,
 'yardsToGo': 667,
 'rushLocationType': 364,
 'pff_runConceptPrimary': 618,
 'metric_boxPlaysidePlayerCnt': 367}

Accuracy: 0.6075851393188855


In [32]:
# build single model -- without metric

X_train, X_test, y_test, y_train = data_prep(drop_cols, False)

model_b_params = {
    'learning_rate':0.0492,
    'max_depth':4,
    'n_estimators':34,
    'num_leaves':47,
    'verbose':-1
}
model_b = lgb.LGBMClassifier(**model_b_params).fit(X_train, y_train)

display(dict(zip(model_b.feature_name_, model_b.feature_importances_)))

y_pred = model_b.predict(X_test)
# print(y_pred.mean())
print(f'Accuracy: {accuracy_score(y_test, y_pred)}')

{'down': 73,
 'yardsToGo': 159,
 'rushLocationType': 112,
 'pff_runConceptPrimary': 142}

Accuracy: 0.608359133126935


In [33]:
# create pdp with feature
# pdp_feature(
#     model_a, 
#     X_train, 
#     # X_test, 
#     'metric_boxPlaysidePlayerCnt', 
#     model_a.feature_name_
# )

In [34]:
# END HERE

# below is HistGradientBoostingClassifier experimentation i did, but was not used in the model

In [35]:
# param_space = {
#     'max_iter': (50, 200),
#     'max_leaf_nodes': (20, 50),
#     'learning_rate': (0.01, 0.3, 'log-uniform'),
#     'max_depth': (5, 15)
# }

# opt = BayesSearchCV(
#     estimator=HistGradientBoostingClassifier(max_iter=200),
#     search_spaces=param_space,
#     cv=StratifiedKFold(n_splits=5),
#     n_iter=32,
#     scoring='accuracy',
#     n_jobs=-1,
#     verbose=0
# ).fit(X_train, y_train)

# # Best parameters
# print(opt.best_params_) # params
# print( opt.best_score_) # score

In [36]:
# hgbc_params = {
#     'learning_rate':0.018487089636919054,
#     'max_depth':8,
#     'max_iter':158,
#     'max_leaf_nodes':20,
#     'verbose':0
# }
# model = HistGradientBoostingClassifier(**hgbc_params).fit(X_train, y_train)
# # dict(zip(model.feature_names_in_, model.feature_importances_))

In [37]:
# y_pred = model.predict(X_test)
# print(y_pred.mean())
# print(f'Accuracy: {accuracy_score(y_test, y_pred)}')

In [38]:
# pdp_feature(
#     model, 
#     X_train, 
#     m, 
#     list(model.feature_names_in_)
# )