In [1]:
import numpy as np
from matplotlib import pyplot as plt
import os
os.environ["SM_FRAMEWORK"] = "tf.keras"
import sys
import time

from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from keras.utils import to_categorical

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications import VGG16
from tensorflow.keras.metrics import MeanIoU
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda, Activation
from keras import backend as K
from keras.layers import Layer
from tensorflow.keras import layers, Model, Input
from keras.optimizers import Adam

In [2]:
'''Load data'''
sys.path.extend(['..'])
from data.adv_data_generator import ImageLoader

file = ["train"]
type_mod = ["img","mask","aug"]
dataset_aug = [[],[],[]]
num = 0
for i in file:
    for j in type_mod:
        loader = ImageLoader(i, j)
        dataset_aug[num] = loader.loadAug()
        num += 1

file = ["test","val"]
type_ori = ["images","masks"]
dataset = [[],[],[],[]]
num = 0
for i in file:
    for j in type_ori:
        loader = ImageLoader(i, j)
        dataset[num] = loader.loadData()
        num += 1

In [3]:
'''Define dataset'''
train_img = dataset_aug[0]
train_mask = dataset_aug[1]
train_aug = dataset_aug[2]

test_img = dataset[0]
test_mask = dataset[1]
val_img = dataset[2]
val_mask = dataset[3]

In [4]:
import random

'''
(1) Set the seed
(2) Generate a random permutation of indices
(3) Use the same random indices for sampling both arrays
'''

def sampling(img, mask, seed, percentage=0.2):
    sample_size = int(len(img) * percentage)
    random.seed(seed)
    random_indices = random.sample(range(len(img)), sample_size)
    sample_img = img[random_indices]
    sample_mask = mask[random_indices]

    return sample_img, sample_mask

sam_val_img, sam_val_mask = sampling(val_img, val_mask, 4)
sam_test_img, sam_test_mask = sampling(test_img, test_mask, 7)

print(sam_val_mask.shape)
print(sam_test_mask.shape)
print(np.unique(sam_test_mask))

(6415, 64, 64)
(6415, 64, 64)
[0 1 2 3 4 5 6 7]


In [5]:
'''Define variable to run the model'''
# X = np.concatenate((mod_train_img, mod_train_aug), axis=0)
# y = np.concatenate((mod_train_mask, mod_train_mask), axis=0)
X = train_img
y = train_mask
X_val = sam_val_img
y_val = sam_val_mask

In [6]:
n_classes = 8
from keras.utils import to_categorical
y_cat = to_categorical(y, num_classes=n_classes)
y_val_cat = to_categorical(y_val, num_classes=n_classes)

In [7]:
class MaxPoolingWithArgmax2D(Layer):

    def __init__(
            self,
            pool_size=(2, 2),
            strides=(2, 2),
            padding='same',
            **kwargs):
        super(MaxPoolingWithArgmax2D, self).__init__(**kwargs)
        self.padding = padding
        self.pool_size = pool_size
        self.strides = strides

    def call(self, inputs, **kwargs):
#         print("max pooling with argmax")
        padding = self.padding
        pool_size = self.pool_size
        strides = self.strides
        if K.backend() == 'tensorflow':
            ksize = [1, pool_size[0], pool_size[1], 1]
            padding = padding.upper()
            strides = [1, strides[0], strides[1], 1]
            output, argmax = tf.nn.max_pool_with_argmax(
                    inputs,
                    ksize=ksize,
                    strides=strides,
                    padding=padding)
        else:
            errmsg = '{} backend is not supported for layer {}'.format(
                    K.backend(), type(self).__name__)
            raise NotImplementedError(errmsg)
        argmax = tf.cast(argmax, K.floatx())
        return [output, argmax]
    
    def compute_output_shape(self, input_shape):
#         print("i guess its subsampling")
        ratio = (1, 2, 2, 1)
        output_shape = [
                dim//ratio[idx]
                if dim is not None else None
                for idx, dim in enumerate(input_shape)]
        output_shape = tuple(output_shape)
        return [output_shape, output_shape]

