# 第五节 Stable Diffusion

In [None]:
import torch
import requests
from PIL import Image
from io import BytesIO
from matplotlib import pyplot as plt
# 这次要探索的管线比较多
from diffusers import (
    StableDiffusionPipeline, 
    StableDiffusionImg2ImgPipeline,
    StableDiffusionInpaintPipeline, 
    StableDiffusionDepth2ImgPipeline
)
# 因为要用到的展示图片较多，所以我们写了一个旨在下载图片的函数
@retry_on_exception
def download_image(url):
    response = requests.get(url)
    return Image.open(BytesIO(response.content)).convert("RGB")
# Inpainting需要用到的图片
img_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo.png"
mask_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo_mask.png"

init_image = download_image(img_url).resize((512, 512))
mask_image = download_image(mask_url).resize((512, 512))

device = (
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)

In [None]:
# 载入管线
model_id = "stabilityai/stable-diffusion-2-1-base"
cache_dir = r'E:\model_zoo\huggingface\diffusers'
# 如果显存不足请指定revision="fp16",torch_dtype=torch.float16
@retry_on_exception
def load_stable_diffusion_pipeline(model_id, cache_dir, torch_dtype=torch.float32, device='cuda'):
    return StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch_dtype, cache_dir=cache_dir).to(device)

pipe = load_stable_diffusion_pipeline(model_id, cache_dir, device=device)

In [None]:
# 注意力切分功能，降低速度减少GPU占用
# pipe.enable_attention_slicing()

In [None]:
# 给生成器设置一个随机种子，这样可以保证结果的可复现性
generator = torch.Generator(device=device).manual_seed(42)
# 运行这个管线
pipe_output = pipe(
    prompt="Palette knife painting of an autumn cityscape",
    # 提示文字：哪些要生成
    negative_prompt="Oversaturated, blurry, low quality",
    # 提示文字：哪些不要生成
    height=640, width=640,     # 定义所生成图片的尺寸
    guidance_scale=8,          # 提示文字的影响程度
    num_inference_steps=50,    # 定义一次生成需要多少个推理步骤
    generator=generator        # 设定随机种子的生成器
)
# 查看生成结果
pipe_output.images[0]

In [None]:
# 主要的调节参数如下：
# ● width和height用于指定所生成图片的尺寸，注意它们必须是能被8整除的数字，因为只有这样，VAE才能正常工作（原因稍后介绍）。
# ● 步数num_inference_steps也会影响所生成图片的质量，采用默认设置50即可，你也可以尝试将其设置为20并观察效果。
# ● negative_prompt用于强调不希望生成的内容，这个参数一般在无分类器引导的情况下使用。这种添加额外控制的方式特别有效：
#    列出一些不想要的特征，以帮助生成更好的结果。
# ● guidance_scale 决定了无分类器引导的影响强度。增大这个参数可以使生成的内容更接近给出的文本提示语；
#    但如果该参数过大，则可能导致结果过于饱和，不美观。

In [None]:
# 加大guidance_scale参数的作用
cfg_scales = [1.1, 8, 12] 
prompt = "A collie with a pink hat" 
fig, axs = plt.subplots(1, len(cfg_scales), figsize=(16, 5))
for i, ax in enumerate(axs):
    im = pipe(prompt, height=480, width=480, 
              guidance_scale=cfg_scales[i], num_inference_steps=35, 
              generator=torch.Generator(device=device).manual_seed(42)).images[0] 
    ax.imshow(im); ax.set_title(f'CFG Scale {cfg_scales[i]}')
    
# 一般来说 guidance_scale 在 8-12 是不错的选择

In [None]:
# Stable Diffusion Pipeline

In [None]:
print(list(pipe.components.keys()))

In [None]:
# VAE（可变分自编码器）

# 创建取值区间为(-1, 1)的伪数据
images = torch.rand(1, 3, 512, 512).to(device) * 2 - 1 
print("Input images shape:", images.shape)
# 编码到隐空间
with torch.no_grad():
    latents = 0.18215 * pipe.vae.encode(images).latent_dist.mean
print("Encoded latents shape:", latents.shape)
# 再解码回来
with torch.no_grad():
    decoded_images = pipe.vae.decode(latents / 0.18215).sample
print("Decoded images shape:", decoded_images.shape)

In [None]:
# 分词器和文本编码器

# 手动对提示文字进行分词和编码
# 分词
input_ids = pipe.tokenizer(["A painting of a flooble"])['input_ids']
print("Input ID -> decoded token")
for input_id in input_ids[0]:
    print(f"{input_id} -> {pipe.tokenizer.decode(input_id)}")
# 将分词结果输入CLIP 
input_ids = torch.tensor(input_ids).to(device)
with torch.no_grad():
    text_embeddings = pipe.text_encoder(input_ids)['last_hidden_state']
    print("Text embeddings shape:", text_embeddings.shape)

# 输出最终编码结果
text_embeddings = pipe._encode_prompt("A painting of a flooble", device, 1, False, '')
print(text_embeddings.size())

In [None]:
# UNet

# 创建伪输入
timestep = pipe.scheduler.timesteps[0]
latents = torch.randn(1, 4, 64, 64).to(device)
text_embeddings = torch.randn(1, 77, 1024).to(device)
# 让模型进行预测
with torch.no_grad():
    unet_output = pipe.unet(latents, timestep, text_embeddings).sample
print('UNet output shape:', unet_output.shape)

In [None]:
# 调度器
# 调度器保存了关于如何添加噪声的信息，并管理如何基于模型的预测更新“带噪”样本。
# 默认调度器是 PNDMScheduler，也可以使用其他调度器如 LMSDiscreteScheduler

# 观察采样过程中噪声的变化
plt.plot(pipe.scheduler.alphas_cumprod, label=r'$\bar{\alpha}$')
plt.xlabel('Timestep (high noise to low noise ->)')
plt.title('Noise schedule')
plt.legend()

In [None]:
from diffusers import LMSDiscreteScheduler
# 替换原来的调度器
pipe.scheduler = LMSDiscreteScheduler.from_config(pipe.scheduler.config)
# 输出配置参数
print('Scheduler config:', pipe.scheduler)
# 使用新的调度器生成图片
pipe(prompt="Palette knife painting of an winter cityscape", 
     height=480, width=480, generator=torch.Generator(device=device).manual_seed(42)).images[0]

In [None]:
guidance_scale = 8
num_inference_steps=30
prompt = "Beautiful picture of a wave breaking"
negative_prompt = "zoomed in, blurry, oversaturated, warped"

# 对提示文字进行编码
text_embeddings = pipe._encode_prompt(prompt, device, 1, True, negative_prompt)
# 创建随机噪声作为起点
latents = torch.randn((1, 4, 64, 64), device=device, generator=generator)
latents *= pipe.scheduler.init_noise_sigma
# 准备调度器
pipe.scheduler.set_timesteps(num_inference_steps, device=device)
# 生成过程开始
for i, t in enumerate(pipe.scheduler.timesteps):
    latent_model_input = torch.cat([latents] * 2)
    latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, t)
    with torch.no_grad():
        noise_pred = pipe.unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample
        noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
        noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
    latents = pipe.scheduler.step(noise_pred, t, latents).prev_sample
# 将隐变量映射到图片
with torch.no_grad():
    image = pipe.decode_latents(latents.detach())

pipe.numpy_to_pil(image)[0]

In [None]:
# 其他pipeline介绍