# Autoencoders

## 1. PCA usando SVD

Antes de abordar la implementación de autoencoders para el aprendizaje de representaciones partamos con recordar el tipo de representaciones que podemos aprender con un método analítico lineal como el análisis de componentes principales (PCA).

In [None]:
#
# Importar librerías que usaremos más adelante
#
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from scipy.io import loadmat
import tensorflow as tf

%matplotlib inline
plt.style.use('ggplot')

In [None]:
#
# Cargar y visualizar el conjunto de datos que emplearemos
#
ex7data1 = loadmat('ex7data1.mat')
X = np.array(ex7data1['X'])
plt.axes().set_aspect('equal', 'datalim')
plt.xlabel('x1')
plt.ylabel('x2')
plt.scatter(X[:,0], X[:,1], marker='o', cmap='prism')
plt.show()

Antes de aplicar PCA es importante normalizar los datos. Los centraremos para tener una media $\mu \approx 0$ y varianza $\sigma^2 \approx 1$

In [None]:
mu = np.mean(X, axis=0)
sigma = np.std(X, axis=0)

print('Antes de normalizar')
print('mu    = {}'.format(mu))
print('sigma = {}'.format(sigma))
print('\nPrimeros 5 puntos:\n', X[:5])

X_norm = (X - mu) / sigma

mu_norm = np.mean(X_norm, axis=0)
sigma_norm = np.std(X_norm, axis=0)

print('\n\nDespués de normalizar')
print('mu    = {}'.format(mu_norm))
print('sigma = {}'.format(sigma_norm))
print('\nPrimeros 5 puntos:\n', X_norm[:5])

#
# Cargar y visualizar el conjunto de datos que emplearemos
#
ex7data1 = loadmat('ex7data1.mat')
X = np.array(ex7data1['X'])
plt.axes().set_aspect('equal', 'datalim')
plt.xlabel('x1')
plt.ylabel('x2')
plt.scatter(X_norm[:,0], X_norm[:,1], marker='o', cmap='prism')
plt.show()

Aplicando descomposición de valores singulares obtenemos las matrices $U$, $\Sigma$ y $V^\top$ tales que:

$$X = U \Sigma V^\top$$

* Las filas de $V^\top$ corresponden a los **vectores de cargas** de los componentes principales de $X$.
* Las columnas de $U \Sigma$ (o, lo que es lo mismo, $XV$) corresponden a los **vectores de puntajes** de los componentes principales de $X$.
* $\Sigma$ es una matriz diagonal que contiene la **varianza** $\sigma^2$ en cada uno de los ejes de los componentes principales de $X$

In [None]:
#
# SVD
#
(U, s, Vt) = np.linalg.svd(X_norm)

#
# Vectores de cargas
#
pca_loadings = Vt

print('El vector de cargas del primer componente principal es: {} ({:.1f}°)'.format(pca_loadings[0,:],
                                                                               np.arctan(pca_loadings[0,0] / pca_loadings[0,1]) * 360 / (2 * np.pi)))
print('El vector de cargas del primer componente principal es: {} ({:.1f}°)'.format(pca_loadings[1,:],
                                                                               np.arctan(pca_loadings[1,0] / pca_loadings[1,1]) * 360 / (2 * np.pi)))


In [None]:
#
# Reconstruimos Sigma para calcular la varianza explicada por cada componente
#
explained_variance = (s * s) / X_norm.shape[0]

print('Varianza explicada por cada componente principal:')
print(explained_variance)

In [None]:
#
# Visualizamos los datos normalizados y los vectores de cargas
#
plt.axes().set_aspect('equal', 'datalim')
plt.scatter(X_norm[:,0], X_norm[:,1], marker='o', cmap='prism')
plt.xlabel('x1')
plt.ylabel('x2')
a = 1.2
plt.annotate('PCA1', (pca_loadings[0,0]*explained_variance[0]*a, pca_loadings[0,1]*explained_variance[0]*a), color='blue', ha='right')
plt.arrow(0,0, pca_loadings[0,0]*explained_variance[0], pca_loadings[0,1]*explained_variance[0], color='b', head_width=0.1, length_includes_head=True)
plt.annotate('PCA2', (pca_loadings[1,0]*explained_variance[1]*a, pca_loadings[1,1]*explained_variance[1]*a), color='blue', ha='right')
plt.arrow(0,0, pca_loadings[1,0]*explained_variance[1], pca_loadings[1,1]*explained_variance[1], color='b', head_width=0.1, length_includes_head=False)
plt.show()

