<a href="https://colab.research.google.com/github/robinmaeschreiner/VAE_Model/blob/main/VAE_Model_Git_21_07.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Inspirations and Links to Useful Projects:


https://towardsdatascience.com/variational-autoencoders-vaes-for-dummies-step-by-step-tutorial-69e6d1c9d8e9
https://colab.research.google.com/github/lschmiddey/fastpages_/blob/master/_notebooks/2021-03-14-tabular-data-variational-autoencoder.ipynb#scrollTo=ZG91fCG40vWz 

https://github.com/lschmiddey/fastpages_/blob/master/_notebooks/2021-03-14-tabular-data-variational-autoencoder.ipynb 

https://github.com/lschmiddey - lschmiddey

https://gitlab.com/m4gpie/self-supervised-ecoacoustics/-/blob/main/code/ecoacoustics/models/encoder.py 


https://github.com/geyang/variational_autoencoder_pytorch/blob/master/model.py 

## Setup and Prerequisites 

In [None]:
! pip install feature-engine

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install wandb


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
#Import Statements 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import nn, optim
from torch.autograd import Variable

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OrdinalEncoder
from numpy import asarray
from feature_engine.imputation import RandomSampleImputer
from pandas.api.types import is_numeric_dtype
from torch.utils.data import Dataset, DataLoader
from torch.nn.modules.activation import LeakyReLU
from torch.nn.modules.activation import Sigmoid
from torch.nn.modules.activation import ReLU

# Functions and Theory:
## Data Preprocessing

In [None]:
#using the RandomSampleImputer library to impute mixed data sets - categorical and numerical
def impute_mixed_data(dataset_mixed):
  """Impute a dataset that mixes categorical and numerical columns.

  A ``RandomSampleImputer`` with default settings is fitted on
  ``dataset_mixed`` and applied to that same data in one step.

  Args:
    dataset_mixed: the data to impute; handed unchanged to
      ``RandomSampleImputer.fit_transform``.

  Returns:
    The result of ``fit_transform`` on ``dataset_mixed``.
  """
  return RandomSampleImputer().fit_transform(dataset_mixed)

In [None]:
# Ordinal-encodes a column of categorical data that follows a natural order. The order can be specified if known, but it must be wrapped in [] because OrdinalEncoder expects one list of categories per column.
def ordinal_encod(df_columns, ordered_catg='auto'):
  """Ordinal-encode a single column of categorical values.

  The input is converted to an array and reshaped to one column
  (shape ``(-1, 1)``) before being passed to ``OrdinalEncoder``.

  Args:
    df_columns: the values of the column to encode (anything ``asarray``
      accepts).
    ordered_catg: forwarded as ``categories`` to ``OrdinalEncoder``.
      ``'auto'`` lets the encoder pick the order; otherwise pass the
      category order wrapped in an outer list, e.g. ``[['low', 'high']]``.

  Returns:
    The encoded column as returned by the encoder's transform.
  """
  data_col = asarray(df_columns).reshape(-1, 1)

  # The original also built a throw-away DataFrame for (commented-out)
  # display purposes; it was never used, so it is no longer created.
  encoder_ordinal = OrdinalEncoder(categories=ordered_catg)
  return encoder_ordinal.fit_transform(data_col)

In [None]:
def load_and_standardize_data(df):
    """Split the data into train/test parts and standardize both.

    The input is converted to a float32 array, split 70/30 with a fixed
    random state (42), and scaled with a ``StandardScaler`` that is fitted
    on the training part only. No missing-value handling is done here.

    Args:
        df: tabular data convertible to a 2-D numeric array.

    Returns:
        ``(X_train, X_test, scaler)``: the two standardized arrays and the
        fitted scaler.
    """
    values = np.array(df)
    values = values.reshape(-1, values.shape[1]).astype('float32')

    train_part, test_part = train_test_split(values, test_size=0.3, random_state=42)

    # Fit on the training rows only, then apply the same statistics to both.
    scaler = StandardScaler().fit(train_part)
    return scaler.transform(train_part), scaler.transform(test_part), scaler

