In [1]:
"""
Blind Denoising Pipeline — No clean ground truth required
Kaggle P100 compatible

Directory structure:
  test_degradato/
    class_name/
      1.jpg … 5.jpg

Output mirrors the same structure:
  test_denoised/
    class_name/
      1.jpg … 5.jpg
"""

import os
import cv2
import torch
import numpy as np
import shutil
from pathlib import Path
from tqdm import tqdm

# ── Paths ─────────────────────────────────────────────────────────────────────
# Degraded inputs, clean references (read only by the optional evaluation step)
# and the folder that receives the denoised copies.
test_deg_dir = Path(
    "/kaggle/input/datasets/andreaspagnolo/visual-exam-dataset/visual_dataset/test_degradato"
)
test_clean_dir = Path(
    "/kaggle/input/datasets/andreaspagnolo/visual-exam-dataset/visual_dataset/test"
)
output_dir = Path("/kaggle/working/test_denoised")

# Create the destination up front; re-running the cell must not fail if it exists.
output_dir.mkdir(exist_ok=True, parents=True)

# ── Config ────────────────────────────────────────────────────────────────────
# Use the GPU whenever PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
STRATEGY = "deep"   # "classical" | "deep"
# Lower-case file suffixes that are treated as images.
IMG_EXTS = {".bmp", ".jpeg", ".jpg", ".png", ".tif", ".tiff"}

print(f"Device  : {DEVICE}", f"Strategy: {STRATEGY}", sep="\n")


# ═════════════════════════════════════════════════════════════════════════════
# 1.  CLASSICAL — Non-Local Means
# ═════════════════════════════════════════════════════════════════════════════
def nlm_denoise(img_bgr: np.ndarray, h=10, hColor=10, tWS=7, sWS=21) -> np.ndarray:
    """Denoise a colour image with OpenCV's Non-Local Means filter.

    Args:
        img_bgr: Image handed unchanged to ``cv2.fastNlMeansDenoisingColored``.
        h: Third positional argument of the OpenCV call.
        hColor: Fourth positional argument of the OpenCV call.
        tWS: Fifth positional argument of the OpenCV call.
        sWS: Sixth positional argument of the OpenCV call.

    Returns:
        Whatever ``cv2.fastNlMeansDenoisingColored`` returns for these inputs.
    """
    # ``None`` in the second slot lets OpenCV allocate the destination itself.
    filter_args = (h, hColor, tWS, sWS)
    return cv2.fastNlMeansDenoisingColored(img_bgr, None, *filter_args)


# ═════════════════════════════════════════════════════════════════════════════
# 2.  DEEP — Blind DnCNN
# ═════════════════════════════════════════════════════════════════════════════
class DnCNN(torch.nn.Module):
    """
    Plain residual denoiser: a stack of 3x3 convolutions (with bias) separated
    by ReLUs, without any normalisation layers.

    The whole stack lives in ``self.model`` (a ``Sequential``), so parameters
    are named ``model.<index>.weight`` / ``model.<index>.bias``.  With the
    default ``num_layers=20`` the indices run from ``model.0`` to ``model.38``
    (20 convolutions interleaved with 19 ReLUs).
    """
    def __init__(self, channels=3, num_layers=20, features=64):
        super().__init__()
        nn = torch.nn

        def conv3x3(c_in, c_out):
            # Same-size 3x3 convolution used for every layer of the stack.
            return nn.Conv2d(c_in, c_out, kernel_size=3, padding=1, bias=True)

        # Head: image channels -> feature maps.
        stack = [conv3x3(channels, features), nn.ReLU(inplace=True)]
        # Body: (num_layers - 2) feature -> feature stages.
        for _ in range(num_layers - 2):
            stack.extend((conv3x3(features, features), nn.ReLU(inplace=True)))
        # Tail: feature maps -> image channels, no activation.
        stack.append(conv3x3(features, channels))

        self.model = nn.Sequential(*stack)

    def forward(self, x):
        # The stack's output is subtracted from the input (residual formulation).
        residual = self.model(x)
        return x - residual