### Proyección de los datos sobre los componentes principales

Después de calcular los componentes principales, puedes usarlos para reducir la dimensión de características de tu conjunto de datos proyectando cada ejemplo sobre un espacio de menores dimensiones, $x^{(i)} \rightarrow z^{(i)}$ (p.ej., proyectando los datos de 2D a 1D).

Para proyectar los datos sólo necesitas truncar las primeras `K` columnas de la matriz de puntajes.

In [None]:
#
# Función para proyectar los datos sobre los primeros K componentes
#
def projectData(pca_loadings, X, K):
  V = pca_loadings.T         # Recuperamos V a partir de pca_loadings = V.T
  pca_scores = np.dot(X, V)  # Puntajes para todos los componentes
  Z = pca_scores[:,:K]       # Puntajes para los primeros K componentes
  return Z

In [None]:
#
# Proyectar los datos
#
Z = projectData(pca_loadings, X_norm, 1)

print('La proyección del primer ejemplo sobre la primera dimensión es: {}'.format(Z[0]))

### Reconstrucción de los datos a partir de la proyección

Luego de proyectar los datos en un espacio de menores dimensiones, puedes recuperar aproximadamente los datos proyectándolos de regreso en el espacio original de alta dimensión. Para hacerlo, multiplicamos la proyección Z por las primeras `K` filas de la matriz de carga.

In [None]:
#
# Función para reconstuir los datos a partir de la proyección Z
#
def recoverData(pca_loadings, Z):
    K = Z.shape[1]
    X_rec = np.dot(Z, pca_loadings[:K,:])
    return X_rec

In [None]:
#
# Reconstuir los datos
#
X_norm_rec = recoverData(pca_loadings, Z)

In [None]:
#
# Función para calcular el error cuadrático medio
#
def MSE(X, X_rec):
  return np.mean(np.sum((X - X_rec)**2, axis=1))

In [None]:
#
# Calcular el error de reconstrucción
#
error_de_reconstruccion = MSE(X_norm, X_norm_rec)
print('Error de reconstrucción (MSE) de los datos normalizados : {:.7f}'.format(error_de_reconstruccion))

### Visualización de las proyecciones

Una vez realizada la proyección y reconstrucción aproximada de los datos, veamos en el siguiente diagrama cómo la proyección afecta los datos. Los puntos de datos originales están indicados con color azul, mientras que los puntos de datos proyectados están indicados con color rojo. La proyección retiene efectivamente la información en la dirección del primer componente principal.

In [None]:
plt.axes().set_aspect('equal', 'datalim')
for i in range(X_norm.shape[0]):
    plt.arrow(X_norm[i,0], X_norm[i,1], X_norm_rec[i,0]-X_norm[i,0], X_norm_rec[i,1]-X_norm[i,1], color='k')
plt.scatter(X_norm[:,0], X_norm[:,1], marker='o', c='blue', label='Datos originales')  
plt.scatter(X_norm_rec[:,0], X_norm_rec[:,1], marker='o', c='red', label='Datos recuperados')
plt.xlabel('x1')
plt.ylabel('x2')
plt.legend(loc='upper left')
plt.show()

## 2. PCA usando un autoencoder lineal

Si construimos un autoencoder lineal a minimizar el error cuadrático medio, el autoencoder aprenderá a describir el mismo supespacio lineal que PCA.

In [None]:
# Adaptodo de https://github.com/ageron/handson-ml/blob/master/15_autoencoders.ipynb

tf.reset_default_graph()

#
# Dimensiones del modelo
#
n_inputs  = X.shape[1]    # 2D
n_hidden  = 1             # 1D
n_outputs = n_inputs

#
# Modelo
#
x = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden = tf.layers.dense(x, n_hidden, activation=None)
outputs = tf.layers.dense(hidden, n_outputs, activation=None)

#
# Función de pérdida (MSE) y entrenamiento
#
mse_loss = tf.reduce_mean(tf.reduce_sum(tf.square(x - outputs), axis=1))

optimizer = tf.train.AdamOptimizer(0.1)
training_op = optimizer.minimize(mse_loss)

In [None]:
n_iterations = 1001

