In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


#metrics

In [42]:
import numpy as np
import tensorflow as tf
from keras import backend as K

def iou(y_true, y_pred):
    def f(y_true, y_pred):
        intersection = (y_true * y_pred).sum()
        union = y_true.sum() + y_pred.sum() - intersection
        x = (intersection + 1e-15) / (union + 1e-15)
        x = x.astype(np.float32)
        return x
    return tf.numpy_function(f, [y_true, y_pred], tf.float32)

def dice_coef(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + 1e-15) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + 1e-15)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

def bce_dice_loss(y_true, y_pred):
    return dice_loss(y_true, y_pred) + tf.keras.losses.binary_crossentropy(y_true, y_pred)

#sparse attention

In [43]:
import tensorflow as tf
from keras.layers import Dropout

class SparseAttention(tf.keras.layers.Layer):

    def __init__(self, key_dim, num_heads=1,regularization_coeff=0.01):
        super(SparseAttention, self).__init__()
        self.layer_norm1 = None
        self.layer_norm2 = None
        self.output_dense = None
        self.v_dense = None
        self.k_dense = None
        self.q_dense = None
        self.key_dim = key_dim
        self.num_heads = num_heads
        self.regularization_coeff = regularization_coeff

    def build(self, input_shape):

        self.q_dense = tf.keras.layers.Dense(self.key_dim * self.num_heads, use_bias=False,
                                         kernel_regularizer=tf.keras.regularizers.l2(self.regularization_coeff))
        self.k_dense = tf.keras.layers.Dense(self.key_dim * self.num_heads, use_bias=False,
                                         kernel_regularizer=tf.keras.regularizers.l2(self.regularization_coeff))
        self.v_dense = tf.keras.layers.Dense(self.key_dim * self.num_heads, use_bias=False,
                                         kernel_regularizer=tf.keras.regularizers.l2(self.regularization_coeff))

        self.output_dense = tf.keras.layers.Dense(input_shape[-1], kernel_regularizer=tf.keras.regularizers.l2(self.regularization_coeff))

        self.layer_norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, x, **kwargs):
        # Apply Linear Transformation for Query, Key and Value
        q = Dropout(0.1)(self.q_dense(x))
        k = Dropout(0.1)(self.q_dense(x))
        v = Dropout(0.1)(self.q_dense(x))


        # Split the heads
        q = self.split_heads(q)
        k = self.split_heads(k)
        v = self.split_heads(v)
        # Calculate Attention Score
        attn_score = tf.matmul(q, k, transpose_b=True)
        attn_score = attn_score / tf.math.sqrt(tf.cast(self.key_dim, tf.float32))

        # Apply Sparsemax
        attn_score = self.sparsemax(attn_score)

        # Apply Dropout after the Dense layer
        attn_score = Dropout(0.1)(attn_score)
        # Calculate the output value using attention score
        attn_values = tf.matmul(attn_score, v)

        # Combine the heads back
        attn_values = self.combine_heads(attn_values)

        # Apply Layer Normalization (First LayerNorm)
        attn_values = self.layer_norm1(attn_values)

        # Final Linear Transformation
        output = self.output_dense(attn_values)

        # Apply Layer Normalization (Second LayerNorm, if needed)
        output = self.layer_norm2(output)

        return output

    def sparsemax(self, logits, axis=-1):
        logits = tf.convert_to_tensor(logits)
        ob_dim = tf.shape(logits)[axis]
        z = tf.sort(logits, axis=axis, direction='DESCENDING')
        z_cumsum = tf.cumsum(z, axis=axis)
        k = tf.range(1, ob_dim + 1, dtype=tf.float32)
        z_check = 1 + k * z >= z_cumsum
        k_max = tf.reduce_sum(tf.cast(z_check, tf.float32), axis=axis, keepdims=True)
        z_max = tf.gather(z, tf.cast(k_max - 1, tf.int32), batch_dims=len(logits.shape) - 1)
        out = tf.nn.relu(logits - z_max)
        return out


    def split_heads(self, x):
        batch_size = tf.shape(x)[0]
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.key_dim))
        return tf.transpose(x, [0, 2, 1, 3])

    def combine_heads(self, x):
        batch_size = tf.shape(x)[0]
        x = tf.transpose(x, [0, 2, 1, 3])
        return tf.reshape(x, (batch_size, -1, self.key_dim * self.num_heads))


#Se

