In [None]:
import os


# Base directory in which ALL images (.bmp files) of the jp2k set are stored
IMAGE_DIR = r"C:\matek_msc\Neural Nets\data\LIVE\jp2k"

# Location of the info.txt file that sits in the jp2k folder
info_path = r"c:\matek_msc\Neural Nets\data\LIVE\jp2k\info.txt"


def load_info_file(file_path):
    """
    Parse an info.txt listing and bucket its rows by reference image name.

    Every usable row has exactly three whitespace-separated fields:
    ``<ref_image> <distorted_image> <value>``. Blank rows, rows starting with
    ``//``, rows with a different field count and rows whose value is not a
    float are ignored.

    Returns a dict mapping each reference name to
    ``{"original": <name or None>, "distorted": [(name, value), ...]}``.
    A row whose value equals 0 supplies the original (y); any other row is
    recorded as a distorted version (X). If several zero-valued rows share a
    reference, a warning is printed and the last one wins.
    """
    groups = {}

    with open(file_path, "r") as handle:
        for raw_row in handle:
            row = raw_row.strip()

            # Nothing to parse on blank or comment rows
            if row == "" or row.startswith("//"):
                continue

            fields = row.split()
            if len(fields) != 3:
                continue  # unexpected layout

            ref_img, image_name, value_text = fields

            try:
                value = float(value_text)
            except ValueError:
                continue  # third field is not numeric

            # The bucket is created on first sight of a reference, whatever the value is
            bucket = groups.setdefault(ref_img, {"original": None, "distorted": []})

            if value != 0:
                bucket["distorted"].append((image_name, value))
                continue

            # Value is zero: this row names the original image
            previous = bucket["original"]
            if previous is not None:
                print(
                    f"Warning: Multiple originals for {ref_img}. Overwriting previous original {previous} with new original {image_name}."
                )
            bucket["original"] = image_name

    return groups


grouped_pairs = load_info_file(info_path)


# Report every reference group: its original (if any) followed by its distorted versions
for ref, pair in grouped_pairs.items():
    original = pair["original"]
    distorted = pair["distorted"]
    print(f"Reference image (key): {ref}")

    if not original:
        print("  Warning: No original found!")
    else:
        print("  Original (y):", original)

    if not distorted:
        print("  No distorted versions found!")
        continue

    print("  Distorted (X):")
    for d, val in distorted:
        print(f"    {d} with noise value {val}")

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os
from PIL import Image


def load_image(filename, size=(256, 256)):
    """Load an image from IMAGE_DIR as a normalized RGB float array.

    Args:
        filename: File name relative to IMAGE_DIR. Falsy values (None, "")
            mean "no image" and yield None without touching the disk.
        size: ``(width, height)`` the image is resized to. Defaults to
            ``(256, 256)``, the size used by the rest of the notebook.

    Returns:
        A float32 NumPy array of shape ``(height, width, 3)`` with values in
        [0, 1], or None when ``filename`` is falsy or loading fails (the error
        is printed, not raised, so one bad file does not abort a dataset build).
    """
    if not filename:
        return None

    path = os.path.join(IMAGE_DIR, filename)

    try:
        # The context manager guarantees the file handle is released even when
        # decoding fails part-way through.
        with Image.open(path) as img:
            rgb = img if img.mode == "RGB" else img.convert("RGB")
            resized = rgb.resize(size)
            # Convert while the source is still open, then scale 0..255 -> 0..1
            return np.array(resized, dtype=np.float32) / 255.0

    except Exception as e:  # deliberate best-effort: report and skip the file
        print(f"Error loading {filename}: {e}")
        return None


# Organize data
X = []  # Distorted images (network input)
y = []  # Original images (network target), repeated once per distorted version


for ref, pair in grouped_pairs.items():
    original_filename = pair["original"]
    distorted_list = pair["distorted"]

    # Load original image
    orig_img = load_image(original_filename)

    if orig_img is None:
        print(f"⚠️ Original image missing: {original_filename}")
        continue

    # Load distorted versions

    if not distorted_list:
        print(f"⚠️ No distorted versions for {original_filename}")
        continue

    for distorted_filename, noise_val in distorted_list:
        dist_img = load_image(distorted_filename)

        if dist_img is None:
            continue  # Skip if distorted image fails to load

        X.append(dist_img)
        y.append(orig_img)


