# Multimodal Approach

The following notebook will attempt to use CNN feature extraction and LSTM for temporal prediction 

In [4]:
# Process imports
import os
import torch
from PIL import Image
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms

## CNN Component

In [17]:

class CNNFeatureExtractor:
    """
    A class to extract features from video frames using a pretrained CNN (ResNet).
    
    Attributes:
    -----------
    model : nn.Module
        The pretrained CNN model (ResNet).
    transform : torchvision.transforms.Compose
        The transformations applied to input frames (resizing, normalization).
    
    Methods:
    --------
    extract_features(frame_batch: torch.Tensor) -> torch.Tensor:
        Extracts features from a batch of frames.
    """
    
    #def __init__(self, device='cuda' if torch.cuda.is_available() else 'cpu'):
    def __init__(self, device='cpu'):
        """
        Initialize the CNN feature extractor with a pretrained ResNet model.
        
        Parameters:
        -----------
        device : str
            The device on which to run the model ('cuda' or 'cpu').
        """
        self.device = device
        self.model = models.resnet50(pretrained=True)
        # Remove the classification head (fc layer)
        self.model = nn.Sequential(*list(self.model.children())[:-2])
        self.model = self.model.to(self.device)
        self.model.eval()  # Set to evaluation mode
        
        # Define the necessary image transformations
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),  # Resize frame to 224x224 (ResNet input size)
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
    
    def extract_features(self, frame_batch):
        """
        Extract features from a batch of frames.
        
        Parameters:
        -----------
        frame_batch : torch.Tensor
            A batch of video frames (B, C, H, W).
        
        Returns:
        --------
        features : torch.Tensor
            Extracted CNN features for each frame in the batch.
        """
        with torch.no_grad():  # Disable gradient calculation
            frame_batch = frame_batch.to(self.device)
            features = self.model(frame_batch)
        return features

    def process_frame(self, frame_path):
        """
        Process a single frame from an image file.
        
        Parameters:
        -----------
        frame_path : str
            Path to the image file (frame).
        
        Returns:
        --------
        processed_frame : torch.Tensor
            Processed frame ready for feature extraction.
        """
        try:
            # Load image and apply transformations
            frame = Image.open(frame_path).convert("RGB")
            processed_frame = self.transform(frame)
            return processed_frame
        except Exception as e:
            raise RuntimeError(f"Error processing frame {frame_path}: {str(e)}")

    def process_batch(self, frame_paths):
        """
        Process a batch of frames from a list of image paths.
        
        Parameters:
        -----------
        frame_paths : list of str
            List of file paths to the frames.
        
        Returns:
        --------
        frame_batch : torch.Tensor
            A batch of processed frames ready for feature extraction.
        """
        try:
            processed_frames = [self.process_frame(fp) for fp in frame_paths]
            frame_batch = torch.stack(processed_frames)
            return frame_batch
        except Exception as e:
            raise RuntimeError(f"Error processing frame batch: {str(e)}")

    def save_features(self, features, output_dir, video_id):
        """
        Save extracted features to a file.
        
        Parameters:
        -----------
        features : torch.Tensor
            Extracted features from the CNN.
        output_dir : str
            Directory to save the features.
        video_id : str
            Identifier for the video (used in the output filename).
        """
        try:
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, f"{video_id}_features.pt")
            torch.save(features.cpu(), output_path)
            print(f"Features saved to {output_path}")
        except Exception as e:
            raise RuntimeError(f"Error saving features: {str(e)}")