In [None]:
# function to build the train and test DataLoaders for a given batch size using the DataBuilder class below
def test_train_datasets(df, batch_size):
  """Build the train and test ``DataLoader`` objects for ``df``.

  Two ``DataBuilder`` datasets are created from the same ``df`` (one with
  ``train=True``, one with ``train=False``) and each is wrapped in a
  ``DataLoader`` with the given batch size. The shapes of both datasets
  are printed.

  Args:
    df: the data handed to ``DataBuilder``.
    batch_size: batch size used for both loaders.

  Returns:
    ``(trainloader, testloader)``.
  """
  trainloader = DataLoader(dataset=DataBuilder(df, train=True), batch_size=batch_size)
  testloader = DataLoader(dataset=DataBuilder(df, train=False), batch_size=batch_size)

  train_shape = trainloader.dataset.x.shape
  test_shape = testloader.dataset.x.shape
  print('Data Shape:', train_shape, test_shape)

  return trainloader, testloader

In [None]:
# Class to standardize, pre-process and build the train and test datasets for the VAE training
class DataBuilder(Dataset):
    """Dataset exposing either the train or the test split as a tensor.

    The splits and the fitted scaler come from
    ``load_and_standardize_data``. Only the selected split is kept (as
    ``self.x``); the scaler is kept as ``self.standardizer``.
    """

    def __init__(self, df, train=True):
        """Standardize ``df`` and keep the train split if ``train`` else the test split."""
        train_split, test_split, self.standardizer = load_and_standardize_data(df)
        selected = train_split if train else test_split
        self.x = torch.from_numpy(selected)
        self.len = self.x.shape[0]

    def __getitem__(self, index):
        """Return the row(s) of the kept split at ``index``."""
        return self.x[index]

    def __len__(self):
        """Return the number of rows in the kept split."""
        return self.len

## VAE Model 