In [44]:
from keras.layers import GlobalAveragePooling2D, Reshape, Dense, Multiply, Add, Permute, Conv2D
from keras import backend as K


def squeeze_excite_block(input, ratio=16):
    ''' Create a channel-wise squeeze-excite block

    Args:
        input: input tensor
        filters: number of output filters

    Returns: a keras tensor

    References
    -   [Squeeze and Excitation Networks](https://arxiv.org/abs/1709.01507)
    '''
    init = input
    channel_axis = 1 if K.image_data_format() == "channels_first" else -1
    filters = init.shape[channel_axis]
    se_shape = (1, 1, filters)

    se = GlobalAveragePooling2D()(init)
    se = Reshape(se_shape)(se)
    se = Dense(filters // ratio, activation='relu', kernel_initializer='he_normal', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)

    x = Multiply()([init, se])
    return x


def spatial_squeeze_excite_block(input):
    ''' Create a spatial squeeze-excite block

    Args:
        input: input tensor

    Returns: a keras tensor

    References
    -   [Concurrent Spatial and Channel Squeeze & Excitation in Fully Convolutional Networks](https://arxiv.org/abs/1803.02579)
    '''

    se = Conv2D(1, (1, 1), activation='sigmoid', use_bias=False,
                kernel_initializer='he_normal')(input)

    x = Multiply([input, se])
    return x


def channel_spatial_squeeze_excite(input, ratio=16):
    ''' Create a spatial squeeze-excite block

    Args:
        input: input tensor
        filters: number of output filters

    Returns: a keras tensor

    References
    -   [Squeeze and Excitation Networks](https://arxiv.org/abs/1709.01507)
    -   [Concurrent Spatial and Channel Squeeze & Excitation in Fully Convolutional Networks](https://arxiv.org/abs/1803.02579)
    '''

    cse = squeeze_excite_block(input, ratio)
    sse = spatial_squeeze_excite_block(input)

    x = Add([cse, sse])
    return x


# model

In [45]:
import os
import tensorflow as tf
from keras.layers import Reshape
os.environ['SSL_CERT_DIR'] = '/etc/ssl/certs'
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
from keras.applications import MobileNetV2
from keras.models import Model
from keras.layers import Conv2D, Multiply, Add, BatchNormalization, Activation
from keras.layers import Cropping2D,UpSampling2D, Input, Concatenate
from keras.layers import Dropout
from keras.regularizers import l2

def residual_block(x, num_filters):
    x_init = x
    x = Conv2D(num_filters//4, (1, 1), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters//4, (3, 3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, (3, 3), padding="same", kernel_regularizer=l2(0.02))(x)
    x = BatchNormalization()(x)

    s = Conv2D(num_filters, (1, 1), padding="same")(x_init)
    s = BatchNormalization()(s)

    x = Add()([x, s])
    x = Activation("relu")(x)
    x = squeeze_excite_block(x)
    return x

def NanoNet_A_Sparse_Attention(input_shape):
    f = [32, 64, 128]
    inputs = Input(shape=input_shape, name="input_image")

    # Encoder: MobileNetV2
    encoder = MobileNetV2(input_tensor=inputs, weights="imagenet", include_top=False, alpha=1)
    encoder_output = encoder.get_layer(name="block_6_expand_relu").output
    skip_connections_name = ["input_image", "block_1_expand_relu", "block_3_expand_relu"]

    x = residual_block(encoder_output, 192)  # Residual Block


    # SparseAttention Layer
    transformer_shape = (x.shape[1], x.shape[2], x.shape[3])
    x = Reshape((transformer_shape[0] * transformer_shape[1], transformer_shape[2]))(x)
    x = SparseAttention(key_dim=transformer_shape[2], num_heads=2)(x)
    x = Reshape((transformer_shape[0], transformer_shape[1], transformer_shape[2]))(x)

    # Decoder
    for i in range(1, len(skip_connections_name) + 1, 1):
        x_skip = encoder.get_layer(skip_connections_name[-i]).output
        x_skip = Conv2D(f[-i], (1, 1), padding="same")(x_skip)
        x_skip = BatchNormalization()(x_skip)
        x_skip = Activation("relu")(x_skip)

        x = UpSampling2D((2, 2), interpolation='bilinear')(x)

        try:
            x = Concatenate()([x, x_skip])
        except Exception as e:
            x = Cropping2D(cropping=((1, 0), (0, 0)))(x)
            x = Concatenate()([x, x_skip])

        x = residual_block(x, f[-i])
    # Output layer
    x = Conv2D(1, (1, 1), padding="same")(x)
    x = Activation("sigmoid")(x)

    model = Model(inputs=inputs, outputs=x)
    return model


if __name__ == "__main__":
    params = {"img_height": 256, "img_width": 256, "img_channels": 3, "mask_channels": 1}
    input_shape = (params["img_height"], params["img_width"], params["img_channels"])
    model = NanoNet_A_Sparse_Attention(input_shape)
    model.summary()




Model: "model_10"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_image (InputLayer)    [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 128, 128, 32)         864       ['input_image[0][0]']         
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 128, 128, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 128, 128, 32)         0         ['bn_Conv1[0][0]']     

#Data

In [46]:
import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
from sklearn.model_selection import train_test_split
# from dimensionality_reduction import apply_pca_to_image,reduce_mask_dimension,save_image

H = 256
W = 256

def load_names(path, file_path):
    f = open(file_path, "r")
    data = f.read().split("\n")[:-1]
    images = [os.path.join(path, "images", name) + ".jpg" for name in data]
    masks = [os.path.join(path, "masks", name) + ".jpg" for name in data]
    return images, masks

def load_data(path):
    train_names_path = f"{path}/train.txt"
    valid_names_path = f"{path}/val.txt"

    train_x, train_y = load_names(path, train_names_path)
    valid_x, valid_y = load_names(path, valid_names_path)

    return (train_x, train_y), (valid_x, valid_y)

def load_test_data(path):
    train_names_path = f"{path}/train.txt"
    test_names_path = f"{path}/test.txt"

    train_x, train_y = load_names(path, train_names_path)
    test_x, test_y = load_names(path, test_names_path)

    return (train_x, train_y), (test_x, test_y)
def read_image(path):

    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (W, H))
    x = x/255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (W, H))
    x = x/255.0
    x = np.expand_dims(x, axis=-1)
    x = x.astype(np.float32)
    return x


