<div style="color:rgb(0,0,255);font-size: 40px;font-weight:700;">
MAIN SETTINGS
</div>

In [1]:
###SETTINGS###

import os
import re
import shutil
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from getpass import getpass
from urllib.parse import urlencode

# Detect the hosting platform from marker paths and environment variables.
ON_KAGGLE = os.path.exists('/kaggle')
ON_COLAB = (os.environ.get('COLAB_RELEASE_TAG') is not None) or os.path.exists('/content')
ON_VAST = (
    not {"VAST_CONTAINERLABEL", "VAST_TCP_PORT_22", "CONTAINER_ID"}.isdisjoint(os.environ)
) or os.path.exists('/workspace')


# Tunables read from the environment: download parallelism (never below 1)
# and the minimum size in bytes for a file to count as a real download.
MAX_PARALLEL_DOWNLOADS = max(int(os.getenv("MAX_PARALLEL_DOWNLOADS", "3")), 1)
MIN_VALID_FILE_BYTES = int(os.getenv("MIN_VALID_FILE_BYTES", "1000000"))

# Make sure the aria2c downloader is available; try to install it when it is missing.
# Failures are reported but do not stop the cell (best-effort bootstrap).
if shutil.which("aria2c") is None:
    print("aria2c not found → installing...")
    try:
        # apt-get instead of apt: apt warns that its CLI is not stable for scripts.
        # text=True so that captured stderr is a str when reported below.
        subprocess.run(["apt-get", "update", "-qq"], check=True, capture_output=True, text=True)
        result = subprocess.run(
            ["apt-get", "install", "-y", "-qq", "aria2"], capture_output=True, text=True
        )
        if result.returncode != 0:
            print("Install failed (code {}):".format(result.returncode))
            print("stderr:", result.stderr.strip())
        elif shutil.which("aria2c") is None:
            # The package manager reported success but the binary is still not on PATH.
            print("WARNING: install reported success but aria2c is still not on PATH")
        else:
            print("aria2c installed successfully")
    except subprocess.CalledProcessError as e:
        print(f"apt error (code {e.returncode}): {(e.stderr or '').strip()}")
    except Exception as e:
        # e.g. apt-get itself is missing on this platform.
        print(f"Unexpected error: {e}")
else:
    print("aria2c already available")

# Pick the working directory: the first known platform root that exists,
# falling back to the current working directory (with a warning).
possible_bases = [
    "/workspace",       # Vast.ai / RunPod
    "/kaggle/working",  # Kaggle
    "/content",         # Google Colab
]

BASE_DIR = next((candidate for candidate in possible_bases if os.path.isdir(candidate)), None)

if BASE_DIR is None:
    BASE_DIR = os.getcwd()
    print("WARNING: Known directory not found:", BASE_DIR)

print("Working directory:", BASE_DIR)

# Directory layout; everything under the Forge checkout is derived from FORGE_DIR.
FORGE_DIR = os.path.join(BASE_DIR, "stable-diffusion-webui-forge")
MODELS_DIR = os.path.join(FORGE_DIR, "models", "Stable-diffusion")
LORA_DIR = os.path.join(FORGE_DIR, "models", "Lora")
EXTENSIONS_DIR = os.path.join(FORGE_DIR, "extensions")
CONTROLNET_DIR = os.path.join(EXTENSIONS_DIR, "sd-webui-controlnet")
CONTROLNET_MODELS_DIR = os.path.join(CONTROLNET_DIR, "models")
OUTPUTS_DIR = os.path.join(FORGE_DIR, "outputs")
VOLUME_DIR = os.path.join(BASE_DIR, "volume")
GEN_DIR = os.path.join(BASE_DIR, "gen")

# Create every directory up front (no error if it already exists).
for d in (
    MODELS_DIR,
    LORA_DIR,
    CONTROLNET_DIR,
    CONTROLNET_MODELS_DIR,
    EXTENSIONS_DIR,
    OUTPUTS_DIR,
    VOLUME_DIR,
    GEN_DIR,
):
    os.makedirs(d, exist_ok=True)

