In [None]:
import imageio
from PIL import Image, ImageOps

import tensorflow as tf
from keras_preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image
from tensorflow.keras import backend as K
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.applications import VGG19
from sklearn.model_selection import train_test_split

In [None]:
images_folder = 'Dataset/augmentation/images/img/'
masks_folder = 'Dataset/augmentation/masks/img/'

# Preprocess Data

In [None]:
def LoadDataset(images_folder, masks_folder):
    """
    Looks for relevant filenames in the shared path
    Returns 2 lists for original and masked files respectively
    
    """
    # Create images and masks list then sort them
    images_dataset = os.listdir(images_folder)
    masks_dataset = os.listdir(masks_folder)

    images_dataset.sort()
    masks_dataset.sort()
    
    return images_dataset, masks_dataset


def view_image(img_array, mask_array):
    """
    View image
    """

    print(f'image dimension: {img_array.shape}')
    print(f'mask dimension: {mask_array.shape}')

    fig, ax = plt.subplots(1, 2, figsize=(15, 15))
    ax[0].imshow(img_array)
    ax[1].imshow(mask_array)
    

def preprocess_data(images_folder, masks_folder, target_shape_img = (256,256,3),
                    target_shape_mask = (256,256,1)):
    # Pull the relevant dimensions for image and mask
    images, masks = LoadDataset(images_folder, masks_folder)
    m = len(images)                     # number of images
    i_h,i_w,i_c = target_shape_img   # pull height, width, and channels of image
    m_h,m_w,m_c = target_shape_mask  # pull height, width, and channels of mask
    
    # Define X and Y as number of images along with shape of one image
    X = np.zeros((m,i_h,i_w,i_c), dtype=np.float32)
    y = np.zeros((m,m_h,m_w,m_c), dtype=np.int32)
    
    # Resize images and masks
    for img in images:
        # convert image into an array of desired shape (3 channels)
        index = images.index(img)
        path = os.path.join(images_folder, img)
        single_img = Image.open(path).convert('RGB')
        single_img = single_img.resize((i_h,i_w))
        single_img = np.reshape(single_img,(i_h,i_w,i_c)) 
        single_img = single_img/255.
        X[index] = single_img
        
        # convert mask into an array of desired shape (1 channel)
        single_mask_ind = masks[index]
        path = os.path.join(masks_folder, single_mask_ind)
        single_mask = Image.open(path)
        single_mask = ImageOps.grayscale(single_mask)
        single_mask = np.reshape(single_mask,(m_h,m_w,m_c)) 
        single_mask = single_mask//255 # to ensure classes #s start from 0
        y[index] = single_mask

    return X,y

In [None]:
images, masks = LoadDataset(images_folder, masks_folder)
X,y = preprocess_data(images_folder, masks_folder)
# Splitting Data
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42)

# Training Model

In [None]:
def conv_block(input, num_filters):
    x = Conv2D(num_filters, (3,3), padding = 'same', activation = 'relu')(input)
    x = Conv2D(num_filters, (3,3), padding = 'same', activation = 'relu')(x)
    return x


def contracting_block(input, num_filters):
    x = conv_block(input, num_filters)
    p = MaxPooling2D(pool_size = (2,2))(x)
    return x, p


def expansive_block(input, skip_connection, num_filters):
    x = Conv2DTranspose(num_filters, (2,2), strides = 2, padding = 'same')(input)
    x = Concatenate()([x, skip_connection])
    x = conv_block(x, num_filters)
    return x

# Model 1
def unet(input_shape):
    input = Input(input_shape)
    x1, p1 = contracting_block(input, 64)
    x2, p2 = contracting_block(p1, 128)
    x3, p3 = contracting_block(p2, 256)
    x4, p4 = contracting_block(p3, 512)

    x5 = conv_block(p4, 1024)

    x6 = expansive_block(x5, x4, 512)
    x7 = expansive_block(x6, x3, 256)
    x8 = expansive_block(x7, x2, 128)
    x9 = expansive_block(x8, x1, 64)

    x10 = Conv2D(1, (1,1), padding = 'same', activation = 'sigmoid')(x9)

    model = Model(input, x10, name = 'U-Net')

    return model


