# 🚀 FSKU 데이터 증강 시스템 (RAG + Chain-of-Thought)

## 📌 개요
- **대회**: 2025 금융 AI Challenge Track1
- **목적**: 외부 금융 문서 기반 고품질 학습 데이터 자동 생성
- **핵심 기술**: RAG (Retrieval Augmented Generation) + LLM 체이닝 + 품질 관리

## 🔄 전체 동작 프로세스
```
1. 문서 수집 → 2. RAG 인덱싱 → 3. 컨텍스트 검색 → 4. LLM 문제 생성 
→ 5. 품질 평가 → 6. 체이닝 개선 → 7. 최종 검증 → 8. 데이터 저장
```

## 🛠️ 핵심 기법
1. **RAG (Retrieval Augmented Generation)**
   - 외부 금융 문서를 벡터 DB에 저장
   - 의미 기반 검색으로 관련 컨텍스트 추출
   - LLM 생성 시 참고 자료로 활용

2. **Chain-of-Thought 체이닝**
   - 생성 → 검증 → 개선 → 최종확인 4단계
   - 각 단계별 전문 프롬프트 사용
   - 품질 점수 기반 자동 개선

3. **품질 관리 시스템**
   - 다차원 평가 (길이, 구조, 명확성, 금융용어)
   - 70점 이상만 통과
   - 실시간 모니터링 대시보드

## 📊 예상 결과
- 생성 목표: 1,000~5,000개 고품질 문제
- 품질 기준: 70점 이상
- 소요 시간: 약 2-4시간 (모델 크기 따라)
- 출력 형식: JSONL (학습용), 메타데이터 (분석용)

## 1. 환경 설정 및 시스템 정보

In [None]:
# 경고 메시지 숨기기
import warnings
warnings.filterwarnings('ignore')

# 시스템 정보 출력
import platform
import sys
import os

print("=" * 60)
print("🖥️  시스템 환경 정보")
print("=" * 60)
print(f"OS: {platform.system()} {platform.release()}")
print(f"Python: {platform.python_version()}")
print(f"현재 디렉토리: {os.getcwd()}")
print("=" * 60)

## 2. 필수 패키지 설치

In [None]:
# 필수 패키지 설치
# 주의: 이미 설치된 경우 스킵됩니다

print("📦 필수 패키지 설치 중...")
print("(이미 설치된 패키지는 자동으로 스킵됩니다)\n")

# 기본 패키지
%pip install -q transformers accelerate bitsandbytes
print("✅ Transformers 관련 패키지 설치 완료")

# 벡터 검색 및 임베딩
%pip install -q sentence-transformers faiss-cpu
print("✅ 벡터 검색 패키지 설치 완료")

# 문서 처리
%pip install -q PyPDF2 pdfplumber pandas openpyxl
print("✅ 문서 처리 패키지 설치 완료")

# 추가 유틸리티
%pip install -q tiktoken langchain tqdm matplotlib seaborn
print("✅ 유틸리티 패키지 설치 완료")

print("\n✅ 모든 패키지 설치 완료!")

## 3. 라이브러리 임포트 및 환경 체크

In [None]:
# 필수 라이브러리 임포트
print("📚 라이브러리 임포트 중...")

# 기본 라이브러리
import os
import sys
import json
import time
import pickle
import re
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Any, Tuple
from collections import defaultdict, Counter
import random
import logging

# 데이터 처리
import numpy as np
import pandas as pd
from tqdm import tqdm

# 시각화
import matplotlib.pyplot as plt
import seaborn as sns
plt.rcParams['font.family'] = 'DejaVu Sans'  # 한글 폰트 설정

# 딥러닝 프레임워크
import torch
from transformers import (
    AutoModelForCausalLM, 
    AutoTokenizer, 
    BitsAndBytesConfig
)

print("✅ 라이브러리 임포트 완료\n")

# GPU/환경 체크
device = "cuda" if torch.cuda.is_available() else "cpu"
print("=" * 60)
print("🔥 PyTorch 환경 정보")
print("=" * 60)
print(f"PyTorch 버전: {torch.__version__}")
print(f"Device: {device}")

if device == "cuda":
    print(f"GPU 이름: {torch.cuda.get_device_name(0)}")
    print(f"GPU 메모리: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB")
    print(f"현재 할당된 메모리: {torch.cuda.memory_allocated() / 1024**3:.2f}GB")
else:
    print("⚠️ GPU를 사용할 수 없습니다. CPU 모드로 실행됩니다.")
    print("   성능이 느릴 수 있습니다.")

# 프로젝트 경로 설정
print("\n" + "=" * 60)
print("📁 프로젝트 경로 설정")
print("=" * 60)

# 현재 노트북의 위치를 기준으로 경로 설정
PROJECT_ROOT = Path.cwd()
DATA_DIR = PROJECT_ROOT / "data"
EXTERNAL_DIR = DATA_DIR / "external"  # 외부 문서 디렉토리
OUTPUT_DIR = DATA_DIR / "augmented"   # 생성된 데이터 저장
CACHE_DIR = DATA_DIR / "cache"        # 캐시 디렉토리
VECTORDB_DIR = DATA_DIR / "vectordb"  # 벡터 DB 저장

# 디렉토리 생성
for dir_path in [DATA_DIR, EXTERNAL_DIR, OUTPUT_DIR, CACHE_DIR, VECTORDB_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)
    print(f"✅ {dir_path.name}/")

print(f"\n프로젝트 루트: {PROJECT_ROOT}")
print(f"외부 데이터: {EXTERNAL_DIR}")
print(f"출력 디렉토리: {OUTPUT_DIR}")

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

print("\n✅ 환경 설정 완료!")

## 4. RAG 시스템 - 문서 로더 클래스

In [None]:
class DocumentLoader:
    """
    다양한 형식의 문서를 로드하는 클래스
    
    지원 형식:
    - PDF: PyPDF2 또는 pdfplumber 사용
    - Excel: pandas로 읽기
    - TXT: 기본 파일 읽기
    - JSON: 구조화된 데이터 로드
    """
    
    def __init__(self):
        # 지원하는 파일 형식들
        self.supported_formats = ['.pdf', '.txt', '.xlsx', '.xls', '.json']
        
    def load_document(self, file_path: Path) -> Optional[Dict]:
        """
        문서를 로드하는 메인 함수
        
        Args:
            file_path: 문서 파일 경로
            
        Returns:
            Dict: {
                'content': str,      # 문서 전체 텍스트
                'metadata': dict,    # 메타데이터
                'pages': list       # 페이지별 텍스트 (PDF의 경우)
            }
        """
        # 파일 존재 확인
        if not file_path.exists():
            logger.error(f"파일이 존재하지 않습니다: {file_path}")
            return None
            
        # 파일 확장자 추출
        suffix = file_path.suffix.lower()
        
        # 지원하지 않는 형식 체크
        if suffix not in self.supported_formats:
            logger.warning(f"지원하지 않는 파일 형식: {suffix}")
            return None
        
        try:
            # 파일 형식별 처리
            if suffix == '.pdf':
                return self._load_pdf(file_path)
            elif suffix == '.txt':
                return self._load_text(file_path)
            elif suffix in ['.xlsx', '.xls']:
                return self._load_excel(file_path)
            elif suffix == '.json':
                return self._load_json(file_path)
                
        except Exception as e:
            logger.error(f"문서 로드 실패 {file_path}: {str(e)}")
            return None
    
    def _load_pdf(self, file_path: Path) -> Dict:
        """
        PDF 파일 로드 - 페이지별 텍스트 추출
        """
        try:
            import PyPDF2
            
            text_pages = []
            metadata = {
                'source': file_path.name,
                'type': 'pdf',
                'pages': 0
            }
            
            # PDF 파일 열기
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                metadata['pages'] = len(pdf_reader.pages)
                
                # 각 페이지에서 텍스트 추출
                for page_num, page in enumerate(pdf_reader.pages):
                    try:
                        text = page.extract_text()
                        if text.strip():  # 빈 페이지 제외
                            text_pages.append({
                                'page': page_num + 1,
                                'content': text
                            })
                    except Exception as e:
                        logger.debug(f"페이지 {page_num+1} 추출 실패: {e}")
                        continue
                        
        except ImportError:
            # PyPDF2가 없으면 pdfplumber 시도
            logger.info("PyPDF2 대신 pdfplumber 사용")
            return self._load_pdf_with_pdfplumber(file_path)
            
        # 전체 텍스트 결합
        full_text = "\n\n".join([
            f"[페이지 {p['page']}]\n{p['content']}" 
            for p in text_pages
        ])
        
        return {
            'content': full_text,
            'metadata': metadata,
            'pages': text_pages
        }
    
    def _load_pdf_with_pdfplumber(self, file_path: Path) -> Dict:
        """
        pdfplumber로 PDF 로드 (대체 방법)
        """
        import pdfplumber
        
        text_pages = []
        metadata = {
            'source': file_path.name,
            'type': 'pdf',
            'pages': 0
        }
        
        # pdfplumber로 파일 열기
        with pdfplumber.open(file_path) as pdf:
            metadata['pages'] = len(pdf.pages)
            
            # 각 페이지 처리
            for page_num, page in enumerate(pdf.pages):
                text = page.extract_text()
                if text:
                    text_pages.append({
                        'page': page_num + 1,
                        'content': text
                    })
        
        # 전체 텍스트 결합
        full_text = "\n\n".join([
            f"[페이지 {p['page']}]\n{p['content']}" 
            for p in text_pages
        ])
        
        return {
            'content': full_text,
            'metadata': metadata,
            'pages': text_pages
        }
    
    def _load_text(self, file_path: Path) -> Dict:
        """
        텍스트 파일 로드
        """
        try:
            # UTF-8로 시도
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except UnicodeDecodeError:
            # 실패시 cp949(한글 윈도우) 인코딩 시도
            with open(file_path, 'r', encoding='cp949') as f:
                content = f.read()
        
        return {
            'content': content,
            'metadata': {
                'source': file_path.name,
                'type': 'text',
                'size': len(content)
            }
        }
    
    def _load_excel(self, file_path: Path) -> Dict:
        """
        Excel 파일 로드
        """
        # 모든 시트 읽기
        dfs = pd.read_excel(file_path, sheet_name=None)
        
        text_parts = []
        # 각 시트별로 처리
        for sheet_name, df in dfs.items():
            text_parts.append(f"\n[시트: {sheet_name}]\n")
            # DataFrame을 텍스트로 변환
            text_parts.append(df.to_string())
        
        content = "\n".join(text_parts)
        
        return {
            'content': content,
            'metadata': {
                'source': file_path.name,
                'type': 'excel',
                'sheets': list(dfs.keys()),
                'total_rows': sum(len(df) for df in dfs.values())
            }
        }
    
    def _load_json(self, file_path: Path) -> Dict:
        """
        JSON 파일 로드
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        # JSON을 보기 좋은 텍스트로 변환
        content = json.dumps(data, ensure_ascii=False, indent=2)
        
        return {
            'content': content,
            'metadata': {
                'source': file_path.name,
                'type': 'json',
                'keys': list(data.keys()) if isinstance(data, dict) else None
            }
        }
    
    def load_directory(self, dir_path: Path) -> List[Dict]:
        """
        디렉토리의 모든 문서 로드
        
        Args:
            dir_path: 디렉토리 경로
            
        Returns:
            문서 리스트
        """
        documents = []
        
        if not dir_path.exists():
            logger.error(f"디렉토리가 존재하지 않습니다: {dir_path}")
            return documents
        
        # 지원하는 형식의 파일들 찾기
        total_files = 0
        for ext in self.supported_formats:
            for file_path in dir_path.glob(f"*{ext}"):
                total_files += 1
                print(f"📄 로딩: {file_path.name}")
                
                doc = self.load_document(file_path)
                if doc:
                    documents.append(doc)
                    print(f"   ✅ 성공 (크기: {len(doc['content']):,}자)")
                else:
                    print(f"   ❌ 실패")
        
        print(f"\n📊 로드 결과: {len(documents)}/{total_files}개 문서 로드 완료")
        return documents


# 문서 로더 테스트
print("🔄 문서 로더 클래스 생성 완료")
loader = DocumentLoader()
print(f"지원 형식: {', '.join(loader.supported_formats)}")

## 5. RAG 시스템 - 문서 청킹 클래스

In [None]:
class DocumentChunker:
    """
    긴 문서를 의미 있는 단위로 분할하는 클래스
    
    특징:
    - 청크 크기: 300 토큰 (RAG 최적화)
    - 오버랩: 50 토큰 (문맥 유지)
    - 한국어 특화: 형태소 기반 분할
    """
    
    def __init__(self, chunk_size: int = 300, chunk_overlap: int = 50):
        """
        초기화
        
        Args:
            chunk_size: 각 청크의 최대 토큰 수
            chunk_overlap: 청크 간 겹치는 토큰 수
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = 50  # 최소 청크 크기
        
    def chunk_document(self, document: Dict) -> List[Dict]:
        """
        문서를 청크로 분할
        
        처리 과정:
        1. 문서를 문장 단위로 분할
        2. 각 문장의 토큰 수 계산
        3. 청크 크기에 맞게 문장들을 결합
        4. 오버랩 처리하여 문맥 유지
        """
        content = document.get('content', '')
        metadata = document.get('metadata', {})
        
        # PDF인 경우 페이지별 처리 고려
        if metadata.get('type') == 'pdf' and 'pages' in document:
            chunks = []
            # 각 페이지별로 청킹
            for page_info in document['pages']:
                page_chunks = self._chunk_text(
                    page_info['content'],
                    {**metadata, 'page': page_info['page']}
                )
                chunks.extend(page_chunks)
            return chunks
        else:
            # 일반 텍스트 처리
            return self._chunk_text(content, metadata)
    
    def _chunk_text(self, text: str, metadata: Dict = None) -> List[Dict]:
        """
        텍스트를 청크로 분할하는 실제 로직
        """
        if not text or len(text.strip()) < self.min_chunk_size:
            return []
        
        # 문장 단위로 분할
        sentences = self._split_sentences(text)
        
        # 청크 생성
        chunks = []
        current_chunk = []  # 현재 청크의 문장들
        current_length = 0  # 현재 청크의 토큰 수
        
        for sentence in sentences:
            # 문장의 예상 토큰 수 계산
            sentence_length = self._estimate_tokens(sentence)
            
            # 현재 청크가 크기를 초과하면 새 청크 시작
            if current_length + sentence_length > self.chunk_size and current_chunk:
                # 현재 청크 저장
                chunk_text = ' '.join(current_chunk)
                chunks.append(self._create_chunk(chunk_text, len(chunks), metadata))
                
                # 오버랩 처리: 이전 청크의 마지막 부분을 다음 청크에 포함
                if self.chunk_overlap > 0:
                    overlap_sentences = []
                    overlap_length = 0
                    
                    # 뒤에서부터 오버랩 크기만큼 문장 선택
                    for sent in reversed(current_chunk):
                        sent_len = self._estimate_tokens(sent)
                        if overlap_length + sent_len <= self.chunk_overlap:
                            overlap_sentences.insert(0, sent)
                            overlap_length += sent_len
                        else:
                            break
                    
                    current_chunk = overlap_sentences
                    current_length = overlap_length
                else:
                    current_chunk = []
                    current_length = 0
            
            # 현재 문장 추가
            current_chunk.append(sentence)
            current_length += sentence_length
        
        # 마지막 청크 처리
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            if len(chunk_text.strip()) >= self.min_chunk_size:
                chunks.append(self._create_chunk(chunk_text, len(chunks), metadata))
        
        return chunks
    
    def _split_sentences(self, text: str) -> List[str]:
        """
        텍스트를 문장으로 분할
        한국어 문장 종결 어미를 고려한 분할
        """
        # 문장 종결 패턴 (한국어 + 영어)
        sentence_endings = r'[.!?。]\s+'
        
        # 특수 케이스 처리 (예: 조항 번호)
        # "제1조." 같은 경우를 문장 끝으로 인식하지 않도록
        text = re.sub(r'(\d+)\.\s*(\d+)', r'\1_\2', text)  # 1.2 -> 1_2
        text = re.sub(r'(\d+)\.\s*([가-힣])', r'\1_\2', text)  # 1.가 -> 1_가
        text = re.sub(r'제(\d+)조\.', r'제\1조_', text)  # 제1조. -> 제1조_
        
        # 문장 분할
        sentences = re.split(sentence_endings, text)
        
        # 특수 케이스 복원
        sentences = [s.replace('_', '.') for s in sentences]
        
        # 빈 문장 제거 및 정리
        sentences = [s.strip() for s in sentences if s.strip()]
        
        # 너무 짧은 문장은 다음 문장과 결합
        combined_sentences = []
        temp_sentence = ""
        
        for sentence in sentences:
            if len(sentence) < 20 and temp_sentence:
                # 짧은 문장은 이전 문장과 결합
                temp_sentence += " " + sentence
            else:
                if temp_sentence:
                    combined_sentences.append(temp_sentence)
                temp_sentence = sentence
        
        if temp_sentence:
            combined_sentences.append(temp_sentence)
        
        return combined_sentences
    
    def _estimate_tokens(self, text: str) -> int:
        """
        텍스트의 토큰 수 추정
        
        한국어/영어/숫자를 고려한 추정:
        - 한글: 평균 2.5자 = 1토큰
        - 영어: 평균 4자 = 1토큰
        - 숫자: 평균 3자 = 1토큰
        """
        # 문자 유형별 개수 계산
        korean_chars = len(re.findall(r'[가-힣]', text))
        english_chars = len(re.findall(r'[a-zA-Z]', text))
        numbers = len(re.findall(r'\d', text))
        
        # 토큰 수 추정
        estimated_tokens = (
            korean_chars / 2.5 +
            english_chars / 4 +
            numbers / 3
        )
        
        return int(estimated_tokens)
    
    def _create_chunk(self, text: str, index: int, metadata: Dict = None) -> Dict:
        """
        청크 딕셔너리 생성
        """
        chunk = {
            'content': text.strip(),
            'chunk_id': index,
            'tokens': self._estimate_tokens(text),
            'metadata': metadata or {}
        }
        
        # 청크에서 핵심 키워드 추출
        keywords = self._extract_keywords(text)
        if keywords:
            chunk['keywords'] = keywords
        
        return chunk
    
    def _extract_keywords(self, text: str) -> List[str]:
        """
        텍스트에서 핵심 키워드 추출
        금융 문서의 특성을 고려한 키워드 추출
        """
        keywords = []
        
        # 조항 번호 패턴 (예: 제1조, 제2항)
        article_patterns = re.findall(r'제\d+조', text)
        keywords.extend(article_patterns[:3])  # 최대 3개
        
        # 괄호 안의 정의 (예: 전자금융거래(이하 "전자거래"라 한다))
        definitions = re.findall(r'[가-힣]+(?:\([^)]+\))', text)
        keywords.extend(definitions[:3])  # 최대 3개
        
        # 금융 관련 핵심 용어들
        finance_terms = [
            '금융', '은행', '증권', '보험', '신용', '대출', '예금',
            '전자금융', '개인정보', '암호화', '보안', '인증'
        ]
        
        # 텍스트에 포함된 금융 용어 찾기
        found_terms = [term for term in finance_terms if term in text]
        keywords.extend(found_terms[:2])  # 최대 2개
        
        # 중복 제거하여 반환
        return list(dict.fromkeys(keywords))  # 순서 유지하며 중복 제거


