# 웹 검색 기반 리서치 시스템 - LangGraph HITL (Human-in-the-Loop) 활용

---

## 환경 설정 및 준비

`(1) Env 환경변수`

In [None]:
from dotenv import load_dotenv
load_dotenv()

`(2) 기본 라이브러리`

In [None]:
import os
from glob import glob
from pprint import pprint
import json
from datetime import datetime

`(3) Langsmith tracing 설정`

In [None]:
# Langsmith tracing 여부 확인
print(f"LangSmith 추적 상태: {os.getenv('LANGSMITH_TRACING')}")

---

## **Human-in-the-Loop (사용자 개입)**

- **HITL**는 AI 시스템에 인간의 판단과 전문성을 통합

- **Breakpoints**로 특정 단계에서 실행 중지 가능
    - **Breakpoint**는 LangGraph의 **체크포인트 기능** 기반으로 작동하는 시스템
    - 각 노드 실행 후 그래프의 상태를 **스레드에 저장**하여 나중에도 접근 가능
    - 그래프 실행을 특정 지점에서 **일시 중지**하고 사용자 승인 후 재개 가능

- 사용자의 **입력**이나 **승인**을 기다리는 패턴으로 작동

- 시스템 결정에 대한 **인간의 통제**와 **검증** 보장

### **웹 검색 기반 리서치 시스템 구현**

- **주제 분석**: 사용자가 제공한 주제를 분석하여 검색 키워드 생성 (사용자가 직접 검토 및 수정 가능)
- **웹 검색**: 다양한 소스에서 관련 정보 수집
- **보고서 작성**: AI가 종합적인 리서치 보고서 초안 생성
- **사용자 검토**: 사용자가 보고서 내용을 검토하고 수정 요청
- **최종 완성**: 피드백을 반영한 최종 보고서 생성

### (1) 상태 정의 및 도구 설정

In [None]:
from typing import List, Dict, Annotated, Optional
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command, Send
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from IPython.display import Image, display
import operator

# ========================================
# State 정의 (Reducer 포함)
# ========================================

class ResearchState(MessagesState):
    topic: str                  # 연구 주제
    keywords: List[str]         # 검색 키워드

    ready_for_search: bool      # 검색 준비 완료 여부
    search_results: Annotated[List[Dict], operator.add]  # Reducer로 Send 결과 자동 병합
    report: str                 # 최종 보고서

    feedback: str               # 피드백

# ========================================
# LLM, 도구 설정
# ========================================

llm = ChatOpenAI(model="gpt-4.1-mini", temperature=0.7)
search_tool = TavilySearch(max_results=3)

### (2) 그래프 노드 정의 및 연결

- **주제 분석**: 사용자가 제공한 주제를 분석하여 검색 키워드 생성

In [None]:
# pydantic 모델 (구조화된 출력용)
class Keywords(BaseModel):
    """키워드 생성 결과"""
    keywords: List[str] = Field(description="생성된 키워드 목록")
    confidence: float = Field(description="키워드 신뢰도")


def generate_keywords(state: ResearchState) -> ResearchState:
    """Pydantic으로 구조화된 키워드 생성"""
    prompt = ChatPromptTemplate.from_messages([
        ("system", "전문 리서치 분석가로서 효과적인 검색 키워드를 생성하세요."),
        ("human", "주제: {topic}\n3-7개 키워드와 신뢰도(0-1)를 생성하세요.")
    ])
    # 구조화된 출력
    chain = prompt | llm.with_structured_output(Keywords)
    result = chain.invoke({"topic": state["topic"]})
    
    if result.keywords is None or len(result.keywords) == 0:
        return {
            "keywords": [],
            "messages": [AIMessage("키워드 생성에 실패했습니다.")]
        } # type: ignore
    
    return {
        "keywords": result.keywords,
        "messages": [AIMessage(f"키워드 {len(result.keywords)}개 생성 (신뢰도: {result.confidence:.0%})")]
    } # type: ignore

