In [None]:
!pip install patchify

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import zipfile
with zipfile.ZipFile("EOphtha_10k_Dataset.zip","r") as zip_ref:
    zip_ref.extractall("DataSet")

In [None]:
import os
import cv2
import random
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from glob import glob
from tqdm import tqdm
from patchify import patchify
from natsort import natsorted
from patchify import unpatchify
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input, Dropout, Lambda, Add, Multiply, Layer

In [None]:
# some hyper parameters
epsilon = 0.10
delta = 0.85
lamda1 = 0.9
lamda2 = 0.6
gamma = 0.3
initial_LR = 1e-3
num_epochs = 150
batch_size = 64

patch_width = 48
patch_height = 48
patch_channels = 1

In [None]:
# this function is for computing the 'average background luminance'

def func_bg(inp, numFM):

    def my_filter(shape, dtype=None):
        kernel = np.zeros(shape)
        f = [np.array([[1,1,1,1,1],[1,2,2,2,1],[1,2,0,2,1],[1,2,2,2,1],[1,1,1,1,1]])/32 for i in range(numFM)]
        f = np.stack(f, axis=2)
        kernel[:,:,numFM-1] = f
        return kernel

    out = Conv2D(filters=numFM, kernel_size=5, kernel_initializer=my_filter, strides=1, padding='same', trainable=False)(inp)

    return out

In [None]:
# this function is for computing the 'maximal weighted average of gradients'

def func_Gm(inp, numFM):

    def G1(shape, dtype=None):
        kernel = np.zeros(shape)
        f = [np.array([[0,0,0,0,0],[1,3,8,3,1],[0,0,0,0,0],[-1,-3,-8,-3,-1],[0,0,0,0,0]])/16 for i in range(numFM)]
        f = np.stack(f, axis=2)
        kernel[:,:,numFM-1] = f
        return kernel

    def G2(shape, dtype=None):
        kernel = np.zeros(shape)
        f = [np.array([[0,0,1,0,0],[0,8,3,0,0],[1,3,0,-3,-1],[0,0,-3,-8,0],[0,0,-1,0,0]])/16 for i in range(numFM)]
        f = np.stack(f, axis=2)
        kernel[:,:,numFM-1] = f
        return kernel

    def G3(shape, dtype=None):
        kernel = np.zeros(shape)
        f = [np.array([[0,0,1,0,0],[0,0,3,8,0],[-1,-3,0,3,1],[0,-8,-3,0,0],[0,0,-1,0,0]])/16 for i in range(numFM)]
        f = np.stack(f, axis=2)
        kernel[:,:,numFM-1] = f
        return kernel

    def G4(shape, dtype=None):
        kernel = np.zeros(shape)
        f = [np.array([[0,1,0,-1,0],[0,3,0,-3,0],[0,8,0,-8,0],[0,3,0,-3,0],[0,1,0,-1,0]])/16 for i in range(numFM)]
        f = np.stack(f, axis=2)
        kernel[:,:,numFM-1] = f
        return kernel

    grad1 = Conv2D(filters=numFM, kernel_size=5, kernel_initializer=G1, strides=1, padding='same', trainable=False)(inp)
    grad2 = Conv2D(filters=numFM, kernel_size=5, kernel_initializer=G2, strides=1, padding='same', trainable=False)(inp)
    grad3 = Conv2D(filters=numFM, kernel_size=5, kernel_initializer=G3, strides=1, padding='same', trainable=False)(inp)
    grad4 = Conv2D(filters=numFM, kernel_size=5, kernel_initializer=G4, strides=1, padding='same', trainable=False)(inp)

    Gm = K.stack([grad1,grad2,grad3,grad4], axis=0)
    Gm = K.max(K.abs(Gm), axis=0)

    return Gm

In [None]:
# this function is for calculating the pixel-domain JND values of input tensor

def JND_pixel(inp, numFM):

    bg = func_bg(inp, numFM) # average background luminance
    Gm = func_Gm(inp, numFM) # maximal weighted average of gradients

    # calculating luminance adaption threshold
    T0 = 17.0;
    Gamma = 3/128;
    LA_t = tf.where(bg <= 127.0, (T0*(1.0-K.sqrt((bg/127.0)+K.epsilon()))+3.0), (Gamma*(bg-127.0)+3.0))

    # calculating texture masking threshold
    lamda = 0.5
    alpha = 0.0001*bg + 0.115
    beta = lamda - 0.01*bg
    TM_t = Gm*alpha + beta

    # calculating the final JND
    T = K.stack([LA_t, TM_t], axis=0)
    C_tg = 0.3
    JND = LA_t + TM_t - C_tg*K.min(T, axis=0)

    return JND

