In [None]:
# Mount Google Drive at /content/drive (Colab-only helper); the data paths used by the
# exploratory cells below live under /content/drive/MyDrive/data.
from google.colab import drive
drive.mount('/content/drive')

# Definirea problemei
Ce se dă:

Setul de date este dat, inclusiv patru directoare care conțin imagini:
monet_tfrec și monet_jpg: conțin 300 de imagini ale picturilor lui Monet, fie în formatul TFRecord, fie în formatul JPEG.
photo_tfrec și photo_jpg: conțin 7028 de fotografii, fie în formatul TFRecord, fie în formatul JPEG.
Ce se cere:

Se cere să se dezvolte un algoritm care să preia imaginile din directoarele photo_tfrec și photo_jpg (input), să le proceseze pentru a adăuga stilul lui Monet și să genereze imaginile corespunzătoare în stilul Monet.
Outputul algoritmului trebuie să fie imaginile generate în stilul Monet, care trebuie să fie salvate în format JPEG și să fie împachetate într-un fișier zip.

Tipul problemei: Generative Adversarial Networks (GAN) – o arhitectură care are la bază două rețele neuronale: una care generează conținut și una care încearcă să distingă între materialele reale și cele generate.

# Analiza exploratorie a datelor (imagini)


In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt

Definirea directoarelor pentru date

In [None]:
# Directories (on the mounted Google Drive) holding the TFRecord files of each image domain.
monet_tfrec_dir = '/content/drive/MyDrive/data/monet_tfrec'
photo_tfrec_dir = '/content/drive/MyDrive/data/photo_tfrec'


Funcție pentru a încărca și decoda imagini din formatul TFRecord

In [None]:
def decode_tfrecord(example):
    """Parse one serialized TFRecord example and return its image decoded as RGB.

    The record is parsed with a single scalar string feature named "image";
    its bytes are decoded as a JPEG with three channels.
    """
    feature_spec = {"image": tf.io.FixedLenFeature([], tf.string)}
    parsed = tf.io.parse_single_example(example, feature_spec)
    return tf.image.decode_jpeg(parsed['image'], channels=3)


Funcție pentru a număra fișierele dintr-un director TFRecord (atenție: se numără fișierele de pe disc, nu imaginile stocate în ele)

In [None]:
def count_tfrecord_files(tfrecord_dir):
    """Return how many files exist under ``tfrecord_dir``, walking sub-directories too.

    This counts files on disk; it does not open them or count the records inside.
    """
    total = 0
    for _root, _dirs, filenames in os.walk(tfrecord_dir):
        total += len(filenames)
    return total


Numărul de fișiere din fiecare director TFRecord și numărul de imagini JPEG

In [None]:

# Number of files found in each TFRecord directory. count_tfrecord_files counts files on
# disk, so despite the variable names these are file counts, not decoded-image counts.
num_monet_images = count_tfrecord_files(monet_tfrec_dir)
num_photo_images = count_tfrecord_files(photo_tfrec_dir)

# Number of directory entries in each JPEG folder.
num_monet_jpg = len(os.listdir('/content/drive/MyDrive/data/monet_jpg'))
num_photo_jpg = len(os.listdir('/content/drive/MyDrive/data/photo_jpg'))



Crearea unui DataFrame pentru analiza exploratorie a datelor

In [None]:
# One row per image domain. The 'TFRecord Images' column is filled from
# count_tfrecord_files, i.e. it holds the number of files in each TFRecord directory.
data = {
    'Dataset': ['Monet', 'Photo'],
    'TFRecord Images': [num_monet_images, num_photo_images],
    'JPG Images': [num_monet_jpg, num_photo_jpg]
}
df = pd.DataFrame(data)

# Display the DataFrame
print("Summary of Dataset:")
print(df)

Vizualizarea dimensiunii unei imagini din fiecare set

In [None]:
# Build one TFRecordDataset per domain from every entry listed in its directory.
monet_dataset = tf.data.TFRecordDataset([os.path.join(monet_tfrec_dir, f) for f in os.listdir(monet_tfrec_dir)])
photo_dataset = tf.data.TFRecordDataset([os.path.join(photo_tfrec_dir, f) for f in os.listdir(photo_tfrec_dir)])

# Apply the decoding function to every record
monet_dataset = monet_dataset.map(decode_tfrecord)
photo_dataset = photo_dataset.map(decode_tfrecord)

# Take the first decoded image of each dataset as an example
sample_monet_image = next(iter(monet_dataset))
sample_photo_image = next(iter(photo_dataset))

# Print the dimensions of the example images
print("Dimensions of a sample Monet image:", sample_monet_image.shape)
print("Dimensions of a sample Photo image:", sample_photo_image.shape)


Vizualizarea a câtorva imagini de exemplu

In [None]:
def visualize_samples(image_dir, num_samples=5):
    """Show up to ``num_samples`` distinct, randomly chosen images from ``image_dir`` in one row.

    Args:
        image_dir: Directory whose entries are read with ``plt.imread``.
        num_samples: Number of images to show; capped at the number of entries
            in the directory so that no image is displayed twice.

    Returns:
        None. The figure is rendered with ``plt.show()``.
    """
    image_files = os.listdir(image_dir)
    # Never request more distinct images than the directory holds.
    num_samples = min(num_samples, len(image_files))
    if num_samples <= 0:
        print(f"No images to display in {image_dir}")
        return

    # replace=False: sample without replacement so the same picture is not drawn twice.
    chosen = np.random.choice(image_files, num_samples, replace=False)

    # squeeze=False keeps `axes` two-dimensional even for a single panel, so indexing
    # works for num_samples == 1 as well.
    fig, axes = plt.subplots(1, num_samples, figsize=(15, 5), squeeze=False)
    for ax, img_file in zip(axes[0], chosen):
        img = plt.imread(os.path.join(image_dir, img_file))
        ax.imshow(img)
        ax.axis('off')
    plt.show()

# Show a random row of images (default num_samples=5) from each JPEG folder.
print("Sample Monet paintings:")
visualize_samples('/content/drive/MyDrive/data/monet_jpg')

print("Sample Photos:")
visualize_samples('/content/drive/MyDrive/data/photo_jpg')

