In [None]:
import sys
import math
import matplotlib.pyplot as plt
from pathlib import Path
# Add the diffusers source tree to the module search path. `append` puts it at
# the END of sys.path, so any copy of the same package found earlier on the
# path still wins — presumably this is meant to make the local checkout
# importable by the later `from diffusers import ...` cells; confirm.
sys.path.append('/comfywr/diffusers/src/')

def iter_subplots_axes(ncol, n_subplots, tile_size_col=5, tile_size_row=5, title=None, title_fontsize=14):
    """Create a grid of subplots and yield its axes in left-right/top-bottom order.

    This is a generator: the figure is only created when the first axes is
    requested. Each yielded axes is made the current pyplot axes first, so the
    caller may draw either through the yielded object or through ``plt.*``.

    Args:
        ncol: Number of columns in the grid (positive integer).
        n_subplots: Number of tiles that will actually be drawn. The row count
            is ``ceil(n_subplots / ncol)``.
        tile_size_col: Width of a single tile, in inches.
        tile_size_row: Height of a single tile, in inches.
        title: Optional figure-level title.
        title_fontsize: Font size of the figure-level title.

    Yields:
        The first ``n_subplots`` axes of the grid, row by row. Grid cells beyond
        ``n_subplots`` (when the last row is only partially used) are hidden.
        Nothing is yielded, and no figure is created, when ``n_subplots`` is
        less than 1.

    Raises:
        ValueError: If ``ncol`` is less than 1 (raised on first iteration).
    """
    if ncol < 1:
        raise ValueError(f"ncol must be a positive integer, got {ncol!r}")
    if n_subplots < 1:
        # Nothing to draw: avoid asking matplotlib for a zero-row grid.
        return

    nrow = math.ceil(n_subplots / ncol)
    # squeeze=False always returns a 2-D (nrow, ncol) array, so 1x1, single-row
    # and single-column grids need no special-casing when indexing.
    fig, axes = plt.subplots(nrow, ncol, squeeze=False)
    fig.set_size_inches(ncol * tile_size_col, nrow * tile_size_row)
    if title is not None:
        fig.suptitle(title, fontsize=title_fontsize)

    # Row-major flattening gives exactly the left-right/top-bottom order.
    flat_axes = axes.ravel()

    # Hide the unused tail up front (not after the loop): a consumer such as
    # zip() may stop pulling from this generator before it runs to completion.
    for unused_ax in flat_axes[n_subplots:]:
        unused_ax.set_visible(False)

    for ax in flat_axes[:n_subplots]:
        plt.sca(ax)
        yield ax

In [None]:
# Single-file checkpoint used as the input of the conversion step.
path = Path('/comfywr/downloaded_models/checkpoints/DreamShaper_8_pruned.safetensors')
stem = path.stem  # file name without its final suffix
# Fail right here, naming the file, rather than with a bare AssertionError.
assert path.is_file(), f"checkpoint not found: {path}"

In [None]:
# Run the checkpoint conversion script from the diffusers checkout in a
# subprocess, with PYTHONPATH pointing at that checkout's sources. The input is
# the DreamShaper .safetensors checkpoint and the result is written to
# /comfywr/diffusers_tmp/chkp/, the directory the following cells load from.
# The checkpoint path is spelled out literally here; it is not taken from the
# `path` variable defined in the previous cell.
!PYTHONPATH=/comfywr/diffusers/src python /comfywr/diffusers/scripts/convert_original_stable_diffusion_to_diffusers.py  \
    --checkpoint_path '/comfywr/downloaded_models/checkpoints/DreamShaper_8_pruned.safetensors' \
    --dump_path /comfywr/diffusers_tmp/chkp/ \
    --from_safetensors \
    --to_safetensors

In [None]:
from pathlib import Path
from diffusers import DDPMPipeline
from diffusers import DiffusionPipeline

# Load the converted checkpoint directory as one ready-made pipeline on the GPU
# and generate a single image from a short prompt — presumably a sanity check
# of the conversion before the components are loaded one by one further down.
pipe = DiffusionPipeline.from_pretrained('/comfywr/diffusers_tmp/chkp/', use_safetensors=True).to('cuda')
# ddpm = DDPMPipeline.from_pretrained('/comfywr/diffusers_tmp/chkp/', use_safetensors=True).to("cuda")
# ddpm = DDPMPipeline.from_pretrained("google/ddpm-cat-256", use_safetensors=True).to("cuda")
# Keep only the first image of the pipeline output.
image = pipe(prompt='a tree', num_inference_steps=25).images[0]
# Bare last expression so the notebook renders the image as the cell output.
image

In [None]:
from PIL import Image
import torch
from transformers import CLIPTextModel, CLIPTokenizer
from diffusers import AutoencoderKL, UNet2DConditionModel 

# Directory holding the converted model; every component is loaded from its own
# subfolder. Note that this rebinds `path`, which earlier in the notebook
# pointed at the original single-file checkpoint.
path = Path('/comfywr/diffusers_tmp/chkp/')
torch_device = "cuda"

