In [1]:
import pickle
import random
from collections import Counter
from math import log2
from statistics import mode

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import sklearn.preprocessing as skl_preprocessing
from sklearn import metrics
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

## Data Preprocessing

In [2]:
def preprocess_data(data):
    """Select the modelling columns from a raw pitch frame and clean them.

    Parameters
    ----------
    data : pandas.DataFrame
        Raw pitch-level data. Must contain every feature column listed below
        as well as ``pitch_type``; any other columns are ignored.

    Returns
    -------
    pandas.DataFrame
        A new frame (the input is not modified) holding the feature columns
        followed by ``pitch_type`` as the last column. Rows with a missing
        value in any selected column are dropped, and ``p_throws`` is encoded
        as 1 where it equals ``'R'`` and 0 otherwise.

    Raises
    ------
    KeyError
        If one of the required columns is absent from ``data``.
    """
    feature_columns = [
        "release_speed", "release_pos_x", "release_pos_z", "release_pos_y",
        "release_spin_rate", "vx0", "vy0", "vz0", "ax", "ay", "az",
        "pfx_x", "pfx_z", "spin_axis", "pitch_number", "zone", "p_throws",
        "balls", "strikes",
    ]

    # Build the result as one chain: ``assign`` returns a new frame, so no
    # column is ever written into an object derived from a selection of `data`.
    return (
        data[feature_columns + ["pitch_type"]]
        .dropna()
        .assign(p_throws=lambda frame: (frame["p_throws"] == "R").astype(int))
    )

In [3]:
# Read the raw Statcast CSV; the path is relative to the notebook's working directory.
raw_input_data = pd.read_csv('data/Statcast_2021.csv')
# Reduce to the modelling columns, drop incomplete rows and encode p_throws (see preprocess_data).
df = preprocess_data(raw_input_data)
# Preview the first rows of the cleaned frame.
df.head()

Unnamed: 0,release_speed,release_pos_x,release_pos_z,release_pos_y,release_spin_rate,vx0,vy0,vz0,ax,ay,az,pfx_x,pfx_z,spin_axis,pitch_number,zone,p_throws,balls,strikes,pitch_type
0,92.3,1.4,6.8,54.03,2330.0,-6.833043,-134.166485,-7.361843,9.708393,26.562803,-14.083224,0.69,1.38,148.0,4,1.0,0,1,2,FF
1,80.6,1.6,6.64,54.15,2254.0,-3.700232,-117.430885,-3.266842,-6.531123,19.79339,-27.369114,-0.77,0.48,315.0,3,4.0,0,1,1,SL
2,75.5,1.46,6.88,54.34,1940.0,-1.977183,-109.901781,-1.155694,-4.872924,20.602334,-36.262184,-0.65,-0.51,328.0,2,5.0,0,1,0,CU
3,75.0,1.53,6.83,54.61,2017.0,2.37583,-109.20583,2.277617,-5.902656,19.427562,-38.284747,-0.69,-0.69,330.0,1,12.0,0,0,0,CU
4,91.2,1.49,6.66,54.15,2281.0,-5.868477,-132.500539,-6.486796,8.700586,30.11769,-15.941174,0.63,1.28,143.0,2,4.0,0,1,0,FF


## Data Visualizations

In [None]:
# Histograms of release_speed and release_spin_rate, stacked in a 2x1 layout with a shared y-axis.
df.hist(column=["release_speed", "release_spin_rate"], layout=(2, 1), figsize=[15, 15], grid=False, sharey=True)

In [None]:
# Feature pairs to plot against each other, as (x-axis column, y-axis column).
combinations = [
    ('release_speed', 'release_spin_rate'),
    ('release_spin_rate', 'spin_axis'),
    ('vy0', 'ay'),
    ('pfx_x', 'pfx_z'),
]

# One scatter plot per pair, coloured by pitch type; only the first 1000 rows are drawn.
for x_column, y_column in combinations:
    scatter_rows = df.iloc[:1000, :]
    sns.relplot(data=scatter_rows, x=x_column, y=y_column, hue='pitch_type')

plt.show()

## Logistic Regression Classifier