In [None]:
class Autoencoder(nn.Module):
    """Variational autoencoder built from fully connected layers.

    The encoder maps an input of ``D_in`` features to the mean and
    log-variance of a ``latent_dim``-dimensional Gaussian. The decoder maps
    a latent sample back to three ``D_in``-sized outputs: a reconstruction,
    a predicted mean and a predicted log-variance per input feature.

    Args:
        D_in: number of input features.
        H: width of the first hidden layer (and of the last decoder layer).
        H2: width of the inner hidden layers.
        latent_dim: size of the latent space.
        activation: activation module applied after every normalized layer.
        inbetween_layer: constructor of the layer placed between each linear
            layer and the activation; it is called with the number of
            features (``nn.BatchNorm1d`` by default).
    """

    def __init__(self,D_in,H,H2,latent_dim, activation=nn.ReLU(), inbetween_layer=nn.BatchNorm1d):
        super().__init__()

        # Encoder
        self.linear1=nn.Linear(D_in,H)
        self.lin_additional1 = inbetween_layer(num_features=H)
        self.linear2=nn.Linear(H,H2)
        self.lin_additional2 = inbetween_layer(num_features=H2)
        self.linear3=nn.Linear(H2,H2)
        self.lin_additional3 = inbetween_layer(num_features=H2)

        # Latent vectors mu and log-variance (one value per latent dimension)
        self.fc1 = nn.Linear(H2, latent_dim)
        self.fc_additional1 =inbetween_layer(num_features=latent_dim)
        self.fc21 = nn.Linear(latent_dim, latent_dim)  # latent mean head
        self.fc22 = nn.Linear(latent_dim, latent_dim)  # latent log-variance head

        # Layers applied to the sampled latent vector
        self.fc3 = nn.Linear(latent_dim, latent_dim)
        self.fc_additional3 = inbetween_layer(latent_dim)
        self.fc4 = nn.Linear(latent_dim, H2)
        self.fc_additional4 = inbetween_layer(H2)

        # Decoder
        self.linear4=nn.Linear(H2,H2)
        self.lin_additional4 = inbetween_layer(num_features=H2)
        self.linear5=nn.Linear(H2,H)
        self.lin_additional5 = inbetween_layer(num_features=H)

        # Three output heads, each of size D_in
        self.linear6=nn.Linear(H,D_in)   # reconstruction
        self.linear61=nn.Linear(H,D_in)  # predicted mean per input feature
        self.linear62=nn.Linear(H,D_in)  # predicted log-variance per input feature

        # Not used by encode/decode/forward; kept so that the module's
        # parameters (and any saved state_dict) stay unchanged.
        self.fc51 = nn.Linear(D_in, D_in)
        self.fc52 = nn.Linear(D_in, D_in)

        # Activation function shared by all layers
        self.activation = activation

    def encode(self, x):
        """Return ``(z_mu, z_logvar)`` of the latent Gaussian for input ``x``."""
        lin1 = self.activation(self.lin_additional1(self.linear1(x)))
        lin2 = self.activation(self.lin_additional2(self.linear2(lin1)))
        lin3 = self.activation(self.lin_additional3(self.linear3(lin2)))

        fc1 = self.activation(self.fc_additional1(self.fc1(lin3)))

        z_mu = self.fc21(fc1)
        z_logvar = self.fc22(fc1)

        return z_mu, z_logvar

    def reparameterize(self, z_mu, z_logvar):
        """Sample ``z = z_mu + std * eps`` with ``eps ~ N(0, I)``.

        ``eps`` must be standard-normal noise. Drawing it from
        ``Normal(z_mu, std)`` and then scaling/shifting again would apply
        the mean and the standard deviation twice.
        """
        std = torch.exp(0.5 * z_logvar)
        eps = torch.randn_like(std)
        return z_mu + eps * std

    def decode(self, z):
        """Decode latent ``z`` into ``(x_recon, x_recon_mu, x_recon_log_var)``."""
        fc3 = self.activation(self.fc_additional3(self.fc3(z)))
        fc4 = self.activation(self.fc_additional4(self.fc4(fc3)))

        lin4 = self.activation(self.lin_additional4(self.linear4(fc4)))
        lin5 = self.activation(self.lin_additional5(self.linear5(lin4)))

        # outputs
        x_recon = self.linear6(lin5)
        x_recon_mu = self.linear61(lin5)
        x_recon_log_var = self.linear62(lin5)

        return x_recon, x_recon_mu, x_recon_log_var

    def forward(self, x):
        """Encode, sample and decode.

        Returns:
            ``(x_recon, z_mu, z_logvar, x_recon_mu, x_recon_logvar)``.
        """
        z_mu, z_logvar = self.encode(x)
        z = self.reparameterize(z_mu, z_logvar)
        x_recon, x_recon_mu, x_recon_logvar = self.decode(z)

        return x_recon, z_mu, z_logvar, x_recon_mu, x_recon_logvar

    def test_just_meam(self, x):
        """Like ``forward`` but decodes the latent mean instead of a sample (for debugging)."""
        z_mu, z_logvar = self.encode(x)
        x_recon, x_recon_mu, x_recon_logvar = self.decode(z_mu)

        return x_recon, z_mu, z_logvar, x_recon_mu, x_recon_logvar

Custom loss classes using MSE and/or individual variances:

In [None]:
class customLoss(nn.Module):
    """VAE loss: mean-reduced MSE reconstruction term plus summed KL term."""

    def __init__(self):
        super().__init__()
        self.mse_loss = nn.MSELoss(reduction="mean")

    def forward(self, x_recon, x, mu, logvar, x_recon_logvar):
        """Return ``MSE(x_recon, x) + KLD(mu, logvar)``.

        ``x_recon_logvar`` is ignored; it is only accepted so that this
        loss has the same call signature as the other loss classes.
        The KL term is summed over every element of ``mu``/``logvar``.
        """
        kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
        kl_divergence = -0.5 * torch.sum(kl_terms)
        reconstruction = self.mse_loss(x_recon, x)

        return reconstruction + kl_divergence

In [None]:
from torch.nn.modules.loss import GaussianNLLLoss
#Inferring an individual variance for each feature - represent how confident the model is in predicting a given feature