# Load the building blocks individually so the denoising loop can be written
# out by hand. All three model loads request safetensors weights; the keyword
# has to be spelled exactly `use_safetensors` to take effect.
vae = AutoencoderKL.from_pretrained(path, subfolder="vae", use_safetensors=True)
tokenizer = CLIPTokenizer.from_pretrained(path, subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(path, subfolder="text_encoder", use_safetensors=True)
unet = UNet2DConditionModel.from_pretrained(path, subfolder="unet", use_safetensors=True)

# Move the three networks to the target device (the tokenizer has no weights to move).
vae = vae.to(torch_device)
text_encoder = text_encoder.to(torch_device)
unet = unet.to(torch_device)

In [None]:
# from diffusers import EulerAncestralDiscreteScheduler
# scheduler = EulerAncestralDiscreteScheduler()
from diffusers import UniPCMultistepScheduler
# NOTE(review): the scheduler configuration is fetched from the
# "CompVis/stable-diffusion-v1-4" repository, not from the converted checkpoint
# directory used for the other components — confirm this is intended.
scheduler = UniPCMultistepScheduler.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="scheduler")

prompt = ["A purple tree on red grass"]
height = 512  # default height of Stable Diffusion
width = 512  # default width of Stable Diffusion
num_inference_steps = 25  # Number of denoising steps
guidance_scale = 7.5  # Scale for classifier-free guidance
# Seeded generator so the initial noise is reproducible; it lives on the same
# device the latents are sampled on.
generator = torch.Generator(device=torch_device).manual_seed(0)
batch_size = len(prompt)

# Tokenize the prompt, padded/truncated to the tokenizer's maximum length.
text_input = tokenizer(
    prompt, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt"
)

# The unconditional branch of classifier-free guidance uses the empty prompt,
# padded to the same sequence length as the conditional input.
max_length = text_input.input_ids.shape[-1]
uncond_input = tokenizer([""] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt")

# Both encoder passes are inference only: keep them under no_grad so neither
# embedding (nor the concatenation below) carries an autograd graph.
with torch.no_grad():
    text_embeddings = text_encoder(text_input.input_ids.to(torch_device))[0]
    print(text_embeddings.shape)
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(torch_device))[0]

# Order matters: the denoising loop splits the UNet output back into
# (unconditional, conditional) halves in this same order.
text_embeddings = torch.cat([uncond_embeddings, text_embeddings])

# The latent size is computed as height // 8 and width // 8 when sampling the
# initial noise; make sure this VAE really downsamples by that factor.
assert 2 ** (len(vae.config.block_out_channels) - 1) == 8, (
    "VAE spatial downsampling factor is not 8; the `// 8` latent size would be wrong"
)


In [None]:
from tqdm.auto import tqdm
import numpy as np

# Configure the schedule BEFORE reading scheduler.init_noise_sigma: some
# schedulers derive that value from the sigmas computed by set_timesteps, so
# reading it first could scale the initial noise with a stale value.
scheduler.set_timesteps(num_inference_steps)

# assert scheduler.init_noise_sigma == scheduler.sigmas[0]
# Initial noise in latent space: one sample per prompt, at 1/8 of the image
# resolution, drawn from the seeded generator.
latents = torch.randn(
    (batch_size, unet.config.in_channels, height // 8, width // 8),
    generator=generator,
    device=torch_device,
)
latents = latents * scheduler.init_noise_sigma

# Keep every intermediate state, starting with the pure noise, so the whole
# trajectory can be decoded later: len(scheduler.timesteps) + 1 entries.
all_latents = [latents]

for timestep_index, t in enumerate(tqdm(scheduler.timesteps)):
    # expand the latents if we are doing classifier-free guidance to avoid doing two forward passes.
    latent_model_input = torch.cat([latents] * 2)

    # a = latent_model_input[0][0][0][0]
    latent_model_input = scheduler.scale_model_input(latent_model_input, timestep=t)
    # b = latent_model_input[0][0][0][0]
    # assert abs(a / b - (scheduler.sigmas[timestep_index] ** 2 + 1) ** 0.5) < 0.00001, (a/b, scheduler.sigmas[timestep_index])

    # predict the noise residual
    with torch.no_grad():
        noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

    # perform guidance: the batch was built as [unconditional, conditional]
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

    # compute the previous noisy sample x_t -> x_t-1
    latents = scheduler.step(noise_pred, t, latents).prev_sample
    all_latents.append(latents)

In [None]:
# Factor by which latents are scaled relative to the VAE's own latent space;
# it has to be undone before decoding. Read it from the VAE config when the
# config exposes it, otherwise fall back to the Stable Diffusion value.
latent_scale = getattr(vae.config, "scaling_factor", 0.18215)

all_imgs = []
with torch.no_grad():
    # A dedicated loop variable keeps the notebook-level `latents` (the final
    # state of the denoising loop) intact.
    for step_latents in all_latents:
        image = vae.decode(1 / latent_scale * step_latents).sample
        # Map from [-1, 1] to [0, 1]; only the first image of the batch is kept.
        image = (image / 2 + 0.5).clamp(0, 1)[0]
        # channels-first float tensor -> height x width x channels uint8 array
        image = (image.permute(1, 2, 0) * 255).to(torch.uint8).cpu().numpy()
        all_imgs.append(image)

In [None]:
# One tile per decoded step, four tiles per row. The axes generator makes each
# tile the current axes before yielding it, so plt.imshow draws on that tile.
tile_axes = iter_subplots_axes(4, len(all_imgs))
for img, _ in zip(all_imgs, tile_axes):
    plt.imshow(img)