In [1]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim

import rppg
%matplotlib inline

In [2]:
def training_script(name, model, root_dir, root_dir_val=None, clip=False, clip_length=64, batch_size=4, num_epochs=30):
    """
    Train and test the given model on the given frame-level dataset. If root_dir_val is given, then root_dir is the directory of the training set. Otherwise, the dataset at root_dir is split into training and validation sets.
    :param name: Name of the experiment
    :param root_dir: Root directory of the (entire/training) dataset
    :param model: Model to train and test
    :param root_dir_val: Root directory of the validation dataset
    :param clip: If False, train and test on frame-level dataset. If True, train and test on clip-level dataset.
    """
    # Create experiment directory
    exp_dir = os.path.join("results", name)
    os.makedirs(exp_dir, exist_ok=True)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    if not clip:
        if root_dir_val is None:
            # Load dataset
            dataset = rppg.rPPGFrameDataset(root_dir, transform=transforms.ToTensor())

            # Split dataset into training and validation sets
            train_size = int(0.8 * len(dataset))
            val_size = len(dataset) - train_size
            train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
        else:
            # Load training and validation datasets
            train_dataset = rppg.rPPGFrameDataset(root_dir, transform=transforms.ToTensor())
            val_dataset = rppg.rPPGFrameDataset(root_dir_val, transform=transforms.ToTensor())
    else:
        if root_dir_val is None:
            # Load dataset
            dataset = rppg.rPPGClipDataset(root_dir, clip_length=clip_length, transform=transforms.ToTensor())

            # Split dataset into training and validation sets
            train_size = int(0.8 * len(dataset))
            val_size = len(dataset) - train_size
            train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
        else:
            # Load training and validation datasets
            train_dataset = rppg.rPPGClipDataset(root_dir, clip_length=clip_length, transform=transforms.ToTensor())
            val_dataset = rppg.rPPGClipDataset(root_dir_val, clip_length=clip_length, transform=transforms.ToTensor())

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

    # Print dataset sizes
    if not clip:
        print(f"Training set size: {len(train_dataset) * batch_size} frames")
        print(f"Validation set size: {len(val_dataset) * batch_size} frames")
    else:
        print(f"Training set size: {len(train_dataset) * batch_size} clips")
        print(f"Validation set size: {len(val_dataset) * batch_size} clips")

    # Create model
    model = model.to(device)

    # Create loss function and optimiser
    criterion = nn.L1Loss()
    optimiser = optim.Adam(model.parameters(), lr=0.001)

    # Train model
    best_model, train_losses, val_losses = rppg.train_model(model, train_loader, val_loader, criterion, optimiser, num_epochs, device=device)

    # Save model
    torch.save(best_model.state_dict(), os.path.join(exp_dir, "model.pt"))

    # Test model
    test_loss, predictions, labels = rppg.test_model(best_model, val_loader, criterion, device=device)

    # Save losses, predictions, and labels
    np.savez(os.path.join(exp_dir, "metrics.npz"), train_losses=train_losses, val_losses=val_losses, test_loss=test_loss, predictions=predictions, labels=labels)

    # Plot losses
    rppg.plot_losses(train_losses, val_losses)

    # Plot predictions vs labels
    rppg.plot_predictions(predictions, labels, clip=clip)