# CAE and CVAE to Reconstruct MNIST

## Introduction

In this exercise, you will develop and experiment with convolutional AEs (CAE) and VAEs (CVAE).
You will be asked to:

- experiment with the architectures and compare the convolutional models to the fully connected ones. 
- investigate and implement sampling and interpolation in the latent space.

In [0]:
import os
os.chdir("drive/Colab Notebooks/")
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import save_image 
import torch.nn.functional as F
from utils import *
import matplotlib.pyplot as plt
import numpy as np

from utils import denorm_for_tanh, denorm_for_sigmoid

### Device selection

In [0]:
# Pick the compute device: the requested CUDA card when GPU use is enabled
# and CUDA is actually available, otherwise the CPU.
GPU = True
device_idx = 0
device = torch.device(
    "cuda:" + str(device_idx) if GPU and torch.cuda.is_available() else "cpu"
)
print(device)

cuda:0


### Reproducibility

In [0]:
# We set a random seed to ensure that your results are reproducible.
if torch.cuda.is_available():
    # On GPU runs, additionally ask cuDNN to use deterministic algorithms.
    torch.backends.cudnn.deterministic = True
# Seed PyTorch's default random number generator (kept as the last statement,
# so the notebook cell displays the returned generator object).
torch.manual_seed(0)

<torch._C.Generator at 0x7f4c26bbd0d0>

## Part 1 - CAE

### Normalization: 
$ x_{norm} = \frac{x-\mu}{\sigma} $

_Thus_ :
$ \min{x_{norm}} = \frac{\min{(x)}-\mu}{\sigma} = \frac{0-0.5}{0.5} = -1 $

_Similarly_:

$ \max{(x_{norm})} = \frac{\max{(x)}-\mu}{\sigma} = \frac{1-0.5}{0.5} = 1 $


* Input $\in [-1,1] $
* Output should span the same interval $ \rightarrow$ The activation function of the output layer should be chosen carefully (here: `tanh`, whose range is $[-1,1]$)

In [0]:
# MNIST images have a single channel, so Normalize must receive exactly one
# mean and one std. With mean = std = 0.5 the ToTensor range [0, 1] is mapped
# to [-1, 1], matching the tanh output layer of the CAE.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Inverse of the normalisation above, applied before images are written to disk.
denorm = denorm_for_tanh

train_dat = datasets.MNIST(
    "data/", train=True, download=True, transform=transform
)
test_dat = datasets.MNIST("data/", train=False, transform=transform)

### Hyper-parameter selection

In [0]:
# Output directory for CAE images and checkpoints. exist_ok=True makes this
# safe to re-run and avoids the check-then-create race of exists() + mkdir().
os.makedirs('./CAE', exist_ok=True)

# Training hyper-parameters.
num_epochs = 20
batch_size = 128
learning_rate = 1e-3

### Define the dataloaders

In [0]:
# Shuffle only the training data; keep the test order fixed so that
# `fixed_input` is the same batch on every run.
train_loader = DataLoader(train_dat, batch_size, shuffle=True)
test_loader = DataLoader(test_dat, batch_size, shuffle=False)

# Take the first 32 test images as a fixed reference batch for visualisation.
it = iter(test_loader)
sample_inputs, _ = next(it)
fixed_input = sample_inputs[:32, :, :, :]

# Number of pixels per image (height * width).
in_dim = fixed_input.shape[-1]*fixed_input.shape[-2]

# The batch is normalised to [-1, 1]; undo that before saving so the reference
# image is written the same way as the reconstructions, which are also
# passed through `denorm` before being saved.
save_image(denorm(fixed_input), './CAE/image_original.png')

### Define the model - CAE

Complete the `encode` and `decode` methods in the CAE pipeline.

To find an effective architecture, you can experiment with the following:
- the number of convolutional layers
- the kernels' sizes
- the stride values
- the size of the latent space layer

