In [1]:
import tensorflow as tf
import numpy as np
from tensorflow.keras import Model, layers
from tensorflow.keras.layers import Conv2D, Flatten, Dense, Reshape
import matplotlib.pyplot as plt

In [10]:
(x_train, y_train),(x_test,y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

x_train = np.reshape(x_train, (x_train.shape[0], np.prod(x_train.shape[1:])))

In [3]:
class Sampling(layers.Layer):
    """Uses (mean, log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        epsilon = tf.keras.backend.random_normal(shape=tf.shape(z_mean))
        return z_mean + tf.exp(0.5* z_log_var) * epsilon

class Encoder(Model):
    def __init__(self):
        super(Encoder, self).__init__()
        
    
    def call(self, inputs, latent_dim, summary = True):
        x = Flatten()(inputs)
        x = Dense(512, activation="relu")(x)
        
        mean = Dense(latent_dim, name="mean")(x)
        std_dev = Dense(latent_dim, name="log_var")(x)
        
        sampled = Sampling()([mean, std_dev])
        
        self.encoder = Model(inputs, sampled, name = "Encoder")
        if summary:
            print(self.encoder.summary())
        return self.encoder


class Decoder(Model):
    def __init__(self, ):
        super(Decoder, self).__init__()
        
    def call(self, latent_input, output_shape, summary = True):
        x = Dense(512, activation="relu")(latent_input)
        x = Dense(784, activation="relu")(x)
        output = Reshape(output_shape)(x)
        
        self.decoder = Model(latent_input, output, name="Decoder")
        
        if summary:
            self.decoder.summary()
        return self.decoder



    def encoder(self,  inputs, latent_dim, ):
        x = Flatten()(inputs)
        x = Dense(512, activation="relu")(x)
        mean = Dense(latent_dim, name="mean")(x)
        log_var = Dense(latent_dim, name="log_var")(x)
        
        sampled = Sampling()([mean, log_var])
        return sampled, mean, log_var
    
    def decoder(self,latent_input, output_shape ):
        x = Dense(512, activation="relu")(latent_input)
        x = Dense(784, activation="relu")(x)
        output = Reshape(output_shape)(x)
        return output
        
        
        
                self.inputs = tf.keras.Input(shape= img_shape)
        self.sampled, mean, std_dev  = encoder(self.inputs, self.latent_dim)
        self.output_layer = decoder(self.sampled, img_shape)
        
        self.latent_inputs = tf.keras.Input(shape= (self.latent_dim,))
        self.encoder = Model(self.inputs, self.sampled, name="Encoder")
        self.decoder = Model(self.latent_inputs, self.output_layer, name="Decoder")
        self.vae = Model(self.inputs, self.output_layer, name = "VAE")


In [46]:
class VAE(Model):
    def __init__(self, img_shape, latent_dim, summary = True):
        super(VAE, self).__init__()
        
        '''Defining encoder model'''
        self.inputs = tf.keras.Input(shape= img_shape)
        #x = Flatten()(self.inputs)
        x = Dense(512, activation="relu")(self.inputs)
        self.mean = Dense(latent_dim, name="mean")(x)
        self.log_var = Dense(latent_dim, name="log_var")(x)
        
        #reparameterisation trick(sampling from a random distribution)
        sampled = Sampling()([self.mean, self.log_var])
        
        #create encoder model
        self.encoder = Model(self.inputs, sampled, name = "Encoder")
        
        '''Defining decoder model'''
        latent_input = tf.keras.Input(shape= (latent_dim))
        y = Dense(512, activation="relu")(latent_input)
        output = Dense(784, activation="relu")(y)
       # output = Reshape(img_shape)(output)
        
        #create decoder model
        self.decoder = Model(latent_input, output, name = "Decoder")
        
        #create Variational Autoencoder
        encoded = self.encoder(self.inputs)
        decoded = self.decoder(encoded)
        self.vae = Model(self.inputs, decoded, name = "VAE")
            
    def get_model(self, summary = True):  
        if summary:
            self.vae.summary()
        return self.vae

    def get_encoder(self, summary = True):
        if summary:
            self.encoder.summary()
            
        return self.encoder
            
    def get_decoder(self, summary = True):
        if summary:
            self.decoder.summary()
            
        return self.decoder
    
    def loss_function(y_true, y_pred):

    #         encoded = self.encoder(self.inputs)
    #         decoded = self.decoder(encoded)

        recon_loss = tf.reduce_sum(tf.keras.losses.binary_crossentropy(y_true, y_pred))
        #print(recon_loss.shape)
        reconstruction_loss = tf.reduce_mean(recon_loss)

        kl_loss = -0.5 * (1 + self.log_var - tf.square(self.mean) - tf.exp(self.log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        total_loss = reconstruction_loss + kl_loss
        return total_loss

def loss_function(mean, variance):
    
    def get_reconstruction_loss(y_true, y_pred):
        reconstruction_loss = tf.keras.losses.mse(y_true, y_pred)
        reconstruction_loss_batch = tf.reduce_mean(reconstruction_loss)
        return reconstruction_loss_batch*28*28
    
    def get_kl_loss(mean, variance):
        kl_loss = 1 + variance - tf.square(mean) - tf.exp(variance)
        kl_loss_batch = tf.reduce_mean(kl_loss)
        return kl_loss_batch*(-0.5)
    
    def total_loss(y_true, y_pred):
        reconstruction_loss_batch = get_reconstruction_loss(y_true, y_pred)
        kl_loss_batch = get_kl_loss(mean, variance)
        return reconstruction_loss_batch + kl_loss_batch
    
    return total_loss
        

In [47]:
latent_dim = 2
img_shape = (784)

vae = VAE(img_shape, latent_dim)

In [51]:
encoder = vae.get_encoder()
decoder = vae.get_decoder()
model = vae.get_model()

model.add_loss(loss_function(vae.mean, vae.log_var))

Model: "Encoder"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_17 (InputLayer)           [(None, 784)]        0                                            
__________________________________________________________________________________________________
dense_24 (Dense)                (None, 512)          401920      input_17[0][0]                   
__________________________________________________________________________________________________
mean (Dense)                    (None, 2)            1026        dense_24[0][0]                   
__________________________________________________________________________________________________
log_var (Dense)                 (None, 2)            1026        dense_24[0][0]                   
____________________________________________________________________________________________

In [52]:
# model.compile(loss = vae.loss_function(), optimizer='adam')
# model.compile(loss = tf.keras.losses.MeanSquaredError(), optimizer='adam')
model.compile(optimizer='adam')

In [53]:
model.fit(x_train, 
          x_train,
          epochs = 5)

Epoch 1/5


TypeError: in user code:

    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\keras\engine\training.py:806 train_function  *
        return step_function(self, iterator)
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\keras\engine\training.py:796 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\distribute\distribute_lib.py:1211 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\distribute\distribute_lib.py:2585 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\distribute\distribute_lib.py:2945 _call_for_each_replica
        return fn(*args, **kwargs)
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\keras\engine\training.py:789 run_step  **
        outputs = model.train_step(data)
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\keras\engine\training.py:749 train_step
        y, y_pred, sample_weight, regularization_losses=self.losses)
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\keras\engine\base_layer.py:1433 losses
        loss_tensor = regularizer()
    C:\Users\asus\AppData\Roaming\Python\Python37\site-packages\tensorflow\python\keras\engine\base_layer.py:1509 _tag_callable
        loss = loss()

    TypeError: total_loss() missing 2 required positional arguments: 'y_true' and 'y_pred'
