In [1]:
import pandas as pd
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from pybaseball import statcast, cache
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
import joblib
import math
import scipy.stats as stats
cache.enable()

In [2]:
# CSV from call to pybaseball('2021-04-01', '2022-10-31')
sc = pd.read_csv('../statcast 21-22.csv')

In [3]:
sc = sc[(sc['strikes'] < 3) & (sc['balls'] < 4)]

In [4]:
columns = ['player_name', 'p_throws', 'pitch_name', 'stand', 'delta_run_exp', 'release_speed', 'spin_axis', 'pfx_x', 'pfx_z', 'plate_x', 'plate_z', 'balls', 'strikes', 'release_pos_x', 'release_pos_z', 'release_extension', 'description', 'release_spin_rate', 'events']
df = sc[columns]
df = df[df['pitch_name'].isin(['Slider', 'Curveball', '4-Seam Fastball', 'Changeup', 'Sinker',
       'Cutter', 'Knuckle Curve', 'Split-Finger', 'Sweeper'])]

In [5]:
non_features = ['player_name', 'delta_run_exp', 'stand', 'pitch_name', 'p_throws', 'balls', 'strikes', 'release_speed', 'spin_axis', 'plate_x', 'plate_z', 'description', 'events', 'release_extension', 'release_pos_x', 'release_pos_z']
features = [column for column in columns if column not in non_features]

In [6]:
df = df.dropna(subset=features)

In [7]:
capps_constant = 8.2 - df.release_extension.mean()

In [8]:
average_extension = df['release_extension'].mean()
# Found this using the added Carter Capps perceived velo divided by his added extension over average
extension_constant = (3.5/capps_constant)
def calculate_perceived_velocity(row):
    adjusted_extension = row['release_extension'] - average_extension
    perceived_velocity = row['release_speed'] + (adjusted_extension * extension_constant)
    return perceived_velocity

df['perceived_velocity'] = df.apply(calculate_perceived_velocity, axis=1)

In [9]:
df['perceived_velocity'].min()

31.776406656335816

In [10]:
player_name_counts = df['player_name'].value_counts()
# Dropping all position players pitching from data
valid_player_names = player_name_counts[player_name_counts >= 50].index.tolist()
df = df[df['player_name'].isin(valid_player_names)]

In [11]:
df['perceived_velocity'].min()

56.040586989350196

In [12]:
df.loc[df['p_throws'] == 'L', 'pfx_x'] *= -1
df.loc[df['p_throws'] == 'L', 'spin_axis'] = 360 - df.loc[df['p_throws'] == 'L', 'spin_axis']
df.loc[df['p_throws'] == 'L', 'release_pos_x'] *= -1

In [13]:
arm_angles = np.degrees(np.arctan2(df['release_pos_z'], df['release_pos_x']))
df['arm_angle'] = arm_angles
features.append('arm_angle')

In [14]:
features.append('perceived_velocity')
features

['pfx_x', 'pfx_z', 'release_spin_rate', 'arm_angle', 'perceived_velocity']

In [15]:
df['spin_axis_rad'] = df['spin_axis'] * np.pi / 180

df['TSM'] = df['release_spin_rate'] * np.sin(df['spin_axis_rad'])
df['G'] = df['release_spin_rate'] * np.cos(df['spin_axis_rad'])

df['spin_efficiency'] = (df['TSM'] / df['release_spin_rate']) * 100

min_spin_efficiency = df['spin_efficiency'].min(skipna=True)
max_spin_efficiency = df['spin_efficiency'].max(skipna=True)

df['normalized_spin_efficiency'] = (df['spin_efficiency'] - min_spin_efficiency) / (max_spin_efficiency - min_spin_efficiency)

In [16]:
features.append('normalized_spin_efficiency')

In [17]:
df = df.dropna(subset=['delta_run_exp'])
df = df.dropna(subset=features)

In [18]:
# Pitch breakdowns as per Driveline's model
pitch_types = [['4-Seam Fastball', 'Sinker'], ['Curveball', 'Slider', 'Cutter', 'Knuckle Curve', 'Sweeper'], ['Split-Finger', 'Changeup', 'Forkball']]

for pitch_type in pitch_types:
    np.random.seed()
    param_dist = {
    'n_estimators': stats.randint(100, 1000),
    'max_depth': stats.randint(1, 10),
    'learning_rate': stats.uniform(0.01, 0.3),
    'subsample': stats.uniform(0.6, 0.4),
    'colsample_bytree': stats.uniform(0.6, 0.4),
    }
    pitch_df = df[df['pitch_name'].isin(pitch_type)]
    
    X = pitch_df[features]
    y = pitch_df['delta_run_exp']
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=np.random.seed())
    
    model = XGBRegressor()
    random_search = RandomizedSearchCV(estimator=model, param_distributions=param_dist, n_iter=5, scoring='neg_mean_squared_error', cv=3, random_state=np.random.seed(), n_jobs=-1)
    random_search.fit(X_train, y_train)
    
    best_model = random_search.best_estimator_
    
    y_pred = best_model.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    print(f"Best Parameters: {random_search.best_params_}")
    print(f"RMSE on Test Set: {rmse:.4f}")
    joblib.dump(best_model, f'models/{pitch_type}_optimized_random_stuff.joblib')

Best Parameters: {'colsample_bytree': 0.9429947588142006, 'learning_rate': 0.08335765688521098, 'max_depth': 9, 'n_estimators': 222, 'subsample': 0.757479377198551}
RMSE on Test Set: 0.2434
Best Parameters: {'colsample_bytree': 0.7284609469753212, 'learning_rate': 0.2148024546393382, 'max_depth': 1, 'n_estimators': 200, 'subsample': 0.9635651868157075}
RMSE on Test Set: 0.2360
Best Parameters: {'colsample_bytree': 0.7377282966171337, 'learning_rate': 0.24567778670920615, 'max_depth': 1, 'n_estimators': 100, 'subsample': 0.7041885320250866}
RMSE on Test Set: 0.2446


