# CO460 - Deep Learning - Lab exercise 3

## Introduction

In this exercise, you will develop and experiment with convolutional AEs (CAE) and VAEs (CVAE).
You will be asked to:

- experiment with the architectures and compare the convolutional models to the fully connected ones. 
- investigate and implement sampling and interpolation in the latent space.

In [14]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import save_image 
import torch.nn.functional as F
from utils import *
import matplotlib.pyplot as plt
import numpy as np

from utils import denorm_for_tanh, denorm_for_sigmoid

### Device selection

In [26]:
# Switches: whether to try the GPU at all, and which CUDA device index to use.
GPU = True
device_idx = 0

# Use the CPU when the GPU is disabled or CUDA is not available.
use_cuda = GPU and torch.cuda.is_available()
device = torch.device("cuda:" + str(device_idx) if use_cuda else "cpu")
print(device)

cuda:0


### Reproducibility

In [27]:
# We set a random seed to ensure that your results are reproducible.
if torch.cuda.is_available():
    # Ask cuDNN for deterministic kernels and turn off its autotuner, which
    # may otherwise pick different algorithms from one run to the next.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
torch.manual_seed(0)


<torch._C.Generator at 0x7f2f380b6ef0>

## Part 1 - CAE

### Normalization: 
$ x_{norm} = \frac{x-\mu}{\sigma} $

_Thus_ :
$ \min{(x_{norm})} = \frac{\min{(x)}-\mu}{\sigma} = \frac{0-0.5}{0.5} = -1 $

_Similarly_:

$ \max{(x_{norm})} = \frac{\max{(x)}-\mu}{\sigma} = \frac{1-0.5}{0.5} = 1 $


* Input $\in [-1,1] $
* Output should span the same interval $ \rightarrow$ the activation function of the output layer should be chosen carefully (here: `tanh`, whose range is $[-1,1]$)

In [17]:
# MNIST images have a single channel, so Normalize takes exactly one mean and
# one std. ToTensor() yields values in [0, 1]; (x - 0.5) / 0.5 maps them to
# [-1, 1].
transform = transforms.Compose([
     transforms.ToTensor(),
     transforms.Normalize((0.5,), (0.5,))
])

# Helper from utils applied before saving images.
# NOTE(review): presumably maps [-1, 1] back to [0, 1] — confirm in utils.
denorm = denorm_for_tanh

train_dat = datasets.MNIST(
    "data/", train=True, download=True, transform=transform
)
test_dat = datasets.MNIST("data/", train=False, transform=transform)

### Hyper-parameter selection

In [18]:
# Output directory for Part 1; exist_ok avoids the check-then-create race and
# makes the cell safe to re-run.
os.makedirs('./CAE', exist_ok=True)

num_epochs = 20
batch_size = 128
learning_rate = 1e-3

### Define the dataloaders

In [19]:
train_loader = DataLoader(train_dat, batch_size, shuffle=True)
test_loader = DataLoader(test_dat, batch_size, shuffle=False)

# Take the first (unshuffled) test batch as a fixed reference set, so the
# reconstructions saved after each epoch are comparable.
it = iter(test_loader)
sample_inputs, _ = next(it)
fixed_input = sample_inputs[:32, :, :, :]

x1 = sample_inputs[0, :, :, :]
x2 = sample_inputs[1, :, :, :]

# Number of pixels per image (height * width).
in_dim = fixed_input.shape[-1]*fixed_input.shape[-2]

# fixed_input went through the dataset transform, so apply denorm before
# saving it, exactly as the test cell does for its original images.
save_image(denorm(fixed_input), './CAE/image_original.png')

### Define the model - CAE

Complete the `encode` and `decode` methods of the `CAE` class.

To find an effective architecture, you can experiment with the following:
- the number of convolutional layers
- the kernels' sizes
- the stride values
- the size of the latent space layer

