# Data Preparation

## Frame Extraction

In [None]:
import cv2
import os
from tqdm import tqdm

# Source video directories of the UCF101 Kaggle dataset (train / validation splits)
dataset_path = '/kaggle/input/ucf101-action-recognition/train'
dataset_path_val = '/kaggle/input/ucf101-action-recognition/val'


# Destination directories for the extracted frame images
frames_output_path = '/kaggle/working/frames/train'
frames_output_path_val = '/kaggle/working/frames/val'

# Create both output directories. exist_ok=True replaces the separate
# "exists?" check, so there is no window between checking and creating and
# re-running the cell is a no-op.
os.makedirs(frames_output_path, exist_ok=True)
os.makedirs(frames_output_path_val, exist_ok=True)

# Action classes selected for frame extraction and for filtering the video lists
classes = ['PushUps', 'PullUps', 'BenchPress', 'Lunges', 'WallPushups']

In [None]:
def extract_frames_from_train_videos():
    """Extract every frame of the training videos as 64x64 grayscale JPEGs.

    For each name in the module-level ``classes`` list, every file found in
    ``dataset_path/<class>`` is opened with OpenCV and its frames are written
    to ``frames_output_path/<class>/<video>_frame_<NNNNN>.jpg``.
    """
    for class_name in classes:
        print(f"Processing class: {class_name}")
        class_video_path = os.path.join(dataset_path, class_name)
        class_frames_output_path = os.path.join(frames_output_path, class_name)

        # Race-free directory creation; harmless when the folder already exists
        os.makedirs(class_frames_output_path, exist_ok=True)

        for video_file in tqdm(os.listdir(class_video_path)):
            video_path = os.path.join(class_video_path, video_file)
            video_filename = os.path.splitext(video_file)[0]
            video_capture = cv2.VideoCapture(video_path)

            # try/finally guarantees the capture handle is released even when
            # resizing, colour conversion or writing fails part-way through.
            try:
                frame_count = 0
                while True:
                    success, frame = video_capture.read()
                    if not success:
                        # No more frames (or the file could not be decoded)
                        break
                    # Resize to 64x64 pixels
                    frame = cv2.resize(frame, (64, 64), interpolation=cv2.INTER_AREA)
                    # Convert to grayscale
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    # Save the frame; the zero-padded index keeps files sortable by name
                    frame_filename = f"{video_filename}_frame_{frame_count:05d}.jpg"
                    frame_filepath = os.path.join(class_frames_output_path, frame_filename)
                    cv2.imwrite(frame_filepath, frame)
                    frame_count += 1
            finally:
                video_capture.release()

In [None]:
def extract_frames_from_val_videos():
    """Extract every frame of the validation videos as 64x64 grayscale JPEGs.

    For each name in the module-level ``classes`` list, every file found in
    ``dataset_path_val/<class>`` is opened with OpenCV and its frames are
    written to ``frames_output_path_val/<class>/<video>_frame_<NNNNN>.jpg``.
    """
    for class_name in classes:
        print(f"Processing class: {class_name}")
        class_video_path = os.path.join(dataset_path_val, class_name)
        class_frames_output_path = os.path.join(frames_output_path_val, class_name)

        # Race-free directory creation; harmless when the folder already exists
        os.makedirs(class_frames_output_path, exist_ok=True)

        for video_file in tqdm(os.listdir(class_video_path)):
            video_path = os.path.join(class_video_path, video_file)
            video_filename = os.path.splitext(video_file)[0]
            video_capture = cv2.VideoCapture(video_path)

            # try/finally guarantees the capture handle is released even when
            # resizing, colour conversion or writing fails part-way through.
            try:
                frame_count = 0
                while True:
                    success, frame = video_capture.read()
                    if not success:
                        # No more frames (or the file could not be decoded)
                        break
                    # Resize to 64x64 pixels
                    frame = cv2.resize(frame, (64, 64), interpolation=cv2.INTER_AREA)
                    # Convert to grayscale
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    # Save the frame; the zero-padded index keeps files sortable by name
                    frame_filename = f"{video_filename}_frame_{frame_count:05d}.jpg"
                    frame_filepath = os.path.join(class_frames_output_path, frame_filename)
                    cv2.imwrite(frame_filepath, frame)
                    frame_count += 1
            finally:
                video_capture.release()

In [None]:
# Run frame extraction for both splits; each call writes JPEG frames under
# its split's output directory.
extract_frames_from_train_videos()
extract_frames_from_val_videos()

## Video List

In [None]:
import pandas as pd
import os

# CSV files listing the clips of each split; their 'label' and 'clip_path'
# columns are read later by create_video_list.
train_csv_path = '/kaggle/input/ucf101-action-recognition/train.csv'
val_csv_path = '/kaggle/input/ucf101-action-recognition/val.csv'

# Load both split listings into DataFrames
train_df = pd.read_csv(train_csv_path)
val_df = pd.read_csv(val_csv_path)

In [None]:
def create_video_list(df, classes):
    """Return ``(label, video_filename)`` pairs for the rows of selected classes.

    Args:
        df: DataFrame with a ``'label'`` column and a ``'clip_path'`` column.
        classes: Collection of labels to keep.

    Returns:
        A list of tuples, one per kept row and in row order, holding the label
        and the base name (directory part stripped) of the clip path.
    """
    # Keep only the rows whose label is one of the requested classes
    keep_mask = df['label'].isin(classes)
    selected = df[keep_mask]

    # Pair each label with the file name component of its clip path
    return [
        (label, os.path.basename(clip_path))
        for label, clip_path in zip(selected['label'], selected['clip_path'])
    ]

In [None]:
# Build the (label, video filename) lists for both splits, restricted to the
# selected classes
train_videos = create_video_list(train_df, classes)
val_videos = create_video_list(val_df, classes)

# Sanity check: report how many videos each list holds
print(f"Number of training videos: {len(train_videos)}")
print(f"Number of validation videos: {len(val_videos)}")

# Sanity check: show the first few entries of each list
print("First 5 training videos:")
for video in train_videos[:5]:
    print(video)

print("\nFirst 5 validation videos:")
for video in val_videos[:5]:
    print(video)

## Data Generation

In [None]:
import numpy as np
from tensorflow.keras.utils import Sequence
import cv2
import os
import random