class MaxUnpooling2D(Layer):
    def __init__(self, size=(2, 2), **kwargs):
        super(MaxUnpooling2D, self).__init__(**kwargs)
        self.size = size

    def call(self, inputs, output_shape=None):
        # one is pool and one is mask
        updates, mask = inputs[0], inputs[1]

        mask = tf.cast(mask, 'int32')
        input_shape = tf.shape(updates, out_type='int32')

        #  calculation new shape
        if output_shape is None:
            output_shape = (
                input_shape[0],
                input_shape[1]*self.size[0],
                input_shape[2]*self.size[1],
                input_shape[3])
        self.output_shape1 = output_shape

        # calculation indices for batch, height, width and feature maps
        one_like_mask = tf.ones_like(mask, dtype='int32')      #creates ones of the same shape as the mask
        batch_shape = tf.concat([[input_shape[0]], [1], [1], [1]], axis=0)
        batch_range = tf.reshape(tf.range(output_shape[0], dtype='int32'),shape=batch_shape)
        b = one_like_mask * batch_range

        y = mask // (output_shape[2] * output_shape[3])

        x = (mask // output_shape[3]) % output_shape[2]

        feature_range = tf.range(output_shape[3], dtype='int32')

        f = one_like_mask * feature_range

        # transpose indices & reshape update values to one dimension
        updates_size = tf.size(updates)       # Prints the number of elements in the updates
        indices = tf.transpose(tf.reshape(tf.stack([b, y, x, f]), [4, updates_size]))
        values = tf.reshape(updates, [updates_size])
        ret = tf.scatter_nd(indices, values, output_shape)
        return ret

    def compute_output_shape(self, input_shape):
        mask_shape = input_shape[1]
        return (
                mask_shape[0],
                mask_shape[1]*self.size[0],
                mask_shape[2]*self.size[1],
                mask_shape[3]
                )

In [8]:
import keras
from keras.models import *
from keras.layers import *
import tensorflow as tf

IMAGE_ORDERING = "channels_last"

if IMAGE_ORDERING == 'channels_first':
    pretrained_url = "https://github.com/fchollet/deep-learning-models/" \
                     "releases/download/v0.1/" \
                     "vgg16_weights_th_dim_ordering_th_kernels_notop.h5"
elif IMAGE_ORDERING == 'channels_last':
    pretrained_url = "https://github.com/fchollet/deep-learning-models/" \
                     "releases/download/v0.1/" \
                     "vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5"


def get_vgg_encoder(input_height=224,  input_width=224, pretrained='imagenet', channels=3):

    assert input_height % 32 == 0
    assert input_width % 32 == 0

    if IMAGE_ORDERING == 'channels_first':
        img_input = Input(shape=(channels, input_height, input_width))
    elif IMAGE_ORDERING == 'channels_last':
        img_input = Input(shape=(input_height, input_width, channels))

    x = Conv2D(64, (3, 3), activation='relu', padding='same',
               name='block1_conv1', data_format=IMAGE_ORDERING)(img_input)
    x = Conv2D(64, (3, 3), activation='relu', padding='same',
               name='block1_conv2', data_format=IMAGE_ORDERING)(x)
    skip = x
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block1_pool',
                     data_format=IMAGE_ORDERING)(x)
    f1 = x
    # Block 2
    x = Conv2D(128, (3, 3), activation='relu', padding='same',
               name='block2_conv1', data_format=IMAGE_ORDERING)(x)
    x = Conv2D(128, (3, 3), activation='relu', padding='same',
               name='block2_conv2', data_format=IMAGE_ORDERING)(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block2_pool',
                     data_format=IMAGE_ORDERING)(x)
    f2 = x

    # Block 3
    x = Conv2D(256, (3, 3), activation='relu', padding='same',
               name='block3_conv1', data_format=IMAGE_ORDERING)(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same',
               name='block3_conv2', data_format=IMAGE_ORDERING)(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same',
               name='block3_conv3', data_format=IMAGE_ORDERING)(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block3_pool',
                     data_format=IMAGE_ORDERING)(x)
    f3 = x

    # Block 4
    x = Conv2D(512, (3, 3), activation='relu', padding='same',
               name='block4_conv1', data_format=IMAGE_ORDERING)(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same',
               name='block4_conv2', data_format=IMAGE_ORDERING)(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same',
               name='block4_conv3', data_format=IMAGE_ORDERING)(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block4_pool',
                     data_format=IMAGE_ORDERING)(x)
    f4 = x

    # Block 5
    x = Conv2D(512, (3, 3), activation='relu', padding='same',
               name='block5_conv1', data_format=IMAGE_ORDERING)(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same',
               name='block5_conv2', data_format=IMAGE_ORDERING)(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same',
               name='block5_conv3', data_format=IMAGE_ORDERING)(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block5_pool',
                     data_format=IMAGE_ORDERING)(x)
    f5 = x

    if pretrained == 'imagenet':
        VGG_Weights_path = tf.keras.utils.get_file(
            pretrained_url.split("/")[-1], pretrained_url)
        Model(img_input, x).load_weights(VGG_Weights_path, by_name=True, skip_mismatch=True)

    return img_input, [f1, f2, f3, f4, f5], skip

