In [None]:
# Mount Google Drive at /content/drive so later cells can read and write
# files under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Root of the Google Drive mounted at /content/drive by the previous cell
base_path = "/content/drive/MyDrive"

# Name of the sub-folder that will receive the generated images
folder_name = "txt2img"

# Full path of that folder inside Drive
folder_path = os.path.join(base_path, folder_name)

# Create the folder (and any missing parents); exist_ok=True makes the cell
# safe to re-run when the folder already exists
os.makedirs(folder_path, exist_ok=True)

print(f"Folder ready at: {folder_path}")


In [None]:
# Install the packages imported by the API cells further down:
# pyngrok (public tunnel), fastapi (HTTP app), uvicorn (server), nest-asyncio.
# NOTE(review): versions are unpinned and --upgrade is used, so the installed
# versions can differ between runs — consider pinning.
!pip install -q --upgrade pyngrok fastapi "uvicorn[standard]" nest-asyncio

# Txt2img

In [None]:
# Clone the `totoro3` branch of camenduru/ComfyUI into /content/TotoroUI and
# make that checkout the working directory for the following cells.
# NOTE(review): `git clone` refuses to clone into an existing non-empty
# directory, so re-running this cell in the same session reports an error.
%cd /content
!git clone -b totoro3 https://github.com/camenduru/ComfyUI /content/TotoroUI
%cd /content/TotoroUI

In [None]:
# Python dependencies (presumably required by the TotoroUI code — only
# xformers is pinned, to 0.0.27.post2).
!pip install -q torchsde einops diffusers accelerate xformers==0.0.27.post2
# aria2 provides the `aria2c` downloader used by the model-download cell below.
!apt -y install -qq aria2

In [None]:
# torchvision is installed separately and unpinned.
# NOTE(review): presumably needed by TotoroUI; confirm the resolved version
# stays compatible with the torch build already present in the runtime.
!pip install -q torchvision

In [None]:
from huggingface_hub import hf_hub_download
import os

# Make sure target folder exists
os.makedirs("/content/TotoroUI/models/unet", exist_ok=True)

# Download the FLUX.1-schnell UNet weights straight into the TotoroUI model
# folder (local_dir), where the UNETLoader cell below looks them up by name.
#
# The token is taken from the HF_TOKEN environment variable instead of being
# written into the notebook. When the variable is unset (or empty) we pass
# None so huggingface_hub falls back to a saved login, if any; an explicit
# string — even an empty one — would be used as the token as-is.
path = hf_hub_download(
    repo_id="black-forest-labs/FLUX.1-schnell",
    filename="flux1-schnell.safetensors",
    local_dir="/content/TotoroUI/models/unet",
    token=os.environ.get("HF_TOKEN") or None,
)

print("Downloaded to:", path)


In [None]:
# Fetch the remaining model files from the camenduru/FLUX.1-dev repo:
#   ae.sft                        -> models/vae  (loaded by VAELoader below)
#   clip_l.safetensors            -> models/clip (loaded by DualCLIPLoader below)
#   t5xxl_fp8_e4m3fn.safetensors  -> models/clip (loaded by DualCLIPLoader below)
# aria2c flags: -c resume a partial download, -x 16 / -s 16 use up to 16
# connections / splits, -k 1M minimum split size, -d target dir, -o file name.
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/camenduru/FLUX.1-dev/resolve/main/ae.sft -d /content/TotoroUI/models/vae -o ae.sft
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/camenduru/FLUX.1-dev/resolve/main/clip_l.safetensors -d /content/TotoroUI/models/clip -o clip_l.safetensors
!aria2c --console-log-level=error -c -x 16 -s 16 -k 1M https://huggingface.co/camenduru/FLUX.1-dev/resolve/main/t5xxl_fp8_e4m3fn.safetensors -d /content/TotoroUI/models/clip -o t5xxl_fp8_e4m3fn.safetensors

In [None]:
import random
import torch
import numpy as np
import PIL
import os

# Change directory to access the nodes and totoro_extras modules
# (they are imported by bare module name below)
%cd /content/TotoroUI

# Import necessary modules from the TotoroUI directory
import nodes
from nodes import NODE_CLASS_MAPPINGS
from totoro_extras import nodes_custom_sampler
from totoro import model_management

