<a href="https://colab.research.google.com/github/tanyavijj/Tanya-project/blob/main/SRGAN_FINAL_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
import zipfile
import os

# Mount Google Drive
drive.mount('/content/drive')

import os
import zipfile

# Define paths
zip_path = "/content/drive/MyDrive/archive (5).zip"
extract_dir = "/content/dataset"

# Extract dataset
if not os.path.exists(extract_dir):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)

# Define HR image folders
train_dir = "/content/drive/MyDrive/archive (5)/DIV2K_train_HR"
val_dir = "/content/drive/MyDrive/archive (5)/DIV2K_valid_HR"


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
import os
from PIL import Image
import random
from skimage.metrics import structural_similarity as ssim
import cv2

print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))


Num GPUs Available: 1


In [4]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
import os
from PIL import Image
import random
from skimage.metrics import structural_similarity as ssim
import cv2

print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))

# %%
def preprocess_image(path):
    hr_image = tf.io.read_file(path)
    hr_image = tf.image.decode_png(hr_image, channels=3)
    hr_image = tf.image.convert_image_dtype(hr_image, tf.float32)
    hr_image = tf.image.resize(hr_image, [96, 96])  # HR target

    lr_image = tf.image.resize(hr_image, [24, 24])  # Downscale to LR
    return lr_image, hr_image


def load_dataset(folder, max_images=100):
    images = os.listdir(folder)[:max_images]
    data = []
    for img in images:
        # Check if the current item is a file before processing
        image_path = os.path.join(folder, img)
        if os.path.isfile(image_path):
            data.append(preprocess_image(image_path))

    # Check if data is empty before unpacking
    if not data:
        print(f"Warning: No image files found in {folder}. Returning empty dataset.")
        return tf.data.Dataset.from_tensor_slices((np.array([]), np.array([]))).batch(4)

    lrs, hrs = zip(*data)
    return tf.data.Dataset.from_tensor_slices((np.array(lrs), np.array(hrs))).batch(4)

train_dataset = load_dataset(train_dir)
val_dataset = load_dataset(val_dir)


Num GPUs Available: 1


In [7]:
def build_generator():
    def res_block(x_in):
        x = layers.Conv2D(64, 3, padding='same')(x_in)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)  # Use Keras Activation layer
        x = layers.Conv2D(64, 3, padding='same')(x)
        x = layers.BatchNormalization()(x)
        return layers.Add()([x_in, x])

    inputs = layers.Input(shape=(96, 96, 3))
    x = layers.Conv2D(64, 9, padding='same')(inputs)
    x = layers.Activation('relu')(x)  # Use Keras Activation layer
    skip = x
    for _ in range(16):
        x = res_block(x)
    x = layers.Conv2D(64, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Add()([x, skip])
    for _ in range(2):
        x = layers.Conv2D(256, 3, padding='same')(x)
        # Wrap tf.nn.depth_to_space in a Lambda layer
        x = layers.Lambda(lambda x: tf.nn.depth_to_space(x, 2))(x)
        x = layers.Activation('relu')(x)  # Use Keras Activation layer
    x = layers.Conv2D(3, 9, padding='same', activation='tanh')(x)
    return models.Model(inputs, x)

def build_discriminator():
    def disc_block(x, filters, strides):
        x = layers.Conv2D(filters, 3, strides=strides, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU(0.2)(x)
        return x

    inputs = layers.Input(shape=(96, 96, 3))
    x = layers.Conv2D(64, 3, strides=1, padding='same')(inputs)
    x = layers.LeakyReLU(0.2)(x)
    for filters, strides in [(64, 2), (128, 1), (128, 2), (256, 1), (256, 2), (512, 1), (512, 2)]:
        x = disc_block(x, filters, strides)
    x = layers.Flatten()(x)
    x = layers.Dense(1024)(x)
    x = layers.LeakyReLU(0.2)(x)
    x = layers.Dense(1, activation='sigmoid')(x)
    return models.Model(inputs, x)

generator = build_generator()
discriminator = build_discriminator()


In [8]:
vgg = tf.keras.applications.VGG19(include_top=False, weights='imagenet', input_shape=(96, 96, 3))
vgg.trainable = False
vgg_model = tf.keras.Model(inputs=vgg.input, outputs=vgg.get_layer("block5_conv4").output)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m80134624/80134624[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [9]:
bce_loss = tf.keras.losses.BinaryCrossentropy()
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)


In [10]:
EPOCHS = 5

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")
    for step, (lr, hr) in enumerate(train_dataset):
        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            sr = generator(lr, training=True)
            real_output = discriminator(hr, training=True)
            fake_output = discriminator(sr, training=True)

            hr_features = vgg_model(hr)
            sr_features = vgg_model(sr)
            content_loss = tf.reduce_mean(tf.square(hr_features - sr_features))
            adv_loss = bce_loss(tf.ones_like(fake_output), fake_output)
            gen_loss = content_loss + 1e-3 * adv_loss

            real_loss = bce_loss(tf.ones_like(real_output), real_output)
            fake_loss = bce_loss(tf.zeros_like(fake_output), fake_output)
            disc_loss = real_loss + fake_loss

        gen_grads = gen_tape.gradient(gen_loss, generator.trainable_variables)
        disc_grads = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

        generator_optimizer.apply_gradients(zip(gen_grads, generator.trainable_variables))
        discriminator_optimizer.apply_gradients(zip(disc_grads, discriminator.trainable_variables))

        if step % 10 == 0:
            print(f"Step {step} | Gen Loss: {gen_loss:.4f} | Disc Loss: {disc_loss:.4f}")


Epoch 1/5

Epoch 2/5

Epoch 3/5

Epoch 4/5

Epoch 5/5


In [11]:
def evaluate(sr_img, hr_img):
    sr = np.clip(sr_img * 255, 0, 255).astype(np.uint8)
    hr = np.clip(hr_img * 255, 0, 255).astype(np.uint8)
    psnr_val = tf.image.psnr(sr, hr, max_val=255).numpy()
    ssim_val = ssim(hr, sr, multichannel=True)
    return psnr_val, ssim_val

for lr, hr in val_dataset.take(1):
    sr = generator(lr, training=False)
    for i in range(len(sr)):
        psnr_val, ssim_val = evaluate(sr[i].numpy(), hr[i].numpy())
        print(f"Image {i+1}: PSNR = {psnr_val:.2f}, SSIM = {ssim_val:.4f}")
