In [None]:
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import Image, display, clear_output
import numpy as np
%matplotlib inline
import seaborn as sns
sns.set_style("whitegrid")
sns.set_palette(sns.dark_palette("purple"))

try:
    from plotting import plot_autoencoder_stats
except Exception as ex:
    print(f"If using Colab, you may need to upload `plotting.py`. \
          \nIn the left panel, click `Files > upload to session storage` and select the file `plotting.py` from your computer \
          \n---------------------------------------------")
    print(ex)

# Unsupervised Learning

## Labelling Data is Expensive

In supervised machine learning, one aims at learning a mapping $f_{\psi} : \mathbf{x} \in \mathcal{R}^P \rightarrow \mathbf{y}$ from observations $\mathbf{x}$ to the target $\mathbf{y}$ using a dataset $\mathcal{D} = \{(\mathbf{x}_i, \mathbf{y}_i)\}_{i=1, \dots, N}$ of finite size $N$ (e.g. image classification, translation). Because of the curse of dimensionality, high-dimensional inputs (images) and complex models (deep learning) require large datasets (millions of pairs $(\mathbf{x}, \mathbf{y})$). In practice, labelling data is expensive (e.g. marking the presence of cancer in chest X-ray scans).

## Compression is Comprehension: Learning without Target

In order to overcome the curse of dimensionality, we aim at learning a compressed representation $\mathbf{z} \in \mathcal{R}^M$ of $\mathbf{x}$ such that $M < P$ and there is a mapping $g_{\phi}: \mathbf{x} \rightarrow \mathbf{z}$ linking each data point to its representation. Ideally, $\mathbf{z}$ is a low-dimensional set of features which efficiently describes $\mathbf{x}$. As an illustration, when modelling pictures of celebrities (CelebA dataset), the set of facial features (eye color, age, hair length, etc.) is a compressed (and lossy) representation of $\mathbf{x}$. In practice, the representation $\mathbf{z}$ is unobservable and [unlikely to overlap with such known features](https://arxiv.org/abs/1811.12359). Yet, the representation $\mathbf{z}$ is low-dimensional and learning a mapping $f_{\psi} : \mathbf{z} \in \mathcal{R}^M \rightarrow \mathbf{y}$ is often easier.

Whereas labelling the data is expensive, observations $\mathbf{x}$ are cheap to acquire. In many cases, one can scrape the web to gather a large collection of images or text. As a result, large deep learning models can be deployed to learn $g_{\phi}$, and smaller, more data-efficient models can be applied downstream to solve the supervised task.



# Auto-encoders: Compression as a Generation Process
In this notebook you will implement a simple auto-encoder (AE). We assume that you are already familiar with the basics of neural networks. We will start by defining an AE similar to the one used for the finetuning step by [Geoffrey Hinton and Ruslan Salakhutdinov](https://www.cs.toronto.edu/~hinton/science.pdf). We will experiment with the AE setup and try to run it on the MNIST dataset. There has been a wide variety of research into the field of auto-encoders and the technique that you are about to learn is very simple compared to modern methods: Masked Autoencoders ([MADE](https://arxiv.org/abs/1502.03509), [BERT](https://arxiv.org/abs/1810.04805)) and Variational Autoencoders ([VAE](https://arxiv.org/abs/1312.6114), [VQ-VAE](https://arxiv.org/abs/1711.00937), [BIVA](https://arxiv.org/abs/1902.02102), [NVAE](https://arxiv.org/abs/2007.03898)).

In unsupervised learning, we aim at learning compressed representations $\mathbf{z} \in \mathcal{R}^M$ of $\mathbf{x} \in \mathcal{R}^P$ where $M < P$. The architecture of an autoencoder can be decomposed into two steps:

1. *Encoding* $\mathbf{x}$ into a low-dimensional representation $\mathbf{z}$ using a neural network $g_{\phi} : \mathbf{x} \rightarrow \mathbf{z}$.
2. *Decoding* the representation $\mathbf{z}$ into a reconstruction $\hat{\mathbf{x}} = h_\theta(\mathbf{z}) \in \mathcal{R}^P$.

Because $M < P$, the encoding acts as an information bottleneck: only part of the information describing $\mathbf{x}$ can be encoded into $\mathbf{z}$ as long as $M$ is sufficiently small.

Learning the parameters of the autoencoder relies on two aspects:

1. A distance in the observation space $d : \mathcal{R}^{P} \times \mathcal{R}^{P} \rightarrow \mathcal{R}$ (e.g. MSE), measuring the reconstruction quality.
2. Using backpropagation coupled with stochastic gradient descent (SGD) to optimize the parameters $\{\phi, \theta\}$ w.r.t. $L := \frac{1}{N} \sum_i d(\mathbf{x}_i, h_{\theta}(g_{\phi}(\mathbf{x}_i)))$.
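
As a rough illustration of the objective above, the sketch below evaluates $L$ with MSE as the distance $d$ for a tiny, untrained linear encoder and decoder. The sizes `P`, `M`, `N` and the names `g_phi` and `h_theta` are chosen here for illustration only; they are not the model built later in this notebook.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

P, M, N = 8, 2, 16           # illustrative sizes: input dim, latent dim, number of examples
g_phi = nn.Linear(P, M)      # encoder g_phi : x -> z
h_theta = nn.Linear(M, P)    # decoder h_theta : z -> x_hat

x = torch.rand(N, P)         # N observations with values in [0, 1)
z = g_phi(x)                 # latent codes, shape (N, M)
x_hat = h_theta(z)           # reconstructions, shape (N, P)

# d(x_i, x_hat_i): per-example mean squared error over the P dimensions
d = ((x - x_hat) ** 2).mean(dim=1)   # shape (N,)
L = d.mean()                         # average over the N examples

# nn.MSELoss averages over all N * P elements, which gives the same value here
assert torch.allclose(L, nn.MSELoss()(x_hat, x))

# a single backward pass yields gradients for both phi and theta
L.backward()
print(z.shape, x_hat.shape, L.item())
```

Because the loss depends on $\phi$ only through $\mathbf{z}$, the gradient for the encoder flows back through the decoder, which is why both sets of parameters can be updated from the same reconstruction error.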

<img src="static/autoencoder.png" />

*The exercises are found at the bottom of the notebook*

## MNIST
First let us load the MNIST dataset and plot a few examples. In this notebook we will use the *dataloaders* and *datasets* provided by PyTorch. Loading datasets through a dataloader has the advantage that only the data that is *necessary* is loaded into memory, which enables us to use very large datasets.
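
To make the on-demand behaviour concrete, here is a minimal sketch using a toy dataset that records which items were requested. `CountingDataset` is a hypothetical class written for this illustration (it is not part of PyTorch or of this notebook), and `num_workers=0` is assumed so that no background prefetching takes place.

```python
import torch
from torch.utils.data import Dataset, DataLoader

class CountingDataset(Dataset):
    """Hypothetical toy dataset that records which items were requested."""
    def __init__(self, size):
        self.size = size
        self.loaded = []          # indices fetched so far

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        self.loaded.append(idx)   # stands in for reading one example from disk
        return torch.full((4,), float(idx)), idx % 2

dset = CountingDataset(size=1000)
loader = DataLoader(dset, batch_size=8, shuffle=False, num_workers=0)

x, y = next(iter(loader))         # fetch a single batch
print(x.shape, len(dset.loaded))
```

Under these assumptions only the items of the first batch are fetched, even though the dataset reports 1000 examples.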

We only load a limited number of classes, defined by the `classes` variable, to speed up training.

In [None]:
import torch
cuda = torch.cuda.is_available()

from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor

# Flatten the 2d-array image into a vector
flatten = lambda x: ToTensor()(x).view(28**2)

# Define the train and test sets
dset_train = MNIST("./", train=True,  transform=flatten, download=True)
dset_test  = MNIST("./", train=False, transform=flatten)

# The digit classes to use
classes = [3, 7]

def stratified_sampler(labels, classes):
    """Sampler that only picks datapoints corresponding to the specified classes"""
    from functools import reduce
    (indices,) = np.where(reduce(lambda x, y: x | y, [labels.numpy() == i for i in classes]))
    indices = torch.from_numpy(indices)
    return SubsetRandomSampler(indices)


# The loaders perform the actual work
batch_size = 64
train_loader = DataLoader(dset_train, batch_size=batch_size,
                          sampler=stratified_sampler(dset_train.targets, classes), pin_memory=cuda)
test_loader  = DataLoader(dset_test, batch_size=batch_size, 
                          sampler=stratified_sampler(dset_test.targets, classes), pin_memory=cuda)

In [None]:
# Plot a batch of MNIST examples
f, axarr = plt.subplots(4, 16, figsize=(16, 4))

# Load a batch of images into memory
images, labels = next(iter(train_loader))

for i, ax in enumerate(axarr.flat):
    ax.imshow(images[i].view(28, 28), cmap="binary_r")
    ax.axis('off')
    
plt.suptitle('MNIST handwritten digits')
plt.show()

### Building the model
When defining the model the latent layer $z$ must act as a bottleneck of information. We initialize the AE with 1 hidden layer in the encoder and decoder using ReLU units as nonlinearities. The latent layer has a dimensionality of 2 in order to make it easy to visualise. Since $x$ are pixel intensities that are normalized between 0 and 1, we use the sigmoid nonlinearity to model the reconstruction.

In [None]:
import torch.nn as nn

# define size variables
num_features = 28*28

class AutoEncoder(nn.Module):
    def __init__(self, hidden_units, latent_features=2):
        super(AutoEncoder, self).__init__()
        # We typically employ an "hourglass" structure
        # meaning that the decoder should be an encoder
        # in reverse.
        
        self.encoder = nn.Sequential(
            nn.Linear(in_features=num_features, out_features=hidden_units),
            nn.ReLU(),
            # bottleneck layer
            nn.Linear(in_features=hidden_units, out_features=latent_features)
        )

        self.decoder = nn.Sequential(
            nn.Linear(in_features=latent_features, out_features=hidden_units),
            nn.ReLU(),
            # output layer, projecting back to image size
            nn.Linear(in_features=hidden_units, out_features=num_features)
        )

    def forward(self, x):
        # we don't apply an activation to the bottleneck layer
        z = self.encoder(x)
        
        # apply sigmoid to output to get pixel intensities between 0 and 1
        x_hat = torch.sigmoid(self.decoder(z))
        
        return {
            'z': z,
            'x_hat': x_hat
        }


# Choose the shape of the autoencoder
net = AutoEncoder(hidden_units=128, latent_features=2)

if cuda:
    net = net.cuda()

print(net)

Next, we define the optimizer and the loss function used for training and evaluation.

In [None]:
import torch.optim as optim

# if you want L2 regularization, then add weight_decay to SGD
optimizer = optim.SGD(net.parameters(), lr=0.25)

# We will use pixel wise mean-squared error as our loss function
loss_function = nn.MSELoss()

We can test the forward pass by checking whether the output shape is the same as the input shape.

In [None]:
# test the forward pass
# expect output size of [batch_size, num_features], i.e. [64, 784]
x, y = next(iter(train_loader))
print(f"x.shape = {x.shape}")

if cuda:
    x = x.cuda()

outputs = net(x)
print(f"x_hat.shape = {outputs['x_hat'].shape}")


In the training loop we sample each batch and evaluate the error, latent space, and reconstructions on every epoch.

**NOTE** this will take a while on CPU.

In [None]:
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

num_epochs = 100

train_loss = []
valid_loss = []

for epoch in range(num_epochs):
    batch_loss = []
    net.train()
    
    # Go through each batch in the training dataset using the loader
    # Note that y is not necessarily known as it is here
    for x, y in train_loader:
        
        if cuda:
            x = x.cuda()
        
        outputs = net(x)
        x_hat = outputs['x_hat']

        # note, target is the original tensor, as we're working with auto-encoders
        loss = loss_function(x_hat, x)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        batch_loss.append(loss.item())

    train_loss.append(np.mean(batch_loss))

    # Evaluate, do not propagate gradients
    with torch.no_grad():
        net.eval()
        
        # Just load a single batch from the test loader
        x, y = next(iter(test_loader))
        
        if cuda:
            x = x.cuda()
        
        outputs = net(x)

        # We save the latent variable and reconstruction for later use
        # we will need them on the CPU to plot
        x_hat = outputs['x_hat']
        z = outputs['z'].cpu().numpy()

        loss = loss_function(x_hat, x)

        valid_loss.append(loss.item())
    
    if epoch == 0:
        continue

    # live plotting of the training curves and representation
    plot_autoencoder_stats(x=x.cpu(),
                           x_hat=x_hat.cpu(),
                           z=z,
                           y=y,
                           train_loss=train_loss,
                           valid_loss=valid_loss,
                           epoch=epoch,
                           classes=classes,
                           dimensionality_reduction_op=None)
    
   

# Exercises

## Exercise 1: Analyzing the AE
1. The above implementation of an AE is very simple.
    - Experiment with the number of layers and try different non-linearities in order to improve the reconstructions
    - What happens with the network when we change the non-linearities in the latent layer (e.g. sigmoid)?
    - Try to increase the number of digit classes in the training set and analyze the results
    - Test different optimization algorithms such as ADAM and RMSProp and decide whether you should use regularizers
       
2. Currently we optimize w.r.t. mean squared error. 
    - Find another error function that could fit this problem better
    - Evaluate whether the new similarity function $d$ is a better choice than MSE and explain your findings

3. Complexity of the bottleneck.
    - Increase the number of units in the latent layer and train
    - Visualize by using [PCA](http://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html) or [t-SNE](http://scikit-learn.org/stable/modules/generated/sklearn.manifold.TSNE.html)

### Solutions to Exercise 1

#### 1.1: Deeper AutoEncoder with Different Non-linearities

In [None]:
class DeeperAutoEncoder(nn.Module):
    """AutoEncoder with more layers and different activation options"""
    def __init__(self, hidden_units=(512, 256, 128), latent_features=2, activation='relu'):
        super(DeeperAutoEncoder, self).__init__()
        
        # Select activation function
        activations = {
            'relu': nn.ReLU(),
            'leaky_relu': nn.LeakyReLU(0.2),
            'elu': nn.ELU(),
            'tanh': nn.Tanh()
        }
        self.act = activations.get(activation, nn.ReLU())
        
        # Build encoder layers
        encoder_layers = []
        in_features = num_features
        for hidden in hidden_units:
            encoder_layers.extend([
                nn.Linear(in_features, hidden),
                activations.get(activation, nn.ReLU())
            ])
            in_features = hidden
        # Bottleneck layer (no activation)
        encoder_layers.append(nn.Linear(in_features, latent_features))
        self.encoder = nn.Sequential(*encoder_layers)
        
        # Build decoder layers (reverse of encoder)
        decoder_layers = []
        in_features = latent_features
        for hidden in reversed(hidden_units):
            decoder_layers.extend([
                nn.Linear(in_features, hidden),
                activations.get(activation, nn.ReLU())
            ])
            in_features = hidden
        # Output layer
        decoder_layers.append(nn.Linear(in_features, num_features))
        self.decoder = nn.Sequential(*decoder_layers)
    
    def forward(self, x):
        z = self.encoder(x)
        x_hat = torch.sigmoid(self.decoder(z))
        return {'z': z, 'x_hat': x_hat}

# Test with LeakyReLU
net_deeper = DeeperAutoEncoder(hidden_units=[512, 256, 128], latent_features=2, activation='leaky_relu')
if cuda:
    net_deeper = net_deeper.cuda()
print(net_deeper)

**Analysis:** 
- **More layers**: Deeper networks can learn more complex representations but may be harder to train
- **LeakyReLU**: Mitigates the dying ReLU problem by keeping a small non-zero gradient for negative inputs
- **ELU**: Smoother activation, can speed up training
- **Tanh**: Outputs in [-1, 1], can be better for centered data but may cause saturation
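
The gradient behaviour described in the list above can be checked numerically. The following is a small sketch, with input values picked arbitrarily for illustration, that prints the derivative of each activation at a few negative and positive points.

```python
import torch
import torch.nn as nn

# arbitrary probe points: two negative, two positive
x = torch.tensor([-2.0, -0.5, 0.5, 2.0], requires_grad=True)

activations = [
    ("relu", nn.ReLU()),
    ("leaky_relu", nn.LeakyReLU(0.2)),
    ("elu", nn.ELU()),
    ("tanh", nn.Tanh()),
]

grads = {}
for name, act in activations:
    x.grad = None                    # reset the gradient from the previous activation
    act(x).sum().backward()          # d/dx of each output element w.r.t. its input
    grads[name] = x.grad.tolist()
    print(f"{name:>10}: {grads[name]}")
```

ReLU has zero gradient for negative inputs, LeakyReLU keeps the slope passed to its constructor (0.2 here), and the tanh gradient shrinks as the input moves away from zero, which is the saturation mentioned above.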

#### 1.2: Effect of Non-linearities in the Latent Layer

In [None]:
# AutoEncoder with non-linearity in latent layer
import torch.nn.functional as F
from torchvision import datasets, transforms

class AutoEncoderWithLatentActivation(nn.Module):
    def __init__(self, input_size=784, hidden_size=128, latent_size=2, latent_activation='tanh'):
        super(AutoEncoderWithLatentActivation, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, latent_size),
        )
        
        # Add activation in latent space
        if latent_activation == 'tanh':
            self.latent_activation = nn.Tanh()
        elif latent_activation == 'sigmoid':
            self.latent_activation = nn.Sigmoid()
        elif latent_activation == 'relu':
            self.latent_activation = nn.ReLU()
        else:
            self.latent_activation = nn.Identity()
        
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, input_size),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        z = self.encoder(x)
        z = self.latent_activation(z)
        x_reconstructed = self.decoder(z)
        return x_reconstructed, z

# Test different latent activations
latent_activations = ['none', 'tanh', 'sigmoid', 'relu']
results_latent = {}

for activation in latent_activations:
    print(f"\nTraining with latent activation: {activation}")
    model = AutoEncoderWithLatentActivation(latent_activation=activation)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    train_losses = []
    for epoch in range(20):
        model.train()
        total_loss = 0
        for x, _ in train_loader:
            x = x.view(-1, 784)
            optimizer.zero_grad()
            x_recon, _ = model(x)
            loss = F.mse_loss(x_recon, x)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        avg_loss = total_loss / len(train_loader)
        train_losses.append(avg_loss)
        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}, Loss: {avg_loss:.6f}")
    
    results_latent[activation] = {
        'model': model,
        'train_losses': train_losses
    }

# Plot comparison
plt.figure(figsize=(12, 4))
for activation, result in results_latent.items():
    plt.plot(result['train_losses'], label=f'{activation}')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.title('Effect of Latent Layer Activation Functions')
plt.legend()
plt.grid(True)
plt.show()

print("\nFinal losses:")
for activation, result in results_latent.items():
    print(f"{activation}: {result['train_losses'][-1]:.6f}")

#### 1.3: Increasing Number of Classes

In [None]:
# Test with all 10 classes
print("Training AutoEncoder with all 10 MNIST classes...")

# Load full MNIST dataset
train_dataset_all = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
train_loader_all = torch.utils.data.DataLoader(train_dataset_all, batch_size=64, shuffle=True)

# Train model
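# `AutoEncoder` above requires `hidden_units` and returns a dict, whereas this loop
# unpacks `(x_recon, z)`; use the tuple-returning model from 1.2 without latent activation
model_all = AutoEncoderWithLatentActivation(latent_activation='none')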
optimizer = optim.Adam(model_all.parameters(), lr=0.001)

train_losses_all = []
for epoch in range(20):
    model_all.train()
    total_loss = 0
    for x, _ in train_loader_all:
        x = x.view(-1, 784)
        optimizer.zero_grad()
        x_recon, _ = model_all(x)
        loss = F.mse_loss(x_recon, x)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader_all)
    train_losses_all.append(avg_loss)
    if (epoch + 1) % 5 == 0:
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.6f}")

# Visualize latent space with all classes
model_all.eval()
latent_vectors_all = []
labels_all = []

with torch.no_grad():
    for x, y in train_loader_all:
        x = x.view(-1, 784)
        _, z = model_all(x)
        latent_vectors_all.append(z)
        labels_all.append(y)
        if len(latent_vectors_all) * 64 >= 5000:  # Sample 5000 points
            break

latent_vectors_all = torch.cat(latent_vectors_all).numpy()
labels_all = torch.cat(labels_all).numpy()

plt.figure(figsize=(10, 8))
scatter = plt.scatter(latent_vectors_all[:, 0], latent_vectors_all[:, 1], 
                     c=labels_all, cmap='tab10', alpha=0.5, s=5)
plt.colorbar(scatter, ticks=range(10))
plt.xlabel('Latent Dimension 1')
plt.ylabel('Latent Dimension 2')
plt.title('Latent Space Visualization - All 10 MNIST Classes')
plt.grid(True, alpha=0.3)
plt.show()

print(f"\nFinal loss with 10 classes: {train_losses_all[-1]:.6f}")
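# baseline: same architecture without latent activation, trained on `classes` in 1.2
print(f"Compare to 2 classes: {results_latent['none']['train_losses'][-1]:.6f}")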

#### 1.4: Different Optimizers and Regularization

In [None]:
# Test different optimizers
optimizers_configs = {
    'Adam': lambda params: optim.Adam(params, lr=0.001),
    'SGD': lambda params: optim.SGD(params, lr=0.01, momentum=0.9),
    'RMSprop': lambda params: optim.RMSprop(params, lr=0.001),
    'Adam+L2': lambda params: optim.Adam(params, lr=0.001, weight_decay=1e-5)
}

results_optimizers = {}

for opt_name, opt_fn in optimizers_configs.items():
    print(f"\nTraining with {opt_name}...")
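    # tuple-returning model from 1.2; `AutoEncoder()` needs `hidden_units` and returns a dict
    model = AutoEncoderWithLatentActivation(latent_activation='none')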
    optimizer = opt_fn(model.parameters())
    
    train_losses = []
    for epoch in range(20):
        model.train()
        total_loss = 0
        for x, _ in train_loader:
            x = x.view(-1, 784)
            optimizer.zero_grad()
            x_recon, _ = model(x)
            loss = F.mse_loss(x_recon, x)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        avg_loss = total_loss / len(train_loader)
        train_losses.append(avg_loss)
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}, Loss: {avg_loss:.6f}")
    
    results_optimizers[opt_name] = train_losses

# Plot comparison
plt.figure(figsize=(12, 5))
for opt_name, losses in results_optimizers.items():
    plt.plot(losses, label=opt_name, marker='o' if len(losses) < 30 else None)
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.title('Comparison of Different Optimizers')
plt.legend()
plt.grid(True)
plt.show()

print("\nFinal losses:")
for opt_name, losses in results_optimizers.items():
    print(f"{opt_name}: {losses[-1]:.6f}")

#### 1.5: Alternative Loss Functions

In [None]:
# Test different loss functions
def binary_cross_entropy_loss(x_recon, x):
    return F.binary_cross_entropy(x_recon, x)

def l1_loss(x_recon, x):
    return F.l1_loss(x_recon, x)

def smooth_l1_loss(x_recon, x):
    return F.smooth_l1_loss(x_recon, x)

loss_functions = {
    'MSE': F.mse_loss,
    'BCE': binary_cross_entropy_loss,
    'L1': l1_loss,
    'Smooth L1': smooth_l1_loss
}

results_losses = {}

for loss_name, loss_fn in loss_functions.items():
    print(f"\nTraining with {loss_name} loss...")
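    # tuple-returning model from 1.2; `AutoEncoder()` needs `hidden_units` and returns a dict
    model = AutoEncoderWithLatentActivation(latent_activation='none')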
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    train_losses = []
    for epoch in range(20):
        model.train()
        total_loss = 0
        for x, _ in train_loader:
            x = x.view(-1, 784)
            optimizer.zero_grad()
            x_recon, _ = model(x)
            loss = loss_fn(x_recon, x)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        avg_loss = total_loss / len(train_loader)
        train_losses.append(avg_loss)
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}, Loss: {avg_loss:.6f}")
    
    results_losses[loss_name] = train_losses

# Plot comparison
plt.figure(figsize=(12, 5))
for loss_name, losses in results_losses.items():
    plt.plot(losses, label=loss_name, marker='o' if len(losses) < 30 else None)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Comparison of Different Loss Functions')
plt.legend()
plt.grid(True)
plt.show()

print("\nFinal losses:")
for loss_name, losses in results_losses.items():
    print(f"{loss_name}: {losses[-1]:.6f}")

#### 1.6: Higher-Dimensional Latent Space with Visualization

In [None]:
# Train AutoEncoder with higher-dimensional latent space
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

class AutoEncoderHighDim(nn.Module):
    def __init__(self, input_size=784, hidden_size=128, latent_size=10):
        super(AutoEncoderHighDim, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, latent_size),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, input_size),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        z = self.encoder(x)
        x_reconstructed = self.decoder(z)
        return x_reconstructed, z

print("Training AutoEncoder with 10D latent space...")
model_10d = AutoEncoderHighDim(latent_size=10)
optimizer = optim.Adam(model_10d.parameters(), lr=0.001)

train_losses_10d = []
for epoch in range(20):
    model_10d.train()
    total_loss = 0
    for x, _ in train_loader:
        x = x.view(-1, 784)
        optimizer.zero_grad()
        x_recon, _ = model_10d(x)
        loss = F.mse_loss(x_recon, x)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    train_losses_10d.append(avg_loss)
    if (epoch + 1) % 5 == 0:
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.6f}")

# Extract latent representations
model_10d.eval()
latent_vectors_10d = []
labels_10d = []

with torch.no_grad():
    for x, y in train_loader:
        x = x.view(-1, 784)
        _, z = model_10d(x)
        latent_vectors_10d.append(z)
        labels_10d.append(y)

latent_vectors_10d = torch.cat(latent_vectors_10d).numpy()
labels_10d = torch.cat(labels_10d).numpy()

# Visualize using PCA and t-SNE
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# PCA
pca = PCA(n_components=2)
latent_pca = pca.fit_transform(latent_vectors_10d)
scatter1 = axes[0].scatter(latent_pca[:, 0], latent_pca[:, 1], c=labels_10d, cmap='tab10', alpha=0.5, s=5)
axes[0].set_xlabel('PCA Component 1')
axes[0].set_ylabel('PCA Component 2')
axes[0].set_title(f'PCA Visualization of 10D Latent Space\nVariance explained: {pca.explained_variance_ratio_.sum():.2%}')
axes[0].grid(True, alpha=0.3)
plt.colorbar(scatter1, ax=axes[0])

# t-SNE (on subset for speed)
print("\nRunning t-SNE (this may take a moment)...")
subset_size = 2000
indices = np.random.choice(len(latent_vectors_10d), subset_size, replace=False)
tsne = TSNE(n_components=2, random_state=42, perplexity=30)
latent_tsne = tsne.fit_transform(latent_vectors_10d[indices])
scatter2 = axes[1].scatter(latent_tsne[:, 0], latent_tsne[:, 1], c=labels_10d[indices], cmap='tab10', alpha=0.5, s=5)
axes[1].set_xlabel('t-SNE Dimension 1')
axes[1].set_ylabel('t-SNE Dimension 2')
axes[1].set_title('t-SNE Visualization of 10D Latent Space')
axes[1].grid(True, alpha=0.3)
plt.colorbar(scatter2, ax=axes[1])

plt.tight_layout()
plt.show()

print(f"\nFinal loss (10D latent): {train_losses_10d[-1]:.6f}")
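# baseline: 2D latent model without latent activation, trained with MSE in 1.2
print(f"Compare to 2D latent: {results_latent['none']['train_losses'][-1]:.6f}")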

## Exercise 2: Adding classification for semi-supervised learning

The above training has been performed unsupervised. Now let us assume that we only have a fraction of labeled data points from each class. As we know, semi-supervised learning can be utilized by combining unsupervised and supervised learning. Now you must analyze whether a trained AE from the above exercise can aid a classifier.

1. Build a simple classifier (like the ones from week1) where you:
    - Train on the labeled dataset and evaluate the results
2. Build a second classifier and train on the latent output $\mathbf{z}$ of the AE.
3. Build a third classifier and train on the reconstructions of the AE.
4. Evaluate the classifiers against each other and implement a model that improves the classification by combining the input, latent output, and reconstruction.

Below we provide some starting code for using only a subset of the labelled data.
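
The starter code itself is not reproduced in this notebook. As a stand-in, the sketch below shows one possible way to draw a small labelled subset with the same number of examples per class. The helper `labelled_subset_indices` and the toy label tensor are assumptions made for this illustration.

```python
import numpy as np
import torch

def labelled_subset_indices(targets, classes, num_per_class, seed=0):
    """Pick `num_per_class` random indices for each digit in `classes`."""
    rng = np.random.default_rng(seed)
    targets = np.asarray(targets)
    picked = []
    for c in classes:
        (idx,) = np.where(targets == c)          # positions of all examples of class c
        picked.append(rng.choice(idx, size=num_per_class, replace=False))
    return torch.from_numpy(np.concatenate(picked))

# toy labels standing in for `dset_train.targets`
toy_targets = torch.tensor([3, 7, 3, 1, 7, 7, 3, 0, 3, 7])
idx = labelled_subset_indices(toy_targets, classes=[3, 7], num_per_class=2)
print(idx, toy_targets[idx])
```

The returned indices could then be passed to a `SubsetRandomSampler`, in the same way as in `stratified_sampler` above, to build a loader over the labelled examples only.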

### Solutions to Exercise 2: Semi-Supervised Learning

#### 2.1: Simple Classifier on Raw Data

In [None]:
# Simple classifier on raw pixel data
class SimpleClassifier(nn.Module):
    def __init__(self, input_size=784, hidden_size=128, num_classes=2):
        super(SimpleClassifier, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, num_classes)
        )
    
    def forward(self, x):
        return self.fc(x)

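# Prepare test data: keep only the digits in `classes` (the same digits the
# training loader uses) and store class indices 0..len(classes)-1 as targets
test_dataset = datasets.MNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)
test_idx = torch.zeros_like(test_dataset.targets, dtype=torch.bool)
for c in classes:
    test_idx |= (test_dataset.targets == c)
test_dataset.data = test_dataset.data[test_idx]
test_dataset.targets = torch.tensor([classes.index(int(t)) for t in test_dataset.targets[test_idx]])
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=False)

# Train classifier on raw data
print("Training classifier on raw pixel data...")
classifier_raw = SimpleClassifier()
optimizer = optim.Adam(classifier_raw.parameters(), lr=0.001)

for epoch in range(10):
    classifier_raw.train()
    for x, y in train_loader:
        x = x.view(-1, 784)
        # train_loader yields digit labels (e.g. 3, 7); map them to class indices (0, 1)
        y = torch.tensor([classes.index(int(t)) for t in y])
        optimizer.zero_grad()
        outputs = classifier_raw(x)
        loss = F.cross_entropy(outputs, y)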
        loss.backward()
        optimizer.step()

# Evaluate
classifier_raw.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x = x.view(-1, 784)
        outputs = classifier_raw(x)
        _, predicted = torch.max(outputs, 1)
        total += y.size(0)
        correct += (predicted == y).sum().item()

accuracy_raw = 100 * correct / total
print(f"Accuracy on raw data: {accuracy_raw:.2f}%")

#### 2.2: Classifier on Latent Representations

In [None]:
# Classifier on latent representations
class LatentClassifier(nn.Module):
    def __init__(self, latent_size=2, num_classes=2):
        super(LatentClassifier, self).__init__()
        self.fc = nn.Linear(latent_size, num_classes)
    
    def forward(self, z):
        return self.fc(z)

print("Training classifier on latent representations...")

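# Extract latent representations using trained autoencoder
# Assumption: `autoencoder` is never defined above, so we reuse the 2D model
# trained in 1.2 without latent activation, which returns (x_recon, z)
autoencoder = results_latent['none']['model']
autoencoder.eval()
latent_train = []
labels_train = []
with torch.no_grad():
    for x, y in train_loader:
        x = x.view(-1, 784)
        _, z = autoencoder(x)
        latent_train.append(z)
        # map digit labels (e.g. 3, 7) to class indices (0, 1) for cross-entropy
        labels_train.append(torch.tensor([classes.index(int(t)) for t in y]))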

latent_train = torch.cat(latent_train)
labels_train = torch.cat(labels_train)

# Create dataset from latent representations
latent_dataset = torch.utils.data.TensorDataset(latent_train, labels_train)
latent_loader = torch.utils.data.DataLoader(latent_dataset, batch_size=64, shuffle=True)

# Train classifier
classifier_latent = LatentClassifier()
optimizer = optim.Adam(classifier_latent.parameters(), lr=0.01)

for epoch in range(20):
    classifier_latent.train()
    for z, y in latent_loader:
        optimizer.zero_grad()
        outputs = classifier_latent(z)
        loss = F.cross_entropy(outputs, y)
        loss.backward()
        optimizer.step()

# Evaluate
latent_test = []
labels_test = []
with torch.no_grad():
    for x, y in test_loader:
        x = x.view(-1, 784)
        _, z = autoencoder(x)
        latent_test.append(z)
        labels_test.append(y)

latent_test = torch.cat(latent_test)
labels_test = torch.cat(labels_test)

classifier_latent.eval()
with torch.no_grad():
    outputs = classifier_latent(latent_test)
    _, predicted = torch.max(outputs, 1)
    correct = (predicted == labels_test).sum().item()
    total = labels_test.size(0)

accuracy_latent = 100 * correct / total
print(f"Accuracy on latent representations: {accuracy_latent:.2f}%")

#### 2.3: Classifier on Reconstructions

In [None]:
# Classifier on reconstructed data
print("Training classifier on reconstructions...")

# Get reconstructions
autoencoder.eval()
recon_train = []
labels_train = []
with torch.no_grad():
    for x, y in train_loader:
        x = x.view(-1, 784)
        x_recon, _ = autoencoder(x)
        recon_train.append(x_recon)
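        # map digit labels (e.g. 3, 7) to class indices (0, 1) for cross-entropy
        labels_train.append(torch.tensor([classes.index(int(t)) for t in y]))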

recon_train = torch.cat(recon_train)
labels_train = torch.cat(labels_train)

# Create dataset
recon_dataset = torch.utils.data.TensorDataset(recon_train, labels_train)
recon_loader = torch.utils.data.DataLoader(recon_dataset, batch_size=64, shuffle=True)

# Train classifier
classifier_recon = SimpleClassifier()
optimizer = optim.Adam(classifier_recon.parameters(), lr=0.001)

for epoch in range(10):
    classifier_recon.train()
    for x_recon, y in recon_loader:
        optimizer.zero_grad()
        outputs = classifier_recon(x_recon)
        loss = F.cross_entropy(outputs, y)
        loss.backward()
        optimizer.step()

# Evaluate
recon_test = []
labels_test = []
with torch.no_grad():
    for x, y in test_loader:
        x = x.view(-1, 784)
        x_recon, _ = autoencoder(x)
        recon_test.append(x_recon)
        labels_test.append(y)

recon_test = torch.cat(recon_test)
labels_test = torch.cat(labels_test)

classifier_recon.eval()
with torch.no_grad():
    outputs = classifier_recon(recon_test)
    _, predicted = torch.max(outputs, 1)
    correct = (predicted == labels_test).sum().item()
    total = labels_test.size(0)

accuracy_recon = 100 * correct / total
print(f"Accuracy on reconstructions: {accuracy_recon:.2f}%")

#### 2.4: Combined Classifier Using All Features

In [None]:
# Combined classifier using raw data, latent, and reconstructions
class CombinedClassifier(nn.Module):
    def __init__(self, input_size=784, latent_size=2, hidden_size=128, num_classes=2):
        super(CombinedClassifier, self).__init__()
        # Combined input: raw (784) + latent (2) + reconstruction (784) = 1570
        combined_size = input_size + latent_size + input_size
        self.fc = nn.Sequential(
            nn.Linear(combined_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size, num_classes)
        )
    
    def forward(self, x, z, x_recon):
        combined = torch.cat([x, z, x_recon], dim=1)
        return self.fc(combined)

print("Training combined classifier...")

# Prepare combined training data
autoencoder.eval()
combined_train_data = []
labels_train = []

with torch.no_grad():
    for x, y in train_loader:
        x = x.view(-1, 784)
        x_recon, z = autoencoder(x)
        combined_train_data.append((x, z, x_recon, y))
        labels_train.append(y)

# Train classifier
classifier_combined = CombinedClassifier()
optimizer = optim.Adam(classifier_combined.parameters(), lr=0.001)

for epoch in range(10):
    classifier_combined.train()
    for x, z, x_recon, y in combined_train_data:
        optimizer.zero_grad()
        outputs = classifier_combined(x, z, x_recon)
        loss = F.cross_entropy(outputs, y)
        loss.backward()
        optimizer.step()

# Evaluate
classifier_combined.eval()
correct = 0
total = 0

with torch.no_grad():
    for x, y in test_loader:
        x = x.view(-1, 784)
        x_recon, z = autoencoder(x)
        outputs = classifier_combined(x, z, x_recon)
        _, predicted = torch.max(outputs, 1)
        total += y.size(0)
        correct += (predicted == y).sum().item()

accuracy_combined = 100 * correct / total
print(f"Accuracy with combined features: {accuracy_combined:.2f}%")
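
The combined classifier's input width follows from concatenating the three feature blocks along the feature axis. A small NumPy sketch with a made-up batch of 4 samples checks the arithmetic (784 + 2 + 784 = 1570). The array names mirror the cell above, but the zero-filled data is purely illustrative.

```python
import numpy as np

batch = 4
x = np.zeros((batch, 784))        # flattened raw images
z = np.zeros((batch, 2))          # 2-D latent codes
x_recon = np.zeros((batch, 784))  # reconstructions

# Same operation as torch.cat([x, z, x_recon], dim=1)
combined = np.concatenate([x, z, x_recon], axis=1)
print(combined.shape)
```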

#### Comparison and Analysis

In [None]:
# Compare all approaches
import pandas as pd

results = {
    'Approach': ['Raw Data', 'Latent (2D)', 'Reconstructions', 'Combined'],
    'Accuracy (%)': [accuracy_raw, accuracy_latent, accuracy_recon, accuracy_combined],
    'Input Dimension': [784, 2, 784, 1570]
}

df = pd.DataFrame(results)
print("\n" + "="*60)
print("CLASSIFICATION RESULTS COMPARISON")
print("="*60)
print(df.to_string(index=False))
print("="*60)

# Visualize comparison
plt.figure(figsize=(10, 6))
bars = plt.bar(df['Approach'], df['Accuracy (%)'], color=['blue', 'green', 'orange', 'red'], alpha=0.7)
plt.ylabel('Accuracy (%)', fontsize=12)
plt.title('Classification Accuracy Comparison', fontsize=14, fontweight='bold')
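# Lower bound adapts to the results so that bars below 90% are not clipped
plt.ylim([min(90, df['Accuracy (%)'].min() - 5), 100])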
plt.grid(axis='y', alpha=0.3)

# Add value labels on bars
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
             f'{height:.2f}%',
             ha='center', va='bottom', fontsize=10, fontweight='bold')

plt.tight_layout()
plt.show()

print("\nKey Observations:")
print("- Latent representations achieve high accuracy despite 2D compression (784→2)")
print("- Combined features may offer marginal improvements but increase complexity")
print("- The autoencoder learned meaningful representations for classification")

In [None]:
def uniform_stratified_sampler(labels, classes, n=None):
    """
    Stratified sampler that distributes labels uniformly by
    sampling at most n data points per class
    """
    from functools import reduce
    # Keep only the indices whose label is one of `classes`
    (indices,) = np.where(reduce(lambda x, y: x | y, [labels.numpy() == i for i in classes]))

    # Ensure uniform distribution of labels
    np.random.shuffle(indices)
    indices = np.hstack([list(filter(lambda idx: labels[idx] == i, indices))[:n] for i in classes])

    indices = torch.from_numpy(indices)
    sampler = SubsetRandomSampler(indices)
    return sampler


batch_size = 64

# Specify how many labelled examples we want per digit class
labels_per_class = 10

# Large pool of unlabelled data
unlabelled = DataLoader(dset_train, batch_size=batch_size, 
                        sampler=stratified_sampler(dset_train.train_labels, classes=classes), pin_memory=cuda)

# Smaller pool of labelled data
labelled = DataLoader(dset_train, batch_size=batch_size,
                      sampler=uniform_stratified_sampler(dset_train.train_labels, classes=classes, n=labels_per_class),
                      pin_memory=cuda)
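
The idea behind `uniform_stratified_sampler` (keep at most `n` shuffled indices per class) can be checked on toy labels. The sketch below is a NumPy-only variant with a hypothetical helper name, `indices_per_class`. It illustrates the selection logic under those assumptions and is not a drop-in replacement for the sampler above.

```python
import numpy as np

def indices_per_class(labels, classes, n=None, seed=0):
    """Return at most n randomly chosen indices for each requested class."""
    rng = np.random.default_rng(seed)
    chosen = []
    for c in classes:
        idx = np.flatnonzero(labels == c)  # positions of class c
        rng.shuffle(idx)                   # random order within the class
        chosen.append(idx[:n])             # n=None keeps every index
    return np.concatenate(chosen)

# Toy labels: five 0s, four 1s, three 2s
toy_labels = np.array([0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 0, 1])
picked = indices_per_class(toy_labels, classes=[0, 1], n=3)

# Count how many picked samples fall in each class
print(np.bincount(toy_labels[picked], minlength=3))
```

With `n=3` and `classes=[0, 1]`, each requested class contributes exactly three indices and class 2 contributes none, which is the balance the labelled loader relies on.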

In [None]:
from itertools import cycle

# Example: Semi-supervised training combining autoencoder and classifier
# This combines unsupervised reconstruction loss and supervised classification loss

class SemiSupervisedAE(nn.Module):
    """Autoencoder with attached classifier for semi-supervised learning"""
    def __init__(self, hidden_units=128, latent_features=2, num_classes=len(classes)):
        super(SemiSupervisedAE, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(num_features, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, latent_features)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_features, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, num_features)
        )
        # Classifier on latent space
        self.classifier = nn.Linear(latent_features, num_classes)
    
    def forward(self, x):
        z = self.encoder(x)
        x_hat = torch.sigmoid(self.decoder(z))
        logits = self.classifier(z)
        return {'z': z, 'x_hat': x_hat, 'logits': logits}

# Initialize model
semi_model = SemiSupervisedAE(hidden_units=128, latent_features=8)
if cuda:
    semi_model = semi_model.cuda()

optimizer_semi = optim.Adam(semi_model.parameters(), lr=0.001)
recon_loss_fn = nn.BCELoss()
class_loss_fn = nn.CrossEntropyLoss()

# Training loop
num_epochs = 50
alpha = 0.1  # Weight for classification loss

print("Semi-supervised training: combining reconstruction and classification...")
for epoch in range(num_epochs):
    semi_model.train()
    
    # Go through both labelled and unlabelled data
    for (x_l, y_l), (x_u, _) in zip(cycle(labelled), unlabelled):
        
        if cuda:
            x_l, y_l, x_u = x_l.cuda(), y_l.cuda(), x_u.cuda()
        
        # Process labeled data
        outputs_l = semi_model(x_l)
        
        # Reconstruction loss on labeled data
        recon_loss_l = recon_loss_fn(outputs_l['x_hat'], x_l)
        
        # Classification loss on labeled data
        y_idx = torch.tensor([classes.index(label.item()) for label in y_l])
        if cuda:
            y_idx = y_idx.cuda()
        class_loss = class_loss_fn(outputs_l['logits'], y_idx)
        
        # Process unlabeled data (reconstruction only)
        outputs_u = semi_model(x_u)
        recon_loss_u = recon_loss_fn(outputs_u['x_hat'], x_u)
        
        # Combined loss
        loss = recon_loss_l + recon_loss_u + alpha * class_loss
        
        optimizer_semi.zero_grad()
        loss.backward()
        optimizer_semi.step()
    
    if (epoch + 1) % 10 == 0:
        # Evaluate classification accuracy
        semi_model.eval()
        test_correct = 0
        test_total = 0
        with torch.no_grad():
            for x, y in test_loader:
                if cuda:
                    x, y = x.cuda(), y.cuda()
                outputs = semi_model(x)
                y_idx = torch.tensor([classes.index(label.item()) for label in y])
                if cuda:
                    y_idx = y_idx.cuda()
                _, predicted = torch.max(outputs['logits'], 1)
                test_total += y_idx.size(0)
                test_correct += (predicted == y_idx).sum().item()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Test Acc: {100*test_correct/test_total:.2f}%')

print("\nSemi-supervised learning complete!")
print("This approach:")
print("- Trains autoencoder on ALL data (labeled + unlabeled)")
print("- Trains classifier only on labeled data")
print("- Shares representations between tasks")
print("- Can outperform purely supervised training when labels are scarce (not guaranteed)")
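
The training loop above pairs the small labelled loader with the larger unlabelled one through `zip(cycle(labelled), unlabelled)`. `cycle` restarts the labelled batches whenever they run out, and `zip` stops once the unlabelled batches are exhausted, so one epoch corresponds to one pass over the unlabelled data. A plain-Python sketch, with lists standing in for the two loaders (the batch names are made up):

```python
from itertools import cycle

labelled_batches = ["L0", "L1"]                       # few labelled batches
unlabelled_batches = ["U0", "U1", "U2", "U3", "U4"]   # many unlabelled batches

# Each unlabelled batch is matched with the next labelled batch, wrapping around
pairs = list(zip(cycle(labelled_batches), unlabelled_batches))
for labelled_batch, unlabelled_batch in pairs:
    print(labelled_batch, unlabelled_batch)
```

Note that `itertools.cycle` stores the items from its first pass and replays them, so the labelled batches repeat in the same order within an epoch rather than being reshuffled on every restart.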