In [None]:
# imports
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_curve
from sklearn.model_selection import train_test_split

from array_gzip_io_utils import load_from_gz_file
from SubModeles.EcgClassificationModel import EcgClassificationModel
import ecg_classification_helpers as classificationHelper

In [None]:
class ECGClassificationDataset(Dataset):
    """Map-style dataset over a sequence of per-beat record dictionaries.

    Each record is indexed by the keys listed in ``_FIELDS``; ``__getitem__``
    returns the corresponding values as a tuple in that order.
    """

    # Keys read from every record, in the order they appear in the returned tuple.
    _FIELDS = ("ecg_image", "class", "class_number", "signal_name", "peak_time")

    def __init__(self, data_items):
        # The records are stored as given; nothing is copied or converted here.
        self.data_items = data_items

    def __len__(self):
        """Return the number of records."""
        return len(self.data_items)

    def __getitem__(self, idx):
        """Return the ``_FIELDS`` values of record ``idx`` as a tuple."""
        record = self.data_items[idx]
        return tuple(record[field] for field in self._FIELDS)
    

def __collate_fn(data):
    """Collate dataset samples into ``(signal_tensor, target_tensor)``.

    ``data`` is a list of sample tuples. Only element 0 (the signal) and
    element 2 (the numeric class) of each sample are used; the remaining
    elements are dropped from the batch.
    """
    # Object array so the mixed-type sample tuples can be sliced column-wise.
    samples = np.asarray(data, dtype=object)
    signal_column, label_column = samples[:, 0], samples[:, 2]

    stacked_signals = np.stack(signal_column)
    stacked_labels = np.stack(label_column)
    return torch.from_numpy(stacked_signals), torch.from_numpy(stacked_labels)

In [None]:
# input data definition
# Experiment coordinates: together they select the data and model directories.
SIGNAL_LENGTH = 8000
TEST_SET_SIZE = 0.2
EXPERIMENT = 1

# Use the first CUDA device when one is available, otherwise the CPU.
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# Directory holding the input arrays and directory holding model checkpoints.
ROOT_DATA_PATH = f"TempData/Data/{SIGNAL_LENGTH}/Test-set-{TEST_SET_SIZE}/{EXPERIMENT}"
TRAINED_MODEL_PATH = f"TempData/Models/{SIGNAL_LENGTH}/Test-set-{TEST_SET_SIZE}/{EXPERIMENT}"

# Gzipped train / test arrays inside the data directory.
test_array_path = f'{ROOT_DATA_PATH}/test-mit-arrhythmia-fs-400-prefered-leads.pkl.gz'
train_array_path = f'{ROOT_DATA_PATH}/train-mit-arrhythmia-fs-400-prefered-leads.pkl.gz'

print(TRAINED_MODEL_PATH)

In [None]:
# load train set
# Autoencoder checkpoint handed to the dataset helper; it lives in the model directory.
autoencoder_checkpoint_path = os.path.join(
    TRAINED_MODEL_PATH,
    "ECGAutoencoder-2-mit-china-qt-prefered-leads-fs-400-60epoch-CT1.pt",
)

# NOTE(review): the two 350 arguments are presumably the sample ranges taken
# before and after each beat -- confirm against ecg_classification_helpers.
ecg_beats_train = classificationHelper.create_pathology_classification_dataset_by_ranges(
    load_from_gz_file(train_array_path),
    autoencoder_checkpoint_path,
    350,
    350,
)

# Hold out 10% of the training beats for validation (fixed seed for a repeatable split).
ecg_beats_train, validation_ecg_beats = train_test_split(
    ecg_beats_train,
    random_state=42,
    test_size=0.1,
)
print(len(ecg_beats_train))