In [20]:
class CAE(nn.Module):
    """Convolutional autoencoder for single-channel 28x28 images.

    The encoder reduces the input to an 8x2x2 feature map, flattens it and
    projects it to ``latent_dim`` values with a linear layer. The decoder
    reshapes the latent code directly back to 8x2x2 and upsamples it with
    three transposed convolutions, ending in a tanh.

    Note: ``decode`` has no linear layer of its own, so it only accepts codes
    with 32 values per sample (8*2*2). ``latent_dim`` must therefore be 32 for
    ``forward`` to work.
    """

    def __init__(self, latent_dim):
        """Create the encoder and decoder layers.

        Args:
            latent_dim: size of the latent code produced by ``encode``.
        """
        super(CAE, self).__init__()

        # encoder layers
        self.conv1 = nn.Conv2d(1, 16, 3, stride=3, padding=1)
        self.maxp1 = nn.MaxPool2d(2, stride=2)
        self.conv2 = nn.Conv2d(16, 8, 3, stride=2, padding=1)
        self.maxp2 = nn.MaxPool2d(2, stride=1)
        self.fc = nn.Linear(32, latent_dim)

        # decoder layers
        self.deconv1 = nn.ConvTranspose2d(8, 16, 3, stride=2)
        self.deconv2 = nn.ConvTranspose2d(16, 8, 5, stride=3, padding=1)
        self.deconv3 = nn.ConvTranspose2d(8, 1, 2, stride=2, padding=1)
        self.tanh = nn.Tanh()

        # shared in-place ReLU
        self.relu = nn.ReLU(True)

    def encode(self, x):
        """Map a batch of images to latent codes.

        Args:
            x: tensor of shape (b, 1, 28, 28).

        Returns:
            Tensor of shape (b, latent_dim).
        """
        x = self.relu(self.conv1(x)) # b, 16, 10, 10
        x = self.maxp1(x) # b, 16, 5, 5
        x = self.relu(self.conv2(x)) # b, 8, 3, 3
        x = self.maxp2(x) # b, 8, 2, 2
        x = x.view(x.size(0), -1) # b, 8*2*2
        x = self.fc(x) # b, latent_dim
        return x

    def decode(self, z):
        """Map latent codes back to images.

        Args:
            z: tensor holding 32 values per sample, e.g. shape (b, 32).

        Returns:
            Tensor of shape (b, 1, 28, 28) with values in [-1, 1] (tanh).
        """
        z = z.view(z.size(0), 8, 2, 2) # b, 8, 2, 2
        z = self.relu(self.deconv1(z)) # b, 16, 5, 5
        z = self.relu(self.deconv2(z)) # b, 8, 15, 15
        z = self.tanh(self.deconv3(z)) # b, 1, 28, 28
        return z

    def forward(self, x):
        """Encode ``x`` and decode the result; returns the reconstruction."""
        x = self.encode(x)
        x = self.decode(x)
        return x

In [21]:
# Build the convolutional autoencoder with a 32-value latent code.
latent_dim = 32
cv_AE = CAE(latent_dim)

### Define Loss function

In [22]:
# Absolute error summed (not averaged) over every element of the batch.
criterion = nn.L1Loss(reduction='sum')  # can we use any other loss here?


def loss_function_CAE(recon_x, x):
    """Return the summed L1 reconstruction error between recon_x and x."""
    return criterion(recon_x, x)

### Initialize Model and print number of parameters

In [23]:
# Move the autoencoder to the selected device and count its trainable weights.
model = cv_AE.to(device)
params = sum(
    weight.numel()
    for weight in model.parameters()
    if weight.requires_grad
)
print("Total number of parameters is: {}".format(params))  # what would the number actually be?
print(model)

