In [None]:
# Download the stable diffusion model from Hugging Face Diffusers library
from diffusers import StableDiffusionPipeline
import torch
# Build the ready-made text-to-image pipeline from the fp16 weight variant.
# NOTE(review): diffusers documents this keyword as `torch_dtype`; confirm the installed
# version honours `dtype=` (otherwise the argument may be ignored).
pipe = StableDiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    variant="fp16",
    dtype=torch.float16,
    use_auth_token=False,
)
pipe = pipe.to("cuda")

# Run the pipeline once and keep the first generated image.
image = pipe("An astronaught scuba diving").images[0]

In [None]:
from base64 import b64encode
import numpy as np
from diffusers import AutoencoderKL, LMSDiscreteScheduler, UNet2DConditionModel
from huggingface_hub import notebook_login

# For video display:
from IPython.display import HTML
from matplotlib import pyplot as plt
from pathlib import Path
from PIL import Image
from torch import autocast
from torchvision import transforms as tfms
from tqdm.auto import tqdm
from transformers import CLIPTextModel, CLIPTokenizer, logging
import os


In [None]:
# List the local Hugging Face hub cache directory (shell command run through IPython's `!`).
!ls ~/.cache/huggingface/hub # check if model is cached

In [None]:
# Seed torch's global random number generator.
torch.manual_seed(1)

# Ask for a Hugging Face login only when no token file is cached in the home directory.
hf_token_file = Path.home() / '.cache/huggingface' / 'token'
if not hf_token_file.exists():
  notebook_login()

In [None]:
# Pick the compute device: prefer a CUDA GPU, then Apple's MPS backend, otherwise the CPU.
if torch.cuda.is_available():
  torch_device = "cuda"
elif torch.backends.mps.is_available():
  torch_device = "mps"
else:
  torch_device = "cpu"

In [None]:
# Show which device was selected above.
print(torch_device)

In [None]:
# Load the VAE sub-model of the checkpoint; it is used later to map latents to images
# (and images to latents).
ae = AutoencoderKL.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    subfolder="vae",
)

In [None]:
# The tokenizer and the text encoder come from the same CLIP checkpoint:
# the tokenizer turns prompts into token ids, the encoder turns ids into embeddings.
clip_model_id = "openai/clip-vit-large-patch14"
tokenizer = CLIPTokenizer.from_pretrained(clip_model_id)
text_encoder = CLIPTextModel.from_pretrained(clip_model_id)

In [None]:
# U-Net: predicts the noise present in the latents.
unet = UNet2DConditionModel.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    subfolder="unet",
)

# Several noise schedulers are available; this notebook uses the
# Linear MultiStep Discrete scheduler (LMSDiscrete).
scheduler = LMSDiscreteScheduler(
    beta_start=0.00085,
    beta_end=0.012,
    beta_schedule="scaled_linear",
    num_train_timesteps=1000,
)



In [None]:
# Place all three networks on the selected device.
# The autoencoder loaded as `ae` is referred to as `vae` from here on.
unet = unet.to(torch_device)
text_encoder = text_encoder.to(torch_device)
vae = ae.to(torch_device)

In [None]:
# Digging a bit deeper into diffusion: run the pipeline stages by hand.

# --- Generation settings ---
prompt = ["oil painting of a bull dog"]
height = width = 512               # default resolution of the image
num_inference_steps = 50           # number of denoising steps
guidance_scale = 7.5               # trade-off between diversity and adherence to the prompt
generator = torch.manual_seed(42)  # fixed seed to replicate results
batch_size = 1

# --- Tokenize the prompt and an empty ("unconditional") prompt of the same length ---
text_input = tokenizer(
    prompt,
    padding="max_length",
    max_length=tokenizer.model_max_length,
    truncation=True,
    return_tensors="pt",
)
max_length = text_input.input_ids.shape[-1]

uncond_str = [""] * batch_size
uncond_input = tokenizer(
    uncond_str,
    padding="max_length",
    max_length=max_length,
    return_tensors="pt",
)

# --- Encode both token batches; no gradients are needed for inference ---
with torch.no_grad():
  text_embeddings = text_encoder(text_input.input_ids.to(torch_device))[0]
  uncond_embeddings = text_encoder(uncond_input.input_ids.to(torch_device))[0]

