In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pytorch-msssim

Collecting pytorch-msssim
  Downloading pytorch_msssim-1.0.0-py3-none-any.whl.metadata (8.0 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->pytorch-msssim)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->pytorch-msssim)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch->pytorch-msssim)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch->pytorch-msssim)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch->pytorch-msssim)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch->pytorch-mss

In [None]:
# Deep Learning Denoising Pipeline Canvas (ULiteDenoiseDeblur)
"""
This canvas contains the classic deep learning denoising workflow, now using the
ULiteDenoiseDeblur U-Net–like architecture with dilated residual ConvBlocks.
1. Imports and configuration
2. ULiteDenoiseDeblur model definition
3. Transforms and PairedZipDataset loader
4. Training loop with MSE & SSIM, early stopping, checkpoints
5. DataLoaders & training invocation
6. Sync checkpoints to Drive
7. Test evaluation, histograms, and sample visualizations
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import DataLoader
import zipfile, io
from pathlib import Path
from PIL import Image
import math
from pytorch_msssim import ssim
from tqdm import tqdm
import os
import matplotlib.pyplot as plt
import random


In [None]:
# ---------------------------------------------
# 1. ULiteDenoiseDeblur Definition
# ---------------------------------------------
class ConvBlock(nn.Module):
    def __init__(self, in_ch, out_ch, dilation=1):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=dilation, dilation=dilation),
            nn.BatchNorm2d(out_ch), nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, 3, padding=dilation, dilation=dilation),
            nn.BatchNorm2d(out_ch), nn.ReLU(inplace=True)
        )
        self.res_conv = nn.Conv2d(in_ch, out_ch, 1)
    def forward(self, x):
        return self.conv(x) + self.res_conv(x)

class Down(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.pool = nn.MaxPool2d(2)
        self.block = ConvBlock(in_ch, out_ch)
    def forward(self, x):
        return self.block(self.pool(x))

class Up(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.up = nn.ConvTranspose2d(in_ch, out_ch, 2, stride=2)
        self.block = ConvBlock(in_ch, out_ch)
    def forward(self, x1, x2):
        x1 = self.up(x1)
        diffY = x2.size(2) - x1.size(2)
        diffX = x2.size(3) - x1.size(3)
        x1 = F.pad(x1, [diffX//2, diffX-diffX//2, diffY//2, diffY-diffY//2])
        return self.block(torch.cat([x2, x1], dim=1))

class ULiteDenoiseDeblur(nn.Module):
    def __init__(self, in_ch=1, out_ch=1, base_ch=32):
        super().__init__()
        self.inc   = ConvBlock(in_ch, base_ch)
        self.down1 = Down(base_ch, base_ch*2)
        self.down2 = Down(base_ch*2, base_ch*4)
        self.up1   = Up(base_ch*4, base_ch*2)
        self.up2   = Up(base_ch*2, base_ch)
        self.outc  = nn.Conv2d(base_ch, out_ch, 1)
    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x  = self.up1(x3, x2)
        x  = self.up2(x, x1)
        return self.outc(x)



In [None]:
# ---------------------------------------------
# 2. Transforms & Loader
# ---------------------------------------------
mean, std = [0.1781], [0.1976]
target_size = (256,256)
train_tf = transforms.Compose([
    transforms.Resize(target_size),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])
val_tf = transforms.Compose([
    transforms.Resize(target_size),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

class PairedZipDataset(torch.utils.data.Dataset):
    def __init__(self, clean_zip, noisy_zip, transform=None):
        self.clean_zip, self.noisy_zip = clean_zip, noisy_zip
        with zipfile.ZipFile(clean_zip, 'r') as zf:
            self.files = [Path(f).name for f in zf.namelist()
                          if f.startswith('clean/') and not f.endswith('/')]
        self.transform = transform or (lambda x: x)
        self.zc = None; self.zn = None
    def __len__(self): return len(self.files)
    def __getitem__(self, idx):
        if self.zc is None: self.zc = zipfile.ZipFile(self.clean_zip)
        if self.zn is None: self.zn = zipfile.ZipFile(self.noisy_zip)
        fn = self.files[idx]
        clean = Image.open(io.BytesIO(self.zc.read(f'clean/{fn}'))).convert('L')
        noisy = Image.open(io.BytesIO(self.zn.read(f'noisy_15_awgn/{fn}'))).convert('L')
        return self.transform(noisy), self.transform(clean)




In [None]:
# ---------------------------------------------
# 3. Training Loop
# ---------------------------------------------
def train_model(model, train_loader, val_loader,
                epochs=50, lr=1e-3, patience=5,
                checkpoint_dir='checkpoints_ulite'):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    os.makedirs(checkpoint_dir, exist_ok=True)
    best, waits = float('inf'), 0
    for e in range(1, epochs+1):
        print(f'Epoch {e}/{epochs}')
        model.train(); t_loss=t_ssim=0
        for X, Y in tqdm(train_loader, leave=False):
            X, Y = X.to(device), Y.to(device)
            P = model(X)
            loss = F.mse_loss(P, Y)
            optimizer.zero_grad(); loss.backward(); optimizer.step()
            t_loss += loss.item()*X.size(0)
            t_ssim += ssim(P, Y, data_range=1.0, size_average=True).item()*X.size(0)
        t_loss /= len(train_loader.dataset); t_ssim /= len(train_loader.dataset)
        model.eval(); v_loss=v_ssim=v_psnr=0
        with torch.no_grad():
            for X, Y in tqdm(val_loader, leave=False):
                X, Y = X.to(device), Y.to(device)
                P = model(X)
                v_loss += F.mse_loss(P, Y).item()*X.size(0)
                v_ssim += ssim(P, Y, data_range=1.0, size_average=True).item()*X.size(0)
                m = ((P - Y)**2).mean().item()
                v_psnr += 10*math.log10(1.0/m)*X.size(0)
        v_loss /= len(val_loader.dataset); v_ssim /= len(val_loader.dataset); v_psnr /= len(val_loader.dataset)
        print(f'Train MSE={t_loss:.4f}, SSIM={t_ssim:.4f} | Val MSE={v_loss:.4f}, SSIM={v_ssim:.4f}, PSNR={v_psnr:.2f}dB')
        if v_loss < best:
            best, waits = v_loss, 0
            ckpt = os.path.join(checkpoint_dir, f'best_e{e}.pth')
            torch.save(model.state_dict(), ckpt)
            print(' Saved', ckpt)
        else:
            waits += 1
            print(f'No imp {waits}/{patience}')
            if waits >= patience:
                print('Early stopping')
                break


In [None]:
# ---------------------------------------------
# 4. DataLoaders & Training
# ---------------------------------------------
batch_size = 16
train_ds = PairedZipDataset('/content/drive/MyDrive/IA2/dataset/train_clean.zip','/content/drive/MyDrive/IA2/dataset/train_noisy_15_awgn.zip', train_tf)
val_ds   = PairedZipDataset('/content/drive/MyDrive/IA2/dataset/val_clean.zip','/content/drive/MyDrive/IA2/dataset/val_noisy_15_awgn.zip',     val_tf)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=0)
model = ULiteDenoiseDeblur(in_ch=1, out_ch=1, base_ch=32)
train_model(model, train_loader, val_loader)


Epoch 1/50




Train MSE=0.0815, SSIM=0.4462 | Val MSE=0.0628, SSIM=0.5294, PSNR=12.24dB
 Saved checkpoints_ulite/best_e1.pth
Epoch 2/50




Train MSE=0.0568, SSIM=0.5492 | Val MSE=0.0582, SSIM=0.5802, PSNR=12.45dB
 Saved checkpoints_ulite/best_e2.pth
Epoch 3/50




Train MSE=0.0521, SSIM=0.5863 | Val MSE=0.0533, SSIM=0.6086, PSNR=12.84dB
 Saved checkpoints_ulite/best_e3.pth
Epoch 4/50




Train MSE=0.0494, SSIM=0.6031 | Val MSE=0.0472, SSIM=0.6220, PSNR=13.46dB
 Saved checkpoints_ulite/best_e4.pth
Epoch 5/50




Train MSE=0.0475, SSIM=0.6163 | Val MSE=0.0474, SSIM=0.6243, PSNR=13.44dB
No imp 1/5
Epoch 6/50




Train MSE=0.0468, SSIM=0.6213 | Val MSE=0.0469, SSIM=0.6305, PSNR=13.45dB
 Saved checkpoints_ulite/best_e6.pth
Epoch 7/50




Train MSE=0.0462, SSIM=0.6235 | Val MSE=0.0506, SSIM=0.6265, PSNR=13.23dB
No imp 1/5
Epoch 8/50




Train MSE=0.0450, SSIM=0.6299 | Val MSE=0.0451, SSIM=0.6316, PSNR=13.62dB
 Saved checkpoints_ulite/best_e8.pth
Epoch 9/50




Train MSE=0.0445, SSIM=0.6330 | Val MSE=0.0450, SSIM=0.6358, PSNR=13.60dB
 Saved checkpoints_ulite/best_e9.pth
Epoch 10/50




Train MSE=0.0437, SSIM=0.6355 | Val MSE=0.0433, SSIM=0.6402, PSNR=13.81dB
 Saved checkpoints_ulite/best_e10.pth
Epoch 11/50




Train MSE=0.0436, SSIM=0.6362 | Val MSE=0.0439, SSIM=0.6373, PSNR=13.75dB
No imp 1/5
Epoch 12/50




Train MSE=0.0433, SSIM=0.6378 | Val MSE=0.0443, SSIM=0.6449, PSNR=13.69dB
No imp 2/5
Epoch 13/50




Train MSE=0.0428, SSIM=0.6402 | Val MSE=0.0470, SSIM=0.6339, PSNR=13.38dB
No imp 3/5
Epoch 14/50




Train MSE=0.0426, SSIM=0.6398 | Val MSE=0.0433, SSIM=0.6439, PSNR=13.82dB
 Saved checkpoints_ulite/best_e14.pth
Epoch 15/50




Train MSE=0.0423, SSIM=0.6421 | Val MSE=0.0420, SSIM=0.6469, PSNR=13.95dB
 Saved checkpoints_ulite/best_e15.pth
Epoch 16/50




Train MSE=0.0420, SSIM=0.6433 | Val MSE=0.0425, SSIM=0.6472, PSNR=13.86dB
No imp 1/5
Epoch 17/50




Train MSE=0.0417, SSIM=0.6444 | Val MSE=0.0418, SSIM=0.6510, PSNR=13.99dB
 Saved checkpoints_ulite/best_e17.pth
Epoch 18/50




Train MSE=0.0416, SSIM=0.6438 | Val MSE=0.0425, SSIM=0.6439, PSNR=13.88dB
No imp 1/5
Epoch 19/50




Train MSE=0.0413, SSIM=0.6452 | Val MSE=0.0430, SSIM=0.6477, PSNR=13.87dB
No imp 2/5
Epoch 20/50




Train MSE=0.0412, SSIM=0.6456 | Val MSE=0.0420, SSIM=0.6503, PSNR=13.98dB
No imp 3/5
Epoch 21/50




Train MSE=0.0412, SSIM=0.6451 | Val MSE=0.0424, SSIM=0.6471, PSNR=13.92dB
No imp 4/5
Epoch 22/50


                                                 

Train MSE=0.0409, SSIM=0.6449 | Val MSE=0.0420, SSIM=0.6480, PSNR=13.95dB
No imp 5/5
Early stopping




In [None]:
# ---------------------------------------------
# 5. Sync Checkpoints to Drive
# ---------------------------------------------
import shutil
shutil.copytree('checkpoints_ulite', '/content/drive/MyDrive/IA2/ulite_checkpoints_15_awgn', dirs_exist_ok=True)
print('Checkpoints synced')


Checkpoints synced


In [None]:
# ---------------------------------------------
# 6. Test Evaluation & Visualization
#---------------------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.eval(); model.to(device)
test_ds = PairedZipDataset('/content/drive/MyDrive/IA2/dataset/test_clean.zip','/content/drive/MyDrive/IA2/dataset/test_noisy_15_awgn.zip', val_tf)
test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=0)

m_list, s_list, p_list = [], [], []
with torch.no_grad():
    for n, c in tqdm(test_loader, desc='Testing'):
        n, c = n.to(device), c.to(device)
        d = model(n)
        pd, cd = d * std[0] + mean[0], c * std[0] + mean[0]
        for i in range(pd.size(0)):
            mse_i = ((pd[i] - cd[i])**2).mean().item(); m_list.append(mse_i)
            s_list.append(ssim(pd[i].unsqueeze(0), cd[i].unsqueeze(0), data_range=1.0, size_average=True).item())
            p_list.append(10 * math.log10(1.0 / mse_i))
# Compute and print average metrics
avg_mse = sum(m_list) / len(m_list)
avg_ssim = sum(s_list) / len(s_list)
avg_psnr = sum(p_list) / len(p_list)
print(f"Test MSE: {avg_mse:.4f}")
print(f"Test SSIM: {avg_ssim:.4f}")
print(f"Test PSNR: {avg_psnr:.2f} dB")
# Histograms
plt.figure(); plt.hist(m_list, bins=50); plt.title('MSE'); plt.show()
plt.figure(); plt.hist(s_list, bins=50); plt.title('SSIM'); plt.show()
plt.figure(); plt.hist(p_list, bins=50); plt.title('PSNR'); plt.show()
# Sample Visualizations
indices = random.sample(range(len(test_ds)), 10)
fig, ax = plt.subplots(10, 3, figsize=(9, 30))
for i, idx in enumerate(indices):
    n, c = test_ds[idx]
    d = model(n.unsqueeze(0).to(device)).squeeze(0).cpu()
    def den(t): return (t * std[0] + mean[0]).detach().numpy().squeeze()
    ax[i,0].imshow(den(n),   cmap='gray'); ax[i,0].axis('off'); ax[i,0].set_title('Noisy')
    ax[i,1].imshow(den(d),   cmap='gray'); ax[i,1].axis('off'); ax[i,1].set_title('Denoised')
    ax[i,2].imshow(den(c),   cmap='gray'); ax[i,2].axis('off'); ax[i,2].set_title('Clean')
plt.tight_layout(); plt.show()

Output hidden; open in https://colab.research.google.com to view.