# I’m Something of a Painter Myself

In this project I explore using generative AI to convert photos into Monet-style paintings. I chose to experiment with the CycleGAN architecture after seeing its performance on the leaderboards.

In [None]:
import tensorflow as tf
import keras
from keras import Sequential

from tensorflow import keras
from tensorflow.keras import layers, Sequential, regularizers
import tensorflow_addons as tfa

from kaggle_datasets import KaggleDatasets
import matplotlib.pyplot as plt
import numpy as np
import PIL
import os

# Locate the competition data bucket and list the TFRecord shards for each domain.
path = KaggleDatasets().get_gcs_path()
tfrec_root = str(path)

monet_files = tf.io.gfile.glob(tfrec_root + "/monet_tfrec/*.tfrec")
photo_files = tf.io.gfile.glob(tfrec_root + "/photo_tfrec/*.tfrec")

# Use whatever distribution strategy is currently active and let tf.data
# choose its own parallelism.
strategy = tf.distribute.get_strategy()
AUTOTUNE = tf.data.experimental.AUTOTUNE

gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

## Normalize and Parse Data

In [None]:
image_dimensions = (256, 256)

def normalize_image(img):
    """Decode a JPEG-encoded string tensor into a normalized RGB image tensor.

    Args:
        img: scalar string tensor holding the encoded JPEG bytes.

    Returns:
        A float32 tensor of shape ``[*image_dimensions, 3]`` whose values are
        the decoded pixel values divided by 127.5 and shifted by -1.
    """
    # Force three output channels: without this a grayscale JPEG decodes to a
    # single channel and the reshape below fails.
    decoded_img = tf.io.decode_jpeg(img, channels=3)
    normalized_img = (tf.cast(decoded_img, tf.float32) / 127.5) - 1
    # The reshape pins the static shape so downstream Keras layers see fixed
    # dimensions.
    reshaped_img = tf.reshape(normalized_img, [*image_dimensions, 3])
    return reshaped_img

def extract_image(img):
    """Parse one serialized TFRecord example and return its normalized image.

    The example is parsed against a schema with three scalar string features
    ("image_name", "image", "target"); only "image" is used.
    """
    string_feature = tf.io.FixedLenFeature([], tf.string)
    schema = {
        "image_name": string_feature,
        "image": string_feature,
        "target": string_feature,
    }
    example = tf.io.parse_single_example(img, schema)
    return normalize_image(example["image"])

def load_dataset(filenames):
    """Build a dataset of normalized images from the given TFRecord files."""
    # Read the raw serialized examples, then decode each one in parallel.
    records = tf.data.TFRecordDataset(filenames)
    return records.map(extract_image, num_parallel_calls=AUTOTUNE)

In [None]:
# One image per batch for both domains.
monet_ds = load_dataset(monet_files).batch(1)
photo_ds = load_dataset(photo_files).batch(1)

example_monet = next(iter(monet_ds))
example_photo = next(iter(photo_ds))

# Show the first photo and the first Monet side by side; "* 0.5 + 0.5" maps
# the [-1, 1] pixel range back to [0, 1] for imshow.
for position, title, batch in (
    (121, 'Photo', example_photo),
    (122, 'Monet', example_monet),
):
    plt.subplot(position)
    plt.title(title)
    plt.imshow(batch[0] * 0.5 + 0.5)

In [None]:
l2_lambda = 0.01

