# NBA Game Predictor Model
### CMPE 257 Project
Authors: Kaushika Uppu, Miranda Billawala, Yun Ei Hlaing, Iris Cheung

## Imports

In [None]:
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
import seaborn as sns

import random
from datetime import datetime, timedelta
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier

from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error

## NBA Game Data

First, we load all of the NBA game data from the CSV file. The exact code for gathering the data lives in a separate file and uses `nba_api`. Only games from the 1985-1986 season onward are loaded, because the seasons before that are missing a very significant portion of the game statistics. We also want to be able to map from team ID to abbreviation and back easily.

In [None]:
# Parse GAME_DATE as datetime on load: later cells add timedelta offsets to it
# and read .year / .month from it, which fails on plain strings.
all_stats_cleaned = pd.read_csv('all_stats_cleaned.csv', parse_dates=['GAME_DATE'])
all_stats_cleaned.head()

In [None]:
# (rows, columns) of the loaded per-team game table.
all_stats_cleaned.shape

In [None]:
# Lookup tables between team IDs and abbreviations.
# One ID can map to several abbreviations, so the ID -> abbreviation side stores a list.
team_id_to_abb = {}
team_abb_to_id = {}

teams = all_stats_cleaned[['TEAM_ID', 'TEAM_ABBREVIATION']].drop_duplicates()

for known_id, known_abb in zip(teams['TEAM_ID'], teams['TEAM_ABBREVIATION']):
    team_id_to_abb.setdefault(known_id, []).append(known_abb)
    team_abb_to_id[known_abb] = known_id

### Merging Home and Away Team Stats Into One Row

Currently, each game is represented by two separate rows in the dataset - one for the home team and one for the away team. To make the data more clear, we decided to combine the two rows into a single row with statistics for both teams. Since predicting with our model will pass one set order of team one and team two (i.e. Lakers as Team One, Warriors as Team Two), we want to make sure that the model realizes games with the Lakers as Team Two and Warriors as Team One are more similar than may appear by the data. To do this, we will duplicate the rows and flip the teams. Then, we will have each game listed twice with the teams flipped. 

Firstly, we split the dataset into two : home games and away games. Then, we performed a join on these two datasets, matching each home team with its corresponding opponent based on the same dates. 

In [None]:
# Split the per-team rows on the HOME flag (1 -> `home`, 0 -> `away`).
home = all_stats_cleaned[all_stats_cleaned.HOME == 1]
away = all_stats_cleaned[all_stats_cleaned.HOME == 0]

In [None]:
# Pair each row with its opponent's row from the same date: the left row's
# OPPONENT abbreviation must equal the right row's TEAM_ABBREVIATION.
# Left-side columns get the _ONE suffix, right-side columns get _TWO.
combined_stats_home = pd.merge(home, away, 
                          left_on=['GAME_DATE', 'OPPONENT'], 
                          right_on=['GAME_DATE', 'TEAM_ABBREVIATION'],
                          suffixes=('_ONE', '_TWO'))
# Same join with the roles flipped, so the away team becomes team ONE.
combined_stats_away = pd.merge(away, home, 
                          left_on=['GAME_DATE', 'OPPONENT'], 
                          right_on=['GAME_DATE', 'TEAM_ABBREVIATION'],
                          suffixes=('_ONE', '_TWO'))

# Stack both orientations: every game now appears twice, once per perspective.
combined_stats = pd.concat([combined_stats_home, combined_stats_away], ignore_index = True)

In [None]:
# Preview the first rows of the merged (team ONE / team TWO) table.
combined_stats.head(5)

Each of the two merges above should produce one row per game, i.e. half as many rows as the original dataset. Because the home-first and away-first results are then concatenated, the combined dataset again contains two rows per game, one from each team's perspective.

After merging the rows, some columns appear twice or are now unnecessary. These columns include `MIN_ONE`/`MIN_TWO` (length of game in minutes), `SEASON_YEAR_ONE`/`SEASON_YEAR_TWO`, `OPPONENT_ONE` and `OPPONENT_TWO`.

We first checked whether `MIN_ONE` and `MIN_TWO` have the same value in each row. As seen below, there are 24 games where the minutes differed slightly. However, since the difference did not seem to be significant, we decided to retain one column and rename it `MIN`.

In [None]:
# Number of rows where the two teams' recorded minutes disagree.
(combined_stats['MIN_ONE'] != combined_stats['MIN_TWO']).sum()

In [None]:
# Show the disagreeing minute values side by side.
combined_stats[combined_stats['MIN_ONE'] != combined_stats['MIN_TWO']][['MIN_ONE','MIN_TWO']]

In [None]:
# Drop the redundant copies created by the merge, then keep a single
# un-suffixed MIN (from team ONE) and SEASON_YEAR (from team TWO) column.
combined_stats = combined_stats.drop(columns = ['MIN_TWO', 'OPPONENT_ONE', 'OPPONENT_TWO', 'SEASON_YEAR_ONE'])
combined_stats.rename(columns={'MIN_ONE': 'MIN', 'SEASON_YEAR_TWO': 'SEASON_YEAR'}, inplace=True)

## Feature Engineering

Features to add : 
1) Win streak
2) Win percentage
3) ELO Scores
4) EFG%
5) TS%

### Win Streak and Win Percentage

In [None]:
def add_win_streak_and_percentage(df, combined=False):
    """
    Input: Dataframe of games and a boolean telling whether it is the combined
           (team one / team two) layout or the single-team layout
    Output: New dataframe with each team's win streak and win percentage going
            into the game (the game itself is not counted)

    The history is always computed from the global `all_stats_cleaned` table and
    then joined onto `df` by team ID and game date.
    """
    history = (
        all_stats_cleaned[['TEAM_ID', 'GAME_DATE', 'WIN']]
        .sort_values(by=['TEAM_ID', 'GAME_DATE'])
        .reset_index(drop=True)
    )
    history['WIN_STREAK'] = 0
    history['WIN_PERCENTAGE'] = 0.0

    for _, team_games in history.groupby('TEAM_ID'):
        current_streak = 0
        wins_so_far = 0
        games_so_far = 0

        for row_label in team_games.index:
            won = history.at[row_label, 'WIN'] == 1

            # Record the state *before* this game is played.
            history.at[row_label, 'WIN_STREAK'] = current_streak
            if games_so_far == 0:
                history.at[row_label, 'WIN_PERCENTAGE'] = 0.0
            else:
                history.at[row_label, 'WIN_PERCENTAGE'] = wins_so_far / games_so_far

            # Then fold the game's result into the running counters.
            games_so_far += 1
            if won:
                current_streak += 1
                wins_so_far += 1
            else:
                current_streak = 0

    history.drop('WIN', axis=1, inplace=True)

    if not combined:
        # Single-team layout: one join on the team's own ID.
        return pd.merge(df, history, how='left', on=['TEAM_ID', 'GAME_DATE'])

    # Combined layout: join once per side and tag the new columns with the side.
    for side in ('ONE', 'TWO'):
        df = pd.merge(df, history,
                      how='left',
                      left_on=[f'TEAM_ID_{side}', 'GAME_DATE'],
                      right_on=['TEAM_ID', 'GAME_DATE'])
        df.drop('TEAM_ID', axis=1, inplace=True)
        df.rename(columns={'WIN_STREAK': f'WIN_STREAK_{side}',
                           'WIN_PERCENTAGE': f'WIN_PERCENTAGE_{side}'}, inplace=True)

    return df

