In [1]:
import cv2
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from PIL import Image
from os import listdir
from os.path import join, isfile

### Configuration

In [2]:
# Model configuration
# --- Encoder ---
input_shape = (128, 128, 3)
# Shape the decoder reshapes its dense output to (see build_decoder).
before_flatten_shape = (8, 8, 64)
enc_layer_nums = 4
enc_filters = [32, 64, 64, 64]
enc_kernel_size = [3] * enc_layer_nums
enc_strides = [2] * enc_layer_nums
enc_padding = ["same" for _ in range(enc_layer_nums)]
latent_dim = 200
# --- Decoder ---
dec_layer_nums = 4
dec_filters = [64, 64, 32, 3]
dec_kernel_size = [3] * dec_layer_nums
dec_strides = [2] * dec_layer_nums
dec_padding = ["same" for _ in range(dec_layer_nums)]
# --- Shared by encoder and decoder builders ---
drop_rate = 0.25

# Single dictionary handed to the model builders; keys mirror the names above.
model_config = dict(
    input_shape=input_shape,
    before_flatten_shape=before_flatten_shape,
    enc_layer_nums=enc_layer_nums,
    enc_filters=enc_filters,
    enc_kernel_size=enc_kernel_size,
    enc_strides=enc_strides,
    enc_padding=enc_padding,
    latent_dim=latent_dim,
    dec_layer_nums=dec_layer_nums,
    dec_filters=dec_filters,
    dec_kernel_size=dec_kernel_size,
    dec_strides=dec_strides,
    dec_padding=dec_padding,
    drop_rate=drop_rate,
)

# Training configuration
kl_loss_factor = 1e-5  # weight applied to the KL term of the loss
learning_rate = 1e-3
beta_1, beta_2 = 0.9, 0.999  # Adam moment decay rates
batch_size = 32
epochs = 10

### Prepare Dataset

In [23]:
class DataLoader:
    """Callable image source that yields normalized ``(image, image)`` pairs.

    Calling an instance returns a fresh generator over the image files, so the
    instance itself can be handed to ``tf.data.Dataset.from_generator``.

    Args:
        data_dirs: Sequence of image file paths. Takes precedence over
            ``folder_dir`` when both are given.
        folder_dir: Directory whose regular files are used as the image list
            when ``data_dirs`` is None.
        reshape: Optional list/tuple of length 2 forwarded to ``cv2.resize`` as
            the target size. ``None`` (the default) yields images unresized.

    Raises:
        AssertionError: If neither ``data_dirs`` nor ``folder_dir`` is given,
            or if ``reshape`` is not None and not a length-2 list/tuple.
    """

    def __init__(self, data_dirs=None, folder_dir=None, reshape=None):
        assert data_dirs is not None or folder_dir is not None, "Either data_dirs or folder_dir must be provided"
        # `reshape` is optional (see __call__), so only validate it when given.
        if reshape is not None:
            assert isinstance(reshape, (list, tuple)), "reshape must be list or tuple of length 2"
            assert len(reshape) == 2, "reshape must be list or tuple of length 2"

        # Define properties
        self.data_dirs = data_dirs
        self.reshape = reshape

        # Get data_dirs from the folder; sorted because listdir order is arbitrary
        if data_dirs is None and folder_dir is not None:
            self.data_dirs = sorted(
                join(folder_dir, file)
                for file in listdir(folder_dir)
                if isfile(join(folder_dir, file))
            )

    def __call__(self):
        """Yield ``(img, img)`` for every path in ``self.data_dirs``.

        The same array is used as input and target. Pixel values are divided
        by 255 (assumes 8-bit images -- TODO confirm for other datasets).
        """
        # Read image by image
        for data_dir in self.data_dirs:
            # Load image and convert to numpy; the context manager closes the
            # file handle once the pixel data has been copied into the array.
            with Image.open(data_dir) as pil_img:
                img = np.asarray(pil_img)

            # Resize image to match the input shape.
            # NOTE(review): cv2.resize interprets the size as (width, height);
            # the order is irrelevant for square targets but matters otherwise.
            if self.reshape is not None:
                img = cv2.resize(img, tuple(self.reshape))

            # Normalize the image
            img = img / 255.
            yield (img, img)

