In [4]:
# !pip install bm3d
import bm3d

In [5]:
import os
import numpy as np
import pandas as pd

import cv2
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, random_split

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# This function reduces salt-and-pepper noise from an image using a median filter.
def reduceSaltAndPeperNoise(image, kernel=3):
    image = np.array(image) 

    median_filtered = cv2.medianBlur(image, kernel)

    # Convert the filtered image back to a PIL Image and return it
    return Image.fromarray(median_filtered)

In [12]:
# This function uses the BM3D algorithm to reduce Gaussian noise from an image. 
# Before using it, ensure the 'bm3d' package is installed by running: pip3 install bm3d
def reduceGaussianNoise(image, sigma=20/255):
    image     = np.array(image) 
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    R, G, B   = cv2.split(image_rgb) 

    # Apply BM3D denoising on each channel (R, G, B), normalize by 255.0 and scale back after denoising
    R_denoised = bm3d.bm3d(R / 255.0, sigma) * 255
    G_denoised = bm3d.bm3d(G / 255.0, sigma) * 255
    B_denoised = bm3d.bm3d(B / 255.0, sigma) * 255

    # Merge the denoised channels back into a single image
    image_denoised = cv2.merge((R_denoised, G_denoised, B_denoised))
    # Clip the values to be in the range [0, 255] and convert to uint8 type
    image_denoised = np.clip(image_denoised, 0, 255).astype(np.uint8)

    # Convert the image back to BGR color space before returning
    image_denoised = cv2.cvtColor(image_denoised, cv2.COLOR_RGB2BGR)

    return image_denoised 


In [None]:
# Read the CSV file containing image labels
df = pd.read_csv("/content/drive/MyDrive/noise reduce/Labels.csv")

# Filter only the images with noise_type = "Periodic"
periodic_images = df[df["noise_type"] == "Periodic"]["image_name"].tolist()

In [None]:
class DenoisingDatasetPeriodic(Dataset):
    def __init__(self, noisy_dir, clean_dir, image_list, transform=None):
        self.noisy_dir = noisy_dir
        self.clean_dir = clean_dir
        self.transform = transform

        # Filter out invalid images
        self.image_list = []
        for filename in image_list:
            noisy_path = os.path.join(self.noisy_dir, filename)
            clean_path = os.path.join(self.clean_dir, filename)

            if os.path.exists(noisy_path) and os.path.exists(clean_path):
                noisy_img = cv2.imread(noisy_path)
                clean_img = cv2.imread(clean_path)

                if noisy_img is not None and clean_img is not None:
                    self.image_list.append(filename)

        print(f"✅ Selected {len(self.image_list)} valid images out of {len(image_list)} total images.")

    def __len__(self):
        return len(self.image_list)

    def __getitem__(self, idx):
        filename = self.image_list[idx]
        noisy_path = os.path.join(self.noisy_dir, filename)
        clean_path = os.path.join(self.clean_dir, filename)

        noisy_img = cv2.imread(noisy_path)
        clean_img = cv2.imread(clean_path)

        # Convert BGR to RGB
        noisy_img = cv2.cvtColor(noisy_img, cv2.COLOR_BGR2RGB)
        clean_img = cv2.cvtColor(clean_img, cv2.COLOR_BGR2RGB)

        # Resize images
        noisy_img = cv2.resize(noisy_img, (256, 256))
        clean_img = cv2.resize(clean_img, (256, 256))

        if self.transform:
            noisy_img = self.transform(noisy_img)
            clean_img = self.transform(clean_img)

        return noisy_img, clean_img

# 3. Image processing settings for RGB
transformP = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])


In [None]:
import random

# Split dataset: 90% for training and 10% for testing
random.shuffle(periodic_images)  # Shuffle the image list randomly
train_size = int(0.9 * len(periodic_images))  # Compute training set size
test_size  = len(periodic_images) - train_size  # Compute testing set size
train_images, test_images = periodic_images[:train_size], periodic_images[train_size:]  # Split images

# 4. Create training and testing datasets
train_dataset = DenoisingDatasetPeriodic(
    "/content/drive/MyDrive/noise reduce/Noisy",
    "/content/drive/MyDrive/noise reduce/Clean",
    train_images,
    transform=transformP
)

test_dataset = DenoisingDatasetPeriodic(
    "/content/drive/MyDrive/noise reduce/Noisy",
    "/content/drive/MyDrive/noise reduce/Clean",
    test_images,
    transform=transformP
)

