<a href="https://colab.research.google.com/github/krishna-kenny/nbaWinNeuralNetModel/blob/main/nba.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install nba_api

Collecting nba_api
  Downloading nba_api-1.7.0-py3-none-any.whl.metadata (5.5 kB)
Downloading nba_api-1.7.0-py3-none-any.whl (280 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m280.2/280.2 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nba_api
Successfully installed nba_api-1.7.0


In [3]:
import time
import pandas as pd
from nba_api.stats.endpoints import TeamInfoCommon, TeamGameLogs, PlayerGameLogs, LeagueGameFinder, LeagueLeaders, PlayerCareerStats
from nba_api.stats.static import teams

# Maximum number of attempts made by fetch_with_retries before it gives up.
MAX_RETRIES = 3
# Season identifiers (written as "YYYY-YY") handed to the data-fetching functions below.
seasons = [
    "2015-16", "2016-17", "2017-18", "2018-19", "2019-20",
    "2020-21", "2021-22", "2022-23", "2023-24", "2024-25"
]


In [4]:
def fetch_with_retries(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying on failure with exponential backoff.

    Up to MAX_RETRIES attempts are made. After a failed attempt that is not the
    last one, the function waits ``2**attempt`` seconds (1, 2, 4, ...) before
    trying again. No wait happens after the final attempt, since there is
    nothing left to retry.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on the first successful attempt, or ``None``
        when every attempt raised an exception.
    """
    for attempt in range(MAX_RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt == MAX_RETRIES - 1:
                # Last attempt: report the error but do not announce or wait
                # for a retry that will never happen.
                print(f"Error: {e}.")
                break
            wait_time = 2**attempt
            print(f"Error: {e}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
    print(f"Failed after {MAX_RETRIES} attempts.")
    return None

In [5]:
def get_team_info(seasons):
    """Fetch team information for the given season(s) and save it to 'nba_team_data.csv'.

    For every requested season and every team returned by ``teams.get_teams()``,
    a ``TeamInfoCommon`` request is issued through ``fetch_with_retries``. The
    first data frame of each successful response is collected, and all of them
    are concatenated and written to ``nba_team_data.csv``.

    Args:
        seasons: Either a single season string (e.g. ``"2024-25"``) or an
            iterable of season strings. A bare string is treated as one
            season rather than being iterated character by character.

    Returns:
        None. The result is written to disk; a message is printed when nothing
        could be fetched.
    """
    print("Fetching team information...")
    season_list = [seasons] if isinstance(seasons, str) else list(seasons)
    nba_teams = teams.get_teams()
    team_data = []

    for season in season_list:
        for team in nba_teams:
            team_info = fetch_with_retries(
                TeamInfoCommon,
                team_id=team["id"],
                season_nullable=season,
                season_type_nullable="Regular Season",
                timeout=60,
            )
            # fetch_with_retries returns None when every attempt failed.
            if team_info is not None:
                team_data.append(team_info.get_data_frames()[0])
                # Short pause between successful requests to stay polite to the API.
                time.sleep(0.6)

    if team_data:
        df_teams = pd.concat(team_data, ignore_index=True)
        df_teams.to_csv("nba_team_data.csv", index=False)
    else:
        print("No team data fetched.")

# Only the last entry of `seasons` is passed here.
# NOTE: the confirmation below is printed unconditionally, even if nothing was saved.
get_team_info(seasons[-1])
print("Team information data stored.")

Fetching team information...
Team information data stored.


In [6]:
def get_team_game_logs(seasons):
    """Fetch regular-season team game logs and save them to 'nba_game_logs.csv'.

    One ``TeamGameLogs`` request per season is issued through
    ``fetch_with_retries``; the first data frame of each successful response is
    kept. The combined frame is then post-processed:

    * ``MATCHUP`` is split on ``" @ "`` or ``" vs. "`` into ``TEAM1`` (text
      before the separator) and ``TEAM2`` (text after it), and ``MATCHUP``
      itself is dropped.
    * ``SEASON_YEAR`` is set to the calendar year of ``GAME_DATE`` as a string,
      replacing any ``SEASON_YEAR`` column already present.

    Args:
        seasons: Iterable of season strings passed as ``season_nullable``.

    Returns:
        None. The processed frame is written to disk; a message is printed when
        nothing could be fetched.
    """
    print("Fetching team game logs...")
    game_log_data = []

    for season in seasons:
        game_logs = fetch_with_retries(
            TeamGameLogs,
            season_nullable=season,
            season_type_nullable="Regular Season",
            timeout=60,
        )
        # fetch_with_retries returns None when every attempt failed.
        if game_logs is not None:
            game_log_data.append(game_logs.get_data_frames()[0])
            # Same pause between successful requests as get_team_info uses.
            time.sleep(0.6)

    if not game_log_data:
        print("No game log data fetched.")
        return

    df_all_game_logs = pd.concat(game_log_data, ignore_index=True)
    # The dot is escaped so that only a literal " vs. " acts as a separator.
    matchups_split = df_all_game_logs["MATCHUP"].str.split(r" @ | vs\. ", expand=True, regex=True)
    df_all_game_logs["TEAM1"] = matchups_split[0]
    df_all_game_logs["TEAM2"] = matchups_split[1]
    df_all_game_logs = df_all_game_logs.drop(columns=["MATCHUP"])

    df_all_game_logs["SEASON_YEAR"] = pd.to_datetime(df_all_game_logs["GAME_DATE"]).dt.year.astype(str)

    df_all_game_logs.to_csv("nba_game_logs.csv", index=False)
    print("Processed game logs saved to 'nba_game_logs.csv'.")

# Fetch game logs for every entry in `seasons`.
# NOTE: the confirmation below is printed unconditionally, even if nothing was saved.
get_team_game_logs(seasons)
print("Team game logs data stored.")

Fetching team game logs...
Processed game logs saved to 'nba_game_logs.csv'.
Team game logs data stored.


In [7]:
import pandas as pd

def process_game_logs(input_file="nba_game_logs.csv", output_file="nba_aggregated_data.csv"):
    """Aggregate per-game team logs into one row per (SEASON_YEAR, team).

    Rows are grouped by ``SEASON_YEAR``, ``TEAM_ID`` and ``TEAM_ABBREVIATION``.
    For each group:

    * ``GP`` is the count of ``GAME_ID`` values and ``W`` the number of rows
      whose ``WL`` equals ``"W"``.
    * Counting stats (PTS, FGM, FGA, ...) are summed.
    * ``FG_PCT``, ``FG3_PCT`` and ``FT_PCT`` are the mean of the per-game
      percentages (not made/attempted over the whole group).
    * ``L = GP - W`` and ``W_PCT = W / GP`` are derived afterwards.

    Args:
        input_file: CSV of game logs to read. Defaults to the file written by
            the game-log fetching step.
        output_file: CSV path the aggregated table is written to.

    Returns:
        The aggregated DataFrame that was written to ``output_file``.
    """
    df = pd.read_csv(input_file)

    aggregation_rules = {
        "GAME_ID": "count",
        "WL": lambda x: (x == "W").sum(),
        "PTS": "sum",
        "FGM": "sum", "FGA": "sum", "FG_PCT": "mean",
        "FG3M": "sum", "FG3A": "sum", "FG3_PCT": "mean",
        "FTM": "sum", "FTA": "sum", "FT_PCT": "mean",
        "OREB": "sum", "DREB": "sum", "REB": "sum",
        "AST": "sum", "TOV": "sum", "STL": "sum", "BLK": "sum", "BLKA": "sum",
        "PF": "sum", "PFD": "sum", "PLUS_MINUS": "sum"
    }

    # groupby drops rows whose key is NaN by default, so no explicit
    # season filter is needed before aggregating.
    aggregated_df = (
        df.groupby(["SEASON_YEAR", "TEAM_ID", "TEAM_ABBREVIATION"])
        .agg(aggregation_rules)
        .rename(columns={"GAME_ID": "GP", "WL": "W"})
    )

    aggregated_df["L"] = aggregated_df["GP"] - aggregated_df["W"]
    aggregated_df["W_PCT"] = aggregated_df["W"] / aggregated_df["GP"]

    aggregated_df = aggregated_df.reset_index()
    aggregated_df.to_csv(output_file, index=False)

    print(f"Aggregated data saved to {output_file}")
    return aggregated_df

# Build the aggregated per-team table from the saved game logs.
process_game_logs()
print("Aggregated data stored.")


Aggregated data saved to nba_aggregated_data.csv
Aggregated data stored.


In [27]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Dropout, Input
from keras.optimizers import Adam
import joblib
import tensorflow.keras.backend as K
from keras.saving import register_keras_serializable

@register_keras_serializable()
def custom_accuracy(y_true, y_pred):
    """Mean of element-wise equality between rounded predictions and float32-cast labels."""
    labels = K.cast(y_true, dtype="float32")
    rounded_predictions = K.round(y_pred)
    is_correct = K.equal(rounded_predictions, labels)
    return K.mean(is_correct)

def prepare_training_data(game_logs_file, aggregated_file):
    """Build the feature matrix and labels for training from the two CSV files.

    Each game-log row is joined (left merge) with the aggregated stats of
    ``TEAM1`` and of ``TEAM2`` for the same ``SEASON_YEAR``. The features are
    every aggregated column except ``TEAM_ID``, ``TEAM_ABBREVIATION`` and
    ``SEASON_YEAR``, once with a ``_TEAM1`` suffix and once with ``_TEAM2``.
    The label is 1 where ``WL`` is ``"W"`` and 0 otherwise.

    Only rows missing a model feature or the ``WL`` label are discarded; a NaN
    in some unrelated game-log column does not remove the game.

    Side effect: the ordered feature-name list is written to
    ``feature_names.pkl`` with ``joblib.dump``.

    Args:
        game_logs_file: CSV with at least GAME_DATE, SEASON_YEAR, TEAM1, TEAM2, WL.
        aggregated_file: CSV with SEASON_YEAR, TEAM_ID, TEAM_ABBREVIATION and
            the numeric stat columns.

    Returns:
        Tuple ``(X, y, feature_cols)``: feature array, integer label array and
        the list of feature column names in the order used for ``X``.
    """
    df_logs = pd.read_csv(game_logs_file, parse_dates=["GAME_DATE"])
    df_agg = pd.read_csv(aggregated_file)

    # Normalise the join keys so whitespace and case differences cannot break the merge.
    df_logs["TEAM1"] = df_logs["TEAM1"].str.strip().str.upper()
    df_logs["TEAM2"] = df_logs["TEAM2"].str.strip().str.upper()
    df_agg["TEAM_ABBREVIATION"] = df_agg["TEAM_ABBREVIATION"].str.strip().str.upper()
    df_logs["SEASON_YEAR"] = df_logs["SEASON_YEAR"].astype(str).str.strip()
    df_agg["SEASON_YEAR"] = df_agg["SEASON_YEAR"].astype(str).str.strip()

    # Base features: aggregated columns other than the identifiers.
    base_cols = [col for col in df_agg.columns if col not in ["TEAM_ID", "TEAM_ABBREVIATION", "SEASON_YEAR"]]
    feature_cols = [f"{col}_TEAM1" for col in base_cols] + [f"{col}_TEAM2" for col in base_cols]

    # Merge aggregated stats for TEAM1 and TEAM2
    df_train = df_logs.merge(
        df_agg.add_suffix("_TEAM1"),
        left_on=["SEASON_YEAR", "TEAM1"],
        right_on=["SEASON_YEAR_TEAM1", "TEAM_ABBREVIATION_TEAM1"],
        how="left"
    ).merge(
        df_agg.add_suffix("_TEAM2"),
        left_on=["SEASON_YEAR", "TEAM2"],
        right_on=["SEASON_YEAR_TEAM2", "TEAM_ABBREVIATION_TEAM2"],
        how="left"
    )

    # Drop only rows that lack a feature value or a label.
    df_train = df_train.dropna(subset=feature_cols + ["WL"])

    outcome = df_train["WL"].astype(str).str.strip().str.upper()
    y = (outcome == "W").astype(int).to_numpy()
    X = df_train[feature_cols].to_numpy()
    joblib.dump(feature_cols, "feature_names.pkl")
    return X, y, feature_cols

def build_model(input_shape):
    """Create and compile the feed-forward binary classifier.

    Args:
        input_shape: Number of input features (used as the single input dimension).

    Returns:
        A compiled ``Sequential`` model: Dense 128 -> Dropout 0.3 -> Dense 64 ->
        Dropout 0.3 -> Dense 32 -> Dense 1 (sigmoid), trained with Adam
        (learning rate 0.001) on binary cross-entropy, tracking ``custom_accuracy``.
    """
    layer_stack = [
        Input(shape=(input_shape,)),
        Dense(128, activation="relu"),
        Dropout(0.3),
        Dense(64, activation="relu"),
        Dropout(0.3),
        Dense(32, activation="relu"),
        Dense(1, activation="sigmoid"),
    ]
    network = Sequential(layer_stack)
    network.compile(
        optimizer=Adam(learning_rate=0.001),
        loss="binary_crossentropy",
        metrics=[custom_accuracy],
    )
    return network

def train_model(X, y):
    """Split the data, standardise it and train the classifier.

    The data is split 80/20 (``random_state=42``) *before* scaling, and the
    ``StandardScaler`` is fit on the training part only. The validation part is
    transformed with those training statistics, so validation metrics are not
    influenced by validation-set means/variances.

    Args:
        X: 2-D feature array.
        y: Label array aligned with the rows of ``X``.

    Returns:
        Tuple ``(model, scaler)``: the fitted model and the scaler fit on the
        training split.
    """
    X_train_raw, X_val_raw, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train_raw)
    X_val = scaler.transform(X_val_raw)
    model = build_model(X_train.shape[1])
    # NOTE(review): batch_size=1 means one optimizer step per sample, which is very slow; confirm it is intended.
    model.fit(X_train, y_train, epochs=10, batch_size=1, validation_data=(X_val, y_val))
    return model, scaler

def main():
    """Prepare the data, train the model and persist 'model.keras' and 'scaler.pkl'."""
    features, labels, _ = prepare_training_data(
        "nba_game_logs.csv",
        "nba_aggregated_data.csv",
    )
    trained_model, fitted_scaler = train_model(features, labels)
    trained_model.save("model.keras")
    joblib.dump(fitted_scaler, "scaler.pkl")

# Run the training pipeline only when this code executes as the main module (not on import).
if __name__ == "__main__":
    main()


Epoch 1/10
[1m18564/18564[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m56s[0m 3ms/step - custom_accuracy: 0.6320 - loss: 0.6503 - val_custom_accuracy: 0.6534 - val_loss: 0.6260
Epoch 2/10
[1m18564/18564[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 3ms/step - custom_accuracy: 0.6593 - loss: 0.6264 - val_custom_accuracy: 0.6521 - val_loss: 0.6281
Epoch 3/10
[1m18564/18564[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m54s[0m 3ms/step - custom_accuracy: 0.6625 - loss: 0.6217 - val_custom_accuracy: 0.6622 - val_loss: 0.6241
Epoch 4/10
[1m18564/18564[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 3ms/step - custom_accuracy: 0.6555 - loss: 0.6273 - val_custom_accuracy: 0.6570 - val_loss: 0.6556
Epoch 5/10
[1m18564/18564[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m54s[0m 3ms/step - custom_accuracy: 0.6547 - loss: 0.6263 - val_custom_accuracy: 0.6575 - val_loss: 0.6217
Epoch 6/10
[1m18564/18564[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m54s[0m 3ms/step - cu

In [28]:
import numpy as np
import pandas as pd
import joblib
from tensorflow.keras.models import load_model
import tensorflow.keras.backend as K
from nba_api.stats.static import teams
from keras.saving import register_keras_serializable

@register_keras_serializable()
def custom_accuracy(y_true, y_pred):
    """Mean of element-wise equality between rounded predictions and float32-cast labels."""
    labels = K.cast(y_true, dtype="float32")
    rounded_predictions = K.round(y_pred)
    is_correct = K.equal(rounded_predictions, labels)
    return K.mean(is_correct)

def load_feature_names(feature_names_file="feature_names.pkl"):
    """Return whatever ``joblib.load`` reads from ``feature_names_file``.

    NOTE: joblib files are pickle-based; only load files from a trusted source.
    """
    feature_names = joblib.load(feature_names_file)
    return feature_names

def fetch_team_stats(team_abbr, season_year, aggregated_file, base_features):
    """Return one team's aggregated stats for one season as a Series.

    The CSV is read, column names are stripped of whitespace, and the team
    abbreviation / season columns are normalised before matching.

    Args:
        team_abbr: Team abbreviation; compared upper-cased.
        season_year: Season key; compared as a string.
        aggregated_file: Path of the aggregated-stats CSV.
        base_features: Column names to return.

    Returns:
        The first matching row restricted to ``base_features``.

    Raises:
        ValueError: If no row matches the team and season.
    """
    stats = pd.read_csv(aggregated_file)
    stats.columns = stats.columns.str.strip()
    stats["TEAM_ABBREVIATION"] = stats["TEAM_ABBREVIATION"].str.strip().str.upper()
    stats["SEASON_YEAR"] = stats["SEASON_YEAR"].astype(str).str.strip()

    is_team = stats["TEAM_ABBREVIATION"] == team_abbr.upper()
    is_season = stats["SEASON_YEAR"] == str(season_year)
    matches = stats[is_team & is_season]

    if not matches.empty:
        return matches[base_features].iloc[0]
    raise ValueError(f"❌ Features for team '{team_abbr}' in {season_year} not found.")

def predict_matchup_win_probability(team1_abbr, team2_abbr, season_year, aggregated_file,
                                    model_path="model.keras", scaler_path="scaler.pkl", feature_names_file="feature_names.pkl"):
    """Predict, print and return the model's output for a TEAM1-vs-TEAM2 matchup.

    The saved model, scaler and feature-name list are loaded; each team's
    aggregated stats for ``season_year`` are fetched, suffixed with ``_TEAM1`` /
    ``_TEAM2``, arranged in the saved feature order, scaled and fed to the model.

    Args:
        team1_abbr: Abbreviation of the first team.
        team2_abbr: Abbreviation of the second team.
        season_year: Season key used to look up aggregated stats.
        aggregated_file: Path of the aggregated-stats CSV.
        model_path: Path of the saved Keras model.
        scaler_path: Path of the saved scaler (loaded with ``joblib.load``).
        feature_names_file: Path of the saved feature-name list.

    Returns:
        The model output for the matchup as a float (the value that is printed
        as a percentage).

    Raises:
        ValueError: If the assembled feature vector length differs from the
            scaler's, or (via ``fetch_team_stats``) a team/season is missing.
    """
    model = load_model(model_path, custom_objects={"custom_accuracy": custom_accuracy})
    scaler = joblib.load(scaler_path)
    # Load expected feature names (e.g. ['GP_TEAM1', 'W_TEAM1', ... 'GP_TEAM2', 'W_TEAM2', ...])
    expected_feature_names = load_feature_names(feature_names_file)
    # Strip the trailing "_TEAM1"/"_TEAM2" to recover the aggregated column names.
    base_features = sorted({name.rsplit("_", 1)[0] for name in expected_feature_names})
    # For team1 and team2, fetch base aggregated stats
    team1_stats = fetch_team_stats(team1_abbr, season_year, aggregated_file, base_features).add_suffix("_TEAM1")
    team2_stats = fetch_team_stats(team2_abbr, season_year, aggregated_file, base_features).add_suffix("_TEAM2")
    merged = pd.concat([team1_stats, team2_stats])
    # Reorder into the exact column order the scaler and model were fit with.
    merged = merged.reindex(expected_feature_names, fill_value=0)
    X_input = merged.values.reshape(1, -1)
    if X_input.shape[1] != scaler.mean_.shape[0]:
        raise ValueError(f"❌ Feature size mismatch! Model expects {scaler.mean_.shape[0]}, got {X_input.shape[1]}.")
    X_scaled = scaler.transform(X_input)
    prob = model.predict(X_scaled)[0][0]
    print(f"\n🏀 Win Probability for {team1_abbr} vs {team2_abbr} in {season_year}: {prob*100:.2f}%")
    return float(prob)

def display_team_data():
    """Print one 'ABBREVIATION - Full Name' line for every team from teams.get_teams()."""
    for entry in teams.get_teams():
        print("{} - {}".format(entry["abbreviation"], entry["full_name"]))

def main():
    """Interactively ask for two teams and a season, then print the prediction.

    The team list is shown first. Any ``ValueError`` - including a season that
    is not a whole number - is reported as a message instead of aborting with
    a traceback.
    """
    aggregated_file = "nba_aggregated_data.csv"
    display_team_data()
    team1 = input("Enter Team 1 abbreviation: ").strip().upper()
    team2 = input("Enter Team 2 abbreviation: ").strip().upper()
    try:
        # int() is inside the try so that non-numeric input is handled like
        # any other invalid request.
        season_year = int(input("Enter season year: "))
        predict_matchup_win_probability(team1, team2, season_year, aggregated_file)
    except ValueError as e:
        print(f"⚠️ Error: {e}")

# Start the interactive prediction prompt only when this code executes as the main module (not on import).
if __name__ == "__main__":
    main()


ATL - Atlanta Hawks
BOS - Boston Celtics
CLE - Cleveland Cavaliers
NOP - New Orleans Pelicans
CHI - Chicago Bulls
DAL - Dallas Mavericks
DEN - Denver Nuggets
GSW - Golden State Warriors
HOU - Houston Rockets
LAC - Los Angeles Clippers
LAL - Los Angeles Lakers
MIA - Miami Heat
MIL - Milwaukee Bucks
MIN - Minnesota Timberwolves
BKN - Brooklyn Nets
NYK - New York Knicks
ORL - Orlando Magic
IND - Indiana Pacers
PHI - Philadelphia 76ers
PHX - Phoenix Suns
POR - Portland Trail Blazers
SAC - Sacramento Kings
SAS - San Antonio Spurs
OKC - Oklahoma City Thunder
TOR - Toronto Raptors
UTA - Utah Jazz
MEM - Memphis Grizzlies
WAS - Washington Wizards
DET - Detroit Pistons
CHA - Charlotte Hornets
Enter Team 1 abbreviation: CHI
Enter Team 2 abbreviation: PHI
Enter season year: 2025




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 99ms/step

🏀 Win Probability for CHI vs PHI in 2025: 49.85%