# Dependencies used by generation cell.
# Each package is imported as a probe; on failure it is installed with pip.
for pkg in ["openpyxl", "requests"]:
    try:
        __import__(pkg)
    except Exception:
        print(f"Installing missing dependency: {pkg}")
        # Use the running interpreter so the package lands in the kernel's
        # environment, not in whatever "python" happens to be first on PATH.
        pip_result = subprocess.run(
            [sys.executable, "-m", "pip", "install", "-q", pkg], check=False
        )
        if pip_result.returncode != 0:
            # Still best-effort, but no longer silent about a failed install.
            print(f"WARNING: pip install {pkg} failed (code {pip_result.returncode})")


def get_secret(name: str):
    """Look up a secret in the environment, then Kaggle secrets, then Colab userdata.

    Args:
        name: Name of the secret / environment variable.

    Returns:
        A ``(value, source)`` tuple where ``source`` is one of ``"env"``,
        ``"kaggle_secrets"`` or ``"colab_userdata"``; ``(None, None)`` when no
        non-blank value is found. Values are returned stripped of surrounding
        whitespace, and a whitespace-only value is treated as missing.
    """

    def _clean(raw):
        """Return the stripped secret, or None when it is missing or blank."""
        if not raw:
            return None
        return raw.strip() or None

    value = _clean(os.environ.get(name))
    if value:
        return value, "env"

    # Kaggle secrets
    if ON_KAGGLE:
        try:
            from kaggle_secrets import UserSecretsClient
            value = _clean(UserSecretsClient().get_secret(name))
            if value:
                return value, "kaggle_secrets"
        except Exception:
            # Deliberate best-effort: any failure here means "not available from
            # this store"; fall through to the next one.
            pass

    # Colab secrets panel: from google.colab import userdata
    if ON_COLAB:
        try:
            from google.colab import userdata
            value = _clean(userdata.get(name))
            if value:
                return value, "colab_userdata"
        except Exception:
            # Deliberate best-effort, same as for the Kaggle store above.
            pass

    return None, None


# Try the configured secret stores for both tokens first.
CIVITAI_TOKEN, CIVITAI_SRC = get_secret("CIVITAI_TOKEN")
HF_TOKEN, HF_SRC = get_secret("HF_TOKEN")

# For any token still missing, fall back to a hidden interactive prompt.
if not CIVITAI_TOKEN:
    manual_civitai = getpass("Enter CIVITAI_TOKEN (leave blank to skip): ").strip()
    if manual_civitai:
        CIVITAI_TOKEN = manual_civitai
        CIVITAI_SRC = "manual_input"

if not HF_TOKEN:
    manual_hf = getpass("Enter HF_TOKEN (leave blank to skip): ").strip()
    if manual_hf:
        HF_TOKEN = manual_hf
        HF_SRC = "manual_input"

# Only tokens that were actually provided end up in the lookup table.
TOKENS = {
    key: secret
    for key, secret in (("CIVITAI", CIVITAI_TOKEN), ("HF_TOKEN", HF_TOKEN))
    if secret
}

# Report where each token came from (never the token itself).
print("Token sources:")
print(f"  CIVITAI_TOKEN: {CIVITAI_SRC or 'not found'}")
print(f"  HF_TOKEN: {HF_SRC or 'not found'}")
if ON_VAST:
    print("Vast.ai tip: add CIVITAI_TOKEN/HF_TOKEN in template env vars, restart container, then rerun this cell.")

if not CIVITAI_TOKEN:
    print("CivitAI token not found")
if not HF_TOKEN:
    print("HF token not found")
# At least one token is required to continue.
if len(TOKENS) == 0:
    raise RuntimeError("No tokens were provided. Set secrets or enter at least one token (CivitAI or HF).")


def _prepare_download_url(url, token):
    """CivitAI download works more reliably with token as query param."""
    if token and "civitai.com/api/download/models" in url and "token=" not in url:
        sep = "&" if "?" in url else "?"
        return f"{url}{sep}{urlencode({'token': token})}"
    return url


