In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from sklearn.preprocessing import LabelEncoder
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')


In [18]:
# データの読み込み
def load_data():
    print("データを読み込んでいます...")
    teams = pd.read_csv('csv/1-1:team.csv')
    grounds = pd.read_csv('csv/1-2:ground.csv')
    players = pd.read_csv('csv/1-3:players.csv')
    matches = pd.read_csv('csv/2-1:matches.csv')
    innings_score = pd.read_csv('csv/2-2:innings_score.csv')
    at_bats = pd.read_csv('csv/2-3:at_bats.csv')
    pitches = pd.read_csv('csv/2-4:pitches.csv')
    print("データの読み込みが完了しました。")
    
    return teams, grounds, players, matches, innings_score, at_bats, pitches

In [32]:
# データの前処理
def preprocess_data(teams, grounds, players, matches, innings_score, at_bats, pitches):
    print("データの前処理を行っています...")
    
    display(pitches.head())

    # pitch_typeがNaNの行を除外 <-多分いらない
    # pitches = pitches.dropna(subset=['pitch_type'])
    
    # resultカラムに基づいて目的変数を作成
    pitches['is_foul'] = pitches['result'].str.contains('ファウル').fillna(False).astype(int)
    
    return

    # 投球データと打席データを結合
    data = pitches.merge(
        at_bats,
        on=['match_id', 'inning', 'top_bottom'],
        how='left',
        suffixes=('', '_at_bat')
    )
    
    # 試合情報を結合
    data = data.merge(
        matches,
        on='match_id',
        how='left'
    )
    
    # 打者情報を結合
    data = data.merge(
        players,
        left_on='batter_id',
        right_on='player_id',
        how='left',
        suffixes=('', '_batter')
    )
    
    # 投手情報を結合
    pitcher_data = players.copy()
    data = data.merge(
        pitcher_data,
        left_on='pitcher_id',
        right_on='player_id',
        how='left',
        suffixes=('', '_pitcher')
    )
    
    # チーム情報を結合
    data = data.merge(
        teams,
        left_on='team_id',
        right_on='team_id',
        how='left',
        suffixes=('', '_team')
    )
    
    # 球場情報を結合
    data = data.merge(
        grounds,
        on='ground_id',
        how='left'
    )
    
    # カテゴリ変数のエンコーディング
    categorical_columns = ['pitch_type', 'batting_side', 'pitching_side']
    label_encoders = {}
    
    for col in categorical_columns:
        if col in data.columns:
            le = LabelEncoder()
            data[col + '_encoded'] = le.fit_transform(data[col].fillna('unknown'))
            label_encoders[col] = le
    
    # ランナー有無をダミー変数化
    data['runner_1b_exists'] = data['runner_1b'].apply(lambda x: 0 if pd.isna(x) or x == 0 or x == '0' else 1)
    data['runner_2b_exists'] = data['runner_2b'].apply(lambda x: 0 if pd.isna(x) or x == 0 or x == '0' else 1)
    data['runner_3b_exists'] = data['runner_3b'].apply(lambda x: 0 if pd.isna(x) or x == 0 or x == '0' else 1)
    
    # 投手と打者の相性を表す特徴量
    data['pitcher_batter_combo'] = data['pitcher_id'].astype(str) + '_' + data['batter_id'].astype(str)
    
    # score差を計算
    data['score_diff'] = data['score1'] - data['score2']
    
    # 投球位置からの特徴量
    if 'coordinate_x' in data.columns and 'coordinate_y' in data.columns:
        # 座標がNaNの場合は平均値で補完
        data['coordinate_x'] = data['coordinate_x'].fillna(data['coordinate_x'].mean())
        data['coordinate_y'] = data['coordinate_y'].fillna(data['coordinate_y'].mean())
        
        # ストライクゾーンからの距離を計算（仮のストライクゾーン中心を(0,0)と仮定）
        data['distance_from_center'] = np.sqrt(data['coordinate_x']**2 + data['coordinate_y']**2)
    
    # 時間的特徴（試合の進行度）
    data['game_progress'] = data['inning'] / 9.0
    
    # 打者のカウント状況
    data['count_situation'] = data['ball_count'].astype(str) + '-' + data['strike_count'].astype(str)
    
    # カウント状況をエンコード
    le_count = LabelEncoder()
    data['count_encoded'] = le_count.fit_transform(data['count_situation'])
    
    # ボール、ストライク、アウトカウントが欠損している場合は0で埋める
    data['ball_count'] = data['ball_count'].fillna(0)
    data['strike_count'] = data['strike_count'].fillna(0)
    data['out_count'] = data['out_count'].fillna(0)
    
    # 球速が欠損している場合は平均値で埋める
    data['velocity'] = data['velocity'].fillna(data['velocity'].mean())

    print("データの前処理が完了しました。")
    
    return data, label_encoders