def read_image2(img_path):
    return np.load(img_path)

def read_mask2(mask_path):
    return np.load(mask_path)



# def augment_data(image, mask):
#     # Random horizontal flip
#     if tf.random.uniform(()) > 0.5:
#         image = tf.image.flip_left_right(image)
#         mask = tf.image.flip_left_right(mask)

#     # Random vertical flip
#     if tf.random.uniform(()) > 0.5:
#         image = tf.image.flip_up_down(image)
#         mask = tf.image.flip_up_down(mask)

#     # Random rotation (in 90-degree increments)
#     num_rotations = tf.random.uniform([], minval=0, maxval=4, dtype=tf.int32)
#     image = tf.image.rot90(image, k=num_rotations)
#     mask = tf.image.rot90(mask, k=num_rotations)

#     # Intensity-based augmentation

#     # RGB to HSV
#     image_hsv = tf.image.rgb_to_hsv(image)

#     # Do some operations in HSV space, adjust saturation

#     delta = 0.2
#     image_hsv = tf.stack([
#         image_hsv[:, :, 0],  # Hue
#         tf.clip_by_value(image_hsv[:, :, 1] + delta, 0, 1),  # Saturation
#         image_hsv[:, :, 2],  # Value
#     ], axis=-1)

#     # Convert back to RGB
#     image_rgb = tf.image.hsv_to_rgb(image_hsv)

#     return image_rgb, mask


def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([H, W, 3])
    y.set_shape([H, W, 1])
    # x, y = augment_data(x, y)
    return x, y