In [0]:
class CAE(nn.Module):
    """Convolutional auto-encoder with a fully connected bottleneck.

    The encoder applies three unpadded 3x3 convolutions (stride 1) and then a
    linear layer that maps the flattened feature map to ``latent_dim`` values.
    The decoder mirrors it: a linear layer, three 3x3 transposed convolutions
    and a final ``tanh``, so outputs lie in [-1, 1].

    Args:
        latent_dim: Size of the latent vector produced by ``encode``.
        input_size: ``(height, width)`` of the single-channel input images.
            Defaults to ``(28, 28)`` (MNIST), so the layer sizes no longer
            depend on a global tensor being defined before the class is used.

    Raises:
        ValueError: If the input is too small for three unpadded 3x3
            convolutions (either side shorter than 7 pixels).
    """

    def __init__(self, latent_dim, input_size=(28, 28)):
        super(CAE, self).__init__()
        height, width = input_size
        # Every unpadded 3x3 convolution trims 2 pixels per spatial dimension;
        # the encoder stacks three of them, hence the "- 6".
        self.feat_h = height - 6
        self.feat_w = width - 6
        if self.feat_h < 1 or self.feat_w < 1:
            raise ValueError(
                "input_size {} is too small for three 3x3 convolutions".format(input_size)
            )
        flat_dim = self.feat_h * self.feat_w

        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(3),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=3, out_channels=3, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(3),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=3, out_channels=1, kernel_size=3, stride=1, padding=0),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(3),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(in_channels=3, out_channels=3, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(3),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(in_channels=3, out_channels=1, kernel_size=3, stride=1, padding=0),
            nn.Tanh(),
        )
        # Bottleneck: flattened feature map <-> latent vector.
        self.fc1 = nn.Linear(in_features=flat_dim, out_features=latent_dim)
        self.fc2 = nn.Linear(in_features=latent_dim, out_features=flat_dim)

    def encode(self, x):
        """Return the latent representation of ``x``.

        Args:
            x: Image batch of shape ``(batch, 1, height, width)``.

        Returns:
            Tensor of shape ``(batch, latent_dim)``.
        """
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        return x

    def decode(self, z):
        """Reconstruct images from latent vectors.

        Args:
            z: Latent batch of shape ``(batch, latent_dim)``.

        Returns:
            Tensor with the same dimensions as the encoder's input,
            ``(batch, 1, height, width)``, with values in [-1, 1].
        """
        z = self.fc2(z)
        # Restore the single-channel feature map expected by the decoder.
        z = z.view(z.size(0), 1, self.feat_h, self.feat_w)
        z = self.decoder(z)
        return z

    def forward(self, x):
        """Encode ``x`` and decode the result (full reconstruction pass)."""
        x = self.encode(x)
        x = self.decode(x)
        return x

In [0]:
# Build the convolutional auto-encoder with a 144-dimensional latent space.
latent_dim = 144
cv_AE = CAE(latent_dim)

### Define Loss function

In [0]:
# Sum of absolute errors over every pixel of every image in the batch.
# (Exercise question: can we use any other loss here?)
criterion = nn.L1Loss(reduction='sum')


def loss_function_CAE(recon_x, x):
    """Return the summed L1 reconstruction error between `recon_x` and `x`."""
    return criterion(recon_x, x)

### Initialize Model and print number of parameters

In [0]:
# Move the CAE to the selected device and report its size.
model = cv_AE.to(device)
# Count only the parameters that will receive gradients.
params = sum([p.numel() for p in model.parameters() if p.requires_grad])
print("Total number of parameters is: {}".format(params))  # what would the number actually be?
print(model)

Total number of parameters is: 140328
CAE(
  (encoder): Sequential(
    (0): Conv2d(1, 3, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace)
    (3): Conv2d(3, 3, kernel_size=(3, 3), stride=(1, 1))
    (4): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace)
    (6): Conv2d(3, 1, kernel_size=(3, 3), stride=(1, 1))
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(1, 3, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace)
    (3): ConvTranspose2d(3, 3, kernel_size=(3, 3), stride=(1, 1))
    (4): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace)
    (6): ConvTranspose2d(3, 1, kernel_size=(3, 3), stride=(1, 1))
    (7): Tanh()
  )
  (fc1): Linear(in_features=484, out_features=144, bias=True)
  (fc2):

