In [None]:
import os
import pickle
from collections import defaultdict
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
from torchvision.utils import make_grid
from torch.distributions.uniform import Uniform
from torch.distributions.normal import Normal

These functions are helpers that will train your models and visualize the results. You do not have to change them.

In [None]:
# Mount Google Drive at /content/drive so that the CIFAR10 pickle used in Task 2
# can be read from it. The google.colab module is only importable inside a
# Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def train_epoch(model, train_loader, optimizer, epoch, loss_key='total'):
    """Run one optimisation pass over `train_loader`.

    Args:
        model: object exposing `train()` and `loss(x)`; `loss` must return a
            dict mapping loss names to scalar tensors.
        train_loader: iterable of batches; each batch is moved with `.cuda()`.
        optimizer: object exposing `zero_grad()` and `step()`.
        epoch: index of the current epoch (accepted for the caller, not used here).
        loss_key: name of the dict entry that is back-propagated.

    Returns:
        defaultdict(list): for every loss name, the per-batch values (Python floats)
        in iteration order.
    """
    history = defaultdict(list)
    model.train()
    for batch in train_loader:
        batch_losses = model.loss(batch.cuda())

        # Standard update: clear stale gradients, backprop the selected loss, step.
        optimizer.zero_grad()
        batch_losses[loss_key].backward()
        optimizer.step()

        # Record every reported loss, not only the optimised one.
        for name in batch_losses:
            history[name].append(batch_losses[name].item())
    return history


def eval_model(model, data_loader):
    """Compute dataset-averaged losses without tracking gradients.

    Each batch loss is weighted by the batch size (`x.shape[0]`) and the sums are
    divided by `len(data_loader.dataset)`, so a smaller last batch is weighted
    correctly.

    Args:
        model: object exposing `eval()` and `loss(x)` returning a dict of scalar tensors.
        data_loader: iterable of batches with a `dataset` attribute supporting `len()`;
            each batch is moved with `.cuda()`.

    Returns:
        defaultdict(float): loss name -> average value over the dataset.
    """
    model.eval()
    totals = defaultdict(float)
    with torch.no_grad():
        for batch in data_loader:
            batch = batch.cuda()
            n_items = batch.shape[0]
            for name, value in model.loss(batch).items():
                totals[name] += value.item() * n_items

        # Turn the weighted sums into per-sample averages.
        for name in totals.keys():
            totals[name] = totals[name] / len(data_loader.dataset)
    return totals


def train_model(model, train_loader, test_loader, epochs, lr, loss_key='total'):
    """Train `model` with Adam for `epochs` epochs, evaluating after each one.

    Args:
        model: module exposing `parameters()`, `train()`, `eval()` and `loss(x)`.
        train_loader: batches used for optimisation (see `train_epoch`).
        test_loader: batches used for evaluation (see `eval_model`).
        epochs: number of passes over `train_loader`.
        lr: Adam learning rate.
        loss_key: name of the loss entry that is optimised.

    Returns:
        (train_losses, test_losses): two plain dicts keyed by loss name. The train
        dict holds one value per batch over all epochs, the test dict one value
        per epoch.
    """
    optimizer = optim.Adam(model.parameters(), lr=lr)
    train_history, test_history = defaultdict(list), defaultdict(list)

    for epoch_idx in range(epochs):
        model.train()
        epoch_train = train_epoch(model, train_loader, optimizer, epoch_idx, loss_key)
        epoch_test = eval_model(model, test_loader)

        # Keys are driven by the training stats; the test stats are looked up by the same names.
        for name, per_batch in epoch_train.items():
            train_history[name].extend(per_batch)
            test_history[name].append(epoch_test[name])
    return dict(train_history), dict(test_history)


def show_2d_latents(latents, labels=None, title='Latent Space'):
    """Scatter-plot the first two columns of `latents`.

    Args:
        latents: 2-D array-like indexed as `latents[:, 0]` / `latents[:, 1]`.
        labels: optional per-point colour values; every point is green when omitted.
        title: figure title.
    """
    point_colors = 'green' if labels is None else labels

    plt.figure()
    plt.title(title)
    plt.scatter(latents[:, 0], latents[:, 1], s=1, c=point_colors)
    plt.xlabel('z1')
    plt.ylabel('z2')

    plt.show()

def show_2d_densities(densities, title='Densities',
                      x_lim=(-1.5, 2.5), y_lim=(-1, 1.5), dx=0.025, dy=0.025):
    """Draw density values evaluated on a regular 2D grid as a colour map.

    The grid is rebuilt here with `np.mgrid`; `densities` must hold exactly one
    value per grid node, flattened row by row (y is the slow axis, x the fast
    one), i.e. in the order produced by stacking the `np.mgrid` outputs and
    reshaping to `(-1, 2)`.

    Args:
        densities: array with `len(y_range) * len(x_range)` values.
        title: figure title.
        x_lim: (min, max) of the horizontal axis; max is included in the grid.
        y_lim: (min, max) of the vertical axis; max is included in the grid.
        dx: grid step along x.
        dy: grid step along y.

    The defaults reproduce the grid that was previously hard-coded.
    """
    y, x = np.mgrid[slice(y_lim[0], y_lim[1] + dy, dy),
                    slice(x_lim[0], x_lim[1] + dx, dx)]

    plt.figure()
    plt.title(title)
    # Drawn once: the earlier version issued this identical pcolor call twice,
    # doubling the rendering cost without changing the picture.
    plt.pcolor(x, y, densities.reshape(y.shape))
    # The grid lives in data space (the model density p(x) is evaluated on it),
    # so the axes are data coordinates rather than latent ones.
    plt.xlabel('x1')
    plt.ylabel('x2')
    plt.show()


