# **Understanding pipelines, models and schedulers**

- 작성일 : 24.07.30  
- 작성자 : 유소영  
- 출처 : https://huggingface.co/docs/diffusers/using-diffusers/write_own_pipeline

Diffusers는 사용자 친화적이고 유연한 도구로, 당신의 사용 사례에 맞는 확산 시스템을 구축하도록 설계되었습니다. 

이 도구의 핵심은 <u>모델과 스케줄러</u>입니다. DiffusionPipeline이 편의를 위해 이 구성 요소들을 함께 묶어주지만, 파이프라인을 해체하여 모델과 스케줄러를 별도로 사용해 새로운 확산 시스템을 만들 수도 있습니다.

이 튜토리얼에서는 모델과 스케줄러를 사용하여 추론을 위한 확산 시스템을 조립하는 방법을 배웁니다. 기본 파이프라인부터 시작해 Stable Diffusion 파이프라인까지 진행합니다.

기본 파이프라인 해체하기
파이프라인은 모델을 추론에 사용하는 빠르고 쉬운 방법입니다. 이미지를 생성하는 데 단 4줄의 코드만 필요합니다:

## Deconstruct a basic pipeline


In [None]:
from diffusers import DDPMPipeline

ddpm = DDPMPipeline.from_pretrained("google/ddpm-cat-256", use_safetensors=True).to("cuda")
image = ddpm(num_inference_steps=250).images[0]
image.show()

이 과정은 매우 간단해 보이지만, 파이프라인이 어떻게 작동하는지 자세히 살펴보겠습니다.

위 예시에서 파이프라인은 UNet2DModel 모델과 DDPMScheduler를 포함합니다. 파이프라인은 원하는 출력 크기의 랜덤 노이즈를 여러 번 모델에 통과시켜 이미지의 노이즈를 제거합니다.  
각 timestep에서 모델은 '노이즈 잔차'를 예측하고, 스케줄러는 이를 사용해 덜 노이즈가 있는 이미지를 예측합니다. 파이프라인은 지정된 추론 단계 수에 도달할 때까지 이 과정을 반복합니다.
모델과 스케줄러를 별도로 사용하여 파이프라인을 재현하기 위해, 자체적인 디노이징 프로세스를 작성해 보겠습니다.  

**1. Load the model and scheduler:**

In [None]:
from diffusers import DDPMScheduler, UNet2DModel

scheduler = DDPMScheduler.from_pretrained("google/ddpm-cat-256")
model = UNet2DModel.from_pretrained("google/ddpm-cat-256", use_safetensors=True).to("cuda")

**2. Set the number of timesteps to run the denoising process for:**


In [None]:
scheduler.set_timesteps(50)


**3. 스케줄러 timesteps를 설정하면 균일하게 분포된 요소를 가진 텐서가 생성됩니다. 이 예시에서는 50개입니다. 각 요소는 모델이 이미지의 노이즈를 제거하는 timestep에 해당합니다. 나중에 디노이징 루프를 만들 때, 이 텐서를 반복하며 이미지의 노이즈를 제거하게 됩니다:**

In [None]:
print(scheduler.timesteps)

**4. 원하는 출력과 같은 형태의 랜덤 노이즈를 생성합니다:**

In [None]:
import torch

sample_size = model.config.sample_size
noise = torch.randn((1, 3, sample_size, sample_size), device="cuda")

**5. 이제 timesteps를 반복하는 루프를 작성합니다.**  각 timestep에서 모델은 UNet2DModel.forward() 패스를 수행하고 노이즈 잔차를 반환합니다. 스케줄러의 step() 메소드는 노이즈 잔차, timestep, 그리고 입력을 받아 이전 timestep의 이미지를 예측합니다. 이 출력은 디노이징 루프에서 모델의 다음 입력이 되며, `timesteps` 배열의 끝에 도달할 때까지 이 과정이 반복됩니다.

In [None]:
input = noise

