In [2]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import difflib
import random
from collections import Counter

# 1. 모델 및 토크나이저 로드 (기존과 동일)
model_name = "Qwen/Qwen3-14B"
# model_name = "dnotitia/DNA-R1"

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto"
)

# 'categories' 딕셔너리가 함수 외부 또는 전역 스코프에 정의되어 있어야 합니다.
categories = {
    0: "졸업요건",
    1: "학교 공지사항",
    2: "학사일정",
    3: "식단 안내",
    4: "통학/셔틀 버스"
}

def _get_single_llm_prediction(user_query):
    # 1. 프롬프트를 'system'과 'user' 역할로 분리합니다.
    system_prompt = "당신은 주어진 학교 관련 질문을 사전 정의된 5개의 카테고리 중 하나로 분류하는 AI 분류기입니다. 오직 숫자 레이블 하나만 응답해야 합니다."

    user_prompt = f"""다음은 사용자 질문과 관련된 카테고리 목록입니다:
0: 졸업요건 (예: 졸업까지 몇 학점을 들어야 하나요?)
1: 학교 공지사항 (예: 이번에 올라온 공지사항 어디서 볼 수 있어요?)
2: 학사일정 (예: 이번 학기 수강신청은 언제 시작하나요?)
3: 식단 안내 (예: 오늘 학식 뭐 나와요?)
4: 통학/셔틀 버스 (예: 다음주에 셔틀버스는 정상 운행하나요?)

사용자 질문: "{user_query}"

위 질문은 어떤 카테고리에 가장 적합한가요? 숫자 레이블만 대답해주세요 (0, 1, 2, 3, 4 중 하나).

가장 적합한 카테고리 번호:"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]

    # 2. Qwen2의 채팅 템플릿을 적용하여 모델이 이해하는 형식으로 변환합니다.
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
        enable_thinking=False
    )

    # 3. 변환된 텍스트를 토큰화합니다.
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

    # Qwen2 토크나이저에 pad_token이 없을 수 있으므로 eos_token으로 설정해주는 것이 안전합니다.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # 4. 모델을 통해 답변을 생성합니다. (기존 generate 파라미터 유지)
    generated_ids = model.generate(
        model_inputs.input_ids,
        max_new_tokens=5,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
        do_sample=True,
        temperature=0.5,
        top_p=0.9
    )

    # 5. 입력 부분을 제외하고 순수하게 생성된 부분만 디코딩합니다.
    # 제공해주신 Qwen2 예제 코드의 디코딩 방식을 적용합니다.
    input_ids_len = model_inputs.input_ids.shape[1]
    response_ids = generated_ids[:, input_ids_len:]

    response_text = tokenizer.batch_decode(response_ids, skip_special_tokens=True)[0].strip()

    #print(f"모델에서 추출한 response_text(label 파싱 전): {response_text}")
    
    # 6. 응답에서 숫자 레이블을 파싱합니다. (기존 로직과 동일)
    try:
        predicted_label_str = ''.join(filter(str.isdigit, response_text))
        if predicted_label_str:
            predicted_label = int(predicted_label_str[0])
            if predicted_label in categories:
                return predicted_label
    except (ValueError, IndexError):
        # 파싱에 실패한 경우
        return None

    # 유효한 카테고리 번호가 아닌 경우
    return None


def _fallback_classify_by_similarity(user_query):
    """
    LLM 예측 실패 시, 키워드 유사도 기반으로 분류하는 Fallback 함수입니다.
    """
    category_examples = {
        0: "졸업 요건, 졸업 학점, 졸업 논문, 졸업 자격, 졸업 인증, 유예 신청, 졸업 필수 조건",
        1: "학교 공지사항, 안내문, 공고, 공지 업데이트, 휴강 안내, 장학금 공지, 긴급 알림",
        2: "수강 신청, 시험 기간, 성적 발표, 개강일, 종강일, 학사 일정, 수업 일정, 등록 일정",
        3: "학식, 학생 식당, 식단표, 조식, 중식, 석식, 오늘 메뉴, 급식 시간, 식단 운영",
        4: "셔틀버스, 통학버스, 운행 시간표, 버스 노선, 정류장 위치, 셔틀 예약, 통학 교통"
    }

    best_score = -1
    best_label = random.randint(0, 4) # 만약을 대비한 기본값
    for label, example in category_examples.items():
        score = difflib.SequenceMatcher(None, user_query, example).ratio()
        if score > best_score:
            best_score = score
            best_label = label

    return best_label


def classify_query(user_query, num_votes=5):
    """
    사용자 질의를 입력받아 Voting 앙상블 기법으로 카테고리를 분류합니다.

    Args:
        user_query (str): 사용자 질문
        num_votes (int): 앙상블에 사용할 투표 횟수 (홀수를 추천)

    Returns:
        tuple: (예측된 레이블, 예측된 카테고리 이름)
    """
    votes = []
    # 1. 정해진 횟수만큼 LLM에게 예측을 요청하여 투표 수집
    for _ in range(num_votes):
        prediction = _get_single_llm_prediction(user_query)
        #print(f"get single llm prediction의 결과물: {prediction}")
        if prediction is not None:
            votes.append(prediction)

    # 2. 투표 결과 분석
    if votes:
        # 가장 많이 나온 투표 결과를 선택 (다수결)
        counter = Counter(votes)
        # most_common(1)은 [(가장 흔한 항목, 횟수)] 형태의 리스트를 반환
        best_label = counter.most_common(1)[0][0]
        print(f"투표 결과: {counter}, 최종 선택: {best_label}")
    else:
        # 3. LLM이 모든 예측에 실패한 경우, Fallback 로직 실행
        print("LLM 예측이 모두 실패하여 유사도 기반 Fallback 로직을 실행합니다.")
        best_label = _fallback_classify_by_similarity(user_query)

    return best_label, categories[best_label]

if __name__ == "__main__":
    q = "오늘 학식 뭐 나와?"
    label, category = classify_query(q)
    print(f"입력: {q}\n예측: {label} ({category})")

Loading checkpoint shards:   0%|          | 0/8 [00:00<?, ?it/s]

모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
투표 결과: Counter({3: 5}), 최종 선택: 3
입력: 오늘 학식 뭐 나와?
예측: 3 (식단 안내)


In [3]:
import sys
from pathlib import Path
project_root = str(Path.cwd().parent)
if project_root not in sys.path:
    sys.path.insert(0, project_root)
from src.answers import academic_calendar_answer,shuttle_bus_answer,graduation_req_answer,meals_answer,notices_answer

from typing import List, Dict, Any
import json
import torch
from threading import Thread
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer, BitsAndBytesConfig
from utils.config import settings


def format_meals_context(docs: List[Dict[str, Any]]) -> List[str]:
    formatted_texts = []
    for doc in docs:
        meal = doc.get("meal", "")
        menu = doc.get("menu", "운영안함").replace("\n", ", ")
        if menu != "운영안함":
            formatted_texts.append(f"[{meal}] {menu}")
    return formatted_texts


class HybridRetriever:
    def retrieve(self, query: str) -> List[str]:
        return []


class PromptBuilder:
    def build(self, question: str, docs: List[str]) -> str:
        context = "".join(docs) if docs else "참고할 정보가 없습니다."
        prompt = f"""당신은 충남대학교 관련 정보를 안내하는 챗봇입니다.
