## Mount Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Install Requirements

In [None]:
!pip install torchaudio torch torchvision scikit-learn pandas matplotlib openpyxl

Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl (121.6 MB)
Collecting nvidia-curand-cu12==10.3.2.106 (from torch)
  Using cached nvidia_curand_cu12-10.3.2.106-py3-none-manylinux1_x86_64.whl (56.5 MB)
Collectin

## Data Processing

In [None]:
import argparse
import csv
import os
import shutil
import glob

import numpy as np

# Column names
colNames = ['coltime', 'gsr', 'ecg', 'emg_trapezius', 'emg_corrugator', 'emg_zygomaticus']

# Label dicts
pain_labels = {'BL1': 0,'PA1': 1,'PA2': 2,'PA3': 3,'PA4': 4}

def process_bioVid(data_dir, output_dir):
    """
    :param output_dir: processed data directory
    :param data_dir: data directory
    :return: Save BioVid csv files to npz
    """
    # data_dir_files contain 87 subjects' pain signals,
    # Each subject has 5*20 (pain intensity level: 5; replicates: 20) samples
    data_dir_files = os.listdir(data_dir)
    i = 0
    # Dimension of files: 5*20
    for files in data_dir_files:
        parent_path = os.path.join(data_dir, files)
        path = glob.glob(parent_path + '/*.csv')

        labels_per = []
        EDA_signals_per = []
        ECG_signals_per = []
        EMG_signals_per = []
        # One single csv file per iteration included one pain intensity level and its signals
        for csv_file in path:
            key = csv_file.split('-')[-2]
            label = pain_labels[key]
            EDA_signal = []
            ECG_signal = []
            EMG_signal = []
            with open(csv_file, newline = '') as f:
                next(f)
                reader = csv.reader(f, delimiter = '\t')
                for row in reader:
                    values = [float(num) for num in row]
                    EDA_signal.append(values[1])
                    ECG_signal.append(values[2])
                    EMG_signal.append(values[3])
            EDA_signals_per.append(EDA_signal)
            ECG_signals_per.append(ECG_signal)
            EMG_signals_per.append(EMG_signal)
            labels_per.append(label)
        x_eda = np.asarray(EDA_signals_per)
        x_ecg = np.asarray(ECG_signals_per)
        x_emg = np.asarray(EMG_signals_per)
        y = np.asarray(labels_per)
        print('Data Processed: {}/{}'.format(i+1, len(data_dir_files)))
        print('X_eda, X_ecg, X_emg, y shape:{},{},{}, {}'.format(x_eda.shape,x_ecg.shape,x_emg.shape, y.shape))

        filename = os.path.join(output_dir, data_dir_files[i] + '.npz')
        signals_dict = {"x_eda": x_eda,"x_ecg": x_ecg,"x_emg": x_emg,"y": y}
        np.savez(filename, **signals_dict)
        file_stats = os.stat(filename)
        print(f'File Size - {file_stats.st_size / (1024 * 1024)} MB')
        i += 1

    print("\n===================Done===================\n")


# if __name__ == '__main__':
#     input_dir = '/content/drive/MyDrive/BM5020/PartA/biosignals_filtered'
#     output_dir = 'processed_bioVid_partA'
#     if not os.path.exists(output_dir):
#         os.makedirs(output_dir)
#     else:
#         shutil.rmtree(output_dir)
#         os.makedirs(output_dir)
#     process_bioVid(input_dir,output_dir)

## Utility Functions

In [None]:
## Utils.py

import glob
import os

import numpy as np
import torch

from torch.utils.data import Dataset, DataLoader

import math
from pathlib import Path
import json
from collections import OrderedDict
from itertools import repeat
import pandas as pd

# Label dicts
pain_labels = {
    'BL1': 0,
    'PA1': 1,
    'PA2': 2,
    'PA3': 3,
    'PA4': 4
}

# Multiclass type
class_types_dict = {
    '0 vs 1 vs 2 vs 3 vs 4': 0,
    '0 vs 4': 1,
    '0 vs 1, 2, 3, 4': 2,
    '0, 1 vs 3, 4': 3,
    '0 vs 3, 4': 4,
    '0, 1 vs 4': 5,
    '0, 1 vs 3': 6,
    '0 vs 1, 2 vs 3, 4': 7
}


def generate_kfolds_index(npz_dir, k_folds) -> dict[int: list[str]]:
    """
    Generate k-folds dataset index and store into a dictionary. The length of dictionary is equal to the number of
    folds. Each element contains training set and testing set.
    :param npz_dir: npz files directory
    :param k_folds: the number of folds
    :return: a dict contains k-folds dataset paths, e.g. dict{0: [list[str(train_dir)], list[str(test_dir)]]..., k:[...]}
    """

    if os.path.exists(npz_dir):
        print('================= Creating KFolds Index =================')
    else:
        print('================= Data directory does not exist =================')

    npz_files = glob.glob(os.path.join(npz_dir, '*.npz'))
    npz_files = np.asarray(npz_files)
    kfolds_names = np.array_split(npz_files, k_folds)
    # print(kfolds_names)
    kfolds_index = {}
    for fold_index in range(0, k_folds):
        test_data = kfolds_names[fold_index].tolist()
        train_data = [files for i, files in enumerate(kfolds_names) if i != fold_index]
        train_data = [files for subfiles in train_data for files in subfiles]
        kfolds_index[fold_index] = [train_data, test_data]
    print('================= {} folds dataset created ================='.format(k_folds))
    return kfolds_index


