# MLB Pitcher Arsenal Evolution (2020-2025)

投手の球種構成（Arsenal）の年次変化を追跡するデータセット

**データ形式**: Wide format（1行 = 投手×シーズン、球種を横展開）

**対象期間**: 2020-2025シーズン（6シーズン）

**対象投手**: 各シーズンで100球以上投球した投手（球種が記録されている投球のみをカウント）

**主要球種**: FF, SI, FC, SL, CU, CH, FS, KC, FO, EP, KN（これら以外の球種コードも、データに含まれていれば同じ形式の列として出力されます）

**メトリクス（各球種ごと）**:
- usage_pct: 使用率 (%)
- avg_speed: 平均球速 (mph)
- avg_spin: 平均回転数 (rpm)
- whiff_rate: 空振り率（0〜1の比率。usage_pctと異なり%表記ではありません）
- avg_pfx_x: 平均横変化量 (feet。Statcastのpfx_xを単位変換せずに平均した値)
- avg_pfx_z: 平均縦変化量 (feet。Statcastのpfx_zを単位変換せずに平均した値)

In [None]:
# Install the required packages (pybaseball and duckdb are imported in the next cell)
!pip install -q pybaseball duckdb

In [None]:
import pandas as pd
import numpy as np
from datetime import date
from pybaseball import statcast, playerid_reverse_lookup
import duckdb
import warnings
# Suppress every Python warning from here on so the cell output stays readable.
warnings.filterwarnings('ignore')

# Print the date on which this notebook run collected its data.
print(f"Data collection date: {date.today()}")

## Step 1: データ取得（2020-2025）

**注意**: この処理は時間がかかります（30分〜1時間）

In [None]:
# Download pitch-level Statcast data one season at a time and tag each row
# with the season it belongs to.
# NOTE(review): the window opens on March 1, so it presumably also covers
# spring-training games -- confirm whether those pitches should be counted.
seasons = [2020, 2021, 2022, 2023, 2024, 2025]
all_data = []
today = date.today()

for season in seasons:
    season_start = date(season, 3, 1)
    if today < season_start:
        # Nothing to download yet; an end date before the start date would
        # produce an invalid query window.
        print(f"\nSkipping {season} season (has not started yet)")
        continue

    print(f"\nFetching {season} season data...")
    start_date = season_start.strftime("%Y-%m-%d")
    # Cap every season at its own November 30. Using an uncapped "today" for
    # the latest season would, when the notebook is re-run in a later year,
    # pull the following season's pitches and label them with this season.
    end_date = min(date(season, 11, 30), today).strftime("%Y-%m-%d")

    df = statcast(start_dt=start_date, end_dt=end_date)
    df['season'] = season
    all_data.append(df)
    print(f"  {season}: {len(df):,} pitches")

# Stack all seasons into one frame
df_all = pd.concat(all_data, ignore_index=True)
print(f"\nTotal pitches: {len(df_all):,}")
print(f"Columns: {len(df_all.columns)}")

## Step 2: DuckDBで集計

投手×シーズン×球種でグループ化して統計量を計算

In [None]:
con = duckdb.connect()

# Aggregate per pitcher x season x pitch type (long format).
#
# pitcher_stats : pitch counts and per-pitch-type averages.
# pitcher_totals: total classified pitches per pitcher-season; only
#                 pitcher-seasons with at least 100 of them are kept.
#
# whiff_rate is swinging strikes divided by swings. The swing denominator
# matches 'swinging_strike%' rather than '%strike%': the broader pattern also
# counted 'called_strike' (a take, not a swing) and deflated the rate.
# The pfx_x / pfx_z averages are passed through without unit conversion.
query = """
WITH pitcher_stats AS (
    SELECT
        pitcher,
        season,
        pitch_type,
        COUNT(*) as n_pitches,
        AVG(release_speed) as avg_speed,
        AVG(release_spin_rate) as avg_spin,
        AVG(pfx_x) as avg_pfx_x,
        AVG(pfx_z) as avg_pfx_z,
        SUM(CASE WHEN description IN ('swinging_strike', 'swinging_strike_blocked') THEN 1 ELSE 0 END)::FLOAT /
            NULLIF(SUM(CASE WHEN description LIKE 'swinging_strike%' OR description LIKE '%foul%' OR description IN ('hit_into_play') THEN 1 ELSE 0 END), 0) as whiff_rate
    FROM df_all
    WHERE pitch_type IS NOT NULL
        AND pitcher IS NOT NULL
    GROUP BY pitcher, season, pitch_type
),
pitcher_totals AS (
    SELECT
        pitcher,
        season,
        SUM(n_pitches) as total_pitches
    FROM pitcher_stats
    GROUP BY pitcher, season
    HAVING total_pitches >= 100  -- 最低100球以上
)
SELECT
    ps.pitcher,
    ps.season,
    ps.pitch_type,
    ps.n_pitches,
    ROUND(100.0 * ps.n_pitches / pt.total_pitches, 2) as usage_pct,
    ROUND(ps.avg_speed, 2) as avg_speed,
    ROUND(ps.avg_spin, 0) as avg_spin,
    ROUND(ps.whiff_rate, 4) as whiff_rate,
    ROUND(ps.avg_pfx_x, 2) as avg_pfx_x,
    ROUND(ps.avg_pfx_z, 2) as avg_pfx_z
FROM pitcher_stats ps
INNER JOIN pitcher_totals pt
    ON ps.pitcher = pt.pitcher AND ps.season = pt.season
ORDER BY ps.pitcher, ps.season, ps.pitch_type
"""

