In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import trange
import os
from PIL import Image
import tensorflow as tf

# Dataset creation and preprocessing

In [None]:
np.random.seed(0)


DATA_DIR = "flowers"
x_paths = [DATA_DIR+"/"+i for i in sorted(os.listdir(DATA_DIR)) if not i.startswith(".")]
data = []

#resize images if necessary
for image_path_i in trange(len(x_paths)):
    image_path = x_paths[image_path_i]

    try:
        image_array = np.array(Image.open(image_path))
        if len(image_array.shape) == 3:  # Check if the image has 3 dimensions
            data.append(tf.image.resize(image_array, (128, 128)))
    except Exception as e:
        print(f"Error processing image {image_path}: {str(e)}")

#create the splits
data = np.array(data)
print(data.shape)
print("train/test/val split pt1")
X_train, X_test = train_test_split(data, test_size=0.2, random_state=1)
print("train/test/val split pt2")
X_train, X_val = train_test_split(X_train, test_size=0.25, random_state=1)  # 0.25 x 0.8 = 0.2

#save array into folder
if not os.path.exists('dataset'):
    os.makedirs('dataset')

with open('dataset/X_train.npy', 'wb+') as f:
    np.save(f, np.array(X_train))

with open('dataset/X_val.npy', 'wb+') as f:
    np.save(f, np.array(X_val))

with open('dataset/X_test.npy', 'wb+') as f:
    np.save(f, np.array(X_test))

# Train definition

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import trange
import os
from PIL import Image
import tensorflow as tf
from tqdm import tqdm
import keras.api._v2.keras as K
import pickle
import time

In [None]:
#collegamento drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def train_dnn(X_train, X_val, model, name, batch_size=128, epochs=30, save_only_weights=False):
    @tf.function
    def train_step(x, y):
        with tf.GradientTape() as tape:
            preds = model(x, training=True)
            loss_value = model.loss(y, preds)
        grads = tape.gradient(loss_value, model.trainable_weights)
        model.optimizer.apply_gradients(zip(grads, model.trainable_weights))
        return loss_value

    def hue_shift(image):
        hue_shift_value = tf.random.uniform([], -0.5, 0.5)
        adjusted_image = tf.image.adjust_hue(image, hue_shift_value)
        return adjusted_image

    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, X_train))
    train_dataset = train_dataset.map(lambda x, y: (hue_shift(x), y))

    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, X_train)) \
        .map(lambda x, y: (tf.reduce_mean(x, axis=-1, keepdims=True), tf.cast(y, tf.float32) / 255.)) \
        .cache() \
        .shuffle(buffer_size=1024) \
        .batch(batch_size)

    val_dataset = tf.data.Dataset.from_tensor_slices((X_val, X_val)) \
        .map(lambda x, y: (tf.reduce_mean(x, axis=-1, keepdims=True), tf.cast(y, tf.float32) / 255.)) \
        .cache() \
        .shuffle(buffer_size=1024) \
        .batch(batch_size)

    losses_history = []
    for epoch in range(epochs):
        print("\nStart of epoch %d" % (epoch,))
        start_time = time.time()

        train_losses = []
        for step, (x_batch_train, y_batch_train) in enumerate(tqdm(train_dataset)):
            train_losses.append(
                train_step(x_batch_train, y_batch_train)
            )

        val_losses = []
        for x_batch_val, y_batch_val in val_dataset:
            val_losses.append(model.evaluate(x_batch_val, y_batch_val, verbose=0))

        print("Train loss: %.4f" % (float(np.mean(train_losses)),))
        print("Validation loss: %.4f" % (float(np.mean(val_losses)),))
        print("Time taken: %.2fs" % (time.time() - start_time))

        losses_history.append([float(np.mean(train_losses)), float(np.mean(val_losses))])

        if save_only_weights:
            weights_dir = f"drive/MyDrive/models/{name}"
            os.makedirs(weights_dir, exist_ok=True)
            model.save_weights(f"{weights_dir}/{epoch}")
        else:
            model_dir = f"drive/MyDrive/models/{name}"
            os.makedirs(model_dir, exist_ok=True)
            model.save(f"{model_dir}/{epoch}")

        history_dir = f"drive/MyDrive/models/{name}"
        os.makedirs(history_dir, exist_ok=True)
        with open(f"{history_dir}/loss_history", "wb+") as f:
            pickle.dump(losses_history, f)

    return losses_history


