In [None]:
# Predicting Pitcher ERA Using PyBaseball


# Setup & Imports

import pandas as pd
import numpy as np
from pybaseball import statcast
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, Lasso, ElasticNet, OrthogonalMatchingPursuit
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.feature_selection import mutual_info_regression
from sklearn.inspection import permutation_importance
from sklearn.feature_selection import RFE
from sklearn.model_selection import cross_val_score
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.ensemble import RandomForestRegressor
import shap
import xgboost as xgb
from catboost import CatBoostRegressor
import warnings
warnings.filterwarnings("ignore")



# Download Pitch-Level Statcast Data for One Season

# Example season
season_start = "2024-03-28"
season_end   = "2024-10-01"

df_core = statcast(start_dt=season_start, end_dt=season_end)
df_core.head()


  from .autonotebook import tqdm as notebook_tqdm


This is a large query, it may take a moment to complete


100%|██████████| 188/188 [03:06<00:00,  1.01it/s]


Unnamed: 0,pitch_type,game_date,release_speed,release_pos_x,release_pos_z,player_name,batter,pitcher,events,description,...,batter_days_until_next_game,api_break_z_with_gravity,api_break_x_arm,api_break_x_batter_in,arm_angle,attack_angle,attack_direction,swing_path_tilt,intercept_ball_minus_batter_pos_x_inches,intercept_ball_minus_batter_pos_y_inches
598,CH,2024-10-01,88.1,-1.65,6.12,"Brieske, Beau",518792,689225,field_out,hit_into_play,...,1,2.5,1.4,-1.4,46.4,21.481595,-17.851272,24.228579,43.236189,31.25396
620,CH,2024-10-01,87.1,-1.69,6.17,"Brieske, Beau",518792,689225,,swinging_strike,...,1,2.45,1.19,-1.19,44.1,14.52903,-35.809638,19.463062,61.68396,44.906197
648,CH,2024-10-01,89.7,-1.89,6.14,"Brieske, Beau",518792,689225,,ball,...,1,2.44,1.33,-1.33,47.4,,,,,
652,FF,2024-10-01,97.5,-1.51,6.32,"Brieske, Beau",518792,689225,,foul,...,1,0.82,0.4,-0.4,55.0,7.804441,0.738715,22.363442,50.109329,18.509113
692,CH,2024-10-01,88.6,-1.77,6.19,"Brieske, Beau",518792,689225,,blocked_ball,...,1,2.32,1.3,-1.3,47.0,,,,,


In [9]:
df_core.head()

Unnamed: 0,pitch_type,game_date,release_speed,release_pos_x,release_pos_z,player_name,batter,pitcher,events,description,...,api_break_x_arm,api_break_x_batter_in,arm_angle,attack_angle,attack_direction,swing_path_tilt,intercept_ball_minus_batter_pos_x_inches,intercept_ball_minus_batter_pos_y_inches,swing,miss
598,CH,2024-10-01,88.1,-1.65,6.12,"Brieske, Beau",518792,689225,field_out,hit_into_play,...,1.4,-1.4,46.4,21.481595,-17.851272,24.228579,43.236189,31.25396,1,0
620,CH,2024-10-01,87.1,-1.69,6.17,"Brieske, Beau",518792,689225,,swinging_strike,...,1.19,-1.19,44.1,14.52903,-35.809638,19.463062,61.68396,44.906197,1,1
648,CH,2024-10-01,89.7,-1.89,6.14,"Brieske, Beau",518792,689225,,ball,...,1.33,-1.33,47.4,,,,,,0,0
652,FF,2024-10-01,97.5,-1.51,6.32,"Brieske, Beau",518792,689225,,foul,...,0.4,-0.4,55.0,7.804441,0.738715,22.363442,50.109329,18.509113,1,0
692,CH,2024-10-01,88.6,-1.77,6.19,"Brieske, Beau",518792,689225,,blocked_ball,...,1.3,-1.3,47.0,,,,,,0,0


In [None]:

# Compute Game‑Level ERA from Statcast Only


# Filter to only balls in play or plate appearances
pa = df_core[df_core["events"].notna()].copy()

# Earned run events from Statcast
run_events = [
    "home_run", "single", "double", "triple",
    "field_out", "force_out", "grounded_into_double_play",
    "sac_fly", "walk", "hit_by_pitch"
]


