### Start

In [None]:
!pip install yacs --quiet
!pip install lightning-bolts --quiet
!pip install mlflow --quiet
!pip install pyngrok --quiet

In [None]:
import os
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, sampler
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
import copy
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.model_selection import StratifiedKFold

import mlflow
import mlflow.pytorch

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")


#### GPU

In [None]:
import torch
def use_gpu():
    """Pick the compute device and set the matching default tensor type.

    Prints the selected device (plus the GPU name when CUDA is available),
    then makes ``torch.cuda.FloatTensor`` the default tensor type on CUDA
    machines and ``torch.FloatTensor`` otherwise.

    Returns:
        torch.device: ``cuda:0`` when CUDA is available, otherwise ``cpu``.
    """
    gpu_index = 0  # first GPU; change if several GPUs are available
    has_cuda = torch.cuda.is_available()

    if has_cuda:
        device = torch.device(f"cuda:{gpu_index}")
        default_tensor_type = torch.cuda.FloatTensor
    else:
        device = torch.device("cpu")
        default_tensor_type = torch.FloatTensor

    print(f"Device: {device}")
    if has_cuda:
        print(f"GPU: {torch.cuda.get_device_name(gpu_index)}")

    # NOTE: torch.set_default_tensor_type is deprecated since PyTorch 2.1;
    # it is kept here so newly created tensors keep landing on the same
    # device/dtype as before.
    torch.set_default_tensor_type(default_tensor_type)
    return device


# Notebook-wide device handle used by later cells.
device = use_gpu()

In [None]:
import os
from google.colab import drive
# Mount Google Drive at /content/drive so the files stored there (project
# sources, config and data referenced by the paths below) are reachable.
drive.mount('/content/drive') # Outputs will be saved in your google drive

import sys
# Put the project's src directory first on the module search path so the
# project-local imports in the following cell resolve from Drive.
sys.path.insert(0, "/content/drive/MyDrive/Imbalance-data/Imbalance-data/src")

%cd /content/drive/MyDrive/Imbalance-data/Data/
!pwd

In [None]:
%load_ext autoreload
%autoreload 2

from dataloaders.PTB_dataset import PTBDataLoader
from models.models import ResNet18_1D
from utils.transforms import ToTensor
from utils.help_functions import set_seeds
from train_function import TrainModule
from config import get_cfg_from_yaml

#### Config

In [None]:
from config import get_cfg_from_yaml
# Location of the experiment's YAML configuration on the mounted Drive.
config_file = '/content/drive/MyDrive/Imbalance-data/Imbalance-data/config.yaml'
# Build the config object from that file and freeze it right away.
# NOTE(review): presumably a yacs CfgNode, which rejects modification once
# frozen — confirm in config.py.
cfg = get_cfg_from_yaml(config_file)
cfg.freeze()

### Dataset

In [None]:
def _build_ptb_loader(two_class):
    """Create a PTBDataLoader whose three splits only apply ``ToTensor``.

    Every call builds fresh transform pipelines; normalisation is enabled and
    no downsampling is requested. ``two_class`` is forwarded unchanged.
    """
    return PTBDataLoader(
        cfg,
        train_transform=transforms.Compose([ToTensor()]),
        val_transform=transforms.Compose([ToTensor()]),
        test_transform=transforms.Compose([ToTensor()]),
        downsample=None,
        normalize=True,
        two_class=two_class,
    )


# Two-class variant of the dataset and its split -> DataLoader mapping.
ptb_data_loader_2cls = _build_ptb_loader(two_class=True)
dataloaders_two_cls = ptb_data_loader_2cls.create_dataloaders()

# Same dataset with two_class disabled (used with the 5-output model below).
ptb_data_loader = _build_ptb_loader(two_class=False)
dataloaders = ptb_data_loader.create_dataloaders()

In [None]:
# Sanity check: fetch only the first training batch of the two-class loader
# and show the ECG tensor shape and the (squeezed) labels.
first_batch = next(iter(dataloaders_two_cls['train']), None)
if first_batch is not None:
    # Each item is a pair; the second element is not used here.
    batch_samples, w = first_batch
    print(batch_samples['ecg'].shape)
    print(batch_samples['label'].squeeze())

## Cross Validation

#### Run cross-validation

###### Monitor: UAR