# DuckDB resolves `df_all` in the query to the pandas DataFrame of that name.
df_long = con.execute(query).df()
print(f"Long format: {len(df_long):,} rows (投手×シーズン×球種)")
print(f"Unique pitchers: {df_long['pitcher'].nunique():,}")
print(f"\nPitch types: {sorted(df_long['pitch_type'].unique())}")

In [None]:
# Preview the first 20 rows of the long-format result
df_long.head(20)

## Step 3: Wide formatに変換

各球種を横展開（FF_usage_pct, FF_avg_speed, ...）

In [None]:
# Metrics that each get one column per pitch type in the wide table
metrics = ['usage_pct', 'avg_speed', 'avg_spin', 'whiff_rate', 'avg_pfx_x', 'avg_pfx_z']


def _spread_metric(long_df, metric_name):
    """Return one metric pivoted to pitcher-season rows with "PITCH_metric" columns."""
    table = long_df.pivot_table(
        index=['pitcher', 'season'],
        columns='pitch_type',
        values=metric_name,
        aggfunc='first',
    )
    table.columns = [f"{pitch}_{metric_name}" for pitch in table.columns]
    return table


# One pivoted frame per metric, in the order listed above
pivoted_dfs = [_spread_metric(df_long, metric_name) for metric_name in metrics]

# Join the per-metric frames side by side on the shared (pitcher, season) index
df_wide = pd.concat(pivoted_dfs, axis=1).reset_index()

print(f"Wide format: {len(df_wide):,} rows (投手×シーズン)")
print(f"Columns: {len(df_wide.columns)}")

In [None]:
# Print a numbered list of every column in the wide table
print("\nColumn names:")
for position, column_name in enumerate(df_wide.columns, start=1):
    print(f"{position:3d}. {column_name}")

## Step 4: 投手名を追加

In [None]:
# Unique pitcher IDs present in the wide table
unique_pitchers = df_wide['pitcher'].unique()
print(f"Looking up names for {len(unique_pitchers):,} pitchers...")

# Resolve names with one lookup call per batch of IDs instead of one call per
# player. IDs the lookup does not return stay unmapped and end up as NaN in
# player_name.
name_dict = {}
batch_size = 100

for i in range(0, len(unique_pitchers), batch_size):
    # .tolist() converts the IDs to plain Python values
    batch = unique_pitchers[i:i + batch_size].tolist()
    try:
        result = playerid_reverse_lookup(batch, key_type='mlbam')
    except Exception as exc:
        # Best effort: keep the run alive with placeholder names for this
        # batch. Catching Exception (not a bare except) lets Ctrl-C through.
        print(f"  Lookup failed for IDs {i:,}-{i + len(batch) - 1:,}: {exc}")
        for player_id in batch:
            name_dict[player_id] = f"Player_{player_id}"
    else:
        for row in result.itertuples(index=False):
            name_dict[int(row.key_mlbam)] = f"{row.name_first} {row.name_last}"

    if (i + batch_size) % 500 == 0:
        print(f"  {i + batch_size:,} / {len(unique_pitchers):,}")

# Add the name column right after the pitcher ID
df_wide.insert(1, 'player_name', df_wide['pitcher'].map(name_dict))

print(f"\nCompleted. {df_wide['player_name'].notna().sum()} names found.")

In [None]:
# Preview the first 10 rows of the wide-format table
df_wide.head(10)

## Step 5: CSV出力

In [None]:
# Rename the `pitcher` column to `player_id` for the published file
df_wide = df_wide.rename(columns={'pitcher': 'player_id'})

# Write the dataset to CSV without the DataFrame index
output_file = "pitcher_arsenal_evolution_2020_2025.csv"
df_wide.to_csv(output_file, index=False)

# The size reported below is the DataFrame's in-memory footprint, not the CSV size on disk
in_memory_mb = df_wide.memory_usage(deep=True).sum() / 1024 / 1024
summary_lines = [
    "\n=== Dataset Summary ===",
    f"File: {output_file}",
    f"Rows: {len(df_wide):,}",
    f"Columns: {len(df_wide.columns)}",
    f"Pitchers: {df_wide['player_id'].nunique():,}",
    f"Seasons: {sorted(df_wide['season'].unique())}",
    f"\nFile size: {in_memory_mb:.2f} MB (in memory)",
]
print(*summary_lines, sep="\n")

In [None]:
# Basic descriptive statistics for the exported table
print("\n=== Basic Statistics ===")
print("\nPitchers per season:")
pitchers_per_season = df_wide.groupby('season')['player_id'].nunique()
print(pitchers_per_season)

print("\nMost common pitch types (by usage):")
usage_suffix = '_usage_pct'
usage_cols = [name for name in df_wide.columns if name.endswith(usage_suffix)]
for usage_col in usage_cols:
    mean_usage = df_wide[usage_col].mean()
    # Report only pitch types whose mean usage exceeds 1%
    if pd.isna(mean_usage) or mean_usage <= 1.0:
        continue
    pitch_type = usage_col.replace(usage_suffix, '')
    print(f"  {pitch_type}: {mean_usage:.2f}%")