## Convolutional Variational Autoencoder

## Import TensorFlow and other libraries

In [None]:
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow as tf

import numpy as np
import matplotlib.pyplot as plt
import time
from matplotlib import offsetbox
from sklearn import (manifold, datasets, decomposition, ensemble,
                     discriminant_analysis, random_projection, neighbors)

## Load the MNIST dataset
Each MNIST image is originally a vector of 784 integers, each of which is between 0-255 and represents the intensity of a pixel. We model each pixel with a Bernoulli distribution in our model, and we statically binarize the dataset.

In [None]:
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

As a localization target, we can use the specific manner in which some digit is written. Thus, VAE can be trained on a single digit.

In [None]:
mask = (train_labels == 2) | (train_labels == 7)
mask2 = (test_labels == 2) | (test_labels == 7)
train_images = train_images[mask]
test_images = test_images[mask2]

In [None]:
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1).astype('float32')
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1).astype('float32')

# Normalizing the images to the range of [0., 1.]
train_images /= 255.
test_images /= 255.

# Binarization

train_images[train_images >= .5] = 1.
train_images[train_images < .5] = 0.
test_images[test_images >= .5] = 1.
test_images[test_images < .5] = 0.


In [None]:
TRAIN_BUF = 60000
BATCH_SIZE = 100

TEST_BUF = 10000

Visualize some of the training images

In [None]:
fig = plt.figure(figsize=(5,5))
for i in range(25):
  plt.subplot(5, 5, i+1)
  plt.imshow(train_images[i, :, :, 0], cmap='gray')
  plt.axis('off')
plt.show()

## Use *tf.data* to create batches and shuffle the dataset

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(TRAIN_BUF).batch(BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices(test_images).shuffle(TEST_BUF).batch(BATCH_SIZE)

## Wire up the generative and inference network with *tf.keras.Sequential*

In our VAE example, we use two small ConvNets for the generative and inference network. Since these neural nets are small, we use `tf.keras.Sequential` to simplify our code. Let $x$ and $z$ denote the observation and latent variable respectively in the following descriptions.

### Generative Network
This defines the generative model which takes a latent encoding as input, and outputs the parameters for a conditional distribution of the observation, i.e. $p(x|z)$. Additionally, we use a unit Gaussian prior $p(z)$ for the latent variable.

### Inference Network
This defines an approximate posterior distribution $q(z|x)$, which takes as input an observation and outputs a set of parameters for the conditional distribution of the latent representation. In this example, we simply model this distribution as a diagonal Gaussian. In this case, the inference network outputs the mean and log-variance parameters of a factorized Gaussian (log-variance instead of the variance directly is for numerical stability).

### Reparameterization Trick
During optimization, we can sample from $q(z|x)$ by first sampling from a unit Gaussian, and then multiplying by the standard deviation and adding the mean. This ensures the gradients could pass through the sample to the inference network parameters.

### Network architecture
For the inference network, we use two convolutional layers followed by a fully-connected layer. In the generative network, we mirror this architecture by using a fully-connected layer followed by three convolution transpose layers (a.k.a. deconvolutional layers in some contexts). Note, it's common practice to avoid using batch normalization when training VAEs, since the additional stochasticity due to using mini-batches may aggravate instability on top of the stochasticity from sampling.

In [None]:
class CVAE(tf.keras.Model):
  def __init__(self, latent_dim):
    super(CVAE, self).__init__()
    self.latent_dim = latent_dim
    self.inference_net = tf.keras.Sequential(
      [
          tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
          tf.keras.layers.Conv2D(
              filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
          tf.keras.layers.Conv2D(
              filters=64, kernel_size=3, strides=(2, 2), activation='relu'),
          tf.keras.layers.Flatten(),
          # No activation
          tf.keras.layers.Dense(latent_dim + latent_dim),
      ]
    )

    self.generative_net = tf.keras.Sequential(
        [
          tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
          tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),
          tf.keras.layers.Reshape(target_shape=(7, 7, 32)),
          tf.keras.layers.Conv2DTranspose(
              filters=64,
              kernel_size=3,
              strides=(2, 2),
              padding="SAME",
              activation='relu'),
          tf.keras.layers.Conv2DTranspose(
              filters=32,
              kernel_size=3,
              strides=(2, 2),
              padding="SAME",
              activation='relu'),
          # No activation
          tf.keras.layers.Conv2DTranspose(
              filters=1, kernel_size=3, strides=(1, 1), padding="SAME"),
        ]
    )

  @tf.function
  def sample(self, eps=None):
    if eps is None:
      eps = tf.random.normal(shape=(100, self.latent_dim))
    return self.decode(eps, apply_sigmoid=True)

  def encode(self, x):
    mean, logvar = tf.split(self.inference_net(x), num_or_size_splits=2, axis=1)
    return mean, logvar

  def reparameterize(self, mean, logvar):
    eps = tf.random.normal(shape=mean.shape)
    return eps * tf.exp(logvar * .5) + mean

  def decode(self, z, apply_sigmoid=False):
    logits = self.generative_net(z)
    if apply_sigmoid:
      probs = tf.sigmoid(logits)
      return probs

    return logits

## Define the loss function and the optimizer

VAEs train by maximizing the evidence lower bound (ELBO) on the marginal log-likelihood:

$$\log p(x) \ge \text{ELBO} = \mathbb{E}_{q(z|x)}\left[\log \frac{p(x, z)}{q(z|x)}\right].$$

In practice, we optimize the single sample Monte Carlo estimate of this expectation:

$$\log p(x| z) + \log p(z) - \log q(z|x),$$
where $z$ is sampled from $q(z|x)$.

**Note**: we could also analytically compute the KL term, but here we incorporate all three terms in the Monte Carlo estimator for simplicity.

In [None]:
optimizer = tf.keras.optimizers.Adam(1e-4)
def log_normal_pdf(sample, mean, logvar, raxis=1):
  log2pi = tf.math.log(2. * np.pi)
  return tf.reduce_sum(
      -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),
      axis=raxis)