주어진 '참고 자료'를 근거로 사용자의 질문에 명확하고 친절하게 답변해야 합니다.
참고 자료에 없는 내용은 답변에 포함하지 마세요.

[참고 자료]
{context}

[질문]
{question}

[답변]
"""
        return prompt.strip()


class AnswerGenerator:
    def __init__(self) -> None:
        self.model_type = settings.generator_model_type
        self.model = None
        self.tokenizer = None
        self.client = None
        self.streamer = None
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16,
        )
        if self.model_type == "local":
            model_name = settings.generator_model_name_or_path
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto" if torch.cuda.is_available() else "cpu",
                torch_dtype=torch.bfloat16,
                use_safetensors=True,
                quantization_config=bnb_config,
            )
            self.streamer = TextIteratorStreamer(
                self.tokenizer, skip_prompt=True, skip_special_tokens=True
            )
            print(f"✅ Local Generator Model Loaded: {model_name}")

    def generate(self, question: str, docs: List[Dict[str, Any]], max_new_tokens: int = 512, temperature: float = 0.0):
        if docs and isinstance(docs[0], dict) and "menu" in docs[0]:
            doc_texts = format_meals_context(docs)
        else:
            doc_texts = [json.dumps(d, ensure_ascii=False) for d in docs]
        prompt = PromptBuilder().build(question, doc_texts)

        if self.model_type == "local" and self.model and self.tokenizer:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
            generation_kwargs = dict(
                input_ids=inputs.input_ids,
                streamer=self.streamer,
                max_new_tokens=max_new_tokens,
                do_sample=temperature > 0.0,
                temperature=temperature if temperature > 0.0 else None,
            )
            thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
            thread.start()
            for token in self.streamer:
                yield token
            return
        if self.model_type == "openai" and self.client:
            response = self.client.chat.completions.create(
                model=settings.openai_model_name,
                messages=[
                    {"role": "system", "content": "당신은 충남대학교 정보를 안내하는 챗봇입니다."},
                    {"role": "user", "content": prompt},
                ],
            )
            yield response.choices[0].message.content.strip()
            return
        yield "답변 생성 모델이 올바르게 설정되지 않았습니다."


ANSWER_HANDLERS = {
    0: graduation_req_answer.generate_answer,
    1: notices_answer.generate_answer,
    2: academic_calendar_answer.generate_answer,
    3: meals_answer.generate_answer,
    4: shuttle_bus_answer.generate_answer,
}

def generate_response(question: str) -> str:
    label, _ = classify_query(question)
    handler = ANSWER_HANDLERS.get(label)
    if handler:
        return handler(question)
    retriever = HybridRetriever()
    docs = retriever.retrieve(question)
    generator = AnswerGenerator()
    return ''.join(generator.generate(question, docs))

sample_question = '오늘 학식 뭐 나와?'
print(generate_response(sample_question))


모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
모델에서 추출한 response_text(label 파싱 전): 3
get single llm prediction의 결과물: 3
투표 결과: Counter({3: 5}), 최종 선택: 3
2025-06-20 07:51:36.074 | INFO     | src.answers.meals_answer:get_context:184 - '%s' 날짜의 로컬 식단 정보를 사용합니다.
찾은 식단 정보: 메뉴운영내역, 운영안함, 운영안함 등


In [4]:
import json
from pathlib import Path
from tqdm import tqdm

# data/question directory relative to this notebook
base_dir = Path('../data/question') if Path('../data/question').exists() else Path('data/question')
output_dir = Path('../outputs') if Path('../outputs').exists() else Path('outputs')
output_dir.mkdir(parents=True, exist_ok=True)

for path in base_dir.glob('*.json'):
    with path.open('r', encoding='utf-8') as f:
        questions_data = json.load(f)
    results = []  # question-label pairs
    qa_results = []  # question-answer pairs
    for item in tqdm(questions_data, desc=path.name):
        question = item['question']
        label, _ = classify_query(question)  # VARCO 모델 사용
        answer = generate_response(question)  # 모델이 생성한 대답
        results.append({'question': question, 'label': label})
        qa_results.append({'question': question, 'answer': answer})
    out_file = output_dir / f"{path.stem}_output.json"
    qa_file = output_dir / f"{path.stem}_answer_output.json"
    with out_file.open('w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    with qa_file.open('w', encoding='utf-8') as f:
        json.dump(qa_results, f, ensure_ascii=False, indent=2)
    print(f'✅ {out_file} 저장 완료')
    print(f'✅ {qa_file} 저장 완료')


randomized_korean_questions_result.json:   0%|                                      | 0/104 [00:00<?, ?it/s]

모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4
모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4
모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4
모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4
모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4
투표 결과: Counter({4: 5}), 최종 선택: 4
모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4
모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4
모델에서 추출한 response_text(label 파싱 전): 4
get single llm prediction의 결과물: 4


randomized_korean_questions_result.json:   0%|                                      | 0/104 [00:03<?, ?it/s]


KeyboardInterrupt: 

In [None]:
import time
import json
from pathlib import Path
import shutil
from IPython.display import display, clear_output
from datetime import datetime

# --- 1. 경로 설정 ---
# 웹 UI가 질문을 저장하는 폴더 (입력)
QUESTION_DIR = Path('../question')

# 분류된 답변을 저장하는 폴더 (출력)
ANSWER_DIR = Path('../answer')

# 처리가 완료된 질문을 옮길 폴더
PROCESSED_DIR = QUESTION_DIR / 'processed'

# 오류 발생 시 질문을 옮길 폴더
ERROR_DIR = QUESTION_DIR / 'error'

# 폴더가 없는 경우 생성
QUESTION_DIR.mkdir(exist_ok=True)
ANSWER_DIR.mkdir(exist_ok=True)
PROCESSED_DIR.mkdir(exist_ok=True)
ERROR_DIR.mkdir(exist_ok=True)


print(f"'{QUESTION_DIR}' 폴더를 실시간으로 감지합니다...")
print("노트북 셀을 중단(Interrupt)하면 감지가 종료됩니다.")

# --- 2. 실시간 감지 및 처리 루프 ---
try:
    while True:
        # question 폴더에서 아직 처리되지 않은 질문 파일 목록을 가져옴
        # 파일 이름이 'q_'로 시작하는 json 파일만 대상으로 함
        question_files = list(QUESTION_DIR.glob('q_*.json'))

        # 처리할 파일이 없으면 5초 대기 후 다시 확인
        if not question_files:
            time.sleep(5)
            continue

        # 새로운 질문 파일을 하나씩 처리
        for q_path in question_files:
            try:
                # 1. 질문 파일 읽기
                with q_path.open('r', encoding='utf-8') as f:
                    q_data = json.load(f)
                
                question_text = q_data.get('text', '')
                question_id = q_data.get('question_id', q_path.stem)

                # 2. 질문 분류 (기존에 정의된 classify_query 함수 사용)
                label, label_text = classify_query(question_text)

                # 3. 답변 파일 생성
                # 웹 UI가 질문(q_id)에 맞는 답변(a_id)을 찾을 수 있도록 파일 이름을 맞춤
                answer_id = question_id.replace('q_', 'a_')
                answer_path = ANSWER_DIR / f"{answer_id}.json"
                
                answer_data = {
                    'question_id': question_id,
                    'original_question': question_text,
                    'label': label,
                    'label_text': label_text,
                    'classified_at': datetime.now().isoformat()
                }

                # 답변 파일을 JSON 형식으로 저장
                with answer_path.open('w', encoding='utf-8') as f:
                    json.dump(answer_data, f, ensure_ascii=False, indent=2)

                # 4. 처리 완료된 질문 파일을 processed 폴더로 이동
                shutil.move(str(q_path), PROCESSED_DIR / q_path.name)
                
                # 5. 노트북에 처리 로그 출력
                clear_output(wait=True) # 이전 출력 지우기
                print(f"'{QUESTION_DIR}' 폴더를 실시간으로 감지합니다...")
                print("노트북 셀을 중단(Interrupt)하면 감지가 종료됩니다.\n")
                print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 처리 완료:")
                print(f"  - 질문 ID: {question_id}")
                print(f"  - 내   용: \"{question_text}\"")
                print(f"  - 분   류: {label} ({label_text})")
                print(f"  - 답변 파일: {answer_path}\n")
                print("다음 질문을 기다립니다...")

            except Exception as e:
                print(f"파일 처리 중 오류 발생: {q_path.name}, 오류: {e}")
                # 오류 발생 시 해당 파일을 error 폴더로 이동
                shutil.move(str(q_path), ERROR_DIR / q_path.name)

# 사용자가 직접 셀 실행을 중단할 경우 (KeyboardInterrupt) 루프 종료
except KeyboardInterrupt:
    print("\n실시간 분류 프로세스를 중단합니다.")