# Semantic Traffic Image Segmentation

We'll build a semantic segmentation model for traffic images using a U-Net. Semantic segmentation predicts a precise mask for the objects in an image by labeling every pixel with its corresponding class.

<center><img src="images/carseg.png" width="45%" height="45%"></center>
<caption><center><b>Figure 1</b>: Example of a segmented image</center></caption>
<br>

Region-specific labeling is crucial for self-driving cars, which need a pixel-level understanding of their environment so they can change lanes and avoid other cars or any of the traffic obstacles that can put people's lives in danger.

## Packages

In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf

import imageio.v2 as imageio

import matplotlib.pyplot as plt
%matplotlib inline

from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dropout 
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import concatenate

<a name='1'></a>
## 1 - Dataset

<a name='1.1'></a>
### 1.1 - Loading the dataset

In [None]:
path = ''
image_path = os.path.join(path, './data/CameraRGB/')
mask_path = os.path.join(path, './data/CameraMask/')

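# Sort the file names so the image/mask order is reproducible across runs
image_list_orig = sorted(os.listdir(image_path))
# Masks share their file names with the corresponding images
image_list = [os.path.join(image_path, i) for i in image_list_orig]
mask_list = [os.path.join(mask_path, i) for i in image_list_orig]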

In [None]:
# Visualize unmasked and masked images from the dataset
N = 2
img = imageio.imread(image_list[N])
mask = imageio.imread(mask_list[N])

fig, axes = plt.subplots(1, 2, figsize=(9, 6))

axes[0].imshow(img)
axes[0].set_title('Image')
axes[0].set_xticks([])
axes[0].set_yticks([])

axes[1].imshow(mask[:, :, 0])
axes[1].set_title('Segmentation')
axes[1].set_xticks([])
axes[1].set_yticks([])

In [None]:
# List image and mask files in the same, unshuffled order so the pairs line up
image_list_ds = tf.data.Dataset.list_files(image_list, shuffle=False)
mask_list_ds = tf.data.Dataset.list_files(mask_list, shuffle=False)

for path_pair in zip(image_list_ds.take(1), mask_list_ds.take(1)):
    print(path_pair)

In [None]:
# Create the image path dataset
image_filenames = tf.constant(image_list)
masks_filenames = tf.constant(mask_list)

dataset = tf.data.Dataset.from_tensor_slices((image_filenames, masks_filenames))

for image, mask in dataset.take(1):
    print(image)
    print(mask)

<a name='1-2'></a>
### 1.2 - Data treatment

First we decode the PNG files into 3-channel images, so that we can normalize them and resize them all to a standard shape.
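To make the two transformations concrete, here is a small NumPy sketch of what the normalization and the mask channel reduction do to the pixel values. The arrays are hypothetical stand-ins for decoded PNGs, and the assumption that the class ID sits in the first mask channel is inferred from the visualization above rather than stated by the dataset.

```python
import numpy as np

# Hypothetical 2x2 RGB image with uint8 pixels (stand-in for a decoded PNG).
rgb = np.array([[[0, 128, 255], [255, 255, 255]],
                [[64, 64, 64], [0, 0, 0]]], dtype=np.uint8)

# Scale uint8 values to floats in [0, 1], as tf.image.convert_image_dtype
# does when converting uint8 to float32.
rgb_float = rgb.astype(np.float32) / 255.0

# Hypothetical mask: class IDs in the first channel, zeros elsewhere (assumption).
mask_rgb = np.zeros((2, 2, 3), dtype=np.uint8)
mask_rgb[..., 0] = [[7, 7], [10, 0]]

# Collapse the channel axis with a max, keeping a trailing axis of size 1.
mask = mask_rgb.max(axis=-1, keepdims=True)

print(rgb_float.min(), rgb_float.max())
print(mask.shape)
```

The mask stays an integer array because its values are class IDs, not intensities, so it should not be rescaled.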

In [None]:
def process_path(image_path, mask_path):
    # Read and decode the image, then scale uint8 pixels to float32 in [0, 1]
    img = tf.io.read_file(image_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)

    # Read the mask and collapse its 3 channels into a single channel of class IDs
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=3)
    mask = tf.math.reduce_max(mask, axis=-1, keepdims=True)

    return img, mask


def preprocess(image, mask):
    # Nearest-neighbor resizing keeps mask values as valid class IDs (no interpolation)
    input_image = tf.image.resize(image, (96, 128), method='nearest')
    input_mask = tf.image.resize(mask, (96, 128), method='nearest')

    return input_image, input_mask


image_ds = dataset.map(process_path)
processed_image_ds = image_ds.map(preprocess)

<a name='2'></a>
## 2 - UNet Model

U-Net uses a matching number of convolutions for downsampling the input image and transposed convolutions for upsampling back to the original input image size. It also adds skip connections to retain information that would otherwise be lost during encoding.

<a name='2-1'></a>
### 2.1 - Model architecture

<center><img src="images/unet.png" width="60%" height="60%"></center>
<caption><center><b>Figure 2</b>: U-Net Architecture</center></caption>

<a name='2-2'></a>
### 2.2 - Downsampling block

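The contracting path follows a regular CNN architecture: the repeated application of two 3 x 3 convolutions, each followed by a rectified linear unit (ReLU), and then a 2 x 2 max pooling operation with stride 2 for downsampling. At each downsampling step, the number of feature channels is doubled. The original U-Net uses unpadded (valid) convolutions; the implementation below uses `same` padding instead, so the convolutions preserve height and width and the skip connections need no cropping.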
<br><br>

<center><img src="images/encoder.png" width="50%" height="50%"></center>
<caption><center><b>Figure 3</b>: The U-Net Encoder<br></center></caption>
<br>

The function will return two tensors: 
- `next_layer`: That will go into the next block. 
- `skip_connection`: That will go into the corresponding decoding block.

Notice that the skip connection is taken from the output of the last Conv2D (or Dropout) layer, not from the MaxPooling layer.
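As a rough check on the halving and doubling described above, the following sketch traces the shape of each block's skip output. `encoder_shapes` is a hypothetical helper, not part of the notebook; it assumes `same` padding and borrows the 96 x 128 input, 64 base filters, and five blocks from the model built later in this section.

```python
def encoder_shapes(height, width, n_filters, depth=5):
    """Trace (height, width, channels) of each block's skip output ('same' padding)."""
    shapes = []
    for level in range(depth):
        # The number of filters doubles at every level.
        channels = n_filters * 2 ** level
        shapes.append((height, width, channels))
        # 2x2 max pooling with stride 2 halves both spatial dimensions;
        # the last block (the bottleneck) is not pooled.
        if level < depth - 1:
            height, width = height // 2, width // 2
    return shapes


for shape in encoder_shapes(96, 128, 64):
    print(shape)
# (96, 128, 64)
# (48, 64, 128)
# (24, 32, 256)
# (12, 16, 512)
# (6, 8, 1024)
```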

In [None]:
def conv_block(inputs=None, n_filters=64, dropout_prob=0, max_pooling=True):
    """
    Convolutional downsampling block
    
    Arguments:
        inputs -- Input tensor
        n_filters -- Number of filters for the convolutional layers
        dropout_prob -- Dropout probability
        max_pooling -- Use MaxPooling2D to reduce the spatial dimensions of the output volume
        
    Returns: 
        next_layer, skip_connection --  Next layer and skip connection outputs
    """
    
    conv = Conv2D(n_filters,
                  3,   
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(inputs)
    
    conv = Conv2D(n_filters,
                  3,
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(conv)

    if dropout_prob > 0:
        conv = Dropout(dropout_prob)(conv)

    if max_pooling:
        next_layer = MaxPooling2D(strides=(2,2))(conv)
        
    else:
        next_layer = conv
        
    skip_connection = conv
    
    return next_layer, skip_connection

<a name='2-3'></a>
### 2.3 - Upsampling block

The expanding path performs the opposite operation of the contracting path, growing the feature maps back to the original image size while gradually reducing the number of channels. Each step first upsamples its input with a transposed convolution of stride 2, which doubles the height and width and halves the number of feature channels. The result is then concatenated with the corresponding feature map from the contracting path and passed through two 3 x 3 convolutions, each followed by a ReLU. The original U-Net uses a 2 x 2 up-convolution and crops the skip feature map before concatenation; this implementation uses a 3 x 3 kernel with `same` padding, so no cropping is needed.

<center><img src="images/decoder.png" width="60%" height="60%"></center>
<caption><center><b>Figure 4</b>: The U-Net Decoder</center></caption>
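A rough shape trace of one decoder step, following the order used in `upsampling_block` in this section (transposed convolution, then concatenation, then two convolutions). `upsampling_shapes` is a hypothetical helper for illustration only; it assumes stride 2 and `same` padding throughout.

```python
def upsampling_shapes(expansive, skip, n_filters):
    """Trace (height, width, channels) through one decoder step."""
    height, width, _ = expansive
    # A stride-2 transposed convolution with 'same' padding doubles height and width.
    up = (height * 2, width * 2, n_filters)
    # Concatenation along the channel axis needs matching height and width.
    if up[:2] != skip[:2]:
        raise ValueError(f"spatial mismatch: {up[:2]} vs {skip[:2]}")
    merged = (up[0], up[1], up[2] + skip[2])
    # The two 3x3 'same' convolutions keep height and width and output n_filters channels.
    out = (up[0], up[1], n_filters)
    return up, merged, out


# Bottleneck output (6, 8, 1024) merged with the skip connection of the fourth encoder block.
up, merged, out = upsampling_shapes((6, 8, 1024), (12, 16, 512), 512)
print(up, merged, out)
# (12, 16, 512) (12, 16, 1024) (12, 16, 512)
```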

In [None]:
def upsampling_block(expansive_input, contractive_input, n_filters=64):
    """
    Convolutional upsampling block
    
    Arguments:
        expansive_input -- Input tensor from previous layer
        contractive_input -- Input tensor from previous skip layer
        n_filters -- Number of filters for the convolutional layers
        
    Returns: 
        conv -- Tensor output
    """

    up = Conv2DTranspose(n_filters,    
                         3,    
                         strides=(2, 2),
                         padding='same')(expansive_input)
    
    merge = concatenate([up, contractive_input], axis=3)
    
    conv = Conv2D(n_filters,   
                  3,     
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(merge)
    
    conv = Conv2D(n_filters,  
                  3,   
                  activation='relu',
                  padding='same',
                  kernel_initializer='he_normal')(conv)
    
    return conv

<a name='2-4'></a>
### 2.4 - Building the model

In the final layer, a 1 x 1 convolution maps each 64-component feature vector to the desired number of classes. In this self-driving car dataset, there are 23 possible labels for each pixel.
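One way to picture the 1 x 1 convolution is as the same dense layer applied independently to every pixel. The NumPy sketch below illustrates this with random, hypothetical weights and features; it is not the trained layer.

```python
import numpy as np

rng = np.random.default_rng(0)
features = rng.normal(size=(96, 128, 64))  # hypothetical feature map (H, W, C)
weights = rng.normal(size=(64, 23))        # 1x1 kernel: 64 inputs -> 23 classes
bias = np.zeros(23)

# A 1x1 convolution multiplies every pixel's feature vector by the same matrix.
logits = features @ weights + bias

print(logits.shape)  # (96, 128, 23)
```

Each pixel ends up with 23 scores, one per class, and the spatial layout is unchanged.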

In [None]:
def unet_model(input_size=(96, 128, 3), n_filters=64, n_classes=23):
    """
    Unet model
    
    Arguments:
        input_size -- Input shape 
        n_filters -- Number of filters for the convolutional layers
        n_classes -- Number of output classes
    Returns: 
        model -- tf.keras.Model
    """
    
    inputs = Input(input_size)
    
    # Contracting Path (encoding)
    cblock1 = conv_block(inputs, n_filters)
    cblock2 = conv_block(cblock1[0], 2*n_filters)
    cblock3 = conv_block(cblock2[0], 4*n_filters)
    cblock4 = conv_block(cblock3[0], 8*n_filters, dropout_prob=0.3) 
    cblock5 = conv_block(cblock4[0], 16*n_filters, dropout_prob=0.3, max_pooling=False) 
    
    # Expanding Path (decoding)
    ublock6 = upsampling_block(cblock5[0], cblock4[1], 8*n_filters)
    ublock7 = upsampling_block(ublock6, cblock3[1], 4*n_filters)
    ublock8 = upsampling_block(ublock7, cblock2[1], 2*n_filters)
    ublock9 = upsampling_block(ublock8, cblock1[1], n_filters)

    conv9 = Conv2D(n_filters,
                   3,
                   activation='relu',
                   padding='same',
                   kernel_initializer='he_normal')(ublock9)
    conv10 = Conv2D(n_classes, 1, padding='same')(conv9)
    
    model = tf.keras.Model(inputs=inputs, outputs=conv10)

    return model

<a name='2-5'></a>
### 2.5 - Model dimensions

In [None]:
img_height = 96
img_width = 128
num_channels = 3

unet = unet_model((img_height, img_width, num_channels))

In [None]:
# Check model summary
unet.summary()

<a name='2-6'></a>
### 2.6 - Loss function

In this dataset, each pixel label in the mask is an integer from 0 to 22 that identifies the class it represents. Categorical crossentropy expects one-hot encoded labels (just 0s and 1s), so we use sparse categorical crossentropy as the loss function instead: it takes the integer labels directly and performs the same pixel-wise multiclass comparison. Because the final layer of the model has no softmax activation, the loss is created with `from_logits=True`.
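The NumPy sketch below, using random and purely hypothetical logits and labels, shows that the sparse form (integer labels) and the dense form (one-hot labels) give the same value. It illustrates the math only and is not the Keras implementation.

```python
import numpy as np


def softmax(logits):
    # Subtract the per-pixel maximum for numerical stability.
    shifted = logits - logits.max(axis=-1, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=-1, keepdims=True)


rng = np.random.default_rng(0)
n_classes = 23
logits = rng.normal(size=(4, 4, n_classes))       # hypothetical 4x4 logit map
labels = rng.integers(0, n_classes, size=(4, 4))  # one integer class ID per pixel

probs = softmax(logits)

# Sparse form: look up the probability of the true class by index.
rows, cols = np.indices(labels.shape)
sparse_loss = -np.log(probs[rows, cols, labels]).mean()

# Dense form: one-hot encode the labels, then sum over the class axis.
one_hot = np.eye(n_classes)[labels]
dense_loss = -(one_hot * np.log(probs)).sum(axis=-1).mean()

print(np.isclose(sparse_loss, dense_loss))  # True
```

The sparse form avoids materializing a 23-channel one-hot mask for every image.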

In [None]:
unet.compile(optimizer='adam',
            loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            metrics=['accuracy'])

<a name='2.7'></a>
### 2.7 - Train the model

In [None]:
epochs = 100
batch_size = 32
buffer_size = processed_image_ds.cardinality()

train_dataset = processed_image_ds.cache().shuffle(buffer_size).batch(batch_size)
print(processed_image_ds.element_spec)

model_history = unet.fit(train_dataset, epochs=epochs)

<a name='2.8'></a>
### 2.8 - Plot loss and accuracy

In [None]:
# Use the History object returned by unet.fit above
df_loss_acc = pd.DataFrame(model_history.history)

# Plotting loss
plt.figure(figsize=(5,4))
plt.plot(df_loss_acc['loss'])
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.xlabel('Epochs', fontsize=12)
plt.ylabel('Loss', fontsize=12)
plt.title('Training loss')

# Plotting accuracy
plt.figure(figsize=(5,4))
plt.plot(df_loss_acc['accuracy'])
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.xlabel('Epochs', fontsize=12)
plt.ylabel('Accuracy', fontsize=12)
plt.title('Training accuracy')

<a name='3'></a>
## 3 - Results

<a name='3-1'></a>
### 3.1 - Dataset handling

Helper function that displays the input image, the true mask, and the predicted mask side by side.

In [None]:
def display(display_list):
    n_images = len(display_list)
    plt.figure(figsize=(5 * n_images, 5 * n_images))

    title = ['Input Image', 'True Mask', 'Predicted Mask']
    for i in range(n_images):
        plt.subplot(1, n_images, i + 1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
        plt.axis('off')
    plt.show()

In [None]:
for image, mask in processed_image_ds.take(1):
    sample_image, sample_mask = image, mask
    print(mask.shape)
    
display([sample_image, sample_mask])

<a name='3-2'></a>
### 3.2 - Create predicted masks 

Function that returns, for each pixel, the index of the class with the largest predicted value.
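As a small illustration of that reduction, here is a NumPy sketch on a hypothetical batch containing one 2 x 2 image with 3 classes. The values are made up for the example.

```python
import numpy as np

# Hypothetical logits with shape (batch, height, width, classes) = (1, 2, 2, 3).
logits = np.array([[[[0.1, 2.0, -1.0], [3.0, 0.0, 0.5]],
                    [[-0.2, -0.1, 4.0], [1.0, 1.5, 0.2]]]])

# Index of the largest score along the class axis: one label per pixel.
labels = np.argmax(logits, axis=-1)

# Add a trailing channel axis back so the mask can be displayed as an image.
mask = labels[..., np.newaxis]

print(labels[0].tolist())  # [[1, 0], [2, 1]]
print(mask.shape)          # (1, 2, 2, 1)
```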

In [None]:
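def create_mask(pred_mask):
    # Pick the most likely class for every pixel: (batch, H, W, classes) -> (batch, H, W)
    pred_mask = tf.argmax(pred_mask, axis=-1)
    # Add a channel axis back so the mask can be displayed as an image
    pred_mask = pred_mask[..., tf.newaxis]

    # Return only the first mask in the batch
    return pred_mask[0]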

<a name='3-3'></a>
### 3.3 - Show predictions 

In [None]:
def show_predictions(dataset, num):
    """
    Displays the first image, true mask, and predicted mask of each of the first `num` batches
    """
    
    for image, mask in dataset.take(num):
        pred_mask = unet.predict(image)
        display([image[0], mask[0], create_mask(pred_mask)])

In [None]:
show_predictions(train_dataset, 4)