# Convolutional Autoencoder (CAE)
### Authors: Sergi Nogués and Gilles Salem

This is the second assignment for ACML at the MSc in
Artificial Intelligence at Maastricht University.

[Source code](https://github.com/serginogues/convolutional_autoencoder)

CIFAR-10 dataset: https://www.cs.toronto.edu/~kriz/cifar.html

In [None]:
import torch
from torchvision import datasets
from torch.utils.data.dataset import random_split
from torch.utils.data import DataLoader, ConcatDataset
from torch.optim import lr_scheduler
from torchvision import transforms
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

In [None]:
# Folder the CIFAR-10 archive is downloaded to / read from.
DATA_PATH = 'D:/UM/ACML/Assignments/'

# Fractions of the data assigned to the train / test / validation splits.
TRAIN_SIZE = 0.8
TEST_SIZE = 0.1
VALIDATION_SIZE = 0.1

# Mini-batch size used by the data loaders.
BATCH_SIZE = 64

# Use the GPU when CUDA is available; tensors and models still have to be moved to it explicitly.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("device: ", device)

## Reconstruction

**_Divide your dataset into training (80%), validation (10%) and test (10%). Normalize the data:_**

The CIFAR10 train and test datasets can be downloaded as follows.
Note that the _ToTensor()_ transform is applied to convert from PIL images
to Tensors and rescale from range $[0, 255]$ to range $[0, 1]$.

In [None]:
# Options shared by both official CIFAR-10 splits: download on demand and
# convert every PIL image to a tensor.
cifar_kwargs = dict(root=DATA_PATH, download=True, transform=transforms.ToTensor())

train_dataset = datasets.CIFAR10(train=True, **cifar_kwargs)
test_dataset = datasets.CIFAR10(train=False, **cifar_kwargs)

# Human-readable class names, indexed by integer label.
classes = train_dataset.classes

Now we can plot any image to check the previous code worked:

In [None]:
# Fetch one training sample and report its class name.
img, label = train_dataset[6150]
print("image label:", classes[label])

# The ToTensor transform has already run, so img is a torch.Tensor here.
print(img.shape)

# imshow expects H x W x C while the tensor is C x H x W: move the channel axis last.
hwc_view = img.permute(1, 2, 0)
plt.imshow(hwc_view)
plt.axis('off')
plt.show()

In order to divide our dataset into train, test and validation sets, we first need to
concatenate the original train and test splits and then split the result with the desired proportions.

In [None]:
# Merge the official train/test splits so that we can carve out our own partition.
concat_dataset = ConcatDataset([train_dataset, test_dataset])
len_ = len(concat_dataset)

# The validation split takes whatever is left after rounding the other two, so the
# three sizes always add up to len_ (random_split requires the lengths to sum to
# the dataset size).
n_train = round(len_ * TRAIN_SIZE)
n_test = round(len_ * TEST_SIZE)
n_valid = len_ - n_train - n_test
train_set, test_set, valid_set = random_split(concat_dataset, [n_train, n_test, n_valid])

print("")
print("# samples train set =", len(train_set))
print("# samples test set =", len(test_set))
print("# samples validation set =", len(valid_set))
total_samp = len(train_set) + len(test_set) + len(valid_set)
print("Sample distribution: " + str(round((len(train_set) / total_samp) * 100))
      + "% train, " + str(round((len(test_set) / total_samp) * 100)) + "% test, "
      + str(round((len(valid_set) / total_samp) * 100)) + "% validation")

# Only the training loader shuffles and drops the last incomplete batch.
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, num_workers=0, shuffle=True, drop_last=True)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, num_workers=0, shuffle=False)
valid_loader = DataLoader(valid_set, batch_size=BATCH_SIZE, num_workers=0, shuffle=False)

# Sanity check: display one image of the first training batch.
# next(iter(...)) is the Python 3 iterator protocol; the old `.next()` method is
# not part of it and is missing from newer PyTorch DataLoader iterators.
Xs, Ys = next(iter(train_loader))
images = Xs.numpy()
plt.imshow(np.transpose(images[30], (1, 2, 0)))  # C x H x W -> H x W x C
plt.axis('off')
plt.show()

**_Implement the autoencoder network specified above. Run the training for at least 10 epochs, and plot the
evolution of the error with epochs._**

