In [1]:
import tensorflow as tf 
from tensorflow.keras.layers import Input, Reshape, Dropout, Dense 
from tensorflow.keras.layers import Flatten, BatchNormalization
from tensorflow.keras.layers import Activation, ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.optimizers import Adam, SGD, Nadam
import random
import numpy as np
from PIL import Image
from tqdm import tqdm
import os 
import time
import matplotlib.pyplot as plt
### In order to spped up training.  Use the notebook as part of Google Colab with GPU Runtimes enabled

gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Thu Dec  2 23:14:34 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.44       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   47C    P0    30W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [2]:
### Check to see if the notebook is running on Google Colab or locally. 
### If running on Google Colab, you will need to connect your Google Drive
### in order to access training data
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    COLAB = True
    print("Note: using Google CoLab")
    %tensorflow_version 2.x
except:
    print("Note: not using Google CoLab")
    COLAB = False

Mounted at /content/drive
Note: using Google CoLab


In [3]:
# Generation resolution - Must be square 
# Training data is also scaled to this.
# Note GENERATE_RES 4 or higher will blow Google CoLab's memory

GENERATE_RES = 2 # Generation resolution factor 
# (1=32, 2=64, 3=96, 4=128, etc.)
GENERATE_SQUARE = 32 * GENERATE_RES # rows/cols (should be square)
IMAGE_CHANNELS = 3

### These constants are used to support the output of sample images in a grid foramt
# Preview image 
PREVIEW_ROWS = 4
PREVIEW_COLS = 7
PREVIEW_MARGIN = 16

# Size vector to generate images from (sandard value in most research is 100)
SEED_SIZE = 100


### Modify these values to fine tune training and provide a path to training data if using Google Colab
DATA_PATH = '/content/drive/MyDrive/ColabNotebooks/Pet/'
EPOCHS = 1000
BATCH_SIZE = 16
BUFFER_SIZE = 60000


In [4]:

### To save time on subsequent runs, save the training images in .npy file.  This prevents the files from being read each time the process is re-ran
training_binary_path = os.path.join(DATA_PATH,  f'training_data_{GENERATE_SQUARE}_{GENERATE_SQUARE}.npy')
print(f"Looking for file: {training_binary_path}")

### If the .npy file is not found load images from image path and save a new .npy file
if not os.path.isfile(training_binary_path):
  print("Loading training images...")
  try:
    training_data = []
    faces_path = os.path.join(DATA_PATH,'PetImbalance')
    print("PATH:", faces_path)
    for filename in tqdm(os.listdir(faces_path)):
        path = os.path.join(faces_path,filename)
        ### Some images may be black and white instead of color.  
        ### Corrupt images may also be present. 
        ### Use try / except to remove those files when loading
        try:
          image = Image.open(path).convert('RGB').resize((GENERATE_SQUARE,GENERATE_SQUARE),Image.ANTIALIAS)
          training_data.append(np.asarray(image))
        except: 
          print("bad file:", path, filename)
  except:
    print("Exception ",path )
  training_data = np.reshape(training_data,(-1,GENERATE_SQUARE, GENERATE_SQUARE,IMAGE_CHANNELS))
  training_data = training_data.astype(np.float32)
  training_data = training_data / 127.5 - 1.


  print("Saving training image binary...")
  np.save(training_binary_path,training_data)
else:
  print("Loading previous training binary...")
  training_data = np.load(training_binary_path)

Looking for file: /content/drive/MyDrive/ColabNotebooks/Pet/training_data_64_64.npy
Loading previous training binary...


In [5]:
# Batch and shuffle the data
train_dataset = tf.data.Dataset.from_tensor_slices(training_data).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)


