In [1]:
import os.path
import torch
%matplotlib inline
import torch
import torch.nn as nn
from torch.optim import Adam
from torchvision.datasets import MNIST
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, SubsetRandomSampler
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
#!pip install datasets
from keras.datasets import mnist
from sklearn import datasets
import keras
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler

In [65]:
#Loading MNIST dataset

# get cpu or gpu device for training
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))


#########################
# Data Preparation
###

# these functions download (if needed) and load MNIST or fashion-MNIST
# Note: sometimes the websites providing the data are down...
#
dataset_path = '~/datasets'
batch_size = 100

#transform the data into tensor
mnist_transform = transforms.Compose([
        transforms.ToTensor(),
])

kwargs = {'num_workers': 1, 'pin_memory': True} 


#split the dataset into 60000 training and 10000 validation samples
#they are already normalized
(train_x, train_labels), (test_x, test_labels) = mnist.load_data()

#train_x, train_labels, test_x, test_labels = datasets.load_mnist()
#train_x, train_labels, test_x, test_labels = datasets.load_fashion_mnist()



# normalize data
#train_x = datasets.normalize_min_max(train_x, 0., 1.)
#test_x = datasets.normalize_min_max(test_x, 0., 1.)

# split off a validation set (not used here, but good practice)
valid_x = train_x[-10000:, :]
train_x = train_x[:-10000, :]
valid_labels = train_labels[-10000:]
train_labels = train_labels[:-10000]

# generate torch tensors
train_x = torch.tensor(train_x).to(device)
valid_x = torch.tensor(valid_x).to(device)
test_x = torch.tensor(test_x).to(device)

# add dummy dimension for number of color channels
train_x = train_x.unsqueeze(1)
valid_x = valid_x.unsqueeze(1)
test_x = test_x.unsqueeze(1)

# print some summary characteristics of the datasets
print("Dataset summary:")
print(f"Train: {train_x.shape}")
print(f"Valid: {valid_x.shape}")
print(f"Test : {test_x.shape}")

# extract number of training data points, number of color channels, and number of data features
#train_N, train_C, train_D = train_x.shape
train_N = train_x.shape[0]
train_C = train_x.shape[1]
train_D = train_x.shape[2]
# compute image size from number of data features
imgsize = int(np.sqrt(train_D))

Using cpu device
Dataset summary:
Train: torch.Size([50000, 1, 28, 28])
Valid: torch.Size([10000, 1, 28, 28])
Test : torch.Size([10000, 1, 28, 28])


In [66]:
###########################################
class MLPDecoder(torch.nn.Module):
    def __init__(self, num_var, num_latent, num_neurons, var=0.05):
        super(MLPDecoder, self).__init__()

        self.num_var = num_var
        self.num_latent = num_latent
        self.num_neurons = num_neurons
        self.var = var

        # generate hidden layers
        # E.g., if num_neurons = [500, 1000, 500], then three hidden layers are generated,
        # with 500, 1000 and 400 neurons, respectively.
        layers = []
        num_units = [num_latent] + num_neurons
        for n_prev, n_next in zip(num_units[0:-1], num_units[1:]):
            layers.append(torch.nn.Linear(n_prev, n_next))
            layers.append(torch.nn.ReLU())
            #layers.append(torch.nn.Conv2d(64, 128, 5,1,2))
            # layers.append(torch.nn.BatchNorm1d(num_features=n_next))
            #layers.append(torch.nn.Conv2d(1, 64, 5,1,2))
  
        self.layers = torch.nn.ModuleList(layers)
        # generate output layers, mu
        self.mu = torch.nn.Linear(num_hidden[-1], num_var)

    def forward(self, z):
        res = z
        for layer in self.layers:
            res = layer(res)
        mu = self.mu(res)
        return mu
        
    def sample(self, N, convert_to_numpy=False, suppress_noise=True):
        with torch.no_grad():
            z = torch.randn(N, self.num_latent, device=device)
            mu = self.forward(z)
            x = mu
            # the conditional VAE distribution is isotropic Gaussian, hence we just add noise when sampling it
            # for images, one might want to suppress this
            if not suppress_noise:
                x += np.sqrt(self.var) * torch.randn(N, self.num_var, device=device)

        if convert_to_numpy:
            z = z.cpu().numpy()
            x = x.cpu().numpy()
        return x, z