### Choose and initialize optimizer

In [0]:
# Adam over all model parameters, using the learning rate chosen above.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

### Train

In [0]:
# Train the CAE. After every epoch, print the mean per-image loss and save a
# reconstruction of the fixed reference batch.
for epoch in range(num_epochs):
    # Re-enter training mode each epoch; the snapshot below switches to eval.
    model.train()
    train_loss = 0.0
    for img, _ in train_loader:
        img = img.to(device)
        optimizer.zero_grad()
        # forward
        recon_batch = model(img)
        loss = loss_function_CAE(recon_batch, img)
        # backward
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # print out losses and save reconstructions for every epoch
    print('epoch [{}/{}], loss:{:.4f}'.format(epoch + 1, num_epochs, train_loss / len(train_loader.dataset)))
    # Take the snapshot in eval mode and without autograd: otherwise the
    # reference batch would update the BatchNorm running statistics and an
    # unused computation graph would be built.
    model.eval()
    with torch.no_grad():
        recon = denorm(model(fixed_input.to(device)))
    save_image(recon, './CAE/reconstructed_epoch_{}.png'.format(epoch))

# save the model
torch.save(model.state_dict(), './CAE/model.pth')

epoch [1/20], loss:192.6586
epoch [2/20], loss:105.1258
epoch [3/20], loss:70.8323
epoch [4/20], loss:50.1407
epoch [5/20], loss:39.1389
epoch [6/20], loss:32.7772
epoch [7/20], loss:28.7359
epoch [8/20], loss:25.9221
epoch [9/20], loss:23.7346
epoch [10/20], loss:22.3031
epoch [11/20], loss:21.3493
epoch [12/20], loss:20.7182
epoch [13/20], loss:20.1161
epoch [14/20], loss:19.7955
epoch [15/20], loss:19.3143
epoch [16/20], loss:18.9739
epoch [17/20], loss:18.6636
epoch [18/20], loss:18.4126
epoch [19/20], loss:18.1286
epoch [20/20], loss:17.8232


### Test

In [0]:
# load the model (map_location lets a GPU-trained checkpoint load on CPU too)
model.load_state_dict(torch.load("./CAE/model.pth", map_location=device))
model.eval()
test_loss = 0.0
with torch.no_grad():
    for img, _ in test_loader:
        img = img.to(device)
        recon_batch = model(img)
        # .item() keeps the accumulator a plain Python float.
        test_loss += loss_function_CAE(recon_batch, img).item()
    # After the loop `img` / `recon_batch` hold the last test batch and its
    # reconstruction. The reconstruction must NOT be passed through the model
    # a second time, or the saved image would be a reconstruction of a
    # reconstruction rather than of the saved original.
    img = denorm(img.cpu())
    # save the original last batch
    save_image(img, './CAE/test_original.png')
    save_image(denorm(recon_batch.cpu()), './CAE/reconstructed_test.png')
    # loss calculated over the whole test set
    test_loss /= len(test_loader.dataset)
    print('Test set loss: {:.4f}'.format(test_loss))

Test set loss: 18.2061


### Interpolations

In [0]:
# Define input tensors (two single-image batches from the reference batch)
x1 = fixed_input[3:4, :, :, :].to(device)
x2 = fixed_input[4:5, :, :, :].to(device)

# Number of interpolation steps between the two latent codes.
interpolate = 16

# Inference only: use eval mode and skip building an autograd graph.
model.eval()
with torch.no_grad():
    # Create the latent representations
    z1 = model.encode(x1)
    z2 = model.encode(x2)

    # Linear interpolation in latent space: row i is z1 + i * step for
    # i = 0 .. interpolate - 1 (starts at z1, stops one step short of z2).
    step = (z2 - z1)/interpolate
    steps = torch.arange(interpolate, dtype=z1.dtype, device=z1.device).unsqueeze(1)
    Z = z1 + steps * step
    X_hat = model.decode(Z)