# Create DataLoaders for training and testing
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)  # Shuffle training data to improve generalization 
test_loader  = DataLoader(test_dataset, batch_size=4, shuffle=False)  # Keep test data order for consistent evaluation

In [None]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()

        # Define the encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),  # First convolution layer for RGB input
            nn.ReLU(),  # Activation function
            nn.MaxPool2d(2, 2),  # Downsampling (reduces spatial size)
            nn.Conv2d(64, 128, 3, padding=1),  # Second convolution layer
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(128, 256, 3, padding=1),  # Third convolution layer
            nn.ReLU(),
            nn.MaxPool2d(2, 2)  # Further downsampling
        )

        # Define the decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1),  # First upsampling layer
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),  # Second upsampling layer
            nn.ReLU(),
            nn.ConvTranspose2d(64, 3, 3, stride=2, padding=1, output_padding=1),  # Final layer to reconstruct RGB image
            nn.Sigmoid()  # Output values scaled to [0,1]
        )

    def forward(self, x):
        x = self.encoder(x)  # Encode the input image
        x = self.decoder(x)  # Decode to reconstruct the image
        return x

# Model testing
encoder      = Autoencoder()
sample_input = torch.randn(1, 3, 64, 64)  # A sample RGB image with size 64x64
output       = encoder(sample_input)

# 6. Model setup and training
device    = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Use GPU if available, otherwise CPU
encoder   = Autoencoder().to(device)  # Move model to the selected device
criterion = nn.MSELoss()  # Mean Squared Error loss function for pixel-wise comparison
optimizer = optim.Adam(encoder.parameters(), lr=0.0005)  # Adam(Adaptive Moment Estimation) optimizer with learning rate 0.0005

In [None]:
# 7. Train the model on the training dataset
num_epochs = 30  # Number of epochs for training
for epoch in range(num_epochs):
    encoder.train()  # Set the model to training mode
    for noisy_imgs, clean_imgs in train_loader:  # Iterate over training data
        noisy_imgs, clean_imgs = noisy_imgs.to(device), clean_imgs.to(device)  # Move data to the selected device (GPU/CPU)

        optimizer.zero_grad()  # Zero the gradients before the backward pass
        outputs = encoder(noisy_imgs)  # Forward pass: get the model's predictions
        loss = criterion(outputs, clean_imgs)  # Calculate the loss (MSE between predicted and true clean images)
        loss.backward()  # Backward pass: compute gradients
        optimizer.step()  # Update model parameters

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")  # Print loss after each epoch


In [None]:
# 8. Evaluate the model on the test dataset
encoder.eval()  # Set the model to evaluation mode (disables dropout, batch norm)
total_loss = 0  # Initialize total loss for test dataset
with torch.no_grad():  # Disable gradient computation (no need for backpropagation)
    for noisy_imgs, clean_imgs in test_loader:  # Iterate over test data
        noisy_imgs, clean_imgs = noisy_imgs.to(device), clean_imgs.to(device)
        outputs = encoder(noisy_imgs)  # Forward pass: get model's predictions
        loss    = criterion(outputs, clean_imgs)  # Calculate the loss (MSE between predicted and true clean images)
        total_loss += loss.item()  # Accumulate the loss

# Calculate the average test loss over all batches
avg_test_loss = total_loss / len(test_loader)
print(f"Test Loss: {avg_test_loss:.4f}")  # Print the average test loss


In [None]:
def reducePeriodicNoise(image, model, transform, device):
    model.eval()  # Set the model to evaluation mode

    noisy_img = cv2.imread(image)
    noisy_img = cv2.cvtColor(noisy_img, cv2.COLOR_BGR2RGB)

    # Convert the image to PIL format (required by transform)
    noisy_img = Image.fromarray(noisy_img)

    # Apply the transformation (resize, normalization, etc.) and move it to the device (GPU/CPU)
    noisy_tensor = transform(noisy_img).unsqueeze(0).to(device)

    # Disable gradient calculation (since we are not training the model)
    with torch.no_grad():
        denoised_tensor = model(noisy_tensor).cpu().squeeze(0)  # Get model output and move it to CPU

    # Convert the model output (Tensor) to a NumPy array (from (Channels, Height, Width) to (H, W, C))
    denoised_img = denoised_tensor.permute(1, 2, 0).numpy()

    # Undo normalization and clip the values to [0, 255] for proper image representation
    denoised_img = (denoised_img * 255).clip(0, 255).astype(np.uint8)

    return denoised_img  # Return the denoised image as a NumPy array

