- Author: Umair Khan
- Date: 27/06/20

# 1. Imports

In [1]:
import sys
from os.path import join
sys.path.append('..')
from models import aegan_model
from matplotlib import pyplot
from numpy import zeros
from numpy import ones
from numpy import expand_dims
from numpy import mean
from numpy.random import randn, uniform
from numpy.random import randint

from keras.optimizers import Adam
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import BatchNormalization
from keras.layers import Dropout
from keras.layers import Embedding
from keras.layers import Activation
from keras.layers import Concatenate
from keras.initializers import RandomNormal

Using TensorFlow backend.


# 2. Load Data

In [2]:
from PIL import Image
import numpy as np
from os import listdir
from os.path import isfile

def load_sample_data(positive_path=r"E:\project\gan_pcam\data\train_sample\positive",
                     negative_path=r"E:\project\gan_pcam\data\train_sample\negative"):
    """Load paired positive/negative sample images and their binary labels.

    Files are read alternately from the two folders (positive first), so the
    returned arrays are interleaved: label 1, label 0, label 1, ...
    ``zip`` stops at the shorter listing, so surplus files in the larger
    folder are ignored.

    Args:
        positive_path: Folder holding the positive-class images. The default
            is the original hard-coded location, written as a raw string so
            the backslashes are not parsed as (invalid) escape sequences.
        negative_path: Folder holding the negative-class images.

    Returns:
        Tuple ``(images, labels)`` of numpy arrays; both are empty when either
        folder contains no regular files.
    """
    def _image_files(folder):
        # listdir order is unspecified; sort so the pairing is reproducible.
        return sorted(f for f in listdir(folder) if isfile(join(folder, f)))

    def _read_image(path):
        # The context manager releases the file handle once the pixels have
        # been copied into the numpy array.
        with Image.open(path) as img:
            return np.array(img)

    images = []
    labels = []
    for pos, neg in zip(_image_files(positive_path), _image_files(negative_path)):
        images.append(_read_image(join(positive_path, pos)))
        labels.append(1)
        images.append(_read_image(join(negative_path, neg)))
        labels.append(0)
    return np.array(images), np.array(labels)

In [3]:
def load_real_samples():
    """Load the sample images and rescale their pixel values for training.

    Returns:
        A two-element list ``[X, labels]``. ``X`` is the image array from
        ``load_sample_data`` cast to float32 and mapped through
        ``(x - 127.5) / 127.5`` (so 0 becomes -1 and 255 becomes 1);
        ``labels`` is the label array, unchanged.
    """
    raw_images, class_labels = load_sample_data()
    # cast before the arithmetic so the result stays float32
    scaled_images = (raw_images.astype('float32') - 127.5) / 127.5
    print(scaled_images.shape, class_labels.shape)
    return [scaled_images, class_labels]

# select real samples
def generate_real_samples(dataset, n_samples,  noise=False):
    """Draw a random batch (with replacement) of real images and labels.

    Args:
        dataset: Pair ``(images, labels)`` of arrays indexed along axis 0.
        n_samples: Number of rows to draw.
        noise: When true, each "real" target is 1 minus a uniform draw from
            [0, 0.1); otherwise every target is exactly 1.

    Returns:
        ``[X, labels], y`` where ``X``/``labels`` are the selected rows and
        ``y`` has shape ``(n_samples, 1)``.
    """
    all_images, all_labels = dataset
    # random row indices into the dataset
    picks = randint(0, all_images.shape[0], n_samples)
    targets = ones((n_samples, 1))
    if noise:
        targets = targets - uniform(0, 0.1, n_samples).reshape((n_samples, 1))
    return [all_images[picks], all_labels[picks]], targets

# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples, n_classes=2):
    """Sample a batch of latent vectors and random class labels.

    Args:
        latent_dim: Length of each latent vector.
        n_samples: Number of latent vectors to draw.
        n_classes: Labels are integers drawn from ``[0, n_classes)``.

    Returns:
        ``[z_input, labels]`` where ``z_input`` has shape
        ``(n_samples, latent_dim)`` with values uniform in [-1, 1) and
        ``labels`` has shape ``(n_samples,)``.
    """
    # draw every coordinate in one flat call, then fold into a batch
    flat_noise = uniform(-1, 1, latent_dim * n_samples)
    latent_batch = flat_noise.reshape(n_samples, latent_dim)
    class_ids = randint(0, n_classes, n_samples)
    return [latent_batch, class_ids]