### ELO Score Before Current Game

In [None]:
def merge_opponent_points(df):
    """
    Input: Single-team dataframe with TEAM_ABBREVIATION, OPPONENT, GAME_DATE, PTS and TEAM_ID columns
    Output: New dataframe with the opponent's points and team ID for the same game
            added as PTS_OPPONENT and TEAM_ID_OPPONENT (NaN when no opponent row matches)
    """
    opponent_view = df.loc[:, ['TEAM_ABBREVIATION', 'GAME_DATE', 'PTS', 'TEAM_ID']].copy()

    # Left join so every input row is kept even when its opponent is missing.
    with_opponent = df.merge(
        opponent_view,
        how='left',
        left_on=['GAME_DATE', 'OPPONENT'],
        right_on=['GAME_DATE', 'TEAM_ABBREVIATION'],
        suffixes=('', '_OPPONENT'),
    )

    # The opponent's abbreviation duplicates OPPONENT, so it is not kept.
    return with_opponent.drop(columns=['TEAM_ABBREVIATION_OPPONENT'])

In [None]:
# Elo constants (values unchanged from the original implementation).
_ELO_MEAN = 1505            # rating of a newly seen team and the per-season regression target
_ELO_K = 20                 # attenuation factor: higher means ratings react faster to results
_ELO_HOME_ADVANTAGE = 100   # points added to the home team when computing the win expectation


def _elo_rating_change(elo_a, elo_b, points_a, points_b, a_is_home):
    """Return the Elo change for team A after one game; team B receives the negative."""
    # Home-court bonus only shifts the expectation, never the stored ratings.
    adj_a = elo_a + _ELO_HOME_ADVANTAGE if a_is_home else elo_a
    adj_b = elo_b if a_is_home else elo_b + _ELO_HOME_ADVANTAGE

    # Expected score of team A: 1 / (1 + 10^((ELO B - ELO A) / 400))
    expected_a = 1 / (1 + 10 ** ((adj_b - adj_a) / 400))

    a_won = points_a > points_b
    actual_a = 1 if a_won else 0
    margin_of_victory = abs(points_a - points_b)

    # Margin of Victory Multiplier: ((MOV + 3) ** 0.8) / (7.5 + 0.006 * (Elo winner - Elo loser)).
    # The difference must be oriented winner-minus-loser; using "team one minus
    # team two" made the update depend on which mirrored row was seen first.
    winner_edge = elo_a - elo_b if a_won else elo_b - elo_a
    multiplier = ((margin_of_victory + 3) ** 0.8) / (7.5 + 0.006 * winner_edge)

    return _ELO_K * multiplier * (actual_a - expected_a)


def add_elo_score(df, combined=False):
    """
    Input: Dataframe of games and a boolean telling whether it is the combined
           (team one / team two) layout or the single-team layout
    Output: New dataframe, sorted by GAME_DATE with a fresh index, holding each
            team's ELO rating *before* the game: ELO_ONE / ELO_TWO in the combined
            layout, ELO in the single-team layout

    Each game is rated once even though it appears as two (mirrored) rows. At the
    first game of a new season a team's rating is regressed toward the mean:
    New Season ELO = 0.75 * Last Season ELO + 0.25 * Mean ELO. The input dataframe
    is not modified.
    """
    if combined:
        team_a_col, team_b_col = 'TEAM_ID_ONE', 'TEAM_ID_TWO'
        pts_a_col, pts_b_col, home_col = 'PTS_ONE', 'PTS_TWO', 'HOME_ONE'
    else:
        df = merge_opponent_points(df)
        team_a_col, team_b_col = 'TEAM_ID', 'TEAM_ID_OPPONENT'
        pts_a_col, pts_b_col, home_col = 'PTS', 'PTS_OPPONENT', 'HOME'

    # sort_values returns a new frame, so the caller's dataframe is never touched.
    df = df.sort_values(by='GAME_DATE').reset_index(drop=True)

    team_elos = {}          # latest rating per team
    team_last_season = {}   # last season in which each team was seen
    pre_game_elo = {}       # (game key, team) -> rating before that game
    game_keys = []          # game key of every row, in row order

    needed = ['GAME_DATE', 'SEASON_YEAR', team_a_col, team_b_col, pts_a_col, pts_b_col, home_col]
    for game_date, season, team_a, team_b, pts_a, pts_b, home in df[needed].itertuples(index=False, name=None):
        # Order-independent key, so both mirrored rows of a game share it.
        game_key = (game_date, frozenset((team_a, team_b)))
        game_keys.append(game_key)

        if (game_key, team_a) in pre_game_elo:
            continue  # the mirrored row of this game was already rated

        for team in (team_a, team_b):
            if team not in team_elos:
                team_elos[team] = _ELO_MEAN
            elif team_last_season[team] != season:
                team_elos[team] = 0.75 * team_elos[team] + 0.25 * _ELO_MEAN
            team_last_season[team] = season

        elo_a, elo_b = team_elos[team_a], team_elos[team_b]
        pre_game_elo[(game_key, team_a)] = elo_a
        pre_game_elo[(game_key, team_b)] = elo_b

        # A missing score (e.g. no opponent row matched) would turn every later
        # rating of both teams into NaN, so such games leave the ratings unchanged.
        if pd.isna(pts_a) or pd.isna(pts_b):
            continue

        change = _elo_rating_change(elo_a, elo_b, pts_a, pts_b, home == 1)
        team_elos[team_a] += change
        team_elos[team_b] -= change

    # Write the pre-game ratings back onto every row, mirrored rows included.
    if combined:
        df['ELO_ONE'] = [pre_game_elo.get((key, team), np.nan)
                         for key, team in zip(game_keys, df[team_a_col])]
        df['ELO_TWO'] = [pre_game_elo.get((key, team), np.nan)
                         for key, team in zip(game_keys, df[team_b_col])]
    else:
        df['ELO'] = [pre_game_elo.get((key, team), np.nan)
                     for key, team in zip(game_keys, df[team_a_col])]
        df.drop(columns=['PTS_OPPONENT', 'TEAM_ID_OPPONENT'], inplace=True)

    return df

