In [1]:
# The following lines enable automatic reloading of modules in an IPython/Jupyter environment.
# They work exactly like the commented lines below, but avoid errors when not running in such an environment.
# %load_ext autoreload
# %autoreload 2

try:
    # Only defined inside IPython/Jupyter
    get_ipython().run_line_magic("load_ext", "autoreload")
    get_ipython().run_line_magic("autoreload", "2")
except (NameError, AttributeError):
    # Not running in IPython → just ignore
    pass


In [2]:
# Initializing answer variable
answer = {}


In [3]:
# Some libs that we will use
import torch
import random
import numpy as np
import json_tricks
import lovely_tensors as lt
import matplotlib.pyplot as plt

# Making tensor printouts better
lt.monkey_patch()

# Adding sources to the pythonpath
import sys
root_path = '../../../..'
sys.path.append(root_path)

import dotenv
dotenv.load_dotenv(dotenv.find_dotenv(root_path + '/.env'))

# Importing sources of our project
import src


# Task 0: Prepare the environment

(Take care of reproducibility)

In [4]:
# YOUR CODE HERE


# Task 1: Prepare the data

We will use the same MNIST dataset, so just import it

In [5]:
MNIST_train = ...
MNIST_valid = ...
## YOUR CODE HERE


Check that the data is prepared:

In [6]:
with src.utils.safecatch():
    train_sample = MNIST_train[0]
    valid_sample = MNIST_valid[0]

    X_train = train_sample['image']
    X_valid = valid_sample['image']
    y_train = train_sample['label']
    y_valid = valid_sample['label']


In [7]:
with src.utils.safecatch():
    ## This checks are for dataset verification
    answer['X_train.dtype'] = str(X_train.dtype)
    answer['y_train.dtype'] = str(y_train.dtype)
    answer['X_valid.dtype'] = str(X_valid.dtype)
    answer['y_valid.dtype'] = str(y_valid.dtype)
    answer['X_train.shape'] = X_train.shape
    answer['X_valid.shape'] = X_valid.shape
    answer['y_train.shape'] = y_train.shape
    answer['y_valid.shape'] = y_valid.shape
    answer['X_train.mean'] = float(X_train.mean())
    answer['y_train.mean'] = float(y_train.float().mean())
    answer['X_valid.mean'] = float(X_valid.mean())
    answer['y_valid.mean'] = float(y_valid.float().mean())

    print(X_train.dtype, X_valid.dtype, y_train.dtype, y_valid.dtype)
    print(X_train.shape, X_valid.shape, y_train.shape, y_valid.shape)

    print(X_train)
    print(X_valid)
    print(y_train)
    print(y_valid)


In [8]:
with src.utils.safecatch():
    plt.imshow(X_train)
    plt.show()
    print(y_train)


# Task 2: Build the code for the Autoencoder

This time we will build an autoencoder network that contains two parts:
- Encoder
- Decoder

Your task will be to build a model that contains two of these modules (each Fully-Connected Neural Network) having in mind that volume of Encoder should be exactly equal to the volume of the decoder.

Both encoder and decoder should have the same structure:
```bash
Linear
Activation
Linear     x N
Activation x N
Linear
```

The code should be done in `src/models/feedforward/autoencoders.py`

Number of linear and activation layers should be taken from the input list of channels.

For instance, if the list with channels is `[28*28, 128, 64]`, the encoder should look like:
```
28 * 28 -> Linear -> Activation -> 128 -> Linear -> Activation -> 64 -> Linear -> 64
```

The decoder should look like:
```
28 * 28 <- Linear <- 128 <- Activation <- Linear <- 64 <- Activation <- Linear <- 64
```

In this case the image is compressed into a vector of size 64

In [9]:
with src.utils.safecatch():
    ## Load your Autoencoder model
    ae_model = src.models.feedforward.autoencoder.Autoencoder([28 * 28, 128, 64])

    # Init the model for checking
    src.utils.deterministic_init(ae_model)


    check_input = {'image': torch.randn(10, 28 * 28)}
    with torch.no_grad():
        check_output = ae_model(check_input['image'])

        answer['check_result'] = src.utils.detach_copy(check_output)


# Task 3: Build Variational Autoencoder

The main difference between standard AutoEncoder and Variational AutoEncoder is that in Variational Autoencoder there is a reparametrization block (we will call it `sampler` in our network)

The idea is that this block computes $\mu$ and $\sigma$ from the input signal, and then samples the result from $\mathcal N (\mu, \sigma)$

Your task is to code this block.

Use linear layers to predict a tensor of $\mu$ and a tensor used to calculate $\sigma$ (without activation): $\mathbf z_\mu$, $\mathbf z_{\sigma}$
- `mu_regressor`
- `logvar_regressor`

After that, calculate:
- $\mu = \mathbf z_{\mu}$
- $\sigma = \exp(\frac{1}{2} \mathbf z_{\sigma})$

In the end, you should calculate the result as
- In case of training mode:
    $\mathbf z_{out} = \mu + \mathbf \epsilon * \sigma,$ where $\epsilon \propto \mathcal N (0, 1)$