We first define our simple convolutional autoencoder class.

In [None]:
class CAE(nn.Module):
    """Baseline convolutional autoencoder.

    Encoder: three 3x3 convolutions with a 2x2 stride-1 max-pool after the first
    two. Decoder: two 4x4 transposed convolutions, the last one followed by a
    sigmoid.
    """

    def __init__(self):
        super().__init__()

        kernel, stride, padding = 3, 1, 1
        channels = [8, 12, 16, 12]
        conv_kw = dict(kernel_size=kernel, padding=padding, stride=stride)
        deconv_kw = dict(kernel_size=kernel + 1, padding=padding, stride=stride)

        # Encoder
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=channels[0], **conv_kw)
        self.conv2 = nn.Conv2d(in_channels=channels[0], out_channels=channels[1], **conv_kw)
        self.conv3 = nn.Conv2d(in_channels=channels[1], out_channels=channels[2], **conv_kw)
        self.maxpool = nn.MaxPool2d(kernel_size=kernel - 1, stride=stride, padding=0)

        # Decoder
        self.t_conv1 = nn.ConvTranspose2d(in_channels=channels[2], out_channels=channels[3], **deconv_kw)
        self.t_conv2 = nn.ConvTranspose2d(in_channels=channels[3], out_channels=3, **deconv_kw)

        # Counter that stays at 0 until the latent shape has been printed once.
        self.print_latent_shape = 0

    def forward(self, x):
        """Return the reconstruction of ``x``; the latent shape is printed on the first call only."""
        # encoder
        hidden = self.maxpool(F.relu(self.conv1(x)))
        hidden = self.maxpool(F.relu(self.conv2(hidden)))
        latent = F.relu(self.conv3(hidden))

        if self.print_latent_shape == 0:
            print("Latent space shape: " + str(latent.shape))
            self.print_latent_shape += 1

        # decoder
        hidden = F.relu(self.t_conv1(latent))
        return F.sigmoid(self.t_conv2(hidden))

Then define the training hyperparameters and train the model for 10 epochs and batch size 64.

In [None]:
# Checkpoint files, one per architecture tried in this notebook.
SAVE_PATH, SAVE_PATH2, SAVE_PATH3, SAVE_PATH4 = (
    'models/cae.pth',
    'models/cae2.pth',
    'models/cae3.pth',
    'models/cae4.pth',
)

In [None]:
# Training hyperparameters: number of epochs and learning rate.
EPOCHS, LR = 10, 0.01

# Baseline model on the selected device, with binary cross-entropy as the
# reconstruction loss and Adam as optimizer.
model = CAE()
model.to(device)
criterion = nn.BCELoss()  # loss function
optimizer = torch.optim.Adam(params=model.parameters(), lr=LR)

def train(model, save=SAVE_PATH):
    """Train ``model`` to reconstruct its input and checkpoint it after every epoch.

    Uses the module-level ``train_loader``, ``criterion``, ``optimizer``,
    ``device`` and ``EPOCHS``. The global optimizer is the one that gets stepped,
    so it must have been built from this model's parameters.

    Args:
        model: autoencoder to train; batches are moved to ``device`` before
            being fed to it.
        save: path the model ``state_dict`` is written to at the end of each epoch.
    """
    import os  # local import: only needed to create the checkpoint folder

    save_dir = os.path.dirname(save)
    if save_dir:
        # torch.save does not create missing parent directories.
        os.makedirs(save_dir, exist_ok=True)

    loss_history = []
    for epoch in range(EPOCHS):
        loss_sum = 0.0
        n_batches = 0
        for images, _ in train_loader:
            images = images.to(device)

            # Before the backward pass, set gradients to zero
            optimizer.zero_grad()

            # Call the module itself rather than .forward() so hooks keep working.
            output = model(images)

            # The input image is its own reconstruction target.
            loss = criterion(output, images)
            loss.backward()
            optimizer.step()

            loss_sum += loss.item()
            n_batches += 1

        # Mean over the number of batches actually seen (not the last batch index).
        loss_epoch = loss_sum / n_batches if n_batches else float('nan')
        loss_history.append(loss_epoch)
        print("Epoch " + str(epoch) + ", Loss = " + str(loss_epoch))

        # SAVE THE MODEL every EPOCH
        torch.save(model.state_dict(), save)

    print("Training finished")

    # Plot the training loss curve
    plt.plot(loss_history)
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.title('Training Loss per epoch')
    plt.show()