# Stack as [unconditional, conditional] so a single U-Net pass can serve both.
text_embeddings = torch.cat([uncond_embeddings, text_embeddings])




In [None]:
# Configure the scheduler for the requested number of inference steps.
scheduler.set_timesteps(num_inference_steps)

# Draw the starting noise in latent space: each spatial side is 1/8 of the image side.
latent_shape = (batch_size, unet.config.in_channels, height // 8, width // 8)
latents = torch.randn(latent_shape, generator=generator)

# Move the noise to the compute device and scale it by the scheduler's initial noise sigma.
latents = latents.to(torch_device) * scheduler.init_noise_sigma


In [None]:
# Denoising loop, run inside a CUDA autocast context.
with autocast("cuda"):
  for i, t in tqdm(enumerate(scheduler.timesteps), total=len(scheduler.timesteps)):
    # Duplicate the latents so one U-Net call covers the unconditional and the prompt branch.
    latent_model_input = torch.cat((latents, latents))
    # Let the scheduler pre-condition the input for timestep t.
    latent_model_input = scheduler.scale_model_input(latent_model_input, t)

    # Predict the noise in latent space at timestep t (inference only, no gradients).
    with torch.no_grad():
      noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

    # Classifier-free guidance: start at the unconditional prediction and move
    # towards the text-conditioned one by guidance_scale.
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    guidance_delta = noise_pred_text - noise_pred_uncond
    noise_pred = noise_pred_uncond + guidance_scale * guidance_delta

    # Take one scheduler step to obtain the less noisy latents.
    latents = scheduler.step(noise_pred, t, latents).prev_sample




In [None]:
# Scale and decode the latents back to an image.
# 0.18215 is the latent scaling factor of the Stable Diffusion v1 VAE: latents are multiplied
# by it after encoding, so they must be divided by it before decoding.
VAE_SCALING_FACTOR = 0.18215

# Use a separate name so that re-running this cell does not rescale `latents` a second time.
latents_for_decode = latents / VAE_SCALING_FACTOR
with torch.no_grad():
  # the VAE decoder maps the latents to image space
  image = vae.decode(latents_for_decode).sample

# Map the decoder output from [-1, 1] to [0, 1].
image = (image / 2 + 0.5).clamp(0, 1)
# Reorder NCHW -> NHWC and move to a NumPy array on the CPU.
image = image.detach().cpu().permute(0, 2, 3, 1).numpy()
# Convert to 8-bit pixel values.
image = (image * 255).round().astype(np.uint8)
image.shape

In [None]:
# Wrap every array of the batch in a PIL image and display the first one.
pil_image = list(map(Image.fromarray, image))
pil_image[0]


We are able to get a pretty good image of a bulldog in oil-painting style with `num_inference_steps` set to 50. If we increase `num_inference_steps` further, there is not much improvement, so image quality may not scale with the number of inference steps.

Let us dig into the components that make up this inference pipeline: (1) the VAE, (2) the U-Net, and (3) the noise scheduler.

In [None]:
# Autoencoders help compress an image into a latent space of reduced dimension and then decompress it back to recover the original image

# Latent scaling factor of the Stable Diffusion v1 VAE: applied after encoding,
# undone before decoding.
VAE_SCALING_FACTOR = 0.18215


def pil_to_latent(input_im):
  """Encode a single image into scaled VAE latents.

  Args:
    input_im: the image to encode. PIL images are converted to RGB first so that
      grayscale / RGBA / CMYK files still give the three channels the VAE expects.

  Returns:
    A sample from the VAE's latent distribution multiplied by VAE_SCALING_FACTOR,
    with a leading batch dimension of 1. For a 512x512 input this is expected to be
    (1, 4, 64, 64) - TODO confirm against the printed shape.
  """
  if isinstance(input_im, Image.Image):
    input_im = input_im.convert("RGB")
  # ToTensor yields values in [0, 1]; `* 2 - 1` maps them to [-1, 1].
  # Decoding reverses this with `/ 2 + 0.5`.
  pixels = tfms.ToTensor()(input_im).unsqueeze(0).to(torch_device) * 2 - 1
  with torch.no_grad():
    encoded = vae.encode(pixels)
  return VAE_SCALING_FACTOR * encoded.latent_dist.sample()


def latent_to_pil(latent):
  """Decode a batch of scaled latents into a list of PIL images.

  Args:
    latent: batch of latents scaled by VAE_SCALING_FACTOR (as returned by pil_to_latent).

  Returns:
    A list with one PIL image per batch element.
  """
  # Undo the latent scaling before decoding.
  with torch.no_grad():
    image = vae.decode(latent / VAE_SCALING_FACTOR).sample
  # Reverse the pixel scaling of pil_to_latent: [-1, 1] -> [0, 1].
  image = (image / 2 + 0.5).clamp(0, 1)
  # NCHW -> NHWC NumPy array, then 8-bit pixel values.
  image = image.detach().cpu().permute(0, 2, 3, 1).numpy()
  image = (image * 255).round().astype(np.uint8)
  return [Image.fromarray(img) for img in image]


In [None]:
# let us download an image from web
!pip install bing_image_downloader
from bing_image_downloader import downloader
downloader.download(query="cute kitten", limit=1, output_dir='.', adult_filter_off=True, force_replace=False, timeout=60)


In [None]:
# Locate the downloaded file relative to the download directory used above (output_dir='.'),
# instead of relying on an absolute, Colab-only path. The downloader keeps the source
# file's extension, so match any extension rather than assuming ".jpg".
kitten_dir = Path('.') / 'cute kitten'
kitten_candidates = sorted(kitten_dir.glob('Image_1.*'))
if not kitten_candidates:
  raise FileNotFoundError(f"No downloaded image matching 'Image_1.*' found in {kitten_dir.resolve()}")

# Resize to the 512x512 resolution used throughout the notebook.
input_image = Image.open(kitten_candidates[0]).resize((512, 512))
input_image

In [None]:
# encode the image into latent space
latent = pil_to_latent(input_image)
# Print the latent tensor's shape to compare it with the 512x512 input image.
print(latent.shape)

In [None]:
# Show each of the four latent channels as its own grayscale panel
# to get an idea of what the compressed form holds.
fig, axs = plt.subplots(1, 4, figsize=(16, 4))
for ax, channel in zip(axs, latent[0]):
  ax.imshow(channel.cpu(), cmap="Greys")

The latents capture quite a lot of information about the image, such as shape, color, and texture.

In [None]:
# Let us see whether the decoder gets the image back from these latents.
reconstructed_image = latent_to_pil(latent)
# latent_to_pil returns a list; display its first (only) image.
reconstructed_image[0]

Performing the diffusion process in the latent space and then using the VAE to get the image back saves a lot of computation, because working at the original image resolution is very expensive and takes a lot of time and compute to train. Another approach could be to resize the image to a smaller one, but that causes information loss and blurring. The latent-space representation produced by the VAE compresses the image while preserving its information.

In [None]:
# Playing around with schedulers.
# Re-configure the scheduler with a short 15-step schedule so it is easy to inspect.
scheduler.set_timesteps(15)
print(scheduler.timesteps)  # timestep values visited during sampling
print(scheduler.sigmas)     # sigma values of the schedule

In [None]:
# Plot the timestep value visited at every sampling step.
ax = plt.gca()
ax.plot(scheduler.timesteps)
ax.set_title('Inference sampling timesteps')
ax.set_xlabel('Sampling step')
ax.set_ylabel('Timesteps')
plt.show()

In [None]:
# Plot the noise schedule: the sigma value at every sampling step.
ax = plt.gca()
ax.plot(scheduler.sigmas)
ax.set_title('Inference sample sigmas')
ax.set_xlabel('Sampling step')
ax.set_ylabel('sigmas')
plt.show()

In [None]:
noise = torch.randn_like(latent) # normally distributed noise with the same shape as the latent
sampling_step = 10 # index into the 15-step schedule configured above
# Show the timestep and sigma that belong to this sampling step.
print(scheduler.timesteps[sampling_step], scheduler.sigmas[sampling_step])

In [None]:
# Add the noise that corresponds to the chosen sampling step to the clean latent.
noise_timestep = torch.tensor([scheduler.timesteps[sampling_step]])
latent_noised = scheduler.add_noise(latent, noise, timesteps=noise_timestep)

# Decode the noised latent into a full-size image.
noised_image = latent_to_pil(latent_noised)

In [None]:
# Display the decoded noised image (first item of the returned list).
noised_image[0]

We see that the kitty's structure is somewhat preserved, but otherwise the image is very noisy, with all the fine details taken away.

In [None]:
# let us see how the image looks at different timesteps
timesteps = [2, 4, 8, 14]
imgs = []
for t in timesteps:
  noise = torch.randn_like(latent)
  sampling_step = t
  latent_noised = scheduler.add_noise(latent, noise, timesteps=torch.tensor([scheduler.timesteps[sampling_step]]))
  # convert this noised latent to full size image
  noised_image = latent_to_pil(latent_noised)
  imgs.append(noised_image[0])


In [None]:
# One panel per noised image, in the order of the sampling steps chosen above.
fig, axs = plt.subplots(1, 4, figsize=(16, 4))
for ax, noised in zip(axs, imgs):
  ax.imshow(noised)

In [None]:
# What does add_noise do?
# `??` is IPython introspection: it shows the source code of scheduler.add_noise.
??scheduler.add_noise

During inference, we don't start with a noisy image. Instead, we start from pure random latents scaled by the scheduler's largest noise level (`scheduler.init_noise_sigma`). The separate constant used when encoding and decoding with the VAE is the latent scaling factor of Stable Diffusion v1 (0.18215). These values are particular to this type of model with this scheduler; if a model uses a different scheduler, these constants change.

We have already seen how to add noise to images through their latents, and how to remove the noise and get the image back. This makes image-to-image generation easier to understand: given an image and a text prompt, the diffusion model generates an image that adheres to the prompt. Instead of starting from pure noisy latents, we start from the (noised) latents of a reference image.

In [None]:
# --- Settings for image-to-image generation ---
prompt = "A kitten with a hat and a sunglass on"
height = width = 512
num_inference_steps = 50
guidance_scale = 7.5               # usually works well, but can be tuned
generator = torch.manual_seed(42)
batch_size = 1

# --- Tokenize the prompt and an empty prompt padded to the same length ---
text_input = tokenizer(
    prompt,
    padding="max_length",
    max_length=tokenizer.model_max_length,
    truncation=True,
    return_tensors="pt",
)
max_length = text_input.input_ids.shape[-1]
uncond_text = [""] * batch_size
uncond_input = tokenizer(
    uncond_text,
    padding="max_length",
    max_length=max_length,
    return_tensors="pt",
)

# --- Encode both; inference only, so no gradients ---
with torch.no_grad():
  text_embeddings = text_encoder(text_input.input_ids.to(torch_device))[0]
  uncond_embeddings = text_encoder(uncond_input.input_ids.to(torch_device))[0]

# Order is [unconditional, conditional], matching the chunk(2) split in the denoising loop.
text_embeddings = torch.cat([uncond_embeddings, text_embeddings])




In [None]:
# Switch the scheduler back from the 15-step demo schedule to the full num_inference_steps schedule.
scheduler.set_timesteps(num_inference_steps)

In [None]:
# Build the starting latent from the kitten latent encoded earlier:
# noise it to the level of `start_step` instead of starting from pure noise.
start_step = 10
start_sigma = scheduler.sigmas[start_step]
start_timestep = torch.tensor([scheduler.timesteps[start_step]])

noise = torch.randn_like(latent)
new_latent = scheduler.add_noise(latent, noise, start_timestep)
new_latent = new_latent.to(torch_device).float()

In [None]:
for i, t in tqdm(enumerate(scheduler.timesteps), total=len(scheduler.timesteps)):
  # The latent was already noised to the level of `start_step`, so earlier steps are skipped.
  if i < start_step:
    continue

  # One batched U-Net input for the unconditional and the text-conditioned branch.
  latent_model_input = torch.cat((new_latent, new_latent))
  latent_model_input = scheduler.scale_model_input(latent_model_input, t)

  # Predict the noise at timestep t.
  with torch.no_grad():
    noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

  # Classifier-free guidance on the two halves of the prediction.
  noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
  guidance_delta = noise_pred_text - noise_pred_uncond
  noise_pred = noise_pred_uncond + guidance_scale * guidance_delta

  # Scheduler step: compute the less noisy sample x_{t-1} from x_t.
  new_latent = scheduler.step(noise_pred, t, new_latent).prev_sample



In [None]:
# Decode the denoised latent back into a list of PIL images.
new_image = latent_to_pil(new_latent)

In [None]:
# Display the generated image (first item of the returned list).
new_image[0]

Not bad: with a request for a hat and sunglasses, the model generates an image with sunglasses but no hat. Giving more weight to the prompt through a higher guidance scale may improve the result.

In [None]:
# let us put the code in a function to play around with different guidance scale and start step

def img2img(guidance_scale, start_step, prompt="A kitten with a hat and a sunglass on", num_inference_steps=50):
  """Run prompt-guided image-to-image generation starting from the notebook's `latent`.

  The encoded reference image (`latent`) is noised to the level of `start_step` and then
  denoised from that step onwards under classifier-free guidance.

  Relies on notebook-level objects: `tokenizer`, `text_encoder`, `unet`, `scheduler`,
  `latent`, `torch_device` and `latent_to_pil`. It re-seeds torch's global RNG with 42 and
  re-configures the shared `scheduler` on every call.

  Args:
    guidance_scale: weight of the text-conditioned prediction relative to the
      unconditional one in classifier-free guidance.
    start_step: index of the sampling step to start denoising from; earlier steps are skipped.
    prompt: text prompt guiding the generation.
    num_inference_steps: length of the sampling schedule.

  Returns:
    The generated PIL image.

  Raises:
    ValueError: if `start_step` is not in [0, num_inference_steps).
  """
  if not 0 <= start_step < num_inference_steps:
    raise ValueError(f"start_step must be in [0, {num_inference_steps}), got {start_step}")

  # Seed the global RNG so the noise drawn below is the same on every call.
  torch.manual_seed(42)
  batch_size = 1

  # Tokenize the prompt and an empty prompt padded to the same length.
  text_input = tokenizer(prompt, padding="max_length", max_length=tokenizer.model_max_length,
                         truncation=True, return_tensors="pt")
  max_length = text_input.input_ids.shape[-1]
  uncond_input = tokenizer([""] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt")

  with torch.no_grad():
    cond_embeddings = text_encoder(text_input.input_ids.to(torch_device))[0]
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(torch_device))[0]
  # Order is [unconditional, conditional], matching the chunk(2) split below.
  text_embeddings = torch.cat([uncond_embeddings, cond_embeddings])

  scheduler.set_timesteps(num_inference_steps)

  # Noise the reference latent to the level of the starting step.
  noise = torch.randn_like(latent)
  new_latent = scheduler.add_noise(latent, noise, torch.tensor([scheduler.timesteps[start_step]]))
  new_latent = new_latent.to(torch_device).float()

  for i, t in tqdm(enumerate(scheduler.timesteps), total=len(scheduler.timesteps)):
    # The latent already carries the noise of the steps before start_step.
    if i < start_step:
      continue

    latent_model_input = torch.cat([new_latent] * 2)
    latent_model_input = scheduler.scale_model_input(latent_model_input, t)

    # predict the noise at the timestep t
    with torch.no_grad():
      noise_pred = unet(latent_model_input, t, encoder_hidden_states=text_embeddings).sample

    # classifier-free guidance, using the caller-supplied guidance_scale
    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

    # compute the less noisy sample x_{t-1} given x_t
    new_latent = scheduler.step(noise_pred, t, new_latent).prev_sample

  return latent_to_pil(new_latent)[0]




In [None]:
# guidance_scale=7.5, start_step=15 (a later start than the 10 used above)
img2img(7.5, 15)

Increasing the start step worsens adherence to the prompt. So let us decrease `start_step`; this could move the image further away from the original, but it is worth experimenting.

In [None]:
# guidance_scale=7.5, start_step=5 (an earlier start than the 10 used above)
img2img(7.5, 5)

This is better with respect to adherence to the prompt but, as expected, the image has deviated from the original. Let us now increase the guidance scale.

In [None]:
# guidance_scale=10, start_step=5
img2img(10, 5)