In [1]:
# DO NOT REMOVE!
import os

import numpy as np
import matplotlib.pyplot as plt

import torch

from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F

import torchvision
from torchvision.datasets import MNIST

In [2]:
# Check if GPU is available and determine the device
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

print(f'The available device is {device}')

The available device is cpu


In [13]:
images_dir = "../results"

In [14]:
# DO NOT REMOVE
PI = torch.from_numpy(np.asarray(np.pi))
EPS = 1.e-5


def log_categorical(x, p, num_classes=256, reduction=None, dim=None):
    x_one_hot = F.one_hot(x.long(), num_classes=num_classes)
    log_p = x_one_hot * torch.log(torch.clamp(p, EPS, 1. - EPS))
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_bernoulli(x, p, reduction=None, dim=None):
    pp = torch.clamp(p, EPS, 1. - EPS)
    log_p = x * torch.log(pp) + (1. - x) * torch.log(1. - pp)
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_normal_diag(x, mu, log_var, reduction=None, dim=None):
    D = x.shape[1]
    log_p = -0.5 * D * torch.log(2. * PI) - 0.5 * log_var - 0.5 * torch.exp(-log_var) * (x - mu)**2.
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p


def log_standard_normal(x, reduction=None, dim=None):
    D = x.shape[1]
    log_p = -0.5 * D * torch.log(2. * PI) - 0.5 * x**2.
    if reduction == 'avg':
        return torch.mean(log_p, dim)
    elif reduction == 'sum':
        return torch.sum(log_p, dim)
    else:
        return log_p

In [15]:
class Encoder(nn.Module):
    def __init__(self, encoder_net):
        # The init of the encoder network.
        super(Encoder, self).__init__()
        self.encoder = encoder_net
        
    # The reparameterization trick for Gaussians.
    @staticmethod
    def reparameterization(mu, log_var):
        # The formula is the following: # z = mu + std ∗ epsilon
        # epsilon ~ Normal(0,1)
        
        # First, we need to get std from log−variance.
        std = torch.exp(0.5*log_var)
        
        # Second, we sample epsilon from Normal(0,1).
        eps = torch.randn_like(std)
        
        # The final output
        return mu + std * eps
    
    # This function implements the output of the encoder network (i.e., parameters of a Gaussian).
    def encode(self, x):
        # First, we calculate the output of the encoder network of size 2M.
        h_e = self.encoder(x)
        # Second, we must divide the output to the mean and the log−variance.
        mu_e, log_var_e = torch.chunk(h_e, 2, dim=1) # TODO error
        return mu_e , log_var_e
    
    # Sampling procedure.
    def sample(self, x=None, mu_e=None, log_var_e=None):
        """
        Sample from the encoder. 
        If x is given (not equal to None), then copmute variational posterior distribution q(z|x) and sample from it.
        Otherwise, use `mu_e` and `log_var_e` as parameter of the variational posterior and sample from it.

        x: torch.tensor, with dimensionality (mini-batch, x_dim)
             a mini-batch of data points
        mu_e: torch.tensor, with dimensionality (mini-batch, x_dim)
             mean vector of the variational posterior
        log_var_e: torch.tensor, with dimensionality (mini-batch, x_dim)
             log variance of the variational posterior
        return: z: torch.tensor, with dimensionality (mini-batch, z_dim)
        """
        #If we don’t provide a mean and a log−variance, we must first calculate it:
        if (mu_e is None) and (log_var_e is None):
            mu_e , log_var_e = self.encode(x)
        # Or the final sample
        else:
        # Otherwise, we can simply apply the reparameterization trick!
            if (mu_e is None) or (log_var_e is None):
                raise ValueError('mu and log_var cannot be None')
        z = self.reparameterization(mu_e, log_var_e)
        return z
    
    # This function calculates the log−probability that is later used for calculating the ELBO.
    # log q(z|x)
    def log_prob(self, x=None, mu_e=None, log_var_e=None, z=None):
        # If we provide x alone, then we can calculate a corresponding sample.
        if x is not None:
            mu_e, log_var_e = self.encode(x)
            z = self.sample(mu_e=mu_e, log_var_e=log_var_e)
        else:
        # Otherwise, we should provide mu, log−var and z.
            if (mu_e is None) or (log_var_e is None) or (z is None):
                raise ValueError('mu, log_var and z cannot be None')
        return log_normal_diag(z, mu_e, log_var_e)
    
    # PyTorch forward pass: it is either log−probability (by default) or sampling.
    def forward(self, x, type = "log_prob"):
        """
        Compute log-probability of variational posterior for given x, i.e., log q(z|x)
        x: torch.tensor, with dimensionality (mini-batch, x_dim)
             a mini-batch of data points
        """
        assert type in ["encode", "log_prob"], 'Type can be either encode or log_prob'
        if type == "log_prob":
            return self.log_prob(x)
        else:
            return self.sample(x)