In [None]:
# Smoke test on the single-team layout: add streak / percentage and ELO, then
# list the resulting columns and preview the first rows for team ID 14.
test_1 = add_win_streak_and_percentage(all_stats_cleaned)
test_1 = add_elo_score(test_1)
print(test_1.columns)
test_1[test_1['TEAM_ID'] == 14].head(5)

In [None]:
# Smoke test on the combined (team ONE / team TWO) layout, previewing the rows
# where team ID 14 is team ONE.
test_2 = add_win_streak_and_percentage(combined_stats, True)
test_2 = add_elo_score(test_2, True)
test_2[test_2['TEAM_ID_ONE'] == 14].head(5)

### Effective Field Goal Percentage and True Shooting Percentage

In [None]:
def add_shooting_percentages(df, combined=False):
    """
    Input: Dataframe of games and a boolean telling whether it is the combined
           (team one / team two) layout or the single-team layout
    Output: The same dataframe (modified in place and returned) with effective
            field goal percentage and true shooting percentage columns added:
            EFG%_ONE, EFG%_TWO, TS%_ONE, TS%_TWO when combined, else EFG% and TS%

    EFG% = (FGM + 0.5 * FG3M) / FGA
    TS%  = PTS / (2 * (FGA + 0.44 * FTA))
    """
    suffixes = ('_ONE', '_TWO') if combined else ('',)

    # FGM already counts made threes once, so a three only earns an extra 0.5.
    for sfx in suffixes:
        df[f'EFG%{sfx}'] = (df[f'FGM{sfx}'] + 0.5 * df[f'FG3M{sfx}']) / df[f'FGA{sfx}']
    for sfx in suffixes:
        df[f'TS%{sfx}'] = df[f'PTS{sfx}'] / (2 * (df[f'FGA{sfx}'] + 0.44 * df[f'FTA{sfx}']))
    return df

### Point Differential

In [None]:
def add_point_differential(df, window = 5, combined=False):
    """
    Input: Dataframe of games and a boolean telling whether it is the combined
           (team one / team two) layout or the single-team layout
    Output: Dataframe with the game's point differential added: PTS_DIFF_ONE and
            PTS_DIFF_TWO when combined (written onto `df` itself), else PTS_DIFF
            (on a new dataframe)

    `window` is accepted for backward compatibility but is not used. The earlier
    body contained a pasted rolling-average loop that referenced undefined names
    and made every call fail; it has been removed.

    NOTE: the differential is taken from the game itself, so its sign is the
    game's result. It must be lagged or averaged over earlier games before it is
    used as a predictor, otherwise it leaks the outcome.
    """
    if combined:
        df['PTS_DIFF_ONE'] = df['PTS_ONE'] - df['PTS_TWO']
        df['PTS_DIFF_TWO'] = df['PTS_TWO'] - df['PTS_ONE']
    else:
        # Opponent points are only needed temporarily to compute the difference.
        df = merge_opponent_points(df)
        df['PTS_DIFF'] = df['PTS'] - df['PTS_OPPONENT']
        df.drop(columns=['PTS_OPPONENT', 'TEAM_ID_OPPONENT'], inplace=True)
    return df

### Win for Last Matchup Game

In [None]:
def add_win_last_game(df, combined=False):
    """
    Input: Dataframe of games and a boolean telling whether it is the combined
           (team one / team two) layout or the single-team layout
    Output: New dataframe with a WIN_LAST column holding the team's result in its
            previous game against the same opponent (NaN for a first meeting)
    """
    if combined:
        matchup_cols, result_col = ['TEAM_ID_ONE', 'TEAM_ID_TWO'], 'WIN_ONE'
    else:
        matchup_cols, result_col = ['TEAM_ID', 'OPPONENT'], 'WIN'
    join_keys = matchup_cols + ['GAME_DATE']

    # Order each matchup chronologically, then look one game back within it.
    chronological = df.sort_values(by=join_keys)
    chronological['WIN_LAST'] = chronological.groupby(matchup_cols)[result_col].shift(1)

    return df.merge(chronological[join_keys + ['WIN_LAST']], on=join_keys, how='left')

In [None]:
# Smoke test: add WIN_LAST on the single-team layout and inspect team ID 14's
# games against BOS in date order.
test_4 = add_win_last_game(all_stats_cleaned)
print(test_4.columns)
test_4[(test_4['TEAM_ID'] == 14) & (test_4['OPPONENT'] == 'BOS')].sort_values(by='GAME_DATE').head(5)

## Get Training Set

In [None]:
# Build the modelling table `df` by dropping team names / abbreviations, the
# raw shooting and scoring columns of both teams, and team TWO's HOME and WIN
# flags from the combined rolling-window table.
# NOTE(review): `combined_pred_stats` is only created in the "Rolling Window
# Statistics" cells further down, so those cells must be run before this one.
df = combined_pred_stats.drop(columns = ['TEAM_ABBREVIATION_ONE', 'TEAM_NAME_ONE', 'MIN', 'FGM_ONE', 
                                             'FGA_ONE', 'FG3M_ONE', 'FG3A_ONE', 'FTM_ONE', 'FTA_ONE', 'PTS_ONE', 
                                             'PLUS_MINUS_ONE', 'TEAM_ABBREVIATION_TWO', 'TEAM_NAME_TWO', 'HOME_TWO',
                                             'WIN_TWO', 'FGM_TWO', 'FGA_TWO', 'FG3M_TWO', 'FG3A_TWO', 'FTM_TWO', 
                                             'FTA_TWO', 'PTS_TWO', 'PLUS_MINUS_TWO'])

In [None]:
def get_val_set (first_season, last_season, random_state = None) :
    """
    Input: First season (inclusive) and last season (exclusive) to sample from,
           and an optional seed that makes the sampling reproducible
    Output: List of game dates, up to six per season: two drawn from each of
            three four-week windows (start of season, about two thirds through
            the season, end of season)

    Dates are sampled from the game rows of the global `df`, so the same date can
    be returned more than once. Seasons without any games are skipped, and a
    window holding fewer than two games contributes what it has.
    """
    # With no seed, sampling keeps using numpy's global random state.
    rng = None if random_state is None else np.random.RandomState(random_state)
    four_weeks = timedelta(weeks = 4)

    dates = []
    for season in range(first_season, last_season) :
        season_data = df[df['SEASON_YEAR'] == season]
        if season_data.empty :
            continue

        start_date = season_data['GAME_DATE'].min()
        end_date = season_data['GAME_DATE'].max()

        # day around trade deadline (after about 2/3 of the season)
        delta = round((2/3)*(end_date-start_date).days)
        approx_deadline = start_date + timedelta(days = delta)

        windows = [
            (start_date, start_date + four_weeks),             # beginning of the season
            (approx_deadline, approx_deadline + four_weeks),   # around the trade deadline
            (end_date - four_weeks, end_date),                 # end of the season
        ]
        for window_start, window_end in windows :
            in_window = season_data.loc[season_data['GAME_DATE'].between(window_start, window_end), 'GAME_DATE']
            dates.extend(in_window.sample(min(2, len(in_window)), random_state = rng))

    return dates