def downsample(filter_size, kernel_size, strides, padding="valid", normalize=False, zeropadding=False):
    """Build one downsampling stage as a small ``Sequential`` model.

    The stage is: Conv2D -> (optional) InstanceNormalization ->
    (optional) ZeroPadding2D -> LeakyReLU.

    Args:
        filter_size: number of convolution filters.
        kernel_size: convolution kernel size.
        strides: convolution strides.
        padding: padding mode forwarded to ``Conv2D``.
        normalize: when true, add instance normalization after the convolution.
        zeropadding: when true, add a ``ZeroPadding2D`` layer before the activation.

    Returns:
        The assembled ``Sequential`` model.
    """
    kernel_initializer = tf.random_normal_initializer(0, 0.02)
    gamma_initializer = keras.initializers.RandomNormal(0, 0.02)

    model = Sequential()

    # The initializer was previously created but never handed to the layer, so
    # the convolution fell back to the Keras default initializer.
    model.add(layers.Conv2D(
        filter_size,
        kernel_size,
        strides=strides,
        padding=padding,
        kernel_initializer=kernel_initializer,
        kernel_regularizer=regularizers.l2(l2_lambda),
    ))

    if normalize:
        model.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_initializer))

    if zeropadding:
        model.add(layers.ZeroPadding2D())

    model.add(layers.LeakyReLU())
    return model


def upsample(filter_size, kernel_size, strides, padding="valid", normalize=False, dropout=False, dropout_rate=0.5):
    """Build one upsampling stage as a small ``Sequential`` model.

    The stage is: Conv2DTranspose -> (optional) InstanceNormalization ->
    (optional) Dropout -> LeakyReLU.

    Args:
        filter_size: number of transposed-convolution filters.
        kernel_size: transposed-convolution kernel size.
        strides: transposed-convolution strides.
        padding: padding mode forwarded to ``Conv2DTranspose``.
        normalize: when true, add instance normalization after the convolution.
        dropout: when true, add a ``Dropout`` layer before the activation.
        dropout_rate: rate used by the optional ``Dropout`` layer.

    Returns:
        The assembled ``Sequential`` model.
    """
    kernel_initializer = tf.random_normal_initializer(0, 0.02)
    gamma_initializer = keras.initializers.RandomNormal(0, 0.02)

    model = Sequential()

    # The initializer was previously created but never handed to the layer, so
    # the transposed convolution fell back to the Keras default initializer.
    model.add(layers.Conv2DTranspose(
        filter_size,
        kernel_size,
        strides=strides,
        padding=padding,
        kernel_initializer=kernel_initializer,
        kernel_regularizer=regularizers.l2(l2_lambda),
    ))

    if normalize:
        model.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_initializer))

    if dropout:
        model.add(layers.Dropout(dropout_rate))

    model.add(layers.LeakyReLU())
    return model

In [None]:
# Per-stage layer settings for the "down" (encoder) and "up" (decoder) stacks.
# Every stage uses a 3x3 kernel, stride 2 and "same" padding; only the filter
# counts vary between stages.
parameters = {
    direction: {
        "filter_size": list(filters),
        "kernel_size": [(3, 3)] * len(filters),
        "strides": [(2, 2)] * len(filters),
        "padding": ["same"] * len(filters),
    }
    for direction, filters in (
        ("down", (64, 64, 128, 128, 256, 256, 512, 512)),
        ("up", (512, 512, 256, 256, 128, 128, 64, 64)),
    )
}

In [None]:
def create_downsample_pipeline(n:int, parameters:dict, zeropadding, normalize):
    """Return a list of ``n`` downsample stages built from ``parameters``.

    Stage ``i`` takes the ``i``-th entry of each of the "filter_size",
    "kernel_size", "strides" and "padding" lists; ``normalize`` and
    ``zeropadding`` are applied uniformly to every stage.
    """
    return [
        downsample(
            filter_size=parameters["filter_size"][stage],
            kernel_size=parameters["kernel_size"][stage],
            strides=parameters["strides"][stage],
            padding=parameters["padding"][stage],
            normalize=normalize,
            zeropadding=zeropadding,
        )
        for stage in range(n)
    ]


