<a href="https://colab.research.google.com/github/srijayjk/Computer-Vision/blob/main/Ziess(VAE%2BGradCAM).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [57]:
!pip install -U -q kaggle
!mkdir -p ~/.kaggle

In [None]:
# Upload kaggle.json (the Kaggle API credentials file) from the local machine.
from google.colab import files

# Bind the result instead of leaving the call as the cell's last expression:
# the returned mapping contains the uploaded file contents, so echoing it
# would store the API key in the notebook's saved output.
uploaded_files = files.upload()
print("Uploaded:", ", ".join(uploaded_files))

In [None]:
!cp kaggle.json ~/.kaggle/
!kaggle datasets download -d jessicali9530/celeba-dataset

In [75]:
# Unpack the archive downloaded from Kaggle
from zipfile import ZipFile

with ZipFile('celeba-dataset.zip', mode='r') as celeba_archive:
    # Every member of the archive ends up under the data directory
    celeba_archive.extractall(path='./data/')

In [None]:
# Import necessary libraries and Frameworks
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, Conv2DTranspose, Reshape, Lambda, Activation, LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
import os
from glob import glob
import cv2 
import matplotlib.pyplot as plt


In [137]:
# Report the NumPy and TensorFlow versions in use, one per line
for library in (np, tf):
    print(library.__version__)



1.19.5
2.5.0


In [138]:
# Count the .jpg files found one directory level below the data folder
DATA_FOLDER = './data/img_align_celeba/'
image_pattern = os.path.join(DATA_FOLDER, '*/*.jpg')
filenames = np.array(glob(image_pattern))
NUM_IMAGES = len(filenames)
print("Total number of images : " + str(NUM_IMAGES))

Total number of images : 202599


In [None]:
# Preview a single sample image and report its size.
images_in_row = 1
images_in_column = 1
# Derived from DATA_FOLDER rather than a Colab-only absolute path, so the
# cell follows wherever the archive was extracted.
base_directory = os.path.join(DATA_FOLDER, 'img_align_celeba')

# Only the first ten directory entries are needed for the preview.
paths_to_images = [os.path.join(base_directory, filename)
                   for filename in os.listdir(base_directory)[:10]]

image = cv2.imread(paths_to_images[5])
# cv2.imread returns None for an unreadable file instead of raising; fail
# here with a clear message rather than inside cvtColor.
if image is None:
    raise FileNotFoundError("Could not read image: " + paths_to_images[5])

# Convert OpenCV's BGR channel order to RGB and scale to [0, 1] for matplotlib.
current_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) / 255
height, width, channels = image.shape
print("Size of Image:", height,'X', width)
plt.figure(figsize = (5, 5))
plt.axis('off')
plt.imshow(current_image);

In [140]:
# Hyperparameters shared by the data pipeline and the model
INPUT_DIM = (128, 128, 3)  # Image dimension
BATCH_SIZE = 256
Z_DIM = 200  # Dimension of the latent vector (z)

# Pixel values are rescaled by 1/255.
# Images are loaded with the generator function flow_from_directory.
# class_mode='input' uses the same image as both input and target.
image_generator = ImageDataGenerator(rescale=1./255)
data_flow = image_generator.flow_from_directory(
    DATA_FOLDER,
    target_size=INPUT_DIM[:2],
    batch_size=BATCH_SIZE,
    shuffle=True,
    class_mode='input',
    subset='training',
)

Found 202599 images belonging to 1 classes.


In [141]:

