<a href="https://colab.research.google.com/github/ucaokylong/Some_small_projects/blob/main/Video_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%mkdir dataset
%cd dataset
!gdown 1N93rb_uFqKRZ9naX8CXShFt5RJHOmjZH
!unzip -q rwf-2000.zip

/content/dataset
Downloading...
From (original): https://drive.google.com/uc?id=1N93rb_uFqKRZ9naX8CXShFt5RJHOmjZH
From (redirected): https://drive.google.com/uc?id=1N93rb_uFqKRZ9naX8CXShFt5RJHOmjZH&confirm=t&uuid=a63df0f3-116d-4a14-a4d9-a7e9b6ffb8c7
To: /content/dataset/rwf-2000.zip
100% 1.25G/1.25G [00:20<00:00, 61.3MB/s]


In [9]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset

class VideoDataset(Dataset):
    def __init__(self, root_dir, phase="train", transform=None, n_frames=None):
        """
        Args:
            root_dir (string): Directory with all the videos (each video as a subdirectory of frames).
            transform (callable, optional): Optional transform to be applied on a sample.
            n_frames (int, optional): Number of frames to sample from each video, uniformly. If None, use all frames.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.n_frames = n_frames
        self.phase = phase
        self.videos, self.labels = self._load_videos()

    def _load_videos(self):
        videos, labels = [], []
        class_id = 0

        video_folders = os.listdir(os.path.join(self.root_dir, self.phase))

        for folder in video_folders:
            video_paths = os.listdir(os.path.join(self.root_dir, self.phase, folder))

            for video_path in video_paths:
                video_folder = os.path.join(self.root_dir, self.phase, folder, video_path)
                frames = sorted(
                    (os.path.join(video_folder, f) for f in os.listdir(video_folder)),
                    key=lambda f: int("".join(filter(str.isdigit, os.path.basename(f)))),
                )

                if self.n_frames:
                    frames = self._uniform_sample(frames, self.n_frames)

                videos.append(frames)
                labels.append(class_id)

            class_id += 1

        return videos, labels

    def _uniform_sample(self, frames, n_frames):
        """
        Helper method to uniformly sample n_frames from the frames list.
        """
        stride = max(1, len(frames) // n_frames)
        sampled = [frames[i] for i in range(0, len(frames), stride)]
        return sampled[:n_frames]

    def __len__(self):
        return len(self.videos)

    def __getitem__(self, idx):
        video_frames = self.videos[idx]
        label = self.labels[idx]
        images = []
        for frame_path in video_frames:
            image = Image.open(frame_path).convert("RGB")
            if self.transform:
                image = self.transform(image)
            images.append(image)

        # Stack images along new dimension (sequence length)
        data = torch.stack(images, dim=0)

        # Rearrange to have the shape (C, T, H, W)
        data = data.permute(1, 0, 2, 3)
        return data, label


In [7]:
!pip install video_dataset

[31mERROR: Could not find a version that satisfies the requirement video_dataset (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for video_dataset[0m[31m
[0m

In [10]:
import os
from torchvision import transforms
from torch.utils.data import DataLoader
# from video_dataset import VideoDataset  # Ensure this matches the file name where VideoDataset is defined

# Constants
BATCH_SIZE = 16
MAX_LEN = 15
IMAGE_SIZE = 224

# Define transformations
transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
])

# Load dataset
train_dataset = VideoDataset(
    root_dir="/content/dataset/rwf-2000", phase="train", transform=transform, n_frames=MAX_LEN
)

val_dataset = VideoDataset(
    root_dir="/content/dataset/rwf-2000", phase="val", transform=transform, n_frames=MAX_LEN
)

# Count number of CPUs
cpus = os.cpu_count()
print(f"Number of CPUs: {cpus}")

# Create data loaders
train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, num_workers=cpus, shuffle=True
)
val_loader = DataLoader(
    val_dataset, batch_size=BATCH_SIZE, num_workers=cpus, shuffle=False
)


Number of CPUs: 2


In [11]:
#SINGLE FRAME
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet18

class Model(nn.Module):
    def __init__(self, num_classes=2):
        super(Model, self).__init__()
        self.resnet = resnet18(pretrained=True)
        self.resnet.fc = nn.Sequential(nn.Linear(self.resnet.fc.in_features, 512))
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x_3d):
        # (bs, C, T, H, W) -> (bs, T, C, H, W)
        x_3d = x_3d.permute(0, 2, 1, 3, 4)

        logits = []
        for t in range(x_3d.size(1)):
            out = self.resnet(x_3d[:, t, :, :, :])

            x = self.fc1(out)
            x = F.relu(x)
            x = self.fc2(x)

            logits.append(x)

        # mean pooling
        logits = torch.stack(logits, dim=0)
        logits = torch.mean(logits, dim=0)
        return logits


In [12]:
#LATE FUSION
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet18

class Model(nn.Module):
    def __init__(self, num_classes=2):
        super(Model, self).__init__()
        self.resnet = resnet18(pretrained=True)
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512)
        )
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x_3d):
        # (bs, C, T, H, W) -> (bs, T, C, H, W)
        x_3d = x_3d.permute(0, 2, 1, 3, 4)

        features = []
        for t in range(x_3d.size(1)):
            out = self.resnet(x_3d[:, t, :, :, :])
            features.append(out)

        # average pooling
        out = torch.mean(torch.stack(features), dim=0)

        x = self.fc1(out)
        x = F.relu(x)
        x = self.fc2(x)
        return x


In [4]:
#EARLY FUSION
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet18

class Model(nn.Module):
    def __init__(self, num_classes=2, num_input_channel=48):
        super(Model, self).__init__()
        self.resnet = resnet18(pretrained=True)

        # Modify the first convolutional layer to accommodate the custom number of input channels
        self.resnet.conv1 = nn.Conv2d(
            num_input_channel, 64, kernel_size=7, stride=2, padding=3, bias=False
        )

        # Replace the fully connected layer with a custom one
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512)
        )

        # Additional fully connected layers
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x_3d):
        # Input shape: (batch_size, channels, time, height, width)
        # Permute to: (batch_size, time, channels, height, width)
        x_3d = x_3d.permute(0, 2, 1, 3, 4).contiguous()

        # Reshape to: (batch_size, time * channels, height, width)
        x_3d = x_3d.view(
            x_3d.size(0), x_3d.size(1) * x_3d.size(2), x_3d.size(3), x_3d.size(4)
        )

        # Pass through the modified ResNet18 model
        out = self.resnet(x_3d)

        # Fully connected layers
        x = self.fc1(out)
        x = F.relu(x)
        x = self.fc2(x)

        return x


In [None]:
#LSTM-CNN
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet18

class Model(nn.Module):
    def __init__(self, num_classes=2):
        super(Model, self).__init__()
        self.resnet = resnet18(pretrained=True)

        # Replace the final fully connected layer of ResNet18
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512)
        )

        # Define an LSTM layer
        self.lstm = nn.LSTM(input_size=512, hidden_size=389, num_layers=3)

        # Define additional fully connected layers
        self.fc1 = nn.Linear(389, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x_3d):
        # Input shape: (batch_size, channels, time, height, width)
        # Permute to: (batch_size, time, channels, height, width)
        x_3d = x_3d.permute(0, 2, 1, 3, 4)

        hidden = None
        for t in range(x_3d.size(1)):
            # Pass each frame through the ResNet model
            x = self.resnet(x_3d[:, t, :, :, :])

            # Pass the output through the LSTM
            out, hidden = self.lstm(x.unsqueeze(0), hidden)

        # Take the last output of the LSTM
        x = self.fc1(out[-1, :, :])
        x = F.relu(x)
        x = self.fc2(x)

        return x


In [None]:
#VISION TRANSFORMER
import torch.nn as nn
from transformers import VivitConfig, VivitForVideoClassification

class Model(nn.Module):
    def __init__(self, num_classes=2, image_size=224, num_frames=15):
        super(Model, self).__init__()

        # Initialize Vivit configuration
        cfg = VivitConfig()
        cfg.num_classes = num_classes
        cfg.image_size = image_size
        cfg.num_frames = num_frames

        # Load pre-trained Vivit model for video classification
        self.vivit = VivitForVideoClassification.from_pretrained(
            "google/vivit-b-16x2-kinetics400",
            config=cfg,
            ignore_mismatched_sizes=True
        )

    def forward(self, x_3d):
        # Input shape: (batch_size, channels, time, height, width)
        # Permute to: (batch_size, time, channels, height, width)
        x_3d = x_3d.permute(0, 2, 1, 3, 4)

        # Pass the permuted tensor through the Vivit model
        out = self.vivit(x_3d)

        # Return the logits from the Vivit model's output
        return out.logits