def train_pix2pix(X_train, generator, discriminator, name, alpha=1.0, batch_size=32, epochs=30, save_only_weights=False):
    @tf.function
    def train_step(x, y):
        generated_rgb_images = generator(x, training=False)
        with tf.GradientTape() as tape:
            preds_1 = discriminator([x, y])
            preds_0 = discriminator([x, generated_rgb_images])
            loss = tf.reduce_mean((1 - preds_1)**2) + tf.reduce_mean(preds_0**2)
        grads = tape.gradient(loss, discriminator.trainable_weights)
        discriminator.optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

        with tf.GradientTape() as tape:
            generated_rgb_images = generator(x)
            preds = discriminator([x, generated_rgb_images])
            loss = generator.loss(y, generated_rgb_images) + alpha * (
                -tf.reduce_mean(
                    tf.math.log(preds + 1e-8)
                )
            )
        grads = tape.gradient(loss, generator.trainable_weights)
        generator.optimizer.apply_gradients(zip(grads, generator.trainable_weights))
        return tf.reduce_mean(preds)

    def hue_shift(image):
        hue_shift_value = tf.random.uniform([], -0.5, 0.5)
        adjusted_image = tf.image.adjust_hue(image, hue_shift_value)
        return adjusted_image

    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, X_train))
    train_dataset = train_dataset.map(lambda x, y: (hue_shift(x), y))


     #def contrast_adjust(image):
      #contrast_factor = tf.random.uniform([], 0.5, 1.5)
      #adjusted_image = tf.image.adjust_contrast(image, contrast_factor)
      #return adjusted_image

    #train_dataset = tf.data.Dataset.from_tensor_slices((X_train, X_train))
    #train_dataset = train_dataset.map(lambda x, y: (contrast_adjust(x), y))

    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, X_train)) \
        .map(lambda x, y: (tf.reduce_mean(x, axis=-1, keepdims=True), tf.cast(y, tf.float32) / 255.)) \
        .cache() \
        .shuffle(buffer_size=1024) \
        .batch(batch_size)

    for epoch in range(epochs):
        disc_losses = []
        print("\nStart of epoch %d" % (epoch,))
        start_time = time.time()

        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            disc_losses.append(train_step(x_batch_train, y_batch_train))
            print(f"\r{step}/{train_dataset.cardinality().numpy()} - Avg disc pred: {np.mean(disc_losses[-3:])}", end="")

        print("Time taken: %.2fs" % (time.time() - start_time))

        if save_only_weights:
            generator.save_weights(f"models/{name}/{epoch}/generator")
            discriminator.save_weights(f"models/{name}/{epoch}/discriminator")
        else:
            generator.save(f"models/{name}/{epoch}/generator")
            discriminator.save(f"models/{name}/{epoch}/discriminator")

# Autoencoder

In [None]:
def create_autoencoder():
    inputs = K.layers.Input(shape=(128, 128, 1))
    x = K.layers.BatchNormalization()(inputs)
    x = K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)

    x = K.layers.MaxPool2D(2)(x)

    x = K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)

    x = K.layers.MaxPool2D(2)(x)

    x = K.layers.Conv2D(128, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(128, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(128, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)

    x = K.layers.UpSampling2D(2)(x)

    x = K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)

    x = K.layers.UpSampling2D(2)(x)

    x = K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(3, 1, activation="sigmoid", padding="SAME", use_bias=True)(x)

    autoencoder = K.models.Model(inputs=inputs, outputs=x)
    return autoencoder


if __name__ == "__main__":
    with open('drive/MyDrive/dataset/X_train.npy', 'rb') as f:
        X_train = np.load(f)
    with open('drive/MyDrive/dataset/X_val.npy', 'rb') as f:
        X_val = np.load(f)
    with open('drive/MyDrive/dataset/X_test.npy', 'rb') as f:
        X_test = np.load(f)

    autoencoder = create_autoencoder()

    autoencoder.compile(
        tf.optimizers.legacy.Adam(1e-3),
        loss=tf.losses.MeanSquaredError()
    )
    train_dnn(X_train, X_val, autoencoder, "autoencoder", batch_size=16)


# Deep neural network

In [None]:
def create_dnet():
    inputs = K.layers.Input(shape=(128, 128, 1))
    x = K.layers.BatchNormalization(axis=-1)(inputs)

    for _ in range(6):
        x = K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False)(x)
        x = K.layers.BatchNormalization(axis=-1)(x)
        x = K.layers.Activation(tf.nn.leaky_relu)(x)

    x = K.layers.Conv2D(3, 3, activation="sigmoid", padding="SAME", use_bias=True)(x)

    dnet = K.models.Model(inputs=inputs, outputs=x)
    return dnet

