Clone the repo below to virtual machine.

In [0]:
# !git clone https://github.com/yguo121/VAE_SVHN

In [0]:
# !git pull

Set working directory.

In [3]:
import os
os.chdir('/content/VAE_SVHN')
# !ls
!pwd

/content/VAE_SVHN


First upload `train_32x32.mat` and `test_32x32.mat` to virtual machine; then run the cell below to move these files to the current working directory.

In [0]:
# !mv ../test_32x32.mat test_32x32.mat
# !mv ../train_32x32.mat train_32x32.mat

In [0]:
# import packages
from __future__ import division
from __future__ import print_function
import os
import tensorflow as tf
import numpy as np
import scipy.io as sio
import matplotlib
from matplotlib import pyplot as plt
from dataset import *
from sys import exit
# from IPython import get_ipython

# Some Settings
# get_ipython().magic('matplotlib inline')
np.random.seed(0)
tf.set_random_seed(0)

In [6]:
# Load SVHN Data
print(os.getcwd())
dataset = SVHNDataset('.')
n_samples=dataset.train_size

/content/VAE_SVHN
Loading files
./train_32x32.mat
Convert to grey
SVHN loaded into memory


In [0]:
class VariationalAutoencoder(object):
    """ Variation Autoencoder (VAE) with an sklearn-like interface implemented using TensorFlow.
    https://jmetzen.github.io/2015-11-27/vae.html
    See "Auto-Encoding Variational Bayes" by Kingma and Welling for more details.
    """
    def __init__(self, network_architecture, transfer_fct=tf.nn.relu, 
                 learning_rate=0.001, batch_size=100):
        self.network_architecture = network_architecture
        self.transfer_fct = transfer_fct
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        # tf Graph input
        self.x = tf.placeholder(tf.float32, [None, network_architecture["n_input"]])
#        self.x = tf.placeholder(tf.float32, [None, 32, 32, 1])
        
        # Create autoencoder network
        self._create_network()
        # Define loss function based variational upper-bound and 
        # corresponding optimizer
        self._create_loss_optimizer()
        
        # Initializing the tensor flow variables
        init = tf.global_variables_initializer()

        # Launch the session
        self.sess = tf.InteractiveSession()
        self.sess.run(init)
    
    def _create_network(self):
        # Initialize autoencode network weights and biases
        network_weights = self._initialize_weights(**self.network_architecture)

        # Use recognition network to determine mean and 
        # (log) variance of Gaussian distribution in latent
        # space
        self.z_mean, self.z_log_sigma_sq = self._recognition_network(network_weights["weights_recog"], 
                                      network_weights["biases_recog"])

        # Draw one sample z from Gaussian distribution
        n_z = self.network_architecture["n_z"]
        eps = tf.random_normal((self.batch_size, n_z), 0, 1, 
                               dtype=tf.float32)
        # z = mu + sigma*epsilon
        self.z = tf.add(self.z_mean, 
                        tf.multiply(tf.sqrt(tf.exp(self.z_log_sigma_sq)), eps))

        # Use generator to determine mean of
        # Bernoulli distribution of reconstructed input
        self.x_reconstr_mean =             self._generator_network(network_weights["weights_gener"],
                                    network_weights["biases_gener"])
            
    def _initialize_weights(self, n_hidden_recog_1, n_hidden_recog_2, 
                            n_hidden_gener_1,  n_hidden_gener_2, 
                            n_input, n_z):
        all_weights = dict()
        initializer = tf.contrib.layers.xavier_initializer()
        all_weights['weights_recog'] = {
            'h1': tf.Variable(initializer(shape = [n_input, n_hidden_recog_1])),
            'h2': tf.Variable(initializer(shape = [n_hidden_recog_1, n_hidden_recog_2])),
            'out_mean': tf.Variable(initializer(shape = [n_hidden_recog_2, n_z])),
            'out_log_sigma': tf.Variable(initializer(shape = [n_hidden_recog_2, n_z]))}
        all_weights['biases_recog'] = {
            'b1': tf.Variable(tf.zeros([n_hidden_recog_1], dtype=tf.float32)),
            'b2': tf.Variable(tf.zeros([n_hidden_recog_2], dtype=tf.float32)),
            'out_mean': tf.Variable(tf.zeros([n_z], dtype=tf.float32)),
            'out_log_sigma': tf.Variable(tf.zeros([n_z], dtype=tf.float32))}
        all_weights['weights_gener'] = {
            'h1': tf.Variable(initializer(shape = [n_z, n_hidden_gener_1])),
            'h2': tf.Variable(initializer(shape = [n_hidden_gener_1, n_hidden_gener_2])),
            'out_mean': tf.Variable(initializer(shape = [n_hidden_gener_2, n_input])),
            'out_log_sigma': tf.Variable(initializer(shape = [n_hidden_gener_2, n_input]))}
        all_weights['biases_gener'] = {
            'b1': tf.Variable(tf.zeros([n_hidden_gener_1], dtype=tf.float32)),
            'b2': tf.Variable(tf.zeros([n_hidden_gener_2], dtype=tf.float32)),
            'out_mean': tf.Variable(tf.zeros([n_input], dtype=tf.float32)),
            'out_log_sigma': tf.Variable(tf.zeros([n_input], dtype=tf.float32))}
        return all_weights
            
    def _recognition_network(self, weights, biases):
        # Generate probabilistic encoder (recognition network), which
        # maps inputs onto a normal distribution in latent space.
        # The transformation is parametrized and can be learned.
        layer_1 = self.transfer_fct(tf.add(tf.matmul(self.x, weights['h1']), 
                                           biases['b1'])) 
        layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']), 
                                           biases['b2']))