In [16]:
# YOUR CODE GOES HERE
# NOTE: The class must containt the following function: 
# (i) sample
# Moreover, forward must return the log-probability of the conditional likelihood function for given z, i.e., log p(x|z)

class Decoder(nn.Module):
    def __init__(self, decoder_net, distribution = "categorical", num_vals = None):
        super(Decoder, self).__init__()

        # The decoder network.
        self.decoder = decoder_net
        # The distribution used for the decoder, categorical by default.
        self.distribution = distribution
        # The number of possible values. This is important for the categorical distribution.
        self.num_vals = num_vals

    # This function calculates parameters of the likelihood function p(x|z).
    def decode(self, z):
        # First, we apply the decoder network.
        h_d = self.decoder(z)

        # In this example, we use only the categorical distribution...
        if self.distribution == "categorical":
        # We save the shapes: batch size.
            b = h_d.shape[0]
        # and the dimensionality of x.
            d = h_d.shape[1]//self.num_vals
        # Then we reshape to (Batch size, Dimensionality, Number of Values).
            h_d = h_d.view(b , d, self.num_vals)
        # To get probabilities, we apply softmax.
            mu_d = torch.softmax(h_d, 2)
            return [mu_d]

        elif self.distribution == "bernoulli":
        # In the Bernoulli case, we have x_d \in {0,1}.
        # Therefore, it is enough to output a single probability, because
        # p(x_d=1|z) = \theta and p(x_d=0|z)=1-\theta.
            mu_d = torch.sigmoid(h_d)
            return [mu_d]
        
        else:
            raise ValueError("Only distribution options are categorical and bernoulli")

    # This function implements sampling from the decoder.
    def sample(self, z):
        """
        For a given latent code compute parameters of the conditional likelihood 
        and sample x ~ p(x|z)

        z: torch.tensor, with dimensionality (mini-batch, z_dim)

        return:
        x: torch.tensor, with dimensionality (mini-batch, x_dim)
        """
        outs = self.decode(z)

        if self.distribution == "categorical":
            # We take the output of the decoder, shape = (b, d, self.num_vals)
            mu_d = outs[0]
            # and save shapes (we will need that for reshaping).
            b = mu_d.shape[0]
            m = mu_d.shape[1]
            # Here we use reshaping.
        
            # Daniel: This reshape is unnecessary? 
            mu_d = mu_d.view(mu_d.shape[0], -1, self.num_vals)

            # flatten first two dimensions 
            p = mu_d.view(-1, self.num_vals)
            
            # Eventually, we sample from the categorical (the built-in PyTorch function).
            # This generates a prediction for every pixel. Immediately reshape to (b,m,1)
            x_new = torch.multinomial(p, num_samples =  1).view(b,m)
        
        elif self.distribution == "bernoulli":
          # In the case of Bernoulli, we don't need any reshaping
            mu_d = outs[0]
          # and we can use the built-in PyTorch sampler.
            x_new = torch.bernoulli(mu_d)
        
        else:
            raise ValueError("Only distribution options are categorical and bernoulli")
        
        return x_new

    def log_prob(self, x, z):
        outs = self.decode(z)

        if self.distribution == "categorical":
            mu_d = outs[0]
            # log likelihood from probabilities mu_d
            log_p = log_categorical(x, mu_d, num_classes = self.num_vals,
                                    reduction = "sum", dim = -1).sum(-1)
        
        elif self.distribution == "bernoulli":
            mu_d = outs[0]
            log_p = log_bernoulli(x, mu_d, reduction = "sum", dim = -1)

        else:
            raise ValueError("Only distribution options are categorical and bernoulli")

        return log_p

    # The forward pass is either a log-prob or a sample.
    def forward(self, z, x=None, type="log_prob"):
        """
        Compute the log probability: log p(x|z). 
        z: torch.tensor, with dimensionality (mini-batch, z_dim)
        x: torch.tensor, with dimensionality (mini-batch, x_dim)
        """
        assert type in ["decoder", "log_prob"], "Type can be either decode or log_prob"

        if type == "log_prob":
            # calculate the log likelihood log p(x|z)
            return self.log_prob(x, z)
        else:
            # generate an image x^hat
            return self.sample(x)