if __name__ == "__main__":
    with open('drive/MyDrive/dataset/X_train.npy', 'rb') as f:
        X_train = np.load(f)
    with open('drive/MyDrive/dataset/X_val.npy', 'rb') as f:
        X_val = np.load(f)
    with open('drive/MyDrive/dataset/X_test.npy', 'rb') as f:
        X_test = np.load(f)

    dnet = create_dnet()

    dnet.compile(
        optimizer=tf.optimizers.legacy.Adam(1e-3),
        loss=tf.losses.MeanSquaredError()
    )
    train_dnn(X_train, X_val, dnet, "dnet", batch_size=16)


# Unet

In [None]:
class ResidualLayer(K.layers.Layer):
    def __init__(self, ffnn, **kwargs):
        super().__init__(**kwargs)
        self.ffnn = K.models.Sequential(ffnn)

    def call(self, inputs, *args, **kwargs):
        return tf.concat((self.ffnn(inputs), inputs), axis=-1)


def create_unet():
    inputs = K.layers.Input(shape=(128, 128, 1))
    x = K.layers.BatchNormalization(axis=-1)(inputs)

    # Encoder
    for filters in [16, 32, 64, 128, 256]:
        x = K.layers.Conv2D(filters, 3, activation="linear", padding="SAME", use_bias=False)(x)
        x = K.layers.BatchNormalization(axis=-1)(x)
        x = K.layers.Activation(tf.nn.leaky_relu)(x)

        if filters != 256:
            x = K.layers.MaxPool2D(2)(x)
        else:
            x = ResidualLayer([
                K.layers.Conv2D(256, 1, activation="linear", padding="SAME", use_bias=False),
                K.layers.BatchNormalization(axis=-1),
                K.layers.Activation(tf.nn.leaky_relu),
                K.layers.Conv2D(256, 3, activation="linear", padding="SAME", use_bias=False),
                K.layers.BatchNormalization(axis=-1),
                K.layers.Activation(tf.nn.leaky_relu),
            ])(x)

    # Decoder
    for filters in [128, 64, 32, 16]:
        x = K.layers.UpSampling2D(2)(x)
        x = K.layers.Conv2D(filters, 3, activation="linear", padding="SAME", use_bias=False)(x)
        x = K.layers.BatchNormalization(axis=-1)(x)
        x = K.layers.Activation(tf.nn.leaky_relu)(x)
        x = K.layers.Conv2D(filters, 3, activation="linear", padding="SAME", use_bias=False)(x)
        x = K.layers.BatchNormalization(axis=-1)(x)
        x = K.layers.Activation(tf.nn.leaky_relu)(x)

    # Output Layer
    x = K.layers.UpSampling2D(2)(x)
    x = K.layers.Conv2D(3, 3, activation="sigmoid", padding="SAME")(x)

    unet = K.models.Model(inputs=inputs, outputs=x)
    return unet


if __name__ == "__main__":
    with open('drive/MyDrive/dataset/X_train.npy', 'rb') as f:
        X_train = np.load(f)
    with open('drive/MyDrive/dataset/X_val.npy', 'rb') as f:
        X_val = np.load(f)
    with open('drive/MyDrive/dataset/X_test.npy', 'rb') as f:
        X_test = np.load(f)
    unet = create_unet()
    unet.compile(
        optimizer=tf.optimizers.legacy.Adam(1e-3),
        loss=tf.losses.MeanSquaredError()
    )

    train_dnn(X_train, X_val, unet, "unet", batch_size=16, save_only_weights=False)

