## Develop an RGB-Depth fusion architecture for semantic segmentation based on a Fully Convolutional Network (FCN).


Importing all necessary libraries and modules

In [1]:
import os 
import numpy as np 
import cv2

from sklearn.preprocessing import OneHotEncoder 
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd 

import tensorflow as tf
from tensorflow import keras
from keras.regularizers import l2
from keras.optimizers import SGD
from keras.losses import categorical_crossentropy
from keras.preprocessing.image import ImageDataGenerator
from keras.applications import ResNet50
from keras.applications.resnet50 import preprocess_input
from keras.utils import Sequence
from keras.models import Model
from keras.layers import Input, Conv2D, Dropout, Concatenate, Conv2DTranspose, Reshape, Softmax


### Load the dataset and Ground-truth

- The dataset consists of 1100 images (per modality) of road scenes. It is divided into train (600 images), test (200 images) and validation (300 images) sets.
- Resize all images to 256×256.
- Convert the labels to one-hot encoding.
- Create a DataLoader for loading the files when training the model.

In [2]:
# Number of semantic classes: sets the depth of the one-hot label encoding and
# the number of output channels of the segmentation network.
num_classes = 19

In [3]:
class CustomDataLoader(Sequence):
    """Keras ``Sequence`` serving batches of RGB, depth and one-hot label arrays.

    Arrays are read with ``np.load`` from ``<data_dir>/<mode>/rgb``,
    ``<data_dir>/<mode>/depth`` and ``<data_dir>/<mode>/label``. The three
    directory listings are sorted and paired by position, so the i-th file of
    each directory is assumed to describe the same scene.

    Each item is ``([X_rgb, X_depth], y)`` where every sample has been resized
    to ``IMAGE_SIZE``:

    * ``X_rgb``   -- RGB batch divided by 255 (assumes 0-255 pixel values --
      TODO confirm against the dataset).
    * ``X_depth`` -- depth batch min-max normalised over the whole batch and
      repeated three times along a new last axis.
    * ``y``       -- labels one-hot encoded with the module-level
      ``num_classes``.

    Args:
        data_dir: Root directory of the dataset.
        batch_size: Maximum number of samples per batch.
        mode: Name of the split sub-directory (e.g. ``'train'``).

    Raises:
        ValueError: If the rgb, depth and label directories do not contain the
            same number of files (positional pairing would be misaligned).
    """

    # Target (width, height) passed to ``cv2.resize`` for every array.
    IMAGE_SIZE = (256, 256)

    def __init__(self, data_dir, batch_size=32, mode='train'):
        # Let the Keras base class initialise its own state.
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.mode = mode
        # Not used by this class (labels are encoded with ``to_categorical``);
        # kept so existing code that reads the attribute keeps working.
        self.onehot_encoder = OneHotEncoder()

        self.rgb_files = self._list_files('rgb')
        self.depth_files = self._list_files('depth')
        self.label_files = self._list_files('label')

        counts = (len(self.rgb_files), len(self.depth_files), len(self.label_files))
        if len(set(counts)) != 1:
            raise ValueError(
                f"Mismatched file counts in '{mode}' split "
                f"(rgb, depth, label) = {counts}"
            )

    def _list_files(self, kind):
        """Return the sorted file names of one modality directory."""
        return sorted(os.listdir(os.path.join(self.data_dir, self.mode, kind)))

    def _load_resized(self, kind, filename, interpolation=cv2.INTER_LINEAR):
        """Load one array of the given modality and resize it to ``IMAGE_SIZE``."""
        array = np.load(os.path.join(self.data_dir, self.mode, kind, filename))
        return cv2.resize(array, self.IMAGE_SIZE, interpolation=interpolation)

    def __len__(self):
        """Return the number of batches, counting a final partial batch."""
        return int(np.ceil(len(self.rgb_files) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Return batch ``idx`` as ``([X_rgb, X_depth], y)``."""
        batch = slice(idx * self.batch_size, (idx + 1) * self.batch_size)

        X_rgb, X_depth, y = self.__generate_data(
            self.rgb_files[batch], self.depth_files[batch], self.label_files[batch]
        )

        return [X_rgb, X_depth], y

    def __generate_data(self, batch_rgb_files, batch_depth_files, batch_label_files):
        """Load, resize and preprocess the arrays for one batch of file names."""
        X_rgb = np.array([self._load_resized('rgb', name) for name in batch_rgb_files])
        X_rgb = X_rgb / 255.0

        X_depth = np.array([self._load_resized('depth', name) for name in batch_depth_files])
        X_depth = self.normalize_depth(X_depth)
        # Repeat the depth map on a new last axis so it has three channels.
        X_depth_rgb = np.stack((X_depth,) * 3, axis=-1)

        # Labels are class ids: nearest-neighbour resizing keeps every pixel a
        # valid id, whereas the default bilinear interpolation would blend
        # neighbouring ids into unrelated classes along object boundaries.
        y = np.array([
            self._load_resized('label', name, interpolation=cv2.INTER_NEAREST)
            for name in batch_label_files
        ])
        y_one_hot = keras.utils.to_categorical(y, num_classes=num_classes)

        return X_rgb, X_depth_rgb, y_one_hot

    def normalize_depth(self, depth):
        """Min-max scale ``depth`` to [0, 1] using its global minimum and maximum.

        A constant input (maximum equals minimum) yields an all-zero array
        instead of dividing by zero.
        """
        depth_min = np.min(depth)
        depth_range = np.max(depth) - depth_min
        if depth_range == 0:
            return np.zeros(np.shape(depth), dtype=np.float32)
        return (depth - depth_min) / depth_range

    def visualize_examples(self, num_examples=5):
        """Plot ``num_examples`` randomly chosen (RGB, depth, label) triplets.

        The arrays are shown as stored on disk, without resizing or
        normalisation.
        """
        # squeeze=False keeps ``axes`` two-dimensional even for one example.
        fig, axes = plt.subplots(num_examples, 3, figsize=(9, 9), squeeze=False)
        titles = ('RGB', 'Depth', 'Label')
        for row in axes:
            idx = np.random.randint(len(self.rgb_files))
            images = (
                np.load(os.path.join(self.data_dir, self.mode, 'rgb', self.rgb_files[idx])),
                np.load(os.path.join(self.data_dir, self.mode, 'depth', self.depth_files[idx])),
                np.load(os.path.join(self.data_dir, self.mode, 'label', self.label_files[idx])),
            )
            for ax, image, title in zip(row, images, titles):
                ax.imshow(image)
                ax.set_title(title)
                ax.axis('off')

        plt.tight_layout()
        plt.show()

- Visualize the prepared data

In [None]:
# Build a loader over the training split and show four random
# RGB / depth / label triplets as a quick sanity check of the data.
loader = CustomDataLoader(
    data_dir='../Datasets/road_scenes',
    batch_size=32,
    mode='train',
)
loader.visualize_examples(num_examples=4)

 Here, we define the Fully Convolutional Network (FCN) for image segmentation by fusing RGB and depth images. The network consists of two streams, each stream having the following layers:

    1. A ResNet50 pretrained on ImageNet as the backbone.
    2. We add two Conv layers with 128 and 256 filters, respectively. Kernel size (3,3), stride (1,1).
    3. On top of the Conv layers, we add a dropout layer with rate 0.2.
    4. We then concatenate the two streams.
    5. Then, we add a transposed convolution layer (Conv2DTranspose) with kernel size (64,64) and stride (32,32).
    6. Finally, we add a softmax activation layer.
    

In [5]:
def build_rgb_model(input_shape, for_fusion=True):
    """Build the RGB stream: ResNet50 backbone plus a small convolutional head.

    Args:
        input_shape: Shape of one RGB input, passed to both ``Input`` and
            ``ResNet50``.
        for_fusion: If True (default), return only the stream's feature
            tensor. If False, return a complete single-modality segmentation
            ``Model`` (previously this branch returned ``None``).

    Returns:
        The feature tensor after dropout when ``for_fusion`` is True,
        otherwise a ``Model`` mapping the RGB input to per-pixel class
        probabilities over the module-level ``num_classes``.
    """
    rgb_input = Input(shape=input_shape)

    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    rgb_features = base_model(rgb_input)

    x = Conv2D(128, (3, 3), strides=(1, 1), padding='same', activation='relu')(rgb_features)
    x = Conv2D(256, (3, 3), strides=(1, 1), padding='same', activation='relu')(x)
    x = Dropout(0.2)(x)

    if for_fusion:
        return x

    # Stand-alone variant: same upsampling + softmax head as the fusion
    # network in create_fcn, applied to this single stream.
    upsampled = Conv2DTranspose(
        num_classes, kernel_size=(64, 64), strides=(32, 32), padding='same'
    )(x)
    output = Softmax()(upsampled)
    return Model(inputs=rgb_input, outputs=output)

In [6]:
def build_depth_model(input_shape, for_fusion=True):
    """Build the depth stream: ResNet50 backbone plus a small convolutional head.

    Args:
        input_shape: Shape of one depth input, passed to both ``Input`` and
            ``ResNet50``.
        for_fusion: Accepted for signature parity with ``build_rgb_model``;
            this function does not read it.

    Returns:
        The feature tensor produced after the dropout layer.
    """
    depth_input = Input(shape=input_shape)

    # ImageNet-pretrained backbone without its classification top.
    backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    depth_features = backbone(depth_input)

    head = Conv2D(128, (3, 3), strides=(1, 1), padding='same', activation='relu')(depth_features)
    head = Conv2D(256, (3, 3), strides=(1, 1), padding='same', activation='relu')(head)

    return Dropout(0.2)(head)

In [19]:
def _build_named_backbone(input_shape, suffix):
    """Create an ImageNet ResNet50 whose model and layer names carry ``suffix``."""
    backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone._name = f'pretrained_{suffix}_model'
    # Two ResNet50 instances end up in one functional model, so their layers
    # are renamed to keep the names of the two streams distinct.
    for layer in backbone.layers:
        layer._name = f'{layer.name}_{suffix}'
    return backbone


def _stream_head(features):
    """Apply the 128- and 256-filter 3x3 convolutions followed by dropout."""
    x = Conv2D(128, (3, 3), activation='relu', padding='same')(features)
    x = Conv2D(256, (3, 3), activation='relu', padding='same')(x)
    return Dropout(0.2)(x)


def create_fcn(input_shape, trainable_layers):
    """Build the two-stream RGB-depth fusion FCN.

    Each stream is a ResNet50 backbone followed by two 3x3 convolutions and
    dropout. The stream features are concatenated on the channel axis,
    upsampled by a stride-32 transposed convolution to ``num_classes``
    channels, reshaped to ``(input_shape[0], input_shape[1], num_classes)``
    and passed through a softmax.

    Args:
        input_shape: Shape of one input; used for both streams.
        trainable_layers: NOTE(review): accepted but not used by this
            function -- no backbone layer is frozen here.

    Returns:
        A ``Model`` taking ``[rgb_input, depth_input]`` and returning
        per-pixel class probabilities.

    Raises:
        ValueError: If a known spatial dimension is not a multiple of 32. The
            backbone downsamples by 32 and the transposed convolution
            upsamples by 32, so other sizes cannot be reshaped back to the
            input resolution.
    """
    bad_dims = [dim for dim in input_shape[:2] if dim is not None and dim % 32 != 0]
    if bad_dims:
        raise ValueError(
            f"input_shape spatial dimensions must be multiples of 32, got {tuple(input_shape[:2])}"
        )

    # Build both backbones before the heads so layer creation order matches
    # the stream order rgb, depth.
    base_model_rgb = _build_named_backbone(input_shape, 'rgb')
    base_model_depth = _build_named_backbone(input_shape, 'depth')

    rgb_stream = _stream_head(base_model_rgb.output)
    depth_stream = _stream_head(base_model_depth.output)

    concat_features = Concatenate(axis=-1)([rgb_stream, depth_stream])

    transposed_conv = Conv2DTranspose(
        num_classes, kernel_size=(64, 64), strides=(32, 32), padding='same'
    )(concat_features)

    reshaped_output = Reshape((input_shape[0], input_shape[1], num_classes))(transposed_conv)

    output = Softmax()(reshaped_output)

    # The backbones' own input tensors are the model inputs.
    return Model(inputs=[base_model_rgb.input, base_model_depth.input], outputs=output)

Compiling and training the model

In [None]:
# Instantiate the fusion network for 256x256 three-channel inputs.
model = create_fcn(
    input_shape=(256, 256, 3),
    trainable_layers=[],
)

# Text summary of the layers, followed by a graph plot annotated with shapes.
model.summary()

keras.utils.plot_model(model, show_shapes=True)


In [None]:
# NOTE(review): `lr_schedule` is created here but never passed to the
# optimizer below, so it currently has no effect on training.
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.008, decay_steps=10000, decay_rate=1e-6
)

# Legacy SGD with momentum; the learning rate and decay are set directly on
# the optimizer.
sgd = tf.keras.optimizers.legacy.SGD(
    learning_rate=0.008,
    momentum=0.9,
    decay=1e-6,
)
model.compile(
    loss=categorical_crossentropy,
    optimizer=sgd,
    metrics=['accuracy'],
)

In [None]:
# One loader per split, created in the order train, validation.
train_loader, validation_loader = (
    CustomDataLoader(data_dir='../Datasets/road_scenes', batch_size=32, mode=split)
    for split in ('train', 'validation')
)

# Train for 10 epochs, evaluating on the validation split after each epoch.
history = model.fit(
    train_loader,
    validation_data=validation_loader,
    epochs=10,
)

 We evaluate the trained model on the test dataset. The results are shown as follows:

- Loss and accuracy of model for test dataset.

- Prediction of semantically segmented images on 5 random examples of the test dataset.

- Visualization of the 5 random examples alongside the ground truth and prediction.

In [None]:
# Loader over the held-out test split.
test_loader = CustomDataLoader(
    data_dir='../Datasets/road_scenes',
    batch_size=32,
    mode='test',
)

# `evaluate` returns the loss followed by the compiled metric (accuracy).
test_loss, test_accuracy = model.evaluate(test_loader)

print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

In [None]:
# Show RGB, depth, ground truth and predicted mask for the first test files
# ('0.npy', '1.npy', ...).
num_examples = 5

fig, axes = plt.subplots(num_examples, 4, figsize=(12, 12))
for i in range(num_examples):
    rgb = cv2.resize(np.load(os.path.join('../Datasets/road_scenes', 'test', 'rgb', f'{i}.npy')), (256, 256))
    depth = cv2.resize(np.load(os.path.join('../Datasets/road_scenes', 'test', 'depth', f'{i}.npy')), (256, 256))

    # Feed the network the same preprocessing CustomDataLoader applies during
    # training: RGB scaled by 1/255 and depth min-max normalised to [0, 1]
    # (per image here, since a single sample is predicted at a time).
    # Without this the model receives inputs on a different scale than it
    # was trained on.
    rgb_scaled = rgb / 255.0
    depth_min = np.min(depth)
    depth_span = np.max(depth) - depth_min
    if depth_span == 0:
        depth_scaled = np.zeros(depth.shape, dtype=np.float32)
    else:
        depth_scaled = (depth - depth_min) / depth_span
    depth_rgb = np.stack((depth_scaled,) * 3, axis=-1)

    prediction = model.predict([np.expand_dims(rgb_scaled, axis=0), np.expand_dims(depth_rgb, axis=0)])

    # Most probable class per pixel; shape keeps the leading batch axis.
    predicted_mask = np.argmax(prediction, axis=-1)

    axes[i, 0].imshow(rgb)
    axes[i, 0].set_title('RGB')
    axes[i, 0].axis('off')

    axes[i, 1].imshow(depth)
    axes[i, 1].set_title('Depth')
    axes[i, 1].axis('off')

    label_path = os.path.join('../Datasets/road_scenes', 'test', 'label', f'{i}.npy')
    if os.path.exists(label_path):
        # Nearest-neighbour resizing keeps every pixel a valid class id.
        label = cv2.resize(np.load(label_path), (256, 256), interpolation=cv2.INTER_NEAREST)
        # Same colour scale as the prediction so the two panels are comparable.
        axes[i, 2].imshow(label, cmap='jet', vmin=0, vmax=num_classes - 1)
        axes[i, 2].set_title('Ground Truth Label')
    else:
        axes[i, 2].set_title('Ground Truth Label (Not Available)')
    axes[i, 2].axis('off')

    axes[i, 3].imshow(predicted_mask[0], cmap='jet', vmin=0, vmax=num_classes - 1)
    axes[i, 3].set_title('Predicted Mask')
    axes[i, 3].axis('off')

plt.tight_layout()
plt.show()

### Extra fun experiment :

Implement FCNs for each single modality and compare their accuracy with that of the fusion model, i.e. compare the performance of the fused model to the single-modality models.

Enjoy!!!