# Fail early with an actionable message: train_test_split cannot produce a
# non-empty train AND test set from fewer than two samples.
if len(X) < 2:
    raise ValueError(
        f"Only {len(X)} (distorted, original) pair(s) could be loaded; "
        "check IMAGE_DIR and the info.txt listing."
    )

X = np.array(X)
y = np.array(y)


# Split into train and test
# NOTE(review): the split is done per (distorted, original) pair, so the same
# original image can end up in both train and test via different distortion
# levels. Consider splitting by reference image if test metrics must reflect
# unseen content.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


print(f"✅ Training data shape: {X_train.shape} (Distorted: X)")
print(f"✅ Test data shape: {X_test.shape} (Original clean: y)")

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import VGG19
from tensorflow.keras.models import Model as KModel
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
import numpy as np


# === Generator ===
def build_generator(input_shape=(256, 256, 3)):
    """Build the encoder / residual-bottleneck / decoder generator.

    Four stride-2 convolutions downsample the input, two residual blocks run at
    the lowest resolution, and four stride-2 transposed convolutions upsample
    back. The first three decoder stages are concatenated with the encoder
    output of matching resolution (skip connections). The last layer has 3
    channels and a sigmoid activation.

    Args:
        input_shape: Shape of one input image, ``(height, width, channels)``.

    Returns:
        A Keras ``Model`` mapping an image batch to a 3-channel image batch.
    """

    def downsample(tensor, filters):
        # NOTE(review): encoder convolutions carry no activation - confirm this is intended
        return layers.Conv2D(filters, 4, strides=2, padding="same")(tensor)

    def res_block(tensor, filters):
        # Two 3x3 convolutions plus identity shortcut, ReLU after the addition
        branch = layers.Conv2D(filters, 3, padding="same", activation="relu")(tensor)
        branch = layers.Conv2D(filters, 3, padding="same")(branch)
        merged = layers.Add()([tensor, branch])
        return layers.Activation("relu")(merged)

    def upsample(tensor, skip, filters):
        grown = layers.Conv2DTranspose(filters, 4, strides=2, padding="same", activation="relu")(tensor)
        return layers.Concatenate()([grown, skip])

    inputs = layers.Input(shape=input_shape)

    # Encoder: remember every stage so the decoder can reuse it
    encoder_stages = []
    current = inputs
    for filters in (64, 128, 256, 512):
        current = downsample(current, filters)
        encoder_stages.append(current)

    # Bottleneck
    for _ in range(2):
        current = res_block(current, 512)

    # Decoder: pair each upsampling stage with the encoder stage of equal resolution
    skips = reversed(encoder_stages[:-1])
    for filters, skip in zip((256, 128, 64), skips):
        current = upsample(current, skip, filters)

    outputs = layers.Conv2DTranspose(3, 4, strides=2, padding="same", activation="sigmoid")(current)
    return Model(inputs, outputs)


# === Discriminator ===
def build_discriminator(input_shape=(256, 256, 3)):
    """Build the two-input convolutional discriminator.

    The two images are concatenated along the channel axis and passed through
    four stride-2 convolutions (64, 128, 256, 512 filters), each followed by
    LeakyReLU(0.2). A final stride-1 convolution with one filter and no
    activation produces a map of raw scores (logits).

    Args:
        input_shape: Shape of each of the two input images.

    Returns:
        A Keras ``Model`` taking ``[inp, tar]`` and returning the score map.
    """
    inp = layers.Input(shape=input_shape)
    tar = layers.Input(shape=input_shape)

    features = layers.Concatenate()([inp, tar])
    for filters in (64, 128, 256, 512):
        features = layers.Conv2D(filters, 4, strides=2, padding="same")(features)
        features = layers.LeakyReLU(0.2)(features)

    patch_logits = layers.Conv2D(1, 4, strides=1, padding="same")(features)
    return Model([inp, tar], patch_logits)


# === Perceptual Loss ===
class PerceptualLoss(tf.keras.losses.Loss):
    """Mean absolute difference between VGG19 feature maps of two image batches.

    The feature extractor is an ImageNet-weighted VGG19 (without its top)
    truncated at ``layer`` and frozen.
    """

    def __init__(self, layer="block3_conv3"):
        """Create the frozen feature extractor ending at the named VGG19 layer."""
        super().__init__()
        backbone = VGG19(include_top=False, weights="imagenet")
        feature_output = backbone.get_layer(layer).output
        self.model = KModel(inputs=backbone.input, outputs=feature_output)
        self.model.trainable = False

    def call(self, y_true, y_pred):
        """Return the mean L1 distance between the features of both batches.

        Both inputs are multiplied by 255 before VGG19 preprocessing
        (presumably because they are scaled to [0, 1] - verify against caller).
        """
        to_vgg_input = tf.keras.applications.vgg19.preprocess_input
        true_features = self.model(to_vgg_input(y_true * 255.0))
        pred_features = self.model(to_vgg_input(y_pred * 255.0))
        return tf.reduce_mean(tf.abs(true_features - pred_features))