In [None]:
# train - test functions
def _run_classification_pass(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_batch_loss, accuracy)``.

    With an ``optimizer`` the model is switched to training mode and updated
    after every batch. Without one the model is switched to eval mode and the
    pass runs with gradient tracking disabled.
    """
    is_training = optimizer is not None
    model.train(is_training)

    loss_sum = 0.0
    # Collect per-batch arrays and join them once at the end; concatenating
    # inside the loop re-copies everything gathered so far on every batch.
    predicted_batches = []
    expected_batches = []

    with torch.set_grad_enabled(is_training):
        for batch in loader:
            inputs = batch[0].to(device, dtype=torch.float)
            targets = batch[1].to(device, dtype=torch.int64)

            # A singleton dimension is inserted at position 1 before the forward pass.
            predictions = model(inputs.unsqueeze(1))
            loss = criterion(predictions, targets)
            loss_sum += loss.item()

            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Index of the highest score per sample is the predicted class.
            predicted_batches.append(predictions.topk(k=1)[1].view(-1).cpu().numpy())
            expected_batches.append(targets.cpu().numpy())

    accuracy = accuracy_score(np.concatenate(expected_batches), np.concatenate(predicted_batches))
    # The loss is averaged over the number of batches, not the number of samples.
    return loss_sum / len(loader), accuracy


def train_epoch(model, train_dataset, val_dataset,  device, epoch_number=13, lr=0.0001):
    """Train ``model`` for ``epoch_number`` epochs, validating after each one.

    Args:
        model: network called as ``model(x)`` where ``x`` is the batch input
            with a singleton dimension inserted at position 1.
        train_dataset: sized iterable of ``(inputs, targets)`` batches used for
            optimisation (the notebook passes a ``DataLoader``).
        val_dataset: sized iterable of ``(inputs, targets)`` batches used for
            validation only.
        device: torch device the batches are moved to.
        epoch_number: number of epochs to run.
        lr: learning rate of the Adam optimizer, which is created anew on every call.

    Returns:
        ``(train_losses, val_losses, train_accuracies, val_accuracies)`` -- four
        lists with one entry per epoch.

    Raises:
        ValueError: if either loader yields no batches. Previously this only
            surfaced as a ``ZeroDivisionError`` after a complete pass.
    """
    for loader_name, loader in (("train_dataset", train_dataset), ("val_dataset", val_dataset)):
        if len(loader) == 0:
            raise ValueError(
                f"{loader_name} yields no batches; check the dataset size against "
                "batch_size / drop_last"
            )

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss().to(device)

    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(epoch_number):
        total_loss, train_accuracy = _run_classification_pass(
            model, train_dataset, criterion, device, optimizer
        )
        train_losses.append(total_loss)
        train_accuracies.append(train_accuracy)

        # Validation: no optimizer, so the model is in eval mode and no gradients are tracked.
        val_loss, val_accuracy = _run_classification_pass(model, val_dataset, criterion, device)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f'{epoch} epoch')
        print("Train loss - {:4f}".format(total_loss))
        print("Validation loss - {:4f}".format(val_loss))

        print("Train accurancy - {:4f}".format(train_accuracies[-1]))
        print("Validation accurancy - {:4f}".format(val_accuracies[-1]))
    return train_losses, val_losses, train_accuracies, val_accuracies

def test_model(model, test_dataset, device):
    model.eval()

    predicted_clasess = np.empty(0, dtype=int)
    expected_clasess = np.empty(0, dtype=int)

    for i, batch in enumerate(test_dataset):
        inputs = batch[0].to(device, dtype=torch.float)

        signal_tensor = inputs.unsqueeze(1)

        predictions = model(signal_tensor)
        classes = predictions.topk(k=1)[1].view(-1).cpu().numpy()

        predicted_clasess = np.concatenate((predicted_clasess, classes))
        expected_clasess = np.concatenate((expected_clasess, batch[1].numpy()))

    accurancy = accuracy_score(predicted_clasess, expected_clasess)*100

    # Use accuracy_score function to get the accuracy
    print("")
    print("CNN Model Accuracy Score -> ", accurancy)
    print("")
    print(classification_report(expected_clasess, predicted_clasess))

    print("Confusion matrix")
    print(confusion_matrix(expected_clasess, predicted_clasess))


In [None]:
# Train model

train_dataloader = DataLoader(ECGClassificationDataset(ecg_beats_train), batch_size=48, shuffle = True, drop_last = True, collate_fn = __collate_fn)
# Validation is evaluated on every held-out beat in a fixed order. Shuffling
# combined with drop_last discarded a different random remainder each epoch,
# which made the per-epoch validation metrics noisy.
val_dataloader = DataLoader(ECGClassificationDataset(validation_ecg_beats), batch_size=16, shuffle = False, drop_last = False, collate_fn = __collate_fn)

# NOTE(review): the constructor arguments are presumably (input channels,
# signal length, number of classes) -- confirm against EcgClassificationModel.
model = EcgClassificationModel(1, 700, 9)
model.to(device)

# Two training stages on the same model: 14 epochs at lr=0.001, then 6 epochs at lr=0.0001.
train_losses1, val_losses1, train_accuracies1, val_accuracies1 = train_epoch(model, train_dataloader, val_dataloader, device, epoch_number=14, lr=0.001)
train_losses2, val_losses2, train_accuracies2, val_accuracies2 = train_epoch(model, train_dataloader, val_dataloader, device, epoch_number=6, lr=0.0001)

# add init value to scale plot
# NOTE(review): these leading values are hand-picked placeholders, not measured
# metrics. They are prepended to new lists so the measured per-stage histories
# stay untouched and re-running the plotting code cannot insert them twice.
train_losses = [0.73] + train_losses1 + train_losses2
val_losses = [0.66] + val_losses1 + val_losses2

# add init value to scale plot
train_accuracies = [0.77] + train_accuracies1 + train_accuracies2
val_accuracies = [0.72] + val_accuracies1 + val_accuracies2

# Plot the training and validation losses
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plot the training and validation accuracies
plt.plot(train_accuracies, label='Train Accuracy')
plt.plot(val_accuracies, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
# Test model
# Build the test beats with the same helper, checkpoint and range arguments as the training set.
ecg_beats_test =classificationHelper.create_pathology_classification_dataset_by_ranges(
        load_from_gz_file(test_array_path),
        os.path.join(TRAINED_MODEL_PATH, "ECGAutoencoder-2-mit-china-qt-prefered-leads-fs-400-60epoch-CT1.pt"),
        350,
        350)

print("Executed for TRAIN data")
test_model(model, train_dataloader, device)
print("Executed for TEST data")
# Evaluate every test beat exactly once and in a fixed order. With shuffle=True
# and drop_last=True a different random remainder (up to batch_size - 1 beats)
# was skipped on every run, so the reported metrics were not reproducible.
test_dataloader = DataLoader(ECGClassificationDataset(ecg_beats_test), batch_size=16, shuffle = False, drop_last = False, collate_fn = __collate_fn)
test_model(model, test_dataloader, device)

In [None]:
# Persist the trained classifier weights in the experiment's model directory.
model_file_name = f"EcgClassificationModel-{SIGNAL_LENGTH}-{EXPERIMENT}-fs-400.pt"
full_model_path = os.path.join(TRAINED_MODEL_PATH, model_file_name)

# Only the state dict (parameters and buffers) is written, not the module object.
torch.save(model.state_dict(), full_model_path)

print(f'{full_model_path} saved')

In [None]:

# Rebuild the classifier and restore its weights from the saved state dict.
# NOTE(review): the constructor arguments are presumably (input channels,
# signal length, number of classes) -- confirm against EcgClassificationModel.
model = EcgClassificationModel(1, 700, 9)
full_model_path = os.path.join(TRAINED_MODEL_PATH, f"EcgClassificationModel-{SIGNAL_LENGTH}-{EXPERIMENT}-fs-400.pt")
# map_location remaps the stored tensors onto the active device, so a checkpoint
# written on a CUDA machine also loads on a CPU-only host.
model.load_state_dict(torch.load(full_model_path, map_location=device))
model.to(device)


In [None]:
# ROC curve
# Put the model into evaluation mode before the ROC code below runs inference.
model.eval()

from sklearn.preprocessing import label_binarize

from sklearn.metrics import roc_curve, auc
import numpy as np
import matplotlib.pyplot as plt

def plot_roc_curve(y_true, y_score, num_classes, class_names=None):
    """Plot one-vs-rest ROC curves for classes ``0 .. num_classes - 1``.

    Args:
        y_true: true integer class labels, one per sample.
        y_score: 2-D array indexed as ``y_score[:, i]``; column ``i`` holds the
            score of class ``i``.
        num_classes: number of classes to draw a curve for.
        class_names: optional sequence of display names indexed by class
            number. When omitted the curves are labelled ``Class 1``,
            ``Class 2``, ... (class number plus one).
    """
    labels = np.asarray(y_true).ravel()

    fpr = dict()
    tpr = dict()
    roc_auc = dict()

    for i in range(num_classes):
        # One-vs-rest indicator built by direct comparison. label_binarize with
        # a single class returns an all-zero column whenever y_true contains at
        # most two distinct labels, which silently yields a degenerate curve.
        y_true_binary = (labels == i).astype(int)
        fpr[i], tpr[i], _ = roc_curve(y_true_binary, y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])


    plt.figure(figsize=(8, 6))
    for i in range(num_classes):
        # Identity check: `== None` is evaluated element-wise for ndarray
        # class_names and then fails inside the `if`.
        if class_names is None:
            class_label = f'Class {i+1}'
        else:
            class_label = f'Class {class_names[i]}'

        plt.plot(fpr[i], tpr[i], label=f'{class_label} (AUC = {roc_auc[i]:.2f})')

    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlabel('False Positive Rate', fontsize=14)
    plt.ylabel('True Positive Rate', fontsize=14)
    plt.title('ROC Curve for Multi-Class Classification', fontsize=14)
    plt.legend(loc='lower right', fontsize=12)
    plt.xticks(fontsize=14)
    plt.yticks(fontsize=14)
    plt.show()

def test(model, dataloader, device):
    model.eval()
    all_probs = []
    all_true_class = []

    for i, batch in enumerate(dataloader):
        inputs = batch[0].to(device, dtype=torch.float)
        signal_tensor = inputs.unsqueeze(1)

        predictions = model(signal_tensor)
        y_pred_proba = torch.softmax(predictions, dim=1)
        all_probs.append(y_pred_proba.cpu().detach().numpy())
        all_true_class.append(batch[1].numpy())

    all_probs = np.concatenate(all_probs, axis=0)
    all_true_class = np.concatenate(all_true_class, axis=0)
    return all_probs, all_true_class

print("TEST DATA")
# Per-class probabilities and true labels for the test loader, then one-vs-rest ROC curves for 9 classes.
prediction, true_classes = test(model=model, dataloader=test_dataloader, device=device)
plot_roc_curve(y_true=true_classes, y_score=prediction, num_classes=9)


In [None]:
# ROC One-One
def plot_roc_curve_one_to_one(y_true, y_score, negative_class, positive_class):
    """Plot the ROC curve separating ``positive_class`` from ``negative_class``.

    Only samples whose true label is one of the two classes take part in the
    curve; samples of any other class are ignored.

    Args:
        y_true: true integer class labels, one per sample.
        y_score: 2-D array of per-class scores; column ``positive_class`` is
            used as the decision score.
        negative_class: class number treated as the negative label.
        positive_class: class number treated as the positive label.
    """
    labels = np.asarray(y_true).ravel()
    scores = np.asarray(y_score)

    # Restrict to the compared pair here instead of relying on the caller.
    # label_binarize with two classes returns an (n, 2) matrix as soon as
    # y_true holds a third label, which made the flattened vector twice as
    # long as the score column.
    pair_mask = np.isin(labels, [negative_class, positive_class])
    y_true_binary = (labels[pair_mask] == positive_class).astype(int)
    pair_scores = scores[pair_mask, positive_class]

    fpr, tpr, _ = roc_curve(y_true_binary, pair_scores)
    roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(6, 4))
    plt.plot(fpr, tpr, label=f'{negative_class} to {positive_class} class (AUC = {roc_auc:.2f})')

    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlabel('False Positive Rate', fontsize=12)
    plt.ylabel('True Positive Rate', fontsize=12)
    plt.title('ROC Curve for Classification (One-to-One)', fontsize=12)
    plt.legend(loc='lower right', fontsize=12)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)

    plt.show()

# Keep only the test beats whose class number is 0 or 8, the pair compared below.
filtered_arr = np.array([obj for obj in ecg_beats_test if obj["class_number"] in (0, 8)])

print(len(filtered_arr))

# Score every filtered beat exactly once and in a fixed order. With shuffle=True
# and drop_last=True a random remainder was skipped, so the evaluated sample
# count differed from the length printed above and the AUC changed between runs.
test_one_one_dataloader = DataLoader(ECGClassificationDataset(filtered_arr), batch_size=16, shuffle = False, drop_last = False, collate_fn = __collate_fn)
prediction, true_classes = test(model,test_one_one_dataloader, device)
# Draw the curve in both directions: class 8 as positive, then class 0 as positive.
plot_roc_curve_one_to_one(true_classes, prediction, 0, 8)
plot_roc_curve_one_to_one(true_classes, prediction, 8, 0)
