In [None]:
# 这行代码将你的Google Drive挂载到Colab虚拟机上
from google.colab import drive
drive.mount('/content/drive')

#  TODO: 在你的Drive中输入保存了解压后的作业文件夹的路径，
# 例如 'cs231n/assignments/assignment3/'
FOLDERNAME = "cs231n/assignments/assignment3/"
assert FOLDERNAME is not None, "[!] 请输入文件夹名称。"

# 现在我们已经挂载了你的Drive，这确保了
# Colab虚拟机的Python解释器可以从其中加载
# Python文件
import sys
sys.path.append('/content/drive/My Drive/{}'.format(FOLDERNAME))

# # 这行代码会将Emoji数据集下载到你的Drive中
# # 如果它还不存在的话
# %cd /content/drive/My\ Drive/$FOLDERNAME/cs231n/datasets/
# !bash get_datasets.sh
# %cd /content/drive/My\ Drive/$FOLDERNAME

# 去噪扩散概率模型

到目前为止，我们已经探讨了判别模型，这类模型通过训练来生成带标签的输出。从简单的图像分类到句子生成（尽管句子生成问题仍被框架化为分类任务，其标签属于词汇空间，并通过循环机制捕捉多词标签），均属于判别模型的范畴。现在，我们将拓展知识体系，构建一个能够生成与给定训练图像集相似的新图像的生成模型。

生成模型有很多类型，包括生成对抗网络（GAN）、自回归模型、归一化流模型和变分自编码器（VAE），所有这些模型都能合成令人惊叹的图像。然而，在2020年，Ho等人通过将扩散概率模型与去噪分数匹配相结合，提出了去噪扩散概率模型（DDPM）。这一模型既易于训练，又足以生成复杂的高质量图像。以下是DDPM的高层概述，更多细节请参考课程幻灯片和DDPM的原始论文[1]。


### 前向过程
设$q(x_0)$为干净数据集图像的分布。我们将前向加噪过程定义为一个由小加噪步骤组成的马尔可夫链：

$$q(x_t | x_{t-1}) \sim N(x_t; \sqrt{1-\beta_t} x_{t-1}, \beta_t I)$$

其中，逐步方差$(\beta_1, ..., \beta_T)$决定了噪声调度。由于高斯分布的性质，我们可以将$q(x_t | x_0)$以闭合形式表示为：

$$q(x_t | x_0) \sim N(x_t; \sqrt{\bar{\alpha}_t} x_0, (1-\bar{\alpha}_t) I)$$

其中$\alpha_t = 1-\beta_t$且$\bar{\alpha}_t = \prod_{s=1}^{t}\alpha_t$。
如果噪声调度$(\beta_1, ..., \beta_T)$设置得当，最终分布$q(x_T)$将与纯高斯噪声$N(0, I)$无法区分。

回想一下，从高斯分布$x \sim N(\mu, \sigma^2)$中采样等价于计算$\sigma * \epsilon + \mu$，其中$\epsilon \sim N(0, 1)$。因此，给定$x_{t-1}$或$x_0$时，从$q(x_t | x_{t-1})$或$q(x_t | x_0)$中采样是直接可行的。正因为如此，前向过程非常简单，无需学习。


### 反向过程
反向过程通过多个步骤从纯噪声$x_T$重建干净图像$x_0$。设$p(x_{t-1} | x_t)$表示$q(x_t | x_{t-1})$的反向步骤。
第一个关键见解是，学习反转每个单独的去噪步骤比一次性反转整个前向过程更容易。换句话说，学习每个$t$对应的$p(x_{t-1} | x_t)$比直接学习$p(x_0 | x_T)$更容易。

然而，学习$p(x_{t-1} | x_t)$仍然具有挑战性。尽管$q(x_t | x_{t-1})$是高斯分布，但$p(x_{t-1} | x_t)$可能呈现任何复杂形式，并且几乎可以肯定不是高斯分布。对任意分布进行建模和采样，比处理像高斯分布这样简单的参数化分布要困难得多。

第二个关键见解是，如果前向过程中的逐步噪声$\beta_t$足够小，那么反向步骤$p(x_{t-1} | x_t)$也接近高斯分布。因此，我们只需要估计其均值和方差。在实践中，将$p(x_{t-1} | x_t)$的方差设置为与$\beta_t$（前向步骤中的方差）相匹配的效果很好。因此，学习反向过程简化为学习均值$\mu(x_t, t; \theta)$，其中$\theta$表示神经网络的参数。


### 去噪目标
生成模型通过最小化数据集样本的期望负对数似然$\mathbb{E}[-\log{p_\theta(x_0)}]$来优化。每个样本的似然可以表示为：$p_\theta(x_0) = p(x_T)\prod_{t=1}^T p(x_{t-1} | x_t)$。由于这个目标在许多情况下难以处理，各类生成模型转而优化负对数似然的变分下界。

Ho等人证明，该目标等价于最小化以下去噪损失：

$$\mathbb{E}_{t, x_0, \epsilon}\left[ \| \epsilon - \epsilon_\theta (\sqrt{\bar{\alpha}_t}x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon, t ) \|^2 \right]$$

