<a href="https://colab.research.google.com/github/suarim/samsung_prism_camera_motion_estimation/blob/main/monodepth.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so later cells can read files stored
# under that path.
drive.mount('/content/drive')

In [None]:

import tensorflow as tf

from tensorflow.keras.layers import Conv2D, BatchNormalization, ZeroPadding2D, ReLU, MaxPooling2D


class ReflectionPadding2D(tf.keras.layers.Layer):
    """Pad axes 1 and 2 of a 4-D tensor by reflecting its border values.

    ``padding`` is read as ``(w_pad, h_pad)``: ``call`` applies ``h_pad`` to
    axis 1 and ``w_pad`` to axis 2, on both sides, using ``tf.pad`` in
    ``'REFLECT'`` mode. Axes 0 and 3 are never padded.
    """

    def __init__(self, padding=(1, 1), **kwargs):
        # Initialise the base Layer first; attributes assigned afterwards
        # cannot be overwritten by the base initialiser.
        super(ReflectionPadding2D, self).__init__(**kwargs)
        self.padding = tuple(padding)
        self.input_spec = [tf.keras.layers.InputSpec(ndim=4)]

    def compute_output_shape(self, s):
        """Return the shape ``call`` produces for an input of shape ``s``.

        Uses the same axis assignment as ``call`` (``h_pad`` on axis 1,
        ``w_pad`` on axis 2). Dimensions that are ``None`` stay ``None``.
        """
        w_pad, h_pad = self.padding

        def grow(dim, pad):
            # Unknown dimensions cannot be incremented; keep them unknown.
            return None if dim is None else dim + 2 * pad

        return (s[0], grow(s[1], h_pad), grow(s[2], w_pad), s[3])

    def call(self, x, mask=None):
        """Return ``x`` reflect-padded on axes 1 and 2; ``mask`` is unused."""
        w_pad, h_pad = self.padding
        return tf.pad(x, [[0, 0], [h_pad, h_pad], [w_pad, w_pad], [0, 0]], 'REFLECT')

    def get_config(self):
        """Return the layer config, including ``padding``.

        Storing ``padding`` lets a layer rebuilt from this config keep a
        non-default padding instead of silently falling back to ``(1, 1)``.
        """
        config = super().get_config().copy()
        config['padding'] = self.padding
        return config


def res_block(inputs, layer, downsample=False):
    """Build one residual block (two 3x3 convolutions plus shortcut).

    Args:
        inputs: Input tensor; its last dimension gives the base filter count.
        layer: Pair ``(a, b)`` used to form the layer-name prefix
            ``'en.layer<a>.<b>.'``.
        downsample: When true, the filter count is doubled, the first
            convolution uses stride 2, and the shortcut passes through a
            stride-2 1x1 convolution followed by batch normalisation.

    Returns:
        ReLU applied to the sum of the main branch and the shortcut.
    """
    prefix = f'en.layer{layer[0]}.{layer[1]}.'
    n_filters = inputs.shape[-1] * (2 if downsample else 1)
    first_stride = 2 if downsample else 1

    # Main branch: pad -> conv -> BN -> ReLU -> pad -> conv -> BN.
    branch = ZeroPadding2D(1)(inputs)
    branch = Conv2D(n_filters, 3, activation='linear', use_bias=False,
                    strides=first_stride, name=prefix + 'conv1')(branch)
    branch = BatchNormalization(momentum=0.9, epsilon=1e-5, name=prefix + 'bn1')(branch)
    branch = ReLU()(branch)
    branch = ZeroPadding2D(1)(branch)
    branch = Conv2D(n_filters, 3, activation='linear', use_bias=False,
                    name=prefix + 'conv2')(branch)
    branch = BatchNormalization(momentum=0.9, epsilon=1e-5, name=prefix + 'bn2')(branch)

    # Shortcut: identity, or a strided 1x1 projection when downsampling.
    shortcut = inputs
    if downsample:
        proj_prefix = prefix + 'downsample.'
        shortcut = Conv2D(n_filters, 1, activation='linear', use_bias=False,
                          strides=2, name=proj_prefix + '0')(inputs)
        shortcut = BatchNormalization(momentum=0.9, epsilon=1e-5,
                                      name=proj_prefix + '1')(shortcut)

    return ReLU()(branch + shortcut)


