In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
# Run TensorFlow in graph (non-eager) mode.
# NOTE(review): presumably required because the loss returned by network_loss
# closes over the symbolic encoder tensors (mu_e, var_e) — confirm.
tf.compat.v1.disable_eager_execution()

# Loading the data
The data is loaded and then normalized to values between 0 and 1. MNIST images are already single-channel (grayscale), so we only need to add an explicit channel dimension.

In [None]:
from keras.datasets import mnist
# Load MNIST as (images, labels) pairs for the training and test splits.
(x_train,y_train),(x_test,y_test)=mnist.load_data()

In [None]:
# Add a trailing channel axis and scale the pixel values to [0, 1].
# The arrays are overwritten in place, so the ndim guards make this cell safe to
# re-run: without them a second run would add another axis and divide by 255 again.
if x_train.ndim == 3:
    x_train = np.expand_dims(x_train, axis=-1).astype('float32') / 255
if x_test.ndim == 3:
    x_test = np.expand_dims(x_test, axis=-1).astype('float32') / 255
y_train = y_train.astype('float32')

print(x_train.shape)

# Variational Auto Encoder (VAE)
The following link provides a good high-level explanation of VAEs: [VAE explanation](https://towardsdatascience.com/understanding-variational-autoencoders-vaes-f70510919f73) <br>




In [None]:
from keras.models import Model,Sequential
from keras.layers import Input,Dense,Conv2D,Flatten,ReLU,Dropout,UpSampling2D,Embedding,Concatenate,Conv2DTranspose
from keras.layers import Reshape,MaxPool2D
#from keras.layers import LeakyReLU
from keras.optimizers import Adam
from keras import backend as bk 
from keras.layers import Lambda

# Encoder
A variational autoencoder consists of two parts, the encoder and the decoder.<br>

The general flow of a VAE is as follows:<br>
$$
x \rightarrow p(z|x) \rightarrow z \rightarrow p(x|z) \rightarrow \hat{x},
$$
where x is the input data, z is the latent space, p(z|x) is the encoder, and p(x|z) is the decoder.<br>

In the current code, the encoder is a deep neural network that takes the input data and outputs the mean ($\mu$) and the log of the variance ($\log\sigma^2$) of the latent distribution. To pass a latent vector to the decoder we have to draw a sample from this distribution, which is done with the reparameterization trick:<br>

$$
Z = \mu + \sigma \cdot \epsilon,
$$
where  
$$
\epsilon \sim N(0,1).
$$

The reparameterization trick moves the randomness into $\epsilon$, so the sample is a differentiable function of $\mu$ and $\sigma$; this is what allows the model to be trained with backpropagation.<br>

In [None]:
def sample(inp):
  """Draw a latent sample with the reparameterization trick.

  `inp` is a (mu, var) pair of 2-D tensors; `var` is treated as a
  log-variance, since it is halved and exponentiated to scale the noise.
  Returns mu + exp(var / 2) * eps with eps drawn from a standard normal.
  """
  mean, log_var = inp
  dims = bk.shape(mean)
  noise = bk.random_normal(shape=(dims[0], dims[1]))
  return mean + bk.exp(log_var / 2) * noise

In [None]:
# Encoder: 28x28x1 image -> (mu, log-variance, sampled z) in a 2-D latent space.
network_input = Input(shape=(28, 28, 1))

# Settings shared by every convolution in the stack.
conv_opts = dict(activation='relu', kernel_initializer='he_uniform', padding='same')

# Two stride-2 convolutions followed by one stride-1 convolution.
encoder = network_input
for n_filters, step in ((16, (2, 2)), (32, (2, 2)), (64, (1, 1))):
    encoder = Conv2D(n_filters, (4, 4), strides=step, **conv_opts)(encoder)

flat = Flatten()(encoder)
encoder = Dense(32, activation='relu')(flat)

# Two heads of size 2 (the latent space dimension).
mu_e = Dense(2)(encoder)
var_e = Dense(2)(encoder)

# Sampling layer built on the reparameterization helper.
z = Lambda(sample, output_shape=(2,))([mu_e, var_e])

encoder_network = Model(network_input, [mu_e, var_e, z])


In [None]:
# Print the layer-by-layer summary of the encoder model.
encoder_network.summary()

# Decoder

In [None]:
# Decoder: 2-D latent vector -> 28x28x1 image.
decoder_inp = Input(shape=(2,))

# Project the latent vector and reshape it into a 7x7x128 feature map.
projected = Dense(7 * 7 * 128, activation='relu')(decoder_inp)
decoder = Reshape((7, 7, 128))(projected)

# Settings shared by every transposed convolution.
deconv_opts = dict(activation='relu', kernel_initializer='he_uniform', padding='same')

# Two stride-2 layers (7 -> 14 -> 28) followed by two stride-1 layers.
for n_filters, step in ((64, (2, 2)), (32, (2, 2)), (16, (1, 1)), (1, (1, 1))):
    decoder = Conv2DTranspose(n_filters, (4, 4), strides=step, **deconv_opts)(decoder)

decoder_network = Model(decoder_inp, decoder)

In [None]:
# Print the layer-by-layer summary of the decoder model.
decoder_network.summary()

## End To End Network

In [None]:
# Chain the two sub-models: element 2 of the encoder outputs is the sampled z,
# which the decoder turns back into an image.
encod = encoder_network(network_input)[2]
decod = decoder_network(encod)
vae = Model(network_input,decod)

In [None]:
# Print the summary of the end-to-end model.
vae.summary()

## Loss function

$$
\text{Final loss} = \text{Reconstruction loss} + \text{Regularization term (KL divergence loss)}
$$
Regularization term is the KL divergence loss between the distribution of the latent space and the standard normal distribution. This term is used to ensure that the distribution of the latent space is close to a standard normal distribution. This is done to ensure that the latent space is continuous and smooth, which is important for generating new data points.<br>
$$
D_{KL}(N(\mu, \sigma^2) || N(0,1))= -\frac{1}{2} \sum_{j=1}^{d} (1 + \log(\sigma_j^2) - \mu_j^2 - \sigma_j^2),
$$
where $d$ is the dimension of the latent space.

For the reconstruction loss, we will use the squared error between $x$ and $\hat{x}$, summed over the $N = 28 \times 28$ pixels of an image (the code computes the mean squared error and multiplies it by $28 \cdot 28$).<br>

$$
\text{Final loss} = \sum_{i=1}^{N} (x_i - \hat{x}_i)^2 + D_{KL}(N(\mu, \sigma^2) || N(0,1))
$$

In [None]:
def network_loss(mu,var):
  """Build the loss function passed to `compile` for the (C)VAE.

  Args:
    mu: encoder output tensor with the latent means.
    var: encoder output tensor with the latent log-variances (it is
      exponentiated below to obtain the variance).

  Returns:
    A `final_loss(y_true, y_pred)` callable: reconstruction term plus KL term.
  """
  def normal_ae_loss(y_true,y_pred):
    # Mean squared error rescaled by the 28*28 pixel count, i.e. the
    # per-image sum of squared errors averaged over the batch.
    return tf.reduce_mean(tf.keras.metrics.mse(y_true,y_pred))*28*28
  def kl_divergence(mu,var):
    # KL(N(mu, exp(var)) || N(0, 1)): summed over the latent dimensions
    # (last axis), then averaged over the batch. Averaging over the latent
    # axis as well would under-weight this term relative to the formula.
    kl_per_sample = 0.5*tf.reduce_sum(-1-var+tf.square(mu)+tf.exp(var),axis=-1)
    return tf.reduce_mean(kl_per_sample)
  def final_loss(y_true,y_pred):
    return bk.mean(normal_ae_loss(y_true, y_pred) + kl_divergence(mu,var))

  return final_loss

## Training

In [None]:
# `learning_rate` is the supported keyword; `lr` is a deprecated alias.
opt = Adam(learning_rate=0.0002, beta_1=0.5)
# The loss closes over the encoder's mean / log-variance tensors for the KL term.
vae.compile(loss=network_loss(mu_e, var_e), optimizer=opt)

The following code displays reconstructed test images and then trains for 10 more epochs, six times in a row. It can be used to observe how the reconstructions evolve during training.

In [None]:
# Alternate between showing reconstructions and training for 10 more epochs.
# Figure j is drawn before the j-th training round, so its title (j*10) is the
# number of epochs this loop has run so far.
for j in range(6):
  # One batched forward pass over the 9 displayed test images
  # instead of 9 separate single-image predict() calls.
  outputs = vae.predict(x_test[500:509])
  for i in range(9):
    plt.subplot(331+i)
    op_image = np.reshape(outputs[i]*255, (28, 28))
    plt.imshow(op_image, cmap='gray')
    plt.title(j*10)
  plt.show()
  # Train for 10 more epochs before the next round of plots.
  vae.fit(x_train, x_train, epochs=10, batch_size=64, validation_split = 0.2)

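The following code trains for 50 epochs and keeps the training history for the loss graph. Note that it continues from the current weights: to get the curve from epoch 0 to epoch 50, run it on a freshly built model instead of after the cell above.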


In [None]:
history = vae.fit(x_train, x_train, epochs=50, batch_size=64, validation_split = 0.2) # This run is used to plot the loss curve; the previous cell is used to draw the decoder output every 10 epochs

In [None]:
# Show the nine original test digits at indices 500-508 (the same indices the
# reconstruction cells display) on a 3x3 grid.
for offset, image in enumerate(x_test[500:509]):
    plt.subplot(3, 3, offset + 1)
    plt.imshow(image[:, :, -1], cmap='gray')
plt.show()

## Scatter plot

In [None]:
import pandas as pd
import seaborn as sns
def my_scatterplot(x_test,y_test):
  """Scatter-plot the sampled 2-D latent codes of `x_test`, coloured by label.

  Args:
    x_test: batch of images accepted by `encoder_network`.
    y_test: one label per image; converted to str for the hue legend.

  Returns:
    (x, y): lists with the first and second latent coordinate of every image.
  """
  # Encode the whole input in a single batched call (element 2 of the encoder
  # outputs is the sampled z) rather than one predict() call per image, and
  # cover every sample instead of a hard-coded 100*100.
  latent_space = encoder_network.predict(np.asarray(x_test))[2]
  x = list(latent_space[:, 0])
  y = list(latent_space[:, 1])

  datam = pd.DataFrame({
      'x': x,
      'y': y,
      'z': [str(label) for label in y_test[:len(x)]],  # real digit of each input
  })

  plt.figure(figsize=(8,8))
  sns.scatterplot(x='x', y='y', hue='z', data=datam)
  plt.show()
  return x,y

In [None]:
# Plot the latent codes of the test set and keep both coordinate lists.
z1,z2 = my_scatterplot(x_test,y_test)

In [None]:
# Observed range (max, then min) of each latent coordinate.
print(max(z1), min(z1), sep="\n")
print("___________")
print(max(z2), min(z2), sep="\n")

## output of VAE Network

In [None]:
# Decode a regular 30x30 grid of latent points and tile the 28x28 outputs
# into one large image.
x_values = np.linspace(-3, 3, 30)
y_values = np.linspace(-3, 3, 30)

# All grid points as one (900, 2) batch, ordered x-major, so the decoder is
# called once instead of once per grid point.
grid_x, grid_y = np.meshgrid(x_values, y_values, indexing='ij')
latent_points = np.column_stack([grid_x.ravel(), grid_y.ravel()])
network_out = decoder_network.predict(latent_points)[:, :, :, -1]

# (ix, iy, row, col) -> (ix, row, iy, col): x_values run down the rows of the
# mosaic and y_values run across its columns.
finalimg = (network_out
            .reshape(len(x_values), len(y_values), 28, 28)
            .transpose(0, 2, 1, 3)
            .reshape(28 * len(x_values), 28 * len(y_values)))

plt.figure(figsize=(15, 15))
# extent is (left, right, bottom, top). Columns follow y_values in ascending
# order, so the left edge must carry the smallest value; rows follow x_values
# in ascending order starting from the top.
plt.imshow(finalimg, cmap='gray',
           extent=[y_values[0], y_values[-1], x_values[-1], x_values[0]])
plt.xlabel('z[1]')
plt.ylabel('z[0]')
plt.show()

## loss plot

In [None]:
def plot_loss(history):
  """Plot training and validation loss per epoch.

  `history` must expose a `.history` mapping with 'loss' and 'val_loss' entries.
  """
  curves = (('loss', 'train_loss'), ('val_loss', 'val_loss'))
  for key, legend_name in curves:
    plt.plot(history.history[key], label=legend_name)
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.legend()
  plt.grid()

In [None]:
# Plot the loss curves recorded in `history`.
plot_loss(history)

# Conditional Variational Auto Encoder (CVAE)

This network is similar to the previous network, with the difference that the labels are also given as input to the network.

In [None]:
from keras.utils import to_categorical

# One-hot encode the digit labels for the conditional model (10 classes).
# The labels are overwritten in place, so the ndim guards keep this cell safe
# to re-run: encoding an already one-hot array would give a wrong result.
if y_train.ndim == 1:
    y_train = to_categorical(y_train, num_classes=10).astype('float32')
if y_test.ndim == 1:
    # Keep the integer labels under `y_test2` for later plotting by digit.
    y_test2 = y_test.copy()
    y_test = to_categorical(y_test, num_classes=10).astype('float32')

## Encoder & End-To-End Network

In [None]:
# `concatenate` is exported by keras.layers; the keras.layers.merge module
# path is not importable in every Keras release.
from keras.layers import concatenate

# Turn the 10-value label vector into an extra 28x28 image channel.
label_input = Input(shape=(10,))
li = Dense(28*28*1)(label_input)
li = Reshape((28,28,1))(li)

image_input = Input(shape=(28,28,1))

# Encoder input: the image and the label channel joined by `concatenate`.
network_input = concatenate([image_input,li])

encoder = Conv2D(16, (4,4),strides=(2,2),activation='relu',kernel_initializer='he_uniform',  padding='same')(network_input)
encoder = Conv2D(32, (4,4),strides=(2,2),activation='relu',kernel_initializer='he_uniform',  padding='same')(encoder)
encoder = Conv2D(64, (4,4),activation='relu',kernel_initializer='he_uniform',  padding='same')(encoder)

encoder = Flatten()(encoder)
encoder = Dense(32 , activation='relu')(encoder)

mu_e = Dense(2)(encoder) # 2 is the latent space dimension
var_e = Dense(2)(encoder)

z = Lambda(sample,output_shape=(2,))([mu_e,var_e])
# Decoder input: sampled latent vector followed by the label (2 + 10 values).
decoder_input = concatenate([z, label_input])

# The decoder layers are created once and kept in variables so that the
# same layer objects can be applied to other inputs as well.
decoder_h1 = Dense(7*7*128,activation='relu')
decoder_h2 = Reshape((7,7,128))

decoder_h3 = Conv2DTranspose(64,(4,4),activation='relu',strides=(2,2),kernel_initializer='he_uniform',padding='same')
decoder_h4 =Conv2DTranspose(32,(4,4),strides=(2,2),activation='relu',kernel_initializer='he_uniform',padding='same')
decoder_h5 =Conv2DTranspose(16,(4,4),activation='relu',kernel_initializer='he_uniform',padding='same')
decoder_h6  = Conv2DTranspose(1,(4,4),activation='relu',kernel_initializer='he_uniform',padding='same')

h = decoder_h1(decoder_input)
h = decoder_h2(h)
h = decoder_h3(h)
h = decoder_h4(h)
h = decoder_h5(h)
out = decoder_h6(h)


# The encoder model exposes only the sampled z; the CVAE maps (image, label) to the decoded image.
encoder_network = Model([image_input,label_input],z)
cvae = Model([image_input,label_input],out)


## Decoder Network

In [None]:
# Stand-alone decoder built from the same layer objects as the CVAE.
# Its 12 inputs are the 2 latent values followed by the 10 label values.
decoder_in = Input(shape=(12,))
d_1 = decoder_in
for shared_layer in (decoder_h1, decoder_h2, decoder_h3, decoder_h4, decoder_h5):
    d_1 = shared_layer(d_1)
decoder_out = decoder_h6(d_1)
decoder_network = Model(decoder_in, decoder_out)

## Network Summary

In [None]:
# Print the summary of the stand-alone CVAE decoder.
decoder_network.summary()

In [None]:
# Print the summary of the CVAE encoder.
encoder_network.summary()

In [None]:
# Print the summary of the end-to-end CVAE.
cvae.summary()

## Training

In [None]:
# `learning_rate` is the supported keyword; `lr` is a deprecated alias.
opt = Adam(learning_rate=0.0002, beta_1=0.5)
# The loss closes over the CVAE encoder's mean / log-variance tensors for the KL term.
cvae.compile(loss=network_loss(mu_e, var_e), optimizer=opt)

In [None]:
# Train the CVAE for 50 epochs on (image, one-hot label) -> image, holding out 20% for validation.
cvae_history = cvae.fit([x_train,y_train], x_train, epochs=50, batch_size=64, validation_split = 0.2)

In [None]:
# Plot the loss curves recorded in `cvae_history`.
plot_loss(cvae_history)

In [None]:
# Alternate between showing CVAE reconstructions and training for 10 more epochs.
# The title (j*10) counts only the epochs run by this loop.
for j in range(6):
  # Only the 9 displayed samples are needed, so predict just that slice
  # instead of the whole test set.
  output = cvae.predict([x_test[500:509], y_test[500:509]])
  for i in range(9):
    plt.subplot(331+i)
    op_image = np.reshape(output[i]*255, (28, 28))
    plt.imshow(op_image, cmap='gray')
    plt.title(j*10)
  plt.show()
  # Train for 10 more epochs before the next round of plots.
  cvae.fit([x_train,y_train], x_train, epochs=10, batch_size=64, validation_split = 0.2)

## Scatter plot

In [None]:
import pandas as pd
import seaborn as sns

# Sampled latent codes for the whole test set, one row per image.
latent_space = encoder_network.predict([x_test, y_test])
latent_space = latent_space.reshape(x_test.shape[0], 2)
x = latent_space[:, 0]
y = latent_space[:, 1]

# Derive the digit of each sample directly from the one-hot y_test rather than
# relying on a separate `y_test2` array, and cover every sample instead of a
# hard-coded 100*100.
digits = np.argmax(y_test, axis=1)
zz = [str(digit) for digit in digits]

datam = pd.DataFrame({'x': x, 'y': y, 'z': zz})
plt.figure(figsize=(8,8))
sns.scatterplot(x='x', y='y', hue='z', data=datam)
plt.show()

## Network Output

In [None]:
# Decode a 30x30 grid of latent points for one fixed digit and tile the
# 28x28 outputs into one large image.
number = 4
x_values = np.linspace(-3, 3, 30)
y_values = np.linspace(-3, 3, 30)

# Decoder input layout: [z0, z1, one-hot digit (10 values)]. All grid points
# are stacked into one (900, 12) batch, ordered x-major, so the decoder is
# called once instead of once per grid point.
grid_x, grid_y = np.meshgrid(x_values, y_values, indexing='ij')
c = np.zeros((grid_x.size, 12))
c[:, 0] = grid_x.ravel()
c[:, 1] = grid_y.ravel()
c[:, 2+number] = 1

network_out = decoder_network.predict(c)[:, :, :, -1]

# (ix, iy, row, col) -> (ix, row, iy, col): x_values run down the rows of the
# mosaic and y_values run across its columns.
finalimg = (network_out
            .reshape(len(x_values), len(y_values), 28, 28)
            .transpose(0, 2, 1, 3)
            .reshape(28 * len(x_values), 28 * len(y_values)))

plt.figure(figsize=(15, 15))
# extent is (left, right, bottom, top). Columns follow y_values in ascending
# order, so the left edge must carry the smallest value; rows follow x_values
# in ascending order starting from the top.
plt.imshow(finalimg, cmap='gray',
           extent=[y_values[0], y_values[-1], x_values[-1], x_values[0]])
plt.xlabel('z[1]')
plt.ylabel('z[0]')
plt.show()