class BioVidLoader(Dataset):
    """
    Input: a list of npz files' directories from k-folds index
    Output: a tensor of values and labels
    """
    def __init__(self, npz_files, label_converter, signal_types):
        super(BioVidLoader, self).__init__()

        if len(signal_types) == 1:
          # Load first npz file which is easy to handle for the rest
          x_values = np.load(npz_files[0])[f'x_{signal_types[0]}']

          # Load npz files starting from position 1
          for file in npz_files[1:]:
            x_values = np.vstack((x_values, np.load(file)[f'x_{signal_types[0]}']))
        else:
          # Load first npz file which is easy to handle for the rest
          x_values = np.load(npz_files[0])[f'x_{signal_types[0]}']
          for signal in signal_types[1:]:
            x_values = np.hstack((x_values, np.load(npz_files[0])[f'x_{signal}']))

          # Load npz files starting from position 1
          for file in npz_files[1:]:
            x_values_new = np.load(file)[f'x_{signal_types[0]}']
            for signal in signal_types[1:]:
              x_values_new = np.hstack((x_values_new, np.load(file)[f'x_{signal}']))
            x_values = np.vstack((x_values, x_values_new))


        # Load first npz file which is easy to handle for the rest
        y_labels = np.load(npz_files[0])['y']
        # Load npz files starting from position 1
        for file in npz_files[1:]:
          y_labels = np.append(y_labels, np.load(file)['y'])

        # Convert the original labels to the task labels, e.g. 4 --> 1
        y_labels = np.array([label_converter[str(label)] for label in y_labels])
        # Create masking indices, -1 as False
        mask = (y_labels != -1)

        # Remove all the values with False
        x_values = x_values[mask]
        y_labels = y_labels[mask]

        self.val = torch.from_numpy(x_values).float()
        self.lbl = torch.from_numpy(y_labels).long()

        # Change shape to (Batch size, Channel size, Length)
        self.val = self.val.unsqueeze(1)

    def __len__(self):
        return self.val.shape[0]

    def __getitem__(self, idx):
        return self.val[idx], self.lbl[idx]

    def __repr__(self):
        return '{}'.format(repr(self.val))

    def __str__(self):
        # BioVid: torch.Size([3440, 1, 2816]), torch.Size([3440])
        return 'The shape of values and labels: {}, {}'.format(self.val.shape, self.lbl.shape)


def load_data(train_set, valid_set,label_converter,batch_size,signal_types, num_workers = 0) -> tuple[DataLoader, DataLoader, list[int]]:
    """
    generate dataloader for both training dataset and validation dataset from one of the k-folds.
    :param train_set: training dataset
    :param valid_set: validation dataset
    :param label_converter: convert the original labels to the desired labels
    :param batch_size: batch size
    :param num_workers: 4*GPU
    :return: dataloader for training dataset, validation dataset, the number of samples for each class,
        e.g. two classes -> list[int,int]
    """
    train_dataset = BioVidLoader(train_set, label_converter, signal_types)
    valid_dataset = BioVidLoader(valid_set, label_converter, signal_types)

    cat_y = torch.cat((train_dataset.lbl, valid_dataset.lbl))

    # dist = [cat_y.count(i) for i in range(n_classes)]

    # e.g. two classes (tensor(list[lbl, lbl]), tensor(list[int, int]))
    unique_counts = cat_y.unique(return_counts = True)
    # number of samples for each class -> list[int, int]
    dist = unique_counts[1].tolist()

    # n_classes = len(unique_counts[0])

    # percent = [pop / sum(dist) for pop in dist]

    train_loader = DataLoader(train_dataset,
                              num_workers = num_workers,
                              batch_size = batch_size,
                              shuffle = True,
                              drop_last = False,
                              pin_memory = True)

    valid_loader = DataLoader(valid_dataset,
                              num_workers = num_workers,
                              batch_size = batch_size,
                              shuffle = False,
                              drop_last = False,
                              pin_memory = True)
    return train_loader, valid_loader, dist

## Metrics

In [None]:
"""
metrics_manager.py

This module contains the implementation of the metrics manager, and other metrics related
functions.
"""

import pandas as pd
import torch
import numpy as np
import os

from sklearn.metrics import f1_score, classification_report, cohen_kappa_score, confusion_matrix, accuracy_score


def accuracy(output, target):
    with torch.no_grad():
        pred = torch.argmax(output, dim=1)
        assert pred.shape[0] == len(target)
        correct = 0
        correct += torch.sum((pred == target).int()).item()
    return correct / len(target)


def f1(output, target):
    with torch.no_grad():
        pred = torch.argmax(output, dim=1)
        assert pred.shape[0] == len(target)
    return f1_score(pred.cpu().numpy(), target.data.cpu().numpy(), average='macro')


def _calc_metrics(checkpoint_dir,name, fold_id=None):

    n_folds = 87
    all_outs = []
    all_trgs = []

    outs_list = []
    trgs_list = []
    save_dir = os.path.abspath(os.path.join(checkpoint_dir, os.pardir))
    for root, dirs, files in os.walk(save_dir):
        for file in files:
            if "outs" in file:
                outs_list.append(os.path.join(root, file))
            if "trgs" in file:
                trgs_list.append(os.path.join(root, file))

    if fold_id is not None:
        outs = np.load(outs_list[fold_id])
        trgs = np.load(trgs_list[fold_id])
        all_outs.extend(outs)
        all_trgs.extend(trgs)
        save_dir = os.path.abspath(os.path.join(checkpoint_dir))
    elif len(outs_list) == n_folds:
        for i in range(len(outs_list)):
            outs = np.load(outs_list[i])
            trgs = np.load(trgs_list[i])
            all_outs.extend(outs)
            all_trgs.extend(trgs)

    all_trgs = np.array(all_trgs).astype(int)
    all_outs = np.array(all_outs).astype(int)

    r = classification_report(all_trgs, all_outs, digits = 6, output_dict = True)
    cm = confusion_matrix(all_trgs, all_outs)
    df = pd.DataFrame(r)
    df["cohen"] = cohen_kappa_score(all_trgs, all_outs)
    df["accuracy"] = accuracy_score(all_trgs, all_outs)
    df = df * 100
    if fold_id is not None:
      file_name = name + f"_classification_report_fold{fold_id}.xlsx"
    else:
      file_name = name + f"_classification_report_all_fold.xlsx"
    report_save_path = os.path.join(save_dir, file_name)
    df.to_excel(report_save_path)

    if fold_id is not None:
      cm_file_name = name + f"_confusion_matrix_fold{fold_id}.torch"
    else:
      cm_file_name = name + f"_confusion_matrix_all_fold.torch"
    cm_Save_path = os.path.join(save_dir, cm_file_name)
    torch.save(cm, cm_Save_path)


