# Acknowledgement
Many lines of code are based on the 'Monet CycleGAN Tutorial by Amy Jang'. Thank you Amy, for sharing the great work.
https://www.kaggle.com/code/amyjang/monet-cyclegan-tutorial

# Dataset
* Number of monet samples: 300
* Number of photo samples: 7028
* Image size: 256 x 256 x 3
The task is style transfer. Our goal is to learn features from the works of Monet and then apply them to the photos. 

# EDA
All images are the same shape, and there are no duplicates. They are ready to be used for training.

# Model structure
Inspired by the CycleGAN tutorial on Keras, I used a U-net architecture for this project:
* Number of downsampling: 2
* Number of residual blocks: 9
* Number of upsampling: 2

As per the standard settings of CycleGAN, I trained 4 models:
1. Monet_generator
2. Photo_generator
3. Monet_discriminator
4. Photo_discriminator

A few changes that make my model different from Amy's model:
1. There are fewer downsampling and upsampling layers in my model.
2. There are 9 residual blocks in between the downward and upward structure.

Eventually, I used the monet generator to transfer photos into Monet-esque images.

# Results
The final score is 69.031, which is worse than Amy's model score of 53.769. The models encountered overfitting issues and stopped learning after a few epochs. 

# Discussions
There are techniques that might help alleviate the overfitting issues, such as adding layers and complexity to the model architecture, better weight initialization, and better data normalization, etc. 

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa

from kaggle_datasets import KaggleDatasets
import matplotlib.pyplot as plt
import numpy as np

# Use a TPU when one can be resolved; otherwise fall back to the default
# distribution strategy.
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except Exception:
    # `Exception` instead of a bare `except` so that KeyboardInterrupt and
    # SystemExit still propagate; any TPU setup failure uses the fallback.
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

# Lets tf.data choose the parallelism level for dataset transformations.
AUTOTUNE = tf.data.experimental.AUTOTUNE

print(tf.__version__)

# Load in the data

We want to keep our photo dataset and our Monet dataset separate. First, load in the filenames of the TFRecords.

In [None]:
# Base path of the dataset as reported by KaggleDatasets (presumably a Google
# Cloud Storage URI — confirm); the TFRecord glob patterns are built from it.
GCS_PATH = KaggleDatasets().get_gcs_path()

In [None]:
# Glob the TFRecord shards of each image domain under the dataset path.
MONET_FILENAMES, PHOTO_FILENAMES = (
    tf.io.gfile.glob(str(GCS_PATH + pattern))
    for pattern in ('/monet_tfrec/*.tfrec', '/photo_tfrec/*.tfrec')
)
print('Monet TFRecord Files:', len(MONET_FILENAMES))
print('Photo TFRecord Files:', len(PHOTO_FILENAMES))

In [None]:
# Spatial size every decoded image is reshaped to.
IMAGE_SIZE = [256, 256]


def decode_image(image):
    """Decode JPEG bytes into a float32 image scaled to [-1, 1].

    Args:
        image: Encoded JPEG data accepted by ``tf.image.decode_jpeg``.

    Returns:
        A float32 tensor reshaped to ``[*IMAGE_SIZE, 3]``.
    """
    height, width = IMAGE_SIZE
    pixels = tf.image.decode_jpeg(image, channels=3)
    pixels = tf.cast(pixels, tf.float32)
    # Map the 0..255 pixel range onto -1..1.
    scaled = (pixels / 127.5) - 1
    return tf.reshape(scaled, [height, width, 3])

def read_tfrecord(example):
    """Parse one serialized TFRecord example and return its decoded image.

    The record is parsed with three scalar string features
    (``image_name``, ``image``, ``target``); only ``image`` is used.

    Args:
        example: A serialized example as yielded by ``TFRecordDataset``.

    Returns:
        The tensor produced by ``decode_image`` for the ``image`` feature.
    """
    feature_spec = {
        feature_name: tf.io.FixedLenFeature([], tf.string)
        for feature_name in ("image_name", "image", "target")
    }
    parsed = tf.io.parse_single_example(example, feature_spec)
    return decode_image(parsed['image'])

