# Basic pipeline

In [None]:
from diffusers import DDPMPipeline

# Checkpoint location for the `google/ddpm-cat-256` model.
# NOTE(review): this is a machine-specific absolute path (it looks like a local
# Hugging Face cache snapshot); adjust it when running on another machine.
model_id = "/data/duantong/mazipei/HuggingFace-Download-Accelerator/data/hf_cache/hub/models--google--ddpm-cat-256/snapshots/82ca0d5db4a5ec6ff0e9be8d86852490bc18a3d9"

# Build the ready-made pipeline and move it to the GPU.
ddpm = DDPMPipeline.from_pretrained(model_id, use_safetensors=True)
ddpm = ddpm.to("cuda")

# Run the whole pipeline with 25 inference steps and keep the first image.
pipeline_output = ddpm(num_inference_steps=25)
image = pipeline_output.images[0]
image


# Decompose DDPM pipeline

In [None]:
from diffusers import DDPMScheduler, UNet2DModel

# Load the UNet and the scheduler separately from the same checkpoint
# instead of going through the all-in-one pipeline.
model = UNet2DModel.from_pretrained(model_id, use_safetensors=True)
model = model.to("cuda")
scheduler = DDPMScheduler.from_pretrained(model_id)

# Configure the scheduler for 50 denoising steps.
scheduler.set_timesteps(50)

# Display the resulting timestep schedule.
scheduler.timesteps

In [3]:
import torch

# Start from random noise shaped like one 3-channel square image whose side
# length is taken from the model config.
sample_size = model.config.sample_size
noise = torch.randn((1, 3, sample_size, sample_size)).to("cuda")

# NOTE: `input` shadows the Python builtin; the name is kept because the
# following cell reads the final sample from it.
input = noise

# No gradients are needed for inference.
with torch.no_grad():
    for t in scheduler.timesteps:
        # The model output at timestep t is handed to the scheduler ...
        noise_pred = model(input, t).sample
        # ... which computes the sample for the previous timestep.
        input = scheduler.step(noise_pred, t, input).prev_sample

In [None]:
from PIL import Image
import numpy as np

# x / 2 + 0.5 maps values from [-1, 1] to [0, 1]; clamp cuts off anything outside.
unit_range = (input / 2 + 0.5).clamp(0, 1)
# Drop size-1 dimensions (the batch axis of the single sample).
chw = unit_range.squeeze()
# Reorder (C, H, W) -> (H, W, C) and convert to 8-bit values.
hwc_uint8 = (chw.permute(1, 2, 0) * 255).round().to(torch.uint8)
image = Image.fromarray(hwc_uint8.cpu().numpy())
image

# Deconstruct the Stable Diffusion pipeline

A DDPM pipeline contains only a UNet model, whereas Stable Diffusion has three separate components:
1. VAE: compresses and decompresses the image (image space -> latent space -> image space)
2. Text encoder: encodes the text prompt
3. UNet: generates the image conditioned on the text prompt

In [None]:
from PIL import Image
import torch
from transformers import CLIPTokenizer, CLIPTextModel
from diffusers import AutoencoderKL, UNet2DConditionModel, PNDMScheduler

# Checkpoint that provides the Stable Diffusion components. An empty id makes
# every `from_pretrained` call below fail, so a concrete checkpoint is required.
# NOTE(review): assumes the Stable Diffusion v1.4 checkpoint used by the
# Diffusers tutorial; point this at a local snapshot directory when offline.
# NOTE: this rebinds `model_id`, which earlier cells use for the DDPM checkpoint.
model_id = "CompVis/stable-diffusion-v1-4"

# Each component lives in its own subfolder of the checkpoint.
vae = AutoencoderKL.from_pretrained(model_id, subfolder="vae", use_safetensors=True)
tokenizer = CLIPTokenizer.from_pretrained(model_id, subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(model_id, subfolder="text_encoder", use_safetensors=True)
unet = UNet2DConditionModel.from_pretrained(model_id, subfolder="unet", use_safetensors=True)


## Swap scheduler

In [None]:
from diffusers import UniPCMultistepScheduler

# Load the UniPC scheduler from the checkpoint's scheduler config.
# NOTE: this rebinds `scheduler`, which earlier cells use for the DDPM scheduler.
scheduler = UniPCMultistepScheduler.from_pretrained(model_id, subfolder="scheduler")

# Move the three models to the GPU.
torch_device = "cuda"
vae.to(torch_device)
text_encoder.to(torch_device)
# The trailing ';' keeps the notebook from printing the whole UNet module
# structure as the cell output.
unet.to(torch_device);

## Create text embeddings

In [None]:
# One prompt per image; the batch size follows the number of prompts.
prompt = ["a photograph of an astronaut riding a horse"]
batch_size = len(prompt)

# Image height and width.
height = width = 512

# Number of denoising steps.
num_inference_steps = 25

# Guidance scale: how strongly the model should follow the text prompt.
# A larger value means closer adherence to the prompt, a smaller value less.
guidance_scale = 7.5

# Generator seeded with 0, created on the target device.
generator = torch.Generator(device=torch_device)
generator.manual_seed(0)

In [None]:
# Tokenize the prompt, padding/truncating to the tokenizer's maximum length.
text_input = tokenizer(
    prompt,
    padding="max_length",
    max_length=tokenizer.model_max_length,
    truncation=True,
    return_tensors="pt",
)

# Encode the token ids on the target device; no gradients are needed.
prompt_ids = text_input.input_ids.to(torch_device)
with torch.no_grad():
    text_embeddings = text_encoder(prompt_ids)[0]

You’ll also need to generate the unconditional text embeddings which are the embeddings for the padding token. These need to have the same shape (batch_size and seq_length) as the conditional text_embeddings:

In [None]:
# Pad the empty prompts to the same sequence length as the conditional input
# so that both sets of embeddings can be stacked into one batch.
max_length = text_input.input_ids.shape[-1]
uncond_input = tokenizer([""] * batch_size, padding="max_length", max_length=max_length, truncation=True, return_tensors="pt")

# Inference only: run the encoder without gradient tracking, as is done for the
# conditional embeddings, so no autograd graph is built and kept alive.
with torch.no_grad():
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(torch_device))[0]

