# Part 6: Text2Cypher + 하이브리드 RAG 파이프라인

**소요시간**: 약 2시간  
**목표**: 자연어 질문 → Cypher 쿼리 자동 생성 → 자가 교정 → 하이브리드 검색 통합  
**도메인**: 제조 (브레이크패드 생산 공정)

---

## 학습 내용

| 섹션 | 내용 | 핵심 기술 |
|------|------|----------|
| 1 | 환경 설정 | LangChain + Neo4j 연결 |
| 2 | 기본 Text2Cypher | GraphCypherQAChain |
| 3 | Few-shot 정확도 향상 | 예시 기반 프롬프트 |
| 4 | Agent 기반 자가 수정 | 6단계 검증 + 재시도 |
| 5 | 하이브리드 검색 | Vector + Graph + RRF |
| 6 | 전체 파이프라인 통합 | 의도 분류 → 검색 → 답변 |
| 7 | 연습 문제 | 실전 과제 |

---

## 1. 환경 설정

LangChain, Neo4j 연결을 설정하고 기존 KG 스키마를 로드합니다.

In [None]:
# 필수 패키지 설치 확인
# pip install langchain-neo4j langchain-openai openai neo4j python-dotenv pandas

In [None]:
import os
import json
import time
from dotenv import load_dotenv
from openai import OpenAI
from neo4j import GraphDatabase
import pandas as pd

# .env 파일에서 환경변수 로드
load_dotenv()

# 환경변수
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USERNAME", "neo4j")
NEO4J_PASS = os.getenv("NEO4J_PASSWORD", "graphrag2024")

# OpenAI 클라이언트
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Neo4j 드라이버
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS))

# 연결 테스트
with driver.session() as session:
    result = session.run("RETURN 1 AS test")
    print(f"Neo4j 연결 성공: {result.single()['test']}")

print(f"OpenAI API 키 설정 완료: {OPENAI_API_KEY[:8]}...")

In [None]:
# ============================================================
# LangChain Neo4j 그래프 연결 + 스키마 자동 추출
# ============================================================

from langchain_neo4j import Neo4jGraph

graph = Neo4jGraph(
    url=NEO4J_URI,
    username=NEO4J_USER,
    password=NEO4J_PASS
)

# 스키마 자동 추출
graph.refresh_schema()

print("=== Neo4j 그래프 스키마 ===")
print(graph.schema)
print("\n스키마 로드 완료")

In [None]:
# ============================================================
# 기존 데이터 확인 (Part 5에서 구축한 KG)
# Part 5를 실행하지 않았다면, 아래 셀에서 샘플 데이터를 생성합니다
# ============================================================

with driver.session() as session:
    result = session.run("MATCH (n) RETURN count(n) AS total_nodes")
    total = result.single()["total_nodes"]
    
    if total == 0:
        print("그래프가 비어 있습니다. 샘플 데이터를 생성합니다...")
        NEED_SAMPLE_DATA = True
    else:
        print(f"기존 그래프 노드: {total}개")
        NEED_SAMPLE_DATA = False
        
        # 노드/관계 요약
        stats = session.run("""
            MATCH (n)
            RETURN labels(n)[0] AS label, count(n) AS cnt
            ORDER BY cnt DESC
        """)
        for r in stats:
            print(f"  {r['label']}: {r['cnt']}개")

In [None]:
# ============================================================
# 샘플 데이터 생성 (Part 5 미수행 시)
# 브레이크패드 제조 KG 기본 구조
# ============================================================

