# Overview
The purpose of this notebook is to apply a Cycle GAN to translate photos into the style of Monet paintings. This task does not have paired images (the same scene as both a photo and a Monet painting), so there is no target image to compare each output against directly; the discriminator component of a GAN supplies the training signal instead. The generator will create images in the style of Monet paintings, and the discriminator will evaluate how well those generated paintings pass for true Monet paintings.
To ensure that the generated paintings resemble not only Monet paintings but also the original photograph, the model is also evaluated on how well the generated paintings can be translated back into photographs. This is the 'cycle' in Cycle GAN: a photograph is translated to a Monet painting and then translated back to a photograph. The loss is calculated for both directions of the cycle (photo to Monet to photo, and Monet to photo to Monet).

### Acknowledgements
This notebook is based on Amy Jang's tutorial notebook found here: https://www.kaggle.com/code/amyjang/monet-cyclegan-tutorial.

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

from kaggle_datasets import KaggleDatasets
import matplotlib.pyplot as plt
import numpy as np

try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
except Exception:
    # No TPU available: fall back to the default (CPU/GPU) strategy
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

AUTOTUNE = tf.data.AUTOTUNE
    
print(tf.__version__)

Number of replicas: 1
2.6.4


# Load data

The dataset consists of 300 Monet paintings and 7028 photographs. All images have a consistent size of 256x256. The images are available in JPEG or TFRecord formats. This project will use the TFRecord format.

In [2]:
GCS_PATH = KaggleDatasets().get_gcs_path()

In [3]:
MONET_FILENAMES = tf.io.gfile.glob(str(GCS_PATH + '/monet_tfrec/*.tfrec'))
print('Monet TFRecord Files:', len(MONET_FILENAMES))

PHOTO_FILENAMES = tf.io.gfile.glob(str(GCS_PATH + '/photo_tfrec/*.tfrec'))
print('Photo TFRecord Files:', len(PHOTO_FILENAMES))

Monet TFRecord Files: 5
Photo TFRecord Files: 20


# Process Images
The images are a consistent size (256 x 256). The `image_size` variable is defined outside the functions so that it can be changed if the model is later used with photographs of a different size. The functions below decode each JPEG with 3 channels (RGB) and rescale the pixel values from [0, 255] to [-1, 1].
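The rescaling described above, and its inverse used later when displaying and saving images, can be checked with a small sketch in plain Python. The helper names `to_model_range` and `to_pixel` are hypothetical and are not part of the notebook; the sketch rounds to the nearest integer, whereas the notebook casts directly to `uint8`.

```python
def to_model_range(pixel):
    """Map a pixel value in [0, 255] to the model's input range [-1, 1]."""
    return pixel / 127.5 - 1.0


def to_pixel(value):
    """Invert to_model_range: map a value in [-1, 1] back to [0, 255]."""
    return int(round(value * 127.5 + 127.5))


# Black, mid-grey and white pixels after rescaling
scaled = [to_model_range(p) for p in (0, 127.5, 255)]

# Every 8-bit pixel value should survive the round trip unchanged
round_trip_ok = all(to_pixel(to_model_range(p)) == p for p in range(256))
```

The factor 127.5 is half of 255, so dividing by it and subtracting 1 centres the range on zero, which matches the `tanh` output range of the generator's last layer.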


In [4]:
image_size = [256, 256]

def decode_image(image):
    image = tf.image.decode_jpeg(image, channels = 3)
    image = (tf.cast(image, tf.float32) / 127.5) - 1
    image = tf.reshape(image, [*image_size, 3])
    return image

def read_tfrecord(example):
    tfrecord_format = {
        "image_name": tf.io.FixedLenFeature([], tf.string),
        "image": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.string)
    }
    example = tf.io.parse_single_example(example, tfrecord_format)
    image = decode_image(example['image'])
    return image

# Extract images from datasets

In [5]:
def load_dataset(filenames, labeled = True, ordered = False):
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.map(read_tfrecord, num_parallel_calls = AUTOTUNE)
    return dataset

In [6]:
monet_ds = load_dataset(MONET_FILENAMES, labeled = True).batch(1)
photo_ds = load_dataset(PHOTO_FILENAMES, labeled = True).batch(1)

In [None]:
# Sample images from each dataset
example_monet = next(iter(monet_ds))
example_photo = next(iter(photo_ds))

plt.subplot(121)
plt.title('Photo')
plt.imshow(example_photo[0] * 0.5 + 0.5)

plt.subplot(122)
plt.title('Monet')
plt.imshow(example_monet[0] * 0.5 + 0.5)

# Build the Architectures

## Generator

The generator will be a U-Net model. This model has two parts: a contracting path and an expansive path.
- Contracting path (downsample function): extracts features using a convolution layer, an instance normalization layer, and a LeakyReLU activation function.
- Expansive path (upsample function): translates features into the learned Monet style using a transposed convolution layer, an instance normalization layer, and a LeakyReLU activation function.

I performed informal comparisons of activation functions and chose to use LeakyReLU for both paths.
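To make the two non-convolutional steps in each block concrete, here is a minimal sketch in plain Python of what instance normalization and LeakyReLU do to a single feature map. It is an illustration only: the `eps` value is an assumption, the learnable scale and offset of the real layer are omitted, and `alpha=0.3` is, to my knowledge, the default slope of `layers.LeakyReLU` in the Keras version used here.

```python
import math


def instance_norm(feature_map, eps=1e-3):
    """Normalize one H x W feature map to zero mean and roughly unit variance.

    eps is an assumed stabilizer; the learnable gamma/beta are left out.
    """
    values = [v for row in feature_map for v in row]
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    scale = 1.0 / math.sqrt(variance + eps)
    return [[(v - mean) * scale for v in row] for row in feature_map]


def leaky_relu(x, alpha=0.3):
    """Pass positive values through; scale negative values by alpha."""
    return x if x >= 0 else alpha * x


feature_map = [[1.0, 2.0], [3.0, 6.0]]
normalized = instance_norm(feature_map)
activated = [[leaky_relu(v) for v in row] for row in normalized]
```

Unlike batch normalization, the statistics here come from one image and one channel at a time, so the result does not depend on the other images in the batch. That matters in this notebook because the batch size is 1.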

In [None]:
OUTPUT_CHANNELS = 3

def downsample(filters, size):
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean = 0.0, stddev = 0.02)

    model = keras.Sequential()
    model.add(layers.Conv2D(filters,
                            size,
                            strides = 2,
                            padding = 'same',
                            kernel_initializer = initializer,
                            use_bias = False))
    model.add(tfa.layers.InstanceNormalization(gamma_initializer = gamma_init))
    model.add(layers.LeakyReLU())

    return model

def upsample(filters, size):
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean = 0.0, stddev = 0.02)

    model = keras.Sequential()
    model.add(layers.Conv2DTranspose(filters,
                                     size,
                                     strides = 2,
                                     padding = 'same',
                                     kernel_initializer = initializer,
                                     use_bias = False))
    model.add(tfa.layers.InstanceNormalization(gamma_initializer = gamma_init))
    model.add(layers.LeakyReLU())

    return model

## Generator architecture
The up and down stacks mirror each other such that each feature reduction in the down stack has a matching expansion in the up stack.

In [None]:
# Use the downsample and upsample functions to build the generator
def Generator():
    inputs = layers.Input(shape=[256, 256, 3])

    down_stack = [
        downsample(64, 4), # (128, 128, 64)
        downsample(128, 4), # (64, 64, 128)
        downsample(256, 4), # (32, 32, 256)
        downsample(512, 4), # (16, 16, 512)
        downsample(512, 4), # (8, 8, 512)
        downsample(512, 4), # (4, 4, 512)
        downsample(512, 4), # (2, 2, 512)
        downsample(512, 4), # (1, 1, 512)
    ]

    up_stack = [
        upsample(512, 4), # (2, 2, 1024)
        upsample(512, 4), # (4, 4, 1024)
        upsample(512, 4), # (8, 8, 1024)
        upsample(512, 4), # (16, 16, 1024)
        upsample(256, 4), # (32, 32, 512)
        upsample(128, 4), # (64, 64, 256)
        upsample(64, 4), # (128, 128, 128)
    ]

    initializer = tf.random_normal_initializer(0., 0.02)
    last = layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                  strides = 2,
                                  padding = 'same',
                                  kernel_initializer = initializer,
                                  activation = 'tanh') # (256, 256, 3)

    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])

    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])

    x = last(x)

    return keras.Model(inputs = inputs, outputs = x)
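The shape comments in the generator above can be reproduced with simple arithmetic. The sketch below assumes only what the code shows: each stride-2 layer with `padding = 'same'` halves (or doubles) the spatial size, and each skip connection concatenates channels. The function name `trace_generator_shapes` is hypothetical.

```python
import math

DOWN_FILTERS = [64, 128, 256, 512, 512, 512, 512, 512]
UP_FILTERS = [512, 512, 512, 512, 256, 128, 64]


def trace_generator_shapes(size=256):
    """Return (height, width, channels) after each down block and each concat."""
    down_shapes = []
    for filters in DOWN_FILTERS:
        size = math.ceil(size / 2)  # stride 2 with 'same' padding
        down_shapes.append((size, size, filters))

    # The bottleneck output is not used as a skip; the rest are reversed
    skips = list(reversed(down_shapes[:-1]))

    up_shapes = []
    for filters, (skip_size, _, skip_channels) in zip(UP_FILTERS, skips):
        size *= 2  # transposed convolution, stride 2 with 'same' padding
        if size != skip_size:
            raise ValueError("skip connection and upsampled map differ in size")
        up_shapes.append((size, size, filters + skip_channels))
    return down_shapes, up_shapes


down_shapes, up_shapes = trace_generator_shapes()
```

The concatenation is why the channel counts in the up stack comments are double the filter counts passed to `upsample`. The final transposed convolution then maps the (128, 128, 128) tensor to a (256, 256, 3) image.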

## Discriminator

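The discriminator downsamples the image three times, then applies a convolutional layer, an instance normalization layer, a LeakyReLU activation function, and a final convolutional layer. Rather than a single real/fake score, it outputs a 30 x 30 grid of values, each of which scores one region of the input image.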

In [None]:
def Discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)
    gamma_init = keras.initializers.RandomNormal(mean = 0.0, stddev = 0.02)

    inp = layers.Input(shape = [256, 256, 3], name = 'input_image')

    x = inp

    down1 = downsample(64, 4)(x) # (bs, 128, 128, 64)
    down2 = downsample(128, 4)(down1) # (bs, 64, 64, 128)
    down3 = downsample(256, 4)(down2) # (bs, 32, 32, 256)

    zero_pad1 = layers.ZeroPadding2D()(down3) # (bs, 34, 34, 256)
    conv = layers.Conv2D(512, 4, strides = 1,
                         kernel_initializer = initializer,
                         use_bias = False)(zero_pad1) # (bs, 31, 31, 512)

    norm1 = tfa.layers.InstanceNormalization(gamma_initializer = gamma_init)(conv)

    leaky_relu = layers.LeakyReLU()(norm1)

    zero_pad2 = layers.ZeroPadding2D()(leaky_relu) # (bs, 33, 33, 512)

    last = layers.Conv2D(1, 4, strides = 1,
                         kernel_initializer = initializer)(zero_pad2) # (bs, 30, 30, 1)

    return tf.keras.Model(inputs = inp, outputs = last)
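The spatial sizes in the discriminator comments follow from two rules: `ZeroPadding2D()` adds one pixel on every side by default, and a convolution without `padding = 'same'` shrinks the input by `kernel - 1` when the stride is 1. A short sketch of that arithmetic, with hypothetical helper names:

```python
def same_stride2(n):
    """Output size of a stride-2 convolution with 'same' padding."""
    return -(-n // 2)  # ceiling division


def zero_pad(n, pad=1):
    """Output size after padding `pad` pixels on each side."""
    return n + 2 * pad


def valid_conv(n, kernel, stride=1):
    """Output size of a convolution with no padding."""
    return (n - kernel) // stride + 1


sizes = [256]
for _ in range(3):                           # three downsample blocks
    sizes.append(same_stride2(sizes[-1]))
sizes.append(zero_pad(sizes[-1]))            # zero_pad1
sizes.append(valid_conv(sizes[-1], 4))       # conv
sizes.append(zero_pad(sizes[-1]))            # zero_pad2
sizes.append(valid_conv(sizes[-1], 4))       # last
```

The final entry is 30, which matches the `(bs, 30, 30, 1)` comment on the last layer.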

In [None]:
with strategy.scope():
    monet_generator = Generator() # generates Monet-style paintings
    photo_generator = Generator() # generates photographs

    monet_discriminator = Discriminator() # discriminates between true Monets and false Monets
    photo_discriminator = Discriminator() # discriminates between true photographs and false photographs

## Cycle GAN Model

The class below creates a keras model from the generators and discriminators. This allows the use of keras model methods to train the model.

In [None]:
class CycleGan(keras.Model):
    def __init__(
        self,
        monet_generator,
        photo_generator,
        monet_discriminator,
        photo_discriminator,
        lambda_cycle=10,
    ):
        super(CycleGan, self).__init__()
        self.m_gen = monet_generator
        self.p_gen = photo_generator
        self.m_disc = monet_discriminator
        self.p_disc = photo_discriminator
        self.lambda_cycle = lambda_cycle
        
    def compile(
        self,
        m_gen_optimizer,
        p_gen_optimizer,
        m_disc_optimizer,
        p_disc_optimizer,
        gen_loss_fn,
        disc_loss_fn,
        cycle_loss_fn,
        identity_loss_fn
    ):
        super(CycleGan, self).compile()
        self.m_gen_optimizer = m_gen_optimizer
        self.p_gen_optimizer = p_gen_optimizer
        self.m_disc_optimizer = m_disc_optimizer
        self.p_disc_optimizer = p_disc_optimizer
        self.gen_loss_fn = gen_loss_fn
        self.disc_loss_fn = disc_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn
        
    def train_step(self, batch_data):
        real_monet, real_photo = batch_data
        
        with tf.GradientTape(persistent=True) as tape:
            # Photo -> Monet -> Photo
            fake_monet = self.m_gen(real_photo, training=True)
            cycled_photo = self.p_gen(fake_monet, training=True)

            # Monet -> Photo -> Monet
            fake_photo = self.p_gen(real_monet, training=True)
            cycled_monet = self.m_gen(fake_photo, training=True)

            # Monet -> Monet
            same_monet = self.m_gen(real_monet, training=True)
            
            # Photo -> Photo
            same_photo = self.p_gen(real_photo, training=True)

            # Pass true Monet to Monet discriminator
            disc_real_monet = self.m_disc(real_monet, training=True)
            
            # Pass true photo to photo discriminator
            disc_real_photo = self.p_disc(real_photo, training=True)

            # Pass fake Monet to Monet discriminator
            disc_fake_monet = self.m_disc(fake_monet, training=True)
            
            # Pass fake photo to photo discriminator
            disc_fake_photo = self.p_disc(fake_photo, training=True)

            # Generator losses
            monet_gen_loss = self.gen_loss_fn(disc_fake_monet)
            photo_gen_loss = self.gen_loss_fn(disc_fake_photo)

            # Consistency losses for Monet and photo
            total_cycle_loss = self.cycle_loss_fn(real_monet, cycled_monet, self.lambda_cycle) + self.cycle_loss_fn(real_photo, cycled_photo, self.lambda_cycle)

            # Total generator losses
            total_monet_gen_loss = monet_gen_loss + total_cycle_loss + self.identity_loss_fn(real_monet, same_monet, self.lambda_cycle)
            total_photo_gen_loss = photo_gen_loss + total_cycle_loss + self.identity_loss_fn(real_photo, same_photo, self.lambda_cycle)

            # Discriminator losses
            monet_disc_loss = self.disc_loss_fn(disc_real_monet, disc_fake_monet)
            photo_disc_loss = self.disc_loss_fn(disc_real_photo, disc_fake_photo)

        # Calculate the gradients for generator and discriminator
        monet_generator_gradients = tape.gradient(total_monet_gen_loss,
                                                  self.m_gen.trainable_variables)
        photo_generator_gradients = tape.gradient(total_photo_gen_loss,
                                                  self.p_gen.trainable_variables)

        monet_discriminator_gradients = tape.gradient(monet_disc_loss,
                                                      self.m_disc.trainable_variables)
        photo_discriminator_gradients = tape.gradient(photo_disc_loss,
                                                      self.p_disc.trainable_variables)

        # Apply each set of gradients with the matching optimizer
        self.m_gen_optimizer.apply_gradients(zip(monet_generator_gradients,
                                                 self.m_gen.trainable_variables))

        self.p_gen_optimizer.apply_gradients(zip(photo_generator_gradients,
                                                 self.p_gen.trainable_variables))

        self.m_disc_optimizer.apply_gradients(zip(monet_discriminator_gradients,
                                                  self.m_disc.trainable_variables))

        self.p_disc_optimizer.apply_gradients(zip(photo_discriminator_gradients,
                                                  self.p_disc.trainable_variables))
        
        return {
            "monet_gen_loss": total_monet_gen_loss,
            "photo_gen_loss": total_photo_gen_loss,
            "monet_disc_loss": monet_disc_loss,
            "photo_disc_loss": photo_disc_loss
        }

## Loss Functions

A perfect generator will produce images that the discriminator considers real. The discriminator is trained to output 1s for real images and 0s for generated images, so the generator loss compares the discriminator's output on generated images with 1s; it is lowest when the discriminator mistakes every generated image for a real one. The consistency loss measures the difference between an original image and the same image after a full cycle (for example, a photo translated to Monet style and back). In a perfect system, the cycled photo would exactly match the original photo. The identity loss measures the difference between a photo and the output of the photo generator when it is given that photo. The generator should leave the photo unchanged because it is already a photo; the same applies to Monet paintings passed through the Monet generator.
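As a rough numeric illustration of the adversarial losses, the sketch below evaluates binary cross-entropy on a single discriminator logit in plain Python. The `toy_` function names are hypothetical and only mirror the structure of the TensorFlow versions in the next cell; they operate on one number rather than a 30 x 30 grid.

```python
import math


def bce_with_logits(label, logit):
    """Binary cross-entropy for one raw (pre-sigmoid) score, in a stable form."""
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))


def toy_generator_loss(fake_logit):
    """The generator wants generated images to be scored as real (label 1)."""
    return bce_with_logits(1.0, fake_logit)


def toy_discriminator_loss(real_logit, fake_logit):
    """The discriminator wants real -> 1 and generated -> 0, averaged."""
    real_loss = bce_with_logits(1.0, real_logit)
    fake_loss = bce_with_logits(0.0, fake_logit)
    return 0.5 * (real_loss + fake_loss)


undecided = toy_generator_loss(0.0)        # discriminator gives 50/50
fooled = toy_generator_loss(4.0)           # discriminator says "real"
caught = toy_generator_loss(-4.0)          # discriminator says "fake"
good_disc = toy_discriminator_loss(4.0, -4.0)
bad_disc = toy_discriminator_loss(-4.0, 4.0)
```

A logit of 0 corresponds to a probability of 0.5, which gives a loss of ln 2 (about 0.693). The generator loss falls as the discriminator's score for a generated image rises, and the discriminator loss falls as it separates real from generated images.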


In [None]:
# Discriminator loss
with strategy.scope():
    def discriminator_loss(real, generated):
        real_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction = tf.keras.losses.Reduction.NONE)(tf.ones_like(real), real)

        generated_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction = tf.keras.losses.Reduction.NONE)(tf.zeros_like(generated), generated)

        total_disc_loss = real_loss + generated_loss

        return total_disc_loss * 0.5
    
# Generator loss
with strategy.scope():
    def generator_loss(generated):
        return tf.keras.losses.BinaryCrossentropy(from_logits = True, reduction = tf.keras.losses.Reduction.NONE)(tf.ones_like(generated), generated)
    
# Consistency loss
with strategy.scope():
    def calc_cycle_loss(real_image, cycled_image, LAMBDA):
        loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))

        return LAMBDA * loss1
    
# Identity loss
with strategy.scope():
    def identity_loss(real_image, same_image, LAMBDA):
        loss = tf.reduce_mean(tf.abs(real_image - same_image))
        return LAMBDA * 0.5 * loss
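The weighting in the consistency and identity losses can be seen with a few made-up pixel values. This sketch uses plain Python lists in place of image tensors; the `toy_` names and the sample values are illustrative assumptions, and `lam = 10` mirrors the `lambda_cycle` default in the `CycleGan` class.

```python
def mean_abs_error(a, b):
    """Mean absolute difference between two equal-length sequences."""
    return sum(abs(x - y) for x, y in zip(a, b)) / len(a)


def toy_cycle_loss(real, cycled, lam):
    """L1 distance between an image and its round-trip version, times lam."""
    return lam * mean_abs_error(real, cycled)


def toy_identity_loss(real, same, lam):
    """Same L1 distance, weighted at half the cycle weight."""
    return lam * 0.5 * mean_abs_error(real, same)


real = [0.0, 0.5, -0.5, 1.0]       # four pixels in the [-1, 1] range
changed = [0.1, 0.4, -0.5, 0.8]    # the same pixels after a generator pass

cycle = toy_cycle_loss(real, changed, lam=10)
identity = toy_identity_loss(real, changed, lam=10)
perfect = toy_cycle_loss(real, real, lam=10)
```

With a mean absolute error of 0.1, the cycle term contributes about 1.0 and the identity term about 0.5 to the generator loss, so for the same pixel error the cycle term counts twice as much as the identity term.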

# Training

The model will first be trained using the Adam optimizer with a learning rate of 2e-4. Next, I will halve the learning rate to 1e-4 and then double it to 4e-4, and evaluate the effect of each change on performance.

In [None]:
with strategy.scope():
    monet_generator_optimizer = tf.keras.optimizers.Adam(learning_rate = 2e-4, beta_1 = 0.5)
    photo_generator_optimizer = tf.keras.optimizers.Adam(learning_rate = 2e-4, beta_1 = 0.5)

    monet_discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate = 2e-4, beta_1 = 0.5)
    photo_discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate = 2e-4, beta_1 = 0.5)

In [None]:
with strategy.scope():
    cycle_gan_model = CycleGan(
        monet_generator, photo_generator, monet_discriminator, photo_discriminator
    )

    cycle_gan_model.compile(
        m_gen_optimizer = monet_generator_optimizer,
        p_gen_optimizer = photo_generator_optimizer,
        m_disc_optimizer = monet_discriminator_optimizer,
        p_disc_optimizer = photo_discriminator_optimizer,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calc_cycle_loss,
        identity_loss_fn = identity_loss
    )

In [None]:
mod1_history = cycle_gan_model.fit(
    tf.data.Dataset.zip((monet_ds, photo_ds)),
    epochs = 25
) 
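One detail of the `fit` call above is worth spelling out: `tf.data.Dataset.zip` stops as soon as the shorter dataset is exhausted, in the same way as Python's built-in `zip`. The analogy below uses plain ranges as stand-ins for the two datasets, with the image counts quoted in the Load data section.

```python
monet_ids = range(300)    # stand-in for the 300 Monet paintings
photo_ids = range(7028)   # stand-in for the 7028 photographs

# Like tf.data.Dataset.zip, the built-in zip ends with the shorter input
pairs = list(zip(monet_ids, photo_ids))
unused_photos = len(photo_ids) - len(pairs)
```

So each epoch consists of 300 training steps. Because the datasets are not shuffled as written, the same 300 photographs are likely to be paired with the Monet paintings in every epoch, and the remaining photographs are not used for training.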

In [None]:
# Decrease learning rate
with strategy.scope():
    monet_generator_optimizer2 = tf.keras.optimizers.Adam(learning_rate = 1e-4, beta_1 = 0.5)
    photo_generator_optimizer2 = tf.keras.optimizers.Adam(learning_rate = 1e-4, beta_1 = 0.5)

    monet_discriminator_optimizer2 = tf.keras.optimizers.Adam(learning_rate = 1e-4, beta_1 = 0.5)
    photo_discriminator_optimizer2 = tf.keras.optimizers.Adam(learning_rate = 1e-4, beta_1 = 0.5)

In [None]:
with strategy.scope():
    cycle_gan_model2 = CycleGan(
        monet_generator, photo_generator, monet_discriminator, photo_discriminator
    )

    cycle_gan_model2.compile(
        m_gen_optimizer = monet_generator_optimizer2,
        p_gen_optimizer = photo_generator_optimizer2,
        m_disc_optimizer = monet_discriminator_optimizer2,
        p_disc_optimizer = photo_discriminator_optimizer2,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calc_cycle_loss,
        identity_loss_fn = identity_loss
    )

In [None]:
mod2_history = cycle_gan_model2.fit(
    tf.data.Dataset.zip((monet_ds, photo_ds)),
    epochs = 25
) 

In [None]:
# Increase learning rate
with strategy.scope():
    monet_generator_optimizer3 = tf.keras.optimizers.Adam(learning_rate = 4e-4, beta_1 = 0.5)
    photo_generator_optimizer3 = tf.keras.optimizers.Adam(learning_rate = 4e-4, beta_1 = 0.5)

    monet_discriminator_optimizer3 = tf.keras.optimizers.Adam(learning_rate = 4e-4, beta_1 = 0.5)
    photo_discriminator_optimizer3 = tf.keras.optimizers.Adam(learning_rate = 4e-4, beta_1 = 0.5)

In [None]:
with strategy.scope():
    cycle_gan_model3 = CycleGan(
        monet_generator, photo_generator, monet_discriminator, photo_discriminator
    )

    cycle_gan_model3.compile(
        m_gen_optimizer = monet_generator_optimizer3,
        p_gen_optimizer = photo_generator_optimizer3,
        m_disc_optimizer = monet_discriminator_optimizer3,
        p_disc_optimizer = photo_discriminator_optimizer3,
        gen_loss_fn = generator_loss,
        disc_loss_fn = discriminator_loss,
        cycle_loss_fn = calc_cycle_loss,
        identity_loss_fn = identity_loss
    )

In [None]:
mod3_history = cycle_gan_model3.fit(
    tf.data.Dataset.zip((monet_ds, photo_ds)),
    epochs = 25
) 
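A caveat on the comparison: as written, `cycle_gan_model`, `cycle_gan_model2`, and `cycle_gan_model3` all wrap the same four network objects. Unless the earlier cells are re-run between experiments, each run therefore continues from the weights left by the previous one. One possible way to keep the runs independent is to rebuild the networks for every learning rate. The sketch below shows only the control flow; `build_model` and `train` are hypothetical callables, and the toy stand-ins exist so that it runs without TensorFlow.

```python
def run_learning_rate_sweep(learning_rates, build_model, train):
    """Train one freshly built model per learning rate and collect the results."""
    results = {}
    for learning_rate in learning_rates:
        model = build_model(learning_rate)  # new networks and optimizers each time
        results[learning_rate] = train(model)
    return results


# Toy stand-ins (hypothetical) that record how long each model was trained
def build_toy_model(learning_rate):
    return {"learning_rate": learning_rate, "epochs_trained": 0}


def train_toy_model(model, epochs=25):
    model["epochs_trained"] += epochs
    return model["epochs_trained"]


sweep = run_learning_rate_sweep([2e-4, 1e-4, 4e-4], build_toy_model, train_toy_model)
```

In the notebook, `build_model` would call `Generator()` and `Discriminator()` again inside `strategy.scope()`, create new Adam optimizers with the given rate, and compile a new `CycleGan`. Each toy model ends at 25 epochs here; a shared model would instead accumulate 25, 50, and 75.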

# Results

For the model with a learning rate of 2e-4, both generator losses decrease across the 25 epochs. The discriminator losses increase in early epochs and then decrease.  
For the model with a learning rate of 1e-4, the initial generator losses are higher than those of the previous model, but they decrease rapidly. The discriminator losses are similar to those of the previous model.  
For the model with a learning rate of 4e-4, the generator losses in the table below are the lowest of the three models, while the discriminator losses remain close to those of the other two.  
The table below summarizes the losses for each of the models.



| Learning Rate    | Monet Generator | Photo Generator | Monet Discriminator | Photo Discriminator |
| ---------------- | --------------- | --------------- | ------------------- | ------------------- |
| 2e-4  | 5.262 | 5.382 | 0.567 | 0.543 |
| 1e-4 | 5.591 | 5.657 | 0.570 | 0.558 |
| 4e-4 | 4.899 | 4.997 | 0.595 | 0.552 |

## Sample Monet-style Photos

In [None]:
_, ax = plt.subplots(5, 2, figsize=(12, 12))
for i, img in enumerate(photo_ds.take(5)):
    prediction = monet_generator(img, training=False)[0].numpy()
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    img = (img[0] * 127.5 + 127.5).numpy().astype(np.uint8)

    ax[i, 0].imshow(img)
    ax[i, 1].imshow(prediction)
    ax[i, 0].set_title("Input Photo")
    ax[i, 1].set_title("Monet-style")
    ax[i, 0].axis("off")
    ax[i, 1].axis("off")
plt.show()

# Create submission file

In [None]:
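import PIL.Image  # import the submodule explicitly; `import PIL` alone does not guarantee PIL.Image is loaded
! mkdir -p ../images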

In [None]:
# Number the output files starting from 1
for i, img in enumerate(photo_ds, start=1):
    prediction = monet_generator(img, training=False)[0].numpy()
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    im = PIL.Image.fromarray(prediction)
    im.save("../images/" + str(i) + ".jpg")

In [None]:
import shutil
shutil.make_archive("/kaggle/working/images", 'zip', "/kaggle/images")

# Conclusion
For this project, I used a Cycle GAN to translate photographs into Monet-style paintings. The Cycle GAN was constructed from U-Net-based generators, which contract and expand the images fed to them, and CNN-based discriminators. Based on informal experimentation, I elected to use LeakyReLU activation functions for both the downsampling and upsampling functions. I compared 3 learning rates for training the model and selected the largest, 4e-4, which resulted in the lowest generator losses; the discriminator losses were similar across all three learning rates. As seen in the sample photos, the model is able to produce Monet-style paintings when given photographs.  
Further experimentation with generator and discriminator architectures may yield better performance.

# References
- https://www.kaggle.com/code/amyjang/monet-cyclegan-tutorial
- https://medium.com/@william.op.cable/cycle-gans-and-monet-style-transfer-28c0cc1dede6
