In [2]:
import gc
import os
from typing import List

import cv2
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler

In [3]:
if torch.cuda.is_available():
    print(f"Number of GPU devices: {torch.cuda.device_count()}")
    for i in range(torch.cuda.device_count()):

        gpu_props = torch.cuda.get_device_properties(i)
        print(f"\nGPU properties:")
        print(f"  Name: {gpu_props.name}")
        print(f"  Multi Processor Count: {gpu_props.multi_processor_count}")
        print(f"  Total memory: {gpu_props.total_memory / 1024**2:.0f} MB")
        print(f"  Memory allocated: {torch.cuda.memory_allocated(i) / 1024**2:.2f} MB")
        print(f"  Memory cached: {torch.cuda.memory_reserved(i) / 1024**2:.2f} MB")

Number of GPU devices: 2

GPU properties:
  Name: NVIDIA GeForce RTX 3060 Ti
  Multi Processor Count: 38
  Total memory: 7870 MB
  Memory allocated: 0.00 MB
  Memory cached: 0.00 MB

GPU properties:
  Name: NVIDIA GeForce GTX 1080 Ti
  Multi Processor Count: 28
  Total memory: 11165 MB
  Memory allocated: 0.00 MB
  Memory cached: 0.00 MB


In [5]:
with open("./data/data.pkl", "rb") as f:
    X_data, Y_data = pk.load(f)
X_data.shape, Y_data.shape

((20584,), (20584,))

# Pre-processing X data

meme
- gif: visual (5 frames) / audio (Null) / text (fr visual)
- mp4: visual (5 frames) / audio (Null) / text (fr visual)

In [6]:
# CONSTANTS
NUM_FRAMES = 5
IMAGE_SIZE = (512, 512)

In [None]:
# Helper functions
def input_processor(video_path: str) -> np.ndarray:
    frames = []
    cap = cv2.VideoCapture(video_path)

    # Get total number of frames
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if total_frames <= 0:
        raise ValueError("Could not read frames from video file")

    # Calculate frame indices to extract
    frame_indices = [
        i * (total_frames - 1) // (NUM_FRAMES - 1) for i in range(NUM_FRAMES)
    ]

    for frame_idx in frame_indices:
        # Set frame position
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)

        # Read frame
        ret, frame = cap.read()
        if ret:
            # Convert BGR to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb = cv2.resize(frame_rgb, (512, 512))

            frames.append(frame_rgb)

    # Release video capture object
    cap.release()

    while len(frames) < 5:
        frames.append(frames[-1])

    video_data = np.stack(frames) / 255.0
    return video_data.astype(np.float32)


def train_epoch(model, dataloader, criterion, optimizer, losses, accuracies):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for batch_idx, (videos, labels) in enumerate(dataloader):
        videos, labels = videos.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(videos)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Calculate accuracy
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        # Print progress
        if batch_idx % 10 == 0:
            loss = loss.item()
            acc = 100.0 * correct / total
            print(
                f"Batch: {batch_idx}/{len(dataloader)}, "
                f"Loss: {loss:.4f}, "
                f"Acc: {acc:.2f}%"
            )
            losses.append(loss)
            accuracies.append(acc)

In [None]:
def input_processor(video_path: str) -> np.ndarray:
    frames = []
    cap = cv2.VideoCapture(video_path)

    # Get total number of frames
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if total_frames <= 0:
        raise ValueError("Could not read frames from video file")

    # Calculate frame indices to extract
    frame_indices = [
        i * (total_frames - 1) // (NUM_FRAMES - 1) for i in range(NUM_FRAMES)
    ]

    for frame_idx in frame_indices:
        # Set frame position
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)

        # Read frame
        ret, frame = cap.read()
        if ret:
            # Convert BGR to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_rgb = cv2.resize(frame_rgb, (512, 512))

            frames.append(frame_rgb)

    # Release video capture object
    cap.release()

    while len(frames) < 5:
        frames.append(frames[-1])

    video_data = np.stack(frames) / 255.0
    return video_data.astype(np.float32)


