This notebook is reused for every view; only the data paths are changed between runs.

In [None]:
import numpy as np
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from PIL import Image
from tensorflow.keras import backend as K

In [None]:
# Colab-only step: mount Google Drive at /content/drive, which is where the
# DATA_PATH* constants defined in the next cell point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Paths and basic configuration.
# DATA_PATH is the single value to edit when switching to another view; the
# per-class folders are derived from it so the three paths cannot drift apart.
DATA_PATH = '/content/drive/MyDrive/sve_F-bez augumentacije/Train'
DATA_PATH_F = f'{DATA_PATH}/F'  # images of class F (label 0 in the dataset cell)
DATA_PATH_M = f'{DATA_PATH}/M'  # images of class M (label 1 in the dataset cell)
BATCH_SIZE = 32
NUM_CLASSES = 2  # Number of classes, F and M in this case

In [None]:
# Load the dataset
def load_image(image_path, label):
    """Read one JPEG file and return it as a normalised 224x224 RGB tensor.

    Args:
        image_path: Path of the JPEG file to read.
        label: Label to pair with the image; passed through untouched.

    Returns:
        A tuple ``(image, label)`` where ``image`` has been resized to
        224x224 with 3 channels and rescaled from [0, 255] to [-1, 1].
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, [224, 224])
    # Centre on 127.5 and divide by the half-range so pixels span [-1, 1].
    normalized = (resized - 127.5) / 127.5
    return normalized, label

In [None]:
# Create a TensorFlow dataset
AUTOTUNE = tf.data.experimental.AUTOTUNE


def _labelled_files(directory, label):
    """Return a dataset of (path, label) pairs for every '*.jpg' in `directory`."""
    files = tf.data.Dataset.list_files(directory + '/*.jpg', shuffle=False)
    return files.map(lambda path: (path, label))


files_f = _labelled_files(DATA_PATH_F, 0)
files_m = _labelled_files(DATA_PATH_M, 1)

# Per-class image datasets, kept under their original names.
dataset_f = files_f.map(load_image, num_parallel_calls=AUTOTUNE)
dataset_m = files_m.map(load_image, num_parallel_calls=AUTOTUNE)

# Shuffle the (path, label) pairs rather than the decoded images: the pairs are
# tiny, so the buffer can cover the whole dataset. A fixed 1000-element buffer
# applied after concatenating F then M would leave batches dominated by one
# class whenever there are more than ~1000 files.
labelled_files = files_f.concatenate(files_m)
num_files = int(labelled_files.cardinality())

dataset = (
    labelled_files
    .shuffle(buffer_size=num_files, reshuffle_each_iteration=True)
    .map(load_image, num_parallel_calls=AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

In [None]:
# self attention
class SelfAttention(keras.layers.Layer):
    """Self-attention over the spatial positions of a channels-last feature map.

    Every position attends to every other position of the same feature map.
    The attended features are added back to the input, scaled by a learnable
    scalar ``gamma`` that starts at zero, so the layer is initially the
    identity.

    Args:
        in_dim: Number of channels of the input (and output) feature map.
        attention_dim: Number of channels used for the query/key projections.
        **kwargs: Standard ``keras.layers.Layer`` keyword arguments.
    """

    def __init__(self, in_dim, attention_dim, **kwargs):
        super().__init__(**kwargs)
        self.in_dim = in_dim
        self.attention_dim = attention_dim
        self.query_conv = layers.Conv2D(filters=attention_dim, kernel_size=1, padding='same')
        self.key_conv = layers.Conv2D(filters=attention_dim, kernel_size=1, padding='same')
        self.value_conv = layers.Conv2D(filters=in_dim, kernel_size=1, padding='same')
        self.gamma = self.add_weight(name='gamma', shape=(), initializer='zeros', trainable=True)
        self.softmax = layers.Softmax(axis=-1)

    def call(self, x):
        """Apply attention to ``x`` of shape (batch, height, width, in_dim)."""
        batch_size = tf.shape(x)[0]

        # The conv outputs are channels-last (B, H, W, C). Flattening the two
        # spatial axes gives (B, N, C) with N = H*W and keeps each position's
        # channel vector intact. Reshaping to (B, C, N) directly would be
        # wrong here: tf.reshape is row-major and does not transpose, so
        # channels and positions would be mixed together.
        proj_query = tf.reshape(self.query_conv(x), [batch_size, -1, self.attention_dim])  # (B, N, C')
        proj_key = tf.reshape(self.key_conv(x), [batch_size, -1, self.attention_dim])      # (B, N, C')
        proj_value = tf.reshape(self.value_conv(x), [batch_size, -1, self.in_dim])         # (B, N, C)

        # energy[b, i, j] = <query_i, key_j>; softmax over j makes each row a
        # distribution over the positions that position i attends to.
        energy = tf.matmul(proj_query, proj_key, transpose_b=True)  # (B, N, N)
        attention = self.softmax(energy)

        # out[b, i, :] = sum_j attention[b, i, j] * value[b, j, :]
        out = tf.matmul(attention, proj_value)  # (B, N, C)
        out = tf.reshape(out, tf.shape(x))      # back to (B, H, W, C)

        # Residual connection; gamma starts at 0 so training begins from identity.
        out = self.gamma * out + x
        return out

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised."""
        config = super().get_config()
        config.update({'in_dim': self.in_dim, 'attention_dim': self.attention_dim})
        return config

