In [4]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, Conv2DTranspose, Reshape, Lambda, Activation, BatchNormalization, LeakyReLU, Dropout
from tensorflow.keras.models import Model
from keras import backend as K
from keras.utils.vis_utils import plot_model
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
#for dirname, _, filenames in os.walk('/kaggle/input'):
    #for filename in filenames:
        #print(os.path.join(dirname, filename))
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
INPUT_DIM = (128,128,3)
BATCH_SIZE = 512
Z_DIM = 128

In [5]:
data_flow_test =ImageDataGenerator(rescale=1./255).flow_from_directory('../input/testcleb/testcleb', 
                                                                   target_size = INPUT_DIM[:2],
                                                                   batch_size = 1500,
                                                                   shuffle = True,
                                                                   class_mode = 'input',
                                                                   subset = 'training'
                                                                   )

In [6]:
data_flow=ImageDataGenerator(rescale=1./255).flow_from_directory('../input/cleberity1/traincleb/traincleb', 
                                                                   target_size = INPUT_DIM[:2],
                                                                   batch_size = BATCH_SIZE,
                                                                   shuffle = True,
                                                                   class_mode = 'input',
                                                                   subset = 'training'
                                                                   )

In [17]:
from tensorflow.keras import initializers


# ENCODER
def build_vae_encoder(input_dim, output_dim, conv_filters, conv_kernel_size, 
                  conv_strides, use_batch_norm = False, use_dropout = False):
  
  global K
  K.clear_session()
  

  n_layers = len(conv_filters)
  encoder_input = Input(shape = input_dim, name = 'encoder_input',)
  x = encoder_input

  for i in range(n_layers):
      x = Conv2D(filters = conv_filters[i], 
                  kernel_size = conv_kernel_size[i],
                  strides = conv_strides[i], 
                  kernel_initializer="he_normal",
                  padding = 'same',
                  name = 'encoder_conv_' + str(i)
                  )(x)
      if use_batch_norm:
        x = BathcNormalization()(x)
  
      x = LeakyReLU()(x)

      if use_dropout:
        x = Dropout(rate=0.25)(x)
        
  shape_before_flattening = K.int_shape(x)[1:] 
  
  x = Flatten()(x)
  
  mean_mu = Dense(output_dim, name = 'mu',kernel_initializer="he_normal")(x)
  log_var = Dense(output_dim, name = 'log_var',kernel_initializer="he_normal")(x)

  
  def sampling(args):
    mean_mu, log_var = args
    epsilon = K.random_normal(shape=K.shape(mean_mu), mean=0., stddev=1.) 
    return mean_mu + K.exp(log_var/2)*epsilon   
  
  
  encoder_output = Lambda(sampling, name='encoder_output')([mean_mu, log_var])

  return encoder_input, encoder_output, mean_mu, log_var, shape_before_flattening, Model(encoder_input, encoder_output)

In [18]:
vae_encoder_input, vae_encoder_output,  mean_mu,log_var,  shape_before_flattening, vae_encoder  = build_vae_encoder(input_dim = INPUT_DIM,
                                    output_dim = Z_DIM, 
                                    conv_filters = [32, 64, 64,64,64],
                                    conv_kernel_size = [3,3,3,3,3],
                                    conv_strides = [2,2,2,2,2])

vae_encoder.summary()

In [19]:
# Decoder
def build_decoder(input_dim, shape_before_flattening, conv_filters, conv_kernel_size, 
                  conv_strides):

  
  n_layers = len(conv_filters)
  decoder_input = Input(shape = (input_dim,) , name = 'decoder_input')

  x = Dense(np.prod(shape_before_flattening))(decoder_input)
  x = Reshape(shape_before_flattening)(x)


  for i in range(n_layers):
      x = Conv2DTranspose(filters = conv_filters[i], 
                  kernel_size = conv_kernel_size[i],
                  kernel_initializer="he_normal",
                  strides = conv_strides[i], 
                  padding = 'same',
                  name = 'decoder_conv_' + str(i)
                  )(x)
      
      
      if i < n_layers - 1:
        x = LeakyReLU()(x)
      else:
        x = Activation('sigmoid')(x)
  decoder_output = x

  return decoder_input, decoder_output, Model(decoder_input, decoder_output)

In [20]:
decoder_input, decoder_output, vae_decoder = build_decoder(input_dim = Z_DIM,
                                        shape_before_flattening = shape_before_flattening,
                                        conv_filters = [64,64,64,32,3],
                                        conv_kernel_size = [3,3,3,3,3],
                                        conv_strides = [2,2,2,2,2]
                                        )
vae_decoder.summary()

In [21]:
from tensorflow.python.framework.ops import disable_eager_execution
disable_eager_execution()

In [22]:

vae_input = vae_encoder_input


vae_output = vae_decoder(vae_encoder_output)


vae_model = Model(vae_input, vae_output)

vae_model.summary()

In [23]:
LEARNING_RATE = 0.001
N_EPOCHS = 10
LOSS_FACTOR = 10000

def r_loss(y_true, y_pred):
    return K.mean(K.square(y_true - y_pred), axis = [1,2,3])

def kl_loss(y_true, y_pred):
    kl_loss =  -0.5 * K.sum(1 + log_var - K.square(mean_mu) - K.exp(log_var), axis = 1)
    return kl_loss

def total_loss(y_true, y_pred):
    return LOSS_FACTOR*r_loss(y_true, y_pred) + kl_loss(y_true, y_pred)
  
