In [None]:
import numpy as np
import matplotlib.pyplot as plt # Corrected import
import os
import pickle
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, Conv2DTranspose, Reshape
from tensorflow.keras.layers import Lambda, Activation, BatchNormalization, LeakyReLU, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler



#Note : This data set is based on HUMAN Faces
class VariationalAutoEncoder():
  """Convolutional variational autoencoder assembled from Keras functional models.

  Constructing an instance builds four models and stores them as attributes:

  * ``encoder_mu_log_var`` - image -> (mu, log_var)
  * ``encoder``            - image -> latent point z drawn by ``sampling``
  * ``decoder``            - latent point z -> image (sigmoid output)
  * ``model``              - image -> reconstruction (``decoder`` applied to the encoder output)

  ``compile`` must be called before ``model`` is trained.
  """

  def __init__(self, input_dim, encoder_conv_filters, encoder_conv_kernel_size, encoder_conv_strides,
               decoder_conv_t_filters, decoder_conv_t_kernel_size, decoder_conv_t_strides,
               z_dim, use_dropout=False):
    """Store the architecture hyperparameters and build all sub-models.

    Args:
      input_dim: shape of one input image, passed to ``Input(shape=...)``.
      encoder_conv_filters: filter count of each encoder Conv2D layer.
      encoder_conv_kernel_size: kernel size of each encoder Conv2D layer.
      encoder_conv_strides: stride of each encoder Conv2D layer.
      decoder_conv_t_filters: filter count of each decoder Conv2DTranspose layer.
      decoder_conv_t_kernel_size: kernel size of each decoder Conv2DTranspose layer.
      decoder_conv_t_strides: stride of each decoder Conv2DTranspose layer.
      z_dim: dimension of the latent space.
      use_dropout: when true, a Dropout(0.25) follows every hidden activation.
    """
    self.name = 'variational_autoencoder'

    self.input_dim = input_dim
    self.encoder_conv_filters = encoder_conv_filters
    self.encoder_conv_kernel_size = encoder_conv_kernel_size
    self.encoder_conv_strides = encoder_conv_strides
    self.decoder_conv_t_filters = decoder_conv_t_filters
    self.decoder_conv_t_kernel_size = decoder_conv_t_kernel_size
    self.decoder_conv_t_strides = decoder_conv_t_strides
    self.z_dim = z_dim
    self.use_dropout = use_dropout

    # The per-layer lists are indexed in parallel; the filter lists fix the layer counts.
    self.n_layers_encoder = len(encoder_conv_filters)
    self.n_layers_decoder = len(decoder_conv_t_filters)

    self._build()

  def sampling(self, args):
    """Draw a latent point z = mu + exp(log_var / 2) * epsilon, epsilon ~ N(0, 1).

    Args:
      args: pair ``(mu, log_var)`` of tensors with identical shape.

    Returns:
      A tensor shaped like ``mu``.
    """
    mu, log_var = args
    # Take the noise shape from the tensor actually passed in, not from the
    # symbolic ``self.mu`` stored while the graph was being built.
    epsilon = K.random_normal(shape=K.shape(mu), mean=0., stddev=1.)
    return mu + K.exp(log_var / 2) * epsilon

  def _build(self):
    """Create the encoder, the decoder and the end-to-end VAE model."""

    # ENCODER: stacked Conv2D -> BatchNorm -> LeakyReLU (-> Dropout) blocks.
    encoder_input = Input(shape=self.input_dim, name='encoder_input')
    x = encoder_input

    for i in range(self.n_layers_encoder):
      conv_layer = Conv2D(filters=self.encoder_conv_filters[i],
                          kernel_size=self.encoder_conv_kernel_size[i],
                          strides=self.encoder_conv_strides[i],
                          padding='same', name='encoder_conv_' + str(i))
      x = conv_layer(x)
      x = BatchNormalization()(x)
      x = LeakyReLU()(x)
      if self.use_dropout:
        x = Dropout(rate=0.25)(x)

    # Remembered so the decoder can undo the Flatten with a matching Reshape.
    shape_before_flattening = K.int_shape(x)[1:]
    x = Flatten()(x)

    # The network predicts the logarithm of the variance: it can take any real
    # value, matching the output range of a linear unit, whereas a variance
    # itself must be positive.
    self.mu = Dense(self.z_dim, name='mu')(x)
    self.log_var = Dense(self.z_dim, name='log_var')(x)

    # Deterministic encoder head, reused by the KL loss in compile().
    self.encoder_mu_log_var = Model(encoder_input, (self.mu, self.log_var))

    # Latent space: because a random point around mu is decoded, points in the
    # same neighbourhood must decode to similar images for the reconstruction
    # loss to stay small.
    encoder_output = Lambda(self.sampling, name='encoder_output')([self.mu, self.log_var])
    self.encoder = Model(encoder_input, encoder_output)

    # DECODER: maps a point in the latent space back to the image domain.
    decoder_input = Input(shape=(self.z_dim,), name='decoder_input')
    # int(): np.prod returns a NumPy integer, Dense expects a plain unit count.
    x = Dense(int(np.prod(shape_before_flattening)))(decoder_input)
    x = Reshape(shape_before_flattening)(x)

    for i in range(self.n_layers_decoder):
      conv_t_layer = Conv2DTranspose(filters=self.decoder_conv_t_filters[i],
                                     kernel_size=self.decoder_conv_t_kernel_size[i],
                                     strides=self.decoder_conv_t_strides[i],
                                     padding='same', name='decoder_conv_t_' + str(i))
      x = conv_t_layer(x)
      if i < self.n_layers_decoder - 1:
        x = BatchNormalization()(x)
        x = LeakyReLU()(x)
        if self.use_dropout:
          x = Dropout(rate=0.25)(x)
      else:
        # Last layer: no norm/activation/dropout block, only a sigmoid.
        x = Activation('sigmoid')(x)

    decoder_output = x
    self.decoder = Model(decoder_input, decoder_output)

    # THE FULL VAE: encoder input -> sampled z -> decoder.
    model_input = encoder_input
    model_output = self.decoder(encoder_output)
    self.model = Model(model_input, model_output)

  def compile(self, learning_rate, reco_loss_factor):
    """Compile ``self.model`` with Adam and the combined VAE loss.

    Args:
      learning_rate: learning rate handed to the Adam optimizer.
      reco_loss_factor: weight applied to the reconstruction loss so that it
        balances against the KL divergence term.
    """
    self.learning_rate = learning_rate
    self.reco_loss_factor = reco_loss_factor

    # Binary cross-entropy penalises badly wrong predictions at the extremes
    # more heavily, which tends to push pixel predictions towards the middle of
    # the range and gives less vibrant images. For that reason the
    # reconstruction term below is the mean squared error instead.
    def vae_r_loss(y_true, y_pred):
      r_loss = K.mean(K.square(y_true - y_pred), axis=[1, 2, 3])
      return self.reco_loss_factor * r_loss

    # The KL divergence term penalises the network for encoding observations
    # to mu and log_var values that differ from those of a standard normal
    # distribution (mu = 0, log_var = 0).
    def kl_loss(y_true, y_pred):
      # mu and log_var are recomputed from y_true, which relies on the model
      # being trained as an autoencoder (targets equal to the inputs).
      # NOTE(review): this extra encoder pass may not run in training mode
      # (dropout / batch-norm behaviour) - confirm if exact parity matters.
      mu, log_var = self.encoder_mu_log_var(y_true)
      return -0.5 * K.sum(1 + log_var - K.square(mu) - K.exp(log_var), axis=1)

    def vae_loss(y_true, y_pred):
      # Distinct local names: assigning to ``kl_loss`` here would shadow the
      # function above and raise UnboundLocalError.
      reconstruction_term = vae_r_loss(y_true, y_pred)
      kl_term = kl_loss(y_true, y_pred)
      return reconstruction_term + kl_term

    # ``learning_rate`` is the supported keyword; ``lr`` is deprecated/removed.
    optimizer = Adam(learning_rate=self.learning_rate)
    self.model.compile(optimizer=optimizer, loss=vae_loss, metrics=[vae_r_loss, kl_loss])