if NEED_SAMPLE_DATA:
    sample_cypher = """
    // 공정 노드
    CREATE (p1:Process {name: '원재료 배합', temp_range: '20~30C', duration: '30분'})
    CREATE (p2:Process {name: '프리폼 성형', temp_range: '80~100C', pressure: '50 MPa', duration: '15분'})
    CREATE (p3:Process {name: '열압착 성형', temp_range: '150~180C', pressure: '100 MPa', duration: '20분'})
    CREATE (p4:Process {name: '열처리 경화', temp_range: '200~250C', duration: '4시간'})
    CREATE (p5:Process {name: '연삭 가공', duration: '10분'})
    CREATE (p6:Process {name: '도장 코팅', temp_range: '60~80C', duration: '30분'})
    
    // 공정 순서
    CREATE (p1)-[:NEXT]->(p2)
    CREATE (p2)-[:NEXT]->(p3)
    CREATE (p3)-[:NEXT]->(p4)
    CREATE (p4)-[:NEXT]->(p5)
    CREATE (p5)-[:NEXT]->(p6)
    
    // 장비 노드
    CREATE (e1:Equipment {name: 'MX-200 혼합기', type: '혼합'})
    CREATE (e2:Equipment {name: 'HP-01 유압프레스', type: '프레스'})
    CREATE (e3:Equipment {name: 'HP-02 열압착기', type: '열압착'})
    CREATE (e4:Equipment {name: 'HT-500 열처리로', type: '열처리'})
    CREATE (e5:Equipment {name: 'GR-100 연삭기', type: '연삭'})
    CREATE (e6:Equipment {name: 'CT-300 도장설비', type: '도장'})
    
    // 공정-장비 관계
    CREATE (p1)-[:USES_EQUIPMENT]->(e1)
    CREATE (p2)-[:USES_EQUIPMENT]->(e2)
    CREATE (p3)-[:USES_EQUIPMENT]->(e3)
    CREATE (p4)-[:USES_EQUIPMENT]->(e4)
    CREATE (p5)-[:USES_EQUIPMENT]->(e5)
    CREATE (p6)-[:USES_EQUIPMENT]->(e6)
    
    // 원재료 노드
    CREATE (m1:Material {name: '페놀수지', supplier: '한화솔루션', heat_resistance: '250C'})
    CREATE (m2:Material {name: '아라미드 섬유', supplier: '코오롱인더스트리', heat_resistance: '400C'})
    CREATE (m3:Material {name: '마찰재 분말', supplier: '상신브레이크', heat_resistance: '350C'})
    CREATE (m4:Material {name: '철판 백플레이트', supplier: '포스코', heat_resistance: '800C'})
    CREATE (m5:Material {name: '접착제', supplier: '헨켈코리아', heat_resistance: '200C'})
    
    // 원재료-공정 관계
    CREATE (p1)-[:USES_MATERIAL]->(m1)
    CREATE (p1)-[:USES_MATERIAL]->(m2)
    CREATE (p1)-[:USES_MATERIAL]->(m3)
    CREATE (p3)-[:USES_MATERIAL]->(m4)
    CREATE (p3)-[:USES_MATERIAL]->(m5)
    
    // 결함 노드
    CREATE (d1:Defect {name: '접착 박리', severity: 'high', description: '마찰재와 백플레이트 분리'})
    CREATE (d2:Defect {name: '크랙 발생', severity: 'medium', description: '마찰재 표면 균열'})
    CREATE (d3:Defect {name: '두께 불량', severity: 'low', description: '규격 초과 두께 편차'})
    
    // 결함-공정 관계
    CREATE (d1)-[:CAUSED_BY]->(p3)
    CREATE (d2)-[:CAUSED_BY]->(p4)
    CREATE (d3)-[:CAUSED_BY]->(p5)
    
    // 품질 검사 노드
    CREATE (qi1:QualityInspection {name: '마찰계수 검사', standard: '0.35~0.45', result: '0.41', judgment: 'pass'})
    CREATE (qi2:QualityInspection {name: '접착 강도 검사', standard: '>=15 MPa', result: '12.3 MPa', judgment: 'fail'})
    CREATE (qi3:QualityInspection {name: '경도 검사', standard: '40~60 HRR', result: '52', judgment: 'pass'})
    
    // 품질 검사-결함 관계
    CREATE (qi2)-[:INDICATES]->(d1)
    """
    
    with driver.session() as session:
        session.run(sample_cypher)
        print("샘플 데이터 생성 완료")
        
        result = session.run("MATCH (n) RETURN count(n) AS cnt")
        print(f"전체 노드: {result.single()['cnt']}개")
    
    # 스키마 갱신
    graph.refresh_schema()
    print(f"\n스키마 갱신 완료")
else:
    print("기존 데이터 사용. 샘플 생성 건너뜀.")

---

## 2. 기본 Text2Cypher

LangChain의 `GraphCypherQAChain`을 사용하여 자연어 질문을 Cypher로 변환합니다.  
`verbose=True`로 생성된 Cypher 쿼리를 확인합니다.

In [None]:
# ============================================================
# GraphCypherQAChain 기본 설정
# ============================================================

from langchain_neo4j import GraphCypherQAChain
from langchain_openai import ChatOpenAI

# LLM 설정
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    api_key=OPENAI_API_KEY
)

# 기본 Text2Cypher 체인
basic_chain = GraphCypherQAChain.from_llm(
    llm=llm,
    graph=graph,
    verbose=True,          # 생성된 Cypher 확인
    return_intermediate_steps=True,  # 중간 단계 반환
    allow_dangerous_requests=True    # Neo4j 쿼리 실행 허용
)

print("기본 GraphCypherQAChain 설정 완료")
print(f"사용 모델: gpt-4o")
print(f"스키마 길이: {len(graph.schema)} 문자")

In [None]:
# ============================================================
# 간단한 질문으로 테스트
# ============================================================

test_questions_basic = [
    "어떤 장비가 있나요?",
    "브레이크패드 생산 공정은?",
]

basic_results = []

for question in test_questions_basic:
    print(f"\n{'='*60}")
    print(f"Q: {question}")
    print('='*60)
    
    try:
        result = basic_chain.invoke({"query": question})
        
        # 생성된 Cypher 추출
        cypher = result.get("intermediate_steps", [{}])[0].get("query", "N/A")
        answer = result.get("result", "N/A")
        
        print(f"\n생성된 Cypher: {cypher}")
        print(f"답변: {answer}")
        
        basic_results.append({
            "question": question,
            "cypher": cypher,
            "answer": answer[:100],
            "status": "success"
        })
    except Exception as e:
        print(f"오류: {e}")
        basic_results.append({
            "question": question,
            "cypher": "N/A",
            "answer": str(e)[:100],
            "status": "fail"
        })

In [None]:
# ============================================================
# 성공/실패 케이스 분석
# ============================================================

analysis_questions = [
    {"q": "접착 박리의 원인 공정은?", "expected": "열압착 성형", "difficulty": "쉬움"},
    {"q": "전체 공정 순서를 보여줘", "expected": "배합→성형→압착→경화→연삭→도장", "difficulty": "보통"},
    {"q": "HP-02 열압착기를 사용하는 공정은?", "expected": "열압착 성형", "difficulty": "쉬움"},
    {"q": "접착 강도 검사가 불합격인 이유와 관련 공정은?", "expected": "열압착 성형, 접착 박리", "difficulty": "어려움"},
    {"q": "마찰재 분말의 내열온도보다 높은 온도로 운영되는 공정은?", "expected": "열처리 경화 (200~250C > 350C 아님)", "difficulty": "매우 어려움"},
]

analysis_results = []