# Example usage
def extract_video_features(video_frame_dir, output_dir, extractor):
    """
    Extract features for all frames in a video directory and save to output directory.
    
    Parameters:
    -----------
    video_frame_dir : str
        Path to the directory containing video frames.
    output_dir : str
        Path to the directory where features will be saved.
    extractor : CNNFeatureExtractor
        The CNN feature extractor instance.
    """
    try:
        # List all frame files in the directory
        frame_files = sorted([os.path.join(video_frame_dir, f) for f in os.listdir(video_frame_dir)
                              if f.endswith(('.jpg'))])

        # Process frames in batches (if needed for larger videos)
        batch_size = 16
        for i in range(0, len(frame_files), batch_size):
            batch_files = frame_files[i:i + batch_size]
            frame_batch = extractor.process_batch(batch_files)
            features = extractor.extract_features(frame_batch)
            extractor.save_features(features, output_dir, video_frame_dir.split('/')[-1])
            
        print(f"Feature extraction completed for video {video_frame_dir}")
    
    except Exception as e:
        raise RuntimeError(f"Error extracting features for video {video_frame_dir}: {str(e)}")


# Initialize the feature extractor
extractor = CNNFeatureExtractor()

# Example: Process video frames from a directory and save extracted features
video_frame_directory = 'extracted_frames/video_0'
#video_frame_directory = 'extracted_frames/video_9'
output_directory = 'data'
extract_video_features(video_frame_directory, output_directory, extractor)


Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features saved to data/video_9_features.pt
Features sa

## LSTM Component

In [8]:
# Import packages

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt


In [28]:
# Video Dataset

class VideoDataset(Dataset):
    """
    PyTorch Dataset for loading video features and corresponding pitch and yaw labels.
    
    Attributes:
    -----------
    feature_dir : str
        Directory containing CNN-extracted feature .pt files.
    label_dir : str
        Directory containing pitch and yaw label .txt files.
    sequence_length : int
        Number of consecutive frames to include in each sequence.
    transform : callable, optional
        Optional transform to be applied on a sample.
    
    Methods:
    --------
    __len__():
        Returns the total number of sequences across all videos.
    __getitem__(idx):
        Retrieves a sequence of features and corresponding labels.
    """
    
    def __init__(self, feature_dir, label_dir, sequence_length=30, transform=None):
        """
        Initializes the VideoDataset.
        
        Parameters:
        -----------
        feature_dir : str
            Path to the directory with CNN-extracted feature .pt files.
        label_dir : str
            Path to the directory with label .txt files.
        sequence_length : int
            Length of each sequence for LSTM input.
        transform : callable, optional
            Optional transform to be applied on a sample.
        """
        self.feature_dir = feature_dir
        self.label_dir = label_dir
        self.sequence_length = sequence_length
        self.transform = transform
        
        # List all feature and label files
        self.video_ids = sorted([f.replace('_features.pt', '').split('_')[1] for f in os.listdir(feature_dir) if f.endswith('_features.pt')])
        
        # Verify corresponding label files exist
        for vid in self.video_ids:
            label_path = os.path.join(label_dir, f"{vid}.txt") # f"{vid}_labels.txt"
            if vid == '5':
                break

            if not os.path.isfile(label_path):
                raise FileNotFoundError(f"Label file for video '{vid}' not found at '{label_path}'")
    
    def __len__(self):
        """
        Returns the total number of sequences across all videos.
        """
        total_sequences = 0
        for vid in self.video_ids:
            features = torch.load(os.path.join(self.feature_dir, f"{vid}_features.pt"))  # Shape: (C, H, W, F)
            num_frames = features.shape[-1]
            if num_frames >= self.sequence_length:
                total_sequences += num_frames - self.sequence_length + 1
        return total_sequences
    
    def __getitem__(self, idx):
        """
        Retrieves a sequence of features and corresponding labels based on the index.
        
        Parameters:
        -----------
        idx : int
            Index of the sequence to retrieve.
        
        Returns:
        --------
        sequence_features : torch.Tensor
            Tensor of shape (sequence_length, feature_dim).
        sequence_labels : torch.Tensor
            Tensor of shape (sequence_length, 2) containing pitch and yaw.
        """
        # Iterate through videos to find which video the idx falls into
        cumulative = 0
        for vid in self.video_ids:
            features = torch.load(os.path.join(self.feature_dir, f"{vid}_features.pt"))  # Shape: (C, H, W, F)
            num_frames = features.shape[-1]
            if num_frames < self.sequence_length:
                continue
            if idx < cumulative + (num_frames - self.sequence_length + 1):
                sequence_idx = idx - cumulative
                # Extract feature sequence
                feature_sequence = features[:, :, :, sequence_idx : sequence_idx + self.sequence_length]  # Shape: (C, H, W, S)
                # Flatten the spatial dimensions (C, H, W) into a single feature vector per frame
                feature_sequence = feature_sequence.permute(3, 0, 1, 2)  # Shape: (S, C, H, W)
                feature_sequence = feature_sequence.view(self.sequence_length, -1)  # Shape: (S, C*H*W)
                
                # Load corresponding labels
                label_path = os.path.join(self.label_dir, f"{vid}.txt") #f"{vid}_labels.txt")
                labels = np.loadtxt(label_path)  # Shape: (F, 2)
                label_sequence = labels[sequence_idx : sequence_idx + self.sequence_length, :]  # Shape: (S, 2)
                label_sequence = torch.from_numpy(label_sequence).float()
                
                if self.transform:
                    feature_sequence = self.transform(feature_sequence)
                
                return feature_sequence, label_sequence
            cumulative += (num_frames - self.sequence_length + 1)
        
        raise IndexError(f"Index {idx} out of range for dataset with length {len(self)}")