def plot_training_curves(train_losses, test_losses):
    """Plot per-batch training losses and per-epoch test losses on one epoch axis.

    Args:
        train_losses: dict loss name -> list of per-batch values over all epochs.
        test_losses: dict loss name -> list of per-epoch values.

    Raises:
        ValueError: if either dict is empty (there is nothing to plot).

    The training points are spread evenly over `[0, n_epochs - 1]` so both kinds
    of curve share the same x axis.
    """
    if not train_losses or not test_losses:
        raise ValueError('train_losses and test_losses must both be non-empty')

    # Each dict supplies its own series length; the test dict is no longer
    # indexed with a key borrowed from the train dict.
    n_train = len(next(iter(train_losses.values())))
    n_test = len(next(iter(test_losses.values())))
    x_train = np.linspace(0, n_test - 1, n_train)
    x_test = np.arange(n_test)

    plt.figure()
    for key, value in train_losses.items():
        plt.plot(x_train, value, label=key + '_train')

    for key, value in test_losses.items():
        plt.plot(x_test, value, label=key + '_test')

    plt.legend()
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()

def load_pickle(path):
    """Load a pickled dict and return its 'train' and 'test' entries.

    Args:
        path: location of a pickle file containing a mapping with the keys
            'train' and 'test'.

    Returns:
        (train_data, test_data) exactly as stored in the file.

    Note: unpickling executes code embedded in the file, so only load files
    from a trusted source.
    """
    with open(path, 'rb') as handle:
        payload = pickle.load(handle)
    return payload['train'], payload['test']


def show_samples(samples, title, nrow=10):
    """Display a batch of images as a single grid figure.

    Args:
        samples: array-like of images indexed (N, H, W, C) with values on a
            0..255 scale; it is divided by 255 and rearranged to (N, C, H, W)
            for `make_grid`.
        title: figure title.
        nrow: forwarded to `make_grid` as its `nrow` argument.
    """
    images_nchw = torch.FloatTensor(samples).div(255).permute(0, 3, 1, 2)
    grid = make_grid(images_nchw, nrow=nrow)

    plt.figure()
    plt.title(title)
    # imshow wants channels last, so move the channel axis back.
    plt.imshow(grid.permute(1, 2, 0))
    plt.axis('off')
    plt.show()


def visualize_data(data, title):
    """Show up to 100 randomly chosen images from `data` in a grid.

    Args:
        data: array of images supporting `len()` and integer-array indexing,
            in the (N, H, W, C) layout expected by `show_samples`.
        title: figure title.

    The images are drawn without replacement from the `data` argument itself
    (the earlier version indexed the global `train_data` instead, so the
    argument only influenced the index range). When fewer than 100 images are
    available, all of them are shown.
    """
    n_shown = min(100, len(data))
    idxs = np.random.choice(len(data), replace=False, size=(n_shown,))
    images = data[idxs]
    show_samples(images, title)

# Task 1: Autoregressive flows on 2d data

In this task you will train an autoregressive flow model on 2D data. Let's generate and visualize the train and test data.

In [None]:
def generate_moons_data(count):
    """Sample the two-moons dataset and split it 80/20 into train and test parts.

    Args:
        count: total number of points requested from `make_moons` (noise 0.1).

    Returns:
        (train_data, train_labels, test_data, test_labels); the points are
        float32, the labels are returned as produced by `make_moons`.
    """
    points, classes = make_moons(n_samples=count, noise=0.1)
    points = points.astype('float32')

    # First 80% of the rows form the train part, the rest the test part.
    n_train = int(0.8 * count)
    return points[:n_train], classes[:n_train], points[n_train:], classes[n_train:]


def visualize_2d_data(train_data, test_data, train_labels=None, test_labels=None):
    """Scatter-plot the train and test point clouds side by side.

    Args:
        train_data, test_data: 2-D arrays indexed as `[:, 0]` / `[:, 1]`.
        train_labels, test_labels: optional per-point colour values passed to
            `scatter` as `c`.
    """
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    panels = (
        ('train', train_data, train_labels),
        ('test', test_data, test_labels),
    )
    for ax, (name, points, colors) in zip(axes, panels):
        ax.set_title(name, fontsize=16)
        ax.scatter(points[:, 0], points[:, 1], s=1, c=colors)
        ax.tick_params(labelsize=16)
    plt.show()

In [None]:
# Total number of two-moons points to generate; generate_moons_data splits them
# 80% / 20% into train and test parts.
COUNT = 5000

train_data, train_labels, test_data, test_labels = generate_moons_data(COUNT)
visualize_2d_data(train_data, test_data, train_labels, test_labels)

Let's define our model. We will use an autoregressive flow model.

You have to maximize log-likelihood (or equivalently minimize negative log-likelihood):
$$
    \log p(\mathbf{x} | \boldsymbol{\theta}) = \log p(\mathbf{z}) + \log \left| \det \frac{d \mathbf{z}}{d \mathbf{x}} \right|.
$$

Note: this is equivalent to the minimization of forward KL.

