In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install xlsxwriter tqdm

from __future__ import print_function
import keras
from keras.layers import Dense, Conv2D, BatchNormalization, Activation
from keras.layers import AveragePooling2D, Input, Flatten
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, LearningRateScheduler, ReduceLROnPlateau
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.regularizers import l2
from keras import backend as K
from keras.models import Model
from keras.utils import to_categorical
import numpy as np
import os
import time
import xlsxwriter
import cv2
from random import shuffle
from tqdm import tqdm


In [None]:
# ----------------------- CONFIG ------------------------
batch_size = 16
epochs = 25
data_augmentation = True
num_classes = 4
IMG_SIZE = (224, 224)
input_shape = (224, 224, 3)
subtract_pixel_mean = True
version = 2
n = 3

if version == 1:
    depth = n * 6 + 2
elif version == 2:
    depth = n * 9 + 2
model_type = 'ResNet%dv%d' % (depth, version)

In [None]:
# ⚠️ Update these with your own dataset paths:
Train_DIR = "/content/drive/MyDrive/new_split/train"
Test_DIR = "/content/drive/MyDrive/new_split/test"
result_excel_link = "/content/drive/MyDrive/ECG_CLASSIFICATION_RESNET/RESNET_11/RESNET11_RESULTS/ResNet_epoch25.xlsx"

In [None]:
# Classes based on your dataset
classes = ['Normal Heartbeat', 'Abnormal Heartbeat', 'Myocardial Infarction (MI)', 'History of MI']
class_map = {label: i for i, label in enumerate(classes)}

In [None]:
start1= time.time()

In [None]:
# Function to load data from folder structure with loading feedback
def load_data_from_directory(base_dir, classes, class_map, IMG_SIZE):
    data = []
    total_images = sum(len(os.listdir(os.path.join(base_dir, label))) for label in classes)

    print(f"Loading {total_images} images from '{base_dir}'...")

    # Use tqdm to create a progress bar
    with tqdm(total=total_images, unit="image") as pbar:
        for label in classes:
            path = os.path.join(base_dir, label)
            for img_name in os.listdir(path):
                try:
                    img_path = os.path.join(path, img_name)
                    img = cv2.imread(img_path, cv2.IMREAD_COLOR)  # Load as RGB

                    # Check if image is successfully loaded
                    if img is None:
                        raise ValueError(f"Image {img_name} could not be read.")

                    # Resize to (width, height) = (250, 200) to match model input (200, 250, 3)
                    img = cv2.resize(img, (IMG_SIZE[1], IMG_SIZE[0]), interpolation=cv2.INTER_AREA)

                    # Normalize pixel values to [0, 1]
                    img = img.astype('float32') / 255.0

                    # Append image, label, and name to data
                    data.append([np.array(img), class_map[label], img_name])

                    # Update progress bar
                    pbar.update(1)

                except Exception as e:
                    print(f"Error reading {img_name}: {e}")
                    pbar.update(1)  # Still update progress even if there's an error

    shuffle(data)
    print("Loading complete!")
    return data

In [None]:
train_data = load_data_from_directory(Train_DIR, classes, class_map, IMG_SIZE)
test_data = load_data_from_directory(Test_DIR, classes, class_map, IMG_SIZE)

trainImages = np.array([i[0] for i in train_data])
trainLabels = to_categorical([i[1] for i in train_data], num_classes)

testImages = np.array([i[0] for i in test_data])
testLabels = to_categorical([i[1] for i in test_data], num_classes)

In [None]:
# ------------------- MODEL BUILDING ---------------------
def lr_schedule(epoch):
    lr = 1e-3
    if epoch > 20:
        lr *= 0.5e-3
    elif epoch > 15:
        lr *= 1e-3
    elif epoch > 10:
        lr *= 1e-2
    elif epoch > 5:
        lr *= 1e-1
    print('Learning rate:', lr)
    return lr

def resnet_layer(inputs, num_filters=16, kernel_size=3, strides=1,
                 activation='relu', batch_normalization=True, conv_first=True):
    conv = Conv2D(num_filters, kernel_size=kernel_size, strides=strides, padding='same',
                  kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))
    x = inputs
    if conv_first:
        x = conv(x)
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation:
            x = Activation(activation)(x)
    else:
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation:
            x = Activation(activation)(x)
        x = conv(x)
    return x

def resnet_v2(input_shape, depth, num_classes):
    if (depth - 2) % 9 != 0:
        raise ValueError('depth should be 9n+2')
    num_filters_in = 16
    num_res_blocks = int((depth - 2) / 9)
    inputs = Input(shape=input_shape)
    x = resnet_layer(inputs, num_filters=num_filters_in)
    for stage in range(3):
        for res_block in range(num_res_blocks):
            strides = 1
            num_filters_out = num_filters_in * (4 if stage == 0 else 2)
            if res_block == 0 and stage != 0:
                strides = 2
            y = resnet_layer(x, num_filters_in, strides=strides)
            y = resnet_layer(y, num_filters_out, activation=None)
            if res_block == 0:
                x = resnet_layer(x, num_filters_out, kernel_size=1, strides=strides, activation=None, batch_normalization=False)
            x = keras.layers.add([x, y])
            x = Activation('relu')(x)
        num_filters_in = num_filters_out
    x = AveragePooling2D(pool_size=8)(x)
    y = Flatten()(x)
    outputs = Dense(num_classes, activation='softmax', kernel_initializer='he_normal')(y)
    return Model(inputs, outputs)