def build_vae_encoder(input_dim, output_dim, conv_filters, conv_kernel_size,
                  conv_strides):
  '''
  Build the convolutional encoder of the VAE.

  A stack of Conv2D + LeakyReLU layers (named 'encoder_conv_<i>' so they can
  be looked up later) is flattened and projected by two Dense layers into the
  mean ('mu') and the log-variance ('log_var') of the latent distribution.
  The latent sample is drawn as z = mu + exp(log_var / 2) * epsilon, where
  epsilon is standard-normal noise.

  Side effect: K.clear_session() is called before any layer is created.

  Input arguments

  input_dim        : Shape of one input image without the batch axis, e.g. (128, 128, 3)
  output_dim       : Size of the latent space (Z dimension)
  conv_filters     : Number of filters of each Conv layer. List, one entry per layer
  conv_kernel_size : Kernel size of each Conv layer. List, same length as conv_filters
  conv_strides     : Stride of each Conv layer. List, same length as conv_filters

  Output arguments (returned as a tuple, in this order)

  encoder_input           : Keras Input tensor of the encoder
  encoder_output          : Sampled latent vector, shape (None, output_dim)
  mean_mu                 : Output of the 'mu' Dense layer (mean)
  log_var                 : Output of the 'log_var' Dense layer (log of the variance)
  shape_before_flattening : Shape of the last convolution output, batch axis removed
  Encoder Model           : Model from encoder_input to encoder_output, named 'Encoder'

  Raises

  ValueError : If conv_kernel_size or conv_strides differ in length from conv_filters
  '''
  # Number of Conv layers
  n_layers = len(conv_filters)

  # Validate before touching any Keras state: a short list would otherwise
  # fail with an IndexError halfway through construction, and a long one
  # would be silently truncated.
  if len(conv_kernel_size) != n_layers or len(conv_strides) != n_layers:
    raise ValueError(
        'conv_filters, conv_kernel_size and conv_strides must have the same '
        'length, got %d, %d and %d'
        % (n_layers, len(conv_kernel_size), len(conv_strides)))

  K.clear_session()

  # Define model input
  encoder_input = Input(shape = input_dim, name = 'encoder_input')
  x = encoder_input

  # Add convolutional layers
  for i in range(n_layers):
      x = Conv2D(filters = conv_filters[i], kernel_size = conv_kernel_size[i],
                  strides = conv_strides[i], padding = 'same', name = 'encoder_conv_' + str(i))(x)
      x = LeakyReLU()(x)

  # Required for reshaping the latent vector while building the Decoder.
  # int_shape returns a tuple, unlike tf.shape which returns a tensor.
  shape_before_flattening = K.int_shape(x)[1:]
  x = Flatten()(x)

  # Mean and log-variance of the latent distribution
  mean_mu = Dense(output_dim, name = 'mu')(x)
  log_var = Dense(output_dim, name = 'log_var')(x)

  def sampling(args):
    '''Draw z = mu + exp(log_var / 2) * epsilon with standard-normal epsilon.'''
    mean_mu, log_var = args
    epsilon = K.random_normal(shape=K.shape(mean_mu), mean=0., stddev=1.)
    return mean_mu + K.exp(log_var/2)*epsilon

  # A Keras Lambda layer makes the sampling function a layer of the model
  encoder_output = Lambda(sampling, name='encoder_output')([mean_mu, log_var])

  encoder_model = Model(encoder_input, encoder_output, name='Encoder')
  return encoder_input, encoder_output, mean_mu, log_var, shape_before_flattening, encoder_model

In [None]:
# Instantiate the encoder: four 3x3 convolutions with stride 2, latent size Z_DIM
(vae_encoder_input,
 vae_encoder_output,
 mean_mu,
 log_var,
 vae_shape_before_flattening,
 vae_encoder) = build_vae_encoder(
    input_dim=INPUT_DIM,
    output_dim=Z_DIM,
    conv_filters=[32, 64, 64, 64],
    conv_kernel_size=[3, 3, 3, 3],
    conv_strides=[2, 2, 2, 2],
)

vae_encoder.summary()

In [143]:

# Decoder
def build_decoder(input_dim, shape_before_flattening, conv_filters, conv_kernel_size,
                  conv_strides):
  '''
  Build the transposed-convolution decoder of the VAE.

  The latent vector is expanded by a Dense layer, reshaped to
  shape_before_flattening and passed through Conv2DTranspose layers named
  'decoder_conv_<i>'. Every layer but the last is followed by LeakyReLU; the
  last one is followed by a sigmoid so the outputs lie between 0 and 1.

  Input arguments

  input_dim               : Size of the latent space (an int, Z dimension)
  shape_before_flattening : Shape the Dense output is reshaped to, batch axis excluded
  conv_filters            : Number of filters of each layer. List, one entry per layer
  conv_kernel_size        : Kernel size of each layer. List, same length as conv_filters
  conv_strides            : Stride of each layer. List, same length as conv_filters

  Output arguments (returned as a tuple, in this order)

  Decoder_input           : Keras Input tensor of shape (None, input_dim)
  Decoder_output          : Output tensor of the last layer, after the sigmoid
  Decoder Model           : Model from decoder_input to decoder_output, named 'Decoder'

  Raises

  ValueError : If conv_kernel_size or conv_strides differ in length from conv_filters
  '''
  # Number of Conv layers
  n_layers = len(conv_filters)

  # Validate up front: a short list would otherwise fail with an IndexError
  # halfway through construction, and a long one would be silently truncated.
  if len(conv_kernel_size) != n_layers or len(conv_strides) != n_layers:
    raise ValueError(
        'conv_filters, conv_kernel_size and conv_strides must have the same '
        'length, got %d, %d and %d'
        % (n_layers, len(conv_kernel_size), len(conv_strides)))

  # Define model input
  decoder_input = Input(shape = (input_dim,) , name = 'decoder_input')

  # Expand the latent vector back to the encoder's pre-flatten shape
  x = Dense(np.prod(shape_before_flattening))(decoder_input)
  x = Reshape(shape_before_flattening)(x)

  # Add transposed convolutional layers
  for i in range(n_layers):
      x = Conv2DTranspose(filters = conv_filters[i],
                  kernel_size = conv_kernel_size[i],
                  strides = conv_strides[i],
                  padding = 'same',
                  name = 'decoder_conv_' + str(i)
                  )(x)

      # A sigmoid after the final layer restricts the outputs to between
      # 0 and 1; all earlier layers use LeakyReLU.
      if i < n_layers - 1:
        x = LeakyReLU()(x)
      else:
        x = Activation('sigmoid')(x)

  # Define model output
  decoder_output = x

  return decoder_input, decoder_output, Model(decoder_input, decoder_output, name='Decoder')

