In [None]:
import os, sys
project_path = '/content/gdrive/MyDrive/master/2.5D_sketches_estimator'
sys.path.append(project_path)

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
%cd {project_path}

/content/gdrive/MyDrive/master/2.5D_sketches_estimator


# Setup

In [2]:
# import DL modules
import tensorflow as tf
import keras as keras
from keras.models import Sequential, Model
from keras import layers
# from keras.layers import Conv2D, Flatten, Dense, MaxPooling2D, Dropout, \
#               Conv3D, MaxPooling3D, Input, Deconv3D, BatchNormalization, \
#               Activation, Reshape
from keras import optimizers

# import service modules
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import cv2

In [None]:
tf.__version__

'2.8.2'

In [None]:
#@title Setup
# network and training parameters
RANDOM_SEED = 10
SAMPLE_RATE = 0.01 #@param
N_EPOCH = 1 #@param {type: "integer"}
BATCH_SIZE = 128 #@param {type: "integer"}
VERBOSE = 1 #@param {type: "boolean"}
# LOSS_FUNCTION = 'MeanSquaredError'
LOSS_FUNCTION = 'SSIM'
VALIDATION_SPLIT = 0.2 #@param
TEST_SPLIT = 0.1 #@param
INPUT_SHAPE = (256, 256, 256, 3) #@param
HEIGHT = INPUT_SHAPE[1]
WIDTH = INPUT_SHAPE[2]
LR = 0.0002
OPTIMIZER = tf.optimizers.Adam(
  learning_rate=LR,
  amsgrad=False
)

In [None]:
# fix seed
np.random.seed(RANDOM_SEED)

# Load Pre-processed Data

In [None]:
# reference: https://keras.io/examples/vision/depth_estimation/
class DataLoader(tf.keras.utils.Sequence):
  def __init__(self, data, batch_size=6, dim=(256, 256), n_channels=3, shuffle=True):
    """
    Initialization
    """
    self.data = data
    self.indices = self.data.index.tolist()
    self.dim = dim
    self.n_channels = n_channels
    self.batch_size = batch_size
    self.shuffle = shuffle
    self.min_depth = 0.1
    self.on_epoch_end()

  def __len__(self):
    return int(np.ceil(len(self.data) / self.batch_size))

  def __getitem__(self, index):
    # modify batch size of last batch
    if (index + 1) * self.batch_size > len(self.indices):
      self.batch_size = len(self.indices) - index * self.batch_size
    # Generate one batch of data
    # Generate indices of the batch
    index = self.indices[index * self.batch_size : (index + 1) * self.batch_size]
    # Find list of IDs
    batch = [self.indices[k] for k in index]
    x, y = self.load_batch(batch)
    return x, y


  def on_epoch_end(self):
    """
    Updates indexes after each epoch
    """
    self.index = np.arange(len(self.indices))
    if self.shuffle == True:
      np.random.shuffle(self.index)

  def load(self, image_path, sketch_path):
    """
    Load image and 2.5D sketch pair.
    """
    img = np.float32(cv2.imread(image_path))
    sketch = np.float32(cv2.imread(sketch_path))
    # img = cv2.imread(image_path)
    # sketch = cv2.imread(sketch_path)
    return img, sketch

  def load_batch(self, batch):
    """
    Load one batch of data.
    """
    x = np.empty((self.batch_size, *self.dim, self.n_channels), dtype='float32')
    y = np.empty((self.batch_size, *self.dim, self.n_channels), dtype='float32')

    for i, batch_id in enumerate(batch):
      x[i,], y[i,] = self.load(
        self.data["raw_image_file_paths"][batch_id],
        self.data["normal_masked_paths"][batch_id],
      )
    return x, y

