# Set Up Environment and Dependencies
Install and import the required libraries, including diffusers, torch, datasets, and transformers. Set up the training device (CPU/GPU).

In [None]:
# Install required libraries
!pip install diffusers torch datasets transformers

# Import required libraries
import torch
from diffusers import DDPMPipeline, DDPMScheduler, DDPMPipeline
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModel

# Pick the compute device: use CUDA when PyTorch reports it as available,
# otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Prepare Custom Dataset
Load and preprocess custom images, create a custom dataset class, and set up the data loader for training.

In [None]:
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Define a custom dataset class
class CustomImageDataset(Dataset):
    """Dataset of RGB images read from a single (non-recursive) directory.

    Args:
        image_dir: Directory whose entries are scanned for image files.
        transform: Optional callable applied to each loaded PIL image;
            its return value is what ``__getitem__`` yields.

    Attributes:
        image_paths: Sorted list of full paths to the discovered images.

    Raises:
        FileNotFoundError: Propagated from ``os.listdir`` when ``image_dir``
            does not exist.
    """

    # Extensions are matched case-insensitively and include the dot, so
    # "photo.JPG" is accepted while a file merely named "dumpng" is not.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> file mapping reproducible across runs and machines.
        self.image_paths = [
            os.path.join(image_dir, name)
            for name in sorted(os.listdir(image_dir))
            if name.lower().endswith(self.IMAGE_EXTENSIONS)
        ]

    def __len__(self):
        """Return the number of images found in ``image_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return it, transformed if configured."""
        img_path = self.image_paths[idx]
        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of waiting for garbage collection.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        return image

# Preprocessing applied to every image: resize to 256x256, convert to a tensor,
# then normalize each of the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)

# Build the dataset from the image folder
image_dir = 'path_to_your_image_directory'  # Replace with the path to your image directory
dataset = CustomImageDataset(image_dir, transform)

# Wrap the dataset in a shuffling, batched loader backed by 4 worker processes
batch_size = 16
data_loader = DataLoader(
    dataset=dataset,
    batch_size=batch_size,
    num_workers=4,
    shuffle=True,
)

# Report how many images were found
print(f"Number of images loaded: {len(dataset)}")

# Define DDPM Model Architecture
Configure the DDPM model architecture using UNet2DModel and set training hyperparameters such as the number of diffusion timesteps, the learning rate, and the number of epochs.

In [None]:
from diffusers import UNet2DModel

# Define DDPM model architecture
model = UNet2DModel(
    sample_size=256,  # the target image size
    in_channels=3,    # number of input channels (3 for RGB images)
    out_channels=3,   # number of output channels
    layers_per_block=2,  # number of layers per block
    block_out_channels=(64, 128, 256, 512),  # number of output channels for each block
    # Down path: the attention block is the last (lowest-resolution) block.
    down_block_types=("DownBlock2D", "DownBlock2D", "DownBlock2D", "AttnDownBlock2D"),
    # Up path mirrors the down path, so attention comes FIRST here. Placing
    # AttnUpBlock2D last would run self-attention on the final, full-resolution
    # 256x256 feature map (65,536 tokens), which is prohibitively expensive.
    up_block_types=("AttnUpBlock2D", "UpBlock2D", "UpBlock2D", "UpBlock2D"),
)

# Set hyperparameters
num_timesteps = 1000  # number of timesteps for the diffusion process
learning_rate = 1e-4  # learning rate for the optimizer
num_epochs = 10  # number of training epochs

# Move model to the training device
model.to(device)

# Print model architecture
print(model)

# Create Training Pipeline
Set up the training components: the DDPMScheduler noise scheduler and the AdamW optimizer, followed by a basic noise-prediction training loop.

In [None]:
# Define the noise scheduler
noise_scheduler = DDPMScheduler(num_train_timesteps=num_timesteps)

