# 지능형 RAG 워크플로우 구현하기

**LangGraph를 활용한 지능형 RAG 시스템**

이번 실습에서는 LangGraph의 핵심 요소인 **상태(State)**, **노드(Node)**, **엣지(Edge)** 를 활용하여 다음과 같은 RAG Agent 시스템을 구축해보겠습니다:

### 시스템 주요 기능
1. **지능형 문서 검색**: Vector Store에서 관련 문서 자동 검색
2. **결과 품질 평가**: LLM이 검색 결과의 충분성을 자동 판단
3. **동적 웹 검색**: 필요시 실시간 웹 검색으로 정보 보강
4. **사용자 개입 지점**: 웹 검색 결과 추가에 대한 사용자  승인
5. **지식베이스 확장**: 승인된 정보의 자동 학습 및 저장
6. **컨텍스트 기반 답변**: 모든 정보를 종합한 최종 답변 생성

### 워크플로우 흐름도
```
                                                                                       
 START → 검색 → 평가 → (불충분) → 웹검색 → 저장여부(사용자) → (저장) → 답변생성 → END  
                 ↓                           ↓                                         
               (충분)                     (저장안함)                                   
                 ↓                           ↓                                         
               답변생성                     답변생성                                   
                 ↓                           ↓                                         
                END                         END                                        
                                                                                       
```


## 1. 환경 설정 및 기본 모듈 로드


In [6]:
import os
from dotenv import load_dotenv

# .env 파일에서 환경변수 로드
load_dotenv()

# API 키 확인
openai_api_key = os.getenv('OPENAI_API_KEY')

print(f"OPENAI_API_KEY 설정: {openai_api_key[:10] if openai_api_key else 'None'}...")


OPENAI_API_KEY 설정: sk-proj-qg...


## 2. 상태(State) 정의

그래프의 전체 워크플로우 동안 데이터를 관리할 중앙 상태 객체를 정의합니다.


In [None]:
from typing import TypedDict, List, Dict, Any

# 워크플로우에서 사용할 상태 정의
# 일단 필요해 보이는 상태를 전부 입력하고 이후 리팩토링 시 정리할 것
class RAGState(TypedDict):
    question: str                    # 사용자의 원본 질문
    documents: List[Dict[str, Any]]  # 1차 Retriever 검색 결과
    web_results: List[Dict[str, Any]] # 웹 검색 결과 (필요시)
    is_sufficient: bool              # 1차 검색 결과의 충분성 여부
    user_approval: str               # 웹 결과 추가에 대한 사용자 승인 ("yes" or "no")
    final_answer: str                # LLM이 생성한 최종 답변

## 3. RAG 기반 시스템 구축

기존 수업자료의 RAG 구성 요소들을 활용하여 Vector Store와 검색 시스템을 구축합니다.


In [8]:
# PDF 문서 로딩 및 처리
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

# 1. 문서 로딩
loader = PyPDFLoader("../data/[AI.GOV_해외동향]_2025-1호.pdf")
docs = loader.load()

# 2. 텍스트 분할
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ". ", " "],
    chunk_size=1000,
    chunk_overlap=100
)
chunks = text_splitter.split_documents(docs)

# 3. 임베딩 및 벡터 저장소 생성
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = FAISS.from_documents(documents=chunks, embedding=embeddings)

# 4. Retriever 생성
retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 3,
        "lambda_mult": 0.8
    }
)

