In [1]:
# Importing the necessary libraries
import tensorflow as tf
import albumentations as albu
import numpy as np
import gc
import pickle
import matplotlib.pyplot as plt
from keras.callbacks import CSVLogger
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import jaccard_score, precision_score, recall_score, accuracy_score, f1_score
# from ModelArchitecture.DiceLoss import dice_metric_loss
# from ModelArchitecture import DUCK_Net
# from ImageLoader import ImageLoader2D

In [2]:
# Checking the number of GPUs available

print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  0


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Kvasir-seg dataset
images_folder_path = "/content/drive/MyDrive/CV_PROJECT/Kvasir-SEG_small/train/images"
masks_folder_path = "/content/drive/MyDrive/CV_PROJECT/Kvasir-SEG_small/train/masks"
dataset_type = 'kvasir'
model_path = "/content/drive/MyDrive/CV_PROJECT/Pre trained Models/DuckNet34 Kvasir Tf Model"

#CVC-ClinicDB
# images_folder_path = "/content/drive/MyDrive/CV_PROJECT/CVC-ClinicDB/test/images"
# masks_folder_path = "/content/drive/MyDrive/CV_PROJECT/CVC-ClinicDB/test/masks"
# dataset_type = 'cvc-clinicdb'
# model_path = "/content/drive/MyDrive/CV_PROJECT/Pre trained Models/DuckNet34 Cvc-ClinicDb Tf Model"

#CVC-ColonDB
# images_folder_path = "/content/drive/MyDrive/CV_PROJECT/CVC-ColonDB/test/images"
# masks_folder_path = "/content/drive/MyDrive/CV_PROJECT/CVC-ColonDB/test/masks"
# dataset_type = 'cvc-colondb'
# model_path = "/content/drive/MyDrive/CV_PROJECT/Pre trained Models/DuckNet34 Cvc-ColonDb Tf Model"

# ETIS-LaribPolypDB
# images_folder_path = "/content/drive/MyDrive/CV_PROJECT/ETIS-LaribPolypDB/test/images"
# masks_folder_path = "/content/drive/MyDrive/CV_PROJECT/ETIS-LaribPolypDB/test/masks"
# dataset_type = 'etis-laribpolypdb'
# model_path = "/content/drive/MyDrive/CV_PROJECT/Pre trained Models/DuckNet34 Etis-laribpolypdb Tf Model"


In [5]:
# Dice loss

import keras.backend as K
import tensorflow as tf


def dice_metric_loss(ground_truth, predictions, smooth=1e-6):
    ground_truth = K.cast(ground_truth, tf.float32)
    predictions = K.cast(predictions, tf.float32)
    ground_truth = K.flatten(ground_truth)
    predictions = K.flatten(predictions)
    intersection = K.sum(predictions * ground_truth)
    union = K.sum(predictions) + K.sum(ground_truth)

    dice = (2. * intersection + smooth) / (union + smooth)

    return 1 - dice


In [6]:
# ConvBlock2D

from keras.layers import BatchNormalization, add
from keras.layers import Conv2D

kernel_initializer = "he_uniform"


def conv_block_2D(
    x, filters, block_type, repeat=1, dilation_rate=1, size=3, padding="same"
):
    result = x

    for i in range(0, repeat):

        if block_type == "separated":
            result = separated_conv2D_block(result, filters, size=size, padding=padding)
        elif block_type == "duckv2":
            result = duckv2_conv2D_block(result, filters, size=size)
        elif block_type == "midscope":
            result = midscope_conv2D_block(result, filters)
        elif block_type == "widescope":
            result = widescope_conv2D_block(result, filters)
        elif block_type == "resnet":
            result = resnet_conv2D_block(result, filters, dilation_rate)
        elif block_type == "conv":
            result = Conv2D(
                filters,
                (size, size),
                activation="relu",
                kernel_initializer=kernel_initializer,
                padding=padding,
            )(result)
        elif block_type == "double_convolution":
            result = double_convolution_with_batch_normalization(
                result, filters, dilation_rate
            )

        else:
            return None

    return result


def duckv2_conv2D_block(x, filters, size):
    x = BatchNormalization(axis=-1)(x)
    x1 = widescope_conv2D_block(x, filters)

    x2 = midscope_conv2D_block(x, filters)

    x3 = conv_block_2D(x, filters, "resnet", repeat=1)

    x4 = conv_block_2D(x, filters, "resnet", repeat=2)

    x5 = conv_block_2D(x, filters, "resnet", repeat=3)

    x6 = separated_conv2D_block(x, filters, size=6, padding="same")

    x = add([x1, x2, x3, x4, x5, x6])

    x = BatchNormalization(axis=-1)(x)

    return x


def separated_conv2D_block(x, filters, size=3, padding="same"):
    x = Conv2D(
        filters,
        (1, size),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding=padding,
    )(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(
        filters,
        (size, 1),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding=padding,
    )(x)

    x = BatchNormalization(axis=-1)(x)

    return x


def midscope_conv2D_block(x, filters):
    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=1,
    )(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=2,
    )(x)

    x = BatchNormalization(axis=-1)(x)

    return x


