# 01｜資料整理：Shift-Eligible 打者篩選 + 情境化 Fear Pitch & GB 落點摘要

本 Notebook 會從 `PA.csv` 與 `event.csv` 建立打者層級特徵，並輸出：
- `batter_ctx_fear_pitch_gb_topzones_full.csv`（情境×球數群×球種的揮空風險 + GB 落點區）
- `shift_eligible_batters.csv`（值得佈陣候選打者清單）

⚠️ 先把 `DATA_DIR` 改成你 Google Drive 內存放 CSV 的資料夾。

In [None]:
#（Colab 可選）掛載 Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# === 完整修正版（自動偵測欄位名）：PA.csv + event.csv → fear pitchType + GB top zones ===

import os
import pandas as pd
import numpy as np

DATA_DIR = "/content/drive/MyDrive/CPBL_csv_tables_UTF8_BOM"   # <- 改這裡
PA_PATH = os.path.join(DATA_DIR, "PA.csv")
EVENT_PATH = os.path.join(DATA_DIR, "event.csv")

OUT1 = os.path.join(DATA_DIR, "batter_ctx_fear_pitch_full.csv")
OUT2 = os.path.join(DATA_DIR, "batter_ctx_fear_pitch_gb_topzones_full.csv")

pa = pd.read_csv(PA_PATH, encoding="utf-8-sig", low_memory=False)
evt = pd.read_csv(EVENT_PATH, encoding="utf-8-sig", low_memory=False)

print("PA:", pa.shape, "Event:", evt.shape)

# -------------------------
# 工具：欄位偵測與標準化
# -------------------------
def pick_col(df, candidates, required=True):
    for c in candidates:
        if c in df.columns:
            return c
    if required:
        raise KeyError(f"Cannot find any of columns: {candidates}")
    return None

def to_int_safe(x):
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return np.nan
    s = str(x).strip()
    if s == "" or s.lower() == "nan":
        return np.nan
    try:
        return int(float(s))
    except:
        return np.nan

def to_bool01(x):
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return 0
    s = str(x).strip().lower()
    if s in {"1","true","t","y","yes"}: return 1
    if s in {"0","false","f","n","no",""}: return 0
    try:
        return int(float(s))
    except:
        return 0

# -------------------------
# 1) 找出關鍵欄位（你的版本可能命名不同）
# -------------------------
# keys
PA_GAME = pick_col(pa,  ["game_id","gameId","gameID"])
PA_SIDE = pick_col(pa,  ["side","topBottom","half"])
PA_PAI  = pick_col(pa,  ["pa_index","paIndex","paId","pa_id"])
EV_GAME = pick_col(evt, ["game_id","gameId","gameID"])
EV_SIDE = pick_col(evt, ["side","topBottom","half"])
EV_PAI  = pick_col(evt, ["pa_index","paIndex","paId","pa_id"])
EV_EVI  = pick_col(evt, ["event_index","eventIndex","pitch_index","pitchIndex","idx"])

# batter
PA_BAT = pick_col(pa,  ["batterName","batter_name","batter","batterFullName","batter_name_zh"])
EV_BAT = pick_col(evt, ["batterName","batter_name","batter","batterFullName","batter_name_zh"])

# bases / loc / traj
PA_BASES = pick_col(pa, ["bases","baseState","base_state"])
PA_LOC   = pick_col(pa, ["locationCode","location_code","location"])
PA_TRAJ  = pick_col(pa, ["trajectory","traj","battedBallType","bip_type"])

# pitch fields
EV_PTYPE = pick_col(evt, ["pitchType","pitch_type","ptype"])
EV_PCODE = pick_col(evt, ["pitchCode","pitch_code","pcode"])
EV_INPLAY= pick_col(evt, ["inPlay","in_play"])
EV_STR   = pick_col(evt, ["isStrike","is_strike"])
EV_BALL  = pick_col(evt, ["isBall","is_ball"])

