Training file to detect pass or run

In [3]:
# !pip install torch
# !pip install numpy
# !pip install opencv-python
# !pip install torchvision
# !pip install scikit-learn
# !pip install transformers torch

In [4]:
# Import neccessary libraries
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset
import numpy as np
import cv2


In [5]:
def extract_raw_frames(video_path):
    """
    Reads a video and returns a list of raw RGB frames as NumPy arrays.
    """
    cap = cv2.VideoCapture(video_path)
    frames_list = []
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # Convert BGR (OpenCV) to RGB (NumPy/HuggingFace expectation)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frames_list.append(rgb_frame)
        
    cap.release()
    
    # Hugging Face models often expect a list of frames, not a stacked tensor, at this stage
    return frames_list


In [6]:
# The custom PyTorch Dataset class
class VideoDataset(Dataset):
    def __init__(self, video_paths, labels, transforms):
        self.video_paths = video_paths
        self.labels = labels
        self.processor = transforms

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        label = self.labels[idx]
        
        # 1. Get raw frames (using the helper function)
        raw_frames = extract_raw_frames(video_path)
        
        if not raw_frames:
             # Handle a broken/empty video file (Crucial to prevent crashes!)
             # In a real project, you'd likely filter these out pre-training.
             return None, None 

        # 2. Use the TimesformerProcessor to transform the frames
        # The 'padding' and 'return_tensors' arguments are essential.
        encoding = self.processor(
            images=raw_frames, 
            return_tensors="pt"
        )
        
        # The processor returns a dictionary. We extract the pixel values tensor.
        # We also squeeze the tensor to remove the batch dimension (which is 1 here).
        pixel_values = encoding['pixel_values'] 

        # 3. Return the processed tensor and the label
        # The 'pixel_values' tensor is now in the required TimeSformer shape: 
        # (channels, num_frames, height, width)
        return pixel_values, label



In [9]:
def custom_video_collate_fn(batch):
    # 1. Filter out corrupted/empty videos
    batch = [item for item in batch if item[0] is not None]
    if not batch:
        return None, None

    # 2. Separate and Prepare
    pixel_values_list = [item[0].squeeze(0).clone() for item in batch]
    labels = torch.LongTensor([item[1] for item in batch])
    
    # 3. Permute and Flatten (Preparation for Padding)
    # Target shape for pad_sequence is (Frames, Features)
    
    # a. Permute: (C, F, H, W) -> (F, C, H, W)
    permuted_tensors = [t.permute(1, 0, 2, 3) for t in pixel_values_list]

    # b. Flatten: (F, C, H, W) -> (F, C*H*W)
    # The new shape is (Frames, 3 * 224 * 224)
    flattened_tensors = [t.flatten(1) for t in permuted_tensors]

    # 4. Padding (Output shape: Batch, Max_F, C*H*W)
    padded_sequences = torch.nn.utils.rnn.pad_sequence(
        flattened_tensors, 
        batch_first=True, 
        padding_value=0.0
    )

    # --- 5. RESHAPE FIX ---
    # Retrieve current dimensions after padding
    batch_size, max_frames, _ = padded_sequences.shape 
    
    # Unflatten the C*H*W back into (C, H, W)
    # Shape: (Batch, Max_F, C, H, W)
    final_inputs = padded_sequences.view(
        batch_size, 
        max_frames, 
        3, 224, 224 # Fixed Dimensions: Channels, Height, Width
    )

    # 6. Final Permute for TimeSformer Input
    # (B, F, C, H, W) -> (B, C, F, H, W)
    final_inputs = final_inputs.permute(0, 2, 1, 3, 4) 
    
    return final_inputs, labels

In [18]:
## 1. Setup: Define Hyperparameters and Device
learning_rate = 0.001
num_epochs = 10
batch_size = 2
frame_height = 224
frame_width = 224
num_channels = 3 # For RGB images

# Use GPU if available, otherwise use CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Create the Dataset and DataLoader
game = 'USCOffsensevClemsonPlays'
# Get every file from the drive folder using os library
import os
video_paths = []
file_names = []
for root, dirs, files in os.walk(game):
    for file in files:
        if file.endswith('.mp4'):
            video_paths.append(os.path.join(root, file))
            file_names.append(file)
# Split the filename by - and take the second part to get the label
labels = [
    int(f.split('-')[1][0]) for f in file_names if f.endswith('.mp4')
]    # A list of corresponding labels (0 (run) or 1 (pass))


from transformers import AutoModelForVideoClassification, AutoProcessor


processor = AutoProcessor.from_pretrained("facebook/timesformer-base-finetuned-k400")

dataset = VideoDataset(video_paths, labels, transforms=processor)

total_size = len(dataset)

# Split the data (e.g., 80% train, 20% test)
train_size = int(0.8 * total_size)
test_size = total_size - train_size
from torch.utils.data import random_split
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])


from torch.utils.data import DataLoader # provides TimeSformer (e.g., HuggingFace, or a custom im
training_dataloader = DataLoader(
    train_dataset, 
    batch_size=batch_size, 
    shuffle=True,
    pin_memory=True,
    num_workers=1,
    collate_fn=custom_video_collate_fn
    )


