In [None]:
!pip install -qU diffusers transformers huggingface_hub

In [None]:
from huggingface_hub import notebook_login
# Start the Hugging Face Hub login flow for this notebook session.
# NOTE(review): presumably only needed for gated or private checkpoints — confirm.
notebook_login()

# Pipeline callbacks

The denoising loop of a pipeline can be modified with custom-defined functions using the `callback_on_step_end` parameter. The callback function is executed at the end of each step and can modify the pipeline attributes and variables for the next step.

## Official callbacks

* `SDCFGCutoffCallback`: disables CFG after a certain number of steps for all SD 1.5 pipelines, including text-to-image, image-to-image, inpaint, and ControlNet.
* `SDXLCFGCutoffCallback`: disables CFG after a certain number of steps for all SDXL pipelines, including text-to-image, image-to-image, inpaint, and ControlNet.
* `IPAdapterScaleCutoffCallback`: disables the IP-Adapter after a certain number of steps for all pipelines supporting IP-Adapter.

To set up a callback, specify the point in the denoising loop after which the callback comes into effect, using one of the following arguments:
* `cutoff_step_ratio`: a float giving the cutoff as a ratio of the total number of steps
* `cutoff_step_index`: an integer giving the cutoff as an exact step number

In [None]:
import torch
from diffusers import DPMSolverMultistepScheduler, StableDiffusionXLPipeline
from diffusers.callbacks import SDXLCFGCutoffCallback

# Load the SDXL base checkpoint (fp16 variant, float16 weights) and move it to the GPU.
pipeline = StableDiffusionXLPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    variant="fp16",
    torch_dtype=torch.float16,
)
pipeline = pipeline.to("cuda")

# Swap in a DPM-Solver multistep scheduler built from the current scheduler's
# config, with Karras sigmas enabled.
pipeline.scheduler = DPMSolverMultistepScheduler.from_config(
    pipeline.scheduler.config, use_karras_sigmas=True
)

# Official CFG-cutoff callback, configured here with a step ratio of 0.4.
callback = SDXLCFGCutoffCallback(cutoff_step_ratio=0.4)
# Alternative: configure it with an explicit step index instead of a ratio.
# callback = SDXLCFGCutoffCallback(cutoff_step_ratio=None, cutoff_step_index=10)

In [None]:
# Seeded CPU generator so the run can be reproduced.
generator = torch.Generator(device="cpu").manual_seed(111)
prompt = "a sports car at the road, best quality, high quality, high detail, 8k resolution"

# Generate with the callback attached; it is invoked at the end of each denoising step.
image = pipeline(
    prompt=prompt,
    negative_prompt="",
    num_inference_steps=25,
    guidance_scale=6.5,
    callback_on_step_end=callback,
    generator=generator,
).images[0]
image

In [None]:
# Baseline run WITHOUT the callback, for comparison with the previous cell.
# The previous call already drew random numbers from `generator`, so reusing
# that object would start from different noise. Re-create it with the same
# seed so both runs begin from identical latents and only the callback differs.
generator = torch.Generator(device="cpu").manual_seed(111)

image = pipeline(
    prompt=prompt,
    negative_prompt="",
    guidance_scale=6.5,
    num_inference_steps=25,
    generator=generator,
).images[0]
image

## Dynamic classifier-free guidance

**Dynamic classifier-free guidance (CFG)** is a feature that allows us to disable CFG after a certain number of inference steps which can help us save compute with minimal cost to performance.

The callback function should have the following arguments:
* `pipeline` provides access to important properties such as `num_timesteps` and `guidance_scale`
* `step_index` and `timestep` tell us where we are in the denoising loop
* `callback_kwargs` is a dictionary that contains tensor variables we can modify during the denoising loop.

Example of callback function:

In [None]:
def callback_dynamic_cfg(pipeline, step_index, timestep, callback_kwargs):
    """Switch classifier-free guidance off once 40% of the timesteps are reached.

    Args:
        pipeline: Object exposing `num_timesteps`; its `_guidance_scale`
            attribute is overwritten when the cutoff step is hit.
        step_index: Index of the denoising step that has just finished.
        timestep: Timestep of that step (not used here).
        callback_kwargs: Dict holding the tensors requested for the callback;
            must contain `'prompt_embeds'`.

    Returns:
        The same `callback_kwargs` dict, with `'prompt_embeds'` replaced at the
        cutoff step and left untouched on every other step.
    """
    cutoff_step = int(pipeline.num_timesteps * 0.4)
    if step_index != cutoff_step:
        return callback_kwargs

    # Split the embeddings into two chunks along the batch dimension and keep
    # only the last one, so the batch size matches a run without guidance.
    # NOTE(review): presumably the last chunk is the text-conditioned half — confirm.
    kept_embeds = callback_kwargs['prompt_embeds'].chunk(2)[-1]
    callback_kwargs['prompt_embeds'] = kept_embeds

    # A guidance scale of 0.0 disables CFG for the remaining steps.
    pipeline._guidance_scale = 0.0
    return callback_kwargs

Now we can pass the callback function to the `callback_on_step_end` parameter and the `prompt_embeds` to `callback_on_step_end_tensor_inputs`:

In [None]:
from diffusers import StableDiffusionPipeline
import torch

# Load Stable Diffusion v1.5 with float16 weights, then move it to the GPU.
pipeline = StableDiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16
)
pipeline = pipeline.to("cuda")

In [None]:
# Seeded CUDA generator so the run can be reproduced.
generator = torch.Generator("cuda").manual_seed(111)
prompt = "a photo of an astronaut riding a horse on mars"

