# Customizable VAE demo

Author: Zhihan Yang, inspired by code written by Anurag Sarkar

Date: 19/12/13

The purpose of this notebook:
- With the help of `vae-designer-demo.ipynb`, I hope to construct arbitrary VAE architectures based on various model-level and layer-level parameters.
- Specifically, I use this notebook to write a function that takes in these parameters and outputs the desired VAE for training.

Todos:
- Remove `DataParallel` because I am unsure how multi-GPU training affects model convergence. (d)
- Instead of building the VAE from one class, build two subclasses (Encoder and Decoder) and let VAE inherit from them - the benefit is that now we can use `super(VAE, self).__init__()` to directly initialize the encoder and decoder networks. (d)
    - within the init function of VAE, pytorch only collects parameters that are of certain pytorch types, which prevents me from setting attributes to instances of type Encoder and Decoder; instead, I will create two methods
- Run nb2py on this notebook.
- Use the resulting script to help train a VAE for MNIST in `vae_fast_train_demo.ipynb`

In [1]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [12]:
#export
import copy
from collections import OrderedDict

import torch.nn as nn
import torch.optim

## Ideal workflow for creating a trainable VAE

*vae-designer -(hyperparameters and values)-> design-dict -> methods for users*
- design a vae using **vae-designer**, take note of all the hyper-parameters and their values
- capture these values of all those parameters in a **design dict**
- pass the design dict to **methods for users** to get vae and optimizer

## Convolutional Sampler

In [118]:
#export
def conv_sampler(
    in_channels:int, 
    layer_num:int, 
    kernel_nums:tuple, 
    kernel_sizes:tuple, 
    strides:tuple, 
    paddings:tuple,
    final_activation:nn.Module=None,
    up_sample:bool=False,
    output_type:str='nn.Sequential'
)->nn.Sequential:
    """
    Return a convolutional sampler made of `layer_num` blocks.

    Each block is a convolution (nn.Conv2d for down-samplers, nn.ConvTranspose2d for
    up-samplers) followed by nn.BatchNorm2d and an activation: leaky ReLU for down-samplers,
    ReLU for up-samplers. The last block is followed by `final_activation` instead, or by no
    activation at all when `final_activation` is None.

    The DCGAN paper recommends that kernel sizes should be greater than 3, that strides should be
    greater than 1, and batch-normalization should be used to guarantee a healthy gradient-flow.

    :param in_channels: number of channels of the input to the first block
    :param layer_num: number of blocks to build
    :param kernel_nums: output channels of each block (at least `layer_num` entries)
    :param kernel_sizes: kernel size of each block (at least `layer_num` entries)
    :param strides: stride of each block (at least `layer_num` entries)
    :param paddings: padding of each block (at least `layer_num` entries)
    :param final_activation: module placed after the last block's batch-norm (default: None)
    :param up_sample: whether the returned sampler is a up-sampler (default: False)
    :param output_type: 'nn.Sequential' to get a ready-to-use module, or 'OrderedDict' to get
        the named layers (useful for adding extra layers before wrapping them)
    :return: an nn.Sequential or an OrderedDict of layers, depending on `output_type`
    :raises ValueError: if `output_type` is not recognised or a per-layer tuple is too short
    """

    HYPERPARAMS = {
        'conv2d-bias':False,  # set to false because bn introduces biases
        'lrelu-negslope':0.2
    }

    # fail early and loudly instead of returning None or raising IndexError half-way through
    if output_type not in ('nn.Sequential', 'OrderedDict'):
        raise ValueError(
            f"output_type must be 'nn.Sequential' or 'OrderedDict', got {output_type!r}"
        )
    per_layer_args = (
        ('kernel_nums', kernel_nums),
        ('kernel_sizes', kernel_sizes),
        ('strides', strides),
        ('paddings', paddings),
    )
    for arg_name, arg_values in per_layer_args:
        if len(arg_values) < layer_num:
            raise ValueError(
                f"{arg_name} has {len(arg_values)} entries but layer_num is {layer_num}"
            )

    # this insight comes from the dcgan paper
    if up_sample:
        core_layer = nn.ConvTranspose2d
        # NOTE: the spelling 'convtranpose2d' is kept on purpose; it is part of the
        # state-dict keys, so correcting it would break previously saved weights
        core_layer_name = 'convtranpose2d'
    else:
        core_layer = nn.Conv2d
        core_layer_name = 'conv2d'

    layers = OrderedDict()
    for i in range(layer_num):

        layers[f'block{i}-{core_layer_name}'] = core_layer(
            in_channels=in_channels,
            out_channels=kernel_nums[i],
            kernel_size=kernel_sizes[i],
            stride=strides[i],
            padding=paddings[i],
            bias=HYPERPARAMS['conv2d-bias']
        )
        layers[f'block{i}-bn'] = nn.BatchNorm2d(kernel_nums[i])

        # the activation key is always 'block{i}-lrelu', whatever the activation type is
        is_last_block = i == layer_num - 1
        if not is_last_block:
            # a fresh activation module per block, so no module object is shared between blocks
            if up_sample:
                layers[f'block{i}-lrelu'] = nn.ReLU()
            else:
                layers[f'block{i}-lrelu'] = nn.LeakyReLU(HYPERPARAMS['lrelu-negslope'])
        elif final_activation is not None:
            layers[f'block{i}-lrelu'] = final_activation

        # the next block consumes what this block produces
        in_channels = kernel_nums[i]

    if output_type == 'nn.Sequential':
        return nn.Sequential(layers)
    return layers  # useful for adding extra layers