In [16]:
# This function calculates the Peak Signal-to-Noise Ratio (PSNR) between the original and reconstructed images.
def psnr(original_image, reconstructed_image):
    original_image      = np.array(original_image, dtype=np.float64)  
    reconstructed_image = np.array(reconstructed_image, dtype=np.float64)

    # Calculate Mean Squared Error (MSE) between the original and reconstructed images
    mse = np.mean((original_image - reconstructed_image) ** 2)

    # If MSE is 0, return infinity (i.e., perfect match between images)
    if mse == 0:
        return float('inf')

    max_pixel_value = 255  # The maximum pixel value for an 8-bit image
    # Calculate PSNR using the formula: PSNR = 10 * log10((max_value^2) / MSE)
    psnr = 10 * np.log10((max_pixel_value ** 2) / mse)
    
    return psnr


In [None]:
from pathlib import Path

#open images and read images
class NoiseDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data       = pd.read_csv(csv_file)
        self.img_dir    = img_dir
        self.transform  = transform
        self.label_map  = {"Salt & Pepper": 0, "Gaussian": 1, "Periodic": 2} #map noise type to number
        self.valid_data = self._filter_valid_images()

    def _filter_valid_images(self):
        valid_rows = []
        for idx in range(len(self.data)):
            img_path = Path(self.img_dir) / self.data.iloc[idx, 0]
            if img_path.exists():  # check for existing image
                valid_rows.append(self.data.iloc[idx]) #append to dataset
        return pd.DataFrame(valid_rows)

    def __len__(self):
        return len(self.valid_data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.valid_data.iloc[idx, 0])
        image = Image.open(img_name).convert("RGB")
        label = self.label_map[self.valid_data.iloc[idx, 1]]

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
transform = transforms.Compose([
    transforms.Resize((256, 192)), #resize images for train model
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2), #cahnge brightness and contrast of images for better learning
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) #normalized images with static mean and std
])

csv_path = "/content/drive/MyDrive/reduce noise/Labels.csv"
img_dir  = "/content/drive/MyDrive/reduce noise/Noisy"
full_dataset = NoiseDataset(csv_path, img_dir, transform=transform)

# split data to train and test part with 0.8 rate
train_size = int(0.8 * len(full_dataset))
test_size  = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True) #set batch size for train model
test_loader  = DataLoader(test_dataset, batch_size=32, shuffle=False) #set natch size for test model

In [None]:
#CNN definition for Lable Detection
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1) #concolutional layer definition
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool  = nn.MaxPool2d(2, 2) #compression features
        self.fc1   = nn.Linear(64 * 32 * 24, 128) #reduce classes
        self.fc2   = nn.Linear(128, 3) #reduce 128 classes to 3 classes
        self.relu  = nn.ReLU()

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x))) #layer 1 process
        x = self.pool(self.relu(self.conv2(x))) #layer 2 process
        x = self.pool(self.relu(self.conv3(x))) #layer 3 process

        x = x.view(x.size(0), -1) #flatten images
        x = self.relu(self.fc1(x)) #first layer
        x = self.fc2(x) #end layer
        return x

In [None]:
device    = torch.device("cuda" if torch.cuda.is_available() else "cpu") #use GPU if we have GPU, else use CPU
model     = CNNModel().to(device)
criterion = nn.CrossEntropyLoss()

optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-4) #set Adam optimizer and set learning rate to 0.0005 and set for weight_decay for reduce overfitting 

In [None]:
#train model
epochs    = 15 
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1) #Reduce the learning rate by a factor of ten every ten epochs

for epoch in range(epochs):
    model.train()
    running_loss = 0.0 #total loss of each epoch
    correct      = 0
    total        = 0

    for images, labels in train_loader: #load for all batches
        images, labels = images.to(device), labels.to(device) #give every batch to devices(GPU)

        optimizer.zero_grad() #set zero for all of last gradian 
        outputs = model(images) #provide the images to the model to obtain the output
        loss = criterion(outputs, labels) #calcute loss value
        loss.backward() #calcuteing the derivative of the error with respect to the weights
        optimizer.step() #update weights

        # calcute prediction
        _, predicted = torch.max(outputs, 1) #find max of class
        total += labels.size(0) #sum of visited items
        correct += (predicted == labels).sum().item() #calcute sum of correct predicting

        running_loss += loss.item() #reduce loarning rate

    scheduler.step()

    train_acc = 100 * correct / total #calcute accuracy
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader):.4f}, Train Acc: {train_acc:.2f}%")


