In [1]:
# Autoencoder (AE)

In [2]:
import os

# Raise TensorFlow's native (C++) log threshold to level 2 so that only
# errors are printed. Set here, in its own cell, before TensorFlow is imported.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

In [13]:
import  tensorflow as tf
import  numpy as np
from    tensorflow import keras
from    tensorflow.keras import Sequential, layers
from    PIL import Image
from    matplotlib import pyplot as plt

In [4]:
# Seed both random number generators used in this notebook (NumPy and
# TensorFlow) with the same fixed value so that runs are repeatable.
np.random.seed(22)
tf.random.set_seed(22)

# Everything below is written against the TensorFlow 2.x API.
assert tf.__version__.startswith('2.')


def save_images(imgs, name):
    """Tile up to 100 greyscale 28x28 images into one 280x280 sheet and save it.

    The sheet is filled column by column: the outer loop advances the x
    offset, so consecutive images run down a column before the next column
    starts.

    Args:
        imgs: Sequence (supports ``len`` and integer indexing) of 2-D arrays
            accepted by ``Image.fromarray(..., mode='L')``; callers in this
            notebook pass ``uint8`` arrays of shape (N, 28, 28). Only the
            first 100 entries are used. If fewer are supplied, the remaining
            cells keep the sheet's default (black) background instead of
            raising ``IndexError``.
        name: Output file path. Missing parent directories are created.
    """
    import os  # local import so the function does not depend on another cell

    tile, side = 28, 280
    sheet = Image.new('L', (side, side))

    # Top-left corner of every cell, x offset in the outer position so the
    # ordering matches a nested ``for x: for y:`` loop.
    corners = [(x, y)
               for x in range(0, side, tile)
               for y in range(0, side, tile)]

    # Slicing by len(imgs) stops early when fewer than 100 images are given.
    for index, corner in enumerate(corners[:len(imgs)]):
        cell = Image.fromarray(imgs[index], mode='L')
        sheet.paste(cell, corner)

    # Saving into e.g. 'ae_images/...' fails if the folder is missing.
    out_dir = os.path.dirname(name)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    sheet.save(name)


# Hyper-parameters shared by the cells below.
lr = 1e-3        # learning rate handed to the Adam optimizer
batchsz = 512    # mini-batch size of the train/test pipelines
h_dim = 20       # width of the autoencoder's bottleneck layer

In [5]:
# Fashion-MNIST: 28x28 greyscale images plus integer class labels.
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Rescale pixel values from [0, 255] to float32 in [0, 1].
x_train = x_train.astype(np.float32) / 255.
x_test = x_test.astype(np.float32) / 255.

# The pipelines carry images only; labels are not used for reconstruction.
train_db = (
    tf.data.Dataset.from_tensor_slices(x_train)
    .shuffle(batchsz * 5)
    .batch(batchsz)
)
test_db = tf.data.Dataset.from_tensor_slices(x_test).batch(batchsz)

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

(60000, 28, 28) (60000,)
(10000, 28, 28) (10000,)


In [6]:
class AE(keras.Model):
    """Fully connected autoencoder.

    The encoder maps its input through Dense layers of 256 and 128 ReLU
    units down to an ``h_dim``-wide code; the decoder mirrors that path
    (128, 256) and ends in a linear 784-unit layer.
    """

    def __init__(self):
        super().__init__()

        relu = tf.nn.relu

        # Encoder: input -> 256 -> 128 -> h_dim (last layer is linear)
        self.encoder = Sequential(
            [layers.Dense(units, activation=relu) for units in (256, 128)]
            + [layers.Dense(h_dim)]
        )

        # Decoder: h_dim -> 128 -> 256 -> 784 (last layer is linear)
        self.decoder = Sequential(
            [layers.Dense(units, activation=relu) for units in (128, 256)]
            + [layers.Dense(784)]
        )

    def call(self, inputs, training=None):
        """Encode ``inputs`` and return the decoder's output for that code.

        ``training`` is accepted for Keras compatibility but is not used.
        """
        code = self.encoder(inputs)
        return self.decoder(code)


