In [None]:
"""Imports."""

from pathlib import Path
from PIL import Image

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import TensorDataset, DataLoader, random_split
from torchvision import models, transforms

In [None]:
"""Specify which device (CPU or GPU) to use"""

# Prefer the GPU whenever PyTorch reports CUDA as available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print(f"Using GPU device: {torch.cuda.get_device_name(device)}\n")
else:
    print("Using CPU.\n")

In [None]:
"""Initialize target outputs: read our labels from a file and convert them to a 2-d tensor."""

# Absolute, machine-specific location of the labels CSV; it must be edited to run elsewhere.
labels_path = Path(r"C:\Users\jai\veo_nu\data\labels\Initial_combined_labels.csv")  # path to labels file
labels_df = pd.read_csv(labels_path)  # read the labels file into a pandas dataframe table
# NOTE(review): `display` is not imported in this notebook; presumably the IPython/Jupyter built-in — confirm.
display(labels_df.head())  # display the first few rows of the labels dataframe

In [None]:
# Target outputs: the "Possession" and "Set piece" columns as a float32 2-d tensor on `device`.
Y = torch.tensor(
    labels_df[["Possession", "Set piece"]].to_numpy(),
    dtype=torch.float32,
    device=device,
)
print(Y.shape)  # show the shape of the target tensor
N, n_classes = Y.shape  # rows -> number of samples, columns -> number of classes

In [None]:
"""Initialize inputs: convert the frame images to a list of 3-d tensors (width X height X rgb)."""

frames_path = Path(r"C:\Users\jai\veo_nu\data\initial_combined_frames")  # path to image directory
n_channels, height, width = 3, 224, 224  # dimensions every stored frame will have
# One slot per label row, laid out as (N, channels, height, width).
# Zero-filled rather than uninitialised (`torch.empty`), so a slot that is never written
# holds defined values instead of arbitrary memory that could silently reach training.
X = torch.zeros((N, n_channels, height, width))
print(X.shape)  # print shape of initialized inputs

In [None]:
"""Fill in `X` frame-by-frame."""

# Collect the frame files in sorted order so the frame -> row assignment is deterministic
# (a bare `glob` yields files in whatever order the filesystem returns them).
# NOTE(review): this assumes row i of the labels corresponds to the i-th frame in sorted
# filename order — confirm against the labels file.
frame_files = sorted(frames_path.glob("*.png"))
if len(frame_files) != N:
    # More files than rows would overflow `X`; fewer would leave rows of `X` unfilled.
    raise ValueError(
        f"Expected {N} frames (one per label row) in {frames_path}, found {len(frame_files)}."
    )

# Build the transforms once instead of once per frame.
to_tensor = transforms.ToTensor()  # PIL image -> float tensor (pixel values scaled to 0-1)
resize = transforms.Resize((height, width))  # resize to the dimensions `X` was allocated with

for i, file in enumerate(frame_files):
    with Image.open(file) as raw_img:  # the `with` closes the opened file, not just a converted copy
        img = raw_img.convert("RGB")  # force 3 channels
    img_tensor = to_tensor(img).float()  # convert the image to a tensor
    img_tensor = resize(img_tensor)  # resize the tensor to (height, width)
    X[i] = img_tensor  # store the current frame in its slot

In [None]:
"""Create Datasets and DataLoader."""

# Create training and validation datasets (90% / 10% random split; no separate test set is made here).
dataset = TensorDataset(X, Y)
split_generator = torch.Generator().manual_seed(0)  # fixed seed -> the same split on every run
train_data, val_data = random_split(dataset, [0.90, 0.10], generator=split_generator)
# View the shapes of the training and validation subsets.
# They are derived from the subset lengths instead of `subset[:]`, which would copy every
# frame of the subset into a new tensor just to read its shape.
n_train, n_val = len(train_data), len(val_data)
print(
    torch.Size((n_train, *X.shape[1:])),
    torch.Size((n_train, *Y.shape[1:])),
    torch.Size((n_val, *X.shape[1:])),
    torch.Size((n_val, *Y.shape[1:])),
)