Epoch 1, Loss: 0.1325, Train Acc: 96.75%
Epoch 2, Loss: 0.1417, Train Acc: 95.83%
Epoch 3, Loss: 0.1319, Train Acc: 96.82%
Epoch 4, Loss: 0.1324, Train Acc: 95.89%
Epoch 5, Loss: 0.1366, Train Acc: 95.89%
Epoch 6, Loss: 0.1277, Train Acc: 96.16%
Epoch 7, Loss: 0.1429, Train Acc: 96.56%
Epoch 8, Loss: 0.1340, Train Acc: 96.42%
Epoch 9, Loss: 0.1358, Train Acc: 96.69%
Epoch 10, Loss: 0.1458, Train Acc: 95.56%
Epoch 11, Loss: 0.1406, Train Acc: 96.36%
Epoch 12, Loss: 0.1343, Train Acc: 96.16%
Epoch 13, Loss: 0.1319, Train Acc: 96.56%
Epoch 14, Loss: 0.1409, Train Acc: 96.09%
Epoch 15, Loss: 0.1337, Train Acc: 96.36%


In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

model.eval() #set model to evaluation mode
y_true = [] #real label
y_pred = [] #model predict

with torch.no_grad(): #stop cacuting gradian
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images) #model outputs for test datas
        _, predicted = torch.max(outputs, 1) #find max classes

        y_true.extend(labels.cpu().numpy()) # transfer real labels from GPU to cpu and cascade them to numpy array
        y_pred.extend(predicted.cpu().numpy()) # transfer predicted labels from GPU to cpu and cascade them to numpy array

# calcuting Evaluation metrics
accuracy    = accuracy_score  (y_true, y_pred)
precision   = precision_score (y_true, y_pred, average="weighted")
recall      = recall_score    (y_true, y_pred, average="weighted")
f1          = f1_score        (y_true, y_pred, average="weighted")
conf_matrix = confusion_matrix(y_true, y_pred)


tn = conf_matrix[0][0]  # True Negative
fp = conf_matrix[0][1]  # False Positive
specificity = tn / (tn + fp) if (tn + fp) != 0 else 0  # Preventing division by zero

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"Specificity: {specificity:.2f}")
print(f"F1-Score: {f1:.2f}")
print("Confusion Matrix:")
print(conf_matrix)

Accuracy: 87.50%


In [None]:
# noisre classes
class_labels = ["Salt & Pepper", "Gaussian", "Periodic"]

image_folder       = "/content/drive/MyDrive/reduce noise/Test" # NoisyTest
output_folder      = "/content/drive/MyDrive/reduce noise/filtered_images"
clean_image_folder = "/content/drive/MyDrive/reduce noise/Clean" # Without-NoisyTest

os.makedirs(output_folder, exist_ok=True)

psnr_values = []

for img_name in os.listdir(image_folder):
    img_path = os.path.join(image_folder, img_name)
    image = Image.open(img_path).convert("RGB") #read images and convert them to RGB
    image_tensor = transform(image) #preprocess
    image_tensor = image_tensor.unsqueeze(0).to(device)  # send batch to GPU

    # پیش‌بینی مدل
    with torch.no_grad():
        output       = model(image_tensor)
        _, predicted = torch.max(output, 1)  # fing max class

    predicted_label  = class_labels[predicted.item()]

    #select function by label
    if predicted_label == "Salt & Pepper":
        filtered_image = reduceSaltAndPeperNoise(image)
    elif predicted_label == "Gaussian":
        filtered_image = reduceGaussianNoise(image)
    elif predicted_label == "Periodic":
        filtered_image = reducePeriodicNoise(image, encoder, transformP, device)

    output_path = os.path.join(output_folder, img_name)
    filtered_image.save(output_path)

    original_path = os.path.join(clean_image_folder, img_name)
    if os.path.exists(original_path):
        original_image = Image.open(original_path).convert("RGB")
        psnr_value = psnr(original_image, filtered_image) #calcute PSNR
        psnr_values.append(psnr_value)

#calcute avg of PSNR
if psnr_values:
    mean_psnr = np.mean(psnr_values)
    print(f"میانگین PSNR برای تمام تصاویر: {mean_psnr:.2f} dB")
else:
    print("هیچ مقدار PSNR محاسبه نشد!")