Calcul histograma culorilor RGB

In [None]:
def plotImagesHistogram(image_dir, max_images=50, bins=256):
    """Plot the average per-channel intensity histogram of images in ``image_dir``.

    For each of the first ``max_images`` entries (in sorted name order) the
    pixel-intensity histogram of the red, green and blue channels is computed
    with ``np.histogram`` and the histograms are averaged over the images.

    The previous implementation averaged raw pixel values over axes (0, 1) of
    an (N, H, W) array, which yields the mean intensity per image column, not
    a histogram, and only matched the 256-point x axis for 256-pixel-wide images.

    Args:
        image_dir: Directory containing the image files.
        max_images: Maximum number of images to include.
        bins: Number of histogram bins over the intensity range [0, 256).

    Returns:
        None. The figure is rendered with ``plt.show()``.
    """
    channel_styles = (('red', 'Red'), ('green', 'Green'), ('blue', 'Blue'))
    hist_sums = np.zeros((len(channel_styles), bins), dtype=np.float64)
    images_used = 0

    # Sorted so that "the first max_images files" is a reproducible selection.
    for img_file in sorted(os.listdir(image_dir))[:max_images]:
        img = np.asarray(plt.imread(os.path.join(image_dir, img_file)))
        if img.ndim != 3 or img.shape[2] < 3:
            continue  # not an RGB(A) image; nothing to histogram per channel
        if np.issubdtype(img.dtype, np.floating):
            # Float images are treated as [0, 1]-scaled; bring them onto the 0-255 scale.
            img = img * 255.0
        for channel in range(len(channel_styles)):
            counts, _ = np.histogram(img[:, :, channel], bins=bins, range=(0, 256))
            hist_sums[channel] += counts
        images_used += 1

    if images_used == 0:
        print(f"No RGB images found in {image_dir}")
        return

    mean_hists = hist_sums / images_used
    # Left edge of every bin; equals 0..255 for the default 256 bins.
    x_values = np.linspace(0, 256, bins, endpoint=False)

    # Draw all three channels on a single figure
    plt.figure(figsize=(10, 6))
    for (color, label), channel_hist in zip(channel_styles, mean_hists):
        plt.plot(x_values, channel_hist, color=color, label=label)

    plt.title('Average Histogram of RGB Channels')
    plt.xlabel('Pixel Intensity')
    plt.ylabel('Average Frequency')
    plt.legend()
    plt.grid(True)
    plt.show()

Picturi

In [None]:
# RGB summary plot for the Monet paintings folder.
plotImagesHistogram('/content/drive/MyDrive/data/monet_jpg')

Poze

In [None]:
# RGB summary plot for the photographs folder.
plotImagesHistogram('/content/drive/MyDrive/data/photo_jpg')

# Analiza modelului

In [None]:
# Root folder: prefix prepended to every dataset folder and model file name used below.
# NOTE(review): with an empty prefix, 'monet_jpg', 'photo_jpg' and the saved .h5 models are
# resolved relative to the current working directory, whereas the exploratory cells above
# read from '/content/drive/MyDrive/data/...' — confirm which location is intended.
root_folder = ''

1. Data Loading Functions

In [None]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
import glob
import os

# Function to preprocess images for ResNet50
def preprocess_resnet(image):
    """Cast ``image`` to float32 and apply the Keras ResNet50 ``preprocess_input`` transform."""
    as_float = tf.cast(image, tf.float32)
    return tf.keras.applications.resnet50.preprocess_input(as_float)

# Function to preprocess images for GAN
def normalize_minus_one_to_one(image):
    """Cast ``image`` to float32 and map 0..255 pixel values onto [-1, 1]."""
    as_float = tf.cast(image, tf.float32)
    return (as_float / 127.5) - 1.0

def normalize_zero_to_one(image):
    """Cast ``image`` to float32 and map 0..255 pixel values onto [0, 1]."""
    as_float = tf.cast(image, tf.float32)
    return as_float / 255.0

# Function to load images from folder
def load_images_from_folder(folder, label, image_size=(256, 256), limit=None):
    """Load the ``*.jpg`` files of ``folder`` as float arrays, all tagged with ``label``.

    Args:
        folder: Directory searched (non-recursively) for ``*.jpg`` files.
        label: Value repeated once per loaded image in the returned label list.
        image_size: Target size passed to Keras ``load_img``.
        limit: Maximum number of images to load. ``None`` loads everything;
            ``0`` loads nothing (previously ``0`` was silently treated as "no limit").

    Returns:
        ``(images, labels)``: a list of image arrays and a list of equal length
        holding ``label``.
    """
    # Sorted so that a limited subset is the same on every run and every machine;
    # glob itself returns files in arbitrary order.
    paths = sorted(glob.glob(os.path.join(folder, '*.jpg')))
    if limit is not None:
        paths = paths[:max(limit, 0)]

    images = []
    for path in paths:
        img = tf.keras.preprocessing.image.load_img(path, target_size=image_size)
        images.append(tf.keras.preprocessing.image.img_to_array(img))

    labels = [label] * len(images)
    return images, labels