# Train the baseline autoencoder; `save` falls back to its default, SAVE_PATH.
train(model)

**_Report also the test error._**

In [None]:
def test(trained_model):
    """Print the mean per-batch reconstruction loss on the module-level ``test_loader``.

    Uses the module-level ``criterion``. Each batch is moved to the device of the
    model's parameters, so the function works for models on the CPU or the GPU.

    Args:
        trained_model: autoencoder to evaluate.
    """
    first_param = next(trained_model.parameters(), None)
    with torch.no_grad():
        loss_sum = 0.0
        n_batches = 0
        for images, _ in test_loader:
            if first_param is not None:
                images = images.to(first_param.device)
            output = trained_model(images)
            loss_sum += criterion(output, images).item()
            n_batches += 1
    # An empty loader yields nan instead of a ZeroDivisionError.
    loss_final = loss_sum / n_batches if n_batches else float('nan')
    print("Test Loss = ", loss_final)

# Rebuild the baseline architecture, restore the saved weights, switch to
# evaluation mode and report the test loss.
cae1 = CAE()
cae1_state = torch.load(SAVE_PATH)
cae1.load_state_dict(cae1_state)
cae1.eval()
test(cae1)

We now see our trained model's performance with these 5 images:

In [None]:
# Take the first test batch; next(iter(...)) replaces the Python-2 style `.next()`
# call, which newer PyTorch DataLoader iterators no longer provide.
images, labels = next(iter(test_loader))
images_plot = images.numpy()

# Show the first five images of the batch side by side.
ff, axarr = plt.subplots(1, 5, constrained_layout=True)
for i in range(5):
    axarr[i].imshow(np.transpose(images_plot[i], (1, 2, 0)))  # C x H x W -> H x W x C
    axarr[i].axis('off')
plt.tight_layout()
plt.show()

After 10 epochs of training the reconstruction is good:

In [None]:
# Reconstruct the sampled test batch with the baseline model; the result is
# detached from the autograd graph before the NumPy conversion.
output = cae1.forward(images).detach().numpy()

ff2, axarr = plt.subplots(1, 5)
for col, axis in enumerate(axarr):
    # C x H x W -> H x W x C for imshow
    axis.imshow(output[col].transpose(1, 2, 0))
    axis.axis('off')
plt.tight_layout()
plt.show()

**_What is the size of the latent space representation of the above network?_**

$W' = \frac{W - K + 2P}{S} + 1$