Let’s concatenate the conditional and unconditional embeddings into a batch to avoid doing two forward passes:

In [None]:
# Stack [unconditional, conditional] embeddings along the batch dimension.
# Only the last `batch_size` rows of `text_embeddings` are used: on the first
# run that is the whole conditional tensor, and on a re-run it is the
# conditional half of the already-stacked tensor. This keeps the cell
# idempotent instead of growing the batch on every execution.
text_embeddings = torch.cat([uncond_embeddings, text_embeddings[-batch_size:]])

## Create random noise

💡 The height and width are divided by 8 because the vae model has 3 down-sampling layers. You can check by running the following:
```
2 ** (len(vae.config.block_out_channels) - 1) == 8
```

In [None]:
# Latent shape: (batch, UNet input channels, height / 8, width / 8).
latent_shape = (batch_size, unet.config.in_channels, height // 8, width // 8)

# Draw the starting noise on the target device using the seeded generator.
latents = torch.randn(latent_shape, generator=generator, device=torch_device)

## Denoise the image
Start by scaling the initial latents by the scheduler's initial noise sigma (`scheduler.init_noise_sigma`), the noise scale value required by improved schedulers like UniPCMultistepScheduler:

In [None]:
# Scale the starting noise by the scheduler's initial sigma.
# NOTE: `latents` is rebound here, so re-running this cell applies the scaling again.
initial_sigma = scheduler.init_noise_sigma
latents = latents * initial_sigma

The last step is to create the denoising loop that’ll progressively transform the pure noise in latents to an image described by your prompt. Remember, the denoising loop needs to do three things:

1. Set the scheduler’s timesteps to use during denoising.
2. Iterate over the timesteps.
3. At each timestep, scale the model input, call the UNet model to predict the noise residual and pass it to the scheduler to compute the previous noisy sample.

In [None]:
from tqdm.auto import tqdm

# Configure the scheduler for the requested number of denoising steps.
scheduler.set_timesteps(num_inference_steps)

for t in tqdm(scheduler.timesteps):
    # Duplicate the latents along the batch dimension so that one UNet forward
    # pass covers both the unconditional and the conditional text embeddings.
    # The list must be repeated (`[latents] * 2`), not the tensor multiplied:
    # `[latents * 2]` doubles the values and leaves the batch size unchanged.
    latent_model_input = torch.cat([latents] * 2)
    latent_model_input = scheduler.scale_model_input(latent_model_input, timestep=t)

    # Predict the noise residual for both halves of the batch.
    with torch.no_grad():
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

    # Classifier-free guidance: split the prediction into its two halves and
    # move from the unconditional one toward the text-conditioned one.
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

    # Only at this point is the noise estimate fully determined;
    # the actual denoising happens in the next step.

    # Compute the previous noisy sample: x_t -> x_{t-1}.
    latents = scheduler.step(noise_pred, t, latents).prev_sample

## Decode the image

In [None]:
# Rescale the latents before decoding. The result is stored under a new name
# and that name is passed to the VAE: the previous `latens` typo meant the
# unscaled `latents` were decoded. Using a new name also keeps `latents`
# untouched, so the cell can be re-run safely.
# NOTE(review): 0.18215 is presumably the VAE latent scaling factor for this
# checkpoint (cf. `vae.config.scaling_factor`) — confirm.
scaled_latents = 1 / 0.18215 * latents
with torch.no_grad():
    decoded = vae.decode(scaled_latents).sample

# x / 2 + 0.5 maps [-1, 1] to [0, 1]; clamp cuts off anything outside.
decoded = (decoded / 2 + 0.5).clamp(0, 1).squeeze()
# Reorder to (H, W, C) and round before the uint8 cast so values are not
# truncated, matching the DDPM conversion cell earlier in the notebook.
pixels = (decoded.permute(1, 2, 0) * 255).round().to(torch.uint8).cpu().numpy()
image = Image.fromarray(pixels)
image