In [None]:
from __future__ import print_function, division
from keras.layers import Input, Dense, Flatten, Dropout, Reshape, Concatenate
from keras.layers import BatchNormalization, Activation, Conv2D, Conv2DTranspose
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Model
from keras.optimizers import Adam
from keras.datasets import mnist
import keras.backend as K
import matplotlib.pyplot as plt
import sys
import numpy as np
%pylab inline

In [None]:
#Creating Generator for CGANS
def get_generator(input_layer, condition_layer):
  """Build the conditional generator network.

  The noise tensor and the condition tensor are concatenated along the last
  axis, projected by a dense layer to 7*7*128 units, reshaped to a
  (7, 7, 128) feature map, and then passed through a stack of convolutions
  that includes two stride-2 transposed convolutions. The final layer is a
  single-filter convolution followed by tanh, so the output has one channel
  with values in [-1, 1].

  Args:
    input_layer: Keras tensor carrying the latent noise (the notebook passes
      an Input of shape (100,)).
    condition_layer: Keras tensor carrying the class condition (the notebook
      passes an Input of shape (10,)).

  Returns:
    (model, out): the Keras Model mapping [input_layer, condition_layer] to
    the generated image tensor, and that output tensor itself.
  """

  def _norm_act(tensor):
    # BatchNorm followed by LeakyReLU: the pattern applied after every hidden layer.
    tensor = BatchNormalization(momentum=0.9)(tensor)
    return LeakyReLU(alpha=0.1)(tensor)

  # Join the noise vector and the condition vector into one flat input.
  net = Concatenate()([input_layer, condition_layer])

  # Dense projection, later reshaped into a 7x7 feature map with 128 channels.
  net = _norm_act(Dense(128 * 7 * 7, activation='relu')(net))
  net = Reshape((7, 7, 128))(net)

  net = _norm_act(Conv2D(128, kernel_size=4, strides=1, padding='same')(net))
  # First stride-2 transposed convolution (upsampling step).
  net = _norm_act(Conv2DTranspose(128, 4, strides=2, padding='same')(net))
  net = _norm_act(Conv2D(128, kernel_size=5, strides=1, padding='same')(net))
  # Second stride-2 transposed convolution (upsampling step).
  net = _norm_act(Conv2DTranspose(128, 4, strides=2, padding='same')(net))
  net = _norm_act(Conv2D(128, kernel_size=5, strides=1, padding='same')(net))
  net = _norm_act(Conv2D(128, kernel_size=5, strides=1, padding='same')(net))

  # Collapse to a single channel and squash to [-1, 1].
  net = Conv2D(1, kernel_size=5, strides=1, padding="same")(net)
  out = Activation("tanh")(net)

  model = Model(inputs=[input_layer, condition_layer], outputs=out)
  model.summary()
  return model, out

In [None]:
#Creating Discriminator for CGANS
def get_discriminator(input_layer, condition_layer):
  """Build the conditional discriminator network.

  The image tensor goes through four 128-filter convolutions (the last three
  with stride 2), each followed by BatchNorm and LeakyReLU. The flattened
  features are concatenated with the condition tensor and classified by a
  512-unit dense layer, dropout, and a single sigmoid unit.

  Args:
    input_layer: Keras tensor carrying the image (the notebook passes an
      Input of shape (28, 28, 1)).
    condition_layer: Keras tensor carrying the class condition (the notebook
      passes an Input of shape (10,)).

  Returns:
    (model, out): the Keras Model mapping [input_layer, condition_layer] to
    the sigmoid score, and that output tensor itself.
  """
  # (kernel_size, strides) of each convolution, in order.
  conv_specs = ((3, 1), (4, 2), (4, 2), (4, 2))

  features = input_layer
  for kernel, stride in conv_specs:
    features = Conv2D(128, kernel_size=kernel, strides=stride, padding='same')(features)
    features = BatchNormalization(momentum=0.9)(features)
    features = LeakyReLU(alpha=0.1)(features)

  # The condition is injected only after the convolutional feature extractor.
  features = Flatten()(features)
  joined = Concatenate()([features, condition_layer])

  head = Dense(512, activation='relu')(joined)
  head = Dropout(0.4)(head)
  out = Dense(1, activation='sigmoid')(head)

  model = Model(inputs=[input_layer, condition_layer], outputs=out)
  model.summary()
  return model, out

In [None]:
from keras.preprocessing import image

