In [79]:
import os

from keras import backend
from keras import Sequential
from keras.layers import Conv2D, BatchNormalization, LeakyReLU, ReLU, Flatten, Dense
from keras.layers import Reshape, Conv2DTranspose, Dropout
from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.datasets.cifar10 import load_data

import matplotlib.pyplot as plt
import numpy as np

In [84]:
def define_disc_model():
    """Build and compile the discriminator (real-vs-fake image classifier).

    The network takes a 32x32x3 image, applies one full-resolution
    convolution followed by two stride-2 convolutions, and ends in a single
    sigmoid unit.

    Returns:
        A compiled ``Sequential`` model (binary cross-entropy loss, Adam
        optimizer, accuracy metric).
    """
    model = Sequential()

    # 32x32x3 -> 32x32x64. The activation matters: without it this layer and
    # the next convolution are two linear maps stacked back to back.
    model.add(Conv2D(64, (3,3), padding='same', input_shape=(32,32,3)))
    model.add(LeakyReLU(alpha=0.2))

    # downsample: 32x32x64 -> 16x16x128
    model.add(Conv2D(128, (3,3), strides=(2,2), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))

    # downsample: 16x16x128 -> 8x8x128
    model.add(Conv2D(128, (3,3), strides=(2,2), padding='same'))
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.2))

    # single probability that the input image is real
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))

    # The discriminator is a binary real/fake classifier, so binary
    # cross-entropy is the natural loss. `learning_rate` replaces the
    # deprecated `lr` keyword.
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

    return model

def define_gen_model():
    """Build the generator: a 100-d latent vector in, a 32x32x3 image out.

    The model is returned uncompiled. Its last layer uses ``tanh``, so
    generated pixel values lie in [-1, 1].

    Returns:
        An uncompiled ``Sequential`` model.
    """
    latent_size = 100
    seed_units = 256 * 8 * 8
    conv_init = RandomNormal(mean=0.0, stddev=0.02)

    # Project the latent vector to 256 feature maps of size 8x8; these act as
    # a low-resolution seed for the final image.
    layers = [
        Dense(seed_units, input_dim=latent_size),
        BatchNormalization(),
        ReLU(),
        Reshape((8, 8, 256)),
    ]

    # Two stride-2 transposed convolutions: 8x8 -> 16x16 -> 32x32.
    for _ in range(2):
        layers += [
            Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same',
                            kernel_initializer=conv_init),
            BatchNormalization(),
            ReLU(),
        ]

    # Collapse the 128 feature maps to 3 colour channels.
    layers.append(
        Conv2D(3, (4, 4), activation='tanh', padding='same',
               kernel_initializer=conv_init)
    )

    return Sequential(layers)

def define_gan_model(d_model, g_model):
    """Stack generator and discriminator into the combined model used to train the generator.

    Side effect: sets ``d_model.trainable = False`` on the object passed in.
    The flag is set before the combined model is compiled, with the intent
    that training the combined model only updates generator weights.

    Args:
        d_model: Discriminator model (mutated, see above).
        g_model: Generator model.

    Returns:
        A compiled ``Sequential`` model mapping latent vectors to the
        discriminator's real/fake probability.
    """
    # Freeze the discriminator inside the combined model. The generator is
    # then trained by labelling its outputs as "real" and back-propagating
    # the discriminator's loss through the frozen discriminator.
    d_model.trainable = False

    model = Sequential()
    model.add(g_model)
    model.add(d_model)

    # `learning_rate` replaces the deprecated `lr` keyword.
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

    return model

def load_real_samples():
    """Load the CIFAR-10 training images rescaled from [0, 255] to [-1, 1].

    Labels and the test split returned by ``load_data`` are discarded.

    Returns:
        float32 array of training images.
    """
    (images, _), _ = load_data()

    # Map 0 -> -1 and 255 -> +1, matching the generator's tanh output range.
    images = images.astype('float32')
    return (images - 127.5) / 127.5

def gen_real_subset(train_x, n_samples, smoothing=False):
    """Draw a random batch of real images together with their "real" labels.

    Args:
        train_x: Array of real images; samples are indexed along axis 0.
        n_samples: Number of images to draw (with replacement).
        smoothing: If False, every label is exactly 1. If True, each label is
            drawn uniformly from [0.8, 1.2] and rounded to one decimal place.

    Returns:
        Tuple ``(subset_x, subset_y)`` where ``subset_x`` has ``n_samples``
        rows of ``train_x`` and ``subset_y`` has shape ``(n_samples, 1)``.
    """
    chosen_indices = np.random.randint(0, train_x.shape[0], n_samples)
    subset_x = train_x[chosen_indices]

    if smoothing:
        # Noisy "real" labels around 1, drawn for the whole batch in a single
        # vectorised call instead of one Python-level draw per sample.
        subset_y = np.round(np.random.uniform(0.8, 1.2, size=(n_samples, 1)), 1)
    else:
        subset_y = np.ones((n_samples, 1))
    return subset_x, subset_y