def load_dataset(filenames, labeled=True, ordered=False):
    """Build a dataset of decoded images from TFRecord files.

    Args:
        filenames: TFRecord file paths to read.
        labeled: Accepted but not used by this function.
        ordered: Accepted but not used by this function.

    Returns:
        A ``tf.data`` dataset whose elements are the images returned by
        ``read_tfrecord``.
    """
    records = tf.data.TFRecordDataset(filenames)
    return records.map(read_tfrecord, num_parallel_calls=AUTOTUNE)

In [None]:
# Build both image datasets (Monet first, then photos), batched one image
# at a time.
with strategy.scope():
    monet_ds, photo_ds = (
        load_dataset(shard_files, labeled=True).batch(1)
        for shard_files in (MONET_FILENAMES, PHOTO_FILENAMES)
    )

# Model architecture

In [None]:
class ReflectionPadding2D(layers.Layer):
    """Implements Reflection Padding as a layer.

    Args:
        padding(tuple): Amount of padding for the spatial dimensions,
        given as ``(width, height)``.

    Returns:
        A padded tensor with the same type as the input tensor.
    """

    def __init__(self, padding=(1, 1), **kwargs):
        super().__init__(**kwargs)
        self.padding = tuple(padding)

    def call(self, input_tensor, mask=None):
        """Reflect-pad axes 1 and 2 of ``input_tensor``; ``mask`` is unused."""
        padding_width, padding_height = self.padding
        # Axes 0 and 3 are left untouched; this assumes a channels-last
        # (batch, height, width, channels) input.
        padding_tensor = [
            [0, 0],
            [padding_height, padding_height],
            [padding_width, padding_width],
            [0, 0],
        ]
        return tf.pad(input_tensor, padding_tensor, mode="REFLECT")

    def get_config(self):
        """Return the layer config including ``padding``.

        Without this, a layer re-created from its config would silently
        fall back to the default padding.
        """
        config = super().get_config()
        config.update({"padding": tuple(self.padding)})
        return config

In [None]:
def downsample(
    x,
    filters,
    kernel_size,
    activation=True,
    isInstanceNormalized=True):
    """Apply a stride-2 convolution block to ``x``.

    Args:
        x: Input tensor.
        filters: Number of convolution filters.
        kernel_size: Convolution kernel size.
        activation: When true, finish with a ``LeakyReLU`` layer.
        isInstanceNormalized: When true, apply instance normalization
            after the convolution.

    Returns:
        The transformed tensor.
    """
    conv = layers.Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=2,
        padding='same',
        kernel_initializer=keras.initializers.RandomNormal(0., 0.02),
        use_bias=False,
    )
    out = conv(x)

    if isInstanceNormalized:
        norm = tfa.layers.InstanceNormalization(
            gamma_initializer=keras.initializers.RandomNormal(0., 0.02),
        )
        out = norm(out)

    return layers.LeakyReLU()(out) if activation else out


def upsample(
    x,
    filters,
    kernel_size,
    activation=True,
    isInstanceNormalized=True):
    """Apply a stride-2 transposed-convolution block to ``x``.

    Args:
        x: Input tensor.
        filters: Number of transposed-convolution filters.
        kernel_size: Kernel size of the transposed convolution.
        activation: When true, finish with a ``LeakyReLU`` layer.
        isInstanceNormalized: When true, apply instance normalization
            after the transposed convolution.

    Returns:
        The transformed tensor.
    """
    deconv = layers.Conv2DTranspose(
        filters=filters,
        kernel_size=kernel_size,
        strides=2,
        padding='same',
        kernel_initializer=keras.initializers.RandomNormal(0., 0.02),
        use_bias=False,
    )
    out = deconv(x)

    if isInstanceNormalized:
        norm = tfa.layers.InstanceNormalization(
            gamma_initializer=keras.initializers.RandomNormal(0., 0.02),
        )
        out = norm(out)

    return layers.LeakyReLU()(out) if activation else out

