In [3]:
import os
import wfdb
import numpy as np
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import MinMaxScaler

def load_mitbih_dataset(path):
    # Define a list of annotation symbols that represent different types of heartbeats
    annots_list = ['N', 'L', 'R', 'e', 'j', 'S', 'A', 'a', 'J', 'V', 'E', 'F', '/', 'f', 'Q']

    # Prepare empty lists to store signal segments and corresponding labels
    X = []
    y = []

    # Create a dictionary to map each annotation symbol to a unique integer
    annot_to_int = {annot: i for i, annot in enumerate(annots_list)}

    # Read the list of ECG record filenames from the 'RECORDS' file in the dataset directory
    record_list = []
    with open(os.path.join(path, 'RECORDS'), 'r') as file:
        record_list = [line.strip() for line in file]

    # Create an instance of the MinMaxScaler to scale each segment to the range [0, 1]
    scaler = MinMaxScaler(feature_range=(0, 1))

    # Iterate over each record file in the dataset
    for record_name in record_list:
        # Load the ECG record and its annotations using the wfdb library
        record = wfdb.rdrecord(os.path.join(path, record_name))
        annotation = wfdb.rdann(os.path.join(path, record_name), 'atr')

        # Extract the first channel of the ECG signal (assuming it's a single-channel ECG)
        signal = record.p_signal[:, 0]
        # Get the symbols and sample locations for each annotated beat
        beat_annotations = annotation.symbol
        beat_locations = annotation.sample

        # Process each annotated beat in the current record
        for sym, loc in zip(beat_annotations, beat_locations):
            # Check if the annotation symbol is one of the types we're interested in
            if sym in annots_list:
                # Convert the annotation symbol to its corresponding integer label
                label = annot_to_int[sym]
                
                # Define the window size around the beat location to extract the segment
                win_size = 625  # This results in segments of 1250 samples centered on the beat
                # Ensure that the window does not extend beyond the signal boundaries
                if loc - win_size >= 0 and loc + win_size <= len(signal):
                    # Extract the segment of the signal centered around the beat
                    segment = signal[loc - win_size: loc + win_size]
                    # Normalize the segment to the range [0, 1]
                    segment = scaler.fit_transform(segment.reshape(-1, 1)).flatten()
                    # Add the normalized segment to the list of segments
                    X.append(segment)
                    # Add the label to the list of labels
                    y.append(label)

    # Convert the lists of segments and labels to numpy arrays for use in machine learning models
    X = np.array(X)
    # Convert the integer labels to one-hot encoded format
    y = to_categorical(y, num_classes=len(annots_list))

    return X, y


In [4]:
#Reshape Function
import numpy as np
from sklearn.preprocessing import MinMaxScaler

def reshape_ecg_to_2d(X, target_shape=(32, 32)):
    """
    Reshape 1D ECG data into 2D format with three channels to fit into a VGG16 model.

    Args:
    X (array): 1D numpy array of ECG data.
    target_shape (tuple): The target dimensions to which each ECG segment will be reshaped.

    Returns:
    numpy.array: A 4D array where each ECG segment is reshaped into 2D and replicated across three channels.
    """
    # Initialize the scaler to normalize data
    scaler = MinMaxScaler(feature_range=(0, 1))

    # Prepare a list to hold the reshaped segments
    X_reshaped = []
    
    # Process each segment in the dataset
    for segment in X:
        # Normalize the segment
        segment_normalized = scaler.fit_transform(segment.reshape(-1, 1)).flatten()
        
        # Calculate the padding required to reach the target size, if necessary
        padding = target_shape[0] * target_shape[1] - len(segment_normalized)
        
        if padding > 0:
            # If the segment is too short, pad it with zeros at the end
            segment_normalized = np.pad(segment_normalized, (0, padding), 'constant')
        elif padding < 0:
            # If the segment is too long, trim the excess
            segment_normalized = segment_normalized[:padding]
        
        # Reshape the normalized segment into 2D format
        segment_2d = segment_normalized.reshape(target_shape)
        
        # Stack the 2D image across three channels since VGG16 expects three-channel input images
        segment_3ch = np.stack([segment_2d] * 3, axis=-1)
        
        # Append the reshaped segment to the list
        X_reshaped.append(segment_3ch)

    return np.array(X_reshaped)