def review_keywords(state: ResearchState) -> ResearchState:
    """Human-in-the-loop: 키워드 검토"""
    keywords_list = "\n".join([f"{i+1}. {k}" for i, k in enumerate(state["keywords"])])

    # interrupt 사용자 입력 대기 (HITL) -> 사용자에게 요청 (사용자가 입력한 내용은 user_input에 저장)
    user_input = interrupt({
        "keywords": state["keywords"],
        "question": f"키워드 검토:\n{keywords_list}\n\n옵션:\n- '승인': 현재 키워드 사용\n- '재생성': 새 키워드 생성\n- '수정: 키워드1,키워드2,...': 직접 수정"
    })

    # 사용자가 입력한 피드백 저장
    return {"feedback": user_input}

def process_keyword_feedback(state: ResearchState) -> ResearchState:
    """키워드 피드백 처리 - 키워드 직접 수정"""
    feedback = state.get("feedback", "").strip()
    
    if not feedback or "승인" in feedback.lower():
        # 승인된 경우
        return {
            "keywords": state["keywords"], 
            "messages": [AIMessage("키워드가 승인되었습니다.")]
        }  # type: ignore

    elif "수정:" in feedback:
        # 직접 수정된 키워드 추출
        try:
            new_keywords_str = feedback.split("수정:", 1)[1].strip()
            new_keywords = [kw.strip() for kw in new_keywords_str.split(",") if kw.strip()]
            return {
                "keywords": new_keywords,
                "messages": [AIMessage(f"키워드가 수정되었습니다: {', '.join(new_keywords)}")]
            }  # type: ignore
        except:
            return {
                "keywords": [],
                "messages": [AIMessage("키워드 형식이 올바르지 않습니다. 재생성합니다.")]
            }  # type: ignore

    else:
        # 재생성 또는 기타 경우
        return {
            "keywords": [],
            "messages": [AIMessage("키워드를 재생성합니다.")]
        }  # type: ignore

`(1) 주제 분석 테스트를 위한 워크플로우 정의`

In [None]:
# ===== 워크플로우 그래프 구성 =====
def should_continue_after_review(state: ResearchState) -> str:
    """검토 후 다음 단계 결정"""
    feedback = state.get("feedback", "").strip().lower()

    # 피드백 유형에 따른 다음 단계 결정
    if not feedback or "승인" in feedback:
        return "approved"    # 승인된 경우

    elif "수정:" in feedback:
        return "process_feedback"    # 직접 수정된 키워드 처리

    else:
        return "regenerate"  # 재생성

# StateGraph 생성
workflow = StateGraph(ResearchState)

# 노드 추가
workflow.add_node("analyze_topic", generate_keywords)
workflow.add_node("review_keywords", review_keywords)
workflow.add_node("process_feedback", process_keyword_feedback)

# 엣지 연결
workflow.add_edge(START, "analyze_topic")
workflow.add_edge("analyze_topic", "review_keywords")

# 조건부 엣지
workflow.add_conditional_edges(
    "review_keywords",
    should_continue_after_review,
    {
        "process_feedback": "process_feedback",
        "regenerate": "analyze_topic",  # 재생성시 다시 키워드 생성으로
        "approved": END
    }
)

# 피드백 처리 후 종료
workflow.add_edge("process_feedback", END)

# 그래프 컴파일
graph = workflow.compile(checkpointer=InMemorySaver())

# 상태 그래프 시각화
display(Image(graph.get_graph().draw_mermaid_png()))

`(2) 워크플로우 시작`

- **interrupt**까지 실행

In [None]:
# 초기 상태 설정
initial_state = {
    "topic": "인공지능 기술 발전"
}

# 스레드 설정
config = {"configurable": {"thread_id": f"test_thread_1"}}


# 워크플로우 실행
# 첫 번째 실행 - 중단점에서 실행을 멈춤
for event in graph.stream(initial_state, config=config):
    print(f"Event: {event}")

