In [44]:
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import Image, display, clear_output
import numpy as np
import torch
import gym
import matplotlib.image as mpimg
import cv2
import torchvision
from torch.utils.tensorboard import SummaryWriter



%matplotlib nbagg
%matplotlib inline
import seaborn as sns
sns.set_style("whitegrid")
sns.set_palette(sns.dark_palette("purple"))

In [45]:
def generate_image():
    env = gym.make('Riverraid-v0')
    S = env.observation_space.sample()
    S = cv2.resize(S, dsize=(32, 32), interpolation=cv2.INTER_CUBIC)
    
    S2 = np.zeros((32,32))
    # 601 luma
    S2 = S[:,:,0]*0.299 + S[:,:,1]*0.587 + S[:,:,2]*0.114
    S2-=np.mean(S2)
    S2/=np.std(S2)
    
    S2 = S2.flatten()
    
    return S2

In [46]:
def normalToInt8(I):
    I = I.numpy()
    
    maxd = 255
    mind = 0
    
    maxi = np.amax(I)
    mini = np.amin(I)
    
    m, n = I.shape
    c = (maxd - mind) / (maxi - mini)
    
    Itemp = I
    
    S = c * (Itemp -mini) + mind
    
    return S

In [47]:
import torch
cuda = torch.cuda.is_available()

from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
from functools import reduce

# Flatten the images into a vector
flatten = lambda x: ToTensor()(x).view(32**2)

# Define the train and test sets
dset_train = np.zeros(shape=(32**2,1024))
dset_test = np.zeros(shape=(32**2,1024))


batch_size = 8
for i in range(1024):
    dset_train[:,i] = generate_image()

for i in range(1024):
    dset_test[:,i] = generate_image()
    

    
# The loaders perform the actual work
train_loader = DataLoader(dset_train, batch_size=batch_size, pin_memory=cuda)
test_loader  = DataLoader(dset_test, batch_size=batch_size, pin_memory=cuda)

In [48]:
import torch.nn as nn
from torch.nn.functional import softplus

# define size variables
num_features = 32**2

class VariationalAutoencoder(nn.Module):
    def __init__(self, latent_features, num_samples):
        super(VariationalAutoencoder, self).__init__()
        
        self.latent_features = latent_features
        self.num_samples = num_samples

        # We encode the data onto the latent space using two linear layers
        self.encoder = nn.Sequential(
            nn.Linear(in_features=num_features, out_features=256),
       #     nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(in_features=256, out_features=128),
        #    nn.BatchNorm1d(128)
            nn.ReLU(),
            nn.Linear(in_features=128, out_features=128),
         #   nn.BatchNorm1d(128),
            nn.Tanh(),
            # A Gaussian is fully characterised by its mean and variance
            nn.Linear(in_features=128, out_features=2*self.latent_features) # <- note the 2*latent_features
        )
        
        # The latent code must be decoded into the original image
        self.decoder = nn.Sequential(
            nn.Linear(in_features=self.latent_features, out_features=128),
        #    nn.BatchNorm1d(128),
            nn.Tanh(),
            nn.Linear(in_features=128, out_features=128),
         #   nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(in_features=128, out_features=256),
        #    nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(in_features=256, out_features=num_features)
        )
        

    def forward(self, x): 
        outputs = {}
        
        # Split encoder outputs into a mean and variance vector
        mu, log_var = torch.chunk(self.encoder(x), 2, dim=-1)
        
        # Make sure that the log variance is positive
        log_var = softplus(log_var)
        
        # :- Reparametrisation trick
        # a sample from N(mu, sigma) is mu + sigma * epsilon
        # where epsilon ~ N(0, 1)
                
        # Don't propagate gradients through randomness
        with torch.no_grad():
            batch_size = mu.size(0)
            epsilon = torch.randn(batch_size, self.num_samples, self.latent_features)
            
            if cuda:
                epsilon = epsilon.cuda()
        
        sigma = torch.exp(log_var/2)
        
        # We will need to unsqueeze to turn
        # (batch_size, latent_dim) -> (batch_size, 1, latent_dim)
        z = mu.unsqueeze(1) + epsilon * sigma.unsqueeze(1)        
        
        # Run through decoder
        x = self.decoder(z)
        
        # The original digits are on the scale [0, 1]
        x = torch.sigmoid(x)
        
        # Mean over samples
        x_hat = torch.mean(x, dim=1)
        
        outputs["x_hat"] = x_hat
        outputs["z"] = z
        outputs["mu"] = mu
        outputs["log_var"] = log_var
        
        return outputs


latent_features = 2
num_samples = 25

net = VariationalAutoencoder(latent_features, num_samples)

# Transfer model to GPU if available
# if cuda:
#     net = net.cuda()

print(net)

VariationalAutoencoder(
  (encoder): Sequential(
    (0): Linear(in_features=1024, out_features=256, bias=True)
    (1): ReLU()
    (2): Linear(in_features=256, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=128, bias=True)
    (5): Tanh()
    (6): Linear(in_features=128, out_features=4, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=2, out_features=128, bias=True)
    (1): Tanh()
    (2): Linear(in_features=128, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=256, bias=True)
    (5): ReLU()
    (6): Linear(in_features=256, out_features=1024, bias=True)
  )
)


