In [None]:
################# Code for loading the Human Scans & Data Augmentation##############################

import numpy as np
import os
import imageio
import matplotlib.pyplot as plt
import re
from PIL import Image
import shutil
from skimage.transform import resize
from sklearn.model_selection import train_test_split
import random
from tqdm import tqdm
from skimage.io import imread, imshow
import matplotlib.pyplot as plt
from datetime import datetime
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
from keras import backend as K

def load_images(input_folder, desired_height, desired_width):
    input_images = []
    loaded_indices = []

    file_pattern = re.compile(r"(circum|radial|longit)_(\d+)\.png")

    image_paths = {}

    for file in os.listdir(input_folder):
        match = file_pattern.match(file)
        if match:
            category, index = match.groups()
            index = int(index)

            if index not in image_paths:
                image_paths[index] = {}

            image_paths[index][category] = os.path.join(input_folder, file)

    for index in sorted(image_paths.keys()):
        paths = image_paths[index]
        if all(category in paths for category in ["circum", "radial", "longit"]):
            images = [Image.open(paths[category]).convert("L").resize((desired_width, desired_height))
                      for category in ["circum", "radial", "longit"]]

            combined_image = np.stack(images, axis=-1)
            input_images.append(combined_image)
            loaded_indices.append(index)  # Save the index

    return np.array(input_images), loaded_indices

# Load images and get their indices
input_folder = '/Human_CMR/'  # locate data folder for inputs
desired_height = 128
desired_width = 128

input_images, indices = load_images(input_folder, desired_height, desired_width)


print(f"Loaded {input_images.shape[0]} sets of images.")
print(f"Indices of loaded images: {indices}")

output_folder = '/Human_CMR/'  # locate data folder for outputs
output_images = []

# Resize each image to 128x128, add a channel dimension, and display it
for i in indices:
    output_paths = [output_folder + f"patient_{i}_LGE.png"]
    images = []

    for path in output_paths:
        img = Image.open(path).convert("L").resize((128, 128))  # Resize to 128x128
        img_array = np.array(img)[..., np.newaxis]  # Add channel dimension (H, W) -> (H, W, 1)
        images.append(img_array)

    output_images.extend(images)  # Append images to the output list

# Convert the list to a NumPy array with shape (5, 128, 128, 1)
output_images = np.array(output_images)


threshold_value = 100
output_images[output_images <= threshold_value] = 0
output_images[output_images > threshold_value] = 255

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Lambda, Conv2D, BatchNormalization, Dropout, MaxPooling2D, Conv2DTranspose, concatenate
from tensorflow.keras.models import Model

seed = 42
np.random.seed = seed

IMG_WIDTH = 128
IMG_HEIGHT = 128
IMG_CHANNELS = 3

# Define the Dice coefficient metric function (just for training evaluation)
def dice_coefficient(y_true, y_pred, smooth=1e-6):
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)

    intersection = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred)
    dice = (2.0 * intersection + smooth) / (union + smooth)
    return dice


#Build the UNet model
inputs = tf.keras.layers.Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
s = tf.keras.layers.Lambda(lambda x: x / 255)(inputs)

#Contraction path
c1 = tf.keras.layers.Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(s)
c1 = tf.keras.layers.Dropout(0.1)(c1)
c1 = tf.keras.layers.Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c1)
p1 = tf.keras.layers.MaxPooling2D((2, 2))(c1)

c2 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p1)
c2 = tf.keras.layers.Dropout(0.1)(c2)
c2 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c2)
p2 = tf.keras.layers.MaxPooling2D((2, 2))(c2)

c3 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p2)
c3 = tf.keras.layers.Dropout(0.2)(c3)
c3 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c3)
p3 = tf.keras.layers.MaxPooling2D((2, 2))(c3)

c4 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p3)
c4 = tf.keras.layers.Dropout(0.2)(c4)
c4 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c4)
p4 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c4)

c5 = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p4)
c5 = tf.keras.layers.Dropout(0.3)(c5)
c5 = tf.keras.layers.Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c5)

#Expansive path
u6 = tf.keras.layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c5)
u6 = tf.keras.layers.concatenate([u6, c4])
c6 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u6)
c6 = tf.keras.layers.Dropout(0.2)(c6)
c6 = tf.keras.layers.Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c6)

u7 = tf.keras.layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c6)
u7 = tf.keras.layers.concatenate([u7, c3])
c7 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u7)
c7 = tf.keras.layers.Dropout(0.2)(c7)
c7 = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c7)

u8 = tf.keras.layers.Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c7)
u8 = tf.keras.layers.concatenate([u8, c2])
c8 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u8)
c8 = tf.keras.layers.Dropout(0.1)(c8)
c8 = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c8)

u9 = tf.keras.layers.Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(c8)
u9 = tf.keras.layers.concatenate([u9, c1], axis=3)
c9 = tf.keras.layers.Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u9)
c9 = tf.keras.layers.Dropout(0.1)(c9)
c9 = tf.keras.layers.Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c9)

outputs = tf.keras.layers.Conv2D(1, (1, 1), activation='sigmoid')(c9)


# Compile the model with masked losses and metrics