model = resnet_v2(input_shape=input_shape, depth=depth, num_classes=num_classes)
model.compile(loss='categorical_crossentropy',
              optimizer=Adam(learning_rate=lr_schedule(0)),
              metrics=['accuracy'])
model.summary()

In [None]:
# --------------------- TRAIN ----------------------
start_time = time.time()
callbacks = [
    ModelCheckpoint(filepath=f'{model_type}_best.h5', monitor='val_accuracy', save_best_only=True),
    ReduceLROnPlateau(factor=np.sqrt(0.1), patience=5, min_lr=0.5e-6),
    LearningRateScheduler(lr_schedule)
]

if not data_augmentation:
    history = model.fit(trainImages, trainLabels,
                        batch_size=batch_size, epochs=epochs,
                        validation_data=(testImages, testLabels),
                        shuffle=True, callbacks=callbacks)
else:
    datagen = ImageDataGenerator(width_shift_range=0.1, height_shift_range=0.1, horizontal_flip=True)
    datagen.fit(trainImages)
    history = model.fit(datagen.flow(trainImages, trainLabels, batch_size=batch_size),
                        validation_data=(testImages, testLabels),
                        epochs=epochs, verbose=1, callbacks=callbacks)

In [None]:
# --------------------- EVALUATE ----------------------
scores = model.evaluate(testImages, testLabels, verbose=1)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])

In [None]:
# Block 9: Evaluate the Model and Generate Metrics
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from keras.models import load_model

# Load the best saved model
try:
    best_model = load_model(f'{model_type}_best.h5')
    print(f"Loaded best model from {model_type}_best.h5")
except Exception as e:
    print(f"Error loading best model: {e}. Using the current model instead.")
    best_model = model  # Fallback to the trained model if checkpoint loading fails

# Evaluate the model on test data
scores = best_model.evaluate(testImages, testLabels, verbose=1)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])

# Predict on test data
preds = best_model.predict(testImages)

# Convert predictions to class labels
pred_labels = np.argmax(preds, axis=1)

# Convert one-hot encoded testLabels to class indices
true_labels = np.argmax(testLabels, axis=1)

# Generate classification report
class_names = classes  # Use the defined classes ['N', 'L', 'R', 'A', 'V']
report = classification_report(true_labels, pred_labels, target_names=class_names)
print("Classification Report:\n")
print(report)

# Generate confusion matrix
cm = confusion_matrix(true_labels, pred_labels)

# Plot confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

# Save results to Excel
try:
    workbook = xlsxwriter.Workbook(result_excel_link)
    worksheet = workbook.add_worksheet()
    # Write headers
    worksheet.write(0, 0, "Filename")
    worksheet.write(0, 1, "Original Label")
    worksheet.write(0, 2, "Predicted Label")
    worksheet.write(0, 3, "Match (1/0)")
    worksheet.write(0, 4, "Total Execution Time")

    # Write data
    test_img_num = len(testImages)
    for i, (img_arr, true_label_numeric, filename) in enumerate(test_data[:test_img_num]):
        pred_label_numeric = np.argmax(preds[i])
        true_label = classes[true_label_numeric]
        pred_label = classes[pred_label_numeric]
        match = 1 if true_label == pred_label else 0
        worksheet.write(i +1, 0, filename)
        worksheet.write(i + 1, 1, true_label)
        worksheet.write(i + 1, 2, pred_label)
        worksheet.write(i + 1, 3, match)

    # Write total execution time
    end1 = time.time()
    time_taken = end1 - start1
    worksheet.write(1, 4, time_taken)

    # Close the workbook
    workbook.close()
    print(f"Results successfully written to {result_excel_link}")
except Exception as e:
    print(f"An error occurred while writing to Excel: {e}")

# Plot training results
plt.figure(figsize=(10, 10))
plt.subplot(2, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.legend()
plt.title('Loss')

plt.subplot(2, 2, 2)
plt.plot(history.history['accuracy'], label='Train Acc')
plt.plot(history.history['val_accuracy'], label='Val Acc')
plt.legend()
plt.title('Accuracy')

plt.show()

In [None]:
import matplotlib.pyplot as plt

# Assume 'history' is the object returned by model.fit()
# Example:
# history = model.fit(X_train, y_train, epochs=10, validation_data=(X_val, y_val))

# Plot Training & Validation Accuracy
plt.figure(figsize=(10, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

# Plot Training & Validation Loss
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
# --------------------- RESULTS TO EXCEL ----------------------
preds = model.predict(testImages)
predicted_labels_numeric = np.argmax(preds, axis=1)
true_labels_numeric = np.argmax(testLabels, axis=1)
predicted_labels = [classes[i] for i in predicted_labels_numeric]
true_labels = [classes[i] for i in true_labels_numeric]

workbook = xlsxwriter.Workbook(result_excel_link)
worksheet = workbook.add_worksheet()
worksheet.write(0, 0, 'True Label')
worksheet.write(0, 1, 'Predicted Label')
worksheet.write(0, 2, 'Match (1/0)')
worksheet.write(0, 3, 'Total Execution Time')

for i in range(len(testImages)):
    match = 1 if predicted_labels[i] == true_labels[i] else 0
    worksheet.write(i + 1, 0, true_labels[i])
    worksheet.write(i + 1, 1, predicted_labels[i])
    worksheet.write(i + 1, 2, match)

worksheet.write(1, 3, time.time() - start_time)
workbook.close()

print(f"✅ Results saved to {result_excel_link}")

In [None]:
model.save('final_resnet20_ecg_epoch75_model.h5')  # HDF5 format
