# Modeling

In [7]:
# Import libraries
import re 
import ast 
import json 
import pickle 
from collections import Counter 
import datetime as dt
import pybaseball
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

from pybaseball import pitching_stats_bref
from pybaseball import statcast
from pybaseball import statcast_pitcher
from pybaseball import playerid_lookup
from pybaseball import playerid_reverse_lookup

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import LabelEncoder

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC, LinearSVC
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score

import warnings
# NOTE(review): this suppresses every warning for the rest of the session, not just
# a specific category, so warnings raised by the libraries used below are hidden too.
warnings.filterwarnings("ignore")

- XGBoost
- Random Forest
- RNN (LSTM or GRU or both)

An RNN will likely take too long to train, depending on the number of features.

In [87]:
# Load the pitch-by-pitch modeling table into `data` (path is relative to the notebook).
data = pd.read_csv('../data/final/pitch_by_pitch_2023_ml.csv')

In [92]:
# Summarize column dtypes and non-null counts to spot columns with missing values.
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 503292 entries, 0 to 503291
Data columns (total 26 columns):
 #   Column                             Non-Null Count   Dtype  
---  ------                             --------------   -----  
 0   pitch_type                         503260 non-null  object 
 1   release_speed                      503292 non-null  float64
 2   batter                             503292 non-null  int64  
 3   pitcher                            503292 non-null  int64  
 4   balls                              503292 non-null  int64  
 5   strikes                            503292 non-null  int64  
 6   on_3b                              503292 non-null  int64  
 7   on_2b                              503292 non-null  int64  
 8   on_1b                              503292 non-null  int64  
 9   outs_when_up                       503292 non-null  int64  
 10  inning                             503292 non-null  int64  
 11  at_bat_number                      5032

In [90]:
# Drop the columns that are not used as model inputs.
# Reassigning (instead of inplace=True) together with errors='ignore' keeps this
# cell idempotent: running it a second time no longer raises KeyError for
# columns that have already been removed.
data = data.drop(
    columns=['game_type', 'inning_topbot', 'if_fielding_alignment',
             'of_fielding_alignment', 'count', 'game_pitcher_id'],
    errors='ignore',
)

In [91]:
# Force the previous-pitch column to plain strings so every value can be used as a
# category key later on.
# NOTE(review): astype(str) would turn a missing value into the literal 'nan' --
# presumably missing values were already replaced upstream; confirm.
data['prev_pitch_type'] = data['prev_pitch_type'].astype(str)

In [93]:
# Show the last five prev_pitch_type values as a quick sanity check.
data['prev_pitch_type'].tail()

503287    Breaking Ball
503288         Fastball
503289         Fastball
503290    Breaking Ball
503291          Unknown
Name: prev_pitch_type, dtype: object

## 1. XGBoost