class MetricTracker:
    def __init__(self, *keys, writer=None):
        self.writer = writer
        self._data = pd.DataFrame(index=keys, columns=['total', 'counts', 'average'])
        self.reset()

    def reset(self):
        for col in self._data.columns:
            self._data[col].values[:] = 0

    def update(self, key, value, n=1):
        if self.writer is not None:
            self.writer.add_scalar(key, value)
        self._data.total[key] += value * n
        self._data.counts[key] += n
        self._data.average[key] = self._data.total[key] / self._data.counts[key]

    def avg(self, key):
        return self._data.average[key]

    def result(self):
        return dict(self._data.average)

## Main Trainer

In [None]:
## Main_trainer.py

import numpy as np
import torch
from numpy import inf

selected_d = {"outs": [], "trg": []}

def _save_checkpoint(model, optimizer, epoch, mnt_best, checkpoint_dir,fold_id, save_best = True):
    """
    Saving checkpoints

    :param model: model to be saved
    :param optimizer: optimizer to be saved
    :param epoch: current epoch number
    :param mnt_best: metric used to monitor the best model
    :param checkpoint_dir: directory where the checkpoint will be saved
    :param save_best: if True, rename the saved checkpoint to 'model_best.pth'
    """
    arch = type(model).__name__
    state = {
        'arch': arch,
        'epoch': epoch,
        'state_dict': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'monitor_best': mnt_best
    }
    filename = str('/content/saved/checkpoint-epoch{}-fold{}.pth'.format(epoch,fold_id))
    torch.save(state, filename)
    print("Saving checkpoint: {} ...".format(filename))
    if save_best:
        best_path = str(f'/content/saved/model_best_fold{fold_id}.pth')
        torch.save(state, best_path)
        print("Saving current best: model_best.pth ...")

def _prepare_device(n_gpu_use):
    """
    Setup GPU
    """
    n_gpu = torch.cuda.device_count()
    # n_gpu_use = min(n_gpu, n_gpu_use)
    if n_gpu_use > 0 and n_gpu == 0:
        print("Warning: There\'s no GPU available on this machine," "training will be performed on CPU.")
        n_gpu_use = 0
    if n_gpu_use > n_gpu:
        print("Warning: The number of GPU\'s configured to use is {}, but only {} are available on this machine.".format(n_gpu_use, n_gpu))
        n_gpu_use = n_gpu
    device = torch.device('cuda:0' if n_gpu_use > 0 else 'cpu')
    list_ids = list(range(n_gpu_use))
    return device, list_ids