# Mount Google Drive at /content/drive (Colab only)
from google.colab import drive
drive.mount('/content/drive')

# No explicit os.mkdir('data/') here: the recursive copy below is relied on to create the folder.
!ls

# Copy the dataset folder from Drive into the current working directory.
# **PLEASE REPLACE 'drive/My Drive/your_data_folder/' with the actual path to your data folder**
!cp -r 'drive/My Drive/your_data_folder/' .


from glob import glob

# Dataset locations and the list of JPEG files found on disk
DATA_FOLDER = 'data/celeba'
IMAGE_FOLDER = 'data/celeba/celeba_dataset'
filenames = np.array(glob(os.path.join(IMAGE_FOLDER, '*.jpg')))
NUM_IMAGES = len(filenames)

# Input geometry and training hyperparameters
INPUT_DIM = (128, 128, 3)
BATCH_SIZE = 32
LEARNING_RATE = 0.0005
R_LOSS_FACTOR = 10000
EPOCHS = 10

# Report the image count, then stop early when the folder turned out to be empty
print(f"Found {NUM_IMAGES} images.")
if not NUM_IMAGES:
  raise Exception("No IMGS found, check the path.")


# IMPORT Libraries
from tensorflow.keras.preprocessing.image import ImageDataGenerator # Corrected indentation