print("\n[Detected columns]")
print("PA keys:", PA_GAME, PA_SIDE, PA_PAI, "| batter:", PA_BAT, "| bases:", PA_BASES, "| loc:", PA_LOC, "| traj:", PA_TRAJ)
print("EV keys:", EV_GAME, EV_SIDE, EV_PAI, EV_EVI, "| batter:", EV_BAT, "| ptype:", EV_PTYPE, "| pcode:", EV_PCODE,
      "| inPlay:", EV_INPLAY, "| isStrike:", EV_STR, "| isBall:", EV_BALL)

# -------------------------
# 2) 統一成標準欄位名
# -------------------------
pa2 = pa.rename(columns={
    PA_GAME:"game_id", PA_SIDE:"side", PA_PAI:"pa_index",
    PA_BAT:"batterName", PA_BASES:"bases",
    PA_LOC:"locationCode", PA_TRAJ:"trajectory"
}).copy()

evt2 = evt.rename(columns={
    EV_GAME:"game_id", EV_SIDE:"side", EV_PAI:"pa_index", EV_EVI:"event_index",
    EV_BAT:"batterName",
    EV_PTYPE:"pitchType", EV_PCODE:"pitchCode",
    EV_INPLAY:"inPlay", EV_STR:"isStrike", EV_BALL:"isBall"
}).copy()

# 清理 key + 型態
for df in [pa2, evt2]:
    df["game_id"] = df["game_id"].astype(str).str.strip()
    df["side"] = df["side"].astype(str).str.strip()
    df["pa_index"] = df["pa_index"].apply(to_int_safe)

evt2["event_index"] = evt2["event_index"].apply(to_int_safe)

pa2["batterName"] = pa2["batterName"].astype(str).str.strip()
evt2["batterName"] = evt2["batterName"].astype(str).str.strip()

pa2["locationCode_int"] = pa2["locationCode"].apply(to_int_safe)
pa2["trajectory"] = pa2["trajectory"].astype(str).str.strip()

evt2["pitchType"] = evt2["pitchType"].astype(str).str.strip().str.upper()
evt2["pitchCode"] = evt2["pitchCode"].astype(str).str.strip().str.upper()

# ★關鍵：布林解析（修正 inPlay 全 0 問題）
print("\nRAW inPlay value_counts (top 10):")
print(evt2["inPlay"].astype(str).str.strip().value_counts().head(10))

evt2["inPlay"]   = evt2["inPlay"].apply(to_bool01).astype(int)
evt2["isStrike"] = evt2["isStrike"].apply(to_bool01).astype(int)
evt2["isBall"]   = evt2["isBall"].apply(to_bool01).astype(int)

print("\nParsed inPlay value_counts:")
print(evt2["inPlay"].value_counts())

# -------------------------
# 3) bases_group + count_group（逐球還原投出前球數）
# -------------------------
def bases_group(b):
    if b is None or (isinstance(b, float) and np.isnan(b)):
        return "UNK"
    s = str(b).strip()
    if s == "" or s.lower() == "nan":
        return "UNK"
    if s in {"0","---","EMPTY","NONE","none","None"}:
        return "Empty"
    u = s.upper()
    if ("2" in s) or ("3" in s) or ("2B" in u) or ("3B" in u):
        return "RISP"
    return "OnBase"

def count_group(balls, strikes):
    b, s = int(balls), int(strikes)
    if (b, s) in {(0,2), (1,2), (0,1)}: return "PitcherAdv"
    if (b, s) in {(2,0), (3,0), (3,1), (2,1)}: return "BatterAdv"
    if (b, s) in {(0,0), (1,1), (2,2), (3,2)}: return "Even"
    return "Other"

pa2["bases_group"] = pa2["bases"].apply(bases_group)

FOUL_CODES = {"F","FOUL","FOULTIP","FT"}

evt2 = evt2.sort_values(["game_id","side","pa_index","event_index"]).copy()

balls_before, strikes_before = [], []
cur_key, b, s = None, 0, 0

