In [1]:
import os
import numpy as np 
import tensorflow as tf 
from keras import Input 
from keras.applications import VGG19 
from keras.callbacks import TensorBoard 
from keras.layers import BatchNormalization, Activation, LeakyReLU, Add, Dense, PReLU, Flatten, Conv2D, UpSampling2D
from keras.models import Model 
from keras.optimizers import Adam
import keras
from keras.models import Sequential
from keras.preprocessing.image import img_to_array
from tqdm import tqdm
import re
import matplotlib.pyplot as plt
import glob
import time
from skimage.metrics import peak_signal_noise_ratio

2024-04-03 20:56:17.897295: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-03 20:56:17.897429: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-03 20:56:18.054254: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
def residual_block(x):
    """
    Residual block
    """
    filters = [64, 64]
    kernel_size = 3
    strides = 1
    padding = "same"
    momentum = 0.8
    activation = "relu"
    
    res = Conv2D(filters=filters[0], kernel_size=kernel_size, 
                 strides=strides, padding=padding)(x)
    res = Activation(activation=activation)(res)
    res = BatchNormalization(momentum=momentum)(res)
    
    res = Conv2D(filters=filters[1], kernel_size=kernel_size, 
                 strides=strides, padding=padding)(res)
    res = BatchNormalization(momentum=momentum)(res)
    
    # Add res and x
    res = Add()([res, x])
    
    return res

In [3]:
def build_generator():
    """
    Create a generator network using the hyperparameter values defined below
    :return:
    """
    residual_blocks = 16
    momentum = 0.8
    input_shape = (64, 64, 3)
    
    # Input Layer of the generator network
    input_layer = Input(shape=input_shape)
    
    # Add the pre-residual block
    gen1 = Conv2D(filters=64, kernel_size=9, strides=1, padding='same', activation='relu')(input_layer)
    
    # Add 16 residual blocks
    res = residual_block(gen1)
    for i in range(residual_blocks - 1):
        res = residual_block(res)
    
    # Add the post-residual block
    gen2 = Conv2D(filters=64, kernel_size=3, strides=1, padding='same')(res)
    gen2 = BatchNormalization(momentum=momentum)(gen2)
    
    # Take the sum of the output from the pre-residual block(gen1) and the post-residual block(gen2)
    gen3 = Add()([gen2, gen1])
    
    # Add an upsampling block
    gen4 = UpSampling2D(size=2)(gen3)
    gen4 = Conv2D(filters=256, kernel_size=3, strides=1, padding='same')(gen4)
    gen4 = Activation('relu')(gen4)
    
    # Add another upsampling block
    gen5 = UpSampling2D(size=2)(gen4)
    gen5 = Conv2D(filters=256, kernel_size=3, strides=1, padding='same')(gen5)
    gen5 = Activation('relu')(gen5)
    
    # Output convolution layer
    gen6 = Conv2D(filters=3, kernel_size=9, strides=1, padding='same')(gen5)
    output = Activation('tanh')(gen6)
    
    # Keras model
    model = Model(inputs=[input_layer], outputs=[output], name='generator')
    
    return model

