<a href="https://colab.research.google.com/github/jenny005/Sports_Research/blob/main/Baseball_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
End-to-end, self-contained baseball analytics pipeline.
The module can be run as a script to demonstrate a full workflow:
    1. Pull raw data from multiple modern sources (Statcast, wearable sensors, TrackMan).
    2. Clean and enrich the data.
    3. Train / update predictive models (player evaluation & in-game strategy).
    4. Surface actionable recommendations via a lightweight REST service that any
       department (scouting, player-dev, coaching staff) can consume.

Usage (from project root):
    $ ENV=dev python baseball_model.py --update-models --serve
"""

from __future__ import annotations

import argparse
import datetime as dt
import json
import os
import warnings
from pathlib import Path
from typing import Any, Dict, List, Tuple

import joblib
import numpy as np
import pandas as pd
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import TimeSeriesSplit, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Silence every warning category for the whole process, library deprecation
# notices included. NOTE(review): consider narrowing this filter so genuine
# problems are not hidden.
warnings.filterwarnings("ignore")

# -----------------------------------------------------------------------------
# 1. Configuration
# -----------------------------------------------------------------------------
# Deployment environment name; falls back to "dev" when ENV is not set.
ENV = os.environ.get("ENV", "dev")

# On-disk artefacts live in sub-directories next to this file. Only the model
# directory is created at import time.
ROOT = Path(__file__).resolve().parent
DATA_DIR, MODEL_DIR = ROOT.joinpath("data"), ROOT.joinpath("models")
MODEL_DIR.mkdir(exist_ok=True)

# Remote data sources. STATCAST_URL is a template taking a `year` field.
STATCAST_URL = "https://baseballsavant.mlb.com/api/v1/people?season={year}"
WEARABLE_ENDPOINT = "https://internals.myorg.com/api/wearable"  # mock
TRACKMAN_ENDPOINT = "https://internals.myorg.com/api/trackman"  # mock

# -----------------------------------------------------------------------------
# 2. Data Acquisition
# -----------------------------------------------------------------------------
def pull_statcast(year: int) -> pd.DataFrame:
    """Fetch the Statcast payload for one season as a flat DataFrame.

    The season is substituted into ``STATCAST_URL``; the ``"people"`` entry of
    the JSON response is flattened with ``pd.json_normalize`` and every column
    name is lower-cased.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
        KeyError: if the response has no ``"people"`` entry.
    """
    response = requests.get(STATCAST_URL.format(year=year), timeout=30)
    response.raise_for_status()
    payload = response.json()
    people = pd.json_normalize(payload["people"])
    return people.rename(str.lower, axis="columns")


def pull_wearable(player_ids: List[int]) -> pd.DataFrame:
    """Fetch wearable-sensor records for the given players (mock endpoint).

    The ids are sent as one comma-separated ``player_ids`` query parameter and
    the ``"data"`` entry of the JSON response becomes the returned frame.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
    """
    id_csv = ",".join(str(pid) for pid in player_ids)
    response = requests.get(
        WEARABLE_ENDPOINT,
        params={"player_ids": id_csv},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    return pd.DataFrame(payload["data"])


def pull_trackman(player_ids: List[int]) -> pd.DataFrame:
    """Fetch TrackMan records for the given players (mock endpoint).

    The ids are sent as one comma-separated ``player_ids`` query parameter and
    the ``"data"`` entry of the JSON response becomes the returned frame.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
    """
    id_csv = ",".join(str(pid) for pid in player_ids)
    response = requests.get(
        TRACKMAN_ENDPOINT,
        params={"player_ids": id_csv},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    return pd.DataFrame(payload["data"])


# -----------------------------------------------------------------------------
# 3. Data Cleaning & Enrichment
# -----------------------------------------------------------------------------
def clean_statcast(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce Statcast metrics to numbers and drop rows lacking a key metric.

    Steps, in order:
      1. Convert the metric columns to numeric; unparsable values become NaN.
      2. Drop rows where any of ``war``, ``ops`` or ``era`` is NaN.
      3. Fill NaNs left in numeric columns with that column's median.

    Coercion runs *before* the drop so that placeholders such as ``"n/a"`` in
    a key metric remove the row instead of being replaced by the median.

    Args:
        df: Raw Statcast frame containing at least the five metric columns.

    Returns:
        A new, cleaned frame; the input is left untouched.

    Raises:
        KeyError: if one of the metric columns is missing.
    """
    key_cols = ["war", "ops", "era"]
    num_cols = key_cols + ["avg_exit_velocity", "max_exit_velocity"]

    # Work on a copy so the column assignments never write into the caller's data.
    cleaned = df.copy()
    for col in num_cols:
        cleaned[col] = pd.to_numeric(cleaned[col], errors="coerce")

    # Rows without a usable target metric cannot be modelled.
    cleaned = cleaned.dropna(subset=key_cols)

    # Fill remaining numeric gaps with the median of the surviving rows.
    return cleaned.fillna(cleaned.median(numeric_only=True))