for r in evt2.itertuples(index=False):
    key = (r.game_id, r.side, r.pa_index)
    if key != cur_key:
        cur_key, b, s = key, 0, 0

    balls_before.append(b)
    strikes_before.append(s)

    code = str(r.pitchCode).strip().upper()
    if int(r.isBall) == 1 and b < 3:
        b += 1
    if int(r.isStrike) == 1:
        if s < 2:
            s += 1
        else:
            if code not in FOUL_CODES:
                pass

evt2["balls_before"] = balls_before
evt2["strikes_before"] = strikes_before
evt2["count_group"] = [count_group(bb, ss) for bb, ss in zip(evt2["balls_before"], evt2["strikes_before"])]

# whiff
def is_whiff(isStrike, inPlay, pitchCode):
    if (isStrike == 1) and (inPlay == 0):
        return int(str(pitchCode).strip().upper() not in FOUL_CODES)
    return 0

evt2["isWhiff"] = [is_whiff(a,b,c) for a,b,c in zip(evt2["isStrike"], evt2["inPlay"], evt2["pitchCode"])]

# -------------------------
# 4) join PA資訊到 event（bases_group + loc + traj）
# -------------------------
pa_key = pa2[["game_id","side","pa_index","bases_group","locationCode_int","trajectory","batterName"]].drop_duplicates(
    subset=["game_id","side","pa_index"]
)

evtJ = evt2.merge(pa_key, on=["game_id","side","pa_index"], how="left", suffixes=("","_pa"))

# last pitch
last_idx = evtJ.groupby(["game_id","side","pa_index"])["event_index"].transform("max")
evtJ["isLastPitch"] = (evtJ["event_index"] == last_idx).astype(int)

print("\nLastPitch count:", evtJ["isLastPitch"].sum())
print("LastPitch & inPlay:", ((evtJ["isLastPitch"]==1) & (evtJ["inPlay"]==1)).sum())

# GB 判定（trajectory: 你診斷有 G/F/L/P/nan）
GB_KEYS = {"G","GB","GROUND","GROUNDBALL"}
def is_gb_traj(traj):
    t = str(traj).strip().upper()
    return (t in GB_KEYS) or t.startswith("G")

evtJ["isGB_last_inplay"] = (
    (evtJ["isLastPitch"]==1) &
    (evtJ["inPlay"]==1) &
    (evtJ["trajectory"].apply(is_gb_traj)) &
    (evtJ["locationCode_int"].notna())
).astype(int)

print("GB_last_inplay count:", evtJ["isGB_last_inplay"].sum())

# -------------------------
# 5) fear pitchType（batter×bases×count 下 whiff_rate最高）
# -------------------------
MIN_PITCH_N = 25  # 不夠就降到 10~15

agg = (evtJ.groupby(["batterName","bases_group","count_group","pitchType"])
          .agg(pitch_n=("isWhiff","size"), whiff_n=("isWhiff","sum"))
          .reset_index())
agg["whiff_rate"] = agg["whiff_n"] / agg["pitch_n"]

agg_f = agg[agg["pitch_n"] >= MIN_PITCH_N].copy()
agg_f = agg_f.sort_values(["batterName","bases_group","count_group","whiff_rate","pitch_n"],
                          ascending=[True,True,True,False,False])

fear = agg_f.groupby(["batterName","bases_group","count_group"], as_index=False).head(1).copy()
fear = fear.rename(columns={"pitchType":"fear_pitchType","pitch_n":"fear_pitch_n","whiff_n":"fear_whiff_n","whiff_rate":"fear_whiff_rate"})
fear.to_csv(OUT1, index=False, encoding="utf-8-sig")
print("\nSaved:", OUT1, "| rows:", len(fear))
print(fear.head(10))

# -------------------------
# 6) 在 fear pitchType 下，統計「最後一球inPlay且GB」的落點 top zones
# -------------------------
def top_zones(series, k=3):
    s = series.dropna().astype(int)
    if len(s) == 0:
        return ""
    vc = s.value_counts().head(k)
    return ",".join(map(str, vc.index.tolist()))