class FrameSequenceGenerator(Sequence):
    """Keras ``Sequence`` serving sliding-window frame sequences in batches.

    Every video contributes all windows of ``input_length + target_length``
    consecutive frames. The first ``input_length`` frames of a window are the
    model input, the remaining ``target_length`` frames are the target.
    Frame files are expected to be named ``<video>_frame_<index>.jpg`` inside
    ``frames_path/<class_name>/``.

    Args:
        video_list: Iterable of ``(class_name, video_filename)`` pairs.
        batch_size: Number of windows per batch.
        input_length: Number of input frames per window.
        target_length: Number of target frames per window.
        frames_path: Root directory holding one folder of frame JPEGs per class.
        augment: When true, each window is mirrored horizontally with
            probability 0.5; the same decision is applied to all input and
            target frames of the window so the sequence stays consistent.
    """

    def __init__(self, video_list, batch_size, input_length, target_length, frames_path, augment=False):
        # Let the Keras base class initialise its own state (recent Keras
        # releases warn when a subclass omits this call).
        super().__init__()

        self.video_list = video_list
        self.batch_size = batch_size
        self.input_length = input_length
        self.target_length = target_length
        self.frames_path = frames_path
        self.augment = augment

        total_required = self.input_length + self.target_length

        # Each class folder is listed and sorted once, not once per video
        folder_listings = {}

        # Precompute all possible sequences
        self.sequences = []
        for class_name, video_filename in self.video_list:
            frames_folder = os.path.join(self.frames_path, class_name)
            if frames_folder not in folder_listings:
                folder_listings[frames_folder] = sorted(
                    f for f in os.listdir(frames_folder) if f.endswith('.jpg')
                )

            # Including the '_frame_' separator stops a video whose name is a
            # prefix of another video's name from collecting that video's frames.
            frame_prefix = os.path.splitext(video_filename)[0] + '_frame_'
            frame_files = [
                f for f in folder_listings[frames_folder] if f.startswith(frame_prefix)
            ]

            if len(frame_files) < total_required:
                continue  # Skip if not enough frames

            frame_paths = [os.path.join(frames_folder, f) for f in frame_files]
            for start in range(len(frame_paths) - total_required + 1):
                split = start + self.input_length
                self.sequences.append(
                    (frame_paths[start:split], frame_paths[split:start + total_required])
                )

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.sequences) / self.batch_size))

    def __getitem__(self, idx):
        """Load batch ``idx`` and return ``(X_batch, y_batch)`` as numpy arrays."""
        first = idx * self.batch_size
        batch_sequences = self.sequences[first:first + self.batch_size]
        X_batch = []
        y_batch = []

        for input_frame_paths, target_frame_paths in batch_sequences:
            # One flip decision per window, shared by inputs and targets
            flip = self.augment and random.random() < 0.5

            input_sequence = [self.load_and_preprocess_frame(fp, flip=flip) for fp in input_frame_paths]
            target_sequence = [self.load_and_preprocess_frame(fp, flip=flip) for fp in target_frame_paths]

            X_batch.append(np.array(input_sequence))
            y_batch.append(np.array(target_sequence))

        return np.array(X_batch), np.array(y_batch)

    @staticmethod
    def load_and_preprocess_frame(frame_path, augment=False, flip=False):
        """Load one frame as a normalised array with a trailing channel axis.

        Args:
            frame_path: Path of the image file.
            augment: When true, flip horizontally with probability 0.5
                (independent random draw for this single frame).
            flip: When true, always flip horizontally.

        Returns:
            The image scaled to [0, 1] with a channel axis appended.

        Raises:
            ValueError: If the image cannot be read.
        """
        # Load the image in grayscale mode
        frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
        if frame is None:
            raise ValueError(f"Failed to load image at {frame_path}")

        # Horizontal flip: forced by the caller, or drawn at random per frame
        if flip or (augment and random.random() < 0.5):
            frame = cv2.flip(frame, 1)

        # Normalize pixel values to [0, 1]
        frame = frame / 255.0

        # Expand dimensions to add the channel dimension
        frame = np.expand_dims(frame, axis=-1)

        return frame

In [None]:
# Batch size and the number of input / target frames per window
batch_size = 16
input_length = 20
target_length = 10

# Root directories of the extracted frames (one sub-folder per class)
train_frames_path = '/kaggle/working/frames/train'
val_frames_path = '/kaggle/working/frames/val'

# Training generator, with augmentation enabled
train_generator = FrameSequenceGenerator(
    video_list=train_videos,
    batch_size=batch_size,
    input_length=input_length,
    target_length=target_length,
    frames_path=train_frames_path,
    augment=True
)

# Validation generator without augmentation
val_generator = FrameSequenceGenerator(
    video_list=val_videos,
    batch_size=batch_size,
    input_length=input_length,
    target_length=target_length,
    frames_path=val_frames_path,
    augment=False
)
# Report how many batches each generator serves per epoch
print(f"Number of training batches: {len(train_generator)}")
print(f"Number of validation batches: {len(val_generator)}")

# Fetch the first batch from the training generator as a smoke test
X_batch, y_batch = train_generator.__getitem__(0)

# Print the shapes
print(f"Input batch shape: {X_batch.shape}")    # Expected: (batch_size, input_length, 64, 64, 1)
print(f"Target batch shape: {y_batch.shape}")  # Expected: (batch_size, target_length, 64, 64, 1)

## Visualization

In [None]:
import matplotlib.pyplot as plt

def visualize_sequence(input_seq, target_seq):
    """Show the input frames (top row) and target frames (bottom row).

    Args:
        input_seq: Array of input frames, indexed by frame along axis 0.
        target_seq: Array of target frames, indexed by frame along axis 0.
    """
    num_input = input_seq.shape[0]
    num_target = target_seq.shape[0]

    # Only two rows are drawn. squeeze=False keeps `axes` two-dimensional even
    # when there is a single column, so axes[row, col] indexing always works.
    _, axes = plt.subplots(2, max(num_input, num_target), figsize=(30, 4), squeeze=False)

    # Hide every axis up front so cells without a frame stay blank instead of
    # showing empty tick marks.
    for ax in axes.ravel():
        ax.axis('off')

    # Plot input frames
    for i in range(num_input):
        axes[0, i].imshow(input_seq[i].squeeze(), cmap='gray')
        axes[0, i].set_title(f"Input Frame {i+1}")

    # Plot target frames
    for i in range(num_target):
        axes[1, i].imshow(target_seq[i].squeeze(), cmap='gray')
        axes[1, i].set_title(f"Target Frame {i+1}")

    plt.show()

# Visualize the first (input, target) pair of the batch held in X_batch / y_batch
visualize_sequence(X_batch[0], y_batch[0])

# Model Implementation

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import os
import cv2
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm
import matplotlib.pyplot as plt