In [17]:
# YOUR CODE GOES HERE
# NOTES:
# (i) Implementing the standard Gaussian prior does not give you any points!
# (ii) The function "sample" must be implemented.
# (iii) The function "forward" must return the log-probability, i.e., log p(z)

# The current implementation of the prior is very simple, namely, it is a
# standard Gaussian. We could have used a built-in PyTorch distribution. However,
# we didn't do so for two reasons:
# (i) It is important to think of the prior as a crucial component in VAEs.
# (ii) We can implement a learnable prior (e.g., a flow-based prior, VamPrior,
# a mixture of distributions.)
class Prior(nn.Module):
    def __init__(self, L, device='cpu'): # deviation, the github code does not have device parameter
        super(Prior, self).__init__()
        self.device = device
        self.L = L

    def sample(self, batch_size):
        z = torch.randn((batch_size, self.L))
        return z

    def forward(self, z):
        return log_standard_normal(z)
    
    def log_prob(self, z):
        return log_standard_normal(z)

In [18]:
# YOUR CODE GOES HERE
# This class combines Encoder, Decoder and Prior.
# NOTES:
# (i) The function "sample" must be implemented.
# (ii) The function "forward" must return the negative ELBO. Please remember to add an argument "reduction" that is either "mean" or "sum".
class VAE(nn.Module):
    def __init__(self, encoder_net, decoder_net, num_vals = 256,
                 L = 16, likelihood_type = "categorical", device='cpu'): # here again, 'device' is new parameter
        super(VAE, self).__init__()

        self.encoder = Encoder(encoder_net=encoder_net)
        self.decoder = Decoder(distribution=likelihood_type, decoder_net=decoder_net,
                               num_vals=num_vals)
        self.prior = Prior(L = L)

        self.num_vals = num_vals

        self.likelihood_type = likelihood_type

    def sample(self, batch_size=64):
        z = self.prior.sample(batch_size=batch_size)
        return self.decoder.sample(z)

    def forward(self, x, reduction='mean'):
        # encoder
        mu_e, log_var_e, = self.encoder.encode(x) # TODO error
        z = self.encoder.sample(mu_e = mu_e, log_var_e=log_var_e)
        
        # Negative ELBO (NELBO)
        #NELBO = 0.
        RE = self.decoder.log_prob(x, z)
        KL = (self.prior.forward(z) - self.encoder.log_prob(mu_e = mu_e,
                                                             log_var_e = log_var_e,
                                                             z=z)).sum(-1)
        NELBO = -(RE + KL)

        if reduction == 'sum':
            return NELBO.sum()
        else:
            return NELBO.mean()