def create_upsample_pipeline(n:int, parameters:dict, n_dropouts:int, normalize):
    """Return a list of ``n`` upsample stages built from ``parameters``.

    Stages are created in order; a stage includes dropout while the remaining
    dropout budget (starting at ``n_dropouts``) is at least 1, and each such
    stage consumes one unit of that budget.
    """
    stages = []
    remaining_dropouts = n_dropouts
    for stage in range(n):
        use_dropout = remaining_dropouts >= 1
        stages.append(
            upsample(
                parameters["filter_size"][stage],
                parameters["kernel_size"][stage],
                parameters["strides"][stage],
                parameters["padding"][stage],
                dropout=use_dropout,
                normalize=normalize,
            )
        )
        if use_dropout:
            remaining_dropouts -= 1
    return stages

In [None]:
def Generator(n, downsample_parameters:dict, upsample_parameters:dict, n_dropouts, normalize, zeropadding):
    """Build an encoder-decoder generator with skip connections.

    Args:
        n: number of downsample stages and of upsample stages to create.
        downsample_parameters: per-stage settings for the encoder, with the
            list-valued keys "filter_size", "kernel_size", "strides" and "padding".
        upsample_parameters: per-stage settings for the decoder, same keys.
        n_dropouts: number of leading upsample stages that include dropout.
        normalize: whether every stage includes instance normalization.
        zeropadding: whether every downsample stage includes ``ZeroPadding2D``.

    Returns:
        A ``tf.keras.Model`` taking a 256x256x3 input and ending in a
        3-filter, tanh-activated transposed convolution.
    """
    inputs = layers.Input(shape=[256, 256, 3])
    kernel_initializer = tf.random_normal_initializer(0, 0.02)

    # Use the settings passed by the caller; previously these arguments were
    # ignored and the module-level ``parameters`` dict was read instead.
    downsample_pipeline = create_downsample_pipeline(n=n, parameters=downsample_parameters, zeropadding=zeropadding, normalize=normalize)
    upsample_pipeline = create_upsample_pipeline(n=n, parameters=upsample_parameters, n_dropouts=n_dropouts, normalize=normalize)

    # Encoder: keep every intermediate output for the skip connections.
    skips = []
    x = inputs
    for down in downsample_pipeline:
        x = down(x)
        skips.append(x)

    # The last encoder output is the decoder input, not a skip; the rest are
    # consumed deepest-first.
    skips = reversed(skips[:-1])

    # zip stops at the shorter sequence, so only as many upsample stages as
    # there are skips are applied; the final upsampling is done by last_layer.
    for up, skip in zip(upsample_pipeline, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])

    last_layer = layers.Conv2DTranspose(3, (3,3), strides=(2,2), padding="same", kernel_initializer=kernel_initializer, activation="tanh")(x)
    return tf.keras.Model(inputs=inputs, outputs=last_layer)

In [None]:
def Discriminator(n, downsample_parameters:dict, normalize, zeropadding):
    """Build the discriminator network.

    The 256x256x3 input passes through ``n`` downsample stages configured by
    ``downsample_parameters``, then a fixed head: ZeroPadding2D ->
    Conv2D(512, 3x3, no bias) -> InstanceNormalization -> LeakyReLU ->
    ZeroPadding2D -> Conv2D(1, 3x3).

    Returns:
        A ``tf.keras.Model`` whose output is the final single-filter
        convolution (no activation applied).
    """
    image_in = layers.Input(shape=[256, 256, 3], name="input_image")

    gamma_init = tf.keras.initializers.RandomNormal(0, 0.02)
    kernel_init = tf.random_normal_initializer(0, 0.02)

    stages = create_downsample_pipeline(n, downsample_parameters, normalize=normalize, zeropadding=zeropadding)

    features = image_in
    for stage in stages:
        features = stage(features)

    # Fixed classification head applied after the downsample stack.
    features = layers.ZeroPadding2D()(features)
    features = layers.Conv2D(512, (3,3), strides=1, kernel_initializer=kernel_init, use_bias=False)(features)
    features = tfa.layers.InstanceNormalization(gamma_initializer=gamma_init)(features)
    features = layers.LeakyReLU()(features)
    features = layers.ZeroPadding2D()(features)
    score_map = layers.Conv2D(1, (3,3), strides=1, kernel_initializer=kernel_init)(features)
    return tf.keras.Model(inputs=image_in, outputs=score_map)
    
