<a href="https://colab.research.google.com/github/kartoone/cosc470/blob/main/examples/lstm/CNN_LSTM_UCF101.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install datasets
!pip install -q pytorchvideo transformers evaluate
# https://www.researchgate.net/publication/364114035_Image_Classification_using_a_Hybrid_LSTM-CNN_Deep_Neural_Network
# https://www.nature.com/articles/s41598-021-93656-0
# https://huggingface.co/docs/transformers/en/tasks/video_classification


In [28]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from datasets import load_dataset, VerificationMode  # Hugging Face datasets library
import cv2
from huggingface_hub import hf_hub_download

# --- Step 1. Load UCF101 Dataset from Hugging Face ---

# This will download and prepare the UCF101 dataset.
# Note: The dataset on Hugging Face may provide videos as dictionaries.
# Here we assume each sample has a "video" field with an "array" key containing the video frames,
# and a "label" field (an integer from 0 to 100).

hf_dataset = load_dataset("sayakpaul/ucf101-subset", verification_mode=VerificationMode.NO_CHECKS)

#hf_dataset_identifier = "sayakpaul/ucf101-subset"
#filename = "UCF101_subset.tar.gz"
#file_path = hf_hub_download(repo_id=hf_dataset_identifier, filename=filename, repo_type="dataset")

#import tarfile
#with tarfile.open(file_path) as t:
#  t.extractall(".")

# --- Step 2. Create a Custom PyTorch Dataset Wrapper ---

class UCF101TorchDataset(Dataset):
    def __init__(self, hf_dataset, frames_per_video=30, transform=None):
        """
        hf_dataset: Hugging Face dataset object for UCF101.
        frames_per_video: Fixed number of frames per video.
        transform: Optional transform to be applied on a sample.
        """
        self.hf_dataset = hf_dataset
        self.frames_per_video = frames_per_video
        self.transform = transform

    def __len__(self):
        return len(self.hf_dataset)

    def __getitem__(self, idx):
        sample = self.hf_dataset[idx]
        # Extract video array; expected shape: (num_frames, height, width, channels)
        video = sample["video"]["array"]

        # If the video has more frames than needed, take the first frames_per_video frames.
        if video.shape[0] >= self.frames_per_video:
            video = video[:self.frames_per_video]
        else:
            # Pad with black frames if not enough frames.
            pad_count = self.frames_per_video - video.shape[0]
            pad_shape = (pad_count, video.shape[1], video.shape[2], video.shape[3])
            pad_frames = np.zeros(pad_shape, dtype=video.dtype)
            video = np.concatenate([video, pad_frames], axis=0)

        # Convert from HWC to CHW for each frame so that each frame becomes (C, H, W)
        video = np.transpose(video, (0, 3, 1, 2))
        video = video.astype(np.float32) / 255.0  # Normalize pixel values to [0, 1]

        # Apply any additional transforms if provided
        if self.transform:
            video = self.transform(video)

        # Retrieve the label (assumed to be an integer)
        label = sample["label"]

        # Convert video and label to PyTorch tensors
        video_tensor = torch.tensor(video)  # shape: (frames, channels, H, W)
        label_tensor = torch.tensor(label, dtype=torch.long)
        return video_tensor, label_tensor

# Create the PyTorch dataset and dataloader
frames_per_video = 30
batch_size = 8

train_dataset = UCF101TorchDataset(hf_dataset, frames_per_video=frames_per_video)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# --- Step 3. Define a CNN-LSTM Model in PyTorch ---

class CNN_LSTM(nn.Module):
    def __init__(self, cnn_output_size=256, lstm_hidden_size=50, num_classes=101):
        super(CNN_LSTM, self).__init__()

        # Define the CNN part to process individual frames.
        # This CNN will be applied to each frame independently.
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),  # (B, 32, H, W)
            nn.ReLU(),
            nn.MaxPool2d(2),                           # (B, 32, H/2, W/2)
            nn.Conv2d(32, 64, kernel_size=3, padding=1), # (B, 64, H/2, W/2)
            nn.ReLU(),
            nn.MaxPool2d(2),                           # (B, 64, H/4, W/4)
            nn.Flatten(),                              # flatten spatial dimensions
            nn.Linear(64 * 16 * 16, cnn_output_size),   # assuming input frames are 64x64 pixels
            nn.ReLU()
        )
        # LSTM to process sequence of CNN features.
        self.lstm = nn.LSTM(input_size=cnn_output_size, hidden_size=lstm_hidden_size, batch_first=True)
        # Final classification layer.
        self.fc = nn.Linear(lstm_hidden_size, num_classes)

    def forward(self, x):
        # x shape: (batch, time, channels, height, width)
        batch_size, time_steps, C, H, W = x.size()
        # Reshape to combine batch and time: (batch*time, C, H, W)
        x = x.view(batch_size * time_steps, C, H, W)
        # Apply CNN to each frame.
        cnn_features = self.cnn(x)  # shape: (batch*time, cnn_output_size)
        # Reshape back to (batch, time, cnn_output_size)
        cnn_features = cnn_features.view(batch_size, time_steps, -1)
        # Pass the sequence of features to the LSTM.
        lstm_out, (h_n, _) = self.lstm(cnn_features)
        # Use the last hidden state for classification.
        out = self.fc(h_n[-1])
        return out

# Instantiate the model.
model = CNN_LSTM()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# --- Step 4. (Optional) Training Loop Skeleton ---

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 5  # for demonstration

model.train()
for epoch in range(num_epochs):
   running_loss = 0.0
   for videos, labels in train_loader:
        videos = videos.to(device)  # shape: (batch, frames, channels, H, W)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(videos)  # shape: (batch, num_classes)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
   print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")

print("Training loop finished.")


KeyError: "Invalid key: 0. Please first select a split. For example: `my_dataset_dictionary['train'][0]`. Available splits: ['train']"