# Model 2
def unetmod(input_shape):
    input = Input(input_shape)
    x1, p1 = contracting_block(input, 32)
    x2, p2 = contracting_block(p1, 64)
    x3, p3 = contracting_block(p2, 128)
    x4, p4 = contracting_block(p3, 256)

    x5 = conv_block(p4, 512)

    x6 = expansive_block(x5, x4, 256)
    x7 = expansive_block(x6, x3, 128)
    x8 = expansive_block(x7, x2, 64)
    x9 = expansive_block(x8, x1, 32)

    x10 = Conv2D(1, (1,1), padding = 'same', activation = 'sigmoid')(x9)

    model = Model(input, x10, name = 'U-Net_Modification')

    return model


# Model 3
def unet_vgg19(input_shape):
    input = Input(input_shape)

    vgg19 = VGG19(include_top=False, weights="imagenet", input_tensor=input)
    for layer in vgg19.layers:
        layer.trainable = False

    x1 = vgg19.get_layer('block1_conv2').output
    x2 = vgg19.get_layer('block2_conv2').output
    x3 = vgg19.get_layer('block3_conv4').output
    x4 = vgg19.get_layer('block4_conv4').output

    x5 = vgg19.get_layer('block5_conv4').output

    x6 = expansive_block(x5, x4, 512)
    x7 = expansive_block(x6, x3, 256)
    x8 = expansive_block(x7, x2, 128)
    x9 = expansive_block(x8, x1, 64)

    x10 = Conv2D(1, (1,1), padding = 'same', activation = 'sigmoid')(x9)

    model = Model(input, x10, name = 'U-Net_vgg19')

    return model

In [None]:
# Function to calculate IoU
def BinaryIoU_func(y_true, y_pred):
    pred = tf.where(y_pred>=0.5, 1, 0)
    pred = tf.cast(pred, dtype = tf.float32)
    true = tf.cast(y_true, dtype = tf.float32)
    n_true = tf.reduce_sum(true)
    n_pred = tf.reduce_sum(pred)
    intersection = tf.reduce_sum(pred * true)
    union = n_true + n_pred - intersection
    iou = intersection/union
    return iou

In [None]:
# Function to save model
def save_segmentation_model(model_name, model, history):
    dir = f'SegmentationModel/{model_name}'
    try:
        os.makedirs(dir)
        print(f'SUCCESS CREATING {model_name} DIRECTORY')
    except:
        print('FOLDER ALREADY EXISTS')
    model.save(f'{dir}/model.h5')
    print('SUCCESS SAVING MODEL')
    with open(f'{dir}/history.json', 'w') as f:
        json.dump(history, f)
    print('SUCCESS SAVING HISTORY MODEL')

## Model 1

In [None]:
model = unet((256,256,3))
model.summary()

In [None]:
model.compile(optimizer = Adam(learning_rate=1e-4),
              loss = 'BinaryCrossentropy',
              metrics=[BinaryIoU_func])

result = model.fit(X_train,
                   y_train,
                   validation_data = (X_valid, y_valid),
                   batch_size = 32,
                   epochs = 50,
                   verbose = 2)

In [None]:
# Plot loss and IoU
plt.plot(result.history["loss"], color='r', label = 'train_loss')
plt.plot(result.history["val_loss"], color='b', label = 'val_loss')
plt.title('Loss')
plt.legend()
plt.show()

plt.plot(result.history["BinaryIoU_func"], color='r', label = 'train_IoU')
plt.plot(result.history["val_BinaryIoU_func"], color='b', label = 'val_IoU')
plt.title('IoU')
plt.legend()
plt.show()

In [None]:
# Save model
save_segmentation_model(model_name= 'UNET', model = model, history=result.history)

## Model 2

In [None]:
model = unetmod((256,256,3))
model.summary()

In [None]:
model.compile(optimizer = Adam(learning_rate=1e-4),
              loss = 'BinaryCrossentropy',
              metrics=[BinaryIoU_func])

In [None]:
result = model.fit(X_train,
                   y_train,
                   validation_data = (X_valid, y_valid),
                   batch_size = 32,
                   epochs = 50,
                   verbose = 2)