class customLoss_individual(nn.Module):
    """VAE loss with a per-feature predicted variance.

    The reconstruction term is the Gaussian negative log-likelihood of
    ``x`` under mean ``x_recon`` and variance ``exp(x_recon_logvar)``; the
    regularizer is the KL divergence between the latent Gaussian and a
    standard normal. Both terms are summed per sample and then averaged
    over the batch, so their relative weight does not depend on the batch
    size.
    """

    def __init__(self):
        super().__init__()
        self.log_prob_Gaus_loss = nn.GaussianNLLLoss(reduction='none')

    def forward(self, x_recon, x, z_mu, z_logvar, x_recon_logvar):
        """Return the batch-mean of (per-sample NLL + per-sample KL).

        Args:
            x_recon: predicted mean per input feature, ``(batch, features)``.
            x: target data, same shape as ``x_recon``.
            z_mu: latent means, ``(batch, latent_dim)``.
            z_logvar: latent log-variances, same shape as ``z_mu``.
            x_recon_logvar: predicted log-variance per input feature.
        """
        # Sum the element-wise NLL over the features, then average over the batch.
        nll_per_sample = self.log_prob_Gaus_loss(x_recon, x, torch.exp(x_recon_logvar)).sum(dim=-1)
        loss_log_prob = nll_per_sample.mean(dim=0)

        # Sum the KL terms over the latent dimensions only. A global sum
        # would also add up the batch and make the following mean a no-op.
        kld_per_sample = -0.5 * torch.sum(1 + z_logvar - z_mu.pow(2) - z_logvar.exp(), dim=-1)
        loss_KLD = kld_per_sample.mean(dim=0)

        return loss_log_prob + loss_KLD

In [None]:
class customLoss_Gaussian_NLLL(nn.Module):
    """Reconstruction-only loss: Gaussian negative log-likelihood."""

    def __init__(self):
        super().__init__()
        self.log_prob_Gaus_loss = nn.GaussianNLLLoss(reduction='none')

    def forward(self, x_recon, x, z_mu, z_logvar, x_recon_logvar):
        """Return the batch-mean of the per-sample Gaussian NLL.

        The element-wise NLL of ``x`` under mean ``x_recon`` and variance
        ``exp(x_recon_logvar)`` is summed over the last axis and averaged
        over the first. ``z_mu`` and ``z_logvar`` are not used.
        """
        variance = torch.exp(x_recon_logvar)
        elementwise_nll = self.log_prob_Gaus_loss(x_recon, x, variance)
        per_sample_nll = elementwise_nll.sum(dim=-1)

        return per_sample_nll.mean(dim=0)

In [None]:
class customLoss_KL_divergence(nn.Module):
    """Regularizer-only loss: KL divergence of the latent Gaussian to N(0, I)."""

    def __init__(self):
        super().__init__()

    def forward(self, x_recon, x, z_mu, z_logvar, x_recon_logvar):
        """Return the batch-mean of the per-sample KL divergence.

        Only ``z_mu`` and ``z_logvar`` (both ``(batch, latent_dim)``) are
        used; the other arguments exist so that all loss classes share one
        call signature.
        """
        # Sum over the latent dimensions only. A global sum would also add
        # up the batch, turning the batch-wise mean below into a no-op.
        kld_per_sample = -0.5 * torch.sum(1 + z_logvar - z_mu.pow(2) - z_logvar.exp(), dim=-1)
        loss_KLD = kld_per_sample.mean(dim=0)

        return loss_KLD

## Training VAE Model