In [19]:
def get_pitch_data(df, pitch_name, players):
    velo_map = {player: math.nan for player in players}
    h_movement_map = {player: math.nan for player in players}
    v_movement_map = {player: math.nan for player in players}

    grouped_data = df.loc[(df['pitch_name'] == pitch_name) & (df['player_name'].isin(players))].groupby('player_name').agg({
        'release_speed': 'median',
        'pfx_x': 'median',
        'pfx_z': 'median'
    }).reset_index()

    for _, row in grouped_data.iterrows():
        player_name = row['player_name']
        velo_map[player_name] = row['release_speed']
        h_movement_map[player_name] = row['pfx_x']
        v_movement_map[player_name] = row['pfx_z']

    return velo_map, h_movement_map, v_movement_map

all_players = df['player_name'].unique()

fastball_velo_map, fastball_h_movement_map, fastball_v_movement_map = get_pitch_data(df, '4-Seam Fastball', all_players)
nan_fastball = [key for key in fastball_velo_map if math.isnan(fastball_velo_map[key])]

sinker_velo_map, sinker_h_movement_map, sinker_v_movement_map = get_pitch_data(df, 'Sinker', nan_fastball)
nan_sink = [key for key in sinker_velo_map if math.isnan(sinker_velo_map[key])]

cutter_velo_map, cutter_h_movement_map, cutter_v_movement_map = get_pitch_data(df, 'Cutter', nan_sink)

In [20]:
def calculate_velo_dif(row):
    if row['player_name'] in cutter_velo_map:
        return row['release_speed'] - cutter_velo_map[row['player_name']]
    elif row['player_name'] in sinker_velo_map:
        return row['release_speed'] - sinker_velo_map[row['player_name']]
    return row['release_speed'] - fastball_velo_map[row['player_name']]

def calculate_h_movement_dif(row):
    if row['player_name'] in cutter_velo_map:
        return row['pfx_x'] - cutter_h_movement_map[row['player_name']]
    elif row['player_name'] in sinker_velo_map:
        return row['pfx_x'] - sinker_velo_map[row['player_name']]
    return row['pfx_x'] - fastball_h_movement_map[row['player_name']]

def calculate_v_movement_dif(row):
    if row['player_name'] in cutter_velo_map:
        return row['pfx_z'] - cutter_v_movement_map[row['player_name']]
    elif row['player_name'] in sinker_velo_map:
        return row['pfx_z'] - sinker_velo_map[row['player_name']]
    return row['pfx_z'] - fastball_v_movement_map[row['player_name']]

df['velo_dif'] = df.apply(lambda row: calculate_velo_dif(row), axis=1)
df['h_movement_dif'] = df.apply(lambda row: calculate_h_movement_dif(row), axis=1)
df['v_movement_dif'] = df.apply(lambda row: calculate_v_movement_dif(row), axis=1)

In [21]:
new_features = features.copy()

In [22]:
new_features += ['velo_dif', 'h_movement_dif', 'v_movement_dif']

In [23]:
new_features

['pfx_x',
 'pfx_z',
 'release_spin_rate',
 'arm_angle',
 'perceived_velocity',
 'normalized_spin_efficiency',
 'velo_dif',
 'h_movement_dif',
 'v_movement_dif']

In [24]:
# Pitch breakdowns as per Driveline's model
pitch_types = [['Curveball', 'Slider', 'Cutter', 'Knuckle Curve', 'Sweeper'], ['Split-Finger', 'Changeup', 'Forkball']]

for pitch_type in pitch_types:
    np.random.seed()
    param_dist = {
    'n_estimators': stats.randint(100, 1000),
    'max_depth': stats.randint(1, 10),
    'learning_rate': stats.uniform(0.01, 0.3),
    'subsample': stats.uniform(0.6, 0.4),
    'colsample_bytree': stats.uniform(0.6, 0.4),
    }
    pitch_df = df[df['pitch_name'].isin(pitch_type)]
    
    X = pitch_df[new_features]
    y = pitch_df['delta_run_exp']
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=np.random.seed())
    
    model = XGBRegressor()
    random_search = RandomizedSearchCV(estimator=model, param_distributions=param_dist, n_iter=5, scoring='neg_mean_squared_error', cv=3, random_state=np.random.seed(), n_jobs=-1)
    random_search.fit(X_train, y_train)
    
    best_model = random_search.best_estimator_
    
    y_pred = best_model.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    print(f"Best Parameters: {random_search.best_params_}")
    print(f"RMSE on Test Set: {rmse:.4f}")
    joblib.dump(best_model, f'models/{pitch_type}new_stuff.joblib')

Best Parameters: {'colsample_bytree': 0.6365022643696724, 'learning_rate': 0.08692947913967677, 'max_depth': 5, 'n_estimators': 333, 'subsample': 0.6472663146756419}
RMSE on Test Set: 0.2409
Best Parameters: {'colsample_bytree': 0.8167464028858771, 'learning_rate': 0.04644799341689158, 'max_depth': 5, 'n_estimators': 242, 'subsample': 0.7717621912165686}
RMSE on Test Set: 0.2452


In [25]:
features

['pfx_x',
 'pfx_z',
 'release_spin_rate',
 'arm_angle',
 'perceived_velocity',
 'normalized_spin_efficiency']