In [None]:
import pandas as pd
import json
import re
import time
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import Dict

class HFModelWrapper:
    def __init__(self, model_name_or_path: str):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name_or_path,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            device_map="auto"
        )
        self.model.eval()

    def generate(self, prompt: str, max_new_tokens=512) -> str:
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                temperature=0.8,
                top_p=0.95,
                eos_token_id=self.tokenizer.eos_token_id
            )
        return self.tokenizer.decode(output[0], skip_special_tokens=True)


class RestaurantReviewClassifier:
    def __init__(self, model_path: str):
        self.model = HFModelWrapper(model_path)
        self.classification_prompt = """
다음 맛집 리뷰를 분석하여 아래 5개 카테고리로 분류해주세요:

1. [맛] - 맛에 대한 언급 (예: 맛있다, 달다, 짜다, 부드럽다 등)
2. [음식종류] - 음식의 종류나 메뉴 (예: 닭갈비, 파스타, 삼겹살, 해산물 등)
3. [평가] - 전반적인 평가나 만족도 (예: 좋음, 추천함, 별로임, 최고 등)
4. [가격] - 가격에 대한 언급 (예: 가성비좋음, 비싸다, 저렴함, 합리적 등)
5. [가게이름] - 블로그에 나온 맛집의 이름(예: ㅇㅇ가게, ㅇㅇ닭갈비..)

리뷰 내용: {review_text}

결과를 다음 JSON 형식으로 답변해주세요:
{{
    "맛": "추출된 맛 관련 키워드",
    "음식종류": "추출된 음식 종류",
    "평가": "추출된 평가 내용",
    "가격": "추출된 가격 관련 내용",
    "가게이름": "추출된 가게의 이름"
}}

만약 해당 카테고리에 대한 정보가 없으면 "정보없음"으로 표시해주세요.
"""

    def _create_empty_result(self) -> Dict[str, str]:
        return {
            "맛": "정보없음",
            "음식종류": "정보없음",
            "평가": "정보없음",
            "가격": "정보없음",
            "가게이름": "정보없음"
        }

    def classify_single_review(self, review_text: str) -> Dict[str, str]:
        try:
            prompt = self.classification_prompt.format(review_text=review_text[:1000])
            response = self.model.generate(prompt)
            json_match = re.search(r"\{.*\}", response, re.DOTALL)
            if json_match:
                json_str = json_match.group()
                try:
                    result = json.loads(json_str)
                    return result
                except json.JSONDecodeError:
                    print(f"[JSON 파싱 오류]: {json_str}")
            else:
                print(f"[JSON 응답 누락]: {response}")
        except Exception as e:
            print(f"[분류 오류]: {e}")
        return self._create_empty_result()

    def classify_csv_file(self, input_file: str, output_file: str, sample_size: int = None):
        print("CSV 파일을 읽는 중...")
        df = pd.read_csv(input_file, encoding='utf-8')

        if sample_size:
            df = df.head(sample_size)
            print(f"샘플 {sample_size}개로 제한하여 처리합니다.")

        print(f"총 {len(df)}개의 리뷰를 처리합니다.")

        df['분류_맛'] = ''
        df['분류_음식종류'] = ''
        df['분류_평가'] = ''
        df['분류_가격'] = ''
        df['분류_가게이름'] = ''  # ✅ 추가

        for idx, row in df.iterrows():
            print(f"\n[{idx + 1}/{len(df)}] 리뷰 처리 중...")
            review_text = str(row.get('본문내용', '')).strip()
            if not review_text:
                print(" → 본문 없음. 스킵합니다.")
                continue

            result = self.classify_single_review(review_text)

            df.at[idx, '분류_맛'] = result.get('맛', '정보없음')
            df.at[idx, '분류_음식종류'] = result.get('음식종류', '정보없음')
            df.at[idx, '분류_평가'] = result.get('평가', '정보없음')
            df.at[idx, '분류_가격'] = result.get('가격', '정보없음')
            df.at[idx, '분류_가게이름'] = result.get('가게이름', '정보없음')  # ✅ 추가

            print(f" → 맛: {result.get('맛')}")
            print(f" → 음식종류: {result.get('음식종류')}")
            print(f" → 평가: {result.get('평가')}")
            print(f" → 가격: {result.get('가격')}")
            print(f" → 가게이름: {result.get('가게이름')}")

            time.sleep(1)

        df.to_csv(output_file, index=False, encoding='utf-8-sig')

    def show_sample_results(self, output_file: str, num_samples: int = 5):
        df = pd.read_csv(output_file, encoding='utf-8-sig')
        print(f"\n=== 분류 결과 샘플 ({num_samples}개) ===")
        for idx in range(min(num_samples, len(df))):
            row = df.iloc[idx]
            print(f"\n#{idx + 1}")
            print(f"원본 리뷰: {row['본문내용'][:100]}...")
            print(f"맛: {row['분류_맛']}")
            print(f"음식종류: {row['분류_음식종류']}")
            print(f"평가: {row['분류_평가']}")
            print(f"가격: {row['분류_가격']}")
            print(f"가게이름: {row['분류_가게이름']}")  # ✅ 추가
            print("-" * 50)


def main():
    print("=== 맛집 리뷰 분류기 (Hugging Face 모델 기반) ===\n")

    model_path = "google/gemma-3-4b-it"

    classifier = RestaurantReviewClassifier(model_path)

    input_file = "blog_search_results.csv"
    output_file = "맛집_분류결과"

    try:
        classifier.classify_csv_file(input_file, output_file, sample_size=10)
        classifier.show_sample_results(output_file, num_samples=5)
    except FileNotFoundError:
        print(f"'{input_file}' 파일이 존재하지 않습니다.")
    except Exception as e:
        print(f"[오류 발생]: {e}")


if __name__ == "__main__":
    main()