In [None]:
# Instantiate the decoder from the encoder's pre-flatten shape; the last
# layer has 3 filters
(vae_decoder_input,
 vae_decoder_output,
 vae_decoder) = build_decoder(
    input_dim=Z_DIM,
    shape_before_flattening=vae_shape_before_flattening,
    conv_filters=[64, 64, 32, 3],
    conv_kernel_size=[3, 3, 3, 3],
    conv_strides=[2, 2, 2, 2],
)
vae_decoder.summary()

In [None]:
# The combined model is fed through the encoder's image input.
vae_input = vae_encoder_input

# Calling the decoder on the encoder output chains the two models: the sampled
# latent vector becomes the decoder's input.
vae_output = vae_decoder(vae_encoder_output)

# End-to-end VAE: encoder input in, decoder output out.
vae_model = Model(inputs=vae_input, outputs=vae_output)

vae_model.summary()

In [146]:
# Optimiser step size and number of training epochs
LEARNING_RATE = 5e-4
N_EPOCHS = 2

In [147]:
# Reconstruction Loss for Decoder
def r_loss(y_true, y_pred):
    """Return the squared error between y_true and y_pred, averaged over axes 1, 2 and 3."""
    squared_error = K.square(y_true - y_pred)
    return K.mean(squared_error, axis = [1,2,3])
    
# KL divergence loss for Encoder to force the unknown input distribution into a known Gaussian distribution
def kl_loss(y_true, y_pred):
    """Return -0.5 * sum(1 + log_var - mean_mu^2 - exp(log_var)) over axis 1.

    Both arguments are ignored; they exist only so the function has the
    (y_true, y_pred) signature of a Keras loss. The value is computed from the
    module-level `mean_mu` and `log_var` tensors.
    """
    # NOTE(review): this reads module-level Keras tensors from inside a loss
    # function -- confirm that works in the execution mode of the TF version used.
    per_dimension_term = 1 + log_var - K.square(mean_mu) - K.exp(log_var)
    return -0.5 * K.sum(per_dimension_term, axis = 1)

def total_loss(y_true, y_pred):
    """Return the unweighted sum of r_loss and kl_loss for the same arguments."""
    reconstruction_term = r_loss(y_true, y_pred)
    divergence_term = kl_loss(y_true, y_pred)
    return reconstruction_term + divergence_term

In [148]:
# Compile with Adam; both loss terms are additionally reported as metrics
adam_optimizer = Adam(learning_rate=LEARNING_RATE)
vae_model.compile(
    optimizer=adam_optimizer,
    loss=total_loss,
    metrics=[r_loss, kl_loss],
)


In [None]:
# Train the VAE. steps_per_epoch is rounded up to a whole number of batches:
# plain NUM_IMAGES / BATCH_SIZE is a float, not a step count.
vae_model.fit(data_flow,
              shuffle=True,
              epochs=N_EPOCHS,
              initial_epoch=0,
              steps_per_epoch=int(np.ceil(NUM_IMAGES / BATCH_SIZE)))

Reconstruction

In [None]:
# Pull one batch from the generator; element 0 of the batch holds the images.
example_batch = next(data_flow)
example_batch = example_batch[0]
example_images = example_batch[:10]
img_array = example_images[0]
# Use the `plt` alias: `import matplotlib.pyplot as plt` does not bind the
# name `matplotlib`, so `matplotlib.pyplot.imshow` raises NameError at this
# point of a fresh run.
plt.imshow(img_array)
img_array = tf.keras.preprocessing.image.img_to_array(img_array)
# Add a leading batch axis before calling predict().
img_array = np.expand_dims(img_array, axis=0)
res = vae_model.predict(img_array)

