# **Lightweight Monocular Depth Estimation on Edge Devices**

This paper was recently published on the IEEE Xplore platform and proposes a novel lightweight method for depth estimation on edge devices. The authors propose a new architecture derived from MobileNet, reduce its latency using pruning methods, and finally optimize it further through GPU overhead scheduling.

# **Setup and Imports**

In [None]:
import os 
import sys 

import tensorflow as tf
from tensorflow.nn import relu6 
from tensorflow.keras import layers
from tensorflow.keras import Model 
from tensorflow.keras import optimizers as OPT
from tensorflow.keras import losses
from tensorflow.keras.utils import Sequence 

import pandas as pd 
import numpy as np
import cv2 
import matplotlib.pyplot as plt

 
tf.random.set_seed(123)

# **Loading the Dataset**

The datasets have already been downloaded and annotated on an external hard disk. There is also a .csv file that holds the path to each RGB image and its corresponding depth map.

In [None]:
## Root folder that the relative paths stored in the CSV are later prefixed with.
path_root = '/content/drive/MyDrive/Colab Notebooks/'
## NOTE: the CSV itself is read from the current working directory, not from path_root.
## Column names are supplied explicitly: path of the RGB image and path of its depth map.
df = pd.read_csv("nyu2_test.csv", names = ["image", "depth"])
## Last expression of the cell, so the notebook displays the dataframe.
df

Unnamed: 0,image,depth
0,data/nyu2_test/00000_colors.png,data/nyu2_test/00000_depth.png
1,data/nyu2_test/00001_colors.png,data/nyu2_test/00001_depth.png
2,data/nyu2_test/00008_colors.png,data/nyu2_test/00008_depth.png
3,data/nyu2_test/00013_colors.png,data/nyu2_test/00013_depth.png
4,data/nyu2_test/00014_colors.png,data/nyu2_test/00014_depth.png
...,...,...
649,data/nyu2_test/01444_colors.png,data/nyu2_test/01444_depth.png
650,data/nyu2_test/01445_colors.png,data/nyu2_test/01445_depth.png
651,data/nyu2_test/01446_colors.png,data/nyu2_test/01446_depth.png
652,data/nyu2_test/01447_colors.png,data/nyu2_test/01447_depth.png


In [None]:
## Turn the relative paths of both columns into absolute ones by prefixing
## the dataset root folder to every entry.
for column in ("image", "depth") :
    df[column] = df[column].apply(lambda rel_path : path_root + str(rel_path))
df

Unnamed: 0,image,depth
0,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
1,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
2,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
3,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
4,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
...,...,...
649,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
650,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
651,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...
652,/content/drive/MyDrive/Colab Notebooks/data/ny...,/content/drive/MyDrive/Colab Notebooks/data/ny...


# **Building a Data Pipeline**

1. The Data pipeline takes a dataframe containing the path to the RGB image and the corresponding depth map 
2. It reads and resizes the RGB images. 
3. It reads the depth map image and resizes it.
4. It then finally returns the RGB images and their corresponding depth map one batch at a time.

