In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import os
import glob
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, MaxPooling2D, UpSampling2D, Dropout, Concatenate, Add, ReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.losses import MeanSquaredError
from sklearn.model_selection import train_test_split
from skimage.metrics import peak_signal_noise_ratio as psnr, mean_squared_error as mse, structural_similarity as ssim

### Define working directory and paths

In [None]:
# Root folder of the dataset, typed in by the user. Later cells read the
# training pairs from its Train/low and Train/high sub-folders and write
# model.h5 into it.
# NOTE: input() blocks until answered, so an unattended "Run All" stops here.
work = input('input system path for working directory :')

In [None]:
# Folder containing the noisy test images; every entry in it is later
# globbed, preprocessed and passed through the trained model.
test_dir = input('input system path for test dataset :')

In [None]:
# Output folder for the denoised test images. It is created by a later cell
# if it does not exist yet.
predict = input('input system path for predicted :')

In [2]:
# Bind the user-supplied locations to the names used by the rest of the notebook.
working_dir, noisy_test_path, predicted_dir = work, test_dir, predict

# The training pairs live in two fixed sub-folders of the working directory:
# "low" holds the noisy inputs, "high" the clean targets.
noisy_train_path, clean_train_path = (
    os.path.join(working_dir, subfolder) for subfolder in ("Train/low", "Train/high")
)

# Create the output folder up front so saving predictions cannot fail on a
# missing directory.
os.makedirs(predicted_dir, exist_ok=True)

### Collect sorted lists of image paths from each dataset directory

In [3]:
# Every entry of each folder, in lexicographic order. Noisy and clean training
# images are paired purely by their position in these sorted lists.
# NOTE(review): this assumes both training folders use matching file names -
# confirm against the dataset.
noisy_train_images = sorted(glob.glob(os.path.join(noisy_train_path, '*')))
clean_train_images = sorted(glob.glob(os.path.join(clean_train_path, '*')))
noisy_test_images = sorted(glob.glob(os.path.join(noisy_test_path, '*')))

# Fail fast with a readable message: glob silently returns [] for a wrong
# path, which would otherwise only surface later as an opaque error inside
# train_test_split.
if not noisy_train_images or not clean_train_images:
    raise FileNotFoundError(
        f"No training images found in {noisy_train_path!r} and/or {clean_train_path!r}"
    )
if len(noisy_train_images) != len(clean_train_images):
    raise ValueError(
        f"Training folders are not paired: {len(noisy_train_images)} noisy vs "
        f"{len(clean_train_images)} clean images"
    )

### Preprocessing function

In [4]:
def image_preprocessing(path, size=(256, 256)):
    """Load an image file as a normalised single-channel float32 array.

    The image is resized to ``size``, converted to 8-bit grayscale (PIL mode
    "L") and rescaled from [0, 255] to [0.0, 1.0].

    Args:
        path: Location of the image file, passed straight to ``Image.open``.
        size: Target ``(width, height)`` in pixels, in PIL's ordering.
            Defaults to ``(256, 256)``.

    Returns:
        ``numpy.ndarray`` of dtype float32 with shape ``(height, width, 1)``.
    """
    # The context manager releases the file handle as soon as the pixels have
    # been read, rather than leaving it to garbage collection; this matters
    # when the function is mapped over a whole directory of images.
    with Image.open(path) as source:
        # Resize first, then drop colour - same order as before, so the pixel
        # values produced for existing callers do not change.
        gray = source.resize(size).convert("L")
        pixels = np.asarray(gray, dtype="float32") / 255.0

    # Append the trailing channel axis expected by the convolutional model.
    return pixels[..., np.newaxis]

### Preprocess the training images

In [5]:
# Decode every training image once. Both lists keep the order of their path
# lists, so index k of one corresponds to index k of the other.
noised_train = list(map(image_preprocessing, noisy_train_images))
cleaned_train = list(map(image_preprocessing, clean_train_images))

### Split Dataset