model = Model(inputs=[inputs], outputs=[outputs])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[dice_coefficient])

model.summary()




In [None]:
model.load_weights('UNet_single_fidelity_random_1.h5')
preds_test = model.predict(input_images, verbose=1)
preds_test_t = (preds_test > 0.75).astype(np.uint8)

In [None]:
# Define the circular mask function
def create_circular_mask(height, width, radius=63):
    center = (int(height / 2), int(width / 2))
    Y, X = np.ogrid[:height, :width]
    dist_from_center = np.sqrt((X - center[1]) ** 2 + (Y - center[0]) ** 2)
    mask = dist_from_center <= radius
    return mask.astype(np.float32)

# Generate the circular mask for 128x128 images
IMG_HEIGHT, IMG_WIDTH = 128, 128
circular_mask = create_circular_mask(IMG_HEIGHT, IMG_WIDTH)
circular_mask = tf.convert_to_tensor(circular_mask, dtype=tf.float32)

In [None]:
# y_test_sample = output_images[i, :, :, 0]
# thresholded_y_test = tf.cast(y_test_sample > 0, tf.float32)
# masked_y_test = thresholded_y_test * circular_mask

i=4
# Mask and threshold the prediction
y_pred_sample = preds_test_t[i, :, :, 0]
thresholded_y_pred = tf.cast(y_pred_sample > 0, tf.float32)
masked_y_pred = thresholded_y_pred * circular_mask


In [None]:


# Define functions for calculating metrics
def dice_score(masked_y_true, masked_y_pred, smooth=1e-6):
    intersection = tf.reduce_sum(masked_y_true * masked_y_pred)
    union = tf.reduce_sum(masked_y_true) + tf.reduce_sum(masked_y_pred)
    dice = (2.0 * intersection + smooth) / (union + smooth)
    return dice

def accuracy(masked_y_true, masked_y_pred, circular_mask):
    correct = tf.reduce_sum(tf.cast(masked_y_true == masked_y_pred, tf.float32) * circular_mask)
    total = tf.reduce_sum(circular_mask)
    return correct / total

def precision(masked_y_true, masked_y_pred, smooth=1e-6):
    true_positives = tf.reduce_sum(masked_y_true * masked_y_pred)
    predicted_positives = tf.reduce_sum(masked_y_pred)
    return (true_positives + smooth) / (predicted_positives + smooth)

def recall(masked_y_true, masked_y_pred, smooth=1e-6):
    true_positives = tf.reduce_sum(masked_y_true * masked_y_pred)
    actual_positives = tf.reduce_sum(masked_y_true)
    return (true_positives + smooth) / (actual_positives + smooth)

def iou(masked_y_true, masked_y_pred, smooth=1e-6):
    intersection = tf.reduce_sum(masked_y_true * masked_y_pred)
    union = tf.reduce_sum(masked_y_true) + tf.reduce_sum(masked_y_pred) - intersection
    return (intersection + smooth) / (union + smooth)

# Calculate and print metrics for each test sample
metrics_results = {
    'dice_infarct': [],
    'accuracy_infarct': [],
    'precision_infarct': [],
    'recall_infarct': [],
    'iou_infarct': [],

}

for i in range(len(output_images)):
    # Mask and threshold the ground truth
    y_test_sample = output_images[i, :, :, 0]
    thresholded_y_test = tf.cast(y_test_sample > 0, tf.float32)
    masked_y_test = thresholded_y_test * circular_mask

    # Mask and threshold the prediction
    y_pred_sample = preds_test_t[i, :, :, 0]
    thresholded_y_pred = tf.cast(y_pred_sample > 0, tf.float32)
    masked_y_pred = thresholded_y_pred * circular_mask

    # Calculate metrics for infarct region
    dice_infarct = dice_score(masked_y_test, masked_y_pred)
    acc_infarct = accuracy(masked_y_test, masked_y_pred, circular_mask).numpy()
    prec_infarct = precision(masked_y_test, masked_y_pred).numpy()
    rec_infarct = recall(masked_y_test, masked_y_pred).numpy()
    iou_infarct = iou(masked_y_test, masked_y_pred).numpy()

    metrics_results['dice_infarct'].append(dice_infarct.numpy())
    metrics_results['accuracy_infarct'].append(acc_infarct)
    metrics_results['precision_infarct'].append(prec_infarct)
    metrics_results['recall_infarct'].append(rec_infarct)
    metrics_results['iou_infarct'].append(iou_infarct)


    # Print metrics for each sample
    print(f"Sample {i}:")
    print(f" - Dice Score (Infarct): {dice_infarct.numpy()}")
    print(f" - Accuracy (Infarct): {acc_infarct}")
    print(f" - Precision (Infarct): {prec_infarct}")
    print(f" - Recall (Infarct): {rec_infarct}")
    print(f" - IoU (Infarct): {iou_infarct}")

# Calculate average metrics across all test samples
average_metrics = {metric: np.mean(scores) for metric, scores in metrics_results.items()}
print("\nAverage Metrics Across All Test Samples:")
for metric, avg_score in average_metrics.items():
    print(f" - {metric.capitalize()}: {avg_score}")