def conv_block(size, inTensor, disp=False, cnt=''):
    """Apply reflection padding, a 3x3 convolution and an activation.

    The convolution is named ``'de.<kind>.<level>.<cnt>'`` where ``<kind>``
    is ``'dispconv'`` when ``disp`` is true and ``'upconv'`` otherwise, and
    ``<level>`` is the number of binary digits of ``size`` minus 5.

    Args:
        size: Filter count for the non-``disp`` case; also sets ``<level>``.
        inTensor: Input tensor.
        disp: When true, use a single filter and a sigmoid activation;
            otherwise use ``size`` filters and ELU.
        cnt: String appended as the last component of the layer name.

    Returns:
        The activated convolution output.
    """
    kind = 'dispconv' if disp else 'upconv'
    level = len(format(size, 'b')) - 5
    layer_name = '.'.join(('de', kind, str(level), cnt))

    padded = ReflectionPadding2D()(inTensor)
    conv_out = tf.keras.layers.Conv2D(1 if disp else size, 3, name=layer_name)(padded)

    if disp:
        return tf.keras.activations.sigmoid(conv_out)
    return tf.keras.layers.ELU()(conv_out)


def up_conv(size, firstTensor, secondTensor=None):
    """Run one decoder stage: conv block, upsample, optional concat, conv block.

    Args:
        size: Filter count passed to both ``conv_block`` calls.
        firstTensor: Tensor fed to the first conv block.
        secondTensor: Tensor concatenated on the last axis after upsampling;
            only used when ``size`` is greater than 16.

    Returns:
        Output of the second conv block.
    """
    stage = conv_block(size, firstTensor, cnt='0')
    stage = tf.keras.layers.UpSampling2D()(stage)
    # The size-16 stage has no tensor to concatenate with.
    if size > 16:
        stage = tf.keras.layers.concatenate([stage, secondTensor], axis=-1)
    return conv_block(size, stage, cnt='1')

In [None]:
import os
import hashlib
import zipfile
import urllib.request

def download_model_if_doesnt_exist(model_name):
    """Download and unzip a pretrained KITTI model unless it is already present.

    The model counts as present when ``models/<model_name>/encoder.pth``
    exists. Otherwise ``models/<model_name>.zip`` is downloaded (unless a file
    with the expected MD5 is already there), verified, and extracted into
    ``models/<model_name>``.

    Args:
        model_name: Key into the table of known models below.

    Raises:
        KeyError: The model is not on disk and ``model_name`` is not a key of
            the download table.
        SystemExit: The archive on disk does not match the expected MD5
            checksum after the download step (exit status 1).
    """
    # values are tuples of (<google cloud URL>, <md5 checksum>)
    download_paths = {
        "mono_640x192":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/mono_640x192.zip",
             "a964b8356e08a02d009609d9e3928f7c"),
        "stereo_640x192":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/stereo_640x192.zip",
             "3dfb76bcff0786e4ec07ac00f658dd07"),
        "mono+stereo_640x192":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/mono%2Bstereo_640x192.zip",
             "c024d69012485ed05d7eaa9617a96b81"),
        "mono_no_pt_640x192":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/mono_no_pt_640x192.zip",
             "9c2f071e35027c895a4728358ffc913a"),
        "stereo_no_pt_640x192":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/stereo_no_pt_640x192.zip",
             "41ec2de112905f85541ac33a854742d1"),
        "mono+stereo_no_pt_640x192":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/mono%2Bstereo_no_pt_640x192.zip",
             "46c3b824f541d143a45c37df65fbab0a"),
        "mono_1024x320":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/mono_1024x320.zip",
             "0ab0766efdfeea89a0d9ea8ba90e1e63"),
        "stereo_1024x320":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/stereo_1024x320.zip",
             "afc2f2126d70cf3fdf26b550898b501a"),
        "mono+stereo_1024x320":
            ("https://storage.googleapis.com/niantic-lon-static/research/monodepth2/mono%2Bstereo_1024x320.zip",
             "cdc5fc9b23513c07d5b19235d9ef08f7"),
        }

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs("models", exist_ok=True)

    model_path = os.path.join("models", model_name)
    zip_path = model_path + ".zip"

    def check_file_matches_md5(checksum, fpath):
        """Return True when ``fpath`` exists and its MD5 hex digest equals ``checksum``."""
        if not os.path.exists(fpath):
            return False
        digest = hashlib.md5()
        with open(fpath, 'rb') as f:
            # Hash in 1 MiB chunks so the whole archive is never held in memory.
            for chunk in iter(lambda: f.read(1 << 20), b''):
                digest.update(chunk)
        return digest.hexdigest() == checksum

    # see if we have the model already downloaded...
    if os.path.exists(os.path.join(model_path, "encoder.pth")):
        return

    model_url, required_md5checksum = download_paths[model_name]

    if not check_file_matches_md5(required_md5checksum, zip_path):
        print("-> Downloading pretrained model to {}".format(zip_path))
        urllib.request.urlretrieve(model_url, zip_path)

    if not check_file_matches_md5(required_md5checksum, zip_path):
        print("   Failed to download a file which matches the checksum - quitting")
        # quit() is a convenience injected by the `site` module and exits with
        # status 0; raise SystemExit directly with a failing status instead.
        raise SystemExit(1)

    print("   Unzipping model...")
    with zipfile.ZipFile(zip_path, 'r') as archive:
        archive.extractall(model_path)

    print("   Model unzipped to {}".format(model_path))