def widescope_conv2D_block(x, filters):
    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=1,
    )(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=2,
    )(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=3,
    )(x)

    x = BatchNormalization(axis=-1)(x)

    return x


def resnet_conv2D_block(x, filters, dilation_rate=1):
    x1 = Conv2D(
        filters,
        (1, 1),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=dilation_rate,
    )(x)

    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=dilation_rate,
    )(x)
    x = BatchNormalization(axis=-1)(x)
    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=dilation_rate,
    )(x)
    x = BatchNormalization(axis=-1)(x)
    x_final = add([x, x1])

    x_final = BatchNormalization(axis=-1)(x_final)

    return x_final


def double_convolution_with_batch_normalization(x, filters, dilation_rate=1):
    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=dilation_rate,
    )(x)
    x = BatchNormalization(axis=-1)(x)
    x = Conv2D(
        filters,
        (3, 3),
        activation="relu",
        kernel_initializer=kernel_initializer,
        padding="same",
        dilation_rate=dilation_rate,
    )(x)
    x = BatchNormalization(axis=-1)(x)

    return x


In [8]:
# DuckNet
import tensorflow as tf
from keras.layers import Conv2D, UpSampling2D
from keras.layers import add
from keras.models import Model

# from CustomLayers.ConvBlock2D import conv_block_2D

kernel_initializer = 'he_uniform'
interpolation = "nearest"


def create_model(img_height, img_width, input_chanels, out_classes, starting_filters):
    input_layer = tf.keras.layers.Input((img_height, img_width, input_chanels))

    print('Starting DUCK-Net')

    p1 = Conv2D(starting_filters * 2, 2, strides=2, padding='same')(input_layer)
    p2 = Conv2D(starting_filters * 4, 2, strides=2, padding='same')(p1)
    p3 = Conv2D(starting_filters * 8, 2, strides=2, padding='same')(p2)
    p4 = Conv2D(starting_filters * 16, 2, strides=2, padding='same')(p3)
    p5 = Conv2D(starting_filters * 32, 2, strides=2, padding='same')(p4)

    t0 = conv_block_2D(input_layer, starting_filters, 'duckv2', repeat=1)

    l1i = Conv2D(starting_filters * 2, 2, strides=2, padding='same')(t0)
    s1 = add([l1i, p1])
    t1 = conv_block_2D(s1, starting_filters * 2, 'duckv2', repeat=1)

    l2i = Conv2D(starting_filters * 4, 2, strides=2, padding='same')(t1)
    s2 = add([l2i, p2])
    t2 = conv_block_2D(s2, starting_filters * 4, 'duckv2', repeat=1)

    l3i = Conv2D(starting_filters * 8, 2, strides=2, padding='same')(t2)
    s3 = add([l3i, p3])
    t3 = conv_block_2D(s3, starting_filters * 8, 'duckv2', repeat=1)

    l4i = Conv2D(starting_filters * 16, 2, strides=2, padding='same')(t3)
    s4 = add([l4i, p4])
    t4 = conv_block_2D(s4, starting_filters * 16, 'duckv2', repeat=1)

    l5i = Conv2D(starting_filters * 32, 2, strides=2, padding='same')(t4)
    s5 = add([l5i, p5])
    t51 = conv_block_2D(s5, starting_filters * 32, 'resnet', repeat=2)
    t53 = conv_block_2D(t51, starting_filters * 16, 'resnet', repeat=2)

    l5o = UpSampling2D((2, 2), interpolation=interpolation)(t53)
    c4 = add([l5o, t4])
    q4 = conv_block_2D(c4, starting_filters * 8, 'duckv2', repeat=1)

    l4o = UpSampling2D((2, 2), interpolation=interpolation)(q4)
    c3 = add([l4o, t3])
    q3 = conv_block_2D(c3, starting_filters * 4, 'duckv2', repeat=1)

    l3o = UpSampling2D((2, 2), interpolation=interpolation)(q3)
    c2 = add([l3o, t2])
    q6 = conv_block_2D(c2, starting_filters * 2, 'duckv2', repeat=1)

    l2o = UpSampling2D((2, 2), interpolation=interpolation)(q6)
    c1 = add([l2o, t1])
    q1 = conv_block_2D(c1, starting_filters, 'duckv2', repeat=1)

    l1o = UpSampling2D((2, 2), interpolation=interpolation)(q1)
    c0 = add([l1o, t0])
    z1 = conv_block_2D(c0, starting_filters, 'duckv2', repeat=1)

    output = Conv2D(out_classes, (1, 1), activation='sigmoid')(z1)

    model = Model(inputs=input_layer, outputs=output)

    return model


In [9]:
import glob
import numpy as np
from PIL import Image
from skimage.io import imread
from tqdm import tqdm
from pathlib import Path

img_size = 352
filters = 34 # Number of filters, the paper presents the results with 17 and 34

model_type = "DuckNet"