def _looks_valid_file(path, min_bytes=MIN_VALID_FILE_BYTES):
    """Return True when ``path`` exists and is strictly larger than ``min_bytes`` bytes."""
    if not os.path.exists(path):
        return False
    return os.path.getsize(path) > min_bytes


def _human_mb(num_bytes):
    return f"{num_bytes / (1024 * 1024):.1f} MB"


def _estimate_expected_mb(label):
    # Examples: "151 MB", "6,46 GB"
    match = re.search(r"(\d+[\.,]?\d*)\s*(MB|GB)", label, re.IGNORECASE)
    if not match:
        return None
    value = float(match.group(1).replace(',', '.'))
    unit = match.group(2).upper()
    return value * (1024 if unit == "GB" else 1)


def _size_sanity_warning(path, expected_mb, tolerance=0.7):
    if expected_mb is None or not os.path.exists(path):
        return
    actual_mb = os.path.getsize(path) / (1024 * 1024)
    if actual_mb < expected_mb * tolerance:
        print(f"  WARNING: file size looks low ({actual_mb:.1f} MB vs expected ~{expected_mb:.1f} MB)")


def _has_min_free_disk(path, required_mb, reserve_mb=1024):
    if required_mb is None:
        return True
    usage = shutil.disk_usage(path)
    free_mb = usage.free / (1024 * 1024)
    return free_mb >= (required_mb + reserve_mb)


def _download_one(job, target_dir):
    """Download a single file with aria2c into ``target_dir``.

    Args:
        job: ``(label, url, filename, token_name)``. ``token_name`` is looked up
            in ``TOKENS``; ``"CIVITAI"`` tokens are appended to the URL as a
            query parameter, ``"HF_TOKEN"`` tokens are sent as a Bearer header.
        target_dir: Directory the file is written to.

    Returns:
        ``(label, success, info)`` where ``info`` is ``"exists"``, a
        human-readable size, ``"insufficient_disk"`` or an error message.
        Error messages have the token redacted, because callers print them.

    The download goes to ``<filename>.part`` and is renamed onto the final
    name only after it passes the minimum-size check. TLS certificates are
    verified unless the environment variable ``ARIA2_INSECURE_TLS`` is set to
    ``1``/``true``/``yes``.
    """
    label, url, filename, token_name = job
    token = TOKENS.get(token_name)
    output_path = os.path.join(target_dir, filename)

    expected_mb = _estimate_expected_mb(label)

    if _looks_valid_file(output_path):
        size = os.path.getsize(output_path)
        print(f"[SKIP] {label}: already exists ({_human_mb(size)})")
        _size_sanity_warning(output_path, expected_mb)
        return (label, True, "exists")

    if expected_mb is not None and not _has_min_free_disk(target_dir, expected_mb):
        return (label, False, "insufficient_disk")

    tmp_path = output_path + ".part"
    # aria2c keeps its resume state in "<file>.aria2" next to the download.
    control_path = tmp_path + ".aria2"

    def _discard_partial():
        """Remove the partial download and its aria2 control file, if present."""
        for leftover in (tmp_path, control_path):
            if os.path.exists(leftover):
                os.remove(leftover)

    def _redact(text):
        """Mask the token (raw and URL-encoded form) in text that will be printed."""
        if not token:
            return text
        encoded = urlencode({"token": token})[len("token="):]
        for secret in (encoded, token):
            text = text.replace(secret, "***")
        return text

    # Always start from scratch: a stale .part/.aria2 pair is discarded.
    _discard_partial()

    final_url = _prepare_download_url(url, token if token_name == "CIVITAI" else None)

    # Tokens travel with the request, so certificate verification stays on by
    # default; it can be switched off explicitly for broken CA setups.
    insecure_tls = os.environ.get("ARIA2_INSECURE_TLS", "").strip().lower() in ("1", "true", "yes")

    cmd = [
        "aria2c",
        "--allow-overwrite=true",
        "--auto-file-renaming=false",
        "--continue=true",
        "--max-connection-per-server=16",
        "--split=16",
        "--min-split-size=1M",
        "--console-log-level=warn",
        "--summary-interval=1",
        "--check-certificate={}".format("false" if insecure_tls else "true"),
        "--out", os.path.basename(tmp_path),
        "--dir", target_dir,
        final_url,
    ]

    if token_name == "HF_TOKEN" and token:
        # Options must precede the URL, which is the last element.
        cmd.insert(-1, f"--header=Authorization: Bearer {token}")

    print(f"[DOWNLOADING] {label}")
    result = subprocess.run(cmd, text=True, capture_output=True)
    if result.returncode != 0:
        stderr = (result.stderr or "").strip()
        stdout = (result.stdout or "").strip()
        msg = stderr or stdout or f"aria2c exited {result.returncode}"
        _discard_partial()
        # aria2c output can contain the token-bearing URL; never echo it verbatim.
        return (label, False, _redact(msg))

    if not _looks_valid_file(tmp_path):
        size = os.path.getsize(tmp_path) if os.path.exists(tmp_path) else 0
        _discard_partial()
        return (label, False, f"downloaded file too small ({_human_mb(size)})")

    os.replace(tmp_path, output_path)
    _size_sanity_warning(output_path, expected_mb)
    return (label, True, _human_mb(os.path.getsize(output_path)))