The base distribution will be the 2D uniform distribution on the unit square: $\mathbf{z} \sim p(\mathbf{z}) = U[0, 1]^2$ (here $\mathbf{z} = (z_1, z_2)$).

Since we will use an autoregressive model, the Jacobian will be triangular.

As we discussed in the lecture, applying the cumulative distribution function (CDF) of a continuous random variable to that variable gives a value that is uniformly distributed on $[0, 1]$. Thus, we will use the CDF to map our data to the latent space. We assume that our data comes from a mixture of Gaussians, which is defined by:
* $\mathbf{w}$ - weights of mixture components,
* $\boldsymbol{\mu}$ - locations (means) of each gaussian, 
* $\boldsymbol{\sigma}$ - standard deviations of each gaussian.

The mapping function is the following:
$$
    z_i = F(x_i, \mathbf{w}(x_{1:i-1}), \boldsymbol{\mu}(x_{1:i-1}), \boldsymbol{\sigma}(x_{1:i-1})).
$$
Here function $F$ is just CDF of gaussian mixture:
$$
    F(x, \mathbf{w}, \boldsymbol{\mu}, \boldsymbol{\sigma}) = \int_{-\infty}^x f(t) dt, \quad \text{where} \quad f(t) = \sum_{k=1}^K w_k \mathcal{N}(t | \mu_k, \sigma^2_k)
$$

For 2d case you will get

\begin{align*}
    z_1 &= F(x_1, \mathbf{w}, \boldsymbol{\mu}, \boldsymbol{\sigma}) \\
    z_2 &= F(x_2, \mathbf{w}(x_1), \boldsymbol{\mu}(x_1), \boldsymbol{\sigma}(x_1)).
\end{align*}

In [None]:
# One-dimensional flow z = F(x), where F is the CDF of a Gaussian mixture whose
# weights, means and log standard deviations are trainable parameters.
# Exercise skeleton: the right-hand sides below the "your code" markers are
# intentionally blank, so this cell does not run until they are filled in.
class MixtureCDFFlow(nn.Module):
    def __init__(
        self, 
        n_components=4
    ):
        super().__init__()
        # this is a base distribution
        self.base_dist = Uniform(0.0, 1.0)
        # distribution class of a single mixture component
        # (the class itself is stored, not an instance)
        self.mixture_dist = Normal
        self.n_components = n_components

        # ====
        # your code
        # define mixture parameters (location, log_scale, weights_logits)
        # location - means of the gaussian mixture components
        # log_scale - logarithms of the standard deviations of the components
        # (since std should be positive we store the logarithm and exponentiate it on use)
        # weights_logits - logits (before softmax) of the gaussian mixture weights
        # these parameters are trainable
        self.loc = 
        self.log_scale = 
        self.weight_logits = 
        # ====

    def forward(self, x):
        # x must be a 1-D tensor
        assert len(x.shape) == 1
        # ====
        # your code
        # to get weights of each component apply softmax to weight_logits
        weights = 
        # ====

        # ====
        # your code 
        # 1) find CDF value for each component 
        # use .cdf() method of self.mixture_dist with self.loc and self.log_scale
        # (do not forget that scale is logarithmic, we need to exponentiate)
        # 2) multiply the cdf of each gaussian by the mixture weights 
        # 3) sum these values across components
        z = 
        # ====

        # ====
        # your code 
        # calculate the logarithm of dz/dx, i.e. the log of the mixture density
        # 1) use .log_prob() method of self.mixture_dist
        # 2) exponentiate the results
        # 3) multiply by weights
        # 4) sum across components
        # 5) take the logarithm
        log_det = 
        # ====

        return z, log_det


class FullyConnectedMLP(nn.Module):
    """Multi-layer perceptron: hidden layers followed by a linear output layer.

    Args:
        input_shape: number of input features (int).
        hiddens: list with the width of every hidden layer; may be empty, in
            which case the network is a single linear map.
        output_shape: number of output features (int).

    The construction of the hidden layers is left as an exercise (see the
    "your code" marker); until it is filled in, `net` contains only the
    output layer.
    """

    def __init__(self, input_shape, hiddens, output_shape):
        assert isinstance(hiddens, list)
        super().__init__()
        self.input_shape = (input_shape,)
        self.output_shape = (output_shape,)
        self.hiddens = hiddens

        model = []
        prev_h = input_shape
        for h in hiddens:
            # ====
            # your code
            # just add linear layer followed by relu
            # ====
            prev_h = h
        # prev_h is the width of the last layer built so far (the input width
        # when there are no hidden layers). Using it instead of hiddens[-1]
        # avoids an IndexError for hiddens == [].
        model.append(nn.Linear(prev_h, output_shape))
        self.net = nn.Sequential(*model)

    def forward(self, x):
        """Flatten each sample to a vector, apply the network, reshape to (batch, output_shape)."""
        batch_size = x.shape[0]
        x = x.view(batch_size, -1)
        return self.net(x).view(batch_size, *self.output_shape)


