https://nthu-datalab.github.io/ml/labs/14-1_Autoencoder/14-1_Autoencoder.html

In [None]:
%env CUDA_VISIBLE_DEVICES=2
%matplotlib inline
# Import libraries
import sys
import os

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# Experiment configuration shared by every cell below.
RNG_SEED = 0  # seed of the NumPy RandomState used to draw the image noise
VALID_SIZE = 5000  # number of leading training images held out for validation
TEST_SIZE = 10  # number of test images kept for visual inspection
BATCH_SIZE = 5000  # batch size of every tf.data pipeline
EPOCH = 64  # number of training epochs per model
MNIST_H = 28  # image height in pixels
MNIST_W = 28  # image width in pixels
MNIST_C = 1  # number of image channels
MNIST_SHAPE = (MNIST_H, MNIST_W, MNIST_C)  # per-image input shape of the encoder
NOICE = 0.4  # std-dev of the Gaussian noise added to images (name is sic; other cells reference it)
LNT_DIM = 32  # latent (code) dimension passed to AutoEncoder
DM_DIM = 16  # how many Jacobian rows / tangent vectors are plotted
LEARNING_RATE = 1.0e-04  # learning rate passed to the Adam optimizers

In [None]:
# Limit TensorFlow's memory usage on the first visible GPU, if there is one.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_memory_growth(gpus[0], True)
    # memory_limit is expressed in MB: 0x2000 == 8192 MB.
    tf.config.experimental.set_virtual_device_configuration(
        gpus[0],
        [tf.config.experimental.VirtualDeviceConfiguration(memory_limit = 0x2000)]
    )
else:
    # No GPU is visible (for example CUDA_VISIBLE_DEVICES names a device that
    # does not exist); continue on CPU instead of failing on gpus[0].
    print('No GPU visible to TensorFlow; running on CPU.')

In [None]:
rng = np.random.RandomState(RNG_SEED)


def _with_gaussian_noise(images):
    """Return `images` plus zero-mean Gaussian noise (std NOICE), clipped to [0, 1]."""
    perturbation = rng.normal(loc = 0.0, scale = NOICE, size = images.shape)
    return np.clip(images + perturbation.astype(np.float32), 0.0, 1.0)


# Load MNIST (labels are discarded), add a channel axis and convert to float32.
(train_images, _), (test_images, _) = tf.keras.datasets.mnist.load_data()
train_images = train_images.reshape(-1, 28, 28, 1).astype('float32')
test_images = test_images.reshape(-1, 28, 28, 1).astype('float32')

# Normalizing the images to the range of [0., 1.]
train_images /= 255.0
test_images /= 255.0

# The first VALID_SIZE training images form the validation split; the rest
# is used for training. Only the first TEST_SIZE test images are kept.
x_valid = train_images[: VALID_SIZE]
x_train = train_images[VALID_SIZE : ]
x_test = test_images[: TEST_SIZE]

# Reciprocals of the split sizes, used when averaging the losses.
TRAIN_SIZE = len(x_train)
TRAIN_RCP = np.float32(1.0 / TRAIN_SIZE)
VALID_RCP = np.float32(1.0 / VALID_SIZE)

# Noise is drawn in the order train, valid, test; keep this order so the
# seeded generator produces the same samples for each split.
x_train_noise = _with_gaussian_noise(x_train)
x_valid_noise = _with_gaussian_noise(x_valid)
x_test_noise = _with_gaussian_noise(x_test)

# build datasets: every array is sliced per image and batched the same way,
# without shuffling, so clean and noisy batches stay aligned when zipped.
ds_train, ds_valid, ds_train_noise, ds_valid_noise = (
    tf.data.Dataset.from_tensor_slices(arr).batch(BATCH_SIZE)
    for arr in (x_train, x_valid, x_train_noise, x_valid_noise)
)

# show data: the first ten training images, clean and then noisy
dmy = []
for caption, images in (('origin data', x_train), ('noisy data', x_train_noise)):
    print(caption)
    fig, axs = plt.subplots(1, 10, figsize=(10,1))
    # zip stops after the ten axes, so only the first ten images are drawn
    for ax, sample in zip(axs, images):
        ax.imshow(sample[:, :, 0], cmap = 'gray')
        # empty tick lists remove the tick marks
        ax.set_xticks(dmy)
        ax.set_yticks(dmy)
    plt.show()

