# Predicting basketball injuries using ML
### Alternative finance project

Supervised by Prof. Marcus Frunza - coded by **Soughati Kenza, Henry-Biabaud Briac, Collin Thibault**

The aim of this project was to work around machine learning models to predict NBA basketball player injuries for the next game-ahead, using a collection of box score individual statistics from previous games.

Having already extracted all the data required in a previous notebook {*extraction*}, we now work on implementing machine learning techniques to analyze the sets and work on the prediction.

We import several well-known libraries, and we initialize the booleans that will determine the final pipeline ran.

In [None]:
import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
import time

In [None]:
usingPCA = False
usingXGBoost = False
usingDeep = True
useSmote = False
useAdasyn = True
usingRecursiveElim = True
usingFocalLoss = False
usingIsolationForest = True
usingLightGBM = False

## Features engineering

Because our set are so imbalanced by nature (barely 5% of all individual games result in an injury) we have to rebalance conveniently our data. We perform *resampling* to preserve patterns and only remove what can be considered as close duplicates.

In [None]:
stats_df = pd.read_csv('stats_full.txt', sep=' ')

y_df = stats_df[['Inj After']]
x_df = stats_df.drop(columns=['Inj After', 'Season'])

Splitting our set into training and set sets.

In [None]:
from sklearn.model_selection import train_test_split

x_all, x_test, y_all, y_test = train_test_split(x_df, y_df, test_size=0.2, random_state=1)
x_all = x_all.drop(columns=['Player', 'Date'])

Using isolation forest to detect and remove outliers from out set.

https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html

In [None]:
from sklearn.ensemble import IsolationForest

if usingIsolationForest:
    iso_forest = IsolationForest(n_estimators=100, max_samples='auto', contamination=0.03, n_jobs=-1, random_state=203)
    outliers_pred = iso_forest.fit_predict(x_all)

    outliers_cleaned_x_all = x_all[outliers_pred == 1]
    outliers_df_scores = y_all[outliers_pred == 1]

## Features selection

We store a baseline entire selection of features.

In [None]:
features = list(x_all.columns)

Performing PCA to retain features that explain a certain threshold of the total variance from the target variable.

https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html

In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import numpy as np

if usingPCA:
    scaler = StandardScaler()
    x_scaled = scaler.fit_transform(x_all[features])

    pca = PCA(n_components=0.999)
    x_pca = pca.fit_transform(x_scaled)
    n_components_chosen = pca.n_components_

    components_abs = np.abs(pca.components_)
    important_feature_indices = []
    chosen_indices = set()

    for _ in range(n_components_chosen):
        max_index = np.argmax(components_abs)
        row, col = divmod(max_index, components_abs.shape[1])
        while col in chosen_indices:
            components_abs[row, col] = 0
            max_index = np.argmax(components_abs)
            row, col = divmod(max_index, components_abs.shape[1])
        important_feature_indices.append(col)
        chosen_indices.add(col)

    features_new = [features[i] for i in important_feature_indices]

We also try running recursive elimination to select the features that explain variance the most.

https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.RFE.html

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.feature_selection import RFE
from sklearn.preprocessing import StandardScaler

if usingRecursiveElim:
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(x_all[features])
    
    classifier = LogisticRegression(max_iter=10000)
    rfe = RFE(estimator=classifier, n_features_to_select=int(0.95*len(features)))
    
    y_all_numpy = y_all.values.ravel() if isinstance(y_all, pd.DataFrame) else y_all.ravel()
    
    rfe.fit(X_scaled, y_all_numpy)
    
    feature_mask = rfe.support_
    features_new = [feature for feature, selected in zip(features, feature_mask) if selected]

## Model selection

First we split our set into train and validation sets for a better overall performance.

https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html

In [None]:
x_train, x_val, y_train, y_val = train_test_split(x_all, y_all, test_size=0.2, random_state=1)