In [None]:
# Sample validation dates from the earliest season up to, but not including,
# the season four before the latest one (get_val_set iterates
# range(first_season, last_season), which excludes its upper bound).
first_season = df['SEASON_YEAR'].min()
last_season = df['SEASON_YEAR'].max() - 4
val_set = get_val_set(first_season, last_season)

### Rolling Window Statistics (Baseline)

In [None]:
# Apply the engineered features to the single-team table, one after another.
# Each call's result replaces `all_stats_cleaned`, so re-running this cell on an
# already-processed table would add the features a second time.
# added shooting percentage
all_stats_cleaned = add_shooting_percentages(all_stats_cleaned)
# added win streak and win percentage
all_stats_cleaned = add_win_streak_and_percentage(all_stats_cleaned)
# added ELO score
all_stats_cleaned = add_elo_score(all_stats_cleaned)
# added point differential
# all_stats_cleaned = add_point_differential(all_stats_cleaned)
# added win for last game
all_stats_cleaned = add_win_last_game(all_stats_cleaned)

In [None]:
# Per-game statistics that get averaged over previous games.
# rolling_window() below defines its own local copy of this same list.
cols = ['FG_PCT', 'FG3_PCT', 'FT_PCT', 'OREB', 'DREB', 'REB', 'AST', 'STL', 'BLK', 'TOV', 'PF', 'EFG%', 'TS%']

In [None]:
def rolling_window(n) :
    """
    Input: Number of previous games to average over
    Output: Copy of the global `all_stats_cleaned`, grouped team by team and
            sorted by date within each team, where every statistic column is
            replaced by its mean over the team's previous n games (the current
            game is excluded; rows with fewer than n earlier games get NaN).
            Returns None when the table holds no teams.
    """
    stat_cols = ['FG_PCT', 'FG3_PCT', 'FT_PCT', 'OREB', 'DREB', 'REB', 'AST', 'STL', 'BLK', 'TOV', 'PF', 'EFG%', 'TS%']

    per_team_frames = []
    for team_id in all_stats_cleaned['TEAM_ID'].unique() :
        team_games = all_stats_cleaned[all_stats_cleaned['TEAM_ID'] == team_id].sort_values(by='GAME_DATE')
        for stat in stat_cols :
            # shift(1) first, so the window ends at the game before this one
            previous_games = team_games[stat].shift(1)
            team_games[stat] = previous_games.rolling(window = n).mean()
        per_team_frames.append(team_games)

    if not per_team_frames :
        return None
    return pd.concat(per_team_frames)

In [None]:
# Baseline features: each statistic averaged over the team's previous 5 games.
pred_stats = rolling_window(5)
print(pred_stats.shape)
pred_stats.head()

In [None]:
# Split the rolling-average rows on the HOME flag (this rebinds the earlier
# `home` / `away` variables).
home = pred_stats[pred_stats['HOME'] == 1]
away = pred_stats[pred_stats['HOME'] == 0]

In [None]:
# Same pairing as for combined_stats: join each row to its opponent's row on
# the same date, once with the home team as team ONE and once with the away
# team as team ONE.
combined_pred_stats_home = pd.merge(home, away, 
                          left_on=['GAME_DATE', 'OPPONENT'], 
                          right_on=['GAME_DATE', 'TEAM_ABBREVIATION'],
                          suffixes=('_ONE', '_TWO'))
combined_pred_stats_away = pd.merge(away, home, 
                          left_on=['GAME_DATE', 'OPPONENT'], 
                          right_on=['GAME_DATE', 'TEAM_ABBREVIATION'],
                          suffixes=('_ONE', '_TWO'))

# Both orientations stacked: two rows per game.
combined_pred_stats = pd.concat([combined_pred_stats_home, combined_pred_stats_away], ignore_index = True)

# Remove the redundant merge copies and keep single MIN / SEASON_YEAR columns.
combined_pred_stats = combined_pred_stats.drop(columns = ['MIN_TWO', 'OPPONENT_ONE', 'OPPONENT_TWO', 'SEASON_YEAR_ONE'])
combined_pred_stats.rename(columns={'MIN_ONE': 'MIN', 'SEASON_YEAR_TWO': 'SEASON_YEAR'}, inplace=True)

In [None]:
# Size before rows with missing values are removed.
combined_pred_stats.shape

In [None]:
# Drop every row that contains a NaN in any column. This removes each team's
# first n games, whose rolling window is incomplete, and any other row with a
# missing value.
combined_pred_stats = combined_pred_stats.dropna(axis = 0)

In [None]:
# Size after the NaN rows were removed.
combined_pred_stats.shape

### Predicting Using ML Model

In [None]:
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error

In [None]:
# adding shooting percentages to the game stats that need to be predicted
# (adds EFG%_ONE / EFG%_TWO / TS%_ONE / TS%_TWO to combined_stats)
combined_stats = add_shooting_percentages(combined_stats, combined=True)

In [None]:
# Preview the combined table with the new shooting-percentage columns.
combined_stats.head()

In [None]:
# Team ONE statistics that the stat models predict; the matching team TWO
# column is obtained by replacing the _ONE suffix with _TWO.
stat_columns = ['FG_PCT_ONE', 'FG3_PCT_ONE', 'FT_PCT_ONE', 'OREB_ONE', 'DREB_ONE', 'REB_ONE',
                    'AST_ONE', 'STL_ONE', 'BLK_ONE', 'TOV_ONE', 'PF_ONE', 'EFG%_ONE', 'TS%_ONE']

def get_rolling(team_id, games, prefix, rolling_window = 10, stat_cols = None):
    """
    Gets rolling statistics for given team using given rolling window.

    Averages the team's own statistics over the last `rolling_window` rows of
    `games`. In each row the team's numbers are read from the _ONE columns when
    it is team ONE and from the _TWO columns otherwise. `stat_cols` defaults to
    the module-level `stat_columns`. Returns a dict keyed "<prefix>_<stat>".
    """
    if stat_cols is None:
        stat_cols = stat_columns

    collected = {stat: [] for stat in stat_cols}
    for _, game in games.tail(rolling_window).iterrows():
        team_is_one = game['TEAM_ID_ONE'] == team_id
        for stat in stat_cols:
            source = stat if team_is_one else stat.replace('_ONE', '_TWO')
            collected[stat].append(game[source])
    return {f"{prefix}_{stat}": np.mean(vals) for stat, vals in collected.items()}