with tf.Session() as sess:
  tf.global_variables_initializer().run()
  for iteration in range(n_iterations):
    _, mse = sess.run([training_op, mse_loss],
                           feed_dict={x: X_norm})
    if iteration % 100 == 0:
        print('Iteración: {:04d}, MSE: {:.9f}'.format(iteration, mse))

  codings, X_rec_nn = sess.run([hidden, outputs], feed_dict={x: X_norm})

In [None]:
print('Error de reconstrucción (MSE) con PCA analítico : {:.7f}'.format(error_de_reconstruccion))
print('Error de reconstrucción (MSE) con PCA autoencoder : {:.7f}'.format(mse))

In [None]:
plt.axes().set_aspect('equal', 'datalim')
for i in range(X_norm.shape[0]):
    plt.arrow(X_norm[i,0], X_norm[i,1], X_rec_nn[i,0]-X_norm[i,0], X_rec_nn[i,1]-X_norm[i,1], color='k')
plt.scatter(X_norm[:,0], X_norm[:,1], marker='o', c='blue', label='Datos originales')  
plt.scatter(X_rec_nn[:,0], X_rec_nn[:,1], marker='o', c='red', label='Datos recuperados con autoencoder')
plt.xlabel('x1')
plt.ylabel('x2')
plt.legend(loc='upper left')
plt.show()

## 2. Generalización de PCA usando un autoencoder no lineal

In [None]:
tf.reset_default_graph()

#
# Dimensiones del modelo
#
n_inputs  = X.shape[1]    # 2D
n_hidden1 = 2
n_hidden2 = 1             # Codings
n_hidden3 = n_hidden1
n_outputs = n_inputs

#
# Modelo
#
x = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden1 = tf.layers.dense(x, n_hidden1, activation=tf.nn.sigmoid)
hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.sigmoid)
hidden3 = tf.layers.dense(hidden2, n_hidden3, activation=tf.nn.sigmoid)
outputs = tf.layers.dense(hidden3, n_outputs, activation=None)

#
# Función de pérdida (MSE) y entrenamiento
#
mse_loss = tf.reduce_mean(tf.reduce_sum(tf.square(x - outputs), axis=1))

optimizer = tf.train.AdamOptimizer(0.15)
training_op = optimizer.minimize(mse_loss)

In [None]:
n_iterations = 2001

with tf.Session() as sess:
  tf.global_variables_initializer().run()
  for iteration in range(n_iterations):
    _, mse = sess.run([training_op, mse_loss],
                           feed_dict={x: X_norm})
    if iteration % 100 == 0:
        print('Iteración: {:04d}, MSE: {:.9f}'.format(iteration, mse))

  codings, X_rec_nn = sess.run([hidden2, outputs], feed_dict={x: X_norm})

In [None]:
print('Error de reconstrucción (MSE) con PCA analítico         : {:.7f}'.format(error_de_reconstruccion))
print('Error de reconstrucción (MSE) con autoencoder no lineal : {:.7f}'.format(mse))

In [None]:
plt.axes().set_aspect('equal', 'datalim')
#for i in range(X_norm.shape[0]):
#    plt.arrow(X_norm[i,0], X_norm[i,1], X_rec_nn[i,0]-X_norm[i,0], X_rec_nn[i,1]-X_norm[i,1], color='k')
plt.scatter(X_norm[:,0], X_norm[:,1], marker='o', c='blue', label='Datos originales')  
plt.scatter(X_rec_nn[:,0], X_rec_nn[:,1], marker='o', c='red', label='Datos recuperados con autoencoder')
plt.xlabel('x1')
plt.ylabel('x2')
plt.legend(loc='upper left')
plt.show()

## 3. PCA estándar de 30 dimensiones para MNIST

En adelante usaremos MNIST para visualizar la calidad de las representaciones obtenidas. Empecemos por establecer una línea base con PCA estándar de 30 dimensiones.

In [None]:
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True, reshape=True)


In [None]:
#
# Proyección y reconstrución de MNIST usando PCA de 30 dimensiones
#
X = mnist.train.images[:5000]   # Si usamos las 55000 recibimos un error de memoria (!)
(U, s, Vt) = np.linalg.svd(X)
Z = projectData(Vt, X, 30)      # Proyectamos los datos a los primeros 30 componentes principales...
R = recoverData(Vt, Z)          # ...y reconstruimos los datos a su espacio original
error_de_reconstruccion = MSE(X, R)
print('Error de reconstrucción (MSE) de los datos normalizados : {:.7f}'.format(error_de_reconstruccion))

