<a href="https://colab.research.google.com/github/yashbansal1000/inferencing_pipeline/blob/main/Inferencing_Pipeline_Streamlined.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#@title Variables
# Google Drive directory that holds the trained LoRA checkpoints.
trained_model_dir = '/content/drive/MyDrive/LoRA/output' # {type:"string"}
# Base name of the trained LoRA and the specific checkpoint used for inference.
model_name = 'athar'
model_checkpoint = 'athar-05'
# Generated images are written below a per-model results folder on Drive.
output_dir = f"/content/drive/MyDrive/training_results/{model_name}/generations"

In [None]:
#@title Dependencies, Models, Server Setup

# Work from /content so the UI clone below lands in /content/ui.
%cd /content

import os
import re
from google.colab import drive

# Mount Google Drive; model weights are copied from /content/drive/MyDrive further down.
drive.mount('/content/drive')


%env TF_CPP_MIN_LOG_LEVEL=1

!apt -y update -qq
# Download libtcmalloc_minimal.so.4 and register it through LD_PRELOAD.
!wget https://github.com/camenduru/gperftools/releases/download/v1.0/libtcmalloc_minimal.so.4 -O /content/libtcmalloc_minimal.so.4
%env LD_PRELOAD=/content/libtcmalloc_minimal.so.4

# System packages and pinned xformers / triton versions.
!apt -y install -qq aria2 libcairo2-dev pkg-config python3-dev
!pip install -q xformers==0.0.20 triton==2.0.0 -U

# Clone the web UI (branch v2.5) and the tunnels extension, then work inside /content/ui.
!git clone -b v2.5 https://dagshub.com/camenduru/ui
!git clone https://github.com/camenduru/tunnels /content/ui/extensions/tunnels
%cd /content/ui

# In-place patches to the cloned sources:
#   1. launch.py: add `import os` after the `from modules import launch_utils` line.
#   2. launch.py: after `prepare_environment()`, insert an os.system call that rewrites
#      `dict()))` to `dict())).cuda()` in the stable-diffusion-stability-ai ldm/util.py.
#   3. modules/shared.py: extend the ["sd_model_checkpoint"] list with "sd_vae" and
#      "CLIP_stop_at_last_layers".
!sed -i -e '''/from modules import launch_utils/a\import os''' /content/ui/launch.py
!sed -i -e '''/        prepare_environment()/a\        os.system\(f\"""sed -i -e ''\"s/dict()))/dict())).cuda()/g\"'' /content/ui/repositories/stable-diffusion-stability-ai/ldm/util.py""")''' /content/ui/launch.py
!sed -i -e 's/\["sd_model_checkpoint"\]/\["sd_model_checkpoint","sd_vae","CLIP_stop_at_last_layers"\]/g' /content/ui/modules/shared.py


#### Installing extensions (ADetailer, Ultimate SD Upscale, ControlNet)
!git clone https://github.com/Bing-su/adetailer.git /content/ui/extensions/adetailer
!git clone https://github.com/Coyote-A/ultimate-upscale-for-automatic1111 /content/ui/extensions/ultimate-upscale-for-automatic1111
!git clone https://github.com/Mikubill/sd-webui-controlnet.git /content/ui/extensions/sd-webui-controlnet

# Copying the Realistic Vision base model from Drive
!cp /content/drive/MyDrive/realisticVisionV51_v51VAE.safetensors /content/ui/models/Stable-diffusion

os.makedirs("/content/ui/models/Lora", exist_ok=True)

# Copying general-purpose LoRAs (referenced later in prompts as LowRA / add_detail)
!cp /content/drive/MyDrive/LowRA.safetensors /content/ui/models/Lora
!cp /content/drive/MyDrive/add_detail.safetensors /content/ui/models/Lora
!cp /content/drive/MyDrive/epi_noiseoffset2.safetensors /content/ui/models/Lora

# Copying the ControlNet tile model
os.makedirs("/content/ui/models/ControlNet", exist_ok=True)
!cp /content/drive/MyDrive/control_v11f1e_sd15_tile.pth /content/ui/models/ControlNet