In [None]:
# NOTE(review): df_100 is not used by any cell visible in this notebook; kept in case other work relies on it.
df_100 = df.head(100)

# Features are every column except the trailing pitch_type label.
# A float matrix (instead of dtype=object) avoids an object-to-float conversion inside scikit-learn.
X = df.iloc[:, :-1].to_numpy(dtype=float)
y = df["pitch_type"]

# Hold out 25% of the rows for testing; the fixed seed makes the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

# Baseline model: fit on the training split, predict the held-out split.
lr_model = LogisticRegression(solver='lbfgs', max_iter=10000)
lr_model.fit(X_train, y_train)
y_pred = np.array((lr_model.predict(X_test)))

# Hyperparameter grid. Each solver is paired only with penalties it supports:
# 'newton-cg' and 'lbfgs' do not implement 'l1', so a full cross product would
# schedule fits that can only fail.
c_values = [100, 10, 1.0, 0.1, 0.01]
param_grid = [
    dict(solver=['newton-cg', 'lbfgs'], penalty=['l2'], C=c_values),
    dict(solver=['liblinear'], penalty=['l1', 'l2'], C=c_values),
]

# Hyperparam tuning using GridSearch (the estimator is cloned, so lr_model itself is not refitted).
grid_search = GridSearchCV(lr_model, param_grid, cv=5)

# Tune on the training split only, so the held-out rows stay unseen during model selection.
grid_search.fit(X_train, y_train)

# Print the best parameters and the corresponding cross-validated accuracy
print("Best parameters: ", grid_search.best_params_)
print("Best accuracy score: ", grid_search.best_score_)
# Accuracy of the refitted best estimator on the held-out split.
print("Tuned model test accuracy: ", grid_search.score(X_test, y_test))

# Metrics of the baseline Logistic Regression model (per-class f1, precision, recall)
print("Overall Score:")
print(lr_model.score(X_test, y_test))
print(f1_score(y_test, y_pred, average=None))
print(precision_score(y_test, y_pred, average=None))
print(recall_score(y_test, y_pred, average=None))

## Random Forest Classifier

In [None]:
# Fixed seed so the forest (and therefore the reported scores) is reproducible.
model = RandomForestClassifier(n_estimators=10, criterion="entropy", max_depth=10, random_state=42)
ground_truth = df.iloc[:, -1]
classes = pd.unique(ground_truth)
# Integer code of a pitch type = its position in `classes`; classes[code] maps back.
class_to_index = {label: position for position, label in enumerate(classes)}

# Manual k-fold cross-validation over contiguous (unshuffled) row blocks.
scores = {}
k = 10
n_rows = len(df)
for fold in range(k):
    start = fold * n_rows // k
    stop = (fold + 1) * n_rows // k
    test_data = df.iloc[start:stop]
    train_data = pd.concat([df.iloc[:start], df.iloc[stop:]])

    # Encode the training labels as integer codes (vectorised lookup).
    gt_data = train_data.iloc[:, -1].map(class_to_index).to_numpy()

    model.fit(train_data.iloc[:, :-1], gt_data)

    predictions = model.predict(test_data.drop(['pitch_type'], axis=1))

    # Decode predictions back to pitch-type strings before scoring.
    score = metrics.classification_report(test_data['pitch_type'], [classes[p] for p in predictions], digits=4, output_dict=True)
    average = score["weighted avg"]
    # Key by the fold number; a separate loop variable keeps it from being
    # overwritten, so every fold keeps its own entry.
    scores[fold] = average

    print(f"Scores for Fold {fold}: {average}")

# Average the weighted precision / recall / f1 over all folds.
mean_validated_scores = Counter()
for score in scores.values():
    for metric_name in ("precision", "recall", "f1-score"):
        mean_validated_scores[metric_name] += score[metric_name]

for key in mean_validated_scores.keys():
    mean_validated_scores[key] /= len(scores)

print(mean_validated_scores)

# Final model: refit on the complete cleaned frame.
gt_data = df.iloc[:, -1].map(class_to_index).to_numpy()
model.fit(df.iloc[:, :-1], gt_data)

