<a href="https://colab.research.google.com/github/tahamsi/computer-vision/blob/main/generative-models/DDPM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

 [![GitHub](https://badges.aleen42.com/src/github.svg)](https://github.com/tahamsi/computer-vision)

# Denoising Diffusion Probabilistic Models

## 1. Overview
Denoising Diffusion Probabilistic Models ([DDPMs](https://arxiv.org/abs/2006.11239)) are a class of generative models introduced by Jonathan Ho, Ajay Jain, and Pieter Abbeel in 2020. These models generate data by reversing a diffusion process that incrementally adds noise to the data. By learning to reverse this noising process, DDPMs can produce high-quality samples from complex data distributions.

The process can be described in two main phases: the **forward** process (**diffusion**) and the **reverse** process (**generation**).

### Forward Diffusion Process (adding noise)
The forward process is where noise is gradually added to the data over a series of time steps. It starts with an image $𝑥_0$ (e.g., a picture of a cat). And gradually adds Gaussian noise to it over $𝑇$ steps using this formula:

$$q(\mathbf{x}_t \mid \mathbf{x}_0) = \mathcal{N}(\mathbf{x}_t; \sqrt{\bar{\alpha}_t} \, \mathbf{x}_0, (1 - \bar{\alpha}_t) \mathbf{I}).$$

Or

$$ x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon.$$

where:
* $\bar{\alpha}_t = \prod_{s=1}^{t} (1 - \beta_t)$
* $\beta_t$: A small variance schedule controlling how much noise is added at step $𝑡$
* $𝑥_𝑡$: Noisy image at step $𝑡$,

* $\epsilon$: Random (Gaussian) noise (like static on a TV).

By the end ($𝑡$ = $𝑇$), the image looks like pure noise.

### Reverse Process (Denoising)
The model (a neural network) learns to reverse this noise-adding process step-by-step to recover the original image or generate a new one. The reverse process is defined as:

$$p_\theta(\mathbf{x}_{t-1} \mid \mathbf{x}_t) = \mathcal{N}(\mathbf{x}_{t-1}; \mu_\theta(\mathbf{x}_t, t), \Sigma_\theta(\mathbf{x}_t, t)).$$

Where:
* $\mu_\theta(\mathbf{x}_t, t)$: The predicted mean, parameterized by a neural network.
* $\Sigma_\theta(\mathbf{x}_t, t)$: The variance, often fixed for simplicity.

In another word, at each step, the model predicts the noise $\epsilon_θ(x_t,t)$ using a neural network and removes it:

$$x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \sqrt{1 - \alpha_t} \, \epsilon_\theta(x_t, t) \right).$$

where:
* $x_{t-1}$: The less noisy image at step $𝑡-1$,
* $\epsilon_\theta(x_t, t)$: Noise predicted by the model.

The process is repeated until a clean image is generated.


### Loss Function
To train the model, we calculate how well it predicts the noise $ϵ$. The simplified loss function is:
$$\mathcal{L}_t = \mathbb{E}_q \left[ \| \epsilon - \epsilon_\theta(x_t, t) \|^2 \right].$$

This measures the difference between:

* $\epsilon$: The actual noise added,
* $\epsilon_\theta(x_t, t)$: The noise predicted by the model.

### Full Objective
The total loss over all $𝑇$ timesteps is:

$$\mathcal{L} = \sum_{t=1}^T \mathcal{L}_t = \mathbb{E}_q \left[ \sum_{t=1}^T \| \epsilon - \epsilon_\theta(x_t, t) \|^2 \right]$$


###Training

\\(\mathbf{x}_0\\) represents the initial real and uncorrupted image, while \\(t\\) denotes the noise level sampled through the fixed forward process. \\(\mathbf{\epsilon}\\) corresponds to the true Gaussian noise sampled at time step \\(t\\), and \\(\mathbf{\epsilon}_\theta (\mathbf{x}_t, t)\\) is the output of the neural network. The network is trained to minimize the mean squared error (MSE) between the true Gaussian noise \\(\mathbf{\epsilon}\\) and its predicted counterpart \\(\mathbf{\epsilon}_\theta (\mathbf{x}_t, t)\\).

The training algorithm now looks as follows:

> **Algorithm 1**
> 1. **repeat**
> 2. $x_0 \sim q(x_0)$
> 3. $t \sim Uniform({1,...,T})$
> 4. $ ϵ ∼ \mathcal{N}(0,\mathbf{I})$
> 5. Take gradient descent step on
>   * $\nabla_θ\| \epsilon - \epsilon_\theta(\sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon, t) \|^2$
> 6. **until** converged


In other words:

* A random sample \\(\mathbf{x}_0\\) is taken from the real, unknown, and potentially complex data distribution \\(q(\mathbf{x}_0)\\).
* A noise level \\(t\\) is sampled uniformly from the range \\([1, T]\\), representing a random time step.
* Noise is sampled from a Gaussian distribution, and the input \\(\mathbf{x}_0\\) is corrupted at level \\(t\\) using the property defined earlier.
* The neural network is trained to predict the noise based on the corrupted image \\(\mathbf{x}_t\\), which is \\(\mathbf{x}_0\\) corrupted according to a predefined schedule \\(\beta_t\\).


<p align="center">
    <img src="https://raw.githubusercontent.com/tahamsi/computer-vision/refs/heads/main/images/ddpm.jpeg"/>
</p>

### The neural network
A noised image at a specific time step is provided to the neural network, and the predicted noise is returned as output. The predicted noise is represented as a tensor with the same size and resolution as the input image. Thus, the network is required to process and output tensors of identical shapes. The question arises: what type of neural network is suitable for this task?

Autoencoders are characterized by a "*bottleneck*" layer located between the encoder and decoder. The encoder compresses an image into a smaller hidden representation called the "bottleneck," while the decoder reconstructs the hidden representation back into an image. This architecture compels the network to retain only the most essential information in the bottleneck layer.

For this purpose, the DDPM authors selected a **U-Net** architecture, originally introduced by [Ronneberger et al., 2015](https://arxiv.org/abs/1505.04597), which at the time demonstrated state-of-the-art performance in medical image segmentation. Like autoencoders, the U-Net architecture includes a bottleneck in the middle to ensure that only critical information is learned by the network. Additionally, U-Net introduced residual connections between the encoder and decoder, significantly enhancing gradient flow.

<p align="center">
    <img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/unet-model.png" width="500" />
</p>

source: [huggingface](https://huggingface.co)

### Sampling

The model will be sampled during training (to track progress), the corresponding code is defined below based on Algorithm 2 in the [paper](https://arxiv.org/abs/2006.11239).


> **Algorithm 2**
> * $x_T \sim \mathcal{N}(0,\mathbf{I})$
> * `for` $t$ `in` ${T,...,1}$:
>   * `if` $t>1$:
>     * $ ϵ ∼ \mathcal{N}(0,\mathbf{I})$
>   * `else`:
>     * $ϵ = 0$
>   * $x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{1-α_t}{\sqrt{1 - \alpha_t}} \, \epsilon_\theta(x_t, t) \right) + σ_t ϵ$
> * `return` $x_0$

New images are generated from a diffusion model by reversing the diffusion process. The process begins at time step \\(T\\), where pure noise is sampled from a Gaussian distribution. The noise is then gradually denoised using the neural network, which leverages the conditional probabilities it has learned, until time step \\(t = 0\\) is reached. As described earlier, a slightly less denoised image \\(\mathbf{x}_{t-1}\\) can be derived by substituting the reparameterized mean into the noise predictor. It should be noted that the variance is predetermined. Ideally, an image is produced that appears as though it originated from the real data distribution.



### Why It Works
* The model learns to reverse the noisy process, step by step.
* By minimizing $\mathcal{L}$, it becomes better at reconstructing $x_0$ or generating new, realistic data from noise.


### Key Ideas in Simple Terms
* Forward Process: Gradually make the image noisier.
* Reverse Process: Teach the model to remove the noise step-by-step.
* Final Output: A clear image generated from random noise.


### Why It’s Powerful
* Produces high-quality images.
* Stable to train compared to other methods like GANs.
* Can handle tasks like:
 * Image generation (e.g., create art from scratch).
 *Denoising (e.g., clean up blurry or noisy images).
 *Inpainting (e.g., fill in missing parts of an image).

This simplified process is the foundation of tools like [Stable Diffusion](https://github.com/tahamsi/computer-vision/blob/main/generative-models/Image_Generation_with_Stable_Diffusion.ipynb) for generating images from text!

## 2. Implimentation in pytorch

### Before you start

Let's make sure that we have access to `GPU`. We can use `nvidia-smi` command to do that. In case of any problems navigate to `Edit -> Notebook settings -> Hardware accelerator`, set it to `GPU`, and then click `Save`.

In [None]:
!nvidia-smi

### Install required libraries

In [None]:
!pip install --upgrade einops datasets matplotlib tqdm
from IPython import display
display.clear_output()

### Import libraries

In [None]:
import math
from inspect import isfunction
from functools import partial

%matplotlib inline
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from einops import rearrange

import torch
from torch import nn, einsum
import torch.nn.functional as F

### Network helpers

First, some helper functions and classes are defined, which will be utilized in the implementation of the neural network. Notably, a `Residual` module is defined, which adds the input to the output of a specific function (i.e., a residual connection is added to the function).

In [None]:
def exists(x):
    return x is not None

def default(val, d):
    if exists(val):
        return val
    return d() if isfunction(d) else d

class Residual(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, *args, **kwargs):
        return self.fn(x, *args, **kwargs) + x

def Upsample(dim):
    return nn.ConvTranspose2d(dim, dim, 4, 2, 1)

def Downsample(dim):
    return nn.Conv2d(dim, dim, 4, 2, 1)

### Position embeddings
Since the parameters of the neural network are shared across time (noise levels), sinusoidal position embeddings are employed to encode \(t\), following the approach inspired by the Transformer ([Vaswani et al., 2017](https://arxiv.org/abs/1706.03762)). This allows the neural network to "understand" the specific time step (noise level) at which it is operating for each image in a batch.

The `SinusoidalPositionEmbeddings` module takes a tensor of shape \\((\text{batch_size}, 1)\\) as input, representing the noise levels of multiple noisy images in a batch, and transforms it into a tensor of shape \\((\text{batch_size}, \text{dim})\\), where \\(\text{dim}\\) is the dimensionality of the position embeddings. These embeddings are then added to each residual block, as will be demonstrated later.

In [None]:
class SinusoidalPositionEmbeddings(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, time):
        device = time.device
        half_dim = self.dim // 2
        embeddings = math.log(10000) / (half_dim - 1)
        embeddings = torch.exp(torch.arange(half_dim, device=device) * -embeddings)
        embeddings = time[:, None] * embeddings[None, :]
        embeddings = torch.cat((embeddings.sin(), embeddings.cos()), dim=-1)
        return embeddings

### ResNet/ConvNeXT block

Next, the core building block of the U-Net model is defined.

In [None]:
class Block(nn.Module):
    def __init__(self, dim, dim_out, groups = 8):
        super().__init__()
        self.proj = nn.Conv2d(dim, dim_out, 3, padding = 1)
        self.norm = nn.GroupNorm(groups, dim_out)
        self.act = nn.SiLU()

    def forward(self, x, scale_shift = None):
        x = self.proj(x)
        x = self.norm(x)

        if exists(scale_shift):
            scale, shift = scale_shift
            x = x * (scale + 1) + shift

        x = self.act(x)
        return x

class ResnetBlock(nn.Module):
    def __init__(self, dim, dim_out, *, time_emb_dim=None, groups=8):
        super().__init__()
        self.mlp = (
            nn.Sequential(nn.SiLU(), nn.Linear(time_emb_dim, dim_out))
            if exists(time_emb_dim)
            else None
        )

        self.block1 = Block(dim, dim_out, groups=groups)
        self.block2 = Block(dim_out, dim_out, groups=groups)
        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, time_emb=None):
        h = self.block1(x)

        if exists(self.mlp) and exists(time_emb):
            time_emb = self.mlp(time_emb)
            h = rearrange(time_emb, "b c -> b c 1 1") + h

        h = self.block2(h)
        return h + self.res_conv(x)

class ConvNextBlock(nn.Module):

    def __init__(self, dim, dim_out, *, time_emb_dim=None, mult=2, norm=True):
        super().__init__()
        self.mlp = (
            nn.Sequential(nn.GELU(), nn.Linear(time_emb_dim, dim))
            if exists(time_emb_dim)
            else None
        )

        self.ds_conv = nn.Conv2d(dim, dim, 7, padding=3, groups=dim)

        self.net = nn.Sequential(
            nn.GroupNorm(1, dim) if norm else nn.Identity(),
            nn.Conv2d(dim, dim_out * mult, 3, padding=1),
            nn.GELU(),
            nn.GroupNorm(1, dim_out * mult),
            nn.Conv2d(dim_out * mult, dim_out, 3, padding=1),
        )

        self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()

    def forward(self, x, time_emb=None):
        h = self.ds_conv(x)

        if exists(self.mlp) and exists(time_emb):
            assert exists(time_emb), "time embedding must be passed in"
            condition = self.mlp(time_emb)
            h = h + rearrange(condition, "b c -> b c 1 1")

        h = self.net(h)
        return h + self.res_conv(x)

### Attention module

Next, the attention module, which was added by the DDPM authors between the convolutional blocks, is defined. Attention is the building block of the famous Transformer architecture ([Vaswani et al., 2017](https://arxiv.org/abs/1706.03762)), which has shown great success in various domains of AI, from NLP and vision to [protein folding](https://www.deepmind.com/blog/alphafold-a-solution-to-a-50-year-old-grand-challenge-in-biology).

In [None]:
class Attention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)
        self.to_out = nn.Conv2d(hidden_dim, dim, 1)

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=1)
        q, k, v = map(
            lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv
        )
        q = q * self.scale

        sim = einsum("b h d i, b h d j -> b h i j", q, k)
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        out = einsum("b h i j, b h d j -> b h i d", attn, v)
        out = rearrange(out, "b h (x y) d -> b (h d) x y", x=h, y=w)
        return self.to_out(out)

class LinearAttention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)

        self.to_out = nn.Sequential(nn.Conv2d(hidden_dim, dim, 1),
                                    nn.GroupNorm(1, dim))

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=1)
        q, k, v = map(
            lambda t: rearrange(t, "b (h c) x y -> b h c (x y)", h=self.heads), qkv
        )

        q = q.softmax(dim=-2)
        k = k.softmax(dim=-1)

        q = q * self.scale
        context = torch.einsum("b h d n, b h e n -> b h d e", k, v)

        out = torch.einsum("b h d e, b h d n -> b h e n", context, q)
        out = rearrange(out, "b h c (x y) -> b (h c) x y", h=self.heads, x=h, y=w)
        return self.to_out(out)

### Group normalization

The convolutional and attention layers of the U-Net are interleaved with group normalization by the DDPM authors. A `PreNorm` class is defined below, which is used to apply group normalization before the attention layer, as will be demonstrated later.

In [None]:
class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.fn = fn
        self.norm = nn.GroupNorm(1, dim)

    def forward(self, x):
        x = self.norm(x)
        return self.fn(x)

### Conditional U-Net

Now that all the building blocks have been defined (position embeddings, ResNet/ConvNeXT blocks, attention, and group normalization), the entire neural network can be constructed. The purpose of the network \\(\mathbf{\epsilon}_\theta(\mathbf{x}_t, t)\\) is to process a batch of noisy images and corresponding noise levels and output the noise added to the input. Formally:

A batch of noisy images with shape `(batch_size, num_channels, height, width)` and a batch of noise levels with shape `(batch_size, 1)` are provided as input, and a tensor of shape `(batch_size, num_channels, height, width)` is returned as output.

The network is structured as follows:

* First, a convolutional layer is applied to the batch of noisy images, and position embeddings are computed for the noise levels.
* Next, a sequence of downsampling stages is applied. Each downsampling stage consists of two ResNet/ConvNeXT blocks, group normalization, attention, residual connections, and a downsampling operation.
* At the middle of the network, ResNet or ConvNeXT blocks are applied, interleaved with attention layers.
* Subsequently, a sequence of upsampling stages is applied. Each upsampling stage consists of two ResNet/ConvNeXT blocks, group normalization, attention, residual connections, and an upsampling operation.
* Finally, a ResNet/ConvNeXT block is applied, followed by a convolutional layer.

In [None]:
class Unet(nn.Module):
    def __init__(
        self,
        dim,
        init_dim=None,
        out_dim=None,
        dim_mults=(1, 2, 4, 8),
        channels=3,
        with_time_emb=True,
        resnet_block_groups=8,
        use_convnext=True,
        convnext_mult=2,
    ):
        super().__init__()

        # determine dimensions
        self.channels = channels

        init_dim = default(init_dim, dim // 3 * 2)
        self.init_conv = nn.Conv2d(channels, init_dim, 7, padding=3)

        dims = [init_dim, *map(lambda m: dim * m, dim_mults)]
        in_out = list(zip(dims[:-1], dims[1:]))

        if use_convnext:
            block_klass = partial(ConvNextBlock, mult=convnext_mult)
        else:
            block_klass = partial(ResnetBlock, groups=resnet_block_groups)

        # time embeddings
        if with_time_emb:
            time_dim = dim * 4
            self.time_mlp = nn.Sequential(
                SinusoidalPositionEmbeddings(dim),
                nn.Linear(dim, time_dim),
                nn.GELU(),
                nn.Linear(time_dim, time_dim),
            )
        else:
            time_dim = None
            self.time_mlp = None

        # layers
        self.downs = nn.ModuleList([])
        self.ups = nn.ModuleList([])
        num_resolutions = len(in_out)

        for ind, (dim_in, dim_out) in enumerate(in_out):
            is_last = ind >= (num_resolutions - 1)

            self.downs.append(
                nn.ModuleList(
                    [
                        block_klass(dim_in, dim_out, time_emb_dim=time_dim),
                        block_klass(dim_out, dim_out, time_emb_dim=time_dim),
                        Residual(PreNorm(dim_out, LinearAttention(dim_out))),
                        Downsample(dim_out) if not is_last else nn.Identity(),
                    ]
                )
            )

        mid_dim = dims[-1]
        self.mid_block1 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)
        self.mid_attn = Residual(PreNorm(mid_dim, Attention(mid_dim)))
        self.mid_block2 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)

        for ind, (dim_in, dim_out) in enumerate(reversed(in_out[1:])):
            is_last = ind >= (num_resolutions - 1)

            self.ups.append(
                nn.ModuleList(
                    [
                        block_klass(dim_out * 2, dim_in, time_emb_dim=time_dim),
                        block_klass(dim_in, dim_in, time_emb_dim=time_dim),
                        Residual(PreNorm(dim_in, LinearAttention(dim_in))),
                        Upsample(dim_in) if not is_last else nn.Identity(),
                    ]
                )
            )

        out_dim = default(out_dim, channels)
        self.final_conv = nn.Sequential(
            block_klass(dim, dim), nn.Conv2d(dim, out_dim, 1)
        )

    def forward(self, x, time):
        x = self.init_conv(x)

        t = self.time_mlp(time) if exists(self.time_mlp) else None

        h = []

        # downsample
        for block1, block2, attn, downsample in self.downs:
            x = block1(x, t)
            x = block2(x, t)
            x = attn(x)
            h.append(x)
            x = downsample(x)

        # bottleneck
        x = self.mid_block1(x, t)
        x = self.mid_attn(x)
        x = self.mid_block2(x, t)

        # upsample
        for block1, block2, attn, upsample in self.ups:
            x = torch.cat((x, h.pop()), dim=1)
            x = block1(x, t)
            x = block2(x, t)
            x = attn(x)
            x = upsample(x)

        return self.final_conv(x)

### Defining the forward diffusion process

The forward diffusion process is designed to gradually add noise to an image from the real distribution over a series of time steps \\(T\\). This process is governed by a variance schedule. A linear schedule was employed by the original DDPM authors, as follows:

>The forward process variances are set to constants that increase linearly from \\(\beta_1 = 10^{-4}\\) to \\(\beta_T = 0.02\\).

Various schedules for the \\(T\\) time steps are defined below, along with corresponding variables, such as cumulative variances, which will be required.

In [None]:
def cosine_beta_schedule(timesteps, s=0.008):

    steps = timesteps + 1
    x = torch.linspace(0, timesteps, steps)
    alphas_cumprod = torch.cos(((x / timesteps) + s) / (1 + s) * torch.pi * 0.5) ** 2
    alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
    betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
    return torch.clip(betas, 0.0001, 0.9999)

def linear_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    return torch.linspace(beta_start, beta_end, timesteps)

def quadratic_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    return torch.linspace(beta_start**0.5, beta_end**0.5, timesteps) ** 2

def sigmoid_beta_schedule(timesteps):
    beta_start = 0.0001
    beta_end = 0.02
    betas = torch.linspace(-6, 6, timesteps)
    return torch.sigmoid(betas) * (beta_end - beta_start) + beta_start

To begin, the linear schedule is used for \\(T=200\\) time steps, and the various variables derived from \\(\beta_t\\), such as the cumulative product of the variances \\(\bar{\alpha}_t\\), are defined. Each of these variables is represented as a one-dimensional tensor, storing values from \\(t\\) to \\(T\\). Additionally, an extract function is defined to enable the extraction of the appropriate \\(t\\) index for a given batch of indices.

In [None]:
timesteps = 200

# define beta schedule
betas = linear_beta_schedule(timesteps=timesteps)

# define alphas
alphas = 1. - betas
alphas_cumprod = torch.cumprod(alphas, axis=0)
alphas_cumprod_prev = F.pad(alphas_cumprod[:-1], (1, 0), value=1.0)
sqrt_recip_alphas = torch.sqrt(1.0 / alphas)

# calculations for diffusion q(x_t | x_{t-1}) and others
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)

# calculations for posterior q(x_{t-1} | x_t, x_0)
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)

def extract(a, t, x_shape):
    batch_size = t.shape[0]
    out = a.gather(-1, t.cpu())
    return out.reshape(batch_size, *((1,) * (len(x_shape) - 1))).to(t.device)

In [None]:
from PIL import Image
import requests

url = 'https://raw.githubusercontent.com/tahamsi/computer-vision/refs/heads/main/images/London_bridge.jpg'
image = Image.open(requests.get(url, stream=True).raw)
image

Noise is added to PyTorch tensors rather than Pillow images. To facilitate this, image transformations are defined to convert a PIL image into a PyTorch tensor (on which noise can be added) and back to a PIL image.

These transformations are relatively straightforward: images are first normalized by dividing by \\(255\\) to bring them into the \\([0, 1]\\) range and then scaled to the \\([-1, 1]\\) range. From the DDPM paper:

> Image data is assumed to consist of integers in \\({0, 1, ..., 255}\\) scaled linearly to \\([-1, 1]\\). This ensures that the neural network reverse process operates on consistently scaled inputs, starting from the standard normal prior \\(p(\mathbf{x}_T)\\).

In [None]:
from torchvision.transforms import Compose, ToTensor, Lambda, ToPILImage, CenterCrop, Resize

image_size = 128
transform = Compose([
    Resize(image_size),
    CenterCrop(image_size),
    ToTensor(), # turn into Numpy array of shape HWC, divide by 255
    Lambda(lambda t: (t * 2) - 1),

])

x_start = transform(image).unsqueeze(0)
x_start.shape

In [None]:
import numpy as np

reverse_transform = Compose([
     Lambda(lambda t: (t + 1) / 2),
     Lambda(lambda t: t.permute(1, 2, 0)), # CHW to HWC
     Lambda(lambda t: t * 255.),
     Lambda(lambda t: t.numpy().astype(np.uint8)),
     ToPILImage(),
])

In [None]:
reverse_transform(x_start.squeeze())

In [None]:
# forward diffusion
def q_sample(x_start, t, noise=None):
    if noise is None:
        noise = torch.randn_like(x_start)

    sqrt_alphas_cumprod_t = extract(sqrt_alphas_cumprod, t, x_start.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(
        sqrt_one_minus_alphas_cumprod, t, x_start.shape
    )

    return sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise

In [None]:
def get_noisy_image(x_start, t):
  # add noise
  x_noisy = q_sample(x_start, t=t)

  # turn back into PIL image
  noisy_image = reverse_transform(x_noisy.squeeze())

  return noisy_image

In [None]:
# take time step
t = torch.tensor([40])

get_noisy_image(x_start, t)

###Visualisation

In [None]:
import matplotlib.pyplot as plt

# use seed for reproducability
torch.manual_seed(0)

def plot(imgs, with_orig=False, row_title=None, **imshow_kwargs):
    if not isinstance(imgs[0], list):
        # Make a 2d grid even if there's just 1 row
        imgs = [imgs]

    num_rows = len(imgs)
    num_cols = len(imgs[0]) + with_orig
    fig, axs = plt.subplots(figsize=(200,200), nrows=num_rows, ncols=num_cols, squeeze=False)
    for row_idx, row in enumerate(imgs):
        row = [image] + row if with_orig else row
        for col_idx, img in enumerate(row):
            ax = axs[row_idx, col_idx]
            ax.imshow(np.asarray(img), **imshow_kwargs)
            ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])

    if with_orig:
        axs[0, 0].set(title='Original image')
        axs[0, 0].title.set_size(8)
    if row_title is not None:
        for row_idx in range(num_rows):
            axs[row_idx, 0].set(ylabel=row_title[row_idx])

    plt.tight_layout()

In [None]:
plot([get_noisy_image(x_start, torch.tensor([t])) for t in [0, 50, 100, 150, 199]])

In [None]:
def p_losses(denoise_model, x_start, t, noise=None, loss_type="l1"):
    if noise is None:
        noise = torch.randn_like(x_start)

    x_noisy = q_sample(x_start=x_start, t=t, noise=noise)
    predicted_noise = denoise_model(x_noisy, t)

    if loss_type == 'l1':
        loss = F.l1_loss(noise, predicted_noise)
    elif loss_type == 'l2':
        loss = F.mse_loss(noise, predicted_noise)
    elif loss_type == "huber":
        loss = F.smooth_l1_loss(noise, predicted_noise)
    else:
        raise NotImplementedError()

    return loss

The `denoise_model` will be our U-Net defined above. We'll employ the Huber loss between the true and the predicted noise.

### Training

Each image is resized to a uniform size. Notably, images are also randomly flipped horizontally. As stated in the paper:

> Random horizontal flips were used during training for `CIFAR10`. Training was conducted both with and without flips, and it was observed that flips slightly improved sample quality.

In [None]:
from datasets import load_dataset

# load dataset from the hub
dataset = load_dataset("fashion_mnist")
image_size = 28
channels = 1
batch_size = 128
display.clear_output()

Next, a function is defined to be applied on-the-fly to the entire dataset. The `with_transform` [functionality](https://huggingface.co/docs/datasets/v2.2.1/en/package_reference/main_classes#datasets.Dataset.with_transform) is utilized for this purpose. This function performs basic image preprocessing, including random horizontal flips, rescaling, and scaling the values to the \([-1, 1]\) range.

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader

# define image transformations (e.g. using torchvision)
transform = Compose([
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Lambda(lambda t: (t * 2) - 1)
])

# define function
def transforms(examples):
   examples["pixel_values"] = [transform(image.convert("L")) for image in examples["image"]]
   del examples["image"]

   return examples

transformed_dataset = dataset.with_transform(transforms).remove_columns("label")

# create dataloader
dataloader = DataLoader(transformed_dataset["train"], batch_size=batch_size, shuffle=True)

In [None]:
batch = next(iter(dataloader))
print(batch.keys())

### Sampling

The code below implements this.

In [None]:
@torch.no_grad()
def p_sample(model, x, t, t_index):
    betas_t = extract(betas, t, x.shape)
    sqrt_one_minus_alphas_cumprod_t = extract(
        sqrt_one_minus_alphas_cumprod, t, x.shape
    )
    sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)

    # Equation 11 in the paper
    # Use our model (noise predictor) to predict the mean
    model_mean = sqrt_recip_alphas_t * (
        x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t
    )

    if t_index == 0:
        return model_mean
    else:
        posterior_variance_t = extract(posterior_variance, t, x.shape)
        noise = torch.randn_like(x)
        # Algorithm 2 line 4:
        return model_mean + torch.sqrt(posterior_variance_t) * noise

# Algorithm 2 but save all images:
@torch.no_grad()
def p_sample_loop(model, shape):
    device = next(model.parameters()).device

    b = shape[0]
    # start from pure noise (for each example in the batch)
    img = torch.randn(shape, device=device)
    imgs = []

    for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps):
        img = p_sample(model, img, torch.full((b,), i, device=device, dtype=torch.long), i)
        imgs.append(img.cpu().numpy())
    return imgs

