#Mounting Drive to import Dataset

In [1]:
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


#Unzipping the dataset

In [2]:
!unzip drive/My\ Drive/Monet/GAN.zip


Archive:  drive/My Drive/Monet/GAN.zip
   creating: GAN/
   creating: GAN/photo_tfrec/
  inflating: GAN/photo_tfrec/photo07-352.tfrec  
  inflating: GAN/photo_tfrec/photo09-352.tfrec  
  inflating: GAN/photo_tfrec/photo15-352.tfrec  
  inflating: GAN/photo_tfrec/photo02-352.tfrec  
  inflating: GAN/photo_tfrec/photo16-352.tfrec  
  inflating: GAN/photo_tfrec/photo19-350.tfrec  
  inflating: GAN/photo_tfrec/photo04-352.tfrec  
  inflating: GAN/photo_tfrec/photo10-352.tfrec  
  inflating: GAN/photo_tfrec/photo03-352.tfrec  
  inflating: GAN/photo_tfrec/photo13-352.tfrec  
  inflating: GAN/photo_tfrec/photo01-352.tfrec  
  inflating: GAN/photo_tfrec/photo06-352.tfrec  
  inflating: GAN/photo_tfrec/photo18-352.tfrec  
  inflating: GAN/photo_tfrec/photo00-352.tfrec  
  inflating: GAN/photo_tfrec/photo08-352.tfrec  
  inflating: GAN/photo_tfrec/photo17-352.tfrec  
  inflating: GAN/photo_tfrec/photo12-352.tfrec  
  inflating: GAN/photo_tfrec/photo14-352.tfrec  
  inflating: GAN/photo_tfrec/ph

#Importing required Modules and Libraries

In [3]:
import tensorflow as tf
from tensorflow import keras
import tensorflow_addons as tfa
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt



In [4]:
import os
import PIL
from IPython import display
import time
import cv2

In [6]:
print(tf.__version__)

2.3.0


In [11]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

#Load The Dataset

The following code reads the images from directory 

In [7]:
MONET_FILENAMES = tf.io.gfile.glob('/content/GAN/monet_tfrec/*.tfrec')
print('Monet TFRecord Files:', len(MONET_FILENAMES))

PHOTO_FILENAMES = tf.io.gfile.glob('/content/GAN/photo_tfrec/*.tfrec')
print('Photo TFRecord Files:', len(PHOTO_FILENAMES))

Monet TFRecord Files: 5
Photo TFRecord Files: 20


#Transform the Images and form Dataset

In [8]:
IMAGE_SIZE = [256, 256]

#decodes jpeg to numpy array, reshapes to 256*256*3
def decode_image(image):
    image = tf.image.decode_jpeg(image, channels=3)
    image = (tf.cast(image, tf.float32) / 127.5) - 1
    image = tf.reshape(image, [*IMAGE_SIZE, 3])
    return image

#reads tfrecord in given format, extracts image from it
def read_tfrecord(example):
    tfrecord_format = {
        "image_name": tf.io.FixedLenFeature([], tf.string),
        "image": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.string)
    }
    example = tf.io.parse_single_example(example, tfrecord_format)
    image = decode_image(example['image'])
    return image

In [9]:
#creates dataset from tfrecords
def load_dataset(filenames, labeled=True, ordered=False):
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTOTUNE)
    return dataset

In [12]:
train_dataset = load_dataset(MONET_FILENAMES, labeled=True).batch(1)
photo_dataset = load_dataset(PHOTO_FILENAMES, labeled=True).batch(1)

#Make Generator Model

To create Cycle GAN, first define upsampling and downsampling layers.

In [13]:
def downsample(filters, size,stride, norm=True):
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
    
    result = keras.Sequential()
    result.add(keras.layers.Conv2D(filters, size, strides= stride, padding='same',kernel_initializer='random_normal', use_bias=False))
    if norm:
        result.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))
    result.add(keras.layers.LeakyReLU())

    return result

