In [3]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, BitsAndBytesConfig
from datasets import Dataset
import json
from tqdm import tqdm
import os
from collections import Counter

_global_models = {}

class LLMJudge:
    def __init__(self, judge_llm_name: str, max_seq_length: int = 512):
        
        config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4"
            )
        if judge_llm_name not in _global_models:
            self.tokenizer = AutoTokenizer.from_pretrained(judge_llm_name)
            self.model = AutoModelForCausalLM.from_pretrained(judge_llm_name, torch_dtype=torch.bfloat16, quantization_config=config, device_map="auto")
            
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
                self.model.config.pad_token_id = self.tokenizer.eos_token_id
            
            self.model.eval()
            
            self.device = self.model.device
            _global_models[judge_llm_name] = {"tokenizer": self.tokenizer, "model": self.model, "device": self.device}
        else:
            loaded_data = _global_models[judge_llm_name]
            self.tokenizer = loaded_data["tokenizer"]
            self.model = loaded_data["model"]
            self.device = loaded_data["device"]
            
        self.max_seq_length = max_seq_length

    def _create_judge_prompt(self, question: str, generated_answer: str, true_answer: str) -> str:
        return (
            f"다음은 [질문], [생성된 답변], 그리고 [기준 정답]입니다.\n"
            f"[기준 정답]을 기준으로 [생성된 답변]이 정확한지 판단하세요.\n"
            f"판단은 '정답', '오답', '모호' 셋 중 하나로만 답하고, 그 뒤에 간략한 이유를 덧붙이세요.\n"
            f"[생성된 답변]이 **실제로 맞는 답변이더라도 [기준 정답]과 다르면 '오답'으로 판단**해야 합니다.\n"
            f"[생성된 답변]이 [기준 정답]과 비교하여 애매하거나, 부분적으로 맞지만 명확히 '정답'이나 '오답'으로 분류하기 어렵다면 '모호'로 판단하세요.\n\n"
            f"[질문]: {question}\n"
            f"[생성된 답변]: {generated_answer}\n"
            f"[기준 정답]: {true_answer}\n\n" 
            f"판단: "
        )

    def judge_answer(self, question: str, generated_answer: str, true_answer: str) -> str:
        judge_prompt = self._create_judge_prompt(question, generated_answer, true_answer)
        
        inputs = self.tokenizer(judge_prompt, return_tensors="pt", 
                                max_length=self.max_seq_length, truncation=True).to(self.device)
        
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=20, 
                num_beams=1,
                do_sample=False,
                temperature=0.0,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
            
        judge_full_output = self.tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], 
                                                 skip_special_tokens=True).strip().lower()

        
        if "정답" in judge_full_output:
            return "정답"
        elif "오답" in judge_full_output:
            return "오답"
        elif "모호" in judge_full_output:
            return "모호"
        else:
            return "모호" 