evt_fear = evtJ.merge(
    fear[["batterName","bases_group","count_group","fear_pitchType"]],
    on=["batterName","bases_group","count_group"],
    how="inner"
)

evt_gb = evt_fear[
    (evt_fear["pitchType"] == evt_fear["fear_pitchType"]) &
    (evt_fear["isGB_last_inplay"] == 1)
].copy()

print("\nRows after fear_pitchType + GB filter:", len(evt_gb))

gb_zone = (evt_gb.groupby(["batterName","bases_group","count_group","fear_pitchType"])
              .agg(gb_inplay_n=("locationCode_int","count"),
                   gb_topzones=("locationCode_int", lambda x: top_zones(x,3)))
              .reset_index())

out = fear.merge(gb_zone, on=["batterName","bases_group","count_group","fear_pitchType"], how="left")
out["gb_inplay_n"] = out["gb_inplay_n"].fillna(0).astype(int)
out["gb_topzones"] = out["gb_topzones"].fillna("")

out.to_csv(OUT2, index=False, encoding="utf-8-sig")
print("Saved:", OUT2, "| rows:", len(out))
print(out.head(15))

print("\n完成。若 gb_inplay_n 還偏少：把 MIN_PITCH_N 降到 10~15 再跑。")

## 1.5 依 GB% 與方向集中度篩選 Shift-Eligible 打者
以下程式會：
1) 以 `PA >= min_pa` 過濾打者
2) 以 BIP（有落點/有 trajectory/hardness 等）近似篩出可用擊球
3) 計算 GB% 與方向集中度（Pull/Middle/Oppo），挑出值得佈陣者


In [None]:
# === Shift-Eligible Batters 篩選（可直接丟 Colab 跑）===
# 依你的想法：
# 1) 先用 PA >= min_pa 過濾打者
# 2) 只看「有擊球落點」的球（BIP）
# 3) 算每位打者 GB%（groundball rate），挑 > 聯盟平均者
# 4) 再算擊球方向（Pull/Middle/Oppo）集中度，挑集中者（max >= threshold）
#
# 輸出：shift_eligible_batters.csv

import os
import pandas as pd
import numpy as np


# ---------------------------
# 0) 路徑：改成你 CSV 放的資料夾
# ---------------------------
DATA_DIR = "/content/drive/MyDrive/CPBL_csv_tables_UTF8_BOM"  # <- 改這裡
PA_PATH = os.path.join(DATA_DIR, "PA.csv")

pa = pd.read_csv(PA_PATH, encoding="utf-8-sig")
print("PA rows:", len(pa))
print("PA cols:", list(pa.columns))


# ---------------------------
# 1) 一些工具：把欄位整理乾淨
# ---------------------------
def _to_str(x):
    if pd.isna(x):
        return ""
    return str(x).strip()

def _to_int_safe(x):
    if pd.isna(x):
        return np.nan
    s = str(x).strip()
    if s == "":
        return np.nan
    # 有些可能是 "HARDNESS" 這種怪東西混進去，直接轉失敗就 NaN
    try:
        return int(float(s))
    except:
        return np.nan

# 你的 PA 欄位有：locationCode、trajectory、hardness、batterHand...
pa["batterName"] = pa["batterName"].map(_to_str)
pa["batterHand"] = pa["batterHand"].map(_to_str).replace({"": np.nan})
pa["trajectory"]  = pa["trajectory"].map(_to_str)
pa["locationCode_num"] = pa["locationCode"].map(_to_int_safe)

# 把 balls/strikes/outs 可能用不到，但先確保不炸
for c in ["balls", "strikes", "outs", "RBI", "paRound", "paOrder"]:
    if c in pa.columns:
        pa[c] = pa[c].map(_to_int_safe)


