In [None]:
# @title LOM Cushion LoRA Trainer 🧠

# @markdown 먼저 필요한 환경을 설정합니다.
# @markdown **실행 버튼을 누르세요!**

import os
import sys
import subprocess
from IPython.display import HTML, display

# 깃허브 저장소에서 파일 가져오기 설정
github_repo = "sun2141/lom-cushion-lora"  # @param {type:"string"}

# 환경 확인 및 설정
print("🔍 환경 확인 중...")

# CUDA 사용 가능 여부 확인
import torch
print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA 버전: {torch.version.cuda}")
    print(f"현재 CUDA 장치: {torch.cuda.current_device()}")
    print(f"장치 이름: {torch.cuda.get_device_name()}")

# 저장소 클론
print("\n📥 깃허브 저장소 클론 중...")
!git clone https://github.com/{github_repo}.git lom_cushion_repo
%cd lom_cushion_repo

# 메모리 관리 및 캐시 정리
torch.cuda.empty_cache()
import gc
gc.collect()

# 필요한 패키지 설치
print("\n📦 필수 패키지 설치 중...")
!pip install -q -r requirements.txt

# 가끔 특정 버전 충돌이 발생할 수 있어 numpy와 pandas 재설치
print("\n🔄 호환성 문제 해결을 위한 패키지 재설치 중...")
!pip install -q numpy>=1.26.4 --no-deps
!pip install -q pandas==2.0.3 --no-deps

print("\n✅ 환경 설정 완료!")

# Google Drive 연결 (모델 저장용)
# @markdown 학습된 모델을 저장할 Google Drive를 연결하시겠습니까?
connect_drive = True  # @param {type:"boolean"}

if connect_drive:
    from google.colab import drive
    drive.mount('/content/drive')
    save_directory = "/content/drive/MyDrive/LOM_Cushion_LoRA"
    !mkdir -p {save_directory}
    print(f"✅ Google Drive 연결 완료! 저장 경로: {save_directory}")
else:
    save_directory = "/content/lom_cushion_output"
    !mkdir -p {save_directory}
    print(f"📁 로컬 저장 경로: {save_directory}")

# 세션 연결 유지 설정
from google.colab import output
output.enable_custom_widget_manager()

# @title 데이터셋 다운로드 및 준비 🖼️

# @markdown Hugging Face에서 데이터셋을 다운로드합니다.
dataset_name = "sun2141/lom-cushion-images"  # @param {type:"string"}

print("📥 데이터셋 다운로드 중...")
!huggingface-cli login --token hf_dummy_token_for_script_execution
# Hugging Face 데이터셋 라이브러리 로드
from datasets import load_dataset

# 데이터셋 다운로드
try:
    dataset = load_dataset(dataset_name)
    print(f"✅ 데이터셋 '{dataset_name}' 다운로드 완료!")
except Exception as e:
    print(f"❌ 데이터셋 다운로드 실패: {e}")
    print("대신 로컬 이미지를 사용합니다.")
    
    # 로컬 이미지 폴더 확인
    if not os.path.exists("images"):
        print("⚠️ 로컬 이미지 폴더가 없습니다. 폴더 생성 중...")
        !mkdir -p images
        print("images 폴더를 생성했습니다. 여기에 학습 이미지를 추가해주세요.")
    else:
        print("📁 로컬 이미지 폴더 확인 완료!")
        image_count = len([f for f in os.listdir("images") if os.path.isfile(os.path.join("images", f))])
        print(f"📊 이미지 파일 {image_count}개를 발견했습니다.")

# 캡션 파일 확인
print("\n📝 캡션 파일 확인 중...")

prompts_dir = "prompts"
if not os.path.exists(prompts_dir):
    !mkdir -p {prompts_dir}
    print(f"📁 {prompts_dir} 폴더를 생성했습니다.")

caption_file = os.path.join(prompts_dir, "image_caption.csv")
if not os.path.exists(caption_file):
    print(f"⚠️ 캡션 파일 {caption_file}이 없습니다.")
    
    # 이미 제공된 캡션 내용을 저장
    with open(caption_file, "w") as f:
        f.write("filename,prompt\n")
        # 캡션 파일 내용 추가
        with open("../image_caption.csv", "r") as source:
            next(source)  # 헤더 건너뛰기
            for line in source:
                f.write(line)
    print(f"✅ {caption_file} 파일을 생성했습니다.")
else:
    print(f"✅ 캡션 파일 {caption_file}을 찾았습니다.")

import pandas as pd
try:
    captions_df = pd.read_csv(caption_file)
    print(f"📊 캡션 파일 정보: {len(captions_df)}개의 항목이 있습니다.")
