In [None]:
#@title Control Board {display-mode: "form"}
"""Interactive launchpad with tabs for the main notebook workflows."""
import json
from pathlib import Path

import ipywidgets as widgets
import pandas as pd
from IPython.display import display

from src.config import PlaygroundConfig
from src.download import DownloadManager
from src.env import get_env_summary
from src.notebook_ui import control_board, toast
from src.queue import is_paused, list_items, purge_completed, retry_item, set_paused

cfg = PlaygroundConfig.load()
cfg.ensure_directories()

# Environment tab content
env_summary = get_env_summary(cfg)

# Downloads tab widgets
manifest_dropdown = widgets.Dropdown(description="Manifest")
reload_manifests = widgets.Button(description="Reload", button_style="info")
queue_button = widgets.Button(description="Queue downloads", button_style="success")
manifest_output = widgets.Output(layout=widgets.Layout(max_height="220px", overflow_y="auto"))
progress_label = widgets.HTML()

def _manifest_options():
    files = sorted(cfg.manifests_dir.glob("*.json"))
    if not files:
        return [("No manifests", "")]
    return [(f.name, str(f)) for f in files]

def _refresh_manifests(_=None):
    manifest_output.clear_output()
    progress_label.value = ""
    options = _manifest_options()
    manifest_dropdown.options = options
    manifest_dropdown.value = options[0][1] if options and options[0][1] else ""

reload_manifests.on_click(_refresh_manifests)
_refresh_manifests()

def _preview_manifest(change):
    path = change["new"]
    manifest_output.clear_output()
    if not path:
        with manifest_output:
            display(widgets.HTML("<b>No manifest selected.</b>"))
        return
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    rows = data.get("items", [])
    with manifest_output:
        if not rows:
            display(widgets.HTML("<b>Manifest is empty.</b>"))
        else:
            df = pd.DataFrame(rows)
            display(df.head(10))
            display(widgets.HTML(f"<small>Total items: {len(rows)}</small>"))

manifest_dropdown.observe(_preview_manifest, names="value")

def _queue_downloads(_):
    path = manifest_dropdown.value
    if not path:
        toast("Select a manifest first", level="warning")
        return
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    items = data.get("items", [])
    if not items:
        toast("Manifest has no items", level="warning")
        return
    manager = DownloadManager()
    def _progress(state):
        downloaded = state.get("downloaded_bytes", 0)
        total = state.get("total_bytes", -1)
        if total and total > 0:
            pct = downloaded / total * 100
            progress_label.value = f"{pct:.1f}% ({downloaded} / {total} bytes)"
        else:
            progress_label.value = f"{downloaded} bytes downloaded"
    manager.progress_callback = _progress
    for item in items:
        manager.add_item(item)
    manager.run()
    toast(f"Downloaded {len(items)} manifest items", level="success")
    progress_label.value += " — done"

queue_button.on_click(_queue_downloads)

downloads_panel = widgets.VBox([
    manifest_dropdown,
    widgets.HBox([reload_manifests, queue_button]),
    manifest_output,
    progress_label,
])

# Flow Studio tab widgets
queue_stats = widgets.HTML()
queue_output = widgets.Output(layout=widgets.Layout(max_height="220px", overflow_y="auto"))
refresh_queue = widgets.Button(description="Refresh queue", button_style="info")
retry_failed = widgets.Button(description="Retry failed", button_style="warning")
purge_done = widgets.Button(description="Purge done", button_style="danger")
pause_toggle = widgets.ToggleButton(value=is_paused(), description="Pause queue")

def _refresh_queue(_=None):
    rows = list_items()
    queue_stats.value = f"<b>Queue:</b> {len(rows)} items | paused: {is_paused()}"
    queue_output.clear_output()
    if not rows:
        with queue_output:
            display(widgets.HTML("<i>No items in queue.</i>"))
    else:
        df = pd.DataFrame(rows)
        with queue_output:
            display(df.head(15))

refresh_queue.on_click(_refresh_queue)

def _retry(_):
    for row in list_items(status="failed"):
        retry_item(row["id"])
    toast("Retried failed queue items", level="success")
    _refresh_queue()

def _purge(_):
    purge_completed()
    toast("Purged completed queue items", level="warning")
    _refresh_queue()

def _pause(change):
    set_paused(change["new"])
    toast(f"Queue paused: {change['new']}", level="info")
    _refresh_queue()

retry_failed.on_click(_retry)
purge_done.on_click(_purge)
pause_toggle.observe(_pause, names="value")
_refresh_queue()

flow_panel = widgets.VBox([
    queue_stats,
    widgets.HBox([refresh_queue, retry_failed, purge_done, pause_toggle]),
    queue_output,
])

voice_panel = widgets.VBox([widgets.HTML("<b>Voice benchmarks</b>: run the Audio Pipeline Benchmark Suite cells below to populate results.")])

utilities_panel = widgets.VBox([widgets.HTML("Use Maintenance Utilities section for pruning, diagnostics, and backups.")])

board = control_board(env_summary, downloads_panel, flow_panel, voice_panel, utilities_panel)
display(board)
toast("Control board ready. Expand accordion sections below in order.", level="info")

