<a href="https://colab.research.google.com/github/tlsgptj/AIStudy/blob/main/%EC%82%BC%EC%84%B1_%EC%9E%AC%EC%97%85%EB%A1%9C%EB%93%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import random
import warnings
import re
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

import torch
from transformers import (
    BlipProcessor, BlipForConditionalGeneration,
    AutoTokenizer, AutoModelForCausalLM,
    Trainer, TrainingArguments,
    TextDataset, DataCollatorForLanguageModeling
)
from peft import get_peft_model, LoraConfig, TaskType

# Set WANDB_DISABLED before any training starts (intended to keep
# Weights & Biases logging switched off).
os.environ["WANDB_DISABLED"] = "true"

# Change the models here.
blip_name = "Salesforce/blip-image-captioning-base"
llm_base = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
# Input data locations (CSV files and the image directories they refer to).
train_csv = "/content/samsung_data/train.csv"
test_csv = "/content/samsung_data/test.csv"
train_img_dir = "/content/samsung_data/train_input_images"
test_img_dir = "/content/samsung_data/test_input_images"
# Text corpus written by prepare_finetune_data() and read by finetune_llm().
finetune_txt = "/content/finetune_data.txt"
# Output directory of finetune_llm(); run_inference() loads the model from it.
finetuned_dir = "./tiny_finetuned"
# CSV written by run_inference().
submission_path = "/content/final_submit.csv"


# Silence all Python warnings for the rest of the session.
warnings.filterwarnings("ignore")
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using:", device)