In [None]:
class DataPipeline(Sequence) :
    """
    Keras Sequence that serves batches of RGB images and their depth maps.

    Every row of `data` holds the path of an RGB image (column "image") and
    the path of the matching depth map (column "depth"). A batch is a pair
    `(x, y)` of numpy arrays shaped `(n, dim[0], dim[1], n_channels)` and
    `(n, dim[0], dim[1], 1)`, where `n` is `batch_size` except for a
    possibly shorter final batch.
    """

    def __init__(self, data, batch_size, dim = (256, 256), n_channels = 3, shuffle = True) :
        """
        Initialisation

        data       : dataframe with the columns "image" and "depth"
        batch_size : number of samples per batch (must be positive)
        dim        : (height, width) every image and depth map is resized to
        n_channels : number of channels of the input image
        shuffle    : whether the sample order is reshuffled after every epoch
        """

        super().__init__()

        if batch_size <= 0 :
            raise ValueError("batch_size must be a positive integer")

        self.data = data ## dataframe containing paths to images
        self.indices = self.data.index.tolist() ## Row labels of the dataframe
        self.dim = dim ## (height, width) of each image
        self.n_channels = n_channels ## Number of Channels for input image
        self.batch_size = batch_size ## Number of images in each batch
        self.shuffle = shuffle ## Boolean to indicate Shuffling
        ## Builds self.index, the (optionally shuffled) order in which the
        ## rows are visited. Keras calls the same hook after every epoch.
        self.on_epoch_end()

    def __len__(self) :
        """
        Returns number of batches in the sequence
        """

        return int(np.ceil(len(self.data) / self.batch_size))

    def __getitem__(self, index) :
        """
        Returns batch number `index` as a pair (images, depth maps).

        Raises IndexError when `index` is outside `range(len(self))`.
        """

        if index < 0 or index >= len(self) :
            raise IndexError("batch index out of range")

        ## The last batch may be shorter than batch_size. The slice is simply
        ## clipped; self.batch_size is never modified, so later batches and
        ## later epochs keep using the configured size.
        start = index * self.batch_size
        stop = min(start + self.batch_size, len(self.indices))

        ## self.index holds positions in the (shuffled) visiting order,
        ## self.indices maps each position to the dataframe row label.
        batch = [self.indices[k] for k in self.index[start : stop]]

        return self.Data_Generation(batch)

    def on_epoch_end(self) :
        """
        Rebuilds the visiting order of the samples, shuffling it if requested
        """

        self.index = np.arange(len(self.indices))
        if self.shuffle :
            np.random.shuffle(self.index)

    def load(self, img_path, depth_path) :
        """
        Loads one RGB image and its depth map.

        Both are resized to `self.dim` and converted to float32 with
        `tf.image.convert_image_dtype`. The depth map is returned with a
        trailing channel axis of size 1.

        Raises OSError when either file cannot be read by OpenCV.
        """

        ## cv2.resize expects (width, height) whereas self.dim is (height, width).
        target_size = (self.dim[1], self.dim[0])

        image_ = cv2.imread(img_path)
        if image_ is None :
            raise OSError(f"Could not read RGB image: {img_path}")
        ## OpenCV reads colour images in BGR format by default
        ## So conversion is necessary
        image_ = cv2.cvtColor(image_, cv2.COLOR_BGR2RGB)
        image_ = cv2.resize(image_, target_size)
        image_ = tf.image.convert_image_dtype(image_, tf.float32)

        depth_ = cv2.imread(depth_path)
        if depth_ is None :
            raise OSError(f"Could not read depth map: {depth_path}")
        depth_ = cv2.cvtColor(depth_, cv2.COLOR_BGR2GRAY)
        ## Rescale the image with cv2.resize. ndarray.resize() would only
        ## truncate / pad the flat pixel buffer instead of interpolating.
        depth_ = cv2.resize(depth_, target_size)
        depth_ = depth_[..., np.newaxis]
        depth_ = tf.image.convert_image_dtype(depth_, tf.float32)

        return image_, depth_

    def Data_Generation(self, batch) :
        """
        Loads every sample whose row label is listed in `batch` and stacks
        them into one array of images and one array of depth maps.
        """

        ## Sized by the actual number of samples so that a short final
        ## batch does not carry uninitialised rows.
        x = np.empty((len(batch), *self.dim, self.n_channels))
        y = np.empty((len(batch), *self.dim, 1))

        for i, batch_id in enumerate(batch) :
            x[i], y[i] = self.load(
                self.data.loc[batch_id, "image"],
                self.data.loc[batch_id, "depth"],
            )

        return x, y
    


# **Declaring Hyperparameters**

In [None]:
## Size the images and depth maps are resized to; passed to DataPipeline as dim=(HEIGHT, WIDTH).
## NOTE(review): the decoder of DepthEstimation resizes to a hard-coded 224 x 224,
## so changing these presumably requires changing the decoder as well - confirm.
HEIGHT =  224
WIDTH = 224
## Learning rate handed to the Adam optimizer.
LR = 0.02
## Number of epochs handed to model.fit.
EPOCHS = 3
## Number of samples per batch for the training and validation DataPipeline.
BATCH_SIZE = 10

# **Building the Model**

