In [2]:
from keras.layers import BatchNormalization, add
from keras.layers import Conv2D

kernel_initializer = 'he_uniform'


def conv_block_2D(x, filters, block_type, repeat=1, dilation_rate=1, size=3, padding='same'):
    result = x

    for i in range(0, repeat):

        if block_type == 'separated':
            result = separated_conv2D_block(result, filters, size=size, padding=padding)
        elif block_type == 'duckv2':
            result = duckv2_conv2D_block(result, filters, size=size)
        elif block_type == 'midscope':
            result = midscope_conv2D_block(result, filters)
        elif block_type == 'widescope':
            result = widescope_conv2D_block(result, filters)
        elif block_type == 'resnet':
            result = resnet_conv2D_block(result, filters, dilation_rate)
        elif block_type == 'conv':
            result = Conv2D(filters, (size, size),
                            activation='relu', kernel_initializer=kernel_initializer, padding=padding)(result)
        elif block_type == 'double_convolution':
            result = double_convolution_with_batch_normalization(result, filters, dilation_rate)

        else:
            return None

    return result


def duckv2_conv2D_block(x, filters, size):
    x = BatchNormalization(axis=-1)(x)
    x1 = widescope_conv2D_block(x, filters)

    x2 = midscope_conv2D_block(x, filters)

    x3 = conv_block_2D(x, filters, 'resnet', repeat=1)

    x4 = conv_block_2D(x, filters, 'resnet', repeat=2)

    x5 = conv_block_2D(x, filters, 'resnet', repeat=3)

    x6 = separated_conv2D_block(x, filters, size=6, padding='same')

    x = add([x1, x2, x3, x4, x5, x6])

    x = BatchNormalization(axis=-1)(x)

    return x


def separated_conv2D_block(x, filters, size=3, padding='same'):
    x = Conv2D(filters, (1, size), activation='relu', kernel_initializer=kernel_initializer, padding=padding)(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(filters, (size, 1), activation='relu', kernel_initializer=kernel_initializer, padding=padding)(x)

    x = BatchNormalization(axis=-1)(x)

    return x


def midscope_conv2D_block(x, filters):
    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=1)(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=2)(x)

    x = BatchNormalization(axis=-1)(x)

    return x


def widescope_conv2D_block(x, filters):
    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=1)(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=2)(x)

    x = BatchNormalization(axis=-1)(x)

    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=3)(x)

    x = BatchNormalization(axis=-1)(x)

    return x


def resnet_conv2D_block(x, filters, dilation_rate=1):
    x1 = Conv2D(filters, (1, 1), activation='relu', kernel_initializer=kernel_initializer, padding='same',
                dilation_rate=dilation_rate)(x)

    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=dilation_rate)(x)
    x = BatchNormalization(axis=-1)(x)
    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=dilation_rate)(x)
    x = BatchNormalization(axis=-1)(x)
    x_final = add([x, x1])

    x_final = BatchNormalization(axis=-1)(x_final)

    return x_final


def double_convolution_with_batch_normalization(x, filters, dilation_rate=1):
    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=dilation_rate)(x)
    x = BatchNormalization(axis=-1)(x)
    x = Conv2D(filters, (3, 3), activation='relu', kernel_initializer=kernel_initializer, padding='same',
               dilation_rate=dilation_rate)(x)
    x = BatchNormalization(axis=-1)(x)

    return x



In [13]:
from tensorflow.keras import layers
from tensorflow.keras import backend as K

#gating signal for attention unit
def gatingsignal(input, out_size, batchnorm=False):
    x = layers.Conv2D(out_size, (1, 1), padding='same')(input)
    if batchnorm:
        x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    return x