In [49]:
from torch.nn.functional import binary_cross_entropy
from torch import optim

def ELBO_loss(y, t, mu, log_var):
    # Reconstruction error, log[p(x|z)]
    # Sum over features
    likelihood = -binary_cross_entropy(y, t, reduction="none")
    likelihood = likelihood.view(likelihood.size(0), -1).sum(1)

    # Regularization error: 
    # Kulback-Leibler divergence between approximate posterior, q(z|x)
    # and prior p(z) = N(z | mu, sigma*I).
    
    # In the case of the KL-divergence between diagonal covariance Gaussian and 
    # a standard Gaussian, an analytic solution exists. Using this excerts a lower
    # variance estimator of KL(q||p)
    kl = -0.5 * torch.sum(1 + log_var - mu**2 - torch.exp(log_var), dim=1)

    # Combining the two terms in the evidence lower bound objective (ELBO) 
    # mean over batch
    ELBO = torch.mean(likelihood) - torch.mean(kl)
    
    # notice minus sign as we want to maximise ELBO
    return -ELBO, kl.sum()


# define our optimizer
# The Adam optimizer works really well with VAEs.
optimizer = optim.Adam(net.parameters(), lr=0.001)
loss_function = ELBO_loss

In [50]:
from torch.autograd import Variable

x = next(iter(train_loader))
x = Variable(x)

if cuda:
    x = x.cuda()
    
x = x.float()  

print(x.shape)

outputs = net(x)

x_hat = outputs["x_hat"]
mu, log_var = outputs["mu"], outputs["log_var"]
z = outputs["z"]

loss, kl = loss_function(x_hat, x, mu, log_var)

print(x.shape)
print(x_hat.shape)
print(z.shape)
print(loss)
print(kl)

torch.Size([8, 1024])
torch.Size([8, 1024])
torch.Size([8, 1024])
torch.Size([8, 25, 2])
tensor(711.7577, grad_fn=<NegBackward>)
tensor(2.4241, grad_fn=<SumBackward0>)


In [54]:
import os

writer = SummaryWriter()

num_epochs = 10
tmp_img = "tmp_vae_out.png"
show_sampling_points = False

train_loss, valid_loss = [], []
train_kl, valid_kl = [], []

device = torch.device("cuda:0" if cuda else "cpu")
print("Using device:", device)

for epoch in range(num_epochs):
    batch_loss, batch_kl = [], []
    net.train()
    
    # Go through each batch in the training dataset using the loader
    # Note that y is not necessarily known as it is here
    for x in train_loader:
        x = Variable(x)
        
        # This is an alternative way of putting
        # a tensor on the GPU
        x = x.to(device)
        x = x.float()  
        
        outputs = net(x)        
        x_hat = outputs['x_hat']
        mu, log_var = outputs['mu'], outputs['log_var']

        elbo, kl = loss_function(x_hat, x, mu, log_var)
        
        optimizer.zero_grad()
        elbo.backward()
        optimizer.step()
        
        batch_loss.append(elbo.item())
        batch_kl.append(kl.item())

    train_loss.append(np.mean(batch_loss))
    train_kl.append(np.mean(batch_kl))

    # Evaluate, do not propagate gradients
    with torch.no_grad():
        net.eval()
        
        # Just load a single batch from the test loader
        x = next(iter(test_loader))
        x = Variable(x)
        
        x = x.to(device)
        x = x.float()  
        
        writer.add_image("Input image example", x[:,:], global_step=epoch, dataformats="HW")

        outputs = net(x)
        x_hat = outputs['x_hat']
        mu, log_var = outputs['mu'], outputs['log_var']
        z = outputs["z"]
        
        writer.add_image("Reconstruction", x_hat[:,:], global_step=epoch, dataformats="HW")

        elbo, kl = loss_function(x_hat, x, mu, log_var)
        
        # We save the latent variable and reconstruction for later use
        # we will need them on the CPU to plot
        x = x.to("cpu")
        x_hat = x_hat.to("cpu")
        z = z.detach().to("cpu").numpy()

        valid_loss.append(elbo.item())
        valid_kl.append(kl.item())
        
        writer.add_scalar('Loss', elbo, epoch)
    
    if epoch == 0:
        continue

    with torch.no_grad():
        epsilon = torch.from_numpy(grid).float().to(device)
        samples = torch.sigmoid(net.decoder(epsilon)).detach()


Using device: cpu


In [107]:
import torchvision

def phi_transformer(S, n_phi):
    pre_process = torchvision.transforms.Compose(
    [torchvision.transforms.ToPILImage(),
     torchvision.transforms.Grayscale(num_output_channels=1),
     torchvision.transforms.Resize([32,32]),
     torchvision.transforms.ToTensor(),
     torchvision.transforms.Normalize(mean=[0.], std=[1.])])

    lists = []
    for i in range(n_phi):
        lists.append(pre_process(S[i]))
        
    S = torch.stack(lists)
    return S.numpy()

<function Tensor.float>