# Instantiate one object per node class. Note that each variable reuses the
# node's class name but holds an *instance* (see the trailing "()"); the
# generate_image function calls methods on these instances.
DualCLIPLoader = NODE_CLASS_MAPPINGS["DualCLIPLoader"]()
UNETLoader = NODE_CLASS_MAPPINGS["UNETLoader"]()
RandomNoise = nodes_custom_sampler.NODE_CLASS_MAPPINGS["RandomNoise"]()
BasicGuider = nodes_custom_sampler.NODE_CLASS_MAPPINGS["BasicGuider"]()
KSamplerSelect = nodes_custom_sampler.NODE_CLASS_MAPPINGS["KSamplerSelect"]()
BasicScheduler = nodes_custom_sampler.NODE_CLASS_MAPPINGS["BasicScheduler"]()
SamplerCustomAdvanced = nodes_custom_sampler.NODE_CLASS_MAPPINGS["SamplerCustomAdvanced"]()
VAELoader = NODE_CLASS_MAPPINGS["VAELoader"]()
VAEDecode = NODE_CLASS_MAPPINGS["VAEDecode"]()
EmptyLatentImage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()

# Load the model components once, at cell level, so generate_image can reuse
# them on every call. The file names match the files downloaded by the cells
# above; only the first element ([0]) of each loader's result is kept.
# NOTE(review): "fp8_e4m3fn" is presumably the weight dtype for the UNet and
# "flux" the CLIP type — confirm against the loader signatures.
with torch.inference_mode():
    clip = DualCLIPLoader.load_clip("t5xxl_fp8_e4m3fn.safetensors", "clip_l.safetensors", "flux")[0]
    unet = UNETLoader.load_unet("flux1-schnell.safetensors", "fp8_e4m3fn")[0]
    vae = VAELoader.load_vae("ae.sft")[0]

def closestNumber(n, m):
    """Return the multiple of ``m`` that lies nearest to ``n``.

    Two candidates are compared: the multiple obtained by truncating
    ``n / m`` toward zero, and its neighbour one step further on (one step up
    when ``n * m`` is positive, one step down otherwise). When both are
    equally far from ``n`` the second candidate is returned.

    Raises:
        ZeroDivisionError: if ``m`` is 0.
    """
    quotient = int(n / m)
    step = 1 if (n * m) > 0 else -1

    truncated = m * quotient
    neighbour = m * (quotient + step)

    # Strict "<" means a tie goes to the neighbour.
    return truncated if abs(n - truncated) < abs(n - neighbour) else neighbour

In [None]:
def generate_image(prompt, img_name = "sample", seed = 0, width = 1024, height = 1024,
                   steps = 4, sampler_name = "euler", scheduler = "simple",
                   output_dir = "/content/drive/MyDrive/txt2img"):
    """Generate one image from a text prompt and save it as a PNG.

    Uses the module-level objects created in the setup cell (``clip``,
    ``unet``, ``vae`` and the node instances such as ``RandomNoise`` and
    ``SamplerCustomAdvanced``), so that cell must have been run first.

    Args:
        prompt: Text prompt to encode and sample from.
        img_name: File name (without extension) of the saved image.
        seed: Noise seed. ``0`` (the default) means "pick a random seed in
            [0, 2**64 - 1]"; any other value is used as given.
        width: Requested width in pixels; snapped to a multiple of 16 with
            ``closestNumber``.
        height: Requested height in pixels; snapped the same way.
        steps: Number of scheduler steps.
        sampler_name: Sampler name passed to ``KSamplerSelect.get_sampler``.
        scheduler: Scheduler name passed to ``BasicScheduler.get_sigmas``.
        output_dir: Directory the PNG is written to; created if missing.

    Returns:
        The path of the saved PNG file, ``<output_dir>/<img_name>.png``.
    """
    # `import PIL` alone does not load the PIL.Image submodule; import it
    # explicitly instead of relying on another module having done so.
    from PIL import Image as PILImage

    if seed == 0:
        seed = random.randint(0, 18446744073709551615)

    with torch.inference_mode():
        cond, pooled = clip.encode_from_tokens(clip.tokenize(prompt), return_pooled=True)
        cond = [[cond, {"pooled_output": pooled}]]
        noise = RandomNoise.get_noise(seed)[0]
        guider = BasicGuider.get_guider(unet, cond)[0]
        sampler = KSamplerSelect.get_sampler(sampler_name)[0]
        # NOTE(review): the trailing 1.0 is presumably the denoise strength — confirm.
        sigmas = BasicScheduler.get_sigmas(unet, scheduler, steps, 1.0)[0]
        latent_image = EmptyLatentImage.generate(closestNumber(width, 16), closestNumber(height, 16))[0]
        # Only the first of the two returned latents is decoded.
        sample, _ = SamplerCustomAdvanced.sample(noise, guider, sampler, sigmas, latent_image)
        # Called between sampling and decoding; by its name it releases cached memory.
        model_management.soft_empty_cache()
        decoded = VAEDecode.decode(vae, sample)[0].detach()

        # Scale to 0..255 and clip before the uint8 cast so any value outside
        # the expected range cannot wrap around; [0] takes the first image.
        pixels = np.clip(np.array(decoded * 255), 0, 255).astype(np.uint8)[0]

    os.makedirs(output_dir, exist_ok=True)
    out_path = os.path.join(output_dir, f"{img_name}.png")
    PILImage.fromarray(pixels).save(out_path)
    return out_path