## Design dicts (down_sampler, up_sampler, h_dim, z_dim)
Caution: Map the `down_sampler` and the `up_sampler` keys to OrderedDicts instead of nn.Sequential's.

In [127]:
#export -class:Designs
# configs of the vae used in pcgml projects
# designed with vae-designer

VAE_PCGML = {
    # encoder body: 17 -> 64 -> 128 channels, two stride-2 convolutions without padding
    'down_sampler': conv_sampler(
        in_channels=17, 
        layer_num=2, 
        kernel_nums=(64, 128), 
        kernel_sizes=(4, 4), 
        strides=(2, 2), 
        paddings=(0, 0),
        final_activation=nn.LeakyReLU(0.2),
        up_sample=False,
        output_type='OrderedDict'
    ),
    # decoder body: the VAE class reshapes the output of its z_dim -> h_dim linear layer to
    # (batch, h_dim, 1, 1) before this sampler, so the first transposed convolution must accept
    # h_dim (512) input channels; with 64 the decoder fails with a channel mismatch
    'up_sampler': conv_sampler(
        in_channels=512, 
        layer_num=3, 
        kernel_nums=(128, 64, 17), 
        kernel_sizes=(4, 4, 4), 
        strides=(1, 2, 2), 
        paddings=(0, 1, 1),
        final_activation=nn.Sigmoid(),
        up_sample=True,
        output_type='OrderedDict'
    ),
    # size of the flattened down-sampler output; 512 = 128 channels x 2 x 2, which is what the
    # down-sampler above yields for e.g. 16x16 inputs (assumed input size - TODO confirm)
    'h_dim': 512,
    # size of the latent vector
    'z_dim': 64,
}

## Custom layers

In [124]:
#export
class Flatten(nn.Module):
    """Collapse every dimension after the first one into a single dimension."""

    def forward(self, input):
        # keep the leading (batch) dimension and let view infer the flattened length
        batch_size = input.shape[0]
        flattened = input.view(batch_size, -1)
        return flattened

class UnFlatten(nn.Module):
    """Reshape a flat (batch, out_channels) input into (batch, out_channels, 1, 1).

    :param out_channels: number of channels of the reshaped output; it must equal the
        number of features per example of the input
    """

    def __init__(self, out_channels):
        # nn.Module must be initialised before the module can be called or registered
        # inside a container such as nn.Sequential
        super().__init__()
        self.out_channels = out_channels

    def forward(self, input):
        # view(batch_size, channels, height=1, width=1)
        return input.view(input.size(0), self.out_channels, 1, 1)

## VAE class (design_dict, dev)