In [None]:
"""Create function to perform gradient centralization during training."""

def apply_gradient_centralization(optimizer):
    """Centralizes, in place, the gradients of every parameter held by `optimizer`.

    For each parameter gradient with two or more dimensions, the mean taken over all
    dimensions except the first is subtracted, so every slice along dim 0 ends up with
    zero mean. Gradients with fewer than two dimensions (biases, scalar parameters) are
    left untouched: they have no trailing dimensions to average over, and centralizing
    them would distort or wipe out the gradient.

    This function should be called after `loss.backward()` and before `optimizer.step()`
    in the training loop.

    Args:
        optimizer: Object exposing `param_groups`, an iterable of dicts whose "params"
            entry lists the parameters to process (e.g. a `torch.optim.Optimizer`).
    """
    with torch.no_grad():  # gradient edits must not be tracked by autograd
        for group in optimizer.param_groups:
            for param in group['params']:
                grad = param.grad
                if grad is None or grad.dim() <= 1:
                    continue  # nothing to centralize
                # Mean over every dimension except the first (output) one
                trailing_dims = tuple(range(1, grad.dim()))
                grad.sub_(grad.mean(dim=trailing_dims, keepdim=True))

In [None]:
"""Create function to train the model."""

def train(
    model: nn.Module,  # model
    train_loader: DataLoader,  # batched dataset for training
    val_loader: DataLoader,  # batched dataset for validation
    optimizer: optim.Optimizer,  # optimizer for performing parameter update step
    loss_fn: nn.Module,  # loss function
    max_epochs: int = 5,  # max n training epochs
    val_check_interval: int = 1,  # check val loss every `val_check_interval` batches
    centralize_gradients: bool = False,  # apply gradient centralization before each update
) -> tuple[torch.Tensor, list[float], list[float]]:  # -> loss, train_losses, val_losses
    """Trains `model` and records the training and validation losses.

    Every training batch does a forward pass, a backward pass and one optimizer step.
    Every `val_check_interval` batches (counted within each epoch) the loss on the first
    batch yielded by `val_loader` is computed and a progress line is printed. Batches are
    moved to the device the model's parameters live on before being used.

    Args:
        model: Network to train; updated in place.
        train_loader: Yields `(inputs, targets)` training batches.
        val_loader: Yields `(inputs, targets)` validation batches.
        optimizer: Optimizer holding the parameters of `model`.
        loss_fn: Called as `loss_fn(model(inputs), targets)`.
        max_epochs: Number of passes over `train_loader`.
        val_check_interval: Validate and print on batches whose index is a multiple of this.
        centralize_gradients: If True, call `apply_gradient_centralization(optimizer)`
            between the backward pass and the optimizer step.

    Returns:
        The loss tensor of the last training batch, the list of per-batch training
        losses, and the list of validation losses (one per validation check).

    Raises:
        ValueError: If no training batch was processed (`max_epochs` < 1 or an empty
            `train_loader`).
    """
    # Use the model's own device so inputs, targets and outputs always agree,
    # wherever the dataset tensors happen to be stored.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    # <s Create Trackers
    train_losses, val_losses = [], []
    loss = None

    # <s Go through training and validation loop
    for epoch in range(max_epochs):  # epoch is all frames in our "train" dataset
        for batch_i, (x_train, y_train) in enumerate(train_loader):  # get train batch of frames and labels
            if model_device is not None:
                x_train, y_train = x_train.to(model_device), y_train.to(model_device)
            # <ss Model training.
            model.train()  # back to training mode (the validation step below switches to eval)
            optimizer.zero_grad()  # set all gradients to zero for the current step
            out = model(x_train)  # forward pass through the model
            loss = loss_fn(out, y_train)  # compute loss
            loss.backward()  # backward pass back through the model to compute gradients
            if centralize_gradients:
                apply_gradient_centralization(optimizer)
            optimizer.step()  # update the parameters; without this the model never learns
            train_losses.append(loss.item())  # append the current train loss to list of train losses
            # /ss>
            # <ss Model validation (for early stopping).
            # Keyed on this loop's own batch counter, not on a leftover global.
            if batch_i % val_check_interval == 0:
                model.eval()  # set model to eval mode
                with torch.no_grad():  # ensure gradients aren't computed
                    # A fresh iterator each time: this always takes the first batch it yields.
                    x_val, y_val = next(iter(val_loader))
                    if model_device is not None:
                        x_val, y_val = x_val.to(model_device), y_val.to(model_device)
                    val_loss = loss_fn(model(x_val), y_val).item()  # compute val loss
                    val_losses.append(val_loss)  # append the current val loss to list of val losses
                print(  # print the current epoch, batch, train loss, and val loss
                    f"Epoch {epoch + 1}: Batch {batch_i + 1}:  "
                    f"Loss = {train_losses[-1]:.3f}, Val Loss = {val_losses[-1]:.3f}"
                )
            # /ss>
    # /s>
    if loss is None:
        raise ValueError("No training batches were processed: check `max_epochs` and `train_loader`.")
    print("Finished training:")
    print(f"Epoch {epoch + 1}:  Batch {batch_i + 1}: Loss = {train_losses[-1]:.3f}, Val Loss = {val_losses[-1]:.3f}")
    return loss, train_losses, val_losses

