<a href="https://colab.research.google.com/github/mikonvergence/DiffusionFastForward/blob/master/01-colab-Diffusion-Sandbox.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

> This is part of [DiffusionFastForward](https://github.com/mikonvergence/DiffusionFastForward) course. For more content, please go to https://github.com/mikonvergence/DiffusionFastForward.

# Diffusion Sandbox - 扩散模型实验室

[English]
In this notebook, the intricacies of a denosing diffusion framework are illustrated with the aid of simple snippets.

[中文]
在这个notebook中，我们将通过简单的代码示例来详细说明去噪扩散模型(Denoising Diffusion)的工作原理。扩散模型是一类强大的生成模型，它通过逐步向数据添加噪声，然后学习反向去噪过程来生成新的数据。

First, let's import an image to use for the examples.
首先，让我们导入一张图片作为示例。

In [None]:
! git clone https://github.com/mikonvergence/DiffusionFastForward
!pip install pytorch-lightning==1.9.3 diffusers einops kornia

In [None]:
import sys
sys.path.append('./DiffusionFastForward/')

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import imageio

mpl.rcParams['figure.figsize'] = (12, 8)

img = torch.FloatTensor(imageio.imread('./DiffusionFastForward/imgs/hills_2.png')/255)
plt.imshow(img)

### Before we start... 开始之前...

[English]
The majority of the diffusion models assume that the images are scaled to the `[-1,+1]` range (which tends to simplify many equations). This tutorial will follow the same approach, so we need to define input and output transformation functions `input_T()` and `output_T()`.

Also, let's define our own `show()` wrapper function that displays the image with automatic output transformation!

[中文]
大多数扩散模型假设图像被缩放到 `[-1,+1]` 的范围内（这样可以简化许多公式）。本教程也将采用这种方法，因此我们需要定义输入和输出转换函数 `input_T()` 和 `output_T()`。

同时，我们还将定义一个自己的 `show()` 包装函数，它可以在显示图像时自动进行输出转换！

In [None]:
def input_T(input):
    # [0,1] -> [-1,+1]
    return 2*input-1
    
def output_T(input):
    # [-1,+1] -> [0,1]
    return (input+1)/2

def show(input):
    plt.imshow(output_T(input).clip(0,1))
    
img_=input_T(img)
show(img_)

### Defining a schedule 定义噪声调度

[English]
The diffusion process is built based on a variance schedule, which determines the levels of added noise at each step of the process. To that end, our schedule is defined below, with the following quantities:

[中文]
扩散过程是基于一个方差调度(variance schedule)构建的，它决定了在过程的每一步中添加噪声的程度。我们的调度包含以下重要参数：

* `betas`:$\beta_t$ 
  - [EN] The variance schedule that controls noise addition at each step
  - [CN] 控制每一步添加噪声量的方差调度

* `alphas`: $\alpha_t=1-\beta_t$
  - [EN] Complement of betas, represents signal preservation at each step
  - [CN] betas的补数，表示每一步中信号保留的程度

* `alphas_sqrt`:  $\sqrt{\alpha_t}$ 
  - [EN] Square root of alphas, used in noise scaling
  - [CN] alphas的平方根，用于噪声缩放

* `alphas_prod`: $\bar{\alpha}_t=\prod_{i=0}^{t}\alpha_i$ 
  - [EN] Cumulative product of alphas up to step t
  - [CN] 从开始到步骤t的alphas累积乘积

* `alphas_prod_sqrt`: $\sqrt{\bar{\alpha}_t}$ 
  - [EN] Square root of cumulative alphas product
  - [CN] 累积alphas乘积的平方根

In [None]:
num_timesteps=1000
betas=torch.linspace(1e-4,2e-2,num_timesteps)

alphas=1-betas
alphas_sqrt=alphas.sqrt()
alphas_cumprod=torch.cumprod(alphas,0)
alphas_cumprod_sqrt=alphas_cumprod.sqrt()

## Forward Process 前向过程

[English]
The forward process $q$ determines how subsequent steps in the diffusion are derived (gradual distortion of the original sample $x_0$).

[中文]
前向过程 $q$ 决定了扩散过程中后续步骤是如何推导的（原始样本 $x_0$ 的逐步扰动）。

📃 First, let's bring up the key equations describing this process...
让我们先来看看描述这个过程的关键方程...

[English]
Basic format of the forward step:
[中文]
前向步骤的基本格式：

$$q(x_t|x_{t−1}) := \mathcal{N}(x_t; \sqrt{1 − \beta_t}x_{t−1}, \beta_tI) \tag{1}$$

[English]
to step directly from $x_0$ to $x_t$:
[中文]
直接从 $x_0$ 到 $x_t$ 的步骤：

$$q(x_t|x_0) = \mathcal{N}(x_t;\sqrt{\bar{\alpha_t}}x_0, (1 − \bar{\alpha_t})I) \tag{2}$$

[中文解释]
这两个方程描述了扩散过程中的噪声添加机制：
1. 方程(1)描述了如何从时间步 t-1 到 t 添加噪声
2. 方程(2)描述了如何直接从原始图像跳转到任意时间步 t
3. $\mathcal{N}$ 表示正态分布，其中第一个参数是均值，第二个参数是方差

### Let's define a function `forward_step()` that will allow us to use both $q(x_t|x_{t-1})$ and  `forward_jump()` for $q(x_t|x_0)$

In [None]:
def forward_step(t, condition_img, return_noise=False):
    """
    [EN] Forward step: t-1 -> t
         Performs a single step in the forward diffusion process
    
    [CN] 前向步骤：从t-1到t
         执行前向扩散过程中的单个步骤
    """    
    assert t >= 0

    # [CN] 计算均值：使用alpha的平方根缩放条件图像
    mean=alphas_sqrt[t]*condition_img    
    # [CN] 计算标准差：使用beta的平方根
    std=betas[t].sqrt()
      
    # [CN] 从正态分布N中采样
    if not return_noise:
        return mean+std*torch.randn_like(img)
    else:
        noise=torch.randn_like(img)
        return mean+std*noise, noise
    
def forward_jump(t, condition_img, condition_idx=0, return_noise=False):
    """
    [EN] Forward jump: 0 -> t
         Directly jumps from x_0 to x_t in the forward diffusion process
    
    [CN] 前向跳跃：从0直接到t
         在前向扩散过程中从x_0直接跳跃到x_t
    """   
    assert t >= 0
    
    # [CN] 计算均值：使用累积alpha的平方根缩放条件图像
    mean=alphas_cumprod_sqrt[t]*condition_img
    # [CN] 计算标准差：使用累积alpha的补数的平方根
    std=(1-alphas_cumprod[t]).sqrt()
      
    # [CN] 从正态分布N中采样
    if not return_noise:
        return mean+std*torch.randn_like(img)
    else:
        noise=torch.randn_like(img)
        return mean+std*noise, noise

In [None]:
N=5 # number of computed states between x_0 and x_T
M=4 # number of samples taken from each distribution

[English]
In the first example, when `t==0`, we want to derive a sample $x_t$ based on the clean sample $x_0$!

The first column shows the mean image for a given stage of the diffusion, and the subsequent columns to the right show several samples taken from the same distribution (they are different if you look closely!)

[中文]
在第一个示例中，当 `t==0` 时，我们要基于干净的样本 $x_0$ 推导出样本 $x_t$！

第一列显示了扩散过程中给定阶段的均值图像，右侧的后续列显示了从同一分布中采样的几个样本（仔细观察的话，它们是不同的！）

[可视化说明]
- 每一行代表扩散过程中的不同时间步
- 最左边的图像是该时间步的均值（确定性的）
- 右边的图像是加入随机噪声后的结果（随机的）
- 从上到下可以看到图像逐渐被噪声破坏的过程.

In [None]:
plt.figure(figsize=(12,8))
for idx in range(N):
    t_step=int(idx*(num_timesteps/N))
    
    plt.subplot(N,1+M,1+(M+1)*idx)
    show(alphas_cumprod_sqrt[t_step]*img_)
    plt.title(r'$\mu_t=\sqrt{\bar{\alpha}_t}x_0$') if idx==0 else None
    plt.ylabel("t: {:.2f}".format(t_step/num_timesteps))
    plt.xticks([])
    plt.yticks([])
    
    for sample in range(M):
        x_t=forward_jump(t_step,img_)
        
        plt.subplot(N,1+M,2+(1+M)*idx+sample)
        show(x_t)        
        plt.axis('off')
        
plt.tight_layout()

Alternatively, we can test the process of going from $x_{t-1}$ to $x_t$, which is a single step in the diffusion process. For that we can use the `forward_step` function.

Note that the mean $\mu_t$ is now a bit different (first column) since it is conditioned on a specific sample of $x_{t-1}$!

In [None]:
plt.figure(figsize=(12,8))
for idx in range(N):
    t_step=int(idx*(num_timesteps/N))
    prev_img=forward_jump(max([0,t_step-1]),img_) # directly go to prev state
    
    plt.subplot(N,1+M,1+(M+1)*idx)
    show(alphas_sqrt[t_step]*prev_img)
    plt.title(r'$\mu_t=\sqrt{1-\beta_t}x_{t-1}$') if idx==0 else None
    plt.ylabel("t: {:.2f}".format(t_step/num_timesteps))
    plt.xticks([])
    plt.yticks([])
    
    for sample in range(M):
        plt.subplot(N,1+M,2+(1+M)*idx+sample)
        x_t=forward_step(t_step,prev_img)
        show(x_t)        
        plt.axis('off')
plt.tight_layout()

## Reverse Process 反向过程

[English]
The purpose of the reverse process $p$ is to approximate the previous step $x_{t-1}$ in the diffusion chain based on a sample $x_t$. In practice, this approximation $p(x_{t-1}|x_t)$ must be done without the knowledge of $x_0$.

[中文]
反向过程 $p$ 的目的是基于样本 $x_t$ 来近似扩散链中的前一步 $x_{t-1}$。在实践中，这个近似 $p(x_{t-1}|x_t)$ 必须在不知道 $x_0$ 的情况下完成。

[English]
A parametrizable prediction model with parameters $\theta$ is used to estimate $p_\theta(x_{t-1}|x_t)$.

[中文]
我们使用一个带参数 $\theta$ 的可参数化预测模型来估计 $p_\theta(x_{t-1}|x_t)$。

[English]
The reverse process will also be (approximately) gaussian if the diffusion steps are small enough:

[中文]
如果扩散步骤足够小，反向过程也将（近似）服从高斯分布：

$$p_\theta(x_{t-1}|x_t) := \mathcal{N}(x_{t-1};\mu_\theta(x_t),\Sigma_\theta(x_t))\tag{3}$$

[English]
In many works, it is assumed that the variance of this distribution should not depend strongly on $x_0$ or $x_t$, but rather on the stage of the diffusion process $t$. This can be observed in the true distribution $q(x_{t-1}|x_t, x_0)$, where the variance of the distribution equals $\tilde{\beta}_t$.

[中文]
在许多研究中，都假设这个分布的方差不应该强烈依赖于 $x_0$ 或 $x_t$，而是主要依赖于扩散过程的阶段 $t$。这一点可以在真实分布 $q(x_{t-1}|x_t, x_0)$ 中观察到，其中分布的方差等于 $\tilde{\beta}_t$。

[补充说明]
反向过程是扩散模型中最关键的部分，因为它实际上就是我们的生成模型。通过学习如何逐步去除噪声，模型最终能够从纯噪声生成有意义的数据。

### Parameterizing $\mu_\theta$ 参数化均值 $\mu_\theta$

[English]
There are at least 3 ways of parameterizing the mean of the reverse step distribution $p_\theta(x_{t-1}|x_t)$:

[中文]
反向步骤分布 $p_\theta(x_{t-1}|x_t)$ 的均值至少有3种参数化方式：

1. [EN] Directly (a neural network will estimate $\mu_\theta$)
   [CN] 直接方式（神经网络直接估计 $\mu_\theta$）

2. [EN] Via $x_0$ (a neural network will estimate $x_0$)
   [CN] 通过 $x_0$ 方式（神经网络估计 $x_0$）
$$\tilde{\mu}_\theta = \frac{\sqrt{\bar{\alpha}_{t-1}}\beta_t}{1-\bar{\alpha}_t}x_0 + \frac{\sqrt{\alpha_t}(1-\bar{\alpha}_{t-1})}{1-\bar{\alpha}_t}x_t\tag{4}$$

3. [EN] Via noise $\epsilon$ subtraction from $x_0$ (a neural network will estimate $\epsilon$)
   [CN] 通过从 $x_0$ 中减去噪声 $\epsilon$ 的方式（神经网络估计 $\epsilon$）
$$x_0=\frac{1}{\sqrt{\bar{\alpha}_t}}(x_t-\sqrt{1-\bar{\alpha}_t}\epsilon)\tag{5}$$

[English]
The approach of approximating the normal noise $\epsilon$ is used most widely.

[中文]
估计正态噪声 $\epsilon$ 的方法是最广泛使用的。这种方法的优势在于它可以让模型直接学习噪声的分布，这通常比学习原始数据或中间状态更容易。

[English]
Let's look at what an example $\epsilon$ might look like:

[中文]
让我们看看一个 $\epsilon$ 噪声的示例是什么样子：

In [None]:
t_step=50

x_t,noise=forward_jump(t_step,img_,return_noise=True)

plt.subplot(1,3,1)
show(img_)
plt.title(r'$x_0$')
plt.axis('off')
plt.subplot(1,3,2)
show(x_t)
plt.title(r'$x_t$')
plt.axis('off')
plt.subplot(1,3,3)
show(noise)
plt.title(r'$\epsilon$')
plt.axis('off')

If $\epsilon$ is predicted correctly, we can use the equation (5) to predict $x_0$:

In [None]:
x_0_pred=(x_t-(1-alphas_cumprod[t_step]).sqrt()*noise)/(alphas_cumprod_sqrt[t_step])

plt.subplot(1,3,1)
show(x_t)
plt.title('$x_t$ ($\ell_1$: {:.3f})'.format(F.l1_loss(x_t,img_)))
plt.axis('off')
plt.subplot(1,3,2)
show(x_0_pred)
plt.title('$x_0$ prediction ($\ell_1$: {:.3f})'.format(F.l1_loss(x_0_pred,img_)))
plt.axis('off') 
plt.subplot(1,3,3)
show(img_)
plt.title('$x_0$')
plt.axis('off')

Approximation (or knowledge) of $x_0$ allows us to approximate the mean of the step $t-1$, using (4).

In [None]:
# estimate mean
mean_pred=x_0_pred*(alphas_cumprod_sqrt[t_step-1]*betas[t_step])/(1-alphas_cumprod[t_step]) + x_t*(alphas_sqrt[t_step]*(1-alphas_cumprod[t_step-1]))/(1-alphas_cumprod[t_step])

# let's compare it to ground truth mean of the previous step (requires knowledge of x_0)
mean_gt=alphas_cumprod_sqrt[t_step-1]*img_

Since reverse process mean estimation $\tilde{\mu}_\theta$ in (4) is effectively linear interpolation between noisy $x_t$ and $x_0$ it is expected to have a higher error (as the additive noise is still present) compared to the mean computed using the forward process (which is computed by scaling the clean sample by a scalar value).

In [None]:
plt.subplot(1,3,1)
show(x_t)
plt.title('$x_t$   ($\ell_1$: {:.3f})'.format(F.l1_loss(x_t,img_)))
plt.subplot(1,3,2)
show(mean_pred)
plt.title(r'$\tilde{\mu}_{t-1}$' + '  ($\ell_1$: {:.3f})'.format(F.l1_loss(mean_pred,img_)))
plt.subplot(1,3,3)
show(mean_gt)
plt.title(r'$\mu_{t-1}$' + '  ($\ell_1$: {:.3f})'.format(F.l1_loss(mean_gt,img_)))

Once we get our `mean_pred` ($\tilde{\mu_{t}}$), we can define our distribution for the previous step

$$\tilde{\beta}_t=\beta_t \tag{6}$$

$$ p_\theta(x_{t-1}|x_t) := \mathcal{N}(x_{t-1};\tilde{\mu}_\theta(x_t,t),\tilde{\beta}_t I) \tag{7}$$

> Important: the experiment below should be treated as a simulation. In practice, the network must  predict either $\epsilon$ or $x_0$ or $\tilde{\mu}_\theta$. Here, the value of $epsilon$ is simply subs

In [None]:
def reverse_step(epsilon, x_t, t_step, return_noise=False):
    """
    [EN] Performs a single step in the reverse diffusion process
         Going from t to t-1 by predicting and removing noise
    
    [CN] 执行反向扩散过程中的单个步骤
         通过预测和移除噪声从t到t-1
    """
    
    # [CN] 基于epsilon估计x_0
    x_0_pred=(x_t-(1-alphas_cumprod[t_step]).sqrt()*epsilon)/(alphas_cumprod_sqrt[t_step])
    if t_step==0:
        # [CN] 如果是最后一步(t=0)，直接返回预测的x_0
        sample=x_0_pred
        noise=torch.zeros_like(x_0_pred)
    else:
        # [CN] 估计均值：使用预测的x_0和当前x_t的线性组合
        mean_pred=x_0_pred*(alphas_cumprod_sqrt[t_step-1]*betas[t_step])/(1-alphas_cumprod[t_step]) + x_t*(alphas_sqrt[t_step]*(1-alphas_cumprod[t_step-1]))/(1-alphas_cumprod[t_step])

        # [CN] 计算方差：使用beta的平方根
        beta_pred=betas[t_step].sqrt() if t_step != 0 else 0

        # [CN] 从预测分布中采样
        sample=mean_pred+beta_pred*torch.randn_like(x_t)
        # [CN] 这个噪声仅用于模拟目的（因为在实际情况下x_0_pred是未知的）
        noise=(sample-x_0_pred*alphas_cumprod_sqrt[t_step-1])/(1-alphas_cumprod[t_step-1]).sqrt()
    
    if return_noise:
        return sample, noise
    else:
        return sample

In [None]:
x_t,noise=forward_jump(1000-1,img_,return_noise=True)

state_imgs=[x_t.numpy()]
for t_step in reversed(range(1000)):
    x_t,noise=reverse_step(noise,x_t,t_step,return_noise=True)
    
    if t_step % 200 == 0:
        state_imgs.append(x_t.numpy())

In [None]:
plt.figure()
for idx,state_img in enumerate(state_imgs):
    plt.subplot(1,len(state_imgs),idx+1)
    show(state_img.clip(-1,1))
    plt.axis('off')
    
plt.tight_layout()

## Packaging into Components 组件封装

[English]
The processes investigated above are neatly packaged into modular components for easier management of the diffusion framework.

First, the forward process component `GaussianForwardProcess` encapsulates the functions of $q(x_t|x_0)$ and $q(x_t|x_{t-1})$.

Below, we can see how different schedules of the variance parameter $\beta$ affect how the noise level changes throughout the progression.

[中文]
上面研究的过程被整齐地打包成模块化组件，以便更容易地管理扩散框架。

首先，前向过程组件 `GaussianForwardProcess` 封装了 $q(x_t|x_0)$ 和 $q(x_t|x_{t-1})$ 的功能。

在下面的示例中，我们可以看到方差参数 $\beta$ 的不同调度方式如何影响噪声水平在整个过程中的变化。

[调度说明]
我们将展示四种不同的噪声调度方案：
1. linear（线性）：噪声线性增加
2. quadratic（二次）：噪声按二次函数增加
3. sigmoid（S型）：噪声按S形曲线增加
4. cosine（余弦）：噪声按余弦函数调整

每种调度方案都会显示：
- 左侧：$\beta_t$ 随时间的变化曲线
- 右侧：5个时间步的图像样本，展示噪声的逐步添加过程

In [None]:
from src import *

D=128
make_white=False
save=False
line_color='black' #'#9EFFB9'
test_img=img[256-D:256+D,512-D:512+D,:]

for schedule in ['linear','quadratic','sigmoid','cosine']:
    fw=GaussianForwardProcess(1000,
                              schedule)

    plt.figure(figsize=(10,2))
    plt.subplot(1,6,1)    
    plt.plot(fw.betas,color=line_color)
    plt.title(schedule,color=line_color)
    plt.xlabel(r'step $t$',color=line_color)
    plt.ylabel(r'$\beta_t$',color=line_color)
    
    if make_white:
        plt.xticks(color='white')
        plt.gca().tick_params(axis='x', colors='white')
        plt.gca().tick_params(axis='y', colors='white')
        plt.gca().spines['top'].set_color('white')
        plt.gca().spines['right'].set_color('white')
        plt.gca().spines['left'].set_color('white')
        plt.gca().spines['bottom'].set_color('white')
    for step in range(5):
        plt.subplot(1,6,step+2)
        plt.imshow(fw(test_img.permute(2,0,1).unsqueeze(0),torch.tensor(step*200))[0].permute(1,2,0))
        plt.axis('off')        
    plt.tight_layout()
    
    
    if save:
        plt.savefig('{}.png'.format(schedule),
                    dpi=200,
                    bbox_inches='tight',
                    pad_inches=0.0,
                    transparent=True)