# Function to create datasets
def create_datasets(monet_folder, photo_folder, test_size=0.2, image_size=(256, 256), batch_size=32,
                   monet_limit=None, photo_limit=None, random_state=None):
    """Build the tf.data pipelines used for discriminator and generator training.

    Args:
        monet_folder: Folder with the Monet ``*.jpg`` files (label 0).
        photo_folder: Folder with the photo ``*.jpg`` files (label 1).
        test_size: Fraction of the labelled images held out for validation.
        image_size: Size every image is loaded at.
        batch_size: Batch size of all returned datasets.
        monet_limit: Optional cap on the number of Monet images loaded.
        photo_limit: Optional cap on the number of photos loaded.
        random_state: Optional seed for the train/validation split, making the
            split reproducible. ``None`` keeps the previous unseeded behaviour.

    Returns:
        ``(dataset_X, dataset_Y, D_train_dataset, D_val_dataset)`` where
        ``dataset_X`` holds photos, ``dataset_Y`` holds Monet paintings (both
        unlabelled, shuffled) and the ``D_*`` datasets hold ``(image, label)``
        pairs. All images are scaled to [0, 1].
    """
    def _finalize(dataset, shuffle):
        # Shared tail of every pipeline: optional shuffle, then batch and prefetch.
        if shuffle:
            dataset = dataset.shuffle(buffer_size=1000)
        return dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)

    def _labelled(images, labels, shuffle):
        pairs = tf.data.Dataset.from_tensor_slices((images, labels))
        return _finalize(pairs.map(lambda x, y: (normalize_zero_to_one(x), y)), shuffle)

    def _unlabelled(images):
        items = tf.data.Dataset.from_tensor_slices(np.array(images))
        return _finalize(items.map(lambda x: normalize_zero_to_one(x)), shuffle=True)

    # Load Monet and photo images for discriminator
    monet_images, monet_labels = load_images_from_folder(monet_folder, 0, image_size, limit=monet_limit)  # Label 0 for Monet
    print('Successfully loaded Monet images:', len(monet_images))

    photo_images, photo_labels = load_images_from_folder(photo_folder, 1, image_size, limit=photo_limit)  # Label 1 for photos
    print('Successfully loaded photo images:', len(photo_images))

    # Prepare datasets for discriminator training
    images = np.array(monet_images + photo_images)
    labels = np.array(monet_labels + photo_labels)

    # Stratify so both splits keep the Monet/photo ratio. scikit-learn rejects
    # stratification when a class has fewer than two members, so fall back to a
    # plain split in that case.
    _, class_counts = np.unique(labels, return_counts=True)
    can_stratify = len(class_counts) > 1 and class_counts.min() >= 2
    train_images, val_images, train_labels, val_labels = train_test_split(
        images, labels,
        test_size=test_size,
        random_state=random_state,
        stratify=labels if can_stratify else None,
    )

    D_train_dataset = _labelled(train_images, train_labels, shuffle=True)
    D_val_dataset = _labelled(val_images, val_labels, shuffle=False)

    # Prepare datasets for generator training
    dataset_X = _unlabelled(photo_images)
    dataset_Y = _unlabelled(monet_images)

    return dataset_X, dataset_Y, D_train_dataset, D_val_dataset


In [None]:
# Use the create_datasets function with limits: at most 300 Monet images and 400 photos,
# in batches of 8, read from folders under root_folder.
dataset_X, dataset_Y, D_train_dataset, D_val_dataset = create_datasets(
    monet_folder=root_folder + 'monet_jpg',
    photo_folder=root_folder + 'photo_jpg',
    batch_size=8,
    monet_limit=300,
    photo_limit=400
)

# Print dataset sizes (len() of a batched dataset is its number of batches, not images)
print("Generator X to Y training dataset size:", len(dataset_X))
print("Generator Y to X training dataset size:", len(dataset_Y))
print("Discriminator training dataset size:", len(D_train_dataset))
print("Discriminator validation dataset size:", len(D_val_dataset))

2. Build and Fine-tune the Pre-trained Discriminator Model

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