# Image pipeline: pixel values are rescaled by 1/255 and batches are read from DATA_FOLDER.
# class_mode='input' presumably makes each target equal to its input image (autoencoder
# training) - confirm against the Keras docs.
data_gen = ImageDataGenerator(rescale=1./255)
data_flow = data_gen.flow_from_directory(
    DATA_FOLDER,
    target_size=INPUT_DIM[:2],
    batch_size=BATCH_SIZE,
    shuffle=True,
    class_mode='input',
)

# Four stride-2 layers on each side of a 200-dimensional latent space
vae = VariationalAutoEncoder(
    input_dim=INPUT_DIM,
    z_dim=200,
    use_dropout=True,
    encoder_conv_filters=[32, 64, 64, 64],
    encoder_conv_kernel_size=[3, 3, 3, 3],
    encoder_conv_strides=[2, 2, 2, 2],
    decoder_conv_t_filters=[64, 64, 32, 3],
    decoder_conv_t_kernel_size=[3, 3, 3, 3],
    decoder_conv_t_strides=[2, 2, 2, 2],
)

vae.encoder.summary()
vae.decoder.summary()

vae.compile(learning_rate=LEARNING_RATE, reco_loss_factor=R_LOSS_FACTOR)

# Weights-only checkpoint written to a fixed file name
checkpoint = ModelCheckpoint(filepath='weights_vae.weights.h5', save_weights_only=True)
def lr_scheduler(epoch):
  """Return the learning rate for `epoch`: 0.0005 for epochs 0-4, then exponential decay.

  From epoch 5 onwards the rate is 0.0005 * exp(0.1 * (5 - epoch)).
  """
  base_lr = 0.0005
  if epoch >= 5:
    return base_lr * np.exp(0.1 * (5 - epoch))
  return base_lr


# Two consecutive training runs: EPOCHS epochs first, then a further 50.
# No initial_epoch is passed, so each run hands epoch indices starting at 0 to lr_scheduler.
for n_epochs in (EPOCHS, 50):
  vae.model.fit(
      data_flow,
      shuffle=True,
      epochs=n_epochs,
      steps_per_epoch=NUM_IMAGES // BATCH_SIZE,
      callbacks=[checkpoint, LearningRateScheduler(lr_scheduler)],
  )

# Reload the weights from the file the checkpoint callback writes to
vae.model.load_weights('weights_vae.weights.h5')

# Decode latent vectors drawn from a standard normal distribution
n_to_show = 30
znew = np.random.randn(n_to_show, vae.z_dim)
reconst = vae.decoder.predict(znew)

