<a href="https://colab.research.google.com/github/rafabandoni/nfl-predict/blob/main/notebooks/03_nfl_predict_class_modeling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install optuna

In [None]:
import pandas as pd
import seaborn as sns
import numpy as np

from abc import ABC, abstractmethod

from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.base import BaseEstimator, TransformerMixin
# from sklearn.preprocessing import StandardScaler
# from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.pipeline import Pipeline
from xgboost import XGBClassifier

import shap
import optuna
import pickle

# 0.3 Pre Processing

In [None]:
# Feature table used further down as the training data (X_train / y_train),
# read over HTTP from the project's GitHub repository.
featured_df = pd.read_parquet('https://github.com/rafabandoni/nfl-predict/raw/refs/heads/main/data/output/featured_df.parquet')
# Preview the first rows as the cell output.
featured_df.head()

In [None]:
# Table used further down as the evaluation data (X_test / y_test),
# read over HTTP from the project's GitHub repository.
test_df = pd.read_parquet('https://github.com/rafabandoni/nfl-predict/raw/refs/heads/main/data/output/test_data.parquet')
# Preview the first rows as the cell output.
test_df.head()

## Creating preprocess pipeline

### Creating functions for pipeline

In [None]:
# Let's start by creating an abstract class
class BaseTransformer(BaseEstimator, TransformerMixin, ABC):
  """Abstract base for transformers that have nothing to learn in ``fit``.

  Concrete subclasses only need to provide ``transform``; ``fit`` is a
  no-op that hands back the instance itself.
  """

  def __init__(self):
    super().__init__()

  def fit(self, X, y=None):
    """Return ``self`` without looking at ``X`` or ``y``."""
    # In our case there is nothing to fit here.
    return self

  @abstractmethod
  def transform(self, X):
    """Transform ``X``; every concrete subclass must override this."""

In [None]:
def creating_home_and_away_columns(columns: list) -> list:
  """Expand each base column name into its home and away variants.

  For every entry of ``columns`` the result contains ``<name>_home_``
  immediately followed by ``<name>_away_``, preserving the input order.
  """
  return [
      base_name + suffix
      for base_name in columns
      for suffix in ('_home_', '_away_')
  ]

In [None]:
# Cleaning % data
def clean_percent_data(df):
  """Divide every column whose name contains '%' by 100, in place.

  Columns without '%' in their name are left untouched. Nothing is
  returned; ``df`` itself is modified.
  """
  percent_columns = [name for name in df.columns if '%' in name]
  for name in percent_columns:
    df[name] = df[name].div(100)

In [None]:
# Remove special characters from columns
def remove_special_char_columns(df):
  """Normalise the column names of ``df`` in place.

  Each name is lower-cased, spaces become underscores and '%' becomes
  '_perc'. Nothing is returned; ``df`` itself is modified.
  """
  renamed = {
      original: original.lower().replace(' ', '_').replace('%', '_perc')
      for original in df.columns
  }
  df.rename(columns=renamed, inplace=True)

In [None]:
# Columns whose string values may carry a 'T' marker. negative_turnover_number()
# below strips the marker and stores such values as negative integers.
# NOTE(review): the marker is described as a turnover, but every column listed
# here is a *_lng column; confirm what 'T' denotes in the source data.
turnover_columns = [
    'defense_passing_lng',
    'defense_receiving_lng',
    'defense_rushing_lng',
    'defense_interceptions_lng',
    'offense_passing_lng',
    'offense_receiving_lng',
    'offense_rushing_lng',
    'special_teams_kickoff_returns_lng',
    'special_teams_punt_returns_lng',
    'special_teams_punting_lng'
]

def negative_turnover_number(turnover_columns, df):
  """Convert the given string columns of ``df`` to integers, in place.

  A value containing 'T' has the 'T' removed and is stored as the negated
  integer; any other value is stored as its plain integer. Values are
  expected to be strings, since the 'T' check uses ``in``.
  """
  def _to_signed_int(raw):
    if 'T' in raw:
      return -int(raw.replace('T', ''))
    return int(raw)

  for name in turnover_columns:
    df[name] = [_to_signed_int(raw) for raw in df[name]]

In [None]:
# Field-goal columns that hold attempts and made counts together (A_M =
# attempts_made), one column per distance range.
# Presumably passed as `percent_columns` to
# transform_home_away_columns_percent() below; confirm against the caller.
columns_list = [
    'special_teams_field_goals_1-19_>_a-m',
    'special_teams_field_goals_20-29_>_a-m',
    'special_teams_field_goals_30-39_>_a-m',
    'special_teams_field_goals_40-49_>_a-m',
    'special_teams_field_goals_50-59_>_a-m',
    'special_teams_field_goals_60+_>_a-m'
]

