# Convolutional Variational Autoencoder (VAE) - Implementation

The purpose of this notebook is to implement the Convolutional Variational Autoencoder architecture, as outlined in section 3.4.2 of the bachelor thesis.

The code provided in this notebook was developed using the Google Colab platform.

The code in this notebook incorporates the following sources as references:

- https://github.com/AntixK/PyTorch-VAE
- https://medium.com/dataseries/variational-autoencoder-with-pytorch-2d359cbf027b

## Step 1 - Importing Dependencies

- Importing the necessary libraries to execute the code.

In [None]:
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd 
import torch
import torchvision
from torchvision import transforms
from torch import nn
import torch.nn.functional as F

## Step 2 - Hyperparameter Settings

- Set the hyperparameters (HPs) for the Convolutional VAE deep generative model, and check whether a GPU is available for use.

In [None]:
# --- Reproducibility ---------------------------------------------------
# Seed PyTorch's random number generator so that runs are repeatable.
torch.manual_seed(42)

# --- Data --------------------------------------------------------------
batch_size = 4      # Samples per training batch
image_size = 64     # Image resolution for training and for the generated output
num_workers = 2     # Number of CPU workers used to load the data

# --- Model -------------------------------------------------------------
d = 4               # Size of the latent space

# --- Optimisation (Adam) -----------------------------------------------
lr = 1e-4           # Learning rate
weight_decay = 1e-5 # Weight decay
num_epochs = 800    # Number of training epochs

# --- Hardware ----------------------------------------------------------
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Selected device: {device}')

## Step 3 - Dataset Loading

- Defining the custom class for loading the images as PyTorch dataset.

In [None]:
class Dataset(Dataset):
    """Image dataset described by a header-less CSV annotation file.

    The CSV columns are read by position:

    * column 0 - image file name,
    * column 1 - sub-folder of ``root_dir`` containing the image
      (also the value returned by ``__getlabel__``),
    * column 2 - integer label returned together with the image.

    Args:
        labels_file: Path (or file-like object) of the annotation CSV.
        root_dir: Folder that contains the per-class image sub-folders.
        transform: Optional callable applied to every loaded image.
    """

    def __init__(self, labels_file, root_dir, transform=None):
        self.annotations = pd.read_csv(labels_file, header=None)
        self.transform = transform
        self.root_dir = root_dir

    def __len__(self):
        """Return the number of rows in the annotation file."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load the image of row ``index`` and return ``(image, label)``."""
        folder = str(self.annotations.iloc[index, 1])
        file_name = self.annotations.iloc[index, 0]
        image = Image.open(os.path.join(self.root_dir, folder, file_name))

        label = torch.tensor(int(self.annotations.iloc[index, 2]))

        if self.transform:
            image = self.transform(image)

        return image, label

    def __getlabel__(self, index):
        """Return the raw value stored in column 1 of row ``index``."""
        return self.annotations.iloc[index, 1]

- The preprocessing transformation converts the data into the format expected by the Conv. VAE model.
- The labels .csv file should be provided per class: since the model is unconditional, it is trained on, and generates images for, one class at a time.

In [None]:
# Pre-processing applied to every image before it reaches the model.
preprocessing = transforms.Compose([
    # Pass (height, width) explicitly: with a single int, Resize only scales
    # the shorter edge and keeps the aspect ratio, so non-square images would
    # not end up image_size x image_size as the encoder/decoder layers require.
    transforms.Resize((image_size, image_size)),
    # PIL image -> float tensor with values in [0, 1]
    transforms.ToTensor(),
    # mean 0 / std 1 leaves the values unchanged
    transforms.Normalize((0,), (1,)),
])

# Placeholders: point these at the CSV of the class to train on and at the
# folder that contains the image sub-folders.
labels_file = '/path/to/class/labels/csv'
root_dir = '/path/to/root/image/folder'

dataset = Dataset(labels_file=labels_file, root_dir=root_dir, transform=preprocessing)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

## Step 4 - Convolutional VAE Model Definition

- **Encoder:** Defining the encoder of the VAE model. The encoder maps each input image to the mean and standard deviation of a Gaussian distribution over the latent space, from which a latent vector is sampled.