In [19]:
# DO NOT REMOVE

def evaluation(test_loader, name=None, model_best=None, epoch=None):
    # EVALUATION
    if model_best is None:
        # load best performing model
        model_best = torch.load(name + '.model')

    model_best.eval()
    loss = 0.
    N = 0.
    for indx_batch, (test_batch, _) in enumerate(test_loader):
#     for indx_batch, test_batch in enumerate(test_loader): # For Digits
        test_batch = test_batch.to(device)
        loss_t = model_best.forward(test_batch, reduction='sum')
        loss = loss + loss_t.item()
        N = N + test_batch.shape[0]
    loss = loss / N

    if epoch is None:
        print(f'FINAL LOSS: nll={loss}')
    else:
        print(f'Epoch: {epoch}, val nll={loss}')

    return loss


def samples_real(name, test_loader, shape=(28,28)):
    # real images-------
    num_x = 4
    num_y = 4
    x = next(iter(test_loader))
#     x, _ = next(iter(test_loader))
    x = x.to('cpu').detach().numpy()

    fig, ax = plt.subplots(num_x, num_y)
    for i, ax in enumerate(ax.flatten()):
        plottable_image = np.reshape(x[i], shape)
        ax.imshow(plottable_image, cmap='gray')
        ax.axis('off')

    plt.savefig(name+'_real_images.pdf', bbox_inches='tight')
    plt.close()


def samples_generated(name, data_loader, shape=(28,28), extra_name=''):
    x = next(iter(data_loader))
#     x, _ = next(iter(data_loader))
    x = x.to('cpu').detach().numpy()

    # generations-------
    model_best = torch.load(name + '.model')
    model_best.eval()

    num_x = 4
    num_y = 4
    x = model_best.sample(num_x * num_y)
    x = x.to('cpu').detach().numpy()

    fig, ax = plt.subplots(num_x, num_y)
    for i, ax in enumerate(ax.flatten()):
        plottable_image = np.reshape(x[i], shape)
        ax.imshow(plottable_image, cmap='gray')
        ax.axis('off')

    plt.savefig(name + '_generated_images' + extra_name + '.pdf', bbox_inches='tight')
    plt.close()


def plot_curve(name, nll_val):
    plt.plot(np.arange(len(nll_val)), nll_val, linewidth='3')
    plt.xlabel('epochs')
    plt.ylabel('nll')
    plt.savefig(name + '_nll_val_curve.pdf', bbox_inches='tight')
    plt.close()

In [20]:
# DO NOT REMOVE

def training(name, max_patience, num_epochs, model, optimizer, training_loader,
             val_loader, shape=(28,28)):
    nll_val = []
    best_nll = 1000.
    patience = 0

    # Main loop
    for e in range(num_epochs):
        # TRAINING
        model.train()
#         for indx_batch, batch in enumerate(training_loader): # for Digits
        for indx_batch, (batch, _) in enumerate(training_loader):
            batch = batch.to(device)
            loss = model.forward(batch, reduction='mean') # TODO error

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation
        loss_val = evaluation(val_loader, model_best=model, epoch=e)
        nll_val.append(loss_val)  # save for plotting

        if e == 0:
            print('saved!')
            torch.save(model, name + '.model')
            best_nll = loss_val
        else:
            if loss_val < best_nll:
                print('saved!')
                torch.save(model, name + '.model')
                best_nll = loss_val
                patience = 0

                samples_generated(name, val_loader, shape=shape, extra_name="_epoch_" + str(e))
            else:
                patience = patience + 1

        if patience > max_patience:
            break

    nll_val = np.asarray(nll_val)

    return nll_val

In [21]:
# PLEASE DEFINE APPROPRIATE TRANFORMS FOR THE DATASET
transforms_train = torchvision.transforms.ToTensor()#,
                    #torchvision.transforms.Normalize((0.1307,), (0.3081,))]