for item in analysis_questions:
    print(f"\nQ: {item['q']} [난이도: {item['difficulty']}]")
    try:
        result = basic_chain.invoke({"query": item["q"]})
        cypher = result.get("intermediate_steps", [{}])[0].get("query", "N/A")
        answer = result.get("result", "N/A")
        
        # 간단한 정확도 판정 (기대값 포함 여부)
        expected_keywords = item["expected"].split(", ")
        hit = any(kw in answer for kw in expected_keywords)
        status = "success" if hit else "partial"
        
        print(f"  Cypher: {cypher[:80]}...")
        print(f"  답변: {answer[:100]}")
        print(f"  판정: {'O' if hit else 'X'} (기대: {item['expected']})")
        
        analysis_results.append({
            "질문": item["q"][:30],
            "난이도": item["difficulty"],
            "기대값": item["expected"][:20],
            "결과": "O" if hit else "X"
        })
    except Exception as e:
        print(f"  오류: {e}")
        analysis_results.append({
            "질문": item["q"][:30],
            "난이도": item["difficulty"],
            "기대값": item["expected"][:20],
            "결과": "오류"
        })

print("\n=== 기본 Text2Cypher 결과 요약 ===")
display(pd.DataFrame(analysis_results))

---

## 3. Few-shot으로 정확도 향상

질문-Cypher 쌍 예시를 프롬프트에 추가하면 LLM이 더 정확한 쿼리를 생성합니다.  
동일한 질문을 Few-shot 유무로 비교합니다.

In [None]:
# ============================================================
# Few-shot 예시 정의 (5개)
# ============================================================

few_shot_examples = [
    {
        "question": "어떤 장비들이 있나요?",
        "cypher": "MATCH (e:Equipment) RETURN e.name AS equipment_name, e.type AS equipment_type"
    },
    {
        "question": "브레이크패드 생산 공정 순서는?",
        "cypher": "MATCH path = (p1:Process)-[:NEXT*]->(pN:Process) WHERE NOT ()-[:NEXT]->(p1) RETURN [n IN nodes(path) | n.name] AS process_order"
    },
    {
        "question": "접착 박리 결함의 원인 공정은?",
        "cypher": "MATCH (d:Defect {name: '접착 박리'})-[:CAUSED_BY]->(p:Process) RETURN p.name AS cause_process, p.temp_range AS temp_range"
    },
    {
        "question": "열압착 성형 공정에서 사용하는 장비와 재료는?",
        "cypher": "MATCH (p:Process {name: '열압착 성형'}) OPTIONAL MATCH (p)-[:USES_EQUIPMENT]->(e:Equipment) OPTIONAL MATCH (p)-[:USES_MATERIAL]->(m:Material) RETURN p.name AS process, collect(DISTINCT e.name) AS equipment, collect(DISTINCT m.name) AS materials"
    },
    {
        "question": "품질 검사 불합격 항목과 관련 결함은?",
        "cypher": "MATCH (qi:QualityInspection) WHERE qi.judgment = 'fail' OPTIONAL MATCH (qi)-[:INDICATES]->(d:Defect) RETURN qi.name AS inspection, qi.result AS result, qi.standard AS standard, d.name AS related_defect"
    }
]

print("Few-shot 예시 5개 정의 완료:")
for i, ex in enumerate(few_shot_examples, 1):
    print(f"  {i}. {ex['question']}")
    print(f"     → {ex['cypher'][:70]}...")

In [None]:
# ============================================================
# Few-shot 프롬프트를 포함한 Text2Cypher 체인 구성
# ============================================================

from langchain_core.prompts import FewShotPromptTemplate, PromptTemplate

# 개별 예시 템플릿
example_template = PromptTemplate(
    input_variables=["question", "cypher"],
    template="질문: {question}\nCypher: {cypher}"
)

# Few-shot 프롬프트 구성
fewshot_prompt = FewShotPromptTemplate(
    examples=few_shot_examples,
    example_prompt=example_template,
    prefix="""당신은 Neo4j Cypher 쿼리 전문가입니다.
주어진 그래프 스키마를 참고하여 자연어 질문을 Cypher 쿼리로 변환하세요.

스키마:
{schema}

규칙:
- Cypher 쿼리만 반환하세요 (설명 없이)
- 문자열 매칭은 CONTAINS를 사용하세요
- 노드 라벨과 관계 타입을 스키마에서 정확히 참조하세요
- 결과는 한국어로 의미 있는 alias를 사용하세요

아래는 예시입니다:
""",
    suffix="질문: {question}\nCypher:",
    input_variables=["schema", "question"]
)

# Few-shot 체인 생성
fewshot_chain = GraphCypherQAChain.from_llm(
    llm=llm,
    graph=graph,
    cypher_prompt=fewshot_prompt,
    verbose=True,
    return_intermediate_steps=True,
    allow_dangerous_requests=True
)

print("Few-shot GraphCypherQAChain 설정 완료")

In [None]:
# ============================================================
# 동일 질문 재테스트: 기본 vs Few-shot 비교
# ============================================================

comparison_questions = [
    "접착 박리의 원인 공정은?",
    "전체 공정 순서를 보여줘",
    "접착 강도 검사가 불합격인 이유와 관련 공정은?",
    "HP-02 열압착기를 사용하는 공정은?",
]

comparison_results = []

