In [None]:
import os
import numpy as np
import imageio.v2 as imageio  # Use imageio.v2 to avoid deprecation warnings
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"

# Paths
HR_DIR = "/kaggle/input/lsdir33/0003000"  # High-resolution images
NOISY_DIR = "/kaggle/working/lsdir33/noise"  # Output directory for noisy images

# Ensure output directory exists
os.makedirs(NOISY_DIR, exist_ok=True)

import cupy as cp

def add_noise(image, sigma=50):
    image = cp.array(image / 255, dtype=cp.float32)
    noise = cp.random.normal(0, sigma / 255, image.shape)
    gauss_noise = image + noise
    return (gauss_noise * 255).get()  # Convert back to NumPy

def save_image(image, path):
    """
    Saves an image after clipping and rounding to uint8 format.
    image: Image as numpy array.
    path: Save location.
    """
    image = np.round(np.clip(image, 0, 255)).astype(np.uint8)
    imageio.imwrite(path, image)

def crop_image(image, s=8):
    """
    Crops an image so its width & height are multiples of 's'.
    """
    h, w, c = image.shape
    image = image[:h - h % s, :w - w % s, :]
    return image

# Process all images in the HR directory
for img_name in os.listdir(HR_DIR):
    if img_name.endswith(".png"):  # Process only PNG files
        img_path = os.path.join(HR_DIR, img_name)
        noisy_img_path = os.path.join(NOISY_DIR, img_name)

        # Read and process the image
        img = imageio.imread(img_path)
        img = crop_image(img)  # Ensure size is multiple of 8
        img_noise = add_noise(img, sigma=50)  # Add noise

        # Save the noisy image
        save_image(img_noise, noisy_img_path)
        print(f"Processed: {img_name}")

print("All images processed and saved in", NOISY_DIR)


In [None]:
import os
import cv2
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from PIL import Image

# ✅ Custom Dataset Class (Handles Clean & Noisy Images)
class DenoiseDataset(Dataset):
    def __init__(self, clean_dir, noisy_dir, transform=None):
        self.clean_images = sorted(os.listdir(clean_dir))
        self.noisy_images = sorted(os.listdir(noisy_dir))
        self.clean_dir = clean_dir
        self.noisy_dir = noisy_dir
        self.transform = transform

    def __len__(self):
        return len(self.clean_images)

    def __getitem__(self, idx):
        clean_path = os.path.join(self.clean_dir, self.clean_images[idx])
        noisy_path = os.path.join(self.noisy_dir, self.noisy_images[idx])

        # Load images using OpenCV (NumPy arrays)
        clean_img = cv2.imread(clean_path)
        noisy_img = cv2.imread(noisy_path)

        # Convert BGR to RGB
        clean_img = cv2.cvtColor(clean_img, cv2.COLOR_BGR2RGB)
        noisy_img = cv2.cvtColor(noisy_img, cv2.COLOR_BGR2RGB)

        # Convert NumPy array to PIL Image
        clean_img = Image.fromarray(clean_img)
        noisy_img = Image.fromarray(noisy_img)

        # Apply transformations
        if self.transform:
            clean_img = self.transform(clean_img)
            noisy_img = self.transform(noisy_img)

        return noisy_img, clean_img

# ✅ Data Augmentation & Transformations
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor()  # Convert to PyTorch tensor
])

# ✅ Define dataset paths (for 4 datasets)
dataset_paths = [
    ("/kaggle/input/div2k-high-resolution-images/DIV2K_train_HR/DIV2K_train_HR", "/kaggle/input/noisyd/noisy_data/DIV2K/DIV2K_train_HR/DIV2K_train_HR"),
    ("/kaggle/input/lsdier22/0002000", "/kaggle/working/lsdir22/noise"),
    ("/kaggle/input/lsdir11/0001000", "/kaggle/working/lsdir11/noise"),
    ("/kaggle/input/lsdir33/0003000", "/kaggle/working/lsdir33/noise"),
    ("/kaggle/input/lsdir44/0004000", "/kaggle/working/lsdir44/noise"),
    ("/kaggle/input/lsdir55/0005000", "/kaggle/working/lsdir55/noise")
]

# ✅ Create datasets for all 6 datasets
datasets = [DenoiseDataset(clean, noisy, transform) for clean, noisy in dataset_paths]

# ✅ Combine datasets into one
combined_dataset = ConcatDataset(datasets)

# ✅ Create DataLoader
train_loader = DataLoader(combined_dataset, batch_size=4, shuffle=True, num_workers=4)