class VideoDataset(Dataset):
    def __init__(
        self,
        video_paths: List[str],
        template_ids: List[int],
        custom_processor: callable,
    ):
        self.video_paths = video_paths
        # Convert template IDs to tensor immediately
        self.template_ids = torch.tensor(template_ids, dtype=torch.long)
        self.custom_processor = custom_processor

        # Store unique template IDs for reference
        self.unique_templates = torch.unique(self.template_ids)
        self.num_classes = len(self.unique_templates)

        # Create mapping from template ID to class index (0 to num_classes-1)
        self.template_to_idx = {
            int(tid): idx for idx, tid in enumerate(self.unique_templates)
        }

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        video_path = self.video_paths[idx]
        template_id = int(self.template_ids[idx])

        video = self.custom_processor(video_path)
        video_tensor = torch.from_numpy(video)

        # Convert template ID to class index
        class_idx = self.template_to_idx[template_id]

        return video_tensor, torch.tensor(class_idx, dtype=torch.long)

    def get_original_template_id(self, class_idx: int) -> int:
        """Convert back from class index to original template ID"""
        return int(self.unique_templates[class_idx])


class VideoCNN(nn.Module):
    def __init__(self, num_classes: int):
        super().__init__()

        self.conv3d = nn.Sequential(
            nn.Conv3d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=2),
            nn.Conv3d(32, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=2),
        )

        # Calculate the size of the flattened features
        self.flat_features = self._calculate_flat_features()

        self.classifier = nn.Sequential(
            nn.Linear(self.flat_features, 8), nn.ReLU(), nn.Linear(8, num_classes)
        )

    def _calculate_flat_features(self) -> int:
        x = torch.randn(1, 3, NUM_FRAMES, 512, 512)
        x = self.conv3d(x)
        return int(np.prod(x.shape[1:]))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Expected input shape: (batch_size, num_frames, height, width, channels)
        # Need to permute to: (batch_size, channels, num_frames, height, width)
        x = x.permute(0, 4, 1, 2, 3).contiguous()

        x = self.conv3d(x)
        # print(f"Shape after conv3d: {x.shape}")

        x = x.view(x.size(0), -1)
        # print(f"Shape after flatten: {x.shape}")

        x = self.classifier(x)
        return x


def setup_ddp(rank, world_size):
    """
    Setup for distributed training
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"

    # Initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    # Set device for this process
    torch.cuda.set_device(rank)


def cleanup_ddp():
    """
    Clean up distributed training
    """
    dist.destroy_process_group()


def train_model_ddp(rank, world_size: int, dataset, num_epochs=1):
    # Setup DDP
    setup_ddp(rank, world_size)

    # Create dataloader
    train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    num_workers = 4 * world_size
    dataloader = DataLoader(
        dataset,
        batch_size=32,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        sampler=train_sampler,
    )

    # Create model and move it to GPU
    model = VideoCNN(num_classes=dataset.num_classes).to(rank)
    model = DDP(model, device_ids=[rank])

    # Criterion & Optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        model.train()
        train_sampler.set_epoch(epoch)  # Important for proper shuffling
        running_loss = 0.0
        correct = 0
        total = 0

        for batch_idx, (videos, labels) in enumerate(dataloader):
            videos = videos.to(rank)
            labels = labels.to(rank)

            optimizer.zero_grad()
            outputs = model(videos)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            if batch_idx % 10 == 0 and rank == 0:
                print(
                    f"Epoch: {epoch}, Batch: {batch_idx}, "
                    f"Loss: {loss.item():.4f}, "
                    f"Acc: {100.0 * correct / total:.2f}%"
                )

        if rank == 0:
            epoch_loss = running_loss / len(dataloader)
            accuracy = 100.0 * correct / total
            print(
                f"Epoch [{epoch+1}/{num_epochs}], "
                f"Loss: {epoch_loss:.4f}, "
                f"Accuracy: {accuracy:.2f}%"
            )

    cleanup_ddp()


if __name__ == "__main__":
    gc.collect()
    torch.cuda.empty_cache()

    dataset = VideoDataset(X_data, Y_data, custom_processor=input_processor)

    # Create model with correct number of output classes
    # model = VideoCNN(num_classes=dataset.num_classes)
    # Training setup
    # device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # model = model.to(device)
    # criterion = nn.CrossEntropyLoss()
    # optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # losses = []
    # accuracies = []
    # train_epoch(model, dataloader, criterion, optimizer, losses, accuracies)

    world_size = torch.cuda.device_count()
    mp.set_start_method("spawn", force=True)
    mp.spawn(
        train_model_ddp,
        args=(world_size, dataset, 1),
        nprocs=world_size,
        join=True,
    )

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/lehoangchibach/anaconda3/envs/gif_analytic/lib/python3.11/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lehoangchibach/anaconda3/envs/gif_analytic/lib/python3.11/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'train_model_ddp' on <module '__main__' (built-in)>