def _team_history(past_games, team_id):
    """Return one row per past game of `team_id`, oldest first."""
    involved = past_games[
        (past_games['TEAM_ID_ONE'] == team_id) | (past_games['TEAM_ID_TWO'] == team_id)
    ]
    # combined_stats stores every game twice (once per orientation). Keeping one
    # row per date stops a game from being counted twice in the rolling window.
    # This assumes a team plays at most one game per date.
    return involved.drop_duplicates(subset='GAME_DATE')

def build_features_for_game(team_one_id, team_two_id, home_one, date, combined_stats, stat_columns, rolling_window):
    """
    Builds feature vector for a single game using rolling averages and metadata.

    Only games strictly before `date` are used. Returns None when either team
    has fewer than `rolling_window` distinct earlier games; otherwise a dict with
    TEAM_ID_ONE, TEAM_ID_TWO, HOME_ONE, SEASON_YEAR and the rolling averages of
    `stat_columns` for both teams.
    """
    past_games = combined_stats[combined_stats['GAME_DATE'] < date].sort_values('GAME_DATE')

    # getting past games for both teams and calculating rolling averages
    team_one_games = _team_history(past_games, team_one_id)
    team_two_games = _team_history(past_games, team_two_id)

    if len(team_one_games) < rolling_window or len(team_two_games) < rolling_window:
        return None

    team_one_features = get_rolling(team_one_id, team_one_games, 'TEAM_ONE', rolling_window, stat_columns)
    team_two_features = get_rolling(team_two_id, team_two_games, 'TEAM_TWO', rolling_window, stat_columns)

    input_features = {
        'TEAM_ID_ONE': team_one_id,
        'TEAM_ID_TWO': team_two_id,
        'HOME_ONE': home_one,
        # games from October onward are assigned to the season starting that year
        'SEASON_YEAR': date.year if date.month >= 10 else date.year - 1
    }
    input_features.update(team_one_features)
    input_features.update(team_two_features)
    return input_features


    
def train_model(combined_stats, stat_columns, rolling_window = 10):
    """
    Fits one XGBoost regressor per statistic in `stat_columns`.

    Features: rolling averages of both teams' earlier games plus one-hot encoded
    team IDs and home flag.
    Targets: the statistics TEAM_ONE actually recorded in the game.
    Games for which build_features_for_game returns None are left out.
    Returns the dict of fitted models and the list of feature column names.
    """
    ordered_games = combined_stats.sort_values(by='GAME_DATE').reset_index(drop=True)

    feature_rows = []
    target_rows = []   # one list of target values per kept game, in stat_columns order
    for _, game in ordered_games.iterrows():
        features = build_features_for_game(
            game['TEAM_ID_ONE'], game['TEAM_ID_TWO'], game['HOME_ONE'], game['GAME_DATE'],
            ordered_games, stat_columns, rolling_window
        )
        if features is None:
            continue

        feature_rows.append(features)
        target_rows.append([game[stat] for stat in stat_columns])

    X = pd.get_dummies(
        pd.DataFrame(feature_rows),
        columns=['TEAM_ID_ONE', 'TEAM_ID_TWO', 'HOME_ONE'],
        drop_first=True
    )

    # one independent regressor per target statistic
    models = {}
    for position, stat in enumerate(stat_columns):
        y = pd.Series([targets[position] for targets in target_rows])
        regressor = XGBRegressor(n_estimators = 100)
        regressor.fit(X, y)
        models[stat] = regressor

    return models, X.columns.tolist()


def predict_game_stats(models, feature_cols, combined_stats, team_one_id, team_two_id, home_one, date,
                       stat_columns, rolling_window = 10):
    """
    Predicts the statistics for given game.

    Builds the feature row from games before `date`, encodes it with the same
    column layout the models were trained on (`feature_cols`) and returns a dict
    mapping each statistic to its predicted value.
    Raises ValueError when either team lacks enough earlier games.
    """
    features = build_features_for_game(team_one_id, team_two_id, home_one, date, combined_stats, stat_columns, rolling_window)
    if features is None:
        raise ValueError("Not enough past games to make prediction.")

    # The team IDs and home flag are numeric, and get_dummies only encodes
    # object / category columns unless they are named explicitly. Without
    # `columns=` no dummy was ever created and every team / home indicator was
    # filled with 0.
    X_new = pd.get_dummies(pd.DataFrame([features]), columns=['TEAM_ID_ONE', 'TEAM_ID_TWO', 'HOME_ONE'])
    # Align to the training layout: indicators unseen here become 0, and
    # indicators the training encoding dropped (drop_first) are discarded.
    X_new = X_new.reindex(columns=feature_cols, fill_value=0)

    # predict stats using previously fitted models
    return {stat: model.predict(X_new)[0] for stat, model in models.items()}

In [None]:
def evaluate_stats_model(combined_stats, rolling_window = 10, test_seasons = 4, models = None, feature_cols = None):
    """
    Evaluates predicting stats model by testing on last `test_seasons` seasons using RMSE.

    SEASON_YEAR is recomputed from GAME_DATE (October onward belongs to the
    season starting that year). When `models` and `feature_cols` are not both
    supplied, the models are trained on all earlier seasons first.
    Returns a single RMSE computed over every predicted statistic of every test
    game; the statistics are pooled without rescaling.
    """
    combined_stats = combined_stats.sort_values('GAME_DATE').reset_index(drop=True)
    combined_stats['SEASON_YEAR'] = combined_stats['GAME_DATE'].apply(
        lambda d: d.year if d.month >= 10 else d.year - 1
    )

    stat_columns = ['FG_PCT_ONE', 'FG3_PCT_ONE', 'FT_PCT_ONE', 'OREB_ONE', 'DREB_ONE', 'REB_ONE',
                    'AST_ONE', 'STL_ONE', 'BLK_ONE', 'TOV_ONE', 'PF_ONE', 'EFG%_ONE', 'TS%_ONE']

    # splitting into training and testing sets
    all_seasons = sorted(combined_stats['SEASON_YEAR'].unique())
    train_seasons = all_seasons[:-test_seasons]
    test_seasons_list = all_seasons[-test_seasons:]
    test_data = combined_stats[combined_stats['SEASON_YEAR'].isin(test_seasons_list)].copy()

    # Train unless BOTH pieces were supplied: a model set without its feature
    # columns (or the reverse) cannot be used for prediction.
    if models is None or feature_cols is None:
        train_data = combined_stats[combined_stats['SEASON_YEAR'].isin(train_seasons)].copy()
        models, feature_cols = train_model(train_data, stat_columns, rolling_window)

    predictions = []
    actuals = []

    # predict stats for all games in testing set; the full table is passed so
    # the rolling features can reach back into earlier seasons
    for _, row in test_data.iterrows():
        pred = predict_game_stats(
                models, feature_cols, combined_stats,
                team_one_id = row['TEAM_ID_ONE'],
                team_two_id = row['TEAM_ID_TWO'],
                home_one = row['HOME_ONE'],
                date = row['GAME_DATE'],
                stat_columns = stat_columns,
                rolling_window = rolling_window
            )

        predictions.append([pred[stat] for stat in stat_columns])
        actuals.append([row[stat] for stat in stat_columns])

    # evaluating model's predictions
    y_true = np.array(actuals)
    y_pred = np.array(predictions)
    total_rmse = np.sqrt(mean_squared_error(y_true.flatten(), y_pred.flatten()))
    return total_rmse