def tf_dataset(x, y, batch_size=8):
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.shuffle(buffer_size=1000)  # Scramble data
    dataset = dataset.map(tf_parse, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.cache()  # Cache data
    dataset = dataset.batch(batch_size)
    dataset = dataset.repeat()
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    return dataset


def process_dataset(image_paths, mask_paths, save_image_dir, save_mask_dir):
    processed_image_paths = []
    processed_mask_paths = []

    for i, (img_path, mask_path) in enumerate(zip(image_paths, mask_paths)):
        img = read_image2(img_path)
        mask = read_mask2(mask_path)

        processed_img = apply_pca_to_image(img)
        processed_mask = reduce_mask_dimension(mask)

        processed_img_path = os.path.join(save_image_dir, f"processed_image_{i}.jpg")
        processed_mask_path = os.path.join(save_mask_dir, f"processed_mask_{i}.jpg")

        save_image(processed_img, processed_img_path)
        save_image(processed_mask, processed_mask_path)

        processed_image_paths.append(processed_img_path)
        processed_mask_paths.append(processed_mask_path)

    return processed_image_paths, processed_mask_paths



# Utils

In [47]:
from typing import Tuple, List
import numpy as np
import tensorflow as tf
import os

from keras.utils import CustomObjectScope
from sklearn.utils import shuffle
from keras.models import load_model
from keras.utils import custom_object_scope

from keras import backend as K

def create_dir(path):
    """ Create a directory. """
    try:
        if not os.path.exists(path):
            os.makedirs(path)
    except OSError:
        print(f"Error: creating directory with name {path}")

def shuffling(x, y):
    x, y = shuffle(x, y, random_state=42)
    return x, y

def load_model_file(path):
    with CustomObjectScope({
            'iou': iou,
            'dice_coef': dice_coef,
            'dice_loss': dice_loss,
            'bce_dice_loss': bce_dice_loss,
            'SparseAttention': SparseAttention  #  SparseAttention
        }):
        model = tf.keras.models.load_model(path)
    return model

# sgdr


In [48]:
from keras.callbacks import Callback
import keras.backend as K
import numpy as np

class SGDRScheduler(Callback):

    def __init__(self,
                 min_lr,
                 max_lr,
                 steps_per_epoch,
                 lr_decay=1,
                 cycle_length=10,
                 mult_factor=2):

        self.min_lr = min_lr
        self.max_lr = max_lr
        self.lr_decay = lr_decay

        self.batch_since_restart = 0
        self.next_restart = cycle_length

        self.steps_per_epoch = steps_per_epoch

        self.cycle_length = cycle_length
        self.mult_factor = mult_factor

        self.history = {}

    def clr(self):
        '''Calculate the learning rate.'''
        fraction_to_restart = self.batch_since_restart / (self.steps_per_epoch * self.cycle_length)
        lr = self.min_lr + 0.5 * (self.max_lr - self.min_lr) * (1 + np.cos(fraction_to_restart * np.pi))
        return lr

    def on_train_begin(self, logs={}):
        '''Initialize the learning rate to the minimum value at the start of training.'''
        logs = logs or {}
        K.set_value(self.model.optimizer.lr, self.max_lr)

    def on_batch_end(self, batch, logs={}):
        '''Record previous batch statistics and update the learning rate.'''
        logs = logs or {}
        self.history.setdefault('lr', []).append(K.get_value(self.model.optimizer.lr))
        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)

        self.batch_since_restart += 1
        K.set_value(self.model.optimizer.lr, self.clr())

    def on_epoch_end(self, epoch, logs={}):
        '''Check for end of current cycle, apply restarts when necessary.'''
        if epoch + 1 == self.next_restart:
            self.batch_since_restart = 0
            self.cycle_length = np.ceil(self.cycle_length * self.mult_factor)
            self.next_restart += self.cycle_length
            self.max_lr *= self.lr_decay
            self.best_weights = self.model.get_weights()

    def on_train_end(self, logs={}):
        '''Set weights to the values from the end of the most recent cycle for best performance.'''
        self.model.set_weights(self.best_weights)


#Train

In [49]:
import os
from keras.optimizers import Adam

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import tensorflow as tf
from keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from keras.metrics import Recall, Precision, MeanIoU