class Trainer:
    def __init__(self, model, loss, optimizer, data_loader, fold_id, exp_name,valid_data_loader=None):

        # Basic configuration
        self.fold_id = fold_id
        self.device, device_ids = _prepare_device(1)
        self.exp_name = exp_name

        # Save checkpoint directory
        self.checkpoint_dir = '/content/saved'

        # Prepare model
        self.model = model.to(self.device)
        if len(device_ids) > 1:
            self.model = torch.nn.DataParallel(model, device_ids=device_ids)
        self.loss = loss
        self.metric_ftns = [eval(metric) for metric in ["accuracy"]]
        self.optimizer = optimizer

        # Trainer configurations
        self.epochs = 100
        self.save_period = 30
        self.monitor = "min val_loss"
        self.start_epoch = 1

        # Monitoring and early stopping setup
        self._setup_monitoring()

        # DataLoader and related attributes
        self.data_loader = data_loader
        self.len_epoch = len(self.data_loader)
        self.valid_data_loader = valid_data_loader
        self.do_validation = self.valid_data_loader is not None
        self.log_step = int(data_loader.batch_size) * 1

        # Metrics
        self.train_metrics = MetricTracker('loss', *[m.__name__ for m in self.metric_ftns])
        self.valid_metrics = MetricTracker('loss', *[m.__name__ for m in self.metric_ftns])

        self.curr_best = 0

    def _setup_monitoring(self):
        """
        Helper function to setup monitoring configuration
        """
        if self.monitor == 'off':
            self.mnt_mode = 'off'
            self.mnt_best = 0
        else:
            self.mnt_mode, self.mnt_metric = self.monitor.split()
            assert self.mnt_mode in ['min', 'max']
            self.mnt_best = inf if self.mnt_mode == 'min' else -inf
            self.early_stop = inf

    def train(self):
        """
        Training loops
        """
        not_improved_count = 0
        all_outs = []
        all_trgs = []

        for epoch in range(self.start_epoch, self.epochs + 1):
            result, epoch_outs, epoch_trgs = self._train_epoch(epoch, self.epochs)

            # Save logged informations into log dict
            log = {'epoch': epoch}
            log.update(result)
            all_outs.extend(epoch_outs)
            all_trgs.extend(epoch_trgs)
            # Print logged information to the screen
            for key, value in log.items():
                # 15s is the width of the column
                # key is the name of the metric
                print('    {:15s}: {}'.format(str(key), value))

            # Evaluate model performance according to configured metric, save_best checkpoint as model_best
            best = False #True
            if self.mnt_mode != 'off':
                try:
                    # Check whether model performance improved or not, according to specified metric(mnt_metric)
                    improved = (self.mnt_mode == 'min' and log[self.mnt_metric] <= self.mnt_best) or \
                               (self.mnt_mode == 'max' and log[self.mnt_metric] >= self.mnt_best)
                except KeyError:
                    print("Warning: Metric '{}' is not found. Model performance monitoring is disabled.".format(self.mnt_metric))
                    self.mnt_mode = 'off'
                    improved = False

                if improved:
                    self.mnt_best = log[self.mnt_metric]
                    not_improved_count = 0
                    best = True
                else:
                    not_improved_count += 1

                if not_improved_count > self.early_stop:
                    print("Validation performance didn\'t improve for {} epochs. Training stops.".format(self.early_stop))
                    break

            if epoch % self.save_period == 0:
                _save_checkpoint(self.model, self.optimizer, epoch, self.mnt_best,
                                 self.checkpoint_dir,self.fold_id, save_best = True)

        outs_name = "outs_" + str(self.fold_id)
        trgs_name = "trgs_" + str(self.fold_id)
        np.save(f'/content/saved/{outs_name}', all_outs)
        np.save(f'/content/saved/{trgs_name}', all_trgs)

        # Save the metrics for the last fold
        _calc_metrics('/content/saved',self.exp_name, fold_id=self.fold_id)

        # Save the metrics for the entire training
        if self.fold_id == 87 - 1:
            _calc_metrics(self.checkpoint_dir,self.exp_name)

    def _train_epoch(self, epoch, total_epochs):
        """
        Training logic for an epoch

        :param epoch: Integer, current training epoch.
               total_epochs: Integer, the total number of epoch
        :return: A log that contains average loss and metric in this epoch.
        """
        self.model.train()
        self.train_metrics.reset()
        overall_outs = []
        overall_trgs = []
        for batch_idx, (data, target) in enumerate(self.data_loader):
            data, target = data.to(self.device), target.to(self.device)

            self.optimizer.zero_grad()
            output = self.model(data)

            loss = self.loss(output, target)

            loss.backward()
            self.optimizer.step()

            self.train_metrics.update('loss', loss.item())
            for met in self.metric_ftns:
                self.train_metrics.update(met.__name__, met(output, target))

            # When log_step == batch_size, log only shows for one entire batch
            # print('Train Epoch: {} {} Loss: {:.6f} '.format(epoch,self._progress(batch_idx),loss.item(),))
            if batch_idx % self.log_step == 0:
              print('Train Epoch: {}'.format(epoch,))

            if batch_idx == self.len_epoch:
                break

        log = self.train_metrics.result()

        if self.do_validation:
            val_log, outs, trgs = self._valid_epoch(epoch)
            log.update(**{'val_' + k: v for k, v in val_log.items()})
            if val_log["accuracy"] > self.curr_best:
                self.curr_best = val_log["accuracy"]
                selected_d["outs"] = outs
                selected_d["trg"] = trgs
            if epoch == total_epochs:
                overall_outs.extend(selected_d["outs"])
                overall_trgs.extend(selected_d["trg"])

            # THIS part is to reduce the learning rate after 10 epochs to 1e-4
            if epoch == 10:
                for g in self.optimizer.param_groups:
                    g['lr'] = 1e-4

        return log, overall_outs, overall_trgs

    def _valid_epoch(self, epoch):
        """
        One validation epoch after each training epoch

        :param epoch: Integer, current training epoch.
        :return: A log that contains information about validation
        """
        self.model.eval()
        self.valid_metrics.reset()
        with torch.no_grad():
            outs = np.array([])
            trgs = np.array([])
            for batch_idx, (data, target) in enumerate(self.valid_data_loader):
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                loss = self.loss(output, target)

                self.valid_metrics.update('loss', loss.item())
                for met in self.metric_ftns:
                    self.valid_metrics.update(met.__name__, met(output, target))

                preds_ = output.data.max(1, keepdim=True)[1].cpu()

                outs = np.append(outs, preds_.cpu().numpy())
                trgs = np.append(trgs, target.data.cpu().numpy())
        return self.valid_metrics.result(), outs, trgs


    def _progress(self, batch_idx):
        base = '[{}/{} ({:.0f}%)]'
        if hasattr(self.data_loader, 'n_samples'):
            current = batch_idx * self.data_loader.batch_size
            total = self.data_loader.n_samples
        else:
            current = batch_idx
            total = self.len_epoch
        return base.format(current, total, 100.0 * current / total)

## Model

In [None]:
"""
module_module_mscn.py

This module contains the implementation of the Multiscale Convolutional Network (MSCN) architecture.
It provides the MSCN class.
"""
import torch
import torch.nn as nn


