In [17]:
# Run once (prefer %pip so the packages are installed into this kernel's environment)
# %pip install torch torchvision pytorchvideo opencv-python scikit-learn pandas tqdm

## Imports

In [18]:
import torch
# Initialize model
model = torch.hub.load('facebookresearch/pytorchvideo', 'slowfast_r50', pretrained=True) #else some imports won't work for some reason
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
import os
from torchvision.transforms import Compose, Resize, Normalize
from tqdm import tqdm

from typing import Dict
import json
import urllib
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from pytorchvideo.data.encoded_video import EncodedVideo
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    ShortSideScale,
    UniformTemporalSubsample,
    UniformCropVideo
)

Using cache found in C:\Users\kaiav/.cache\torch\hub\facebookresearch_pytorchvideo_main


## CSV/DF

In [19]:
# Load the label sheet, sanity-check the clip names, drop void clips, attach clip paths
csv_filename = 'labels.csv'
df = pd.read_csv(csv_filename)

# Report whether any clip name occurs more than once.
has_duplicates = df['clip_name'].duplicated().any()
print("There are duplicate clip names." if has_duplicates else "No duplicate clip names found.")

# Report (and list) any rows whose filename lacks the expected 'clip_' prefix.
has_clip_prefix = df['clip_name'].str.startswith("clip_")
if has_clip_prefix.all():
    print("All filenames start with 'clip_'.")
else:
    print("Some filenames do not start with 'clip_':")
    print(df[~has_clip_prefix])
print()

# Normalise labels to stripped, lower-case strings, then discard the rows
# labelled '2' (invalid/void pass/dribble scenarios); the remaining labels
# are expected to be '0' or '1'.
df['label'] = df['label'].astype(str).str.strip().str.lower()
void_rows = df.loc[df['label'] == '2'].index
df = df.drop(index=void_rows)

# Add a column with each clip's path inside the 'raw_clips' directory.
df['clip_path'] = df['clip_name'].map(lambda name: os.path.join('raw_clips', name))
print( df.head() )

No duplicate clip names found.
All filenames start with 'clip_'.

       clip_name label                clip_path
1  clip_0001.mp4     0  raw_clips\clip_0001.mp4
2  clip_0002.mp4     1  raw_clips\clip_0002.mp4
4  clip_0004.mp4     1  raw_clips\clip_0004.mp4
5  clip_0005.mp4     1  raw_clips\clip_0005.mp4
8  clip_0008.mp4     0  raw_clips\clip_0008.mp4


## Input Transform for SlowFast

In [20]:
# Preprocessing constants, taken from the PyTorchVideo SlowFast hub tutorial:
# https://colab.research.google.com/github/pytorch/pytorch.github.io/blob/master/assets/hub/facebookresearch_pytorchvideo_slowfast.ipynb
num_frames = 32          # frame count passed to UniformTemporalSubsample
slowfast_alpha = 4       # PackPathway keeps (frame count // slowfast_alpha) frames for the slow pathway
side_size = 256          # size passed to ShortSideScale
crop_size = 256          # size passed to CenterCropVideo
mean = [0.45] * 3        # NormalizeVideo mean (three equal entries)
std = [0.225] * 3        # NormalizeVideo std (three equal entries)

class PackPathway(torch.nn.Module):
    """
    Split a clip tensor into the two-element list [slow_pathway, fast_pathway].

    The fast pathway is the input tensor itself. The slow pathway keeps
    `frames.shape[1] // slowfast_alpha` evenly spaced slices along dimension 1
    (the temporal axis, assuming the clip is laid out as (C, T, H, W) --
    TODO confirm against the upstream transforms). `slowfast_alpha` is read
    from the notebook-level constant at call time.
    """
    def __init__(self):
        super().__init__()

    def forward(self, frames: torch.Tensor):
        total_frames = frames.shape[1]
        slow_count = total_frames // slowfast_alpha
        # Evenly spaced positions from the first to the last frame, truncated to ints.
        slow_indices = torch.linspace(0, total_frames - 1, slow_count).long()
        slow_pathway = frames.index_select(1, slow_indices)
        return [slow_pathway, frames]

# Preprocessing applied to the "video" entry of a clip dict, in this order:
#   1. uniformly subsample to `num_frames` frames
#   2. divide values by 255
#   3. normalise with `mean` / `std`
#   4. ShortSideScale to `side_size`
#   5. centre-crop to `crop_size`
#   6. pack into the [slow, fast] pathway list via PackPathway
_video_pipeline = Compose([
    UniformTemporalSubsample(num_frames),
    Lambda(lambda clip: clip / 255.0),
    NormalizeVideo(mean, std),
    ShortSideScale(size=side_size),
    CenterCropVideo(crop_size),
    PackPathway(),
])
transform = ApplyTransformToKey(key="video", transform=_video_pipeline)

## VideoDataSet 

In [21]:
# Create custom DataSet object
class VideoDataset(Dataset):
    """
    Map-style dataset that decodes one clip per DataFrame row and returns
    `(frame_pathways, label)`.

    NOTE: `__getitem__` moves its outputs to the notebook-level `device`
    variable (assigned in the Model Setup cell), so `device` must exist before
    the first item is fetched.
    """
    def __init__(self, data_frame, transform=transform):
        """
        Args:
            data_frame (pd.DataFrame): DataFrame with columns ['clip_name', 'label', 'clip_path']
            transform (callable, optional): Transform applied to the clip dict returned by
                `EncodedVideo.get_clip`. Defaults to the notebook-level `transform`;
                pass None to skip transformation.
        """
        self.data_frame = data_frame
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        """
        Args:
            idx (int): Positional row index into `data_frame`.
        Returns: frame_pathways and label for given clip
        """
        # Single positional row lookup for both the clip path and the label
        row = self.data_frame.iloc[idx]
        clip_path = row['clip_path']
        label = torch.tensor( int(row['label']) )

        # Initialize an EncodedVideo helper class and load the video
        video = EncodedVideo.from_path(clip_path)
        video_data = video.get_clip(0, 1) # 0 to 1 sec

        # Apply the transform given to THIS dataset (previously the global
        # `transform` was called here, silently ignoring the constructor argument)
        if self.transform is not None:
            video_data = self.transform(video_data)

        # Move the frame_pathways and label to the desired device
        clip = video_data["video"]
        if isinstance(clip, torch.Tensor):
            # No pathway packing happened (e.g. transform=None): keep the single tensor
            frame_pathways = clip.to(device)
        else:
            frame_pathways = [pathway.to(device) for pathway in clip]
        label = label.to(device)

        return frame_pathways, label