In [None]:
class Encoder(nn.Module):
    """Convolutional encoder of the VAE.

    Maps a batch of single-channel images to a latent sample ``z`` obtained
    with the reparameterisation trick (``z = mu + sigma * eps``), and stores
    the KL divergence between the predicted Gaussian and a standard normal
    in ``self.kl`` after every forward pass.

    The ``256 * 3 * 3`` input size of ``linear1`` corresponds to 64x64 inputs
    (64 -> 32 -> 16 -> 8 -> 3 through the four strided convolutions).

    Args:
        latent_dims: Size of the latent vector produced for each image.
    """

    def __init__(self, latent_dims):
        super(Encoder, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1)
        self.batch1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)
        self.batch2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.batch3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=0)
        self.batch4 = nn.BatchNorm2d(256)
        self.linear1 = nn.Linear(256 * 3 * 3, 1152)
        self.linear2 = nn.Linear(1152, latent_dims)  # predicts mu
        self.linear3 = nn.Linear(1152, latent_dims)  # predicts log(sigma)

        # KL term of the most recent forward pass (added to the loss by the
        # training loop).
        self.kl = 0

    def forward(self, x):
        """Encode ``x`` and return a latent sample of shape (batch, latent_dims)."""
        # Follow the device of this module's own parameters rather than a
        # global variable, so the encoder works wherever it has been moved.
        x = x.to(self.conv1.weight.device)

        x = F.leaky_relu(self.batch1(self.conv1(x)))
        x = F.leaky_relu(self.batch2(self.conv2(x)))
        x = F.leaky_relu(self.batch3(self.conv3(x)))
        x = F.leaky_relu(self.batch4(self.conv4(x)))
        x = torch.flatten(x, start_dim=1)
        x = F.leaky_relu(self.linear1(x))

        mu = self.linear2(x)
        log_sigma = self.linear3(x)
        sigma = torch.exp(log_sigma)

        # Reparameterisation trick; randn_like draws the noise on the same
        # device and with the same dtype as mu.
        z = mu + sigma * torch.randn_like(mu)

        # log(sigma^2) == 2 * log_sigma. Using the pre-activation directly
        # avoids log(0) = -inf when sigma underflows to zero.
        self.kl = -0.5 * torch.sum(1 + 2 * log_sigma - mu.pow(2) - sigma.pow(2))

        return z

- **Decoder:** Defining the decoder of the VAE implementation, the decoder has the role of learning the original image distribution and, based on that, reconstructing new data instances that belong to the same original distribution.

In [None]:
class Decoder(nn.Module):
    """Convolutional decoder of the VAE.

    Expands a latent vector through two fully connected layers, reshapes the
    result to a (256, 3, 3) feature map and upsamples it with four transposed
    convolutions (3 -> 8 -> 16 -> 32 -> 64) into a single-channel image whose
    values are squashed into (0, 1) by a sigmoid.

    Args:
        latent_dims: Size of the latent vectors fed to the decoder.
    """

    def __init__(self, latent_dims):
        super(Decoder, self).__init__()

        # Latent vector -> flat feature vector
        fully_connected = [
            nn.Linear(in_features=latent_dims, out_features=1152),
            nn.LeakyReLU(),
            nn.Linear(in_features=1152, out_features=256 * 3 * 3),
            nn.LeakyReLU(),
        ]
        self.decoder_lin = nn.Sequential(*fully_connected)

        # Flat feature vector -> (256, 3, 3) feature map
        self.unflatten = nn.Unflatten(dim=1, unflattened_size=(256, 3, 3))

        # Feature map -> image logits
        upsampling = [
            nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=4, stride=2),
            nn.BatchNorm2d(num_features=128),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=3,
                               stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(num_features=64),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=3,
                               stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(num_features=32),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(in_channels=32, out_channels=1, kernel_size=3,
                               stride=2, padding=1, output_padding=1),
        ]
        self.decoder_conv = nn.Sequential(*upsampling)

    def forward(self, x):
        """Decode latent vectors ``x`` into images with values in (0, 1)."""
        feature_map = self.unflatten(self.decoder_lin(x))
        return torch.sigmoid(self.decoder_conv(feature_map))

- Defining the complete Convolutional VAE class, which chains the encoder and the decoder.

In [None]:
class VariationalAutoencoder(nn.Module):
    """Complete VAE: an ``Encoder`` followed by a ``Decoder``.

    Args:
        latent_dims: Size of the latent space shared by both halves.
    """

    def __init__(self, latent_dims):
        super().__init__()
        self.encoder = Encoder(latent_dims)
        self.decoder = Decoder(latent_dims)

    def forward(self, x):
        """Encode ``x`` into a latent sample and decode it back to an image."""
        latent_sample = self.encoder(x.to(device))
        reconstruction = self.decoder(latent_sample)
        return reconstruction

## Step 5 - Training the Model

- Initializing the Convolutional VAE model.

In [None]:
# Build the model and place it on the selected device.
vae = VariationalAutoencoder(latent_dims=d)
vae.to(device)

# Adam optimises every parameter of the encoder and the decoder.
optim = torch.optim.Adam(params=vae.parameters(), lr=lr, weight_decay=weight_decay)

- Training function for the Convolutional VAE model.