In [None]:
index_df = pd.read_csv(os.getcwd()+'/data_paths_optimal.csv')
index_df = index_df.sample(frac=SAMPLE_RATE, random_state=RANDOM_SEED, ignore_index=True)
num_instances = len(index_df.index)
validation_boundary = int(num_instances*VALIDATION_SPLIT)
test_boundary = int(num_instances*TEST_SPLIT) + validation_boundary

# create data loaders
train_loader = DataLoader(data=index_df[test_boundary:].reset_index(drop='true'), batch_size=BATCH_SIZE, dim=INPUT_SHAPE[1:3], n_channels=INPUT_SHAPE[3], shuffle=False)
test_loader = DataLoader(data=index_df[validation_boundary:test_boundary].reset_index(drop='true'), batch_size=BATCH_SIZE, dim=INPUT_SHAPE[1:3], n_channels=INPUT_SHAPE[3], shuffle=False)
validation_loader = DataLoader(data=index_df[:validation_boundary].reset_index(drop='true'), batch_size=BATCH_SIZE, dim=INPUT_SHAPE[1:3], n_channels=INPUT_SHAPE[3], shuffle=False)

In [None]:
print(len(train_loader.index))
print(len(validation_loader.index))
print(len(test_loader.index))

949
271
135


# Define Model

In [None]:
# reference: https://towardsdev.com/implement-resnet-with-tensorflow2-4ee81e33a1ac
class ResBlock(keras.Model):
  def __init__(self, filters, downsample):
    super().__init__()
    if downsample:
      self.conv1 = layers.Conv2D(filters, 3, 2, padding='same')
      self.shortcut = keras.Sequential([
        layers.Conv2D(filters, 1, 2),
        layers.BatchNormalization()
      ])
    else:
      self.conv1 = layers.Conv2D(filters, 3, 1, padding='same')
      self.shortcut = keras.Sequential()

    self.bn1 = layers.BatchNormalization()
    self.rl1 = layers.LeakyReLU(alpha=0.2)
    self.bn2 = layers.BatchNormalization()
    self.rl2 = layers.LeakyReLU(alpha=0.2)
    self.rl3 = layers.LeakyReLU(alpha=0.2)

    self.conv2 = layers.Conv2D(filters, 3, 1, padding='same')
  def call(self, input):
    shortcut = self.shortcut(input)

    input = self.conv1(input)
    input = self.bn1(input)
    input = self.rl1(input)

    input = self.conv2(input)
    input = self.bn2(input)
    input = self.rl2(input)

    input = input + shortcut
    return self.rl3(input)

In [None]:
# encoder - ResNet18
class encoder(keras.Model):
  def __init__(self, output_shape=256, *args, **kwargs):
    super().__init__(*args, **kwargs)

    self.layer0 = keras.Sequential([
      layers.Conv2D(filters=64, kernel_size=7, strides=2, padding='same'),
      layers.MaxPool2D(pool_size=3, strides=2, padding='same'),
      layers.BatchNormalization(),
      layers.ReLU()
    ], name='layer0')
    
    self.layer1 = keras.Sequential([
      ResBlock(64, downsample=False),
      ResBlock(64, downsample=False)
    ], name='layer1')

    self.layer2 = keras.Sequential([
      ResBlock(128, downsample=True),
      ResBlock(128, downsample=False)
    ], name='layer2')

    self.layer3 = keras.Sequential([
      ResBlock(256, downsample=True),
      ResBlock(256, downsample=False)
    ], name='layer3')

    # self.layer4 = keras.Sequential([
    #   ResBlock(512, downsample=True),
    #   ResBlock(512, downsample=False)
    # ], name='layer4')

    # self.gap = layers.GlobalAveragePooling2D(name='gap')
    # self.fc_output = layers.Dense(output_shape, activation='softmax', name='dense_output')

  def call(self, input):
    input = self.layer0(input)
    input = self.layer1(input)
    input = self.layer2(input)
    input = self.layer3(input)
    # input = self.layer4(input)
    # input = self.gap(input)
    # input = self.fc_output(input)
    # input = tf.keras.activations.tanh(input)
    # input = tf.keras.activations.relu(tf.math.sign(input))
    print(input)

    return input