@torch.no_grad()
def sample(model, image_size, batch_size=16, channels=3):
    return p_sample_loop(model, shape=(batch_size, channels, image_size, image_size))

In [None]:
from pathlib import Path

def num_to_groups(num, divisor):
    groups = num // divisor
    remainder = num % divisor
    arr = [divisor] * groups
    if remainder > 0:
        arr.append(remainder)
    return arr

results_folder = Path("./results")
results_folder.mkdir(exist_ok = True)
save_and_sample_every = 1000

In [None]:
from torch.optim import Adam

device = "cuda" if torch.cuda.is_available() else "cpu"

model = Unet(
    dim=image_size,
    channels=channels,
    dim_mults=(1, 2, 4,)
)
model.to(device)

optimizer = Adam(model.parameters(), lr=1e-3)

In [None]:
from torchvision.utils import save_image

epochs = 5

for epoch in range(epochs):
    for step, batch in enumerate(dataloader):
      optimizer.zero_grad()

      batch_size = batch["pixel_values"].shape[0]
      batch = batch["pixel_values"].to(device)

      # Algorithm 1 line 3: sample t uniformally for every example in the batch
      t = torch.randint(0, timesteps, (batch_size,), device=device).long()

      loss = p_losses(model, batch, t, loss_type="huber")

      if step % 100 == 0:
        print("Loss:", loss.item())

      loss.backward()
      optimizer.step()

      # save generated images
      if step != 0 and step % save_and_sample_every == 0:
        milestone = step // save_and_sample_every
        batches = num_to_groups(4, batch_size)
        all_images_list = list(map(lambda n: sample(model, batch_size=n, channels=channels), batches))
        all_images = torch.cat(all_images_list, dim=0)
        all_images = (all_images + 1) * 0.5
        save_image(all_images, str(results_folder / f'sample-{milestone}.png'), nrow = 6)

