In [None]:
import os
import sys
import random

import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras

In [None]:
H = 256
W = 256
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

In [None]:
import os
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras import backend as K

smooth = 1.
def dice_coef(y_true, y_pred):
    y_true_f = tf.keras.layers.Flatten()(y_true)
    y_pred_f = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)


def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

In [None]:
def cbam_block(inputs, ratio=8):
    channel_axis = -1
    filters = inputs.shape[channel_axis]
    se_shape = (1, 1, filters)

    # Channel Attention
    se = GlobalAveragePooling2D()(inputs)
    se = Reshape(se_shape)(se)
    se = Dense(filters // ratio, activation=tf.keras.layers.LeakyReLU(negative_slope=0.01), kernel_initializer='he_normal', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)
    x = Multiply()([inputs, se])

    # Spatial Attention
    se = Conv2D(filters // ratio, (1, 1), padding='same', 
                activation=tf.keras.layers.LeakyReLU(negative_slope=0.01), 
                kernel_initializer='he_normal', use_bias=False)(x)
    se = Conv2D(1, (1, 1), padding='same', activation='sigmoid', 
                kernel_initializer='he_normal', use_bias=False)(se)
    x = Multiply()([x, se])

    return x

In [None]:
import os
import numpy as np
import cv2

import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model


def stem_block(x, n_filter, strides):
    x_init = x

    ## Conv 1
    x = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(x_init)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = SeparableConv2D(n_filter, (3, 3), padding="same")(x)
    
    #Conv 2
    y = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(x_init)
    
    z = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(x_init)
    z = BatchNormalization()(z)
    z = Activation('relu')(z)
    z = SeparableConv2D(n_filter, (5, 5), padding="same")(z)

    ## Shortcut
    s = MaxPooling2D(pool_size=(3, 3),padding='same',strides=strides)(x_init)
    s  = Conv2D(n_filter, (3, 3), padding="same", strides=strides)(s)
    

    ## Add
    x = Concatenate()([x,y,z,s])
    x = cbam_block(x)
    return x

def resnet_block(x, n_filter, strides=1):
    x_init = x

    ## Conv 1
    x = BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(negative_slope=0.01)(x)
    x = SeparableConv2D(n_filter, (3, 3), padding="same", strides=strides)(x)
    ## Conv 2
    x = BatchNormalization()(x)
    x = tf.keras.layers.LeakyReLU(negative_slope=0.01)(x)
    x = SeparableConv2D(n_filter, (3, 3), padding="same", strides=1)(x)

    ## Shortcut
    s  = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(x_init)
    s = BatchNormalization()(s)

    ## Add
    x = Add()([x, s])
    x = cbam_block(x)
    return x

def aspp_block(x, num_filters, rate_scale=1):
    x1 = Conv2D(num_filters, (3, 3), dilation_rate=(6 * rate_scale, 6 * rate_scale), padding="same")(x)
    x1 = BatchNormalization()(x1)

    x2 = Conv2D(num_filters, (3, 3), dilation_rate=(12 * rate_scale, 12 * rate_scale), padding="same")(x)
    x2 = BatchNormalization()(x2)

    x3 = Conv2D(num_filters, (3, 3), dilation_rate=(18 * rate_scale, 18 * rate_scale), padding="same")(x)
    x3 = BatchNormalization()(x3)

    x4 = Conv2D(num_filters, (3, 3), padding="same")(x)
    x4 = BatchNormalization()(x4)

    y = Add()([x1, x2, x3, x4])
    y = Conv2D(num_filters, (1, 1), padding="same")(y)
    return y

def attetion_block(g, x):
    """
        g: Output of Parallel Encoder block
        x: Output of Previous Decoder block
    """

    filters = x.shape[-1]

    g_conv = BatchNormalization()(g)
    g_conv = tf.keras.layers.LeakyReLU(negative_slope=0.01)(g_conv)
    g_conv = Conv2D(filters, (3, 3), padding="same")(g_conv)

    g_pool = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(g_conv)

    x_conv = BatchNormalization()(x)
    x_conv = tf.keras.layers.LeakyReLU(negative_slope=0.01)(x_conv)
    x_conv = Conv2D(filters, (3, 3), padding="same")(x_conv)

    gc_sum = Add()([g_pool, x_conv])

    gc_conv = BatchNormalization()(gc_sum)
    gc_conv = tf.keras.layers.LeakyReLU(negative_slope=0.01)(gc_conv)
    gc_conv = Conv2D(filters, (3, 3), padding="same")(gc_conv)

    gc_mul = Multiply()([gc_conv, x])
    return gc_mul

class LightResUnet:
    def __init__(self, input_size=256):
        self.input_size = input_size

    def build_model(self):
        n_filters = [16,32, 64, 128, 256]
        inputs = Input((self.input_size, self.input_size, 3))

        c0 = inputs
        c1 = stem_block(c0, n_filters[0], strides=1)
        print(c1.shape)
        c2 = resnet_block(c1, n_filters[1], strides=2)
        print(c2.shape)
        c3 = resnet_block(c2, n_filters[2], strides=2)
        print(c3.shape)
        c4 = resnet_block(c3, n_filters[3], strides=2)
        print(c4.shape)

        b1 = aspp_block(c4, n_filters[4])

        d1 = attetion_block(c3, b1)
        d1 = UpSampling2D((2, 2))(d1)
        d1 = Concatenate()([d1, c3])
        d1 = resnet_block(d1, n_filters[3])

        d2 = attetion_block(c2, d1)
        d2 = UpSampling2D((2, 2))(d2)
        d2 = Concatenate()([d2, c2])
        d2 = resnet_block(d2, n_filters[2])

        d3 = attetion_block(c1, d2)
        d3 = UpSampling2D((2, 2))(d3)
        d3 = Concatenate()([d3, c1])
        d3 = resnet_block(d3, n_filters[1])

        d4 = resnet_block(d3, n_filters[0])
        outputs = Conv2D(1, (1, 1), padding="same")(d3)
        outputs = Activation("sigmoid")(outputs)

        model = Model(inputs, outputs)
        return model

In [None]:
arch = LightResUnet(input_size=256)
model = arch.build_model()
model.summary()

In [None]:
import tensorflow as tf
from glob import glob
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split

# Constants for image size
W = 256
H = 256

import os
from glob import glob
from sklearn.model_selection import train_test_split

def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (W, H))
    x = x / 255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (W, H))
    x = x / 255.0
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)
    return x