In [None]:
class VideoFrameDataset(Dataset):
    """PyTorch dataset of sliding-window (input, target) frame sequences.

    Every video contributes all windows of ``input_length + target_length``
    consecutive frames. Frame files are expected to be named
    ``<video>_frame_<index>.jpg`` inside ``frames_path/<class_name>/``.

    Args:
        video_list: Iterable of ``(class_name, video_filename)`` pairs.
        frames_path: Root directory holding one folder of frame JPEGs per class.
        input_length: Number of input frames per window.
        target_length: Number of target frames per window.
        augment: When true, each window is mirrored horizontally with
            probability 0.5 (input and target frames together).

    Attributes:
        sequences: List of ``(frames_folder, input_frame_names,
            target_frame_names)`` tuples, one per window.
    """

    def __init__(self, video_list, frames_path, input_length=10, target_length=10, augment=False):
        self.video_list = video_list
        self.frames_path = frames_path
        self.input_length = input_length
        self.target_length = target_length
        self.augment = augment
        self.sequences = []

        total_required = self.input_length + self.target_length

        # Each class folder is listed and sorted once, not once per video
        folder_listings = {}

        for class_name, video_filename in self.video_list:
            frames_folder = os.path.join(self.frames_path, class_name)
            if frames_folder not in folder_listings:
                folder_listings[frames_folder] = sorted(
                    f for f in os.listdir(frames_folder) if f.endswith('.jpg')
                )

            # Including the '_frame_' separator stops a video whose name is a
            # prefix of another video's name from collecting that video's frames.
            frame_prefix = os.path.splitext(video_filename)[0] + '_frame_'
            frame_files = [
                f for f in folder_listings[frames_folder] if f.startswith(frame_prefix)
            ]

            if len(frame_files) < total_required:
                continue  # Not enough frames for a single window

            for i in range(len(frame_files) - total_required + 1):
                input_frames = frame_files[i:i + self.input_length]
                target_frames = frame_files[i + self.input_length:i + total_required]
                self.sequences.append((frames_folder, input_frames, target_frames))

    def __len__(self):
        """Return the number of windows."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Load window ``idx`` and return ``(input_seq, target_seq)`` float32 tensors."""
        frames_folder, input_frames, target_frames = self.sequences[idx]
        input_sequence = [self.load_frame(os.path.join(frames_folder, f)) for f in input_frames]
        target_sequence = [self.load_frame(os.path.join(frames_folder, f)) for f in target_frames]

        input_seq = np.stack(input_sequence, axis=0)  # [t, 1, H, W]
        target_seq = np.stack(target_sequence, axis=0)  # [t, 1, H, W]

        input_seq = torch.tensor(input_seq, dtype=torch.float32)
        target_seq = torch.tensor(target_seq, dtype=torch.float32)

        # Honour `augment`: mirror the whole window along the width axis so
        # inputs and targets stay consistent with each other.
        if self.augment and np.random.rand() < 0.5:
            input_seq = torch.flip(input_seq, dims=[-1])
            target_seq = torch.flip(target_seq, dims=[-1])

        return input_seq, target_seq

    @staticmethod
    def load_frame(frame_path):
        """Read one grayscale frame, scale it to [0, 1] and prepend a channel axis.

        Raises:
            ValueError: If the image cannot be read.
        """
        frame = cv2.imread(frame_path, cv2.IMREAD_GRAYSCALE)
        if frame is None:
            # cv2.imread signals failure by returning None rather than raising
            raise ValueError(f"Failed to load image at {frame_path}")
        frame = frame / 255.0  # Normalize
        frame = np.expand_dims(frame, axis=0)  # [1, H, W]
        return frame

In [None]:
from torch.utils.data import DataLoader

# Build the training and validation datasets: 20 input frames followed by
# 10 target frames per window
train_dataset = VideoFrameDataset(
    video_list=train_videos,
    frames_path=train_frames_path,
    input_length=20,
    target_length=10,
    augment=True
)

val_dataset = VideoFrameDataset(
    video_list=val_videos,
    frames_path=val_frames_path,
    input_length=20,
    target_length=10,
    augment=False
)

# Wrap the datasets in loaders; only the training loader shuffles, and
# num_workers=0 loads batches in the main process
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=0)

## ConvLSTM

In [None]:
class ConvLSTMCell(nn.Module):
    """Single convolutional LSTM cell.

    One convolution over the channel-wise concatenation of the input and the
    previous hidden state produces all four gate pre-activations at once.

    Args:
        input_dim: Number of channels of the input tensor.
        hidden_dim: Number of channels of the hidden and cell states.
        kernel_size: ``(height, width)`` of the convolution kernel.
        bias: Whether the convolution has a bias term.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, bias=True):
        super().__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        # Half-kernel padding per axis keeps the spatial size for odd kernels
        pad_h = kernel_size[0] // 2
        pad_w = kernel_size[1] // 2

        self.conv = nn.Conv2d(
            self.input_dim + self.hidden_dim,
            4 * self.hidden_dim,
            kernel_size=kernel_size,
            padding=(pad_h, pad_w),
            bias=bias,
        )

    def forward(self, input_tensor, cur_state):
        """Advance the cell by one time step.

        Args:
            input_tensor: Input of shape (batch, input_dim, height, width).
            cur_state: Pair ``(h, c)`` of current hidden and cell states.

        Returns:
            The pair ``(h_next, c_next)``.
        """
        prev_h, prev_c = cur_state

        # Stack input and hidden state along the channel axis, then split the
        # convolution output into the four gate blocks (input, forget, output,
        # candidate), each hidden_dim channels wide.
        stacked = torch.cat((input_tensor, prev_h), dim=1)
        pre_i, pre_f, pre_o, pre_g = self.conv(stacked).split(self.hidden_dim, dim=1)

        in_gate = torch.sigmoid(pre_i)
        forget_gate = torch.sigmoid(pre_f)
        out_gate = torch.sigmoid(pre_o)
        candidate = torch.tanh(pre_g)

        next_c = forget_gate * prev_c + in_gate * candidate
        next_h = out_gate * torch.tanh(next_c)
        return next_h, next_c

    def init_hidden(self, batch_size, image_size):
        """Return zero-filled ``(h, c)`` states on the same device as the parameters."""
        rows, cols = image_size
        target_device = next(self.parameters()).device
        state_shape = (batch_size, self.hidden_dim, rows, cols)
        hidden = torch.zeros(state_shape, device=target_device)
        cell = torch.zeros(state_shape, device=target_device)
        return (hidden, cell)

In [None]:
class ConvLSTM(nn.Module):
    """Stack of ``ConvLSTMCell`` layers unrolled over a frame sequence.

    Args:
        input_dim: Number of channels of the input frames.
        hidden_dim: Hidden channels per layer; a single value is repeated for
            every layer, a list/tuple is used as given.
        kernel_size: List/tuple holding one ``(height, width)`` list/tuple per
            layer.
        num_layers: Number of stacked cells.
        batch_first: If false, the input is expected as (t, b, c, h, w) and is
            permuted to (b, t, c, h, w) before processing.
        bias: Whether the cell convolutions have a bias term.
        return_all_layers: If false, only the last layer's output and state
            are returned.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size, num_layers, batch_first=False, bias=True, return_all_layers=False):
        super().__init__()

        self._check_kernel_size_consistency(kernel_size)

        # Normalise per-layer settings to sequences with one entry per layer
        kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
        hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
        if len(kernel_size) != num_layers or len(hidden_dim) != num_layers:
            raise ValueError("Inconsistent list length.")

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.kernel_size = kernel_size
        self.num_layers = num_layers
        self.batch_first = batch_first
        self.bias = bias
        self.return_all_layers = return_all_layers

        # Layer 0 consumes the raw input; each later layer consumes the hidden
        # state of the layer below it.
        cells = []
        for layer_idx in range(self.num_layers):
            if layer_idx == 0:
                in_channels = self.input_dim
            else:
                in_channels = self.hidden_dim[layer_idx - 1]
            cells.append(
                ConvLSTMCell(
                    input_dim=in_channels,
                    hidden_dim=self.hidden_dim[layer_idx],
                    kernel_size=self.kernel_size[layer_idx],
                    bias=self.bias,
                )
            )
        self.cell_list = nn.ModuleList(cells)

    def forward(self, input_tensor, hidden_state=None):
        """Run the stack over the whole sequence.

        Args:
            input_tensor: 5-D tensor, (b, t, c, h, w) when ``batch_first`` is
                true and (t, b, c, h, w) otherwise.
            hidden_state: Optional list with one ``(h, c)`` pair per layer;
                zero states are created when omitted.

        Returns:
            ``(layer_output_list, last_state_list)``. Outputs are stacked along
            dim 1 as (b, t, hidden, h, w) regardless of ``batch_first``; each
            state entry is the list ``[h, c]`` after the final step.
        """
        if not self.batch_first:
            # (t, b, c, h, w) -> (b, t, c, h, w)
            input_tensor = input_tensor.permute(1, 0, 2, 3, 4)

        batch, steps, _, height, width = input_tensor.size()

        if hidden_state is None:
            hidden_state = self._init_hidden(batch_size=batch, image_size=(height, width))

        all_outputs = []
        all_states = []
        layer_input = input_tensor

        for layer_idx, cell in enumerate(self.cell_list):
            h, c = hidden_state[layer_idx]
            step_outputs = []
            for t in range(steps):
                h, c = cell(input_tensor=layer_input[:, t], cur_state=[h, c])
                step_outputs.append(h)

            # The hidden-state sequence of this layer feeds the next layer
            layer_input = torch.stack(step_outputs, dim=1)
            all_outputs.append(layer_input)
            all_states.append([h, c])

        if self.return_all_layers:
            return all_outputs, all_states
        return all_outputs[-1:], all_states[-1:]

    def _init_hidden(self, batch_size, image_size):
        """Return the zero ``(h, c)`` state of every layer, in layer order."""
        return [
            self.cell_list[layer_idx].init_hidden(batch_size, image_size)
            for layer_idx in range(self.num_layers)
        ]

    @staticmethod
    def _check_kernel_size_consistency(kernel_size):
        """Raise ``ValueError`` unless ``kernel_size`` is a list/tuple of lists/tuples."""
        if not isinstance(kernel_size, (list, tuple)):
            raise ValueError('`kernel_size` must be a list or tuple')
        for entry in kernel_size:
            if not isinstance(entry, (list, tuple)):
                raise ValueError('`kernel_size` must be a list or tuple of list or tuples')

    @staticmethod
    def _extend_for_multilayer(param, num_layers):
        """Return ``param`` unchanged if it is a list/tuple, else repeat it per layer."""
        return param if isinstance(param, (list, tuple)) else [param] * num_layers

