## Theory

Let $X_0$ be the original image and $X_1, ..., X_T$ be the latent variables at different timesteps. We assume the transitions to be first order markovian chain:
$$X_0 \rightarrow X_1 \rightarrow X_2 - - \rightarrow X_T$$
The transitions are gaussian:
$$X_t = \sqrt{\alpha_t} X_{t-1} + \sqrt{1-\alpha_t} \epsilon, \quad \epsilon \sim \mathcal{N}(0,I)$$

We will use latent variables $X_t$ to model the data as:

$$\log p_\theta(X_0) = \log \int p_\theta(X_{0:T}) dX_{1:T}$$

We also assume that: $p_\theta(X_T) = \mathcal{N}(0, I)$ - The prior distribution of the final latent variable is a standard normal distribution

Because we cannot compute log likelihood directly, we maximize the ELBO:
$$\log p_\theta(X_0) \geq \mathbb{E}_{q(X_{1:T}|X_0)}[\log \frac{p_\theta(X_{0:T})}{q(X_{1:T}|X_0)}]$$

After simplifying, we get:
$$ ELBO = \mathbb{E}_{q(X_{1:T}|X_0)}\left[\log p_\theta(X_T) + \log \left(\frac{p(X_T|X_0)}{q(X_T|X_0)}\right) + \sum_{t=2}^T \log \left(\frac{p_\theta(X_{t-1}|X_t)}{q(X_{t-1}|X_t,X_0)}\right)\right]$$

- Term 1(Reconstruction term): $\log p_\theta(X_T)$ - ignored in the original DDPM paper because it only applies to the first timestep

- Term 2(Prior matching term): $\log \left(\frac{p(X_T|X_0)}{q(X_T|X_0)}\right)$ - Can be ignored as it is invariant to $\theta$

- Term 3(Transition matching term): $\sum_{t=2}^T \log \left(\frac{p_\theta(X_{t-1}|X_t)}{q(X_{t-1}|X_t,X_0)}\right) \propto \sum_{t=2}^T \mathbb{E}_{q(x_t|x_0)} \left\| \epsilon - \epsilon_\theta(x_t, t) \right\|_2^2$ used to train the network to predict the noise

## Prelimnaries

In [None]:
import os
os.environ['KMP_DUPLICATE_LIB_OK']='TRUE'  # Prevent OpenMP initialization error
import torch  # Import PyTorch library
import torch.nn as nn  # Import neural network module
import torch.optim as optim  # Import optimization module
import math  # Import math module for log calculations
from torchvision import datasets, transforms  # Import datasets and transforms
from torchvision.utils import save_image, make_grid  # Import utility to save images
import torchvision  # Import torchvision library
import matplotlib.pyplot as plt  # Import plotting library
import os  # Import os module for file operations
import numpy as np  # Import numpy library
from torch.utils.data import Dataset  # Add this import at the top
from PIL import Image  # Import PIL Image module for image handling
import torch.nn.functional as F  # Import PyTorch's functional API for loss functions

In [None]:
# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Set device
print(f"Using device: {device}")  # Print the device being used

## Define models

