In [1]:
import os 
# Put the Graphviz binaries on PATH (presumably so that keras.utils.plot_model,
# used by the model classes below, can find the `dot` executable -- confirm).
# The substring guard makes the cell idempotent: re-running it in the same
# kernel no longer appends the same directory to PATH again and again.
GRAPHVIZ_BIN = 'C:/Program Files (x86)/Graphviz2.38/bin/'
if GRAPHVIZ_BIN not in os.environ.get("PATH", ""):
    os.environ["PATH"] = os.environ.get("PATH", "") + os.pathsep + GRAPHVIZ_BIN

This first dense layer acts as the input layer and uses a LeakyReLU
activation. Also, notice that its input dimension is the latent space size. The
first number in the dense layer is the number of filters it is initialized with;
in the future, you may want to experiment with changing
the network's starting value. The paper here starts with 1,024
filters, but starting out with more can help the GAN's convergence.

Next, we need to reshape the tensor so that it can be built back up into
an image. (Notice that the number of filters, 256 x 8 x 8, equals the
product of the dimensions in the reshape layer.) Each layer must
be consistent with the one before it: the previous tensor's
sizing information must match. After the reshape to (8, 8, 256), we'll
upsample it and then look at a generator image of 16 x 16 (by three
channels).

![image.png](attachment:image.png)


In [2]:
#!/usr/bin/env python3
import sys
import numpy as np
from keras.layers import Dense, Reshape, Input, BatchNormalization
from keras.layers.core import Activation
from keras.layers.convolutional import UpSampling2D, Convolution2D, MaxPooling2D,Deconvolution2D
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Sequential, Model
from keras.optimizers import Adam, SGD, Nadam,Adamax
from keras import initializers
from keras.utils import plot_model