In [None]:
import matplotlib
# Drop the batch axis. reshape (rather than indexing) gives the same result
# when the cell is re-run on an already reshaped `res`.
res = res.reshape(INPUT_DIM)
# Show the whole reconstruction; slicing with res[1:] cut off the first image row.
plt.imshow(res);

# HeatMap

In [None]:
# Layer names tapped by Grad-CAM: the conv layer named 'encoder_conv_3' and
# the layer named 'encoder_output'
last_conv_layer_name, encoder_out = "encoder_conv_3", "encoder_output"

In [None]:
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, encoder_out, pred_index=None):
    """Compute a Grad-CAM style heatmap for one layer of `model`.

    Args:
        img_array: Batched input passed to the model. Only the first sample's
            feature maps are used for the heatmap.
        model: Keras model containing the two named layers.
        last_conv_layer_name: Name of the layer whose output is weighted.
        encoder_out: Name of the layer whose output is differentiated.
        pred_index: Optional index into the last axis of that output. When
            given, only that component is differentiated; when None (the
            default) the gradient is taken of the whole output.

    Returns:
        Tuple (heatmap, grads, pooled_grads, preds, last_conv_layer_output):
        the heatmap clipped at zero and scaled by its maximum, the raw
        gradients, the gradients averaged over axes (0, 1, 2), the output of
        the `encoder_out` layer, and the first sample's feature maps.
    """
    # Sub-model exposing both the feature maps and the differentiated output
    grad_model = tf.keras.models.Model(
        model.inputs,
        [model.get_layer(last_conv_layer_name).output, model.get_layer(encoder_out).output],
    )
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        # Select the component to explain inside the tape so it is recorded
        target = preds if pred_index is None else preds[:, pred_index]

    grads = tape.gradient(target, last_conv_layer_output)

    # This is a vector where each entry is the mean intensity of the gradient
    # over a specific feature map channel
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weight each channel of the feature maps by its pooled gradient and sum
    # over the channels to obtain the heatmap
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = tf.matmul(last_conv_layer_output, pooled_grads[..., tf.newaxis])
    # Remove only the trailing singleton axis left by the matmul
    heatmap = tf.squeeze(heatmap, axis=-1)

    # For visualization, keep the positive part and scale it by its maximum.
    # divide_no_nan yields zeros instead of NaN when nothing is positive.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    return heatmap, grads, pooled_grads, preds, last_conv_layer_output

In [None]:
# Grad-CAM of the configured conv layer with respect to the encoder output
heatmap, grads, pooled_grads, preds, last_conv_layer_output = make_gradcam_heatmap(
    img_array,
    model=vae_model,
    last_conv_layer_name=last_conv_layer_name,
    encoder_out=encoder_out,
)

In [None]:
# Dump the intermediate Grad-CAM tensors for inspection
for gradcam_tensor in (preds, last_conv_layer_output, grads, pooled_grads):
    print(gradcam_tensor)

In [None]:
def save_and_display_gradcam(img, heatmap,cam_path="superimposed_img.jpg", alpha=0.4):
    """Overlay a heatmap on an image, save the result and show it.

    Args:
        img: Image array of shape (height, width, 3).
            # assumes float values scaled to [0, 1], as for the image passed
            # in by this notebook -- TODO confirm for other callers
        heatmap: 2-D heatmap with values in [0, 1]; NaNs are treated as 0 and
            values outside the range are clipped.
        cam_path: File path the blended image is saved to.
        alpha: Weight of the heatmap in the blend; the image gets 1 - alpha.
    """
    # cv2.addWeighted needs both inputs in the same dtype
    img = np.asarray(img, dtype=np.float32)

    # Quantise the heatmap to 0..255 so it can index the colour table
    heatmap = np.nan_to_num(np.asarray(heatmap, dtype=np.float32))
    heatmap = np.uint8(255 * np.clip(heatmap, 0.0, 1.0))

    # RGB values of the 256 "jet" colours (alpha channel dropped)
    jet = plt.get_cmap("jet")
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Resize the coloured heatmap to the image size via a PIL round trip,
    # then bring it back to the [0, 1] range assumed for the image
    image_utils = tf.keras.preprocessing.image
    jet_heatmap = image_utils.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = image_utils.img_to_array(jet_heatmap) / 255.0

    # Blend with the caller-supplied alpha instead of fixed weights
    superimposed_img = cv2.addWeighted(jet_heatmap, alpha, img, 1.0 - alpha, 0)
    superimposed_img = image_utils.array_to_img(superimposed_img)
    superimposed_img.save(cam_path)

    # Show the saved overlay inline
    plt.imshow(superimposed_img)
    plt.axis('off')

# Overlay the heatmap on the input image with its singleton axes removed
save_and_display_gradcam(img=np.squeeze(img_array), heatmap=heatmap)