In [None]:
import os
import cv2
import keras
import numpy as np
from glob import glob
import tensorflow as tf
import matplotlib.pyplot as plt

In [None]:
# Colab only: mount Google Drive at /content/drive. The dataset and checkpoint
# paths used later in this notebook are given relative to that mount
# ('drive/MyDrive/...').
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import json

# Dataset-level metadata: its 'classes' list defines every segmentation class.
with open('drive/MyDrive/dpns/coco-stuff/meta.json', 'r') as f:
    meta_data = json.load(f)

# Lookup tables from a class id / class title to its position in
# meta_data['classes']; that position is used as the class channel index.
class_id_to_index = dict(
    (cls['id'], idx) for idx, cls in enumerate(meta_data['classes'])
)
class_title_to_index = dict(
    (cls['title'], idx) for idx, cls in enumerate(meta_data['classes'])
)
n_classes = len(meta_data['classes'])

# Number of samples per batch for both input pipelines.
batch_size = 32

In [None]:
# Root folder of the dataset (relative to the working directory).
dataset = 'drive/MyDrive/dpns/coco-stuff'

# Training split: images and their JSON annotation files.
train_dir = dataset + '/train'
train_img_dir = train_dir + '/img'
train_ann_dir = train_dir + '/ann'

# The dataset's "test" split is used for validation.
val_dir = dataset + '/test'
val_img_dir = val_dir + '/img'
val_ann_dir = val_dir + '/ann'

In [None]:
def load_image(img_path, size=(224, 224)):
    """Read a JPEG file and return it as a float32 RGB tensor scaled by 1/255.

    Args:
        img_path: Path of the JPEG file (anything ``tf.io.read_file`` accepts).
        size: Target size handed to ``tf.image.resize``.

    Returns:
        The resized 3-channel image as float32 with pixel values divided by 255.
    """
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, size)
    return tf.cast(resized, tf.float32) / 255.0


In [None]:
import base64
import zlib

def bitmap_to_mask(bitmap_data, origin, height, width):
    """Decode a packed object bitmap and paste it into a full-size binary mask.

    The payload is base64 text wrapping zlib-compressed bytes of an encoded
    image. Both layouts an image decoder can return for such a bitmap are
    handled: a 4-channel image (the alpha channel is the mask) and a flat
    single-channel image.

    Args:
        bitmap_data: Base64 string with the compressed bitmap.
        origin: ``(x, y)`` position of the bitmap's top-left corner inside the
            full image.
        height: Height of the full image in pixels.
        width: Width of the full image in pixels.

    Returns:
        ``(height, width)`` uint8 array holding 1 where the bitmap is set and 0
        elsewhere. Parts of the bitmap that fall outside the image are dropped.

    Raises:
        ValueError: If the payload cannot be decoded as an image or has a
            channel layout that carries no mask.
    """
    compressed_data = base64.b64decode(bitmap_data)
    decompressed_data = zlib.decompress(compressed_data)

    buffer = np.frombuffer(decompressed_data, np.uint8)
    decoded = cv2.imdecode(buffer, cv2.IMREAD_UNCHANGED)
    if decoded is None:
        raise ValueError('Bitmap payload could not be decoded as an image.')

    if decoded.ndim == 3 and decoded.shape[2] >= 4:
        # 4-channel bitmap: the mask lives in the alpha channel.
        mask = decoded[:, :, 3].astype(bool)
    elif decoded.ndim == 2:
        # Single-channel bitmap: every non-zero pixel belongs to the object.
        mask = decoded.astype(bool)
    else:
        raise ValueError(
            'Unsupported bitmap layout with shape %s.' % (decoded.shape,)
        )

    full_mask = np.zeros((height, width), dtype=np.uint8)
    x, y = origin
    mask_h, mask_w = mask.shape

    # Clip the paste window to the image so a bitmap that overhangs an edge
    # (or has a negative origin) neither raises nor wraps around.
    top, left = max(y, 0), max(x, 0)
    bottom, right = min(y + mask_h, height), min(x + mask_w, width)
    if bottom > top and right > left:
        full_mask[top:bottom, left:right] = mask[top - y:bottom - y, left - x:right - x]
    return full_mask