## DataLoaders

In [22]:
# Partition the labelled clips and wrap each partition in a Dataset + DataLoader

# 70% train; the remaining 30% is halved into 15% test and 15% validation
train_df, temp_df = train_test_split(df, random_state=42, test_size=0.3)
test_df, val_df = train_test_split(temp_df, random_state=42, test_size=0.5)

# One VideoDataset per partition, all sharing the same preprocessing transform
train_dataset, test_dataset, val_dataset = (
    VideoDataset(split_df, transform=transform)
    for split_df in (train_df, test_df, val_df)
)

# Only the training loader shuffles; test/validation keep a fixed order
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader, val_loader = (
    DataLoader(eval_dataset, batch_size=8, shuffle=False)
    for eval_dataset in (test_dataset, val_dataset)
)

## Model Setup

In [23]:
# `model` is the pretrained slowfast_r50 loaded in the Imports cell.
device = "cpu"

# New linear head taking the backbone's 400-dimensional output down to 2 classes.
new_fc = nn.Linear(400, 2)

# Final classifier for pass vs dribble clips: backbone followed by the new head.
new_model = nn.Sequential(model, new_fc).to(device)

# Turns the raw logits produced by new_model into per-class probabilities.
softmax = nn.Softmax(dim=1)

## Single Batch Inference

In [24]:
# Get a batch of clip and label pairs
test_iter = iter(test_loader)
# frame_pathways is a list of 2 tensors ([slow, fast] as packed by PackPathway),
# each holding that pathway's frames for every clip in the batch
frame_pathways, labels = next(test_iter)
print(f"Batch size: {frame_pathways[0].shape[0]}\nClip frames shape: {frame_pathways[0].shape}\nLabels: {labels}")

# Get predictions
new_model.eval()
# Inference only: skip autograd bookkeeping so no graph is kept for the whole
# (not yet frozen) backbone
with torch.no_grad():
    preds = new_model(frame_pathways.copy())  # pass a shallow copy of the pathway list
    preds = softmax(preds)
preds = torch.argmax(preds, dim=1)

print(f"Predicted labels: {preds}")

Batch size: 8
Clip frames shape: torch.Size([8, 3, 8, 256, 256])
Labels: tensor([0, 1, 0, 0, 1, 1, 1, 0])
Predicted labels: tensor([1, 0, 0, 0, 0, 0, 0, 0])


## Baseline performance

In [25]:
def test_accuracy():
    new_model.eval()
    total = 0
    correct = 0
    with torch.no_grad():
        for frame_pathways, labels in tqdm(test_loader, desc="Testing", unit="batch"):
            # Get model predictions
            outputs = new_model(frame_pathways)
            
            # Apply softmax if not part of the model's last layer
            probabilities = softmax(outputs)
            
            # Get predicted class (highest probability)
            predicted_classes = torch.argmax(probabilities, dim=1)
            
            # Update the total and correct counts
            total += labels.size(0)  # Total samples in this batch
            correct += (predicted_classes == labels).sum().item()  # Correct predictions
    
    # Calculate accuracy
    accuracy = correct / total
    print(f"Accuracy: {accuracy * 100:.2f}%")

In [26]:
# Baseline: evaluate on test_loader before the fine-tuning cell has run, i.e. with the new 2-class head still untrained
test_accuracy()

Testing: 100%|█████████████████████████████████████████████████████████████████████████████████| 22/22 [10:13<00:00, 27.90s/batch]

Accuracy: 54.91%





## Fine-tuning

In [None]:
# Freeze the pretrained backbone; only the new 2-class head (new_fc) is trained
for param in model.parameters():
    param.requires_grad = False
for param in new_fc.parameters():
    param.requires_grad = True
# Set up loss function, optimizer, etc
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(new_fc.parameters(), lr=0.001)  # optimise the head only
num_epochs = 1

for epoch in range(num_epochs):
    new_model.train()
    # requires_grad=False only stops weight updates. In train mode any BatchNorm
    # layers in the backbone would still update their running statistics and any
    # dropout would still fire, so keep the frozen backbone in eval mode.
    model.eval()
    running_loss = 0.0

    for batch_idx, (frame_pathways, labels) in enumerate(train_loader):
        # Forward pass
        outputs = new_model(frame_pathways)
        loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if (batch_idx + 1) % 10 == 0:  # Print every 10 batches
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    # Print average loss for the epoch (guard against an empty loader)
    avg_loss = running_loss / max(len(train_loader), 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_loss:.4f}")

print("Training complete!")

## Fine-tuned performance

In [None]:
# Re-run the same test-set evaluation after the fine-tuning cell, for comparison with the baseline accuracy
test_accuracy()

In [None]:
# Plan
# 1) fine-tune model a little
# 2) test baseline performance
# 3) finish fine-tuning with entire dataset
# 4) evaluate fine-tuned model performance (with the test set)