In [6]:
# Stack the per-image arrays into two batches and hold out 20% of the pairs
# for validation. The fixed random_state makes the split reproducible.
x_train, x_val, y_train, y_val = train_test_split(
    np.asarray(noised_train),
    np.asarray(cleaned_train),
    test_size=0.20,
    random_state=42,
)

### Model building

In [7]:
class Conv_block(tf.keras.layers.Layer):
    """A single 'same'-padded 2-D convolution with a ReLU activation.

    Args:
        num_filters: Number of output channels of the convolution.
        kernel_size: Size of the convolution window.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, num_filters=64, kernel_size=3, **kwargs):
        super().__init__(**kwargs)
        self.num_filters = num_filters
        self.kernel_size = kernel_size
        self.conv = Conv2D(filters=self.num_filters, kernel_size=self.kernel_size, padding='same', activation='relu')

    def call(self, x):
        """Apply the convolution + ReLU to ``x`` and return the result."""
        return self.conv(x)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised.

        A layer whose ``__init__`` takes arguments has to expose them here;
        otherwise saving a model that contains it (e.g. ``model.save`` to
        HDF5) can fail, and the layer cannot be rebuilt when the model is
        loaded back (pass the class through ``custom_objects`` on load).
        """
        config = super().get_config()
        config.update({
            "num_filters": self.num_filters,
            "kernel_size": self.kernel_size,
        })
        return config

class DWT_downsampling(tf.keras.layers.Layer):
    """Halve height and width by folding each 2x2 neighbourhood into channels.

    An input of shape (batch, H, W, C) yields (batch, H/2, W/2, 4*C): four
    sum/difference combinations of the 2x2 pixels, concatenated on the channel
    axis in the order [LL, LH, HL, HH]. The combinations are not normalised.
    H and W are expected to be even so that the four strided slices have
    equal shapes.
    """

    def call(self, x):
        # The four pixels of every 2x2 neighbourhood, picked by row/column parity.
        even_row_even_col = x[:, 0::2, 0::2, :]
        odd_row_even_col = x[:, 1::2, 0::2, :]
        even_row_odd_col = x[:, 0::2, 1::2, :]
        odd_row_odd_col = x[:, 1::2, 1::2, :]

        subbands = [
            # LL: plain sum of the four pixels.
            even_row_even_col + odd_row_even_col + even_row_odd_col + odd_row_odd_col,
            # LH: odd rows minus even rows.
            -even_row_even_col - even_row_odd_col + odd_row_even_col + odd_row_odd_col,
            # HL: odd columns minus even columns.
            -even_row_even_col + even_row_odd_col - odd_row_even_col + odd_row_odd_col,
            # HH: main diagonal minus anti-diagonal.
            even_row_even_col - even_row_odd_col - odd_row_even_col + odd_row_odd_col,
        ]
        return tf.concat(subbands, axis=-1)