In [None]:
def load_mask(ann_path, size=(224, 224), n_classes=n_classes):
    """Build a one-hot segmentation mask from one JSON annotation file.

    Every object's bitmap is painted with its class index onto a label map at
    the annotation's native resolution. Where objects overlap, the larger class
    index wins; pixels covered by no object stay 0 and therefore end up in
    channel 0. The label map is then resized with nearest-neighbour
    interpolation and expanded to one channel per class.

    Args:
        ann_path: Path of the annotation file. ``bytes`` (as delivered by
            ``tf.numpy_function``) is accepted as well as ``str``.
        size: Target ``(height, width)`` of the mask; may be a tuple or an
            array of two integers.
        n_classes: Number of channels in the one-hot output.

    Returns:
        ``(height, width, n_classes)`` float32 array with exactly the channel of
        each pixel's class index set to 1 (all zeros for an index outside
        ``range(n_classes)``).
    """
    # Values routed through tf.numpy_function arrive as bytes / numpy arrays;
    # normalise them to plain Python types before handing them to json / cv2.
    if isinstance(ann_path, bytes):
        ann_path = ann_path.decode('utf-8')
    target_h, target_w = (int(v) for v in size)
    n_classes = int(n_classes)

    with open(ann_path, 'r') as file:
        annotation = json.load(file)

    height = annotation['size']['height']
    width = annotation['size']['width']
    mask = np.zeros((height, width), dtype=np.int32)
    for obj in annotation['objects']:
        class_index = class_id_to_index[obj['classId']]
        bitmap = obj['bitmap']

        obj_mask = bitmap_to_mask(bitmap['data'], bitmap['origin'], height, width)
        # Promote the uint8 object mask before scaling so a class index above
        # 255 cannot overflow.
        mask = np.maximum(mask, obj_mask.astype(np.int32) * class_index)

    # cv2.resize takes dsize as (width, height), the reverse of `size`.
    mask = cv2.resize(mask, (target_w, target_h), interpolation=cv2.INTER_NEAREST)

    mask_one_hot = np.zeros((target_h, target_w, n_classes), dtype=np.float32)
    for c in range(n_classes):
        mask_one_hot[:, :, c] = (mask == c).astype(float)
    return mask_one_hot

In [None]:
def data_augment(image, mask):
    """Randomly flip an image/mask pair and colour-jitter the image.

    Flips are applied to both tensors so they stay aligned; the photometric
    changes (brightness, contrast, hue, saturation) touch the image only.

    Args:
        image: Float RGB image tensor. The clipping below assumes values in
            [0, 1], which is what ``load_image`` in this notebook produces.
        mask: Mask tensor with the same spatial layout as ``image``.

    Returns:
        Tuple ``(image, mask)`` after augmentation, with the image clipped back
        to [0, 1].
    """
    flip_left_right = tf.random.uniform([], 0, 1) > 0.5
    flip_up_down = tf.random.uniform([], 0, 1) > 0.5

    if flip_left_right:
        image = tf.image.flip_left_right(image)
        mask = tf.image.flip_left_right(mask)

    if flip_up_down:
        image = tf.image.flip_up_down(image)
        mask = tf.image.flip_up_down(mask)

    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.image.random_contrast(image, lower=0.9, upper=1.1)
    # Brightness/contrast jitter can leave [0, 1]; the hue and saturation ops
    # convert through HSV, which is only well defined for inputs in that range.
    image = tf.clip_by_value(image, 0.0, 1.0)
    image = tf.image.random_hue(image, max_delta=0.1)
    image = tf.image.random_saturation(image, lower=0.9, upper=1.1)
    # Keep augmented images in the same value range as the un-augmented ones.
    image = tf.clip_by_value(image, 0.0, 1.0)

    return image, mask