1. Here, we define custom classes to implement the encoder, which is MobileNetV2 without the classifier. This requires subclassing tensorflow.keras.layers.Layer to build the residual and downscaling blocks.
2. We will also define a class to implement the UpSampConv layer declared in the paper. 

In [None]:
class ResidualMobileNet(layers.Layer) :
    """
    Stride-1 bottleneck block whose output is joined with its input.

    Pipeline: 1x1 convolution -> batch norm -> ReLU6 -> 3x3 depthwise
    convolution -> batch norm -> ReLU6 -> 1x1 linear convolution ->
    batch norm. The result is concatenated with the block input (not
    added to it), so the output carries the channels of both.
    """

    def __init__(self, units, padding = "same", **kwargs) :
        super().__init__(**kwargs)
        ## Pointwise expansion to `units` channels
        self.convStart = layers.Conv2D(
            units, kernel_size = (1, 1), strides = (1, 1), padding = padding
        )
        ## Spatial filtering, spatial size preserved (stride 1)
        self.DepthConv = layers.DepthwiseConv2D(
            kernel_size = (3, 3), strides = (1, 1), padding = padding
        )
        ## Pointwise projection without non-linearity
        self.convEnd = layers.Conv2D(
            units, kernel_size = (1, 1), strides = (1, 1), padding = padding, activation = 'linear'
        )
        self.conc = layers.Concatenate()
        ## One batch normalisation per convolution
        self.bn2start = layers.BatchNormalization()
        self.bn2depth = layers.BatchNormalization()
        self.bn2end = layers.BatchNormalization()

    def call(self, input_tensor) :
        """
        Runs the bottleneck on `input_tensor` and concatenates the result
        with `input_tensor` itself.
        """
        expanded = relu6(self.bn2start(self.convStart(input_tensor)))
        filtered = relu6(self.bn2depth(self.DepthConv(expanded)))
        projected = self.bn2end(self.convEnd(filtered))

        return self.conc([projected, input_tensor])

class DownScalingMobileNet(layers.Layer) :
    """
    Stride-2 bottleneck block that downscales its input.

    Pipeline: 1x1 convolution -> batch norm -> ReLU6 -> 3x3 depthwise
    convolution with stride 2 -> batch norm -> ReLU6 -> 1x1 linear
    convolution -> batch norm. There is no skip connection.

    units   : number of filters of both 1x1 convolutions
    padding : padding mode forwarded to every convolution
    """

    def __init__(self, units, padding = "same", **kwargs) :
        super().__init__(**kwargs)
        self.convStart = layers.Conv2D(units, (1, 1), (1, 1), padding = padding)
        ## The stride of (2, 2) is what performs the downscaling
        self.DepthConv = layers.DepthwiseConv2D((3, 3), (2, 2), padding = padding)
        self.convEnd = layers.Conv2D(units, (1, 1), (1, 1), padding = padding, activation = 'linear')
        ## One batch normalisation per convolution
        self.bn2start = layers.BatchNormalization()
        self.bn2depth = layers.BatchNormalization()
        self.bn2end = layers.BatchNormalization()

    def call(self, input_tensor) :
        """
        Returns the downscaled feature map computed from `input_tensor`.
        """
        x = self.convStart(input_tensor)
        x = self.bn2start(x)
        x = relu6(x)

        x = self.DepthConv(x)
        x = self.bn2depth(x)
        x = relu6(x)

        x = self.convEnd(x)
        ## The projection gets its own normalisation layer. Reusing bn2depth
        ## here would share one set of statistics between two different
        ## positions of the block and leave bn2end unused.
        x = self.bn2end(x)

        return x

