In [1]:
import keras

In [4]:
# ImageNet-pretrained ResNet50 without its classification head (feature
# extractor only). `classifier_activation` configures that head, which is not
# built when include_top=False, so the argument is omitted here.
model_50 = keras.applications.ResNet50(
    include_top=False,
    weights="imagenet",
)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
94765736/94765736 ━━━━━━━━━━━━━━━━━━━━ 18s 0us/step


In [5]:
# Print a summary of the headless ResNet50 network.
model_50.summary()

In [1]:
import os
import requests
from zipfile import ZipFile
import glob
from dataclasses import dataclass

In [2]:
import random
import numpy as np
import cv2

In [3]:
import tensorflow as tf
import keras_cv

from matplotlib import pyplot as plt

RuntimeError: The Tensorflow package version needs to be at least 2.11.0 for KerasCV to run. Currently, your TensorFlow version is 2.10.0. Please upgrade with `$ pip install --upgrade tensorflow`. You can use `pip freeze` to check afterwards that everything is ok.

In [None]:
def system_config(SEED_VALUE):
    """Seed the random number generators and set the GPU/cuDNN environment.

    Args:
        SEED_VALUE: Seed handed to Python's ``random`` module and to
            ``tf.keras.utils.set_random_seed``.
    """
    random.seed(SEED_VALUE)
    tf.keras.utils.set_random_seed(SEED_VALUE)

    # Environment flags: GPU selection and cuDNN behaviour.
    os.environ.update(
        {
            "CUDA_VISIBLE_DEVICES": "0",
            "TF_CUDNN_DETERMINISTIC": "1",
            "TF_USE_CUDNN": "true",
        }
    )

system_config(SEED_VALUE=42)