# Load your data
X, y = load_mitbih_dataset("/home/researchgroup/mahjabeen_workspace/research/CLINet-ECG-Classification-2024/data/mit-bih/mitbih_database/")

# Reshape your 1D ECG data into 2D
X_2d = reshape_ecg_to_2d(X)


In [9]:
#Cross-Validation Function
from tensorflow import keras
import os
import numpy as np
from sklearn.model_selection import KFold
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.metrics import Precision, Recall
import time
import json
import tensorflow as tf

def create_vgg16_model(num_classes, input_shape=(25, 50, 1), optimizer_type='adam', learning_rate=0.001):
    inputs = Input(shape=input_shape)
    x = Conv2D(32, (3, 3), padding='same', activation='relu')(inputs)
    x = Conv2D(32, (3, 3), padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Conv2D(128, (3, 3), padding='same', activation='relu')(x)
    x = Conv2D(128, (3, 3), padding='same', activation='relu')(x)
    x = Conv2D(128, (3, 3), padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Flatten()(x)
    x = Dense(512, activation='relu')(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)

    if optimizer_type == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    else:
        optimizer = SGD(learning_rate=learning_rate, momentum=0.9)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy', Precision(), Recall()])
    return model

def cross_validate_model(X, y, k=10, epochs=10, batch_size=4):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    fold_no = 1
    all_results = []
    total_start_time = time.time()

    for train, test in kfold.split(X, y):
        print(f'Training fold {fold_no}...')
        fold_start_time = time.time()

        X_train, X_test = X[train], X[test]
        y_train, y_test = y[train], y[test]

        model = create_vgg16_model(num_classes=y_train.shape[1], input_shape=(X_train.shape[1], X_train.shape[2], X_train.shape[3]))

        model_checkpoint_path = f'./model_fold_{fold_no}.h5'

        callbacks = [
            keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1),
            keras.callbacks.ModelCheckpoint(model_checkpoint_path, save_best_only=True, verbose=1),
            keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
        ]

        history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(X_test, y_test), callbacks=callbacks)

        fold_end_time = time.time()
        fold_elapsed_time = fold_end_time - fold_start_time
        print(f'Fold {fold_no} completed in {fold_elapsed_time:.2f} seconds.')
        results = model.evaluate(X_test, y_test, verbose=1)
        all_results.append(results)

        with open(f'cross_validation_results_fold_{fold_no}.json', 'w') as f:
            json.dump(results, f)

        fold_no += 1

        # Clear memory
        tf.keras.backend.clear_session()
        tf.compat.v1.reset_default_graph()

    total_end_time = time.time()
    total_elapsed_time = total_end_time - total_start_time
    print(f'Total cross-validation time: {total_elapsed_time:.2f} seconds.')

    return all_results, total_elapsed_time

# Perform cross-validation
results, total_time = cross_validate_model(X_2d, y, k=10, epochs=10, batch_size=16)

# Save cross-validation results
with open('cross_validation_results.json', 'w') as f:
    json.dump(results, f)

print("Cross-validation completed. Results saved.")


Training fold 1...
Epoch 1/10
Epoch 1: val_loss improved from inf to 0.08759, saving model to ./model_fold_1.h5
Epoch 2/10
Epoch 2: val_loss improved from 0.08759 to 0.06754, saving model to ./model_fold_1.h5
Epoch 3/10
Epoch 3: val_loss improved from 0.06754 to 0.06164, saving model to ./model_fold_1.h5
Epoch 4/10
Epoch 4: val_loss did not improve from 0.06164
Epoch 5/10
Epoch 5: val_loss did not improve from 0.06164
Epoch 6/10
Epoch 6: val_loss did not improve from 0.06164
Epoch 7/10
Epoch 7: val_loss did not improve from 0.06164
Epoch 8/10
Epoch 8: val_loss did not improve from 0.06164

Epoch 8: ReduceLROnPlateau reducing learning rate to 0.00010000000474974513.
Epoch 9/10
Epoch 9: val_loss improved from 0.06164 to 0.04620, saving model to ./model_fold_1.h5
Epoch 10/10
Epoch 10: val_loss did not improve from 0.04620
Fold 1 completed in 573.04 seconds.
Training fold 2...
Epoch 1/10
Epoch 1: val_loss improved from inf to 0.09591, saving model to ./model_fold_2.h5
Epoch 2/10
Epoch 2: v

