In [4]:
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import numpy as np
import time
import tensorflow_datasets as tfds
import pandas as pd
import datetime
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

#functions or classes from user defined files
import Data_pipline
import metrics
import models
import utils

In [5]:
# Presumably fixes the random seeds so that runs are reproducible.
# NOTE(review): which libraries are seeded, and with what value, is decided inside
# utils.set_seed_globally (not visible in this notebook) - confirm there.
utils.set_seed_globally()

In [6]:
class training_CAE:
    """Custom training loop for a convolutional auto-encoder (CAE).

    The trainer repeatedly encodes/decodes batches of the training set, minimises a
    reconstruction loss, records the mean loss per epoch and, optionally, writes a figure
    of original vs. reconstructed sample images after every epoch.

    Parameters
    ----------
    model : object exposing ``encoder``, ``decoder``, ``trainable_weights`` and ``save_weights``.
    dataset : object exposing ``train_data`` (a tf.data dataset whose elements unpack as
        ``(img, img_o, label)``), ``train_dataset_size`` and ``number_of_classes``.
    lr : learning rate handed to the optimizer constructor.
    optimizer : optimizer *class* (not instance); it is instantiated as ``optimizer(lr)``.
    epochs : number of passes over the training data.
    batch_size : number of samples per training batch.
    random_vectors_for_decoder : optional dict with keys ``"images"`` and ``"labels"`` holding
        the sample images (and their labels) that are reconstructed and plotted every epoch.
    save_dir : directory that receives the training log, the weights and the figures.
    loss_type : ``"mse"`` or ``"bce"``.

    Raises
    ------
    ValueError
        If ``loss_type`` is not one of the supported loss names.
    """

    def __init__(self,model,dataset,lr = 0.0001,optimizer=tf.keras.optimizers.Adam,epochs=100,batch_size=32,
                 random_vectors_for_decoder=None,save_dir=os.path.join(".."),loss_type="mse"):
        # Validate the configuration first so a bad loss name fails before any
        # optimizer or dataset cache is created.
        self.loss_type_list = ["mse","bce"]
        if loss_type not in self.loss_type_list:
            raise ValueError("Loss type not defined, use one among these: {}".format(self.loss_type_list))
        self.loss_type = loss_type

        self.model = model
        self.lr = lr
        self.optimizer = optimizer(self.lr)
        self.epochs = epochs
        self.batch_size = batch_size

        self.dataset = dataset
        self.dataset_size = self.dataset.train_dataset_size
        self.train_data = self.dataset.train_data.cache()

        self.random_vectors_for_decoder = random_vectors_for_decoder
        self.save_dir = save_dir

        # Per-batch losses of the epoch in progress; emptied at every epoch end.
        self.loss_batch_rec = []
        # Per-epoch history, later written to train_log.csv.
        self.metric_loss_dic = {"epoch":[],"reconstruction_loss":[]}

    def Reconstruction_loss_mse(self,decoded_x,x):
        """Mean-squared-error reconstruction loss.

        The Keras MSE is summed over axes 1 and 2 and then averaged over what remains.
        NOTE(review): assumes (batch, height, width, channels) inputs so that axes 1 and 2
        are the spatial dimensions - confirm against the data pipeline.
        """
        per_pixel = tf.keras.losses.MSE(decoded_x,x)
        return tf.reduce_mean(tf.math.reduce_sum(per_pixel,axis=[1,2]))

    def Reconstruction_loss(self,decoded_x,x):
        """Binary-cross-entropy reconstruction loss (``x`` is the target, ``decoded_x`` the prediction).

        Reduced exactly like :meth:`Reconstruction_loss_mse`: summed over axes 1 and 2, then averaged.
        """
        per_pixel = tf.keras.losses.binary_crossentropy(x,decoded_x)
        return tf.reduce_mean(tf.math.reduce_sum(per_pixel,axis=[1,2]))

    def metrics_calc_logging(self,epoch,reconstruction_loss,epoch_end=False):
        """Record a batch loss, or close an epoch.

        With ``epoch_end=False`` the given batch loss tensor is appended to the running list.
        With ``epoch_end=True`` the mean of the collected batch losses is stored in
        ``metric_loss_dic`` together with ``epoch``, the running list is emptied, the mean is
        printed and, if sample images were supplied, the reconstruction figure is saved.
        ``reconstruction_loss`` is ignored in that case.
        """
        if not epoch_end:
            self.loss_batch_rec.append(reconstruction_loss.numpy())
            return

        self.metric_loss_dic["reconstruction_loss"].append(np.mean(self.loss_batch_rec))
        self.metric_loss_dic["epoch"].append(epoch)
        # Reset the per-batch buffer so the next epoch's mean only covers its own batches.
        self.loss_batch_rec = []
        # Read the entry just appended; do not rely on `epoch` being a valid list index.
        print("\n========> Reconstruction Loss : %.4f"%(self.metric_loss_dic["reconstruction_loss"][-1]))

        if self.random_vectors_for_decoder:
            self._save_decoded_preview(epoch)

    def _save_decoded_preview(self,epoch):
        """Save a two-column figure (original | reconstruction) of the sample images for ``epoch``."""
        samples = self.random_vectors_for_decoder
        # Never index past the samples actually provided.
        n_rows = min(self.dataset.number_of_classes,len(samples["images"]))
        if n_rows == 0:
            return

        z = self.model.encoder(np.array(samples["images"]),training=False)
        decoded_x = self.model.decoder(z,training=False)

        # squeeze=False keeps `axes` two-dimensional even for a single row.
        fig, axes = plt.subplots(nrows = n_rows, ncols=2, figsize=(10,40), squeeze=False)
        fig.subplots_adjust(hspace = .1, wspace=.005)
        for i in range(n_rows):
            title = "class_"+str(samples["labels"][i])
            for ax,image in ((axes[i,0],samples["images"][i]),(axes[i,1],decoded_x[i])):
                ax.imshow(image,cmap="gray")
                ax.axis("off")
                ax.set_title(title)

        img_dir = os.path.join(self.save_dir,"Decoded images per epoch")
        # Also created here so this method works when used without __call__.
        os.makedirs(img_dir,exist_ok=True)
        fig.savefig(os.path.join(img_dir,"decoded_imgs_epoch"+str(epoch)+".jpg"))
        # Release the figure; one is created per epoch.
        plt.close(fig)

    def _train_step(self,x,x_o,loss_fn,training):
        """Shared forward (and, when ``training``, backward) pass; returns the reconstruction loss.

        ``x`` is fed to the encoder, ``x_o`` is the reconstruction target passed to ``loss_fn``.
        """
        with tf.GradientTape() as tape:
            z = self.model.encoder(x,training=training)
            decoded_x = self.model.decoder(z,training=training)
            reconstruction_loss = loss_fn(decoded_x,x_o)
        if training:
            grads = tape.gradient(reconstruction_loss, self.model.trainable_weights)
            self.optimizer.apply_gradients(zip(grads, self.model.trainable_weights))
        return reconstruction_loss

    @tf.function
    def forward_backward_prop(self,x,x_o,training=True):
        """One step using the binary-cross-entropy loss; weights are updated only if ``training``."""
        return self._train_step(x,x_o,self.Reconstruction_loss,training)

    @tf.function
    def forward_backward_prop_mse(self,x,x_o,training=True):
        """One step using the mean-squared-error loss; weights are updated only if ``training``."""
        return self._train_step(x,x_o,self.Reconstruction_loss_mse,training)

    def __call__(self):
        """Run the full training and return the trained model.

        Interactively asks for an optional experiment sub-folder name (answer ``No`` to use
        ``save_dir`` as is), trains for ``epochs`` epochs, then writes ``train_log.csv`` and
        ``Final_CAE.h5`` (weights of the last epoch) into the save directory.
        """
        name = input("Enter the extra name for experiment folder if wanted else enter No :")
        if name!="No":
            self.save_dir = os.path.join(self.save_dir,name)
        # exist_ok avoids the check-then-create race and also creates missing parents.
        os.makedirs(self.save_dir,exist_ok=True)
        if self.random_vectors_for_decoder:
            os.makedirs(os.path.join(self.save_dir,"Decoded images per epoch"),exist_ok=True)

        total_Train_batches = int(np.ceil(self.dataset_size/self.batch_size))
        # loss_type was validated in __init__, so anything that is not "mse" is "bce".
        if self.loss_type=="mse":
            train_step = self.forward_backward_prop_mse
        else:
            train_step = self.forward_backward_prop

        for epoch in range(self.epochs):
            start_time = time.time()
            print("\nEpoch {} / {}".format(epoch+1,self.epochs))
            train_data = (self.train_data
                          .shuffle(self.dataset_size)
                          .batch(self.batch_size)
                          .prefetch(tf.data.experimental.AUTOTUNE))
            for batch,(img,img_o,label) in train_data.enumerate(1):
                print("====>Training Batch {} / {}".format(batch,total_Train_batches),end="\r")
                reconstruction_loss = train_step(img,img_o,training=True)
                self.metrics_calc_logging(epoch,reconstruction_loss)
            self.metrics_calc_logging(epoch,None,epoch_end=True)

            end_time = time.time()
            print("========>Time taken : ",end_time-start_time)

        print("Saving the train log in  : ",self.save_dir,"============>")
        df_train = pd.DataFrame.from_dict(self.metric_loss_dic)
        df_train.to_csv(os.path.join(self.save_dir,"train_log.csv"),index=False)

        # Save first, then report, and use the same (correctly spelled) path for both.
        weights_path = os.path.join(self.save_dir,"Final_CAE.h5")
        self.model.save_weights(weights_path)
        print("====>Model weights of last epoch are saved in",weights_path)
        return self.model