In [None]:
# Baseline: train on all but the last 4 seasons, report RMSE on those 4.
rmse = evaluate_stats_model(combined_stats, rolling_window = 10, test_seasons = 4)

In [None]:
# RMSE of the untuned stat models.
rmse

### Hyperparameter Tuning (NEEDS TO BE FIXED)

In [None]:
from sklearn.model_selection import RandomizedSearchCV

In [None]:
def tune_stats_model(X, y, n_iter = 5, cv = 3, random_state = None):
    """
    Runs a randomized hyperparameter search for one XGBoost regressor.

    Samples `n_iter` parameter combinations, scores each with `cv`-fold
    cross-validation on negative RMSE and returns the refitted best estimator
    together with its parameters. Pass `random_state` for a reproducible search.
    """
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [4, 6, 8, 10],
        'learning_rate': [0.01, 0.05, 0.1, 0.3],
        # 0.75 replaces a duplicated 0.5 entry (presumed typo); the duplicate
        # only made 0.5 twice as likely to be drawn.
        'subsample': [0.5, 0.75, 1.0],
        'colsample_bytree': [0.5, 0.75, 1.0]
    }

    model = XGBRegressor(random_state = random_state)
    search = RandomizedSearchCV(
        estimator = model,
        param_distributions = param_grid,
        n_iter = n_iter,
        cv = cv,
        scoring = 'neg_root_mean_squared_error',
        verbose = False,
        n_jobs = -1,
        random_state = random_state
    )
    search.fit(X, y)
    return search.best_estimator_, search.best_params_

In [None]:
def train_mult_models(combined_stats, stat_columns, rolling_window = 10, n_iter = 5, cv = 3):
    """
    Trains a model to predict team stats for a given game and does randomized hyperparameter tuning for each stat.

    Features and targets are built exactly as in train_model. `n_iter` and `cv`
    are forwarded to tune_stats_model for every statistic.
    Returns the dict of tuned models, the list of feature column names and the
    dict of best parameters per statistic.
    """
    combined_stats = combined_stats.sort_values(by='GAME_DATE').reset_index(drop=True)
    feature_rows = []
    target_dict = {stat: [] for stat in stat_columns}

    for idx, row in combined_stats.iterrows():
        date = row['GAME_DATE']
        team_one = row['TEAM_ID_ONE']
        team_two = row['TEAM_ID_TWO']
        home = row['HOME_ONE']

        features = build_features_for_game(team_one, team_two, home, date, combined_stats, stat_columns, rolling_window)
        if features is None:
            continue
            
        feature_rows.append(features)

        for stat in stat_columns:
            target_dict[stat].append(row[stat])

    X = pd.DataFrame(feature_rows)
    X = pd.get_dummies(X, columns=['TEAM_ID_ONE', 'TEAM_ID_TWO', 'HOME_ONE'], drop_first=True)

    # finding best hyperparameters for each stat
    models = {}
    best_params = {}
    
    for stat in stat_columns:
        print(f"Tuning model for {stat}...")
        y = pd.Series(target_dict[stat])
        # forward the caller's search budget instead of a hard-coded 5 / 3
        model, params = tune_stats_model(X, y, n_iter = n_iter, cv = cv)
        models[stat] = model
        best_params[stat] = params

    return models, X.columns.tolist(), best_params

In [None]:
# Tune on the training seasons only, with the same settings the evaluation uses
# (rolling_window = 10, last 4 seasons held out). Fitting on every season would
# let the tuned models see the games they are later scored on.
tuning_seasons = combined_stats['GAME_DATE'].apply(
    lambda d: d.year if d.month >= 10 else d.year - 1
)
held_out_seasons = sorted(tuning_seasons.unique())[-4:]

best_models, feature_cols, best_params = train_mult_models(
    combined_stats[~tuning_seasons.isin(held_out_seasons)],
    stat_columns = stat_columns,
    rolling_window = 10,
    n_iter = 5,
    cv = 3
)

In [None]:
# Report the winning hyperparameters of every statistic's model.
for stat, chosen in best_params.items():
    print(f'Best parameters for {stat} :')
    for name, value in chosen.items():
        print(f'{name}: {value}')
    print('\n')

In [None]:
# Score the tuned models on the last 4 seasons; because models and feature
# columns are both supplied, evaluate_stats_model does not retrain.
rmse_tuned = evaluate_stats_model(combined_stats, rolling_window = 10, test_seasons = 4,
                                  models = best_models, feature_cols = feature_cols)

In [None]:
# RMSE of the tuned stat models.
rmse_tuned

## Model Training and Testing

### Validation and Testing

In [None]:
def get_training_set (date, num_seasons, source_df = None) :
    """
    Input: Date of games, number of seasons to include in dataset and, optionally, the DataFrame of
           games to draw from (when source_df is None the notebook-level df is used)
    Output: (X, y) built from all rows from the last num_seasons and all games in the current season
            up till (not including) the given date; X gains a DAYS_SINCE_GAME column and drops
            WIN_ONE and GAME_DATE, y is WIN_ONE
    """
    if source_df is None:
        source_df = df

    # determine season of the game (a season is labelled by the year it starts in, from October on)
    season = date.year if date.month >= 10 else date.year - 1

    # get games for training
    data = source_df[source_df['SEASON_YEAR'].between(season - num_seasons, season)].copy()
    data['DAYS_SINCE_GAME'] = [(date-game_day).days for game_day in data['GAME_DATE']]
    # keep only games strictly before the given date
    data = data[data['DAYS_SINCE_GAME'] > 0]

    data = data.sort_values(by = 'DAYS_SINCE_GAME')

    # split into X and y and only look at relevant columns
    X = data.drop(columns = ['WIN_ONE', 'GAME_DATE'])
    y = data['WIN_ONE']

    return (X,y)