def gen_fake_subset(shape, n_samples):
    """Create a batch of uniform-noise "images" labelled as fake.

    Args:
        shape: Image shape; its first three entries are used as the
            per-image dimensions.
        n_samples: Number of noise images to create.

    Returns:
        Tuple ``(noise_images, fake_labels)``: noise drawn uniformly from
        [0, 1) with shape ``(n_samples, shape[0], shape[1], shape[2])`` and an
        all-zero label column of shape ``(n_samples, 1)``.
    """
    dim0, dim1, dim2 = shape[0], shape[1], shape[2]
    noise_dims = (n_samples, dim0, dim1, dim2)
    noise_images = np.random.uniform(size=noise_dims)
    fake_labels = np.zeros((n_samples, 1))
    return noise_images, fake_labels

def gen_latent_inputs(n_samples, dimension):
    """Sample latent vectors from a standard normal distribution.

    Args:
        n_samples: Number of latent vectors.
        dimension: Length of each latent vector.

    Returns:
        Array of shape ``(n_samples, dimension)``.
    """
    return np.random.standard_normal(size=(n_samples, dimension))

def gen_fake_samples(g_model, n_samples, latent_dim, smoothing=False):
    """Generate a batch of fake images with the generator, plus "fake" labels.

    Args:
        g_model: Generator model; its ``predict`` is called on the latent batch.
        n_samples: Number of images to generate.
        latent_dim: Length of each latent vector fed to the generator.
        smoothing: If False, every label is exactly 0. If True, each label is
            drawn uniformly from [0, 0.3] and rounded to one decimal place.

    Returns:
        Tuple ``(fake_x, fake_y)``: the generator output for ``n_samples``
        latent vectors and a label column of shape ``(n_samples, 1)``.
    """
    latent_inputs = gen_latent_inputs(n_samples, latent_dim)
    # verbose=0: this runs once per training step, so a per-call progress bar
    # would flood the notebook output.
    fake_x = g_model.predict(latent_inputs, verbose=0)

    if smoothing:
        # Noisy "fake" labels near 0, drawn for the whole batch in a single
        # vectorised call instead of one Python-level draw per sample.
        fake_y = np.round(np.random.uniform(0, 0.3, size=(n_samples, 1)), 1)
    else:
        fake_y = np.zeros((n_samples, 1))
    return fake_x, fake_y

def train_d_model(d_model, train_x, batch_size=128, iterations=100):
    """Train the discriminator alone on real images versus uniform noise.

    Each iteration runs one ``train_on_batch`` on a half-batch of real images
    and one on a half-batch of noise images from ``gen_fake_subset``.

    Args:
        d_model: Compiled discriminator model.
        train_x: Array of real images; samples are indexed along axis 0.
        batch_size: Combined batch size; half is real, half is noise.
        iterations: Number of training iterations. With zero (or fewer)
            iterations nothing is trained.

    Returns:
        The same ``d_model`` object.
    """
    n_samples = int(batch_size / 2)
    # The per-image shape is the same for every batch, so compute it once.
    img_shape = train_x.shape[1:]

    # Stay None when the loop body never runs, so the report below cannot
    # hit an unbound name.
    real_acc = fake_acc = None
    for _ in range(iterations):
        real_x, real_y = gen_real_subset(train_x, n_samples)
        fake_x, fake_y = gen_fake_subset(img_shape, n_samples)

        _, real_acc = d_model.train_on_batch(real_x, real_y)
        _, fake_acc = d_model.train_on_batch(fake_x, fake_y)

    if real_acc is None:
        print('No training iterations were run.')
    else:
        # Accuracies of the last iteration only.
        print(f'Real Accuracy: {real_acc*100:.2f}\tFake Accuracy: {fake_acc*100:.2f}')

    return d_model
       
def train_gan(d_model, g_model, gan_model, train_x, latent_dim=100, batch_size=256, n_epochs=100):
    """Alternate discriminator and generator updates, logging losses to a file.

    Every step performs, in order:
      1. a discriminator update on a half-batch of real images,
      2. a discriminator update on a half-batch of generated images,
      3. a generator update through ``gan_model`` on a full batch of latent
         vectors labelled as real.
    Both discriminator updates use smoothed labels. One line per step is
    appended to ``train_history.log``; every 10th epoch ``evaluate_gan`` runs.

    Args:
        d_model: Compiled discriminator.
        g_model: Generator.
        gan_model: Compiled combined model (generator followed by discriminator).
        train_x: Array of real images; samples are indexed along axis 0.
        latent_dim: Length of each latent vector.
        batch_size: Full batch size; the discriminator sees half of it per update.
        n_epochs: Number of epochs; an epoch is ``int(len(train_x) / batch_size)`` steps.
    """
    half_batch = int(batch_size / 2)
    full_batch = half_batch * 2
    steps_per_epoch = int(train_x.shape[0] / batch_size)

    with open('train_history.log', 'a+') as train_log:
        for epoch in range(n_epochs):
            for batch in range(steps_per_epoch):
                # --- discriminator: real half-batch ---
                real_images, real_labels = gen_real_subset(train_x, half_batch, smoothing=True)
                d_loss1, _ = d_model.train_on_batch(real_images, real_labels)

                # --- discriminator: generated half-batch ---
                fake_images, fake_labels = gen_fake_samples(g_model, half_batch, latent_dim, smoothing=True)
                d_loss2, _ = d_model.train_on_batch(fake_images, fake_labels)

                # --- generator: latent vectors presented as "real" (label 1) ---
                # Only generator weights are meant to change here, because the
                # discriminator was frozen when the combined model was defined.
                latent_points = gen_latent_inputs(full_batch, latent_dim)
                target_real = np.ones((full_batch, 1))
                gan_loss, _ = gan_model.train_on_batch(latent_points, target_real)

                metric = f'D Loss Real: {d_loss1:.4f}\tD Loss Fake: {d_loss2:.4f}\tGAN Loss: {gan_loss:.4f}\tBatch: {batch+1}\tEpoch: {epoch+1}'
                train_log.write(metric + '\n')

            # Periodic checkpoint / evaluation every 10 epochs.
            if (epoch + 1) % 10 == 0:
                evaluate_gan(d_model, g_model, train_x, latent_dim, epoch, half_batch)

