In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import math
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
import random
from sklearn.manifold import TSNE
import seaborn as sns
import plotly.express as px
import torch.optim as optim
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.manifold import TSNE
import seaborn as sns
import plotly.express as px
import cv2

In [None]:
# Source: https://debuggercafe.com/getting-started-with-variational-autoencoder-using-pytorch/
class VariationalAutoencoder(nn.Module):
    """
    Fully connected variational autoencoder with one hidden layer on each side.

    The encoder maps a flattened input of `num_dim` values to `2 * num_features`
    outputs, which are split into the mean and the log variance of the latent
    distribution. The decoder maps a latent vector of `num_features` values back
    to `num_dim` values passed through a sigmoid.

    :param num_features: size of the latent vector
    :param num_dim: size of the flattened input (and of the reconstruction)
    :param hidden_dim: width of the hidden layer in both encoder and decoder
    """
    def __init__(self, num_features=8, num_dim=784, hidden_dim=512):
        super(VariationalAutoencoder, self).__init__()

        self.num_features = num_features
        self.num_dim = num_dim
        self.hidden_dim = hidden_dim

        # layer attribute names are kept as-is so saved `state_dict`s stay loadable
        self.encoder_layer_1 = nn.Linear(in_features=self.num_dim, out_features=self.hidden_dim)
        self.encoder_layer_2 = nn.Linear(in_features=self.hidden_dim, out_features=(self.num_features * 2))

        self.decoder_layer_1 = nn.Linear(in_features=self.num_features, out_features=self.hidden_dim)
        self.decoder_layer_2 = nn.Linear(in_features=self.hidden_dim, out_features=self.num_dim)

    def reparameterize(self, mu, log_var):
        """
        Draw a latent sample as `mu + eps * std` with `eps` standard normal noise.

        :param mu: mean from the encoder's latent space
        :param log_var: log variance from the encoder's latent space
        :return: sampled latent vector with the same shape as `mu`
        """
        std = torch.exp(0.5*log_var) # standard deviation
        eps = torch.randn_like(std)  # `randn_like` as we need the same size
        return mu + (eps * std)

    def encode(self, x):
        """
        Run the encoder and split its output into mean and log variance.

        :param x: flattened input whose last dimension is `num_dim`
        :return: tuple `(mu, log_var)`, each of shape (-1, num_features)
        """
        hidden = F.relu(self.encoder_layer_1(x))
        stats = self.encoder_layer_2(hidden).view(-1, 2, self.num_features)

        mu = stats[:, 0, :]      # the first feature values as mean
        log_var = stats[:, 1, :] # the other feature values as log variance

        return mu, log_var

    def decode(self, z):
        """
        Map a latent vector back to input space.

        :param z: latent vector whose last dimension is `num_features`
        :return: reconstruction with sigmoid-activated values
        """
        hidden = F.relu(self.decoder_layer_1(z))
        return torch.sigmoid(self.decoder_layer_2(hidden))

    def forward(self, x):
        """
        Encode `x`, sample a latent vector and decode it.

        :param x: flattened input whose last dimension is `num_dim`
        :return: tuple `(reconstruction, mu, log_var)`
        """
        mu, log_var = self.encode(x)

        # get the latent vector through reparameterization
        z = self.reparameterize(mu, log_var)

        return self.decode(z), mu, log_var

    def sample(self, mu, log_var):
        """
        Sample a latent vector from the given distribution and decode it.

        :param mu: mean of the latent distribution
        :param log_var: log variance of the latent distribution
        :return: decoded reconstruction
        """
        z = self.reparameterize(mu, log_var)
        return self.decode(z)

In [None]:
class AutoencoderDataset(Dataset):
    """
    Dataset for autoencoder training: every item is an `(input, target)` pair
    in which both elements are looked up from the same position of `x`.
    """

    def __init__(self, x):
        """
        :param x: indexable collection of samples; must support `len()`
        """
        self.x = x
        # the length is captured once, at construction time
        self.n_samples = len(self.x)

    def __len__(self):
        """Return the number of samples recorded at construction."""
        return self.n_samples

    def __getitem__(self, index):
        """
        :param index: position of the sample in `x`
        :return: tuple `(x[index], x[index])`
        """
        inputs, targets = self.x[index], self.x[index]
        return inputs, targets

In [None]:
img_width = 28
img_height = 28

# cv2.resize takes its target size as (width, height)
dim = (img_width, img_height)

doggos = [
    "./images/labrador1.jpg",
    "./images/labrador2.jpg",
    "./images/labrador3.jpg",
    "./images/labrador4.jpg",
    "./images/labrador5.jpg"
]

images = []

for doggo in doggos:
    img = cv2.imread(doggo, 0) # flag 0: read as a grayscale 1 channel image

    # cv2.imread returns None instead of raising when a file cannot be read;
    # fail here with the offending path rather than with a TypeError on `None / 255`
    if img is None:
        raise FileNotFoundError("Could not read image: {}".format(doggo))

    img = img / 255 # normalize

    img = cv2.resize(img, dim)

    images.append(img)

num_images = len(images)
num_cols   = 1

col_names = [
    "Original"
]

