In [2]:
!pip install torch torchvision diffusers transformers accelerate safetensors pillow



In [3]:
!pip install fastapi uvicorn pyngrok nest-asyncio



In [4]:
!pip install gradio



In [5]:
!pip install langdetect python-dotenv



In [6]:
import os
from pyngrok import ngrok
from fastapi import FastAPI
from pydantic import BaseModel
from langdetect import detect
from starlette.responses import FileResponse
import torch, gc
from datetime import datetime
from transformers import AutoTokenizer, AutoModelForCausalLM
from diffusers import StableDiffusionPipeline
from PIL import Image
import nest_asyncio
import asyncio
import uvicorn
from fastapi.responses import HTMLResponse
from accelerate import init_empty_weights, load_checkpoint_and_dispatch
import time
import logging

# 로깅 설정
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("Generate_Image")


In [None]:
# 환경 변수 설정

os.environ['HF_TOKEN'] = 'Llama-3.2-3B-Instruct 토큰'
os.environ['MODEL_ID_LLAMA'] = 'meta-llama/Llama-3.2-3B-Instruct'
os.environ['MODEL_ID_SD'] = 'runwayml/stable-diffusion-v1-5'
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

HF_TOKEN = os.environ['HF_TOKEN']
MODEL_ID_LLAMA = os.environ['MODEL_ID_LLAMA']
MODEL_ID_SD = os.environ['MODEL_ID_SD']
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

NGROK_AUTH_TOKEN = "Grok 토큰"
ngrok.set_auth_token(NGROK_AUTH_TOKEN)


In [8]:
# 유틸 함수
def clear_memory():
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

def get_env(name, default=None):
    return os.getenv(name, default)

def wait_for_gpu_memory(required_mb=1000, check_interval=5):
    """GPU에 required_mb 만큼의 빈 공간이 생길 때까지 대기"""
    if not torch.cuda.is_available():
        return
    while True:
        free_mem = torch.cuda.mem_get_info()[0] / (1024 ** 2)  # MB 단위
        if free_mem >= required_mb:
            break
        print(f"[GPU 메모리 부족] 현재 여유: {free_mem:.2f} MB, {check_interval}s 후 재시도")
        time.sleep(check_interval)

In [9]:
class LlamaTranslator:
    def __init__(self, model_id, hf_token, offload_folder="./llama_offload"):
        self.model_id = model_id
        self.hf_token = hf_token
        self.offload_folder = offload_folder

        clear_memory()
        if DEVICE == "cuda":
            wait_for_gpu_memory(required_mb=1000)

        # 토크나이저 로드
        self.tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # 모델 로드
        device_map = "auto" if DEVICE == "cuda" else {"": "cpu"}
        try:
            self.model = AutoModelForCausalLM.from_pretrained(
                model_id,
                token=hf_token,
                device_map=device_map,
                offload_folder=self.offload_folder,
                torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32,
                trust_remote_code=True
            )
        except RuntimeError:
            logger.warning("[경고] GPU 메모리 부족, CPU 오프로드로 재시도...")
            self.model = AutoModelForCausalLM.from_pretrained(
                model_id,
                token=hf_token,
                device_map={"": "cpu"},
                offload_folder=self.offload_folder,
                torch_dtype=torch.float16,
                trust_remote_code=True
            )
        logger.info("[LlamaTranslator] 모델 로드 완료")

    def generate(self, system_message, user_message, max_new_tokens=256):
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]

        inputs = self.tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt"
        )

        # 타입 체크
        if isinstance(inputs, dict):
            logger.debug(f"inputs keys: {list(inputs.keys())}")
            input_ids = inputs["input_ids"]
            attention_mask = inputs["attention_mask"]
            logger.debug(f"input_ids shape: {input_ids.shape}, attention_mask shape: {attention_mask.shape}")
        elif isinstance(inputs, torch.Tensor):
            logger.debug(f"inputs is a tensor with shape: {inputs.shape}")
            input_ids = inputs
            attention_mask = torch.ones_like(input_ids)
        else:
            raise TypeError(f"Unexpected type from tokenizer.apply_chat_template: {type(inputs)}")

        # batch 차원
        if input_ids.dim() == 1:
            input_ids = input_ids.unsqueeze(0)
        if attention_mask.dim() == 1:
            attention_mask = attention_mask.unsqueeze(0)

        input_ids = input_ids.to(self.model.device)
        attention_mask = attention_mask.to(self.model.device)

        # 종료 토큰
        eot_id = self.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        terminators = [self.tokenizer.eos_token_id]
        if eot_id is not None:
            terminators.append(eot_id)

        outputs = self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            eos_token_id=tuple(terminators) if len(terminators) > 1 else terminators[0],
            pad_token_id=self.tokenizer.pad_token_id,
            do_sample=True,
            temperature=0.6,
            top_p=0.9
        )

        # 2D/1D 안전 처리
        if outputs.dim() == 2:
            output_ids = outputs[0, input_ids.shape[1]:]
        else:
            output_ids = outputs[input_ids.shape[1]:]

        decoded = self.tokenizer.decode(output_ids, skip_special_tokens=True)
        logger.debug(f"decoded output (first 200 chars): {decoded[:200]}")
        return decoded