def build_pretrained_discriminator(pretrained_model):
    """Stack a binary-classification head on top of ``pretrained_model``.

    The head is: global average pooling, a 512-unit ReLU dense layer, dropout
    of 0.5 and a single sigmoid output unit.
    """
    return models.Sequential([
        pretrained_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(512, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid'),  # Use sigmoid for binary classification
    ])

def fine_tune_pretrained_discriminator():
    """Fine-tune an ImageNet ResNet50 as a Monet-vs-photo classifier and save it.

    Trains on the module-level ``D_train_dataset`` / ``D_val_dataset`` for 10
    epochs and writes the result to ``fine_tuned_discriminator.h5``.

    The datasets hold images scaled to [0, 1], which is not the input
    distribution the ImageNet weights expect. The images are therefore scaled
    back to 0..255 and passed through ``preprocess_resnet`` before reaching the
    network. The saved model consequently expects ResNet50-preprocessed input.

    Returns:
        The Keras ``History`` object returned by ``fit`` (previously discarded).
    """
    # Load a pre-trained model
    pretrained_model = tf.keras.applications.ResNet50(
        input_shape=(256, 256, 3),
        include_top=False,
        weights='imagenet'
    )

    discriminator = build_pretrained_discriminator(pretrained_model)
    discriminator.summary()

    # Fine-tune the model
    discriminator.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    def _to_resnet_input(image, label):
        # Undo the [0, 1] scaling, then apply the ResNet50-specific preprocessing.
        return preprocess_resnet(image * 255.0), label

    history = discriminator.fit(
        D_train_dataset.map(_to_resnet_input),
        validation_data=D_val_dataset.map(_to_resnet_input),
        epochs=10
    )

    # Save the fine-tuned model
    discriminator.save('fine_tuned_discriminator.h5')
    return history

# fine_tune_pretrained_discriminator()

3. Define the Discriminator Model

In [None]:
# Custom discriminator
from tensorflow.keras import layers, models
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D,MaxPooling2D, Flatten, Dense, Input, LeakyReLU, BatchNormalization, GlobalAveragePooling2D, Dropout

def build_discriminator():
    """Build the custom CNN classifier for 256x256 RGB images.

    Three convolution stages (32, 64 and 128 filters, each followed by
    LeakyReLU and 2x2 max pooling; the last one also batch-normalised) feed a
    256-unit dense layer and a single sigmoid output.
    """
    stack = [
        Input(shape=(256, 256, 3)),

        Conv2D(32, (4, 4), strides=2),
        LeakyReLU(alpha=0.2),
        MaxPooling2D(pool_size=(2,2)),

        Conv2D(64, (4, 4)),
        LeakyReLU(alpha=0.2),
        MaxPooling2D(pool_size=(2,2)),

        Conv2D(128, (4, 4), strides=2, padding='same'),
        BatchNormalization(),
        LeakyReLU(alpha=0.2),
        MaxPooling2D(pool_size=(2,2)),

        Flatten(),
        Dense(256, activation='relu'),
        # Dropout(0.3) was tried here and is currently disabled.
        Dense(1, activation='sigmoid'),
    ]
    return Sequential(stack)

# Build the custom discriminator and print its layer summary.
discriminator = build_discriminator()
discriminator.summary()

# Compile the discriminator
# discriminator.compile(optimizer=tf.keras.optimizers.Adam(lr=0.0002, beta_1=0.5), loss='binary_crossentropy', metrics=['accuracy'])
discriminator.compile(optimizer=tf.keras.optimizers.Adam(), loss='binary_crossentropy', metrics=['accuracy'])

# Training the discriminator
# NOTE(review): Keras documents `shuffle` as ignored when the input is a tf.data.Dataset;
# D_train_dataset already shuffles inside its own pipeline — confirm the argument is not relied on.
epochs = 70
history = discriminator.fit(D_train_dataset, validation_data=D_val_dataset, epochs=epochs, shuffle=True)

# Save the discriminator model (reloaded later as the starting point of the GAN discriminators)
discriminator.save(root_folder + 'discriminator.h5')

In [None]:
from sklearn.metrics import confusion_matrix
import itertools
import matplotlib.pyplot as plt

def evalMultiClass(realLabels, computedLabels, labelNames):
    """Compute accuracy, per-class precision/recall and the confusion matrix.

    Args:
        realLabels: Ground-truth labels.
        computedLabels: Predicted labels, same length as ``realLabels``.
        labelNames: Display names; entry ``i`` names row/column ``i`` of the
            confusion matrix.

    Returns:
        ``(acc, precision, recall, confMatrix)`` where ``precision`` and
        ``recall`` are dicts keyed by label name. A class that is never
        predicted (precision) or never present (recall) gets 0.0 instead of
        the ``nan`` that a division by zero would produce.

    Raises:
        IndexError: If the confusion matrix has fewer classes than ``labelNames``.
    """
    confMatrix = confusion_matrix(realLabels, computedLabels)
    n_classes = len(labelNames)
    if confMatrix.shape[0] < n_classes:
        raise IndexError(
            f'confusion matrix has {confMatrix.shape[0]} classes but {n_classes} label names were given')

    def _ratio(numerator, denominator):
        # Guarded division: an empty denominator means the metric is undefined; report 0.0.
        return float(numerator) / float(denominator) if denominator else 0.0

    square = confMatrix[:n_classes, :n_classes]
    correct = np.diag(square)
    predicted_totals = square.sum(axis=0)  # column sums: how often each class was predicted
    actual_totals = square.sum(axis=1)     # row sums: how often each class really occurred

    acc = _ratio(correct.sum(), len(realLabels))
    precision = {}
    recall = {}
    for i, name in enumerate(labelNames):
        precision[name] = _ratio(correct[i], predicted_totals[i])
        recall[name] = _ratio(correct[i], actual_totals[i])

    return acc, precision, recall, confMatrix

def plotConfusionMatrix(cm, classNames, title):
    """Draw ``cm`` as a blue heat map with the integer count written in every cell.

    ``classNames`` label both axes; ``title`` is appended to 'Confusion Matrix '.
    """
    plt.figure()
    plt.imshow(cm, interpolation = 'nearest', cmap = 'Blues')
    plt.title('Confusion Matrix ' + title)
    plt.colorbar()

    positions = np.arange(len(classNames))
    plt.xticks(positions, classNames, rotation=45)
    plt.yticks(positions, classNames)

    # Cells darker than half the maximum get white text so the number stays readable.
    cutoff = cm.max() / 2.
    n_rows, n_cols = cm.shape[0], cm.shape[1]
    for row in range(n_rows):
        for column in range(n_cols):
            value = cm[row, column]
            if value > cutoff:
                text_color = 'white'
            else:
                text_color = 'black'
            plt.text(column, row, format(value, 'd'),
                     horizontalalignment = 'center',
                     color = text_color)

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

    plt.show()

# Function to plot the image and prediction
def plot_image_and_prediction(image_path, prediction):
    """Show the image at ``image_path`` (loaded at 256x256) titled with the predicted class.

    A ``prediction`` equal to 1 is shown as "Real", anything else as "Monet".
    """
    picture = tf.keras.preprocessing.image.load_img(image_path, target_size=(256, 256))
    verdict = "Real" if prediction == 1 else "Monet"
    plt.imshow(picture)
    plt.axis('off')
    plt.title(f'Prediction: {verdict}')
    plt.show()

# Plot the training history
def plot_training_history(history):
    """Plot training/validation loss and accuracy curves side by side.

    ``history`` is a Keras ``History`` whose ``history`` dict provides the keys
    'loss', 'val_loss', 'accuracy' and 'val_accuracy'.
    """
    curves = history.history
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

    # Left panel: loss
    loss_ax.plot(curves['loss'], label='Training Loss')
    loss_ax.plot(curves['val_loss'], label='Validation Loss')
    loss_ax.set_title('Loss over Epochs')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # Right panel: accuracy
    acc_ax.plot(curves['accuracy'], label='Training Accuracy')
    acc_ax.plot(curves['val_accuracy'], label='Validation Accuracy')
    acc_ax.set_title('Accuracy over Epochs')
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    plt.show()

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
import os

# Call the function to plot the training history
plot_training_history(history)

# Sigmoid scores of the discriminator for every validation image.
results = discriminator.predict(D_val_dataset)

# Threshold the scores at 0.5 (0 = Monet, 1 = Photo) and collect the true labels by
# iterating the same validation dataset, flattening the per-batch label arrays.
computed = [0 if x < 0.5 else 1 for x in results]
real = np.concatenate(([el2 for el1, el2 in list(D_val_dataset.as_numpy_iterator())]), axis=None).tolist()

acc, prec, recall, cm = evalMultiClass(real, computed, ['Monet', 'Photo'])
print('Accuracy:', acc)
print('Precision:', prec)
print('Recall:', recall)
plotConfusionMatrix(cm, ['Monet', 'Photo'], 'Discriminator')

# Load the fine-tuned discriminator model
# discriminator = tf.keras.models.load_model(root_folder + 'discriminator.h5')

# Test images: two photos and two Monet paintings referenced by file name
test_image_paths = [
    root_folder + 'photo_jpg/0a74701f65.jpg',
    root_folder + 'monet_jpg/2f20944b6a.jpg',
    root_folder + 'photo_jpg/0aec1f9701.jpg',
    root_folder + 'monet_jpg/32e33792cc.jpg'
]

# Predict and plot results
for image_path in test_image_paths:
    # Same preparation as the training data: load at 256x256, scale to [0, 1], add a batch axis.
    image = tf.keras.preprocessing.image.load_img(image_path, target_size=(256, 256))
    image = tf.keras.preprocessing.image.img_to_array(image)
    image = normalize_zero_to_one(image)
    image = np.expand_dims(image, axis=0)
    prediction = discriminator.predict(image)
    print(f'Prediction: {prediction}')
    # Round the single sigmoid score to a hard 0/1 class.
    prediction = round(prediction[0][0])
    plot_image_and_prediction(image_path, prediction)

4. Define the Generator Model

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# # Define generator model
# generator = models.Sequential([
#     layers.Input(shape=(256, 256, 3)),
#     layers.Conv2D(64, kernel_size=(3, 3), padding='same', activation='relu'),
#     layers.BatchNormalization(),
#     layers.Conv2D(128, kernel_size=(3, 3), padding='same', activation='relu'),
#     layers.BatchNormalization(),
#     layers.Conv2D(256, kernel_size=(3, 3), padding='same', activation='relu'),
#     layers.BatchNormalization(),
#     layers.Conv2D(3, kernel_size=(3, 3), padding='same', activation='sigmoid')
# ])

# generator.summary()
# generator.compile()

import tensorflow as tf
from tensorflow.keras import layers, models

def residual_block(x, filters, kernel_size=3, strides=1):
    """Residual block: two Conv-BN stages plus a skip connection.

    Args:
        x: Input feature map (Keras tensor, channels last).
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel size of both convolutions.
        strides: Stride of the first convolution only. The second convolution
            always uses stride 1, so the block downsamples exactly once.

    Returns:
        The sum of the transformed branch and the shortcut.

    When the block changes the spatial size (``strides != 1``) or the channel
    count (``filters`` differs from the input channels), the shortcut is
    projected with a 1x1 convolution so both branches have the same shape.
    With the defaults and matching channels the block is a plain identity-skip
    residual block, as before.
    """
    stride_values = strides if isinstance(strides, (tuple, list)) else (strides,)
    downsamples = any(s != 1 for s in stride_values)

    branch = layers.Conv2D(filters, kernel_size, strides=strides, padding='same')(x)
    branch = layers.BatchNormalization()(branch)
    branch = layers.ReLU()(branch)
    branch = layers.Conv2D(filters, kernel_size, strides=1, padding='same')(branch)
    branch = layers.BatchNormalization()(branch)

    shortcut = x
    if downsamples or x.shape[-1] != filters:
        # Match the branch's shape; otherwise Add() would fail on mismatched tensors.
        shortcut = layers.Conv2D(filters, 1, strides=strides, padding='same')(x)
        shortcut = layers.BatchNormalization()(shortcut)

    return layers.Add()([branch, shortcut])

def build_generator(input_shape=(256, 256, 3)):
    """Build the encoder / residual-bottleneck / decoder image-to-image generator.

    Three stride-2 convolutions (64, 128, 256 filters) downsample the input,
    five 256-filter residual blocks form the bottleneck, three stride-2
    transposed convolutions (128, 64, 32 filters) upsample again, and a final
    3-channel sigmoid convolution produces the output image. The model summary
    is printed and the model is compiled (without arguments) before returning.
    """
    def _norm_relu(tensor):
        # BatchNorm followed by ReLU, shared by every encoder and decoder stage.
        tensor = layers.BatchNormalization()(tensor)
        return layers.ReLU()(tensor)

    inputs = layers.Input(shape=input_shape)
    features = inputs

    # Encoder
    for width in (64, 128, 256):
        features = layers.Conv2D(width, (3, 3), strides=2, padding='same')(features)
        features = _norm_relu(features)

    # Bottleneck with More Residual Blocks
    for _ in range(5):
        features = residual_block(features, 256)

    # Decoder
    for width in (128, 64, 32):
        features = layers.Conv2DTranspose(width, (3, 3), strides=2, padding='same')(features)
        features = _norm_relu(features)

    outputs = layers.Conv2D(3, (3, 3), padding='same', activation='sigmoid')(features)

    model = models.Model(inputs, outputs)
    model.summary()
    model.compile()

    return model

5.0 Define and train the Cycle-GAN

In [None]:
from matplotlib import pyplot as plt

# Plotting generated images for a sample array of images
def display_generated_images(title, generator, images):
    """Show every input image (top row) above its generated counterpart (bottom row).

    Each image is passed through ``generator`` individually, as a batch of one
    and with ``training=False``. ``title`` becomes the figure's super-title.
    """
    num_images = len(images)
    plt.figure(figsize=(20, 10))  # Increase figure size

    def _panel(position, picture, caption):
        # One cell of the 2 x num_images grid, without axes.
        plt.subplot(2, num_images, position)
        plt.imshow(picture)
        plt.title(caption)
        plt.axis('off')

    for number, single in enumerate(images, start=1):
        batch = np.expand_dims(single, axis=0)  # Add an extra dimension
        translated = generator(batch, training=False)[0] / 1.0
        source = batch[0] / 1.0

        _panel(number, source, f'Original Image {number}')
        _panel(number + num_images, translated, f'Generated Image {number}')

    plt.suptitle(title)
    plt.tight_layout()
    plt.show()

# Plotting the losses
def plot_losses(gen_x_losses, disc_y_losses, batch_gen_x_losses, batch_disc_y_losses,
                 gen_y_losses=None, disc_x_losses=None, batch_gen_y_losses=None, batch_disc_x_losses=None):
    """Plot generator/discriminator losses per epoch and per batch.

    Args:
        gen_x_losses: Per-epoch losses of the X-to-Y generator.
        disc_y_losses: Per-epoch losses of the Y discriminator.
        batch_gen_x_losses: Per-batch losses of the X-to-Y generator.
        batch_disc_y_losses: Per-batch losses of the Y discriminator.
        gen_y_losses: Per-epoch losses of the Y-to-X generator (optional).
        disc_x_losses: Per-epoch losses of the X discriminator (optional).
        batch_gen_y_losses: Per-batch losses of the Y-to-X generator (optional).
        batch_disc_x_losses: Per-batch losses of the X discriminator (optional).

    The last four series are optional so the function also serves a
    single-direction GAN, which only has the first four. When none of them is
    given, only the first row of panels is drawn; otherwise a second row is
    added and any missing series is plotted as empty.
    """
    def _or_empty(series):
        return [] if series is None else series

    # Each row: (generator name, discriminator name, epoch series x2, batch series x2)
    rows = [('XtoY', 'Y', gen_x_losses, disc_y_losses, batch_gen_x_losses, batch_disc_y_losses)]
    second_direction = (gen_y_losses, disc_x_losses, batch_gen_y_losses, batch_disc_x_losses)
    if any(series is not None for series in second_direction):
        rows.append(('YtoX', 'X') + tuple(_or_empty(series) for series in second_direction))

    # 3 inches of height per row keeps the original 12x6 figure for the two-row layout.
    plt.figure(figsize=(12, 3 * len(rows)))

    for row_index, (gen_name, disc_name, gen_epoch, disc_epoch, gen_batch, disc_batch) in enumerate(rows):
        columns = (
            ('Epoch', 'Epochs', gen_epoch, disc_epoch),
            ('Batch', 'Batches', gen_batch, disc_batch),
        )
        for col_index, (unit, x_label, gen_series, disc_series) in enumerate(columns):
            plt.subplot(len(rows), 2, 2 * row_index + col_index + 1)
            plt.plot(gen_series, label=f'Generator {gen_name} Loss per {unit}')
            plt.plot(disc_series, label=f'Discriminator {disc_name} Loss per {unit}')
            plt.xlabel(x_label)
            plt.ylabel('Loss')
            plt.legend()
            plt.title(f'Loss per {unit}')

    plt.tight_layout()
    plt.show()

In [None]:
# Load/build the generators and discriminators
# G_XtoY / G_YtoX are fresh generators; D_X and D_Y are two independent copies of the
# classifier saved as 'discriminator.h5'.
# NOTE(review): that classifier was trained with Monet = 0 and photo = 1, while
# discriminator_loss below pushes every discriminator's output for its *real* domain
# towards 1. If dataset_Y holds Monet images (as create_datasets builds it), D_Y starts
# with inverted targets — confirm whether this initialisation is intended.
G_XtoY = build_generator()
G_YtoX = build_generator()
D_X = tf.keras.models.load_model(root_folder + 'discriminator.h5')
D_Y = tf.keras.models.load_model(root_folder + 'discriminator.h5')
# print("Model(s) loaded successfully")

# Define loss functions
# from_logits=False: the discriminators end in a sigmoid, so they emit probabilities.
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=False)