In [24]:
# Create DataLoader over the CelebFaces image folder, resizing every image
# to the spatial size (first two entries) of the model input.
data_loader = DataLoader(
    folder_dir="../datasets/CelebFaces/imgs",
    reshape=input_shape[:2],
)

In [25]:
# Create Generator
# The element shape comes from `input_shape` instead of a hard-coded
# [128, 128, 3], so the pipeline cannot drift from the model configuration.
image_shape = tf.TensorShape(input_shape)
train_gen = tf.data.Dataset.from_generator(data_loader,
                                           output_types=(tf.float32, tf.float32),
                                           output_shapes=(image_shape, image_shape))

# Shuffle within a 1000-element buffer, form fixed-size batches, loop forever
# (the number of steps per epoch is given to `fit`), and prefetch.
train_gen = train_gen.shuffle(1000, reshuffle_each_iteration=True)
train_gen = train_gen.batch(batch_size, drop_remainder=True)
train_gen = train_gen.repeat()
train_gen = train_gen.prefetch(tf.data.experimental.AUTOTUNE)

### Prepare Model

In [14]:
def build_encoder(config):
    """Assemble the convolutional encoder as a functional Keras model.

    Args:
        config: Dict providing "input_shape", "enc_layer_nums", "enc_filters",
            "enc_kernel_size", "enc_strides", "enc_padding", "drop_rate" and
            "latent_dim".

    Returns:
        A ``tf.keras.Model`` named "encoder" whose outputs are
        ``[mean, log_var]``, each a Dense layer of width ``latent_dim``.
    """
    encoder_input = tf.keras.Input(shape=config["input_shape"], name="encoder_input")

    # Stack of Conv2D -> BatchNormalization -> LeakyReLU -> Dropout stages
    features = encoder_input
    for layer_idx in range(config["enc_layer_nums"]):
        conv = tf.keras.layers.Conv2D(
            filters=config["enc_filters"][layer_idx],
            kernel_size=config["enc_kernel_size"][layer_idx],
            strides=config["enc_strides"][layer_idx],
            padding=config["enc_padding"][layer_idx],
            name="encoder_conv_%d" % (layer_idx + 1),
        )
        features = conv(features)
        features = tf.keras.layers.BatchNormalization()(features)
        features = tf.keras.layers.LeakyReLU()(features)
        features = tf.keras.layers.Dropout(config["drop_rate"])(features)
    flat = tf.keras.layers.Flatten()(features)

    # Two heads on the flattened features
    latent_units = config["latent_dim"]
    mean = tf.keras.layers.Dense(latent_units, name="mean")(flat)
    log_var = tf.keras.layers.Dense(latent_units, name="log_var")(flat)
    return tf.keras.Model(inputs=encoder_input, outputs=[mean, log_var], name="encoder")

def build_decoder(config):
    """Build the transposed-convolution decoder as a functional Keras model.

    Args:
        config: Dict providing "latent_dim", "before_flatten_shape",
            "dec_layer_nums", "dec_filters", "dec_kernel_size", "dec_strides"
            and "dec_padding".

    Returns:
        A ``tf.keras.Model`` named "decoder" mapping a latent vector of width
        ``latent_dim`` to the output of the last Conv2DTranspose layer, which
        is passed through a sigmoid.
    """
    # Define input; `shape` is given as a tuple, which is the documented form
    # (a bare int is not accepted by every Keras version).
    inputs = tf.keras.Input(shape=(config["latent_dim"],), name="decoder_input")

    # Define layers
    x = inputs
    # Cast to a plain int: np.prod returns a NumPy integer scalar.
    dense_units = int(np.prod(config["before_flatten_shape"]))
    x = tf.keras.layers.Dense(dense_units)(x)
    x = tf.keras.layers.Reshape(config["before_flatten_shape"])(x)
    for i in range(config["dec_layer_nums"]):
        x = tf.keras.layers.Conv2DTranspose(filters=config["dec_filters"][i],
                                            kernel_size=config["dec_kernel_size"][i],
                                            strides=config["dec_strides"][i],
                                            padding=config["dec_padding"][i],
                                            name="decoder_conv_t_{}".format(i + 1))(x)
        # Hidden layers use LeakyReLU; the final layer uses a sigmoid.
        if i < config["dec_layer_nums"] - 1:
            x = tf.keras.layers.LeakyReLU()(x)
        else:
            x = tf.keras.layers.Activation("sigmoid")(x)

    # Define output
    outputs = x
    return tf.keras.Model(inputs=inputs, outputs=outputs, name="decoder")