In [4]:
def build_discriminator():
    
    leakyrelu_alpha = 0.2
    momentum = 0.8
    input_shape = (256, 256, 3)

    input_layer = Input(shape=input_shape)

    # Add the first convolution block
    dis1 = Conv2D(filters=64, kernel_size=3, strides=1, padding='same')(input_layer)
    dis1 = LeakyReLU(alpha=leakyrelu_alpha)(dis1)

    # Add the 2nd convolution block
    dis2 = Conv2D(filters=64, kernel_size=3, strides=2, padding='same')(dis1)
    dis2 = LeakyReLU(alpha=leakyrelu_alpha)(dis2)
    dis2 = BatchNormalization(momentum=momentum)(dis2)

    # Add the third convolution block
    dis3 = Conv2D(filters=128, kernel_size=3, strides=1, padding='same')(dis2)
    dis3 = LeakyReLU(alpha=leakyrelu_alpha)(dis3)
    dis3 = BatchNormalization(momentum=momentum)(dis3)
    
    # Add the fourth convolution block
    dis4 = Conv2D(filters=128, kernel_size=3, strides=2, padding='same')(dis3)
    dis4 = LeakyReLU(alpha=leakyrelu_alpha)(dis4)
    dis4 = BatchNormalization(momentum=0.8)(dis4)

    # Add the fifth convolution block
    dis5 = Conv2D(256, kernel_size=3, strides=1, padding='same')(dis4)
    dis5 = LeakyReLU(alpha=leakyrelu_alpha)(dis5)
    dis5 = BatchNormalization(momentum=momentum)(dis5)

    # Add the sixth convolution block
    dis6 = Conv2D(filters=256, kernel_size=3, strides=2, padding='same')(dis5)
    dis6 = LeakyReLU(alpha=leakyrelu_alpha)(dis6)
    dis6 = BatchNormalization(momentum=momentum)(dis6)

    # Add the seventh convolution block
    dis7 = Conv2D(filters=512, kernel_size=3, strides=1, padding='same')(dis6)
    dis7 = LeakyReLU(alpha=leakyrelu_alpha)(dis7)
    dis7 = BatchNormalization(momentum=momentum)(dis7)
    
    # Add the eight convolution block
    dis8 = Conv2D(filters=512, kernel_size=3, strides=2, padding='same')(dis7)
    dis8 = LeakyReLU(alpha=leakyrelu_alpha)(dis8)
    dis8 = BatchNormalization(momentum=momentum)(dis8)

    # Flatten the output from convolutional layers
    flatten = Flatten()(dis8)
    
    # Add a dense layer
    dis9 = Dense(units=1024)(flatten)
    dis9 = LeakyReLU(alpha=0.2)(dis9)

    # Last dense layer - for classification
    output = Dense(units=1, activation='sigmoid')(dis9)

    model = Model(inputs=[input_layer], outputs=[output], name='discriminator')
    return model


In [5]:
def build_vgg():
    """
    Build the VGG network to extract image features
    """
    input_shape = (256, 256, 3)
    
    # Load a pre-trained VGG19 model trained on 'Imagenet' dataset
    vgg = VGG19(weights="imagenet", include_top=False)
    
    # Choose the layer from which you want to extract features
    output_layer = vgg.layers[9].output
    
    # Create a new model with the desired output layer
    model = Model(inputs=vgg.input, outputs=output_layer)
    
    return model

In [6]:
def build_adversarial_model(generator, discriminator, vgg):
    input_low_resolution = Input(shape=(64, 64, 3))
    fake_hr_images = generator(input_low_resolution)
    fake_features = vgg(fake_hr_images)
    discriminator.trainable = False
    output = discriminator(fake_hr_images)
    model = Model(inputs=[input_low_resolution],
    outputs=[output, fake_features])
    
    for layer in model.layers:
        print(layer.name, layer.trainable)
        
    print(model.summary())
    return model


In [7]:
# Define hyperparameters
data_dir = "/kaggle/input/celebrities-100k/100k/100k.jpg"
epochs = 241
batch_size = 1
# Shape of low-resolution and high-resolution images
low_resolution_shape = (64, 64, 3)
high_resolution_shape = (256, 256, 3)

In [8]:
# Common optimizer for all networks
common_optimizer = Adam(0.0002, 0.5)

In [9]:
vgg = build_vgg()
vgg.trainable = False
vgg.compile(loss='mse', optimizer=common_optimizer, metrics=['accuracy'])

In [10]:
discriminator = build_discriminator()
discriminator.compile(loss='mse', optimizer=common_optimizer, 
 metrics=['accuracy'])



In [11]:
generator = build_generator()

In [12]:
input_high_resolution = Input(shape=high_resolution_shape)
input_low_resolution = Input(shape=low_resolution_shape)

In [13]:
generated_high_resolution_images = generator(input_low_resolution)

In [14]:
features = vgg(generated_high_resolution_images)

In [15]:
discriminator.trainable = False