In [5]:
input = tf.constant([-1, -0.6, -0.4, 0, 0.4, 0.6, 1], dtype = tf.float32)
input = tf.keras.activations.tanh(input-0.5)
input = tf.keras.activations.relu(tf.math.sign(input))

In [6]:
input

<tf.Tensor: shape=(7,), dtype=float32, numpy=array([0., 0., 0., 0., 0., 1., 1.], dtype=float32)>

In [None]:
# # Build Model
# # initialize model
encoder_model=encoder()
# encoder_model.build(input_shape=INPUT_SHAPE)
encoder_model.build(input_shape=(None, 256, 256, 3))

# summary model
# encoder_model.call(layers.Input((256, 256, 3)))
encoder_model.summary()

Tensor("Relu:0", shape=(None, 16, 16, 256), dtype=float32)
Model: "encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 layer0 (Sequential)         (None, 64, 64, 64)        9728      
                                                                 
 layer1 (Sequential)         (None, 64, 64, 64)        148736    
                                                                 
 layer2 (Sequential)         (None, 32, 32, 128)       527488    
                                                                 
 layer3 (Sequential)         (None, 16, 16, 256)       2103552   
                                                                 
Total params: 2,789,504
Trainable params: 2,785,024
Non-trainable params: 4,480
_________________________________________________________________


In [None]:
# decode block
class DecodeBlock(keras.Model):
  def __init__(self, name, filters, kernel_size=3, strides=2):
    super().__init__()

    self._name = name
    
    self.deconv1 = layers.Conv2DTranspose(filters=filters, kernel_size=3, strides=strides, padding='same')
    self.bn1 = layers.BatchNormalization()
    self.rl1 = layers.LeakyReLU(alpha=0.2)

    

  def call(self, input):
    input = self.deconv1(input)
    input = self.bn1(input)
    input = self.rl1(input)



    return input

In [None]:
# decoder
class decoder(keras.Model):
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

    self.layer0 = DecodeBlock(name='layer0', filters=128)

    self.layer1 = DecodeBlock(name='layer1', filters=64)

    self.layer2 = DecodeBlock(name='layer2', filters=32)

    self.layer3 = DecodeBlock(name='layer3', filters=3)

    # self.layer4 = DecodeBlock(3, name='layer4')


  def call(self, input):
    input = self.layer0(input)
    input = self.layer1(input)
    input = self.layer2(input)
    input = self.layer3(input)
    # input = self.layer4(input)

    return input

In [None]:
# Build Model
# initialize model
decoder_model=decoder()
decoder_model.build(input_shape=(400, 16, 16, 256))

# summary model
decoder_model.call(layers.Input((16, 16, 256)))
decoder_model.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 layer0 (DecodeBlock)        (None, 32, 32, 128)       295552    
                                                                 
 layer1 (DecodeBlock)        (None, 64, 64, 64)        74048     
                                                                 
 layer2 (DecodeBlock)        (None, 128, 128, 32)      18592     
                                                                 
 layer3 (DecodeBlock)        (None, 256, 256, 3)       879       
                                                                 
Total params: 389,071
Trainable params: 388,617
Non-trainable params: 454
_________________________________________________________________


In [None]:
# customized loss
def ssim_loss(target, pred):
  # edges
  dy_true, dx_true = tf.image.image_gradients(target)
  dy_pred, dx_pred = tf.image.image_gradients(pred)
  weights_x = tf.exp(tf.reduce_mean(tf.abs(dx_true)))
  weights_y = tf.exp(tf.reduce_mean(tf.abs(dy_true)))

  # smoothness
  smoothness_x = dx_pred * weights_x
  smoothness_y = dy_pred * weights_y
  smoothness_loss = tf.reduce_mean(abs(smoothness_x)) + tf.reduce_mean(abs(smoothness_y))

  # Structural similarity (SSIM) index
  loss = tf.reduce_mean(
    1 - tf.image.ssim(
      target, pred, max_val=WIDTH, filter_size=7, k1=0.01 ** 2, k2=0.03 ** 2
    )
  )

  return loss

