In [None]:
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from functools import partial

import tensorflow as tf
import optuna

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score

import util_func

In [None]:
# os.environ["CUDA_VISIBLE_DEVICES"]="1"

In [None]:
data = util_func.open_pickle('data/ludb_processed/ludb.pickle')

features = data['features']
y = np.array(data['labels'])

#### Splitting to train, validation, and test set

In [None]:
features_train, features_temp, y_train, y_temp = train_test_split(features, y, test_size=.2, shuffle=False, random_state=2023)
features_test, features_val, y_test, y_val = train_test_split(features_temp, y_temp, test_size=.5, shuffle=False, random_state=2023)

In [None]:
X_train = []
zpad_length_train = []

for row in features_train:
    X_train.append(row[0])
    zpad_length_train.append(row[1])

X_val = []
zpad_length_val = []

for row in features_val:
    X_val.append(row[0])
    zpad_length_val.append(row[1])

X_test = []
zpad_length_test = []

for row in features_test:
    X_test.append(row[0])
    zpad_length_test.append(row[1])


X_train = np.array(X_train)
X_val = np.array(X_val)
X_test = np.array(X_test)
# Reshape X to be the same format as y
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_val = X_val.reshape(X_val.shape[0], X_val.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

In [None]:
X_train.shape

In [None]:
len(X_train), len(X_val), len(X_test)

In [None]:
CLASSES = [
    'Zero padding',
    'Pon-Poff',
    'Poff-QRSon',
    'QRSon-Rpeak',
    'Rpeak-QRSoff',
    'QRSoff-Ton',
    'Ton-Toff',
    'Toff-Pon2'
]
COLORS = { # Zero padding given no color
    1: 'red',
    2: 'darkorange',
    3: 'yellow',
    4: 'green',
    5: 'blue',
    6: 'darkcyan',
    7: 'purple'
}

In [None]:
def generate_model(input_shape, output, lr=1e-5, n_layer=1):
    model = tf.keras.models.Sequential()
    opt = tf.keras.optimizers.Adam(learning_rate=lr)

    # Conv layers
    model.add(tf.keras.layers.Conv1D(8, 3, input_shape=input_shape, padding='same', activation='relu'))
    model.add(tf.keras.layers.Conv1D(16, 3, padding='same', activation='relu'))
    model.add(tf.keras.layers.Conv1D(32, 3, padding='same', activation='relu'))
    model.add(tf.keras.layers.Conv1D(64, 3, padding='same', activation='relu'))

    # Bidirectional LSTM
    model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(512, input_shape=input_shape, return_sequences=True)))

    for i in range(n_layer-1):
        model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(512, return_sequences=True)))

    # Fully connected layers
    model.add(tf.keras.layers.Dense(output, activation='softmax'))
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

    return model

