<a href="https://colab.research.google.com/github/michael-rowland/Pitch-Predictions/blob/master/Pitch-Predictions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## NOTES

---

Features:
- Shifting?
- Home/Away

Per pitcher models

Exponential Smoothing

Permutation Importances

## SETUP

---



In [0]:
%%capture
import sys

# If you're on Colab:
if 'google.colab' in sys.modules:
    !pip install category_encoders==2.*
    !pip install pybaseball

# If you're working locally:
else:
    DATA_PATH = '../data/'

In [0]:
# import os
import numpy as np
import pandas as pd
import category_encoders as ce
from pybaseball import statcast_pitcher

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

## DATA GATHERING

---



In [0]:
%%capture
raw_data = statcast_pitcher('2015-01-01', '2020-01-01', 621244)

## DATA CLEANING

---



In [0]:
def reorder(df):
  '''
  reorders pitches in ascending order
  '''
  df = df.copy()

  df = df.reindex(index=df.index[::-1])
  df = df.reset_index(drop=True)

  return df

In [0]:
def pitch_cleaning(df):
  '''
  cleans target ('pitch_type'), dropping NaN's (bad data) and pitch out's
  '''
  df = df.copy()

  df = df.dropna(subset=['pitch_type'])
  
  df['pitch_type'] = df['pitch_type'].str.lower()
  remove_pitches = ['po']
  df = df[~df['pitch_type'].isin(remove_pitches)]
  df = df.drop('pitch_name', axis=1)

  return df

In [0]:
def historical_cleaning(df):
  '''
  shifts relevant historical data to prevent data leakage
  '''
  df = df.copy()

  df['previous_pitch'] = df['pitch_type'].shift(1) # target, should probably be dropped initially
  df['previous_strike'] = df['type'].shift(1) == 'S'
  df['previous_zone'] = df['zone'].shift(1) # this may need to be encoded somehow
  df = df.drop(['type', 'zone'], axis=1)

  return df

## DATA PROCESSING

---


In [0]:
def total_pitches(df):
  '''
  creates column of total pitches prior to current/target pitch
  '''
  df = df.copy()

  df['pitch_total'] = df.groupby('game_date').cumcount(ascending=True)

  return df

In [0]:
def score_margin(df):
  '''
  creates column of score margin for the pitcher's team
  '''
  df = df.copy()

  df['score_margin'] = df['fld_score'] - df['bat_score']
  df = df.drop(['fld_score', 'bat_score'], axis=1)

  return df

In [0]:
def pitcher_advantage(df):
  '''
  creates column of pitcher advantage based on the ball/strike count
  '''
  df = df.copy()

  advantages = {'0-0':'neutral', '0-1':'ahead', '0-2':'ahead', '1-0':'neutral', 
                '1-1':'neutral', '1-2':'ahead', '2-0':'behind', '2-1':'neutral',
                '2-2':'ahead', '3-0':'behind', '3-1':'behind', '3-2':'neutral'}

  df['count'] = df['balls'].astype(str) + '-' + df['strikes'].astype(str)
  df['pitcher_advantage'] = df['count'].map(advantages)
  df = df.drop('count', axis=1)

  return df

In [0]:
def baserunners_mapping(df):
  '''
  maps baserunner to boolean values and creates column if there is a baserunner 
  on any base
  '''
  df = df.copy()

  df['on_1b'] = df['on_1b'].notnull()
  df['on_2b'] = df['on_2b'].notnull()
  df['on_3b'] = df['on_3b'].notnull()
  
  df['baserunner'] = df['on_1b'] | df['on_2b'] | df['on_3b']

  return df

In [0]:
def at_bat_counter(df):
  '''
  creates column of number of batters faced
  '''
  df = df.copy()

  df['at_bat_count'] = df.groupby('at_bat_number').ngroup(ascending=True)+1
  df = df.drop('at_bat_number', axis=1)

  return df

In [0]:
def pitch_encoding(df):
  '''
  creates columns of previous pitch for each pitch type using OneHotEncoder
  '''
  df = df.copy()

  encoder = ce.OneHotEncoder(use_cat_names=True)
  encoded = encoder.fit_transform(df['previous_pitch'])
  df = pd.merge(df, encoded, how='outer', on=df.index)
  df = df.drop(['key_0', 'previous_pitch_nan'], axis=1)
  
  return df

In [0]:
def pitch_counter(df):
  '''
  creates columuns of cumulative count of each specific pitch type
  '''
  df = df.copy()

  pitches = [col for col in df if col.startswith('previous_pitch_')]
  
  for pitch in pitches:
    df[f'count_{pitch[-2:]}'] = (df[pitch] == 1).cumsum()

  return df

In [0]:
def pitch_tendency(df, window):
  '''
  creates columns of pitch tendency (mean) over last n = window pitches for 
  each pitch type
  * This is incomplete (minimum viable product) as averages include first row. 
  As a result, the sum of all tendencies does not equal 1 while row < window.
  '''
  df = df.copy()

  pitches = [col for col in df if col.startswith('previous_pitch_')]
  
  for pitch in pitches:
    df[f'{pitch[-2:]}_tendency_{window}'] = df[pitch].rolling(
        window, min_periods=1).mean()
    
  return df