# Two-dimensional autoregressive flow: x1 goes through an unconditional
# MixtureCDFFlow, x2 through a mixture CDF whose parameters are predicted from
# x1 by an MLP.
# Exercise skeleton: the right-hand sides below the "your code" markers are
# intentionally blank, so this cell does not run until they are filled in.
# NOTE(review): mlp_hiddens uses a mutable list as default argument; it is not
# mutated in the visible code, but a tuple or None default would be safer.
# NOTE(review): .cuda() is hard-coded in __init__, so the class cannot be
# instantiated on a CPU-only machine.
class AutoregressiveFlow(nn.Module):
    def __init__(
        self, 
        mix_comp_z1=5,  
        mix_comp_z2=5,
        mlp_hiddens=[64, 64, 64]
    ):
        super().__init__()
        # this is a base distribution
        self.base_dist = Uniform(torch.tensor(0.0).cuda(), torch.tensor(1.0).cuda())
        # distribution class of a single mixture component
        self.mixture_dist = Normal
        self.mix_comp_z2 = mix_comp_z2

        # ====
        # your code
        # define mixture cdf flow defined above for z_1
        self.dim1_flow = 
        # ====

        # ====
        # your code
        # define mlp with mlp_hiddens hidden units
        # mlp will output 3 sets of parameters (weights_logits, loc, log_scale)
        self.mlp = 
        # =====

    def forward(self, x):
        # split the two columns of x; chunk keeps dim 1, so x1 and x2 each have
        # a trailing dimension of size 1 (assuming x is (batch, 2) -- TODO confirm),
        # while MixtureCDFFlow.forward asserts a 1-D input
        x1, x2 = torch.chunk(x, 2, dim=1)

        # ====
        # your code
        # apply mixture cdf flow to the x1
        z1, log_det1 = 
        # ====
        
        # ====
        # your code
        # apply mlp to x1, you will get parameters of second mixture components
        loc, log_scale, weight_logits = 
        # ====

        # ====
        # your code
        # apply softmax to weight_logits to get mixture weights
        weights = 
        # ====

        # ====
        # your code
        # 1) find CDF value for z_2 
        # (use .cdf() method of self.mixture_dist with loc and log_scale predicted by the mlp)
        # 2) multiply the cdf of each gaussian by the mixture weights 
        # 3) sum these values across components
        z2 = 
        # ====

        # ====
        # your code
        # 1) use .log_prob() method of self.mixture_dist
        # 2) exponentiate
        # 3) multiply by weights 
        # 4) sum across components
        # 5) take logarithm
        log_det2 = 
        # ====

        # concatenate the results and return
        # (unsqueeze(1) adds the column axis, so z1, z2, log_det1 and log_det2
        # are expected to be 1-D here)
        z = torch.cat([z1.unsqueeze(1), z2.unsqueeze(1)], dim=1)
        log_det = torch.cat([log_det1.unsqueeze(1), log_det2.unsqueeze(1)], dim=1)
        return z, log_det

    def log_prob(self, x):
        # ====
        # your code
        # apply the model to get z and log_det
        z, log_det = 
        # ====

        # ====
        # your code
        # return the log-likelihood
        # you have to sum log of base distr + log of determinant
        # but think about base distribution (it is uniform), what do we get for log of uniform density?
        return 
        # ====

    def loss(self, x):
        # loss is the negative mean log prob, returned under the key 'total'
        # (the default loss_key of train_epoch / train_model)
        return {'total': -self.log_prob(x).mean()}

In [None]:
# ====
# your code
# choose these parameters
BATCH_SIZE = 
EPOCHS = 
LR = 
# ====
# number of gaussian mixture components used for z1 and for z2
MIX_COMP_Z1 = 3
MIX_COMP_Z2 = 3
# hidden layer widths of the MLP that predicts the z2 mixture parameters
MLP_HIDDENS = [64, 64, 64]

# NOTE(review): the test loader is shuffled too; eval_model averages over the
# whole dataset, so shuffling should not change the reported test loss.
train_loader = data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=True)

ar_flow = AutoregressiveFlow(
    mix_comp_z1=MIX_COMP_Z1,
    mix_comp_z2=MIX_COMP_Z2,
    mlp_hiddens=MLP_HIDDENS
).cuda()

train_losses, test_losses = train_model(ar_flow, train_loader, test_loader, epochs=EPOCHS, lr=LR)
# acceptance check: the test loss after the last epoch must be below 0.58
assert test_losses['total'][-1] < 0.58


In [None]:
# Build a regular grid over the data range and evaluate the model density on it.
# These limits and steps are the same values that show_2d_densities uses to
# rebuild the grid, so the two must be kept in sync.
dx, dy = 0.025, 0.025
x_lim = (-1.5, 2.5)
y_lim = (-1, 1.5)
y, x = np.mgrid[slice(y_lim[0], y_lim[1] + dy, dy),
                slice(x_lim[0], x_lim[1] + dx, dx)]
# one (x, y) row per grid node, flattened row by row
mesh_xs = torch.FloatTensor(np.stack([x, y], axis=2).reshape(-1, 2)).cuda()
# log_prob returns log-densities; exponentiate to get densities
densities = np.exp(ar_flow.log_prob(mesh_xs).cpu().detach().numpy())

# map the training points to the latent space (the log-determinants are not needed here)
z, _ = ar_flow(torch.FloatTensor(train_data).cuda())
latents = z.cpu().detach().numpy()

plot_training_curves(train_losses, test_losses)
show_2d_densities(densities)
show_2d_latents(latents, train_labels)

Note that it is not trivial to invert this flow. Hence, we can not easily sample from it. 

# Task 2: VAE with Autoregressive prior on CIFAR10