print(f"Total images in combined dataset: {len(combined_dataset)}")
print("✅ DataLoader ready!")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=3, features=[64, 128, 256, 512]):
        super(UNet, self).__init__()
        
        # Encoder
        self.encoders = nn.ModuleList()
        for feature in features:
            self.encoders.append(self._conv_block(in_channels, feature))
            in_channels = feature
        
        # Bottleneck
        self.bottleneck = self._conv_block(features[-1], features[-1] * 2)
        
        # Decoder
        self.decoders = nn.ModuleList()
        for feature in reversed(features):
            self.decoders.append(nn.ConvTranspose2d(feature * 2, feature, kernel_size=2, stride=2))
            self.decoders.append(self._conv_block(feature * 2, feature))
        
        # Final Output Layer
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def _conv_block(self, in_channels, out_channels):
        """Double Convolution Block"""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        skip_connections = []
        for encoder in self.encoders:
            x = encoder(x)
            skip_connections.append(x)
            x = F.max_pool2d(x, kernel_size=2, stride=2)
        
        x = self.bottleneck(x)

        skip_connections = skip_connections[::-1]
        for i in range(0, len(self.decoders), 2):
            x = self.decoders[i](x)  # Upconvolution
            skip_connection = skip_connections[i // 2]

            if x.shape != skip_connection.shape:
                x = F.interpolate(x, size=skip_connection.shape[2:], mode="bilinear", align_corners=True)

            x = torch.cat((skip_connection, x), dim=1)  # Skip connection
            x = self.decoders[i + 1](x)  # Convolution Block

        return self.final_conv(x)
        

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Charbonnier Loss (L1-like but robust)
class CharbonnierLoss(nn.Module):
    def __init__(self, epsilon=1e-3):
        super(CharbonnierLoss, self).__init__()
        self.epsilon = epsilon

    def forward(self, pred, target):
        return torch.mean(torch.sqrt((pred - target) ** 2 + self.epsilon ** 2))

# SSIM Loss
def ssim_loss(pred, target):
    mu_pred = F.avg_pool2d(pred, kernel_size=3, stride=1, padding=1)
    mu_target = F.avg_pool2d(target, kernel_size=3, stride=1, padding=1)
    sigma_pred = F.avg_pool2d(pred ** 2, kernel_size=3, stride=1, padding=1) - mu_pred ** 2
    sigma_target = F.avg_pool2d(target ** 2, kernel_size=3, stride=1, padding=1) - mu_target ** 2
    sigma_pred_target = F.avg_pool2d(pred * target, kernel_size=3, stride=1, padding=1) - mu_pred * mu_target
    c1, c2 = 0.01 ** 2, 0.03 ** 2
    ssim = ((2 * mu_pred * mu_target + c1) * (2 * sigma_pred_target + c2)) / ((mu_pred ** 2 + mu_target ** 2 + c1) * (sigma_pred + sigma_target + c2))
    return torch.clamp((1 - ssim) / 2, 0, 1).mean()

# Denoising Loss (Only Charbonnier + SSIM)
class DenoiseLoss(nn.Module):
    def __init__(self):
        super(DenoiseLoss, self).__init__()
        self.charbonnier = CharbonnierLoss()
        self.alpha = nn.Parameter(torch.tensor(0.5))  # Learnable weight for Charbonnier
        self.beta = nn.Parameter(torch.tensor(0.5))   # Learnable weight for SSIM

    def forward(self, pred, target):
        l1 = self.charbonnier(pred, target)
        ssim = ssim_loss(pred, target)

        # Normalize weights dynamically
        total_weight = self.alpha + self.beta
        return (self.alpha / total_weight) * l1 + (self.beta / total_weight) * ssim

# Initialize UNet Model (Ensure UNet class is defined before this)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(in_channels=3, out_channels=3).to(device)

# Define loss function
criterion = DenoiseLoss().to(device)

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
import torchvision.transforms as transforms
from torch.cuda.amp import autocast, GradScaler

# Initialize UNet Model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(in_channels=3, out_channels=3).to(device)

optimizer = optim.AdamW(model.parameters(), lr=1e-3)
scaler = GradScaler()  # Enable Mixed Precision Training

# Define Transform with Random Cropping
train_transform = transforms.Compose([
    transforms.RandomCrop(128),
    transforms.RandomHorizontalFlip(p=0.5),  # Flip images horizontally
    transforms.RandomVerticalFlip(p=0.5),  # Flip images vertically (optional)
    transforms.RandomRotation(degrees=10),  # Small rotations to introduce variance# Crop to 128x128 patches
    transforms.ToTensor()
])

# Training Loop
num_epochs = 10
accumulation_steps = 4  # Effective batch size multiplier

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    optimizer.zero_grad()

    for i, (noisy_imgs, clean_imgs) in enumerate(train_loader):
        noisy_imgs, clean_imgs = noisy_imgs.to(device), clean_imgs.to(device)

        with autocast():  # Enable Mixed Precision
            outputs = model(noisy_imgs)
            loss = criterion(outputs, clean_imgs) / accumulation_steps  # Scale loss

        scaler.scale(loss).backward()  # Backpropagate scaled loss

        if (i + 1) % accumulation_steps == 0:  # Update weights every 'accumulation_steps' batches
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        epoch_loss += loss.item() * accumulation_steps  # Reverse scaling

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss/len(train_loader):.6f}")