# Copying the upscaler model used by the Ultimate SD Upscale script
os.makedirs("/content/ui/models/ESRGAN", exist_ok=True)
!cp /content/drive/MyDrive/4x-UltraSharp.pth /content/ui/models/ESRGAN
###################################################
# There's a bug which prevents the running of LoRA
# This fixes it
###################################################

print("Fixing the bug")
filename = '/content/ui/webui.py'
# Call that has to follow the `api = create_api...` line for LoRA to work in API mode.
fix_call = 'modules.script_callbacks.before_ui_callback()'

with open(filename, 'r') as webui_file:
    lines = webui_file.readlines()

new_lines = []
for index, line in enumerate(lines):
    new_lines.append(line)
    if 'api = create_api' not in line:
        continue
    # Treat "no following line" as "fix not present" so the patch is also applied
    # when the matching line is the last line of the file.
    next_line = lines[index + 1] if index + 1 < len(lines) else ''
    if fix_call not in next_line:
        print("Found the line. Adding the fix")
        if not line.endswith('\n'):
            # Last line without a newline: terminate it before appending the fix.
            new_lines[-1] = line + '\n'
        new_lines.append('    ' + fix_call + '\n')

with open(filename, 'w') as webui_file:
    webui_file.writelines(new_lines)



################################################
################################################
##### Running Server
################################################
################################################
import subprocess
import threading

def run_server():
    """Launch the web UI in API-only mode and stream its output to the notebook.

    Blocks until the server process exits, so it is meant to be run in a
    background thread. stderr is merged into stdout: with two separate pipes
    and only stdout being read, a full stderr pipe would block the server.
    """
    command = "python launch.py --nowebui --listen --xformers --enable-insecure-extension-access --theme dark --gradio-queue --multiple"
    print("Starting server...")
    process = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # single stream, so nothing is left unread
        cwd="/content/ui",         # launch.py lives here; do not depend on a prior %cd
    )

    # Print output as it is produced; undecodable bytes must not kill the reader.
    for line in process.stdout:
        print(line.decode(errors="replace").strip())

    return_code = process.wait()
    print("Server stopped.")
    if return_code != 0:
        print("Error:")
        print(f"Server exited with return code {return_code}")

# Start the server in a separate thread
server_thread = threading.Thread(target=run_server)
server_thread.start()

In [None]:
#@title Model Checkpoints & Files Setup
#Copying the LoRA Checkpoints
trained_model = trained_model_dir + "/" + model_name
!cp {trained_model}-*.safetensors /content/ui/models/Lora
!cp {trained_model}.safetensors /content/ui/models/Lora

prefix = model_name
counter = 1

dir = "/content/ui/models/Lora"

# List all files in the current directory
files = os.listdir(dir)

# Sort the files to ensure they are processed in the correct order
files.sort()

# Define a pattern to match files with the prefix and a series of numbers
pattern = re.compile(rf'^{prefix}-\d{{6}}\.safetensors$')

# Rename files with numbers
for file in files:
  if pattern.match(file):
    print("Found: " + file)
    new_name = f"{dir}/{prefix}-{counter:02}.safetensors"
    os.rename(dir + "/" + file, new_name)
    counter += 1
  # Rename the file without numbers if it exists
  if f"{prefix}.safetensors" in file:
    new_name = f"{dir}/{prefix}-{counter:02}.safetensors"
    os.rename(dir + "/" + file, new_name)

In [None]:
#@title Inferencing Side Setup (WAIT for 5 mins for the server to start in previous step)
import json
import requests
import io
import base64
from PIL import Image, PngImagePlugin
import matplotlib.pyplot as plt
from datetime import datetime


# Trigger word saved next to the trained LoRA. Strip surrounding whitespace so a
# trailing newline in the file does not end up in the middle of every prompt.
with open(trained_model_dir + "/" + model_name + "_activation_word.txt", 'r') as activation_file:
    activation_word = activation_file.read().strip()

os.makedirs(output_dir, exist_ok=True)

