## ***Setting up***

In [None]:
# Importing libraries

import torch # PyTorch
import torch.nn as nn # For convenience when defining a model
import torchvision # To access pre-trained deep learning models
import torchvision.transforms as transforms # For applying pre-processing and data augmentation to the image data
from torchvision.datasets import ImageFolder # For defining the storage location of image data
from torch.utils.data import TensorDataset # PyTorch uses datasets comprised of tensors
from torch.utils.data.dataloader import DataLoader # For loading mini-batches during training process
from torch.utils.data import random_split, Dataset, Subset, WeightedRandomSampler # For generating a random split between the training and testing data,
#   For creating a dataset object (I think), For taking a subset of a dataset.


import numpy as np # For manipulating arrays and other python object types. 'np' is standard shorthand.
import pandas as pd # For manipulating dataframes. 'pd' is standard shorthand.
from collections import Counter # To count images in each class
import matplotlib.pyplot as plt
import math # Some basic mathematic functions
import warnings # For checking this a working as expected
import sklearn
import sklearn.metrics # For easy calculation of performance metrics
#import matplotlib.pyplot as plt
import csv
import os

In [None]:
import warnings

# Suppress all warnings (mostly noisy third-party deprecation/metric warnings)...
warnings.filterwarnings("ignore")
# ...but keep the ones raised by this notebook's own code visible: the sanity
# checks in foldSizes and allFoldsTrainTestIndices report problems through
# warnings.warn, and a blanket "ignore" would silence them too.
# (Filters added later take precedence; notebook cells run as module `__main__`.)
warnings.filterwarnings("always", module="__main__")

## ***Functions***

In [None]:
def foldSizes(dataset, k):
  """
  Takes the dataset and the number of desired folds and determines the number of examples in each fold.

  The examples are spread as evenly as possible: every fold receives
  len(dataset) // k examples and the first len(dataset) % k folds receive one more.

  Args:
    dataset: Any sized object; only len(dataset) is used.
    k: The number of folds.

  Returns:
    A list with one size per fold (k entries).
  """
  # The minimum size of a fold, and the number of un-allocated images
  fold_base_size, remaining_count = divmod(len(dataset), k)
  # A list containing the sizes of each fold - one entry per fold
  # (this used to be hard-coded to 5 entries, whatever the value of k)
  fold_sizes = [fold_base_size] * k
  # Distributing the un-allocated images across the first folds
  for i in range(remaining_count):
    fold_sizes[i] += 1

  # Checking all images are allocated
  if sum(fold_sizes) != len(dataset):
    warnings.warn("Not all iamges are allocated to folds")

  return fold_sizes

In [None]:
def allFoldsTrainTestIndices(seed, dataset, fold_sizes):
  """
  Outputs a list containing the train and test set indices for each of the k-folds, given a random seed.
  This can be used for completing the 3x 5-fold CV in parts rather than all at once.

  Args:
    seed: Seed passed to torch.manual_seed before the random split. Note that
      this re-seeds PyTorch's global random number generator.
    dataset: The dataset to split; its length must equal sum(fold_sizes).
    fold_sizes: The number of examples in each fold (one entry per fold).

  Returns:
    A list with one [train_indices, test_indices] pair per fold. Fold i is the
    test set of pair i; the remaining folds, in order, form its train set.
  """
  # Set seed for random split
  torch.manual_seed(seed)

  # Create the folds and extract the dataset indices held by each of them
  folds = torch.utils.data.random_split(dataset, fold_sizes)
  fold_indices = [list(fold.indices) for fold in folds]

  # The number of folds comes from the argument, not from a global `k`,
  # so the function works for any fold count and in any cell order
  n_folds = len(fold_sizes)

  train_test_idx_folds = [] # Storage containing train and test indices for all folds

  for fold in range(n_folds):
    # The current fold is held out for testing, all other folds are used for training
    test_idx = fold_indices[fold]
    train_idx = []
    for i in range(n_folds):
      if i != fold:
        train_idx.extend(fold_indices[i])

    # Checking that there is no overlap between the training and testing sets
    if set(train_idx).intersection(test_idx):
      warnings.warn("Train and test set images are not mutually exclusive")

    # Append train and test indices for this fold to a list containing this for all folds
    train_test_idx_folds.append([train_idx, test_idx])
  return train_test_idx_folds