class Generator(object):
    """Builds, compiles, plots and summarises the generator network.

    Two architectures are available, selected with ``model_type``:

    * ``'simple'`` -- a stack of Dense/LeakyReLU/BatchNormalization blocks
      (see :meth:`model`).
    * ``'DCGAN'``  -- a Dense layer reshaped to (8, 8, 256) followed by
      upsampling/convolution blocks (see :meth:`dc_model`).

    The compiled Keras model is exposed as ``self.Generator``.
    """

    def __init__(self, width = 28, height= 28, channels = 1, latent_size=100, model_type = 'simple'):
        """Create and compile the generator.

        Args:
            width: image width, stored as ``self.W``.
            height: image height, stored as ``self.H``.
            channels: number of image channels, stored as ``self.C``.
            latent_size: length of the latent input vector.
            model_type: ``'simple'`` or ``'DCGAN'``.

        Raises:
            ValueError: if ``model_type`` is not one of the supported names.
        """
        # Fail fast on an unknown architecture. Previously neither branch
        # below matched, self.Generator was never assigned and the
        # constructor died later inside save_model() with an unrelated
        # AttributeError.
        if model_type not in ('simple', 'DCGAN'):
            raise ValueError(
                "model_type must be 'simple' or 'DCGAN', got " + repr(model_type))

        self.W = width
        self.H = height
        self.C = channels
        self.LATENT_SPACE_SIZE = latent_size
        # One standard-normal latent vector; not read anywhere else in this class.
        self.latent_space = np.random.normal(0,1,(self.LATENT_SPACE_SIZE,))

        if model_type=='simple':
            self.Generator = self.model()
            self.OPTIMIZER = Adam(lr=0.0002, decay=8e-9)
            self.Generator.compile(loss='binary_crossentropy', optimizer=self.OPTIMIZER)
        elif model_type=='DCGAN':
            self.Generator = self.dc_model()
            self.OPTIMIZER = Adam(lr=1e-4, beta_1=0.2)
            self.Generator.compile(loss='binary_crossentropy', optimizer=self.OPTIMIZER,metrics=['accuracy'])
        self.save_model()
        self.summary()

    def dc_model(self):
        """Return the (uncompiled) convolutional generator.

        The latent vector is projected to 256*8*8 values, reshaped to
        (8, 8, 256) and passed through three UpSampling2D stages, ending in a
        tanh convolution with ``self.C`` output channels.

        NOTE(review): the output resolution is fixed by the 8 x 8 seed and the
        three upsampling stages (64 x 64 assuming Keras' default 2x factor);
        it does not depend on ``self.W`` / ``self.H`` -- confirm this is
        intended.
        """

        model = Sequential()

        # Projection of the latent vector; 256*8*8 must equal the product of
        # the Reshape target below.
        model.add(Dense(256*8*8,activation=LeakyReLU(0.2), input_dim=self.LATENT_SPACE_SIZE))
        model.add(BatchNormalization())

        model.add(Reshape((8, 8, 256)))
        model.add(UpSampling2D())

        # Convolution2D(filters, 5, 5, border_mode=...) is the Keras 1 style
        # call signature; it is kept verbatim.
        model.add(Convolution2D(128, 5, 5, border_mode='same',activation=LeakyReLU(0.2)))
        model.add(BatchNormalization())
        model.add(UpSampling2D())

        model.add(Convolution2D(64, 5, 5, border_mode='same',activation=LeakyReLU(0.2)))
        model.add(BatchNormalization())
        model.add(UpSampling2D())

        # tanh keeps the output in [-1, 1].
        model.add(Convolution2D(self.C, 5, 5, border_mode='same', activation='tanh'))

        return model


    def model(self, block_starting_size=128,num_blocks=4):
        """Return the (uncompiled) fully connected generator.

        Args:
            block_starting_size: number of units in the first Dense layer.
            num_blocks: total number of Dense/LeakyReLU/BatchNormalization
                blocks; each block after the first doubles the unit count.

        Returns:
            A Sequential model mapping a latent vector to a tensor of shape
            ``(self.W, self.H, self.C)`` with tanh activation.
        """
        model = Sequential()

        block_size = block_starting_size
        model.add(Dense(block_size, input_shape=(self.LATENT_SPACE_SIZE,)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        # The first block was added above, hence num_blocks-1 further blocks.
        for i in range(num_blocks-1):
            block_size = block_size * 2
            model.add(Dense(block_size))
            model.add(LeakyReLU(alpha=0.2))
            model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(self.W * self.H * self.C, activation='tanh'))
        model.add(Reshape((self.W, self.H, self.C)))

        return model

    def summary(self):
        """Print the Keras layer summary of the generator."""
        return self.Generator.summary()

    def save_model(self):
        """Write a diagram of the generator to /data/Generator_Model.png."""
        plot_model(self.Generator.model, to_file='/data/Generator_Model.png')

Using TensorFlow backend.


In [3]:
#!/usr/bin/env python3
import sys
import numpy as np
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, BatchNormalization, Lambda, concatenate
from keras.layers.core import Activation
from keras.layers.convolutional import Convolution2D
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Sequential, Model
from keras.optimizers import Adam, SGD,Nadam, Adamax
import keras.backend as K
from keras.utils import plot_model


class Discriminator(object):
    """Builds, compiles, plots and summarises the discriminator network.

    ``model_type`` selects the architecture: ``'simple'`` (fully connected,
    see :meth:`model`) or ``'DCGAN'`` (strided convolutions, see
    :meth:`dc_model`). The compiled Keras model is exposed as
    ``self.Discriminator``.
    """

    def __init__(self, width = 28, height= 28, channels = 1, latent_size=100,model_type = 'simple'):
        """Create and compile the discriminator.

        Args:
            width: image width, stored as ``self.W``.
            height: image height, stored as ``self.H``.
            channels: number of image channels, stored as ``self.C``.
            latent_size: accepted for signature symmetry with the generator;
                it is not used by this class.
            model_type: ``'simple'`` or ``'DCGAN'``.

        Raises:
            ValueError: if ``model_type`` is not one of the supported names.
        """
        # Fail fast on an unknown architecture. Previously neither branch
        # below matched, self.Discriminator was never assigned and the
        # constructor died later inside save_model() with an unrelated
        # AttributeError.
        if model_type not in ('simple', 'DCGAN'):
            raise ValueError(
                "model_type must be 'simple' or 'DCGAN', got " + repr(model_type))

        self.W = width
        self.H = height
        self.C = channels
        # Total number of values per image; sizes the Dense layers in model().
        self.CAPACITY = width*height*channels
        self.SHAPE = (width,height,channels)

        if model_type=='simple':
            self.Discriminator = self.model()
            self.OPTIMIZER = Adam(lr=0.0002, decay=8e-9)
            self.Discriminator.compile(loss='binary_crossentropy', optimizer=self.OPTIMIZER, metrics=['accuracy'] )
        elif model_type=='DCGAN':
            self.Discriminator = self.dc_model()
            self.OPTIMIZER = Adam(lr=1e-4, beta_1=0.2)
            self.Discriminator.compile(loss='binary_crossentropy', optimizer=self.OPTIMIZER, metrics=['accuracy'] )

        self.save_model()
        self.summary()

    def dc_model(self):
        """Return the (uncompiled) convolutional discriminator.

        Two stride-2 5x5 convolutions (64 then 128 filters), each followed by
        Dropout(0.3) and BatchNormalization, then Flatten and a single
        sigmoid unit.
        """
        model = Sequential()
        # Convolution2D(filters, 5, 5, subsample=..., border_mode=...) is the
        # Keras 1 style call signature; it is kept verbatim.
        model.add(Convolution2D(64, 5, 5, subsample=(2,2), input_shape=(self.W,self.H,self.C), border_mode='same',activation=LeakyReLU(alpha=0.2)))
        model.add(Dropout(0.3))
        model.add(BatchNormalization())
        model.add(Convolution2D(128, 5, 5, subsample=(2,2), border_mode='same',activation=LeakyReLU(alpha=0.2)))
        model.add(Dropout(0.3))
        model.add(BatchNormalization())
        model.add(Flatten())
        model.add(Dense(1, activation='sigmoid'))
        return model

    def model(self):
        """Return the (uncompiled) fully connected discriminator.

        The image is flattened and passed through Dense layers of
        ``CAPACITY`` and ``CAPACITY/2`` units with LeakyReLU activations,
        ending in a single sigmoid unit.
        """
        model = Sequential()
        model.add(Flatten(input_shape=self.SHAPE))
        model.add(Dense(self.CAPACITY, input_shape=self.SHAPE))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(int(self.CAPACITY/2)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(1, activation='sigmoid'))
        return model

    def summary(self):
        """Print the Keras layer summary of the discriminator."""
        return self.Discriminator.summary()

    def save_model(self):
        """Write a diagram of the discriminator to /data/Discriminator_Model.png."""
        plot_model(self.Discriminator.model, to_file='/data/Discriminator_Model.png')

In [4]:
#!/usr/bin/env python3
import sys
import numpy as np
from keras.models import Sequential, Model
from keras.optimizers import Adam, SGD
from keras.utils import plot_model

class GAN(object):
    """Chains a generator and a discriminator into one compiled model.

    The combined model is available as ``self.gan_model``; it feeds the
    generator's output straight into the discriminator.
    """

    def __init__(self,discriminator,generator):
        """Build, compile, plot and summarise the stacked model.

        Args:
            discriminator: Keras model appended after the generator. Its
                ``trainable`` flag is set to False here, i.e. the object
                passed in is modified.
            generator: Keras model placed first in the stack.
        """
        # NOTE(review): nesterov=True is passed without a momentum argument;
        # confirm against the Keras SGD defaults that this has the intended effect.
        self.OPTIMIZER = SGD(lr=2e-4,nesterov=True)

        self.Generator = generator
        self.Discriminator = discriminator
        # Mark the discriminator as non-trainable before the stack is compiled.
        self.Discriminator.trainable = False

        self.gan_model = self.model()
        self.gan_model.compile(optimizer=self.OPTIMIZER, loss='binary_crossentropy')

        self.save_model()
        self.summary()

    def model(self):
        """Return an uncompiled Sequential: generator first, discriminator second."""
        stacked = Sequential()
        for part in (self.Generator, self.Discriminator):
            stacked.add(part)
        return stacked

    def summary(self):
        """Print the Keras layer summary of the stacked model."""
        combined = self.gan_model
        return combined.summary()

    def save_model(self):
        """Write a diagram of the stacked model to /data/GAN_Model.png."""
        diagram_path = '/data/GAN_Model.png'
        plot_model(self.gan_model.model, to_file=diagram_path)

In [5]:
from keras.datasets import mnist
from random import randint
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy

class Trainer:
    """Wires up Generator, Discriminator and GAN and runs the training loop.

    Training images are loaded from a ``.npy`` file (see :meth:`load_npy`);
    :meth:`load_MNIST` is an alternative loader that is not called by the
    constructor.
    """

    def __init__(self, width = 28, height= 28, channels = 1, latent_size=100, epochs =50000, batch=32, checkpoint=50,model_type=-1,data_path = ''):
        """Build the three models and load the training data.

        Args:
            width, height, channels: image dimensions passed to the models.
            latent_size: length of the generator's latent input vector.
            epochs: number of passes made by :meth:`train`.
            batch: number of samples per training batch.
            checkpoint: sample images are saved every ``checkpoint`` batches
                and every ``checkpoint`` epochs.
            model_type: stored as ``self.model_type`` and read only by
                :meth:`load_MNIST` (digit filter, -1 for all digits).
                NOTE(review): it does not select the network architecture --
                both networks are always built with ``'DCGAN'`` below.
            data_path: path of the ``.npy`` file handed to :meth:`load_npy`.
        """
        self.W = width
        self.H = height
        self.C = channels
        self.EPOCHS = epochs
        self.BATCH = batch
        self.CHECKPOINT = checkpoint
        self.model_type=model_type

        self.LATENT_SPACE_SIZE = latent_size

        self.generator = Generator(height=self.H, width=self.W, channels=self.C, latent_size=self.LATENT_SPACE_SIZE,model_type = 'DCGAN')
        self.discriminator = Discriminator(height=self.H, width=self.W, channels=self.C,model_type = 'DCGAN')
        self.gan = GAN(generator=self.generator.Generator, discriminator=self.discriminator.Discriminator)

        # Alternative data source: self.load_MNIST()
        self.load_npy(data_path)

    def load_npy(self,npy_path, fraction=0.25):
        """Load training images from ``npy_path`` into ``self.X_train``.

        Args:
            npy_path: path to an array file readable by ``np.load``.
            fraction: leading share of the samples to keep (default 0.25,
                the value that used to be hard-coded).

        The kept samples are converted to float32 and mapped with
        ``(x - 127.5) / 127.5``.
        """
        images = np.load(npy_path)
        images = images[:int(fraction*float(len(images)))]
        images = (images.astype(np.float32) - 127.5)/127.5
        # Add the trailing channel axis unless the array already is 4-D.
        # The unconditional expand_dims used before turned 4-D colour data
        # into a 5-D array with a spurious singleton axis.
        if images.ndim != 4:
            images = np.expand_dims(images, axis=3)
        self.X_train = images
        return

    def load_MNIST(self,model_type=3):
        """Load the MNIST training images into ``self.X_train``.

        The digit filter is taken from ``self.model_type`` (-1 keeps every
        digit); the ``model_type`` argument of this method is unused and kept
        only for interface compatibility.

        Raises:
            ValueError: if ``self.model_type`` is not an integer from -1 to 9.
        """
        allowed_types = [-1,0,1,2,3,4,5,6,7,8,9]
        if self.model_type not in allowed_types:
            # Previously this only printed the message and carried on with
            # the invalid value.
            raise ValueError('ERROR: Only Integer Values from -1 to 9 are allowed')

        (self.X_train, self.Y_train), (_, _) = mnist.load_data()
        if self.model_type!=-1:
            self.X_train = self.X_train[np.where(self.Y_train==int(self.model_type))[0]]

        # Rescale -1 to 1
        self.X_train = ( np.float32(self.X_train) - 127.5) / 127.5
        self.X_train = np.expand_dims(self.X_train, axis=3)
        return

    def train(self):
        """Run ``self.EPOCHS`` epochs of alternating discriminator/generator updates.

        Within an epoch, batches are processed while more than ``self.BATCH``
        unused real samples remain. Each iteration trains the discriminator
        on either a real batch or a generated batch (coin flip), then trains
        the generator through the stacked GAN model.
        """
        # Defined up front so the per-epoch report cannot raise NameError
        # when the dataset holds no more than BATCH samples and the batch
        # loop therefore never runs.
        discriminator_loss = None
        generator_loss = None

        for e in range(self.EPOCHS):
            b = 0
            X_train_temp = deepcopy(self.X_train)
            while len(X_train_temp)>self.BATCH:
                # Keep track of Batches
                b=b+1

                # Train Discriminator
                # Each discriminator batch is either all real or all generated
                if self.flipCoin():
                    # Grab a contiguous run of real images at a random offset
                    count_real_images = int(self.BATCH)
                    starting_index = randint(0, (len(X_train_temp)-count_real_images))
                    real_images_raw = X_train_temp[ starting_index : (starting_index + count_real_images) ]
                    # Delete the images used until we have none left
                    X_train_temp = np.delete(X_train_temp,range(starting_index,(starting_index + count_real_images)),0)
                    x_batch = real_images_raw.reshape( count_real_images, self.W, self.H, self.C )
                    y_batch = np.ones([count_real_images,1])
                else:
                    # Grab Generated Images for this training batch
                    latent_space_samples = self.sample_latent_space(self.BATCH)
                    x_batch = self.generator.Generator.predict(latent_space_samples)
                    y_batch = np.zeros([self.BATCH,1])

                # Now, train the discriminator with this batch
                # (index 0 of the returned list is taken as the loss)
                discriminator_loss = self.discriminator.Discriminator.train_on_batch(x_batch,y_batch)[0]

                # In practice, flipping the label when training the generator improves convergence
                if self.flipCoin(chance=0.9):
                    y_generated_labels = np.ones([self.BATCH,1])
                else:
                    y_generated_labels = np.zeros([self.BATCH,1])
                x_latent_space_samples = self.sample_latent_space(self.BATCH)
                generator_loss = self.gan.gan_model.train_on_batch(x_latent_space_samples,y_generated_labels)

                print ('Batch: '+str(int(b))+', [Discriminator :: Loss: '+str(discriminator_loss)+'], [ Generator :: Loss: '+str(generator_loss)+']')
                if b % self.CHECKPOINT == 0 :
                    label = str(e)+'_'+str(b)
                    self.plot_checkpoint(label)

            print ('Epoch: '+str(int(e))+', [Discriminator :: Loss: '+str(discriminator_loss)+'], [ Generator :: Loss: '+str(generator_loss)+']')

            if e % self.CHECKPOINT == 0 :
                self.plot_checkpoint(e)
        return

    def flipCoin(self,chance=0.5):
        """Return 1 with probability ``chance``, otherwise 0."""
        return np.random.binomial(1, chance)

    def sample_latent_space(self, instances):
        """Return ``instances`` standard-normal latent vectors, shape (instances, LATENT_SPACE_SIZE)."""
        return np.random.normal(0, 1, (instances,self.LATENT_SPACE_SIZE))

    @staticmethod
    def _stretch_to_255(image):
        """Min-max rescale ``image`` to integers in 0..255 (all zeros if it is constant)."""
        value_range = np.ptp(image)
        if value_range == 0:
            # A constant image would otherwise divide by zero and yield NaNs.
            return np.zeros(image.shape, dtype=int)
        return (255*(image - np.min(image))/value_range).astype(int)

    def plot_checkpoint(self,e):
        """Save a 4 x 4 grid of freshly generated samples to /data/sample_<e>.png.

        Args:
            e: label embedded in the file name (epoch number or
                ``'<epoch>_<batch>'`` string).
        """
        filename = "/data/sample_"+str(e)+".png"

        noise = self.sample_latent_space(16)
        images = self.generator.Generator.predict(noise)

        plt.figure(figsize=(10,10))
        for i in range(images.shape[0]):
            plt.subplot(4, 4, i+1)
            if self.C==1:
                image = np.reshape(images[i, :, :], [self.H,self.W])
                plt.imshow(self._stretch_to_255(image),cmap='gray')
            elif self.C==3:
                image = np.reshape(images[i, :, :, :], [self.H,self.W,self.C])
                plt.imshow(self._stretch_to_255(image))

            plt.axis('off')
        plt.tight_layout()
        plt.savefig(filename)
        plt.close('all')
        return

    def plot_check_batch(self,b,images):
        """Save a grid of the given batch images to /data/batch_check_<b>.png.

        Args:
            b: label embedded in the file name (batch counter).
            images: array indexed as ``images[i, :, :, :]``, one image per
                leading index.
        """
        filename = "/data/batch_check_"+str(b)+".png"
        # Round the grid side up: truncating the square root gave too few
        # cells for batch sizes that are not perfect squares (e.g. 128),
        # which makes plt.subplot fail on the surplus images.
        subplot_size = int(np.ceil(np.sqrt(images.shape[0])))
        plt.figure(figsize=(10,10))
        for i in range(images.shape[0]):
            plt.subplot(subplot_size, subplot_size, i+1)
            image = images[i, :, :, :]
            image = np.reshape(image, [self.H,self.W,self.C])
            plt.imshow(image)
            plt.axis('off')
        plt.tight_layout()
        plt.savefig(filename)
        plt.close('all')
        return

In [None]:
#!/usr/bin/env python3
# Command Line Argument Method
# Run configuration: image geometry, latent size and training schedule.
HEIGHT, WIDTH, CHANNEL = 64, 64, 3
LATENT_SPACE_SIZE = 100
EPOCHS = 100
BATCH = 128
CHECKPOINT = 10
PATH = "/data/church_outdoor_train_lmdb_color.npy"

# Line breaks inside the parentheses need no backslash continuation.
trainer = Trainer(
    height=HEIGHT,
    width=WIDTH,
    channels=CHANNEL,
    latent_size=LATENT_SPACE_SIZE,
    epochs=EPOCHS,
    batch=BATCH,
    checkpoint=CHECKPOINT,
    model_type='DCGAN',
    data_path=PATH,
)

trainer.train()