Total number of parameters is: 6785
CAE(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(3, 3), padding=(1, 1))
  (maxp1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(16, 8, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (maxp2): MaxPool2d(kernel_size=2, stride=1, padding=0, dilation=1, ceil_mode=False)
  (fc): Linear(in_features=32, out_features=32, bias=True)
  (deconv1): ConvTranspose2d(8, 16, kernel_size=(3, 3), stride=(2, 2))
  (deconv2): ConvTranspose2d(16, 8, kernel_size=(5, 5), stride=(3, 3), padding=(1, 1))
  (deconv3): ConvTranspose2d(8, 1, kernel_size=(2, 2), stride=(2, 2), padding=(1, 1))
  (tanh): Tanh()
  (relu): ReLU(inplace)
)


### Choose and initialize optimizer

In [24]:
# Adam over all model parameters, using the learning rate set with the other hyper-parameters.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

### Train

In [25]:
model.train()

for epoch in range(num_epochs):
    train_loss = 0
    for batch_idx, data in enumerate(train_loader):
        img, _ = data
        img = img.to(device)
        optimizer.zero_grad()
        # forward
        recon_batch = model(img)
        loss = loss_function_CAE(recon_batch, img)
        # backward
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    # print out losses and save reconstructions for every epoch
    # (the loss is summed per batch, so divide by the number of samples)
    print('epoch [{}/{}], loss:{:.4f}'.format(epoch + 1, num_epochs, train_loss / len(train_loader.dataset)))
    # The preview is only saved to disk, so do not build an autograd graph.
    with torch.no_grad():
        recon = denorm(model(fixed_input.to(device)))
    save_image(recon, './CAE/reconstructed_epoch_{}.png'.format(epoch))

# save the model
torch.save(model.state_dict(), './CAE/model.pth')

RuntimeError: cuDNN error: CUDNN_STATUS_ARCH_MISMATCH

### Test

In [None]:
# load the model
model.load_state_dict(torch.load("./CAE/model.pth"))
model.eval()
test_loss = 0
with torch.no_grad():
    for i, (img, _) in enumerate(test_loader):
        img = img.to(device)
        recon_batch = model(img)
        # accumulate a Python float rather than a tensor
        test_loss += loss_function_CAE(recon_batch, img).item()
    # After the loop, img and recon_batch hold the last batch and its
    # reconstruction; do not pass the reconstruction through the model again.
    img = denorm(img.cpu())
    # save the original last batch
    save_image(img, './CAE/test_original.png')
    save_image(denorm(recon_batch.cpu()), './CAE/reconstructed_test.png')
    # loss calculated over the whole test set
    test_loss /= len(test_loader.dataset)
    print('Test set loss: {:.4f}'.format(test_loss))

### Interpolations

In [None]:
# Define input tensors
x1 = sample_inputs[10, :, :, :]
x2 = sample_inputs[36, :, :, :]

# Create the latent representations ([None, :] adds the batch dimension the
# model expects; [0] removes it again after the conversion to numpy)
z1 = model.encode(x1.to(device)[None,:]).cpu().detach().numpy()[0]
z2 = model.encode(x2.to(device)[None,:]).cpu().detach().numpy()[0]

# TODO: Find a way to create interpolated results from the CAE.
# interpolate comes from utils.
# NOTE(review): the transpose assumes it returns one column per interpolation
# step — confirm against utils.
Z = interpolate(z1, z2, num=11)
# Follow the configured device rather than hard-coding CUDA, so the cell also
# runs on CPU-only machines.
Z = torch.FloatTensor(Z.T).to(device)
X_hat = model.decode(Z)

In [None]:
# Write the decoded interpolation sequence to disk as an image grid.
interpolation_grid = denorm(X_hat.cpu())
save_image(interpolation_grid, './CAE/interpolation.png')

## Part 2 - CVAE

### Normalization

In [None]:
# loss_function_VAE uses binary cross-entropy, which requires targets in
# [0, 1]. ToTensor() already yields that range, so no Normalize step is applied
# (the earlier 3-channel Normalize was also wrong for single-channel MNIST).
transform = transforms.Compose([
     transforms.ToTensor(),
])

# NOTE(review): denorm_for_sigmoid is assumed to be the [0, 1] counterpart of
# denorm_for_tanh — confirm in utils.
denorm = denorm_for_sigmoid

train_dat = datasets.MNIST(
    "data/", train=True, download=True, transform=transform
)
test_dat = datasets.MNIST("data/", train=False, transform=transform)

### Hyper-parameter selection

In [None]:
# Output directory for Part 2; exist_ok avoids the check-then-create race and
# makes the cell safe to re-run.
os.makedirs('./CVAE', exist_ok=True)

num_epochs = 20
batch_size = 128
learning_rate = 1e-3

### Define the dataloaders

In [None]:
train_loader = DataLoader(train_dat, batch_size, shuffle=True)
test_loader = DataLoader(test_dat, batch_size, shuffle=False)

# Take the first (unshuffled) test batch as a fixed reference set, so the
# reconstructions saved after each epoch are comparable.
it = iter(test_loader)
sample_inputs, _ = next(it)
fixed_input = sample_inputs[:32, :, :, :]

# Number of pixels per image (height * width).
in_dim = fixed_input.shape[-1]*fixed_input.shape[-2]

# fixed_input went through the dataset transform, so apply denorm before
# saving it, exactly as the test cell does for its original images.
save_image(denorm(fixed_input), './CVAE/image_original.png')

### Define the model - CVAE

Complete the `encode`, `reparametrize` and `decode` methods of the `CVAE` class.

To find an effective architecture, you can experiment with the following:
- the number of convolutional layers
- the kernels' sizes
- the stride values
- the size of the latent space layer

In [None]:
# For the CVAE use the same architecture as in the CAE. 
# You also need to implement reparameterization (also included in tutorial3)

class CVAE(nn.Module):
    """Convolutional variational autoencoder for single-channel 28x28 images.

    The convolutional stack mirrors the CAE. The encoder ends in two linear
    heads that produce the mean and log-variance of the approximate posterior.
    The decoder starts with a linear layer that maps a latent code back to the
    8x2x2 feature map, so any ``latent_dim`` is supported. The output layer is
    a sigmoid, so reconstructions lie in [0, 1] as required by the binary
    cross-entropy term in ``loss_function_VAE``.
    """

    def __init__(self, latent_dim):
        """Create the encoder and decoder layers.

        Args:
            latent_dim: dimensionality of the latent code z.
        """
        super(CVAE, self).__init__()

        # encoder layers (same convolutional stack as the CAE)
        self.conv1 = nn.Conv2d(1, 16, 3, stride=3, padding=1)
        self.maxp1 = nn.MaxPool2d(2, stride=2)
        self.conv2 = nn.Conv2d(16, 8, 3, stride=2, padding=1)
        self.maxp2 = nn.MaxPool2d(2, stride=1)
        self.fc_mu = nn.Linear(32, latent_dim)
        self.fc_logvar = nn.Linear(32, latent_dim)

        # decoder layers
        self.fc_dec = nn.Linear(latent_dim, 32)
        self.deconv1 = nn.ConvTranspose2d(8, 16, 3, stride=2)
        self.deconv2 = nn.ConvTranspose2d(16, 8, 5, stride=3, padding=1)
        self.deconv3 = nn.ConvTranspose2d(8, 1, 2, stride=2, padding=1)

        # shared in-place ReLU
        self.relu = nn.ReLU(True)

    def encode(self, x):
        """Return the posterior parameters for a batch of images.

        Args:
            x: tensor of shape (b, 1, 28, 28).

        Returns:
            Tuple ``(mu, logvar)``, each of shape (b, latent_dim).
        """
        h = self.relu(self.conv1(x)) # b, 16, 10, 10
        h = self.maxp1(h) # b, 16, 5, 5
        h = self.relu(self.conv2(h)) # b, 8, 3, 3
        h = self.maxp2(h) # b, 8, 2, 2
        h = h.view(h.size(0), -1) # b, 8*2*2
        mu = self.fc_mu(h) # b, latent_dim
        logvar = self.fc_logvar(h) # b, latent_dim
        return mu, logvar

    def reparametrize(self, mu, logvar):
        """Draw z = mu + sigma * eps with eps ~ N(0, I).

        Sampling this way keeps z differentiable with respect to mu and
        logvar. In eval mode the mean is returned, so reconstructions are
        deterministic.
        """
        if not self.training:
            return mu
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + eps * std
        return z

    def decode(self, z):
        """Map latent codes to images.

        Args:
            z: tensor of shape (b, latent_dim).

        Returns:
            Tensor of shape (b, 1, 28, 28) with values in [0, 1] (sigmoid).
        """
        z = self.relu(self.fc_dec(z)) # b, 8*2*2
        z = z.view(z.size(0), 8, 2, 2) # b, 8, 2, 2
        z = self.relu(self.deconv1(z)) # b, 16, 5, 5
        z = self.relu(self.deconv2(z)) # b, 8, 15, 15
        z = torch.sigmoid(self.deconv3(z)) # b, 1, 28, 28
        return z
 
    def forward(self, x):
        """Return the reconstruction together with mu and logvar."""
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        x_hat = self.decode(z)
        return x_hat, mu, logvar

In [None]:
# Instantiate the model
# Same latent size as the CAE in Part 1, so the two models are directly
# comparable; tune as needed.
latent_dim = 32
cv_VAE = CVAE(latent_dim=latent_dim)

### Define Loss function

In [None]:
# Reconstruction + KL divergence losses summed over all elements and batch
def loss_function_VAE(recon_x, x, mu, logvar):
    """Return the summed BCE reconstruction loss plus the KL divergence.

    Args:
        recon_x: reconstruction with values in [0, 1], same shape as x.
        x: target images with values in [0, 1].
        mu: mean of the approximate posterior.
        logvar: log-variance of the approximate posterior.

    Returns:
        Scalar tensor: BCE + KL, summed over all elements and the batch.
    """
    # reduction='sum' replaces the deprecated size_average=False.
    BCE = F.binary_cross_entropy(recon_x, x, reduction='sum')
    # Closed-form KL divergence between N(mu, sigma^2) and N(0, I).
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD

### Initialize Model and print number of parameters

In [None]:
# Part 2 trains the variational model, so use cv_VAE here (cv_AE is the Part 1
# autoencoder).
model = cv_VAE.to(device)
params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("Total number of parameters is: {}".format(params))  # what would the number actually be?
print(model)

### Choose and initialize optimizer

In [None]:
# Adam over all model parameters, using the learning rate set with the other hyper-parameters.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

### Train

In [None]:
model.train()

for epoch in range(num_epochs):
    train_loss = 0
    for batch_idx, data in enumerate(train_loader):
        img, _ = data
        img = img.to(device)
        optimizer.zero_grad()
        # forward: CVAE.forward returns (reconstruction, mu, logvar)
        recon_batch, mu, logvar = model(img)
        # the VAE objective needs mu and logvar for its KL term
        loss = loss_function_VAE(recon_batch, img, mu, logvar)
        # backward
        loss.backward()
        train_loss += loss.item()
        optimizer.step()
    # print out losses and save reconstructions for every epoch
    # (the loss is summed per batch, so divide by the number of samples)
    print('epoch [{}/{}], loss:{:.4f}'.format(epoch + 1, num_epochs, train_loss / len(train_loader.dataset)))
    # The preview is only saved to disk, so do not build an autograd graph.
    with torch.no_grad():
        recon, _, _ = model(fixed_input.to(device))
    recon = denorm(recon)
    save_image(recon, './CVAE/reconstructed_epoch_{}.png'.format(epoch))

# save the model
torch.save(model.state_dict(), './CVAE/model.pth')

### Test

In [None]:
# load the model
model.load_state_dict(torch.load("./CVAE/model.pth"))
model.eval()
test_loss = 0
with torch.no_grad():
    for i, (img, _) in enumerate(test_loader):
        img = img.to(device)
        # CVAE.forward returns (reconstruction, mu, logvar)
        recon_batch, mu, logvar = model(img)
        # accumulate a Python float rather than a tensor
        test_loss += loss_function_VAE(recon_batch, img, mu, logvar).item()
    # After the loop, img and recon_batch hold the last batch and its
    # reconstruction; do not pass the reconstruction through the model again.
    img = denorm(img.cpu())
    # save the original last batch
    save_image(img, './CVAE/test_original.png')
    save_image(denorm(recon_batch.cpu()), './CVAE/reconstructed_test.png')
    # loss calculated over the whole test set
    test_loss /= len(test_loader.dataset)
    print('Test set loss: {:.4f}'.format(test_loss))

### Sample

Sample the latent space and use the `decode` method to generate results.

In [None]:
# Restore the trained weights and switch to evaluation mode.
state = torch.load("./CVAE/model.pth")
model.load_state_dict(state)
model.eval()

with torch.no_grad():
    # TODO: Investigate how to sample the latent space of the CVAE.
    # Draw 32 latent codes from a standard normal and decode them.
    z = torch.randn(32, latent_dim)
    z = z.to(device)
    sample = model.decode(z)
    sample_grid = denorm(sample).cpu()
    save_image(sample_grid, './CVAE/samples_' + '.png')

### Interpolations

In [None]:
# Define input tensors
x1 = sample_inputs[10, :, :, :]
x2 = sample_inputs[36, :, :, :]

# Create the latent representations. CVAE.encode returns (mu, logvar); the
# posterior mean is used as the latent code of each image.
mu1, _ = model.encode(x1.to(device)[None,:])
mu2, _ = model.encode(x2.to(device)[None,:])
z1 = mu1.cpu().detach().numpy()[0]
z2 = mu2.cpu().detach().numpy()[0]

# TODO: Find a way to create interpolated results from the CVAE.
# interpolate comes from utils.
# NOTE(review): the transpose assumes it returns one column per interpolation
# step — confirm against utils.
Z = interpolate(z1, z2, num=11)
# Follow the configured device rather than hard-coding CUDA, so the cell also
# runs on CPU-only machines.
Z = torch.FloatTensor(Z.T).to(device)
X_hat = model.decode(Z)

In [None]:
# Part 2 results belong in ./CVAE; writing to ./CAE would overwrite the Part 1
# interpolation image.
save_image(denorm(X_hat.cpu()), './CVAE/interpolation.png')