## Preliminaries

In [None]:
import os
os.environ['KMP_DUPLICATE_LIB_OK']='TRUE'  # Prevent OpenMP initialization error
import torch  # Import PyTorch library
import torch.nn as nn  # Import neural network module
import torch.optim as optim  # Import optimization module
import math  # Import math module for log calculations
from torchvision import datasets, transforms  # Import datasets and transforms
from torchvision.utils import save_image, make_grid  # Import utility to save images
import torchvision  # Import torchvision library
import matplotlib.pyplot as plt  # Import plotting library
import os  # Import os module for file operations
import numpy as np  # Import numpy library
from torch.utils.data import Dataset  # Add this import at the top
from PIL import Image  # Import PIL Image module for image handling
import torch.nn.functional as F  # Import PyTorch's functional API for loss functions

In [None]:
# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Set device
print(f"Using device: {device}")  # Print the device being used

## Define the DDPM model

In [None]:
class UNet(nn.Module):
    """U-Net architecture for noise prediction in diffusion models with built-in residual connections, optimized for 128x128 RGB images"""
    def __init__(self, in_channels=3, time_dim=256):  # Modified for RGB input (3 channels)
        super().__init__()

        # Pooling and activation layers used throughout the network
        self.pool = nn.MaxPool2d(2)  # Max pooling for downsampling
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)  # Bilinear upsampling
        self.relu = nn.ReLU()  # ReLU activation function

        # Encoder Block 1 - Input level (128x128)
        self.enc1_conv1 = nn.Conv2d(in_channels, 64, 3, padding=1)  # First conv: (N,3,128,128) -> (N,64,128,128)
        self.enc1_bn1 = nn.BatchNorm2d(64)  # Normalizes each of the 64 channels independently
        self.enc1_conv2 = nn.Conv2d(64, 64, 3, padding=1)  # Second conv: (N,64,128,128) -> (N,64,128,128)
        self.enc1_bn2 = nn.BatchNorm2d(64)  # Batch norm after second conv

        # Encoder Block 2 - After first pooling (64x64)
        self.enc2_conv1 = nn.Conv2d(64, 128, 3, padding=1)  # First conv: (N,64,64,64) -> (N,128,64,64)
        self.enc2_bn1 = nn.BatchNorm2d(128)  # Batch norm after first conv
        self.enc2_conv2 = nn.Conv2d(128, 128, 3, padding=1)  # Second conv: (N,128,64,64) -> (N,128,64,64)
        self.enc2_bn2 = nn.BatchNorm2d(128)  # Batch norm after second conv

        # Encoder Block 3 - After second pooling (32x32)
        self.enc3_conv1 = nn.Conv2d(128, 256, 3, padding=1)  # First conv: (N,128,32,32) -> (N,256,32,32)
        self.enc3_bn1 = nn.BatchNorm2d(256)  # Batch norm after first conv
        self.enc3_conv2 = nn.Conv2d(256, 256, 3, padding=1)  # Second conv: (N,256,32,32) -> (N,256,32,32)
        self.enc3_bn2 = nn.BatchNorm2d(256)  # Batch norm after second conv

        # Decoder Block 3 - First upsampling (32x32 -> 64x64)
        self.dec3_conv1 = nn.Conv2d(384, 128, 3, padding=1)  # First conv: (N,384,64,64) -> (N,128,64,64)
        self.dec3_bn1 = nn.BatchNorm2d(128)  # Batch norm after first conv
        self.dec3_conv2 = nn.Conv2d(128, 128, 3, padding=1)  # Second conv: (N,128,64,64) -> (N,128,64,64)
        self.dec3_bn2 = nn.BatchNorm2d(128)  # Batch norm after second conv

        # Decoder Block 2 - Second upsampling (64x64 -> 128x128)
        self.dec2_conv1 = nn.Conv2d(192, 64, 3, padding=1)  # First conv: (N,192,128,128) -> (N,64,128,128)
        self.dec2_bn1 = nn.BatchNorm2d(64)  # Batch norm after first conv
        self.dec2_conv2 = nn.Conv2d(64, 64, 3, padding=1)  # Second conv: (N,64,128,128) -> (N,64,128,128)
        self.dec2_bn2 = nn.BatchNorm2d(64)  # Batch norm after second conv

        # Final output layer
        self.final_conv = nn.Conv2d(64, in_channels, kernel_size=1)  # Final conv: (N,64,128,128) -> (N,3,128,128)

        # Time embedding dimension and projection
        self.time_dim = time_dim  # Time embedding dimension

        # Define MLPs as model parameters
        self.time_enc1 = nn.Sequential(nn.Linear(time_dim, 64), nn.SiLU(), nn.Linear(64, 64))  # Time embedding MLP for encoder block 1
        self.time_enc2 = nn.Sequential(nn.Linear(time_dim, 128), nn.SiLU(), nn.Linear(128, 128))  # Time embedding MLP for encoder block 2
        self.time_enc3 = nn.Sequential(nn.Linear(time_dim, 256), nn.SiLU(), nn.Linear(256, 256))  # Time embedding MLP for encoder block 3
        self.time_dec3 = nn.Sequential(nn.Linear(time_dim, 128), nn.SiLU(), nn.Linear(128, 128))  # Time embedding MLP for decoder block 3
        self.time_dec2 = nn.Sequential(nn.Linear(time_dim, 64), nn.SiLU(), nn.Linear(64, 64))  # Time embedding MLP for decoder block 2

    def get_time_embedding(self, t):
        """Generate sinusoidal time embedding and project through MLPs for each block

        Args:
            t: Time tensor of shape (batch_size, 1)

        Returns:
            Dictionary containing time embeddings for each block
        """
        half_dim = self.time_dim // 2  # Calculate half dimension for sin/cos embeddings
        embeddings = torch.arange(half_dim, device=t.device).float()  # Create position indices
        embeddings = torch.exp(-math.log(10000) * embeddings / half_dim)  # Calculate frequency bands
        embeddings = t * embeddings.unsqueeze(0)  # Shape: (batch_size, half_dim)
        embeddings = torch.cat([torch.sin(embeddings), torch.cos(embeddings)], dim=-1)  # Shape: (batch_size, time_dim)

        # Use the class MLPs instead of creating new ones
        t_emb_enc1 = self.time_enc1(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_enc2 = self.time_enc2(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_enc3 = self.time_enc3(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_dec3 = self.time_dec3(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP
        t_emb_dec2 = self.time_dec2(embeddings).unsqueeze(-1).unsqueeze(-1)  # Use class MLP

        return {
            'enc1': t_emb_enc1,  # Time embedding for encoder block 1
            'enc2': t_emb_enc2,  # Time embedding for encoder block 2
            'enc3': t_emb_enc3,  # Time embedding for encoder block 3
            'dec3': t_emb_dec3,  # Time embedding for decoder block 3
            'dec2': t_emb_dec2   # Time embedding for decoder block 2
        }

    def forward(self, x, t):
        """Forward pass through U-Net optimized for 128x128 RGB input with time embeddings at each block"""
        # Time embedding
        t = t.unsqueeze(-1).float()  # Ensure time is in correct shape
        t_embs = self.get_time_embedding(t)  # Get time embeddings for each block

        # Encoder pathway with skip connections and time embeddings
        # Encoder Block 1 (128x128)
        e1 = self.relu(self.enc1_bn1(self.enc1_conv1(x)))  # First conv layer
        e1 = self.relu(self.enc1_bn2(self.enc1_conv2(e1)))  # Second conv layer with ReLU
        e1 = e1 + t_embs['enc1']  # Add time embedding to encoder block 1

        # Encoder Block 2 (64x64)
        e2 = self.relu(self.enc2_bn1(self.enc2_conv1(self.pool(e1))))  # First conv layer
        e2 = self.relu(self.enc2_bn2(self.enc2_conv2(e2)))  # Second conv layer with ReLU
        e2 = e2 + t_embs['enc2']  # Add time embedding to encoder block 2

        # Encoder Block 3 (32x32)
        e3 = self.relu(self.enc3_bn1(self.enc3_conv1(self.pool(e2))))  # First conv layer
        e3 = self.relu(self.enc3_bn2(self.enc3_conv2(e3)))  # Second conv layer with ReLU
        e3 = e3 + t_embs['enc3']  # Add time embedding to encoder block 3

        # Decoder pathway using skip connections
        # Decoder Block 3 (32x32 -> 64x64)
        d3 = torch.cat([self.upsample(e3), e2], dim=1)  # Concatenate along channel dimension
        d3 = self.relu(self.dec3_bn1(self.dec3_conv1(d3)))  # First conv block
        d3 = self.dec3_bn2(self.dec3_conv2(d3))  # Second conv block
        d3 = d3 + t_embs['dec3']  # Add time embedding to decoder block 3

        # Decoder Block 2 (64x64 -> 128x128)
        d2 = torch.cat([self.upsample(d3), e1], dim=1)  # Concatenate along channel dimension
        d2 = self.relu(self.dec2_bn1(self.dec2_conv1(d2)))  # First conv block
        d2 = self.dec2_bn2(self.dec2_conv2(d2))  # Second conv block
        d2 = d2 + t_embs['dec2']  # Add time embedding to decoder block 2

        return self.final_conv(d2)  # Return final output (N,3,128,128)

## Load the model

In [None]:
# Load the trained model and generate samples
model_path = r"D:\Users\VICTOR\Desktop\ADRL\Assignment 3\unet_model_epoch_14.pth"
loaded_model = UNet().to('cuda')

# Load the checkpoint dictionary with weights_only=True for security
checkpoint = torch.load(model_path, weights_only=True)

# Extract the model state dict from the checkpoint
loaded_model.load_state_dict(checkpoint['model_state_dict'])

# Set model to evaluation mode
loaded_model.eval()

print("Model loaded successfully!")

In [None]:
# Move the model to the appropriate device (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
loaded_model = loaded_model.to(device)

print(f"Model moved to {device}")

## DDIM Sampler

The inference process in diffusion models can be done in two ways:

DDPM (Denoising Diffusion Probabilistic Models):
1. Sample from the prior: Start by sampling $x_T \sim \mathcal{N}(0, I)$, which is the prior distribution.

2. Reverse the diffusion process: Sequentially get $x_{t-1}$ from $x_t$ for $t = T, T-1, \ldots, 1$ using
   
    $$x_{t-1} = \frac{1}{\sqrt{\alpha_t}} (x_t - \frac{1-\alpha_t}{\sqrt{1-\bar{\alpha}_t}} \epsilon(x_t, t)) + \sigma_t z$$

    where $z \sim \mathcal{N}(0, I)$ and $\sigma_t^2 = \beta_t = \frac{1-\bar{\alpha}_{t-1}}{1-\bar{\alpha}_t}(1-\alpha_t)$

3. Obtain the final sample: The final sample $x_0$ is obtained after completing the reverse process.


DDIM (Denoising Diffusion Implicit Models):
1. Sample from the prior: Start by sampling $x_T \sim \mathcal{N}(0, I)$, which is the prior distribution.

2. Reverse the diffusion process: Sequentially get $x_{t-1}$ from $x_t$ for selected timesteps using
    
    $$x_{t-1} = \sqrt{\bar{\alpha}_{t-1}} \left(\frac{x_t - \sqrt{1-\bar{\alpha}_t}\epsilon_\theta(x_t,t)}{\sqrt{\bar{\alpha}_t}}\right) + \sqrt{1-\bar{\alpha}_{t-1}}\epsilon_\theta(x_t,t)$$

    where $\epsilon_\theta$ is the learned noise predictor. Unlike DDPM, DDIM is deterministic and does not add
    random noise at each step, allowing for fewer sampling steps while maintaining quality.

3. Obtain the final sample: The final sample $x_0$ is obtained after completing the reverse process using
    significantly fewer steps than DDPM (typically 50-100 vs 1000).

In [None]:
def linear_beta_schedule(timesteps):
    """
    Linear beta schedule as used in the DDPM paper
    Args:
        timesteps: Number of timesteps in the diffusion process
    Returns:
        Tensor of betas
    """
    scale = 1000 / timesteps
    beta_start = scale * 0.0001
    beta_end = scale * 0.02
    return torch.linspace(beta_start, beta_end, timesteps, dtype=torch.float32)

In [None]:
# Define sampling parameters
n_timesteps = 1000  # Original number of timesteps used in training
n_sampling_steps = 100  # Number of steps for DDIM (much less than DDPM)

In [None]:
# Calculate necessary variables
betas = linear_beta_schedule(n_timesteps).to(device)  # Compute beta schedule and move to device
alphas = 1. - betas  # Compute alphas from betas
alphas_cumprod = torch.cumprod(alphas, axis=0)  # Compute cumulative product of alphas
sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)  # Compute square root of cumulative alphas
sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)  # Compute square root of (1 - cumulative alphas)

# Update timesteps
timesteps = torch.linspace(n_timesteps - 1, 0, n_sampling_steps, dtype=torch.long).to(device)  # Create timestep sequence for sampling

In [None]:
@torch.no_grad()  # Disable gradient calculations for inference
def ddim_sample(model, image_size, batch_size=1, channels=3):
    """
    DDIM sampling function
    Args:
        model: Trained UNet model
        image_size: Size of the output image
        batch_size: Number of images to generate simultaneously
        channels: Number of image channels
    Returns:
        Generated images
    """
    device = next(model.parameters()).device
    # Start from pure noise
    x_t = torch.randn(batch_size, channels, image_size, image_size).to(device)

    # Progress through timesteps in reverse order
    for i in range(len(timesteps) - 1):
        t = timesteps[i]
        t_prev = timesteps[i + 1]

        # Get model prediction (predicted noise)
        pred_noise = model(x_t, t)

        # Extract alphas for current and previous timesteps
        alpha_t = alphas_cumprod[t]
        alpha_t_prev = alphas_cumprod[t_prev]

        # DDIM deterministic sampling formula
        x_0_pred = (x_t - torch.sqrt(1 - alpha_t) * pred_noise) / torch.sqrt(alpha_t)
        x_t = torch.sqrt(alpha_t_prev) * x_0_pred + \
              torch.sqrt(1 - alpha_t_prev) * pred_noise

    return x_t  # Return final generated image

In [None]:
import os
from PIL import Image

# Create output directory if it doesn't exist
output_dir = "generated_images"
os.makedirs(output_dir, exist_ok=True)

# Set generation parameters
image_size = 64  # Adjust based on your training image size
batch_size = 4   # Number of images to generate at once
channels = 3     # RGB images
num_images = 100 # Total number of images to generate

def save_images(images, start_idx):
    """
    Save tensor images as PNG files
    Args:
        images: Tensor of normalized images
        start_idx: Starting index for file naming
    """
    images = denormalize_images(images)
    for i, img in enumerate(images):
        # Convert to PIL Image and save
        img = Image.fromarray((img * 255).astype('uint8'))
        img.save(os.path.join(output_dir, f'generated_image_{start_idx + i:03d}.png'))

# Function to convert tensors to displayable images
def denormalize_images(images):
    """
    Convert normalized tensor images to displayable format
    Args:
        images: Tensor of shape (batch_size, channels, height, width)
    Returns:
        Numpy array of images in range [0, 1]
    """
    images = images.cpu().detach()
    images = (images + 1) / 2  # Convert from [-1, 1] to [0, 1]
    images = images.clamp(0, 1)
    images = images.permute(0, 2, 3, 1)  # Change to (batch, height, width, channels)
    return images.numpy()

# Generate and save images in batches
num_batches = (num_images + batch_size - 1) // batch_size
generated_count = 0

print(f"Generating {num_images} images...")
for batch in range(num_batches):
    # Adjust batch size for the last batch if needed
    current_batch_size = min(batch_size, num_images - generated_count)

    # Generate images
    with torch.no_grad():
        generated_images = ddim_sample(
            model=loaded_model,
            image_size=image_size,
            batch_size=current_batch_size,
            channels=channels
        )

    # Save the generated images
    save_images(generated_images, generated_count)
    generated_count += current_batch_size

    # Print progress
    print(f"Progress: {generated_count}/{num_images} images generated")

print(f"Generation complete. Images saved in '{output_dir}' directory")

## FID Calculation

In [None]:
preprocess = transforms.Compose([
    transforms.Resize((299, 299)),  # Resize images to Inception v3 input size
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize with ImageNet stats ie we will also preprocess the data in same way as the original imagenet dataset that was used to train Inception v3
])  # Define preprocessing steps for Inception v3 input

def prepare_inception_input(images):
    return preprocess(images)  # Apply preprocessing to the input images

In [None]:
from torchvision.models import inception_v3  # Import inception_v3 model from torchvision

def load_inception_model():
    """
    Loads and prepares a pre-trained Inception v3 model for feature extraction

    Returns:
        model: Modified Inception v3 model with final classification layer removed
    """
    model = inception_v3(pretrained=True, transform_input=False)  # Load pre-trained Inception v3 model without input transformation
    model.fc = torch.nn.Identity()  # Removes the last fully connected layer
    model.eval()  # Set the model to evaluation mode
    return model.to(device)  # Move the model to the same device as our GAN

inception_model = load_inception_model()  # Load and prepare the modified Inception v3 model

In [None]:
import glob  # Import glob module for file path operations

def extract_inception_features(image_paths):
    """
    Extracts features from images using Inception v3 model

    Args:
        image_paths: List of paths to image files

    Returns:
        features: Tensor of extracted features from all images
    """
    all_features = []  # Initialize list to store features from all images
    transform = transforms.Compose([
        transforms.ToTensor(),  # Convert image to tensor
        transforms.Resize((299, 299)),  # Resize to inception input size
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize with ImageNet stats
    ])  # Define transformations for input images

    with torch.no_grad():  # Disable gradient computation for inference
        for img_path in image_paths:  # Process each image
            img = Image.open(img_path).convert('RGB')  # Load and convert image to RGB
            img_tensor = transform(img).unsqueeze(0).to(device)  # Transform and add batch dimension
            features = inception_model(img_tensor)  # Extract features using inception model
            all_features.append(features)  # Store features for current image

    return torch.cat(all_features, dim=0)  # Concatenate all features into single tensor

In [None]:
# Get list of generated image paths
generated_image_paths = glob.glob(os.path.join(r"D:\Users\VICTOR\Desktop\ADRL\Assignment 3\generated_images", "*.png"))  # Get paths of all PNG files in specified directory using os.path.join for robust path handling

# Extract features from generated images
generated_features = extract_inception_features(generated_image_paths)  # Extract inception features from generated images
print(f"Extracted features from {len(generated_image_paths)} generated images")  # Print number of processed images
print(f"Feature shape: {generated_features.shape}")  # Print shape of extracted features

# Calculate statistics of generated features
generated_mean = torch.mean(generated_features, dim=0)  # Calculate mean across all samples for each feature dimension
generated_cov = torch.cov(generated_features.T)  # Calculate covariance matrix of features across samples

print(f"Generated features mean shape: {generated_mean.shape}")  # Print shape of mean vector
print(f"Generated features covariance shape: {generated_cov.shape}")  # Print shape of covariance matrix

# Save statistics for later comparison
torch.save(generated_mean, 'generated_mean.pt')  # Save mean vector to disk for future use
torch.save(generated_cov, 'generated_cov.pt')  # Save covariance matrix to disk for future use

In [None]:
# Get list of real image paths and verify they exist
real_image_paths = glob.glob(r"D:\Users\VICTOR\Desktop\ADRL\Assignment 3\Butterfly dataset\*.jpg")[:100]  # Get paths of 100 JPG files from real dataset
if len(real_image_paths) == 0:  # Check if any image paths were found
    raise ValueError("No image files found in the specified directory")  # Raise error if no images found

# Extract features from real images with error handling
try:
    real_features = extract_inception_features(real_image_paths)  # Extract inception features from real images
    print(f"Extracted features from {len(real_image_paths)} real images")  # Print number of processed images
    print(f"Feature shape: {real_features.shape}")  # Print shape of extracted features

    # Calculate statistics of real features
    real_mean = torch.mean(real_features, dim=0)  # Calculate mean across all samples for each feature dimension
    real_cov = torch.cov(real_features.T)  # Calculate covariance matrix of features across samples

    print(f"Real features mean shape: {real_mean.shape}")  # Print shape of mean vector
    print(f"Real features covariance shape: {real_cov.shape}")  # Print shape of covariance matrix

    # Save statistics for later comparison
    torch.save(real_mean, 'real_mean.pt')  # Save mean vector to disk for future use
    torch.save(real_cov, 'real_cov.pt')  # Save covariance matrix to disk for future use

except RuntimeError as e:
    print(f"Error processing images: {str(e)}")  # Print error message if feature extraction fails
    print("Please verify that all images are valid and accessible")  # Provide troubleshooting hint

In [None]:
def calculate_frechet_inception_distance(real_mean, real_cov, generated_mean, generated_cov):
    """
    Calculate the Fréchet Inception Distance (FID) between real and generated image features.

    Args:
    real_mean (torch.Tensor): Mean of real image features.
    real_cov (torch.Tensor): Covariance matrix of real image features.
    generated_mean (torch.Tensor): Mean of generated image features.
    generated_cov (torch.Tensor): Covariance matrix of generated image features.

    Returns:
    float: The calculated FID score.
    """

    # Convert to numpy for scipy operations
    real_mean_np = real_mean.cpu().numpy()  # Convert real mean to numpy array
    real_cov_np = real_cov.cpu().numpy()  # Convert real covariance to numpy array
    generated_mean_np = generated_mean.cpu().numpy()  # Convert generated mean to numpy array
    generated_cov_np = generated_cov.cpu().numpy()  # Convert generated covariance to numpy array

    # Calculate squared L2 norm between means
    mean_diff = np.sum((real_mean_np - generated_mean_np) ** 2)  # Compute squared difference between means

    # Calculate sqrt of product of covariances
    covmean = scipy.linalg.sqrtm(real_cov_np.dot(generated_cov_np))  # Compute matrix square root

    # Check and correct imaginary parts if necessary
    if np.iscomplexobj(covmean):
        covmean = covmean.real  # Take only the real part if result is complex

    # Calculate trace term
    trace_term = np.trace(real_cov_np + generated_cov_np - 2 * covmean)  # Compute trace of the difference

    # Compute FID
    fid = mean_diff + trace_term  # Sum up mean difference and trace term

    return fid  # Return FID as a Python float

In [None]:
import scipy

# Move tensors to CPU and convert to numpy arrays
generated_mean_cpu = generated_mean.cpu()  # Move generated mean tensor to CPU
generated_cov_cpu = generated_cov.cpu()  # Move generated covariance tensor to CPU
real_mean_cpu = real_mean.cpu()  # Move real mean tensor to CPU
real_cov_cpu = real_cov.cpu()  # Move real covariance tensor to CPU

# Calculate FID score using CPU tensors
fid_score = calculate_frechet_inception_distance(real_mean_cpu, real_cov_cpu, generated_mean_cpu, generated_cov_cpu)  # Calculate FID using CPU tensors
print(f"Fréchet Inception Distance: {fid_score:.4f}")  # Print calculated FID score with 4 decimal places