class UpsampConv(layers.Layer) :
    """
    Multi-branch convolution block used by the decoder.

    The input goes through a 1x1 convolution and batch norm. That result
    feeds two further branches, a 3x3 convolution and a 5x5 depthwise
    convolution, each followed by its own batch norm. The three feature
    maps are concatenated.

    units   : number of filters of the 1x1 and 3x3 convolutions
    padding : padding mode forwarded to every convolution
    """

    def __init__(self, units, padding = "same", **kwargs) :
        super().__init__(**kwargs)
        self.convA = layers.Conv2D(units, (1, 1), (1, 1), padding = padding)
        self.convB = layers.Conv2D(units, (3, 3), (1, 1), padding = padding)
        self.convC = layers.DepthwiseConv2D((5, 5), (1, 1), padding = padding)
        self.bn2A = layers.BatchNormalization()
        self.bn2B = layers.BatchNormalization()
        self.bn2C = layers.BatchNormalization()
        self.conc = layers.Concatenate()

    def call(self, input_tensor) :
        """
        Returns the concatenation of the 1x1, 3x3 and 5x5 branch outputs.
        """
        ## Shared 1x1 branch
        x = self.convA(input_tensor)
        x = self.bn2A(x)

        ## 3x3 branch
        y = self.convB(x)
        y = self.bn2B(y)

        ## 5x5 depthwise branch. The normalisation must be applied to the
        ## convolution output `z`; normalising `x` instead would throw the
        ## depthwise convolution away.
        z = self.convC(x)
        z = self.bn2C(z)

        return self.conc([x, y, z])

class UpSamplingBlock(layers.Layer):
    """
    Decoder stage: UpsampConv, then a nearest-neighbour resize to
    `height` x `width`, then batch normalisation.
    """

    def __init__(self, units, height, width, padding = "same", **kwargs) :
        super().__init__(**kwargs)
        ## Convolutional part of the stage
        self.upsamp = UpsampConv(units, padding = padding)
        ## Resizes the feature map to the requested spatial size
        self.nearest = layers.Resizing(height, width, interpolation = "nearest")
        self.bn = layers.BatchNormalization()

    def call(self, input_tensor) :
        """
        Applies the three steps in order and returns the resized feature map.
        """
        features = self.upsamp(input_tensor)
        resized = self.nearest(features)
        return self.bn(resized)


# **Defining the Model and the Loss Functions**