#function to convert integer class labels into one-hot encoded rows
def one_hot_encode(y, num_classes=10):
  """One-hot encode a 1-D sequence of integer class labels.

  Args:
    y: sequence or array of integer labels, each in [0, num_classes).
    num_classes: width of the encoding; defaults to 10 (the ten MNIST digits
      used throughout this notebook).

  Returns:
    Float array of shape (len(y), num_classes) with a single 1 per row.

  Raises:
    IndexError: if any label lies outside [0, num_classes). Negative labels
      are rejected explicitly because NumPy would otherwise wrap them around
      and silently mark the wrong class.
  """
  labels = np.asarray(y)
  if labels.size == 0:
    # An empty Python list becomes a float64 array, which NumPy refuses as an index.
    labels = labels.astype(np.intp)
  elif labels.min() < 0 or labels.max() >= num_classes:
    raise IndexError(
        "labels must lie in [0, {}), got values between {} and {}".format(
            num_classes, labels.min(), labels.max()))

  n_rows = len(labels)
  z = np.zeros((n_rows, num_classes))
  # Fancy indexing: for row i, set column labels[i] to 1.
  z[np.arange(n_rows), labels] = 1
  return z

#function to make a latent space vector
def generate_noise(n_samples, noise_dim):
  """Return an (n_samples, noise_dim) array of standard-normal latent vectors."""
  latent_shape = (n_samples, noise_dim)
  return np.random.normal(loc=0, scale=1, size=latent_shape)

#function to return one-hot encodings of n randomly drawn digit labels
def generate_random_labels(n):
  """Draw n class indices from 0..9 at random and return them one-hot encoded."""
  drawn_classes = np.random.choice(10, n)
  return one_hot_encode(drawn_classes)

# Subplot titles for show_samples: tags[k] is the caption for class index k.
tags = list('0123456789')
 
#function to generate/print images from generator
def show_samples(batchidx):
  """Plot three generated samples for each of the ten classes in a 5x6 grid.

  Reads the module-level `generator` model and `tags` list. Each grid row
  holds two classes: the even class index in columns 0-2 and the odd one in
  columns 3-5. The middle image of each triple carries the class title.

  Args:
    batchidx: accepted for the caller's convenience; not used by the body.
  """
  fig, axs = plt.subplots(5, 6, figsize=(10,6))
  plt.subplots_adjust(hspace=0.3, wspace=0.1)

  for classlabel in range(10):
    # row = classlabel // 2; side is 0 for the left triple, 1 for the right one.
    row, side = divmod(classlabel, 2)
    first_col = 3 * side

    condition = one_hot_encode([classlabel] * 3)
    latent = generate_noise(3, 100)
    gen_imgs = generator.predict([latent, condition])

    for i in range(3):
      ax = axs[row, first_col + i]
      ax.imshow(image.array_to_img(gen_imgs[i], scale=True))
      ax.axis('off')
      if i == 1:
        # Title only the centre image so the label sits above the triple.
        ax.set_title(tags[classlabel])
  plt.show()
  plt.close()

In [None]:
#Creating tensors for input images and labels(next 2 lines)
img_input = Input(shape=(28,28,1))
disc_condition_input = Input(shape=(10,))

# Build and compile the discriminator on its own. `discriminator` is the model used
# later for the direct discriminator.train_on_batch calls on real and generated images.
discriminator, disc_out = get_discriminator(img_input, disc_condition_input)
discriminator.compile(optimizer=Adam(0.0002, 0.5), loss='binary_crossentropy', metrics=['accuracy'])

# Freeze the discriminator AFTER compiling it and BEFORE compiling the combined model:
# the intent is that training `gan` updates only the generator weights, while the
# standalone `discriminator` compiled above keeps learning.
discriminator.trainable = False

# Generator inputs: a 100-dimensional latent vector and a 10-dimensional condition.
noise_input = Input(shape=(100,))
gen_condition_input = Input(shape=(10,))
generator, gen_out = get_generator(noise_input, gen_condition_input)

# Combined model: noise + condition -> generator -> discriminator (with its own condition input).
gan_input = Input(shape=(100,))
x = generator([gan_input, gen_condition_input])
gan_out = discriminator([x, disc_condition_input])

# Fix: the keyword is `outputs` (plural). The original passed `output=`, mixed with
# `inputs=`, which is only tolerated by Keras' legacy shim and rejected by newer releases.
gan = Model(inputs=[gan_input, gen_condition_input, disc_condition_input], outputs=gan_out)
gan.summary()
gan.compile(optimizer=Adam(0.0002, 0.5), loss='binary_crossentropy')

In [None]:
#creating batchsize, importing dataset from keras library, converting target labels to one hot encoded labels
BATCH_SIZE = 100  # number of samples per training step

# Training images with their integer labels; the test labels are discarded.
(X_train, y_train), (X_test, _) = mnist.load_data()

