In [1]:
import torch
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
from transformers import BitsAndBytesConfig
from tqdm import tqdm
import re
from collections import Counter
from sentence_transformers import SentenceTransformer, util
import math

2025-09-01 17:50:13.790446: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-09-01 17:50:13.798892: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1756716613.808196 1678043 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1756716613.810908 1678043 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-09-01 17:50:13.821369: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [2]:
data_base = '../data'
test_path = data_base + '/raginfo_all.csv'

In [3]:
test_df = pd.read_csv(test_path)
test_df

Unnamed: 0,ID,Question,RagInfo,score
0,TEST_000,금융산업의 이해와 관련하여 금융투자업의 구분에 해당하지 않는 것은?\n1 소비자금융...,"따른 중견기업 및 「외국환거래법」 제3조제15호에 따른 비거주자를 말한다. 다만, ...",0.023120
1,TEST_001,위험 관리 계획 수립 시 고려해야 할 요소로 적절하지 않은 것은?\n1 수행인력\n...,1.2.3 위험 평가 항 목 조직의 대내외 환경분석을 통하여 유형별 위협정보를 수집...,0.810402
2,TEST_002,관리체계 수립 및 운영'의 '정책 수립' 단계에서 가장 중요한 요소는 무엇인가?\n...,"관리체계 수립 및 운영 관리체계 수립 및 운영 영역은 관리체계 기반 마련, 위험 관...",0.984937
3,TEST_003,재해 복구 계획 수립 시 고려해야 할 요소로 옳지 않은 것은?\n1 복구 절차 수립...,"핵심 IT 서비스 및 시스템의 중요도 및 특성에 따른 복구 목표시간, 복구 목표시점...",0.917190
4,TEST_004,트로이 목마(Trojan) 기반 원격제어 악성코드(RAT)의 특징과 주요 탐지 지표...,What indicators of compromise would most relia...,0.826005
...,...,...,...,...
510,TEST_510,"""정보보호최고책임자""의 임명에 관한 설명으로 옳지 않은 것은?\n1 정보보호최고책임...",1.1.2 최고책임자의 지정 항 목 최고경영자는 정보보호 업무를 총괄하는 정보보호 ...,0.969154
511,TEST_511,IPv6 주소 체계의 주요 특징으로 옳지 않은 것은?\n1 NAT 필요성 감소\n2...,What is the primary security concern raised by...,0.491904
512,TEST_512,하이브리드 위협에 대한 설명으로 가장 적절한 것은?\n1 사이버 공간에서만 발생하는...,What lessons learned from notable past inciden...,0.209277
513,TEST_513,전자금융거래법의 주요 목적 중 하나는 무엇인가?\n1 전자금융거래의 비대면성 강화\...,전자금융거래법 제1조(목적) 이 법은 전자금융거래의 법률관계를 명확히 하여 전자금융...,0.998581


In [4]:
model_id = "../model/midm2_bf16"

bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,             # 8-bit 양자화
    llm_int8_threshold=6.0,        # (기본값) 민감한 계층은 float 유지
    llm_int8_skip_modules=None,    # 특정 모듈 제외 가능
)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    # quantization_config=bnb_config,
    torch_dtype=torch.bfloat16
)

tokenizer = AutoTokenizer.from_pretrained(model_id)
generation_config = GenerationConfig.from_pretrained(model_id)

Loading checkpoint shards:   0%|          | 0/5 [00:00<?, ?it/s]

Some parameters are on the meta device because they were offloaded to the cpu.


In [5]:
# 객관식 여부 판단 함수
def is_multiple_choice(question_text):
    """
    객관식 여부를 판단: 2개 이상의 숫자 선택지가 줄 단위로 존재할 경우 객관식으로 간주
    """
    lines = question_text.strip().split("\n")
    option_count = sum(bool(re.match(r"^\s*[1-9][0-9]?\s", line)) for line in lines)
    return option_count >= 2


# 질문과 선택지 분리 함수
def extract_question_and_choices(full_text):
    """
    전체 질문 문자열에서 질문 본문과 선택지 리스트를 분리
    """
    lines = full_text.strip().split("\n")
    q_lines = []
    options = []

    for line in lines:
        if re.match(r"^\s*[1-9][0-9]?\s", line):
            options.append(line.strip())
        else:
            q_lines.append(line.strip())
    
    question = " ".join(q_lines)
    return question, options

def extract_reason_only(text: str, original_question: str) -> str:
    """
    객관식: '정답:' 이전까지의 이유 설명을 추출
    주관식: '이유:' 이후의 이유 설명을 추출
    """
    is_mc = is_multiple_choice(original_question)

    if is_mc:
        # 객관식: 정답: 이전 텍스트 추출
        parts = re.split(r"정답\s*[:：]", text, maxsplit=1)
        if parts:
            return parts[0].strip()
    else:
        # 주관식: 이유: 이후 텍스트 추출
        match = re.search(r"이유\s*[:：]\s*(.*)", text, re.DOTALL)
        if match:
            return match.group(1).strip()
    
    return ""

In [6]:
def make_prompt_auto(text, rag):

    # RAG 문서 안내
    if rag != "":
        rag_prompt = (
            "다음은 질문과 관련된 문서의 일부입니다.\n"
            f"[문서 정보]\n{rag}\n\n"
        )
    else:
        rag_prompt = ""
        
    # 객관식
    if is_multiple_choice(text):
        question, options = extract_question_and_choices(text)
        numbered_options = [f"{i+1}. {opt}" for i, opt in enumerate(options)]

        prompt = (
            "다음 객관식 질문에 대해 **가장 적절한 선택지의 번호**를 정답으로 작성하세요. (1,2,3,4 or 5)\n\n"
            "출력 형식:\n정답: 번호(번호만)\n\n"
            f"{rag_prompt}"
            f"[질문]\n{question.strip()}\n\n"
            "[선택지]\n" + "\n".join(numbered_options) + "\n\n"
            "정답:"
        )

    # 주관식
    else:
        prompt = (
            "다음 주관식 질문에 대해 **정확한 정답**을 주요 키워드를 활용하여 작성하시오.\n"
            "정답은 정답에 사용할 키워드를 뽑아본 후 이를 활용하여 3문장 내로 서술하시오.\n"
            "출력 형식:\n키워드: 키워드\n정답: 내용\n\n"
            f"{rag_prompt}"
            f"[질문]\n{text.strip()}\n\n"
        )

    # 시스템 프롬프트
    system_prompt = (
        "당신은 한국의 금융 보안 및 법률에 정통한 전문가입니다.\n"
        "출력 형식을 반드시 지키세요.\n"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt}
    ]
    return messages


In [7]:
# 후처리 함수
def extract_answer_only(text: str, original_question: str) -> str:
    # 공백 또는 빈 문자열일 경우 기본값 지정
    if not text:
        return "미응답"

    # 객관식 여부 판단
    is_mc = is_multiple_choice(original_question)

    if is_mc:
        # 숫자만 추출
        match = re.match(r"\D*([1-9][0-9]?)", text)
        if match:
            return match.group(1)
        else:
            # 숫자 추출 실패 시 "0" 반환
            return "0"
    else:
        return text

In [8]:
def inference(df, tokenizer, model):

    text = df["Question"]
    rag = df["RagInfo"] if df["score"] > 0.75 else ""

    input_message = make_prompt_auto(text, rag)
    # print(input_message)

    text_prompt = tokenizer.apply_chat_template(
        input_message,
        add_generation_prompt=True,
        tokenize=False,
        enable_thinking=True
    )

    model_inputs = tokenizer([text_prompt], return_tensors="pt").to(model.device)
    model_inputs.pop("token_type_ids", None)

    generated_ids = model.generate(
        **model_inputs,
        generation_config=generation_config,
        max_new_tokens=512,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id
    )
    output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist() 

    content = tokenizer.decode(output_ids, skip_special_tokens=True).strip("\n")

    text = content.split("정답:")[-1].strip()
    reason = content.split("정답:")[0].strip()

    return reason, text


In [None]:
preds = []
reasons = []

for _, t in tqdm(test_df.iterrows(), desc="Inference", total=len(test_df)):
    answer_candidates = []
    reasoning_candidates = []

    for _ in range(5):  # 반복 횟수 조절 가능
        thinking_content, content = inference(t, tokenizer, model)
        pred_answer = extract_answer_only(content, original_question=t["Question"])

        answer_candidates.append(pred_answer)
        reasoning_candidates.append(thinking_content)

    is_mc = is_multiple_choice(t["Question"])


    if is_mc:
        # ✅ 객관식 → majority voting
        final_answer = Counter(answer_candidates).most_common(1)[0][0]
        matched_reasoning = next(
            (r for a, r in zip(answer_candidates, reasoning_candidates) if a == final_answer),
            reasoning_candidates[0]
        )

        if any(a != final_answer for a in answer_candidates):
            print(f"❗ 다른 답안이 존재합니다 - Question: {t['Question']}")
            print(answer_candidates)

    else:
        def extract_keywords_from_text(t: str) -> list[str]:
            m = re.search(r"키워드\s*:\s*(.+)", t)
            if not m:
                return []
            return [k.strip().lower() for k in m.group(1).split(",") if k.strip()]

        keywords_list = [extract_keywords_from_text(i) for i in reasoning_candidates]
        keywords_list = [[k for k in kws if k] for kws in keywords_list]

        # 2) df/idf 계산 (코퍼스=후보들)
        N = len(keywords_list)
        all_unique = sorted({k for kws in keywords_list for k in set(kws)})

        # df: 몇 개의 후보가 이 키워드를 갖고 있는가?
        df = {k: sum(1 for kws in keywords_list if k in set(kws)) for k in all_unique}

        # idf: log 스무딩
        idf = {k: math.log((N + 1) / (df[k] + 1)) + 1.0 for k in all_unique}
        max_idf = max(idf.values()) if idf else 1.0

        def keyword_weight(k: str, alpha: float = 0.7) -> float:
            """합의(consensus) + (너무 흔한 키워드 패널티) 를 섞은 하이브리드 가중치"""
            if k not in df:
                return 0.0
            # 합의도 (본인 제외 0~1)
            cf = (df[k] - 1) / max(1, N - 1)
            # IDF 정규화 (희소할수록 1)
            idf_norm = (idf[k] - 1.0) / max(1e-8, (max_idf - 1.0))
            # 흔함 점수 = 1 - idf_norm (너무 흔하면 1에 가깝지 않음)
            commonness = 1.0 - idf_norm
            return alpha * cf + (1 - alpha) * commonness

        # 3) 후보 점수 계산
        def score_candidate(kws: list[str], alpha: float = 0.7) -> float:
            # 중복 제거 후 각 키워드 가중치 합
            uniq = set(kws)
            return sum(keyword_weight(k, alpha=alpha) for k in uniq)

        scores = [score_candidate(kws, alpha=0.7) for kws in keywords_list]

        # 4) tie-breaker: (a) 기존 교집합 수, (b) 키워드 개수 많은 쪽
        def overlap_score(i: int) -> int:
            others = set().union(*[set(x) for j, x in enumerate(keywords_list) if j != i])
            return len(set(keywords_list[i]) & others)

        best_idx = max(
            range(len(scores)),
            key=lambda i: (scores[i], overlap_score(i), len(set(keywords_list[i])))
        )

        print("best_idx:", best_idx)
        print("chosen keywords:", keywords_list[best_idx])
        print("hybrid score:", scores[best_idx])

        final_answer = answer_candidates[best_idx]
        matched_reasoning = reasoning_candidates[best_idx]

    preds.append(final_answer)
    reasons.append(matched_reasoning)

In [None]:
sample_submission = pd.read_csv(data_base + '/sample_submission.csv')
sample_submission['Answer'] = preds
sample_submission.to_csv(data_base + "/rag_submission.csv", index=False, encoding='utf-8-sig')
sample_submission['Reason'] = reasons
sample_submission.to_csv(data_base + '/reason_debug.csv', index=False, encoding='utf-8-sig')