In [2]:
import os
import numpy as np
import pandas as pd
import nibabel as nib
import cv2
from tqdm import tqdm

import mlflow
import mlflow.keras

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import BinaryAccuracy, Precision, Recall

# === Load & Preprocess Data ===
def parse_info_cfg(info_path):
    ed = es = group = None
    with open(info_path, 'r') as f:
        for line in f:
            if "ED:" in line:
                ed = int(line.split(":")[1].strip())
            elif "ES:" in line:
                es = int(line.split(":")[1].strip())
            elif "Group:" in line:
                group = line.split(":")[1].strip()
                if group == "RV":
                    group = "ARV"
    return ed, es, group

def load_mid_slice(path):
    img = nib.load(path).get_fdata()
    mid = img.shape[2] // 2
    return cv2.resize(img[:, :, mid], (128, 128))

def load_tensor(base_path, pid, ed, es):
    ed_img = load_mid_slice(f"{base_path}/{pid}_frame{ed:02d}_gt.nii")
    es_img = load_mid_slice(f"{base_path}/{pid}_frame{es:02d}_gt.nii")
    tensor = np.stack([ed_img, es_img], axis=-1)
    return tensor / np.max(tensor)

def load_dataset_multihead(base_dir):
    X = []
    y_all = {'NOR': [], 'MINF': [], 'DCM': [], 'HCM': [], 'ARV': []}
    classes = list(y_all.keys())

    for patient in tqdm(sorted(os.listdir(base_dir))):
        if not patient.startswith("patient"):
            continue
        try:
            path = os.path.join(base_dir, patient)
            ed, es, group = parse_info_cfg(os.path.join(path, "Info.cfg"))
            tensor = load_tensor(path, patient, ed, es)
            X.append(tensor)

            for cls in classes:
                y_all[cls].append(1 if cls == group else 0)
        except Exception as e:
            print(f" Skipping {patient}: {e}")

    X = np.array(X)
    y_outs = [np.array(y_all[cls]) for cls in classes]
    return X, y_outs, classes

# === Build CNN Multi-Head Model ===
def build_simple_multihead_model(input_shape=(128, 128, 2),
                                 class_names=['NOR', 'MINF', 'DCM', 'HCM', 'ARV'],
                                 learning_rate=0.0004):
    
    inputs = Input(shape=input_shape)

    x = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    x = MaxPooling2D(2)(x)

    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D(2)(x)

    x = Flatten()(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.3)(x)

    outputs = []
    losses = {}
    metrics = {}

    for cls in class_names:
        head_name = f"{cls}_head"
        head_output = Dense(1, activation='sigmoid', name=head_name)(x)
        outputs.append(head_output)

        losses[head_name] = 'binary_crossentropy'
        metrics[head_name] = [
            BinaryAccuracy(name='accuracy'),
            Precision(name='precision'),
            Recall(name='recall')
        ]

    model = Model(inputs=inputs, outputs=outputs)

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss=losses,
        metrics=metrics
    )

    return model

# === Export Predictions and Compute Accuracy ===
def export_predictions(model, X_test, y_test_list, class_names, output_csv="prediction_results.csv"):
    y_pred_probs = model.predict(X_test)
    y_pred_bin = [(p > 0.5).astype(int).flatten() for p in y_pred_probs]
    y_pred_matrix = np.stack(y_pred_bin, axis=1)
    y_pred_probs_matrix = np.stack([p.flatten() for p in y_pred_probs], axis=1)

    true_class_indices = [
        next(j for j in range(len(class_names)) if y_test_list[j][i] == 1)
        for i in range(len(X_test))
    ]
    true_class_names = [class_names[i] for i in true_class_indices]

    predicted_sets = [
        [class_names[j] for j in range(len(class_names)) if y_pred_matrix[i][j] == 1]
        for i in range(len(X_test))
    ]

    containment_match = [
        true_class_names[i] in predicted_sets[i]
        for i in range(len(X_test))
    ]
    top1_indices = np.argmax(y_pred_probs_matrix, axis=1)
    top1_preds = [class_names[i] for i in top1_indices]
    correct_top1 = [
        top1_preds[i] == true_class_names[i]
        for i in range(len(X_test))
    ]

    patient_ids = sorted([pid for pid in os.listdir('./database/testing') if pid.startswith("patient")])

    df = pd.DataFrame({
        'PatientID': patient_ids,
        'True_Label': true_class_names,
        'Top1_Pred': top1_preds,
        'Predicted_Binary': predicted_sets,
        'Correctly_Contained': containment_match,
        'Correct_Top1': correct_top1
    })

    df.to_csv(output_csv, index=False)
    print(f" Saved predictions to {output_csv}")
    return df, np.mean(correct_top1)