AuthenticationError: Error code: 401 - {'error': {'message': 'Incorrect API key provided: sk-proj-********************************************************************************************************************************************************XxwA. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}

## 4. LLM 및 웹 검색 도구 설정


In [4]:
from langchain.chat_models import init_chat_model
from langchain_community.tools.tavily_search import TavilySearchResults

# LLM 설정
llm = init_chat_model("gpt-4o-mini", model_provider="openai")

# 웹 검색 도구 설정
web_search_tool = TavilySearchResults(
    max_results=3,
    include_answer=True,
    include_raw_content=False,
    search_depth="advanced"
)

  web_search_tool = TavilySearchResults(


## 5. 노드(Node) 함수들 구현

각 기능 단위를 독립적인 파이썬 함수로 정의합니다.


### 5.1 검색 노드 (retrieve_node)


In [None]:
# 초기 정보 검색
def retrieve_node(state: RAGState) -> RAGState:

    # Vector Store에서 관련 문서 검색
    documents = retriever.invoke(state['question'])
    
    # Documents를 딕셔너리 형태로 변환
    doc_list = []
    for doc in documents:
        doc_list.append({
            "content": doc.page_content,
            "metadata": doc.metadata,
            "source": "vector_store"
        })
    
    # 상태 업데이트
    return {"documents" : doc_list}

### 5.2 평가 노드 (evaluate_node)
- 검색된 문서가 질문에 답변하기에 충분한지 LLM을 통해 평가합니다.

In [None]:
from langchain_core.prompts import ChatPromptTemplate

def evaluate_node(state: RAGState) -> RAGState:
    
    # 평가용 프롬프트 생성
    # 여기서는 YES/NO 로만 판단하고 있지만, 실제 사용시에는 수치적으로 비교하는게 필요
    # 유사도 0.8 정확도 0.8 이상 등 수치를 사용할 것
    evaluation_prompt = ChatPromptTemplate.from_messages([
        ("system", """
당신은 검색 결과의 품질을 평가하는 전문가입니다.
주어진 문서들이 사용자의 질문에 답변하기에 충분한지 판단해주세요.

평가 기준:
1. 문서들이 질문과 직접적으로 관련이 있는가?
2. 질문에 대한 구체적이고 충분한 정보가 포함되어 있는가?
3. 답변을 작성하기에 필요한 핵심 내용이 모두 포함되어 있는가?

응답은 반드시 'YES' 또는 'NO'로만 답변해주세요.
YES: 충분함, NO: 불충분함
        """),
        ("user", """
질문: {question}

검색된 문서들:
{documents}

이 문서들이 질문에 답변하기에 충분한가요? (YES/NO)
        """)
    ])
    
    # 문서 내용 정리
    # 검색된 모든 문서의 내용을 하나의 문자열로 결합
    # - 각 문서는 "문서 1:", "문서 2:" 등의 번호를 붙여 구분
    # 문서 내용을 하나로 합치기
    doc_contents = ""
    for i, doc in enumerate(state['documents']):
        doc_contents += f"문서 {i+1}: {doc['content']}\n\n"
    
    # 마지막 줄바꿈 제거
    doc_contents = doc_contents.rstrip()
    
    # LLM 평가 실행
    evaluation_chain = evaluation_prompt | llm
    response = evaluation_chain.invoke({
        "question": state['question'],
        "documents": doc_contents
    })
    
    # 결과 판단
    # LLM이 소문자로 응답할 가능성도 있으므로 예외 처리
    # LLM의 응답을 대문자로 변환하여 'YES'가 포함되어 있는지 확인

    # - 응답이 'YES'인 경우: 검색된 문서가 충분함 (True)
    # - 응답이 'NO'인 경우: 검색된 문서가 불충분함 (False)
    is_sufficient = "YES" in response.content.upper()
    
    # 상태 업데이트
    return {"is_sufficient" : is_sufficient}


### 5.3 웹 검색 노드 (web_search_node)
- 정보 보강: evaluate_node에서 결과가 불충분하다고 판단했을 때, 웹 검색을 통해 추가 정보를 수집합니다.

In [None]:
def web_search_node(state: RAGState) -> RAGState:
    # 웹 검색 실행
    search_results = web_search_tool.invoke({"query": state['question']})
    
    # 검색 결과를 딕셔너리 형태로 변환
    web_docs = []
    for result in search_results:
        web_docs.append({
            "content": result.get("content", ""),
            "title": result.get("title", ""),
            "url": result.get("url", ""),
            "source": "web_search"
        })
    
    
    return {"web_results" : web_docs}

### 5.4 사용자 승인 노드 (human_approval_node)


#### Human In The Loop (HITL)
- AI 시스템과 사람의 협업 구조  
**인공지능(AI)** 모델의 훈련, 검증, 운영 과정에 사람이 개입하여 의사결정을 보완하는 방식.

**HITL(Human In The Loop)의 필요성**
- AI의 불완전성  
    - 데이터 편향(Bias)과 불확실성(Uncertainty) 존재
    - AI 단독 의사결정의 위험성
    - 사람의 검증과 보완 필요

In [None]:
def human_approval_node(state: RAGState) -> RAGState:
    
    # print("🐻웹 검색 결과:")
    # for result in state['web_results']:
    #     print(f"\n제목: {result['title']}")
    #     content = result['content'][:200] + "..." if len(result['content']) > 200 else result['content']
    #     print(f"내용: {content}")

    # 사용자 승인 요청
    user_input = input("\n웹 검색 결과를 저장할까요? (y/n): ")
    
    if user_input.lower() == 'y':
        approval = "yes"
    else:
        approval = "no"

    # 상태 업데이트
    return {"user_approval" : approval}

### 5.5 문서 추가 노드 (add_documents_node)
- 지식 확장: 사용자가 승인한 웹 검색 결과를 Vector Store에 추가합니다.

In [None]:
from langchain_core.documents import Document
def add_documents_node(state: RAGState) -> RAGState:
    # 웹 검색 결과를 Document 객체로 변환
    new_docs = []
    for result in state['web_results']:
        doc = Document(
            page_content=result['content'],
            metadata={
                'title': result.get('title', ''),
                'url': result.get('url', ''),
                'source': 'web_search',
            }
        )
        new_docs.append(doc)
    
    # Vector Store에 문서 추가
    # 추후 파일형태로 저장가능
    global vector_store
    vector_store.add_documents(new_docs)
    
    # 기존 문서에 웹 검색 결과 추가
    combined_docs = state['documents'] + state['web_results']
    
    
    return {"documents" : combined_docs}

### 5.6 답변 생성 노드 (generate_node)


In [None]:
def generate_node(state: RAGState) -> RAGState:
    
    # 답변 생성용 프롬프트
    generation_prompt = ChatPromptTemplate.from_messages([
        ("system", """
당신은 전문적인 AI 어시스턴트입니다. 
주어진 문서들을 바탕으로 사용자의 질문에 대해 정확하고 상세한 답변을 제공해주세요.

답변 작성 가이드라인:
1. 제공된 문서의 내용만을 사용하여 답변하세요
2. 구체적인 사실과 데이터를 포함하세요
3. 문서에서 직접 인용할 때는 따옴표를 사용하세요
4. 만약 충분한 정보가 없다면 그렇게 명시하세요
5. 답변은 한국어로 작성하고, 친근하고 이해하기 쉽게 설명하세요
        """),
        ("user", """
질문: {question}

참고 문서들:
{documents}

위 문서들을 바탕으로 질문에 답변해주세요.
        """)
    ])
    
    
    all_docs = state.get('documents', [])
    
    # 모든 문서의 내용을 하나의 문자열로 합치기
    doc_contents = ""

    for doc in all_docs:
        source = doc.get('source', 'unknown')
        title = doc.get('title', '')
        content = doc['content']
        doc_contents += f"[{source}] {title}\n{content}\n\n"


    # 답변 생성
    generation_chain = generation_prompt | llm

    response = generation_chain.invoke({
        "question": state['question'],
        "documents": doc_contents
    })
    
    final_answer = response.content
    
    
    # 상태 업데이트
    return {"final_answer" : final_answer}

## 6. 조건부 분기 함수 구현

그래프의 동적 흐름을 제어하는 조건부 분기 함수들을 정의합니다.

In [None]:
def decide_to_search_web(state: RAGState) -> str:
    
    if state['is_sufficient']:
        return "generate"
    else:
        return "web_search"

def decide_to_add_documents(state: RAGState) -> str:
    
    if state['user_approval'] == "yes":
        return "add_documents"
    else:
        return "generate"



## 7. 그래프 구성 및 엣지 연결

모든 노드를 연결하여 완전한 RAG 워크플로우를 구성합니다.


In [None]:
from langgraph.graph import StateGraph, START, END

# 1. 상태 그래프 생성
workflow = StateGraph(RAGState)

# 2. 모든 노드를 그래프에 추가
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("evaluate", evaluate_node)
workflow.add_node("web_search", web_search_node)
workflow.add_node("human_approval", human_approval_node)
workflow.add_node("add_documents", add_documents_node)
workflow.add_node("generate", generate_node)

# 3. 엣지 연결
# 시작 → 검색
workflow.add_edge(START, "retrieve")

# 검색 → 평가
workflow.add_edge("retrieve", "evaluate")

# 평가 → 조건부 분기 (충분하면 generate, 불충분하면 web_search)
workflow.add_conditional_edges(
    "evaluate",
    decide_to_search_web,
    {
        "generate": "generate",
        "web_search": "web_search"
    }
)

# 웹 검색 → 사용자 승인
workflow.add_edge("web_search", "human_approval")

# 사용자 승인 → 조건부 분기 (승인시 add_documents, 거부시 generate)
workflow.add_conditional_edges(
    "human_approval",
    decide_to_add_documents,
    {
        "add_documents": "add_documents",
        "generate": "generate"
    }
)

# 문서 추가 → 답변 생성
workflow.add_edge("add_documents", "generate")

# 답변 생성 → 종료
workflow.add_edge("generate", END)

# 4. 그래프 컴파일
graph = workflow.compile()



## 8. 그래프 시각화


draw_mermaid_png() 함수에서 문제가 발생하여 mermaid 문법으로 받은걸 직접 변환해서 확인해보겠습니다.

https://www.mermaidchart.com/play

In [None]:
print(graph.get_graph().draw_mermaid())

In [None]:
# Memory 기능을 위한 import
from langgraph.checkpoint.memory import MemorySaver

# In-Memory Checkpointer 생성
# 메모리에 대화 상태를 저장하여 세션 동안 대화 기록 유지
memory = MemorySaver()

graph_with_memory = workflow.compile(checkpointer=memory)

In [None]:
from langchain_core.runnables import RunnableConfig

user1 = RunnableConfig(
    recursion_limit=30,
    configurable={"thread_id": "1"}
)

## 시스템 테스트

구축한 RAG 워크플로우를 실제로 테스트해봅니다.


### 테스트 1: 정보가 부족하여 검색해야 할 경우


In [None]:
# 초기 상태 설정
initial_state = {
    "question": "미국의 AI 정책 동향에 대해 설명해주세요",
    "documents": [],
    "web_results": [],
    "is_sufficient": False,
    "user_approval": "",
    "final_answer": ""
}

print("테스트 1 시작: 정보가 부족하여 검색하는 경우")
print("=" * 60)

# 그래프 실행
result = graph_with_memory.invoke(initial_state, config=user1)

print("최종 답변:")
print("=" * 60)
print(result["final_answer"])


### 테스트 2: 정보가 생성되어 검색이 필요없어진 경우

In [None]:
# 초기 상태 설정
initial_state = {
    "question": "미국의 AI 정책 동향에 대해 설명해주세요",
    "documents": [],
    "web_results": [],
    "is_sufficient": False,
    "user_approval": "",
    "final_answer": ""
}

print("테스트 2 시작: 추가적인 정보검색이 필요없어질 경우")
print("=" * 60)

# 그래프 실행
result = graph_with_memory.invoke(initial_state, config=user1)

print("최종 답변:")
print("=" * 60)
print(result["final_answer"])


### 시스템 상태 확인 및 분석

실행된 워크플로우의 상태와 과정을 분석해봅니다.


In [None]:
# 첫 번째 세션의 상태 기록 확인
print("세션 1 실행 기록 분석:")
print("=" * 50)

for i, state_snapshot in enumerate(graph_with_memory.get_state_history(user1)):
    print(f"\n단계 {i}:")
    print(f"  다음 노드: {state_snapshot.next}")
    print(f"  질문: {state_snapshot.values.get('question', ' ')[:50]}...")
    print(f"  검색된 문서 수: {len(state_snapshot.values.get('documents', []))}")
    print(f"  충분성 평가: {state_snapshot.values.get('is_sufficient', ' ')}")
    print(f"  웹검색결과 수: {len(state_snapshot.values.get('web_results', []))}")
    print(f"  사용자 응답: {state_snapshot.values.get('user_approval', '')}")
    print(f"  답변 생성 여부: {'예' if state_snapshot.values.get('final_answer') else '아니오'}")
    


임시 챗봇처럼 사용해보기

In [None]:
def interactive_rag_chat():
    """
    사용자와 인터랙티브하게 질의응답을 진행하는 함수
    """
    print("RAG 챗봇에 오신 것을 환영합니다!")
    print("'quit' 또는 'exit'를 입력하면 종료됩니다.")
    print("=" * 60)
    
    session_counter = 1
    
    while True:
        # 사용자 질문 입력
        user_question = input("\n❓ 질문을 입력하세요: ").strip()
        
        # 종료 조건 확인
        if user_question.lower() in ['quit', 'exit', '종료', '끝']:
            print("👋 RAG 챗봇을 종료합니다. 감사합니다!")
            break
        
        if not user_question:
            print("❌ 질문을 입력해주세요.")
            continue
        
        # 새로운 세션 설정
        session_config = RunnableConfig(
            recursion_limit=50,
            configurable={"thread_id": f"interactive_session_{session_counter}"}
        )
        
        # 질문 처리
        question_state = {
            "question": user_question,
            "documents": [],
            "web_results": [],
            "is_sufficient": False,
            "user_approval": "",
            "final_answer": ""
        }
        
        try:
            print("\n처리 중...")
            result = graph_with_memory.invoke(question_state, config=session_config)
            
            print("\n" + "=" * 60)
            print("답변:")
            print("=" * 60)
            print(result["final_answer"])
            print("=" * 60)
            
        except Exception as e:
            print(f"처리 중 오류가 발생했습니다: {e}")
        
        session_counter += 1


### 챗봇 실행하기


In [None]:
# # 인터랙티브 챗봇 실행
# interactive_rag_chat()