# Base URL of the API server started in the setup cell.
url = "http://0.0.0.0:7861"

# Fetch the current options, switch the base checkpoint, and post them back.
opt = requests.get(url=f'{url}/sdapi/v1/options')
opt_json = opt.json()
opt_json['sd_model_checkpoint'] = 'realisticVisionV51_v51VAE.safetensors [15012c538f]'
requests.post(url=f'{url}/sdapi/v1/options', json=opt_json)

def get_base_model():
    """Return the ``sd_model_checkpoint`` value from the server's options endpoint."""
    current_options = requests.get(url=f'{url}/sdapi/v1/options').json()
    return current_options['sd_model_checkpoint']

def _generate_and_save(endpoint, payload, display_image):
    """POST ``payload`` to ``/sdapi/v1/<endpoint>`` and save each returned image.

    Every image is written to ``output_dir`` as ``output_<timestamp>_<index>.png``
    with the text returned by the ``png-info`` endpoint stored under the
    ``parameters`` PNG text key.

    Returns:
        list of the decoded PIL images, in response order.

    Raises:
        requests.HTTPError: if the generation request returns an error status.
    """
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

    response = requests.post(
        url=f'{url}/sdapi/v1/{endpoint}',
        json=payload if payload is not None else {},
    )
    # Fail with the HTTP error instead of a KeyError on the missing 'images' key.
    response.raise_for_status()
    result = response.json()

    images = []
    for index, encoded in enumerate(result['images']):
        image = Image.open(io.BytesIO(base64.b64decode(encoded.split(",", 1)[0])))
        images.append(image)

        png_payload = {
            "image": "data:image/png;base64," + encoded
        }
        info_response = requests.post(url=f'{url}/sdapi/v1/png-info', json=png_payload)
        pnginfo = PngImagePlugin.PngInfo()
        # A missing "info" entry yields None; store an empty string in that case.
        pnginfo.add_text("parameters", info_response.json().get("info") or "")
        output_file = f'{output_dir}/output_{timestamp}_{index}.png'

        image.save(output_file, pnginfo=pnginfo)

        if display_image:
            plt.imshow(image)
            plt.axis('off') # To turn off axes
            plt.show()
    return images

def text2img(payload = None, display_image = False):
    """Run a txt2img request, save the results, and return the PIL images.

    Args:
        payload: request body for ``/sdapi/v1/txt2img``; ``None`` sends ``{}``.
        display_image: when true, show each image with matplotlib.
    """
    return _generate_and_save("txt2img", payload, display_image)

def img2img(payload = None, display_image = False):
    """Run an img2img request, save the results, and return the PIL images.

    Args:
        payload: request body for ``/sdapi/v1/img2img``; ``None`` sends ``{}``.
        display_image: when true, show each image with matplotlib.
    """
    return _generate_and_save("img2img", payload, display_image)

def create_prompt_with_loras(
                            prompt = "",
                            model_lora = "",
                            add_detail_lora = 0,
                            lowra_lora = 0
                            ):
    """Build the full positive prompt, including LoRA tags.

    Args:
        prompt: scene description appended after "photo of <activation_word>,".
        model_lora: name of the subject LoRA; added with weight 1 when non-empty.
        add_detail_lora: weight for the ``add_detail`` LoRA; 0 omits the tag.
        lowra_lora: weight for the ``LowRA`` LoRA; 0 omits the tag.

    Returns:
        The assembled prompt string.
    """
    ### Prompt Formation
    ### Need to play around this a lot
    base_prompt_suffix = "" #", 8k, detailed shadows, detailed quality lighting, (hyperrealistic:1.2), absurdres, realistic, hyperdetailed skin, intrinsically detailed face, perfect hair, (hyper detailed), love of details"

    complete_prompt = "photo of " + activation_word
    if prompt != "":
        complete_prompt += "," + prompt
    complete_prompt += base_prompt_suffix

    if model_lora != "":
        complete_prompt += " <lora:" + model_lora + ":1>"
    # Compare the numeric weights themselves: the previous `str(weight) != 0`
    # was always true, so zero-weight tags were still appended.
    if add_detail_lora != 0:
        complete_prompt += " <lora:add_detail:" + str(add_detail_lora) + ">"
    if lowra_lora != 0:
        complete_prompt += " <lora:LowRA:" + str(lowra_lora) + ">"

    return complete_prompt