# 청킹 클래스 테스트
print("🔄 문서 청킹 클래스 생성 완료")
chunker = DocumentChunker(chunk_size=300, chunk_overlap=50)
print(f"청크 크기: {chunker.chunk_size} 토큰")
print(f"오버랩: {chunker.chunk_overlap} 토큰")

## 6. RAG 시스템 - 벡터 검색 시스템

In [None]:
class DocumentRetriever:
    """
    RAG의 핵심 - 벡터 기반 문서 검색 시스템
    
    특징:
    - 임베딩 모델: jhgan/ko-sbert-nli (한국어 특화)
    - 검색 방법: 코사인 유사도 + BM25 하이브리드
    - 캐싱: 인덱스 저장으로 빠른 재실행
    """
    
    def __init__(self, use_cache: bool = True):
        """
        초기화
        
        Args:
            use_cache: 캐시 사용 여부
        """
        self.use_cache = use_cache
        self.cache_path = VECTORDB_DIR / "rag_index.pkl"
        
        # 데이터 저장소
        self.documents = []  # 원본 문서들
        self.chunks = []     # 청킹된 텍스트들
        self.embeddings = None  # 벡터 임베딩
        self.embedding_model = None  # 임베딩 모델
        
        # BM25를 위한 인덱스
        self.chunk_index = defaultdict(list)  # 키워드 -> 청크 인덱스
        
        # 캐시 확인 및 로드
        if use_cache and self.cache_path.exists():
            print("📂 캐시된 인덱스 발견...")
            if self._load_cache():
                print("✅ 캐시에서 인덱스 로드 완료!")
                return
                
        print("🔄 새로운 인덱스를 생성해야 합니다.")
    
    def build_index(self, documents: List[Dict]):
        """
        검색 인덱스 구축
        
        단계:
        1. 문서 청킹
        2. 임베딩 생성
        3. BM25 인덱스 구축
        4. 캐시 저장
        """
        print("\n🔨 인덱스 구축 시작...")
        start_time = time.time()
        
        # 1. 문서 저장
        self.documents = documents
        
        # 2. 청킹
        print("📄 문서 청킹 중...")
        chunker = DocumentChunker(chunk_size=300, chunk_overlap=50)
        
        for doc in tqdm(documents, desc="문서 처리"):
            doc_chunks = chunker.chunk_document(doc)
            
            # 각 청크에 소스 정보 추가
            for chunk in doc_chunks:
                chunk['source'] = doc['metadata']['source']
                chunk['doc_type'] = doc['metadata']['type']
            
            self.chunks.extend(doc_chunks)
        
        print(f"✅ {len(self.chunks)}개 청크 생성 완료")
        
        # 3. 임베딩 생성
        self._create_embeddings()
        
        # 4. BM25 인덱스 구축
        self._build_bm25_index()
        
        # 5. 캐시 저장
        if self.use_cache:
            self._save_cache()
        
        elapsed_time = time.time() - start_time
        print(f"\n✅ 인덱스 구축 완료! (소요 시간: {elapsed_time:.1f}초)")
    
    def _create_embeddings(self):
        """
        모든 청크에 대한 벡터 임베딩 생성
        """
        try:
            from sentence_transformers import SentenceTransformer
            
            # 한국어 특화 모델 사용
            model_name = "jhgan/ko-sbert-nli"
            print(f"\n🤖 임베딩 모델 로드 중: {model_name}")
            
            self.embedding_model = SentenceTransformer(model_name)
            print("✅ 임베딩 모델 로드 완료")
            
            # 모든 청크의 텍스트 추출
            texts = [chunk['content'] for chunk in self.chunks]
            
            # 임베딩 생성 (배치 처리)
            print(f"🔄 {len(texts)}개 청크 임베딩 중...")
            self.embeddings = self.embedding_model.encode(
                texts,
                normalize_embeddings=True,  # 정규화로 코사인 유사도 계산 최적화
                show_progress_bar=True,
                batch_size=32
            )
            
            print(f"✅ 임베딩 완료 (차원: {self.embeddings.shape})")
            
        except ImportError:
            logger.warning("sentence-transformers가 설치되지 않았습니다.")
            logger.warning("BM25 검색만 사용합니다.")
            self.embeddings = None
    
    def _build_bm25_index(self):
        """
        BM25 검색을 위한 역인덱스 구축
        """
        print("\n📑 BM25 인덱스 구축 중...")
        
        for idx, chunk in enumerate(self.chunks):
            content = chunk['content'].lower()
            
            # 단어 추출 (한글, 영어, 숫자)
            words = re.findall(r'[가-힣]+|[a-zA-Z]+|\d+', content)
            
            # 각 단어에 대해 청크 인덱스 저장
            for word in set(words):  # 중복 제거
                if len(word) >= 2:  # 2글자 이상만
                    self.chunk_index[word].append(idx)
            
            # 키워드가 있으면 추가
            if 'keywords' in chunk:
                for keyword in chunk['keywords']:
                    self.chunk_index[keyword.lower()].append(idx)
        
        print(f"✅ BM25 인덱스 구축 완료: {len(self.chunk_index)}개 키워드")
    
    def search(self, query: str, top_k: int = 3, method: str = "hybrid") -> str:
        """
        문서 검색
        
        Args:
            query: 검색 쿼리
            top_k: 반환할 청크 수
            method: 검색 방법 ("similarity", "bm25", "hybrid")
            
        Returns:
            검색된 컨텍스트 문자열
        """
        if not self.chunks:
            return ""
        
        # 검색 방법별 처리
        if method == "similarity" and self.embeddings is not None:
            results = self._similarity_search(query, top_k * 2)
        elif method == "bm25":
            results = self._bm25_search(query, top_k * 2)
        elif method == "hybrid" and self.embeddings is not None:
            results = self._hybrid_search(query, top_k * 2)
        else:
            # 폴백: BM25 사용
            results = self._bm25_search(query, top_k * 2)
        
        # 상위 k개 선택 및 중복 제거
        seen_content = set()
        final_results = []
        
        for chunk_idx, score in results[:top_k*2]:
            chunk = self.chunks[chunk_idx]
            content_hash = hash(chunk['content'][:100])  # 앞부분으로 중복 체크
            
            if content_hash not in seen_content:
                seen_content.add(content_hash)
                final_results.append((chunk, score))
                
                if len(final_results) >= top_k:
                    break
        
        # 컨텍스트 생성
        contexts = []
        for chunk, score in final_results:
            source = chunk.get('source', 'Unknown')
            content = chunk['content']
            
            # 소스 정보 포함
            context = f"[출처: {source}]\n{content}"
            contexts.append(context)
        
        return "\n\n---\n\n".join(contexts)
    
    def _similarity_search(self, query: str, top_k: int) -> List[Tuple[int, float]]:
        """
        임베딩 기반 유사도 검색
        """
        # 쿼리 임베딩 생성
        query_embedding = self.embedding_model.encode(
            [query],
            normalize_embeddings=True
        )
        
        # 코사인 유사도 계산 (정규화된 벡터이므로 내적이 코사인 유사도)
        similarities = np.dot(self.embeddings, query_embedding.T).flatten()
        
        # 상위 k개 인덱스
        top_indices = np.argsort(similarities)[::-1][:top_k]
        
        # (인덱스, 점수) 튜플 리스트 반환
        results = [(int(idx), float(similarities[idx])) for idx in top_indices]
        return results
    
    def _bm25_search(self, query: str, top_k: int) -> List[Tuple[int, float]]:
        """
        BM25 기반 키워드 검색
        """
        query_lower = query.lower()
        query_words = re.findall(r'[가-힣]+|[a-zA-Z]+|\d+', query_lower)
        
        # BM25 파라미터
        k1 = 1.2  # 단어 빈도 포화도
        b = 0.75  # 문서 길이 정규화
        
        # 평균 문서 길이
        avg_len = np.mean([len(chunk['content']) for chunk in self.chunks])
        
        # 각 청크에 대한 점수 계산
        scores = defaultdict(float)
        
        for word in query_words:
            if word in self.chunk_index:
                # IDF 계산
                df = len(self.chunk_index[word])  # 문서 빈도
                idf = np.log((len(self.chunks) - df + 0.5) / (df + 0.5) + 1)
                
                # 각 문서에 대한 BM25 점수
                for chunk_idx in self.chunk_index[word]:
                    chunk = self.chunks[chunk_idx]
                    tf = chunk['content'].lower().count(word)  # 단어 빈도
                    doc_len = len(chunk['content'])
                    
                    # BM25 점수 계산
                    score = idf * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * doc_len / avg_len))
                    scores[chunk_idx] += score
        
        # 점수순 정렬
        sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_scores[:top_k]
    
    def _hybrid_search(self, query: str, top_k: int) -> List[Tuple[int, float]]:
        """
        하이브리드 검색 (유사도 + BM25)
        """
        # 각각의 검색 수행
        similarity_results = self._similarity_search(query, top_k)
        bm25_results = self._bm25_search(query, top_k)
        
        # 점수를 딕셔너리로 변환
        similarity_scores = {idx: score for idx, score in similarity_results}
        bm25_scores = {idx: score for idx, score in bm25_results}
        
        # 모든 고유 인덱스
        all_indices = set(similarity_scores.keys()) | set(bm25_scores.keys())
        
        # 점수 정규화 및 결합
        hybrid_scores = {}
        
        # 최대값으로 정규화
        max_sim = max(similarity_scores.values()) if similarity_scores else 1
        max_bm25 = max(bm25_scores.values()) if bm25_scores else 1
        
        for idx in all_indices:
            # 정규화된 점수 (없으면 0)
            norm_sim = similarity_scores.get(idx, 0) / max_sim
            norm_bm25 = bm25_scores.get(idx, 0) / max_bm25
            
            # 가중 평균 (유사도 0.7, BM25 0.3)
            hybrid_scores[idx] = 0.7 * norm_sim + 0.3 * norm_bm25
        
        # 정렬
        sorted_scores = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_scores[:top_k]
    
    def get_random_chunks(self, n: int = 3) -> str:
        """
        랜덤 청크 가져오기 (데이터 생성시 다양성 확보용)
        """
        if not self.chunks:
            return ""
        
        # 랜덤하게 n개 청크 선택
        selected_chunks = random.sample(self.chunks, min(n, len(self.chunks)))
        
        contexts = []
        for chunk in selected_chunks:
            source = chunk.get('source', 'Unknown')
            content = chunk['content']
            context = f"[출처: {source}]\n{content}"
            contexts.append(context)
        
        return "\n\n---\n\n".join(contexts)
    
    def get_statistics(self) -> Dict:
        """
        통계 정보 반환
        """
        stats = {
            'total_documents': len(self.documents),
            'total_chunks': len(self.chunks),
            'total_keywords': len(self.chunk_index),
            'avg_chunk_size': np.mean([chunk.get('tokens', 0) for chunk in self.chunks]) if self.chunks else 0,
            'has_embeddings': self.embeddings is not None
        }
        
        # 문서 타입별 통계
        if self.chunks:
            doc_types = Counter(chunk.get('doc_type', 'Unknown') for chunk in self.chunks)
            stats['document_types'] = dict(doc_types)
        
        return stats
    
    def _save_cache(self):
        """
        인덱스를 캐시 파일로 저장
        """
        print("\n💾 인덱스 캐시 저장 중...")
        
        cache_data = {
            'chunks': self.chunks,
            'chunk_index': dict(self.chunk_index),
            'embeddings': self.embeddings,
            'timestamp': datetime.now().isoformat()
        }
        
        with open(self.cache_path, 'wb') as f:
            pickle.dump(cache_data, f)
        
        print(f"✅ 캐시 저장 완료: {self.cache_path}")
    
    def _load_cache(self) -> bool:
        """
        캐시 파일에서 인덱스 로드
        """
        try:
            with open(self.cache_path, 'rb') as f:
                cache_data = pickle.load(f)
            
            self.chunks = cache_data['chunks']
            self.chunk_index = defaultdict(list, cache_data['chunk_index'])
            self.embeddings = cache_data['embeddings']
            
            print(f"📅 캐시 생성 시간: {cache_data.get('timestamp', 'Unknown')}")
            print(f"📊 로드된 청크 수: {len(self.chunks)}")
            
            # 임베딩 모델 로드 (필요시)
            if self.embeddings is not None:
                from sentence_transformers import SentenceTransformer
                self.embedding_model = SentenceTransformer("jhgan/ko-sbert-nli")
            
            return True
            
        except Exception as e:
            logger.error(f"캐시 로드 실패: {e}")
            return False