Encoder (stride 1 everywhere, so $W' = W - K + 2P + 1$):
- out_conv1 = batch_size x 8 x 32 x 32, since $32 - 3 + 2 \cdot 1 + 1 = 32$
- out_pool1 = batch_size x 8 x 31 x 31, since $32 - 2 + 0 + 1 = 31$
- out_conv2 = batch_size x 12 x 31 x 31
- out_pool2 = batch_size x 12 x 30 x 30
- out_conv3 = batch_size x 16 x 30 x 30

Latent space size = 16 x 30 x 30 = 14400

**_Try other architectures (e.g. fewer intermediate layers, different number of channels, filter sizes or stride and
padding configurations) to answer questions such as: What is the impact of those in the reconstruction error
after training? Is there an obvious correlation between the size of the latent space representation and the error?_**

We first try defining a model such that the latent space is smaller.

In [None]:
class CAE2(nn.Module):
    """Autoencoder with a smaller latent space than the baseline.

    The first convolution uses stride 2 and all encoder kernels are 4x4; the
    decoder mirrors this with three 6x6 transposed convolutions, the first one
    with stride 2, and ends with a sigmoid.
    """

    def __init__(self):
        super(CAE2, self).__init__()

        channels=[5, 8, 10, 8, 5]
        padding = 1
        stride = 1
        kernel = 3

        # Encoder
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=channels[0], kernel_size=(4, 4), padding=padding, stride=(2,2))
        self.conv2 = nn.Conv2d(in_channels=channels[0], out_channels=channels[1], kernel_size=(4, 4), padding=padding, stride=stride)
        self.conv3 = nn.Conv2d(in_channels=channels[1], out_channels=channels[2], kernel_size=(4, 4), padding=padding, stride=stride)
        self.maxpool = nn.MaxPool2d(kernel_size=kernel-1, stride=stride, padding=0)

        # Decoder
        self.t_conv1 = nn.ConvTranspose2d(in_channels=channels[2], out_channels=channels[3], kernel_size=(6, 6), padding=padding, stride=(2, 2))
        self.t_conv2 = nn.ConvTranspose2d(in_channels=channels[3], out_channels=channels[4], kernel_size=(6, 6), padding=padding, stride=stride)
        self.t_conv3 = nn.ConvTranspose2d(in_channels=channels[4], out_channels=3, kernel_size=(6, 6), padding=padding, stride=stride)

        # Counter that stays at 0 until the latent shape has been printed once.
        self.print_latent_shape = 0

    def forward(self, x):
        """Return the reconstruction of ``x``; the latent shape is printed on the first call only."""
        # encoder
        x = F.relu(self.conv1(x))
        x = self.maxpool(x)
        # Feed the pooled activations forward (the undefined name `x2` raised NameError here).
        x = F.relu(self.conv2(x))
        x = self.maxpool(x)
        x = F.relu(self.conv3(x))
        if self.print_latent_shape == 0:
            print("Latent space shape: " + str(x.shape))
            self.print_latent_shape +=1

        # decoder
        x = F.relu(self.t_conv1(x))
        x = F.relu(self.t_conv2(x))
        y = F.sigmoid(self.t_conv3(x))
        return y


In [None]:
# Fresh model, loss and optimizer for the smaller architecture; train() reads
# `criterion` and `optimizer` from module scope.
model = CAE2()
model.to(device)
criterion = nn.BCELoss()  # loss function
optimizer = torch.optim.Adam(params=model.parameters(), lr=LR)
train(model, save=SAVE_PATH2)

The latent space is now smaller than before (10 x 12 x 12), and the training error is higher.

In [None]:
# Restore the trained CAE2 weights into a fresh instance and score it on the test split.
model = CAE2()
cae2_state = torch.load(SAVE_PATH2)
model.load_state_dict(cae2_state)
model.eval()
test(model)

# Sample outputs
output = model.forward(images).detach().numpy()

ff2, axarr = plt.subplots(1, 5)
for col, axis in enumerate(axarr):
    # C x H x W -> H x W x C for imshow
    axis.imshow(output[col].transpose(1, 2, 0))
    axis.axis('off')
plt.tight_layout()
plt.show()

As expected the reconstruction test error is higher and the reconstructed images are worse.

Now let's try with more channels and more layers to improve the results.

In [None]:
class CAE3(nn.Module):
    """Wider autoencoder: 16/32/64 encoder channels and a three-layer decoder.

    The encoder uses 3x3 convolutions with a 2x2 stride-1 max-pool after the
    first two; the decoder uses one 3x3 and two 4x4 transposed convolutions and
    ends with a sigmoid.
    """

    def __init__(self):
        super().__init__()

        channels = [16, 32, 64, 32, 16]
        kernel, stride, padding = 3, 1, 1
        shared = dict(padding=padding, stride=stride)

        # Encoder
        self.conv1 = nn.Conv2d(3, channels[0], kernel_size=(3, 3), **shared)
        self.conv2 = nn.Conv2d(channels[0], channels[1], kernel_size=(3, 3), **shared)
        self.conv3 = nn.Conv2d(channels[1], channels[2], kernel_size=(3, 3), **shared)
        self.maxpool = nn.MaxPool2d(kernel_size=kernel - 1, stride=stride, padding=0)

        # Decoder
        self.t_conv1 = nn.ConvTranspose2d(channels[2], channels[3], kernel_size=(3, 3), **shared)
        self.t_conv2 = nn.ConvTranspose2d(channels[3], channels[4], kernel_size=(4, 4), **shared)
        self.t_conv3 = nn.ConvTranspose2d(channels[4], 3, kernel_size=(4, 4), **shared)

        # Counter that stays at 0 until the latent shape has been printed once.
        self.print_latent_shape = 0

    def forward(self, x):
        """Return the reconstruction of ``x``; the latent shape is printed on the first call only."""
        # encoder: the first two convolutions are each followed by ReLU and max-pooling
        for conv in (self.conv1, self.conv2):
            x = self.maxpool(F.relu(conv(x)))
        x = F.relu(self.conv3(x))

        if self.print_latent_shape == 0:
            print("Latent space shape: " + str(x.shape))
            self.print_latent_shape += 1

        # decoder: ReLU after the first two layers, sigmoid on the output layer
        for t_conv in (self.t_conv1, self.t_conv2):
            x = F.relu(t_conv(x))
        return F.sigmoid(self.t_conv3(x))


