In [34]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.metrics import mean_squared_error
from torchvision import transforms
import cv2
import torch
import os
from torchvision import transforms
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Define the ConvLSTM model using Keras (TensorFlow)
from tensorflow.keras import layers, models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def build_conv_lstm(input_shape=(10, 64, 64, 1)):
    model = models.Sequential()
    model.add(layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), activation='relu', 
                                input_shape=input_shape, return_sequences=True, padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), activation='relu', return_sequences=True, padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.Conv3D(filters=1, kernel_size=(3, 3, 3), activation='sigmoid', padding='same'))
    model.compile(optimizer='adam', loss='mse')
    return model

# PyTorch Models: PredRNN and Transformer
class PredRNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super(PredRNN, self).__init__()
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim
        self.input_dim = input_dim
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

class TransformerFramePredictor(nn.Module):
    def __init__(self, input_dim, num_heads, num_layers, hidden_dim, max_seq_length):
        super(TransformerFramePredictor, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.positional_encoding = nn.Parameter(torch.zeros(max_seq_length, hidden_dim))
        self.transformer = nn.Transformer(
            d_model=hidden_dim, nhead=num_heads, num_encoder_layers=num_layers, num_decoder_layers=num_layers
        )
        self.fc = nn.Linear(hidden_dim, input_dim)

    def forward(self, src, tgt):
        src = self.embedding(src) + self.positional_encoding[:src.size(0), :].to(src.device)
        tgt = self.embedding(tgt) + self.positional_encoding[:tgt.size(0), :].to(tgt.device)
        output = self.transformer(src, tgt)
        output = self.fc(output)
        return output

# Load the trained ConvLSTM model
conv_lstm_model = build_conv_lstm((10, 64, 64, 1))
conv_lstm_model.load_weights('conv_lstm_model.h5')  # Update path to the saved model

# Load PredRNN and Transformer models
predrnn_model = PredRNN(input_dim=64*64, hidden_dim=512, output_dim=64*64, num_layers=2).to(device)
predrnn_model.load_state_dict(torch.load('predrnn_model_state.pth', map_location=torch.device('cpu')))

# model = TransformerFramePredictor(
#     input_dim=64 * 64,  # Flattened input size
#     num_heads=8,
#     num_layers=4,
#     hidden_dim=512,
#     max_seq_length=input_frames + target_frames  # Max sequence length
# ).to(device)
# Load the checkpoint and adjust positional encoding
checkpoint = torch.load('transformer_model_state.pth', map_location=torch.device('cpu'))
if 'positional_encoding' in checkpoint:
    checkpoint['positional_encoding'] = checkpoint['positional_encoding'].squeeze(1)  # Remove the batch dimension

# Load the state dictionary into the model
transformer_model = TransformerFramePredictor(
    input_dim=64 * 64, num_heads=8, num_layers=4, hidden_dim=512, max_seq_length=20
).to(device)
transformer_model.load_state_dict(checkpoint)

# transformer_model = TransformerFramePredictor(
#     input_dim=64 * 64, num_heads=8, num_layers=4, hidden_dim=512, max_seq_length=20
# ).to(device)
# transformer_model.load_state_dict(torch.load('transformer_model_state.pth', map_location=torch.device('cpu')))  # Update path

# Helper functions for testing
def load_frames(directory, transform, num_frames):
    frames = sorted(os.listdir(directory))[:num_frames]
    images = []
    for frame in frames:
        img = Image.open(os.path.join(directory, frame))
        if transform:
            img = transform(img)
        images.append(img)
    return torch.stack(images)

def load_frames_from_video(video_path, transform, num_frames):
    """
    Load frames directly from a video file.
    Args:
        video_path (str): Path to the video file.
        transform: Transformation pipeline for the frames.
        num_frames (int): Number of frames to load.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = 0

    while cap.isOpened() and frame_count < num_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
        img = Image.fromarray(frame)
        if transform:
            img = transform(img)
        frames.append(img)
        frame_count += 1

    cap.release()
    if len(frames) < num_frames:
        raise ValueError(f"Video {video_path} has fewer than {num_frames} frames.")
    return torch.stack(frames)

# Generate video from predicted frames
def generate_video(frames, output_path, fps=10):
    height, width = frames[0].shape
    video = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height), isColor=False)
    for frame in frames:
        frame = (frame * 255).astype(np.uint8)  # Convert normalized frame to uint8
        video.write(cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR))  # Write frame in grayscale
    video.release()
    print(f"Video saved at {output_path}")


# Modified testing function for CSV-based paths
def evaluate_model_from_csv(model, model_name, clip_name, output_dir, num_input_frames=10, num_predicted_frames=5):
    # Get input and target paths
    input_path, label = get_clip_paths(clip_name)

    transform = transforms.Compose([
        transforms.Grayscale(),  # Convert to grayscale
        transforms.Resize((64, 64)),
        transforms.ToTensor()
    ])
    
    if input_path.endswith('.avi'):
        # Load frames directly from the video file
        input_frames = load_frames_from_video(input_path, transform, num_input_frames).to(device)
        target_frames = load_frames_from_video(input_path, transform, num_predicted_frames)
    else:
        # Load frames from a directory
        input_frames = load_frames(input_path, transform, num_input_frames).to(device)
        target_frames = load_frames(input_path, transform, num_predicted_frames)


    predictions = []
    sequence = input_frames.unsqueeze(0)  # Add batch dimension
    tgt = sequence[:, -1:]  # Initialize tgt with the last frame of the input sequence
    with torch.no_grad():
        for _ in range(num_predicted_frames):
            if model_name == 'PredRNN':
                # Flatten input for LSTM
                pred = model(sequence.view(sequence.size(0), sequence.size(1), -1))
                pred = pred.view(-1, 1, 64, 64)  # Reshape: [batch_size, channels, height, width]
            elif model_name == 'ConvLSTM':
                pred = conv_lstm_model.predict(sequence.cpu().numpy())
                pred = torch.tensor(pred[0, -1]).to(device).unsqueeze(0)  # Add batch dimension
            else:  # Transformer
                # Ensure src and tgt batch sizes match
                pred = model(
                    sequence.view(sequence.size(0), sequence.size(1), -1),  # Flatten src
                    tgt.view(sequence.size(0), tgt.size(1), -1)  # Flatten tgt
                )
                pred = pred.view(-1, 1, 64, 64)  # Reshape: [batch_size, channels, height, width]
                tgt = pred.unsqueeze(1)  # Update tgt with the latest prediction

            # Ensure pred has the correct shape
            pred = pred.unsqueeze(1)  # Add sequence dimension: [batch_size, 1, channels, height, width]

            predictions.append(pred.cpu().numpy().squeeze(1).squeeze(1))  # Convert to numpy, remove extra dims
            sequence = torch.cat((sequence[:, 1:], pred), dim=1)  # Update input sequence

    # Visualization and MSE calculation
    # Visualization and MSE calculation
    mse_scores = []
    for i, (pred, target) in enumerate(zip(predictions, target_frames)):
        # Reshape prediction to 2D
        if len(pred.shape) == 4:  # Shape [batch_size, channels, height, width]
            pred_2d = pred[0, 0]  # Take the first batch and first channel
        elif len(pred.shape) == 3:  # Shape [channels, height, width]
            pred_2d = pred[0]  # Take the first channel
        elif len(pred.shape) == 2:  # Shape [height, width]
            pred_2d = pred  # Already in the correct shape
        else:
            raise ValueError(f"Unexpected pred shape: {pred.shape}")

        # Target frame
        target_2d = target[0].cpu().numpy()  # Convert target to numpy

        # Calculate MSE
        mse = mean_squared_error(pred_2d.flatten(), target_2d.flatten())
        mse_scores.append(mse)

        # Save visualization
        os.makedirs(output_dir, exist_ok=True)
        plt.figure(figsize=(10, 5))
        plt.subplot(1, 2, 1)
        plt.imshow(pred_2d, cmap='gray')  # Prediction
        plt.title(f"{model_name} Prediction")
        plt.subplot(1, 2, 2)
        plt.imshow(target_2d, cmap='gray')  # Ground truth
        plt.title("Ground Truth")
        plt.savefig(f"{output_dir}/{clip_name}_{model_name}_frame_{i}.png")
        plt.close()



import pandas as pd
import os

# Load the test CSV
test_csv_path = r"C:\Users\Ali Arfa\Downloads\deep_learning_project\test.csv"
test_base_folder = r"C:/Users/Ali Arfa/Downloads/deep_learning_project"  # e.g., 'test'

test_data = pd.read_csv(test_csv_path)

# Example: Selecting a specific clip
def get_clip_paths(clip_name):
    clip_row = test_data[test_data['clip_name'] == clip_name]
    if clip_row.empty:
        raise ValueError(f"Clip {clip_name} not found in test CSV.")

    clip_path = clip_row['clip_path'].values[0]  # Path to frames or video
    label = clip_row['label'].values[0]  # Action class

    test_base_folder = r"C:/Users/Ali Arfa/Downloads/deep_learning_project"  # Base folder
    # Relative path

    # Ensure clip_path does not start with a slash
    full_clip_path = os.path.normpath(os.path.join(test_base_folder, clip_path.lstrip('/\\')))

    print(f"Full Path: {full_clip_path}")
    # full_clip_path = os.path.normpath(os.path.join(test_base_folder, clip_path.lstrip('/\\')))
    print(f"Resolved full path: {full_clip_path}")  # Debugging print

    if not os.path.exists(full_clip_path):
        raise FileNotFoundError(f"Clip frames or video not found at {full_clip_path}.")

    return full_clip_path, label



clip_name = "v_Biking_g06_c01"  # Replace with an actual clip name from the CSV
full_path, label = get_clip_paths(clip_name)
print(f"Full Path: {full_path}")
print(f"Label: {label}")

# Paths for evaluation
# input_dir = r'C:\Users\Ali Arfa\Downloads\deep_learning_project\test'
# target_dir = 'path_to_target'
# Example clip to test
clip_name = "v_Biking_g06_c01"  # Replace with a valid clip name from the CSV
output_dir = r'C:\Users\Ali Arfa\Downloads\deep_learning_project\results'

# Test each model
evaluate_model_from_csv(predrnn_model, 'PredRNN', clip_name, output_dir)
evaluate_model_from_csv(transformer_model, 'Transformer', clip_name, output_dir)
evaluate_model_from_csv(conv_lstm_model, 'ConvLSTM', clip_name, output_dir)

# # Test each model
# evaluate_model(predrnn_model, 'PredRNN', input_dir, target_dir, output_dir)
# evaluate_model(transformer_model, 'Transformer', input_dir, target_dir, output_dir)
# evaluate_model(conv_lstm_model, 'ConvLSTM', input_dir, target_dir, output_dir)


  super().__init__(**kwargs)


Full Path: C:\Users\Ali Arfa\Downloads\deep_learning_project\test\Biking\v_Biking_g06_c01.avi
Resolved full path: C:\Users\Ali Arfa\Downloads\deep_learning_project\test\Biking\v_Biking_g06_c01.avi
Full Path: C:\Users\Ali Arfa\Downloads\deep_learning_project\test\Biking\v_Biking_g06_c01.avi
Label: Biking
Full Path: C:\Users\Ali Arfa\Downloads\deep_learning_project\test\Biking\v_Biking_g06_c01.avi
Resolved full path: C:\Users\Ali Arfa\Downloads\deep_learning_project\test\Biking\v_Biking_g06_c01.avi
Full Path: C:\Users\Ali Arfa\Downloads\deep_learning_project\test\Biking\v_Biking_g06_c01.avi
Resolved full path: C:\Users\Ali Arfa\Downloads\deep_learning_project\test\Biking\v_Biking_g06_c01.avi


RuntimeError: the batch number of src and tgt must be equal

In [None]:
# import torch
# import os
# import cv2
# import matplotlib.pyplot as plt
# from torchvision import transforms
# from sklearn.metrics import mean_squared_error
# from PIL import Image
# import numpy as np

# # Helper function to load frames from a directory
# def load_frames(directory, transform, num_frames):
#     frames = sorted(os.listdir(directory))[:num_frames]
#     images = []
#     for frame in frames:
#         frame_path = os.path.join(directory, frame)
#         img = Image.open(frame_path)
#         if transform:
#             img = transform(img)
#         images.append(img)
#     return torch.stack(images)

# # Helper function to generate a video from frames
# def generate_video(frames, output_path, fps=10):
#     height, width = frames[0].shape
#     video = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height), isColor=False)
#     for frame in frames:
#         frame = (frame * 255).astype(np.uint8)
#         video.write(cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR))
#     video.release()
#     print(f"Video saved at {output_path}")

# # Test and evaluate a model
# def test_and_evaluate(model, model_name, input_dir, target_dir, output_dir, num_input_frames=10, num_predicted_frames=5):
#     model.eval()
#     os.makedirs(output_dir, exist_ok=True)

#     # Load input and target frames
#     input_frames = load_frames(input_dir, transform, num_input_frames).to(device)
#     target_frames = load_frames(target_dir, transform, num_predicted_frames)

#     # Predict frames
#     predictions = []
#     current_sequence = input_frames
#     with torch.no_grad():
#         for _ in range(num_predicted_frames):
#             prediction = model(current_sequence.unsqueeze(0))  # Predict next frame
#             predictions.append(prediction.cpu().view(64, 64).numpy())  # Reshape prediction
#             current_sequence = torch.cat((current_sequence[1:], prediction), dim=0)

#     # Evaluate using MSE
#     mse_scores = []
#     for i in range(num_predicted_frames):
#         mse = mean_squared_error(predictions[i].flatten(), target_frames[i][0].cpu().numpy().flatten())
#         mse_scores.append(mse)

#     # Visualize predictions vs ground truth
#     for i, (pred_frame, actual_frame) in enumerate(zip(predictions, target_frames)):
#         fig, axs = plt.subplots(1, 2, figsize=(10, 5))
#         axs[0].imshow(pred_frame, cmap="gray")
#         axs[0].set_title(f"{model_name} Predicted Frame {i + 1}")
#         axs[0].axis("off")
#         axs[1].imshow(actual_frame[0].cpu().numpy(), cmap="gray")
#         axs[1].set_title(f"Ground Truth Frame {i + 1}")
#         axs[1].axis("off")

#         save_path = os.path.join(output_dir, f"{model_name}_frame_{i + 1}.png")
#         plt.savefig(save_path)
#         plt.close(fig)
#         print(f"Saved visualization: {save_path}")

#     # Generate video from predicted frames
#     generate_video(predictions, os.path.join(output_dir, f"{model_name}_predicted.mp4"))

#     # Return evaluation results
#     avg_mse = np.mean(mse_scores)
#     print(f"{model_name} Average MSE: {avg_mse:.4f}")
#     return avg_mse

# # Load models (define your model loading logic here)
# def load_model(model_class, state_path, **model_kwargs):
#     model = model_class(**model_kwargs).to(device)
#     model.load_state_dict(torch.load(state_path))
#     model.eval()
#     print(f"{model_class.__name__} model loaded successfully.")
#     return model

# # Configuration
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# transform = transforms.Compose([
#     transforms.Grayscale(),
#     transforms.Resize((64, 64)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.5], std=[0.5])
# ])
# input_dir = "path_to_input_frames"
# target_dir = "path_to_target_frames"
# output_base_dir = "model_comparison_results"

# # Define models and paths
# models = [
#     {
#         "name": "PredRNN",
#         "class": PredRNN,
#         "state_path": "predrnn_model_state.pth",
#         "kwargs": {"input_dim": 64 * 64, "hidden_dim": 512, "output_dim": 64 * 64, "num_layers": 2}
#     },
#     {
#         "name": "ConvLSTM",
#         "class": ConvLSTM,  # Assume this is defined elsewhere
#         "state_path": "convlstm_model_state.pth",
#         "kwargs": {"input_dim": 64 * 64, "hidden_dim": 512, "output_dim": 64 * 64, "num_layers": 2}
#     },
#     {
#         "name": "Transformer",
#         "class": TransformerFramePredictor,
#         "state_path": "transformer_model_state.pth",
#         "kwargs": {"input_dim": 64 * 64, "num_heads": 8, "num_layers": 4, "hidden_dim": 512, "max_seq_length": 15}
#     }
# ]

# # Evaluate all models
# results = {}
# for model_config in models:
#     model_name = model_config["name"]
#     output_dir = os.path.join(output_base_dir, model_name)
#     model = load_model(model_config["class"], model_config["state_path"], **model_config["kwargs"])

#     avg_mse = test_and_evaluate(
#         model=model,
#         model_name=model_name,
#         input_dir=input_dir,
#         target_dir=target_dir,
#         output_dir=output_dir,
#         num_input_frames=10,
#         num_predicted_frames=5
#     )
#     results[model_name] = avg_mse

# # Display final results
# print("\nFinal Evaluation Results:")
# for model_name, mse in results.items():
#     print(f"{model_name}: Average MSE = {mse:.4f}")


In [None]:
# import torch
# import os
# import matplotlib.pyplot as plt
# from torchvision import transforms
# from PIL import Image
# import numpy as np
# from sklearn.metrics import mean_squared_error

# # Define a function to load frames from a directory
# def load_frames(directory, transform, num_frames):
#     frames = sorted(os.listdir(directory))[:num_frames]
#     images = []
#     for frame in frames:
#         frame_path = os.path.join(directory, frame)
#         img = Image.open(frame_path)
#         if transform:
#             img = transform(img)
#         images.append(img)
#     return torch.stack(images)

# # Test and evaluate a model
# def test_and_evaluate(model, model_name, input_dir, target_dir, output_dir, num_input_frames=10, num_predicted_frames=5):
#     model.eval()
#     os.makedirs(output_dir, exist_ok=True)

#     # Load input and target frames
#     input_frames = load_frames(input_dir, transform, num_input_frames).to(device)
#     target_frames = load_frames(target_dir, transform, num_predicted_frames)

#     # Predict frames
#     predictions = []
#     current_sequence = input_frames
#     with torch.no_grad():
#         for _ in range(num_predicted_frames):
#             prediction = model(current_sequence.unsqueeze(0))  # Predict next frame
#             predictions.append(prediction.cpu().view(64, 64).numpy())  # Reshape prediction
#             current_sequence = torch.cat((current_sequence[1:], prediction), dim=0)

#     # Evaluate using MSE
#     mse_scores = []
#     for i in range(num_predicted_frames):
#         mse = mean_squared_error(predictions[i].flatten(), target_frames[i][0].cpu().numpy().flatten())
#         mse_scores.append(mse)

#     # Visualize predictions vs ground truth
#     for i, (pred_frame, actual_frame) in enumerate(zip(predictions, target_frames)):
#         fig, axs = plt.subplots(1, 2, figsize=(10, 5))
#         axs[0].imshow(pred_frame, cmap="gray")
#         axs[0].set_title(f"{model_name} Predicted Frame {i + 1}")
#         axs[0].axis("off")
#         axs[1].imshow(actual_frame[0].cpu().numpy(), cmap="gray")
#         axs[1].set_title(f"Ground Truth Frame {i + 1}")
#         axs[1].axis("off")

#         save_path = os.path.join(output_dir, f"{model_name}_frame_{i + 1}.png")
#         plt.savefig(save_path)
#         plt.close(fig)
#         print(f"Saved visualization: {save_path}")

#     # Return evaluation results
#     avg_mse = np.mean(mse_scores)
#     print(f"{model_name} Average MSE: {avg_mse:.4f}")
#     return avg_mse

# # Load models
# def load_model(model_class, state_path, **model_kwargs):
#     model = model_class(**model_kwargs).to(device)
#     model.load_state_dict(torch.load(state_path))
#     model.eval()
#     print(f"{model_class.__name__} model loaded successfully.")
#     return model

# # Configuration
# transform = transforms.Compose([
#     transforms.Grayscale(),
#     transforms.Resize((64, 64)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.5], std=[0.5])
# ])
# input_dir = "path_to_input_frames"
# target_dir = "path_to_target_frames"
# output_base_dir = "model_comparison_results"

# # Define models and paths
# models = [
#     {
#         "name": "PredRNN",
#         "class": PredRNN,
#         "state_path": "predrnn_model_state.pth",
#         "kwargs": {"input_dim": 64 * 64, "hidden_dim": 512, "output_dim": 64 * 64, "num_layers": 2}
#     },
#     {
#         "name": "ConvLSTM",
#         "class": ConvLSTM,  # Assume this is defined elsewhere
#         "state_path": "convlstm_model_state.pth",
#         "kwargs": {"input_dim": 64 * 64, "hidden_dim": 512, "output_dim": 64 * 64, "num_layers": 2}
#     },
#     {
#         "name": "Transformer",
#         "class": TransformerFramePredictor,
#         "state_path": "transformer_model_state.pth",
#         "kwargs": {"input_dim": 64 * 64, "num_heads": 8, "num_layers": 4, "hidden_dim": 512, "max_seq_length": 15}
#     }
# ]

# # Evaluate all models
# results = {}
# for model_config in models:
#     model_name = model_config["name"]
#     output_dir = os.path.join(output_base_dir, model_name)
#     model = load_model(model_config["class"], model_config["state_path"], **model_config["kwargs"])

#     avg_mse = test_and_evaluate(
#         model=model,
#         model_name=model_name,
#         input_dir=input_dir,
#         target_dir=target_dir,
#         output_dir=output_dir,
#         num_input_frames=10,
#         num_predicted_frames=5
#     )
#     results[model_name] = avg_mse

# # Display final results
# print("\nFinal Evaluation Results:")
# for model_name, mse in results.items():
#     print(f"{model_name}: Average MSE = {mse:.4f}")