In [None]:
class ConvLSTMNet(nn.Module):
    """ConvLSTM stack followed by a 3-D convolution that emits predicted frames.

    Args:
        input_length: Stored on the instance; not otherwise used by this class.
        target_length: Number of trailing time steps of the output sequence
            that are returned as the prediction.
        img_height: Stored on the instance; not otherwise used by this class.
        img_width: Stored on the instance; not otherwise used by this class.
        channels: Number of channels of the input and predicted frames.
        hidden_dims: Hidden channel count of each ConvLSTM layer; its length
            sets the number of layers.
    """

    # The default is a tuple rather than a list so no mutable object is
    # shared between calls.
    def __init__(self, input_length=30, target_length=15, img_height=64, img_width=64, channels=1, hidden_dims=(32, 32)):
        super().__init__()
        self.input_length = input_length
        self.target_length = target_length
        self.img_height = img_height
        self.img_width = img_width
        self.channels = channels

        # One 3x3 kernel per ConvLSTM layer
        kernel_size = [(3, 3) for _ in hidden_dims]

        self.convlstm = ConvLSTM(
            input_dim=channels,
            hidden_dim=hidden_dims,
            kernel_size=kernel_size,
            num_layers=len(hidden_dims),
            batch_first=True,
            bias=True,
            return_all_layers=False
        )

        # Maps the last layer's hidden channels back to image channels; the
        # 3x3x3 kernel with padding 1 preserves the time and spatial sizes.
        self.conv = nn.Conv3d(
            in_channels=hidden_dims[-1],
            out_channels=channels,
            kernel_size=(3, 3, 3),
            padding=(1, 1, 1)
        )
        self.activation = nn.Sigmoid()

    def forward(self, x):
        """Predict frames from an input sequence.

        Args:
            x: Tensor of shape (batch, seq_len, channels, h, w).

        Returns:
            Tensor of shape (batch, k, channels, h, w) with values in (0, 1),
            holding the last ``target_length`` time steps of the output
            sequence (fewer when ``seq_len`` is shorter than that).
        """
        # Pass through ConvLSTM; with return_all_layers=False the list holds
        # only the last layer's output
        convlstm_out, _ = self.convlstm(x)
        convlstm_out = convlstm_out[0]

        # Conv3d expects (batch, channels, time, h, w)
        convlstm_out = convlstm_out.permute(0, 2, 1, 3, 4)
        pred = self.conv(convlstm_out)
        pred = self.activation(pred)

        # Permute back to (batch, seq_len, channels, h, w)
        pred = pred.permute(0, 2, 1, 3, 4)

        # Take the last target_length frames
        pred = pred[:, -self.target_length:, :, :, :]

        return pred

In [None]:
# Instantiate the ConvLSTMNet with the sequence lengths the datasets were
# built with. With the class defaults (target_length=15) the model would
# return 15 frames, which cannot be compared against the dataset's shorter
# target sequences in the loss.
model = ConvLSTMNet(
    input_length=train_dataset.input_length,
    target_length=train_dataset.target_length,
)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Training parameters
num_epochs = 5

# Per-epoch history of losses and validation metrics
train_losses = []
val_losses = []
val_mse_scores = []
val_ssim_scores = []

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for X_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
        # Move to device
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        # Weight the batch-mean loss by the batch size so that dividing by the
        # dataset size below yields a per-sample mean
        train_loss += loss.item() * X_batch.size(0)

    train_loss /= len(train_loader.dataset)
    train_losses.append(train_loss)

    # Validation phase
    model.eval()
    val_loss = 0.0
    total_mse = 0.0
    total_ssim = 0.0
    num_samples = 0

    with torch.no_grad():
        for X_batch, y_batch in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item() * X_batch.size(0)

            # Use a dedicated name for the size of this batch so the
            # module-level `batch_size` hyperparameter is not overwritten
            samples_in_batch = X_batch.size(0)
            num_samples += samples_in_batch
            outputs_np = outputs.cpu().numpy()
            y_batch_np = y_batch.cpu().numpy()

            # Compute MSE and SSIM per frame
            for i in range(samples_in_batch):
                mse_per_video = []
                ssim_per_video = []
                for t in range(outputs_np.shape[1]):
                    # Predicted and ground truth frames
                    pred_frame = outputs_np[i, t, 0]  # Assuming single channel
                    true_frame = y_batch_np[i, t, 0]

                    # Compute MSE
                    mse_frame = np.mean((pred_frame - true_frame) ** 2)
                    mse_per_video.append(mse_frame)

                    # Compute SSIM
                    ssim_frame = ssim(true_frame, pred_frame, data_range=1.0)
                    ssim_per_video.append(ssim_frame)

                # Average over frames in the sequence
                total_mse += np.mean(mse_per_video)
                total_ssim += np.mean(ssim_per_video)

    val_loss /= len(val_loader.dataset)
    val_losses.append(val_loss)

    # Average MSE and SSIM over the validation set
    avg_mse = total_mse / num_samples
    avg_ssim = total_ssim / num_samples
    val_mse_scores.append(avg_mse)
    val_ssim_scores.append(avg_ssim)

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.6f}, Validation Loss: {val_loss:.6f}, MSE: {avg_mse:.6f}, SSIM: {avg_ssim:.6f}")

