In [None]:
import os
import tensorflow as tf

In [None]:
# Fetch the DIODE validation archive into the current working directory and
# extract it, unless the extracted data is already present.
# The guard looks for the "val" folder because that is where the rest of this
# notebook reads the data from ("val/indoors"). The folder checked previously
# ("/dataset/") is not something this notebook ever creates or reads, so the
# guard could never skip the download/extract step.
annotation_folder = "/val/"
if not os.path.exists(os.path.abspath(".") + annotation_folder):
    annotation_zip = tf.keras.utils.get_file(
        "val.tar.gz",
        cache_subdir=os.path.abspath("."),
        origin="http://diode-dataset.s3.amazonaws.com/val.tar.gz",
        extract=True,
    )

In [None]:
import pandas as pd
import numpy as np
import cv2
import matplotlib.pyplot as plt

In [None]:
# Directory that is scanned (recursively) for samples.
path = 'val/indoors'

# Every file below `path`, as a sorted list of joined paths. Sorting gives the
# per-suffix lists built below a consistent, name-based order.
filelist = sorted(
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk(path)
    for name in names
)

# Column name -> filename suffix selecting the files that belong in that column.
suffix_by_column = {
    "image": '.png',
    "depth": '_depth.npy',
    "mask": '_depth_mask.npy',
}
data = {
    column: [candidate for candidate in filelist if candidate.endswith(suffix)]
    for column, suffix in suffix_by_column.items()
}

# One row per sample, shuffled once with a fixed seed so the order is reproducible.
df = pd.DataFrame(data).sample(frac=1, random_state=42)

In [None]:
# Training configuration shared by the cells below.
HEIGHT, WIDTH = 256, 256  # size every image / depth map is resized to
LR = 2e-4                 # learning rate handed to the optimizer
EPOCHS = 150              # number of passes over the training loader
BATCH_SIZE = 64           # samples per batch for the train / validation loaders

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding batches of (RGB image, log-depth map) arrays.

    Each row of ``data`` names an image file, a depth ``.npy`` file and a
    validity-mask ``.npy`` file through its "image", "depth" and "mask" columns.

    Args:
        data: DataFrame with "image", "depth" and "mask" path columns.
        batch_size: Number of samples per batch; the final batch of an epoch
            may be smaller.
        dim: Target size of every sample, given as (height, width).
        n_channels: Stored on the instance; not otherwise used by this class.
        shuffle: Whether the sample order is reshuffled on ``on_epoch_end``.
    """

    def __init__(self, data, batch_size=6, dim=(768, 1024), n_channels=3, shuffle=True):
        super().__init__()
        self.data = data
        # Row labels of `data`; `self.index` (built in on_epoch_end) holds
        # positions into this list, in the order they are served.
        self.indices = self.data.index.tolist()
        self.dim = dim
        self.n_channels = n_channels
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.min_depth = 0.1
        self.on_epoch_end()

    def __len__(self):
        """Number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.data) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(x, y)`` pair of arrays.

        Raises:
            IndexError: if ``index`` is outside ``range(len(self))``.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"batch index {index} out of range for {len(self)} batches")

        # Slicing past the end simply yields a shorter final batch, so
        # `self.batch_size` never has to be modified here (shrinking it would
        # silently affect every later batch and epoch).
        start = index * self.batch_size
        positions = self.index[start:start + self.batch_size]

        # Go through the (possibly shuffled) positions to the row labels.
        batch = [self.indices[k] for k in positions]
        return self.data_generation(batch)

    def on_epoch_end(self):
        """Rebuild the serving order, reshuffling it when ``shuffle`` is set."""
        self.index = np.arange(len(self.indices))
        if self.shuffle:
            np.random.shuffle(self.index)

    def load(self, image_path, depth_map, mask):
        """Read one sample from disk and return ``(image, depth_map)``.

        Args:
            image_path: Path of the image file read with ``cv2.imread``.
            depth_map: Path of the ``.npy`` depth file.
            mask: Path of the ``.npy`` validity mask; values > 0 mark valid pixels.

        Returns:
            The RGB image and the log-depth map (with a trailing channel axis),
            both resized to ``self.dim`` and converted to float32 tensors.

        Raises:
            FileNotFoundError: if the image cannot be read.
        """
        image_ = cv2.imread(image_path)
        if image_ is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")

        # cv2.resize takes its target size as (width, height), whereas
        # self.dim is (height, width).
        dsize = (self.dim[1], self.dim[0])

        image_ = cv2.cvtColor(image_, cv2.COLOR_BGR2RGB)
        image_ = cv2.resize(image_, dsize)
        image_ = tf.image.convert_image_dtype(image_, tf.float32)

        depth_map = np.load(depth_map).squeeze()

        mask = np.load(mask)
        mask = mask > 0

        # Cap the depth at its 99th percentile (and at most 300) before the log.
        max_depth = min(300, np.percentile(depth_map, 99))
        depth_map = np.clip(depth_map, self.min_depth, max_depth)
        # With `where=` and no `out=`, NumPy leaves the unselected entries
        # uninitialised; a zero-filled output keeps masked pixels deterministic.
        depth_map = np.log(depth_map, out=np.zeros_like(depth_map), where=mask)

        depth_map = np.ma.masked_where(~mask, depth_map)

        depth_map = np.clip(depth_map, 0.1, np.log(max_depth))
        depth_map = cv2.resize(depth_map, dsize)
        depth_map = np.expand_dims(depth_map, axis=2)
        depth_map = tf.image.convert_image_dtype(depth_map, tf.float32)
        return image_, depth_map

    def data_generation(self, batch):
        """Load every row label in ``batch`` and stack the results into ``(x, y)``."""
        x = []
        y = []
        for batch_id in batch:
            row = self.data.loc[batch_id]
            # Load each sample once and keep both outputs.
            image_, depth_map = self.load(row["image"], row["depth"], row["mask"])
            x.append(image_)
            y.append(depth_map)
        x = np.array(x)
        y = np.array(y)
        return x, y

In [None]:
def visualize_depth_map(samples, test=False, model=None):
    """Plot up to six samples of a batch as a grid of images.

    Each row shows the input image and its target depth map; with
    ``test=True`` a third column shows ``model.predict`` on the inputs.

    Args:
        samples: ``(input_img, target)`` pair of batched arrays.
        test: When true, add a column with the model's predictions.
        model: Object with a ``predict`` method; required when ``test`` is true.

    Raises:
        ValueError: if ``test`` is true but no ``model`` is given.
    """
    import copy  # only needed here, to clone the colormap

    input_img, target = samples
    if test and model is None:
        raise ValueError("`model` must be provided when test=True")

    # Customise a private copy: calling set_bad on plt.cm.jet itself would
    # alter the shared colormap for every other plot in the session.
    cmap = copy.copy(plt.cm.jet)
    cmap.set_bad(color='black')

    # One (images, colormap) pair per grid column; None keeps imshow's default.
    columns = [(input_img, None), (target, cmap)]
    if test:
        columns.append((model.predict(input_img), cmap))

    # Never index past the end of a batch that holds fewer than six samples.
    n_rows = min(6, len(input_img))
    if n_rows == 0:
        return

    # squeeze=False keeps `ax` two-dimensional even for a single row.
    fig, ax = plt.subplots(n_rows, len(columns), figsize=(50, 50), squeeze=False)
    for i in range(n_rows):
        for j, (images, column_cmap) in enumerate(columns):
            ax[i, j].imshow(np.squeeze(images[i]), cmap=column_cmap)

# Take the first batch of six samples from the full frame and plot it
# (inputs next to their target depth maps).
sample_loader = DataGenerator(data=df, batch_size=6, dim=(HEIGHT, WIDTH))
visualize_samples = next(iter(sample_loader))
visualize_depth_map(visualize_samples)

In [None]:
from tensorflow.keras import layers

In [None]:
class Encoder(layers.Layer):
    """Encoder stage: two conv / batch-norm / LeakyReLU steps, a skip-add, then 2x2 max pooling.

    Args:
        filters: Number of filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        padding: Padding mode of both convolutions.
        strides: Stride of both convolutions.
    """

    def __init__(self, filters, kernel_size=(3,3), padding='same', strides=1):
        super().__init__()
        conv_options = dict(kernel_size=kernel_size, strides=strides, padding=padding)
        self.convA = layers.Conv2D(filters, **conv_options)
        self.convB = layers.Conv2D(filters, **conv_options)
        self.reluA = layers.LeakyReLU(alpha=.2)
        self.reluB = layers.LeakyReLU(alpha=.2)
        self.bn2a = layers.BatchNormalization()
        self.bn2b = layers.BatchNormalization()
        self.pool = layers.MaxPool2D(pool_size=(2, 2), strides=(2, 2))

    def call(self, input_tensor):
        """Return ``(features, pooled)``: the stage output before and after pooling."""
        # The raw output of the first convolution doubles as the skip branch.
        shortcut = self.convA(input_tensor)
        features = self.reluA(self.bn2a(shortcut))
        features = self.reluB(self.bn2b(self.convB(features)))

        features = features + shortcut
        return features, self.pool(features)

In [None]:
class Decoder(layers.Layer):
    """Decoder stage: 2x up-sampling, concatenation with a skip tensor, then two conv / batch-norm / LeakyReLU steps.

    Args:
        filters: Number of filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        padding: Padding mode of both convolutions.
        strides: Stride of both convolutions.
    """

    def __init__(self, filters, kernel_size=(3,3), padding='same', strides=1):
        super().__init__()
        conv_options = dict(kernel_size=kernel_size, strides=strides, padding=padding)
        self.us = layers.UpSampling2D(size=(2, 2))
        self.convA = layers.Conv2D(filters, **conv_options)
        self.convB = layers.Conv2D(filters, **conv_options)
        self.reluA = layers.LeakyReLU(alpha=.2)
        self.reluB = layers.LeakyReLU(alpha=.2)
        self.bn2a = layers.BatchNormalization()
        self.bn2b = layers.BatchNormalization()
        self.conc = layers.Concatenate()

    def call(self, x, skip):
        """Up-sample ``x``, merge it with ``skip`` and return the refined features."""
        merged = self.conc([self.us(x), skip])
        features = self.reluA(self.bn2a(self.convA(merged)))
        return self.reluB(self.bn2b(self.convB(features)))

In [None]:
class BottleNeck(layers.Layer):
    """Bottleneck stage: two conv / LeakyReLU steps followed by dropout (rate 0.4).

    Args:
        filters: Number of filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        padding: Padding mode of both convolutions.
        strides: Stride of both convolutions.
    """

    def __init__(self, filters, kernel_size=(3,3), padding='same', strides=1):
        super().__init__()
        conv_options = dict(kernel_size=kernel_size, strides=strides, padding=padding)
        self.convA = layers.Conv2D(filters, **conv_options)
        self.convB = layers.Conv2D(filters, **conv_options)
        self.reluA = layers.LeakyReLU(alpha=.2)
        self.reluB = layers.LeakyReLU(alpha=.2)
        self.drop = layers.Dropout(rate=.4)

    def call(self, x):
        """Apply conv -> LeakyReLU twice, then dropout, and return the result."""
        features = self.reluA(self.convA(x))
        features = self.reluB(self.convB(features))
        return self.drop(features)

In [None]:
class Model(tf.keras.Model):
    """Encoder / bottleneck / decoder network with a custom weighted loss.

    Four ``Encoder`` stages feed a ``BottleNeck``; four ``Decoder`` stages
    consume the encoder outputs in reverse order as skip inputs, and a 1x1
    tanh convolution produces the single-channel output. ``train_step`` and
    ``test_step`` are overridden to optimise / report ``calculate_loss``.
    """

    def __init__(self):
        super().__init__()
        # Weights of the three terms combined in calculate_loss.
        self.ssim_loss_weight = .9
        self.l1_loss_weight = .07
        self.edge_loss_weight = .9
        # Running mean of the loss, exposed through `metrics`.
        self.loss_metric = tf.keras.metrics.Mean(name='loss')
        # Filter counts: the first four for the encoder/decoder stages, the last for the bottleneck.
        self.f = [16, 32, 64, 128, 256]
        self.encoder_block = [Encoder(width) for width in self.f[:4]]

        self.bottle_neck = BottleNeck(self.f[4])

        # Decoder stages mirror the encoder stages, widest first.
        self.decoder_block = [Decoder(width) for width in reversed(self.f[:4])]
        self.conv_layer = layers.Conv2D(1, (1,1), padding='same', activation='tanh')

    def calculate_loss(self, target, pred):
        """Return the weighted sum of the SSIM, L1 and gradient-smoothness terms."""
        grad_y_true, grad_x_true = tf.image.image_gradients(target)
        grad_y_pred, grad_x_pred = tf.image.image_gradients(pred)

        # Scalar weights derived from the mean absolute gradient of the target.
        weights_x = tf.exp(tf.reduce_mean(tf.abs(grad_x_true)))
        weights_y = tf.exp(tf.reduce_mean(tf.abs(grad_y_true)))

        depth_smoothness_loss = (
            tf.reduce_mean(tf.abs(grad_x_pred * weights_x))
            + tf.reduce_mean(tf.abs(grad_y_pred * weights_y))
        )

        # NOTE(review): max_val is the image WIDTH constant here, while
        # tf.image.ssim describes max_val as the dynamic range of the images —
        # confirm this (and the squared k1 / k2 values) is intended.
        ssim_loss = tf.reduce_mean(
            1 - tf.image.ssim(target, pred, max_val=WIDTH, filter_size=7, k1=.01**2, k2=.03**2)
        )

        l1_loss = tf.reduce_mean(tf.abs(target - pred))

        return (
            (self.ssim_loss_weight * ssim_loss) +
            (self.l1_loss_weight * l1_loss) +
            (self.edge_loss_weight * depth_smoothness_loss)
        )

    @property
    def metrics(self):
        """Metrics tracked by this model: only the running mean of the loss."""
        return [self.loss_metric]

    def train_step(self, batch_data):
        """Run one optimisation step on ``(input, target)`` and report the mean loss."""
        inputs, target = batch_data
        with tf.GradientTape() as tape:
            loss = self.calculate_loss(target, self(inputs, training=True))

        variables = self.trainable_variables
        self.optimizer.apply_gradients(zip(tape.gradient(loss, variables), variables))
        self.loss_metric.update_state(loss)
        return {'loss': self.loss_metric.result()}

    def test_step(self, batch_data):
        """Evaluate the loss on ``(input, target)`` without updating any weights."""
        inputs, target = batch_data
        loss = self.calculate_loss(target, self(inputs, training=False))
        self.loss_metric.update_state(loss)
        return {'loss': self.loss_metric.result()}

    def call(self, x):
        """Forward pass: encoders, bottleneck, decoders with skips, output convolution."""
        skips = []
        for encoder in self.encoder_block:
            # Each encoder returns (pre-pool features, pooled features).
            skip, x = encoder(x)
            skips.append(skip)

        x = self.bottle_neck(x)

        # The deepest skip is consumed first.
        for decoder, skip in zip(self.decoder_block, reversed(skips)):
            x = decoder(x, skip)

        return self.conv_layer(x)

In [None]:
# Build and train the model: the first `split_at` rows of the shuffled frame
# are used for training, the remaining rows for validation.
optimizer = tf.keras.optimizers.Adam(LR, amsgrad=False, clipnorm=1)

model = Model()

# No `loss=` is passed to compile: Model.train_step / Model.test_step compute
# the loss themselves through Model.calculate_loss, so a compiled loss (the
# sparse categorical cross-entropy configured here before) is never consulted.
model.compile(optimizer)

split_at = 260
train_loader = DataGenerator(
    data=df[:split_at].reset_index(drop=True), batch_size=BATCH_SIZE, dim=(HEIGHT, WIDTH))

validation_loader = DataGenerator(
    data=df[split_at:].reset_index(drop=True), batch_size=BATCH_SIZE, dim=(HEIGHT, WIDTH))

model.fit(train_loader, epochs=EPOCHS, validation_data=validation_loader)

In [None]:
# `model.history` is the History callback object left behind by `fit`; the
# per-epoch values live in its `.history` dict, so that dict is what must be
# indexed (subscripting the callback object itself raises a TypeError).
training_log = model.history.history

plt.plot(training_log['loss'])
plt.plot(training_log['val_loss'])
plt.title('Model loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

In [None]:
# For two tail slices of the frame, draw the first batch of six samples with
# the input, the target and the model's prediction side by side.
for first_row in (265, 300):
    tail_loader = DataGenerator(
        data=df[first_row:].reset_index(drop='true'), batch_size=6, dim=(HEIGHT, WIDTH)
    )
    test_loader = next(iter(tail_loader))
    visualize_depth_map(test_loader, test=True, model=model)

In [None]:
# Persist the model under the 'depth_estimate' path.
model.save(filepath='depth_estimate')

In [None]:
# Read the model back from the 'depth_estimate' path and print its summary.
new_model = tf.keras.models.load_model(filepath='depth_estimate')
new_model.summary()

In [None]:
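# Example inference with the reloaded model (`image` is not defined in this
# notebook; presumably a preprocessed input batch — confirm before enabling):
# new_model.predict(image)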