def discriminator_loss(real, generated):
    """Binary cross-entropy of a discriminator: real outputs vs. ones plus generated outputs vs. zeros."""
    target_for_real = tf.ones_like(real)
    target_for_generated = tf.zeros_like(generated)
    return loss_obj(target_for_real, real) + loss_obj(target_for_generated, generated)

def generator_loss(generated):
    """Adversarial generator loss: cross-entropy of the discriminator output against all ones."""
    wanted = tf.ones_like(generated)
    return loss_obj(wanted, generated)

def cycle_loss(real_image, cycled_image, lambda_cycle):
    """Mean absolute difference between an image and its round-trip reconstruction, scaled by ``lambda_cycle``."""
    mean_abs_diff = tf.reduce_mean(tf.abs(real_image - cycled_image))
    return lambda_cycle * mean_abs_diff

def identity_loss(real_image, same_image, lambda_identity):
    """Mean absolute difference between an image and the generator's output for it, scaled by ``lambda_identity``."""
    mean_abs_diff = tf.reduce_mean(tf.abs(real_image - same_image))
    return lambda_identity * mean_abs_diff

# Define optimizers: one Adam instance (learning rate 2e-4, beta_1 0.5) per network, so each
# model keeps its own optimizer state.
generator_g_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
generator_f_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