In [None]:
from models.losses import LogitAdjustLoss, FocalLoss, CrossEntropyLoss, instance_weighted_loss
from sklearn.metrics import confusion_matrix
from utils.help_functions import Voting, compute_metrics_from_confusion_matrix

# Two-stage focal-loss experiment tracked with MLflow.
#   1. Child run 'two_class_focal': train/test a 2-output ResNet18_1D on the
#      two-class loaders.
#   2. Child run 'four_class_focal': train/test a 5-output ResNet18_1D on the
#      full loaders.
#   3. Parent run 'focal_loss': merge both sets of test predictions, compute
#      the final confusion matrix / metrics and log them.
#
# NOTE(review): `exp` is not defined in any cell visible in this notebook; it
# must already exist (an object exposing `.experiment_id`) before this cell
# runs, otherwise the first start_run raises NameError — confirm.

# Close any run that is still active so the parent run below starts cleanly.
mlflow.end_run()
with mlflow.start_run(run_name='focal_loss', experiment_id=exp.experiment_id):
    # ---- Stage 1: two-class model -------------------------------------
    with mlflow.start_run(run_name='two_class_focal', experiment_id=exp.experiment_id, nested=True):
        # Store the configuration dump as a run artifact.
        mlflow.log_text(cfg.dump(), 'config.yaml')

        model = ResNet18_1D(num_classes=2).to(cfg.DEVICE)

        train_loader = dataloaders_two_cls['train']
        val_loader = dataloaders_two_cls['val']

        # NOTE(review): alpha presumably holds one weight per class (here
        # 1 vs 10) — confirm against models.losses.FocalLoss.
        train_func = TrainModule(cfg, 
                    model, 
                    train_loader, val_loader, 
                    loss_func=FocalLoss(alpha=[1,10], gamma=1, size_average=True),
                    use_instance_weight=False,
                    posthoc_adjustment=False)

        model, UAR_val, matrix_val = train_func.train()

        print('='*30 + 'TEST' + '='*30)
        # Test-set predictions/labels of the two-class model; used after both
        # child runs to override the five-class predictions.
        pred_2_class, label_2_class = train_func.test(dataloaders_two_cls['test'])
    
    # ---- Stage 2: five-output model -----------------------------------
    # NOTE(review): the run is named 'four_class_focal' but the model has
    # num_classes=5; the last alpha entry is 0, presumably because class 4 is
    # taken from the two-class model instead — confirm.
    with mlflow.start_run(run_name='four_class_focal', experiment_id=exp.experiment_id, nested=True):
        # Config logging is left disabled here (it is logged in the
        # two-class child run above).
        # mlflow.log_text(cfg.dump(), 'config.yaml')

        model = ResNet18_1D(num_classes=5).to(cfg.DEVICE)

        train_loader = dataloaders['train']
        val_loader = dataloaders['val']

        train_func = TrainModule(cfg, 
                    model, 
                    train_loader, val_loader, 
                    loss_func=FocalLoss(alpha=[1,3,3,5,0], gamma=1, size_average=True),
                    use_instance_weight=False,
                    posthoc_adjustment=False)

        model, UAR_val, matrix_val = train_func.train()

        print('='*30 + 'TEST' + '='*30)
        pred_5_class, label_5_class = train_func.test(dataloaders['test'])

    # ---- Stage 3: merge predictions -----------------------------------
    # Wherever the two-class model predicted 1, overwrite the five-class
    # prediction with class 4 (in place).
    # NOTE(review): this pairs predictions by position, so it assumes both
    # test loaders yield the same samples in the same order — confirm.
    for i in range(len(label_5_class)):
        if pred_2_class[i] == 1:
            pred_5_class[i] = 4
    
    # 5x5 confusion matrix of the merged predictions against the labels.
    matrix_test = confusion_matrix(np.asarray(label_5_class), 
                                    np.asarray(pred_5_class),
                                    labels=range(5))

    UAR_test, acc_test, metrics_test, fig = compute_metrics_from_confusion_matrix(matrix_test, visualize=True)            
    # Final test metrics, rounded to three decimals.
    results = {'test_uar_final': np.round(UAR_test,3),
                'test_acc_final': np.round(acc_test,3),
                'test_recall_final': np.round(metrics_test['recall'],3),
            }
    # Logged on the parent run as params (not metrics), together with the
    # confusion-matrix figure.
    mlflow.log_params(results)
    mlflow.log_figure(fig, "test_confusion_matrix_final.png")

