<a href="https://colab.research.google.com/github/jryuuu22/basketball-eclipse-project/blob/main/Untitled1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
"""
ft_pressure_final.py

Free Throw Pressure Model:
- Prepares features from NBA play_by_play.csv
- Trains a RandomForest with Optuna-tuned hyperparameters
- Saves model + preprocessing artifacts
- Provides FTPressurePredictor class for single + batch predictions
- Includes evaluation and interpretability (feature importances, OOB, shot-level explanations)
"""

import re
from typing import List, Dict, Union, Optional, Any

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    average_precision_score,
    brier_score_loss,
    roc_curve,
    precision_recall_curve,
    auc,
    confusion_matrix,
)

import pickle

In [21]:
# Model input columns, in the exact order used for both training and inference.
FEATURE_COLS: List[str] = (
    # per-player shooting history
    [
        "season_FT_pct",
        "overall_ft_pct",
        "clutch_ft_pct",
        "clutch_factor",
        "career_attempts_so_far",
    ]
    # game clock
    + ["period", "seconds_remaining"]
    # pressure / game-state indicators
    + [
        "is_clutch",
        "close_game",
        "late_game",
        "pressure_score",
        "point_differential",
    ]
)

# RandomForest hyperparameters taken from the Optuna search (rf_best_params).
# oob_score is switched on so the out-of-bag estimate is available afterwards.
BEST_RF_PARAMS = dict(
    n_estimators=188,
    max_depth=4,
    min_samples_split=6,
    min_samples_leaf=11,
    max_features="log2",
    random_state=42,
    n_jobs=-1,
    oob_score=True,
)


# -------------------

In [22]:
def _extract_player_name(description: Any) -> Optional[str]:
    """Extract player name from FT description like 'MISS Jordan Free Throw 2 of 2'."""
    if pd.isna(description):
        return None
    match = re.search(r"^(?:MISS\s+)?([A-Za-z\.\s]+?)\s+Free Throw", str(description))
    return match.group(1).strip() if match else None


def _time_to_seconds(time_str: Any) -> Optional[int]:
    """Convert 'MM:SS' to total seconds."""
    if pd.isna(time_str):
        return None
    try:
        mm, ss = str(time_str).split(":")
        return int(mm) * 60 + int(ss)
    except Exception:
        return None


def _parse_score_diff(score_str: Any) -> Optional[int]:
    """Parse '71 - 85' â†’ |71 - 85|."""
    if pd.isna(score_str):
        return None
    try:
        home, away = score_str.split(" - ")
        return abs(int(home) - int(away))
    except Exception:
        return None


def prepare_ft_dataset_from_df(input_df: pd.DataFrame) -> pd.DataFrame:
    """
    Truncated draft of the play_by_play feature-engineering function.

    NOTE(review): this body stops after the free-throw row filter and has no
    ``return`` statement, so calling it yields ``None`` despite the return
    annotation. A function with the same name is defined again further down
    in this file and replaces this one as soon as that definition runs; this
    copy looks like a leftover and is a candidate for removal.
    """
    # Work on a copy so the caller's frame is not modified.
    df = input_df.copy()

    # 1) Keep only free throw events
    # A row is kept when either description column contains the keyword
    # (case-insensitive); NaN descriptions become the text "nan" and never match.
    keyword = "Free Throw"
    df = df[
        df[["homedescription", "visitordescription"]]
        .apply(lambda x: x.astype(str).str.contains(keyword, case=False, na=False))
        .any(axis=1)
    ]

In [23]:
def prepare_ft_dataset_from_df(input_df: pd.DataFrame) -> pd.DataFrame:
    """
    Feature-engineer a modeling DataFrame from raw play_by_play data.

    Each output row is one free-throw attempt. Rows are ordered by
    (game_id, period, eventnum).

    Player-history features (``season_FT_pct``, ``overall_ft_pct``,
    ``clutch_ft_pct``, ``clutch_factor``, ``career_attempts_so_far``) are
    running statistics over the player's *earlier* attempts in that order.
    They never include the attempt being described or any later one, so the
    target cannot leak into its own features. A player's first attempt
    therefore has 0 prior attempts and NaN percentages.

    Parameters
    ----------
    input_df : DataFrame
        Raw play-by-play rows. Must contain the columns ``game_id``,
        ``period``, ``eventnum``, ``pctimestring``, ``score``,
        ``homedescription`` and ``visitordescription``. The frame itself is
        not modified.

    Returns
    -------
    model_df : DataFrame
        Contains FEATURE_COLS + ['FT_made', 'game_id'] with a fresh RangeIndex.

    Raises
    ------
    KeyError
        If a required input column is missing.
    """
    required = [
        "game_id",
        "period",
        "eventnum",
        "pctimestring",
        "score",
        "homedescription",
        "visitordescription",
    ]
    missing = [col for col in required if col not in input_df.columns]
    if missing:
        raise KeyError(f"play_by_play data is missing required column(s): {missing}")

    # Only the columns used below are copied; everything else in the raw
    # frame is irrelevant to the model.
    df = input_df[required].copy()

    # 1) Running score per game, computed over ALL events.
    # This has to happen before the free-throw filter: forward-filling after
    # the filter would carry the score from the previous *free throw* and
    # ignore every basket scored in between.
    df = df.sort_values(["game_id", "period", "eventnum"])
    df["score"] = df.groupby("game_id")["score"].ffill()

    # 2) Keep only free throw events, taking the description from whichever
    # side actually contains the free-throw text.
    keyword = "Free Throw"
    home_is_ft = (
        df["homedescription"].astype(str).str.contains(keyword, case=False, na=False)
    )
    away_is_ft = (
        df["visitordescription"].astype(str).str.contains(keyword, case=False, na=False)
    )
    df["FT_Event"] = df["homedescription"].where(home_is_ft, df["visitordescription"])
    df = df[home_is_ft | away_is_ft].drop(
        columns=["homedescription", "visitordescription"]
    )

    # Attempts with no known score yet cannot get a margin; drop them.
    df = df.dropna(subset=["score"]).reset_index(drop=True)

    # 3) Target: made vs missed FT.
    # Whole-word match so that a player name merely containing "miss" is not
    # read as a missed attempt.
    missed_mask = (
        df["FT_Event"].astype(str).str.contains(r"\bMISS\b", case=False, na=False)
    )
    df["FT_made"] = (~missed_mask).astype(int)

    # 4) Player name
    df["player_name"] = df["FT_Event"].apply(_extract_player_name)

    # 5) Time, margin and clutch flag.
    # to_numeric guarantees a float column (NaN for unparsable values) so the
    # comparisons below never see None.
    df["seconds_remaining"] = pd.to_numeric(
        df["pctimestring"].apply(_time_to_seconds), errors="coerce"
    )
    # NOTE(review): presumably the score on a made-free-throw row already
    # includes that point, which would make the margin differ by one between
    # makes and misses. TODO confirm against the data source.
    df["point_differential"] = pd.to_numeric(
        df["score"].apply(_parse_score_diff), errors="coerce"
    )
    df["is_clutch"] = (
        (df["period"] >= 4) & (df["seconds_remaining"] <= 300)
    ).astype(int)

    # 6) Player-level running aggregates over PRIOR attempts only.
    # Rows whose player could not be parsed share a placeholder key for the
    # group-by and are masked back to NaN afterwards.
    # NOTE(review): "prior" means earlier in (game_id, period, eventnum)
    # order; this assumes game_id order is an acceptable proxy for time.
    known_player = df["player_name"].notna()
    player_key = df["player_name"].fillna("__unknown_player__")

    made = df["FT_made"]
    clutch_attempt = df["is_clutch"]
    clutch_made = made * clutch_attempt

    # cumsum() includes the current row, so subtract it to get "before this attempt".
    prior_attempts = made.groupby(player_key).cumcount()
    prior_makes = made.groupby(player_key).cumsum() - made
    prior_clutch_attempts = clutch_attempt.groupby(player_key).cumsum() - clutch_attempt
    prior_clutch_makes = clutch_made.groupby(player_key).cumsum() - clutch_made

    # A zero denominator becomes NaN so the ratio is NaN rather than inf/0.
    overall_pct = (
        prior_makes / prior_attempts.where(prior_attempts > 0)
    ).where(known_player)
    clutch_pct = (
        prior_clutch_makes / prior_clutch_attempts.where(prior_clutch_attempts > 0)
    ).where(known_player)

    # season_FT_pct and overall_ft_pct are the same quantity; both are kept
    # because FEATURE_COLS lists both.
    df["season_FT_pct"] = overall_pct
    df["overall_ft_pct"] = overall_pct
    df["clutch_ft_pct"] = clutch_pct
    df["clutch_factor"] = clutch_pct - overall_pct
    df["career_attempts_so_far"] = prior_attempts.where(known_player)

    # 7) Context flags
    df["close_game"] = (df["point_differential"] <= 3).astype(int)
    df["late_game"] = (df["period"] >= 4).astype(int)
    df["pressure_score"] = (
        df["is_clutch"]
        + df["close_game"]
        + (df["seconds_remaining"] <= 120).astype(int)
    )

    # Final modeling frame
    model_df = df[FEATURE_COLS + ["FT_made", "game_id"]].copy()
    model_df = model_df.dropna(subset=["FT_made", "game_id"])

    return model_df

In [24]:
def train_and_save_model(
    play_by_play_path: str,
    nrows: Optional[int] = None,
    test_size: float = 0.2,
    model_path: str = "final_rf.pkl",
    imputer_path: str = "imputer.pkl",
    scaler_path: str = "scaler.pkl",
    x_test_path: str = "X_test_raw.csv",
    y_test_path: str = "y_test.csv",
) -> Dict[str, float]:
    """
    Train the free-throw model from a play_by_play CSV and persist the artifacts.

    Steps, in order:
    1. Read the CSV (optionally only the first ``nrows`` rows) and build the
       modeling frame with ``prepare_ft_dataset_from_df``.
    2. Split by ``game_id`` without shuffling, so every attempt from one game
       lands on the same side of the split.
    3. Fit a median imputer and a standard scaler on the training rows and
       apply them to both splits.
    4. Fit a RandomForestClassifier configured with BEST_RF_PARAMS.
    5. Score the held-out games at a 0.5 decision threshold.
    6. Write the raw (un-imputed, un-scaled) test features and labels to CSV
       and pickle the model, imputer and scaler.

    Returns
    -------
    metrics : dict
        {'accuracy', 'pr_auc', 'brier', 'oob_score'} - the first three are
        measured on the held-out games, the last is the forest's OOB score.
    """

    def _framed(values, template: pd.DataFrame) -> pd.DataFrame:
        # sklearn transformers hand back bare arrays; restore labels and index.
        return pd.DataFrame(values, columns=FEATURE_COLS, index=template.index)

    dataset = prepare_ft_dataset_from_df(pd.read_csv(play_by_play_path, nrows=nrows))

    features = dataset[FEATURE_COLS].copy()
    target = dataset["FT_made"].astype(int).copy()

    # Partition the games themselves, then map each attempt to its game's side.
    games_train, games_test = train_test_split(
        dataset["game_id"].unique(), test_size=test_size, shuffle=False
    )
    in_train = dataset["game_id"].isin(games_train)
    in_test = dataset["game_id"].isin(games_test)

    X_train_raw = features[in_train].copy()
    X_test_raw = features[in_test].copy()
    y_train = target[in_train].copy()
    y_test = target[in_test].copy()

    # Preprocessing statistics come from the training rows only.
    imputer = SimpleImputer(strategy="median")
    X_train_imp = _framed(imputer.fit_transform(X_train_raw), X_train_raw)
    X_test_imp = _framed(imputer.transform(X_test_raw), X_test_raw)

    scaler = StandardScaler()
    X_train_scaled = _framed(scaler.fit_transform(X_train_imp), X_train_imp)
    X_test_scaled = _framed(scaler.transform(X_test_imp), X_test_imp)

    model = RandomForestClassifier(**BEST_RF_PARAMS)
    model.fit(X_train_scaled, y_train)

    # Column 1 of predict_proba is used as P(make).
    test_proba = model.predict_proba(X_test_scaled)[:, 1]
    test_pred = (test_proba >= 0.5).astype(int)

    metrics = {
        "accuracy": float(accuracy_score(y_test, test_pred)),
        "pr_auc": float(average_precision_score(y_test, test_proba)),
        "brier": float(brier_score_loss(y_test, test_proba)),
        "oob_score": float(model.oob_score_),
    }

    # Persist the evaluation data first, then the fitted objects.
    X_test_raw.to_csv(x_test_path, index=False)
    y_test.to_csv(y_test_path, index=False)

    for destination, artifact in (
        (model_path, model),
        (imputer_path, imputer),
        (scaler_path, scaler),
    ):
        with open(destination, "wb") as f:
            pickle.dump(artifact, f)

    return metrics

In [25]:
class PredictionExplanation:
    """
    Plain container describing one explained prediction.

    Attributes
    ----------
    prob_make : float
        Predicted probability that the free throw is made.
    predicted_label : int
        Binary decision derived from ``prob_make``.
    top_factors : List[Dict[str, Any]]
        One dict per reported feature with the keys
        'feature', 'value' and 'contribution_pct'.
    """

    def __init__(
        self,
        prob_make: float,
        predicted_label: int,
        top_factors: List[Dict[str, Any]],
    ):
        # Values are stored exactly as given; no copying or validation.
        self.top_factors: List[Dict[str, Any]] = top_factors
        self.predicted_label: int = predicted_label
        self.prob_make: float = prob_make


class FTPressurePredictor:
    """
    Wrapper around trained RandomForest + preprocessing.

    Supports:
    - Single and batch predictions
    - Global feature importances
    - Simple per-shot explanations that attribute contributions
      using feature_importances_ and scaled feature values.
    """

    def __init__(
        self,
        model_path: str = "final_rf.pkl",
        imputer_path: str = "imputer.pkl",
        scaler_path: str = "scaler.pkl",
        feature_cols: Optional[List[str]] = None,
    ):
        """
        Load the pickled model, imputer and scaler and validate the feature list.

        Parameters
        ----------
        model_path, imputer_path, scaler_path : str
            Paths of the pickled artifacts.
        feature_cols : list of str, optional
            Feature names in model order; defaults to FEATURE_COLS.

        Raises
        ------
        ValueError
            If the model has no ``feature_importances_`` or its length does
            not match ``feature_cols``.
        """
        self.feature_cols = feature_cols or FEATURE_COLS

        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. Only point these paths at artifacts you produced yourself.
        with open(model_path, "rb") as f:
            self.model = pickle.load(f)
        with open(imputer_path, "rb") as f:
            self.imputer = pickle.load(f)
        with open(scaler_path, "rb") as f:
            self.scaler = pickle.load(f)

        if not hasattr(self.model, "feature_importances_"):
            raise ValueError(
                "Loaded model does not expose feature_importances_. "
                "Use a tree-based model such as RandomForest."
            )

        if len(self.model.feature_importances_) != len(self.feature_cols):
            raise ValueError(
                "Feature length mismatch between model and feature_cols. "
                "Check training vs. inference configuration."
            )

    # ---- internal helpers ----
    def _to_dataframe(
        self, X: Union[Dict[str, float], pd.Series, pd.DataFrame]
    ) -> pd.DataFrame:
        """
        Coerce input into a numeric DataFrame with the model's column order.

        A dict or Series becomes a single-row frame. Extra columns are
        dropped; values that cannot be parsed as numbers become NaN.

        Raises
        ------
        TypeError
            If ``X`` is not a dict, Series or DataFrame.
        ValueError
            If any required feature column is absent.
        """
        if isinstance(X, dict):
            df = pd.DataFrame([X])
        elif isinstance(X, pd.Series):
            df = X.to_frame().T
        elif isinstance(X, pd.DataFrame):
            df = X.copy()
        else:
            raise TypeError("X must be dict, pandas Series, or pandas DataFrame.")

        missing = [c for c in self.feature_cols if c not in df.columns]
        if missing:
            raise ValueError(f"Missing required feature(s): {missing}")

        df = df[self.feature_cols]
        df = df.apply(pd.to_numeric, errors="coerce")
        return df

    def _prepare_for_model(self, X_raw: pd.DataFrame) -> pd.DataFrame:
        """Apply imputer + scaler to raw features, keeping labels and index."""
        X_imp = pd.DataFrame(
            self.imputer.transform(X_raw),
            columns=self.feature_cols,
            index=X_raw.index,
        )
        X_scaled = pd.DataFrame(
            self.scaler.transform(X_imp),
            columns=self.feature_cols,
            index=X_imp.index,
        )
        return X_scaled

    def _positive_class_proba(self, X_scaled: pd.DataFrame) -> np.ndarray:
        """
        Return P(label == 1) for already-preprocessed rows.

        The column is looked up through ``model.classes_`` instead of assuming
        it sits at index 1. If the model never saw class 1 the probability is
        0 for every row. Models without ``classes_`` fall back to column 1.
        """
        proba = np.asarray(self.model.predict_proba(X_scaled))
        classes = getattr(self.model, "classes_", None)
        if classes is None:
            return proba[:, 1]
        hits = np.flatnonzero(np.asarray(classes) == 1)
        if hits.size == 0:
            return np.zeros(proba.shape[0], dtype=float)
        return proba[:, int(hits[0])]

    # ---- public API ----
    def predict_proba(
        self, X: Union[Dict[str, float], pd.Series, pd.DataFrame]
    ) -> np.ndarray:
        """
        Predict P(FT make) for one or many rows of *raw* features.

        Returns
        -------
        probs : np.ndarray of shape (n_samples,)
        """
        df_raw = self._to_dataframe(X)
        X_scaled = self._prepare_for_model(df_raw)
        return self._positive_class_proba(X_scaled)

    def predict(
        self,
        X: Union[Dict[str, float], pd.Series, pd.DataFrame],
        threshold: float = 0.5,
    ) -> np.ndarray:
        """
        Predict binary outcome (1 = make, 0 = miss).

        A row is labelled 1 when its probability is >= ``threshold``.
        """
        probs = self.predict_proba(X)
        return (probs >= threshold).astype(int)

    def predict_single_with_explanation(
        self,
        X: Union[Dict[str, float], pd.Series],
        threshold: float = 0.5,
        top_k: int = 3,
    ) -> "PredictionExplanation":
        """
        Single-shot prediction with human-friendly explanation.

        Explanation idea:
        - Take global feature_importances_ (how much the model relies on each feature).
        - Multiply by the absolute value of this shot's scaled feature values.
        - Normalize to get contribution percentages.

        This is a heuristic ranking, not an exact attribution of the forest's
        output. The 'value' reported for each factor is the *scaled* feature
        value (after imputation and standardisation), not the raw input.

        Parameters
        ----------
        X : dict or Series
            Raw features for exactly one shot.
        threshold : float
            Probability at or above which the label is 1.
        top_k : int
            Number of factors to report; negative values are treated as 0.

        Raises
        ------
        ValueError
            If ``X`` does not describe exactly one row.
        """
        df_raw = self._to_dataframe(X)
        if len(df_raw) != 1:
            raise ValueError("predict_single_with_explanation expects exactly one row.")

        # Preprocess once and reuse the result for both the probability and
        # the explanation.
        X_scaled = self._prepare_for_model(df_raw)
        prob_make = float(self._positive_class_proba(X_scaled)[0])
        predicted_label = int(prob_make >= threshold)

        fi = np.array(self.model.feature_importances_, dtype=float)
        fi = np.maximum(fi, 0.0)

        row_vals = X_scaled.iloc[0].values.astype(float)
        contrib_scores = np.abs(row_vals) * fi
        total = contrib_scores.sum()
        if total <= 0:
            # Every scaled value is zero (or no importance overlaps): fall
            # back to the global importances so the percentages stay defined.
            contrib_scores = fi
            total = contrib_scores.sum() if contrib_scores.sum() > 0 else 1.0

        contrib_pct = (contrib_scores / total) * 100.0

        contrib_df = (
            pd.DataFrame(
                {
                    "feature": self.feature_cols,
                    "value": row_vals,
                    "contribution_pct": contrib_pct,
                }
            )
            # Stable sort keeps tied features in feature_cols order.
            .sort_values("contribution_pct", ascending=False, kind="mergesort")
            .reset_index(drop=True)
        )

        # head() with a negative count means "all but the last k"; clamp so a
        # negative top_k simply yields no factors.
        top = contrib_df.head(max(int(top_k), 0))
        top_factors = [
            {
                "feature": rec["feature"],
                "value": float(rec["value"]),
                "contribution_pct": float(rec["contribution_pct"]),
            }
            for rec in top.to_dict("records")
        ]

        return PredictionExplanation(
            prob_make=prob_make,
            predicted_label=predicted_label,
            top_factors=top_factors,
        )

    def feature_importances(self) -> pd.DataFrame:
        """
        Global feature importance ranking as a tidy DataFrame.

        Columns: 'feature', 'importance' (raw value from the model) and
        'importance_pct' (share of the total, in percent), sorted from most
        to least important.
        """
        fi = np.array(self.model.feature_importances_, dtype=float)
        total = fi.sum() if fi.sum() > 0 else 1.0
        pct = (fi / total) * 100.0
        return (
            pd.DataFrame(
                {"feature": self.feature_cols, "importance": fi, "importance_pct": pct}
            )
            .sort_values("importance", ascending=False)
            .reset_index(drop=True)
        )

    def get_oob_score(self) -> Optional[float]:
        """
        Return the model's out-of-bag (OOB) score if available.

        ``None`` is returned when the model has no ``oob_score_`` attribute.
        """
        return getattr(self.model, "oob_score_", None)


# ------------------------------
# Evaluation helper
# ------------------------------
def evaluate_model(
    predictor: FTPressurePredictor,
    X_test: pd.DataFrame,
    y_test: Union[pd.Series, np.ndarray],
    threshold: float = 0.5,
) -> Dict[str, float]:
    """
    Score a fitted predictor on held-out data and print a short report.

    Parameters
    ----------
    predictor : FTPressurePredictor
        Object whose ``predict_proba`` returns P(make) per row.
    X_test : DataFrame
        Raw (un-imputed, un-scaled) feature columns.
    y_test : Series or ndarray
        True 0/1 labels, one per row of ``X_test``.
    threshold : float
        Probability at or above which a row is counted as a predicted make.

    Returns
    -------
    metrics : dict
        Keys 'accuracy', 'pr_auc', 'roc_auc' and 'brier', all plain floats.
        The confusion matrix is printed but not returned.
    """
    # Series are unwrapped to their underlying array; anything else is used as-is.
    labels = y_test.values if isinstance(y_test, pd.Series) else y_test

    scores = predictor.predict_proba(X_test)
    hard_calls = (scores >= threshold).astype(int)

    accuracy = accuracy_score(labels, hard_calls)
    avg_precision = average_precision_score(labels, scores)
    brier = brier_score_loss(labels, scores)
    false_pos_rate, true_pos_rate, _ = roc_curve(labels, scores)
    roc_area = auc(false_pos_rate, true_pos_rate)

    matrix = confusion_matrix(labels, hard_calls)

    report_lines = [
        "=== Test Set Performance ===",
        f"Accuracy : {accuracy:.4f}",
        f"PR-AUC   : {avg_precision:.4f}",
        f"ROC-AUC  : {roc_area:.4f}",
        f"Brier    : {brier:.4f}",
        "Confusion matrix [rows=true, cols=pred]:",
    ]
    for line in report_lines:
        print(line)
    print(matrix)

    return dict(
        accuracy=float(accuracy),
        pr_auc=float(avg_precision),
        roc_auc=float(roc_area),
        brier=float(brier),
    )

In [26]:
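# ============================
# 1. Write ft_pressure_final.py
#    NOTE: module_code below is a hand-maintained copy of the definitions in
#    the cells above. The import further down rebinds train_and_save_model,
#    FTPressurePredictor and evaluate_model to the versions written to this
#    file, so the two copies must be kept in sync.
# ============================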
module_code = r"""
import re
from typing import List, Dict, Union, Optional, Any

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    average_precision_score,
    brier_score_loss,
    roc_curve,
    precision_recall_curve,
    auc,
    confusion_matrix,
)

import pickle


FEATURE_COLS: List[str] = [
    "season_FT_pct",
    "overall_ft_pct",
    "clutch_ft_pct",
    "clutch_factor",
    "career_attempts_so_far",
    "period",
    "seconds_remaining",
    "is_clutch",
    "close_game",
    "late_game",
    "pressure_score",
    "point_differential",
]

BEST_RF_PARAMS = {
    "n_estimators": 188,
    "max_depth": 4,
    "min_samples_split": 6,
    "min_samples_leaf": 11,
    "max_features": "log2",
    "random_state": 42,
    "n_jobs": -1,
    "oob_score": True,
}


def _extract_player_name(description: Any) -> Optional[str]:
    if pd.isna(description):
        return None
    match = re.search(r"^(?:MISS\s+)?([A-Za-z\.\s]+?)\s+Free Throw", str(description))
    return match.group(1).strip() if match else None


def _time_to_seconds(time_str: Any) -> Optional[int]:
    if pd.isna(time_str):
        return None
    try:
        mm, ss = str(time_str).split(":")
        return int(mm) * 60 + int(ss)
    except Exception:
        return None


def _parse_score_diff(score_str: Any) -> Optional[int]:
    if pd.isna(score_str):
        return None
    try:
        home, away = score_str.split(" - ")
        return abs(int(home) - int(away))
    except Exception:
        return None


def prepare_ft_dataset_from_df(input_df: pd.DataFrame) -> pd.DataFrame:
    df = input_df.copy()

    keyword = "Free Throw"
    df = df[
        df[["homedescription", "visitordescription"]]
        .apply(lambda x: x.astype(str).str.contains(keyword, case=False, na=False))
        .any(axis=1)
    ]

    df = df.drop(
        columns=[
            "neutraldescription",
            "video_available_flag",
            "person1type",
            "player1_team_nickname",
            "player1_team_abbreviation",
            "player1_id",
            "player1_name",
            "player1_team_city",
            "player1_team_id",
            "person2type",
            "player2_team_nickname",
            "player2_team_abbreviation",
            "player2_id",
            "player2_name",
            "player2_team_city",
            "player2_team_id",
            "person3type",
            "player3_team_nickname",
            "player3_team_abbreviation",
            "player3_id",
            "player3_name",
            "player3_team_city",
            "player3_team_id",
        ],
        errors="ignore",
    )

    nan_idx = df[df["homedescription"].isna()].index
    df.loc[nan_idx, "homedescription"] = df.loc[nan_idx, "visitordescription"]
    df = df.drop(columns=["visitordescription"], errors="ignore")
    df = df.rename(columns={"homedescription": "FT_Event"})

    missed_mask = df["FT_Event"].str.contains("MISS", case=False, na=False)
    df["FT_made"] = (~missed_mask).astype(int)

    df = df.sort_values(["game_id", "period", "eventnum"])
    df["score"] = df.groupby("game_id")["score"].ffill()
    df = df.dropna(subset=["score"])

    score_clean = df["score"].str.replace(" ", "")
    df["home_score"] = score_clean.str.split("-").str[0].astype(int)
    df["away_score"] = score_clean.str.split("-").str[1].astype(int)
    df["scoremargin"] = df["home_score"] - df["away_score"]

    df["player_name"] = df["FT_Event"].apply(_extract_player_name)
    df["seconds_remaining"] = df["pctimestring"].apply(_time_to_seconds)
    df["is_clutch"] = (
        (df["period"] >= 4) & (df["seconds_remaining"] <= 300)
    ).astype(int)
    df["point_differential"] = df["score"].apply(_parse_score_diff)
    df["high_pressure"] = (
        (df["is_clutch"] == 1) & (df["point_differential"] <= 5)
    ).astype(int)

    season_ft = (
        df.groupby("player_name")["FT_made"]
        .mean()
        .reset_index()
        .rename(columns={"FT_made": "season_FT_pct"})
    )
    df = df.merge(season_ft, on="player_name", how="left")

    player_stats = (
        df.groupby("player_name")
        .agg(
            {
                "FT_made": ["sum", "count", "mean"],
                "is_clutch": "sum",
                "high_pressure": "sum",
            }
        )
        .reset_index()
    )
    player_stats.columns = [
        "player_name",
        "ft_made_total",
        "ft_attempts",
        "overall_ft_pct",
        "clutch_attempts",
        "high_pressure_attempts",
    ]

    clutch_stats = (
        df[df["is_clutch"] == 1]
        .groupby("player_name")["FT_made"]
        .mean()
        .reset_index()
        .rename(columns={"FT_made": "clutch_ft_pct"})
    )
    high_pressure_stats = (
        df[df["high_pressure"] == 1]
        .groupby("player_name")["FT_made"]
        .mean()
        .reset_index()
        .rename(columns={"FT_made": "high_pressure_ft_pct"})
    )

    player_stats = player_stats.merge(clutch_stats, on="player_name", how="left")
    player_stats = player_stats.merge(high_pressure_stats, on="player_name", how="left")
    player_stats["clutch_factor"] = (
        player_stats["clutch_ft_pct"] - player_stats["overall_ft_pct"]
    )

    df = df.merge(
        player_stats[["player_name", "overall_ft_pct", "clutch_ft_pct", "clutch_factor"]],
        on="player_name",
        how="left",
    )

    df["close_game"] = (df["point_differential"] <= 3).astype(int)
    df["late_game"] = (df["period"] >= 4).astype(int)
    df["pressure_score"] = (
        df["is_clutch"].astype(int)
        + df["close_game"].astype(int)
        + (df["seconds_remaining"] <= 120).astype(int)
    )

    career_attempts = df.groupby("player_name").size().to_dict()
    df["career_attempts_so_far"] = df["player_name"].map(career_attempts)

    model_df = df[FEATURE_COLS + ["FT_made", "game_id"]].copy()
    model_df = model_df.dropna(subset=["FT_made", "game_id"])

    return model_df


def train_and_save_model(
    play_by_play_path: str,
    nrows: Optional[int] = None,
    test_size: float = 0.2,
    model_path: str = "final_rf.pkl",
    imputer_path: str = "imputer.pkl",
    scaler_path: str = "scaler.pkl",
    x_test_path: str = "X_test_raw.csv",
    y_test_path: str = "y_test.csv",
) -> Dict[str, float]:
    raw_df = pd.read_csv(play_by_play_path, nrows=nrows)
    model_df = prepare_ft_dataset_from_df(raw_df)

    X = model_df[FEATURE_COLS].copy()
    y = model_df["FT_made"].astype(int).copy()
    game_ids = model_df["game_id"].unique()

    train_games, test_games = train_test_split(
        game_ids, test_size=test_size, shuffle=False
    )
    train_mask = model_df["game_id"].isin(train_games)
    test_mask = model_df["game_id"].isin(test_games)

    X_train_raw = X[train_mask].copy()
    X_test_raw = X[test_mask].copy()
    y_train = y[train_mask].copy()
    y_test = y[test_mask].copy()

    imputer = SimpleImputer(strategy="median")
    X_train_imp = pd.DataFrame(
        imputer.fit_transform(X_train_raw),
        columns=FEATURE_COLS,
        index=X_train_raw.index,
    )
    X_test_imp = pd.DataFrame(
        imputer.transform(X_test_raw),
        columns=FEATURE_COLS,
        index=X_test_raw.index,
    )

    scaler = StandardScaler()
    X_train_scaled = pd.DataFrame(
        scaler.fit_transform(X_train_imp),
        columns=FEATURE_COLS,
        index=X_train_imp.index,
    )
    X_test_scaled = pd.DataFrame(
        scaler.transform(X_test_imp),
        columns=FEATURE_COLS,
        index=X_test_imp.index,
    )

    model = RandomForestClassifier(**BEST_RF_PARAMS)
    model.fit(X_train_scaled, y_train)

    y_proba_test = model.predict_proba(X_test_scaled)[:, 1]
    y_pred_test = (y_proba_test >= 0.5).astype(int)

    acc = accuracy_score(y_test, y_pred_test)
    pr_auc = average_precision_score(y_test, y_proba_test)
    brier = brier_score_loss(y_test, y_proba_test)
    oob = float(model.oob_score_)

    X_test_raw.to_csv(x_test_path, index=False)
    y_test.to_csv(y_test_path, index=False)

    with open(model_path, "wb") as f:
        pickle.dump(model, f)
    with open(imputer_path, "wb") as f:
        pickle.dump(imputer, f)
    with open(scaler_path, "wb") as f:
        pickle.dump(scaler, f)

    return {
        "accuracy": float(acc),
        "pr_auc": float(pr_auc),
        "brier": float(brier),
        "oob_score": oob,
    }


class PredictionExplanation:
    def __init__(
        self,
        prob_make: float,
        predicted_label: int,
        top_factors: List[Dict[str, Any]],
    ):
        self.prob_make = prob_make
        self.predicted_label = predicted_label
        self.top_factors = top_factors


class FTPressurePredictor:
    def __init__(
        self,
        model_path: str = "final_rf.pkl",
        imputer_path: str = "imputer.pkl",
        scaler_path: str = "scaler.pkl",
        feature_cols: Optional[List[str]] = None,
    ):
        self.feature_cols = feature_cols or FEATURE_COLS

        with open(model_path, "rb") as f:
            self.model = pickle.load(f)
        with open(imputer_path, "rb") as f:
            self.imputer = pickle.load(f)
        with open(scaler_path, "rb") as f:
            self.scaler = pickle.load(f)

        if not hasattr(self.model, "feature_importances_"):
            raise ValueError(
                "Loaded model does not expose feature_importances_. "
                "Use a tree-based model such as RandomForest."
            )

        if len(self.model.feature_importances_) != len(self.feature_cols):
            raise ValueError(
                "Feature length mismatch between model and feature_cols."
            )

    def _to_dataframe(
        self, X: Union[Dict[str, float], pd.Series, pd.DataFrame]
    ) -> pd.DataFrame:
        if isinstance(X, dict):
            df = pd.DataFrame([X])
        elif isinstance(X, pd.Series):
            df = X.to_frame().T
        elif isinstance(X, pd.DataFrame):
            df = X.copy()
        else:
            raise TypeError("X must be dict, pandas Series, or pandas DataFrame.")

        missing = [c for c in self.feature_cols if c not in df.columns]
        if missing:
            raise ValueError(f"Missing required feature(s): {missing}")

        df = df[self.feature_cols]
        df = df.apply(pd.to_numeric, errors="coerce")
        return df

    def _prepare_for_model(self, X_raw: pd.DataFrame) -> pd.DataFrame:
        X_imp = pd.DataFrame(
            self.imputer.transform(X_raw),
            columns=self.feature_cols,
            index=X_raw.index,
        )
        X_scaled = pd.DataFrame(
            self.scaler.transform(X_imp),
            columns=self.feature_cols,
            index=X_imp.index,
        )
        return X_scaled

    def predict_proba(
        self, X: Union[Dict[str, float], pd.Series, pd.DataFrame]
    ) -> np.ndarray:
        df_raw = self._to_dataframe(X)
        X_scaled = self._prepare_for_model(df_raw)
        probs = self.model.predict_proba(X_scaled)[:, 1]
        return probs

    def predict(
        self,
        X: Union[Dict[str, float], pd.Series, pd.DataFrame],
        threshold: float = 0.5,
    ) -> np.ndarray:
        probs = self.predict_proba(X)
        return (probs >= threshold).astype(int)

    def predict_single_with_explanation(
        self,
        X: Union[Dict[str, float], pd.Series],
        threshold: float = 0.5,
        top_k: int = 3,
    ) -> PredictionExplanation:
        df_raw = self._to_dataframe(X)
        if len(df_raw) != 1:
            raise ValueError("predict_single_with_explanation expects exactly one row.")

        X_scaled = self._prepare_for_model(df_raw)
        prob_make = float(self.predict_proba(df_raw)[0])
        predicted_label = int(prob_make >= threshold)

        fi = np.array(self.model.feature_importances_, dtype=float)
        fi = np.maximum(fi, 0.0)

        row_vals = X_scaled.iloc[0].values.astype(float)
        contrib_scores = np.abs(row_vals) * fi
        total = contrib_scores.sum()
        if total <= 0:
            contrib_scores = fi
            total = contrib_scores.sum() if contrib_scores.sum() > 0 else 1.0

        contrib_pct = (contrib_scores / total) * 100.0

        contrib_df = (
            pd.DataFrame(
                {
                    "feature": self.feature_cols,
                    "value": row_vals,
                    "contribution_pct": contrib_pct,
                }
            )
            .sort_values("contribution_pct", ascending=False)
            .reset_index(drop=True)
        )

        top = contrib_df.head(top_k)
        top_factors = [
            {
                "feature": r["feature"],
                "value": float(r["value"]),
                "contribution_pct": float(r["contribution_pct"]),
            }
            for _, r in top.iterrows()
        ]

        return PredictionExplanation(
            prob_make=prob_make,
            predicted_label=predicted_label,
            top_factors=top_factors,
        )

    def feature_importances(self) -> pd.DataFrame:
        fi = np.array(self.model.feature_importances_, dtype=float)
        total = fi.sum() if fi.sum() > 0 else 1.0
        pct = (fi / total) * 100.0
        return (
            pd.DataFrame(
                {"feature": self.feature_cols, "importance": fi, "importance_pct": pct}
            )
            .sort_values("importance", ascending=False)
            .reset_index(drop=True)
        )

    def get_oob_score(self) -> Optional[float]:
        return getattr(self.model, "oob_score_", None)


def evaluate_model(
    predictor: FTPressurePredictor,
    X_test: pd.DataFrame,
    y_test: Union[pd.Series, np.ndarray],
    threshold: float = 0.5,
) -> Dict[str, float]:
    '''Score a predictor on a labelled set, print a report, return the metrics.

    Args:
        predictor: Object whose ``predict_proba(X_test)`` yields one score per
            row (presumably the probability of a made free throw; confirm
            against the class definition).
        X_test: Feature rows handed unchanged to ``predictor.predict_proba``.
        y_test: True binary labels; a Series is unwrapped to its ``.values``.
        threshold: Scores greater than or equal to this become label 1.

    Returns:
        Dict with the float entries ``accuracy``, ``pr_auc``, ``roc_auc`` and
        ``brier``. The confusion matrix is printed but not returned.
    '''
    y_true = y_test.values if isinstance(y_test, pd.Series) else y_test

    scores = predictor.predict_proba(X_test)
    hard_labels = (scores >= threshold).astype(int)

    # Threshold-dependent metric first, then the score-based ones.
    accuracy = accuracy_score(y_true, hard_labels)
    avg_precision = average_precision_score(y_true, scores)
    brier_loss = brier_score_loss(y_true, scores)

    # ROC-AUC is obtained as the area under the curve from roc_curve.
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, scores)
    roc_area = auc(false_pos_rate, true_pos_rate)

    confusion = confusion_matrix(y_true, hard_labels)

    print("=== Test Set Performance ===")
    print(f"Accuracy : {accuracy:.4f}")
    print(f"PR-AUC   : {avg_precision:.4f}")
    print(f"ROC-AUC  : {roc_area:.4f}")
    print(f"Brier    : {brier_loss:.4f}")
    print("Confusion matrix [rows=true, cols=pred]:")
    print(confusion)

    summary: Dict[str, float] = {}
    summary["accuracy"] = float(accuracy)
    summary["pr_auc"] = float(avg_precision)
    summary["roc_auc"] = float(roc_area)
    summary["brier"] = float(brier_loss)
    return summary
"""

import importlib
import os

# Persist the module source held in `module_code` so it can be imported like
# an ordinary .py file. Python source is UTF-8 by default, so write it with an
# explicit encoding instead of relying on the platform locale.
with open("ft_pressure_final.py", "w", encoding="utf-8") as f:
    f.write(module_code)

print("Files in cwd:", os.listdir())

# ============================
# 2. Import and use the module
# ============================
# The file above was created/rewritten after the interpreter started, so the
# import system's cached directory listings may be stale; invalidate them
# before importing, then reload in case an older version is already loaded.
importlib.invalidate_caches()
import ft_pressure_final
importlib.reload(ft_pressure_final)

from ft_pressure_final import (
    train_and_save_model,
    FTPressurePredictor,
    evaluate_model,
)

import kagglehub
import pandas as pd

# Number of play-by-play rows passed to train_and_save_model via `nrows`.
TRAIN_NROWS = 100_000

# Download dataset
basketball_path = kagglehub.dataset_download("wyattowalsh/basketball")
csv_path = os.path.join(basketball_path, "csv")
pbp_path = os.path.join(csv_path, "play_by_play.csv")
print("Using:", pbp_path)

# Train and save model
# NOTE(review): in the recorded output these numbers match the test-set
# evaluation below exactly, so they look like held-out metrics rather than
# training-set metrics -- confirm against train_and_save_model.
metrics = train_and_save_model(pbp_path, nrows=TRAIN_NROWS)
print("Train metrics:", metrics)

# Load predictor
predictor = FTPressurePredictor()

# Evaluate on saved test set
X_test = pd.read_csv("X_test_raw.csv")
# Labels are taken from the first column of y_test.csv as a Series.
y_test = pd.read_csv("y_test.csv").iloc[:, 0]
eval_metrics = evaluate_model(predictor, X_test, y_test)
print("Eval metrics:", eval_metrics)

# Single-shot example with explanation
example_shot = {
    "season_FT_pct": 0.80,
    "overall_ft_pct": 0.78,
    "clutch_ft_pct": 0.85,
    "clutch_factor": 0.07,
    "career_attempts_so_far": 100,
    "period": 4,
    "seconds_remaining": 45,
    "is_clutch": 1,
    "close_game": 1,
    "late_game": 1,
    "pressure_score": 3,
    "point_differential": 2,
}
exp = predictor.predict_single_with_explanation(example_shot)
print("P(make):", exp.prob_make)
print("Label:", exp.predicted_label)
print("Top factors:", exp.top_factors)


Files in cwd: ['.config', 'final_rf.pkl', 'imputer.pkl', '__pycache__', 'scaler.pkl', 'ft_pressure_final.py', 'y_test.csv', 'X_test_raw.csv', 'sample_data']
Using Colab cache for faster access to the 'basketball' dataset.
Using: /kaggle/input/basketball/csv/play_by_play.csv
Train metrics: {'accuracy': 0.7559306569343066, 'pr_auc': 0.8320305547620233, 'brier': 0.17782058094015893, 'oob_score': 0.735840756625976}
=== Test Set Performance ===
Accuracy : 0.7559
PR-AUC   : 0.8320
ROC-AUC  : 0.6375
Brier    : 0.1778
Confusion matrix [rows=true, cols=pred]:
[[  31  515]
 [  20 1626]]
Eval metrics: {'accuracy': 0.7559306569343066, 'pr_auc': 0.8320305547620233, 'roc_auc': 0.6374816961086707, 'brier': 0.17782058094015893}
P(make): 0.8042499617828059
Label: 1
Top factors: [{'feature': 'season_FT_pct', 'value': 0.5829577750926249, 'contribution_pct': 32.51531387481439}, {'feature': 'overall_ft_pct', 'value': 0.3889053128440795, 'contribution_pct': 23.053519200567806}, {'feature': 'clutch_ft_pct', 

In [27]:
from ft_pressure_final import FTPressurePredictor

# Fresh predictor instance built with the class defaults.
predictor = FTPressurePredictor()

# One hypothetical free throw, keyed by the model's feature names.
shot = dict(
    season_FT_pct=0.80,
    overall_ft_pct=0.78,
    clutch_ft_pct=0.85,
    clutch_factor=0.07,
    career_attempts_so_far=100,
    period=4,
    seconds_remaining=45,
    is_clutch=1,
    close_game=1,
    late_game=1,
    pressure_score=3,
    point_differential=2,
)

# Score the shot and show probability, hard label and the top factors.
explanation = predictor.predict_single_with_explanation(shot)
print(
    explanation.prob_make,
    explanation.predicted_label,
    explanation.top_factors,
)

0.8042499617828059 1 [{'feature': 'season_FT_pct', 'value': 0.5829577750926249, 'contribution_pct': 32.51531387481439}, {'feature': 'overall_ft_pct', 'value': 0.3889053128440795, 'contribution_pct': 23.053519200567806}, {'feature': 'clutch_ft_pct', 'value': 0.6687318528550419, 'contribution_pct': 10.618390741707678}]