@tf.function
def compute_loss(model, x):
  mean, logvar = model.encode(x)
  z = model.reparameterize(mean, logvar)
  x_logit = model.decode(z)

  cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
  logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])
  logpz = log_normal_pdf(z, 0., 0.)
  logqz_x = log_normal_pdf(z, mean, logvar)
  return -tf.reduce_mean(logpx_z + logpz - logqz_x)

@tf.function
def compute_apply_gradients(model, x, optimizer):
  with tf.GradientTape() as tape:
    loss = compute_loss(model, x)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

## Training

* We start by iterating over the dataset
* During each iteration, we pass the image to the encoder to obtain a set of mean and log-variance parameters of the approximate posterior $q(z|x)$
* We then apply the *reparameterization trick* to sample from $q(z|x)$
* Finally, we pass the reparameterized samples to the decoder to obtain the logits of the generative distribution $p(x|z)$
* **Note:** Since we use the dataset loaded by keras with 60k datapoints in the training set and 10k datapoints in the test set, our resulting ELBO on the test set is slightly higher than reported results in the literature which uses dynamic binarization of Larochelle's MNIST.

## Generate Images

* After training, it is time to generate some images
* We start by sampling a set of latent vectors from the unit Gaussian prior distribution $p(z)$
* The generator will then convert the latent sample $z$ to logits of the observation, giving a distribution $p(x|z)$
* Here we plot the probabilities of Bernoulli distributions


In [None]:
epochs = 150
latent_dim = 16
testing = False

model = CVAE(latent_dim)