transforms_test = torchvision.transforms.ToTensor()#[torchvision.transforms.ToTensor(),
                   #torchvision.transforms.Normalize((0.1307,), (0.3081,))]

In [23]:
# JH: make sure to have directory "VAE/results/vae/", otherwise you'll get an error.

# DO NOT REMOVE
#-dataset
dataset = MNIST('../results/', train=True, download=True,
                      transform=transforms_train
                )

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [50000, 10000], generator=torch.Generator().manual_seed(14))

test_dataset = MNIST('../results/', train=False, download=True,
                      transform=transforms_test
                     )
#-dataloaders
batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

#-creating a dir for saving results
result_dir = images_dir + '/vae/'
if not(os.path.exists(result_dir)):
    os.mkdir(result_dir)

#-hyperparams (please do not modify them for the final report)
num_epochs = 1000 # max. number of epochs
max_patience = 20 # an early stopping is used, if training doesn't improve for longer than 20 epochs, it is stopped

In [24]:
# Hyperparameters
D = 28   # input dimension
# D = 64 # for Digits dataset
L = 16  # number of latents
M = 256  # the number of neurons in scale (s) and translation (t) nets

lr = 1e-3 # learning rate
num_epochs = 1000 # max. number of epochs
max_patience = 20 # an early stopping is used, if training doesn't improve for longer than 20 epochs, it is stopped

In [27]:
likelihood_type = 'categorical'

# if likelihood_type == 'categorical':
#     num_vals = 17
# elif likelihood_type == 'bernoulli':
#     num_vals = 1

# encoder = nn.Sequential(nn.Linear(D, M), nn.LeakyReLU(),
#                         nn.Linear(M, M), nn.LeakyReLU(),
#                         nn.Linear(M, 2 * L))

# decoder = nn.Sequential(nn.Linear(L, M), nn.LeakyReLU(),
#                         nn.Linear(M, M), nn.LeakyReLU(),
#                         nn.Linear(M, num_vals * D))

In [25]:
# D is gehardcode, D = 28. Verder weet ik niet of deze nets werken met trainen, maar
# de shapes kloppen in elk geval wel.
# inspiratie: https://towardsdatascience.com/building-a-convolutional-vae-in-pytorch-a0f54c947f71
L = 4
encoder = nn.Sequential(nn.Conv2d(1, 16, kernel_size = 3, stride = 1, padding = 1),
                        nn.ReLU(), # size: batch, 1, 28, 28, 
                        nn.Conv2d(16, 32, kernel_size = 3, stride = 1, padding = 1),
                        nn.ReLU(), 
                        nn.Flatten(1),
                        nn.Linear(32*28*28, 2 * L))

decoder = nn.Sequential(nn.Linear(L, 32*28*28),
                        nn.ReLU(), # output shape: (batch, 32*28*28)
                        nn.Unflatten(1, (32, 28, 28)),  # output shape: (batch, 32, 28, 28)
                        nn.ConvTranspose2d(32, 64, kernel_size = 3, padding = 1),
                        nn.ReLU(), #output (batch, 64, 28, 28)
                        nn.ConvTranspose2d(64, 128, kernel_size = 3, padding = 1),
                        nn.ReLU(),#output (batch, 128, 28, 28)
                        nn.ConvTranspose2d(128, 256, kernel_size = 3, padding = 1),
                        nn.ReLU(), #output (batch, 256, 28, 28)
                        nn.Flatten(2),
                        nn.Softmax(dim = -1)) # output (batch, 256, 28*28)


num_vals = 256 
L = 4 # number of latent variables
batch = 2

## Test net and shape ##
# images = torch.randn(2, 1, 28, 28)
# Z = torch.randn(batch, L)

# encoder(images).shape # test if encoder works
# p= decoder(Z).transpose(1,2).reshape(-1, num_vals) # test if decoder works and reshape to (b*28*28)
# p.shape
# x_hat = torch.multinomial(p, num_samples = 1).view(batch, 28,28) # sample using probabilities p and torch.multinomial