for t in scheduler.timesteps:
    with torch.no_grad():
        noisy_residual = model(input, t).sample
    previous_noisy_sample = scheduler.step(noisy_residual, t, input).prev_sample
    input = previous_noisy_sample

**6. The last step is to convert the denoised output into an image:**

In [None]:
from PIL import Image
import numpy as np

image = (input / 2 + 0.5).clamp(0, 1).squeeze()
image = (image.permute(1, 2, 0) * 255).round().to(torch.uint8).cpu().numpy()
image = Image.fromarray(image)
image

## Deconstruct the Stable Diffusion pipeline


Stable Diffusion은 텍스트-이미지 '잠재 확산(Latent Diffusion)' 모델입니다. '잠재 확산' 모델이라고 불리는 이유는 다음과 같습니다:

실제 픽셀 공간 대신 이미지의 저차원 표현을 다룹니다. 이로 인해 메모리 효율성이 높아집니다.

Stable Diffusion 모델의 주요 구성 요소:

- 인코더: 이미지를 더 작은 표현으로 압축합니다.
- 디코더: 압축된 표현을 다시 이미지로 변환합니다.
- 토크나이저와 인코더: 텍스트 임베딩을 생성합니다 (텍스트-이미지 모델에 필요).
- UNet 모델: 이전 예제에서 본 것과 같은 역할을 합니다.
- 스케줄러: 노이즈 제거 과정을 관리합니다.

이는 UNet 모델만 포함하는 DDPM 파이프라인보다 훨씬 복잡합니다. Stable Diffusion 모델은 세 개의 별도 사전 훈련된 모델을 가지고 있어 구조가 더 복잡합니다. Stable Diffusion 파이프라인에 필요한 구성 요소들을 알았으니, from_pretrained() 메서드를 사용해 이 모든 구성 요소를 로드하겠습니다.  

이들은 사전 훈련된 runwayml/stable-diffusion-v1-5 체크포인트에서 찾을 수 있으며, 각 구성 요소는 별도의 하위 폴더에 저장되어 있습니다:

In [None]:
from PIL import Image
import torch
from transformers import CLIPTextModel, CLIPTokenizer
from diffusers import AutoencoderKL, UNet2DConditionModel, PNDMScheduler

vae = AutoencoderKL.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="vae", use_safetensors=True)
tokenizer = CLIPTokenizer.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(
    "CompVis/stable-diffusion-v1-4", subfolder="text_encoder", use_safetensors=True
)
unet = UNet2DConditionModel.from_pretrained(
    "CompVis/stable-diffusion-v1-4", subfolder="unet", use_safetensors=True
)

기본 PNDMScheduler 대신 UniPCMultistepScheduler로 교체하여 다른 스케줄러를 얼마나 쉽게 연결할 수 있는지 확인해 보겠습니다:

In [None]:
from diffusers import UniPCMultistepScheduler

scheduler = UniPCMultistepScheduler.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="scheduler")

추론 속도를 높이기 위해, 스케줄러와 달리 훈련 가능한 가중치를 가진 모델들을 GPU로 이동시킵니다:

In [None]:
torch_device = "cuda"
vae.to(torch_device)
text_encoder.to(torch_device)
unet.to(torch_device)

**텍스트 임베딩 생성**  
다음 단계는 텍스트를 토큰화하여 임베딩을 생성하는 것입니다. 이 텍스트는 UNet 모델을 조건화하고 확산 과정을 입력 프롬프트와 유사한 방향으로 유도하는 데 사용됩니다.

💡 `guidance_scale` 매개변수는 이미지 생성 시 프롬프트에 얼마나 가중치를 줄지 결정합니다.

다른 것을 생성하고 싶다면 원하는 프롬프트를 자유롭게 선택하세요!

In [None]:
prompt = ["a photograph of an astronaut riding a horse"]
height = 512  # default height of Stable Diffusion
width = 512  # default width of Stable Diffusion
num_inference_steps = 100  # Number of denoising steps
guidance_scale = 7.5  # Scale for classifier-free guidance
generator = torch.manual_seed(0)  # Seed generator to create the initial latent noise
batch_size = len(prompt)