# 검색 시스템 테스트
print("🔄 문서 검색 시스템 클래스 생성 완료")
print("지원 검색 방법: similarity (벡터), bm25 (키워드), hybrid (결합)")

## 7. RAG 시스템 통합 클래스

In [None]:
class RAGSystem:
    """
    RAG 컴포넌트 통합 관리 클래스
    
    주요 기능:
    - 문서 로드 → 청킹 → 인덱싱 → 검색
    - 캐시 관리로 빠른 재실행
    - 통합 인터페이스 제공
    """
    
    def __init__(self):
        """컴포넌트 초기화"""
        self.loader = DocumentLoader()
        self.chunker = DocumentChunker()
        self.retriever = DocumentRetriever()
        self.is_initialized = False
        
    def initialize(self, data_dir: Path, force_rebuild: bool = False):
        """
        RAG 시스템 초기화
        
        Args:
            data_dir: 문서가 있는 디렉토리
            force_rebuild: 캐시 무시하고 재구축
        """
        print("=" * 60)
        print("🚀 RAG 시스템 초기화")
        print("=" * 60)
        
        # 캐시 확인
        cache_exists = (VECTORDB_DIR / "rag_index.pkl").exists()
        
        if cache_exists and not force_rebuild:
            # 캐시가 있으면 로드 시도
            if self.retriever.chunks:  # 이미 로드됨
                print("✅ RAG 시스템이 이미 초기화되어 있습니다.")
                self.is_initialized = True
                return
        
        # 새로 구축이 필요한 경우
        if force_rebuild:
            print("🔄 강제 재구축 모드")
        
        # 1. 문서 로드
        print("\n[1/3] 문서 로드 중...")
        documents = self.loader.load_directory(data_dir)
        
        if not documents:
            print("⚠️ 로드된 문서가 없습니다!")
            print(f"   {data_dir} 디렉토리에 PDF, TXT, Excel 파일을 추가하세요.")
            return
        
        print(f"\n📚 총 {len(documents)}개 문서 로드 완료")
        
        # 2. 인덱스 구축
        print("\n[2/3] 인덱스 구축 중...")
        self.retriever.build_index(documents)
        
        # 3. 통계 출력
        print("\n[3/3] 초기화 완료")
        stats = self.retriever.get_statistics()
        self._print_statistics(stats)
        
        self.is_initialized = True
    
    def search(self, query: str, top_k: int = 3, method: str = "hybrid") -> str:
        """
        관련 컨텍스트 검색
        
        Args:
            query: 검색 쿼리
            top_k: 반환할 청크 수
            method: 검색 방법
            
        Returns:
            검색된 컨텍스트
        """
        if not self.is_initialized:
            logger.warning("RAG 시스템이 초기화되지 않았습니다.")
            return ""
        
        return self.retriever.search(query, top_k, method)
    
    def get_random_context(self, n: int = 2) -> str:
        """
        랜덤 컨텍스트 가져오기
        
        Args:
            n: 가져올 청크 수
            
        Returns:
            랜덤 컨텍스트
        """
        if not self.is_initialized:
            logger.warning("RAG 시스템이 초기화되지 않았습니다.")
            return ""
        
        return self.retriever.get_random_chunks(n)
    
    def _print_statistics(self, stats: Dict):
        """
        통계 정보 출력
        """
        print("\n" + "=" * 60)
        print("📊 RAG 시스템 통계")
        print("=" * 60)
        print(f"문서 수: {stats['total_documents']}개")
        print(f"청크 수: {stats['total_chunks']}개")
        print(f"인덱싱된 키워드: {stats['total_keywords']}개")
        print(f"평균 청크 크기: {stats['avg_chunk_size']:.1f} 토큰")
        print(f"벡터 임베딩: {'활성화' if stats['has_embeddings'] else '비활성화'}")
        
        if 'document_types' in stats:
            print("\n문서 타입별 분포:")
            for doc_type, count in stats['document_types'].items():
                print(f"  - {doc_type}: {count}개")
        
        print("=" * 60)
    
    def rebuild_index(self, data_dir: Path):
        """
        인덱스 재구축
        """
        print("🔄 인덱스 재구축 시작...")
        self.initialize(data_dir, force_rebuild=True)
    
    def test_search(self, test_queries: List[str] = None):
        """
        검색 기능 테스트
        """
        if not self.is_initialized:
            print("⚠️ RAG 시스템을 먼저 초기화하세요.")
            return
        
        # 기본 테스트 쿼리
        if test_queries is None:
            test_queries = [
                "전자금융거래",
                "개인정보보호",
                "암호화 기술",
                "금융보안"
            ]
        
        print("\n" + "=" * 60)
        print("🔍 RAG 검색 테스트")
        print("=" * 60)
        
        for query in test_queries:
            print(f"\n📌 검색어: '{query}'")
            print("-" * 40)
            
            # 하이브리드 검색
            result = self.search(query, top_k=2, method="hybrid")
            
            if result:
                # 결과를 200자로 제한하여 출력
                preview = result[:200] + "..." if len(result) > 200 else result
                print(preview)
            else:
                print("검색 결과 없음")
        
        print("\n" + "=" * 60)


# RAG 시스템 생성
print("✅ RAG 시스템 통합 클래스 생성 완료")
rag_system = RAGSystem()
print("\nRAG 시스템 사용 준비 완료!")
print("다음 명령으로 초기화하세요:")
print("  rag_system.initialize(EXTERNAL_DIR)")

## 8. RAG 시스템 초기화 및 테스트

In [None]:
# RAG 시스템 초기화
# 주의: 외부 문서가 없으면 작동하지 않습니다!

# 외부 데이터 디렉토리 확인
print("📁 외부 데이터 디렉토리 확인...")
print(f"경로: {EXTERNAL_DIR}")

# 디렉토리의 파일 목록 확인
if EXTERNAL_DIR.exists():
    files = list(EXTERNAL_DIR.glob("*"))
    if files:
        print(f"\n발견된 파일 ({len(files)}개):")
        for file in files[:10]:  # 최대 10개만 표시
            print(f"  - {file.name}")
        if len(files) > 10:
            print(f"  ... 외 {len(files)-10}개")
    else:
        print("\n⚠️ 디렉토리가 비어있습니다!")
        print("PDF, TXT, Excel 형식의 금융 문서를 추가하세요.")
else:
    print("\n❌ 디렉토리가 존재하지 않습니다!")
    EXTERNAL_DIR.mkdir(parents=True, exist_ok=True)
    print(f"✅ 디렉토리 생성 완료: {EXTERNAL_DIR}")

# RAG 초기화 실행
print("\n" + "="*60)
choice = input("RAG 시스템을 초기화하시겠습니까? (y/n): ")

if choice.lower() == 'y':
    # 초기화 실행
    rag_system.initialize(EXTERNAL_DIR)
    
    # 초기화 성공시 테스트
    if rag_system.is_initialized:
        print("\n✅ RAG 시스템 초기화 성공!")
        
        # 검색 테스트 실행 여부 확인
        test_choice = input("\n검색 테스트를 실행하시겠습니까? (y/n): ")
        if test_choice.lower() == 'y':
            rag_system.test_search()
else:
    print("\n초기화를 건너뜁니다.")
    print("나중에 다음 명령으로 초기화할 수 있습니다:")
    print("  rag_system.initialize(EXTERNAL_DIR)")

## 9. RAG 검색 예제 및 활용법

In [None]:
# RAG 검색 활용 예제

if rag_system.is_initialized:
    print("=" * 60)
    print("📚 RAG 검색 활용 예제")
    print("=" * 60)
    
    # 1. 특정 주제 검색
    print("\n1️⃣ 특정 주제 검색")
    query = "개인정보보호법"
    result = rag_system.search(query, top_k=2)
    print(f"검색어: '{query}'")
    print(f"결과:\n{result[:300]}...\n")
    
    # 2. 다양한 검색 방법 비교
    print("\n2️⃣ 검색 방법 비교")
    query = "암호화"
    
    # BM25 검색
    bm25_result = rag_system.search(query, top_k=1, method="bm25")
    print(f"BM25 검색 결과 길이: {len(bm25_result)}자")
    
    # 벡터 유사도 검색
    if rag_system.retriever.embeddings is not None:
        sim_result = rag_system.search(query, top_k=1, method="similarity")
        print(f"유사도 검색 결과 길이: {len(sim_result)}자")
    
    # 하이브리드 검색
    hybrid_result = rag_system.search(query, top_k=1, method="hybrid")
    print(f"하이브리드 검색 결과 길이: {len(hybrid_result)}자")
    
    # 3. 랜덤 컨텍스트 가져오기
    print("\n3️⃣ 랜덤 컨텍스트 (데이터 생성용)")
    random_context = rag_system.get_random_context(n=2)
    print(f"랜덤 컨텍스트:\n{random_context[:300]}...\n")
    
    # 4. 복합 쿼리 예제
    print("\n4️⃣ 복합 쿼리 예제")
    complex_query = "전자금융거래 보안 인증"
    result = rag_system.search(complex_query, top_k=3)
    print(f"복합 검색어: '{complex_query}'")
    print(f"검색된 청크 수: {len(result.split('---'))}개")
    
else:
    print("⚠️ RAG 시스템이 초기화되지 않았습니다.")
    print("먼저 위의 셀에서 RAG 시스템을 초기화하세요.")

## 10. RAG 모듈 저장 (추론용)

In [None]:
# 추론에서도 사용할 RAG 모듈을 별도 파일로 저장
# 이렇게 하면 추론 노트북에서도 동일한 RAG 시스템을 사용할 수 있습니다.

rag_module_content = '''
"""
RAG 모듈 (경량화 버전)
추론 시에도 사용할 수 있는 최소한의 RAG 기능만 포함
"""

import pickle
import numpy as np
from pathlib import Path
from typing import List, Dict, Tuple
import re
from collections import defaultdict


class LightweightRAG:
    """추론용 경량 RAG 시스템"""
    
    def __init__(self, index_path: str):
        """사전 구축된 인덱스 로드"""
        self.index_path = Path(index_path)
        self.chunks = []
        self.chunk_index = defaultdict(list)
        self.embeddings = None
        self._load_index()
    
    def _load_index(self):
        """인덱스 로드"""
        if not self.index_path.exists():
            raise FileNotFoundError(f"인덱스 파일이 없습니다: {self.index_path}")
        
        with open(self.index_path, 'rb') as f:
            data = pickle.load(f)
        
        self.chunks = data['chunks']
        self.chunk_index = defaultdict(list, data['chunk_index'])
        self.embeddings = data.get('embeddings')
    
    def search(self, query: str, top_k: int = 3) -> str:
        """BM25 기반 검색 (임베딩 불필요)"""
        query_words = re.findall(r'[가-힣]+|[a-zA-Z]+|\\d+', query.lower())
        
        # BM25 점수 계산
        scores = defaultdict(float)
        for word in query_words:
            if word in self.chunk_index:
                for idx in self.chunk_index[word]:
                    scores[idx] += 1.0
        
        # 상위 k개 선택
        top_indices = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
        
        # 컨텍스트 생성
        contexts = []
        for idx, _ in top_indices:
            chunk = self.chunks[idx]
            context = f"[출처: {chunk.get('source', 'Unknown')}]\\n{chunk['content']}"
            contexts.append(context)
        
        return "\\n\\n---\\n\\n".join(contexts)
'''