In [6]:
def build_generator(seed_size, channels):
    model = Sequential()

    model.add(Dense(4*4*256,activation="relu",input_dim=seed_size))
    model.add(Reshape((4,4,256)))

    model.add(UpSampling2D())
    model.add(Conv2D(256,kernel_size=3,padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))

    model.add(UpSampling2D())
    model.add(Conv2D(256,kernel_size=3,padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
   
    # Output resolution, additional upsampling
    model.add(UpSampling2D())
    model.add(Conv2D(128,kernel_size=3,padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))

    if GENERATE_RES>1:
      model.add(UpSampling2D(size=(GENERATE_RES,GENERATE_RES)))
      model.add(Conv2D(128,kernel_size=3,padding="same"))
      model.add(BatchNormalization(momentum=0.8))
      model.add(Activation("relu"))

    # Final CNN layer
    model.add(Conv2D(channels,kernel_size=3,padding="same"))
    model.add(Activation("tanh"))

    return model


def build_discriminator(image_shape):
    model = Sequential()

    model.add(Conv2D(32, kernel_size=3, strides=2, input_shape=image_shape, 
                     padding="same"))
    model.add(LeakyReLU(alpha=0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(64, kernel_size=3, strides=2, padding="same"))
    model.add(ZeroPadding2D(padding=((0,1),(0,1))))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(128, kernel_size=3, strides=2, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(256, kernel_size=3, strides=1, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))

    model.add(Dropout(0.25))
    model.add(Conv2D(512, kernel_size=3, strides=1, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))

    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))

    return model


In [7]:
def save_images(cnt,noise,file_name_prefix="default_"):
  ### Create a grid to output multiple generated images at the same time. 
  image_array = np.full(( PREVIEW_MARGIN + (PREVIEW_ROWS * (GENERATE_SQUARE+PREVIEW_MARGIN)),  PREVIEW_MARGIN + (PREVIEW_COLS * (GENERATE_SQUARE+PREVIEW_MARGIN)), IMAGE_CHANNELS),  255, dtype=np.uint8)
  
  generated_images = generator.predict(noise)
  generated_images = 0.5 * generated_images + 0.5

  image_count = 0
  for row in range(PREVIEW_ROWS):
      for col in range(PREVIEW_COLS):
        r = row * (GENERATE_SQUARE+16) + PREVIEW_MARGIN
        c = col * (GENERATE_SQUARE+16) + PREVIEW_MARGIN
        image_array[r:r+GENERATE_SQUARE,c:c+GENERATE_SQUARE] \
            = generated_images[image_count] * 255
        image_count += 1

          
  output_path = os.path.join(DATA_PATH,'output')
  if not os.path.exists(output_path):
    os.makedirs(output_path)
  
  filename = os.path.join(output_path,f"{file_name_prefix}train-{cnt}.png")
  im = Image.fromarray(image_array)
  im.save(filename)

In [8]:
generator = build_generator(SEED_SIZE, IMAGE_CHANNELS)
#generator.save(os.path.join(DATA_PATH,"gen1.h5"))

#noise = tf.random.normal([1, SEED_SIZE])
image_shape = (GENERATE_SQUARE,GENERATE_SQUARE,IMAGE_CHANNELS)

discriminator = build_discriminator(image_shape)
#discriminator.save(os.path.join(DATA_PATH,"disc1.h5"))

In [9]:
# This method returns a helper function to compute cross entropy loss
cross_entropy = tf.keras.losses.BinaryCrossentropy()
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

In [10]:

def train_step(images,generator,discriminator,gen_opt, disc_opt):

  seed = tf.random.normal([BATCH_SIZE, SEED_SIZE])

  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
    generated_images = generator(seed, training=True)

    real_output = discriminator(images, training=True)
    fake_output = discriminator(generated_images, training=True)
    
    gen_loss = generator_loss(fake_output)
    disc_loss = discriminator_loss(real_output, fake_output)
    
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    gen_opt.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    disc_opt.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))
  return gen_loss,disc_loss, generator, discriminator, gen_opt,disc_opt

In [11]:
def train(dataset, gen, disc, gen_opt,disc_opt,e,file_name_prefix = "default"):
  fixed_seed = np.random.normal(0, 1, (PREVIEW_ROWS * PREVIEW_COLS, SEED_SIZE))
  gen_loss_list = [] 
  disc_loss_list = []

  for image_batch in dataset:
      t = train_step(image_batch,gen,disc,gen_opt, disc_opt)
      gen_loss_list.append(t[0])
      disc_loss_list.append(t[1])
      gen=t[2]
      disc=t[3]
      gen_opt = t[4]
      disc_opt = t[5]

  g_loss = sum(gen_loss_list) / len(gen_loss_list)
  d_loss = sum(disc_loss_list) / len(disc_loss_list)

  save_images(e,fixed_seed,file_name_prefix)


  return g_loss, gen, disc

