In [None]:
import os
import pandas as pd
import numpy as np
import cv2
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from torchvision import datasets, transforms
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import StratifiedKFold
from IPython.display import display, Image
import imageio

In [None]:
# Class name -> integer id used as the classification target.
label_to_int = {
    'corner': 0,
    'longpass': 1,
    'ontarget': 2,
    'penalty': 3,
    'substitution': 4,
    'throw-in': 5,
    'foul': 6,
    'goalkick': 7,
    'shortpass': 8,
    'freekick': 9,
}
# Integer id -> class name, built by inverting the table above so the two stay in sync.
int_to_label = {class_id: class_name for class_name, class_id in label_to_int.items()}

In [None]:
class VideoDataset(Dataset):
    """Dataset of video clips laid out as ``<video_folder>/<label>/<file>``.

    Indexing returns ``(video_tensor, label)`` where ``video_tensor`` stacks
    exactly ``num_frames`` frames sampled from the video and ``label`` is the
    integer id looked up in ``label_to_int``.

    Args:
        video_folder: Root directory containing one sub-directory per class.
        num_frames: Number of frames returned for every video.
        transform: Optional callable applied to each sampled frame (an RGB
            ``H x W x 3`` array). Without it the frames are converted to
            tensors unchanged.

    Raises:
        ValueError: If ``num_frames`` is smaller than 1.
        KeyError: If a class directory name is missing from ``label_to_int``.
    """

    def __init__(self, video_folder, num_frames=5, transform=None):
        if num_frames < 1:
            raise ValueError(f'num_frames must be at least 1, got {num_frames}')

        self.video_folder = video_folder
        self.num_frames = num_frames
        self.transform = transform

        self.file_names = []   # '<video_folder>/<label>/<file>' for every video
        self.labels_dic = {}   # '<label>/<file>' -> integer class id
        self.video_files = []  # '<label>/<file>' for every video
        self.labels = []       # integer class id, aligned with video_files

        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> video mapping (and any seeded split built on it) stable.
        for label in sorted(os.listdir(video_folder)):
            label_dir = os.path.join(video_folder, label)
            if not os.path.isdir(label_dir):
                # Stray files next to the class folders are not classes.
                continue
            class_id = label_to_int[label]
            # List each class directory once instead of once per attribute.
            for f in sorted(os.listdir(label_dir)):
                relative_path = f'{label}/{f}'
                self.video_files.append(relative_path)
                self.labels.append(class_id)
                self.file_names.append(f'{video_folder}/{label}/{f}')
                self.labels_dic[relative_path] = class_id

    def __len__(self):
        """Return the number of videos found under ``video_folder``."""
        return len(self.video_files)

    def __getitem__(self, idx):
        """Return ``(video_tensor, label)`` for the video at position ``idx``.

        Raises:
            RuntimeError: If no frame could be decoded from the video.
        """
        relative_path = self.video_files[idx]
        video_path = os.path.join(self.video_folder, relative_path)
        frames = self.read_frames(video_path)
        if not frames:
            raise RuntimeError(f'No frames could be read from {video_path}')

        # Evenly spaced sampling. The stride is clamped to 1 so videos shorter
        # than num_frames no longer produce a zero slice step; such videos are
        # padded by repeating their last frame so every clip has the same length.
        step = max(1, len(frames) // self.num_frames)
        last_index = len(frames) - 1
        selected_frames = [frames[min(i * step, last_index)] for i in range(self.num_frames)]

        # Apply transformations to each frame if specified; otherwise convert
        # the raw arrays so they can still be stacked into one tensor.
        if self.transform:
            selected_frames = [self.transform(frame) for frame in selected_frames]
        else:
            selected_frames = [torch.as_tensor(frame) for frame in selected_frames]

        # Stack frames along a new leading dimension to create a single tensor
        video_tensor = torch.stack(selected_frames, dim=0)

        label = self.labels_dic[relative_path]
        return video_tensor, label

    def read_frames(self, video_path):
        """Decode every frame of ``video_path`` and return them as RGB arrays.

        OpenCV decodes to BGR; the frames are converted to RGB here because the
        downstream image transforms and pretrained backbones expect RGB.
        An unreadable video yields an empty list.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            # Release the decoder even if decoding raises.
            cap.release()
        return frames


# Root folder holding one sub-directory of clips per class.
# Change video folder to wherever the video is
video_folder = '../../cs/cs152/shared/isaac_skandda_josh/SoccerAct10Dataset'

# Per-frame preprocessing: resize to the 224x224 input the backbones were
# trained on, convert to a float tensor in [0, 1], then normalise with the
# ImageNet channel statistics that torchvision's pretrained weights expect.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Generate dataset of videos for training
dataset = VideoDataset(video_folder, num_frames=5, transform=transform)

In [None]:
# Load the pre-trained resnet50 model
resnet50 = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V2)
# Remove the last fully connected layer of resnet50 so it outputs pooled features
resnet50 = nn.Sequential(*list(resnet50.children())[:-1])

# Load the pre-trained VGG16 model through the same `weights=` API used above
# (`pretrained=True` is deprecated; IMAGENET1K_V1 is the checkpoint it selected).
vgg16 = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)

# VGG16's children are (features, avgpool, classifier), so dropping the last
# child removes the whole classifier and keeps the pooled convolutional features.
vgg16 = nn.Sequential(*list(vgg16.children())[:-1])


class CustomModel(nn.Module):
    """Video classifier: frozen CNN features per frame, dense layers, recurrent head.

    Every frame is passed through one of two frozen backbones, reduced to
    ``reduce_size`` features, passed through a dense layer specific to its time
    step, and the resulting sequence is summarised by a 2-layer GRU or LSTM
    whose final state feeds a linear classifier.

    Args:
        vgg_features: Backbone used when ``vgg_or_res == "VGG"``.
        resnet_features: Backbone used for any other ``vgg_or_res`` value.
        num_classes: Number of output logits.
        num_time_steps: Number of frames consumed per clip.
        reduce_size: Width of the per-frame dense layers.
        gru_or_LSTM: ``"GRU"`` selects the GRU head; anything else the LSTM.
        vgg_or_res: ``"VGG"`` selects ``vgg_features``; anything else
            ``resnet_features``.
        input_shape: ``(channels, height, width)`` of a single frame, used only
            to measure the backbone's flattened output size.

    Note:
        Both backbones have their parameters frozen and are kept permanently in
        eval mode (see ``train``), so layers such as BatchNorm keep their
        pretrained statistics while the head is trained.
    """

    def __init__(self, vgg_features, resnet_features, num_classes, num_time_steps, \
                 reduce_size = 256, gru_or_LSTM = "GRU",vgg_or_res = "VGG",
                 input_shape = (3, 224, 224)):
        super(CustomModel, self).__init__()
        self.num_time_steps = num_time_steps
        self.gru_or_LSTM = gru_or_LSTM
        self.vgg_or_res = vgg_or_res
        self.reduce_size = reduce_size
        self.input_shape = tuple(input_shape)

        self.vgg_features = vgg_features
        self.resnet_features = resnet_features

        # Freeze the backbones before probing them so the probe runs in the
        # same (eval) mode that forward() will use.
        for backbone in (self.vgg_features, self.resnet_features):
            for param in backbone.parameters():
                param.requires_grad = False
            backbone.eval()

        # Only the selected backbone determines the reduce layer's input width.
        if vgg_or_res == "VGG":
            output_size = self._calculate_vgg_output_size()
        else:
            output_size = self._calculate_resnet_output_size()

        self.reduce_layer = nn.Linear(output_size,reduce_size)
        # One dense layer per time step (weights are not shared across steps).
        self.time_distributed_fc = nn.ModuleList([
            nn.Linear(reduce_size, reduce_size) for _ in range(num_time_steps)
        ])

        self.flat = nn.Flatten()
        # Both heads are always created so checkpoints keep the same keys
        # regardless of which one is selected.
        self.gru = nn.GRU(reduce_size, 20, 2, batch_first = True)
        self.lstm = nn.LSTM(reduce_size, 20 , 2, batch_first = True)
        self.dropout = nn.Dropout(0.25)
        self.fc = nn.Linear(20, num_classes)

    def train(self, mode=True):
        """Switch the head between train/eval while keeping the frozen backbones in eval mode."""
        super().train(mode)
        self.vgg_features.eval()
        self.resnet_features.eval()
        return self

    def _probe_output_size(self, features):
        """Return the flattened per-frame feature size produced by ``features``."""
        reference = next(features.parameters(), None)
        device = reference.device if reference is not None else torch.device("cpu")
        # A single dummy frame is enough; no dataset access or gradients needed.
        sample_input = torch.zeros(1, *self.input_shape, device=device)
        with torch.no_grad():
            output = features(sample_input)
        return output.flatten(1).size(1)

    def _calculate_vgg_output_size(self):
        """Flattened feature size of ``vgg_features`` for one frame."""
        return self._probe_output_size(self.vgg_features)

    def _calculate_resnet_output_size(self):
        """Flattened feature size of ``resnet_features`` for one frame."""
        return self._probe_output_size(self.resnet_features)

    def forward(self, x):
        """Classify a batch of clips.

        Args:
            x: Tensor of shape ``(batch, frames, channels, height, width)`` with
                at least ``num_time_steps`` frames; extra frames are ignored.

        Returns:
            Logits of shape ``(batch, num_classes)``.

        Raises:
            ValueError: If the clip has fewer than ``num_time_steps`` frames.
        """
        batch_size, num_frames, channels, height, width = x.size()
        if num_frames < self.num_time_steps:
            raise ValueError(
                f'expected at least {self.num_time_steps} frames per clip, got {num_frames}'
            )

        # Fold the time axis into the batch axis so the 2D backbone sees frames.
        x = x.reshape(-1, channels, height, width)

        # The backbone is frozen, so skip building an autograd graph for it.
        backbone = self.vgg_features if self.vgg_or_res == "VGG" else self.resnet_features
        with torch.no_grad():
            x = backbone(x)

        # Restore (batch, frames, features).
        x = x.reshape(batch_size, num_frames, -1)

        # Reduce each frame's features, then apply that time step's dense layer.
        # Stacking the results keeps the sequence on x's device and sized by
        # reduce_size rather than a hard-coded width.
        x = torch.stack([
            F.relu(self.time_distributed_fc[i](F.relu(self.reduce_layer(x[:, i, :]))))
            for i in range(self.num_time_steps)
        ], dim=1)

        # Apply GRU/LSTM and keep the top layer's state after the last step.
        if self.gru_or_LSTM == "GRU":
            _, hidden = self.gru(x)
            x = hidden[-1]
        else:
            x, _ = self.lstm(x)
            x = x[:, -1, :]

        # Apply dropout layer
        x = self.dropout(x)

        # Apply fully connected layer
        x = self.fc(x)
        return x


# Sizes passed to every model built below: number of output classes and frames consumed per clip.
num_classes, num_time_steps = 10, 5

In [None]:
import random

# Split proportions and the optimiser step size used by the training cell.
train_ratio = 0.8
test_ratio = 1 - train_ratio
learning_rate = 0.001

# Sizes of the two partitions; the test set receives whatever the rounding leaves over.
num_samples = len(dataset)
num_train = int(train_ratio * num_samples)
num_test = num_samples - num_train

# Seeded permutation of all sample indices, cut at the train/test boundary.
random.seed(42)
random_indices = random.sample(range(num_samples), num_samples)
train_indices, test_indices = random_indices[:num_train], random_indices[num_train:]

# Views over the full dataset restricted to each partition.
train_dataset, test_dataset = (
    torch.utils.data.Subset(dataset, split_indices)
    for split_indices in (train_indices, test_indices)
)

# Batched loaders: training batches are reshuffled every epoch, test batches keep their order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Build the network: VGG backbone with a GRU head.
custom_model = CustomModel(
    vgg16,
    resnet50,
    num_classes,
    num_time_steps,
    reduce_size=256,
    gru_or_LSTM="GRU",
    vgg_or_res="VGG",
)

# Cross-entropy over the class logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(custom_model.parameters(), lr=learning_rate)

# Five passes over the training loader.
for epoch in range(5):
    running_loss = 0.0
    for i, (inputs, labels) in tqdm(enumerate(train_loader), total=len(train_loader)):
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass, loss, backward pass, parameter update.
        outputs = custom_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Report the mean loss of every completed group of 10 mini-batches.
        running_loss += loss.item()
        if (i + 1) % 10 == 0:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 10:.3f}')
            running_loss = 0.0

print('Finished Training')

In [None]:
# Base name of the checkpoint file (the '.pth' extension is appended below); change as needed.
model_name = 'model1'
# Persist the trained model's state dict to '<model_name>.pth'.
torch.save(obj=custom_model.state_dict(), f=f'{model_name}.pth')

In [None]:
# Function to create a gif from frames
def create_gif(frames, gif_path):
    """Write ``frames`` to ``gif_path`` as an animation via ``imageio.mimsave``.

    Args:
        frames: Sequence of images passed straight to ``imageio.mimsave``.
        gif_path: Destination path of the written file.
    """
    # duration=0.1 is forwarded to imageio as the per-frame duration setting.
    imageio.mimsave(gif_path, frames, duration=0.1)

# Function to return the predictions and actual label for a given video
def predict_on_video(video_path):
    """Return class probabilities and the true label for one dataset video.

    Args:
        video_path: Full path of the video exactly as stored in
            ``dataset.file_names``.

    Returns:
        ``(probabilities, actual_label)`` where ``probabilities`` maps every
        class name to the model's softmax probability and ``actual_label`` is
        the video's class name.

    Raises:
        ValueError: If ``video_path`` is not one of the dataset's files.
    """
    idx = dataset.file_names.index(video_path)

    # Decode the video once and reuse both the clip and its label.
    clip, actual_label = dataset[idx]

    # Predict in eval mode so dropout is disabled and the output is
    # deterministic, then restore whatever mode the model was in.
    was_training = custom_model.training
    custom_model.eval()
    try:
        with torch.no_grad():
            predictions = custom_model(clip.unsqueeze(0))
            # Convert predictions to probabilities
            probabilities = torch.nn.functional.softmax(predictions, dim=1)
    finally:
        custom_model.train(was_training)

    # Convert tensor to a list of probabilities (moved to CPU first so this
    # also works when the model lives on another device).
    probabilities_list = probabilities.cpu().numpy().tolist()

    # Create a dictionary with class labels and corresponding probabilities
    result = {int_to_label[i]: probability for i, probability in enumerate(probabilities_list[0])}

    return result, int_to_label[actual_label]


# Function to display videos with labels and predictions
def display_video_with_predictions(video_path):
    """Show a video as an inline GIF alongside its true label and predicted probabilities.

    Every frame of ``video_path`` is decoded, converted from BGR to RGB and
    written to ``temp.gif``. The actual label and one probability line per
    class are printed, then the GIF is displayed at a width of 500.
    """
    capture = cv2.VideoCapture(video_path)
    rgb_frames = []

    # Read until the capture stops returning frames.
    ok, bgr_frame = capture.read()
    while ok:
        # OpenCV uses BGR by default; the GIF writer is given RGB.
        rgb_frames.append(cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB))
        ok, bgr_frame = capture.read()
    capture.release()

    # Save frames as GIF
    gif_path = 'temp.gif'
    imageio.mimsave(gif_path, rgb_frames, duration=0.1)

    class_probabilities, actual_label = predict_on_video(video_path)

    print(f'Actual Label: {actual_label}')
    for class_name, probability in class_probabilities.items():
        print(f'Predicted Probability for {class_name}: {probability}')

    display(Image(filename=gif_path,width=500))

# Display sample videos with predictions to validate model
for label in os.listdir(video_folder):
    # Only the first listed file of each class folder is shown.
    first_files = os.listdir(f'{video_folder}/{label}')[:1]
    for file in first_files:
        display_video_with_predictions(f'{video_folder}/{label}/{file}')

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def calculate_classification_metrics(predictions, targets):
    """Compute accuracy and weighted precision/recall/F1 from model outputs.

    Args:
        predictions: Tensor of shape ``(num_samples, num_classes)`` holding
            per-class scores (logits or probabilities).
        targets: Tensor of shape ``(num_samples,)`` with the true class ids.

    Returns:
        ``(accuracy, precision, recall, f1)``; the last three are averaged over
        classes weighted by class support.
    """
    # Convert PyTorch tensors to numpy arrays; .cpu() makes this work for
    # tensors on any device.
    predictions = predictions.detach().cpu().numpy()
    targets = targets.detach().cpu().numpy()

    # The highest-scoring class is the predicted label.
    predicted_labels = np.argmax(predictions, axis=1)

    accuracy = accuracy_score(targets, predicted_labels)

    # zero_division=0 keeps the score sklearn already assigns (0) to classes
    # with no predicted or no true samples, without emitting a warning each time.
    precision = precision_score(targets, predicted_labels, average='weighted', zero_division=0)
    recall = recall_score(targets, predicted_labels, average='weighted', zero_division=0)
    f1 = f1_score(targets, predicted_labels, average='weighted', zero_division=0)

    return accuracy, precision, recall, f1


# Rebuild the network with the SAME architecture flags that were used for
# training (VGG backbone + GRU head). The reduce layer's input width depends on
# the backbone, so a "RES" model cannot load a checkpoint trained with "VGG":
# load_state_dict raises on size mismatches even with strict=False.
model = CustomModel(vgg16, resnet50, num_classes, num_time_steps, gru_or_LSTM = "GRU",vgg_or_res = "VGG")
# Strict loading (the default) surfaces missing or unexpected keys instead of
# silently evaluating a partially initialised model.
model.load_state_dict(torch.load(f'{model_name}.pth', map_location='cpu'))
model.eval()

# Lists to store predictions and targets
all_predictions = []
all_targets = []

# Iterate through the test data loader without tracking gradients
with torch.no_grad():
    for inputs, targets in test_loader:
        outputs = model(inputs)
        all_predictions.append(outputs)
        all_targets.append(targets)

all_predictions = torch.cat(all_predictions)
all_targets = torch.cat(all_targets)

accuracy, precision, recall, f1 = calculate_classification_metrics(all_predictions, all_targets)
print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

In [None]:
num_folds = 5 
# Stratified folds keep the class proportions of the full dataset in every fold.
skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

# Define training parameters
num_epochs = 5
learning_rate = 0.001

# Validation accuracy (in percent) of every fold, for the final summary.
fold_accuracies = []

# Perform cross-validation
for fold, (train_index, val_index) in enumerate(skf.split(range(len(dataset)), dataset.labels)):
    print(f'Fold {fold + 1}/{num_folds}')

    # Split data into training and validation sets for this fold
    train_dataset = torch.utils.data.Subset(dataset, train_index)
    val_dataset = torch.utils.data.Subset(dataset, val_index)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    # Fresh head for every fold. CustomModel takes BOTH backbones as its first
    # two arguments; omitting the second shifts every positional argument.
    # The already-loaded backbones are reused: they are frozen, so sharing them
    # leaks nothing between folds and avoids reloading VGG16 each time.
    custom_model = CustomModel(vgg16, resnet50, num_classes, num_time_steps, \
                               reduce_size=256, gru_or_LSTM = "GRU",vgg_or_res = "VGG")

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(custom_model.parameters(), lr=learning_rate)

    # Training loop
    for epoch in range(num_epochs):
        custom_model.train()  # Set the model to training mode
        running_loss = 0.0

        for videos, labels in train_loader:
            optimizer.zero_grad()

            outputs = custom_model(videos)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Print the mean training loss per batch for this epoch
        print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader)}')

    # Validation loop
    custom_model.eval()  # Set the model to evaluation mode
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for videos, labels in val_loader:
            outputs = custom_model(videos)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            # The predicted class is the index of the largest logit.
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    # Print validation statistics
    print(f'Validation Loss: {val_loss / len(val_loader)}, Accuracy: {100 * correct / total}%')
    fold_accuracies.append(100 * correct / total)

print('Cross-validation finished')
print(f'Mean validation accuracy over {num_folds} folds: {sum(fold_accuracies) / len(fold_accuracies)}%')