def evaluate_gan(d_model, g_model, train_x, latent_dim, n_epoch, n_samples=25):
    """Evaluate the discriminator, snapshot the generator and save sample images.

    Appends one line of real/fake loss and accuracy to ``test_history.log``,
    saves the generator to ``./gen_models/g_model_e_<epoch>.h5`` and passes
    the first 10 generated images to ``save_plot``.

    Args:
        d_model: Compiled discriminator.
        g_model: Generator.
        train_x: Array of real images; samples are indexed along axis 0.
        latent_dim: Length of each latent vector.
        n_epoch: Zero-based epoch index; file names and the log use ``n_epoch + 1``.
        n_samples: Number of real and of generated images to evaluate on.
    """
    # Evaluate against hard 0/1 labels. With smoothed targets such as 0.9 or
    # 0.2 the accuracy metric can almost never match a thresholded
    # prediction, so the logged accuracy would be meaningless.
    real_x, real_y = gen_real_subset(train_x, n_samples)
    real_loss, real_acc = d_model.evaluate(real_x, real_y)

    # discriminator performance on generated samples
    fake_x, fake_y = gen_fake_samples(g_model, n_samples, latent_dim)
    fake_loss, fake_acc = d_model.evaluate(fake_x, fake_y)

    metric = f'R Acc: {real_acc*100:.2f}\tR Loss: {real_loss:.4f}\tF Acc: {fake_acc*100:.2f}\tF Loss: {fake_loss:.4f}\tEpoch: {n_epoch+1}'
    with open('test_history.log', 'a+') as test_log:
        test_log.write(metric+'\n')

    # Save a generator snapshot. Create the target directory first so a
    # missing folder cannot abort a long training run at the first checkpoint.
    os.makedirs('./gen_models', exist_ok=True)
    f_name = f'./gen_models/g_model_e_{n_epoch+1}.h5'
    g_model.save(f_name)

    # plot generated images, save to file
    save_plot(fake_x[0:10], n_epoch)

def save_plot(images, n_epoch):
    """Save up to 10 generated images as a 2x5 grid PNG.

    Args:
        images: Array of images with values in [-1, 1], indexed along axis 0.
            Only the first 10 are plotted because the grid has 10 cells.
        n_epoch: Zero-based epoch index; the file is written to
            ``./gen_images/image_e_<n_epoch + 1>.png``.
    """
    # Make sure the output directory exists before plotting.
    os.makedirs('./gen_images', exist_ok=True)

    n_cells = 10  # 2 rows x 5 columns
    for i in range(min(images.shape[0], n_cells)):
        plt.subplot(2, 5, i+1)
        plt.axis('off')
        # Map [-1, 1] back to [0, 255]; clip so out-of-range values cannot
        # wrap around in the uint8 cast.
        pixels = np.clip(127.5*(1+images[i]), 0, 255).astype(np.uint8)
        plt.imshow(pixels)
    f_name = f'./gen_images/image_e_{n_epoch+1}.png'
    plt.tight_layout()
    plt.savefig(f_name)
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close()

In [81]:
# Report the image data format Keras is configured with (see the cell output).
print(f'Image Data Format: {backend.image_data_format()}')
# Build the discriminator (compiled) and the generator (uncompiled).
d_model = define_disc_model()
g_model = define_gen_model()
# NOTE: define_gan_model sets d_model.trainable = False on the object passed in.
gan_model = define_gan_model(d_model, g_model)
# CIFAR-10 training images as float32, rescaled to [-1, 1].
train_x = load_real_samples()

Image Data Format: channels_last


In [82]:
# Training hyperparameters passed to train_gan.
# n_epochs: number of passes of the training loop.
# batch_size: full batch size; the discriminator is updated on half-batches.
# latent_dim: must match the generator's input size (100 in define_gen_model).
n_epochs=200
batch_size = 128
latent_dim = 100

In [None]:
# Run the full training loop. Step losses are appended to train_history.log;
# every 10th epoch evaluate_gan logs metrics, saves the generator and writes
# a grid of sample images.
train_gan(d_model, g_model, gan_model, train_x, latent_dim, batch_size, n_epochs)