In [96]:
def train_xgb_models(train_data, pitch_count_cutoff=1000):
    '''
    Train and evaluate one XGBoost pitch-type classifier per pitcher.

    NOTE(review): a later cell in this notebook defines another function with
    this same name; when the cells are run top to bottom that later definition
    replaces this one.

    For every pitcher with more than `pitch_count_cutoff` rows the function
    encodes the pitch labels as integers, runs a small grid search with 5-fold
    cross validation, scores the best model on a 20% hold-out split and pickles
    the fitted search object plus metadata to
    ../data/pitcher_models/<pitcher>.pkl.

    train_data: a cleaned data frame with at least the columns 'pitcher',
        'pitch_type' and 'prev_pitch_type'; every remaining column is passed to
        the model as a feature. The frame itself is not modified.
    pitch_count_cutoff: a pitcher is modeled only if they have strictly more
        than this many rows in train_data

    returns (accuracy_list, naive_accuracy_list): for each modeled pitcher, the
        hold-out accuracy of the tuned model and the accuracy of always
        predicting that pitcher's most frequent pitch, both in percent rounded
        to one decimal
    '''

    # build a dict with pitcher id as key and total pitch count as value
    pitcher_count_dict = dict(Counter(train_data['pitcher']))

    # drop pitchers that don't have enough pitches to build a reliable model
    pitcher_count_dict = {k: v for k, v in pitcher_count_dict.items() if v > pitch_count_cutoff}

    # list of pitchers
    pitcher_list = list(pitcher_count_dict)
    print(f"Number of pitchers that make the cut: {len(pitcher_count_dict)}")

    # loop through the list of pitchers and train models
    accuracy_list = []
    naive_accuracy_list = []
    for i, pitcher in enumerate(pitcher_list):

        # start a timer
        start = dt.datetime.now()

        # Build an independent frame for this pitcher (no in-place edits on a
        # slice of the caller's data). Rows without a pitch_type carry no label
        # to learn from, so they are removed instead of becoming their own class.
        df_pitcher = (train_data[train_data['pitcher'] == pitcher]
                      .drop(columns='pitcher')
                      .dropna(subset=['pitch_type']))

        # how often each pitch type was thrown (keys are the original labels)
        pitch_type_counts = Counter(df_pitcher['pitch_type'])

        # Thrown pitch types get the codes 0..k-1 so the target labels are
        # contiguous and match num_class below. Values that only ever appear
        # in prev_pitch_type (e.g. a placeholder for "no previous pitch") are
        # appended after them. Sorting makes the mapping reproducible.
        thrown_types = sorted(pitch_type_counts, key=str)
        prev_only_types = sorted(
            set(df_pitcher['prev_pitch_type'].unique()) - set(thrown_types), key=str)
        pitch_types = thrown_types + prev_only_types

        # build maps for pitches to ints and ints back to pitches
        pitch_map = {pitch: code for code, pitch in enumerate(pitch_types)}
        pitch_unmap = {code: pitch for pitch, code in pitch_map.items()}

        # map pitch types to ints
        df_pitcher['pitch_type'] = df_pitcher['pitch_type'].map(pitch_map)
        df_pitcher['prev_pitch_type'] = df_pitcher['prev_pitch_type'].map(pitch_map)

        # split the dataframe into a feature set and an outcome column
        X = df_pitcher.drop('pitch_type', axis=1)
        y = df_pitcher['pitch_type']

        # split the data into train/test sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # ----------------------
        # train an XGBoost model
        # ----------------------

        # small set of hyperparameters to optimize over
        xgb_params = {"max_depth": (2, 5, 20),
                      "learning_rate": (0.01, 0.1, 0.4)}

        # perform the parameter grid search using 5-fold cross validation;
        # num_class is the number of distinct thrown pitch types
        xgb_opt = GridSearchCV(XGBClassifier(objective='multi:softprob', num_class=len(thrown_types)),
                               param_grid=xgb_params, cv=5, scoring='accuracy', verbose=0, n_jobs=-1)

        # perform fit and make predictions
        xgb_opt.fit(X_train, y_train)
        y_pred = xgb_opt.predict(X_test)

        # compute accuracy and store in a list for analyzing results later
        accuracy = round(accuracy_score(y_test, y_pred) * 100, 1)
        accuracy_list.append(accuracy)

        # get and store the naive accuracy (accuracy from just predicting the most thrown pitch)
        naive_accuracy = round(max(pitch_type_counts.values()) / sum(pitch_type_counts.values()) * 100., 1)
        naive_accuracy_list.append(naive_accuracy)

        # print some input/results for every 10th pitcher
        if i % 10 == 0:
            print()
            print(f"Pitcher ID: {pitcher}")
            print(f"Pitcher's pitch map: {pitch_map}")
            print(f"Pitcher's pitch counter: {dict(pitch_type_counts)}")
            print(f"Number of data points in training: {X_train.shape[0]}")
            print(f"Number of data points in testing: {X_test.shape[0]}")
            print(f"Best params: {xgb_opt.best_params_}")
            print(f"Total training time: {dt.datetime.now()-start}")
            print(f"Naive accuracy: {naive_accuracy}")
            print(f"XGBooost accuracy: {accuracy}")

        # ----------------------------------------------------------
        # write out the pitchers model and metadata to a pickle file
        # ----------------------------------------------------------

        # things to store in the pitcher's model file:
        #  1) the map and unmap for pitches (used for data clean-up in the prediction process)
        #  2) trained model (used to make prediction)
        #  3) accuracy on the test data (to include with pitch predictions so user can see how confident the model is)
        model_out = {
            "pitcherID": pitcher,
            "pitch_map": pitch_map,
            "pitch_unmap": pitch_unmap,
            "model": xgb_opt,
            "model_accuracy": accuracy
        }

        # pickle up the pitcher's model file
        fpath = "../data/pitcher_models/" + str(pitcher) + ".pkl"
        with open(fpath, 'wb') as fobj:
            pickle.dump(model_out, fobj)

    # return the accuracy lists so we can perform assessment
    return accuracy_list, naive_accuracy_list