In [14]:
def upsample(filters, size,stride,dropout=False):
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
    
    result = keras.Sequential()
    result.add(keras.layers.Conv2DTranspose(filters, size, strides=stride,
                                      padding='same',
                                      kernel_initializer='random_normal',
                                      use_bias=False))

    result.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_init))

    if dropout:
        result.add(keras.layers.Dropout(0.5))

    result.add(keras.layers.ReLU())

    return result

In [15]:
def Generator():
    inputs = keras.layers.Input(shape=[256,256,3])

#parameters defined to reduce the size of o/p by 2 at each step
    down_stack = [
        downsample(64, 4, 2, norm=False), 
        downsample(64, 4, 2), 
        downsample(64, 4, 2), 
        downsample(128, 4, 2), 
        downsample(128, 4, 2), 
        downsample(256, 4, 2), 
        downsample(256, 4, 2), 
        downsample(512, 4, 2), 
    ]
#parameters defined to double the size of o/p at each step   
    up_stack = [
        upsample(512, 4, 2, dropout=True), 
        upsample(256, 4, 2, dropout=True), 
        upsample(256, 4, 2, dropout=True), 
        upsample(128, 4, 2), 
        upsample(128, 4, 2), 
        upsample(64, 4, 2), 
        upsample(64, 4, 2), ]
        
    
    last = keras.layers.Conv2DTranspose(3, 4,
                                  strides=2,
                                  padding='same',
                                  kernel_initializer='random_normal',
                                  activation='tanh') 
    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)  

    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = keras.layers.Concatenate()([x, skip])

    x = last(x)

    return keras.Model(inputs=inputs, outputs=x)

#Building the Discriminator

the discriminator output a small image with one channel, instead of a boolen value. If the o/p image has high values, means the input is real, else fake.

In [16]:
def Discriminator():
    gamma_init = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
    
    ip = keras.layers.Input(shape=[256, 256, 3], name='input_image')

    x = ip

    down1 = downsample(64, 4, 2, False)(x) 
    down2 = downsample(128, 4, 2)(down1) 
    down3 = downsample(256, 4, 2)(down2) 

    pad1 = keras.layers.ZeroPadding2D()(down3) 
    conv = keras.layers.Conv2D(512, 4, strides=1, kernel_initializer='random_normal', use_bias=False)(pad1) 

    norm1 = tfa.layers.InstanceNormalization(gamma_initializer=gamma_init)(conv)

    leaky_relu = keras.layers.LeakyReLU()(norm1)

    pad2 = keras.layers.ZeroPadding2D()(leaky_relu)

    last = keras.layers.Conv2D(1, 4, strides=1,
                         kernel_initializer='random_normal')(pad2) 

    return tf.keras.Model(inputs=ip, outputs=last)

In [17]:
monet_generator = Generator() # transforms photos to Monet-esque paintings
photo_generator = Generator() # transforms Monet paintings to be more like photos

monet_discriminator = Discriminator() # differentiates real Monet paintings and generated Monet paintings
photo_discriminator = Discriminator() # differentiates real photos and generated photos

In [18]:
monet_generator.summary()

Model: "functional_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
sequential (Sequential)         (None, 128, 128, 64) 3072        input_1[0][0]                    
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 64, 64, 64)   65664       sequential[0][0]                 
__________________________________________________________________________________________________
sequential_2 (Sequential)       (None, 32, 32, 64)   65664       sequential_1[0][0]               
_______________________________________________________________________________________

In [19]:
monet_discriminator.summary()

