In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
import time

import tensorflow as tf
from tensorflow.keras import backend, optimizers, Model, applications, metrics, activations, losses
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, LeakyReLU, Dropout, Flatten, Dense, Reshape, Concatenate, Conv2DTranspose, ReLU, Activation
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, CSVLogger, TensorBoard
import tensorflow.keras.utils as utils

In [2]:
from google.colab import drive
from google.colab import files 


drive.mount("/content/gdrive")
print(os.getcwd())
os.chdir("/content/gdrive/My Drive/AML2021/AML2021")
print(os.getcwd())

ModuleNotFoundError: No module named 'google.colab'

In [12]:
EPOCHS = 2000
INITIAL_EPOCH = 0
BATCH_SIZE = 16
INPUT_SHAPE = (299,299,3)
CLASSES = 2
LATENT_DIMS = 100
OPT_GEN = optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
OPT_DISC = optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
os.makedirs('models/GAN', exist_ok = True) 
os.makedirs('models/GAN/imgs', exist_ok = True)

# Discriminator

In [4]:
def block(i, filter, kernel, stride):
  x = Conv2D(filter, kernel, strides=stride, padding='same')(i)
  x = BatchNormalization()(x)
  x = LeakyReLU(alpha=0.2)(x)
  x = Dropout(0.5)(x)
  return x

i = Input(shape=INPUT_SHAPE)
x = block(i, 32, (3,3), (1,1))
x = block(x, 64, (3,3), (2,2))
x = block(x, 64, (3,3), (1,1))
x = block(x, 128, (3,3), (2,2))
x = block(x, 128, (3,3), (1,1))
x = block(x, 256, (3,3), (2,2))
x = block(x, 256, (3,3), (1,1))
x = block(x, 512, (3,3), (2,2))
x = Flatten()(x)
real_fake_out = Dense(1, activation="sigmoid")(x)
class_out = Dense(2, activation="softmax")(x)

discriminator = Model(i,[real_fake_out, class_out], name="discriminator")
discriminator.summary()

Model: "discriminator"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 299, 299, 3) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 299, 299, 32) 896         input_1[0][0]                    
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 299, 299, 32) 128         conv2d[0][0]                     
__________________________________________________________________________________________________
leaky_re_lu (LeakyReLU)         (None, 299, 299, 32) 0           batch_normalization[0][0]        
______________________________________________________________________________________

In [5]:
y, r = discriminator(np.random.rand(1,299,299,3).astype("float32"))
print(y,r)

tf.Tensor([[0.49945325]], shape=(1, 1), dtype=float32) tf.Tensor([[0.5001768  0.49982324]], shape=(1, 2), dtype=float32)


#Generator

In [6]:
# original paper has pading=same and filtersize=5, we adept this a bit to match the requiered outputsize of 299,299
def block(i, filter, kernel, strides=(2,2), padding='same'):
  x = Conv2DTranspose(filter, kernel, strides=strides, padding=padding)(i)
  x = BatchNormalization()(x)
  x = ReLU()(x)
  return x

z = Input(shape=(LATENT_DIMS,))
zx = Dense(9*9*1024)(z)
zx = Reshape((9,9,1024))(zx)
c = Input(shape=(CLASSES,))
cx = Dense(9*9*1)(c)
cx = Reshape((9,9,1))(cx)
x = Concatenate(axis=-1)([zx,cx])
x = block(x, 512, (5,5), strides=(2,2), padding='same')#18x18x512
x = block(x, 256, (5,5), strides=(2,2), padding='same')#36x36x256
x = block(x, 128, (4,4), strides=(2,2), padding='valid')#74x74x128
x = block(x, 64, (5,5), strides=(2,2), padding='same')#148x148x64
x = Conv2DTranspose(3, (5,5), strides=(2,2), padding='valid')(x)#299x299x64
o = activations.sigmoid(x)# original paper has tanh, but since we normalize between 0,1 instead of -1,1 we use sigmoid

generator = Model([z,c],o)
generator.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 100)]        0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            [(None, 2)]          0                                            
__________________________________________________________________________________________________
dense_2 (Dense)                 (None, 82944)        8377344     input_2[0][0]                    
__________________________________________________________________________________________________
dense_3 (Dense)                 (None, 81)           243         input_3[0][0]                    
______________________________________________________________________________________________

In [7]:
generator([np.random.rand(1,100).astype("float32"), np.random.rand(1,2).astype("float32")]).shape

TensorShape([1, 299, 299, 3])

# Training