# use the generator to generate n fake examples, with class labels
def generate_fake_samples(generator, latent_dim, n_samples, noise=False):
    """Run the generator on random latent points and build "fake" targets.

    Args:
        generator: Object exposing ``predict``; it receives the latent batch.
        latent_dim: Length of each latent vector.
        n_samples: Number of samples to generate.
        noise: When true, each target is a uniform draw from [0, 0.1);
            otherwise every target is exactly 0.

    Returns:
        ``[images, labels], y`` where ``images`` is the generator output,
        ``labels`` are the random class labels drawn alongside the latent
        points (they are not fed to the generator), and ``y`` has shape
        ``(n_samples, 1)``.
    """
    latent_batch, sampled_labels = generate_latent_points(latent_dim, n_samples)
    generated = generator.predict(latent_batch)
    targets = zeros((n_samples, 1))
    if noise:
        targets = targets + uniform(0, 0.1, n_samples).reshape((n_samples, 1))
    return [generated, sampled_labels], targets

# 3. Train Encoder-Decoder

In [4]:

def get_combined_encoder_decoder(encoder, decoder, opt):
    """Chain encoder and decoder into one compiled model.

    Args:
        encoder: Model whose ``input``/``output`` tensors are used.
        decoder: Model applied to the encoder's output tensor.
        opt: Optimizer passed to ``compile``.

    Returns:
        A model mapping the encoder input to the decoder output, compiled
        with mean-absolute-error loss.
    """
    autoencoder = Model(encoder.input, decoder(encoder.output))
    autoencoder.compile(loss=['mean_absolute_error'], optimizer=opt)
    return autoencoder

# generate samples and save as a plot and save the model
def summarize_encoder_decoder_performance(epoch, enc_dec_model, encoder_model, decoder_model, dataset, n_samples=25):
    """Plot reconstructions of random real images and checkpoint both models.

    Args:
        epoch: Epoch number, used only in the output file names.
        enc_dec_model: Unused; kept so existing callers keep working.
        encoder_model: Model used to encode the sampled real images.
        decoder_model: Model used to decode the embeddings back to images.
        dataset: ``[images, labels]`` pair passed to ``generate_real_samples``.
        n_samples: Number of reconstructions to plot. The grid is sized from
            this value (5x5 for the default 25) instead of assuming 25, so
            smaller values no longer raise IndexError and larger ones are no
            longer silently truncated.
    """
    from math import ceil, sqrt

    # reconstruct a random batch of real images
    [X_real, _], _ = generate_real_samples(dataset, n_samples)
    X = decoder_model.predict(encoder_model.predict(X_real))
    # undo the (x - 127.5) / 127.5 scaling and convert to unsigned bytes
    X = ((X * 127.5) + 127.5).astype('uint8')
    n_images = len(X)
    # smallest square grid that holds every image
    grid = max(1, ceil(sqrt(n_images)))
    pyplot.figure(figsize=(12, 10))
    for i in range(n_images):
        pyplot.subplot(grid, grid, 1 + i)
        pyplot.axis('off')
        pyplot.imshow(X[i, :, :, :])
    # save plot to file
    output = f'.\\AEGAN with sample PCam data\\output\\encoder-decoder\\encoder_decoder_{epoch}.png'
    pyplot.savefig(output)
    pyplot.close()
    # checkpoint both halves of the autoencoder
    encoder_filename = f'.\\AEGAN with sample PCam data\\model\\encoder-decoder\\encoder_model_{epoch}.h5'
    decoder_filename = f'.\\AEGAN with sample PCam data\\model\\encoder-decoder\\decoder_model_{epoch}.h5'
    encoder_model.save(encoder_filename)
    decoder_model.save(decoder_filename)
    print('>Saved: %s, %s and %s' % (output, encoder_filename, decoder_filename))