In [None]:
if testing == False:
  for epoch in range(1, epochs + 1):
    start_time = time.time()
    for train_x in train_dataset:
      compute_apply_gradients(model, train_x, optimizer)
    end_time = time.time()

    if epoch % 1 == 0:
      loss = tf.keras.metrics.Mean()
      for test_x in test_dataset:
        loss(compute_loss(model, test_x))
      elbo = -loss.result()
      print('Epoch: {}, Test set ELBO: {}, '
            'time elapse for current epoch {}'.format(epoch,
                                                      elbo,
                                                      end_time - start_time))

Test generating some images from randomly selected points in the latent space (matt's addition)

In [None]:
if testing == False:
  tf.random.set_seed(1234)
  latent_points = tf.random.normal(shape=[25, latent_dim])
  output_points = model.sample(latent_points)
  fig = plt.figure(figsize=(5,5))
  fig.suptitle("Latent Space Digits, " +  str(epochs) +" epochs", fontsize = 16)

  for i in range(25):
    epoch_img = plt.subplot(5, 5, i+1)
    plt.imshow(output_points[i, :, :, 0], cmap='gray')
    plt.axis('off')
  fig.savefig(str(epochs)+"_epochs.png")
  plt.show()


If trying to save output images for a specific range of epochs, this code will run instead of the above two.

In [None]:
if testing:
  latent_points = tf.random.normal(shape = [25, latent_dim])
  get_epoch_img = 100
  end_epochs = 2000

  for epoch in range(1, end_epochs + 1):
      for train_x in train_dataset:
        compute_apply_gradients(model, train_x, optimizer)
      
      if epoch % 1 == 0:
        loss = tf.keras.metrics.Mean()
        for test_x in test_dataset:
          loss(compute_loss(model, test_x))
        elbo = -loss.result()
        print('Epoch: {}'.format(epoch))
      if epoch == get_epoch_img:

        output_points = model.sample(latent_points)
        print('Image generated at: {}'.format(epoch))

        
        fig = plt.figure(figsize=(5,5))
        fig.suptitle("Latent Space Digits, " +  str(epoch) +" epochs", fontsize = 16)

        for i in range(25):
          epoch_img = plt.subplot(5, 5, i+1)
          plt.imshow(output_points[i, :, :, 0], cmap='gray')
          plt.axis('off')
        if epoch < 1000:
          num = "0" + str(epoch)
        else:
          num = str(epoch)
        fig.savefig("/content/"+"epochs_"+num+".png")

        get_epoch_img+=100

#Generate gif from images...will not be in order (ie 225, 25, 250, 275, 300, 75)
import os
import imageio

images = []

directory = "/content"
for filename in os.listdir(directory):
    if filename.endswith(".png"):
      images.append(imageio.imread(filename))

imageio.mimsave("/content/" + str(epoch)+"_epochs.gif", images)

        



# Localization Code

In [None]:
import cvxopt

from cvxopt import matrix
from cvxopt import solvers

def sum_square(X):
    return sum(np.square(X))

def localize(Data):
    # Take an input Data, place into standard form
    #Solve using cvxopt
    
    #%%  Problem data
    k = Data.nComparisons
    n = Data.nDims
    
    Xi = Data.Xi # 2 x k array
    Xj = Data.Xj
    
    #%%  START OF LOCALIZATION FROM PAIRED COMPARISONS ############
    
    ##### SET UP P MATRIX
    P = np.zeros((n+k,n+k))
    P[0:n,0:n] = np.eye(n,n)
    
    ##### SET UP h MATRIX
    b = sum_square(Xi) - sum_square(Xj)
    b = b.reshape(1,k)
    
    b = b.transpose()
    b = -b
    h = np.vstack((b, np.zeros((k,1)))) #2k x 1 vector
    
    #%% set up G matrix
    G11 = 2 * (Xj - Xi)
    G11 = G11.transpose() # k x n
    
    G21 = np.zeros((k,n)) # k x n
    
    G12 = -1 * np.eye(k,k) # k x k
    
    G22 = -1 * np.eye(k,k)
    
    G1 = np.hstack([G11, G12])
    G2 = np.hstack([G21, G22])
    
    G = np.vstack([G1,G2])
    #%% set up q matrix (n + k x 1)
    q1 = np.zeros((n,1))
    q2 = np.ones((k,1))
    q = np.vstack([q1,q2])
    
    #%% Now convert into cvxopt matrix form
    
    P = matrix(P)
    q = matrix(q)
    G = matrix(G)
    h = matrix(h)
    
    sol=cvxopt.solvers.qp(P, q, G, h)
    return sol

# Data Generation

In [None]:
# Similar to data object produced by MATLAB
class ComparisonData:
  def __init__(self, nComparisons, nDims):
    self.nComparisons = nComparisons
    self.nDims = nDims
    self.Xi = np.zeros((nDims, nComparisons))
    self.Xj = np.zeros((nDims, nComparisons))

Code below actually generates images from random points in latent space and prompts the user to compare them. 

Enter '1' if the image on the left is closer to your desired image than the one on the right. Enter anything else otherwise. 

In [None]:
def generate_image(model, test_input):
  predictions = model.sample(test_input)
  fig = plt.figure(figsize=(5,5)) # doesn't really have to be 5 by 5

  for i in range(predictions.shape[0]):
      plt.subplot(5, 5, i+1)
      plt.imshow(predictions[i, :, :, 0], cmap='gray')
      plt.axis('off')

  plt.show()

In [None]:
# Generate and label data 
nComparisons = 100
data = ComparisonData(nComparisons, latent_dim)

for i in range(data.nComparisons):
  paired_points = tf.random.normal(shape=[2, data.nDims])
  generate_image(model, paired_points)
  
  closer = input();
  if closer == 1:
    data.Xi[:, i] = paired_points[0];
    data.Xj[:, i] = paired_points[1];
  else:
    data.Xi[:, i] = paired_points[1];
    data.Xj[:, i] = paired_points[0];

##Solve localization and show target image

In [None]:
sol = localize(data)
point = sol['x'][:data.nDims]
point = np.array(point)
point.shape = (1, data.nDims)

In [None]:
generate_image(model, point)

In [None]:
def plot_embedding(X, title=None):
    y = train_labels[indeces]
    x_min, x_max = np.min(X, 0), np.max(X, 0)
    X = (X - x_min) / (x_max - x_min)

    plt.figure()
    ax = plt.subplot(111)
    for i in range(X.shape[0]):
        plt.text(X[i, 0], X[i, 1], str(y[i]),
                 color=plt.cm.Set1(y[i] / 10.),
                 fontdict={'weight': 'bold', 'size': 9})

    if hasattr(offsetbox, 'AnnotationBbox'):
        # only print thumbnails with matplotlib > 1.0
        shown_images = np.array([[1., 1.]])  # just something big
        for i in range(X.shape[0]):
            dist = np.sum((X[i] - shown_images) ** 2, 1)
            if np.min(dist) < 4e-3:
                # don't show points that are too close
                continue
            shown_images = np.r_[shown_images, [X[i]]]
            # imagebox = offsetbox.AnnotationBbox(
            #     offsetbox.OffsetImage(train_images[i], cmap=plt.cm.gray_r),
            #     X[i])
            # ax.add_artist(imagebox)
    plt.xticks([]), plt.yticks([])
    if title is not None:
        plt.title(title)

In [None]:
# pick n elements from train data to visualize
# takes too long to compute for all
n = 1000
indeces = np.random.choice(train_images.shape[0], n, replace=False)
X = np.reshape(train_images, (60000, 28*28))
X = X[indeces, :]
tsne = manifold.TSNE(n_components=2, init='pca', random_state=0)
X_tsne = tsne.fit_transform(X)
# y = train_labels[indeces]
plot_embedding(X_tsne, "t-SNE embedding of the digits in image space")

In [None]:
mean, logvar = model.encode(train_images[indeces])
z = model.reparameterize(mean, logvar)
z_tsne = tsne.fit_transform(z)
plot_embedding(z_tsne, "t-SNE embedding of the digits in latent space")