def sampling(mean, log_var):
    """Return ``mean + exp(log_var / 2) * noise`` with standard-normal noise.

    The noise tensor is drawn with the same shape as ``mean``.
    """
    noise = tf.random.normal(tf.shape(mean), mean=0.0, stddev=1.0)
    std_dev = tf.math.exp(log_var / 2)
    return mean + std_dev * noise

class Autoencoder(tf.keras.Model):
    """Variational autoencoder composed of an encoder and a decoder sub-model.

    Args:
        config: Configuration dict forwarded to ``build_encoder`` and
            ``build_decoder``.
    """

    def __init__(self, config):
        # No positional arguments: `self` is bound implicitly by super().
        super().__init__()
        self.encoder = build_encoder(config)
        self.decoder = build_decoder(config)
        # Encoder outputs of the most recent forward pass; VAELoss.kl_loss
        # reads these attributes to compute the KL term.
        self.mean = None
        self.log_var = None

    def call(self, x, training=False):
        """Encode ``x``, sample a latent vector and decode it.

        Args:
            x: Batch of inputs accepted by the encoder.
            training: Forwarded to the encoder and decoder so their
                training-dependent layers (BatchNormalization, Dropout) run in
                the requested mode.

        Returns:
            The decoder output for the sampled latent vector.
        """
        self.mean, self.log_var = self.encoder(x, training=training)
        latent_vector = sampling(self.mean, self.log_var)
        outputs = self.decoder(latent_vector, training=training)
        return outputs

In [15]:
# Instantiate the autoencoder and build it for batched inputs of
# shape (batch_size, *input_shape).
model = Autoencoder(model_config)
model.build(input_shape=(batch_size, *input_shape))

### Define loss function

In [16]:
class VAELoss(tf.keras.losses.Loss):
    """Reconstruction (MSE) loss plus a weighted KL term.

    Args:
        model: Object exposing ``mean`` and ``log_var`` attributes holding the
            encoder outputs of the latest forward pass (the Autoencoder sets
            them in ``call``).
        kl_loss_factor: Multiplier applied to the KL term.
    """

    def __init__(self, model, kl_loss_factor):
        super().__init__()
        self.model = model
        self.kl_loss_factor = kl_loss_factor
        self.r_loss = tf.keras.losses.MeanSquaredError()

    def kl_loss(self):
        """Return ``-0.5 * sum(1 + log_var - mean^2 - exp(log_var))`` over axis 1."""
        mean = self.model.mean
        log_var = self.model.log_var
        return -0.5 * tf.reduce_sum(1 + log_var - tf.math.square(mean) - tf.math.exp(log_var), axis=1)

    def call(self, y_true, y_pred):
        """Return the reconstruction loss plus the scaled KL term."""
        r_loss = self.r_loss(y_true, y_pred)
        # Use the factor stored on this instance; the notebook-level variable
        # of the same name is not consulted.
        kl_loss = self.kl_loss() * self.kl_loss_factor
        return r_loss + kl_loss

### Train Model

In [26]:
# Compile model
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate, beta_1=beta_1, beta_2=beta_2),
              loss=VAELoss(model=model, kl_loss_factor=kl_loss_factor))

# Fit model
# `train_gen` repeats indefinitely, so the epoch length is set explicitly.
# Shuffling is done in the dataset pipeline (`train_gen.shuffle`); the `shuffle`
# argument of `fit` is ignored for dataset inputs and is therefore omitted.
model.fit(train_gen,
          steps_per_epoch=len(data_loader.data_dirs) // batch_size,
          epochs=epochs)

Train for 6331 steps
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x638360e50>

In [27]:
# Persist the trained weights under the notebook's save directory.
model.save_weights(filepath="./save/03_VAE_to_Generate_Faces/model")

### Analysis

In [28]:
# Restore the weights written by the save cell before running the analysis.
model.load_weights(filepath="./save/03_VAE_to_Generate_Faces/model")

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x63822bd10>