<a href="https://colab.research.google.com/github/theofarouk/IMDA/blob/main/Intro_Diffusion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Introduction to Diffusion Models for Text-to-Image Generation**

---

### **<p style="text-align: center; text-decoration: underline;">Introduction to Diffusion Models</p>**
# **<p style="text-align: center;">Text-to-Image Generation with Diffusion Models</p>**

---

> Tutor: *Omar IKNE*

> Master 2, IMT Nord Europe

---

### ■ **Overview**

In this notebook, we will explore the fascinating world of diffusion models for text-to-image generation. Diffusion models have revolutionized image generation by learning to reverse a gradual noising process, creating highly realistic images from random noise. We'll build a simple diffusion model from scratch and train it on a small dataset to understand the fundamental concepts.

**Main Task: Text-to-Image Generation**

![Diffusion Process](https://imgs.search.brave.com/HQ6f8BLM8vHBXJWwYv7MdMbq6ddhGU-pZOKWQnYCP58/rs:fit:860:0:0:0/g:ce/aHR0cHM6Ly9pbWFn/ZXMucHJpc21pYy5p/by9lbmNvcmQvZjAz/ZWQ5OWItOTBkMi00/NDQxLTg1MzctYThh/YTBhZjIzMjQ2X2lt/YWdlMTIucG5nP2F1/dG89Y29tcHJlc3Ms/Zm9ybWF0)

### ■ **<a name="content">Contents</a>**

- [1. Preliminaries](#section1)
- [2. Understanding Diffusion Models](#section2)
- [3. Dataset Preparation](#section3)
- [4. Building the Diffusion Model](#section4)
- [5. Training the Model](#section5)
- [6. Text-to-Image Generation](#section6)
- [7. Model Evaluation](#section7)

---

### ■ **Libraries**

In [None]:
# Install required packages
# !pip install torch torchvision matplotlib pillow tqdm numpy
!pip install datasets transformers

In [None]:
# Import dependencies
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
from torchvision.utils import make_grid

# Basic dependencies
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import random
import os
from tqdm import tqdm
from copy import deepcopy
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModel

# Set random seeds for reproducibility
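def seed_everything(seed=42):
    random.seed(seed)  # Python's own RNG (used later by random.sample)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # Seeds every visible GPU; harmless without CUDA
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False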

seed_everything()


### ■ **<a name="section1">1. Preliminaries</a>** [(&#8593;)](#content)

Diffusion models are a class of generative models that have shown remarkable performance in image generation tasks. The core idea is to learn a process that gradually transforms random noise into realistic images.

#### **Key Concepts:**

1. **Forward Process (Diffusion):** Gradually add noise to an image until it becomes pure Gaussian noise
2. **Reverse Process (Denoising):** Learn to reverse the noise addition process to generate images from noise
3. **Noise Schedule:** Controls how much noise is added at each step
4. **Text Conditioning:** Using text prompts to guide the image generation process

#### **Mathematical Foundation:**

The diffusion process can be described as a Markov chain:
- Forward: $q(x_t|x_{t-1}) = \mathcal{N}(x_t; \sqrt{1-\beta_t}x_{t-1}, \beta_t I)$
- Reverse: $p_\theta(x_{t-1}|x_t) = \mathcal{N}(x_{t-1}; \mu_\theta(x_t, t), \Sigma_\theta(x_t, t))$

Where $\beta_t$ is the noise schedule and $\theta$ represents the model parameters.
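
Applying the forward kernel repeatedly gives a closed form for $x_t$ given $x_0$: with $\alpha_t = 1-\beta_t$ and $\bar\alpha_t = \prod_{s \le t} \alpha_s$, we get $q(x_t|x_0) = \mathcal{N}(x_t; \sqrt{\bar\alpha_t}x_0, (1-\bar\alpha_t) I)$. The sketch below is a minimal numerical check of that identity in plain Python. The helper names `linear_betas` and `iterate_forward_moments` are illustrative and not part of this notebook.

```python
import math

def linear_betas(timesteps, start=1e-4, end=0.02):
    """Linear schedule with the same default endpoints as the notebook (plain-Python stand-in)."""
    step = (end - start) / (timesteps - 1)
    return [start + i * step for i in range(timesteps)]

def iterate_forward_moments(betas):
    """Apply q(x_t | x_{t-1}) step by step, tracking the scalar moments of x_t given x_0.

    Returns a list of (mean_coefficient, variance) pairs, one per timestep.
    """
    mean_coef, var = 1.0, 0.0
    moments = []
    for beta in betas:
        mean_coef *= math.sqrt(1.0 - beta)   # the mean is scaled by sqrt(1 - beta_t)
        var = (1.0 - beta) * var + beta      # the old variance is scaled, then beta_t is added
        moments.append((mean_coef, var))
    return moments

betas = linear_betas(1000)
moments = iterate_forward_moments(betas)

# Compare the step-by-step moments with sqrt(alpha_bar_t) and 1 - alpha_bar_t
alpha_bar = 1.0
max_err = 0.0
for beta, (mean_coef, var) in zip(betas, moments):
    alpha_bar *= 1.0 - beta
    max_err = max(max_err,
                  abs(mean_coef - math.sqrt(alpha_bar)),
                  abs(var - (1.0 - alpha_bar)))

print(f"largest deviation from the closed form: {max_err:.2e}")
```

This closed form is what lets training jump directly to any timestep instead of simulating every intermediate step.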

---

### ■ **<a name="section2">2. Understanding Diffusion Models</a>** [(&#8593;)](#content)

Let's first understand the core concepts through implementation before building our complete model.

#### **Question 1: What is the forward diffusion process?**

The forward process gradually adds Gaussian noise to an image over multiple timesteps.


In [None]:
import requests

def download_image(image_url, file_dir):
    """Download an image and save it to file_dir, creating the parent directory if needed"""
    response = requests.get(image_url, timeout=30)
    if response.status_code == 200:
        directory = os.path.dirname(file_dir)
        if directory:  # dirname is empty for a bare file name
            os.makedirs(directory, exist_ok=True)
        with open(file_dir, "wb") as fp:
            fp.write(response.content)
        print("Image downloaded successfully.")
    else:
        print(f"Failed to download the image. Status code: {response.status_code}")

# Download an example image
image_url = "https://img-9gag-fun.9cache.com/photo/aPg3MoB_460s.jpg"
file_dir = "./image.jpg"
download_image(image_url, file_dir)

In [None]:
def linear_beta_schedule(timesteps, start=0.0001, end=0.02):
    """Linear noise schedule for the diffusion process"""
    return torch.linspace(start, end, timesteps)

def forward_diffusion_sample(x_0, t, betas, device="cpu"):
    """
    Sample from q(x_t | x_0) using the reparameterization trick

    Args:
        x_0: Original image (batch_size, channels, height, width)
        t: Timestep (batch_size,)
        betas: Noise schedule (timesteps,)
        device: Device to run on

    Returns:
        x_t: Noisy image at timestep t
        noise: The noise that was added
    """
    # Precompute sqrt(alpha_bar) and sqrt(1 - alpha_bar) for every timestep
    t = t.to(device)
    alphas_cumprod = torch.cumprod(1. - betas, dim=0).to(device)
    sqrt_alphas_cumprod = torch.sqrt(alphas_cumprod)
    sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - alphas_cumprod)

    # Gather the appropriate values for timestep t
    sqrt_alphas_cumprod_t = sqrt_alphas_cumprod[t].to(device)
    sqrt_one_minus_alphas_cumprod_t = sqrt_one_minus_alphas_cumprod[t].to(device)

    # Reshape for broadcasting
    sqrt_alphas_cumprod_t = sqrt_alphas_cumprod_t[:, None, None, None]
    sqrt_one_minus_alphas_cumprod_t = sqrt_one_minus_alphas_cumprod_t[:, None, None, None]

    # Sample noise
    noise = torch.randn_like(x_0).to(device)

    # Forward diffusion: x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1-alpha_bar_t) * epsilon
    x_t = sqrt_alphas_cumprod_t * x_0 + sqrt_one_minus_alphas_cumprod_t * noise

    return x_t, noise

# Let's test the forward diffusion process
def visualize_forward_process():
    """Visualize the forward diffusion process on a sample image"""
    # Load a sample image
    transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))  # Normalize to [-1, 1]
    ])

    # Use a sample image (you can replace this with any image)
    sample_image = torch.randn(1, 3, 64, 64)  # Random image for demonstration

    # Set up diffusion parameters
    timesteps = 1000
    betas = linear_beta_schedule(timesteps)

    # Select specific timesteps to visualize
    viz_timesteps = [0, 50, 100, 200, 500, 999]

    fig, axes = plt.subplots(1, len(viz_timesteps), figsize=(15, 3))

    for i, t in enumerate(viz_timesteps):
        x_t, noise = forward_diffusion_sample(sample_image, torch.tensor([t]), betas)

        # Denormalize for visualization
        img = x_t[0].permute(1, 2, 0)
        img = (img * 0.5 + 0.5).clamp(0, 1)

        axes[i].imshow(img)
        axes[i].set_title(f't = {t}')
        axes[i].axis('off')

    plt.tight_layout()
    plt.show()

# Same visualization on a real image loaded from disk
def visualize_forward_process_image(image_path):
    """Visualize the forward diffusion process on an image file"""
    # Resize, convert to a tensor, and normalize to [-1, 1]
    transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))  # Normalize to [-1, 1]
    ])

    # Load the image, force 3 channels, apply the transform, and add a batch dimension
    sample_image = Image.open(image_path).convert("RGB")
    sample_image = transform(sample_image).unsqueeze(0)  # [1, 3, 64, 64]

    # Set up diffusion parameters
    timesteps = 1000
    betas = linear_beta_schedule(timesteps)

    # Select specific timesteps to visualize
    viz_timesteps = [0, 50, 100, 200, 500, 999]

    fig, axes = plt.subplots(1, len(viz_timesteps), figsize=(15, 3))

    for i, t in enumerate(viz_timesteps):
        x_t, noise = forward_diffusion_sample(sample_image, torch.tensor([t]), betas)

        # Denormalize from [-1, 1] back to [0, 1] for visualization
        img = x_t[0].permute(1, 2, 0)
        img = (img * 0.5 + 0.5).clamp(0, 1)

        axes[i].imshow(img)
        axes[i].set_title(f't = {t}')
        axes[i].axis('off')

    plt.tight_layout()
    plt.show()

# run the visualization
image_path = './image.jpg'
visualize_forward_process_image(image_path)

**Question 1.1:** What happens to the image as we increase the timestep `t`?

The image becomes progressively noisier until it is almost indistinguishable from pure Gaussian noise at t = 999. The larger t is, the more of the original image is destroyed by the added noise.

**Question 1.2:** Why do we use a noise schedule instead of adding the same amount of noise at each step?

A noise schedule gives gradual control over the diffusion process. A linear or cosine schedule ensures a smooth transition from the original image to noise, which makes the reverse process easier to learn.
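
As a rough illustration of how the schedule shapes that transition, the sketch below compares the cumulative signal level $\bar\alpha_t$ under the linear schedule used here with a cosine schedule. The cosine form and its offset `s = 0.008` follow a commonly used formulation and are an assumption: this notebook does not implement a cosine schedule, and the function names are illustrative.

```python
import math

def linear_alpha_bar(timesteps, start=1e-4, end=0.02):
    """Cumulative product of (1 - beta_t) for a linear beta schedule."""
    step = (end - start) / (timesteps - 1)
    alpha_bar, values = 1.0, []
    for i in range(timesteps):
        alpha_bar *= 1.0 - (start + i * step)
        values.append(alpha_bar)
    return values

def cosine_alpha_bar(timesteps, s=0.008):
    """alpha_bar_t = f(t) / f(0) with f(t) = cos^2(((t / T + s) / (1 + s)) * pi / 2)."""
    def f(t):
        return math.cos(((t / timesteps + s) / (1 + s)) * math.pi / 2) ** 2
    return [f(t) / f(0) for t in range(1, timesteps + 1)]

T = 1000
linear = linear_alpha_bar(T)
cosine = cosine_alpha_bar(T)

# Print the remaining signal fraction at a few points along the chain
for t in (0, 249, 499, 749, 999):
    print(f"t={t:4d}  linear alpha_bar={linear[t]:.4f}  cosine alpha_bar={cosine[t]:.4f}")
```

With these settings the linear schedule has removed most of the image signal by the midpoint, while the cosine schedule decays more gradually.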

---

#### **Question 2: How does the reverse process work?**

The reverse process learns to denoise images. We train a neural network (e.g., UNet) to predict the noise that was added.


![unet](https://i0.wp.com/eviltux.com/wp-content/uploads/2024/08/1.-UNet_What-Is-It-1.png?w=1000&ssl=1)


In [None]:
class SimpleUNet(nn.Module):
    """A simplified U-Net architecture for diffusion models"""

    def __init__(self, in_channels=3, out_channels=3, base_channels=64):
        super(SimpleUNet, self).__init__()

        # Encoder (Downsampling)
        self.enc1 = self._block(in_channels, base_channels)
        self.enc2 = self._block(base_channels, base_channels * 2)
        self.enc3 = self._block(base_channels * 2, base_channels * 4)

        # Bottleneck
        self.bottleneck = self._block(base_channels * 4, base_channels * 8)

        # Decoder (Upsampling)
        self.dec3 = self._block(base_channels * 12, base_channels * 4)  # Skip connection
        self.dec2 = self._block(base_channels * 6, base_channels * 2)   # Skip connection
        self.dec1 = self._block(base_channels * 3, base_channels)       # Skip connection

        # Final convolution
        self.final_conv = nn.Conv2d(base_channels, out_channels, kernel_size=1)

        # Pooling and upsample
        self.pool = nn.MaxPool2d(2)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

    def _block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        # Encoder
        e1 = self.enc1(x)
        e2 = self.enc2(self.pool(e1))
        e3 = self.enc3(self.pool(e2))

        # Bottleneck
        bottleneck = self.bottleneck(self.pool(e3))

        # Decoder with skip connections
        d3 = self.dec3(torch.cat([self.upsample(bottleneck), e3], dim=1))
        d2 = self.dec2(torch.cat([self.upsample(d3), e2], dim=1))
        d1 = self.dec1(torch.cat([self.upsample(d2), e1], dim=1))

        return self.final_conv(d1)

# Test the model
def test_unet():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SimpleUNet().to(device)

    # Create a dummy input
    dummy_input = torch.randn(4, 3, 64, 64).to(device)

    # Forward pass
    with torch.no_grad():
        output = model(dummy_input)

    print(f"Input shape: {dummy_input.shape}")
    print(f"Output shape: {output.shape}")
    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

test_unet()


**Question 2.1:** What is the purpose of the U-Net architecture in diffusion models?

The U-Net predicts the noise that was added to the image at each timestep. Its encoder-decoder architecture with skip connections preserves the spatial detail needed for accurate denoising.
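
The skip connections also explain the decoder input sizes in `SimpleUNet` (`base_channels * 12`, `* 6`, `* 3`): each decoder block receives the upsampled deeper feature map concatenated with the encoder output of the same resolution. The sketch below only does the bookkeeping in plain Python. `trace_unet_shapes` is a hypothetical helper, not part of the notebook.

```python
def trace_unet_shapes(base_channels=64, image_size=64):
    """Follow (channels, spatial size) through a 3-level U-Net laid out like SimpleUNet."""
    b = base_channels
    # Encoder: each level doubles the channels; max-pooling halves the resolution
    e1 = (b, image_size)
    e2 = (2 * b, image_size // 2)
    e3 = (4 * b, image_size // 4)
    bottleneck = (8 * b, image_size // 8)

    # Decoder: upsample the deeper map, then concatenate the matching encoder output
    dec3_in = bottleneck[0] + e3[0]   # 8b + 4b = 12b
    d3 = (4 * b, e3[1])
    dec2_in = d3[0] + e2[0]           # 4b + 2b = 6b
    d2 = (2 * b, e2[1])
    dec1_in = d2[0] + e1[0]           # 2b + b = 3b

    return {
        "bottleneck": bottleneck,
        "dec3_in": dec3_in,
        "dec2_in": dec2_in,
        "dec1_in": dec1_in,
    }

shapes = trace_unet_shapes()
print(shapes)
```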

**Question 2.2:** Why are skip connections important in the U-Net architecture?

Skip connections carry high-resolution information directly from the encoder to the decoder, so fine details are not lost during denoising.

---

### ■ **<a name="section3">3. Dataset Preparation</a>** [(&#8593;)](#content)

We'll use a simple dataset for our text-to-image generation task. Let's use the [**CIFAR-10**](https://www.cs.toronto.edu/~kriz/cifar.html) dataset with simple text descriptions.

In [None]:
class CIFAR10WithCaptions(torch.utils.data.Dataset):
    """CIFAR-10 dataset with simple text captions"""

    def __init__(self, train=True, image_size=64):
        self.cifar10 = torchvision.datasets.CIFAR10(
            root='./data', train=train, download=True,
            transform=transforms.Compose([
                transforms.Resize((image_size, image_size)),
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
            ])
        )

        # Simple text descriptions for CIFAR-10 classes
        self.class_descriptions = {
            0: "an airplane flying in the sky",
            1: "a car on the road",
            2: "a bird perched on a branch",
            3: "a cat sitting on the floor",
            4: "a deer in the forest",
            5: "a dog playing in the park",
            6: "a frog near the pond",
            7: "a horse in the field",
            8: "a ship on the ocean",
            9: "a truck on the highway"
        }

    def __len__(self):
        return len(self.cifar10)

    def __getitem__(self, idx):
        image, label = self.cifar10[idx]
        caption = self.class_descriptions[label]

        return image, caption, label

def visualize_dataset_samples():
    """Visualize some samples from our dataset"""
    dataset = CIFAR10WithCaptions(train=True)

    fig, axes = plt.subplots(2, 5, figsize=(15, 6))

    for i in range(10):
        image, caption, label = dataset[i]

        # Denormalize image
        img = image.permute(1, 2, 0)
        img = (img * 0.5 + 0.5).clamp(0, 1)

        ax = axes[i // 5, i % 5]
        ax.imshow(img)
        ax.set_title(caption, fontsize=8)
        ax.axis('off')

    plt.tight_layout()
    plt.show()

# Visualize dataset samples
visualize_dataset_samples()


**Question 3.1:** Why did we choose CIFAR-10 for this introductory lab?

CIFAR-10 is well suited to learning the basics: its images are small and simple (32x32), the dataset is compact (60K images), and its 10 well-defined classes can be described with simple captions.

**Question 3.2:** What are the advantages and limitations of using simple text descriptions?

- **Advantages:** Simple to implement, direct correspondence between class and description.
- **Limitations:** Little variety in the descriptions, no linguistic nuance, limited text conditioning.
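
Because every class has exactly one caption, the mapping can be inverted to recover the class index from a caption, which is all a lookup-table text encoder needs. The sketch below is minimal and reuses three of the captions defined above. `caption_to_label` is a hypothetical helper, not part of the notebook.

```python
# Subset of the class descriptions used by CIFAR10WithCaptions
class_descriptions = {
    0: "an airplane flying in the sky",
    3: "a cat sitting on the floor",
    8: "a ship on the ocean",
}

# Invert the mapping: caption text -> class index
label_of_caption = {caption: label for label, caption in class_descriptions.items()}

def caption_to_label(caption):
    """Return the class index for a known caption, or raise for free-form text."""
    try:
        return label_of_caption[caption]
    except KeyError:
        raise ValueError(f"Unknown caption: {caption!r}") from None

print(caption_to_label("a ship on the ocean"))  # prints 8
```

Any prompt outside this fixed set is rejected, which makes the limitation concrete: with this setup the model cannot interpret free-form text.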

---

### ■ **<a name="section4">4. Building the Diffusion Model</a>** [(&#8593;)](#content)

Now let's build our complete text-conditioned diffusion model.

In [None]:
class SimpleTextEncoder(nn.Module):
    """A simple text encoder for our captions"""

    def __init__(self, vocab_size=1000, embed_dim=512, hidden_dim=512):
        super(SimpleTextEncoder, self).__init__()

        # Learned embedding looked up by caption index (one entry per class description)
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.fc = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim)
        )

    def forward(self, text_indices):
        # text_indices shape: [batch_size]
        embeddings = self.embedding(text_indices)  # [batch_size, embed_dim]
        return self.fc(embeddings)  # [batch_size, hidden_dim]

# Fix the diffusion model to properly handle text conditioning
class TextConditionedDiffusionModel(nn.Module):
    """Diffusion model with text conditioning"""

    def __init__(self, timesteps=1000, image_size=64, text_embed_dim=512):
        super(TextConditionedDiffusionModel, self).__init__()

        self.timesteps = timesteps
        self.image_size = image_size

        # Noise schedule
        self.betas = linear_beta_schedule(timesteps)
        self.alphas = 1. - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
        self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = torch.sqrt(1. - self.alphas_cumprod)

        # U-Net for denoising
        self.unet = SimpleUNet(in_channels=3 + text_embed_dim)  # Add text channels

        # Timestep embedding
        self.timestep_embedding = nn.Sequential(
            nn.Linear(1, 128),
            nn.ReLU(),
            nn.Linear(128, text_embed_dim)
        )

    def forward(self, x, t, text_embeddings):
        """
        Forward pass of the diffusion model

        Args:
            x: Noisy images [batch_size, 3, height, width]
            t: Timesteps [batch_size]
            text_embeddings: Text embeddings [batch_size, embed_dim]
        """
        batch_size = x.shape[0]

        # Add timestep information
        t_embed = self.timestep_embedding(t.unsqueeze(1).float())  # [batch_size, text_embed_dim]

        # Combine timestep and text information
        conditioning = t_embed + text_embeddings  # [batch_size, text_embed_dim]

        # Reshape conditioning to spatial dimensions and concatenate with image
        conditioning_spatial = conditioning.unsqueeze(-1).unsqueeze(-1)  # [batch_size, text_embed_dim, 1, 1]
        conditioning_spatial = conditioning_spatial.repeat(1, 1, self.image_size, self.image_size)  # [batch_size, text_embed_dim, H, W]

        # Concatenate conditioning with noisy image
        x_conditioned = torch.cat([x, conditioning_spatial], dim=1)  # [batch_size, 3 + text_embed_dim, H, W]

        # Predict noise
        predicted_noise = self.unet(x_conditioned)

        return predicted_noise

    def sample_timesteps(self, batch_size):
        """Sample random timesteps for training"""
        return torch.randint(0, self.timesteps, (batch_size,))


# Let's test the model
def test_model():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = TextConditionedDiffusionModel().to(device)
    text_encoder = SimpleTextEncoder().to(device)

    batch_size = 4

    # Dummy inputs
    x = torch.randn(batch_size, 3, 64, 64).to(device)
    t = torch.randint(0, 1000, (batch_size,)).to(device)
    text_indices = torch.randint(0, 10, (batch_size,)).to(device)

    # Get text embeddings
    text_embeddings = text_encoder(text_indices)

    # Forward pass
    with torch.no_grad():
        predicted_noise = model(x, t, text_embeddings)

    print(f"Noisy image shape: {x.shape}")
    print(f"Text embeddings shape: {text_embeddings.shape}")
    print(f"Predicted noise shape: {predicted_noise.shape}")
    print("✓ Model forward pass successful!")

test_model()


**Question 4.1:** What is the role of the timestep embedding in the diffusion model?

The timestep embedding tells the model which diffusion step it is at (that is, the current noise level), so it can adapt its noise prediction to the timestep t.

**Question 4.2:** How is text conditioning integrated to guide the image generation process?

The text is encoded into an embedding, added to the timestep embedding, broadcast over the spatial dimensions, and concatenated with the noisy image as extra input channels of the U-Net, which guides the generation.
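
The model above feeds the raw integer timestep into a small MLP. A common alternative in diffusion models is a fixed sinusoidal embedding, which gives each timestep a bounded, distinct vector. The sketch below is plain Python for illustration only and assumes an even `dim`. `sinusoidal_embedding` and its `max_period` parameter are assumptions, not part of this notebook's model.

```python
import math

def sinusoidal_embedding(t, dim=8, max_period=10000.0):
    """Return `dim` values: sines, then cosines, of t at geometrically spaced frequencies."""
    half = dim // 2
    freqs = [math.exp(-math.log(max_period) * i / half) for i in range(half)]
    return [math.sin(t * f) for f in freqs] + [math.cos(t * f) for f in freqs]

# Every component stays in [-1, 1], whatever the timestep
for t in (0, 1, 100):
    print(t, [round(v, 3) for v in sinusoidal_embedding(t)])
```

In a PyTorch model, a vector like this would typically replace the `t.unsqueeze(1).float()` input and then pass through the same kind of small MLP.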

---

### ■ **<a name="section5">5. Training the Model</a>** [(&#8593;)](#content)

Now let's train our diffusion model on the CIFAR-10 dataset with text captions.


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters - reduced for quick training
batch_size = 16
learning_rate = 1e-4
epochs = 100  # Reduce for a quick demonstration; a sample grid is generated after every epoch
timesteps = 200  # Reduced timesteps for faster training (typically: 1000)

# Load dataset
dataset = CIFAR10WithCaptions(train=True, image_size=64)
# Use a smaller subset for quick training
subset_indices = torch.randperm(len(dataset))[:5000].tolist()  # Only 5000 samples, kept as plain ints on the CPU
subset_dataset = torch.utils.data.Subset(dataset, subset_indices)
dataloader = DataLoader(subset_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

# Initialize model with reduced timesteps
model = TextConditionedDiffusionModel(timesteps=timesteps).to(device)
text_encoder = SimpleTextEncoder(vocab_size=10, embed_dim=64, hidden_dim=512).to(device)

# Optimizer and loss
optimizer = optim.Adam(list(model.parameters()) + list(text_encoder.parameters()), lr=learning_rate)
criterion = nn.MSELoss()

# Simplified training with proper error handling
def train_simple_diffusion_model():
    """Simplified training loop for our diffusion model"""
    print(f"Using device: {device}")

    # Training loop
    model.train()
    text_encoder.train()

    losses = []

    for epoch in range(epochs):
        epoch_loss = 0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}")

        for batch_idx, (images, captions, labels) in enumerate(progress_bar):
            images = images.to(device)
            batch_size = images.shape[0]

            # Sample timesteps
            t = model.sample_timesteps(batch_size).to(device)

            # Add noise to images (forward process); the sampled noise is the regression target
            x_t, noise = forward_diffusion_sample(images, t, model.betas, device)

            # Get text embeddings
            text_indices = labels.to(device)
            text_embeddings = text_encoder(text_indices)

            # Predict noise
            predicted_noise = model(x_t, t, text_embeddings)

            # Calculate loss
            loss = criterion(predicted_noise, noise)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            progress_bar.set_postfix({"Loss": f"{loss.item():.4f}"})

        avg_loss = epoch_loss / len(dataloader)
        losses.append(avg_loss)
        print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")

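        # Show a generated sample, then restore training mode
        # (generate_images switches both networks to eval mode)
        demo_generation()
        model.train()
        text_encoder.train()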

    # Plot training loss
    plt.figure(figsize=(10, 5))
    plt.plot(losses)
    plt.title("Training Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.grid(True)
    plt.show()

    # return model, text_encoder

# Let's also create a function to demonstrate the training concept without actual training
def demonstrate_diffusion_concepts():
    """Demonstrate diffusion concepts without full training"""

    print("=== Diffusion Model Concepts Demonstration ===")

    # 1. Show forward diffusion process
    print("\n1. Forward Diffusion Process:")
    visualize_forward_process()

    # 2. Show model architecture
    print("\n2. Model Architecture:")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TextConditionedDiffusionModel(timesteps=200).to(device)
    text_encoder = SimpleTextEncoder().to(device)

    total_params = sum(p.numel() for p in model.parameters()) + sum(p.numel() for p in text_encoder.parameters())
    print(f"Total model parameters: {total_params:,}")

    # 3. Show dataset samples
    print("\n3. Dataset Samples:")
    visualize_dataset_samples()

    # 4. Explain the training process
    print("\n4. Training Process Explanation:")
    print("   - Sample random timestep t")
    print("   - Add noise to image: x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1-alpha_bar_t) * epsilon")
    print("   - Predict the noise using U-Net")
    print("   - Compute MSE loss between predicted and actual noise")
    print("   - Backpropagate and update weights")

    # 5. Show what a trained model could generate
    print("\n5. Generation Process:")
    print("   - Start from random noise x_T ~ N(0, I)")
    print("   - For t = T to 1:")
    print("     - Predict noise: epsilon_theta = model(x_t, t, text_embedding)")
    print("     - Compute x_{t-1} using the reverse process")
    print("   - Final result: x_0 (generated image)")

# Run the demonstration
demonstrate_diffusion_concepts()

## **Generation and Training Functions**

In [None]:
@torch.no_grad()
def generate_images(model, text_encoder, captions, num_images=4, image_size=64, device="cpu"):
    """
    Generate images from text captions using the trained diffusion model
    """
    model.eval()
    text_encoder.eval()

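    # Convert captions to class indices (simplified): the text encoder is an
    # embedding table indexed by CIFAR-10 class, so each caption is mapped back
    # to its class. In practice, you would use a proper text tokenizer and encoder.
    class_captions = [
        "an airplane flying in the sky", "a car on the road",
        "a bird perched on a branch", "a cat sitting on the floor",
        "a deer in the forest", "a dog playing in the park",
        "a frog near the pond", "a horse in the field",
        "a ship on the ocean", "a truck on the highway",
    ]
    caption_indices = torch.tensor([class_captions.index(c) for c in captions[:num_images]])
    text_embeddings = text_encoder(caption_indices.to(device))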

    # Start from random noise
    x = torch.randn(num_images, 3, image_size, image_size).to(device)

    # Reverse diffusion process
    for t in tqdm(reversed(range(model.timesteps)), desc="Generating images"):
        t_batch = torch.full((num_images,), t, device=device, dtype=torch.long)

        # Predict noise
        predicted_noise = model(x, t_batch, text_embeddings)

        # Get alpha and beta parameters
        alpha_t = model.alphas[t]
        alpha_t_cumprod = model.alphas_cumprod[t]
        beta_t = model.betas[t]

        if t > 0:
            noise = torch.randn_like(x)
        else:
            noise = torch.zeros_like(x)

        # Reverse process step
        x = (1 / torch.sqrt(alpha_t)) * (
            x - ((1 - alpha_t) / torch.sqrt(1 - alpha_t_cumprod)) * predicted_noise
        ) + torch.sqrt(beta_t) * noise

    # Denormalize images
    images = torch.clamp(x, -1, 1)
    images = (images + 1) / 2  # Scale to [0, 1]

    return images, captions

def visualize_generated_images(images, captions):
    """Visualize generated images with their captions"""
    fig, axes = plt.subplots(1, len(images), figsize=(15, 3))

    if len(images) == 1:
        axes = [axes]

    for i, (img, caption) in enumerate(zip(images, captions)):
        img = img.cpu().permute(1, 2, 0)
        axes[i].imshow(img)
        axes[i].set_title(caption, fontsize=10)
        axes[i].axis('off')

    plt.tight_layout()
    plt.show()

# Example generation using the global `model`, `text_encoder`, and `device`
# (the images are only meaningful once the model has been trained)
def demo_generation():

    # Example captions
    captions = [
        "an airplane flying in the sky",
        "a cat sitting on the floor",
        "a dog playing in the park",
        "a ship on the ocean"
    ]

    print("Generating images from text...")
    generated_images, _ = generate_images(
        model, text_encoder, captions, num_images=4, device=device
    )

    visualize_generated_images(generated_images, captions)

# Run the demo once before training (the untrained model's output is not meaningful yet)
demo_generation()

In [None]:
## training function
train_simple_diffusion_model()


**Question 5.1:** Why do we use MSE loss between predicted noise and actual noise?

MSE loss is simple and effective for learning to denoise. It forces the model to predict the added noise accurately, which is what makes it possible to invert the diffusion process.

**Question 5.2:** What happens if we use too many or too few timesteps in the diffusion process?

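With too few timesteps, each step has to remove a large amount of noise, so the transitions are abrupt and the reverse process is hard to learn.
With too many, sampling is slow and neighboring steps become redundant. A value of 1000 timesteps is a common default (this notebook uses 200 to speed up training).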
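
One concrete effect of the timestep count: with the linear schedule endpoints used in this notebook (`1e-4` to `0.02`), the amount of image signal left at the final step depends on the number of steps. The sketch below computes $\bar\alpha_T$ in plain Python. `final_alpha_bar` is an illustrative helper, not part of the notebook.

```python
def final_alpha_bar(timesteps, start=1e-4, end=0.02):
    """Product of (1 - beta_t) over a linear schedule: the signal fraction left in x_T."""
    step = (end - start) / (timesteps - 1)
    alpha_bar = 1.0
    for i in range(timesteps):
        alpha_bar *= 1.0 - (start + i * step)
    return alpha_bar

# Compare a few chain lengths with the same schedule endpoints
for T in (50, 200, 1000):
    a = final_alpha_bar(T)
    print(f"T={T:5d}  alpha_bar_T={a:.5f}  sqrt(alpha_bar_T)={a ** 0.5:.3f}")
```

With 1000 steps $\bar\alpha_T$ is close to zero, so $x_T$ is essentially pure noise. With 200 steps and the same endpoints, a noticeable fraction of the image remains in $x_T$, so starting generation from $\mathcal{N}(0, I)$ no longer exactly matches what the model saw during training. Raising the final beta when using fewer steps is one possible way to compensate.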

---

### ■ **<a name="section6">6. Text-to-Image Generation</a>** [(&#8593;)](#content)

Now let's use our trained model to generate images from text descriptions!


In [None]:
# Run the demo
demo_generation()


**Question 6.1:** Why do we start the generation process from random noise?

The diffusion model learns to transform Gaussian noise into images. Starting from random noise lets us run the learned reverse process to generate new images.


**Question 6.2:** What is the role of the reverse process in image generation?

The reverse process progressively denoises the image by predicting and subtracting the noise at each step, rebuilding a coherent image from the initial noise.
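
To make the "predict and subtract" step concrete, the scalar sketch below noises a single value, then applies the reverse-step mean used in `generate_images`, with the true noise standing in for the network's prediction. It checks that this mean coincides with the posterior mean of $q(x_{t-1}|x_t, x_0)$. The variable names are illustrative, and a trained model would only approximate the noise.

```python
import math
import random

random.seed(0)
T = 1000
betas = [1e-4 + i * (0.02 - 1e-4) / (T - 1) for i in range(T)]
alphas = [1.0 - b for b in betas]
alpha_bars, running = [], 1.0
for a in alphas:
    running *= a
    alpha_bars.append(running)

t = 100                       # any index with t >= 1
x0 = 0.7                      # a single "pixel" in [-1, 1]
eps = random.gauss(0.0, 1.0)  # true noise, standing in for the network's prediction

# Forward process: x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps
x_t = math.sqrt(alpha_bars[t]) * x0 + math.sqrt(1.0 - alpha_bars[t]) * eps

# Reverse-step mean, same expression as in generate_images (without the added noise term)
mean_from_eps = (x_t - (1.0 - alphas[t]) / math.sqrt(1.0 - alpha_bars[t]) * eps) / math.sqrt(alphas[t])

# Posterior mean of q(x_{t-1} | x_t, x_0), written in terms of x_0
coef_x0 = math.sqrt(alpha_bars[t - 1]) * betas[t] / (1.0 - alpha_bars[t])
coef_xt = math.sqrt(alphas[t]) * (1.0 - alpha_bars[t - 1]) / (1.0 - alpha_bars[t])
mean_from_x0 = coef_x0 * x0 + coef_xt * x_t

# Inverting the forward equation recovers x_0 when the noise is known exactly
x0_recovered = (x_t - math.sqrt(1.0 - alpha_bars[t]) * eps) / math.sqrt(alpha_bars[t])

print(f"difference between the two means: {abs(mean_from_eps - mean_from_x0):.2e}")
print(f"recovered x0: {x0_recovered:.6f}")
```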

---

### ■ **<a name="section7">7. Model Evaluation</a>** [(&#8593;)](#content)

Let's evaluate our model and understand its limitations and potential improvements.


In [None]:
def evaluate_model_qualitatively(model, text_encoder, dataset, num_samples=5, device="cpu"):
    """Qualitative evaluation by comparing generated images with real ones"""
    model.eval()
    text_encoder.eval()

    # Get some real samples
    real_images, real_captions, real_labels = [], [], []
    indices = random.sample(range(len(dataset)), num_samples)

    for idx in indices:
        img, caption, label = dataset[idx]
        real_images.append(img)
        real_captions.append(caption)
        real_labels.append(label)

    real_images = torch.stack(real_images).to(device)

    # Generate corresponding images
    generated_images, _ = generate_images(
        model, text_encoder, real_captions, num_images=num_samples, device=device
    )

    # Visualize comparison
    fig, axes = plt.subplots(2, num_samples, figsize=(15, 6))

    for i in range(num_samples):
        # Real image
        real_img = real_images[i].cpu().permute(1, 2, 0)
        real_img = (real_img * 0.5 + 0.5).clamp(0, 1)

        axes[0, i].imshow(real_img)
        axes[0, i].set_title(f"Real: {real_captions[i]}", fontsize=8)
        axes[0, i].axis('off')

        # Generated image
        gen_img = generated_images[i].cpu().permute(1, 2, 0)
        axes[1, i].imshow(gen_img)
        axes[1, i].set_title(f"Generated: {real_captions[i]}", fontsize=8)
        axes[1, i].axis('off')

    plt.tight_layout()
    plt.show()

def analyze_training_components():
    """Analyze different components of the diffusion model"""

    print("=== Diffusion Model Analysis ===")
    print("\n1. Forward Process:")
    print("   - Gradually adds noise to images")
    print("   - Controlled by noise schedule (beta)")
    print("   - Results in pure Gaussian noise after T steps")

    print("\n2. Reverse Process:")
    print("   - Learns to denoise images step by step")
    print("   - Uses U-Net architecture for noise prediction")
    print("   - Conditioned on timestep and text embeddings")

    print("\n3. Text Conditioning:")
    print("   - Guides image generation based on text prompts")
    print("   - Uses text embeddings from encoder")
    print("   - Can be improved with cross-attention mechanisms")

    print("\n4. Training Objective:")
    print("   - Predict the noise added during forward process")
    print("   - Simple MSE loss between predicted and actual noise")
    print("   - Enables stable training of diffusion models")

# Run evaluation and analysis
analyze_training_components()

# Qualitative evaluation (only meaningful once the model has been trained)
evaluate_model_qualitatively(model, text_encoder, dataset, device=device)


**Question 7.1:** What are the main limitations of our simple diffusion model?

- **Architecture:** Simplified U-Net without attention
- **Conditioning:** Basic text embedding, no cross-attention
- **Dataset:** CIFAR-10 is limited (32x32 images, 10 classes)
- **Training:** Few epochs, reduced subset


**Question 7.2:** How could we improve the text conditioning mechanism?

- **Cross-attention:** Integrate text-image cross-attention mechanisms
- **Encoder:** Use pretrained models (CLIP, BERT)
- **Architecture:** Transformer-based diffusion models
- **Conditioning:** Inject the conditioning at multiple scales in the U-Net
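
As a rough picture of what cross-attention would add, the sketch below lets each image position (query) weight a set of text token vectors (keys and values) with scaled dot-product attention. It is plain Python on tiny hand-written vectors, without the learned projection matrices a real layer would have. All names and numbers are illustrative.

```python
import math

def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def cross_attention(queries, keys, values):
    """Scaled dot-product attention: each query attends over all keys.

    queries: one vector per image position; keys/values: one vector per text token.
    Returns (outputs, weights).
    """
    dim = len(keys[0])
    outputs, weights = [], []
    for q in queries:
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(dim) for k in keys]
        w = softmax(scores)
        out = [sum(wj * v[d] for wj, v in zip(w, values)) for d in range(len(values[0]))]
        outputs.append(out)
        weights.append(w)
    return outputs, weights

# Two image positions, three text tokens, feature size 2
queries = [[1.0, 0.0], [0.0, 1.0]]
keys = [[4.0, 0.0], [0.0, 4.0], [1.0, 1.0]]
values = [[1.0, 0.0], [0.0, 1.0], [0.5, 0.5]]

outputs, weights = cross_attention(queries, keys, values)
for w in weights:
    print([round(x, 3) for x in w])
```

Each position draws mostly on the token it matches, whereas the channel concatenation used in this notebook gives every pixel the same conditioning vector.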

# **Demo: Image Editing using InstructPix2Pix**

A demo notebook for [InstructPix2Pix](https://www.timothybrooks.com/instruct-pix2pix/) using [diffusers](https://github.com/huggingface/diffusers). InstructPix2Pix is a fine-tuned Stable Diffusion model that lets you edit images using language instructions.

<img src='https://instruct-pix2pix.timothybrooks.com/teaser.jpg'/>

In [None]:
# install package from github
!pip install -qqq git+https://github.com/huggingface/diffusers.git gradio transformers accelerate safetensors

#### To load a specific image, put its URL below

In [None]:
# %%capture
# !wget your_url

In [None]:
import PIL.Image
import PIL.ImageOps  # submodules are not loaded by a bare `import PIL`
import requests
import torch
from diffusers import StableDiffusionInstructPix2PixPipeline, EulerAncestralDiscreteScheduler

model_id = "timbrooks/instruct-pix2pix"
pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(model_id, torch_dtype=torch.float16, revision="fp16", safety_checker=None)
pipe.to("cuda")
pipe.enable_attention_slicing()

In [None]:
image = PIL.Image.open("./image.jpg")
image = PIL.ImageOps.exif_transpose(image)
image = image.convert("RGB")
image

In [None]:
prompts = ["turn him into cyborg",
    "Make it a picasso painting",
    "as if it were by modigliani",
    "convert to a bronze statue",
    "Turn it into an anime.",
    "have it look like a graphic novel",
    "make him gain weight",
    "what would he look like bald?",
    "Have him smile",
    "Put him in a cocktail party.",
    "move him at the beach.",
    "add dramatic lighting",
    "Convert to black and white",
    "What if it were snowing?",
    "Give him a leather jacket",
    "Turn him into a cyborg!",
    "make him wear a beanie",
    ]
prompt = "Make it a picasso painting"


pipe(prompt, image=image, num_inference_steps=20, image_guidance_scale=1).images[0]