In [None]:
def download_and_unzip(url, save_path):
    """Download ``url`` to ``save_path`` and extract it if it is a zip archive.

    A ``.zip`` file is extracted into the directory that contains
    ``save_path``. Progress is reported on stdout: "Done" on success,
    "Invalid file" when the downloaded archive cannot be read as a zip.

    Args:
        url: Address of the file to download.
        save_path: Local path the downloaded bytes are written to.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    from zipfile import BadZipFile

    print("Downloading and extracting assets...", end="")

    response = requests.get(url)
    # Fail loudly on 4xx/5xx instead of saving an error page as the archive.
    response.raise_for_status()

    # The context manager guarantees the file is flushed and closed before
    # it is reopened for extraction.
    with open(save_path, "wb") as out_file:
        out_file.write(response.content)

    try:
        if save_path.endswith(".zip"):
            with ZipFile(save_path) as archive:
                archive.extractall(os.path.split(save_path)[0])
        print("Done")
    except BadZipFile:
        # Only a corrupt/non-zip payload is reported as "Invalid file";
        # unrelated errors (and Ctrl-C) are no longer swallowed.
        print("Invalid file")

In [None]:
# Download location of the zipped dataset and the local names derived from it.
DATASET_URL = r"https://www.dropbox.com/scl/fi/9k8t9619b4x0hegued5c5/Water-Bodies-Dataset.zip?rlkey=tjgepcai6t74yynmx7tqsm7af&dl=1"
DATASET_DIR = "Water-Bodies-Dataset"
DATASET_ZIP_PATH = os.path.join(os.getcwd(), f"{DATASET_DIR}.zip")

# Download only when the dataset folder is missing, then delete the archive
# once download_and_unzip has returned.
if not os.path.exists(DATASET_DIR):
    download_and_unzip(DATASET_URL, DATASET_ZIP_PATH)
    os.remove(DATASET_ZIP_PATH)

In [None]:
@dataclass(frozen=True)
class DatasetConfig:
    """Immutable settings for loading, batching and augmenting the dataset."""

    IMAGE_SIZE:        tuple = (256, 256)  # size images and masks are resized to
    BATCH_SIZE:        int   = 16          # samples per batch
    NUM_CLASSES:       int   = 2           # number of segmentation classes
    BRIGHTNESS_FACTOR: float = 0.2         # strength of random brightness jitter
    CONTRAST_FACTOR:   float = 0.2         # strength of random contrast jitter

In [None]:
@dataclass(frozen=True)
class TrainingConfig:
    """Immutable training hyper-parameters and output locations.

    The checkpoint and log paths are derived from the first two
    underscore-separated tokens of ``MODEL`` at the time the class body runs.
    """

    MODEL: str = "resnet50_v2_imagenet"
    EPOCHS: int = 35
    LEARNING_RATE: float = 1e-4
    # Despite the name, this is the path of the checkpoint *file* inside the
    # checkpoint directory.
    CKPT_DIR: str = os.path.join(
        f"checkpoints_{'_'.join(MODEL.split('_')[:2])}",
        f"deeplabv3_plus_{'_'.join(MODEL.split('_')[:2])}.weights.h5",
    )
    LOGS_DIR: str = f"logs_{'_'.join(MODEL.split('_')[:2])}"

In [None]:
# Sort both listings: glob returns paths in arbitrary, filesystem-dependent
# order, so without sorting image i is not guaranteed to line up with mask i.
# NOTE(review): assumes each mask shares its image's file name — confirm.
data_images = sorted(glob.glob(os.path.join(DATASET_DIR, "Images", "*.jpg")))
data_masks = sorted(glob.glob(os.path.join(DATASET_DIR, "Masks", "*.jpg")))

# Fail early with a clear message instead of silently truncating in zip()
# or hitting an unpacking error on an empty dataset further down.
if not data_images or len(data_images) != len(data_masks):
    raise ValueError(
        f"Expected matching, non-empty image/mask sets, found "
        f"{len(data_images)} images and {len(data_masks)} masks in {DATASET_DIR!r}."
    )

# Shuffle the (image, mask) pairs together so the pairing is preserved.
zipped_data = list(zip(data_images, data_masks))
random.shuffle(zipped_data)
data_images, data_masks = zip(*zipped_data)
data_images = list(data_images)
data_masks = list(data_masks)

In [None]:
# Build a tf.data dataset whose elements are (image path, mask path) pairs.
org_data = tf.data.Dataset.from_tensor_slices((data_images, data_masks))

In [None]:
# Fraction of the samples held out for validation.
SPLIT_RATIO = 0.05
NUM_VAL = int(len(data_images) * SPLIT_RATIO)

# The first NUM_VAL elements form the validation set; the rest are for training.
train_data = org_data.skip(NUM_VAL)
valid_data = org_data.take(NUM_VAL)

In [None]:
def read_image_mask(image_path, mask=False, size = DatasetConfig.IMAGE_SIZE):
    """Read an image or mask file and resize it to ``size``.

    Args:
        image_path: Path of the file to read.
        mask: When True the file is decoded as a single-channel mask and
            binarised; otherwise it is decoded as a 3-channel image.
        size: Target size passed to ``tf.image.resize``.

    Returns:
        For masks, a uint8 tensor holding 1 where the resized value is
        >= 200 and 0 elsewhere. For images, a float32 tensor clipped to
        the range [0, 255].
    """
    raw_bytes = tf.io.read_file(image_path)
    num_channels = 1 if mask else 3

    decoded = tf.io.decode_image(raw_bytes, channels=num_channels)
    # Pin the static rank/channel count before resizing.
    decoded.set_shape([None, None, num_channels])
    resized = tf.image.resize(images=decoded, size=size, method="bicubic")

    if mask:
        # Threshold directly: casting the boolean comparison yields the same
        # 0/1 mask as scattering ones into a zero tensor, without the
        # dynamically-shaped boolean gather and scatter.
        return tf.cast(resized >= 200, tf.uint8)

    # Clip to the valid pixel range after bicubic interpolation.
    return tf.cast(tf.clip_by_value(resized, 0., 255.), tf.float32)

In [None]:
def load_data(image_list, mask_list):
    """Load one image and its mask.

    Args:
        image_list: Path of the image file.
        mask_list: Path of the corresponding mask file.

    Returns:
        Dict with the decoded image under ``"images"`` and the decoded mask
        under ``"segmentation_masks"``.
    """
    return {
        "images": read_image_mask(image_list),
        "segmentation_masks": read_image_mask(mask_list, mask=True),
    }

In [None]:
# Map each (image path, mask path) pair through load_data, in parallel.
train_ds = train_data.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
valid_ds = valid_data.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
def unpackage_inputs(inputs):
    """Split a sample dict into an ``(images, segmentation_masks)`` tuple.

    Args:
        inputs: Mapping with the keys ``"images"`` and
            ``"segmentation_masks"``.

    Returns:
        Tuple of the two values, images first.
    """
    return inputs["images"], inputs["segmentation_masks"]

In [None]:
# Class id -> RGB colour used when rendering segmentation masks.
id2color = {
    0: (0, 0, 0),        # Background
    1: (102, 204, 255),  # Waterbodies
}

In [None]:
def num_to_rgb(num_arr, color_map=id2color):
    """Convert an array of class ids into an RGB image.

    Args:
        num_arr: Array of class ids; its first two dimensions give the
            height and width of the output.
        color_map: Mapping from class id to an RGB colour.

    Returns:
        uint8 array of shape ``num_arr.shape[:2] + (3,)``; positions whose id
        is not in ``color_map`` stay zero.
    """
    height_width = num_arr.shape[:2]
    rgb = np.zeros(height_width + (3,))

    for class_id, color in color_map.items():
        rgb[num_arr == class_id] = color

    return rgb.astype(np.uint8)

In [None]:
def image_overlay(image, segmented_image):
    """Blend a colour mask on top of an image.

    Args:
        image: RGB image array; it is cast to uint8 before blending.
        segmented_image: RGB colour mask to blend over ``image``.

    Returns:
        RGB array produced by ``cv2.addWeighted`` with weight 1.0 on the image
        and 0.7 on the mask.
    """
    image_weight, mask_weight, offset = 1.0, 0.7, 0.0

    # Both inputs are converted to BGR for the blend and back to RGB afterwards.
    mask_bgr = cv2.cvtColor(segmented_image, cv2.COLOR_RGB2BGR)
    image_bgr = cv2.cvtColor(image.astype(np.uint8), cv2.COLOR_RGB2BGR)

    blended_bgr = cv2.addWeighted(image_bgr, image_weight, mask_bgr, mask_weight, offset)
    return cv2.cvtColor(blended_bgr, cv2.COLOR_BGR2RGB)

In [None]:
def display_image_and_mask(data_list, title_list, figsize, color_mask=False, color_map=id2color):
    """Plot an image, its mask(s) and a mask overlay side by side.

    Args:
        data_list: ``[image, gt_mask]`` or ``[image, gt_mask, pred_mask]``.
            The list itself is not modified.
        title_list: One title per panel (the inputs followed by the overlay).
            The titles ``"GT Mask"`` and ``"Pred Mask"`` select the mask
            panels; every other panel shows the entry at the same position.
        figsize: Figure size forwarded to ``plt.subplots``.
        color_mask: When True the masks are drawn with ``color_map`` colours,
            otherwise as grayscale class-id images.
        color_map: Mapping from class id to RGB colour.

    Raises:
        ValueError: If a ``"Pred Mask"`` panel is requested but ``data_list``
            holds no predicted mask.
    """
    gt_mask = data_list[1]
    # Capture the prediction up front; previously it was looked up with
    # data_list[-1] *after* the overlay had been appended, so the grayscale
    # "Pred Mask" panel showed the overlay instead of the prediction.
    pred_mask = data_list[2] if len(data_list) == 3 else None

    if pred_mask is None and "Pred Mask" in title_list:
        raise ValueError('"Pred Mask" panel requested but data_list has no predicted mask.')

    rgb_gt_mask = num_to_rgb(gt_mask, color_map=color_map)
    rgb_pred_mask = None
    mask_to_overlay = rgb_gt_mask

    if pred_mask is not None:
        # When a prediction is available it is the one that gets overlaid.
        rgb_pred_mask = num_to_rgb(pred_mask, color_map=color_map)
        mask_to_overlay = rgb_pred_mask

    overlayed_image = image_overlay(data_list[0], mask_to_overlay)

    # Build a new list so the caller's data_list is left untouched.
    panels = list(data_list) + [overlayed_image]

    fig, axes = plt.subplots(nrows=1, ncols=len(panels), figsize=figsize)

    for idx, axis in enumerate(axes.flat):
        title = title_list[idx]
        axis.set_title(title)

        if title == "GT Mask":
            if color_mask:
                axis.imshow(rgb_gt_mask)
            else:
                axis.imshow(gt_mask, cmap="gray")

        elif title == "Pred Mask":
            if color_mask:
                axis.imshow(rgb_pred_mask)
            else:
                axis.imshow(pred_mask, cmap="gray")

        else:
            axis.imshow(panels[idx])

        axis.axis('off')

    plt.show()

In [None]:
# Preview one batch of three training samples with their ground-truth masks.
plot_train_ds = train_ds.map(unpackage_inputs).batch(3)
image_batch, mask_batch = next(iter(plot_train_ds.take(1)))
 
titles = ["GT Image", "GT Mask", "Overlayed Mask"]
 
for image, gt_mask in zip(image_batch, mask_batch):
 
    # Drop the trailing channel axis before converting the mask to NumPy.
    gt_mask = tf.squeeze(gt_mask, axis=-1).numpy()
    display_image_and_mask([image.numpy().astype(np.uint8), gt_mask], 
                           title_list=titles,
                           figsize=(16,6),
                           color_mask=True)

In [None]:
# Augmentation pipeline: random flip followed by brightness and contrast
# jitter, with the jitter strengths taken from DatasetConfig.
# NOTE(review): presumably the KerasCV layers transform the
# "segmentation_masks" entry consistently with "images" — confirm.
augment_fn = tf.keras.Sequential(
    [
        keras_cv.layers.RandomFlip(),
        keras_cv.layers.RandomBrightness(factor=DatasetConfig.BRIGHTNESS_FACTOR,
                                         value_range=(0,255)),
        keras_cv.layers.RandomContrast(factor=DatasetConfig.CONTRAST_FACTOR,
                                       value_range=(0,255)),
    ]
)

In [None]:
# Training pipeline: shuffle -> augment -> batch -> (images, masks) -> prefetch.
# NOTE(review): the shuffle buffer equals BATCH_SIZE, so samples are only
# mixed within a small window — consider a larger buffer.
train_dataset = (
    train_ds.shuffle(DatasetConfig.BATCH_SIZE)
    .map(augment_fn, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(DatasetConfig.BATCH_SIZE)
    .map(unpackage_inputs)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Validation pipeline: same batching, but no shuffling or augmentation.
valid_dataset = (
    valid_ds.batch(DatasetConfig.BATCH_SIZE)
    .map(unpackage_inputs)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [None]:
# ResNet50V2 backbone built from the preset named in TrainingConfig.MODEL,
# with load_weights=True and an input shape of IMAGE_SIZE plus 3 channels.
backbone = keras_cv.models.ResNet50V2Backbone.from_preset(preset = TrainingConfig.MODEL,
                                                          input_shape=DatasetConfig.IMAGE_SIZE+(3,),
                                                          load_weights=True)
# DeepLabV3+ segmentation model on top of that backbone.
model = keras_cv.models.segmentation.DeepLabV3Plus(
    num_classes=DatasetConfig.NUM_CLASSES, backbone=backbone)

model.summary()

In [None]:
def get_callbacks(
    TrainingConfig,
    monitor="val_mean_iou",
    mode="max",
    save_weights_only=True,
    save_best_only=True,
):
    """Build the TensorBoard and checkpoint callbacks used for training.

    Args:
        TrainingConfig: Object exposing ``LOGS_DIR`` (TensorBoard log
            directory) and ``CKPT_DIR`` (checkpoint file path).
        monitor: Name of the metric the checkpoint callback watches.
        mode: Forwarded to ``ModelCheckpoint`` as ``mode``.
        save_weights_only: Forwarded to ``ModelCheckpoint``; when False the
            ``.weights.h5`` suffix of the checkpoint path is replaced by
            ``.h5``.
        save_best_only: Forwarded to ``ModelCheckpoint``.

    Returns:
        ``[tensorboard_callback, model_checkpoint_callback]``.
    """
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=TrainingConfig.LOGS_DIR,
        histogram_freq=20,
        write_graph=False,
        update_freq="epoch",
    )

    # Always define the path. It used to be assigned only when
    # save_weights_only was True, so the other case raised UnboundLocalError.
    checkpoint_filepath = TrainingConfig.CKPT_DIR
    weights_suffix = ".weights.h5"
    if not save_weights_only and checkpoint_filepath.endswith(weights_suffix):
        # A full-model checkpoint should not carry the weights-only suffix.
        checkpoint_filepath = checkpoint_filepath[: -len(weights_suffix)] + ".h5"

    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        save_weights_only=save_weights_only,
        monitor=monitor,
        mode=mode,
        save_best_only=save_best_only,
        verbose=1,
    )

    return [tensorboard_callback, model_checkpoint_callback]

In [None]:
def mean_iou(y_true, y_pred):
    """Mean intersection-over-union, averaged over present classes and samples.

    For each sample the IoU of every class is computed from one-hot encoded
    labels and arg-max predictions, summed, and divided by the number of
    classes that appear in either the labels or the predictions. The result
    is the mean of that value over the batch.

    Args:
        y_true: Integer class labels with a trailing axis of size 1.
            Assumed to be (batch, height, width, 1) — implied by the
            reductions over axes (1, 2); confirm against the dataset.
        y_pred: Per-class scores whose last axis has one entry per class.

    Returns:
        Scalar tensor with the batch-averaged mean IoU.
    """
    n_classes = y_pred.shape[-1]

    true_labels = tf.cast(tf.squeeze(y_true, axis=-1), tf.int32)
    pred_labels = tf.math.argmax(y_pred, axis=-1)

    true_onehot = tf.one_hot(true_labels, n_classes, axis=-1)
    pred_onehot = tf.one_hot(pred_labels, n_classes, axis=-1)

    # Per-sample, per-class pixel counts.
    overlap = tf.math.reduce_sum(true_onehot * pred_onehot, axis=(1,2))
    pixel_total = tf.math.reduce_sum(true_onehot, axis=(1, 2)) + tf.math.reduce_sum(pred_onehot, axis=(1,2))
    union = pixel_total - overlap

    # A class counts as present when it occurs in the labels or predictions.
    present = tf.cast(tf.math.not_equal(pixel_total, 0), dtype=tf.float32)
    n_present = tf.math.reduce_sum(present, axis=1)

    # divide_no_nan yields 0 for absent classes (union == 0).
    per_class_iou = tf.math.divide_no_nan(overlap, union)
    per_sample_iou = tf.math.reduce_sum(per_class_iou, axis=1) / n_present

    return tf.math.reduce_mean(per_sample_iou)

In [None]:
# TensorBoard + checkpoint callbacks configured from TrainingConfig.
callbacks = get_callbacks(TrainingConfig)

# from_logits=False: the loss treats the model outputs as probabilities.
# NOTE(review): assumes the model's final layer applies softmax — confirm.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

# Adam with the configured learning rate; track accuracy and the custom mean IoU.
model.compile(
    optimizer=tf.keras.optimizers.Adam(TrainingConfig.LEARNING_RATE),
    loss=loss_fn,
    metrics=["accuracy", mean_iou],
)

In [None]:
# Train for TrainingConfig.EPOCHS epochs with valid_dataset as validation
# data; `history` holds the object returned by fit().
history = model.fit(
    train_dataset,
    epochs=TrainingConfig.EPOCHS,
    validation_data=valid_dataset,
    callbacks=callbacks
)