Model: "functional_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 256, 256, 3)]     0         
_________________________________________________________________
sequential_30 (Sequential)   (None, 128, 128, 64)      3072      
_________________________________________________________________
sequential_31 (Sequential)   (None, 64, 64, 128)       131328    
_________________________________________________________________
sequential_32 (Sequential)   (None, 32, 32, 256)       524800    
_________________________________________________________________
zero_padding2d (ZeroPadding2 (None, 34, 34, 256)       0         
_________________________________________________________________
conv2d_19 (Conv2D)           (None, 31, 31, 512)       2097152   
_________________________________________________________________
instance_normalization_30 (I (None, 31, 31, 512)      

#Training the Model
Overriding the training step and Compile function of keras.model to train our Cycle GAN. 

In [20]:
class CycleGan(keras.Model):
    def __init__(
        self,
        monet_generator,
        photo_generator,
        monet_discriminator,
        photo_discriminator,
        lambda_cycle=10,
    ):
        super(CycleGan, self).__init__()
        self.m_gen = monet_generator
        self.p_gen = photo_generator
        self.m_disc = monet_discriminator
        self.p_disc = photo_discriminator
        self.lambda_cycle = lambda_cycle
        
    def compile(
        self,
        m_gen_optimizer,
        p_gen_optimizer,
        m_disc_optimizer,
        p_disc_optimizer,
        gen_loss_fn,
        disc_loss_fn,
        cycle_loss_fn,
        identity_loss_fn
    ):
        super(CycleGan, self).compile()
        self.m_gen_optimizer = m_gen_optimizer
        self.p_gen_optimizer = p_gen_optimizer
        self.m_disc_optimizer = m_disc_optimizer
        self.p_disc_optimizer = p_disc_optimizer
        self.gen_loss_fn = gen_loss_fn
        self.disc_loss_fn = disc_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn
        
    def train_step(self, batch_data): # it is called for model.fit function
        real_monet, real_photo = batch_data
       
    
        # Run the forward pass of the layer.
        # The operations that the layer applies to its inputs are going to be recorded on the GradientTape.

        with tf.GradientTape(persistent=True) as tape:
            # photo to monet back to photo
            fake_monet = self.m_gen(real_photo, training=True)  # Forward pass
            cycled_photo = self.p_gen(fake_monet, training=True)

            # monet to photo back to monet
            fake_photo = self.p_gen(real_monet, training=True)
            cycled_monet = self.m_gen(fake_photo, training=True)

            # generating itself
            same_monet = self.m_gen(real_monet, training=True)
            same_photo = self.p_gen(real_photo, training=True)

            # discriminator used to check, inputing real images
            disc_real_monet = self.m_disc(real_monet, training=True)
            disc_real_photo = self.p_disc(real_photo, training=True)

            # discriminator used to check, inputing fake images
            disc_fake_monet = self.m_disc(fake_monet, training=True)
            disc_fake_photo = self.p_disc(fake_photo, training=True)

            # evaluates generator loss
            monet_gen_loss = self.gen_loss_fn(disc_fake_monet)
            photo_gen_loss = self.gen_loss_fn(disc_fake_photo)

            # evaluates total cycle consistency loss
            total_cycle_loss = self.cycle_loss_fn(real_monet, cycled_monet, self.lambda_cycle) + self.cycle_loss_fn(real_photo, cycled_photo, self.lambda_cycle)

            # evaluates total generator loss
            total_monet_gen_loss = monet_gen_loss + total_cycle_loss + self.identity_loss_fn(real_monet, same_monet, self.lambda_cycle)
            total_photo_gen_loss = photo_gen_loss + total_cycle_loss + self.identity_loss_fn(real_photo, same_photo, self.lambda_cycle)

            # evaluates discriminator loss
            monet_disc_loss = self.disc_loss_fn(disc_real_monet, disc_fake_monet)
            photo_disc_loss = self.disc_loss_fn(disc_real_photo, disc_fake_photo)

        # Calculate the gradients for generator and discriminator
        monet_generator_gradients = tape.gradient(total_monet_gen_loss,
                                                  self.m_gen.trainable_variables)
        photo_generator_gradients = tape.gradient(total_photo_gen_loss,
                                                  self.p_gen.trainable_variables)

        monet_discriminator_gradients = tape.gradient(monet_disc_loss,
                                                      self.m_disc.trainable_variables)
        photo_discriminator_gradients = tape.gradient(photo_disc_loss,
                                                      self.p_disc.trainable_variables)

        # Apply the gradients to the optimizer;Update weights
        # Run one step of gradient descent by updating the value of the variables to minimize the loss.
        self.m_gen_optimizer.apply_gradients(zip(monet_generator_gradients,
                                                 self.m_gen.trainable_variables))

        self.p_gen_optimizer.apply_gradients(zip(photo_generator_gradients,
                                                 self.p_gen.trainable_variables))

        self.m_disc_optimizer.apply_gradients(zip(monet_discriminator_gradients,
                                                  self.m_disc.trainable_variables))

        self.p_disc_optimizer.apply_gradients(zip(photo_discriminator_gradients,
                                                  self.p_disc.trainable_variables))
        
        return {
            "monet_gen_loss": total_monet_gen_loss,
            "photo_gen_loss": total_photo_gen_loss,
            "monet_disc_loss": monet_disc_loss,
            "photo_disc_loss": photo_disc_loss
        }

#Defining Loss Functions
An ideal discriminator will output all 1s for a real image and all 0s for a fake image, thus the discriminator output is being compared to matrices of 0s and 1s. The discriminator loss is average of Real loss and Fake loss

In [21]:
def discriminator_loss(real, generated):
        real_loss = keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(real), real)

        generated_loss = keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.zeros_like(generated), generated)

        total_disc_loss = real_loss + generated_loss

        return total_disc_loss * 0.5

