# CO460 - Deep Learning - Lab exercise 4

## Introduction

In this exercise, you will develop and experiment with convolutional autoencoders (CAEs).
You will be asked to:

- experiment with the architectures and compare the convolutional model to the fully connected one. 
- investigate and implement sampling and interpolation in the latent space.

In [0]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import save_image 
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np

# utils
def interpolate(z1, z2, num=11):
    """Linearly interpolate between two vectors, one dimension at a time.

    Args:
        z1: Start vector; must be indexable and expose ``shape``.
        z2: End vector, indexed with the same indices as ``z1``.
        num: Number of evenly spaced samples per dimension (endpoints
            included).

    Returns:
        A NumPy array of shape ``(z1.shape[0], num)``. Row ``i`` runs from
        ``z1[i]`` to ``z2[i]``, so column ``k`` is the k-th interpolated
        vector.
    """
    steps = np.zeros((z1.shape[0], num))
    for dim, row in enumerate(steps):
        # ``row`` is a view into ``steps``; slice-assignment fills it in place.
        row[:] = np.linspace(z1[dim], z2[dim], num)
    return steps

def denorm_for_tanh(x):
    """Undo the [-1, 1] normalisation and reshape to 1x28x28 images.

    Args:
        x: Tensor whose first dimension is the batch and which holds 784
            elements per sample.

    Returns:
        Tensor of shape ``(batch, 1, 28, 28)`` with values clamped to [0, 1].
    """
    batch = x.size(0)
    rescaled = (x + 1) * 0.5  # shift/scale [-1, 1] onto [0, 1]
    return rescaled.clamp(0, 1).view(batch, 1, 28, 28)

def denorm_for_sigmoid(x):
    """Clamp values to [0, 1] and reshape to 1x28x28 images.

    Args:
        x: Tensor whose first dimension is the batch and which holds 784
            elements per sample.

    Returns:
        Tensor of shape ``(batch, 1, 28, 28)`` with values in [0, 1].
    """
    return x.clamp(0, 1).view(x.size(0), 1, 28, 28)

def denorm_for_binary(x):
    """Threshold values at 0.5 and reshape to 1x28x28 boolean images.

    Args:
        x: Tensor whose first dimension is the batch and which holds 784
            elements per sample.

    Returns:
        Boolean tensor of shape ``(batch, 1, 28, 28)``; ``True`` where the
        clamped input is strictly greater than 0.5.
    """
    mask = x.clamp(0, 1) > 0.5
    return mask.view(x.size(0), 1, 28, 28)

def show(img):
    """Display a channel-first image tensor with matplotlib.

    Args:
        img: 3-D tensor laid out as ``(C, H, W)``; it is transposed to
            ``(H, W, C)`` before being handed to ``plt.imshow``.
    """
    # Move to host memory based on the tensor itself, not on whether the
    # machine has CUDA: ``.cpu()`` is a no-op for CPU tensors. ``.detach()``
    # is needed because ``.numpy()` refuses tensors that require grad.
    npimg = img.detach().cpu().numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))

### Device selection

In [0]:
GPU = True        # set to False to force CPU execution
device_idx = 0    # index of the CUDA device to use when GPU is enabled

# Pick the requested CUDA device only when it is both wanted and available;
# otherwise fall back to the CPU.
device = torch.device(
    "cuda:{}".format(device_idx) if GPU and torch.cuda.is_available() else "cpu"
)
print(device)

### Reproducibility

In [0]:
# We set a random seed to ensure that your results are reproducible.
# When CUDA is available, also switch on cuDNN's `deterministic` flag.
if torch.cuda.is_available():
    torch.backends.cudnn.deterministic = True
# Seed PyTorch's random number generator with a fixed value.
torch.manual_seed(0)

### Normalization: 
$ x_{norm} = \frac{x-\mu}{\sigma} $

_Thus_ :
$ \min{x_{norm}} = \frac{\min{(x)}-\mu}{\sigma} = \frac{0-0.5}{0.5} = -1 $

_Similarly_:

$ \max{(x_{norm})} = \frac{\max{(x)}-\mu}{\sigma} = \frac{1-0.5}{0.5} = 1 $


* Input $\in [-1,1] $
* Output should span the same interval $ \rightarrow$ The activation function of the output layer should be chosen carefully (which one fits here?)

In [0]:
# Preprocessing: convert each image to a tensor, then normalise it with
# mean 0.5 and standard deviation 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
)

# Inverse mapping applied to model outputs before they are saved as images.
denorm = denorm_for_tanh

# MNIST training split (downloaded into data/ when missing) and test split,
# both using the preprocessing above.
train_dat = datasets.MNIST("data/", transform=transform, train=True, download=True)
test_dat = datasets.MNIST("data/", transform=transform, train=False)

### Hyper-parameter selection

In [0]:
# Output directory for saved images and the model checkpoint.
# exist_ok=True makes this idempotent and avoids the check-then-create race
# of testing os.path.exists() before os.mkdir().
os.makedirs('./CAE', exist_ok=True)

num_epochs = 20        # number of passes over the training set
batch_size = 128       # samples per mini-batch
learning_rate = 1e-3   # step size handed to the optimizer

### Define the dataloaders

In [0]:
# Training batches are shuffled; test batches keep the dataset order.
train_loader = DataLoader(train_dat, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dat, batch_size=batch_size, shuffle=False)

# Take the first test batch and keep at most 32 of its images as a fixed
# probe for visualising reconstructions.
it = iter(test_loader)
sample_inputs, _ = next(it)
fixed_input = sample_inputs[:32]

