<a href="https://colab.research.google.com/github/melanieyes/diffusion-model/blob/main/Scalar_Denoising_Diffusion_Probabilistic_Models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import numpy as np
import torch.optim as optim
import matplotlib.pyplot as plt

# fixate the seed number so i'll have a consistent result
torch.manual_seed(42)
np.random.seed(42)

In [None]:
# root number
x0 = torch.tensor([0.5])

In [None]:
noise = torch.randn_like(x0)
noise

tensor([0.3367])

In [None]:
# betas scheduling
T = 4
beta_start, beta_end = 0.0001, 0.02
betas = torch.linspace(beta_start, beta_end, T)
alphas = 1 - betas
alphas_cumprod = torch.cumprod(alphas, dim=0)
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1 - alphas_cumprod)

In [None]:
alphas

tensor([0.9999, 0.9933, 0.9866, 0.9800])

In [None]:
alphas_cumprod

tensor([0.9999, 0.9932, 0.9799, 0.9603])

In [None]:
sqrt_alphas_cumprod

tensor([0.9999, 0.9966, 0.9899, 0.9799])

In [None]:
sqrt_one_minus_alphas_cumprod

tensor([0.0100, 0.0827, 0.1418, 0.1993])

In [None]:
t = 1
x_1 = sqrt_alphas_cumprod[t] * x0 + sqrt_one_minus_alphas_cumprod[t] * noise
x_1

tensor([0.5261])

In [None]:
# Model linear regression: input (x_t, t) --> output: epsilon_pred
class LinearModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(2, 1)  # input: [x_t, t_norm]

    def forward(self, x_t, t):
        t_norm = t.float().unsqueeze(1) / T
        x = torch.cat([x_t, t_norm], dim=1)
        return self.linear(x)

model = LinearModel()

In [None]:
model.state_dict()

OrderedDict([('linear.weight', tensor([[-0.1549,  0.1427]])),
             ('linear.bias', tensor([-0.3443]))])

In [None]:
loss_fn = nn.MSELoss()

pred_eps = model(x_1.unsqueeze(1), torch.tensor([t]).long())
loss = loss_fn(pred_eps.squeeze(0), noise)
pred_eps, loss

(tensor([[-0.3901]], grad_fn=<AddmmBackward0>),
 tensor(0.5282, grad_fn=<MseLossBackward0>))

In [None]:
optimizer = optim.Adam(model.parameters(), lr=1e-2)
optimizer.zero_grad()
loss.backward()
optimizer.step()

In [None]:
model.state_dict()

OrderedDict([('linear.weight', tensor([[-0.1449,  0.1527]])),
             ('linear.bias', tensor([-0.3343]))])

In [None]:
# training model
steps = 500
for step in range(steps):
    t = torch.randint(0, T, (1,)).long()
    eps = torch.randn_like(x0)
    x_t = sqrt_alphas_cumprod[t] * x0 + sqrt_one_minus_alphas_cumprod[t] * eps

    pred_eps = model(x_t.unsqueeze(1), t)
    loss = loss_fn(pred_eps.squeeze(0), eps)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (step+1) % 10 == 0:
        print(f"Step {step+1}, Loss: {loss.item():.6f}")

# Forward process
forward_trajectory = []
for t in range(T):
    eps = torch.randn_like(x0)
    x_t = sqrt_alphas_cumprod[t] * x0 + sqrt_one_minus_alphas_cumprod[t] * eps
    forward_trajectory.append(x_t.item())

Step 10, Loss: 0.152132
Step 20, Loss: 0.012964
Step 30, Loss: 0.016697
Step 40, Loss: 0.029058
Step 50, Loss: 0.300508
Step 60, Loss: 0.101107
Step 70, Loss: 2.357594
Step 80, Loss: 0.031770
Step 90, Loss: 0.124409
Step 100, Loss: 0.677176
Step 110, Loss: 0.000328
Step 120, Loss: 0.928795
Step 130, Loss: 1.923289
Step 140, Loss: 1.141241
Step 150, Loss: 0.027901
Step 160, Loss: 0.996438
Step 170, Loss: 1.365081
Step 180, Loss: 0.193380
Step 190, Loss: 3.453658
Step 200, Loss: 2.055219
Step 210, Loss: 0.119034
Step 220, Loss: 0.197162
Step 230, Loss: 0.512264
Step 240, Loss: 0.075301
Step 250, Loss: 1.417635
Step 260, Loss: 0.130039
Step 270, Loss: 0.188805
Step 280, Loss: 0.782253
Step 290, Loss: 0.544903
Step 300, Loss: 0.854366
Step 310, Loss: 0.001799
Step 320, Loss: 0.583609
Step 330, Loss: 3.155837
Step 340, Loss: 0.808562
Step 350, Loss: 0.004719
Step 360, Loss: 1.035679
Step 370, Loss: 0.200521
Step 380, Loss: 0.255355
Step 390, Loss: 0.173701
Step 400, Loss: 0.099638
Step 410,

In [None]:
model.state_dict()

OrderedDict([('linear.weight', tensor([[ 0.6428, -0.0184]])),
             ('linear.bias', tensor([-0.3795]))])

In [None]:
# Sampling from x_T
x_t = torch.tensor([forward_trajectory[-1]])
reverse_trajectory = [x_t.item()]

for t in reversed(range(T)):
    t_tensor = torch.tensor([t]).long()
    alpha_t = alphas[t]
    alpha_bar_t = alphas_cumprod[t]
    sqrt_one_minus_alpha_bar = torch.sqrt(1 - alpha_bar_t)
    sqrt_recip_alpha = torch.sqrt(1 / alpha_t)

    eps_theta = model(x_t.unsqueeze(0), t_tensor).squeeze()
    mean = sqrt_recip_alpha * (x_t - (1 - alpha_t) / sqrt_one_minus_alpha_bar * eps_theta)

    if t > 0:
        noise = torch.randn_like(x_t)
        sigma = torch.sqrt(betas[t])
        x_t = mean + sigma * noise
    else:
        x_t = mean

    reverse_trajectory.append(x_t.item())
    print(x_t.item())

0.3436119556427002
0.150761678814888
0.1634775847196579
0.16623036563396454