# The decoder ends in tanh, so map its output back through `denorm` before
# saving, exactly as is done for the training and test reconstructions.
save_image(denorm(X_hat), './CAE/inter.png')

## Part 2 - CVAE

### Normalization

In [0]:
# The CVAE decoder ends in a sigmoid and is trained with binary cross-entropy,
# so inputs are kept in the [0, 1] range produced by ToTensor (no Normalize).
transform = transforms.Compose([transforms.ToTensor()])

denorm = denorm_for_sigmoid

train_dat = datasets.MNIST("data/", train=True, download=True, transform=transform)
test_dat = datasets.MNIST("data/", train=False, transform=transform)

### Hyper-parameter selection

In [0]:
# Output directory for CVAE images and checkpoints. exist_ok=True makes this
# safe to re-run and avoids the check-then-create race of exists() + mkdir().
os.makedirs('./CVAE', exist_ok=True)

# Training hyper-parameters.
num_epochs = 20
batch_size = 128
learning_rate = 1e-3

### Define the dataloaders

In [0]:
# Training batches are shuffled; test batches keep a fixed order.
train_loader = DataLoader(train_dat, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dat, batch_size=batch_size, shuffle=False)

# First 32 images of the first test batch, reused for all visualisations.
it = iter(test_loader)
sample_inputs, _ = next(it)
fixed_input = sample_inputs[:32]

# Pixels per image (width * height).
in_dim = fixed_input.size(-1) * fixed_input.size(-2)

save_image(fixed_input, './CVAE/image_original.png')

### Define the model - CVAE

Complete the `encode` and `decode` methods in the CVAE pipeline.

To find an effective architecture, you can experiment with the following:
- the number of convolutional layers
- the kernels' sizes
- the stride values
- the size of the latent space layer