- In case of evaluation mode:
    $\mathbf z_{out} = \mu$

Note that all the tensors, $\mu$, $\sigma$ and $\epsilon$ should have the same shape as the input tensor.

Add the abovecreated block in the middle of your Autoencoder to get VAE.

Note that VAE should return all of means and standard deviations together with result so that we are able to use them in the loss function.

In [10]:
with src.utils.safecatch():
    ## Load your VAE model
    vae_model = src.models.feedforward.autoencoder.VAE([28 * 28, 128, 64])

    # Init the model for checking
    # src.utils.deterministic_init(vae_model)


    check_input = {'image': torch.randn(10, 28 * 28)}
    with torch.no_grad():
        check_output = vae_model(check_input['image'])

        answer['check_result'] = src.utils.detach_copy(check_output)


# Task 3: Create loss function and optimizer



In this case we will use Adam with standard parameters as optimizer (`lr=1.0e-4` and weight decay `1.0e-4`)

As a loss function we will use MAE loss:

$L(\mathbf x^*, \mathbf x) = \frac{1}{S}\sum_{s=1}^S |x^*_s - x^s|,$

where index $s$ runs through all the pixels of all images (predicted and input)

Also create yet another loss function for regularization of the Variational AutoEncoder (VAE)

It works like this:

$L_{reg} = \frac{1}{2S}\sum_{s=1}^{S} (\mu_s^2 + \sigma_s^2 - \log \sigma_s^2 - 1)$

(this value is KL distance beteween normal distribution $\mathcal N (0, 1)$ and $\mathcal N (\mu_s, \sigma_s)$). What we are doing is we are training the model so that it minimizes the loss above, but does not deviate too much from univariate normal distribution.

The total loss function should be $L_{total} = L + \lambda L_{reg}$

Where $\lambda$ is regularization coefficient `reg_coeff`

In [11]:
def loss(pred, targ):
    val = 0
    ## YOUR CODE HERE
    return val

def loss_reg(means, stds):
    val = 0
    ## YOUR CODE HERE
    return val

epochs = 10

optimizer_ae = ...
optimizer_vae = ...

## YOUR CODE HERE


In [12]:
with src.utils.safecatch():
    # Checking loss
    answer['loss_val'] = loss(torch.randn(5, 28, 28), torch.randn(5, 28, 28))
    answer['loss_reg_val'] = loss_reg(torch.randn(5, 28, 28), torch.randn(5, 28, 28).exp())

    # Adding optimizer to the checker
    answer['optimizer_ae'] = str(optimizer_ae)
    answer['optimizer_vae'] = str(optimizer_vae)
    print(optimizer_vae)


# Task 4: Create DataLoaders

In [13]:
from tqdm import trange, tqdm
import neptune
import os

batch_size = 32
n_epochs = 10

train_dl = ...
valid_dl = ...

## YOUR CODE HERE


# Task 5: Create training loop

The task is the same:
- load samples batch-by-batch
- send the smaples through the model
- calculate loss function
- backpropagate the gradient
- step the optimizer
- perform validation to check overfitting

Create training cycles for AE and VAE

They share a lot of code and actually are different only in loss and returned values

You can try to join both of the training cycles, but not that it will require some code organization.

In [None]:
def train_model_ae(model, n_epochs, train_dl, valid_dl, loss, optimizer):
    train_loss_history = []
    valid_loss_history = []

    ## YOU SHOULD ADD THE nepune.ai TOKEN BEFORE RUNNING THE TRAINING
    run = neptune.init_run(
        ## YOUR CODE HERE
    )

    for epoch in trange(n_epochs):
        train_loss = {'enumerator': 0.0, 'denominator': 1.0e-8}
        valid_loss = {'enumerator': 0.0, 'denominator': 1.0e-8}

        for batch in train_dl:
            ...
            ## YOUR TRAINING CODE HERE

        with torch.no_grad():
            for valid_batch in valid_dl:
                ...
                ## YOUR EVALUATION CODE HERE

            finalized_train_loss = train_loss['enumerator'] / train_loss['denominator']
            finalized_valid_loss = valid_loss['enumerator'] / valid_loss['denominator']

            # Logging the progress to neptune

            train_loss_history.append(finalized_train_loss)
            valid_loss_history.append(finalized_valid_loss)

    run.stop()
    return train_loss_history, valid_loss_history

with src.utils.safecatch():
    train_loss_history_ae, valid_loss_history_ae = train_model_ae(ae_model, n_epochs, train_dl, valid_dl, loss, optimizer_ae)


In [16]:
def train_model_vae(model, n_epochs, train_dl, valid_dl, loss, optimizer, reg_coeff=0.1):
    train_loss_history = []
    valid_loss_history = []

    ## YOU SHOULD ADD THE nepune.ai TOKEN BEFORE RUNNING THE TRAINING
    run = neptune.init_run(
        ## YOUR CODE HERE
    )

    for epoch in trange(n_epochs):
        train_loss = {'enumerator': 0.0, 'denominator': 1.0e-8}
        valid_loss = {'enumerator': 0.0, 'denominator': 1.0e-8}

        for batch in train_dl:
            ...
            ## YOUR TRAINING CODE HERE

        with torch.no_grad():
            for valid_batch in valid_dl:
                ...
                ## YOUR EVALUATION CODE HERE

            finalized_train_loss = train_loss['enumerator'] / train_loss['denominator']
            finalized_valid_loss = valid_loss['enumerator'] / valid_loss['denominator']

            # Logging the progress to neptune

            train_loss_history.append(finalized_train_loss)
            valid_loss_history.append(finalized_valid_loss)

    run.stop()
    return train_loss_history, valid_loss_history