Tokenize the text and generate the embeddings from the prompt:

In [None]:
text_input = tokenizer(
    prompt, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt"
)

with torch.no_grad():
    text_embeddings = text_encoder(text_input.input_ids.to(torch_device))[0]

또한 'unconditional text embeddings'을 생성해야 합니다. 이는 패딩 토큰에 대한 임베딩입니다. 이들은 conditional text_embeddings와 같은 형태의(batch_size와 seq_length)를 가져야 합니다:

In [None]:
max_length = text_input.input_ids.shape[-1]
uncond_input = tokenizer([""] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt")
uncond_embeddings = text_encoder(uncond_input.input_ids.to(torch_device))[0]

Let’s concatenate the conditional and unconditional embeddings into a batch to avoid doing two forward passes:

In [None]:
text_embeddings = torch.cat([uncond_embeddings, text_embeddings])

**랜덤 노이즈 생성**  
다음으로, 확산 과정의 시작점으로 사용할 초기 랜덤 노이즈를 생성합니다. 이것은 이미지의 잠재 표현이며, 점진적으로 노이즈가 제거될 것입니다. 이 시점에서 `latent` 이미지는 최종 이미지 크기보다 작습니다. 하지만 이는 문제가 되지 않습니다. 왜냐하면 모델이 나중에 이를 최종적인 512x512 이미지 크기로 변환할 것이기 때문입니다.  
💡 높이와 너비를 8로 나누는 이유는 vae 모델이 3개의 다운샘플링 레이어를 가지고 있기 때문입니다. 다음 코드를 실행하여 확인할 수 있습니다: 

2 ** (len(vae.config.block_out_channels) - 1) == 8

In [None]:
generator = torch.Generator(device=torch_device).manual_seed(123123)

latents = torch.randn(
    (batch_size, unet.config.in_channels, height // 8, width // 8),
    generator=generator,
    device=torch_device,
)

**이미지 노이즈 제거**  
먼저 초기 노이즈 분포인 *sigma*(노이즈 스케일 값)로 입력을 스케일링합니다. 이는 UniPCMultistepScheduler와 같은 개선된 스케줄러에 필요합니다:

```python
latents = latents * scheduler.init_noise_sigma
```

마지막 단계는 순수한 노이즈인 latents를 프롬프트에 설명된 이미지로 점진적으로 변환하는 디노이징 루프를 만드는 것입니다. 디노이징 루프는 다음 세 가지 작업을 수행해야 합니다:

1. 디노이징 중 사용할 스케줄러의 timesteps를 설정합니다.
2. timesteps를 반복합니다.
3. 각 timestep에서 UNet 모델을 호출하여 노이즈 잔차를 예측하고, 이를 스케줄러에 전달하여 이전의 노이즈가 있는 샘플을 계산합니다.

이 과정을 통해 점진적으로 노이즈가 제거되고 원하는 이미지가 생성됩니다.

In [None]:
from tqdm.auto import tqdm

scheduler.set_timesteps(num_inference_steps)

for t in tqdm(scheduler.timesteps):
    # expand the latents if we are doing classifier-free guidance to avoid doing two forward passes.
    latent_model_input = torch.cat([latents] * 2)

    latent_model_input = scheduler.scale_model_input(latent_model_input, timestep=t)

    # predict the noise residual
    with torch.no_grad():
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

    # perform guidance
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

    # compute the previous noisy sample x_t -> x_t-1
    latents = scheduler.step(noise_pred, t, latents).prev_sample

**Decode the image**

The final step is to use the vae to decode the latent representation into an image and get the decoded output with sample:

In [None]:
# scale and decode the image latents with vae
latents = 1 / 0.18215 * latents
with torch.no_grad():
    _image = vae.decode(latents).sample

In [None]:
image =(_image-_image.min()) / (_image.max() - _image.min())
image = (image.permute(2,3,1,0) * 255).to(torch.uint8).cpu().numpy()
image = image[:,:,:,0]
image = Image.fromarray(image)
image.show()