其中$t$在1到T之间均匀分布，$x_0$是干净样本，$\epsilon$从标准高斯分布$N(0, I)$中采样，$\epsilon_\theta$是一个神经网络模型，经过训练后可从输入的噪声样本$x_t = \sqrt{\bar{\alpha}_t}x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon$中预测噪声$\epsilon$。换句话说，$\epsilon_\theta$学习对输入的噪声图像进行去噪。请注意，这等价于预测干净样本，因为根据公式$x_t = \sqrt{\bar{\alpha}_t}x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon$，噪声可以从噪声图像和干净样本中恢复。


[1] Denoising Diffusion Probabilistic Models. Jonathan Ho, Ajay Jain, Pieter Abbeel. [Link](https://arxiv.org/pdf/2006.11239)



# 在本笔记本中...

我们将实现并训练一个DDPM模型，用于生成以文本提示为条件的32×32小尺寸表情图像。首先，我们将基于论文[1]的公式(4)实现前向加噪过程。然后构建一个UNet模型，该模型以$x_t$和$t$作为输入（可选地包含文本提示等其他条件），并输出与$x_t$形状相同的张量。最后，我们将实现去噪目标并训练DDPM模型。

我们使用预训练的CLIP[2]模型中的文本编码器将输入文本编码为512维向量。为加快训练速度，我们已预先编码了训练集的文本数据。

[2] Learning transferable visual models from natural language supervision. Radford et. al. [Link](https://github.com/openai/CLIP)

In [None]:
# 从GitHub仓库安装OpenAI的CLIP库
!pip install git+https://github.com/openai/CLIP.git


In [None]:
# 加载自动重载扩展，便于在代码修改后自动重新加载模块
%load_ext autoreload
# 设置自动重载级别为2（重载所有依赖模块）
%autoreload 2

import numpy as np  # 导入NumPy库，用于数值计算
import torch  # 导入PyTorch库，用于深度学习计算
import random  # 导入random库，用于生成随机数
import matplotlib.pyplot as plt  # 导入matplotlib.pyplot，用于数据可视化
import torchvision.utils as tv_utils  # 导入PyTorch视觉工具包中的工具函数
from cs231n.emoji_dataset import EmojiDataset  # 从cs231n.emoji_dataset模块导入表情数据集类
from cs231n.gaussian_diffusion import GaussianDiffusion  # 从cs231n.gaussian_diffusion模块导入高斯扩散类

def rel_error(x, y):
    """
    计算两个数组的相对误差
    
    参数:
    x, y: 输入的两个数组
    
    返回:
    相对误差的最大值，计算公式为 |x - y| / max(1e-10, |x| + |y|)
    该公式用于衡量两个数组的差异程度，避免除以零的情况
    """
    return np.max(np.abs(x - y) / (np.maximum(1e-10, np.abs(x) + np.abs(y))))


In [None]:
# 首先，我们加载并可视化数据集
# 数据集中的每个样本是一个元组：(图像, {"text_emb": <张量>, "text": <字符串>})
# 我们将使用预训练的文本编码器将文本编码为嵌入向量
# 为了加快训练速度，我们已将数据集的文本预编码为嵌入向量
image_size = 32  # 图像大小设为32x32像素
dataset = EmojiDataset(image_size)  # 实例化表情数据集类，指定图像尺寸


In [None]:
def visualize_samples(dataset, num_samples=25, grid_size=(5, 5)):
    """
    可视化数据集中的样本图像及其对应的文本描述
    
    参数:
    dataset: 表情数据集对象
    num_samples: 要可视化的样本数量，默认25个
    grid_size: 图像网格的行列数，默认(5,5)
    """
    # 从数据集中随机采样指定数量的索引
    indices = random.sample(range(len(dataset)), num_samples)
    samples = [dataset[i] for i in indices]  # 获取对应索引的样本
    
    # 查看单个样本的结构和形状
    img_shape = list(samples[0][0].shape)
    emb_shape = list(samples[0][1]["text_emb"].shape)
    print(f"单个样本结构：(图像: {img_shape}, {{ \"text_emb\": {emb_shape}, \"text\": 字符串 }})")
    
    # 提取样本中的图像和文本描述
    images = torch.stack([sample[0] for sample in samples])  # 将图像堆叠为张量
    texts = [sample[1]["text"] for sample in samples]  # 提取文本描述列表
    
    # 创建图像网格（使用torchvision的工具函数）
    grid_img = tv_utils.make_grid(images, nrow=grid_size[1], padding=2)
    
    # 将张量转换为NumPy数组以便绘图（调整维度顺序：CHW -> HWC）
    grid_img = grid_img.permute(1, 2, 0).numpy()
    
    # 绘制图像网格
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.imshow(grid_img)  # 显示图像网格
    ax.axis("off")  # 关闭坐标轴显示
    
    # 添加文本注释（在每个图像上方显示对应的文本描述）
    grid_w, grid_h = grid_size  # 网格的列数和行数
    img_w, img_h = grid_img.shape[1] // grid_w, grid_img.shape[0] // grid_h  # 单个图像的宽度和高度
    
    for i, text in enumerate(texts):
        row, col = divmod(i, grid_w)  # 计算当前样本在网格中的行列位置
        x, y = col * img_w, row * img_h  # 计算文本注释的坐标
        # 添加文本注释（限制文本长度为30字符，设置字体、颜色和背景框）
        ax.text(x+5, y+5, text[:30], fontsize=8, color='white', bbox=dict(facecolor='black', alpha=0.5))
    
    plt.show()  # 显示图像

visualize_samples(dataset)  # 调用函数可视化数据集样本

## q_sample

现在我们将定义前向加噪过程。请通读`cs231n/gaussian_diffusion.py`中的GaussianDiffusion类。相关公式可参考DDPM原始论文[1]。实现`q_sample`方法并在下方进行测试。你应该能看到相对误差为零。

In [None]:
# 测试GaussianDiffusion.q_sample方法
sz = 2  # 图像尺寸（将生成sz×sz的图像）
b = 3   # 批量大小

# 实例化高斯扩散模型（暂时传入model=None，后续会完善）
diffusion = GaussianDiffusion(
    model=None,
    image_size=sz,          # 输入图像的尺寸
    timesteps=1000,         # 扩散过程的总时间步数
    beta_schedule="sigmoid" # beta参数的调度方式（采用sigmoid曲线）
)

# 定义测试用的时间步（分别取起始、中间、末尾时间步）
t = torch.tensor([0, 300, 999]).long()
# 生成测试用的初始干净图像张量（线性插值生成连续值）
x_start = torch.linspace(-0.9, 0.6, b*3*sz*sz).view(b, 3, sz, sz)
# 生成测试用的噪声张量（线性插值生成连续值）
noise = torch.linspace(-0.7, 0.8, b*3*sz*sz).view(b, 3, sz, sz)
# 调用q_sample方法执行前向加噪过程，获取加噪后的图像
x_t = diffusion.q_sample(x_start, t, noise)

# 预设的期望输出（根据论文公式计算的理论值）
expected_x_t = np.array([
    [
        [[-0.9119949, -0.86840147], [-0.8248081, -0.7812148]],
        [[-0.7376214, -0.694028], [-0.65043473, -0.6068413]],
        [[-0.563248, -0.51965463], [-0.47606122, -0.43246788]],
    ],
    [
        [[-0.42800453, -0.37039882], [-0.31279305, -0.2551873]],
        [[-0.19758154, -0.1399758], [-0.08237009, -0.024764337]],
        [[0.032841414, 0.090447165], [0.14805292, 0.20565866]],
    ],
    [
        [[0.32864183, 0.37152246], [0.41440308, 0.45728368]],
        [[0.50016433, 0.5430449], [0.5859255, 0.6288062]],
        [[0.67168677, 0.7145674], [0.757448, 0.8003287]],
    ],
]).astype(np.float32)

# 计算并打印相对误差（理想情况下应为0，表明实现正确）
print("x_t的误差: ", rel_error(x_t.numpy(), expected_x_t))

In [None]:
# 可视化不同时间步的加噪图像
diffusion = GaussianDiffusion(
    model=None,                  # 暂时不传入模型（后续实现）
    image_size=image_size,       # 图像尺寸（与数据集一致）
    timesteps=1000,              # 总时间步数
)

B = 10  # 批量大小（可视化10张不同加噪程度的图像）

# 获取数据集中的一张图像（形状为3 x H x W）
img = dataset[770][0]
# 扩展批次维度并重复B次，得到B x 3 x H x W的张量
x_start = img[None].repeat(B, 1, 1, 1)
# 生成与x_start形状相同的随机噪声（标准正态分布）
noise = torch.randn_like(x_start)
# 生成B个均匀分布的时间步（从0到999）
t = torch.linspace(0, 1000-1, B).long()

# 对初始图像进行标准化（符合模型输入要求）
x_start = diffusion.normalize(x_start)
# 执行前向加噪过程，得到不同时间步的加噪图像
x_t = diffusion.q_sample(x_start, t, noise)
# 对加噪图像去标准化，并将像素值限制在[0, 1]范围内
x_t = diffusion.unnormalize(x_t).clamp(0, 1)

# 将加噪图像排列成网格（5列，2行）
grid_img = tv_utils.make_grid(x_t, nrow=5, padding=2)
# 转换张量维度（CHW -> HWC）并转为NumPy数组用于绘图
grid_img = grid_img.permute(1, 2, 0).cpu().numpy()

# 绘制图像网格
fig, ax = plt.subplots(figsize=(10, 10))
ax.imshow(grid_img)       # 显示图像网格
ax.axis("off")            # 关闭坐标轴
plt.show()                # 显示图像

扩散模型可以训练为预测干净图像或噪声，因为两者可以相互推导（如上文“去噪目标”部分所述）。实现`predict_start_from_noise`和`predict_noise_from_start`方法，并在下方进行测试。你应该能看到相对误差小于1e-5。

In [None]:
# 测试`predict_noise_from_start`和`predict_start_from_noise`方法
sz = 2  # 图像尺寸（2×2像素）
b = 3   # 批量大小

# 实例化高斯扩散模型（仅用于测试，暂不传入实际模型）
diffusion = GaussianDiffusion(
    model=None,
    image_size=sz,          # 图像尺寸
    timesteps=1000,         # 总时间步数
    beta_schedule="sigmoid" # beta参数的调度方式
)

# 定义测试用的时间步（选取3个不同时间点）
t = torch.tensor([1, 300, 998]).long()
# 生成测试用的初始干净图像张量（线性插值生成连续值）
x_start = torch.linspace(-0.91, 0.67, b*3*sz*sz).view(b, 3, sz, sz)
# 生成测试用的噪声张量（线性插值生成连续值）
noise = torch.linspace(-0.73, 0.81, b*3*sz*sz).view(b, 3, sz, sz)
# 执行前向加噪过程，得到加噪后的图像x_t
x_t = diffusion.q_sample(x_start, t, noise)

# 从x_t、时间步t和干净图像x_start预测噪声
pred_noise = diffusion.predict_noise_from_start(x_t, t, x_start)
# 从x_t、时间步t和噪声预测干净图像
pred_x_start = diffusion.predict_start_from_noise(x_t, t, noise)

# 应看到相对误差在1e-5左右或更小（表明两个方法实现正确）
print("噪声预测误差: ", rel_error(pred_noise.numpy(), noise.numpy()))
print("初始图像预测误差: ", rel_error(pred_x_start.numpy(), x_start.numpy()))

## UNet模型  

既然我们已经定义了前向过程，现在来定义用于对输入图像去噪的UNet模型。UNet是一种专为图像到图像任务（如分割、风格迁移等）设计的神经网络架构。它包含一个编码器（或下采样模块），将输入图像转换为空间分辨率递减、特征维度递增的层次化特征；解码器（或上采样模块）则通过逐步恢复空间分辨率对特征进行上采样，结构与编码器镜像对称。在每个解码器层，对应编码器层的特征会被拼接进来，为细粒度细节提供直接传输路径。这种设计减轻了瓶颈层的负担，使其专注于捕捉高层语义表示，而不必记忆细节信息。  

在此我们使用UNet的原因是：输入和输出都是维度一致的图像（C×H×W）。此外，UNet中的每个ResNet块还会接收一个额外的输入向量作为**条件（context）**——我们将通过编码扩散时间步和文本提示来生成该上下文向量。  

运行下面的单元格可查看UNet架构的大致轮廓：  
- 每个红色方框代表一个ResNet块，包含2-3个卷积层，用于保持特征图的空间分辨率（为清晰起见，省略了每个ResNet块的上下文向量输入）。  
- 方框下方的形状表示该模块的输出张量形状。  
- 额外的箭头表示**跳跃连接**，使U-Net能够在输出中保留细粒度细节。例如，形状为(d, h, w)的`layer1_block1`输出会与同形状的`layer4_block1`输出拼接，再传入`layer4_block2`，因此`layer4_block2`的输入形状为(2*d, h, w)。

In [None]:
# 从IPython.display模块导入Image类，用于显示图像
from IPython.display import Image
# 显示UNet架构图（图像文件存储在Google Drive中）
Image(f'/content/drive/My Drive/{FOLDERNAME}/unet.png')

在`cs231n/unet.py`中实现`Unet.__init__`方法，以定义UNet模型的上采样和下采样模块，然后在下方进行测试。如果实现正确，应不会出现任何错误。调用`Unet(dim=d, condition_dim=condition_dim, dim_mults=(2,4))`应能成功创建与上图所示架构对应的UNet模型。

In [None]:
# 从cs231n.unet模块导入UNet模型、ResNet块、下采样和上采样模块
from cs231n.unet import Unet, ResnetBlock, Downsample, Upsample

dim = 2  # 基础特征维度
condition_dim = 4  # 条件向量维度
dim_mults = (1, 2, 4)  # 维度乘数，用于确定各层特征维度
# 实例化UNet模型，传入基础维度、条件维度和维度乘数
unet = Unet(dim=dim, condition_dim=condition_dim, dim_mults=dim_mults)


# 检查下采样和上采样模块的数量
assert len(unet.downs) == len(dim_mults), "UNet下采样模块数量错误。"
assert len(unet.ups) == len(dim_mults), "UNet上采样模块数量错误。"


# 检查下采样模块的维度配置
try:
    # 预期的下采样模块维度序列
    expected_downs_dims = [2, 2, 8, 2, 2, 8, 2, 2, 8, 2, 2, 8, 4, 4, 8, 4, 4, 8]
    # 提取实际下采样模块的维度信息
    downs_dims = [
        d for m in unet.downs for d in [
            m[0].dim, m[0].dim_out, m[0].context_dim, m[1].dim, m[1].dim_out, m[1].context_dim,
        ]
    ]
    # 断言实际维度与预期一致，否则抛出异常
    assert downs_dims == expected_downs_dims, "维度配置不匹配"
except Exception as e:
    raise RuntimeError("下采样模块配置错误") from e


# 检查上采样模块的维度配置
try:
    # 预期的上采样模块维度序列
    expected_ups_dims = [8, 4, 8, 8, 4, 8, 4, 2, 8, 4, 2, 8, 4, 2, 8, 4, 2, 8]
    # 提取实际上采样模块的维度信息
    ups_dims = [
        d for m in unet.ups for d in [
            m[1].dim, m[1].dim_out, m[1].context_dim, m[2].dim, m[2].dim_out, m[2].context_dim,
        ]
    ]
    # 断言实际维度与预期一致，否则抛出异常
    assert ups_dims == expected_ups_dims, "维度配置不匹配"
except Exception as e:
    raise RuntimeError("上采样模块配置错误") from e

# 检查模型参数数量
num_params = sum(p.numel() for p in unet.parameters())  # 计算模型总参数数量
expected_num_params = 6499  # 预期参数数量
# 断言参数数量与预期一致，否则提示模型创建错误
assert num_params == expected_num_params, "UNet模型创建错误！"


在`cs231n/unet.py`中填写`Unet.forward`方法，并在下方进行测试。目前无需考虑`Unet.cfg_forward`方法。你应该会看到相对误差小于1e-5。

In [None]:
# 设置随机种子以保证结果可复现
np.random.seed(231)
torch.manual_seed(231)

dim = 4  # 基础特征维度
condition_dim = 4  # 条件向量维度
dim_mults = (2, 4)  # 维度乘数
# 实例化UNet模型
unet = Unet(dim=dim, condition_dim=condition_dim, dim_mults=dim_mults)

b = 2  # 批量大小
h = w = 4  # 图像高度和宽度
inp_x = torch.randn(b, 3, h, w)  # 输入图像张量（随机初始化）
inp_text_emb = torch.randn(b, condition_dim)  # 文本嵌入向量（随机初始化）
inp_t = torch.tensor([8, 25]).long()  # 时间步输入
# 执行UNet的前向传播，传入图像、时间步和文本嵌入，获取输出并转为NumPy数组
out = unet.forward(x=inp_x, time=inp_t,
                   model_kwargs={"text_emb": inp_text_emb}).detach().numpy()

# 预期的输出结果（用于验证前向传播实现正确性）
expected_out = np.array(
      [[[[ 0.14615417,  0.36610782,  0.27948245,  0.2229169 ],
         [ 1.0268314 , -0.04441035,  0.33097324,  0.21493062],
         [ 0.15944722,  1.1060286 ,  0.36489075,  0.39395577],
         [ 0.5593624 ,  0.95084137,  0.46409354, -0.15076232]],

        [[ 0.07152754, -0.19067341,  0.36995906,  0.1898715 ],
         [ 0.18764025, -0.37758452,  0.22994985,  0.14644745],
         [ 0.39364466,  0.42091975,  0.75438905, -0.17806   ],
         [ 0.0934296 ,  0.44165182,  0.2768886 ,  0.19760622]],

        [[ 0.39873862,  0.86417544,  0.707601  ,  0.5136454 ],
         [ 0.8151177 ,  0.01816908,  0.64427924,  0.45256743],
         [ 0.6901425 ,  1.0449984 ,  0.8272561 ,  0.38516602],
         [ 0.48775655,  0.91759497,  0.56286275,  0.38452417]]],


       [[[ 0.31076878,  0.25998223,  0.35973004, -0.01464513],
         [ 0.37456402,  0.10733554,  1.1211727 ,  0.596719  ],
         [-0.19628221,  0.49115434,  0.5591996 , -0.02811927],
         [ 0.2980889 ,  0.7983323 ,  0.31545636,  0.1045265 ]],

        [[-0.21484727, -0.11434001,  0.01019827, -0.07907221],
         [-0.14186645,  0.2666731 ,  0.36379665,  0.25780094],
         [ 0.6618308 ,  0.09432775,  0.3441353 ,  0.11780772],
         [ 0.3818162 ,  0.54577625,  0.15127666,  0.2136025 ]],

        [[ 0.23299581,  0.51728034,  0.5330554 ,  0.30019608],
         [ 0.34902877,  0.29055628,  1.1447697 ,  0.5087651 ],
         [ 0.7447357 ,  0.4974355 ,  0.564866  ,  0.4631402 ],
         [ 0.6024195 ,  0.8882342 ,  0.46354175,  0.4344969 ]]]])

# 计算并打印前向传播输出与预期结果的相对误差（应小于1e-5）
print("前向传播误差: ", rel_error(out, expected_out))

# p_losses

既然模型实现已完成，我们来编写DDPM的去噪训练步骤。如前所述，优化去噪损失等价于最小化数据集的期望负对数似然。请在`cs231n/gaussian_diffusion.py`中补全`GaussianDiffusion.p_losses`方法，并在下方进行测试。你应该会看到相对误差小于1e-6。

In [None]:
# 设置随机种子以确保结果可复现
np.random.seed(231)
torch.manual_seed(231)

dim = 4  # 基础特征维度
condition_dim = 4  # 条件向量维度
dim_mults = (2, 4)  # 维度乘数
# 实例化UNet模型
unet = Unet(dim=dim, condition_dim=condition_dim, dim_mults=dim_mults)

h = w = 4  # 图像高度和宽度
b = 3  # 批量大小
# 实例化高斯扩散模型
diffusion = GaussianDiffusion(
    model=unet,  # 传入UNet模型作为去噪模型
    image_size=h,  # 图像尺寸
    timesteps=1000,  # 总时间步数
    beta_schedule="sigmoid",  # beta参数调度方式
    objective="pred_x_start",  # 模型目标：预测初始干净图像
)

# 生成输入图像张量（随机初始化）
inp_x = torch.randn(b, 3, h, w)
# 生成模型所需的额外参数（包含文本嵌入向量）
inp_model_kwargs = {"text_emb": torch.randn(b, condition_dim)}
# 计算去噪损失
out = diffusion.p_losses(inp_x, inp_model_kwargs)
# 预期的损失值（用于验证损失计算的正确性）
expected_out = 30.0732689

# 计算并打印实际损失与预期损失的相对误差（应小于1e-6）
print("前向传播误差: ", rel_error(out.item(), expected_out))

## p_sample

现在只剩下最后一个部分了。DDPM通过迭代执行逆过程来生成样本。逆过程的每一次迭代都涉及从$p(x_{t-1}|x_t)$中采样。打开`cs231n/gaussian_diffusion.py`，根据论文中的公式（6）实现`p_sample`方法。该公式描述了在前向过程的后验分布中进行采样，其条件为$x_t$和$x_0$，其中$x_0$可从去噪模型的输出中推导得到。我们已经实现了`sample`方法，该方法通过迭代调用`p_sample`来从输入文本生成图像。

在下方测试你实现的`p_sample`方法，你应该会看到相对误差小于1e-6。

In [None]:
# 设置随机种子以确保结果可复现
np.random.seed(231)
torch.manual_seed(231)

dim = 4  # 基础特征维度
condition_dim = 4  # 条件向量维度
dim_mults = (2,)  # 维度乘数（单元素元组）
# 实例化UNet模型
unet = Unet(dim=dim, condition_dim=condition_dim, dim_mults=dim_mults)

h = w = 4  # 图像高度和宽度
b = 1  # 批量大小
# 生成t时刻的加噪图像张量（随机初始化）
inp_x_t = torch.randn(b, 3, h, w)
# 生成模型所需的额外参数（包含文本嵌入向量）
inp_model_kwargs = {"text_emb": torch.randn(b, condition_dim)}
t = 231  # 测试用的时间步

# 测试1
# 实例化高斯扩散模型（使用sigmoid调度的beta参数，目标为预测初始图像）
diffusion = GaussianDiffusion(
    model=unet,
    image_size=h,
    timesteps=1000,
    beta_schedule="sigmoid",
    objective="pred_x_start",
)
# 执行p_sample方法，从x_t采样得到x_{t-1}，并转为NumPy数组
out = diffusion.p_sample(inp_x_t, t, inp_model_kwargs).detach().numpy()
# 预期的输出结果（用于验证p_sample实现的正确性）
expected_out = np.array(
    [[[[ 1.1339471 ,  0.12097352, -0.7175048 ,  1.3196243 ],
         [-0.27657282,  0.4899886 ,  1.0170169 , -0.8242867 ],
         [-0.18946372,  0.9899801 ,  0.01498353,  0.39722288],
         [-0.97995025, -0.5947938 , -0.07796463, -0.07311387]],

        [[ 0.0739838 , -1.5537696 ,  0.43128064, -0.7395982 ],
         [-1.0517508 , -1.7030833 ,  0.79073197, -1.217138  ],
         [-0.5314434 ,  0.9862699 ,  0.6568664 , -0.4559122 ],
         [-0.17322278,  0.51251256, -0.75741345, -0.3967054 ]],

        [[ 0.8546979 ,  1.6186953 ,  1.9930652 ,  0.57347   ],
         [ 0.20219846,  0.5374655 , -0.81597316,  1.9089762 ],
         [ 0.7327057 ,  1.19275   ,  1.8593936 , -1.4582647 ],
         [ 0.68447256, -0.9056745 ,  0.7863245 ,  0.14455058]]]])
# 计算并打印实际输出与预期输出的相对误差（应小于1e-6）
print("前向传播误差: ", rel_error(out, expected_out))

# 测试2
# 实例化高斯扩散模型（使用cosine调度的beta参数，目标为预测噪声）
diffusion = GaussianDiffusion(
    model=unet,
    image_size=h,
    timesteps=1000,
    beta_schedule="cosine",
    objective="pred_noise",
)
# 执行p_sample方法，从x_t采样得到x_{t-1}，并转为NumPy数组
out = diffusion.p_sample(inp_x_t, t, inp_model_kwargs).detach().numpy()
# 预期的输出结果（用于验证p_sample实现的正确性）
expected_out = np.array(
    [[[[ 1.1036711 ,  0.08143333, -0.6856102 ,  1.3826138 ],
         [-0.25455472,  0.514572  ,  1.104592  , -0.75972646],
         [-0.22729763,  0.9837706 ,  0.05891411,  0.52049375],
         [-1.0331786 , -0.5416254 , -0.01623197, -0.04838388]],

        [[ 0.08324978, -1.545468  ,  0.41357145, -0.63511896],
         [-1.1362139 , -1.7128816 ,  0.8694859 , -1.2297069 ],
         [-0.49168122,  1.0043695 ,  0.6759953 , -0.5297671 ],
         [-0.10931232,  0.52347076, -0.80946106, -0.5015002 ]],

        [[ 0.7437265 ,  1.590004  ,  1.9481117 ,  0.5656144 ],
         [ 0.22895451,  0.5289113 , -0.8511001 ,  1.8864397 ],
         [ 0.72863096,  1.2271638 ,  1.892699  , -1.5199479 ],
         [ 0.64346373, -0.86913294,  0.7869012 ,  0.12637165]]]])
# 计算并打印实际输出与预期输出的相对误差（应小于1e-6）
print("前向传播误差: ", rel_error(out, expected_out))

## 训练

我们已经具备了DDPM训练所需的所有组件，可以在表情符号数据集上训练模型了。这里不需要你编写任何代码，但建议你查看`cs231n/ddpm_trainer.py`中的训练代码。

在本笔记本的剩余部分，我们将使用`cs231n/exp/pretrained`文件夹中的预训练模型，该模型已经在这个数据集上训练了很多轮次。不过，你也可以在Colab的GPU上训练自己的模型（记得修改`results_folder`）。需要注意的是，在T4 GPU上可能需要超过12小时才能开始生成效果尚可的图像。

In [None]:
# 从cs231n.ddpm_trainer模块导入训练器类
from cs231n.ddpm_trainer import Trainer

dim = 48  # UNet模型的基础特征维度
image_size = 32  # 图像尺寸（32×32像素）
# 训练结果保存路径（位于Google Drive中）
results_folder = f"/content/drive/My Drive/{FOLDERNAME}/cs231n/exp/pretrained"
condition_dim = 512  # 条件向量的维度

# 确定运行设备（优先使用GPU，若无则使用CPU）
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 实例化UNet模型
model = Unet(
    dim=dim,  # 基础特征维度
    dim_mults=(1, 2, 4, 8),  # 维度乘数，控制各层特征维度的缩放
    condition_dim=condition_dim,  # 条件向量的维度
)
# 打印模型的总参数数量
print("参数数量:", sum(p.numel() for p in model.parameters()))

# 实例化高斯扩散模型
diffusion = GaussianDiffusion(
    model,  # 传入UNet模型作为去噪模型
    image_size=image_size,  # 图像尺寸
    timesteps=100,  # 扩散过程的总时间步数
    objective="pred_noise",  # 模型的训练目标：预测噪声（可选"pred_x_start"表示预测初始干净图像）
)

# 加载表情符号数据集（已调整为指定图像尺寸）
dataset = EmojiDataset(image_size)

# 实例化训练器
trainer = Trainer(
    diffusion,  # 扩散模型
    dataset,  # 训练数据集
    device,  # 训练设备
    train_batch_size=256,  # 训练时的批量大小
    weight_decay=0.0,  # 权重衰减系数（用于正则化）
    train_lr=1e-3,  # 训练学习率
    train_num_steps=50000,  # 训练的总步数
    results_folder=results_folder,  # 训练结果（模型 checkpoint 等）的保存路径
)

# 你不需要自己训练模型（预训练模型已提供）
# trainer.train()


In [None]:
# 我们将加载一个预训练模型（而不是重新训练）
# 加载训练到70000步的模型权重
trainer.load(70000)

In [None]:
# 推理时用于获取CLIP文本嵌入的辅助函数
from cs231n.emoji_dataset import ClipEmbed
clip_embedder = ClipEmbed(device)  # 初始化CLIP文本嵌入器（运行在指定设备上）

def get_text_emb(text):
    # 根据输入文本生成CLIP嵌入向量
    return trainer.ds.embed_new_text(text, clip_embedder)

# 用于可视化生成结果的辅助函数
def show_images(img):
    # img的形状为：B（批量大小）x T（时间步）x 3（通道数）x H（高度）x W（宽度）
    plt.figure(figsize=(10, 10))  # 创建绘图窗口，设置尺寸
    # 调整图像张量维度并转换为NumPy数组，便于显示
    img2 = img.clamp(0, 1).permute(0, 3, 1, 4, 2).flatten(0, 1).flatten(1, 2).cpu().numpy()
    plt.imshow(img2)  # 显示图像
    plt.axis('off')   # 关闭坐标轴

    plt.show()  # 展示图像

## 采样

运行下方单元格，可视化基于文本提示生成的表情符号。你可以随意修改提示词来探索不同的生成结果。由于我们的表情符号数据集规模很小，不足以训练出一个具有完全泛化能力的文本到图像模型。因此，对于未见过的提示词，生成效果可能较差，或者与输入文本不匹配（这种情况在见过的示例中也可能发生，但较少见）。

为了更快地进行采样，你可以使用GPU运行时。如果切换了运行时类型，请确保重新运行整个笔记本。

In [None]:
# text = "crying face"  # 已见过的示例，生成效果好
text = "face with cowboy hat"  # 已见过的示例，生成效果好
# text = "crying face with cowboy hat"  # 未见过的示例，生成效果差
# 获取文本的嵌入向量
text_emb = get_text_emb(text)
# 将文本嵌入向量扩展为批量大小为5，并移动到指定设备
text_emb = text_emb[None].expand(5, -1).to(device)


# 使用扩散模型生成图像
img = trainer.diffusion_model.sample(
    batch_size=5,  # 生成的图像数量
    model_kwargs={"text_emb": text_emb},  # 传入文本嵌入作为条件
    return_all_timesteps=True  # 返回所有时间步的生成结果
)
# 可视化生成过程，每隔20个时间步显示一次
show_images(img[:, ::20])

## 无分类器引导（Classifier Free Guidance）

生成模型的评估通常围绕两个指标：**保真度**（即生成样本的质量或真实感）和**多样性**（样本空间的变异性或覆盖范围）。对于条件生成模型，保真度还指生成样本与输入条件的贴合程度。这两个指标往往相互制约，需要在两者之间进行权衡。Ho等人提出了一种名为“无分类器引导”的简单技术[3]，它能够对这种权衡进行显式控制。


在无分类器引导中，训练条件扩散模型$\epsilon_\theta(x_t, t, c)$时，会以一定概率（通常为0.1到0.2）随机丢弃条件$c$（即替换为$c=\phi$）。在采样的每个去噪步骤中，预测结果会更新为：
$$\epsilon_\theta(x_t, t, c) \leftarrow (w+1) \epsilon_\theta(x_t, t, c) - w \epsilon_\theta(x_t, t, \phi)$$
其中$w$是一个正标量（称为引导尺度）。也就是说，在每个去噪步骤中，我们会进行两次预测：一次是带条件的，一次是无条件的，然后通过线性组合来偏向条件生成结果。$w$是一个超参数，需要根据模型特定的评估指标进行调优。$w$值越高，生成结果与条件的贴合度越好，但往往会降低其多样性。

[3] Classifier-Free Diffusion Guidance. Jonathan Ho, Tim Salimans. [Link](https://arxiv.org/abs/2207.12598)

在`cs231n/unet.py`中实现`Unet.cfg_forward`方法中的无分类器引导，并在下方进行测试。你应该会看到相对误差小于1e-6。

In [None]:
# 设置随机种子以确保结果可复现
np.random.seed(231)
torch.manual_seed(231)

dim = 4  # 基础特征维度
condition_dim = 4  # 条件向量维度
dim_mults = (2, 4)  # 维度乘数
# 实例化UNet模型
unet = Unet(dim=dim, condition_dim=condition_dim, dim_mults=dim_mults)

b = 2  # 批量大小
h = w = 4  # 图像高度和宽度
inp_x = torch.randn(b, 3, h, w)  # 输入图像张量（随机初始化）
inp_text_emb = torch.randn(b, condition_dim)  # 文本嵌入向量（随机初始化）
inp_t = torch.tensor([8, 25]).long()  # 时间步输入
# 执行UNet的前向传播，传入图像、时间步、文本嵌入和引导尺度（cfg_scale），获取输出并转为NumPy数组
out = unet.forward(x=inp_x, time=inp_t,
                   model_kwargs={"text_emb": inp_text_emb, "cfg_scale": 2.31}
                   ).detach().numpy()

# 预期的输出结果（用于验证无分类器引导实现的正确性）
expected_out = np.array(
      [[[[-0.07755187,  0.39913225, -0.616872  ,  0.16161466],
         [ 0.76309466, -0.64505696,  1.1228579 ,  0.1429432 ],
         [-0.58470994,  1.5556629 ,  0.19990933,  0.6726817 ],
         [ 0.34811258,  1.6286248 , -0.57835865, -0.3712303 ]],

        [[-0.2780811 ,  0.09640026,  0.80653083,  0.3257922 ],
         [ 0.49113247, -1.2000966 ,  0.9383536 ,  0.10577369],
         [ 0.5326107 ,  0.38000846,  0.90770614,  0.08911347],
         [-0.2537056 ,  0.6668851 , -0.16009146,  0.4560123 ]],

        [[-0.03857625,  1.2413033 ,  0.89891887,  0.22149336],
         [ 0.9030682 , -1.0636187 ,  1.2424004 ,  0.56415176],
         [ 0.6789831 ,  1.367657  ,  0.84504557,  0.5781751 ],
         [ 0.10814822,  1.3854939 , -0.33456588,  0.34210002]]],


       [[[ 0.17439526, -0.01185328,  0.39814878,  0.2655859 ],
         [ 0.1156677 , -0.29466197,  4.5019875 ,  0.90760195],
         [-0.7210121 ,  0.32611835,  1.262263  , -0.46243155],
         [ 0.05207008,  1.3481442 ,  0.06369245,  0.46200275]],

        [[-0.24512854, -0.08326203,  0.04366357, -0.86336297],
         [-0.9094473 ,  0.36758858,  0.5417196 ,  0.33162278],
         [ 1.233382  ,  0.4753497 ,  1.0248462 , -0.1512323 ],
         [ 0.40446353,  0.77949953, -0.48068368,  0.92509973]],

        [[ 0.22089547,  0.43676746,  0.31286478,  0.273731  ],
         [-0.34253466, -0.18519384,  2.603891  ,  0.6012087 ],
         [ 1.2847279 ,  0.8032987 ,  1.0297089 ,  0.52087414],
         [ 0.5678704 ,  1.1869694 ,  0.09395003,  0.90305966]]]])


# 计算并打印前向传播输出与预期结果的相对误差（应小于1e-6）
print("前向传播误差: ", rel_error(out, expected_out))

运行下方单元格，可视化使用无分类器引导生成的表情符号。你也可以随意修改“cfg_scale”参数的值。如前所述，由于我们的模型泛化能力不佳，即使使用较高的引导尺度，生成结果也可能无法准确贴合条件。


In [None]:
# text = "crying face"  # 已见过的示例，生成效果好
text = "face with cowboy hat"  # 已见过的示例，生成效果好
# text = "crying face with cowboy hat"  # 未见过的示例，生成效果差
# 获取文本的嵌入向量
text_emb = get_text_emb(text)
# 将文本嵌入向量扩展为批量大小为5，并并移动到指定设备
text_emb = text_emb[None].expand(5, -1).to(device)


# 使用扩散扩散模型生成图像
img = trainer.diffusion_model.sample(
    batch_size=5,  # 生成的图像数量
    model_kwargs={"text_emb": text_emb, "cfg_scale": 1},  # 传入文本嵌入和引导尺度参数
    return_all_timesteps=True  # 返回所有时间步的生成结果
)
# 可视化生成过程，每隔20个时间步显示一次
show_images(img[:, ::20])