### LSTM Regressor

In [18]:
# LSTM

class LSTMRegressor(nn.Module):
    """
    LSTM-based Regressor for predicting pitch and yaw angles from CNN-extracted features.
    
    Attributes:
    -----------
    input_size : int
        Size of the input feature vector per frame.
    hidden_size : int
        Number of features in the hidden state of the LSTM.
    num_layers : int
        Number of recurrent layers in the LSTM.
    dropout : float
        Dropout probability between LSTM layers.
    bidirectional : bool
        If True, becomes a bidirectional LSTM.
    
    Methods:
    --------
    forward(x):
        Defines the forward pass of the model.
    """
    
    def __init__(self, input_size, hidden_size=128, num_layers=2, dropout=0.2, bidirectional=False):
        """
        Initializes the LSTMRegressor.
        
        Parameters:
        -----------
        input_size : int
            Number of input features per frame.
        hidden_size : int, optional
            Number of features in the hidden state (default=128).
        num_layers : int, optional
            Number of recurrent layers (default=2).
        dropout : float, optional
            Dropout probability between LSTM layers (default=0.2).
        bidirectional : bool, optional
            If True, use a bidirectional LSTM (default=False).
        """
        super(LSTMRegressor, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        
        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True,
                            dropout=dropout,
                            bidirectional=bidirectional)
        
        direction = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_size * direction, 2)  # Predict pitch and yaw
    
    def forward(self, x):
        """
        Forward pass of the LSTMRegressor.
        
        Parameters:
        -----------
        x : torch.Tensor
            Input tensor of shape (batch_size, sequence_length, input_size).
        
        Returns:
        --------
        out : torch.Tensor
            Output tensor of shape (batch_size, 2) containing predicted pitch and yaw.
        """
        # Initialize hidden and cell states with zeros
        h0 = torch.zeros(self.num_layers * (2 if self.bidirectional else 1),
                        x.size(0),
                        self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers * (2 if self.bidirectional else 1),
                        x.size(0),
                        self.hidden_size).to(x.device)
        
        # LSTM forward pass
        out, _ = self.lstm(x, (h0, c0))  # out: (batch_size, seq_length, hidden_size * num_directions)
        
        # Take the output from the last time step
        out = out[:, -1, :]  # (batch_size, hidden_size * num_directions)
        
        # Fully connected layer
        out = self.fc(out)  # (batch_size, 2)
        return out


### Model Training 

In [19]:
# lstm_pipeline.py (continued)

