In [None]:
import os
import random
import time
import math
import base64
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from IPython.display import clear_output
from tqdm import tqdm
from io import BytesIO
import pandas as pd
from skimage.metrics import peak_signal_noise_ratio as compare_psnr
from skimage.metrics import structural_similarity as compare_ssim

# 设置随机种子以确保可重复性
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

# 设置计算设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

## 数据集加载

我们定义了一个自定义数据集类 `NoisyMNISTDatasetBinary`，用于加载二进制数据（`.npz` 文件）。

- 对于训练集，返回噪声图像和原始图像对。
- 对于测试集，仅返回噪声图像。

数据会被归一化到 [0, 1] 范围并转换为 PyTorch 张量。我们还将训练集划分为训练子集和验证子集（80% 训练，20% 验证），以便在训练过程中评估模型性能。

In [None]:
class NoisyMNISTDatasetBinary(Dataset):
    def __init__(self, root_dir, train):
        if train:
            npz_file = os.path.join(root_dir, "train.npz")
        else:
            npz_file = os.path.join(root_dir, "test.npz")
        data = np.load(npz_file)
        self.noise = data['noise']      # shape: (N, H, W)
        self.train = train
        if self.train:
            self.origin = data['origin']
    
    def __len__(self):
        return self.noise.shape[0]

    def __getitem__(self, idx):
        noise = torch.from_numpy(self.noise[idx]).float().unsqueeze(0) / 255.0
        if not self.train:
            return noise
        origin = torch.from_numpy(self.origin[idx]).float().unsqueeze(0) / 255.0
        return noise, origin

# 创建训练和测试数据集
train_dataset = NoisyMNISTDatasetBinary(".", True)
test_dataset = NoisyMNISTDatasetBinary(".", False)

# 划分训练集和验证集
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_subset, val_subset = random_split(train_dataset, [train_size, val_size])

# 设置批量大小
batch_size = 128

# 创建数据加载器
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

batch_size

## 数据示例

以下代码展示训练集中的前 8 个样本，比较原始图像和噪声图像。

In [None]:
# 可视化训练数据
dataiter = iter(train_loader)
noisy, original = next(dataiter)

num_examples = min(8, len(noisy))
plt.figure(figsize=(16, 4))

# 第一行：原始图像
for i in range(num_examples):
    plt.subplot(2, num_examples, i + 1)
    plt.imshow(original[i][0], cmap='gray')
    plt.axis('off')
    if i == 0:
        plt.ylabel('Original', fontsize=14, rotation=90, labelpad=20)

# 第二行：噪声图像
for i in range(num_examples):
    plt.subplot(2, num_examples, num_examples + i + 1)
    plt.imshow(noisy[i][0], cmap='gray')
    plt.axis('off')
    if i == 0:
        plt.ylabel('Noisy', fontsize=14, rotation=90, labelpad=20)

plt.suptitle(f"Training Examples: Original vs Noisy (First {num_examples} Samples)", fontsize=16)
plt.tight_layout()
plt.show()

## 模型定义

我们使用扩散模型（DDPM）来实现图像去噪。模型包括以下部分：

- `SinusoidalPosEmb`：为时间步长生成正弦位置嵌入，用于条件化模型。
- `DenoiseUNet`：一个基于 U-Net 的去噪模型，接受噪声图像和时间步长作为输入，预测噪声分量。

In [None]:
class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, t):
        half = self.dim // 2
        emb = math.log(10000) / (half - 1)
        emb = torch.exp(torch.arange(half, device=t.device) * -emb)
        emb = t.float().unsqueeze(1) * emb.unsqueeze(0)
        emb = torch.cat((emb.sin(), emb.cos()), dim=1)
        return emb

class DenoiseUNet(nn.Module):
    def __init__(self, time_dim=32):
        super().__init__()
        self.time_mlp = nn.Sequential(
            SinusoidalPosEmb(time_dim),
            nn.Linear(time_dim, time_dim * 4),
            nn.GELU(),
            nn.Linear(time_dim * 4, 64)
        )
        self.conv1 = nn.Conv2d(1, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, 3, padding=1)
        self.conv_out = nn.Conv2d(64, 1, 1)

    def forward(self, x, t):
        t_emb = self.time_mlp(t.to(x.device)).unsqueeze(-1).unsqueeze(-1)
        h = F.relu(self.conv1(x))
        h = F.relu(self.conv2(h) + t_emb)
        h = F.relu(self.conv3(h))
        return self.conv_out(h)