if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)
    """ Remove folders and files """
    # os.system("rm files/files.csv")
    # os.system("rm -r logs")
    """ Hyperparameters """
    input_shape = (256, 256, 3)
    batch_size = 8
    lr = 1e-4
    epochs = 200
    model_name = "NanoNetA_SparseAttention"
    model_path = f"files/{model_name}/model.h5"
    csv_path = f"files/{model_name}/model.csv"
    log_path = f"logs/{model_name}/"
    """ Creating folders """
    create_dir(f"files/{model_name}")
    """ Dataset """
    path = '/content/drive/MyDrive/capstone/Kvasir-SEG'

    (train_x, train_y), (valid_x, valid_y) = load_data(path)

    # processed_train_x, processed_train_y = process_dataset(
    #     train_x, train_y,
    #     "/Users/xuzhenke/Documents/USYD/CapStone/Capstone-Project/Kvasir-SEG/processed/train/images",
    #     "/Users/xuzhenke/Documents/USYD/CapStone/Capstone-Project/Kvasir-SEG/processed/train/masks"
    # )
    # processed_valid_x, processed_valid_y = process_dataset(
    #     valid_x, valid_y,
    #     "/Users/xuzhenke/Documents/USYD/CapStone/Capstone-Project/Kvasir-SEG/processed/valid/images",
    #     "/Users/xuzhenke/Documents/USYD/CapStone/Capstone-Project/Kvasir-SEG/processed/valid/masks"
    # )

    # train_dataset = tf_dataset(processed_train_x, processed_train_y, batch)
    # valid_dataset = tf_dataset(processed_valid_x, processed_valid_y, batch)
    #
    # extractor = feature_extractor(input_shape)
    # train_dataset = feature_extracted_tf_dataset(train_x, train_y, extractor, batch)
    # valid_dataset = feature_extracted_tf_dataset(valid_x, valid_y, extractor, batch)

    train_dataset = tf_dataset(train_x, train_y, batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch_size)

    # """ Model """
    model = NanoNet_A_Sparse_Attention(input_shape)

    metrics = [dice_coef, iou, Recall(), Precision()]
    model.compile(loss=bce_dice_loss, optimizer=Adam(lr), metrics=metrics)
    model.summary()

    #
    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        TensorBoard(log_dir=log_path),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False),
    ]

    train_steps = (len(train_x)//batch_size)
    valid_steps = (len(valid_x)//batch_size)

    if len(train_x) % batch_size != 0:
        train_steps += 1

    if len(valid_x) % batch_size != 0:
        valid_steps += 1

    model.fit(train_dataset,
            epochs=epochs,
            validation_data=valid_dataset,
            steps_per_epoch=train_steps,
            validation_steps=valid_steps,
            callbacks=callbacks)




Model: "model_11"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_image (InputLayer)    [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 128, 128, 32)         864       ['input_image[0][0]']         
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 128, 128, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 128, 128, 32)         0         ['bn_Conv1[0][0]']     

  saving_api.save_model(


Epoch 2/200
Epoch 2: val_loss improved from 7.53706 to 5.70456, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 3/200
Epoch 3: val_loss improved from 5.70456 to 4.46629, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 4/200
Epoch 4: val_loss improved from 4.46629 to 3.63174, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 5/200
Epoch 5: val_loss improved from 3.63174 to 3.01015, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 6/200
Epoch 6: val_loss improved from 3.01015 to 2.62923, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 7/200
Epoch 7: val_loss improved from 2.62923 to 2.19438, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 8/200
Epoch 8: val_loss improved from 2.19438 to 1.89914, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 9/200
Epoch 9: val_loss improved from 1.89914 to 1.59489, saving model to files/NanoNetA_SparseAttention/model.h5
Epoch 10/200
Epoch 10: val_loss improved

#Test

In [50]:

import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import time
from operator import add
import tensorflow as tf
import numpy as np
from glob import glob
import cv2
from tqdm import tqdm
from sklearn.metrics import (
    jaccard_score, f1_score, recall_score, precision_score, accuracy_score, fbeta_score)


def calculate_metrics(y_true, y_pred):
    y_pred = y_pred > 0.5
    y_pred = y_pred.reshape(-1)
    y_pred = y_pred.astype(np.uint8)

    y_true = y_true > 0.5
    y_true = y_true.reshape(-1)
    y_true = y_true.astype(np.uint8)

    ## Score
    score_jaccard = jaccard_score(y_true, y_pred, average='binary')
    score_f1 = f1_score(y_true, y_pred, average='binary')
    score_recall = recall_score(y_true, y_pred, average='binary')
    score_precision = precision_score(y_true, y_pred, average='binary', zero_division=1)
    score_acc = accuracy_score(y_true, y_pred)
    score_fbeta = fbeta_score(y_true, y_pred, beta=2.0, average='binary', zero_division=1)

    return [score_jaccard, score_f1, score_recall, score_precision, score_acc, score_fbeta]

def mask_parse(mask):
    mask = np.squeeze(mask)
    mask = [mask, mask, mask]
    mask = np.transpose(mask, (1, 2, 0))
    return mask

if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Load dataset """
    path = "/content/drive/MyDrive/capstone/Kvasir-SEG"
    (train_x, train_y), (test_x, test_y) = load_test_data(path)

    """ Hyperparameters """
    size = (256, 256)
    input_shape = (256, 256, 3)
    model_name = "NanoNetA_SparseAttention"
    model_path = f"files/{model_name}/model.h5"

    """ Directories """
    create_dir(f"results/{model_name}")

    """ Load the model """
    model = load_model_file(model_path)

    """ Sample prediction: To improve FPS """
    image = np.zeros((1, 256, 256, 3))
    mask = model.predict(image)

    """ Testing """
    metrics_score = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
    time_taken = []

    for i, (x, y) in enumerate(zip(test_x, test_y)):
        name = y.split("/")[-1].split(".")[0]

        """ Image """
        image = cv2.imread(x, cv2.IMREAD_COLOR)
        image = cv2.resize(image, size)
        ori_img = image
        image = image/255.0
        image = np.expand_dims(image, axis=0)
        image = image.astype(np.float32)

        """ Mask """
        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, size)
        ori_mask = mask
        mask = np.expand_dims(mask, axis=0)
        mask = mask/255.0
        mask = mask.astype(np.float32)

        """ Time taken """
        start_time = time.time()
        pred_y = model.predict(image)
        total_time = time.time() - start_time
        time_taken.append(total_time)
        print(f"{name}: {total_time:1.5f}")

        """ Metrics calculation """
        score = calculate_metrics(mask, pred_y)
        metrics_score = list(map(add, metrics_score, score))

        """ Saving masks """
        pred_y = pred_y[0] > 0.5
        pred_y = pred_y * 255
        pred_y = np.array(pred_y, dtype=np.uint8)

        ori_img = ori_img
        ori_mask = mask_parse(ori_mask)
        pred_y = mask_parse(pred_y)
        sep_line = np.ones((size[0], 10, 3)) * 255

        tmp = [
            ori_img, sep_line,
            ori_mask, sep_line,
            pred_y
        ]

        cat_images = np.concatenate(tmp, axis=1)
        cv2.imwrite(f"results/{model_name}/{name}.png", cat_images)

    jaccard = metrics_score[0]/len(test_x)
    f1 = metrics_score[1]/len(test_x)
    recall = metrics_score[2]/len(test_x)
    precision = metrics_score[3]/len(test_x)
    acc = metrics_score[4]/len(test_x)
    f2 = metrics_score[5]/len(test_x)

    print("")
    print(f"Jaccard: {jaccard:1.4f} - F1: {f1:1.4f} - Recall: {recall:1.4f} - Precision: {precision:1.4f} - Acc: {acc:1.4f} - F2: {f2:1.4f}")

    mean_time_taken = np.mean(time_taken)
    mean_fps = 1/mean_time_taken
    print("Mean FPS: ", mean_fps)

cju87vqa0ndwg0850onjdz7ol: 0.07314
cju87xn2snfmv0987sc3d9xnq: 0.08511
cju87z6o6nh73085045bzsx6o: 0.07433
cju87zv8lni0o0850hbbecbq6: 0.11769
cju8828oxnool0801qno9luhr: 0.07749
cju884985nlmx0817vzpax3y4: 0.07247
cju7dp3dw2k4n0755zhe003ad: 0.07368
cju7dqcwi2dz00850gcmr2ert: 0.07080
cju7druhp2gp308715i6km7be: 0.07338
cju7dsrtb2f8i085064kwugfk: 0.07005
cju7dtb1e2j0t0818deq51ib3: 0.07136
cju7dubap2g0w0801fgl42mg9: 0.08271
cju7dvl5m2n4t0755hlnnjjet: 0.07021
cju7dwe282dc309876rco45ts: 0.07128
cju7dxffn2eam0817qxosfwch: 0.07124
cju7dymur2od30755eg8yv2ht: 0.07153
cju7dz5yy2i7z0801ausi7rna: 0.07612
cju7ea4om2l910801bohqjccy: 0.07269
cju7ebe962hr409872ovibahw: 0.07295
cju7ecl9i2i060987xawjp4l0: 0.08631
cju7eea9b2m0z0801ynqv1fqu: 0.06843
cju88oh0po9gq0801nge4tgr1: 0.07300
cju88q6h6obpd0871ckmiabbo: 0.07580
cju88rl5eo94l0850kf5wtrm1: 0.07292
cju88t4fvokxf07558ymyh281: 0.06878
cju88trl3ogi208716qvti51b: 0.07120
cju88v2f9oi8w0871hx9auh01: 0.07589
cju88vx2uoocy075531lc63n3: 0.07552
cju88y1mwoln50871emy