In [None]:
# encoder-decoder
model = Sequential()
encoder_model = encoder()
model.add(encoder_model)
decoder_model = decoder()
model.add(decoder_model)

if LOSS_FUNCTION == 'SSIM':
  model.compile(loss=ssim_loss, optimizer=OPTIMIZER, metrics=['accuracy'])
else:
  model.compile(loss=LOSS_FUNCTION, optimizer=OPTIMIZER, metrics=['accuracy'])


# Train Model

In [None]:
# checkpoints
# additional checkpoint load code and temp directory definition
from keras.callbacks import ModelCheckpoint
from keras.models import load_model

checkpoint_path = project_path+"/intermediate/weights{epoch:02d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# definition of checkpoint parameters file
checkpoint = ModelCheckpoint(filepath=checkpoint_path, verbose=1, save_best_only=False, save_freq='epoch')

In [None]:
# train
history = model.fit(train_loader, batch_size=BATCH_SIZE, epochs=N_EPOCH, verbose=VERBOSE, validation_data=validation_loader)

Tensor("encoder_1/Relu:0", shape=(128, 16, 16, 256), dtype=float32)
tf.Tensor(
[[[[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  ...

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   ...
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]]

  [[0. 0. 0. ... 0. 0. 0.]
   [0. 0. 0. ... 0. 0. 0.]
   [0

KeyboardInterrupt: ignored

In [None]:
# continue train
new_model = load_model(project_path+"/intermediate/weights%02d.ckpt" % history.params['epochs'])
remain_n_epoch = N_EPOCH - history.params['epochs']
if remain_n_epoch == 0:
  print("Train finished.")
else:
  new_history = new_model.fit(train_loader, batch_size=BATCH_SIZE, epochs=remain_n_epoch, verbose=VERBOSE, validation_data=validation_loader, callbacks=[checkpoint])

# Demonstrate Results

In [None]:
# print test score
test_score = new_model.evaluate(test_loader, verbose=1)
print("\nTest score/loss:", test_score[0])
print('Test accuracy:', test_score[1])

In [None]:
score = model.evaluate(test_loader, verbose=VERBOSE)
print("\nTest score/loss:", score[0])
print('Test accuracy:', score[1])

# list all data in history
print(history.history.keys())
# summarize history for accuracy
plt.figure(figsize=(8,5))
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Accuracy History of Net')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.xticks(np.arange(0, N_EPOCH+1, 2.0))
plt.yticks(np.arange(0.6, 1.0, 0.1))
plt.ylim(0.6, 0.95)
plt.legend(['train', 'validation'], loc='upper left')
plt.savefig(os.getcwd()+'/intermediate/acc_hist.png')
plt.show()
# summarize history for loss
plt.figure(figsize=(8,5))
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Loss History of Net')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.xticks(np.arange(0, N_EPOCH+1, 2.0))
plt.yticks(np.arange(0.6, 1.0, 0.1))
plt.ylim(0.6, 0.95)
plt.legend(['train', 'validation'], loc='upper left')
plt.savefig(os.getcwd()+'/intermediate/acc_hist.png')
plt.show()

# Save Model Configures

In [None]:
# from keras.models import model_from_json
# model_json = model.to_json()
# open(project_path+'/intermediate/Net_architecture.json', 'w').write(model_json)

In [None]:
# r1 = tf.random.uniform(shape=[256, 256, 3])
# r2 = tf.random.uniform(shape=[256, 256, 3])
# mse = tf.keras.losses.MeanSquaredError()
# mse(r1, r2).numpy()