In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models , initializers
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
from tqdm import tqdm
from tensorflow.keras.optimizers import legacy
import pandas as pd
import random
import numpy as np
import cv2
import matplotlib.pyplot as plt
import os
import shutil


# load data

In [None]:
def load_images_from_dir(directory, target_size=(320, 320)):
    """Load every JPEG/PNG image found directly inside ``directory``.

    Args:
        directory: Folder to scan (sub-folders are not visited).
        target_size: Size tuple forwarded to ``cv2.resize``.

    Returns:
        A list with one resized RGB array per readable image, ordered by
        file name. Files that OpenCV cannot read are reported and skipped.
    """
    image_list = []
    # os.listdir() yields entries in arbitrary order; sorting keeps index-based
    # lookups and the later train/val/test split reproducible between runs.
    for filename in sorted(os.listdir(directory)):
        # Compare extensions case-insensitively so "IMG.JPG" is not skipped.
        if not filename.lower().endswith((".jpg", ".png")):
            continue

        image_path = os.path.join(directory, filename)
        # cv2.imread returns None instead of raising when the file is unreadable.
        image = cv2.imread(image_path)
        if image is None:
            print(f"Warning: Unable to load image '{filename}'")
            continue

        # OpenCV loads pixels as BGR; convert to RGB before resizing.
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_list.append(cv2.resize(image_rgb, target_size))
    return image_list



In [None]:
# Load the Monet paintings; the path is relative to the working directory.
directory_path = "data/monet_jpg/"
monet_image_list = load_images_from_dir(directory_path)
print("Number of images loaded:", len(monet_image_list))

In [None]:
# Load the real photographs; note that `directory_path` is overwritten here.
directory_path = "data/photo_jpg/"
image_list = load_images_from_dir(directory_path)
print("Number of images loaded:", len(image_list))

# EDA images

In [None]:
def plot_random_images(image_list, num_images=5):
    """Show ``num_images`` randomly chosen images in a single row.

    Args:
        image_list: Sequence of images accepted by ``imshow``.
        num_images: Positive number of images to draw (default 5).

    Returns:
        None. Prints an error and returns early when ``image_list`` holds
        fewer than ``num_images`` entries.
    """
    if len(image_list) < num_images:
        print("Error: Insufficient number of images.")
        return

    # Sample indices (not images) so each title can show the source position.
    random_indices = random.sample(range(len(image_list)), num_images)

    # squeeze=False keeps `axes` two-dimensional even when num_images == 1.
    fig, axes = plt.subplots(1, num_images, figsize=(3 * num_images, 5), squeeze=False)
    for ax, idx in zip(axes[0], random_indices):
        # Images are drawn as stored; no colour conversion happens here.
        ax.imshow(image_list[idx])
        ax.axis('off')
        ax.set_title(f"Image {idx}")
    plt.show()


In [None]:
# Eyeball a random sample of the real photographs.
plot_random_images(image_list)

In [None]:
# Eyeball a random sample of the Monet paintings.
plot_random_images(monet_image_list)

# cleaning the data

In [None]:
def has_white_corner(image, corner_size=5, threshold=220):
    """Tell whether all four corners of an RGB image are near-white.

    Args:
        image: RGB image array (converted with ``cv2.COLOR_RGB2GRAY``).
        corner_size: Side length, in pixels, of the square patch inspected
            at each corner.
        threshold: Grayscale value a pixel must exceed to count as white.

    Returns:
        True only if every pixel of every corner patch is brighter than
        ``threshold``.
    """
    gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    corners = (
        gray_image[:corner_size, :corner_size],    # top-left
        gray_image[:corner_size, -corner_size:],   # top-right
        gray_image[-corner_size:, :corner_size],   # bottom-left
        gray_image[-corner_size:, -corner_size:],  # bottom-right
    )

    # "White" means above the threshold, not exactly 255, so slightly
    # off-white borders are still detected.
    return all(np.all(corner > threshold) for corner in corners)

def crop_and_resize(image, target_size=(320, 320), crop_size=215):
    """Cut a centred square out of ``image`` and resize it.

    Args:
        image: Image array whose first two axes are (height, width).
        target_size: Size tuple forwarded to ``cv2.resize``.
        crop_size: Nominal side of the centred crop. The window spans
            ``crop_size // 2`` pixels on each side of the centre and is
            clipped to the image bounds.

    Returns:
        The cropped region resized to ``target_size``.
    """
    height, width = image.shape[:2]
    center_x, center_y = width // 2, height // 2
    half = crop_size // 2

    # Clamp the window so images smaller than the crop are used whole.
    crop_left = max(0, center_x - half)
    crop_top = max(0, center_y - half)
    crop_right = min(width, center_x + half)
    crop_bottom = min(height, center_y + half)

    cropped_image = image[crop_top:crop_bottom, crop_left:crop_right]
    return cv2.resize(cropped_image, target_size)

def show_images_with_white_corner(images):
    """Display each white-cornered image followed by its cropped version.

    Images for which ``has_white_corner`` is false are skipped. Both
    figures of a pair carry the title ``Image <index>``.
    """
    def _display(picture, index):
        # One standalone figure per call, axes hidden.
        plt.imshow(picture)
        plt.title(f"Image {index}")
        plt.axis('off')
        plt.show()

    for idx, image in enumerate(images):
        if not has_white_corner(image):
            continue
        cropped_resized_image = crop_and_resize(image)
        _display(image, idx)
        _display(cropped_resized_image, idx)


# Visual check: show every flagged Monet image next to its cropped version.
show_images_with_white_corner(monet_image_list)

In [None]:
def change_images_with_white_corner(images):
    """Return a new list where white-cornered images are cropped and resized.

    Images without white corners are passed through unchanged (same object).
    """
    return [
        crop_and_resize(picture) if has_white_corner(picture) else picture
        for picture in images
    ]

# Replace the Monet list with its cleaned version.
# NOTE(review): this rebinds the input name, so re-running the cell applies
# the cleaning to already-cleaned images.
monet_image_list = change_images_with_white_corner(monet_image_list)

# AUGMENTATION

In [None]:
def split_and_resize_quarters(original_image):
    """Split a 320x320 image into four quadrants, each resized to 320x320.

    Args:
        original_image: Array-like image; it is passed through ``np.array``.

    Returns:
        ``(original_image, quarters)`` where ``quarters`` lists the
        top-left, top-right, bottom-left and bottom-right quadrants in that
        order. If the image is not 320x320 an error is printed and
        ``quarters`` is an empty list.
    """
    original_image_array = np.array(original_image)

    if original_image_array.shape[:2] != (320, 320):
        print("Error: Image size must be 320x320.")
        # Keep the (image, quarters) shape of the success path: every caller
        # tuple-unpacks the result, which would raise on a bare None.
        return original_image, []

    # Row-major walk over the 2x2 grid of 160-pixel tiles.
    quarters = [
        original_image_array[upper:upper + 160, left:left + 160, :]
        for upper in (0, 160)
        for left in (0, 160)
    ]

    resized_quarters = [cv2.resize(quarter, (320, 320)) for quarter in quarters]

    return original_image, resized_quarters