for question in comparison_questions:
    print(f"\n{'='*60}")
    print(f"Q: {question}")
    
    # 기본 체인
    try:
        basic_res = basic_chain.invoke({"query": question})
        basic_cypher = basic_res.get("intermediate_steps", [{}])[0].get("query", "N/A")
        basic_answer = basic_res.get("result", "N/A")[:80]
        basic_status = "O"
    except Exception as e:
        basic_cypher = "오류"
        basic_answer = str(e)[:80]
        basic_status = "X"
    
    # Few-shot 체인
    try:
        fs_res = fewshot_chain.invoke({"query": question})
        fs_cypher = fs_res.get("intermediate_steps", [{}])[0].get("query", "N/A")
        fs_answer = fs_res.get("result", "N/A")[:80]
        fs_status = "O"
    except Exception as e:
        fs_cypher = "오류"
        fs_answer = str(e)[:80]
        fs_status = "X"
    
    comparison_results.append({
        "질문": question[:25],
        "기본_결과": basic_status,
        "기본_Cypher": basic_cypher[:50],
        "Fewshot_결과": fs_status,
        "Fewshot_Cypher": fs_cypher[:50]
    })

print("\n=== 기본 vs Few-shot 비교 ===")
display(pd.DataFrame(comparison_results))

---

## 4. Agent 기반 자가 수정 Text2Cypher (핵심)

LLM이 생성한 Cypher 쿼리를 **6단계로 검증**하고, 실패 시 **최대 3회 재생성**합니다.

### 6단계 검증

| 단계 | 검증 항목 | 설명 |
|------|----------|------|
| 1 | 구문 (Syntax) | Cypher 문법 오류 확인 |
| 2 | 노드 라벨 (Labels) | 스키마에 존재하는 라벨인지 |
| 3 | 관계 타입 (Relations) | 스키마에 존재하는 관계인지 |
| 4 | 속성 (Properties) | 노드/관계에 존재하는 속성인지 |
| 5 | 방향 (Direction) | 관계 방향이 올바른지 |
| 6 | 집계 (Aggregation) | COUNT/SUM 등 올바른 사용 |

In [None]:
# ============================================================
# 스키마 정보 추출 (검증에 사용)
# ============================================================

import re

def extract_schema_info():
    """Neo4j에서 노드 라벨, 관계 타입, 속성 정보를 추출합니다."""
    schema_info = {"labels": set(), "rel_types": set(), "properties": {}}
    
    with driver.session() as session:
        # 노드 라벨
        result = session.run("CALL db.labels()")
        schema_info["labels"] = {r["label"] for r in result}
        
        # 관계 타입
        result = session.run("CALL db.relationshipTypes()")
        schema_info["rel_types"] = {r["relationshipType"] for r in result}
        
        # 각 라벨별 속성
        for label in schema_info["labels"]:
            result = session.run(f"MATCH (n:{label}) WITH keys(n) AS ks UNWIND ks AS k RETURN DISTINCT k")
            schema_info["properties"][label] = {r["k"] for r in result}
    
    return schema_info

schema_info = extract_schema_info()

print("=== 스키마 정보 ===")
print(f"노드 라벨: {schema_info['labels']}")
print(f"관계 타입: {schema_info['rel_types']}")
print(f"속성:")
for label, props in schema_info['properties'].items():
    print(f"  {label}: {props}")

In [None]:
# ============================================================
# 6단계 검증 함수
# ============================================================

def validate_cypher(cypher: str, schema: dict) -> dict:
    """생성된 Cypher 쿼리를 6단계로 검증합니다."""
    errors = []
    warnings = []
    
    # 1. 구문 검사 (기본적인 패턴 체크)
    if not cypher.strip():
        errors.append("[구문] 빈 쿼리입니다.")
    elif not any(kw in cypher.upper() for kw in ["MATCH", "RETURN", "CREATE", "MERGE"]):
        errors.append("[구문] MATCH/RETURN/CREATE/MERGE 키워드가 없습니다.")
    
    # 괄호 균형 검사
    if cypher.count('(') != cypher.count(')'):
        errors.append(f"[구문] 괄호 불균형: '(' {cypher.count('(')}개, ')' {cypher.count(')')}개")
    if cypher.count('[') != cypher.count(']'):
        errors.append(f"[구문] 대괄호 불균형: '[' {cypher.count('[')}개, ']' {cypher.count(']')}개")
    
    # 2. 노드 라벨 검사
    label_pattern = re.findall(r':\s*([A-Z][a-zA-Z]*)', cypher)
    for label in label_pattern:
        # 관계 타입이 아닌 경우만 (대문자+소문자 혼합 = 노드 라벨)
        if label != label.upper() and label not in schema["labels"]:
            errors.append(f"[라벨] '{label}'은 스키마에 없는 노드 라벨입니다. 가능한 라벨: {schema['labels']}")
    
    # 3. 관계 타입 검사
    rel_pattern = re.findall(r'\[\w*:([A-Z_]+)', cypher)
    for rel in rel_pattern:
        if rel not in schema["rel_types"]:
            errors.append(f"[관계] '{rel}'은 스키마에 없는 관계 타입입니다. 가능한 관계: {schema['rel_types']}")
    
    # 4. 속성 검사 (노드 라벨이 식별 가능한 경우)
    prop_pattern = re.findall(r'(\w+)\.(\w+)', cypher)
    for var, prop in prop_pattern:
        if prop in ['name', 'id']:  # 공통 속성은 스킵
            continue
        # 알려진 속성인지 확인
        all_props = set()
        for label_props in schema["properties"].values():
            all_props.update(label_props)
        if prop not in all_props and prop not in ['count', 'type', 'labels']:
            warnings.append(f"[속성] '{var}.{prop}'은 알려지지 않은 속성입니다.")
    
    # 5. 방향 검사 (기본 패턴 확인)
    if '<-' in cypher and '->' in cypher:
        warnings.append("[방향] 양방향 관계 패턴 감지. 의도적인지 확인하세요.")
    
    # 6. 집계 검사
    has_agg = any(fn in cypher.upper() for fn in ['COUNT(', 'SUM(', 'AVG(', 'COLLECT('])
    has_return = 'RETURN' in cypher.upper()
    if has_agg and has_return:
        # 집계와 비집계 컬럼 혼합 확인 (간단한 휴리스틱)
        return_clause = cypher.upper().split('RETURN')[-1]
        if ',' in return_clause:
            warnings.append("[집계] 집계 함수와 비집계 컬럼이 혼합되어 있습니다. GROUP BY 확인 필요.")
    
    return {
        "valid": len(errors) == 0,
        "errors": errors,
        "warnings": warnings
    }