def slearn_llm_training_with_llm_judge_modular(
    model_name: str = "./model/LLM/deepseek-qwen-bllossom-32b",
    data_path: str = "./data/qna_data.json",
    output_dir: str = "./model",
    max_seq_length: int = 512,
    batch_size: int = 4,
    learning_rate: float = 2e-5,
    num_train_epochs: int = 3,
    eval_steps: int = 500,
    save_steps: int = 500,
    judge_llm_names: list = ["./model/LLM/google_gemma-2b", "./model/LLM/google_gemma-2b-it", "./model/LLM/google_gemma-7b", "./model/LLM/google_gemma-7b-it", "./model/LLM/deepseek-ai_deepseek-llm-7b-base"] # 5개의 LLM
):
    global _global_models 

    config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4"
            )
    
    if model_name not in _global_models:
        model = AutoModelForCausalLM.from_pretrained(model_name, 
                                                     torch_dtype=torch.bfloat16, 
                                                     device_map="auto",
                                                     quantization_config=config)
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
            model.config.pad_token_id = tokenizer.eos_token_id
        _global_models[model_name] = {"tokenizer": tokenizer, "model": model, "device": model.device}
    else:
        loaded_data = _global_models[model_name]
        tokenizer = loaded_data["tokenizer"]
        model = loaded_data["model"]
    
    
    llm_judges = [LLMJudge(judge_llm_name=name, max_seq_length=max_seq_length) for name in judge_llm_names]

    with open(data_path, 'r', encoding='utf-8') as f:
        qna_data = json.load(f)

    generated_qna_data = []
    print("--- 답변 생성 중 ---")
    for item in tqdm(qna_data, desc="답변 생성 중"):
        question = item['question']
        # context = item['context']
        true_answer = item['answer']

        input_text_prompt = f"질문 : {question}\n답변 : "
        
        # input_text_prompt = (
        #                 f"당신은 주어진 질문에 대해 답변할 수 있다면 상세하게 답변하고, "
        #                 f"**만약 질문에 대한 답변이 불가능하거나 알 수 없다면 '죄송합니다. 이 질문에 대해서는 답변해 드릴 수 없습니다.'라고만 답변하세요.**\n\n"
        #                 f"질문: {question}\n"
        #                 f"답변: "
        #             )
        inputs = tokenizer(input_text_prompt, return_tensors="pt", max_length=max_seq_length, truncation=True)
        inputs = {k: v.to(model.device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=128,
                num_beams=1,
                do_sample=True,
                temperature=0.3,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id
            )
        
        generated_answer = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True).strip()
        
        generated_qna_data.append({
            "question": question,
            # "context": context,
            "true_answer": true_answer,
            "generated_answer": generated_answer,
            "input_text_prompt": input_text_prompt
        })

    incorrect_qna_pairs = [] 
    human_review_qna_pairs = [] 
    
    output_results_dir = "./data/slearn"
    os.makedirs(output_results_dir, exist_ok=True)

    print("\n--- LLM Judge를 이용한 판단 진행 중 (다수결 투표) ---")
    for idx, item in enumerate(tqdm(generated_qna_data, desc="판단 진행 중")):
        judgements = []
        for judge in llm_judges:
            judgements.append(judge.judge_answer(
                question=item['question'],
                generated_answer=item['generated_answer'],
                true_answer=item['true_answer']
            ))
        
        vote_counts = Counter(judgements)
        
        final_decision = "모호" 
        if vote_counts['정답'] >= 3:
            final_decision = "정답"
        elif vote_counts['오답'] >= 3:
            final_decision = "오답"
        
        if final_decision == "오답":
            incorrect_qna_pairs.append({
                "question": item['question'],
                # "context": item['context'],
                "answer": item['true_answer'],
                "input_text": item['input_text_prompt'],
                "output_text": item['true_answer']
            })
        elif final_decision == "모호":
            human_review_qna_pairs.append({
                "question": item['question'],
                # "context": item['context'],
                "true_answer": item['true_answer'],
                "generated_answer": item['generated_answer'],
                "individual_judgements": judgements 
            })
        
        file_name = os.path.join(output_results_dir, f"slearn_result_{idx+1:05d}.txt")
        with open(file_name, 'w', encoding='utf-8') as f:
            f.write(f"<질문>: {item['question']}\n\n")
            # f.write(f"<문서>: {item['context']}\n\n")
            f.write(f"<생성된 답변>: {item['generated_answer']}\n\n")
            f.write(f"<기준 정답>: {item['true_answer']}\n\n")
            f.write(f"<각 LLM 판단 결과>: {judgements}\n") 
            f.write(f"<최종 판단 결과 (다수결)>: {final_decision}\n")

    print(f"\n총 {len(qna_data)}개의 QA 쌍 중:")
    print(f"- {len(incorrect_qna_pairs)}개가 '오답'으로 확정되어 재학습 대상입니다.")
    print(f"- {len(human_review_qna_pairs)}개가 '모호'로 판단되어 사람의 검토가 필요합니다.")
    
    
    if incorrect_qna_pairs:
        with open(os.path.join(output_results_dir, "incorrect_qna_pairs.json"), 'w', encoding='utf-8') as f:
            json.dump(incorrect_qna_pairs, f, ensure_ascii=False, indent=4)
        print(f"오답 데이터가 '{os.path.join(output_results_dir, 'incorrect_qna_pairs.json')}'에 저장되었습니다.")
    
    
    if human_review_qna_pairs:
        with open(os.path.join(output_results_dir, "human_review_qna_pairs.json"), 'w', encoding='utf-8') as f:
            json.dump(human_review_qna_pairs, f, ensure_ascii=False, indent=4)
        print(f"모호 판단 데이터가 '{os.path.join(output_results_dir, 'human_review_qna_pairs.json')}'에 저장되었습니다.")

    
    if not incorrect_qna_pairs:
        print("재학습할 오답 데이터가 없습니다.")
        return

    filtered_data = []
    for item in incorrect_qna_pairs:
        filtered_data.append({
            "input_text": item["input_text"],
            "output_text": item["output_text"]
        })

    dataset = Dataset.from_list(filtered_data)

    def tokenize_function(examples):
        full_text = [
            f"{examples['input_text'][i]}{examples['output_text'][i]}{tokenizer.eos_token}"
            for i in range(len(examples['input_text']))
        ]
        
        tokenized_inputs = tokenizer(
            full_text,
            max_length=max_seq_length,
            truncation=True,
            padding="max_length"
        )

        labels = []
        for i in range(len(examples['input_text'])):
            input_prompt_len = len(tokenizer(examples['input_text'][i], truncation=False)['input_ids'])
            
            label = list(tokenized_inputs['input_ids'][i])
            for j in range(input_prompt_len):
                label[j] = -100
            labels.append(label)
        
        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    tokenized_dataset = dataset.map(
        tokenize_function,
        batched=True,
        remove_columns=dataset.column_names
    )

    training_args = TrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=batch_size,
        learning_rate=learning_rate,
        num_train_epochs=num_train_epochs,
        logging_dir=f"{output_dir}/logs",
        logging_steps=100,
        save_strategy="steps",
        save_steps=save_steps,
        evaluation_strategy="no", 
        # eval_steps=eval_steps, 
        load_best_model_at_end=False, 
        # metric_for_best_model="loss", 
        # greater_is_better=False,
        fp16=True if torch.cuda.is_available() else False,
        gradient_accumulation_steps=1,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        tokenizer=tokenizer,
    )

    print("\n--- 오답 데이터를 이용한 모델 재학습 시작 ---")
    trainer.train()
    
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"\n재학습된 모델이 '{output_dir}'에 저장되었습니다.")