# === Data Augmentation ===
def data_augment(x, y):
    """Randomly augment one (input, target) image pair.

    Geometric transforms - left/right flip, up/down flip (each with
    probability 0.5) and a rotation by a random multiple of 90 degrees - are
    applied identically to ``x`` and ``y``. Brightness jitter, contrast jitter
    and additive Gaussian noise (stddev 0.02) are applied to ``x`` only, which
    is finally clipped to [0, 1].

    Returns:
        The augmented ``(x, y)`` pair.
    """
    mirror_horizontally = tf.random.uniform(()) > 0.5
    if mirror_horizontally:
        x, y = tf.image.flip_left_right(x), tf.image.flip_left_right(y)

    mirror_vertically = tf.random.uniform(()) > 0.5
    if mirror_vertically:
        x, y = tf.image.flip_up_down(x), tf.image.flip_up_down(y)

    # Same number of quarter turns (0..3) for both images keeps them aligned
    quarter_turns = tf.random.uniform((), minval=0, maxval=4, dtype=tf.int32)
    x, y = tf.image.rot90(x, quarter_turns), tf.image.rot90(y, quarter_turns)

    # Photometric jitter touches the input only; the target stays untouched
    x = tf.image.random_contrast(
        tf.image.random_brightness(x, max_delta=0.1), lower=0.9, upper=1.1
    )
    x = x + tf.random.normal(shape=tf.shape(x), mean=0.0, stddev=0.02)
    return tf.clip_by_value(x, 0.0, 1.0), y


# === Losses and optimizers ===
# Pixel-wise mean absolute error between target and generated images.
l1_loss = tf.keras.losses.MeanAbsoluteError()
# Feature-space loss; constructing it instantiates VGG19 with ImageNet weights.
percep_loss = PerceptualLoss()
# from_logits=True because the discriminator's final Conv2D has no activation.
adv_loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)
# One Adam optimizer per network, both with learning rate 2e-4 and beta_1 = 0.5.
gen_opt = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
disc_opt = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

# === Instantiate models ===
# Both builders are called with their default input shape of (256, 256, 3).
generator = build_generator()
discriminator = build_discriminator()


# === SSIM loss ===
# train_step calls ssim_loss, but no definition is visible in this notebook;
# without one, the first call on a fresh kernel fails with NameError.
def ssim_loss(y_true, y_pred):
    """Return 1 - mean SSIM of two image batches (0 means identical images).

    ``max_val=1.0`` matches the sigmoid output range of the generator;
    assumes the targets are scaled to [0, 1] as well - verify against caller.
    """
    return 1.0 - tf.reduce_mean(tf.image.ssim(y_true, y_pred, max_val=1.0))


# === Training step ===
@tf.function
def train_step(x, y):
    """Run one adversarial update of both the generator and the discriminator.

    Args:
        x: Batch of distorted input images.
        y: Batch of matching clean target images.

    Returns:
        ``(g_loss, d_loss)``: the total generator loss and the discriminator
        loss for this batch, computed before the weight update.
    """
    # One tape per network so each is released right after its gradient call
    # (a single persistent tape would keep its resources until garbage-collected).
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        fake_y = generator(x, training=True)
        disc_real = discriminator([x, y], training=True)
        disc_fake = discriminator([x, fake_y], training=True)

        # Derive label tensors from the discriminator output so they always
        # match its shape, whatever the batch size or input resolution is.
        real_labels = tf.ones_like(disc_real)
        fake_labels = tf.zeros_like(disc_fake)

        # Discriminator: real pairs -> 1, generated pairs -> 0
        d_loss_real = adv_loss(real_labels, disc_real)
        d_loss_fake = adv_loss(fake_labels, disc_fake)
        d_loss = 0.5 * (d_loss_real + d_loss_fake)

        # Generator: reconstruction terms plus "fool the discriminator" term
        g_l1 = l1_loss(y, fake_y)
        g_perc = percep_loss(y, fake_y)
        g_adv = adv_loss(real_labels, disc_fake)
        g_ssim = ssim_loss(y, fake_y)

        # Total generator loss
        g_loss = g_l1 + 0.5 * g_perc + 0.5 * g_ssim + 0.1 * g_adv

    grads_gen = gen_tape.gradient(g_loss, generator.trainable_variables)
    grads_disc = disc_tape.gradient(d_loss, discriminator.trainable_variables)
    gen_opt.apply_gradients(zip(grads_gen, generator.trainable_variables))
    disc_opt.apply_gradients(zip(grads_disc, discriminator.trainable_variables))
    return g_loss, d_loss