# 테스트
test_cypher = "MATCH (d:Defect)-[:CAUSED_BY]->(p:Process) RETURN d.name, p.name"
validation = validate_cypher(test_cypher, schema_info)
print(f"테스트 쿼리 검증: {validation}")

In [None]:
# ============================================================
# 자가 수정 Text2Cypher Agent
# generate → validate → correct → execute (최대 3회 재시도)
# ============================================================

class Text2CypherAgent:
    """자가 수정 기능을 가진 Text2Cypher Agent"""
    
    def __init__(self, openai_client, driver, schema_info, graph_schema_text, max_retries=3):
        self.client = openai_client
        self.driver = driver
        self.schema_info = schema_info
        self.graph_schema = graph_schema_text
        self.max_retries = max_retries
        self.few_shots = few_shot_examples  # 위에서 정의한 Few-shot 예시
    
    def generate(self, question: str, error_context: str = "") -> str:
        """자연어 질문 → Cypher 쿼리 생성"""
        
        # Few-shot 예시 구성
        examples_text = "\n".join([
            f"질문: {ex['question']}\nCypher: {ex['cypher']}\n"
            for ex in self.few_shots
        ])
        
        # 에러 컨텍스트 (재시도 시)
        error_section = ""
        if error_context:
            error_section = f"\n이전 시도에서 다음 오류가 발생했습니다. 이를 수정하여 다시 생성하세요:\n{error_context}\n"
        
        prompt = f"""당신은 Neo4j Cypher 쿼리 전문가입니다.
주어진 그래프 스키마를 참고하여 자연어 질문을 Cypher 쿼리로 변환하세요.

스키마:
{self.graph_schema}

규칙:
- Cypher 쿼리만 반환하세요 (```나 설명 없이 순수 Cypher만)
- 문자열 매칭은 CONTAINS를 사용하세요
- 노드 라벨과 관계 타입을 스키마에서 정확히 참조하세요

예시:
{examples_text}
{error_section}
질문: {question}
Cypher:"""
        
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        cypher = response.choices[0].message.content.strip()
        # 코드 블록 제거
        cypher = cypher.replace("```cypher", "").replace("```", "").strip()
        return cypher
    
    def validate(self, cypher: str) -> dict:
        """6단계 검증"""
        return validate_cypher(cypher, self.schema_info)
    
    def correct(self, cypher: str, errors: list) -> str:
        """LLM 기반 교정"""
        error_text = "\n".join(errors)
        
        prompt = f"""다음 Cypher 쿼리에 오류가 있습니다. 수정하세요.

원본 쿼리:
{cypher}

발견된 오류:
{error_text}

스키마:
{self.graph_schema}

수정된 Cypher 쿼리만 반환하세요 (설명 없이):"""
        
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        corrected = response.choices[0].message.content.strip()
        corrected = corrected.replace("```cypher", "").replace("```", "").strip()
        return corrected
    
    def execute(self, cypher: str) -> list:
        """Neo4j에서 실행"""
        with self.driver.session() as session:
            result = session.run(cypher)
            return [dict(r) for r in result]
    
    def run(self, question: str) -> dict:
        """전체 파이프라인: generate → validate → correct → execute"""
        log = []
        
        # 1단계: 생성
        cypher = self.generate(question)
        log.append({"step": "generate", "cypher": cypher})
        
        for attempt in range(self.max_retries):
            # 2단계: 검증
            validation = self.validate(cypher)
            log.append({"step": "validate", "attempt": attempt + 1, "result": validation})
            
            if not validation["valid"]:
                # 3단계: 교정
                cypher = self.correct(cypher, validation["errors"])
                log.append({"step": "correct", "attempt": attempt + 1, "cypher": cypher})
                continue
            
            # 4단계: 실행
            try:
                results = self.execute(cypher)
                log.append({"step": "execute", "success": True, "result_count": len(results)})
                return {
                    "question": question,
                    "cypher": cypher,
                    "results": results,
                    "attempts": attempt + 1,
                    "success": True,
                    "log": log
                }
            except Exception as e:
                # 실행 오류 → 재생성
                error_msg = str(e)
                log.append({"step": "execute", "success": False, "error": error_msg})
                cypher = self.generate(question, error_context=f"실행 오류: {error_msg}")
                log.append({"step": "regenerate", "attempt": attempt + 1, "cypher": cypher})
        
        return {
            "question": question,
            "cypher": cypher,
            "results": [],
            "attempts": self.max_retries,
            "success": False,
            "log": log
        }


# Agent 인스턴스 생성
agent = Text2CypherAgent(
    openai_client=openai_client,
    driver=driver,
    schema_info=schema_info,
    graph_schema_text=graph.schema,
    max_retries=3
)

print("Text2Cypher Agent 생성 완료 (최대 재시도: 3회)")

In [None]:
# ============================================================
# Agent 테스트: 자가 수정 전/후 성공률 비교
# ============================================================

agent_test_questions = [
    "접착 박리의 원인 공정은?",
    "전체 공정 순서를 보여줘",
    "HP-02 열압착기를 사용하는 공정은?",
    "접착 강도 검사가 불합격인 이유와 관련 공정은?",
    "원재료 배합에 사용되는 재료의 내열온도는?",
]

agent_results = []