if __name__ == "__main__":
    slearn_llm_training_with_llm_judge_modular(
        model_name="./model/LLM/deepseek-qwen-bllossom-32b",
        data_path="./data/qna_data.json",
        output_dir="./model/LLM/slearn_llm_finetuned_llm_judge_modular",
        max_seq_length=512,
        batch_size=2,
        learning_rate=2e-5,
        num_train_epochs=3,
        eval_steps=100,
        save_steps=100,
        judge_llm_names=[
            "./model/LLM/google_gemma-2b-it", 
            "./model/LLM/google_gemma-2b", 
            "./model/LLM/google_gemma-7b-it", 
            "./model/LLM/google_gemma-7b", 
            "./model/LLM/EleutherAI_gpt-neo-125m" 
        ] 
    )

Loading checkpoint shards:   0%|          | 0/14 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

--- 답변 생성 중 ---


답변 생성 중: 100%|██████████| 60/60 [06:25<00:00,  6.42s/it]



--- LLM Judge를 이용한 판단 진행 중 (다수결 투표) ---


판단 진행 중: 100%|██████████| 60/60 [01:36<00:00,  1.62s/it]


총 60개의 QA 쌍 중:
- 0개가 '오답'으로 확정되어 재학습 대상입니다.
- 34개가 '모호'로 판단되어 사람의 검토가 필요합니다.
모호 판단 데이터가 './data/slearn/human_review_qna_pairs.json'에 저장되었습니다.
재학습할 오답 데이터가 없습니다.





In [2]:
# import os
# from huggingface_hub import snapshot_download

# models_to_download = [
#     "google/gemma-2b-it",
#     "google/gemma-2b",
#     "google/gemma-7b-it",
#     "google/gemma-7b",
#     "EleutherAI/gpt-neo-125m"
#     ""
# ]

# base_download_dir = "./model/LLM"

# for model_name in models_to_download:
#     save_path = os.path.join(base_download_dir, model_name.replace("/", "_"))
    
#     print(f"Downloading {model_name} to {save_path} (excluding .gguf files)...")
#     try:
#         snapshot_download(
#             repo_id=model_name,
#             local_dir=save_path,
#             allow_patterns=["*"],  
#             ignore_patterns=["*.gguf"], 
#         )
#         print(f"Successfully downloaded {model_name}")
#     except Exception as e:
#         print(f"Error downloading {model_name}: {e}")
#     print("-" * 50)

# print("\nAll specified models have been processed.")