def tf_parse(image_path, mask_path):

    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [H, W])
    image = image / 255.0

    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=1)
    mask = tf.image.resize(mask, [H, W])
    mask = mask / 255.0

    return image, mask



def tf_dataset(X, Y, batch=2, augment=False):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.map(tf_parse, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    return dataset


In [None]:
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.metrics import Precision, Recall, MeanIoU, Accuracy

In [None]:
import os
from glob import glob
from sklearn.model_selection import train_test_split

def load_dataset(path, num_data=1200, train_split=0.8, val_split=0.1, test_split=0.1):
    images = sorted(glob(os.path.join(path, "images", "*.jpg")))[:num_data]
    masks = sorted(glob(os.path.join(path, "binary_masks", "*.png")))[:num_data]

    # Ensure that the splits add up to 1
    assert train_split + val_split + test_split == 1, "Splits must sum to 1."

    # First, split the data into training and temporary set (valid + test)
    train_x, temp_x, train_y, temp_y = train_test_split(images, masks, test_size=(1 - train_split), random_state=42)
    
    # Then, split the temporary set into validation and test sets
    valid_x, test_x, valid_y, test_y = train_test_split(temp_x, temp_y, test_size=(test_split / (val_split + test_split)), random_state=42)

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

dataset_path = "/kaggle/input/aquavplant-70-10-20-aug/train_augmented"
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path, num_data=1100, train_split=0.8, val_split=0.1,test_split=0.1)

print(f"Train: {len(train_x)} - {len(train_y)}")
print(f"Valid: {len(valid_x)} - {len(valid_y)}")
print(f"Valid: {len(test_x)} - {len(test_y)}")

In [None]:
if __name__ == "__main__":
    np.random.seed(42)
    tf.random.set_seed(42)
    create_dir("files")


    batch_size = 32
    lr = 1e-3
    num_epochs = 50
    model_path = os.path.join("files", "resunet_model.keras")
    csv_path = os.path.join("files", "log.csv")
    
    train_dataset = tf_dataset(train_x, train_y, batch=batch_size,augment=False)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size,augment=False)
    test_dataset = tf_dataset(test_x, test_y, batch=batch_size, augment= False)
    
    plt.figure(figsize=(15, 6))
    for i, (image, mask) in enumerate(train_dataset.take(1)):
        for j in range(10):
            plt.subplot(2, 10, j + 1)
            plt.imshow(image[j])
            plt.title('Image')
            plt.axis('off')

            plt.subplot(2, 10, j + 11)
            plt.imshow(mask[j][:, :, 0], cmap='gray')
            plt.title('Mask')
            plt.axis('off')
    plt.tight_layout()
    plt.show()

    metrics = [Recall(), Precision(), dice_coef, MeanIoU(num_classes=2)]
    model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=metrics)

    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=1e-10, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False),
    ]

    history = model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        callbacks=callbacks
    )
        
    