### Inference

In [None]:
batch_size=64
samples = sample(model, image_size=image_size, batch_size=batch_size, channels=channels)

In [None]:
# show a random one
random_index = np.random.choice(batch_size)
plt.imshow(samples[-1][random_index].reshape(image_size, image_size, channels), cmap="gray")

In [None]:
import matplotlib.animation as animation
from IPython.display import Image

random_index = np.random.choice(batch_size)

fig = plt.figure()
ims = []
for i in range(timesteps):
    im = plt.imshow(samples[i][random_index].reshape(image_size, image_size, channels), cmap="gray", animated=True)
    ims.append([im])

animate = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=1000)
animate.save('diffusion.gif')
plt.show()
Image(open('diffusion.gif','rb').read())

##2. Huggingface Diffusers

### Overview

Without delving too deeply into the details, the model is typically not trained to directly predict a slightly less noisy image. Instead, it is trained to predict the "noise residual," which represents the difference between a less noisy image and the input image (as in the diffusion model called "DDPM"), or alternatively, the gradient between two time steps (as in the diffusion model called "Score VE").

To perform the denoising process, a specific noise scheduling algorithm is required. This algorithm is used to "wrap" the model, defining the number of diffusion steps needed for inference and determining how to compute a less noisy image from the model's output. This is where the various schedulers from the diffusers library are utilized.

