In [None]:
import numpy as np 
import pandas as pd 
import os
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.image import random_crop
from PIL import Image
import cv2
import random
import matplotlib.pyplot as plt


In [None]:
# Define the directories
image_folder1 = '/kaggle/input/palm-center/Center/Mid/Images'
mask_folder1 = '/kaggle/input/palm-center/Center/Mid/Masks'

image_folder2 = '/kaggle/input/palm-center/Center/Fovea/Images'
mask_folder2 = '/kaggle/input/palm-center/Center/Fovea/Masks'

image_folder3 = '/kaggle/input/palm-center/Center/OD/Images'
mask_folder3 = '/kaggle/input/palm-center/Center/OD/Masks'

In [None]:
# Get the image and mask files from each directory
image_files1 = set(os.listdir(image_folder1))
mask_files1 = set(os.listdir(mask_folder1))

image_files2 = set(os.listdir(image_folder2))
mask_files2 = set(os.listdir(mask_folder2))

image_files3 = set(os.listdir(image_folder3))
mask_files3 = set(os.listdir(mask_folder3))

# Combine the files from all directories
image_files = image_files1.union(image_files2).union(image_files3)
mask_files = mask_files1.union(mask_files2).union(mask_files3)

print(f"Total images: {len(image_files)}")
print(f"Total masks: {len(mask_files)}")

In [None]:
def resize_image(img, size=(256,256)):
    return cv2.resize(img, size, interpolation=cv2.INTER_AREA)

def preprocess_image_mask(image_path, mask_path):
    # Load the grayscale image and mask
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)  
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)    

    # Resize image and mask to 512x512
    image_resized = resize_image(image).reshape(256,256, 1)
    mask_resized = resize_image(mask).reshape(256,256, 1)

    # Make the mask binary
    _, mask_binary = cv2.threshold(mask_resized, 127, 255, cv2.THRESH_BINARY)
    return image_resized, mask_binary

In [None]:
image_folders = [image_folder1, image_folder2, image_folder3]
mask_folders = [mask_folder1, mask_folder2, mask_folder3]
centre_labels = [0,1,2]

In [None]:
images, masks, label_list = [], [], []

for image_folder, mask_folder, label in zip(image_folders, mask_folders, centre_labels):
    for img_name in os.listdir(image_folder):
        img_path = os.path.join(image_folder, img_name)
        mask_path = os.path.join(mask_folder, img_name)

    # Preprocess and resize
        image_resized, mask_binary = preprocess_image_mask(img_path, mask_path)
        images.append(image_resized)
        masks.append(mask_binary)

   
    #Append the label
        label_list.append(label)
        
images = np.array(images)
masks = np.array(masks)
labels = np.array(label_list)

In [None]:
# Calculate the average size of the masks for each label

average_mask_sizes = {}
  # Total pixels in a 256x256 image
for label in np.unique(labels):
    label_masks = masks[labels == label]
    # Count the black pixels (mask area) in each mask and average
    average_mask_size = np.mean(np.sum(label_masks == 0, axis=(1, 2)))
    average_mask_sizes[label] = average_mask_size

print("Average Mask Sizes per Label:")
for label, size in average_mask_sizes.items():
    print(f"Label {label}: {size:.2f} pixels")



In [None]:
# Split the data
X_train, X_temp, y_train, y_temp,  label_train, label_temp = train_test_split(
    images, masks,labels, test_size=0.2, random_state=42
)
X_val, X_test, y_val, y_test,  label_val, label_test = train_test_split(
    X_temp, y_temp, label_temp, test_size=0.5, random_state=42
)

In [None]:
y_train = y_train.reshape(-1, 256,256, 1)
y_val = y_val.reshape(-1, 256,256, 1)
y_test = y_test.reshape(-1,256,256, 1)

print('y_train shape', y_train.shape)

In [None]:
label_train = label_train.reshape(-1,1)
label_val = label_val.reshape(-1,1)
label_test = label_test.reshape(-1,1)

In [None]:
# Creating a tensorflow dataset


# Convert arrays to TensorFlow tensors
X_train_tensor = tf.convert_to_tensor(X_train, dtype=tf.float32)
y_train_tensor = tf.convert_to_tensor(y_train, dtype=tf.float32)
label_train_tensor = tf.convert_to_tensor(label_train, dtype=tf.float32)