In [None]:
def trainBaseModel(model, num_epochs, learn_rate, dl_train, dl_test,
                   patience=20, min_delta=1e-4, device=None,
                   save_path='/kaggle/working/model.pth'):
  """
  Training loop with early stopping based on F1-score.
  NOTE: Early stopping is performed on test data (not ideal).

  The model is trained with Adam and cross-entropy loss. After every epoch it is
  evaluated on `dl_test`; the epoch with the best macro F1-score is checkpointed
  (the whole model object is saved to `save_path`) and training stops once
  `patience` consecutive epochs fail to improve the best F1-score by more than
  `min_delta`. The per-epoch F1-scores are plotted at the end.

  Args:
    model: The network to train; it is moved to `device` in place.
    num_epochs: Maximum number of epochs.
    learn_rate: Learning rate passed to Adam.
    dl_train: Iterable of (images, labels) batches used for training.
    dl_test: Iterable of (images, labels) batches used for evaluation.
    patience: Number of epochs without improvement before stopping early.
    min_delta: Minimum F1-score gain that counts as an improvement.
    device: Device to train on. Defaults to 'cuda:0' when CUDA is available
      and to 'cpu' otherwise.
    save_path: Where the best model is saved. The default is the path that
      generateCNNPredictions loads the model from.

  Returns:
    (accuracy, recall, precision, F1-score) of the best epoch, as fractions.
    All four are 0 if no epoch was run.
  """

  # Fall back to the CPU instead of crashing on machines without a GPU
  if device is None:
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
  device = torch.device(device)
  model.to(device)

  best_f1 = 0
  best_acc = 0
  best_re = 0
  best_pr = 0
  best_epoch = 0

  f1_scores = []
  epochs_no_improve = 0

  optimizer = torch.optim.Adam(params=model.parameters(), lr=learn_rate)
  criterion = nn.CrossEntropyLoss()

  for epoch in range(num_epochs):

    # -----------------------
    # TRAIN
    # -----------------------
    model.train()
    for images, labels in dl_train:
      images = images.to(device)
      labels = labels.to(device)

      outputs = model(images)
      loss = criterion(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    # -----------------------
    # EVALUATE
    # -----------------------
    model.eval()
    test_preds = []
    test_labels = []

    with torch.no_grad():
      for images, labels in dl_test:
        outputs = model(images.to(device))
        # The predicted class is the index of the largest output
        _, predicted = torch.max(outputs, 1)
        test_preds.extend(predicted.tolist())
        test_labels.extend(labels.tolist())

    accuracy = sklearn.metrics.accuracy_score(test_labels, test_preds)
    recall = sklearn.metrics.recall_score(test_labels, test_preds, average="macro")
    precision = sklearn.metrics.precision_score(test_labels, test_preds, average="macro")
    epoch_f1 = sklearn.metrics.f1_score(test_labels, test_preds, average="macro")

    f1_scores.append(epoch_f1)

    # -----------------------
    # EARLY STOPPING CHECK
    # -----------------------
    # The first epoch is always checkpointed: otherwise a run whose F1-score
    # never exceeds min_delta would write no file, and the prediction step
    # would silently load the model left behind by a previous fold.
    if epoch == 0 or epoch_f1 > best_f1 + min_delta:
      best_f1 = epoch_f1
      best_acc = accuracy
      best_re = recall
      best_pr = precision
      best_epoch = epoch
      epochs_no_improve = 0

      torch.save(model, save_path)

    else:
      epochs_no_improve += 1

    if epoch % 10 == 0:
      print(f"Epoch [{epoch+1}/{num_epochs}] | F1: {epoch_f1:.4f}")

    if epochs_no_improve >= patience:
      print(f"Early stopping triggered at epoch {epoch+1}")
      break

  # -----------------------
  # RESULTS
  # -----------------------
  # The metrics are fractions, so they are scaled by 100 to match the % sign
  print('Results:\n'
        f'  Accuracy  = {best_acc * 100:.3f}%\n'
        f'  Recall    = {best_re * 100:.3f}%\n'
        f'  Precision = {best_pr * 100:.3f}%\n'
        f'  F1 Score  = {best_f1 * 100:.3f}%\n'
        f'  Best Epoch = {best_epoch + 1}')

  plt.plot(f1_scores)
  plt.xlabel("Epoch")
  plt.ylabel("F1-score")
  plt.show()

  return best_acc, best_re, best_pr, best_f1

In [None]:
def trainMetaClassifier(model, num_epochs, learn_rate, momentum, weight_decay,
                        dl_train, dl_test, patience=10, min_delta=1e-4,
                        device=None, save_path='/kaggle/working/model.pth'):
  """
  Training loop for the Meta-Classifier with early stopping.
  NOTE: Early stopping is performed on test data (not ideal).

  The model is trained with SGD (with momentum and weight decay) and
  cross-entropy loss. After every epoch it is evaluated on `dl_test`; the epoch
  with the best macro F1-score is checkpointed (the whole model object is saved
  to `save_path`) and training stops once `patience` consecutive epochs fail to
  improve the best F1-score by more than `min_delta`.

  Args:
    model: The network to train; it is moved to `device` in place.
    num_epochs: Maximum number of epochs.
    learn_rate: Learning rate passed to SGD.
    momentum: Momentum passed to SGD.
    weight_decay: Weight decay passed to SGD.
    dl_train: Iterable of (inputs, labels) batches used for training.
    dl_test: Iterable of (inputs, labels) batches used for evaluation.
    patience: Number of epochs without improvement before stopping early.
    min_delta: Minimum F1-score gain that counts as an improvement.
    device: Device to train on. Defaults to 'cuda:0' when CUDA is available
      and to 'cpu' otherwise.
    save_path: Where the best model is saved. The default is the path that
      generateMCPredictions loads the model from.

  Returns:
    (accuracy, recall, precision, F1-score) of the best epoch, as fractions.
    All four are 0 if no epoch was run.
  """

  # Fall back to the CPU instead of crashing on machines without a GPU
  if device is None:
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
  device = torch.device(device)
  model.to(device)

  best_f1 = 0
  best_acc = 0
  best_re = 0
  best_pr = 0
  best_epoch = 0

  epochs_no_improve = 0

  optimizer = torch.optim.SGD(
      params=model.parameters(),
      lr=learn_rate,
      momentum=momentum,
      weight_decay=weight_decay
  )

  criterion = nn.CrossEntropyLoss()

  for epoch in range(num_epochs):

    # -----------------------
    # TRAIN
    # -----------------------
    model.train()
    for inputs, labels in dl_train:
      inputs = inputs.to(device)
      labels = labels.to(device)

      outputs = model(inputs)
      loss = criterion(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    # -----------------------
    # EVALUATE
    # -----------------------
    model.eval()
    test_preds = []
    test_labels = []

    with torch.no_grad():
      for inputs, labels in dl_test:
        outputs = model(inputs.to(device))
        # The predicted class is the index of the largest output
        _, predicted = torch.max(outputs, 1)
        test_preds.extend(predicted.tolist())
        test_labels.extend(labels.tolist())

    accuracy = sklearn.metrics.accuracy_score(test_labels, test_preds)
    recall = sklearn.metrics.recall_score(test_labels, test_preds, average="macro")
    precision = sklearn.metrics.precision_score(test_labels, test_preds, average="macro")
    epoch_f1 = sklearn.metrics.f1_score(test_labels, test_preds, average="macro")

    # -----------------------
    # EARLY STOPPING CHECK
    # -----------------------
    # The first epoch is always checkpointed: otherwise a run whose F1-score
    # never exceeds min_delta would write no file, and the prediction step
    # would silently load the model left behind by a previous fit.
    if epoch == 0 or epoch_f1 > best_f1 + min_delta:
      best_f1 = epoch_f1
      best_acc = accuracy
      best_re = recall
      best_pr = precision
      best_epoch = epoch
      epochs_no_improve = 0

      torch.save(model, save_path)

    else:
      epochs_no_improve += 1

    if epoch % 10 == 0:
      print(f"Epoch [{epoch+1}/{num_epochs}] | F1: {epoch_f1:.4f}")

    if epochs_no_improve >= patience:
      print(f"Early stopping triggered at epoch {epoch+1}")
      break

  # -----------------------
  # RESULTS
  # -----------------------
  # The metrics are fractions, so they are scaled by 100 to match the % sign
  print('Results:\n'
        f'  Accuracy  = {best_acc * 100:.3f}%\n'
        f'  Recall    = {best_re * 100:.3f}%\n'
        f'  Precision = {best_pr * 100:.3f}%\n'
        f'  F1 Score  = {best_f1 * 100:.3f}%\n'
        f'  Best Epoch = {best_epoch + 1}')

  return best_acc, best_re, best_pr, best_f1

In [None]:
  # Softmax function
def softmax(x):
  """
  Numerically stable softmax along the first axis of `x`.

  Args:
    x: Array-like of scores (e.g. a list with one raw model output per class).

  Returns:
    A numpy array of the same shape whose entries along axis 0 sum to 1.
  """
  x = np.asarray(x)
  if x.size == 0:
    # Nothing to normalise; an empty input gives an empty result
    return np.exp(x)
  # Subtracting the maximum does not change the result mathematically, but it
  # stops np.exp from overflowing to inf (which would turn the ratio into nan)
  exps = np.exp(x - np.max(x, axis=0, keepdims=True))
  return exps / np.sum(exps, axis=0)

In [None]:
# Generate predictions using the trained model
def generateCNNPredictions(data_path, transformations, train_test_idx, batch_size, seed, device='cuda:0'):
  """
  This function produces predictions for a single fold using the trained model saved to the temporary location.
  These will form part of the train and test sets for the meta-classifier.

  Args:
    data_path: Root folder of the images (read with ImageFolder).
    transformations: Transformations applied to every image (test-time ones, ie. no augmentation).
    train_test_idx: [train_indices, test_indices] for the fold. Both lists are sorted IN PLACE.
    batch_size: Batch size used for both dataloaders.
    seed: Seed of the train-set sampler, so that every base-model visits the
      train images in the same order.
    device: Device the model and the images are sent to.

  Returns:
    (df_train, df_test): dataframes with a 'labels' column followed by one
    softmax probability column per class, named '0', '1', ...
    The train rows are the images drawn by the weighted sampler (sampling with
    replacement), not one row per train image.
  """

  # Load the data with test set transformations applied (ie. no data augmentation)
  dataset = ImageFolder(data_path, transform=transformations)
  train_test_idx[0].sort() # Sorting in place (this also reorders the caller's lists)
  train_test_idx[1].sort() # ^
  ds_train = Subset(dataset, indices=train_test_idx[0])
  ds_test = Subset(dataset, indices=train_test_idx[1])

  # We need to redefine the train set sampler to ensure all base-models predict on the data in the same order
  train_targets = [dataset.targets[i] for i in train_test_idx[0]]
  # Sampling weight of each class = 1 / number of train images in that class.
  # The weights are keyed by the class label itself, so they stay correct
  # whatever order the classes appear in and even if a class is absent.
  class_weight = {label: 1. / count for label, count in Counter(train_targets).items()}
  # Weight for each image (ie. the weight of the class that the image belongs to)
  samples_weight = torch.tensor([class_weight[t] for t in train_targets], dtype=torch.double)
  # Defining the sampler
  g = torch.Generator()
  g.manual_seed(seed)
  sampler = WeightedRandomSampler(samples_weight, len(samples_weight), replacement=True, generator=g)
  # Now defining the dataloaders
  dl_train = DataLoader(ds_train, batch_size=batch_size, shuffle=False, sampler=sampler)
  dl_test = DataLoader(ds_test, batch_size=batch_size, shuffle=False)
  # Loading the model from save
  # NOTE: weights_only=False unpickles arbitrary objects - only load files written by this notebook
  model = torch.load('/kaggle/working/model.pth', weights_only=False)
  # Sending to device for faster computing
  model.to(device)

  # Generating predictions, first for the train set and then for the test set
  model.eval() # Set model to evaluation mode, we aren't training so we want the best predictions possible
  frames = []
  with torch.no_grad(): # As we aren't performing backprop, we don't need gradients
    for loader in (dl_train, dl_test):
      raw_outputs = []
      targets = []
      for images, labels in loader:
        raw_outputs.extend(model(images.to(device)).tolist())
        targets.extend(labels.tolist())

      # Converting the raw outputs of each image to softmax probabilities
      probabilities = [softmax(row) for row in raw_outputs]

      # Convert to a dataframe for easier saving: 'labels' first, then one
      # column per class named after the class index
      frame = pd.DataFrame(probabilities)
      frame.columns = [str(col) for col in frame.columns]
      frame.insert(0, 'labels', targets)
      frames.append(frame)

  df_train, df_test = frames
  return df_train, df_test



In [None]:
def generateMCPredictions(dl_train, dl_test, batch_size, device='cuda:0'):
  """
  Generates predictions using the trained and temporarily saved meta-classifier

  Args:
    dl_train: Iterable of (inputs, targets) batches for the train set.
    dl_test: Iterable of (inputs, targets) batches for the test set.
    batch_size: Unused; kept so that existing calls keep working.
    device: Device the model and the inputs are sent to.

  Returns:
    (df_train, df_test): dataframes with a 'labels' column followed by one
    softmax probability column per class, named '0', '1', ...
    Rows are in the order in which the dataloaders yield them.
  """
  # Loading the model from save
  # NOTE: weights_only=False unpickles arbitrary objects - only load files written by this notebook
  model = torch.load('/kaggle/working/model.pth', weights_only=False)
  # Sending to device for faster computing
  model.to(device)

  # Generating predictions, first for the train set and then for the test set
  model.eval() # Set model to evaluation mode, we aren't training so we want the best predictions possible
  frames = []
  with torch.no_grad(): # As we aren't performing backprop, we don't need gradients
    for loader in (dl_train, dl_test):
      raw_outputs = []
      labels = []
      for inputs, targets in loader:
        raw_outputs.extend(model(inputs.to(device)).tolist())
        labels.extend(targets.tolist())

      # Converting the raw outputs of each example to softmax probabilities
      probabilities = [softmax(row) for row in raw_outputs]

      # Convert to a dataframe for easier saving. The class columns are named
      # after however many outputs the model has, rather than assuming four.
      frame = pd.DataFrame(probabilities)
      frame.columns = [str(col) for col in frame.columns]
      frame.insert(0, 'labels', labels)
      frames.append(frame)

  df_train, df_test = frames
  return df_train, df_test

In [None]:
# Identity class for modifying ResNet-34
#   Setting any layer to this class will effectively remove the layer.
class Identity(nn.Module):
    """Pass-through layer whose output is its input, unchanged.

    Assigning an instance to a layer attribute of a network makes that layer
    a no-op, which effectively removes it.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        # Nothing to compute: hand the input straight back
        return x

In [None]:
def getModel(model_name):
    """
    Loads and returns a pre-trained CNN whose classification head is replaced
    by a new linear layer with 4 outputs.

    Args:
        model_name: One of 'vgg16', 'vgg19', 'resnet' (ResNet-34 with its
            `layer4` replaced by Identity) or 'densenet' (DenseNet-161).

    Returns:
        The torchvision model with the new head.

    Raises:
        ValueError: If `model_name` is not one of the supported names.
    """
    supported = ('vgg16', 'vgg19', 'resnet', 'densenet')
    # Fail with a clear message up front; previously an unknown name fell
    # through every branch and crashed on `return model`
    if model_name not in supported:
        raise ValueError(
            f"Unknown model name {model_name!r}; expected one of {supported}"
        )

    pretrained = True
    num_classes = 4

    if model_name == 'vgg16':
        model = torchvision.models.vgg16(pretrained=pretrained)
        model.classifier[6] = nn.Linear(in_features=4096, out_features=num_classes, bias=True)
    elif model_name == 'vgg19':
        model = torchvision.models.vgg19(pretrained=pretrained)
        model.classifier[6] = nn.Linear(in_features=4096, out_features=num_classes, bias=True)
    elif model_name == 'resnet':
        model = torchvision.models.resnet34(pretrained=pretrained)
        # layer4 is removed below, so the new head is built with 256 input features
        model.fc = nn.Linear(in_features=256, out_features=num_classes, bias=True)
        model.layer4 = Identity()
    else:  # 'densenet'
        model = torchvision.models.densenet161(pretrained=pretrained)
        model.classifier = nn.Linear(in_features=2208, out_features=num_classes, bias=True)

    # Returning the required model
    return model

In [None]:
import os

def save_name_generator(cv_config):
    """
    Build the CSV paths that the predictions of one model fit are saved to.

    `cv_config` is a mapping (e.g. a dataframe row or a dict) with the keys
    'model' (model name), 'repeat' (index 0-2) and 'fold' (index 0-4).
    The folder that will hold the files is created if it does not exist yet.

    Returns the tuple (train_path, test_path).
    """
    repeat_dirs = ('repeat 0/', 'repeat 1/', 'repeat 2/')
    fold_dirs = ('fold 0/', 'fold 1/', 'fold 2/', 'fold 3/', 'fold 4/')

    model_name = cv_config['model']
    repeat_idx = cv_config['repeat']
    fold_idx = cv_config['fold']

    # <base>/<repeat dir>/<fold dir>/<model name>
    prefix = '/kaggle/working/' + repeat_dirs[repeat_idx] + fold_dirs[fold_idx] + model_name
    train_path = prefix + '_train.csv'
    test_path = prefix + '_test.csv'

    # Both files live in the same folder, so creating it once is enough
    os.makedirs(os.path.dirname(train_path), exist_ok=True)
    return train_path, test_path

In [None]:
def MyDataset(path):
  """
  Read a meta-classifier CSV file and wrap it in a TensorDataset.

  The first column of the file holds the targets and every other column is an
  input. The inputs are converted to float32; the targets keep the type they
  were read with.
  """
  frame = pd.read_csv(path)

  # Split the table into its numpy parts: all columns but the first / the first column
  input_values = frame.iloc[:, 1:].values
  target_values = frame.iloc[:, 0].values

  # numpy -> tensors, then pair them up in a dataset object
  input_tensor = torch.from_numpy(input_values).float()
  target_tensor = torch.from_numpy(target_values)
  return TensorDataset(input_tensor, target_tensor)

In [None]:
# The meta-classifier class
class metaClassifier(nn.Module):
    """
    The meta-classifier: a small fully-connected network that maps the
    concatenated class probabilities of the base-models to class scores.

    Layers: Linear -> BatchNorm -> Dropout, twice, followed by a final Linear.
    NOTE(review): there is no activation function between the layers - confirm
    that this is intentional.

    Args:
        num_inputs: Number of input features. The default of 16 is what the
            base-model stage produces: 4 base-models x 4 class probabilities.
        num_classes: Number of output classes. The default of 4 matches the
            output size of the base-models (see getModel).
            Pass num_inputs=20, num_classes=5 for the previous sizes.
    """
    def __init__(self, num_inputs=16, num_classes=4):
      super(metaClassifier, self).__init__()
      # Here we define the layers
      self.fc1 = nn.Linear(num_inputs, 32) # The input layer. It outputs 32 features
      self.bn1 = nn.BatchNorm1d(32)
      self.dropout1 = nn.Dropout(0.2)
      self.fc2 = nn.Linear(32, 32) # Takes the 32 outputted from fc1
      self.bn2 = nn.BatchNorm1d(32)
      self.dropout2 = nn.Dropout(0.2)
      self.fc3 = nn.Linear(32, num_classes) # Takes the 32 from fc2, outputs one score per class

    def forward(self, x):
      # Here we define how the data will pass through the layers
      x = self.fc1(x)
      x = self.bn1(x)
      x = self.dropout1(x)
      x = self.fc2(x)
      x = self.bn2(x)
      x = self.dropout2(x)
      out = self.fc3(x)
      return out

In [None]:
def getSampler(subset_train):
  """
  Generates the weighted random sampler using the train subset as input.

  Every example is weighted by 1 / (number of train examples in its class), so
  that all classes are drawn equally often on average.

  Args:
    subset_train: A Subset whose `dataset` has a `targets` list. Its `indices`
      are sorted IN PLACE.

  Returns:
    A WeightedRandomSampler that draws len(subset_train.indices) examples with
    replacement.
  """
  # Sorting the train set indices (in place) and getting the label of each one
  subset_train.indices.sort()
  labels_train = [subset_train.dataset.targets[i] for i in subset_train.indices]

  # Assigning a sampling weight to each class. The weights are keyed by the
  # class label itself: indexing an array of counts by label only works when
  # every class is present and the labels first appear in ascending order.
  class_weight = {label: 1. / count for label, count in Counter(labels_train).items()}

  # Getting/assigning weights for each image (ie. weight for the class that each image belongs to)
  samples_weight = torch.tensor([class_weight[t] for t in labels_train], dtype=torch.double)

  # Defining the sampler
  sampler = WeightedRandomSampler(samples_weight, len(samples_weight), replacement=True)
  return sampler

## ***Base-Models***

In [None]:
class TransformSubset(torch.utils.data.Dataset):
    """Dataset wrapper that applies a transform to the items of another dataset.

    `subset` must yield (image, label) pairs. `transform` is called on the
    image of every pair that is fetched; pass None to leave images untouched.
    """

    def __init__(self, subset, transform):
        self.subset, self.transform = subset, transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, idx):
        sample, target = self.subset[idx]
        if self.transform is None:
            return sample, target
        return self.transform(sample), target

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score  # Example metric for evaluation
from collections import defaultdict

# Data preparation
#
# Runs the repeated k-fold cross-validation of every base-model: each model is
# trained on the real train images of a fold plus their matching synthetic
# images, evaluated on the real test images of the fold, and its predictions
# on the fold are saved as CSV files for the meta-classifier.

# Defining the transformations
# Train images are augmented (random vertical flip and rotation), test images are not
transformations_train = transforms.Compose([
    transforms.CenterCrop(70),  # Crop the center to 70x70
    # transforms.Resize(224),
    transforms.RandomVerticalFlip(0.5),
    # transforms.RandomHorizontalFlip(0.5),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

transformations_test = transforms.Compose([
    transforms.CenterCrop(70),  # Crop the center to 70x70
    # transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Loading the image datasets
# The real images are loaded twice so the same indices can be read with train or test transformations
data_dir_real = '/kaggle/input/sperm-datasets/Stained/HuSHem'
data_dir_syn  = '/kaggle/input/sperm-datasets/Stained/HuSHem_synthetic'
dataset_real_train = ImageFolder(data_dir_real, transform=transformations_train)
dataset_real_test  = ImageFolder(data_dir_real, transform=transformations_test)
# No transform here: the train transformations are applied later through TransformSubset
dataset_syn = ImageFolder(data_dir_syn, transform=None)

# Generate fold sizes and the indices for K-fold CV (since you're not using CSV)
k = 5  # Number of folds for K-fold CV
fold_sizes = foldSizes(dataset_real_train, k)  # Number of images in each fold

# Getting train-test indices for all folds and repeats
seeds = [0, 69, 420]  # The random split is dependent on the seeds chosen. One seed per repeat.
train_test_idx_all = []  # Storage for train-test indices

# synthetic_lookup[class_idx][filename] = synthetic_dataset_index
synthetic_lookup = defaultdict(dict)

for idx, (path, label) in enumerate(dataset_syn.samples):
    fname = os.path.basename(path)
    synthetic_lookup[label][fname] = idx

# Generate indices for each seed and store them
# train_test_idx_all ends up with len(seeds) * k entries, ordered repeat by repeat
for seed in seeds:
    train_test_idx_all = train_test_idx_all + allFoldsTrainTestIndices(seed, dataset_real_train, fold_sizes)

# Defining hyperparameters for each base-model: [number of epochs, learning rate, batch size]
hyperparameters_all = [[100, 1e-4, 32], [100, 1e-4, 32], [100, 1e-4, 32], [100, 1e-4, 32]]
model_names = ['vgg16', 'vgg19', 'resnet', 'densenet']


# Loop over each model together with its hyperparameters
for model_name, model_hyperparameters in zip(model_names, hyperparameters_all):
    metrics = []

    # Loop through train-test splits for each seed and fold
    for idx, train_test_idx in enumerate(train_test_idx_all):
        # Position of this split within the repeated CV (k folds per repeat)
        repeat_idx, fold_idx = divmod(idx, k)

        # -----------------------
        # REAL TRAIN / TEST
        # -----------------------
        real_train_idx = train_test_idx[0]
        real_test_idx  = train_test_idx[1]

        # -----------------------
        # MATCHED SYNTHETIC TRAIN
        # -----------------------
        # Only synthetic images with the same class and filename as a real
        # TRAIN image are added, so nothing matching a test image is trained on
        syn_train_idx = []

        for real_idx in real_train_idx:
            real_path, real_label = dataset_real_train.samples[real_idx]
            fname = os.path.basename(real_path)

            if fname in synthetic_lookup[real_label]:
                syn_idx = synthetic_lookup[real_label][fname]
                syn_train_idx.append(syn_idx)

        # -----------------------
        # FINAL TRAIN DATASET
        # -----------------------
        ds_train_real = Subset(dataset_real_train, real_train_idx)
        ds_train_syn_raw = Subset(dataset_syn, syn_train_idx)

        ds_train_syn = TransformSubset(
            ds_train_syn_raw,
            transform=transformations_train
        )

        ds_train = torch.utils.data.ConcatDataset([
            ds_train_real,
            ds_train_syn
        ])

        # ds_train = Subset(dataset_real_train, real_train_idx)

        # -----------------------
        # TEST DATASET (REAL ONLY)
        # -----------------------
        ds_test = Subset(dataset_real_test, real_test_idx)

        dl_train = DataLoader(ds_train, batch_size=model_hyperparameters[2], shuffle=True)
        # The whole test set is evaluated as a single batch
        dl_test = DataLoader(ds_test, batch_size=len(ds_test), shuffle=False)

        print(f"Fold {idx}: real train = {len(real_train_idx)}, synthetic added = {len(syn_train_idx)}")

        # Get the model
        model = getModel(model_name)

        # Training
        best_acc, best_re, best_pr, best_f1 = trainBaseModel(model=model,
                       num_epochs=model_hyperparameters[0],
                       learn_rate=model_hyperparameters[1],
                       dl_train=dl_train,
                       dl_test=dl_test)

        # The sampler seed is the seed of the repeat this split belongs to.
        # It depends only on the split, so every base-model draws the train
        # images of a split in the same order.
        seed = seeds[repeat_idx]

        # Generate predictions using the trained model
        df_preds_train, df_preds_test = generateCNNPredictions(data_path=data_dir_real,
                                                              transformations=transformations_test,
                                                              train_test_idx=train_test_idx,
                                                              batch_size=model_hyperparameters[2],
                                                              seed=seed)  # Use the corresponding seed

        # Save predictions using the save_name_generator function, passing the cv_config for this run
        cv_config = {"model": model_name, "repeat": repeat_idx, "fold": fold_idx}  # Create the config for this fold
        train_path, test_path = save_name_generator(cv_config)  # Pass the config here

        # Save predictions
        df_preds_train.to_csv(train_path, index=False)
        df_preds_test.to_csv(test_path, index=False)

        metrics.append((best_acc, best_re, best_pr, best_f1))

        # Print progress
        print(f"Completed training and prediction for {model_name}, fold {idx + 1}, accuracy: {best_acc}")

    # Summary of this model over all repeats and folds (sample standard deviation)
    metrics = np.array(metrics)

    mean_acc = metrics[:, 0].mean()
    std_acc  = metrics[:, 0].std(ddof=1)

    mean_re  = metrics[:, 1].mean()
    std_re   = metrics[:, 1].std(ddof=1)

    mean_pr  = metrics[:, 2].mean()
    std_pr   = metrics[:, 2].std(ddof=1)

    mean_f1  = metrics[:, 3].mean()
    std_f1   = metrics[:, 3].std(ddof=1)

    print(f"Accuracy : {mean_acc*100:.2f} ± {std_acc*100:.2f}")
    print(f"Recall   : {mean_re*100:.2f} ± {std_re*100:.2f}")
    print(f"Precision: {mean_pr*100:.2f} ± {std_pr*100:.2f}")
    print(f"F1-score : {mean_f1*100:.2f} ± {std_f1*100:.2f}")

## ***Meta-Classifier***

In [None]:
import os
import pandas as pd

# =========================
# Meta-classifier data preparation
# =========================
# For every repeat and fold, the prediction files of all base-models are put
# side by side (one block of class-probability columns per model) and saved
# as the train / test inputs of the meta-classifier.

# Defining the path variables
repeats = ['repeat 0/', 'repeat 1/', 'repeat 2/']
# repeats = ['repeat 0/']
folds = ['fold 0/', 'fold 1/', 'fold 2/', 'fold 3/', 'fold 4/']
model_names = ['vgg16', 'vgg19', 'resnet', 'densenet']
path_base = '/kaggle/working/'

# Number of class-probability columns in each base-model prediction file.
# generateCNNPredictions writes the columns 'labels', '0', '1', '2', '3', ie.
# four classes; expecting five here made the column check below always fail.
num_classes = 4

# Expected column names for final meta-classifier input
col_names = ['labels'] + [
    f'{name}_{i}' for name in model_names for i in range(num_classes)
]

# =========================
# Iterating through each CV config
# =========================

for repeat in repeats:
    for fold in folds:

        train_list, test_list = [], []

        # -------------------------
        # Load and rename model predictions
        # -------------------------
        for model_name in model_names:

            train_file = f"{path_base}{repeat}{fold}{model_name}_train.csv"
            test_file = f"{path_base}{repeat}{fold}{model_name}_test.csv"

            if not (os.path.exists(train_file) and os.path.exists(test_file)):
                print(f"Warning: Missing files for {model_name} in {repeat}{fold}. Skipping.")
                continue

            df_train = pd.read_csv(train_file)
            df_test = pd.read_csv(test_file)

            # Rename probability columns to avoid collisions
            prob_cols = [c for c in df_train.columns if c != 'labels']

            df_train = df_train.rename(
                columns={c: f"{model_name}_{c}" for c in prob_cols}
            )
            df_test = df_test.rename(
                columns={c: f"{model_name}_{c}" for c in prob_cols}
            )

            train_list.append(df_train)
            test_list.append(df_test)

        # Skip if no data loaded
        if len(train_list) == 0 or len(test_list) == 0:
            print(f"Skipping {repeat}{fold} due to missing files.")
            continue

        # -------------------------
        # Merge predictions from different models
        # -------------------------
        # The frames are joined on the row index, which relies on every model
        # having written its rows in the same order. The labels are taken from
        # the first model only.
        mc_train = train_list[0]
        mc_test = test_list[0]

        for i in range(1, len(train_list)):
            mc_train = mc_train.merge(
                train_list[i].drop(columns=['labels']),
                left_index=True,
                right_index=True
            )
            mc_test = mc_test.merge(
                test_list[i].drop(columns=['labels']),
                left_index=True,
                right_index=True
            )

        # -------------------------
        # Final sanity checks
        # -------------------------
        # This also catches a fold for which only some of the models have files
        if mc_train.shape[1] != len(col_names):
            raise ValueError(
                f"Column mismatch in train: {mc_train.shape[1]} vs {len(col_names)}"
            )
        if mc_test.shape[1] != len(col_names):
            raise ValueError(
                f"Column mismatch in test: {mc_test.shape[1]} vs {len(col_names)}"
            )

        # -------------------------
        # Assign final column names
        # -------------------------
        mc_train.columns = col_names
        mc_test.columns = col_names

        # -------------------------
        # Save final meta-classifier inputs
        # -------------------------
        mc_train.to_csv(f"{path_base}{repeat}{fold}mc_train_inputs.csv", index=False)
        mc_test.to_csv(f"{path_base}{repeat}{fold}mc_test_inputs.csv", index=False)

        print(f"Saved meta-classifier inputs for {repeat}{fold}")

In [None]:
# Training the meta-classifier
#
# For every repeat and fold: load the prepared inputs, fit a fresh
# meta-classifier and save its predictions on the train and test sets.

# Path variables (the predictions are saved next to the inputs)
repeats = [f'repeat {i}/' for i in range(3)]
# repeats = ['repeat 0/']
folds = [f'fold {i}/' for i in range(5)]
path_base = '/kaggle/working/'

# Meta-classifier hyperparameters
num_epochs = 200
learn_rate = 7.801e-2
batch_size = 47
momentum = 0.9855
weight_decay = 5.526e-2

# Unlike the base-model loop, this loop does not resume from the most recent
#   fit: every run starts again from the first repeat and fold.
#   As the meta-classifier trains very quickly, this should be fine.

for repeat in repeats:
  for fold in folds:
    fold_dir = path_base + repeat + fold

    # Inputs of this fold as tensor datasets, wrapped in shuffling dataloaders
    ds_train = MyDataset(fold_dir + 'mc_train_inputs.csv')
    ds_test = MyDataset(fold_dir + 'mc_test_inputs.csv')
    dl_train = DataLoader(ds_train, batch_size=batch_size, shuffle=True)
    dl_test = DataLoader(ds_test, batch_size=batch_size, shuffle=True)

    # A new, untrained model for every fold
    model = metaClassifier()

    # Fit it (the best model is saved by the training function)
    trainMetaClassifier(model, num_epochs, learn_rate, momentum, weight_decay,
                        dl_train, dl_test)

    # Predictions of the saved model on both sets
    df_preds_train, df_preds_test = generateMCPredictions(dl_train, dl_test, batch_size)

    # Saving predictions
    train_path = fold_dir + 'mc_train_outputs.csv'
    test_path = fold_dir + 'mc_test_outputs.csv'
    df_preds_train.to_csv(train_path, index=False)
    df_preds_test.to_csv(test_path, index=False)

In [None]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
import numpy as np
import pandas as pd

# Summarising the meta-classifier: its metrics are computed for every repeat
# and fold from the saved outputs, then reported as mean ± std.
all_model_metrics_train = []
all_model_metrics_test  = []

repeats = ['repeat 0/', 'repeat 1/', 'repeat 2/']
folds   = ['fold 0/', 'fold 1/', 'fold 2/', 'fold 3/', 'fold 4/']
path_base = '/kaggle/working/'

# The loop variables are deliberately not called `r` and `k`: `k` is the global
# number of folds used by the base-model cell, and overwriting it with a string
# breaks that cell when it is run again.
for repeat in repeats:
    for fold in folds:

        mc_data_train = pd.read_csv(path_base + repeat + fold + 'mc_train_outputs.csv')
        mc_data_test  = pd.read_csv(path_base + repeat + fold + 'mc_test_outputs.csv')

        y_train = mc_data_train["labels"]
        y_test  = mc_data_test["labels"]

        # Argmax over class probabilities
        # (the probability columns are named after the class index, so the name
        # of the largest column, converted to int, is the predicted class)
        y_pred_train = mc_data_train.iloc[:, 1:].idxmax(axis=1).astype(int)
        y_pred_test  = mc_data_test.iloc[:, 1:].idxmax(axis=1).astype(int)

        # Train metrics
        all_model_metrics_train.append([
            f1_score(y_train, y_pred_train, average="macro"),
            recall_score(y_train, y_pred_train, average="macro"),
            precision_score(y_train, y_pred_train, average="macro"),
            accuracy_score(y_train, y_pred_train)
        ])

        # Test metrics
        all_model_metrics_test.append([
            f1_score(y_test, y_pred_test, average="macro"),
            recall_score(y_test, y_pred_test, average="macro"),
            precision_score(y_test, y_pred_test, average="macro"),
            accuracy_score(y_test, y_pred_test)
        ])

# Convert to DataFrames (one row per repeat/fold, one column per metric)
col_names = ['f1-score', 'recall', 'precision', 'accuracy']

df_train = pd.DataFrame(all_model_metrics_train, columns=col_names)
df_test  = pd.DataFrame(all_model_metrics_test,  columns=col_names)

# Mean ± std (sample std)
mean_train = df_train.mean()
std_train  = df_train.std(ddof=1)

mean_test = df_test.mean()
std_test  = df_test.std(ddof=1)

# Format as percentages
avg_metrics_train = pd.DataFrame({
    col: [f"{mean_train[col]*100:.2f} ± {std_train[col]*100:.2f}"]
    for col in col_names
})

avg_metrics_test = pd.DataFrame({
    col: [f"{mean_test[col]*100:.2f} ± {std_test[col]*100:.2f}"]
    for col in col_names
})

print("Train (mean ± std, %):")
display(avg_metrics_train)

print("Test (mean ± std, %):")
display(avg_metrics_test)