In [None]:
# Fresh model, loss and optimizer for the wider architecture; train() reads
# `criterion` and `optimizer` from module scope.
model = CAE3()
model.to(device)
criterion = nn.BCELoss()  # loss function
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01)
train(model, save=SAVE_PATH3)

The latent space is now bigger than before (64 x 30 x 30), and the training error is lower.

In [None]:
# Restore the trained CAE3 weights into a fresh instance and score it on the test split.
cae3 = CAE3()
cae3_state = torch.load(SAVE_PATH3)
cae3.load_state_dict(cae3_state)
cae3.eval()
test(cae3)

# Sample outputs
output = cae3.forward(images).detach().numpy()

ff2, axarr = plt.subplots(1, 5)
for col, axis in enumerate(axarr):
    # C x H x W -> H x W x C for imshow
    axis.imshow(output[col].transpose(1, 2, 0))
    axis.axis('off')
plt.tight_layout()
plt.show()

As expected the reconstruction test error is lower and the reconstructed images are better.
Note that the colors are better reconstructed now.

## Colorization

**_Adapt your network from the previous part such that it learns to reconstruct colors by feeding in grayscale
images but predicting all RGB channels. As a starting point, use the hyperparameters (including the network
architecture) that you identified to yield the best performance in Exercise 3.2._**


**_Report on your results and reason about potential shortcomings of your network.
What aspects of the architecture/hyperparameters/optimization could be improved upon to fit the model more adequately to this
application? Try out some ideas._**

Let's first see what YUV channels look like, how to access Chrominance
and how to reconstruct back the RGB color by combining the grayscale image with the two Chrominance channels.

In [None]:
import cv2

# next(iter(...)) replaces the Python-2 style `.next()` call, which newer
# PyTorch DataLoader iterators no longer provide.
images, labels = next(iter(test_loader))
img = images[50].numpy()
img = np.transpose(img, (1, 2, 0))  # C x H x W -> H x W x C, as OpenCV expects

# NOTE(review): img holds RGB data, yet the BGR flag is used, so OpenCV treats the
# red channel as blue and vice versa when computing Y/U/V. The flag is kept because
# the later cells invert the conversion with COLOR_YUV2BGR, which round-trips
# consistently; switch all of them together if true RGB-based YUV is wanted.
yuv_image = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
gray_image = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

# (title, image, colormap) for each of the five panels, left to right.
panels = [
    ('Original', img, None),
    ('Grayscale', gray_image, "gray"),
    ('Luminance', yuv_image[:, :, 0], "gray"),
    ('Chrominance1', yuv_image[:, :, 1], "gray"),
    ('Chrominance2', yuv_image[:, :, 2], "gray"),
]
for position, (title, picture, cmap) in enumerate(panels, start=1):
    plt.subplot(1, 5, position)
    plt.imshow(picture, cmap=cmap)
    plt.title(title)
    plt.axis('off')
plt.show()

By combining the 2 chrominance channels with the luminance channel we can reconstruct
the original RGB image.

In [None]:
# Separate the YUV image into its luminance plane and its two chrominance planes.
luminance_image = yuv_image[:, :, 0]
chr_image = yuv_image[:, :, 1:]

# Put the three planes back together and undo the colour conversion.
u_plane, v_plane = chr_image[:, :, 0], chr_image[:, :, 1]
yuv_reconstructed = cv2.merge((luminance_image, u_plane, v_plane))

rgb_image = cv2.cvtColor(yuv_reconstructed, cv2.COLOR_YUV2BGR)
plt.imshow(rgb_image)
plt.show()