In [None]:
"""Load in initial pretrained model."""

# Load a torchvision DenseNet-121 initialised with its default pretrained weights
# (`DenseNet121_Weights.DEFAULT`).
# Possible pretrained models to try: DenseNet121_Weights, DenseNet169_Weights, ResNet50_Weights, ResNet101_Weights

model = models.densenet121(weights=models.DenseNet121_Weights.DEFAULT)
print(model)


In [None]:
"""Configure model (just change last layer in model)"""

dropout_rate = 0.2  # probability passed to the dropout layer in front of the new output layer

# Swap the classifier for a head that emits one value in (0, 1) per class.
# The input width is read from the existing classifier before it is replaced.
model.classifier = nn.Sequential(
    nn.Dropout(p=dropout_rate),
    nn.Linear(in_features=model.classifier.in_features, out_features=n_classes),
    nn.Sigmoid(),  # squashes each output independently (multi-label: the outputs need not sum to 1)
)

In [None]:
"""Train model."""

batch_size = 32  # number of frames in each batch to process when computing loss
# Learning rate: scale factor used in the parameter update step. This variable is the single
# source for the optimizer below; it holds 0.02, the value the optimizer was actually given.
lr = 0.02

# Create data loaders
train_loader = DataLoader(train_data, batch_size=batch_size)
val_loader = DataLoader(val_data, batch_size=batch_size)
# Set loss function and optimizer
loss_fn = nn.BCELoss()  # expects probabilities, which the Sigmoid head of the classifier provides
optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=1e-7, momentum=0.5, nesterov=True)
# Train
loss, train_losses, val_losses = train(
    model, train_loader, val_loader, optimizer, loss_fn, max_epochs=2,
)

In [None]:
# Draw the recorded training and validation loss curves on one set of axes

fig, ax = plt.subplots()
# Axis labelling first, then the two curves and their legend.
ax.set_xlabel("Batch")
ax.set_ylabel("Loss")
ax.plot(train_losses, label="Train")
ax.plot(val_losses, label="Val")
ax.legend()
ax.set_title("Training and Validation Losses")


In [None]:
# Take one validation image to see how the model performs on it

# Index the first sample directly: `val_data[:]` would first copy the whole validation set.
val_frame = val_data[0][0].unsqueeze(0)  # first val image with a leading batch dimension: (1, C, H, W)

In [None]:
model.eval()  # evaluation mode: turns off train-only behaviour such as dropout
with torch.no_grad():  # a one-off prediction needs no autograd graph
    val_pred = model(val_frame)
val_pred  # displayed as the cell output

In [None]:
val_data[0][1]  # target labels of the first validation sample, read without copying the whole subset