<a href="https://colab.research.google.com/github/milindsoorya/colab-notebooks/blob/main/VariationalAutoEncoders.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Using Fashion MNIST dataset. Train VAR to generate pictures of clothing

In [None]:
import tensorflow as tf
import os
import random
import numpy as np

In [None]:
SEED = 12345
os.environ['PYTHONHASHSEED']=str(SEED)
os.environ['TF_CUDNN_DETERMINISTIC']= '1'
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

## Exploring FashionMNIST dataset

In [None]:
from tensorflow.keras.datasets import fashion_mnist

(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

In [None]:
x_train.shape, y_train.shape, x_test.shape, y_test.shape

In [None]:
# Visualising the data
import numpy as np
from matplotlib import pyplot as plt

plt.figure(figsize=(9, 9))

# Choose 9 samples out of 60k available train set
randSamples = np.random.choice(60000, 9)

for i in range(9):
  plt.subplot(3, 3, i+1)
  plt.imshow(x_train[randSamples[i]], cmap='Greys_r')
  plt.axis('off')

plt.show()

In [None]:
# As we are not bothered with classification, we can combine the train and test data.
dataset = np.concatenate([x_train, x_test], axis=0)
# Add extra dimension as the convolution layer expects 3 channels, 28x28 --> 28x28x1
# Also normalising the value to [0, 1]
dataset = np.expand_dims(dataset, -1).astype("float32") / 255

In [None]:
# Custom sampling layer
from tensorflow import keras
from tensorflow.keras import layers

# Create a sampling layer
class SamplingLayer(layers.Layer):
  '''Reparameterization Trick z - mu + sigma * epsilon'''

  def call(self, inputs):
    zMean, zLogVar = inputs
    batch = tf.shape(zMean)[0]
    dim = tf.shape(zMean)[1]
    epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
    return zMean + tf.exp(0.5 * zLogVar) * epsilon

## Encoder / Decoder

In [None]:
def buildEncoder(latentDim, encoderInputs):

  # Given a block of images the convolutional block extract the features
  l1 = keras.models.Sequential([
    layers.Conv2D(128, 3, activation="relu", strides=2, padding="same"),
    layers.Conv2D(64, 3, activation="relu", strides=2, padding="same"),
    layers.Flatten(),
    layers.Dense(256, activation="relu")
  ])

  # Pass the inputs through the convolutional block
  x = l1(encoderInputs)

  # A dedicated layer to learn mean in parallel
  zMean = layers.Dense(latentDim, name="z_mean")(x)

  # S dedicated layer to learn variance in parallel
  zLogVar = layers.Dense(latentDim, name='z_log_var')(x)

  # Now the reparameterization trick to find z as defined by mean and variance
  z = SamplingLayer()([zMean, zLogVar])

  # The actual model which takes the imahes as input and returns mean, variance and distribution
  # The zMean and zLogVar are used in the KL divergence loss
  return keras.Model(encoderInputs, [zMean, zLogVar, z], name="encoder")

# Trigger the function to actually build the model
encoderInputs = keras.Input(shape=(28, 28, 1))
encoder = buildEncoder(2, encoderInputs)
encoder.summary()


### Implementing the decoder

In [None]:
def buildDecoder(latentInputs):

    l1 = keras.models.Sequential([
      layers.Dense(7*7*64, activation="relu", input_shape=(latentInputs.shape[1],)),
      layers.Reshape((7, 7, 64)),
      layers.Conv2DTranspose(128, 3, activation="relu", strides=2, padding="same"),
      layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same"),
      layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")
    ])
    
    return keras.Model(latentInputs, l1(latentInputs), name="decoder")

# Build the model
latentInputs = keras.Input(shape=(2,))
decoder = buildDecoder(latentInputs)
decoder.summary()

In [None]:
keras.utils.plot_model(encoder, "encoder.png", show_shapes=True)

In [None]:
keras.utils.plot_model(decoder, "decoder.png", show_shapes=True)

## Loss Functions

overall goal is to make the decoder generate image as close to the image fed into the auto encoder. 

**Reconstruction loss** 
- It penalises the images that are not similar to the original.

- It works by comparing the binary cross entropy.

**kl Divergence loss** 
- It is looking at the distance between the probability distribution from the generated and the original image. The aim is the make the distance as close as posssible

In [None]:
def reconstructionLoss(data, reconstructed):
  return tf.reduce_mean(
      tf.reduce_sum(
          keras.losses.binary_crossentropy(data, reconstructed),
          axis=(1, 2)
      )
  )

In [None]:
def klDivergenceLoss(zMean, zLogVar):
  return tf.reduce_mean(
      tf.reduce_sum(
         -0.5 * (1 + zLogVar -tf.square(zMean) - tf.exp(zLogVar)),
          axis=1
      )
  )

Now combine these into the total loss function, which just weights them and sums them up. Think pf that weight as another hyperparameter you can tune.

In [None]:
def calcTotalLoss(data, reconstructed, zMean, zLogVar):
  loss1 = reconstructionLoss(data, reconstructed)
  loss2 = klDivergenceLoss(zMean, zLogVar)
  klweight = 3.0
  return loss1, loss2, loss1 + klweight * loss2

## Overriding train_step

In [None]:
class VAE(keras.Model):
  def __init__(self, encoder, decoder, **kwargs):
      super(VAE, self).__init__(**kwargs)
      self.encoder = encoder
      self.decoder = decoder
      # Register total loss as an observable metric in the model training history
      self.totalLossTracker = keras.metrics.Mean(name="total_loss")
      self.ceLossTracker = keras.metrics.Mean(name="ce_loss")
      self.klLossTracker = keras.metrics.Mean(name="kl_loss")

  # These are all observable metrics
  @property
  def metrics(self):
    return [
            self.totalLossTracker,
            self.ceLossTracker,
            self.klLossTracker
    ]

  # Now calculate the loss, gradients and update the weights
  def train_step(self, data):
    # Gradient tape is a recording of all gradients for the trainable 
    # weights that need to be updated
    with tf.GradientTape() as tape:
      # forwards path
      zMean, zLogVar, z = self.encoder(data)
      reconstruction = self.decoder(z)
      ceLoss, klLoss, totalLoss = calcTotalLoss(data, reconstruction, zMean, zLogVar)
    # Backward path
    grads = tape.gradient(totalLoss, self.trainable_weights)
    self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

    # keep track of loss
    self.totalLossTracker.update_state(totalLoss)
    self.ceLossTracker.update_state(ceLoss)
    self.klLossTracker.update_state(klLoss)

    # Return the loss for history object
    return {
        "total_loss": self.totalLossTracker.result(),
        "ce_loss": self.ceLossTracker.result(),
        "kl_loss": self.klLossTracker.result()
    } 

### Train the VAE!

In [None]:
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001))
history = vae.fit(dataset, batch_size=128, epochs=32)