except Exception as e:
    print(f"❌ 캡션 파일 읽기 실패: {e}")

# @title 모델 설정 및 학습 파라미터 ⚙️

# @markdown 기본 모델과 학습 설정을 구성합니다.

# 기본 모델 설정
pretrained_model = "runwayml/stable-diffusion-v1-5"  # @param {type:"string"}
resolution = 512  # @param {type:"integer"}
batch_size = 1  # @param {type:"integer"}
num_train_epochs = 10  # @param {type:"integer"}
learning_rate = 1e-4  # @param {type:"number"}
lora_r = 4  # @param {type:"integer"}
lora_alpha = 32  # @param {type:"integer"}
gradient_accumulation_steps = 4  # @param {type:"integer"}
mixed_precision = "fp16"  # @param ["no", "fp16", "bf16"]
train_text_encoder = True  # @param {type:"boolean"}
checkpointing_steps = 500  # @param {type:"integer"}

print("⚙️ 학습 설정:")
print(f"- 기본 모델: {pretrained_model}")
print(f"- 해상도: {resolution} x {resolution}")
print(f"- 배치 크기: {batch_size}")
print(f"- 학습 에폭: {num_train_epochs}")
print(f"- 학습률: {learning_rate}")
print(f"- LoRA 랭크 (r): {lora_r}")
print(f"- LoRA 알파: {lora_alpha}")
print(f"- 그래디언트 누적 단계: {gradient_accumulation_steps}")
print(f"- 혼합 정밀도: {mixed_precision}")
print(f"- 텍스트 인코더 학습: {train_text_encoder}")
print(f"- 체크포인트 저장 단계: {checkpointing_steps}")

# @title 학습 파이프라인 구성 및 실행 🚀

# @markdown 학습을 시작하려면 실행 버튼을 누르세요.

print("🔧 학습 파이프라인 구성 중...")

import torch
from pathlib import Path
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from diffusers import StableDiffusionPipeline, DDPMScheduler
from diffusers.loaders import AttnProcsLayers
from diffusers.models.attention_processor import LoRAAttnProcessor
from diffusers.optimization import get_scheduler
from peft import LoraConfig, get_peft_model
from transformers import CLIPTextModel, CLIPTokenizer

# 데이터셋 준비 함수
def prepare_dataset(image_folder, caption_file):
    """이미지 폴더와 캡션 파일을 기반으로 데이터셋을 준비합니다."""
    import pandas as pd
    from torch.utils.data import Dataset
    from PIL import Image
    
    class CustomDataset(Dataset):
        def __init__(self, image_folder, captions_df, tokenizer, size=512):
            self.image_folder = image_folder
            self.captions_df = captions_df
            self.tokenizer = tokenizer
            self.size = size
            
            # 존재하는 이미지 파일만 필터링
            valid_files = []
            for idx, row in self.captions_df.iterrows():
                file_path = os.path.join(image_folder, row['filename'])
                if os.path.exists(file_path):
                    valid_files.append(idx)
            
            self.valid_indices = valid_files
            print(f"유효한 이미지 파일: {len(self.valid_indices)}/{len(self.captions_df)}")
            
        def __len__(self):
            return len(self.valid_indices)
        
        def __getitem__(self, idx):
            idx = self.valid_indices[idx]
            row = self.captions_df.iloc[idx]
            
            image_path = os.path.join(self.image_folder, row['filename'])
            prompt = row['prompt']
            
            # 이미지 로드 및 전처리
            image = Image.open(image_path).convert('RGB')
            # 이미지 크기 조정
            if image.width != self.size or image.height != self.size:
                image = image.resize((self.size, self.size), Image.LANCZOS)
            
            # 토큰화
            text_inputs = self.tokenizer(
                prompt,
                padding="max_length",
                max_length=self.tokenizer.model_max_length,
                truncation=True,
                return_tensors="pt",
            )
            
            # 이미지를 텐서로 변환
            image = (torch.from_numpy(np.array(image)) / 255.0) * 2.0 - 1.0
            image = image.permute(2, 0, 1).float()
            
            return {
                "pixel_values": image,
                "input_ids": text_inputs.input_ids[0],
                "attention_mask": text_inputs.attention_mask[0],
            }
    
    captions_df = pd.read_csv(caption_file)
    print(f"캡션 파일에서 {len(captions_df)}개의 항목을 로드했습니다.")
    
    return captions_df, CustomDataset