def run_download_list(download_list, target_dir, title):
    """Download every job in ``download_list`` into ``target_dir`` in parallel.

    Args:
        download_list: Sequence of jobs, each passed to ``_download_one``.
        target_dir: Destination directory; created if missing.
        title: Heading printed before the run and used in the error message.

    Raises:
        RuntimeError: when at least one job failed. A job whose worker raised
            an exception is counted as a failure instead of aborting the run,
            so the remaining results and the OK/FAIL summary are still printed.
    """
    print(f"\n=== {title} ===")
    os.makedirs(target_dir, exist_ok=True)

    if not download_list:
        print("No items.")
        return

    workers = min(MAX_PARALLEL_DOWNLOADS, len(download_list))
    print(f"Parallel downloads: {workers}")

    ok = 0
    fail = 0

    with ThreadPoolExecutor(max_workers=workers) as ex:
        # Keep the job for each future so a crashed worker can still be named.
        future_to_job = {
            ex.submit(_download_one, job, target_dir): job for job in download_list
        }
        for fut in as_completed(future_to_job):
            try:
                label, success, info = fut.result()
            except Exception as exc:
                job = future_to_job[fut]
                label = job[0] if isinstance(job, (list, tuple)) and job else repr(job)
                success, info = False, f"{type(exc).__name__}: {exc}"
            if success:
                ok += 1
                print(f"[OK]   {label} -> {info}")
            else:
                fail += 1
                print(f"[FAIL] {label} -> {info}")

    print(f"Done: OK={ok}, FAIL={fail}")
    if fail > 0:
        raise RuntimeError(f"Some downloads failed in {title}: {fail} item(s)")




aria2c not found → installing...
aria2c installed successfully
Working directory: /content
Token sources:
  CIVITAI_TOKEN: colab_userdata
  HF_TOKEN: colab_userdata


In [2]:
### OPTIONAL: KAGGLE/COLAB FORGE BOOTSTRAP ###

import os
import shutil
import subprocess

# Optional bootstrap: on Kaggle/Colab, install system packages and make sure a
# WebUI Forge checkout exists in FORGE_DIR. Other platforms are skipped.
if not (ON_KAGGLE or ON_COLAB):
    print("Optional cell: предназначена только для Kaggle/Colab. Текущая платформа пропущена.")