In this task you will fit the Variational Lossy Autoencoder (https://arxiv.org/abs/1611.02731) model to the CIFAR10 dataset (https://drive.google.com/file/d/16j3nrJV821VOkkuRz7aYam8TyIXLnNme/view?usp=sharing).  

In [None]:
# Load CIFAR10 from the mounted Google Drive and show 100 random images.
# The path is relative to the current working directory.
# NOTE(review): this cell uses the folder name 'MyDrive' while the training
# cells further down use 'My Drive' -- confirm which one exists in the runtime
# and use it consistently.
train_data, test_data = load_pickle(os.path.join('drive', 'MyDrive', 'DGM', 'homework_supplementary', 'cifar10.pkl'))
visualize_data(train_data, 'CIFAR10 samples')

The model consists of:
* convolutional encoder (variational posterior distribution $q(\mathbf{z} | \mathbf{x})$)
* convolutional decoder $p(\mathbf{x} | \mathbf{z})$
* **autoregressive prior** $p(\mathbf{z})$

We will use the MADE model for the autoregressive prior. The MADE autoregressive flow is a mapping from $\mathbf{z}$ to $\boldsymbol{\varepsilon}$: $\boldsymbol{\varepsilon} = \mathbf{f}(\mathbf{z}, \boldsymbol{\lambda})$. We denote its inverse by $\mathbf{z} = \mathbf{g}(\boldsymbol{\varepsilon}, \boldsymbol{\lambda}) = \mathbf{f}^{-1}(\boldsymbol{\varepsilon}, \boldsymbol{\lambda})$.
The mapping from $z$ to $\boldsymbol{\varepsilon}$ has the form:
$$
    \varepsilon_i = z_i * \sigma(\mathbf{z}_{1:i-1}) + \mu(\mathbf{z}_{1:i-1})
$$

Note that it is parallel. We use this during training.

However, the reverse mapping will be sequential, because $z_i$ can only be computed after $\mathbf{z}_{1:i-1}$ is known:

$$
    z_i = \frac{\varepsilon_i - \mu(\mathbf{z}_{1:i-1})}{\sigma(\mathbf{z}_{1:i-1})}
$$

We will use it during sampling from the model. Despite the fact that this transform is sequential, it is performed in the latent space, so it is quite efficient.

The objective to minimize in this task is the negative ELBO:
$$
-E_{q(\mathbf{z}|\mathbf{x})}[\log{p(\mathbf{x}|\mathbf{z})}] + E_{q(\mathbf{z}|\mathbf{x})}[\log{q(\mathbf{z}|\mathbf{x})} - \log{p(\mathbf{z})}]
$$
where 
$$
\log{p(\mathbf{z})} = \log{p(\boldsymbol{\varepsilon})} + \log{\left|\det\frac{d\boldsymbol{\varepsilon}}{d\mathbf{z}}\right|}
$$



In [None]:
class ConvEncoder(nn.Module):
    """Convolutional encoder producing the parameters of q(z | x).

    Args:
        input_shape: (channels, height, width) of the input images.
        n_latent: dimensionality of the latent code; the final linear layer
            outputs 2 * n_latent values (mean and log-std).

    Exercise skeleton: the convolutional stack and the body of `forward` are
    left to be written at the "your code" markers. As provided, `forward`
    returns the names `mu` and `log_std`, which the student code must define.
    """

    def __init__(self, input_shape, n_latent):
        super().__init__()
        self.input_shape = input_shape
        self.n_latent = n_latent

        # ====
        # your code
        # we suggest to use the following architecture
        # conv2d(32) -> relu -> conv(64) -> relu -> conv(128) -> relu -> conv(256) -> fc(2 * n_latent)
        # but we encourage you to create your own architecture
        self.convs = nn.Sequential(
            
        )
        # Flattened size of the conv output, assuming 256 channels and both
        # spatial dimensions reduced by a factor of 8. Each dimension is
        # floor-divided on its own; without the parentheses the expression is
        # evaluated left to right and gives a different result whenever the
        # width is not a multiple of 8.
        conv_out_dim = (input_shape[1] // 8) * (input_shape[2] // 8) * 256
        self.fc = nn.Linear(conv_out_dim, 2 * n_latent)
        # ====

    def forward(self, x):
        # ====
        # your code
        # 1) apply convs
        # 2) reshape the output to 2d matrix for last fc layer
        # 3) apply fc layer
        # ====
        return mu, log_std
        

# Convolutional decoder p(x | z): a linear layer expands the latent code to a
# small feature map (base_size), which transposed convolutions then upsample.
# Exercise skeleton: the deconvolution stack and the body of forward are left to
# be written at the "your code" markers; as provided, forward returns the name
# `out`, which the student code must define.
class ConvDecoder(nn.Module):
    def __init__(self, n_latent, output_shape):
        super().__init__()
        self.n_latent = n_latent
        self.output_shape = output_shape

        # (channels, height, width) of the feature map produced by the fc layer:
        # 128 channels at 1/8 of the output resolution
        self.base_size = (128, output_shape[1] // 8, output_shape[2] // 8)
        # ====
        # your code
        # we suggest to use the following architecture
        # fc -> conv2dtranspose(128) -> relu -> conv2dtranspose(64) -> relu 
        # -> conv2dtranspose(32) -> relu -> conv2dtranspose(3)
        # but we encourage you to create your own architecture
        self.fc = nn.Linear(n_latent, np.prod(self.base_size))
        self.deconvs = nn.Sequential(
            
        )
        # ====

    def forward(self, z):
        # ====
        # your code
        # 1) apply fc layer
        # 2) reshape the output to 4d tensor 
        # 3) apply conv layers
        # ====
        return out


class MaskedLinear(nn.Linear):
    """`nn.Linear` whose weight matrix is multiplied element-wise by a fixed mask.

    The mask is stored as a buffer named 'mask' with the same shape as the
    weight, (out_features, in_features), and starts as all ones, so the layer
    initially behaves like a plain linear layer.

    This class is provided with the assignment and is meant to be used as is.
    """

    def __init__(self, in_features, out_features, bias=True):
        super().__init__(in_features, out_features, bias)
        all_connected = torch.ones(out_features, in_features)
        self.register_buffer('mask', all_connected)

    def set_mask(self, mask):
        """Overwrite the mask from a numpy array given in (in_features, out_features) layout."""
        # The array is cast to uint8 and transposed to match the weight layout.
        transposed = mask.astype(np.uint8).T
        self.mask.data.copy_(torch.from_numpy(transposed))

    def forward(self, input):
        """Apply the linear map using only the connections enabled by the mask."""
        masked_weight = self.mask * self.weight
        return F.linear(input, masked_weight, self.bias)


# Masked autoencoder (MADE): an MLP built from MaskedLinear layers whose masks
# restrict which inputs each output may depend on. For every one of the `nin`
# inputs the network emits `bins` values.
# Exercise skeleton: the network definition and the mask construction are left
# blank at the "your code" markers, so this cell does not run until they are
# filled in.
class MADE(nn.Module):
    def __init__(self, nin, bins, hidden_sizes):
        super().__init__()
        self.nin = nin
        # total number of outputs: `bins` values per input unit
        self.nout = nin * bins
        self.bins = bins
        self.hidden_sizes = hidden_sizes
        # we will use the trivial ordering of input units
        self.ordering = np.arange(self.nin)

        self.net = []
        # layer widths: input, hidden layers, output
        hs = [self.nin] + self.hidden_sizes + [self.nout]
        # ====
        # your code
        # define a simple MLP neural net
        # stack MaskedLinear layers followed by ReLU
        # (do not add ReLU after the last MaskedLinear)
        # ====
        self.net = nn.Sequential(*self.net)

        self.create_mask() # builds the initial self.m connectivity

    def create_mask(self):
        # self.m maps a layer index to the numbers assigned to its units;
        # index -1 is the input layer
        self.m = {}
        L = len(self.hidden_sizes)

        # the initial ordering is trivial
        self.m[-1] = self.ordering
        # ====
        # your code
        # for each layer and for each hidden unit we have to assign the random number from 1 to self.nin - 1
        # note that it is more efficient to assign random number from self.m[l - 1].min() to self.nin - 1
        # NOTE(review): the input ordering above is 0-based (np.arange), so the
        # bounds in this hint should be checked against it -- confirm whether
        # the upper bound self.nin - 1 is meant to be exclusive.
        for l in range(L):
            self.m[l] = 
        # ====

        # ====
        # your code
        # 1) for each hidden layer connect each hidden unit with random number k 
        #    with the previous layer units whose number is less than or equal to k.
        # 2) for the last mask: connect each output unit with number k with the previous layer units 
        #    whose number is less than k.
        masks = [?? for l in range(L)]
        masks.append(??)
        # ====

        # repeat every output column `bins` times so that the `bins` consecutive
        # outputs belonging to one input unit share the same connectivity; this
        # matches the view(batch_size, nin, bins) in forward
        masks[-1] = np.repeat(masks[-1], self.bins, axis=1)
        self.masks = masks

        # set the masks in all MaskedLinear layers
        # (MaskedLinear.set_mask transposes its argument, so the masks built
        # here are in (in_features, out_features) layout)
        layers = [l for l in self.net.modules() if isinstance(l, MaskedLinear)]
        for l, m in zip(layers, masks):
            l.set_mask(m)

    def visualize_masks(self):
        # show every mask as a separate grayscale image
        for m in self.masks:
            plt.figure(figsize=(5, 5))
            plt.imshow(m, cmap='gray')
            plt.show()

    def forward(self, x):
        # flatten the input to (batch, nin), run the masked MLP and reshape the
        # output to (batch, nin, bins)
        batch_size = x.shape[0]
        out = x.view(batch_size, self.nin)
        out = self.net(out)
        out = out.view(batch_size, self.nin, self.bins)
        return out


# VAE with an optional autoregressive-flow prior: a convolutional encoder and
# decoder plus, when use_afp is True, a MADE network that maps z to epsilon.
# Exercise skeleton: the right-hand sides below the "your code" markers in loss()
# are intentionally blank, so this cell does not run until they are filled in.
# NOTE(review): sample_prior hard-codes .cuda(), so sampling needs a GPU.
class AFVAE(nn.Module):
    def __init__(self, input_shape, latent_size, use_afp=False):
        super().__init__()
        assert len(input_shape) == 3
        self.input_shape = input_shape
        self.latent_size = latent_size
        # if the flag is False, we will get standard VAE model without autoregressive prior
        self.use_afp = use_afp

        if use_afp:
            # made model has latent_size nin, 2 bins (for mean and std of output normal) 
            self.made = MADE(latent_size, 2, hidden_sizes=[512, 512])
        self.encoder = ConvEncoder(input_shape, latent_size)
        self.decoder = ConvDecoder(latent_size, input_shape)

    def loss(self, x):
        # we normalize input to [-1; 1] range
        # (this assumes x arrives scaled to [0, 1], as the training cells prepare it)
        x = 2 * x.float() - 1
        # ====
        # your code
        # apply encoder to x to get variational distribution parameters
        mu_z, log_std_z = 
        # ====

        # ====
        # your code
        # sample z from variational distribution (reparametrization trick)
        z = 
        # ====

        # ====
        # your code
        # apply decoder to get reconstructed x
        x_recon = 
        # ====

        # ====
        # your code
        # calculate recon_loss
        recon_loss = 
        # ====

        # ====
        # your code
        # enc log prob is logarithm of normal distribution density on z: log q(z|x)
        enc_log_prob = 
        # ====

        if self.use_afp:
            # ====
            # your code
            # apply MADE model to z
            out = 
            # ====
            # MADE.forward returns (batch, nin, bins) with bins == 2 here, so
            # chunking the last axis separates the two outputs per latent unit
            mu, log_std = out.chunk(2, dim=-1)

            # this trick is just for model stability (do not touch it)
            # (tanh keeps log_std inside (-1, 1))
            log_std = torch.tanh(log_std)
            mu, log_std = mu.squeeze(-1), log_std.squeeze(-1)

            # ====
            # your code
            # scale z by sigma = exp(log_std) and shift by mu to get epsilon
            # (this is the forward direction of the transform that sample_prior inverts)
            eps = 
            # ====
        else:
            # if we do not use the autoregressive prior, log p(z) = log p(epsilon)
            # with epsilon = z and a zero log-determinant
            eps = z
            log_std = 0.0

        # ====
        # your code
        # compute log p(z) = log p(epsilon) + log |d epsilon / d z|
        # 1) compute prior log prob (logarithm of standard normal distribution): log p(epsilon)
        # 2) add logarithm of determinant (in our case it is just log_std): log |d epsilon / d z|
        prior_log_prob = 
        prior_log_prob += 
        # ====

        # kl loss is difference between encoder log prob and prior log prob,
        # summed over dim 1 and averaged over the batch
        kl_loss = (enc_log_prob - prior_log_prob).sum(1).mean()
        return {
            'total': recon_loss + kl_loss,
            'recon_loss': recon_loss,
            'kl_loss': kl_loss
        }

    def sample_prior(self, n):
        # draw n latent codes from the prior; without the autoregressive prior
        # this is just standard normal noise
        with torch.no_grad():
            z = torch.randn(n, self.latent_size).cuda()
            if self.use_afp:
                # z initially holds epsilon and is overwritten in place one
                # latent unit at a time
                for i in range(self.latent_size):
                    mu, log_std = self.made(z)[:, i].chunk(2, dim=-1)
                    log_std = torch.tanh(log_std)
                    mu, log_std = mu.squeeze(-1), log_std.squeeze(-1)
                    # note! it is reverse transform and it is sequential
                    z[:, i] = (z[:, i] - mu) * torch.exp(-log_std)
        return z

    def sample(self, n):
        # decode n prior samples; axis 1 of the decoder output is moved to the
        # end and the values are mapped from the [-1, 1] scale to the [0, 1] scale
        z = self.sample_prior(n)
        with torch.no_grad():
            out = self.decoder(z).cpu().permute(0, 2, 3, 1).numpy() * 0.5 + 0.5
        return out

First, we fit a standard VAE model without the autoregressive prior.

In [None]:
# ====
# your code
# choose these parameters
BATCH_SIZE = 
EPOCHS = 
LR = 
# ====

# The data is reloaded here because the lines below overwrite train_data /
# test_data with a transposed, rescaled copy.
# NOTE(review): the folder is spelled 'My Drive' here but 'MyDrive' in the cell
# that first loads CIFAR10 -- confirm which one exists and use it consistently.
train_data, test_data = load_pickle(os.path.join('drive', 'My Drive', 'DGM', 'homework_supplementary', 'cifar10.pkl'))

# move the last axis to position 1 (channels first) and scale the values by 1/255
train_data = (np.transpose(train_data, (0, 3, 1, 2)) / 255.).astype('float32')
test_data = (np.transpose(test_data, (0, 3, 1, 2)) / 255.).astype('float32')

# plain VAE: 16 latent units, no autoregressive prior
model = AFVAE((3, 32, 32), 16, use_afp=False).cuda()

train_loader = data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = data.DataLoader(test_data, batch_size=BATCH_SIZE)
train_losses, test_losses = train_model(model, train_loader, test_loader, epochs=EPOCHS, lr=LR)
# model.sample returns values on the [0, 1] scale; show_samples expects 0..255
samples = model.sample(100) * 255

# Reconstructions: take up to 50 images from the first test batch (fewer when
# BATCH_SIZE < 50), encode them and decode the posterior means.
x = next(iter(test_loader))[:50].cuda()
with torch.no_grad():
    z = model.encoder(2 * x - 1)[0]
    x_recon = model.decoder(z).cpu().permute(0, 2, 3, 1).numpy() * 0.5 + 0.5
x = x.cpu().permute(0, 2, 3, 1).numpy()
# stacking on axis 1 and flattening interleaves each original with its reconstruction
reconstructions = np.stack((x, x_recon), axis=1).reshape((-1, 32, 32, 3)) * 255

# Interpolations: split up to 20 test images into two halves and decode 10
# evenly spaced convex combinations of each pair of latent means.
x = next(iter(test_loader))[:20].cuda()
with torch.no_grad():
    x = 2 * x - 1
    z, _ = model.encoder(x)
    z1, z2 = z.chunk(2, dim=0)
    interps = [model.decoder(z1 * (1 - alpha) + z2 * alpha) for alpha in np.linspace(0, 1, 10)]
    interps = torch.stack(interps, dim=1).view(-1, 3, 32, 32)
    interps = torch.clamp(interps, -1, 1) * 0.5 + 0.5
interps = interps.permute(0, 2, 3, 1).cpu().numpy() * 255

# keep everything inside the displayable 0..255 range
samples, reconstructions, interps = np.clip(samples, 0, 255), np.clip(reconstructions, 0, 255), np.clip(interps, 0, 255)

samples, reconstructions, interps = samples.astype('float32'), reconstructions.astype('float32'), interps.astype('float32')
# report the test losses after the last epoch
for key, value in test_losses.items():
    print('{}: {:.4f}'.format(key, value[-1]))
plot_training_curves(train_losses, test_losses)
show_samples(samples, title='Samples')
show_samples(reconstructions, title='Reconstructions')
show_samples(interps, title='Interpolations')

In [None]:
# Draw 5000 codes from the prior and scatter-plot them; show_2d_latents plots
# only the first two latent dimensions.
z_prior = model.sample_prior(5000).cpu().detach().numpy()
show_2d_latents(z_prior)

In this case the latent space is just standard normal.

Now we fit the true AFVAE model with MADE in prior space.

In [None]:
# NOTE(review): apart from use_afp=True this cell repeats the preceding VAE
# training cell line for line; a shared helper function would remove the
# duplication.
# ====
# your code
# choose these parameters
BATCH_SIZE = 
EPOCHS = 
LR = 
# ====

# The data is reloaded here because the lines below overwrite train_data /
# test_data with a transposed, rescaled copy.
# NOTE(review): the folder is spelled 'My Drive' here but 'MyDrive' in the cell
# that first loads CIFAR10 -- confirm which one exists and use it consistently.
train_data, test_data = load_pickle(os.path.join('drive', 'My Drive', 'DGM', 'homework_supplementary', 'cifar10.pkl'))

# move the last axis to position 1 (channels first) and scale the values by 1/255
train_data = (np.transpose(train_data, (0, 3, 1, 2)) / 255.).astype('float32')
test_data = (np.transpose(test_data, (0, 3, 1, 2)) / 255.).astype('float32')

# VAE with the MADE autoregressive prior: 16 latent units
model = AFVAE((3, 32, 32), 16, use_afp=True).cuda()

train_loader = data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = data.DataLoader(test_data, batch_size=BATCH_SIZE)
train_losses, test_losses = train_model(model, train_loader, test_loader, epochs=EPOCHS, lr=LR)
# model.sample returns values on the [0, 1] scale; show_samples expects 0..255
samples = model.sample(100) * 255

# Reconstructions: take up to 50 images from the first test batch (fewer when
# BATCH_SIZE < 50), encode them and decode the posterior means.
x = next(iter(test_loader))[:50].cuda()
with torch.no_grad():
    z = model.encoder(2 * x - 1)[0]
    x_recon = model.decoder(z).cpu().permute(0, 2, 3, 1).numpy() * 0.5 + 0.5
x = x.cpu().permute(0, 2, 3, 1).numpy()
# stacking on axis 1 and flattening interleaves each original with its reconstruction
reconstructions = np.stack((x, x_recon), axis=1).reshape((-1, 32, 32, 3)) * 255

# Interpolations: split up to 20 test images into two halves and decode 10
# evenly spaced convex combinations of each pair of latent means.
x = next(iter(test_loader))[:20].cuda()
with torch.no_grad():
    x = 2 * x - 1
    z, _ = model.encoder(x)
    z1, z2 = z.chunk(2, dim=0)
    interps = [model.decoder(z1 * (1 - alpha) + z2 * alpha) for alpha in np.linspace(0, 1, 10)]
    interps = torch.stack(interps, dim=1).view(-1, 3, 32, 32)
    interps = torch.clamp(interps, -1, 1) * 0.5 + 0.5
interps = interps.permute(0, 2, 3, 1).cpu().numpy() * 255

# keep everything inside the displayable 0..255 range
samples, reconstructions, interps = np.clip(samples, 0, 255), np.clip(reconstructions, 0, 255), np.clip(interps, 0, 255)

samples, reconstructions, interps = samples.astype('float32'), reconstructions.astype('float32'), interps.astype('float32')
# report the test losses after the last epoch
for key, value in test_losses.items():
    print('{}: {:.4f}'.format(key, value[-1]))
plot_training_curves(train_losses, test_losses)
show_samples(samples, title='Samples')
show_samples(reconstructions, title='Reconstructions')
show_samples(interps, title='Interpolations')

In [None]:
# Show the connectivity masks of the MADE prior, one grayscale image per layer.
# Requires a model created with use_afp=True (only then does model.made exist).
model.made.visualize_masks()

In [None]:
# Draw 5000 codes from the autoregressive prior and scatter-plot two chosen
# latent dimensions against each other.
z_prior = model.sample_prior(5000).cpu().detach().numpy()
# ====
# your code
# try to find two latent units that will give the distribution that is not standard normal
idx1 = 
idx2 = 
# ====
# select the two chosen columns so that show_2d_latents plots exactly this pair
show_2d_latents(z_prior[..., [idx1, idx2]])