# We will convert each pair into a single ratio (made / attempts) so we don't need a separate column for each count
def transform_home_away_columns_percent(percent_columns, df):
  """Replace 'attempts_made' string columns with a made/attempts ratio.

  Each value is split on '_'; the first piece is read as attempts and the
  second as made. The ratio overwrites the column (missing results, such
  as 0 made out of 0 attempts, become 0) and 'a-m' in the column name is
  replaced by 'percent_a_m'. ``df`` is modified in place.
  """
  for name in percent_columns:
    pieces = df[name].str.split('_')
    made = pieces.str[1].astype('int')
    attempts = pieces.str[0].astype('int')

    df[name] = (made / attempts).fillna(0)
    df.rename(columns={name: name.replace('a-m', 'percent_a_m')}, inplace=True)

In [None]:
# Base names of unimportant columns to remove (leftovers from the data source).
# drop_columns() below expands each name into its home/away variants before dropping.
columns_to_drop = ['data_fgm',
                   'data_fg__perc',
                   'data_xpm',
                   'data_xp_pct',
                   'data_kret_td',
                   'data_pret_t']

def drop_columns(df, columns_to_drop):
  """Drop the home and away variants of the given base columns, in place.

  Every name in ``columns_to_drop`` is expanded with
  ``creating_home_and_away_columns`` and the resulting columns are removed
  from ``df``. Nothing is returned.
  """
  suffixed_names = creating_home_and_away_columns(columns_to_drop)
  df.drop(columns=suffixed_names, inplace=True)

In [None]:
def fix_datatype(df, column, data_type):
  """Cast ``df[column]`` to ``data_type`` and write the result back in place."""
  converted = df[column].astype(data_type)
  df[column] = converted

In [None]:
def home_winner(score_home, score_away):
  """Return True when the home score is strictly greater than the away score.

  Ties count as False; we do not treat them separately since they are
  rare in the NFL.
  """
  return bool(score_home > score_away)

In [None]:
def drop_unused_columns(df):
  """Remove the four weather columns from ``df`` in place; returns nothing."""
  weather_columns = [
      'weather_temperature',
      'weather_wind_mph',
      'weather_humidity',
      'weather_detail',
  ]
  df.drop(columns=weather_columns, inplace=True)

### Creating pipeline

In [None]:
# (name, step) pairs meant to feed the preprocessing Pipeline.
# NOTE(review): each helper is *called* here with no arguments, but all of them
# are defined with required positional parameters (e.g. clean_percent_data(df),
# negative_turnover_number(turnover_columns, df)), so evaluating this list
# raises TypeError before any Pipeline is built.
# NOTE(review): the helpers are plain functions that mutate a DataFrame and
# return None (home_winner returns a bool), not objects with fit/transform;
# presumably they were meant to be wrapped in BaseTransformer subclasses.
# Confirm the intended arguments and wrap them before using this list.
pipeline_steps = [
    ('clean_percent_data', clean_percent_data()),
    ('remove_special_char_columns', remove_special_char_columns()),
    ('negative_turnover_number', negative_turnover_number()),
    ('transform_home_away_columns_percent', transform_home_away_columns_percent()),
    ('drop_columns', drop_columns()),
    ('fix_datatype', fix_datatype()),
    ('home_winner', home_winner()),
    ('drop_unused_columns', drop_unused_columns()),
]

In [None]:
# Chain the named preprocessing steps into a single scikit-learn Pipeline.
# NOTE(review): depends on pipeline_steps being a list of (name, transformer)
# pairs; confirm that list can actually be built before relying on `pipe`.
pipe = Pipeline(
    pipeline_steps
)

In [None]:
# NOTE(review): neither `df` nor a step named `classifier` appears among the
# names in pipeline_steps, so these look like leftovers from an example;
# set_params is expected to reject parameter names it does not know.
# Confirm and remove or replace.
pipe.set_params(df=False,
                classifier__C=0.1)

### Applying pipeline

## Defining variables

**IMPORTANT**: Define X and y and perform the train/test split before any preprocessing, to avoid data leakage

In [None]:
# Label column; every other column is treated as a feature.
target = 'home_winner'

# Feature matrices: everything except the label column.
X_train = featured_df.drop(columns=[target])
X_test = test_df.drop(columns=[target])

# Labels are kept as single-column DataFrames rather than Series.
y_train = featured_df.loc[:, [target]]
y_test = test_df.loc[:, [target]]

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 0.3.1 ML Model

## Building model