In [None]:
import numpy as np
import cv2
import pandas as pd
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score, accuracy_score
from sklearn.model_selection import train_test_split

In [None]:
import numpy as np
import cv2
import pandas as pd
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score, accuracy_score
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import GlobalAveragePooling2D, GlobalMaxPooling2D, Dense, Reshape, multiply, Add, Activation, Concatenate, Conv2D, Layer
import tensorflow as tf
from tensorflow.keras import backend as K
from sklearn.metrics import confusion_matrix   
if __name__ == "__main__":
    np.random.seed(42)
    tf.random.set_seed(42)

    with CustomObjectScope({"dice_coef": dice_coef, "dice_loss": dice_loss, 
                            'LeakyReLU':tf.keras.layers.LeakyReLU(negative_slope=0.01),
                            'cbam_block':cbam_block}):
        model = tf.keras.models.load_model("/kaggle/working/files/resunet_model.keras")

    SCORE = []
    image_data = []
    for x, y in tqdm(zip(test_x, test_y), total=len(test_y)):
        """ Extracting the name """
        name = x.split("/")[-1]

        """ Reading the image """
        image = cv2.imread(x, cv2.IMREAD_COLOR) 
        image = cv2.resize(image, (W, H))       
        x = image / 255.0                       
        x = np.expand_dims(x, axis=0)           

        """ Reading the mask """
        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (W, H))
        mask = mask/255.0

        """ Prediction """
        y_pred = model.predict(x, verbose=0)[0]
        y_pred = np.squeeze(y_pred, axis=-1)
        #print("Min/Max Predicted Mask:", y_pred.min(), y_pred.max())  # Debugging

        y_pred = (y_pred > 0.5).astype(np.int32)  # Use `>` instead of `>=`

        """ Flatten the arrays """
        mask = (mask > 0).astype(np.int32).flatten()
        y_pred = y_pred.flatten()

        """ Compute TP, TN, FP, FN manually """
        TP = np.sum((mask == 1) & (y_pred == 1))
        TN = np.sum((mask == 0) & (y_pred == 0))
        FP = np.sum((mask == 0) & (y_pred == 1))
        FN = np.sum((mask == 1) & (y_pred == 0))

        """ Calculate metrics manually """
        accuracy_value = (TP + TN) / (TP + TN + FP + FN)
        precision_value = TP / (TP + FP) if (TP + FP) > 0 else 0
        recall_value = TP / (TP + FN) if (TP + FN) > 0 else 0
        specificity_value = TN / (TN + FP) if (TN + FP) > 0 else 0
        dice_value = (2 * TP) / (2 * TP + FP + FN) if (2 * TP + FP + FN) > 0 else 0
        jac_value = TP / (TP + FP + FN) if (TP + FP + FN) > 0 else 0
        f1_value = (2 * precision_value * recall_value) / (precision_value + recall_value) if (precision_value + recall_value) > 0 else 0

        """ Store the results """
        SCORE.append([name, accuracy_value, f1_value, jac_value, recall_value, precision_value, specificity_value, dice_value])

    """ Compute average scores """
    score = np.mean([s[1:] for s in SCORE], axis=0)

    print(f"Accuracy: {score[0]:0.5f}")
    print(f"F1: {score[1]:0.5f}")
    print(f"Jaccard: {score[2]:0.5f}")
    print(f"Recall: {score[3]:0.5f}")
    print(f"Precision: {score[4]:0.5f}")
    print(f"Specificity: {score[5]:0.5f}")
    print(f"Dice: {score[6]:0.5f}")



In [None]:
def plot_results(image, mask, prediction, name):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    plt.title('Image')
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.imshow(mask, cmap='gray')
    plt.title('Mask')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.imshow(prediction, cmap='gray')
    plt.title('Prediction')
    plt.axis('off')

    plt.suptitle(name)
    plt.show()

import matplotlib.pyplot as plt
last_image_data = image_data[10]
if last_image_data is not None:
  plot_results(*last_image_data)
else:
  print("No image data found.")


In [None]:
df = pd.read_csv("/kaggle/working/files/score.csv")
df.describe()

In [None]:
df = pd.read_csv("/kaggle/working/files/log.csv")
df.describe()