# === Main Execution with Multiple Runs ===
if __name__ == "__main__":
    print("Loading training and testing data...")
    X_train, y_train_list, class_names = load_dataset_multihead('./database/training')
    X_test, y_test_list, _ = load_dataset_multihead('./database/testing')

    mlflow.set_experiment("ACDC_CardiacMRI_Classification")

    target_accuracy = 0.5
    n_trials = 5
    best_accuracy = 0
    best_model = None

    for run_id in range(n_trials):
        print(f"\n=== Run {run_id + 1} ===")
        model = build_simple_multihead_model(
            input_shape=(128, 128, 2),
            class_names=class_names
        )

        with mlflow.start_run(run_name=f"Run_{run_id + 1}"):
            mlflow.log_param("epochs", 200)
            mlflow.log_param("batch_size", 16)
            mlflow.log_param("learning_rate", 0.0004)
            mlflow.log_param("run_id", run_id + 1)

            print("Training model...")
            history = model.fit(
                X_train,
                y_train_list,
                validation_data=(X_test, y_test_list),
                epochs=200,
                batch_size=16,
                verbose=1
            )

            mlflow.keras.log_model(model, "multihead_model")

            print("Predicting and exporting results...")
            df, acc = export_predictions(model, X_test, y_test_list, class_names, 
                                         output_csv=f"prediction_results_run{run_id + 1}.csv")
            mlflow.log_metric("top1_accuracy", acc)

            print(f"Logged accuracy to MLflow: {acc:.3f}")

            if acc > best_accuracy:
                best_accuracy = acc
                best_model = model

            if acc >= target_accuracy:
                print(f" Target accuracy {target_accuracy} achieved at run {run_id + 1}")
                break

    print(f"\n Best accuracy across all runs: {best_accuracy:.3f}")
    if best_model is not None:
        best_model.save("best_multihead_model_above_0.5.h5")


Loading training and testing data...


100%|████████████████████████████████████████| 102/102 [00:00<00:00, 252.41it/s]
100%|██████████████████████████████████████████| 52/52 [00:00<00:00, 226.13it/s]



=== Run 1 ===
Training model...
Epoch 1/200
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 114ms/step - ARV_head_accuracy: 0.7964 - ARV_head_loss: 0.5960 - ARV_head_precision: 0.5000 - ARV_head_recall: 0.2262 - DCM_head_accuracy: 0.7402 - DCM_head_loss: 0.6295 - DCM_head_precision: 0.2000 - DCM_head_recall: 0.1124 - HCM_head_accuracy: 0.6925 - HCM_head_loss: 0.6278 - HCM_head_precision: 0.0000e+00 - HCM_head_recall: 0.0000e+00 - MINF_head_accuracy: 0.7060 - MINF_head_loss: 0.6189 - MINF_head_precision: 0.2545 - MINF_head_recall: 0.1888 - NOR_head_accuracy: 0.6832 - NOR_head_loss: 0.6061 - NOR_head_precision: 0.3000 - NOR_head_recall: 0.2686 - loss: 3.0857 - val_ARV_head_accuracy: 0.8000 - val_ARV_head_loss: 0.4375 - val_ARV_head_precision: 0.0000e+00 - val_ARV_head_recall: 0.0000e+00 - val_DCM_head_accuracy: 0.8000 - val_DCM_head_loss: 0.6922 - val_DCM_head_precision: 0.0000e+00 - val_DCM_head_recall: 0.0000e+00 - val_HCM_head_accuracy: 0.8000 - val_HCM_head_loss: 0.436



Predicting and exporting results...
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 39ms/step
 Saved predictions to prediction_results_run1.csv
Logged accuracy to MLflow: 0.440

=== Run 2 ===
Training model...
Epoch 1/200
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 124ms/step - ARV_head_accuracy: 0.7342 - ARV_head_loss: 0.6082 - ARV_head_precision: 0.0000e+00 - ARV_head_recall: 0.0000e+00 - DCM_head_accuracy: 0.7367 - DCM_head_loss: 0.6350 - DCM_head_precision: 0.2500 - DCM_head_recall: 0.2522 - HCM_head_accuracy: 0.6602 - HCM_head_loss: 0.6110 - HCM_head_precision: 0.2143 - HCM_head_recall: 0.4173 - MINF_head_accuracy: 0.7393 - MINF_head_loss: 0.6364 - MINF_head_precision: 0.3333 - MINF_head_recall: 0.0813 - NOR_head_accuracy: 0.5453 - NOR_head_loss: 0.6041 - NOR_head_precision: 0.0000e+00 - NOR_head_recall: 0.0000e+00 - loss: 3.0973 - val_ARV_head_accuracy: 0.8000 - val_ARV_head_loss: 0.4555 - val_ARV_head_precision: 0.0000e+00 - val_ARV_head_recall: 0