def L1_loss(y_true,y_pre): 
    return K.sum(K.abs(y_true-y_pre))

def L2_loss(y_true,y_pre):
    return K.sum(K.square(y_true-y_pre), axis = [1,2,3])


In [24]:
from keras.optimizers import adam_v2
adam_optimizer = adam_v2.Adam(learning_rate=LEARNING_RATE, beta_1=0.9, beta_2=0.999, epsilon=1e-08)

vae_model.compile(optimizer=adam_optimizer, loss = L2_loss,metrics=[L2_loss]) # loss = L1_loss

In [None]:
NUM_IMAGES= 17902
vae_model.fit_generator(data_flow, 
                        shuffle=True, 
                        epochs = N_EPOCHS, 
                        initial_epoch = 0, 
                        steps_per_epoch=NUM_IMAGES / BATCH_SIZE
                        )

In [None]:
example_batch = next(data_flow_test)
example_batch = example_batch[0]
example_images = example_batch[:10]

images = example_batch[:10]
reconst_images = vae_model.predict(images)
print(example_batch.shape)

In [None]:
def plot_compare_vae(images=None):
  
  if images is None:
    example_batch = next(data_flow_test)
    example_batch = example_batch[0]
    images = example_batch[:10]

  n_to_show = images.shape[0]
  reconst_images = vae_model.predict(images)

  fig = plt.figure(figsize=(15, 3))
  fig.subplots_adjust(hspace=0.4, wspace=0.4)

  for i in range(n_to_show):
      img = images[i].squeeze()
      sub = fig.add_subplot(2, n_to_show, i+1)
      sub.axis('off')        
      sub.imshow(img)

  for i in range(n_to_show):
      img = reconst_images[i].squeeze()
      sub = fig.add_subplot(2, n_to_show, i+n_to_show+1)
      sub.axis('off')
      sub.imshow(img)

In [None]:
import matplotlib.pyplot as plt
plot_compare_vae(images = example_images)

In [None]:
def vae_generate_images(n_to_show=10):
  reconst_images = vae_decoder.predict(np.random.normal(0,1,size=(n_to_show,Z_DIM)))

  fig = plt.figure(figsize=(15, 3))
  fig.subplots_adjust(hspace=0.4, wspace=0.4)

  for i in range(n_to_show):
        img = reconst_images[i].squeeze()
        sub = fig.add_subplot(2, n_to_show, i+1)
        sub.axis('off')        
        sub.imshow(img)
        
vae_generate_images(n_to_show=10)

In [None]:
%%writefile utils.py
import numpy as np
from numpy import cov
from numpy import trace
from numpy import iscomplexobj
from numpy import asarray
from numpy.random import randint
from scipy.linalg import sqrtm
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.inception_v3 import preprocess_input
from tensorflow.keras.datasets.mnist import load_data
from tensorflow.keras.models import load_model
from skimage.transform import resize
import math

def scale_images(images, new_shape):
	images_list = list()
	for image in images:
		# resize with nearest neighbor interpolation
		new_image = resize(image, new_shape, 0)
		# store
		images_list.append(new_image)
	return asarray(images_list)
 
# calculate frechet inception distance
def calculate_fid(model, images1, images2):
	# calculate activations
	act1 = model.predict(images1)
	act2 = model.predict(images2)
	# calculate mean and covariance statistics
	mu1, sigma1 = act1.mean(axis=0), cov(act1, rowvar=False)
	mu2, sigma2 = act2.mean(axis=0), cov(act2, rowvar=False)
	# calculate sum squared difference between means
	ssdiff = np.sum((mu1 - mu2)**2.0)
	# calculate sqrt of product between cov
	covmean = sqrtm(sigma1.dot(sigma2))
	# check and correct imaginary numbers from sqrt
	if iscomplexobj(covmean):
		covmean = covmean.real
	# calculate score
	fid = ssdiff + trace(sigma1 + sigma2 - 2.0 * covmean)
	return fid
 
# prepare the inception v3 model

def getFID(real_images, fake_images):
	'''
	real_images : your real images B,W,H,C
	fake_images : your fake images B,W,H,C
	

	'''
	
	model = InceptionV3(include_top=False, pooling='avg', input_shape=(299,299,3))

	assert real_images.shape == fake_images.shape, ' Images should have identical shape !!'
	
	print('YOUR input size is ', real_images.shape, fake_images.shape)
	# convert integer to floating point values
	images1 = real_images.astype('float32')
	images2 = fake_images.astype('float32')
	# resize images
	images1 = scale_images(images1, (299,299,3))
	images2 = scale_images(images2, (299,299,3))
	print('Scaled', images1.shape, images2.shape)
	# pre-process images
	images1 = preprocess_input(images1)
	images2 = preprocess_input(images2)

	# fid between images1 and images2
	fid = calculate_fid(model, images1, images2) * 10
	print('FID of your Fake Output: %.3f' % fid)

if __name__ == '__main__':
	images1 = randint(0, 255, 10*32*32*3)
	images1 = images1.reshape((10,32,32,3))
	images2 = randint(0, 255, 10*32*32*3)
	images2 = images2.reshape((10,32,32,3))
	getFID(images1,images2)

In [None]:
import pytorch

In [None]:
from utils import getFID

In [None]:
getFID(images, reconst_images)

In [None]:
generated_image= vae_decoder.predict(np.random.normal(0,1,size=(1500,Z_DIM)))
getFID(example_batch, generated_image)