In [10]:
class ImageGenerator:
    def __init__(self):
        clear_memory()
        device_map = "cuda" if torch.cuda.is_available() else None
        self.pipe = StableDiffusionPipeline.from_pretrained(
            MODEL_ID_SD,
            dtype=torch.float16,
            safety_checker=None,
            device_map=device_map,
            token=HF_TOKEN
        )
        self.pipe.enable_attention_slicing()
        self.pipe.enable_vae_slicing()
        if not torch.cuda.is_available():
            self.pipe.enable_model_cpu_offload()

    def generate_image(self, prompt, height=512, width=512, guidance_scale=8):
        device = "cuda" if torch.cuda.is_available() else "cpu"
        with torch.autocast(device):
            result = self.pipe(prompt, height=height, width=width, guidance_scale=guidance_scale)
        image = result.images[0]
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f"generated_image_{timestamp}.jpg"
        image.save(file_name)
        logger.debug(f"Image saved to {file_name}")
        return image, file_name


In [11]:
app = FastAPI(title="AI Image Generator API", version="1.0")
translator = LlamaTranslator(model_id=MODEL_ID_LLAMA, hf_token=HF_TOKEN)
generator = ImageGenerator()

class PromptRequest(BaseModel):
    prompt_text: str

@app.get("/generate", response_class=HTMLResponse)
def generate_form():
    html_content = """
    <html>
        <head><title>AI Image Generator</title></head>
        <body>
            <h2>AI Image Generator</h2>
            <form id="generate-form">
                <label>Prompt:</label><br>
                <input type="text" id="prompt_text" size="50"><br><br>
                <button type="button" onclick="submitPrompt()">Generate Image</button>
            </form>
            <h3>Result:</h3>
            <div id="result"></div>
            <script>
                async function submitPrompt() {
                    const prompt = document.getElementById("prompt_text").value;
                    const response = await fetch("/api/generate", {
                        method: "POST",
                        headers: {"Content-Type": "application/json"},
                        body: JSON.stringify({prompt_text: prompt})
                    });
                    const data = await response.json();
                    document.getElementById("result").innerHTML =
                        "<b>Translated Prompt:</b> " + data.translated_prompt + "<br>" +
                        "<b>Image Path:</b> <a href='" + data.image_path + "' target='_blank'>View Image</a>";
                }
            </script>
        </body>
    </html>
    """
    return html_content

@app.post("/api/generate")
def generate_image_api(req: PromptRequest):
    prompt_text = req.prompt_text
    try:
        lang = detect(prompt_text)
    except Exception:
        lang = "unknown"
    logger.debug(f"Original prompt: {prompt_text}, detected language: {lang}")

    if lang == "ko":
      system_message = (
          "You are a professional prompt engineer and creative translator. "
          "Translate Korean descriptions into vivid English prompts suitable for Stable Diffusion."
      )
    else:
        system_message = (
            "You are a professional Stable Diffusion prompt engineer. "
            "Enhance English prompts into cinematic, visually rich versions suitable for high-quality wallpapers."
        )


    translated_prompt = translator.generate(system_message, prompt_text)
    logger.debug(f"Translated prompt: {translated_prompt[:200]}")  # 200자만 로그

    image, file_path = generator.generate_image(translated_prompt)

    return {
        "original_prompt": prompt_text,
        "translated_prompt": translated_prompt,
        "image_path": file_path
    }

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
`torch_dtype` is deprecated! Use `dtype` instead!


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Keyword arguments {'dtype': torch.float16} are not expected by StableDiffusionPipeline and will be ignored.


Loading pipeline components...:   0%|          | 0/6 [00:00<?, ?it/s]

You have disabled the safety checker for <class 'diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline'> by passing `safety_checker=None`. Ensure that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered results in services or applications open to the public. Both the diffusers team and Hugging Face strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling it only for use-cases that involve analyzing network behavior or auditing its results. For more information, please have a look at https://github.com/huggingface/diffusers/pull/254 .


In [12]:
public_url = ngrok.connect(8000)
print("Public URL:", public_url)

Public URL: NgrokTunnel: "https://rompish-statuesquely-mina.ngrok-free.dev" -> "http://localhost:8000"


In [13]:
config = uvicorn.Config(app, host="0.0.0.0", port=8000, reload=True, log_level="debug")
server = uvicorn.Server(config)

In [None]:
nest_asyncio.apply()
asyncio.get_event_loop().run_until_complete(server.serve())

INFO:     Started server process [29482]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     59.6.82.140:0 - "GET / HTTP/1.1" 404 Not Found
INFO:     59.6.82.140:0 - "GET /generate HTTP/1.1" 200 OK