# Centre and scale the pixels with 127.5 so they line up with the generator's tanh output range.
X_train = (X_train - 127.5) / 127.5
# Integer labels -> one-hot rows, the form the condition inputs expect.
y_train = one_hot_encode(y_train)
print("Training shape: {}".format(X_train.shape))

# Whole batches per epoch (any remainder smaller than BATCH_SIZE is dropped).
num_batches = X_train.shape[0] // BATCH_SIZE

In [None]:
# Add a trailing channel axis so each image is (28, 28, 1), matching `img_input`.
# -1 lets NumPy infer the number of samples instead of hard-coding 60000, so the
# cell keeps working if X_train is subsampled or swapped for another split.
x = X_train.reshape((-1, 28, 28, 1))

In [None]:
# Rebind X_train to the array currently held in `x`.
X_train = x

In [None]:
#Buffer for experience replay: the training loop appends [generated image, condition label, discriminator target] entries to it
exp_replay = list()

In [None]:
N_EPOCHS = 100 #number of epochs
noise_prop = 0.05 #fraction of discriminator targets flipped per batch (label noise)

# Target convention used by this notebook (inverted relative to most GAN code):
#   real images      -> discriminator target near 0
#   generated images -> discriminator target near 1
#   generator step   -> target 0, i.e. make generated images score like real ones
for epoch in range(N_EPOCHS):
  cum_d_loss = 0.
  cum_g_loss = 0.
  for batch_idx in range(num_batches):
    batch_start = batch_idx * BATCH_SIZE
    batch_end = batch_start + BATCH_SIZE
    images = X_train[batch_start:batch_end]
    labels = y_train[batch_start:batch_end]

    # Generate a fake batch conditioned on the same labels as the real batch.
    noise_data = generate_noise(BATCH_SIZE, 100)
    generated_images = generator.predict([noise_data, labels])

    # Real batch: soft targets in [0, 0.1), with a few entries flipped to the other side.
    true_labels = np.zeros((BATCH_SIZE, 1)) + np.random.uniform(low=0.0, high=0.1, size=(BATCH_SIZE, 1))
    flipped_idx = np.random.choice(np.arange(len(true_labels)), size=int(noise_prop*len(true_labels)))
    true_labels[flipped_idx] = 1 - true_labels[flipped_idx]
    d_loss_true = discriminator.train_on_batch([images, labels], true_labels)

    # Generated batch: soft targets in (0.9, 1], with a few entries flipped.
    gene_labels = np.ones((BATCH_SIZE, 1)) - np.random.uniform(low=0.0, high=0.1, size=(BATCH_SIZE, 1))
    flipped_idx = np.random.choice(np.arange(len(gene_labels)), size=int(noise_prop*len(gene_labels)))
    gene_labels[flipped_idx] = 1 - gene_labels[flipped_idx]
    d_loss_gene = discriminator.train_on_batch([generated_images, labels], gene_labels)

    # Experience replay: keep one random generated sample per batch; once a full
    # batch has been collected, show it to the discriminator again and reset.
    r_idx = np.random.randint(BATCH_SIZE)
    exp_replay.append([generated_images[r_idx], labels[r_idx], gene_labels[r_idx]])
    if len(exp_replay) == BATCH_SIZE:
      replay_images = np.array([p[0] for p in exp_replay])
      replay_labels = np.array([p[1] for p in exp_replay])
      replay_targets = np.array([p[2] for p in exp_replay])
      expprep_loss_gene = discriminator.train_on_batch([replay_images, replay_labels], replay_targets)
      exp_replay = []
      # Fix: the original had a `break` here, which ended the epoch as soon as the
      # buffer filled (after BATCH_SIZE batches). That skipped the generator update
      # for this batch and left the rest of the dataset unseen, while the averages
      # below were still divided by num_batches.

    # The discriminator is compiled with metrics=['accuracy'], so each train_on_batch
    # result is [loss, accuracy]; d_loss and cum_d_loss are therefore 2-element arrays.
    d_loss = 0.5 * np.add(d_loss_true, d_loss_gene)
    cum_d_loss += d_loss

    # Generator step through the combined model, on fresh noise and random conditions.
    # The same labels feed both the generator and the discriminator condition inputs.
    noise_data = generate_noise(BATCH_SIZE, 100)
    random_labels = generate_random_labels(BATCH_SIZE)
    g_loss = gan.train_on_batch([noise_data, random_labels, random_labels], np.zeros((BATCH_SIZE, 1)))
    cum_g_loss += g_loss
  print('\tEpoch: {}, Generator Loss: {}, Discriminator Loss: {}'.format(epoch+1, cum_g_loss/num_batches, cum_d_loss/num_batches))
  show_samples("epoch" + str(epoch))