In [None]:
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

def evaluate_model(model, dataloader, device):
    model.eval()
    total_psnr = 0
    total_ssim = 0
    num_samples = 0

    with torch.no_grad():
        for noisy_imgs, clean_imgs in dataloader:
            noisy_imgs, clean_imgs = noisy_imgs.to(device), clean_imgs.to(device)
            denoised_imgs = model(noisy_imgs)

            for i in range(noisy_imgs.shape[0]):
                clean_np = clean_imgs[i].cpu().numpy().transpose(1, 2, 0)  # (H, W, C)
                denoised_np = denoised_imgs[i].cpu().numpy().transpose(1, 2, 0)

                clean_np = np.clip(clean_np, 0, 1)
                denoised_np = np.clip(denoised_np, 0, 1)

                # Compute PSNR
                img_psnr = psnr(clean_np, denoised_np, data_range=1.0)

                # Compute SSIM with win_size fix
                try:
                    img_ssim = ssim(clean_np, denoised_np, data_range=1.0, win_size=3, channel_axis=-1)
                except ValueError as e:
                    print(f"Skipping SSIM for small image: {clean_np.shape} - {e}")
                    img_ssim = 0  # Assign 0 if SSIM cannot be computed

                total_psnr += img_psnr
                total_ssim += img_ssim
                num_samples += 1

    avg_psnr = total_psnr / num_samples
    avg_ssim = total_ssim / num_samples
    print(f"Validation PSNR: {avg_psnr:.2f} dB, SSIM: {avg_ssim:.4f}")
    return avg_psnr, avg_ssim


In [None]:
class Val(Dataset):
    def __init__(self, clean_dir, noisy_dir, transform=None):
        self.clean_images = sorted(os.listdir(clean_dir))
        self.noisy_images = sorted(os.listdir(noisy_dir))
        self.clean_dir = clean_dir
        self.noisy_dir = noisy_dir
        self.transform = transform

    def __len__(self):
        return len(self.clean_images)

    def __getitem__(self, idx):
        clean_path = os.path.join(self.clean_dir, self.clean_images[idx])
        noisy_path = os.path.join(self.noisy_dir, self.noisy_images[idx])

        # Load images using OpenCV (NumPy arrays)
        clean_img = cv2.imread(clean_path)
        noisy_img = cv2.imread(noisy_path)

        # Convert BGR to RGB
        clean_img = cv2.cvtColor(clean_img, cv2.COLOR_BGR2RGB)
        noisy_img = cv2.cvtColor(noisy_img, cv2.COLOR_BGR2RGB)

        # Convert NumPy array to PIL Image
        clean_img = Image.fromarray(clean_img)
        noisy_img = Image.fromarray(noisy_img)

        # Apply transformations
        if self.transform:
            clean_img = self.transform(clean_img)
            noisy_img = self.transform(noisy_img)

        return noisy_img, clean_img

# Data augmentation & transformations
transform = transforms.Compose([
    transforms.Resize((512, 512)),  # Resize all images to 512x512
    transforms.ToTensor()  # Convert to PyTorch tensor
])

In [None]:
# Define validation dataset paths
val_clean_dir = "/kaggle/input/div2k-high-resolution-images/DIV2K_valid_HR/DIV2K_valid_HR"
val_noisy_dir = "/kaggle/input/noisyd/noisy_data/DIV2K/DIV2K_valid_HR/DIV2K_valid_HR"

# Create validation dataset and dataloader
val_dataset = Val(val_clean_dir, val_noisy_dir, transform=transform)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