Lastly, a **pipeline** combines a **model** and a **scheduler**, simplifying the process for end-users to execute a complete denoising loop. The focus will begin with pipelines, exploring their implementation, before examining models and schedulers in greater detail.

### Install `diffusers`

In [None]:
!pip install diffusers
from IPython import display
display.clear_output()

### Pipelines

The process is initiated with the import of a pipeline. The google/ddpm-celebahq-256 model, which was collaboratively developed by Google and U.C. Berkeley, is utilized. This model follows the DDPM algorithm and has been trained on a dataset of celebrity images.

The `DDPMPipeline` is imported to enable inference with minimal code. The `from_pretrained()` method is then used to download the model and its configuration from [the Hugging Face Hub](https://huggingface.co/google/ddpm-celebahq-256), a repository hosting over 60,000 community-contributed models.

In [None]:
from diffusers import DDPMPipeline
image_pipe = DDPMPipeline.from_pretrained("google/ddpm-celebahq-256")
image_pipe.to("cuda")
display.clear_output()

To generate an image, the pipeline is run without requiring any input. A random initial noise sample is automatically generated, and the diffusion process is iterated. A dictionary containing the generated `sample` of interest is returned as output. On Google Colab, this process typically requires 2–3 minutes.

In [None]:
images = image_pipe().images
images[0]

Let’s explore the components that make up the pipeline:

In [None]:
image_pipe

Inside the pipeline, a scheduler and a U-Net model can be observed. A closer examination of these components and the operations performed by the pipeline behind the scenes is provided.

### Models

Instances of the model class are defined as neural networks that take a noisy `sample` and a `timestep` as inputs to predict a less noisy output sample. A pre-trained model is loaded and explored to gain an understanding of the model's API.

An unconditional image generation model of type `UNet2DModel` is loaded, and another checkpoint trained on church images, [`google/ddpm-ema-cat-256`](https://huggingface.co/google/ddpm-ema-cat-256), is examined.

Similar to the pipeline class, the model configuration and weights can be loaded with a single line using the `from_pretrained()` method, which may already be familiar from the `transformers` library.

In [None]:
from diffusers import UNet2DModel
repo_id = "google/ddpm-ema-cat-256"
model = UNet2DModel.from_pretrained(repo_id)
display.clear_output()

Now, let’s examine the model’s configuration. The configuration is accessible through the `config` attribute and displays all the essential parameters required to define the model architecture (and nothing more).

In [None]:
model.config


The model configuration is presented as a frozen dictionary. This design ensures that the configuration is only used to define the model architecture during instantiation and is not modified for any attributes during inference.

Several key configuration parameters are:

* `sample_size`: Specifies the height and width dimensions of the input sample.
* `in_channels`: Indicates the number of input channels in the input sample.
* `down_block_types` and `up_block_types`: Define the types of downsampling and upsampling blocks used to construct the UNet architecture, as depicted earlier in the notebook.
* `block_out_channels`: Specifies the number of output channels for the downsampling blocks, which are also used in reversed order for the input channels of the upsampling blocks.
* `layers_per_block`: Defines the number of ResNet blocks included in each UNet block.

With an understanding of the UNet configuration, the same model architecture can be instantiated with random weights. To achieve this, the configuration can be passed as an unpacked dictionary to the `UNet2DModel` class.

In [None]:
model_random = UNet2DModel(**model.config)

The statement above indicates that a randomly initialized model has been created with the same configuration as the previous one.

To save the model that has just been created, the `save_pretrained()` method can be used. This method saves both the model's weights and its configuration in the specified folder.









In [None]:
model_random.save_pretrained("my_model")

In [None]:
!ls my_model

`diffusion_pytorch_model.bin` is a binary PyTorch file that stores the model weights and `config.json` stores the model's configuration.

If you want to reuse the model, you can simply use the `from_pretrained()` method again, as it loads local checkpoints as well as those present on the Hub.

In [None]:
model_random = UNet2DModel.from_pretrained("my_model")

Returning to the trained model, the process of using it for inference can now be explored. To begin, a random Gaussian sample is required with the shape of an image (`batch_size` \\(\times\\) `in_channels` \\(\times\\) `sample_size` \\(\times\\) `sample_size`). The batch axis represents the model's ability to process multiple random noise samples simultaneously. The `channel` axis accounts for the multiple channels in each sample (e.g., red, green, blue for RGB images). Finally, the `sample_size` corresponds to the height and width of the image.

In [None]:
import torch

torch.manual_seed(0)

noisy_sample = torch.randn(
    1, model.config.in_channels, model.config.sample_size, model.config.sample_size
)
noisy_sample.shape

### Inference

The noisy sample can be passed through the model along with a `timestep`. The timestep is crucial as it informs the model about "how noisy" the input image is—more noisy at the beginning of the process and less noisy toward the end—enabling the model to determine its position in the diffusion process.

As discussed in the introduction, the model predicts either the slightly less noisy image, the difference between the slightly less noisy image and the input image, or another related quantity. It is essential to review [`google/ddpm-ema-cat-256`](https://huggingface.co/google/ddpm-ema-cat-256) carefully to understand what the model has been trained to predict. In this specific case, the model predicts the noise residual, which is the difference between the slightly less noisy image and the input image.

In [None]:
with torch.no_grad():
    noisy_residual = model(sample=noisy_sample, timestep=2).sample

The predicted `noisy_residual` has the exact same shape as the input and we use it to compute a slightly less noised image. Let's confirm the output shapes match:

In [None]:
noisy_residual.shape

### Summary

**models**, such as `UNet2DModel` are parameterized neural networks trained to *predict* a slightly less noisy image or residual. They are defined by their `.config` and can be loaded from the Hub as well as saved and loaded locally. The next step is learning how to combine this **model** with the correct **scheduler** to be able to actually generate images.

### Schedulers

**Schedulers** are algorithms encapsulated within Python classes. They are used to define the noise schedule applied to the model during training and to compute the slightly less noisy sample based on the model output (in this case, the `noisy_residual`). This notebook focuses exclusively on the use of scheduler classes for inference. For details on using schedulers during training, another notebook can be referenced.

It should be emphasized that, unlike models, which contain trainable weights, schedulers are typically parameter-free (meaning they do not possess trainable weights). Instead, they define the algorithm for computing the slightly less noisy sample. As a result, schedulers do not inherit from `torch.nn.Module` but, like models, are instantiated using a configuration. To download a scheduler configuration from the Hub, the `from_config()` method can be used to load the configuration and instantiate a scheduler.

In [None]:
from diffusers import DDPMScheduler

scheduler = DDPMScheduler.from_config(repo_id)
display.clear_output()

In [None]:
scheduler.config

Here are the most important parameters:
- `num_train_timesteps` defines the length of the denoising process, e.g. how many timesteps are need to process random gaussian noise to a data sample.
- `beta_schedule` define the type of noise schedule that shall be used for inference and training
- `beta_start` and `beta_end` define the smallest noise value and highest noise value of the schedule.

Like the *models*, *schedulers* can be saved and loaded with `save_config()` and `from_config()`.


In [None]:
scheduler.save_config("my_scheduler")
new_scheduler = DDPMScheduler.from_config("my_scheduler")

All schedulers provide one or multiple `step()` methods that can be used to compute the slightly less noisy image. The `step()` method may vary from one scheduler to another, but normally expects at least the model output, the `timestep` and the current `noisy_sample`. Note that the `step()` method is somewhat of a black box function that "just works".

In [None]:
less_noisy_sample = scheduler.step(
    model_output=noisy_residual, timestep=2, sample=noisy_sample
).prev_sample
less_noisy_sample.shape

In [None]:
import PIL.Image
import numpy as np
from IPython.display import display


def display_sample(sample, i):
    image_processed = sample.cpu().permute(0, 2, 3, 1)
    image_processed = (image_processed + 1.0) * 127.5
    image_processed = image_processed.numpy().astype(np.uint8)

    image_pil = PIL.Image.fromarray(image_processed[0])
    display(f"Image at step {i}")
    display(image_pil)

In [None]:
model.to("cuda")
noisy_sample = noisy_sample.to("cuda")

It is now time to define the denoising loop, which is straightforward for DDPM:

1. The residual of the less noisy sample is predicted using the model.
2. The less noisy sample is computed using the scheduler.

Additionally, progress is displayed at every 50th step.

It is important to note that the loop iterates over `scheduler.timesteps`, a tensor that defines the sequence of timesteps for the denoising process. Typically, the process proceeds in decreasing order of timesteps, starting from the total number of timesteps (e.g., 1000) and ending at 0. Depending on the GPU, this process may take up to a minute—enough time to reflect on what has been learned so far while watching a church being constructed from pure noise.


In [None]:
import tqdm

sample = noisy_sample

for i, t in enumerate(tqdm.tqdm(scheduler.timesteps)):
  # 1. predict noise residual
  with torch.no_grad():
      residual = model(sample, t).sample

  # 2. compute less noisy image and set x_t -> x_t-1
  sample = scheduler.step(residual, t, sample).prev_sample

  # 3. optionally look at image
  if (i + 1) % 100 == 0:
      display_sample(sample, i + 1)

It can be observed that it takes a significant amount of time—approximately 800 steps—to begin seeing a somewhat meaningful shape. While the quality of the generated image is quite impressive, you might want to consider ways to speed up the image generation process.