# Show the decoded images on a 3 x 10 grid without axes
fig = plt.figure(figsize=(18, 5))
fig.subplots_adjust(hspace=0.4, wspace=0.4)
for i, image in enumerate(reconst):
  ax = fig.add_subplot(3, 10, i + 1)
  ax.imshow(image)
  ax.axis('off')
plt.show()




Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
drive  sample_data  virtual-zoom-background-1500-x-1000-1dnfprs7qdlurwup.jpg
cp: cannot stat 'drive/My Drive/your_data_folder/': No such file or directory
Found 0 images.


Exception: No IMGS found, check the path.

# Fashion MNIST Data

In [15]:
# https://keras.io/api/datasets/fashion_mnist/

import numpy as np
import tensorflow as tf
import keras
from keras import layers
import matplotlib.pyplot as plt


# Load Fashion-MNIST, merge the train and test images (labels are discarded),
# append a trailing channel axis and scale the pixel values by 1/255
(x_train, _), (x_test, _) = keras.datasets.fashion_mnist.load_data()
fashion_mnist = np.concatenate((x_train, x_test))[..., np.newaxis]
fashion_mnist = fashion_mnist.astype("float32") / 255


# Define the VAE MODEL

class VAE(keras.Model):
  """Keras model pairing an encoder and a decoder with a custom VAE training step.

  The encoder is expected to return ``(mean, log_var, z)``; the decoder maps
  ``z`` back to the input domain. Three running means track the total,
  reconstruction and KL losses.
  """

  def __init__(self, encoder, decoder):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
    self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
    self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

  @property
  def metrics(self):
    """The three loss trackers, in the order total / reconstruction / KL."""
    trackers = [self.total_loss_tracker]
    trackers.append(self.reconstruction_loss_tracker)
    trackers.append(self.kl_loss_tracker)
    return trackers

  def train_step(self, data):
    """Run one optimisation step on `data` and return the running loss means."""
    with tf.GradientTape() as tape:
      z_mean, z_log_var, z = self.encoder(data)
      decoded = self.decoder(z)

      # Reconstruction term: cross-entropy summed over axes 1 and 2, averaged over the batch
      cross_entropy = keras.losses.binary_crossentropy(data, decoded)
      reconstruction_loss = tf.reduce_mean(tf.reduce_sum(cross_entropy, axis=(1, 2)))

      # KL term: summed over the latent axis, averaged over the batch
      kl_terms = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
      kl_loss = tf.reduce_mean(tf.reduce_sum(kl_terms, axis=1))

      total_loss = reconstruction_loss + kl_loss

    gradients = tape.gradient(total_loss, self.trainable_weights)
    self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))

    updates = (
        (self.total_loss_tracker, total_loss),
        (self.reconstruction_loss_tracker, reconstruction_loss),
        (self.kl_loss_tracker, kl_loss),
    )
    for tracker, value in updates:
      tracker.update_state(value)

    return {
        "loss": self.total_loss_tracker.result(),
        "reconstruction_loss": self.reconstruction_loss_tracker.result(),
        "kl_loss": self.kl_loss_tracker.result(),
    }


# Sampling layer that introduces stochasticity into the latent-space representation.
# One random noise vector is generated per sample in the batch.

class Sampling(layers.Layer):
  """Reparameterisation layer: returns mean + exp(0.5 * log_var) * epsilon."""

  def call(self, inputs):
    z_mean, z_log_var = inputs
    mean_shape = tf.shape(z_mean)
    # One standard-normal draw per element of the (batch, dim) mean tensor,
    # so the element-wise arithmetic below lines up.
    noise = tf.keras.backend.random_normal(shape=(mean_shape[0], mean_shape[1]))
    std_dev = tf.exp(0.5 * z_log_var)
    return z_mean + std_dev * noise


# Define latent_dim
# Dimension of the latent space
latent_dim = 2


# Encoder: two stride-2 convolutions, a small dense layer, then the
# mean / log-variance heads and the sampled latent point z