# Create TensorFlow Dataset
train_dataset = tf.data.Dataset.from_tensor_slices((X_train_tensor,  y_train_tensor))

# Batch the dataset (adjust batch size as needed)
batch_size = 8
train_dataset = train_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Repeat for validation and testing sets
X_val_tensor = tf.convert_to_tensor(X_val, dtype=tf.float32)
y_val_tensor = tf.convert_to_tensor(y_val, dtype=tf.float32)
label_val_tensor = tf.convert_to_tensor(label_val, dtype=tf.float32)

X_test_tensor = tf.convert_to_tensor(X_test, dtype=tf.float32)
y_test_tensor = tf.convert_to_tensor(y_test, dtype=tf.float32)
label_test_tensor = tf.convert_to_tensor(label_test, dtype=tf.float32)

# Validation dataset
val_dataset = tf.data.Dataset.from_tensor_slices((X_val_tensor,y_val_tensor))
val_dataset = val_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Test dataset

test_dataset = tf.data.Dataset.from_tensor_slices((X_test_tensor, y_test_tensor))
test_dataset = test_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
#Normalize data
def normalize_data(image, mask):
       
    # Normalize image to [0, 1] by dividing by 255
    image = image / 255.0
    
    # Normalize mask to [0, 1] and round to binary
    mask = mask / 255.0
    mask = tf.cast(tf.round(mask), tf.float32)
    
    return image, mask  # Return the image,and the normalized mask


# Apply normalization to train, validation, and test datasets
train_dataset = train_dataset.map(normalize_data)
val_dataset = val_dataset.map(normalize_data)
test_dataset = test_dataset.map(normalize_data)

In [None]:
#Create filtered subsets of test dataset based on image centres

# Get indices based on labels
label_0_indices = np.where(label_test == 0)[0]
label_1_indices = np.where(label_test == 1)[0]
label_2_indices = np.where(label_test == 2)[0]

# Extract images and masks with specified labels
X_test_label_0 = X_test[label_0_indices]
y_test_label_0 = y_test[label_0_indices]
X_test_label_1 = X_test[label_1_indices]
y_test_label_1 = y_test[label_1_indices]
X_test_label_2 = X_test[label_2_indices]
y_test_label_2 = y_test[label_2_indices]

# Convert to tensors 
X_test_label_0_tensor = tf.convert_to_tensor(X_test_label_0, dtype=tf.float32)
y_test_label_0_tensor = tf.convert_to_tensor(y_test_label_0, dtype=tf.float32)
X_test_label_1_tensor = tf.convert_to_tensor(X_test_label_1, dtype=tf.float32)
y_test_label_1_tensor = tf.convert_to_tensor(y_test_label_1, dtype=tf.float32)
X_test_label_2_tensor = tf.convert_to_tensor(X_test_label_2, dtype=tf.float32)
y_test_label_2_tensor = tf.convert_to_tensor(y_test_label_2, dtype=tf.float32)

# Create filtered subsets 
test_dataset_mid = tf.data.Dataset.from_tensor_slices((X_test_label_0_tensor, y_test_label_0_tensor))
test_dataset_mid = test_dataset_mid.batch(batch_size).prefetch(tf.data.AUTOTUNE)

test_dataset_fovea = tf.data.Dataset.from_tensor_slices((X_test_label_1_tensor, y_test_label_1_tensor))
test_dataset_fovea = test_dataset_fovea.batch(batch_size).prefetch(tf.data.AUTOTUNE)

test_dataset_od = tf.data.Dataset.from_tensor_slices((X_test_label_2_tensor, y_test_label_2_tensor))
test_dataset_od = test_dataset_od.batch(batch_size).prefetch(tf.data.AUTOTUNE)

test_dataset_mid = test_dataset_mid.map(normalize_data)
test_dataset_fovea = test_dataset_fovea.map(normalize_data)
test_dataset_od = test_dataset_od.map(normalize_data)

In [None]:
#Define metrics and loss function