x_train = x_train[features_new]
x_val = x_val[features_new]
x_test = x_test[features_new + ['Player']]

## Gradient boosting machines

LightGBM is a first model we are going to implement.

https://lightgbm.readthedocs.io/en/stable/

In [None]:
import lightgbm as lgb

if usingLightGBM:
    scale_pos_weight = len(y_train[y_train == 0]) / len(y_train[y_train == 1])
    pr_auc_scorer = make_scorer(average_precision_score, response_method='predict_proba')

    param_grid = {
        'lgbmclassifier__num_leaves': [120],
        'lgbmclassifier__reg_alpha': [1.0],
        'lgbmclassifier__reg_lambda': [1.0],
        'lgbmclassifier__learning_rate': [0.01],
        'lgbmclassifier__n_estimators': [80]
    }
   
    if useSmote:
        lgbm_pipeline = Pipeline([
            ('sampling', SMOTE(random_state=203)),
            ('lgbmclassifier', lgb.LGBMClassifier(use_label_encoder=False, eval_metric='logloss', scale_pos_weight=scale_pos_weight))
        ])
    
    if useAdasyn:
        lgbm_pipeline = Pipeline([
            ('sampling', ADASYN(random_state=203)),
            ('lgbmclassifier', lgb.LGBMClassifier(use_label_encoder=False, eval_metric='logloss', scale_pos_weight=scale_pos_weight))
        ])

    kf = KFold(n_splits=10, shuffle=True, random_state=203)
        
    grid_search = GridSearchCV(
        lgbm_pipeline,
        param_grid,
        cv=kf,
        scoring='roc_auc',
        refit='pr_auc',
        n_jobs=-1,
        verbose=True
    )
    
    grid_search.fit(x_train, y_train)

    optimal_params = grid_search.best_params_
    best_validation_score = grid_search.best_score_
    y_pred_proba_test = grid_search.predict_proba(x_test.drop(columns=['Player']))[:, 1]
    test_set_auc_roc = roc_auc_score(y_test, y_pred_proba_test)

    print("=== Optimal Parameters Found ===")
    for param, value in optimal_params.items():
        print(f"- {param}: {value}")

    print("\n=== Performance Evaluation ===")
    print(f"AUC-ROC Score (Validation Set): {best_validation_score:.4f}")
    print(f"AUC-ROC Score (Test Set): {test_set_auc_roc:.4f}")

We also try an alternative gradient boosting machine, xgBoost.

https://xgboost.readthedocs.io/en/stable/

In [None]:
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import ADASYN
from imblearn.pipeline import Pipeline as ImbPipeline
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.metrics import roc_auc_score, make_scorer, average_precision_score

if usingXGBoost:
    scale_pos_weight = len(y_train[y_train == 0]) / len(y_train[y_train == 1])
    pr_auc_scorer = make_scorer(average_precision_score, response_method='predict_proba')

    param_grid = {
        'xgboost__n_estimators': [135],
        'xgboost__max_depth': [15],
        'xgboost__learning_rate': [0.1],
        'xgboost__subsample': [1.0],
        'xgboost__colsample_bytree': [1.0],
        'xgboost__gamma': [0.1], 
        'xgboost__min_child_weight': [1],
        'xgboost__reg_alpha': [0.1],
        'xgboost__reg_lambda': [0.2],
    }
    
    if useSmote:
        xgb_pipeline = ImbPipeline([
            ('sampling', SMOTE(random_state=203)),
            ('xgboost', XGBClassifier(use_label_encoder=False, eval_metric='logloss', scale_pos_weight=scale_pos_weight))
        ])
    
    if useAdasyn:
        xgb_pipeline = ImbPipeline([
            ('sampling', ADASYN(random_state=203)),
            ('xgboost', XGBClassifier(use_label_encoder=False, eval_metric='logloss', scale_pos_weight=scale_pos_weight))
        ])

    kf = KFold(n_splits=10, shuffle=True, random_state=203)

    grid_search = GridSearchCV(
        xgb_pipeline,
        param_grid,
        cv=kf,
        scoring='roc_auc',
        refit='pr_auc',
        n_jobs=-1,
        verbose=True
    )

    grid_search.fit(x_train, y_train)

    optimal_params = grid_search.best_params_
    best_validation_score = grid_search.best_score_
    y_pred_proba_test = grid_search.predict_proba(x_test.drop(columns=['Player']))[:, 1]
    test_set_auc_roc = roc_auc_score(y_test, y_pred_proba_test)

    print("=== Optimal Parameters Found ===")
    for param, value in optimal_params.items():
        print(f"- {param}: {value}")

    print("\n=== Performance Evaluation ===")
    print(f"AUC-ROC Score (Validation Set): {best_validation_score:.4f}")
    print(f"AUC-ROC Score (Test Set): {test_set_auc_roc:.4f}")