In [None]:
# Plotting training and validation accuracy
def plot_acc_loss(model_h, model_name):
    model_res_path = f'result/{model_name}'
    util_func.make_dir(model_res_path)

    train_acc = model_h.history['accuracy']
    train_loss = model_h.history['loss']

    val_acc = model_h.history['val_accuracy']
    val_loss = model_h.history['val_loss']

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_acc, label='Train Accuracy')
    plt.plot(val_acc, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()

    # Plotting training and validation loss
    plt.subplot(1, 2, 2)
    plt.plot(train_loss, label='Train Loss')
    plt.plot(val_loss, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    plt.tight_layout()
    plt.savefig(f'{model_res_path}/Train_val_acc_loss.png', bbox_inches='tight')
    plt.show()

In [None]:
# classes is a list that contains class names in string
def calc_metrics(y_true, y_pred, model_name):
    model_res_path = f'result/{model_name}'
    util_func.make_dir(model_res_path)

    y_true = y_true.reshape(y_true.shape[0] * y_true.shape[1], y_true.shape[2])
    y_pred = y_pred.reshape(y_pred.shape[0] * y_pred.shape[1], y_pred.shape[2])

    y_true = np.argmax(y_true, axis=1)
    y_pred = np.argmax(y_pred, axis=1)

    cm = confusion_matrix(y_true, y_pred)

    metrics = {
        'Recall': [],
        'Precision': [],
        'Specificity': [],
        'F1-score': [],
        'Accuracy': [],
        'Error rate': []
    }
    
    sum_tp = 0
    sum_fn = 0
    sum_fp = 0
    sum_tn = 0
    
    for i in range(len(cm)):
        tp = cm[i, i]
        fn = np.sum(cm[i, :]) - tp
        fp = np.sum(cm[:, i]) - tp
        tn = np.sum(cm) - (tp + fn + fp)

        sum_tp += tp
        sum_fn += fn
        sum_fp += fp
        sum_tn += tn

        recall = tp / (tp + fn)
        precision = tp / (tp + fp)
        specificity = tn / (tn + fp)
        f1 = 2 * (recall * precision) / (recall + precision)
        accuracy = (tp + tn) / (tp + tn + fp + fn)
        error = (fp + fn) / (tp + tn + fp + fn)
        
        metrics['Recall'].append(recall)
        metrics['Precision'].append(precision)
        metrics['Specificity'].append(specificity)
        metrics['F1-score'].append(f1)
        metrics['Accuracy'].append(accuracy)
        metrics['Error rate'].append(error)
    
    # Macro-averaged
    metrics['Recall'].append( np.sum(metrics['Recall']) / len(CLASSES) )
    metrics['Precision'].append( np.sum(metrics['Precision']) / len(CLASSES) )
    metrics['Specificity'].append( np.sum(metrics['Specificity']) / len(CLASSES) )
    metrics['F1-score'].append( np.sum(metrics['F1-score']) / len(CLASSES) )
    metrics['Accuracy'].append( np.sum(metrics['Accuracy']) / len(CLASSES) )
    metrics['Error rate'].append( np.sum(metrics['Error rate']) / len(CLASSES) )

    # Micro-averaged
    micro_recall = sum_tp / (sum_tp + sum_fn)
    micro_precision = sum_tp / (sum_tp + sum_fp)
    metrics['Recall'].append(micro_recall)
    metrics['Precision'].append(micro_precision)
    metrics['Specificity'].append(sum_tn / (sum_tn + sum_fp))
    metrics['F1-score'].append(2 * (micro_recall * micro_precision) / (micro_recall + micro_precision))
    metrics['Accuracy'].append( (sum_tp + sum_tn) / (sum_tp + sum_tn + sum_fp + sum_fn) )
    metrics['Error rate'].append( (sum_fp + sum_fn) / (sum_tp + sum_tn + sum_fp + sum_fn) )

    metrics_indexes = CLASSES + ['Macro-averaged', 'Micro-averaged']

    metrics_dataframe = pd.DataFrame(metrics, index=metrics_indexes)
    metrics_dataframe.to_csv(f'{model_res_path}/metrics.csv')

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=CLASSES, yticklabels=CLASSES)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix Heatmap')
    plt.savefig(f'{model_res_path}/CM.png', bbox_inches='tight')


In [None]:
def roc_pr(y_true, y_pred, model_name):
    model_res_path = f'result/{model_name}'
    util_func.make_dir(model_res_path)

    y_true = y_true.reshape(y_true.shape[0] * y_true.shape[1], y_true.shape[2])
    y_pred = y_pred.reshape(y_pred.shape[0] * y_pred.shape[1], y_pred.shape[2])
    
    fpr, tpr, roc_auc = {}, {}, {}
    precision, recall, average_precision = {}, {}, {}

    # exclude zero padding on first index
    for i in range(1, 8):
        yt_i = y_true[:, i]
        yp_i = y_pred[:, i]
        fpr[i], tpr[i], _ = roc_curve(yt_i, yp_i)
        roc_auc[i] = auc(fpr[i], tpr[i])

        precision[i], recall[i], _ = precision_recall_curve(yt_i, yp_i)
        average_precision[i] = average_precision_score(yt_i, yp_i)

    fig, ax = plt.subplots(figsize=(10, 6), dpi=150)

    for i in range(1, 8):
        plt.plot(
            fpr[i],
            tpr[i],
            color=COLORS[i],
            lw=2,
            label=f'Class {CLASSES[i]} (AUC = {roc_auc[i]:.2f})'
        )

    ax.plot([0, 1], [0, 1], color='gray', lw=2, linestyle='--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('ROC Curve')
    ax.legend(loc='lower right')
    fig.savefig(f'{model_res_path}/ROC_curve.jpg', bbox_inches='tight')

    fig, ax = plt.subplots(figsize=(10, 6), dpi=150)
    for i in range(1, 8):
        plt.plot(
            recall[i],
            precision[i],
            color=COLORS[i],
            lw=2,
            label=f'Class {CLASSES[i]} (AP = {average_precision[i]:.2f})',
        )

    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title('Precision-Recall Curve')
    ax.legend(loc='lower left')
    fig.savefig(f'{model_res_path}/PR_curve.jpg', bbox_inches='tight')
    

In [None]:
# model = generate_model((X_train.shape[1], 1), 8)
# model.summary()

In [None]:
# history = model.fit(X_train, y_train, epochs=100, batch_size=8, validation_data=(X_val, y_val))

In [None]:
# y_pred = model.predict(X_test)

In [None]:
# CLASSES = [
#     'Zero padding',
#     'Pon-Poff',
#     'Poff-QRSon',
#     'QRSon-Rpeak',
#     'Rpeak-QRSoff',
#     'QRSoff-Ton',
#     'Ton-Toff',
#     'Toff-Pon2'
# ]

# plot_acc_loss(history)
# calc_metrics(y_test, y_pred)
# roc_pr(y_test, y_pred)

In [None]:
# X_temp = X_test.reshape(X_test.shape[0], X_test.shape[1] * X_test.shape[2])
# y_pred_temp = np.argmax(y_pred, axis=2)

In [None]:
# i = 42
# signal_length = len(X_temp[i]) - zpad_length_test[i]
# ECGSignal.plot_signal_segments(X_temp[i, 0:signal_length], y_pred_temp[i, 0:signal_length])

#### Hyperparameter tuning

Using CMA-ES (Covariance Matrix Adaptation - Evolution Strategy), implementation provided by optuna.<br>
<br>
CMA-ES objective will be to minimize validation loss.

In [None]:
BATCH_SIZES = [8, 16, 32, 64]

def objective(trial, X_train, X_val, X_test, y_train, y_val, y_test):
    start_time = time.time()

    lr = np.round(trial.suggest_float('lr', 1e-5, 1e-1, log=True), decimals=5)
    n_layer = trial.suggest_int('n_layer', 1, 5)
    batch_size_index = trial.suggest_int('batch_size_index', 0, len(BATCH_SIZES)-1)
    bs = BATCH_SIZES[batch_size_index]

    model_name = f'ConvBiLSTM-LR_{lr}-Nlayer_{n_layer}-BS_{bs}'
    model = generate_model((X_train.shape[1], 1), 8, lr=lr, n_layer=n_layer)
    history = model.fit(X_train, y_train, epochs=200, batch_size=bs, validation_data=(X_val, y_val))
    y_pred = model.predict(X_test)

    plot_acc_loss(history, model_name)
    calc_metrics(y_test, y_pred, model_name)
    roc_pr(y_test, y_pred, model_name)

    val_loss = history.history['val_loss']

    end_time = time.time()
    time_elapsed = end_time - start_time

    with open(f'result/{model_name}/Model_info.txt', 'w') as info_file:
        info_file.write(f'{model_name}\n')
        info_file.write(f'Learning Rate: {lr} | n_layer: {n_layer} | Batch Size: {bs}\n')
        info_file.write(f'Elapsed Time: {time_elapsed:.2f} seconds\n')

    return np.min(val_loss)

In [None]:
objective_wrapper = partial(
    objective,
    X_train=X_train,X_val=X_val, X_test=X_test,
    y_train=y_train, y_val=y_val, y_test=y_test
)
sampler = optuna.samplers.CmaEsSampler()
study = optuna.create_study(sampler=sampler, direction='minimize')
study.optimize(objective_wrapper, n_trials=20)