In [None]:
def load_paths(img_dir, ann_dir):
    """List the image and annotation files of one dataset split.

    Both listings are sorted independently, and element ``i`` of one list is
    later paired with element ``i`` of the other. A differing file count means
    that pairing cannot be right, so it is reported instead of silently
    producing misaligned image/mask pairs.

    Args:
        img_dir: Directory containing the image files.
        ann_dir: Directory containing the annotation files.

    Returns:
        Tuple ``(img_paths, ann_paths)`` of sorted lists of joined paths.

    Raises:
        ValueError: If the two directories hold a different number of entries.
    """
    img_paths = sorted(os.path.join(img_dir, name) for name in os.listdir(img_dir))
    ann_paths = sorted(os.path.join(ann_dir, name) for name in os.listdir(ann_dir))

    if len(img_paths) != len(ann_paths):
        raise ValueError(
            'Image/annotation count mismatch: %d files in %s but %d files in %s'
            % (len(img_paths), img_dir, len(ann_paths), ann_dir)
        )

    return img_paths, ann_paths

In [None]:
def train_data_generator(img_paths, ann_paths, batch_size=batch_size, size=(224, 224), n_classes=n_classes):
    """Build the endlessly repeating, augmented training pipeline.

    The (image path, annotation path) pairs are reshuffled on every pass over
    the data, so successive epochs see the samples in a different order.

    Args:
        img_paths: Sequence of image file paths.
        ann_paths: Sequence of annotation file paths; element ``i`` belongs to
            ``img_paths[i]``.
        batch_size: Number of samples per batch.
        size: ``(height, width)`` every image and mask is resized to.
        n_classes: Number of channels in the one-hot masks.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, masks)`` batches forever.
    """
    img_paths = np.array(img_paths)
    ann_paths = np.array(ann_paths)

    def load_data(img_path, ann_path):
        img = load_image(img_path, size)
        # load_mask is plain Python/NumPy/OpenCV, so it has to be wrapped.
        mask = tf.numpy_function(load_mask, inp=[ann_path, size, n_classes], Tout=tf.float32)
        # numpy_function drops static shape information; restore it.
        img.set_shape((size[0], size[1], 3))
        mask.set_shape((size[0], size[1], n_classes))
        img, mask = data_augment(img, mask)
        return img, mask

    pairs = tf.data.Dataset.from_tensor_slices((img_paths, ann_paths))
    # Shuffle the cheap path strings (not decoded images) with a buffer that
    # spans the whole split, and draw a new order on each repetition.
    pairs = pairs.shuffle(
        buffer_size=max(len(img_paths), 1),
        reshuffle_each_iteration=True,
    )
    samples = pairs.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
    samples = samples.repeat()
    batches = samples.batch(batch_size)
    batches = batches.prefetch(buffer_size=tf.data.AUTOTUNE)

    return batches