for question in agent_test_questions:
    print(f"\n{'='*60}")
    print(f"Q: {question}")
    print('='*60)
    
    result = agent.run(question)
    
    print(f"  Cypher: {result['cypher'][:80]}")
    print(f"  시도 횟수: {result['attempts']}")
    print(f"  성공: {result['success']}")
    if result['results']:
        print(f"  결과 ({len(result['results'])}건):")
        for r in result['results'][:3]:
            print(f"    {r}")
    
    agent_results.append({
        "질문": question[:25],
        "성공": "O" if result["success"] else "X",
        "시도횟수": result["attempts"],
        "결과수": len(result["results"])
    })

print("\n=== Agent Text2Cypher 결과 요약 ===")
df_agent = pd.DataFrame(agent_results)
display(df_agent)

success_rate = sum(1 for r in agent_results if r["성공"] == "O") / len(agent_results) * 100
print(f"\nAgent 성공률: {success_rate:.0f}%")

---

## 5. 하이브리드 검색

Vector 검색(임베딩 유사도)과 Graph 검색(Cypher 쿼리)을 결합합니다.  
RRF(Reciprocal Rank Fusion)로 두 결과를 통합합니다.

In [None]:
# ============================================================
# 노드에 벡터 임베딩 추가
# ============================================================

def get_embedding(text: str) -> list:
    """OpenAI 임베딩 생성"""
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding


# 주요 노드에 임베딩 추가
with driver.session() as session:
    # Process 노드
    processes = session.run("MATCH (p:Process) RETURN p.name AS name").data()
    for proc in processes:
        emb = get_embedding(f"제조 공정: {proc['name']}")
        session.run(
            "MATCH (p:Process {name: $name}) SET p.embedding = $emb",
            name=proc["name"], emb=emb
        )
    print(f"Process 노드 {len(processes)}개 임베딩 완료")
    
    # Defect 노드
    defects = session.run("MATCH (d:Defect) RETURN d.name AS name, d.description AS desc").data()
    for defect in defects:
        text = f"결함: {defect['name']}. {defect.get('desc', '')}"
        emb = get_embedding(text)
        session.run(
            "MATCH (d:Defect {name: $name}) SET d.embedding = $emb",
            name=defect["name"], emb=emb
        )
    print(f"Defect 노드 {len(defects)}개 임베딩 완료")
    
    # Equipment 노드
    equips = session.run("MATCH (e:Equipment) RETURN e.name AS name, e.type AS type").data()
    for eq in equips:
        text = f"장비: {eq['name']}. 유형: {eq.get('type', '')}"
        emb = get_embedding(text)
        session.run(
            "MATCH (e:Equipment {name: $name}) SET e.embedding = $emb",
            name=eq["name"], emb=emb
        )
    print(f"Equipment 노드 {len(equips)}개 임베딩 완료")

print("\n전체 임베딩 추가 완료")

In [None]:
# ============================================================
# 3가지 검색 방식 구현
# ============================================================

import numpy as np


def cosine_similarity(a: list, b: list) -> float:
    """코사인 유사도 계산"""
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def vector_search(question: str, top_k: int = 5) -> list:
    """Vector 검색: 임베딩 유사도 기반"""
    q_emb = get_embedding(question)
    
    with driver.session() as session:
        # 임베딩이 있는 모든 노드에서 검색
        result = session.run("""
            MATCH (n)
            WHERE n.embedding IS NOT NULL
            RETURN labels(n)[0] AS label, n.name AS name, 
                   n.embedding AS embedding,
                   n.description AS description
        """)
        
        scored = []
        for r in result:
            sim = cosine_similarity(q_emb, r["embedding"])
            scored.append({
                "label": r["label"],
                "name": r["name"],
                "description": r.get("description", ""),
                "score": sim,
                "source": "vector"
            })
        
        scored.sort(key=lambda x: x["score"], reverse=True)
        return scored[:top_k]


def graph_search(question: str) -> list:
    """Graph 검색: Cypher 쿼리 기반 (Agent 사용)"""
    result = agent.run(question)
    
    if result["success"] and result["results"]:
        return [{
            **r, 
            "source": "graph",
            "cypher": result["cypher"]
        } for r in result["results"]]
    return []


def rrf_fusion(vector_results: list, graph_results: list, k: int = 60) -> list:
    """RRF (Reciprocal Rank Fusion) 결합"""
    scores = {}  # name -> score
    
    # Vector 결과에 RRF 점수 부여
    for rank, item in enumerate(vector_results):
        name = item.get("name", str(item))
        scores[name] = scores.get(name, 0) + 1 / (k + rank + 1)
    
    # Graph 결과에 RRF 점수 부여
    for rank, item in enumerate(graph_results):
        # Graph 결과에서 이름 추출
        name = None
        for key in item:
            if isinstance(item[key], str) and key != "source" and key != "cypher":
                name = item[key]
                break
        if name:
            scores[name] = scores.get(name, 0) + 1 / (k + rank + 1)
    
    # 점수순 정렬
    fused = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [{"name": name, "rrf_score": score} for name, score in fused]


def hybrid_search(question: str) -> dict:
    """하이브리드 검색: Vector + Graph + RRF"""
    v_results = vector_search(question)
    g_results = graph_search(question)
    fused = rrf_fusion(v_results, g_results)
    
    return {
        "vector": v_results,
        "graph": g_results,
        "hybrid": fused
    }


print("검색 함수 정의 완료: vector_search, graph_search, hybrid_search")

In [None]:
# ============================================================
# 3가지 방식 비교 테스트
# ============================================================

comparison_question = "접착 박리 결함의 원인과 관련 장비는?"

print(f"질문: {comparison_question}")
print("=" * 60)

