<a href="https://colab.research.google.com/github/meabhl85/Canvas-System/blob/main/GAN_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Imports and Google Drive Connection

In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Reshape, Dropout, Dense 
from tensorflow.keras.layers import Flatten, BatchNormalization
from tensorflow.keras.layers import Activation, ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.constraints import Constraint
from tensorflow.keras import backend
import numpy as np
from numpy.random import randn
from numpy.random import randint
from numpy import expand_dims
from numpy import mean
from numpy import ones
from PIL import Image
from tqdm import tqdm
import os 
import time
import matplotlib.pyplot as plt
from matplotlib import pyplot
from tensorflow.keras.initializers import RandomNormal

try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    COLAB = True
    print("Note: using Google CoLab")
    %tensorflow_version 2.x
except:
    print("Note: not using Google CoLab")
    COLAB = False

#Nicely formatted time string
def hms_string(sec_elapsed):
  h = int(sec_elapsed / (60 * 60))
  m = int((sec_elapsed % (60 * 60)) / 60)
  s = sec_elapsed % 60
  return "{}:{:>02}:{:>05.2f}".format(h, m, s)
  

Mounted at /content/drive
Note: using Google CoLab


#### Model Hyperparamters

In [5]:
#Image Sizes
GENERATE_RES = 5 # Generation resolution factor 
GENERATE_SQUARE = 32 * GENERATE_RES # rows/cols (should be square)
IMAGE_CHANNELS = 3

#Input and Output Paths
NPY_FILES_PATH = '/content/drive/MyDrive/Colab Notebooks/honours_project/Npy_Files/'
OUTPUT_IMAGE_PATH = '/content/drive/MyDrive/Colab Notebooks/honours_project/GAN_Images/'

# Preview image 
PREVIEW_ROWS = 2
PREVIEW_COLS = 3
PREVIEW_MARGIN = 16

# Size vector to generate images from
SEED_SIZE = 800
EPOCHS = 1000
BATCH_SIZE = 32
BUFFER_SIZE = 60000

GENERATOR_OPT = tf.keras.optimizers.Adam(1.2e-4, 0.4)
DISCRIMINATOR_OPT = tf.keras.optimizers.Adam(1.2e-4, 0.4)


#### Model Class