# --- 2. Load the Model and Adjust the Final Layer ---
model_name = "facebook/timesformer-base-finetuned-k400"
num_labels = 2 # Your two classes: 'run' and 'pass'

# Use AutoModelForVideoClassification to load the TimeSformer architecture
model = AutoModelForVideoClassification.from_pretrained(
    model_name, 
    num_labels=num_labels, # This correctly sets the output layer size to 2
    ignore_mismatched_sizes=True
).to(device)

for param in model.timesformer.parameters(): 
    param.requires_grad = False  # Freeze the main Transformer body

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    filter(lambda p: p.requires_grad, model.parameters()), 
    lr=learning_rate)

# Training Loop
print("Starting training...")
for epoch in range(num_epochs):
    model.train()  # Set th

    # 3. Reshape and Permute to Final 5D Tensor
    running_loss = 0.0
    
    for i, (inputs, labels) in enumerate(training_dataloader):
        # Move data to the device
        inputs = inputs.permute(0, 2, 1, 3, 4).to(device)
        labels = labels.to(device)
        if inputs.dim() == 4:
            inputs = inputs.unsqueeze(0)
        # The DataLoader may return None if a video is not loaded correctly
        if inputs is None:
            print(f"Skipping batch {i} due to loading error.")
            continue

        optimizer.zero_grad()
        outputs = model(pixel_values=inputs, labels=labels)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()

        del inputs, labels, outputs, loss  # Free up memory
        torch.cuda.empty_cache()  # Clear the GPU cache if using GPU
        
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(training_dataloader):.4f}")

print("Training finished!")

Some weights of TimesformerForVideoClassification were not initialized from the model checkpoint at facebook/timesformer-base-finetuned-k400 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([400, 768]) in the checkpoint and torch.Size([2, 768]) in the model instantiated
- classifier.bias: found shape torch.Size([400]) in the checkpoint and torch.Size([2]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting training...


RuntimeError: Caught RuntimeError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/home/jrsteel/.venv/lib64/python3.11/site-packages/torch/utils/data/_utils/worker.py", line 349, in _worker_loop
    data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/jrsteel/.venv/lib64/python3.11/site-packages/torch/utils/data/_utils/fetch.py", line 55, in fetch
    return self.collate_fn(data)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/local_scratch/slurm.6730891/ipykernel_3735693/2886333734.py", line 32, in custom_video_collate_fn
    final_inputs = padded_sequences.view(
                   ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: shape '[2, 3, 3, 224, 224]' is invalid for input of size 108079104


In [None]:
# Save the model state for testing use
torch.save(model.state_dict(), 'video_classifier_model_attempt_2.pth')

In [None]:
# Create the test DataLoader
test_dataloader = DataLoader(
    test_dataset, 
    batch_size=batch_size, # Use the same batch size or even larger for testing
    shuffle=False,         # Crucial: DO NOT shuffle test data
    num_workers=1,
    pin_memory=True,
    collate_fn=video_collate_fn # Use the custom collate function
)


import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# Switch the model to evaluation mode
# This disables layers like Dropout and sets Batch Norm to use population statistics
model.eval() 

# Lists to store true labels and predicted labels
all_labels = []
all_predictions = []
test_loss_total = 0.0

print("Starting evaluation on test set...")

# Turn off gradient calculations for memory and speed
with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_dataloader):
        # Move data to the device
        inputs = inputs.to(device)
        labels = labels.to(device)

        if inputs is None:
            continue
            
        # 1. Forward Pass
        outputs = model(inputs)
        loss = criterion(outputs, labels) # Calculate loss for reporting
        test_loss_total += loss.item()

        # 2. Get Predicted Class
        # torch.max(outputs, 1) returns (values, indices)
        # The indices are the predicted class (0 for run, 1 for pass)
        _, predicted = torch.max(outputs.data, 1)
        
        # 3. Store Results
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Calculate the final average loss
avg_test_loss = test_loss_total / len(test_dataloader)
print(f"Test Loss: {avg_test_loss:.4f}")

Starting evaluation on test set...


ValueError: too many values to unpack (expected 5)

In [None]:
# Assuming you ran the evaluation loop above and have all_labels, all_predictions

# Calculate Accuracy
test_accuracy = accuracy_score(all_labels, all_predictions)

# Calculate Precision, Recall, F1-score
# 'average=binary' is appropriate for a two-class problem where '1' is the positive class (Pass)
precision, recall, f1_score, _ = precision_recall_fscore_support(
    all_labels, 
    all_predictions, 
    average='binary', 
    pos_label=1  # Assuming '1' (Pass) is your positive class
)

# Print results
print("\n--- Model Performance Metrics ---")
print(f"Overall Accuracy: {test_accuracy:.4f}")
print(f"Precision (Class 1 'Pass'): {precision:.4f}")
print(f"Recall (Class 1 'Pass'): {recall:.4f}")
print(f"F1-Score (Class 1 'Pass'): {f1_score:.4f}")


--- Model Performance Metrics ---
Overall Accuracy: 0.3333
Precision (Class 1 'Pass'): 0.4286
Recall (Class 1 'Pass'): 0.6000
F1-Score (Class 1 'Pass'): 0.5000