# 모델 준비 함수
def prepare_model(pretrained_model, lora_r, lora_alpha, train_text_encoder):
    """모델을 로드하고 LoRA 설정으로 준비합니다."""
    # 파이프라인 로드
    pipeline = StableDiffusionPipeline.from_pretrained(
        pretrained_model,
        torch_dtype=torch.float16 if mixed_precision == "fp16" else torch.float32
    )
    
    # 텍스트 인코더
    text_encoder = pipeline.text_encoder
    tokenizer = pipeline.tokenizer
    
    # U-Net
    unet = pipeline.unet
    
    # LoRA 구성
    if lora_r > 0:
        # U-Net에 LoRA 적용
        unet_lora_config = LoraConfig(
            r=lora_r,
            lora_alpha=lora_alpha,
            target_modules=["to_q", "to_k", "to_v", "to_out.0"],
            init_lora_weights="gaussian",
        )
        unet = get_peft_model(unet, unet_lora_config)
        unet.print_trainable_parameters()
        
        # 텍스트 인코더에도 LoRA 적용 (옵션)
        if train_text_encoder:
            text_encoder_lora_config = LoraConfig(
                r=lora_r,
                lora_alpha=lora_alpha,
                target_modules=["q_proj", "k_proj", "v_proj", "out_proj"],
                init_lora_weights="gaussian",
            )
            text_encoder = get_peft_model(text_encoder, text_encoder_lora_config)
            text_encoder.print_trainable_parameters()
    
    return pipeline, unet, text_encoder, tokenizer

# 학습 함수
def train_lora(unet, text_encoder, dataset, tokenizer, train_text_encoder, learning_rate, num_train_epochs, gradient_accumulation_steps):
    """모델 학습을 실행합니다."""
    # 학습 매개변수 설정
    train_dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,
    )
    
    # 옵티마이저 설정
    params_to_optimize = [
        {"params": unet.parameters(), "lr": learning_rate},
    ]
    
    if train_text_encoder:
        params_to_optimize.append(
            {"params": text_encoder.parameters(), "lr": learning_rate},
        )
    
    optimizer = torch.optim.AdamW(
        params_to_optimize,
        lr=learning_rate,
        betas=(0.9, 0.999),
        weight_decay=1e-2,
        eps=1e-8,
    )
    
    # 학습 스케줄러
    lr_scheduler = get_scheduler(
        "cosine",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=len(train_dataloader) * num_train_epochs // gradient_accumulation_steps,
    )
    
    # 노이즈 스케줄러
    noise_scheduler = DDPMScheduler.from_pretrained(pretrained_model, subfolder="scheduler")
    
    # 진행상황 추적을 위한 변수
    global_step = 0
    losses = []
    
    # 텍스트 인코더와 U-Net을 학습 모드로 설정
    if train_text_encoder:
        text_encoder.train()
    unet.train()
    
    # 학습 루프
    progress_bar = tqdm(range(num_train_epochs * len(train_dataloader)), desc="Training")
    
    for epoch in range(num_train_epochs):
        for step, batch in enumerate(train_dataloader):
            # 배치 데이터 준비
            pixel_values = batch["pixel_values"].to("cuda", non_blocking=True)
            input_ids = batch["input_ids"].to("cuda", non_blocking=True)
            
            # 텍스트 임베딩
            with torch.no_grad():
                if train_text_encoder:
                    encoder_hidden_states = text_encoder(input_ids)[0]
                else:
                    encoder_hidden_states = text_encoder(input_ids, output_hidden_states=True).hidden_states[-1]

            # 노이즈 추가 및 잠재변수 인코딩
            latents = pipeline.vae.encode(pixel_values).latent_dist.sample()
            latents = latents * 0.18215
            
            # 노이즈 샘플링
            noise = torch.randn_like(latents)
            bsz = latents.shape[0]
            timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (bsz,), device=latents.device)
            timesteps = timesteps.long()
            
            # 노이즈 추가
            noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
            
            # U-Net 예측
            noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample
            
            # 손실 계산
            loss = torch.nn.functional.mse_loss(noise_pred, noise, reduction="mean")
            
            # 손실 역전파
            loss = loss / gradient_accumulation_steps
            loss.backward()
            
            # 그래디언트 누적 및 업데이트
            if (step + 1) % gradient_accumulation_steps == 0:
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()
                
                # 손실 기록
                losses.append(loss.detach().item() * gradient_accumulation_steps)
                
                # 진행상황 업데이트
                progress_bar.update(1)
                progress_bar.set_postfix({"loss": losses[-1], "epoch": epoch + 1})
                
                global_step += 1
                
                # 체크포인트 저장
                if global_step % checkpointing_steps == 0:
                    save_checkpoint(unet, text_encoder, tokenizer, epoch, global_step)
            
    # 최종 모델 저장
    save_checkpoint(unet, text_encoder, tokenizer, num_train_epochs - 1, global_step, is_final=True)
    
    # 손실 그래프 그리기
    plt.figure(figsize=(10, 5))
    plt.plot(losses)
    plt.title("Training Loss")
    plt.xlabel("Steps")
    plt.ylabel("Loss")
    plt.savefig(f"{save_directory}/training_loss.png")
    plt.show()
    
    return losses

