In [None]:
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
import torch
from torchviz import make_dot
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import shap

In [None]:
images = np.load('image_file.npy')
meta = np.load('meta_file.npy')
df = pd.read_csv('processed_pitch_table.csv')

train_dataset = TensorDataset(torch.Tensor(images[:9000]/255),
                        torch.Tensor(meta[:9000]),
                        torch.Tensor(df.label.values[:9000]))


test_dataset = TensorDataset(torch.Tensor(images[9000:]/255),
                        torch.Tensor(meta[9000:]),
                        torch.Tensor(df.label.values[9000:]))



train_loader = DataLoader(train_dataset, batch_size=128)#, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128)#, shuffle=True)


In [None]:



class BaseballCNN(nn.Module):
    def __init__(self,filters=32,dropout=0.1,nodes=32):
        super().__init__()
        self.conv1 = nn.Conv2d(1, filters, 3)
        self.pool1 = nn.MaxPool2d(2)
        self.drop1 = nn.Dropout(p=dropout)
        
        self.conv2 = nn.Conv2d(filters, filters, 3)
        self.pool2 = nn.MaxPool2d(2)
        self.drop2 = nn.Dropout(p=dropout)
        
        self.conv3 = nn.Conv2d(filters, filters, 3)
        self.pool3 = nn.MaxPool2d(2)
        
        self.linear_image = nn.Linear(filters* 11* 14,nodes-4)
        self.linear_meta = nn.Linear(20,4)
        self.drop3 = nn.Dropout(dropout)
        
        self.linear2 = nn.Linear(nodes,nodes) 
        self.drop4 = nn.Dropout(dropout)
        self.linear3 = nn.Linear(nodes,1)
        
    def forward(self, x_image,x_meta):
        
        x_image = F.relu(self.conv1(x_image))
        x_image = self.pool1(x_image)
        x_image = self.drop1(x_image)
        # print(x_image.shape)
        x_image = F.relu(self.conv2(x_image))
        x_image = self.pool2(x_image)
        x_image = self.drop2(x_image)

        x_image = F.relu(self.conv3(x_image))
        # print(x_image.shape)
        x_image = self.pool3(x_image)
        # print(x_image.shape)
        # x_image = x_image.view(-1, 128 * 154)
        x_image = torch.flatten(x_image,1)
        # print(x_image.shape)
        x_image = self.linear_image(x_image)
        # print(x_image.shape)
        # print()
        x_meta = self.linear_meta(x_meta)
        # print(x_image.shape,x_meta.shape)
        x = torch.cat((x_image,x_meta),1)
        # x = self.linear3(self.drop4(self.linear2(x)))
        x= torch.sigmoid(self.linear3(self.drop4(self.linear2(x))))
        # x = F.softmax(self.linear3(self.drop4(self.linear2(x))),dim=0)
        return x




In [None]:
filters = 32

dropout = 0.1
nodes = 32
num_epochs = 40#60
# learning_rate = 0.0005 
learning_rate = 0.0001
device = torch.device('mps')



# Loss and optimizer
criterion = nn.BCELoss()

model = BaseballCNN(filters,dropout,nodes).to(device)
model.train()
# print(f'filters: {filters} dropout: {dropout} nodes: {nodes}')
# Device configuration

# Loss and optimizer

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  