In [16]:
probs = discriminator(generated_high_resolution_images)

In [17]:
adversarial_model = Model([input_low_resolution, input_high_resolution], [probs[0], features])
adversarial_model.compile(loss=['binary_crossentropy', 'mse'], 
 loss_weights=[1e-3, 1], optimizer=common_optimizer)

In [18]:
tensorboard = TensorBoard(log_dir="logs/".format(time.time()))
tensorboard.set_model(generator)
tensorboard.set_model(discriminator)

In [19]:
def sample_images(data_dir, batch_size, high_resolution_shape, low_resolution_shape):
    
    data_dir = "/kaggle/input/celebrities-100k/100k/100k/*.jpg"
    # Make a list of all images inside the data directory
    all_images = glob.glob(data_dir)

    # Choose a random batch of images
    images_batch = np.random.choice(all_images, size=batch_size)

    low_resolution_images = []
    high_resolution_images = []

    for img_path in images_batch:
        # Read image from file path
        img = tf.io.read_file(img_path)
        # Decode image to a tensor
        img = tf.image.decode_jpeg(img, channels=3)  # Assuming JPEG images with 3 channels
        
        # Resize the image
        img_high_resolution = tf.image.resize(img, high_resolution_shape[:2])
        img_low_resolution = tf.image.resize(img, low_resolution_shape[:2])

        # Convert image to numpy array
        img_high_resolution = img_high_resolution.numpy().astype(np.float32)
        img_low_resolution = img_low_resolution.numpy().astype(np.float32)

        # Do a random horizontal flip
        if np.random.random() < 0.5:
            img_high_resolution = np.fliplr(img_high_resolution)
            img_low_resolution = np.fliplr(img_low_resolution)

        high_resolution_images.append(img_high_resolution)
        low_resolution_images.append(img_low_resolution)

    # Convert the lists to Numpy NDArrays
    return np.array(high_resolution_images), np.array(low_resolution_images)


In [20]:
def save_images(low_resolution_image, original_image, generated_image, path):
    """
    Save low-resolution, high-resolution(original) and
    generated high-resolution images in a single image
    """
    fig = plt.figure()
    ax = fig.add_subplot(1, 3, 1)
    ax.imshow(low_resolution_image)
    ax.axis("off")
    ax.set_title("Low-resolution")

    ax = fig.add_subplot(1, 3, 2)
    ax.imshow(original_image)
    ax.axis("off")
    ax.set_title("Original")

    ax = fig.add_subplot(1, 3, 3)
    ax.imshow(generated_image)
    ax.axis("off")
    ax.set_title("Generated")

    plt.savefig(path)


In [None]:
# Function to calculate PSNR
def calculate_psnr(hr_images, generated_images):
    psnr_values = []
    for i in range(len(hr_images)):
        psnr = peak_signal_noise_ratio(hr_images[i], generated_images[i], data_range=1.0)
        psnr_values.append(psnr)
    return np.mean(psnr_values)

def save_images(low_resolution_image, original_image, generated_image, path):
    if not os.path.exists(os.path.dirname(path)):
        os.makedirs(os.path.dirname(path))
    fig, axs = plt.subplots(1, 3, figsize=(12, 4))
    axs[0].imshow(low_resolution_image)
    axs[0].set_title('Low Resolution')
    axs[0].axis('off')
    axs[1].imshow(original_image)
    axs[1].set_title('Original')
    axs[1].axis('off')
    axs[2].imshow(generated_image)
    axs[2].set_title('Generated')
    axs[2].axis('off')
    plt.tight_layout()
    plt.savefig(path)
    plt.close()

