In [1]:
# Learning rate the optimizer is created with (0.0001).
INIT_LEARNING_RATE = 1e-4

In [2]:
import csv
import random
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence

In [3]:
def train_val_split(train_paths, val_size):
    """Randomly partition ``train_paths`` into a training and a validation list.

    The items are copied into a new list before shuffling, so the sequence
    passed in by the caller keeps its original order. Shuffling uses the
    module-level ``random`` generator; seed it beforehand for a reproducible
    split.

    Args:
        train_paths: Sequence of items to split.
        val_size: Fraction of the items assigned to the validation list.
            Must lie within ``[0.0, 1.0]``.

    Returns:
        A ``(train, val)`` tuple of lists. ``train`` holds the first
        ``int(len(train_paths) * (1.0 - val_size))`` shuffled items and
        ``val`` holds the remainder.

    Raises:
        ValueError: If ``val_size`` is outside ``[0.0, 1.0]``.
    """
    if not 0.0 <= val_size <= 1.0:
        raise ValueError(
            'val_size must be between 0.0 and 1.0, got {!r}'.format(val_size)
        )

    # Shuffle a copy: random.shuffle works in place and would otherwise
    # reorder the caller's list as a hidden side effect.
    shuffled = list(train_paths)
    random.shuffle(shuffled)

    split_at = int(len(shuffled) * (1.0 - val_size))
    return shuffled[:split_at], shuffled[split_at:]

In [4]:
def load_train_csv(train_path, val_size=0.2):
    """Read the training CSV and build the train/validation partition.

    Args:
        train_path: Location of the CSV, passed straight to ``read_csv``
            (defined elsewhere in the notebook).
        val_size: Fraction of the image paths held out for validation.
            Defaults to ``0.2``.

    Returns:
        A ``(partition, labels)`` tuple where ``partition`` is a dict with the
        keys ``'train'`` and ``'validation'`` holding lists of image paths,
        and ``labels`` maps every image path to the second value of its row.
    """
    # Materialise the rows once so they can be walked twice below, even if
    # read_csv hands back a one-shot iterator.
    # Each row must unpack into exactly two values; presumably
    # (image path, depth-map path) -- confirm against read_csv.
    rows = list(read_csv(train_path))

    labels = {img_path: dm_path for img_path, dm_path in rows}
    x_paths = [img_path for img_path, _ in rows]
    x_train_paths, x_val_paths = train_val_split(x_paths, val_size)

    partition = {
        'train': x_train_paths,
        'validation': x_val_paths
    }
    return partition, labels

In [5]:
def plot_history(history):
    """Draw the loss and accuracy curves of a training run on one figure.

    Args:
        history: Mapping that provides the series ``'loss'``, ``'val_loss'``,
            ``'depth_acc'`` and ``'val_depth_acc'``.
    """
    # (key in ``history``, legend label) in drawing order.
    curves = (
        ('loss', 'loss'),
        ('val_loss', 'val_loss'),
        ('depth_acc', 'acc'),
        ('val_depth_acc', 'val_acc'),
    )
    for key, legend_label in curves:
        plt.plot(history[key], label=legend_label)

    plt.legend()
    plt.show()

In [6]:
def save_model(type, model):
    """Save ``model`` under a timestamped file name and return that name.

    Args:
        type: Tag inserted into the file name.
        model: Object exposing a ``save(path)`` method.

    Returns:
        The file name used, of the form ``model_<type>_<unix seconds>.keras``.
    """
    # Whole seconds since the epoch, taken from the local clock.
    stamp = int(datetime.now().timestamp())
    name = 'model_{}_{}.keras'.format(type, stamp)
    model.save(name)
    return name

In [7]:
# Adam optimizer starting at INIT_LEARNING_RATE, with the AMSGrad variant enabled.
opt = Adam(
    learning_rate=INIT_LEARNING_RATE,
    amsgrad=True,
)

