# Image interpolation - train and test

## Load libraries

In [1]:
import cv2
import datetime
import os
import matplotlib.pyplot as plt
import random
import numpy as np

from cv2 import imread, IMREAD_GRAYSCALE, resize

from glob import glob

from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Bidirectional, LSTMCell, Dropout, Conv2D, ConvLSTM2D, Conv2DTranspose, MaxPooling2D, concatenate, Bidirectional, LSTM, Reshape, Layer
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from tensorflow.keras.utils import Sequence
from tensorflow.keras import backend as K
import tensorflow as tf

from sklearn.metrics import mean_absolute_error

random.seed()

## Set parameters

In [3]:
params = {
    'img' : {
        'height' : 128,
        'width' : 128,
        'n_channels' : 1
    },
    'batch_size' : 32,
    'frame_dist' : 1,
    'videos_dir' : "./youtube",
    'n_past': 1, 
    'n_future': 1,
    'n_channels': 1,
    'duration_video' : 5,
    'from_fps' : 30
}

## Set custom metric

In [4]:
def SAD(y_true, y_pred):
    return K.sum(K.abs(y_true - y_pred))//params['batch_size']

## Create models

### Unet

In [5]:
class Reduction(Layer):
  def __init__(self, name='Reduction', pooling=True, filter=16): 
    self.layers = [
        Conv2D(filter, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same'),
        Dropout(0.1),
        Conv2D(filter, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')
        ]
    if pooling:
      self.layers.insert(0, MaxPooling2D((2, 2)))
    super().__init__(name=name)
   
  def call(self, net):
    for layer in self.layers:
      net = layer(net)
    return net

In [6]:
class Expandation(Layer):
  def __init__(self, name='Expandation', filter=16):
    self.conv2D_trans = Conv2DTranspose(filter, (2,2), strides=(2,2), padding='same')
    self.conv2D_1 = Conv2D(filter, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')
    self.conv2D_2 = Conv2D(filter, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')
    self.dropout = Dropout(0.1)
    super().__init__(name=name)
  
  def call(self, net, reduction_layer):
    net = self.conv2D_trans(net)
    net = concatenate([net, reduction_layer])
    net = self.conv2D_1(net)
    net = self.dropout(net)
    net = self.conv2D_2(net)
    return net

In [7]:
class U_net(Layer):
  def __init__(self, name='Unet', n_levels=3, starting_number_filters=16, output_dim=(128, 128, 1)):
    self.output_dim = output_dim
    self.n_levels = n_levels
    self.starting_number_filters = starting_number_filters
    super().__init__(name=name)

    self.conv2D_16 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')
    self.conv2D_1 = Conv2D(1, (1, 1), activation='relu', kernel_initializer='he_normal', padding='same')

    self.reductions = [Reduction(name='ReductionMASDAC', filter=16, pooling=False)]
    for i in range(1, self.n_levels):
      filter = starting_number_filters*2**i
      self.reductions.append(Reduction(name=f'Reduction{filter}', filter=filter))

    self.expandations = []
    for i in range(self.n_levels-1):
      filter = starting_number_filters*2**(self.n_levels-i-1)
      self.expandations.append(Expandation(name=f'Expandation{filter}', filter=filter))
  
  def get_config(self):
      config = super().get_config()
      config.update({
          "output_dim": self.output_dim,
          "n_levels": self.n_levels,
          "starting_number_filters": self.starting_number_filters,
      })
      return config

  def build(self, input_shape): 
    print(input_shape)
    self.kernel = self.add_weight(name = 'kernel', shape = (input_shape[1], *self.output_dim), initializer = 'normal', trainable = True) 
    super().build(input_shape)
   
  def call(self, input):
    reductions = []
    for i, reduction in enumerate(self.reductions):
      input_layer = reductions[i-1] if i>0 else input
      reduction_layer = reduction(input_layer)
      reductions.append(reduction_layer)

    expandations = []
    for i, expandation in enumerate(self.expandations):
      input_layer_to_expand = expandations[i-1] if i>0 else reductions[-1]
      input_layer_to_concat = reductions[self.n_levels-i-2]
      expandation_layer = expandation(input_layer_to_expand, input_layer_to_concat)
      expandations.append(expandation_layer)

    output = self.conv2D_16(expandations[-1])
    output = self.conv2D_1(output)
    
    return output
  
  
def get_unet():
  inputs = Input((params['img']['height'], params['img']['width'], params['img']['n_channels']*2))
  unet = U_net(name='unet',output_dim=(params['img']['height'], params['img']['width'], params['img']['n_channels']))
  unet_layer = unet(inputs)
  model = Model(inputs=inputs, outputs=unet_layer)
  model.compile(optimizer='adam', loss='mae')
  model.summary()
  return model

def get_unet_v2():
  inputs1 = Input((params['img']['height'], params['img']['width'], params['img']['n_channels']))
  inputs2 = Input((params['img']['height'], params['img']['width'], params['img']['n_channels']))

  unet = U_net(name='unet')
  unet_layer1 = unet(inputs1)
  unet_layer2 = unet(inputs2)
  concat_ = concatenate([unet_layer1, unet_layer2])
  outputs_conv = Conv2D(params['img']['n_channels'], (1, 1), activation='sigmoid')(concat_)

  model = Model(inputs=[inputs1, inputs2], outputs=outputs_conv)
  model.compile(optimizer='adam', loss='mae')
  model.summary()
  return model

### Dilated convolution

In [None]:
def dilated_conv():
  inputs = Input((params['img']['height'], params['img']['width'], params['n_past']+params['n_future']))

  dc = Conv2D(16, (3, 3), activation='relu', dilation_rate=(2,2), padding='same') (inputs)
  dc = Conv2D(32, (3, 3), activation='relu', dilation_rate=(2,2), padding='same') (dc)
  dc = Conv2D(64, (3, 3), activation='relu', dilation_rate=(2,2), padding='same') (dc)
  dc = Conv2D(128, (3, 3), activation='relu', dilation_rate=(2,2), padding='same') (dc)
  outputs_conv = Conv2D(1, (1, 1), activation='linear') (dc)

  model = Model(inputs=[inputs], outputs=[outputs_conv])
  model.compile(optimizer='adam', loss='mae', metrics=[SAD])
  model.summary()
  return model

def dilated_conv_lstm():
  inputs = Input((params['img']['width'], params['img']['height'], params['n_past']+params['n_future']))
  reshaped = Reshape((1, params['img']['width'], params['img']['height'], params['n_past']+params['n_future']))(inputs)

  dc = ConvLSTM2D(8, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=True) (reshaped)
  dc = ConvLSTM2D(16, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=True) (dc)
  dc = ConvLSTM2D(32, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=False) (dc)

  outputs_conv = Conv2D(1, (1, 1), activation='linear') (dc)

  model = Model(inputs=[inputs], outputs=[outputs_conv])
  model.compile(optimizer='adam', loss='mae', metrics=[SAD])
  model.summary()
  return model

def dilated_conv_lstm_bidirec():
  inputs = Input((params['img']['width'], params['img']['height'], params['n_past']+params['n_future']))
  reshaped = Reshape((1, params['img']['width'], params['img']['height'], params['n_past']+params['n_future']))(inputs)

  dc = Bidirectional(ConvLSTM2D(8, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=True)) (reshaped)
  dc = Bidirectional(ConvLSTM2D(16, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=True)) (dc)
  dc = Bidirectional(ConvLSTM2D(32, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=False)) (dc)

  outputs_conv = Conv2D(1, (1, 1), activation='linear') (dc)

  model = Model(inputs=[inputs], outputs=[outputs_conv])
  model.compile(optimizer='adam', loss='mae', metrics=[SAD])
  model.summary()
  return model

### Custom architecture

In [None]:
def dilated_conv_lstm_bidirec_and_unet():
  inputs1 = Input((params['img']['height'], params['img']['width'], params['img']['n_channels']))
  inputs2 = Input((params['img']['height'], params['img']['width'], params['img']['n_channels']))
  unet = U_net(name='unet')
  unet_layer1 = unet(inputs1)
  unet_layer2 = unet(inputs2)
  outputs_unet = concatenate([unet_layer1, unet_layer2])

  inputs = concatenate([inputs1, inputs2])
  reshaped = Reshape((1, params['img']['width'], params['img']['height'], params['n_past']+params['n_future']))(inputs)
  outputs_conv_lstm = Bidirectional(ConvLSTM2D(8, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=True)) (reshaped)
  outputs_conv_lstm = Bidirectional(ConvLSTM2D(16, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=True)) (outputs_conv_lstm)
  outputs_conv_lstm = Bidirectional(ConvLSTM2D(32, (3, 3), activation='relu', dilation_rate=(2,2), padding='same', return_sequences=False)) (outputs_conv_lstm)
  
  concat_ = concatenate([outputs_unet, outputs_conv_lstm])
  outputs = Conv2D(params['img']['n_channels'], (1, 1), activation='sigmoid') (concat_)

  model = Model(inputs=[inputs1, inputs2], outputs=outputs)
  model.compile(optimizer='adam', loss='mae', metrics=[SAD])
  model.summary()
  return model

model = dilated_conv_lstm_bidirec_and_unet()

## Create callbacks

In [9]:
logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

callbacks = [
    ModelCheckpoint(filepath="./dilated_conv_lstm_bidirec_and_unet.hdf5", monitor='val_loss', save_best_only=True, verbose=1),
    TensorBoard(logdir, histogram_freq=1)
]

## Instanciate data generators

In [None]:
from datagenerators import DataGeneratorDoubleInput, DataGeneratorDoubleInputDict, DataGeneratorSingleInput 

params_data_generator = {
    'batch_size' : params['batch_size'],
    'im_size' : (params['img']['height'], params['img']['width']),
    'frame_dist' : params['frame_dist'],
    'n_past' : params['n_past'],
    'n_future' : params['n_future'],
    'type_' : 'train'
}
training_generator = DataGeneratorDoubleInput(**params_data_generator)

params_data_generator = {
    'batch_size' : params['batch_size'],
    'im_size' : (params['img']['height'], params['img']['width']),
    'frame_dist' : params['frame_dist'],
    'n_past' : params['n_past'],
    'n_future' : params['n_future'],
    'type_' : 'valid'
}
validation_generator = DataGeneratorDoubleInput(**params_data_generator)

## Fit model

In [None]:
tf.keras.backend.clear_session()
hist = model.fit(
    training_generator,
    validation_data = validation_generator,
    epochs=10, 
    workers=30, 
    use_multiprocessing=True,
    callbacks=callbacks)

## Test model

In [None]:
model = dilated_conv_lstm_bidirec_and_unet()
model.load_weights('dilated_conv_lstm_bidirec_and_unet.hdf5')

In [None]:
batchX, batchY = validation_generator.__getitem__(2792)
pred = model.predict(batchX)

fig, axs = plt.subplots(nrows=2, ncols=3, figsize=(12,7))
axs[0,0].imshow(batchX[0][0, :, :, 0], cmap='gray')
axs[0,1].imshow(batchY[0, :, :, 0], cmap='gray')
axs[0,2].imshow(batchX[1][0, :, :, 0], cmap='gray')

axs[1,0].imshow(batchX[0][0, :, :, 0], cmap='gray')
axs[1,1].imshow(pred[0, :, :, 0], cmap='gray')
axs[1,2].imshow(batchX[1][0, :, :, 0], cmap='gray')

print('MAE - Batch', mean_absolute_error(batchY.reshape(params['batch_size'], 128*128), pred.reshape(params['batch_size'], 128*128)))

In [None]:
batchX, batchY = validation_generator.__getitem__(random.randint(0, len(training_generator)))
pred = model.predict(batchX)
mean = (batchX[0][:, :, :, 0]+batchX[1][:, :, :, 0])/2

fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(12,7))
axs[0].imshow(batchY[0, :, :, 0], cmap='gray')
axs[1].imshow(pred[0, :, :, 0], cmap='gray')
axs[2].imshow(mean[0, :, :], cmap='gray')

print('MAE - Pred', mean_absolute_error(batchY.reshape(params['batch_size'], 128*128), pred.reshape(params['batch_size'], 128*128)))
print('MAE - Mean', mean_absolute_error(batchY.reshape(params['batch_size'], 128*128), mean.reshape(params['batch_size'], 128*128)))

In [None]:
def model_naif_moyenne(X, y):
  return np.array([(x[0][:, :, :, 0]+x[1][:, :, :, 0])/2 for x  in X])

X, Y = [], []
for k in range(len(validation_generator)//5):
  batchX, batchY = validation_generator.__getitem__(k)
  X.append(batchX)
  Y.append(batchY)

pred_moyenne = model_naif_moyenne(X, Y).reshape((-1, params['img']['height']*params['img']['width']))
mae_model = model.evaluate([
    np.array([x[0][:, :, :, 0] for x  in X]).reshape((-1, params['img']['height'], params['img']['width'],1)), 
    np.array([x[1][:, :, :, 0] for x  in X]).reshape((-1, params['img']['height'], params['img']['width'],1))],

    np.array(Y).reshape((-1, params['img']['height'], params['img']['width'],1))
)[0]

Y = np.array(Y).reshape((-1, params['img']['height']* params['img']['width']))

print('MAE - Moyenne images', mean_absolute_error(Y, pred_moyenne))
print('MAE - Modele FCN', mae_model)