def train_encoder_decoder(enc_dec_model, encoder_model, decoder_model, dataset, n_epochs=200, n_batch=64):
    """Train the combined encoder-decoder to reconstruct its own input.

    Each step draws ``n_batch`` random real images and calls
    ``train_on_batch`` with the images as both input and target. An "epoch"
    is ``len(images) // n_batch`` steps; the mean loss is printed per epoch
    and a plot plus model checkpoints are written every 10 epochs.

    Args:
        enc_dec_model: Compiled combined model that is actually trained.
        encoder_model: Encoder half, passed through for checkpointing.
        decoder_model: Decoder half, passed through for checkpointing.
        dataset: ``[images, labels]`` pair; only the images are used.
        n_epochs: Number of epochs to run.
        n_batch: Images per training step.
    """
    bat_per_epo = dataset[0].shape[0] // n_batch
    # total number of training iterations
    n_steps = bat_per_epo * n_epochs
    err_list = []
    for i in range(n_steps):
        # get randomly selected 'real' samples
        [X_real, _], _ = generate_real_samples(dataset, n_batch)
        # update the autoencoder weights: the input is its own target
        err = enc_dec_model.train_on_batch(X_real, X_real)
        err_list.append(err)
        # in-place progress line for this batch
        step_line = f'>Step:{i+1} L1[{err:.3f}]'
        print(f'{step_line}\r', end="")
        if (i+1) % bat_per_epo == 0:
            epoch_line = f'>Epoch:{(i+1)//bat_per_epo} L1[{mean(err_list):.3f}]'
            # pad to the step line's width so none of its characters survive
            # the carriage-return overwrite (previously left e.g. "]6]")
            print(epoch_line.ljust(len(step_line)))
            err_list = []
        if (i+1) % (bat_per_epo * 10) == 0:
            summarize_encoder_decoder_performance((i+1)//bat_per_epo, enc_dec_model, encoder_model, decoder_model, dataset)

# Load and rescale the sample images; returns the [images, labels] pair.
dataset = load_real_samples()
# Build fresh (untrained) encoder and decoder networks from the project module.
encoder_model = aegan_model.get_encoder()
decoder_model = aegan_model.get_decoder()
# Optimizer for the reconstruction stage.
opt_RS = Adam(lr=0.00001, beta_1=0.5)
# Chain encoder -> decoder into one model compiled with MAE loss.
enc_dec_model = get_combined_encoder_decoder(encoder_model, decoder_model, opt_RS)

# Runs with the function defaults (200 epochs, batch size 64) and writes a plot
# and .h5 checkpoints every 10 epochs.
train_encoder_decoder(enc_dec_model, encoder_model, decoder_model, dataset)

(10000, 96, 96, 3) (10000,)
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


  model = Model(input=image, output=output)
  model = Model(input=embedding, output=output)


Instructions for updating:
Use tf.cast instead.
>Epoch:1 L1[0.380]]
>Epoch:2 L1[0.214]]
>Epoch:3 L1[0.173]]
>Epoch:4 L1[0.147]]
>Epoch:5 L1[0.122]]
>Epoch:6 L1[0.108]]
>Epoch:7 L1[0.101]6]
>Epoch:8 L1[0.096]2]
>Epoch:9 L1[0.093]5]
>Epoch:10 L1[0.090]]
>Saved: .\AEGAN with sample PCam data\output\encoder-decoder\encoder_decoder_10.png, .\AEGAN with sample PCam data\model\encoder-decoder\encoder_model_10.h5 and .\AEGAN with sample PCam data\model\encoder-decoder\decoder_model_10.h5
>Epoch:11 L1[0.086]]
>Epoch:12 L1[0.084]]
>Epoch:13 L1[0.082]]
>Epoch:14 L1[0.080]]
>Epoch:15 L1[0.080]]
>Epoch:16 L1[0.079]]
>Epoch:17 L1[0.076]]
>Epoch:18 L1[0.075]]
>Epoch:19 L1[0.074]]
>Epoch:20 L1[0.073]]
>Saved: .\AEGAN with sample PCam data\output\encoder-decoder\encoder_decoder_20.png, .\AEGAN with sample PCam data\model\encoder-decoder\encoder_model_20.h5 and .\AEGAN with sample PCam data\model\encoder-decoder\decoder_model_20.h5
>Epoch:21 L1[0.072]]
>Epoch:22 L1[0.071]]
>Epoch:23 L1[0.070]]
>Epoch:24

>Epoch:181 L1[0.035]]
>Epoch:182 L1[0.036]]
>Epoch:183 L1[0.036]]
>Epoch:184 L1[0.035]]
>Epoch:185 L1[0.035]]
>Epoch:186 L1[0.036]]
>Epoch:187 L1[0.035]]
>Epoch:188 L1[0.035]]
>Epoch:189 L1[0.035]]
>Epoch:190 L1[0.035]]
>Saved: .\AEGAN with sample PCam data\output\encoder-decoder\encoder_decoder_190.png, .\AEGAN with sample PCam data\model\encoder-decoder\encoder_model_190.h5 and .\AEGAN with sample PCam data\model\encoder-decoder\decoder_model_190.h5
>Epoch:191 L1[0.035]]
>Epoch:192 L1[0.035]]
>Epoch:193 L1[0.034]]
>Epoch:194 L1[0.035]]
>Epoch:195 L1[0.035]]
>Epoch:196 L1[0.035]]
>Epoch:197 L1[0.035]]
>Epoch:198 L1[0.034]]
>Epoch:199 L1[0.035]]
>Epoch:200 L1[0.034]]
>Saved: .\AEGAN with sample PCam data\output\encoder-decoder\encoder_decoder_200.png, .\AEGAN with sample PCam data\model\encoder-decoder\encoder_model_200.h5 and .\AEGAN with sample PCam data\model\encoder-decoder\decoder_model_200.h5


