In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

# Load the dataset
dataset_path = '../dataset/DRIVE/images'
mask_path = '../dataset/DRIVE/mask'
image_size = (256, 256)


In [None]:

def load_data(dataset_path, mask_path):
    images = []
    masks = []
    
    for img_name in os.listdir(dataset_path):
        img = cv2.imread(os.path.join(dataset_path, img_name))
        img = cv2.resize(img, image_size)
        images.append(img)

        mask_name = img_name.replace(".tif", "_mask.gif")
        mask = cv2.imread(os.path.join(mask_path, mask_name), cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, image_size)
        _, mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
        masks.append(mask)

    return np.array(images), np.array(masks)

# Load and preprocess the dataset
images, masks = load_data(dataset_path, mask_path)
images = images.astype('float32') / 255.0
masks = masks.astype('float32') / 255.0
masks = np.expand_dims(masks, axis=-1)

# Split the dataset into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(images, masks, test_size=0.2, random_state=42)


In [1]:
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dropout, concatenate, UpSampling2D, BatchNormalization, Activation
from tensorflow.keras.models import Model

def conv_block(input_tensor, num_filters):
    x = Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Conv2D(num_filters, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    return x

def custom_unet(input_shape):
    inputs = Input(input_shape)

    # Encoder
    c1 = conv_block(inputs, 32)
    p1 = MaxPooling2D((2, 2))(c1)
    p1 = Dropout(0.1)(p1)

    c2 = conv_block(p1, 64)
    p2 = MaxPooling2D((2, 2))(c2)
    p2 = Dropout(0.1)(p2)

    c3 = conv_block(p2, 128)
    p3 = MaxPooling2D((2, 2))(c3)
    p3 = Dropout(0.1)(p3)

    c4 = conv_block(p3, 256)
    p4 = MaxPooling2D((2, 2))(c4)
    p4 = Dropout(0.1)(p4)

    # Bridge
    c5 = conv_block(p4, 512)

    # Decoder
    u6 = UpSampling2D((2, 2))(c5)
    u6 = concatenate([u6, c4])
    u6 = Dropout(0.1)(u6)
    c6 = conv_block(u6, 256)

    u7 = UpSampling2D((2, 2))(c6)
    u7 = concatenate([u7, c3])
    u7 = Dropout(0.1)(u7)
    c7 = conv_block(u7, 128)

    u8 = UpSampling2D((2, 2))(c7)
    u8 = concatenate([u8, c2])
    u8 = Dropout(0.1)(u8)
    c8 = conv_block(u8, 64)

    u9 = UpSampling2D((2, 2))(c8)
    u9 = concatenate([u9, c1])
    u9 = Dropout(0.1)(u9)
    c9 = conv_block(u9, 32)

    # Output layer
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(c9)

    return Model(inputs=[inputs], outputs=[outputs])


In [None]:

# Build the model
input_shape = (image_size[0], image_size[1], 3)
model = custom_unet(input_shape)
model.summary()


In [None]:
import tensorflow.keras.backend as K

# Define the Dice Loss function
def dice_loss(y_true, y_pred):
    numerator = 2 * K.sum(y_true * y_pred, axis=(1,2,3))
    denominator = K.sum(y_true + y_pred, axis=(1,2,3))
    return 1 - (numerator + 1) / (denominator + 1)



In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Compile the model

# Compile the model using the Dice Loss function
model.compile(optimizer=Adam(learning_rate=1e-4), loss=dice_loss, metrics=['accuracy'])

# Define data augmentation
data_gen_args = dict(rotation_range=20,
                     width_shift_range=0.1,
                     height_shift_range=0.1,
                     zoom_range=0.2,
                     horizontal_flip=True,
                     fill_mode='nearest')

image_datagen = ImageDataGenerator(**data_gen_args)
mask_datagen = ImageDataGenerator(**data_gen_args)

image_datagen.fit(X_train, augment=True, seed=42)
mask_datagen.fit(y_train, augment=True, seed=42)

image_generator = image_datagen.flow(X_train, batch_size=8, seed=42)
mask_generator = mask_datagen.flow(y_train, batch_size=8, seed=42)

train_generator = zip(image_generator, mask_generator)

# Define callbacks
checkpoint = ModelCheckpoint('best_model.h5', monitor='val_loss', mode='min', save_best_only=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)



In [None]:

# Train the model
history = model.fit(train_generator,
                    steps_per_epoch=len(X_train) // 8,
                    epochs=50,
                    validation_data=(X_val, y_val),
                    callbacks=[checkpoint, reduce_lr, early_stopping])

In [None]:
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model

# Load the best saved model
best_model = load_model('best_model.h5')

# Make predictions on the validation set
y_pred = best_model.predict(X_val)

# Threshold predictions
y_pred_thresholded = (y_pred > 0.5).astype(np.uint8)

# Function to display images and masks
def display_images(images, masks, predictions, num_images=5):
    fig, axs = plt.subplots(num_images, 3, figsize=(12, num_images * 4))
    
    for i in range(num_images):
        axs[i, 0].imshow(images[i])
        axs[i, 0].set_title('Original Image')
        axs[i, 1].imshow(masks[i, :, :, 0], cmap='gray')
        axs[i, 1].set_title('Ground Truth Mask')
        axs[i, 2].imshow(predictions[i, :, :, 0], cmap='gray')
        axs[i, 2].set_title('Predicted Mask')
        
        for j in range(3):
            axs[i, j].axis('off')
    
    plt.tight_layout()
    plt.show()

# Display images, ground truth masks, and predicted masks
display_images(X_val, y_val, y_pred_thresholded)


In [None]:
from sklearn.metrics import jaccard_score, f1_score, precision_score, recall_score
import itertools
from scipy import ndimage

# Flatten the ground truth masks and predicted masks for metric calculation
y_val_flat = y_val.flatten()
y_pred_thresholded_flat = y_pred_thresholded.flatten()

# Calculate evaluation metrics
iou = jaccard_score(y_val_flat, y_pred_thresholded_flat)
dice = f1_score(y_val_flat, y_pred_thresholded_flat)
precision = precision_score(y_val_flat, y_pred_thresholded_flat)
recall = recall_score(y_val_flat, y_pred_thresholded_flat)

print(f"IoU: {iou:.4f}")
print(f"Dice Coefficient: {dice:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# Post-processing: Morphological opening
kernel = np.ones((5, 5), np.uint8)
y_pred_postprocessed = [ndimage.binary_opening(pred[0], structure=kernel).astype(np.uint8) for pred in y_pred_thresholded]

# Visualize the post-processed results
display_images(X_val, y_val, y_pred_postprocessed, num_images=5)