Predicting and exporting results...
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step
 Saved predictions to prediction_results_run2.csv
Logged accuracy to MLflow: 0.440

=== Run 3 ===
Training model...
Epoch 1/200
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 123ms/step - ARV_head_accuracy: 0.7987 - ARV_head_loss: 0.5965 - ARV_head_precision: 0.0000e+00 - ARV_head_recall: 0.0000e+00 - DCM_head_accuracy: 0.6725 - DCM_head_loss: 0.6223 - DCM_head_precision: 0.2500 - DCM_head_recall: 0.3395 - HCM_head_accuracy: 0.6623 - HCM_head_loss: 0.6186 - HCM_head_precision: 0.1667 - HCM_head_recall: 0.3201 - MINF_head_accuracy: 0.7289 - MINF_head_loss: 0.6405 - MINF_head_precision: 0.0000e+00 - MINF_head_recall: 0.0000e+00 - NOR_head_accuracy: 0.6122 - NOR_head_loss: 0.6053 - NOR_head_precision: 0.2718 - NOR_head_recall: 0.3179 - loss: 3.0841 - val_ARV_head_accuracy: 0.8000 - val_ARV_head_loss: 0.4443 - val_ARV_head_precision: 0.0000e+00 - val_ARV_head_recall: 0



Predicting and exporting results...
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step




 Saved predictions to prediction_results_run3.csv
Logged accuracy to MLflow: 0.520
🎯 Target accuracy 0.5 achieved at run 3

✅ Best accuracy across all runs: 0.520


In [16]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
import mlflow
import os

# === Load Saved Predictions ===
prediction_csv = "prediction_results_run3.csv"  # 🔁 Update this if needed
df = pd.read_csv(prediction_csv)

# === Define Class Names ===
class_names = ['NOR', 'MINF', 'DCM', 'HCM', 'ARV']

# === Convert Labels to Indices ===
true_labels = df['True_Label'].map({cls: i for i, cls in enumerate(class_names)})
pred_labels = df['Top1_Pred'].map({cls: i for i, cls in enumerate(class_names)})

# === Confusion Matrix ===
cm = confusion_matrix(true_labels, pred_labels)
cm_df = pd.DataFrame(cm, index=class_names, columns=class_names)

# === Save Confusion Matrix Plot ===
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap="Blues", xticklabels=class_names, yticklabels=class_names)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.tight_layout()

confusion_path = "confusion_matrix_run3.png"
plt.savefig(confusion_path)
plt.close()

# === Log confusion matrix to MLflow ===
mlflow.log_artifact(confusion_path)

# === Classification Report ===
report = classification_report(true_labels, pred_labels, target_names=class_names, output_dict=True)
report_df = pd.DataFrame(report).transpose()
report_df[['precision', 'recall', 'f1-score']] = report_df[['precision', 'recall', 'f1-score']].round(3)

# Save and log report
report_path = "classification_report_run3.csv"
report_df.to_csv(report_path)
mlflow.log_artifact(report_path)

# === Log overall accuracy separately ===
accuracy = report['accuracy']
mlflow.log_metric("overall_accuracy_run3", accuracy)

# === Console Output (optional) ===
print("\nConfusion Matrix:")
print(cm_df)

print("\nClassification Report:")
print(report_df[['precision', 'recall', 'f1-score', 'support']])
print(f"\nOverall Accuracy: {accuracy:.3f}")



Confusion Matrix:
      NOR  MINF  DCM  HCM  ARV
NOR     2     2    0    2    4
MINF    3     5    1    1    0
DCM     1     2    6    0    1
HCM     1     0    0    7    2
ARV     2     0    1    1    6

Classification Report:
              precision  recall  f1-score  support
NOR               0.222    0.20     0.211    10.00
MINF              0.556    0.50     0.526    10.00
DCM               0.750    0.60     0.667    10.00
HCM               0.636    0.70     0.667    10.00
ARV               0.462    0.60     0.522    10.00
accuracy          0.520    0.52     0.520     0.52
macro avg         0.525    0.52     0.518    50.00
weighted avg      0.525    0.52     0.518    50.00

Overall Accuracy: 0.520


