# GAN LAB
By: Ali Panahi

Inspired by [GAN](https://github.com/eriklindernoren/Keras-GAN/blob/master/gan/gan.py),
[DCGAN](https://github.com/eriklindernoren/Keras-GAN/blob/master/dcgan/dcgan.py), [CycleGAN](https://github.com/eriklindernoren/Keras-GAN/blob/master/cyclegan/cyclegan.py)
[DCGAN by fchaollet](https://colab.research.google.com/github/fchollet/deep-learning-with-python-notebooks/blob/master/8.5-introduction-to-gans.ipynb#scrollTo=WLCF7I8opN7i), [DCGAN from TensorFlow with eager](https://github.com/tensorflow/tensorflow/blob/r1.11/tensorflow/contrib/eager/python/examples/generative_examples/dcgan.ipynb)


# Document

## How to use this Colab notebook


- Go to File > Save a copy in Drive
- Runtime > Change Runtime type > GPU (Free Tesla K80 GPU)
- Go to Prepare > User Defined Variables and set `GOOGLE_DRIVE_ENABLED = FALSE` if you don't need a persistant storage
- Run All Cell
- Go to TensorBoard > Click on the TensorBoard link to see how well your model is working
- Change Parameters or architechture and Run again, You can compare the performance in TensorBoard
- For **debugging** the code you can add `import pdb; pdb.set_trace()`  to any line of the code as a *breakpoint* and then run the code. below you can see a list of some of the commands that you can use while you're debugging your code. [Tutorial article on PDB](https://www.machinelearningplus.com/python/python-debugging/) , [Tutorial Video on PDB](https://www.youtube.com/watch?v=VQjCx3P89yk)
- For **debugging** with GUI you can add `import web_pdb; web_pdb.set_trace()` to any line of the code as a *breakpoint* and then run the code. Then visit the link provided bellow in Debugging section . 




### PDB list of debugging commands
| Command         |  Description                                                |
|-----------------|-------------------------------------------------------------|
| ``list``        | Show the current location in the file                       |
| ``h(elp)``      | Show a list of commands, or find help on a specific command |
| ``q(uit)``      | Quit the debugger and the program                           |
| ``c(ontinue)``  | Quit the debugger, continue in the program                  |
| ``n(ext)``      | Go to the next step of the program                          |
| ``<enter>``     | Repeat the previous command                                 |
| ``p(rint)``     | Print variables                                             |
| ``s(tep)``      | Step into a subroutine                                      |
| ``r(eturn)``    | Return out of a subroutine                                  |


## How the original GAN works


![Gan Architecture](https://ishmaelbelghazi.github.io/ALI/assets/gan_simple.svg)
Image Source [ALI](https://ishmaelbelghazi.github.io/ALI)

- for each epoch:
  1.   Generator creates fake images (generator.predict)
  2.   Discriminator learns how to distinguish these fake images with real images (discriminator.train_on_batch)
  3.   Generator learns how to create better fake images using Discriminator guides ${\frac{p_{data}(x)}{  p_{model}(x)}}$ (combo.train_on_batch)

So generator generates images that discrimintator cannot distinguish => combo low score!


### Theoretical perfect final state
-   Have 100% accuracy for the generator - meaning the discriminator classifies all synthetic observations as real;
-   Have about 50% accuracy for the discriminator - meaning it cannot distingiush fake observations from real ones;
-   The synthetic observations are of good quality.

## How CycleGAN Works

### CycleGAN Architecture 
![CycleGan Architecture](https://camo.githubusercontent.com/c653ddc55471557b851a7059540e80799fad7e29/687474703a2f2f6572696b6c696e6465726e6f72656e2e73652f696d616765732f6379636c6567616e2e706e67)
Image Source: [KerasGAN](https://github.com/eriklindernoren/Keras-GAN)

![CycleGAN Ali](https://i.imgur.com/5eMUAo2.png)
Image By Ali Panahi
### CycleGAN Generator/Discriminitator Networks
What you should know about the generator and the discriminator networks:
**Discriminator**
-  [PatchGAN](https://arxiv.org/pdf/1611.07004.pdf)
  - Only penalizes structure at the scale of image patches
  - In order to model high-frequencies, it is sufficient to restrict our attention to the structure in local image patches.
  - PatchGAN's Discriminator tries to classify if each N × N patch in an image is real or fake. We run this discriminator convolutationally across the image, averaging all responses to provide the ultimate output of D. (like  texture/style loss)
  - Using the Patch GAN Approach we can train and generate high resolution images

**Generator**
- Original CycleGAN generator: [Perceptual Losses](Perceptual Losses for Real-Time Style Transfer
and Super-Resolution)
- New CycleGAN generator: [U-net]()
  - Using U-net as a generator has been a big improvement for forwarding low level features through the network and partially reconstructing it at the output.
- [Layer Normalization](https://arxiv.org/abs/1607.06450) /  [Instance Normalizatin Layer](https://arxiv.org/abs/1607.08022)
  - Normalize the activations of the previous layer at each step, i.e. applies a transformation that maintains the mean activation close to 0 and the activation standard deviation close to 1.
- [CycleGAN Identity Loss](https://arxiv.org/pdf/1611.02200.pdf)
  - Regularize the generator to be near an identity mapping when real samples of the target domain are provided as the input to the gen- erator

![U-net](http://deeplearning.net/tutorial/_images/unet.jpg)

Image source: [U-net](http://deeplearning.net/tutorial/_images/unet.jpg)

**Lambda**

We used these lambdas for the importance of each of the losses in the final loss function:

```g_loss = 1 * adversarial losses + 10 * Cycle-consistency losses + 1 * Identity losses```

**Model names in Keras output**

In tensorboard you will see loss and accuracies named by models name of ```model_1, mode_2, model_3, model_4```. These are belonging to: ```Adversarial_D_A, Adversarial_D_B, recon_BA, recon_AB, id_BA, id_AB```

## Different Normalization Methods

  -  [Batch normalization layer](https://arxiv.org/pdf/1502.03167.pdf)
  -  [Layer Normalization](https://arxiv.org/abs/1607.06450)
  -  [Instance Normalizatin Layer](https://arxiv.org/abs/1607.08022)
  - [Pixel Normalization](https://arxiv.org/pdf/1710.10196.pdf) (Nvidia's ProgresssiveGAN - Style Based GAN)

# Code

## Preapare

### Installing Dependencies

In [0]:
def install_default_packages():
  try:
     import keras_contrib
  except:
    !pip install git+https://www.github.com/keras-team/keras-contrib.git -q
    
  !pip install imageio -q
  !pip install scikit-image -q

def clean_content_folder():
  # Clean the folder
  import os
  if os.path.exists('/content/sample_data'):
    !rm -rf 'sample_data'

try: dependencies_cell_executed
except NameError:
  install_default_packages()
  clean_content_folder()
dependencies_cell_executed = True

### Import

In [2]:
import tensorflow as tf
from tensorflow.python import debug as tf_debug
import keras

from keras_contrib.layers.normalization import InstanceNormalization
from keras.datasets import mnist
from keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from keras.layers import BatchNormalization, Activation, ZeroPadding2D
from keras.layers import LeakyReLU
from keras.layers import UpSampling2D, Conv2D
from keras.models import Sequential, Model
from keras.optimizers import Adam
from keras.callbacks import TensorBoard
import matplotlib.pyplot as plt

import sys
import os
import time
import datetime
import numpy as np
from distutils.dir_util import copy_tree

Using TensorFlow backend.


### Persistant Storage - Google Drive

In [0]:
# Add Google Drive as a persistant storage
def mount_Google_Drive(GDRIVE_PROJECT_PATH):
  from google.colab import drive
  import tensorflow as tf
  if not tf.gfile.Exists('/content/GDrive/'):
    tf.gfile.MakeDirs('/content/GDrive/')
    drive.mount('/content/GDrive/')
  return '/content/GDrive/My Drive/' + GDRIVE_PROJECT_PATH

### TensorBoard

In [0]:
# Install Dependencies
def tensorboard_installer():
  tensorboardcolab_version = !pip show tensorboardcolab | grep Version
  if tensorboardcolab_version != ['Version: 0.0.21.1b0']:
    !pip install git+https://github.com/panaali/tensorboardcolab -q

# Run Tensorboard
def tensorboard_runner(Tensorboard_Path, Enable_Tensorboard_Debugger):
  from tensorboardcolab import TensorBoardColab
  if Enable_Tensorboard_Debugger is True:
    tbc = TensorBoardColab(graph_path = Tensorboard_Path, debugger_port=6064)
  else:
    tbc = TensorBoardColab(graph_path = Tensorboard_Path)
  return tbc

### Debugging with Web-PDB and PDB

The pdb 
To install and run [web-pdb](https://github.com/romanvm/python-web-pdb) set the option in the configuration section and click on the 

- To add a *breakpoint*  usin web-pdb add `import web_pdb; web_pdb.set_trace()` in any line of the code you like.

- To add a *breakpoint*  usin pdb add `import pdb; pdb.set_trace()` in any line of the code you like.


In [0]:
def web_pdb_installer():
  # setup web-pdb
  !pip install web-pdb
  !apt-get install autossh

In [0]:
def web_pdb_runner(web_pdb_connected = False):
  import time
  if not web_pdb_connected:
    web_pdb_connected = True
    !nohup autossh -M 0 -o StrictHostKeyChecking=no -R 80:localhost:5555 serveo.net o > web_pdb.txt 2>&1 < /dev/null &
    time.sleep(3)

## Architectures
  


### AbstractGAN

Here you can find an abstract class for GANs which holds some commond functions:

In [0]:
class AbstractGAN:
  def __init__(self, run_image_path, run_tensorboard_path, dataset_name, TensorBoardDebugWrapperSession = None, GDrive_run_image_path = '', GDrive_run_tensorboard_path = ''):
    self.run_image_path = run_image_path
    self.run_tensorboard_path = run_tensorboard_path
    self.dataset_name = dataset_name
    self.GDrive_run_image_path = GDrive_run_image_path
    self.GDrive_run_tensorboard_path = GDrive_run_tensorboard_path
    if TensorBoardDebugWrapperSession is not None:
      keras.backend.set_session(tf_debug.TensorBoardDebugWrapperSession(tf.Session(), TensorBoardDebugWrapperSession, send_traceback_and_source_code=False))
    self.tensorboard_folder_run = run_tensorboard_path
    np.random.seed(seed=0)
    self.start()
    
  def start(self):
    pass
  
  # Transform train_on_batch return value
  # to dict expected by on_batch_end or on_epoch_end callback
  def named_logs(self, model, logs):
    result = {}
    for metric_name, log_value in zip(model.metrics_names, logs):
      result[metric_name] = log_value
    result['size'] = 1
    return result
  
  def init_tensorboard(self, model, model_name):
    tensorboard = TensorBoard(
      log_dir= f'{self.run_tensorboard_path}/{model_name}',
      histogram_freq=0,
      batch_size=self.batch_size,
      write_graph=True,
      write_grads=True,
      update_freq=1
    )
    tensorboard.set_model(model)
    return tensorboard
  
  def copy_to_gdrive(self):
    if self.GDrive_run_image_path is not '':
      copy_tree(self.run_image_path, self.GDrive_run_image_path)
      copy_tree(self.run_tensorboard_path, self.GDrive_run_tensorboard_path)

### OriginalGAN

In [0]:
class OriginalGAN(AbstractGAN):
  def start(self):
    self.img_rows = 28
    self.img_cols = 28
    self.channels = 1
    self.img_shape = (self.img_rows, self.img_cols, self.channels)
    self.latent_dim = 100

    g_optimizer = Adam(lr=0.001, beta_1=0.5, beta_2=0.999, epsilon=None, decay=0.001, amsgrad=False)
    d_optimizer = Adam(lr=0.001, beta_1=0.5, beta_2=0.999, epsilon=None, decay=0.001, amsgrad=False)

    # Build and compile the discriminator
    self.discriminator = self.build_discriminator()    
    self.discriminator.compile(loss='binary_crossentropy',
      optimizer=d_optimizer,
      metrics=['accuracy'])
    
    # Build the generator
    self.generator = self.build_generator()

    # For the combined model we will only train the generator
    discriminator_frozen = Model(inputs=self.discriminator.inputs,
                                         outputs=self.discriminator.outputs)
    discriminator_frozen.trainable = False
    
    # The combined model  (stacked generator and discriminator)
    # Trains the generator to fool the discriminator
    combined_output = discriminator_frozen(self.generator.outputs) 
    self.combined = Model(inputs=self.generator.inputs, outputs=combined_output)
      
    self.combined.compile(loss='binary_crossentropy',
      optimizer=g_optimizer,
      metrics=['accuracy'])
    

  def build_generator(self):
    g_input_noise = Input(shape=(self.latent_dim,), name='g_input_noise')
    x = Dense(256, input_dim=self.latent_dim)(g_input_noise)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization(momentum=0.8)(x)
    x = Dense(512)(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization(momentum=0.8)(x)
    x = Dense(1024)(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization(momentum=0.8)(x)
    x = Dense(np.prod(self.img_shape), activation='tanh')(x)
    g_output_img = Reshape(self.img_shape)(x)
    
    return Model(inputs=g_input_noise, outputs=g_output_img)

  def build_discriminator(self):
    d_input_img = Input(shape=self.img_shape, name='d_input_img')
    x = Flatten(input_shape=self.img_shape)(d_input_img)
    x = Dense(512)(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Dense(256)(x)
    x = LeakyReLU(alpha=0.2)(x)
    d_output_validity = Dense(1, activation='sigmoid')(x)
    
    return Model(inputs=d_input_img, outputs=d_output_validity)

  def train(self, epochs, batch_size=128, sample_interval=50):
    if False:
      print('Generator Model:')
      self.discriminator.summary()
      print('Discriminator Model:')
      self.combined.summary()

    # Load the dataset
    (X_train, _), (_, _) = mnist.load_data()
    
    # Rescale -1 to 1
    X_train = X_train / 127.5 - 1.
    X_train = np.expand_dims(X_train, axis=3)

    # Adversarial ground truths
    valid = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))

    # Setup tensorboard
    # Create the TensorBoard callback,
    # which we will drive manually
    now = time.time()
    tensorboard_d = TensorBoard(
      log_dir= f'{self.run_tensorboard_path}/{now}/d',
      histogram_freq=0,
      batch_size=batch_size,
      write_graph=True,
      write_grads=True
    )
    
    tensorboard_g = TensorBoard(
      log_dir= f'{self.run_tensorboard_path}/{now}/g',
      histogram_freq=0,
      batch_size=batch_size,
      write_graph=True,
      write_grads=True
    )
    
    tensorboard_d.set_model(self.discriminator)
    tensorboard_g.set_model(self.combined)
    
    for epoch in range(epochs):

      # ---------------------
      #  Train Discriminator
      # ---------------------
      
      # Generate a batch of new images
      noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
      gen_imgs = self.generator.predict(noise)
      
      # Select a random batch of images
      idx = np.random.randint(0, X_train.shape[0], batch_size)
      imgs = X_train[idx]
      

      # Train the discriminator
      d_loss_real = self.discriminator.train_on_batch(imgs, valid)
      d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
      d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
      
      # ---------------------
      #  Train Generator
      # ---------------------
      
      noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

      # Train the generator (to have the discriminator label samples as valid)
      g_loss = self.combined.train_on_batch(noise, valid)
      
      # Save the progress
      # print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f, acc.: %.2f%]" % (epoch, d_loss[0], 100*d_loss[1], g_loss[0], g_loss[1]))
      tensorboard_d.on_epoch_end(epoch, self.named_logs(self.discriminator, d_loss))
      tensorboard_g.on_epoch_end(epoch, self.named_logs(self.combined, g_loss))
      
      # If at save interval => save generated image samples
      if epoch % sample_interval == 0:
        print(f'epoch: {epoch}')
        self.sample_images(epoch)
      
    tensorboard_d.on_train_end(None)
    tensorboard_g.on_train_end(None)


  def sample_images(self, epoch):
    r, c = 5, 5
    noise = np.random.normal(0, 1, (r * c, self.latent_dim))
    gen_imgs = self.generator.predict(noise)

    # Rescale images 0 - 1
    gen_imgs = 0.5 * gen_imgs + 0.5

    fig, axs = plt.subplots(r, c)
    cnt = 0
    for i in range(r):
      for j in range(c):
        axs[i,j].imshow(gen_imgs[cnt, :,:,0], cmap='gray')
        axs[i,j].axis('off')
        cnt += 1
    fig.savefig(self.image_path + f'{epoch}.png')
    plt.close()
  
  # Transform train_on_batch return value
  # to dict expected by on_batch_end or on_epoch_end callback
  def named_logs(self, model, logs):
    result = {}
    for metric_name, log_value in zip(model.metrics_names, logs):
      result[metric_name] = log_value
    return result

### DCGAN

In [0]:
class DCGAN(AbstractGAN):
  def start(self):
    self.img_rows = 28
    self.img_cols = 28
    self.channels = 1
    self.img_shape = (self.img_rows, self.img_cols, self.channels)
    self.latent_dim = 100

    g_optimizer = Adam(lr=0.0002, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0, amsgrad=True)
    d_optimizer = Adam(lr=0.0002, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0, amsgrad=False)

    # Build and compile the discriminator
    self.discriminator = self.build_discriminator()    
    self.discriminator.compile(loss='binary_crossentropy',
      optimizer=d_optimizer,
      metrics=['accuracy'])
    
    # Build the generator
    self.generator = self.build_generator()

    # For the combined model we will only train the generator
    discriminator_frozen = Model(inputs=self.discriminator.inputs,
                                         outputs=self.discriminator.outputs)
    discriminator_frozen.trainable = False
    
    # The combined model  (stacked generator and discriminator)
    # Trains the generator to fool the discriminator
    combined_output = discriminator_frozen(self.generator.outputs) 
    self.combined = Model(inputs=self.generator.inputs, outputs=combined_output)
      
    self.combined.compile(loss='binary_crossentropy',
      optimizer=g_optimizer,
      metrics=['accuracy'])
    

  def build_generator(self):
    model = Sequential()

    model.add(Dense(128 * 7 * 7, activation="relu", input_dim=self.latent_dim))
    model.add(Reshape((7, 7, 128)))
    model.add(UpSampling2D())
    model.add(Conv2D(128, kernel_size=3, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
    model.add(UpSampling2D())
    model.add(Conv2D(64, kernel_size=3, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Activation("relu"))
    model.add(Conv2D(self.channels, kernel_size=3, padding="same"))
    model.add(Activation("tanh"))

    noise = Input(shape=(self.latent_dim,))
    img = model(noise)

    return Model(noise, img)

  def build_discriminator(self):
    model = Sequential()

    model.add(Conv2D(32, kernel_size=3, strides=2, input_shape=self.img_shape, padding="same"))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(64, kernel_size=3, strides=2, padding="same"))
    model.add(ZeroPadding2D(padding=((0,1),(0,1))))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(128, kernel_size=3, strides=2, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Conv2D(256, kernel_size=3, strides=1, padding="same"))
    model.add(BatchNormalization(momentum=0.8))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))

    img = Input(shape=self.img_shape)
    validity = model(img)

    return Model(img, validity)

  def train(self, epochs, batch_size=128, sample_interval=50):
    if False:
      print('Generator Model:')
      self.discriminator.summary()
      print('Discriminator Model:')
      self.combined.summary()

    # Load the dataset
    (X_train, _), (_, _) = mnist.load_data()
    
    # Rescale -1 to 1
    X_train = X_train / 127.5 - 1.
    X_train = np.expand_dims(X_train, axis=3)

    # Adversarial ground truths
    valid = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))

    # Setup tensorboard
    # Create the TensorBoard callback,
    # which we will drive manually
    tensorboard_d = TensorBoard(
      log_dir= f'{self.run_tensorboard_path}/D',
      histogram_freq=0,
      batch_size=batch_size,
      write_graph=True,
      write_grads=True
    )
    
    tensorboard_g = TensorBoard(
      log_dir= f'{self.run_tensorboard_path}/G',
      histogram_freq=0,
      batch_size=batch_size,
      write_graph=True,
      write_grads=True
    )
    
    tensorboard_d.set_model(self.discriminator)
    tensorboard_g.set_model(self.combined)
    
    for epoch in range(epochs):

      # ---------------------
      #  Train Discriminator
      # ---------------------
      
      # Generate a batch of new images
      noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
      gen_imgs = self.generator.predict(noise)
      
      # Select a random batch of images
      idx = np.random.randint(0, X_train.shape[0], batch_size)
      imgs = X_train[idx]
      

      # Train the discriminator
      d_loss_real = self.discriminator.train_on_batch(imgs, valid)
      d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
      d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
      
      # ---------------------
      #  Train Generator
      # ---------------------
      
      noise = np.random.normal(0, 1, (batch_size, self.latent_dim))

      # Train the generator (to have the discriminator label samples as valid)
      g_loss = self.combined.train_on_batch(noise, valid)
      
      # Save the progress
      # print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f, acc.: %.2f%]" % (epoch, d_loss[0], 100*d_loss[1], g_loss[0], g_loss[1]))
      tensorboard_d.on_epoch_end(epoch, self.named_logs(self.discriminator, d_loss))
      tensorboard_g.on_epoch_end(epoch, self.named_logs(self.combined, g_loss))
      
      # If at save interval => save generated image samples
      if epoch % sample_interval == 0:
        print(f'epoch: {epoch}')
        self.sample_images(epoch)
      
    tensorboard_d.on_train_end(None)
    tensorboard_g.on_train_end(None)


  def sample_images(self, epoch):
    r, c = 5, 5
    noise = np.random.normal(0, 1, (r * c, self.latent_dim))
    gen_imgs = self.generator.predict(noise)

    # Rescale images 0 - 1
    gen_imgs = 0.5 * gen_imgs + 0.5

    fig, axs = plt.subplots(r, c)
    cnt = 0
    for i in range(r):
      for j in range(c):
        axs[i,j].imshow(gen_imgs[cnt, :,:,0], cmap='gray')
        axs[i,j].axis('off')
        cnt += 1
    fig.savefig(self.image_path + f'{epoch}.png')
    plt.close()
  
  # Transform train_on_batch return value
  # to dict expected by on_batch_end or on_epoch_end callback
  def named_logs(self, model, logs):
    result = {}
    for metric_name, log_value in zip(model.metrics_names, logs):
      result[metric_name] = log_value
    return result

### CycleGAN

In [0]:
class CycleGAN(AbstractGAN):
  def start(self):
    # TODO: Read dimensions from the files.
    self.img_rows = 128
    self.img_cols = 128
    self.channels = 3
    self.img_shape = (self.img_rows, self.img_cols, self.channels)
    self.print_summary = True

    optimizer_d = Adam(lr=0.0002, beta_1=0.5, beta_2=0.999, epsilon=None, decay=0, amsgrad=False)
    optimizer_g = Adam(lr=0.0002, beta_1=0.5, beta_2=0.999, epsilon=None, decay=0, amsgrad=False)
    
    # Number of filters in the first layer of G and D
    self.gf = 32
    self.df = 64
    
    # Loss weights
    self.lambda_cycle = 10.0                    # Cycle-consistency loss
    self.lambda_id = 0.1 * self.lambda_cycle    # Identity loss
    
    # Configure data loader
    self.data_loader = DataLoader(dataset_name=self.dataset_name,
                                  img_res=(self.img_rows, self.img_cols))
    
    
    # Calculate output shape of D (PatchGAN)
    patch = int(self.img_rows / 2**4)
    self.disc_patch = (patch, patch, 1)

    # Build and compile the discriminator
    self.d_A = self.build_discriminator(name = 'd_A')
    self.d_B = self.build_discriminator(name = 'd_B')
    self.d_A.compile(loss={'d_A_validity':'mse'},
        optimizer=optimizer_d,
        metrics=['accuracy'])
    self.d_B.compile(loss={'d_B_validity':'mse'},
        optimizer=optimizer_d,
        metrics=['accuracy'])
    
    #-------------------------
    # Construct Computational
    #   Graph of Generators
    #-------------------------

    # Build the generators
    self.g_AB = self.build_generator(name = 'g_AB')
    self.g_BA = self.build_generator(name = 'g_BA')

    # Input images from both domains
    img_A = Input(shape=self.img_shape)
    img_B = Input(shape=self.img_shape)

    # Translate images to the other domain
    fake_B = self.g_AB(img_A)
    fake_A = self.g_BA(img_B)
    # Translate images back to original domain
    reconstr_A = self.g_BA(fake_B)
    reconstr_B = self.g_AB(fake_A)
    # Identity mapping of images
    img_A_id = self.g_BA(img_A)
    img_B_id = self.g_AB(img_B)

    # For the combined model we will only train the generators
    # For preveting the warning error we should create new model and freeze that one
    self.d_A.trainable = False
    self.d_B.trainable = False

    # Discriminators determines validity of translated images
    valid_A = self.d_A(fake_A)
    valid_B = self.d_B(fake_B)

    # Combined model trains generators to fool discriminators
    self.combined = Model(inputs=[img_A, img_B],
                outputs=[ valid_A, valid_B,
                    reconstr_A, reconstr_B,
                    img_A_id, img_B_id ], name='combined')
    self.combined.compile(loss=['mse', 'mse',
                                'mae', 'mae',
                                'mae', 'mae'],
                            loss_weights=[  1, 1,
                                    self.lambda_cycle, self.lambda_cycle,
                                    self.lambda_id, self.lambda_id ],
                            optimizer=optimizer_g,
                            metrics=['accuracy']
                         )
     

      
  def build_generator(self, name = ''):
    """U-Net Generator"""

    def conv2d(layer_input, filters, f_size=4):
      """Layers used during downsampling"""
      d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
      d = LeakyReLU(alpha=0.2)(d)
      d = InstanceNormalization()(d)
      return d

    def deconv2d(layer_input, skip_input, filters, f_size=4, dropout_rate=0):
      """Layers used during upsampling"""
      u = UpSampling2D(size=2)(layer_input)
      u = Conv2D(filters, kernel_size=f_size, strides=1, padding='same', activation='relu')(u)
      if dropout_rate:
        u = Dropout(dropout_rate)(u)
      u = InstanceNormalization()(u)
      u = Concatenate()([u, skip_input])
      return u

    # Image input
    d0 = Input(shape=self.img_shape)

    # Downsampling
    d1 = conv2d(d0, self.gf)
    d2 = conv2d(d1, self.gf*2)
    d3 = conv2d(d2, self.gf*4)
    d4 = conv2d(d3, self.gf*8)

    # Upsampling
    u1 = deconv2d(d4, d3, self.gf*4)
    u2 = deconv2d(u1, d2, self.gf*2)
    u3 = deconv2d(u2, d1, self.gf)

    u4 = UpSampling2D(size=2)(u3)
    output_img = Conv2D(self.channels, kernel_size=4, strides=1, padding='same', activation='tanh', name= name + '_output_img')(u4)

    return Model(d0, output_img, name=name)

  def build_discriminator(self, name = ''):

    def d_layer(layer_input, filters, f_size=4, normalization=True):
      """Discriminator layer"""
      d = Conv2D(filters, kernel_size=f_size, strides=2, padding='same')(layer_input)
      d = LeakyReLU(alpha=0.2)(d)
      if normalization:
        d = InstanceNormalization()(d)
      return d

    img = Input(shape=self.img_shape)

    d1 = d_layer(img, self.df, normalization=False)
    d2 = d_layer(d1, self.df*2)
    d3 = d_layer(d2, self.df*4)
    d4 = d_layer(d3, self.df*8)

    validity = Conv2D(1, kernel_size=4, strides=1, padding='same', name=name + '_validity')(d4)

    return Model(img, validity, name=name)

  def train(self, epochs, batch_size=1, sample_interval=50):
    self.batch_size = batch_size
    if self.print_summary:
      print('Discriminator Model:')
      self.d_A.summary()
      self.d_A.summary()
      print('Generators Model:')
      self.combined.summary()

    # Adversarial loss ground truths
    valid = np.ones((self.batch_size,) + self.disc_patch)
    fake = np.zeros((self.batch_size,) + self.disc_patch)

    # Setup tensorboard
    tensorboard_d_A = self.init_tensorboard(self.d_A, 'D_A')
    tensorboard_d_B = self.init_tensorboard(self.d_B, 'D_B')
    tensorboard_g = self.init_tensorboard(self.combined, 'G')
    
    for epoch in range(epochs):
      for batch_i, (imgs_A, imgs_B) in enumerate(self.data_loader.load_batch(self.batch_size)):
        # ----------------------
        #  Train Discriminators
        # ----------------------

        # Translate images to opposite domain
        fake_B = self.g_AB.predict(imgs_A)
        fake_A = self.g_BA.predict(imgs_B)

        # Train the discriminators (original images = real / translated = Fake)
        dA_loss_real = self.d_A.train_on_batch(imgs_A, valid)
        dA_loss_fake = self.d_A.train_on_batch(fake_A, fake)
        dA_loss = 0.5 * np.add(dA_loss_real, dA_loss_fake)

        dB_loss_real = self.d_B.train_on_batch(imgs_B, valid)
        dB_loss_fake = self.d_B.train_on_batch(fake_B, fake)
        dB_loss = 0.5 * np.add(dB_loss_real, dB_loss_fake)

        # Total disciminator loss
        d_loss = 0.5 * np.add(dA_loss, dB_loss)


        # ------------------
        #  Train Generators
        # ------------------

        # Train the generators
        g_loss = self.combined.train_on_batch([imgs_A, imgs_B],
                            [valid, valid,
                            imgs_A, imgs_B,
                            imgs_A, imgs_B])

        # Plot the progress
#         print ("[Epoch %d/%d] [Batch %d/%d] [D loss: %f, acc: %3d%%] [G loss: %05f, adv: %05f, recon: %05f, id: %05f] time: %s " \
#                                     % ( epoch, epochs,
#                                       batch_i, self.data_loader.n_batches,
#                                       d_loss[0], 100*d_loss[1],
#                                       g_loss[0],
#                                       np.mean(g_loss[1:3]),
#                                       np.mean(g_loss[3:5]),
#                                       np.mean(g_loss[5:6]),
#                                       elapsed_time))
##      g_loss = 1 * adversarial loss + 10 * Cycle-consistency loss + 1 * Identity loss

        tensorboard_d_A.on_batch_end(None, self.named_logs(self.d_A, dA_loss))
        tensorboard_d_B.on_batch_end(None, self.named_logs(self.d_B, dB_loss))
        tensorboard_g.on_batch_end(None, self.named_logs(self.combined, g_loss))
        # If at save interval => save generated image samples
        if batch_i % sample_interval == 0:
          print(f'epoch: {epoch}, batch: {batch_i}')
          self.sample_images(epoch, batch_i)
      print(f'epoch: {epoch} done.')
      self.copy_to_gdrive()
    tensorboard_d_A.on_train_end(None)
    tensorboard_d_B.on_train_end(None)
    tensorboard_g.on_train_end(None)

  def sample_images(self, epoch, batch_i):
    r, c = 2, 3

    imgs_A = self.data_loader.load_data(domain="A", batch_size=1, is_testing=True)
    imgs_B = self.data_loader.load_data(domain="B", batch_size=1, is_testing=True)

    # Demo (for GIF)
    #imgs_A = self.data_loader.load_img('datasets/apple2orange/testA/n07740461_1541.jpg')
    #imgs_B = self.data_loader.load_img('datasets/apple2orange/testB/n07749192_4241.jpg')

    # Translate images to the other domain
    fake_B = self.g_AB.predict(imgs_A)
    fake_A = self.g_BA.predict(imgs_B)
    # Translate back to original domain
    reconstr_A = self.g_BA.predict(fake_B)
    reconstr_B = self.g_AB.predict(fake_A)

    gen_imgs = np.concatenate([imgs_A, fake_B, reconstr_A, imgs_B, fake_A, reconstr_B])

    # Rescale images 0 - 1
    gen_imgs = 0.5 * gen_imgs + 0.5

    titles = ['Original', 'Translated', 'Reconstructed']
    fig, axs = plt.subplots(r, c)
    cnt = 0
    for i in range(r):
      for j in range(c):
        axs[i,j].imshow(gen_imgs[cnt])
        axs[i, j].set_title(titles[j])
        axs[i,j].axis('off')
        cnt += 1
    fig.savefig("%s/%d_%d.png" % (self.run_image_path, epoch, batch_i))
    plt.close()    

  
  

### Utilities

In [0]:
import imageio
from skimage.transform import resize
from glob import glob
import numpy as np
from tensorflow.python.keras.utils.data_utils import get_file

class DataLoader():
  def __init__(self, dataset_name, img_res=(128, 128), GAN_type = 'cycleGAN', download_url = ''):
    self.dataset_name = dataset_name
    self.download_url = download_url
    self.img_res = img_res
    self.GAN_type = GAN_type
    self.download_dataset()

  def download_dataset(self):
    
    if self.GAN_type == 'cycleGAN':
      url = f'https://people.eecs.berkeley.edu/~taesung_park/CycleGAN/datasets/{self.dataset_name}.zip'
    else:
      url = download_url
    
    print(f'Downloading the dataset "{self.dataset_name}"')
    
    path = get_file(self.dataset_name + '.zip',
                    origin= url,
                    cache_dir='/content/',
                    extract = True
                   )
    print(f'Dataset "{self.dataset_name}" downloaded and extracted in "{path}"') 
    
  
  def load_data(self, domain, batch_size=1, is_testing=False):
    data_type = "train%s" % domain if not is_testing else "test%s" % domain
    path = glob('./datasets/%s/%s/*' % (self.dataset_name, data_type))
    batch_images = np.random.choice(path, size=batch_size)

    imgs = []
    for img_path in batch_images:
      img = self.imread(img_path)
      if not is_testing:
        img = resize(img, self.img_res, mode = 'reflect')

        if np.random.random() > 0.5:
            img = np.fliplr(img)
      else:
        img = resize(img, self.img_res, mode = 'reflect')
      imgs.append(img)

    imgs = np.array(imgs)/127.5 - 1.

    return imgs

  def load_batch(self, batch_size=1, is_testing=False):
    data_type = "train" if not is_testing else "val"
    path_A = glob('./datasets/%s/%sA/*' % (self.dataset_name, data_type))
    path_B = glob('./datasets/%s/%sB/*' % (self.dataset_name, data_type))
    self.n_batches = int(min(len(path_A), len(path_B)) / batch_size)
    total_samples = self.n_batches * batch_size

    # Sample n_batches * batch_size from each path list so that model sees all
    # samples from both domains
    path_A = np.random.choice(path_A, total_samples, replace=False)
    path_B = np.random.choice(path_B, total_samples, replace=False)

    for i in range(self.n_batches-1):
      batch_A = path_A[i*batch_size:(i+1)*batch_size]
      batch_B = path_B[i*batch_size:(i+1)*batch_size]
      imgs_A, imgs_B = [], []
      for img_A, img_B in zip(batch_A, batch_B):
        img_A = self.imread(img_A)
        img_B = self.imread(img_B)

        img_A = resize(img_A, self.img_res, mode = 'reflect')
        img_B = resize(img_B, self.img_res, mode = 'reflect')

        if not is_testing and np.random.random() > 0.5:
                img_A = np.fliplr(img_A)
                img_B = np.fliplr(img_B)

        imgs_A.append(img_A)
        imgs_B.append(img_B)

      imgs_A = np.array(imgs_A)/127.5 - 1.
      imgs_B = np.array(imgs_B)/127.5 - 1.

      yield imgs_A, imgs_B

  def load_img(self, path):
    img = self.imread(path)
    img = resize(img, self.img_res, mode = 'reflect')
    img = img/127.5 - 1.
    return img[np.newaxis, :, :, :]

  def imread(self, path):
    return imageio.imread(path, pilmode='RGB').astype(np.float)

## RUN

#### Configs

In [12]:
#@title General Configurations  { run: "auto" }
if __name__ == '__main__':
  # User Defined Variables
  Enable_Google_Drive = False #@param {type:"boolean"}
  Enable_Tensorboard = True #@param {type:"boolean"}
  Enable_Tensorboard_Debugger = False #@param {type:"boolean"}
  Reset_Tensorboard = False #@param {type:"boolean"}
  Enable_Web_PDB = False #@param {type:"boolean"}
  Tensorboard_Path = './tensorboard/' #@param {type:"string"}
  Image_Path = './images/' #@param {type:"string"}
  #@markdown You need to create a folder in your Google Drive and set the folder address in here if you plan to use Google Drive
  GDrive_Project_Path = '/GANs Research/' #@param {type:"string"}
    
  #Flag Variables
  # Check to see if this cell is executed before
  try: configs_cell_executed
  except NameError:
    google_drive_mounted = False
    tensorboard_installed = False
    tensorboard_runned = False
    web_pdb_installed = False
    web_pdb_runned = False

  if Enable_Google_Drive is True and google_drive_mounted is False:
    GDrive_Project_Moundted_Path = mount_Google_Drive(GDrive_Project_Path)
    google_drive_mounted = True

  if Enable_Tensorboard is True and tensorboard_installed is False:
    tensorboard_installer()
    tensorboard_installed = True

  if Enable_Tensorboard is True and tensorboard_runned is True:
    print('Tensorboard: ' + tbc.tensorboard_link)
    
  if (Enable_Tensorboard is True and tensorboard_runned is False) or Reset_Tensorboard is True:
    tbc = tensorboard_runner(Tensorboard_Path, Enable_Tensorboard_Debugger)
    tensorboard_runned = True

  if Enable_Web_PDB is True and web_pdb_installed is False:
    web_pdb_installer()
    web_pdb_installed = True

  if Enable_Web_PDB is True and web_pdb_runned is False:
    web_pdb_runner()
    web_pdb_runned = True
  
  if Enable_Web_PDB is True:
    !cat web_pdb.txt


  configs_cell_executed = True

Wait for 8 seconds...
TensorBoard link:
http://396ff29c.ngrok.io


In [0]:
#@title GAN Configurations
if __name__ == '__main__':
  from distutils.dir_util import copy_tree
  import time
  import datetime
  import os
  from pytz import timezone
  
  start_time = time.time()
  #@markdown Which GAN network you want to use?
  GAN_type = "CycleGAN" #@param ["CycleGAN", "DCGAN", "OriginalGAN"]
  #@markdown Select the dataset that you want to use for CycleGAN.
  cycleGAN_dataset = "horse2zebra" #@param ["ae_photos", "apple2orange", "summer2winter_yosemite", "horse2zebra", "monet2photo", "cezanne2photo", "ukiyoe2photo", "vangogh2photo", "maps", "cityscapes", "facades", "iphone2dslr_flower"]
  #@markdown Select the dataset that you want to use for OriginalGAN or DCGAN
  OriginalGAN_DCGAN_dataset = "mnist" #@param ['boston_housing', 'cifar', 'cifar10', 'cifar100', 'fashion_mnist', 'imdb', 'mnist', 'reuters']
  epochs = 100 #@param {type:"integer"}
  batch_size = 1 #@param {type:"integer"}
  sample_interval = 50 #@param {type:"integer"}
  #@markdown ---
  #@markdown Check or Uncheck to Run:
  Run = False #@param {type:"boolean"}
  
  # RUN
  # Folder name
  folder_prefix = GAN_type
  folder_postfix = ''
  
  # add datetime to folder name
  now = datetime.datetime.now().astimezone(timezone('US/Eastern'))
  time_for_different_run = f'{now.year}-{now.month}-{now.day}-{now.hour}-{now.minute}-{now.second}'
  run_folder_name = folder_prefix + '-' + time_for_different_run + '-' + folder_postfix
  
  run_image_path = f'{Image_Path}/{run_folder_name}/'
  run_tensorboard_path = f'{Tensorboard_Path}/{run_folder_name}/'
  
  if Enable_Google_Drive is True:
    GDrive_run_image_path = f'{GDrive_Project_Moundted_Path}/{run_image_path}/'
    GDrive_run_tensorboard_path = f'{GDrive_Project_Moundted_Path}/{run_tensorboard_path}/'
    
  if not os.path.exists(run_image_path):
    os.makedirs(run_image_path)
   
  # Run the GAN Network
#   import web_pdb; web_pdb.set_trace()
  GAN_class = eval(GAN_type)
  if Enable_Tensorboard_Debugger is True:
    gan = GAN_class(run_image_path, run_tensorboard_path, cycleGAN_dataset,
                    TensorBoardDebugWrapperSession = "67f39d12e309:6064",
                    GDrive_run_image_path = GDrive_run_image_path,
                    GDrive_run_tensorboard_path = GDrive_run_tensorboard_path)
  else:
    gan = GAN_class(run_image_path, run_tensorboard_path, cycleGAN_dataset)
  gan.train(epochs, batch_size, sample_interval)
  
  # Done
  print("--- %s seconds ---" % (time.time() - start_time))
  

Downloading the dataset "horse2zebra"
Downloading data from https://people.eecs.berkeley.edu/~taesung_park/CycleGAN/datasets/horse2zebra.zip
Dataset "horse2zebra" downloaded and extracted in "/content/datasets/horse2zebra.zip"
Discriminator Model:
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         (None, 128, 128, 3)       0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 64, 64, 64)        3136      
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 64, 64, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 32, 32, 128)       131200    
_________________________________________________________________
leaky_re_lu_2 (LeakyReLU)    (None, 32, 32, 128)       0         
__________________________

  'Discrepancy between trainable weights and collected trainable'
  'Discrepancy between trainable weights and collected trainable'


epoch: 0, batch: 0
epoch: 0, batch: 50
epoch: 0, batch: 100
epoch: 0, batch: 150
epoch: 0, batch: 200
epoch: 0, batch: 250
epoch: 0, batch: 300
epoch: 0, batch: 350
epoch: 0, batch: 400
epoch: 0, batch: 450
epoch: 0, batch: 500
epoch: 0, batch: 550
epoch: 0, batch: 600
epoch: 0, batch: 650
epoch: 0, batch: 700
epoch: 0, batch: 750
epoch: 0, batch: 800
epoch: 0, batch: 850
epoch: 0, batch: 900
epoch: 0, batch: 950
epoch: 0, batch: 1000
epoch: 0, batch: 1050
epoch: 0 done.
epoch: 1, batch: 0


## To Do
- Cache Datasets in google drive and then copy to /content
- Test with Binary Cross Entropy and Hinge loss and compare the results
- Test with map dataset