<a href="https://colab.research.google.com/github/singhsourav0/Diffusion/blob/main/Sampling_Diffusion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Diffusion Models: A Detailed Look

Diffusion models are a powerful class of generative models that have gained significant traction in recent years for their ability to produce high-quality images and other data. They work by gradually corrupting training data with noise and then learning to reverse this process, effectively going from pure noise to a realistic sample. This approach offers several advantages, including:

**Advantages of Diffusion Models:**

* **High-quality Samples:** Diffusion models excel at generating realistic and diverse images with impressive detail and coherence.
* **Flexibility:** They can be applied to various data modalities, including images, audio, and text.
* **Controllability:** Conditioning mechanisms allow users to guide the generation process by providing additional information like text prompts or class labels.

**The Diffusion Process:**

1. **Forward Diffusion:** Starting with a real data sample, Gaussian noise is iteratively added in small increments over many steps, slowly transforming the image into pure noise.
2. **Reverse Diffusion:** The model learns to reverse this process. It starts with a sample of pure noise and gradually removes the noise, step-by-step, to generate a realistic data sample.

**The Diffusion Model Architecture:**

The core of a diffusion model is typically a U-Net architecture. This network consists of a contracting path (encoder) and an expansive path (decoder) with skip connections between them.

* **Encoder:** Downsamples the input and extracts features at different scales.
* **Decoder:** Upsamples the encoded features and progressively reconstructs the image.
* **Skip Connections:** Provide the decoder with information from corresponding levels of the encoder, helping to preserve spatial details.

**What Can Diffusion Models Generate?**

* **Realistic Images:** From faces and landscapes to objects and artistic creations.
* **Conditional Generation:** Images based on text prompts, class labels, or other conditioning information.
* **Image Editing and Inpainting:** Modify existing images or fill in missing parts.
* **Data Augmentation:** Create new training data for other machine learning models.

**Understanding the Provided Code:**

The provided code snippets define the building blocks of a diffusion model, including:

* **ResidualConvBlock:** This block forms the basis of the U-Net architecture. It consists of two convolutional layers with batch normalization and GELU activation, and optionally incorporates a residual connection for improved training stability.
* **UnetUp:** The upsampling block used in the decoder part of the U-Net. It upsamples the input using a transposed convolution and then applies two ResidualConvBlock layers.
* **UnetDown:** The downsampling block used in the encoder part of the U-Net. It applies two ResidualConvBlock layers and then downsamples using max pooling.
* **EmbedFC:** This block embeds the conditioning information (e.g., text prompts) into a suitable feature vector that can be used by the model.
* **CustomDataset:** A custom PyTorch dataset class for loading and pre-processing image data and corresponding labels.

**Further Steps and Implementation Details:**

The provided code snippets only show a part of the complete diffusion model implementation. Additional components would include:

* **Noise scheduling:** Defining how the noise is added and removed during the forward and reverse diffusion processes.
* **Loss function:** Typically a reconstruction loss that measures the difference between the generated image and the original image.
* **Sampling methods:** Different strategies for sampling from the model during the reverse diffusion process.
* **Training and optimization:** Setting up the training loop and choosing an optimizer.

**Exploring diffusion models further would involve implementing these additional components and training the model on a specific dataset to generate desired outputs. The provided code snippets offer a solid starting point for understanding the core building blocks and mechanisms involved.**


## Diffusion diffusion utilities Code Explained:

This code implements a diffusion model using a U-Net architecture and residual convolutional blocks. Let's break down the code step-by-step:

**1. Imports & Utility Functions:**

*   The code starts by importing necessary libraries like `torch`, `torchvision`, and `matplotlib`.
*   Utility functions like `unorm` and `norm_all` handle normalization of image data, while `gen_tst_context` creates context vectors for testing.
*   Plotting functions `plot_grid` and `plot_sample` visualize generated images and create animations.

**2. ResidualConvBlock:**