In [0]:
class CVAE(nn.Module):
    """Convolutional variational auto-encoder.

    The encoder applies two unpadded 3x3 convolutions (stride 1); two linear
    heads then map the flattened feature map to the mean and log-variance of
    the approximate posterior. The decoder maps a latent sample back through
    a linear layer, two 3x3 transposed convolutions and a final sigmoid, so
    outputs lie in [0, 1].

    Args:
        latent_dim: Size of the latent vector.
        input_size: ``(height, width)`` of the single-channel input images.
            Defaults to ``(28, 28)`` (MNIST), so the layer sizes no longer
            depend on a global tensor being defined before the class is used.

    Raises:
        ValueError: If the input is too small for two unpadded 3x3
            convolutions (either side shorter than 5 pixels).
    """

    def __init__(self, latent_dim, input_size=(28, 28)):
        super(CVAE, self).__init__()
        height, width = input_size
        # Every unpadded 3x3 convolution trims 2 pixels per spatial dimension;
        # the encoder stacks two of them, hence the "- 4".
        self.feat_h = height - 4
        self.feat_w = width - 4
        if self.feat_h < 1 or self.feat_w < 1:
            raise ValueError(
                "input_size {} is too small for two 3x3 convolutions".format(input_size)
            )
        flat_dim = self.feat_h * self.feat_w

        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(3),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=3, out_channels=1, kernel_size=3, stride=1, padding=0),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(3),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(in_channels=3, out_channels=1, kernel_size=3, stride=1, padding=0),
            nn.Sigmoid(),
        )
        # Posterior heads (mean and log-variance) and the latent -> feature map layer.
        self.fc_mu = nn.Linear(in_features=flat_dim, out_features=latent_dim)
        self.fc_var = nn.Linear(in_features=flat_dim, out_features=latent_dim)
        self.fc = nn.Linear(in_features=latent_dim, out_features=flat_dim)

    def encode(self, x):
        """Return ``(mu, logvar)`` of the approximate posterior for ``x``.

        Args:
            x: Image batch of shape ``(batch, 1, height, width)``.

        Returns:
            Two tensors of shape ``(batch, latent_dim)``.
        """
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        mu = self.fc_mu(x)
        logvar = self.fc_var(x)
        return mu, logvar

    def reparametrize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps ~ N(0, I)``.

        Writing the sample as a deterministic function of ``mu`` and
        ``logvar`` plus external noise keeps it differentiable with respect
        to both.
        """
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        z = eps*std+mu
        return z

    def decode(self, z):
        """Map latent vectors ``(batch, latent_dim)`` to images in [0, 1].

        Returns:
            Tensor of shape ``(batch, 1, height, width)``.
        """
        z = self.fc(z)
        # Restore the single-channel feature map expected by the decoder.
        z = z.view(z.size(0), 1, self.feat_h, self.feat_w)
        z = self.decoder(z)
        return z

    def forward(self, x):
        """Return ``(x_hat, mu, logvar)`` for the input batch ``x``."""
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        x_hat = self.decode(z)
        return x_hat, mu, logvar

In [0]:
# Build the convolutional VAE with a 144-dimensional latent space.
latent_dim = 144
cv_VAE = CVAE(latent_dim)

### Define Loss function

In [0]:
# Reconstruction + KL divergence losses summed over all elements and batch
def loss_function_VAE(recon_x, x, mu, logvar):
    """Return the VAE loss for one batch: summed BCE plus KL divergence.

    Args:
        recon_x: Reconstructed batch with values in [0, 1].
        x: Target batch with values in [0, 1], same shape as ``recon_x``.
        mu: Posterior means.
        logvar: Posterior log-variances, same shape as ``mu``.

    Returns:
        Scalar tensor: BCE summed over all elements, plus
        ``-0.5 * sum(1 + logvar - mu^2 - exp(logvar))``, the KL divergence
        between N(mu, diag(exp(logvar))) and the standard normal prior.
    """
    # reduction='sum' replaces the deprecated size_average=False argument.
    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD

### Initialize Model and print number of parameters

In [0]:
# Move the CVAE to the selected device and report its size.
model = cv_VAE.to(device)
# Count only the parameters that will receive gradients.
params = sum([p.numel() for p in model.parameters() if p.requires_grad])
print("Total number of parameters is: {}".format(params))  # what would the number actually be?
print(model)

Total number of parameters is: 249824
CVAE(
  (encoder): Sequential(
    (0): Conv2d(1, 3, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace)
    (3): Conv2d(3, 1, kernel_size=(3, 3), stride=(1, 1))
  )
  (decoder): Sequential(
    (0): ConvTranspose2d(1, 3, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace)
    (3): ConvTranspose2d(3, 1, kernel_size=(3, 3), stride=(1, 1))
    (4): Sigmoid()
  )
  (fc_mu): Linear(in_features=576, out_features=144, bias=True)
  (fc_var): Linear(in_features=576, out_features=144, bias=True)
  (fc): Linear(in_features=144, out_features=576, bias=True)
)


### Choose and initialize optimizer

In [0]:
# Adam over all model parameters, using the learning rate chosen above.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

### Train

In [0]:
# Train the CVAE. After every epoch, print the mean per-image loss and save a
# reconstruction of the fixed reference batch.
for epoch in range(num_epochs):
    # Re-enter training mode each epoch; the snapshot below switches to eval.
    model.train()
    train_loss = 0.0
    for img, _ in train_loader:
        img = img.to(device)
        optimizer.zero_grad()
        # forward
        recon_batch, mu, logvar = model(img)
        loss = loss_function_VAE(recon_batch, img, mu, logvar)
        # backward
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # print out losses and save reconstructions for every epoch
    print('epoch [{}/{}], loss:{:.4f}'.format(epoch + 1, num_epochs, train_loss / len(train_loader.dataset)))
    # Take the snapshot in eval mode and without autograd: otherwise the
    # reference batch would update the BatchNorm running statistics and an
    # unused computation graph would be built.
    model.eval()
    with torch.no_grad():
        recon, _, _ = model(fixed_input.to(device))
    recon = denorm(recon)
    save_image(recon, './CVAE/reconstructed_epoch_{}.png'.format(epoch))

# save the model
torch.save(model.state_dict(), './CVAE/model.pth')



epoch [1/20], loss:275.4623
epoch [2/20], loss:161.1067
epoch [3/20], loss:145.2654
epoch [4/20], loss:138.1768
epoch [5/20], loss:133.2514
epoch [6/20], loss:129.4952
epoch [7/20], loss:126.5332
epoch [8/20], loss:124.1339
epoch [9/20], loss:122.2564
epoch [10/20], loss:120.7838
epoch [11/20], loss:119.6233
epoch [12/20], loss:118.6610
epoch [13/20], loss:117.9335
epoch [14/20], loss:117.2908
epoch [15/20], loss:116.6565
epoch [16/20], loss:116.0496
epoch [17/20], loss:115.4741
epoch [18/20], loss:115.0195
epoch [19/20], loss:114.6422
epoch [20/20], loss:114.4216


### Test

In [0]:
# load the model (map_location lets a GPU-trained checkpoint load on CPU too)
model.load_state_dict(torch.load("./CVAE/model.pth", map_location=device))
model.eval()
test_loss = 0.0
with torch.no_grad():
    for img, _ in test_loader:
        img = img.to(device)
        recon_batch, mu, logvar = model(img)
        # .item() keeps the accumulator a plain Python float.
        test_loss += loss_function_VAE(recon_batch, img, mu, logvar).item()
    # After the loop `img` / `recon_batch` hold the last test batch and its
    # reconstruction. The reconstruction must NOT be passed through the model
    # a second time, or the saved image would be a reconstruction of a
    # reconstruction rather than of the saved original.
    img = denorm(img.cpu())
    # save the original last batch
    save_image(img, './CVAE/test_original.png')
    save_image(denorm(recon_batch.cpu()), './CVAE/reconstructed_test.png')
    # loss calculated over the whole test set
    test_loss /= len(test_loader.dataset)
    print('Test set loss: {:.4f}'.format(test_loss))



Test set loss: 113.3263


### Sample

Sample the latent space and use the `decode` method to generate results.

In [0]:
# Generate new digits by decoding latent vectors drawn from a standard normal.
n_samples = 32
model.load_state_dict(torch.load("./CVAE/model.pth"))
model.eval()
with torch.no_grad():
    # Exercise: investigate how to sample the latent space of the CVAE.
    # Here one latent vector per requested sample is drawn with torch.randn.
    z = torch.randn(n_samples, latent_dim).to(device)
    sample = model.decode(z)
    save_image(denorm(sample).cpu(), './CVAE/samples_' + '.png')

### Interpolations

In [0]:
# Define input tensors (two single-image batches from the reference batch)
x1 = fixed_input[3:4, :, :, :].to(device)
x2 = fixed_input[5:6, :, :, :].to(device)

# Number of interpolation steps between the two latent codes.
interpolate = 64

# Inference only: use eval mode and skip building an autograd graph.
model.eval()
with torch.no_grad():
    # Create the latent representations (one posterior sample per image)
    mu, logvar = model.encode(x1)
    z1 = model.reparametrize(mu, logvar)
    mu, logvar = model.encode(x2)
    z2 = model.reparametrize(mu, logvar)

    # Linear interpolation in latent space: row i is z1 + i * step for
    # i = 0 .. interpolate - 1 (starts at z1, stops one step short of z2).
    step = (z2 - z1)/interpolate
    # Build Z on the same device as the latent codes instead of hard-coding
    # .cuda(), which fails on CPU-only machines.
    steps = torch.arange(interpolate, dtype=z1.dtype, device=z1.device).unsqueeze(1)
    Z = z1 + steps * step
    X_hat = model.decode(Z)

# Pass the output through `denorm` before saving, as is done for the training,
# test and sampled images.
save_image(denorm(X_hat), './CVAE/inter.png')