In [None]:
# Name of the pretrained checkpoint to use; it selects both the download and
# the directory under "models" that holds the unpacked files.
model_name = "mono_640x192"

# Fetch and unpack the checkpoint unless it is already on disk.
download_model_if_doesnt_exist(model_name)

# Paths of the encoder and depth-decoder weight files inside that directory.
encoder_path, depth_decoder_path = (
    os.path.join("models", model_name, weights_file)
    for weights_file in ("encoder.pth", "depth.pth")
)

In [None]:
import tensorflow as tf
import torch


inputs = tf.keras.Input(shape=(192, 640, 3))

# ---- Encoder ----
# Stem: fixed input normalisation, 7x7 stride-2 convolution, BN, ReLU.
x = (inputs - 0.45) / 0.225
x = ZeroPadding2D(3)(x)
x = Conv2D(64, 7, strides=2, activation='linear', use_bias=False, name='conv1')(x)
x = BatchNormalization(momentum=0.9, epsilon=1e-5, name='bn1')(x)
x = ReLU()(x)

# encoder[k] holds the feature map reused by the decoder as a skip input.
encoder = [x]
x = ZeroPadding2D(1)(x)
x = MaxPooling2D(3, 2)(x)

# Four stages of two residual blocks each; every stage after the first
# downsamples in its first block.
for stage in range(1, 5):
    for block_idx in (0, 1):
        x = res_block(x, (stage, block_idx), block_idx == 0 and stage > 1)
    encoder.append(x)

# ---- Decoder ----
# The first stage has no disparity head; each later stage is followed by one.
outputs = []
x = up_conv(256, encoder[4], encoder[3])
for size, skip in ((128, encoder[2]), (64, encoder[1]), (32, encoder[0]), (16, None)):
    x = up_conv(size, x, skip)
    outputs.append(conv_block(size, x, disp=True))

# Reverse so the head created last (size 16) comes first in the model outputs.
outputs = outputs[::-1]
model = tf.keras.Model(inputs=inputs, outputs=outputs, name='depth')


encoder_path = 'models/mono_640x192/encoder.pth'
decoder_path = 'models/mono_640x192/depth.pth'

# NOTE: torch.load unpickles the file contents; only load trusted checkpoints.
loaded_dict_enc = torch.load(encoder_path, map_location='cpu')
loaded_dict = torch.load(decoder_path, map_location='cpu')


def _as_keras_kernel(tensor):
    """Convert a checkpoint conv weight to NumPy, reordering axes with (2, 3, 1, 0)."""
    return tensor.numpy().transpose(2, 3, 1, 0)


def _batchnorm_weights(state, prefix):
    """Return weight, bias, running mean and running variance stored under ``prefix``."""
    return [state[prefix + suffix].numpy()
            for suffix in ('.weight', '.bias', '.running_mean', '.running_var')]


# The stem layers have fixed names and are handled outside the loop.
model.get_layer('conv1').set_weights(
    [_as_keras_kernel(loaded_dict_enc['encoder.conv1.weight'])])
model.get_layer('bn1').set_weights(_batchnorm_weights(loaded_dict_enc, 'encoder.bn1'))