In [67]:
#####################################
class MLPEncoder(torch.nn.Module):
    def __init__(self, num_var, num_latent, num_neurons):
        super(MLPEncoder, self).__init__()

        self.num_var = num_var
        self.num_latent = num_latent
        self.num_neurons = num_neurons

        layers = []
        num_units = [num_var] + num_neurons
        for n_prev, n_next in zip(num_units[0:-1], num_units[1:]):
            # layers.append(torch.nn.BatchNorm1d(num_features=n_next))
            #layers.append(torch.nn.Conv2d(1, 64, 5,1,2))
            #layers.append(torch.nn.ReLU())
            #layers.append(torch.nn.Conv2d(64, 128, 5,1,2))
            #layers.append(torch.nn.ReLU())
            #layers.append(nn.MaxPool2d(2))
            layers.append(torch.nn.Linear(n_prev, n_next))
            layers.append(torch.nn.ReLU())
        self.layers = torch.nn.ModuleList(layers)

        # generate output layers, mu and var
        self.mu = torch.nn.Linear(num_neurons[-1], num_latent)
        self.var = torch.nn.Linear(num_neurons[-1], num_latent)
        self.var_act = torch.nn.Softplus()

    def forward(self, x):
        res = x
        for layer in self.layers:
            res = layer(res)
        mu = self.mu(res)
        var = self.var_act(self.var(res))
        return mu, var

In [69]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# training params
# MNIST has 784 pixels
# we are using a fixed variance for the decoder
# the variance can be also made an output of the decoder, see lecture slides,
# but training becomes trickier and somewhat "brittle"

var_x = 0.05
num_latent = 50
num_hidden = [1000, 1000]
batch_size = 50
num_epochs = 2
decoder = MLPDecoder(train_C, num_latent, num_hidden, var=var_x).to(device)
encoder = MLPEncoder(train_C, num_latent, num_hidden).to(device)

print(decoder)
print(encoder)

MLPDecoder(
  (layers): ModuleList(
    (0): Linear(in_features=50, out_features=1000, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1000, out_features=1000, bias=True)
    (3): ReLU()
  )
  (mu): Linear(in_features=1000, out_features=1, bias=True)
)
MLPEncoder(
  (layers): ModuleList(
    (0): Linear(in_features=1, out_features=1000, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1000, out_features=1000, bias=True)
    (3): ReLU()
  )
  (mu): Linear(in_features=1000, out_features=50, bias=True)
  (var): Linear(in_features=1000, out_features=50, bias=True)
  (var_act): Softplus(beta=1, threshold=20)
)


In [70]:
optimizer = torch.optim.Adam(list(decoder.parameters()) + list(encoder.parameters()), lr=0.0001)

ELBO_history = []
for epoch in range(num_epochs):
    # make batches of training indices
    shuffled_idx = torch.randperm(50000)
    idx_batches = shuffled_idx.split(batch_size)

    sum_neg_ELBO = 0.0
    for batch_count, idx in enumerate(idx_batches):
        optimizer.zero_grad()

        batch_x = train_x[idx, :]
        #batch_x = batch_x.float()
        # batch_mu_z: batch_size, num_latent
        # batch_var_z: batch_size, num_latent
        batch_mu_z, batch_var_z = encoder(batch_x)

        # sample z, using the "reparametrization trick"
        batch_z = batch_mu_z + torch.sqrt(batch_var_z) * torch.randn(batch_var_z.shape, device=device)

        # mu_x: batch_size, D
        mu_x = decoder(batch_z)

        # squared distances between mu_x and batch_x
        d2 = (mu_x - batch_x) ** 2
        # Gaussian likelihood: 1/sqrt(2*pi*var) exp(-0.5 * (mu-x)**2 / var)
        # Thus, log-likelihood = -0.5 * ( log(2*pi*var) + (mu-x)**2 / var )
        log_p = -0.5 * torch.sum(np.log(decoder.var * 2 * np.pi) + d2 / decoder.var)
        KL = -0.5 * torch.sum(1 + torch.log(batch_var_z) - batch_mu_z**2 - batch_var_z)

        # we want to maximize the ELBO, hence minimize the negative ELBO
        negative_ELBO = -log_p + KL
        negative_ELBO.backward()
        optimizer.step()

        sum_neg_ELBO += negative_ELBO

    mean_neg_ELBO = sum_neg_ELBO / train_x.shape[0]
    print('epoch {}   mean negative ELBO = {}'.format(epoch, mean_neg_ELBO))
    ELBO_history.append(mean_neg_ELBO)

    if epoch % 5 == 0:
        with torch.no_grad():
            # sample from the VAE
            x, z = decoder.sample(5)

            # encode some samples
            mu_z, var_z = encoder(train_x[0:5, :])
            z_encoded = mu_z + torch.sqrt(var_z) * torch.randn(5, num_latent, device=device)

            # decode the samples
            x_decoded = decoder(z_encoded)

            # save images
            plot_img = np.stack((train_x[0:5, :].detach().cpu().numpy(),
                                 x_decoded.detach().cpu().numpy(),
                                 x.detach().cpu().numpy()))
            plot_img = np.reshape(plot_img, (15, 28, 28))
            file_name = os.path.join('img_vae', 'samples_{}.png'.format(epoch))
            datasets.save_image_stack(plot_img, 3, 5, file_name, margin = 3)

        plt.figure(1)
        plt.clf()
        plt.plot(ELBO_history)
        plt.savefig(os.path.join('img_vae', 'elbo.png'))


RuntimeError: ignored