# Let's Begin

## Imports

In [1]:
import tensorflow as tf
from tensorflow.keras import mixed_precision
from tensorflow.keras import layers, Model
import numpy as np
import matplotlib.pyplot as plt
import os

2025-01-08 18:44:10.022924: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1736342050.041598   80698 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1736342050.047057   80698 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-08 18:44:10.066320: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
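# Optional GPU setup, currently disabled: report the visible GPUs and enable
# memory growth on the first one. Left commented out because this notebook runs on CPU.
# print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))
# physical_devices = tf.config.list_physical_devices('GPU')
# tf.config.experimental.set_memory_growth(physical_devices[0], True)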

In [3]:
# Hide every CUDA device from this process so TensorFlow falls back to the CPU.
# NOTE(review): this runs after `import tensorflow`; it presumably has to execute
# before TensorFlow first initialises its devices to have any effect - confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # Disable GPU

In [4]:
# Reset Keras' global state so re-running the notebook starts from a clean slate.
tf.keras.backend.clear_session() 

In [5]:
# Make 'mixed_float16' the global Keras dtype policy for layers created from here on.
# NOTE(review): the GPU is disabled earlier in this notebook; mixed_float16 is
# mainly intended for GPU execution - confirm it is wanted for CPU-only runs.
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

In [6]:
# Seed NumPy's and TensorFlow's global random generators for repeatable runs.
np.random.seed(42)
tf.random.set_seed(42)

In [7]:
# Directories (relative to the working directory) holding the training images.
LOW_RES_PATH = "./LR/"    # low-resolution inputs
HIGH_RES_PATH = "./HR/"   # high-resolution targets

In [8]:
# Training hyper-parameters and image geometry.
EPOCHS = 10                  # number of passes over the dataset in the training loop
BATCH_SIZE = 1               # samples per batch produced by the input pipeline
HR_SHAPE = (2040, 2040, 3)   # (height, width, channels) of high-resolution targets
# NOTE(review): LR is 3:4 while HR is square, so the two resizes distort the
# images differently - confirm this is intended.
LR_SHAPE = (192, 256, 3)     # (height, width, channels) of low-resolution inputs
LEARNING_RATE = 1e-4         # learning rate shared by both Adam optimizers

In [9]:
def preprocess_image(image_path, target_size):
    """Load one image file, resize it and scale it to the range [-1, 1].

    Args:
        image_path: Path of the image file (Python string or scalar string tensor).
        target_size: ``(height, width)`` handed to ``tf.image.resize``.

    Returns:
        A float16 tensor of shape ``(height, width, 3)`` with values in [-1, 1].
    """
    image = tf.io.read_file(image_path)
    # NOTE(review): callers glob "*.png"; per the TensorFlow docs decode_jpeg
    # also decodes PNG data, so the original decoder call is kept unchanged.
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, target_size)
    # Normalise in float32 and cast to float16 once at the end. Casting first
    # quantises the 0..255 pixel values and then performs the division and
    # subtraction in half precision, compounding the rounding error.
    image = tf.cast(image, tf.float32) / 127.5 - 1.0  # Normalize to [-1, 1]
    return tf.cast(image, tf.float16)

In [10]:
def load_dataset(lr_path, hr_path, lr_shape, hr_shape, batch_size):
    """Build a shuffled, batched ``tf.data`` pipeline of (LR, HR) image pairs.

    Files are paired by sorted file name, so the i-th ``*.png`` in ``lr_path``
    is assumed to be the low-resolution version of the i-th ``*.png`` in
    ``hr_path`` (NOTE(review): confirm the two directories follow a naming
    scheme that sorts identically).

    Args:
        lr_path: Directory containing the low-resolution ``*.png`` files.
        hr_path: Directory containing the high-resolution ``*.png`` files.
        lr_shape: ``(height, width, channels)`` of the low-resolution tensors.
        hr_shape: ``(height, width, channels)`` of the high-resolution tensors.
        batch_size: Number of pairs per batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(lr_batch, hr_batch)`` tuples.

    Raises:
        FileNotFoundError: If either directory contains no ``*.png`` file.
        ValueError: If the two directories contain different numbers of files.
    """
    lr_files = sorted(tf.io.gfile.glob(os.path.join(lr_path, "*.png")))
    hr_files = sorted(tf.io.gfile.glob(os.path.join(hr_path, "*.png")))

    if not lr_files or not hr_files:
        raise FileNotFoundError(
            f"No *.png files found in {lr_path!r} and/or {hr_path!r}"
        )
    if len(lr_files) != len(hr_files):
        raise ValueError(
            f"Mismatched dataset: {len(lr_files)} LR files vs {len(hr_files)} HR files"
        )

    # Pair the paths BEFORE shuffling. Shuffling the two file lists
    # independently (as list_files(shuffle=True) does) would zip every LR
    # image with an unrelated HR image.
    paths = tf.data.Dataset.from_tensor_slices((lr_files, hr_files))
    # Shuffle the cheap path strings rather than decoded images, so the
    # shuffle buffer does not hold hundreds of full-resolution tensors.
    paths = paths.shuffle(buffer_size=len(lr_files))

    def _load_pair(lr_file, hr_file):
        # Decode and resize both members of one aligned pair.
        return (
            preprocess_image(lr_file, lr_shape[:2]),
            preprocess_image(hr_file, hr_shape[:2]),
        )

    dataset = paths.map(_load_pair, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)  # Prefetch for performance
    return dataset

In [11]:
# Build the training input pipeline from the configured directories and shapes.
dataset = load_dataset(LOW_RES_PATH, HIGH_RES_PATH, LR_SHAPE, HR_SHAPE, BATCH_SIZE)

2025-01-08 18:44:12.554576: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:152] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2025-01-08 18:44:12.554627: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:137] retrieving CUDA diagnostic information for host: Vallhala
2025-01-08 18:44:12.554635: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:144] hostname: Vallhala
2025-01-08 18:44:12.554800: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:168] libcuda reported version is: 535.183.1
2025-01-08 18:44:12.554828: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:172] kernel reported version is: 535.183.1
2025-01-08 18:44:12.554834: I external/local_xla/xla/stream_executor/cuda/cuda_diagnostics.cc:259] kernel version seems to match DSO: 535.183.1


In [12]:
def build_generator(input_shape=(192, 256, 3), num_res_blocks=1,
                    num_upsample_blocks=1, output_size=None):
    """Build the super-resolution generator network.

    Architecture: 9x9 conv stem -> ``num_res_blocks`` residual blocks ->
    ``num_upsample_blocks`` pixel-shuffle (depth_to_space, block_size=2)
    stages -> 3-channel tanh conv -> resize to ``output_size``.

    Args:
        input_shape: ``(height, width, channels)`` of the low-resolution input.
        num_res_blocks: Number of residual blocks (default 1).
        num_upsample_blocks: Number of x2 upsampling stages (default 1).
        output_size: ``(height, width)`` of the generated image. Defaults to
            ``HR_SHAPE[:2]`` when ``None``.

    Returns:
        A ``Model`` mapping a low-resolution image batch to an image batch of
        spatial size ``output_size`` with tanh-range values.
    """
    target_size = tuple(HR_SHAPE[:2]) if output_size is None else tuple(output_size)

    inputs = layers.Input(shape=input_shape)

    # Initial Convolution Block
    x = layers.Conv2D(32, (9, 9), padding="same")(inputs)
    x = layers.PReLU()(x)

    # Residual Blocks
    for _ in range(num_res_blocks):
        # Each block skips from its OWN input. With a single block this is
        # identical to skipping from the stem output.
        block_input = x
        x = layers.Conv2D(32, (3, 3), padding="same")(x)
        x = layers.BatchNormalization()(x)
        x = layers.PReLU()(x)
        x = layers.Conv2D(32, (3, 3), padding="same")(x)
        x = layers.BatchNormalization()(x)
        x = layers.add([x, block_input])

    # Upsampling Layers: each stage doubles height/width via depth_to_space,
    # which also divides the channel count by 4 (32 -> 8).
    for _ in range(num_upsample_blocks):
        x = layers.Conv2D(32, (3, 3), padding="same")(x)
        x = layers.Lambda(tf.nn.depth_to_space, arguments={'block_size': 2})(x)
        x = layers.PReLU()(x)

    # Final Convolution Block
    x = layers.Conv2D(3, (3, 3), padding="same", activation="tanh")(x)
    # The pixel-shuffle stages alone do not reach the target resolution, so
    # the result is resized to the requested output size.
    x = layers.Lambda(lambda t: tf.image.resize(t, target_size))(x)
    return Model(inputs, x)


# Instantiate the generator for LR_SHAPE inputs and print its layer summary.
generator = build_generator(LR_SHAPE)
generator.summary()

In [13]:
def build_discriminator(input_shape=(128, 128, 3)):
    """Build the discriminator: a small CNN that scores an image with a sigmoid.

    Args:
        input_shape: ``(height, width, channels)`` of the images to classify.

    Returns:
        A ``Model`` mapping an image batch to one sigmoid value per image.
    """
    image_in = layers.Input(shape=input_shape)

    # Stem: full-resolution 3x3 convolution.
    features = layers.Conv2D(32, (3, 3), strides=1, padding="same")(image_in)
    features = layers.LeakyReLU(alpha=0.2)(features)

    # Two stride-2 stages, each halving the spatial resolution.
    for n_filters in (32, 64):
        features = layers.Conv2D(n_filters, (3, 3), strides=2, padding="same")(features)
        features = layers.BatchNormalization()(features)
        features = layers.LeakyReLU(alpha=0.2)(features)

    # Collapse the spatial dimensions by averaging (used in place of Flatten).
    pooled = layers.GlobalAveragePooling2D()(features)

    # Classification head.
    hidden = layers.Dense(64)(pooled)
    hidden = layers.LeakyReLU(alpha=0.2)(hidden)
    score = layers.Dense(1, activation="sigmoid")(hidden)

    return Model(image_in, score)

# Instantiate the discriminator for full HR_SHAPE images and print its layer summary.
discriminator = build_discriminator(HR_SHAPE)
discriminator.summary()




In [14]:
# Pre-trained VGG model for perceptual loss
# VGG19 convolutional base (no classifier head) with ImageNet weights, built for HR_SHAPE inputs.
vgg = tf.keras.applications.VGG19(include_top=False, weights="imagenet", input_shape=HR_SHAPE)
# Freeze the network: it is used only as a fixed feature extractor.
vgg.trainable = False
# Re-wrap it so the model outputs the activations of the layer at index 10.
# NOTE(review): index 10 is presumably `block3_conv4` - confirm with vgg.summary().
vgg = Model(vgg.input, vgg.layers[10].output)  # Use intermediate layer for feature extraction

In [15]:
# Binary cross-entropy on probabilities (from_logits=False), matching a sigmoid output layer.
binary_cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=False)

In [16]:
def perceptual_loss(hr, sr):
    """Mean squared error between VGG19 feature maps of two image batches.

    Both inputs are assumed to be RGB images scaled to [-1, 1], as produced by
    ``preprocess_image`` and by the generator's tanh output.

    Args:
        hr: Ground-truth high-resolution image batch.
        sr: Generated (super-resolved) image batch of the same shape.

    Returns:
        A scalar float32 tensor: the mean squared feature difference.
    """
    def _to_vgg_input(images):
        # ImageNet VGG19 weights expect 0..255 inputs run through the VGG
        # preprocess_input (RGB->BGR, per-channel mean subtraction). Feeding
        # raw [-1, 1] images would produce features far outside the
        # distribution the network was trained on.
        images = (tf.cast(images, tf.float32) + 1.0) * 127.5
        return tf.keras.applications.vgg19.preprocess_input(images)

    # Compare in float32: squaring feature activations in a lower-precision
    # compute dtype could overflow or lose precision.
    sr_features = tf.cast(vgg(_to_vgg_input(sr)), tf.float32)
    hr_features = tf.cast(vgg(_to_vgg_input(hr)), tf.float32)
    return tf.reduce_mean(tf.square(hr_features - sr_features))

In [17]:
# Optimizers
# Separate Adam instances so the generator and discriminator keep independent optimizer state.
g_optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
d_optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

In [18]:
@tf.function
def train_step(lr_batch, hr_batch):
    """Run one optimisation step for both the generator and the discriminator.

    Args:
        lr_batch: Batch of low-resolution input images.
        hr_batch: Batch of matching high-resolution target images.

    Returns:
        ``(d_loss, g_loss)`` - scalar tensors with the discriminator loss and
        the total generator loss (L1 content loss + 1e-3 * adversarial loss).
    """
    # Losses are computed in float32 regardless of the model compute dtype.
    hr_batch = tf.cast(hr_batch, dtype=tf.float32)

    # NOTE(review): no loss scaling is applied although a mixed_float16 policy
    # is set globally; confirm whether a LossScaleOptimizer is needed here.
    with tf.GradientTape(persistent=True) as tape:
        # Forward pass
        fake_hr = tf.cast(generator(lr_batch, training=True), dtype=tf.float32)

        real_pred = tf.cast(discriminator(hr_batch, training=True), tf.float32)
        fake_pred = tf.cast(discriminator(fake_hr, training=True), tf.float32)

        # Discriminator: classify real images as 1 and generated images as 0.
        # Averaging the raw sigmoid outputs is not a loss - minimising that
        # would just drive both predictions towards 0.
        d_loss = 0.5 * (
            binary_cross_entropy(tf.ones_like(real_pred), real_pred)
            + binary_cross_entropy(tf.zeros_like(fake_pred), fake_pred)
        )

        # Generator: pixel-wise L1 content loss plus a small adversarial term
        # (weight 1e-3 as in the SRGAN formulation) so the discriminator
        # actually influences the generator.
        content_loss = tf.reduce_mean(tf.abs(fake_hr - hr_batch))
        adversarial_loss = binary_cross_entropy(tf.ones_like(fake_pred), fake_pred)
        g_loss = content_loss + 1e-3 * adversarial_loss

    # Calculate gradients (the tape is persistent because it is queried twice)
    gradients_of_generator = tape.gradient(g_loss, generator.trainable_variables)
    gradients_of_discriminator = tape.gradient(d_loss, discriminator.trainable_variables)

    # Apply gradients
    g_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    d_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    # Delete the tape to free resources after use
    del tape

    return d_loss, g_loss

In [19]:
# Training loop
for epoch in range(EPOCHS):
    # Accumulate losses so the report reflects the whole epoch, not just the
    # last (batch-size-1, hence very noisy) step.
    d_loss_total = 0.0
    g_loss_total = 0.0
    num_batches = 0

    for lr_batch, hr_batch in dataset:
        d_loss, g_loss = train_step(lr_batch, hr_batch)
        d_loss_total += float(d_loss)
        g_loss_total += float(g_loss)
        num_batches += 1

    # Fail with a clear message instead of a NameError on `d_loss` below.
    if num_batches == 0:
        raise RuntimeError("Dataset yielded no batches; check LOW_RES_PATH and HIGH_RES_PATH.")

    d_loss_mean = d_loss_total / num_batches
    g_loss_mean = g_loss_total / num_batches
    print(f"Epoch {epoch}/{EPOCHS} | D Loss: {d_loss_mean:.4f} | G Loss: {g_loss_mean:.4f}")

    # Save a generator checkpoint after every epoch
    generator.save(f"generator_epoch_{epoch}.h5")

In [None]:
# Persist the final trained generator to an HDF5 file in the working directory.
generator.save("srgan_generator.h5")