def residual_block(
    x,
    filters,
    kernel_size,
    activation=True,
    isInstanceNormalized=True):
    """Apply a two-convolution residual block to ``x``.

    Each convolution is preceded by reflection padding and followed by
    instance normalization. The block input is added back before the final
    ``LeakyReLU``.

    Args:
        x: Input tensor; ``filters`` must match its channel count for the
            residual addition to be valid.
        filters: Number of filters of both convolutions.
        kernel_size: Kernel size, an int or a ``(height, width)`` pair. Odd
            sizes keep the spatial shape unchanged.
        activation: Accepted but not used by this function.
        isInstanceNormalized: Accepted but not used by this function.

    Returns:
        The transformed tensor.
    """
    shortcut = x

    # Accept both an int and a (height, width) pair for the kernel size.
    try:
        kernel_h, kernel_w = kernel_size
    except TypeError:
        kernel_h = kernel_w = kernel_size
    # A stride-1 'valid' convolution removes k - 1 pixels per axis, so padding
    # k // 2 on each side restores the size for odd k (k = 3 gives the former
    # fixed padding of (1, 1)). ReflectionPadding2D expects (width, height).
    padding = (kernel_w // 2, kernel_h // 2)

    kernel_initializer = keras.initializers.RandomNormal(0., 0.02)
    gamma_initializer = keras.initializers.RandomNormal(0., 0.02)

    x = ReflectionPadding2D(padding=padding)(x)
    x = layers.Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=1,  # with the padding above, the shape remains the same
        padding='valid',
        kernel_initializer=kernel_initializer,
    )(x)
    x = tfa.layers.InstanceNormalization(
        gamma_initializer=gamma_initializer,
    )(x)
    x = layers.LeakyReLU()(x)

    kernel_initializer = keras.initializers.RandomNormal(0., 0.02)
    gamma_initializer = keras.initializers.RandomNormal(0., 0.02)

    x = ReflectionPadding2D(padding=padding)(x)
    x = layers.Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=1,  # with the padding above, the shape remains the same
        padding='valid',
        kernel_initializer=kernel_initializer,
    )(x)
    x = tfa.layers.InstanceNormalization(
        gamma_initializer=gamma_initializer,
    )(x)

    x = layers.Add()([x, shortcut])  # residual connection
    x = layers.LeakyReLU()(x)

    return x
  

In [None]:
def build_resnet_generator(
    n_downsample=2,
    n_residual=9,
    n_upsample=2,
    filters=64,
    kernel_size=3,
    input_shape=(256, 256, 3),
  ):
    """Build the generator: stem conv, downsampling, residual blocks, upsampling.

    Args:
        n_downsample: Number of ``downsample`` blocks; the filter count is
            doubled before each one.
        n_residual: Number of ``residual_block`` blocks.
        n_upsample: Number of ``upsample`` blocks; the filter count is
            halved before each one.
        filters: Number of filters of the first convolution.
        kernel_size: Kernel size of every block after the 7x7 stem.
        input_shape: Shape of one input image, without the batch axis.

    Returns:
        A ``keras.Model`` whose output is a 3-channel tensor with ``tanh``
        activation.
    """
    kernel_initializer = keras.initializers.RandomNormal(0., 0.02)

    # first block
    inputs = layers.Input(shape=input_shape)
    x = inputs
    x = layers.Conv2D(
        filters=filters,
        kernel_size=7,
        kernel_initializer=kernel_initializer,
        padding='same',
    )(x)
    x = layers.LeakyReLU()(x)

    for _ in range(n_downsample):
        filters *= 2
        x = downsample(x, filters, kernel_size)

    for _ in range(n_residual):
        x = residual_block(x, filters, kernel_size)

    for _ in range(n_upsample):
        filters //= 2
        x = upsample(x, filters, kernel_size)

    # tanh keeps the output in [-1, 1], the range decode_image produces.
    last_layer = layers.Conv2D(3, kernel_size, padding='same', activation='tanh')
    outputs = last_layer(x)

    return keras.Model(inputs, outputs)

  

