In [None]:
import json
from typing import List, Dict, Set, Tuple

def load_test_set(test_path: str) -> Set[Tuple[int, int]]:
    """
    테스트셋을 로드하고 (q_id, a_id) 쌍의 집합을 반환합니다.
    
    Args:
        test_path: 테스트셋 파일 경로
    
    Returns:
        (q_id, a_id) 튜플들의 집합
    """
    with open(test_path, 'r', encoding='utf-8') as f:
        test_data = json.load(f)
    
    test_pairs = set()
    for item in test_data:
        test_pairs.add((item["q_id"], item["a_id"]))
    
    print(f"테스트셋에서 {len(test_pairs)}개의 (q_id, a_id) 쌍을 로드했습니다.")
    return test_pairs

def extract_preference_pairs_filtered(data: List[Dict], test_pairs: Set[Tuple[int, int]]) -> List[Dict]:
    """
    테스트셋과 중복되지 않는 선호도 쌍을 추출합니다.
    
    Args:
        data: 원본 데이터 리스트
        test_pairs: 테스트셋의 (q_id, a_id) 쌍들
    
    Returns:
        필터링된 선호도 쌍 리스트
    """
    preference_pairs = []
    filtered_count = 0
    
    for item in data:
        q_id = item["q_id"]
        title = item["title"]
        content = item["content"]
        # question = item["title"] + " " + item["content"]
        answers = item["answers"]
        
        # 채택된 답변과 채택되지 않은 답변으로 분류
        selected_answers = []
        unselected_answers = []
        
        for answer in answers:
            # 테스트셋에 포함된 (q_id, a_id) 쌍은 제외
            if (q_id, answer["a_id"]) not in test_pairs:
                if answer["selected"]:
                    selected_answers.append(answer)
                else:
                    unselected_answers.append(answer)
            else:
                filtered_count += 1
        
        # 채택된 답변과 채택되지 않은 답변이 모두 존재하는 경우에만 쌍 생성
        if selected_answers and unselected_answers:
            # n개의 채택된 답변과 m개의 채택되지 않은 답변으로 n*m개의 쌍 생성
            for chosen_answer in selected_answers:
                for rejected_answer in unselected_answers:
                    preference_pairs.append({
                        "q_id": q_id,
                        "title": title,
                        "content": content,
                        "chosen": {
                            "a_id": chosen_answer["a_id"],
                            "answer": chosen_answer["answer"],
                            "answer_type": chosen_answer["answer_type"],
                            "answer_date": chosen_answer.get("answer_date", "")
                        },
                        "rejected": {
                            "a_id": rejected_answer["a_id"],
                            "answer": rejected_answer["answer"],
                            "answer_type": rejected_answer["answer_type"],
                            "answer_date": rejected_answer.get("answer_date", "")
                        },
                        "chosen_answer_type": chosen_answer["answer_type"],
                        "rejected_answer_type": rejected_answer["answer_type"],
                        "question_date": item.get("question_date", ""),
                        "animal_type": item.get("animal_type", ""),
                        "link": item.get("link", "")
                    })
    
    print(f"테스트셋과 중복되어 제외된 답변 수: {filtered_count}개")
    return preference_pairs

def analyze_pairs(preference_pairs: List[Dict]) -> Dict:
    """생성된 쌍들을 분석합니다."""
    
    analysis = {
        "total_pairs": len(preference_pairs),
        "expert_chosen_vs_expert_rejected": 0,
        "expert_chosen_vs_nonexpert_rejected": 0,
        "nonexpert_chosen_vs_expert_rejected": 0,
        "nonexpert_chosen_vs_nonexpert_rejected": 0,
        "unique_questions": len(set([pair["q_id"] for pair in preference_pairs]))
    }
    
    for pair in preference_pairs:
        chosen_type = pair["chosen_answer_type"]
        rejected_type = pair["rejected_answer_type"]
        
        if chosen_type == "expert" and rejected_type == "expert":
            analysis["expert_chosen_vs_expert_rejected"] += 1
        elif chosen_type == "expert" and rejected_type == "nonexpert":
            analysis["expert_chosen_vs_nonexpert_rejected"] += 1
        elif chosen_type == "nonexpert" and rejected_type == "expert":
            analysis["nonexpert_chosen_vs_expert_rejected"] += 1
        elif chosen_type == "nonexpert" and rejected_type == "nonexpert":
            analysis["nonexpert_chosen_vs_nonexpert_rejected"] += 1
    
    return analysis