#        e_conv1 = self.transfer_fct(tf.layers.conv2d(self.x, 16, 5, strides=2, name='e_conv1'))
#        e_conv2 = self.transfer_fct(tf.layers.conv2d(e_conv1, 32, 5, strides=2,   name='e_conv2'))
#        e_conv2_flat = tf.reshape(e_conv2, [batch_size, -1])
#        z_mean = tf.layers.dense(e_conv2_flat, 32, name='mean')
#        z_log_sigma_sq = tf.layers.dense(e_conv2_flat, 32, name='stddev')
        z_mean = tf.add(tf.matmul(layer_2, weights['out_mean']),
                        biases['out_mean'])
        z_log_sigma_sq =             tf.add(tf.matmul(layer_2, weights['out_log_sigma']), 
                   biases['out_log_sigma'])
        return (z_mean, z_log_sigma_sq)

    def _generator_network(self, weights, biases):
        # Generate probabilistic decoder (decoder network), which
        # maps points in latent space onto a Bernoulli distribution in data space.
        # The transformation is parametrized and can be learned.
        layer_1 = self.transfer_fct(tf.add(tf.matmul(self.z, weights['h1']), 
                                           biases['b1'])) 
        layer_2 = self.transfer_fct(tf.add(tf.matmul(layer_1, weights['h2']), 
                                           biases['b2'])) 
        x_reconstr_mean =             tf.nn.sigmoid(tf.add(tf.matmul(layer_2, weights['out_mean']), 
                                 biases['out_mean']))