In [None]:
class AutoEncoder(tf.keras.Model):
    """Convolutional autoencoder for images of shape MNIST_SHAPE.

    Attributes:
        latent_dim: size of the code vector produced by the encoder.
        encoder: Sequential model of two strided convolutions followed by a
            linear Dense layer of `latent_dim` units.
        decoder: Sequential model of a Dense layer reshaped to (7, 7, 32),
            two stride-2 transposed convolutions and a final single-filter
            transposed convolution without activation.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        encoder_layers = [
            tf.keras.layers.InputLayer(input_shape = MNIST_SHAPE),
            tf.keras.layers.Conv2D(
                filters = 32, kernel_size = 3, strides = (2, 2), activation = 'relu'
            ),
            tf.keras.layers.Conv2D(
                filters = 64, kernel_size = 3, strides = (2, 2), activation = 'relu'
            ),
            tf.keras.layers.Flatten(),
            # Linear projection onto the latent code (no activation).
            tf.keras.layers.Dense(latent_dim),
        ]
        self.encoder = tf.keras.Sequential(encoder_layers)

        decoder_layers = [
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(units = 7*7*32, activation = tf.nn.relu),
            tf.keras.layers.Reshape(target_shape = (7, 7, 32)),
            tf.keras.layers.Conv2DTranspose(
                filters = 64, kernel_size = 3, strides = (2, 2),
                padding = "SAME", activation = 'relu'
            ),
            tf.keras.layers.Conv2DTranspose(
                filters = 32, kernel_size = 3, strides = (2, 2),
                padding = "SAME", activation = 'relu'
            ),
            # Output layer: one filter, stride 1, no activation.
            tf.keras.layers.Conv2DTranspose(
                filters = 1, kernel_size = 3, strides = (1, 1), padding = "SAME"
            ),
        ]
        self.decoder = tf.keras.Sequential(decoder_layers)

    @tf.function
    def call(self, x):
        """Encode `x` and return the decoder's reconstruction of the code."""
        code = self.encoder(x)
        return self.decoder(code)

In [None]:
# Train modelA as a plain autoencoder: the clean image is both input and target.
modelA = AutoEncoder(LNT_DIM)
optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
tvs = modelA.trainable_variables
train_loss_A = [None] * EPOCH
valid_loss_A = [None] * EPOCH
for i in range(EPOCH):
    total_loss = 0.0
    for tx in ds_train:
        with tf.GradientTape() as tape:
            out = modelA(tx)
            loss = tf.reduce_mean(tf.square(out - tx))
        # `loss` is a per-batch mean. Weight it by the batch length so that
        # multiplying the running sum by 1/TRAIN_SIZE gives the mean loss per
        # sample. Summing the bare batch means under-reported the loss by
        # roughly a factor of BATCH_SIZE.
        total_loss += float(loss) * int(tx.shape[0])
        # Eager gradient step (flagged as slow by the original author).
        optimizer.apply_gradients(zip(tape.gradient(loss, tvs), tvs))
    # Stored as a plain Python float so it can be plotted directly.
    train_loss_A[i] = total_loss * float(TRAIN_RCP)

    # Validation pass: same weighting, no gradient step.
    total_loss = 0.0
    for tx in ds_valid:
        out = modelA(tx)
        batch_loss = tf.reduce_mean(tf.square(out - tx))
        total_loss += float(batch_loss) * int(tx.shape[0])
    valid_loss_A[i] = total_loss * float(VALID_RCP)

In [None]:
# Learning curves of modelA, one point per epoch.
epochs = range(EPOCH)
plt.plot(epochs, train_loss_A, color = 'blue', label = 'Train loss')
plt.plot(epochs, valid_loss_A, color = 'red', label = 'Valid loss')
plt.xlabel('#Epoch')
plt.ylabel('Loss')
plt.legend(loc="upper right")
plt.show()

In [None]:
def plot_imgs(imgs, n, title=None):
    """Show the first `n` images of `imgs` side by side in one row.

    Args:
        imgs: indexable collection of images; channel 0 of image `i` is
            read as `imgs[i][..., 0]`.
        n: number of images (and subplot columns) to draw.
        title: optional figure title; nothing is added when it is None.
    """
    # squeeze=False keeps `axs` a 2-D array even when n == 1. With the
    # default, a single subplot is returned as a bare Axes object and
    # indexing it fails.
    fig, axs = plt.subplots(1, n, figsize = (n, 2), squeeze = False)
    for i, ax in enumerate(axs[0]):
        ax.imshow(imgs[i][... ,0], cmap = 'gray')
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    if title is not None:
        fig.suptitle(title)
    plt.show()

In [None]:
# Clean test images, followed by modelA's reconstruction of the same images.
test_batch = x_test[: TEST_SIZE]
plot_imgs(test_batch, n = TEST_SIZE, title = 'Test Samples')
reconstruction = modelA(tf.convert_to_tensor(test_batch))
plot_imgs(reconstruction, n = TEST_SIZE, title = 'Reconstruct Samples')

In [None]:
@tf.function
def jacob(f, x):
    """Return the Jacobian of `f` with respect to `x` for the first sample.

    Args:
        f: callable mapping the batch `x` to a batch of vectors (the callers
            pass an encoder).
        x: input tensor holding a batch; only element 0 of the batch is
            differentiated.

    Returns:
        A tensor that stacks, for every component of `f(x)[0]`, the gradient
        of that component with respect to `x[0]`.
    """
    y = f(x)[0]
    # Take the number of outputs from the static shape so the function is
    # not tied to one latent size; fall back to LNT_DIM if it is unknown.
    n_outputs = y.shape[0]
    if n_outputs is None:
        n_outputs = LNT_DIM
    # tf.gradients only works in graph mode, which @tf.function provides.
    return tf.convert_to_tensor([
        tf.gradients(y[i], x)[0][0, :, :]
        for i in range(n_outputs)
    ])
    