# 메인 실행 코드
if __name__ == "__main__":
    # 파일 경로 설정
    data_path = "/home/work/factchecking/PetQA/data/interim/unique_data.json"
    test_path = "/home/work/factchecking/PetQA/data/processed/test.json"
    output_path = "./dpo_preference_pairs.json"
    
    # 테스트셋 로드
    print("=== 테스트셋 로딩 ===")
    test_pairs = load_test_set(test_path)
    
    # 원본 데이터 로드
    print("\n=== 원본 데이터 로딩 ===")
    with open(data_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    print(f"원본 데이터에서 {len(data)}개의 질문을 로드했습니다.")
    
    # 필터링된 선호도 쌍 추출
    print("\n=== DPO 선호도 쌍 생성 ===")
    preference_pairs = extract_preference_pairs_filtered(data, test_pairs)
    
    # 분석 결과 출력
    analysis = analyze_pairs(preference_pairs)
    
    print("\n=== 선호도 쌍 생성 결과 ===")
    print(f"총 쌍 개수: {analysis['total_pairs']:,}개")
    print(f"관련 질문 수: {analysis['unique_questions']:,}개")
    if analysis['unique_questions'] > 0:
        print(f"평균 질문당 쌍 수: {analysis['total_pairs'] / analysis['unique_questions']:.2f}개")
    print()
    
    print("=== 쌍 유형별 분포 ===")
    print(f"채택된 전문가 vs 채택되지 않은 전문가: {analysis['expert_chosen_vs_expert_rejected']:,}개")
    print(f"채택된 전문가 vs 채택되지 않은 일반인: {analysis['expert_chosen_vs_nonexpert_rejected']:,}개")
    print(f"채택된 일반인 vs 채택되지 않은 전문가: {analysis['nonexpert_chosen_vs_expert_rejected']:,}개")
    print(f"채택된 일반인 vs 채택되지 않은 일반인: {analysis['nonexpert_chosen_vs_nonexpert_rejected']:,}개")
    
    # 결과 저장
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(preference_pairs, f, ensure_ascii=False, indent=2)
    
    print(f"\n결과가 {output_path}에 저장되었습니다.")

In [None]:
import json
from typing import Dict, Tuple

def load_preprocessed_data(file_paths: list) -> Dict[Tuple[int, int], Dict]:
   """
   전처리된 데이터를 로드하고 (q_id, a_id) -> preprocessed 정보 매핑을 생성합니다.
   
   Args:
       file_paths: train.json, validation.json 파일 경로 리스트
   
   Returns:
       (q_id, a_id) 튜플을 키로 하는 전처리 정보 딕셔너리
   """
   preprocessed_map = {}
   
   for file_path in file_paths:
       print(f"로딩 중: {file_path}")
       with open(file_path, 'r', encoding='utf-8') as f:
           data = json.load(f)
       
       for item in data:
           key = (item["q_id"], item["a_id"])
           preprocessed_map[key] = {
               "preprocessed_question": item.get("preprocessed_question", ""),
               "preprocessed_answer": item.get("preprocessed_answer", "")
           }
       
       print(f"  - {len(data)}개 샘플 로드 완료")
   
   print(f"총 {len(preprocessed_map)}개의 전처리 정보를 로드했습니다.")
   return preprocessed_map

def add_preprocessed_info(preference_pairs: list, preprocessed_map: Dict[Tuple[int, int], Dict]) -> list:
   """
   선호도 쌍에 전처리 정보를 추가합니다.
   
   Args:
       preference_pairs: 기존 선호도 쌍 리스트
       preprocessed_map: 전처리 정보 매핑
   
   Returns:
       전처리 정보가 추가된 선호도 쌍 리스트
   """
   enhanced_pairs = []
   found_count = 0
   
   for pair in preference_pairs:
       enhanced_pair = pair.copy()
       
       # chosen 답변에 전처리 정보 추가
       chosen_key = (pair["q_id"], pair["chosen"]["a_id"])
       if chosen_key in preprocessed_map:
           enhanced_pair["chosen"]["preprocessed_answer"] = preprocessed_map[chosen_key]["preprocessed_answer"]
           enhanced_pair["preprocessed_question"] = preprocessed_map[chosen_key]["preprocessed_question"]
           found_count += 1
       else:
           enhanced_pair["chosen"]["preprocessed_answer"] = ""
           if "preprocessed_question" not in enhanced_pair:
               enhanced_pair["preprocessed_question"] = ""
       
       # rejected 답변에 전처리 정보 추가
       rejected_key = (pair["q_id"], pair["rejected"]["a_id"])
       if rejected_key in preprocessed_map:
           enhanced_pair["rejected"]["preprocessed_answer"] = preprocessed_map[rejected_key]["preprocessed_answer"]
           # 질문은 이미 위에서 처리했거나 동일하므로 중복 처리 방지
           if "preprocessed_question" not in enhanced_pair or not enhanced_pair["preprocessed_question"]:
               enhanced_pair["preprocessed_question"] = preprocessed_map[rejected_key]["preprocessed_question"]
       else:
           enhanced_pair["rejected"]["preprocessed_answer"] = ""
       
       enhanced_pairs.append(enhanced_pair)
   
   print(f"{found_count}개의 답변에 대해 전처리 정보를 찾았습니다.")
   return enhanced_pairs

# 전처리 데이터 로드
preprocessed_files = [
   "/home/work/factchecking/PetQA/data/processed/train.json",
   "/home/work/factchecking/PetQA/data/processed/validation.json"
]

preprocessed_map = load_preprocessed_data(preprocessed_files)

# 기존 선호도 쌍에 전처리 정보 추가
enhanced_preference_pairs = add_preprocessed_info(preference_pairs, preprocessed_map)

print(f"\n전처리 정보가 추가된 선호도 쌍: {len(enhanced_preference_pairs)}개")

# 결과 저장
enhanced_output_path = "./dpo_preference_pairs.json"
with open(enhanced_output_path, 'w', encoding='utf-8') as f:
   json.dump(enhanced_preference_pairs, f, ensure_ascii=False, indent=2)

In [None]:
# preprocessed_question이 빈 문자열인 샘플 필터링
def filter_empty_preprocessed_questions(preference_pairs: list) -> list:
   """
   preprocessed_question이 빈 문자열인 샘플을 제거합니다.
   
   Args:
       preference_pairs: 전처리 정보가 포함된 선호도 쌍 리스트
   
   Returns:
       필터링된 선호도 쌍 리스트
   """
   filtered_pairs = []
   removed_count = 0
   
   for pair in preference_pairs:
       preprocessed_question = pair.get("preprocessed_question", "").strip()
       
       if preprocessed_question:  # 빈 문자열이 아닌 경우만 포함
           filtered_pairs.append(pair)
       else:
           removed_count += 1
   
   print(f"빈 preprocessed_question으로 인해 제거된 샘플: {removed_count}개")
   print(f"필터링 후 남은 샘플: {len(filtered_pairs)}개")
   
   return filtered_pairs

# 필터링 수행
print("=== preprocessed_question 필터링 ===")
filtered_preference_pairs = filter_empty_preprocessed_questions(enhanced_preference_pairs)

# 필터링 통계
original_count = len(enhanced_preference_pairs)
filtered_count = len(filtered_preference_pairs)
removal_rate = ((original_count - filtered_count) / original_count) * 100 if original_count > 0 else 0

print(f"원본 샘플 수: {original_count:,}개")
print(f"필터링 후 샘플 수: {filtered_count:,}개")
print(f"제거율: {removal_rate:.2f}%")

# 필터링된 결과 저장
filtered_output_path = "./dpo_preference_pairs_filtered.json"
with open(filtered_output_path, 'w', encoding='utf-8') as f:
   json.dump(filtered_preference_pairs, f, ensure_ascii=False, indent=2)

print(f"\n필터링된 결과가 {filtered_output_path}에 저장되었습니다.")

In [3]:
import json
def analyze_pairs(preference_pairs):
    """생성된 쌍들을 분석합니다."""
    
    analysis = {
        "total_pairs": len(preference_pairs),
        "expert_chosen_vs_expert_rejected": 0,
        "expert_chosen_vs_nonexpert_rejected": 0,
        "nonexpert_chosen_vs_expert_rejected": 0,
        "nonexpert_chosen_vs_nonexpert_rejected": 0,
        "unique_questions": len(set([pair["q_id"] for pair in preference_pairs]))
    }
    
    for pair in preference_pairs:
        chosen_type = pair["chosen_answer_type"]
        rejected_type = pair["rejected_answer_type"]
        
        if chosen_type == "expert" and rejected_type == "expert":
            analysis["expert_chosen_vs_expert_rejected"] += 1
        elif chosen_type == "expert" and rejected_type == "nonexpert":
            analysis["expert_chosen_vs_nonexpert_rejected"] += 1
        elif chosen_type == "nonexpert" and rejected_type == "expert":
            analysis["nonexpert_chosen_vs_expert_rejected"] += 1
        elif chosen_type == "nonexpert" and rejected_type == "nonexpert":
            analysis["nonexpert_chosen_vs_nonexpert_rejected"] += 1
    
    return analysis

data_path = "/home/work/factchecking/PetQA/data/interim/dpo_cleaned_data.json"
with open(data_path, 'r', encoding='utf-8') as f:
    data = json.load(f)

analysis = analyze_pairs(data)

print("\n=== 선호도 쌍 생성 결과 ===")
print(f"총 쌍 개수: {analysis['total_pairs']:,}개\n")

print("=== 쌍 유형별 분포 ===")
print(f"채택된 전문가 vs 채택되지 않은 전문가: {analysis['expert_chosen_vs_expert_rejected']:,}개")
print(f"채택된 전문가 vs 채택되지 않은 일반인: {analysis['expert_chosen_vs_nonexpert_rejected']:,}개")
print(f"채택된 일반인 vs 채택되지 않은 전문가: {analysis['nonexpert_chosen_vs_expert_rejected']:,}개")
print(f"채택된 일반인 vs 채택되지 않은 일반인: {analysis['nonexpert_chosen_vs_nonexpert_rejected']:,}개")


=== 선호도 쌍 생성 결과 ===
총 쌍 개수: 16,435개

=== 쌍 유형별 분포 ===
채택된 전문가 vs 채택되지 않은 전문가: 0개
채택된 전문가 vs 채택되지 않은 일반인: 1,802개
채택된 일반인 vs 채택되지 않은 전문가: 105개
채택된 일반인 vs 채택되지 않은 일반인: 14,528개


In [4]:
import json
import pandas as pd
from sklearn.model_selection import train_test_split
import os
from collections import Counter

def create_stratified_dpo_split(input_file, output_dir, test_size=0.2, random_state=42):
    """
    DPO 데이터를 라벨 분포를 고려하여 train/validation으로 split
    
    Args:
        input_file: 입력 JSON 파일 경로
        output_dir: 출력 디렉토리 경로
        test_size: validation 데이터 비율
        random_state: 재현 가능한 결과를 위한 시드
    """
    
    # 데이터 로드
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    # DataFrame으로 변환
    df = pd.DataFrame(data)
    
    # 라벨 생성: chosen_answer_type과 rejected_answer_type 조합
    df['label'] = df['chosen_answer_type'] + '_vs_' + df['rejected_answer_type']
    
    # 라벨 분포 확인
    label_counts = Counter(df['label'])
    print("라벨 분포:")
    for label, count in label_counts.items():
        print(f"  {label}: {count}개 ({count/len(df)*100:.2f}%)")
    
    # Stratified split 수행
    try:
        train_df, val_df = train_test_split(
            df, 
            test_size=test_size, 
            random_state=random_state,
            stratify=df['label']
        )
        
        print(f"\n분할 결과:")
        print(f"  Train: {len(train_df)}개")
        print(f"  Validation: {len(val_df)}개")
        
        # 분할 후 라벨 분포 확인
        print(f"\nTrain 데이터 라벨 분포:")
        train_label_counts = Counter(train_df['label'])
        for label, count in train_label_counts.items():
            print(f"  {label}: {count}개 ({count/len(train_df)*100:.2f}%)")
        
        print(f"\nValidation 데이터 라벨 분포:")
        val_label_counts = Counter(val_df['label'])
        for label, count in val_label_counts.items():
            print(f"  {label}: {count}개 ({count/len(val_df)*100:.2f}%)")
        
    except ValueError as e:
        print(f"Stratified split 실패: {e}")
        print("일부 라벨의 샘플 수가 너무 적어서 stratify가 불가능합니다.")
        print("일반 random split을 수행합니다.")
        
        train_df, val_df = train_test_split(
            df, 
            test_size=test_size, 
            random_state=random_state
        )
    
    # label 컬럼 제거 (원본 데이터 형태로 복원)
    train_df = train_df.drop('label', axis=1)
    val_df = val_df.drop('label', axis=1)
    
    # JSON 파일로 저장
    train_output_file = os.path.join(output_dir, 'dpo_train.json')
    val_output_file = os.path.join(output_dir, 'dpo_validation.json')
    
    # DataFrame을 리스트로 변환하여 저장
    train_data = train_df.to_dict('records')
    val_data = val_df.to_dict('records')
    
    with open(train_output_file, 'w', encoding='utf-8') as f:
        json.dump(train_data, f, ensure_ascii=False, indent=2)
    
    with open(val_output_file, 'w', encoding='utf-8') as f:
        json.dump(val_data, f, ensure_ascii=False, indent=2)
    
    print(f"\n파일 저장 완료:")
    print(f"  Train: {train_output_file}")
    print(f"  Validation: {val_output_file}")
    
    return train_data, val_data

# 실행
if __name__ == "__main__":
    input_file = "/home/work/factchecking/PetQA/data/interim/dpo_cleaned_data.json"
    output_dir = "/home/work/factchecking/PetQA/data/processed"
    
    train_data, val_data = create_stratified_dpo_split(
        input_file=input_file,
        output_dir=output_dir,
        test_size=0.2,  # validation 데이터 20%
        random_state=42
    )

라벨 분포:
  expert_vs_nonexpert: 1802개 (10.96%)
  nonexpert_vs_expert: 105개 (0.64%)
  nonexpert_vs_nonexpert: 14528개 (88.40%)

분할 결과:
  Train: 13148개
  Validation: 3287개

Train 데이터 라벨 분포:
  nonexpert_vs_nonexpert: 11622개 (88.39%)
  expert_vs_nonexpert: 1442개 (10.97%)
  nonexpert_vs_expert: 84개 (0.64%)

Validation 데이터 라벨 분포:
  nonexpert_vs_nonexpert: 2906개 (88.41%)
  expert_vs_nonexpert: 360개 (10.95%)
  nonexpert_vs_expert: 21개 (0.64%)

파일 저장 완료:
  Train: /home/work/factchecking/PetQA/data/processed/dpo_train.json
  Validation: /home/work/factchecking/PetQA/data/processed/dpo_validation.json


In [6]:
file_path = "/home/work/factchecking/PetQA/data/interim/dpo_cleaned_data.json"
with open(file_path, "r") as f:
    data = json.load(f)

nonexpert_data = []
for item in data:
    if item['chosen_answer_type'] == 'nonexpert' and item['rejected_answer_type'] == 'expert':
        nonexpert_data.append(item)

output_path = "/home/work/factchecking/PetQA/data/interim/ne_chosen_e_rejected.json"
with open(output_path, "w") as f:
    json.dump(nonexpert_data, f, ensure_ascii=False, indent=2)