In [None]:
# Generator model
def make_conditional_generator_model(input_shape=(100,), num_classes=NUM_CLASSES):
    """Build the conditional generator.

    The class label is embedded into a vector as long as the noise vector,
    concatenated with the noise, projected to a 7x7x256 feature map and then
    upsampled by five stride-2 transposed convolutions (7 -> 224). A
    ``SelfAttention`` layer sits after the second upsampling stage.

    Args:
        input_shape: Shape of the noise vector, e.g. ``(100,)``.
        num_classes: Number of distinct class labels.

    Returns:
        A ``keras.Model`` taking ``[noise, label]`` and producing a 3-channel
        image with ``tanh`` activation.
    """

    def upsample(tensor, filters):
        # Transposed conv (x2 spatial size) -> batch norm -> LeakyReLU.
        tensor = layers.Conv2DTranspose(filters, kernel_size=4, strides=2, padding='same', use_bias=False)(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.LeakyReLU(alpha=0.2)(tensor)

    noise_input = layers.Input(shape=input_shape)
    label_input = layers.Input(shape=(1,), dtype='int32')

    # Embed the label and join it with the noise vector.
    label_vector = layers.Embedding(num_classes, input_shape[0])(label_input)
    label_vector = layers.Flatten()(label_vector)
    latent = layers.Concatenate()([noise_input, label_vector])

    # Project to the initial 7x7x256 feature map.
    features = layers.Dense(7*7*256, use_bias=False)(latent)
    features = layers.BatchNormalization()(features)
    features = layers.LeakyReLU(alpha=0.2)(features)
    features = layers.Reshape((7, 7, 256))(features)

    for filters in (128, 64):
        features = upsample(features, filters)

    # Self-attention on the 64-channel feature map.
    features = SelfAttention(64, 16)(features)

    for filters in (32, 16):
        features = upsample(features, filters)

    image = layers.Conv2DTranspose(
        3, kernel_size=4, strides=2, padding='same', use_bias=False, activation='tanh'
    )(features)

    return keras.Model(inputs=[noise_input, label_input], outputs=image)

In [None]:
# Discriminator model
def make_conditional_discriminator_model(img_shape=(224, 224, 3), num_classes=NUM_CLASSES):
    """Build the conditional discriminator.

    The label is embedded into a tensor with the same shape as the image and
    concatenated with it along the channel axis. Five stride-2 convolutions
    follow, each with LeakyReLU and dropout (batch norm on all but the first),
    and a final dense layer emits a single logit.

    Args:
        img_shape: Shape of the input image, ``(height, width, channels)``.
        num_classes: Number of distinct class labels.

    Returns:
        A ``keras.Model`` taking ``[image, label]`` and returning one
        un-activated logit per sample.
    """
    image_input = layers.Input(shape=img_shape)
    label_input = layers.Input(shape=(1,), dtype='int32')

    # Turn the label into an image-shaped tensor so it can be stacked on the image.
    label_plane = layers.Embedding(num_classes, np.prod(img_shape))(label_input)
    label_plane = layers.Flatten()(label_plane)
    label_plane = layers.Reshape(img_shape)(label_plane)

    features = layers.Concatenate()([image_input, label_plane])

    for stage, filters in enumerate((16, 32, 64, 128, 256)):
        features = layers.Conv2D(filters, kernel_size=4, strides=2, padding='same')(features)
        if stage > 0:
            # The first stage has no batch normalisation.
            features = layers.BatchNormalization()(features)
        features = layers.LeakyReLU(alpha=0.2)(features)
        features = layers.Dropout(0.5)(features)

    flat = layers.Flatten()(features)
    logits = layers.Dense(1)(flat)  # No activation, because from_logits=True in loss function

    return keras.Model([image_input, label_input], logits)

In [None]:
# Instantiate both networks with their default shapes; train_step and train
# use these two module-level models.
generator = make_conditional_generator_model()
discriminator = make_conditional_discriminator_model()

In [None]:
# Loss and optimizers.
# The discriminator's last layer has no activation, so the loss works on logits.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

initial_learning_rate = 2e-4

# Multiply the learning rate by 0.96 once every 1000 optimizer steps.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=1000,
    decay_rate=0.96,
    staircase=True,
)