for layer in model.layers:
    parts = layer.name.split('.')

    if parts[0] == 'en':
        # 'en.<rest>' maps to checkpoint keys 'encoder.<rest>.*'.
        key = 'encoder.' + '.'.join(parts[1:])
        if len(layer.get_weights()) == 1:
            # A single weight array: bias-free convolution.
            layer.set_weights([_as_keras_kernel(loaded_dict_enc[key + '.weight'])])
        else:
            layer.set_weights(_batchnorm_weights(loaded_dict_enc, key))

    elif parts[0] == 'de':
        # Decoder layers are stored under numeric indices in the checkpoint.
        if parts[1] == 'upconv':
            key = 'decoder.' + str(2 * (4 - int(parts[2])) + int(parts[3])) + '.conv.conv'
        else:
            key = 'decoder.' + str(int(parts[2]) + 10) + '.conv'
        layer.set_weights([_as_keras_kernel(loaded_dict[key + '.weight']),
                           loaded_dict[key + '.bias'].numpy()])


model.summary()

In [None]:

import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
import glob
import os
import PIL.Image as pil
import numpy as np
import tensorflow as tf
import math

from PIL import Image as pil
import tensorflow as tf
from tensorflow.keras.losses import Loss

class CustomSmoothnessLoss(Loss):
    """Loss built from second-order spatial differences of the prediction.

    ``y_true`` is ignored: the value is ``lambda_smooth`` times the sum of
    the mean absolute values of the four second-order difference maps.
    """

    def __init__(self, lambda_smooth=0.1):
        super(CustomSmoothnessLoss, self).__init__()
        self.lambda_smooth = lambda_smooth

    def gradient(self, pred):
        """Return forward differences of a 4-D tensor as ``(axis-2 diff, axis-1 diff)``."""
        diff_axis1 = pred[:, 1:, :, :] - pred[:, :-1, :, :]
        diff_axis2 = pred[:, :, 1:, :] - pred[:, :, :-1, :]
        return diff_axis2, diff_axis1

    def second_order_gradient(self, pred):
        """Return ``(dx2, dxdy, dydx, dy2)``: the differences of the differences."""
        first_dx, first_dy = self.gradient(pred)
        # Each call yields a pair; concatenating the pairs gives the 4-tuple.
        return self.gradient(first_dx) + self.gradient(first_dy)

    def call(self, y_true, y_pred):
        """Return the weighted smoothness penalty of ``y_pred``."""
        # y_pred is taken to be the predicted disparity map.
        terms = self.second_order_gradient(y_pred)

        smooth_loss = tf.reduce_mean(tf.abs(terms[0]))
        for term in terms[1:]:
            smooth_loss = smooth_loss + tf.reduce_mean(tf.abs(term))

        return self.lambda_smooth * smooth_loss




def load_and_preprocess_image(image_path, target_size=(192, 640)):
    """Load an image as a batch-of-one float tensor scaled to [0, 1].

    Args:
        image_path: Path of the image file to open.
        target_size: ``(height, width)`` of the returned image. The default
            matches the ``(192, 640, 3)`` input shape of the model built in
            this file.

    Returns:
        A pair ``(image, target)`` holding the same tensor twice: the RGB
        image resized to ``target_size``, converted to float32, divided by
        255 and given a leading axis of size 1.
    """
    height, width = target_size
    # Close the file handle as soon as the pixels have been converted.
    with pil.open(image_path) as source:
        img = source.convert('RGB')
    # PIL's resize() expects (width, height), the reverse of target_size;
    # passing target_size unchanged would produce a 640x192 (H x W) array.
    img_resized = img.resize((width, height), pil.LANCZOS)
    img_array = tf.convert_to_tensor(img_resized, dtype=tf.float32)
    img_array = tf.expand_dims(img_array, axis=0)
    scaled = img_array / 255.0
    return scaled, scaled



def data_generator(dataset_loc, batch_size=8, target_size=(192, 640)):
    """Yield ``(inputs, targets)`` batches forever.

    Each batch is made of ``batch_size`` paths drawn from ``dataset_loc``
    with ``np.random.choice``, loaded via ``load_and_preprocess_image`` and
    concatenated along axis 0.

    Args:
        dataset_loc: Sequence of image paths to sample from.
        batch_size: Number of paths drawn per batch.
        target_size: Forwarded to ``load_and_preprocess_image``.
    """
    while True:
        chosen_paths = np.random.choice(dataset_loc, size=batch_size)

        image_parts, target_parts = [], []
        for path in chosen_paths:
            image, target = load_and_preprocess_image(path, target_size=target_size)
            image_parts.append(image)
            target_parts.append(target)

        batch_images = tf.concat(image_parts, axis=0)
        target_images = tf.concat(target_parts, axis=0)
        yield batch_images, target_images