#     model = Sequential()
#     model.add(layers.Conv2D(, kernel_size, strides=strides, padding=padding, kernel_initializer=initializer, activation=last_activation))
#     model.add(tfa.layers.InstanceNormalization(gamma_initializer=gamma_initializer))
#     if zero_padding:         
#         model.add(layers.ZeroPadding2D())
#     model.add(layers.LeakyReLU())
#     model.add(layers.Conv2D(1, last_kernel_size, last_strides, kernel_initializer=initializer)) 
#     outputs=model()(X3)
#     return Keras.Model(inputs=inputs, outputs=outputs)


In [None]:
generator_layers = 8
discriminator_layers = 8
n_dropouts = 3

with strategy.scope():
    # Both generators share one configuration, as do both discriminators.
    generator_config = dict(
        n=generator_layers,
        n_dropouts=n_dropouts,
        downsample_parameters=parameters["down"],
        upsample_parameters=parameters["up"],
        normalize=True,
        zeropadding=False,
    )
    discriminator_config = dict(
        n=discriminator_layers,
        downsample_parameters=parameters["down"],
        normalize=True,
        zeropadding=False,
    )

    # transforms photos to Monet-esque paintings
    monet_generator = Generator(**generator_config)
    # transforms Monet paintings to be more like photos
    photo_generator = Generator(**generator_config)

    # differentiates real Monet paintings and generated Monet paintings
    monet_discriminator = Discriminator(**discriminator_config)
    # differentiates real photos and generated photos
    photo_discriminator = Discriminator(**discriminator_config)
    
# 8, 8, 3