In [None]:
#
# Función de ayuda para visualizar las imágenes
#
from matplotlib.pyplot import figure, imshow, axis

def mnist_grid(X):
  X = X.reshape([-1, 28, 28])
  num_images = X.shape[0]
  fig = figure()
  fig.set_size_inches(10, 10)

  for i in range(num_images):
      a=fig.add_subplot(1, num_images, i +1)
      image = X[i,:]
      imshow(image,cmap='gray')
      axis('off')
  plt.show()

Visualicemos las primeras 10 imágenes originales

In [None]:
mnist_grid(X[:10])

Visualicemos su reconstrucción a partir de los primeros 30 componentes de PCA

In [None]:
mnist_grid(R[:10])

## 4. Autoencoder profundo subcompleto 784-256-128-32

In [None]:
tf.reset_default_graph()

#
# Dimensiones del modelo
#
n_inputs  = X.shape[1]    # 28 x 28
n_hidden1 = 256
n_hidden2 = 128
n_hidden3 = 32             # Codings
n_hidden4 = n_hidden2
n_hidden5 = n_hidden1
n_outputs = n_inputs

#
# Modelo
#
x = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden1 = tf.layers.dense(x, n_hidden1, activation=tf.nn.sigmoid)
hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.sigmoid)
hidden3 = tf.layers.dense(hidden2, n_hidden3, activation=tf.nn.sigmoid)
hidden4 = tf.layers.dense(hidden3, n_hidden4, activation=tf.nn.sigmoid)
hidden5 = tf.layers.dense(hidden4, n_hidden5, activation=tf.nn.sigmoid)
outputs = tf.layers.dense(hidden5, n_outputs, activation=None)

#
# Función de pérdida (MSE) y entrenamiento
#
mse_loss = tf.reduce_mean(tf.reduce_sum(tf.square(x - outputs), axis=1))

optimizer = tf.train.AdamOptimizer()
training_op = optimizer.minimize(mse_loss)

In [None]:
n_iterations = 1001

with tf.Session() as sess:
  tf.global_variables_initializer().run()
  for iteration in range(n_iterations):
    _, mse = sess.run([training_op, mse_loss],
                           feed_dict={x: X})
    if iteration % 100 == 0:
        print('Iteración: {:04d}, MSE: {:.9f}'.format(iteration, mse))
        R = sess.run(outputs, feed_dict={x: X[:10]})
        mnist_grid(R)

## 5. Autoencoder disperso sobrecompleto 784-1000-784

In [None]:
tf.reset_default_graph()

#
# Dimensiones del modelo
#
n_inputs = X.shape[1]    # 28 x 28
n_hidden1 = 1000         # Codings
n_outputs = n_inputs

sparsity_target = 0.032
sparsity_weight = 0.2

def kl_divergence(p, q):
    # Kullback Leibler divergence
    return p * tf.log(p / q) + (1 - p) * tf.log((1 - p) / (1 - q))

#
# Modelo
#
x = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden1 = tf.layers.dense(x, n_hidden1, activation=tf.nn.sigmoid)
outputs = tf.layers.dense(hidden1, n_outputs, activation=None)

#
# Función de pérdida (MSE) y entrenamiento
#
hidden1_mean = tf.reduce_mean(hidden1, axis=0) # batch mean
sparsity_loss = tf.reduce_sum(kl_divergence(sparsity_target, hidden1_mean))
mse_loss = tf.reduce_mean(tf.reduce_sum(tf.square(x - outputs), axis=1))
loss = mse_loss + sparsity_weight * sparsity_loss

optimizer = tf.train.AdamOptimizer()
training_op = optimizer.minimize(loss)

In [None]:
n_iterations = 1001

with tf.Session() as sess:
  tf.global_variables_initializer().run()
  for iteration in range(n_iterations):
    _, mse = sess.run([training_op, mse_loss],
                           feed_dict={x: X})
    if iteration % 100 == 0:
        print('Iteración: {:04d}, MSE: {:.9f}'.format(iteration, mse))
        R = sess.run(outputs, feed_dict={x: X[:10]})
        mnist_grid(R)

## 6. Denoising autoencoder 784-256-128-32

In [None]:
tf.reset_default_graph()

#
# Dimensiones del modelo
#
n_inputs  = X.shape[1]    # 28 x 28
n_hidden1 = 256
n_hidden2 = 128
n_hidden3 = 32             # Codings
n_hidden4 = n_hidden2
n_hidden5 = n_hidden1
n_outputs = n_inputs
noise_level = 1.0

