In [None]:
import requests
import torch
from PIL import Image
from io import BytesIO
import matplotlib.pyplot as plt

from diffusers import CycleDiffusionPipeline, DDIMScheduler

# Load the Stable Diffusion v1.4 checkpoint into a CycleDiffusion pipeline.
# Make sure you're logged in with `huggingface-cli login`.
model_id_or_path = "CompVis/stable-diffusion-v1-4"
# The DDIM scheduler config is read from the checkpoint's "scheduler"
# subfolder and handed to the pipeline explicitly.
scheduler = DDIMScheduler.from_pretrained(model_id_or_path, subfolder="scheduler")
# NOTE(review): the whole pipeline is moved to the GPU in float64; this is
# kept as-is, but confirm the extra memory/time cost is intentional.
pipe = CycleDiffusionPipeline.from_pretrained(model_id_or_path, scheduler=scheduler).to("cuda", torch.float64)

# Download the initial (source) image.
url = "https://raw.githubusercontent.com/ChenWu98/cycle-diffusion/main/data/dalle2/An%20astronaut%20riding%20a%20horse.png"
# A timeout keeps the cell from hanging forever on a stalled connection, and
# raise_for_status() reports HTTP errors here instead of letting PIL fail
# later while trying to parse an error page as an image.
response = requests.get(url, timeout=30)
response.raise_for_status()
init_image = Image.open(BytesIO(response.content)).convert("RGB")
init_image = init_image.resize((512, 512))
# Keep a local copy of the resized input next to the notebook.
init_image.save("horse.png")
plt.imshow(init_image)
plt.show()

# The source prompt describes the downloaded image; the target prompt
# describes the edit we want (horse -> elephant).
source_prompt = "An astronaut riding a horse"
prompt = "An astronaut riding an elephant"

# Run the pipeline and keep the first (only) generated image.
image1 = pipe(
    prompt=prompt,
    source_prompt=source_prompt,
    image=init_image,
    num_inference_steps=100,
    eta=0.1,
    strength=0.8,
    guidance_scale=2,
    source_guidance_scale=1,
).images[0]
plt.imshow(image1)
plt.show()

In [None]:
def _to_pil(img):
    """Return ``img`` as a PIL image, converting NumPy arrays on the fly.

    Arrays are multiplied by 255 (i.e. treated as floats in ``[0, 1]``) and
    clipped to the byte range before the ``uint8`` cast, so values slightly
    outside ``[0, 1]`` saturate instead of wrapping around. Anything that is
    not an ``np.ndarray`` is returned unchanged and assumed to be a PIL image.
    """
    if isinstance(img, np.ndarray):
        return Image.fromarray(np.clip(img * 255, 0, 255).astype(np.uint8))
    return img