In [None]:
class UNet(nn.Module):
    """U-Net architecture for noise prediction in diffusion models with built-in residual connections, optimized for 128x128 RGB images"""
    def __init__(self, in_channels=3, time_dim=256):  # Modified for RGB input (3 channels)
        super().__init__()

        # Pooling and activation layers used throughout the network
        self.pool = nn.MaxPool2d(2)  # Max pooling for downsampling
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)  # Bilinear upsampling
        self.relu = nn.ReLU()  # ReLU activation function

        # Encoder Block 1 - Input level (128x128)
        self.enc1_conv1 = nn.Conv2d(in_channels, 64, 3, padding=1)  # First conv: (N,3,128,128) -> (N,64,128,128)
        self.enc1_bn1 = nn.BatchNorm2d(64)  # Normalizes each of the 64 channels independently
        self.enc1_conv2 = nn.Conv2d(64, 64, 3, padding=1)  # Second conv: (N,64,128,128) -> (N,64,128,128)
        self.enc1_bn2 = nn.BatchNorm2d(64)  # Batch norm after second conv

        # Encoder Block 2 - After first pooling (64x64)
        self.enc2_conv1 = nn.Conv2d(64, 128, 3, padding=1)  # First conv: (N,64,64,64) -> (N,128,64,64)
        self.enc2_bn1 = nn.BatchNorm2d(128)  # Batch norm after first conv
        self.enc2_conv2 = nn.Conv2d(128, 128, 3, padding=1)  # Second conv: (N,128,64,64) -> (N,128,64,64)
        self.enc2_bn2 = nn.BatchNorm2d(128)  # Batch norm after second conv

        # Encoder Block 3 - After second pooling (32x32)
        self.enc3_conv1 = nn.Conv2d(128, 256, 3, padding=1)  # First conv: (N,128,32,32) -> (N,256,32,32)
        self.enc3_bn1 = nn.BatchNorm2d(256)  # Batch norm after first conv
        self.enc3_conv2 = nn.Conv2d(256, 256, 3, padding=1)  # Second conv: (N,256,32,32) -> (N,256,32,32)
        self.enc3_bn2 = nn.BatchNorm2d(256)  # Batch norm after second conv

        # Decoder Block 3 - First upsampling (32x32 -> 64x64)
        self.dec3_conv1 = nn.Conv2d(384, 128, 3, padding=1)  # First conv: (N,384,64,64) -> (N,128,64,64)
        self.dec3_bn1 = nn.BatchNorm2d(128)  # Batch norm after first conv
        self.dec3_conv2 = nn.Conv2d(128, 128, 3, padding=1)  # Second conv: (N,128,64,64) -> (N,128,64,64)
        self.dec3_bn2 = nn.BatchNorm2d(128)  # Batch norm after second conv

        # Decoder Block 2 - Second upsampling (64x64 -> 128x128)
        self.dec2_conv1 = nn.Conv2d(192, 64, 3, padding=1)  # First conv: (N,192,128,128) -> (N,64,128,128)
        self.dec2_bn1 = nn.BatchNorm2d(64)  # Batch norm after first conv
        self.dec2_conv2 = nn.Conv2d(64, 64, 3, padding=1)  # Second conv: (N,64,128,128) -> (N,64,128,128)
        self.dec2_bn2 = nn.BatchNorm2d(64)  # Batch norm after second conv

        # Final output layer
        self.final_conv = nn.Conv2d(64, in_channels, kernel_size=1)  # Final conv: (N,64,128,128) -> (N,3,128,128)

        # Time embedding dimension and projection
        self.time_dim = time_dim  # Time embedding dimension

        # Define MLPs as model parameters
        self.time_enc1 = nn.Sequential(nn.Linear(time_dim, 64), nn.SiLU(), nn.Linear(64, 64))  # Time embedding MLP for encoder block 1
        self.time_enc2 = nn.Sequential(nn.Linear(time_dim, 128), nn.SiLU(), nn.Linear(128, 128))  # Time embedding MLP for encoder block 2
        self.time_enc3 = nn.Sequential(nn.Linear(time_dim, 256), nn.SiLU(), nn.Linear(256, 256))  # Time embedding MLP for encoder block 3
        self.time_dec3 = nn.Sequential(nn.Linear(time_dim, 128), nn.SiLU(), nn.Linear(128, 128))  # Time embedding MLP for decoder block 3
        self.time_dec2 = nn.Sequential(nn.Linear(time_dim, 64), nn.SiLU(), nn.Linear(64, 64))  # Time embedding MLP for decoder block 2

    def get_time_embedding(self, t):
        """Generate sinusoidal time embedding and project through MLPs for each block

        Args:
            t: Time tensor of shape (batch_size, 1)

        Returns:
            Dictionary containing time embeddings for each block
        """
        half_dim = self.time_dim // 2  # Calculate half dimension for sin/cos embeddings
        embeddings = torch.arange(half_dim, device=t.device).float()  # Create position indices
        embeddings = torch.exp(-math.log(10000) * embeddings / half_dim)  # Calculate frequency bands
        embeddings = t * embeddings.unsqueeze(0)  # Shape: (batch_size, half_dim)
        embeddings = torch.cat([torch.sin(embeddings), torch.cos(embeddings)], dim=-1)  # Shape: (batch_size, time_dim)

        # Use the class MLPs instead of creating new ones
        t_emb_enc1 = self.time_enc1(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_enc2 = self.time_enc2(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_enc3 = self.time_enc3(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_dec3 = self.time_dec3(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_dec2 = self.time_dec2(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP

        return {
            'enc1': t_emb_enc1,  # Time embedding for encoder block 1
            'enc2': t_emb_enc2,  # Time embedding for encoder block 2
            'enc3': t_emb_enc3,  # Time embedding for encoder block 3
            'dec3': t_emb_dec3,  # Time embedding for decoder block 3
            'dec2': t_emb_dec2   # Time embedding for decoder block 2
        }

    def forward(self, x, t):
        """Forward pass through U-Net optimized for 128x128 RGB input with time embeddings at each block"""
        # Time embedding
        t = t.unsqueeze(-1).float()  # Ensure time is in correct shape
        t_embs = self.get_time_embedding(t)  # Get time embeddings for each block

        # Encoder pathway with skip connections and time embeddings
        # Encoder Block 1 (128x128)
        e1 = self.relu(self.enc1_bn1(self.enc1_conv1(x)))  # First conv layer
        e1 = self.relu(self.enc1_bn2(self.enc1_conv2(e1)))  # Second conv layer with ReLU
        e1 = e1 + t_embs['enc1']  # Add time embedding to encoder block 1

        # Encoder Block 2 (64x64)
        e2 = self.relu(self.enc2_bn1(self.enc2_conv1(self.pool(e1))))  # First conv layer
        e2 = self.relu(self.enc2_bn2(self.enc2_conv2(e2)))  # Second conv layer with ReLU
        e2 = e2 + t_embs['enc2']  # Add time embedding to encoder block 2

        # Encoder Block 3 (32x32)
        e3 = self.relu(self.enc3_bn1(self.enc3_conv1(self.pool(e2))))  # First conv layer
        e3 = self.relu(self.enc3_bn2(self.enc3_conv2(e3)))  # Second conv layer with ReLU
        e3 = e3 + t_embs['enc3']  # Add time embedding to encoder block 3

        # Decoder pathway using skip connections
        # Decoder Block 3 (32x32 -> 64x64)
        d3 = torch.cat([self.upsample(e3), e2], dim=1)  # Concatenate along channel dimension
        d3 = self.relu(self.dec3_bn1(self.dec3_conv1(d3)))  # First conv block
        d3 = self.dec3_bn2(self.dec3_conv2(d3))  # Second conv block
        d3 = d3 + t_embs['dec3']  # Add time embedding to decoder block 3

        # Decoder Block 2 (64x64 -> 128x128)
        d2 = torch.cat([self.upsample(d3), e1], dim=1)  # Concatenate along channel dimension
        d2 = self.relu(self.dec2_bn1(self.dec2_conv1(d2)))  # First conv block
        d2 = self.dec2_bn2(self.dec2_conv2(d2))  # Second conv block
        d2 = d2 + t_embs['dec2']  # Add time embedding to decoder block 2

        return self.final_conv(d2)  # Return final output (N,3,128,128)

## Diffusion process

The forward diffusion process gradually adds Gaussian noise to an image over multiple timesteps. At each timestep $t$, we add noise according to a schedule, transforming a clear image $x_0$ into increasingly noisy versions $x_1$, $x_2$, ..., $x_t$. The amount of noise added is controlled by the diffusion parameters $\alpha$ and $\beta$. The forward process equation is:

$$x_t = \sqrt{\bar{\alpha_t}} \cdot x_0 + \sqrt{1 - \bar{\alpha_t}} \cdot \epsilon$$

where:
- $x_t$ is the noisy image at timestep $t$
- $x_0$ is the original clean image  
- $\bar{\alpha_t}$ (alpha_bar) is the cumulative product of $\alpha_i = (1-\beta_i)$ up to timestep $t$, i.e. $\bar{\alpha}_t = \prod_{i=1}^t \alpha_i$
- $\epsilon$ (epsilon) is random Gaussian noise

This process gradually transforms the data distribution into pure Gaussian noise at $t=T$.

In [None]:
# Hyperparameters
BETA_START = 0.0001  # Start value for noise schedule
BETA_END = 0.02  # End value for noise schedule

In [None]:
def add_noise_at_timestep(x_start, t, timesteps=1000):
    """
    Add noise to images at timestep t according to diffusion process

    Args:
        x_start (torch.Tensor): Original images
        t (torch.Tensor): Timesteps
        timesteps (int): Total number of diffusion steps

    Returns:
        tuple: Noisy images and noise
    """
    device = x_start.device  # Get device from input tensor
    noise = torch.randn_like(x_start)  # Generate random noise on same device as x_start

    betas = torch.linspace(BETA_START, BETA_END, timesteps).to(device)  # Move noise schedule to device
    alphas = 1 - betas  # Alpha values
    alphas_cumprod = torch.cumprod(alphas, dim=0)  # Cumulative product of alphas

    # Extract relevant alpha values for timestep t
    sqrt_alphas_cumprod_t = alphas_cumprod[t].sqrt()  # Get sqrt(alpha_bar) for timestep t
    sqrt_one_minus_alphas_cumprod_t = (1 - alphas_cumprod[t]).sqrt()  # Get sqrt(1-alpha_bar) for timestep t

    # Reshape for broadcasting
    sqrt_alphas_cumprod_t = sqrt_alphas_cumprod_t.view(-1, 1, 1, 1)  # Shape: (batch_size, 1, 1, 1)
    sqrt_one_minus_alphas_cumprod_t = sqrt_one_minus_alphas_cumprod_t.view(-1, 1, 1, 1)  # Shape: (batch_size, 1, 1, 1)

    # Apply noise using the diffusion equation
    noisy_images = sqrt_alphas_cumprod_t * x_start + sqrt_one_minus_alphas_cumprod_t * noise  # Forward diffusion step

    return noisy_images, noise  # Return both noisy images and the noise added

## Define the loss function

Its simply mean square error loss

In [None]:
def diffusion_loss_fn(model, x_start, timesteps=1000):
    """
    Calculate the diffusion loss across multiple timesteps for each image in batch

    Args:
        model (nn.Module): The UNet model for noise prediction
        x_start (torch.Tensor): Original clean images (batch_size, channels, height, width)
        timesteps (int): Total number of diffusion steps

    Returns:
        torch.Tensor: Mean loss per image in batch
    """
    batch_size = x_start.shape[0]

    # Initialize total loss for each image
    total_loss = torch.zeros(batch_size, device=x_start.device)

    # Sample 10 timesteps for each image
    for _ in range(10):
        # Sample random timesteps for each image in the batch
        t = torch.randint(1, timesteps, (batch_size,), device=x_start.device)

        # Add noise to the input images for the sampled timesteps
        noisy_images, noise = add_noise_at_timestep(x_start, t, timesteps)

        # Predict the noise using the model
        predicted_noise = model(noisy_images, t)

        # Calculate MSE loss between predicted and actual noise per image
        step_loss = F.mse_loss(predicted_noise, noise, reduction='none')  # Shape: (batch_size, channels, H, W)
        step_loss = step_loss.mean(dim=(1,2,3))  # Average over channels, height, width

        # Add to total loss
        total_loss += step_loss

    # Average loss across the 10 timesteps
    avg_loss = total_loss / 10

    return avg_loss  # Return average loss per image in batch

## Data preparation

In [None]:
# Hyperparameters
batch_size = 2  # Number of images to process in each training iteration

In [None]:
# Define transformations
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images to 128x128 pixels
    transforms.ToTensor(),  # Convert PIL Image to tensor and scale to [0, 1]
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize to [-1, 1] range
])

class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir  # Directory containing all images
        self.transform = transform  # Transformations to apply to images
        self.image_files = [f for f in os.listdir(root_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]  # List all image files

    def __len__(self):
        return len(self.image_files)  # Return the total number of images

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.image_files[idx])  # Get path of image at index idx
        image = Image.open(img_path).convert('RGB')  # Open image and convert to RGB

        if self.transform:
            image = self.transform(image)  # Apply transformations if any

        return image, 0  # Return image and a dummy label (0)

# Load the Butterfly dataset from local machine
data_dir = r'D:\Users\VICTOR\Desktop\ADRL\Assignment 3\Butterfly dataset'  # Path to the Butterfly dataset

# Create dataset and dataloader
dataset = CustomImageDataset(root_dir=data_dir, transform=transform)  # Use our custom dataset class
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)  # Create dataloader

print(f"Loaded {len(dataset)} images.")  # Print the total number of images loaded

## Training loop

In [None]:
# Initialize model and move to device
model = UNet()  # Create UNet model instance
model = model.to(device)  # Move model to GPU if available

In [None]:
# Set model to training mode
model.train()  # Enable training mode for model (activates dropout, batch norm, etc.)

In [None]:
# Define training hyperparameters
num_epochs = 100  # Number of epochs to train for
learning_rate = 1e-4  # Learning rate for optimizer

In [None]:
# Initialize optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  # Initialize Adam optimizer

In [None]:
# Training loop implementation
for epoch in range(num_epochs):  # Iterate through epochs
    print(f"Epoch {epoch+1}/{num_epochs}")  # Print current epoch progress

    for batch_idx, (images, _) in enumerate(dataloader):  # Iterate through batches
        optimizer.zero_grad()  # Reset gradients for this batch

        images = images.to(device)  # Move input images to device

        # Calculate diffusion loss using the modified loss function
        batch_losses = diffusion_loss_fn(model, images)  # Get per-image losses
        loss = batch_losses.mean()  # Average loss across batch
        print(f"Batch {batch_idx+1}, Loss: {loss.item():.4f}")  # Print current batch loss and value

        loss.backward()  # Backward pass to compute gradients
        optimizer.step()  # Update model parameters

        # Clear GPU cache every few batches
        if batch_idx % 10 == 0:
            torch.cuda.empty_cache()

    # Save model checkpoint after each epoch
    torch.save(model.state_dict(), f'unet_model_epoch_{epoch+1}.pth')  # Save model weights

## Inference

The inference process in DDPM involves reversing the diffusion process to generate new samples. The steps are as follows:

1. Sample from the prior: Start by sampling $x_T \sim \mathcal{N}(0, I)$, which is the prior distribution.

2. Reverse the diffusion process: Sequentially get $x_{t-1}$ from xt for $t = T, T-1, \ldots, 1$ using
   
    $$x_{t-1} = \frac{1}{\sqrt{\alpha_t}} (x_t - \frac{1-\alpha_t}{\sqrt{1-\bar{\alpha}_t}} \epsilon(x_t, t)) + \sigma_t z$$

    where $z \sim \mathcal{N}(0, I)$ and $\sigma_t^2 = \beta_t = \frac{1-\bar{\alpha}_{t-1}}{1-\bar{\alpha}_t}(1-\alpha_t)$

3. Obtain the final sample: The final sample x0 is obtained after completing the reverse process.


In [None]:
def get_beta_schedule(num_timesteps):
    """
    Get the beta schedule for the diffusion process

    Args:
        num_timesteps (int): Number of timesteps in diffusion process

    Returns:
        torch.Tensor: Beta schedule tensor
    """
    # Create linear schedule from BETA_START to BETA_END
    return torch.linspace(BETA_START, BETA_END, num_timesteps).to('cuda')  # Return beta schedule on GPU

def generate_sample(model, num_timesteps, device='cuda'):
    """
    Generate a new RGB image sample by reversing the diffusion process.

    Args:
        model: UNet model instance trained on diffusion process
        num_timesteps: Number of timesteps in the diffusion process
        device: Device to run generation on (default: 'cuda')

    Returns:
        x_0: Generated RGB image sample after complete reverse diffusion
    """
    # Start with random noise from standard normal distribution
    x_t = torch.randn(1, 3, 128, 128).to(device)

    # Get pre-computed diffusion parameters
    betas = get_beta_schedule(num_timesteps)
    alphas = 1 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)

    model.eval()
    with torch.no_grad():
        for t in reversed(range(num_timesteps)):
            t_tensor = torch.tensor([t], device=device)

            predicted_noise = model(x_t, t_tensor)

            # Calculate reverse process parameters
            alpha_t = alphas[t]
            alpha_t_bar = alphas_cumprod[t]

            # Calculate the correct variance term
            if t > 0:
                alpha_t_bar_prev = alphas_cumprod[t-1]
                sigma_t = torch.sqrt(
                    ((1 - alpha_t_bar_prev) / (1 - alpha_t_bar)) * (1 - alpha_t)
                )
            else:
                sigma_t = torch.zeros_like(alpha_t)

            # Only add random noise if t > 0
            if t > 0:
                noise = torch.randn_like(x_t)
            else:
                noise = 0

            # Reverse diffusion step formula
            x_t = (1 / torch.sqrt(alpha_t)) * (
                x_t - ((1 - alpha_t) / torch.sqrt(1 - alpha_t_bar)) * predicted_noise
            ) + sigma_t * noise

    x_0 = (torch.clamp(x_t, -1, 1) + 1) / 2

    return x_0

In [None]:
# Load the trained model and generate samples
model_path = r"D:\Users\VICTOR\Desktop\ADRL\Assignment 3\unet_model_epoch_11.pth"
loaded_model = UNet().to('cuda')

# Load the checkpoint dictionary
checkpoint = torch.load(model_path)

# Extract the model state dict from the checkpoint
loaded_model.load_state_dict(checkpoint['model_state_dict'])

# Set model to evaluation mode
loaded_model.eval()

print("Model loaded successfully!")

In [None]:
# Create output directory if it doesn't exist
output_dir = "generated_images"
os.makedirs(output_dir, exist_ok=True)

# Set model to evaluation mode
loaded_model.eval()
num_timesteps = 1000  # Use same number of timesteps as training

# Generate and save 100 images
for i in range(100):
    # Generate sample using the loaded model
    generated_sample = generate_sample(loaded_model, num_timesteps, device='cuda')

    # Convert tensor to image
    generated_image = generated_sample[0].cpu().permute(1, 2, 0).numpy()

    # Save the image
    plt.figure(figsize=(8, 8))
    plt.imshow(generated_image)
    plt.axis('off')
    plt.savefig(os.path.join(output_dir, f'generated_image_{i+1}.png'),
                bbox_inches='tight', pad_inches=0)
    plt.close()

    # Print progress
    if (i + 1) % 10 == 0:
        print(f"Generated {i+1}/100 images")

print("All images generated and saved successfully!")