# 체크포인트 저장 함수
def save_checkpoint(unet, text_encoder, tokenizer, epoch, global_step, is_final=False):
    """모델 체크포인트를 저장합니다."""
    if is_final:
        checkpoint_dir = f"{save_directory}/final_model"
    else:
        checkpoint_dir = f"{save_directory}/checkpoint-{global_step}"
    
    os.makedirs(checkpoint_dir, exist_ok=True)
    
    # U-Net 상태 저장
    unet.save_pretrained(checkpoint_dir)
    
    # 텍스트 인코더도 저장 (학습된 경우)
    if hasattr(text_encoder, "save_pretrained"):
        text_encoder.save_pretrained(checkpoint_dir)
    
    # 토크나이저 저장
    tokenizer.save_pretrained(checkpoint_dir)
    
    print(f"체크포인트 저장 완료: {checkpoint_dir}")

# 이미지 생성 함수
def generate_sample_images(pipeline, prompt_examples=None, num_samples=4, guidance_scale=7.5):
    """모델로 샘플 이미지를 생성합니다."""
    if prompt_examples is None:
        # 기본 프롬프트
        prompts = [
            "A cushion supporting arms while driving, realistic photo",
            "A purple cushion with emerald pocket, product photo",
            "A person using cushion while working on laptop, office setting",
            "A cushion being used in car backseat, smartphone in pocket"
        ]
    else:
        # 지정된 프롬프트 사용
        prompts = prompt_examples[:num_samples]
    
    # 이미지 생성
    pipeline.to("cuda")
    pipeline.enable_attention_slicing()
    
    fig, axs = plt.subplots(1, len(prompts), figsize=(16, 4))
    
    for i, prompt in enumerate(prompts):
        image = pipeline(prompt, num_inference_steps=30, guidance_scale=guidance_scale).images[0]
        if len(prompts) > 1:
            axs[i].imshow(image)
            axs[i].set_title(prompt[:20] + "...", fontsize=10)
            axs[i].axis("off")
        else:
            axs.imshow(image)
            axs.set_title(prompt[:20] + "...", fontsize=10)
            axs.axis("off")
    
    plt.tight_layout()
    plt.savefig(f"{save_directory}/sample_generated_images.png")
    plt.show()

# Gradio 인터페이스 생성 함수
def create_gradio_interface(pipeline):
    """Gradio 웹 인터페이스를 생성합니다."""
    import gradio as gr
    
    def generate_image(prompt, guidance_scale=7.5, steps=30):
        """이미지 생성 함수"""
        image = pipeline(
            prompt,
            num_inference_steps=steps,
            guidance_scale=guidance_scale
        ).images[0]
        return image
    
    # 예시 프롬프트 로드
    example_prompts = []
    example_file = os.path.join("prompts", "example_prompts.csv")
    if os.path.exists(example_file):
        import pandas as pd
        df = pd.read_csv(example_file)
        example_prompts = df["prompt"].tolist()
    
    # Gradio 인터페이스 구성
    with gr.Blocks() as demo:
        gr.Markdown("# LOM Cushion 이미지 생성기")
        
        with gr.Row():
            with gr.Column():
                prompt = gr.Textbox(label="프롬프트", placeholder="이미지를 설명하는 텍스트를 입력하세요...")
                guidance = gr.Slider(minimum=1.0, maximum=15.0, value=7.5, step=0.5, label="Guidance Scale")
                steps = gr.Slider(minimum=10, maximum=100, value=30, step=1, label="생성 단계")
                btn = gr.Button("이미지 생성하기")
            
            with gr.Column():
                output = gr.Image(label="생성된 이미지")
        
        # 예시 추가
        if example_prompts:
            gr.Examples(
                examples=[[p] for p in example_prompts[:10]],
                inputs=[prompt],
                outputs=output,
                fn=lambda p: generate_image(p),
                cache_examples=True
            )
        
        # 이벤트 연결
        btn.click(fn=generate_image, inputs=[prompt, guidance, steps], outputs=output)
    
    # 인터페이스 실행
    demo.launch(share=True)

# 학습 실행
import numpy as np
import importlib