def image_grid(imgs, rows, cols):
    """Tile ``imgs`` row by row into one ``rows`` x ``cols`` RGB image.

    Args:
        imgs: Sequence of PIL images and/or NumPy arrays. The two kinds may be
            mixed; each entry is converted individually.
        rows: Number of grid rows.
        cols: Number of grid columns.

    Returns:
        A new RGB PIL image of size ``(cols * w, rows * h)``, where ``(w, h)``
        is the size of the first tile.

    Raises:
        AssertionError: If ``len(imgs) != rows * cols``.
    """
    assert len(imgs) == rows*cols

    tiles = [_to_pil(img) for img in imgs]

    # Every cell uses the first tile's size; other tiles are pasted as-is.
    w, h = tiles[0].size
    grid = Image.new('RGB', size=(cols*w, rows*h))

    for i, tile in enumerate(tiles):
        # Row-major placement: column = i % cols, row = i // cols.
        grid.paste(tile, box=(i%cols*w, i//cols*h))

    return grid

In [None]:
import numpy as np
# Rebuild the pipeline around a different checkpoint.
# Make sure you're logged in with `huggingface-cli login`.
model_id_or_path = "gsdf/Counterfeit-V2.5"
scheduler = DDIMScheduler.from_pretrained(model_id_or_path, subfolder="scheduler")
pipe = CycleDiffusionPipeline.from_pretrained(model_id_or_path, scheduler=scheduler)
pipe = pipe.to("cuda", torch.float64)

# The source prompt describes the input frame; the target prompt changes only
# the hair colour (black -> red).
source_prompt = "masterpiece,best quality, 1girl, long hair, black hair, solo, black dress, white tie, white scarf, looking at viewer"
prompt = "masterpiece,best quality, 1girl, long hair, red hair, solo, black dress, white tie, white scarf, looking at viewer"

# Open the initial image from local disk and bring it to 512x512 RGB.
init_image = Image.open("/home/ubuntu/anime_dance/0001.png").convert("RGB").resize((512, 512))

# Fix the torch seed right before sampling.
torch.manual_seed(30)

# Run the edit and keep the first generated image.
image1 = pipe(
    image=init_image,
    source_prompt=source_prompt,
    prompt=prompt,
    strength=0.8,
    num_inference_steps=80,
    eta=0.1,
    source_guidance_scale=7.5,
    guidance_scale=7.5,
).images[0]

# Input on the left, edited result on the right.
grid = image_grid(imgs=[init_image, image1], rows=1, cols=2)
grid

# Cycle Diffusion Deep Dive

In [None]:
import PIL
def preprocess(image):
    """Convert the pipeline's image input into a single batched tensor.

    Accepted inputs:
      * a ``torch.Tensor`` -- returned unchanged;
      * a PIL image or a list of PIL images -- each is resized (LANCZOS) so
        width and height are multiples of 8 (using the first image's size),
        stacked, scaled from ``[0, 255]`` to ``[-1, 1]`` as float32 and moved
        to channels-first layout (assumes H x W x C images such as RGB);
      * a list of tensors -- concatenated along dim 0.

    Any other input is returned as given.
    """
    # Tensors need no work.
    if isinstance(image, torch.Tensor):
        return image

    # Promote a lone PIL image to a one-element batch.
    batch = [image] if isinstance(image, PIL.Image.Image) else image
    head = batch[0]

    if isinstance(head, PIL.Image.Image):
        width, height = head.size
        # Round both sides down to an integer multiple of 8.
        width -= width % 8
        height -= height % 8

        frames = [
            np.array(pil.resize((width, height), resample=PIL.Image.LANCZOS))[None, :]
            for pil in batch
        ]
        pixels = np.concatenate(frames, axis=0).astype(np.float32) / 255.0
        pixels = pixels.transpose(0, 3, 1, 2)  # NHWC -> NCHW
        pixels = 2.0 * pixels - 1.0            # [0, 1] -> [-1, 1]
        return torch.from_numpy(pixels)

    if isinstance(head, torch.Tensor):
        return torch.cat(batch, dim=0)

    return batch

def posterior_sample(scheduler, latents, timestep, clean_latents, generator, eta):
    """Sample the previous-step latents given the current and the clean latents.

    With ``x_t = latents`` and ``x_0 = clean_latents`` the function first
    recovers the noise ``e_t`` that links them at ``timestep`` and then forms

        x_{t-1} = sqrt(a_prev) * x_0 + sqrt(1 - a_prev - sigma^2) * e_t + sigma * z

    where ``a_prev`` is the cumulative alpha at the previous step,
    ``sigma = eta * sqrt(variance)`` and ``z`` is fresh Gaussian noise.

    Args:
        scheduler: Scheduler object providing ``config.num_train_timesteps``,
            ``num_inference_steps``, ``alphas_cumprod`` and ``_get_variance``.
        latents: Noisy latents at ``timestep``.
        timestep: Current timestep.
        clean_latents: Noise-free latents (``x_0``).
        generator: Random generator forwarded to ``randn_tensor`` (may be None).
        eta: Scale applied to the scheduler's standard deviation.

    Returns:
        The sampled latents for the previous step, or ``clean_latents`` itself
        once the previous step index is zero or negative.
    """
    # 1. get previous step value (=t-1)
    prev_timestep = timestep - scheduler.config.num_train_timesteps // scheduler.num_inference_steps

    # At (or past) the end of the schedule there is nothing left to sample.
    if prev_timestep <= 0:
        return clean_latents

    # 2. compute alphas
    alpha_prod_t = scheduler.alphas_cumprod[timestep]
    # prev_timestep is strictly positive here, so the table lookup is always
    # valid; a `final_alpha_cumprod` fallback for negative indices could never
    # be reached after the early return above.
    alpha_prod_t_prev = scheduler.alphas_cumprod[prev_timestep]

    variance = scheduler._get_variance(timestep, prev_timestep)
    std_dev_t = eta * variance ** (0.5)

    # direction pointing to x_t: noise implied by (x_t, x_0), rescaled for t-1
    e_t = (latents - alpha_prod_t ** (0.5) * clean_latents) / (1 - alpha_prod_t) ** (0.5)
    dir_xt = (1.0 - alpha_prod_t_prev - std_dev_t**2) ** (0.5) * e_t
    # Fresh noise matching clean_latents in shape, dtype and device.
    noise = std_dev_t * randn_tensor(
        clean_latents.shape, dtype=clean_latents.dtype, device=clean_latents.device, generator=generator
    )
    prev_latents = alpha_prod_t_prev ** (0.5) * clean_latents + dir_xt + noise

    return prev_latents


def compute_noise(scheduler, prev_latents, latents, timestep, noise_pred, eta):
    """Solve a DDIM step for the noise that maps ``latents`` to ``prev_latents``.

    A DDIM update (formula (12) of https://arxiv.org/pdf/2010.02502.pdf) is

        x_{t-1} = sqrt(a_prev) * x0_pred + sqrt(1 - a_prev - sigma^2) * eps + sigma * z

    Given both ``x_t`` (``latents``) and ``x_{t-1}`` (``prev_latents``) plus
    the model's noise prediction ``eps``, this returns the ``z`` that makes the
    equation hold.

    Args:
        scheduler: Scheduler object providing ``config.num_train_timesteps``,
            ``config.clip_sample``, ``num_inference_steps``, ``alphas_cumprod``,
            ``final_alpha_cumprod`` and ``_get_variance``.
        prev_latents: Latents at the previous step.
        latents: Latents at ``timestep``.
        timestep: Current timestep.
        noise_pred: Predicted noise at ``timestep``.
        eta: Scale applied to the scheduler's standard deviation.

    Returns:
        The noise term ``z`` (same type as the inputs).
    """
    # Step back by one inference stride to get the previous timestep.
    stride = scheduler.config.num_train_timesteps // scheduler.num_inference_steps
    prev_timestep = timestep - stride

    # Cumulative alphas at t and t-1; past the start of the table the
    # scheduler's final value is used instead.
    alpha_t = scheduler.alphas_cumprod[timestep]
    if prev_timestep >= 0:
        alpha_prev = scheduler.alphas_cumprod[prev_timestep]
    else:
        alpha_prev = scheduler.final_alpha_cumprod

    # "Predicted x_0" from the predicted noise, optionally clipped to [-1, 1].
    x0_pred = (latents - (1 - alpha_t) ** (0.5) * noise_pred) / alpha_t ** (0.5)
    if scheduler.config.clip_sample:
        x0_pred = torch.clamp(x0_pred, -1, 1)

    # sigma_t(eta), see formula (16):
    # sigma_t = sqrt((1 - a_{t-1}) / (1 - a_t)) * sqrt(1 - a_t / a_{t-1})
    variance = scheduler._get_variance(timestep, prev_timestep)
    sigma_t = eta * variance ** (0.5)

    # Noise-free part of the step: scaled x_0 plus the "direction pointing to x_t".
    direction = (1 - alpha_prev - sigma_t**2) ** (0.5) * noise_pred
    deterministic_prev = alpha_prev ** (0.5) * x0_pred + direction

    # Whatever remains of prev_latents is sigma_t * z.
    return (prev_latents - deterministic_prev) / sigma_t

In [None]:
# Hand-unrolled CycleDiffusion call, part 1: everything up to (but not
# including) the denoising loop. The variables below stand in for the
# arguments one would normally pass to `pipe(...)`.
# The two prompts differ only in the eye colour (blue -> red).
source_prompt = "masterpiece,best quality, 1girl, long hair, red hair, solo, dress, blue eyes, looking at viewer"
prompt = "masterpiece,best quality, 1girl, long hair, red hair, solo, dress, red eyes, looking at viewer"
# Input image, opened from local disk and brought to 512x512 RGB.
init_image = Image.open("/home/ubuntu/anime_dance/0001.png").convert("RGB")
init_image = init_image.resize((512, 512))
strength = 0.8
num_inference_steps = 50
guidance_scale = 7.5
source_guidance_scale = 7.5
num_images_per_prompt = 1
eta = 0.1
generator = None
prompt_embeds = None
output_type = "pil"
# NOTE(review): `return_dict` is not read by any of the cells shown here.
return_dict = True
callback = None
callback_steps = 1


# Start from a freshly loaded scheduler so no state from earlier runs leaks in.
# `model_id_or_path` and `pipe` come from the earlier model-loading cell.
scheduler = DDIMScheduler.from_pretrained(model_id_or_path, subfolder="scheduler")
pipe.scheduler = scheduler

# Seed torch before any random draw made further down in this cell.
torch.manual_seed(11)

with torch.no_grad():

    # 1. Check inputs
    pipe.check_inputs(prompt, strength, callback_steps)

    # 2. Define call parameters
    batch_size = 1
    device = pipe._execution_device
    # Classifier-free guidance is switched on whenever the scale exceeds 1.
    do_classifier_free_guidance = guidance_scale > 1.0

    # 3. Encode input prompt
    # (`prompt_embeds` is None at this point, so nothing precomputed is passed in.)
    prompt_embeds = pipe._encode_prompt(
                prompt,
                device,
                num_images_per_prompt,
                do_classifier_free_guidance,
                prompt_embeds=prompt_embeds,
            )
    # Same encoding for the source prompt; the trailing None fills the next
    # positional parameter of `_encode_prompt` (presumably the negative
    # prompt -- TODO confirm against the installed diffusers version).
    source_prompt_embeds = pipe._encode_prompt(
        source_prompt, device, num_images_per_prompt, do_classifier_free_guidance, None
    )
    print(source_prompt_embeds.shape)

    # 4. Preprocess image
    # `preprocess` turns the PIL image into a float tensor scaled to [-1, 1];
    # the prints are sanity checks on its shape and value range.
    image = preprocess(init_image)
    print(image.shape)
    print(torch.max(image), torch.min(image))

    # 5. Prepare timesteps
    pipe.scheduler.set_timesteps(num_inference_steps, device=device)
    # NOTE: this rebinds `num_inference_steps` to the value returned by
    # `get_timesteps` (presumably the step count left after applying
    # `strength` -- TODO confirm).
    timesteps, num_inference_steps = pipe.get_timesteps(num_inference_steps, strength, device)
    # First timestep of the (possibly shortened) schedule, one copy per sample.
    latent_timestep = timesteps[:1].repeat(batch_size * num_images_per_prompt)
    print(latent_timestep)

    # 6. Prepare latent variables
    # `prepare_latents` returns two tensors: the starting latents for the loop
    # and the "clean" latents (presumably the noise-free encoding of `image`
    # -- TODO confirm).
    latents, clean_latents = pipe.prepare_latents(
        image, latent_timestep, batch_size, num_images_per_prompt, prompt_embeds.dtype, device, generator
    )
    # The source branch starts from the very same latents as the target branch.
    source_latents = latents
    print("latent shape:", latents.shape, clean_latents.shape)

    # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
    extra_step_kwargs = pipe.prepare_extra_step_kwargs(generator, eta)
    # Pull the generator back out (None if absent) so the denoising loop can
    # pass it explicitly alongside the remaining extra_step_kwargs.
    generator = extra_step_kwargs.pop("generator", None)



In [None]:
# 8. Denoising loop
# `posterior_sample` looks up `randn_tensor` from the notebook namespace when
# it is called. Newer diffusers releases expose the helper from
# `diffusers.utils.torch_utils` instead of `diffusers.utils`, so try the new
# location first and fall back to the old one.
try:
    from diffusers.utils.torch_utils import randn_tensor
except ImportError:
    from diffusers.utils import randn_tensor

# NOTE: this cell updates `latents` and `source_latents` in place; re-run the
# setup cell before running it again, otherwise the loop starts from the
# already-denoised latents.
with torch.no_grad():
    num_warmup_steps = len(timesteps) - num_inference_steps * pipe.scheduler.order
    with pipe.progress_bar(total=num_inference_steps) as progress_bar:
        for i, t in enumerate(timesteps):
            # expand the latents for classifier free guidance
            # (this unrolled loop always duplicates them, i.e. it assumes
            # guidance is enabled and each *_prompt_embeds holds two entries)
            latent_model_input = torch.cat([latents] * 2)
            source_latent_model_input = torch.cat([source_latents] * 2)
            latent_model_input = pipe.scheduler.scale_model_input(latent_model_input, t)
            source_latent_model_input = pipe.scheduler.scale_model_input(source_latent_model_input, t)

            # predict the noise residual
            # Source and target branches are interleaved into one batch of four
            # so a single UNet call covers both; entries [0] come first and
            # entries [1] second, matching the unpacking order below.
            concat_latent_model_input = torch.stack(
                [
                    source_latent_model_input[0],
                    latent_model_input[0],
                    source_latent_model_input[1],
                    latent_model_input[1],
                ],
                dim=0,
            )
            concat_prompt_embeds = torch.stack(
                [
                    source_prompt_embeds[0],
                    prompt_embeds[0],
                    source_prompt_embeds[1],
                    prompt_embeds[1],
                ],
                dim=0,
            )
            concat_noise_pred = pipe.unet(
                concat_latent_model_input, t, encoder_hidden_states=concat_prompt_embeds
            ).sample

            # perform guidance
            (
                source_noise_pred_uncond,
                noise_pred_uncond,
                source_noise_pred_text,
                noise_pred_text,
            ) = concat_noise_pred.chunk(4, dim=0)
            noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
            source_noise_pred = source_noise_pred_uncond + source_guidance_scale * (
                source_noise_pred_text - source_noise_pred_uncond
            )

            # Sample source_latents from the posterior distribution.
            # (extra_step_kwargs supplies the remaining arguments, presumably
            # just `eta` after the generator was popped in the setup cell.)
            prev_source_latents = posterior_sample(
                pipe.scheduler, source_latents, t, clean_latents, generator=generator, **extra_step_kwargs
            )
            # Compute the noise that a scheduler step with the *source* noise
            # prediction would need in order to land on prev_source_latents.
            noise = compute_noise(
                pipe.scheduler, prev_source_latents, source_latents, t, source_noise_pred, **extra_step_kwargs
            )
            source_latents = prev_source_latents

            # compute the previous noisy sample x_t -> x_t-1
            # The target branch reuses that same noise as its variance noise.
            latents = pipe.scheduler.step(
                noise_pred, t, latents, variance_noise=noise, **extra_step_kwargs
            ).prev_sample

            # call the callback, if provided
            if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % pipe.scheduler.order == 0):
                progress_bar.update()
                if callback is not None and i % callback_steps == 0:
                    callback(i, t, latents)

In [None]:
# 9. Post-processing: decode the final edited latents back to image space.
with torch.no_grad():
    image = pipe.decode_latents(latents)
    # 11. Convert to PIL (only when a PIL result was requested).
    image = pipe.numpy_to_pil(image) if output_type == "pil" else image

# Input on the left, edited result on the right.
grid = image_grid(imgs=[init_image, image[0]], rows=1, cols=2)
grid

In [None]:
# Sanity check: decode `clean_latents` (not the edited latents) to see what
# the pipeline's clean encoding of the input looks like.
with torch.no_grad():
    image = pipe.decode_latents(clean_latents)
    # Convert to PIL (only when a PIL result was requested).
    image = pipe.numpy_to_pil(image) if output_type == "pil" else image

# Input on the left, decoded clean latents on the right.
grid = image_grid(imgs=[init_image, image[0]], rows=1, cols=2)
grid