claude

In [1]:
"""
한국 형법 RAG 에이전트 시스템
================================
Korean Criminal Law Retrieval-Augmented Generation Agent System

모듈화된 효율적인 한국 형법 Question-Answering 시스템
- 데이터 모델링, Repository 패턴, 파이프라인 아키텍처 적용
- KMMLU Criminal-Law 카테고리 평가용
"""

import os
import json
import re
import time
import asyncio
import concurrent.futures
from typing import List, Dict, Any, Optional, Union, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm
import pandas as pd
import numpy as np

# 외부 라이브러리
from dotenv import load_dotenv
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from neo4j import GraphDatabase
from openai import OpenAI

# 환경 변수 로드
load_dotenv()

# 환경 설정
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USERNAME = os.getenv("NEO4J_USERNAME")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")

# 폴더 생성
os.makedirs("results", exist_ok=True)
os.makedirs("dataset", exist_ok=True)

print("환경 설정 완료 및 필요 라이브러리 임포트 완료")

환경 설정 완료 및 필요 라이브러리 임포트 완료


In [2]:
# 데이터 모델 클래스
# ================

@dataclass
class Article:
    """법조항 정보를 담는 클래스"""
    id: str
    text: str
    embedding: Optional[List[float]] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """딕셔너리 변환"""
        return {
            "id": self.id,
            "text": self.text,
            "embedding": self.embedding
        }


@dataclass
class Precedent:
    """판례 정보를 담는 클래스"""
    id: str
    name: Optional[str] = None
    judgment_summary: Optional[str] = None
    full_summary: Optional[str] = None
    keywords: List[str] = field(default_factory=list)
    referenced_rules: List[str] = field(default_factory=list)
    referenced_cases: List[str] = field(default_factory=list)
    embedding: Optional[List[float]] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """딕셔너리 변환"""
        return {
            "id": self.id,
            "name": self.name,
            "judgment_summary": self.judgment_summary,
            "full_summary": self.full_summary,
            "keywords": self.keywords,
            "referenced_rules": self.referenced_rules,
            "referenced_cases": self.referenced_cases,
            "embedding": self.embedding
        }


@dataclass
class SearchResult:
    """검색 결과를 담는 클래스"""
    id: str
    type: str  # "Article" 또는 "Precedent"
    score: float
    text: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        """딕셔너리 변환"""
        return {
            "id": self.id,
            "type": self.type,
            "score": self.score,
            "text": self.text,
            "metadata": self.metadata
        }

print("데이터 모델 클래스 정의 완료")

데이터 모델 클래스 정의 완료


In [3]:
# Neo4j 리포지토리 클래스
# =====================