# Evaluate Model
evaluate_model(model, val_loader, device)

In [None]:
torch.save(model, "unet")

In [None]:
import os
import cv2
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from PIL import Image


class TestDenoiseDataset(Dataset):
    def __init__(self, noisy_dir, transform=None):
        self.noisy_images = sorted(os.listdir(noisy_dir))
        self.noisy_dir = noisy_dir
        self.transform = transform

    def __len__(self):
        return len(self.noisy_images)

    def __getitem__(self, idx):
        noisy_path = os.path.join(self.noisy_dir, self.noisy_images[idx])

        # Load noisy image
        noisy_img = cv2.imread(noisy_path)
        noisy_img = cv2.cvtColor(noisy_img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
        noisy_img = Image.fromarray(noisy_img)  # Convert to PIL

        # Apply transformations
        if self.transform:
            noisy_img = self.transform(noisy_img)

        return noisy_img, self.noisy_images[idx]  # Return filename for saving

In [None]:
!gdown --fuzzy --id 1UZA_AEdV5EgqWl9lozYo12YrET-Pno6L

In [None]:
import shutil
import os

zip_file = "/kaggle/working/LSDIR_DIV2K_Test_Sigma50.zip"
extract_folder = "/kaggle/working/LSDIR_DIV2K_Test_Sigma50"

shutil.unpack_archive(zip_file, extract_folder)
os.remove(zip_file)
print("Unzipped and deleted!")

In [None]:
test_noisy_dir = "/kaggle/working/LSDIR_DIV2K_Test_Sigma50"

test_dataset = TestDenoiseDataset(test_noisy_dir, transform=transforms.ToTensor())
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
import torch

# Define Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load Full Model (Not Recommended but Works)
model = torch.load("/kaggle/input/unet-denoise4/pytorch/default/1/unet(4)", map_location=device)

# Set Model to Evaluation Mode
model.eval()

print("Full model loaded successfully!")


In [None]:
import os
import torch
import torchvision.transforms as transforms

output_dir = "denoised_results1"
os.makedirs(output_dir, exist_ok=True)

model.eval()
with torch.no_grad():
    for noisy_imgs, filenames in test_loader:
        noisy_imgs = noisy_imgs.to(device)

        # Denoise
        denoised_imgs = model(noisy_imgs)

        # Normalize outputs to [0, 1]
        denoised_imgs = torch.clamp(denoised_imgs, 0, 1)  # Ensure values are in [0,1]

        for i in range(denoised_imgs.shape[0]):  
            img = denoised_imgs[i].cpu().float()  # Ensure float32
            
            # Normalize properly
            min_val, max_val = img.min(), img.max()
            if max_val > min_val:  # Avoid division by zero
                img = (img - min_val) / (max_val - min_val)

            # Convert to RGB & Save
            denoised_pil = transforms.ToPILImage()(img).convert("RGB")
            denoised_pil.save(f"{output_dir}/{filenames[i]}", quality=95)

print(f"Denoised images saved in {output_dir}/")


In [None]:
import shutil

shutil.make_archive("denoised", 'zip', "/kaggle/working/denoised_results1")  # Zips the entire folder

In [None]:
import os
import torch
import matplotlib.pyplot as plt
import numpy as np
import torchvision.transforms as transforms
from ipywidgets import interact, IntSlider
from PIL import Image

# Set directories
noisy_dir = "/kaggle/working/LSDIR_DIV2K_Test_Sigma50"  # Folder with noisy test images
denoised_dir = "/kaggle/working/denoised_results1"  # Folder with saved denoised images

# Load image filenames
noisy_images = sorted(os.listdir(noisy_dir))
denoised_images = sorted(os.listdir(denoised_dir))

# Ensure matching images
assert len(noisy_images) == len(denoised_images), "Mismatch in number of images!"

# Load and preprocess images
def load_image(image_path):
    image = Image.open(image_path).convert("RGB")
    return np.array(image)

# Interactive visualization
def show_images(index):
    noisy_img = load_image(os.path.join(noisy_dir, noisy_images[index]))
    denoised_img = load_image(os.path.join(denoised_dir, denoised_images[index]))

    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    axes[0].imshow(noisy_img)
    axes[0].set_title("Noisy Image")
    axes[0].axis("off")

    axes[1].imshow(denoised_img)
    axes[1].set_title("Denoised Image")
    axes[1].axis("off")

    plt.show()

# Create interactive slider
interact(show_images, index=IntSlider(0, 0, len(noisy_images) - 1, 1));