In [None]:
class DepthEstimation(Model) :
    """
    Encoder-decoder model that predicts a one-channel depth map.

    The encoder stacks ResidualMobileNet and DownScalingMobileNet blocks,
    the decoder stacks UpSamplingBlock stages that resize up to a
    hard-coded 224 x 224 output. Training and evaluation use the custom
    loss of `calculate_loss` (SSIM + L1 + edge-weighted smoothness), so no
    loss has to be given to `compile`.
    """

    def __init__(self) :
        super().__init__()
        ## Weights of the three loss terms
        self.ssim_loss_weight = 0.85
        self.l1_loss_weight = 0.1
        self.edge_loss_weight = 0.9
        ## Dynamic range handed to tf.image.ssim. The data pipeline scales
        ## the depth maps to [0, 1], so the range is 1.0; it is unrelated
        ## to the image width.
        self.ssim_max_val = 1.0
        self.loss_metric = tf.keras.metrics.Mean(name="loss")

        ## Encoder
        self.convA = layers.Conv2D(32, (1, 1), (2, 2), padding = "same")
        self.Layer1 = ResidualMobileNet(16)
        self.Layer2 = DownScalingMobileNet(24)
        self.Layer3 = DownScalingMobileNet(32)
        self.Layer4 = DownScalingMobileNet(64)
        self.Layer5 = ResidualMobileNet(96)
        self.Layer6 = DownScalingMobileNet(160)
        self.Layer7 = ResidualMobileNet(320)
        self.convB = layers.Conv2D(1280, (1, 1), (1, 1))

        ## Decoder: each stage resizes to the given height and width
        self.Layer8 = UpSamplingBlock(640, 14, 14)
        self.Layer9 = UpSamplingBlock(320, 28, 28)
        self.Layer10 = UpSamplingBlock(160, 56, 56)
        self.Layer11 = UpSamplingBlock(80, 112, 112)
        self.Layer12 = UpSamplingBlock(40, 224, 224)
        ## Final 1x1 convolution down to a single depth channel
        self.convC = layers.Conv2D(1, (1, 1), (1, 1))


    def calculate_loss(self, target, pred) :
        """
        Returns the scalar training loss for a batch.

        target : ground-truth depth maps
        pred   : predicted depth maps, same shape as `target`

        The loss is the weighted sum of an SSIM term, a mean absolute
        error term and an edge-weighted smoothness term.
        """
        ## Both tensors must share a dtype for the arithmetic below.
        target = tf.cast(target, pred.dtype)

        # Edges
        dy_true, dx_true = tf.image.image_gradients(target)
        dy_pred, dx_pred = tf.image.image_gradients(pred)
        weights_x = tf.exp(tf.reduce_mean(tf.abs(dx_true)))
        weights_y = tf.exp(tf.reduce_mean(tf.abs(dy_true)))

        # Depth smoothness
        smoothness_x = dx_pred * weights_x
        smoothness_y = dy_pred * weights_y

        depth_smoothness_loss = tf.reduce_mean(tf.abs(smoothness_x)) + tf.reduce_mean(
            tf.abs(smoothness_y)
        )

        # Structural similarity (SSIM) index
        ssim_loss = tf.reduce_mean(
            1
            - tf.image.ssim(
                target,
                pred,
                max_val=self.ssim_max_val,
                filter_size=7,
                k1=0.01 ** 2,
                k2=0.03 ** 2,
            )
        )
        # Point-wise depth
        l1_loss = tf.reduce_mean(tf.abs(target - pred))

        loss = (
            (self.ssim_loss_weight * ssim_loss)
            + (self.l1_loss_weight * l1_loss)
            + (self.edge_loss_weight * depth_smoothness_loss)
        )

        return loss

    @property
    def metrics(self):
        """Metrics tracked by the model: only the running mean of the loss."""
        return [self.loss_metric]

    def train_step(self, batch_data):
        """
        Runs one optimisation step on a batch `(inputs, target)` and
        returns the running mean of the loss.
        """
        inputs, target = batch_data
        with tf.GradientTape() as tape:
            pred = self(inputs, training=True)
            loss = self.calculate_loss(target, pred)

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.loss_metric.update_state(loss)
        return {
            "loss": self.loss_metric.result(),
        }

    def test_step(self, batch_data):
        """
        Evaluates one batch `(inputs, target)` without updating the weights
        and returns the running mean of the loss.
        """
        inputs, target = batch_data

        pred = self(inputs, training=False)
        loss = self.calculate_loss(target, pred)

        self.loss_metric.update_state(loss)
        return {
            "loss": self.loss_metric.result(),
        }


    def call(self, input_tensor) :
        """
        Forward pass: encoder followed by decoder, returning the
        one-channel depth prediction.
        """

        ## Calling the Encoder Layers
        x = self.convA(input_tensor)
        x = self.Layer1(x)
        x = self.Layer2(x)
        x = self.Layer3(x)
        x = self.Layer4(x)
        x = self.Layer5(x)
        x = self.Layer6(x)
        x = self.Layer7(x)
        x = self.convB(x)

        ## Calling the Decoder Layers
        x = self.Layer8(x)
        x = self.Layer9(x)
        x = self.Layer10(x)
        x = self.Layer11(x)
        x = self.Layer12(x)
        x = self.convC(x)

        return x


In [None]:
optimizer = OPT.Adam(
    learning_rate=LR,
    amsgrad=False,
)
model = DepthEstimation()
# Compile the model.
# DepthEstimation overrides train_step / test_step and computes its own loss
# there, so no loss is passed here. A classification loss such as sparse
# categorical cross-entropy would never be used and does not fit a depth
# regression task.
model.compile(optimizer)
# The first 100 rows are used for training, the next 10 for validation.
# The index is reset so that every subset is labelled 0..n-1.
train_loader = DataPipeline(
     data = df[:100].reset_index(drop=True), batch_size=BATCH_SIZE, dim=(HEIGHT, WIDTH)
)
validation_loader = DataPipeline(
     data = df[100: 110].reset_index(drop=True), batch_size=BATCH_SIZE, dim=(HEIGHT, WIDTH)
)
model.fit(
    train_loader, epochs=EPOCHS, validation_data = validation_loader
)

Epoch 1/3
 1/10 [==>...........................] - ETA: 3:20 - loss: 1.2348

In [None]:
## Mounts Google Drive at /content/drive, the location path_root points into.
## NOTE(review): this cell comes after the cells that read files below
## /content/drive; it presumably has to be executed before them - confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