In [None]:
  Temp = EPOCHS *.75
  dec = Temp / EPOCHS
  print("TEMP  DEC", Temp, dec)

  for e in range(EPOCHS):

    generator_optimizer = tf.keras.optimizers.Adam(1.5e-6,0.5)
    discriminator_optimizer = tf.keras.optimizers.Adam(1.5e-6,0.5)
    try:
      print("load and compile")
      gen = load_model(os.path.join(DATA_PATH,"gen1.h5"),)
      gen.compile(optimizer=generator_optimizer, loss = cross_entropy)
      disc =load_model(os.path.join(DATA_PATH,"disc1.h5"))
      disc.compile(optimizer=discriminator_optimizer, loss = cross_entropy)
    except:
      gen = generator
      disc = discriminator
      ## If no model currently exists in a save state, create an initial model by performing a single train operation
      l1, gen2, disc2 =train(train_dataset, gen, disc, generator_optimizer,discriminator_optimizer,e,"1")
      print(f"EPOCH {e}  LOSS: {l1}")
      gen2.save(os.path.join(DATA_PATH,"gen1.h5"))
      disc2.save(os.path.join(DATA_PATH,"disc1.h5"))
     
    
    ### Execute each of the training iterations using the loaded model as a baseline
    ### Save the result into a new temporary model

    l1, gen2, disc2 =train(train_dataset, gen, disc, generator_optimizer,discriminator_optimizer,e,"1")
    print(f"EPOCH {e}  LOSS: {l1}")
    try:
      gen2.save(os.path.join(DATA_PATH,"gen2.h5"))
      disc2.save(os.path.join(DATA_PATH,"disc2.h5"))
    except:
      pass
    min = l1
    min_id = "l1"

    ### Update Optimizer for second instance
    generator_optimizer = tf.keras.optimizers.SGD(1.5e-3,0.005, False)
    discriminator_optimizer = tf.keras.optimizers.SGD(1.5e-3,0.005, False)
    l2, gen3, disc3 =train(train_dataset, gen, disc, generator_optimizer,discriminator_optimizer,e,"2") 
    print(f"EPOCH {e}  LOSS: {l2}")
    try:
      gen3.save(os.path.join(DATA_PATH,"gen2.h5"))
      disc3.save(os.path.join(DATA_PATH,"disc2.h5"))
    except:
      pass
    if l2 < min:
      min = l2
      min_id = "l2"


    ### Update Optimizer for third instance
    generator_optimizer = tf.keras.optimizers.Nadam(1.5e-3,0.25)
    discriminator_optimizer = tf.keras.optimizers.Nadam(1.5e-3,0.25)
    l3, gen4, disc4 =train(train_dataset, gen, disc, generator_optimizer,discriminator_optimizer,e,"3") 
    print(f"EPOCH {e}  LOSS: {l3}")
    try:
      gen4.save(os.path.join(DATA_PATH,"gen4.h5"))
    
      disc4.save(os.path.join(DATA_PATH,"disc4.h5"))
    except:
      pass
    if l3 < min:
      min = l3
      min_id = "l3"


    ### Decrase Temprature value
    ### Choose a random number between 0 and the number of EPOCHS
    ### If the random value is less then the Temp, allow for a random choice
    ### Otherwise pick the model with the lowest loss as the baseline for next Epoch
    Temp = Temp - dec
    rand = random.randint(0, EPOCHS)
  
    if rand < Temp:
      rand2 = random.randint(0,3)
      print("RAND 2" , rand2)
      if rand2 ==0:
        print("Random choice : first option")
        try:
          gen2.save(os.path.join(DATA_PATH,"gen1.h5"))
          disc2.save(os.path.join(DATA_PATH,"disc1.h5"))
        except:
          pass
      elif rand2 == 1:
        print("Random choice : second option")
        try:

          gen3.save(os.path.join(DATA_PATH,"gen1.h5"))
          disc3.save(os.path.join(DATA_PATH,"disc1.h5"))
        except:
          pass
      else:
        print("Random choice : third option ")
        try:
          gen4.save(os.path.join(DATA_PATH,"gen1.h5"))
          disc4.save(os.path.join(DATA_PATH,"disc1.h5"))
        except:
          pass
    else:
      if min_id =="l3":
        print("Choosing best : third option")
        try:
          gen4.save(os.path.join(DATA_PATH,"gen1.h5"))
          disc4.save(os.path.join(DATA_PATH,"disc1.h5"))
        except:
          pass
      elif min_id == "l2":
        print("Choosing best : second option")
        try:
          gen3.save(os.path.join(DATA_PATH,"gen1.h5"))
          disc3.save(os.path.join(DATA_PATH,"disc1.h5"))
        except:
          pass
      else:
        print("Choosing best : first option")
        
        try:
          gen2.save(os.path.join(DATA_PATH,"gen1.h5"))
          disc2.save(os.path.join(DATA_PATH,"disc1.h5"))
        except:
          pass

TEMP  DEC 750.0 0.75
load and compile
EPOCH 0  LOSS: 0.9266050457954407
EPOCH 0  LOSS: 0.973770022392273
EPOCH 0  LOSS: 2.0476632118225098
EPOCH 0  LOSS: 6.915613651275635
RAND 2 1
Random choice : second option
load and compile
EPOCH 1  LOSS: 0.5372833013534546
EPOCH 1  LOSS: 2.7757039070129395
EPOCH 1  LOSS: 4.132080078125
Choosing best : first option
load and compile
EPOCH 2  LOSS: 1.4716503620147705
EPOCH 2  LOSS: 1.9854199886322021
EPOCH 2  LOSS: 4.155323505401611
RAND 2 3
Random choice : third option 
load and compile
EPOCH 3  LOSS: 1.8889292478561401
EPOCH 3  LOSS: 3.878035306930542
EPOCH 3  LOSS: 4.040680885314941
RAND 2 2
Random choice : third option 
load and compile
EPOCH 4  LOSS: 1.765494704246521
EPOCH 4  LOSS: 4.227635383605957
EPOCH 4  LOSS: 3.7876081466674805
RAND 2 3
Random choice : third option 
load and compile
EPOCH 5  LOSS: 4.5471649169921875
EPOCH 5  LOSS: 4.387053966522217
EPOCH 5  LOSS: 2.603386163711548
RAND 2 3
Random choice : third option 
load and compile
EPO

In [None]:
generator.save(os.path.join(DATA_PATH,"final_generator.h5"))