In [1]:
# src/data_generator.py

import os
import numpy as np
from tensorflow.keras.utils import Sequence
from tensorflow.keras.preprocessing.image import load_img, img_to_array

class DataGenerator(Sequence):
    def __init__(self, noisy_dir, clean_dir, batch_size=32, img_size=(256, 256)):
        self.noisy_dir = noisy_dir
        self.clean_dir = clean_dir
        self.batch_size = batch_size
        self.img_size = img_size
        self.noisy_files = os.listdir(self.noisy_dir)
        self.clean_files = os.listdir(self.clean_dir)
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.noisy_files) / self.batch_size))

    def __getitem__(self, index):
        batch_noisy = self.noisy_files[index * self.batch_size:(index + 1) * self.batch_size]
        batch_clean = self.clean_files[index * self.batch_size:(index + 1) * self.batch_size]

        X, y = self.__data_generation(batch_noisy, batch_clean)
        return X, y

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.noisy_files))

    def __data_generation(self, batch_noisy, batch_clean):
        X = np.empty((self.batch_size, *self.img_size, 3))
        y = np.empty((self.batch_size, *self.img_size, 3))

        for i, (noisy_file, clean_file) in enumerate(zip(batch_noisy, batch_clean)):
            noisy_img = load_img(os.path.join(self.noisy_dir, noisy_file), target_size=self.img_size)
            clean_img = load_img(os.path.join(self.clean_dir, clean_file), target_size=self.img_size)

            X[i,] = img_to_array(noisy_img) / 255.0
            y[i,] = img_to_array(clean_img) / 255.0

        return X, y


In [2]:
# src/mirnet_model.py
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, ReLU, Add
from tensorflow.keras.models import Model

def residual_block(x, filters, kernel_size=3, stride=1):
    res = Conv2D(filters, kernel_size, strides=stride, padding='same')(x)
    res = ReLU()(res)
    res = Conv2D(filters, kernel_size, strides=stride, padding='same')(res)
    return Add()([x, res])

def mirnet_model(input_shape):
    inputs = Input(shape=input_shape)

    x = Conv2D(64, (3, 3), padding='same')(inputs)
    x = ReLU()(x)

    for _ in range(3):  # You can adjust the number of residual blocks
        x = residual_block(x, 64)

    x = Conv2D(3, (3, 3), padding='same')(x)
    outputs = Add()([inputs, x])

    return Model(inputs, outputs)


In [None]:
import os
import numpy as np
import gc
import tensorflow.keras.backend as K
from tensorflow.keras.optimizers import Adam
from skimage.metrics import peak_signal_noise_ratio as psnr
from tensorflow.keras.models import load_model

# Define custom MSE loss function
def mse_loss(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

# Paths to the data directories in Google Drive
data_dir = '/content/drive/MyDrive/dataset'  # Adjust this path if necessary

noisy_train_dir = os.path.join(data_dir, 'train', 'low')
clean_train_dir = os.path.join(data_dir, 'train', 'high')
noisy_val_dir = os.path.join(data_dir, 'val', 'low')
clean_val_dir = os.path.join(data_dir, 'val', 'high')
noisy_test_dir = os.path.join(data_dir, 'test', 'low')
clean_test_dir = os.path.join(data_dir, 'test', 'high')

batch_size = 32
img_size = (128, 128)
epochs = 150

# Create data generators
train_generator = DataGenerator(noisy_train_dir, clean_train_dir, batch_size=batch_size, img_size=img_size)
val_generator = DataGenerator(noisy_val_dir, clean_val_dir, batch_size=batch_size, img_size=img_size)
test_generator = DataGenerator(noisy_test_dir, clean_test_dir, batch_size=batch_size, img_size=img_size)

# Define input shape based on image size
input_shape = (*img_size, 3)

# Create and compile MIRNet model
model = mirnet_model(input_shape)
model.compile(optimizer=Adam(learning_rate=1e-4), loss=mse_loss, metrics=['mae'])

# Print model summary
model.summary()

# Train the model
model.fit(train_generator, validation_data=val_generator, epochs=epochs, verbose=1)

# Optimized evaluate_model function
def evaluate_model(generator):
    total_psnr = 0.0
    total_loss = 0.0
    total_batches = 0

    all_preds = []
    all_batch_ys = []

    # Iterate through the generator to get all predictions and ground truth
    for batch_x, batch_y in generator:
        preds = model.predict(batch_x, verbose=0)
        all_preds.append(preds)
        all_batch_ys.append(batch_y)
        total_batches += 1
        print(f'Processed batch {total_batches}')
        gc.collect()  # Explicitly invoke garbage collection

    # Concatenate all batches
    all_preds = np.concatenate(all_preds, axis=0)
    all_batch_ys = np.concatenate(all_batch_ys, axis=0)
    print('Concatenation complete')

    # # Calculate PSNR for the entire dataset
    psnr_values = [psnr(all_batch_ys[i], all_preds[i], data_range=all_batch_ys[i].max() - all_batch_ys[i].min()) for i in range(len(all_batch_ys))]
    avg_psnr = np.mean(psnr_values)
    print(f'PSNR calculation complete: {avg_psnr}')

    # Calculate loss for the entire dataset
    total_loss = model.evaluate(all_preds, all_batch_ys, verbose=0)[0]  # Assuming first element is loss
    print('Loss evaluation complete')

    return total_loss, avg_psnr

# Evaluate the model on test data
test_loss, test_psnr = evaluate_model(test_generator)
print(f'Test Loss: {test_loss}, Test PSNR: {test_psnr}')

# Save the model after training
model.save('mirnet_model_trained.h5')
print('Model saved successfully')

# Load the trained model
model = load_model('/content/mirnet_model_trained.h5', custom_objects={'mse_loss': mse_loss})
print('Model loaded successfully')


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 128, 128, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 128, 128, 64)         1792      ['input_1[0][0]']             
                                                                                                  
 re_lu (ReLU)                (None, 128, 128, 64)         0         ['conv2d[0][0]']              
                                                                                                  
 conv2d_1 (Conv2D)           (None, 128, 128, 64)         36928     ['re_lu[0][0]']               
                                                                                              