In [None]:
def train_epoch(vae, device, dataloader, optimizer, loss_fn):
    """Train ``vae`` for one pass over ``dataloader``.

    For every batch the loss is ``loss_fn(reconstruction, batch)`` plus the
    KL term the encoder stored in ``vae.encoder.kl`` during the forward pass.

    Args:
        vae: Model to train; must expose ``encoder.kl`` after a forward pass.
        device: Device the input batches are moved to.
        dataloader: Yields ``(images, labels)`` pairs; the labels are ignored.
        optimizer: Optimizer that updates the parameters of ``vae``.
        loss_fn: Reconstruction loss called as ``loss_fn(x_hat, x)``.

    Returns:
        The sum of all batch losses divided by ``len(dataloader.dataset)``.
    """
    vae.train()
    running_total = 0.0

    for batch, _labels in dataloader:
        batch = batch.to(device)
        reconstruction = vae(batch)

        # Reconstruction term + KL term from the forward pass above
        batch_loss = loss_fn(reconstruction, batch) + vae.encoder.kl

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        running_total += batch_loss.item()

    num_samples = len(dataloader.dataset)
    return running_total / num_samples

- Training the Convolutional VAE model. The training loss of each epoch is recorded so that it can be plotted later.

In [None]:
# Summed (not averaged) binary cross-entropy as the reconstruction loss.
loss_fn = nn.BCELoss(reduction="sum")

# One entry per epoch: the value returned by train_epoch.
train_loss_log = []

for epoch in range(num_epochs):
    train_loss = train_epoch(vae, device, dataloader, optim, loss_fn)
    train_loss_log.append(train_loss)
    print('\n EPOCH {}/{} \t train loss {:.3f}'.format(epoch + 1, num_epochs, train_loss))

## Step 6 - Visualizing the Results

- **Training Results:** Train loss over epochs

In [None]:
# Plot the training loss that was logged once per epoch.
plt.figure(figsize=(10, 5))
plt.title("Loss During Training")
# The label gives plt.legend() an entry to display; without any labelled
# artist the legend is empty and matplotlib emits a warning.
plt.plot(train_loss_log, label="Train loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

- **Synthetic Images:** Visualizing synthetic data generated by the model

In [None]:
def show_image(img):
    """Draw a channel-first image tensor on the current matplotlib axes.

    Args:
        img: Tensor whose first axis is moved to the end before it is handed
            to ``plt.imshow``.
    """
    channels_last = img.numpy().transpose(1, 2, 0)
    plt.imshow(channels_last)

# Switch to evaluation mode before sampling from the decoder.
vae.eval()

with torch.no_grad():

    # Draw latent vectors from a standard normal and decode them.
    latent = torch.randn(128, d, device=device)
    img_recon = vae.decoder(latent).cpu()

    # Show the first 100 images in a grid: 10 per row, 5 pixels of padding.
    grid = torchvision.utils.make_grid(img_recon.data[:100], nrow=10, padding=5)

    fig, ax = plt.subplots(figsize=(15, 10))
    show_image(grid)
    plt.show()

## Step 7 - Saving the Synthetic Images

- Defining a function to create new images from a noise input.
- Saving the images in a desired folder location.
- Defining the desired total number of images after the synthetic augmentation.
- As mentioned, the images are created per class, since this is an unconditional implementation.

In [None]:
def save_synthetic_data(dataset, save_path, num_instances, vae):
    """Generate synthetic images with the decoder and save them as JPEG files.

    Tops the class up to ``num_instances`` images in total:
    ``num_instances - len(dataset)`` latent vectors are drawn from a standard
    normal, decoded, resized to 224x224 and written to
    ``<save_path>/<label>/vae_<label>_<i>.jpg``, where ``<label>`` is
    ``dataset.__getlabel__(0)``. The target folder is created if missing.

    Relies on the module-level ``d`` (latent size) and ``device``.

    Args:
        dataset: Real dataset of the class; provides its size and its label.
        save_path: Root folder under which the label sub-folder is written.
        num_instances: Desired total number of images (real + synthetic).
        vae: Trained model whose ``decoder`` generates the images.

    Returns:
        None. Nothing is generated when the dataset already holds at least
        ``num_instances`` images.
    """
    vae.eval()

    num_images = num_instances - len(dataset)
    if num_images <= 0:
        # A negative count would make torch.randn fail; there is nothing to add.
        return

    # Look the label up once. str() is needed because a numeric label read
    # from the CSV cannot be passed to os.path.join directly.
    label = str(dataset.__getlabel__(0))
    target_dir = os.path.join(save_path, label)
    os.makedirs(target_dir, exist_ok=True)

    to_pil = transforms.ToPILImage()
    resize = transforms.Resize((224, 224))

    with torch.no_grad():
        latent = torch.randn(num_images, d, device=device)
        img_recon = vae.decoder(latent).cpu()

    for i, tensor_image in enumerate(img_recon):
        pil_image = resize(to_pil(tensor_image))
        pil_image.save(os.path.join(target_dir, f'vae_{label}_{i}.jpg'))


In [None]:
# Total number of images wanted for the class (real + synthetic).
num_instances = 1000
# Placeholder: root folder that receives the generated images.
save_path = "/path/to/save/the/images"

save_synthetic_data(dataset=dataset, save_path=save_path, num_instances=num_instances, vae=vae)