# Resnet

In [None]:
class ResidualLayer(K.layers.Layer):
    def __init__(self, ffnn, **kwargs):
        super().__init__(**kwargs)
        self.ffnn = K.models.Sequential(ffnn)

    def call(self, inputs, *args, **kwargs):
        return self.ffnn(inputs) + inputs


def create_resnet():
    inputs = K.layers.Input(shape=(128, 128, 1))
    x = K.layers.BatchNormalization(axis=-1)(inputs)
    x = K.layers.Conv2D(256, 1, activation="linear", padding="SAME")(x)

    # Initial Residual Block
    x = ResidualLayer([
        K.layers.BatchNormalization(axis=-1),
        K.layers.Activation(tf.nn.leaky_relu),
        K.layers.Conv2D(32, 1, activation="linear", padding="SAME", use_bias=False),
        K.layers.BatchNormalization(axis=-1),
        K.layers.Activation(tf.nn.leaky_relu),
        K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False),
        K.layers.BatchNormalization(axis=-1),
        K.layers.Activation(tf.nn.leaky_relu),
        K.layers.Conv2D(256, 1, activation="linear", padding="SAME", use_bias=False),
    ])(x)

    # Residual Blocks
    for _ in range(20):
        x = ResidualLayer([
            K.layers.BatchNormalization(axis=-1),
            K.layers.Activation(tf.nn.leaky_relu),
            K.layers.Conv2D(32, 1, activation="linear", padding="SAME", use_bias=False),
            K.layers.BatchNormalization(axis=-1),
            K.layers.Activation(tf.nn.leaky_relu),
            K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False),
            K.layers.BatchNormalization(axis=-1),
            K.layers.Activation(tf.nn.leaky_relu),
            K.layers.Conv2D(256, 1, activation="linear", padding="SAME", use_bias=False),
        ])(x)

    # Finalization
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(3, 3, activation="sigmoid", padding="SAME")(x)

    resnet = K.models.Model(inputs=inputs, outputs=x)
    return resnet


if __name__ == "__main__":
    with open('drive/MyDrive/dataset/X_train.npy', 'rb') as f:
        X_train = np.load(f)
    with open('drive/MyDrive/dataset/X_val.npy', 'rb') as f:
        X_val = np.load(f)
    with open('drive/MyDrive/dataset/X_test.npy', 'rb') as f:
        X_test = np.load(f)

    resnet = create_resnet()

    resnet.compile(
        optimizer=tf.optimizers.legacy.Adam(1e-3),
        loss=tf.losses.MeanSquaredError()
    )
    train_dnn(X_train, X_val, resnet, "resnet", batch_size=8)


# Dense net

In [None]:
class ResidualLayer(K.layers.Layer):
    def __init__(self, ffnn, **kwargs):
        super().__init__(**kwargs)
        self.ffnn = K.models.Sequential(ffnn)

    def call(self, inputs, *args, **kwargs):
        return tf.concat((self.ffnn(inputs), inputs), axis=-1)