## KNN Classifier

In [None]:
# Define features and target.
# A float matrix (instead of dtype=object) avoids an object-to-float
# conversion inside scikit-learn on each fit/predict call in the loop below.
X = df.iloc[:, :-1].to_numpy(dtype=float)
y = df["pitch_type"]

# Set up the K-fold cross-validation object (shuffled, seeded for reproducibility)
kfold = KFold(n_splits=10, shuffle=True, random_state=42)

# Accuracy of each fold, in fold order
accuracies = []

# Loop over each fold
for train_index, test_index in kfold.split(X):
    # Split the data into training and test sets for this fold
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    # Create a KNN classifier with k=5 and train it on this fold's training rows
    knn = KNeighborsClassifier(n_neighbors=5)
    knn.fit(X_train, y_train)

    # Predict the held-out rows and record the fold accuracy
    y_pred = knn.predict(X_test)
    accuracies.append(accuracy_score(y_test, y_pred))

# Calculate the average accuracy over all folds
avg_accuracy = np.mean(accuracies)
print('Average accuracy:', avg_accuracy)

In [None]:
# Line chart of the per-fold accuracies collected in `accuracies`.
fig, ax = plt.subplots()
ax.plot(accuracies)
ax.set(
    title='K-Fold Cross-Validation Accuracy',
    xlabel='Fold',
    ylabel='Accuracy',
)
plt.show()

In [None]:
# Create a confusion matrix.
# NOTE: y_test / y_pred are whatever the most recently executed cell left
# behind; after the KNN cross-validation loop that is the final fold only.
class_labels = sorted(set(y_test) | set(y_pred))
cm = confusion_matrix(y_test, y_pred, labels=class_labels)

# Plot the confusion matrix with the pitch types on both axes, so rows and
# columns can be read without knowing the internal label order.
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=class_labels, yticklabels=class_labels)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.show()

## K-Means Classifier

In [None]:
class labeled_KMeans:
    """K-Means wrapper that standardises the features before clustering.

    The scaler fitted in :meth:`train` is stored and reused by :meth:`predict`
    and :meth:`score`, so later data is transformed with the same mean and
    variance the centroids were learned with.

    Parameters
    ----------
    random_state : int or None, default None
        Seed forwarded to every ``KMeans`` instance. ``None`` keeps the
        unseeded behaviour.
    """

    def __init__(self, random_state=None):
        self.model = None    # fitted KMeans, set by train()
        self.scaler = None   # fitted StandardScaler, set by train()
        self.random_state = random_state

    def train(self, X, n_clusters=3, max_k=15, plot_elbow=False):
        """Fit the scaler and a ``KMeans`` model with ``n_clusters`` clusters.

        Parameters
        ----------
        X : array-like of numeric features, one row per sample.
        n_clusters : number of clusters of the final model.
        max_k : largest cluster count tried for the elbow plot.
        plot_elbow : when True, additionally fit models for k = 1..max_k and
            plot their within-cluster sum of squares (inertia).
        """
        # Standardize the data and remember the scaler for predict()/score()
        self.scaler = skl_preprocessing.StandardScaler()
        X_scaled = self.scaler.fit_transform(X)

        if plot_elbow:
            # Elbow method: inertia as a function of the number of clusters
            cluster_counts = range(1, max_k + 1)
            wcss = []
            for k in cluster_counts:
                kmeans = KMeans(n_clusters=k, random_state=self.random_state)
                kmeans.fit(X_scaled)
                wcss.append(kmeans.inertia_)
            plt.plot(cluster_counts, wcss)
            plt.title('Elbow Method')
            plt.xlabel('Number of Clusters')
            plt.ylabel('WCSS')
            plt.show()

        # Fit the KMeans model to the data
        self.model = KMeans(n_clusters=n_clusters, random_state=self.random_state)
        self.model.fit(X_scaled)

    def _scale(self, X):
        """Transform ``X`` with the scaler fitted in train()."""
        if self.model is None or self.scaler is None:
            raise RuntimeError("labeled_KMeans.train() must be called before predict() or score()")
        return self.scaler.transform(X)

    def predict(self, X):
        """Return the cluster id of every row of ``X``.

        Raises
        ------
        RuntimeError
            If the model has not been trained yet.
        """
        return self.model.predict(self._scale(X))

    def score(self, X, y):
        """Score the clustering against the true labels ``y``.

        Cluster ids are arbitrary, so each cluster is first mapped to the
        most frequent true label among the rows of ``X`` assigned to it; the
        metrics then compare that mapped label with ``y``. Because the mapping
        is derived from the same ``y`` it is scored on, the result is an
        optimistic, purity-style measure.

        Parameters
        ----------
        X : array-like of numeric features.
        y : array-like of true labels, same length as ``X``.
            Assumed to contain no missing values.

        Returns
        -------
        tuple
            ``(accuracy, f1, precision, recall)``; f1, precision and recall
            are weighted averages over the classes, which also works for
            more than two classes.
        """
        cluster_ids = self.predict(X)
        y_true = np.asarray(y)

        # Majority true label of every cluster, indexed by cluster id.
        majority_label = pd.Series(y_true).groupby(cluster_ids).agg(lambda group: group.mode().iloc[0])
        y_mapped = majority_label.loc[cluster_ids].to_numpy()

        accuracy = accuracy_score(y_true, y_mapped)
        f1 = f1_score(y_true, y_mapped, average='weighted', zero_division=0)
        precision = precision_score(y_true, y_mapped, average='weighted', zero_division=0)
        recall = recall_score(y_true, y_mapped, average='weighted', zero_division=0)
        return accuracy, f1, precision, recall