#        d_fc1 = self.transfer_fct(tf.layers.dense(self.z, 7*7*32, name='d_fc1'))
#        d_fc1 = tf.reshape(d_fc1, [batch_size, 7,7,32])
#        e_transpose_conv1 = self.transfer_fct(tf.layers.conv2d_transpose(d_fc1, 16, 5, strides=2, name='e_transpose_conv1'))
#        e_transpose_conv2 = self.transfer_fct(tf.layers.conv2d_transpose(e_transpose_conv1, 1, 5, strides=2, name='e_transpose_conv2'))
#        x_reconstr_mean = e_transpose_conv2[:,:32,:32,:]
        
        return x_reconstr_mean
            
    def _create_loss_optimizer(self):
        # The loss is composed of two terms:
        # 1.) The reconstruction loss (the negative log probability
        #     of the input under the reconstructed Bernoulli distribution 
        #     induced by the decoder in the data space).
        #     This can be interpreted as the number of "nats" required
        #     for reconstructing the input when the activation in latent
        #     is given.
        # Adding 1e-10 to avoid evaluation of log(0.0)
        reconstr_loss =             -tf.reduce_sum(self.x * tf.log(1e-10 + self.x_reconstr_mean)
                           + (1-self.x) * tf.log(1e-10 + 1 - self.x_reconstr_mean),
                           1)
        # 2.) The latent loss, which is defined as the Kullback Leibler divergence 
        ##    between the distribution in latent space induced by the encoder on 
        #     the data and some prior. This acts as a kind of regularizer.
        #     This can be interpreted as the number of "nats" required
        #     for transmitting the the latent space distribution given
        #     the prior.
        latent_loss = -0.5 * tf.reduce_sum(1 + self.z_log_sigma_sq 
                                           - tf.square(self.z_mean) 
                                           - tf.exp(self.z_log_sigma_sq), 1)
        self.cost = tf.reduce_mean(reconstr_loss + latent_loss)   # average over batch
        # Use ADAM optimizer
        self.optimizer =             tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.cost)
        
    def partial_fit(self, X):
        """Train model based on mini-batch of input data.
        
        Return cost of mini-batch.
        """
        opt, cost = self.sess.run((self.optimizer, self.cost), 
                                  feed_dict={self.x: X})
        return cost
    
    def transform(self, X):
        """Transform data by mapping it into the latent space."""
        # Note: This maps to mean of distribution, we could alternatively
        # sample from Gaussian distribution
        return self.sess.run(self.z_mean, feed_dict={self.x: X})
    
    def generate(self, z_mu=None):
        """ Generate data by sampling from latent space.
        
        If z_mu is not None, data for this point in latent space is
        generated. Otherwise, z_mu is drawn from prior in latent 
        space.        
        """
        if z_mu is None:
            z_mu = np.random.normal(size=self.network_architecture["n_z"])
        # Note: This maps to mean of distribution, we could alternatively
        # sample from Gaussian distribution
        return self.sess.run(self.x_reconstr_mean, 
                             feed_dict={self.z: z_mu})
    
    def reconstruct(self, X):
        """ Use VAE to reconstruct given data. """
        return self.sess.run(self.x_reconstr_mean, 
                             feed_dict={self.x: X})




# Train Function
def train(network_architecture, learning_rate=0.001,
          batch_size=100, training_epochs=10, display_step=1):
    vae = VariationalAutoencoder(network_architecture, 
                                 learning_rate=learning_rate, 
                                 batch_size=batch_size)
    # Training cycle
    for epoch in range(training_epochs):
        avg_cost = 0.
        total_batch = int(n_samples / batch_size)
        # Loop over all batches
        for i in range(total_batch):
            batch_xs= dataset.next_batch(batch_size).reshape(batch_size,1024)
#            batch_xs= dataset.next_batch(batch_size).reshape(batch_size, 32, 32, 1)
            # Fit training using batch data
            cost = vae.partial_fit(batch_xs)
            # Compute average loss
            avg_cost += cost / n_samples * batch_size

        # Display logs per epoch step
        if epoch % display_step == 0:
            print("Epoch:", '%04d' % (epoch+1), 
                  "cost=", "{:.9f}".format(avg_cost))
    return vae

In [0]:

# ## Illustrating reconstruction quality

# We can now train a VAE on SVHN by just specifying the network topology. We start with training a VAE with a 20-dimensional latent space.
VAE_EPOCHS = 30
VAE2D_EPOCHS = 20

network_architecture = dict(n_hidden_recog_1=500, # 1st layer encoder neurons
                            n_hidden_recog_2=500, # 2nd layer encoder neurons
                            n_hidden_gener_1=500, # 1st layer decoder neurons
                            n_hidden_gener_2=500, # 2nd layer decoder neurons
                            n_input=1024, # 
                            n_z=32)  # dimensionality of latent space

vae = train(network_architecture, training_epochs=VAE_EPOCHS)


# Based on this we can sample some test inputs and visualize how well the VAE can reconstruct those. In general the VAE does really well.

x_sample = dataset.next_test_batch(100)[0].reshape(100,1024)
x_reconstruct = vae.reconstruct(x_sample)
plt.figure(figsize=(8, 12))
for i in range(5):
    plt.subplot(5, 2, 2*i + 1)
    plt.imshow(x_sample[i].reshape(32, 32), vmin=0, vmax=1, cmap="gray")
    plt.title("Test input")
    plt.colorbar()
    plt.subplot(5, 2, 2*i + 2)
    plt.imshow(x_reconstruct[i].reshape(32, 32), vmin=0, vmax=1, cmap="gray")
    plt.title("Reconstruction")
    plt.colorbar()