# Define generator and discriminator losses arrays
# First line: one entry per epoch; second line: one entry per batch. Both are appended to
# by the training loop below.
gen_g_losses, gen_f_losses, disc_x_losses, disc_y_losses = [], [], [], []
batch_gen_g_losses, batch_gen_f_losses, batch_disc_x_losses, batch_disc_y_losses = [], [], [], []

# Train the models while collecting the losses
@tf.function
def train_step_with_loss_collection(real_x, real_y, lambda_cycle=10, lambda_identity=0.5):
    """Run one optimisation step of both generators and both discriminators.

    Args:
        real_x: Batch of images from domain X (fed to G_XtoY and D_X).
        real_y: Batch of images from domain Y (fed to G_YtoX and D_Y).
        lambda_cycle: Weight of the cycle-consistency term.
        lambda_identity: Weight of the identity term.

    Returns:
        ``(total_gen_g_loss, total_gen_f_loss, disc_x_loss, disc_y_loss)`` for
        this batch.

    Uses the module-level models G_XtoY, G_YtoX, D_X, D_Y and their optimizers.
    """
    # persistent=True because the same tape is queried four times below (once per network).
    with tf.GradientTape(persistent=True) as tape:
        # Generate fake images
        fake_y = G_XtoY(real_x, training=True)
        cycled_x = G_YtoX(fake_y, training=True)
        
        fake_x = G_YtoX(real_y, training=True)
        cycled_y = G_XtoY(fake_x, training=True)
        
        # Identity mapping: each generator applied to an image already in its target domain
        same_x = G_YtoX(real_x, training=True)
        same_y = G_XtoY(real_y, training=True)
        
        # Discriminator output
        disc_real_x = D_X(real_x, training=True)
        disc_fake_x = D_X(fake_x, training=True)
        
        disc_real_y = D_Y(real_y, training=True)
        disc_fake_y = D_Y(fake_y, training=True)
        
        # Calculate the losses
        gen_g_loss = generator_loss(disc_fake_y)
        gen_f_loss = generator_loss(disc_fake_x)
        
        # The cycle term covers both round trips and is shared by both generator totals.
        total_cycle_loss = cycle_loss(real_x, cycled_x, lambda_cycle) + cycle_loss(real_y, cycled_y, lambda_cycle)
        
        total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(real_y, same_y, lambda_identity)
        total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(real_x, same_x, lambda_identity)
        
        disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x)
        disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y)
    
    # Calculate the gradients
    gradients_of_gen_g = tape.gradient(total_gen_g_loss, G_XtoY.trainable_variables)
    gradients_of_gen_f = tape.gradient(total_gen_f_loss, G_YtoX.trainable_variables)
    
    gradients_of_disc_x = tape.gradient(disc_x_loss, D_X.trainable_variables)
    gradients_of_disc_y = tape.gradient(disc_y_loss, D_Y.trainable_variables)
    
    # Apply the gradients
    generator_g_optimizer.apply_gradients(zip(gradients_of_gen_g, G_XtoY.trainable_variables))
    generator_f_optimizer.apply_gradients(zip(gradients_of_gen_f, G_YtoX.trainable_variables))
    
    discriminator_x_optimizer.apply_gradients(zip(gradients_of_disc_x, D_X.trainable_variables))
    discriminator_y_optimizer.apply_gradients(zip(gradients_of_disc_y, D_Y.trainable_variables))
    
    return total_gen_g_loss, total_gen_f_loss, disc_x_loss, disc_y_loss