encoder_inputs = keras.Input(shape=(28, 28, 1))
x = encoder_inputs
for n_filters in (32, 64):
  x = layers.Conv2D(n_filters, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
mean = layers.Dense(latent_dim, name="mean")(x)
log_var = layers.Dense(latent_dim, name="log_var")(x)
z = Sampling()([mean, log_var])
encoder = keras.Model(inputs=encoder_inputs, outputs=[mean, log_var, z], name="encoder")
encoder.summary()


# Define Decoder

# Dense projection reshaped to 7 x 7 x 64, two stride-2 transposed convolutions,
# then a single-channel sigmoid output layer
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(7 * 7 * 64, activation="relu")(latent_inputs)
x = layers.Reshape((7, 7, 64))(x)
for n_filters in (64, 32):
  x = layers.Conv2DTranspose(n_filters, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(inputs=latent_inputs, outputs=decoder_outputs, name="decoder")
decoder.summary()

# Instantiate and train the VAE model

# Only the images are passed to fit(): the loss is computed inside VAE.train_step
vae = VAE(encoder=encoder, decoder=decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x=fashion_mnist, batch_size=128, epochs=10)

# Function to visualize the latent space
def plot_latent_space(vae, n=10, figsize=15):
  """Decode an n x n grid of 2-D latent points and display them as one image.

  Args:
    vae: object exposing ``decoder.predict``, which must map an array of
      shape (k, 2) to k images that can each be reshaped to 28 x 28.
    n: number of grid points along each latent axis.
    figsize: width and height of the square matplotlib figure.
  """
  img_size = 28
  scale = 0.5
  grid_x = np.linspace(-scale, scale, n)
  # Reversed so that the first image row holds the largest z[1] value.
  grid_y = np.linspace(-scale, scale, n)[::-1]

  # Decode the whole grid with one predict call instead of n * n separate ones.
  # Points are ordered row by row: row i uses grid_y[i], column j uses grid_x[j].
  latent_points = np.array([[xi, yi] for yi in grid_y for xi in grid_x])
  decoded = vae.decoder.predict(latent_points)

  figure = np.zeros((img_size * n, img_size * n))
  for index, image in enumerate(decoded):
    i, j = divmod(index, n)
    figure[
      i * img_size : (i + 1) * img_size,
      j * img_size : (j + 1) * img_size,
    ] = image.reshape(img_size, img_size)

  plt.figure(figsize=(figsize, figsize))
  # Place one tick at the centre of every tile and label it with the latent value.
  start_range = img_size // 2
  end_range = n * img_size + start_range
  pixel_range = np.arange(start_range, end_range, img_size)
  sample_range_x = np.round(grid_x, 1)
  sample_range_y = np.round(grid_y, 1)
  plt.xticks(pixel_range, sample_range_x)
  plt.yticks(pixel_range, sample_range_y)
  plt.xlabel("z[0]")
  plt.ylabel("z[1]")
  plt.imshow(figure, cmap="Greys_r")
  plt.show()


# Called at module level; inside the function body it would recurse without end.
plot_latent_space(vae)


  #NOTE: This kind of model is used for the CHILD TO OLD AGE FACE

Epoch 1/10
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m95s[0m 167ms/step - kl_loss: 3.0986 - loss: 400.2387 - reconstruction_loss: 397.1401
Epoch 2/10
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m90s[0m 165ms/step - kl_loss: 5.5776 - loss: 331.2611 - reconstruction_loss: 325.6836
Epoch 3/10
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m142s[0m 165ms/step - kl_loss: 5.2892 - loss: 313.8908 - reconstruction_loss: 308.6015
Epoch 4/10
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m144s[0m 168ms/step - kl_loss: 4.8979 - loss: 308.1933 - reconstruction_loss: 303.2953
Epoch 5/10
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m90s[0m 165ms/step - kl_loss: 4.7790 - loss: 304.1121 - reconstruction_loss: 299.3330
Epoch 6/10
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m144s[0m 168ms/step - kl_loss: 4.7444 - loss: 303.5197 - reconstruction_loss: 298.7753
Epoch 7/10
[1m547/547[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m