# squeeze=False keeps `axes` two-dimensional, so it can always be indexed as [row, col]
fig, axes = plt.subplots(
    nrows=num_images,
    ncols=num_cols,
    figsize=(num_cols*4, num_images*4),
    squeeze=False
)

# label each column once, on its top row
for col, col_name in enumerate(col_names):
    axes[0, col].set_title(col_name)

# draw on the axes created above instead of re-creating them with plt.subplot
for row, img in enumerate(images):
    axes[row, 0].imshow(img)

In [None]:
# Flatten every image into a 1-D vector, keeping the order of `images`
data = [img.ravel() for img in images]

data

In [None]:
# Stack the flattened images into one array (one row per image),
# then hand that array to torch
pixel_matrix = np.array(data)
x = torch.Tensor(pixel_matrix)

x

In [None]:
# Hyper-parameters and run configuration
device = 'cpu'
learning_rate = 0.001
epochs = 100
batch_size = 1
num_features=8
seed = 42

# Weight initialisation and the reparameterisation noise are both random;
# seed every generator in use so that Restart & Run All reproduces the same run
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Take the input size from the data (x has one flattened image per row)
# rather than relying on the model's default
model = VariationalAutoencoder(num_features=num_features, num_dim=x.shape[1]).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# reduction='sum' adds the reconstruction error over all elements of the batch
criterion = nn.BCELoss(reduction='sum')

In [None]:
def final_loss(bce_loss, mu, logvar):
    """
    Combine the reconstruction loss with the KL-Divergence term.
    KL-Divergence = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    :param bce_loss: reconstruction loss
    :param mu: the mean from the latent vector
    :param logvar: log variance from the latent vector
    :return: `bce_loss` plus the KL-Divergence summed over every element
    """
    # logvar.exp() recovers sigma^2 from the log variance
    kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
    kl_divergence = -0.5 * torch.sum(kl_terms)
    return bce_loss + kl_divergence

In [None]:
# Wrap the image tensor so that each item is an (input, target) pair
train_ds = AutoencoderDataset(x=x)

# Batches are served in dataset order and the final partial batch is kept
loader_options = dict(
    batch_size=batch_size,
    shuffle=False,
    drop_last=False
)
train_loader = DataLoader(train_ds, **loader_options)

In [None]:
def train_fn(loader, model, optimizer, loss_fn):
    """
    Run one training epoch and return the average loss per batch.

    The reconstruction term is computed with the module-level `criterion` and
    batches are moved to the module-level `device`.

    :param loader: iterable yielding `(data, targets)` batches
    :param model: model whose call returns `(reconstruction, mu, logvar)`
    :param optimizer: optimizer used to update the model's parameters
    :param loss_fn: callable `loss_fn(bce_loss, mu, logvar)` returning the
        scalar loss to backpropagate
    :return: mean of the per-batch loss values
    :raises ValueError: if `loader` yields no batches
    """
    model.train()

    loop = tqdm(loader)

    total_loss = 0.0
    count = 0

    for data, targets in loop:
        data    = data.to(device=device)
        targets = targets.to(device=device)

        optimizer.zero_grad()

        # call the module itself rather than `.forward` so registered hooks run
        reconstruction, mu, logvar = model(data)

        bce_loss = criterion(reconstruction, targets)

        # use the loss function that was passed in, not a hard-coded global
        loss = loss_fn(bce_loss, mu, logvar)

        loss.backward()

        optimizer.step()

        batch_loss = loss.item()

        # update tqdm
        loop.set_postfix(loss=batch_loss)

        total_loss += batch_loss
        count += 1

    if count == 0:
        raise ValueError("loader yielded no batches; cannot compute an average loss")

    return total_loss / count

In [None]:
# Average loss of every epoch, in training order
losses = []

for epoch in range(epochs):
    print("Epoch: {}".format(epoch))
    ave_loss = train_fn(train_loader, model, optimizer, final_loss)
    print("Ave Loss: {}".format(ave_loss))

    # Checkpoint after each epoch; the same file is overwritten every time,
    # so it always holds the most recent weights and optimizer state
    state = dict(
        state_dict=model.state_dict(),
        optimizer=optimizer.state_dict()
    )
    torch.save(state, "autoencoder.pth")

    losses.append(ave_loss)

In [None]:
# A batch of one latent distribution with zero mean and zero log variance.
# torch.zeros builds the (1, num_features) tensor directly, avoiding the slow
# list-of-ndarrays conversion (and its UserWarning) done by torch.Tensor([...])
sampled_mu = torch.zeros(1, num_features)
sampled_logvar = torch.zeros(1, num_features)

sampled_mu.shape

In [None]:
# Pure inference: no gradients are needed, so skip building the autograd graph
with torch.no_grad():
    reconstruction = model.sample(sampled_mu, sampled_logvar)

reconstruction

In [None]:
# Take the first (only) reconstruction of the batch as a NumPy vector
reconstructed_image = reconstruction[0].detach().cpu().numpy()
# Image arrays are laid out as (rows, cols) = (height, width), which is the
# shape of the resized images before they were flattened
reconstructed_image = reconstructed_image.reshape(img_height, img_width)
plt.imshow(reconstructed_image)