Epoch 1/32
Epoch 2/32
Epoch 3/32
Epoch 4/32
Epoch 5/32
Epoch 6/32
Epoch 7/32
Epoch 8/32
Epoch 9/32
Epoch 10/32

In [None]:
#First you access the learnt weights of the encoder and decoder from the VAE model and save them
vae.get_layer('encoder').save_weights('encoder_weights.h5')
vae.get_layer('decoder').save_weights('decoder_weights.h5')


#Since both encoder and decoder are treated as models, you also need to save their architecture defined via instantiated VAE model
vae.get_layer('encoder').save('encoder_arch') #If you are using Google Colab they are saved in a folders: decoder_arch
vae.get_layer('decoder').save('decoder_arch') #and encoder_arch.

In [None]:
######
#TIP#
#####

#If you are using Google Colab you can download the folders (such as decoder_arch and encoder_arch) by first zipping them and then 
#using the Google Colab functionality to download them (or do it manually ;))

from google.colab import files

!zip -r /content/decoder_arch.zip /content/decoder_arch
!zip -r /content/encoder_arch.zip /content/encoder_arch

from google.colab import files
files.download('/content/encoder_arch.zip')
files.download('/content/decoder_arch.zip')

In [None]:
#Lets load the model in new VAE and also the corresponding weights. Upload the folders and .hd5 files into Colab

encoder_new = keras.models.load_model('encoder_arch') #Loading the encoder model
decoder_new = keras.models.load_model('decoder_arch') #Loading the decoder model

vae_new = VAE(encoder_new, decoder_new) #You need to have VAE class defined for this to works
vae_new.get_layer('encoder').load_weights('encoder_weights.h5') #On a given encoder model defined by vae_new we want to load the weights
vae_new.get_layer('decoder').load_weights('decoder_weights.h5') #for encoder and decoder
vae_new.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001)) #Now we need to compile the model and we are ready to go!