In [None]:
def build_discriminator(
    n_downsample=3,
    filters=32,
    kernel_size=3,
    input_shape=(256, 256, 3),
  ):
    """Build the discriminator: stacked downsampling blocks and a 1-channel conv.

    Args:
        n_downsample: Number of ``downsample`` blocks; the filter count is
            doubled before each one.
        filters: Base filter count (doubled before the first block).
        kernel_size: Kernel size of the downsampling blocks.
        input_shape: Shape of one input image, without the batch axis.

    Returns:
        A ``keras.Model`` that outputs a single-channel map of raw logits.
    """
    inputs = layers.Input(shape=input_shape)
    x = inputs
    for _ in range(n_downsample):
        filters *= 2
        x = downsample(x, filters, kernel_size)

    # last layer
    # No activation: the loss functions in this file use
    # BinaryCrossentropy(from_logits=True), which expects unbounded logits.
    # A tanh here would clamp them to [-1, 1].
    outputs = layers.Conv2D(
        1, 3, padding='same'
    )(x)

    return keras.Model(inputs, outputs)


In [None]:
with strategy.scope():

    def g_loss_fn(d_generated):
        """Adversarial loss of a generator.

        Args:
            d_generated: Discriminator output for generated images,
                interpreted as logits.

        Returns:
            The un-reduced binary cross-entropy against all-ones targets.
        """
        bce = keras.losses.BinaryCrossentropy(
            from_logits=True,
            reduction=keras.losses.Reduction.NONE,
        )
        targets = tf.ones_like(d_generated)
        return bce(targets, d_generated)

    def d_loss_fn(d_real, d_fake):
        """Loss of a discriminator.

        Args:
            d_real: Discriminator output for real images (logits).
            d_fake: Discriminator output for generated images (logits).

        Returns:
            The average of the un-reduced binary cross-entropy of ``d_real``
            against ones and of ``d_fake`` against zeros.
        """
        bce = keras.losses.BinaryCrossentropy(
            from_logits=True,
            reduction=keras.losses.Reduction.NONE,
        )
        real_term = bce(tf.ones_like(d_real), d_real)
        fake_term = bce(tf.zeros_like(d_fake), d_fake)
        return (real_term + fake_term) / 2

    def cycle_loss_fn(real, cycled):
        """Mean absolute difference between an image and its cycled version."""
        difference = real - cycled
        return tf.reduce_mean(tf.abs(difference))

    def identity_loss_fn(real, same):
        """Mean absolute difference between an image and its identity mapping."""
        difference = real - same
        return tf.reduce_mean(tf.abs(difference))

In [None]:
# Build GAN