Now the goal is to predict the two chrominance channels using the single luminance channel as input.

In [None]:
class CAE4(nn.Module):
    """Colorization network: one input channel in, two output channels out.

    Same layer layout as CAE3 (3x3 convolutions with two stride-1 max-pools, then
    one 3x3 and two 4x4 transposed convolutions with a sigmoid output), but with
    wider decoder layers.
    """

    def __init__(self):
        super().__init__()

        kernel = 3
        stride = 1
        padding = 1
        enc_channels = (1, 16, 32, 64)
        dec_channels = (64, 64, 32, 2)

        # Encoder
        self.conv1 = nn.Conv2d(in_channels=enc_channels[0], out_channels=enc_channels[1], kernel_size=(3, 3), padding=padding, stride=stride)
        self.conv2 = nn.Conv2d(in_channels=enc_channels[1], out_channels=enc_channels[2], kernel_size=(3, 3), padding=padding, stride=stride)
        self.conv3 = nn.Conv2d(in_channels=enc_channels[2], out_channels=enc_channels[3], kernel_size=(3, 3), padding=padding, stride=stride)
        self.maxpool = nn.MaxPool2d(kernel_size=kernel - 1, stride=stride, padding=0)

        # Decoder
        self.t_conv1 = nn.ConvTranspose2d(in_channels=dec_channels[0], out_channels=dec_channels[1], kernel_size=(3, 3), padding=padding, stride=stride)
        self.t_conv2 = nn.ConvTranspose2d(in_channels=dec_channels[1], out_channels=dec_channels[2], kernel_size=(4, 4), padding=padding, stride=stride)
        self.t_conv3 = nn.ConvTranspose2d(in_channels=dec_channels[2], out_channels=dec_channels[3], kernel_size=(4, 4), padding=padding, stride=stride)

        # Counter that stays at 0 until the latent shape has been printed once.
        self.print_latent_shape = 0

    def forward(self, x):
        """Return the two-channel prediction for ``x``; the latent shape is printed on the first call only."""
        # encoder
        pooled1 = self.maxpool(F.relu(self.conv1(x)))
        pooled2 = self.maxpool(F.relu(self.conv2(pooled1)))
        latent = F.relu(self.conv3(pooled2))

        if self.print_latent_shape == 0:
            print("Latent space shape: " + str(latent.shape))
            self.print_latent_shape += 1

        # decoder
        up1 = F.relu(self.t_conv1(latent))
        up2 = F.relu(self.t_conv2(up1))
        return F.sigmoid(self.t_conv3(up2))

We will implement learning rate decay and early stopping.

In [206]:
# Colorization model with its own loss and optimizer, plus an exponential
# learning-rate schedule (gamma=0.9).
model_colorization = CAE4()
model_colorization.to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(params=model_colorization.parameters(), lr=LR)
scheduler = lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.9)