In [None]:
# Train a two-output ResNet18_1D with the logit-adjusted loss on the
# two-class loaders, then run the trained model on the two-class test split.
two_cls = dataloaders_two_cls

model = ResNet18_1D(num_classes=2).to(cfg.DEVICE)
train_loader, val_loader = two_cls['train'], two_cls['val']

# Keyword options handed to the trainer; instance weighting and post-hoc
# adjustment are both switched off for this run.
trainer_options = dict(
    loss_func=LogitAdjustLoss(base_probs=[0.95, 0.05], tau=1.0),
    use_instance_weight=False,
    posthoc_adjustment=False,
)
train_func = TrainModule(cfg, model, train_loader, val_loader, **trainer_options)

# train() hands back the model together with validation UAR and matrix.
model, UAR_val, matrix_val = train_func.train()

banner = '=' * 30
print(banner + 'TEST' + banner)
pred_2_class, label_2_class = train_func.test(two_cls['test'])

### Test

In [None]:
# Manual test loop: run `model` over the test loader, accumulate the loss,
# apply a post-hoc logit adjustment to the outputs, then aggregate the
# per-batch predictions with Voting and report UAR.
#
# NOTE(review): this cell uses names that are not defined in any cell visible
# in this notebook (`loss_func`, `OriginalName`, `ComputeUAR`) and batch keys
# ('mfcc', 'wavname') that differ from the 'ecg' key printed in the dataset
# sanity-check cell — it looks carried over from another project; confirm
# before running.
# NOTE(review): `model` is whatever the last executed training cell left
# behind, while the loader is `dataloaders['test']` (built with
# two_class=False) — confirm that the two match.
model.eval()
test_loader = dataloaders['test']

# Class priors and temperature for the post-hoc logit adjustment.
# NOTE(review): these priors differ from the base_probs=[0.95, 0.05] passed
# to LogitAdjustLoss in the training cell above — confirm which is intended.
base_probs = torch.tensor([0.8, 0.2])
tau = torch.tensor(1.0)

loss_test = 0
num_samples = 0  # incremented once per batch below
# The four counters below are initialised but never updated in this cell.
correct_test_pos = 0
correct_test_neg = 0
all_test_pos = 0
all_test_neg = 0

# One slot per batch, filled by index inside the loop.
pred_all_test = [None]*len(test_loader)
label_all_test = [None]*len(test_loader)
name_all_test = [None]*len(test_loader)
with torch.no_grad():
    # NOTE(review): the training loader above yields (batch_samples, w)
    # pairs; here each item is used directly as a dict — confirm the test
    # loader's item format.
    for batch_idx, batch_samples in enumerate(test_loader):
        mfcc,label,wavname = batch_samples['mfcc'].to(cfg.DEVICE,dtype=torch.float32),batch_samples['label'].to(cfg.DEVICE),batch_samples['wavname']

        pred = model(mfcc)

        # The loss is computed on the raw outputs, before the adjustment.
        loss_test += loss_func(pred, label).item()
        # Post-hoc adjustment: subtract log(base_probs**tau) from the outputs
        # (1e-12 keeps the log finite).
        pred = pred - torch.log(torch.Tensor(base_probs**tau + 1e-12).to(device,dtype=torch.float32))
        num_samples += 1


        pred_all_test[batch_idx] = pred.argmax(dim=1).squeeze().tolist()
        label_all_test[batch_idx] = label.squeeze().tolist()
        name_all_test[batch_idx] = list(wavname)
# Mean test loss per batch (num_samples counts batches, not samples).
loss_test = loss_test / num_samples
# UAR
# Flatten the per-batch name lists into one list before voting.
name_all_test = sum(name_all_test, [])
pred_voting, label_voting = Voting(OriginalName['test'], name_all_test, pred_all_test, label_all_test, gpu=True)
UAR_test, _, _ = ComputeUAR(pred_voting, label_voting, phase='test')
#UAR_test_all = (correct_train_pos/all_train_pos + correct_train_neg/all_train_neg)/2 * 100
# NOTE(review): the 'UAR_all: {} subjects' field is filled with num_samples,
# i.e. the number of batches processed — confirm the intended value.
print('Test \tLoss: {:.6f}  UAR:{}\tUAR_all: {} subjects'.format(loss_test, UAR_test, num_samples),'\n')