# 파일 저장
rag_module_path = PROJECT_ROOT / "rag_module.py"
with open(rag_module_path, 'w', encoding='utf-8') as f:
    f.write(rag_module_content)

print("✅ RAG 모듈 저장 완료!")
print(f"저장 위치: {rag_module_path}")
print("\n이 파일을 추론 노트북에서 import하여 사용할 수 있습니다:")
print("  from rag_module import LightweightRAG")
print("  rag = LightweightRAG('data/vectordb/rag_index.pkl')")
print("  context = rag.search('검색어')")

## 🔥 Stage 2: 데이터 생성 및 품질 관리

### 11. 품질 평가 시스템

FSKU 평가 기준에 맞춘 다차원 품질 평가 시스템을 구현합니다.

In [None]:
class QualityEvaluator:
    """
    FSKU 기준에 맞춘 품질 평가 시스템
    
    평가 차원:
    1. 형식 (Format): 문제 구조, 선택지 형식
    2. 내용 (Content): 금융 관련성, 정확성
    3. 난이도 (Difficulty): 적절한 난이도
    4. 명확성 (Clarity): 모호하지 않은 표현
    """
    
    def __init__(self):
        """평가 시스템 초기화"""
        # 평가 가중치 설정
        self.weights = {
            'format': 0.25,      # 형식 적절성
            'content': 0.35,     # 내용 품질
            'difficulty': 0.20,  # 난이도 적절성
            'clarity': 0.20      # 명확성
        }
        
        # 통계 정보
        self.evaluation_stats = {
            'total_evaluated': 0,
            'passed': 0,
            'failed': 0,
            'avg_score': 0,
            'score_distribution': defaultdict(int)
        }
        
        # 금융 키워드 (기존 품질 평가 모듈에서 가져옴)
        self.finance_keywords = [
            '금융', '은행', '증권', '보험', '예금', '대출', '투자',
            '전자금융', '핀테크', '블록체인', '암호화폐', '디지털자산',
            '신용', '리스크', '규제', '감독', '준법', '개인정보',
            'KYC', 'AML', '자금세탁', '보안', '인증', '본인확인'
        ]
        
        # 모호한 표현들
        self.ambiguous_terms = [
            '대략', '아마', '어느정도', '일부', '몇몇', '약간',
            '가능한', '일반적으로', '보통', '대체로', '종종'
        ]
        
        # FSKU 문제 유형 패턴
        self.question_patterns = {
            '객관식': {
                'markers': ['다음 중', '올바른 것은', '맞는 것은', '틀린 것은', '해당하는 것은'],
                'options': [r'[1-5]\)', r'[가-마]\)', r'[A-E]\)', r'①②③④⑤']
            },
            '주관식': {
                'markers': ['설명하시오', '서술하시오', '무엇인가', '정의하시오', '비교하시오'],
                'requirements': ['이유', '근거', '예시', '특징']
            },
            '계산': {
                'markers': ['계산하시오', '산출하시오', '구하시오', '값은'],
                'units': ['%', '원', '달러', '배', '비율']
            }
        }
    
    def evaluate(self, question: str, answer: str) -> Dict[str, Any]:
        """
        문제-답변 쌍 평가
        
        Args:
            question: 생성된 문제
            answer: 생성된 답변
            
        Returns:
            평가 결과 딕셔너리
        """
        # 각 차원별 평가
        scores = {
            'format': self._evaluate_format(question, answer),
            'content': self._evaluate_content(question, answer),
            'difficulty': self._evaluate_difficulty(question, answer),
            'clarity': self._evaluate_clarity(question, answer)
        }
        
        # 가중 평균 계산
        total_score = sum(
            scores[dim] * self.weights[dim] 
            for dim in scores
        )
        
        # 문제 유형 판별
        question_type = self._identify_question_type(question)
        
        # 세부 분석
        analysis = {
            'question_type': question_type,
            'has_options': self._has_options(question),
            'keyword_count': self._count_keywords(question + answer),
            'length': len(question) + len(answer),
            'ambiguity_count': self._count_ambiguous_terms(question + answer)
        }
        
        # 통계 업데이트
        self._update_stats(total_score)
        
        # 결과 반환
        return {
            'total_score': round(total_score, 1),
            'dimension_scores': scores,
            'passed': total_score >= 70,
            'analysis': analysis,
            'feedback': self._generate_feedback(scores, analysis)
        }
    
    def _evaluate_format(self, question: str, answer: str) -> float:
        """
        형식 평가
        - 문제 구조의 적절성
        - 선택지 형식 (객관식의 경우)
        - 질문 명확성
        """
        score = 50  # 기본 점수
        
        # 질문 마크 확인
        if '?' in question or any(end in question for end in ['는가', '까', '인가']):
            score += 10
        
        # 문제 유형별 형식 체크
        question_type = self._identify_question_type(question)
        
        if question_type == '객관식':
            # 선택지 체크
            if self._has_proper_options(question):
                score += 20
            # 선택지 개수 체크 (4-5개가 이상적)
            option_count = self._count_options(question)
            if 4 <= option_count <= 5:
                score += 10
        elif question_type == '주관식':
            # 명확한 지시사항 체크
            if any(marker in question for marker in self.question_patterns['주관식']['markers']):
                score += 20
        elif question_type == '계산':
            # 단위 명시 체크
            if any(unit in question or unit in answer for unit in self.question_patterns['계산']['units']):
                score += 15
        
        # 구조화 체크 (줄바꿈 사용)
        if '\n' in question and question.count('\n') >= 2:
            score += 10
        
        return min(100, score)
    
    def _evaluate_content(self, question: str, answer: str) -> float:
        """
        내용 평가
        - 금융 관련성
        - 정확성 (답변의 적절성)
        - 전문성
        """
        score = 40  # 기본 점수
        text = question + ' ' + answer
        
        # 금융 키워드 포함도
        keyword_count = self._count_keywords(text)
        if keyword_count >= 3:
            score += 30
        elif keyword_count >= 2:
            score += 20
        elif keyword_count >= 1:
            score += 10
        
        # 전문 용어 사용 (영문 약어 등)
        professional_terms = re.findall(r'[A-Z]{2,}', text)
        if professional_terms:
            score += 10
        
        # 구체적인 예시나 규정 언급
        if any(pattern in text for pattern in ['제\d+조', '제\d+항', '예를 들어', '예시']):
            score += 10
        
        # 답변의 구체성 (주관식)
        if self._identify_question_type(question) == '주관식':
            if len(answer) > 100:  # 충분한 설명
                score += 10
        
        return min(100, score)
    
    def _evaluate_difficulty(self, question: str, answer: str) -> float:
        """
        난이도 평가
        - 너무 쉽거나 어렵지 않은 적절한 수준
        - FSKU 시험 수준에 맞는지
        """
        score = 70  # 기본 점수 (중간 난이도)
        
        # 문제 길이로 복잡도 추정
        question_length = len(question)
        if 100 <= question_length <= 300:
            score += 10
        elif question_length < 50:
            score -= 20  # 너무 단순
        elif question_length > 500:
            score -= 10  # 너무 복잡
        
        # 전문 용어 수로 난이도 추정
        keyword_count = self._count_keywords(question + answer)
        if 2 <= keyword_count <= 4:
            score += 10
        elif keyword_count > 6:
            score -= 10  # 너무 전문적
        
        # 계산 문제의 경우
        if self._identify_question_type(question) == '계산':
            # 숫자의 복잡도 체크
            numbers = re.findall(r'\d+', question)
            if numbers and all(int(n) < 10000 for n in numbers if n.isdigit()):
                score += 10  # 적절한 숫자 범위
        
        return max(0, min(100, score))
    
    def _evaluate_clarity(self, question: str, answer: str) -> float:
        """
        명확성 평가
        - 모호한 표현 없음
        - 이해하기 쉬운 문장
        - 일관된 용어 사용
        """
        score = 100  # 시작 점수
        text = question + ' ' + answer
        
        # 모호한 표현 체크
        ambiguity_count = self._count_ambiguous_terms(text)
        score -= ambiguity_count * 10
        
        # 이중 부정 체크
        if any(pattern in text for pattern in ['않지 않', '없지 않', '못하지 않']):
            score -= 20
        
        # 너무 많은 조건문
        condition_count = text.count('만약') + text.count('경우') + text.count('때')
        if condition_count > 3:
            score -= 15
        
        # 문장 길이 체크 (너무 긴 문장은 이해하기 어려움)
        sentences = text.split('.')
        long_sentences = [s for s in sentences if len(s.strip()) > 150]
        if long_sentences:
            score -= len(long_sentences) * 5
        
        # 일관된 어미 사용 체크
        if question.endswith('시오.') or question.endswith('하라.'):
            score += 5  # 일관된 명령형
        
        return max(0, score)
    
    def _identify_question_type(self, question: str) -> str:
        """문제 유형 판별"""
        # 객관식 체크
        for marker in self.question_patterns['객관식']['markers']:
            if marker in question:
                return '객관식'
        for option_pattern in self.question_patterns['객관식']['options']:
            if re.search(option_pattern, question):
                return '객관식'
        
        # 계산 문제 체크
        for marker in self.question_patterns['계산']['markers']:
            if marker in question:
                return '계산'
        
        # 나머지는 주관식
        return '주관식'
    
    def _has_options(self, question: str) -> bool:
        """선택지 존재 여부 확인"""
        for pattern in self.question_patterns['객관식']['options']:
            if re.search(pattern, question):
                return True
        return False
    
    def _has_proper_options(self, question: str) -> bool:
        """적절한 형식의 선택지 확인"""
        # 일관된 선택지 형식 체크
        option_formats = []
        for pattern in self.question_patterns['객관식']['options']:
            if re.findall(pattern, question):
                option_formats.append(pattern)
        
        # 하나의 일관된 형식만 사용하는지 체크
        return len(option_formats) == 1
    
    def _count_options(self, question: str) -> int:
        """선택지 개수 세기"""
        max_count = 0
        for pattern in self.question_patterns['객관식']['options']:
            matches = re.findall(pattern, question)
            max_count = max(max_count, len(matches))
        return max_count
    
    def _count_keywords(self, text: str) -> int:
        """금융 키워드 개수 세기"""
        count = 0
        text_lower = text.lower()
        for keyword in self.finance_keywords:
            if keyword.lower() in text_lower:
                count += 1
        return count
    
    def _count_ambiguous_terms(self, text: str) -> int:
        """모호한 표현 개수 세기"""
        count = 0
        for term in self.ambiguous_terms:
            count += text.count(term)
        return count
    
    def _generate_feedback(self, scores: Dict, analysis: Dict) -> List[str]:
        """개선 피드백 생성"""
        feedback = []
        
        # 점수가 낮은 차원에 대한 피드백
        for dim, score in scores.items():
            if score < 70:
                if dim == 'format':
                    feedback.append("📝 형식 개선: 명확한 질문 구조와 일관된 선택지 형식을 사용하세요.")
                elif dim == 'content':
                    feedback.append("📚 내용 강화: 더 많은 금융 전문 용어와 구체적인 예시를 포함하세요.")
                elif dim == 'difficulty':
                    feedback.append("🎯 난이도 조정: FSKU 시험 수준에 맞는 적절한 난이도로 조정하세요.")
                elif dim == 'clarity':
                    feedback.append("✨ 명확성 향상: 모호한 표현을 제거하고 간결한 문장을 사용하세요.")
        
        # 추가 분석 기반 피드백
        if analysis['ambiguity_count'] > 2:
            feedback.append("⚠️ 모호한 표현이 많습니다. 구체적이고 명확한 표현으로 수정하세요.")
        
        if analysis['length'] < 100:
            feedback.append("📏 문제가 너무 짧습니다. 더 구체적인 상황 설명을 추가하세요.")
        
        if analysis['keyword_count'] < 2:
            feedback.append("🏦 금융 관련 키워드가 부족합니다. 전문 용어를 더 포함하세요.")
        
        return feedback
    
    def _update_stats(self, score: float):
        """통계 정보 업데이트"""
        self.evaluation_stats['total_evaluated'] += 1
        
        if score >= 70:
            self.evaluation_stats['passed'] += 1
        else:
            self.evaluation_stats['failed'] += 1
        
        # 평균 점수 업데이트
        n = self.evaluation_stats['total_evaluated']
        prev_avg = self.evaluation_stats['avg_score']
        self.evaluation_stats['avg_score'] = (prev_avg * (n-1) + score) / n
        
        # 점수 분포 업데이트
        score_range = int(score // 10) * 10  # 10점 단위
        self.evaluation_stats['score_distribution'][score_range] += 1
    
    def get_statistics(self) -> Dict:
        """평가 통계 반환"""
        return self.evaluation_stats.copy()
    
    def reset_statistics(self):
        """통계 초기화"""
        self.evaluation_stats = {
            'total_evaluated': 0,
            'passed': 0,
            'failed': 0,
            'avg_score': 0,
            'score_distribution': defaultdict(int)
        }


# 품질 평가 시스템 테스트
print("✅ 품질 평가 시스템 생성 완료")
evaluator = QualityEvaluator()

# 테스트 예제
test_question = """
다음 중 전자금융거래법상 금융회사가 준수해야 할 사항으로 틀린 것은?

1) 전자금융거래 시 본인확인 절차를 거쳐야 한다.
2) 고객의 개인정보를 암호화하여 보관해야 한다.
3) 전자금융사고 발생 시 24시간 이내에 신고해야 한다.
4) 고객의 동의 없이 제3자에게 정보를 제공할 수 있다.
5) 정기적으로 보안 취약점 점검을 실시해야 한다.
"""

test_answer = "정답: 4번. 고객의 동의 없이 제3자에게 정보를 제공하는 것은 개인정보보호법 위반입니다."

# 평가 실행
result = evaluator.evaluate(test_question, test_answer)

print("\n📊 평가 결과:")
print(f"총점: {result['total_score']}점")
print(f"통과 여부: {'✅ 통과' if result['passed'] else '❌ 미통과'}")
print(f"\n차원별 점수:")
for dim, score in result['dimension_scores'].items():
    print(f"  - {dim}: {score}점")
print(f"\n분석:")
print(f"  - 문제 유형: {result['analysis']['question_type']}")
print(f"  - 금융 키워드 수: {result['analysis']['keyword_count']}개")
print(f"\n피드백:")
for feedback in result['feedback']:
    print(f"  {feedback}")

### 12. 체이닝 데이터 생성기

Chain-of-Thought를 활용한 고품질 데이터 생성 시스템

In [None]:
class ChainingDataGenerator:
    """
    Chain-of-Thought를 활용한 체이닝 데이터 생성기
    
    생성 프로세스:
    1. 초기 생성 (Initial Generation)
    2. 자가 검증 (Self-Verification)
    3. 개선 생성 (Improvement)
    4. 최종 검증 (Final Check)
    """
    
    def __init__(self, model_name: str = "beomi/llama-2-ko-7b"):
        """
        초기화
        
        Args:
            model_name: 사용할 LLM 모델명
        """
        self.model_name = model_name
        self.model = None
        self.tokenizer = None
        
        # 생성 통계
        self.generation_stats = {
            'total_attempts': 0,
            'successful': 0,
            'failed': 0,
            'improvement_rate': 0,
            'avg_iterations': 0
        }
        
        # 프롬프트 템플릿
        self.prompts = self._load_prompt_templates()
        
        # 품질 평가기
        self.evaluator = QualityEvaluator()
        
        # 캐시 (동일한 컨텍스트에 대한 중복 생성 방지)
        self.generation_cache = {}
    
    def initialize_model(self, use_quantization: bool = True):
        """
        모델 초기화
        
        Args:
            use_quantization: 4bit 양자화 사용 여부
        """
        print(f"🤖 모델 로드 중: {self.model_name}")
        
        try:
            # 토크나이저 로드
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True
            )
            
            # 패딩 토큰 설정
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            
            # 모델 설정
            if use_quantization:
                # 4bit 양자화 설정
                bnb_config = BitsAndBytesConfig(
                    load_in_4bit=True,
                    bnb_4bit_quant_type="nf4",
                    bnb_4bit_compute_dtype=torch.float16,
                    bnb_4bit_use_double_quant=True
                )
                
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_name,
                    quantization_config=bnb_config,
                    device_map="auto",
                    trust_remote_code=True,
                    torch_dtype=torch.float16
                )
            else:
                # 일반 로드
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_name,
                    device_map="auto",
                    trust_remote_code=True,
                    torch_dtype=torch.float16
                )
            
            print("✅ 모델 로드 완료!")
            
            # 메모리 사용량 출력
            if torch.cuda.is_available():
                memory_used = torch.cuda.memory_allocated() / 1024**3
                print(f"💾 GPU 메모리 사용량: {memory_used:.2f}GB")
                
        except Exception as e:
            logger.error(f"모델 로드 실패: {e}")
            raise
    
    def _load_prompt_templates(self) -> Dict[str, str]:
        """프롬프트 템플릿 로드"""
        templates = {
            'initial_generation': """당신은 한국 금융감독원의 FSKU(금융전문지식자격시험) 출제위원입니다.
다음 금융 문서를 참고하여 FSKU 시험에 출제될 수 있는 고품질 문제를 생성하세요.

### 참고 문서:
{context}

### 생성 지침:
1. 문제 유형: {question_type}
2. 난이도: 중상 (FSKU 실제 시험 수준)
3. 금융 전문 용어를 정확히 사용하세요
4. 실무에서 중요한 내용을 다루세요
5. 명확하고 모호하지 않은 표현을 사용하세요

### 생성할 문제:
""",
            
            'self_verification': """다음 생성된 문제를 검토하고 문제점을 찾아주세요.

### 생성된 문제:
{question}

### 생성된 답변:
{answer}

### 검토 기준:
1. 문제가 명확하고 이해하기 쉬운가?
2. 금융 전문 용어가 정확히 사용되었는가?
3. 답변이 정확하고 충분한 설명을 포함하는가?
4. FSKU 시험 수준에 적합한가?
5. 모호하거나 논란의 여지가 있는 부분은 없는가?

### 문제점 분석:
""",
            
            'improvement': """다음 문제와 피드백을 바탕으로 개선된 버전을 생성하세요.

### 원본 문제:
{question}

### 원본 답변:
{answer}

### 피드백:
{feedback}

### 개선 지침:
1. 지적된 문제점을 모두 수정하세요
2. 더 명확하고 전문적인 표현을 사용하세요
3. 필요시 구체적인 예시나 규정을 추가하세요
4. 답변의 설명을 더 충실하게 작성하세요

### 개선된 문제:
""",
            
            'final_check': """최종 생성된 문제를 검토하고 FSKU 시험 출제 가능 여부를 판단하세요.

### 최종 문제:
{question}

### 최종 답변:
{answer}

### 평가 항목:
1. FSKU 출제 가능성 (적합/부적합)
2. 전반적인 품질 (1-10점)
3. 개선이 필요한 부분 (있다면)
4. 최종 의견

### 평가 결과:
"""
        }
        
        return templates
    
    def generate_qa_pair(self, 
                        context: str, 
                        question_type: str = "객관식",
                        max_iterations: int = 3) -> Optional[Dict]:
        """
        체이닝을 통한 QA 쌍 생성
        
        Args:
            context: RAG에서 가져온 컨텍스트
            question_type: 문제 유형 (객관식/주관식/계산)
            max_iterations: 최대 개선 반복 횟수
            
        Returns:
            생성된 QA 쌍 또는 None
        """
        # 캐시 확인
        cache_key = hash(context[:200] + question_type)
        if cache_key in self.generation_cache:
            logger.info("캐시에서 결과 반환")
            return self.generation_cache[cache_key]
        
        self.generation_stats['total_attempts'] += 1
        
        try:
            # 1단계: 초기 생성
            print("🔄 [1/4] 초기 문제 생성 중...")
            initial_qa = self._initial_generation(context, question_type)
            if not initial_qa:
                raise ValueError("초기 생성 실패")
            
            current_question = initial_qa['question']
            current_answer = initial_qa['answer']
            
            # 2단계: 반복적 개선
            iteration_count = 0
            for i in range(max_iterations):
                iteration_count += 1
                print(f"🔄 [{2+i*2}/4] 자가 검증 중...")
                
                # 자가 검증
                verification = self._self_verification(current_question, current_answer)
                
                # 문제점이 없으면 종료
                if "문제없음" in verification or "적합" in verification:
                    break
                
                # 개선
                print(f"🔄 [{3+i*2}/4] 개선 생성 중...")
                improved_qa = self._improvement_generation(
                    current_question, 
                    current_answer, 
                    verification
                )
                
                if improved_qa:
                    current_question = improved_qa['question']
                    current_answer = improved_qa['answer']
            
            # 3단계: 최종 검증
            print("🔄 [4/4] 최종 검증 중...")
            final_check = self._final_check(current_question, current_answer)
            
            # 4단계: 품질 평가
            evaluation = self.evaluator.evaluate(current_question, current_answer)
            
            # 결과 준비
            result = {
                'question': current_question,
                'answer': current_answer,
                'context': context,
                'question_type': question_type,
                'quality_score': evaluation['total_score'],
                'passed': evaluation['passed'],
                'iterations': iteration_count,
                'final_check': final_check,
                'metadata': {
                    'timestamp': datetime.now().isoformat(),
                    'model': self.model_name,
                    'dimension_scores': evaluation['dimension_scores']
                }
            }
            
            # 통과한 경우만 캐시에 저장
            if result['passed']:
                self.generation_cache[cache_key] = result
                self.generation_stats['successful'] += 1
            else:
                self.generation_stats['failed'] += 1
            
            # 통계 업데이트
            self._update_stats(iteration_count, result['passed'])
            
            return result
            
        except Exception as e:
            logger.error(f"생성 중 오류 발생: {e}")
            self.generation_stats['failed'] += 1
            return None
    
    def _generate_text(self, prompt: str, max_length: int = 512) -> str:
        """
        LLM을 사용한 텍스트 생성
        
        Args:
            prompt: 입력 프롬프트
            max_length: 최대 생성 길이
            
        Returns:
            생성된 텍스트
        """
        if not self.model or not self.tokenizer:
            raise ValueError("모델이 초기화되지 않았습니다.")
        
        # 입력 토큰화
        inputs = self.tokenizer(
            prompt, 
            return_tensors="pt",
            truncation=True,
            max_length=2048
        ).to(self.model.device)
        
        # 생성
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_length,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )
        
        # 디코딩
        generated_text = self.tokenizer.decode(
            outputs[0][inputs['input_ids'].shape[1]:], 
            skip_special_tokens=True
        )
        
        return generated_text.strip()
    
    def _initial_generation(self, context: str, question_type: str) -> Optional[Dict]:
        """초기 문제 생성"""
        prompt = self.prompts['initial_generation'].format(
            context=context,
            question_type=question_type
        )
        
        generated = self._generate_text(prompt)
        
        # 문제와 답변 분리
        qa_pair = self._parse_qa(generated)
        return qa_pair
    
    def _self_verification(self, question: str, answer: str) -> str:
        """자가 검증"""
        prompt = self.prompts['self_verification'].format(
            question=question,
            answer=answer
        )
        
        verification = self._generate_text(prompt, max_length=256)
        return verification
    
    def _improvement_generation(self, question: str, answer: str, feedback: str) -> Optional[Dict]:
        """개선된 버전 생성"""
        prompt = self.prompts['improvement'].format(
            question=question,
            answer=answer,
            feedback=feedback
        )
        
        generated = self._generate_text(prompt)
        qa_pair = self._parse_qa(generated)
        return qa_pair
    
    def _final_check(self, question: str, answer: str) -> str:
        """최종 검증"""
        prompt = self.prompts['final_check'].format(
            question=question,
            answer=answer
        )
        
        check_result = self._generate_text(prompt, max_length=256)
        return check_result
    
    def _parse_qa(self, text: str) -> Optional[Dict]:
        """
        생성된 텍스트에서 문제와 답변 추출
        """
        try:
            # 다양한 구분자로 시도
            separators = ['정답:', '답:', '답변:', 'Answer:', 'A:']
            
            question = ""
            answer = ""
            
            for sep in separators:
                if sep in text:
                    parts = text.split(sep, 1)
                    question = parts[0].strip()
                    answer = parts[1].strip() if len(parts) > 1 else ""
                    break
            
            # 구분자가 없는 경우 휴리스틱 사용
            if not answer:
                lines = text.strip().split('\n')
                # 마지막 단락을 답변으로 간주
                if len(lines) > 1:
                    question = '\n'.join(lines[:-1])
                    answer = lines[-1]
                else:
                    question = text
                    answer = "답변 생성 필요"
            
            return {
                'question': question,
                'answer': answer
            }
            
        except Exception as e:
            logger.error(f"QA 파싱 오류: {e}")
            return None
    
    def _update_stats(self, iterations: int, passed: bool):
        """통계 업데이트"""
        n = self.generation_stats['successful'] + self.generation_stats['failed']
        
        # 평균 반복 횟수 업데이트
        prev_avg = self.generation_stats['avg_iterations']
        self.generation_stats['avg_iterations'] = (prev_avg * (n-1) + iterations) / n
        
        # 개선율 계산
        if iterations > 1 and passed:
            improvement_count = self.generation_stats.get('improvements', 0) + 1
            self.generation_stats['improvements'] = improvement_count
            self.generation_stats['improvement_rate'] = improvement_count / self.generation_stats['successful']
    
    def get_statistics(self) -> Dict:
        """생성 통계 반환"""
        return self.generation_stats.copy()
    
    def batch_generate(self, 
                      contexts: List[str], 
                      question_types: List[str] = None,
                      batch_size: int = 4) -> List[Dict]:
        """
        배치 생성
        
        Args:
            contexts: 컨텍스트 리스트
            question_types: 문제 유형 리스트
            batch_size: 배치 크기
            
        Returns:
            생성된 QA 리스트
        """
        if question_types is None:
            # 기본값: 객관식 60%, 주관식 30%, 계산 10%
            question_types = []
            for _ in range(len(contexts)):
                rand = random.random()
                if rand < 0.6:
                    question_types.append("객관식")
                elif rand < 0.9:
                    question_types.append("주관식")
                else:
                    question_types.append("계산")
        
        results = []
        
        # 배치 처리
        for i in tqdm(range(0, len(contexts), batch_size), desc="배치 생성"):
            batch_contexts = contexts[i:i+batch_size]
            batch_types = question_types[i:i+batch_size]
            
            # 각 컨텍스트에 대해 생성
            for ctx, qtype in zip(batch_contexts, batch_types):
                result = self.generate_qa_pair(ctx, qtype)
                if result:
                    results.append(result)
        
        return results