In [9]:
activation = 'relu'

def Conv2DBlock(input_tensor, filters, kernel_size, name_block):
    x = layers.Conv2D(filters, kernel_size, padding='same', name=name_block)(input_tensor)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(activation)(x)
    return x
    
def usegnet_vgg(n_classes, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS, pool_size=(2, 2)):

    img_input, [f1, f2, f3, f4, f5], skip = get_vgg_encoder(input_height=IMG_HEIGHT, input_width=IMG_WIDTH)
    
    #bottleneck
    x = Conv2DBlock(f3, 256, 3, "block4_conv1")
    x = Conv2DBlock(x, 256, 3, "block4_conv2")
    x = Conv2DBlock(x, 256, 3, "block4_conv3")
    
    #decoder
    unpool_1 = UpSampling2D((2, 2), name="block5_unpool")(x)
    x = Conv2DBlock(unpool_1, 256, 3, "block5_conv1")
    x = Conv2DBlock(x, 256, 3, "block5_conv2")
    x = Conv2DBlock(x, 128, 3, "block5_conv3")
    
    unpool_2 = UpSampling2D((2, 2), name="block6_unpool")(x)
    x = Conv2DBlock(unpool_2, 128, 3, "block6_conv1")
    x = Conv2DBlock(x, 64, 3, "block6_conv2")
    
    unpool_3 = UpSampling2D((2, 2), name="block7_unpool")(x)
    x = concatenate([unpool_3, skip], axis = -1, name="block7_concatenate")
    x = Conv2D(64, (1, 1), padding="same", kernel_initializer='he_normal', name="block7_conv1")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    
    x = Conv2DBlock(x, 64, 3, "block8_conv1")
    x = Conv2D(n_classes, (1, 1), padding="same", kernel_initializer='he_normal')(x)
    x = BatchNormalization()(x)
    outputs = Activation("softmax")(x)

    model = Model(inputs=[img_input], outputs=[outputs])

    return model


In [10]:
# Define the weighted categorical cross-entropy loss function
def weighted_categorical_crossentropy(weights):
    """
    A weighted version of keras.objectives.categorical_crossentropy
    
    Variables:
        weights: numpy array of shape (C,) where C is the number of classes
    
    Usage:
        weights = np.array([0.5,2,10]) # Class one at 0.5, class 2 twice the normal weights, class 3 10x.
        loss = weighted_categorical_crossentropy(weights)
        model.compile(loss=loss,optimizer='adam')
    """
    
    weights = tf.constant(weights, dtype=tf.float32)
        
    def loss(y_true, y_pred):
        # scale predictions so that the class probas of each sample sum to 1
        y_pred /= tf.reduce_sum(y_pred, axis=-1, keepdims=True)
        # clip to prevent NaN's and Inf's
        y_pred = tf.clip_by_value(y_pred, K.epsilon(), 1 - K.epsilon())
        # calc
        loss = y_true * tf.math.log(y_pred) * weights
        loss = -tf.reduce_sum(loss, -1)
        return loss
    
    return loss