# Train the models while collecting the losses
def train_with_loss_collection(dataset_x, dataset_y, epochs):
    """Train the CycleGAN for ``epochs`` epochs, recording and displaying progress.

    Args:
        dataset_x: Batched dataset of domain-X images.
        dataset_y: Batched dataset of domain-Y images.
        epochs: Number of passes over the zipped datasets.

    Side effects:
        Appends per-batch and per-epoch losses (as Python floats) to the
        module-level loss lists, shows sample translations after every epoch,
        and saves the four models under ``root_folder`` when training ends.

    The function now uses its ``dataset_x`` / ``dataset_y`` arguments
    throughout; it previously read the globals ``dataset_X`` / ``dataset_Y``
    for sampling and sizing, so passing other datasets had only a partial effect.
    """
    num_images = 5
    sample_images_x = next(iter(dataset_x))[:num_images]
    sample_images_y = next(iter(dataset_y))[:num_images]

    print('dataset_X size:', len(dataset_x), 'dataset_Y size:', len(dataset_y))

    for epoch in range(epochs):
        epoch_gen_g_loss, epoch_gen_f_loss = 0.0, 0.0
        epoch_disc_x_loss, epoch_disc_y_loss = 0.0, 0.0

        # zip() stops at the shorter dataset, so this is the number of steps per epoch.
        epoch_batches = min(len(dataset_x), len(dataset_y))
        batches_done = 0

        # No explicit reshuffle here: Dataset.shuffle returns a new dataset rather than
        # shuffling in place, so calling it and dropping the result had no effect.
        # The datasets are expected to shuffle inside their own pipeline.
        for i, (image_batch_x, image_batch_y) in enumerate(zip(dataset_x, dataset_y)):
            step_losses = train_step_with_loss_collection(image_batch_x, image_batch_y)
            # Convert the scalar tensors to floats so the stored history is lightweight
            # and directly plottable.
            gen_g_loss, gen_f_loss, disc_x_loss, disc_y_loss = (float(loss) for loss in step_losses)
            batches_done = i + 1

            # Print the losses
            print(f'Batch {i + 1}/{epoch_batches}: Generator G Loss: {gen_g_loss}, Discriminator Y Loss: {disc_y_loss}, '
                  f'Generator F Loss: {gen_f_loss}, Discriminator X Loss: {disc_x_loss}')

            epoch_gen_g_loss += gen_g_loss
            epoch_gen_f_loss += gen_f_loss
            epoch_disc_x_loss += disc_x_loss
            epoch_disc_y_loss += disc_y_loss

            # Append the batch losses
            batch_gen_g_losses.append(gen_g_loss)
            batch_gen_f_losses.append(gen_f_loss)
            batch_disc_x_losses.append(disc_x_loss)
            batch_disc_y_losses.append(disc_y_loss)

        # Average over the batches actually processed; guard against an empty epoch.
        divisor = max(batches_done, 1)
        epoch_gen_g_loss /= divisor
        epoch_gen_f_loss /= divisor
        epoch_disc_x_loss /= divisor
        epoch_disc_y_loss /= divisor

        gen_g_losses.append(epoch_gen_g_loss)
        gen_f_losses.append(epoch_gen_f_loss)
        disc_x_losses.append(epoch_disc_x_loss)
        disc_y_losses.append(epoch_disc_y_loss)

        print(f'Epoch {epoch+1}/{epochs}, '
              f'Generator G Loss: {epoch_gen_g_loss}, Discriminator Y Loss: {epoch_disc_y_loss}, '
              f'Generator F Loss: {epoch_gen_f_loss}, Discriminator X Loss: {epoch_disc_x_loss}')

        display_generated_images(f'XtoY: Epoch {epoch + 1}', G_XtoY, sample_images_x)
        display_generated_images(f'YtoX: Epoch {epoch + 1}', G_YtoX, sample_images_y)

    # Save models
    G_XtoY.save(root_folder + 'generator_g.h5')
    G_YtoX.save(root_folder + 'generator_f.h5')
    D_X.save(root_folder + 'discriminator_x.h5')
    D_Y.save(root_folder + 'discriminator_y.h5')

train_with_loss_collection(dataset_X, dataset_Y, epochs=50)

# Plot the losses
# Argument order follows plot_losses: (generator XtoY, discriminator Y) per epoch and per
# batch, then (generator YtoX, discriminator X) per epoch and per batch. The second
# argument must be the Y-discriminator history; gen_f_losses was passed there by mistake.
plot_losses(gen_g_losses, disc_y_losses, batch_gen_g_losses, batch_disc_y_losses,
            gen_f_losses, disc_x_losses, batch_gen_f_losses, batch_disc_x_losses)


5.1 Alternative: define and train a plain (single-generator) GAN

In [None]:
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models