plt.tight_layout()


# ## Illustrating latent space

# Next, we train a VAE with 2d latent space and illustrates how the encoder (the recognition network) encodes some of the labeled inputs (collapsing the Gaussian distribution in latent space to its mean). This gives us some insights into the structure of the learned manifold (latent space)

network_architecture = dict(n_hidden_recog_1=500, # 1st layer encoder neurons
                            n_hidden_recog_2=500, # 2nd layer encoder neurons
                            n_hidden_gener_1=500, # 1st layer decoder neurons
                            n_hidden_gener_2=500, # 2nd layer decoder neurons
                            n_input=1024, 
                            n_z=2)  # dimensionality of latent space

vae_2d = train(network_architecture, training_epochs=VAE2D_EPOCHS)


#x_sample = dataset.next_test_batch(5000).reshape(100,1024,3)[:,:,1].reshape(5000,1024)
x_sample, y_sample = dataset.next_test_batch(5000)
x_sample = x_sample.reshape(5000,1024)
z_mu = vae_2d.transform(x_sample)
plt.figure(figsize=(8, 6)) 
plt.scatter(z_mu[:, 0], z_mu[:, 1], c=y_sample.reshape(5000))
plt.colorbar()
plt.grid()


# An other way of getting insights into the latent space is to use the generator network to plot reconstrunctions at the positions in the latent space for which they have been generated:

nx = ny = 20
x_values = np.linspace(-3, 3, nx)
y_values = np.linspace(-3, 3, ny)

canvas = np.empty((32*ny, 32*nx))
for i, yi in enumerate(x_values):
    for j, xi in enumerate(y_values):
        z_mu = np.array([[xi, yi]]*vae.batch_size)
        x_mean = vae_2d.generate(z_mu)
        canvas[(nx-i-1)*32:(nx-i)*32, j*32:(j+1)*32] = x_mean[0].reshape(32, 32)

plt.figure(figsize=(8, 10))        
Xi, Yi = np.meshgrid(x_values, y_values)
plt.imshow(canvas, origin="upper", cmap="gray")
plt.tight_layout()



Epoch: 0001 cost= 661.694837503
Epoch: 0002 cost= 648.476237332
Epoch: 0003 cost= 647.376368103
Epoch: 0004 cost= 647.095272619
Epoch: 0005 cost= 646.946242425
Epoch: 0006 cost= 646.687570656
Epoch: 0007 cost= 646.249926665
Epoch: 0008 cost= 645.952744350
Epoch: 0009 cost= 645.716548035
Epoch: 0010 cost= 645.625780309
Epoch: 0011 cost= 645.508587177
Epoch: 0012 cost= 645.446675781
Epoch: 0013 cost= 645.404792585
Epoch: 0014 cost= 645.255426626
Epoch: 0015 cost= 645.067208108
Epoch: 0016 cost= 644.889021228
Epoch: 0017 cost= 644.791205927
Epoch: 0018 cost= 644.767307843
Epoch: 0019 cost= 644.706358169
Epoch: 0020 cost= 644.623626151
Epoch: 0021 cost= 644.558326357
Epoch: 0022 cost= 644.455421669
Epoch: 0023 cost= 644.390798155
Epoch: 0024 cost= 644.369331746
Epoch: 0025 cost= 644.300296541
Epoch: 0026 cost= 644.304770387
Epoch: 0027 cost= 644.257256996
Epoch: 0028 cost= 644.222063946
Epoch: 0029 cost= 644.184451637
Epoch: 0030 cost= 644.175194259




Epoch: 0001 cost= 676.469367418
Epoch: 0002 cost= 657.498972741
Epoch: 0003 cost= 655.159636015
Epoch: 0004 cost= 653.101496503
Epoch: 0005 cost= 651.930687704
Epoch: 0006 cost= 651.642836336
Epoch: 0007 cost= 651.410387930
Epoch: 0008 cost= 651.279048992
Epoch: 0009 cost= 651.140170579