# Both networks use Adam with beta_1 lowered to 0.5; each gets its own
# optimizer instance built from the same schedule.
generator_optimizer, discriminator_optimizer = (
    tf.keras.optimizers.Adam(learning_rate=lr_schedule, beta_1=0.5) for _ in range(2)
)

In [None]:
# Length of the noise vector sampled in train_step. It has to match the
# generator's noise input size (its default input_shape is (100,)).
noise_dim = 100

In [None]:
# Training step for conditional GAN with gradient clipping
@tf.function
def train_step(images, labels):
    """Run one optimisation step for both the generator and the discriminator.

    Args:
        images: Batch of real images, shape (batch, height, width, channels).
        labels: Integer class labels for ``images``; shape (batch,) or (batch, 1).

    Returns:
        A tuple ``(gen_loss, disc_loss)`` with the scalar losses of this step.
    """
    # Use the size of the batch actually received: the input pipeline does not
    # drop the remainder, so the last batch of an epoch can be smaller than
    # BATCH_SIZE. The number of fake samples should match the real ones.
    batch_size = tf.shape(images)[0]
    # Give the labels the (batch, 1) shape the model inputs are declared with.
    labels = tf.reshape(labels, [-1, 1])

    # Generate random noise
    noise = tf.random.normal([batch_size, noise_dim])
    # Generate fake labels
    fake_labels = tf.random.uniform([batch_size, 1], minval=0, maxval=NUM_CLASSES, dtype=tf.int32)

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # Generate fake images
        generated_images = generator([noise, fake_labels], training=True)

        # Get the logits for real images
        real_output = discriminator([images, labels], training=True)
        # Get the logits for fake images
        fake_output = discriminator([generated_images, fake_labels], training=True)

        # Generator wants the discriminator to label its samples as real (1).
        gen_loss = cross_entropy(tf.ones_like(fake_output), fake_output)
        # Discriminator wants real -> 1 and fake -> 0.
        real_loss = cross_entropy(tf.ones_like(real_output), real_output)
        fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
        # Total discriminator loss
        disc_loss = real_loss + fake_loss

    # Get the gradients for the generator
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    # Get the gradients for the discriminator
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    # Clip every gradient element-wise to [-1, 1].
    clipped_gen_gradients = [tf.clip_by_value(grad, -1., 1.) for grad in gradients_of_generator]
    clipped_disc_gradients = [tf.clip_by_value(grad, -1., 1.) for grad in gradients_of_discriminator]

    # Apply the gradients to the optimizer
    generator_optimizer.apply_gradients(zip(clipped_gen_gradients, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(clipped_disc_gradients, discriminator.trainable_variables))

    return gen_loss, disc_loss

In [None]:
def generate_and_save_images(model, epoch, test_input, num_images_per_class=100):
    """Generate ``num_images_per_class`` images for every class and save them as PNGs.

    Images are written to ``<DATA_PATH>/output_cGAN<epoch>/`` and named
    ``class_<id>_sample_<k>.png``.

    Args:
        model: Generator taking ``[noise, labels]`` and returning images in [-1, 1].
        epoch: Epoch number, used only to name the output directory.
        test_input: Unused; kept so existing callers keep working. Fresh noise
            is drawn on every call.
        num_images_per_class: How many images to generate for each class.
    """
    # Ensure output path exists
    output_path = os.path.join(DATA_PATH, f'output_cGAN{epoch}')
    os.makedirs(output_path, exist_ok=True)

    total_images = num_images_per_class * NUM_CLASSES

    # Prepare noise vector and class labels. The noise width comes from the
    # shared noise_dim constant so it stays in sync with train_step.
    noise = tf.random.normal([total_images, noise_dim])
    labels = tf.concat(
        [tf.fill([num_images_per_class, 1], class_id) for class_id in range(NUM_CLASSES)],
        axis=0,
    )

    predictions = model([noise, labels], training=False)

    # Convert from [-1, 1] to [0, 255]
    predictions = tf.cast((predictions + 1) * 127.5, tf.uint8).numpy()

    # Save images
    for i in range(total_images):
        class_id = i // num_images_per_class
        path = os.path.join(output_path, f'class_{class_id}_sample_{i % num_images_per_class}.png')
        # scale=False: the pixels are already in [0, 255]. The default
        # (scale=True) would min/max-stretch every image again and change
        # its contrast.
        tf.keras.preprocessing.image.save_img(path, predictions[i], scale=False)


In [None]:
# NOTE(review): this noise-only seed is overwritten by the (noise, labels)
# tuple assigned to `seed` in the next cell, so this value is never used.
seed = tf.random.normal([150, 100])

In [None]:
num_examples_to_generate = 200  # Total number of images
num_classes = 2  # Number of classes

# One noise vector per example.
noise = tf.random.normal([num_examples_to_generate, noise_dim])

# Labels: an equal-sized column block per class, stacked in class order.
block_shape = [num_examples_to_generate // num_classes, 1]
labels = tf.concat([tf.fill(block_shape, class_id) for class_id in range(num_classes)], axis=0)

# The seed is the (noise, labels) pair.
seed = (noise, labels)

In [None]:
# Define the training function
def train(dataset, epochs, num_images_per_class, save_last_n_epochs=10):
    """Train the GAN and save sample images during the final epochs.

    Args:
        dataset: Iterable yielding ``(image_batch, label_batch)`` pairs.
        epochs: Number of passes over ``dataset``.
        num_images_per_class: Forwarded to ``generate_and_save_images``.
        save_last_n_epochs: Sample images are saved after each of the last
            this-many epochs (default 10).

    Raises:
        ValueError: If ``dataset`` yields no batches.
    """
    first_saving_epoch = epochs - save_last_n_epochs

    for epoch in range(epochs):
        gen_loss = disc_loss = None
        for image_batch, label_batch in dataset:
            gen_loss, disc_loss = train_step(image_batch, label_batch)

        if gen_loss is None:
            # Without this check the print below fails on an unbound variable.
            raise ValueError("dataset yielded no batches; nothing to train on")

        # Print the losses (these are the losses of the epoch's last batch)
        print(f"Epoch {epoch+1}, Generator Loss: {gen_loss}, Discriminator Loss: {disc_loss}")

        # Generate and save images only for the last `save_last_n_epochs` epochs
        if epoch >= first_saving_epoch:
            generate_and_save_images(generator, epoch + 1, seed, num_images_per_class)

In [None]:
# Start training: 10000 epochs, 100 generated images per class at each save.
train(dataset, epochs=10000, num_images_per_class=100)