# 변이형 오토인코더

In [1]:
from tensorflow.keras.layers import Input, Conv2D, Flatten, Dense, \
                                                Conv2DTranspose, Reshape, Activation
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint 
from tensorflow.keras.utils import plot_model
import tensorflow_datasets as tfds
import tensorflow as tf

import matplotlib.pyplot as plt
import numpy as np
from IPython import display  

ModuleNotFoundError: No module named 'tensorflow'

In [None]:
BATCH_SIZE = 128
LATENT_DIM = 2

## 데이터 load

In [None]:
def map_image(image, label):
    '''주어진 이미지에서 정규화되고 재구성된 텐서를 반환합니다.'''
    image = tf.cast(image, dtype=tf.float32)
    image = image / 255.0
    image = tf.reshape(image, shape=(28, 28, 1,))
    return image

In [None]:
train_dataset = tfds.load('mnist', as_supervised=True, split="train")
test_dataset = tfds.load('mnist', as_supervised=True, split="test")
train_dataset, len(train_dataset), test_dataset, len(test_dataset)

In [None]:
train_ds = train_dataset.map(map_image).shuffle(1024).batch(BATCH_SIZE)
test_ds = test_dataset.map(map_image).shuffle(1024).batch(BATCH_SIZE)
train_ds, test_ds

## 신경망 구조 정의

<img src="https://drive.google.com/uc?export=view&id=1YAZAeMGEJ1KgieYk1ju-S9DoshpMREeC" width="60%" height="60%"/>

### Sampling Class

먼저 'Sampling' 클래스를 빌드합니다. 이것은 인코더 출력의 평균 (mu) 및 표준 편차 (sigma)와 함께 가우스 노이즈 입력을 제공하는 맞춤형 Keras 레이어입니다. 실제로 이 레이어의 출력은 다음 방정식으로 제공됩니다.

$$z = \mu + e^{0.5\sigma} * \epsilon  $$

여기서 $\mu$ = mean, $\sigma$ = standard deviation, $\epsilon$ = random sample

In [None]:
class Sampling(tf.keras.layers.Layer):
    def call(self, inputs):
        """무작위 샘플을 생성하고 인코더 출력과 결합
        Args:
          inputs -- output tensor from the encoder (mu, sigma)
        Returns:
          `inputs` tensors combined with a random sample
        """
        # unpack the output of the encoder
        mu, sigma = inputs

        # get the size and dimensions of the batch
        batch = tf.shape(mu)[0]
        dim = tf.shape(mu)[1]

        # generate a random tensor
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))

        # reparameterization 적용
        return mu + tf.math.exp(0.5 * sigma) * epsilon