def load_dncnn(weights_url: str, device: str):
    """Build a DnCNN and load its weights, downloading them on first use.

    The checkpoint is cached at ``/kaggle/working/dncnn_color_blind.pth``.
    When the cache is missing, the file is fetched from ``weights_url`` into a
    temporary ``.part`` file and only renamed to the final name once the
    download has completed, so an interrupted transfer can never be mistaken
    for a valid checkpoint on the next run.

    Args:
        weights_url: URL of the checkpoint to download when no cached copy exists.
        device: Torch device string the model and weights are placed on.

    Returns:
        The model in eval mode with weights loaded, or ``None`` when the
        download fails (the caller is expected to fall back to another method).

    Raises:
        Whatever ``torch.load`` / ``load_state_dict(strict=True)`` raise for an
        unreadable or mismatching checkpoint; those errors are not swallowed.
    """
    import contextlib
    import urllib.request

    weights_path = "/kaggle/working/dncnn_color_blind.pth"
    if not os.path.exists(weights_path):
        print("Downloading DnCNN-B weights…")
        partial_path = weights_path + ".part"
        try:
            urllib.request.urlretrieve(weights_url, partial_path)
            # Atomic rename: the final path only ever holds a complete file.
            os.replace(partial_path, weights_path)
            print("  ✓ Downloaded.")
        except Exception as e:
            # Best-effort cleanup of a truncated download before falling back.
            with contextlib.suppress(OSError):
                os.remove(partial_path)
            print(f"  ✗ Download failed ({e}). Falling back to NLMeans.")
            return None

    # Build the network only once the weights are known to be on disk.
    model = DnCNN(channels=3).to(device)
    model.eval()

    # NOTE(security): torch.load unpickles the file; only point weights_url at
    # a trusted source (consider weights_only=True where the installed torch
    # version supports it).
    state = torch.load(weights_path, map_location=device)
    model.load_state_dict(state, strict=True)
    print("  ✓ Weights loaded successfully.")
    return model


def dncnn_denoise(img_bgr: np.ndarray, model: "DnCNN", device: str) -> np.ndarray:
    """Run ``model`` on one 8-bit BGR image and return the 8-bit BGR result.

    Args:
        img_bgr: Array indexed as (row, column, channel) with channels in
            B, G, R order; values are scaled by 1/255 before inference.
        model: Callable taking a ``(1, 3, H, W)`` float tensor and returning a
            tensor of the same shape.
        device: Torch device string the input tensor is moved to.

    Returns:
        ``uint8`` array of shape ``(H, W, 3)`` in B, G, R order.
    """
    # Take channels 2, 1, 0 (R, G, B); a plain reorder needs no colour-space
    # routine.  ascontiguousarray also removes the negative stride that
    # torch.from_numpy cannot handle.
    img_rgb = np.ascontiguousarray(img_bgr[:, :, 2::-1], dtype=np.float32) / 255.0
    t = torch.from_numpy(img_rgb.transpose(2, 0, 1)).unsqueeze(0).to(device)
    with torch.no_grad():
        out = model(t).clamp(0, 1)
    out_np = out.squeeze(0).cpu().numpy().transpose(1, 2, 0)
    # Round to the nearest level: a bare uint8 cast truncates, which darkens
    # the image (e.g. 254.7 -> 254) and can shift values by one level even
    # when the model changes nothing.
    out_u8 = np.rint(out_np * 255.0).astype(np.uint8)
    # Back to B, G, R, as a contiguous array.
    return np.ascontiguousarray(out_u8[:, :, ::-1])


# ═════════════════════════════════════════════════════════════════════════════
# 3.  LOAD MODEL
# ═════════════════════════════════════════════════════════════════════════════
DNCNN_WEIGHTS_URL = "https://github.com/cszn/KAIR/releases/download/v1.0/dncnn_color_blind.pth"

# The network is only requested for the "deep" strategy; otherwise `model`
# stays None and the classical filter is used.
model = load_dncnn(DNCNN_WEIGHTS_URL, DEVICE) if STRATEGY == "deep" else None

# load_dncnn signals an unavailable checkpoint by returning None: switch strategy.
if STRATEGY == "deep" and model is None:
    print("Falling back to classical NLMeans.")
    STRATEGY = "classical"


# ═════════════════════════════════════════════════════════════════════════════
# 4.  MAIN LOOP  (recursive — handles class_name/image.jpg structure)
# ═════════════════════════════════════════════════════════════════════════════
# Every image file below the degraded root, in a stable order.
img_paths = sorted(
    p for p in test_deg_dir.rglob("*")
    if p.is_file() and p.suffix.lower() in IMG_EXTS
)

# Only sub-directories are classes; stray files in the root must not inflate the count.
num_classes = sum(1 for entry in test_deg_dir.iterdir() if entry.is_dir())
print(f"\nFound {len(img_paths)} images across {num_classes} classes")
print(f"Saving to: {output_dir}\n")

# Count successful writes directly: globbing the output folder afterwards would
# miss non-.jpg outputs and would include files left over from earlier runs.
total_saved = 0