# 1. Vector only
print("\n[1] Vector Only")
v_results = vector_search(comparison_question)
for r in v_results[:5]:
    print(f"  [{r['label']}] {r['name']} (유사도: {r['score']:.4f})")

# 2. Graph only
print("\n[2] Graph Only")
g_results = graph_search(comparison_question)
for r in g_results[:5]:
    filtered = {k: v for k, v in r.items() if k not in ['source', 'cypher']}
    print(f"  {filtered}")

# 3. Hybrid (Vector + Graph + RRF)
print("\n[3] Hybrid (RRF Fusion)")
h_results = hybrid_search(comparison_question)
for r in h_results["hybrid"][:5]:
    print(f"  {r['name']} (RRF 점수: {r['rrf_score']:.6f})")

In [None]:
# ============================================================
# 응답 품질 비교표
# ============================================================

quality_questions = [
    "접착 박리 결함의 원인 공정은?",
    "열처리 경화에 사용하는 장비는?",
    "품질 검사 불합격 항목과 관련 결함은?",
]

quality_results = []

for question in quality_questions:
    print(f"\nQ: {question}")
    
    # Vector
    v = vector_search(question, top_k=3)
    v_answer = ", ".join([r["name"] for r in v]) if v else "결과 없음"
    
    # Graph
    g = graph_search(question)
    g_answer = str(g[0]) if g else "결과 없음"
    g_answer = g_answer[:60]
    
    # Hybrid
    h = hybrid_search(question)
    h_answer = ", ".join([r["name"] for r in h["hybrid"][:3]]) if h["hybrid"] else "결과 없음"
    
    quality_results.append({
        "질문": question[:20],
        "Vector": v_answer[:30],
        "Graph": g_answer[:30],
        "Hybrid": h_answer[:30]
    })

print("\n=== 검색 방식별 응답 품질 비교 ===")
display(pd.DataFrame(quality_results))

---

## 6. 전체 파이프라인 통합

질문 → 의도 분류 → 적절한 검색 전략 선택 → 답변 생성

In [None]:
# ============================================================
# 의도 분류기 (질문 유형 → 검색 전략)
# ============================================================

def classify_intent(question: str) -> str:
    """질문 의도를 분류하여 검색 전략을 결정합니다.
    
    Returns:
        'vector': 사실 확인, 유사도 기반
        'graph': 관계 추적, 구조적 질의
        'hybrid': 복합 질문
    """
    
    prompt = f"""다음 질문의 유형을 분류하세요.

질문: {question}

분류 기준:
- "vector": 특정 정보 검색, 설명 요청, 유사한 항목 찾기
  예: "접착제의 내열온도는?", "브레이크패드란 무엇인가?"
- "graph": 관계 추적, 경로 탐색, 연결된 항목 찾기
  예: "접착 박리의 원인 공정은?", "공정 순서를 보여줘"
- "hybrid": 복합 질문, 여러 소스 필요, 비교 분석
  예: "접착 강도 불합격의 원인과 예방 방법은?", "가장 위험한 결함과 관련 장비는?"

vector, graph, hybrid 중 하나만 반환하세요."""
    
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0
    )
    
    intent = response.choices[0].message.content.strip().lower()
    if intent not in ["vector", "graph", "hybrid"]:
        intent = "hybrid"  # 기본값
    return intent


# 테스트
test_intents = [
    "접착제의 내열온도는?",
    "접착 박리의 원인 공정은?",
    "접착 강도 불합격의 원인과 예방 방법은?"
]

for q in test_intents:
    intent = classify_intent(q)
    print(f"  [{intent:>7}] {q}")

In [None]:
# ============================================================
# 전체 GraphRAG 파이프라인
# ============================================================

def graphrag_pipeline(question: str) -> dict:
    """전체 GraphRAG 파이프라인
    질문 → 의도 분류 → 검색 → LLM 답변 생성
    """
    start_time = time.time()
    
    # 1. 의도 분류
    intent = classify_intent(question)
    
    # 2. 검색 전략 실행
    context_parts = []
    search_details = {}
    
    if intent == "vector":
        results = vector_search(question, top_k=5)
        context_parts = [f"[{r['label']}] {r['name']}: {r.get('description', '')} (유사도: {r['score']:.3f})" for r in results]
        search_details = {"strategy": "vector", "results_count": len(results)}
    
    elif intent == "graph":
        result = agent.run(question)
        if result["success"]:
            context_parts = [json.dumps(r, ensure_ascii=False) for r in result["results"]]
            search_details = {"strategy": "graph", "cypher": result["cypher"], "results_count": len(result["results"])}
        else:
            # Graph 실패 시 Vector로 fallback
            results = vector_search(question, top_k=5)
            context_parts = [f"[{r['label']}] {r['name']}: {r.get('description', '')}" for r in results]
            search_details = {"strategy": "graph→vector_fallback", "results_count": len(results)}
    
    else:  # hybrid
        h = hybrid_search(question)
        
        # Vector 컨텍스트
        for r in h["vector"][:3]:
            context_parts.append(f"[Vector] [{r['label']}] {r['name']}: {r.get('description', '')}")
        
        # Graph 컨텍스트
        for r in h["graph"][:3]:
            filtered = {k: v for k, v in r.items() if k not in ['source', 'cypher']}
            context_parts.append(f"[Graph] {json.dumps(filtered, ensure_ascii=False)}")
        
        search_details = {
            "strategy": "hybrid",
            "vector_count": len(h["vector"]),
            "graph_count": len(h["graph"]),
            "fused_count": len(h["hybrid"])
        }
    
    # 3. LLM 답변 생성
    context_text = "\n".join(context_parts) if context_parts else "관련 정보를 찾지 못했습니다."
    
    answer_prompt = f"""다음 정보를 참고하여 질문에 답변하세요.

검색된 정보:
{context_text}

질문: {question}

규칙:
- 검색된 정보만 활용하세요
- 정보가 부족하면 솔직히 말하세요
- 한국어로 답변하세요
- 간결하고 구체적으로 답변하세요"""
    
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": answer_prompt}],
        temperature=0
    )
    
    answer = response.choices[0].message.content
    elapsed = time.time() - start_time
    
    return {
        "question": question,
        "intent": intent,
        "answer": answer,
        "context": context_parts,
        "search_details": search_details,
        "elapsed_seconds": round(elapsed, 2)
    }