In [None]:
plt.plot(result.history["loss"], color='r', label = 'train_loss')
plt.plot(result.history["val_loss"], color='b', label = 'val_loss')
plt.title('Loss')
plt.legend()
plt.show()

plt.plot(result.history["BinaryIoU_func"], color='r', label = 'train_IoU')
plt.plot(result.history["val_BinaryIoU_func"], color='b', label = 'val_IoU')
plt.title('IoU')
plt.legend()
plt.show()

In [None]:
plt.plot(result.history["loss"], color='r', label = 'train_loss')
plt.plot(result.history["val_loss"], color='b', label = 'val_loss')
plt.title('Loss')
plt.legend()
plt.show()

plt.plot(result.history["BinaryIoU_func"], color='r', label = 'train_IoU')
plt.plot(result.history["val_BinaryIoU_func"], color='b', label = 'val_IoU')
plt.title('IoU')
plt.legend()
plt.show()

In [None]:
# Save model
save_segmentation_model(model_name= 'UNET_Mod', model = model, history=result.history)

## Model 3

In [None]:
model = unet_vgg19((256,256,3))
model.summary()

In [None]:
model.compile(optimizer = Adam(learning_rate=1e-4),
              loss = 'BinaryCrossentropy',
              metrics=[BinaryIoU_func])

In [None]:
result = model.fit(X_train,
                   y_train,
                   validation_data = (X_valid, y_valid),
                   batch_size = 32,
                   epochs = 50,
                   verbose = 2)

In [None]:
plt.plot(result.history["loss"], color='r', label = 'train_loss')
plt.plot(result.history["val_loss"], color='b', label = 'val_loss')
plt.title('Loss')
plt.legend()
plt.show()

plt.plot(result.history["BinaryIoU_func"], color='r', label = 'train_IoU')
plt.plot(result.history["BinaryIoU_func"], color='b', label = 'val_IoU')
plt.title('IoU')
plt.legend()
plt.show()

In [None]:
# Save model
save_segmentation_model(model_name = 'UNET_VGG19', model = model, history = result.history)

# Comparing The Result

In [None]:
# Open the result history
def open_json(json_path):
    with open(json_path, 'r') as f:
        data = json.load(f)
    return data

unet_ori = open_json('/SegmentationModel/UNET/history.json')
unet_mod = open_json('/SegmentationModel/UNET_Mod/history.json')
unet_vgg = open_json('/SegmentationModel/UNET_VGG19/history.json')

In [None]:
epochs = len(ori_IoU)

# Plot result
def plot_result(ori, mod, vgg, type_metrics, type_data):
    epochs = range(len(ori))
    plt.plot(epochs, ori, label = 'Model1')
    plt.plot(epochs, mod, label = 'Model2')
    plt.plot(epochs, vgg, label = 'Model3')
    plt.legend()
    plt.title(f'{type_metrics} ({type_data})')
    plt.xlabel('Epoch')
    plt.ylabel(f'{type_metrics}')
    plt.show()

In [None]:
# Loss
ori_loss = unet_ori['loss']
mod_loss = unet_mod['loss']
vgg_loss = unet_vgg['loss']

# Val loss
ori_val_loss = unet_ori['val_loss']
mod_val_loss = unet_mod['val_loss']
vgg_val_loss = unet_vgg['val_loss']

# IoU
ori_IoU = unet_ori['BinaryIoU_func']
mod_IoU = unet_mod['BinaryIoU_func']
vgg_IoU = unet_vgg['BinaryIoU_func']

# val_IoU
ori_val_IoU = unet_ori['val_BinaryIoU_func']
mod_val_IoU = unet_mod['val_BinaryIoU_func']
vgg_val_IoU = unet_vgg['val_BinaryIoU_func']

In [None]:
plot_result(ori_loss, mod_loss, vgg_loss, 'Loss', 'Training Data')
plot_result(ori_val_loss, mod_val_loss, vgg_val_loss, 'Loss', 'Validation Data')
plot_result(ori_IoU, mod_IoU, vgg_IoU, 'IoU', 'Training Data')
plot_result(ori_val_IoU, mod_val_IoU, vgg_val_IoU, 'IoU', 'Validation Data')