# === Pretraining step ===
@tf.function
def pretrain_step(x, y):
    """Update the generator once using only the L1 reconstruction loss.

    Args:
        x: Batch of input images fed to the generator.
        y: Batch of target images the output is compared against.

    Returns:
        The L1 loss of this batch, computed before the weight update.
    """
    with tf.GradientTape() as tape:
        reconstruction = generator(x, training=True)
        reconstruction_error = l1_loss(y, reconstruction)

    weights = generator.trainable_variables
    gradients = tape.gradient(reconstruction_error, weights)
    gen_opt.apply_gradients(zip(gradients, weights))
    return reconstruction_error


# === Evaluation ===
def evaluate_model(generator, X_test, y_test, sample_size=5):
    """Estimate PSNR and SSIM of the generator on a random subset of the test set.

    Args:
        generator: Object with a Keras-style ``predict(x, verbose=0)`` method.
        X_test: Array of input images, indexed along the first axis.
        y_test: Array of target images aligned with ``X_test``.
        sample_size: Number of images to sample without replacement. Values
            larger than the test set are clamped to the test-set size.

    Returns:
        ``(mean_psnr, mean_ssim)`` over the sampled images.

    Raises:
        ValueError: If the test set is empty.
    """
    n_available = len(X_test)
    if n_available == 0:
        raise ValueError("evaluate_model needs at least one test image")

    # Sampling without replacement cannot draw more items than exist
    sample_size = min(sample_size, n_available)

    idx = np.random.choice(n_available, sample_size, replace=False)
    x_sample = X_test[idx]
    y_sample = y_test[idx]
    y_pred = generator.predict(x_sample, verbose=0)

    psnr_scores = [psnr(target, pred, data_range=1.0) for target, pred in zip(y_sample, y_pred)]
    ssim_scores = [
        ssim(target, pred, channel_axis=-1, data_range=1.0)
        for target, pred in zip(y_sample, y_pred)
    ]
    return np.mean(psnr_scores), np.mean(ssim_scores)


# === Training loop ===
def _augment_batch(x_batch, y_batch):
    """Augment every (input, target) pair of a batch and stack them into two tensors."""
    augmented = [
        data_augment(tf.convert_to_tensor(x), tf.convert_to_tensor(y))
        for x, y in zip(x_batch, y_batch)
    ]
    x_aug, y_aug = zip(*augmented)
    return tf.stack(x_aug), tf.stack(y_aug)