def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch (CPU and CUDA) random generators.

    Args:
        seed: Integer seed applied to every generator. Defaults to 42.
    """
    # Applied in a fixed order: stdlib, NumPy, torch CPU, torch CUDA devices.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

# 1. 데이터 전처리
def prepare_finetune_data():
    """Build the plain-text fine-tuning corpus from the training CSV.

    For every row of ``train_csv`` whose answer is one of A/B/C/D and whose
    image ``<ID>.jpg`` exists in ``train_img_dir``, a BLIP caption is
    generated and combined with the question and the four choices into a
    prompt ending in ``Answer:``. Each sample is the prompt followed by the
    answer letter. All samples are written to ``finetune_txt`` separated by a
    blank line, and the first five are printed for inspection.

    Rows with a missing/invalid answer, a missing image file, or an image
    that cannot be read are skipped.
    """
    processor = BlipProcessor.from_pretrained(blip_name)
    blip_model = BlipForConditionalGeneration.from_pretrained(blip_name).to(device).eval()
    df = pd.read_csv(train_csv)
    samples = []

    print("[INFO] 학습 데이터 샘플 5개 출력:")
    sample_count = 0

    for _, row in tqdm(df.iterrows(), total=len(df), desc="Generating finetune data"):
        # Validate the label before doing any image work. str() keeps a
        # missing (NaN) or non-string answer from raising on .strip().
        answer = str(row["answer"]).strip().upper()
        if answer not in ("A", "B", "C", "D"):
            continue

        image_path = os.path.join(train_img_dir, f"{row['ID']}.jpg")
        if not os.path.exists(image_path):
            continue
        try:
            # The context manager closes the file handle; convert() returns
            # an independent in-memory copy that stays valid afterwards.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
        except OSError as e:
            # One unreadable image should not abort the whole preprocessing run.
            print(f"[WARN] Skipping unreadable image {image_path}: {e}")
            continue

        question = row["Question"]
        choices = [row["A"], row["B"], row["C"], row["D"]]

        inputs = processor(images=image, text="Describe this image in detail.", return_tensors="pt").to(device)
        with torch.no_grad():
            caption_ids = blip_model.generate(
                **inputs,
                max_length=50,
                num_beams=5,
                early_stopping=True,
                no_repeat_ngram_size=2,
            )
        caption = processor.tokenizer.decode(caption_ids[0], skip_special_tokens=True).strip()

        prompt = (
            f"Image description: {caption}\n"
            f"Question: {question}\n"
            f"Choices: A. {choices[0]} B. {choices[1]} C. {choices[2]} D. {choices[3]}\n"
            f"Answer:"
        )
        samples.append(f"{prompt} {answer}")

        if sample_count < 5:
            print(f"\n샘플 {sample_count+1}:")
            print(f"Prompt:\n{prompt}")
            print(f"Answer: {answer}")
            sample_count += 1

    with open(finetune_txt, "w", encoding="utf-8") as f:
        f.write("\n\n".join(samples))

# 2. 모델 파인튜닝
def finetune_llm():
    """Fine-tune the base LLM with LoRA on ``finetune_txt`` and save the result.

    Loads ``llm_base``, wraps it with a LoRA adapter on the ``q_proj`` and
    ``v_proj`` modules, trains it as a causal language model on the text
    corpus, then writes a standalone model and its tokenizer to
    ``finetuned_dir``.
    """
    tokenizer = AutoTokenizer.from_pretrained(llm_base)
    model = AutoModelForCausalLM.from_pretrained(llm_base)

    peft_config = LoraConfig(
        r=8,
        lora_alpha=16,
        target_modules=["q_proj", "v_proj"],
        lora_dropout=0.1,
        bias="none",
        task_type=TaskType.CAUSAL_LM,
    )
    model = get_peft_model(model, peft_config)

    dataset = TextDataset(tokenizer=tokenizer, file_path=finetune_txt, block_size=512)
    # mlm=False selects causal (next-token) language modelling.
    data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    training_args = TrainingArguments(
        output_dir=finetuned_dir,
        overwrite_output_dir=True,
        num_train_epochs=20,
        per_device_train_batch_size=4,
        save_steps=500,
        save_total_limit=2,
        logging_steps=50,
        # Mixed precision requires an accelerator; only request it when CUDA
        # is present so the pipeline also runs on CPU-only machines.
        fp16=torch.cuda.is_available(),
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=dataset
    )

    print("[INFO] 학습 시작...")
    trainer.train()
    print("[INFO] 학습 완료. 모델 저장 중...")
    # Saving the PEFT wrapper would write only the adapter weights. Fold the
    # LoRA weights into the base model first so that
    # AutoModelForCausalLM.from_pretrained(finetuned_dir) loads a complete
    # model without depending on adapter-loading support.
    merged_model = model.merge_and_unload()
    merged_model.save_pretrained(finetuned_dir)
    tokenizer.save_pretrained(finetuned_dir)
    print("[INFO] 모델 저장 완료.")

# 3. 추론
def generate_caption(image, processor, model, prompt="Describe this image in detail."):
    """Generate a BLIP caption for ``image``.

    The text prompt and generation settings mirror the ones used when the
    fine-tuning corpus is built, so captions seen at inference time have the
    same form as the captions the LLM was trained on.

    Args:
        image: RGB PIL image to describe.
        processor: BLIP processor used to encode the inputs and decode output.
        model: BLIP conditional-generation model.
        prompt: Text prefix passed to the processor together with the image.

    Returns:
        The decoded caption, or an empty string if captioning fails for any
        reason (the error is printed).
    """
    try:
        inputs = processor(images=image, text=prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                max_length=50,
                num_beams=5,
                early_stopping=True,
                no_repeat_ngram_size=2,
            )
        caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
        print(f"[DEBUG] Generated caption: {caption}")
        return caption
    except Exception as e:
        # Best effort: a failed caption must not abort the inference loop.
        print(f"[Caption Error] {e}")
        return ""


# An upper-case A-D that is not part of a longer word, e.g. "B", "B.", "(C)".
_STANDALONE_UPPER_CHOICE = re.compile(r"(?<![A-Za-z])([ABCD])(?![A-Za-z])")
# A lower-case a-d at the very start that is clearly a label ("b", "b.", "(c)").
_LEADING_LOWER_CHOICE = re.compile(r"^\(?([abcd])(?:[).:,]|\s*$)")


def extract_choice(ans):
    """Extract the multiple-choice letter (A-D) from a model answer.

    Only letters that stand on their own are accepted, so letters inside
    ordinary words ("The answer is B" contains an "a") are not mistaken for a
    choice.

    Args:
        ans: Raw text produced by the model.

    Returns:
        "A", "B", "C" or "D", or "?" when no choice letter can be found.
    """
    if not isinstance(ans, str):
        return "?"
    text = ans.strip()

    upper_match = _STANDALONE_UPPER_CHOICE.search(text)
    if upper_match:
        return upper_match.group(1)

    # Fall back to a lower-case label at the start of the answer.
    lower_match = _LEADING_LOWER_CHOICE.match(text)
    if lower_match:
        return lower_match.group(1).upper()

    return "?"

def run_inference():
    """Answer every test question and write the submission CSV.

    For each row of ``test_csv`` the image ``<ID>.jpg`` from ``test_img_dir``
    is captioned with BLIP, a prompt in the fine-tuning format is built, and
    the fine-tuned LLM in ``finetuned_dir`` generates a continuation from
    which the choice letter is extracted. Rows that fail for any reason get
    the answer "?". Results are written to ``submission_path`` with the
    columns ``ID`` and ``answer``.
    """
    processor = BlipProcessor.from_pretrained(blip_name)
    blip_model = BlipForConditionalGeneration.from_pretrained(blip_name).to(device).eval()
    tokenizer = AutoTokenizer.from_pretrained(finetuned_dir)
    model = AutoModelForCausalLM.from_pretrained(finetuned_dir).to(device).eval()

    df = pd.read_csv(test_csv)
    results = []

    print("[INFO] 추론 시작...")
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Running inference"):
        try:
            image_path = os.path.join(test_img_dir, f"{row['ID']}.jpg")
            # The context manager closes the file handle; convert() returns
            # an independent in-memory copy.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
            question = row["Question"]
            choices = [row["A"], row["B"], row["C"], row["D"]]

            caption = generate_caption(image, processor, blip_model)
            prompt = (
                f"Image description: {caption}\n"
                f"Question: {question}\n"
                f"Choices: A. {choices[0]} B. {choices[1]} C. {choices[2]} D. {choices[3]}\n"
                f"Answer:"
            )

            print(f"\n[Prompt idx {idx}]\n{prompt}")

            inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512).to(device)
            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=50,
                    num_beams=5,
                    early_stopping=True,
                    no_repeat_ngram_size=2
                )
            # A decoder-only model returns the prompt followed by the new
            # tokens. Decode only the continuation; otherwise the letters of
            # the prompt itself would be parsed as the answer.
            prompt_length = inputs["input_ids"].shape[1]
            generated_tokens = outputs[0][prompt_length:]
            answer = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
            best_choice = extract_choice(answer)
            results.append(best_choice)

            print(f"[Output idx {idx}] Raw answer: '{answer}'")
            print(f"[Output idx {idx}] 선택: {best_choice}")
        except Exception as e:
            # Keep one result per row so the output stays aligned with df["ID"].
            print(f"Error at idx {idx}: {e}")
            results.append("?")

    pd.DataFrame({"ID": df["ID"], "answer": results}).to_csv(submission_path, index=False)
    print(f"[INFO] 결과 저장 완료: {submission_path}")

# 4. 실행
# Run the full pipeline in order: seed the RNGs, build the fine-tuning text
# corpus, fine-tune the LLM, then produce the submission file.
if __name__ == "__main__":
    seed_everything()
    prepare_finetune_data()
    finetune_llm()
    run_inference()


In [None]:
import os
import torch
import pandas as pd
from PIL import Image
from tqdm import tqdm

from transformers import AutoTokenizer, AutoModelForCausalLM, BlipProcessor, BlipForQuestionAnswering

# 경로 설정
# Path configuration.
test_csv = "/content/samsung_data/test.csv"
test_img_dir = "/content/samsung_data/test_input_images"
# NOTE(review): inference() writes its results to this path; the file name
# suggests it may be the provided sample submission, which would be
# overwritten -- confirm this is intended.
submission_path = "/content/samsung_data/sample_submission.csv"
finetuned_dir = "./tiny_finetuned"  # directory where the fine-tuned TinyLlama was saved
# NOTE(review): this is a VQA checkpoint, not a captioning one -- confirm it
# is the intended model for generate_caption().
blip_name = "Salesforce/blip-vqa-base"

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load models once at module level; both are moved to `device` and put in eval mode.
print("Loading models...")
tokenizer = AutoTokenizer.from_pretrained(finetuned_dir)
llm_model = AutoModelForCausalLM.from_pretrained(finetuned_dir).to(device).eval()
blip_processor = BlipProcessor.from_pretrained(blip_name)
blip_model = BlipForQuestionAnswering.from_pretrained(blip_name).to(device).eval()
print("Models loaded.")

# 이미지 캡션 생성
def generate_caption(image):
    """Describe ``image`` using the module-level BLIP processor and model.

    The image is passed together with the fixed text
    "Describe the scene in detail." and the generated tokens are decoded.

    NOTE(review): the module-level model is loaded from a VQA checkpoint, so
    the output is presumably a short answer rather than a full caption --
    confirm this is intended.

    Args:
        image: RGB PIL image to describe.

    Returns:
        The decoded text, or an empty string if generation fails (the error
        is printed).
    """
    try:
        inputs = blip_processor(image, "Describe the scene in detail.", return_tensors="pt").to(device)
        with torch.no_grad():
            output = blip_model.generate(**inputs)
        return blip_processor.tokenizer.decode(output[0], skip_special_tokens=True).strip()
    except Exception as e:
        # Still best effort, but no longer swallows KeyboardInterrupt or
        # SystemExit, and the failure is now visible.
        print(f"[Caption Error] {e}")
        return ""

# 추론 함수
def inference():
    """Answer every test question and write the submission CSV.

    For each row of ``test_csv`` the image ``<ID>.jpg`` from ``test_img_dir``
    is described with BLIP, a prompt ending in ``Answer:`` is built, and the
    fine-tuned LLM generates up to four new tokens. The first character of
    the generated text is taken as the answer when it is A-D; otherwise, or
    on any error, the answer is "?". Results are written to
    ``submission_path`` with the columns ``ID`` and ``answer``.
    """
    test_df = pd.read_csv(test_csv)
    results = []

    for _, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Inference"):
        try:
            image_path = os.path.join(test_img_dir, f"{row['ID']}.jpg")
            # The context manager closes the file handle; convert() returns
            # an independent in-memory copy.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert("RGB")
            question = row["Question"]
            choices = [row["A"], row["B"], row["C"], row["D"]]

            caption = generate_caption(image)
            prompt = f"Image description: {caption}\nQuestion: {question}\nChoices: A. {choices[0]} B. {choices[1]} C. {choices[2]} D. {choices[3]}\nAnswer:"

            inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512).to(device)
            with torch.no_grad():
                outputs = llm_model.generate(**inputs, max_new_tokens=4)
            # generate() returns the prompt followed by the new tokens.
            # Decode only the continuation; otherwise the text always starts
            # with the prompt and never with a choice letter.
            prompt_length = inputs["input_ids"].shape[1]
            generated_tokens = outputs[0][prompt_length:]
            answer = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()

            # Tuple membership (not substring) so an empty answer maps to "?".
            first_char = answer[:1].upper()
            results.append(first_char if first_char in ("A", "B", "C", "D") else "?")
        except Exception as e:
            # Keep one result per row so the output stays aligned with the IDs.
            print(f"Error: {e}")
            results.append("?")

    pd.DataFrame({"ID": test_df["ID"], "answer": results}).to_csv(submission_path, index=False)
    print("Saved submission to:", submission_path)

# Run inference only when executed as a script (models above load on import).
if __name__ == "__main__":
    inference()