# 체이닝 생성기 테스트
print("✅ 체이닝 데이터 생성기 생성 완료")
print("\n특징:")
print("- 4단계 체이닝: 생성 → 검증 → 개선 → 최종확인")
print("- 자동 품질 평가 및 필터링")
print("- 캐싱으로 중복 생성 방지")
print("- 배치 처리 지원")

# 사용 예시 출력
print("\n사용 예시:")
print("generator = ChainingDataGenerator('beomi/llama-2-ko-7b')")
print("generator.initialize_model(use_quantization=True)")
print("result = generator.generate_qa_pair(context, '객관식')")

### 13. 프롬프트 템플릿 관리

다양한 문제 유형별 프롬프트 템플릿 관리 시스템

In [None]:
class PromptTemplateManager:
    """
    문제 유형별 프롬프트 템플릿 관리
    
    FSKU 시험의 다양한 문제 유형에 맞춘
    전문적인 프롬프트 템플릿 제공
    """
    
    def __init__(self):
        """템플릿 초기화"""
        self.templates = self._initialize_templates()
        self.usage_stats = defaultdict(int)
    
    def _initialize_templates(self) -> Dict[str, Dict[str, str]]:
        """모든 템플릿 초기화"""
        templates = {
            # 객관식 템플릿
            '객관식_일반': {
                'system': "당신은 FSKU 출제위원입니다. 정확하고 명확한 객관식 문제를 생성하세요.",
                'user': """다음 금융 문서를 참고하여 객관식 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- 5개의 선택지 제공 (1번~5번)
- 정답은 1개만
- 오답은 그럴듯하지만 명확히 틀린 내용
- 선택지는 비슷한 길이로 작성

### 문제:""",
                'examples': [
                    {
                        'question': "다음 중 전자금융거래법상 금융회사의 의무사항이 아닌 것은?",
                        'options': [
                            "1) 접근매체의 위조나 변조를 방지하기 위한 조치",
                            "2) 전자금융거래 내용의 확인 및 오류정정 요구 처리",
                            "3) 전자금융거래 기록의 5년간 보존",
                            "4) 이용자의 요청 없이도 정기적인 거래내역 통지",
                            "5) 전자금융사고 발생 시 손해배상"
                        ],
                        'answer': "4번"
                    }
                ]
            },
            
            '객관식_부정형': {
                'system': "부정형 객관식 문제를 생성하세요. '~아닌 것은', '~틀린 것은' 형태로 작성합니다.",
                'user': """다음 내용에서 틀리거나 해당하지 않는 것을 찾는 객관식 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- "다음 중 ~아닌 것은?" 또는 "~틀린 것은?" 형태
- 4개는 맞는 내용, 1개만 틀린 내용
- 혼동하기 쉬운 내용으로 구성

### 문제:""",
                'examples': []
            },
            
            '객관식_복수정답': {
                'system': "복수 정답형 객관식 문제를 생성하세요.",
                'user': """다음 내용에서 모두 맞는 것을 고르는 복수정답형 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- "다음 중 모두 옳은 것은?" 형태
- 보기를 ㄱ, ㄴ, ㄷ, ㄹ로 제시
- 선택지는 조합 형태 (예: 1) ㄱ, ㄴ  2) ㄴ, ㄷ ...)

### 문제:""",
                'examples': []
            },
            
            # 주관식 템플릿
            '주관식_설명형': {
                'system': "개념이나 제도를 설명하는 주관식 문제를 생성하세요.",
                'user': """다음 내용을 바탕으로 설명형 주관식 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- "~에 대해 설명하시오" 형태
- 핵심 개념, 특징, 목적 등을 포함한 답변 요구
- 200자 이상의 상세한 답변이 필요한 문제

### 문제:""",
                'examples': []
            },
            
            '주관식_비교형': {
                'system': "두 개념을 비교하는 주관식 문제를 생성하세요.",
                'user': """다음 내용에서 비교 가능한 개념들을 찾아 비교형 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- "A와 B를 비교하여 설명하시오" 형태
- 공통점과 차이점을 모두 다루도록
- 표나 도식으로 정리 가능한 내용

### 문제:""",
                'examples': []
            },
            
            '주관식_사례형': {
                'system': "실무 사례를 제시하고 해결방안을 묻는 문제를 생성하세요.",
                'user': """다음 내용을 바탕으로 실무 사례형 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- 구체적인 상황 제시
- "이 경우 어떻게 처리해야 하는가?" 형태
- 법적 근거와 실무적 해결방안 요구

### 문제:""",
                'examples': []
            },
            
            # 계산 문제 템플릿
            '계산_금리': {
                'system': "금리 계산 문제를 생성하세요.",
                'user': """다음 내용을 참고하여 금리 계산 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- 실제 금융상품의 금리 계산
- 단리/복리, 연이율/월이율 변환 포함
- 계산 과정을 단계별로 보여주는 답안

### 문제:""",
                'examples': []
            },
            
            '계산_위험관리': {
                'system': "위험관리 지표 계산 문제를 생성하세요.",
                'user': """다음 내용을 바탕으로 위험관리 관련 계산 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- VaR, 자기자본비율 등 위험지표 계산
- 공식과 계산과정 명시
- 결과의 의미 해석 포함

### 문제:""",
                'examples': []
            },
            
            # 특수 유형
            '법규_조문': {
                'system': "특정 법규 조문의 내용을 묻는 문제를 생성하세요.",
                'user': """다음 법규 내용을 바탕으로 조문 관련 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- 특정 조항의 내용 확인
- 빈칸 채우기 또는 내용 확인
- 정확한 법률 용어 사용

### 문제:""",
                'examples': []
            },
            
            '최신동향': {
                'system': "최신 금융 동향이나 제도 변경사항을 묻는 문제를 생성하세요.",
                'user': """다음 최신 동향을 바탕으로 문제를 생성하세요.

### 참고 문서:
{context}

### 요구사항:
- 최근 도입된 제도나 정책
- 변경 전후 비교
- 시행 시기와 주요 내용

### 문제:""",
                'examples': []
            }
        }
        
        return templates
    
    def get_template(self, template_type: str) -> Dict[str, Any]:
        """
        특정 템플릿 가져오기
        
        Args:
            template_type: 템플릿 유형
            
        Returns:
            템플릿 딕셔너리
        """
        if template_type not in self.templates:
            logger.warning(f"템플릿 '{template_type}'이 존재하지 않습니다. 기본 템플릿 사용.")
            template_type = '객관식_일반'
        
        self.usage_stats[template_type] += 1
        return self.templates[template_type]
    
    def get_random_template(self, category: str = None) -> Tuple[str, Dict[str, Any]]:
        """
        랜덤 템플릿 선택
        
        Args:
            category: 카테고리 (객관식/주관식/계산 등)
            
        Returns:
            (템플릿명, 템플릿 내용)
        """
        if category:
            # 특정 카테고리에서 선택
            filtered = [k for k in self.templates.keys() if k.startswith(category)]
            if filtered:
                template_name = random.choice(filtered)
            else:
                template_name = random.choice(list(self.templates.keys()))
        else:
            # 전체에서 선택
            template_name = random.choice(list(self.templates.keys()))
        
        return template_name, self.get_template(template_name)
    
    def format_prompt(self, template_type: str, context: str, **kwargs) -> str:
        """
        컨텍스트로 프롬프트 포맷팅
        
        Args:
            template_type: 템플릿 유형
            context: RAG에서 가져온 컨텍스트
            **kwargs: 추가 변수
            
        Returns:
            포맷된 프롬프트
        """
        template = self.get_template(template_type)
        
        # system 프롬프트와 user 프롬프트 결합
        system_prompt = template['system']
        user_prompt = template['user'].format(context=context, **kwargs)
        
        # 예시가 있으면 추가
        if template.get('examples'):
            examples_text = "\n### 예시:\n"
            for ex in template['examples'][:2]:  # 최대 2개 예시
                examples_text += f"문제: {ex['question']}\n"
                if 'options' in ex:
                    examples_text += "\n".join(ex['options']) + "\n"
                examples_text += f"답: {ex['answer']}\n\n"
            
            full_prompt = f"{system_prompt}\n\n{examples_text}{user_prompt}"
        else:
            full_prompt = f"{system_prompt}\n\n{user_prompt}"
        
        return full_prompt
    
    def get_usage_statistics(self) -> Dict[str, int]:
        """
        템플릿 사용 통계 반환
        """
        return dict(self.usage_stats)
    
    def recommend_template(self, context: str) -> str:
        """
        컨텍스트 분석 후 적절한 템플릿 추천
        
        Args:
            context: 문서 컨텍스트
            
        Returns:
            추천 템플릿명
        """
        context_lower = context.lower()
        
        # 키워드 기반 추천
        if any(word in context_lower for word in ['계산', '산출', '공식', '%', '이자']):
            return random.choice(['계산_금리', '계산_위험관리'])
        
        elif any(word in context_lower for word in ['제\d+조', '법', '규정', '시행령']):
            return '법규_조문'
        
        elif any(word in context_lower for word in ['최근', '개정', '도입', '변경']):
            return '최신동향'
        
        elif any(word in context_lower for word in ['비교', '차이', '공통점']):
            return '주관식_비교형'
        
        elif any(word in context_lower for word in ['사례', '경우', '상황']):
            return '주관식_사례형'
        
        else:
            # 기본값: 객관식 60%, 주관식 30%, 기타 10%
            rand = random.random()
            if rand < 0.6:
                return random.choice(['객관식_일반', '객관식_부정형', '객관식_복수정답'])
            elif rand < 0.9:
                return random.choice(['주관식_설명형', '주관식_비교형', '주관식_사례형'])
            else:
                return random.choice(['계산_금리', '계산_위험관리'])
    
    def add_custom_template(self, name: str, template: Dict[str, Any]):
        """
        사용자 정의 템플릿 추가
        
        Args:
            name: 템플릿 이름
            template: 템플릿 내용
        """
        if name in self.templates:
            logger.warning(f"템플릿 '{name}'이 이미 존재합니다. 덮어씁니다.")
        
        # 필수 키 확인
        required_keys = ['system', 'user']
        for key in required_keys:
            if key not in template:
                raise ValueError(f"템플릿에 필수 키 '{key}'가 없습니다.")
        
        self.templates[name] = template
        logger.info(f"템플릿 '{name}' 추가됨")
    
    def export_templates(self, file_path: str):
        """
        템플릿을 파일로 내보내기
        
        Args:
            file_path: 저장할 파일 경로
        """
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.templates, f, ensure_ascii=False, indent=2)
        
        logger.info(f"템플릿이 {file_path}에 저장됨")
    
    def import_templates(self, file_path: str):
        """
        파일에서 템플릿 가져오기
        
        Args:
            file_path: 불러올 파일 경로
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            imported = json.load(f)
        
        self.templates.update(imported)
        logger.info(f"{len(imported)}개 템플릿 가져옴")


# 프롬프트 매니저 테스트
print("✅ 프롬프트 템플릿 매니저 생성 완료")
prompt_manager = PromptTemplateManager()

print(f"\n📝 사용 가능한 템플릿 ({len(prompt_manager.templates)}개):")
for i, (name, _) in enumerate(prompt_manager.templates.items()):
    print(f"  {i+1}. {name}")

# 템플릿 사용 예시
print("\n📌 템플릿 사용 예시:")
test_context = "전자금융거래법 제21조에 따르면 금융회사는..."
template_name = prompt_manager.recommend_template(test_context)
print(f"추천 템플릿: {template_name}")

formatted_prompt = prompt_manager.format_prompt(template_name, test_context)
print(f"\n포맷된 프롬프트 (첫 200자):\n{formatted_prompt[:200]}...")

## 🎉 Stage 2 완료!

### ✅ 완료된 작업
1. **품질 평가 시스템** - 다차원 평가로 고품질 데이터만 선별
2. **체이닝 데이터 생성기** - 4단계 개선 프로세스로 품질 향상
3. **프롬프트 템플릿 관리** - 10가지 이상의 전문 템플릿 제공

### 📌 다음 단계 (Stage 3)
- **모니터링 대시보드 구현**
- **메인 실행 함수**
- **결과 분석 및 저장**

Stage 3을 계속 진행하시려면 다음 셀들을 실행하세요.

## 🚀 Stage 3: 실행 및 유틸리티

### 14. 실시간 모니터링 대시보드

In [None]:
class GenerationMonitor:
    """
    데이터 생성 과정 실시간 모니터링
    
    기능:
    - 진행 상황 시각화
    - 품질 분포 차트
    - 실시간 통계
    - 예상 소요 시간
    """
    
    def __init__(self):
        """모니터 초기화"""
        self.start_time = None
        self.stats = {
            'total_target': 0,
            'total_generated': 0,
            'passed': 0,
            'failed': 0,
            'current_rate': 0,
            'quality_scores': [],
            'generation_times': [],
            'question_types': defaultdict(int)
        }
        
        # 시각화 설정
        plt.style.use('seaborn-v0_8-darkgrid')
        self.fig = None
        self.axes = None
    
    def start_monitoring(self, target_count: int):
        """
        모니터링 시작
        
        Args:
            target_count: 목표 생성 개수
        """
        self.start_time = time.time()
        self.stats['total_target'] = target_count
        
        # 대시보드 초기화
        self._initialize_dashboard()
        
        print("=" * 60)
        print("📊 데이터 생성 모니터링 시작")
        print("=" * 60)
        print(f"목표: {target_count}개 문제 생성")
        print(f"시작 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)
    
    def _initialize_dashboard(self):
        """대시보드 초기화"""
        # 2x2 서브플롯 생성
        self.fig, self.axes = plt.subplots(2, 2, figsize=(15, 10))
        self.fig.suptitle('FSKU 데이터 생성 모니터링 대시보드', fontsize=16)
        
        # 레이아웃 조정
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    
    def update(self, result: Dict):
        """
        생성 결과로 통계 업데이트
        
        Args:
            result: 생성된 QA 결과
        """
        # 기본 통계 업데이트
        self.stats['total_generated'] += 1
        
        if result.get('passed', False):
            self.stats['passed'] += 1
        else:
            self.stats['failed'] += 1
        
        # 품질 점수 추가
        if 'quality_score' in result:
            self.stats['quality_scores'].append(result['quality_score'])
        
        # 문제 유형 카운트
        if 'question_type' in result:
            self.stats['question_types'][result['question_type']] += 1
        
        # 생성 시간 기록
        if hasattr(self, '_last_update_time'):
            generation_time = time.time() - self._last_update_time
            self.stats['generation_times'].append(generation_time)
        self._last_update_time = time.time()
        
        # 성공률 계산
        if self.stats['total_generated'] > 0:
            self.stats['current_rate'] = self.stats['passed'] / self.stats['total_generated']
        
        # 주기적으로 대시보드 업데이트 (10개마다)
        if self.stats['total_generated'] % 10 == 0:
            self._update_dashboard()
    
    def _update_dashboard(self):
        """대시보드 업데이트"""
        if not self.fig or not self.axes:
            return
        
        # 각 서브플롯 초기화
        for ax in self.axes.flatten():
            ax.clear()
        
        # 1. 진행 상황 (왼쪽 상단)
        ax1 = self.axes[0, 0]
        self._plot_progress(ax1)
        
        # 2. 품질 점수 분포 (오른쪽 상단)
        ax2 = self.axes[0, 1]
        self._plot_quality_distribution(ax2)
        
        # 3. 문제 유형 분포 (왼쪽 하단)
        ax3 = self.axes[1, 0]
        self._plot_question_types(ax3)
        
        # 4. 시간당 생성량 (오른쪽 하단)
        ax4 = self.axes[1, 1]
        self._plot_generation_rate(ax4)
        
        # 화면 갱신
        plt.draw()
        plt.pause(0.01)
    
    def _plot_progress(self, ax):
        """진행 상황 플롯"""
        # 데이터 준비
        categories = ['생성됨', '통과', '실패']
        values = [
            self.stats['total_generated'],
            self.stats['passed'],
            self.stats['failed']
        ]
        
        # 막대 그래프
        bars = ax.bar(categories, values, color=['blue', 'green', 'red'])
        
        # 값 표시
        for bar, value in zip(bars, values):
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height,
                   f'{value}', ha='center', va='bottom')
        
        # 목표선 표시
        ax.axhline(y=self.stats['total_target'], color='orange', 
                  linestyle='--', label=f"목표: {self.stats['total_target']}")
        
        ax.set_title('생성 진행 상황')
        ax.set_ylabel('개수')
        ax.legend()
        
        # 진행률 표시
        progress = self.stats['total_generated'] / self.stats['total_target'] * 100
        ax.text(0.5, 0.95, f'진행률: {progress:.1f}%', 
               transform=ax.transAxes, ha='center', fontsize=12, 
               bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
    
    def _plot_quality_distribution(self, ax):
        """품질 점수 분포"""
        if not self.stats['quality_scores']:
            ax.text(0.5, 0.5, '데이터 없음', transform=ax.transAxes, 
                   ha='center', va='center')
            ax.set_title('품질 점수 분포')
            return
        
        # 히스토그램
        ax.hist(self.stats['quality_scores'], bins=20, color='skyblue', 
               edgecolor='black', alpha=0.7)
        
        # 평균선
        mean_score = np.mean(self.stats['quality_scores'])
        ax.axvline(x=mean_score, color='red', linestyle='--', 
                  label=f'평균: {mean_score:.1f}')
        
        # 합격선 (70점)
        ax.axvline(x=70, color='green', linestyle='--', 
                  label='합격선: 70')
        
        ax.set_title('품질 점수 분포')
        ax.set_xlabel('점수')
        ax.set_ylabel('빈도')
        ax.legend()
    
    def _plot_question_types(self, ax):
        """문제 유형 분포"""
        if not self.stats['question_types']:
            ax.text(0.5, 0.5, '데이터 없음', transform=ax.transAxes, 
                   ha='center', va='center')
            ax.set_title('문제 유형 분포')
            return
        
        # 파이 차트
        types = list(self.stats['question_types'].keys())
        counts = list(self.stats['question_types'].values())
        
        colors = plt.cm.Set3(range(len(types)))
        wedges, texts, autotexts = ax.pie(counts, labels=types, colors=colors,
                                          autopct='%1.1f%%', startangle=90)
        
        ax.set_title('문제 유형 분포')
    
    def _plot_generation_rate(self, ax):
        """생성 속도"""
        if not self.stats['generation_times'] or not self.start_time:
            ax.text(0.5, 0.5, '데이터 없음', transform=ax.transAxes, 
                   ha='center', va='center')
            ax.set_title('생성 속도')
            return
        
        # 시간당 생성량 계산
        elapsed_time = time.time() - self.start_time
        if elapsed_time > 0:
            rate_per_hour = (self.stats['total_generated'] / elapsed_time) * 3600
            rate_per_minute = (self.stats['total_generated'] / elapsed_time) * 60
        else:
            rate_per_hour = 0
            rate_per_minute = 0
        
        # 예상 완료 시간
        if rate_per_minute > 0:
            remaining = self.stats['total_target'] - self.stats['total_generated']
            eta_minutes = remaining / rate_per_minute
            eta_str = f"{int(eta_minutes)}분 {int((eta_minutes % 1) * 60)}초"
        else:
            eta_str = "계산 중..."
        
        # 정보 표시
        info_text = f"""현재 속도:
        