print("GraphRAG 파이프라인 정의 완료")

In [None]:
# ============================================================
# 10개 테스트 질문으로 end-to-end 테스트
# ============================================================

e2e_questions = [
    "접착 박리 결함의 원인 공정은?",
    "브레이크패드 생산 공정 순서는?",
    "HP-02 열압착기의 사용 공정과 온도 범위는?",
    "품질 검사에서 불합격된 항목은?",
    "원재료 배합에 사용되는 재료는?",
    "크랙 발생 결함의 원인과 관련 장비는?",
    "접착제의 내열온도와 공급사는?",
    "열처리 경화 공정의 스펙은?",
    "가장 심각한 결함은 무엇이고 어떻게 예방할 수 있나?",
    "포스코가 공급하는 재료의 용도는?",
]

e2e_results = []

for i, question in enumerate(e2e_questions, 1):
    print(f"\n[{i}/10] {question}")
    print("-" * 50)
    
    result = graphrag_pipeline(question)
    
    print(f"  의도: {result['intent']}")
    print(f"  검색: {result['search_details'].get('strategy', 'N/A')}")
    print(f"  답변: {result['answer'][:120]}...")
    print(f"  소요: {result['elapsed_seconds']}초")
    
    e2e_results.append({
        "번호": i,
        "질문": question[:20],
        "의도": result["intent"],
        "전략": result["search_details"].get("strategy", "N/A"),
        "응답시간(초)": result["elapsed_seconds"],
        "답변": result["answer"][:40]
    })

print("\n" + "=" * 60)
print("=== End-to-End 테스트 결과 ===")
display(pd.DataFrame(e2e_results))

# 평균 응답 시간
avg_time = sum(r["응답시간(초)"] for r in e2e_results) / len(e2e_results)
print(f"\n평균 응답 시간: {avg_time:.2f}초")

---

## 7. 연습 문제

### 연습 1: 자신만의 질문 5개 테스트

제조 도메인에 대한 질문을 직접 만들어 파이프라인으로 테스트하세요.

In [None]:
# ============================================================
# 연습 1: 자신만의 질문 5개 테스트
# ============================================================

my_questions = [
    # TODO: 자신만의 질문을 작성하세요
    "여기에 첫 번째 질문을 작성하세요",
    "여기에 두 번째 질문을 작성하세요",
    "여기에 세 번째 질문을 작성하세요",
    "여기에 네 번째 질문을 작성하세요",
    "여기에 다섯 번째 질문을 작성하세요",
]

# 힌트: 다양한 유형의 질문을 시도해 보세요
# - 사실 확인: "~의 스펙은?"
# - 관계 탐색: "~와 연결된 ~는?"
# - 복합 질의: "~이면서 ~인 것은?"
# - 추론: "~의 근본 원인은?"
# - 비교: "A와 B의 차이점은?"

print("TODO: 위 my_questions 리스트에 질문을 작성한 후 아래 코드를 실행하세요.")
print("")

# for question in my_questions:
#     result = graphrag_pipeline(question)
#     print(f"Q: {question}")
#     print(f"A: {result['answer'][:200]}")
#     print(f"  의도: {result['intent']}, 소요: {result['elapsed_seconds']}초")
#     print()

In [None]:
# ============================================================
# 연습 2: Few-shot 예시 추가/수정
# 기존 few_shot_examples에 새 예시를 추가하고 정확도 변화를 관찰하세요
# ============================================================

# TODO: 새로운 Few-shot 예시 추가
new_examples = [
    # {
    #     "question": "여기에 질문을 작성하세요",
    #     "cypher": "MATCH ... RETURN ..."
    # },
]

# 힌트: 기존에 실패했던 질문 유형에 대한 예시를 추가하면 효과적입니다
# 예시:
# - Multi-hop: "A → B → C 관계 탐색"
# - 집계: "가장 많은/적은 ~는?"
# - 조건: "~보다 높은/낮은 ~는?"

print("TODO: new_examples에 예시를 추가한 후,")
print("agent.few_shots 를 업데이트하고 동일 질문을 다시 테스트하세요.")
print("")
print("예시:")
print('  agent.few_shots = few_shot_examples + new_examples')
print('  result = agent.run("실패했던 질문")')

In [None]:
# ============================================================
# 정리: 리소스 종료
# ============================================================

driver.close()
print("Neo4j 드라이버 종료 완료")
print("")
print("=== Part 6 완료 ===")
print("")
print("학습 요약:")
print("  1. GraphCypherQAChain으로 기본 Text2Cypher를 구현했습니다")
print("  2. Few-shot 예시로 Cypher 생성 정확도를 향상시켰습니다")
print("  3. 6단계 검증 + 자가 수정 Agent를 구축했습니다")
print("  4. Vector + Graph + RRF 하이브리드 검색을 구현했습니다")
print("  5. 의도 분류 → 검색 → 답변 생성 전체 파이프라인을 완성했습니다")
print("")
print("다음 단계: Part 7 - 실무 적용 가이드 (프로덕션 배포)")