In [22]:
def generator_loss(generated):
        return tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(tf.ones_like(generated), generated)

In [23]:
def calc_cycle_loss(real_image, cycled_image, LAMBDA):
        loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))

        return LAMBDA * loss1

In [24]:
def identity_loss(real_image, same_image, LAMBDA):
        loss = tf.reduce_mean(tf.abs(real_image - same_image))
        return LAMBDA * 0.5 * loss

In [25]:
monet_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
photo_generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

monet_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
photo_discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

In [26]:
cycle_gan_model = CycleGan(
        monet_generator, photo_generator, monet_discriminator, photo_discriminator
    )

cycle_gan_model.compile(
        m_gen_optimizer = monet_generator_optimizer,
        p_gen_optimizer = photo_generator_optimizer,
        m_disc_optimizer = monet_discriminator_optimizer,
        p_disc_optimizer = photo_discriminator_optimizer,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calc_cycle_loss,
        identity_loss_fn = identity_loss
    )

In [27]:
cycle_gan_model.fit(
    tf.data.Dataset.zip((train_dataset, photo_dataset)),
    epochs=15
)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<tensorflow.python.keras.callbacks.History at 0x7f7a61b2cb38>

#Submission File

In [28]:
!mkdir /content/images

In [29]:
i=1
for img in photo_dataset:
    prediction = monet_generator(img, training=False)[0].numpy()
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    im = PIL.Image.fromarray(prediction)
    im.save("/content/images/" + str(i) + ".jpg")
    i += 1

In [30]:
!zip -r /content/imgs.zip /content/images

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  adding: content/images/6264.jpg (deflated 1%)
  adding: content/images/522.jpg (deflated 1%)
  adding: content/images/3236.jpg (deflated 2%)
  adding: content/images/4365.jpg (deflated 1%)
  adding: content/images/1485.jpg (deflated 1%)
  adding: content/images/4312.jpg (deflated 1%)
  adding: content/images/3528.jpg (deflated 2%)
  adding: content/images/7009.jpg (deflated 1%)
  adding: content/images/1268.jpg (deflated 3%)
  adding: content/images/5883.jpg (deflated 1%)
  adding: content/images/1743.jpg (deflated 1%)
  adding: content/images/1433.jpg (deflated 1%)
  adding: content/images/5114.jpg (deflated 1%)
  adding: content/images/471.jpg (deflated 2%)
  adding: content/images/1555.jpg (deflated 1%)
  adding: content/images/3226.jpg (deflated 2%)
  adding: content/images/5126.jpg (deflated 1%)
  adding: content/images/6251.jpg (deflated 1%)
  adding: content/images/4688.jpg (deflated 1%)
  adding: content/images/