def pred_by_date (df, model, date) :
    """
    Predict the outcome of all games on the given date.

    Input: DataFrame of games, model to fit, and date of the games to predict
    Output: Number of correct predictions and number of games on that date
    """
    n = 5 # how many years in the past for training

    # get data in relevant time frame, drawn from the same DataFrame the predictions are made on
    X, y = get_training_set(date, n, df)

    games_on_day = df[df['GAME_DATE'] == date].copy()
    # games being predicted are zero days away from the prediction date
    games_on_day['DAYS_SINCE_GAME'] = np.zeros(len(games_on_day))

    test = games_on_day.drop(columns = ['WIN_ONE', 'GAME_DATE'])

    model.fit(X,y)
    pred = model.predict(test)
    correct = np.sum(pred == games_on_day['WIN_ONE'])
    games = len(pred)
    return correct, games

In [None]:
# testing the model by choosing 3 days each season and checking score
def test_model(df, model) :
    """
    Input: DataFrame of games and the model to evaluate
    Output: Total number of correct predictions and total number of games over every date in val_set
    """
    # one (correct, games) pair per date
    per_day = [pred_by_date(df, model, d) for d in val_set]

    total_correct = sum(day_correct for day_correct, _ in per_day)
    total_games = sum(day_games for _, day_games in per_day)
    return total_correct, total_games

In [None]:
model = XGBClassifier(objective='binary:logistic', random_state = 33)
# test_model takes the DataFrame of games as its first argument
correct,games = test_model(df, model)
correct / games

# before FE : 0.6298076923076923
# didn't include point differential since accuracy is 1

### Feature Selection

The average feature importance scores are calculated over the three game dates sampled from each season, using XGBoost's built-in feature importance.

In [None]:
def pred_by_date_with_importance(model, date):
    """
    Predict the outcome of all games on the given date and collect the fitted model's feature importances.

    Input: Model exposing get_booster() (e.g. XGBClassifier) and date of the games to predict
    Output: Number of correct predictions, number of games on that date, and dict of
            feature -> 'gain' importance score from the model fitted for that date
    """
    n = 5 # how many years in the past for training

    # get data in relevant time frame
    X, y = get_training_set(date, n)

    games_on_day = df[df['GAME_DATE'] == date].copy()
    # games being predicted are zero days away from the prediction date
    games_on_day['DAYS_SINCE_GAME'] = np.zeros(len(games_on_day))

    test = games_on_day.drop(columns = ['WIN_ONE', 'GAME_DATE'])

    model.fit(X,y)
    pred = model.predict(test)
    correct = np.sum(pred == games_on_day['WIN_ONE'])
    games = len(pred)
    importance_scores = model.get_booster().get_score(importance_type='gain')
    return correct, games, importance_scores

In [None]:
def test_model_with_importance(model) :
    """
    Outputs the average feature importance scores of game predictions

    Input: Model passed to pred_by_date_with_importance for every date in test
    Output: List of (feature, average importance score) tuples sorted from most to least important.
            Each average is taken only over the dates on which the feature received a score; a date
            where the feature is missing from the scores does not count as a zero.
    """
    feature_scores = {}
    for t in test:
        # only the importance scores are needed; the accuracy counts are not part of the output
        _, _, importance_scores = pred_by_date_with_importance(model, t)

        for feature, score in importance_scores.items():
            feature_scores.setdefault(feature, []).append(score)

    average_importance = {feature: sum(scores)/len(scores) for feature, scores in feature_scores.items()}
    return sorted(average_importance.items(), key=lambda x: x[1], reverse=True)

In [None]:
# Rank the features by their average importance over every date in test;
# importance_scores is a list of (feature, average score) tuples, most important first.
model = XGBClassifier(objective='binary:logistic')
importance_scores = test_model_with_importance(model)
print(importance_scores)

Testing the model with the feature importance scores by iteratively removing the least important features and comparing the accuracy:

In [None]:
def get_training_set_with_features (date, num_seasons, features) :
    """
    Input: Date of games, number of seasons to look back, and the feature columns to keep
    Output: (X, y) for all games from the last num_seasons seasons and the current season that were
            played before the given date, ordered from most to least recent; X holds only the
            requested features and y is WIN_ONE
    """
    # a season is labelled by the year it starts in (October onward)
    current_season = date.year - 1 if date.month < 10 else date.year

    in_window = df['SEASON_YEAR'].between(current_season - num_seasons, current_season)
    history = df[in_window].copy()
    history['DAYS_SINCE_GAME'] = [(date-played_on).days for played_on in history['GAME_DATE']]

    # drop the prediction date itself and anything after it, then put the most recent games first
    history = history[history['DAYS_SINCE_GAME'] > 0].sort_values(by = 'DAYS_SINCE_GAME')

    return (history[features], history['WIN_ONE'])

def pred_by_date_with_features (model, date, features) :
    """
    Predict the outcome of all games on the given date using only the given feature columns.

    Input: Model to fit, date of the games to predict, and list of feature columns
    Output: Number of correct predictions and number of games on that date
    """
    seasons_back = 5 # how many years in the past for training
    X, y = get_training_set_with_features(date, seasons_back, features)

    games_on_day = df[df['GAME_DATE'] == date].copy()
    # games being predicted are zero days away from the prediction date
    games_on_day['DAYS_SINCE_GAME'] = np.zeros(len(games_on_day))
    day_features = games_on_day[features]

    model.fit(X,y)
    pred = model.predict(day_features)
    hits = np.sum(pred == games_on_day['WIN_ONE'])
    return hits, len(pred)

In [None]:
def feature_selection_with_importance(model, current_features, min_subset_size, top_n) :
    """
    Iterates through the features ordered by importance and iteratively removes the least important one

    Input: Model to fit, list of features ordered from most to least important (not modified),
           smallest subset size to evaluate, and number of subsets to return
    Output: List of up to top_n (feature subset, accuracy) tuples sorted by accuracy, best first.
            Accuracy is measured over every date in test.
    """
    results = []
    # work on a copy so the caller's list keeps all of its features
    remaining = list(current_features)
    # never evaluate an empty subset, even if min_subset_size is zero or negative
    smallest = max(min_subset_size, 1)

    while len(remaining) >= smallest:
        total_correct = total_games = 0
        print(f"Evaluating with {len(remaining)} features...")
        for t in test:
            correct, games = pred_by_date_with_features(model, t, features = remaining)

            total_correct += correct
            total_games += games
        # no games at all means nothing was predicted correctly
        accuracy = total_correct/total_games if total_games else 0.0
        print(remaining, ':', accuracy)
        results.append((remaining.copy(), accuracy))
        # drop the least important feature that is left
        remaining.pop()
    results.sort(key=lambda x: x[1], reverse=True)
    return results[:top_n]