In [None]:
class CycleGan(keras.Model):
    """Keras model that trains two generators and two discriminators together.

    ``m_gen``/``p_gen`` are the Monet and photo generators, ``m_disc``/``p_disc``
    the matching discriminators. ``lambda_cycle`` is the weight handed to the
    cycle-consistency and identity loss functions.
    """

    def __init__(self,
        monet_generator,
        photo_generator,
        monet_discriminator,
        photo_discriminator,
        lambda_cycle=10,
    ):
        super().__init__()
        self.m_gen = monet_generator
        self.p_gen = photo_generator
        self.m_disc = monet_discriminator
        self.p_disc = photo_discriminator
        self.lambda_cycle = lambda_cycle

    def compile(
        self,
        m_gen_optimizer,
        p_gen_optimizer,
        m_disc_optimizer,
        p_disc_optimizer,
        gen_loss_fn,
        disc_loss_fn,
        cycle_loss_fn,
        identity_loss_fn
    ):
        """Store one optimizer per network and the four loss callables."""
        super().compile()
        self.m_gen_optimizer = m_gen_optimizer
        self.p_gen_optimizer = p_gen_optimizer
        self.m_disc_optimizer = m_disc_optimizer
        self.p_disc_optimizer = p_disc_optimizer
        self.gen_loss_fn = gen_loss_fn
        self.disc_loss_fn = disc_loss_fn
        self.cycle_loss_fn = cycle_loss_fn
        self.identity_loss_fn = identity_loss_fn

    def train_step(self, batch_data):
        """Run one optimisation step on a (monet, photo) batch pair.

        Returns a dict with the total loss of each generator and the loss of
        each discriminator.
        """
        monet_real, photo_real = batch_data
        cycle_weight = self.lambda_cycle

        # The tape is persistent because four separate gradients are taken from it.
        with tf.GradientTape(persistent=True) as tape:
            # photo -> Monet -> photo
            monet_fake = self.m_gen(photo_real, training=True)
            photo_cycled = self.p_gen(monet_fake, training=True)

            # Monet -> photo -> Monet
            photo_fake = self.p_gen(monet_real, training=True)
            monet_cycled = self.m_gen(photo_fake, training=True)

            # each generator applied to an image already in its target domain
            monet_same = self.m_gen(monet_real, training=True)
            photo_same = self.p_gen(photo_real, training=True)

            # discriminator outputs for real images
            monet_real_score = self.m_disc(monet_real, training=True)
            photo_real_score = self.p_disc(photo_real, training=True)

            # discriminator outputs for generated images
            monet_fake_score = self.m_disc(monet_fake, training=True)
            photo_fake_score = self.p_disc(photo_fake, training=True)

            # adversarial part of each generator loss
            monet_adversarial = self.gen_loss_fn(monet_fake_score)
            photo_adversarial = self.gen_loss_fn(photo_fake_score)

            # cycle-consistency loss summed over both directions
            cycle_total = (
                self.cycle_loss_fn(monet_real, monet_cycled, cycle_weight)
                + self.cycle_loss_fn(photo_real, photo_cycled, cycle_weight)
            )

            # total generator losses: adversarial + cycle + identity
            monet_gen_total = (
                monet_adversarial
                + cycle_total
                + self.identity_loss_fn(monet_real, monet_same, cycle_weight)
            )
            photo_gen_total = (
                photo_adversarial
                + cycle_total
                + self.identity_loss_fn(photo_real, photo_same, cycle_weight)
            )

            # discriminator losses
            monet_disc_loss = self.disc_loss_fn(monet_real_score, monet_fake_score)
            photo_disc_loss = self.disc_loss_fn(photo_real_score, photo_fake_score)

        # (loss, network, optimizer) for each of the four networks.
        updates = (
            (monet_gen_total, self.m_gen, self.m_gen_optimizer),
            (photo_gen_total, self.p_gen, self.p_gen_optimizer),
            (monet_disc_loss, self.m_disc, self.m_disc_optimizer),
            (photo_disc_loss, self.p_disc, self.p_disc_optimizer),
        )

        # Compute every gradient before applying any of them.
        all_gradients = [
            tape.gradient(loss, network.trainable_variables)
            for loss, network, _ in updates
        ]
        for gradients, (_, network, optimizer) in zip(all_gradients, updates):
            optimizer.apply_gradients(zip(gradients, network.trainable_variables))

        return {
            "monet_gen_loss": monet_gen_total,
            "photo_gen_loss": photo_gen_total,
            "monet_disc_loss": monet_disc_loss,
            "photo_disc_loss": photo_disc_loss
        }

In [None]:
with strategy.scope():
    def discriminator_loss(real, generated):
        """Return half the sum of two cross-entropy terms on discriminator logits.

        ``real`` is scored against an all-ones target and ``generated`` against
        an all-zeros target; no reduction is applied by the loss object.
        """
        bce = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
        loss_on_real = bce(tf.ones_like(real), real)
        loss_on_generated = bce(tf.zeros_like(generated), generated)
        return (loss_on_real + loss_on_generated) * 0.5
    
    
with strategy.scope():
    def generator_loss(generated):
        """Return the cross-entropy of the ``generated`` logits against an all-ones target."""
        bce = tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
        all_real_target = tf.ones_like(generated)
        return bce(all_real_target, generated)
    
    
with strategy.scope():
    def calc_cycle_loss(real_image, cycled_image, LAMBDA):
        """Return ``LAMBDA`` times the mean absolute difference between the two images."""
        absolute_error = tf.abs(real_image - cycled_image)
        return LAMBDA * tf.reduce_mean(absolute_error)
    
    
with strategy.scope():
    def identity_loss(real_image, same_image, LAMBDA):
        """Return ``LAMBDA * 0.5`` times the mean absolute difference between the two images."""
        absolute_error = tf.abs(real_image - same_image)
        mean_absolute_error = tf.reduce_mean(absolute_error)
        return LAMBDA * 0.5 * mean_absolute_error
    
# One exponential-decay learning-rate schedule shared by all four optimizers.
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=2e-4,
    decay_steps=100000,
    decay_rate=0.96,
)