class MSCN(nn.Module):
    def __init__(self, modality):
        super(MSCN, self).__init__()
        dropout = 0.5
        self.modality = modality

        self.short_scale = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=50, stride=6, dilation=1, bias=False, padding=24),
            nn.BatchNorm1d(64),
            nn.GELU(),
            nn.MaxPool1d(kernel_size=8, stride=2, padding=4),
            nn.Dropout(dropout),

            nn.Conv1d(64, 128, kernel_size=8, stride=1, bias=False, padding=4),
            nn.BatchNorm1d(128),
            nn.GELU(),

            nn.Conv1d(128, 64, kernel_size=8, stride=1, bias=False, padding=4),
            nn.BatchNorm1d(64),
            nn.GELU(),

            nn.MaxPool1d(kernel_size=4, stride=4, padding=2)
        )

        self.medium_scale = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=512, stride=42, dilation=1, bias=False, padding=256),
            nn.BatchNorm1d(64),
            nn.GELU(),
            nn.MaxPool1d(kernel_size=4, stride=4, padding=0),
            nn.Dropout(dropout),

            nn.Conv1d(64, 128, kernel_size=4, stride=1, bias=False, padding=3),
            nn.BatchNorm1d(128),
            nn.GELU(),

            nn.Conv1d(128, 64, kernel_size=4, stride=1, bias=False, padding=3),
            nn.BatchNorm1d(64),
            nn.GELU(),

            nn.MaxPool1d(kernel_size=2, stride=2, padding=1)
        )

        self.long_scale = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=1024, stride=84, dilation=1, bias=False, padding=512),
            nn.BatchNorm1d(64),
            nn.GELU(),
            nn.MaxPool1d(kernel_size=8, stride=8, padding=0),
            nn.Dropout(dropout),

            nn.Conv1d(64, 128, kernel_size=7, stride=1, bias=False, padding=3),
            nn.BatchNorm1d(128),
            nn.GELU(),

            nn.Conv1d(128, 64, kernel_size=7, stride=1, bias=False, padding=3),
            nn.BatchNorm1d(64),
            nn.GELU(),

            nn.MaxPool1d(kernel_size=2, stride=2, padding=1)
        )

        self.dropout = nn.Dropout(dropout)

        if self.modality == 1:
          self.fc_short = nn.Linear(60,75)
          self.fc_medium = nn.Linear(12,75)
          self.fc_long = nn.Linear(3,75)
        elif self.modality == 2:
          self.fc_short = nn.Linear(119,75)
          self.fc_medium = nn.Linear(20,75)
          self.fc_long = nn.Linear(5,75)
        elif self.modality == 3:
          self.fc_short = nn.Linear(177,75)
          self.fc_medium = nn.Linear(29,75)
          self.fc_long = nn.Linear(7,75)

    def forward(self, x):
        x_short = self.short_scale(x)
        # print(x_short.shape[2])
        x_medium = self.medium_scale(x)
        # print(x_medium.shape[2])
        x_long = self.long_scale(x)
        # print(x_long.shape[2])

        x_short = self.fc_short(x_short)
        x_medium = self.fc_medium(x_medium)
        x_long = self.fc_long(x_long)

        x_concat = torch.cat((x_short, x_medium, x_long), dim=1)

        x_concat = self.dropout(x_concat)

        return x_concat

In [None]:
"""
module_se_resnet.py

This module contains the implementation of the SEResNet architecture.
"""
import torch.nn as nn


class SENet(nn.Module):
    """
    Squeeze-and-Excitation block for channel-wise attention.
    """
    def __init__(self, channel, reduction=16):
        super(SENet, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1)
        return x * y.expand_as(x)