else:
    required_packages = ["git", "python3-venv", "python3-pip"]
    print("Checking/installing platform dependencies...")
    # apt-get instead of apt: apt warns that its CLI is not stable for scripts.
    subprocess.run(["apt-get", "update", "-qq"], check=False)
    subprocess.run(["apt-get", "install", "-y", "-qq", *required_packages], check=False)

    launch_script = os.path.join(FORGE_DIR, "webui.sh")
    git_head = os.path.join(FORGE_DIR, ".git", "HEAD")
    # A checkout counts as valid when both the launcher and the git HEAD file exist.
    forge_ready = os.path.isfile(launch_script) and os.path.isfile(git_head)

    if forge_ready:
        print("WebUI Forge already exists and looks valid, skipping clone.")
    else:
        clone_target = FORGE_DIR
        if os.path.isdir(FORGE_DIR):
            print("FORGE_DIR exists but WebUI Forge is incomplete/corrupted. Recreating...")
            # FORGE_DIR may already hold directories or downloaded files (models,
            # LoRAs, outputs), so it is not deleted. Only stale git metadata is
            # dropped; a fresh clone is then overlaid onto the existing tree.
            stale_git = os.path.join(FORGE_DIR, ".git")
            if os.path.isdir(stale_git) and not os.path.islink(stale_git):
                shutil.rmtree(stale_git)
            elif os.path.lexists(stale_git):
                os.remove(stale_git)
            # git refuses to clone into a non-empty directory, so clone next to it.
            clone_target = FORGE_DIR.rstrip(os.sep) + ".clone_tmp"
            if os.path.isdir(clone_target):
                shutil.rmtree(clone_target)

        print("Cloning WebUI Forge...")
        subprocess.run([
            "git", "clone", "https://github.com/lllyasviel/stable-diffusion-webui-forge", clone_target
        ], check=True)

        if clone_target != FORGE_DIR:
            # Merge the clone into FORGE_DIR: repository files win on name
            # collisions, files that exist only in FORGE_DIR are kept.
            shutil.copytree(clone_target, FORGE_DIR, symlinks=True, dirs_exist_ok=True)
            shutil.rmtree(clone_target)

        if not os.path.isfile(os.path.join(FORGE_DIR, "webui.sh")):
            raise FileNotFoundError("Clone completed but webui.sh not found. Check repository state.")

    print("Optional bootstrap finished.")



Checking/installing platform dependencies...
FORGE_DIR exists but WebUI Forge is incomplete/corrupted. Recreating...
Cloning WebUI Forge...
Optional bootstrap finished.


In [None]:
### RUN WEBUI (PURE PYTHON) ###

import os
import subprocess
from pathlib import Path

# Launch WebUI Forge from its checkout, after patching the CLIP install command.
forge_dir = Path(FORGE_DIR)
if not forge_dir.exists():
    raise FileNotFoundError(f"FORGE_DIR не найден: {forge_dir}")

# The directory existing is not enough: it can exist without a Forge checkout.
# Fail early with a clear message instead of a bash "No such file" error.
launch_script_path = forge_dir / "webui.sh"
if not launch_script_path.is_file():
    raise FileNotFoundError(
        f"webui.sh not found in {forge_dir}; install WebUI Forge there first."
    )

# Rewrite the CLIP install command in modules/launch_utils.py so pip is called
# with --no-build-isolation --no-use-pep517. The patch is applied at most once.
launch_utils_path = forge_dir / "modules" / "launch_utils.py"
if launch_utils_path.exists():
    content = launch_utils_path.read_text(encoding="utf-8")
    old = 'run_pip(f"install {clip_package}", "clip")'
    new = 'run_pip(f"install --no-build-isolation --no-use-pep517 {clip_package}", "clip")'
    if old in content and new not in content:
        launch_utils_path.write_text(content.replace(old, new), encoding="utf-8")
        print("Patched CLIP install command (no-build-isolation + no-use-pep517).")
    else:
        print("CLIP install patch already applied or not required.")
else:
    print(f"Warning: {launch_utils_path} not found, skipping CLIP patch.")

# Blocks until the WebUI process exits; a non-zero exit raises CalledProcessError.
cmd = ["bash", "webui.sh", "-f", "--xformers", "--api", "--port", "17860"]
print("Running:", " ".join(cmd), "in", forge_dir)
subprocess.run(cmd, cwd=forge_dir, check=True)