In [10]:
from tensorflow.keras.models import Model
from tf_keras_vis.gradcam import Gradcam
from tf_keras_vis.utils.model_modifiers import ReplaceToLinear
import matplotlib.pyplot as plt
import numpy as np
import os

# === Setup Output Folder ===
os.makedirs("gradcam_outputs_run3", exist_ok=True)

# === Define Class Names ===
class_names = ['NOR', 'MINF', 'DCM', 'HCM', 'ARV']

# === Choose a Sample (e.g., first sample) ===
sample_index = 0
sample = X_test[sample_index:sample_index+1]  # shape (1, 128, 128, 2)

# === Loop through All Heads ===
for class_idx, class_name in enumerate(class_names):
    print(f"Generating Grad-CAM for class: {class_name}...")

    # --- Extract single-head submodel ---
    target_head_name = model.output_names[class_idx]
    submodel = Model(inputs=model.input, outputs=model.get_layer(target_head_name).output)

    # --- Score function (identity) ---
    def score_fn(output):
        return output

    # --- Init GradCAM ---
    gradcam = Gradcam(submodel, model_modifier=ReplaceToLinear(), clone=True)

    # --- Generate heatmap ---
    cam = gradcam(score_fn, sample, penultimate_layer=-1)

    # --- Plot ED slice with Grad-CAM overlay ---
    plt.figure(figsize=(4, 4))
    plt.imshow(sample[0][:, :, 0], cmap='gray')         # ED slice
    plt.imshow(cam[0], cmap='jet', alpha=0.5)           # Overlay CAM
    plt.title(f"Grad-CAM - {class_name}")
    plt.axis('off')
    plt.tight_layout()

    # --- Save plot ---
    save_path = f"gradcam_outputs_run3/gradcam_{class_name}_run3.png"
    plt.savefig(save_path)
    plt.close()
    print(f"Saved Grad-CAM for {class_name} to {save_path}")


Generating Grad-CAM for class: NOR...
Saved Grad-CAM for NOR to gradcam_outputs_run3/gradcam_NOR_run3.png
Generating Grad-CAM for class: MINF...
Saved Grad-CAM for MINF to gradcam_outputs_run3/gradcam_MINF_run3.png
Generating Grad-CAM for class: DCM...
Saved Grad-CAM for DCM to gradcam_outputs_run3/gradcam_DCM_run3.png
Generating Grad-CAM for class: HCM...
Saved Grad-CAM for HCM to gradcam_outputs_run3/gradcam_HCM_run3.png
Generating Grad-CAM for class: ARV...
Saved Grad-CAM for ARV to gradcam_outputs_run3/gradcam_ARV_run3.png


In [12]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import os

# === Create Folder for Output ===
os.makedirs("saliency_outputs_run3", exist_ok=True)

# === Class Names ===
class_names = ['NOR', 'MINF', 'DCM', 'HCM', 'ARV']

# === Select a Test Sample (e.g., first sample) ===
sample_index = 0
input_image = X_test[sample_index:sample_index+1]  # shape: (1, 128, 128, 2)

# === Loop Through Each Class Head ===
for class_idx, class_name in enumerate(class_names):
    print(f"Generating saliency map for class: {class_name}...")

    # --- Extract Head Submodel ---
    head_name = model.output_names[class_idx]
    submodel = tf.keras.Model(inputs=model.input, outputs=model.get_layer(head_name).output)

    # --- Track Gradients ---
    input_tensor = tf.convert_to_tensor(input_image)
    with tf.GradientTape() as tape:
        tape.watch(input_tensor)
        predictions = submodel(input_tensor)
        output = predictions[:, 0]  # scalar value

    # --- Compute Gradient w.r.t. Input ---
    grads = tape.gradient(output, input_tensor)[0]  # shape: (128, 128, 2)

    # --- Channel-wise Max Absolute Gradient (ED & ES) ---
    saliency = np.max(np.abs(grads), axis=-1)

    # --- Normalize for Visualization ---
    saliency = (saliency - saliency.min()) / (saliency.max() - saliency.min() + 1e-8)

    # --- Plot Overlay on ED Slice ---
    plt.figure(figsize=(4, 4))
    plt.imshow(input_image[0][:, :, 0], cmap='gray')     # ED slice
    plt.imshow(saliency, cmap='hot', alpha=0.5)
    plt.title(f"Saliency Map - {class_name}")
    plt.axis('off')
    plt.tight_layout()

    # --- Save Output ---
    save_path = f"saliency_outputs_run3/saliency_{class_name}_run3.png"
    plt.savefig(save_path)
    plt.close()
    print(f"Saved: {save_path}")