# Start from the pre-trained discriminator and a freshly built generator.
# `generator` must exist before train_step is traced; it was previously never assigned in
# this part of the notebook, so a fresh kernel failed with a NameError.
discriminator = tf.keras.models.load_model(root_folder + 'discriminator.h5')
generator = build_generator()
# generator = tf.keras.models.load_model(root_folder + 'generator.h5')
# print("Model(s) loaded successfully")

# Define loss function
binary_crossentropy = tf.keras.losses.BinaryCrossentropy(from_logits=False)

# Define optimizers
generator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.5)

# Lists that store the loss values (per epoch and per batch)
gen_losses, disc_losses, batch_gen_losses, batch_disc_losses = [], [], [], []

@tf.function
def train_step(real_images):
    """Run one adversarial update of the module-level ``generator`` and ``discriminator``.

    The generator transforms ``real_images``; the discriminator scores both the
    inputs and the generated images. Returns ``(gen_loss, disc_loss)`` for the batch.
    """
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        fake_images = generator(real_images, training=True)

        score_real = discriminator(real_images, training=True)
        score_fake = discriminator(fake_images, training=True)

        # Generator wants its fakes scored as 1; discriminator wants real -> 1, fake -> 0.
        gen_loss = binary_crossentropy(tf.ones_like(score_fake), score_fake)
        on_real = binary_crossentropy(tf.ones_like(score_real), score_real)
        on_fake = binary_crossentropy(tf.zeros_like(score_fake), score_fake)
        disc_loss = on_real + on_fake

    gen_grads = gen_tape.gradient(gen_loss, generator.trainable_variables)
    disc_grads = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gen_grads, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(disc_grads, discriminator.trainable_variables))

    return gen_loss, disc_loss

def train(dataset, epochs):
    """Train the single-direction GAN on ``dataset`` for ``epochs`` epochs.

    Args:
        dataset: Batched dataset of input images.
        epochs: Number of passes over ``dataset``.

    Side effects:
        Appends per-batch and per-epoch losses (as Python floats) to the
        module-level lists and shows sample translations after each epoch.

    The preview images are now taken from ``dataset`` itself rather than from
    the global ``dataset_X``, so the function works for any dataset passed in.
    """
    num_images = 5
    sample_images = next(iter(dataset))[:num_images]

    for epoch in range(epochs):
        batches_done = 0
        total_gen_loss = 0.0
        total_disc_loss = 0.0
        for batches_done, image_batch in enumerate(dataset, start=1):
            gen_loss, disc_loss = train_step(image_batch)
            # Scalar tensors -> floats: cheap to store and directly plottable.
            gen_loss, disc_loss = float(gen_loss), float(disc_loss)
            print(f'Batch {batches_done}/{len(dataset)}: Generator Loss: {gen_loss}, Discriminator Loss: {disc_loss}')
            total_gen_loss += gen_loss
            total_disc_loss += disc_loss

            # Store batch losses
            batch_gen_losses.append(gen_loss)
            batch_disc_losses.append(disc_loss)

        # Guard against an empty dataset, which would otherwise divide by zero.
        divisor = max(batches_done, 1)
        epoch_gen_loss = total_gen_loss / divisor
        epoch_disc_loss = total_disc_loss / divisor

        print(f'Epoch: {epoch+1}, Overall Generator Loss: {epoch_gen_loss}, Overall Discriminator Loss: {epoch_disc_loss}')
        gen_losses.append(epoch_gen_loss)
        disc_losses.append(epoch_disc_loss)

        display_generated_images(f'Epoch {epoch + 1}', generator, sample_images)

# Train the GAN on the photo dataset for 150 epochs
train(dataset_X, epochs=150)

# Call the function to plot the losses (per-epoch and per-batch generator/discriminator series)
plot_losses(gen_losses, disc_losses, batch_gen_losses, batch_disc_losses)

# Save the trained generator model
generator.save(root_folder + 'generator.h5')

# Save the updated discriminator model (separate file, so the pre-trained one is kept)
discriminator.save(root_folder + 'updated_discriminator.h5')


6. Re-load the GAN and test it with a small set of photos

In [None]:
from tensorflow.keras.models import load_model

# Reload the X-to-Y generator saved at the end of the CycleGAN training.
generator = load_model(root_folder + 'generator_g.h5')

# Compile without optimizer or loss; the model is only used for inference below.
generator.compile()

In [None]:
from matplotlib import pyplot as plt
import os
import tensorflow as tf

def transform_and_save_images(generator, test_image_paths, output_folder):
    """Translate each test image with ``generator``, save it as JPEG and show the results.

    Args:
        generator: Image-to-image model taking [0, 1]-scaled 256x256 RGB batches.
        test_image_paths: Paths of the input images.
        output_folder: Directory the generated images are written to (created
            if missing). Each output keeps the input's base name with a
            ``.jpg`` extension.

    Returns:
        List of the paths of the saved images. The function previously created
        the folder but never wrote any file into it.
    """
    os.makedirs(output_folder, exist_ok=True)

    test_images = []
    saved_paths = []

    for path in test_image_paths:
        test_image = tf.keras.preprocessing.image.load_img(path, target_size=(256, 256))
        test_image = tf.keras.preprocessing.image.img_to_array(test_image)
        test_image = normalize_zero_to_one(test_image)
        test_images.append(test_image)

        # Run the generator on a batch of one and bring the output onto the 0..255 scale.
        generated = generator(tf.expand_dims(test_image, axis=0), training=False)[0]
        pixels = tf.clip_by_value(generated, 0.0, 1.0) * 255.0

        stem = os.path.splitext(os.path.basename(path))[0]
        out_path = os.path.join(output_folder, stem + '.jpg')
        # scale=False: the values are already 0..255; do not min-max rescale them.
        tf.keras.preprocessing.image.save_img(out_path, pixels.numpy(), scale=False)
        saved_paths.append(out_path)

    display_generated_images('Test Images', generator, test_images)
    return saved_paths

# Example usage: run the reloaded generator on two photos from the photo folder
test_image_paths = [
    root_folder + 'photo_jpg/0f8e316a87.jpg',
    root_folder + 'photo_jpg/0039ebb598.jpg'
]
# Folder handed to transform_and_save_images as the destination for generated paintings.
output_folder = root_folder + 'generated_paintings/'

transform_and_save_images(generator, test_image_paths, output_folder)