`(3) 현재 상태 확인`

- **get_state** 함수로 현재 상태 확인 가능


In [None]:
# 현재 상태 - 그래프 체크포인트 확인
current_state = graph.get_state(config)

print(f"\n현재 상태: {current_state.values}")
print(f"대기 중인 interrupt: {current_state.tasks}")

In [None]:
# 다음에 실행될 노드를 확인 
current_state.next

In [None]:
# 대기 중인 interrupt 확인
task = current_state.tasks[0]
print(task.id)
print(task.name)
pprint(task.interrupts[0].value)

`(4) 사용자 개입 (HITL): 거부`

- 사용자가 승인을 해주지 않는 경우, 피드백을 통해 다시 생성
- **Command** 함수로 사용자 피드백 전달

In [None]:
from langgraph.types import Command

# 사용자 피드백 제공 (거부: 재생성 요청)
human_feedback = "재생성"

print(f"config: ", config)

# 사용자 입력으로 그래프 재개 (Command 사용)
for event in graph.stream(Command(resume=human_feedback), config=config):
    print(f"Event: {event}")

In [None]:
# 현재 상태 - 그래프 체크포인트 확인
current_state = graph.get_state(config)

print(f"\n현재 상태: {current_state.values}")
print(f"대기 중인 interrupt: {current_state.tasks}")

In [None]:
# 다음에 실행될 노드를 확인 
current_state.next

In [None]:
# 대기 중인 interrupt 확인
task = current_state.tasks[0]
print(task.id)
print(task.name)
pprint(task.interrupts[0].value)

`(5) 사용자 개입 (HITL): 수정`

- 사용자가 승인을 해주지 않는 경우, 피드백을 통해 다시 생성
- **Command** 함수로 사용자 피드백 전달

In [None]:
from langgraph.types import Command

# 사용자 피드백 제공 (거부: 직접 키워드 수정)
human_feedback = "수정: 생성형 AI, LLM 발전"

print(f"config: ", config)

# 사용자 입력으로 그래프 재개 (Command 사용)
for event in graph.stream(Command(resume=human_feedback), config=config):
    print(f"Event: {event}")

In [None]:
# 현재 상태 - 그래프 체크포인트 확인
current_state = graph.get_state(config)

print(f"\n현재 상태: {current_state.values}")
print(f"대기 중인 interrupt: {current_state.tasks}")

In [None]:
# 다음에 실행될 노드를 확인 
current_state.next

`(5) 사용자 개입 (HITL): 승인`

- 사용자가 '승인'을 해서 최종 답변을 생성하고 종료

In [None]:
# 초기 상태 설정
initial_state = {
    "topic": "기후변화 위기"
}

# 스레드 설정
config = {"configurable": {"thread_id": f"test_thread_2"}}


# 워크플로우 실행
# 첫 번째 실행 - 중단점에서 실행을 멈춤
for event in graph.stream(initial_state, config=config):
    print(f"Event: {event}")

In [None]:
# 현재 상태 - 그래프 체크포인트 확인
current_state = graph.get_state(config)

print(f"\n현재 상태: {current_state.values}")
print(f"대기 중인 interrupt: {current_state.tasks}")

In [None]:
# 피드백 요청 메시지 출력
print(current_state.tasks[0].interrupts[0].value.get("question"))

In [None]:
from langgraph.types import Command

# 사용자 피드백 제공 (승인 문자열을 사용)
human_feedback = "승인"

# 사용자 입력으로 그래프 재개
for event in graph.stream(Command(resume=human_feedback), config=config):
    print(f"Event: {event}")

In [None]:
# 현재 상태 - 그래프 체크포인트 확인
current_state = graph.get_state(config)

print(f"\n현재 상태: {current_state.values}")
print(f"대기 중인 interrupt: {current_state.tasks}")

In [None]:
# 다음 노드 확인
current_state.next

In [None]:
# 현재 상태 출력
pprint(current_state.values)

