<a href="https://colab.research.google.com/github/tong1123-bit/cpbl-pitcher-evaluation/blob/main/notebooks/01_baseball_pitcher_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pandas as pd
import numpy as np

# ===============================
# Step 1. Load data
# ===============================
event = pd.read_csv("/content/drive/MyDrive/CPBL_csv_tables_UTF8_BOM/event.csv")
pbox  = pd.read_csv("/content/drive/MyDrive/CPBL_csv_tables_UTF8_BOM/pitcherBox.csv")

# Convert numeric columns
for col in ["coordX", "coordY", "velocity"]:
    if col in event.columns:
        event[col] = pd.to_numeric(event[col], errors="coerce")

# Keep only pitch types you care about
valid_pitchtypes = ["FF","SI","FC","KN","SL","CU","CH","FO","FS","EP"]
event = event[event["pitchType"].isin(valid_pitchtypes)].copy()

# ===============================
# Step 2. Define starter / reliever
# ===============================
# order == 1 → Starter; >1 → Reliever (min order across appearances)
role_map = pbox.groupby("playerName")["order"].min().apply(
    lambda x: "Starter" if x == 1 else "Reliever"
)

# ===============================
# Step 3. Classify pitch results
# ===============================
def classify_pitch(code: str) -> str:
    """
    Map pitchCode into a compact result category used for metrics.
    """
    if code in ["SW", "TRY_BUNT"]:
        return "whiff"                # swinging strike
    elif code in ["S", "FT", "FOUL_BUNT"]:
        return "called_strike"        # called strike / foul tip caught / bunt foul
    elif code == "F":
        return "foul"                 # foul ball
    elif code == "H":
        return "in_play"              # ball put into play
    elif code == "B":
        return "ball"                 # ball
    elif code == "BUNT":
        return "bunt"                 # bunt in play (or special)
    else:
        return "other"

event["pitch_result"] = event["pitchCode"].apply(classify_pitch)

# ===============================
# Step 4. Aggregate per pitcher + pitchType
# ===============================
def agg_metrics(df: pd.DataFrame) -> pd.Series:
    """
    Compute pitch-type level metrics for a single pitcher and pitch type.
    NOTE: Strike% here means Any-Strike%:
      called_strike + whiff + foul + in_play, divided by total pitches.
    """
    total  = len(df)
    whiff  = (df.pitch_result == "whiff").sum()
    called = (df.pitch_result == "called_strike").sum()
    foul   = (df.pitch_result == "foul").sum()
    inplay = (df.pitch_result == "in_play").sum()
    ball   = (df.pitch_result == "ball").sum()

    swings = whiff + foul + inplay

    # Whiff%: whiff per swing
    whiff_pct = whiff / swings if swings > 0 else np.nan

    # Strike% (Any-Strike%): includes called strike + whiff + foul + in-play
    strike_pct = (called + whiff + foul + inplay) / total if total > 0 else np.nan

    # Ball%: balls per pitch
    ball_pct = ball / total if total > 0 else np.nan

    # InPlay%: in-play per pitch (used as contact risk proxy)
    inplay_pct = inplay / total if total > 0 else np.nan

    # -------------------------------------------------------
    # Command (CURRENT): negative spread of pitch locations
    # smaller dispersion => closer to 0 => "better"
    # -------------------------------------------------------
        # -------------------------------------------------------
    # Command (NEW): continuous "avoid-middle" command
    # Higher is better:
    # - reward being closer to strike-zone edge (within zone)
    # - penalize being far outside zone (to avoid "just miss" inflation)
    # Using elliptical normalized distance:
    #   r = sqrt((x/50)^2 + (y/60)^2)
    #   r=0 center, r=1 zone boundary, r>1 outside zone
    # Command = mean(r | r<=1) - lambda * mean(r-1 | r>1)
    # -------------------------------------------------------
    X_HALF = 50.0
    Y_HALF = 60.0
    LAMBDA_OUT = 0.6

    xy = df[["coordX", "coordY"]].dropna()
    if len(xy) >= 5:  # 太少點會不穩，給個基本門檻
        r = np.sqrt((xy["coordX"] / X_HALF) ** 2 + (xy["coordY"] / Y_HALF) ** 2)

        in_zone = r[r <= 1.0]
        out_zone = r[r > 1.0]

        in_term = in_zone.mean() if len(in_zone) > 0 else 0.0
        out_term = (out_zone - 1.0).mean() if len(out_zone) > 0 else 0.0

        command = in_term - LAMBDA_OUT * out_term
    else:
        command = np.nan


    # TODO (later): Replace command with "avoid-middle" metric if desired

    velo = df["velocity"].mean()

    return pd.Series({
        "total_pitches": total,
        "Whiff%": whiff_pct,
        "Strike%": strike_pct,
        "Ball%": ball_pct,
        "InPlay%": inplay_pct,
        "Command": command,
        "Velo": velo
    })

by_pp = (
    event.groupby(["pitcherName", "pitchType"])
         .apply(agg_metrics)
         .reset_index()
)
by_pp["Role"] = by_pp["pitcherName"].map(role_map)

# ===============================
# Step 5. Compute usage per pitcher
# ===============================
pitch_counts = event.groupby("pitcherName").size().rename("p_all").reset_index()
by_pp = by_pp.merge(pitch_counts, on="pitcherName", how="left")
by_pp["Usage%"] = by_pp["total_pitches"] / by_pp["p_all"]

# ===============================
# Step 6. Scoring (per pitch type)
# ===============================
# Contact: lower InPlay% is better => use negative
by_pp["Contact"] = -by_pp["InPlay%"]

