## Load Data Set

In [None]:
import zipfile
import os
import numpy as np
import pathlib
import pandas as pd
from math import ceil
import tensorflow as tf
import numpy as np
import IPython.display as display
import keras
from keras import backend as K
from matplotlib import pyplot as plt
from keras.utils import to_categorical


print(tf.__version__)


mnist = keras.datasets.mnist

(x_train, y_train),(x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

In [None]:
print(x_train.shape, y_train.shape)
x_train = np.expand_dims(x_train, -1)
y_train = to_categorical(y_train)
print(x_train.shape, y_train.shape)

In [None]:
def custom_loader():
  BATCH_SIZE = 32

  trainset_size = x_train.shape[0]

  p = 0
  while True:

    idx_from = (p * BATCH_SIZE) % trainset_size

    idx_to = idx_from + BATCH_SIZE

    batch_x = x_train[idx_from: idx_to]
    batch_y = y_train[idx_from: idx_to]
    
    p += 1
  
    yield batch_x, batch_y

In [None]:
custom_gen = custom_loader()
for i in range(3):
  batch_x, batch_y = next(custom_gen)
  print(batch_x.shape, batch_y.shape)

## Model Design

In [None]:
enc_len = 0
age_len = 10
img_shape = (28, 28, 1)
width, height, depth = (28, 28, 1)
img_len = np.prod(img_shape)
latent_dim = enc_len + age_len + img_len
noise_len = 50  # 32 x 32 x 3
input_dim = enc_len + age_len + noise_len
cond_len = enc_len + age_len


def build_discriminator():
    model = keras.Sequential([
        # dense 1
        keras.layers.Dense(64, input_shape=(latent_dim,)),
        keras.layers.Activation(tf.nn.relu),
        keras.layers.Dropout(0.2),
        
        # output
        keras.layers.Dense(1),
        keras.layers.Activation(tf.nn.sigmoid),
    ])
    
    # condition
#     c1 = keras.layers.Input(shape=(enc_len,))
    c2 = keras.layers.Input(shape=(age_len,))
    
    # image
    z = keras.layers.Input(shape=img_shape)
    
    # flatten image
    z_flat = keras.layers.Flatten()(z)
    
    # concatenation
    inputs = keras.layers.concatenate([c2, z_flat])
    
    # real or fake
    outputs = model(inputs)
    
    return keras.models.Model([c2, z], outputs)


def build_generator():
    model = keras.Sequential([
        
        # dense 1
        keras.layers.Dense(128, input_shape=(input_dim,)),
        keras.layers.Activation(tf.nn.relu),
        keras.layers.Dropout(0.2),
        
        # dense 2
        keras.layers.Dense(256, input_shape=(input_dim,)),
        keras.layers.Activation(tf.nn.relu),
        keras.layers.Dropout(0.2),
        
#         # reshape 1d to 3d
#         keras.layers.Reshape((12, 12, 1)),
        
#         # transpose conv block 1
#         keras.layers.Conv2DTranspose(filters=1, kernel_size=(3, 3), padding='same'),
#         keras.layers.Activation(tf.nn.relu),
#         keras.layers.UpSampling2D(size=(2,2)),
#         keras.layers.Dropout(0.2),
        
#         # transpose conv block 2
#         keras.layers.Conv2DTranspose(filters=2, kernel_size=(3, 3), padding='same'),
#         keras.layers.Activation(tf.nn.relu),
#         keras.layers.UpSampling2D(size=(2,2)),
#         keras.layers.Dropout(0.2),
        
#         # flatten
#         keras.layers.Flatten(),
        
        # dense 3
        keras.layers.Dense(784),
        keras.layers.Activation(tf.nn.relu),
        keras.layers.Dropout(0.2),
        
        # reshape 1d to 3d
        keras.layers.Reshape((width, height, depth)),
        
        # output
        keras.layers.Activation(tf.nn.tanh),
    ])
    
    # condition
#     c1 = keras.layers.Input(shape=(enc_len,))
    c2 = keras.layers.Input(shape=(age_len,))
    
    # noise
    x = keras.layers.Input(shape=(noise_len,))

    # concatenation
    inputs = keras.layers.concatenate([c2, x])
    
    # real or fake
    outputs = model(inputs)
    
    return keras.models.Model([c2, x], outputs)


generator = build_generator()
discriminator = build_discriminator()

In [None]:
generator.summary()

In [None]:
discriminator.summary()

## Generative Adversarial Network

In [None]:
GLR = 0.02  # generator
DLR = 0.001  # discriminator


discriminator.compile(
    optimizer=keras.optimizers.Adam(lr=DLR),
    loss=keras.losses.mean_squared_error,
    metrics=['accuracy']
)


# condition
# c1 = keras.layers.Input(shape=(enc_len,))
c2 = keras.layers.Input(shape=(age_len,))

# noise
x = keras.layers.Input(shape=(noise_len,))

# freeze discriminator
discriminator.trainable = False

# output
z = generator([c2, x])
out = discriminator([c2, z])

# GAN
gan = keras.models.Model(inputs=[c2, x], outputs=out)

gan.compile(
    optimizer=keras.optimizers.Adam(lr=GLR),
    loss=keras.losses.mean_squared_error,
    metrics=['accuracy'])

In [None]:
gan.summary()

## Visualization Method

In [None]:
# from google.colab import drive
import os


# drive.mount('/content/gdrive', force_remount=True)

root_path = './'
tgt_pth = os.path.join(root_path, 'visualize_mnist-v6')

if not os.path.exists(tgt_pth):
  os.mkdir(tgt_pth)

In [None]:
def visualizeGAN(e, z_real, z_fake):

    fig, axes = plt.subplots(8, 8, figsize=(40, 36))

    r_real = 0
    r_fake = 0
    for row, axe in enumerate(axes):
        for col, cell in enumerate(axe):
            if row % 2 == 0:
                cell.imshow(z_real[r_real * 8 + col])
            else:
                cell.imshow(z_fake[r_fake * 8 + col])

            cell.axis("off")

        if row % 2 == 0:
            r_real += 1
        else:
            r_fake += 1

    plt.axis("off")
    
    fig.savefig(os.path.join(tgt_pth, '{}.jpg'.format(str(e).zfill(3))))
    
    plt.close()

## Load Batch

In [None]:
def load_noise():
    
    y_true = tf.ones((BATCH_SIZE,))
    
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config = tf.ConfigProto(allow_soft_placement=True)
    config.gpu_options.allocator_type = 'BFC'
    config.gpu_options.per_process_gpu_memory_fraction = 0.40

    with tf.Session(config=config) as sess:
        tf.initialize_all_variables().run()
        
        # run once
        y_true = y_true.eval()

        while True:
            batch_x, batch_y = next(custom_gen)

            sz = batch_x.shape[0]

            if sz != BATCH_SIZE:
                continue
            
            # fake data
            c2 = tf.cast(batch_y, tf.float32).eval()
            x = tf.random.normal((sz, noise_len,)).eval()
            
            yield c2, x, y_true


def load_batch():
    
    y_fake = tf.zeros((BATCH_SIZE,))
    y_true = tf.ones((BATCH_SIZE,))
    
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config = tf.ConfigProto(allow_soft_placement=True)
    config.gpu_options.allocator_type = 'BFC'
    config.gpu_options.per_process_gpu_memory_fraction = 0.40

    with tf.Session(config=config) as sess:
        tf.initialize_all_variables().run()
        
        # run once
        y_fake = y_fake.eval()
        y_true = y_true.eval()

        while True:
            batch_x, batch_y = next(custom_gen)

            sz = batch_x.shape[0]

            if sz != BATCH_SIZE:
                continue
            
            # fake data
            c2 = tf.cast(batch_y, tf.float32).eval()
            x = tf.random.normal((sz, noise_len,)).eval()
            z_fake = generator.predict([c2, x])

            # real data
            c2 = tf.cast(batch_y, tf.float32).eval()
            z_real = batch_x
                        
            yield c2, x, z_fake, y_fake, z_real, y_true

## Train Model

In [None]:
BATCH_SIZE = 32
EPOCHS = 128
STEPS = 1  # 60000 // BATCH_SIZE


train_loss_g = []
train_loss_d = []

train_acc_g = []
train_acc_d = []


disc_itr = load_batch()
gen_itr = load_noise()


# epochs
for e in range(EPOCHS):

    #batches
    loss = []
    acc = []

    for p in range(STEPS * 2):
        
        c2, x, z_fake, y_fake, z_real, y_real = next(disc_itr)
    
        # train
        loss_2, acc_2 = discriminator.train_on_batch([c2, z_real], y_real)
        loss_1, acc_1 = discriminator.train_on_batch([c2, z_fake], y_fake)

        batch_loss = 0.5 * (loss_1 + loss_2)
        batch_acc = 0.5 * (acc_1 + acc_2)

        loss.append(batch_loss)
        acc.append(batch_acc)

    train_loss_d.append(np.mean(np.array(loss)))
    train_acc_d.append(np.mean(np.array(acc)))

    #batches
    loss = []
    acc = []

    for p in range(STEPS):

      c2, x, y_true = next(gen_itr)

      # train
      loss_1, acc_1 = gan.train_on_batch([c2, x], y_true)

      loss.append(loss_1)
      acc.append(acc_1)

    train_loss_g.append(np.mean(np.array(loss)))
    train_acc_g.append(np.mean(np.array(acc)))


    print("Epoch: {}, Steps: {}, Discriminator Accuracy: %{:.2f}, GAN Accuracy: %{:.2f}".format(
          e,
          STEPS,
          train_acc_d[-1] * 100,
          train_acc_g[-1] * 100
      ))

    ## visualize results
    batch_x, batch_y = next(custom_gen)

    sz = batch_x.shape[0]

    # fake data
    #     c1 = row['enc']
    c2 = tf.cast(batch_y, tf.float32).eval()
    x = tf.random.normal((sz, noise_len,)).eval()
    z_fake = generator.predict([c2, x])
    z_fake = np.squeeze(z_fake, axis=-1)

    # real data
    z_real = batch_x
    z_real = np.squeeze(z_real, axis=-1)

    visualizeGAN(e, z_real, z_fake)

## Plot Loss

In [None]:
plt.figure(figsize=(20, 18))
plt.plot(train_loss_g, label="Generator Loss");
plt.plot(train_loss_d, label="Discriminator Loss");
plt.legend();

## Plot Accuracy

In [None]:
plt.figure(figsize=(20, 18))
plt.plot(train_acc_g, label="Generator Accuracy");
plt.plot(train_acc_d, label="Discriminator Accuracy");
plt.legend();