# Imports

In [None]:
import numpy as np
import pandas as pd
import glob, os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Functions

In [None]:
def reformat_data(df):
    #Joining clock seconds and minutes
    df['TimeLeft'] = (df['Clock Minutes']*60+df['Clock Seconds'])
    df = df.drop('Clock Minutes', axis = 1)
    df = df.drop('Clock Seconds', axis = 1)
    
    #Combining offense and defense score information
    df['dScore'] = df['DefenseScore']-df['OffenseScore']
    df = df.drop('DefenseScore', axis = 1)
    df = df.drop('OffenseScore', axis = 1)
    
    #Consolidating play types
    df.replace(to_replace='Pass Incomplete', value='Pass', inplace=True)
    df.replace(to_replace='Pass Reception', value='Pass', inplace=True)
    df.replace(to_replace='Passing Touchdown', value='Pass', inplace=True)
    df.replace(to_replace='Sack', value='Pass', inplace=True)
    
    df.replace(to_replace='Field Goal Good', value='FG', inplace=True)
    df.replace(to_replace='Field Goal Missed', value='FG', inplace=True)

    df.replace(to_replace='Rushing Touchdown', value='Rush', inplace=True)
    
    Pass = np.array((df['PlayType'] == 'Pass').astype(int))
    Rush = np.array((df['PlayType'] == 'Rush').astype(int))
    Punt = np.array((df['PlayType'] == 'Punt').astype(int))
    FG = np.array((df['PlayType'] == 'FG').astype(int))

    #Adding in binary variable for later addition of previous play type
    df['Pass'] = Pass

    #Discarding non plays (penalties, timeouts, end of period)
    keep_these = np.maximum(Pass, Rush)
    keep_these = np.maximum(keep_these, Punt)
    keep_these = np.maximum(keep_these, FG)
    df = df.loc[np.where(keep_these == 1)[0]]
    
    return df


    
def grab_previous_plays(df):
    #Loop to grab information on previous plays and cumulative scoring
    new_arr = []
    pass_prop_list = []
    run_count = 0
    pass_count = 0
    play_count = 0
    for i in range(max(df['DriveNumber'])+1):
        drive_arr= df[df['DriveNumber'] == i].to_numpy()
        if drive_arr.shape[0] != 0:
            if drive_arr.shape[0] == 1:
                if drive_arr[0][8] == 'Pass':
                    pass_count += 1
                elif drive_arr[0][8] == 'Rush':
                    run_count += 1
                if run_count+pass_count > 0:
                    pass_prop = pass_count/(run_count+pass_count)
                else: 
                    pass_prop = 0
                play_count +=1
                pass_prop_list.append(pass_prop)
                if play_count > 0:
                    new_arr.append(np.hstack((drive_arr[0],  np.array([0,0,0.5,0, 0.5, pass_prop_list[play_count-1]]))))
                else:
                    new_arr.append(np.hstack((drive_arr[0],  np.array([0,0,0.5,0, 0.5, 0.5]))))

            else:
                drive_arr = drive_arr[drive_arr[:,1].argsort()]
                new_drive_arr = []
                for j in range(len(drive_arr)):
                    
                    if drive_arr[j][8] == 'Pass':
                        pass_count += 1
                    elif drive_arr[j][8] == 'Rush':
                        run_count += 1

                    if run_count+pass_count > 0:
                        pass_prop = pass_count/(run_count+pass_count)
                    else: 
                        pass_prop = 0
                    pass_prop_list.append(pass_prop)
                    play_count+=1
                    if j == 0:
                        if play_count > 0:
                            new_drive_arr.append(np.hstack((drive_arr[j], 
                                    np.array([0,0,0.5,0, 0.5, pass_prop_list[play_count-1]]))))
                        else:
                            new_drive_arr.append(np.hstack((drive_arr[j], 
                                    np.array([0,0,0.5,0, 0.5, 0.5]))))
                        
                    else:
        
                        prev_arr = new_drive_arr[j-1]
                        current_play = drive_arr[j][8]
                        
                        if prev_arr[11] == 1:
                            pos_3 = (prev_arr[14]*prev_arr[15]) + 1
                            #prev play was pass
                            if current_play == 'Pass':
                                #current_play is a pass
                                new_drive_arr.append(np.hstack((drive_arr[j],
                                        np.array([prev_arr[5],prev_arr[7],pos_3, 1, prev_arr[11], pass_prop_list[play_count-1]]))))
                            else:
                                new_drive_arr.append(np.hstack((drive_arr[j], 
                                        np.array([prev_arr[5],prev_arr[7],pos_3,0, prev_arr[11], pass_prop_list[play_count-1]]))))
                        elif prev_arr[11] == 0:
                            pos_3 = (prev_arr[14]*prev_arr[15]) - 1
                            #prev play was rush
                            if current_play == 'Rush':
                                #current_play is a rush
                                new_drive_arr.append(np.hstack((drive_arr[j], 
                                    np.array([prev_arr[5],prev_arr[7],pos_3, 1, prev_arr[11], pass_prop_list[play_count-1]]))))
                            else:
                                new_drive_arr.append(np.hstack((drive_arr[j], 
                                    np.array([prev_arr[5],prev_arr[7],pos_3, 0, prev_arr[11], pass_prop_list[play_count-1]]))))
                new_arr.append(np.vstack(new_drive_arr))    

    #discard data that is not helpful
    full_arr = np.vstack(new_arr)
    full_arr = np.delete(full_arr, np.array([0, 1, 2, 11, 15]), axis = 1)
    
    return full_arr

