In [1]:
import numpy as np 
import pandas as pd
import os
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras import layers

In [2]:
# Collect the TFRecord shard paths for each image domain.
monet_pics_path = tf.io.gfile.glob(
    "/kaggle/input/gan-getting-started/monet_tfrec/*.tfrec"
)
photo_pics_path = tf.io.gfile.glob(
    "/kaggle/input/gan-getting-started/photo_tfrec/*.tfrec"
)
# Height and width that decoded images are reshaped to in read_tfrecord.
image_shape = [256, 256]

In [3]:
# Display the Monet TFRecord paths matched by the glob.
monet_pics_path

['/kaggle/input/gan-getting-started/monet_tfrec/monet12-60.tfrec',
 '/kaggle/input/gan-getting-started/monet_tfrec/monet16-60.tfrec',
 '/kaggle/input/gan-getting-started/monet_tfrec/monet08-60.tfrec',
 '/kaggle/input/gan-getting-started/monet_tfrec/monet04-60.tfrec',
 '/kaggle/input/gan-getting-started/monet_tfrec/monet00-60.tfrec']

In [4]:
# Parsing schema: each serialized example carries one scalar string feature,
# "image", holding the encoded image bytes.
features = {"image": tf.io.FixedLenFeature([], tf.string)}


def read_tfrecord(images):
    """Decode one serialized TFRecord example into a normalized image tensor.

    Args:
        images: A single serialized example (despite the plural name) as
            produced by ``tf.data.TFRecordDataset``.

    Returns:
        A float32 tensor of shape ``[*image_shape, 3]`` with values scaled
        from the decoded 8-bit range into [-1, 1].
    """
    parsed_example = tf.io.parse_single_example(images, features)
    decoded = tf.image.decode_jpeg(parsed_example["image"], channels=3)
    # Dividing by 127.5 and subtracting 1 maps 0..255 onto -1..1.
    normalized = (tf.cast(decoded, tf.float32) / 127.5) - 1
    # The reshape pins the static shape so downstream layers see a fixed size.
    return tf.reshape(normalized, [*image_shape, 3])

In [5]:
AUTOTUNE = tf.data.experimental.AUTOTUNE


def _load_image_dataset(tfrecord_paths):
    """Build a dataset of decoded images, batched one image at a time."""
    serialized = tf.data.TFRecordDataset(tfrecord_paths)
    decoded = serialized.map(read_tfrecord, num_parallel_calls=AUTOTUNE)
    return decoded.batch(1)


monet_dataset = _load_image_dataset(monet_pics_path)
photo_dataset = _load_image_dataset(photo_pics_path)

In [6]:
# Count the images in each dataset. The pipelines use batch(1), so one batch is
# one image. Counting lazily avoids materializing every decoded image in a
# Python list, which costs several GB for the photo set.
print(sum(1 for _ in monet_dataset))
print(sum(1 for _ in photo_dataset))

300
7038


In [7]:
# Pull the first batch from the Monet dataset and inspect its shape.
something = next(iter(monet_dataset))
something.shape

TensorShape([1, 256, 256, 3])

In [8]:
# NOTE(review): this cell repeats the previous one verbatim; it can be removed.
something = next(iter(monet_dataset))
something.shape

TensorShape([1, 256, 256, 3])

In [9]:
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras import layers


In [10]:
# Number of channels produced by the generators' final layer.
output_channels = 3