## 评估指标

我们定义了两个函数来计算批量图像的 PSNR 和 SSIM 指标，用于评估模型在验证集上的去噪性能。这些函数使用 `skimage.metrics` 库，与 `team3-template-dncnn(1).ipynb` 中的方法一致。

In [None]:
def batch_psnr(preds, targets):
    psnr = []
    for p, t in zip(preds, targets):
        p_img = p.cpu().numpy().squeeze()
        t_img = t.cpu().numpy().squeeze()
        psnr.append(compare_psnr(t_img, p_img, data_range=1.0))
    return np.mean(psnr)

def batch_ssim(preds, targets):
    ssim = []
    for p, t in zip(preds, targets):
        p_img = p.cpu().numpy().squeeze()
        t_img = t.cpu().numpy().squeeze()
        ssim.append(compare_ssim(t_img, p_img, data_range=1.0))
    return np.mean(ssim)

## 模型训练

我们使用扩散模型的训练流程，通过逐步添加噪声并训练模型预测噪声分量。训练参数包括：

- 扩散步数 `T = 50`
- 噪声调度参数 `betas` 从 1e-4 到 0.02
- 使用 Adam 优化器，学习率为 1e-3
- 训练 5 个 epoch

在每个 epoch 结束后，我们在验证集上评估模型，计算平均 PSNR 和 SSIM 指标，以监控去噪性能。

In [None]:
# 初始化模型和优化器
model = DenoiseUNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# 设置扩散参数
T = 50
betas = torch.linspace(1e-4, 0.02, T).to(device)
alphas = 1 - betas
alpha_bar = torch.cumprod(alphas, dim=0)
sqrt_alpha_bar = torch.sqrt(alpha_bar)
sqrt_one_minus_alpha_bar = torch.sqrt(1 - alpha_bar)

# 验证函数
def validate(model, val_loader, alpha_bar, t_step=20):
    model.eval()
    psnr_list = []
    ssim_list = []
    with torch.no_grad():
        for _, clean in tqdm(val_loader, desc="Validation"):
            clean = clean.to(device)
            B = clean.size(0)
            t = torch.full((B,), t_step, device=device, dtype=torch.long)
            noise = torch.randn_like(clean)
            sqrt_ab = torch.sqrt(alpha_bar[t]).view(-1, 1, 1, 1)
            sqrt_1_ab = torch.sqrt(1 - alpha_bar[t]).view(-1, 1, 1, 1)
            x_noisy = sqrt_ab * clean + sqrt_1_ab * noise
            pred_noise = model(x_noisy, t)
            x_denoised = (x_noisy - sqrt_1_ab * pred_noise) / sqrt_ab
            x_denoised = x_denoised.clamp(0, 1)
            psnr = batch_psnr(x_denoised, clean)
            ssim = batch_ssim(x_denoised, clean)
            psnr_list.append(psnr)
            ssim_list.append(ssim)
    model.train()
    return np.mean(psnr_list), np.mean(ssim_list)

# 训练循环
model.train()
num_epochs = 5
train_losses = []
val_psnrs = []
val_ssims = []

for epoch in range(num_epochs):
    total_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
    for noisy, clean in progress_bar:
        clean = clean.to(device)
        B = clean.size(0)
        t = torch.randint(0, T, (B,), device=device)
        noise = torch.randn_like(clean)
        sqrt_ab = sqrt_alpha_bar[t].view(-1, 1, 1, 1)
        sqrt_1_ab = sqrt_one_minus_alpha_bar[t].view(-1, 1, 1, 1)
        x_t = sqrt_ab * clean + sqrt_1_ab * noise
        pred = model(x_t, t)
        loss = F.mse_loss(pred, noise)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        progress_bar.set_postfix(loss=total_loss / (progress_bar.n + 1))
    avg_loss = total_loss / len(train_loader)
    train_losses.append(avg_loss)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss={avg_loss:.4f}")

    # 验证
    val_psnr, val_ssim = validate(model, val_loader, alpha_bar)
    val_psnrs.append(val_psnr)
    val_ssims.append(val_ssim)
    print(f"Validation PSNR: {val_psnr:.2f}, SSIM: {val_ssim:.4f}")

