In [None]:
import cv2
import numpy as np
from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy import asarray
from numpy.random import randn
from numpy.random import randint
import random
import glob
import os
import itertools
import math
import copy
from PIL import Image
from skimage.transform import resize
from contextlib import redirect_stdout 
import keras
import keras.backend as K
from keras.models import Sequential, load_model
from keras.layers import Dense, Conv2D, Conv2DTranspose, Dropout
from keras.layers import Activation, LeakyReLU, BatchNormalization, Flatten, Reshape
from keras.initializers import RandomNormal
from keras.optimizers import Adam
from keras.utils import plot_model, np_utils
import tensorflow as tf
import cv2
from keras.preprocessing import image

In [None]:
# # For integrating drive with colab
# from google.colab import files

# #  Google Drive Authentication
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
path = '/content/drive/My Drive/src/'

if not os.path.exists(path):
    os.makedirs(path)
os.chdir(path)

## Loading and Preprocessing 

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:

!unzip -uq "/content/gdrive/MyDrive/Places365val" -d "/content/gdrive/MyDrive/Places365val"


In [None]:
# import cv2
# import os

# def load_images_from_folder(folder):
#     images = []
#     for filename in os.listdir(folder):
#         img = cv2.imread(os.path.join(folder,filename))
#         if img is not None:
#             images.append(img)
#     return images

In [None]:
# load_images_from_folder("/content/drive/My Drive/src/raw_dataset")

In [None]:
#read the images and compress to 128*128*3
path = '/content/gdrive/MyDrive/src/raw_dataset/*.jpg'  
images = [cv2.imread(file) for file in glob.glob(path)]
imagesnp = np.asarray(images) 
# print(imagesnp.size)

resize_images = []
for image in imagesnp:
    compressedimage = resize(image, (128, 128, 3), anti_aliasing=True)
    resize_images.append(compressedimage)
resize_images = np.asarray(resize_images)
# print(imagesnp.size)

# for filename in os.listdir(folder):
#         img = cv2.imread(os.path.join(folder,filename))
#         if img is not None:
#             images.append(img)

# Normalise the dataset
X_train = resize_images/255.0 
X_temp  = copy.deepcopy(X_train)


# Creating mask
total = X_temp.shape[0] #4000
mask_shape  = (total,128,128,1)
mask = np.zeros(mask_shape)
padding_width = 32
mask[:, :, :padding_width, :] = 1.0 # left portion
mask[:, :,-padding_width:, :] = 1.0 # right portion
pix_avg = np.mean(X_temp, axis=(1,2,3))
X_temp[:, :, :padding_width, :] = pix_avg[:, np.newaxis, np.newaxis, np.newaxis]
X_temp[:, :,-padding_width:, :] = pix_avg[:, np.newaxis, np.newaxis, np.newaxis]
X_mask = np.concatenate((X_temp, mask), axis=3)
np.savez('places_dataset.npz', X_train=X_train, X_mask=X_mask) 


#Load the numpy dataset
data = np.load('places_dataset.npz')
X_train = np.load['X_train']
X_mask  = data['X_mask']

np.save('x_mask', X_mask)
np.save('x_train', X_train)

In [None]:
(X_temp.shape) # (4000, 128, 128, 3)

# Define Generator model

In [None]:
def build_generator():
    Gen = Sequential(name='Generator')

    #intialiser - generates tensor with normal distribution
    kernalinit = tf.keras.initializers.RandomNormal(stddev=0.01, seed= None)

    #Encoder block
    Gen.add(Conv2D(64, (5,5), strides=1, dilation_rate=(1,1), input_shape = (128,128,4),kernel_initializer=kernalinit, padding='same', activation='relu' ))
    Gen.add(Conv2D(128, (3,3), strides=2, dilation_rate=(1,1), padding='same', activation='relu')) 
    Gen.add(Conv2D(256, (3,3), strides=1, dilation_rate=(1,1), padding='same', activation='relu'))
    Gen.add(Conv2D(256, (3,3), strides=1, dilation_rate=(2,2), padding='same', activation='relu'))
    Gen.add(BatchNormalization(momentum=0.9))
    Gen.add(Conv2D(256, (3,3), strides=1, dilation_rate=(4,4), padding='same', activation='relu'))
    Gen.add(BatchNormalization(momentum=0.9))
    Gen.add(Conv2D(256, (3,3), strides=1, dilation_rate=(8,8), padding='same', activation='relu'))
    Gen.add(BatchNormalization(momentum=0.9))
    Gen.add(Conv2D(256, (3,3), strides=1, dilation_rate=(1,1), padding='same', activation='relu'))
    Gen.add(BatchNormalization(momentum=0.9))

    #Decoder block
    Gen.add(Conv2DTranspose(128,(4,4),strides=2, padding='same', activation='relu')) 
    Gen.add(BatchNormalization(momentum=0.9)) 
    Gen.add(Conv2D(64, (3,3), strides=1, dilation_rate = (1,1), padding='same', activation='relu'))
    Gen.add(BatchNormalization(momentum=0.9)) 
    Gen.add(Conv2D(3, (3,3), strides=1, activation='sigmoid', padding='same'))
    return Gen