# 4. Train Generator Discriminator

In [4]:
def define_gan(g_model, d_model):
    """Stack the generator and discriminator into one compiled model.

    The discriminator is marked non-trainable before the stacked model is
    built; note that this sets ``d_model.trainable`` on the object passed in.

    Args:
        g_model: Generator; its ``input``/``output`` tensors are used.
        d_model: Discriminator applied to the generator's output.

    Returns:
        A model mapping the generator input to the discriminator output,
        compiled with binary cross-entropy and Adam(lr=0.0004, beta_1=0.5).
    """
    # freeze the discriminator inside the stacked model
    d_model.trainable = False
    stacked = Model(g_model.input, d_model(g_model.output))
    stacked.compile(loss=['binary_crossentropy'], optimizer=Adam(lr=0.0004, beta_1=0.5))
    return stacked

# generate samples and save as a plot and save the model
def summarize_generator_discriminator_performance(epoch, g_model, d_model, dec_model, dataset, latent_dim=99, n_samples=25):
    """Plot decoded generator samples and checkpoint generator/discriminator.

    Args:
        epoch: Epoch number, used only in the output file names.
        g_model: Generator; produces embeddings from latent points.
        d_model: Discriminator; only saved here.
        dec_model: Decoder used to turn generated embeddings into images.
        dataset: Unused; kept so existing callers keep working.
        latent_dim: Length of the latent vectors fed to the generator. Must
            match the generator's input size.
        n_samples: Number of images to plot. The grid is sized from this
            value (5x5 for the default 25) instead of assuming 25, so smaller
            values no longer raise IndexError and larger ones are no longer
            silently truncated.
    """
    from math import ceil, sqrt

    # generate embeddings and decode them into images
    [X, _], _ = generate_fake_samples(g_model, latent_dim, n_samples)
    X = dec_model.predict(X)
    # undo the (x - 127.5) / 127.5 scaling and convert to unsigned bytes
    X = ((X * 127.5) + 127.5).astype('uint8')
    n_images = len(X)
    # smallest square grid that holds every image
    grid = max(1, ceil(sqrt(n_images)))
    pyplot.figure(figsize=(12, 10))
    for i in range(n_images):
        pyplot.subplot(grid, grid, 1 + i)
        pyplot.axis('off')
        pyplot.imshow(X[i, :, :, :])
    # save plot to file
    output = f'.\\AEGAN with sample PCam data\\output\\generator_discriminator\\generator-discriminator_{epoch}.png'
    pyplot.savefig(output)
    pyplot.close()
    # checkpoint both adversarial models
    g_filename = f'.\\AEGAN with sample PCam data\\model\\generator_discriminator\\g_model_{epoch}.h5'
    d_filename = f'.\\AEGAN with sample PCam data\\model\\generator_discriminator\\d_model_{epoch}.h5'
    g_model.save(g_filename)
    d_model.save(d_filename)
    print('>Saved: %s, %s and %s' % (output, g_filename, d_filename))