# `prompt_embeds` is listed as a tensor input so that it is handed to the
# callback through `callback_kwargs`.
image = pipeline(
    prompt,
    callback_on_step_end_tensor_inputs=["prompt_embeds"],
    callback_on_step_end=callback_dynamic_cfg,
    generator=generator,
).images[0]
image

## Interrupt the diffusion process

Stopping the diffusion process early is useful when building UIs that work with Diffusers because it allows users to stop the generation process if they are unhappy with the intermediate results.

This callback function should take the following: `pipeline`, `i`, `t`, and `callback_kwargs`. Set the pipeline's `_interrupt` attribute to `True` to stop the diffusion process after a certain number of steps.

Here, the diffusion process is stopped after 10 steps even though `num_inference_steps` is set to 50.

In [None]:
from diffusers import StableDiffusionPipeline

# Total number of denoising steps requested for the interrupt example.
num_inference_steps = 50

# No torch_dtype is passed here, unlike the earlier cells.
pipeline = StableDiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5"
)
# Model CPU offload is enabled in place of an explicit `.to('cuda')` call.
pipeline.enable_model_cpu_offload()

In [None]:
def interrupt_callback(pipeline, i, t, callback_kwargs):
    """Interrupt the diffusion process once 10 denoising steps have completed.

    Args:
        pipeline: Pipeline object; its `_interrupt` attribute is set to `True`
            to request that the remaining steps be skipped.
        i: Index of the denoising step that has just finished.
        t: Timestep of that step (not used here).
        callback_kwargs: Dict of tensors passed to the callback; returned unchanged.

    Returns:
        The unmodified `callback_kwargs` dict.
    """
    steps_to_run = 10

    # The callback runs at the end of step `i`, and `i` is zero-based, so
    # `i + 1` steps are complete at this point. Comparing `i` directly against
    # 10 would let an 11th step run before the interrupt takes effect.
    if i + 1 >= steps_to_run:
        pipeline._interrupt = True

    return callback_kwargs


# Run the pipeline with the interrupt callback attached to the end of each step.
image = pipeline(
    "A photo of a cat",
    callback_on_step_end=interrupt_callback,
    num_inference_steps=num_inference_steps,
).images[0]
image

## Display image after each generation step

Display an image after each generation step by accessing and converting the latents after each step into an image. The latent space is compressed to 128x128, so the images are also 128x128 which is useful for a quick preview.

1. Use the function below to convert the SDXL latents (4 channels) to RGB tensors (3 channels)

In [None]:
import torch
from PIL import Image

def latents_to_rgb(latents):
    """Project a 4-channel latent tensor onto 3 channels and return a PIL image.

    Each output channel is a fixed linear combination of the four latent
    channels plus a constant offset. The result is clamped to [0, 255] and
    converted to bytes.

    Args:
        latents: Tensor whose third-from-last dimension has size 4. The final
            transpose expects exactly three dimensions after projection, so an
            unbatched (4, x, y) tensor — TODO confirm x/y are height/width.

    Returns:
        The image built by `Image.fromarray` from the (x, y, 3) byte array.
    """
    # One row per output channel, one column per latent channel.
    channel_mix = (
        (60, -60, 25, -70),
        (60,  -5, 15, -50),
        (60,  10, -5, -35),
    )
    channel_offsets = (150, 140, 130)

    # Built with the latents' dtype and moved to their device; the mix matrix
    # is transposed to (4, 3) to match the einsum subscripts.
    mix = torch.tensor(channel_mix, dtype=latents.dtype).to(latents.device).T
    offsets = torch.tensor(channel_offsets, dtype=latents.dtype).to(latents.device)

    projected = torch.einsum("...lxy,lr -> ...rxy", latents, mix)
    # Shape the offsets as (3, 1, 1) so they broadcast over the spatial dims.
    projected = projected + offsets[:, None, None]

    # Channels-first -> channels-last for PIL.
    pixels = projected.clamp(0, 255).byte().cpu().numpy().transpose(1, 2, 0)
    return Image.fromarray(pixels)

2. Create a function to decode and save the latents into an image

In [None]:
def decode_tensors(pipeline, step, timestep, callback_kwargs):
    """Save an RGB preview of the first latent in the batch for the given step.

    Args:
        pipeline: Pipeline invoking the callback (not used here).
        step: Step index; used in the output file name.
        timestep: Timestep of that step (not used here).
        callback_kwargs: Dict of tensors passed to the callback; must contain `'latents'`.

    Returns:
        The unmodified `callback_kwargs` dict.
    """
    # Only the first entry of the batch is previewed.
    first_latent = callback_kwargs['latents'][0]

    preview = latents_to_rgb(first_latent)
    # Written relative to the current working directory, one file per step.
    preview.save(f"step_{step}.png")

    return callback_kwargs

3. Pass the `decode_tensors` function to the `callback_on_step_end` parameter to decode the tensors after each step.

In [None]:
from diffusers import AutoPipelineForText2Image

# Load the SDXL base checkpoint (fp16 variant, safetensors weights, float16)
# through the auto text-to-image pipeline class, then move it to the GPU.
pipeline = AutoPipelineForText2Image.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    use_safetensors=True,
    variant="fp16",
    torch_dtype=torch.float16,
)
pipeline = pipeline.to("cuda")

In [None]:
# `latents` is listed as a tensor input so that `decode_tensors` receives it
# through `callback_kwargs` at the end of every step.
image = pipeline(
    prompt="A croissant shaped like a cute bear",
    negative_prompt="deformed, ugly, bad anatomy",
    callback_on_step_end_tensor_inputs=["latents"],
    callback_on_step_end=decode_tensors,
).images[0]