def print_full(df, labels, ch='CH', y='pitch_type'):
    """Display a cluster-vs-label count table for a clustering result.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the column named by ``y`` and ``release_speed``.
        It is modified in place: ``labels`` is stored in the column
        ``xpitch_type`` (later cells read that column).
    labels : array-like
        Cluster id of every row of ``df``.
    ch : str, default 'CH'
        Label value whose counts order the rows (descending). If that value
        does not occur in the table, the rows are shown unsorted.
    y : str, default 'pitch_type'
        Name of the true-label column.

    Notes
    -----
    Relies on ``display`` being available, as it is inside an IPython/Jupyter
    session.
    """
    df['xpitch_type'] = labels

    # Only the label, the cluster id and one value column to count are needed
    xdata = df[[y, 'xpitch_type', 'release_speed']]

    # Rows: cluster ids; columns: ('release_speed', <label value>); cells: row counts
    pivot_table = pd.pivot_table(xdata, columns=y, index='xpitch_type', aggfunc='count', fill_value=0)

    sort_key = ('release_speed', ch)
    if sort_key in pivot_table.columns:
        pivot_table = pivot_table.sort_values(sort_key, ascending=False)

    # Show every row. The option is scoped to this block, so the previous
    # setting is restored even if display() raises, and no notebook-level
    # variable is needed to size it.
    with pd.option_context('display.max_rows', None):
        display(pivot_table)

In [None]:
# Work on a copy: print_full() adds a column to the frame it is given.
df_copy = df.copy()

# Feature matrix = every column except the pitch_type label
X = df_copy.drop(columns=["pitch_type"])

# Cluster all pitches into 15 groups
model = labeled_KMeans()
model.train(X, n_clusters=15)

# Cluster id of each row the model was trained on
labels = model.predict(X)

# Cluster-vs-pitch-type count table
print_full(df_copy, labels)

In [None]:
# Create a left and right handed pitcher df.
# .copy() makes them independent frames, so the columns added to them later
# are not written into a selection of df.
left = df.loc[df['p_throws'] == 0].copy()
right = df.loc[df['p_throws'] == 1].copy()

# Extract the features by name: drop the label and any derived columns.
# A positional slice ([:, :-2]) would also discard `strikes` on a fresh run,
# and a different set of columns once extra columns have been appended.
X = left.drop(columns=['pitch_type', 'xpitch_type', 'Pitch Group'], errors='ignore')

# Cluster the left-handed pitches into 13 groups
model = labeled_KMeans()
model.train(X, n_clusters=13)