def get_negative_prompt(
                      add_negatives=""
                      ):
    """Return the default negative prompt, optionally prefixed by extra terms.

    When ``add_negatives`` is non-empty it is placed before the defaults,
    separated by ", ".
    """
    default_negatives = "(deformed iris, deformed pupils, semi-realistic, cgi, 3d, render, sketch, cartoon, drawing, anime, mutated hands and fingers:1.4), (deformed, distorted, disfigured:1.3), poorly drawn, bad anatomy, wrong anatomy, extra limb, missing limb, floating limbs, disconnected limbs, mutation, mutated, ugly, disgusting, amputation"
    if add_negatives == "":
        return default_negatives
    return add_negatives + ", " + default_negatives

def get_default_steps():
    """Return the step count used for every request in this notebook."""
    default_steps = 30
    return default_steps

def get_sampler():
    """Return the sampler name sent with every request."""
    sampler_name = "DPM++ 2M SDE Karras"
    return sampler_name

def get_cfg_scale():
    """Return the CFG scale sent with every request."""
    cfg_scale = 7
    return cfg_scale

def get_default_width(hr = False):
    """Return the image width: 1024 when ``hr`` is truthy, otherwise 512."""
    return 1024 if hr else 512
def get_default_height(hr = False):
    """Return the image height: 1536 when ``hr`` is truthy, otherwise 768."""
    return 1536 if hr else 768

def get_adetailer_params(
                        ad_model = "face_yolov8n.pt",
                        ad_prompt = "",
                        model_lora = ""
                        ):
    """Build the ``alwayson_scripts`` entry for the ADetailer extension.

    The detailer prompt is "<activation_word> face", followed by ``ad_prompt``
    and a weight-1 tag for ``model_lora`` when those are non-empty.
    """
    final_prompt = activation_word + " face"
    if ad_prompt != "":
        final_prompt = final_prompt + ", " + ad_prompt
    if model_lora != "":
        final_prompt = final_prompt + ", <lora:" + model_lora + ":1>"

    detailer_args = {
        "ad_model": ad_model,
        "ad_prompt": final_prompt,
        "ad_negative_prompt": "",
        "ad_sampler": get_sampler(),
        "ad_restore_face": True,
        "ad_controlnet_model": "None",
        "ad_controlnet_module": None,
        "ad_controlnet_weight": 1.0,
        "ad_controlnet_guidance_start": 0.0,
        "ad_controlnet_guidance_end": 1.0,
    }
    return {"args": [detailer_args]}

def get_controlnet_params():
    """Return the ``alwayson_scripts`` entry for ControlNet (single tile unit)."""
    tile_unit = {
        "module": "tile_resample",
        "model": "control_v11f1e_sd15_tile [a371b31b]",
        "resize_mode": "Just Resize",
        "control_mode": "ControlNet is more important",
    }
    return {"args": [tile_unit]}

    # For reference
    # def run(self, p, _, tile_width, tile_height, mask_blur, padding, seams_fix_width, seams_fix_denoise, seams_fix_padding,
    #         upscaler_index, save_upscaled_image, redraw_mode, save_seams_fix_image, seams_fix_mask_blur,
    #         seams_fix_type, target_size_type, custom_width, custom_height, custom_scale):

def get_ultimateSDUpscale_params(scale_up = 2):
    """Return the positional ``script_args`` list for the Ultimate SD upscale script.

    The order follows the ``run()`` signature quoted in the reference comment
    above this function; only the last entry (``custom_scale``) is configurable.
    """
    return [
        "",        # _
        512,       # tile_width
        0,         # tile_height
        8,         # mask_blur
        32,        # padding
        64,        # seams_fix_width
        0.35,      # seams_fix_denoise
        32,        # seams_fix_padding
        5,         # upscaler_index
        True,      # save_upscaled_image
        0,         # redraw_mode
        False,     # save_seams_fix_image
        8,         # seams_fix_mask_blur
        0,         # seams_fix_type
        2,         # target_size_type
        2048,      # custom_width
        3072,      # custom_height
        scale_up,  # custom_scale
    ]