In [43]:
6422528/(8*32*28*28)

32.0

In [44]:
32*28*28

25088

In [28]:
prior = torch.distributions.MultivariateNormal(torch.zeros(L), torch.eye(L))
model = VAE(encoder_net=encoder, decoder_net=decoder, num_vals=num_vals, L=L, likelihood_type=likelihood_type)

In [29]:
# PLEASE DEFINE YOUR OPTIMIZER
optimizer = torch.optim.Adamax([p for p in model.parameters() if p.requires_grad == True], lr=lr)

In [30]:
# DO NOT REMOVE OR MODIFY
# Training procedure
nll_val = training(name=result_dir + name, max_patience=max_patience, 
                   num_epochs=num_epochs, model=model, optimizer=optimizer,
                   training_loader=train_loader, val_loader=val_loader,
                   shape=(28,28))
#                    shape=(28,28)) # 'shape' argument here is not on github version

RuntimeError: shape '[32, 1, 256]' is invalid for input of size 6422528

In [None]:
# 14-12: Error in training > VAE.forward > Encoder.encode > torch.chunk
# 18-12: Error in training > VAE.forward > Decoder.log_prob > Decoder.decode >
# shape '[32, 1, 256]' is invalid for input of size 6422528

# 6422528 = 28*28*32*256

In [None]:
# DO NOT REMOVE OR MODIFY
# Final evaluation
test_loss = evaluation(name=result_dir + name, test_loader=test_loader)
f = open(result_dir + name + '_test_loss.txt', "w")
f.write(str(test_loss))
f.close()

samples_real(result_dir + name, test_loader, shape = (28,28))
samples_generated(result_dir + name, test_loader, extra_name='_FINAL', shape = (28,28))

plot_curve(result_dir + name, nll_val)

In [27]:
for indx_batch, (batch, _) in enumerate(train_loader):
    if indx_batch < 2:
        print(batch.shape)
#         for indx_batch, (batch, _) in enumerate(training_loader):
#     batch = batch.to(device)

torch.Size([32, 1, 28, 28])
torch.Size([32, 1, 28, 28])


# To run with Digits dataset:

In this example, we go wild and use a dataset that is simpler than MNIST! We use a scipy dataset called Digits. It consists of ~1500 images of size 8x8, and each pixel can take values in 
{
0
,
1
,
…
,
16
}
.

MNIST: size 28x28

In [21]:
from sklearn.datasets import load_digits
from sklearn import datasets

In [22]:
class Digits(Dataset):
    """Scikit-Learn Digits dataset."""

    def __init__(self, mode='train', transforms=None):
        digits = load_digits()
        if mode == 'train':
            self.data = digits.data[:1000].astype(np.float32)
        elif mode == 'val':
            self.data = digits.data[1000:1350].astype(np.float32)
        else:
            self.data = digits.data[1350:].astype(np.float32)

        self.transforms = transforms

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        if self.transforms:
            sample = self.transforms(sample)
        return sample

In [23]:
train_data = Digits(mode='train')
val_data = Digits(mode='val')
test_data = Digits(mode='test')

train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
val_loader = DataLoader(val_data, batch_size=64, shuffle=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

result_dir = './results/'
if not(os.path.exists(result_dir)):
    os.mkdir(result_dir)
name = 'vae'

In [24]:
for indx_batch, batch in enumerate(train_loader):
    if indx_batch < 2:
        print(batch.shape)
#         for indx_batch, (batch, _) in enumerate(training_loader):
#     batch = batch.to(device)

torch.Size([64, 64])
torch.Size([64, 64])


In [None]:
16*16

Information:
- name : MNIST
- length : 70000

Input Summary:
- shape : (28, 28, 1)
- range : (0.0, 1.0)

Target Summary:
- shape : (10,)
- range : (0.0, 1.0)