*   This class defines a residual convolutional block used in the U-Net.
*   It consists of two convolutional layers with batch normalization and GELU activation.
*   It can optionally use a residual connection, adding the input to the output for improved gradient flow.

**3. UnetUp & UnetDown:**

*   These classes define the upsampling and downsampling blocks of the U-Net.
*   `UnetUp` uses transposed convolution for upsampling and combines it with skip connections from the corresponding downsampling layer.
*   `UnetDown` uses residual convolutional blocks and max pooling for downsampling.

**4. EmbedFC:**

*   This class defines a simple feed-forward network for embedding input data into a lower-dimensional space.

**5. CustomDataset:**

*   This class creates a custom PyTorch dataset from provided image and label files.
*   It applies transformations to the images and allows for handling null context situations.

**6. Data Transformations:**

*   The `transform` variable defines a set of transformations to be applied to the images during training. This includes converting them to tensors and normalizing them to the range of [-1, 1].

**Overall Flow:**

This code provides the building blocks for a diffusion model. To actually train and use the model, you would need additional code for:

*   Defining the overall U-Net architecture using the defined blocks.
*   Implementing the forward diffusion and reverse diffusion processes.
*   Defining a loss function and optimizer.
*   Training the model on your data.
*   Generating samples using the trained model.



##Explanation: Diffusion Model Sampling

This code implements the sampling process of a diffusion model, specifically a ContextUnet model, trained on image data. Let's break down the main components step-by-step:

**1. Imports and Class Definitions:**

- The code imports necessary libraries like PyTorch, Torchvision, and matplotlib for model definition, image processing, and visualization.
- It defines a `ContextUnet` class, which is a U-Net architecture modified to incorporate context information (e.g., class labels) and timestep embeddings.

**2. Model and Hyperparameters:**

- Hyperparameters like timesteps, beta values (for noise scheduling), and network dimensions are defined.
- A noise schedule is constructed based on the beta values.
- A `ContextUnet` model is created and loaded with pre-trained weights.

**3. Sampling Functions:**

- **`sample_ddpm`**:
    - This function performs the correct sampling process.
    - It starts with pure noise and iteratively denoises it by predicting the noise at each timestep and subtracting it.
    - It also adds a small amount of noise back in to prevent the generated images from collapsing to a single point.
    - It saves intermediate steps for visualization.

- **`denoise_add_noise`**:
    - This helper function takes the current noisy image, timestep, and predicted noise as input.
    - It calculates the mean of the denoised image based on the noise schedule and adds a controlled amount of noise back in.

- **`sample_ddpm_incorrect`**:
    - This function demonstrates the incorrect way of sampling by not adding back any noise during the denoising process.

**4. Visualization:**

- The code uses the `plot_sample` function (from the imported `diffusion_utilities`) to create animations of the sampling process for both correct and incorrect methods.
- The animations show how the images gradually become clearer as the noise is removed.

**Key Points:**

- The code showcases the importance of adding noise back during the sampling process of diffusion models.
- Without adding noise, the generated images may lack diversity and detail.
- The `ContextUnet` model allows for incorporating additional information like class labels to guide the generation process.

**Additional Notes:**

- The code assumes the existence of a pre-trained model and the `diffusion_utilities` module.
- The visualization part requires appropriate libraries like matplotlib and IPython to be installed.


<h2><font color='green'><b>SAMPLING:</font></h2>

In [1]:
!pip install Ipython
!pip install diffusion

Collecting jedi>=0.16 (from Ipython)
  Downloading jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jedi