# def depth_loss(predicted_depth1, predicted_depth2):
#     # Concatenate predicted depth maps along the last axis (-1)
#     predicted_depth1 = tf.concat(predicted_depth1, axis=-1)
#     predicted_depth2 = tf.concat(predicted_depth2, axis=-1)

#     # Resize the second predicted depth map to match the size of the first one
#     predicted_depth2 = tf.image.resize(predicted_depth2, size=tf.shape(predicted_depth1)[1:3])

#     # Compute reconstruction loss as the mean squared error
#     reconstruction_loss = tf.reduce_mean(tf.square(predicted_depth1 - predicted_depth2))

#     return reconstruction_loss
# def depth_loss(true_depth, predicted_depth):
#     predicted_depth = tf.concat(predicted_depth, axis=-1)

#     predicted_depth = tf.image.resize(predicted_depth, size=tf.shape(true_depth)[1:3])

#     mask = tf.math.logical_not(tf.math.is_nan(true_depth))

#     reconstruction_loss = tf.reduce_sum(tf.square(tf.boolean_mask(true_depth - predicted_depth, mask)))

#     num_valid_pixels = tf.reduce_sum(tf.cast(mask, tf.float32))
#     reconstruction_loss = tf.cond(tf.greater(num_valid_pixels, 0),
#                                   lambda: reconstruction_loss / num_valid_pixels,
#                                   lambda: tf.constant(0.0, dtype=tf.float32))

#     return reconstruction_loss



learning_rate = 1e-4

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# The training call in this file passes no validation data, so the callback's
# default monitor ('val_loss') is never reported and, with
# save_best_only=True, no checkpoint would be written. Monitor the training
# loss instead.
# NOTE(review): newer Keras releases may require weight-only checkpoint names
# to end in '.weights.h5' — confirm against the installed version.
checkpoint_callback = ModelCheckpoint(
    'best_weights.h5',
    monitor='loss',
    save_best_only=True,
    save_weights_only=True,
)



# Drive names whose frames are used for training.
data = ['2011_09_26_drive_0001']

# Collect every PNG from the image_00 .. image_03 folders of each drive.
dataset_loc = []
for drive_name in data:
    for cam in range(4):
        frames_dir = f"/content/drive/MyDrive/complete/2011_09_26/{drive_name}_sync/image_0{cam}/data"
        dataset_loc += glob.glob(os.path.join(frames_dir, "*.png"))

batch_size = 8
epochs = 3

# Fail early with a clear message instead of an obscure sampling error inside
# the generator when no image was found.
if not dataset_loc:
    raise FileNotFoundError("No training images found; check the dataset paths.")

model.compile(optimizer=optimizer, loss=CustomSmoothnessLoss(lambda_smooth=0.1))

# steps_per_epoch must be a whole number; round up so a final partial batch
# still counts as a step.
train_steps_per_epoch = math.ceil(len(dataset_loc) / batch_size)
train_generator = data_generator(dataset_loc, batch_size=batch_size)

# Model.fit accepts Python generators directly; fit_generator is deprecated
# and absent from newer Keras releases.
model.fit(
    train_generator,
    steps_per_epoch=train_steps_per_epoch,
    epochs=epochs,
    callbacks=[checkpoint_callback],
)

In [None]:
import numpy as np
import PIL.Image as pil
import matplotlib.pyplot as plt
import tensorflow as tf
import time

def test_function(image_path, loaded_dict_enc):
    """Load one image and prepare it as model input.

    Args:
        image_path: Path of the image file to open.
        loaded_dict_enc: Mapping that provides the feed size under the keys
            ``'height'`` and ``'width'``.

    Returns:
        A float32 tensor with a leading axis of size 1 holding the RGB image
        resized to the feed size and scaled to [0, 1].
    """
    # Close the file handle as soon as the pixels have been converted.
    with pil.open(image_path) as source:
        input_image = source.convert('RGB')

    feed_height = loaded_dict_enc['height']
    feed_width = loaded_dict_enc['width']
    # PIL's resize() expects (width, height).
    input_image_resized = input_image.resize((feed_width, feed_height), pil.LANCZOS)

    input_image_tf = tf.convert_to_tensor(input_image_resized, dtype=tf.float32)
    # Scale to [0, 1] like the training loader in this file does; the model
    # itself then applies (x - 0.45) / 0.225, which presumes that range.
    input_image_tf = input_image_tf / 255.0
    input_image_tf = tf.expand_dims(input_image_tf, axis=0)

    return input_image_tf