# Define the optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Basic training loop: the model learns to predict the noise that was added to
# each image at a randomly chosen diffusion timestep.
# NOTE: the "Train the Model" cell further down runs another num_epochs epochs
# on this same model and optimizer, so running both cells trains twice.
for epoch in range(num_epochs):
    model.train()
    for step, batch in enumerate(data_loader):
        # Move batch to the training device
        batch = batch.to(device)

        # Sample noise with the same shape as the batch, directly on the device
        noise = torch.randn(batch.shape, device=device)

        # Sample a random timestep for each image. Use the actual size of this
        # batch: the last batch of an epoch can hold fewer than batch_size
        # images, and a mismatched count breaks add_noise / the loss.
        timesteps = torch.randint(0, num_timesteps, (batch.shape[0],), device=device).long()

        # Add noise to the images according to the noise scheduler
        noisy_images = noise_scheduler.add_noise(batch, noise, timesteps)

        # Predict the noise residual
        noise_pred = model(noisy_images, timesteps).sample

        # Calculate the loss
        loss = torch.nn.functional.mse_loss(noise_pred, noise)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print loss every 100 steps
        if step % 100 == 0:
            print(f"Epoch {epoch+1}/{num_epochs}, Step {step}/{len(data_loader)}, Loss: {loss.item()}")

# Train the Model
Implement the training loop with forward pass, loss calculation, and backward propagation. Include progress tracking and model checkpointing.

In [None]:
# Training loop with progress tracking and model checkpointing.
# Reuses the model, optimizer, noise_scheduler and data_loader defined in the
# earlier cells, so it continues from whatever state the model is already in.
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for step, batch in enumerate(data_loader):
        # Move batch to the training device
        batch = batch.to(device)

        # Sample noise with the same shape as the batch, directly on the device
        noise = torch.randn(batch.shape, device=device)

        # Sample a random timestep for each image. Use the actual size of this
        # batch: the last batch of an epoch can hold fewer than batch_size
        # images, and a mismatched count breaks add_noise / the loss.
        timesteps = torch.randint(0, num_timesteps, (batch.shape[0],), device=device).long()

        # Add noise to the images according to the noise scheduler
        noisy_images = noise_scheduler.add_noise(batch, noise, timesteps)

        # Predict the noise residual
        noise_pred = model(noisy_images, timesteps).sample

        # Calculate the loss
        loss = torch.nn.functional.mse_loss(noise_pred, noise)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate loss (as a Python float, so no autograd graph is retained)
        epoch_loss += loss.item()

        # Print loss every 100 steps
        if step % 100 == 0:
            print(f"Epoch {epoch+1}/{num_epochs}, Step {step}/{len(data_loader)}, Loss: {loss.item()}")

    # Calculate average loss for the epoch (mean of the per-batch losses)
    avg_epoch_loss = epoch_loss / len(data_loader)
    print(f"Epoch {epoch+1} completed. Average Loss: {avg_epoch_loss}")

    # Save model checkpoint (weights only) after every epoch
    checkpoint_path = f"ddpm_epoch_{epoch+1}.pth"
    torch.save(model.state_dict(), checkpoint_path)
    print(f"Model checkpoint saved at {checkpoint_path}")

# Save and Test Model
Save the trained model and demonstrate how to generate new images using the custom-trained pipeline.

In [None]:
# Save the final trained model (weights only)
final_model_path = "ddpm_final.pth"
torch.save(model.state_dict(), final_model_path)
print(f"Final model saved at {final_model_path}")

# Load the trained model for inference.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can still be loaded on a CPU-only runtime.
model.load_state_dict(torch.load(final_model_path, map_location=device))
model.eval()

# Define a function to generate new images
def generate_images(model, num_images=1, num_inference_steps=120):
    """Sample images from ``model``, one pipeline call per image.

    Wraps ``model`` and the module-level ``noise_scheduler`` in a
    ``DDPMPipeline``, moves the pipeline to the module-level ``device``, and
    invokes it ``num_images`` times.

    Args:
        model: The UNet passed to the pipeline as ``unet``.
        num_images: How many images to sample.
        num_inference_steps: Forwarded to every pipeline call.

    Returns:
        A list with the first entry of ``.images`` from each pipeline call.
    """
    sampler = DDPMPipeline(unet=model, scheduler=noise_scheduler)
    sampler.to(device)
    return [
        sampler(num_inference_steps=num_inference_steps).images[0]
        for _ in range(num_images)
    ]

# Generate and display new images
num_images_to_generate = 5
generated_images = generate_images(model, num_images=num_images_to_generate)

# Display the generated images
import matplotlib.pyplot as plt

# squeeze=False always returns a 2-D array of Axes. Without it, asking for a
# single image returns a bare Axes object and indexing it fails.
fig, axes_grid = plt.subplots(1, num_images_to_generate, figsize=(15, 5), squeeze=False)
axes = axes_grid[0]  # the single row of subplots, one per generated image
for ax, img in zip(axes, generated_images):
    ax.imshow(img)
    ax.axis("off")
plt.show()