## PredRNN

In [None]:
class ST_LSTMCell(nn.Module):
    """Convolutional LSTM cell with separate input and hidden convolutions.

    The cell carries only a hidden state ``h`` and a cell state ``c``; the
    gate pre-activations are the sum of a convolution over the input and a
    convolution over the previous hidden state.

    Args:
        input_dim: Number of channels of the input tensor.
        hidden_dim: Number of channels of the hidden and cell states.
        kernel_size: ``(height, width)`` of both convolution kernels, or a
            single int for a square kernel.
    """

    def __init__(self, input_dim, hidden_dim, kernel_size=(3, 3)):
        super().__init__()
        self.hidden_dim = hidden_dim  # Store the hidden dimension

        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        # Pad each axis by half of its own kernel extent. A single value taken
        # from kernel_size[0] would change the spatial size for non-square
        # kernels and break the element-wise state update in forward().
        padding = (kernel_size[0] // 2, kernel_size[1] // 2)

        self.conv_x = nn.Conv2d(input_dim, 4 * hidden_dim, kernel_size, padding=padding)
        self.conv_h = nn.Conv2d(hidden_dim, 4 * hidden_dim, kernel_size, padding=padding)

    def forward(self, x, h, c):
        """Advance the cell by one step and return ``(h_next, c_next)``.

        Args:
            x: Input of shape (batch, input_dim, height, width).
            h: Previous hidden state, (batch, hidden_dim, height, width).
            c: Previous cell state, same shape as ``h``.
        """
        gates = self.conv_x(x) + self.conv_h(h)

        # Four equal channel blocks: input, forget, output and candidate gates
        i, f, o, g = torch.split(gates, gates.size(1) // 4, dim=1)
        i = torch.sigmoid(i)
        f = torch.sigmoid(f)
        o = torch.sigmoid(o)
        g = torch.tanh(g)

        c_next = f * c + i * g
        h_next = o * torch.tanh(c_next)

        return h_next, c_next

class PredRNN(nn.Module):
    """Stacked recurrent cells that encode a frame sequence and roll out future frames.

    Args:
        input_dim: Number of channels of the input and predicted frames.
        hidden_dim: Number of hidden channels of every layer.
        num_layers: Number of stacked ``ST_LSTMCell`` layers.
    """

    def __init__(self, input_dim, hidden_dim, num_layers):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # The first cell consumes image channels, every later cell consumes
        # the hidden channels of the cell below it.
        cells = []
        for layer_idx in range(num_layers):
            in_channels = hidden_dim if layer_idx else input_dim
            cells.append(ST_LSTMCell(in_channels, hidden_dim))
        self.layers = nn.ModuleList(cells)

        # Output convolution to produce the final predicted frame
        self.output_conv = nn.Conv2d(hidden_dim, input_dim, kernel_size=3, padding=1)

    def forward(self, input_seq, future_seq_length):
        """Encode ``input_seq`` and predict ``future_seq_length`` further frames.

        Args:
            input_seq: Tensor of shape (batch, seq_len, channels, height, width).
            future_seq_length: Number of frames to generate.

        Returns:
            Tensor of shape (batch, future_seq_length, channels, height, width).
        """
        batch_size, seq_len, _, height, width = input_seq.size()

        # Zero initial hidden / cell state for every layer
        state_shape = (batch_size, self.hidden_dim, height, width)
        hidden = [torch.zeros(state_shape, device=input_seq.device) for _ in range(self.num_layers)]
        memory = [torch.zeros(state_shape, device=input_seq.device) for _ in range(self.num_layers)]

        def run_stack(frame):
            # Push one frame through all layers, updating the states in place
            signal = frame
            for idx, layer in enumerate(self.layers):
                hidden[idx], memory[idx] = layer(signal, hidden[idx], memory[idx])
                signal = hidden[idx]

        # Encoding phase: consume the observed frames
        for t in range(seq_len):
            run_stack(input_seq[:, t])

        # Decoding phase: start from the last observed frame, then feed each
        # prediction back in as the next input (closed loop)
        predictions = []
        next_input = input_seq[:, -1]
        for _ in range(future_seq_length):
            run_stack(next_input)
            next_input = self.output_conv(hidden[-1])
            predictions.append(next_input)

        # [batch_size, future_seq_length, channels, height, width]
        return torch.stack(predictions, dim=1)

In [None]:
def train_model(model, train_loader, optimizer, criterion, device, num_epochs=5, target_length=10):
    """Train ``model`` and print the mean per-sample loss of every epoch.

    Args:
        model: Module called as ``model(input_seq, future_seq_length=...)``.
        train_loader: Iterable of ``(input_seq, target_seq)`` tensor batches.
        optimizer: Optimizer updating the model parameters.
        criterion: Loss called as ``criterion(output_seq, target_seq)``.
        device: Device the batches are moved to.
        num_epochs: Number of passes over ``train_loader``.
        target_length: Number of frames the model is asked to predict.

    Raises:
        ValueError: If ``train_loader`` yields no samples in an epoch.
    """
    model.train()
    for epoch in range(num_epochs):
        loss_sum = 0.0
        num_samples = 0
        for input_seq, target_seq in tqdm(train_loader):
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)

            optimizer.zero_grad()
            output_seq = model(input_seq, future_seq_length=target_length)

            loss = criterion(output_seq, target_seq)
            loss.backward()
            optimizer.step()

            # Weight each batch by its size so a shorter final batch does not
            # skew the epoch average
            batch_samples = input_seq.size(0)
            loss_sum += loss.item() * batch_samples
            num_samples += batch_samples

        if num_samples == 0:
            raise ValueError("train_loader yielded no samples")

        avg_loss = loss_sum / num_samples
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

In [None]:
from skimage.metrics import structural_similarity as ssim

def evaluate_model(model, val_loader, device, target_length=10):
    """Evaluate ``model`` and print the validation MSE and mean per-frame SSIM.

    Args:
        model: Module called as ``model(input_seq, future_seq_length=...)``.
        val_loader: Iterable of ``(input_seq, target_seq)`` tensor batches.
        device: Device the batches are moved to.
        target_length: Number of frames the model is asked to predict.

    Raises:
        ValueError: If ``val_loader`` yields no samples.
    """
    model.eval()
    mse_sum = 0.0
    ssim_sum = 0.0
    num_samples = 0
    num_frames = 0

    with torch.no_grad():
        for input_seq, target_seq in tqdm(val_loader):
            input_seq, target_seq = input_seq.to(device), target_seq.to(device)
            output_seq = model(input_seq, future_seq_length=target_length)

            # Weight the batch MSE by the batch size so a shorter final batch
            # does not skew the overall average
            batch_samples = output_seq.size(0)
            mse = ((output_seq - target_seq) ** 2).mean().item()
            mse_sum += mse * batch_samples
            num_samples += batch_samples

            # Calculate SSIM for each frame in the sequence (channel 0 only)
            output_np = output_seq.cpu().numpy()
            target_np = target_seq.cpu().numpy()

            for i in range(output_np.shape[0]):  # Iterate over batch
                for t in range(output_np.shape[1]):  # Iterate over sequence length
                    ssim_sum += ssim(
                        target_np[i, t, 0],
                        output_np[i, t, 0],
                        data_range=1.0,
                        win_size=11  # Adjust as needed
                    )
                    num_frames += 1

    if num_samples == 0 or num_frames == 0:
        raise ValueError("val_loader yielded no samples to evaluate")

    avg_mse = mse_sum / num_samples
    avg_ssim = ssim_sum / num_frames
    print(f"Validation MSE: {avg_mse:.4f}, SSIM: {avg_ssim:.4f}")

In [None]:
# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Three-layer PredRNN on single-channel frames, trained with Adam on an MSE loss
model = PredRNN(input_dim=1, hidden_dim=32, num_layers=3).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Training
train_model(
    model,
    train_loader,
    optimizer,
    criterion,
    device,
    num_epochs=5,
    target_length=10  # Number of frames the model predicts per sample
)

# Evaluation
evaluate_model(
    model,
    val_loader,
    device,
    target_length=10  # Match the target_length used during training
)

## Transformer-based

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from skimage.metrics import structural_similarity as ssim

class PatchEmbedding(nn.Module):
    """Split an image into non-overlapping patches and embed each patch.

    A convolution whose kernel size and stride both equal ``patch_size``
    projects every patch to an ``embed_dim``-dimensional vector.

    Args:
        img_size: Side length used to derive ``num_patches``.
        patch_size: Side length of a patch.
        in_chans: Number of input channels.
        embed_dim: Size of each patch embedding.
    """

    def __init__(self, img_size=64, patch_size=8, in_chans=1, embed_dim=128):
        super().__init__()
        patches_per_side = img_size // patch_size

        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = patches_per_side ** 2
        self.embed_dim = embed_dim

        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        """Map ``[B, C, H, W]`` images to ``[B, num_patches, embed_dim]`` tokens."""
        # [B, embed_dim, H/patch, W/patch] -> [B, embed_dim, num_patches]
        # -> [B, num_patches, embed_dim]
        return self.proj(x).flatten(2).transpose(1, 2)

class PositionalEncoding(nn.Module):
    """Learnable positional embedding added to a sequence of patch tokens.

    Args:
        num_patches: Number of token positions.
        embed_dim: Size of each token embedding.
    """

    def __init__(self, num_patches, embed_dim):
        super().__init__()
        # One trainable vector per position, initialised to zero; the leading
        # dimension of size 1 broadcasts over the batch.
        initial = torch.zeros(1, num_patches, embed_dim)
        self.pos_embed = nn.Parameter(initial)

    def forward(self, x):
        """Return ``x`` with the positional embedding added."""
        return torch.add(x, self.pos_embed)

class TransformerModel(nn.Module):
    """Patch-based Transformer encoder that maps a frame sequence to frames.

    Every frame is split into patches, embedded, passed through a Transformer
    encoder that attends over the patches of that frame, and decoded back to
    pixels patch by patch. Frames are processed independently: the time axis
    is folded into the batch axis.

    Args:
        img_size: Height and width of the (square) input frames.
        patch_size: Side length of a square patch; must divide ``img_size``.
        in_chans: Number of image channels.
        embed_dim: Token embedding size.
        num_layers: Number of encoder layers.
        num_heads: Number of attention heads per layer.
        mlp_dim: Hidden size of the feed-forward sub-layer.
        dropout: Dropout probability inside the encoder layers.

    Raises:
        ValueError: If ``img_size`` is not a multiple of ``patch_size``.
    """

    def __init__(self, img_size=64, patch_size=8, in_chans=1, embed_dim=128, num_layers=6, num_heads=8, mlp_dim=256, dropout=0.1):
        super().__init__()
        if img_size % patch_size != 0:
            raise ValueError(
                f"img_size ({img_size}) must be divisible by patch_size ({patch_size})"
            )

        grid_size = img_size // patch_size
        num_patches = grid_size ** 2

        self.patch_embed = PatchEmbedding(img_size, patch_size, in_chans, embed_dim)
        self.pos_embed = PositionalEncoding(num_patches, embed_dim)

        # batch_first=True: tokens arrive as [batch, seq, embed]. With the
        # default (False) the encoder would treat the B*T axis as the sequence
        # and attend across frames and across unrelated samples of the batch.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=mlp_dim,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # One linear map shared by all patches: token -> flattened patch pixels.
        self.decoder = nn.Linear(embed_dim, patch_size * patch_size * in_chans)

        self.img_size = img_size
        self.patch_size = patch_size
        self.grid_size = grid_size
        self.num_patches = num_patches
        self.embed_dim = embed_dim
        self.in_chans = in_chans

    @staticmethod
    def _unpatchify(tokens, in_chans, patch_size, grid_size):
        """Reassemble per-patch pixel vectors into full images.

        Args:
            tokens: Tensor [N, grid_size * grid_size, in_chans * patch_size**2].
                Patches are in row-major grid order and each vector is laid
                out as (channel, patch_row, patch_col).
            in_chans: Number of image channels.
            patch_size: Side length of a patch.
            grid_size: Number of patches along each image side.

        Returns:
            Tensor [N, in_chans, grid_size * patch_size, grid_size * patch_size].
        """
        n = tokens.shape[0]
        side = grid_size * patch_size
        x = tokens.reshape(n, grid_size, grid_size, in_chans, patch_size, patch_size)
        # Interleave grid and in-patch axes so that the image row index is
        # grid_row * patch_size + patch_row (and likewise for columns).
        x = x.permute(0, 3, 1, 4, 2, 5)  # [N, C, grid_h, p, grid_w, p]
        return x.reshape(n, in_chans, side, side)

    def forward(self, x):
        """Run the model on a frame sequence.

        Args:
            x: Tensor [B, T, C, H, W] with ``H == W == img_size``.

        Returns:
            Tensor [B, T, C, H, W] with values in (0, 1) (sigmoid output).
        """
        B, T, C, H, W = x.size()
        # reshape (not view) so non-contiguous inputs are accepted as well.
        x = x.reshape(B * T, C, H, W)  # [B*T, C, H, W]
        x = self.patch_embed(x)        # [B*T, num_patches, embed_dim]
        x = self.pos_embed(x)

        x = self.transformer_encoder(x)  # [B*T, num_patches, embed_dim]
        x = self.decoder(x)              # [B*T, num_patches, C * p * p]

        # Put every decoded patch back at its own grid location.
        x = self._unpatchify(x, self.in_chans, self.patch_size, self.grid_size)

        x = x.reshape(B, T, C, H, W)  # [B, T, C, H, W]
        return torch.sigmoid(x)  # Ensure output is between 0 and 1

In [None]:
# Pick the compute device up front: GPU if available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the Transformer and place it on the device. With 64x64 frames and
# 32x32 patches each frame is represented by (64 // 32) ** 2 = 4 tokens.
model = TransformerModel(
    img_size=64,
    patch_size=32,
    in_chans=1,
    embed_dim=128,
    num_layers=6,
    num_heads=8,
    mlp_dim=256,
    dropout=0.1,
).to(device)

# Pixel-wise MSE objective, optimised with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Number of passes over the training set
num_epochs = 10

# Per-epoch history; one value is appended to each list at the end of an epoch
train_losses, val_losses = [], []
val_mse_scores, val_ssim_scores = [], []

for epoch in range(num_epochs):
    # ---------------- Training pass ----------------
    model.train()
    train_loss = 0.0
    for X_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
        # NOTE(review): the target frames are appended to the model input and
        # the loss is taken on the outputs at those same positions, so the
        # model is given the frames it is scored on - confirm this is intended.
        input_seq = torch.cat([X_batch, y_batch], dim=1).to(device)  # [B, T_input + T_target, C, H, W]
        y_batch = y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(input_seq)[:, X_batch.size(1):]  # keep only the target positions
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        # Weight the batch-mean loss by the batch size so the epoch value is a per-sample mean
        train_loss += X_batch.size(0) * loss.item()

    train_loss = train_loss / len(train_loader.dataset)
    train_losses.append(train_loss)

    # ---------------- Validation pass ----------------
    model.eval()
    val_loss = 0.0
    total_mse = 0.0
    total_ssim = 0.0
    num_samples = 0

    with torch.no_grad():
        for X_batch, y_batch in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
            batch_size = X_batch.size(0)
            input_seq = torch.cat([X_batch, y_batch], dim=1).to(device)
            y_batch = y_batch.to(device)
            outputs = model(input_seq)[:, X_batch.size(1):]
            loss = criterion(outputs, y_batch)
            val_loss += batch_size * loss.item()

            # Per-frame metrics are computed on CPU NumPy copies
            num_samples += batch_size
            outputs_np = outputs.cpu().numpy()
            y_batch_np = y_batch.cpu().numpy()

            for i in range(batch_size):
                # Channel 0 of every frame of sample i: predicted vs. ground truth
                pred_video = outputs_np[i, :, 0]
                true_video = y_batch_np[i, :, 0]

                mse_per_video = [
                    np.mean((pred_frame - true_frame) ** 2)
                    for pred_frame, true_frame in zip(pred_video, true_video)
                ]
                ssim_per_video = [
                    ssim(true_frame, pred_frame, data_range=1.0)
                    for pred_frame, true_frame in zip(pred_video, true_video)
                ]

                # Each sample contributes the mean over its frames
                total_mse += np.mean(mse_per_video)
                total_ssim += np.mean(ssim_per_video)

    val_loss = val_loss / len(val_loader.dataset)
    val_losses.append(val_loss)

    # Mean of the per-sample scores over the whole validation set
    avg_mse = total_mse / num_samples
    avg_ssim = total_ssim / num_samples
    val_mse_scores.append(avg_mse)
    val_ssim_scores.append(avg_ssim)

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.6f}, Validation Loss: {val_loss:.6f}, MSE: {avg_mse:.6f}, SSIM: {avg_ssim:.6f}")