## Deep learning experimentations

Now trying to implement a deep learning model to handle class imbalances.

https://www.tensorflow.org/api_docs/python/tf/keras/metrics

In [None]:
import tensorflow as tf
from tensorflow.keras import backend as K

def focal_loss(gamma=2.0, alpha=0.25):
    def focal_loss_fixed(y_true, y_pred):
        
        epsilon = K.epsilon()
        y_pred = K.clip(y_pred, epsilon, 1. - epsilon)
        cross_entropy_pos = -y_true * K.log(y_pred)
        cross_entropy_neg = -(1 - y_true) * K.log(1 - y_pred)
        loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy_pos + (1 - alpha) * K.pow(y_pred, gamma) * cross_entropy_neg
        return K.mean(loss, axis=-1)
    
    return focal_loss_fixed

In [None]:
from tensorflow.keras.layers import Input, Dense, Dropout, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall, AUC, TruePositives, FalsePositives, TrueNegatives, FalseNegatives
from tensorflow.keras.losses import BinaryCrossentropy
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
from sklearn.metrics import roc_auc_score

if usingDeep:
    class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train['Inj After'].values), y=y_train['Inj After'].values)
    class_weight_dict = {i: class_weights[i] for i in range(len(class_weights))}

    model = Sequential([
        Input(shape=x_train.shape[1:]),
        Dense(128, activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
        BatchNormalization(),
        Dropout(0.5),
        Dense(96, activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
        BatchNormalization(),
        Dropout(0.5),
        Dense(64, activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
        BatchNormalization(),
        Dropout(0.5),
        Dense(48, activation='relu', kernel_initializer='he_uniform', kernel_regularizer=l1_l2(l1=1e-5, l2=1e-4)),
        BatchNormalization(),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])

    lr_schedule = ExponentialDecay(initial_learning_rate=1e-3, decay_steps=5000, decay_rate=0.90)
    optimizer = Adam(learning_rate=lr_schedule)

    if usingFocalLoss:
        model.compile(optimizer=optimizer, loss=focal_loss(gamma=2.0, alpha=0.25), metrics=[AUC()])
    else:
        model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=[AUC()])

    early_stopping = EarlyStopping(monitor='val_loss', patience=4, restore_best_weights=True)

    history = model.fit(x_train, y_train, epochs=12, validation_data=(x_val, y_val), callbacks=[early_stopping], verbose=False, class_weight=class_weight_dict)

    test_results = model.evaluate(x_test.drop(columns=['Player']), y_test)
    val_pred = model.predict(x_val)
    val_auc_roc = roc_auc_score(y_val, val_pred)
    print("\n=== Performance Evaluation ===")
    print(f"AUC-ROC Score (Validation Set): {val_auc_roc:.4f}")
    print(f"AUC-ROC Score (Test Set): {test_results[1]:.4f}")

The scores indicate a modest but correct ability to discern those highly imbalanced classes.

## Prevent betting on injury-prone valuable players

The model has a modest ability to predict players with a tendency to get injured at the next game. We therefore introduce  a second test set, more recent and only composed of very recent players statistics. We will run our prediction model and output an injury watch list, with an indicator of how important would the player's absence be. Then, we will scrap the upcoming game and work on giving betting advices based on odds.

In [None]:
testing_df = pd.read_csv('testing_stats.txt', sep=' ')

testing_df = testing_df.drop(columns=['Inj After'])
x_testing_df = testing_df.drop(columns=['Date', 'Season'])

In [None]:
y_pred = model.predict(x_testing_df.drop(columns=['Player'])[features_new])

threshold = 0.35
y_pred_binary = (y_pred < threshold).astype(int)

testing_df['Prediction'] = y_pred_binary
predicted_df = testing_df[testing_df['Prediction'] == 1]

In [None]:
injury_watch = predicted_df[['Player', 'Valuation_Indic']]
injury_watch_sorted = injury_watch.sort_values(by='Valuation_Indic', ascending=False).head(10)

## Retrieve the full name and current team of the players from the injury watch

We therefore get a more visual indicator on where not to bet.

In [None]:
players = injury_watch_sorted['Player']

Below we scrap the needed data: retrieving the full names of each player for easier understanding, what game they are supposed to play next, and then an indication on how an injury would impact the game, based on short-term previous performances.

In [None]:
for player in players:
    base_url = 'https://www.basketball-reference.com/players/'
    url = base_url + str(player[0]) + '/' + str(player) + '.html'
    
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')

    player_name_tag = soup.select_one('h1 span')
    player_name = player_name_tag.text.strip()
    
    injury_watch_sorted.loc[injury_watch_sorted['Player'] == player, 'Name'] = player_name
    
    url = base_url + str(player[0]) + '/' + str(player) + '/gamelog/2023'
    response = requests.get(url)

    soup = BeautifulSoup(response.content, "html.parser")
    table = soup.find("table", {"id": "pgl_basic"})

    columns = [th.getText() for th in table.find("thead").findAll("th")][1:]

    data_rows = table.find("tbody").findAll("tr")
    data = [[td.getText() for td in data_rows[i].findAll("td")] for i in range(len(data_rows))]

    df = pd.DataFrame(data, columns=columns)

    df['Date'] = pd.to_datetime(df['Date'])
    target_date = pd.Timestamp('2023-03-27')
    after_target_date = df[df['Date'] > target_date].iloc[0]

    injury_watch_sorted.loc[injury_watch_sorted['Player'] == player, 'Current team'] = after_target_date['Tm']
    injury_watch_sorted.loc[injury_watch_sorted['Player'] == player, 'Next opponent'] = after_target_date['Opp']
    injury_watch_sorted.loc[injury_watch_sorted['Player'] == player, 'Next game date'] = after_target_date['Date']
    
    time.sleep(2)

Transforming the ouput to provide a more straightforward metric for the user.

In [None]:
conditions = [
    injury_watch_sorted['Valuation_Indic'] < 35,
    (injury_watch_sorted['Valuation_Indic'] >= 35) & (injury_watch_sorted['Valuation_Indic'] < 50),
    (injury_watch_sorted['Valuation_Indic'] >= 50) & (injury_watch_sorted['Valuation_Indic'] < 75),
    injury_watch_sorted['Valuation_Indic'] >= 75]

labels = ['Poor', 'Average', 'Significant', 'Critical']

injury_watch_sorted['Injury influence on game'] = np.select(conditions, labels, default='Unknown')
injury_watch_sorted = injury_watch_sorted.drop(columns=['Player', 'Valuation_Indic']).drop_duplicates()

The resulting dataframe shows the user a list of future players and games to avoid betting on, giving a relative risk of the player getting injured during the game. The theoretical influence his injury would have is also listed for context.

In [None]:
injury_watch_sorted