In [125]:
#export
class VAE(nn.Module):
    """
    Trainable variational auto-encoder implemented in PyTorch.

    Data flow: `encoder` (down-sampler + flatten) -> `fc1` / `fc2` (means / log-variances)
    -> reparametrized sample -> `fc3` -> `decoder` (unflatten + up-sampler).

    :param design_dict: dict with the keys 'down_sampler' and 'up_sampler' (OrderedDicts of
        layers), 'h_dim' (size of the flattened down-sampler output) and 'z_dim' (latent size)
    :param dev: device string, stored as `self.dev`; the noise drawn in `reparametrize` follows
        the device and dtype of the tensors it receives, so `dev` is kept for compatibility
    """

    def __init__(self, design_dict:dict, dev:str):
        super(VAE, self).__init__()
        self.dev = dev

        # Deep-copy the samplers: adding layers to the caller's OrderedDicts would alter the
        # design dict, and reusing the very same layer objects would make every VAE built from
        # one design dict share (and overwrite) the same convolution weights.
        down_layers = copy.deepcopy(design_dict['down_sampler'])
        down_layers['flatten'] = Flatten()  # append a new layer at the end
        self.encoder = nn.Sequential(down_layers)

        h_dim, z_dim = design_dict['h_dim'], design_dict['z_dim']
        self.fc1 = nn.Linear(h_dim, z_dim)  # get means
        self.fc2 = nn.Linear(h_dim, z_dim)  # get logvars
        self.fc3 = nn.Linear(z_dim, h_dim)  # process the samples for the up_sampler

        up_layers = copy.deepcopy(design_dict['up_sampler'])
        up_layers['unflatten'] = UnFlatten(out_channels=h_dim)
        up_layers.move_to_end('unflatten', last=False)  # move the new layer to the front
        self.decoder = nn.Sequential(up_layers)

    def reparametrize(self, mu:torch.Tensor, logvar:torch.Tensor)->torch.Tensor:
        """
        Helper method to self.bottleneck.

        Sample latent vectors as `mu + std * eps`, where `std = exp(0.5 * logvar)` and `eps` is
        standard normal noise.
        """
        std = torch.exp(0.5 * logvar)  # logvar to std
        # draw the noise with the same shape, dtype and device as std, so no device transfer
        # or dtype promotion takes place
        eps = torch.randn_like(std)  # number of std
        return mu + std * eps  # sample latent vectors

    def bottleneck(self, h:torch.Tensor)->tuple:
        """
        Helper method to self.encode.

        Map encoder features to means and log-variances, then sample from them.

        :return: tuple (z, mu, logvar)
        """
        mu, logvar = self.fc1(h), self.fc2(h)
        z = self.reparametrize(mu, logvar)
        return z, mu, logvar

    def encode(self, x:torch.Tensor)->tuple:
        """
        Helper method to self.forward.

        :return: tuple (z, mu, logvar) for the input batch `x`
        """
        h = self.encoder(x)
        return self.bottleneck(h)

    def decode(self, z:torch.Tensor)->torch.Tensor:
        """
        Helper method to self.forward.

        :return: the output of the decoder for the latent vectors `z`
        """
        h = self.fc3(z)
        return self.decoder(h)

    def forward(self, x:torch.Tensor)->tuple:
        """
        Encode `x`, sample latent vectors and decode them.

        :return: tuple (decoder output, mu, logvar)
        """
        z, mu, logvar = self.encode(x)
        return self.decode(z), mu, logvar

## Methods for users

In [126]:
#export
def get_vae_and_opt(design_dict:dict, dev:str):
    """
    Build a trainable VAE from a design dict together with an Adam optimizer for it.

    :param design_dict: the design dict of a VAE architecture
    :param dev: device passed to the VAE constructor and onto which the model is moved
    :return: tuple (vae, opt)
    """
    # build the model, move its parameters to dev and cast them to float64
    model = VAE(design_dict, dev).to(dev).double()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    return model, optimizer

def load_vae(path:str, design_dict:dict, dev:str='cpu'):
    """
    Build a VAE from a design dict and fill it with trained weights read from disk.

    :param path: the path to the trained weights
    :param design_dict: the design dict of a VAE architecture
    :param dev: where the resulting model would exist (options: 'cpu', 'cuda') (default: 'cpu')
    :return: the VAE with the stored weights loaded
    """
    # same construction as for training: parameters on dev, cast to float64
    model = VAE(design_dict, dev).to(dev).double()
    # NOTE(review): torch.load may unpickle arbitrary objects - only load files you trust
    state_dict = torch.load(path, map_location=dev)
    model.load_state_dict(state_dict)
    return model