In [None]:
model = XGBClassifier(objective='binary:logistic')
# importance_scores holds (feature, score) pairs; keep only the names, most important first
sorted_features = [name for name, _score in importance_scores]
print(sorted_features)
top_subsets = feature_selection_with_importance(
    model,
    sorted_features,
    min_subset_size=20,
    top_n=10,
)

# report the best subsets, numbered from 1
for i, entry in enumerate(top_subsets, start=1):
    subset, acc = entry
    print(f"#{i}: Features = {subset}, Accuracy = {acc:.4f}")

In [None]:
# re-score the feature subset that ranked first in top_subsets
best_feature_subset = top_subsets[0][0]
print('Best feature subset: ', best_feature_subset)

total_correct, total_games = 0, 0
for t in test:
    correct, games = pred_by_date_with_features(model, t, best_feature_subset)
    total_correct, total_games = total_correct + correct, total_games + games

print('Accuracy:', total_correct / total_games)

In [None]:
# from itertools import combinations
# def feature_selection(model, feature_names, min_subset_size, max_subset_size, top_n) :
#     """
#     Iterates through the feature subsets and returns the top n subsets that gives the best scores
#     """
#     print('start')
#     results = []
#     for n in range(min_subset_size, max_subset_size + 1):
#         print(n)
#         for subset in combinations(feature_names, n):
#             print(subset)
#             total_correct = total_games = 0
#             for t in test:
#                 print('test')
#                 correct, games = pred_by_date_with_features(model, t, features = list(subset))
        
#                 total_correct += correct
#                 total_games += games
#             print(subset, ':', total_correct/total_games)
#             results.append((subset, correct/games))
#     results.sort(key=lambda x: x[1], reverse=True)
#     return results[:top_n]

In [None]:
# goes through every combinations of size 40; takes too long (5+ hours)
# model = XGBClassifier(objective='binary:logistic')
# all_features = [col for col in df.columns if col not in ['WIN_ONE', 'GAME_DATE', 'SEASON_YEAR']]
# top_subsets = feature_selection(model, all_features, min_subset_size=40, max_subset_size=40, top_n=10)

# for i, (subset, acc, total) in enumerate(top_subsets, 1):
#     print(f"#{i}: Features = {subset}, Accuracy = {acc:.4f}")

### Hyperparameter Tuning for XGBoost

In [None]:
def pred_by_date_multiple_models (models_dict, date) :
    """
    Predict the outcome of all games on the given date for all models given. Used specifically to make
    cross validation more efficient: the training set is built once per date and shared by every model.

    Input: Dict mapping score index (0 to len(models_dict) - 1) to model, and date of the games to predict
    Output: Array holding, at each model's key, the number of games that model predicted correctly,
            and the number of games on that date
    """
    n = 5 # how many years in the past for training

    # get data in relevant time frame
    X, y = get_training_set(date, n)

    games_on_day = df[df['GAME_DATE'] == date].copy()
    # games being predicted are zero days away from the prediction date
    games_on_day['DAYS_SINCE_GAME'] = np.zeros(len(games_on_day))

    test = games_on_day.drop(columns = ['WIN_ONE', 'GAME_DATE'])

    scores = np.zeros(len(models_dict))
    for k, v in models_dict.items() :
        v.fit(X,y)
        # predict on this date's feature rows
        pred = v.predict(test)
        scores[k] = np.sum(pred == games_on_day['WIN_ONE'])
    return scores, len(games_on_day)

In [None]:
import itertools

# XGBoost parameters to search exhaustively (4 * 4 * 4 * 3 * 3 * 4 = 2304 combinations)
param_grid = {
    "n_estimators": [50, 100, 200, 400],
    "eta": [0.01, 0.05, 0.1, 0.2], # learning_rate
    "max_depth": [4, 6, 8, 10], # maximum depth of a tree
    "subsample": [0.5, 0.7, 1], # fraction of observations to be randomly sampled for each tree
    "colsample_bytree": [0.5, 0.7, 1], # fraction of columns to be randomly sampled for each tree
    "alpha": [0.5, 1, 2, 5] # L1 (lasso) regularization term
}

param_dict = {} # store one model per parameter combination, with key corresponding to index of score in np.array
index = 0

# Iterate over all combinations of hyperparameters
for values in itertools.product(*param_grid.values()):
    param_dict[index] = XGBClassifier(objective='binary:logistic', random_state = 33, **dict(zip(param_grid.keys(), values)))
    index += 1

# running count of correct predictions per model, and of games seen so far
scores = np.zeros(len(param_dict))
total_games = 0

# NOTE(review): first_season / last_season are not used anywhere in this cell - confirm whether they are still needed
first_season = df['SEASON_YEAR'].min()
last_season = df['SEASON_YEAR'].max()-4

# every model is refit for every date in test, so this loop is expensive
for t in test:
    s, g = pred_by_date_multiple_models(param_dict, t)

    scores += s
    total_games += g
    # running accuracy of every model after this date
    print(scores / total_games)

print('final scores: ', scores / total_games)

In [None]:
# accuracy of every model in the grid, indexed like param_dict
all_scores = scores / total_games
# model with the highest accuracy (argmax returns the first one on ties)
best_model = param_dict[all_scores.argmax()]
best_model.get_params() #'n_estimators': 400, eta: 0.01, max_depth: 4, subsample: 0.7, colsample_bytree: 0.7, alpha: 2

In [None]:
# indices of the five highest-scoring models (argpartition leaves them unordered) ...
top_five_models = np.argpartition(all_scores, -5)[-5:]
# ... then order those five from best to worst score
top_five_models = top_five_models[np.argsort(-all_scores[top_five_models])]
top_five_scores = all_scores[top_five_models]
print(top_five_scores)
# print the searched hyperparameters of each of the five models, best first
for i in top_five_models : 
    p = param_dict[i].get_params()
    print(f"n_estimators = {p['n_estimators']}, eta = {p['eta']}, max_depth = {p['max_depth']}, subsample = {p['subsample']}, colsample_bytree = {p['colsample_bytree']}, alpha = {p['alpha']}")

In [None]:
# Score the final model on df_rolling over every date in val_set.
# NOTE(review): these hyperparameters differ from the ones recorded next to best_model.get_params()
# (n_estimators 400, eta 0.01, subsample 0.7, colsample_bytree 0.7, alpha 2) - confirm which set is intended.
final_model = XGBClassifier(n_estimators = 50, eta = 0.05, max_depth = 4, subsample = 0.5, colsample_bytree = 0.5, alpha = 1)
correct, games = test_model(df_rolling, final_model)
print("Score:", correct / games)