<a href="https://colab.research.google.com/github/SzymonNowakowski/Machine-Learning-2024/blob/master/Lab11-autoencoders.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lab 11 - Autoencoders
### Author: Szymon Nowakowski


# Introduction
--------------

Autoencoders can be thought of as nonlinear extensions of PCA. In this class, we’ll train an autoencoder on the MNIST dataset and compare its encoded representation to the PCA space we constructed earlier (remember our very first class?). This comparison will help us see whether the autoencoder captures the structure of the data more effectively.

Next, we’ll put the trained autoencoder to practical use: autoencoders are well suited to anomaly detection (identifying outliers) and to image denoising.

You’ll also notice that throughout this class we treat the images in a class-agnostic, unsupervised manner, focusing on the structure of the data itself rather than on labels.

# Deterministic output
--------------------------------



In [1]:
import torch

SEED = 0

torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
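
What the seeding buys us can be checked directly. The following is a minimal sketch, assuming only that `torch` is installed; the helper name `draw_after_seed` is made up for this illustration. Re-seeding before drawing random numbers reproduces the same values, which is what makes weight initialization repeatable from run to run.

```python
import torch

def draw_after_seed(seed: int, n: int = 5) -> torch.Tensor:
    """Seed PyTorch's global generator, then draw n uniform random numbers on the CPU."""
    torch.manual_seed(seed)
    return torch.rand(n)

first = draw_after_seed(0)
second = draw_after_seed(0)   # same seed -> same numbers
other = draw_after_seed(1)    # different seed -> different numbers (almost surely)

print(torch.equal(first, second))
print(torch.equal(first, other))
```

Bit-exact reproducibility on a GPU may need further settings, for example `torch.use_deterministic_algorithms(True)`, because some CUDA kernels are not deterministic by default.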


# Reading MNIST Dataset
----------------------------------

In [2]:
import torch
import torchvision
from matplotlib import pyplot

transform = torchvision.transforms.ToTensor() #Converts a PIL Image or numpy.ndarray (H x W x C) in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0]

     # note - we are NOT normalizing pixels, as we want to keep 0-1 range

trainset = torchvision.datasets.MNIST(root='./data',
                                      train=True,
                                      download=True,
                                      transform=transform)

trainloader = torch.utils.data.DataLoader(trainset,
                                          batch_size=2048,
                                          shuffle=True)   # reshuffle every epoch so batches differ between epochs

testset = torchvision.datasets.MNIST(root='./data',
                                     train=False,
                                     download=True,
                                     transform=transform)

testloader = torch.utils.data.DataLoader(testset,
                                         batch_size=1,
                                         shuffle=False)




# Tensor Sizes
-------------------

Recall:
- Batched labels are of order one. The first (and only) index is a sample index within a batch. **The labels, however, are of no direct interest to us during this class**.
- Image batches have order 4. The first index is the sample index within a batch; the second index has size 1, so it is always 0.
  - This second index is the channel number, inserted by the `ToTensor()` transformation.
  - We retain it because it is the layout that image layers such as convolutions explicitly require (the MLP encoder below simply flattens it away). RGB images have 3 channels; grayscale images such as MNIST have only one.
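
The shapes listed above can be sketched with random stand-in tensors, so no dataset is needed. The names `images` and `labels` and the batch size of 8 are assumptions chosen for the illustration only.

```python
import torch

BATCH = 8  # arbitrary small batch size for the illustration

# Random stand-ins shaped like one MNIST batch from the loaders above.
images = torch.rand(BATCH, 1, 28, 28)        # (sample, channel, height, width)
labels = torch.randint(0, 10, (BATCH,))      # (sample,)

print(images.dim(), tuple(images.shape))
print(labels.dim(), tuple(labels.shape))

# The channel axis has size 1, so its only valid index is 0.
first_image = images[0, 0]                   # 28 x 28 pixel grid of sample 0
print(tuple(first_image.shape))

# Flattening keeps the batch axis and merges the rest: 1 * 28 * 28 = 784.
flat = images.flatten(start_dim=1)
print(tuple(flat.shape))
```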


# Encoder and Decoder Networks
-----------------

An autoencoder is an encoder followed by a decoder, as in this diagram: ![Autoencoder](https://github.com/SzymonNowakowski/Machine-Learning-2024/raw/master/autoencoder_diagram.png)



## Encoder

In [3]:
import torch.nn as nn
import torch.nn.functional as F

class EncoderMLP(nn.Module):
    def __init__(self, bottleneck_dimensionality):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Flatten(),                           # 1x28x28 -> 784
            nn.Linear(784, 256),
            nn.ReLU(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, bottleneck_dimensionality)  # -> bottleneck
        )

    def forward(self, x):
        return self.encoder(x)


## Decoder

In [4]:

class DecoderMLP(nn.Module):
    def __init__(self, bottleneck_dimensionality):
        super().__init__()
        self.decoder = nn.Sequential(
            nn.Linear(bottleneck_dimensionality, 64),
            nn.ReLU(),
            nn.Linear(64, 256),
            nn.ReLU(),
            nn.Linear(256, 784),
            nn.Sigmoid(),             # Output in [0, 1]
        )

    def forward(self, x):
        x = self.decoder(x)
        return x.view(-1, 1, 28, 28)  # Reshape to image. -1 means "infer the proper dimensionality automatically"
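
How `-1` is resolved in `view` can be seen in isolation. A short sketch on zero-filled tensors (the batch size of 5 is arbitrary): PyTorch picks the one value for `-1` that makes the total element count match, and raises an error when no such value exists.

```python
import torch

flat_batch = torch.zeros(5, 784)              # 5 hypothetical decoder outputs
as_images = flat_batch.view(-1, 1, 28, 28)    # -1 resolves to 5, since 5*784 = 5*(1*28*28)
print(tuple(as_images.shape))

# If the element count is not a multiple of 1*28*28, no valid value for -1 exists.
try:
    torch.zeros(5, 783).view(-1, 1, 28, 28)
    view_failed = False
except RuntimeError:
    view_failed = True
print(view_failed)
```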


## Autoencoder


In [5]:
class Autoencoder(nn.Module):
    def __init__(self, bottleneck_dimensionality):
        super().__init__()
        self.encoder = EncoderMLP(bottleneck_dimensionality)
        self.decoder = DecoderMLP(bottleneck_dimensionality)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
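
Before training, it is worth checking that shapes flow through the network as intended. The sketch below rebuilds the same layer sizes compactly so that it runs on its own; the helper `build_mlp_autoencoder` and the use of `nn.ModuleDict` and `nn.Unflatten` are choices made for this illustration, not part of the classes above.

```python
import torch
import torch.nn as nn

def build_mlp_autoencoder(bottleneck: int) -> nn.ModuleDict:
    """Same layer sizes as the classes above, rebuilt compactly for a standalone check."""
    encoder = nn.Sequential(
        nn.Flatten(),                       # 1x28x28 -> 784
        nn.Linear(784, 256), nn.ReLU(),
        nn.Linear(256, 64), nn.ReLU(),
        nn.Linear(64, bottleneck),          # -> bottleneck
    )
    decoder = nn.Sequential(
        nn.Linear(bottleneck, 64), nn.ReLU(),
        nn.Linear(64, 256), nn.ReLU(),
        nn.Linear(256, 784), nn.Sigmoid(),  # outputs in [0, 1]
        nn.Unflatten(1, (1, 28, 28)),       # 784 -> 1x28x28
    )
    return nn.ModuleDict({"encoder": encoder, "decoder": decoder})

model = build_mlp_autoencoder(bottleneck=2)
batch = torch.rand(4, 1, 28, 28)            # random stand-in for 4 images

with torch.no_grad():
    codes = model["encoder"](batch)
    recon = model["decoder"](codes)

print(tuple(codes.shape))                   # one 2D code per image
print(tuple(recon.shape))                   # same shape as the input batch
print(float(recon.min()) >= 0.0 and float(recon.max()) <= 1.0)

n_params = sum(p.numel() for p in model.parameters())
print(n_params)
```

With a 2D bottleneck, each image's 784 pixel values are squeezed into just 2 numbers before being decoded, which is why the reconstructions can only capture coarse structure.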

# Training Loop
----------------------

Beware: Training takes around 10 minutes on a GPU, and closer to an hour on a CPU. I had the joy of discovering that firsthand when Colab temporarily revoked my GPU access.

In [None]:
import time
import torch
import torch.nn.functional as F

# Start timing
start_time = time.time()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Working on {device}")

net = Autoencoder(bottleneck_dimensionality=2).to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)

EPOCHS = 128
train_loss_history = []
test_loss_history = []

for epoch in range(EPOCHS):
    # === Train ===
    net.train()
    epoch_loss_sum = 0.0
    epoch_sample_count = 0

    for batch_inputs, _ in trainloader:
        batch_inputs = batch_inputs.to(device)

        optimizer.zero_grad()
        reconstructed = net(batch_inputs)

        loss = F.mse_loss(reconstructed, batch_inputs, reduction='mean')
        loss.backward()
        optimizer.step()

        batch_size = batch_inputs.size(0)
        epoch_loss_sum += loss.item() * batch_size
        epoch_sample_count += batch_size

    avg_train_loss = epoch_loss_sum / epoch_sample_count
    train_loss_history.append(avg_train_loss)

    # === Evaluate on test set ===
    net.eval()
    test_loss_sum = 0.0
    test_sample_count = 0
    with torch.no_grad():
        for batch_inputs, _ in testloader:
            batch_inputs = batch_inputs.to(device)
            reconstructed = net(batch_inputs)
            loss = F.mse_loss(reconstructed, batch_inputs, reduction='mean')

            batch_size = batch_inputs.size(0)
            test_loss_sum += loss.item() * batch_size
            test_sample_count += batch_size

    avg_test_loss = test_loss_sum / test_sample_count
    test_loss_history.append(avg_test_loss)

    if epoch % 16 == 0:
        print(f"Epoch {epoch:03d} | Train Loss (averaged over the epoch): {avg_train_loss:.6f} | Test Loss (after the epoch): {avg_test_loss:.6f}")

# End timing
end_time = time.time()
print(f"Elapsed time: {end_time - start_time:.2f} seconds")


Working on cpu
Epoch 000 | Train Loss (averaged over the epoch): 0.126929 | Test Loss (after the epoch): 0.073962
Epoch 016 | Train Loss (averaged over the epoch): 0.048716 | Test Loss (after the epoch): 0.048246
Epoch 032 | Train Loss (averaged over the epoch): 0.043901 | Test Loss (after the epoch): 0.043801
Epoch 048 | Train Loss (averaged over the epoch): 0.041402 | Test Loss (after the epoch): 0.041567
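
A note on why the loops multiply `loss.item()` by `batch_size`: with 60,000 training images and a batch size of 2048, the last batch holds only 608 samples, so a plain average of per-batch means would give that batch too much weight. The sketch below uses made-up tensors (10 samples split into batches of 4, 4, and 2) to show that the size-weighted average matches the loss computed over all samples at once, while the unweighted one generally does not.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
predictions = torch.rand(10, 4)   # 10 hypothetical samples, 4 values each
targets = torch.rand(10, 4)

# Split into batches of size 4, 4 and 2: the last one is smaller.
batches = list(zip(predictions.split(4), targets.split(4)))

weighted_sum, sample_count, naive_sum = 0.0, 0, 0.0
for pred, target in batches:
    batch_loss = F.mse_loss(pred, target, reduction='mean').item()
    weighted_sum += batch_loss * pred.size(0)   # as in the training loop
    sample_count += pred.size(0)
    naive_sum += batch_loss                     # ignores batch sizes

weighted_avg = weighted_sum / sample_count
naive_avg = naive_sum / len(batches)
global_mse = F.mse_loss(predictions, targets, reduction='mean').item()

print(weighted_avg, naive_avg, global_mse)
```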


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(train_loss_history, label="Train loss", color='blue')
plt.plot(test_loss_history, label="Test loss", color='orange')
plt.title("Autoencoder Loss per Epoch (Avg per Sample)")
plt.xlabel("Epoch")
plt.ylabel("Average Loss")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()



# Visual Testing
----------------------

In [None]:
import numpy as np

# Number of examples to show
N = 20

net.eval()
fig, axs = plt.subplots(2, N, figsize=(2 * N, 4))

test_iter = iter(testloader)
for i in range(N):
    img, _ = next(test_iter)
    img = img.to(device)

    with torch.no_grad():
        recon = net(img)

    axs[0, i].imshow(img[0, 0].cpu().numpy(), cmap="gray")
    axs[0, i].axis("off")
    axs[1, i].imshow(recon[0, 0].cpu().numpy(), cmap="gray")
    axs[1, i].axis("off")

axs[0, 0].set_title("Original")
axs[1, 0].set_title("Reconstructed")
plt.tight_layout()
plt.show()
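
The visual comparison can be complemented with one number per image. The following is one possible sketch, not the notebook's own method: the helper `per_sample_mse` and the two stand-in models are assumptions introduced so that the example runs without a trained network.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

def per_sample_mse(model: nn.Module, images: torch.Tensor) -> torch.Tensor:
    """Mean squared reconstruction error of each image, shape (batch,)."""
    model.eval()
    with torch.no_grad():
        recon = model(images)
    # reduction='none' keeps one error per pixel; average over C, H, W only.
    return F.mse_loss(recon, images, reduction='none').mean(dim=(1, 2, 3))

# Stand-in models so the sketch runs without training anything.
perfect = nn.Identity()                      # reconstructs its input exactly

class AlwaysBlack(nn.Module):                # ignores the input, outputs zeros
    def forward(self, x):
        return torch.zeros_like(x)

images = torch.rand(6, 1, 28, 28)            # random stand-in batch
errors_perfect = per_sample_mse(perfect, images)
errors_black = per_sample_mse(AlwaysBlack(), images)

print(errors_perfect)
print(errors_black)
```

With the trained model, a call such as `per_sample_mse(net, img)` would give a score for each column of the figure; unusually large values would point to digits the autoencoder reconstructs poorly.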

# Latent Space
----------------------

It is instructive to plot the bottleneck codes of the test set in 2D. For comparison, we plot the 2D PCA projection of the same test images beside them.

In [None]:
from sklearn.decomposition import PCA


# === Collect latent codes and raw data ===
net.eval()
all_latents = []
all_labels = []
raw_images = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        latents = net.encoder(inputs)

        all_latents.append(latents.cpu())
        all_labels.append(labels)
        raw_images.append(inputs.cpu().view(inputs.size(0), -1))  # Flatten: (B, 784)

all_latents = torch.cat(all_latents).numpy()
all_labels = torch.cat(all_labels).numpy()
raw_images = torch.cat(raw_images).numpy()

# === PCA ===
pca = PCA(n_components=2)
raw_images_2d = pca.fit_transform(raw_images)

# === Plot side-by-side with proper colorbar ===
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# Bottleneck
sc1 = axes[0].scatter(all_latents[:, 0], all_latents[:, 1], c=all_labels, cmap="tab10", s=10, alpha=0.7)
axes[0].set_title("2D Bottleneck (Autoencoder)")
axes[0].set_xlabel("Latent dim 1")
axes[0].set_ylabel("Latent dim 2")
axes[0].grid(True)

# PCA
sc2 = axes[1].scatter(raw_images_2d[:, 0], raw_images_2d[:, 1], c=all_labels, cmap="tab10", s=10, alpha=0.7)
axes[1].set_title("PCA of Original Images (Flattened)")
axes[1].set_xlabel("PC1")
axes[1].set_ylabel("PC2")
axes[1].grid(True)

# Shared colorbar – place to the right of both subplots
cbar_ax = fig.add_axes([0.92, 0.15, 0.02, 0.7])  # [left, bottom, width, height]
fig.colorbar(sc2, cax=cbar_ax, ticks=range(10), label='Digit Label')

plt.subplots_adjust(right=0.9)  # make room for colorbar
plt.show()
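
The comparison with PCA is natural because PCA itself acts as a linear encoder and decoder. The sketch below illustrates this on made-up data (the array `X` is random and stands in for flattened images): `fit_transform` plays the role of the encoder, `inverse_transform` that of the decoder, and both reduce to a centering step plus one matrix multiplication.

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
X = rng.random((200, 20))                 # hypothetical data: 200 samples, 20 features

pca = PCA(n_components=2)
codes = pca.fit_transform(X)              # "encoder": 20 -> 2
recon = pca.inverse_transform(codes)      # "decoder": 2 -> 20

# Both maps are affine: a centering step and one matrix multiplication.
manual_codes = (X - pca.mean_) @ pca.components_.T
manual_recon = manual_codes @ pca.components_ + pca.mean_

print(np.allclose(codes, manual_codes), np.allclose(recon, manual_recon))

# Reconstruction MSE, comparable in spirit to the autoencoder's loss.
pca_mse = np.mean((X - recon) ** 2)
print(pca_mse)
```

Applied to the notebook's data, `pca.inverse_transform(raw_images_2d)` would give PCA reconstructions whose mean squared error could be set beside the 2D autoencoder's test loss.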


# 16-Dimensional Bottleneck
----------------------

To try a somewhat more expressive network, let us now train a second autoencoder, this time with a 16D bottleneck:

In [None]:
#### TRAINING

import time
import torch
import torch.nn.functional as F

# Start timing
start_time = time.time()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Working on {device}")

net_16 = Autoencoder(bottleneck_dimensionality=16).to(device)
optimizer = torch.optim.Adam(net_16.parameters(), lr=0.001)

EPOCHS = 128
train_loss_history = []
test_loss_history = []

for epoch in range(EPOCHS):
    # === Train ===
    net_16.train()
    epoch_loss_sum = 0.0
    epoch_sample_count = 0

    for batch_inputs, _ in trainloader:
        batch_inputs = batch_inputs.to(device)

        optimizer.zero_grad()
        reconstructed = net_16(batch_inputs)

        loss = F.mse_loss(reconstructed, batch_inputs, reduction='mean')
        loss.backward()
        optimizer.step()

        batch_size = batch_inputs.size(0)
        epoch_loss_sum += loss.item() * batch_size
        epoch_sample_count += batch_size

    avg_train_loss = epoch_loss_sum / epoch_sample_count
    train_loss_history.append(avg_train_loss)

    # === Evaluate on test set ===
    net_16.eval()
    test_loss_sum = 0.0
    test_sample_count = 0
    with torch.no_grad():
        for batch_inputs, _ in testloader:
            batch_inputs = batch_inputs.to(device)
            reconstructed = net_16(batch_inputs)
            loss = F.mse_loss(reconstructed, batch_inputs, reduction='mean')

            batch_size = batch_inputs.size(0)
            test_loss_sum += loss.item() * batch_size
            test_sample_count += batch_size

    avg_test_loss = test_loss_sum / test_sample_count
    test_loss_history.append(avg_test_loss)

    if epoch % 16 == 0:
        print(f"Epoch {epoch:03d} | Train Loss (averaged over the epoch): {avg_train_loss:.6f} | Test Loss (after the epoch): {avg_test_loss:.6f}")

# End timing
end_time = time.time()
print(f"Elapsed time: {end_time - start_time:.2f} seconds")

#### VISUALISING LEARNING PROGRESS

plt.figure(figsize=(10, 5))
plt.plot(train_loss_history, label="Train loss", color='blue')
plt.plot(test_loss_history, label="Test loss", color='orange')
plt.title("Autoencoder Loss per Epoch (Avg per Sample)")
plt.xlabel("Epoch")
plt.ylabel("Average Loss")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

#### SOME EXAMPLES

N = 20 # Number of examples to show

net_16.eval()
fig, axs = plt.subplots(2, N, figsize=(2 * N, 4))

test_iter = iter(testloader)
for i in range(N):
    img, _ = next(test_iter)
    img = img.to(device)

    with torch.no_grad():
        recon = net_16(img)

    axs[0, i].imshow(img[0, 0].cpu().numpy(), cmap="gray")
    axs[0, i].axis("off")
    axs[1, i].imshow(recon[0, 0].cpu().numpy(), cmap="gray")
    axs[1, i].axis("off")

axs[0, 0].set_title("Original")
axs[1, 0].set_title("Reconstructed")
plt.tight_layout()
plt.show()

#### AND FINALLY, THE 16D BOTTLENECK PLOTTED (ONLY ITS FIRST 2 PRINCIPAL COMPONENTS ARE SHOWN)

net.eval()
net_16.eval()

def get_latents(model, dataloader):
    """Encode every batch in dataloader; return (latent codes, labels) as NumPy arrays."""
    all_latents, all_labels = [], []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            latents = model.encoder(inputs)
            all_latents.append(latents.cpu())
            all_labels.append(labels)
    return torch.cat(all_latents).numpy(), torch.cat(all_labels).numpy()

# Get latent spaces
latents_2d, labels = get_latents(net, testloader)
latents_16d, _ = get_latents(net_16, testloader)

# PCA: 16D to 2D
pca = PCA(n_components=2)
latents_16d_pca = pca.fit_transform(latents_16d)

# Plot side-by-side
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

scatter1 = axes[0].scatter(latents_2d[:, 0], latents_2d[:, 1], c=labels, cmap='tab10', s=10, alpha=0.7)
axes[0].set_title("True 2D Bottleneck (net)")
axes[0].set_xlabel("Dim 1")
axes[0].set_ylabel("Dim 2")
axes[0].grid(True)

scatter2 = axes[1].scatter(latents_16d_pca[:, 0], latents_16d_pca[:, 1], c=labels, cmap='tab10', s=10, alpha=0.7)
axes[1].set_title("PCA of 16D Bottleneck (net_16)")
axes[1].set_xlabel("PC 1")
axes[1].set_ylabel("PC 2")
axes[1].grid(True)

# Shared colorbar
fig.colorbar(scatter1, ax=axes.ravel().tolist(), ticks=range(10), label='Digit Label')
plt.tight_layout()
plt.show()
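
Since only two principal components of the 16D codes are plotted, it helps to know how much of their variance those two components retain. A minimal sketch on made-up data follows: `fake_latents` is a random stand-in for `latents_16d`, and its per-dimension scales are arbitrary.

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
# Hypothetical 16D codes with unequal spread per dimension, standing in for latents_16d.
scales = np.linspace(4.0, 0.25, 16)
fake_latents = rng.normal(size=(1000, 16)) * scales

pca = PCA(n_components=2)
projected = pca.fit_transform(fake_latents)

# Fraction of the total variance captured by the two plotted components.
kept = pca.explained_variance_ratio_.sum()
print(projected.shape)
print(f"Variance kept by 2 components: {kept:.1%}")
```

On the real codes, `pca.explained_variance_ratio_` from the cell above gives the same information; a low value would mean that the right-hand plot hides much of the 16D structure.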