In [None]:
def train(epoch, trainloader, model, optimizer, customized_loss, train_losses, return_loss=False):
    """Run one training epoch over ``trainloader``.

    Every batch is moved to the global ``device``, passed through the
    model, and used for one optimizer step. The per-batch loss values are
    summed and divided by the number of samples in the dataset; that value
    is appended to ``train_losses`` and printed every 200th epoch.

    Args:
        epoch: current epoch number (only used for the periodic print).
        trainloader: DataLoader yielding input batches.
        model: model returning
            ``(x_recon, z_mu, z_logvar, x_recon_mu, x_recon_logvar)``.
        optimizer: optimizer updating the model parameters.
        customized_loss: callable
            ``(x_recon, x, z_mu, z_logvar, x_recon_logvar) -> loss``.
        train_losses: list that receives this epoch's loss value.
        return_loss: when ``True`` the epoch loss is also returned.

    Returns:
        The epoch loss if ``return_loss`` is ``True``, otherwise ``None``.
    """
    model.train()
    running_total = 0

    for batch in trainloader:
        batch = batch.to(device)
        optimizer.zero_grad()  # clear gradients left over from the previous step
        x_recon, z_mu, z_logvar, _x_recon_mu, x_recon_logvar = model(batch)
        batch_loss = customized_loss(x_recon, batch, z_mu, z_logvar, x_recon_logvar)
        batch_loss.backward()
        running_total += batch_loss.item()
        optimizer.step()

    epoch_loss = running_total / len(trainloader.dataset)

    if epoch % 200 == 0:
        print('====> Epoch: {} Average training loss: {:.4f}'.format(epoch, epoch_loss))

    train_losses.append(epoch_loss)

    if return_loss == True:
        return epoch_loss

In [None]:
def test(epoch, testloader, model, optimizer, customized_loss, test_losses):
    """Evaluate the model for one epoch over ``testloader``.

    The model is switched to evaluation mode for the duration of the call
    (so layers such as BatchNorm use their running statistics and are not
    updated by test data) and restored to its previous mode afterwards.
    Every batch is moved to the global ``device``. The per-batch losses are
    summed and divided by the number of samples in the dataset; that single
    value is appended to ``test_losses`` once per call and printed every
    200th epoch, mirroring ``train``.

    Args:
        epoch: current epoch number (only used for the periodic print).
        testloader: DataLoader yielding input batches.
        model: model returning
            ``(x_recon, z_mu, z_logvar, x_recon_mu, x_recon_logvar)``.
        optimizer: unused; kept so existing callers do not need to change.
        customized_loss: callable
            ``(x_recon, x, z_mu, z_logvar, x_recon_logvar) -> loss``.
        test_losses: list that receives this epoch's loss value.
    """
    was_training = model.training
    model.eval()
    test_loss = 0

    try:
        # No gradients are needed for evaluation.
        with torch.no_grad():
            for data in testloader:
                data = data.to(device)
                x_recon, z_mu, z_logvar, _x_recon_mu, x_recon_logvar = model(data)
                loss = customized_loss(x_recon, data, z_mu, z_logvar, x_recon_logvar)
                test_loss += loss.item()
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    # Report once per epoch, after all batches have been accumulated.
    epoch_loss = test_loss / len(testloader.dataset)

    if epoch % 200 == 0:
        print('====> Epoch: {} Average test loss: {:.4f}'.format(epoch, epoch_loss))

    test_losses.append(epoch_loss)

## Evaluating VAE Results


In [None]:
# Comparing the effects of the Reconstructed Values :