def train_generator_discriminator(g_model, d_model, enc_model, dec_model,
                                  gan_model, dataset, latent_dim=99, n_epochs=200, n_batch=128):
    """Adversarially train the embedding generator against the discriminator.

    Per step: the discriminator is trained on encoder embeddings of half a
    batch of real images (noisy targets near 1), then on half a batch of
    generator outputs (noisy targets near 0); finally the generator is
    trained through ``gan_model`` on a full batch of latent points with
    targets of 1. Mean losses are printed per epoch, and a plot plus
    checkpoints are written every 10 epochs.

    Args:
        g_model: Generator producing embeddings from latent points.
        d_model: Compiled discriminator over embeddings.
        enc_model: Encoder used to embed real images.
        dec_model: Decoder, used only for the periodic sample plots.
        gan_model: Stacked generator+discriminator model from ``define_gan``.
        dataset: ``[images, labels]`` pair; only the images are used.
        latent_dim: Length of the latent vectors fed to the generator.
        n_epochs: Number of epochs to run.
        n_batch: Samples per generator step; the discriminator sees half of
            this per update.
    """
    # number of batches per training epoch
    bat_per_epo = dataset[0].shape[0] // n_batch
    # total number of training iterations
    n_steps = bat_per_epo * n_epochs
    # the discriminator gets half a batch of real and half a batch of fake
    half_batch = n_batch // 2
    dr_list = []
    df_list = []
    g_list = []
    for i in range(n_steps):
        # get randomly selected 'real' samples
        [X_real, _], y_real = generate_real_samples(dataset, half_batch, True)
        # update discriminator on embeddings of real images
        embedding_real = enc_model.predict(X_real)
        dr_loss = d_model.train_on_batch(embedding_real, y_real)
        # generate 'fake' embeddings
        [embedding_fake, _], y_fake = generate_fake_samples(g_model, latent_dim, half_batch, True)
        # update discriminator on generated embeddings
        df_loss = d_model.train_on_batch(embedding_fake, y_fake)
        # prepare points in latent space as input for the generator
        [z_input, _] = generate_latent_points(latent_dim, n_batch)
        # inverted labels: the generator is rewarded when fakes score as real
        y_gan = ones((n_batch, 1))
        # update the generator via the discriminator's error
        g_loss = gan_model.train_on_batch(z_input, y_gan)
        dr_list.append(dr_loss)
        df_list.append(df_loss)
        g_list.append(g_loss)
        # in-place progress line for this batch
        step_line = f'>Step:{i+1} DR[{dr_loss:.3f}], DF[{df_loss:.3f}] GL[{g_loss:.3f}]'
        print(f'{step_line}\r', end="")
        if (i+1) % bat_per_epo == 0:
            epoch_line = f'>Epoch:{(i+1)//bat_per_epo} DR[{mean(dr_list):.3f}], DF[{mean(df_list):.3f}] GL[{mean(g_list):.3f}]'
            # pad to the step line's width so none of its characters survive
            # the carriage-return overwrite
            print(epoch_line.ljust(len(step_line)))
            dr_list = []
            df_list = []
            g_list = []
        if (i+1) % (bat_per_epo * 10) == 0:
            # forward latent_dim: the summary otherwise falls back to its own
            # default and would sample latent points of the wrong size
            summarize_generator_discriminator_performance((i+1)//bat_per_epo, g_model, d_model, dec_model, dataset,
                                                          latent_dim=latent_dim)


In [5]:
# Reload and rescale the sample images.
dataset = load_real_samples()
# Latent vector length used for this run (differs from the 99 default of the
# training/summary functions, so it is passed explicitly below).
latent_dim = 112
# Rebuild encoder/decoder and restore the epoch-200 weights saved by the
# encoder-decoder training stage.
enc_model = aegan_model.get_encoder()
enc_model.load_weights('.\\AEGAN with sample PCam data\\model\\encoder-decoder\\encoder_model_200.h5')
dec_model = aegan_model.get_decoder()
dec_model.load_weights('.\\AEGAN with sample PCam data\\model\\encoder-decoder\\decoder_model_200.h5')

g_model = aegan_model.get_embedding_generator(dropout=True)

# The discriminator is compiled on its own here, before define_gan() marks it
# non-trainable for the stacked model.
d_model = aegan_model.get_embedding_discriminator(dropout=True)
opt = Adam(lr=0.0002, beta_1=0.5)
d_model.compile(loss=['binary_crossentropy'], optimizer=opt)

# Optional: resume from earlier generator/discriminator checkpoints.
#g_model.load_weights('.\\AEGAN with sample PCam data\\model\\generator_discriminator\\g_model_300.h5')
#d_model.load_weights('.\\AEGAN with sample PCam data\\model\\generator_discriminator\\d_model_300.h5')

gan_model = define_gan(g_model, d_model)

# NOTE(review): the recorded run of this cell ended with
# "ValueError: ... expected input_4 to have shape (48, 48, 128) but got array
# with shape (64, 64, 128)". Presumably one of the embedding producers in
# aegan_model disagrees with the discriminator's input shape -- confirm there.
train_generator_discriminator(g_model, d_model, enc_model, dec_model,
                                  gan_model, dataset, latent_dim, n_epochs=400, n_batch=64)


(10000, 96, 96, 3) (10000,)
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.


  model = Model(input=image, output=output)
  model = Model(input=embedding, output=output)
  model = Model(input=in_latent, output=output)
  model = Model(input=embedding, output=output)


Instructions for updating:
Use tf.cast instead.


  'Discrepancy between trainable weights and collected trainable'


ValueError: Error when checking input: expected input_4 to have shape (48, 48, 128) but got array with shape (64, 64, 128)

In [None]:
# NOTE(review): unexecuted leftover cell; no name `gan` is defined in the
# visible cells, so running it would presumably raise NameError -- remove or finish.
gan