In [6]:
import torch
import torch.nn as nn
import sys
import os
import torch.nn.functional as F
sys.path.append(r"C:\Users\jashw\Desktop\Video Surveillance")
from datasets.custom_dataset import get_data_loader  # Custom dataset loader

In [7]:

class Simple3DCNN(nn.Module):
    """Small 3D-convolutional video classifier.

    Three Conv3d(3x3x3, padding=1) -> ReLU -> MaxPool3d stages are followed by
    a four-layer fully connected head that produces ``num_classes`` logits.

    Args:
        num_classes: Number of output logits.
        num_frames: Number of frames (temporal length T) of each input clip.
        input_size: Height and width of the (square) input frames.

    Raises:
        ValueError: If ``num_frames`` < 4 or ``input_size`` < 8, because the
            pooling stages would reduce that axis to zero length.
    """

    def __init__(self, num_classes=8, num_frames=32, input_size=224):
        super(Simple3DCNN, self).__init__()
        if num_frames < 4:
            raise ValueError(f"num_frames must be >= 4, got {num_frames}")
        if input_size < 8:
            raise ValueError(f"input_size must be >= 8, got {input_size}")
        self.num_frames = num_frames
        self.input_size = input_size

        self.conv1 = nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1))
        self.relu1 = nn.ReLU()
        # Spatial-only pooling: the temporal axis is left untouched here.
        self.pool1 = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))

        self.conv2 = nn.Conv3d(32, 64, kernel_size=(3, 3, 3), padding=(1, 1, 1))
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))

        self.conv3 = nn.Conv3d(64, 128, kernel_size=(3, 3, 3), padding=(1, 1, 1))
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2))

        # The convolutions preserve every axis (kernel 3, padding 1). Only two
        # of the three pools halve the temporal axis, so T shrinks by 4, while
        # all three pools halve height and width, so H and W shrink by 8.
        pooled_frames = num_frames // 4
        pooled_side = input_size // 8
        self.fc1 = nn.Linear(128 * pooled_frames * pooled_side * pooled_side, 512)
        self.relu_fc1 = nn.ReLU()
        self.fc2 = nn.Linear(512, 512)
        self.relu_fc2 = nn.ReLU()
        self.fc3 = nn.Linear(512, 512)
        self.relu_fc3 = nn.ReLU()
        self.fc4 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits of shape (B, num_classes).

        Args:
            x: Tensor of shape (B, 3, num_frames, input_size, input_size).
        """
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.pool3(self.relu3(self.conv3(x)))

        x = x.flatten(1)  # (B, 128 * T/4 * H/8 * W/8)
        x = self.relu_fc1(self.fc1(x))
        x = self.relu_fc2(self.fc2(x))
        x = self.relu_fc3(self.fc3(x))
        # Final layer maps to num_classes; no activation, these are raw logits.
        return self.fc4(x)


In [None]:

# -------------------
# Hyperparameters
# -------------------
num_classes = 8
num_epochs = 20
batch_size = 5  # Kept small to lower VRAM usage
learning_rate = 1e-4
accumulation_steps = 4  # Batches whose gradients are accumulated per optimizer step

# Dataset locations
train_dir = r"C:\Users\jashw\Desktop\Video Surveillance\data\train"
val_dir = r"C:\Users\jashw\Desktop\Video Surveillance\data\val"

# Example model for 32-frame clips
model = Simple3DCNN(num_classes=num_classes, num_frames=32)

In [None]:
# -------------------
# Training Function
# -------------------
def train_model(model, train_loader, val_loader, device, class_weights,class_counts):
    """Train ``model`` with class-weighted cross-entropy and gradient accumulation.

    Reads the module-level hyperparameters ``learning_rate``, ``num_epochs`` and
    ``accumulation_steps``. After every epoch the average training loss is
    printed and ``validate_model`` is run on ``val_loader``.

    Args:
        model: Network to train; moved to ``device``.
        train_loader: Sized iterable yielding ``(videos, labels)`` batches.
        val_loader: Iterable passed through to ``validate_model``.
        device: ``torch.device`` or device string to train on.
        class_weights: Per-class weight tensor for ``nn.CrossEntropyLoss``.
        class_counts: Per-class sample counts; only their sum is used, for the
            progress message.
    """
    device = torch.device(device)
    # Mixed precision and gradient scaling are only enabled on CUDA; on any
    # other device the scaler and autocast become transparent no-ops.
    use_amp = device.type == "cuda"

    model = model.to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scaler = torch.amp.GradScaler(device=device.type, enabled=use_amp)

    total_videos = sum(class_counts)
    num_batches = len(train_loader)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        videos_seen = 0
        optimizer.zero_grad()

        for step, (videos, labels) in enumerate(train_loader):
            videos, labels = videos.to(device), labels.to(device)
            # Count videos, not batches, so the figure is comparable with
            # the total derived from class_counts.
            videos_seen += labels.size(0)
            print(f"Processing Video {videos_seen} Remaining {(total_videos - videos_seen)}")
            with torch.amp.autocast(device_type=device.type, enabled=use_amp):
                # Swap axes 1 and 3 of the loader output; intended result is
                # [B, C, T, H, W] — TODO confirm against the dataset's layout.
                videos = videos.permute(0, 3, 2, 1, 4)
                outputs = model(videos)
                # Divide so the accumulated gradient matches one larger batch.
                loss = criterion(outputs, labels) / accumulation_steps

            scaler.scale(loss).backward()

            # Step on every full accumulation group AND on the final batch, so
            # a trailing partial group is applied instead of being discarded by
            # the zero_grad() at the start of the next epoch.
            is_last_batch = (step + 1) == num_batches
            if (step + 1) % accumulation_steps == 0 or is_last_batch:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
                if use_amp:
                    torch.cuda.empty_cache()  # Free unused GPU memory

            # Undo the accumulation scaling so the logged loss is per-batch.
            running_loss += loss.item() * accumulation_steps

        # max(..., 1) avoids ZeroDivisionError when the loader is empty.
        avg_loss = running_loss / max(num_batches, 1)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

        validate_model(model, val_loader, device)

In [None]:
# -------------------
# Validation Function
# -------------------
def validate_model(model, val_loader, device):
    """Print the top-1 accuracy of ``model`` over ``val_loader``.

    The model is switched to eval mode and gradients are disabled. Each batch
    is moved to ``device`` and has its axes 1 and 3 swapped before the forward
    pass. Nothing is returned; the result (or a notice that the loader produced
    no samples) is printed.
    """
    model.eval()
    num_correct, num_seen = 0, 0

    with torch.no_grad():
        for batch_videos, batch_labels in val_loader:
            batch_videos = batch_videos.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(batch_videos.permute(0, 3, 2, 1, 4))
            # Index of the largest logit per sample is the predicted class.
            predictions = logits.max(dim=1).indices
            num_seen += batch_labels.size(0)
            num_correct += (predictions == batch_labels).sum().item()

    # Guard first: with no samples there is no accuracy to compute.
    if num_seen == 0:
        print("No validation samples found. Skipping validation.")
        return

    accuracy = 100 * num_correct / num_seen
    print(f"Validation Accuracy: {accuracy:.2f}%")

In [None]:
# -------------------
# Main Function
# -------------------
def main():
    """Load the model and data, train with balanced class weights, save the result.

    Uses the module-level ``train_dir``, ``val_dir`` and ``batch_size``. Class
    weights follow the "balanced" heuristic
    ``n_samples / (n_classes * class_count)``, where each class count is the
    number of entries in that class's sub-directory of ``train_dir``.

    Raises:
        ValueError: If ``train_dir`` has no class sub-directories, or if any
            class sub-directory is empty (its weight would be infinite).
    """
    # Fall back to CPU so the notebook still runs on machines without a GPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    pretrained_path = "data/trained_models/Kinetics-400 models/TimeSformer_divST_96x4_224_K400.pyth"
    # NOTE(review): load_model is not defined in the visible cells; presumably
    # it is provided elsewhere — verify before running on a fresh kernel.
    model = load_model(pretrained_path)

    train_loader = get_data_loader(train_dir, batch_size=batch_size, clip_len=32, shuffle=True)  # Reduce clip_len
    val_loader = get_data_loader(val_dir, batch_size=batch_size, clip_len=32, shuffle=False)

    # Compute class weights.
    # Only directories count as classes, and they are sorted so the weight
    # order is deterministic. NOTE(review): this assumes the data loader assigns
    # label indices in sorted folder order — confirm against the dataset.
    class_names = sorted(
        entry for entry in os.listdir(train_dir)
        if os.path.isdir(os.path.join(train_dir, entry))
    )
    if not class_names:
        raise ValueError(f"No class directories found in {train_dir}")

    class_counts = [len(os.listdir(os.path.join(train_dir, name))) for name in class_names]
    empty_classes = [name for name, count in zip(class_names, class_counts) if count == 0]
    if empty_classes:
        raise ValueError(f"Class directories without samples: {empty_classes}")

    # Balanced weights: n_samples / (n_classes * count). Computed in float64
    # and then cast to float32 for the loss.
    counts = torch.tensor(class_counts, dtype=torch.double)
    class_weights = (counts.sum() / (len(class_counts) * counts)).to(device=device, dtype=torch.float)

    # Train model
    train_model(model, train_loader, val_loader, device, class_weights,class_counts)

    # Save fine-tuned model
    model_path = "data/trained_models/timesformer_finetuned.pth"
    # torch.save does not create missing parent directories.
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    torch.save(model.state_dict(), model_path)
    print(f"Fine-tuned model saved to {model_path}")