### Kullback–Leibler Divergence
모델의 생성 능력을 향상 시키려면 잠재 공간에 도입된 랜덤 정규 분포를 고려해야 합니다. 이를 위해 [Kullback–Leibler Divergence](https://arxiv.org/abs/2002.07514)가 계산되어 재구성 손실에 추가됩니다. 공식은 아래 함수에서 정의됩니다.

In [None]:
def kl_reconstruction_loss(inputs, outputs, mu, sigma):
    """ Computes the Kullback-Leibler Divergence (KLD)
    Args:
    inputs -- batch from the dataset
    outputs -- output of the Sampling layer
    mu -- mean
    sigma -- standard deviation

    Returns:
    KLD loss
    """
    kl_loss = 1 + sigma - tf.square(mu) - tf.math.exp(sigma)
    kl_loss = tf.reduce_mean(kl_loss) * -0.5

    return kl_loss

### VAE Model
이제 전체 VAE 모델을 정의할 수 있습니다. KL reconstruction loss를 추가하기 위해 `model.add_loss()`를 사용합니다. 이 손실을 계산하는 것은 `y_true`와 `y_pred`를 사용하지 않으므로 `model.compile()`에서 사용할 수 없습니다. 

- add_loss() 메서드 : 손실이 있는 경우, 자동으로 합산되어 주 손실에 추가

In [None]:
 ### THE ENCODER
encoder_inputs = Input(shape=(28, 28, 1))
x = Conv2D(32, kernel_size=3, strides=1,
                   padding = 'same', activation='leaky_relu')(encoder_inputs)
x = Conv2D(64, kernel_size=3, strides=2, 
                   padding = 'same', activation='leaky_relu')(x)
x = Conv2D(64, kernel_size=3, strides=2, 
                   padding = 'same', activation='leaky_relu')(x)
before_flatten = Conv2D(64, kernel_size=3, strides=1,
                   padding = 'same', activation='leaky_relu')(x)
x = Flatten()(before_flatten)
x = Dense(20, activation='relu')(x)

mu = Dense(LATENT_DIM, name='mu')(x)
sigma = Dense(LATENT_DIM, name='sigma')(x)
z = Sampling()((mu, sigma))

encoder = Model(encoder_inputs, outputs=[mu, sigma, z])
encoder.summary()

In [None]:
shape = before_flatten.shape
shape

In [None]:
### THE DECODER
decoder_inputs = Input(shape=(LATENT_DIM,))
x = Dense(shape[1] * shape[2] * shape[3])(decoder_inputs)
x = Reshape((shape[1], shape[2], shape[3]))(x)
x = Conv2DTranspose(filters=64, kernel_size=3, strides=1, 
                        padding = 'same', activation='leaky_relu')(x)
x = Conv2DTranspose(filters=64, kernel_size=3, strides=2, 
                        padding = 'same', activation='leaky_relu')(x)
x = Conv2DTranspose(filters=32, kernel_size=3, strides=2, 
                        padding = 'same', activation='leaky_relu')(x)
decoder_output = Conv2DTranspose(filters=1, kernel_size=3, strides=1, 
                        padding = 'same', activation='sigmoid')(x)
decoder = Model(decoder_inputs, decoder_output)
decoder.summary()

In [None]:
vae_inputs = encoder_inputs
mu, sigma, z = encoder(vae_inputs)
reconstructed = decoder(z)

vae = Model(vae_inputs, reconstructed)
# add KL loss
loss = kl_reconstruction_loss(vae_inputs, z, mu, sigma)
vae.add_loss(loss)

In [None]:
learning_rate = 0.001
optimizer = Adam(learning_rate=learning_rate)
loss_metric = tf.keras.metrics.Mean()
bce_loss = tf.keras.losses.BinaryCrossentropy()

## 오토인코더 훈련

In [None]:
def generate_and_save_images(decoder, epoch, step, test_input):
    """Helper function to plot our 16 images
    Args:
    epoch -- current epoch number during training
    step -- current step number during training
    test_input -- random tensor with shape (16, LATENT_DIM)
    """
    # generate images from the test input
    reconstructed = decoder(test_input)

    # plot the results
    fig = plt.figure(figsize=(4, 4))

    for i in range(reconstructed.shape[0]):
        plt.subplot(4, 4, i+1)
        plt.imshow(reconstructed[i, :, :, 0], cmap='gray')
        plt.axis('off')

    fig.suptitle("epoch: {}, step: {}".format(epoch, step))
    plt.savefig('image_at_epoch_{:04d}_step{:04d}.png'.format(epoch, step))
    plt.show()

In [None]:
# Training loop. 
epochs = 50

for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))

    for step, (x_batch_train, x_batch_test) in enumerate(zip(train_ds, test_ds)):
        with tf.GradientTape() as tape:

            # VAE 모델에 배치 공급
            reconstructed = vae(x_batch_train)

            # 재구성 손실 계산
            flattened_inputs   = tf.reshape(x_batch_train, shape=[-1])
            flattened_outputs = tf.reshape(reconstructed, shape=[-1])
            loss = bce_loss(flattened_inputs, flattened_outputs) * 784

            # add KLD regularization loss
            loss += sum(vae.losses)  

        # get the gradients and update the weights
        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        # compute the loss metric
        loss_metric(loss)

        # display outputs every 100 steps
        if step % 100 == 0:
            display.clear_output(wait=False)    
            _, _, z = encoder(x_batch_test[:16])
            generate_and_save_images(decoder, epoch, step, z)
            print('Epoch: %s step: %s mean loss = %s' % (epoch, step, loss_metric.result().numpy()))

In [None]:
x_batch_test = next(iter(test_ds))
print(x_batch_test.shape)

_, _, z = encoder(x_batch_test[:16])
generate_and_save_images(decoder, epoch, step, z)

## 원본 그림 재구성

In [None]:
n_to_show = 10
example_images = x_batch_test[:n_to_show]
example_images.shape

In [None]:
mu, sigma, z = encoder(example_images)
reconst_images  = decoder(mu, sigma, z)

In [None]:
fig = plt.figure(figsize=(15, 3))
fig.subplots_adjust(hspace=0.4, wspace=0.4)

for i in range(n_to_show):
    img = example_images[i].numpy().squeeze()
    sub = fig.add_subplot(2, n_to_show, i+1)
    sub.axis('off')
    sub.text(0.5, -0.35, str(np.round(z[i],1)), fontsize=10, ha='center', transform=sub.transAxes)      
    sub.imshow(img, cmap='gray_r')

for i in range(n_to_show):
    img = reconst_images[i].numpy().reshape(28, 28)
    sub = fig.add_subplot(2, n_to_show, i+n_to_show+1)
    sub.axis('off')
    sub.imshow(img, cmap='gray_r')