def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=25, device='cpu'): # device='cuda'
    """
    Trains the LSTM model.
    
    Parameters:
    -----------
    model : nn.Module
        The LSTMRegressor model.
    train_loader : DataLoader
        DataLoader for training data.
    val_loader : DataLoader
        DataLoader for validation data.
    criterion : nn.Module
        Loss function.
    optimizer : torch.optim.Optimizer
        Optimizer for model parameters.
    num_epochs : int, optional
        Number of training epochs (default=25).
    device : str, optional
        Device to train on ('cuda' or 'cpu').
    
    Returns:
    --------
    model : nn.Module
        The trained model.
    history : dict
        Dictionary containing training and validation loss history.
    """
    history = {'train_loss': [], 'val_loss': []}
    best_val_loss = float('inf')
    best_model_wts = None
    
    model.to(device)
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
                dataloader = train_loader
            else:
                model.eval()   # Set model to evaluate mode
                dataloader = val_loader
            
            running_loss = 0.0
            
            # Iterate over data
            for inputs, labels in tqdm(dataloader, desc=f'{phase.capitalize()}'):
                inputs = inputs.to(device)  # Shape: (batch_size, seq_length, input_size)
                labels = labels.to(device)  # Shape: (batch_size, 2)
                
                # Zero the parameter gradients
                optimizer.zero_grad()
                
                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)  # Shape: (batch_size, 2)
                    loss = criterion(outputs, labels)
                    
                    # Backward pass and optimize only in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                running_loss += loss.item() * inputs.size(0)
            
            epoch_loss = running_loss / len(dataloader.dataset)
            history[f'{phase}_loss'].append(epoch_loss)
            
            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f}')
            
            # Deep copy the model
            if phase == 'val' and epoch_loss < best_val_loss:
                best_val_loss = epoch_loss
                best_model_wts = model.state_dict()
        
        print()
    
    print(f'Best Validation Loss: {best_val_loss:.4f}')
    
    # Load best model weights
    if best_model_wts is not None:
        model.load_state_dict(best_model_wts)
    
    return model, history

def evaluate_model(model, test_loader, criterion, device='cpu'): # device='cuda'):
    """
    Evaluates the trained model on the test set.
    
    Parameters:
    -----------
    model : nn.Module
        The trained LSTMRegressor model.
    test_loader : DataLoader
        DataLoader for test data.
    criterion : nn.Module
        Loss function.
    device : str, optional
        Device to evaluate on ('cuda' or 'cpu').
    
    Returns:
    --------
    test_loss : float
        Mean loss on the test set.
    predictions : list of np.ndarray
        List containing predicted pitch and yaw angles.
    targets : list of np.ndarray
        List containing actual pitch and yaw angles.
    """
    model.eval()
    running_loss = 0.0
    predictions = []
    targets = []
    
    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc='Testing'):
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            running_loss += loss.item() * inputs.size(0)
            
            predictions.append(outputs.cpu().numpy())
            targets.append(labels.cpu().numpy())
    
    test_loss = running_loss / len(test_loader.dataset)
    predictions = np.vstack(predictions)
    targets = np.vstack(targets)
    
    return test_loss, predictions, targets