In [4]:
# 特徴量の選択
def select_features(data):
    print("特徴量を選択しています...")
    
    # モデル構築に使用する特徴量を選択
    features = [
        # 投球関連
        'pitch_type_encoded', 
        'velocity',
        
        # カウント状況
        'ball_count', 
        'strike_count', 
        'out_count',
        'count_encoded',
        
        # 塁状況
        'runner_1b_exists',
        'runner_2b_exists',
        'runner_3b_exists',
        
        # 試合状況
        'inning',
        'top_bottom',
        'score_diff',
        'game_progress',
        
        # 打者・投手情報
        'batting_side_encoded',
        'pitching_side_encoded',
    ]
    
    # 投球座標がデータに含まれている場合
    if 'coordinate_x' in data.columns and 'coordinate_y' in data.columns:
        features.extend(['coordinate_x', 'coordinate_y', 'distance_from_center'])
    
    # 特徴量と目的変数を分離
    X = data[features].copy()
    y = data['is_foul']
    
    return X, y, features

In [5]:
# LightGBMモデルのトレーニングと評価
def train_and_evaluate_model(X, y, features):
    print("モデルのトレーニングと評価を行っています...")
    
    # トレーニングデータとテストデータに分割
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # LightGBMのパラメータ設定
    params = {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'boosting_type': 'gbdt',
        'num_leaves': 31,
        'learning_rate': 0.05,
        'feature_fraction': 0.9,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'verbose': -1
    }
    
    # グリッドサーチの設定
    param_grid = {
        'num_leaves': [31, 50, 70],
        'learning_rate': [0.01, 0.05, 0.1],
        'n_estimators': [100, 200]
    }
    
    # LightGBMモデルのインスタンス化
    lgb_model = lgb.LGBMClassifier(**params)
    
    # グリッドサーチでハイパーパラメータを最適化
    grid_search = GridSearchCV(
        estimator=lgb_model,
        param_grid=param_grid,
        cv=3,
        scoring='roc_auc',
        verbose=0
    )
    
    # モデルのトレーニング
    print("グリッドサーチでハイパーパラメータを最適化中...")
    grid_search.fit(X_train, y_train)
    
    # 最適なモデルを取得
    best_model = grid_search.best_estimator_
    print(f"最適なパラメータ: {grid_search.best_params_}")
    
    # テストデータで予測
    y_pred_proba = best_model.predict_proba(X_test)[:, 1]
    y_pred = best_model.predict(X_test)
    
    # モデルの評価
    print("\n分類レポート:")
    print(classification_report(y_test, y_pred))
    
    # 混同行列
    cm = confusion_matrix(y_test, y_pred)
    
    # 特徴量の重要度
    feature_importance = pd.DataFrame({
        'Feature': features,
        'Importance': best_model.feature_importances_
    }).sort_values(by='Importance', ascending=False)
    
    return best_model, feature_importance, y_test, y_pred, y_pred_proba, cm

In [6]:
# 結果の可視化
def visualize_results(feature_importance, y_test, y_pred, y_pred_proba, cm):
    print("結果を可視化しています...")
    
    # 特徴量の重要度を可視化
    plt.figure(figsize=(10, 8))
    sns.barplot(x='Importance', y='Feature', data=feature_importance.head(15))
    plt.title('Feature Importance')
    plt.tight_layout()
    plt.savefig('feature_importance.png')
    
    # ROC曲線
    fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
    roc_auc = auc(fpr, tpr)
    
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")
    plt.savefig('roc_curve.png')
    
    # 混同行列の可視化
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False,
                xticklabels=['Not Foul', 'Foul'],
                yticklabels=['Not Foul', 'Foul'])
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.savefig('confusion_matrix.png')
    
    print("可視化結果を保存しました。")

In [7]:
# モデルの保存
def save_model(model, label_encoders):
    print("モデルを保存しています...")
    import joblib
    
    # モデルを保存
    joblib.dump(model, 'foul_prediction_model.pkl')
    
    # ラベルエンコーダーを保存
    joblib.dump(label_encoders, 'label_encoders.pkl')
    
    print("モデルと前処理パイプラインを保存しました。")

