In [None]:
"""
依赖自检
"""

import sys
import importlib.util

print(f"{'='*10} 执行依赖自检 {'='*10}")

required_packages = [
    ("torch", "PyTorch" ),
    ("torchvision", "TorchVision"),
    ("diffusers", "Diffusers"),
    ("matplotlib", "Matplotlib")
]

missing_packages = []

for package_name, display_name in required_packages:
    if importlib.util.find_spec(package_name) is None:
        missing_packages.append(package_name)
    else:
        module = __import__(package_name)
        version = getattr(module, '__version__', '未知版本')
        print(f"{display_name}: {version}")

if missing_packages:
    print(f"\n【ERROR】缺少以下依赖包: {', '.join(missing_packages)}")
    sys.exit(1)
else:
    try:
        import torch
        import torchvision
        from torch import nn
        from torch.nn import functional as f
        from torch.utils.data import DataLoader
        from diffusers import DDPMScheduler, UNet2DModel
        from matplotlib import pyplot as plt
    except Exception as e:
        print(f"【ERROR】导入发生未知错误")
        sys.exit(1)

print(f"{'='*10} 依赖自检完成 {'='*10}")

In [None]:
"""
Device 自检
Apple Silicon：优先 mps，否则 cpu
NVIDIA：优先 cuda，否则 cpu
"""

print(f"{'='*10} 执行硬件自检 {'='*10}")

try:
    if torch.backends.mps.is_available():
        device_name = "mps"
        if torch.backends.mps.is_built():
            print("【INFO】Use Apple Silicon (MPS)")
    elif torch.cuda.is_available():
        device_name = "cuda"
        print("【INFO】Use NVIDIA")
    else:
        device_name = "cpu"
        print("【INFO】Use CPU")

    device = torch.device(device_name)
    x = torch.ones(1).to(device)  # 在CPU中创建张量，并将其移动至device，赋值给x
    print(f"{device} 可以使用")

except Exception as e:
    print(f"【ERROR】{e} 设备无法使用")
    device = torch.device("cpu")

print(f"{'='*10} 硬件自检完成 {'='*10}")

In [None]:
"""
数据集测试
"""

# 数据集
dataset = torchvision.datasets.MNIST(
                                root="../data/datasets",
                                train=True,                                  # 使用训练集，False为测试集
                                download=True,                               # 下载数据集
                                transform=torchvision.transforms.ToTensor()  # 将图像转换为张量
                                )

# 为数据集创建数据加载器
dataset_loader = DataLoader(
                dataset,        # 要加载的数据对象
                batch_size=16,  # 每次迭代加载的样本数量
                shuffle=True    # 打乱数据顺序
                )

# 从加载器中取出第一批数据
x, y = next(iter(dataset_loader))
print('Input shape:', x.shape)
print('Labels :', y)
plt.imshow(torchvision.utils.make_grid(x)[0], cmap = 'gray')  # 以单通道取出所有图像，拼接成大图并用灰度显示

In [None]:
"""
添加噪声，并对输出结果进行可视化
"""

def corrupt(x, amount):
    """
    根据给定的amount值，向输入张量x中添加噪声，返回添加噪声后的张量
    """
    amount = amount.view(-1, 1, 1, 1)  # 调整amount形状以便广播
    noise = torch.rand_like(x)  # 生成与x形状相同的噪声
    return x * (1 - amount) + noise * amount

# 绘制输入数据
fig, axs = plt.subplots(2, 1, figsize=(12, 5))  # 画布行数，画布列数，画布大小。plt.subplots返回两个方法，第一个是画布对象fig，第二个是子图对象axs
plt.subplots_adjust(hspace=0.4)  # 扩大子图间距
axs[0].set_title('Input Images')
axs[0].imshow(torchvision.utils.make_grid(x)[0], cmap='gray')  # 以单通道取出所有图像，拼接成大图并用灰度显示

# 加入噪声
amount = torch.linspace(0, 1, x.shape[0])  # 在指定的范围内，生成一组等距离的数字，数量与x的Batch_size相同
noised_x = corrupt(x, amount)

# 绘制加入噪声后的图像
axs[1].set_title('Corrupted Images (--- amount increases --->)')
axs[1].imshow(torchvision.utils.make_grid(noised_x)[0], cmap='gray')

In [None]:
class BasicUnet(nn.Module):
    """
    一个简单的UNet网络部署
    """
    def __init__(self, in_channels=1, out_channels=1):
        super().__init__()  # 初始化

        # 下采样路径，包含三个卷积层
        self.down_layers = torch.nn.ModuleList([
            nn.Conv2d(in_channels, 32, kernel_size=5, padding=2),  # 由输入通道数生成32个特征图，卷积核大小为5x5，填充为2以保持尺寸
            nn.Conv2d(32, 64, kernel_size=5, padding=2),  # 由32个特征图生成64个特征图
            nn.Conv2d(64, 64, kernel_size=5, padding=2),  # 不继续上升通道，防止过拟合
        ])

        # 上采样路径，包含三个转置卷积层
        self.up_layers = torch.nn.ModuleList([
            nn.ConvTranspose2d(64, 64, kernel_size=5, padding=2),
            nn.ConvTranspose2d(64, 32, kernel_size=5, padding=2),
            nn.ConvTranspose2d(32, out_channels, kernel_size=5, padding=2),
        ])
        self.act = nn.SiLU()  # 激活函数
        self.downscale = nn.MaxPool2d(2)  # 下采样使用最大池化法，窗口大小为2x2
        self.upscale = nn.Upsample(scale_factor=2)  # 上采样使用插值法
    
    def forward(self, x):
        h = []  # 创建一个空列表，保存下采样前的数据供上采样参考，以免上采样时丢失信息
        # 下采样循环 (Encoder)
        for i, l in enumerate(self.down_layers):
            x = self.act(l(x))        # 卷积 -> 激活
            if i < 2:                 # 前两层卷积层执行以下操作
                h.append(x)           # 把当前特征图存入 h 列表
                x = self.downscale(x) # 池化
        # 上采样循环 (Decoder)
        for i, l in enumerate(self.up_layers):
            if i > 0:                 # 除了第一个上采样层外，其他层都执行以下操作    
                x = self.upscale(x)   # 插值
                x = x + h.pop()       # 与对应的下采样特征图相加（跳跃连接）
            x = self.act(l(x))        # 转置卷积 -> 激活
        return x  # 返回预测的噪声

In [None]:
"""
验证输入和输出的形状是否相同，并查看 UNet 网络的参数量
"""

net = BasicUnet()
x = torch.rand(8, 1, 28, 28)  # 生成形状 (8,1,28,28) 的随机张量
net(x).shape  # 将随机张量丢进网络，查看输出形状是否与输入相同
print(net(x).shape)

sum(p.numel() for p in net.parameters())  # 计算网络的参数量