In [None]:
def val_data_generator(img_paths, ann_paths, batch_size=batch_size, size=(224, 224), n_classes=n_classes):
    """Build the repeating validation pipeline (no augmentation).

    The path pairs are shuffled once with NumPy when the pipeline is created;
    samples are then loaded, batched, prefetched and repeated indefinitely.

    Args:
        img_paths: Sequence of image file paths.
        ann_paths: Sequence of annotation file paths; element ``i`` belongs to
            ``img_paths[i]``.
        batch_size: Number of samples per batch.
        size: ``(height, width)`` every image and mask is resized to.
        n_classes: Number of channels in the one-hot masks.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, masks)`` batches forever.
    """
    order = np.arange(len(img_paths))
    np.random.shuffle(order)
    shuffled_imgs = np.array(img_paths)[order]
    shuffled_anns = np.array(ann_paths)[order]

    def _read_pair(image_file, annotation_file):
        image = load_image(image_file, size)
        one_hot = tf.numpy_function(
            load_mask,
            inp=[annotation_file, size, n_classes],
            Tout=tf.float32,
        )
        # Static shapes are lost through numpy_function; declare them again.
        target_h, target_w = size[0], size[1]
        image.set_shape((target_h, target_w, 3))
        one_hot.set_shape((target_h, target_w, n_classes))
        return image, one_hot

    return (
        tf.data.Dataset.from_tensor_slices((shuffled_imgs, shuffled_anns))
        .map(_read_pair, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(batch_size)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
        .repeat()
    )

In [None]:
# Collect the training file lists and build the training input pipeline.
train_img_paths, train_ann_paths = load_paths(train_img_dir, train_ann_dir)
train_gen = train_data_generator(
    train_img_paths,
    train_ann_paths,
    batch_size=batch_size,
    size=(224, 224),
    n_classes=n_classes,
)

In [None]:
# Collect the validation file lists and build the validation input pipeline.
val_img_paths, val_ann_paths = load_paths(val_img_dir, val_ann_dir)
val_gen = val_data_generator(
    val_img_paths,
    val_ann_paths,
    batch_size=batch_size,
    size=(224, 224),
    n_classes=n_classes,
)

In [None]:
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, concatenate, BatchNormalization, Activation, Dropout, MaxPooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.applications import MobileNetV2

def _decoder_block(x, skip, filters, transpose_kernel, dropout_rate):
    """Upsample ``x`` by 2, merge it with ``skip`` and refine with conv-BN-ReLU-dropout."""
    x = Conv2DTranspose(filters, transpose_kernel, strides=(2, 2), padding='same')(x)
    x = concatenate([x, skip])
    x = Conv2D(filters, (3, 3), kernel_initializer='he_normal', padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    return Dropout(dropout_rate)(x)


def build_model(n_classes=n_classes, IMG_HEIGHT=224, IMG_WIDTH=224, IMG_CHANNELS=3):
    """Build a U-Net style segmentation model on a MobileNetV2 encoder.

    The encoder is MobileNetV2 up to ``block_13_expand_relu``, initialised with
    its ImageNet weights. Four decoder stages each double the resolution and
    concatenate the matching encoder activation (finally the input image
    itself), followed by a 1x1 softmax convolution with one channel per class.

    Args:
        n_classes: Number of output channels (classes).
        IMG_HEIGHT: Input image height.
        IMG_WIDTH: Input image width.
        IMG_CHANNELS: Number of input channels.

    Returns:
        The uncompiled Keras ``Model``. Its summary is printed as a side effect.
    """
    inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), name="input_image")

    # MobileNetV2 fetches and loads its own ImageNet weights. The MobileNet V1
    # weight file that used to be loaded by name afterwards belongs to a
    # different architecture, so that extra download has been removed.
    encoder = MobileNetV2(input_tensor=inputs, include_top=False, weights='imagenet')

    # Encoder activations reused as skip connections, shallowest first.
    skip_connection_names = ["input_image", "block_1_expand_relu", "block_3_expand_relu", "block_6_expand_relu"]
    skips = [encoder.get_layer(name).output for name in skip_connection_names]

    x = encoder.get_layer("block_13_expand_relu").output

    # Deepest skip first. Only the first stage uses a 3x3 transpose kernel.
    x = _decoder_block(x, skips[-1], 256, (3, 3), 0.3)
    x = _decoder_block(x, skips[-2], 128, (2, 2), 0.3)
    x = _decoder_block(x, skips[-3], 64, (2, 2), 0.2)
    x = _decoder_block(x, skips[-4], 32, (2, 2), 0.2)

    # Keep the classification layer in float32 so the softmax probabilities stay
    # numerically stable when a mixed-precision policy is active.
    outputs = Conv2D(n_classes, (1, 1), activation='softmax', dtype='float32')(x)

    model = Model(inputs=[inputs], outputs=[outputs])

    model.summary()

    return model



In [None]:
from tensorflow.keras import  mixed_precision

# Make 'mixed_float16' the global Keras dtype policy. This runs before the
# model is instantiated further down, so the model's layers are created under
# this policy.
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop training after 5 epochs without a val_loss improvement and restore the
# best weights seen.
early_stopping = EarlyStopping(
    monitor='val_loss',
    restore_best_weights=True,
    patience=5,
    verbose=1,
)

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint


