In [None]:
import torch
from unsloth import FastLanguageModel
from datasets import load_dataset
from tqdm import tqdm
import pandas as pd

# 모델의 데이터 타입을 설정
dtype = torch.bfloat16

# 모델을 4비트 양자화하여 로드할지 여부
load_in_4bit = True

# 배치 크기 설정
BATCH_SIZE = 8

# 모델과 토크나이저 초기화
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name="Bllossom/llama-3.2-Korean-Bllossom-3B",
    max_seq_length=200,
    dtype=dtype,
    load_in_4bit=load_in_4bit,
    device_map="auto",
)

In [2]:
# 체크포인트 경로
checkpoint_path = "./adapter/checkpoint-Construction"

# adapter 가중치 로드
model.load_adapter(checkpoint_path)

In [3]:
def prepare_batch(examples):
    # 빈 리스트를 생성하여 프롬프트를 저장합니다.
    prompts = []
    
    # 질문의 길이만큼 반복합니다.
    for idx in range(len(examples['question'])):
        # 각 질문에 대해 포맷된 프롬프트 문자열을 생성합니다.
        prompt = f"""문제: {examples['question'][idx]}

        선택지:
        A. {examples['A'][idx]}
        B. {examples['B'][idx]}
        C. {examples['C'][idx]}
        D. {examples['D'][idx]}

        정답:"""
        # 생성된 프롬프트를 리스트에 추가합니다.
        prompts.append(prompt)
    
    # 배치 토크나이징을 수행하여 입력을 텐서로 변환합니다.
    inputs = tokenizer(
        prompts, 
        return_tensors="pt",  # PyTorch 텐서 형식으로 반환
        truncation=True,       # 길이가 max_length를 초과하는 경우 잘라냄
        max_length=512,       # 최대 길이를 512로 설정
        padding=True           # 배치 내의 모든 시퀀스 길이를 맞추기 위해 패딩 추가
    )
    
    # 입력 텐서를 GPU로 이동시켜 반환합니다.
    return {k: v.cuda() for k, v in inputs.items()}

In [4]:
def process_batch_outputs(outputs, prompt_lengths, examples, category):
    # 결과를 저장할 빈 리스트를 생성합니다.
    results = []
    
    # 선택지에 대한 번호 매핑을 정의합니다.
    answer_mapping = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
    
    # 출력과 프롬프트 길이를 동시에 반복합니다.
    for idx, (output, prompt_length) in enumerate(zip(outputs, prompt_lengths)):
        # 토큰화된 출력을 디코딩하고, 프롬프트 길이 이후의 응답을 추출합니다.
        response = tokenizer.decode(output, skip_special_tokens=True)[prompt_length:].strip().upper()
        
        # 예측된 답안을 저장할 변수를 초기화합니다.
        predicted_answer = None
        
        # 가능한 답안(A, B, C, D) 중에서 응답에 포함된 답안을 찾습니다.
        for answer in ['A', 'B', 'C', 'D']:
            if answer in response:
                predicted_answer = answer  # 응답에 포함된 답안을 저장합니다.
                break
        
        # 예측된 답안을 숫자로 변환합니다.
        predicted_answer_num = answer_mapping.get(predicted_answer, None)
        
        # 결과 리스트에 질문 ID, 예측된 답안, 정답, 정답 여부, 카테고리를 추가합니다.
        results.append({
            'question_id': idx,
            'predicted': predicted_answer,  # 예측된 답안
            'correct': examples['answer'][idx], # 정답
            'is_correct': predicted_answer_num == examples['answer'][idx],  # 정답 여부
            'category': category    # 질문 카테고리
        })
    
    # 최종 결과 리스트를 반환합니다.
    return results

In [None]:
import time

# 모델 최적화 설정
model.eval()
torch.backends.cudnn.benchmark = True

# KMMLU 데이터셋의 모든 카테고리 목록
kmmlu_categories = [
    "Accounting", "Agricultural-Sciences", "Aviation-Engineering-and-Maintenance", "Biology", 
    "Chemical-Engineering", "Chemistry", "Civil-Engineering", "Computer-Science", "Construction", 
    "Criminal-Law", "Ecology", "Economics", "Education", "Electrical-Engineering", 
    "Electronics-Engineering", "Energy-Management", "Environmental-Science", "Fashion", "Food-Processing", 
    "Gas-Technology-and-Engineering", "Geomatics", "Health", "Industrial-Engineer", "Information-Technology", "Interior-Architecture-and-Design", 
    "Law", "Machine-Design-and-Manufacturing", "Management", "Maritime-Engineering", "Marketing", "Materials-Engineering",
    "Mechanical-Engineering", "Nondestructive-Testing", "Patent", "Political-Science-and-Sociology", "Psychology",
    "Public-Safety", "Railway-and-Automotive-Engineering", "Real-Estate", "Refrigerating-Machinery", "Social-Welfare",
    "Taxation", "Telecommunications-and-Wireless-Technology", "Korean-History", "Math"
]