# 保存模型
torch.save(model.state_dict(), "ddpm_denoise_model.pth")

## 推理与提交

我们使用训练好的模型对测试集进行去噪，并将结果保存为提交所需的 CSV 文件。去噪过程在固定时间步长 `t=20` 进行，假设测试集的噪声图像对应于此时间步长的噪声水平。输出图像被转换为 base64 编码的 PNG 格式。

In [None]:
# 将张量转换为 base64 编码的 PNG
def tensor_to_base64(tensor):
    tensor = tensor.squeeze().cpu().numpy()
    tensor = (tensor * 255).astype(np.uint8)
    img = Image.fromarray(tensor, mode='L')
    buffer = BytesIO()
    img.save(buffer, format='PNG')
    return base64.b64encode(buffer.getvalue()).decode('utf-8')

# 去噪函数
def denoise(model, data, alpha_bar, t_step=20):
    model.eval()
    t = torch.full((data.size(0),), t_step, device=device, dtype=torch.long)
    with torch.no_grad():
        pred_noise = model(data, t)
    sqrt_ab = torch.sqrt(alpha_bar[t]).view(-1, 1, 1, 1)
    sqrt_1_ab = torch.sqrt(1 - alpha_bar[t]).view(-1, 1, 1, 1)
    x_denoised = (data - sqrt_1_ab * pred_noise) / sqrt_ab
    x_denoised = x_denoised.clamp(0, 1)
    return x_denoised

# 推理并生成提交文件
model.eval()
submission = []
example_noisy = []
example_output = []

for i, noisy in enumerate(tqdm(test_loader, desc="Inference")):
    noisy = noisy.to(device)
    denoised = denoise(model, noisy, alpha_bar)
    for j in range(denoised.size(0)):
        img_base64 = tensor_to_base64(denoised[j])
        submission.append({'id': i * batch_size + j, 'image': img_base64})
    if i == 0:  # 保存第一批次用于可视化
        example_noisy.append(noisy[:8])
        example_output.append(denoised[:8])

# 保存提交文件
submission_df = pd.DataFrame(submission)
submission_df.to_csv('submission.csv', index=False)
print("Submission file 'submission.csv' has been generated.")

## 去噪结果可视化

以下代码展示测试集中的前 8 个样本，比较噪声输入和去噪输出。

In [None]:
# 可视化去噪结果
num_examples = min(8, len(example_noisy[0]))
plt.figure(figsize=(16, 6))

# 第一行：噪声图像
for i in range(num_examples):
    plt.subplot(2, num_examples, i + 1)
    plt.imshow(example_noisy[0][i][0].cpu(), cmap='gray')
    plt.axis('off')
    if i == 0:
        plt.ylabel('Noisy Input', fontsize=14, rotation=90, labelpad=20)

# 第二行：去噪结果
for i in range(num_examples):
    plt.subplot(2, num_examples, num_examples + i + 1)
    plt.imshow(example_output[0][i][0].cpu(), cmap='gray')
    plt.axis('off')
    if i == 0:
        plt.ylabel('Denoised Output', fontsize=14, rotation=90, labelpad=20)

plt.suptitle(f"Denoising Results Comparison (First {num_examples} Test Examples)", fontsize=16)
plt.tight_layout()
plt.show()

## 结果分析

我们通过在验证集上计算 PSNR 和 SSIM 指标来评估模型的去噪性能。PSNR 衡量像素级别的重建质量，值越高表示去噪效果越好；SSIM 衡量图像的结构相似性，值越接近 1 表示去噪图像与原始图像越相似。可以进一步分析不同时间步长 `t` 的去噪效果，或调整模型架构以提高性能。