for epoch in range(epochs):
    print("Epoch:{}".format(epoch))

    """
    Train the discriminator network
    """

    # Sample a batch of images
    high_resolution_images, low_resolution_images = sample_images(data_dir=data_dir, batch_size=batch_size,
                                                                  low_resolution_shape=low_resolution_shape,
                                                                  high_resolution_shape=high_resolution_shape)
    # Normalize images
    high_resolution_images = high_resolution_images / 127.5 - 1.
    low_resolution_images = low_resolution_images / 127.5 - 1.

    # Generate high-resolution images from low-resolution images
    generated_high_resolution_images = generator.predict(low_resolution_images)

    # Generate batch of real and fake labels
    real_labels = np.ones((batch_size, 1))
    fake_labels = np.zeros((batch_size, 1))

    # Train the discriminator network on real and fake images
    d_loss_real = discriminator.train_on_batch(high_resolution_images, real_labels)
    d_loss_fake = discriminator.train_on_batch(generated_high_resolution_images, fake_labels)

    # Calculate total discriminator loss
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    """
    Train the generator network
    """

    # Sample a batch of images
    high_resolution_images, low_resolution_images = sample_images(data_dir=data_dir, batch_size=batch_size,
                                                                  low_resolution_shape=low_resolution_shape,
                                                                  high_resolution_shape=high_resolution_shape)
    # Normalize images
    high_resolution_images = high_resolution_images / 127.5 - 1.
    low_resolution_images = low_resolution_images / 127.5 - 1.

    # Extract feature maps for real high-resolution images
    image_features = vgg.predict(high_resolution_images)

    if len(real_labels.shape) == 1:
        real_labels = np.expand_dims(real_labels, axis=-1)

    # Train the generator network
    g_loss = adversarial_model.train_on_batch([low_resolution_images, high_resolution_images],
                                              [real_labels, image_features])

    # Sample and save images after every 100 epochs
    if epoch % 30 == 0:
        high_resolution_images, low_resolution_images = sample_images(data_dir=data_dir, batch_size=batch_size,
                                                                      low_resolution_shape=low_resolution_shape,
                                                                      high_resolution_shape=high_resolution_shape)
        # Normalize images
        high_resolution_images = high_resolution_images / 127.5 - 1.
        low_resolution_images = low_resolution_images / 127.5 - 1.

        generated_images = generator.predict_on_batch(low_resolution_images)

        # Calculate PSNR
        psnr_value = calculate_psnr(high_resolution_images, generated_images)
        print("PSNR after {} epochs: {}".format(epoch, psnr_value))

        for index, img in enumerate(generated_images):
            save_images(low_resolution_images[index], high_resolution_images[index], img,
                        path="results/img_{}_{}.png".format(epoch, index))

    
# Save models
generator.save_weights("generator.weights.h5")
discriminator.save_weights("discriminator.weights.h5")


Epoch:0
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 10s/step


I0000 00:00:1712177865.980629    5650 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step
PSNR after 0 epochs: 0.7917734911874961
Epoch:1
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
Epoch:2
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
Epoch:3
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
Epoch:4
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
Epoch:5
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
Epoch:6
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m

In [None]:
# Check if weights files exist before loading
if os.path.exists("generator.weights.h5") and os.path.exists("discriminator.weights.h5"):
    # Load models
    generator.load_weights("generator.weights.h5")
    discriminator.load_weights("discriminator.weights.h5")
else:
    print("Generator or discriminator weights file not found. Please ensure the models are trained first.")
    exit(1)

# Get 10 random images
high_resolution_images, low_resolution_images = sample_images(data_dir=data_dir, batch_size=10,
                                                              low_resolution_shape=low_resolution_shape,
                                                              high_resolution_shape=high_resolution_shape)

# Normalize images
high_resolution_images = high_resolution_images / 127.5 - 1.
low_resolution_images = low_resolution_images / 127.5 - 1.

# Generate high-resolution images from low-resolution images
generated_images = generator.predict_on_batch(low_resolution_images)

# Ensure the directory exists
if not os.path.exists("results"):
    os.makedirs("results")

# Save images
for index, img in enumerate(generated_images):
    save_images(low_resolution_images[index], high_resolution_images[index], img,
                path="results/gen_{}.png".format(index))
    print("Saved image {}".format(index))

# Save models (optional)
generator.save_weights("generator.weights.h5")
discriminator.save_weights("discriminator.weights.h5")