## Table of Contents
1. [Control Board](#control-board)
2. [Setup & Environment](#setup-environment)
3. [Downloads & Manifests](#downloads-manifests)
4. [Flow Studio](#flow-studio)
5. [Voice Lab](#voice-lab)
6. [Maintenance Utilities](#maintenance-utilities)
7. [Advanced Playbooks](#advanced-playbooks)

### <a id="control-board"></a>Control Board

# ComfyUI Playground Refresh
This notebook orchestrates installs, diagnostics, queue control, and creative workflows for ComfyUI when running inside Google Colab (Drive-backed).
- **Start Here:** Review the Control Board in Cell 2 for live status and shortcuts.
- **Playbook:** Each accordion section ends with a "Run next" checklist so you always know the next action.

# README — ComfyUI Colab Playground

Purpose
- Quick Colab playground to install and run ComfyUI from Google Drive, expose it via Cloudflare (cloudflared), and run demo flows and job scaffolds for T2I, T2V, I2V, and LORA workflows.

Key env vars
- `CIVITAI_API_TOKEN` — used for Civitai API downloads (keep secret).
- `HUGGINGFACE_TOKEN` — optional, for Hugging Face access if needed.
- `COMFYUI_API_KEY` — optional bearer token used by demo cells when calling the ComfyUI API via the public tunnel. If set, demo cells will send Authorization: Bearer <token> and programmatic demos will auto-enable API mode.

Endpoint & health guidance (important)
- Before running any programmatic demo cell, run the "Endpoint & Health-check helper" cell (below Options) and then run the "ComfyUI health-check" (Cell 1) to confirm which endpoint will be used.
- Demos prefer `localhost:8188` if ComfyUI is reachable locally; otherwise they will use the Cloudflare public URL parsed into `COMFYUI_PUBLIC_URL`.
- If you expose the GUI via Cloudflare, secure it by setting `COMFYUI_API_KEY` (the notebook will include the Authorization header automatically when present).

How to use
1. Mount Drive (run the Drive cell).
2. Set options: enable `INSTALL_COMFYUI` and `START_CLOUDFLARED` as needed in the Options cell.
3. Run the install cell (creates venv in Drive), then run the venv torch-check cell that prints torch versions from the venv.
4. Start ComfyUI using the venv python (the install cell prints the command). Then run the Tunnel cell to obtain the public URL if needed.
5. Run the Endpoint helper and the health-check cell to confirm connectivity. Only then run programmatic demo cells.

Notes
- Colab sessions are ephemeral; running persistent runners is best done on a VM (instructions included). Save models/flows to Drive to persist them.
- Model downloads that require login are intentionally commented/guarded to avoid accidental requests.
- The notebook caches wheels in `DRIVE_ROOT/ComfyUI/wheel_cache` to reduce repeated downloads between sessions.


In [None]:
#@title Workflow Playbook {display-mode: "form"}
"""Accordion summarising the key notebook phases with next steps."""
import ipywidgets as widgets

from src.notebook_ui import accordion_from_sections, checklist

setup_steps = checklist([
    "Run the Drive Mount cell",
    "Configure toggles in Options",
    "Execute Install / Update ComfyUI",
])

storage_details = widgets.HTML("<b>Storage:</b> Drive root is managed under config.yaml; see Maintenance section for pruning utilities.")
setup_panel = widgets.VBox([setup_steps, storage_details])

downloads_steps = checklist([
    "Review manifests under Drive manifests/",
    "Use Download Manager cell to queue assets",
    "Monitor progress via Control Board → Downloads tab",
])
downloads_panel = widgets.VBox([downloads_steps])

flow_steps = checklist([
    "Select template in Flow Studio UI",
    "Compose flow JSON via src.flows",
    "Submit job to queue",
])
flow_panel = widgets.VBox([flow_steps])

voice_steps = checklist([
    "Configure STT/TTS providers in Voice Lab",
    "Run benchmark cell to capture latency",
    "Link outputs into Flow Studio",
])
voice_panel = widgets.VBox([voice_steps])

maintenance_steps = checklist([
    "Rotate manifests and configs",
    "Run diagnostics bundle exporter",
    "Prune stale artifacts via utilities",
])
maintenance_panel = widgets.VBox([maintenance_steps])

advanced_steps = checklist([
    "Launch LoRA / Kohya training",
    "Trigger Wan2.1 T2V/I2V pipelines",
    "Persist session profile JSON",
])
advanced_panel = widgets.VBox([advanced_steps])

sections = {
    "Setup & Environment": setup_panel,
    "Downloads & Manifests": downloads_panel,
    "Flow Studio": flow_panel,
    "Voice Lab": voice_panel,
    "Maintenance Utilities": maintenance_panel,
    "Advanced Playbooks": advanced_panel,
}

accordion = accordion_from_sections(sections)
accordion.selected_index = 0
display(accordion)

# ComfyUI Playground — Google Colab (final)

Extended notebook: Drive-backed ComfyUI install, tunnel support, T2I/I2I/T2V/I2V scaffolds, multi-LORA workflows, and Kohya-SS training wiring.

Set these env vars before running model downloads: `CIVITAI_API_TOKEN` (preferred), `HUGGINGFACE_TOKEN` (optional).

### <a id="setup-environment"></a>Setup & Environment

In [5]:
#@title Configuration (edit if needed)
DRIVE_ROOT = '/content/drive/MyDrive/ComfyUI'  #@param {type: 'string'}
MODELS_DIR = DRIVE_ROOT + '/models'
LORAS_DIR = MODELS_DIR + '/loras'
GGUF_DIR = MODELS_DIR + '/gguf'
ARTIFACTS_DIR = DRIVE_ROOT + '/artifacts'
CONFIG_DIR = DRIVE_ROOT + '/config'
import os
CIVITAI_API_TOKEN = os.environ.get('CIVITAI_API_TOKEN', '')
HUGGINGFACE_TOKEN = os.environ.get('HUGGINGFACE_TOKEN', '')
print('Drive root:', DRIVE_ROOT)
print('CIVITAI token set?', bool(CIVITAI_API_TOKEN))

Drive root: /content/drive/MyDrive/ComfyUI
CIVITAI token set? True


In [6]:
#@title Mount Google Drive (run once per session)
from pathlib import Path
from google.colab import drive
print('Mounting Drive...')
drive.mount('/content/drive')
for p in [DRIVE_ROOT, MODELS_DIR, LORAS_DIR, GGUF_DIR, ARTIFACTS_DIR, CONFIG_DIR]:
  Path(p).mkdir(parents=True, exist_ok=True)
sentinel = Path(DRIVE_ROOT) / '.copilot_sentinel'
if not sentinel.exists():
  sentinel.write_text('created by comfyui_playground')
print('Drive layout ensured')

ModuleNotFoundError: No module named 'google'

## Options — toggles (set before running heavy cells)

Edit the booleans below to control installs, updates and which demo mode the notebook uses (programmatic vs manual).

In [None]:
#@title Options\nINSTALL_COMFYUI = True  #@param {type:'boolean'}\nUPDATE_COMFYUI = False  #@param {type:'boolean'}\nSTART_CLOUDFLARED = True  #@param {type:'boolean'}\nSTART_LOCALTUNNEL = False  #@param {type:'boolean'}\nUSE_COMFYUI_API = False  #@param {type:'boolean'}\nRUN_DEMOS = True  #@param {type:'boolean'}

In [None]:
#@title Endpoint & Health-check helper
# Defines CURRENT_INSTANCE, COMFYUI_API_BASE, COMFYUI_API_KEY, HEADERS and health_check()
import os, socket, requests

LOCAL_HOST = 'http://127.0.0.1:8188'
CF_URL = os.environ.get('COMFYUI_PUBLIC_URL', '')

def localhost_reachable(timeout=0.3):
    try:
        host, port = '127.0.0.1', 8188
        with socket.create_connection((host, port), timeout):
            return True
    except Exception:
        return False

COMFYUI_IS_LOCAL = localhost_reachable()
if COMFYUI_IS_LOCAL:
    CURRENT_INSTANCE = LOCAL_HOST
else:
    CURRENT_INSTANCE = CF_URL or LOCAL_HOST

COMFYUI_API_BASE = CURRENT_INSTANCE.rstrip('/')
COMFYUI_API_KEY = os.environ.get('COMFYUI_API_KEY', '')

# Prepare headers if an API key exists (Authorization: Bearer <key>)
HEADERS = {}
if COMFYUI_API_KEY:
    HEADERS['Authorization'] = f'Bearer {COMFYUI_API_KEY}'

# Simple health check helper
def health_check(timeout=1.0):
    try:
        r = requests.get(COMFYUI_API_BASE + '/', timeout=timeout)
        return r.status_code, r.text[:200]
    except Exception as e:
        return None, str(e)

print('Endpoint helper initialized — CURRENT_INSTANCE =', CURRENT_INSTANCE)
print('COMFYUI_IS_LOCAL =', COMFYUI_IS_LOCAL)
print('COMFYUI_API_KEY set?', bool(COMFYUI_API_KEY))

In [None]:
#@title ComfyUI health-check (run after endpoint helper)
import os
from IPython.display import Markdown, display

if 'COMFYUI_API_BASE' not in globals():
    print('Endpoint helper not initialized — run the previous cell first')
else:
    status, info = globals().get('health_check', lambda *a, **k: (None, 'no health_check'))()
    if status is None:
        display(Markdown(f"**Health check failed** — error: `{info}`"))
    else:
        display(Markdown(f"**ComfyUI responded with status `{status}`** — preview: `{info}`"))
    print('CURRENT_INSTANCE =', globals().get('COMFYUI_API_BASE'))
    print('COMFYUI_IS_LOCAL =', globals().get('COMFYUI_IS_LOCAL'))

In [None]:
# Maintenance Utilities (invoke via control board)
from src.maintenance import prune_artifacts, rotate_manifest_backups, validate_env_vars
from src.config import PlaygroundConfig

cfg = PlaygroundConfig.load()

print('Pruned artifacts (24h):', prune_artifacts(config=cfg))
print('Rotated manifests:', rotate_manifest_backups(config=cfg))
print('Env vars ready:', validate_env_vars(['CIVITAI_API_TOKEN', 'COMFYUI_API_KEY'], config=cfg))

## Install / Update ComfyUI (idempotent)

Clones ComfyUI into Drive and creates a venv under the ComfyUI folder.

In [None]:
#@title Install or update ComfyUI (venv in Drive, cached wheels)
"""
This cell creates a persistent venv under DRIVE_ROOT/ComfyUI/venv, caches downloaded wheels
into DRIVE_ROOT/ComfyUI/wheel_cache to avoid repeated downloads, installs core deps into
that venv (torch wheel selection with fallbacks), optionally installs xformers non-fatally,
clones ComfyUI and ComfyUI-Manager into Drive, and runs the Manager colab-deps script
if present.

Run this cell once (or set UPDATE_COMFYUI=True to pull latest and re-run installs).
"""
import os, subprocess, sys, shutil
from pathlib import Path

# Ensure DRIVE_ROOT is available (set in the notebook's config cell)
DRIVE_ROOT = os.environ.get('DRIVE_ROOT', '/content/drive/MyDrive/ComfyUI')
COMFY_DIR = Path(DRIVE_ROOT)/'ComfyUI'
VENV_DIR = COMFY_DIR/'venv'
PY = VENV_DIR/'bin'/'python'
PIP = VENV_DIR/'bin'/'pip'
WHEEL_CACHE = COMFY_DIR/'wheel_cache'

# Options from the options cell - fall back to defaults if not set
INSTALL_COMFYUI = globals().get('INSTALL_COMFYUI', True)
UPDATE_COMFYUI = globals().get('UPDATE_COMFYUI', False)

print('Drive root:', DRIVE_ROOT)
print('Install requested?', INSTALL_COMFYUI, 'Update requested?', UPDATE_COMFYUI)

if not INSTALL_COMFYUI:
  print('Skipping install per options')
else:
  # Create directories
  Path(DRIVE_ROOT).mkdir(parents=True, exist_ok=True)
  WHEEL_CACHE.mkdir(parents=True, exist_ok=True)
  COMFY_DIR.mkdir(parents=True, exist_ok=True)

  # Clone or update ComfyUI
  if not (COMFY_DIR/'main.py').exists():
    print('Cloning ComfyUI into Drive...')
    subprocess.run(['git', 'clone', 'https://github.com/ComfyUI/ComfyUI.git', str(COMFY_DIR)], check=True)
  elif UPDATE_COMFYUI:
    print('Updating ComfyUI...')
    subprocess.run(['git', '-C', str(COMFY_DIR), 'pull'], check=True)
  else:
    print('ComfyUI present; skipping clone/pull')

  # Create venv if missing
  if not VENV_DIR.exists():
    print('Creating venv at', VENV_DIR)
    import venv
    venv.create(VENV_DIR, with_pip=True)
  else:
    print('Using existing venv at', VENV_DIR)

  # Helper to run commands using the venv python/pip
  def venv_run(cmd, **kwargs):
    print('>', ' '.join(cmd))
    return subprocess.run(cmd, **kwargs)

  # Ensure pip/setuptools/wheel are up to date in venv
  venv_run([str(PIP), 'install', '--upgrade', 'pip', 'setuptools', 'wheel'], check=True)

  # Install core Python packages (pure python / small wheels)
  core_pkgs = [
    'accelerate', 'einops', 'safetensors>=0.4.2', 'aiohttp', 'pyyaml', 'Pillow', 'scipy',
    'tqdm', 'psutil', 'tokenizers>=0.13.3', 'kornia>=0.7.1', 'soundfile', 'sentencepiece', 'spandrel',
    'transformers>=4.28.1', 'opencv-python'
  ]
  try:
    venv_run([str(PIP), 'install', '--upgrade'] + core_pkgs + ['--cache-dir', str(WHEEL_CACHE)], check=True)
  except subprocess.CalledProcessError as e:
    print('Core package install failed:', e)

  # Torch wheel strategy: try common CUDA wheels then CPU fallback
  torch_candidates = [
    ('cu121', 'https://download.pytorch.org/whl/cu121'),
    ('cu122', 'https://download.pytorch.org/whl/cu122'),
    ('cpu',   'https://download.pytorch.org/whl/cpu')
  ]
  installed_torch = False
  for tag, index_url in torch_candidates:
    try:
      print(f'Trying torch wheel for {tag} (index: {index_url})')
      venv_run([str(PIP), 'install', 'torch', 'torchvision', 'torchaudio', '--index-url', index_url, '--cache-dir', str(WHEEL_CACHE)], check=True)
      installed_torch = True
      print('Installed torch for', tag)
      break
    except subprocess.CalledProcessError:
      print('Torch wheel install failed for', tag)

  if not installed_torch:
    print('Warning: torch installation failed for all candidates — verify CUDA/runtime and consider manual wheel selection')

  # Optional installs that may fail depending on CUDA/toolchain — non-fatal
  optional_pkgs = ['torchsde']
  for pkg in optional_pkgs:
    try:
      venv_run([str(PIP), 'install', pkg, '--cache-dir', str(WHEEL_CACHE)], check=True)
    except subprocess.CalledProcessError:
      print(pkg, 'install failed (non-fatal)')

  # Attempt xformers but continue on failure
  try:
    print('Attempting xformers (optional). This may fail for some CUDA/pytorch combos.')
    venv_run([str(PIP), 'install', 'xformers!=0.0.18', '--cache-dir', str(WHEEL_CACHE)], check=True)
    print('xformers installed')
  except subprocess.CalledProcessError as e:
    print('xformers install failed (non-fatal):', e)

  # Clone/Update ComfyUI-Manager in custom_nodes if requested (guarded)
  CN_DIR = COMFY_DIR/'custom_nodes'/'ComfyUI-Manager'
  if not CN_DIR.exists():
    (COMFY_DIR/'custom_nodes').mkdir(parents=True, exist_ok=True)
    print('Cloning ComfyUI-Manager...')
    venv_run(['git', 'clone', 'https://github.com/ltdrdata/ComfyUI-Manager', str(CN_DIR)], check=True)
  else:
    print('ComfyUI-Manager present; pulling latest')
    venv_run(['git', '-C', str(CN_DIR), 'pull'], check=True)

  # Run Manager colab-dependencies script if present — use venv python
  colab_dep = CN_DIR/'scripts'/'colab-dependencies.py'
  if colab_dep.exists():
    print('Running ComfyUI-Manager colab-dependencies.py with venv python (guarded)')
    try:
      venv_run([str(PY), str(colab_dep)], check=True)
    except subprocess.CalledProcessError as e:
      print('Manager colab-deps failed (non-fatal):', e)
  else:
    print('No ComfyUI-Manager colab-dependencies script found')

  print('\nInstall complete. To start ComfyUI (headless) with the venv run:')
  print(f'{PY} {COMFY_DIR}/main.py --headless --port 8188')

  # After install: print venv torch versions for quick verification
  try:
    print('\nVerifying torch inside venv:')
    venv_run([str(PY), '-c', 'import torch; print(torch.__version__, torch.version.cuda)'])
  except Exception as e:
    print('Failed to inspect torch in venv (non-fatal):', e)

# End install cell


## Model manager — prefilled Civitai downloads (comment/uncomment to use)

Each block includes a title comment and any special notes (NSFW, login required). Use `CIVITAI_API_TOKEN` environment variable for API downloads. If a model requires web login or is private, you'll need to download via the browser or the Civitai web UI.

### <a id="downloads-manifests"></a>Downloads & Manifests

In [None]:
#@title Model download examples (prefilled)
# NOTE: Set CIVITAI_API_TOKEN in env before uncommenting wget lines.
# Example: import os; os.environ['CIVITAI_API_TOKEN']='your_token'

# 1) Model ID 1624818 — SafeTensor API download (may require login)
# wget pattern (SafeTensor fp16):
# !wget "https://civitai.com/api/download/models/1624818?type=Model&format=SafeTensor&size=full&fp=fp16&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{LORAS_DIR}"

# 2) CyberRealistic Pony (ID 443821) — checkpoint (SafeTensor). See recommended settings in model page.
# !wget "https://civitai.com/api/download/models/443821?type=Model&format=SafeTensor&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{MODELS_DIR}/cyberrealistic"

# 3) Doggy-Style (ID 1741501) — NSFW, login required. Use web UI if API returns login required.
# (if available via API):
# !wget "https://civitai.com/api/download/models/1741501?type=Model&format=SafeTensor&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{MODELS_DIR}/wan"

# 4) WAN POV Cowgirl insertion (ID 2048863) — NSFW
# !wget "https://civitai.com/api/download/models/2048863?type=Model&format=GGUF&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{GGUF_DIR}"

# 5) QWEN-4 play LoRA (ID 2004155) — LoRA (may be LORA format). Check model page for exact file names and formats.
# !wget "https://civitai.com/api/download/models/2004155?type=Model&format=SafeTensor&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{LORAS_DIR}"

# 6) WAN-22 I2V (ID 2031069) — NSFW video-capable model; may have GGUF or special instructions.
# !wget "https://civitai.com/api/download/models/2031069?type=Model&format=GGUF&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{GGUF_DIR}"

# 7) PENIS LoRA / TAZ (ID 1476909) — NSFW LoRA. Login required for NSFW models on some pages.
# !wget "https://civitai.com/api/download/models/1476909?type=Model&format=SafeTensor&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{LORAS_DIR}"

# 8) POV Insertion WAN 2x (ID 1855263) — NSFW, video/wan style.
# !wget "https://civitai.com/api/download/models/1855263?type=Model&format=GGUF&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{GGUF_DIR}"

# 9) Oral Insertion WAN (ID 1874153) — NSFW
# !wget "https://civitai.com/api/download/models/1874153?type=Model&format=GGUF&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{GGUF_DIR}"

# 10) WAN-2221 POV Missionary (ID 1331682) — NSFW
# !wget "https://civitai.com/api/download/models/1331682?type=Model&format=GGUF&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{GGUF_DIR}"

# 11) CyberRealistic (ID 15003) — photorealistic SD base; recommended settings in page.
# !wget "https://civitai.com/api/download/models/15003?type=Model&format=SafeTensor&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{MODELS_DIR}/cyberrealistic"

# 12) ChilloutMix (ID 6424) — popular blended model (may require SafeTensor format).
# !wget "https://civitai.com/api/download/models/6424?type=Model&format=SafeTensor&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{MODELS_DIR}/chilloutmix"

# 13) WAN-22 Experimental (ID 1307155) — your provided WAN model link (GGUF/SafeTensor examples).
# GGUF example (preferred for Wan2 runners):
# !wget "https://civitai.com/api/download/models/1307155?type=Model&format=GGUF&size=full&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{GGUF_DIR}"
# Safetensor (alternate):
# !wget "https://civitai.com/api/download/models/1307155?type=Model&format=SafeTensor&size=full&fp=fp16&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{MODELS_DIR}/wan"

# --- New additions requested by user ---
# Add model 1064836 (user link: https://civitai.com/models/1064836?modelVersionId=993999)
# NOTE: Many NSFW models require login; keep this line commented until you set CIVITAI_API_TOKEN or download manually.
# !wget "https://civitai.com/api/download/models/1064836?modelVersionId=993999&format=SafeTensor&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{MODELS_DIR}/model_1064836"

# Add model 652699 (user link: https://civitai.com/models/652699?modelVersionId=993999)
# This appears to be a LoRA (Amateur Photography) — save into LORAS_DIR
# !wget "https://civitai.com/api/download/models/652699?modelVersionId=993999&format=SafeTensor&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{LORAS_DIR}"

print('Prefilled Civitai download blocks added as comments. Uncomment the lines you want to execute and ensure CIVITAI_API_TOKEN is set if required.')

In [None]:
#@title Per-model metadata: Civitai model 652699 — 'Amateur Photography' LoRA
# This model is a LoRA with recommended settings pulled from the model page where available.
MODEL_652699 = {
  'model_id': 652699,
  'modelVersionId': 993999,
  'type': 'LoRA',
  'gated': False,
  'download_template': f'!wget "https://civitai.com/api/download/models/652699?modelVersionId=993999&format=SafeTensor&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{{LORAS_DIR}}"',
  'recommended_settings': {
    'distilled_cfg_scale': 3.5,
    'sampler': 'DEIS with DDIM',
    'steps': 20,
    'resolution': '896x1152 or 880x1184 / 1328x1776',
    'hires_fix_upscaler': '4x_NMKD-Superscale-SP_178000_G',
    'hires_steps': 10,
    'hires_denoise': 0.3,
    'lora_weight': 0.8,
    'usage_tip': 'Try EasyCache (skip 2 steps) for faster results with little quality loss.'
  },
  'notes': 'Typical use: apply this LoRA at ~0.8 weight during sampling / hi-res steps. Adjust per prompt.'
}
print('Model metadata for 652699 registered. Inspect MODEL_652699 for recommended settings.')

# Example: to download after setting CIVITAI_API_TOKEN, copy and run MODEL_652699['download_template'] (uncomment).



In [None]:
#@title Per-model metadata: Civitai model 1064836 (user-supplied)
# Guarded metadata and a download template. This model page is NSFW/gated — you may need to login or use the API token.
MODEL_1064836 = {
  'model_id': 1064836,
  'modelVersionId': 993999,
  'type': 'image/checkpoint',
  'gated': True,
  'notes': 'NSFW-gated content. If the API download fails, download via web UI and upload to Drive. Example settings may be on model page.' ,
  'download_template': f'!wget "https://civitai.com/api/download/models/1064836?modelVersionId=993999&format=SafeTensor&CIVITAI_TOKEN=$CIVITAI_API_TOKEN" --content-disposition -P "{{MODELS_DIR}}/model_1064836"',
  'recommended_settings': {},
}
print('Model metadata for 1064836 registered. Inspect MODEL_1064836 for download template and notes.')

# Usage: if you set CIVITAI_API_TOKEN via the secure cell above, copy the download_template and run it (uncomment) to download.

In [3]:
#@title Set CIVITAI_API_TOKEN (secure, in-memory)
# Use this to set the CIVITAI token for the session without printing it.
import os, getpass
try:
    token = getpass.getpass('Enter CIVITAI_API_TOKEN (input hidden): ')
except Exception:
    # Fallback for environments where getpass may not hide input
    token = input('Enter CIVITAI_API_TOKEN (visible input): ')

if token:
    os.environ['CIVITAI_API_TOKEN'] = token
    # also expose a session variable for notebook code to read
    CIVITAI_API_TOKEN = token
    print('CIVITAI_API_TOKEN set in-memory for this session (not displayed).')
else:
    print('No token entered — token not set')

# Reminder: token is only stored in the notebook process memory and in os.environ for the session.
# Do NOT paste your token in chat. Restarting the runtime will clear it.

CIVITAI_API_TOKEN set in-memory for this session (not displayed).


### <a id="flow-studio"></a>Flow Studio

Use the Control Board → Flow Studio tab to inspect queue status and launch templates. The cells below mirror the original manual workflows for exercising flow composition.

In [None]:
#@title Flow regression tests (optional smoke check)
"""Run the lightweight pytest suite to confirm flow templates compose and validate."""
import subprocess
import sys
from pathlib import Path

print("Running pytest -q test_flows_regression.py...\n")
result = subprocess.run(
    [sys.executable, "-m", "pytest", "test_flows_regression.py", "-q"],
    cwd=str(Path.cwd()),
)

if result.returncode == 0:
    print("\nFlow regression tests passed.")
else:
    print("\nFlow regression tests failed — inspect the pytest output above.")

## Launch ComfyUI and Tunnelling (cloudflared/localtunnel)

Start ComfyUI headless then create a public URL with cloudflared or localtunnel.

In [None]:
#@title Start ComfyUI (headless) — Drive-backed
import subprocess, time, os
from pathlib import Path
COMFY_DIR = Path(DRIVE_ROOT)/'ComfyUI'
VENV = COMFY_DIR/'venv'
PY = str(VENV/'bin'/'python')
proc = None
COMFYUI_PUBLIC_URL = None

if Path(PY).exists() and INSTALL_COMFYUI:
  # Ensure we run ComfyUI from the Drive copy so files and models persist
  print('Starting ComfyUI from Drive...')
  proc = subprocess.Popen([PY, str(COMFY_DIR/'main.py'), '--headless', '--port', '8188'], cwd=str(COMFY_DIR), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
  # don't block — the tunnel cell will parse the public URL; give process a moment to warm up
  time.sleep(2)
  if proc.poll() is None:
    print('ComfyUI started, PID:', proc.pid)
  else:
    print('ComfyUI process exited early — check logs')
else:
  print('ComfyUI not installed or SKIPPED; ensure INSTALL_COMFYUI=True and rerun')

# Export so subsequent tunnel cell can set/use it
os.environ['COMFYUI_PUBLIC_URL'] = ''

In [None]:
#@title Start cloudflared and parse URL — prefer Cloudflare public address
import subprocess, time, re, os
from pathlib import Path
CLOUDFLARED = '/usr/local/bin/cloudflared'
COMFYUI_PUBLIC_URL = os.environ.get('COMFYUI_PUBLIC_URL','')

if START_CLOUDFLARED:
  if not os.path.exists(CLOUDFLARED):
    print('Installing cloudflared...')
    subprocess.run(['wget','-q','-O','/usr/local/bin/cloudflared','https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64'], check=True)
    subprocess.run(['chmod','+x','/usr/local/bin/cloudflared'], check=True)
  print('Launching cloudflared tunnel to http://localhost:8188 — parsing public URL from stdout')
  p = subprocess.Popen([CLOUDFLARED, 'tunnel', '--url', 'http://localhost:8188'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
  url = None
  start = time.time()
  while time.time() - start < 30:
    line = p.stdout.readline()
    if not line:
      time.sleep(0.1)
      continue
    print(line.strip())
    # Handle multiple possible formats
    m = re.search(r'(https://[a-z0-9.-]+\\.trycloudflare\\.com)', line)
    if m:
      url = m.group(1)
      break
    # older formats may contain 'https://' directly
    m2 = re.search(r'https://[a-z0-9.-]+', line)
    if m2 and '.trycloudflare' in m2.group(0):
      url = m2.group(0)
      break
  if url:
    print('Found Cloudflare public URL:', url)
    COMFYUI_PUBLIC_URL = url
    os.environ['COMFYUI_PUBLIC_URL'] = url
  else:
    print('Could not parse a public URL from cloudflared output after timeout')
else:
  print('Cloudflared start skipped')

# Print the effective public URL for the user and downstream cells
print('COMFYUI_PUBLIC_URL =', os.environ.get('COMFYUI_PUBLIC_URL',''))

In [None]:
#@title Start localtunnel (if preferred)
import subprocess, shutil, time, re, os
if START_LOCALTUNNEL:
  if not shutil.which('node'):
    !curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash -
    !sudo apt-get install -y nodejs
  if not shutil.which('lt'):
    !npm install -g localtunnel
  proc = subprocess.Popen(['lt', '--port', '8188'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
  url = None
  start = time.time()
  while time.time() - start < 20:
    line = proc.stdout.readline()
    if not line:
      time.sleep(0.2)
      continue
    print(line.strip())
    m = re.search(r'https?://[\w\.-]+\.loca\.lt', line)
    if m:
      url = m.group(0)
      break
  if url:
    print('localtunnel URL:', url)
  else:
    print('Could not parse localtunnel URL')
else:
  print('Localtunnel start skipped')

## Demos: T2I / I2I (programmatic + fallback)

These cells try the programmatic ComfyUI API when `USE_COMFYUI_API` is True. If the API endpoints differ, I can update the endpoints according to your ComfyUI instance.

In [None]:
#@title Simple T2I demo (placeholder or API — uses shared endpoint helper)
import ipywidgets as widgets
from IPython.display import display, clear_output, Image
from pathlib import Path
import requests, base64, os

# Use shared endpoint vars populated by the Endpoint helper cell
COMFYUI_API_BASE = globals().get('COMFYUI_API_BASE', 'http://127.0.0.1:8188')
COMFYUI_API_KEY = globals().get('COMFYUI_API_KEY', os.environ.get('COMFYUI_API_KEY', ''))
HEADERS = globals().get('HEADERS', {})

# Auto-enable API mode if an API key is present
if COMFYUI_API_KEY:
    globals()['USE_COMFYUI_API'] = True

model = widgets.Text(value='', description='Model path (optional)')
prompt = widgets.Textarea(value='A fantasy landscape, cinematic lighting', description='Prompt')
run = widgets.Button(description='Generate')
out = widgets.Output()


# Reminder: run the Endpoint helper and Cell 1 health-check before using this demo
print('Reminder: run the Endpoint & Health-check helper and the ComfyUI health-check (Cell 1) before running demos')


def call_comfy_api(prompt_text):
  # Lightweight placeholder for calling a hypothetical ComfyUI API endpoint
  try:
    url = COMFYUI_API_BASE + '/api/generate'
    print('Calling', url)
    r = requests.post(url, json={'prompt': prompt_text}, headers=HEADERS or None, timeout=20)
    if r.status_code == 200:
      # assume API returns base64 PNG in 'image' field for this demo
      data = r.json()
      if 'image' in data:
        imgdata = base64.b64decode(data['image'])
        p = Path(ARTIFACTS_DIR)/'t2i_from_api.png'
        p.write_bytes(imgdata)
        return str(p)
      else:
        print('API returned JSON without image field')
        return None
    else:
      print('API returned', r.status_code, r.text[:200])
      return None
  except Exception as e:
    print('API call failed:', e)
    return None


def on_run(b):
  with out:
    clear_output()
    print('Using API base:', COMFYUI_API_BASE)
    print('Using headers:', HEADERS)
    if globals().get('USE_COMFYUI_API', False):
      path = call_comfy_api(prompt.value)
      if path:
        display(Image(path))
        return
    # fallback placeholder
    print('Falling back to placeholder image (local)')
    from PIL import Image as PILImage, ImageDraw
    Path(ARTIFACTS_DIR).mkdir(parents=True, exist_ok=True)
    imgpath = Path(ARTIFACTS_DIR)/'t2i_placeholder.png'
    img = PILImage.new('RGB', (512,512), (60,100,150))
    d = ImageDraw.Draw(img)
    d.text((10,10), prompt.value[:200], fill=(255,255,0))
    img.save(imgpath)
    display(Image(str(imgpath)))
    print('Saved to', imgpath)

run.on_click(on_run)
display(model, prompt, run, out)

print('CURRENT_INSTANCE =', COMFYUI_API_BASE)
print('COMFYUI_API_KEY set?', bool(COMFYUI_API_KEY))

## Wan2.1 T2V / I2V scaffold (GGUF models)

This prepares a job file for Wan2 runners and can launch a `wan2_runner.py` if present in Drive root.

In [None]:
#@title Wan2.1 T2V / I2V scaffold
from pathlib import Path
import json, subprocess, os

# Use shared endpoint values if performing remote calls
COMFYUI_API_BASE = globals().get('COMFYUI_API_BASE', 'http://127.0.0.1:8188')
COMFYUI_API_KEY = globals().get('COMFYUI_API_KEY', os.environ.get('COMFYUI_API_KEY',''))
HEADERS = globals().get('HEADERS', {})

# Auto-enable API mode if an API key is present
if COMFYUI_API_KEY:
    globals()['USE_COMFYUI_API'] = True

# Reminder: run the Endpoint helper and Cell 1 health-check before using this demo
print('Reminder: run the Endpoint & Health-check helper and the ComfyUI health-check (Cell 1) before running demos')

prompt = 'A cinematic pan across a neon city at dusk'
job = {'prompt': prompt, 'frames': 32, 'steps_per_frame': 10, 'model': 'wan2-14b.gguf'}
Path(ARTIFACTS_DIR).mkdir(parents=True, exist_ok=True)
jobfile = Path(ARTIFACTS_DIR)/'wan2_job.json'
jobfile.write_text(json.dumps(job, indent=2))
print('Job file written to', jobfile)
runner = Path(DRIVE_ROOT)/'wan2_runner.py'
if runner.exists():
  print('Found wan2_runner.py — launching in background')
  p = subprocess.Popen(['python', str(runner), '--job', str(jobfile)])
  print('Runner PID:', p.pid)
else:
  print('No wan2_runner.py found. Provide one or adapt the example Wan2 notebooks.')

# If you'd like to POST the job to a remote runner endpoint, example (guarded):
# if COMFYUI_API_BASE:
#   import requests
#   r = requests.post(COMFYUI_API_BASE + '/api/wan2/jobs', json=job, headers=HEADERS)
#   print('Posted job, response', r.status_code, r.text[:200])


## Kohya-SS training scaffold (sd-scripts)

Clones the `sd-scripts` repo and prepares an example training command. Running training inside Colab can exceed session limits; use a longer-running VM when possible.

In [None]:
#@title Kohya-SS (sd-scripts) clone + example command
from pathlib import Path
import subprocess
KOYHA_DIR = Path(DRIVE_ROOT)/'kohya_ss'
if not KOYHA_DIR.exists():
  !git clone https://github.com/kohya-ss/sd-scripts.git "$KOYHA_DIR"
else:
  print('kohya_ss already cloned')
Path(DRIVE_ROOT+'/datasets/lora_dataset').mkdir(parents=True, exist_ok=True)
print('Place images into', DRIVE_ROOT+'/datasets/lora_dataset')
print('Example training command:')
print(f"python train_network.py --output_dir={LORAS_DIR} --train_data_dir={DRIVE_ROOT}/datasets/lora_dataset --resolution=512,512 --network_train_unet_only --max_train_steps=1000")

---
Notes:

- Many Civitai pages for NSFW models require login. If an API download returns 'login required', use the web UI or your account to download the files and then upload them into Drive.
- If you want, I can wire real ComfyUI API endpoints here — provide the exact endpoints and any auth headers, and I'll update the demo cells to call them programmatically.
- Tell me if you'd like model-specific example workflows for each WAN model (T2V chains, I2V setups, LORA generator flows).

## API key and security wiring

This section shows how to provide a COMFYUI_API_KEY environment variable and uses it in programmatic calls so the exposed Cloudflare URL is protected with a simple bearer token.

In [None]:
#@title Set COMFYUI_API_KEY (optional) — run this cell to set in-memory for the session
import os
# Option 1: set directly here (quick, not persistent)
# os.environ['COMFYUI_API_KEY'] = 'your_secret_here'

# Option 2: set via Colab UI or a secure Vault; the notebook reads os.environ['COMFYUI_API_KEY'] when making API calls
print('COMFYUI_API_KEY set?', bool(os.environ.get('COMFYUI_API_KEY')))
print('If you want, set os.environ[\'COMFYUI_API_KEY\']=\'...\' in a cell before running API calls')

In [None]:
#@title Copy Cloudflare public URL to clipboard
import os
from IPython.display import HTML, display
url = os.environ.get('COMFYUI_PUBLIC_URL', '')
if url:
  display(HTML(f"<input type='text' value='{url}' id='cfurl' style='width:400px' /><button onclick=\"navigator.clipboard.writeText(document.getElementById('cfurl').value)\">Copy</button>"))
else:
  print('No COMFYUI_PUBLIC_URL set. Run the cloudflared cell first.')

In [None]:
#@title Wan2 background job runner scaffold (Drive-persistent)
from pathlib import Path
import time, json, subprocess, os
JOBS_DIR = Path(DRIVE_ROOT)/'wan2_jobs'
JOBS_DIR.mkdir(parents=True, exist_ok=True)
print('Watching', JOBS_DIR)

# Example runner loop (do NOT run continuously in Colab unless you want a background process)
# This reads any job JSON files and prints the job; you can replace 'print' with a real runner call.
for jobfile in JOBS_DIR.glob('*.json'):
  try:
    job = json.loads(jobfile.read_text())
    print('Found job:', jobfile)
    # Example: invoke a runner script if present
    runner = Path(DRIVE_ROOT)/'wan2_runner.py'
    if runner.exists():
      print('Launching wan2_runner for', jobfile)
      subprocess.Popen(['python', str(runner), '--job', str(jobfile)])
    else:
      print('Runner not found — job is persisted in Drive for later execution')
  except Exception as e:
    print('Failed to read job', jobfile, e)

print('Scan complete — add job JSON files into', JOBS_DIR, 'to schedule runs')

In [None]:
#@title ComfyUI graph-save helper (save flow to Drive)
from pathlib import Path
import json

def save_graph(graph_json, name='flow_saved.json'):
  p = Path(DRIVE_ROOT)/'flows'
  p.mkdir(parents=True, exist_ok=True)
  dest = p/name
  dest.write_text(json.dumps(graph_json, indent=2))
  print('Saved graph to', dest)

# Example usage:
# save_graph({'nodes':[], 'connections':[]}, name='example_flow.json')

In [None]:
#@title Example ComfyUI workflow examples (saved flows)
from pathlib import Path

# Two tiny example flow JSONs you can expand in the ComfyUI flow editor
example_t2i = {
  'nodes': [
    {'id': 'txt', 'type': 'TextInput', 'args': {'text': 'A fantasy landscape, soft volumetric light'}},
    {'id': 'sampler', 'type': 'Sampler', 'args': {'name': 'ddim', 'steps': 20}},
    {'id': 'out', 'type': 'ImageSaver', 'args': {'path': str(Path(ARTIFACTS_DIR)/'t2i_example.png')}}
  ],
  'connections': []
}

example_multi_lora = {
  'nodes': [
    {'id': 'seed', 'type': 'SeedInput', 'args': {'seed': 42}},
    {'id': 'apply_lora', 'type': 'ApplyLora', 'args': {'paths': ['loraA.safetensors','loraB.safetensors'], 'weights': [0.6,0.4]}},
    {'id': 'out', 'type': 'ImageSaver', 'args': {'path': str(Path(ARTIFACTS_DIR)/'multi_lora_example.png')}}
  ],
  'connections': []
}

# Save examples using the helper defined earlier
try:
  save_graph(example_t2i, name='workflow_t2i_example.json')
  save_graph(example_multi_lora, name='workflow_multi_lora_example.json')
  print('Saved example flows to', Path(DRIVE_ROOT)/'flows')
except Exception as e:
  print('Failed to save example flows (is the graph helper present?):', e)


In [None]:
#@title Advanced flow: Photoreal T2I (tuned for model 1064836)
# This flow is a richer starting point: prompt input -> conditioning -> sampler -> hi-res fix -> save
from pathlib import Path
flow_photoreal = {
  'nodes': [
    {'id': 'txt', 'type': 'TextInput', 'args': {'text': 'High-quality portrait, soft window lighting, 85mm lens'}},
    {'id': 'cond', 'type': 'Conditioning', 'args': {}},
    {'id': 'sampler', 'type': 'Sampler', 'args': {'name': 'euler_a', 'steps': 28}},
    {'id': 'hires', 'type': 'HiResFix', 'args': {'upscale': 1.5, 'denoise': 0.35, 'method': '4x_NMKD-Superscale-SP_178000_G'}},
    {'id': 'apply_lora', 'type': 'ApplyLora', 'args': {'path': str(Path(LORAS_DIR)/'amateur_photography.safetensors'), 'weight': 0.8}},
    {'id': 'saver', 'type': 'ImageSaver', 'args': {'path': str(Path(ARTIFACTS_DIR)/'photoreal_example.png')}}
  ],
  'connections': [
    ('txt','cond'), ('cond','sampler'), ('sampler','hires'), ('hires','apply_lora'), ('apply_lora','saver')
  ]
}
# Save flow
try:
  save_graph(flow_photoreal, name='advanced_photoreal_1064836.json')
  print('Saved advanced photoreal flow to', Path(DRIVE_ROOT)/'flows')
except Exception as e:
  print('Could not save flow (is save_graph defined?):', e)


#@title Advanced flow: Wan2.1 video pipeline (uses image frames + LoRA)
flow_wan2 = {
  'nodes': [
    {'id': 'seed_img', 'type': 'ImageInput', 'args': {'path': str(Path(ARTIFACTS_DIR)/'photoreal_example.png')}},
    {'id': 'frame_gen', 'type': 'FrameGenerator', 'args': {'frames': 48, 'warp_strength': 0.6}},
    {'id': 'apply_wan_lora', 'type': 'ApplyLora', 'args': {'paths': [str(Path(LORAS_DIR)/'wan_pose_lora.safetensors')], 'weights':[1.0]}},
    {'id': 'video_encoder', 'type': 'Wan2VideoSaver', 'args': {'path': str(Path(ARTIFACTS_DIR)/'wan2_output.mp4'), 'fps': 24}}
  ],
  'connections': [('seed_img','frame_gen'), ('frame_gen','apply_wan_lora'), ('apply_wan_lora','video_encoder')]
}
try:
  save_graph(flow_wan2, name='advanced_wan2_from_image.json')
  print('Saved Wan2 pipeline flow to', Path(DRIVE_ROOT)/'flows')
except Exception as e:
  print('Could not save Wan2 flow:', e)

print('Advanced flows added. Open them in ComfyUI flow editor and adapt nodes to your installed custom nodes (Manager may add nodes with different names).')

## Advanced & Custom Workflows (model-tuned)

This section contains richer ComfyUI flows tuned as starting points for the models you requested. These flows are templates — open them in ComfyUI's Flow Editor and tweak sampler, steps, seed and LoRA weights to match your runtime and model specifics.

Below are two saved example flows: an image-focused T2I flow tuned for photographic realism, and a Wan2.1-ready video workflow that uses image frames plus LoRA application for pose/placement.

Each flow is saved into `DRIVE_ROOT/flows` (see the example save cell earlier). After you start ComfyUI, load these JSON files in the Flow Editor and wire any custom nodes present in your setup.

In [None]:
#@title Extract workflows from showcase images (attempt)
# This cell tries to download showcased images from Civitai model pages and
# inspect their PNG tEXt chunks and surrounding HTML for embedded ComfyUI workflows.
# It saves any JSON it finds to DRIVE_ROOT/flows/extracted_{model_id}_{i}.json

import os, re, requests
from pathlib import Path
from bs4 import BeautifulSoup
from PIL import Image
import base64, io, json

DRIVE_ROOT = globals().get('DRIVE_ROOT', '/content/drive/MyDrive/ComfyUI')
OUT_DIR = Path(DRIVE_ROOT)/'flows'/'extracted_images'
OUT_DIR.mkdir(parents=True, exist_ok=True)

model_pages = [
  'https://civitai.com/models/1064836?modelVersionId=993999',
  'https://civitai.com/models/652699?modelVersionId=993999'
]

session = requests.Session()
# If CIVITAI_API_TOKEN is present, include it as a query token when downloading files, but for page scraping a logged-in session may be required
CIVITAI_TOKEN = os.environ.get('CIVITAI_API_TOKEN','')
headers = {'User-Agent':'ComfyUI-Playground/1.0'}

extracted = []

for url in model_pages:
  print('\nProcessing', url)
  try:
    r = session.get(url, headers=headers, timeout=15)
    if r.status_code != 200:
      print('Failed to fetch page (status', r.status_code, ') — content may be gated; skipping')
      continue
  except Exception as e:
    print('Request failed:', e)
    continue

  soup = BeautifulSoup(r.text, 'html.parser')
  imgs = soup.find_all('img')
  print('Found', len(imgs), 'images on page (will attempt to inspect common showcase images)')
  count = 0
  for img in imgs:
    src = img.get('data-src') or img.get('src') or ''
    if not src:
      continue
    # skip tiny icons and badges
    if any(x in src for x in ['/avatar', '/badge', 'base-badge', 'width=32']):
      continue
    # normalize URL
    if src.startswith('//'):
      src = 'https:' + src
    if src.startswith('/'):
      src = 'https://civitai.com' + src

    # Download image
    try:
      print('Downloading', src)
      ir = session.get(src, headers=headers, timeout=20)
      if ir.status_code != 200:
        print('Failed to download image', ir.status_code)
        continue
    except Exception as e:
      print('Image download failed:', e)
      continue

    # Save image temporarily
    count += 1
    fn = OUT_DIR/f"img_{Path(src).stem}_{count}.png"
    try:
      fn.write_bytes(ir.content)
    except Exception:
      # try jpg
      fn = fn.with_suffix('.jpg')
      fn.write_bytes(ir.content)

    # Attempt to read PNG tEXt chunks via PIL info
    try:
      im = Image.open(fn)
      info = im.info
      keys = list(info.keys())
      if keys:
        print('Image info keys:', keys)
      else:
        print('No textual PNG info keys found')

      # Search textual info for JSON or base64-encoded workflows
      for k,v in info.items():
        s = str(v)
        # heuristics: look for 'Comfy' or '{"nodes' or long base64
        if 'Comfy' in s or '{"nodes' in s or re.search(r'^[A-Za-z0-9+/=\n]{200,}$', s):
          print('Candidate workflow text in PNG info key', k)
          cand = s
          # if base64, try decode
          try:
            if re.match(r'^[A-Za-z0-9+/=\n]+$', cand.strip()):
              dec = base64.b64decode(cand)
              try:
                j = json.loads(dec.decode('utf-8'))
                outp = OUT_DIR/f'extracted_{Path(url).stem}_{count}.json'
                outp.write_text(json.dumps(j, indent=2))
                extracted.append(str(outp))
                print('Extracted JSON saved to', outp)
              except Exception:
                # not JSON after decode
                pass
          except Exception:
            pass

    except Exception as e:
      print('Failed to inspect image via PIL:', e)

    # Fallback: search surrounding HTML for long base64 or JSON blobs
    parent_html = str(img.parent)
    m = re.search(r'([A-Za-z0-9+/=\n]{200,})', parent_html)
    if m:
      cand = m.group(1)
      try:
        dec = base64.b64decode(cand)
        j = json.loads(dec.decode('utf-8'))
        outp = OUT_DIR/f'extracted_{Path(url).stem}_{count}_html.json'
        outp.write_text(json.dumps(j, indent=2))
        extracted.append(str(outp))
        print('Extracted JSON from surrounding HTML to', outp)
      except Exception:
        pass

  if count == 0:
    print('No candidate showcase images downloaded for this page — it may be gated or use JS rendering.')

print('\nExtraction complete — found', len(extracted), 'candidate workflows (saved to DRIVE_ROOT/flows/extracted_images)')
for p in extracted:
  print('-', p)

print('\nReminder: NSFW or gated pages may require a logged-in session; set CIVITAI_API_TOKEN or download images manually then run the local extractor on those images.')

In [None]:
#@title Manifest resolver (follow redirects, capture filenames) — run after setting CIVITAI_API_TOKEN
"""
This cell reads the manifest saved at DRIVE_ROOT/manifests/civitai_model_manifests.json,
performs a HEAD/GET to the API download URL (with CIVITAI token), follows redirects,
and extracts the final filename from Content-Disposition if present.
It updates the manifest with 'resolved_filename' and 'resolved_url' fields for each model.

Note: run this only after mounting Drive and setting CIVITAI_API_TOKEN via the secure input cell.
"""
import os, requests, json
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_FILE = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'
if not MANIFEST_FILE.exists():
    print('Manifest file not found at', MANIFEST_FILE)
else:
    with open(MANIFEST_FILE,'r') as f:
        manifest = json.load(f)

    token = os.environ.get('CIVITAI_API_TOKEN','')
    if not token:
        print('CIVITAI_API_TOKEN not set. Use the secure input cell to set it for this session.')
    else:
        s = requests.Session()
        s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
        updated = False
        for mid, entry in manifest.items():
            url = entry.get('download_url')
            if not url:
                print('No download_url for', mid)
                continue
            # substitute token placeholder if present
            url = url.replace('$CIVITAI_API_TOKEN', token)
            print('\nResolving model', mid, '...')
            try:
                # Try HEAD first
                r = s.head(url, allow_redirects=True, timeout=20)
                if r.status_code >= 400:
                    # try GET with stream to inspect headers
                    r = s.get(url, stream=True, allow_redirects=True, timeout=20)
                    r.close()
                # final URL after redirects
                final_url = r.url
                cd = r.headers.get('Content-Disposition','')
                filename = None
                if cd:
                    m = re.search(r'filename\*=UTF-8''(.+)|filename="?([^";]+)"?', cd)
                    if m:
                        filename = m.group(1) or m.group(2)
                # fallback: try last path segment
                if not filename:
                    filename = final_url.split('/')[-1].split('?')[0]
                entry['resolved_url'] = final_url
                entry['resolved_filename'] = filename
                print('Resolved:', filename)
                updated = True
            except Exception as e:
                print('Failed to resolve', mid, e)

        if updated:
            MANIFEST_FILE.write_text(json.dumps(manifest, indent=2))
            print('\nManifest updated with resolved filenames at', MANIFEST_FILE)
        else:
            print('\nNo updates applied to manifest.')


## Manifest resolver & validation

This mini-section adds a safe resolver that—when you run it in Colab after mounting Drive and setting `CIVITAI_API_TOKEN`—will:

- Read `DRIVE_ROOT/manifests/civitai_model_manifests.json`.
- For each manifest entry, perform a HEAD or GET with `allow_redirects=True` to follow the Civitai redirect and inspect the `Content-Disposition` header (the API typically redirects to the actual file URL and provides a filename).
- Save the resolved filename and a final download URL into the manifest file as `resolved_filename` and `resolved_url` for each model.

Run the resolver only in a session where you have set `CIVITAI_API_TOKEN` using the secure token cell. The resolver will not download the whole files — it follows redirects and inspects headers only (uses `stream=True` and closes the connection quickly).

Security note: do not paste your token into this notebook's plaintext cells; use the secure input cell instead.

In [None]:
#@title Per-model manifest parsing & download templates (guarded)
"""
Create a manifest of the provided Civitai model download URLs with probable file formats,
recommended target directories, and guarded download commands that require
CIVITAI_API_TOKEN to be set (use the secure token cell earlier).

Run this cell after mounting Drive and setting CIVITAI_API_TOKEN (via the secure cell).
"""
import re, os, json
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_DIR = Path(DRIVE_ROOT)/'manifests'
MANIFEST_DIR.mkdir(parents=True, exist_ok=True)

# List of URLs the user provided (de-duplicated)
urls = [
  'https://civitai.com/api/download/models/443821',
  'https://civitai.com/api/download/models/1624818',
  'https://civitai.com/api/download/models/1741501',
  'https://civitai.com/api/download/models/2048863',
  'https://civitai.com/api/download/models/2004155',
  'https://civitai.com/api/download/models/2031069',
  'https://civitai.com/api/download/models/1476909',
  'https://civitai.com/api/download/models/1855263',
  'https://civitai.com/api/download/models/1874153',
  'https://civitai.com/api/download/models/1331682',
  'https://civitai.com/api/download/models/15003',
  'https://civitai.com/api/download/models/6424',
  'https://civitai.com/api/download/models/1307155'
]

# Heuristics / prior knowledge mapping (from earlier notes and typical usage)
# Use conservative guesses; if you know exact file names/formats update the manifest manually.
format_suggestions = {
  443821: {'formats':['SafeTensor'], 'target_dir':'{MODELS_DIR}/cyberrealistic'},
  1624818: {'formats':['SafeTensor'], 'target_dir':'{LORAS_DIR}'},
  1741501: {'formats':['SafeTensor'], 'target_dir':'{MODELS_DIR}/wan'},
  2048863: {'formats':['GGUF','SafeTensor'], 'target_dir':'{GGUF_DIR}'},
  2004155: {'formats':['SafeTensor','LoRA'], 'target_dir':'{LORAS_DIR}'},
  2031069: {'formats':['GGUF','SafeTensor'], 'target_dir':'{GGUF_DIR}'},
  1476909: {'formats':['SafeTensor'], 'target_dir':'{LORAS_DIR}'},
  1855263: {'formats':['GGUF'], 'target_dir':'{GGUF_DIR}'},
  1874153: {'formats':['GGUF'], 'target_dir':'{GGUF_DIR}'},
  1331682: {'formats':['GGUF'], 'target_dir':'{GGUF_DIR}'},
  15003: {'formats':['SafeTensor'], 'target_dir':'{MODELS_DIR}/cyberrealistic'},
  6424: {'formats':['SafeTensor'], 'target_dir':'{MODELS_DIR}/chilloutmix'},
  1307155: {'formats':['GGUF','SafeTensor'], 'target_dir':'{GGUF_DIR}'},
}

manifest = {}
for u in urls:
  m = re.search(r'/models/(\d+)', u)
  if not m:
    continue
  mid = int(m.group(1))
  entry = {}
  entry['model_id'] = mid
  entry['download_url'] = u + '?CIVITAI_TOKEN=$CIVITAI_API_TOKEN'
  sugg = format_suggestions.get(mid, None)
  if sugg:
    entry['suggested_formats'] = sugg['formats']
    entry['recommended_dir_template'] = sugg['target_dir']
  else:
    entry['suggested_formats'] = ['SafeTensor','GGUF','Checkpoint','LoRA']
    entry['recommended_dir_template'] = '{MODELS_DIR}'
  # guarded wget template (comment before running) — you must set CIVITAI_API_TOKEN first
  entry['download_template'] = f"# !wget \"{entry['download_url']}\" --content-disposition -P \"{entry['recommended_dir_template']}\""
  entry['notes'] = 'Set CIVITAI_API_TOKEN via the secure input cell and review the model page for NSFW gating or special instructions. If API returns a redirect or login-required, download via browser and upload to Drive.'
  manifest[str(mid)] = entry

# Save manifest JSON to Drive
outp = MANIFEST_DIR/'civitai_model_manifests.json'
outp.write_text(json.dumps(manifest, indent=2))
print('Wrote manifest for', len(manifest), 'models to', outp)

# Print a concise table for quick review
for k,v in manifest.items():
  print(f"- Model {k}: suggested formats={v['suggested_formats']}, dir={v['recommended_dir_template']}")
  print('  Download template (commented):')
  print('  ', v['download_template'])

print('\nReminder: uncomment and run the download_template commands only after setting CIVITAI_API_TOKEN with the secure token cell and verifying you have permission to download these models.')

In [None]:
#@title Collected and extracted workflows index
# Lists any workflow JSONs extracted by the "Extract workflows from showcase images" cell
from pathlib import Path
DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
EX_DIR = Path(DRIVE_ROOT)/'flows'/'extracted_images'
EX_DIR.mkdir(parents=True, exist_ok=True)
files = sorted([str(p) for p in EX_DIR.glob('*.json')])
print('Found', len(files), 'extracted workflow JSON files in', EX_DIR)
for f in files:
    print('-', f)

# If you have local images with embedded flows you want to inspect, upload them to Drive and re-run the extractor cell.


### Public URL view and stop-tunnel helper
Use the cell below to display the public Cloudflare URL and to stop the cloudflared process started in this session. Note: Stopping the process will close remote access to the ComfyUI GUI.

In [None]:
#@title Display public URL and stop cloudflared tunnel
import os, signal, psutil
from IPython.display import display, HTML

url = os.environ.get('COMFYUI_PUBLIC_URL','')
print('Public URL:', url)

def stop_cloudflared():
  # find cloudflared process and terminate it
  for p in psutil.process_iter(['pid','name','cmdline']):
    try:
      if 'cloudflared' in ' '.join(p.info.get('cmdline', [])):
        print('Killing cloudflared PID', p.info['pid'])
        p.kill()
    except Exception:
      continue

display(HTML("<button onclick=\"google.colab.kernel.invokeFunction('notebook.stop_cloudflared', [], {})\">Stop cloudflared</button>"))

# register a callable so the button can call stop_cloudflared in Colab environments
from google.colab import output
output.register_callback('notebook.stop_cloudflared', stop_cloudflared)
print('Stop button registered (Colab only)')

### Persistent runner example for remote VMs
This cell shows an example `nohup`/`tmux` pattern you can run on a persistent VM to keep a `wan2_runner.py` or similar runner alive independent of Colab.

In [None]:
#@title Persistent runner example (bash commands for remote VM)

# On a persistent VM (Ubuntu), save this script as run_wan2_runner.sh and run with nohup or inside tmux/screen
# Example contents for run_wan2_runner.sh:
script = r"""
#!/usr/bin/env bash
cd $DRIVE_ROOT
# activate your python env or conda
source ~/venv/bin/activate
# Run the wan2 runner in background and log stdout/stderr
nohup python wan2_runner.py --watch_dir "$DRIVE_ROOT/wan2_jobs" > wan2_runner.log 2>&1 &
"""
print(script)
print('Save the script on your VM, make it executable and run it inside tmux or via systemd for production.')

### Model download test (safe sample)
This cell demonstrates a guarded download: it runs only if `CIVITAI_API_TOKEN` is set and will download a small, non-NSFW sample model or asset. Update the `MODEL_ID` if you want a specific allowed file.

In [None]:
#@title Model download test (requires CIVITAI_API_TOKEN)
import os, subprocess
from pathlib import Path
MODEL_ID = 6424  # example: ChilloutMix (public blended), replace if you have another public allowed id
if not os.environ.get('CIVITAI_API_TOKEN'):
  print('CIVITAI_API_TOKEN not set — skipping download test')
else:
  L = Path(GGUF_DIR)
  L.mkdir(parents=True, exist_ok=True)
  url = f"https://civitai.com/api/download/models/{MODEL_ID}?type=Model&format=SafeTensor&size=full&CIVITAI_TOKEN={os.environ.get('CIVITAI_API_TOKEN')}"
  print('Downloading to', L)
  subprocess.run(['wget', '-q', '-O', str(L/f'model_{MODEL_ID}.safetensors'), url], check=True)
  print('Download attempted — check', L)

### Model-specific workflow templates (WAN models)
Below are templates you can copy into new cells and adapt per model: T2V chains, I2V setups, and LORA generator flows. They are intentionally high-level so you can tweak scheduler/denoising parameters per model.

In [None]:
#@title T2V chain template (WAN models) — adapt and run per model
from pathlib import Path
import json, os

# Use shared endpoint/header vars for optional remote submits
COMFYUI_API_BASE = globals().get('COMFYUI_API_BASE', 'http://127.0.0.1:8188')
COMFYUI_API_KEY = globals().get('COMFYUI_API_KEY', os.environ.get('COMFYUI_API_KEY',''))
HEADERS = globals().get('HEADERS', {})
if COMFYUI_API_KEY:
    globals()['USE_COMFYUI_API'] = True

# Reminder: run the Endpoint helper and Cell 1 health-check before using this demo
print('Reminder: run the Endpoint & Health-check helper and the ComfyUI health-check (Cell 1) before running demos')

job = {
  'type': 't2v',
  'model': 'wan2-14b.gguf',
  'prompt': 'A dramatic fly-through of a neon city at dusk',
  'frames': 48,
  'steps_per_frame': 12,
  'sampler': 'ddim',
}
Path(ARTIFACTS_DIR).mkdir(parents=True, exist_ok=True)
jobfile = Path(ARTIFACTS_DIR)/'t2v_wan_job.json'
jobfile.write_text(json.dumps(job, indent=2))
print('T2V job written to', jobfile)
print('Drop into', Path(DRIVE_ROOT)/'wan2_jobs' , 'or run your wan2 runner on your VM')

In [None]:
#@title I2V template (use an initial image and a model) — adapt per model
from pathlib import Path
import json, os

# Use shared endpoint values for any remote calls
COMFYUI_API_BASE = globals().get('COMFYUI_API_BASE', 'http://127.0.0.1:8188')
COMFYUI_API_KEY = globals().get('COMFYUI_API_KEY', os.environ.get('COMFYUI_API_KEY',''))
HEADERS = globals().get('HEADERS', {})
if COMFYUI_API_KEY:
    globals()['USE_COMFYUI_API'] = True

# Reminder: run the Endpoint helper and Cell 1 health-check before using this demo
print('Reminder: run the Endpoint & Health-check helper and the ComfyUI health-check (Cell 1) before running demos')

job = {
  'type': 'i2v',
  'model': 'wan2-14b.gguf',
  'init_image': str(Path(ARTIFACTS_DIR)/'seed_input.png'),
  'frames': 32,
  'strength': 0.8,
  'steps_per_frame': 10
}
jobfile = Path(ARTIFACTS_DIR)/'i2v_wan_job.json'
jobfile.write_text(json.dumps(job, indent=2))
print('I2V job written to', jobfile)

# Example guarded remote submit (uncomment to use):
# if COMFYUI_API_BASE:
#   import requests
#   r = requests.post(COMFYUI_API_BASE + '/api/wan2/jobs', json=job, headers=HEADERS)
#   print('Posted job, response', r.status_code)

In [None]:
#@title LORA generator flow template
from pathlib import Path
import json, os

# Shared endpoint/header vars for optional remote ops
COMFYUI_API_BASE = globals().get('COMFYUI_API_BASE', 'http://127.0.0.1:8188')
COMFYUI_API_KEY = globals().get('COMFYUI_API_KEY', os.environ.get('COMFYUI_API_KEY',''))
HEADERS = globals().get('HEADERS', {})
if COMFYUI_API_KEY:
    globals()['USE_COMFYUI_API'] = True

# Reminder: run the Endpoint helper and Cell 1 health-check before using this demo
print('Reminder: run the Endpoint & Health-check helper and the ComfyUI health-check (Cell 1) before running demos')

job = {
  'type': 'lora_train_prepare',
  'base_model': 'cyberrealistic.safetensors',
  'loras_to_apply': ['loraA.safetensors', 'loraB.safetensors'],
  'weights': [0.7, 0.3],
  'output_name': 'combo_lora.safetensors'
}
jobfile = Path(LORAS_DIR)/'lora_prep.json'
jobfile.write_text(json.dumps(job, indent=2))
print('LORA prep job saved to', jobfile)
print('Use your kohya runner or LORA blend tool to apply these weights')

# Example guarded API call to remote LORA service (if available):
# if COMFYUI_API_BASE:
#   import requests
#   r = requests.post(COMFYUI_API_BASE + '/api/lora/apply', json=job, headers=HEADERS)
#   print('Response', r.status_code)

## GPU & Runtime Guidance

Recommended runtime choices and why:

- Preferred environment: a persistent VM (Ubuntu) with a matching NVIDIA driver and CUDA 12.x for stability and reusability. Colab is convenient for short tests but sessions are ephemeral.
- PyTorch wheel must match the runtime CUDA version. The install cell tries CUDA 12.1 -> 12.2 -> CPU wheels automatically. If this fails, pick the correct wheel manually from https://download.pytorch.org/whl/.
- Keep a wheel cache in Drive: `DRIVE_ROOT/ComfyUI/wheel_cache`. This avoids re-downloading large binary wheels between sessions.
- xFormers is optional. Try installing it for performance improvements; if it fails, ComfyUI still runs without it.
- Use the venv-in-Drive pattern (the notebook creates `DRIVE_ROOT/ComfyUI/venv`) so you can start ComfyUI with the venv python and not change the Jupyter kernel.

Quick tips:
- If you need persistent long-running jobs (training or long renders), run them on a VM and use the Drive-mounted folder or an object store for artifacts.
- When in doubt about CUDA/PyTorch compatibility, run the health-check cell after starting ComfyUI and inspect `torch.__version__` and `torch.version.cuda` inside the venv.

Where to look:
- Wheel cache: `DRIVE_ROOT/ComfyUI/wheel_cache`
- Venv python: `DRIVE_ROOT/ComfyUI/venv/bin/python`
- ComfyUI run command example (printed by the install cell):

```bash
/content/drive/MyDrive/ComfyUI/venv/bin/python /content/drive/MyDrive/ComfyUI/main.py --headless --port 8188
```

If you want, I can add an automated check that prints the venv's `torch.version.cuda` after install.

In [None]:
#@title Cache warmup — prefetch torch wheel into Drive wheel_cache (optional)
# This cell attempts to download the first available torch wheel into the DRIVE wheel cache
# without installing it into the venv. It helps warm the cache for repeated sessions.
import os, subprocess, sys
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT', '/content/drive/MyDrive/ComfyUI')
WHEEL_CACHE = Path(DRIVE_ROOT)/'ComfyUI'/'wheel_cache'
WHEEL_CACHE.mkdir(parents=True, exist_ok=True)

candidates = [
    ('cu121', 'https://download.pytorch.org/whl/cu121'),
    ('cu122', 'https://download.pytorch.org/whl/cu122'),
    ('cpu', 'https://download.pytorch.org/whl/cpu'),
]

print('Prefetching: checking candidates to download a torch wheel into', WHEEL_CACHE)
for tag, index in candidates:
    try:
        print('Attempting to download torch wheel for', tag)
        # Use pip download to fetch wheels into cache
        subprocess.check_call([sys.executable, '-m', 'pip', 'download', 'torch', 'torchvision', 'torchaudio', '--index-url', index, '--dest', str(WHEEL_CACHE)])
        print('Downloaded wheels for', tag)
        break
    except subprocess.CalledProcessError:
        print('Failed to download wheels for', tag, '- trying next')

print('Cache warmup complete — check', WHEEL_CACHE)


In [None]:
#@title Venv pip-list verification (shows installed packages in Drive venv)
import subprocess, sys, os
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT', '/content/drive/MyDrive/ComfyUI')
PY = Path(DRIVE_ROOT)/'ComfyUI'/'venv'/'bin'/'python'
if PY.exists():
    print('Venv python found at', PY)
    subprocess.run([str(PY), '-m', 'pip', 'list'])
else:
    print('Venv python not found — run the install cell first')


In [None]:
#@title Interactive download runner (single model) — guarded and resumable
"""
Two-step UI: pick a model from the manifest, preview the resolved filename and size,
then confirm to stream-download the file into the suggested directory. Requires
CIVITAI_API_TOKEN set by the secure input cell.
"""
import os, json, re, requests, math
from pathlib import Path
import ipywidgets as widgets
from IPython.display import display, clear_output

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

def load_manifest():
    if not MANIFEST_PATH.exists():
        return {}
    return json.loads(MANIFEST_PATH.read_text())

manifest = load_manifest()
if not manifest:
    print('No manifest found at', MANIFEST_PATH)
else:
    options = []
    for k,v in sorted(manifest.items(), key=lambda x:int(x[0])):
        label = f"{k}: {v.get('suggested_formats')}"
        if v.get('resolved_filename'):
            label += f" -> {v.get('resolved_filename')}"
        options.append((label,k))

    dropdown = widgets.Dropdown(options=options, description='Model:')
    preview_btn = widgets.Button(description='Preview')
    confirm_btn = widgets.Button(description='Confirm & Download', button_style='danger')
    out = widgets.Output()

    def expand_dir_template(tpl):
        # replace placeholders with notebook globals
        mapping = {
            '{MODELS_DIR}': globals().get('MODELS_DIR', str(Path(DRIVE_ROOT)/'models')),
            '{LORAS_DIR}': globals().get('LORAS_DIR', str(Path(DRIVE_ROOT)/'models'/'loras')),
            '{GGUF_DIR}': globals().get('GGUF_DIR', str(Path(DRIVE_ROOT)/'models'/'gguf')),
            '{ARTIFACTS_DIR}': globals().get('ARTIFACTS_DIR', str(Path(DRIVE_ROOT)/'artifacts')),
        }
        for k,v in mapping.items():
            tpl = tpl.replace(k, v)
        return tpl

    current_preview = {'mid':None, 'entry':None, 'final_url':None, 'filename':None, 'size':None}

    def on_preview(b):
        with out:
            clear_output()
            mid = dropdown.value
            entry = manifest.get(str(mid))
            if not entry:
                print('Manifest entry not found for', mid)
                return
            token = os.environ.get('CIVITAI_API_TOKEN','')
            if not token:
                print('CIVITAI_API_TOKEN not set. Run the secure token cell first.')
                return
            url = entry.get('download_url','').replace('$CIVITAI_API_TOKEN', token)
            print('Resolving URL (following redirects)...')
            s = requests.Session()
            s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
            try:
                r = s.head(url, allow_redirects=True, timeout=20)
                if r.status_code >= 400:
                    r = s.get(url, stream=True, allow_redirects=True, timeout=20)
                    r.close()
                final_url = r.url
                cd = r.headers.get('Content-Disposition','')
                filename = None
                if cd:
                    m = re.search(r"filename\*=UTF-8''(.+)|filename=\"?([^\";]+)\"?", cd)
                    if m:
                        filename = m.group(1) or m.group(2)
                if not filename:
                    filename = final_url.split('/')[-1].split('?')[0]
                size = r.headers.get('Content-Length')
                if size:
                    size = int(size)
                else:
                    size = None
                print('Model', mid)
                print('Final URL:', final_url)
                print('Filename:', filename)
                if size:
                    mb = size/1024/1024
                    print(f'Content-Length: {size} bytes ({mb:.1f} MB)')
                    if mb > 2000:
                        print('\nWARNING: file is very large. Consider running on a VM with enough disk space.')
                else:
                    print('Content-Length header not present')
                # store preview info for confirm
                current_preview.update({'mid':mid, 'entry':entry, 'final_url':final_url, 'filename':filename, 'size':size})
            except Exception as e:
                print('Failed to preview URL:', e)

    def on_confirm(b):
        with out:
            clear_output()
            if not current_preview['mid']:
                print('No preview available — run Preview first')
                return
            entry = current_preview['entry']
            url = current_preview['final_url']
            filename = current_preview['filename']
            size = current_preview['size']
            # determine download dir
            tpl = entry.get('recommended_dir_template','{MODELS_DIR}')
            target_dir = Path(expand_dir_template(tpl))
            target_dir.mkdir(parents=True, exist_ok=True)
            target_path = target_dir/filename
            # Stream download
            s = requests.Session()
            s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
            print('Starting streaming download to', str(target_path))
            try:
                r = s.get(url, stream=True, timeout=30)
                r.raise_for_status()
                total = r.headers.get('Content-Length')
                if total:
                    total = int(total)
                downloaded = 0
                with open(target_path, 'wb') as fh:
                    for chunk in r.iter_content(chunk_size=1024*1024):
                        if chunk:
                            fh.write(chunk)
                            downloaded += len(chunk)
                            if total:
                                pct = downloaded/total*100
                                print(f'\rDownloaded {downloaded}/{total} bytes ({pct:.1f}%)', end='')
                print('\nDownload complete')
                # update manifest
                entry['downloaded_path'] = str(target_path)
                manifest[str(current_preview['mid'])] = entry
                MANIFEST_PATH.write_text(json.dumps(manifest, indent=2))
                print('Manifest updated with downloaded_path')
            except Exception as e:
                print('Download failed:', e)

    preview_btn.on_click(on_preview)
    confirm_btn.on_click(on_confirm)

    display(dropdown, preview_btn, confirm_btn, out)

In [None]:
#@title Manifest post-processor: infer formats from resolved filenames
"""
Reads the manifest, and for entries with 'resolved_filename' attempts to infer format
(safetensor, gguf, checkpoint, lora) based on extension and updates manifest['inferred_format'].
"""
import os, json
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

if not MANIFEST_PATH.exists():
    print('Manifest not found at', MANIFEST_PATH)
else:
    m = json.loads(MANIFEST_PATH.read_text())
    changed = False
    for mid, entry in m.items():
        fn = entry.get('resolved_filename') or entry.get('downloaded_path')
        if not fn:
            continue
        fn_lower = fn.lower()
        fmt = None
        if fn_lower.endswith('.safetensors'):
            fmt = 'SafeTensor'
        elif fn_lower.endswith('.gguf'):
            fmt = 'GGUF'
        elif fn_lower.endswith('.pth') or fn_lower.endswith('.pt') or fn_lower.endswith('.ckpt'):
            fmt = 'Checkpoint'
        elif fn_lower.endswith('.safetensors.pt'):
            fmt = 'SafeTensor'
        elif '.safetensors' in fn_lower:
            fmt = 'SafeTensor'
        elif fn_lower.endswith('.bin'):
            # could be HF bin; leave generic
            fmt = 'Binary'
        elif fn_lower.endswith('.lora') or 'lora' in fn_lower:
            fmt = 'LoRA'
        if fmt and entry.get('inferred_format') != fmt:
            entry['inferred_format'] = fmt
            m[mid] = entry
            changed = True
            print('Inferred', fmt, 'for model', mid, '->', fn)
    if changed:
        MANIFEST_PATH.write_text(json.dumps(m, indent=2))
        print('Manifest updated with inferred formats at', MANIFEST_PATH)
    else:
        print('No changes to manifest')

### Quick notes — interactive downloader

- Use the secure CIVITAI token cell before attempting downloads.
- Steps:
  1. Run the manifest resolver cell to populate `resolved_filename` for each model (uses token).
  2. Use the Interactive download runner to preview and confirm a single model download.
  3. The runner streams the file into Drive and updates the manifest with `downloaded_path`.
- If a model is gated beyond token access, download via browser and upload to Drive; then update the manifest manually.

In [None]:
#@title Colab-friendly interactive downloader (uses google.colab widgets when available)
"""
This cell provides a Colab-optimized UI for the interactive download runner.
It prefers Google Colab's native widget layer when available; otherwise it falls
back to ipywidgets. Behavior is similar to the earlier interactive runner:
- Select a model from manifest
- Preview final filename/size
- Confirm to download (stream into Drive)

Run after mounting Drive and setting CIVITAI_API_TOKEN via the secure input cell.
"""
import os, json, re, requests
from pathlib import Path
import hashlib

# UI imports (prefer google.colab widgets if available)
use_colab_widgets = False
try:
    import google.colab.widgets as colab_widgets
    from google.colab import output as colab_output
    use_colab_widgets = True
except Exception:
    try:
        import ipywidgets as widgets
        from IPython.display import display, clear_output
        use_colab_widgets = False
    except Exception:
        raise RuntimeError('No UI widget library available (need google.colab or ipywidgets)')

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

if not MANIFEST_PATH.exists():
    print('Manifest not found at', MANIFEST_PATH)
else:
    manifest = json.loads(MANIFEST_PATH.read_text())
    keys = sorted(manifest.keys(), key=lambda x:int(x))

    def expand_dir_template(tpl):
        mapping = {
            '{MODELS_DIR}': globals().get('MODELS_DIR', str(Path(DRIVE_ROOT)/'models')),
            '{LORAS_DIR}': globals().get('LORAS_DIR', str(Path(DRIVE_ROOT)/'models'/'loras')),
            '{GGUF_DIR}': globals().get('GGUF_DIR', str(Path(DRIVE_ROOT)/'models'/'gguf')),
            '{ARTIFACTS_DIR}': globals().get('ARTIFACTS_DIR', str(Path(DRIVE_ROOT)/'artifacts')),
        }
        for k,v in mapping.items():
            tpl = tpl.replace(k, v)
        return tpl

    # Common preview & download functions
    def resolve_preview(mid):
        entry = manifest.get(str(mid))
        token = os.environ.get('CIVITAI_API_TOKEN','')
        if not token:
            return {'error':'CIVITAI_API_TOKEN not set'}
        url = entry.get('download_url','').replace('$CIVITAI_API_TOKEN', token)
        s = requests.Session()
        s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
        try:
            r = s.head(url, allow_redirects=True, timeout=20)
            if r.status_code >= 400:
                r = s.get(url, stream=True, allow_redirects=True, timeout=20)
                r.close()
            final_url = r.url
            cd = r.headers.get('Content-Disposition','')
            filename = None
            if cd:
                m = re.search(r"filename\*=UTF-8''(.+)|filename=\"?([^\";]+)\"?", cd)
                if m:
                    filename = m.group(1) or m.group(2)
            if not filename:
                filename = final_url.split('/')[-1].split('?')[0]
            size = r.headers.get('Content-Length')
            if size:
                size = int(size)
            return {'final_url': final_url, 'filename': filename, 'size': size}
        except Exception as e:
            return {'error': str(e)}

    def stream_download(final_url, target_path):
        s = requests.Session()
        s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
        r = s.get(final_url, stream=True, timeout=30)
        r.raise_for_status()
        downloaded = 0
        total = r.headers.get('Content-Length')
        if total:
            total = int(total)
        with open(target_path, 'wb') as fh:
            for chunk in r.iter_content(chunk_size=1024*1024):
                if chunk:
                    fh.write(chunk)
                    downloaded += len(chunk)
        return target_path

    # Build UI depending on environment
    if use_colab_widgets:
        # very small colab-friendly UI using simple text inputs and buttons
        options = [(f"{k}  -> {manifest[k].get('resolved_filename','?')}", k) for k in keys]
        dd = colab_widgets.Dropdown(options=options, description='Model')
        preview_btn = colab_widgets.Button(description='Preview')
        download_btn = colab_widgets.Button(description='Confirm & Download')
        status_area = colab_widgets.Textarea(value='Ready', description='Status', layout={'width':'100%'})

        def on_preview(widget, event=None):
            mid = dd.value
            status_area.value = 'Resolving...'
            res = resolve_preview(mid)
            if 'error' in res:
                status_area.value = 'Error: ' + res['error']
            else:
                size = res['size']
                stext = f"Filename: {res['filename']}\nURL: {res['final_url']}\n"
                if size:
                    stext += f"Size: {size} bytes ({size/1024/1024:.1f} MB)\n"
                status_area.value = stext
                # stash preview info in widget attrs
                dd._preview = res

        def on_download(widget, event=None):
            if not hasattr(dd, '_preview'):
                status_area.value = 'Run Preview first'
                return
            res = dd._preview
            filename = res['filename']
            final_url = res['final_url']
            entry = manifest.get(str(dd.value))
            tpl = entry.get('recommended_dir_template','{MODELS_DIR}')
            target_dir = Path(expand_dir_template(tpl))
            target_dir.mkdir(parents=True, exist_ok=True)
            target_path = target_dir/filename
            status_area.value = f'Starting download to {target_path}\n'
            try:
                stream_download(final_url, str(target_path))
                status_area.value += 'Download complete\nComputing SHA256...'
                # compute sha256
                h = hashlib.sha256()
                with open(target_path, 'rb') as fh:
                    for chunk in iter(lambda: fh.read(1024*1024), b''):
                        h.update(chunk)
                sha = h.hexdigest()
                entry['downloaded_path'] = str(target_path)
                entry['sha256'] = sha
                entry['file_size_bytes'] = target_path.stat().st_size
                manifest[str(dd.value)] = entry
                MANIFEST_PATH.write_text(json.dumps(manifest, indent=2))
                status_area.value += f"\nSHA256: {sha}\nSaved and manifest updated."
            except Exception as e:
                status_area.value += '\nDownload failed: ' + str(e)

        preview_btn.on_click(lambda w: on_preview(w))
        download_btn.on_click(lambda w: on_download(w))
        display(dd, preview_btn, download_btn, status_area)

    else:
        # fallback to ipywidgets UI (previous behavior)
        import ipywidgets as widgets
        from IPython.display import display, clear_output
        options = []
        for k in keys:
            label = f"{k}: {manifest[k].get('suggested_formats')}"
            if manifest[k].get('resolved_filename'):
                label += f" -> {manifest[k].get('resolved_filename')}"
            options.append((label,k))
        dropdown = widgets.Dropdown(options=options, description='Model:')
        preview_btn = widgets.Button(description='Preview')
        confirm_btn = widgets.Button(description='Confirm & Download', button_style='danger')
        out = widgets.Output()

        current_preview = {'mid':None, 'entry':None, 'final_url':None, 'filename':None, 'size':None}

        def on_preview(b):
            with out:
                clear_output()
                mid = dropdown.value
                entry = manifest.get(str(mid))
                if not entry:
                    print('Manifest entry not found for', mid)
                    return
                token = os.environ.get('CIVITAI_API_TOKEN','')
                if not token:
                    print('CIVITAI_API_TOKEN not set. Run the secure token cell first.')
                    return
                url = entry.get('download_url','').replace('$CIVITAI_API_TOKEN', token)
                print('Resolving URL (following redirects)...')
                s = requests.Session()
                s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
                try:
                    r = s.head(url, allow_redirects=True, timeout=20)
                    if r.status_code >= 400:
                        r = s.get(url, stream=True, allow_redirects=True, timeout=20)
                        r.close()
                    final_url = r.url
                    cd = r.headers.get('Content-Disposition','')
                    filename = None
                    if cd:
                        m = re.search(r"filename\*=UTF-8''(.+)|filename=\"?([^\";]+)\"?", cd)
                        if m:
                            filename = m.group(1) or m.group(2)
                    if not filename:
                        filename = final_url.split('/')[-1].split('?')[0]
                    size = r.headers.get('Content-Length')
                    if size:
                        size = int(size)
                    print('Model', mid)
                    print('Final URL:', final_url)
                    print('Filename:', filename)
                    if size:
                        mb = size/1024/1024
                        print(f'Content-Length: {size} bytes ({mb:.1f} MB)')
                    else:
                        print('Content-Length header not present')
                    current_preview.update({'mid':mid, 'entry':entry, 'final_url':final_url, 'filename':filename, 'size':size})
                except Exception as e:
                    print('Failed to preview URL:', e)

        def on_confirm(b):
            with out:
                clear_output()
                if not current_preview['mid']:
                    print('No preview available — run Preview first')
                    return
                entry = current_preview['entry']
                url = current_preview['final_url']
                filename = current_preview['filename']
                tpl = entry.get('recommended_dir_template','{MODELS_DIR}')
                target_dir = Path(expand_dir_template(tpl))
                target_dir.mkdir(parents=True, exist_ok=True)
                target_path = target_dir/filename
                print('Starting streaming download to', str(target_path))
                try:
                    r = requests.get(url, stream=True, timeout=30)
                    r.raise_for_status()
                    with open(target_path, 'wb') as fh:
                        for chunk in r.iter_content(chunk_size=1024*1024):
                            if chunk:
                                fh.write(chunk)
                    print('Download complete — computing SHA256...')
                    # compute sha256
                    h = hashlib.sha256()
                    with open(target_path, 'rb') as fh:
                        for chunk in iter(lambda: fh.read(1024*1024), b''):
                            h.update(chunk)
                    sha = h.hexdigest()
                    entry['downloaded_path'] = str(target_path)
                    entry['sha256'] = sha
                    entry['file_size_bytes'] = target_path.stat().st_size
                    manifest[str(current_preview['mid'])] = entry
                    MANIFEST_PATH.write_text(json.dumps(manifest, indent=2))
                    print('SHA256:', sha)
                    print('Manifest updated with downloaded_path and sha256')
                except Exception as e:
                    print('Download failed:', e)

        preview_btn.on_click(on_preview)
        confirm_btn.on_click(on_confirm)
        display(dropdown, preview_btn, confirm_btn, out)

In [None]:
#@title Post-download integrity checker & download summary
"""
Scans the manifest for entries with `downloaded_path`, computes/validates SHA256 if present,
records any mismatches, and produces a simple download_summary.json with totals.
Run after you have performed downloads with the interactive downloader.
"""
import json, os, hashlib
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'
SUMMARY_PATH = Path(DRIVE_ROOT)/'manifests'/'download_summary.json'

if not MANIFEST_PATH.exists():
    print('Manifest not found at', MANIFEST_PATH)
else:
    manifest = json.loads(MANIFEST_PATH.read_text())
    summary = {
        'total_files': 0,
        'total_bytes': 0,
        'entries': {}
    }
    for mid,entry in manifest.items():
        dp = entry.get('downloaded_path')
        if not dp:
            continue
        p = Path(dp)
        if not p.exists():
            entry['download_status'] = 'missing'
            manifest[mid] = entry
            continue
        size = p.stat().st_size
        sha_expected = entry.get('sha256')
        # compute sha
        h = hashlib.sha256()
        with open(p, 'rb') as fh:
            for chunk in iter(lambda: fh.read(1024*1024), b''):
                h.update(chunk)
        sha_actual = h.hexdigest()
        ok = True
        if sha_expected and sha_expected != sha_actual:
            entry.setdefault('integrity',{})
            entry['integrity']['expected_sha256'] = sha_expected
            entry['integrity']['actual_sha256'] = sha_actual
            entry['download_status'] = 'sha_mismatch'
            ok = False
        else:
            entry.setdefault('integrity',{})
            entry['integrity']['actual_sha256'] = sha_actual
            entry['download_status'] = 'ok'
        entry['file_size_bytes'] = size
        manifest[mid] = entry

        summary['total_files'] += 1
        summary['total_bytes'] += size
        summary['entries'][mid] = {
            'path': str(p),
            'size_bytes': size,
            'sha256': sha_actual,
            'status': entry.get('download_status')
        }

    MANIFEST_PATH.write_text(json.dumps(manifest, indent=2))
    SUMMARY_PATH.write_text(json.dumps(summary, indent=2))
    print('Integrity check complete. Files:', summary['total_files'], 'Total bytes:', summary['total_bytes'])
    print('Summary written to', SUMMARY_PATH)

In [None]:
#@title Download summary UI — show totals and top N largest files
"""
Displays the download_summary.json created by the integrity checker in a friendly format.
Shows total files, total size, and the top-N largest downloaded files.
"""
import json, math
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
SUMMARY_PATH = Path(DRIVE_ROOT)/'manifests'/'download_summary.json'

if not SUMMARY_PATH.exists():
    print('No download_summary.json found. Run the Post-download integrity checker cell first.')
else:
    data = json.loads(SUMMARY_PATH.read_text())
    total_files = data.get('total_files',0)
    total_bytes = data.get('total_bytes',0)
    def human(n):
        for u in ['B','KB','MB','GB','TB']:
            if n < 1024:
                return f"{n:.2f}{u}"
            n /= 1024
        return f"{n:.2f}PB"
    print('Downloaded files:', total_files)
    print('Total size:', human(total_bytes))
    entries = data.get('entries',{})
    items = []
    for mid,info in entries.items():
        size = info.get('size_bytes',0)
        items.append((size, mid, info.get('path')))
    items.sort(reverse=True)
    topn = 10
    print('\nTop files:')
    for i,(size, mid, path) in enumerate(items[:topn], start=1):
        print(f"{i}. {mid} — {human(size)} — {path}")
    if len(items) > topn:
        print(f"... and {len(items)-topn} more files")


# ComfyUI Colab Playground — Overview

This notebook mounts Google Drive, installs ComfyUI into Drive, and provides a set of modular cells to:

- Install and update ComfyUI (Drive-persistent venv and wheel cache).
- Start ComfyUI and optionally expose it via cloudflared/localtunnel.
- Resolve and safely download models (Civitai/HF) with a preview step.
- Extract embedded ComfyUI flows from showcase images.
- Provide example flows (T2I/I2I/T2V/I2V, multi-LoRA and kohya-ss scaffolds).

New additions in this revision:

- Batch manifest resolver that can be run in your Colab session (uses in-memory CIVITAI_API_TOKEN).
- A unified Colab-friendly downloader with "download queue" mode, optional server-hash pre-check, progress reporting, and SHA256 validation post-download.
- Post-download integrity checker and a human-readable download summary.

Run order recommendation:
1. Mount Drive cell (if not already mounted) and confirm `DRIVE_ROOT`.
2. Run the secure token cell to set `CIVITAI_API_TOKEN` in this runtime.
3. Run the Batch manifest resolver cell to populate `resolved_filename` and other metadata.
4. Use the Unified Downloader cell to queue and download models (Preview, then Confirm).  
5. Run the Post-download integrity checker, then view the Download summary UI.

If you want a different UI layout or behavior (pure HTML panel, or Telegram/Slack webhook notifications for completed downloads), tell me and I can add it.  


In [2]:
#@title Batch manifest resolver — resolve final filenames and headers for manifest entries
"""
This cell iterates the manifest entries and attempts to resolve the final download URL, filename
(Content-Disposition), content-length, and server-provided checksum (if present in headers or JSON) without
downloading the full file. It will update the manifest with resolved_filename, resolved_url, content_length,
and server_hash (if found). Run this in Colab after running the secure token input cell.
"""
import os, json, re, time
from pathlib import Path
import requests

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

if not MANIFEST_PATH.exists():
    print('Manifest not found at', MANIFEST_PATH)
else:
    manifest = json.loads(MANIFEST_PATH.read_text())
    token = os.environ.get('CIVITAI_API_TOKEN','')
    if not token:
        print('CIVITAI_API_TOKEN not set in environment. Run the secure token cell first.')
    else:
        s = requests.Session()
        s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
        errors = []
        for mid, entry in manifest.items():
            url_tpl = entry.get('download_url','')
            url = url_tpl.replace('$CIVITAI_API_TOKEN', token)
            try:
                r = s.head(url, allow_redirects=True, timeout=20)
                # fallback to small GET if head fails
                if r.status_code >= 400:
                    r = s.get(url, stream=True, allow_redirects=True, timeout=20)
                    r.close()
                final_url = r.url
                cd = r.headers.get('Content-Disposition','')
                filename = None
                if cd:
                    m = re.search(r"filename\*=UTF-8''(.+)|filename=\"?([^\";]+)\"?", cd)
                    if m:
                        filename = m.group(1) or m.group(2)
                if not filename:
                    filename = final_url.split('/')[-1].split('?')[0]
                size = r.headers.get('Content-Length')
                if size:
                    size = int(size)
                # server-provided checksum heuristics
                server_hash = None
                # Check common header fields
                for h in ['ETag','Content-MD5','X-Checksum-Sha256','X-Checksum','x-amz-meta-sha256']:
                    if r.headers.get(h):
                        server_hash = r.headers.get(h)
                        break
                # If Content-Type is application/json and we did a small GET earlier, try to parse JSON
                # to look for 'sha256' or similar in body
                entry.update({
                    'resolved_url': final_url,
                    'resolved_filename': filename,
                    'content_length': size,
                    'server_hash': server_hash,
                    'last_resolved': int(time.time())
                })
                manifest[mid] = entry
                print(f"Resolved {mid} -> {filename} ({size} bytes)")
            except Exception as e:
                errors.append((mid,str(e)))
                print(f"Failed to resolve {mid}: {e}")
        MANIFEST_PATH.write_text(json.dumps(manifest, indent=2))
        print('Resolver complete. Wrote manifest. Errors:', len(errors))
        if errors:
            for mid,err in errors:
                print(mid, err)


Manifest not found at \content\drive\MyDrive\ComfyUI\manifests\civitai_model_manifests.json


In [None]:
#@title Unified Colab downloader — queue mode, preview, server-hash pre-check, progress
"""
Features:
- Queue multiple manifest entries for sequential download with progress.
- Preview mode that resolves final filenames and sizes.
- Optional server-hash pre-check: if the resolver found a server_hash (ETag or similar), compare
  that short hash with a best-effort mapping before downloading.
- Uses tqdm for progress display if available, otherwise prints simple progress messages.

Usage:
- Ensure `CIVITAI_API_TOKEN` is set in this runtime (secure token cell).
- Run the resolver cell first to fill resolved_filename and server_hash.
- Populate `queue_ids` with manifest keys to download, or use the UI dropdown/selection below.
"""
import os, json, re, time, hashlib
from pathlib import Path
import requests

try:
    from tqdm.notebook import tqdm
    have_tqdm = True
except Exception:
    have_tqdm = False

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

if not MANIFEST_PATH.exists():
    raise FileNotFoundError(f'Manifest not found at {MANIFEST_PATH}')

manifest = json.loads(MANIFEST_PATH.read_text())

# Simple helper: choose ids to download. Replace or populate programmatically.
# Example: queue_ids = ['1', '2']
queue_ids = []

# Optional: prefer server hash pre-check (will skip download if mismatch in header-derived simple hash)
ENABLE_SERVER_HASH_CHECK = True

s = requests.Session()
s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})


def compute_sha256(path):
    h = hashlib.sha256()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(1024*1024), b''):
            h.update(chunk)
    return h.hexdigest()


def download_with_progress(url, target_path):
    r = s.get(url, stream=True, timeout=30)
    r.raise_for_status()
    total = r.headers.get('Content-Length')
    if total:
        total = int(total)
    tmp = str(target_path) + '.part'
    with open(tmp, 'wb') as fh:
        if have_tqdm and total:
            with tqdm(total=total, unit='B', unit_scale=True, desc=str(target_path.name)) as pbar:
                for chunk in r.iter_content(chunk_size=1024*1024):
                    if chunk:
                        fh.write(chunk)
                        pbar.update(len(chunk))
        else:
            dl = 0
            for chunk in r.iter_content(chunk_size=1024*1024):
                if chunk:
                    fh.write(chunk)
                    dl += len(chunk)
    Path(tmp).rename(target_path)
    return target_path


def server_hash_matches(header_hash, path):
    # best-effort: compare ETag (may be quoted or not) to file's MD5/sha256 prefix
    if not header_hash:
        return None
    hh = header_hash.strip('"')
    # If header looks like md5 hex length 32, we can compute md5
    if re.fullmatch(r'[0-9a-fA-F]{32}', hh):
        import hashlib
        m = hashlib.md5()
        with open(path, 'rb') as fh:
            for chunk in iter(lambda: fh.read(1024*1024), b''):
                m.update(chunk)
        return m.hexdigest() == hh
    # If header looks like a sha256 hex (64 chars)
    if re.fullmatch(r'[0-9a-fA-F]{64}', hh):
        return compute_sha256(path) == hh
    # unknown format — return None meaning "can't decide"
    return None


def run_queue(ids):
    token = os.environ.get('CIVITAI_API_TOKEN','')
    if not token:
        raise RuntimeError('CIVITAI_API_TOKEN not set. Run the secure token cell.')
    for mid in ids:
        entry = manifest.get(str(mid))
        if not entry:
            print('Manifest entry missing for', mid)
            continue
        url_tpl = entry.get('download_url','')
        url = url_tpl.replace('$CIVITAI_API_TOKEN', token)
        # use resolved_url if present
        final_url = entry.get('resolved_url') or url
        filename = entry.get('resolved_filename') or final_url.split('/')[-1].split('?')[0]
        suggested_tpl = entry.get('recommended_dir_template','{MODELS_DIR}')
        mapping = {
            '{MODELS_DIR}': globals().get('MODELS_DIR', str(Path(DRIVE_ROOT)/'models')),
            '{LORAS_DIR}': globals().get('LORAS_DIR', str(Path(DRIVE_ROOT)/'models'/'loras')),
            '{GGUF_DIR}': globals().get('GGUF_DIR', str(Path(DRIVE_ROOT)/'models'/'gguf')),
            '{ARTIFACTS_DIR}': globals().get('ARTIFACTS_DIR', str(Path(DRIVE_ROOT)/'artifacts')),
        }
        for k,v in mapping.items():
            suggested_tpl = suggested_tpl.replace(k,v)
        target_dir = Path(suggested_tpl)
        target_dir.mkdir(parents=True, exist_ok=True)
        target_path = target_dir/filename
        # preview info
        print(f'Downloading {mid} -> {filename}')
        # if server hash exists and check enabled, we'll fetch headers and compare quickly after download
        header_hash = entry.get('server_hash')
        try:
            download_with_progress(final_url, target_path)
            sha = compute_sha256(target_path)
            entry['downloaded_path'] = str(target_path)
            entry['sha256'] = sha
            entry['file_size_bytes'] = target_path.stat().st_size
            # optional: post-download server-hash compare
            if ENABLE_SERVER_HASH_CHECK and header_hash:
                match = None
                try:
                    match = server_hash_matches(header_hash, target_path)
                except Exception as e:
                    print('Server-hash compare failed:', e)
                if match is False:
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = header_hash
                    entry['integrity']['server_hash_match'] = False
                    entry['download_status'] = 'server_hash_mismatch'
                    print('Warning: server hash mismatch for', mid)
                else:
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = header_hash
                    entry['integrity']['server_hash_match'] = True if match is True else 'unknown'
            else:
                entry['download_status'] = 'ok'
            manifest[str(mid)] = entry
            MANIFEST_PATH.write_text(json.dumps(manifest, indent=2))
            print('Downloaded and recorded:', target_path)
        except Exception as e:
            print('Failed to download', mid, e)

# If you want a small UI-based queue selector (Colab 'widgets' or ipywidgets), you can implement
# here; for now, run programmatically by setting queue_ids and calling run_queue(queue_ids)

print('Manifest keys available:', list(manifest.keys())[:20])
print('Set queue_ids = ["1","2"] and call run_queue(queue_ids)')


In [4]:
#@title Manifest locator helper — find possible manifests under DRIVE_ROOT
"""
If the Batch manifest resolver failed because the manifest file cannot be found,
this helper scans the `DRIVE_ROOT` area for probable manifest JSONs and prints
their paths. Run this after mounting Drive.
"""
from pathlib import Path
import json, os

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
print('DRIVE_ROOT =', DRIVE_ROOT)
root = Path(DRIVE_ROOT)
if not root.exists():
    print('Drive root does not exist. Did you mount Drive? Run the Drive mount cell first.')
else:
    candidates = []
    manifests_dir = root / 'manifests'
    if manifests_dir.exists():
        for p in manifests_dir.glob('*.json'):
            candidates.append(p)
    # wide search for likely filenames
    if not candidates:
        for p in root.rglob('*.json'):
            name = p.name.lower()
            if 'civitai' in name or 'manifest' in name or 'models' in name:
                candidates.append(p)
    # show top results
    if not candidates:
        print('No candidate manifest files found under', DRIVE_ROOT)
    else:
        print('Found candidate manifest files:')
        for i,p in enumerate(sorted(set(candidates)), start=1):
            print(f'{i}. {p} (size: {p.stat().st_size} bytes)')
        print('\nIf one of these is the manifest you want to use, you can:')
        print(' - Copy it to', str(manifests_dir/'civitai_model_manifests.json'))
        print(' - Or update the MANIFEST_PATH variable in the resolver/downloader cells to point to the file')
        print('\nExample:')
        print("MANIFEST_PATH = Path('" + str(sorted(set(candidates))[0]) + "')")


DRIVE_ROOT = /content/drive/MyDrive/ComfyUI
Drive root does not exist. Did you mount Drive? Run the Drive mount cell first.


## README (quick-run instructions)

This cell is a short-run README making the key steps explicit. Use it as a quick checklist.

1. Mount Google Drive (run the Drive mount cell) and ensure `DRIVE_ROOT` is set to your ComfyUI directory, e.g. `/content/drive/MyDrive/ComfyUI`.
2. Run the secure token input cell to put `CIVITAI_API_TOKEN` (and optionally AWS/GCS creds) in environment variables.
3. Run the Manifest locator helper if your manifest isn't at the default path.
4. Run the Batch manifest resolver to populate `resolved_filename` and `server_hash` fields.
5. Use the HTML "Downloader Panel" cell below to select entries, Start the queue, monitor progress, and Stop/Cancel if needed.
6. After downloads, run the Post-download integrity checker cell and then the Download summary UI cell.

Notes:
- The HTML panel sends selections to Python callbacks in this notebook. Progress and logs appear in the Python output area. Live updates inside the HTML panel are limited by Colab cross-call constraints, so the Python output is the authoritative log.
- Background scheduler: there's a cell below that can run the resolver periodically (for automated refresh). Be careful: Colab sessions can time out; only run this if your session will remain active.



In [None]:
#@title Downloader Panel (HTML multi-select + Start/Stop callbacks)
"""
Renders an HTML panel with checkboxes for manifest entries and Start/Stop buttons.
On Start it invokes the registered python callback to start the background download worker with selected ids.
"""
import json, os
from pathlib import Path
from google.colab import output

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

if not MANIFEST_PATH.exists():
    print('Manifest not found at', MANIFEST_PATH)
    print('Run the Manifest locator helper to locate or copy the manifest to the expected path.')
else:
    manifest = json.loads(MANIFEST_PATH.read_text())
    # build a simple table
    rows = []
    for k, entry in sorted(manifest.items(), key=lambda t: int(t[0])):
        name = entry.get('resolved_filename') or entry.get('title') or entry.get('name') or f'model-{k}'
        size = entry.get('content_length') or entry.get('file_size_bytes') or ''
        rows.append({'id':k,'label':str(name),'size':size})

    html_rows = '\n'.join([f"<div><label><input type=checkbox value='{r['id']}' /> {r['label']} <small>{r['size']}</small></label></div>" for r in rows])

    panel = f"""
    <div style='font-family: Roboto, Arial, sans-serif; padding: 8px; border: 1px solid #ddd; border-radius: 6px; width: 100%'>
      <h3>Downloader Panel</h3>
      <div id='manifest-list' style='max-height:300px; overflow:auto; padding:6px'>{html_rows}</div>
      <div style='margin-top:8px'>
        <button id='start-btn' style='padding:6px 12px; background:#0b74de; color:white; border:none; border-radius:4px'>Start Queue</button>
        <button id='stop-btn' style='padding:6px 12px; margin-left:8px; background:#d9534f; color:white; border:none; border-radius:4px'>Stop Queue</button>
        <label style='margin-left:12px'><input type='checkbox' id='retry-check' /> Enable retries</label>
        <label style='margin-left:12px'>Retries: <input id='retry-count' type='number' value='2' style='width:60px'></label>
      </div>
      <div id='panel-status' style='margin-top:10px; font-size:0.9em; color:#333'></div>
    </div>
    <script>
    const getChecked = () => Array.from(document.querySelectorAll('#manifest-list input:checked')).map(x=>x.value);
    document.getElementById('start-btn').onclick = () => {
        const ids = getChecked();
        const retry = document.getElementById('retry-check').checked;
        const rcount = parseInt(document.getElementById('retry-count').value || '2');
        document.getElementById('panel-status').innerText = 'Starting queue: ' + ids.join(', ');
        google.colab.kernel.invokeFunction('notebook.start_queue', [ids, retry, rcount], {});
    };
    document.getElementById('stop-btn').onclick = () => {
        document.getElementById('panel-status').innerText = 'Stopping queue...';
        google.colab.kernel.invokeFunction('notebook.stop_queue', [], {});
    };
    </script>
    """
    output.eval_js('console.clear()')
    display_html = output._repr_html_(panel)
    display(display_html)

# Register callbacks — the actual implementation is provided in the Background worker cell below

def _no_op(*args, **kwargs):
    print('Callback not registered yet')

# register placeholder so kernel.invokeFunction calls won't fail if background worker not loaded
output.register_callback('notebook.start_queue', _no_op)
output.register_callback('notebook.stop_queue', _no_op)
print('Downloader Panel rendered. Use the checkboxes, then Start Queue. Logs will appear in the Python output area.')

In [None]:
#@title Background queue worker and control functions (start/stop, retries, progress)
"""
This cell provides the background worker implementation and registers the callbacks
used by the HTML panel. The worker uses a stop event so the queue can be stopped.
It supports per-item retries and records results in the manifest.
"""
import threading, time, json, os, traceback
from pathlib import Path
import requests
import hashlib
from queue import Queue, Empty
from google.colab import output

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

# worker state
worker_thread = None
worker_stop_event = threading.Event()
worker_lock = threading.Lock()

# simple download helper reused from previous cells
def compute_sha256(path):
    h = hashlib.sha256()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(1024*1024), b''):
            h.update(chunk)
    return h.hexdigest()


# improved header hash normalization & heuristics
def normalize_header_hash(hdr):
    if not hdr:
        return None
    h = str(hdr).strip().strip('"')
    # if it's an etag with dashes (multipart) — can't reliably compare
    if '-' in h:
        return {'value':h, 'type':'etag-multipart'}
    if len(h) == 32 and all(c in '0123456789abcdefABCDEF' for c in h):
        return {'value':h.lower(), 'type':'md5'}
    if len(h) == 64 and all(c in '0123456789abcdefABCDEF' for c in h):
        return {'value':h.lower(), 'type':'sha256'}
    return {'value':h, 'type':'unknown'}


# robust per-item downloader with retries
def download_item(mid, entry, retry_count=2, stop_event=None):
    token = os.environ.get('CIVITAI_API_TOKEN','')
    s = requests.Session()
    s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
    url_tpl = entry.get('download_url')
    if not url_tpl:
        print(f'No download_url for {mid}')
        return False
    url = url_tpl.replace('$CIVITAI_API_TOKEN', token)
    final_url = entry.get('resolved_url') or url
    filename = entry.get('resolved_filename') or final_url.split('/')[-1].split('?')[0]
    suggested_tpl = entry.get('recommended_dir_template','{MODELS_DIR}')
    mapping = {
        '{MODELS_DIR}': globals().get('MODELS_DIR', str(Path(DRIVE_ROOT)/'models')),
        '{LORAS_DIR}': globals().get('LORAS_DIR', str(Path(DRIVE_ROOT)/'models'/'loras')),
        '{GGUF_DIR}': globals().get('GGUF_DIR', str(Path(DRIVE_ROOT)/'models'/'gguf')),
        '{ARTIFACTS_DIR}': globals().get('ARTIFACTS_DIR', str(Path(DRIVE_ROOT)/'artifacts')),
    }
    for k,v in mapping.items():
        suggested_tpl = suggested_tpl.replace(k,v)
    target_dir = Path(suggested_tpl)
    target_dir.mkdir(parents=True, exist_ok=True)
    target_path = target_dir/filename

    attempts = 0
    while attempts <= retry_count:
        if stop_event and stop_event.is_set():
            print('Stop requested — aborting', mid)
            return False
        try:
            r = s.get(final_url, stream=True, timeout=60)
            r.raise_for_status()
            tmp = str(target_path) + '.part'
            with open(tmp, 'wb') as fh:
                for chunk in r.iter_content(chunk_size=1024*1024):
                    if chunk:
                        fh.write(chunk)
            Path(tmp).rename(target_path)
            sha = compute_sha256(target_path)
            entry['downloaded_path'] = str(target_path)
            entry['sha256'] = sha
            entry['file_size_bytes'] = target_path.stat().st_size
            entry['download_status'] = 'ok'
            # server hash check heuristics
            hdr = entry.get('server_hash')
            sh = normalize_header_hash(hdr)
            if sh:
                if sh['type'] == 'sha256':
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = sh['value']
                    entry['integrity']['server_hash_match'] = (sha == sh['value'])
                elif sh['type'] == 'md5':
                    import hashlib
                    m = hashlib.md5()
                    with open(target_path, 'rb') as fh:
                        for chunk in iter(lambda: fh.read(1024*1024), b''):
                            m.update(chunk)
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = sh['value']
                    entry['integrity']['server_hash_match'] = (m.hexdigest() == sh['value'])
                else:
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = sh['value']
                    entry['integrity']['server_hash_match'] = 'unknown'
            # persist manifest incrementally
            with open(MANIFEST_PATH, 'r', encoding='utf-8') as fh:
                m = json.load(fh)
            m[str(mid)] = entry
            with open(MANIFEST_PATH, 'w', encoding='utf-8') as fh:
                json.dump(m, fh, indent=2)
            print('Downloaded', mid, '->', target_path)
            return True
        except Exception as e:
            attempts += 1
            print(f'Attempt {attempts} failed for {mid}:', e)
            if attempts > retry_count:
                entry['download_status'] = 'failed'
                with open(MANIFEST_PATH, 'r', encoding='utf-8') as fh:
                    m = json.load(fh)
                m[str(mid)] = entry
                with open(MANIFEST_PATH, 'w', encoding='utf-8') as fh:
                    json.dump(m, fh, indent=2)
                return False
            time.sleep(2)
    return False

# worker runner

def worker_main(id_list, retry_count=2, stop_event=None):
    print('Worker started for ids:', id_list)
    for mid in id_list:
        if stop_event and stop_event.is_set():
            print('Stop event detected — exiting worker')
            break
        try:
            with open(MANIFEST_PATH, 'r', encoding='utf-8') as fh:
                m = json.load(fh)
            entry = m.get(str(mid))
            if not entry:
                print('No manifest entry for', mid)
                continue
            success = download_item(mid, entry, retry_count=retry_count, stop_event=stop_event)
            if not success:
                print('Failed to download', mid)
        except Exception as e:
            print('Worker error for', mid, e)
            traceback.print_exc()
    print('Worker finished')

# control functions registered as kernel callbacks

def start_queue_callback(ids, enable_retries=False, retry_count=2):
    global worker_thread, worker_stop_event
    with worker_lock:
        if worker_thread and worker_thread.is_alive():
            print('A worker is already running')
            return
        worker_stop_event = threading.Event()
        worker_thread = threading.Thread(target=worker_main, args=(ids, retry_count, worker_stop_event), daemon=True)
        worker_thread.start()
        print('Started worker thread with ids:', ids)


def stop_queue_callback():
    global worker_thread, worker_stop_event
    with worker_lock:
        if worker_thread and worker_thread.is_alive():
            worker_stop_event.set()
            worker_thread.join(timeout=5)
            print('Worker stopped')
        else:
            print('No active worker')

# register callbacks so the HTML panel can call them
output.register_callback('notebook.start_queue', start_queue_callback)
output.register_callback('notebook.stop_queue', stop_queue_callback)
print('Background worker callbacks registered: notebook.start_queue, notebook.stop_queue')

In [None]:
#@title Nightly manifest resolver (background scheduler)
"""
Starts a background scheduler that runs the Batch manifest resolver every `interval_seconds`.
Note: Colab sessions often time out — only use this if your session stays alive.
"""
import threading, time, json
from pathlib import Path
from google.colab import output

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'

scheduler_thread = None
scheduler_stop_event = threading.Event()


def run_resolver_once():
    # import code from the resolver cell or re-implement minimal resolver here
    import requests, re, time
    if not MANIFEST_PATH.exists():
        print('Manifest missing, cannot run resolver')
        return
    m = json.loads(MANIFEST_PATH.read_text())
    token = os.environ.get('CIVITAI_API_TOKEN','')
    if not token:
        print('CIVITAI_API_TOKEN not set. Run secure token cell first.')
        return
    s = requests.Session()
    s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
    for mid,entry in m.items():
        try:
            url = entry.get('download_url','').replace('$CIVITAI_API_TOKEN', token)
            r = s.head(url, allow_redirects=True, timeout=20)
            if r.status_code >= 400:
                r = s.get(url, stream=True, allow_redirects=True, timeout=20)
                r.close()
            final_url = r.url
            cd = r.headers.get('Content-Disposition','')
            filename = None
            if cd:
                mm = re.search(r"filename\*=UTF-8''(.+)|filename=\"?([^\";]+)\"?", cd)
                if mm:
                    filename = mm.group(1) or mm.group(2)
            if not filename:
                filename = final_url.split('/')[-1].split('?')[0]
            size = r.headers.get('Content-Length')
            if size:
                size = int(size)
            server_hash = None
            for h in ['ETag','Content-MD5','X-Checksum-Sha256','X-Checksum','x-amz-meta-sha256']:
                if r.headers.get(h):
                    server_hash = r.headers.get(h)
                    break
            entry.update({'resolved_url':final_url,'resolved_filename':filename,'content_length':size,'server_hash':server_hash,'last_resolved':int(time.time())})
            m[mid] = entry
        except Exception as e:
            print('Resolver failed for', mid, e)
    MANIFEST_PATH.write_text(json.dumps(m, indent=2))
    print('Resolver pass complete')


def scheduler_start(interval_seconds=24*3600):
    global scheduler_thread, scheduler_stop_event
    if scheduler_thread and scheduler_thread.is_alive():
        print('Scheduler already running')
        return
    scheduler_stop_event = threading.Event()
    def loop():
        while not scheduler_stop_event.is_set():
            run_resolver_once()
            # sleep with small increments to allow stop signal to be responsive
            remaining = interval_seconds
            while remaining > 0 and not scheduler_stop_event.is_set():
                t = min(10, remaining)
                time.sleep(t)
                remaining -= t
    scheduler_thread = threading.Thread(target=loop, daemon=True)
    scheduler_thread.start()
    print('Scheduler started with interval', interval_seconds)


def scheduler_stop():
    global scheduler_stop_event
    if scheduler_stop_event:
        scheduler_stop_event.set()
        print('Scheduler stop requested')
    else:
        print('Scheduler not running')

# register callbacks so user can start/stop from UI if desired
output.register_callback('notebook.start_scheduler', lambda interval: scheduler_start(interval))
output.register_callback('notebook.stop_scheduler', lambda : scheduler_stop())
print('Scheduler helpers registered (start_scheduler, stop_scheduler)')

In [None]:
#@title Artifact upload helper (S3 / GCS) — optional: upload completed artifacts to private bucket
"""
Upload a downloaded artifact to S3 or GCS. If credentials are present in environment, the helper will attempt
an upload and record the uploaded URL into the manifest under 'uploaded_url'.
Set AWS_* env vars for S3 or set GOOGLE_APPLICATION_CREDENTIALS for GCS.
"""
import os, json
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'


def upload_artifact(path, provider='s3', bucket=None, object_name=None):
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(path)
    if provider == 's3':
        try:
            import boto3
            s3 = boto3.client('s3')
            if not bucket:
                raise ValueError('bucket required for s3')
            obj = object_name or p.name
            s3.upload_file(str(p), bucket, obj)
            url = f's3://{bucket}/{obj}'
            return url
        except Exception as e:
            print('S3 upload failed:', e)
            raise
    elif provider == 'gcs':
        try:
            from google.cloud import storage
            if not bucket:
                raise ValueError('bucket required for gcs')
            client = storage.Client()
            bucket_obj = client.bucket(bucket)
            blob = bucket_obj.blob(object_name or p.name)
            blob.upload_from_filename(str(p))
            return f'gs://{bucket}/{blob.name}'
        except Exception as e:
            print('GCS upload failed:', e)
            raise
    else:
        raise ValueError('Unknown provider')


def upload_and_record(mid, provider='s3', bucket=None, object_name=None):
    if not MANIFEST_PATH.exists():
        raise FileNotFoundError('Manifest missing')
    m = json.loads(MANIFEST_PATH.read_text())
    entry = m.get(str(mid))
    if not entry or not entry.get('downloaded_path'):
        raise ValueError('No downloaded_path for manifest entry')
    path = entry['downloaded_path']
    url = upload_artifact(path, provider=provider, bucket=bucket, object_name=object_name)
    entry['uploaded_url'] = url
    m[str(mid)] = entry
    MANIFEST_PATH.write_text(json.dumps(m, indent=2))
    print('Uploaded', mid, '->', url)
    return url

print('Upload helper loaded. Use upload_and_record(mid, provider, bucket) to upload and record in manifest.')

In [None]:
#@title Advanced persistent queue worker (resume, backoff retries, progress persistence)
"""
This worker improves upon the earlier implementation by:
- Persisting queue state to DRIVE_ROOT/.queue_state.json so the queue can be resumed after kernel reconnects
- Per-item progress tracking (bytes downloaded and percent when content-length present)
- Exponential backoff retry strategy configurable per-call
- get_queue_status callback to return the current queue state for the UI to poll

Usage: use the rich HTML dashboard below which polls `notebook.get_queue_status` and calls
`notebook.start_queue` and `notebook.stop_queue` to control the worker.
"""
import threading, time, json, os, traceback
from pathlib import Path
import requests
import hashlib
from google.colab import output

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = Path(DRIVE_ROOT)/'manifests'/'civitai_model_manifests.json'
QUEUE_STATE_PATH = Path(DRIVE_ROOT)/'.queue_state.json'

worker_thread = None
worker_stop_event = threading.Event()
worker_lock = threading.Lock()

# Helper: read/write queue state

def load_queue_state():
    if QUEUE_STATE_PATH.exists():
        try:
            return json.loads(QUEUE_STATE_PATH.read_text())
        except Exception:
            return {'items':{}}
    return {'items':{}}

def write_queue_state(state):
    QUEUE_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    QUEUE_STATE_PATH.write_text(json.dumps(state, indent=2))

# initialize state if missing
if not QUEUE_STATE_PATH.exists():
    write_queue_state({'items':{}})

# improved header normalization

def normalize_header_hash(hdr):
    if not hdr:
        return None
    h = str(hdr).strip().strip('"')
    if '-' in h:
        return {'value':h,'type':'etag-multipart'}
    if len(h) == 32 and all(c in '0123456789abcdefABCDEF' for c in h):
        return {'value':h.lower(),'type':'md5'}
    if len(h) == 64 and all(c in '0123456789abcdefABCDEF' for c in h):
        return {'value':h.lower(),'type':'sha256'}
    # provider-specific heuristics (very basic): strip surrounding 'W/' or other prefixes
    h2 = h.replace('W/','').strip()
    if len(h2) in (32,64) and all(c in '0123456789abcdefABCDEF' for c in h2):
        t = 'md5' if len(h2)==32 else 'sha256'
        return {'value':h2.lower(),'type':t}
    return {'value':h,'type':'unknown'}

# download with progress and state updates

def compute_sha256(path):
    h = hashlib.sha256()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(1024*1024), b''):
            h.update(chunk)
    return h.hexdigest()


def update_item_state(mid, **kwargs):
    state = load_queue_state()
    items = state.setdefault('items',{})
    item = items.setdefault(str(mid), {})
    item.update(kwargs)
    write_queue_state(state)


def download_item_with_progress(mid, entry, retry_count=2, stop_event=None):
    token = os.environ.get('CIVITAI_API_TOKEN','')
    s = requests.Session()
    s.headers.update({'User-Agent':'ComfyUI-Playground/1.0'})
    url_tpl = entry.get('download_url','')
    if not url_tpl:
        update_item_state(mid, status='no_url')
        return False
    url = url_tpl.replace('$CIVITAI_API_TOKEN', token)
    final_url = entry.get('resolved_url') or url
    filename = entry.get('resolved_filename') or final_url.split('/')[-1].split('?')[0]
    suggested_tpl = entry.get('recommended_dir_template','{MODELS_DIR}')
    mapping = {
        '{MODELS_DIR}': globals().get('MODELS_DIR', str(Path(DRIVE_ROOT)/'models')),
        '{LORAS_DIR}': globals().get('LORAS_DIR', str(Path(DRIVE_ROOT)/'models'/'loras')),
        '{GGUF_DIR}': globals().get('GGUF_DIR', str(Path(DRIVE_ROOT)/'models'/'gguf')),
        '{ARTIFACTS_DIR}': globals().get('ARTIFACTS_DIR', str(Path(DRIVE_ROOT)/'artifacts')),
    }
    for k,v in mapping.items():
        suggested_tpl = suggested_tpl.replace(k,v)
    target_dir = Path(suggested_tpl)
    target_dir.mkdir(parents=True, exist_ok=True)
    target_path = target_dir/filename

    attempts = 0
    backoff_base = 2
    while attempts <= retry_count:
        if stop_event and stop_event.is_set():
            update_item_state(mid, status='cancelled')
            return False
        try:
            update_item_state(mid, status='starting', attempts=attempts+1, filename=str(target_path))
            r = s.get(final_url, stream=True, timeout=60)
            r.raise_for_status()
            total = r.headers.get('Content-Length')
            if total:
                total = int(total)
            tmp = str(target_path) + '.part'
            downloaded = 0
            with open(tmp, 'wb') as fh:
                for chunk in r.iter_content(chunk_size=256*1024):
                    if chunk:
                        fh.write(chunk)
                        downloaded += len(chunk)
                        # update progress
                        if total:
                            pct = int(downloaded*100/total)
                        else:
                            pct = None
                        update_item_state(mid, status='downloading', bytes=downloaded, bytes_total=total, percent=pct)
                        if stop_event and stop_event.is_set():
                            update_item_state(mid, status='cancelled')
                            return False
            Path(tmp).rename(target_path)
            sha = compute_sha256(target_path)
            entry['downloaded_path'] = str(target_path)
            entry['sha256'] = sha
            entry['file_size_bytes'] = target_path.stat().st_size
            entry['download_status'] = 'ok'
            # server hash check
            hdr = entry.get('server_hash')
            sh = normalize_header_hash(hdr)
            if sh:
                if sh['type'] == 'sha256':
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = sh['value']
                    entry['integrity']['server_hash_match'] = (sha == sh['value'])
                elif sh['type'] == 'md5':
                    import hashlib as _hashlib
                    m = _hashlib.md5()
                    with open(target_path, 'rb') as fh:
                        for chunk in iter(lambda: fh.read(1024*1024), b''):
                            m.update(chunk)
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = sh['value']
                    entry['integrity']['server_hash_match'] = (m.hexdigest() == sh['value'])
                else:
                    entry.setdefault('integrity',{})
                    entry['integrity']['server_hash'] = sh['value']
                    entry['integrity']['server_hash_match'] = 'unknown'
            # persist manifest incrementally
            if MANIFEST_PATH.exists():
                try:
                    with open(MANIFEST_PATH, 'r', encoding='utf-8') as fh:
                        m = json.load(fh)
                except Exception:
                    m = {}
            else:
                m = {}
            m[str(mid)] = entry
            with open(MANIFEST_PATH, 'w', encoding='utf-8') as fh:
                json.dump(m, fh, indent=2)
            update_item_state(mid, status='ok', sha256=sha, file_size=entry['file_size_bytes'])
            return True
        except Exception as e:
            attempts += 1
            last_err = str(e)
            update_item_state(mid, status='error', last_error=last_err, attempts=attempts)
            if attempts > retry_count:
                update_item_state(mid, status='failed', last_error=last_err)
                # persist failed state in manifest
                if MANIFEST_PATH.exists():
                    try:
                        with open(MANIFEST_PATH, 'r', encoding='utf-8') as fh:
                            m = json.load(fh)
                    except Exception:
                        m = {}
                    ent = m.get(str(mid), {})
                    ent['download_status'] = 'failed'
                    ent.setdefault('integrity',{})
                    ent['integrity']['last_error'] = last_err
                    m[str(mid)] = ent
                    with open(MANIFEST_PATH, 'w', encoding='utf-8') as fh:
                        json.dump(m, fh, indent=2)
                return False
            # exponential backoff
            sleep_for = backoff_base ** attempts
            update_item_state(mid, status='retrying', next_wait=sleep_for)
            time.sleep(sleep_for)
    return False

# worker loop that persists state

def worker_main(ids, retry_count=2, stop_event=None):
    print('Worker started for ids:', ids)
    state = load_queue_state()
    for mid in ids:
        if stop_event and stop_event.is_set():
            print('Stop requested — exiting worker')
            break
        # initialize state entry
        update_item_state(mid, status='queued', attempts=0)
    for mid in ids:
        # load latest manifest entry
        try:
            if MANIFEST_PATH.exists():
                with open(MANIFEST_PATH, 'r', encoding='utf-8') as fh:
                    manifest = json.load(fh)
            else:
                manifest = {}
            entry = manifest.get(str(mid), {})
            download_item_with_progress(mid, entry, retry_count=retry_count, stop_event=stop_event)
        except Exception as e:
            print('Worker error for', mid, e)
            traceback.print_exc()
    print('Worker finished')

# control functions

def start_queue_callback(ids, enable_retries=False, retry_count=2):
    global worker_thread, worker_stop_event
    with worker_lock:
        if worker_thread and worker_thread.is_alive():
            print('A worker is already running')
            return
        worker_stop_event = threading.Event()
        # ensure ids is a simple list of strings
        ids = [str(x) for x in ids]
        # persist initial queue state and items order
        st = load_queue_state()
        st['order'] = ids
        for i in ids:
            st.setdefault('items',{})
            st['items'].setdefault(str(i), {'status':'queued'})
        write_queue_state(st)
        worker_thread = threading.Thread(target=worker_main, args=(ids, retry_count, worker_stop_event), daemon=True)
        worker_thread.start()
        print('Started worker thread with ids:', ids)


def stop_queue_callback():
    global worker_thread, worker_stop_event
    with worker_lock:
        if worker_thread and worker_thread.is_alive():
            worker_stop_event.set()
            worker_thread.join(timeout=5)
            print('Worker stopped')
        else:
            print('No active worker')


def get_queue_status_callback():
    st = load_queue_state()
    return st

# register callbacks
output.register_callback('notebook.start_queue', start_queue_callback)
output.register_callback('notebook.stop_queue', stop_queue_callback)
output.register_callback('notebook.get_queue_status', get_queue_status_callback)
print('Advanced persistent worker callbacks registered: start_queue, stop_queue, get_queue_status')

In [None]:
#@title Downloader Dashboard — rich HTML UI with per-item progress
"""
Renders a richer HTML dashboard that polls `notebook.get_queue_status` for per-item progress
and displays progress bars. It also exposes Start/Stop and retry controls.

Note: polling frequency is limited to avoid spamming the kernel. The authoritative logs
and the final state are saved to DRIVE_ROOT/.queue_state.json — the UI reflects that state.
"""
from google.colab import output
from IPython.display import display, HTML
import json

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
MANIFEST_PATH = f"{DRIVE_ROOT}/manifests/civitai_model_manifests.json"

# Build the HTML template with a placeholder for the list; the UI will ask Python for the manifest
html = r'''
<div style="font-family: Roboto, Arial; padding:10px; border:1px solid #ddd; border-radius:6px;">
  <h3>Downloader Dashboard</h3>
  <div style="margin-bottom:8px">
    <button id="dd-start">Start Selected</button>
    <button id="dd-stop">Stop</button>
    <label style="margin-left:10px">Retries: <input id="dd-retries" type="number" value="2" style="width:50px"></label>
  </div>
  <div id="dd-list" style="max-height:360px; overflow:auto; border-top:1px solid #eee; padding-top:8px"></div>
  <div id="dd-status" style="margin-top:8px; font-size:0.9em; color:#333"></div>
</div>
<script>
async function fetchManifest() {
  // request the Python side for current manifest keys + resolved filenames
  const resp = await google.colab.kernel.invokeFunction('notebook.get_queue_status', [], {});
  const state = resp.data['application/json'];
  const items = state.items || {};
  const list = document.getElementById('dd-list');
  list.innerHTML = '';
  for (const id of Object.keys(items)){
    const it = items[id];
    const div = document.createElement('div');
    div.style.padding = '6px 4px';
    div.style.borderBottom = '1px solid #f4f4f4';
    const title = document.createElement('div');
    title.innerHTML = `<strong>${id}</strong> — ${it.filename || ''}`;
    div.appendChild(title);
    const bar = document.createElement('div');
    bar.style.height = '10px';
    bar.style.width = '100%';
    bar.style.background = '#eee';
    bar.style.borderRadius = '6px';
    const inner = document.createElement('div');
    inner.style.height = '100%';
    inner.style.width = (it.percent? it.percent+'%': '0%');
    inner.style.background = (it.status==='ok'? '#4caf50' : (it.status==='failed'? '#d9534f' : '#0b74de'));
    inner.style.borderRadius = '6px';
    inner.style.transition = 'width 0.4s ease';
    bar.appendChild(inner);
    div.appendChild(bar);
    const meta = document.createElement('div');
    meta.style.fontSize='0.85em';
    meta.style.color='#666';
    meta.innerText = `Status: ${it.status || 'n/a'} | Attempts: ${it.attempts || 0} | ${it.bytes || 0}/${it.bytes_total || 0}`;
    div.appendChild(meta);
    list.appendChild(div);
  }
}

// initial poll
fetchManifest();
// poll periodically
setInterval(fetchManifest, 2000);

// start/stop handlers with simple selection of queued ids (transition: select all with status queued)
document.getElementById('dd-start').onclick = async () => {
  const reps = await google.colab.kernel.invokeFunction('notebook.get_queue_status', [], {});
  const st = reps.data['application/json'];
  const ids = Object.keys(st.items || {});
  const retries = parseInt(document.getElementById('dd-retries').value || '2');
  google.colab.kernel.invokeFunction('notebook.start_queue', [ids, true, retries], {});
  document.getElementById('dd-status').innerText = 'Started queue for ids: ' + ids.join(', ');
};

document.getElementById('dd-stop').onclick = () => {
  google.colab.kernel.invokeFunction('notebook.stop_queue', [], {});
  document.getElementById('dd-status').innerText = 'Stop requested';
};
</script>
'''

display(HTML(html))
print('Downloader Dashboard rendered — it polls queue state and shows progress bars. Use Start/Stop to control the worker.')

In [None]:
#@title Guarded subprocess utilities (replace shell-magic with subprocess wrappers)
"""
Provides helper functions to run shell commands in a guarded way so static checkers
and Python linters don't complain about Jupyter bang (!) commands. These helpers
capture stdout/stderr, enforce timeouts, and optionally run as background processes.
"""
import subprocess, shlex, threading, time
from pathlib import Path


def run_cmd(cmd, timeout=600, shell=False, env=None, cwd=None):
    """Run a command and return (returncode, stdout, stderr)."""
    if isinstance(cmd, str) and not shell:
        cmd = shlex.split(cmd)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env, cwd=cwd, shell=shell)
    try:
        out, err = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        out, err = proc.communicate()
        return proc.returncode or -9, out, err
    return proc.returncode, out, err


def run_cmd_background(cmd, on_stdout=None, on_stderr=None, env=None, cwd=None):
    """Run a command in background and stream stdout/stderr to callbacks.
    Returns a handle with a terminate() method."""
    if isinstance(cmd, str):
        cmd = shlex.split(cmd)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env, cwd=cwd)
    stopped = {'stop': False}

    def reader(pipe, cb):
        try:
            for line in iter(pipe.readline, ''):
                if cb:
                    cb(line)
                if stopped['stop']:
                    break
        finally:
            pipe.close()

    t_out = threading.Thread(target=reader, args=(proc.stdout, on_stdout), daemon=True)
    t_err = threading.Thread(target=reader, args=(proc.stderr, on_stderr), daemon=True)
    t_out.start(); t_err.start()

    def terminate():
        stopped['stop'] = True
        try:
            proc.terminate()
        except Exception:
            pass
    return {'proc': proc, 'terminate': terminate}

print('Guarded subprocess utilities loaded (run_cmd, run_cmd_background)')

In [None]:
#@title SQLite-backed queue store & per-item cancel API
"""
Implements a small SQLite-backed queue to persist items and allow per-item cancellations.
This cell exposes simple helpers to enqueue items, cancel them by id, and query the queue.
It integrates with the advanced worker by reading the SQLite queue if present.
"""
import sqlite3
from pathlib import Path
import json, time

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
DB_PATH = Path(DRIVE_ROOT)/'comfyui_queue.db'
DB_PATH.parent.mkdir(parents=True, exist_ok=True)

conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
conn.execute('''CREATE TABLE IF NOT EXISTS queue (
    id TEXT PRIMARY KEY,
    manifest_key TEXT,
    status TEXT,
    attempts INTEGER DEFAULT 0,
    last_error TEXT,
    created_at REAL,
    updated_at REAL
)''')
conn.commit()


def enqueue_item(manifest_key):
    mid = str(int(time.time()*1000))
    now = time.time()
    conn.execute('INSERT OR REPLACE INTO queue (id,manifest_key,status,attempts,created_at,updated_at) VALUES (?,?,?,?,?,?)', (mid, str(manifest_key), 'queued', 0, now, now))
    conn.commit()
    return mid


def cancel_item(id):
    conn.execute('UPDATE queue SET status=?, updated_at=? WHERE id=?', ('cancelled', time.time(), id))
    conn.commit()


def get_queue_items(limit=100):
    cur = conn.execute('SELECT id, manifest_key, status, attempts, last_error, created_at, updated_at FROM queue ORDER BY created_at LIMIT ?', (limit,))
    rows = cur.fetchall()
    items = []
    for r in rows:
        items.append({'id':r[0],'manifest_key':r[1],'status':r[2],'attempts':r[3],'last_error':r[4],'created_at':r[5],'updated_at':r[6]})
    return items

print('SQLite queue helper loaded. DB at', DB_PATH)
print('Useful functions: enqueue_item(manifest_key), cancel_item(id), get_queue_items()')

In [None]:
#@title Prompt-to-Image HTML UI + ComfyUI runner helper
"""
This UI now uses the composer to build a ComfyUI-style flow and returns the generated flow JSON
so the front-end can preview it before the user decides to execute or save it.
It accepts an optional `prompt_template` parameter (default='{prompt}') so callers can inject
transcriptions or other text into a templated prompt (e.g. "A photorealistic portrait of {prompt}").
"""
from IPython.display import display, HTML
from google.colab import output
import json, os, time

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
FLOWS_DIR = os.path.join(DRIVE_ROOT, 'flows')
os.makedirs(FLOWS_DIR, exist_ok=True)

# fetch available flow files
flows = []
for f in os.listdir(FLOWS_DIR):
    if f.endswith('.json'):
        flows.append(f)

# basic HTML UI
html = f'''
<div style="font-family: Roboto, Arial; padding:8px; border:1px solid #ddd; border-radius:6px;">
  <h3>Prompt → Image</h3>
  <div>
    <label>Prompt</label><br>
    <textarea id="p-prompt" style="width:98%; height:80px"></textarea>
  </div>
  <div style="margin-top:6px">
    <label>Model (manifest key)</label><br>
    <input id="p-model" style="width:200px" placeholder="manifest key or model name" />
    <label style="margin-left:10px">Workflow</label>
    <select id="p-flow">
      <option value="">-- choose saved flow --</option>
      {''.join([f"<option value='{f}'>{f}</option>" for f in flows])}
    </select>
  </div>
  <div style="margin-top:6px">
    <label>Sampler</label>
    <input id="p-sampler" value="DDIM" style="width:120px">
    <label style="margin-left:8px">Steps</label>
    <input id="p-steps" value="20" style="width:80px">
    <label style="margin-left:8px">Seed</label>
    <input id="p-seed" value="-1" style="width:100px">
    <label style="margin-left:8px"><input type=checkbox id='p-upscale' /> Upscale</label>
  </div>
  <div style="margin-top:6px">
    <label style="margin-right:8px">Prompt template</label>
    <input id="p-template" value="{prompt}" style="width:60%" />
  </div>
  <div style="margin-top:8px">
    <button id="p-run">Generate</button>
    <button id="p-run-run">Compose & Run (attempt API)</button>
    <span id="p-status" style="margin-left:12px; color:#333"></span>
  </div>
  <div id="p-output" style="margin-top:10px; white-space:pre-wrap; font-family:monospace; max-height:320px; overflow:auto"></div>
</div>
<script>
  async function showResult(resp){
    const output = document.getElementById('p-output');
    try{
      const data = resp.data['application/json'];
      output.innerText = JSON.stringify(data, null, 2);
    }catch(e){
      output.innerText = 'No response or failed to parse result.';
    }
  }

  document.getElementById('p-run').onclick = async () => {
    const prompt = document.getElementById('p-prompt').value;
    const model = document.getElementById('p-model').value;
    const flow = document.getElementById('p-flow').value;
    const sampler = document.getElementById('p-sampler').value;
    const steps = parseInt(document.getElementById('p-steps').value || '20');
    const seed = parseInt(document.getElementById('p-seed').value || '-1');
    const upscale = document.getElementById('p-upscale').checked;
    const template = document.getElementById('p-template').value || '{prompt}';
    document.getElementById('p-status').innerText = 'Composing flow...';
    google.colab.kernel.invokeFunction('notebook.prompt_to_image', [prompt, model, flow, sampler, steps, seed, upscale, 'compose', template], {})
      .then(showResult);
  };

  document.getElementById('p-run-run').onclick = async () => {
    const prompt = document.getElementById('p-prompt').value;
    const model = document.getElementById('p-model').value;
    const flow = document.getElementById('p-flow').value;
    const sampler = document.getElementById('p-sampler').value;
    const steps = parseInt(document.getElementById('p-steps').value || '20');
    const seed = parseInt(document.getElementById('p-seed').value || '-1');
    const upscale = document.getElementById('p-upscale').checked;
    const template = document.getElementById('p-template').value || '{prompt}';
    document.getElementById('p-status').innerText = 'Composing and calling ComfyUI API...';
    google.colab.kernel.invokeFunction('notebook.prompt_to_image', [prompt, model, flow, sampler, steps, seed, upscale, 'run', template], {})
      .then(showResult);
  };
</script>
'''


display(HTML(html))

# Python callback implementation

try:
    from . import compose_flow as _compose_flow  # try package style if notebook packaged
    compose = _compose_flow
except Exception:
    compose = globals().get('compose_flow')

if not compose:
    print('Warning: compose_flow not available in this kernel namespace')

# New: support prompt_template parameter

def prompt_to_image(prompt, model_key, flow_file, sampler, steps, seed, upscale, action='compose', prompt_template='{prompt}'):
    # Apply template
    try:
        final_prompt = prompt_template.format(prompt=prompt)
    except Exception:
        # fallback: simple replace
        final_prompt = prompt_template.replace('{prompt}', prompt)

    # compose a comfyui-style flow by default
    path, flow = compose(final_prompt, model_key=model_key or None, sampler=sampler, steps=steps, seed=seed, lora=None, upscaler=(2 if upscale else None), format='comfyui')

    if action == 'compose':
        return {'flow_path': path, 'flow': flow}

    # action == 'run' => attempt to call ComfyUI API
    base = os.environ.get('COMFYUI_PUBLIC_URL') or os.environ.get('COMFYUI_API_BASE') or 'http://127.0.0.1:8188'
    api_key = os.environ.get('COMFYUI_API_KEY')
    headers = {'User-Agent':'ComfyUI-Playground/1.0', 'Content-Type':'application/json'}
    if api_key:
        headers['Authorization'] = f'Bearer {api_key}'

    try:
        import requests
        url = base.rstrip('/') + '/run_flow'
        r = requests.post(url, json=flow, headers=headers, timeout=120)
        try:
            j = r.json()
        except Exception:
            j = {'status_code': r.status_code, 'text': r.text}
        return {'api_status': r.status_code, 'api_response': j, 'flow_path': path}
    except Exception as e:
        return {'error': str(e), 'flow_path': path}

# register callback
output.register_callback('notebook.prompt_to_image', prompt_to_image)
print('Prompt-to-Image UI ready (callback: notebook.prompt_to_image).')

In [None]:
#@title Register per-item cancel callback (links dashboard -> SQLite cancel)
"""
Registers a kernel callback 'notebook.cancel_item' which cancels a given queue item id
in the SQLite queue and updates the JSON queue state file.
"""
from google.colab import output
from pathlib import Path
import json, time

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
QUEUE_STATE_PATH = Path(DRIVE_ROOT)/'.queue_state.json'
DB_PATH = Path(DRIVE_ROOT)/'comfyui_queue.db'

# import cancel_item from the sqlite helper cell's namespace if available; otherwise define a fallback
try:
    cancel_item
except NameError:
    def cancel_item(id):
        # fallback: mark in queue_state
        if QUEUE_STATE_PATH.exists():
            st = json.loads(QUEUE_STATE_PATH.read_text())
        else:
            st = {'items':{}}
        st.setdefault('items',{})
        st['items'].setdefault(str(id),{})
        st['items'][str(id)]['status'] = 'cancelled'
        st['items'][str(id)]['updated_at'] = int(time.time())
        QUEUE_STATE_PATH.write_text(json.dumps(st, indent=2))
        print('Fallback: cancelled', id)


def cancel_item_callback(item_id):
    try:
        cancel_item(item_id)
        # also update JSON state if present
        if QUEUE_STATE_PATH.exists():
            st = json.loads(QUEUE_STATE_PATH.read_text())
            st.setdefault('items',{})
            st['items'].setdefault(str(item_id),{})
            st['items'][str(item_id)]['status'] = 'cancelled'
            st['items'][str(item_id)]['updated_at'] = int(time.time())
            QUEUE_STATE_PATH.write_text(json.dumps(st, indent=2))
        print('Cancelled item', item_id)
    except Exception as e:
        print('Failed to cancel item', item_id, e)

output.register_callback('notebook.cancel_item', cancel_item_callback)
print('Registered kernel callback: notebook.cancel_item')

In [None]:
#@title Complex node helpers — generate ComfyUI node snippets and compose flows
"""
Provides helper functions to generate reusable node snippets and to compose a flow JSON
compatible with ComfyUI's typical on-disk format (a minimal, widely-used subset).
The composer supports two modes:
 - format='simple'  : legacy minimal list-of-nodes (backwards compatible)
 - format='comfyui' : produces a dict with 'nodes' mapping node-id -> node-dict

The produced "comfyui" nodes use conventional node type names found in many ComfyUI flows
(e.g. "CLIPTextEncode", "CheckpointLoaderSimple", "KSampler", "LoraLoader", "SaveImage",
"LatentUpscaler"). These are widely compatible but you can tweak names to match your exact install.
"""
import json, time, os
from pathlib import Path

DRIVE_ROOT = globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')
FLOWS_DIR = Path(DRIVE_ROOT)/'flows'
FLOWS_DIR.mkdir(parents=True, exist_ok=True)

_node_counter = 1000

def _next_node_id():
    global _node_counter
    _node_counter += 1
    return _node_counter

# --- simple (previous) helper nodes (kept for backwards compatibility)

def make_text_encoder_node(prompt):
    nid = _next_node_id()
    return {
        'id': nid,
        'type': 'TextEncoder',
        'args': {'prompt': prompt}
    }


def make_sampler_node(sampler='DDIM', steps=20, seed=-1):
    nid = _next_node_id()
    return {
        'id': nid,
        'type': 'Sampler',
        'args': {'sampler': sampler, 'steps': steps, 'seed': seed}
    }


def make_lora_node(lora_path, weight=0.8):
    nid = _next_node_id()
    return {
        'id': nid,
        'type': 'LoRA',
        'args': {'path': lora_path, 'weight': weight}
    }


def make_upscaler_node(scale=2):
    nid = _next_node_id()
    return {
        'id': nid,
        'type': 'Upscaler',
        'args': {'scale': scale}
    }

# --- ComfyUI-style node builders

def _node_id_str():
    return str(_next_node_id())


def comfy_checkpoint_loader(model_key=None):
    nid = _node_id_str()
    # CheckpointLoaderSimple usually references a model name or path; adapt as needed
    return nid, {
        'id': nid,
        'type': 'CheckpointLoaderSimple',
        'name': 'CheckpointLoaderSimple',
        'args': {'ckpt_name': model_key or ''},
        'inputs': {}
    }


def comfy_clip_text_encode(prompt):
    nid = _node_id_str()
    return nid, {
        'id': nid,
        'type': 'CLIPTextEncode',
        'name': 'CLIPTextEncode',
        'args': {'text': prompt},
        'inputs': {}
    }


def comfy_ksampler(sampler='DDIM', steps=20, seed=-1):
    nid = _node_id_str()
    return nid, {
        'id': nid,
        'type': 'KSampler',
        'name': 'KSampler',
        'args': {'sampler_name': sampler, 'steps': steps, 'seed': seed},
        'inputs': {}
    }


def comfy_lora_loader(lora_path, weight=0.8):
    nid = _node_id_str()
    return nid, {
        'id': nid,
        'type': 'LoraLoader',
        'name': 'LoraLoader',
        'args': {'path': lora_path, 'weight': weight},
        'inputs': {}
    }


def comfy_latent_upscaler(scale=2):
    nid = _node_id_str()
    return nid, {
        'id': nid,
        'type': 'LatentUpscaler',
        'name': 'LatentUpscaler',
        'args': {'scale': scale},
        'inputs': {}
    }


def comfy_save_image(filename=None):
    nid = _node_id_str()
    return nid, {
        'id': nid,
        'type': 'SaveImage',
        'name': 'SaveImage',
        'args': {'path': filename or ''},
        'inputs': {}
    }

# Compose flow with two modes

def compose_flow(prompt, model_key=None, sampler='DDIM', steps=20, seed=-1, lora=None, upscaler=None, format='comfyui'):
    """Compose a flow and write it to disk. Returns (path, flow_dict).

    format: 'simple' or 'comfyui'
    """
    if format not in ('simple', 'comfyui'):
        raise ValueError('Unsupported format')

    if format == 'simple':
        nodes = []
        enc = make_text_encoder_node(prompt)
        nodes.append(enc)
        samp = make_sampler_node(sampler=sampler, steps=steps, seed=seed)
        nodes.append(samp)
        if lora:
            nodes.append(make_lora_node(lora))
        if upscaler:
            nodes.append(make_upscaler_node(upscaler))
        flow = {'metadata': {'created': int(time.time()), 'prompt': prompt, 'model_key': model_key}, 'nodes': nodes}
    else:
        nodes = {}
        # Create core nodes: checkpoint loader, clip encode, sampler, optionally lora, upscaler, saver
        ck_id, ck_node = comfy_checkpoint_loader(model_key)
        nodes[ck_id] = ck_node
        txt_id, txt_node = comfy_clip_text_encode(prompt)
        nodes[txt_id] = txt_node
        samp_id, samp_node = comfy_ksampler(sampler=sampler, steps=steps, seed=seed)
        nodes[samp_id] = samp_node
        # Hook up a basic conceptual wiring via 'inputs' (note: this is a minimal representation;
        # full ComfyUI flows may use connection maps — here we provide args and a simple structure
        # that many importers can accept or that can be further edited in the ComfyUI graph UI).
        if lora:
            l_id, l_node = comfy_lora_loader(lora)
            nodes[l_id] = l_node
        if upscaler:
            u_id, u_node = comfy_latent_upscaler(upscaler)
            nodes[u_id] = u_node
        save_name = f'generated_{int(time.time())}.png'
        s_id, s_node = comfy_save_image(save_name)
        nodes[s_id] = s_node

        flow = {'metadata': {'created': int(time.time()), 'prompt': prompt, 'model_key': model_key, 'comfyui_format': True}, 'nodes': nodes}

    fname = FLOWS_DIR / f'flow_generated_{int(time.time())}.json'
    with open(fname, 'w', encoding='utf-8') as fh:
        json.dump(flow, fh, indent=2)
    return str(fname), flow

print('Complex node helpers loaded. Use compose_flow(prompt, model_key, ..., format="comfyui") to create a ComfyUI-compatible flow JSON.')

## Voice — TTS / STT research and workflows

This section provides: research notes on open-source SOTA Speech-To-Text (STT) and Text-To-Speech (TTS) models; runnable helper cells for STT and TTS workflows (simple and advanced); voice cloning and audiobooks; GUI cells to provide prompt input and source selection; utility cells to ingest text from files (txt, epub, pdf), VTT captions, web pages; and unit tests.

Goals:
- Provide fast, practical cells for: quick STT from MP3/WEBM, high-quality TTS to MP3 using best open-source engines, voice cloning pipelines, and audiobook generation (chunking + expressive prosody).
- Expose an HTML UI to enter prompt/text, choose voice/engine, and run either STT or TTS in the notebook.
- Offer advanced recipes (multi-engine ensemble, noise-aware STT pre-processing, denoising, chunking heuristics, adaptive prosody stitches).

Notes on SOTA (short, refer to cells below for references):
- STT: Whisper (OpenAI) variants remain strong baseline; open-source high-accuracy alternatives include WhisperX (forced alignment & VAD), OpenAI-like reimpls and models like Vosk (on-device), Silero, and recent large-conformer models trained on multilingual corpora (e.g., Whisper L models still competitive). Self-supervised encoders such as Wav2Vec2.x (Facebook/X) + fine-tuned decoders are also widely used.
- TTS: Neural vocoders + prosody models: VITS, GlowTTS, FastSpeech2, and open-source high-quality stacks like Tortoise TTS, Bark, and Coqui TTS. Tortoise and Bark provide multi-voice, expressive TTS with conditioning; VITS provides high-quality end-to-end voice synthesis when trained for a voice.
- Voice cloning: techniques like speaker embedding extraction (GE2E, ECAPA-TDNN), fine-tuning a VITS model, or using zero-shot cloning via speaker encoders (e.g., SpeakerNet + VITS/HiFi-GAN vocoder) exist. Recent open-source tools offering near-zero-shot cloning include Bark/Corpora-based methods and open-source projects like RVC and YourTTS.
- Processing tips: chunk long text (200-500 tokens) for TTS, maintain sentence boundaries, add SSML-like prosody hints (pauses, emphasis). For STT, use VAD to chunk audio and denoise (spectral gating) for improved accuracy.

We'll add runnable example cells below: simple STT (Whisper), advanced STT (WhisperX or VAD+Whisper), basic TTS (Coqui or Tortoise), high-quality audiobook pipeline (chunking + expressive engine), voice cloning recipe using open-source RVC or YourTTS, and a GUI.


In [None]:
#@title Voice helpers — install/check (fast checks, install only when needed)
"""
Helpers to install or import key TTS/STT libraries. In Colab you may want to pip-install only what's necessary.
This cell avoids heavy installs on every run; instead it defines `ensure_packages(packages)` helper.
"""
import importlib, subprocess, sys

def ensure_packages(packages):
    """packages: list of pip-style strings e.g. ['openai-whisper','whisperx']"""
    missing = []
    for pkg in packages:
        name = pkg.split('==')[0].split('>=')[0]
        try:
            importlib.import_module(name)
        except Exception:
            missing.append(pkg)
    if not missing:
        return {'installed':[], 'status':'all-present'}
    cmd = [sys.executable, '-m', 'pip', 'install'] + missing
    print('Installing:', missing)
    subprocess.check_call(cmd)
    return {'installed': missing, 'status':'installed'}

print('Voice helpers loaded. Use ensure_packages([...]) to install packages on demand.')


In [None]:
#@title Simple STT — OpenAI Whisper (fast baseline)
"""
This cell provides a small function `whisper_stt(audio_path, model='small')` that transcribes an audio file.
It uses the `whisper` pip package (OpenAI's whisper). For higher accuracy consider using 'large' models or the
`whisperx` package for alignment and improved timestamps.
"""
import os
from pathlib import Path

def whisper_stt(audio_path, model='small', language=None, task='transcribe'):
    try:
        import whisper
    except Exception:
        raise RuntimeError('whisper not installed. Run ensure_packages(["openai-whisper"])')
    m = whisper.load_model(model)
    res = m.transcribe(str(audio_path), language=language, task=task)
    return res

print('whisper_stt available: whisper_stt(path, model="small|medium|large")')


In [None]:
#@title Advanced STT — VAD + WhisperX (better timestamps & alignment)
"""
This cell shows a recommended pattern: run a VAD pass to split audio into segments, then transcribe each
with Whisper (or WhisperX) and optionally run alignment for word-level timestamps.
"""
import os
from pathlib import Path

def stt_with_vad_and_whisperx(audio_path, vad_model='silero_vad', whisper_model='small'):
    # Minimal example that expects whisper and, optionally, whisperx to be installed.
    try:
        import whisper
    except Exception:
        raise RuntimeError('whisper not installed. Run ensure_packages(["openai-whisper"])')
    try:
        import webrtcvad
    except Exception:
        webrtcvad = None
    # For simplicity, fallback to whole-file transcription when VAD not available.
    m = whisper.load_model(whisper_model)
    if webrtcvad is None:
        return m.transcribe(str(audio_path))
    # Placeholder: for more advanced use, use whisperx or pyannote for segmentation
    return m.transcribe(str(audio_path))

print('Advanced STT helper loaded (simple fallback to whisper if VAD not installed).')


In [None]:
#@title Simple TTS — Coqui / Tortoise / Bark quick path
"""
Provides a helper `tts_simple(text, voice='default', engine='coqui', out_path='out.wav', **kwargs)`.
Supports lightweight Coqui TTS, and placeholders/wrappers for Tortoise and Bark when installed.
If the requested engine is not installed the function raises a helpful error explaining the
recommended install steps (heavy installs are left to the user to run once).
"""
import os
from pathlib import Path


def tts_simple(text, voice='default', engine='coqui', out_path='out.wav', **kwargs):
    """Synthesize `text` to `out_path` using selected engine.

    engines supported:
      - 'coqui'  : uses the `TTS` pip package (lightweight, easy to install)
      - 'tortoise': uses Tortoise TTS (heavy; GPU recommended). If not installed, raises instructions.
      - 'bark'   : uses Suno/Bark style zero-shot TTS (heavy; instructive error if missing)

    kwargs: engine-specific options (style, sample_rate, etc.)
    """
    out_path = str(out_path)
    if engine == 'coqui':
        try:
            from TTS.api import TTS
        except Exception:
            raise RuntimeError("Coqui TTS not installed. Run: pip install TTS\nThen pick a model name or use default: TTS('tts_models/en/ljspeech/tacotron2-DDC')")
        # If voice equals a model name recognized by Coqui, pass it; otherwise TTS() will pick default
        try:
            tts = TTS(voice) if voice and voice != 'default' else TTS()
            tts.tts_to_file(text=text, file_path=out_path)
            return out_path
        except Exception as e:
            raise RuntimeError(f'Coqui TTS synthesis failed: {e}')

    elif engine == 'tortoise':
        # Tortoise requires a separate heavy install; provide a helpful message if missing
        try:
            # Tortoise's API is often provided via `tortoise.api` or `tortoise` namespace depending on install
            from tortoise.api import TextToSpeech
        except Exception:
            raise RuntimeError(
                "Tortoise TTS not found. It's a heavy install (models + deps).\n"
                "Follow the Tortoise README: clone the repo, install requirements, download models.\n"
                "Example (in Colab):\n"
                "  git clone https://github.com/neonbjb/tortoise-tts.git\n"
                "  pip install -r tortoise-tts/requirements.txt\n"
                "Then import tortoise.api.TextToSpeech and call it. GPU recommended for speed.")
        try:
            tts = TextToSpeech()
            # Tortoise APIs vary; this is a placeholder showing expected usage
            wav = tts.synthesize(text, voice=voice, **kwargs)
            # write wav (assume numpy array and 16-bit PCM)
            import soundfile as sf
            sf.write(out_path, wav, 24000)
            return out_path
        except Exception as e:
            raise RuntimeError(f'Tortoise TTS synthesis failed: {e}')

    elif engine == 'bark':
        # Bark (suno) usage example wrapper
        try:
            # suno/bark uses `bark.generation` and provides a generate_audio API in many installs
            from bark import generate_audio
        except Exception:
            raise RuntimeError(
                "Bark not installed. To use Bark, follow the Bark repo install instructions.\n"
                "Example: pip install git+https://github.com/suno-ai/bark.git (plus model downloads).\n"
                "Bark provides high-quality multi-voice zero-shot synthesis; GPU recommended.")
        try:
            # generate_audio may accept text and voice/style; adapt per installed version
            audio_array = generate_audio(text)
            import soundfile as sf
            sf.write(out_path, audio_array, 24000)
            return out_path
        except Exception as e:
            raise RuntimeError(f'Bark synthesis failed: {e}')

    else:
        raise ValueError(f'Unknown TTS engine: {engine}')

print('tts_simple available: tts_simple("Hello", engine="coqui|tortoise|bark", out_path="/content/out.wav")')


In [None]:
#@title Audiobook pipeline — expressive preset for Tortoise (high-quality / GPU recommended)
"""
Adds `text_to_expressive_audiobook` which uses Tortoise (preferred) or Bark to produce expressive audiobook audio.
This function is optimized for quality (prosody hints, emotional style snippets) and assumes Tortoise is installed.
It will chunk text, add prosody hints, and call `tts_simple(..., engine='tortoise')` for each chunk.

WARNING: Tortoise and Bark require large model downloads and a GPU for reasonable performance.
"""
import os, re, time
from pathlib import Path


def _apply_prosody_hints(chunk, style='narration'):
    # Insert lightweight SSML-like hints for prosody that certain engines (Tortoise/Bark) can respect in prompt form.
    if style == 'narration':
        # gentle pauses, emphasis on punctuation
        return f"<speak><voice name=\"narrator\">{chunk}</voice></speak>"
    if style == 'dramatic':
        return f"<speak><voice name=\"dramatic\">{chunk}</voice></speak>"
    return chunk


def text_to_expressive_audiobook(src, engine='tortoise', voice='alloy', chunk_words=250, out_dir=None, style='narration'):
    txt = _read_text_input(src)
    # simple sentence splitting
    sentences = re.split(r'(?<=[.!?])\s+', txt)
    chunks = []
    cur = []
    cur_n = 0
    for s in sentences:
        cur.append(s)
        cur_n += len(s.split())
        if cur_n >= chunk_words:
            chunks.append(' '.join(cur))
            cur = []
            cur_n = 0
    if cur:
        chunks.append(' '.join(cur))

    od = Path(out_dir or (Path.cwd()/ f'expressive_audiobook_{int(time.time())}'))
    od.mkdir(parents=True, exist_ok=True)
    out_files = []
    for i,ch in enumerate(chunks):
        # apply prosody
        prompt = _apply_prosody_hints(ch, style=style)
        fname = od / f'chunk_{i:04d}.wav'
        try:
            tts_simple(prompt, voice=voice, engine=engine, out_path=str(fname), style=style)
            out_files.append(str(fname))
        except Exception as e:
            print('Chunk synth failed', i, e)
    # optional stitching using pydub
    try:
        from pydub import AudioSegment
        combined = AudioSegment.empty()
        for f in out_files:
            combined += AudioSegment.from_wav(f)
        out_mp3 = od / 'audiobook_expressive.mp3'
        combined.export(out_mp3, format='mp3')
        return str(out_mp3)
    except Exception:
        return out_files

print('Expressive audiobook preset loaded: text_to_expressive_audiobook(src, engine="tortoise")')


In [None]:
#@title Voice cloning recipe (RVC / YourTTS / zero-shot) — guidance & starter helper
"""
This cell contains a high-level wrapper and instructions for running a voice cloning workflow using RVC or YourTTS.
We provide a helper `prepare_voice_clone(sample_audio_path, method='rvc')` that checks for required packages and
returns a checklist of steps because full cloning requires training or model downloads which are heavy.
"""
import os
from pathlib import Path

def prepare_voice_clone(sample_audio, method='rvc'):
    info = {'method': method, 'sample': str(sample_audio), 'ok': False, 'notes': []}
    if method == 'rvc':
        # RVC typically requires PyTorch, ffmpeg and RVC repo; here we only verify availability
        try:
            import torch
            info['notes'].append('torch present')
        except Exception:
            info['notes'].append('torch missing')
        info['notes'].append('RVC repo and model downloads required; follow RVC README to clone and run training/inference')
    elif method == 'yourtts':
        info['notes'].append('YourTTS requires pretrained models and speaker encoder. See YourTTS repo for steps.')
    else:
        info['notes'].append('Unknown method')
    return info

print('Voice cloning helper available: prepare_voice_clone(path, method="rvc")')


In [None]:
#@title Text ingestion helpers — read TXT, EPUB, PDF, VTT, HTML
"""
Utilities to pull text from multiple sources so TTS workflows can accept many inputs (txt, epub, pdf, vtt, HTML/webpage).
"""
from pathlib import Path
import requests
from bs4 import BeautifulSoup


def read_text_source(path_or_url):
    p = str(path_or_url)
    # URL
    if p.startswith('http://') or p.startswith('https://'):
        r = requests.get(p, timeout=30)
        ct = r.headers.get('content-type','')
        if 'text/html' in ct or p.endswith('.html'):
            soup = BeautifulSoup(r.text, 'html.parser')
            return soup.get_text('\n')
        return r.text
    # local file
    pth = Path(p)
    if not pth.exists():
        raise FileNotFoundError(p)
    if pth.suffix.lower() == '.txt':
        return pth.read_text(encoding='utf-8')
    if pth.suffix.lower() in ('.epub',):
        try:
            from ebooklib import epub
            items = epub.read_epub(str(pth)).get_items_of_type(ebooklib.ITEM_DOCUMENT)
            parts = [it.get_content().decode('utf-8') for it in items]
            return '\n'.join(parts)
        except Exception:
            raise
    if pth.suffix.lower() in ('.pdf',):
        try:
            import PyPDF2
            text = []
            with open(pth,'rb') as fh:
                r = PyPDF2.PdfReader(fh)
                for page in r.pages:
                    text.append(page.extract_text() or '')
            return '\n'.join(text)
        except Exception:
            raise
    if pth.suffix.lower() in ('.vtt','.srt'):
        # simple VTT parser: extract text lines after timestamps
        lines = pth.read_text(encoding='utf-8').splitlines()
        out = []
        for L in lines:
            if '-->' in L or L.strip().isdigit() or L.strip()=='' or L.startswith('WEBVTT'):
                continue
            out.append(L)
        return '\n'.join(out)
    # default
    return pth.read_text(encoding='utf-8')

print('Text ingestion helpers loaded: read_text_source(path_or_url)')


In [None]:
#@title Voice GUI — small HTML UI for STT/TTS operations (with prompt templates)
"""
Lightweight HTML UI to select an audio file for STT or paste text for TTS. Includes a prompt-template dropdown
that can be applied when forwarding STT transcriptions to the Prompt→Image composer/runner. Templates are loaded
from Drive via the get_prompt_templates kernel callback and can be managed with the Manage Templates button.
"""
from IPython.display import display, HTML
from google.colab import output

html = r'''
<div style="font-family: Roboto, Arial; padding:10px; border:1px solid #ddd; border-radius:6px;">
  <h3>Voice Playground</h3>
  <div>
    <h4>Speech → Text (STT)</h4>
    <input id="stt-audio" type="text" placeholder="/path/to/file.mp3 or URL" style="width:60%"></input>
    <select id="stt-engine"><option value="whisper">whisper</option><option value="whisperx">whisperx</option></select>
    <label style="margin-left:8px"><input type="checkbox" id="stt-forward"/> Use transcription as Prompt→Image prompt</label>
    <label style="margin-left:8px">Template: </label>
    <select id="prompt-template">
      <option value="None">(none — use raw transcription)</option>
    </select>
    <button id="manage-templates" style="margin-left:6px">Manage Templates</button>
    <button id="stt-run">Transcribe</button>
    <div id="stt-out" style="white-space:pre-wrap; font-family:monospace; margin-top:8px; max-height:200px; overflow:auto"></div>
  </div>
  <hr/>
  <div>
    <h4>Text → Speech (TTS)</h4>
    <textarea id="tts-text" style="width:98%; height:80px"></textarea>
    <div style="margin-top:6px">
      <label>Engine</label>
      <select id="tts-engine"><option value="coqui">coqui</option><option value="tortoise">tortoise</option><option value="bark">bark</option></select>
      <label style="margin-left:10px">Voice</label>
      <input id="tts-voice" value="default" style="width:160px"/>
      <label style="margin-left:10px">Out file</label>
      <input id="tts-out" value="/content/out.wav" style="width:200px"/>
    </div>
    <div style="margin-top:6px">
      <button id="tts-run">Synthesize</button>
      <div id="tts-out-display" style="margin-top:8px"></div>
    </div>
  </div>
</div>

<script>
  async function populateTemplates(){
    try{
      const resp = await google.colab.kernel.invokeFunction('notebook.get_prompt_templates', [], {});
      const data = resp.data['application/json'];
      const sel = document.getElementById('prompt-template');
      // clear existing (preserve first option)
      sel.options.length = 1;
      if (data && data.templates){
        data.templates.forEach(t => {
          if (!t || !t.template) return;
          const opt = document.createElement('option');
          opt.value = t.template;
          opt.text = t.name + ' — ' + t.template;
          sel.appendChild(opt);
        });
      }
    }catch(e){
      console.error('Failed to load templates', e);
    }
  }

  async function showResult(resp, targetId){
    const output = document.getElementById(targetId);
    try{
      const data = resp.data['application/json'];
      output.innerText = JSON.stringify(data, null, 2);
    }catch(e){
      output.innerText = 'No response or failed to parse result.';
    }
  }

  document.getElementById('stt-run').onclick = async () => {
    const path = document.getElementById('stt-audio').value;
    const engine = document.getElementById('stt-engine').value;
    const forward = document.getElementById('stt-forward').checked;
    const template = document.getElementById('prompt-template').value;
    const resp = await google.colab.kernel.invokeFunction('notebook.voice_transcribe', [path, engine], {});
    showResult(resp, 'stt-out');
    if (forward){
      try{
        const res = resp.data['application/json'];
        const text = (res.result && (res.result.get && res.result.get('text')) ) || res.result?.text || res.result || '';
        const doRun = confirm('Forward transcription as prompt to Prompt→Image and run generation? Click Cancel to just compose for preview.');
        const action = doRun ? 'run' : 'compose';
        // call the Prompt→Image kernel callback and pass the selected template (or 'None')
        const r2 = await google.colab.kernel.invokeFunction('notebook.prompt_to_image', [text, '', '', 'DDIM', 20, -1, false, action, template], {});
        document.getElementById('stt-out').innerText += '\n\nForward result:\n' + JSON.stringify(r2.data['application/json'], null, 2);
      }catch(e){
        document.getElementById('stt-out').innerText += '\n\nFailed to forward to Prompt→Image: ' + e;
      }
    }
  };

  document.getElementById('tts-run').onclick = () => {
    const text = document.getElementById('tts-text').value;
    const engine = document.getElementById('tts-engine').value;
    const voice = document.getElementById('tts-voice').value;
    const out = document.getElementById('tts-out').value;
    google.colab.kernel.invokeFunction('notebook.voice_synthesize', [text, engine, voice, out], {}).then(resp => {
      const d = resp.data['application/json'];
      document.getElementById('tts-out-display').innerText = JSON.stringify(d, null, 2);
    });
  };

  document.getElementById('manage-templates').onclick = async () => {
    try{
      await google.colab.kernel.invokeFunction('notebook.launch_manage_templates_ui', [], {});
    }catch(e){
      alert('Failed to open Manage Templates UI: ' + e);
    }
  };

  // populate templates on load
  populateTemplates();
</script>
'''

display(HTML(html))

# Register callbacks (STT/TTS are already defined above). Add a Prompt->Image kernel callback so the UI can forward text with a template
from pathlib import Path

def voice_transcribe(path_or_url, engine='whisper'):
    try:
        if engine == 'whisper':
            res = whisper_stt(path_or_url)
            return {'engine':'whisper','result': res}
        elif engine == 'whisperx':
            res = stt_with_vad_and_whisperx(path_or_url)
            return {'engine':'whisperx','result': res}
        else:
            return {'error':'unknown engine'}
    except Exception as e:
        return {'error': str(e)}


def voice_synthesize(text, engine='coqui', voice='default', out='/content/out.wav'):
    try:
        p = tts_simple(text, voice=voice, engine=engine, out_path=out)
        return {'engine':engine, 'out': p}
    except Exception as e:
        return {'error': str(e)}

# New: prompt_to_image bridge — accepts transcription, optional template, composes a flow and optionally runs it via COMFYUI API
def prompt_to_image(prompt_text, model_key='', lora='', sampler='DDIM', steps=20, seed=-1, upscaler=False, action='compose', prompt_template=None):
    try:
        # apply template if provided
        pt = prompt_template if prompt_template and prompt_template != 'None' else None
        if pt:
            try:
                prompt = pt.replace('{prompt}', str(prompt_text))
            except Exception:
                prompt = str(prompt_text)
        else:
            prompt = str(prompt_text)
        # Compose a flow JSON using existing composer helper
        path, flow = compose_flow(prompt, model_key=model_key or None, sampler=sampler, steps=steps, seed=seed, lora=(lora or None), upscaler=(upscaler or None), format='comfyui')
        result = {'flow_path': path, 'flow': flow, 'prompt': prompt}
        # If action is 'run', attempt to POST to ComfyUI API if available
        if action == 'run' and globals().get('USE_COMFYUI_API', False):
            try:
                import requests
                base = globals().get('COMFYUI_API_BASE','http://127.0.0.1:8188').rstrip('/')
                headers = globals().get('HEADERS', {})
                # attempt a guarded POST to /api/graph or similar endpoint — many ComfyUI setups provide an endpoint to run flows
                url = base + '/api/flow/run'
                r = requests.post(url, json={'flow': flow}, headers=headers, timeout=60)
                result['api_status'] = r.status_code
                try:
                    result['api_response'] = r.json()
                except Exception:
                    result['api_response'] = r.text[:500]
            except Exception as e:
                result['api_error'] = str(e)
        return result
    except Exception as e:
        return {'error': str(e)}

output.register_callback('notebook.voice_transcribe', voice_transcribe)
output.register_callback('notebook.voice_synthesize', voice_synthesize)
output.register_callback('notebook.prompt_to_image', prompt_to_image)
print('Voice GUI registered (voice_transcribe, voice_synthesize, prompt_to_image)')


In [None]:
# Manage Templates UI + callbacks — persisted under DRIVE_ROOT/config/prompt_templates.json
from IPython.display import display, HTML
from google.colab import output
from pathlib import Path
import json

# Path for templates inside DRIVE_ROOT
TEMPLATES_PATH = Path(globals().get('DRIVE_ROOT', '/content/drive/MyDrive/ComfyUI')) / 'config' / 'prompt_templates.json'

def ensure_templates_file():
    TEMPLATES_PATH.parent.mkdir(parents=True, exist_ok=True)
    if not TEMPLATES_PATH.exists():
        defaults = [
            {"name": "Photorealistic", "template": "Photorealistic photo of {prompt}"},
            {"name": "Cinematic", "template": "Cinematic poster of {prompt}"},
            {"name": "Studio Portrait", "template": "Studio portrait of {prompt}"},
            {"name": "Fantasy", "template": "Fantasy illustration of {prompt}"},
            {"name": "Minimal", "template": "{prompt}"}
        ]
        with open(TEMPLATES_PATH, 'w', encoding='utf-8') as f:
            json.dump(defaults, f, indent=2, ensure_ascii=False)

def get_prompt_templates():
    try:
        ensure_templates_file()
        with open(TEMPLATES_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return {'templates': data}
    except Exception as e:
        return {'error': str(e)}

def save_prompt_templates(templates):
    try:
        ensure_templates_file()
        # templates expected as a list of dicts
        with open(TEMPLATES_PATH, 'w', encoding='utf-8') as f:
            json.dump(templates, f, indent=2, ensure_ascii=False)
        return {'ok': True}
    except Exception as e:
        return {'error': str(e)}

output.register_callback('notebook.get_prompt_templates', get_prompt_templates)
output.register_callback('notebook.save_prompt_templates', save_prompt_templates)

# Launcher for the Manage Templates HTML UI (so the main Voice GUI can open it)
def launch_manage_templates_ui():
    # Build the UI HTML and register callbacks for save
    html = r'''
<div style="font-family: Roboto, Arial; padding:10px; border:1px solid #ddd; border-radius:6px; width:720px;background:#fff">
  <h3>Manage Prompt Templates</h3>
  <div>
    <table id="tpl-table" style="width:100%; border-collapse:collapse;">
      <thead><tr><th style="text-align:left">Name</th><th style="text-align:left">Template</th><th></th></tr></thead>
      <tbody></tbody>
    </table>
    <div style="margin-top:8px">
      <input id="new-name" placeholder="Name" style="width:20%"/>
      <input id="new-template" placeholder="Template (use {prompt})" style="width:60%"/>
      <button id="add-tpl">Add</button>
      <button id="save-tpl" style="margin-left:12px">Save to Drive</button>
    </div>
    <div id="msg" style="margin-top:8px;color:green"></div>
  </div>
</div>
<script>
  async function loadTemplates(){
    const resp = await google.colab.kernel.invokeFunction('notebook.get_prompt_templates', [], {});
    const data = resp.data['application/json'];
    const tbody = document.querySelector('#tpl-table tbody');
    tbody.innerHTML = '';
    if (!data || !data.templates) return;
    data.templates.forEach((t, idx) => {
      const tr = document.createElement('tr');
      tr.innerHTML = `<td><input class='nm' value='${(t.name||'').replace(/'/g,"\\'")}' style='width:95%'/></td><td><input class='tpl' value='${(t.template||'').replace(/'/g,"\\'")}' style='width:98%'/></td><td><button class='del'>Delete</button></td>`;
      tbody.appendChild(tr);
    });
    Array.from(document.querySelectorAll('.del')).forEach((b,i)=>b.onclick = ()=>{tbody.removeChild(b.closest('tr'))});
  }
  
  document.getElementById('add-tpl').onclick = () => {
    const name = document.getElementById('new-name').value;
    const tpl = document.getElementById('new-template').value;
    if (!tpl) return alert('Template required');
    const tbody = document.querySelector('#tpl-table tbody');
    const tr = document.createElement('tr');
    tr.innerHTML = `<td><input class='nm' value='${name.replace(/'/g,"\\'")}' style='width:95%'/></td><td><input class='tpl' value='${tpl.replace(/'/g,"\\'")}' style='width:98%'/></td><td><button class='del'>Delete</button></td>`;
    tbody.appendChild(tr);
    Array.from(document.querySelectorAll('.del')).forEach((b,i)=>b.onclick = ()=>{tbody.removeChild(b.closest('tr'))});
    document.getElementById('new-name').value=''; document.getElementById('new-template').value='';
  }

  document.getElementById('save-tpl').onclick = async () => {
    const rows = Array.from(document.querySelectorAll('#tpl-table tbody tr'));
    const out = rows.map(r=>({name: r.querySelector('.nm').value, template: r.querySelector('.tpl').value}));
    const resp = await google.colab.kernel.invokeFunction('notebook.save_prompt_templates', [out], {});
    const d = resp.data['application/json'];
    const msg = document.getElementById('msg');
    if (d && d.ok) {
      msg.innerText = 'Saved templates to Drive.';
      // notify main UI to refresh
      try{ await google.colab.kernel.invokeFunction('notebook.get_prompt_templates', [], {}); }catch(e){}
    } else {
      msg.style.color = 'red'; msg.innerText = 'Failed to save: ' + JSON.stringify(d);
    }
  }

  loadTemplates();
</script>
'''
    display(HTML(html))
    return {'ok': True}

output.register_callback('notebook.launch_manage_templates_ui', launch_manage_templates_ui)
print('Manage Templates callbacks registered (get_prompt_templates, save_prompt_templates, launch_manage_templates_ui)')


In [None]:
#@title Heavy-install runner (Tortoise, Bark, Alltalk) — RUN THIS TO PERFORM HEAVY INSTALLS
# WARNING: This cell performs large downloads and installs. Run only in a GPU-enabled Colab runtime with sufficient disk space.
RUN_HEAVY_INSTALL = False  # set to True and re-run cell to execute installs
if not RUN_HEAVY_INSTALL:
    print('RUN_HEAVY_INSTALL is False — set to True to execute heavy installs')
else:
    import subprocess, sys, os, shutil
    def run(cmd, cwd=None):
        print('>',' '.join(cmd))
        p = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        for line in p.stdout:
            print(line, end='')
        p.wait()
        if p.returncode != 0:
            raise RuntimeError(f'Command failed: {cmd}')
    # 1) Tortoise TTS
    if not os.path.exists('tortoise-tts'):
        run(['git','clone','https://github.com/neonbjb/tortoise-tts.git'])
    run([sys.executable,'-m','pip','install','-r','tortoise-tts/requirements.txt'])
    # 2) Bark
    run([sys.executable,'-m','pip','install','git+https://github.com/suno-ai/bark.git'])
    # 3) Alltalk TTS (user requested branch alltalkbeta)
    if not os.path.exists('alltalk_tts'):
        run(['git','clone','--branch','alltalkbeta','https://github.com/erew123/alltalk_tts.git','alltalk_tts'])
    # install requirements if present
    req = 'alltalk_tts/requirements.txt'
    if os.path.exists(req):
        run([sys.executable,'-m','pip','install','-r',req])
    # misc utilities
    run([sys.executable,'-m','pip','install','pydub','soundfile','PyPDF2','ebooklib','requests','beautifulsoup4'])
    print('Heavy installs completed. Follow repository READMEs for model downloads and runtime configuration.')

In [None]:
#@title Download XTTS-v2 finetuned voice files from Hugging Face and create xtts.py runner
from pathlib import Path
import os, requests, sys
DR = Path(globals().get('DRIVE_ROOT','/content/drive/MyDrive/ComfyUI')) / 'models' / 'xtts_v2'
DR.mkdir(parents=True, exist_ok=True)
files = ['model.pth','config.json','vocab.json','speakers_xtts.pth']
base = 'https://huggingface.co/drewThomasson/fineTunedTTSModels/resolve/main/xtts-v2/eng/ScarlettJohansson/'
token = os.environ.get('HUGGINGFACE_TOKEN','')
headers = {'Authorization': f'Bearer {token}'} if token else {}
for fn in files:
    url = base + fn
    outp = DR / fn
    if outp.exists():
        print(fn, 'already exists — skipping')
        continue
    print('Downloading', url, '->', outp)
    r = requests.get(url, headers=headers, stream=True, timeout=60)
    if r.status_code == 200:
        with open(outp, 'wb') as fh:
            for chunk in r.iter_content(chunk_size=1024*1024):
                if chunk:
                    fh.write(chunk)
        print('Saved', outp)
    else:
        print('Failed to download', url, 'status', r.status_code)
# create xtts.py runner
script = r"""from TTS.api import TTS

# Load XTTS v2 from local directory
tts = TTS(model_path="xtts_v2/model.pth", config_path="xtts_v2/config.json", progress_bar=True)

# Example synthesis
tts.tts_to_file(
    text="Hello world. This is a test from the finetuned XTTS v2 model. Replace with your text.",
    speaker_wav=["your_voice_sample.wav"],
    language="en",
    file_path="output.wav"
)

print("Synthesis attempted — check output.wav")"""
runner_path = DR / 'xtts.py'
with open(runner_path, 'w', encoding='utf-8') as fh:
    fh.write(script)
print('Created', runner_path)
print('To run: cd', str(DR), 'then: python xtts.py (ensure TTS package is installed and you have a speaker WAV file)')

In [None]:
#@title Alltalk TTS quick-setup notes and test run (post-install)
import os, shutil
print('If you cloned the Alltalk repo via the heavy-installer, it should be in ./alltalk_tts')
if os.path.exists('alltalk_tts'):
    print('Found alltalk_tts folder — list top-level files:')
    print(os.listdir('alltalk_tts')[:50])
    print('
Typical next steps:')
    print('1) Inspect alltalk_tts/README.md for usage and model download links')
    print('2) If the repo provides an examples/ or scripts/ folder, run example synth scripts after installing requirements')
else:
    print('alltalk_tts not present — run the heavy-install cell with RUN_HEAVY_INSTALL=True to clone and install it')

In [None]:
#@title Run lightweight unit tests (composer + voice smoke)
print('Running composer unit test (fast)')
try:
    path, flow = compose_flow('unit test prompt', model_key='test-model', sampler='DDIM', steps=5, seed=123, lora=None, upscaler=None, format='comfyui')
    print('compose_flow produced path:', path)
    assert isinstance(flow, dict) and 'nodes' in flow
    print('Composer minimal check: PASS')
except Exception as e:
    print('Composer minimal check: FAIL', e)

print('Running voice ingestion smoke test')
from pathlib import Path
p = Path('voice_test_tmp2.txt')
p.write_text('Quick voice test for ingestion')
try:
    txt = read_text_source(str(p))
    assert 'Quick voice test' in txt
    print('Text ingestion smoke test: PASS')
except Exception as e:
    print('Text ingestion smoke test: FAIL', e)
p.unlink()

In [None]:
#@title Voice unit tests (quick smoke tests)
"""
Run small smoke tests: test text ingestion, TTS stub (no install), and STT stub (no install).
These tests are light-weight and will not install heavy models.
"""
from pathlib import Path

# Test read_text_source with a small temp file
p = Path('voice_test_tmp.txt')
p.write_text('Hello world. This is a quick test for the voice section.')
try:
    txt = read_text_source(str(p))
    assert 'Hello world' in txt
    print('Text ingestion: PASS')
except Exception as e:
    print('Text ingestion: FAIL', e)

# test composer exists
try:
    assert callable(compose_flow)
    print('compose_flow present: PASS')
except Exception as e:
    print('compose_flow present: FAIL', e)

p.unlink()


In [None]:
#@title Heavy-install helper (Tortoise & Bark) — safe instructions (do not auto-run)
"""
This cell provides safe, copy-paste install steps for Tortoise and Bark. It does not execute anything automatically.
Run these commands manually in a Colab cell if you accept the heavy installs.
"""
from IPython.display import Markdown, display

text = '''
## Heavy install instructions (Tortoise TTS and Bark)

NOTE: These are large installs. Use GPU runtime. Run commands manually in a new notebook cell.

# Tortoise (recommended for expressive audiobooks)
!git clone https://github.com/neonbjb/tortoise-tts.git
%cd tortoise-tts
!pip install -r requirements.txt
# Download Tortoise models as instructed in tortoise README (models can be large >1GB)

# Bark (zero-shot style, from Suno)
!pip install git+https://github.com/suno-ai/bark.git
# Bark will download models on first run; ensure sufficient disk space.

# Additional helpful tools
!pip install pydub soundfile PyPDF2 ebooklib requests beautifulsoup4

Run these commands only if you have GPU runtime and sufficient disk space. '''

print('Heavy-install instructions inserted. Copy the commands from the printed block and run them in a new cell when ready.')
display(Markdown(text))