# Cluster id of each row the model was trained on
labels = model.predict(X)

print_full(left, labels)

In [None]:
# Extract the features by name: drop the label and any derived columns.
# A positional slice ([:, :-2]) would also discard `strikes` on a fresh run,
# and a different set of columns once extra columns have been appended.
X = right.drop(columns=['pitch_type', 'xpitch_type', 'Pitch Group'], errors='ignore')

# Cluster the right-handed pitches into 13 groups
model = labeled_KMeans()
model.train(X, n_clusters=13)

# Cluster id of each row the model was trained on
labels = model.predict(X)

print_full(right, labels)
##
def label_pitch_group(pitch_type):
    """Map a pitch_type code to a coarse pitch family.

    Parameters
    ----------
    pitch_type : str
        Pitch type code such as ``'FF'``, ``'SL'`` or ``'CH'``.

    Returns
    -------
    str or float
        ``'Fastball'``, ``'Slider'``, ``'Curveball'`` or ``'Changeup'``;
        ``float('nan')`` for any code that belongs to none of the groups
        (including a missing value).
    """
    pitch_groups = {
        # 'FF' (four-seam fastball) is included so the most common fastball
        # code is grouped with the other fastballs instead of mapping to NaN.
        'FF': 'Fastball', 'FA': 'Fastball', 'FT': 'Fastball',
        'FC': 'Fastball', 'SI': 'Fastball', 'FS': 'Fastball',
        'SL': 'Slider', 'ST': 'Slider', 'SV': 'Slider',
        'CU': 'Curveball', 'KC': 'Curveball',
        'CH': 'Changeup',
    }
    return pitch_groups.get(pitch_type, float('nan'))
    
# Add the coarse pitch family. assign() builds a new frame that is rebound to
# `left`, instead of writing a column into a frame selected from df.
left = left.assign(**{'Pitch Group': left['pitch_type'].apply(label_pitch_group)})

# Extract the features by name (drop the label and derived columns) so the
# result does not depend on how many columns have been appended so far.
X = left.drop(columns=['pitch_type', 'xpitch_type', 'Pitch Group'], errors='ignore')

# One cluster per pitch family
model = labeled_KMeans()
model.train(X, n_clusters=4)

# Cluster id of each row the model was trained on
labels = model.predict(X)

print_full(left, labels, y='Pitch Group',ch='Changeup')

In [None]:
# Add the coarse pitch family. assign() builds a new frame that is rebound to
# `right`, instead of writing a column into a frame selected from df.
right = right.assign(**{'Pitch Group': right['pitch_type'].apply(label_pitch_group)})

# Extract the features by name (drop the label and derived columns) so the
# result does not depend on how many columns have been appended so far.
X = right.drop(columns=['pitch_type', 'xpitch_type', 'Pitch Group'], errors='ignore')

# One cluster per pitch family
model = labeled_KMeans()
model.train(X, n_clusters=4)

# Cluster id of each row the model was trained on
labels = model.predict(X)

print_full(right, labels, y='Pitch Group',ch='Changeup')

In [None]:
# Extract the features by name (drop the label and derived columns)
X = left.drop(columns=['pitch_type', 'xpitch_type', 'Pitch Group'], errors='ignore')

# Fit a 6-cluster model; plot_elbow=True also draws the elbow curve used to judge K
model = labeled_KMeans()
model.train(X, n_clusters=6, plot_elbow=True)

# Cluster id of each left-handed pitch, stored next to the data it describes
labels = model.predict(X)
left = left.assign(xpitch_type=labels)

# Group by clusters and see pitch characteristics.
# The summary is taken from `left`, the frame these labels belong to, and the
# numeric columns are selected before averaging so the string columns
# (pitch_type, Pitch Group) are never passed to mean().
summary_columns = ['release_speed', 'release_spin_rate', 'pfx_x', 'pfx_z', 'balls', 'strikes']
left.groupby('xpitch_type')[summary_columns].mean()

## Results

- Random Forest: 84% 
- Logistic Regression: ~80%
- KNN: 84%
- K-Means: 73%

# Best Model for Pitch Prediction: <u>Random Forest</u>