try:
    print("🚀 학습 시작...")

    # 이미지 폴더와 캡션 파일 경로 설정
    image_folder = "images"
    caption_file = os.path.join("prompts", "image_caption.csv")

    # 모델 및 토크나이저 로드
    pipeline, unet, text_encoder, tokenizer = prepare_model(
        pretrained_model, lora_r, lora_alpha, train_text_encoder
    )

    # 데이터셋 준비
    captions_df, CustomDataset = prepare_dataset(image_folder, caption_file)
    dataset = CustomDataset(image_folder, captions_df, tokenizer, size=resolution)

    # 학습 실행
    unet.to("cuda")
    if train_text_encoder:
        text_encoder.to("cuda")

    # 학습 프로세스 실행
    losses = train_lora(
        unet, text_encoder, dataset, tokenizer, 
        train_text_encoder, learning_rate, 
        num_train_epochs, gradient_accumulation_steps
    )

    # 저장된 모델로 파이프라인 업데이트
    pipeline.unet = unet.to("cuda")
    if train_text_encoder:
        pipeline.text_encoder = text_encoder.to("cuda")

    # 샘플 이미지 생성
    print("\n🖼️ 샘플 이미지 생성 중...")
    
    # 예시 프롬프트 로드
    example_prompts = []
    example_file = os.path.join("prompts", "example_prompts.csv")
    if os.path.exists(example_file):
        example_df = pd.read_csv(example_file)
        example_prompts = example_df["prompt"].tolist()
    
    generate_sample_images(pipeline, prompt_examples=example_prompts)

    # Gradio 인터페이스 시작
    print("\n🌐 Gradio 인터페이스 시작...")
    create_gradio_interface(pipeline)

    print("\n✅ 모든 작업이 완료되었습니다!")

except Exception as e:
    print(f"\n❌ 오류 발생: {e}")
    import traceback
    traceback.print_exc()
    
    # 시스템 정보 추가 출력
    print("\n📊 시스템 정보:")
    print(f"CUDA 사용 가능: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"현재 CUDA 메모리 사용량: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")
        print(f"최대 CUDA 메모리 사용량: {torch.cuda.max_memory_allocated() / 1024**2:.2f} MB")

# @title Hugging Face 모델 허브에 모델 업로드 🚀

# @markdown 학습이 완료된 후 모델을 Hugging Face에 업로드합니다.

# Hugging Face 업로드 설정
upload_to_hub = False  # @param {type:"boolean"}
hf_model_name = "lom-cushion-lora"  # @param {type:"string"}
hf_token = ""  # @param {type:"string"}

if upload_to_hub:
    from huggingface_hub import login, HfApi
    
    # 토큰이 비어있지 않은지 확인
    if not hf_token:
        print("❌ Hugging Face 토큰이 비어있습니다. 토큰을 입력해주세요.")
    else:
        try:
            # Hugging Face 로그인
            login(token=hf_token, add_to_git_credential=True)
            
            # 최종 모델 경로
            model_path = f"{save_directory}/final_model"
            
            # 모델 설명 생성
            model_card = f"""
            # LOM Cushion LoRA 모델
            
            이 모델은 Stable Diffusion v1.5를 기반으로 쿠션 이미지 생성에 특화된 LoRA 모델입니다.
            
            ## 모델 정보
            - 기본 모델: {pretrained_model}
            - LoRA 랭크 (r): {lora_r}
            - LoRA 알파: {lora_alpha}
            - 학습 에폭: {num_train_epochs}
            
            ## 사용 방법
            ```python
            from diffusers import StableDiffusionPipeline
            import torch
            
            model_id = "{hf_model_name}"
            pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
            pipe.unet.load_attn_procs(model_id)
            pipe = pipe.to("cuda")
            
            prompt = "A cushion supporting arms while driving, realistic photo"
            image = pipe(prompt).images[0]
            image.save("cushion.png")
            ```
            """
            
            # README.md 파일 생성
            with open(f"{model_path}/README.md", "w") as f:
                f.write(model_card)
            
            # 모델 업로드
            print(f"📤 모델을 Hugging Face Hub에 업로드 중: {hf_model_name}")
            api = HfApi()
            api.create_repo(repo_id=hf_model_name, exist_ok=True)
            api.upload_folder(
                folder_path=model_path,
                repo_id=hf_model_name,
                commit_message="Initial model upload"
            )
            print(f"✅ 모델 업로드 완료: https://huggingface.co/{hf_model_name}")
        
        except Exception as e:
            print(f"❌ 모델 업로드 중 오류 발생: {e}")

print("\n🎉 LOM Cushion LoRA Trainer 작업이 모두 완료되었습니다!")