In [None]:
# this function is for scaling the attention values

def func_norm(inp):

    mini = K.min(inp, axis=[1,2], keepdims=True)
    maxi = K.max(inp, axis=[1,2], keepdims=True)

    out = (inp - mini) / ((maxi - mini) + K.epsilon())

    return out

In [None]:
# this is JbAM

def JbAM(inp, numFM):
    # JND_pixel() function call
    Attmap_WS = JND_pixel(inp, numFM)

    # func_norm function call
    Attmap_S = func_norm(Attmap_WS)

    # multiplying the pixel-domain JND values with scaled attention values, to generate attention weighted tensor
    out = Multiply()([inp, Attmap_S])
    return out

In [None]:
# defining conv_block which is used multiple times in Encoder and Decoder

def conv_block(inp, num_filters):

    x = Conv2D(num_filters, (3,3), padding="same")(inp)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, (3,3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

In [None]:
# defining the encoder block of Jb_UNet

def encoder_block(inp, num_filters):

    x = conv_block(inp, num_filters)
    p = MaxPool2D((2,2))(x)

    return x,p

In [None]:
# defining the decoder block of Jb_UNet

def decoder_block(inp1, inp2, num_filters):

    x = Conv2DTranspose(num_filters, (2,2), strides=2, padding="same")(inp1)
    skip_features = JbAM(inp2, num_filters)

    x = Concatenate()([x, skip_features])
    x = Dropout(0.2)(x)

    x = Conv2D(num_filters, (3,3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x

In [None]:
# defining Jb_UNet
def Jb_UNet(inp_shape):

    inps = Input(inp_shape)
    s = Lambda(lambda k:k/255)(inps)

    c1, p1 = encoder_block(s, 32)
    p1 = Dropout(0.2)(p1)
    c2, p2 = encoder_block(p1, 64)
    p2 = Dropout(0.2)(p2)
    c3, p3 = encoder_block(p2, 128)
    p3 = Dropout(0.2)(p3)
    c4, p4 = encoder_block(p3, 256)
    p4 = Dropout(0.2)(p4)

    c5 = conv_block(p4, 512)

    u6 = decoder_block(c5, c4, 256)
    u7 = decoder_block(u6, c3, 128)
    u8 = decoder_block(u7, c2, 64)
    u9 = decoder_block(u8, c1, 32)

    output = Conv2D(1, (1,1), padding="same", activation="sigmoid")(u9)

    model = Model(inputs=[inps], outputs=[output], name="JbUNet")

    return model

In [None]:
# defining attention_aware BCE loss
def bce_loss(y_true, y_pred):
    term_0 = (1 - y_true) * K.log(1 - y_pred + K.epsilon())
    term_1 = y_true * K.log(y_pred + K.epsilon())
    loss = -K.mean(term_0 + term_1) # This is Binary Cross Entropy Loss
    return loss

# defining accuracy, sensitivity, specificity metrics
def acc_met(y_true,y_pred):
    out = K.mean(K.equal(y_true, y_pred))
    return out

def sensitivity(y_true,y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    neg_y_pred = 1 - y_pred_f
    tp = K.sum(y_true_f * y_pred_f)
    fn = K.sum(y_true_f * neg_y_pred)
    return tp / (tp+fn+K.epsilon())

def specificity(y_true,y_pred):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    neg_y_true = 1 - y_true_f
    neg_y_pred = 1 - y_pred_f
    fp = K.sum(neg_y_true * y_pred_f)
    tn = K.sum(neg_y_true * neg_y_pred)
    return tn / (tn+fp+K.epsilon())

In [None]:
# Compiling the Model

opt = Adam(learning_rate = initial_LR)
model = Jb_UNet((patch_width, patch_height, patch_channels))
metrics_evaluated = [acc_met,
                     sensitivity,
                     specificity]
model.compile(optimizer=opt, loss=bce_loss, metrics=metrics_evaluated)
model.summary()

In [None]:
# defining a data generator

class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, images, gts, batch_size=batch_size, shuffle=True):
        super().__init__()
        self.images = images
        self.gts = gts
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.key_array = np.arange(self.images.shape[0], dtype=np.uint32)
        self.on_epoch_end()

    def __len__(self):
        return len(self.key_array)//self.batch_size

    def __getitem__(self, index):
        keys = self.key_array[index*self.batch_size:(index+1)*self.batch_size]
        x = np.asarray(self.images[keys], dtype=np.float32)
        y = np.asarray(self.gts[keys], dtype=np.float32)
        return x, y

    def on_epoch_end(self):
        if self.shuffle:
            self.key_array = np.random.permutation(self.key_array)

In [None]:
# setting the paths for images and ground truths

datapath = "/content/DataSet/EOphtha_Asitis_Dataset4/Images"
training_imgs = sorted(glob(os.path.join(datapath, "Training_Set", "*jpg")))

datapath = "/content/DataSet/EOphtha_Asitis_Dataset4/Ground_Truths"
training_maps = sorted(glob(os.path.join(datapath, "Training_Set", "*png")))

In [None]:
# reading the images

length = len(training_imgs)
train_X = np.zeros((length, patch_width, patch_height, patch_channels), dtype=np.uint8)

for n in range(length):
    image = cv2.imread(training_imgs[n], 0)

    image = np.expand_dims(image, axis=-1)
    train_X[n] = image

In [None]:
# reading the corresponding ground truths

length = len(training_maps)
train_Y = np.zeros((length, patch_width, patch_height, patch_channels), dtype=float)

for n in range(length):
    seg_map = cv2.imread(training_maps[n], 0)

    if np.max(seg_map)==0:
        seg_map = seg_map
    else:
        seg_map = seg_map / np.max(seg_map)

    seg_map = seg_map > 0.5
    seg_map = np.expand_dims(seg_map, axis=-1)
    train_Y[n] = seg_map

In [None]:
generator = DataGenerator(images=train_X, gts=train_Y, batch_size=batch_size, shuffle=True)
n_batches = len(generator)

In [None]:
# these are to the loss, acc, sens, spec values during training
loss_train = np.zeros(shape=(num_epochs,), dtype=float)
acc_train  = np.zeros(shape=(num_epochs,), dtype=float)
sens_train = np.zeros(shape=(num_epochs,), dtype=float)
spec_train = np.zeros(shape=(num_epochs,), dtype=float)

# training loop
for epoch in range(num_epochs):

    epoch_loss_avg = tf.keras.metrics.Mean() # Keeping track of the training loss
    epoch_acc_avg  = tf.keras.metrics.Mean() # Keeping track of the training accuracy
    epoch_sens_avg = tf.keras.metrics.Mean() # Keeping track of the training sensitivity
    epoch_spec_avg = tf.keras.metrics.Mean() # Keeping track of the training specificity

    print('==== Epoch {}/{} ===='.format(epoch+1, num_epochs))

    for batch in tqdm(range(n_batches)):

        x, y = generator[batch]

        with tf.GradientTape() as tape: # Forward pass
            y_p = model(x, training=True)
            loss = bce_loss(y_true=y, y_pred=y_p)

        grad = tape.gradient(loss, model.trainable_variables) # Backpropagation
        opt.apply_gradients(zip(grad, model.trainable_variables)) # Update network weights

        epoch_loss_avg(loss)
        y_pt = K.cast((y_p>0.5),float)
        epoch_acc_avg(acc_met(y_true=y, y_pred=y_pt))
        epoch_sens_avg(sensitivity(y_true=y, y_pred=y_pt))
        epoch_spec_avg(specificity(y_true=y, y_pred=y_pt))

    generator.on_epoch_end()

    # Training Predictions
    loss_train[epoch] = epoch_loss_avg.result()
    acc_train[epoch] = epoch_acc_avg.result()
    sens_train[epoch] = epoch_sens_avg.result()
    spec_train[epoch] = epoch_spec_avg.result()

    print("loss: {:.4f} - accuracy: {:.4f} - sensitivity: {:.4f} - specificity: {:.4f}".format(loss_train[epoch], acc_train[epoch], sens_train[epoch], spec_train[epoch]))

In [None]:
# Saving the Model
model.save("JbUNet_EOphtha.h5")