# Diffusion Models

Diffusion models are a class of `generative models` that create high-quality images, videos, and even text by gradually refining noise into structured data. They are currently the state of the art in generative AI, `outperforming GANs and VAEs` in many tasks.

These models have been used in Stable Diffusion, DALL·E 2, Imagen, and Gen-2, enabling text-to-image, video synthesis, and music generation.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt

# 1️⃣ Noise Scheduler (Forward Diffusion Process)
def forward_diffusion(x, t, noise):
    """Blend a clean input with noise according to an exponential schedule.

    The weight kept on the clean signal is ``exp(-0.02 * t)``; the remainder
    ``1 - exp(-0.02 * t)`` is placed on ``noise``.

    Args:
        x: Clean input tensor.
        t: Tensor of time values; it must broadcast against ``x``.
        noise: Noise tensor that broadcasts against ``x``.

    Returns:
        The noised tensor ``keep * x + (1 - keep) * noise``.
    """
    keep = torch.exp(t * -0.02)  # fraction of the clean signal that survives
    return keep * x + (1 - keep) * noise

# 2️⃣ Simple U-Net Architecture for Denoising
class SimpleUNet(nn.Module):
    """Small convolutional encoder/decoder used as the denoiser.

    Every layer is a 3x3 convolution (or transposed convolution) with
    padding 1 and the default stride, so the spatial size of the input is
    preserved.  There are no skip connections and no down/upsampling.  The
    final ``Tanh`` bounds the output to [-1, 1].
    """

    def __init__(self):
        super().__init__()
        # Channels: 1 -> 32 -> 64
        encoder_layers = [
            nn.Conv2d(1, 32, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
        ]
        # Channels: 64 -> 32 -> 1
        decoder_layers = [
            nn.ConvTranspose2d(64, 32, 3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 1, 3, padding=1),
            nn.Tanh(),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode then decode ``x``; the output has the same shape as ``x``."""
        return self.decoder(self.encoder(x))

# 3️⃣ Training Loop
def train_diffusion_model():
    """Train ``SimpleUNet`` to reconstruct clean MNIST digits from noised ones.

    MNIST is downloaded to ``./data`` if it is not already there.  Each batch
    is noised with ``forward_diffusion`` at a random time value, and the model
    is optimised with Adam (lr=1e-3) on the MSE between its output and the
    clean images, for 10 epochs.

    Returns:
        The trained model, left on the device it was trained on.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SimpleUNet().to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.MSELoss()

    # MNIST training split as [0, 1] float tensors
    mnist = datasets.MNIST(
        root='./data',
        train=True,
        transform=transforms.Compose([transforms.ToTensor()]),
        download=True,
    )
    train_loader = DataLoader(mnist, batch_size=32, shuffle=True)

    for epoch in range(10):
        for clean, _labels in train_loader:
            clean = clean.to(device)
            noise = torch.randn_like(clean)
            # One time value per sample, drawn from {0.00, 0.01, ..., 0.99}.
            # NOTE(review): with t < 1 the factor exp(-0.02 * t) used by
            # forward_diffusion stays above ~0.98, so very little noise is
            # mixed in; confirm the division by 100 is intended.
            t = torch.randint(0, 100, (clean.shape[0],), device=device) / 100
            noisy = forward_diffusion(clean, t.view(-1, 1, 1, 1), noise)

            optimizer.zero_grad()
            loss = criterion(model(noisy), clean)
            loss.backward()
            optimizer.step()

        # Reports the loss of the last mini-batch of the epoch only.
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")
    return model

# 4️⃣ Generating Images
def generate_images(model, num_images=5):
    """Run the model on pure Gaussian noise and display the outputs.

    Args:
        model: Module mapping a ``[N, 1, 28, 28]`` tensor to images of the
            same layout.  It is switched to eval mode and left that way.
        num_images: Number of noise samples to push through the model and
            plot side by side.

    Returns:
        None.  The images are shown with matplotlib.
    """
    # Put the noise where the model's weights actually live.  Guessing from
    # CUDA availability breaks when a CPU model is used on a CUDA machine.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.eval()
    with torch.no_grad():
        noise = torch.randn((num_images, 1, 28, 28)).to(device)
        generated_images = model(noise).cpu()

    # squeeze=False always yields a 2-D array of axes, so num_images == 1
    # no longer returns a bare Axes object that cannot be indexed.
    _, axs = plt.subplots(1, num_images, figsize=(10, 2), squeeze=False)
    for ax, image in zip(axs[0], generated_images):
        ax.imshow(image.squeeze(), cmap='gray')
        ax.axis('off')
    plt.show()

# Train and Generate!
# Training downloads MNIST into ./data when needed; the trained network is kept
# in the module-level name `model` and then used to display generated images.
model = train_diffusion_model()
generate_images(model)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt

# Set random seed for reproducibility
torch.manual_seed(42)

# Hyperparameters
T = 1000  # Number of timesteps
BETA_START = 1e-4  # First value of the linear beta schedule
BETA_END = 0.02  # Last value of the linear beta schedule
DATA_DIM = 1  # 1D data for simplicity
BATCH_SIZE = 128
LEARNING_RATE = 1e-4
EPOCHS = 100

# Variance schedule (linear)
# All three tensors have length T and are created on the CPU; index i holds
# the value for timestep i.
betas = torch.linspace(BETA_START, BETA_END, T)
alphas = 1.0 - betas
alpha_bars = torch.cumprod(alphas, dim=0)  # \bar{\alpha}_t = \prod_{s=1}^t (1 - \beta_s)

# Forward process: q(x_t | x_0)
def forward_process(x_0, t, device="cpu"):
    r"""Sample x_t from q(x_t | x_0) in closed form.

    x_t = sqrt(\bar{\alpha}_t) * x_0 + sqrt(1 - \bar{\alpha}_t) * \epsilon,
    with \epsilon ~ N(0, I).

    Args:
        x_0: Original data, shape [batch_size, data_dim] (any number of
            trailing feature dimensions is accepted).
        t: Integer timestep indices into the schedule, shape [batch_size]
            (a single int is also accepted and applied to the whole batch).
        device: Device on which the computation is carried out.

    Returns:
        Tuple ``(x_t, noise)``: the noised data and the Gaussian noise that
        was mixed in, both shaped like ``x_0``.
    """
    x_0 = x_0.to(device)
    t = torch.as_tensor(t, dtype=torch.long, device=device)

    # Index the schedule on the target device; indexing the CPU schedule with
    # a CUDA index tensor is an error.  Then reshape to [batch, 1, ...] so the
    # per-sample coefficient broadcasts over the feature dimensions.  A bare
    # [batch] vector against [batch, data_dim] would give [batch, batch].
    coeff_shape = (-1,) + (1,) * (x_0.dim() - 1)
    alpha_bar_t = alpha_bars.to(device)[t].view(coeff_shape)

    noise = torch.randn_like(x_0)  # \epsilon ~ N(0, I)
    x_t = (alpha_bar_t.sqrt() * x_0) + ((1 - alpha_bar_t).sqrt() * noise)
    return x_t, noise

# Simple neural network to predict noise \epsilon_\theta(x_t, t)
class NoisePredictor(nn.Module):
    """MLP that estimates the noise in ``x_t``, conditioned on the timestep.

    The timestep is looked up in a learned embedding table with ``T`` rows
    and concatenated to ``x_t`` before a three-layer ReLU MLP.

    Args:
        input_dim: Size of the last dimension of ``x_t`` (and of the output).
        hidden_dim: Width of the timestep embedding and of the hidden layers.
    """

    def __init__(self, input_dim, hidden_dim=64):
        super().__init__()
        self.time_embedding = nn.Embedding(T, hidden_dim)  # one row per timestep
        mlp_layers = [
            nn.Linear(input_dim + hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.net = nn.Sequential(*mlp_layers)

    def forward(self, x_t, t):
        """Return the predicted noise for ``x_t`` at integer timesteps ``t``."""
        # [batch_size, input_dim] ++ [batch_size, hidden_dim] on the last axis
        conditioned = torch.cat((x_t, self.time_embedding(t)), dim=-1)
        return self.net(conditioned)

# Reverse process sampling: p_\theta(x_{t-1} | x_t)
def reverse_process(model, x_t, t, device="cpu"):
    r"""Take one reverse-diffusion step, sampling x_{t-1} from p_\theta(x_{t-1} | x_t).

    x_{t-1} = (1/sqrt(\alpha_t)) * (x_t - (\beta_t/sqrt(1-\bar{\alpha}_t)) * \epsilon_\theta) + \sigma_t * z

    with \sigma_t = sqrt(\beta_t) and z ~ N(0, I); z is zero for samples
    whose timestep is 0.

    Args:
        model: Callable ``model(x_t, t)`` returning the predicted noise,
            shaped like ``x_t``.
        x_t: Noisy data at timestep t, shape [batch_size, ...].
        t: Current timestep, either a scalar or a [batch_size] tensor of
            integer indices into the schedule.
        device: Device on which the computation is carried out.

    Returns:
        The sample x_{t-1}, shaped like ``x_t``.
    """
    x_t = x_t.to(device)
    t = torch.as_tensor(t, dtype=torch.long, device=device)
    if t.dim() == 0:
        # Promote a scalar timestep to one index per sample.
        t = t.expand(x_t.shape[0])

    # Schedules are indexed on the target device and reshaped to
    # [batch, 1, ...] so they broadcast per sample instead of producing a
    # [batch, batch] result against [batch, data_dim].
    coeff_shape = (-1,) + (1,) * (x_t.dim() - 1)
    beta_t = betas.to(device)[t].view(coeff_shape)
    alpha_t = alphas.to(device)[t].view(coeff_shape)
    alpha_bar_t = alpha_bars.to(device)[t].view(coeff_shape)

    # Predict noise with the model
    epsilon_theta = model(x_t, t)

    # Compute mean of p_\theta(x_{t-1} | x_t)
    mu_theta = (1 / alpha_t.sqrt()) * (x_t - (beta_t / (1 - alpha_bar_t).sqrt()) * epsilon_theta)

    # Variance (simplified as \beta_t in DDPM)
    sigma_t = beta_t.sqrt()

    # No noise at t=0.  `if t > 0` is ambiguous for a batched tensor, so the
    # decision is made per sample with a mask instead.
    has_noise = t > 0
    if bool(has_noise.any()):
        z = torch.randn_like(x_t) * has_noise.view(coeff_shape).to(x_t.dtype)
    else:
        z = torch.zeros_like(x_t)

    x_t_minus_1 = mu_theta + sigma_t * z
    return x_t_minus_1

# Generate toy dataset (e.g., samples from a mixture of Gaussians)
def generate_data(n_samples=1000):
    """Draw a toy dataset from a two-component Gaussian mixture.

    Half of the points are centred at -1.0 and the rest at +1.0, each with
    standard deviation 0.1.  The two groups are concatenated, not shuffled.

    Args:
        n_samples: Total number of points to return.

    Returns:
        Tensor of shape [n_samples, DATA_DIM].
    """
    # Give the remainder to the second component so an odd n_samples still
    # yields exactly n_samples rows (n // 2 twice would drop one).
    n_left = n_samples // 2
    n_right = n_samples - n_left
    return torch.cat([
        torch.randn(n_left, DATA_DIM) * 0.1 - 1.0,   # Mean -1
        torch.randn(n_right, DATA_DIM) * 0.1 + 1.0,  # Mean +1
    ])

# Training loop
def train_diffusion_model():
    """Train a ``NoisePredictor`` on the toy dataset with the noise-prediction loss.

    A fixed dataset of ``BATCH_SIZE * 10`` points is generated once.  Every
    epoch draws 10 random mini-batches from it, noises each at random
    timesteps with ``forward_process`` and minimises the MSE between the
    predicted and the true noise using Adam.

    Returns:
        The trained model, left on the device it was trained on.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = NoisePredictor(input_dim=DATA_DIM).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    data = generate_data(BATCH_SIZE * 10).to(device)

    for epoch in range(EPOCHS):
        model.train()
        batch_losses = []
        for _ in range(10):  # Mini-batches
            # Random subset of the data and one random timestep per sample
            picked = torch.randperm(data.size(0))[:BATCH_SIZE]
            batch = data[picked]
            t = torch.randint(0, T, (BATCH_SIZE,), device=device)

            # Noise the batch, then score the model's noise estimate
            x_t, true_noise = forward_process(batch, t, device)
            loss = F.mse_loss(model(x_t, t), true_noise)
            batch_losses.append(loss.item())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        total_loss = sum(batch_losses)
        print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {total_loss / 10:.4f}")

    return model

# Sampling function
def sample_diffusion_model(model, n_samples=1000, device="cpu"):
    """Generate samples by running the reverse process from step T-1 down to 0.

    Args:
        model: Noise-prediction network; it is switched to eval mode.
        n_samples: Number of samples to generate.
        device: Device on which sampling runs.

    Returns:
        Tensor of shape [n_samples, DATA_DIM] after the last reverse step.
    """
    model.eval()
    with torch.no_grad():
        # Start from pure Gaussian noise
        current = torch.randn(n_samples, DATA_DIM).to(device)

        # Walk the timesteps backwards, one reverse step at a time
        for step in range(T - 1, -1, -1):
            step_batch = torch.full((n_samples,), step, dtype=torch.long, device=device)
            current = reverse_process(model, current, step_batch, device)

    return current

# Run the model
# `device` is only used for sampling below; train_diffusion_model() selects
# its own device with the same expression.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = train_diffusion_model()

# Generate samples
samples = sample_diffusion_model(model, n_samples=1000, device=device)

# Plot original data vs. generated samples
# `original_data` is a fresh draw from generate_data(), not the set that was
# generated inside train_diffusion_model().
original_data = generate_data(1000).cpu().numpy()
samples = samples.cpu().numpy()

# density=True normalises both histograms so they can be compared directly.
plt.hist(original_data, bins=50, alpha=0.5, label="Original Data", density=True)
plt.hist(samples, bins=50, alpha=0.5, label="Generated Samples", density=True)
plt.legend()
plt.title("Diffusion Model: Original vs Generated Data")
plt.show()

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt

# Hyperparameters
T = 1000  # Number of timesteps
BETA_START = 1e-4  # First value of the linear beta schedule
BETA_END = 0.02  # Last value of the linear beta schedule
IMG_SIZE = 28  # Height and width of the generated images
CHANNELS = 1  # MNIST is grayscale
BATCH_SIZE = 128
LEARNING_RATE = 1e-4
EPOCHS = 20  # Increased epochs for images

# Variance schedule (linear)
# All three tensors have length T and are created on the CPU; index i holds
# the value for timestep i.
betas = torch.linspace(BETA_START, BETA_END, T)
alphas = 1.0 - betas
alpha_bars = torch.cumprod(alphas, dim=0)  # Running product of alphas

# Forward process: q(x_t | x_0)
def forward_process(x_0, t, device="cpu"):
    """Sample x_t from q(x_t | x_0) in closed form for a batch of images.

    x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * noise, with
    noise ~ N(0, I).

    Args:
        x_0: Clean images, shape [batch_size, channels, height, width].
        t: Integer timestep indices into the schedule, shape [batch_size]
            (a single int is also accepted and applied to the whole batch).
        device: Device on which the computation is carried out.

    Returns:
        Tuple ``(x_t, noise)``, both shaped like ``x_0``.
    """
    x_0 = x_0.to(device)
    t = torch.as_tensor(t, dtype=torch.long, device=device)

    # Move the schedule to the target device before indexing: a CPU tensor
    # cannot be indexed with a CUDA index tensor.  The result is reshaped to
    # [batch, 1, 1, ...] so it broadcasts over every non-batch dimension.
    coeff_shape = (-1,) + (1,) * (x_0.dim() - 1)
    alpha_bar_t = alpha_bars.to(device)[t].view(coeff_shape)

    noise = torch.randn_like(x_0)
    x_t = (alpha_bar_t.sqrt() * x_0) + ((1 - alpha_bar_t).sqrt() * noise)
    return x_t, noise

# Simple U-Net for noise prediction
class SimpleUNet(nn.Module):
    """Small U-Net that predicts the noise in ``x`` at timestep ``t``.

    One stride-2 downsampling stage, a bottleneck, one transposed-convolution
    upsampling stage and a single skip connection from the first encoder
    layer.  The timestep is embedded, projected to 64 channels and added to
    the downsampled feature map, so the prediction depends on ``t``.

    Args:
        time_dim: Width of the learned timestep embedding.
    """

    def __init__(self, time_dim=64):
        super().__init__()
        self.time_embedding = nn.Embedding(T, time_dim)
        # Projects the embedding to the 64 channels of the encoder output so
        # that any time_dim works, not only 64.
        self.time_proj = nn.Linear(time_dim, 64)

        # Encoder
        self.enc1 = nn.Conv2d(CHANNELS, 32, 3, padding=1)
        self.enc2 = nn.Conv2d(32, 64, 3, padding=1, stride=2)  # Downsample

        # Bottleneck
        self.bottleneck = nn.Conv2d(64, 128, 3, padding=1)

        # Decoder
        self.dec1 = nn.ConvTranspose2d(128, 64, 3, padding=1, stride=2, output_padding=1)  # Upsample
        self.dec2 = nn.Conv2d(64 + 32, 32, 3, padding=1)  # Skip connection concatenation
        self.dec3 = nn.Conv2d(32, CHANNELS, 3, padding=1)

    def forward(self, x, t):
        """Return the predicted noise, shaped like ``x``.

        Args:
            x: Noisy images, [batch, CHANNELS, H, W].  The shape comments
                below are for 28x28 input; the skip concatenation presumably
                requires even H and W, which has not been checked here.
            t: Integer timesteps, [batch].
        """
        # Time conditioning: [batch, time_dim] -> [batch, 64, 1, 1]
        t_feat = self.time_proj(self.time_embedding(t))[:, :, None, None]

        # Encoder
        e1 = F.relu(self.enc1(x))           # [batch, 32, 28, 28]
        e2 = F.relu(self.enc2(e1))          # [batch, 64, 14, 14]
        e2 = e2 + t_feat                    # inject the timestep (broadcast over H, W)

        # Bottleneck
        b = F.relu(self.bottleneck(e2))     # [batch, 128, 14, 14]

        # Decoder with skip connections
        d1 = F.relu(self.dec1(b))           # [batch, 64, 28, 28]
        d1 = torch.cat([d1, e1], dim=1)     # [batch, 96, 28, 28] (skip from e1)
        d2 = F.relu(self.dec2(d1))          # [batch, 32, 28, 28]
        out = self.dec3(d2)                 # [batch, 1, 28, 28]
        return out

# Reverse process sampling
def reverse_process(model, x_t, t, device="cpu"):
    """Take one reverse-diffusion step x_t -> x_{t-1} for a batch of images.

    mean    = (1 / sqrt(alpha_t)) * (x_t - beta_t / sqrt(1 - alpha_bar_t) * eps)
    x_{t-1} = mean + sqrt(beta_t) * z

    where ``eps = model(x_t, t)`` and ``z ~ N(0, I)``; ``z`` is zero for
    samples whose timestep is 0.

    Args:
        model: Callable ``model(x_t, t)`` returning the predicted noise,
            shaped like ``x_t``.
        x_t: Noisy images, shape [batch_size, channels, height, width].
        t: Integer timestep indices, shape [batch_size] (a scalar is
            promoted to one index per sample).
        device: Device on which the computation is carried out.

    Returns:
        The sample x_{t-1}, shaped like ``x_t``.
    """
    x_t = x_t.to(device)
    t = torch.as_tensor(t, dtype=torch.long, device=device)
    if t.dim() == 0:
        t = t.expand(x_t.shape[0])

    # Index the schedules on the target device (a CPU tensor cannot be indexed
    # with a CUDA index tensor) and reshape for per-sample broadcasting.
    coeff_shape = (-1,) + (1,) * (x_t.dim() - 1)
    beta_t = betas.to(device)[t].view(coeff_shape)
    alpha_t = alphas.to(device)[t].view(coeff_shape)
    alpha_bar_t = alpha_bars.to(device)[t].view(coeff_shape)

    epsilon_theta = model(x_t, t)
    mu_theta = (1 / alpha_t.sqrt()) * (x_t - (beta_t / (1 - alpha_bar_t).sqrt()) * epsilon_theta)
    sigma_t = beta_t.sqrt()

    # No noise at t=0, decided per sample.  Testing t.max() would add noise to
    # t == 0 samples whenever any other sample in the batch has t > 0.
    has_noise = t > 0
    if bool(has_noise.any()):
        z = torch.randn_like(x_t) * has_noise.view(coeff_shape).to(x_t.dtype)
    else:
        z = torch.zeros_like(x_t)

    x_t_minus_1 = mu_theta + sigma_t * z
    return x_t_minus_1

# Training loop
def train_diffusion_model():
    """Train ``SimpleUNet`` on MNIST with the noise-prediction objective.

    MNIST is downloaded to ``./data`` if needed and normalised with mean 0.5
    and std 0.5.  Each batch is noised at random timesteps with
    ``forward_process`` and the model is optimised with Adam on the MSE
    between the predicted and the true noise.  The average loss over all
    batches is printed after every epoch.

    Returns:
        The trained model, left on the device it was trained on.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SimpleUNet().to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Load MNIST
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])
    mnist = datasets.MNIST(root='./data', train=True, transform=preprocessing, download=True)
    train_loader = DataLoader(mnist, batch_size=BATCH_SIZE, shuffle=True)

    for epoch in range(EPOCHS):
        model.train()
        total_loss = 0
        for images, _labels in train_loader:
            images = images.to(device)
            # One random timestep per image; the last batch may be smaller
            t = torch.randint(0, T, (images.size(0),), device=device)

            # Noise the images, then score the model's noise estimate
            x_t, true_noise = forward_process(images, t, device)
            loss = F.mse_loss(model(x_t, t), true_noise)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{EPOCHS}, Avg Loss: {avg_loss:.4f}")
    return model

# Sampling function
def sample_diffusion_model(model, n_samples=5, device="cpu"):
    """Generate images by running the reverse process from step T-1 down to 0.

    Args:
        model: Noise-prediction network; it is switched to eval mode.
        n_samples: Number of images to generate.
        device: Device on which sampling runs.

    Returns:
        Tensor of shape [n_samples, CHANNELS, IMG_SIZE, IMG_SIZE].
    """
    model.eval()
    with torch.no_grad():
        # Start from pure Gaussian noise
        current = torch.randn(n_samples, CHANNELS, IMG_SIZE, IMG_SIZE).to(device)

        # Walk the timesteps backwards, one reverse step at a time
        for step in range(T - 1, -1, -1):
            step_batch = torch.full((n_samples,), step, dtype=torch.long, device=device)
            current = reverse_process(model, current, step_batch, device)

    return current

# Train and generate
# `device` is only used for sampling below; train_diffusion_model() selects
# its own device with the same expression.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = train_diffusion_model()

# Generate samples
samples = sample_diffusion_model(model, n_samples=5, device=device)
samples = samples.cpu()

# Plot generated images
# The 5 here must match n_samples above; one subplot is drawn per sample.
fig, axs = plt.subplots(1, 5, figsize=(10, 2))
for i in range(5):
    axs[i].imshow(samples[i].squeeze(), cmap='gray')  # drop the channel axis
    axs[i].axis('off')
plt.show()

In [None]:
# Cosine variance schedule
def cosine_beta_schedule(timesteps, s=0.008):
    """Build a beta schedule from a squared-cosine cumulative signal curve.

    The cumulative product ``alpha_bar`` follows
    ``cos(((t / timesteps + s) / (1 + s)) * pi / 2) ** 2``, normalised so its
    first value is 1.  Each beta is ``1 - alpha_bar[i + 1] / alpha_bar[i]``.

    Args:
        timesteps: Number of diffusion steps; the result has this length.
        s: Small offset applied to the normalised time.

    Returns:
        Float32 tensor of ``timesteps`` betas, clamped to [0, 0.999].
    """
    # timesteps + 1 grid points in [0, 1], so that ratios of consecutive
    # cumulative values give exactly `timesteps` betas
    grid = torch.linspace(0, timesteps, timesteps + 1, dtype=torch.float32) / timesteps
    angle = ((grid + s) / (1 + s)) * (np.pi / 2)
    cumulative = torch.cos(angle) ** 2
    cumulative = cumulative / cumulative[0]  # first entry becomes exactly 1

    step_ratio = cumulative[1:] / cumulative[:-1]  # per-step alpha
    return torch.clamp(1 - step_ratio, 0, 0.999)

# Precompute schedule
# Rebinds the module-level `betas` to the cosine schedule.
# NOTE(review): this line does not refresh `alphas` / `alpha_bars`, which were
# derived from the earlier linear betas — confirm they are recomputed from
# the new `betas` before the schedule is used.
betas = cosine_beta_schedule(T)