# ---------------------------
# 2) 定義「哪些 trajectory 算 GB / FB / LD」
#    注意：你資料裡 trajectory 的值可能是縮寫或英文，先用「可調參」寫法
# ---------------------------
# 你之後只要看 pa["trajectory"].value_counts()，把集合改成符合你的資料即可
GB_KEYS = {"G", "GB", "GROUND", "GROUNDBALL"}   # 滾地球
FB_KEYS = {"F", "FB", "FLY", "FLYBALL"}        # 飛球
LD_KEYS = {"L", "LD", "LINE", "LINEDRIVE"}     # 平飛

def classify_bip_type(traj: str) -> str:
    t = traj.upper().strip()
    if t in GB_KEYS or t.startswith("G"):
        return "GB"
    if t in FB_KEYS or t.startswith("F"):
        return "FB"
    if t in LD_KEYS or t.startswith("L"):
        return "LD"
    return "OTHER"


# ---------------------------
# 3) 定義「locationCode -> Pull/Middle/Oppo」(可調)
#    你之前提到：拉打傾向（6,56,7）
#    我先把這組當作「右打 Pull」的預設；左打會鏡像。
#    如果你們之後確認 RetroSheet/野球革命的 mapping 不同，改這裡就好。
# ---------------------------
PULL_RH   = {6, 56, 7}     # 右打拉打（你提供的那組）
MIDDLE    = {5, 55}        # 中間（常見做法：5/55）
OPPO_RH   = {3, 45, 4}     # 右打反方向（先給一個常見鏡像組；可自行改）

# 左打鏡像：Pull 跟 Oppo 對調
PULL_LH = OPPO_RH
OPPO_LH = PULL_RH

def classify_direction(loc_code: float, hand: str) -> str:
    if pd.isna(loc_code):
        return "UNK"
    c = int(loc_code)
    h = (hand or "").upper().strip()
    if h == "L":
        if c in PULL_LH:   return "PULL"
        if c in MIDDLE:    return "MIDDLE"
        if c in OPPO_LH:   return "OPPO"
    else:  # 預設當右打（R / switch 未處理先當 R）
        if c in PULL_RH:   return "PULL"
        if c in MIDDLE:    return "MIDDLE"
        if c in OPPO_RH:   return "OPPO"
    return "OTHER"