In [6]:
class GAN:
  def __init__(self, input_data_name):
    self.data_name = input_data_name
    self.image_shape = (GENERATE_SQUARE, GENERATE_SQUARE, IMAGE_CHANNELS)
    self.training_binary_path = os.path.join(NPY_FILES_PATH, self.data_name + f'_training_data_{GENERATE_SQUARE}_{GENERATE_SQUARE}.npy')
    self.loss_function = tf.keras.losses.BinaryCrossentropy()
    self.output_image_paths = self.data_name

    self.output_image_path = os.path.join(OUTPUT_IMAGE_PATH, os.path.join(self.data_name, f'images_square_{GENERATE_SQUARE}_res_{GENERATE_RES}_seed_{SEED_SIZE}_epochs_{EPOCHS}_batchSize_{BATCH_SIZE}_bufferSize_{BUFFER_SIZE}.npy'))
    if not os.path.exists(self.output_image_path):
      os.makedirs(self.output_image_path)

    print(f"Looking for file: {self.training_binary_path}")
    if not os.path.isfile(self.training_binary_path):
      print('Failed loading NPY file...')
    else:
      print("Loading previous training pickle...")
      training_data = np.load(self.training_binary_path)
      
      #Batch and Shuffle the Data
      self.training_dataset = tf.data.Dataset.from_tensor_slices(training_data).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)  
    
  
  def build_generator(self):
    model = Sequential()

    model.add(Dense(4*4*256, activation = "relu", input_dim = SEED_SIZE))
    model.add(Reshape((4, 4, 256)))

    model.add(UpSampling2D())
    model.add(Conv2D(256, kernel_size = 3, padding = "same"))
    model.add(BatchNormalization(momentum = 0.8))
    model.add(Activation("relu"))

    model.add(UpSampling2D())
    model.add(Conv2D(256, kernel_size = 3, padding = "same"))
    model.add(BatchNormalization(momentum = 0.8))
    model.add(Activation("relu"))
   
    # Output resolution, additional upsampling
    model.add(UpSampling2D())
    model.add(Conv2D(128, kernel_size = 3, padding = "same"))
    model.add(BatchNormalization(momentum = 0.8))
    model.add(Activation("relu"))

    if GENERATE_RES > 1:
      model.add(UpSampling2D(size = (GENERATE_RES, GENERATE_RES)))
      model.add(Conv2D(128,kernel_size=3, padding="same"))
      model.add(BatchNormalization(momentum = 0.8))
      model.add(Activation("relu"))

    # Final CNN layer
    model.add(Conv2D(IMAGE_CHANNELS, kernel_size=3, padding="same"))
    model.add(Activation("tanh"))

    return model

  def build_discriminator(self):
    model = Sequential()

    model.add(Conv2D(32, kernel_size = 3, strides = 2, input_shape = self.image_shape, padding = "same"))
    model.add(LeakyReLU(alpha = 0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(64, kernel_size = 3, strides = 2, padding = "same"))
    model.add(ZeroPadding2D(padding = ((0, 1), (0, 1))))
    model.add(BatchNormalization(momentum = 0.8))
    model.add(LeakyReLU(alpha = 0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(128, kernel_size = 3, strides = 2, padding = "same"))
    model.add(BatchNormalization(momentum = 0.8))
    model.add(LeakyReLU(alpha = 0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(256, kernel_size = 3, strides = 1, padding = "same"))
    model.add(BatchNormalization(momentum = 0.8))
    model.add(LeakyReLU(alpha = 0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(512, kernel_size = 3, strides = 1, padding = "same"))
    model.add(BatchNormalization(momentum = 0.8))
    model.add(LeakyReLU(alpha = 0.2))

    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(1, activation = 'sigmoid'))

    return model

  def generator_loss(self, fake_pred):
    #As G wants to create real images the loss is cross entropy of 1's and the generated output
    return self.loss_function(tf.ones_like(fake_pred), fake_pred)

  def discriminator_loss(self, real_pred, fake_pred):
    #D should compare the real predictions to 1 and fake to 0 (as they are the real values)
    real_image_loss = self.loss_function(tf.ones_like(real_pred), real_pred)
    fake_image_loss = self.loss_function(tf.zeros_like(fake_pred), fake_pred)

    #Discriminator uses the complete loss
    final_loss = real_image_loss + fake_image_loss
    return final_loss

  def compile_models(self):
    self.generator_optimiser = GENERATOR_OPT
    self.discriminator_optimiser = DISCRIMINATOR_OPT

  @tf.function
  def step(self, real_images):
    seed = tf.random.normal([BATCH_SIZE, SEED_SIZE])

    #Performs automatic differentiation used for backpropagation 
    with tf.GradientTape() as g_tape, tf.GradientTape() as d_tape:
      #Gnerate images with noise vector
      generated_images = self.generator(seed, training = True)

      #Put resulting image into the D and return prediction
      real_pred = self.discriminator(real_images, training = True)
      fake_pred = self.discriminator(generated_images, training = True)

      #Calculate loss for both D and G
      g_loss = self.generator_loss(fake_pred)
      d_loss = self.discriminator_loss(real_pred, fake_pred)

    #Calculate gradients
    gen_gradients = g_tape.gradient(g_loss, self.generator.trainable_variables)
    dis_gradients = d_tape.gradient(d_loss, self.discriminator.trainable_variables)

    #Update weights 
    self.generator_optimiser.apply_gradients(zip(gen_gradients, self.generator.trainable_variables))
    self.discriminator_optimiser.apply_gradients(zip(dis_gradients, self.discriminator.trainable_variables))

    return g_loss, d_loss

  def train(self, data, epochs, save_epoch, save_checkpoint):
    fixed_seed = np.random.normal(0, 1, (PREVIEW_ROWS * PREVIEW_COLS, SEED_SIZE))
    start = time.time()

    for epoch in range(epochs):
      #Initalise timing and loss lists
      epoch_time = time.time()
      g_loss_list = []
      d_loss_list = []

      #Run the step training
      for image_batch in data:
        losses = self.step(image_batch)

        #Record losses
        g_loss_list.append(losses[0])
        d_loss_list.append(losses[1])
      
      #Average losses for entire image batch
      average_g_loss = sum(g_loss_list) / len(g_loss_list)
      average_d_loss = sum(d_loss_list) / len(d_loss_list)

      #Calculate epoch time
      epoch_finished = time.time() - epoch_time
      print (f'Epoch {epoch + 1}, gen loss={average_g_loss}, disc loss={average_d_loss}, epoch time={hms_string(epoch_finished)}')
      
      #Check for saving
      if epoch % save_epoch:
        print("Saving example image...")
        self.save_images(epoch, fixed_seed)
      if epoch % save_checkpoint == 0:
        print("Saving models...")
        self.save_models()

    #End of epoch processing time
    final_time = time.time()-start
    print (f'Training time: {hms_string(final_time)}')

  def save_models(self):
    output_model_path = os.path.join(OUTPUT_IMAGE_PATH, self.data_name + "_Models")
    if not os.path.exists(output_model_path):
      os.makedirs(output_model_path)

    self.generator.save(os.path.join(output_model_path, f'generator_model_{GENERATE_SQUARE}_res_{GENERATE_RES}_seed_{SEED_SIZE}_epochs_{EPOCHS}_batchSize_{BATCH_SIZE}_bufferSize_{BUFFER_SIZE}.npy'))
    self.discriminator.save(os.path.join(output_model_path, f'discriminator_model_{GENERATE_SQUARE}_res_{GENERATE_RES}_seed_{SEED_SIZE}_epochs_{EPOCHS}_batchSize_{BATCH_SIZE}_bufferSize_{BUFFER_SIZE}.npy'))

  def load_models(self):
    output_model_path = os.path.join(OUTPUT_IMAGE_PATH, self.data_name + "_Models")
    self.generator = load_model(os.path.join(output_model_path, f'generator_model_{GENERATE_SQUARE}_res_{GENERATE_RES}_seed_{SEED_SIZE}_epochs_{EPOCHS}_batchSize_{BATCH_SIZE}_bufferSize_{BUFFER_SIZE}.npy'))
    self.discriminator = load_model(os.path.join(output_model_path, f'discriminator_model_{GENERATE_SQUARE}_res_{GENERATE_RES}_seed_{SEED_SIZE}_epochs_{EPOCHS}_batchSize_{BATCH_SIZE}_bufferSize_{BUFFER_SIZE}.npy'))

  #Saves images in cols and rows specified
  def save_images(self, count, noise):
    image_array = np.full(( 
        PREVIEW_MARGIN + (PREVIEW_ROWS * (GENERATE_SQUARE+PREVIEW_MARGIN)), 
        PREVIEW_MARGIN + (PREVIEW_COLS * (GENERATE_SQUARE+PREVIEW_MARGIN)), 3), 
        255, dtype=np.uint8)
    
    #Generate fake images from input noise
    generated_images = self.generator.predict(noise)

    generated_images = 0.5 * generated_images + 0.5

    image_count = 0
    for row in range(PREVIEW_ROWS):
        for col in range(PREVIEW_COLS):
          r = row * (GENERATE_SQUARE+16) + PREVIEW_MARGIN
          c = col * (GENERATE_SQUARE+16) + PREVIEW_MARGIN
          image_array[r:r+GENERATE_SQUARE,c:c+GENERATE_SQUARE] \
              = generated_images[image_count] * 255
          image_count += 1

    filename = os.path.join(self.output_image_path, f"train-{count}.png")
    im = Image.fromarray(image_array)
    im.save(filename)

  def show_image(self, data):
    plt.figure()
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(data[0])
    plt.show()

  def run(self, new_model, save_img_checkpoint, save_model_checkpoint):
    
    if new_model:
      #Build generator and discriminator 
      self.generator = self.build_generator()
      self.discriminator = self.build_discriminator()
    else:
      #Load previous model
      self.load_models()

    #Compiling G and D
    self.compile_models()

    print('Starting training...')
    self.train(self.training_dataset, EPOCHS, save_img_checkpoint, save_model_checkpoint)

    print('Saving generator model...')
    self.save_models()


#### Running the Model

In [4]:
if __name__ == "__main__":
  #Initialise gan class with art style
  gan = GAN('Early_Renaissance')

  #Run model
  gan.run(new_model = True, save_img_checkpoint = 100, save_model_checkpoint = 500)

Looking for file: /content/drive/MyDrive/Colab Notebooks/honours_project/Npy_Files/Early_Renaissance_training_data_160_160.npy
Loading previous training pickle...
Starting training...
Epoch 1, gen loss=1.7659804821014404, disc loss=1.8178436756134033, epoch time=0:01:00.76
INFO:tensorflow:Assets written to: /content/drive/MyDrive/Colab Notebooks/honours_project/GAN_Images/Early_Renaissance_Models/generator_model_160_res_5_seed_800_epochs_1000_batchSize_32_bufferSize_60000.npy/assets
INFO:tensorflow:Assets written to: /content/drive/MyDrive/Colab Notebooks/honours_project/GAN_Images/Early_Renaissance_Models/discriminator_model_160_res_5_seed_800_epochs_1000_batchSize_32_bufferSize_60000.npy/assets


KeyboardInterrupt: ignored