Generating saliency map for class: NOR...
Saved: saliency_outputs_run3/saliency_NOR_run3.png
Generating saliency map for class: MINF...
Saved: saliency_outputs_run3/saliency_MINF_run3.png
Generating saliency map for class: DCM...
Saved: saliency_outputs_run3/saliency_DCM_run3.png
Generating saliency map for class: HCM...
Saved: saliency_outputs_run3/saliency_HCM_run3.png
Generating saliency map for class: ARV...
Saved: saliency_outputs_run3/saliency_ARV_run3.png


In [22]:
def export_predictions(model, X_test, y_test_list, class_names, output_csv="prediction_results.csv", prob_output="y_pred_probs_run3.npy"):
    y_pred_probs = model.predict(X_test)
    y_pred_bin = [(p > 0.5).astype(int).flatten() for p in y_pred_probs]
    y_pred_matrix = np.stack(y_pred_bin, axis=1)
    y_pred_probs_matrix = np.stack([p.flatten() for p in y_pred_probs], axis=1)

    # Save predicted probability matrix for ROC
    np.save(prob_output, y_pred_probs_matrix)

    true_class_indices = [
        next(j for j in range(len(class_names)) if y_test_list[j][i] == 1)
        for i in range(len(X_test))
    ]
    true_class_names = [class_names[i] for i in true_class_indices]

    predicted_sets = [
        [class_names[j] for j in range(len(class_names)) if y_pred_matrix[i][j] == 1]
        for i in range(len(X_test))
    ]

    containment_match = [
        true_class_names[i] in predicted_sets[i]
        for i in range(len(X_test))
    ]
    top1_indices = np.argmax(y_pred_probs_matrix, axis=1)
    top1_preds = [class_names[i] for i in top1_indices]
    correct_top1 = [
        top1_preds[i] == true_class_names[i]
        for i in range(len(X_test))
    ]

    patient_ids = sorted([pid for pid in os.listdir('./database/testing') if pid.startswith("patient")])

    df = pd.DataFrame({
        'PatientID': patient_ids,
        'True_Label': true_class_names,
        'Top1_Pred': top1_preds,
        'Predicted_Binary': predicted_sets,
        'Correctly_Contained': containment_match,
        'Correct_Top1': correct_top1
    })

    df.to_csv(output_csv, index=False)
    print(f" Saved predictions to {output_csv}")
    print(f" Saved prediction probabilities to {prob_output}")
    return df, np.mean(correct_top1)


In [24]:
df, acc = export_predictions(model, X_test, y_test_list, class_names,
                             output_csv="prediction_results_run3.csv",
                             prob_output="y_pred_probs_run3.npy")


[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step 
 Saved predictions to prediction_results_run3.csv
 Saved prediction probabilities to y_pred_probs_run3.npy


In [26]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc
import mlflow

# === Load prediction CSV from run 3 ===
df = pd.read_csv("prediction_results_run3.csv")

# === Define class names and binarize labels ===
class_names = ['NOR', 'MINF', 'DCM', 'HCM', 'ARV']
n_classes = len(class_names)

# Map labels to indices
label_to_index = {cls: i for i, cls in enumerate(class_names)}
true_labels = df['True_Label'].map(label_to_index).values
y_true = label_binarize(true_labels, classes=range(n_classes))

# Load prediction probabilities for each class
# Assuming you saved predicted probabilities earlier as a CSV or you have access via inference
# If not yet available, modify your export_predictions function to save `y_pred_probs_matrix`
y_pred_probs = np.load("y_pred_probs_run3.npy")  # Shape: (n_samples, 5)

# === Compute ROC curve and AUC ===
fpr = dict()
tpr = dict()
roc_auc = dict()

for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_true[:, i], y_pred_probs[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# === Plot ROC Curve for all classes ===
plt.figure(figsize=(8, 6))
for i in range(n_classes):
    plt.plot(fpr[i], tpr[i], label=f'{class_names[i]} (AUC = {roc_auc[i]:.2f})')

plt.plot([0, 1], [0, 1], 'k--', label='Random Chance')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves for Multi-Head CNN')
plt.legend(loc='lower right')
plt.tight_layout()
plt.savefig("roc_curve_multihead_run3.png")
plt.close()

# === Log AUC and ROC plot ===
for i in range(n_classes):
    mlflow.log_metric(f"AUC_{class_names[i]}", roc_auc[i])
mlflow.log_artifact("roc_curve_multihead_run3.png")

print("ROC curves and AUC scores logged to MLflow.")


ROC curves and AUC scores logged to MLflow.