In [8]:
# Load Data
x_train = np.load("x_train.npy")
y_train = np.load("y_train.npy")

print(x_train.shape, y_train.shape)
print(x_train.min(), x_train.max())

(16324, 299, 299, 3) (16324, 2)
0 255


In [None]:
previous_disc_loss = float('inf')
previous_gen_loss = float('inf')

for epoch in range(INITIAL_EPOCH, EPOCHS):
  num_batches = int(np.ceil(x_train.shape[0]/BATCH_SIZE))
  gen_losses = []
  disc_losses = []
  # batching
  for batch in range(int(num_batches)):
    indices = np.arange(x_train.shape[0])
    np.random.shuffle(indices)
    batch_start = batch*BATCH_SIZE
    batch_end = batch_start+BATCH_SIZE
    batch_idx = indices[batch_start:batch_end]

    # training
    random_latent_vectors = tf.random.normal(shape=(BATCH_SIZE, LATENT_DIMS))
    condition = tf.one_hot(
        tf.random.uniform(shape=(BATCH_SIZE, ), minval=0, maxval=2, dtype=tf.int32),
        depth=2, dtype=tf.int32)
    real = tf.cast(utils.normalize(x_train[batch_idx], axis=0), tf.float32)

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
      fake = generator([random_latent_vectors, condition])

      discrimination_real, y_real = discriminator(real)
      discrimination_fake, y_fake = discriminator(fake)

      binary_cross_entropy = losses.BinaryCrossentropy()
      categorical_cross_entropy = losses.CategoricalCrossentropy()
      loss_disc_real = binary_cross_entropy(tf.ones_like(discrimination_real), discrimination_real)
      loss_disc_fake = binary_cross_entropy(tf.zeros_like(discrimination_fake), discrimination_fake)
      loss_cls_real = categorical_cross_entropy(y_train[batch_idx], y_real)
      loss_cls_fake = categorical_cross_entropy(condition, y_fake)
      loss_disc = loss_disc_real + loss_disc_fake + loss_cls_real + loss_cls_fake

      loss_gen_disc = binary_cross_entropy(tf.ones_like(discrimination_fake), discrimination_fake)
      loss_gen_cls = categorical_cross_entropy(condition, y_fake)
      loss_gen = loss_gen_disc + loss_gen_cls
    
    grads_disc = disc_tape.gradient(loss_disc, discriminator.trainable_weights)
    OPT_DISC.apply_gradients(zip(grads_disc, discriminator.trainable_weights))

    grads_gen = gen_tape.gradient(loss_gen, generator.trainable_weights)
    OPT_GEN.apply_gradients(zip(grads_gen, generator.trainable_weights))

    # reporting
    gen_losses.append(loss_gen.numpy())
    disc_losses.append(loss_disc.numpy())

    if batch == 0:
        z = tf.random.normal(shape=(2, LATENT_DIMS))
        c = tf.one_hot([0, 1], depth=2, dtype=tf.int32)

        imgs = generator([z, c])
        img_0 = tf.keras.preprocessing.image.array_to_img(imgs[0])
        img_1 = tf.keras.preprocessing.image.array_to_img(imgs[1])

        img_0.save(f"models/GAN/imgs/generated_image_c0_ep{epoch:04}.png")
        img_1.save(f"models/GAN/imgs/generated_image_c1_ep{epoch:04}.png")
        
    print(f"\r epoch {epoch}/{EPOCHS}, batch {batch+1}/{num_batches}", end="")
  
  avg_disc_loss = sum(disc_losses) / len(disc_losses)
  avg_gen_loss = sum(gen_losses) / len(gen_losses)
  print(f", disc loss= {avg_disc_loss}, gen loss= {avg_gen_loss}")
  with open("models/GAN/log.csv","a+") as f:
    f.write(f"{epoch}, {avg_gen_loss}, {avg_disc_loss}")

  # saving
  if avg_disc_loss < previous_disc_loss:
    discriminator.save('models/GAN/best-Discriminator.hdf5')
    print(f"disc loss improved from {previous_disc_loss} to {avg_disc_loss}, best-Discriminator.hdf5 saved")
  if avg_gen_loss < previous_gen_loss:
    generator.save('models/GAN/best-Generator.hdf5')
    print(f"gen loss improved from {previous_gen_loss} to {avg_gen_loss}, best-Generator.hdf5 saved")
  previous_disc_loss = avg_disc_loss
  previous_gen_loss = avg_gen_loss

 epoch 0/2000, batch 361/1021