# plot_model(build_generator())

In [None]:
# Save the summary and architecture of the Generator model

# with open('./drive/My Drive/Colab Notebooks/Generator.txt', 'w') as f:
#     with redirect_stdout(f):
#         build_generator().summary()

# Define Discriminator model

In [None]:
def build_discriminator():
    
    Disc = Sequential(name='Discriminator')
    
    #intialiser - generates tensor with normal distribution
    kernalinit = tf.keras.initializers.RandomNormal(stddev=0.01, seed= None)
    # we use strided convolution in discriminators to repeatedly downsample the image
    Disc.add(Conv2D(32, (5,5), strides=2,padding='same',input_shape=(128,128,3), kernel_initializer=kernalinit,  activation='relu'))
    Disc.add(Conv2D(64, (5,5), strides=2, padding='same', activation='relu'))
    Disc.add(Conv2D(64, (5,5), strides=2, padding='same', activation='relu'))
    Disc.add(BatchNormalization(momentum=0.9))
    Disc.add(Conv2D(64, (5,5), strides=2, padding='same', activation='relu'))
    Disc.add(BatchNormalization(momentum=0.9))
    Disc.add(Conv2D(64, (5,5), strides=2, padding='same', activation='relu'))
    Disc.add(BatchNormalization(momentum=0.9))
    Disc.add(Flatten())
    Disc.add(Dropout(0.4))
    Disc.add(Dense(512, activation='relu'))  
    Disc.add(Dense(1,   activation='sigmoid'))  
    return Disc

# plot_model(build_discriminator())

In [1]:
#Save the Summary and Architecture of discriminator model 

# with open('./drive/My Drive/Colab Notebooks/Discriminator.txt', 'w') as f:
#     with redirect_stdout(f):
#         build_discriminator(0.2,0.02).summary()

## Create iteration_count.txt and losses.txt files

In [None]:
if not os.path.exists('iteration_count.txt'):
    with open("iteration_count.txt", "a") as f:
        f.write("Iteration_count : \n")
        
if not os.path.exists('losses.txt'):
    with open("losses.txt", "a") as f:
        f.write("D_Real_Loss D_Fake_Loss D_Total_loss G_Loss\n")

# Defining Loss functions

In [None]:
def G_loss_MSE(y_true, y_pred):
    diff = y_pred - y_true
    mse_loss = K.mean(K.square(diff))
    return mse_loss

def G_loss(y_true, y_pred):
    mse_loss = G_loss_MSE(y_true, y_pred)
    fake_loss = tf.log(tf.maximum(y_pred, K.epsilon()))
    return mse_loss - 0.001*tf.reduce_mean(fake_loss)  

def D_loss(y_true, y_pred):
    real_prediction = tf.maximum(y_true, K.epsilon())
    fake_prediction = tf.maximum(1.0 - y_pred, K.epsilon())
    real_loss = tf.log(real_prediction)
    fake_loss = tf.log(fake_prediction)
    loss = -tf.reduce_mean(real_loss+fake_loss)
    return loss


## Build GAN model

In [None]:
def build_gan(g_learning_rate, d_learning_rate, g_beta_1, d_beta_1, flag = True):
    
    # discriminator
    dis = build_discriminator()
    D_optimizer = Adam(learning_rate=d_learning_rate, beta_1=d_beta_1)
    dis.compile(optimizer = D_optimizer,loss = D_loss, metrics=['binary_accuracy'])
    
    # generator
    gen = build_generator()

    # gan
    gan = Sequential([gen,dis])
    gan_optimizer = Adam(learning_rate=g_learning_rate, beta_1= g_beta_1)
    
    if flag == True:
        gan.compile(optimizer=gan_optimizer, loss=G_loss_MSE, metrics=['binary_accuracy'])
    else:
        gan.compile(optimizer=gan_optimizer, loss=G_loss, metrics=['binary_accuracy'])
    
    return gan, gen, dis

In [None]:
#make the models trainable or untrainable based on need
def trainable(gan_model):
    for layer in gan_model.layers:
        layer.trainable = True

def non_trainable(gan_model):
    for layer in gan_model.layers:
        layer.trainable = False       
        
# if input is real data, set as np.ones and if fake, set as np.zeros
def make_labels(batch_size, isReal):
    if isReal==True:
        return np.ones([batch_size, 1])
    else:
        return np.zeros([batch_size, 1])
    