- **웹 검색**: 다양한 소스에서 관련 정보 수집

    - 맵-리듀스 패턴으로 병렬 검색 수행
    - `Send` 사용

In [None]:
results = search_tool.invoke("기후변화")
results

In [None]:
# ===== 검색 노드 =====
def ready_to_search(state: ResearchState) -> ResearchState:
    """검색 준비"""
    return {"ready_for_search": True}

def dispatch_searches(state: ResearchState) -> List[Send]:
    """병렬 검색 디스패치"""
    return [Send("search_one", {"keyword": kw}) for kw in state["keywords"]]

def search_one(state: Dict) -> ResearchState:
    """개별 검색 실행"""
    keyword = state["keyword"]
    try:
        results = search_tool.invoke(keyword)
        data = results.get("results", results) if isinstance(results, dict) else results
        return {
            "search_results": [{
                "keyword": keyword,
                "data": data if isinstance(data, list) else [],
                "success": True
            }]
        }  # type: ignore
    except Exception as e:
        return {
            "search_results": [{
                "keyword": keyword,
                "error": str(e),
                "success": False
            }]
        } # type: ignore


# ===== 보고서 생성  =====
class Report(BaseModel):
    """보고서 모델"""
    summary: str = Field(description="주제 요약")
    findings: List[str] = Field(description="주요 발견사항 리스트 (출처명, 링크 등을 포함)")
    recommendations: List[str] = Field(description="제언 리스트")

def generate_report(state: ResearchState) -> ResearchState:
    """LLM을 사용한 보고서 생성"""

    results = state.get("search_results", [])

    if not results:
        return {"report": f"# {state['topic']}\n\n❌ 검색 결과가 없습니다."}

    prompt = ChatPromptTemplate.from_messages([
        ("system", "당신은 리서치 전문가입니다. 검색 결과를 분석해 간결한 보고서를 작성하세요. 반드시 출처명과 링크를 명확히 표기하고, 신뢰할 수 없는 정보는 포함하지 마세요."),
        ("human", "주제: {topic}\n\n검색 결과:\n{content}")
    ])
    
    try:
        chain = prompt | llm.with_structured_output(Report)
        report = chain.invoke({
            "topic": state["topic"],
            "content": str(results)[:12000]  # 토큰 제한
        })
        
        # 인라인 마크다운 생성
        markdown = f"""# {state["topic"]}

## 📌 요약
{report.summary}

## 🔍 주요 발견사항
{chr(10).join(f"{i}. {finding}" for i, finding in enumerate(report.findings, 1))}

## 💡 제언
{chr(10).join(f"{i}. {rec}" for i, rec in enumerate(report.recommendations, 1))}

---
*생성: {datetime.now().strftime('%Y-%m-%d %H:%M')}*
*검색: {sum(1 for r in state['search_results'] if r.get('success'))}개 성공*"""
        
        return {"report": markdown}
        
    except Exception as e:
        return {"report": f"# {state['topic']}\n\n⚠️ 보고서 생성에 실패했습니다: {str(e)}"}

In [None]:
# ===== 워크플로우 구성 =====
# 1. ready_to_search (검색 준비)
# 2. dispatch_searches (병렬 검색 시작)
# 3. search_one (개별 검색 - 병렬)
# 4. generate_report (검색 결과 취합/리듀스) 

workflow = StateGraph(ResearchState)

# 노드 추가
workflow.add_node("analyze_topic", generate_keywords)
workflow.add_node("review_keywords", review_keywords)
workflow.add_node("process_feedback", process_keyword_feedback)

workflow.add_node("ready_to_search", ready_to_search)
workflow.add_node("search_one", search_one)  # Send의 타겟 노드
workflow.add_node("generate_report", generate_report)

# 엣지 연결
workflow.add_edge(START, "analyze_topic")
workflow.add_edge("analyze_topic", "review_keywords")