# Video Generation

In [None]:
def _frames_to_uint8(frames):
    """Convert float frames on a [0, 1] scale to uint8 pixels on [0, 255]."""
    # Clip first so out-of-range values saturate instead of wrapping around in
    # the uint8 cast, then round so that e.g. 254.7 becomes 255 rather than 254.
    scaled = np.rint(np.clip(frames, 0.0, 1.0) * 255.0)
    return scaled.astype(np.uint8)


def generate_predictions(model, dataset, sample_index=0):
    """
    Generate predicted frames using the model for a sample from the dataset.

    The input and target sequences of the sample are concatenated along the
    time axis and passed to the model in one call; the model outputs at the
    target positions are returned as the predicted frames. The model is put
    into eval mode and is left in eval mode.

    Args:
        model: Trained model; must have at least one parameter (used to find
            the device) and accept a [1, T, C, H, W] tensor.
        dataset: The dataset to sample from (e.g., val_dataset). Indexing it
            must yield an ``(input_seq, target_seq)`` pair of tensors.
        sample_index: Index of the sample in the dataset to use.

    Returns:
        input_frames: uint8 numpy array of input frames, [input_length, H, W]
            for single-channel data, otherwise [input_length, C, H, W].
        pred_frames: uint8 numpy array of predicted frames, shaped like
            ``input_frames`` but with ``target_length`` frames.
    """
    model.eval()
    device = next(model.parameters()).device

    # Get a sample from the dataset and add the batch dimension
    input_seq, target_seq = dataset[sample_index]
    input_seq = input_seq.unsqueeze(0)
    target_seq = target_seq.unsqueeze(0)

    # Concatenate input and target sequences along the time axis
    input_and_target = torch.cat([input_seq, target_seq], dim=1).to(device)

    with torch.no_grad():
        output_seq = model(input_and_target)
        # Keep only the positions after the input frames, drop the batch dim
        pred_seq = output_seq[:, input_seq.size(1):].squeeze(0).cpu()

    # detach().cpu() keeps the conversion valid even if the dataset returns
    # tensors that live on another device or track gradients.
    input_frames = _frames_to_uint8(input_seq.squeeze(0).detach().cpu().numpy())
    pred_frames = _frames_to_uint8(pred_seq.numpy())

    # Remove channel dimension if channels == 1
    if input_frames.shape[1] == 1:
        input_frames = input_frames.squeeze(1)  # Shape: [input_length, H, W]
        pred_frames = pred_frames.squeeze(1)    # Shape: [target_length, H, W]

    return input_frames, pred_frames

