In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torchvision import models
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import transforms
import cv2
import os
from  training_utils import *

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [14]:
class RNN_Model(nn.Module):
    def __init__(self, num_classes, hidden_size, num_layers):
        super(RNN_Model, self).__init__()
        # Load the pretrained ResNet-18 model
        self.resnet = models.resnet18(weights = models.ResNet18_Weights.DEFAULT)
        self.resnet = nn.Sequential(*(list(self.resnet.children())[:-1]))
        
        # RNN (LSTM) layer
        self.lstm = nn.LSTM(input_size=512, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)
        
        # Classification layer
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # x shape: [batch, time, channels, height, width]
        batch_size, timesteps, C, H, W = x.size()
        
        # Flatten dimensions for ResNet
        x = x.view(batch_size * timesteps, C, H, W)
        
        # Feature extraction through ResNet
        with torch.no_grad():
            features = self.resnet(x)
        
        # Reshape for LSTM
        features = features.view(batch_size, timesteps, -1)
        
        # Sequence processing through LSTM
        lstm_out, _ = self.lstm(features)
        
        # Classification
        out = self.fc(lstm_out[:, -1, :])
        return out
    
def preprocess_frame(frame, size=(224, 224)):
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize(size),
        transforms.ToTensor(),
    ])
    return transform(frame)

# Function to load and preprocess video
def load_video(video_path, max_frames=16):
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret or len(frames) == max_frames:
                break
            frame = preprocess_frame(frame)
            frames.append(frame)
    finally:
        cap.release()
    
    # Stack frames and add batch dimension
    frames_tensor = torch.stack(frames)
    frames_tensor = frames_tensor.unsqueeze(0)  # Add batch dimension
    return frames_tensor

# Hyperparameters
num_classes = 1 # Define the number of classes
hidden_size = 256 # LSTM hidden size
num_layers = 2 # Number of LSTM layers

# Model instance
model = RNN_Model(num_classes, hidden_size, num_layers)
model.to(device)

video_path = "video_test_dataset/0/miss_4.mp4"
video_tensor = load_video(video_path).to(device)

with torch.no_grad():
    outputs = model(video_tensor)
    prob = torch.sigmoid(outputs).item()
    
print("Probability of miss: {:.2f}%".format(prob * 100))

Probability of miss: 49.46%


In [4]:
class VideoDataset(Dataset):
    def __init__(self, directory, max_frames=16, transform=None):
        self.directory = directory
        self.max_frames = max_frames
        self.transform = transform
        self.classes = ['0', '1']
        self.data = []
        
        # Load data
        for label in self.classes:
            class_dir = os.path.join(directory, label)
            for video in os.listdir(class_dir):
                self.data.append((os.path.join(class_dir, video), int(label)))
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        video_path, label = self.data[idx]
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret or len(frames) == self.max_frames:
                    break
                if self.transform:
                    frame = self.transform(frame)
                frames.append(frame)
        finally:
            cap.release()
        
        frames_tensor = torch.stack(frames)
        return frames_tensor, label

In [9]:
dataset_dir = "cropped_video_dataset"
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

video_dataset = VideoDataset(dataset_dir, transform=transform)

In [15]:
# Define batch size
batch_size = 16

# Split dataset into train and test sets
train_size = int(0.8 * len(video_dataset))
test_size = len(video_dataset) - train_size
train_dataset, test_dataset = random_split(video_dataset, [train_size, test_size])

# Create train loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Create test loader
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

optimizer = optim.Adam(model.parameters(), lr = 0.001)
criterion = nn.BCEWithLogitsLoss()

In [23]:
train_epoch(model = model,
            optimizer = optimizer,
            criterion = criterion,
            train_loader = train_loader,
            device = device,
            )

test(model = model, 
     criterion = criterion,
     valid_loader = test_loader,
     device = device)


(tensor(0.0494, device='cuda:0'), 0.7777777777777778)

In [24]:
num_epochs = 10
model = RNN_Model(num_classes, hidden_size, num_layers)
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for i, (inputs, labels) in enumerate(train_loader):
        # Convert labels to the correct shape
        labels = labels.float().unsqueeze(1)  # Reshape for BCEWithLogitsLoss

        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 10 == 9:
            print(f'Epoch {epoch + 1}, Batch {i + 1}, Loss: {running_loss / 10:.4f}')
            running_loss = 0.0

    # Validation step after each epoch
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            predicted = torch.sigmoid(outputs) >= 0.5  # Apply sigmoid and threshold
            total += labels.size(0)
            correct += (predicted.flatten() == labels).sum().item()

    print(f'Epoch {epoch + 1}, Accuracy: {100 * correct / total}%')

torch.save(model.state_dict(), 'video_classification_model.pth')


Epoch 1, Accuracy: 77.77777777777777%
Epoch 2, Accuracy: 77.77777777777777%
Epoch 3, Accuracy: 66.66666666666667%
Epoch 4, Accuracy: 66.66666666666667%
Epoch 5, Accuracy: 66.66666666666667%
Epoch 6, Accuracy: 66.66666666666667%
Epoch 7, Accuracy: 66.66666666666667%
Epoch 8, Accuracy: 66.66666666666667%
Epoch 9, Accuracy: 66.66666666666667%
Epoch 10, Accuracy: 66.66666666666667%


In [26]:
len(test_loader.dataset)

9

In [28]:
len(video_dataset)

41