def create_text2img_payload(
                      prompt,
                      negative_prompt,
                      seed,
                      steps,
                      sampler,
                      width,
                      height,
                      cfg_scale,
                      adetailer_payload = None,
                      ):
    """Assemble the JSON body for a txt2img request.

    ``adetailer_payload`` is attached under ``alwayson_scripts["ADetailer"]``
    only when it is truthy; otherwise ``alwayson_scripts`` is an empty dict.
    """
    scripts = {"ADetailer": adetailer_payload} if adetailer_payload else {}

    return {
        "prompt": prompt,
        "steps": steps,
        "seed": seed,
        "negative_prompt": negative_prompt,
        "sampler_name": sampler,
        "cfg_scale": cfg_scale,
        "width": width,
        "height": height,
        "alwayson_scripts": scripts,
    }

def create_img2img_payload(
                      init_images,
                      prompt,
                      negative_prompt,
                      seed,
                      steps,
                      sampler,
                      width,
                      height,
                      cfg_scale,
                      denoising_strength,
                      adetailer_payload = None,
                      controlnet_payload = None,
                      ultimateSDUpscale_args = None
                      ):
    """Assemble the JSON body for an img2img request.

    Each object in ``init_images`` is saved as PNG into an in-memory buffer and
    base64-encoded. The ADetailer / ControlNet payloads are attached under
    ``alwayson_scripts`` when truthy, and ``ultimateSDUpscale_args`` (when
    truthy) selects the "Ultimate SD upscale" script with those arguments.
    """
    def _to_base64_png(image):
        # Serialise one image to PNG bytes and return them as base64 text.
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    encoded_images = [_to_base64_png(image) for image in init_images]

    scripts = {}
    if adetailer_payload:
        scripts["ADetailer"] = adetailer_payload
    if controlnet_payload:
        scripts["controlnet"] = controlnet_payload

    request_body = {
        "init_images": encoded_images,
        "prompt": prompt,
        "steps": steps,
        "seed": seed,
        "negative_prompt": negative_prompt,
        "sampler_name": sampler,
        "cfg_scale": cfg_scale,
        "width": width,
        "height": height,
        "denoising_strength": denoising_strength,
        "alwayson_scripts": scripts,
    }

    if ultimateSDUpscale_args:
        request_body["script_args"] = ultimateSDUpscale_args
        request_body["script_name"] = "Ultimate SD upscale"

    return request_body


def get_available_loras():
    """Print and return the distinct LoRA names reported by the server.

    The names are de-duplicated through a set, so the order of the returned
    list is not defined.
    """
    lora_entries = requests.get(url=f'{url}/sdapi/v1/loras').json()
    loras = {entry["name"] for entry in lora_entries}
    print(loras)
    return list(loras)

def refresh_loras():
    """POST to the server's ``refresh-loras`` endpoint; the response is ignored."""
    refresh_endpoint = f'{url}/sdapi/v1/refresh-loras'
    requests.post(url=refresh_endpoint)


# Ask the server to refresh its LoRA list, then print the names it reports.
refresh_loras()
get_available_loras()