In [11]:
IMG_HEIGHT = X.shape[1]
IMG_WIDTH  = X.shape[2]
IMG_CHANNELS = X.shape[3]
weights = np.array([1914880.0, 11.11, 9.98, 2.2, 1.14, 0.44, 0.25, 2.92])

In [12]:
model = usegnet_vgg(8, 64, 64, 3)
# model.summary()

In [13]:
from keras.optimizers import Adam
import segmentation_models as sm

optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss="categorical_crossentropy", metrics=['accuracy', sm.metrics.iou_score])

Segmentation Models: using `tf.keras` framework.


In [14]:
start_time = time.time()
history = model.fit(X, y_cat, 
                    batch_size = 64, 
                    verbose=1, 
                    epochs=50, 
                    validation_data=(X_val, y_val_cat),
                    shuffle=False)
end_time = time.time()
training_time = end_time - start_time
print(f"Training time: {training_time:.4f} seconds")

Epoch 1/50
[1m468/468[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1074s[0m 2s/step - accuracy: 0.5539 - iou_score: 0.1005 - loss: 1.4767 - val_accuracy: 0.6635 - val_iou_score: 0.1488 - val_loss: 1.0737
Epoch 2/50
[1m468/468[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1066s[0m 2s/step - accuracy: 0.6885 - iou_score: 0.1655 - loss: 0.9516 - val_accuracy: 0.6330 - val_iou_score: 0.1716 - val_loss: 1.0308
Epoch 3/50
[1m468/468[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1067s[0m 2s/step - accuracy: 0.7058 - iou_score: 0.1951 - loss: 0.8125 - val_accuracy: 0.7261 - val_iou_score: 0.2142 - val_loss: 0.7477
Epoch 4/50
[1m468/468[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1066s[0m 2s/step - accuracy: 0.7254 - iou_score: 0.2187 - loss: 0.7242 - val_accuracy: 0.4615 - val_iou_score: 0.1514 - val_loss: 1.3914
Epoch 5/50
[1m468/468[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1067s[0m 2s/step - accuracy: 0.7377 - iou_score: 0.2349 - loss: 0.6738 - val_accuracy: 0.7075 - v

In [21]:
X_test, y_test = sam_test_img, sam_test_mask

In [22]:
y_test_cat = to_categorical(y_test, num_classes=n_classes)

In [23]:
# Predict and evaluate
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=-1)
y_true_classes = y_test

[1m201/201[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 273ms/step


In [24]:
# Flatten the arrays
y_pred_flat = y_pred_classes.flatten()
y_true_flat = y_true_classes.flatten()

In [25]:
# Generate the classification report
from sklearn.metrics import classification_report
report = classification_report(y_true_flat, y_pred_flat, target_names=['Class 0', 
                                                                       'Class 1', 
                                                                       'Class 2', 
                                                                       'Class 3', 
                                                                       'Class 4', 
                                                                       'Class 5', 
                                                                       'Class 6', 
                                                                       'Class 8'])
print(report)

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


              precision    recall  f1-score   support

     Class 0       0.00      0.00      0.00         8
     Class 1       0.47      0.10      0.16    293759
     Class 2       0.37      0.64      0.47    324962
     Class 3       0.48      0.48      0.48   1487533
     Class 4       0.60      0.61      0.60   2862958
     Class 5       0.66      0.78      0.72   7413470
     Class 6       0.95      0.82      0.88  12772966
     Class 8       0.52      0.63      0.57   1120184

    accuracy                           0.75  26275840
   macro avg       0.51      0.51      0.49  26275840
weighted avg       0.77      0.75      0.76  26275840



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [26]:
#IOU

n_classes = 8
IOU_keras = MeanIoU(num_classes=n_classes)  
IOU_keras.update_state(y_test, y_pred_classes)
print("Mean IoU =", IOU_keras.result().numpy())

Mean IoU = 0.36115667