In [207]:
EPOCHS = 70
def train4(model, save):
    """Train the colorization ``model`` to predict chrominance from luminance.

    Every RGB batch from the module-level ``train_loader`` is converted to YUV
    with OpenCV; channel 0 is the network input and channels 1-2 are the target.
    Uses the module-level ``criterion``, ``optimizer``, ``scheduler``, ``device``
    and ``EPOCHS``. The learning rate is decayed once per epoch, the model is
    checkpointed whenever the epoch loss reaches a new minimum, and training stops
    early when the loss rises or has barely improved after 15 epochs.

    Args:
        model: network taking N x 1 x H x W input and returning N x 2 x H x W.
        save: path the best ``state_dict`` is written to.
    """
    import os  # local import: only needed to create the checkpoint folder

    save_dir = os.path.dirname(save)
    if save_dir:
        # torch.save does not create missing parent directories.
        os.makedirs(save_dir, exist_ok=True)

    loss_history = []
    best_loss = float('inf')
    for epoch in range(EPOCHS):
        # TRAIN MODEL
        loss_sum = 0.0
        n_batches = 0
        for imgs, _ in train_loader:
            # rgb to yuv, one image at a time (OpenCV works on H x W x C arrays).
            # NOTE(review): the tensors hold RGB data but the BGR flag is used; kept
            # because the evaluation cells use the same BGR <-> YUV pair.
            images_yuv = np.stack([
                cv2.cvtColor(np.transpose(single.numpy(), (1, 2, 0)), cv2.COLOR_BGR2YUV)
                for single in imgs
            ])

            # N x H x W x C -> N x C x H x W. This must be an axis transpose: a plain
            # reshape reinterprets the memory and mixes pixels across channels.
            yuv_nchw = np.ascontiguousarray(np.transpose(images_yuv, (0, 3, 1, 2)))
            yuv_tensor = torch.from_numpy(yuv_nchw).float().to(device)
            luminance = yuv_tensor[:, :1]  # network input, N x 1 x H x W
            target = yuv_tensor[:, 1:]     # chrominance, N x 2 x H x W

            # Before the backward pass, set gradients to zero
            optimizer.zero_grad()

            # predict chrominance
            output = model(luminance)

            # compute loss and update the weights
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            loss_sum += loss.item()
            n_batches += 1

        scheduler.step()

        # Mean over the number of batches actually seen (not the last batch index).
        loss_epoch = loss_sum / n_batches if n_batches else float('nan')
        loss_history.append(loss_epoch)
        print("Epoch " + str(epoch) + ", Loss = " + str(loss_epoch))

        # Checkpoint on every new best loss. Comparing against loss_history[-1]
        # would compare the value with itself and never save anything.
        if loss_epoch < best_loss:
            best_loss = loss_epoch
            print("model saved")
            torch.save(model.state_dict(), save)

        # Early stop: the loss went up w.r.t. the previous epoch, or after 15 epochs
        # the total gain since epoch 1 is still below 0.02.
        got_worse = epoch > 2 and loss_epoch > loss_history[-2]
        stalled = epoch > 15 and (loss_history[1] - loss_epoch) < 0.02
        if got_worse or stalled:
            break

    print("Training finished")

    # Plot the training loss curve
    plt.plot(loss_history)
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.title('Training Loss per epoch')
    plt.show()

In [211]:
# Train the colorization network; SAVE_PATH4 is the checkpoint path handed to train4.
train4(model_colorization, SAVE_PATH4)

KeyboardInterrupt: 

This task is much more difficult for the current model as can be seen by the loss evolution per epoch.

In [None]:
# Take the first test batch; next(iter(...)) replaces the Python-2 style `.next()`
# call, which newer PyTorch DataLoader iterators no longer provide.
images, _ = next(iter(test_loader))
images_plot = images.numpy()

# Pick one image, move channels last for OpenCV and convert it to YUV.
img = images[50].numpy()
img = np.transpose(img, (1, 2, 0))
yuv_image = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)

In [None]:
# Restore the colorization weights into a fresh CAE4 and switch to evaluation mode.
cae4 = CAE4()
cae4_state = torch.load(SAVE_PATH4)
cae4.load_state_dict(cae4_state)
cae4.eval()

# Channel 0 of the YUV image, shaped as a 1 x 1 x 32 x 32 batch, is the network input.
y_plane = yuv_image[:, :, 0]
luminance_image = torch.tensor(y_plane.reshape(1, 1, 32, 32))
chr_image = cae4.forward(luminance_image)

In [None]:
# The prediction is 1 x 2 x H x W: drop the batch axis and move the channel axis
# last. A plain reshape to (32, 32, 2) would interleave the two planes incorrectly.
chr_image = np.ascontiguousarray(np.transpose(chr_image.detach().numpy()[0], (1, 2, 0)))

# Recombine the true luminance with the predicted chrominance and convert back.
yuv_reconstructed = cv2.merge((yuv_image[:,:,0], chr_image[:,:,0], chr_image[:,:,1]))
reconstructed = cv2.cvtColor(yuv_reconstructed, cv2.COLOR_YUV2BGR)

plt.subplot(1,3,1)
plt.imshow(img)
plt.title('Original')
plt.subplot(1,3,2)
plt.imshow(yuv_image[:,:,0],cmap="gray")
plt.title('Grayscale')
plt.subplot(1, 3, 3)
plt.imshow(reconstructed)
plt.title('Colorized')
plt.show()

It is clear that the model needs improvement. Better results could be achieved by adding more convolutional layers such that
the encoded latent space has bigger size. This would require more computational power of course.