class Neo4jRepository:
    """Neo4j 데이터베이스 연결 및 기본 작업을 위한 베이스 클래스"""
    
    def __init__(self, uri: str, username: str, password: str, database: str = "neo4j"):
        """Neo4j 연결 초기화
        
        Args:
            uri: Neo4j 서버 URI
            username: 사용자명
            password: 비밀번호
            database: 데이터베이스 이름
        """
        self.uri = uri
        self.username = username
        self.password = password
        self.database = database
        self.driver = None
        
    def connect(self) -> None:
        """Neo4j 데이터베이스에 연결"""
        try:
            self.driver = GraphDatabase.driver(
                self.uri, 
                auth=(self.username, self.password)
            )
            self.driver.verify_connectivity()
            print("Successfully connected to Neo4j.")
        except Exception as e:
            print(f"Failed to connect to Neo4j: {e}")
            raise
    
    def close(self) -> None:
        """연결 종료"""
        if self.driver:
            self.driver.close()
            self.driver = None
            print("Neo4j connection closed.")
    
    def run_query(self, query: str, parameters: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Neo4j 쿼리 실행
        
        Args:
            query: Cypher 쿼리
            parameters: 쿼리 파라미터
            
        Returns:
            쿼리 결과
        """
        if not self.driver:
            self.connect()
            
        with self.driver.session(database=self.database) as session:
            result = session.run(query, parameters or {})
            return [record.data() for record in result]
            
    def setup_constraints_and_indexes(self, embedding_dimension: int) -> None:
        """제약조건 및 인덱스 설정
        
        Args:
            embedding_dimension: 임베딩 벡터 차원
        """
        try:
            # 고유성 제약조건 설정
            self.run_query("CREATE CONSTRAINT article_id IF NOT EXISTS FOR (a:Article) REQUIRE a.id IS UNIQUE")
            self.run_query("CREATE CONSTRAINT precedent_id IF NOT EXISTS FOR (p:Precedent) REQUIRE p.id IS UNIQUE")
            self.run_query("CREATE CONSTRAINT keyword_text IF NOT EXISTS FOR (k:Keyword) REQUIRE k.text IS UNIQUE")
            
            # 벡터 인덱스 생성
            try:
                self.run_query(
                    "CREATE VECTOR INDEX article_embedding IF NOT EXISTS "
                    "FOR (a:Article) ON (a.embedding) "
                    f"OPTIONS {{indexConfig: {{`vector.dimensions`: {embedding_dimension}, `vector.similarity_function`: 'cosine'}}}}"
                )
                print("Article vector index created or already exists.")
            except Exception as e:
                print(f"Error creating Article vector index: {e}")
                
            try:
                self.run_query(
                    "CREATE VECTOR INDEX precedent_embedding IF NOT EXISTS "
                    "FOR (p:Precedent) ON (p.embedding) "
                    f"OPTIONS {{indexConfig: {{`vector.dimensions`: {embedding_dimension}, `vector.similarity_function`: 'cosine'}}}}"
                )
                print("Precedent vector index created or already exists.")
            except Exception as e:
                print(f"Error creating Precedent vector index: {e}")
                
            # 인덱스가 활성화될 때까지 대기
            print("Waiting for indexes to populate...")
            self.run_query("CALL db.awaitIndexes(300)")
            print("Indexes are now online.")
        except Exception as e:
            print(f"Error setting up constraints and indexes: {e}")
            raise

print("Neo4j 리포지토리 베이스 클래스 정의 완료")

Neo4j 리포지토리 베이스 클래스 정의 완료


In [4]:
class ArticleRepository(Neo4jRepository):
    """법조항 관련 데이터베이스 작업을 처리하는 클래스"""
    
    def create_article(self, article: Article) -> None:
        """법조항 노드 생성
        
        Args:
            article: 법조항 객체
        """
        query = """
        MERGE (a:Article {id: $article_id})
        SET a.text = $content,
            a.embedding = $embedding
        """
        self.run_query(
            query,
            {
                "article_id": article.id,
                "content": article.text,
                "embedding": article.embedding
            }
        )
    
    def create_bulk_articles(self, articles: List[Article], batch_size: int = 50) -> int:
        """다수의 법조항 노드 생성
        
        Args:
            articles: 법조항 객체 리스트
            batch_size: 배치 크기
            
        Returns:
            생성된 법조항 수
        """
        count = 0
        start_time = time.time()
        
        # 벌크 연산을 위한 쿼리 최적화
        batch_query = """
        UNWIND $articles AS article
        MERGE (a:Article {id: article.id})
        SET a.text = article.text,
            a.embedding = article.embedding
        """
        
        for i in range(0, len(articles), batch_size):
            batch = articles[i:i+batch_size]
            
            # 유효한 항목만 필터링
            valid_articles = []
            for article in batch:
                if not article.text:
                    print(f"Skipping article {article.id} due to empty content.")
                    continue
                
                valid_articles.append({
                    "id": article.id,
                    "text": article.text,
                    "embedding": article.embedding
                })
                
            if valid_articles:
                try:
                    self.run_query(batch_query, {"articles": valid_articles})
                    count += len(valid_articles)
                except Exception as e:
                    print(f"Error processing article batch: {e}")
                    
                    # 실패 시 개별 처리로 폴백
                    for article_data in valid_articles:
                        try:
                            article = Article(
                                id=article_data["id"],
                                text=article_data["text"],
                                embedding=article_data["embedding"]
                            )
                            self.create_article(article)
                            count += 1
                        except Exception as e2:
                            print(f"Error processing article {article_data['id']}: {e2}")
            
            if (i + batch_size) % (batch_size * 5) == 0:
                print(f"  Processed {i + batch_size}/{len(articles)} articles...")
                
        end_time = time.time()
        print(f"Finished creating {count} Article nodes in {end_time - start_time:.2f} seconds.")
        
        return count
    
    def find_article_by_id(self, article_id: str) -> Optional[Article]:
        """ID로 법조항 찾기
        
        Args:
            article_id: 법조항 ID
            
        Returns:
            법조항 객체 또는 None
        """
        query = """
        MATCH (a:Article {id: $article_id})
        RETURN a.id as id, a.text as text, a.embedding as embedding
        """
        results = self.run_query(query, {"article_id": article_id})
        
        if not results:
            return None
            
        result = results[0]
        return Article(
            id=result["id"],
            text=result["text"],
            embedding=result["embedding"]
        )
    
    def search_articles_by_vector(self, query_embedding: List[float], limit: int = 5) -> List[SearchResult]:
        """벡터 유사도 기반 법조항 검색 - 최적화된 버전
        
        Args:
            query_embedding: 쿼리 임베딩
            limit: 최대 결과 수
            
        Returns:
            검색 결과 리스트
        """
        query = """
        CALL db.index.vector.queryNodes('article_embedding', $limit, $query_embedding) 
        YIELD node, score
        RETURN node.id AS id, node.text AS text, score
        """
        results = self.run_query(
            query,
            {
                "limit": limit,
                "query_embedding": query_embedding
            }
        )
        
        return [
            SearchResult(
                id=result["id"],
                type="Article",
                score=result["score"],
                text=result["text"]
            )
            for result in results
        ]
    
    def search_articles_related_to_precedents(self, query_embedding: List[float], top_k: int = 8) -> List[SearchResult]:
        """판례 관계를 고려한 법조항 검색 - 최적화된 버전
        
        Args:
            query_embedding: 쿼리 임베딩
            top_k: 최대 결과 수
            
        Returns:
            검색 결과 리스트
        """
        # 최적화된 쿼리: 불필요한 중간 집계 제거, 인덱스 활용 극대화
        query = """
        // 1. 벡터 검색으로 시작 법조항 찾기 - 더 많은 후보를 가져옴
        CALL db.index.vector.queryNodes('article_embedding', $initial_limit, $query_embedding) 
        YIELD node as article, score as article_score
        
        // 2. 법조항과 판례 관계 및 공통 키워드 계산
        WITH article, article_score
        
        // 판례 관계 파악 
        OPTIONAL MATCH (precedent:Precedent)-[:REFERENCES_ARTICLE]->(article)
        WITH article, article_score, count(precedent) as precedent_count
        
        // 3. 최종 점수 계산 (벡터 점수 + 판례 인용 보너스)
        WITH article, 
            article_score + (precedent_count * 0.02) as final_score,
            precedent_count
        
        // 판례와 키워드 정보 수집
        OPTIONAL MATCH (precedent:Precedent)-[:REFERENCES_ARTICLE]->(article)
        OPTIONAL MATCH (precedent)-[:HAS_KEYWORD]->(keyword:Keyword)
        
        // 4. 결과 집계 및 반환
        WITH article, final_score, precedent_count, 
             collect(DISTINCT keyword.text) as keywords
        
        RETURN article.id as id, 
               article.text as text, 
               final_score as score,
               precedent_count,
               keywords
        ORDER BY final_score DESC
        LIMIT $top_k
        """
        
        results = self.run_query(
            query,
            {
                "initial_limit": top_k * 3,  # 더 많은 후보를 검색해 관계 점수를 반영
                "top_k": top_k,
                "query_embedding": query_embedding
            }
        )
        
        return [
            SearchResult(
                id=result["id"],
                type="Article",
                score=result["score"],
                text=result["text"],
                metadata={
                    "precedent_count": result["precedent_count"],
                    "related_keywords": result["keywords"]
                }
            )
            for result in results
        ]

print("법조항 리포지토리 클래스 정의 완료")

법조항 리포지토리 클래스 정의 완료


In [5]:
class PrecedentRepository(Neo4jRepository):
    """판례 관련 데이터베이스 작업을 처리하는 클래스"""
    
    def create_precedent(self, precedent: Precedent) -> None:
        """판례 노드 생성
        
        Args:
            precedent: 판례 객체
        """
        query = """
        MERGE (p:Precedent {id: $case_id})
        SET p.name = $case_name,
            p.judgment_summary = $judgment_summary,
            p.full_summary = $full_summary,
            p.embedding = $embedding
        """
        self.run_query(
            query,
            {
                "case_id": precedent.id,
                "case_name": precedent.name,
                "judgment_summary": precedent.judgment_summary,
                "full_summary": precedent.full_summary,
                "embedding": precedent.embedding
            }
        )
        
        # 키워드 노드 생성 및 관계 설정 (최적화: 벌크 처리)
        if precedent.keywords:
            keywords_query = """
            UNWIND $keywords AS keyword_text
            MERGE (k:Keyword {text: keyword_text})
            WITH k
            MATCH (p:Precedent {id: $case_id})
            MERGE (p)-[:HAS_KEYWORD]->(k)
            """
            self.run_query(
                keywords_query,
                {
                    "keywords": precedent.keywords,
                    "case_id": precedent.id
                }
            )
            
        # 참조 법조항 관계 설정 (최적화: 벌크 처리)
        if precedent.referenced_rules:
            rules_query = """
            MATCH (p:Precedent {id: $case_id})
            UNWIND $article_refs AS article_ref
            MATCH (a:Article)
            WHERE a.id STARTS WITH article_ref
            MERGE (p)-[:REFERENCES_ARTICLE]->(a)
            """
            self.run_query(
                rules_query,
                {
                    "case_id": precedent.id,
                    "article_refs": precedent.referenced_rules
                }
            )
    
    def create_bulk_precedents(self, precedents: List[Precedent], batch_size: int = 25) -> int:
        """다수의 판례 노드 생성
        
        Args:
            precedents: 판례 객체 리스트
            batch_size: 배치 크기
            
        Returns:
            생성된 판례 수
        """
        count = 0
        start_time = time.time()
        
        # 벌크 연산을 위한 쿼리 최적화
        batch_query = """
        UNWIND $precedents AS precedent
        MERGE (p:Precedent {id: precedent.id})
        SET p.name = precedent.name,
            p.judgment_summary = precedent.judgment_summary,
            p.full_summary = precedent.full_summary,
            p.embedding = precedent.embedding
        """
        
        for i in range(0, len(precedents), batch_size):
            batch = precedents[i:i+batch_size]
            
            # 유효한 항목만 필터링
            valid_precedents = []
            for precedent in batch:
                text_to_embed = precedent.full_summary or precedent.judgment_summary
                if not text_to_embed:
                    print(f"Skipping precedent {precedent.id} due to empty summary.")
                    continue
                
                valid_precedents.append({
                    "id": precedent.id,
                    "name": precedent.name,
                    "judgment_summary": precedent.judgment_summary,
                    "full_summary": precedent.full_summary,
                    "embedding": precedent.embedding
                })
            
            if valid_precedents:
                try:
                    # 1. 노드 생성
                    self.run_query(batch_query, {"precedents": valid_precedents})
                    
                    # 2. 키워드 및 관계 설정
                    for precedent in batch:
                        if precedent.full_summary or precedent.judgment_summary:
                            # 키워드 노드 및 관계 설정
                            if precedent.keywords:
                                self.run_query(
                                    """
                                    UNWIND $keywords AS keyword_text
                                    MERGE (k:Keyword {text: keyword_text})
                                    WITH k
                                    MATCH (p:Precedent {id: $case_id})
                                    MERGE (p)-[:HAS_KEYWORD]->(k)
                                    """,
                                    {
                                        "keywords": precedent.keywords,
                                        "case_id": precedent.id
                                    }
                                )
                            
                            # 참조 법조항 관계 설정
                            if precedent.referenced_rules:
                                self.run_query(
                                    """
                                    MATCH (p:Precedent {id: $case_id})
                                    UNWIND $article_refs AS article_ref
                                    MATCH (a:Article)
                                    WHERE a.id STARTS WITH article_ref
                                    MERGE (p)-[:REFERENCES_ARTICLE]->(a)
                                    """,
                                    {
                                        "case_id": precedent.id,
                                        "article_refs": precedent.referenced_rules
                                    }
                                )
                            
                            count += 1
                except Exception as e:
                    print(f"Error processing precedent batch: {e}")
                    
                    # 실패 시 개별 처리로 폴백
                    for precedent in batch:
                        text_to_embed = precedent.full_summary or precedent.judgment_summary
                        if text_to_embed:
                            try:
                                self.create_precedent(precedent)
                                count += 1
                            except Exception as e2:
                                print(f"Error processing precedent {precedent.id}: {e2}")
            
            if (i + batch_size) % (batch_size * 2) == 0:
                print(f"  Processed {i + batch_size}/{len(precedents)} precedents...")
                
        end_time = time.time()
        print(f"Finished creating {count} Precedent nodes in {end_time - start_time:.2f} seconds.")
        
        return count
    
    def search_precedents_by_vector(self, query_embedding: List[float], limit: int = 5) -> List[SearchResult]:
        """벡터 유사도 기반 판례 검색 - 최적화된 버전
        
        Args:
            query_embedding: 쿼리 임베딩
            limit: 최대 결과 수
            
        Returns:
            검색 결과 리스트
        """
        query = """
        CALL db.index.vector.queryNodes('precedent_embedding', $limit, $query_embedding) 
        YIELD node, score
        
        // 관계 정보 한 번에 가져오기 (최적화)
        OPTIONAL MATCH (node)-[:REFERENCES_ARTICLE]->(a:Article)
        OPTIONAL MATCH (node)-[:HAS_KEYWORD]->(k:Keyword)
        
        // 결과 집계 및 반환
        RETURN node.id AS id, 
               node.name AS name, 
               node.full_summary AS text, 
               score,
               collect(DISTINCT a.id) as referenced_articles,
               collect(DISTINCT k.text) as keywords
        """
        results = self.run_query(
            query,
            {
                "limit": limit,
                "query_embedding": query_embedding
            }
        )
        
        return [
            SearchResult(
                id=result["id"],
                type="Precedent",
                score=result["score"],
                text=result["text"],
                metadata={
                    "name": result["name"],
                    "referenced_articles": result["referenced_articles"],
                    "keywords": result["keywords"]
                }
            )
            for result in results
        ]
    
    def search_precedents_by_article(self, 
                                    article_id: str, 
                                    query_embedding: List[float], 
                                    query_keywords: List[str] = None, 
                                    limit: int = 2) -> List[SearchResult]:
        """특정 법조항과 관련된 판례 검색 - 최적화된 버전
        
        Args:
            article_id: 법조항 ID
            query_embedding: 쿼리 임베딩
            query_keywords: 쿼리 키워드
            limit: 최대 결과 수
            
        Returns:
            검색 결과 리스트
        """
        # 최적화된 쿼리
        query = """
        // 1. 특정 법조항을 참조하는 판례 찾기 + 벡터 유사도 동시 계산
        MATCH (precedent:Precedent)-[:REFERENCES_ARTICLE]->(article:Article)
        WHERE article.id STARTS WITH $article_id
        
        // 2. 벡터 유사도 계산 (더 효율적인 처리)
        CALL {
            WITH precedent
            CALL db.index.vector.queryNodes('precedent_embedding', 100, $query_embedding) 
            YIELD node, score
            WHERE node = precedent
            RETURN node as precedent, score as vector_score
        }
        
        // 3. 키워드 관계 정보 수집 및 보너스 점수 계산
        OPTIONAL MATCH (precedent)-[:HAS_KEYWORD]->(keyword:Keyword)
        WITH precedent, vector_score, 
             collect(DISTINCT keyword.text) as keywords,
             sum(CASE WHEN $query_keywords IS NULL THEN 0
                  WHEN keyword.text IN $query_keywords
                  THEN 0.05 ELSE 0 END) as keyword_bonus
        
        // 4. 다른 법조항 참조 정보 수집
        OPTIONAL MATCH (precedent)-[:REFERENCES_ARTICLE]->(ref_article:Article)
        
        // 5. 최종 결과 반환
        RETURN precedent.id as id,
               precedent.name as name,
               precedent.full_summary as text,
               vector_score + keyword_bonus as score,
               keywords,
               collect(DISTINCT ref_article.id) as referenced_articles
        ORDER BY score DESC
        LIMIT $limit
        """
        
        results = self.run_query(
            query,
            {
                "article_id": article_id,
                "query_embedding": query_embedding,
                "query_keywords": query_keywords or [],
                "limit": limit
            }
        )
        
        return [
            SearchResult(
                id=result["id"],
                type="Precedent",
                score=result["score"],
                text=result["text"],
                metadata={
                    "name": result["name"],
                    "keywords": result["keywords"],
                    "referenced_articles": result["referenced_articles"]
                }
            )
            for result in results
        ]

print("판례 리포지토리 클래스 정의 완료")

판례 리포지토리 클래스 정의 완료


In [6]:
# 데이터 로더 및 처리기
# ==================

class DataLoader:
    """데이터 로드 및 전처리를 위한 클래스"""
    
    @staticmethod
    def load_articles_from_pdf(pdf_path: str) -> Dict[str, Article]:
        """PDF에서 법조항 로드 - 개선된 버전
        
        Args:
            pdf_path: PDF 파일 경로
            
        Returns:
            법조항 딕셔너리 (ID -> Article)
        """
        try:
            loader = PyPDFLoader(pdf_path)
            pages = loader.load()
            full_text = "\n".join(page.page_content for page in pages)
            
            # 전체 텍스트에서 모든 조항 시작 위치 찾기
            article_pattern = r'제\d+조(?:의\d+)?(?:\s*\(.+?\))?'
            matches = list(re.finditer(article_pattern, full_text))
            
            articles = {}
            for i in range(len(matches)):
                current_match = matches[i]
                current_article_id = current_match.group(0).strip()  # 현재 조항 ID
                
                # 현재 조항 시작 위치
                start_pos = current_match.start()
                
                # 다음 조항 시작 위치 (없으면 텍스트 끝까지)
                end_pos = matches[i+1].start() if i < len(matches)-1 else len(full_text)
                
                # 현재 조항의 전체 내용 (ID 포함)
                article_text = full_text[start_pos:end_pos].strip()
                
                # 저장 (ID는 조항 번호만)
                articles[current_article_id] = Article(id=current_article_id, text=article_text)
            
            print(f"Processed {len(articles)} articles from PDF")
            return articles
            
        except Exception as e:
            print(f"Error loading articles from PDF: {e}")
            return {}
    
    @staticmethod
    def load_precedents_from_json(directory_path: str, limit: int = None) -> List[Precedent]:
        """JSON 파일에서 판례 로드 - 개선된 버전
        
        Args:
            directory_path: JSON 파일 디렉토리 경로
            limit: 최대 로드할 판례 수
            
        Returns:
            판례 객체 리스트
        """
        precedents = []
        
        try:
            file_list = os.listdir(directory_path)
            json_files = [f for f in file_list if f.endswith(".json")]
            
            if limit:
                json_files = json_files[:limit]
            
            # 병렬 처리로 성능 향상
            with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, os.cpu_count() * 2)) as executor:
                future_to_file = {
                    executor.submit(DataLoader._load_single_precedent, os.path.join(directory_path, filename)): 
                    filename for filename in json_files
                }
                
                for future in tqdm(concurrent.futures.as_completed(future_to_file), 
                                  total=len(json_files), 
                                  desc="Loading precedents"):
                    filename = future_to_file[future]
                    try:
                        precedent = future.result()
                        if precedent:
                            precedents.append(precedent)
                    except Exception as e:
                        print(f"Error processing {filename}: {e}")
            
            print(f"Loaded {len(precedents)} precedents.")
            return precedents
            
        except Exception as e:
            print(f"Error loading precedents from JSON: {e}")
            return []
    
    @staticmethod
    def _load_single_precedent(filepath: str) -> Optional[Precedent]:
        """단일 판례 JSON 파일 로드 (병렬 처리용)
        
        Args:
            filepath: JSON 파일 경로
            
        Returns:
            판례 객체 또는 None
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
                
                # 기존에 라벨링 되어있었음
                precedent_info = {
                    "id": data.get("info", {}).get("caseNoID", os.path.basename(filepath).replace(".json", "")),
                    "name": data.get("info", {}).get("caseNm"),
                    "judgment_summary": data.get("jdgmn"),
                    "full_summary": " ".join([s.get("summ_contxt", "") for s in data.get("Summary", [])]),
                    "keywords": [kw.get("keyword") for kw in data.get("keyword_tagg", []) if kw.get("keyword")],
                    "referenced_rules": data.get("Reference_info", {}).get("reference_rules", "").split(',') if data.get("Reference_info", {}).get("reference_rules") else [],
                    "referenced_cases": data.get("Reference_info", {}).get("reference_court_case", "").split(',') if data.get("Reference_info", {}).get("reference_court_case") else [],
                }
                
                # 참조 법조항 정제 (조항 번호만)
                cleaned_rules = []
                rule_pattern = re.compile(r'제\d+조(?:의\d+)?')  # 패턴 찾기: "제X조" or "제X조의Y"
                
                for rule in precedent_info["referenced_rules"]:
                    # 각 규칙 문자열에서 모든 일치 항목 찾기
                    matches = rule_pattern.findall(rule.strip())
                    cleaned_rules.extend(matches)
                    
                precedent_info["referenced_rules"] = list(set(cleaned_rules))  # 중복 제거
                
                # Precedent 객체 생성
                precedent = Precedent(
                    id=precedent_info["id"],
                    name=precedent_info["name"],
                    judgment_summary=precedent_info["judgment_summary"],
                    full_summary=precedent_info["full_summary"],
                    keywords=precedent_info["keywords"],
                    referenced_rules=precedent_info["referenced_rules"],
                    referenced_cases=precedent_info["referenced_cases"]
                )
                
                return precedent
                
        except json.JSONDecodeError:
            print(f"Warning: Could not decode JSON from {os.path.basename(filepath)}")
            return None
        except Exception as e:
            print(f"Error processing {os.path.basename(filepath)}: {e}")
            return None


class TextProcessor:
    """텍스트 처리 유틸리티 클래스 - 최적화된 버전"""
    
    # 불용어 정의를 클래스 변수로 (한 번만 로드)
    stopwords = [
        "무엇", "어떤", "어떻게", "언제", "누구", "왜", "어디", "경우", "관하여", "대하여", 
        "은", "는", "이", "가", "을", "를", "에", "의", "와", "과", "로", "으로",
        "있다", "없다", "경우", "때", "것", "등", "수", "그", "이", "저", "그렇게",
        "그런", "이런", "저런", "하는", "다음", "또는", "또한", "그리고", "만약", "만일"
    ]
    
    # 주요 법률 용어 사전
    legal_terms = {
        # 형법 기본 원칙
        "고의": 3, "과실": 3, "인과관계": 3, "위법성": 3, "책임": 3, 
        "구성요건": 3, "위법성조각사유": 3, "책임조각사유": 3,
        
        # 정당화 사유
        "정당방위": 3, "긴급피난": 3, "자구행위": 3, "피해자동의": 3, "정당행위": 3, 
        "업무로인한행위": 3, "강요된행위": 3,
        
        # 범죄의 실행 단계
        "미수": 3, "기수": 3, "예비": 3, "음모": 3, "중지": 3, "불능미수": 3,
        
        # 공범 관련
        "공범": 3, "교사": 3, "방조": 3, "공동정범": 3, "간접정범": 3, "종범": 3,
        
        # 형벌 관련
        "형": 2, "징역": 2, "벌금": 2, "집행유예": 2, "선고유예": 2, "누범": 2,
        
        # 법이론
        "법익": 2, "작위": 2, "부작위": 2, "결과범": 2, "거동범": 2, "상당인과관계": 2,
        
        # 기타 형법 용어
        "불법": 2, "과잉방위": 2, "우발적": 2, "착오": 2, "원인에서자유로운행위": 2
    }
    
    # 동의어 매핑
    synonyms = {
        "고의": ["범의", "의도적", "계획적"],
        "과실": ["부주의", "태만", "소홀"],
        "위법성": ["불법성", "위법", "불법"],
        "책임": ["형사책임", "귀책", "비난가능성"],
        "미수": ["미완성", "불성공"],
        "정당방위": ["자기방어", "방어행위"]
    }
    
    @staticmethod
    def extract_keywords(text: str) -> List[str]:
        """텍스트에서 주요 키워드 추출 - 최적화된 버전
        
        Args:
            text: 입력 텍스트
            
        Returns:
            키워드 리스트
        """
        # 기본 키워드 추출 (2글자 이상)
        words = re.findall(r'\w{2,}', text)
        basic_keywords = [w for w in words if w not in TextProcessor.stopwords]
        
        # 인접한 단어들도 함께 고려 (복합 키워드)
        bigrams = []
        for i in range(len(words) - 1):
            if words[i] not in TextProcessor.stopwords or words[i+1] not in TextProcessor.stopwords:
                bigram = words[i] + words[i+1]
                if len(bigram) >= 4:  # 4글자 이상 복합어만 고려
                    bigrams.append(bigram)
        
        # 가중치 및 동의어 적용
        weighted_keywords = []
        
        for keyword in basic_keywords + bigrams:
            if keyword in TextProcessor.legal_terms:
                # 가중치만큼 반복 추가
                weighted_keywords.extend([keyword] * TextProcessor.legal_terms[keyword])
                
                # 동의어도 추가
                if keyword in TextProcessor.synonyms:
                    for synonym in TextProcessor.synonyms[keyword]:
                        weighted_keywords.append(synonym)
            else:
                weighted_keywords.append(keyword)
        
        # 빈도수 계산 및 필터링
        from collections import Counter
        counter = Counter(weighted_keywords)
        
        # 키워드가 적으면 낮은 기준 적용
        min_count = 1 if len(counter) < 10 else 2
        
        final_keywords = [k for k, v in counter.items() if v >= min_count]
        
        # 키워드가 너무 적으면 원래 키워드 반환
        if len(final_keywords) < 3:
            return weighted_keywords[:10]  # 최대 10개만 사용
        
        return final_keywords[:15]  # 최대 15개 키워드만 사용 (성능 최적화)
    
    @staticmethod
    def summarize_text(text: str, question: str, is_article: bool = True) -> str:
        """텍스트 요약 및 관련 부분 강조 - 최적화된 버전
        
        Args:
            text: 입력 텍스트
            question: 질문 
            is_article: 법조항 텍스트 여부
            
        Returns:
            요약된 텍스트
        """
        # 텍스트가 이미 짧으면 그대로 반환
        min_len = 500 if is_article else 300
        if len(text) < min_len:
            return text
        
        # 키워드 추출
        keywords = TextProcessor.extract_keywords(question)
        
        # 문장 분리 (정규식 컴파일로 성능 향상)
        sentence_pattern = re.compile(r'(?<=[.!?])\s+|(?<=\n)')
        sentences = sentence_pattern.split(text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        # 문장별 점수 계산
        scored_sentences = []
        
        # 법조항 및 법률 용어 패턴 컴파일 (반복 사용 최적화)
        article_pattern = re.compile(r'제\d+조')
        legal_term_pattern = re.compile(r'(판시|판결|법리|해석|적용|요건|효과|정당|위법|책임)')
        conclusion_pattern = re.compile(r'(따라서|그러므로|결론적으로|이유로|판단한다)')
        important_pattern = re.compile(r'(~으로 한다|~라 함은|~을 말한다|다만|단,|제외한다)')
        
        for i, sentence in enumerate(sentences):
            # 기본 점수
            score = 0
            
            # 키워드 매칭 점수
            for keyword in keywords:
                if keyword in sentence:
                    # 중요 법률 용어는 더 높은 가중치
                    if keyword in ["구성요건", "위법성", "책임", "정당방위", "긴급피난", 
                                "고의", "과실", "미수", "예비", "음모", "공범"]:
                        score += 3
                    else:
                        score += 1
            
            # 법조항 번호 포함 문장은 높은 가중치
            if article_pattern.search(sentence):
                score += 5
            
            # 법률적 중요 문장 패턴에 가중치
            if important_pattern.search(sentence):
                score += 3
            
            # 법률 용어가 많은 문장은 높은 가중치
            term_count = len(legal_term_pattern.findall(sentence))
            score += min(term_count, 3)  # 최대 3점
            
            # 결론 표현 포함 시 높은 점수
            if conclusion_pattern.search(sentence):
                score += 3
            
            # 위치 가중치
            if i == 0:  # 첫 문장 (제목이나 조항 번호일 가능성)
                score += 5
            elif i == len(sentences) - 1:  # 마지막 문장 (결론일 가능성)
                score += 2
            elif i <= 2:  # 앞부분 문장들 (정의나 개요일 가능성)
                score += 1
            
            scored_sentences.append((sentence, score, i))
        
        # 법조항과 판례별 다른 전략 적용
        if is_article:
            # 점수 기준으로 상위 문장 선택
            min_score = max(1, sorted([score for _, score, _ in scored_sentences], reverse=True)[0] * 0.3)
            relevant_sentences = [(s, score, i) for s, score, i in scored_sentences if score >= min_score]
            
            # 최소 문장 수 보장
            min_sentences = min(8, len(sentences))
            if len(relevant_sentences) < min_sentences:
                relevant_sentences = sorted(scored_sentences, key=lambda x: x[1], reverse=True)[:min_sentences]
        else:
            # 판례는 더 많은 문장 포함
            top_count = max(5, min(len(sentences) // 3, 10))
            relevant_sentences = sorted(scored_sentences, key=lambda x: x[1], reverse=True)[:top_count]
        
        # 원래 순서로 정렬
        ordered_sentences = sorted(relevant_sentences, key=lambda x: x[2])
        
        # 법조항 번호와 제목은 항상 포함
        if is_article and len(ordered_sentences) > 0 and 0 not in [i for _, _, i in ordered_sentences]:
            ordered_sentences.insert(0, (sentences[0], 0, 0))
        
        result = " ".join([s for s, _, _ in ordered_sentences])
        
        # 결과가 너무 짧으면 원본 반환
        min_ratio = 0.3 if is_article else 0.25
        if len(result) < len(text) * min_ratio:
            return text[:1500] if len(text) > 2000 else text  # 너무 긴 텍스트는 앞부분만 사용
            
        return result
    
    @staticmethod
    def format_context(search_results: List[SearchResult], question: str) -> str:
        """검색 결과에서 질문에 최적화된 컨텍스트 구성 - 개선된 버전
        
        Args:
            search_results: 검색 결과 리스트
            question: 질문
            
        Returns:
            포맷팅된 컨텍스트
        """
        # 검색 결과가 없으면 빈 컨텍스트 반환
        if not search_results:
            return "관련 형법 자료를 찾지 못했습니다."
            
        # 검색 결과 중 상위 결과에 대해 질문 관련성 재평가
        question_lower = question.lower()
        scored_results = []
        
        for result in search_results:
            relevance_score = result.score
            
            # 텍스트 내 키워드 매칭 검사로 관련성 점수 보정
            for keyword in TextProcessor.extract_keywords(question):
                if keyword in result.text.lower():
                    relevance_score += 0.05
            
            # 법 조항 ID가 질문에 언급된 경우 높은 가중치
            if result.type == "Article" and result.id in question:
                relevance_score += 0.3
                
            # 판례 이름이 질문에 언급된 경우
            if result.type == "Precedent" and result.metadata.get("name") and result.metadata["name"] in question:
                relevance_score += 0.2
                
            scored_results.append((result, relevance_score))
        
        # 관련성 점수로 재정렬
        scored_results.sort(key=lambda x: x[1], reverse=True)
        reranked_results = [r for r, _ in scored_results]
        
        # 검색 결과 타입별 분류
        results_by_type = {"Article": [], "Precedent": []}
        for result in reranked_results:
            results_by_type[result.type].append(result)
        
        # 법조항 컨텍스트 구성
        article_contexts = []
        for article in results_by_type["Article"][:5]:  # 최대 5개
            # 법조항 요약
            summarized_text = TextProcessor.summarize_text(article.text, question, is_article=True)
            
            # 점수에 따른 강조
            if article.score > 0.7:
                article_contexts.append(f"【중요 법조항: {article.id}】\n{summarized_text}")
            else:
                article_contexts.append(f"【{article.id}】\n{summarized_text}")
                
            # 관련 키워드가 있으면 표시
            if article.metadata.get("related_keywords"):
                keywords = article.metadata["related_keywords"]
                if keywords and len(keywords) > 0:
                    keywords_str = ", ".join(keywords[:5])
                    article_contexts[-1] += f"\n[관련 키워드: {keywords_str}]"
        
        # 판례 컨텍스트 구성
        precedent_contexts = []
        for precedent in results_by_type["Precedent"][:3]:  # 최대 3개
            # 판례 요약
            summarized_text = TextProcessor.summarize_text(precedent.text, question, is_article=False)
            
            # 판례명이 있으면 표시
            name_str = f" - {precedent.metadata.get('name', '')}" if precedent.metadata.get("name") else ""
            precedent_contexts.append(f"【판례 {precedent.id}{name_str}】\n{summarized_text}")
            
            # 참조 법조항이 있으면 표시
            if precedent.metadata.get("referenced_articles"):
                refs = ", ".join(precedent.metadata["referenced_articles"][:3])
                precedent_contexts[-1] += f"\n[참조 법조항: {refs}]"
                
            # 키워드가 있으면 표시
            if precedent.metadata.get("keywords"):
                keywords_str = ", ".join(precedent.metadata["keywords"][:5])
                precedent_contexts[-1] += f"\n[관련 키워드: {keywords_str}]"
        
        # 질문 분석 및 분류
        question_category = ""
        if any(term in question_lower for term in ["구성요건", "범죄성립", "해당", "요건"]):
            question_category = "구성요건 분석 문제"
        elif any(term in question_lower for term in ["위법", "정당", "방위", "피난", "적법"]):
            question_category = "위법성 판단 문제"
        elif any(term in question_lower for term in ["책임", "비난", "책임능력", "강요", "초법규"]):
            question_category = "책임 판단 문제"
        elif any(term in question_lower for term in ["미수", "예비", "음모", "중지", "불능"]):
            question_category = "미수 관련 문제"
        elif any(term in question_lower for term in ["형", "형벌", "형량", "처벌", "징역", "벌금"]):
            question_category = "형벌 관련 문제"
        elif any(term in question_lower for term in ["공범", "공동정범", "교사", "방조", "종범"]):
            question_category = "공범 관련 문제"
        else:
            question_category = "일반 형법 문제"
            
        # 전체 컨텍스트 구성
        formatted_context = "\n\n".join([
            f"### 형법 관련 참고 자료 ({question_category}) ###",
            "## 관련 법조항:",
            "\n\n".join(article_contexts) if article_contexts else "관련 법조항 정보가 없습니다.",
            "## 관련 판례:",
            "\n\n".join(precedent_contexts) if precedent_contexts else "관련 판례 정보가 없습니다.",
            "### 참고사항: 형법 해석 시 구성요건-위법성-책임 순서로 판단하며, 법조항과 판례를 함께 고려하십시오. ###"
        ])
        
        return formatted_context
    
    @staticmethod
    def extract_answer(text: str) -> Optional[str]:
        """텍스트에서 A, B, C, D 중 답변 추출 - 최적화된 버전
        
        Args:
            text: 입력 텍스트
            
        Returns:
            추출된 답변 (A, B, C, D) 또는 None
        """
        # 정규표현식 패턴들
        patterns = [
            # 직접 응답 패턴
            r'^([A-D])$',
            r'^답(?:변|안|)(?:은|): ?([A-D])',
            r'정답(?:은|): ?([A-D])',
            r'([A-D])(?:가|이|을|를) 선택',
            r'([A-D])(?:가|이|이) 정답',
            r'([A-D])(?:가|이|을|를) (?:고른다|고릅니다|고르겠습니다)',
            
            # 간접 응답 패턴
            r'따라서 (?:정답은 |답은 |)([A-D])',
            r'([A-D])(?:가|이|은|는) (?:가장 적절|가장 정확|옳은)',
            
            # 결론 문장 패턴
            r'(?:최종적으로|결론적으로|종합하면|따라서|분석 결과|이상의 이유로).{1,50}(?:정답은|답은|옳은 것은|맞는 것은) ?([A-D])',
            r'(?:선택지|옵션) ?([A-D])(?:가|이|은|는) (?:정답|맞습니다|맞다|적절|적합|옳은|타당)',
            r'정답은 선택지 ?([A-D])',
            r'선택지 ?([A-D])(?:을|를)? ?(?:선택합니다|고릅니다|고르겠습니다|골라야 합니다)',
            
            # 비교 분석 패턴
            r'(?:따라서|그러므로|그래서|이에).{0,30}([A-D])(?:이외|를 제외하고|빼고).{0,20}(?:모두|다른 선택지|다른 것)(?:는|은) (?:틀리|오답|부적절|타당하지 않)',
            r'선택지 ([A-D])(?:만|이).{0,30}(?:정확|옳|적절|타당|맞)',
            
            # 부정 표현을 통한 정답 유추
            r'(?:선택지|옵션) ([A-D])(?:을|를)? ?제외한.{1,20}(?:틀리|오답|부적절)',
            r'([A-D])(?:을|를)? ?제외한.{1,20}(?:나머지|다른).{1,20}(?:틀리|오답|부적절)',
        ]
        
        # 대문자 또는 소문자 답변을 모두 허용 (대문자로 통일)
        normalized_text = text.upper()
        
        # 패턴 적용
        for pattern in patterns:
            match = re.search(pattern, normalized_text, re.IGNORECASE | re.MULTILINE)
            if match:
                return match.group(1).upper()
        
        # 문장별 검색 (마지막 문장들에 집중)
        lines = normalized_text.split('\n')
        
        # 마지막 3개 문장 우선 검색 (결론이 주로 마지막에 위치)
        last_lines = lines[-3:] if len(lines) >= 3 else lines
        for line in last_lines:
            for pattern in patterns:
                match = re.search(pattern, line, re.IGNORECASE)
                if match:
                    return match.group(1).upper()
        
        # 문맥 기반 접근
        options = ['A', 'B', 'C', 'D']
        
        # 가장 강조된 선택지 찾기
        option_emphasis = {option: 0 for option in options}
        
        for option in options:
            # 긍정적 언급 패턴 (가중치: +2)
            option_emphasis[option] += normalized_text.count(f"{option}이 옳다") * 2
            option_emphasis[option] += normalized_text.count(f"{option}가 맞다") * 2
            option_emphasis[option] += normalized_text.count(f"{option}이 정답") * 2
            option_emphasis[option] += normalized_text.count(f"{option}만 옳다") * 2
            option_emphasis[option] += normalized_text.count(f"정답은 {option}") * 2
            
            # 일반적 언급 패턴 (가중치: +1)
            option_emphasis[option] += normalized_text.count(f"선택지 {option}")
            option_emphasis[option] += normalized_text.count(f"{option}.")
            option_emphasis[option] += normalized_text.count(f"{option}이 ")
            option_emphasis[option] += normalized_text.count(f"{option}가 ")
            option_emphasis[option] += normalized_text.count(f"{option}은 ")
            option_emphasis[option] += normalized_text.count(f"{option}는 ")
            
            # 부정 표현 (가중치: -2)
            option_emphasis[option] -= normalized_text.count(f"{option}이 아니") * 2
            option_emphasis[option] -= normalized_text.count(f"{option}가 아니") * 2
            option_emphasis[option] -= normalized_text.count(f"{option}은 틀린") * 2
            option_emphasis[option] -= normalized_text.count(f"{option}는 틀린") * 2
            option_emphasis[option] -= normalized_text.count(f"{option}은 부적절") * 2
        
        # 최종 결정: 긍정적 언급이 가장 많은 선택지 또는 부정적 언급이 가장 적은 선택지
        if any(emphasis > 0 for emphasis in option_emphasis.values()):
            return max(option_emphasis.items(), key=lambda x: x[1])[0]
        
        # 출현 빈도 기반 추측 (모든 방법이 실패했을 때)
        option_counts = {option: normalized_text.count(option) for option in options}
        if any(count > 0 for count in option_counts.values()):
            return max(option_counts.items(), key=lambda x: x[1])[0]
        
        # 응답에서 아무 것도 찾지 못한 경우
        return None

print("데이터 로더 및 텍스트 처리기 클래스 정의 완료")

데이터 로더 및 텍스트 처리기 클래스 정의 완료


In [7]:
# RAG 검색 서비스
# =============

class GraphSearchService:
    """그래프 기반 검색 서비스 - 최적화된 버전"""
    
    def __init__(self, 
                 neo4j_uri: str, 
                 neo4j_username: str, 
                 neo4j_password: str,
                 embedding_model: Any,
                 embedding_dimension: int = 1536):
        """검색 서비스 초기화
        
        Args:
            neo4j_uri: Neo4j 서버 URI
            neo4j_username: 사용자명 
            neo4j_password: 비밀번호
            embedding_model: 임베딩 모델
            embedding_dimension: 임베딩 벡터 차원
        """
        self.article_repo = ArticleRepository(neo4j_uri, neo4j_username, neo4j_password)
        self.precedent_repo = PrecedentRepository(neo4j_uri, neo4j_username, neo4j_password)
        self.embedding_model = embedding_model
        self.embedding_dimension = embedding_dimension
        
        # 연결 및 스키마 설정
        self.article_repo.connect()
        self.article_repo.setup_constraints_and_indexes(embedding_dimension)
        
        # 캐시 초기화 (검색 성능 향상)
        self.query_cache = {}
    
    def search(self, query_text: str, top_k: int = 8) -> List[SearchResult]:
        """질의에 대한 관련 컨텍스트 검색 - 최적화된 버전
        
        Args:
            query_text: 검색 질의 텍스트
            top_k: 반환할 최대 결과 수
            
        Returns:
            검색 결과 리스트
        """
        start_time = time.time()
        
        # 캐시 검사 (동일 쿼리 반복 실행 최적화)
        cache_key = f"{query_text}_{top_k}"
        if cache_key in self.query_cache:
            print(f"Cache hit for query: {query_text[:30]}...")
            return self.query_cache[cache_key]
        
        try:
            # 임베딩 생성
            query_embedding = self.embedding_model.embed_query(query_text)
            
            # 키워드 추출
            keywords = TextProcessor.extract_keywords(query_text)
            
            # 법조항 검색 (그래프 관계 활용)
            article_results = self.article_repo.search_articles_related_to_precedents(
                query_embedding, 
                top_k=top_k
            )
            
            # 관련 판례 검색
            precedent_results = []
            
            # 상위 법조항에 대해 판례 검색 (병렬 처리)
            if article_results:
                async def fetch_related_precedents():
                    tasks = []
                    for article in article_results[:3]:  # 상위 3개 법조항만
                        tasks.append(self._async_search_precedents(article.id, query_embedding, keywords))
                    
                    all_results = await asyncio.gather(*tasks)
                    return [precedent for sublist in all_results for precedent in sublist]
                
                # 비동기 실행
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                try:
                    precedent_results = loop.run_until_complete(fetch_related_precedents())
                finally:
                    loop.close()
            
            # 결과가 충분하지 않으면 직접 벡터 검색 추가
            if len(precedent_results) < 2:
                direct_precedent_results = self.precedent_repo.search_precedents_by_vector(
                    query_embedding,
                    limit=3
                )
                
                # 중복 제거하며 추가
                for precedent in direct_precedent_results:
                    if not any(p.id == precedent.id for p in precedent_results):
                        precedent_results.append(precedent)
            
            # 모든 결과 통합 및 점수 기준 정렬
            all_results = article_results + precedent_results
            all_results.sort(key=lambda x: x.score, reverse=True)
            
            # 최종 결과 선택
            final_results = all_results[:top_k]
            
            # 결과 캐싱
            self.query_cache[cache_key] = final_results
            
            end_time = time.time()
            print(f"Search completed in {end_time - start_time:.2f} seconds")
            
            return final_results
            
        except Exception as e:
            print(f"Error in graph search: {e}")
            
            # 백업: 기본 벡터 검색
            try:
                # 임베딩 생성 (재시도)
                query_embedding = self.embedding_model.embed_query(query_text)
                
                # 간단한 벡터 검색
                article_results = self.article_repo.search_articles_by_vector(
                    query_embedding, 
                    limit=top_k//2
                )
                
                precedent_results = self.precedent_repo.search_precedents_by_vector(
                    query_embedding,
                    limit=top_k//2
                )
                
                # 결과 통합 및 정렬
                all_results = article_results + precedent_results
                all_results.sort(key=lambda x: x.score, reverse=True)
                
                print("Fallback search completed successfully")
                return all_results[:top_k]
                
            except Exception as e2:
                print(f"Fallback search failed: {e2}")
                return []
    
    async def _async_search_precedents(self, article_id: str, query_embedding: List[float], keywords: List[str]) -> List[SearchResult]:
        """법조항 관련 판례 비동기 검색 (병렬 처리용)"""
        return self.precedent_repo.search_precedents_by_article(
            article_id,
            query_embedding,
            query_keywords=keywords,
            limit=2
        )
    
    def close(self) -> None:
        """리소스 정리"""
        self.article_repo.close()
        # 캐시 정리
        self.query_cache.clear()
        print("Graph search service resources released.")

print("그래프 검색 서비스 클래스 정의 완료")

그래프 검색 서비스 클래스 정의 완료


In [9]:
# 메인 파이프라인
# ============

class LegalRAGPipeline:
    """법률 RAG 파이프라인 - 최적화된 버전"""
    
    def __init__(self, 
                 openai_api_key: str, 
                 neo4j_uri: str, 
                 neo4j_username: str, 
                 neo4j_password: str):
        """파이프라인 초기화
        
        Args:
            openai_api_key: OpenAI API 키
            neo4j_uri: Neo4j 서버 URI
            neo4j_username: Neo4j 사용자명
            neo4j_password: Neo4j 비밀번호
        """
        self.openai_client = OpenAI(api_key=openai_api_key)
        self.openai_api_key = openai_api_key
        
        # 더 효율적인 임베딩 설정 (batch 지원)
        self.embedding_model = OpenAIEmbeddings(
            model='text-embedding-3-small', 
            api_key=openai_api_key,
            openai_api_key=openai_api_key,
            chunk_size=100  # API 호출 배치 크기
        )
        self.embedding_dimension = 1536  # text-embedding-3-small 차원
        
        self.search_service = GraphSearchService(
            neo4j_uri,
            neo4j_username,
            neo4j_password,
            self.embedding_model,
            self.embedding_dimension
        )
        
        self.text_processor = TextProcessor()
        
        # LLM 모델 설정
        self.llm = ChatOpenAI(
            model_name="gpt-4o-mini",
            openai_api_key=openai_api_key,
            temperature=0.1  # 예측 안정성 향상
        )
    
    def load_data(self, pdf_path: str, precedent_dir: str, precedent_limit: int = 1000) -> Tuple[Dict[str, Article], List[Precedent]]:
        """데이터 로드 및 그래프 구축 - 최적화된 버전
        
        Args:
            pdf_path: 법조항 PDF 경로
            precedent_dir: 판례 디렉토리 경로
            precedent_limit: 최대 로드할 판례 수
            
        Returns:
            (법조항 딕셔너리, 판례 리스트)
        """
        print("1. 데이터 로드 시작...")
        
        # 데이터 로드
        articles = DataLoader.load_articles_from_pdf(pdf_path)
        precedents = DataLoader.load_precedents_from_json(precedent_dir, limit=precedent_limit)
        
        print(f"2. 데이터 로드 완료: {len(articles)} 법조항, {len(precedents)} 판례")
        
        # 임베딩 생성 (배치 처리)
        print("3. 법조항 임베딩 생성 시작...")
        
        # 임베딩 함수를 정의
        def generate_embeddings_batch(items, is_articles=True):
            texts = []
            item_ids = []
            
            # 텍스트 수집
            for item_id, item in items:
                if is_articles:
                    texts.append(item.text)
                else:
                    text_to_embed = item.full_summary or item.judgment_summary
                    if text_to_embed:
                        texts.append(text_to_embed)
                        item_ids.append(item_id)
            
            # 빈 텍스트 확인
            if not texts:
                return
                
            # 배치 임베딩 수행
            embeddings = self.embedding_model.embed_documents(texts)
            
            # 결과 매핑
            for i, (item_id, embedding) in enumerate(zip(item_ids, embeddings)):
                if is_articles:
                    articles[item_id].embedding = embedding
                else:
                    precedents[item_id].embedding = embedding
        
        # 법조항 배치 처리 (500개 단위)
        article_items = list(articles.items())
        batch_size = 100
        
        for i in range(0, len(article_items), batch_size):
            batch = article_items[i:i+batch_size]
            generate_embeddings_batch(batch, is_articles=True)
            if (i + batch_size) % (batch_size * 5) == 0:
                print(f"  법조항 임베딩 진행중: {i + batch_size}/{len(article_items)}...")
        
        print("4. 판례 임베딩 생성 시작...")
        
        # 판례 배치 처리
        precedent_items = [(i, p) for i, p in enumerate(precedents)]
        for i in range(0, len(precedent_items), batch_size):
            batch = precedent_items[i:i+batch_size]
            generate_embeddings_batch(batch, is_articles=False)
            if (i + batch_size) % (batch_size * 5) == 0:
                print(f"  판례 임베딩 진행중: {i + batch_size}/{len(precedent_items)}...")
        
        print("5. 임베딩 생성 완료")
        
        # 그래프 구축
        print("6. Neo4j 그래프 구축 시작...")
        
        article_repo = ArticleRepository(
            self.search_service.article_repo.uri,
            self.search_service.article_repo.username,
            self.search_service.article_repo.password
        )
        article_repo.connect()
        article_count = article_repo.create_bulk_articles(list(articles.values()))
        
        precedent_repo = PrecedentRepository(
            self.search_service.precedent_repo.uri,
            self.search_service.precedent_repo.username,
            self.search_service.precedent_repo.password
        )
        precedent_repo.connect()
        precedent_count = precedent_repo.create_bulk_precedents(precedents)
        
        # 연결 종료
        article_repo.close()
        precedent_repo.close()
        
        print(f"7. 그래프 구축 완료: {article_count} 법조항, {precedent_count} 판례 노드 생성")
        
        return articles, precedents
    
    def create_batch_requests(self, questions_df: pd.DataFrame) -> List[Dict[str, Any]]:
        """배치 API 요청 생성 - 최적화된 버전
        
        Args:
            questions_df: 질문 데이터프레임
            
        Returns:
            배치 요청 리스트
        """
        batch_requests = []
        
        # 토큰 제한을 고려한 최대 문맥 길이
        MAX_CONTEXT_LENGTH = 5000
        
        # 모든 질문에 대해 RAG 검색 실행
        retrieved_contexts = {}
        
        print("RAG 검색으로 문맥 검색 시작...")
        for idx, row in tqdm(questions_df.iterrows(), total=len(questions_df), desc="문맥 검색"):
            question = row['question']
            try:
                # RAG 검색으로 문맥 가져오기
                search_results = self.search_service.search(question, top_k=8)
                retrieved_contexts[idx] = search_results
            except Exception as e:
                print(f"Error in RAG search for question {idx}: {e}")
                retrieved_contexts[idx] = []
        print("RAG 검색 완료")
        
        # 형법 전문가 시스템 프롬프트
        system_prompt = """당신은 법률 분야, 특히 한국 형법 전문가입니다. 주어진 질문을 해결하기 위해 다음 단계를 따르세요:

1. 문제의 핵심 쟁점 파악 - 구성요건, 위법성, 책임 중 어디에 해당하는지 
2. 관련 법조항 분석 - 제시된 법조항의 요건과 효과 정확히 파악
3. 판례 법리 적용 - 유사 판례의 해석론 적용
4. 체계적 분석 - 형법 해석의 기본 원칙에 따라 단계별 분석

답변은 A, B, C, D 중 하나만 선택하며, 형법의 정확한 해석과 적용에 근거하여 결정하세요. 확신할 수 없더라도 제공된 문맥을 토대로 가장 정확한 답변을 선택해야 합니다."""
        
        # 배치 요청 준비
        print("배치 요청 준비 시작...")
        for idx, row in tqdm(questions_df.iterrows(), total=len(questions_df), desc="배치 요청 준비"):
            question = row['question']
            options = {
                'A': row['A'],
                'B': row['B'], 
                'C': row['C'],
                'D': row['D']
            }
            
            # 검색된 문맥 가져오기
            search_results = retrieved_contexts.get(idx, [])
            
            # 최적화된 컨텍스트 구성
            if search_results:
                context_str = self.text_processor.format_context(search_results, question)
                # 문맥이 너무 길면 잘라내기
                if len(context_str) > MAX_CONTEXT_LENGTH:
                    context_str = context_str[:MAX_CONTEXT_LENGTH] + "... (문맥이 너무 길어 일부 생략됨)"
            else:
                context_str = "관련 문맥 정보가 없습니다. 주어진 질문과 선택지만으로 판단하세요."
            
            # 향상된 프롬프트 작성
            prompt = f"""다음은 한국 형법에 관한 객관식 문제입니다. 제공된 문맥 정보를 참고하여 가장 적절한 답변을 선택하세요.

## 질문
{question}

## 선택지
A. {options['A']}
B. {options['B']}
C. {options['C']}
D. {options['D']}

## 관련 문맥 정보
{context_str}

## 분석 단계
1. 문제 유형 파악: 구성요건/위법성/책임/기타 중 어떤 문제인지 결정
2. 해당 법조항 분석: 문맥에서 제공된 법조항의 요건과 효과를 정확히 이해
3. 판례 검토: 유사한 판례의 법리 원칙을 식별하고 적용
4. 선택지 분석: 각 선택지가 왜 맞는지 또는 틀린지 법적 근거를 들어 설명
5. 최종 선택: 가장 정확한 선택지 하나 결정

## 최종 답변
형법 원칙에 따라 분석한 결과, 정답은 (A/B/C/D) 입니다.
"""
            
            # 배치 요청 생성
            request = {
                "custom_id": f"q_{idx}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": prompt}
                    ],
                    "max_tokens": 400,
                    "temperature": 0.1  # 낮은 temperature로 일관된 답변 촉진
                }
            }
            
            batch_requests.append(request)
        
        print(f"배치 요청 준비 완료: 총 {len(batch_requests)}개 요청 생성")
        return batch_requests
    
    def run_batch_api(self, batch_requests: List[Dict[str, Any]], output_dir: str = "results") -> str:
        """배치 API 실행 - 최적화된 버전
        
        Args:
            batch_requests: 배치 요청 리스트
            output_dir: 결과 저장 디렉토리
            
        Returns:
            배치 출력 파일 경로
        """
        # 결과 디렉토리 생성
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # JSONL 파일로 저장
        batch_file_path = f"{output_dir}/criminal_law_batch_input_{timestamp}.jsonl"
        with open(batch_file_path, 'w', encoding='utf-8') as f:
            for request in batch_requests:
                f.write(json.dumps(request, ensure_ascii=False) + '\n')
        
        print(f"배치 파일 저장 완료: {batch_file_path} (총 {len(batch_requests)}개 요청)")
        
        # 배치 파일 업로드
        try:
            batch_input_file = self.openai_client.files.create(
                file=open(batch_file_path, "rb"),
                purpose="batch"
            )
            batch_input_file_id = batch_input_file.id
            print(f"배치 파일 업로드 완료: ID={batch_input_file_id}")
        except Exception as e:
            print(f"배치 파일 업로드 실패: {e}")
            # 실패 시 중단
            raise
        
        # 배치 작업 생성
        try:
            batch_job = self.openai_client.batches.create(
                input_file_id=batch_input_file_id,
                endpoint="/v1/chat/completions",
                completion_window="24h",
                metadata={"description": "Criminal Law benchmark evaluation"}
            )
            batch_id = batch_job.id
            print(f"배치 작업 생성 완료: ID={batch_id}")
        except Exception as e:
            print(f"배치 작업 생성 실패: {e}")
            # 실패 시 중단
            raise
        
        # 배치 작업 상태 확인 및 대기
        print("배치 작업 진행 상황 모니터링 시작...")
        start_time = time.time()
        
        # 상태 체크 간격 설정
        check_intervals = [30] * 20 + [60] * 10 + [120]  # 처음 10분: 30초, 다음 10분: 1분, 이후: 2분
        interval_idx = 0
        
        status = None
        while True:
            try:
                status = self.openai_client.batches.retrieve(batch_id)
                elapsed_time = time.time() - start_time
                
                # 진행 상황 표시
                completed = status.request_counts.completed if status.request_counts else 0
                total = status.request_counts.total if status.request_counts else 0
                
                progress = f"{completed}/{total}" if total > 0 else "N/A"
                print(f"[{datetime.now().strftime('%H:%M:%S')}] 상태: {status.status}, 진행: {progress}, 경과 시간: {elapsed_time:.2f}초")
                
                if status.status in ['completed', 'failed', 'cancelled', 'expired']:
                    break
                
                # 체크 간격 조정
                if interval_idx < len(check_intervals) - 1:
                    interval_idx += 1
                
                time.sleep(check_intervals[interval_idx])
                
            except Exception as e:
                print(f"상태 확인 중 오류 발생: {e}")
                time.sleep(60)  # 오류 발생 시 1분 대기 후 재시도
        
        end_time = time.time()
        total_time = end_time - start_time
        print(f"배치 작업 완료: 상태={status.status}, 소요 시간={total_time:.2f}초")
        
        # 작업이 성공적으로 완료된 경우 결과 처리
        output_file_path = f"{output_dir}/criminal_law_batch_output_{timestamp}.jsonl"
        
        if status.status == 'completed':
            output_file_id = status.output_file_id
            print(f"배치 결과 파일 다운로드 시작: ID={output_file_id}")
            
            # 결과 파일 다운로드
            try:
                file_response = self.openai_client.files.content(output_file_id)
                
                # 결과 파일 저장
                with open(output_file_path, 'w', encoding='utf-8') as f:
                    f.write(file_response.text)
                
                print(f"배치 결과 파일 저장 완료: {output_file_path}")
            except Exception as e:
                print(f"결과 파일 다운로드 실패: {e}")
                raise
        else:
            print(f"배치 작업이 정상적으로 완료되지 않음: 상태={status.status}")
            if hasattr(status, 'errors') and status.errors:
                print("오류 내용:")
                for error in status.errors:
                    print(f"  - {error}")
            raise Exception(f"배치 작업 실패: {status.status}")
        
        return output_file_path
    
    def evaluate_results(self, 
                         batch_output_path: str, 
                         questions_df: pd.DataFrame,
                         output_dir: str = "results") -> Dict[str, Any]:
        """결과 평가 - 최적화된 버전
        
        Args:
            batch_output_path: 배치 출력 파일 경로
            questions_df: 질문 데이터프레임
            output_dir: 결과 저장 디렉토리
            
        Returns:
            평가 결과 요약
        """
        timestamp = os.path.basename(batch_output_path).split('_')[-1].split('.')[0]
        
        print(f"배치 결과 평가 시작: {batch_output_path}")
        
        # 배치 결과 로드
        batch_results = []
        with open(batch_output_path, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip():
                    batch_results.append(json.loads(line))
        
        print(f"배치 결과 로드 완료: 총 {len(batch_results)}개 결과")
        
        # 정확도 평가
        correct_count = 0
        results_with_answers = []
        
        # 카테고리별 성능 추적을 위한 변수

In [10]:
def evaluate_results(self, 
                    batch_output_path: str, 
                    questions_df: pd.DataFrame,
                    output_dir: str = "results") -> Dict[str, Any]:
    """결과 평가 - 정교한 분석 버전
    
    Args:
        batch_output_path: 배치 출력 파일 경로
        questions_df: 질문 데이터프레임
        output_dir: 결과 저장 디렉토리
        
    Returns:
        평가 결과 요약
    """
    timestamp = os.path.basename(batch_output_path).split('_')[-1].split('.')[0]
    
    print(f"배치 결과 평가 시작: {batch_output_path}")
    
    # 배치 결과 로드
    batch_results = []
    with open(batch_output_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                batch_results.append(json.loads(line))
    
    print(f"배치 결과 로드 완료: 총 {len(batch_results)}개 결과")
    
    # 정확도 평가
    correct_count = 0
    results_with_answers = []
    
    # 카테고리별 성능 추적
    question_types = {
        '구성요건': {'correct': 0, 'total': 0},
        '위법성': {'correct': 0, 'total': 0},
        '책임': {'correct': 0, 'total': 0},
        '미수/공범': {'correct': 0, 'total': 0},
        '형벌': {'correct': 0, 'total': 0}, 
        '기타': {'correct': 0, 'total': 0}
    }
    
    # 오답 패턴 분석
    error_patterns = {
        'A로 예측했으나 실제 정답은 B': 0,
        'A로 예측했으나 실제 정답은 C': 0,
        'A로 예측했으나 실제 정답은 D': 0,
        'B로 예측했으나 실제 정답은 A': 0,
        'B로 예측했으나 실제 정답은 C': 0,
        'B로 예측했으나 실제 정답은 D': 0,
        'C로 예측했으나 실제 정답은 A': 0,
        'C로 예측했으나 실제 정답은 B': 0,
        'C로 예측했으나 실제 정답은 D': 0,
        'D로 예측했으나 실제 정답은 A': 0,
        'D로 예측했으나 실제 정답은 B': 0,
        'D로 예측했으나 실제 정답은 C': 0
    }
    
    # 신뢰도 분석 (응답의 확신도에 따른 정확도)
    confidence_levels = {
        '높음': {'correct': 0, 'total': 0},
        '중간': {'correct': 0, 'total': 0},
        '낮음': {'correct': 0, 'total': 0}
    }
    
    # 결과 분석을 위한 정규 표현식 패턴
    confidence_high = re.compile(r'(확실|분명|명백|틀림없|확신|100%|매우 높|강한 확신)')
    confidence_low = re.compile(r'(불확실|어려운|애매|모호|낮은 확신|불명확|확신할 수 없)')
    
    # 질문 유형 분류 패턴
    type_patterns = {
        '구성요건': re.compile(r'(구성요건|범죄의?\s*성립|성립요건|행위 태양|주체|객체|행위|결과)'),
        '위법성': re.compile(r'(위법성|정당방위|긴급피난|자구행위|피해자동의|의사)'),
        '책임': re.compile(r'(책임|책임능력|원인에서 자유로운 행위|강요된 행위|기대가능성|심신|정상|초법규)'),
        '미수/공범': re.compile(r'(미수|공범|예비|음모|중지|불능|교사|방조|종범|공동정범|간접정범)'),
        '형벌': re.compile(r'(형벌|처벌|양형|누범|선고유예|집행유예|가중|감경)')
    }
    
    for result in batch_results:
        custom_id = result['custom_id']
        idx = int(custom_id.split('_')[1])
        
        if result.get('error') is not None:
            print(f"Error in result {custom_id}: {result['error']}")
            continue
        
        try:
            response_text = result['response']['body']['choices'][0]['message']['content'].strip()
            
            # 응답에서 답변 추출 (A, B, C, D 중 하나)
            predicted_answer = self.text_processor.extract_answer(response_text)
            
            if predicted_answer is None:
                print(f"Could not extract answer from response for question {idx}: {response_text[:100]}...")
                continue
            
            # 정답과 비교 (CSV에서는 1-indexed, 1=A, 2=B, 3=C, 4=D)
            correct_answer = chr(64 + questions_df.iloc[idx]['answer'])  # 1->A, 2->B, 3->C, 4->D
            is_correct = (predicted_answer == correct_answer)
            
            if is_correct:
                correct_count += 1
            else:
                # 오답 패턴 추적
                error_key = f'{predicted_answer}로 예측했으나 실제 정답은 {correct_answer}'
                if error_key in error_patterns:
                    error_patterns[error_key] += 1
            
            # 질문 유형 분류
            question_type = '기타'
            question_text = questions_df.iloc[idx]['question'].lower()
            
            for type_name, pattern in type_patterns.items():
                if pattern.search(question_text):
                    question_type = type_name
                    break
                    
            # 유형별 정확도 추적
            if question_type in question_types:
                question_types[question_type]['total'] += 1
                if is_correct:
                    question_types[question_type]['correct'] += 1
            
            # 신뢰도 분석
            confidence_level = '중간'  # 기본값
            if confidence_high.search(response_text):
                confidence_level = '높음'
            elif confidence_low.search(response_text):
                confidence_level = '낮음'
                
            confidence_levels[confidence_level]['total'] += 1
            if is_correct:
                confidence_levels[confidence_level]['correct'] += 1
            
            # 전체 결과 수집
            results_with_answers.append({
                'question_id': idx,
                'question': questions_df.iloc[idx]['question'],
                'predicted': predicted_answer,
                'actual': correct_answer,
                'is_correct': is_correct,
                'confidence': confidence_level,
                'question_type': question_type,
                'response': response_text
            })
        except Exception as e:
            print(f"Error processing result for question {idx}: {e}")
    
    accuracy = correct_count / len(results_with_answers) if results_with_answers else 0
    print(f"정확도: {accuracy:.4f} ({correct_count}/{len(results_with_answers)})")
    
    # 유형별 정확도 계산
    type_accuracies = {}
    for q_type, stats in question_types.items():
        if stats['total'] > 0:
            type_accuracies[q_type] = stats['correct'] / stats['total']
            print(f"{q_type} 유형 정확도: {type_accuracies[q_type]:.4f} ({stats['correct']}/{stats['total']})")
    
    # 신뢰도 수준별 정확도 계산
    confidence_accuracies = {}
    for conf_level, stats in confidence_levels.items():
        if stats['total'] > 0:
            confidence_accuracies[conf_level] = stats['correct'] / stats['total']
            print(f"{conf_level} 신뢰도 응답 정확도: {confidence_accuracies[conf_level]:.4f} ({stats['correct']}/{stats['total']})")
    
    # 결과를 CSV 파일로 저장
    results_df = pd.DataFrame(results_with_answers)
    results_file = f"{output_dir}/criminal_law_results_{timestamp}.csv"
    results_df.to_csv(results_file, index=False)
    print(f"상세 결과 CSV 파일 저장 완료: {results_file}")
    
    # 결과 시각화 및 분석 차트 생성
    if len(results_with_answers) > 0:
        # 결과 시각화 디렉토리 생성
        viz_dir = f"{output_dir}/visualizations"
        os.makedirs(viz_dir, exist_ok=True)
        
        # 1. 혼동 행렬 시각화
        plt.figure(figsize=(10, 8))
        confusion_data = {
            'A': {'A': 0, 'B': 0, 'C': 0, 'D': 0},
            'B': {'A': 0, 'B': 0, 'C': 0, 'D': 0},
            'C': {'A': 0, 'B': 0, 'C': 0, 'D': 0},
            'D': {'A': 0, 'B': 0, 'C': 0, 'D': 0}
        }
        
        for result in results_with_answers:
            confusion_data[result['actual']][result['predicted']] += 1
            
        conf_matrix = np.array([
            [confusion_data['A']['A'], confusion_data['A']['B'], confusion_data['A']['C'], confusion_data['A']['D']],
            [confusion_data['B']['A'], confusion_data['B']['B'], confusion_data['B']['C'], confusion_data['B']['D']],
            [confusion_data['C']['A'], confusion_data['C']['B'], confusion_data['C']['C'], confusion_data['C']['D']],
            [confusion_data['D']['A'], confusion_data['D']['B'], confusion_data['D']['C'], confusion_data['D']['D']]
        ])
        
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                   xticklabels=['A', 'B', 'C', 'D'],
                   yticklabels=['A', 'B', 'C', 'D'])
        plt.title('혼동 행렬 (Confusion Matrix)')
        plt.xlabel('예측 답변')
        plt.ylabel('실제 정답')
        plt.tight_layout()
        confusion_file = f"{viz_dir}/confusion_matrix_{timestamp}.png"
        plt.savefig(confusion_file)
        plt.close()
        
        # 2. 질문 유형별 정확도 시각화
        plt.figure(figsize=(12, 6))
        types = []
        accs = []
        counts = []
        
        for q_type, stats in question_types.items():
            if stats['total'] > 0:
                types.append(q_type)
                accs.append(stats['correct'] / stats['total'])
                counts.append(stats['total'])
        
        # 카운트에 비례하는 막대 너비 설정
        max_count = max(counts) if counts else 1
        widths = [0.3 + 0.7 * (count / max_count) for count in counts]
        
        # 막대 그래프 생성, 높이는 정확도, 너비는 카운트에 비례
        for i, (q_type, acc, width) in enumerate(zip(types, accs, widths)):
            plt.barh(i, acc, height=width, color='skyblue')
            plt.text(acc + 0.02, i, f'{acc:.3f} ({question_types[q_type]["correct"]}/{question_types[q_type]["total"]})')
        
        plt.yticks(range(len(types)), types)
        plt.xlabel('정확도')
        plt.title('질문 유형별 정확도')
        plt.xlim(0, 1.1)
        plt.grid(axis='x', linestyle='--', alpha=0.7)
        plt.tight_layout()
        type_acc_file = f"{viz_dir}/type_accuracy_{timestamp}.png"
        plt.savefig(type_acc_file)
        plt.close()
        
        # 3. 신뢰도별 정확도 시각화
        plt.figure(figsize=(10, 6))
        confs = []
        conf_accs = []
        conf_counts = []
        
        for conf, stats in confidence_levels.items():
            if stats['total'] > 0:
                confs.append(conf)
                conf_accs.append(stats['correct'] / stats['total'])
                conf_counts.append(stats['total'])
        
        bars = plt.bar(confs, conf_accs, color=['green', 'yellow', 'red'])
        plt.title('신뢰도 수준별 정확도')
        plt.ylabel('정확도')
        plt.ylim(0, 1)
        
        # 막대 위에 카운트 표시
        for i, (bar, count, acc) in enumerate(zip(bars, conf_counts, conf_accs)):
            plt.text(bar.get_x() + bar.get_width()/2, acc + 0.02, 
                   f'{acc:.3f}\n({count}개)', 
                   ha='center', va='bottom')
            
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        conf_acc_file = f"{viz_dir}/confidence_accuracy_{timestamp}.png"
        plt.savefig(conf_acc_file)
        plt.close()
        
        print(f"분석 결과 시각화 파일 저장 완료: {viz_dir}/")
    
    # 결과 요약 정보
    summary = {
        'timestamp': timestamp,
        'total_questions': len(questions_df),
        'processed_questions': len(results_with_answers),
        'correct_answers': correct_count,
        'accuracy': accuracy,
        'type_accuracies': type_accuracies,
        'confidence_accuracies': confidence_accuracies,
        'error_patterns': error_patterns,
        'results_file': results_file,
        'batch_output_file': batch_output_path,
        'visualization_files': {
            'confusion_matrix': confusion_file if len(results_with_answers) > 0 else None,
            'type_accuracy': type_acc_file if len(results_with_answers) > 0 else None,
            'confidence_accuracy': conf_acc_file if len(results_with_answers) > 0 else None
        }
    }
    
    # 요약 파일 저장
    summary_file = f"{output_dir}/criminal_law_benchmark_summary_{timestamp}.json"
    with open(summary_file, 'w', encoding='utf-8') as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)
    
    print(f"벤치마크 평가 완료. 최종 정확도: {accuracy:.4f}")
    print(f"결과 요약 파일: {summary_file}")
    
    return summary

In [11]:
def run_pipeline(self, 
                questions_csv_path: str, 
                output_dir: str = "results",
                skip_data_loading: bool = True) -> Dict[str, Any]:
    """전체 파이프라인 실행 - 통합 최적화 버전
    
    Args:
        questions_csv_path: 질문 CSV 파일 경로
        output_dir: 결과 저장 디렉토리
        skip_data_loading: 데이터 로딩 단계 건너뛰기 여부
        
    Returns:
        평가 결과 요약
    """
    start_time = time.time()
    
    print("=== 한국 형법 RAG 파이프라인 실행 시작 ===")
    print(f"시작 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    # 데이터 로드 (선택적)
    if not skip_data_loading:
        print("\n=== 단계 1: 데이터 로드 및 그래프 구축 ===")
        pdf_path = './dataset/criminal-law.pdf'
        precedent_dir = './dataset/precedent_label/'
        
        if not os.path.exists(pdf_path):
            print(f"경고: {pdf_path} 파일이 존재하지 않습니다!")
            return {"error": f"파일을 찾을 수 없음: {pdf_path}"}
            
        if not os.path.exists(precedent_dir):
            print(f"경고: {precedent_dir} 디렉토리가 존재하지 않습니다!")
            return {"error": f"디렉토리를 찾을 수 없음: {precedent_dir}"}
            
        self.load_data(pdf_path, precedent_dir)
    else:
        print("\n=== 단계 1: 데이터 로드 건너뛰기 (skip_data_loading=True) ===")
    
    # 질문 로드
    try:
        print("\n=== 단계 2: 질문 데이터 로드 ===")
        questions_df = pd.read_csv(questions_csv_path)
        print(f"CSV 파일에서 {len(questions_df)} 개의 질문 로드 완료")
        
        # 데이터 정합성 확인
        required_columns = ['question', 'answer', 'A', 'B', 'C', 'D']
        missing_columns = [col for col in required_columns if col not in questions_df.columns]
        
        if missing_columns:
            error_msg = f"CSV 파일 형식 오류: 다음 열이 없습니다: {', '.join(missing_columns)}"
            print(f"오류: {error_msg}")
            return {"error": error_msg}
            
        # 데이터 유효성 검사
        if questions_df['answer'].min() < 1 or questions_df['answer'].max() > 4:
            print("경고: 'answer' 열의 값이 유효하지 않습니다. 값은 1-4 사이여야 합니다.")
            
        # 진행률 출력을 위한 전체 단계 수
        total_steps = 3 if skip_data_loading else 4
        current_step = 2
        
    except Exception as e:
        print(f"질문 데이터 로드 실패: {e}")
        return {"error": f"CSV 로드 오류: {str(e)}"}
    
    try:
        # 배치 요청 생성
        print(f"\n=== 단계 {current_step + 1}/{total_steps}: 배치 요청 생성 및 실행 ===")
        batch_requests = self.create_batch_requests(questions_df)
        
        # 배치 API 실행
        batch_output_path = self.run_batch_api(batch_requests, output_dir)
        current_step += 1
        
        # 결과 평가
        print(f"\n=== 단계 {current_step + 1}/{total_steps}: 결과 평가 및 분석 ===")
        summary = self.evaluate_results(batch_output_path, questions_df, output_dir)
        
        # 리소스 정리
        print("\n=== 리소스 정리 중... ===")
        self.search_service.close()
        
        end_time = time.time()
        total_time = end_time - start_time
        
        # 총 실행 시간 추가
        summary['execution_time_seconds'] = total_time
        summary['execution_time_formatted'] = f"{total_time // 3600:.0f}시간 {(total_time % 3600) // 60:.0f}분 {total_time % 60:.2f}초"
        
        print(f"\n=== 파이프라인 실행 완료 ===")
        print(f"총 실행 시간: {summary['execution_time_formatted']}")
        print(f"정확도: {summary['accuracy']:.4f} ({summary['correct_answers']}/{summary['processed_questions']})")
        
        return summary
        
    except Exception as e:
        import traceback
        print(f"파이프라인 실행 중 오류 발생: {e}")
        traceback.print_exc()
        
        end_time = time.time()
        total_time = end_time - start_time
        
        return {
            "error": str(e),
            "execution_time_seconds": total_time,
            "execution_time_formatted": f"{total_time // 3600:.0f}시간 {(total_time % 3600) // 60:.0f}분 {total_time % 60:.2f}초"
        }

In [13]:
# 메인 실행 함수
# ===========

def main():
    """메인 실행 함수 - 개선된 버전"""
    print("한국 형법 RAG 에이전트 시스템 시작하는 중...")
    
    # 환경 변수 로드
    load_dotenv()
    
    # 환경 설정
    openai_api_key = os.getenv("OPENAI_API_KEY")
    neo4j_uri = os.getenv("NEO4J_URI")
    neo4j_username = os.getenv("NEO4J_USERNAME")
    neo4j_password = os.getenv("NEO4J_PASSWORD")
    
    # 필수 환경 변수 확인
    missing_vars = []
    if not openai_api_key:
        missing_vars.append("OPENAI_API_KEY")
    if not neo4j_uri:
        missing_vars.append("NEO4J_URI")
    if not neo4j_username:
        missing_vars.append("NEO4J_USERNAME")
    if not neo4j_password:
        missing_vars.append("NEO4J_PASSWORD")
        
    if missing_vars:
        print(f"오류: 다음 환경 변수가 설정되지 않았습니다: {', '.join(missing_vars)}")
        print("프로그램을 실행하기 전 .env 파일에 모든 환경 변수를 설정해주세요.")
        return
    
    # 필요한 디렉토리 구조 확인
    os.makedirs("dataset", exist_ok=True)
    os.makedirs("results", exist_ok=True)
    
    # 입력 파일 확인
    questions_csv_path = './dataset/Criminal-Law-test.csv'
    if not os.path.exists(questions_csv_path):
        print(f"오류: 질문 파일을 찾을 수 없습니다: {questions_csv_path}")
        print("dataset 디렉토리에 CriminalLawtest.csv 파일이 있는지 확인하세요.")
        return
    
    # 판례 데이터와 형법 PDF 파일 확인
    pdf_path = './dataset/criminal-law.pdf'
    precedent_dir = './dataset/precedent_label/'
    
    skip_data_loading = True  # 기본값
    
    if not os.path.exists(pdf_path):
        print(f"경고: 형법 PDF 파일을 찾을 수 없습니다: {pdf_path}")
        print("PDF 파일이 없으면 데이터 로딩 단계를 건너뛸 수 없습니다.")
    
    if not os.path.exists(precedent_dir) or not os.path.isdir(precedent_dir):
        print(f"경고: 판례 디렉토리를 찾을 수 없습니다: {precedent_dir}")
        print("판례 디렉토리가 없으면 데이터 로딩 단계를 건너뛸 수 없습니다.")
    
    if not os.path.exists(pdf_path) or not os.path.exists(precedent_dir):
        # 사용자에게 데이터 로딩 여부 확인
        user_input = input("데이터 파일이 없습니다. 그래도 진행하시겠습니까? (데이터가 이미 Neo4j에 로드되어 있다면 'y' 입력): ")
        skip_data_loading = user_input.lower() == 'y'
        
        if not skip_data_loading:
            print("프로그램을 종료합니다. 필요한 데이터 파일을 준비한 후 다시 실행하세요.")
            return
    
    try:
        # 파이프라인 생성 및 실행
        print("파이프라인 초기화 중...")
        pipeline = LegalRAGPipeline(
            openai_api_key,
            neo4j_uri,
            neo4j_username,
            neo4j_password
        )
        
        # 데이터 설정
        output_dir = "results"
        
        # 파이프라인 실행
        summary = pipeline.run_pipeline(
            questions_csv_path,
            output_dir,
            skip_data_loading=skip_data_loading
        )
        
        # 오류 확인
        if "error" in summary:
            print(f"\n실행 중 오류가 발생했습니다: {summary['error']}")
            if "execution_time_formatted" in summary:
                print(f"총 실행 시간: {summary['execution_time_formatted']}")
            return
        
        # 결과 요약 출력
        print("\n=== 파이프라인 실행 요약 ===")
        print(f"총 질문 수: {summary['total_questions']}")
        print(f"처리된 질문 수: {summary['processed_questions']}")
        print(f"정답 수: {summary['correct_answers']}")
        print(f"정확도: {summary['accuracy']:.4f}")
        print(f"총 실행 시간: {summary['execution_time_formatted']}")
        print(f"결과 파일: {summary['results_file']}")
        print(f"배치 출력 파일: {summary['batch_output_file']}")
        
        # 유형별 정확도 출력
        if 'type_accuracies' in summary:
            print("\n=== 질문 유형별 정확도 ===")
            for q_type, acc in summary['type_accuracies'].items():
                print(f"{q_type}: {acc:.4f}")
                
        print("\n작업이 성공적으로 완료되었습니다!")
        
    except Exception as e:
        import traceback
        print(f"프로그램 실행 중 예상치 못한 오류 발생: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    main()

한국 형법 RAG 에이전트 시스템 시작하는 중...
파이프라인 초기화 중...
프로그램 실행 중 예상치 못한 오류 발생: 1 validation error for OpenAIEmbeddings
openai_api_key
  Extra inputs are not permitted [type=extra_forbidden, input_value='sk-proj-xc7byHaIfHgPJpCb...QExXYBRZuFZonNzN9O1y2cA', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/extra_forbidden


Traceback (most recent call last):
  File "/var/folders/mh/1w84fr7s5kxcwc2l24qrjjwc0000gn/T/ipykernel_22028/898157036.py", line 70, in main
    pipeline = LegalRAGPipeline(
  File "/var/folders/mh/1w84fr7s5kxcwc2l24qrjjwc0000gn/T/ipykernel_22028/1719764201.py", line 24, in __init__
    self.embedding_model = OpenAIEmbeddings(
  File "/opt/anaconda3/envs/venv/lib/python3.9/site-packages/pydantic/main.py", line 253, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for OpenAIEmbeddings
openai_api_key
  Extra inputs are not permitted [type=extra_forbidden, input_value='sk-proj-xc7byHaIfHgPJpCb...QExXYBRZuFZonNzN9O1y2cA', input_type=str]
    For further information visit https://errors.pydantic.dev/2.11/v/extra_forbidden


In [None]:
# Jupyter 노트북에서 대화형으로 실행하기 위한 코드

def run_benchmark_interactively():
    """Jupyter 노트북에서 대화형으로 벤치마크 실행"""
    # 환경 변수 로드
    load_dotenv()
    
    # 환경 설정
    openai_api_key = os.getenv("OPENAI_API_KEY")
    neo4j_uri = os.getenv("NEO4J_URI")
    neo4j_username = os.getenv("NEO4J_USERNAME")
    neo4j_password = os.getenv("NEO4J_PASSWORD")
    
    # 필수 환경 변수 확인
    missing_vars = []
    if not openai_api_key:
        missing_vars.append("OPENAI_API_KEY")
    if not neo4j_uri:
        missing_vars.append("NEO4J_URI")
    if not neo4j_username:
        missing_vars.append("NEO4J_USERNAME")
    if not neo4j_password:
        missing_vars.append("NEO4J_PASSWORD")
        
    if missing_vars:
        print(f"오류: 다음 환경 변수가 설정되지 않았습니다: {', '.join(missing_vars)}")
        print("노트북을 실행하기 전 .env 파일에 모든 환경 변수를 설정해주세요.")
        return None
    
    # 파이프라인 초기화
    pipeline = LegalRAGPipeline(
        openai_api_key,
        neo4j_uri,
        neo4j_username,
        neo4j_password
    )
    
    print("한국 형법 RAG 에이전트 시스템이 준비되었습니다.")
    return pipeline

# 파이프라인 객체 생성
pipeline = run_benchmark_interactively()

if pipeline:
    # 데이터 파일 확인 및 경로 설정
    questions_csv_path = './dataset/CriminalLawtest.csv'
    pdf_path = './dataset/criminal-law.pdf'
    precedent_dir = './dataset/precedent_label/'
    
    files_exist = True
    if not os.path.exists(questions_csv_path):
        print(f"경고: 질문 파일을 찾을 수 없습니다: {questions_csv_path}")
        files_exist = False
    if not os.path.exists(pdf_path):
        print(f"경고: 형법 PDF 파일을 찾을 수 없습니다: {pdf_path}")
    if not os.path.exists(precedent_dir) or not os.path.isdir(precedent_dir):
        print(f"경고: 판례 디렉토리를 찾을 수 없습니다: {precedent_dir}")
    
    if files_exist:
        print("\n다음 중 실행할 작업을 선택하세요:")
        print("1. 전체 파이프라인 실행 (데이터 로드 포함)")
        print("2. 데이터 로드 건너뛰고 벤치마크만 실행 (Neo4j에 이미 데이터가 있는 경우)")
        print("3. 특정 질문에 대해 RAG 테스트 실행")
        
        # Jupyter에서는 화면에 직접 출력하고, 다음 셀에서 사용자 입력을 받습니다.
        print("\n다음 셀에서 선택한 옵션에 해당하는 코드를 실행하세요.")
    else:
        print("\n필요한 파일이 없습니다. 파일을 준비한 후 다시 시도하세요.")

In [15]:
# 옵션 2: 데이터 로드 건너뛰고 벤치마크만 실행
def run_benchmark_only():
    if not pipeline:
        print("파이프라인이 초기화되지 않았습니다.")
        return
        
    questions_csv_path = './dataset/CriminalLawtest.csv'
    output_dir = "results"
    
    print("=== 벤치마크만 실행 시작 (데이터 로드 건너뜀) ===")
    summary = pipeline.run_pipeline(
        questions_csv_path,
        output_dir,
        skip_data_loading=True  # 데이터 로드 건너뜀
    )
    
    if "error" in summary:
        print(f"오류 발생: {summary['error']}")
    else:
        print("\n=== 실행 결과 요약 ===")
        print(f"정확도: {summary['accuracy']:.4f} ({summary['correct_answers']}/{summary['processed_questions']})")
        print(f"총 실행 시간: {summary['execution_time_formatted']}")
        
    return summary

# 옵션 2 실행
summary = run_benchmark_only()  # 실행하려면 주석 해제

NameError: name 'pipeline' is not defined