class GAN(keras.Model):
    """CycleGAN wrapper that trains two generators and two discriminators.

    Args:
        monet_g: Generator applied to photos to produce Monet-style images.
        photo_g: Generator applied to Monet images to produce photo-style
            images.
        monet_d: Discriminator applied to real and generated Monet images.
        photo_d: Discriminator applied to real and generated photos.
        LAMBDA_CYCLE: Weight of the cycle-consistency loss.
        LAMBDA_ID: Additional factor (on top of ``LAMBDA_CYCLE``) applied to
            the identity loss.
    """

    def __init__(self,
               monet_g,
               photo_g,
               monet_d,
               photo_d,
               LAMBDA_CYCLE,
               LAMBDA_ID):
        super().__init__()
        self.monet_g = monet_g
        self.photo_g = photo_g
        self.monet_d = monet_d
        self.photo_d = photo_d
        self.LAMBDA_CYCLE = LAMBDA_CYCLE
        self.LAMBDA_ID = LAMBDA_ID

    def compile(self,
              monet_g_opt,
              photo_g_opt,
              monet_d_opt,
              photo_d_opt,
              g_loss_fn,
              d_loss_fn,
              cycle_loss_fn,
              identity_loss_fn):
        """Store one optimizer per sub-model and the four loss functions.

        Args:
            monet_g_opt: Optimizer for ``monet_g``.
            photo_g_opt: Optimizer for ``photo_g``.
            monet_d_opt: Optimizer for ``monet_d``.
            photo_d_opt: Optimizer for ``photo_d``.
            g_loss_fn: Adversarial generator loss, called with the
                discriminator output for generated images.
            d_loss_fn: Discriminator loss, called with the discriminator
                outputs for real and generated images.
            cycle_loss_fn: Loss called with a real image and its cycled
                reconstruction.
            identity_loss_fn: Loss called with a real image and the output of
                the same-domain generator.
        """
        super().compile()
        self.monet_g_opt = monet_g_opt
        self.photo_g_opt = photo_g_opt
        self.monet_d_opt = monet_d_opt
        self.photo_d_opt = photo_d_opt
        self.g_loss_fn = g_loss_fn
        self.d_loss_fn = d_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn

    def train_step(self, batch_data):
        """Run one optimization step on all four sub-models.

        Args:
            batch_data: A ``(real_monet, real_photo)`` pair of image batches.

        Returns:
            A dict with the mean total loss of each generator and the mean
            loss of each discriminator.
        """
        real_monet, real_photo = batch_data

        # Persistent, because four separate gradients are taken from the tape.
        with tf.GradientTape(persistent=True) as tape:

            # Cycle
            fake_monet = self.monet_g(real_photo, training=True)
            cycle_photo = self.photo_g(fake_monet, training=True)
            fake_photo = self.photo_g(real_monet, training=True)
            cycle_monet = self.monet_g(fake_photo, training=True)

            # Identity
            same_monet = self.monet_g(real_monet, training=True)
            same_photo = self.photo_g(real_photo, training=True)

            # Discriminator outputs (training mode, like the generator calls)
            d_real_monet = self.monet_d(real_monet, training=True)
            d_fake_monet = self.monet_d(fake_monet, training=True)
            d_real_photo = self.photo_d(real_photo, training=True)
            d_fake_photo = self.photo_d(fake_photo, training=True)

            # Discriminators' loss
            monet_d_loss = self.d_loss_fn(d_real_monet, d_fake_monet)
            photo_d_loss = self.d_loss_fn(d_real_photo, d_fake_photo)

            # Cycle loss
            monet_cycle_loss = self.cycle_loss_fn(real_monet, cycle_monet) * self.LAMBDA_CYCLE
            photo_cycle_loss = self.cycle_loss_fn(real_photo, cycle_photo) * self.LAMBDA_CYCLE

            # Identity loss
            monet_id_loss = (self.identity_loss_fn(real_monet, same_monet) *
                             self.LAMBDA_CYCLE * self.LAMBDA_ID)
            photo_id_loss = (self.identity_loss_fn(real_photo, same_photo) *
                             self.LAMBDA_CYCLE * self.LAMBDA_ID)

            # Adversarial loss
            monet_g_loss = self.g_loss_fn(d_fake_monet)
            photo_g_loss = self.g_loss_fn(d_fake_photo)

            # Generators' loss
            monet_g_total_loss = monet_cycle_loss + monet_id_loss + monet_g_loss
            photo_g_total_loss = photo_cycle_loss + photo_id_loss + photo_g_loss

        # Get the gradients. The sub-models are taken from `self`, not from
        # module-level globals, so the class does not depend on the
        # surrounding notebook's variable names.
        monet_g_vars = self.monet_g.trainable_variables
        photo_g_vars = self.photo_g.trainable_variables
        monet_d_vars = self.monet_d.trainable_variables
        photo_d_vars = self.photo_d.trainable_variables

        monet_g_gradients = tape.gradient(monet_g_total_loss, monet_g_vars)
        photo_g_gradients = tape.gradient(photo_g_total_loss, photo_g_vars)
        monet_d_gradients = tape.gradient(monet_d_loss, monet_d_vars)
        photo_d_gradients = tape.gradient(photo_d_loss, photo_d_vars)

        # Apply the gradients
        self.monet_g_opt.apply_gradients(zip(monet_g_gradients, monet_g_vars))
        self.photo_g_opt.apply_gradients(zip(photo_g_gradients, photo_g_vars))
        self.monet_d_opt.apply_gradients(zip(monet_d_gradients, monet_d_vars))
        self.photo_d_opt.apply_gradients(zip(photo_d_gradients, photo_d_vars))

        # Report scalar means so that logs and metric-monitoring callbacks
        # receive a single number per loss. The gradients above still use
        # the unreduced losses.
        return {
            'monet_g_loss': tf.reduce_mean(monet_g_total_loss),
            'photo_g_loss': tf.reduce_mean(photo_g_total_loss),
            'monet_d_loss': tf.reduce_mean(monet_d_loss),
            'photo_d_loss': tf.reduce_mean(photo_d_loss),
        }