In [None]:
# Demo of the quarter split on one Monet image (index 2 is an arbitrary pick).
original_image, quarters = split_and_resize_quarters(monet_image_list[2])
if quarters:
    # Plot original image
    plt.figure(figsize=(8, 8))
    plt.imshow(original_image)
    plt.axis('off')
    plt.title("Original Image")
    plt.show()
    
    # Plot each quarter on a 2x2 grid; i//2 is the row, i%2 the column
    fig, axes = plt.subplots(2, 2, figsize=(10, 10))
    for i, quarter in enumerate(quarters):
        axes[i//2, i%2].imshow(quarter)
        axes[i//2, i%2].axis('off')
        axes[i//2, i%2].set_title(f"Quarter {i+1}")
    plt.show()

In [None]:
def get_images_with_high_color_variance(images, threshold):
    """Keep the images whose overall pixel variance exceeds ``threshold``.

    The variance is ``np.var`` over all values of the image array (every
    pixel and channel together). The returned list holds the original
    objects, in input order.
    """
    return [
        picture
        for picture in images
        if np.var(np.array(picture)) > threshold
    ]

In [None]:
# Keep only the demo quarters whose pixel variance is above 2500.
most_colorful_image = get_images_with_high_color_variance(quarters , 2500)
if most_colorful_image:
    # Plot each surviving quarter; unused cells of the 2x2 grid stay empty
    fig, axes = plt.subplots(2, 2, figsize=(10, 10))
    for i, quarter in enumerate(most_colorful_image):
        axes[i//2, i%2].imshow(quarter)
        axes[i//2, i%2].axis('off')
    plt.show()

In [None]:
def mirror_flip(image):
    """Return ``image`` flipped with ``cv2.flip`` and flip code 1 (horizontal mirror)."""
    return cv2.flip(image, 1)


def aug_pipeline(images, threshold):
    """Augment each image with its mirror and its high-variance quarters.

    For every input image the output contains, in order: the image itself,
    its horizontal mirror, and those of its four upscaled quarters whose
    pixel variance exceeds ``threshold``.
    """
    augmented_images = []

    for picture in images:
        _unused, quarter_images = split_and_resize_quarters(picture)
        kept_quarters = get_images_with_high_color_variance(quarter_images, threshold)
        flipped = mirror_flip(picture)

        # Extending with an empty list is a no-op, so no emptiness check is needed.
        augmented_images.extend([picture, flipped, *kept_quarters])

    return augmented_images

In [None]:
# Run the augmentation and compare the dataset size before and after.
print(len(monet_image_list))
augmented_images = aug_pipeline(monet_image_list, 2500)
print(len(augmented_images))
plot_random_images(augmented_images)

# save the data in batch for model

In [None]:
# for debug
# augmented_images = augmented_images[:50]
# image_list = image_list[:100]

In [None]:
def decode_image(image):
    """Scale pixel values from [0, 255] to [-1, 1] and reshape to (320, 320, 3)."""
    as_float = tf.cast(image, tf.float32)
    scaled = (as_float / 127.5) - 1
    return tf.reshape(scaled, [320,320, 3])


def encode_image(image):
    """Map values from [-1, 1] back to uint8 pixels in [0, 255].

    Inverse of the scaling in ``decode_image``; values outside the range
    are clipped before the cast.
    """
    pixels = tf.clip_by_value((image + 1) * 127.5, 0, 255)
    return tf.cast(pixels, tf.uint8)



In [None]:
# Draw as many photos as there are augmented Monet images so both domains
# have the same size. random.sample raises ValueError when asked for more
# items than the population holds, so cap the request at the photo count.
image_list = random.sample(image_list, min(len(image_list), len(augmented_images)))

In [None]:
# Define batch size
BATCH_SIZE = 10 # need to select the batch size based on the gpu

# Normalise every image to a float32 tensor in [-1, 1].
monet_len, real_len = len(augmented_images), len(image_list)
decoded_monet_images = [decode_image(image) for image in augmented_images]
decoded_real_images = [decode_image(image) for image in image_list]
print(len(decoded_monet_images) ,monet_len )
print(len(decoded_real_images) , real_len)




# Sequential 80% / 10% / 10% split of the Monet images into train, val and test
train_monet = decoded_monet_images[:int(0.8 * monet_len)]
val_monet = decoded_monet_images[int(0.8 * monet_len):int(0.9 * monet_len)]
test_monet = decoded_monet_images[int(0.9 * monet_len):]

# Same 80% / 10% / 10% split for the real photos
train_real = decoded_real_images[:int(0.8 * real_len)]
val_real = decoded_real_images[int(0.8 * real_len):int(0.9 * real_len)]
test_real = decoded_real_images[int(0.9 * real_len):]


# Convert lists to TensorFlow datasets (the names are rebound from lists to Datasets)
train_real = tf.data.Dataset.from_tensor_slices(train_real)
val_real = tf.data.Dataset.from_tensor_slices(val_real)
test_real = tf.data.Dataset.from_tensor_slices(test_real)

train_monet = tf.data.Dataset.from_tensor_slices(train_monet)
val_monet = tf.data.Dataset.from_tensor_slices(val_monet)
test_monet = tf.data.Dataset.from_tensor_slices(test_monet)



# Paired datasets: each element is a (real, monet) tuple
train_dataset = tf.data.Dataset.zip((train_real ,train_monet))
test_dataset = tf.data.Dataset.zip((test_real ,test_monet))
val_dataset = tf.data.Dataset.zip((val_real , val_monet))


# Shuffle the training pairs and batch all three paired datasets.
# The shuffle acts on already-zipped pairs, so a given photo stays paired
# with the same Monet image.
train_dataset = train_dataset.shuffle(buffer_size=len(train_real)).batch(BATCH_SIZE)
val_dataset = val_dataset.batch(BATCH_SIZE)
test_dataset = test_dataset.batch(BATCH_SIZE)

# Single-domain datasets (rebound again, now shuffled and batched).
train_monet = train_monet.shuffle(buffer_size=len(train_monet)).batch(BATCH_SIZE)
train_real = train_real.shuffle(buffer_size=len(train_real)).batch(BATCH_SIZE)
val_monet = val_monet.shuffle(buffer_size=len(val_monet)).batch(BATCH_SIZE)
val_real = val_real.shuffle(buffer_size=len(val_real)).batch(BATCH_SIZE)

In [None]:
import matplotlib.pyplot as plt
import tensorflow as tf

def plot_images_from_batch(dataset, title, img_to_plot=10):
    """Plot Monet images (top row) and real images (bottom row) from one batch.

    Args:
        dataset: Object whose ``take(1)`` yields ``(real_batch, monet_batch)``
            pairs of images scaled to [-1, 1].
        title: Figure super-title.
        img_to_plot: Maximum number of columns; capped at the batch size.
    """
    for real_batch, monet_batch in dataset.take(1):
        # Never index past the end of the batch: a batch may hold fewer
        # images than requested (small BATCH_SIZE or a short final batch).
        n_cols = min(img_to_plot, int(real_batch.shape[0]), int(monet_batch.shape[0]))

        plt.figure(figsize=(12, 8))

        for i in range(n_cols):
            # Top row: Monet images
            plt.subplot(2, n_cols, i + 1)
            plt.imshow(encode_image(monet_batch[i]))
            plt.title("Monet")
            plt.axis("off")

            # Bottom row: real images (subplot index offset by one full row)
            plt.subplot(2, n_cols, i + 1 + n_cols)
            plt.imshow(encode_image(real_batch[i]))
            plt.title("Real")
            plt.axis("off")

        plt.suptitle(title)
        plt.show()

# Example usage:
# Show the first five (real, Monet) pairs of one training batch.
plot_images_from_batch(train_dataset, "Monet and Real Images", img_to_plot=5)


# create the objects that are necessary for the training

In [None]:

def downsample(filters, size, apply_batchnorm=True):
    """Build a stride-2 encoder block: Conv2D -> (BatchNorm) -> LeakyReLU.

    The convolution uses 'same' padding, no bias, and kernels drawn from a
    normal distribution (mean 0, stddev 0.02).
    """
    block = tf.keras.Sequential()
    block.add(
        tf.keras.layers.Conv2D(
            filters,
            size,
            strides=2,
            padding='same',
            kernel_initializer=tf.random_normal_initializer(0., 0.02),
            use_bias=False,
        )
    )

    if apply_batchnorm:
        block.add(tf.keras.layers.BatchNormalization())

    block.add(tf.keras.layers.LeakyReLU())
    return block


# Smoke test: push one batch of real photos through a downsample block.
sample, _ = next(iter(train_dataset))  # first element of the (real, monet) pair
sample_float32 = tf.cast(sample, tf.float32)

down_model = downsample(3, 4)
down_result = down_model(sample_float32)
print(down_result.shape)

In [None]:

def upsample(filters, size, apply_dropout=False):
    """Build a stride-2 decoder block: Conv2DTranspose -> BatchNorm -> (Dropout) -> ReLU.

    The transposed convolution uses 'same' padding, no bias, and kernels
    drawn from a normal distribution (mean 0, stddev 0.02). Dropout, when
    enabled, uses rate 0.5.
    """
    block = tf.keras.Sequential()
    block.add(
        tf.keras.layers.Conv2DTranspose(
            filters,
            size,
            strides=2,
            padding='same',
            kernel_initializer=tf.random_normal_initializer(0., 0.02),
            use_bias=False,
        )
    )
    block.add(tf.keras.layers.BatchNormalization())

    if apply_dropout:
        block.add(tf.keras.layers.Dropout(0.5))

    block.add(tf.keras.layers.ReLU())
    return block
     


# Smoke test: upsample the downsampled batch from the previous cell.
up_model = upsample(3, 4)
up_result = up_model(down_result)
print (up_result.shape)

In [None]:


# Define Generator model
def Generator():
  """Build a U-Net style generator for 320x320x3 images.

  Six stride-2 downsample blocks take the input from 320x320 to 5x5; five
  upsample blocks with skip connections bring it back to 160x160, and a
  final transposed convolution with tanh produces a 320x320x3 output.
  """
  inputs = tf.keras.layers.Input(shape=[320, 320, 3])
  
  # Each block halves the spatial size (stride 2, 'same' padding).
  down_stack = [
    downsample(80, 4, apply_batchnorm=False),  # (batch_size, 160, 160, 80)
    downsample(160, 4),  # (batch_size, 80, 80, 160)
    downsample(320, 4),  # (batch_size, 40, 40, 320)
    downsample(640, 4),  # (batch_size, 20, 20, 640)
    downsample(640, 4),  # (batch_size, 10, 10, 640)
    downsample(640, 4),  # (batch_size, 5, 5, 640)
    # downsample(640, 4),  # (batch_size, 2, 2, 512)
    # downsample(640, 4),  # (batch_size, 1, 1, 512)
  ]

  # Shapes below are after concatenation with the matching skip connection.
  up_stack = [
    # upsample(640, 4, apply_dropout=True),  # (batch_size, 2, 2, 1024)
    # upsample(640, 4, apply_dropout=True),  # (batch_size, 4, 4, 1024)
    upsample(640, 4, apply_dropout=True),  # (batch_size, 10, 10, 1280)
    upsample(640, 4 ,apply_dropout=True),  # (batch_size, 20, 20, 1280)
    upsample(320, 4),  # (batch_size, 40, 40, 640)
    upsample(160, 4),  # (batch_size, 80, 80, 320)
    upsample(80, 4),  # (batch_size, 160, 160, 160)
  ]

  initializer = tf.random_normal_initializer(0., 0.02)
  last = tf.keras.layers.Conv2DTranspose(3, 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=initializer,
                                         activation='tanh')  # (batch_size, 320, 320, 3)



  x = inputs


  # Downsampling through the model
  skips = []
  for down in down_stack:
    x = down(x)
    skips.append(x)

  # Drop the bottleneck output (it is the decoder input, not a skip) and
  # reverse so the deepest skip is consumed first.
  skips = reversed(skips[:-1])

  # Upsampling and establishing the skip connections
  for up, skip in zip(up_stack, skips):
    x = up(x)
    x = tf.keras.layers.Concatenate()([x, skip])

  x = last(x)
     


  return tf.keras.Model(inputs=inputs, outputs=x)

# Test the generator model with random input batch
def test_build_generator():
    """Build a generator and check that it maps a 320x320x3 batch to the same shape.

    Raises:
        AssertionError: if the output shape differs from the input shape.
    """
    generator = Generator()

    input_shape = (320, 320, 3)
    batch_size = 4
    random_input_batch = tf.random.uniform((batch_size,) + input_shape, minval=0, maxval=1)

    # Forward pass through the generator
    generated_images = generator(random_input_batch)

    # summary() prints the table itself and returns None, so wrapping it in
    # print() only appended a stray "None" line to the output.
    generator.summary()

    output_shape = generated_images.shape
    expected_output_shape = (batch_size, 320, 320, 3)

    assert output_shape == expected_output_shape, "Output shape mismatch"
    print("Test passed!")

# Run the test
# Run the generator shape check (builds a fresh model each time).
test_build_generator()

In [None]:
# Define Discriminator model
def Discriminator(input_nc=3, ndf=80, n_layers=3):
    """Build a convolutional patch discriminator for 320x320 inputs.

    ``n_layers + 1`` stride-2 blocks (Conv2D -> LeakyReLU(0.2), followed by
    BatchNorm on all but the last block) are applied, with the filter count
    ``ndf * min(2**n, 5)`` for block ``n``. A final stride-1 convolution
    with one filter and no activation produces the output map.
    """
    inputs = tf.keras.layers.Input(shape=(320, 320, input_nc))

    features = inputs
    for depth in range(n_layers + 1):
        channels = ndf * min(2**depth, 5)
        features = tf.keras.layers.Conv2D(channels, kernel_size=4, strides=2, padding='same')(features)
        features = tf.keras.layers.LeakyReLU(0.2)(features)
        if depth >= n_layers:
            # The last block is not followed by batch normalisation.
            continue
        features = tf.keras.layers.BatchNormalization()(features)

    # One raw score per spatial location.
    patch_scores = tf.keras.layers.Conv2D(1, kernel_size=4, strides=1, padding='same')(features)

    return tf.keras.Model(inputs=inputs, outputs=patch_scores)


# Test the discriminator model with random input batch
def test_build_discriminator():
    """Build a discriminator and check its output shape on a random batch.

    With the default four stride-2 blocks, a 320x320 input is expected to
    yield a 20x20x1 score map.

    Raises:
        AssertionError: if the output shape is not ``(batch, 20, 20, 1)``.
    """
    input_shape = (320, 320, 3)

    discriminator = Discriminator(3)

    batch_size = 4
    random_input_batch = tf.random.uniform((batch_size,) + input_shape, minval=0, maxval=1)

    # Forward pass through the discriminator
    discriminator_output = discriminator(random_input_batch)

    output_shape = discriminator_output.shape
    print(output_shape)
    # summary() prints the table itself and returns None, so wrapping it in
    # print() only appended a stray "None" line to the output.
    discriminator.summary()
    expected_output_shape = (batch_size, 20, 20, 1)

    assert output_shape == expected_output_shape, "Output shape mismatch"
    print("Test passed!")

# Run the test
# Run the discriminator shape check (builds a fresh model each time).
test_build_discriminator()

# step 1

In [None]:
import tensorflow as tf
import pandas as pd

# Define the optimizer (legacy Adam with its default hyper-parameters)
optimizer = legacy.Adam()

# Reconstruction loss for pre-training: mean squared error between the
# input batch and the generator output
loss_function = tf.keras.losses.MeanSquaredError()

# Define a function for training a single model
def train_model(model, train_dataset, val_dataset, optimizer, loss_function, num_epochs):
    """Train ``model`` to reconstruct its own input and record the losses.

    Args:
        model: Keras model called as ``model(batch, training=...)``.
        train_dataset: Iterable of training batches.
        val_dataset: Iterable of validation batches.
        optimizer: Optimizer exposing ``apply_gradients``.
        loss_function: Callable ``loss_function(target, prediction)``; the
            target is the input batch itself.
        num_epochs: Number of passes over ``train_dataset``.

    Returns:
        A DataFrame with columns 'Epoch' (1-based), 'Training Loss' and
        'Validation Loss' holding the per-epoch mean batch loss. A loss is
        NaN when the corresponding dataset yielded no batches.
    """
    model.compile(optimizer=optimizer, loss=loss_function)

    train_losses = []
    val_losses = []

    # Epochs are numbered from 1 everywhere (progress bars, log line and
    # DataFrame) so the outputs agree with each other.
    for epoch in range(1, num_epochs + 1):
        train_total, train_batches = 0.0, 0
        for batch in tqdm(train_dataset, desc=f'Epoch {epoch} [train]'):
            with tf.GradientTape() as tape:
                generated_images = model(batch, training=True)
                loss = loss_function(batch, generated_images)
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            # Accumulate plain floats and count batches ourselves so the
            # average needs neither len(dataset) nor tensor arithmetic.
            train_total += float(loss)
            train_batches += 1

        val_total, val_batches = 0.0, 0
        for val_batch in tqdm(val_dataset, desc=f'Epoch {epoch} [val]'):
            val_generated_images = model(val_batch, training=False)
            val_total += float(loss_function(val_batch, val_generated_images))
            val_batches += 1

        # An empty dataset gives NaN instead of a ZeroDivisionError.
        train_epoch_loss = train_total / train_batches if train_batches else float('nan')
        val_epoch_loss = val_total / val_batches if val_batches else float('nan')

        train_losses.append(train_epoch_loss)
        val_losses.append(val_epoch_loss)

        print(f'Epoch {epoch}, Training Loss: {train_epoch_loss}, Validation Loss: {val_epoch_loss}')

    losses_df = pd.DataFrame({'Epoch': range(1, num_epochs + 1), 'Training Loss': train_losses, 'Validation Loss': val_losses})

    return losses_df

num_epochs = 20
# Each model gets its own Adam instance: an optimizer keeps a step counter
# and per-variable moment estimates, so reusing one instance would start the
# second model with the first model's step count.

# Train monet_gen (reconstructs Monet images)
print("Training monet_gen:")
monet_gen_trained = Generator()
monet_losses = train_model(monet_gen_trained, train_monet, val_monet, legacy.Adam(), loss_function, num_epochs)

# Train real_gen (reconstructs real photos)
print("Training real_gen:")
real_gen_trained = Generator()
real_losses = train_model(real_gen_trained, train_real, val_real, legacy.Adam(), loss_function, num_epochs)



In [None]:
# Nothing earlier in the notebook creates "pre_train", so create it here;
# otherwise writing the weight files fails on a fresh checkout.
os.makedirs("pre_train", exist_ok=True)
# NOTE(review): the photo-trained model is stored as "m2r" and the
# Monet-trained one as "r2m", presumably named after the domain each should
# output in the CycleGAN. Confirm this mapping is intended.
real_gen_trained.save_weights(os.path.join("pre_train", 'generator_m2r.h5'))
monet_gen_trained.save_weights(os.path.join("pre_train", 'generator_r2m.h5'))



In [None]:
# Save the losses DataFrame to a CSV file
# Persist the pre-training loss curves next to the weights.
# NOTE(review): assumes the "pre_train" folder already exists.
monet_losses.to_csv('pre_train/monet_losses.csv', index=False)
real_losses.to_csv('pre_train/real_losses.csv', index=False)


In [None]:
# Reload the saved losses and weights so the cells below can run without
# repeating the pre-training.
monet_losses = pd.read_csv('pre_train/monet_losses.csv')
real_losses = pd.read_csv('pre_train/real_losses.csv')

# Fresh models with the same architecture; the weights are restored below.
monet_gen_trained = Generator()
real_gen_trained = Generator()

# File names mirror the save cell: photo-trained model <-> "m2r",
# Monet-trained model <-> "r2m".
real_gen_trained.load_weights(os.path.join("pre_train", f'generator_m2r.h5'))
monet_gen_trained.load_weights(os.path.join("pre_train", f'generator_r2m.h5'))

In [None]:
# Plot
# Loss curves of the Monet pre-training run.
plt.figure(figsize=(10, 6))
plt.plot(monet_losses['Epoch'], monet_losses['Training Loss'], label='Training Loss')
plt.plot(monet_losses['Epoch'], monet_losses['Validation Loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Losses')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Plot
# Loss curves of the real-photo pre-training run (same layout and title
# as the Monet plot).
plt.figure(figsize=(10, 6))
plt.plot(real_losses['Epoch'], real_losses['Training Loss'], label='Training Loss')
plt.plot(real_losses['Epoch'], real_losses['Validation Loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Losses')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
def generate_images(generator, img):
    """Run ``generator.predict`` on a single image and return the single result.

    A leading batch axis is added before the call and stripped afterwards.
    """
    single_item_batch = np.expand_dims(img, axis=0)
    return generator.predict(single_item_batch)[0]

# Compare the first test photo and the first test Monet image with the
# reconstruction produced by the matching pre-trained generator.
for real_batch ,monet_batch in test_dataset.take(1):
    # Input photo
    real_image = encode_image(real_batch[0])
    plt.imshow(real_image)
    plt.title("Real")
    plt.axis("off")
    plt.show(block = False)

    # Photo reconstructed by the photo-trained generator
    real_generate = generate_images(real_gen_trained , real_batch[0])
    real_generate_img = encode_image(real_generate)
    plt.imshow(real_generate_img)
    plt.title("real_generate")
    plt.axis("off")
    plt.show(block = False)


    print("-"*60)
    # Input Monet image
    monet_image = encode_image(monet_batch[0])
    plt.imshow(monet_image)
    plt.title("Monet")
    plt.axis("off")
    plt.show(block = False)

    # Monet image reconstructed by the Monet-trained generator
    monet_generate = generate_images(monet_gen_trained , monet_batch[0])
    monet_generate_img = encode_image(monet_generate)
    plt.imshow(monet_generate_img)
    plt.title("monet_generate")
    plt.axis("off")
    plt.show(block = False)


# step 2


In [None]:
class CycleGan(tf.keras.Model):
    """CycleGAN wrapper around two generators and two discriminators.

    ``generator_real2monet`` maps photos to Monet-style images and
    ``generator_monet2real`` maps the other way; each discriminator scores
    images of the domain it is named after. ``train_step`` and ``test_step``
    take a ``(batch_real, batch_monet)`` tuple and return a dict with the
    keys ``gen_R2M_loss``, ``gen_M2R_loss``, ``disc_real_loss`` and
    ``disc_monet_loss``.
    """

    def __init__(self, generator_real2monet, generator_monet2real, discriminator_real, discriminator_monet, lr=5e-4, beta_1=0.5):
        """Store the four networks and the Adam hyper-parameters used by ``compile``."""
        super(CycleGan, self).__init__()
        self.generator_real2monet = generator_real2monet
        self.generator_monet2real = generator_monet2real
        self.discriminator_real = discriminator_real
        self.discriminator_monet = discriminator_monet
        self.lr = lr
        self.beta_1 = beta_1

    def compile(self, mac=True):
        """Attach a separate Adam optimizer to each of the four networks.

        Args:
            mac: When true use the legacy Adam implementation, otherwise
                ``tf.keras.optimizers.Adam``.
        """
        def make_optimizer():
            if mac:
                return legacy.Adam(learning_rate=self.lr, beta_1=self.beta_1)
            return tf.keras.optimizers.Adam(learning_rate=self.lr, beta_1=self.beta_1)

        # One optimizer per network: an optimizer carries a step counter and
        # per-variable state, and a single shared instance would advance its
        # counter four times per training step. The non-legacy optimizers may
        # also reject variables they were not built for.
        for network in (self.generator_real2monet, self.generator_monet2real,
                        self.discriminator_real, self.discriminator_monet):
            network.compile(optimizer=make_optimizer())

    def adversarial_loss(self, discriminator, generated):
        """Cross-entropy of the discriminator scores for ``generated`` against all-ones labels."""
        fake_output = discriminator(generated)
        return tf.reduce_mean(tf.keras.losses.BinaryCrossentropy(from_logits=True)(tf.ones_like(fake_output), fake_output))

    def cycle_consistency_loss(self, real_images, reconstructed_images):
        """Mean absolute difference between images and their round-trip reconstruction."""
        return tf.reduce_mean(tf.abs(real_images - reconstructed_images))

    def identity_loss(self , real_image, generated_image):
        """Mean squared difference between an image and the generator output for it."""
        loss = tf.reduce_mean(tf.square(real_image - generated_image))
        return loss

    def discriminator_loss(self ,real_output, fake_output):
        """Sum of the cross-entropy on real scores (label 1) and fake scores (label 0)."""
        bce_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)

        real_labels = tf.ones_like(real_output)
        fake_labels = tf.zeros_like(fake_output)

        real_loss = bce_loss(real_labels, real_output)
        fake_loss = bce_loss(fake_labels, fake_output)

        return real_loss + fake_loss

    def _cycle_losses(self, batch_real, batch_monet, training):
        """Run every forward pass and return the four losses as a dict.

        Shared by ``train_step`` (``training=True``) and ``test_step``
        (``training=False``) so the two cannot drift apart.
        """
        # Translations into the other domain
        fake_monet = self.generator_real2monet(batch_real, training=training)
        fake_real = self.generator_monet2real(batch_monet, training=training)

        # Round trips back to the source domain
        reconstr_real = self.generator_monet2real(fake_monet, training=training)
        reconstr_monet = self.generator_real2monet(fake_real, training=training)

        # Each generator applied to an image already in its target domain
        same_real = self.generator_monet2real(batch_real, training=training)
        same_monet = self.generator_real2monet(batch_monet, training=training)

        adv_loss_R2M = self.adversarial_loss(self.discriminator_monet, fake_monet)
        adv_loss_M2R = self.adversarial_loss(self.discriminator_real, fake_real)

        cycle_loss = (self.cycle_consistency_loss(batch_real, reconstr_real)
                      + self.cycle_consistency_loss(batch_monet, reconstr_monet))

        identity_loss_real = self.identity_loss(batch_real, same_real)
        identity_loss_monet = self.identity_loss(batch_monet, same_monet)

        # Generator objective: 2 x adversarial + cycle + 2 x identity
        total_gen_R2M_loss = 2*adv_loss_R2M + cycle_loss + 2*identity_loss_monet
        total_gen_M2R_loss = 2*adv_loss_M2R + cycle_loss + 2*identity_loss_real

        disc_real_real_output = self.discriminator_real(batch_real, training=training)
        disc_real_fake_output = self.discriminator_real(fake_real, training=training)
        disc_monet_real_output = self.discriminator_monet(batch_monet, training=training)
        disc_monet_fake_output = self.discriminator_monet(fake_monet, training=training)
        disc_real_loss = self.discriminator_loss(disc_real_real_output, disc_real_fake_output)
        disc_monet_loss = self.discriminator_loss(disc_monet_real_output, disc_monet_fake_output)

        return {
            "gen_R2M_loss": total_gen_R2M_loss,
            "gen_M2R_loss": total_gen_M2R_loss,
            "disc_real_loss": disc_real_loss,
            "disc_monet_loss": disc_monet_loss
        }

    def train_step(self, batch_data):
        """Run one optimisation step on all four networks and return the losses."""
        batch_real, batch_monet = batch_data

        # persistent=True because four separate gradients are taken from one tape.
        with tf.GradientTape(persistent=True) as tape:
            losses = self._cycle_losses(batch_real, batch_monet, training=True)

        # (network, loss it minimises); generators first, then discriminators.
        updates = (
            (self.generator_real2monet, losses["gen_R2M_loss"]),
            (self.generator_monet2real, losses["gen_M2R_loss"]),
            (self.discriminator_real, losses["disc_real_loss"]),
            (self.discriminator_monet, losses["disc_monet_loss"]),
        )

        # All gradients come from the same recorded forward pass.
        gradients = [tape.gradient(loss, network.trainable_variables) for network, loss in updates]
        # A persistent tape holds its resources until it is dropped explicitly.
        del tape

        for (network, _), grads in zip(updates, gradients):
            network.optimizer.apply_gradients(zip(grads, network.trainable_variables))

        return losses

    def test_step(self, batch_data):
        """Compute the same four losses in inference mode, without updating weights."""
        batch_real, batch_monet = batch_data
        return self._cycle_losses(batch_real, batch_monet, training=False)

In [None]:
# List the GPUs TensorFlow can see (an empty list means none were found).
print("GPU Available:", tf.config.list_physical_devices('GPU'))

In [None]:
# Fresh, randomly initialised networks; loading the pre-trained generator
# weights is currently disabled (see the commented lines).
generator_real2monet, generator_monet2real = Generator(),Generator()
# generator_real2monet.load_weights('pre_train/generator_r2m.h5')
# generator_monet2real.load_weights('pre_train/generator_m2r.h5')
discriminator_real, discriminator_monet = Discriminator(),Discriminator()
lr = 0.0001
beta_1 = 0.5

# Checkpoints go to a folder named after the hyper-parameters.
models_folder = f"models/lr_{lr}_beta_{beta_1}"
os.makedirs(models_folder, exist_ok=True)
os.makedirs("results", exist_ok=True)
cycle_gan_model = CycleGan(generator_real2monet, generator_monet2real, discriminator_real, discriminator_monet, lr=lr, beta_1=beta_1)

# Compile the model
cycle_gan_model.compile()

num_epochs = 30
eval_interval = 2  

# Column-oriented log; every list must end up with the same length so it
# can be turned into a DataFrame at the end.
log_data = {'Epoch': [], 
            'Gen R2M Loss Train': [], 'Gen M2R Loss Train': [], 
            'Disc Real Loss Train': [], 'Disc Monet Loss Train': [],
            'Gen R2M Loss Eval': [], 'Gen M2R Loss Eval': [], 
            'Disc Real Loss Eval': [], 'Disc Monet Loss Eval': []}

for epoch in range(1, num_epochs + 1):
    # Per-batch training losses of this epoch
    train_losses = {'Gen R2M Loss': [], 'Gen M2R Loss': [], 'Disc Real Loss': [], 'Disc Monet Loss': []}

    for batch_real, batch_monet in tqdm(train_dataset, desc=f'Epoch {epoch}'):
        # Perform one training step
        train_logs = cycle_gan_model.train_step((batch_real, batch_monet))

        # Log training metrics
        train_losses['Gen R2M Loss'].append(train_logs['gen_R2M_loss'])
        train_losses['Gen M2R Loss'].append(train_logs['gen_M2R_loss'])
        train_losses['Disc Real Loss'].append(train_logs['disc_real_loss'])
        train_losses['Disc Monet Loss'].append(train_logs['disc_monet_loss'])

    # Calculate average training metrics (keys gain a ' Train' suffix to match log_data)
    avg_train_losses = {k + ' Train': np.mean(v) for k, v in train_losses.items()}
    
    # Checkpoint all four networks after every epoch
    cycle_gan_model.generator_real2monet.save_weights(os.path.join(models_folder, f'generator_real2monet_epoch_{epoch}.h5'))
    cycle_gan_model.generator_monet2real.save_weights(os.path.join(models_folder, f'generator_monet2real_epoch_{epoch}.h5'))
    cycle_gan_model.discriminator_real.save_weights(os.path.join(models_folder, f'discriminator_real_epoch_{epoch}.h5'))
    cycle_gan_model.discriminator_monet.save_weights(os.path.join(models_folder, f'discriminator_monet_epoch_{epoch}.h5'))
    
    # Evaluate the model every eval_interval epochs.
    # NOTE(review): training averages are only logged inside this branch, so
    # epochs that are not evaluated leave no row in the log.
    if epoch % eval_interval == 0:
        eval_metrics = {'Gen R2M Loss': 0.0, 'Gen M2R Loss': 0.0, 'Disc Real Loss': 0.0, 'Disc Monet Loss': 0.0}  # Initialize evaluation metrics
        num_batches = 0

        # Initialize tqdm for the evaluation dataset
        for batch_real_eval, batch_monet_eval in tqdm(val_dataset, desc=f'Eval at Epoch {epoch}'):
            # Perform one evaluation step
            eval_logs = cycle_gan_model.test_step((batch_real_eval, batch_monet_eval))

            # Accumulate evaluation metrics
            eval_metrics['Gen R2M Loss'] += eval_logs['gen_R2M_loss']
            eval_metrics['Gen M2R Loss'] += eval_logs['gen_M2R_loss']
            eval_metrics['Disc Real Loss'] += eval_logs['disc_real_loss']
            eval_metrics['Disc Monet Loss'] += eval_logs['disc_monet_loss']
            num_batches += 1

        # Calculate average evaluation metrics
        # NOTE(review): divides by zero if val_dataset yields no batches.
        for metric in eval_metrics:
            eval_metrics[metric] /= num_batches

        # Log evaluation metrics (keys gain an ' Eval' suffix to match log_data)
        avg_eval_metrics = {k + ' Eval': float(v) for k, v in eval_metrics.items()}

        # Append metrics to log_data
        log_data['Epoch'].append(epoch)
        for key, value in avg_train_losses.items():
            if key in log_data:
                log_data[key].append(value)
            else:
                log_data[key] = [value]

        for key, value in avg_eval_metrics.items():
            if key in log_data:
                log_data[key].append(value)
            else:
                log_data[key] = [value]

        print(log_data)

# Convert log_data to a pandas DataFrame
log_df = pd.DataFrame(log_data)

# Save log_df to CSV file (written only once, after the last epoch)
log_df.to_csv(f'results/log_lr_{lr}_beta_{beta_1}.csv', index=False)

In [None]:
def plot_losses(df_losses):
    """Draw every logged train and eval loss curve on a single figure.

    Args:
        df_losses: table indexable by column name that provides an 'Epoch'
            column plus the eight loss columns listed in ``curve_labels``.

    The x axis is the 'Epoch' column; each loss column becomes one line with
    circle markers and a legend entry equal to its column name. The figure is
    shown with ``plt.show()``; nothing is returned.
    """
    curve_labels = (
        'Gen R2M Loss Train',
        'Gen M2R Loss Train',
        'Disc Real Loss Train',
        'Disc Monet Loss Train',
        'Gen R2M Loss Eval',
        'Gen M2R Loss Eval',
        'Disc Real Loss Eval',
        'Disc Monet Loss Eval',
    )

    x_values = df_losses['Epoch']
    # Resolve all columns up front so a missing column raises before any
    # figure is created.
    curves = [(label, df_losses[label]) for label in curve_labels]

    plt.figure(figsize=(12, 8))

    # Training curves first, then evaluation curves (order of curve_labels).
    for label, values in curves:
        plt.plot(x_values, values, label=label, marker='o')

    # Axis labels, title, legend and grid
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss Over Epochs')
    plt.legend()
    plt.grid(True)

    plt.show()

In [None]:
def generate_images(generator, img):
    """Run ``generator.predict`` on a single image and return the single result.

    Args:
        generator: object exposing ``predict(batch)``.
        img: one image without a batch axis.

    Returns:
        The first (and only) element of the prediction for the one-item batch.
    """
    # predict() is called on a batch, so add a leading batch axis of size 1.
    single_item_batch = np.expand_dims(img, axis=0)
    predictions = generator.predict(single_item_batch)
    return predictions[0]


def _show_image(image, title):
    """Display ``image`` in its own figure with ``title`` and the axes hidden."""
    plt.imshow(image)
    plt.title(title)
    plt.axis("off")
    plt.show(block = False)


def plot_predictions(test_dataset , generator_monet2real , generator_real2monet):
    """Show both translation cycles for the first sample of one test batch.

    Takes a single ``(real_batch, monet_batch)`` pair via
    ``test_dataset.take(1)`` and, using element 0 of each batch, displays:

    1. the real image, its real->monet translation, and the monet->real
       translation of that result;
    2. a separator line, then the Monet image, its monet->real translation,
       and the real->monet translation of that result.

    Every image is passed through ``encode_image`` before being displayed.

    Args:
        test_dataset: dataset exposing ``take(n)`` and yielding
            ``(real_batch, monet_batch)`` pairs.
        generator_monet2real: generator used for the monet->real direction.
        generator_real2monet: generator used for the real->monet direction.
    """
    for real_batch, monet_batch in test_dataset.take(1):
        # Cycle 1: real -> monet -> real
        real_input = real_batch[0]
        _show_image(encode_image(real_input), "Real")

        monet_generate = generate_images(generator_real2monet, real_input)
        _show_image(encode_image(monet_generate), "monet_generate")

        real_generate = generate_images(generator_monet2real, monet_generate)
        _show_image(encode_image(real_generate), "real_generate")

        print("-"*60)

        # Cycle 2: monet -> real -> monet
        monet_input = monet_batch[0]
        _show_image(encode_image(monet_input), "Monet")

        real_generate = generate_images(generator_monet2real, monet_input)
        # Title was misspelled "realt_generate"; now matches the first cycle.
        _show_image(encode_image(real_generate), "real_generate")

        monet_generate = generate_images(generator_real2monet, real_generate)
        _show_image(encode_image(monet_generate), "monet_generate")

In [None]:
# Load the loss logs of three runs (identity, pretrain_identity, pretrain),
# all with lr=0.0001 / beta=0.5 in the file name, and preview the first rows
# of each. The resulting frames are plotted by the cells that follow.
df_losses_identity = pd.read_csv('results/log_lr_0.0001_beta_0.5_identity.csv')
print(df_losses_identity.head())

df_losses_pretrain_identity = pd.read_csv('results/log_lr_0.0001_beta_0.5_pretrain_identity.csv')
print(df_losses_pretrain_identity.head())

df_losses_pretrain = pd.read_csv('results/log_lr_0.0001_beta_0.5_pretrain.csv')
print(df_losses_pretrain.head())

In [None]:
# Loss curves read from results/log_lr_0.0001_beta_0.5_identity.csv
plot_losses(df_losses_identity)

In [None]:
# Loss curves read from results/log_lr_0.0001_beta_0.5_pretrain_identity.csv
plot_losses(df_losses_pretrain_identity)

In [None]:
# Loss curves read from results/log_lr_0.0001_beta_0.5_pretrain.csv
plot_losses(df_losses_pretrain)

In [None]:
# Checkpoint to preview: epoch 30 of the "identity" run.
models_folder = "models/lr_0.0001_beta_0.5_identity"
best_epoch = 30

# Build two fresh generators, then restore the weights saved for that epoch.
generator_real2monet = Generator()
generator_monet2real = Generator()
generator_real2monet.load_weights(
    os.path.join(models_folder, f'generator_real2monet_epoch_{best_epoch}.h5')
)
generator_monet2real.load_weights(
    os.path.join(models_folder, f'generator_monet2real_epoch_{best_epoch}.h5')
)

# NOTE(review): the label says "batch", but 30 matches best_epoch above.
print("identity batch 30")
plot_predictions(test_dataset, generator_monet2real, generator_real2monet)


In [None]:
# Checkpoint to preview: epoch 20 of the "pretrain" run.
models_folder = "models/lr_0.0001_beta_0.5_pretrain"
best_epoch = 20

# Build two fresh generators, then restore the weights saved for that epoch.
generator_real2monet = Generator()
generator_monet2real = Generator()
generator_real2monet.load_weights(
    os.path.join(models_folder, f'generator_real2monet_epoch_{best_epoch}.h5')
)
generator_monet2real.load_weights(
    os.path.join(models_folder, f'generator_monet2real_epoch_{best_epoch}.h5')
)

# NOTE(review): the label says "batch", but 20 matches best_epoch above.
print("pretrain batch 20")
plot_predictions(test_dataset, generator_monet2real, generator_real2monet)

In [None]:
# Checkpoint to preview: epoch 20 of the "pretrain_identity" run.
models_folder = "models/lr_0.0001_beta_0.5_pretrain_identity"
best_epoch = 20

# Build two fresh generators, then restore the weights saved for that epoch.
generator_real2monet = Generator()
generator_monet2real = Generator()
generator_real2monet.load_weights(
    os.path.join(models_folder, f'generator_real2monet_epoch_{best_epoch}.h5')
)
generator_monet2real.load_weights(
    os.path.join(models_folder, f'generator_monet2real_epoch_{best_epoch}.h5')
)

# NOTE(review): the label says "batch", but 20 matches best_epoch above.
print("pretrain_identity batch 20")
plot_predictions(test_dataset, generator_monet2real, generator_real2monet)

## Evaluate best model

In [None]:
# need to change all this 

In [None]:
# Generators evaluated below: epoch 30 of the "identity" run.
models_folder = "models/lr_0.0001_beta_0.5_identity"
best_epoch = 30

# Build two fresh generators, then restore the weights saved for that epoch.
generator_real2monet = Generator()
generator_monet2real = Generator()
generator_real2monet.load_weights(
    os.path.join(models_folder, f'generator_real2monet_epoch_{best_epoch}.h5')
)
generator_monet2real.load_weights(
    os.path.join(models_folder, f'generator_monet2real_epoch_{best_epoch}.h5')
)


In [None]:
import numpy as np
import tensorflow as tf
from scipy.spatial.distance import cosine

# Function to calculate FID
def calculate_fid(mu_real, sigma_real, mu_gen, sigma_gen):
    """Frechet distance between N(mu_real, sigma_real) and N(mu_gen, sigma_gen).

    FID = ||mu_real - mu_gen||^2
          + Tr(sigma_real) + Tr(sigma_gen) - 2 * Tr((sigma_real @ sigma_gen)^(1/2))

    The square root is the *matrix* square root. The previous implementation
    used ``np.sqrt``, which is element-wise: it returned a non-zero (even
    negative) value for identical distributions and NaN whenever the product
    had negative entries.

    Args:
        mu_real: mean feature vector of the reference set, shape (d,).
        sigma_real: feature covariance of the reference set, shape (d, d).
        mu_gen: mean feature vector of the generated set, shape (d,).
        sigma_gen: feature covariance of the generated set, shape (d, d).

    Returns:
        The FID as a floating-point scalar (0 for identical statistics).
    """
    # Local import: the file only imports scipy.spatial at module level.
    from scipy.linalg import sqrtm

    mu_real = np.atleast_1d(np.asarray(mu_real, dtype=np.float64))
    mu_gen = np.atleast_1d(np.asarray(mu_gen, dtype=np.float64))
    sigma_real = np.atleast_2d(np.asarray(sigma_real, dtype=np.float64))
    sigma_gen = np.atleast_2d(np.asarray(sigma_gen, dtype=np.float64))

    covmean = sqrtm(sigma_real.dot(sigma_gen))
    if not np.isfinite(covmean).all():
        # Near-singular product (typical when there are fewer samples than
        # feature dimensions): regularise the diagonals and retry.
        offset = np.eye(sigma_real.shape[0]) * 1e-6
        covmean = sqrtm((sigma_real + offset).dot(sigma_gen + offset))
    if np.iscomplexobj(covmean):
        # sqrtm can return tiny imaginary parts caused by round-off.
        covmean = covmean.real

    fid = (
        np.sum((mu_real - mu_gen)**2)
        + np.trace(sigma_real)
        + np.trace(sigma_gen)
        - 2 * np.trace(covmean)
    )
    return fid

# Function to calculate memorization distance
def calculate_memorization_distance(real_samples, generated_samples):
    """Average cosine distance from each generated sample to its nearest real sample.

    Args:
        real_samples: re-iterable collection of reference feature vectors.
        generated_samples: iterable of generated feature vectors.

    Returns:
        ``np.mean`` of the per-generated-sample minimum cosine distance.
        A generated sample with no real sample to compare against
        contributes ``np.inf``.
    """
    nearest_distances = []
    for generated in generated_samples:
        candidates = [cosine(generated, reference) for reference in real_samples]
        # Seed the search with +inf: a candidate only wins when it is strictly
        # smaller than the best value seen so far.
        nearest_distances.append(min([np.inf] + candidates))
    return np.mean(nearest_distances)

In [None]:

# Load a pre-trained Inception model for feature extraction
# (no classification head, global-average-pooled output).
# NOTE(review): InceptionV3 weights are normally used with
# tf.keras.applications.inception_v3.preprocess_input; confirm that the value
# range produced by encode_image is what you want to feed it.
inception_model = tf.keras.applications.InceptionV3(include_top=False, pooling='avg')

real_images_features_all = []
monet_images_features_all = []

generated_monet_features_all = []
generated_real_features_all = []

# Loop over the dataset
for real_batch, monet_batch in test_dataset:
    # Features of the real images of both domains.
    real_images_features = inception_model.predict(encode_image(real_batch))
    monet_images_features = inception_model.predict(encode_image(monet_batch))

    # The generators consume raw dataset batches (as in plot_predictions),
    # not encode_image output. Their results go through encode_image so that
    # Inception sees them in the same representation as the real images.
    # Previously the generators were fed encode_image(batch) and their raw
    # output went straight into Inception, making the feature sets
    # incomparable.
    generated_monet = generator_real2monet(real_batch, training=False)
    generated_real = generator_monet2real(monet_batch, training=False)
    generated_monet_features = inception_model.predict(encode_image(generated_monet))
    generated_real_features = inception_model.predict(encode_image(generated_real))

    # Accumulate features for the entire dataset
    real_images_features_all.append(real_images_features)
    monet_images_features_all.append(monet_images_features)
    generated_monet_features_all.append(generated_monet_features)
    generated_real_features_all.append(generated_real_features)

# Concatenate features from all batches into one (num_images, features) array each
real_images_features_all = np.concatenate(real_images_features_all)
monet_images_features_all = np.concatenate(monet_images_features_all)
generated_monet_features_all = np.concatenate(generated_monet_features_all)
generated_real_features_all = np.concatenate(generated_real_features_all)





In [None]:
# Calculate mean and covariance of features for real images
mu_real = np.mean(real_images_features_all, axis=0)
sigma_real = np.cov(real_images_features_all, rowvar=False)

# Calculate mean and covariance of features for monet images
mu_monet = np.mean(monet_images_features_all, axis=0)
sigma_monet = np.cov(monet_images_features_all, rowvar=False)

# Calculate mean and covariance of features for generated Monet images
mu_gen_monet = np.mean(generated_monet_features_all, axis=0)
sigma_gen_monet = np.cov(generated_monet_features_all, rowvar=False)

# Calculate mean and covariance of features for generated real images
mu_gen_real = np.mean(generated_real_features_all, axis=0)
sigma_gen_real = np.cov(generated_real_features_all, rowvar=False)

# Compute FID scores (each generated set against the real images of the
# domain it imitates)
fid_score_monet = calculate_fid(mu_monet, sigma_monet, mu_gen_monet, sigma_gen_monet)
fid_score_real = calculate_fid(mu_real, sigma_real, mu_gen_real, sigma_gen_real)

# Compute memorization distances with the same pairing as the FID scores:
# generated Monet images are compared with the real Monet images (they were
# previously compared with the photo features), generated photos with the
# real photos.
memorization_distance_monet = calculate_memorization_distance(monet_images_features_all, generated_monet_features_all)
memorization_distance_real = calculate_memorization_distance(real_images_features_all, generated_real_features_all)

# Threshold memorization distances based on epsilon: only distances at or
# below epsilon (suspiciously close to the reference set) keep their value,
# everything else is treated as 1.0 (no penalty).
epsilon = 0.1
if memorization_distance_monet > epsilon:
    memorization_distance_monet = 1.0

if memorization_distance_real > epsilon:
    memorization_distance_real = 1.0

# Calculate MiFID scores: MiFID = FID / d_thr. A small distance must *raise*
# the score (penalise memorization); multiplying, as before, lowered it.
# The 1e-15 term only guards against division by an exact zero distance.
mifid_score_monet = fid_score_monet / (memorization_distance_monet + 1e-15)
mifid_score_real = fid_score_real / (memorization_distance_real + 1e-15)

print("FID Score (Monet):", fid_score_monet)
print("Memorization Distance (Monet):", memorization_distance_monet)
print("MiFID Score (Monet):", mifid_score_monet)

print("FID Score (Real):", fid_score_real)
print("Memorization Distance (Real):", memorization_distance_real)
print("MiFID Score (Real):", mifid_score_real)


In [None]:
# NOTE(review): looks like a leftover debug/progress marker - confirm and
# delete this cell if nothing depends on the output.
print("sefe")

# Save results for Kaggle


In [None]:
# Create the output folder for the generated images. Unlike `!mkdir`, this
# does not fail when the folder already exists (re-running the notebook) and
# does not depend on the shell.
os.makedirs("../images", exist_ok=True)

In [None]:
# Reload the photo set with load_images_from_dir (RGB, resized to its default
# target size); these are the inputs translated for the Kaggle submission.
directory_path = "data/photo_jpg/"
image_list = load_images_from_dir(directory_path)
print("Number of images loaded:", len(image_list))

In [None]:
# Generator used for the submission: real->monet weights from epoch 30 of the
# "identity" run.
models_folder = "models/lr_0.0001_beta_0.5_identity"
best_epoch = 30

generator_real2monet = Generator()
generator_real2monet.load_weights(
    os.path.join(models_folder, f'generator_real2monet_epoch_{best_epoch}.h5')
)


In [None]:
# Translate every loaded photo with the real->monet generator and write the
# result to ../images/<n>.jpg (n starts at 1) as a 256x256 image.
for i, image in enumerate(image_list, start=1):
    real_image = decode_image(image)

    monet_generate = generate_images(generator_real2monet , real_image)
    monet_generate_img = encode_image(monet_generate).numpy()
    monet_generate_img_fixed_size = cv2.resize(monet_generate_img, (256, 256))

    # load_images_from_dir converted the photos from BGR to RGB, so the
    # generated image is RGB as well, while cv2.imwrite expects BGR. Convert
    # back, otherwise the saved files have red and blue swapped.
    # NOTE(review): assumes encode_image yields a 3-channel uint8 (or float32)
    # image, which cv2.cvtColor / cv2.imwrite require - confirm.
    monet_generate_img_bgr = cv2.cvtColor(monet_generate_img_fixed_size, cv2.COLOR_RGB2BGR)

    cv2.imwrite(f"../images/{i}.jpg", monet_generate_img_bgr)
    

In [None]:
# Pack the generated images into a zip archive in the working directory.
zip_filename = 'images.zip'
images_folder_path = '../images'

# make_archive appends the ".zip" extension itself, so pass only the part of
# the file name before the first dot as the archive base name.
shutil.make_archive(
    base_name=zip_filename.partition('.')[0],
    format='zip',
    root_dir=images_folder_path,
)
print(f"Images folder has been successfully zipped to {zip_filename}.")