train_losses=[]
test_losses=[]
test_acc=[]
train_acc=[]
# Train the model
patience = 5
lowest_loss = 1
epochs_without_improvement = 0
for epoch in range(num_epochs):
    print(epoch)
    epoch_train_losses=0
    epoch_test_losses=0
    epoch_train_acc=0
    epoch_test_acc=0
    model.train()
    for i, (image_batch,meta_batch, y_batch) in enumerate(train_loader):  
        
        optimizer.zero_grad()
        image_batch = image_batch.to(device)
        meta_batch  = meta_batch.to(device)
        y_batch     = y_batch.to(device)
        
        # Forward pass and loss calculation
        outputs = model(image_batch,meta_batch)
        loss = criterion(outputs.squeeze(1), y_batch)

        # Backward and optimize
        loss.backward()
        optimizer.step()

        # Validation Loop 

    with torch.no_grad(): 
        model.eval() 
        for i, (image_batch,meta_batch, y_batch) in enumerate(test_loader): 
            image_batch = image_batch.to(device)
            meta_batch  = meta_batch.to(device)
            y_batch     = y_batch.to(device)
            outputs = model(image_batch,meta_batch)
            loss = criterion(outputs.squeeze(1), y_batch)
            epoch_test_losses+=outputs.shape[0]*loss.item()
            epoch_test_acc += (((outputs[:,0].cpu().detach().numpy()>0.5)*1==y_batch.cpu().detach().numpy())*1).sum()

        for i, (image_batch,meta_batch, y_batch) in enumerate(train_loader): 
            image_batch = image_batch.to(device)
            meta_batch  = meta_batch.to(device)
            y_batch     = y_batch.to(device)
            outputs = model(image_batch,meta_batch)
            loss = criterion(outputs.squeeze(1), y_batch)
            epoch_train_losses+=outputs.shape[0]*loss.item()
            epoch_train_acc += (((outputs[:,0].cpu().detach().numpy()>0.5)*1==y_batch.cpu().detach().numpy())*1).sum()


    # print(f'Validation Loss: {running_val_loss/len(test_loader):.4f}')

    test_losses += [epoch_test_losses/len(test_dataset)]
    test_acc += [epoch_test_acc/len(test_dataset)]
    train_losses += [epoch_train_losses/len(train_dataset)]
    train_acc += [epoch_train_acc/len(train_dataset)]

    if (test_losses[-1] < lowest_loss) or (epoch < 10):
        lowest_loss = test_losses[-1]
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1

    if epochs_without_improvement > patience:
        print('Early stopping')
        break

    # Additional information
    EPOCH = epoch
    PATH = f"model/model_{EPOCH}.pt"

    torch.save({
                'epoch': EPOCH,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': test_losses[-1],
                }, PATH)

In [None]:

import matplotlib.ticker as mtick
from matplotlib.ticker import PercentFormatter
  



epochs=len(train_losses)

fig = plt.figure(1, (7,5))
ax = fig.add_subplot(1,1,1)

ax.plot(np.arange(1,epochs+1),train_losses,label='train')
ax.plot(np.arange(1,epochs+1),test_losses,label='test')
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.legend()
ax.set_title('Model Loss')
ax.grid()
plt.savefig('cnn_loss.png')



In [None]:
import matplotlib.ticker as mtick
from matplotlib.ticker import PercentFormatter
  
epochs=len(train_losses)

fig = plt.figure(1, (7,5))
ax = fig.add_subplot(1,1,1)

ax.plot(np.arange(1,epochs+1),train_acc,label='train')
ax.plot(np.arange(1,epochs+1),test_acc,label='test')
ax.plot([1,epoch+1],[0.6569541217989322,0.6569541217989322],label='upper bound',ls='--',color='grey')
ax.plot([1,epoch+1],[0.5535245679124061,0.5535245679124061],label='baseline',ls='--',color='black')
ax.set_xlabel('epoch')
ax.set_ylabel('accuracy')
ax.legend()
ax.set_title('Model Accuracy')
ax.grid()
ax.yaxis.set_major_formatter(mtick.PercentFormatter(1)) 
plt.savefig('cnn_acc.png')



In [None]:
image_batch,meta_batch, y_batch = test_dataset[:2000]
image_batch2,meta_batch2, y_batch2 = test_dataset[-500:]

In [None]:
# since shuffle=True, this is a random sample of test data
model.to('cpu')
background = [image_batch2.to('cpu'),meta_batch2.to('cpu')]
test_images = [image_batch.to('cpu'),meta_batch.to('cpu')]

e = shap.DeepExplainer(model, background)
shap_values = e.shap_values(test_images)


shap_numpy = [np.swapaxes(s, 0, 2) for s in shap_values[0]]
test_numpy = np.swapaxes(test_images[0].numpy(), 1, 3)

In [None]:
top = pd.DataFrame({'p':p[:2000]}).sort_values('p',ascending=False).head(100).index
for i in top:
# plot the feature attributions
    fig = shap.image_plot(shap_numpy[i],image_batch[i].numpy().squeeze(0).T,show=False)
    plt.savefig(f'cnn_shap_{i}.png')
    plt.close('all')