Unnamed: 0,pitcher,game_pk,earned_runs,outs,ip,game_era
0,434378,744872,0,17,5.666667,0.0
1,434378,745250,0,21,7.0,0.0
2,434378,745540,0,14,4.666667,0.0
3,434378,745663,0,18,6.0,0.0
4,434378,745752,0,15,5.0,0.0


In [None]:
# simplifies event outcomes into broader categories

def map_event(outcome):
    if outcome in [
        "strikeout", "field_out", "force_out",
        "fielders_choice", "fielders_choice_out"]:
        return "OUT"

    if outcome in ["double_play", "triple_play", "sac_fly_double_play", "grounded_into_double_play", "strikeout_double_play"]:
        return "2OUT"

    if outcome in ["single", "field_error"]:
        return "SINGLE"

    if outcome in ["double"]:
        return "DOUBLE"
    
    if outcome in ["triple"]:
        return "TRIPLE"

    if outcome == "home_run":
        return "HR"

    if outcome in ["walk", "intent_walk", "hit_by_pitch", "catcher_interf"]:
        return "BB"

    if outcome in ["sac_bunt", "sac_fly"]:
        return "SAC"

    # Anything else → treat as OUT
    return "OUT"

pa["events"] = pa["events"].apply(map_event)

In [None]:
# updates base state, runs scored, and outs after a plate appearance

