In [None]:
import os
import json
import time
from pathlib import Path

import torch
import cv2
import numpy as np
from PIL import Image

from diffusers import (
    StableDiffusionPipeline,
    StableDiffusionControlNetPipeline,
    ControlNetModel,
    StableDiffusionXLPipeline
)


# 기본 설정

MODE = "sd15_controlnet"      # 원하는 모델 선택: "sd15_controlnet" | "sdxl"
OUTPUT_DIR = "outputs"
INPUT_DIR = Path("inputs")

RUN_NAME = f"run_{time.strftime('%Y%m%d_%H%M%S')}"
SAVE_DIR = os.path.join(OUTPUT_DIR, RUN_NAME)

os.makedirs(SAVE_DIR, exist_ok=True)
os.makedirs(os.path.join(SAVE_DIR, "candidates"), exist_ok=True)


# 공용 모델 ID 정의

MODEL_IDS = {
    "sd15": "runwayml/stable-diffusion-v1-5",
    "sdxl": "stabilityai/stable-diffusion-xl-base-1.0",
    "controlnet": "lllyasviel/control_v11p_sd15_canny",
}


pipe = None
controlnet = None


# 파이프라인 언로드 (모델 변경 시 메모리 회수)

def unload_pipeline():
    """모델 파이프라인을 안전하게 언로드"""
    global pipe, controlnet

    pipe = None
    controlnet = None

    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()


# 파이프라인 로더 (MODE 기반)


def load_pipeline(mode: str, use_controlnet: bool, canny_image=None):
    """모델 모드를 기준으로 파이프라인을 로드"""
    global pipe, controlnet

    unload_pipeline()

    if mode == "sd15_controlnet" and use_controlnet:
        print("▶ Loading SD1.5 + ControlNet pipeline")

        controlnet = ControlNetModel.from_pretrained(
            MODEL_IDS["controlnet"],
            torch_dtype=torch.float16
        ).to("cuda")

        pipe = StableDiffusionControlNetPipeline.from_pretrained(
            MODEL_IDS["sd15"],
            controlnet=controlnet,
            torch_dtype=torch.float16
        ).to("cuda")

    elif mode == "sdxl":
        print("▶ Loading SDXL pipeline")

        pipe = StableDiffusionXLPipeline.from_pretrained(
            MODEL_IDS["sdxl"],
            torch_dtype=torch.float16
        ).to("cuda")

    else:
        print("▶ Loading SD1.5 base pipeline")

        pipe = StableDiffusionPipeline.from_pretrained(
            MODEL_IDS["sd15"],
            torch_dtype=torch.float16
        ).to("cuda")

    return pipe


# ControlNet 입력 이미지 탐색 및 Canny 변환

def get_controlnet_image():
    """입력 디렉터리에서 첫 번째 이미지 파일을 찾아 Canny 변환"""
    image_files = sorted([
        *INPUT_DIR.glob("*.jpg"),
        *INPUT_DIR.glob("*.jpeg"),
        *INPUT_DIR.glob("*.png"),
        *INPUT_DIR.glob("*.webp"),
    ])

    if len(image_files) == 0:
        print("ℹ 입력 이미지 없음 — ControlNet 비활성화")
        return None, False

    path = image_files[0]
    print(f"✔ 참조 이미지 사용: {path}")

    img = cv2.imread(str(path))

    if img is None:
        print("이미지를 읽지 못했습니다 — ControlNet 비활성화")
        return None, False

    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (512, 512))

    edges = cv2.Canny(img, 100, 200)
    canny = Image.fromarray(edges)

    print("✔ ControlNet Canny 적용 완료")
    return path, canny


# 프롬프트 정의

prompt = (
    "illustration style, clean poster layout, warm cozy color tone, "
    "korean traditional market dried seafood poster, "
    "korean hangul signage only, authentic korean market mood, "
    "wooden baskets and display table, dried anchovies, dried squid, pollack strips, seaweed, "
    "neatly arranged products, soft textured illustration shading, "
    "natural warm lighting, product-focused composition"
)

negative_prompt = (
    "japanese text, chinese text, kanji, latin letters, english text, "
    "blurry, noisy, messy layout, distorted shapes, watermark, logo"
)


# 생성 파라미터

num_steps = 30
guidance = 5.5
num_images = 3
seed = 42
control_scale = 0.45

generator = torch.Generator("cuda").manual_seed(seed)


# 이미지 생성 + 벤치마킹 계측

input_image_path, canny_img = get_controlnet_image()
use_controlnet = (MODE == "sd15_controlnet" and canny_img is not None)

pipe = load_pipeline(MODE, use_controlnet, canny_img)

print("▶ Generating candidate images...")

start_time = time.time()
torch.cuda.reset_peak_memory_stats()

if use_controlnet:
    images = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        image=canny_img,
        controlnet_conditioning_scale=control_scale,
        num_inference_steps=num_steps,
        guidance_scale=guidance,
        num_images_per_prompt=num_images,
        generator=generator
    ).images
else:
    images = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        num_inference_steps=num_steps,
        guidance_scale=guidance,
        num_images_per_prompt=num_images,
        generator=generator
    ).images

latency_ms = int((time.time() - start_time) * 1000)
vram_peak = round(torch.cuda.max_memory_allocated() / 1024 / 1024, 2)


# 결과 저장

paths = []
for i, img in enumerate(images, start=1):
    path = os.path.join(SAVE_DIR, "candidates", f"candidate_v{i}.png")
    img.save(path)
    paths.append(path)

print("✔ Saved candidates:")
print("\n".join(paths))


# params.json (벤치마킹 로그)

params = {
    "mode": MODE,
    "seed": seed,
    "steps": num_steps,
    "guidance_scale": guidance,
    "controlnet_scale": control_scale if use_controlnet else None,
    "latency_ms": latency_ms,
    "vram_used_mb": vram_peak,
    "input_image": str(input_image_path) if use_controlnet else None,
    "output_path": str(SAVE_DIR),
}

with open(os.path.join(SAVE_DIR, "params.json"), "w") as f:
    json.dump(params, f, indent=2)

print("\n✔ params.json saved")
print("Run folder:", SAVE_DIR)
print(f"▶ latency: {latency_ms} ms   |   vram_peak: {vram_peak} MB")