In [None]:
with strategy.scope():

    # Prepare models: two generators first, then two discriminators.
    monet_g, photo_g = (build_resnet_generator() for _ in range(2))
    monet_d, photo_d = (build_discriminator() for _ in range(2))

    # Prepare optimizers: one independent Adam instance per model, all with
    # the same hyper-parameters.
    monet_g_opt, photo_g_opt, monet_d_opt, photo_d_opt = (
        keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5)
        for _ in range(4)
    )


In [None]:
with strategy.scope():

    # Prepare GAN: wrap the four sub-models and set the loss weights.
    gan = GAN(
        monet_g=monet_g,
        photo_g=photo_g,
        monet_d=monet_d,
        photo_d=photo_d,
        LAMBDA_CYCLE=10,
        LAMBDA_ID=0.5,
    )

    # Attach one optimizer per sub-model and the four loss functions.
    gan.compile(
        monet_g_opt=monet_g_opt,
        photo_g_opt=photo_g_opt,
        monet_d_opt=monet_d_opt,
        photo_d_opt=photo_d_opt,
        g_loss_fn=g_loss_fn,
        d_loss_fn=d_loss_fn,
        cycle_loss_fn=cycle_loss_fn,
        identity_loss_fn=identity_loss_fn,
    )

# Training

In [None]:
# Checkpointer
checkpoint_path = '/kaggle/working/model_weights.h5'
# The GAN is a subclassed keras.Model, so only its weights are written:
# whole-model HDF5 saving is limited to Functional/Sequential models.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='monet_g_loss',
    save_best_only=True,
    save_weights_only=True,
)

# Stop once the monitored loss has not improved for 3 consecutive epochs.
earlystopping = tf.keras.callbacks.EarlyStopping(monitor='monet_g_loss', patience=3)

In [None]:
# Training
# Number of passes over the paired dataset.
epochs = 25
# Pair each Monet batch with a photo batch; every element is a
# (monet, photo) tuple, the order GAN.train_step unpacks.
dataset = tf.data.Dataset.zip((monet_ds, photo_ds))
print('# epochs: ', epochs)
# Callbacks stay disabled, as in the original run; to enable them pass
# callbacks=[checkpoint_callback, earlystopping].
gan.fit(dataset, epochs=epochs)

# Visualize our Monet-esque photos

In [None]:
# One row per sample: input photo on the left, generated image on the right.
_, ax = plt.subplots(5, 2, figsize=(12, 12))
for (photo_ax, monet_ax), img in zip(ax, photo_ds.take(5)):
    prediction = monet_g(img)[0].numpy()
    # Map the [-1, 1] range back to displayable 0..255 pixels.
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    photo = (img[0] * 127.5 + 127.5).numpy().astype(np.uint8)

    panels = (
        (photo_ax, photo, "Input Photo"),
        (monet_ax, prediction, "Monet-esque"),
    )
    for axis, picture, title in panels:
        axis.imshow(picture)
        axis.set_title(title)
        axis.axis("off")
plt.show()

# Create submission file

In [None]:
import PIL
! mkdir ../images

In [None]:
# Translate every photo and save it as a numbered JPEG (1.jpg, 2.jpg, ...).
for index, photo in enumerate(photo_ds, start=1):
    monet_like = monet_g(photo, training=False)[0].numpy()
    # Map the [-1, 1] generator output back to 0..255 pixels.
    pixels = (monet_like * 127.5 + 127.5).astype(np.uint8)
    PIL.Image.fromarray(pixels).save("../images/" + str(index) + ".jpg")

In [None]:
import shutil

# Zip the generated images into /kaggle/working/images.zip.
shutil.make_archive(
    base_name="/kaggle/working/images",
    format='zip',
    root_dir="/kaggle/images",
)