In [None]:
#evaluation function
import numpy as np
from tensorflow import keras
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns
import json
import tensorflow as tf

class_labels = ['N', 'L', 'R', 'e', 'j', 'S', 'A', 'a', 'J', 'V', 'E', 'F', '/', 'f', 'Q']

def evaluate_model_on_folds(X, y, k=10):
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    fold_no = 1
    all_confusion_matrices = []
    all_metrics = []

    for train, test in kfold.split(X, y):
        model_path = f'./model_fold_{fold_no}.h5'
        if not os.path.exists(model_path):
            print(f'Model file for fold {fold_no} not found!')
            continue
        
        model = keras.models.load_model(model_path)
        print(f'Model for fold {fold_no} loaded successfully.')

        X_test = X[test]
        y_test = y[test]

        start_time = time.time()
        results = model.evaluate(X_test, y_test, verbose=1)
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f'Test Loss, Test Accuracy, and other metrics for fold {fold_no}: {results}')
        print(f'Evaluation time for fold {fold_no}: {elapsed_time:.2f} seconds')

        y_pred = model.predict(X_test)
        y_pred_classes = np.argmax(y_pred, axis=1)
        y_true_classes = np.argmax(y_test, axis=1)

        unique_classes = np.unique(y_true_classes)

        report_dict = classification_report(y_true_classes, y_pred_classes, labels=unique_classes, target_names=[class_labels[i] for i in unique_classes], output_dict=True)
        print(f"\nClassification Report for fold {fold_no}:")
        print(classification_report(y_true_classes, y_pred_classes, labels=unique_classes, target_names=[class_labels[i] for i in unique_classes]))

        cm = confusion_matrix(y_true_classes, y_pred_classes, labels=unique_classes)
        all_confusion_matrices.append(cm)

        metrics = precision_recall_fscore_support(y_true_classes, y_pred_classes, labels=unique_classes)
        all_metrics.append(metrics)

        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=[class_labels[i] for i in unique_classes], yticklabels=[class_labels[i] for i in unique_classes])
        plt.title(f'Confusion Matrix for Fold {fold_no}')
        plt.xlabel('Predicted Labels')
        plt.ylabel('True Labels')
        plt.show()

        fold_no += 1

        # Clear memory
        tf.keras.backend.clear_session()
        tf.compat.v1.reset_default_graph()

    return all_confusion_matrices, all_metrics

def calculate_average_metrics(all_metrics):
    avg_precision = []
    avg_recall = []
    avg_fscore = []
    avg_support = []

    for metrics in all_metrics:
        precision, recall, fscore, support = metrics
        avg_precision.append(np.mean(precision))
        avg_recall.append(np.mean(recall))
        avg_fscore.append(np.mean(fscore))
        avg_support.append(np.mean(support))

    return {
        "avg_precision": np.mean(avg_precision),
        "avg_recall": np.mean(avg_recall),
        "avg_fscore": np.mean(avg_fscore),
        "avg_support": np.mean(avg_support)
    }

# Evaluate model on each fold
all_confusion_matrices, all_metrics = evaluate_model_on_folds(X_2d, y, k=10)

# Calculate and print average metrics
average_metrics = calculate_average_metrics(all_metrics)

print("\nAverage metrics across all folds:")
print(f"Precision: {average_metrics['avg_precision']}")
print(f"Recall: {average_metrics['avg_recall']}")
print(f"F1-Score: {average_metrics['avg_fscore']}")
print(f"Support: {average_metrics['avg_support']}")

# Visualize all confusion matrices in one figure
fig, axes = plt.subplots(5, 2, figsize=(20, 25))
axes = axes.ravel()

for i in range(10):
    sns.heatmap(all_confusion_matrices[i], annot=True, fmt='d', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels, ax=axes[i])
    axes[i].set_title(f'Confusion Matrix for Fold {i+1}')
    axes[i].set_xlabel('Predicted Labels')
    axes[i].set_ylabel('True Labels')

plt.tight_layout()
plt.show()