class cyclic_gans(keras.Model):
    """CycleGAN with two generators and two discriminators.

    ``monet_genrator`` maps photos to Monet-style images and ``photo_genrator``
    maps Monet images to photos. Each domain has its own discriminator. The
    custom ``train_step`` updates all four networks from one
    ``(monet_batch, photo_batch)`` pair.

    Attribute and method names keep their original spelling because callers
    reference them.
    """

    def __init__(self):
        super(cyclic_gans, self).__init__()
        self.monet_genrator = self.make_genrator()
        self.photo_genrator = self.make_genrator()
        self.monet_dicriminator = self.make_discriminator()
        self.photo_dicriminator = self.make_discriminator()
        # Weight applied to the cycle-consistency loss; the identity loss uses
        # half of it (see identiy_loss_fn).
        self.lambda_cycles = 10

    def compile(self):
        """Compile the model and create one Adam optimizer per sub-network."""
        super(cyclic_gans, self).compile()
        self.monet_genrator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
        self.photo_genrator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
        self.monet_dicriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
        self.photo_dicriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

    def create_downsmapler(self, filters, size, apply_instace_norm=True):
        """Return a stride-2 Conv2D block that halves the spatial size.

        Args:
            filters: Number of convolution filters.
            size: Convolution kernel size.
            apply_instace_norm: Whether to add instance normalization after
                the convolution.

        Returns:
            A ``keras.Sequential`` of Conv2D, optional InstanceNormalization,
            and LeakyReLU.
        """
        model = keras.Sequential()
        model.add(layers.Conv2D(filters, size, strides=2, padding="same", use_bias=False,
                                kernel_initializer=tf.random_normal_initializer(0.0, 0.02)))
        if apply_instace_norm:
            model.add(tfa.layers.InstanceNormalization(
                gamma_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.02)))
        model.add(layers.LeakyReLU())
        return model

    def create_upsampler(self, filters, size, add_dropout=True):
        """Return a stride-2 Conv2DTranspose block that doubles the spatial size.

        Args:
            filters: Number of transposed-convolution filters.
            size: Kernel size.
            add_dropout: Whether to add ``Dropout(0.5)`` after normalization.

        Returns:
            A ``keras.Sequential`` of Conv2DTranspose, InstanceNormalization,
            optional Dropout, and LeakyReLU.
        """
        model = keras.Sequential()
        model.add(layers.Conv2DTranspose(filters, size, strides=2, padding="same", use_bias=True,
                                         kernel_initializer=tf.random_normal_initializer(0.0, 0.02)))
        model.add(tfa.layers.InstanceNormalization(
            gamma_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.02)))
        if add_dropout:
            model.add(layers.Dropout(0.5))
        model.add(layers.LeakyReLU())
        return model

    def make_genrator(self):
        """Build a U-Net style generator for 256x256x3 inputs.

        Eight downsampling blocks are followed by seven upsampling blocks with
        skip connections, then a final tanh Conv2DTranspose that restores the
        input resolution with ``output_channels`` channels.
        """
        down_samplers = [
            self.create_downsmapler(64, 4, apply_instace_norm=False),
            self.create_downsmapler(128, 4),
            self.create_downsmapler(256, 4),
            self.create_downsmapler(512, 4),
            self.create_downsmapler(512, 4),
            self.create_downsmapler(512, 4),
            self.create_downsmapler(512, 4),
            self.create_downsmapler(512, 4),
        ]
        # Dropout belongs only on the first three (innermost) decoder blocks.
        # The remaining blocks opt out explicitly because create_upsampler
        # defaults to add_dropout=True.
        up_samplers = [
            self.create_upsampler(512, 4, add_dropout=True),
            self.create_upsampler(512, 4, add_dropout=True),
            self.create_upsampler(512, 4, add_dropout=True),
            self.create_upsampler(512, 4, add_dropout=False),
            self.create_upsampler(256, 4, add_dropout=False),
            self.create_upsampler(128, 4, add_dropout=False),
            self.create_upsampler(64, 4, add_dropout=False),
        ]
        input_layers = layers.Input(shape=[256, 256, 3])
        x = input_layers
        skips = []
        for downsampler in down_samplers:
            x = downsampler(x)
            skips.append(x)
        # The bottleneck output is the decoder's input, not a skip connection;
        # the rest are consumed deepest-first.
        skips = reversed(skips[:-1])
        for upsampler, skip_layer in zip(up_samplers, skips):
            x = upsampler(x)
            x = layers.Concatenate()([x, skip_layer])
        last_layer = layers.Conv2DTranspose(output_channels, 4, strides=2, padding="same",
                                            kernel_initializer=tf.random_normal_initializer(0.0, 0.02),
                                            activation="tanh")
        x = last_layer(x)
        return keras.Model(inputs=input_layers, outputs=x)

    def make_discriminator(self):
        """Build a convolutional discriminator for 256x256x3 inputs.

        Returns a model whose output is a single-channel map of unnormalized
        scores (logits); the losses below use ``from_logits=True``.
        """
        input_layer = layers.Input(shape=[256, 256, 3], name="input_image")
        x = input_layer
        downsampler1 = self.create_downsmapler(64, 4, False)(x)
        downsampler2 = self.create_downsmapler(128, 4)(downsampler1)
        downsampler3 = self.create_downsmapler(256, 4)(downsampler2)
        zero_padd1 = layers.ZeroPadding2D()(downsampler3)
        conv_layer = layers.Conv2D(512, 4, strides=1, use_bias=False,
                                   kernel_initializer=tf.random_normal_initializer(0.0, 0.02))(zero_padd1)
        norm_layer1 = tfa.layers.InstanceNormalization(
            gamma_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.02))(conv_layer)
        leaky_layer = layers.LeakyReLU()(norm_layer1)
        zero_pad2 = layers.ZeroPadding2D()(leaky_layer)
        last_layers = layers.Conv2D(1, 4, strides=1,
                                    kernel_initializer=tf.random_normal_initializer(0.0, 0.02))(zero_pad2)
        return keras.Model(inputs=input_layer, outputs=last_layers)

    def descriminator_loss_fn(self, real, fake):
        """Discriminator loss: real outputs target 1, generated outputs target 0.

        Args:
            real: Discriminator logits for real images.
            fake: Discriminator logits for generated images.

        Returns:
            The unreduced average of the two binary cross-entropy terms.
        """
        bce = tf.keras.losses.BinaryCrossentropy(
            from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
        real_loss = bce(tf.ones_like(real), real)
        # Generated samples must be labelled 0. Labelling them 1 would train
        # the discriminator to accept everything and remove the adversarial
        # signal.
        fake_loss = bce(tf.zeros_like(fake), fake)
        return (real_loss + fake_loss) / 2

    def genrator_loss_fn(self, genrated_image):
        """Generator adversarial loss: push discriminator logits towards 1.

        Args:
            genrated_image: Discriminator logits for generated images.
        """
        bce = tf.keras.losses.BinaryCrossentropy(
            from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
        return bce(tf.ones_like(genrated_image), genrated_image)

    def cycle_loss_fn(self, image, cycled_image, lambda_image):
        """Mean absolute error between an image and its round trip, times ``lambda_image``."""
        return tf.reduce_mean(tf.abs(image - cycled_image)) * lambda_image

    def identiy_loss_fn(self, real_photo, photo, lambda_image):
        """Mean absolute error between an image and the generator output for it.

        Scaled by ``lambda_image / 2``.
        """
        return tf.reduce_mean(tf.abs(real_photo - photo)) * lambda_image / 2

    def train_step(self, batch_data):
        """Run one optimization step on all four networks.

        Args:
            batch_data: Tuple ``(real_monet, real_photo)`` of image batches.

        Returns:
            Dict with the total loss of each generator and each discriminator.
        """
        real_monet, real_photot = batch_data
        # persistent=True because four separate gradients are taken from the
        # same forward pass.
        with tf.GradientTape(persistent=True) as tape:
            # photo -> Monet -> photo
            fake_monet = self.monet_genrator(real_photot, training=True)
            cycled_photo = self.photo_genrator(fake_monet, training=True)
            # Monet -> photo -> Monet
            fake_photo = self.photo_genrator(real_monet, training=True)
            cycled_monet = self.monet_genrator(fake_photo, training=True)

            # Identity mappings: each generator applied to its own target domain.
            monet1 = self.monet_genrator(real_monet, training=True)
            phtot1 = self.photo_genrator(real_photot, training=True)

            monet_real_discriminated = self.monet_dicriminator(real_monet, training=True)
            monet_fake_discriminated = self.monet_dicriminator(fake_monet, training=True)
            photo_real_discriminated = self.photo_dicriminator(real_photot, training=True)
            photo_fake_discriminated = self.photo_dicriminator(fake_photo, training=True)

            monet_loss = self.genrator_loss_fn(monet_fake_discriminated)
            photo_loss = self.genrator_loss_fn(photo_fake_discriminated)

            cycle_loss = (
                self.cycle_loss_fn(real_monet, cycled_monet, self.lambda_cycles)
                + self.cycle_loss_fn(real_photot, cycled_photo, self.lambda_cycles)
            )

            total_monet_genrator_loss = (
                monet_loss + cycle_loss
                + self.identiy_loss_fn(real_monet, monet1, self.lambda_cycles)
            )
            total_photo_genrator_loss = (
                photo_loss + cycle_loss
                + self.identiy_loss_fn(real_photot, phtot1, self.lambda_cycles)
            )

            monet_discriminator_loss = self.descriminator_loss_fn(
                monet_real_discriminated, monet_fake_discriminated)
            photo_discriminator_loss = self.descriminator_loss_fn(
                photo_real_discriminated, photo_fake_discriminated)

        # All gradients are computed before any update so that every network
        # is differentiated against the same weights.
        monet_genrator_gradient = tape.gradient(
            total_monet_genrator_loss, self.monet_genrator.trainable_variables)
        photo_genrator_gradient = tape.gradient(
            total_photo_genrator_loss, self.photo_genrator.trainable_variables)
        monet_disr_gradient = tape.gradient(
            monet_discriminator_loss, self.monet_dicriminator.trainable_variables)
        photo_disr_gradient = tape.gradient(
            photo_discriminator_loss, self.photo_dicriminator.trainable_variables)
        # A persistent tape holds its resources until it is dropped.
        del tape

        self.monet_genrator_optimizer.apply_gradients(
            zip(monet_genrator_gradient, self.monet_genrator.trainable_variables))
        self.photo_dicriminator_optimizer.apply_gradients(
            zip(photo_disr_gradient, self.photo_dicriminator.trainable_variables))
        self.photo_genrator_optimizer.apply_gradients(
            zip(photo_genrator_gradient, self.photo_genrator.trainable_variables))
        self.monet_dicriminator_optimizer.apply_gradients(
            zip(monet_disr_gradient, self.monet_dicriminator.trainable_variables))
        return {
            "monet_generator_loss": total_monet_genrator_loss,
            "photo_generator_loss": total_photo_genrator_loss,
            "monet_discriminator_loss": monet_discriminator_loss,
            "photo_discriminator_loss": photo_discriminator_loss,
        }

In [11]:
# Build the four networks, then attach their optimizers via compile().
cycle_gan = cyclic_gans()
cycle_gan.compile()

In [12]:
# Pair each Monet batch with a photo batch. Both streams repeat indefinitely,
# so steps_per_epoch decides where an epoch ends.
paired_batches = tf.data.Dataset.zip((monet_dataset.repeat(), photo_dataset.repeat()))
cycle_gan.fit(paired_batches, steps_per_epoch=300, epochs=30)

Epoch 1/30


2023-03-22 19:40:09.403666: E tensorflow/core/grappler/optimizers/meta_optimizer.cc:954] layout failed: INVALID_ARGUMENT: Size of values 0 does not match size of permutation 4 @ fanin shape inmodel/sequential_8/dropout/dropout/SelectV2-2-TransposeNHWCToNCHW-LayoutOptimizer


Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.callbacks.History at 0x7f775f60c550>

In [19]:
from io import BytesIO
from PIL import Image
from zipfile import ZipFile
import numpy as np


# Translate every photo with the photo -> Monet generator and store the results
# as 1.jpg, 2.jpg, ... inside images.zip.
with ZipFile("images.zip", mode="w") as zipfiles:
    for i, img in enumerate(photo_dataset, start=1):
        # img is a batch; [0] selects its first image.
        genrated_image_cycle = cycle_gan.monet_genrator(img, training=False)[0].numpy()
        # Undo the [-1, 1] scaling applied at load time to get 8-bit pixels.
        scaled_img = (genrated_image_cycle * 127.5 + 127.5).astype(np.uint8)
        with BytesIO() as image_bytes:
            Image.fromarray(scaled_img).save(image_bytes, 'JPEG')
            # getvalue() returns the whole buffer regardless of stream position.
            zipfiles.writestr('{}.jpg'.format(i), image_bytes.getvalue())