def tangent_vecs(jaco_matrix):
    """Decompose a Jacobian with SVD and plot its singular values.

    Args:
        jaco_matrix: 2-D Jacobian; the callers pass one flattened image
            gradient per code unit (code_size x img_dim).

    Returns:
        The tuple `(U, s, V)` returned by
        `np.linalg.svd(jaco_matrix, full_matrices=False)`. The rows of `V`
        are the right singular vectors, used by the callers as tangent vectors.
    """
    left, sing_vals, right = np.linalg.svd(jaco_matrix, full_matrices=False)

    # Bar chart of the singular values against their index.
    positions = range(sing_vals.shape[0])
    plt.bar(positions, sing_vals, alpha=0.5)
    plt.xlabel('Index')
    plt.ylabel('SVD values')
    plt.tight_layout()
    plt.show()

    return left, sing_vals, right

In [None]:
# Inspect modelA's encoder around the first clean test image.
img = x_test[0]
plt.imshow(img[:, :, 0], cmap='gray')
plt.show()

# Add a batch axis, then differentiate the encoder output w.r.t. the image.
x = tf.convert_to_tensor(np.expand_dims(img, axis = 0))
J = jacob(modelA.encoder, x).numpy()
# Flatten every image-shaped gradient into one row before the SVD.
U, s, V = tangent_vecs(J.reshape([-1, MNIST_H * MNIST_W]))
for arr in (J, U, s, V):
    print(arr.shape)
plot_imgs(J, n = DM_DIM, title = 'Jacobian Matrix')
plot_imgs(V.reshape([-1, 28, 28, 1]), n = DM_DIM, title = 'Tangent Vectors')

In [None]:
# Train modelB as a denoising autoencoder: noisy image in, clean image as target.
modelB = AutoEncoder(LNT_DIM)
optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
tvs = modelB.trainable_variables
train_loss_B = [None] * EPOCH
valid_loss_B = [None] * EPOCH
for i in range(EPOCH):
    total_loss = 0.0
    # The clean and noisy datasets are unshuffled and batched identically,
    # so zip pairs every clean batch `tx` with its noisy version `ty`.
    for tx, ty in zip(ds_train, ds_train_noise):
        with tf.GradientTape() as tape:
            out = modelB(ty)
            loss = tf.reduce_mean(tf.square(out - tx))
        # `loss` is a per-batch mean. Weight it by the batch length so that
        # multiplying the running sum by 1/TRAIN_SIZE gives the mean loss per
        # sample. Summing the bare batch means under-reported the loss by
        # roughly a factor of BATCH_SIZE.
        total_loss += float(loss) * int(tx.shape[0])
        optimizer.apply_gradients(zip(tape.gradient(loss, tvs), tvs))
    # Stored as a plain Python float so it can be plotted directly.
    train_loss_B[i] = total_loss * float(TRAIN_RCP)

    # Validation pass: same weighting, no gradient step.
    total_loss = 0.0
    for tx, ty in zip(ds_valid, ds_valid_noise):
        out = modelB(ty)
        batch_loss = tf.reduce_mean(tf.square(out - tx))
        total_loss += float(batch_loss) * int(tx.shape[0])
    valid_loss_B[i] = total_loss * float(VALID_RCP)

In [None]:
# Learning curves of modelB, one point per epoch.
epochs = range(EPOCH)
plt.plot(epochs, train_loss_B, color = 'blue', label = 'Train loss')
plt.plot(epochs, valid_loss_B, color = 'red', label = 'Valid loss')
plt.xlabel('#Epoch')
plt.ylabel('Loss')
plt.legend(loc = "upper right")
plt.show()

In [None]:
# Noisy test images, followed by modelB's reconstruction of them.
noisy_batch = x_test_noise[: TEST_SIZE]
plot_imgs(noisy_batch, n = TEST_SIZE, title = 'Test Samples')
denoised = modelB(tf.convert_to_tensor(noisy_batch)).numpy()
plot_imgs(denoised, n = TEST_SIZE, title = 'Reconstruct Samples')

In [None]:
# Inspect modelB's encoder around the first noisy test image.
img = x_test_noise[0]
plt.imshow(img[:, :, 0], cmap='gray')
plt.show()

# Add a batch axis, then differentiate the encoder output w.r.t. the image.
x = tf.convert_to_tensor(np.expand_dims(img, axis = 0))
J = jacob(modelB.encoder, x).numpy()
# Flatten every image-shaped gradient into one row before the SVD.
U, s, V = tangent_vecs(J.reshape([-1, MNIST_H * MNIST_W]))
for arr in (J, U, s, V):
    print(arr.shape)
plot_imgs(J, n = DM_DIM, title = 'Jacobian Matrix')
plot_imgs(V.reshape([-1, 28, 28, 1]), n = DM_DIM, title = 'Tangent Vectors')