def compare_results(trainloader, testloader, model, optimizer):
  """Pair original and reconstructed rows of the last test batch.

  The model is run over every batch of ``testloader`` without gradients,
  but only the reconstruction of the LAST batch is used afterwards. That
  reconstruction and the last batch yielded by a second pass over
  ``testloader`` are mapped back with the dataset's scaler
  (``testloader.dataset.standardizer.inverse_transform``) and cast to
  integers with ``np.int_``.

  Args:
    trainloader: not used inside this function.
    testloader: DataLoader over the test data; its dataset must expose a
      ``standardizer`` attribute.
    model: model returning
      ``(recon_batch, z_mu, z_logvar, x_recon_mu, x_recon_logvar)``.
    optimizer: only ``zero_grad()`` is called on it.

  Returns:
    A 5-tuple ``(df, orig_recon_zip, orig_recon, real_batch,
    reconstructed_batch)``:
      * ``df``: DataFrame with columns 'Real Row' / 'Reconstructed Row'
        (or a one-cell placeholder frame if the batch indices differ),
      * ``orig_recon_zip``: the zip object, already consumed by ``list()``
        at return time,
      * ``orig_recon``: list of ``(real_row, reconstructed_row)`` pairs,
      * ``real_batch`` / ``reconstructed_batch``: lists of integer arrays
        of shape ``(1, n_features)``.

  Note:
    ``device`` is read from the enclosing (global) scope. The model's
    train/eval mode is not changed here.
  """

  # Collects every reconstructed batch; it is not read again below.
  full_recon_x_test = []

  # encoding the test data using the trained model's weights
  with torch.no_grad():
      for batch_idx_recon, data in enumerate(testloader):
          data = data.to(device)
          optimizer.zero_grad()
          recon_batch, z_mu, z_logvar, x_recon_mu, x_recon_logvar = model(data)
          full_recon_x_test.append(recon_batch) # adding each of the test batch's data to a full reconstructed batch list

  # From here on `recon_batch` and `batch_idx_recon` hold the values of the
  # last loop iteration, i.e. the last test batch.

  # reshaping all rows of a test batch 
  recon_batch_reshaped_list = []
  recon_row_list = []
  int_recon_row_list = []
  scaler = testloader.dataset.standardizer

  # Each row is reshaped to (1, n_features), un-standardized and cast to int.
  for i in range(len(recon_batch)) :
    recon_batch_reshaped_list.append(np.reshape(recon_batch[i].cpu(), (1,-1)))
    recon_row_list.append(scaler.inverse_transform(recon_batch_reshaped_list[i].numpy()))
    int_recon_row_list.append(np.int_(recon_row_list[i]))


  # rescaling the initially normalized and rescaled data
  real_reshaped_list = []
  real_row_list = []
  int_real_row_list = []
  scaler = testloader.dataset.standardizer


  # We want to compare the same batch from the reconstructed and original test data

  # Second pass over the loader: after the loop `untouched_data` is the last
  # batch. NOTE(review): this matches `recon_batch` only if the loader yields
  # batches in the same order on both passes (e.g. no shuffling) - confirm.
  for batch_idx_real, data in enumerate(testloader):
    untouched_data = data.to(device)

  for i in range(len(recon_batch)) :
    real_reshaped_list.append(np.reshape(untouched_data[i].cpu(), (1,-1))) # x[0] - the first batch or a specific batch? 
    real_row_list.append(scaler.inverse_transform(real_reshaped_list[i].numpy()))
    int_real_row_list.append(np.int_(real_row_list[i]))

  reconstructed_batch = int_recon_row_list
  real_batch = int_real_row_list

  # zipping real with reconstructed row for each item in the test batch
  orig_recon_zip = zip(real_batch, reconstructed_batch)
  orig_recon = list(orig_recon_zip)  # this exhausts `orig_recon_zip`


  # checking condition that the test data from the real and reconstructed rows comes from the same batch!
  # Both indices are the last index produced by enumerating the same loader.
  if batch_idx_recon == batch_idx_real:
    df = pd.DataFrame(data=orig_recon, columns=['Real Row', 'Reconstructed Row'])

  else:
    df = pd.DataFrame(data=[0], columns=['Not the from same batch'])

  
  return df, orig_recon_zip, orig_recon, real_batch, reconstructed_batch


In [None]:
#A function that computes the difference between each element pair (real, reconstructed)

def error_diff(zip_real_recon):
  """Compute element-wise absolute errors for (real, reconstructed) pairs.

  Args:
    zip_real_recon: an indexable sequence of ``(real, reconstructed)``
      pairs. Only the first row (``[0]``) of each member is compared.

  Returns:
    A list with one inner list per pair, holding
    ``abs(reconstructed - real)`` for every position of the real row.
  """
  error_diff_pairs = []

  for pair in zip_real_recon:
    real_row = pair[0][0]
    recon_row = pair[1][0]
    # Index by the real row's length so both rows are read position by position.
    error_diff_pairs.append(
        [abs(recon_row[j] - real_row[j]) for j in range(len(real_row))]
    )

  return error_diff_pairs