# ---------------------------
# 4) 主程式：算每位打者的 GB%、方向分布、集中度，篩 Shift-Eligible
# ---------------------------
def build_shift_eligible_batters(
    pa_df: pd.DataFrame,
    min_pa: int = 100,
    min_bip: int = 30,              # 至少要有一定 BIP 才穩
    dir_conc_threshold: float = 0.45, # max(PULL/MID/OPPO) >= 這個才算方向集中
    gb_margin: float = 0.0,         # > league_avg + margin（你要嚴格可設 0.05）
) -> pd.DataFrame:
    df = pa_df.copy()

    # 只看「有擊球落點」的打席，當作 BIP（你資料層級在 PA）
    bip = df[df["locationCode_num"].notna()].copy()
    if len(bip) == 0:
        raise ValueError("沒有任何 locationCode 可用，請確認 PA.csv 是否有 locationCode。")

    # BIP 類型（GB/FB/LD）
    bip["bip_type"] = bip["trajectory"].map(classify_bip_type)
    # 方向（PULL/MIDDLE/OPPO/OTHER）
    bip["direction"] = [
        classify_direction(lc, h) for lc, h in zip(bip["locationCode_num"], bip["batterHand"])
    ]

    # 每位打者 PA 計算：用原始 PA 表計（不是只算 BIP）
    pa_cnt = df.groupby("batterName").size().rename("PA_total").reset_index()

    # 每位打者 BIP 計算：用 bip 表
    bip_cnt = bip.groupby("batterName").size().rename("BIP").reset_index()

    # GB/FB/LD 次數
    bip_type_cnt = (
        bip.pivot_table(index="batterName", columns="bip_type", values="inning", aggfunc="count", fill_value=0)
        .reset_index()
    )
    for col in ["GB", "FB", "LD"]:
        if col not in bip_type_cnt.columns:
            bip_type_cnt[col] = 0

    # 方向次數
    dir_cnt = (
        bip.pivot_table(index="batterName", columns="direction", values="inning", aggfunc="count", fill_value=0)
        .reset_index()
    )
    for col in ["PULL", "MIDDLE", "OPPO"]:
        if col not in dir_cnt.columns:
            dir_cnt[col] = 0

    # 合併
    out = pa_cnt.merge(bip_cnt, on="batterName", how="left").merge(bip_type_cnt, on="batterName", how="left").merge(dir_cnt, on="batterName", how="left")
    out = out.fillna(0)

    # GB%：只用 GB/FB/LD 分母（OTHER 不算）
    out["BIP_classified"] = out["GB"] + out["FB"] + out["LD"]
    out["GB_rate"] = np.where(out["BIP_classified"] > 0, out["GB"] / out["BIP_classified"], np.nan)

    # 方向比例（用 PULL/MIDDLE/OPPO 分母）
    out["DIR_total"] = out["PULL"] + out["MIDDLE"] + out["OPPO"]
    out["PullRate"] = np.where(out["DIR_total"] > 0, out["PULL"] / out["DIR_total"], np.nan)
    out["MiddleRate"] = np.where(out["DIR_total"] > 0, out["MIDDLE"] / out["DIR_total"], np.nan)
    out["OppoRate"] = np.where(out["DIR_total"] > 0, out["OPPO"] / out["DIR_total"], np.nan)

    out["DirConcentration"] = out[["PullRate", "MiddleRate", "OppoRate"]].max(axis=1)
    out["PrimaryDir"] = out[["PullRate", "MiddleRate", "OppoRate"]].idxmax(axis=1).str.replace("Rate", "", regex=False)

    # 聯盟平均 GB%（用合格樣本）
    league_pool = out[(out["PA_total"] >= min_pa) & (out["BIP"] >= min_bip) & (out["GB_rate"].notna())]
    league_gb_avg = league_pool["GB_rate"].mean()
    print(f"League GB_rate avg (PA>={min_pa}, BIP>={min_bip}): {league_gb_avg:.4f}  (margin={gb_margin})")

    # 篩選條件
    cond = (
        (out["PA_total"] >= min_pa) &
        (out["BIP"] >= min_bip) &
        (out["GB_rate"].notna()) &
        (out["GB_rate"] > (league_gb_avg + gb_margin)) &
        (out["DirConcentration"].notna()) &
        (out["DirConcentration"] >= dir_conc_threshold)
    )

    eligible = out[cond].copy()
    eligible["LeagueGBAvg"] = league_gb_avg
    eligible = eligible.sort_values(["GB_rate", "DirConcentration"], ascending=False)

    # 也把所有打者的 summary 一起回傳（方便你做附錄/表格）
    all_summary = out.sort_values("PA_total", ascending=False).copy()
    return eligible, all_summary


eligible, all_summary = build_shift_eligible_batters(
    pa,
    min_pa=100,
    min_bip=30,
    dir_conc_threshold=0.45,
    gb_margin=0.0
)

print("Eligible batters:", len(eligible))
display(eligible.head(20))

# 輸出 CSV
eligible_out = os.path.join(DATA_DIR, "shift_eligible_batters.csv")
all_out = os.path.join(DATA_DIR, "batters_bip_gb_direction_summary.csv")
eligible.to_csv(eligible_out, index=False, encoding="utf-8-sig")
all_summary.to_csv(all_out, index=False, encoding="utf-8-sig")

print("Saved:", eligible_out)
print("Saved:", all_out)


# ---------------------------
# 5)（建議你立刻做）看 trajectory 的實際值，調整 GB/FB/LD mapping
# ---------------------------
print("\nTop trajectory values:")
print(pa["trajectory"].astype(str).str.strip().value_counts().head(30))