def compute_runs_scored(prev_base_state, event):
    """
    Approximate runs scored from a plate appearance.
    
    Args:
        prev_base_state (int): 3-bit base state before the PA (0-7)
        event (str or float): Statcast event type (after outcome grouping); may be NaN
    Returns:
        tuple: new_base_state (int 0,..,7), runs_scored (int 0,..,4), new_resulting_outs (0,1,2) (triple plays not considered)
    """
    # Treat NaN events as no advancement / no runs
    if not isinstance(event, str):
        return (prev_base_state,0,0)

    prev_base_state=int(prev_base_state)

    # Home run: batter + all runners score
    if event == "HR":
        return (0,1 + prev_base_state.bit_count(),0)
    

    if event in ["OUT", "SAC"]:
        return (prev_base_state,0,1)
    elif event=="2OUT":

        dp_dict={
            0:0,
            1:0,
            2:0,
            3:1,
            4:0,
            5:1,
            6:2,
            7:3
        }

        return (dp_dict[prev_base_state],0,2)

    walk_dict={
        0:1,
        1:3,
        2:3,
        3:7,
        4:5,
        5:7,
        6:7,
        7:7
    }

    if event=="BB": #batter takes their base
        return ((7,1,0) if prev_base_state==7 else (walk_dict[prev_base_state],0,0)) #batter takes their base on bases loaded

    
    if event=="SAC":
        return ((2*prev_base_state)%8, (2*prev_base_state)//8,1)

    advance_dict={
        "SINGLE":1,
        "DOUBLE":2,
        "TRIPLE":3
    }
    advance=advance_dict.get(event, 0)

    
    advanced_state=prev_base_state*(2**advance)+2**(advance-1)
    return (advanced_state%8,(advanced_state//8).bit_count(),0)




In [None]:

# Count runners scoring only when credited to pitcher.
# Statcast has 'runs_scored' column in some years, if not we assume runs_scored = 0.


pa["base_state"] = (
    pa["on_1b"].notna().astype(int)
    + 2 * pa["on_2b"].notna().astype(int)
    + 4 * pa["on_3b"].notna().astype(int)
)

pa["run_scored_per_pa"]= pa.apply(
    lambda row: compute_runs_scored(
        row["base_state"],
        row["events"]
    )[1],
    axis=1
)

#print(pa["run_scored_per_pa"].value_counts())

#sum up runs per inning per pitcher
pa["run_scored"] = (
    pa.groupby(["game_pk", "inning", "pitcher"])["run_scored_per_pa"]
      .transform("sum")
)
# Aggregate per pitcher per game
pa["earned_runs"]=pa.groupby(["pitcher", "game_pk"], as_index=False)["run_scored_per_pa"].transform("sum")

#print(pa["earned_runs"].value_counts())

# Compute innings pitched per pitcher per game
# Outs recorded per PA from Statcast
pa["outs"] = np.where(pa["events"].isin(["OUT", "2OUT", "SAC"]), 1, 0)

innings_per_game = (
    pa.groupby(["pitcher", "game_pk"], as_index=False)["outs"]
      .sum()
)
innings_per_game["ip"] = innings_per_game["outs"] / 3

# Combine innings and earned runs
pitcher_game_stats = pa.merge(innings_per_game, on=["pitcher", "game_pk"], how="left")

# Compute ERA from Statcast directly
def compute_era(row):
    if row["ip"] == 0:
        return np.nan
    return (9 * row["earned_runs"]) / row["ip"]

pitcher_game_stats["game_era"] = pitcher_game_stats.apply(compute_era, axis=1)
pitcher_game_stats.head()




earned_runs
0     81475
1     40438
2     27366
3     16114
4      9384
5      4484
6      2359
7       852
8       314
10       53
9        26
Name: count, dtype: int64


Unnamed: 0,pitch_type,game_date,release_speed,release_pos_x,release_pos_z,player_name,batter,pitcher,events,description,...,miss,runs_scored,outs_x,base_state,run_scored_per_pa,run_scored,earned_runs,outs_y,ip,game_era
0,CH,2024-10-01,88.1,-1.65,6.12,"Brieske, Beau",518792,689225,OUT,hit_into_play,...,0,0,1,7,0,0,0,2,0.666667,0.0
1,SI,2024-10-01,99.0,-1.31,6.23,"Brieske, Beau",676801,689225,BB,blocked_ball,...,0,0,0,6,0,0,0,2,0.666667,0.0
2,CH,2024-10-01,91.1,-1.91,5.97,"Brieske, Beau",605170,689225,OUT,hit_into_play,...,0,0,1,6,0,0,0,2,0.666667,0.0
3,SI,2024-10-01,97.9,-2.02,5.75,"Foley, Jason",665161,671345,SAC,hit_into_play,...,0,0,1,3,0,1,1,1,0.333333,27.0
4,SI,2024-10-01,98.3,-1.66,5.86,"Foley, Jason",673237,671345,SINGLE,hit_into_play,...,0,0,0,5,1,1,1,1,0.333333,27.0


In [None]:

# Compute Game‑Level Pitch Averages (Features)

feature_cols = [
    "release_speed", "release_pos_z", "pfx_z", "pfx_x", "release_spin_rate"
]

# Whiff rate: missed swings / swings
df_core["swing"] = df_core["description"].isin(["swinging_strike", "foul", "hit_into_play"]).astype(int)
df_core["miss"] = df_core["description"].isin(["swinging_strike"]).astype(int)

# Umpire zone: using `plate_x`, `plate_z` within rulebook bounds
df_core = df_core.dropna(subset=["plate_x", "plate_z"]).copy()

df_core["in_zone"] = (
    (df_core["plate_x"].between(-0.83, 0.83)) &
    (df_core["plate_z"].between(1.5, 3.5))
).astype(int)

agg_features = (
    df_core.groupby(["pitcher", "game_pk"], as_index=False)
       .agg({
            "release_speed": "mean",
            "release_pos_z": "mean",
            "pfx_z": "mean",
            "pfx_x": "mean",
            "release_spin_rate": "mean",
            "miss": "sum",
            "swing": "sum",
            "in_zone": "mean"
        })
)

# Compute whiff rate
agg_features["whiff_rate"] = agg_features.apply(
    lambda row: row["miss"] / row["swing"] if row["swing"] > 0 else 0, axis=1
)

agg_features.rename(columns={
    "release_speed": "avg_velocity",
    "release_spin_rate": "avg_spin_rate",
    "release_pos_z": "avg_release_height",
    "pfx_z": "avg_vertical_break",
    "pfx_x": "avg_horizontal_break",
    "in_zone": "avg_umpire_zone_rate"
}, inplace=True)

# Final feature list
agg_features = agg_features[[
    "pitcher", "game_pk",
    "avg_velocity", "avg_spin_rate", "avg_release_height",
    "avg_vertical_break", "avg_horizontal_break",
    "whiff_rate", "avg_umpire_zone_rate"
]]

agg_features.head()



Unnamed: 0,pitcher,game_pk,avg_velocity,avg_spin_rate,avg_release_height,avg_vertical_break,avg_horizontal_break,whiff_rate,avg_umpire_zone_rate
0,434378,744872,87.25641,2393.128205,7.054744,0.666795,-0.26141,0.1,0.448718
1,434378,745250,87.233673,2374.346939,7.147143,0.802857,-0.349796,0.313725,0.5
2,434378,745540,87.096667,2349.511111,7.184333,0.637333,-0.233222,0.2,0.411111
3,434378,745663,86.853535,2380.959596,7.122525,0.714242,-0.334646,0.240741,0.585859
4,434378,745752,88.739175,2446.484536,6.971959,0.621856,0.012784,0.078431,0.525773


In [None]:


# Merge ERA with Gameplay Features

model_data = agg_features.merge(
    pitcher_game_stats[["pitcher", "game_pk", "game_era"]],
    on=["pitcher", "game_pk"],
    how="inner"
)

model_data.dropna(subset=["game_era"], inplace=True)
model_data.head()



Unnamed: 0,pitcher,game_pk,avg_velocity,avg_spin_rate,avg_release_height,avg_vertical_break,avg_horizontal_break,whiff_rate,avg_umpire_zone_rate,game_era
0,434378,744872,87.25641,2393.128205,7.054744,0.666795,-0.26141,0.1,0.448718,1.588235
1,434378,744872,87.25641,2393.128205,7.054744,0.666795,-0.26141,0.1,0.448718,1.588235
2,434378,744872,87.25641,2393.128205,7.054744,0.666795,-0.26141,0.1,0.448718,1.588235
3,434378,744872,87.25641,2393.128205,7.054744,0.666795,-0.26141,0.1,0.448718,1.588235
4,434378,744872,87.25641,2393.128205,7.054744,0.666795,-0.26141,0.1,0.448718,1.588235


In [None]:


# Train/Test Split
model_data = model_data.dropna(subset=[
"avg_velocity", "avg_spin_rate", "avg_release_height",
"avg_vertical_break", "avg_horizontal_break",
"whiff_rate", "avg_umpire_zone_rate", "game_era"
])


X = model_data.drop(columns=["game_era", "pitcher", "game_pk"])
y = model_data["game_era"]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.3, random_state=42
)



In [None]:

#  Feature Selection
##  Random Forest Importance

rf = RandomForestRegressor(n_estimators=200, random_state=42)
rf.fit(X_train, y_train)

rf_importances = pd.Series(rf.feature_importances_, index=X.columns).sort_values(ascending=False)
rf_importances


## Permutation Importance

perm = permutation_importance(rf, X_test, y_test, random_state=42)
perm_importances = pd.Series(perm.importances_mean, index=X.columns).sort_values(ascending=False)
perm_importances


##  LASSO

lasso = Lasso(alpha=0.01).fit(X_train, y_train)
pd.Series(lasso.coef_, index=X.columns).sort_values(ascending=False)


##  L0 Approximation (OMP)

omp = OrthogonalMatchingPursuit().fit(X_train, y_train)
pd.Series(omp.coef_, index=X.columns).sort_values(ascending=False)


## RFE

rfe = RFE(estimator=LinearRegression(), n_features_to_select=5)
rfe.fit(X_train, y_train)
pd.Series(rfe.support_, index=X.columns)


## SHAP

explainer = shap.Explainer(rf, X_train)
shap_values = explainer(X_train)

shap.summary_plot(shap_values, X_train, feature_names=X.columns)


## Mutual Information

mi = mutual_info_regression(X_train, y_train)
mi_scores = pd.Series(mi, index=X.columns).sort_values(ascending=False)
mi_scores


## Forward Sequential Selection

sfs_forward = SequentialFeatureSelector(
    LinearRegression(), direction='forward', n_features_to_select=5
)
sfs_forward.fit(X_train, y_train)
pd.Series(sfs_forward.get_support(), index=X.columns)



  0%|                   | 376/127162 [01:02<348:26]       

In [None]:


# Model Comparisons
## Linear Regression

lr = LinearRegression().fit(X_train, y_train)

preds_lr = lr.predict(X_test)
print("LR RMSE:", mean_squared_error(y_test, preds_lr, squared=False))
print("LR R2:", r2_score(y_test, preds_lr))


## Random Forest

preds_rf = rf.predict(X_test)
print("RF RMSE:", mean_squared_error(y_test, preds_rf, squared=False))
print("RF R2:", r2_score(y_test, preds_rf))


## XGBoost

xgb_model = xgb.XGBRegressor(
    n_estimators=400, learning_rate=0.05, max_depth=6, subsample=0.8, colsample_bytree=0.8
)
xgb_model.fit(X_train, y_train)
preds_xgb = xgb_model.predict(X_test)
print("XGB RMSE:", mean_squared_error(y_test, preds_xgb, squared=False))
print("XGB R2:", r2_score(y_test, preds_xgb))


## CatBoost

cat = CatBoostRegressor(verbose=False)
cat.fit(X_train, y_train)

preds_cat = cat.predict(X_test)
print("Cat RMSE:", mean_squared_error(y_test, preds_cat, squared=False))
print("Cat R2:", r2_score(y_test, preds_cat))