In [None]:
#Define the objective function for Optuna
def objective(trial):
    """Score one Optuna trial by stratified 5-fold cross-validated accuracy.

    Samples an XGBoost hyper-parameter set from ``trial``, evaluates it with
    cross-validation on the training data only, records the mean accuracy in
    the trial's ``acc`` user attribute and returns it.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.

    Returns:
        Mean accuracy over the five folds (the value the study optimises).
    """
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 200, 800),
        'max_depth': trial.suggest_int('max_depth', 2, 25),
        'min_child_weight': trial.suggest_int('min_child_weight', 2, 20),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
        'subsample': trial.suggest_float('subsample', 0.2, 0.8),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.1, 1.0),
        'gamma': trial.suggest_float('gamma', 1e-8, 2, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-3, 10.0),
        'alpha': trial.suggest_float('alpha', 1, 15.0, log=True),
        'scale_pos_weight': trial.suggest_float('scale_pos_weight', 0.1, 5.0)
    }

    model = XGBClassifier(
        **params,
        # 'logloss' is the binary metric that matches binary:logistic;
        # 'mlogloss' is its multiclass counterpart.
        eval_metric='logloss',
        tree_method='hist',
        device='cuda',  # NOTE(review): needs a CUDA-capable runtime
        objective='binary:logistic',
    )

    # cross_val_score fits its own clone of the model on every fold, so no
    # separate fit/predict is needed and the test set stays untouched during
    # tuning. The fixed seed gives every trial the same folds, which keeps
    # the scores of different trials comparable and the search reproducible.
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    acc = cross_val_score(model, X_train, y_train, cv=skf, scoring='accuracy').mean()
    trial.set_user_attr("acc", acc)
    return acc

# Callback to print the MCC score for each trial
# def print_mcc_callback(study, trial):
#     mcc = trial.user_attrs["mcc"]
#     print(f"Trial {trial.number}: MCC = {mcc}")

# def print_accuracy_callback(study, trial):
#     acc = trial.user_attrs["acc"]
#     print(f"Trial {trial.number}: Accuracy score = {acc}")

# Optimize hyperparameters with Optuna
# Create a study that looks for the highest objective value (accuracy).
study = optuna.create_study(direction='maximize')
# Evaluate `objective` for 30 trials.
study.optimize(objective, n_trials=30)

# Hyper-parameters of the best-scoring trial, reused to build the final model.
best_params = study.best_params
print(f"Best parameters: {best_params}")

In [None]:
# Final classifier: the best hyper-parameters found by the study, fitted on
# the whole training set.
xgb = XGBClassifier(
    **best_params,
    # 'logloss' is the binary metric that matches binary:logistic;
    # 'mlogloss' is its multiclass counterpart.
    eval_metric='logloss',
    tree_method='hist',
    device='cuda',  # NOTE(review): needs a CUDA-capable runtime
    objective='binary:logistic',
)
xgb.fit(X_train, y_train)

## Predicting

In [None]:
# Predicted labels for the test features, from the fitted classifier.
predictions = xgb.predict(X_test)

## Evaluating model

In [None]:
# Confusion matrix of true labels against predicted labels on the test data.
cf_matrix = confusion_matrix(y_test, predictions)

# Captions in the same row-major order as the flattened 2x2 matrix.
group_names = ['True Neg','False Pos','False Neg','True Pos']
group_counts = [f'{count:0.0f}' for count in cf_matrix.ravel()]
group_percentages = [f'{share:.2%}' for share in cf_matrix.ravel() / cf_matrix.sum()]

# One three-line annotation per cell: caption, count, share of all samples.
labels = np.array(
    ['\n'.join(parts) for parts in zip(group_names, group_counts, group_percentages)]
).reshape(2, 2)
sns.heatmap(cf_matrix, annot=labels, fmt='', cmap='Reds')

In [None]:
# Print the text report of classification metrics for the test predictions.
print(classification_report(y_test, predictions))

In [None]:
# Build the explainer from the fitted model, using the training features
# (cast to float64) as the reference data. X_train is already a DataFrame,
# so it is passed directly.
explainer = shap.Explainer(xgb, X_train.astype('float64'))

# Explain the test rows, with columns laid out exactly as in training.
# (`X` was never defined in this notebook; the training columns are the
# intended reference.)
shap_values = explainer(pd.DataFrame(X_test, columns=X_train.columns))

# Waterfall plot for the first explained test row.
shap.plots.waterfall(shap_values[0])

In [None]:
# Beeswarm plot over all explained rows in shap_values.
shap.plots.beeswarm(shap_values)

## Saving model

In [None]:
# Save the fitted classifier with pickle to 'model_nfl.pkl' in the current
# working directory.
# NOTE(review): a pickle should only be loaded from a trusted source, and it
# presumably needs a compatible xgboost version at load time; confirm whether
# the library's own model format would be a better fit.
with open('model_nfl.pkl','wb') as f:
    pickle.dump(xgb,f)