In [0]:
def strike_encoding(df):
  '''
  creates columns of strike pitch for each pitch type using OneHotEncoder
  '''
  df = df.copy()
  
  s = df['previous_strike'] == True
  df['strike_pitch'] = s.mask(s, df['previous_pitch'])
  
  encoder = ce.OneHotEncoder(use_cat_names=True)
  encoded = encoder.fit_transform(df['strike_pitch'])
  df = pd.merge(df, encoded, how='outer', on=df.index)
  df = df.drop(['key_0', 'strike_pitch_False', 'strike_pitch'], axis=1)
  
  return df

In [0]:
def strike_percentage(df, percentage=True):
  '''
  creates columns of strike percentage (or raw count) for each pitch type
  * This could be further engineered to calculate percentage of total number of 
  strikes thrown rather than number of total pitches
  '''
  df = df.copy()

  pitches = [col for col in df if col.startswith('strike_pitch_')]

  for i in pitches:
    if percentage:
      df[f'{i[-2:]}_cumulative_strikes'] = (df[f'strike_pitch_{i[-2:]}'] == 1).cumsum() / df['pitch_total']
    else:
      df[f'{i[-2:]}_cumulative_strikes'] = (df[f'strike_pitch_{i[-2:]}'] == 1).cumsum()
      
  return df

In [0]:
def strike_counter(df, window):
  '''
  creates columns of number of previous n (window) strikes thrown for each pitch 
  type
  '''
  df = df.copy()
  
  pitches = [col for col in df if col.startswith('strike_pitch_')]
  
  for i in pitches:
    target = f'{i[-2:]}_strike_tendency_{window}'
    # df[target] = df[df['strike_pitch'] != False][i].rolling(window, 
    #                                                         min_periods=1).sum()
    df[target] = df[df[i] != 0][i].rolling(window, min_periods=1).sum()
    df[target] = df[target].fillna(method='ffill')
    df[target] = df[target].fillna(0)

  return df

In [0]:
def wrangle_game(df):
  '''
  this function does data cleaning and feature engineering on a per game basis
  '''
  df = df.copy()

  # CLEANING FUNCTIONS
  df = reorder(df)
  df = pitch_cleaning(df)
  if df.empty:
    return
  df = historical_cleaning(df)

  # FEATURE ENGINEERING FUNCTIONS
  df = total_pitches(df)
  df = score_margin(df)
  df = pitcher_advantage(df)
  df = baserunners_mapping(df)
  df = at_bat_counter(df)
  df = pitch_encoding(df)
  df = pitch_counter(df)
  
  windows = [5, 10, 20]
  for window in windows:
    df = pitch_tendency(df, window) 
  df = strike_encoding(df)
  df = strike_percentage(df, percentage=True)
  for window in windows:
    df = strike_counter(df, window)
  
  return df



---



In [0]:
# sample = 'https://raw.githubusercontent.com/michael-rowland/Pitch-Predictions/master/sample.csv'
# df = pd.read_csv(sample)
# df = df[df['game_date'] == '2018-09-28']

target = 'pitch_type'
features = ['pitch_name', 'pitch_type', 'game_date', 'stand', 'balls', 
            'strikes', 'on_3b', 'on_2b', 'on_1b', 'outs_when_up', 'inning', 
            'at_bat_number', 'bat_score', 'fld_score', 'type', 'zone']
# df = df[features]

In [0]:
df = pd.DataFrame()
# nulls = '2018-04-18'

games = raw_data['game_date'].unique().tolist()
for game in games:
  game_data = wrangle_game(raw_data[raw_data['game_date'] == game][features])
  df = df.append(game_data, ignore_index=True, sort=False)

## MODEL

---



In [0]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline

In [0]:
backup = df

In [220]:
train, test = train_test_split(df, train_size=0.8, random_state=42)
train, val = train_test_split(train, train_size=0.8, random_state=42)
train.shape, val.shape, test.shape

((6266, 58), (1567, 58), (1959, 58))

In [0]:
target = 'pitch_type'
features = df.columns.drop([target, 'game_date'])

X_train = train[features]
y_train = train[target]
X_val = val[features]
y_val = val[target]
X_test = test[features]
y_test = test[target]

In [222]:
processor = make_pipeline(
    ce.OrdinalEncoder(),
    SimpleImputer(strategy='median')
)
X_train_processed = processor.fit_transform(X_train)
X_val_processed = processor.transform(X_val)

model = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
model.fit(X_train_processed, y_train)

print(f'Train accuracy: {model.score(X_train_processed, y_train):.5f}')
print(f'Validation accuracy: {model.score(X_val_processed, y_val):.5f}')

Train accuracy: 0.99697
Validation accuracy: 0.39566


In [0]:
def log_model(txt_file='model_log.txt'):
    with open(txt_file, 'a') as file:
        file.write(f'Validation accuracy: {model.score(X_val_processed, y_val):.5f}\n')
        file.write(f'Features: {X_train.columns}\n')
        file.write(f'{model.get_params}\n')
        file.write('\n')
        file.close()

## MISC

---



In [0]:
def pitch_map(df):
  '''
  generates dictionary mapping 'pitch_type' to 'pitch_name'
  '''
  df = df.copy()

  df = df.dropna(subset=['pitch_type', 'pitch_name'])
  pitch_types = df['pitch_name'].unique().tolist()
  pitch_types_abv = [i.lower() for i in df['pitch_type'].unique().tolist()]
  
  return dict(zip(pitch_types_abv, pitch_types))