#
# Modelo
#
x = tf.placeholder(tf.float32, shape=[None, n_inputs])
x_noisy = x + noise_level * tf.random_normal(tf.shape(x))
hidden1 = tf.layers.dense(x_noisy, n_hidden1, activation=tf.nn.sigmoid)
hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.sigmoid)
hidden3 = tf.layers.dense(hidden2, n_hidden3, activation=tf.nn.sigmoid)
hidden4 = tf.layers.dense(hidden3, n_hidden4, activation=tf.nn.sigmoid)
hidden5 = tf.layers.dense(hidden4, n_hidden5, activation=tf.nn.sigmoid)
outputs = tf.layers.dense(hidden5, n_outputs, activation=None)

#
# Función de pérdida (MSE) y entrenamiento
#
mse_loss = tf.reduce_mean(tf.reduce_sum(tf.square(x - outputs), axis=1))

optimizer = tf.train.AdamOptimizer()
training_op = optimizer.minimize(mse_loss)

In [None]:
n_iterations = 1001

with tf.Session() as sess:
  tf.global_variables_initializer().run()
  for iteration in range(n_iterations):
    _, mse = sess.run([training_op, mse_loss],
                           feed_dict={x: X})
    if iteration % 100 == 0:
        print('Iteración: {:04d}, MSE: {:.9f}'.format(iteration, mse))
        R = sess.run(outputs, feed_dict={x: X[:10]})
        mnist_grid(R)

## 7. Variational autoencoder 784-256-256-32

In [None]:
tf.reset_default_graph()

#
# Dimensiones del modelo
#
n_inputs  = X.shape[1]    # 28 x 28
n_hidden1 = 256
n_hidden2 = 256
n_hidden3 = 32             # Codings
n_hidden4 = n_hidden2
n_hidden5 = n_hidden1
n_outputs = n_inputs

#
# Modelo
#
x = tf.placeholder(tf.float32, shape=[None, n_inputs])
hidden1 = tf.layers.dense(x, n_hidden1, activation=tf.nn.elu)
hidden2 = tf.layers.dense(hidden1, n_hidden2, activation=tf.nn.elu)
hidden3_mean = tf.layers.dense(hidden2, n_hidden3, activation=None)
hidden3_sigma = tf.layers.dense(hidden2, n_hidden3, activation=None)
noise = tf.random_normal(tf.shape(hidden3_sigma), dtype=tf.float32)
hidden3 = hidden3_mean + hidden3_sigma * noise
hidden4 = tf.layers.dense(hidden3, n_hidden4, activation=tf.nn.elu)
hidden5 = tf.layers.dense(hidden4, n_hidden5, activation=tf.nn.elu)
logits = tf.layers.dense(hidden5, n_outputs, activation=None)
outputs = tf.sigmoid(logits)

#
# Función de pérdida (MSE) y entrenamiento
#
xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=logits)
xentropy_loss = tf.reduce_sum(xentropy)
eps = 1e-10 # smoothing term to avoid computing log(0) which is NaN
latent_loss = 0.5 * tf.reduce_sum(
    tf.square(hidden3_sigma) + tf.square(hidden3_mean)
    - 1 - tf.log(eps + tf.square(hidden3_sigma)))
loss = xentropy_loss + latent_loss

mse_loss = tf.reduce_mean(tf.reduce_sum(tf.square(x - outputs), axis=1))

optimizer = tf.train.AdamOptimizer()
training_op = optimizer.minimize(loss)
saver = tf.train.Saver()

In [None]:
n_iterations = 2001

with tf.Session() as sess:
  tf.global_variables_initializer().run()
  for iteration in range(n_iterations):
    _, mse = sess.run([training_op, mse_loss],
                           feed_dict={x: X})
    if iteration % 100 == 0:
        print('Iteración: {:04d}, MSE: {:.9f}'.format(iteration, mse))
        R = sess.run(outputs, feed_dict={x: X[:10]})
        mnist_grid(R)
        
  saver.save(sess, "./VAE.ckpt") 

In [None]:
n_digits = 10
random_codings = np.random.normal(size=[n_digits, n_hidden3])

with tf.Session() as sess:
  saver.restore(sess, "./VAE.ckpt")
  generated_images = sess.run(outputs, feed_dict={hidden3: random_codings})

mnist_grid(generated_images)