class IWT_upsampling(tf.keras.layers.Layer):
    """Double height and width by unfolding four channel groups into 2x2 pixels.

    An input of shape (batch, H, W, 4*C) yields (batch, 2*H, 2*W, C). The
    channel axis is read as four consecutive groups [LL, LH, HL, HH], which is
    the layout emitted by ``DWT_downsampling``.
    """

    def call(self, x):
        # Channels per sub-band, taken from the static shape.
        band = x.shape[3] // 4
        ll = x[:, :, :, 0:band]
        lh = x[:, :, :, band:band * 2]
        hl = x[:, :, :, band * 2:band * 3]
        hh = x[:, :, :, band * 3:]

        # Undo the sum/difference mixing. Dividing by 4 compensates for the
        # unnormalised forward combinations.
        even_row_even_col = (ll - lh - hl + hh) / 4
        even_row_odd_col = (ll - lh + hl - hh) / 4
        odd_row_even_col = (ll + lh - hl - hh) / 4
        odd_row_odd_col = (ll + lh + hl + hh) / 4

        # Stacking on a new axis 2 interleaves the rows (even row first).
        # Concatenating the two column phases on the channel axis and then
        # reshaping interleaves the columns (even column first).
        even_columns = tf.stack([even_row_even_col, odd_row_even_col], axis=2)
        odd_columns = tf.stack([even_row_odd_col, odd_row_odd_col], axis=2)
        interleaved = tf.concat([even_columns, odd_columns], axis=-1)

        # Dynamic shape, so the layer also works when batch/spatial sizes are
        # only known at run time.
        dims = tf.shape(x)
        target_shape = tf.stack([dims[0], dims[1] * 2, dims[2] * 2, dims[3] // 4])
        return tf.reshape(interleaved, target_shape)

In [8]:
def create_model(input_shape=(256, 256, 1)):
    """Build the wavelet encoder/decoder denoising network.

    Encoder: four stages of ``Conv_block`` followed by ``DWT_downsampling``,
    with 64, 128, 256 and 512 filters. The output of each ``Conv_block`` is
    kept as a skip connection.
    Bottleneck: ``Conv_block(512)`` -> ``BatchNormalization`` ->
    ``Conv_block(512)`` -> ``Conv2D(2048)``.
    Decoder: four stages of ``IWT_upsampling``, element-wise ``Add`` with the
    matching skip, ``Conv_block`` and a plain ``Conv2D`` with twice as many
    filters. A final 1x1 convolution maps back to the input channel count.

    Args:
        input_shape: ``(height, width, channels)`` of one input image.
            Height and width must be multiples of 16 (or ``None``) because the
            network halves the resolution four times. Defaults to
            ``(256, 256, 1)``.

    Returns:
        An uncompiled ``tf.keras.Model`` whose output has the same shape as
        its input.

    Raises:
        ValueError: If a known spatial dimension is not divisible by 16.
    """
    height, width, channels = input_shape
    for dim in (height, width):
        if dim is not None and dim % 16 != 0:
            raise ValueError(
                f"Spatial dimensions must be multiples of 16, got {input_shape}"
            )

    # Named `inputs` rather than `input` so the builtin is not shadowed.
    inputs = Input(shape=input_shape)

    # Encoder: remember each stage's features for the decoder's skip additions.
    skips = []
    x = inputs
    for filters in (64, 128, 256, 512):
        features = Conv_block(num_filters=filters)(x)
        skips.append(features)
        x = DWT_downsampling()(features)

    # Bottleneck at 1/16 resolution. The last convolution widens to 2048
    # channels so that the first IWT (which divides channels by 4) returns 512.
    x = Conv_block(num_filters=512)(x)
    x = BatchNormalization()(x)
    x = Conv_block(num_filters=512)(x)
    x = Conv2D(filters=2048, kernel_size=3, strides=1, padding='same')(x)

    # Decoder: mirror the encoder, deepest skip first.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = IWT_upsampling()(x)
        x = Conv_block(num_filters=filters)(Add()([x, skip]))
        x = Conv2D(filters=2 * filters, kernel_size=3, strides=1, padding='same')(x)

    outputs = Conv2D(filters=channels, kernel_size=(1, 1), padding="same")(x)
    return Model(inputs=[inputs], outputs=[outputs])

# Instantiate the network and print its layer-by-layer summary
# (output shapes and parameter counts).
model = create_model()
model.summary()

### Compile and train the model

In [9]:
# One place for the batch size. fit() would otherwise fall back to its
# default of 32 for array inputs, which would not match the step counts below.
BATCH_SIZE = 16

# Step counts derived from the arrays that are actually fed to fit() (the
# post-split training and validation sets), rounded up so a final partial
# batch is counted. They are reported only; fit() infers the same numbers
# from batch_size, which avoids "ran out of data" / repeated-data surprises
# caused by hand-set step counts.
steps_per_epoch_train = (len(x_train) + BATCH_SIZE - 1) // BATCH_SIZE
steps_per_epoch_validation = (len(x_val) + BATCH_SIZE - 1) // BATCH_SIZE
print(f"Training on {len(x_train)} images ({steps_per_epoch_train} steps/epoch), "
      f"validating on {len(x_val)} images ({steps_per_epoch_validation} steps)")

callbacks_lst = [
    # Multiply the learning rate by 0.7 after 3 epochs without a val_loss
    # improvement of at least 1e-4, never going below 9e-7.
    ReduceLROnPlateau(monitor='val_loss', min_lr=0.0000009, min_delta=0.0001, factor=0.70, patience=3, verbose=1, mode='min'),
    # Stop after 10 stagnant epochs and roll back to the best weights, so the
    # model saved below is the best one rather than the last one.
    EarlyStopping(monitor='val_loss', mode='min', verbose=1, min_delta=0.0001, patience=10,
                  restore_best_weights=True)
]

model.compile(loss=MeanSquaredError(), optimizer=Adam(learning_rate=0.0009))
model.fit(x_train, y_train,
          validation_data=(x_val, y_val),
          batch_size=BATCH_SIZE,
          epochs=300,
          verbose=1,
          callbacks=callbacks_lst)
model.save(os.path.join(working_dir, 'model.h5'))

### Evaluate the model

In [10]:
# Load and preprocess test images
if not noisy_test_images:
    # Without this, predict() fails on an empty array with an unrelated error.
    raise FileNotFoundError(f"No test images found in {noisy_test_path!r}")
noised_test = [image_preprocessing(f) for f in noisy_test_images]
x_test = np.asarray(noised_test)

# Predict denoised images
pred = model.predict(x_test, batch_size=16)

psnr_values = []
mse_values = []
ssim_values = []

# Ensure predicted directory exists
os.makedirs(predicted_dir, exist_ok=True)

# NOTE: no clean ground truth is loaded for the test set, so each metric below
# compares the network output with its own *noisy input*. The numbers describe
# how much the network changed an image, not how close it is to a clean target.
for i in range(len(x_test)):
    # Drop the trailing channel axis and work on plain 2-D images. skimage's
    # SSIM treats every axis as spatial unless channel_axis is given, and its
    # default 7-wide window does not fit an axis of length 1, which raises
    # "win_size exceeds image extent". PSNR and MSE are unaffected by this.
    original = np.clip(x_test[i, :, :, 0] * 255.0, 0, 255)  # Clip values to [0, 255]
    denoised = np.clip(pred[i, :, :, 0] * 255.0, 0, 255)    # Clip values to [0, 255]

    psnr_value = psnr(original, denoised, data_range=255)  # Specify data_range
    mse_value = mse(original, denoised)
    ssim_value = ssim(original, denoised, data_range=255)

    psnr_values.append(psnr_value)
    mse_values.append(mse_value)
    ssim_values.append(ssim_value)

    # Save under the same file name as the input so results are easy to match up.
    denoised_img = Image.fromarray(denoised.astype(np.uint8))
    denoised_img.save(os.path.join(predicted_dir, os.path.basename(noisy_test_images[i])))

mean_psnr = np.mean(psnr_values)
mean_mse = np.mean(mse_values)
mean_ssim = np.mean(ssim_values)

print(f"Mean PSNR: {mean_psnr:.2f}")
print(f"Mean MSE: {mean_mse:.4f}")
print(f"Mean SSIM: {mean_ssim:.4f}")

### Plot some examples with their PSNR values

In [11]:
# Up to four rows of (noisy input, network output) pairs on a 4x2 grid.
# The loop index doubles as the image index and the subplot offset, so the
# images shown are numbers 0, 2, 4 and 6 of the test set.
plt.figure(figsize=(15, 25))
for i in range(0, 8, 2):
    if i >= len(x_test):
        break  # fewer test images than grid rows - leave the rest empty

    panels = (
        (x_test[i], 'Test Image with Noise'),
        (pred[i], f'Denoised by Autoencoder - PSNR: {psnr_values[i]:.2f} dB'),
    )
    for offset, (image, caption) in enumerate(panels, start=1):
        plt.subplot(4, 2, i + offset)
        plt.xticks([])
        plt.yticks([])
        plt.imshow(image[:, :, 0], cmap='gray')
        plt.title(caption)

plt.show()