In [33]:
# Instantiate the autoencoder, create its weights for 784-feature inputs
# with an unspecified batch size, and print the layer/parameter table.
model = AE()
model.build((None, 784))
model.summary()

Model: "ae_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 sequential_2 (Sequential)   (None, 20)                236436    
                                                                 
 sequential_3 (Sequential)   (None, 784)               237200    
                                                                 
Total params: 473,636
Trainable params: 473,636
Non-trainable params: 0
_________________________________________________________________


In [34]:
# Train the autoencoder with Adam on a per-pixel binary cross-entropy loss
# and, after every epoch, save reconstructions of one test batch.
optimizer = tf.optimizers.Adam(lr)

# Reconstruction sheets are written here; make sure the folder exists.
os.makedirs('ae_images', exist_ok=True)

for epoch in range(3):
    for step, x in enumerate(train_db):
        # Flatten each 28x28 image into a 784-vector for the Dense layers.
        x = tf.reshape(x, [-1, 784])
        with tf.GradientTape() as tape:
            x_rec_logits = model(x)
            # The model outputs logits, hence from_logits=True.
            rec_loss = tf.losses.binary_crossentropy(x, x_rec_logits, from_logits=True)
            rec_loss = tf.reduce_mean(rec_loss)

        grads = tape.gradient(rec_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if step % 10 == 0:
            print(epoch, step, float(rec_loss))

    # Evaluate once per epoch (not after every step): the output file name
    # only depends on the epoch, so per-step evaluation just rewrote the same
    # file over and over while slowing training down.
    x = next(iter(test_db))

    logits = model(tf.reshape(x, [-1, 784]))
    x_hat = tf.sigmoid(logits)               # logits -> pixel intensities in [0, 1]
    x_hat = tf.reshape(x_hat, [-1, 28, 28])

    # Convert to 8-bit greyscale for PIL; only the reconstructions are saved.
    x_hat = (x_hat.numpy() * 255.).astype(np.uint8)

    save_images(x_hat, 'ae_images/rec_epoch_%d.png' % epoch)

0 0 0.6944249868392944
0 10 0.5526463985443115
0 20 0.464752197265625
0 30 0.420187771320343


KeyboardInterrupt: 

In [16]:
# Variational Autoencoder (VAE)


In [56]:
z_dim = 10  # size of the latent vector z


class VAE(keras.Model):
    """Variational autoencoder built from Dense layers.

    The encoder produces the mean and log-variance of a diagonal Gaussian
    over a ``z_dim``-dimensional latent code; the decoder maps a latent
    sample to 784 output logits.
    """

    def __init__(self):
        super(VAE, self).__init__()

        # Encoder: shared hidden layer, then two heads of width z_dim
        self.fc1 = layers.Dense(128)
        self.fc2 = layers.Dense(z_dim)  # mean head
        self.fc3 = layers.Dense(z_dim)  # log-variance head

        # Decoder
        self.fc4 = layers.Dense(128)
        self.fc5 = layers.Dense(784)

    def encoder(self, x):
        """Return ``(mu, log_var)`` of the approximate posterior for ``x``."""
        h = tf.nn.relu(self.fc1(x))
        mu = self.fc2(h)
        log_var = self.fc3(h)
        return mu, log_var

    def decoder(self, z):
        """Map latent codes ``z`` to 784 reconstruction logits (no sigmoid)."""
        out = tf.nn.relu(self.fc4(z))
        out = self.fc5(out)
        return out

    def reparameterize(self, mu, log_var):
        """Sample ``z = mu + std * eps`` with ``eps ~ N(0, 1)``.

        Writing the sample this way keeps it differentiable with respect to
        ``mu`` and ``log_var``.
        """
        # tf.shape() is evaluated at run time, so this also works when the
        # batch dimension is unknown (None); the static ``log_var.shape``
        # cannot be passed to tf.random.normal in that case.
        eps = tf.random.normal(tf.shape(log_var), dtype=log_var.dtype)
        # std = sqrt(exp(log_var)) = exp(log_var / 2); halving inside the
        # exponent avoids forming exp(log_var) first.
        std = tf.exp(0.5 * log_var)
        return mu + std * eps

    def call(self, inputs, training=None):
        """Run encoder -> sampling -> decoder.

        Returns:
            Tuple ``(x_hat, mu, log_var)`` where ``x_hat`` holds the decoder
            logits. ``training`` is accepted for Keras compatibility but is
            not used.
        """
        mu, log_var = self.encoder(inputs)
        # reparameterization trick
        z = self.reparameterize(mu, log_var)
        x_hat = self.decoder(z)
        return x_hat, mu, log_var

In [60]:
def _logits_to_uint8_images(logits):
    """Turn decoder logits into uint8 images of shape (N, 28, 28)."""
    pixels = tf.sigmoid(logits)  # logits -> intensities in [0, 1]
    pixels = tf.reshape(pixels, [-1, 28, 28]).numpy() * 255.
    return pixels.astype(np.uint8)


model = VAE()
# Built with a concrete batch size; 4 is arbitrary, the layer weights only
# depend on the 784 input features.
model.build(input_shape=(4, 784))
model.summary()

# Use a fresh optimizer for this model: Adam keeps per-variable state, so the
# instance created for the AE in an earlier cell must not be reused here.
optimizer = tf.optimizers.Adam(lr)

# Sample/reconstruction sheets are written here; make sure the folder exists.
os.makedirs('vae_images', exist_ok=True)

for epoch in range(3):
    for step, x in enumerate(train_db):
        # Flatten each 28x28 image into a 784-vector for the Dense layers.
        x = tf.reshape(x, [-1, 784])
        with tf.GradientTape() as tape:
            x_rec_logits, mu, log_var = model(x)

            # Reconstruction term: per-pixel cross-entropy, summed over
            # pixels and averaged over the batch.
            rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(x, x_rec_logits)
            rec_loss = tf.reduce_sum(rec_loss) / x.shape[0]

            # KL divergence between N(mu, var) and N(0, 1), summed over
            # latent dimensions and averaged over the batch.
            # https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
            kl_div = -0.5 * (log_var + 1 - mu**2 - tf.exp(log_var))
            kl_div = tf.reduce_sum(kl_div) / x.shape[0]

            loss = rec_loss + 1. * kl_div

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if step % 100 == 0:
            print(epoch, step, 'kl div:', float(kl_div), 'rec loss:', float(rec_loss))

    # Evaluation, once per epoch.
    # 1) Decode latent vectors drawn from the N(0, 1) prior.
    z = tf.random.normal((batchsz, z_dim))
    logits = model.decoder(z)
    x_hat = _logits_to_uint8_images(logits)
    save_images(x_hat, 'vae_images/sampled_epoch%d.png' % epoch)

    # 2) Reconstruct the first batch of the test set.
    x = next(iter(test_db))
    x = tf.reshape(x, [-1, 784])
    x_hat_logits, _, _ = model(x)
    x_hat = _logits_to_uint8_images(x_hat_logits)
    save_images(x_hat, 'vae_images/rec_epoch%d.png' % epoch)

Model: "vae_31"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_167 (Dense)           multiple                  100480    
                                                                 
 dense_168 (Dense)           multiple                  1290      
                                                                 
 dense_169 (Dense)           multiple                  1290      
                                                                 
 dense_170 (Dense)           multiple                  1408      
                                                                 
 dense_171 (Dense)           multiple                  101136    
                                                                 
Total params: 205,604
Trainable params: 205,604
Non-trainable params: 0
_________________________________________________________________
0 0 kl div: 2.056114912033081 rec loss: 545.2406005859

In [61]:
# GAN