# Diffusion Models (DDPM)
Notebook created by [Pol Caselles Rico](https://www.linkedin.com/in/pcaselles/) for the Postgraduate course in artificial intelligence with deep learning in UPC School (2023). Minor contributions by [Laia Tarrés](https://www.linkedin.com/in/laia-tarres-9a5369138/) during 2023.

In this notebook you will learn about Diffusion Models by implementing a DDPM to generate images from noise.

In [None]:
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.utils import make_grid
from torch.utils.data import DataLoader, Dataset
import math
import tqdm
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch import nn
from typing import Dict, List, Tuple

In [None]:
seed = 22
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)

# Understanding Diffusion Models

A diffusion model transforms noise sampled from a known distribution into a data sample. The network is trained to gradually remove noise, starting from an initial state of pure noise.

<p align="center">
    <img src="https://drive.google.com/uc?id=12tpb3-3KwP4IVaRh7iQTOjPDc8AeJl3x" width="300" />
</p>

Denoising diffusion models involve two essential processes:

* **Forward Diffusion Process:** This process incrementally introduces noise to the input data.
* **Reverse Denoising Process:** The model learns to generate data by effectively denoising the input.

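For simplicity, the noise is usually modeled as isotropic Gaussian. Recall that the sum of independent Gaussian random variables is again Gaussian, with the means and the variances adding, so many small noising steps can be collapsed into a single Gaussian step.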
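
The additivity of variances can be checked empirically. The snippet below is a minimal sketch that uses only the Python standard library; the standard deviations 0.6 and 0.8, the sample size and the seed are arbitrary choices made for illustration.

```python
import random

random.seed(0)  # fixed seed so the run is repeatable

n = 50_000
# Two independent zero-mean Gaussians with standard deviations 0.6 and 0.8.
a = [random.gauss(0.0, 0.6) for _ in range(n)]
b = [random.gauss(0.0, 0.8) for _ in range(n)]
total = [x + y for x, y in zip(a, b)]

# For independent Gaussians the variances add: 0.6**2 + 0.8**2 = 1.0.
mean_total = sum(total) / n
var_total = sum((v - mean_total) ** 2 for v in total) / n

print(f"empirical mean of the sum:     {mean_total:.3f} (theory: 0.000)")
print(f"empirical variance of the sum: {var_total:.3f} (theory: 1.000)")
```

The same property is what later allows a noisy sample at any timestep to be drawn in one shot instead of step by step.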


<p align="center">
    <img src="https://drive.google.com/uc?id=1t5dUyJwgy2ZpDAqHXw7GhUAp2FE5BWHA" width="600" />
</p>


Transitioning from \\(\mathbf{x}_{t-1}\\) to \\(\mathbf{x}_{t}\\) is straightforward, since it involves the addition of Gaussian noise computed in closed form. However, the challenge is in the reverse direction: learning to distinguish between noise and the original structure requires an understanding of the underlying data distribution.

To illustrate this point, consider the example of modeling faces. When presented with a noised image, identifying the modified pixels necessitates knowledge of facial features.

Given the inherent difficulty, we use the capabilities of deep learning models to discern this underlying structure. The core concept involves taking a dataset of ground truth (GT) images, applying the forward diffusion process (adding noise), and self-supervising our model to effectively reverse this process. For clarity, we specify the number of steps (or states) \\(T\\) in our diffusion process. The workflow involves:


1.   sampling a noised image,
2.   forwarding it to our denoiser model,
3.   removing the noise,
4.   and repeating steps (2) and (3) until arriving at the final image.



Conceptually, we model this intricate process as a Markov process, emphasizing its dependency solely on the current state. This implies that predicting the applied noise relies only on the information within the noised image.

Because diffusion models are data-intensive and costly to train, we keep things simple and train ours on the small MNIST dataset:


In [None]:
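# Use the GPU when one is available, otherwise fall back to the CPU (much slower)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')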

In [None]:
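# Download and prepare the dataset.
# ToTensor maps pixels to [0, 1]; Normalize([0.5], [0.5]) then rescales them to [-1, 1].
# The pipeline gets its own name so that it does not shadow the `transforms` module.
mnist_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.5],[0.5])
    ])

train_data: Dataset = torchvision.datasets.MNIST(
    root='./content/data/',
    train=True,
    transform=mnist_transform,
    download=True
  )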

test_data: Dataset = torchvision.datasets.MNIST(
    root='./content/data/',
    train= False,
    transform=None,
    download= True,

  )

# Forward Diffusion and Variance Schedules

The forward diffusion process adds noise to an image sampled from the real distribution over a fixed number of time steps \\(T\\). The amount of noise added at each step is set by a variance schedule. Starting from \\(\mathbf{x}_0\\), the process produces \\(\mathbf{x}_1, ..., \mathbf{x}_t, ..., \mathbf{x}_T\\), where \\(\mathbf{x}_T\\) is essentially pure Gaussian noise when the schedule is appropriately configured.

Training and sampling repeatedly need quantities derived from the betas, such as cumulative products and square roots. To avoid recomputing them at every step, we compute them once and store them in a dictionary, which we refer to as "ddpm_schedules."

In [None]:
def ddpm_schedules(betas: torch.Tensor) -> Dict[str, torch.Tensor]:
    """
    Returns the pre-computed schedules used for DDPM training and sampling.
    """
    alphas = 1. - betas
    alphas_cumprod = torch.cumprod(alphas, dim=-1)
    sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
    sqrt_one_minus_alphas_cumprod = torch.sqrt(1.0 - alphas_cumprod)

    return {
        "alphas": alphas,
        "betas": betas,
        "alphas_cumprod": alphas_cumprod,
        "sqrt_alphas_cumprod": sqrt_alphas_cumprod,
        "sqrt_one_minus_alphas_cumprod": sqrt_one_minus_alphas_cumprod,
    }
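
To make the arithmetic behind these schedules concrete, here is a small worked example in plain Python. It mirrors the computations of `ddpm_schedules` on a three-step toy schedule; the beta values are assumptions picked for readability and are far larger than those of a real schedule.

```python
import math
from itertools import accumulate
from operator import mul

betas = [0.1, 0.2, 0.3]                         # toy values, for illustration only
alphas = [1.0 - b for b in betas]               # approximately [0.9, 0.8, 0.7]
alphas_cumprod = list(accumulate(alphas, mul))  # approximately [0.9, 0.72, 0.504]

sqrt_alphas_cumprod = [math.sqrt(a) for a in alphas_cumprod]
sqrt_one_minus_alphas_cumprod = [math.sqrt(1.0 - a) for a in alphas_cumprod]

for t, (sig, noi) in enumerate(zip(sqrt_alphas_cumprod, sqrt_one_minus_alphas_cumprod)):
    # The squared coefficients always sum to 1: sig**2 + noi**2 == 1.
    print(f"t={t}: signal coeff={sig:.4f}, noise coeff={noi:.4f}")
```

The signal coefficient shrinks and the noise coefficient grows with every step, which is exactly the trade-off the schedule controls.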

The beta schedulers define the amount of noise applied at each timestep \\(t\\) during the forward process.

The quantity of noise introduced at each step influences the generation process. The original authors of [DDPM](https://proceedings.neurips.cc/paper/2020/file/4c5bcfec8584af0d967f1ab10179ca4b-Paper.pdf) utilized a linear schedule for this purpose. However, in this lab, we will also implement the cosine beta scheduler to explore and compare the differences between the two approaches.




In [None]:
def linear_beta_schedule(
      T: int = 500,
      beta1: float = 1e-4,
      beta2: float = 0.02
      ) -> torch.Tensor:
    """
    Linear schedule, as proposed in the original DDPM paper.

    beta1 and beta2 are the endpoints for T = 1000. They are rescaled by
    1000 / T so that shorter schedules still reach a high noise level.
    """
    timesteps = T
    scale = 1000 / timesteps
    beta_start = scale * beta1
    beta_end = scale * beta2
    betas = torch.linspace(
        beta_start,
        beta_end,
        timesteps,
        dtype=torch.float32)
    return betas


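def cosine_beta_schedule(
      T: int,
      s: float = 0.008
      ) -> torch.Tensor:
    """
    Cosine schedule,
    as proposed in https://openreview.net/forum?id=-NEXDKk8gZ
    """
    # T + 1 evenly spaced points 0, 1, ..., T
    steps = torch.linspace(0, T, steps=T+1, dtype=torch.float32)
    # f(t) is proportional to the cumulative signal level alpha_bar_t
    f_t = torch.cos(((steps/T+s)/(1.0+s))*math.pi*0.5)**2
    # beta_t = 1 - f(t) / f(t-1), clipped to avoid a singularity near t = T
    betas = torch.clip(1.0-f_t[1:]/f_t[:T], 0.0, 0.999)
    return betas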

## Forward Diffusion Process

To maintain simplicity, we'll skip the majority of the mathematical formulation.

The `ForwardDiffusionProcess` class generates noisy samples from the true signal \\(\mathbf{x}_0\\). The process is fixed: it is stochastic, but it has no trainable parameters. We will nevertheless use this class to train our denoiser network: for a given true sample \\(\mathbf{x}_0\\), we sample a timestep \\(t\\) uniformly and apply the corresponding amount of Gaussian noise.

Note:
Remember that a normal distribution (also known as a Gaussian distribution) is defined by two parameters: a mean \\(\mu\\) and a variance \\(\sigma^2 \geq 0\\). Each new (slightly noisier) image at timestep \\(t\\) is drawn from a conditional Gaussian distribution with \\(\mathbf{\mu}_t = \sqrt{1 - \beta_t} \mathbf{x}_{t-1}\\) and \\(\sigma^2_t = \beta_t\\). This is achieved by sampling \\(\mathbf{\epsilon} \sim \mathcal{N}(\mathbf{0}, \mathbf{I})\\) and then setting \\(\mathbf{x}_t = \sqrt{1 - \beta_t} \mathbf{x}_{t-1} +  \sqrt{\beta_t} \mathbf{\epsilon}\\).
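
Because each step is Gaussian, composing \\(t\\) single steps gives a closed form: \\(\mathbf{x}_t = \sqrt{\bar{\alpha}_t} \mathbf{x}_0 + \sqrt{1 - \bar{\alpha}_t} \mathbf{\epsilon}\\), with \\(\bar{\alpha}_t = \prod_{s \leq t} (1 - \beta_s)\\). The sketch below checks this numerically in plain Python by tracking the coefficient of \\(\mathbf{x}_0\\) and the accumulated noise variance through the single-step rule. The four beta values are an assumption chosen only for illustration.

```python
import math

betas = [0.05, 0.10, 0.15, 0.20]  # toy schedule, for illustration only

# Write x_t = coef * x_0 + (zero-mean Gaussian noise with variance var).
coef, var = 1.0, 0.0
alpha_bar = 1.0
for beta in betas:
    # One step: x_t = sqrt(1 - beta) * x_{t-1} + sqrt(beta) * eps
    coef = math.sqrt(1.0 - beta) * coef
    var = (1.0 - beta) * var + beta  # old noise is scaled, new independent noise adds
    alpha_bar *= 1.0 - beta

    # Closed form: coef equals sqrt(alpha_bar) and var equals 1 - alpha_bar.
    print(f"coef={coef:.6f}  sqrt(alpha_bar)={math.sqrt(alpha_bar):.6f}  "
          f"var={var:.6f}  1-alpha_bar={1.0 - alpha_bar:.6f}")
```

This is why a noisy sample at an arbitrary timestep can be produced directly from \\(\mathbf{x}_0\\), without simulating the intermediate steps.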


**Exercise 1:**
Your task is to complete the `forward` method: sample one integer timestep per image uniformly between 0 and \\(T\\) (excluding \\(T\\)), and draw Gaussian noise with the same shape as the input.

In [None]:
class ForwardDiffusionProcess(nn.Module):
    def __init__(
          self,
          ddpm_schedules: Dict[str,torch.tensor],
          device: torch.device) -> None:

        super().__init__()

        # Register buffers with ddpm schedules
        for k, v in ddpm_schedules.items():
            self.register_buffer(k, v)

        self.n_T = self.alphas.shape[0]
        self.device = device

    def apply_noise(self, x, noise, ts):
        # This is the x_t, which is sqrt(alphabar) x_0 + sqrt(1-alphabar) * eps

        a = self.sqrt_alphas_cumprod.gather(-1, ts).reshape(x.shape[0], 1, 1, 1)
        b = self.sqrt_one_minus_alphas_cumprod.gather(-1, ts).reshape(x.shape[0], 1, 1, 1)
        x_t = a * x + b * noise

        return x_t

    def forward(self, x):
        """
        this method is used in training, so samples t and noise randomly
        """

        # t ~ Uniform(0, n_T)
        # TODO: sample uniformly a timestep
        _ts = torch.randint(...,...,...).to(self.device)

        # eps ~ N(0, 1)
        # TODO: get the random noise
        noise = torch...

        x_t = self.apply_noise(x, noise, _ts)

        return x_t, noise, _ts

To start with, we build the linear and cosine schedules for \\(T=100\\) time steps. From \\(\beta_t\\) we also derive several quantities, including \\(\bar{\alpha}_t\\), the cumulative product of \\(\alpha_s = 1 - \beta_s\\) up to step \\(t\\). Each of these variables is stored as a 1-dimensional tensor with one entry per timestep.

In [None]:
# Define number of steps
n_T=100

# Define beta schedulers
betas_linear = linear_beta_schedule(T=n_T)
betas_cosine = cosine_beta_schedule(T=n_T)


# Generate ddpm schedulers based on the given betas
ddpm_linear = ddpm_schedules(betas_linear)
ddpm_cosine = ddpm_schedules(betas_cosine)


# Create the ForwardDiffusionProcess objects
forward_diffusion_process_linear = ForwardDiffusionProcess(
    ddpm_schedules=ddpm_linear,
    device=device
).to(device) # Linear

forward_diffusion_process_cosine = ForwardDiffusionProcess(
    ddpm_schedules=ddpm_cosine,
    device=device
).to(device) # Cosine

The schedule \\(\beta_t\\) controls how much noise is added at each timestep. The plot below shows, for each scheduler, how much of the original sample survives at every step.

The Y-axis is \\(\bar{\alpha}_t\\), which measures how much of the original image \\(\mathbf{x}_0\\) remains in the noisy sample (its amplitude coefficient in `apply_noise` is \\(\sqrt{\bar{\alpha}_t}\\)), while the X-axis is the step \\(t\\) of the diffusion process. The linear scheduler drops more sharply at the start, whereas the cosine scheduler keeps more of the true signal over most timesteps.

This matters for training. If many timesteps contain almost no signal, a large share of the training samples are close to pure Gaussian noise, and the network learns little from them.

In [None]:
plt.figure(figsize = (8, 4))
plt.plot(forward_diffusion_process_linear.alphas_cumprod.cpu().numpy(), label='linear')
plt.plot(forward_diffusion_process_cosine.alphas_cumprod.cpu().numpy(), label='cosine')
plt.legend(loc="upper right")

plt.title("Linear combination between a sample and noise")
plt.xlabel("Timesteps")
plt.ylabel("Amount of original image")
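
For readers who prefer numbers to curves, the comparison can also be tabulated. The following is a plain-Python sketch that re-implements the two schedules defined above for \\(T=100\\) without PyTorch; the helper `cumprod_alphas` and the timesteps that are printed are assumptions introduced here for illustration.

```python
import math

T = 100

# Linear schedule, mirroring linear_beta_schedule (endpoints scaled by 1000 / T).
scale = 1000 / T
start, end = scale * 1e-4, scale * 0.02
betas_lin = [start + (end - start) * i / (T - 1) for i in range(T)]

# Cosine schedule, mirroring cosine_beta_schedule with s = 0.008.
s = 0.008
f = [math.cos(((i / T + s) / (1.0 + s)) * math.pi * 0.5) ** 2 for i in range(T + 1)]
betas_cos = [min(max(1.0 - f[i + 1] / f[i], 0.0), 0.999) for i in range(T)]


def cumprod_alphas(betas):
    """Return alpha_bar_t, the running product of (1 - beta), for every t."""
    out, acc = [], 1.0
    for b in betas:
        acc *= 1.0 - b
        out.append(acc)
    return out


lin_bar = cumprod_alphas(betas_lin)
cos_bar = cumprod_alphas(betas_cos)

for t in (9, 24, 49, 74):
    print(f"t={t:2d}  linear={lin_bar[t]:.4f}  cosine={cos_bar[t]:.4f}")
```

At the printed timesteps the cosine schedule should retain noticeably more of the original signal than the linear one, in line with the plot.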

To illustrate the differences visually, we now pick a sample from the MNIST dataset and apply both schedulers to it at different time steps:

In [None]:
# Pick a MNIST sample
image = train_data[0][0].clone().unsqueeze(0).to(device) # Shape: [1, 1, 28, 28]
plt.axis('off')
plt.imshow(image.permute(0,2,3,1)[0].cpu().numpy(), cmap='Greys')

In [None]:
# Closed-form forward diffusion: noise the clean image directly to timestep t
def get_image_from_linear(noise: torch.Tensor, image: torch.Tensor, t: int) -> torch.Tensor:
    image_linear = forward_diffusion_process_linear.apply_noise(
      image,
      noise,
      torch.tensor([t]).to(device)
    )
    return image_linear


def get_image_from_cosine(noise: torch.Tensor, image: torch.Tensor, t: int) -> torch.Tensor:
    image_cosine = forward_diffusion_process_cosine.apply_noise(
        image,
        noise,
        torch.tensor([t]).to(device)
    )
    return image_cosine

In [None]:
# Noise the image at every timestep using the two schedulers.
# apply_noise implements the closed form starting from x_0, so it is always
# given the clean image rather than the previously noised one.
images_linear = []
images_cosine = []

for t in tqdm.tqdm(range(n_T)):
    noise = torch.randn_like(image)

    # Add noise from linear scheduler
    image_linear = get_image_from_linear(noise, image, t)
    images_linear.append(image_linear.detach().cpu())

    # Add noise from cosine scheduler
    image_cosine = get_image_from_cosine(noise, image, t)
    images_cosine.append(image_cosine.detach().cpu())

Here, we present the evolution of samples over various timesteps. The top row corresponds to the linear scheduler, while the bottom row corresponds to the cosine scheduler:

In [None]:
plt.figure(figsize = (16, 8))
plt.axis('off')
indices = [0, 5,10,15,20,25, 30, 35, 40, 45, 50]
img_row_1 = torch.cat([images_linear[i] for i in indices], dim=3).permute(0,2,3,1)[0] # Linear beta scheduler
img_row_2 = torch.cat([images_cosine[i] for i in indices], dim=3).permute(0,2,3,1)[0] # Cosine beta scheduler
img_row_1_2 = torch.cat([img_row_1, img_row_2])
img_row_1_2 = torch.clamp(img_row_1_2, 0, 1) # clip pixel values to [0, 1] for display
plt.imshow(img_row_1_2, interpolation='nearest', cmap='Greys')

Notably, the cosine scheduler keeps the original image recognizable for more timesteps than the linear scheduler.

The intuition is the following: if many timesteps carry almost no information about the original image, the network spends much of its training learning to map one nearly pure noise sample to another, which teaches it little and wastes GPU time. For this reason, the cosine schedule is often preferred.

# Defining the Reverse Denoising Process

While the comprehensive understanding of the mathematics behind diffusion models isn't the primary focus of this lab, we do need to define certain methods for training and sampling our model. Here, we introduce two essential methods:

- `ReverseDiffusionProcess()._reverse_diffusion`: This method manages the reverse process for a **single step**.
- `ReverseDiffusionProcess().sample`: This method oversees the **complete reverse process**, guiding the transformation from noise to the final sample.

In [None]:
class ReverseDiffusionProcess(nn.Module):
    def __init__(self, nn_model, ddpm_schedules, device):
        super().__init__()

        for k, v in ddpm_schedules.items():
            self.register_buffer(k, v)

        self.nn_model = nn_model
        self.n_T = self.alphas.shape[0]
        self.device = device

    @torch.no_grad()
    def _reverse_diffusion(self, x_t, t, noise, c=None):
        '''
        p(x_{0}|x_{t}),q(x_{t-1}|x_{0},x_{t})->mean,std

        pred_noise -> pred_x_0 (clip to [-1.0,1.0]) -> pred_mean and pred_std
        '''
        pred=self.nn_model(x_t.float(),t.float(),c)
        alpha_t=self.alphas.gather(-1,t).reshape(x_t.shape[0],1,1,1)
        alpha_t_cumprod=self.alphas_cumprod.gather(-1,t).reshape(x_t.shape[0],1,1,1)
        beta_t=self.betas.gather(-1,t).reshape(x_t.shape[0],1,1,1)

        x_0_pred=torch.sqrt(1. / alpha_t_cumprod)*x_t-torch.sqrt(1. / alpha_t_cumprod - 1.)*pred
        x_0_pred.clamp_(-1., 1.)

        if t.min()>0:
            alpha_t_cumprod_prev=self.alphas_cumprod.gather(-1,t-1).reshape(x_t.shape[0],1,1,1)
            mean= (beta_t * torch.sqrt(alpha_t_cumprod_prev) / (1. - alpha_t_cumprod))*x_0_pred +\
                 ((1. - alpha_t_cumprod_prev) * torch.sqrt(alpha_t) / (1. - alpha_t_cumprod))*x_t

            std=torch.sqrt(beta_t*(1.-alpha_t_cumprod_prev)/(1.-alpha_t_cumprod))
        else:
            mean=(beta_t / (1. - alpha_t_cumprod))*x_0_pred # at t=0 the previous cumulative product is the empty product, i.e. 1
            std=0.0

        return mean+std*noise

    @torch.no_grad()
    def sample(self, n_sample, size, device, c=None):

        x_t = torch.randn(n_sample, *size).to(device)  # x_T ~ N(0, 1), sample initial noise
        x_ts = [x_t.cpu()]
        for i in tqdm.tqdm(range(self.n_T - 1, -1, -1), desc="Sampling"):

            noise = torch.randn_like(x_t).to(device)
            t = torch.tensor([i for _ in range(n_sample)]).to(device)
            x_t = self._reverse_diffusion(x_t, t, noise, c)
            x_ts.append(x_t.cpu())

        x_t = (x_t + 1.) / 2. #[-1,1] to [0,1]

        return x_t, x_ts
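
To see the reverse loop work end to end without training anything, the sketch below runs a scalar version of the same update in plain Python. It assumes a hypothetical dataset in which every sample equals the single value `X0`, so the exact noise can be computed in closed form by `oracle_noise`, which stands in for the trained network. The names `X0`, `oracle_noise` and `reverse_step`, as well as the toy schedule, are illustrative assumptions and not part of the lab code.

```python
import math
import random

random.seed(0)

T = 50
betas = [1e-4 + (0.2 - 1e-4) * i / (T - 1) for i in range(T)]  # toy linear schedule
alphas = [1.0 - b for b in betas]
alpha_bars, acc = [], 1.0
for a in alphas:
    acc *= a
    alpha_bars.append(acc)

X0 = 0.5  # hypothetical dataset: every sample equals this single value


def oracle_noise(x_t, t):
    """Exact noise for the point-mass dataset; stands in for the trained network."""
    return (x_t - math.sqrt(alpha_bars[t]) * X0) / math.sqrt(1.0 - alpha_bars[t])


def reverse_step(x_t, t, z):
    """Scalar version of the single-step update in `_reverse_diffusion`."""
    ab, a, b = alpha_bars[t], alphas[t], betas[t]
    eps = oracle_noise(x_t, t)
    # predicted noise -> predicted x_0, clamped to the data range [-1, 1]
    x0_pred = math.sqrt(1.0 / ab) * x_t - math.sqrt(1.0 / ab - 1.0) * eps
    x0_pred = max(-1.0, min(1.0, x0_pred))
    if t > 0:
        ab_prev = alpha_bars[t - 1]
        mean = (b * math.sqrt(ab_prev) / (1.0 - ab)) * x0_pred \
            + ((1.0 - ab_prev) * math.sqrt(a) / (1.0 - ab)) * x_t
        std = math.sqrt(b * (1.0 - ab_prev) / (1.0 - ab))
    else:
        # last step: no noise is added and the mean reduces to x0_pred
        mean, std = (b / (1.0 - ab)) * x0_pred, 0.0
    return mean + std * z


x = random.gauss(0.0, 1.0)      # start from pure noise
for t in range(T - 1, -1, -1):  # t = T-1, ..., 0
    x = reverse_step(x, t, random.gauss(0.0, 1.0))

print(f"final sample: {x:.6f} (dataset value: {X0})")
```

With a perfect noise predictor the loop lands on the data value regardless of the starting noise. A trained network only approximates this oracle, which is why sample quality depends on how well the denoiser is trained.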

##### **Exercise 2**:

Your task is to complete the `forward` method of the `DDPM` class: obtain the noisy sample \\(\mathbf{x}_t\\) at a random timestep \\(t\\) together with the noise that was applied, then predict that noise with the denoiser.

The denoiser model should be conditioned on \\(\mathbf{x}_t\\) and timestep \\(t\\).

In [None]:
class DDPM(nn.Module):
    def __init__(
          self,
          forward_diffusion_process: ForwardDiffusionProcess,
          reverse_diffusion_process: ReverseDiffusionProcess,
          nn_model
          ) -> None:
        super(DDPM, self).__init__()

        self.forward_diffusion_process = forward_diffusion_process
        self.reverse_diffusion_process = reverse_diffusion_process
        self.nn_model = nn_model # denoiser model

    def forward(self, x, c=None):
        """
        this method is used in training, so samples t and noise randomly
        """
        # TODO: Call the forward_diffusion_process: given a sample, it returns the noisy sample, the noise that was applied and the sampled timestep
        x_t, noise, _ts = ...
        # TODO: Call the denoiser model, so given the noisy sample and the timestep, it returns the predicted initial noise
        noise_pred = ...(..., ..., c=c)

        return noise, noise_pred

    def sample(self, n_sample, size, device, c=None):
        """
        this method is used for inference, and performs the whole process from noise to the final sample, x_i
        """
        x_i, x_i_store = self.reverse_diffusion_process.sample(
            n_sample=n_sample,
            size=size,
            device=device,
            c=c
        )

        return x_i, x_i_store

# The neural network (denoiser)

To denoise an image at a given time step, the network must predict the noise it contains. The predicted noise is a tensor with the same size/resolution as the input image. There are several ways to parameterize the reverse step; in this lab, the predicted noise determines the mean of each denoising step, while the variance is fixed by the schedule rather than learned.
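
The link between the predicted noise and the mean of a denoising step can be checked with a few numbers. The sketch below, in plain Python, computes the mean in two ways for a single timestep \\(t > 0\\): first through the predicted \\(\mathbf{x}_0\\), as `_reverse_diffusion` does (ignoring the clamping), and then directly from the predicted noise. All numeric values are assumptions picked for illustration and do not come from a trained model.

```python
import math

# Toy numbers for one timestep t > 0 (illustrative only).
alpha_t = 0.9                   # 1 - beta_t
beta_t = 1.0 - alpha_t
alpha_bar_prev = 0.5            # cumulative product up to t - 1
alpha_bar_t = alpha_bar_prev * alpha_t
x_t, eps_pred = 0.3, -0.7       # noisy input and the network's noise prediction

# Route 1: predicted noise -> predicted x_0 -> posterior mean.
x0_pred = (x_t - math.sqrt(1.0 - alpha_bar_t) * eps_pred) / math.sqrt(alpha_bar_t)
mean_via_x0 = (beta_t * math.sqrt(alpha_bar_prev) / (1.0 - alpha_bar_t)) * x0_pred \
    + ((1.0 - alpha_bar_prev) * math.sqrt(alpha_t) / (1.0 - alpha_bar_t)) * x_t

# Route 2: the same mean written directly in terms of the predicted noise.
mean_via_eps = (x_t - beta_t / math.sqrt(1.0 - alpha_bar_t) * eps_pred) / math.sqrt(alpha_t)

# The variance of the step depends only on the schedule, not on the network.
variance = beta_t * (1.0 - alpha_bar_prev) / (1.0 - alpha_bar_t)

print(f"mean via x_0: {mean_via_x0:.6f}")
print(f"mean via eps: {mean_via_eps:.6f}")
print(f"fixed variance: {variance:.6f}")
```

Both routes should agree up to floating-point error, so predicting the noise is equivalent to predicting the mean of the step.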

To achieve this, we require a model capable of performing image-to-image transformations. For this purpose, we will utilize an adapted version of the UNET architecture.

<p align="center">
    <img src="https://drive.google.com/uc?id=1_Hej_VTgdUWGsxxIuyZACCGjpbCGIUi6" width="400" />
</p>

The adapted UNET architecture consists of multiple stages, downsampling, upsampling, self-attention layers and residual connections.

To predict the noise injected at each timestep, the model must be conditioned on the timestep \\(t\\). Several methods could be used. Here, two small MLPs map the timestep to two embedding vectors, one per upsampling stage, and each embedding is added to the feature maps at its stage.
This is similar in spirit to the positional embeddings that are added to the token features in the Transformer architecture.



In [None]:
class SelfAttention(nn.Module):
    '''
    Similar to the Transformer Architecture, this network has self-attention blocks.
    '''
    def __init__(self, n_channels):
      super().__init__()
      n_channels_out = n_channels//4
      self.query = nn.Linear(n_channels, n_channels_out, bias=False)
      self.key = nn.Linear(n_channels, n_channels_out, bias=False)
      self.value = nn.Linear(n_channels, n_channels, bias=False)
      self.gamma = nn.Parameter(torch.tensor([0.0]))

    def forward(self, x):
      B, C, H, W = x.shape

      x = x.permute(0, 2, 3, 1).view(B, H * W, C) # Shape: [B, H*W, C]

      q = self.query(x) # [B, H*W, C//4]
      k = self.key(x) # [B, H*W, C//4]
      v = self.value(x) # [B, H*W, C]

      # attn[b, i, j] is the weight of key j for query i, so normalize over the keys (last dim)
      attn = F.softmax(torch.bmm(q, k.transpose(1,2)), dim=-1) # Shape: [B, H*W, H*W]
      out = self.gamma * torch.bmm(attn, v) + x # Shape: [B, H*W, C]

      out = out.permute(0, 2, 1).view(B, C, H, W).contiguous()

      return out


class ResidualConvBlock(nn.Module):
    '''
    ResNet-style blocks: convolutional layers followed by batch normalization, with optional residual connections.
    '''
    def __init__(
        self, in_channels: int, out_channels: int, is_res: bool = False
    ) -> None:
        super().__init__()
        '''
        standard ResNet style convolutional block
        '''
        self.same_channels = in_channels==out_channels
        self.is_res = is_res
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, 3, 1, 1),
            nn.BatchNorm2d(out_channels),
            nn.GELU(),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        if self.is_res:
            x1 = self.conv1(x)
            x2 = self.conv2(x1)
            # this adds on correct residual in case channels have increased
            if self.same_channels:
                out = x + x2
            else:
                out = x1 + x2
            return out / 1.414
        else:
            x1 = self.conv1(x)
            x2 = self.conv2(x1)
            return x2


class UnetDown(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UnetDown, self).__init__()
        '''
        process and downscale the image feature maps
        '''
        layers = [
            ResidualConvBlock(in_channels, in_channels, True),
            ResidualConvBlock(in_channels, in_channels, True),
            ResidualConvBlock(in_channels, out_channels),
            nn.MaxPool2d(2)
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)


class UnetUp(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UnetUp, self).__init__()
        '''
        process and upscale the image feature maps
        '''
        layers = [
            nn.ConvTranspose2d(in_channels, out_channels, 2, 2),
            ResidualConvBlock(out_channels, out_channels, True),
            ResidualConvBlock(out_channels, out_channels, True),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x, skip):
        x = torch.cat((x, skip), 1)
        x = self.model(x)
        return x


class EmbedFC(nn.Module):
    def __init__(self, input_dim, emb_dim):
        super(EmbedFC, self).__init__()
        '''
        generic one layer FC NN for embedding things
        '''
        self.input_dim = input_dim
        layers = [
            nn.Linear(input_dim, emb_dim),
            nn.GELU(),
            nn.Linear(emb_dim, emb_dim),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        x = x.view(-1, self.input_dim)
        return self.model(x)


##### **Exercise 3**:

Your task is to complete the forward method of the U-Net so that it incorporates both timestep and class conditioning.

Two Multi-Layer Perceptrons (MLPs), `timeembed1` and `timeembed2`, generate the two time embeddings. Two further MLPs, `classembed1` and `classembed2`, embed the class conditioning information.

In [None]:
class Unet(nn.Module):
    def __init__(self, in_channels, n_feat = 256, num_classes: int = 10):
        super(Unet, self).__init__()

        self.in_channels = in_channels
        self.n_feat = n_feat

        self.init_conv = ResidualConvBlock(in_channels, n_feat, is_res=True)

        self.down1 = UnetDown(n_feat, n_feat)
        self.attn1 = SelfAttention(n_feat)

        self.down2 = UnetDown(n_feat, 2 * n_feat)
        self.attn2 = SelfAttention(2 * n_feat)

        self.to_vec = nn.Sequential(nn.AvgPool2d(7), nn.GELU())

        self.timeembed1 = EmbedFC(1, 2*n_feat)
        self.timeembed2 = EmbedFC(1, 1*n_feat)

        self.classembed1 = EmbedFC(num_classes, 2*n_feat)
        self.classembed2 = EmbedFC(num_classes, 1*n_feat)

        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(2 * n_feat, 2 * n_feat, 7, 7),
            nn.GroupNorm(8, 2 * n_feat),
            nn.ReLU(),
        )

        self.up1 = UnetUp(4 * n_feat, n_feat)
        self.attn1up = SelfAttention(n_feat)
        self.up2 = UnetUp(2 * n_feat, n_feat)
        self.attn2up = SelfAttention(n_feat)
        self.out = nn.Sequential(
            nn.Conv2d(2 * n_feat, n_feat, 3, 1, 1),
            nn.GroupNorm(8, n_feat),
            nn.ReLU(),
            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1),
        )
        self.n_classes = num_classes

    def forward(self, x, t, c=None):
        # x is (noisy) image, t is timestep,

        # Downsampling
        x = self.init_conv(x)
        down1 = self.attn1(self.down1(x))
        down2 = self.attn2(self.down2(down1))

        hiddenvec = self.to_vec(down2)

        # get the embeddings corresponding to the time step
        temb1 = self.timeembed1(t).view(-1, self.n_feat * 2, 1, 1)
        temb2 = self.timeembed2(t).view(-1, self.n_feat, 1, 1)

        # class condition embeddings
        if c is not None:
            c = torch.nn.functional.one_hot(c, num_classes=self.n_classes).type(torch.float)
            # TODO: Get the class embeddings
            cemb1 = ...
            cemb2 = ...

        # Upsampling
        up1 = self.up0(hiddenvec)

        if c is not None:
           condition = up1*cemb1 + temb1
        else:
            condition = up1 + temb1
        up2 = self.attn1up(self.up1(condition, down2))


        if c is not None:
           condition = up2*cemb2 + temb2
        else:
            condition = up2 + temb2

        up3 = self.attn2up(self.up2(condition, down1))
        out = self.out(torch.cat((up3, x), 1))
        return out

# Sanity check: forward a random tensor with the shape of an input batch through the network
m = Unet(in_channels=1, n_feat=32)
inp = torch.randn(2,1,28,28)
t = torch.tensor([0]).float()
out = m(inp,t)
assert out.shape == inp.shape # the predicted noise has the same shape as the input

# Training the neural network (denoiser)

We now train our denoiser model. You can select which beta scheduler to use by uncommenting your choice!

In [None]:

# Model parameters
h_params = {
    'n_feat': 64,
    'n_T': 500,
    'lr': 0.001,
    'epochs': 25,
}


# Betas scheduler. Uncomment your choice!

#betas = linear_beta_schedule(beta1=1e-4, beta2=0.02, T=h_params['n_T'])
betas = cosine_beta_schedule(T=h_params['n_T'])


ddpm_schedules_dict = ddpm_schedules(betas=betas)

# Forward Diffusion Process
forward_diffusion_process = ForwardDiffusionProcess(
    ddpm_schedules=ddpm_schedules_dict,
    device=device
).to(device)


# Reverse Diffusion Process
denoiser_model = Unet(in_channels=1, n_feat=h_params['n_feat'])
reverse_diffusion_process = ReverseDiffusionProcess(
    nn_model=denoiser_model,
    ddpm_schedules=ddpm_schedules_dict,
    device=device
).to(device)


# Full Model (Only for training)
model = DDPM(
    forward_diffusion_process=forward_diffusion_process,
    reverse_diffusion_process=reverse_diffusion_process,
    nn_model=denoiser_model,
)

In [None]:
# Create the dataloader: train on the first 10,000 MNIST images to keep training short
d = torch.utils.data.Subset(train_data, indices=range(10000))
train_dataloader = DataLoader(d, batch_size=128, shuffle=True, drop_last=True, num_workers=0)

Training follows a standard image-to-image setup.

For optimization, we use the AdamW optimizer to update the parameters. The Mean Squared Error (MSE) between the applied noise and the predicted noise serves as the objective function for training the U-Net.

If the results are not satisfactory after the configured number of epochs (25 in the cell above), feel free to rerun the subsequent cells until the results align with expectations!
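
Written out, the objective described above corresponds to the simplified noise-prediction loss commonly used for DDPMs. The notation \\(\mathbf{\epsilon}_\theta\\) for the denoiser network is introduced here as an assumption for readability; the code averages the squared error over all pixels and images in the batch, which differs from the squared norm below only by a constant factor.

\\[
\mathcal{L}(\theta) = \mathbb{E}_{\mathbf{x}_0,\, t,\, \mathbf{\epsilon}} \left[ \left\| \mathbf{\epsilon} - \mathbf{\epsilon}_\theta \left( \sqrt{\bar{\alpha}_t}\, \mathbf{x}_0 + \sqrt{1 - \bar{\alpha}_t}\, \mathbf{\epsilon},\; t \right) \right\|^2 \right]
\\]

Here \\(\mathbf{x}_0\\) is a training image, \\(t\\) is sampled uniformly, and \\(\mathbf{\epsilon} \sim \mathcal{N}(\mathbf{0}, \mathbf{I})\\) is the noise applied by the forward process.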

In [None]:
# Training loop for the denoiser model
def training_diffusion(
      model: torch.nn.Module,
      train_dataloader,
      epochs: int,
      lr: float,
      plot_every_n: int = 50,
      add_class_condition: bool = False) -> torch.nn.Module:

    optim = torch.optim.AdamW(model.parameters(), lr=lr)
    loss_fn = F.mse_loss

    model.to(device)
    model.train()  # the sampling cells call .eval(); switch back before training

    for epoch in range(epochs):
      # Linearly decay the learning rate during training
      optim.param_groups[0]['lr'] = lr * (1-epoch / epochs)

      for batch_idx, (images, labels) in enumerate(train_dataloader):
        optim.zero_grad()

        images = images.to(device)

        # The forward function of the DDPM model returns both the noise
        # that was applied and the noise predicted by the denoiser
        if add_class_condition:
            labels = labels.to(device)
            noise, noise_pred = model(images.float(), labels.to(torch.int64))
        else:
            noise, noise_pred = model(images.float())

        # MSE loss between the predicted and the applied noise
        loss = loss_fn(noise_pred, noise, reduction='mean')

        loss.backward()
        optim.step()

        if batch_idx % plot_every_n == 0:
          print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
              epoch, batch_idx * len(images), len(train_dataloader.dataset),
              100. * batch_idx / len(train_dataloader), loss.item()))

    return model
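
The learning-rate rule inside the loop is a plain linear decay. As a quick illustration of its arithmetic, the sketch below lists the learning rate per epoch for the values used in `h_params`; the variable `schedule` is introduced here only for illustration.

```python
lr, epochs = 0.001, 25  # the values used in h_params for the unconditional run

# Same rule as in the training loop: lr * (1 - epoch / epochs)
schedule = [lr * (1 - epoch / epochs) for epoch in range(epochs)]

print(f"first epoch: {schedule[0]:.6f}")
print(f"last epoch:  {schedule[-1]:.6f}")  # equals lr / epochs, so it never reaches zero
```

Because `epoch` stops at `epochs - 1`, the final epoch still trains with a small positive learning rate.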

In [None]:
model = training_diffusion(
      model=model,
      train_dataloader=train_dataloader,
      epochs=h_params['epochs'],
      lr=h_params['lr'],
      plot_every_n=25
)

### Testing the Denoiser Model

The denoiser was trained on independently sampled timesteps: each update used one random timestep per image, never a full denoising trajectory.

During testing, the procedure begins by sampling a noisy image from a Gaussian distribution. The trained denoiser model is then applied iteratively to denoise the image. Compared with Generative Adversarial Networks (GANs) and Variational Autoencoders (VAEs), sampling from Diffusion models tends to be slower, because generating a single sample requires many forward passes of the network.

We now sample 56 digits!

In [None]:
# Test time:

reverse_diffusion_process.eval()
with torch.no_grad():
    x_gen, x_gen_store = reverse_diffusion_process.sample(56, (1, 28, 28), device)

In [None]:
# Final result

img_grid = make_grid(x_gen.detach().cpu())

plt.figure(figsize = (10, 10))
plt.axis('off')
plt.imshow(img_grid.permute(1, 2, 0), interpolation='nearest')

### Visualizing Intermediate Outputs of the Reverse Process

We next show the intermediate outputs of our reverse process. For better accessibility, we've generated a gif/video in which each frame corresponds to the denoised images, transitioning from noise to the desired sample:

In [None]:
# Generate Gif to visualize the reverse diffusion process

%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.animation as animation

fig = plt.figure(figsize=(8, 8))
ims = []
for xx in x_gen_store[::16]:
    iim = make_grid(xx).permute(1, 2, 0)
    iim = torch.clip(iim, 0, 1)
    im = plt.imshow(iim.numpy(), cmap="gray", animated=True)
    plt.axis('off')
    ims.append([im])

animate = animation.ArtistAnimation(fig, ims, interval=100, blit=True, repeat_delay=5000)
animate.save('diffusion_56.gif')
plt.show()

In [None]:
# If the above cell does not show the gif, run this cell:
from IPython.display import Image
Image(open('diffusion_56.gif','rb').read())


# Extra 1: Conditional Generation

Conditional Diffusion Models extend traditional Diffusion Models by enabling the generation process to be conditioned on additional information, incorporating context or auxiliary variables during both training and generation.

In the following cell we train the same model as before, but add the class information to the denoiser (U-Net) model. This gives us the ability to control the generation process and obtain class-specific images.

In [None]:
# Model parameters
h_params = {
    'n_feat': 64,
    'n_T': 500,
    'lr': 0.001,
    'epochs': 35,
}

# Betas scheduler. Uncomment your choice!

betas = linear_beta_schedule(beta1=1e-4, beta2=0.02, T=h_params['n_T'])
#betas = cosine_beta_schedule(T=h_params['n_T'])


ddpm_schedules_dict = ddpm_schedules(betas=betas)

# Forward Diffusion Process
forward_diffusion_process = ForwardDiffusionProcess(
    ddpm_schedules=ddpm_schedules_dict,
    device=device
).to(device)


# Reverse Diffusion Process
denoiser_model = Unet(in_channels=1, n_feat=h_params['n_feat'])
reverse_diffusion_process = ReverseDiffusionProcess(
    nn_model=denoiser_model,
    ddpm_schedules=ddpm_schedules_dict,
    device=device
).to(device)


# Full Model (Only for training)
model = DDPM(
    forward_diffusion_process=forward_diffusion_process,
    reverse_diffusion_process=reverse_diffusion_process,
    nn_model=denoiser_model,
)

In [None]:

model = training_diffusion(
      model=model,
      train_dataloader=train_dataloader,
      epochs=h_params['epochs'],
      lr=h_params['lr'],
      plot_every_n=25,
      add_class_condition=True # this is the difference between the unconditional and the conditional run
)

In [None]:
def generate_and_show(condition=0) -> None:
    reverse_diffusion_process.eval()
    with torch.no_grad():
        x_gen, x_gen_store = reverse_diffusion_process.sample(
            56,
            (1, 28, 28),
            device,
            c=torch.tensor([condition]*56).to(torch.int64).to(device)
        )

    # Final result
    img_grid = make_grid(x_gen.detach().cpu())

    plt.figure(figsize = (5, 5))
    plt.axis('off')
    plt.imshow(img_grid.permute(1, 2, 0), interpolation='nearest')

In [None]:
generate_and_show(condition=0)

In [None]:
generate_and_show(condition=1)

# Bonus: Generate State-of-the-Art Text-to-Image Samples

Many recent models, including DreamFusion, DreamBooth, Stable Diffusion, DALL·E, or ControlNet, are built upon diffusion models. Typically, these models are trained on billions of samples, making it challenging to train them from scratch. Fortunately, some of them can be run at no cost!

We now present examples demonstrating the generation of images from a text prompt using the [🤗 Diffusers](https://huggingface.co/docs/diffusers/index) library.

In [None]:
!pip install diffusers
from diffusers import AutoPipelineForText2Image

In [None]:
pipeline = AutoPipelineForText2Image.from_pretrained(
    "runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16, variant="fp16"
).to("cuda")

Once we have downloaded the model, we can generate new images by changing the text prompt:

In [None]:
prompt = "Shiba inu with a black hat, 8k"
image = pipeline(prompt).images[0]
image

If you look inside the library, you will find many more examples. Dive deep and surprise yourself with your own generations!

One especially fun example uses the image-to-image pipeline, which edits an input image according to the text prompt.

In [None]:
from diffusers import AutoPipelineForImage2Image
import torch

pipeline = AutoPipelineForImage2Image.from_pretrained(
    "runwayml/stable-diffusion-v1-5",
    torch_dtype=torch.float16,
    use_safetensors=True,
).to("cuda")

In [None]:
import requests
from PIL import Image
from io import BytesIO
prompt = "a portrait of a dog wearing a pearl earring"

url = "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0f/1665_Girl_with_a_Pearl_Earring.jpg/800px-1665_Girl_with_a_Pearl_Earring.jpg"
response = requests.get(url)
image = Image.open(BytesIO(response.content)).convert("RGB")
image.thumbnail((768, 768))

In [None]:
image = pipeline(prompt, image, num_inference_steps=200, strength=0.75, guidance_scale=10.5).images[0]
image