In [None]:
# Run the model on validation sample 1 and get uint8 input/predicted frames
input_frames, pred_frames = generate_predictions(model, val_dataset, 1)

# Optional visual check of inputs next to predictions
visualize_combined_frames(input_frames, pred_frames)

# Stack inputs followed by predictions along the frame axis -> [total_frames, H, W]
all_frames = np.concatenate([input_frames, pred_frames], axis=0)

# Write the combined sequence to disk as a 10 fps video
save_frames_as_video(
    all_frames,
    save_path='/kaggle/working/transformer_generated_video.mp4',
    fps=10,
)

# User Interface

In [None]:
!pip install gradio

In [None]:
import gradio as gr
import cv2
import os

# Function to extract frames
def extract_frames(video_path, output_folder="/kaggle/working/temp_frames", frame_size=(64, 64)):
    """
    Extract all frames from a video, resize them to ``frame_size``, convert
    them to grayscale, and save them as JPEG files in the specified folder.

    Files are named ``frame_00000.jpg``, ``frame_00001.jpg``, ... by frame
    position in the video; existing files with the same names are overwritten.

    Parameters:
        video_path (str): Path to the video file.
        output_folder (str): Directory to save extracted frames (created if
            it does not exist).
        frame_size (tuple): Target frame size (width, height).

    Returns:
        list: List of file paths to the frames that were actually written.
            Empty if the video could not be opened or contained no frames.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
        print(f"Directory created: {output_folder}")
    else:
        print(f"Directory already exists: {output_folder}")

    extracted_frames = []
    frame_count = 0
    video_capture = cv2.VideoCapture(video_path)

    # try/finally guarantees the capture handle is released even if resizing,
    # colour conversion or writing raises part-way through the video.
    try:
        if not video_capture.isOpened():
            # Make the failure visible; read() below will simply yield nothing.
            print(f"Could not open video: {video_path}")

        while True:
            success, frame = video_capture.read()
            if not success:
                break

            # Resize and convert to grayscale
            frame = cv2.resize(frame, frame_size, interpolation=cv2.INTER_AREA)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Save frame as image
            frame_filename = f"frame_{frame_count:05d}.jpg"
            frame_filepath = os.path.join(output_folder, frame_filename)

            # imwrite reports failure through its return value; only list
            # frames that really exist on disk.
            if cv2.imwrite(frame_filepath, frame):
                extracted_frames.append(frame_filepath)
            else:
                print(f"Failed to write frame: {frame_filepath}")
            frame_count += 1
    finally:
        video_capture.release()

    print(f"Extracted {len(extracted_frames)} frames.")
    return extracted_frames


# Function to process video input
def process_video(uploaded_video, selected_sample):
    """
    Process the uploaded or selected video and extract frames.

    An uploaded video takes precedence over a selected sample. The upload is
    copied into /kaggle/working/temp_videos under its base file name before
    frames are extracted.

    Parameters:
        uploaded_video (str | os.PathLike | file-like | None): Upload from the
            Gradio file component. Either a filesystem path or an object whose
            ``name`` attribute is the path of the uploaded file.
        selected_sample (str | None): Path to a selected video from test dataset.

    Returns:
        list: List of paths to extracted frames (empty if no input was given).

    Raises:
        TypeError: If ``uploaded_video`` is neither a path nor an object with
            a ``name`` attribute.
    """
    if uploaded_video is not None:
        import shutil

        # The upload may arrive as a plain path or as a temp-file wrapper;
        # resolve both to the path of the file on disk.
        if isinstance(uploaded_video, (str, os.PathLike)):
            source_path = os.fspath(uploaded_video)
        else:
            source_path = getattr(uploaded_video, "name", None)
        if not source_path:
            raise TypeError(
                "uploaded_video must be a file path or an object with a 'name' attribute, "
                f"got {type(uploaded_video).__name__}"
            )

        # Save uploaded video to /kaggle/working/temp_videos
        temp_dir = "/kaggle/working/temp_videos"
        os.makedirs(temp_dir, exist_ok=True)
        # Join with the base name only: joining with an absolute upload path
        # would resolve to the upload itself, and opening that for writing
        # truncates the file before it can be read.
        video_path = os.path.join(temp_dir, os.path.basename(source_path))
        if os.path.abspath(source_path) != os.path.abspath(video_path):
            shutil.copyfile(source_path, video_path)
        print(f"Uploaded video saved at: {video_path}")
    elif selected_sample is not None:
        # Use selected sample video path
        video_path = selected_sample
        print(f"Selected sample video path: {video_path}")
    else:
        print("No video input provided.")
        return []

    # Extract frames
    extracted_frames = extract_frames(video_path, output_folder="/kaggle/working/temp_frames")
    return extracted_frames


# Video paths offered in the dropdown (example paths).
# NOTE(review): these are hard-coded; confirm the files exist in the mounted dataset.
video_paths = [
    "/kaggle/input/ucf101-action-recognition/test/JumpingJack/k_JumpingJack_g12_c04.avi",
    "/kaggle/input/ucf101-action-recognition/test/Walking/k_Walking_g12_c04.avi",
    "/kaggle/input/ucf101-action-recognition/test/Biking/k_Biking_g12_c04.avi",
]

# Gradio interface: inputs (upload / dropdown / button) in the left column,
# one result tab per model in the right column.
with gr.Blocks() as demo:
    gr.Markdown("# Video Processing Interface")
    gr.Markdown("Upload a video or select a video from the test folder to process and extract frames.")

    with gr.Row():
        with gr.Column():
            # Upload option, restricted to common video extensions
            video_input = gr.File(label="Upload Video", file_types=[".mp4", ".avi", ".mkv"])
            # Dropdown selection of one of the sample paths above
            sample_input = gr.Dropdown(
                choices=video_paths,
                label="Or Select a Video from Test Folder",
                interactive=True
            )
            run_button = gr.Button("Run Models")
        with gr.Column():
            with gr.Tab("Model 1"):
                gr.Markdown("### Model 1 Results")
                frames_output1 = gr.Gallery(label="Extracted Frames", columns=5, height="auto")  # Currently shows the extracted input frames (for testing)
                video_output1 = gr.Video(label="Final Video")  # Placeholder: handle_run returns None for it
                time_output1 = gr.Textbox(label="Inference Time")  # Placeholder: handle_run returns a fixed string
            with gr.Tab("Model 2"):
                gr.Markdown("### Model 2 Results")
                frames_output2 = gr.Gallery(label="Generated Frames", columns=2, height="auto")
                video_output2 = gr.Video(label="Final Video")
                time_output2 = gr.Textbox(label="Inference Time")
            with gr.Tab("Model 3"):
                gr.Markdown("### Model 3 Results")
                frames_output3 = gr.Gallery(label="Generated Frames", columns=2, height="auto")
                video_output3 = gr.Video(label="Final Video")
                time_output3 = gr.Textbox(label="Inference Time")

    # Handle button click
    def handle_run(uploaded_video, selected_sample):
        """Extract frames from the chosen video and fill the nine output widgets.

        Only the first gallery receives real data (the extracted frame paths);
        no model is run here. The remaining values are placeholders: a fixed
        "0.0 seconds" string for Model 1 and empty/None values for Models 2-3.
        The tuple order must match the ``outputs`` list of ``run_button.click``.
        """
        frames = process_video(uploaded_video, selected_sample)
        return frames, None, "0.0 seconds", [], None, None, [], None, None

    # Wire the button: two inputs in, three outputs (gallery, video, time) per model tab
    run_button.click(
        handle_run,
        inputs=[video_input, sample_input],
        outputs=[
            frames_output1, video_output1, time_output1,
            frames_output2, video_output2, time_output2,
            frames_output3, video_output3, time_output3
        ]
    )

    gr.Markdown("© 2024 Your Application")

# Launch the application
demo.launch()