{rate_per_hour:.0f} 개/시간
{rate_per_minute:.1f} 개/분

평균 생성 시간: {np.mean(self.stats['generation_times']):.1f}초

예상 완료 시간: {eta_str}
        """
        
        ax.text(0.5, 0.5, info_text, transform=ax.transAxes, 
               ha='center', va='center', fontsize=12,
               bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.5))
        
        ax.set_title('생성 속도 및 예상 시간')
        ax.axis('off')
    
    def print_summary(self):
        """최종 요약 출력"""
        if not self.start_time:
            return
        
        total_time = time.time() - self.start_time
        
        print("\n" + "=" * 60)
        print("📊 데이터 생성 완료 요약")
        print("=" * 60)
        print(f"총 소요 시간: {int(total_time//60)}분 {int(total_time%60)}초")
        print(f"총 생성: {self.stats['total_generated']}개")
        print(f"통과: {self.stats['passed']}개 ({self.stats['current_rate']*100:.1f}%)")
        print(f"실패: {self.stats['failed']}개")
        
        if self.stats['quality_scores']:
            print(f"\n품질 점수:")
            print(f"  - 평균: {np.mean(self.stats['quality_scores']):.1f}")
            print(f"  - 최고: {max(self.stats['quality_scores']):.1f}")
            print(f"  - 최저: {min(self.stats['quality_scores']):.1f}")
        
        print(f"\n문제 유형별 분포:")
        for qtype, count in self.stats['question_types'].items():
            percentage = count / self.stats['total_generated'] * 100
            print(f"  - {qtype}: {count}개 ({percentage:.1f}%)")
        
        print("=" * 60)
    
    def save_report(self, file_path: str):
        """
        상세 보고서 저장
        
        Args:
            file_path: 저장할 파일 경로
        """
        report = {
            'summary': {
                'total_generated': self.stats['total_generated'],
                'passed': self.stats['passed'],
                'failed': self.stats['failed'],
                'success_rate': self.stats['current_rate'],
                'total_time_seconds': time.time() - self.start_time if self.start_time else 0
            },
            'quality': {
                'scores': self.stats['quality_scores'],
                'mean': np.mean(self.stats['quality_scores']) if self.stats['quality_scores'] else 0,
                'std': np.std(self.stats['quality_scores']) if self.stats['quality_scores'] else 0
            },
            'distribution': dict(self.stats['question_types']),
            'performance': {
                'generation_times': self.stats['generation_times'],
                'avg_time_per_item': np.mean(self.stats['generation_times']) if self.stats['generation_times'] else 0
            },
            'timestamp': datetime.now().isoformat()
        }
        
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        print(f"✅ 보고서 저장됨: {file_path}")
    
    def close(self):
        """모니터링 종료"""
        if self.fig:
            plt.close(self.fig)


# 모니터링 시스템 테스트
print("✅ 실시간 모니터링 시스템 생성 완료")
print("\n주요 기능:")
print("- 실시간 진행 상황 추적")
print("- 품질 점수 분포 시각화")
print("- 문제 유형별 통계")
print("- 예상 완료 시간 계산")
print("- 상세 보고서 생성")

### 15. 데이터 저장 및 관리

In [None]:
class DataManager:
    """
    생성된 데이터 저장 및 관리
    
    기능:
    - JSONL 형식으로 저장
    - 메타데이터 관리
    - 중복 제거
    - 데이터 검증
    """
    
    def __init__(self, output_dir: Path):
        """
        초기화
        
        Args:
            output_dir: 출력 디렉토리
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        # 파일 경로 설정
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.data_file = self.output_dir / f"fsku_data_{timestamp}.jsonl"
        self.metadata_file = self.output_dir / f"fsku_metadata_{timestamp}.json"
        
        # 데이터 버퍼
        self.data_buffer = []
        self.buffer_size = 100  # 100개씩 배치 저장
        
        # 중복 체크용
        self.question_hashes = set()
        
        # 통계
        self.save_stats = {
            'total_saved': 0,
            'duplicates_removed': 0,
            'batches_written': 0
        }
    
    def add_data(self, qa_data: Dict) -> bool:
        """
        데이터 추가
        
        Args:
            qa_data: 생성된 QA 데이터
            
        Returns:
            저장 성공 여부
        """
        # 필수 필드 확인
        required_fields = ['question', 'answer', 'quality_score', 'passed']
        if not all(field in qa_data for field in required_fields):
            logger.warning("필수 필드가 누락된 데이터")
            return False
        
        # 품질 체크
        if not qa_data.get('passed', False):
            logger.debug("품질 기준 미달 데이터 스킵")
            return False
        
        # 중복 체크
        question_hash = hash(qa_data['question'])
        if question_hash in self.question_hashes:
            self.save_stats['duplicates_removed'] += 1
            logger.debug("중복 문제 발견, 스킵")
            return False
        
        # 저장용 포맷으로 변환
        save_data = self._format_for_save(qa_data)
        
        # 버퍼에 추가
        self.data_buffer.append(save_data)
        self.question_hashes.add(question_hash)
        
        # 버퍼가 가득 차면 저장
        if len(self.data_buffer) >= self.buffer_size:
            self._flush_buffer()
        
        self.save_stats['total_saved'] += 1
        return True
    
    def _format_for_save(self, qa_data: Dict) -> Dict:
        """
        저장용 포맷으로 변환
        
        학습에 사용할 수 있는 표준 형식으로 변환
        """
        # 기본 필드
        formatted = {
            'instruction': qa_data['question'],
            'input': "",  # RAG 컨텍스트는 별도 관리
            'output': qa_data['answer'],
            'quality_score': qa_data['quality_score'],
            'question_type': qa_data.get('question_type', 'unknown')
        }
        
        # 메타데이터 추가
        if 'metadata' in qa_data:
            formatted['metadata'] = qa_data['metadata']
        
        # 생성 시간 추가
        formatted['created_at'] = datetime.now().isoformat()
        
        return formatted
    
    def _flush_buffer(self):
        """버퍼 내용을 파일에 저장"""
        if not self.data_buffer:
            return
        
        # JSONL 형식으로 저장 (한 줄에 하나씩)
        with open(self.data_file, 'a', encoding='utf-8') as f:
            for data in self.data_buffer:
                json_line = json.dumps(data, ensure_ascii=False)
                f.write(json_line + '\n')
        
        self.save_stats['batches_written'] += 1
        logger.info(f"배치 {self.save_stats['batches_written']} 저장 완료 ({len(self.data_buffer)}개)")
        
        # 버퍼 초기화
        self.data_buffer.clear()
    
    def save_metadata(self, generation_stats: Dict = None):
        """
        메타데이터 저장
        
        Args:
            generation_stats: 생성 통계 정보
        """
        metadata = {
            'file_info': {
                'data_file': str(self.data_file),
                'created_at': datetime.now().isoformat(),
                'format': 'jsonl',
                'encoding': 'utf-8'
            },
            'statistics': {
                'total_saved': self.save_stats['total_saved'],
                'duplicates_removed': self.save_stats['duplicates_removed'],
                'batches_written': self.save_stats['batches_written']
            },
            'data_info': {
                'fields': ['instruction', 'input', 'output', 'quality_score', 'question_type'],
                'quality_threshold': 70
            }
        }
        
        # 생성 통계 추가
        if generation_stats:
            metadata['generation_stats'] = generation_stats
        
        # 메타데이터 저장
        with open(self.metadata_file, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
        
        logger.info(f"메타데이터 저장: {self.metadata_file}")
    
    def finalize(self):
        """최종 저장 및 정리"""
        # 남은 버퍼 저장
        if self.data_buffer:
            self._flush_buffer()
        
        print("\n" + "=" * 60)
        print("💾 데이터 저장 완료")
        print("=" * 60)
        print(f"저장 위치: {self.data_file}")
        print(f"총 저장된 데이터: {self.save_stats['total_saved']}개")
        print(f"제거된 중복: {self.save_stats['duplicates_removed']}개")
        print(f"배치 수: {self.save_stats['batches_written']}개")
        print("=" * 60)
    
    def validate_data(self) -> Dict:
        """
        저장된 데이터 검증
        
        Returns:
            검증 결과
        """
        if not self.data_file.exists():
            return {'valid': False, 'error': '데이터 파일이 없습니다'}
        
        validation_results = {
            'valid': True,
            'total_lines': 0,
            'valid_lines': 0,
            'invalid_lines': [],
            'quality_distribution': defaultdict(int),
            'type_distribution': defaultdict(int)
        }
        
        with open(self.data_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                validation_results['total_lines'] += 1
                
                try:
                    # JSON 파싱
                    data = json.loads(line.strip())
                    
                    # 필수 필드 확인
                    required = ['instruction', 'output', 'quality_score']
                    if all(field in data for field in required):
                        validation_results['valid_lines'] += 1
                        
                        # 통계 수집
                        score_range = int(data['quality_score'] // 10) * 10
                        validation_results['quality_distribution'][score_range] += 1
                        validation_results['type_distribution'][data.get('question_type', 'unknown')] += 1
                    else:
                        validation_results['invalid_lines'].append(line_num)
                        
                except json.JSONDecodeError:
                    validation_results['invalid_lines'].append(line_num)
                    validation_results['valid'] = False
        
        # 검증 결과 출력
        print("\n📋 데이터 검증 결과:")
        print(f"총 라인 수: {validation_results['total_lines']}")
        print(f"유효한 라인: {validation_results['valid_lines']}")
        print(f"무효한 라인: {len(validation_results['invalid_lines'])}")
        
        if validation_results['invalid_lines']:
            print(f"무효한 라인 번호: {validation_results['invalid_lines'][:10]}...")
        
        return validation_results
    
    def export_for_training(self, output_file: str = None):
        """
        학습용 형식으로 내보내기
        
        Args:
            output_file: 출력 파일 경로
        """
        if output_file is None:
            output_file = self.output_dir / "train_data.jsonl"
        
        # 데이터 로드 및 변환
        training_data = []
        
        with open(self.data_file, 'r', encoding='utf-8') as f:
            for line in f:
                data = json.loads(line.strip())
                
                # 학습용 형식으로 변환
                training_format = {
                    'text': f"### 질문: {data['instruction']}\n\n### 답변: {data['output']}"
                }
                
                training_data.append(training_format)
        
        # 셔플
        random.shuffle(training_data)
        
        # 저장
        with open(output_file, 'w', encoding='utf-8') as f:
            for item in training_data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
        
        print(f"✅ 학습용 데이터 내보내기 완료: {output_file}")
        print(f"   총 {len(training_data)}개 샘플")


# 데이터 매니저 테스트
print("✅ 데이터 관리 시스템 생성 완료")
print("\n주요 기능:")
print("- JSONL 형식 저장 (학습 호환)")
print("- 자동 중복 제거")
print("- 배치 저장으로 효율성 향상")
print("- 메타데이터 관리")
print("- 데이터 검증 기능")

### 16. 메인 실행 함수

In [None]:
class FSKUDataAugmentation:
    """
    FSKU 데이터 증강 통합 실행 클래스
    
    모든 컴포넌트를 통합하여 자동 데이터 생성
    """
    
    def __init__(self, 
                 model_name: str = "beomi/llama-2-ko-7b",
                 external_dir: Path = None,
                 output_dir: Path = None):
        """
        초기화
        
        Args:
            model_name: 사용할 LLM 모델
            external_dir: 외부 문서 디렉토리
            output_dir: 출력 디렉토리
        """
        self.model_name = model_name
        self.external_dir = external_dir or EXTERNAL_DIR
        self.output_dir = output_dir or OUTPUT_DIR
        
        # 컴포넌트 초기화
        self.rag_system = None
        self.generator = None
        self.prompt_manager = None
        self.monitor = None
        self.data_manager = None
        
        # 설정
        self.config = {
            'use_quantization': True,
            'batch_size': 4,
            'target_count': 1000,
            'max_iterations': 3,
            'quality_threshold': 70
        }
    
    def initialize(self):
        """시스템 초기화"""
        print("=" * 60)
        print("🚀 FSKU 데이터 증강 시스템 초기화")
        print("=" * 60)
        
        # 1. RAG 시스템 초기화
        print("\n[1/5] RAG 시스템 초기화...")
        self.rag_system = RAGSystem()
        self.rag_system.initialize(self.external_dir)
        
        if not self.rag_system.is_initialized:
            raise ValueError("RAG 시스템 초기화 실패. 외부 문서를 확인하세요.")
        
        # 2. 체이닝 생성기 초기화
        print("\n[2/5] 데이터 생성기 초기화...")
        self.generator = ChainingDataGenerator(self.model_name)
        
        # 3. 프롬프트 매니저 초기화
        print("\n[3/5] 프롬프트 매니저 초기화...")
        self.prompt_manager = PromptTemplateManager()
        
        # 4. 모니터링 시스템 초기화
        print("\n[4/5] 모니터링 시스템 초기화...")
        self.monitor = GenerationMonitor()
        
        # 5. 데이터 매니저 초기화
        print("\n[5/5] 데이터 매니저 초기화...")
        self.data_manager = DataManager(self.output_dir)
        
        print("\n✅ 모든 시스템 초기화 완료!")
    
    def run(self, 
            target_count: int = None,
            load_model: bool = True,
            test_mode: bool = False):
        """
        데이터 증강 실행
        
        Args:
            target_count: 생성할 문제 수
            load_model: LLM 모델 로드 여부
            test_mode: 테스트 모드 (소량 생성)
        """
        if target_count:
            self.config['target_count'] = target_count
        
        if test_mode:
            self.config['target_count'] = 10
            self.config['batch_size'] = 2
            print("⚠️ 테스트 모드: 10개만 생성합니다.")
        
        try:
            # 모델 로드
            if load_model:
                print("\n🤖 LLM 모델 로드 중...")
                self.generator.initialize_model(
                    use_quantization=self.config['use_quantization']
                )
            
            # 모니터링 시작
            self.monitor.start_monitoring(self.config['target_count'])
            
            # 데이터 생성 루프
            self._generation_loop()
            
            # 완료 처리
            self._finalize()
            
        except KeyboardInterrupt:
            print("\n\n⚠️ 사용자에 의해 중단됨")
            self._finalize()
        except Exception as e:
            logger.error(f"실행 중 오류: {e}")
            raise
        finally:
            if self.monitor:
                self.monitor.close()
    
    def _generation_loop(self):
        """데이터 생성 메인 루프"""
        print("\n" + "=" * 60)
        print("🔄 데이터 생성 시작")
        print("=" * 60)
        
        generated_count = 0
        attempts = 0
        
        # 진행률 표시
        pbar = tqdm(total=self.config['target_count'], desc="생성 진행")
        
        while generated_count < self.config['target_count']:
            attempts += 1
            
            try:
                # 1. RAG에서 컨텍스트 가져오기
                if random.random() < 0.7:
                    # 70% 확률로 관련 검색
                    query = self._generate_search_query()
                    context = self.rag_system.search(query, top_k=3)
                else:
                    # 30% 확률로 랜덤
                    context = self.rag_system.get_random_context(n=2)
                
                if not context:
                    logger.warning("컨텍스트 가져오기 실패")
                    continue
                
                # 2. 프롬프트 템플릿 선택
                template_name = self.prompt_manager.recommend_template(context)
                
                # 3. 문제 생성
                result = self.generator.generate_qa_pair(
                    context=context,
                    question_type=self._get_question_type_from_template(template_name),
                    max_iterations=self.config['max_iterations']
                )
                
                if result and result.get('passed', False):
                    # 4. 데이터 저장
                    if self.data_manager.add_data(result):
                        generated_count += 1
                        pbar.update(1)
                    
                    # 5. 모니터링 업데이트
                    self.monitor.update(result)
                    
                    # 6. 진행 상황 출력 (50개마다)
                    if generated_count % 50 == 0:
                        self._print_progress(generated_count, attempts)
                
            except Exception as e:
                logger.error(f"생성 중 오류: {e}")
                continue
            
            # 너무 많은 시도 방지
            if attempts > self.config['target_count'] * 3:
                logger.warning("너무 많은 시도. 품질 기준을 확인하세요.")
                break
        
        pbar.close()
    
    def _generate_search_query(self) -> str:
        """검색 쿼리 생성"""
        # 금융 관련 주요 토픽
        topics = [
            "전자금융거래", "개인정보보호", "금융보안", "자금세탁방지",
            "신용정보", "금융상품", "리스크관리", "내부통제",
            "금융규제", "핀테크", "디지털금융", "금융소비자보호",
            "KYC", "AML", "자산운용", "증권거래"
        ]
        
        return random.choice(topics)
    
    def _get_question_type_from_template(self, template_name: str) -> str:
        """템플릿명에서 문제 유형 추출"""
        if '객관식' in template_name:
            return '객관식'
        elif '주관식' in template_name:
            return '주관식'
        elif '계산' in template_name:
            return '계산'
        else:
            return '기타'
    
    def _print_progress(self, generated: int, attempts: int):
        """진행 상황 출력"""
        success_rate = generated / attempts * 100 if attempts > 0 else 0
        print(f"\n📊 진행 상황: {generated}/{self.config['target_count']} "
              f"(성공률: {success_rate:.1f}%)")
    
    def _finalize(self):
        """최종 처리"""
        print("\n🏁 최종 처리 중...")
        
        # 데이터 저장 완료
        if self.data_manager:
            self.data_manager.finalize()
            
            # 메타데이터 저장
            if self.generator and self.monitor:
                generation_stats = {
                    'generator': self.generator.get_statistics(),
                    'monitor': self.monitor.stats
                }
                self.data_manager.save_metadata(generation_stats)
            
            # 데이터 검증
            self.data_manager.validate_data()
        
        # 모니터링 요약
        if self.monitor:
            self.monitor.print_summary()
            
            # 보고서 저장
            report_path = self.output_dir / "generation_report.json"
            self.monitor.save_report(str(report_path))
        
        print("\n✅ 모든 작업 완료!")
    
    def update_config(self, **kwargs):
        """설정 업데이트"""
        self.config.update(kwargs)
        print("설정 업데이트됨:")
        for key, value in kwargs.items():
            print(f"  - {key}: {value}")


# 통합 실행 시스템 생성
print("✅ FSKU 데이터 증강 통합 시스템 생성 완료")
print("\n사용법:")
print("1. augmentation = FSKUDataAugmentation()")
print("2. augmentation.initialize()")
print("3. augmentation.run(target_count=1000)")
print("\n설정 변경:")
print("augmentation.update_config(batch_size=8, quality_threshold=75)")

## 🎯 실행 예제

아래 셀을 실행하여 데이터 증강을 시작하세요.

In [None]:
# 실행 예제 1: 테스트 실행 (10개만 생성)
print("=" * 60)
print("🧪 테스트 실행 예제")
print("=" * 60)
print("\n이 셀을 실행하면 10개의 샘플 데이터를 생성합니다.")
print("실제 실행 전에 시스템이 제대로 작동하는지 확인할 수 있습니다.")
print("\n실행하려면 아래 주석을 제거하세요:")

# # 시스템 생성
# augmentation = FSKUDataAugmentation(
#     model_name="beomi/llama-2-ko-7b",
#     external_dir=EXTERNAL_DIR,
#     output_dir=OUTPUT_DIR
# )

# # 초기화
# augmentation.initialize()

# # 테스트 실행
# augmentation.run(test_mode=True)

In [None]:
# 실행 예제 2: 본격 실행 (1000개 생성)
print("=" * 60)
print("🚀 본격 실행 예제")
print("=" * 60)
print("\n이 셀을 실행하면 1000개의 고품질 데이터를 생성합니다.")
print("예상 소요 시간: 2-4시간 (모델과 하드웨어에 따라 다름)")
print("\n실행하려면 아래 주석을 제거하세요:")

# # 시스템 생성 (더 좋은 모델 사용)
# augmentation = FSKUDataAugmentation(
#     model_name="upstage/SOLAR-10.7B-v1.0",  # 더 좋은 한국어 모델
#     external_dir=EXTERNAL_DIR,
#     output_dir=OUTPUT_DIR
# )

# # 설정 변경 (선택사항)
# augmentation.update_config(
#     target_count=1000,        # 생성할 문제 수
#     batch_size=4,             # 배치 크기
#     quality_threshold=75,     # 품질 기준 (높일수록 엄격)
#     max_iterations=3          # 개선 반복 횟수
# )

# # 초기화
# augmentation.initialize()

# # 본격 실행
# augmentation.run()

In [None]:
# 실행 예제 3: 결과 분석 및 학습 데이터 내보내기
print("=" * 60)
print("📊 결과 분석 예제")
print("=" * 60)
print("\n생성이 완료된 후 이 셀을 실행하여 결과를 분석하세요.")

# 최신 생성 파일 찾기
import glob

# JSONL 파일 찾기
jsonl_files = sorted(glob.glob(str(OUTPUT_DIR / "fsku_data_*.jsonl")))

if jsonl_files:
    latest_file = jsonl_files[-1]
    print(f"\n최신 데이터 파일: {latest_file}")
    
    # 데이터 매니저로 검증
    temp_manager = DataManager(OUTPUT_DIR)
    temp_manager.data_file = Path(latest_file)
    
    # 검증 실행
    validation = temp_manager.validate_data()
    
    # 학습용 데이터로 내보내기
    if validation['valid']:
        print("\n학습용 데이터로 내보내기...")
        temp_manager.export_for_training()
    
    # 샘플 출력
    print("\n📝 생성된 데이터 샘플 (처음 3개):")
    print("-" * 60)
    
    with open(latest_file, 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            if i >= 3:
                break
            data = json.loads(line)
            print(f"\n[샘플 {i+1}]")
            print(f"문제: {data['instruction'][:100]}...")
            print(f"답변: {data['output'][:100]}...")
            print(f"품질: {data['quality_score']}점")
            print(f"유형: {data['question_type']}")
else:
    print("\n⚠️ 생성된 데이터 파일이 없습니다.")
    print("먼저 데이터 생성을 실행하세요.")

## 🎉 완료!

### ✅ 구현 완료 사항
1. **RAG 시스템** - 외부 문서 기반 컨텍스트 검색
2. **품질 평가 시스템** - 다차원 평가로 고품질 보장
3. **체이닝 생성기** - 4단계 개선 프로세스
4. **프롬프트 템플릿** - 10가지 이상의 문제 유형
5. **실시간 모니터링** - 진행 상황 시각화
6. **데이터 관리** - JSONL 저장 및 검증
7. **통합 실행** - 원클릭 자동화

### 📋 사용 방법 요약

1. **외부 문서 준비**
   ```
   data/external/ 폴더에 PDF, TXT, Excel 문서 추가
   ```

2. **테스트 실행**
   ```python
   augmentation = FSKUDataAugmentation()
   augmentation.initialize()
   augmentation.run(test_mode=True)
   ```

3. **본격 실행**
   ```python
   augmentation.run(target_count=1000)
   ```

### 💡 Tips
- GPU 메모리 부족시: `use_quantization=True` (기본값)
- 품질 향상: `quality_threshold=80` 으로 설정
- 속도 향상: `batch_size` 증가
- 다양성 확보: 외부 문서 다양하게 추가

### 📚 생성된 파일
- `data/augmented/fsku_data_*.jsonl` - 생성된 데이터
- `data/augmented/fsku_metadata_*.json` - 메타데이터
- `data/augmented/generation_report.json` - 상세 보고서
- `data/augmented/train_data.jsonl` - 학습용 포맷

### 🚀 다음 단계
1. 생성된 데이터로 모델 학습 (FSKU_2_학습.ipynb)
2. 학습된 모델로 추론 (FSKU_3_추론.ipynb)

---
**Good Luck with FSKU Challenge! 🏆**