def create_dense_net():
    inputs = K.layers.Input(shape=(128, 128, 1))
    x = K.layers.Conv2D(filters=256, kernel_size=1, padding="SAME", activation="linear")(inputs)
    x = K.layers.BatchNormalization()(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    concatenated_inputs = x
    for f in [128] * 7:
        x = K.layers.Conv2D(filters=64, kernel_size=1, padding="SAME", activation="linear")(concatenated_inputs)
        x = K.layers.BatchNormalization()(x)
        x = K.layers.Activation(tf.nn.leaky_relu)(x)
        x = K.layers.Conv2D(filters=f, kernel_size=4, padding="SAME", activation="linear")(x)
        x = K.layers.BatchNormalization()(x)
        x = K.layers.Activation(tf.nn.leaky_relu)(x)
        concatenated_inputs = K.layers.Concatenate()([concatenated_inputs, x])

    x = K.layers.Conv2D(filters=64, kernel_size=1, padding="SAME", activation="linear")(concatenated_inputs)
    x = K.layers.BatchNormalization()(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(3, 3, activation="sigmoid", padding="SAME")(x)
    dense_net = K.models.Model(inputs=[inputs], outputs=[x])
    return dense_net


if __name__ == "__main__":
    with open('drive/MyDrive/dataset/X_train.npy', 'rb') as f:
        X_train = np.load(f)
    with open('drive/MyDrive/dataset/X_val.npy', 'rb') as f:
        X_val = np.load(f)
    with open('drive/MyDrive/dataset/X_test.npy', 'rb') as f:
        X_test = np.load(f)

    dense_net = create_dense_net()

    dense_net.compile(
        optimizer=tf.optimizers.legacy.Adam(1e-3),
        loss=tf.losses.MeanSquaredError()
    )

    train_dnn(X_train, X_val, dense_net, "dense_net", batch_size=16, save_only_weights=False)


# Pix2pix

In [None]:
class ResidualLayer(K.layers.Layer):
    def __init__(self, ffnn, **kwargs):
        super().__init__(**kwargs)
        self.ffnn = K.models.Sequential(ffnn)

    def call(self, inputs, *args, **kwargs):
        return tf.concat((self.ffnn(inputs), inputs), axis=-1)


def create_discriminator_p2p():
    inputs = K.layers.Input(shape=(128, 128, 3))
    conditioning = K.layers.Input(shape=(128, 128, 1))
    x = K.layers.Concatenate(axis=-1)([inputs, conditioning])
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Conv2D(512, 3, activation="linear", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(128, 1, activation="linear", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(128, 3, activation="linear", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.sigmoid)(x)
    x = K.layers.Conv2D(64, 1, activation="linear", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(64, 3, activation="linear", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(1, 1, activation="linear", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.sigmoid)(x)
    disc = K.models.Model(inputs=[conditioning, inputs], outputs=x)
    return disc


def create_generator_p2p():
    inputs = K.layers.Input(shape=(128, 128, 1))
    x = K.layers.BatchNormalization(axis=-1)(inputs)
    x = K.layers.Conv2D(16, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(16, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.MaxPool2D(2)(x)
    x = ResidualLayer([
        K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False),
        K.layers.BatchNormalization(axis=-1),
        K.layers.Activation(tf.nn.leaky_relu),
        K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False),
        K.layers.BatchNormalization(axis=-1),
        K.layers.Activation(tf.nn.leaky_relu),
        K.layers.MaxPool2D(2),
        ResidualLayer([
            K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False),
            K.layers.BatchNormalization(axis=-1),
            K.layers.Activation(tf.nn.leaky_relu),
            K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False),
            K.layers.BatchNormalization(axis=-1),
            K.layers.Activation(tf.nn.leaky_relu),
            K.layers.MaxPool2D(2),
            ResidualLayer([
                K.layers.Conv2D(128, 3, activation="linear", padding="SAME", use_bias=False),
                K.layers.BatchNormalization(axis=-1),
                K.layers.Activation(tf.nn.leaky_relu),
                K.layers.Conv2D(128, 3, activation="linear", padding="SAME", use_bias=False),
                K.layers.BatchNormalization(axis=-1),
                K.layers.Activation(tf.nn.leaky_relu),
                K.layers.MaxPool2D(2),
                ResidualLayer([
                    K.layers.Conv2D(256, 1, activation="linear", padding="SAME", use_bias=False),
                    K.layers.BatchNormalization(axis=-1),
                    K.layers.Activation(tf.nn.leaky_relu),
                    K.layers.Conv2D(256, 3, activation="linear", padding="SAME", use_bias=False),
                    K.layers.BatchNormalization(axis=-1),
                    K.layers.Activation(tf.nn.leaky_relu),
                ]),
                K.layers.UpSampling2D(2),
                K.layers.Conv2D(128, 3, activation="linear", padding="SAME", use_bias=False),
                K.layers.BatchNormalization(axis=-1),
                K.layers.Activation(tf.nn.leaky_relu),
                K.layers.Conv2D(128, 3, activation="linear", padding="SAME", use_bias=False),
                K.layers.BatchNormalization(axis=-1),
                K.layers.Activation(tf.nn.leaky_relu),
            ]),
            K.layers.UpSampling2D(2),
            K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False),
            K.layers.BatchNormalization(axis=-1),
            K.layers.Activation(tf.nn.leaky_relu),
            K.layers.Conv2D(64, 3, activation="linear", padding="SAME", use_bias=False),
            K.layers.BatchNormalization(axis=-1),
            K.layers.Activation(tf.nn.leaky_relu),
        ]),
        K.layers.UpSampling2D(2),
        K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False),
        K.layers.BatchNormalization(axis=-1),
        K.layers.Activation(tf.nn.leaky_relu),
        K.layers.Conv2D(32, 3, activation="linear", padding="SAME", use_bias=False),
        K.layers.BatchNormalization(axis=-1),
        K.layers.Activation(tf.nn.leaky_relu),
    ])(x)
    x = K.layers.UpSampling2D(2)(x)
    x = K.layers.Conv2D(16, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(16, 3, activation="linear", padding="SAME", use_bias=False)(x)
    x = K.layers.BatchNormalization(axis=-1)(x)
    x = K.layers.Activation(tf.nn.leaky_relu)(x)
    x = K.layers.Conv2D(3, 3, activation="sigmoid", padding="SAME")(x)

    unet = K.models.Model(inputs=inputs, outputs=x)
    return unet


if __name__ == "__main__":
    with open('drive/MyDrive/cache/X_train.npy', 'rb') as f:
        X_train = np.load(f)
    with open('drive/MyDrive/cache/X_val.npy', 'rb') as f:
        X_val = np.load(f)
    with open('drive/MyDrive/cache/X_test.npy', 'rb') as f:
        X_test = np.load(f)

    unet = create_generator_p2p()
    patch_gan = create_discriminator_p2p()

    unet.compile(
        optimizer=tf.optimizers.legacy.Adam(1e-4, beta_1=0.5),
        loss=tf.losses.MeanAbsoluteError()
    )
    patch_gan.compile(
        optimizer=tf.optimizers.legacy.Adam(1e-4, beta_1=0.5)
    )

    train_pix2pix(X_train, unet, patch_gan, "pix2pix-0.01", alpha=0.01, save_only_weights=False,
                  batch_size=8)


# Generate images on test set and download it

In [None]:
import cv2
# convert images in black and white
X_test_gray = np.zeros_like(X_test[:, :, :, 0:1])

for i in range(X_test.shape[0]):
    gray_image = cv2.cvtColor(X_test[i], cv2.COLOR_BGR2GRAY)
    gray_image = np.expand_dims(gray_image, axis=-1)
    X_test_gray[i] = gray_image


In [None]:
# Get model prediction on grey images (change autoencoder if you used another model)
predictions = autoencoder.predict(X_test_gray)
test_loss = autoencoder.evaluate(X_test_gray, X_test, verbose=1)

In [None]:
#create a folder with the colorized images and saved images
output_folder = 'generated_images'
os.makedirs(output_folder, exist_ok=True)

for i, generated_image in enumerate(predictions):
    # Denormalize the image that was previously normalized
    generated_image_denormalized = (generated_image * 255.0).astype(np.uint8)

    # Save image
    filename = os.path.join(output_folder, f'generated_image_{i}.png')
    plt.imsave(filename, generated_image_denormalized.squeeze())

print(f"Generated images saved in the folder: {output_folder}")

In [None]:
#download a zip file of the generated images folder
import shutil

folder_to_zip = 'generated_images'
zip_destination = 'generated_images.zip'
shutil.make_archive(zip_destination[:-4], 'zip', folder_to_zip)
from google.colab import files
files.download(zip_destination)

# Metrics

In [None]:
import numpy as np

def get_colourfulness(im):
    """
    Calculate colourfulness in natural images.

    Parameters:
        im: ndarray
            Image in RGB format.

    Returns:
        C: float
            Colourfulness.
    """
    im = im.astype(float)
    R = im[:, :, 0]
    G = im[:, :, 1]
    B = im[:, :, 2]

    # rg = |R - G|
    rg = np.abs(R - G).flatten()

    # yb = |0.5*(R + G) - B|
    yb = np.abs(0.5 * (R + G) - B).flatten()

    # Standard deviation and mean value of the pixel cloud along directions
    std_RG = np.std(rg)
    mean_RG = np.mean(rg)

    std_YB = np.std(yb)
    mean_YB = np.mean(yb)

    std_RGYB = np.sqrt(std_RG*2 + std_YB*2)
    mean_RGYB = np.sqrt(mean_RG*2 + mean_YB*2)

    C = std_RGYB + (0.3 * mean_RGYB)

    return C

In [None]:
# Upload foders with generated images and original test images as zip file then use this code to unzip them
from zipfile import ZipFile
import os

#generated images
zip_file_path = 'generated_images.zip'
extracted_folder_path = 'content/images'

with ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_folder_path)

#original test images
zip_file_path = 'test_images.zip'
extracted_folder_path = 'content/test_images'
with ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_folder_path)

In [None]:
# get colorfullness of all images

folder_path = 'content/images/'
image_files = os.listdir(folder_path)
count=0
colorfullnes=0
for image_file in image_files:
    image_path = os.path.join(folder_path, image_file)
    image = Image.open(image_path)

    # If the image has 4 channels convert it into rgb
    if image.mode == 'RGBA':
        image = image.convert('RGB')

    image_array = np.array(image)
    colorfullnes+=get_colourfulness(image_array)
    count+=1

#print colorfullness and number of images
print(colorfullnes)
print(count)

In [None]:
#define psnr
def psnr(original, compressed):
    """
    Calculate PSNR (Peak Signal-to-Noise Ratio) between two images.

    Parameters:
        original: ndarray
            Original image.
        compressed: ndarray
            Compressed image.

    Returns:
        psnr_value: float
            PSNR value.
    """
    mse = np.mean((original - compressed) ** 2)
    max_pixel = np.max(original)
    psnr_value = 20 * np.log10(max_pixel / np.sqrt(mse))
    return psnr_value

In [None]:
#psnr of all images
psnr_tot=0
count=0


folder_path = 'content/images/'
test_images_path = 'content/test_images/test_images/'
png_image_files = [file for file in os.listdir(folder_path) if file.lower().endswith('.png')]

for png_image_file in png_image_files:
    image_number = int(png_image_file.split("_")[-1].split(".")[0])
    png_image_path = os.path.join(folder_path, png_image_file)
    png_image = Image.open(png_image_path)

    # If the image has 4 channels convert it into rgb
    if png_image.mode == 'RGBA':
      png_image = image.convert('RGB')
    png_image_array = np.array(png_image)
    test_image_path = os.path.join(test_images_path, f'image_{image_number + 1}.jpg')
    test_image = Image.open(test_image_path)
    test_image_array = np.array(test_image)

    psnr_tot+=psnr(test_image_array, png_image_array)
    count+=1

#print value
print(psnr_tot)
print(count)

In [None]:
#definte the function to compute ssim
from skimage.metrics import structural_similarity as ssim
def calculate_ssim(original, compressed):
    """
    Calculate SSIM (Structural Similarity Index) between two images.

    Parameters:
        original: ndarray
            Original image.
        compressed: ndarray
            Compressed image.

    Returns:
        ssim_value: float
            SSIM value.
    """
    ssim_value, _ = ssim(original, compressed, win_size=3, full=True)
    return ssim_value

In [None]:
#compute ssim on all images

ssim_tot=0
count=0
folder_path = 'content/images/'
test_images_path = 'content/test_images/test_images/'
png_image_files = [file for file in os.listdir(folder_path) if file.lower().endswith('.png')]

for png_image_file in png_image_files:
    image_number = int(png_image_file.split("_")[-1].split(".")[0])
    png_image_path = os.path.join(folder_path, png_image_file)
    png_image = Image.open(png_image_path)

    if png_image.mode == 'RGBA':
      png_image = image.convert('RGB')

    png_image_array = np.array(png_image)
    test_image_path = os.path.join(test_images_path, f'image_{image_number + 1}.jpg')
    test_image = Image.open(test_image_path)
    test_image_array = np.array(test_image)
    ssim_couple=calculate_ssim(test_image_array, png_image_array)
    ssim_tot+=ssim_couple
    count+=1

#print results
print(ssim_tot)
print(count)



In [None]:
# mean squared error

mse_tot=0
count=0
folder_path = 'content/images/'
test_images_path = 'content/test_images/test_images/'
png_image_files = [file for file in os.listdir(folder_path) if file.lower().endswith('.png')]
for png_image_file in png_image_files:
    image_number = int(png_image_file.split("_")[-1].split(".")[0])
    png_image_path = os.path.join(folder_path, png_image_file)
    png_image = Image.open(png_image_path)

    if png_image.mode == 'RGBA':
      png_image = image.convert('RGB')

    png_image_array = np.array(png_image)
    test_image_path = os.path.join(test_images_path, f'image_{image_number + 1}.jpg')
    test_image = Image.open(test_image_path)
    test_image_array = np.array(test_image)
    mse_tot += np.sum((test_image_array - png_image_array) ** 2)
    count+=1

# Compute the mean of the squared errors
mean_mse=mse_tot/count

# Print results
print(mean_mse)
print(count)

In [None]:
#mean absolute errore
mae_tot=0
count=0
folder_path = 'content/images/'
test_images_path = 'content/test_images/test_images/'
png_image_files = [file for file in os.listdir(folder_path) if file.lower().endswith('.png')]

for png_image_file in png_image_files:
    image_number = int(png_image_file.split("_")[-1].split(".")[0])
    png_image_path = os.path.join(folder_path, png_image_file)
    png_image = Image.open(png_image_path)

    if png_image.mode == 'RGBA':
      png_image = image.convert('RGB')

    png_image_array = np.array(png_image)
    test_image_path = os.path.join(test_images_path, f'image_{image_number + 1}.jpg')
    test_image = Image.open(test_image_path)
    test_image_array = np.array(test_image)
    mae_tot += np.sum(np.abs(test_image_array - png_image_array))
    count+=1

# Compute the mean MAE
mean_mae=mae_tot/count
#print results
print(mean_mae)
print(count)

# Plot

I upload a folder with selected images for i from 1 to 10 named test_i for the test and autoencoder_i, unet_i, dense_net_i, etc. for the models

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import os

In [None]:
fig, axs = plt.subplots(8, 10, figsize=(13,12), dpi=200)
for ax in axs.flatten():
    ax.set_yticks([], [])
    ax.set_xticks([], [])
axs[0, 0].set_ylabel("Grayscale\n128x128", rotation=0, fontsize=15, labelpad=60)
axs[1, 0].set_ylabel("Ground truth\n128x128", rotation=0, fontsize=15, labelpad=60)
axs[2, 0].set_ylabel("Autoencoder\n128x128", rotation=0, fontsize=15, labelpad=60)
axs[3, 0].set_ylabel("Deep CNN\n128x128", rotation=0, fontsize=15, labelpad=60)
axs[4, 0].set_ylabel("U-Net\n128x128", rotation=0, fontsize=15, labelpad=60)
axs[5, 0].set_ylabel("ResNet\n128x128", rotation=0, fontsize=15, labelpad=60)
axs[6, 0].set_ylabel("DenseNet\n128x128", rotation=0, fontsize=15, labelpad=60)
axs[7, 0].set_ylabel("Pix2Pix\n128x128", rotation=0, fontsize=15, labelpad=60)
for i in range(10):
    axs[0, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/test_", str(i+1),".jpg" ])).convert("L"), cmap="gray")
    axs[1, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/test_", str(i+1),".jpg" ])))
    axs[2, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/autoencoder_", str(i+1),".png" ])))
    axs[3, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/deep_nn_", str(i+1),".png" ])))
    axs[4, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/unet_", str(i+1),".png" ])))
    axs[5, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/resnet_", str(i+1),".png" ])))
    axs[6, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/dense_net_", str(i+1),".png" ])))
    axs[7, i].imshow(Image.open(''.join(["/content/immagini/immagini_finali/pix2pix_", str(i+1),".png" ])))
print("finish")
plt.subplots_adjust(wspace=0.1, hspace=0)
plt.show()