In [None]:
import tensorflow as tf
print(tf.__version__)

import keras
from keras.layers import Input, Dense, Conv2D, Reshape, BatchNormalization, LeakyReLU, Conv2DTranspose
from keras.layers import Flatten, Reshape, LSTM
from keras.models import Model, Sequential
from keras.datasets import mnist
from keras.losses import binary_crossentropy
from keras import backend as K
from keras.optimizers import RMSprop, Adam

import numpy as np
import matplotlib.pyplot as plt 
import cv2
import skimage
import imutils
%matplotlib inline

np.random.seed(42)

from imutils import face_utils
import numpy as np
import argparse
import imutils
import dlib
import cv2

In [None]:
data = np.load("/content/drive/MyDrive/stats402_data/human_envolving_colored_data_large_R_channel.npy")
frame_num = data.shape[0]

print(data.shape)

In [None]:
# show some sample frames

plt.figure(figsize=(20,10))
a, b =3, 4
for i in range(11):  
    plt.subplot(a, b, i+1)
    image = data[350*i]
    plt.imshow(image, cmap="gray")
    plt.axis('off')
    plt.title('frame: ' + str(350*i))
plt.show()

In [None]:
# split the training and testing data
threshold_1 = 3500
x_train = data[0:threshold_1]

# convert to float in [0,1]
x_train = x_train.astype('float32') / 255.

print(x_train.shape)

In [None]:
# function which samples a new point in the latent space based on the encoder result
class Sampling(keras.layers.Layer):
    def call(self, inputs):
        mean, log_var = inputs
        return K.random_normal(tf.shape(log_var)) * K.exp(log_var / 2) + mean

In [None]:
# building the encoder
latent_space = 8

inputs = Input(shape=[x_train.shape[1], x_train.shape[2], 1])

z = Conv2D(filters=64, kernel_size=4, strides=2, padding='same', activation="selu")(inputs)
z = BatchNormalization()(z)
z = LeakyReLU()(z)
z = Conv2D(filters=32, kernel_size=4, strides=2, padding='same', activation="selu")(z)
z = BatchNormalization()(z)
z = LeakyReLU()(z)
z = Conv2D(filters=16, kernel_size=4, strides=2, padding='same', activation="selu")(z)
z = BatchNormalization()(z)
z = LeakyReLU()(z)
z = Flatten()(z)


# the encoder network has two outputs which are the parameters for Gaussian distribution 
# in the Sampling function

codings_mean = Dense(latent_space)(z)
codings_log_var = Dense(latent_space)(z)

# use the Sampling function to obtain the point 'codings' in the latent space
codings = Sampling()([codings_mean, codings_log_var])
encoder = Model(inputs=[inputs], outputs=[codings_mean, codings_log_var, codings])

encoder.summary()

In [None]:
# the decoder mirrors the encoder

decoder_inputs = Input(shape=[latent_space])

x = Reshape([-1, 2, 2])(decoder_inputs)
x = Conv2DTranspose(filters=16, kernel_size=4, strides=1, padding='same', activation="selu")(x)
x = BatchNormalization()(x)
x = LeakyReLU()(x)
x = Conv2DTranspose(filters=32, kernel_size=4, strides=1, padding='same', activation="selu")(x)
x = BatchNormalization()(x)
x = LeakyReLU()(x)
x = Conv2DTranspose(filters=64, kernel_size=4, strides=1, padding='same', activation="selu")(x)
x = BatchNormalization()(x)
x = LeakyReLU()(x)
x = Flatten()(x)
x = Dense(x_train.shape[1] * x_train.shape[2], activation="sigmoid")(x)
outputs = keras.layers.Reshape([x_train.shape[1], x_train.shape[2], 1])(x)

decoder = keras.Model(inputs=[decoder_inputs], outputs=[outputs])

decoder.summary()

In [None]:
# combine the encoder and decoder to the variational autoencoder
# only codings, the sampled point in latent space, is passed through
_, _, codings = encoder(inputs)
reconstructions = decoder(codings)
encoder_decoder = keras.Model(inputs=[inputs], outputs=[reconstructions])

In [None]:
# we need to add the Kullback Leibler divergence to the loss function
latent_loss = -0.5 * K.sum(1 + codings_log_var - K.exp(codings_log_var) - K.square(codings_mean), axis=-1)
encoder_decoder.add_loss(K.mean(latent_loss) / 9216.)

# built the model
encoder_decoder.compile(loss='binary_crossentropy', optimizer=keras.optimizers.RMSprop(clipnorm=0.0001))
encoder_decoder.summary()

In [None]:
history = encoder_decoder.fit(x_train, x_train,
                epochs=20,
                batch_size=128)

In [None]:
# show the learning process
loss = history.history['loss']
epochs = range(len(loss))

plt.figure(figsize=(10,6))
plt.plot(epochs, loss, 'bo', label='Training')
plt.xlabel('Epoch', size=14)
plt.ylabel('Loss', size=14)
plt.legend()
plt.ylim([0,1])
plt.show()

In [None]:
# First, for comparison with the autoencoder example we simply feed random twodimensional vectors into the decoder.

random = np.random.uniform(-0.5, 0.5, latent_space*10)
random = random.reshape(10, latent_space)       # reshape to ten vectors
random_img = decoder.predict(random) # make predictions for those vectors

plt.figure(figsize=(10, 30))

for i in range(10):
    plt.subplot(10, 1, i + 1)
    plt.imshow(np.reshape(random_img[i], (x_train.shape[1], x_train.shape[2])), cmap="gray")
    plt.axis('off')

plt.show()


In [None]:
def reconstruct(sample):
    sample_reshape = np.reshape(sample, (1, sample.shape[0], sample.shape[1]))
    z_mean, _, _ = encoder.predict(sample_reshape)
    reconstruction = decoder.predict(z_mean)
    reconstruction = np.reshape(reconstruction, (reconstruction.shape[1], reconstruction.shape[2]))
    return reconstruction

In [None]:
# show some reconstruction result
interval = 350

plt.figure(figsize=(10,60))
for i in range(11):

    sample = data[i * interval]

    sample = sample.astype('float32') / 255.0

    plt.subplot(11, 2, 2*i+1)
    plt.imshow((reconstruct(sample) * 255).astype('uint8'), cmap="gray")
    plt.axis('off')
    plt.title("reconstruction")

    plt.subplot(11, 2, 2*i+2)
    plt.imshow((sample * 255).astype('uint8'), cmap="gray")
    plt.axis('off')
    plt.title("ground truth")

    

In [None]:
# plot the latent space representation versus time

z_mean, _, _ = encoder.predict(x_train)

x = np.arange(z_mean.shape[0])

plt.figure(figsize=(12, 20))

for i in range(8):
    plt.subplot(8, 1, i+1)
    plt.plot(x, z_mean[:, i], "b")
    plt.xlabel("time", size = 14)
    plt.ylabel("component #" + str(i+1), size = 14)

In [None]:
# save the trained model to file
decoder.save("/content/drive/MyDrive/stats402_data/gray_decoder_evolution_B_channel", save_format="h5")
decoder.save_weights("/content/drive/MyDrive/stats402_data/decoder_gray_model_evolution_B_channel_weights.h5")

# save the latent space representation to npy file
time_series = np.arange(0, z_mean.shape[0])
time_series = np.reshape(time_series, (z_mean.shape[0], 1))
latent_representation = np.concatenate((time_series, z_mean), axis=1)
print(latent_representation.shape)
np.save("/content/drive/MyDrive/stats402_data/latent_space_representation_evolution_R_channel.npy", latent_representation)