class Data(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.len = self.X.shape[0]
       
    def __getitem__(self, index):
        return self.X[index], self.y[index]
   
    def __len__(self):
        return self.len

# Grabbing and formatting the data

In [None]:
# Path should be a folder containing all team data labeled as team_game_year
Folder_Path = ""

csv_files = sorted(glob.glob(os.path.join(Folder_Path, "*.csv")))
df_dict = {}
for f in csv_files:
    year = int(f.split('\\')[-1].split('_')[-1].split('.')[0])
    team = int(f.split('\\')[-1].split('_')[0])
    game = int(f.split('\\')[-1].split('_')[1])
    df = pd.read_csv(f, sep = ',')
    df_dict[game, team] = df[['DriveNumber', 'PlayNumber', 'OffenseScore', 'DefenseScore', 'Home', 'Period', 'Clock Minutes', 'Clock Seconds', 
 'YardsToGoal', 'Down', 'Distance', 'YardsGained', 'PlayType']]

# The following list of pass % was manually input for each team in 2023 from ESPN datqa. The ith position corresponding to the ith team. 
percent_pass = [0.42, 0.57, 0.49, 0.51, 0.46, 0.36, 0.46, 0.48, 0.44, 0.56, 0.51, 0.42, 0.40]

#reformatting and combining the data into an array
all_data = []
for key in df_dict.keys():
    df = reformat_data(df_dict[key])
    data_arr = grab_previous_plays(df)
    data_arr = np.vstack((data_arr.T, np.full(len(data_arr), percent_pass[key[1]-1]))).T
    all_data.append(data_arr)
game_data = np.vstack(all_data)

#play type incoder tranforming each play into an array [0,0,0,0] with a 1 in the position corresponding to the play call
encoder = sklearn.preprocessing.OneHotEncoder(sparse_output = False).fit(game_data[:,5].astype(str).reshape(-1,1))
encoded_output = encoder.transform(game_data[:,5].astype(str).reshape(-1,1))

#feature selection of data that does not inform post-snap information
game_data = game_data[:,[0,1,2,3,7,8,9,10,11,12,13]]

#The data is scaled between 0 and 1
scaled_data = []
for i in range(len(game_data.T)):
    max_val = max(game_data[:,i])
    min_val = min(game_data[:,i])
    scaled = ((game_data[:,i]-min_val)/(max_val-min_val))
    scaled_data.append(scaled)
game_data = np.vstack(scaled_data).T.astype('float32')


# Loading the data for training

In [None]:
#Split up data into test and training
play_type = encoded_output
Pre_snap_data = game_data
data_train, data_test, play_type_train, play_type_test = train_test_split(Pre_snap_data, play_type, train_size=0.8, shuffle=True)

#Convert the data to PyTorch Tensors
data_train = torch.tensor(data_train, dtype=torch.float32)
play_type_train = torch.tensor(play_type_train, dtype=torch.float32)
data_test = torch.tensor(data_test, dtype=torch.float32)
play_type_test = torch.tensor(play_type_test, dtype=torch.float32)

#Setting batch size and putting the data into the dataloader
batch_size = 32

train_data = Data(data_train, play_type_train)
train_dataloader = DataLoader(dataset=train_data, batch_size = batch_size, shuffle=True)

test_data = Data(data_test, play_type_test)
test_dataloader = DataLoader(dataset=test_data, batch_size = batch_size, shuffle=True)

# Defining the Neural Network

In [None]:
class NeuralNetwork(torch.nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.relu = nn.ReLU()
        self.linear1 = nn.Linear(11, 165)
        self.linear2 = nn.Linear(165, 153)
        self.linear3 = nn.Linear(153, 68)
        self.linear4 = nn.Linear(68, 4)
        
    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.relu(x)
        x = self.linear3(x)
        x = self.relu(x)
        x = self.linear4(x)
        return x

# Training the model

In [None]:
model = NeuralNetwork()
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.009084)

num_epochs = 50

loss_vals = []
loss = 0
for epoch in range(num_epochs):
    for X, y in train_dataloader:
        pred = model(X)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        loss_vals.append(loss.item())


## Plotting Loss

In [None]:
step = range(len(loss_vals))
div_val = max(step)/num_epochs

plt.plot(np.array(step)/div_val, np.array(loss_vals), color = 'red')
plt.title("Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Cross Entropy Loss")
plt.show()

# Validating the model

#### Note: The code below is used for the validation data, use the code below for running test data as well once hyperparameters are tuned.

In [None]:
acc_list = []
with torch.no_grad():
    for X, y in test_dataloader:
        pred= model(X)  # Get model outputs
        loss = loss_fn(pred, y)

        pred_int = torch.max(pred, axis = 1)[1]
        gt_int = torch.max(y, axis = 1)[1]

        acc_list.append(np.vstack([pred_int, gt_int]).T)

## Plotting the confusion matrix

In [None]:
acc_arr = np.vstack(acc_list)
print('Accuracy:', sum(acc_arr[:,0]==acc_arr[:,1])/(len(acc_arr)),'%')

matrix = confusion_matrix(acc_arr[:,1], acc_arr[:,0])
summed = np.sum(matrix, axis = 1)
summed_div = np.vstack((summed, summed, summed, summed)).T

plt.subplots(figsize=(8, 5))
ticklabels = encoder.categories_[0]
ax1 = sns.heatmap(matrix/summed_div, annot=matrix, xticklabels = ticklabels, yticklabels = ticklabels,  cbar=False, linewidth = 2, linecolor = 'white', cmap = 'gist_heat', vmin = 0, vmax = 2, fmt="g")
# cbar = ax1.collections[0].colorbar
# cbar.ax.tick_params(colors = 'black')
# cbar.ax.set_yticks(ticks = [0,2])
# cbar.ax.set_yticklabels(labels = ['Min','Max'], fontsize = 14)
plt.title('Confusion Matrix', fontsize = 18)
plt.xlabel('Predicted', fontsize = 14)
plt.ylabel('Ground Truth', fontsize = 14)
plt.show()

# Hyperparameter Tuning with Bayesian Optimization

In [None]:
from bayes_opt import BayesianOptimization
import plotly.graph_objects as go
import time
from typing import Optional

### Functions

In [None]:
def NN_hyperparams(
    H_Layers: Optional[float] = None, 
    Hidden_layer_1_Nodes: Optional[float] = None, 
    Hidden_layer_2_Nodes: Optional[float] = None, 
    Hidden_layer_3_Nodes: Optional[float] = None, 
    learning_rate: Optional[float] = None, 
    batch_size: Optional[float] = None
    ) -> float:
    
    global Pre_snap_data
    global play_type

    #Reformating input variables
    Hidden_layer_1_Nodes = int(np.round(Hidden_layer_1_Nodes))
    Hidden_layer_2_Nodes = int(np.round(Hidden_layer_2_Nodes))
    Hidden_layer_3_Nodes = int(np.round(Hidden_layer_3_Nodes))

    #Defining batch size
    batch_size = 32    

    #Creating the model
    model = nn.Sequential(
        nn.Linear(11, Hidden_layer_1_Nodes),
        nn.ReLU(), 
        nn.Linear(Hidden_layer_1_Nodes, Hidden_layer_2_Nodes),
        nn.ReLU(),
        nn.Linear(Hidden_layer_2_Nodes, Hidden_layer_3_Nodes),
        nn.ReLU(),
        nn.Linear(Hidden_layer_3_Nodes, 4),
    )
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    #Looping thought the model to average validation output, resplitting the data each time.
    Epochs = 10
    output = []
    for i in range(5):
        data_train, data_test, play_type_train, play_type_test = train_test_split(Pre_snap_data, play_type, train_size=0.8, shuffle=True)
        
        data_train = torch.tensor(data_train, dtype=torch.float32)
        play_type_train = torch.tensor(play_type_train, dtype=torch.float32)
        data_test = torch.tensor(data_test, dtype=torch.float32)
        play_type_test = torch.tensor(play_type_test, dtype=torch.float32)
        
        train_data = Data(data_train, play_type_train)
        train_dataloader = DataLoader(dataset=train_data, batch_size = batch_size, shuffle=True)
        test_data = Data(data_test, play_type_test)
        test_dataloader = DataLoader(dataset=test_data, batch_size = batch_size, shuffle=True)
        
        for epoch in range(Epochs):
            for X, y in train_dataloader:
                pred = model(X)
                loss = loss_fn(pred, y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        acc_list = []
        with torch.no_grad():
            for X, y in test_dataloader:
                pred= model(X)  # Get model outputs
                loss = loss_fn(pred, y)
        
                pred_int = torch.max(pred, axis = 1)[1]
                gt_int = torch.max(y, axis = 1)[1]
        
                acc_list.append([pred_int, gt_int])
        
        correct = 0
        total = 0
        for j in range(len(acc_list)):
            correct += sum(acc_list[j][0]==acc_list[j][1])
            total += len(acc_list[j][0])
        output.append(correct/total)
    
    return np.average(np.vstack((output)))

### Running the Optimization

In [None]:
pbounds = {'Hidden_layer_1_Nodes': (2, 200), 'Hidden_layer_2_Nodes': (2, 200), 'Hidden_layer_3_Nodes': (2, 200), 'learning_rate': (0.0005, 0.1)}

runs = 3
rand_n = 1

optimizer = BayesianOptimization(f = NN_hyperparams, 
                                    pbounds=pbounds, verbose=2, 
                                    random_state=int(time.time()),
                                    allow_duplicate_points=True)

optimizer.maximize(init_points=runs, n_iter=rand_n)

### Plotting

In [None]:
#Grabbing the optimizer data
LR = np.array([[res["params"]["learning_rate"]] for res in optimizer.res]).flatten()
HL1 = np.array([[res["params"]["Hidden_layer_1_Nodes"]] for res in optimizer.res]).flatten()
HL2 = np.array([[res["params"]["Hidden_layer_2_Nodes"]] for res in optimizer.res]).flatten()
HL3 = np.array([[res["params"]["Hidden_layer_3_Nodes"]] for res in optimizer.res]).flatten()
LR = np.array([[res["params"]["learning_rate"]] for res in optimizer.res]).flatten()
target_arr = np.array([[res["target"] for res in optimizer.res]]).flatten()
opt_arr = np.vstack((HL1, HL2, HL3, LR, target_arr)).T

df = pd.DataFrame(opt_arr, columns = ['Hidden Neurons 1', 'Hidden Neurons 2', 'Hidden Neurons 3', 'Learning Rate', 'Accuracy'])

In [None]:
#Plotting with plotly
fig = go.Figure(data=
    go.Parcoords(
        line = dict(color = df['Accuracy'],
                   colorscale = 'Reds',
                   showscale = True,
                   cmin = min(df['Accuracy']),
                   cmax = max(df['Accuracy'])),
        dimensions = list([
            dict(range = [1,200],
                 label = 'Hidden Neurons 1', values = df['Hidden Neurons 1']),
            dict(range = [1,200],
                 label = 'Hidden Neurons 2', values = df['Hidden Neurons 2']),
            dict(range = [1,200],
                 label = 'Hidden Neurons 3', values = df['Hidden Neurons 3']),
            dict(range = [min(df['Learning Rate']),max(df['Learning Rate'])],
                 label = 'Learning Rate', values = df['Learning Rate']),
            dict(range = [min(df['Accuracy']),max(df['Accuracy'])],
                 label = 'Accuracy', values = df['Accuracy']),
            ])
))

fig.update_layout(
    font = dict(size = 20)
)
fig.show()