In [1]:
#|default_exp testing
#|export

from LendingClubAutoencoder import preprocessing, autoencoders, training

import torch
import torch.nn.functional as F
from torcheval.metrics.functional import multiclass_f1_score

from datetime import datetime, timedelta

In [2]:
#|test

import json

In [None]:
#|export

def test_error_score(reconstruction:torch.Tensor, x:torch.Tensor, not_null_mask:torch.Tensor, binary_mask:torch.Tensor)->tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    '''
    Calculates the RMSE for numeric features and F1 score for binary features in the VAE reconstruction, using masks to ignore missing values.

    Args:
        reconstruction (torch.Tensor): The reconstructed input from the VAE decoder.
        x (torch.Tensor): The original input tensor.
        not_null_mask (torch.Tensor): Mask indicating non-missing values in the input.
        binary_mask (torch.Tensor): Mask indicating which features are binary.

    Returns:
        tuple: (rmse_loss, f1_score) for numeric and binary features respectively.
    '''
    inverse_binary_mask = ~binary_mask

    binary_mask = not_null_mask & binary_mask[None, :]# Calculate which features are numeric and which are binary
    numeric_mask = not_null_mask & inverse_binary_mask[None, :]
    
    # RMSE
    mse_loss = F.mse_loss(reconstruction[numeric_mask], x[numeric_mask], reduction='mean')# Only compute MSE for non-masked values
    rmse_loss = torch.sqrt(mse_loss)

    # F1 Score
    binary_values = (x[binary_mask] > 0.5).long()
    predicted_values = (reconstruction[binary_mask] > 0.5).long()
    f1_score = multiclass_f1_score(predicted_values, binary_values, num_classes=2, average='macro')
        
    return rmse_loss, f1_score


In [1]:
#|export

def test_vae(model_file_name, test_loader, sigmoid_mask, binary_mask, device):
    '''
    Evaluates a trained VAE model on a test set, returning average RMSE and F1 score.

    Args:
        model_file_name (str): Path to the saved model file.
        test_loader (DataLoader): DataLoader for the test set.
        sigmoid_mask (torch.Tensor): Mask for features to apply sigmoid activation.
        binary_mask (torch.Tensor): Mask for binary features.
        device (torch.device): Device to run the evaluation on.

    Returns:
        tuple: (average_rmse_loss, average_f1_score) across all test batches.
    '''
    model = training.get_best_model(autoencoders.VariationalAutoencoder, sigmoid_mask, model_file_name)#Loads model
    
    model.eval()# Sets model into evaluation mode
    test_rmse_loss = 0
    test_f1_score = 0
    n_batches = 0

    #Test loop
    with torch.no_grad():
        for data_batch, mask_batch in test_loader:
            data_batch, mask_batch = data_batch.to(device), mask_batch.to(device)
            reconstruction, mean = model(data_batch)
            
            rmse_loss, f1_score = test_error_score(reconstruction, data_batch, mask_batch, binary_mask)
            
            test_rmse_loss = test_rmse_loss + rmse_loss.item()
            test_f1_score = test_f1_score + f1_score.item()
            n_batches = n_batches + 1

    average_rmse_loss = test_rmse_loss / n_batches
    average_f1_score = test_f1_score/ n_batches

    return average_rmse_loss, average_f1_score



In [8]:
#|export

def cross_validate_vae(lending_club_data_handler: preprocessing.DataHandler, start: int, end: int, sigmoid_mask: torch.Tensor, binary_mask: torch.Tensor, learning_rate: float = 1e-3, n_folds: int = 5 ):
    '''
    Performs k-fold cross-validation for the VAE model using a time-based sliding window.

    Args:
        lending_club_data_handler (DataHandler): Handler for loading and masking data.
        start (datetime): Start date for the data window.
        end (datetime): End date for the data window.
        sigmoid_mask (torch.Tensor): Mask for features to apply sigmoid activation.
        binary_mask (torch.Tensor): Mask for binary features.
        learning_rate (float, optional): Learning rate for the optimiser. Defaults to 1e-3.
        n_folds (int, optional): Number of cross-validation folds. Defaults to 5.

    Returns:
        dict: Test losses and F1 scores for each fold.
    '''

    total_days = (end - start).days

    # Outlines propotion of each dataset
    train_size = 0.7
    validation_size = 0.15
    test_size = 0.15

    # Calculate window sizes for train, validation, and test splits
    window_size = total_days / (1+train_size*(n_folds-1))
    train_step_size = window_size * train_size
    validation_step_size = window_size * validation_size
    test_step_size = window_size * test_size

    fold_test_losses = {}

    current_start = start

    # Loops over each fold
    for fold in range(n_folds):
        # Defines train, validation, and test periods for this fold
        train_start = current_start
        train_end = train_start + timedelta(days=int(train_step_size))

        validation_start = train_end + timedelta(days=1)
        validation_end = validation_start + timedelta(days=int(validation_step_size))

        test_start = validation_end + timedelta(days=1)
        test_end = test_start + timedelta(days=int(test_step_size))

        current_start = validation_start
        
        # Ensure we don't go past the end date
        if test_end > end:
            test_end = end

        train_data, train_mask = lending_club_data_handler.get_train_data(train_start, train_end)
        validation_data, validation_mask = lending_club_data_handler.get_test_data(validation_start, validation_end)
        test_data, test_mask = lending_club_data_handler.get_test_data(test_start, test_end)

        train_loader = preprocessing.to_torch_dataloader(train_data,train_mask)
        validation_loader = preprocessing.to_torch_dataloader(validation_data,validation_mask)
        test_loader = preprocessing.to_torch_dataloader(test_data,test_mask)

        # Instantiate model and optimiser 
        model = autoencoders.VariationalAutoencoder(input_size=len(train_data[0]), sigmoid_mask=sigmoid_mask)
        optimiser = torch.optim.Adam(model.parameters(), lr=learning_rate)#original is 1e-3

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Train model for this fold 
        training.train_variational_autoencoder(model, optimiser, train_loader, validation_loader, binary_mask=binary_mask, device=device)

        # Evaluate best model for this fold
        model_file_name = f'trained_models/vae_best-input_size:{len(train_data[0])}.pt'
        
        test_rmse_loss, test_f1_score = test_vae(model_file_name, test_loader, device)

        fold_test_losses[fold]={'rmse_loss':test_rmse_loss, 'fq_score':test_f1_score}
        print(f'Test Loss for Fold {fold+1}: {test_rmse_loss:.4f}, {test_f1_score:.4f}')

    return fold_test_losses