<a href="https://colab.research.google.com/github/kanguR00t/adv_ml_autoencoders/blob/main/Advanced_ML_Autoencoders.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Active Training Course "Advanced Deep Learning" - Autoencoder Tutorial

Hi there! This notebook is designed to contain the full tutorial on autoencoders. We will be investigating various topics from [yesterday's lecture](https://indico.desy.de/event/40560/timetable/?view=standard#b-19795-autoencoder-lecture):



*   Image de-noising
*   Anomaly detection in a dataset
*   Dimensionality Reduction

For this, the notebook is divided into multiple parts. We will be focusing on a dataset of [classified galaxy images](https://astronn.readthedocs.io/en/latest/galaxy10.html) for the tutorial.

If you get stuck or have any questions at any point, don't hesitate to ask!



# Part 0: The Dataset

As a first step, let's initialise our notebook and load the dataset we will be using. Afterwards, we'll take a peek at what we will be working with.

In [None]:
import os
import h5py

import torch
import torchvision
from torchvision.transforms import v2

import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Just to make the session somewhat determinate
def set_seed(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
set_seed(0)

If you are running a CPU session currently, this cell will tell you. In that case, you might want to switch to a T4 GPU or TPU session to speed up training the models!

In [None]:
print(f"CUDA is available: {torch.cuda.is_available()}")

if not torch.cuda.is_available():
    print("If you want, you might want to switch to a GPU-accelerated session!")
    device = torch.device('cpu')
else:
    device = torch.device('cuda')

In [None]:
data_url = 'https://www.astro.utoronto.ca/~hleung/shared/Galaxy10/Galaxy10_DECals.h5'
data_file = os.path.basename(data_url)

In [None]:
# You should only need to execute this cell once - Afterwards, the data will be available!
!wget {data_url}

In [None]:
class Features_Dataset(torch.utils.data.Dataset):
    def __init__(self, archive):
        self.archive = archive
        self._load()

    def _load(self):
        with h5py.File(self.archive, 'r', libver='latest', swmr=True) as archive:
            # Directly transpose to BCHW from BHWC for PyTorch
            self.images = np.transpose(np.array(archive['images']), [0, 3, 1, 2])
            self.labels = np.array(archive['ans'])

    def __getitem__(self, index):
        return self.images[index], self.labels[index]

    def __len__(self):
        return len(self.labels)

We'll already split up our dataset into training, validation, and testing here in a 3/1/1 split. In principle, feel free to change the proportions here, but having separate datasets for training, validating the model against overtraining, and for testing performance afterwards is absolutely beneficial.

In [None]:
train_split = 0.6
valid_split = 0.2

full_dataset = Features_Dataset(archive=data_file)

test_split = 1 - train_split - valid_split

train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(
        full_dataset, [train_split, valid_split, test_split]
)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=128, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False)

This next cell simply sets up some functions we'll use for plotting our input and output images in the following.

In [None]:
fig_size = 4

def show_images(images, labels=None, num_columns=5, num_rows=3):
    fig, axs = plt.subplots(nrows=num_rows, ncols=num_columns, figsize=(fig_size * num_columns, fig_size * num_rows),
                        subplot_kw={'xticks': [], 'yticks': []})

    labels = [None] * num_columns * num_rows if labels is None else labels

    for ax, image, label in zip(axs.flat, images, labels):
        # Use a grayscale colourmap if we have only a single channel
        cmap = 'gray' if image.shape[0] == 1 else None

        # Imshow expects HWC, so backtransform here again
        ax.imshow(np.transpose(image, [1, 2, 0]), cmap=cmap)
        if label:
            ax.set_title(f"{label}")

    plt.tight_layout()
    plt.show()

Let's immediately plot some galaxies to get a feel for our data!

In [None]:
images, labels = next(iter(train_loader))

show_images(images, labels)

We now have some initial images of different galaxy types. Currently, the associated labels are simply integers.

To figure out what types of galaxies the dataset is considering, you might want to find some association [here](https://astronn.readthedocs.io/en/latest/galaxy10.html) and enter the corresponding information into the dictionary below so that you get a better understanding of the dataset contents (and to make the image information slightly nicer to read).

In [None]:
GALAXY_LABEL_DICT = {
    # TODO: Figure out what types of galaxies we are dealing with here
    0: "<Some galaxy type>",
    1: "<Another galaxy type>",
}

def get_label_text(labels):
    return [GALAXY_LABEL_DICT[label.item()] for label in labels]

Now, let's plot again what galaxies we were looking at before!

In [None]:
show_images(images, get_label_text(labels))

To make the autoencoder training take slightly shorter, we will also reduce the image size slightly down to 64x64 pixels and convert the images to grayscale.

If you have some time left over at the and or want to retry some of the tasks, feel free to change the image size or train on the 3-channel RGB images. (For this, you will also have to adjust the model architecture!)

In [None]:
resize_transform = v2.Compose([
    v2.ToDtype(torch.float32, scale=True),
    v2.Resize(size=64),
    v2.Grayscale(),
])

In [None]:
show_images(resize_transform(images), get_label_text(labels))

# Part 1: De-Noising

After taking a first look at our data, we will train our Autoencoder to de-noise galaxy images here. Although the images show some noise in the telescope pictures already, we will have to add another component to obtain a 'noise-less' training target first.

Then we can start the training with the goal to minimise the loss between the model output and the noise-less images.

This is just setting up some helpful functions for model training and validation

In [None]:
import fastprogress


def train(dataloader, optimizer, model, loss_fn, device, master_bar,
          transform_common=None, transform_input=None):
    """Run one training epoch.

    Args:
        dataloader (DataLoader): Torch DataLoader object to load data
        optimizer: Torch optimizer object
        model (nn.Module): Torch model to train
        loss_fn: Torch loss function
        device (torch.device): Torch device to use for training
        master_bar (fastprogress.master_bar): Will be iterated over for each
            epoch to draw batches and display training progress
        transform_common (function): Transform to apply to input and target
        transform_input (function): Transform to apply to the input for de-noising.
            By default, no transform is carried out

    Returns:
        float: Mean loss of this epoch
    """
    epoch_loss = []

    for x, _ in fastprogress.progress_bar(dataloader, parent=master_bar):
        optimizer.zero_grad()
        model.train()

        x = transform_common(x) if transform_common else x
        x_inp = transform_input(x) if transform_input else x

        # Forward pass
        x = x.to(device)
        x_inp = x_inp.to(device)
        x_hat, mu, logvar = model(x_inp)

        # Compute loss
        loss = loss_fn(x_hat, x, mu, logvar)

        # Backward pass
        loss.backward()
        optimizer.step()

        # For plotting the train loss, save it for each sample
        epoch_loss.append(loss.item())
        master_bar.child.comment = f"Train Loss: {epoch_loss[-1]:.3f}"

    # Return the mean loss and the accuracy of this epoch
    return np.mean(epoch_loss)


def validate(dataloader, model, loss_fn, device, master_bar,
             transform_common=None, transform_input=None):
    """Compute loss on validation set.

    Args:
        dataloader (DataLoader): Torch DataLoader object to load data
        model (nn.Module): Torch model to train
        loss_fn: Torch loss function
        device (torch.device): Torch device to use for training
        master_bar (fastprogress.master_bar): Will be iterated over to draw
            batches and show validation progress
        transform_common (function): Transform to apply to input and target
        transform_input (function): Transform to apply to the input for de-noising.
            By default, no transform is carried out

    Returns:
        float: Mean loss on validation set
    """
    epoch_loss = []

    model.eval()
    with torch.no_grad():
        for x, _ in fastprogress.progress_bar(dataloader, parent=master_bar):
            x = transform_common(x) if transform_common else x

            x_inp = transform_input(x) if transform_input else x

            # make a prediction on test set
            x = x.to(device)
            x_inp = x_inp.to(device)
            x_hat, mu, logvar = model(x_inp)

            # Compute loss
            loss = loss_fn(x_hat, x, mu, logvar)

            # For plotting the train loss, save it for each sample
            epoch_loss.append(loss.item())
            master_bar.child.comment = f"Valid. Loss: {epoch_loss[-1]:.3f}"

    # Return the mean loss, the accuracy and the confusion matrix
    return np.mean(epoch_loss)




def train_model(model, optimizer, loss_function, device, num_epochs,
                train_dataloader, valid_dataloader,
                transform_common=None, transform_input=None):
    """Run model training.

    Args:
        model (nn.Module): Torch model to train
        optimizer: Torch optimizer object
        loss_fn: Torch loss function for training
        device (torch.device): Torch device to use for training
        num_epochs (int): Max. number of epochs to train
        train_dataloader (DataLoader): Torch DataLoader object to load the
            training data
        valid_dataloader (DataLoader): Torch DataLoader object to load the
            test data
        transform_common (function): Transform to apply to input and target
        transform_input (function): Transform to apply to the input for de-noising.
            By default, no transform is carried out

    Returns:
        list, list: Return list of train losses, test losses.
    """
    master_bar = fastprogress.master_bar(range(num_epochs))
    epoch_list, train_losses, valid_losses = [], [], []

    master_bar.names = ["Train", "Valid."]

    for epoch in master_bar:
        # Train the model
        epoch_train_loss = train(train_dataloader, optimizer, model, loss_function, device, master_bar, transform_common, transform_input)
        # Validate the model
        epoch_valid_loss = validate(valid_dataloader, model, loss_function, device, master_bar, transform_common, transform_input)

        # Save loss and acc for plotting
        epoch_list.append(epoch + 1)
        train_losses.append(epoch_train_loss)
        valid_losses.append(epoch_valid_loss)

        graphs = [[epoch_list, train_losses], [epoch_list, valid_losses]]
        x_bounds = [1, num_epochs]

        master_bar.write(
            f"Epoch {epoch + 1}, "
            f"avg. train loss: {epoch_train_loss:.3f}, "
            f"avg. valid. loss: {epoch_valid_loss:.3f}"
        )
        master_bar.update_graph(graphs, x_bounds)


    return train_losses, valid_losses

The following will define our first autoencoder. It already has some capabilities for the later parts of the tutorial (mainly for the variational autoencoder), which we will not need as of yet...

The idea behind this autoencoder architecture is to first reduce image dimensions further with a couple of 2D-convolutional layers (for a good visualisation of what these do, check out [this link](https://github.com/vdumoulin/conv_arithmetic/blob/master/README.md) stolen from the pytorch convolution layer [documentation](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html)).

After the convolutions, the low-dimensional images are fed into a couple of linear layers to reduce the model dimensionality further to the latent space.

After this encoder part, the decoder should just revert the process in our case. For this, we start with the encoder's linear layers in reverse, before scaling up the images again in a number of 2D-convolutional transpose layers.

Some parts of the architecture are left out here on purpose so that you can play around with the convolutional layers yourself!

As a suggestion, you might want to implement an architecture similar to the following:
 - Start with a conv-layer reducing image dimensions to 32x32 with a 4x4
   convolutional kernel
   (To do this, use a stride of 2 and appropriate padding)
 - Next, further reduce the image size to 16x16 (again using stride, padding
   and a 4x4 kernel)
 - Use a third convolutional layer to get the image dimension down to 8x8
 - Feel free to use any number of kernels per layer you like. For this, the
   `num_filters` hyperparameter is available.
 - Don't forget to introduce non-linear activation layers in between the
   convolutions!
 - Remember to 'mirror' the encoder part for the decoder model!

In [None]:
from torch import nn

class Autoencoder(nn.Module):

    def __init__(self, image_size=64, num_channels=1, latent_dims=128, num_filters=64, do_sampling=False):
        super(Autoencoder, self).__init__()

        self.latent_dims  = latent_dims
        self.image_size   = image_size
        self.num_channels = num_channels
        self.num_filters  = num_filters
        self.do_sampling  = do_sampling

        # Encoder
        self.conv_encoder = nn.Sequential(
            # TODO: Build the convolutional layers (torch.nn.Conv2d) here
        )

        # Linear Encoder
        # TODO: Match the dimensionality of the first and last layer here!
        self.fc_lin_down = nn.Linear(<MATCH_DIM>, 8 * self.num_filters)
        self.fc_mu       = nn.Linear(8 * self.num_filters, self.latent_dims)
        self.fc_logvar   = nn.Linear(8 * self.num_filters, self.latent_dims)
        self.fc_z        = nn.Linear(self.latent_dims, 8 * self.num_filters)
        self.fc_lin_up   = nn.Linear(8 * self.num_filters, <MATCH_DIM>)

        # Decoder
        self.conv_decoder = nn.Sequential(
            # TODO: Implement the reverse of the encoder here using torch.nn.ConvTranspose2d layers
            # The last activation here should be a sigmoid to keep the pixel values clipped in [0, 1)
            nn.Sigmoid(),
        )

    def encode(self, x):
        ''' Encoder: output is (mean, log(variance))'''
        x       = self.conv_encoder(x)
        # Here, we resize the convolutional output appropriately for a linear layer
        # TODO: Fill in the correct dimensionality for the reordering
        x       = x.view(-1, self.num_filters * 8 * 8)
        x       = self.fc_lin_down(x)
        x       = nn.functional.relu(x)
        mu      = self.fc_mu(x)
        logvar  = self.fc_logvar(x)
        return mu, logvar

    def sample(self, mu, logvar):
        ''' Sample from Gaussian with mean `mu` and SD `sqrt(exp(logvarz))`'''
        # Only use the full mean/stddev procedure if we want to later do sampling
        # And only reparametrise if we are in training mode
        if self.training and self.do_sampling:
            std = torch.exp(logvar * 0.5)
            eps = torch.randn_like(std)
            sample = mu + (eps * std)
            return sample
        else:
            return mu

    def decode(self, z):
        '''Decoder: produces reconstruction from sample of latent z'''
        z = self.fc_z(z)
        z = nn.functional.relu(z)
        z = self.fc_lin_up(z)
        z = nn.functional.relu(z)
        # TODO: Fill in the correct dimensionality for the reordering here again
        z = z.view(-1, self.num_filters, 8, 8)
        z = self.conv_decoder(z)
        return z

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.sample(mu, logvar)
        x_hat = self.decode(z)
        if self.do_sampling:
            return x_hat, mu, logvar
        else:
            return x_hat, None, None

Now, let's define some loss for our autoencoder. Initially, we just want the model to map images onto themselves - remember, the power of this lies in the low latent dimensionality forcing the autoencoder to be smart about this mapping!

Therefore, the basic loss function here is just a minimum square loss.

For Part 3 of the tutorial, there is also the option to feed in some additional parameters from the model's latent space. For now, we do not need this, but we'll get back to implementing an additional Kullbar-Leibler divergence term for this later on!

In [None]:
def autoencoder_loss(recon_x, x, mu=None, logvar=None):
    mse_loss = torch.nn.functional.mse_loss(recon_x, x, reduction='sum') / x.size(dim=0)

    if mu is not None and logvar is not None:
        raise NotImplementedError("Looks like you still need to implement the KL divergence loss!")
    else:
        return mse_loss

In [None]:
model = Autoencoder()
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), learning_rate)

In [None]:
# Let's display our model here
model

After defining model and optimiser, we should now also prepare the Gaussian noise for our input images. This is done below. The training function above also has a special parameter to apply additional transforms (such as the noise) only for the model inputs.

In [None]:
# Some training parameters - feel free to modify them as you like!
learning_rate = 2e-4
num_epochs = 15
noise_level = 5e-2

def transform_noise(x):
    noise = noise_level * torch.randn_like(x)
    return torch.clamp(x + noise, 0.0, 1.0)

In [None]:
show_images(resize_transform(images), [f"Image #{i}" for i in range(1, 6)], num_rows=1, num_columns=5)
show_images(transform_noise(resize_transform(images)), [f"Image #{i} with Noise" for i in range(1, 6)], num_rows=1, num_columns=5)

Now that we know how the Gaussian noise looks like in our galaxy images, let's try to get rid of it again! Below this is the training of the autoencoder, which might take a while depending on your chosen model architecture.

If you want, make sure to use a GPU-accelerated session for speeding up the training!

In [None]:
train_model(model, optimizer, autoencoder_loss, device, num_epochs, train_loader, valid_loader, resize_transform, transform_noise)

In [None]:
test_images = resize_transform(next(iter(test_loader))[0][:5])

noisy_test_images = transform_noise(test_images)
model.eval()
with torch.no_grad():
    denoised_test_images, _, _ = model(noisy_test_images.to(device))

denoised_test_images = denoised_test_images.to('cpu')

noise_labels = [f"Noisy Image #{i}" for i in range(1, 6)]\
             + [f"De-noised Image #{i}" for i in range(1, 6)]\
             + [f"Original Image #{i}" for i in range(1, 6)]
noise_images = torch.cat([noisy_test_images, denoised_test_images, test_images])

show_images(noise_images, noise_labels, num_rows=3, num_columns=5)

In [None]:
test_images = resize_transform(next(iter(test_loader))[0][:5])

model.eval()
with torch.no_grad():
    denoised_test_images, _, _ = model(test_images.to(device))

denoised_test_images = denoised_test_images.to('cpu')

noise_labels = [f"De-noised Image #{i}" for i in range(1, 6)]\
             + [f"Original Image #{i}" for i in range(1, 6)]
noise_images = torch.cat([denoised_test_images, test_images])

show_images(noise_images, noise_labels, num_rows=2, num_columns=5)

Depending on your exact architecture, this procedure should hopefully have gotten rid of the Gaussian noise.

If you want, you can play around with the noise level and the architecture to see how this changes performance. What happens with more convolutional layers and a slower dimensionality reduction? How about lowering the noise level for training and cranking it up during the later inference?

# Part 2: Anomaly Detection

In this part, we will use our Autoencoder to find some anomalies in a separate dataset.

To do that, we'll implicitly use the feature encoding that our autoencoder just learned: It should now be fairly good at reproducing galaxies, while not knowing at all how to encoding other images. Therefore, the mean-squared error when running anomalous galaxy images through the autoencoder should hopefully be way higher than when using some more galaxies.

For the exercise, let's first download an unlabelled test dataset, in which some anomalies are hidden on purpose!

In [None]:
anomaly_url = 'https://cernbox.cern.ch/remote.php/dav/public-files/1enTE460igQLPiz/anomaly_test.h5'
anomaly_file = os.path.basename(anomaly_url)

In [None]:
!wget {anomaly_url}

Unlike the training dataset, the anomaly data does not have any labels, to make your life not too easy. Therefore, the h5-file only has an `images` dataset.

Before we can therefore use the anomaly data, we should create a separate `torch.Dataset` type without labels for it.

In [None]:
class Anomaly_Dataset(torch.utils.data.Dataset):
    def __init__(self, archive):
        self.archive = archive
        self._load()

    def _load(self):
        # TODO: Implement a loading procedure into self.images here

    def __getitem__(self, index):
        return self.images[index]

    def __len__(self):
        return len(self.images)

In [None]:
anomaly_dataset = Anomaly_Dataset(archive=anomaly_file)

anomaly_loader = torch.utils.data.DataLoader(anomaly_dataset, batch_size=128, shuffle=False)

Now, we can simply throw our model onto the data - For this, we will have to compute the autoencoder MSE loss for the test data.

In [None]:
anomaly_loss = torch.empty(0)

model.eval()
with torch.no_grad():
    for anomaly_images in anomaly_loader:
        anomaly_images = resize_transform(anomaly_images).to(device)
        output_images, _, _ = model(anomaly_images)

        anomaly_loss = torch.cat((
            anomaly_loss,
            torch.nn.functional.mse_loss(output_images, anomaly_images, reduction='none')
        ))

# Now figure out which images are most anomalous
# For this, you should sort the anomaly loss indices!
anomaly_indices = torch.argsort(anomaly_loss, descending=True)

In [None]:
anomaly_images = [anomaly_dataset[idx] for idx in anomaly_indices[:10]]
anomaly_labels = [f"Anomaly #{i}" for i in range(1, 11)]

# We'll forego the usual image transforms here...
show_images(anomaly_images, anomaly_labels, num_rows=2, num_columns=5)

Maybe we can do better by starting from the noise-less images or using an optimised model geometry? Let's train a second model on data without noise!

In [None]:
# You should be able to do this part very much on your own.
# Train a second autoencoder on non-noisy data and check how the predictions (and the anomaly detection) develop
# Feel free to change any parts you want to, from architecture to training procedure!

In the end, which anomalies did you find in the test dataset? What do they have in common?

# Part 3: Variational Autoencoder

Now, we want to use autoencoders for another task: Generating new galaxy images!

For this, we will train a Variational Autoencoder, a specific architecture in which we enforce a specific shape of the latent feature space with a reduced dimensionality. In our case, we will use a KL-divergence term in the model loss to enforce a standard Gaussian for these.

This will allow us to sample from such a distribution to generate new galaxy images with the decoder part of our VAE!

Additionally, we can also use the output of the encoder part to carry out dimensionality reduction - similar to principal component analysis.

(As an aside: You will encounter the same idea of translating the non-trivial distribution to a simpler distribution for drawing samples from again soon - It also forms the basis of normalising flows.)

As a first step, let's implement a term for the additional KL loss of our VAE.
For this, we have the original loss in the cell below again.

Since we would like our latent features to be distributed in a normal distribution, we'll try to enforce this with a Kullbar-Leibler divergence using such a distribution as our reference distribution.

Check out Appendix B of [this paper](https://arxiv.org/abs/1312.6114) proposing VAEs for how to calculate this loss term. Using it, you should be able to implement a KL divergence loss. Afterwards, we simply have to feed in the means and standard deviations of our latent variables in order to nudge them towards normal distributions.

In [None]:
def autoencoder_loss(recon_x, x, mu=None, logvar=None):
    mse_loss = torch.nn.functional.mse_loss(recon_x, x, reduction='sum') / x.size(dim=0)

    if mu is not None and logvar is not None:
        raise NotImplementedError("Looks like you still need to implement the KL divergence loss!")
        # TODO: Implement me and remove the error above!
        # You might also want to divide by the batch size to keep the loss independent of it!
        kld_loss = 0.0
        total_loss = (mse_loss + kld_loss)

        return total_loss
    else:
        return mse_loss

Great! Let's use this new loss to train a VAE and generate some samples!

Before we do so, you might want to have a look at the VAE architecture above: Instead of using linear layers to constrict the latent feature space, we are instead using two individual linear layers to generate mean and standard deviation proxies for the latent features.

In the decoding, we can then use these as a starting point so that switching to a true normal distribution for generating new data becomes straightforward!

In [None]:
learning_rate = 1e-3

vae_model = Autoencoder(latent_dims=64, do_sampling=True)
vae_model = vae_model.to(device)
vae_optimizer = torch.optim.Adam(vae_model.parameters(), learning_rate)

In [None]:
# Just a function to generate some new images later on using just the decoder part of the VAE
def generate(model, samples):
    with torch.no_grad():
        gen_imgs = model.decode(samples.to(device))
    return gen_imgs

In [None]:
# Now it's your turn again: Train the VAE and generate some images!

# For the second part of this, you can use values randomly sampled from a normal distribution with the dimensionality of the VAE's latent feature space
# Alternatively, you can also scan across some of the latent features in a grid-search pattern

# Also, make sure that you don't train on the noisy data anymore by removing the additional noise transform!

Another way to utilise the VAE is for dimensionality reduction - because of the bottleneck in the latent feature space, the features calculated by the encoder part of the model should hopefully be expressive. This means that they should also capture some differences between the different galaxy types.

Plot the correlations between some latent features for the different galaxy classes. Which galaxy types are easily distinguishable from each other? Which ones are not? Does this agree with your expectations?

In [None]:
# To help getting you started, a plotting function is already included in this cell
# In it, the label should simply be the galaxy type number again - but feel free to update the code for usability with the explicit names!

def plot_latent(feature1, feature2, labels)
    plt.scatter(feature1, feature2, c=labels, cmap='tab10')
    plt.colorbar()
    plt.show()