Successfully installed jedi-0.19.1
Collecting diffusion
  Downloading diffusion-6.10.3-1-py3-none-any.whl (191 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m191.1/191.1 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting attrs==21.*,>=21.4.0 (from diffusion)
  Downloading attrs-21.4.0-py2.py3-none-any.whl (60 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.6/60.6 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting cbor2==5.*,>=5.1.2 (from diffusion)
  Downloading cbor2-5.6.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (242 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.1/242.1 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting

In [14]:
from typing import Dict, Tuple
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torchvision.utils import save_image , make_grid
from matplotlib.animation import FuncAnimation, PillowWriter
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML
from diffusion_utilities import *


<h1><font color='yellow'><b>Setting:</font></h1>

In [23]:
!mkdir ./weights/

In [24]:
!mv model_trained.pth ./weights/


In [15]:
class ContextUnet(nn.Module):
    def __init__(self, in_channels, n_feat=256, n_cfeat=10, height=28):  # cfeat - context features
        super(ContextUnet, self).__init__()

        # number of input channels, number of intermediate feature maps and number of classes
        self.in_channels = in_channels
        self.n_feat = n_feat
        self.n_cfeat = n_cfeat
        self.h = height  #assume h == w. must be divisible by 4, so 28,24,20,16...

        # Initialize the initial convolutional layer
        self.init_conv = ResidualConvBlock(in_channels, n_feat, is_res=True)

        # Initialize the down-sampling path of the U-Net with two levels
        self.down1 = UnetDown(n_feat, n_feat)        # down1 #[10, 256, 8, 8]
        self.down2 = UnetDown(n_feat, 2 * n_feat)    # down2 #[10, 256, 4,  4]

         # original: self.to_vec = nn.Sequential(nn.AvgPool2d(7), nn.GELU())
        self.to_vec = nn.Sequential(nn.AvgPool2d((4)), nn.GELU())

        # Embed the timestep and context labels with a one-layer fully connected neural network
        self.timeembed1 = EmbedFC(1, 2*n_feat)
        self.timeembed2 = EmbedFC(1, 1*n_feat)
        self.contextembed1 = EmbedFC(n_cfeat, 2*n_feat)
        self.contextembed2 = EmbedFC(n_cfeat, 1*n_feat)

        # Initialize the up-sampling path of the U-Net with three levels
        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(2 * n_feat, 2 * n_feat, self.h//4, self.h//4), # up-sample
            nn.GroupNorm(8, 2 * n_feat), # normalize
            nn.ReLU(),
        )
        self.up1 = UnetUp(4 * n_feat, n_feat)
        self.up2 = UnetUp(2 * n_feat, n_feat)

        # Initialize the final convolutional layers to map to the same number of channels as the input image
        self.out = nn.Sequential(
            nn.Conv2d(2 * n_feat, n_feat, 3, 1, 1), # reduce number of feature maps   #in_channels, out_channels, kernel_size, stride=1, padding=0
            nn.GroupNorm(8, n_feat), # normalize
            nn.ReLU(),
            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1), # map to same number of channels as input
        )

    def forward(self, x, t, c=None):
        """
        x : (batch, n_feat, h, w) : input image
        t : (batch, n_cfeat)      : time step
        c : (batch, n_classes)    : context label
        """
        # x is the input image, c is the context label, t is the timestep, context_mask says which samples to block the context on

        # pass the input image through the initial convolutional layer
        x = self.init_conv(x)
        # pass the result through the down-sampling path
        down1 = self.down1(x)       #[10, 256, 8, 8]
        down2 = self.down2(down1)   #[10, 256, 4, 4]

        # convert the feature maps to a vector and apply an activation
        hiddenvec = self.to_vec(down2)

        # mask out context if context_mask == 1
        if c is None:
            c = torch.zeros(x.shape[0], self.n_cfeat).to(x)

        # embed context and timestep
        cemb1 = self.contextembed1(c).view(-1, self.n_feat * 2, 1, 1)     # (batch, 2*n_feat, 1,1)
        temb1 = self.timeembed1(t).view(-1, self.n_feat * 2, 1, 1)
        cemb2 = self.contextembed2(c).view(-1, self.n_feat, 1, 1)
        temb2 = self.timeembed2(t).view(-1, self.n_feat, 1, 1)
        #print(f"uunet forward: cemb1 {cemb1.shape}. temb1 {temb1.shape}, cemb2 {cemb2.shape}. temb2 {temb2.shape}")


        up1 = self.up0(hiddenvec)
        up2 = self.up1(cemb1*up1 + temb1, down2)  # add and multiply embeddings
        up3 = self.up2(cemb2*up2 + temb2, down1)
        out = self.out(torch.cat((up3, x), 1))
        return out

In [16]:
# hyperparameters

# diffusion hyperparameters
timesteps = 500
beta1 = 1e-4
beta2 = 0.02

# network hyperparameters
device = torch.device("cuda:0" if torch.cuda.is_available() else torch.device('cpu'))
n_feat = 64 # 64 hidden dimension feature
n_cfeat = 5 # context vector is of size 5
height = 16 # 16x16 image
save_dir = './weights/'

In [17]:
# construct DDPM noise schedule
b_t = (beta2 - beta1) * torch.linspace(0, 1, timesteps + 1, device=device) + beta1
a_t = 1 - b_t
ab_t = torch.cumsum(a_t.log(), dim=0).exp()
ab_t[0] = 1

In [18]:
# construct model
nn_model = ContextUnet(in_channels=3, n_feat=n_feat, n_cfeat=n_cfeat, height=height).to(device)

<h1><font color= 'red'> Sampling</h1>

In [19]:
# helper function; removes the predicted noise (but adds some noise back in to avoid collapse)
def denoise_add_noise(x, t, pred_noise, z=None):
    if z is None:
        z = torch.randn_like(x)
    noise = b_t.sqrt()[t] * z
    mean = (x - pred_noise * ((1 - a_t[t]) / (1 - ab_t[t]).sqrt())) / a_t[t].sqrt()
    return mean + noise

In [25]:
# load in model weights and set to eval mode
nn_model.load_state_dict(torch.load(f"{save_dir}/model_trained.pth", map_location=device))
nn_model.eval()
print("Loaded in Model")

Loaded in Model


In [26]:
# sample using standard algorithm
@torch.no_grad()
def sample_ddpm(n_sample, save_rate=20):
    # x_T ~ N(0, 1), sample initial noise
    samples = torch.randn(n_sample, 3, height, height).to(device)

    # array to keep track of generated steps for plotting
    intermediate = []
    for i in range(timesteps, 0, -1):
        print(f'sampling timestep {i:3d}', end='\r')

        # reshape time tensor
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        # sample some random noise to inject back in. For i = 1, don't add back in noise
        z = torch.randn_like(samples) if i > 1 else 0

        eps = nn_model(samples, t)    # predict noise e_(x_t,t)
        samples = denoise_add_noise(samples, i, eps, z)
        if i % save_rate ==0 or i==timesteps or i<8:
            intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [27]:
# visualize samples
plt.clf()
samples, intermediate_ddpm = sample_ddpm(32)
animation_ddpm = plot_sample(intermediate_ddpm,32,4,save_dir, "ani_run", None, save=False)
HTML(animation_ddpm.to_jshtml())



<Figure size 640x480 with 0 Axes>

In [28]:
# incorrectly sample without adding in noise
@torch.no_grad()
def sample_ddpm_incorrect(n_sample):
    # x_T ~ N(0, 1), sample initial noise
    samples = torch.randn(n_sample, 3, height, height).to(device)

    # array to keep track of generated steps for plotting
    intermediate = []
    for i in range(timesteps, 0, -1):
        print(f'sampling timestep {i:3d}', end='\r')

        # reshape time tensor
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        # don't add back in noise
        z = 0

        eps = nn_model(samples, t)    # predict noise e_(x_t,t)
        samples = denoise_add_noise(samples, i, eps, z)
        if i%20==0 or i==timesteps or i<8:
            intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [29]:
# visualize samples
plt.clf()
samples, intermediate = sample_ddpm_incorrect(32)
animation = plot_sample(intermediate,32,4,save_dir, "ani_run", None, save=False)
HTML(animation.to_jshtml())



<Figure size 640x480 with 0 Axes>