def load_data(img_height, img_width, images_to_be_loaded, dataset):
    if dataset == "kvasir" or dataset == "etis-laribpolypdb" or dataset == "cvc-clinicdb" or dataset == "cvc-colondb":
        extension = "*.jpg"
    else:
        raise ValueError("Unsupported dataset type.")

    images_path = Path(images_folder_path)
    masks_path = Path(masks_folder_path)

    train_image_paths = list(images_path.glob(extension))

    if images_to_be_loaded == -1 or images_to_be_loaded > len(train_image_paths):
        images_to_be_loaded = len(train_image_paths)

    X_train = np.zeros((images_to_be_loaded, img_height, img_width, 3), dtype=np.float32)
    Y_train = np.zeros((images_to_be_loaded, img_height, img_width), dtype=np.uint8)

    print(f"Resizing validation images and masks: {images_to_be_loaded}")
    for n, image_path in tqdm(enumerate(train_image_paths), total=images_to_be_loaded):
        if n >= images_to_be_loaded:
            break

        mask_path = masks_path / image_path.name

        image = Image.open(image_path).resize((img_width, img_height))
        image = np.array(image) / 255.0

        mask = Image.open(mask_path).resize((img_width, img_height))
        mask = np.array(mask)
        mask = np.where(mask >= 127, 1, 0).astype(np.uint8)

        X_train[n] = image
        Y_train[n] = mask

    Y_train = np.expand_dims(Y_train, axis=-1)

    return X_train, Y_train

# Example usage
img_height, img_width = 352, 352
images_to_be_loaded = -1 # Load all images
# dataset_type = 'kvasir'  # Options: 'kvasir', 'cvc-clinicdb', 'cvc-colondb', 'etis-laribpolypdb'

X, Y = load_data(img_height, img_width, images_to_be_loaded, dataset_type)
print(f"Loaded {len(X)} images and masks.")


Resizing validation images and masks: 200


100%|██████████| 200/200 [01:09<00:00,  2.86it/s]

Loaded 200 images and masks.





In [10]:
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras.callbacks import ModelCheckpoint
import os

# Split data into training and validation sets
X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2, random_state=42)

# Initialize the model
img_height, img_width = 352, 352
input_channels = 3  # Assuming RGB images
out_classes = 1    # Binary classification for masks
starting_filters = 34
model = create_model(img_height, img_width, input_channels, out_classes, starting_filters)

# Define the path where the model will be saved and specify the filename format
model_directory = "/content/drive/MyDrive/CV_PROJECT/Training_models"
model_filename = "best.h5"
model_path = os.path.join(model_directory, model_filename)

# Ensure the directory exists
os.makedirs(model_directory, exist_ok=True)

# Define the model checkpoint
checkpoint = ModelCheckpoint(
    model_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    mode='min'
)

# Compile the model
optimizer = tf.keras.optimizers.Adam()
model.compile(optimizer=optimizer, loss=dice_metric_loss, metrics=[dice_metric_loss])

# Train the model
epochs = 100
batch_size = 1
history = model.fit(
    X_train, Y_train,
    validation_data=(X_val, Y_val),
    epochs=epochs,
    batch_size=batch_size,
    callbacks=[checkpoint],
    verbose=1  # Set verbose to 1 for default Keras progress bar
)

print("Training complete. Best model saved at:", model_path)


Starting DUCK-Net


KeyboardInterrupt: 

In [None]:
import tensorflow as tf
from keras.metrics import MeanIoU
import numpy as np

# Load the best model
best_model_path = "/content/drive/MyDrive/CV_PROJECT/Training_models/best.h5"  # Update this path to your best model path
model = tf.keras.models.load_model(best_model_path, custom_objects={'dice_metric_loss': dice_metric_loss})

# Evaluate the model on the validation set
scores = model.evaluate(X_val, Y_val, verbose=1)
print(f"Loss on validation set: {scores[0]}, Dice Metric on validation set: {scores[1]}")

# Optionally: Predict on the validation set
predictions = model.predict(X_val)

# Calculate Mean IoU for a more detailed evaluation
num_classes = 2  # Since it's a binary classification, we have two classes (foreground and background)
IoU_calculator = MeanIoU(num_classes=num_classes)
IoU_calculator.update_state(np.round(predictions).astype(int), Y_val.astype(int))
mean_IoU = IoU_calculator.result().numpy()
print("Mean IoU on validation set:", mean_IoU)

# Visualize the results for a specific example
import matplotlib.pyplot as plt

# Select a random example
random_index = np.random.randint(0, len(X_val))

plt.figure(figsize=(12, 6))
plt.subplot(1, 3, 1)
plt.title('Input Image')
plt.imshow(X_val[random_index])
plt.subplot(1, 3, 2)
plt.title('True Mask')
plt.imshow(Y_val[random_index].squeeze(), cmap='gray')
plt.subplot(1, 3, 3)
plt.title('Predicted Mask')
plt.imshow(predictions[random_index].squeeze(), cmap='gray')
plt.show()