# Frame used for the inference demo.
image_path = r"/content/drive/MyDrive/complete/2011_09_26/2011_09_26_drive_0059_sync/image_00/data/0000000000.png"

# Time only the image preparation done by test_function.
start_time = time.time()
input_image_tf = test_function(image_path, loaded_dict_enc)
end_time = time.time()

elapsed_seconds = end_time - start_time
processing_time = elapsed_seconds * 1000  # seconds -> milliseconds
print(f"Processing time: {processing_time} milliseconds")

In [None]:
# Run the network on the prepared image; `res` holds the model's outputs.
res = model.predict(input_image_tf)

In [None]:
# Bare expression: shows the raw prediction as the notebook cell output.
res

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# `res` is assumed to be a list of NumPy arrays.

# First array, first batch entry, channel 0 -> a 2-D map.
disparity_map = res[0][0, :, :, 0]
# The small constant keeps the division defined where the disparity is zero.
depth_map = 1 / (disparity_map + 1e-6)

# Two panels side by side: disparity on the left, depth on the right.
fig, axs = plt.subplots(1, 2, figsize=(12, 6))
disparity_ax, depth_ax = axs

disparity_ax.imshow(disparity_map, cmap='plasma')
disparity_ax.set_title('Disparity Map')
disparity_ax.axis('off')

im = depth_ax.imshow(depth_map, cmap='viridis')
depth_ax.set_title('Depth Map')
depth_ax.axis('off')
# Colour scale for the depth panel only.
fig.colorbar(im, ax=depth_ax)

plt.show()

In [None]:
# Space-separated drive names, wrapped five per line for readability.
strings = (
    "2011_09_26_drive_0001 2011_09_26_drive_0002 2011_09_26_drive_0005 2011_09_26_drive_0009 2011_09_26_drive_0011 "
    "2011_09_26_drive_0013 2011_09_26_drive_0014 2011_09_26_drive_0015 2011_09_26_drive_0017 2011_09_26_drive_0018 "
    "2011_09_26_drive_0019 2011_09_26_drive_0020 2011_09_26_drive_0022 2011_09_26_drive_0023 2011_09_26_drive_0027 "
    "2011_09_26_drive_0028 2011_09_26_drive_0029 2011_09_26_drive_0032 2011_09_26_drive_0035 2011_09_26_drive_0036 "
    "2011_09_26_drive_0039 2011_09_26_drive_0046 2011_09_26_drive_0048 2011_09_26_drive_0051 2011_09_26_drive_0052 "
    "2011_09_26_drive_0056 2011_09_26_drive_0057 2011_09_26_drive_0059 2011_09_26_drive_0060 2011_09_26_drive_0061 "
    "2011_09_26_drive_0064 2011_09_26_drive_0070 2011_09_26_drive_0079 2011_09_26_drive_0084 2011_09_26_drive_0086 "
    "2011_09_26_drive_0087 2011_09_26_drive_0091 2011_09_26_drive_0093 2011_09_26_drive_0095 2011_09_26_drive_0096 "
    "2011_09_26_drive_0101 2011_09_26_drive_0104 2011_09_26_drive_0106 2011_09_26_drive_0113 2011_09_26_drive_0117 "
    "2011_09_26_drive_0119 2011_09_26_drive_0121 2011_09_26_drive_0122 2011_09_26_drive_0125 2011_09_26_drive_0126 "
    "2011_09_26_drive_0128 2011_09_26_drive_0132 2011_09_26_drive_0134 2011_09_26_drive_0135 2011_09_26_drive_0136 "
    "2011_09_26_drive_0138 2011_09_26_drive_0141 2011_09_26_drive_0143 2011_09_26_drive_0145 2011_09_26_drive_0146 "
    "2011_09_26_drive_0149 2011_09_26_drive_0153 2011_09_26_drive_0154 2011_09_26_drive_0155 2011_09_26_drive_0156 "
    "2011_09_26_drive_0160 2011_09_26_drive_0161 2011_09_26_drive_0162 2011_09_26_drive_0165 2011_09_26_drive_0166 "
    "2011_09_26_drive_0167 2011_09_26_drive_0168 2011_09_26_drive_0171"
)

# Break the text on whitespace into a list of names and show it.
result_list = strings.split()
print(result_list)