In [8]:
def gen_depth_loss(y_true, y_pred, params=(1.0,1.0,0.1)):
    """Weighted sum of an SSIM term, an image-gradient term and an L1 term.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor with the same layout as ``y_true``.
        params: ``(w_ssim, w_edges, w_depth)`` weights applied to the three
            terms, in that order.

    Returns:
        ``w_ssim * ssim_term + w_edges * mean(edge_term) + w_depth * mean(l1_term)``.
        The SSIM term is not averaged here, so the result keeps whatever
        shape ``tf.image.ssim`` returns.
    """
    ssim_weight, edge_weight, depth_weight = params

    # Point-wise absolute error, averaged over the last axis.
    abs_err = K.abs(y_pred - y_true)
    depth_term = K.mean(abs_err, axis=-1)

    # Absolute difference of the vertical and horizontal image gradients.
    true_dy, true_dx = tf.image.image_gradients(y_true)
    pred_dy, pred_dx = tf.image.image_gradients(y_pred)
    grad_err = K.abs(pred_dy - true_dy) + K.abs(pred_dx - true_dx)
    edge_term = K.mean(grad_err, axis=-1)

    # SSIM dissimilarity mapped into [0, 1]; max_val is fixed at 1.0, which
    # presumably means inputs are scaled to [0, 1] -- TODO confirm.
    ssim_value = tf.image.ssim(y_true, y_pred, 1.0)
    ssim_term = K.clip((1 - ssim_value) * 0.5, 0, 1)

    return (
        (ssim_weight * ssim_term)
        + (edge_weight * K.mean(edge_term))
        + (depth_weight * K.mean(depth_term))
    )

In [9]:
def depth_acc(y_true, y_pred):
    """Mean of the element-wise test ``round(y_true) == round(y_pred)``.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.

    Returns:
        The mean over all elements of the equality mask.
    """
    rounded_true = K.round(y_true)
    rounded_pred = K.round(y_pred)
    matches = K.equal(rounded_true, rounded_pred)
    return K.mean(matches)

In [10]:
class DataGenerator(Sequence):
    """Keras ``Sequence`` that assembles batches of preprocessed images.

    With ``pred=False`` every batch is an ``(X, y)`` pair, where ``y`` is built
    from ``labels[ID]``. With ``pred=True`` only ``X`` is returned. Loading is
    delegated to ``preprocess_image``, which is defined outside this class.
    """

    def __init__(self, list_IDs, labels, batch_size=16, dim=(128,128),
                 n_channels=3, shuffle=True, pred=False, **kwargs):
        """Store the configuration and build the first epoch's index order.

        Args:
            list_IDs: Sequence of sample identifiers; each one is passed to
                ``preprocess_image`` as the input image reference.
            labels: Mapping from an identifier to its target reference.
                Only read when ``pred`` is false.
            batch_size: Number of samples per batch.
            dim: ``(dim[0], dim[1])`` passed to ``preprocess_image`` and used
                as the two spatial axes of the batch arrays.
            n_channels: Size of the last axis of ``X``.
            shuffle: Whether to reshuffle the sample order at each epoch end.
            pred: If true, batches contain inputs only.
            **kwargs: Forwarded unchanged to the Keras base class.
        """
        # Newer Keras versions expect the base initialiser to run (and warn
        # when it is skipped); with no kwargs this is a no-op on older ones.
        super().__init__(**kwargs)
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.pred = pred
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch.

        Training drops a trailing partial batch. Prediction keeps it, so that
        every ID in ``list_IDs`` receives an output.
        """
        n_samples = len(self.list_IDs)
        if self.pred:
            # Ceiling division without going through floats.
            return -(-n_samples // self.batch_size)
        return n_samples // self.batch_size

    def __getitem__(self, index):
        """Build batch number ``index``: ``X`` if ``pred`` else ``(X, y)``."""
        start = index * self.batch_size
        batch_indexes = self.indexes[start:start + self.batch_size]
        batch_ids = [self.list_IDs[k] for k in batch_indexes]
        return self.__data_generation(batch_ids)

    def on_epoch_end(self):
        """Reset the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load and stack the samples named in ``list_IDs_temp``.

        Returns:
            ``X`` with shape ``(len(list_IDs_temp), dim[0], dim[1], n_channels)``
            in prediction mode, otherwise ``(X, y)`` where ``y`` has a final
            axis of size 1.
        """
        height, width = self.dim[0], self.dim[1]
        # Size the arrays by the real batch length so a short final batch
        # carries no uninitialised rows.
        n_items = len(list_IDs_temp)
        X = np.empty((n_items, height, width, self.n_channels))
        y = None if self.pred else np.empty((n_items, height, width, 1))

        for i, ID in enumerate(list_IDs_temp):
            # One random boolean per sample, shared by the input and its
            # target; presumably an augmentation switch -- confirm against
            # preprocess_image.
            # NOTE(review): it is also drawn in prediction mode; verify that
            # randomising inference inputs is intended.
            res = random.choice([True, False])
            X[i,] = preprocess_image(ID, height, width, False, res)
            if y is not None:
                y[i,] = preprocess_image(self.labels[ID], height, width, True, res)

        if y is None:
            return X
        return X, y