# 키워드 검토 후 조건부 라우팅
workflow.add_conditional_edges(
    "review_keywords",
    should_continue_after_review,
    {
        "process_feedback": "process_feedback",
        "regenerate": "analyze_topic",
        "approved": "ready_to_search"
    }
)

# 피드백 처리 후 검색
workflow.add_edge("process_feedback", "ready_to_search")

# Send 맵-리듀스 연결
workflow.add_conditional_edges(
    "ready_to_search",
    dispatch_searches,
    ["search_one"]
)
workflow.add_edge("search_one", "generate_report")

# 분석 완료 후 종료
workflow.add_edge("generate_report", END)

# 컴파일
graph = workflow.compile(checkpointer=InMemorySaver())

# 시각화
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# 초기 상태 설정
initial_state = {
    "topic": "2025 한국의 AI 산업"
}

# 스레드 설정
config = {"configurable": {"thread_id": f"test_thread_1"}}


# 워크플로우 실행
# 첫 번째 실행 - 중단점에서 실행을 멈춤
for event in graph.stream(initial_state, config=config):
    print(f"Event: {event}")

In [None]:
from langgraph.types import Command

# 사용자 피드백 제공 (승인 문자열을 사용)
human_feedback = "승인"

# 사용자 입력으로 그래프 재개
for event in graph.stream(Command(resume=human_feedback), config=config): 
    print(f"Event: {event}")

In [None]:
# 마지막 상태 출력
final_state = graph.get_state(config)

final_state.values

In [None]:
# 최종 검색 결과 출력
print(final_state.values.get("report", []))

- **사용자 검토**: 사용자가 보고서 내용을 검토하고 수정 요청

In [None]:
# ===== 보고서 검토 노드 (Human-in-the-loop) =====

def review_report(state: ResearchState) -> ResearchState:
    """Human-in-the-loop: 보고서 검토"""
    
    # 보고서 미리보기 (500자로 제한)
    preview = state["report"][:500] + "..." if len(state["report"]) > 500 else state["report"]
    
    # 사용자 입력 대기
    user_input = interrupt({
        "report_preview": preview,
        "instruction": "보고서를 검토하세요. '승인' 또는 구체적인 수정 요청을 입력하세요:"
    })
    
    return Command(
        goto=END if "승인" in user_input else "revise_report",
        update={"feedback": user_input}
    )  # type: ignore

- **최종 완성**: 피드백을 반영한 최종 보고서 생성

In [None]:
# ===== 보고서 수정 노드 =====
def revise_report(state: ResearchState) -> ResearchState:
    """피드백 처리 및 보고서 수정"""
    
    feedback = state.get("feedback", "").strip()
    
    # 승인인 경우 바로 완료
    if "승인" in feedback.lower() or "approve" in feedback.lower():
        return {
            "messages": [AIMessage("✅ 보고서가 승인되었습니다!")]
        }  # type: ignore

    # 수정 요청 처리
    if feedback:

        # 구조화된 프롬프트 템플릿
        revision_prompt = ChatPromptTemplate.from_messages([
            ("system", """당신은 전문적인 보고서 편집자입니다.
    사용자의 피드백을 반영하여 보고서를 수정하세요.

    
    ## 수정 원칙:
    1. 원본 보고서의 구조와 형식 유지
    2. 사용자 요청사항을 정확히 반영
    3. 전문적이고 명확한 문체 유지
    4. 마크다운 형식 준수"""),
            ("human", """## 현재 보고서:
    {report}

    ## 수정 요청사항:
    {feedback}

    위 요청사항을 반영하여 보고서를 수정해주세요.""")
        ])
        
        try:
            # LLM 체인 실행
            chain = revision_prompt | llm
            response = chain.invoke({
                "report": state["report"],
                "feedback": feedback
            })
            
            # 수정된 보고서 반환
            return {
                "report": response.content,
                "feedback": "",  # 피드백 초기화
                "messages": [AIMessage(f"📝 보고서가 수정되었습니다: {feedback[:50]}...")]
            }  # type: ignore

        except Exception as e:
            return {
                "messages": [AIMessage(f"❌ 수정 중 오류 발생: {str(e)[:100]}")]
            }  # type: ignore
    
    # 피드백이 없는 경우
    return {
        "messages": [AIMessage("⚠️ 피드백이 없어 보고서를 그대로 유지합니다.")]
    }  # type: ignore