with strategy.scope():
    # Four independent Adam optimizers with identical settings, created in
    # the order they are named on the left.
    (
        monet_generator_optimizer,
        photo_generator_optimizer,
        monet_discriminator_optimizer,
        photo_discriminator_optimizer,
    ) = (
        tf.keras.optimizers.Adam(learning_rate=lr_schedule, beta_1=0.5)
        for _ in range(4)
    )

In [None]:
with strategy.scope():
    # Wrap the four networks in the combined training model.
    cycle_gan_model = CycleGan(
        monet_generator=monet_generator,
        photo_generator=photo_generator,
        monet_discriminator=monet_discriminator,
        photo_discriminator=photo_discriminator,
    )

    # Attach one optimizer per network and the four loss functions.
    compile_config = {
        "m_gen_optimizer": monet_generator_optimizer,
        "p_gen_optimizer": photo_generator_optimizer,
        "m_disc_optimizer": monet_discriminator_optimizer,
        "p_disc_optimizer": photo_discriminator_optimizer,
        "gen_loss_fn": generator_loss,
        "disc_loss_fn": discriminator_loss,
        "cycle_loss_fn": calc_cycle_loss,
        "identity_loss_fn": identity_loss,
    }
    cycle_gan_model.compile(**compile_config)

In [None]:
# Pair each Monet batch with a photo batch; each element of the zipped
# dataset is a (monet, photo) tuple, which is what train_step unpacks.
paired_ds = tf.data.Dataset.zip((monet_ds, photo_ds))
cycle_gan_model.fit(paired_ds, epochs=10)

In [None]:
# Five rows: input photo on the left, generated painting on the right.
_, ax = plt.subplots(5, 2, figsize=(12, 12))
for (photo_axis, monet_axis), img in zip(ax, photo_ds.take(5)):
    prediction = monet_generator(img, training=False)[0].numpy()
    # Map both images from [-1, 1] back to 0..255 for display.
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    img = (img[0] * 127.5 + 127.5).numpy().astype(np.uint8)

    for axis, picture, caption in (
        (photo_axis, img, "Input Photo"),
        (monet_axis, prediction, "Monet-esque"),
    ):
        axis.imshow(picture)
        axis.set_title(caption)
        axis.axis("off")
plt.show()

In [None]:
import os
import shutil

import numpy as np
import PIL.Image

# Create the output directory from Python so that re-running the cell does not
# fail when the directory already exists.
output_dir = "../images"
os.makedirs(output_dir, exist_ok=True)

# Generate a painting for every photo and save the results as numbered JPEGs.
# photo_ds is the batched photo dataset defined earlier in this file; the
# previously referenced fast_photo_ds is not defined anywhere in it.
i = 1
for img in photo_ds:
    # Call the model directly (as the preview cell does) instead of
    # Model.predict, which is meant for large inputs rather than one small
    # batch per call.
    prediction = monet_generator(img, training=False).numpy()
    # Map from [-1, 1] back to 0..255.
    prediction = (prediction * 127.5 + 127.5).astype(np.uint8)
    for pred in prediction:
        im = PIL.Image.fromarray(pred)
        im.save(os.path.join(output_dir, str(i) + ".jpg"))
        i += 1

# NOTE(review): the archive root is an absolute path, while the images are
# written relative to the working directory; these match only when the
# working directory is /kaggle/working - confirm.
shutil.make_archive("/kaggle/working/images", 'zip', "/kaggle/images")


In [None]:
# Confirm the submission archive is present in the current working directory.
zip_found = os.path.exists("images.zip")
if zip_found:
    print("Zip file exists.")

# Conclusion

In conclusion, this was the most difficult project for me, as I was not familiar with CycleGAN and had to learn a lot about the architecture. I am not happy with my results, but due to time constraints I cannot iterate any further on this project. It seems that GANs, or at least CycleGAN, are extremely sensitive: slight changes to the architecture (such as adding or removing a single Dropout layer) drastically affected the results.

I am super interested in GANs after this project and will definitely continue to learn more in this space.