def merge_sources(
    statcast: pd.DataFrame,
    wearable: pd.DataFrame,
    trackman: pd.DataFrame,
) -> pd.DataFrame:
    """Left-join wearable and TrackMan data onto the Statcast frame.

    Both joins key on ``player_id`` and keep every Statcast row. Clashing
    column names from the right-hand frames get the ``_wear`` and ``_track``
    suffixes respectively.
    """
    with_wearable = pd.merge(
        statcast,
        wearable,
        how="left",
        on="player_id",
        suffixes=("", "_wear"),
    )
    combined = pd.merge(
        with_wearable,
        trackman,
        how="left",
        on="player_id",
        suffixes=("", "_track"),
    )
    return combined


# -----------------------------------------------------------------------------
# 4. Feature Engineering
# -----------------------------------------------------------------------------
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add the composite ratio features used by the models.

    Columns added:
      * ``hard_hit_pct``    = ``hard_hit`` / ``batted_events``
      * ``spin_efficiency`` = ``spin_rate`` / ``max_spin``
      * ``fatigue_index``   = ``high_intensity_minutes`` / ``total_minutes``

    A zero denominator yields NaN (missing) instead of ``inf``.

    Args:
        df: Merged frame containing the six source columns above.

    Returns:
        A copy of ``df`` with the three ratio columns; the input frame is not
        modified.

    Raises:
        KeyError: if a source column is missing.
    """
    ratios = {
        "hard_hit_pct": ("hard_hit", "batted_events"),
        "spin_efficiency": ("spin_rate", "max_spin"),
        "fatigue_index": ("high_intensity_minutes", "total_minutes"),
    }

    out = df.copy()
    for feature, (numerator, denominator) in ratios.items():
        # Mask zero denominators to NaN so the ratio is "unknown", not +/-inf.
        safe_denominator = out[denominator].where(out[denominator] != 0)
        out[feature] = out[numerator] / safe_denominator
    return out


# -----------------------------------------------------------------------------
# 5. Modeling
# -----------------------------------------------------------------------------
# Model name -> target column it predicts (currently an identity mapping).
TARGETS = {metric: metric for metric in ("war", "era", "ops")}

# Feature columns by type; FEATURES lists numeric columns first, then the
# categorical one.
CATEGORICAL = ["position"]
NUMERICAL = [
    "avg_exit_velocity",
    "max_exit_velocity",
    "launch_angle",
    "spin_rate",
    "hard_hit_pct",
    "spin_efficiency",
    "fatigue_index",
]
FEATURES = NUMERICAL + CATEGORICAL


def build_pipeline() -> Pipeline:
    """Build the preprocessing + gradient-boosting pipeline for tabular data.

    Preprocessing:
      * ``CATEGORICAL`` columns are one-hot encoded; categories unseen during
        fitting are ignored at prediction time.
      * ``NUMERICAL`` columns are median-imputed and then standardised.

    The imputation step is required because ``GradientBoostingRegressor``
    rejects NaN input, and players absent from a left-joined source have NaN
    in that source's columns. It is a no-op when the data has no gaps.

    Returns:
        An unfitted ``Pipeline`` with steps ``"prep"`` and ``"model"``.
    """
    # Imported here so the fix does not depend on the file's import block.
    from sklearn.impute import SimpleImputer

    numeric = Pipeline(
        steps=[
            ("impute", SimpleImputer(strategy="median")),
            ("scale", StandardScaler()),
        ]
    )
    pre = ColumnTransformer(
        transformers=[
            ("cat", OneHotEncoder(handle_unknown="ignore"), CATEGORICAL),
            ("num", numeric, NUMERICAL),
        ]
    )
    model = GradientBoostingRegressor(
        n_estimators=400,
        learning_rate=0.05,
        max_depth=4,
        random_state=42,  # fixed seed for reproducible fits
    )
    return Pipeline(steps=[("prep", pre), ("model", model)])


def train_models(
    df: pd.DataFrame,
    save: bool = True,
) -> Dict[str, Pipeline]:
    """Fit one pipeline per entry in ``TARGETS`` and report hold-out MAE.

    For every target the frame is split 80/20 with a fixed seed, a fresh
    pipeline is fitted on the training part and its mean absolute error on the
    held-out part is printed.

    Args:
        df: Frame containing every column in ``FEATURES`` and every target.
        save: When true, each fitted pipeline is written to
            ``MODEL_DIR/<name>_model.joblib``.

    Returns:
        Mapping of target name to fitted pipeline.
    """
    fitted = {}
    for name, y_col in TARGETS.items():
        X_train, X_test, y_train, y_test = train_test_split(
            df[FEATURES], df[y_col], test_size=0.2, random_state=42
        )
        estimator = build_pipeline()
        estimator.fit(X_train, y_train)

        mae = mean_absolute_error(y_test, estimator.predict(X_test))
        print(f"{name} MAE: {mae:.3f}")

        fitted[name] = estimator
        if not save:
            continue
        joblib.dump(estimator, MODEL_DIR / f"{name}_model.joblib")
    return fitted


# -----------------------------------------------------------------------------
# 6. Recommendation Engine
# -----------------------------------------------------------------------------
# Request body accepted by the POST /recommend endpoint.
class RecommendationRequest(BaseModel):
    # Matched against the "player_id" column of the current data snapshot.
    player_id: int
    # Which department is asking; selects the metric that gets predicted.
    role: str  # evaluation | development | strategy


# Response payload built by generate_recommendation() and returned by the
# POST /recommend endpoint.
class Recommendation(BaseModel):
    player_id: int  # echoed from the request
    role: str  # echoed from the request
    metric: str  # target the prediction refers to: "war", "ops" or "era"
    value: float  # model prediction for `metric`
    recommendation: str  # advice text chosen by simple threshold rules
    confidence: float  # currently a fixed placeholder value


def generate_recommendation(
    player_id: int,
    role: str,
    df: pd.DataFrame,
    models: Dict[str, Pipeline],
) -> Recommendation:
    """Create an actionable recommendation for a single player.

    The role selects the predicted metric (evaluation -> war,
    development -> ops, strategy -> era). The matching model predicts that
    metric from the player's feature row and a threshold rule turns the
    prediction into advice text.

    Args:
        player_id: Value looked up in ``df["player_id"]``. If several rows
            match, the first one is used.
        role: One of ``"evaluation"``, ``"development"``, ``"strategy"``.
        df: Frame with a ``player_id`` column and every column in ``FEATURES``.
        models: Fitted pipelines keyed by target name.

    Returns:
        The populated ``Recommendation``.

    Raises:
        ValueError: if the player is not in ``df`` or ``role`` is not one of
            the three supported values.
        KeyError: if no model is loaded for the selected target, or a
            required column is missing from ``df``.
    """
    role_to_target = {
        "evaluation": "war",
        "development": "ops",
        "strategy": "era",
    }

    matches = df[df["player_id"] == player_id]
    if matches.empty:
        raise ValueError("Player not found")

    # Reject unknown roles instead of silently treating them as "strategy".
    target = role_to_target.get(role)
    if target is None:
        expected = ", ".join(role_to_target)
        raise ValueError(f"Unknown role {role!r}; expected one of: {expected}")

    if target not in models:
        raise KeyError(f"No trained model loaded for target {target!r}")

    # iloc[[0]] keeps a one-row DataFrame with the original column dtypes,
    # unlike Series.to_frame().T, which collapses everything to object.
    X = matches[FEATURES].iloc[[0]]
    pred = float(models[target].predict(X)[0])
    conf = 0.85  # placeholder; could use prediction intervals

    # Simple rule-based recommendation
    if target == "war" and pred < 2:
        rec = "Consider minor-league assignment for further development."
    elif target == "ops" and pred < 0.700:
        rec = "Increase focus on exit velocity training."
    elif target == "era" and pred > 4.5:
        rec = "Leverage high-spin fastball more often up in the zone."
    else:
        rec = "Status quo; monitor progress."

    return Recommendation(
        player_id=player_id,
        role=role,
        metric=target,
        value=pred,
        recommendation=rec,
        confidence=conf,
    )


# -----------------------------------------------------------------------------
# 7. REST Service
# -----------------------------------------------------------------------------
# ASGI application object; the startup hook and the /recommend route below
# register themselves on it.
app = FastAPI(title="Baseball Analytics API", version="1.0")
# Fitted pipelines keyed by target name; filled by load_models() at startup.
models: Dict[str, Pipeline] = {}


@app.on_event("startup")
def load_models():
    """Load every saved model artefact into the module-level ``models`` dict.

    Runs on application startup. Targets whose ``<target>_model.joblib`` file
    is absent from ``MODEL_DIR`` are skipped without error.
    """
    # The dict is mutated in place, so no `global` statement is needed.
    for target in TARGETS:
        artefact = MODEL_DIR / f"{target}_model.joblib"
        if not artefact.exists():
            continue
        models[target] = joblib.load(artefact)


@app.post("/recommend", response_model=Recommendation)
def get_recommendation(req: RecommendationRequest):
    """Return a recommendation for the requested player and role.

    The current data snapshot is loaded from
    ``DATA_DIR / "current_data.joblib"`` on every request and passed, with the
    loaded models, to ``generate_recommendation``.

    Error mapping:
      * 400 - ``ValueError`` from the recommendation logic (e.g. unknown
        player), with the error text as detail.
      * 503 - the data snapshot file is missing, or a required model/column
        is not available (``KeyError``).
      * Anything else propagates and is reported by the framework as a server
        error, rather than being disguised as a client error.
    """
    try:
        # NOTE: joblib.load unpickles the file; acceptable only because the
        # snapshot is written locally by main(), never by clients.
        df = joblib.load(DATA_DIR / "current_data.joblib")
    except FileNotFoundError as exc:
        # Do not echo the exception text: it contains the server-side path.
        raise HTTPException(
            status_code=503,
            detail="Player data snapshot is not available yet.",
        ) from exc

    try:
        return generate_recommendation(req.player_id, req.role, df, models)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except KeyError as exc:
        raise HTTPException(
            status_code=503,
            detail=f"Required model or data column is unavailable: {exc}",
        ) from exc


# -----------------------------------------------------------------------------
# 8. Script entrypoint
# -----------------------------------------------------------------------------
def main():
    """Command-line entry point: pull data, optionally train, optionally serve.

    Flags:
        --year: Season to pull (defaults to the previous calendar year).
        --update-models: Retrain and save a model for every target.
        --serve: Start the REST service on port 8000 once the data is ready.

    The data pull, merge and feature engineering always run; the resulting
    frame is saved as ``DATA_DIR / "current_data.joblib"`` for the API to read.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--year", type=int, default=dt.datetime.now().year - 1)
    parser.add_argument("--update-models", action="store_true")
    parser.add_argument("--serve", action="store_true")
    args = parser.parse_args()

    # Pull data
    print("Pulling data...")
    statcast = pull_statcast(args.year)
    statcast = clean_statcast(statcast)
    player_ids = statcast["player_id"].unique().tolist()
    wearable = pull_wearable(player_ids)
    trackman = pull_trackman(player_ids)

    # Merge & engineer
    df = merge_sources(statcast, wearable, trackman)
    df = engineer_features(df)
    # The data directory is not created at import time, so make sure it
    # exists before writing the snapshot into it.
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    joblib.dump(df, DATA_DIR / "current_data.joblib")

    # Train
    if args.update_models:
        print("Training models...")
        train_models(df)

    # Serve
    if args.serve:
        import uvicorn

        # Pass the app object rather than the "baseball_model:app" import
        # string: this avoids importing the module a second time and does not
        # depend on the file's name or the working directory.
        uvicorn.run(app, host="0.0.0.0", port=8000, reload=False)


if __name__ == "__main__":
    main()