with src.utils.safecatch():
    train_loss_history_vae, valid_loss_history_vae = train_model_vae(vae_model, n_epochs, train_dl, valid_dl, loss, optimizer_vae)


In [None]:
with src.utils.safecatch():
    answer['train_loss_history_ae'] = train_loss_history_ae
    answer['valid_loss_history_ae'] = valid_loss_history_ae

    answer['train_loss_history_vae'] = train_loss_history_vae
    answer['valid_loss_history_vae'] = valid_loss_history_vae

json_tricks.dump(answer, '.answer.json')


# Task 7. Experiment time!

The code below is commented -- slowly uncomment it trying to understand, what is happenning

Create a new autoencoder model and train it for 150 epochs (or more if you like)

In [None]:
# n_epochs = 150
# model_ae = src.models.feedforward.autoencoder.Autoencoder([28 * 28, 128, 64])
# optimizer_ae = torch.optim.Adam(model_ae.parameters(), lr=1.0e-3, weight_decay=1.0e-8)

# train_loss_history, valid_loss_history = train_model_ae(model_ae, n_epochs, train_dl, valid_dl, loss, optimizer_ae)


After you're done, you can elaborate on the experiments. The taks is to make the reconstruction as good as possible meanwhile forcing the network to generalize between the concepts. That means that if you have a linear interpolation between two embeddings, you should have a continuous sequence of something that at least distantly resembles a digit.

The last will be extremely hard to achieve with simple AE.

In [None]:
# with torch.no_grad():
#     img1 = MNIST_valid[0]['image']
#     img2 = MNIST_valid[1]['image']

#     plt.figure()
#     plt.imshow(img1.reshape([28, 28]))
#     plt.figure()
#     plt.imshow(img2.reshape([28, 28]))

#     emb1 = model_ae.encoder(img1.reshape([1, -1]))
#     emb2 = model_ae.encoder(img2.reshape([1, -1]))

#     rec1 = model_ae.decoder(emb1)
#     rec2 = model_ae.decoder(emb2)

#     plt.figure()
#     plt.imshow(rec1.reshape([28, 28]))
#     plt.figure()
#     plt.imshow(rec2.reshape([28, 28]))


In [None]:
# figures = []

# with torch.no_grad():
#     for a in np.linspace(0, 1, 10):
#         rec = model_ae.decoder(a * emb1 + (1 - a) * emb2)
#         figures.append(rewhat c.reshape([28, 28]))

# plt.figure(figsize=(10, 10))
# plt.imshow(np.concatenate(figures, axis=1))


As you can see, the standard AE does not mix the entities, it mixes the images.

Now try to achieve the same task with VAE. Start with regularization $10^{-1}$ and then try to find the configuration, where your network is mixing the concepts the best. Obviously, if you make regularization term equal exactly to $0$, you will have exact Autoencoder behavior. 

In [None]:
# n_epochs = 50
# model_vae = src.models.feedforward.autoencoder.VAE([28 * 28, 128, 64])
# optimizer_vae = torch.optim.Adam(model_vae.parameters(), lr=1.0e-3, weight_decay=1.0e-8)

# train_loss_history, valid_loss_history = train_model_vae(model_vae, n_epochs, train_dl, valid_dl, loss, optimizer_vae)


In [None]:
# with torch.no_grad():
#     img1 = MNIST_valid[0]['image']
#     img2 = MNIST_valid[1]['image']

#     plt.figure()
#     plt.imshow(img1.reshape([28, 28]))
#     plt.figure()
#     plt.imshow(img2.reshape([28, 28]))

#     emb1 = model_vae.encoder(img1.reshape([1, -1]))
#     emb2 = model_vae.encoder(img2.reshape([1, -1]))

#     emb1, _, _ = model_vae.sampler(emb1)
#     emb2, _, _ = model_vae.sampler(emb2)

#     rec1 = model_vae.decoder(emb1)
#     rec2 = model_vae.decoder(emb2)

#     plt.figure()
#     plt.imshow(rec1.reshape([28, 28]))
#     plt.figure()
#     plt.imshow(rec2.reshape([28, 28]))


In [None]:
# figures = []

# with torch.no_grad():
#     for a in np.linspace(0, 1, 10):
#         rec = model_vae.decoder(a * emb1 + (1 - a) * emb2)
#         figures.append(rec.reshape([28, 28]))

# plt.figure(figsize=(10, 10))
# plt.imshow(np.concatenate(figures, axis=1))


As we can see, after inclusion of variation into the autoencoder at least tries to mix the concepts of different patterns