# 데이터셋 평가를 위한 빈 리스트를 생성합니다.
full_results = []

# 추론 시간 측정
start_time = time.time()

# 각 카테고리별로 데이터셋을 평가합니다.
for category in kmmlu_categories:
    # KMMLU 데이터셋을 로드합니다.
    dataset = load_dataset("HAERAE-HUB/KMMLU", category)
    test_dataset = dataset['test']  # 테스트 데이터셋 선택
    
    # 모델을 추론 모드로 설정합니다.
    FastLanguageModel.for_inference(model)
    
    # 테스트 데이터셋을 배치 사이즈만큼 반복합니다.
    for i in tqdm(range(0, len(test_dataset), BATCH_SIZE), desc=f"{category} 평가"):
        # 배치 데이터의 끝 인덱스를 계산합니다.
        end_idx = min(i + BATCH_SIZE, len(test_dataset))
        batch_data = test_dataset.select(range(i, end_idx))  # 현재 배치 데이터 선택
        
        # 배치 데이터를 전처리합니다.
        inputs = prepare_batch(batch_data)

        # 모델의 추론 모드에서 배치 추론을 수행합니다.
        with torch.inference_mode():
            outputs = model.generate(
                **inputs,  # 전처리된 입력을 모델에 전달합니다.
                max_new_tokens=1,  # 생성할 최대 토큰 수 설정
                num_return_sequences=1,  # 반환할 시퀀스 수
                temperature=0.7,  # 샘플링 온도 설정
                use_cache=True,  # 캐시 사용 여부
                do_sample=False,  # 샘플링 방식
            )
        
        # 각 입력의 프롬프트 길이를 계산합니다.
        prompt_lengths = [len(tokenizer.decode(input_ids, skip_special_tokens=True)) 
                          for input_ids in inputs['input_ids']]
        
        # 배치 결과를 처리합니다.
        batch_results = process_batch_outputs(outputs, prompt_lengths, batch_data, category)
        # 처리된 배치 결과를 전체 결과에 추가합니다.
        full_results.extend(batch_results)
        
end_time = time.time()
# 추론 시간 계산
inference_time = end_time - start_time

In [None]:
# 결과 분석을 위한 DataFrame을 생성합니다.
df = pd.DataFrame(full_results)

# 전체 정확도를 계산합니다.
overall_accuracy = df['is_correct'].mean()

# 카테고리별 메트릭스를 계산합니다.
category_metrics = df.groupby('category')['is_correct'].agg([
    ('accuracy', 'mean'),           # 각 카테고리의 정확도 계산
    ('total_questions', 'count'),   # 각 카테고리의 총 문제 수 계산
    ('correct_answers', 'sum')      # 각 카테고리의 정답 수 계산
]).reset_index()  # 그룹화된 DataFrame의 인덱스를 기본 정수 인덱스로 재설정

# 전체 정확도를 행으로 추가합니다.
overall_row = pd.DataFrame({
    '카테고리': '전체',               # 새로운 카테고리 이름
    '평균 정확도': overall_accuracy,      # 전체 정확도
    '총 문제 수': len(df),        # 총 문제 수
    'correct_answers': df['is_correct'].sum(),  # 총 정답 수
    'inference_time': inference_time # 총 추론 시간
}, index=[0])  # 인덱스를 0으로 설정하여 DataFrame 생성

# 전체 정확도 출력
print(f"\n전체 정확도: {overall_accuracy:.2%}")
# 전체 문제 수 출력
print(f"총 문제 수: {len(df)}")
# 정답 수 출력
print(f"정답 수: {df['is_correct'].sum()}")
# 총 추론 시간 출력
print(f"추론 시간: {inference_time:.2f} 초")

In [None]:
overall_row

In [11]:
# 결과 저장
df.to_csv('./result/kmmlu_full_evaluation_results_test.csv', index=False)
category_metrics.to_csv('./result/kmmlu_category_metrics_test.csv', index=False)