def plot_training_history(history):
    """
    Plots the training and validation loss over epochs.
    
    Parameters:
    -----------
    history : dict
        Dictionary containing training and validation loss history.
    """
    epochs = range(1, len(history['train_loss']) + 1)
    
    plt.figure(figsize=(10, 5))
    plt.plot(epochs, history['train_loss'], 'bo-', label='Training Loss')
    plt.plot(epochs, history['val_loss'], 'ro-', label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss (MSE)')
    plt.legend()
    plt.grid(True)
    plt.show()

def save_model(model, path):
    """
    Saves the trained model to the specified path.
    
    Parameters:
    -----------
    model : nn.Module
        The trained model.
    path : str
        Path to save the model.
    """
    try:
        torch.save(model.state_dict(), path)
        print(f"Model saved to {path}")
    except Exception as e:
        raise RuntimeError(f"Error saving model: {str(e)}")

def load_model(model, path, device='cuda'):
    """
    Loads the model weights from the specified path.
    
    Parameters:
    -----------
    model : nn.Module
        The model architecture to load weights into.
    path : str
        Path to the saved model weights.
    device : str, optional
        Device to load the model on ('cuda' or 'cpu').
    
    Returns:
    --------
    model : nn.Module
        The model with loaded weights.
    """
    try:
        model.load_state_dict(torch.load(path, map_location=device))
        model.to(device)
        model.eval()
        print(f"Model loaded from {path}")
        return model
    except Exception as e:
        raise RuntimeError(f"Error loading model: {str(e)}")


### Main 

In [29]:
# lstm_pipeline.py (continued)

def main():
    """
    Main function to train and evaluate the LSTM model for pitch and yaw prediction.
    """
    # Configuration
    FEATURE_DIR = 'data'  # Directory containing feature .pt files
    LABEL_DIR = 'labeled'                # Directory containing label .txt files
    SEQUENCE_LENGTH = 30                 # Number of frames per sequence
    BATCH_SIZE = 32
    NUM_EPOCHS = 50
    LEARNING_RATE = 1e-3
    HIDDEN_SIZE = 128
    NUM_LAYERS = 2
    DROPOUT = 0.2
    BIDIRECTIONAL = False
    MODEL_SAVE_PATH = 'lstm_regressor.pth'
    VALIDATION_SPLIT = 0.2
    RANDOM_SEED = 42
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    # Set random seeds for reproducibility
    torch.manual_seed(RANDOM_SEED)
    np.random.seed(RANDOM_SEED)
    
    # Initialize Dataset
    dataset = VideoDataset(feature_dir=FEATURE_DIR,
                           label_dir=LABEL_DIR,
                           sequence_length=SEQUENCE_LENGTH)
    
    # Split into training and validation sets
    total_size = len(dataset)
    val_size = int(total_size * VALIDATION_SPLIT)
    train_size = total_size - val_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size],
                                                               generator=torch.Generator().manual_seed(RANDOM_SEED))
    
    # Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
    
    # Initialize the LSTM model
    # Assuming each feature vector is flattened (C*H*W)
    sample_feature, _ = dataset[0]
    input_size = sample_feature.shape[1]  # C*H*W
    model = LSTMRegressor(input_size=input_size,
                          hidden_size=HIDDEN_SIZE,
                          num_layers=NUM_LAYERS,
                          dropout=DROPOUT,
                          bidirectional=BIDIRECTIONAL)
    
    # Define loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    
    # Train the model
    trained_model, history = train_model(model, train_loader, val_loader, criterion, optimizer,
                                        num_epochs=NUM_EPOCHS, device=DEVICE)
    
    # Plot training history
    plot_training_history(history)
    
    # Save the trained model
    save_model(trained_model, MODEL_SAVE_PATH)
    
    # Evaluate the model on the validation set
    val_loss, val_predictions, val_targets = evaluate_model(trained_model, val_loader, criterion, device=DEVICE)
    print(f"Validation MSE Loss: {val_loss:.4f}")
    
    # Save predictions and targets for further evaluation
    np.savetxt('val_predictions.txt', val_predictions, fmt='%.6f')
    np.savetxt('val_targets.txt', val_targets, fmt='%.6f')
    
    # Optionally, visualize some predictions vs actual
    plt.figure(figsize=(12, 6))
    
    # Plot Pitch
    plt.subplot(2, 1, 1)
    plt.plot(val_targets[:, 0], label='Actual Pitch', color='blue')
    plt.plot(val_predictions[:, 0], label='Predicted Pitch', color='green', linestyle='--')
    plt.title('Actual vs Predicted Pitch')
    plt.xlabel('Sample')
    plt.ylabel('Pitch (degrees)')
    plt.legend()
    plt.grid(True)
    
    # Plot Yaw
    plt.subplot(2, 1, 2)
    plt.plot(val_targets[:, 1], label='Actual Yaw', color='blue')
    plt.plot(val_predictions[:, 1], label='Predicted Yaw', color='green', linestyle='--')
    plt.title('Actual vs Predicted Yaw')
    plt.xlabel('Sample')
    plt.ylabel('Yaw (degrees)')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.show()

if __name__ == '__main__':
    main()


FileNotFoundError: [Errno 2] No such file or directory: 'data/0_features.pt'