## Training GAN

In [None]:
def train_gan(g_learning_rate, d_learning_rate,g_beta_1,d_beta_1):

    max_iters = 1000
    gen_iters = 200
    disc_iters = 20
    batch_size = 64     
    
    # if old gan, generator and discriminator exist, load them, else build new
    if os.path.exists('generator.h5'):
        generator     = load_model('generator.h5')
        discriminator = load_model('discriminator.h5')
        gan           = load_model('gan.h5')
    else:
        gan, generator, discriminator = build_gan(g_learning_rate, g_beta_1, d_learning_rate, d_beta_1)
    
    # if iteration_count.txt exists, then get the prev_iter as model is trained before
    prev_iter = 0
    if os.path.exists('iteration_count.txt'):
        prev_iter = sum(1 for line in open('iteration_count.txt'))-1
    print('Previous iteration count :',prev_iter)
    
    # labels for real and fake images
    y_train_real = make_labels(batch_size,True)
    y_train_fake = make_labels(batch_size,False)
    
    for iters in range(prev_iter, max_iters):
        print("Iteration :",iters)
        with open('iteration_count.txt', 'w') as f:
            f.write(str(iters))
        
        # Train the generator
        if iters < gen_iters: 
            discriminator.trainable = False 
            X_batch_masked = X_mask[np.random.choice(X_mask.shape[0], batch_size , replace = True), :] 
            gan.train_on_batch(X_batch_masked, y_train_real)
    
    
        # Train the discriminator
        elif iters < (gen_iters + disc_iters):
            discriminator.trainable = True 
            X_batch_real   = X_train[np.random.choice(X_train.shape[0], batch_size , replace = True), :] 
            X_batch_masked = X_mask[np.random.choice(X_mask.shape[0], batch_size , replace = True), :] 
            X_batch_fake   = generator.predict_on_batch(X_batch_masked)

            discriminator.train_on_batch(X_batch_real, y_train_real)
            discriminator.train_on_batch(X_batch_fake, y_train_fake)
        
        #  Train both generator and discriminator adversarially
        else:
            if iters==(gen_iters + disc_iters):
                gan, generator,_ = build_gan(g_learning_rate,d_learning_rate, g_beta_1, d_beta_1, flag = False)
                gan.set_weights(gan.get_weights())
                generator.set_weights(generator.get_weights())
                
            # Train discriminator
            discriminator.trainable = True 

            X_batch_real   = X_train[np.random.choice(X_train.shape[0], batch_size , replace = True), :] 
            X_batch_masked = X_mask[np.random.choice(X_mask.shape[0], batch_size , replace = True), :] 
            X_batch_fake   = generator.predict_on_batch(X_batch_masked)

            discriminator.train_on_batch(X_batch_real, y_train_real)
            discriminator.train_on_batch(X_batch_fake, y_train_fake)

            # Train generator
            discriminator.trainable = False 
            gan.train_on_batch(X_batch_masked, y_train_real)
        
        
        # Training Losses
        if (iters + 1) % 100 == 0:
            X_batch_masked = X_mask[np.random.choice(X_mask.shape[0], batch_size , replace = True), :]        

            gan_images     = generator.predict_on_batch(X_batch_masked)
            g_loss_batch   = gan.test_on_batch(X_batch_masked, y_train_real)

            X_batch_real    = X_train[np.random.choice(X_train.shape[0], batch_size , replace = True), :] 

            d_loss_real  = discriminator.test_on_batch(X_batch_real, y_train_real)
            d_loss_fake  = discriminator.test_on_batch(gan_images, y_train_fake)

            with open("losses.txt", "a") as f:
                total = 0.5 * ( round(d_loss_real[0],12) + round(d_loss_fake[0],12) )
                f.write( str(round(d_loss_real[0],12)) + ' ' + str(round(d_loss_fake[0],12)) +' ' + str(total) + ' ' + str( round(g_loss_batch[0],12) ) )
                f.write('\n')
        

        # Save models at regular intervals once gan starts training adversarially
        if (iters >= (gen_iters + disc_iters)) and ((iters + 1) % 100 == 0):
            if os.path.exists('generator.h5'):
                os.remove('generator.h5')
            generator.save('generator.h5')

            if os.path.exists("discriminator.h5"):
                os.remove("discriminator.h5")
            discriminator.save('discriminator.h5')

            if os.path.exists("gan.h5"):
                os.remove("gan.h5")
            gan.save('gan.h5')
            
    
    return gan, generator, discriminator

In [None]:
gan, generator, discriminator = train_gan(g_learning_rate=0.0001,d_learning_rate=0.0001,g_beta_1=0.4,d_beta_1=0.4)