In [8]:
# 予測用関数
def predict_foul(model, label_encoders, input_data):
    """
    新しい投球データに対してファウルの可能性を予測する
    
    Parameters:
    -----------
    model : LGBMClassifier
        トレーニング済みのモデル
    label_encoders : dict
        カテゴリ変数用のラベルエンコーダー
    input_data : dict
        予測したい投球のデータ
        
    Returns:
    --------
    float
        ファウルになる確率
    """
    # 入力データをDataFrameに変換
    input_df = pd.DataFrame([input_data])
    
    # カテゴリ変数をエンコード
    for col, encoder in label_encoders.items():
        if col in input_df.columns:
            try:
                input_df[col + '_encoded'] = encoder.transform(input_df[col].fillna('unknown'))
            except:
                # エンコーダーに存在しない値の場合は0を設定
                input_df[col + '_encoded'] = 0
    
    # ランナー状況の処理
    input_df['runner_1b_exists'] = input_df['runner_1b'].apply(lambda x: 0 if pd.isna(x) or x == 0 or x == '0' else 1)
    input_df['runner_2b_exists'] = input_df['runner_2b'].apply(lambda x: 0 if pd.isna(x) or x == 0 or x == '0' else 1)
    input_df['runner_3b_exists'] = input_df['runner_3b'].apply(lambda x: 0 if pd.isna(x) or x == 0 or x == '0' else 1)
    
    # 他の特徴量を計算
    input_df['game_progress'] = input_df['inning'] / 9.0
    input_df['score_diff'] = input_df['score1'] - input_df['score2']
    input_df['count_situation'] = input_df['ball_count'].astype(str) + '-' + input_df['strike_count'].astype(str)
    
    # 座標から距離を計算
    if 'coordinate_x' in input_df.columns and 'coordinate_y' in input_df.columns:
        input_df['distance_from_center'] = np.sqrt(input_df['coordinate_x']**2 + input_df['coordinate_y']**2)
    
    # モデルの特徴量を選択
    features = [col for col in model.feature_name_ if col in input_df.columns]
    
    # 予測
    foul_probability = model.predict_proba(input_df[features])[0, 1]
    return foul_probability

In [26]:
# メイン関数
def main():
    try:
        # データの読み込み
        teams, grounds, players, matches, innings_score, at_bats, pitches = load_data()

        # データの前処理
        data, label_encoders = preprocess_data(teams, grounds, players, matches, innings_score, at_bats, pitches)
        return
        # 特徴量の選択
        X, y, features = select_features(data)
        
        # モデルのトレーニングと評価
        model, feature_importance, y_test, y_pred, y_pred_proba, cm = train_and_evaluate_model(X, y, features)
        
        # 結果の可視化
        visualize_results(feature_importance, y_test, y_pred, y_pred_proba, cm)
        
        # モデルの保存
        save_model(model, label_encoders)
        
        # 予測例
        print("\n予測の例:")
        sample_input = {
            'pitch_type': 'ストレート',
            'velocity': 145,
            'ball_count': 1,
            'strike_count': 2,
            'out_count': 0,
            'runner_1b': 0,
            'runner_2b': 0,
            'runner_3b': 0,
            'inning': 5,
            'top_bottom': 0,
            'score1': 2,
            'score2': 3,
            'batting_side': '1',  # 右打者
            'pitching_side': '0',  # 左投手
            'coordinate_x': 0.1,
            'coordinate_y': 0.2
        }
        
        foul_prob = predict_foul(model, label_encoders, sample_input)
        print(f"この投球がファウルになる確率: {foul_prob:.2%}")
        
        print("\nファウル予測モデルの構築が完了しました！")
        
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        import traceback
        traceback.print_exc()

In [33]:
if __name__ == "__main__":
    main()

データを読み込んでいます...
データの読み込みが完了しました。
データの前処理を行っています...


Unnamed: 0,pitch_id,match_id,inning,top_bottom,pitch_type,velocity,result,total_pitch_count,batter_pitch_count,ball_count,strike_count,out_count,coordinate_x,coordinate_y
0,2025_03_28_1_5,2025_03_28_1_5_1_0_1_1,1,0,ストレート,146,ファウル,1,1,0,1,1,75,84
1,2025_03_28_1_5,2025_03_28_1_5_1_0_1_2,1,0,スライダー,128,一ゴロ,2,2,0,1,1,101,55
2,2025_03_28_1_5,2025_03_28_1_5_1_0_2_1,1,0,ストレート,146,二ゴロ,3,1,0,0,2,87,32
3,2025_03_28_1_5,2025_03_28_1_5_1_0_3_1,1,0,スライダー,131,ボール,4,1,1,0,3,172,10
4,2025_03_28_1_5,2025_03_28_1_5_1_0_3_2,1,0,ストレート,146,見逃し,5,2,1,1,3,124,88


エラーが発生しました: cannot unpack non-iterable NoneType object


Traceback (most recent call last):
  File "/var/folders/rp/zznb4p7n1sb4lyjjckjh4j_80000gn/T/ipykernel_95926/2375729125.py", line 8, in main
    data, label_encoders = preprocess_data(teams, grounds, players, matches, innings_score, at_bats, pitches)
TypeError: cannot unpack non-iterable NoneType object