def create_sample_hr_images(
                            prompt="wearing dark green silk suit posing in parking garage, 8k",
                            add_detail_lora = 0.5,
                            lowra_lora = 0.5,
                            display_image = True,
                            seed = -1,
                            scale_up = 2
                            ):
    """Generate base images with txt2img, then upscale them through img2img.

    The first pass uses the default width/height with ADetailer; the second
    pass feeds those images to img2img at the high-resolution size with
    ADetailer, ControlNet and the Ultimate SD upscale script.

    Returns:
        The list of images returned by the img2img pass.
    """
    # Shared between both passes: full prompt (with LoRA tags) and face detailer.
    full_prompt = create_prompt_with_loras(
        prompt = prompt,
        model_lora = model_checkpoint,
        add_detail_lora = add_detail_lora,
        lowra_lora = lowra_lora,
    )
    adetailer_payload = get_adetailer_params(ad_model = "face_yolov8n.pt", model_lora = model_checkpoint)

    # Pass 1: base-resolution generation.
    base_payload = create_text2img_payload(
        prompt = full_prompt,
        negative_prompt = get_negative_prompt(),
        seed = seed,
        steps = get_default_steps(),
        sampler = get_sampler(),
        width = get_default_width(),
        height = get_default_height(),
        cfg_scale = get_cfg_scale(),
        adetailer_payload = adetailer_payload,
    )
    base_images = text2img(payload = base_payload, display_image = display_image)

    # Pass 2: upscale the base images.
    controlnet_payload = get_controlnet_params()
    upscale_args = get_ultimateSDUpscale_params(scale_up = scale_up)
    upscale_payload = create_img2img_payload(
        init_images = base_images,
        prompt = full_prompt,
        negative_prompt = get_negative_prompt(),
        seed = seed,
        steps = get_default_steps(),
        sampler = get_sampler(),
        width = get_default_width(hr = True),
        height = get_default_height(hr = True),
        cfg_scale = get_cfg_scale(),
        denoising_strength = 0.2,
        adetailer_payload = adetailer_payload,
        controlnet_payload = controlnet_payload,
        ultimateSDUpscale_args = upscale_args,
    )

    return img2img(payload = upscale_payload, display_image = display_image)


In [None]:
# Sample: dark green silk suit in a parking garage (both helper LoRAs at 0.5).
# display_image=False skips the matplotlib preview inside the helpers.
create_sample_hr_images(
                        prompt="wearing dark green silk suit posing in parking garage, 8k",
                        add_detail_lora = 0.5,
                        lowra_lora = 0.5,
                        display_image = False
                        )


In [None]:
# Dark Night Setting Closeup
# NOTE(review): the label above does not match the prompt, which asks for a
# full body shot with detail/realism tags; confirm which one is intended.
create_sample_hr_images(
                        prompt="full body shot, 8k, detailed quality lighting, (hyperrealistic:1.2), absurdres, realistic, hyperdetailed skin, intrinsically detailed face, perfect hair, (hyper detailed), love of details",
                        add_detail_lora = 1,
                        lowra_lora = 1,
                        display_image = False
                        )

In [None]:
# Sample: black swim shorts in front of a luxury villa (both helper LoRAs at 0.5).
create_sample_hr_images(
                        prompt="wearing black swim shorts standing in front of luxury design villa",
                        add_detail_lora = 0.5,
                        lowra_lora = 0.5,
                        display_image = False
                        )

In [None]:
# Sample: smiling in front of a burning house and cars, cinematic styling
# (both helper LoRAs at full weight 1).
create_sample_hr_images(
                        prompt="standing in front of burning house and many burning cars, mischievous smile, teal aesthetic, hero shot, tumblr, realism, sharp focus cinematic lighting, cinestill eastmancolor, scene from church, porcelain, windy mood, moody, green, (((cinematic)))",
                        add_detail_lora = 1,
                        lowra_lora = 1,
                        display_image = False,
                        )

In [None]:
# Diagnostics: refresh and print the server's LoRA names, then show the active
# base checkpoint (the cell's last expression is displayed as its output).
refresh_loras()
get_available_loras()
get_base_model()

In [None]:
# Diagnostics: show the active base checkpoint reported by the server.
get_base_model()

In [None]:
# `upscaled_images` only exists as a local variable inside
# create_sample_hr_images, so on a fresh kernel this cell used to raise
# NameError. Capture the function's return value here before plotting.
upscaled_images = create_sample_hr_images(display_image = False)
plt.imshow(upscaled_images[0])
plt.axis('off') # To turn off axes
plt.show()
