In [0]:
import tensorflow as tf

In [4]:
(x_train, _), (x_test, _) = tf.keras.datasets.mnist.load_data()
x_train.shape

(60000, 28, 28)

In [0]:
n_images = 60000
flatten_dim = 28*28

In [8]:
x_train = x_train.reshape(n_images, flatten_dim).astype('float32')
x_train.shape

(60000, 784)

Making Dataset From Tensor

In [0]:
#tf.data.Dataset
train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
#64*16 = 1024, 16 batches on 1024 samples
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

Model Building

In [0]:
from tensorflow.keras.layers import Layer, Dense

In [0]:
class Sampling(Layer):
  def __init__(self, name='sampling', **kwargs):
    super(Sampling,self).__init__(name='sampling', **kwargs)

  def call(self, inputs):
    z_mean,z_log_var = inputs

    intermediate_dim = tf.shape(z_mean)[0]
    dims = tf.shape(z_mean)[1]
    #eps finally generates a normal distribution
    eps = tf.keras.backend.random_normal(shape=(intermediate_dim, dims))

    # Since it is Log, exponential is calculated
    res = tf.exp(0.5 * z_log_var) * eps + z_mean # eps*sig+mean // SD = (var)

    return res

In [0]:

class Encoder(Layer):
  # latent:z
  def __init__(self, latent_dim=32, intermediate_dim=64, name='encoder', **kwargs):
    #super call is done to initialize all the contents of the Layer in Python
    super(Encoder,self).__init__(name=name, **kwargs)

    # the 784 dimension vector is projected into 64 dimension
    self.dense_proj = Dense(intermediate_dim,activation='relu')

    #Dense Layer of Mean
    self.dense_mean = Dense(latent_dim)
    #Dense Layer of Variance
    self.dense_log_var = Dense(latent_dim)

    self.sampling = Sampling()

    #Z = ?
  
  def call(self,inputs):
    x = self.dense_proj(inputs)
    
    z_mean = self.dense_mean(x)
    z_log_var = self.dense_log_var(x)

    z = self.sampling((z_mean, z_log_var))

    return z_mean, z_log_var, z

In [0]:
class Decoder(Layer):
  def __init__(self, flatten_dim, intermediate_dim=64, name='decoder', **kwargs):
    super(Decoder, self).__init__(name=name, **kwargs)
    self.dense_proj = Dense(intermediate_dim, activation='relu')

    self.dense_output = Dense(flatten_dim, activation='sigmoid')

  def call(self, inputs):
    x = self.dense_proj(inputs)
    x = self.dense_output(x)
    return x


In [0]:
from tensorflow.keras.models import Model

In [0]:
#Model
class VAE(Model):
  def __init__(self,flatten_dim, intermediate_dim=64, latent_dim=32, name='autoencoder', **kwargs):
    super(VAE,self).__init__(name=name, **kwargs)

    self.flatten_dim = flatten_dim

    self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
    self.decoder = Decoder(flatten_dim=flatten_dim, intermediate_dim=intermediate_dim)

  def call(self, inputs):
    z_mean, z_log_var, z = self.encoder(inputs)
    resconstructed = self.decoder(z)

    kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)

    self.add_loss(kl_loss)

    return resconstructed


In [74]:
vae = VAE(flatten_dim,64,32)
vae

<__main__.VAE at 0x7f83ea6092b0>

In [0]:
mse_loss_fn = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

loss_metric = tf.keras.metrics.Mean()

In [87]:
epochs = 10
for epoch in range(epochs):
  print(f'Epoch {epoch}: ')

  for step, x_batch_train in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      reconstructed = vae(x_batch_train)

      loss = mse_loss_fn(x_batch_train, reconstructed)

      loss += sum(vae.losses) #KL Divergence loss also added now
      #loss = reconstruction loss + KLD loss

    grads = tape.gradient(loss, vae.trainable_weights)
    optimizer.apply_gradients(zip(grads, vae.trainable_weights))

    metLoss = loss_metric(loss)

    if step % 100 == 0:
      print(f'step: {step}, meanloss: {intmetLoss}')

Epoch 0: 
step: 0, meanloss: nan
step: 100, meanloss: nan


KeyboardInterrupt: ignored