In [None]:
plt.figure(figsize=(10, 9))
plt.plot(history.history.get('total_loss'), label="total loss")
plt.plot(history.history.get('ce_loss'), label="reconstruction loss")
plt.plot(history.history.get('kl_loss'), label="KL loss")
plt.legend()

In [None]:
plt.figure(figsize=(10, 9))
plt.plot(history.history.get('kl_loss'), label="KL loss")
plt.legend()

## Explore the results

In [None]:
synth = vae.decoder.predict([[1, 2]])
plt.imshow(synth[0].reshape((28, 28)), cmap="Greys_r")
plt.axis('off')

In [None]:
z = np.random.normal(loc=0, scale=4, size=(256, 2))
synth = vae.decoder.predict(z)

plt.figure(figsize=(28, 28))

for i in range(256):
  plt.subplot(16, 16, i+1)
  plt.imshow(synth[i].reshape((28, 28)), cmap="Greys_r")
  plt.axis('off')

plt.show()

## Drawing a specific kind of clothing

In [None]:
idx = 1200
batch = np.expand_dims(x_train[idx], axis=0)
batchOfImages = np.expand_dims(batch, axis=-1).astype("float32") / 255
print(batchOfImages.shape)

# Obtain z(mu, sigma) for the given image
_, _, z = vae.encoder.predict(batchOfImages)

# Now reconstruct a similar image
synth = vae.decoder.predict([z])

z

In [None]:
plt.figure(figsize=(28, 28))

# Original image
plt.subplot(1, 2, 1)
plt.axis('off')
plt.imshow(x_train[idx], cmap="Greys_r")

# Reconstructed
plt.subplot(1, 2, 2)
plt.axis('off')
plt.imshow(synth[0].reshape((28, 28)), cmap="Greys_r")
plt.show()

In [None]:
# VAE can be used for clusturing for large text or unlabelled images corpus
labels = np.concatenate([y_train, y_test], axis=0)
meu, _, _ = vae.encoder.predict(dataset)
plt.figure(figsize=(12, 10))
plt.scatter(meu[:, 0], meu[:, 1], c=labels)
plt.colorbar()
plt.xlabel("meu[0]")
plt.ylabel("meu[1]")
plt.show()

# Methods to save VAE Model

In [None]:
'''
# https://www.reddit.com/r/learnmachinelearning/comments/t4dbmb/how_to_save_vae_model_made_by_keras/
#First you access the learnt weights of the encoder and decoder from the VAE model and save them
your_vae_model.get_layer('encoder').save_weights('encoder_weights.h5')
your_vae_model.get_layer('decoder').save_weights('decoder_weights.h5')


#Since both encoder and decoder are treated as models, you also need to save their architecture defined via instantiated VAE model
your_vae_model.get_layer('encoder').save('encoder_arch') #If you are using Google Colab they are saved in a folders: decoder_arch
your_vae_model.get_layer('decoder').save('decoder_arch') #and encoder_arch.


######
#TIP#
#####

#If you are using Google Colab you can download the folders (such as decoder_arch and encoder_arch) by first zipping them and then 
#using the Google Colab functionality to download them (or do it manually ;))

from google.colab import files

!zip -r /content/decoder_arch.zip /content/decoder_arch
!zip -r /content/encoder_arch.zip /content/encoder_arch

from google.colab import files
files.download('/content/encoder_arch.zip')
files.download('/content/decoder_arch.zip')

#Lets load the model in new VAE and also the corresponding weights. Upload the folders and .hd5 files into Colab

encoder_new = keras.models.load_model('encoder_arch') #Loading the encoder model
decoder_new = keras.models.load_model('decoder_arch') #Loading the decoder model

vae_new = VAE(encoder_new, decoder_new) #You need to have VAE class defined for this to works
vae_new.get_layer('encoder').load_weights('encoder_weights.h5') #On a given encoder model defined by vae_new we want to load the weights
vae_new.get_layer('decoder').load_weights('decoder_weights.h5') #for encoder and decoder
vae_new.compile(optimizer=keras.optimizers.Adam()) #Now we need to compile the model and we are ready to go!

'''

In [None]:
'''
# loading dependency
import joblib

# saving our model # model - model , filename-model_jlib
joblib.dump(history.history, 'model_jlib')

# opening the file- model_jlib
vae = joblib.load('model_jlib')


# loading library
import pickle

# create an iterator object with write permission - model.pkl
with open('model_pkl', 'wb') as files:
    pickle.dump(vae, files)

# load saved model
with open('model_pkl' , 'rb') as f:
    vae = pickle.load(f)
'''