for img_path in tqdm(img_paths, desc="Denoising"):
    img = cv2.imread(str(img_path))
    if img is None:
        print(f"  [WARN] Cannot read {img_path}, skipping.")
        continue

    # Mirror the subfolder structure in output
    rel_path = img_path.relative_to(test_deg_dir)   # e.g. "air hockey/1.jpg"
    out_path = output_dir / rel_path                  # e.g. test_denoised/air hockey/1.jpg
    out_path.parent.mkdir(parents=True, exist_ok=True)

    denoised = dncnn_denoise(img, model, DEVICE) if STRATEGY == "deep" else nlm_denoise(img)

    # cv2.imwrite reports failure through its return value instead of raising.
    if cv2.imwrite(str(out_path), denoised):
        total_saved += 1
    else:
        print(f"  [WARN] Cannot write {out_path}, skipping.")

print(f"\n✓ Done. {total_saved} images saved to {output_dir}")


# ═════════════════════════════════════════════════════════════════════════════
# 5.  ZIP & DOWNLOAD
# ═════════════════════════════════════════════════════════════════════════════
# Derive the archive location from output_dir so the zip always contains the
# folder that was actually written, even if output_dir is changed above.
# make_archive appends the ".zip" suffix to zip_path itself.
zip_path = str(output_dir)
shutil.make_archive(
    zip_path,
    "zip",
    root_dir=str(output_dir.parent),   # archive paths are relative to this folder
    base_dir=output_dir.name,          # only this sub-folder is archived
)
print(f"✓ Archive ready: {zip_path}.zip")


# ═════════════════════════════════════════════════════════════════════════════
# 6.  OPTIONAL — Evaluate vs clean set (PSNR / SSIM)
# ═════════════════════════════════════════════════════════════════════════════
def evaluate(denoised_dir: Path, clean_dir: Path):
    """Print mean PSNR / SSIM of the denoised ``.jpg`` files against clean references.

    Each ``*.jpg`` found recursively under ``denoised_dir`` is paired with the
    file at the same relative path under ``clean_dir``.  Pairs that cannot be
    scored (missing reference, unreadable file, differing dimensions) are
    skipped and counted instead of aborting the whole evaluation.

    Args:
        denoised_dir: Root folder of the denoised images.
        clean_dir: Root folder of the clean reference images.

    Returns:
        None. Results are printed.
    """
    from skimage.metrics import peak_signal_noise_ratio as psnr
    from skimage.metrics import structural_similarity  as ssim

    scores = {"psnr": [], "ssim": []}
    skipped = 0

    for d_path in sorted(denoised_dir.rglob("*.jpg")):
        # Match by relative path: test_denoised/air hockey/1.jpg → test/air hockey/1.jpg
        rel    = d_path.relative_to(denoised_dir)
        c_path = clean_dir / rel
        if not c_path.exists():
            skipped += 1
            continue

        d = cv2.imread(str(d_path))
        c = cv2.imread(str(c_path))
        if d is None or c is None:
            skipped += 1
            continue

        # Both metrics need equally sized inputs; one odd pair should not
        # abort the whole run.
        if d.shape != c.shape:
            print(f"  [WARN] Shape mismatch for {rel}: {d.shape} vs {c.shape}, skipping.")
            skipped += 1
            continue

        d_rgb = cv2.cvtColor(d, cv2.COLOR_BGR2RGB)
        c_rgb = cv2.cvtColor(c, cv2.COLOR_BGR2RGB)
        scores["psnr"].append(psnr(c_rgb, d_rgb, data_range=255))
        scores["ssim"].append(ssim(c_rgb, d_rgb, channel_axis=2, data_range=255))

    if scores["psnr"]:
        print(f"PSNR (mean): {np.mean(scores['psnr']):.2f} dB  |  "
              f"SSIM (mean): {np.mean(scores['ssim']):.4f}  |  "
              f"N={len(scores['psnr'])}")
    else:
        print("No matching pairs found for evaluation.")

    if skipped:
        print(f"  [WARN] {skipped} denoised image(s) skipped during evaluation.")

# Runs the optional evaluation against the clean reference set; comment this
# call out when no clean images are available.
evaluate(output_dir, test_clean_dir)

Device  : cuda
Strategy: deep
Downloading DnCNN-B weights…
  ✓ Downloaded.
  ✓ Weights loaded successfully.

Found 500 images across 100 classes
Saving to: /kaggle/working/test_denoised



Denoising: 100%|██████████| 500/500 [00:09<00:00, 54.30it/s]



✓ Done. 500 images saved to /kaggle/working/test_denoised
✓ Archive ready: /kaggle/working/test_denoised.zip