def train_model(
    X_train,
    y_train,
    X_test,
    y_test,
    batch_size=8,
    pretrain_epochs=5,
    gan_epochs=50,
    save_path="best_denoiser2.h5",
):
    """Pretrain the generator with L1 loss, then train it adversarially.

    After every GAN epoch the generator is scored with ``evaluate_model`` and
    saved to ``save_path`` whenever the PSNR beats the best value seen so far.

    Args:
        X_train: Array of training inputs, indexed along the first axis.
        y_train: Array of training targets aligned with ``X_train``.
        X_test: Test inputs used for the per-epoch evaluation.
        y_test: Test targets aligned with ``X_test``.
        batch_size: Number of samples per update; the last batch may be smaller.
        pretrain_epochs: Number of L1-only epochs run first.
        gan_epochs: Number of adversarial epochs run afterwards.
        save_path: File the best generator is saved to.

    Raises:
        ValueError: If ``X_train`` is empty.
    """
    train_size = X_train.shape[0]
    if train_size == 0:
        raise ValueError("train_model needs at least one training sample")

    batch_starts = range(0, train_size, batch_size)
    # Average over the batches actually run. train_size // batch_size ignores
    # the trailing partial batch and is 0 when train_size < batch_size.
    num_batches = len(batch_starts)
    best_psnr = 0.0

    for epoch in range(pretrain_epochs):
        total_loss = 0.0
        for start in batch_starts:
            x_aug, y_aug = _augment_batch(
                X_train[start : start + batch_size], y_train[start : start + batch_size]
            )
            total_loss += float(pretrain_step(x_aug, y_aug))
        print(f"[Pretrain] Epoch {epoch+1}/{pretrain_epochs} - L1 Loss: {total_loss / num_batches:.4f}")

    for epoch in range(gan_epochs):
        g_total, d_total = 0.0, 0.0
        for start in batch_starts:
            x_aug, y_aug = _augment_batch(
                X_train[start : start + batch_size], y_train[start : start + batch_size]
            )
            g_loss, d_loss = train_step(x_aug, y_aug)
            g_total += float(g_loss)
            d_total += float(d_loss)

        avg_g = g_total / num_batches
        avg_d = d_total / num_batches
        print(f"[GAN] Epoch {epoch+1}/{gan_epochs} - G Loss: {avg_g:.4f}, D Loss: {avg_d:.4f}")

        avg_psnr, avg_ssim = evaluate_model(generator, X_test, y_test)
        print(f"[Eval] PSNR: {avg_psnr:.2f}, SSIM: {avg_ssim:.3f}")

        # Keep only the checkpoint with the highest evaluation PSNR
        if avg_psnr > best_psnr:
            best_psnr = avg_psnr
            generator.save(save_path)
            print(f"✅ Best model saved with PSNR: {best_psnr:.2f}")


# Run the full schedule: 20 L1 pretraining epochs, then 20 GAN epochs, with batches of 8.
train_model(X_train, y_train, X_test, y_test, batch_size=8, pretrain_epochs=20, gan_epochs=20)

In [None]:
# Load best generator
# train_model() checkpoints to "best_denoiser2.h5", so read that same file.
# compile=False: the model is used for inference only and was saved without an optimizer.
best_generator = tf.keras.models.load_model("best_denoiser2.h5", compile=False)

# Predict on test set
denoised_test = best_generator.predict(X_test, verbose=0)

# Baseline metrics (noisy vs. clean)
baseline_psnr = [psnr(y_test[i], X_test[i], data_range=1) for i in range(len(X_test))]
baseline_ssim = [ssim(y_test[i], X_test[i], data_range=1, channel_axis=-1) for i in range(len(X_test))]

# Denoised metrics (denoised vs. clean)
psnr_scores = [psnr(y_test[i], denoised_test[i], data_range=1) for i in range(len(X_test))]
ssim_scores = [ssim(y_test[i], denoised_test[i], data_range=1, channel_axis=-1) for i in range(len(X_test))]

# Print averages
print("=== Average Metrics ===")
print(f"Baseline PSNR:   {np.mean(baseline_psnr):.2f} ± {np.std(baseline_psnr):.2f}")
print(f"Denoised PSNR:   {np.mean(psnr_scores):.2f} ± {np.std(psnr_scores):.2f}")
print()
print(f"Baseline SSIM:   {np.mean(baseline_ssim):.2f} ± {np.std(baseline_ssim):.2f}")
print(f"Denoised SSIM:   {np.mean(ssim_scores):.2f} ± {np.std(ssim_scores):.2f}")

# Visualize (at most three examples; fewer when the test set is smaller)
n_show = min(3, len(X_test))
if n_show > 0:
    plt.figure(figsize=(15, 6))
    for i in range(n_show):
        # Noisy
        plt.subplot(n_show, 4, i * 4 + 1)
        plt.imshow(X_test[i])
        plt.title("Noisy Input\nPSNR: {:.2f}".format(baseline_psnr[i]))
        plt.axis("off")

        # Denoised
        plt.subplot(n_show, 4, i * 4 + 2)
        plt.imshow(denoised_test[i])
        plt.title(f"Denoised\nPSNR: {psnr_scores[i]:.2f}")
        plt.axis("off")

        # Clean
        plt.subplot(n_show, 4, i * 4 + 3)
        plt.imshow(y_test[i])
        plt.title("Original Clean")
        plt.axis("off")

        # Error map (absolute difference, averaged over the colour channels).
        # imshow ignores cmap for RGB input, so reduce to one channel first.
        plt.subplot(n_show, 4, i * 4 + 4)
        error_map = np.abs(denoised_test[i] - y_test[i]).mean(axis=-1)
        plt.imshow(error_map, cmap="hot")
        plt.title("Error Map")
        plt.axis("off")

    plt.tight_layout()
    plt.show()