# Save the full model (not just weights) whenever val_loss reaches a new minimum.
checkpoint = ModelCheckpoint(
    filepath='drive/MyDrive/dpns/model_checkpoint_v5.keras',
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau

# Halve the learning rate after 2 epochs without a val_loss improvement,
# never going below 1e-8.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    patience=2,
    factor=0.5,
    min_lr=1e-8,
)


In [None]:
from tensorflow.keras import layers

model = build_model(n_classes=n_classes)

# Freeze the encoder only. build_model taps the encoder at 'block_13_expand_relu'
# and stacks 25 decoder layers on top (4 stages x 6 layers + the output conv), so
# a fixed "all but the last 22" slice would also freeze the first decoder layers.
# Locating the boundary by layer name avoids depending on that count.
# NOTE(review): relies on model.layers listing all encoder layers before the
# decoder layers - confirm if the architecture changes.
ENCODER_OUTPUT_LAYER = 'block_13_expand_relu'
layer_names = [layer.name for layer in model.layers]
encoder_layers = model.layers[:layer_names.index(ENCODER_OUTPUT_LAYER) + 1]

for layer in encoder_layers:
    layer.trainable = False

In [None]:
from tensorflow.keras.optimizers import Adam

# Adam optimizer with the starting learning rate; the ReduceLROnPlateau
# callback passed to fit() may lower the rate during training.
initial_learning_rate = 0.0001
optimizer = Adam(learning_rate=initial_learning_rate)

In [None]:
def dice_loss(y_true, y_pred, smooth=1e-4):
    """Soft Dice loss computed over all elements of the batch at once.

    Both tensors are cast to float32 and flattened, so a single Dice score is
    computed across every sample, pixel and class channel.

    Args:
        y_true: Ground-truth tensor (one-hot masks in this notebook).
        y_pred: Predicted tensor of the same shape.
        smooth: Constant added to numerator and denominator to avoid a
            division by zero.

    Returns:
        Scalar tensor ``1 - (2 * intersection + smooth) / (sum_true + sum_pred + smooth)``.
    """
    K = tf.keras.backend

    truth_flat = K.flatten(tf.cast(y_true, tf.float32))
    pred_flat = K.flatten(tf.cast(y_pred, tf.float32))

    overlap = K.sum(truth_flat * pred_flat)
    total = K.sum(truth_flat) + K.sum(pred_flat)

    dice_score = (2. * overlap + smooth) / (total + smooth)
    return 1 - dice_score

In [None]:
def combo_loss(y_true, y_pred):
    """Training loss: categorical cross-entropy plus Dice loss.

    Args:
        y_true: One-hot ground-truth masks.
        y_pred: Predicted class probabilities of the same shape.

    Returns:
        Scalar tensor, the unweighted sum of both loss terms.
    """
    cross_entropy = tf.keras.losses.CategoricalCrossentropy()
    ce_term = cross_entropy(y_true, y_pred)
    dice_term = dice_loss(y_true, y_pred)
    return ce_term + dice_term

In [None]:
# Compile with the optimizer created above and the combined cross-entropy + Dice loss.
model.compile(optimizer=optimizer, loss=combo_loss)

In [None]:
# Both pipelines repeat forever, so Keras needs explicit step counts.
# Training batches are always full (repeat happens before batch), so floor
# division is right there. Validation batches are formed before repeating, so
# one pass is ceil(n / batch_size) batches - round up to include the final
# partial batch. Reusing the already-loaded path lists avoids listing the
# Drive folders again; max(1, ...) keeps the counts valid for tiny datasets.
train_steps = max(1, len(train_img_paths) // batch_size)
val_steps = max(1, -(-len(val_img_paths) // batch_size))

# Keep the History object so the loss curves can be inspected after training.
history = model.fit(
    train_gen,
    steps_per_epoch=train_steps,
    validation_data=val_gen,
    validation_steps=val_steps,
    epochs=20,
    callbacks=[early_stopping, checkpoint, reduce_lr],
)