class SEBasicBlock(nn.Module):
    """
    Basic building block for squeeze-and-excitation networks with other layers.
    """
    expansion = 1

    def __init__(self, input_channels, output_channels, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None,
                 *, reduction=16):
        super(SEBasicBlock, self).__init__()
        self.conv1 = nn.Conv1d(input_channels, output_channels, stride)
        self.bn1 = nn.BatchNorm1d(output_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv1d(output_channels, output_channels, 1)
        self.bn2 = nn.BatchNorm1d(output_channels)
        self.se = SENet(output_channels, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        # First convolutional layer
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        # Second convolutional layer
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.se(out)

        # Downsample if necessary
        if self.downsample is not None:
            residual = self.downsample(x)

        # Add residual connection and apply ReLU
        out += residual
        out = self.relu(out)

        return out


class SEResNet(nn.Module):
    """
    Residual network with squeeze-and-excitation blocks

    Downsampling is performed by conv1 when stride != 1 or
    the input_channels size is not equal to the output size.
    """
    def __init__(self, output_channels, block_size):
        super(SEResNet, self).__init__()
        self.input_channels = 192
        self.block = SEBasicBlock
        self.layer = self._make_layer(self.block, output_channels, block_size)

    # Create a layer with 'blocks' number of SEBasicBlock instances
    def _make_layer(self, block, output_channels, blocks, stride = 1):
        downsample = self._downsample_layer(self.input_channels, output_channels * block.expansion, stride)

        layers = [block(self.input_channels, output_channels, stride, downsample)]
        self.input_channels = output_channels * block.expansion
        layers.extend(block(self.input_channels, output_channels) for _ in range(1, blocks))

        return nn.Sequential(*layers)

    @staticmethod
    def _downsample_layer(input_channels, output_channels, stride):
        if stride != 1 or input_channels != output_channels:
            return nn.Sequential(
                nn.Conv1d(input_channels, output_channels,
                          kernel_size = 1, stride = stride, bias = False),
                nn.BatchNorm1d(output_channels)
            )
        return None

    def forward(self, x):
        return self.layer(x)

In [None]:
"""
module_transformer_encoder.py

This module contains the implementation of the Transformer Encoder architecture.
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import copy
from copy import deepcopy


# Utility function
def clones(module, N):
    """
    Generate N identical layers

    Args:
        module (nn.Module): PyTorch module to be cloned
        N (int): Number of clones to create

    Returns:
        nn.ModuleList: List of N cloned PyTorch modules
    """
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class TCN(nn.Module):
    """
    Temporal Convolutional Network with causal padding, residual connections, and batch normalization

    When kernel_size equals to zero, the padding is not causal
    """
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, dilation=1, causal=True):
        super(TCN, self).__init__()
        padding = (kernel_size - 1) * dilation if causal else 0
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding, dilation)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, stride, padding, dilation)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()
        self.causal = causal
        self.tcn_padding = padding
        self.downsample = None

        if in_channels != out_channels:
            self.downsample = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, 1),
                nn.BatchNorm1d(out_channels)
            )

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))

        if self.causal:
            out = out[:, :, :-self.tcn_padding]

        out = self.bn2(self.conv2(out))

        if self.causal:
            out = out[:, :, :-self.tcn_padding]

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, model_dim, se_reduced_size, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        self.tcn = clones(TCN(se_reduced_size, se_reduced_size, kernel_size=7), 2)
        # If set batch_first=True, for nn.MultiheadAttention()
        # then the input and output tensors are provided as (batch, seq, feature(channels))
        self.multihead_attention = nn.MultiheadAttention(se_reduced_size, num_heads, batch_first=True,
                                                         dropout=dropout)

    def forward(self, query, key, value):
        """
        We swap the seq and channel dimensions for the input tensors to
        meet the requirements of nn.MultiheadAttention()
        e.g. BioVid, (batch_size=128, seq_len=75, out_channels=30)
        :return: dimension (batch_size, out_channels, seq_len)
        """
        query = query.transpose(1, 2)
        key = self.tcn[0](key).transpose(1, 2)
        value = self.tcn[1](value).transpose(1, 2)
        attn_output, attn_output_weights = self.multihead_attention(query, key, value)
        # Swap back to the original dimensions
        attn_output = attn_output.transpose(1, 2)
        return attn_output

class MLP(nn.Module):
    """
    Multi-Layer Perceptron
    """
    def __init__(self, model_dim, d_mlp, dropout=0.1):
        super(MLP, self).__init__()
        self.w_1 = nn.Linear(model_dim, d_mlp)
        self.w_2 = nn.Linear(d_mlp, model_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        MLP forward pass
        """
        return self.w_2(self.dropout(F.relu(self.w_1(x))))


class LayerNorm(nn.Module):
    """
    Layer Normalization
    """
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        print(self.a_2.shape, self.b_2.shape)
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2


class SublayerOutput(nn.Module):
    """
    Residual connection followed by a layer norm.
    """
    def __init__(self, se_reduced_size, dropout):
        super(SublayerOutput, self).__init__()
        self.norm = nn.LayerNorm(se_reduced_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        """
        Apply residual connection to any sublayer with the same size.
        """
        normalized_x = self.norm(x.transpose(1, 2)).transpose(1, 2)
        return x + self.dropout(sublayer(normalized_x))


class TransformerEncoder(nn.Module):
    """
    Transformer Encoder

    Integration of MHA and MLP.
    Each of these sublayers have residual and layer norm, implemented by SublayerOutput.
    """
    def __init__(self, model_dim, self_attn, feed_forward, se_reduced_size, dropout):
        super(TransformerEncoder, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer_output = clones(SublayerOutput(se_reduced_size, dropout), 2)
        self.size = model_dim
        self.conv = TCN(se_reduced_size, se_reduced_size, kernel_size=7)

    def forward(self, x_in):
        query = self.conv(x_in)
        # Encoder self-attention
        x = self.sublayer_output[0](query, lambda x: self.self_attn(query, x_in, x_in))
        return self.sublayer_output[1](x, self.feed_forward)


class EncoderWrapper(nn.Module):
    """
    Transformer Encoder Wrapper

    It is a stack of N layers of transformer encoder, default N=2.
    """
    def __init__(self, num_heads, model_dim, se_reduced_size, d_mlp, dropout, N):
        super(EncoderWrapper, self).__init__()
        attn = MultiHeadAttention(num_heads, model_dim, se_reduced_size)

        mlp = MLP(model_dim, d_mlp, dropout)
        layer = TransformerEncoder(model_dim, deepcopy(attn), deepcopy(mlp), se_reduced_size, dropout)

        self.layers = clones(layer, N)
        self.norm = nn.LayerNorm(layer.size)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return self.norm(x)

In [None]:
"""
PainAttnNet with Ablation Support

"""
import torch
import torch.nn as nn

class PainAttnNet(nn.Module):
    """
    PainAttnNet model with Ablation Support
    """
    def __init__(self, modality, ablation_mode = 'full'):
        super(PainAttnNet, self).__init__()
        self.ablation_mode = ablation_mode
        num_classes = 2

        if self.ablation_mode == 'full':
          N = 2 # Number of Transformer Encoder Stacks
          model_dim = 75 # Model dimension from MSCN
          d_mlp = 120 # Dimension of MLP
          num_heads = 5 # Number of attention heads
          dropout = 0.1
          senet_reduced_size = 30 # Output SEResNet size

          self.mscn = MSCN(modality) # Multiscale Convolutional Network
          self.seresnet = SEResNet(senet_reduced_size, 1) # SEResNet
          self.encoderWrapper = EncoderWrapper(num_heads, model_dim, senet_reduced_size, d_mlp, dropout, N) # Transformer Encoder
          self.fc = nn.Linear(model_dim * senet_reduced_size, num_classes) # Fully connected layer to output the final prediction
        elif self.ablation_mode == 'mscn':
          model_dim = 75 # Model dimension from MSCN

          self.mscn = MSCN(modality) # Multiscale Convolutional Network
          self.fc = nn.Linear(192 * model_dim, num_classes) # Fully connected layer to output the final prediction
        elif self.ablation_mode == 'mscn_se':
          model_dim = 75 # Model dimension from MSCN
          senet_reduced_size = 30 # Output SEResNet size

          self.mscn = MSCN(modality) # Multiscale Convolutional Network
          self.seresnet = SEResNet(senet_reduced_size, 1) # SEResNet
          self.fc = nn.Linear(model_dim * senet_reduced_size, num_classes) # Fully connected layer to output the final prediction
        elif self.ablation_mode == 'mscn_tran':
          model_dim = 75 # Model dimension from MSCN
          N = 2 # Number of Transformer Encoder Stacks
          d_mlp = 120 # Dimension of MLP
          num_heads = 6 # Number of attention heads
          dropout = 0.1

          self.mscn = MSCN(modality) # Multiscale Convolutional Network
          self.encoderWrapper = EncoderWrapper(num_heads, model_dim, 192, d_mlp, dropout, N) # Transformer Encoder
          self.fc = nn.Linear(model_dim * 192, num_classes) # Fully connected layer to output the final prediction

    def forward(self, x):
      if self.ablation_mode == 'full':
        mscn_feat = self.mscn(x)
        se_feat = self.seresnet(mscn_feat)
        transformer_feat = self.encoderWrapper(se_feat)
        # Flatten the output of Transformer Encoder to feed into the fully connected layer
        transformer_feat = transformer_feat.contiguous().view(transformer_feat.shape[0], -1)
        final_output = self.fc(transformer_feat)
        return final_output
      elif self.ablation_mode == 'mscn':
        mscn_feat = self.mscn(x)
        mscn_feat = mscn_feat.contiguous().view(mscn_feat.shape[0], -1)
        final_output = self.fc(mscn_feat)
        return final_output
      elif self.ablation_mode == 'mscn_se':
        mscn_feat = self.mscn(x)
        se_feat = self.seresnet(mscn_feat)
        se_feat = se_feat.contiguous().view(se_feat.shape[0], -1)
        final_output = self.fc(se_feat)
        return final_output
      elif self.ablation_mode == 'mscn_tran':
        mscn_feat = self.mscn(x)
        transformer_feat = self.encoderWrapper(mscn_feat)
        transformer_feat = transformer_feat.contiguous().view(transformer_feat.shape[0], -1)
        final_output = self.fc(transformer_feat)
        return final_output

In [None]:
"""
Multimodal PainAttnNet

"""
import torch
import torch.nn as nn

class PainAttnNet_Multimodal(nn.Module):
    """
    Multimodal PainAttnNet model
    """
    def __init__(self, modality, fusion_type):
        super(PainAttnNet_Multimodal, self).__init__()
        self.modality = modality
        self.fusion_type = fusion_type

        if self.fusion_type == 'early':
          self.PainAttnNet = PainAttnNet(modality)
        elif self.fusion_type == 'late':
          if modality <= 1:
            raise ValueError("For late fusion, modality must be greater than 1.")
          model_dim = 75 # Model dimension from MSCN
          num_classes = 2
          senet_reduced_size = 30 # Output SEResNet size
          # Create multiple instances of Unimodal PainAttnNet
          self.model_list = nn.ModuleList([PainAttnNet(1) for _ in range(modality)])
          # Learnable parameters for linear combination
          self.weights = nn.Parameter(torch.ones(modality))
          self.fc = nn.Linear(model_dim * senet_reduced_size, num_classes)

    def forward(self, x):
        if self.fusion_type == 'early':
            # Return the output of the PainAttnNet with appropriate modality
            return self.PainAttnNet(x)
        elif self.fusion_type == 'late':
            # Split the input tensor x into chunks along the last dimension
            x_chunks = torch.chunk(x, self.modality, dim = -1)
            # Forward pass through each PainAttnNet model and combine transformer outputs
            transformer_outputs = [model.encoderWrapper(model.seresnet(model.mscn(x))) for model,x in zip(self.model_list, x_chunks)]
            # Apply linear combination
            combined_output = sum(weight * output for weight, output in zip(self.weights, transformer_outputs))
            # Flatten the combined output
            combined_output = combined_output.contiguous().view(combined_output.size(0), -1)
            # Feed the combined output to the final fully connected layer
            final_output = self.fc(combined_output)
            return final_output

## Model Parameters

In [None]:
for ablation_mode in ['full','mscn','mscn_se','mscn_tran']:
  for modality in [1,2,3]:
    model = PainAttnNet(modality,ablation_mode)
    print(f'{ablation_mode} {modality} - {sum(p.numel() for p in model.parameters() if p.requires_grad)}') # Trainable parameters
    #print(model)

full 1 - 558308
full 2 - 563483
full 3 - 568658
mscn 1 - 448988
mscn 2 - 454163
mscn 3 - 459338
mscn_se 1 - 437408
mscn_se 2 - 442583
mscn_se 3 - 447758
mscn_tran 1 - 3887000
mscn_tran 2 - 3892175
mscn_tran 3 - 3897350


In [None]:
for fusion_type in ['early','late']:
  for modality in [1,2,3]:
    if fusion_type != 'late' or modality != 1:
      model = PainAttnNet_Multimodal(modality,fusion_type)
      print(f'{fusion_type} {modality} - {sum(p.numel() for p in model.parameters() if p.requires_grad)}') # Trainable parameters
      #print(model)

early 1 - 558308
early 2 - 563483
early 3 - 568658
late 2 - 1121120
late 3 - 1679429


## Training Script

In [None]:
"""
train_kfold_cv.py

This module contains the implementation of the main training loop.
"""

import os
import torch
import torch.nn as nn

# Fix random seeds for reproducibility
SEED = 5012023
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True
np.random.seed(SEED)


def weights_init_normal(m):
    """
    Initial weights
    """
    # torch.nn.init.xavier_normal_(m.weight.data)
    if type(m) == nn.Conv2d:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif type(m) == nn.Conv1d:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif type(m) == nn.BatchNorm1d:
        torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
        torch.nn.init.constant_(m.bias.data, 0.0)


def train_kfold(exp_name,folds_data,fold_id,batch_size,model,label_converter,signal_types):

    model.apply(weights_init_normal)

    # Get optimizer
    trainable_params = filter(lambda p: p.requires_grad, model.parameters())
    optimizer = torch.optim.Adam(trainable_params)

    data_loader, valid_data_loader, data_count = load_data(folds_data[fold_id][0], folds_data[fold_id][1], label_converter, batch_size,signal_types)

    # May set different weights for different classes here
    loss = nn.CrossEntropyLoss()

    trainer = Trainer(model, loss, optimizer,data_loader=data_loader,fold_id=fold_id,exp_name=exp_name,valid_data_loader=valid_data_loader)
    trainer.train()

In [None]:
folder_name = 'saved'

# Check if the folder exists
if os.path.exists(folder_name):
    # If it exists, delete it
    !rm -r {folder_name}

# Create the empty folder
os.makedirs(folder_name)

In [None]:
folds_data = generate_kfolds_index('/content/drive/MyDrive/BM5020/processed_bioVid_partA',87)
batch_size = 128
label_converter = {"0": 0,"1": -1,"2": -1,"3": -1,"4": 1} # Other combinations --> T0 vs T1, T0 vs T2, T0 vs T3
signal_types = ['eda','ecg','emg']
modality = len(signal_types)

exp_type = 'a' # Change to 'f' for fusion/multimodal experiments

if exp_type == 'a':
  ablation_mode = 'mscn' # 'mscn' 'mscn_se' 'mscn_tran' 'full'
  model = PainAttnNet(modality,ablation_mode)
  exp_name = f"T0_vs_T4_ablation_{ablation_mode}_mod_{modality}"
elif exp_type == 'f':
  fusion_type = 'late' # 'early' 'late'
  model = PainAttnNet_Multimodal(modality,fusion_type)
  exp_name = f"T0_vs_T4_fusion_{fusion_type}_mod_{modality}"

train_kfold(exp_name,folds_data,0,batch_size,model,label_converter,signal_types)

# for fold_id in range(87):
#   print(f"================= Fold {fold_id} =================")
#   train_kfold(exp_name,folds_data,fold_id,batch_size,model,label_converter,signal_types)

Train Epoch: 1
    epoch          : 1
    loss           : 0.9017452641769692
    accuracy       : 0.5257936507936508
    val_loss       : 0.3802861273288727
    val_accuracy   : 0.875
Train Epoch: 2
    epoch          : 2
    loss           : 0.6887036937254446
    accuracy       : 0.5823826058201058
    val_loss       : 0.4570162892341614
    val_accuracy   : 0.8
Train Epoch: 3
    epoch          : 3
    loss           : 0.6472134987513224
    accuracy       : 0.6253720238095238
    val_loss       : 0.4146419167518616
    val_accuracy   : 0.825
Train Epoch: 4
    epoch          : 4
    loss           : 0.6487958696153429
    accuracy       : 0.6044973544973545
    val_loss       : 0.43803176283836365
    val_accuracy   : 0.8
Train Epoch: 5
    epoch          : 5
    loss           : 0.635691225528717
    accuracy       : 0.626942791005291
    val_loss       : 0.4599619507789612
    val_accuracy   : 0.8
Train Epoch: 6
    epoch          : 6
    loss           : 0.6207564892592253
    

In [None]:
os.rename('saved',exp_name)

In [None]:
new_folder_path = f'/content/{exp_name}'

# Path to the zip file
zip_file_path = new_folder_path + '.zip'

if os.path.exists(zip_file_path):
    os.remove(zip_file_path)

# Compress the folder into a zip file
!zip -r {zip_file_path} {os.path.basename(new_folder_path)}

  adding: T0_vs_T4_ablation_mscn_mod_3/ (stored 0%)
  adding: T0_vs_T4_ablation_mscn_mod_3/model_best_fold0.pth (deflated 8%)
  adding: T0_vs_T4_ablation_mscn_mod_3/trgs_0.npy (deflated 83%)
  adding: T0_vs_T4_ablation_mscn_mod_3/checkpoint-epoch30-fold0.pth (deflated 8%)
  adding: T0_vs_T4_ablation_mscn_mod_3/T0_vs_T4_ablation_mscn_mod_3_confusion_matrix_fold0.torch (deflated 62%)
  adding: T0_vs_T4_ablation_mscn_mod_3/outs_0.npy (deflated 80%)
  adding: T0_vs_T4_ablation_mscn_mod_3/checkpoint-epoch90-fold0.pth (deflated 8%)
  adding: T0_vs_T4_ablation_mscn_mod_3/T0_vs_T4_ablation_mscn_mod_3_classification_report_fold0.xlsx (deflated 11%)
  adding: T0_vs_T4_ablation_mscn_mod_3/checkpoint-epoch60-fold0.pth (deflated 8%)


In [None]:
from google.colab import files
files.download(f'{exp_name}.zip')
files.download(f'{exp_name}_classification_report_all_fold.xlsx')
files.download(f'{exp_name}_confusion_matrix_all_fold.torch')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## References



- [PainAttnNet GitHub Code](https://github.com/zhenyuanlu/PainAttnNet)