def weighted_binary_crossentropy(y_true, y_pred):
    weights = tf.where(tf.less(tf.range(tf.shape(y_true)[2]), tf.shape(y_true)[2] // 2), 2.0, 1.0)
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    weighted_bce = bce * weights
    return tf.reduce_mean(weighted_bce)


def dice_coefficient(y_true, y_pred):
    # Cast both y_true and y_pred to float32 to ensure compatibility
    y_true_f = tf.keras.backend.flatten(tf.cast(y_true, tf.float32))
    y_pred_f = tf.keras.backend.flatten(tf.cast(y_pred, tf.float32))
    
    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    dice = (2. * intersection + 1e-6) / (tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + 1e-6)
    
    return dice

def iou(y_true, y_pred):
    y_true_f = tf.keras.backend.flatten(tf.cast(y_true, tf.float32))
    y_pred_f = tf.keras.backend.flatten(tf.cast(y_pred, tf.float32))

    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    union = tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) - intersection
    iou_metric = (intersection ) / (union )
    return iou_metric




In [None]:
#Define Attention UNet

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Concatenate, Layer, GlobalAveragePooling2D, GlobalMaxPooling2D
from tensorflow.keras.layers import Multiply, Add, Reshape, Activation

# Define input shape
input_shape = (256, 256, 1)


# Define Custom Spatial Attention Layer
class SpatialAttention(Layer):
    def __init__(self, **kwargs):
        super(SpatialAttention, self).__init__(**kwargs)

    def build(self, input_shape):
        self.conv1 = Conv2D(2, (3, 3), activation='relu', padding='same')
        self.conv2 = Conv2D(1, (3, 3), activation='sigmoid', padding='same')
        self.avg_pool = GlobalAveragePooling2D(keepdims=True)
        self.max_pool = GlobalMaxPooling2D(keepdims=True)
        self.concat = Concatenate(axis=-1)

    def call(self, inputs):
        avg_pool = self.avg_pool(inputs)
        max_pool = self.max_pool(inputs)
        concat = self.concat([avg_pool, max_pool])
        conv = self.conv1(concat)
        conv = self.conv2(conv)
        return Multiply()([inputs, conv])

# Define Spatial Attention module using the custom layer
def spatial_attention(input):
    return SpatialAttention()(input)

# Define input layer
inputs = Input(input_shape)

# Contracting Path with Spatial Attention
conv1 = Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
conv1 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv1)
conv1_attention = spatial_attention(conv1)  # Apply Spatial Attention
pool1 = MaxPooling2D(pool_size=(2, 2))(conv1_attention)

conv2 = Conv2D(128, (3, 3), activation='relu', padding='same')(pool1)
conv2 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv2)
conv2_attention = spatial_attention(conv2)  # Apply Spatial Attention
pool2 = MaxPooling2D(pool_size=(2, 2))(conv2_attention)

conv3 = Conv2D(256, (3, 3), activation='relu', padding='same')(pool2)
conv3 = Conv2D(256, (3, 3), activation='relu', padding='same')(conv3)
conv3_attention = spatial_attention(conv3)  # Apply Spatial Attention
pool3 = MaxPooling2D(pool_size=(2, 2))(conv3_attention)

# Bottleneck
conv4 = Conv2D(512, (3, 3), activation='relu', padding='same')(pool3)
conv4 = Conv2D(512, (3, 3), activation='relu', padding='same')(conv4)

# Expansive Path
up5 = UpSampling2D(size=(2, 2))(conv4)
merge5 = concatenate([conv3_attention, up5], axis=3)
conv5 = Conv2D(256, (3, 3), activation='relu', padding='same')(merge5)
conv5 = Conv2D(256, (3, 3), activation='relu', padding='same')(conv5)

up6 = UpSampling2D(size=(2, 2))(conv5)
merge6 = concatenate([conv2_attention, up6], axis=3)
conv6 = Conv2D(128, (3, 3), activation='relu', padding='same')(merge6)
conv6 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv6)

up7 = UpSampling2D(size=(2, 2))(conv6)
merge7 = concatenate([conv1_attention, up7], axis=3)
conv7 = Conv2D(64, (3, 3), activation='relu', padding='same')(merge7)
conv7 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv7)

# Output Layer
outputs = Conv2D(1, (1, 1), activation='sigmoid')(conv7)

# Define the model
model = Model(inputs=[inputs], outputs=[outputs])
model.summary()

In [None]:
#Train

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

learning_rates = [1e-4, 0.5e-4]

predictions_dict = {}