In [9]:
# Input pipeline: Data_Pipeline is written for TensorFlow 2 on top of the tf.data API and
# returns tf.data dataset objects. Among its other features, it can also load a dataset stored
# in local folders laid out as Dataset_name/Class_name/imgs when given that folder's path.
dataset = Data_pipline.Data_Pipeline(
    dataset_path=None,
    dataset="mnist",
    image_size=(28,28),
    image_preprocessing="1",
    split=False,
    split_ratio=[0.8,0.2],
    labels_required_for_output=True,
    images_required_for_output=True,
)

# Pick one training image per class; these are used to inspect the decoder output.
sample_of_the_dataset = {"images":[],"labels":[]}
for idx,(img,img_o,label) in dataset.train_data.enumerate().as_numpy_iterator():
    # Every class already has a representative: stop scanning.
    if len(sample_of_the_dataset["labels"])==dataset.number_of_classes:
        break
    # This class was collected earlier: move on to the next element.
    if label in sample_of_the_dataset["labels"]:
        continue
    sample_of_the_dataset["labels"].append(label)
    sample_of_the_dataset["images"].append(img)

# The models in models.py are written only for the mnist and fashion mnist datasets.
model = models.CAE_MNIST_VGG(embedding_size=8)

# Configure the trainer, then run it; calling the trainer returns the trained model.
training = training_CAE(
    model=model,
    dataset=dataset,
    lr=0.0001,
    optimizer=tf.keras.optimizers.Adam,
    epochs=10,
    batch_size=64,
    random_vectors_for_decoder=sample_of_the_dataset,
    save_dir=os.path.join(".."),
    loss_type="mse",
)
trained_model = training()

Total number of images in Training dataset :  60000
Images are normalized in the range [0,1] 
Belonging to the  10 Classes
Both images and class labels are present at the output in the Train Dataset\Test Dataset : (image,image,label)
Enter the extra name for experiment folder if wanted else enter No :No

Epoch 1 / 10
====>Training Batch 938 / 938

Epoch 2 / 10
====>Training Batch 938 / 938

Epoch 3 / 10
====>Training Batch 938 / 938

Epoch 4 / 10
====>Training Batch 938 / 938

Epoch 5 / 10
====>Training Batch 938 / 938

Epoch 6 / 10
====>Training Batch 938 / 938

Epoch 7 / 10
====>Training Batch 938 / 938

Epoch 8 / 10
====>Training Batch 938 / 938

Epoch 9 / 10
====>Training Batch 938 / 938

Epoch 10 / 10
====>Training Batch 938 / 938
====>Model weights of last epoch are save in ..\Final_CAE.h5