In [None]:
# evaluate_model.py

import os
import numpy as np
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import tensorflow.keras.backend as K

# Define custom MSE loss function
def mse_loss(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

# Paths and parameters
data_dir = '/content/drive/MyDrive/dataset'
noisy_val_dir = os.path.join(data_dir, 'val', 'low')
clean_val_dir = os.path.join(data_dir, 'val', 'high')
img_size = (128, 128)
batch_size =

# Load trained model
model = load_model('/content/mirnet_model_trained.h5', custom_objects={'mse_loss': mse_loss})

# Create data generator for validation set
val_generator = DataGenerator(noisy_val_dir, clean_val_dir, batch_size=batch_size, img_size=img_size)

# Evaluate model on validation set
val_loss = model.evaluate(val_generator)
print(f'Validation Loss: {val_loss}')

# Visualize example denoising results
def visualize_denoising(model, val_generator, num_samples=4):  # Ensure num_samples is not greater than batch_size
    noisy_images, clean_images = val_generator[0]  # Assuming batch size of 8, take first batch
    denoised_images = model.predict(noisy_images)

    plt.figure(figsize=(15, 6))
    for i in range(num_samples):
        plt.subplot(3, num_samples, i + 1)
        plt.imshow(np.clip(noisy_images[i], 0, 1))  # Clip values to [0, 1] for visualization
        plt.title('Noisy')
        plt.axis('off')

        plt.subplot(3, num_samples, num_samples + i + 1)
        plt.imshow(np.clip(denoised_images[i], 0, 1))  # Clip values to [0, 1] for visualization
        plt.title('Denoised')
        plt.axis('off')

        plt.subplot(3, num_samples, 2 * num_samples + i + 1)
        plt.imshow(np.clip(clean_images[i], 0, 1))  # Clip values to [0, 1] for visualization
        plt.title('Clean')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Visualize denoising results
visualize_denoising(model, val_generator)


In [None]:
# evaluate_model.py

import os
import numpy as np
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import tensorflow.keras.backend as K

# Define custom MSE loss function
def mse_loss(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

# Paths and parameters
data_dir = '/content/drive/MyDrive/dataset'
noisy_val_dir = os.path.join(data_dir, 'test', 'low')
clean_val_dir = os.path.join(data_dir, 'test', 'high')
img_size = (128, 128)
batch_size = 4

# Load trained model
model = load_model('/content/mirnet_model_trained.h5', custom_objects={'mse_loss': mse_loss})

# Create data generator for validation set
val_generator = DataGenerator(noisy_val_dir, clean_val_dir, batch_size=batch_size, img_size=img_size)

# Evaluate model on validation set
val_loss = model.evaluate(val_generator)
print(f'Validation Loss: {val_loss}')

# Visualize example denoising results
def visualize_denoising(model, val_generator, num_samples=4):  # Ensure num_samples is not greater than batch_size
    noisy_images, clean_images = val_generator[0]  # Assuming batch size of 8, take first batch
    denoised_images = model.predict(noisy_images)

    plt.figure(figsize=(15, 6))
    for i in range(num_samples):
        plt.subplot(3, num_samples, i + 1)
        plt.imshow(np.clip(noisy_images[i], 0, 1))  # Clip values to [0, 1] for visualization
        plt.title('Noisy')
        plt.axis('off')

        plt.subplot(3, num_samples, num_samples + i + 1)
        plt.imshow(np.clip(denoised_images[i], 0, 1))  # Clip values to [0, 1] for visualization
        plt.title('Denoised')
        plt.axis('off')

        plt.subplot(3, num_samples, 2 * num_samples + i + 1)
        plt.imshow(np.clip(clean_images[i], 0, 1))  # Clip values to [0, 1] for visualization
        plt.title('Clean')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Visualize denoising results
visualize_denoising(model, val_generator)


In [None]:
# denoise_image.py
import cv2
import numpy as np
from tensorflow.keras.models import load_model

# Load the trained model
model = load_model('/content/mirnet_model_trained.h5', custom_objects={'mse_loss': mse_loss})

# Function to load and preprocess an image
def load_and_preprocess_image(image_path, img_size):
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, img_size)
    image = image.astype(np.float32) / 255.0
    return image

# Path to the noisy image
noisy_image_path = '/content/image3.jpg'
img_size = (128, 128)

# Load and preprocess the noisy image
noisy_image = load_and_preprocess_image(noisy_image_path, img_size)
input_image = np.expand_dims(noisy_image, axis=0)

# Use the model to denoise the image
denoised_image = model.predict(input_image)
denoised_image = np.squeeze(denoised_image, axis=0)
denoised_image = (denoised_image * 255).astype(np.uint8)
denoised_image = cv2.cvtColor(denoised_image, cv2.COLOR_RGB2BGR)

# Save the denoised image
cv2.imwrite('denoised_image.jpg', denoised_image)

