In [1]:
!pip install transformers gradio
!pip install diffusers
!pip install accelerate

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting transformers
  Downloading transformers-4.29.2-py3-none-any.whl (7.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.1/7.1 MB[0m [31m63.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting gradio
  Downloading gradio-3.32.0-py3-none-any.whl (19.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.9/19.9 MB[0m [31m47.2 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.14.1 (from transformers)
  Downloading huggingface_hub-0.14.1-py3-none-any.whl (224 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m224.5/224.5 kB[0m [31m25.6 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers)
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m91.7 

In [2]:
import PIL
from PIL import Image
import torch
from transformers import CLIPTextModel, CLIPTokenizer
from diffusers import AutoencoderKL, UNet2DConditionModel, PNDMScheduler, DPMSolverMultistepScheduler
from tqdm.auto import tqdm





In [3]:
vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse")
tokenizer = CLIPTokenizer.from_pretrained("stabilityai/stable-diffusion-2-1", subfolder="tokenizer") # to generate tokens for every word in the vocabulary
text_encoder = CLIPTextModel.from_pretrained("stabilityai/stable-diffusion-2-1", subfolder="text_encoder")
unet = UNet2DConditionModel.from_pretrained("stabilityai/stable-diffusion-2-1", subfolder="unet")
scheduler = DPMSolverMultistepScheduler.from_pretrained("stabilityai/stable-diffusion-2-1", subfolder="scheduler")



Downloading (…)lve/main/config.json:   0%|          | 0.00/547 [00:00<?, ?B/s]

Downloading (…)on_pytorch_model.bin:   0%|          | 0.00/335M [00:00<?, ?B/s]

Downloading (…)tokenizer/vocab.json:   0%|          | 0.00/1.06M [00:00<?, ?B/s]

Downloading (…)tokenizer/merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/460 [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/824 [00:00<?, ?B/s]

Downloading (…)_encoder/config.json:   0%|          | 0.00/633 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/1.36G [00:00<?, ?B/s]

Downloading (…)ain/unet/config.json:   0%|          | 0.00/939 [00:00<?, ?B/s]

Downloading (…)on_pytorch_model.bin:   0%|          | 0.00/3.46G [00:00<?, ?B/s]

Downloading (…)cheduler_config.json:   0%|          | 0.00/345 [00:00<?, ?B/s]

In [41]:
device = "cuda" if torch.cuda.is_available() else "cpu"  # change to "cuda" if you have a GPU
vae.to(device)
text_encoder.to(device)
unet.to(device)

batch_size = 1
def get_text_embeddings(prompt):
    text_input = tokenizer(
        prompt, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt")

    with torch.no_grad():
        text_embeddings = text_encoder(text_input.input_ids.to(device))[0] 
        # text input is sent as a batch(of images) but we need only the first one as we are using it for inferencing only

    max_length = text_input.input_ids.shape[-1]
    uncond_input = tokenizer([""] * batch_size, padding="max_length", max_length=max_length, return_tensors="pt") 

    ## empty prompt- unconditioned input
    uncond_embeddings = text_encoder(uncond_input.input_ids.to(device))[0]

    text_embeddings = torch.cat([uncond_embeddings, text_embeddings]) 
    ## the ratio of the text and random dictates how creative or how text relevant an image will be(=guidance scale)

    return text_embeddings

def image_to_latents(img):
    with torch.no_grad():
      latent_dists = vae.encode(img.to(device)).latent_dist.sample()
      latent_samples = latent_dists
      latent_samples *= vae.config.scaling_factor
    return latent_samples

def latents_to_img(latents):
    latents = 1 / vae.config.scaling_factor * latents ##IMPORTANT LINE!!!
    with torch.no_grad():
        image = vae.decode(latents).sample
    image = (image / 2 + 0.5).clamp(0, 1)
    image = image.detach().cpu().permute(0, 2, 3, 1).numpy()
    images = (image * 255).round().astype("uint8")
    pil_images = [Image.fromarray(image) for image in images]
    return pil_images
    

def get_latents(text_embeddings, height=512, width=512, num_inference_steps=50,
                guidance_scale=7.5, init_latents = None, seed=0,
                return_all_latents=False , noise_in_image = 0.2 , mask = None): # returnAllLatents is used to return all the latents for the graphical video
    
    # generator = torch.manual_seed(seed).device("cuda")
    batch_size = text_embeddings.shape[0]//2

    init_latents = init_latents.to(device)
    init_latents = init_latents * scheduler.init_noise_sigma ## very important line, if not written we only get a brown image
    init_latents_withoutNoise = init_latents

    all_latents = [init_latents] # if we want to create a video of the latents
    scheduler.set_timesteps(num_inference_steps) 

    init_timestep = int(num_inference_steps * (1 - noise_in_image))
    timesteps = scheduler.timesteps[-init_timestep]
    
    timesteps = torch.tensor([timesteps] * batch_size, dtype=torch.long, device= "cuda")

    noise = torch.randn(init_latents.shape, device= "cuda")
    init_latents = scheduler.add_noise(init_latents, noise, timesteps)
    
    # noise = torch.randn(init_latents.shape, generator=generator, device=self.device)
    
    # init_latents_withNoise = perturb_latents(init_latents, noise_in_image)

    # init_latents = init_latents_withNoise

    mask.to(device)

    latents = init_latents
    t_start = num_inference_steps - init_timestep
    # latents = (latents * mask)
    
    # i = 0.0
    for t in tqdm(scheduler.timesteps[t_start + 1:]):
        
        # ## to make only last noise % of steps takes place.
        # if( i  / float(num_inference_steps) < 1 - noise_in_image  ): ## found out adding 0.1 increases the output quality.
        #    i+= 1.0
        #    continue
        # print(i)
        # i+= 1.0

        latent_model_input = torch.cat([latents] * 2)

        # predict the noise residual
        with torch.no_grad():
            noise_pred = unet(latent_model_input, t,encoder_hidden_states=text_embeddings).sample ## text_embeddings added in hidden layer

        # perform guidance
        noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) 
        # as we sent 2 latents concatenated, one random one text, we separated the pred_noise in 2 parts
        noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

        # guidance scale - how much of image should be similar to the text and how much of it should be creative
        # running the dission or noise reduction step for t steps
        latents = scheduler.step(noise_pred, t, latents).prev_sample  

        # print(mask.get_device())
        # ## adding noise to the latent model input acc to the timestep t

        init_latents_initialWithNoise = scheduler.add_noise(init_latents_withoutNoise, noise, t)
        latents = (init_latents_initialWithNoise * mask) + (latents * (1- mask))

        all_latents.append(latents)

    if not return_all_latents:
        return latents
    
    all_latents = torch.cat(all_latents, dim=0)

    return all_latents

def perturb_latents(latents, scale = 0.2):
    with torch.no_grad():
      noise = torch.randn_like(latents)
      new_latents = (1 - scale) * latents + scale * noise
      return (new_latents - new_latents.mean()) / new_latents.std()

def InpaintingPipeline( init_image , mask, prompt = "" , guidance = 7.5, noise_in_image = 0.2):
    print("starting of image to latents")
    
    latents_img = image_to_latents(init_image)
    print(mask.size())
    
    mask_tensor = torch.zeros((1, 4, 64, 64), device= "cuda")
    mask_tensor.to(device)
    print(mask_tensor.get_device())
    for i in range(4):
        for j in range(64):
            for k in range(64):
                mask_tensor[0][i][j][k] = mask[0][i][j][k]
    mask_tensor.to(device)
    # unmask_tensor = 1- mask_tensor
    # unmask_tensor.to(device)
    print(mask_tensor.get_device())

    latents = latents_img 
    latents.to(device)   
    
    ## adding noise to initial image latents to predict the right image by eliminating noise at each step 
    # latents = perturb_latents(latents, noise_in_image)
    print("finishing of image to latent")

    print("starting of text to embedding")
    text_embeddings = get_text_embeddings(prompt)
    print(text_embeddings.size())
    print("finishing of text to embedding")

    print("starting of getting the final latents of the formed picture")
    mask.to(device)

    random_latents = torch.rand((1,4,64,64), device = "cuda")
    latents = get_latents(text_embeddings, 
                          height=512, 
                          width=512, 
                          num_inference_steps=50,
                          guidance_scale=guidance, 
                          init_latents = latents, 
                          seed=1,
                          return_all_latents=False,
                          noise_in_image = noise_in_image,
                          mask = mask_tensor, #latents_mask,
                          ) #latents_unmask)
    print("final latents received")
   
    all_imgs = []
    for i in tqdm(range(0, len(latents), batch_size)):
        imgs = latents_to_img(latents[i:i+batch_size])
        all_imgs.extend(imgs)
    return all_imgs[-1]

resized the mask array to make it equal to the the latent size of the image.
the latent size is 4* 64 * 64 where 4 are th enumber of channels and 64 * 64 represents the content of the particular channel and i think the feature has its location in the latent similar to the location of that feature in the initial picture. As the postional of feature is similar in latent and the original image, we can use mask by resizing(to 4 * 64 * 64) it directly also for the latents, no need to pass the mask through vae as then 0 and 1 will change and also there is not much image i the masks like colour, texture etc, its just 0 and 1. <br> 
Some of the variables which we use:
1. Strength: it is the inverse of the noise added in the image, so it is default at 0.8 so image have 20 % noise in the starting and only 20 steps of stable diffusion will be carried out.
2. Guidance: how much of the generated image in the inpainted part is rellated to the the text we inputed and how much is random, the more this guidance the more it is oriented towards the inputed text. 

In [44]:
import gradio as gr
#test
from io import BytesIO
import requests
import PIL
from PIL import Image
import numpy as np
import os
import uuid
import torch
from torch import autocast
import cv2
from matplotlib import pyplot as plt
# from diffusers import DiffusionPipeline
from torchvision import transforms
# from clipseg.models.clipseg import CLIPDensePredT

auth_token = os.environ.get("API_TOKEN") or True

def download_image(url):
    response = requests.get(url)
    return PIL.Image.open(BytesIO(response.content)).convert("RGB")

device = "cuda" if torch.cuda.is_available() else "cpu"

transform = transforms.Compose([
      transforms.ToTensor(),
      transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
      transforms.Resize((512, 512)),
])

def predict(dict,  prompt=""):
    # print(dict)
    with autocast("cuda"):
        init_image = dict["image"].convert("RGB").resize((512, 512))
        type(init_image)
        init_image = np.array(init_image).astype(np.float32) / 255.0
        # init_image = init_image.unsqueeze(0).transpose(0, 3, 1, 2)
        init_image = init_image.transpose(2,0,1)
        init_image = torch.from_numpy(init_image)
        init_image = init_image.unsqueeze(0)
        init_image =  2.0 * init_image - 1.0
        init_image.to(device)
        print(init_image.get_device())


        mask = dict["mask"].convert("L")
        mask = mask.resize((64, 64), resample=PIL.Image.NEAREST)
        mask = np.array(mask).astype(np.float32) / 255.0
        ### duplicating the mask for all 4 channels as the mask is applied for all channels in the image.
        mask = np.tile(mask, (4, 1, 1))
        # mask = mask.unsqueeze().transpose(0, 1, 2, 3) 
        mask = 1 - mask  # repaint white, keep black
        mask = torch.from_numpy(mask)
        mask.to("cuda:0")
        mask = mask.unsqueeze(0)
        # unmask = 1 - mask

        ## trying other method by passing latents also through vae
        # mask = dict["mask"].convert("RGB")
        # # mask = mask.to("cuda")
        # mask = mask.resize((512, 512), resample=PIL.Image.NEAREST)
        # # print(mask)
        # mask = np.array(mask).astype(np.float32) / 255.0
        # print(mask)
        # ### duplicating the mask for all 4 channels as the mask is applied for all channels in the image.
        # # mask = np.tile(mask, (4, 1, 1))
        # mask = mask.transpose(2,0,1)
        # mask = torch.from_numpy(mask)
        # mask = mask.unsqueeze(0)
        # mask = 1 - mask  # repaint white, keep black
        # mask.to("cuda")
        # unmask = 1 - mask

    print(prompt)
    output = InpaintingPipeline(init_image, mask, prompt, guidance= 7.5, noise_in_image = 0.2)
    # print(output.size)
    # print(output)
    return output

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            image = gr.Image(source='upload', tool='sketch', elem_id="image_upload", type="pil", label="Upload").style(height=400)
            prompt = gr.Textbox(label = 'Your prompt (what you want to add in place of what you are removing)')
            btn = gr.Button("Run")
        with gr.Column():
            result = gr.Image(label="Result")
        btn.click(fn=predict, inputs=[image, prompt], outputs=result)

demo.queue().launch(share=True, debug = True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://e088e0a31c57119887.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades (NEW!), check out Spaces: https://huggingface.co/spaces


-1
brown cat sitting on a bench facing forward realistic image
starting of image to latents
torch.Size([1, 4, 64, 64])
0
0
finishing of image to latent
starting of text to embedding
torch.Size([2, 77, 1024])
finishing of text to embedding
starting of getting the final latents of the formed picture


  0%|          | 0/39 [00:00<?, ?it/s]

final latents received


  0%|          | 0/1 [00:00<?, ?it/s]

-1
tiger sitting on a bench facing forward realistic image
starting of image to latents
torch.Size([1, 4, 64, 64])
0
0
finishing of image to latent
starting of text to embedding
torch.Size([2, 77, 1024])
finishing of text to embedding
starting of getting the final latents of the formed picture


  0%|          | 0/39 [00:00<?, ?it/s]

final latents received


  0%|          | 0/1 [00:00<?, ?it/s]

-1
tiger sitting in front of the bench realistic image
starting of image to latents
torch.Size([1, 4, 64, 64])
0
0
finishing of image to latent
starting of text to embedding
torch.Size([2, 77, 1024])
finishing of text to embedding
starting of getting the final latents of the formed picture


  0%|          | 0/39 [00:00<?, ?it/s]

final latents received


  0%|          | 0/1 [00:00<?, ?it/s]

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://4243a1c9438175dbc8.gradio.live
Killing tunnel 127.0.0.1:7860 <> https://e088e0a31c57119887.gradio.live




In [None]:

dir(scheduler)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__len__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_compatibles',
 '_convert_to_karras',
 '_deprecated_kwargs',
 '_dict_from_json_file',
 '_get_compatibles',
 '_get_init_keys',
 '_internal_dict',
 '_sigma_to_t',
 '_threshold_sample',
 'add_noise',
 'alpha_t',
 'alphas',
 'alphas_cumprod',
 'betas',
 'compatibles',
 'config',
 'config_name',
 'convert_model_output',
 'dpm_solver_first_order_update',
 'extract_init_dict',
 'from_config',
 'from_pretrained',
 'get_config_dict',
 'has_compatibles',
 'ignore_for_config',
 'init_noise_sigma',
 'lambda_t',
 'load_config',
 'lower_order_nums',
 'model_outputs',
 'multistep_dpm_solver_second_order_upda

In [31]:
scheduler.init_noise_sigma

1.0

In [None]:
torch.cuda.is_available()

True