# Number of pixels per image: height * width.
in_dim = fixed_input.shape[-2] * fixed_input.shape[-1]

save_image(fixed_input, './CAE/image_original.png')

### Define the model - CAE

Complete the `encode` and `decode` methods in the CAE pipeline.

To find an effective architecture, you can experiment with the following:
- the number of convolutional layers
- the kernels' sizes
- the stride values
- the size of the latent space layer

In [0]:
class CAE(nn.Module):
    """Convolutional autoencoder skeleton for the lab exercise.

    ``encode`` and ``decode`` are placeholders that currently return their
    argument unchanged; the layers and both pipelines are left as TODOs.
    """

    def __init__(self, latent_dim):
        """Set up the module.

        Args:
            latent_dim: Size of the latent space representation.
        """
        super(CAE, self).__init__()
        # Keep the requested latent size so encode/decode can refer to it.
        self.latent_dim = latent_dim
        # TODO: Define here the layers (convolutions, relu etc.) that will be
        # used in the encoder and decoder pipelines.

    def encode(self, x):
        """Map an input batch to its latent representation.

        TODO: Construct the encoder pipeline here. The encoder's
        output will be the latent space representation of x.
        """
        return x

    def decode(self, z):
        """Map a latent representation back to input space.

        TODO: Construct the decoder pipeline here. The decoder should
        generate an output tensor with equal dimensions to the
        encoder's input tensor.
        """
        return z

    def forward(self, x):
        """Encode ``x`` and decode the result; returns the reconstruction."""
        # The pipelines are the ``encode``/``decode`` methods of this class;
        # there are no ``encoder``/``decoder`` attributes to call.
        z = self.encode(x)
        return self.decode(z)

In [0]:
# Instantiate the model
# TODO: assign the size of the latent space to `latent_dim`. The right-hand
# side is left blank in this exercise template, so the cell does not run
# until a value is filled in.
latent_dim = 
cv_AE = CAE(latent_dim=latent_dim)

### Define Loss function

In [0]:
# Reconstruction criterion: absolute errors summed over all elements.
criterion = nn.L1Loss(reduction='sum')  # can we use any other loss here?


def loss_function_CAE(recon_x, x):
    """Return the reconstruction loss between ``recon_x`` and ``x``.

    Args:
        recon_x: Reconstructed batch produced by the model.
        x: Target batch the reconstruction is compared against.

    Returns:
        Scalar tensor: the module-level ``criterion`` applied to the pair.
    """
    return criterion(recon_x, x)

### Initialize Model and print number of parameters

In [0]:
# Move the autoencoder onto the selected device.
model = cv_AE.to(device)
# Count the scalar entries of every parameter tensor that has requires_grad set.
params = sum(
    weight.numel() for weight in model.parameters() if weight.requires_grad
)
print("Total number of parameters is: {}".format(params))  # what would the number actually be?
print(model)

### Choose and initialize optimizer

In [0]:
# Adam over all model parameters, using the learning rate chosen above.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

### Train

In [0]:
# Switch the model to training mode before optimisation starts.
model.train()

for epoch in range(num_epochs):
    # Summed loss over all samples seen in this epoch.
    train_loss = 0
    for img, _ in train_loader:  # labels are not needed for reconstruction
        img = img.to(device)
        optimizer.zero_grad()
        # forward
        recon_batch = model(img)
        loss = loss_function_CAE(recon_batch, img)
        # backward
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # print out losses and save reconstructions for every epoch
    print('epoch [{}/{}], loss:{:.4f}'.format(epoch + 1, num_epochs, train_loss / len(train_loader.dataset)))
    # The preview pass is for visualisation only: run it without autograd so
    # no computation graph is built and kept alive for the fixed batch.
    with torch.no_grad():
        recon = denorm(model(fixed_input.to(device)))
    save_image(recon.float(), './CAE/reconstructed_epoch_{}.png'.format(epoch))

# save the model
torch.save(model.state_dict(), './CAE/model.pth')

### Test

In [0]:
# load the model; map_location keeps the load working when the checkpoint
# was written from a different device than the one selected now
model.load_state_dict(torch.load("./CAE/model.pth", map_location=device))
model.eval()
test_loss = 0
with torch.no_grad():
    for i, (img, _) in enumerate(test_loader):
        img = img.to(device)
        recon_batch = model(img)
        # accumulate a plain Python float rather than a device tensor
        test_loss += loss_function_CAE(recon_batch, img).item()
    # After the loop, `img` and `recon_batch` hold the last batch and its
    # reconstruction. Do NOT pass `recon_batch` through the model again: that
    # would save a reconstruction of the reconstruction, not of the input.
    img = denorm(img.cpu())
    # save the original last batch
    save_image(img.float(), './CAE/test_original.png')
    save_image(denorm(recon_batch.cpu()).float(), './CAE/reconstructed_test.png')
    # loss calculated over the whole test set
    test_loss /= len(test_loader.dataset)
    print('Test set loss: {:.4f}'.format(test_loss))

### Interpolations

In [0]:
# Define input tensors
# TODO: x1 and x2 are intentionally left blank in this exercise template;
# assign the two inputs whose latent codes should be interpolated.
x1 = 
x2 = 

# Create the latent representations
z1 = model.encode(x1)
z2 = model.encode(x2)

"""
TODO: Find a way to create interpolated results from the CAE.
"""
# TODO: Z is left blank on purpose; it should hold the interpolated latent codes.
# NOTE(review): the `interpolate` helper in this file returns a NumPy array;
# presumably it must be converted to a tensor before decoding - confirm.
Z = 
X_hat = model.decode(Z)