In [None]:
# ===== 워크플로우 구성 =====
# 1. ready_to_search (검색 준비)
# 2. dispatch_searches (병렬 검색 시작)
# 3. search_one (개별 검색 - 병렬)
# 4. generate_report (검색 결과 취합/리듀스) 
# 5. review_report (보고서 검토)
# 6. revise_report (보고서 수정)

workflow = StateGraph(ResearchState)

# 노드 추가
workflow.add_node("analyze_topic", generate_keywords)
workflow.add_node("review_keywords", review_keywords)
workflow.add_node("process_feedback", process_keyword_feedback)

workflow.add_node("ready_to_search", ready_to_search)
workflow.add_node("search_one", search_one)  # Send의 타겟 노드
workflow.add_node("generate_report", generate_report)

workflow.add_node("review_report", review_report)
workflow.add_node("revise_report", revise_report)

# 엣지 연결
workflow.add_edge(START, "analyze_topic")
workflow.add_edge("analyze_topic", "review_keywords")

# 키워드 검토 후 조건부 라우팅
workflow.add_conditional_edges(
    "review_keywords",
    should_continue_after_review,
    {
        "process_feedback": "process_feedback",
        "regenerate": "analyze_topic",
        "approved": "ready_to_search"
    }
)

# 피드백 처리 후 검색
workflow.add_edge("process_feedback", "ready_to_search")

# Send 맵-리듀스 연결
workflow.add_conditional_edges(
    "ready_to_search",
    dispatch_searches,
    ["search_one"]
)
workflow.add_edge("search_one", "generate_report")


# 보고서 생성 후 검토
workflow.add_edge("generate_report", "review_report")
workflow.add_edge("revise_report", "review_report")

# 컴파일
graph = workflow.compile(checkpointer=InMemorySaver())

# 시각화
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
# 초기 상태 설정
initial_state = {
    "topic": "LangGraph와 LlamaIndex 비교"
}
    
# 스레드 설정
config = {"configurable": {"thread_id": f"test_thread_1"}}


# 워크플로우 실행
# 첫 번째 실행 - 중단점에서 실행을 멈춤
for event in graph.stream(initial_state, config=config):
    print(f"Event: {event}")

In [None]:
from langgraph.types import Command

# 사용자 피드백 제공 (승인 문자열을 사용)
human_feedback = "승인"

# 사용자 입력으로 그래프 재개
for event in graph.stream(Command(resume=human_feedback), config=config): 
    print(f"Event: {event}")

In [None]:
# 상태 출력
current_state = graph.get_state(config)

current_state.values

In [None]:
# 다음 노드 출력
current_state.next

In [None]:
from langgraph.types import Command

# 사용자 피드백 제공 
human_feedback = "비교표를 만들어서 추가하세요."

# 사용자 입력으로 그래프 재개
for event in graph.stream(Command(resume=human_feedback), config=config): 
    print(f"Event: {event}")

In [None]:
# 상태 출력
current_state = graph.get_state(config)
print(current_state.values.get("report", []))

In [None]:
from langgraph.types import Command

# 사용자 피드백 제공 (승인 문자열을 사용)
human_feedback = "승인"

# 사용자 입력으로 그래프 재개
for event in graph.stream(Command(resume=human_feedback), config=config): 
    print(f"Event: {event}")

In [None]:
# 마지막 상태 출력
final_state = graph.get_state(config)

# 최종 검색 결과 출력
print(final_state.values.get("report", []))

In [None]:
# 다음 노드 출력
final_state.next