# Initialize a DataFrame to store the results
results_df = pd.DataFrame(columns=["LR", "Dice", "IoU", "F_Dice", "F_IoU","M_Dice", "M_IoU","O_Dice", "O_IoU","Epochs"])

# Training loop
for lr in learning_rates:
    
    # Initialize the model


    # Compile the model
    optimizer = Adam(learning_rate=lr)
    model.compile(optimizer=optimizer, loss=weighted_binary_crossentropy, metrics=[dice_coefficient, iou])

    # Callbacks
    model_name = f"Attn_UNet_lr{lr}.keras"
    checkpoint_callback = ModelCheckpoint(
        filepath=os.path.join("saved_models", model_name),
        monitor='val_loss',
        save_best_only=True,
        verbose=1,
        save_weights_only=False
    )
    early_stopping_callback = EarlyStopping(
        monitor='val_loss',
        patience=200,
        restore_best_weights=True,
        verbose=1
    )

    # Train the model
    history = model.fit(
        train_dataset,
        validation_data=val_dataset,
        epochs=1,
        batch_size=8,
        callbacks=[checkpoint_callback, early_stopping_callback],
        verbose=1
    )

    # Evaluate the model
    val_metrics_combined = model.evaluate(test_dataset, verbose=0)
    dice_coeff, iou_score = val_metrics_combined[1], val_metrics_combined[2]

    val_metrics_fovea = model.evaluate(test_dataset_fovea, verbose=0)
    dice_coeff_f, iou_score_f = val_metrics_fovea[1], val_metrics_fovea[2]

    val_metrics_mid = model.evaluate(test_dataset_mid, verbose=0)
    dice_coeff_m, iou_score_m = val_metrics_mid[1], val_metrics_mid[2]

    val_metrics_od = model.evaluate(test_dataset_od, verbose=0)
    dice_coeff_o, iou_score_o = val_metrics_od[1], val_metrics_od[2]

    # Add results to DataFrame
    new_row = {
        "LR": lr,
        "Dice": dice_coeff,
        "IoU": iou_score,
        "F_Dice": dice_coeff_f,
        "F_IoU": iou_score_f,
        "M_Dice": dice_coeff_m,
        "M_IoU": iou_score_m,
        "O_Dice": dice_coeff_o,
        "O_IoU": iou_score_o,
        "Epochs": len(history.history['loss'])
    }

    new_row_df = pd.DataFrame([new_row])

    results_df = pd.concat([results_df, new_row_df], ignore_index= True)

    predictions = model.predict(test_dataset)
    predictions_dict[model_name] = predictions

# Display the full DataFrame
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.max_rows', None)

print(results_df)

In [None]:
# Define the test datasets and corresponding labels
test_datasets = {
    "Fovea": test_dataset_fovea,
    "Mid": test_dataset_mid,
    "OD": test_dataset_od
}

num_models = len(predictions_dict)

# Loop through each dataset
for dataset_name, dataset in test_datasets.items():
    # Unpack the dataset
    test_batch = next(iter(dataset.take(1)))  # Take one batch
    test_images = test_batch[0]
    test_masks  = test_batch[1]

    # Convert tensors to numpy arrays
    test_images = test_images.numpy()
    
    test_masks = test_masks.numpy()
    
    # Create a figure for visualization
    fig, axes = plt.subplots(5, num_models + 2, figsize=(15, 25))
    fig.subplots_adjust(hspace=0.4, wspace=0.4)

    # Loop through the first 5 test images
    for row in range(min(5, len(test_images))):
        # Plot the original image
        axes[row, 0].imshow(test_images[row][..., 0], cmap="gray")
        axes[row, 0].set_title("Image")
        axes[row, 0].axis("off")

        # Plot the true mask
        axes[row, 1].imshow(test_masks[row], cmap="gray")
        axes[row, 1].set_title("True Mask")
        axes[row, 1].axis("off")

        # Plot the predictions for each model
        for col, (model_name, predictions) in enumerate(predictions_dict.items()):
            predicted_mask = (predictions[row].squeeze() > 0.3).astype(int)
            ax = axes[row, col + 2]
            ax.imshow(predicted_mask, cmap="gray")
            ax.set_title(f'Model {model_name}')
            ax.axis("off")

    # Set the figure title based on the dataset name
    fig.suptitle(f'{dataset_name} Dataset Predictions')

    # Show the plot
    plt.show()