In [None]:
from IPython.display import Image, display

# Smoke test: render one image with a fixed prompt, then show the PNG that
# generate_image wrote to the Drive output folder.
generate_image(prompt="A cat holding a sign that says hello world", img_name="sample")
display(Image(filename="/content/drive/MyDrive/txt2img/sample.png"))

In [None]:
from fastapi import FastAPI, Request, BackgroundTasks

# FastAPI application instance: routes are registered on it with @app.post
# below, and uvicorn serves it in a later cell.
app = FastAPI()

def generate_image_task(prompt: str, img_name: str):
    """Background-task wrapper: forward both arguments to generate_image by keyword."""
    generate_image(prompt=prompt, img_name=img_name)

@app.post("/txt2img")
async def txt2img(request: Request, background_tasks: BackgroundTasks):
    """Queue an image generation job and return immediately.

    Expects a JSON object with two string fields:
        prompt:   text prompt for the image.
        img_name: plain file name (no directory parts) for the output image.

    Returns:
        ``{"status": "ok", "msg": "Image generation started ..."}`` once the
        job has been scheduled; the image itself is produced in the background.

    Raises:
        HTTPException(400): if the body is not a JSON object, a field is
            missing or not a string, or ``img_name`` is not a plain file name.
    """
    # Local import keeps this handler self-contained with respect to the
    # cell's existing `from fastapi import ...` line.
    from fastapi import HTTPException

    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from None
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    prompt = data.get("prompt")
    img_name = data.get("img_name")

    if not isinstance(prompt, str):
        raise HTTPException(status_code=400, detail="'prompt' is required and must be a string")

    # img_name ends up inside a file path, and this endpoint is reachable
    # from the public tunnel: accept only a bare file name so a request
    # cannot write outside the output folder (e.g. "../../x").
    name_is_plain = (
        isinstance(img_name, str)
        and img_name not in ("", ".", "..")
        and os.path.basename(img_name) == img_name
        and "\\" not in img_name
        and "\x00" not in img_name
    )
    if not name_is_plain:
        raise HTTPException(status_code=400, detail="'img_name' is required and must be a plain file name")

    # Schedule the heavy function in the background
    background_tasks.add_task(generate_image_task, prompt, img_name)

    # Immediately return response
    return {"status": "ok", "msg": "Image generation started ..."}


In [None]:
from getpass import getpass

# Prompt for the ngrok auth token without echoing it, and keep it only in the
# process environment rather than in the notebook source.
# Relies on `os` having been imported by an earlier cell.
# NOTE(review): NGROK_AUTHTOKEN is presumably picked up by ngrok when the
# tunnel is opened in the next cell — confirm.
os.environ['NGROK_AUTHTOKEN'] = getpass()

In [None]:
import uvicorn
import nest_asyncio
from pyngrok import ngrok
import threading

# Patch asyncio so event loops can be nested (presumably needed because the
# notebook kernel already runs its own loop — confirm it is still required
# now that uvicorn runs in a separate thread).
nest_asyncio.apply()
# Local port shared by the ngrok tunnel and the uvicorn server below.
port = 8002
# Open a tunnel to the local port and print its public URL.
# NOTE(review): the /txt2img handler performs no authentication, so anyone
# who knows this URL can trigger image generation.
public_url = ngrok.connect(port).public_url
print("Public URL:", public_url)

def run():
    """Serve the FastAPI `app` with uvicorn on all interfaces at `port` (blocks)."""
    uvicorn.run(app, host="0.0.0.0", port=port)

# Run the blocking server in a daemon thread so the cell returns and the
# thread does not keep the interpreter alive on shutdown.
threading.Thread(target=run, daemon=True).start()

In [None]:
import time

# Keep-alive loop: never returns; prints "live" every 30 seconds
# (presumably to keep the runtime busy while the server thread handles
# requests). Interrupt the cell to stop it.
while True:
    time.sleep(30)
    print("live")