In [None]:
def train_xgb_models(train_data, pitch_count_cutoff=1000):
    '''
    Train and evaluate one XGBoost pitch-type classifier per pitcher.

    For every pitcher with more than `pitch_count_cutoff` rows the function
    encodes the pitch labels as consecutive integers, runs a small grid search
    with 5-fold cross validation, scores the best model on a 20% hold-out split
    and pickles the fitted search object plus metadata to
    ../data/pitcher_models/<pitcher>.pkl.

    train_data: a cleaned data frame with at least the columns 'pitcher',
        'pitch_type' and 'prev_pitch_type'; every remaining column is passed to
        the model as a feature. The frame itself is not modified.
    pitch_count_cutoff: a pitcher is modeled only if they have strictly more
        than this many rows in train_data

    returns accuracy lists for model performance analysis:
        (accuracy_list, naive_accuracy_list), one entry per modeled pitcher,
        both in percent rounded to one decimal. The naive accuracy is the share
        of the pitcher's most frequent pitch type.
    '''

    # Build a dict with pitcher id as key and total pitch count as value
    pitcher_count_dict = dict(Counter(train_data['pitcher']))

    # Drop pitchers that don't have enough pitches to build a reliable model
    pitcher_count_dict = {k: v for k, v in pitcher_count_dict.items() if v > pitch_count_cutoff}

    # List of pitchers
    pitcher_list = list(pitcher_count_dict.keys())
    print(f"Number of pitchers that make the cut: {len(pitcher_count_dict)}")

    # Initialize lists to track model performance
    accuracy_list = []
    naive_accuracy_list = []

    # Loop through the list of pitchers and train models
    for i, pitcher in enumerate(pitcher_list):
        # Start a timer for performance tracking
        start = dt.datetime.now()

        # Filter data for this pitcher into a new frame. Rows with a missing
        # pitch_type are removed: they have no label to train on, and a NaN
        # mixed with string labels would make the sorted() call below fail.
        df_pitcher = (
            train_data[train_data['pitcher'] == pitcher]
            .drop(columns='pitcher')
            .dropna(subset=['pitch_type'])
        )

        # Create a continuous mapping of pitch types (codes 0..k-1 in sorted order)
        unique_pitch_types = sorted(set(df_pitcher['pitch_type']))
        pitch_map = {pitch_type: idx for idx, pitch_type in enumerate(unique_pitch_types)}
        pitch_unmap = {v: k for k, v in pitch_map.items()}

        # Map pitch types to consecutive integers
        df_pitcher['pitch_type'] = df_pitcher['pitch_type'].map(pitch_map)
        # Previous-pitch values that are not one of this pitcher's thrown pitch
        # types have no entry in pitch_map, so .map() leaves them as NaN.
        # NOTE(review): this relies on the model accepting NaN feature values -- confirm.
        df_pitcher['prev_pitch_type'] = df_pitcher['prev_pitch_type'].map(pitch_map)

        # Count how often each (encoded) pitch type occurs
        pitch_type_counts = Counter(df_pitcher['pitch_type'])

        # Split features and target
        X = df_pitcher.drop('pitch_type', axis=1)
        y = df_pitcher['pitch_type']

        # Split into train and test sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Hyperparameter grid
        xgb_params = {
            "max_depth": (2, 5, 20),
            "learning_rate": (0.01, 0.1, 0.4)
        }

        # Create XGBoost classifier with dynamic number of classes
        xgb_opt = GridSearchCV(
            XGBClassifier(
                objective='multi:softprob',
                num_class=len(unique_pitch_types)  # Dynamic class count
            ),
            param_grid=xgb_params,
            cv=5,
            scoring='accuracy',
            verbose=0,
            n_jobs=-1
        )

        # Fit the model and predict on the hold-out set
        xgb_opt.fit(X_train, y_train)
        y_pred = xgb_opt.predict(X_test)

        # Compute accuracies
        accuracy = round(accuracy_score(y_test, y_pred) * 100, 1)
        accuracy_list.append(accuracy)

        # Compute naive accuracy (most frequent class)
        naive_accuracy = round(max(pitch_type_counts.values()) / sum(pitch_type_counts.values()) * 100., 1)
        naive_accuracy_list.append(naive_accuracy)

        # Periodic logging for every 10th pitcher
        if i % 10 == 0:
            print(f"\nPitcher ID: {pitcher}")
            print(f"Pitch map: {pitch_map}")
            print(f"Pitch type counts: {dict(pitch_type_counts)}")
            print(f"Training data points: {X_train.shape[0]}")
            print(f"Testing data points: {X_test.shape[0]}")
            print(f"Best params: {xgb_opt.best_params_}")
            print(f"Training time: {dt.datetime.now()-start}")
            print(f"Naive accuracy: {naive_accuracy}")
            print(f"XGBoost accuracy: {accuracy}")

        # Prepare model output for pickling: the label maps are stored next to
        # the fitted search object so predictions can be decoded later
        model_out = {
            "pitcherID": pitcher,
            "pitch_map": pitch_map,
            "pitch_unmap": pitch_unmap,
            "model": xgb_opt,
            "model_accuracy": accuracy
        }

        # Pickle the model
        fpath = "../data/pitcher_models/" + str(pitcher) + ".pkl"
        with open(fpath, 'wb') as fobj:
            pickle.dump(model_out, fobj)

    # Return accuracy lists for further analysis
    return accuracy_list, naive_accuracy_list

In [None]:
# Train one model per pitcher with more than 2000 rows in `data`; keep the model and
# naive-baseline accuracies (in percent) for comparison.
accuracy_list, naive_accuracy_list = train_xgb_models(data, pitch_count_cutoff=2000)


## 2. LSTM

In [101]:
# Load the CSV used for the LSTM section into `df`
# (presumably the sequence-model version of the pitch-by-pitch data, per the file name).
df = pd.read_csv('../data/final/pitch_by_pitch_2023_lstm.csv')