#attention unit/block based on soft attention
def attention_block(x, gating, inter_shape):
    shape_x = K.int_shape(x)
    shape_g = K.int_shape(gating)
    theta_x = layers.Conv2D(inter_shape, (2, 2), strides=(2, 2), kernel_initializer='he_normal', padding='same')(x) 
    shape_theta_x = K.int_shape(theta_x)
    phi_g = layers.Conv2D(inter_shape, (1, 1), kernel_initializer='he_normal', padding='same')(gating)
    upsample_g = layers.Conv2DTranspose(inter_shape, (3, 3), strides=(shape_theta_x[1] // shape_g[1], shape_theta_x[2] // shape_g[2]), kernel_initializer='he_normal', padding='same')(phi_g)
    concat_xg = layers.add([upsample_g, theta_x])
    act_xg = layers.Activation('relu')(concat_xg)
    psi = layers.Conv2D(1, (1, 1), kernel_initializer='he_normal', padding='same')(act_xg)
    sigmoid_xg = layers.Activation('sigmoid')(psi)
    shape_sigmoid = K.int_shape(sigmoid_xg)
    upsample_psi = layers.UpSampling2D(size=(shape_x[1] // shape_sigmoid[1], shape_x[2] // shape_sigmoid[2]))(sigmoid_xg) 
    upsample_psi = layers.Lambda(lambda x, repnum: K.repeat_elements(x, repnum, axis=3), arguments={'repnum': shape_x[3]})(upsample_psi)                          
    y = layers.multiply([upsample_psi, x])
    result = layers.Conv2D(shape_x[3], (1, 1), kernel_initializer='he_normal', padding='same')(y)
    attenblock = layers.BatchNormalization()(result)
    return attenblock

In [19]:
import tensorflow as tf
from keras.layers import add
from keras.models import Model
from keras.layers import Conv2D, UpSampling2D


kernel_initializer = 'he_uniform'
interpolation = "nearest"


def create_model_ver2(img_height, img_width, input_chanels, out_classes, starting_filters):
    input_layer = tf.keras.layers.Input((img_height, img_width, input_chanels))

    print('Starting DUCK-Net')
    t0 = conv_block_2D(input_layer, starting_filters, 'duckv2', repeat=1)

    l1i = Conv2D(starting_filters * 2, 2, strides=2, padding='same')(t0)
    t1 = conv_block_2D(l1i, starting_filters * 2, 'duckv2', repeat=1)

    l2i = Conv2D(starting_filters * 4, 2, strides=2, padding='same')(t1)
    t2 = conv_block_2D(l2i, starting_filters * 4, 'duckv2', repeat=1)

    l3i = Conv2D(starting_filters * 8, 2, strides=2, padding='same')(t2)
    t3 = conv_block_2D(l3i, starting_filters * 8, 'duckv2', repeat=1)

    l4i = Conv2D(starting_filters * 16, 2, strides=2, padding='same')(t3)
    t4 = conv_block_2D(l4i, starting_filters * 16, 'duckv2', repeat=1)

    l5i = Conv2D(starting_filters * 32, 2, strides=2, padding='same')(t4)
    t51 = conv_block_2D(l5i, starting_filters * 32, 'resnet', repeat=2)
    t53 = conv_block_2D(t51, starting_filters * 16, 'resnet', repeat=2)
    
    #--------------------------------------------------------------------#
    
    gating_5 = gatingsignal(t53, starting_filters * 16 , batchnorm=True)
    att_5 = attention_block(t4, gating_5, starting_filters *16 )
    l5o = UpSampling2D((2, 2), interpolation=interpolation)(t53)
    print(l5o.shape)
    print(att_5.shape)
    #c4 = add([l5o, t4])
    c4 = add([l5o, att_5])
    #q4 = conv_block_2D(c4, starting_filters * 8, 'duckv2', repeat=1)
    q4 = conv_block_2D(c4, starting_filters * 8, 'duckv2', repeat=1)

    gating_4 = gatingsignal(q4,starting_filters * 8 , batchnorm=True)
    att_4 = attention_block(t3, gating_4, starting_filters * 8 )
    l4o = UpSampling2D((2, 2), interpolation=interpolation)(q4)
    print(l4o.shape)
    print(att_4.shape)
    #c3 = add([l4o, t3])
    #q3 = conv_block_2D(c3, starting_filters * 4, 'duckv2', repeat=1)
    c3 = add([l4o, att_4])
    q3 = conv_block_2D(c3, starting_filters * 4, 'duckv2', repeat=1)
    
    gating_3 = gatingsignal(q3,starting_filters * 4, batchnorm=True)
    att_3 = attention_block(t2, gating_3, starting_filters * 4)
    l3o = UpSampling2D((2, 2), interpolation=interpolation)(q3)
    #c2 = add([l3o, t2])
    #q6 = conv_block_2D(c2, starting_filters * 2, 'duckv2', repeat=1)
    c2 = add([l3o, att_3])
    q6 = conv_block_2D(c2, starting_filters * 2, 'duckv2', repeat=1)
    
    gating_2 = gatingsignal(q6 , starting_filters * 2, batchnorm=True)
    att_2 = attention_block(t1, gating_2, starting_filters * 2)
    l2o = UpSampling2D((2, 2), interpolation=interpolation)(q6)
    #c1 = add([l2o, t1])
    #q1 = conv_block_2D(c1, starting_filters, 'duckv2', repeat=1)
    c1 = add([l2o, att_2])
    q1 = conv_block_2D(c1, starting_filters , 'duckv2', repeat=1)
    
    
    gating_1 = gatingsignal(q1 , starting_filters, batchnorm=True)
    att_1 = attention_block(t0, gating_1, starting_filters)
    l1o = UpSampling2D((2, 2), interpolation=interpolation)(q1)
    #c0 = add([l1o, t0])
    #z1 = conv_block_2D(c0, starting_filters, 'duckv2', repeat=1)
    c0 = add([l1o, att_1])
    z1 = conv_block_2D(c0, starting_filters, 'duckv2', repeat=1)

    output = Conv2D(out_classes, (1, 1), activation='sigmoid')(z1)

    model = Model(inputs=input_layer, outputs=output)

    return model

In [39]:
model = create_model_ver2(img_height=256, img_width=256, input_chanels=3, out_classes=1, starting_filters=34)

Starting DUCK-Net
(None, 16, 16, 544)
(None, 16, 16, 544)
(None, 32, 32, 272)
(None, 32, 32, 272)


In [21]:
import keras.backend as K
import tensorflow as tf


def dice_metric_loss(ground_truth, predictions, smooth=1e-6):
    ground_truth = K.cast(ground_truth, tf.float32)
    predictions = K.cast(predictions, tf.float32)
    ground_truth = K.flatten(ground_truth)
    predictions = K.flatten(predictions)
    intersection = K.sum(predictions * ground_truth)
    union = K.sum(predictions) + K.sum(ground_truth)

    dice = (2. * intersection + smooth) / (union + smooth)

    return 1 - dice

In [None]:
model.summary()

In [31]:
import glob

import numpy as np
from PIL import Image
from skimage.io import imread
from tqdm import tqdm

folder_path = "/kaggle/input/cvc-clinic-png/"  # Add the path to your data directory


def load_data(img_height, img_width, images_to_be_loaded, dataset):
    IMAGES_PATH = folder_path + 'Original/'
    MASKS_PATH = folder_path + 'Ground Truth/'

    if dataset == 'kvasir':
        train_ids = glob.glob(IMAGES_PATH + "*.jpg")

    if dataset == 'cvc-clinicdb':
        train_ids = glob.glob(IMAGES_PATH + "*.png")
        #train_ids = train_ids[:30]

    if dataset == 'cvc-colondb' or dataset == 'etis-laribpolypdb':
        train_ids = glob.glob(IMAGES_PATH + "*.png")

    if images_to_be_loaded == -1:
        images_to_be_loaded = len(train_ids)
        print(images_to_be_loaded)

    X_train = np.zeros((images_to_be_loaded, img_height, img_width, 3), dtype=np.float32)
    Y_train = np.zeros((images_to_be_loaded, img_height, img_width), dtype=np.uint8)

    print('Resizing training images and masks: ' + str(images_to_be_loaded))
    for n, id_ in tqdm(enumerate(train_ids)):
        if n == images_to_be_loaded:
            break

        image_path = id_
        mask_path = image_path.replace("images", "masks")

        image = imread(image_path)
        mask_ = imread(mask_path)

        mask = np.zeros((img_height, img_width), dtype=np.bool_)

        pillow_image = Image.fromarray(image)

        pillow_image = pillow_image.resize((img_height, img_width))
        image = np.array(pillow_image)

        X_train[n] = image / 255

        pillow_mask = Image.fromarray(mask_)
        pillow_mask = pillow_mask.resize((img_height, img_width), resample=Image.LANCZOS)
        mask_ = np.array(pillow_mask)

        for i in range(img_height):
            for j in range(img_width):
                if (mask_[i, j] >= 127).all():
                    mask[i, j] = 1

        Y_train[n] = mask

    Y_train = np.expand_dims(Y_train, axis=-1)

    return X_train, Y_train

In [32]:
X, Y = load_data(256, 256, -1, 'cvc-clinicdb')

612
Resizing training images and masks: 612


612it [04:23,  2.32it/s]


In [33]:
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.1, shuffle= True, random_state = 58800)
x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.111, shuffle= True, random_state = 58800)
len(x_valid)

62

In [34]:
# Defining the augmentations
import albumentations as albu
aug_train = albu.Compose([
    albu.HorizontalFlip(),
    albu.VerticalFlip(),
    albu.ColorJitter(brightness=(0.6,1.6), contrast=0.2, saturation=0.1, hue=0.01, always_apply=True),
    albu.Affine(scale=(0.5,1.5), translate_percent=(-0.125,0.125), rotate=(-180,180), shear=(-22.5,22), always_apply=True),
])

def augment_images():
    x_train_out = []
    y_train_out = []

    for i in range (len(x_train)):
        ug = aug_train(image=x_train[i], mask=y_train[i])
        x_train_out.append(ug['image'])  
        y_train_out.append(ug['mask'])

    return np.array(x_train_out), np.array(y_train_out)

In [35]:
image_augmented, mask_augmented = augment_images()

In [36]:
len(image_augmented)

488

In [42]:
learning_rate = 1e-4
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=optimizer, loss=dice_metric_loss)

In [43]:
import gc
EPOCHS = 100
min_loss_for_saving = 0.2
step = 0

for epoch in range(0, EPOCHS):
    
    print(f'Training, epoch {epoch}')
    print('Learning Rate: ' + str(learning_rate))

    step += 1
    
    model.fit(x=image_augmented, y=mask_augmented, epochs=1, batch_size=4, validation_data=(x_valid, y_valid), verbose=1)
    
    prediction_valid = model.predict(x_valid, verbose=0)
    loss_valid = dice_metric_loss(y_valid, prediction_valid)
    
    loss_valid = loss_valid.numpy()
    print("Loss Validation: " + str(loss_valid))
        
    prediction_test = model.predict(x_test, verbose=0)
    loss_test = dice_metric_loss(y_test, prediction_test)
    loss_test = loss_test.numpy()
    print("Loss Test: " + str(loss_test))
     
    if min_loss_for_saving > loss_valid:
        min_loss_for_saving = loss_valid
        print("Saved model with val_loss: ", loss_valid)
        model.save("/kaggle/working/duck-net-ver2.h5")
        
    #del image_augmented
    #del mask_augmented

    gc.collect()

Training, epoch 0
Learning Rate: 0.0001
Loss Validation: 0.7608582
Loss Test: 0.6937314
Training, epoch 1
Learning Rate: 0.0001
Loss Validation: 0.55297995
Loss Test: 0.469436
Training, epoch 2
Learning Rate: 0.0001
Loss Validation: 0.5673422
Loss Test: 0.48513758
Training, epoch 3
Learning Rate: 0.0001
Loss Validation: 0.54304546
Loss Test: 0.43311596
Training, epoch 4
Learning Rate: 0.0001
Loss Validation: 0.4967209
Loss Test: 0.39099443
Training, epoch 5
Learning Rate: 0.0001
Loss Validation: 0.45684242
Loss Test: 0.37426567
Training, epoch 6
Learning Rate: 0.0001
Loss Validation: 0.45028633
Loss Test: 0.32604247
Training, epoch 7
Learning Rate: 0.0001
Loss Validation: 0.4294352
Loss Test: 0.33497876
Training, epoch 8
Learning Rate: 0.0001
Loss Validation: 0.48786604
Loss Test: 0.33823973
Training, epoch 9
Learning Rate: 0.0001
Loss Validation: 0.30775338
Loss Test: 0.22642732
Training, epoch 10
Learning Rate: 0.0001
Loss Validation: 0.28933758
Loss Test: 0.217682
Training, epoch 11

  saving_api.save_model(


Training, epoch 15
Learning Rate: 0.0001
Loss Validation: 0.16859704
Loss Test: 0.12307048
Training, epoch 16
Learning Rate: 0.0001
Loss Validation: 0.1605863
Loss Test: 0.11666906
Saved model with val_loss:  0.1605863
Training, epoch 17
Learning Rate: 0.0001
Loss Validation: 0.22244728
Loss Test: 0.19429582
Training, epoch 18
Learning Rate: 0.0001
Loss Validation: 0.1226024
Loss Test: 0.098169506
Saved model with val_loss:  0.1226024
Training, epoch 19
Learning Rate: 0.0001
Loss Validation: 0.15461534
Loss Test: 0.13084787
Training, epoch 20
Learning Rate: 0.0001
Loss Validation: 0.17753142
Loss Test: 0.14804322
Training, epoch 21
Learning Rate: 0.0001
Loss Validation: 0.25323296
Loss Test: 0.21773404
Training, epoch 22
Learning Rate: 0.0001
Loss Validation: 0.16262221
Loss Test: 0.1157698
Training, epoch 23
Learning Rate: 0.0001
Loss Validation: 0.21232802
Loss Test: 0.16744095
Training, epoch 24
Learning Rate: 0.0001
Loss Validation: 0.18484658
Loss Test: 0.14594692
Training, epoch 

In [44]:
from sklearn.metrics import jaccard_score, precision_score, recall_score, accuracy_score, f1_score
print("Loading the model")

model = tf.keras.models.load_model('/kaggle/working/duck-net-ver2.h5', custom_objects={'dice_metric_loss':dice_metric_loss})

prediction_train = model.predict(x_train, batch_size=4)
prediction_valid = model.predict(x_valid, batch_size=4)
prediction_test = model.predict(x_test, batch_size=4)

print("Predictions done")

dice_train = f1_score(np.ndarray.flatten(np.array(y_train, dtype=bool)),
                           np.ndarray.flatten(prediction_train > 0.5))
dice_test = f1_score(np.ndarray.flatten(np.array(y_test, dtype=bool)),
                          np.ndarray.flatten(prediction_test > 0.5))
dice_valid = f1_score(np.ndarray.flatten(np.array(y_valid, dtype=bool)),
                           np.ndarray.flatten(prediction_valid > 0.5))

print("Dice finished")


miou_train = jaccard_score(np.ndarray.flatten(np.array(y_train, dtype=bool)),
                           np.ndarray.flatten(prediction_train > 0.5))
miou_test = jaccard_score(np.ndarray.flatten(np.array(y_test, dtype=bool)),
                          np.ndarray.flatten(prediction_test > 0.5))
miou_valid = jaccard_score(np.ndarray.flatten(np.array(y_valid, dtype=bool)),
                           np.ndarray.flatten(prediction_valid > 0.5))

print("Miou finished")


precision_train = precision_score(np.ndarray.flatten(np.array(y_train, dtype=bool)),
                                  np.ndarray.flatten(prediction_train > 0.5))
precision_test = precision_score(np.ndarray.flatten(np.array(y_test, dtype=bool)),
                                 np.ndarray.flatten(prediction_test > 0.5))
precision_valid = precision_score(np.ndarray.flatten(np.array(y_valid, dtype=bool)),
                                  np.ndarray.flatten(prediction_valid > 0.5))

print("Precision finished")


recall_train = recall_score(np.ndarray.flatten(np.array(y_train, dtype=bool)),
                            np.ndarray.flatten(prediction_train > 0.5))
recall_test = recall_score(np.ndarray.flatten(np.array(y_test, dtype=bool)),
                           np.ndarray.flatten(prediction_test > 0.5))
recall_valid = recall_score(np.ndarray.flatten(np.array(y_valid, dtype=bool)),
                            np.ndarray.flatten(prediction_valid > 0.5))

print("Recall finished")


accuracy_train = accuracy_score(np.ndarray.flatten(np.array(y_train, dtype=bool)),
                                np.ndarray.flatten(prediction_train > 0.5))
accuracy_test = accuracy_score(np.ndarray.flatten(np.array(y_test, dtype=bool)),
                               np.ndarray.flatten(prediction_test > 0.5))
accuracy_valid = accuracy_score(np.ndarray.flatten(np.array(y_valid, dtype=bool)),
                                np.ndarray.flatten(prediction_valid > 0.5))


print("Accuracy finished")


final_file = '/kaggle/working/'+'results_'+'Attention_DUCK-Net'+ '.txt'
print(final_file)

with open(final_file, 'a') as f:
    f.write('clinic-db' + '\n\n')
    f.write('dice_train: ' + str(dice_train) + ' dice_valid: ' + str(dice_valid) + ' dice_test: ' + str(dice_test) + '\n\n')
    f.write('miou_train: ' + str(miou_train) + ' miou_valid: ' + str(miou_valid) + ' miou_test: ' + str(miou_test) + '\n\n')
    f.write('precision_train: ' + str(precision_train) + ' precision_valid: ' + str(precision_valid) + ' precision_test: ' + str(precision_test) + '\n\n')
    f.write('recall_train: ' + str(recall_train) + ' recall_valid: ' + str(recall_valid) + ' recall_test: ' + str(recall_test) + '\n\n')
    f.write('accuracy_train: ' + str(accuracy_train) + ' accuracy_valid: ' + str(accuracy_valid) + ' accuracy_test: ' + str(accuracy_test) + '\n\n\n\n')

print('File done')

Loading the model
Predictions done
Dice finished
Miou finished
Precision finished
Recall finished
Accuracy finished
/kaggle/working/results_Attention_DUCK-Net.txt
File done