weights = {"Whiff%":0.30, "Command":0.25, "Strike%":0.20, "Contact":0.15, "Velo":0.10}
k = 200        # shrink strength
min_n = 100    # min pitches for ranking

def score_one_pitchtype(df_pt: pd.DataFrame) -> pd.DataFrame:
    tmp = df_pt.copy()

    comp_cols = ["Whiff%", "Command", "Strike%", "Contact", "Velo"]

    # z-score within this pitchType across pitchers
    for c in comp_cols:
        vec = tmp[c]
        if vec.notna().sum() >= 2 and vec.std(skipna=True) > 0:
            tmp[c+"_z_raw"] = (vec - vec.mean(skipna=True)) / vec.std(skipna=True)
        else:
            tmp[c+"_z_raw"] = 0.0

        # shrink by sample size (more pitches => less shrink)
        n = tmp["total_pitches"].clip(lower=1)
        tmp[c+"_z_shrunk"] = (n/(n+k)) * tmp[c+"_z_raw"]
        tmp[c+"_z"] = tmp[c+"_z_shrunk"].clip(-3, 3)

    # weighted pitch-type score (in z space)
    tmp["Pitch_z"] = (
        weights["Whiff%"]  * tmp["Whiff%_z"] +
        weights["Command"] * tmp["Command_z"] +
        weights["Strike%"] * tmp["Strike%_z"] +
        weights["Contact"] * tmp["Contact_z"] +
        weights["Velo"]    * tmp["Velo_z"]
    )

    # standardize Pitch_z within this pitchType to convert to 20-80
    if tmp["Pitch_z"].std(skipna=True) and tmp["Pitch_z"].std(skipna=True) > 0:
        tmp["Pitch_z_std"] = (tmp["Pitch_z"] - tmp["Pitch_z"].mean(skipna=True)) / tmp["Pitch_z"].std(skipna=True)
    else:
        tmp["Pitch_z_std"] = 0.0

    tmp["PitchScore_20_80"] = (50 + 10*tmp["Pitch_z_std"]).clip(20, 80)
    tmp["RankEligible"] = tmp["total_pitches"] >= min_n

    return tmp

scored = by_pp.groupby("pitchType", group_keys=False).apply(score_one_pitchtype)

# ===============================
# Step 7. Export Top20 per pitchType & role
# ===============================
for pt in valid_pitchtypes:
    sub = scored[(scored["pitchType"]==pt) & (scored["RankEligible"])]
    if sub.empty:
        continue

    top_st = (sub[sub["Role"]=="Starter"]
              .sort_values("PitchScore_20_80", ascending=False)
              .head(20))
    top_rl = (sub[sub["Role"]=="Reliever"]
              .sort_values("PitchScore_20_80", ascending=False)
              .head(20))

    top_st.to_csv(f"top20_{pt}_starters.csv", index=False, encoding="utf-8-sig")
    top_rl.to_csv(f"top20_{pt}_relievers.csv", index=False, encoding="utf-8-sig")

scored.to_csv("pitchtype_scored_all.csv", index=False, encoding="utf-8-sig")

# ===============================
# Step 8. Weighted overall score per pitcher (usage-weighted)
# ===============================
tmp_overall = scored.copy()
tmp_overall["Weighted_z"] = tmp_overall["Pitch_z_std"] * tmp_overall["Usage%"]

overall = (
    tmp_overall.groupby("pitcherName")
               .agg(
                   WeightedScore_z=("Weighted_z","sum"),
                   TotalPitches=("p_all","first"),
                   Role=("Role", lambda x: x.mode().iat[0] if len(x.mode())>0 else "Unknown")
               )
               .reset_index()
)

# standardize overall to 20-80
if overall["WeightedScore_z"].std(skipna=True) and overall["WeightedScore_z"].std(skipna=True) > 0:
    overall["Overall_20_80"] = (50 + 10 * (
        (overall["WeightedScore_z"] - overall["WeightedScore_z"].mean()) /
        overall["WeightedScore_z"].std()
    )).clip(20, 80)
else:
    overall["Overall_20_80"] = 50.0

overall = overall.sort_values("Overall_20_80", ascending=False)
overall.to_csv("overall_pitcher_usage_weighted.csv", index=False, encoding="utf-8-sig")

print("✅ Finished! Generated files:")
print("- pitchtype_scored_all.csv")
for pt in valid_pitchtypes:
    print(f"- top20_{pt}_starters.csv / top20_{pt}_relievers.csv")
print("- overall_pitcher_usage_weighted.csv")


  event = pd.read_csv("/content/drive/MyDrive/CPBL_csv_tables_UTF8_BOM/event.csv")
  .apply(agg_metrics)
  scored = by_pp.groupby("pitchType", group_keys=False).apply(score_one_pitchtype)


✅ Finished! Generated files:
- pitchtype_scored_all.csv
- top20_FF_starters.csv / top20_FF_relievers.csv
- top20_SI_starters.csv / top20_SI_relievers.csv
- top20_FC_starters.csv / top20_FC_relievers.csv
- top20_KN_starters.csv / top20_KN_relievers.csv
- top20_SL_starters.csv / top20_SL_relievers.csv
- top20_CU_starters.csv / top20_CU_relievers.csv
- top20_CH_starters.csv / top20_CH_relievers.csv
- top20_FO_starters.csv / top20_FO_relievers.csv
- top20_FS_starters.csv / top20_FS_relievers.csv
- top20_EP_starters.csv / top20_EP_relievers.csv
- overall_pitcher_usage_weighted.csv
