라이브러리 임포트



In [None]:
from typing import Annotated, Optional, Literal, List, Dict, Any
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
import enum
import os
from dotenv import load_dotenv
import uuid

# LangChain 및 기타 필요한 라이브러리 임포트
from langchain_anthropic import ChatAnthropic
from langchain.prompts import ChatPromptTemplate
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.messages.base import BaseMessage

# LangGraph 관련 임포트
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver

# 기존 RAG 코드에서 가져온 컴포넌트
from langchain_upstage import UpstageDocumentParseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from sentence_transformers import SentenceTransformer

# 랭퓨즈
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler

# 스트리밍
import asyncio
from langchain_core.output_parsers import StrOutputParser

설정 및 초기화

In [None]:
# .env 파일 로드 (필요한 경우)
load_dotenv()



유틸리티 함수

In [72]:
# Format documents function
def format_docs(docs):
    """Format documents into a single string"""
    return "\n\n".join(doc.page_content for doc in docs)


# Initialize document loading and processing
def initialize_rag_components(file_path: str = "./test_modified.pdf"):
    """Initialize all components for RAG"""
    print("문서 로딩 중...")

    # Document loading
    loader = UpstageDocumentParseLoader(
        file_path,
        split="page",
        output_format="markdown",
        ocr="auto",
        coordinates=True,
    )
    docs = loader.load()
    print(f"문서 로딩 완료: {len(docs)} 페이지")

    # Document chunking
    print("문서 청킹 중...")
    splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=300)
    docs_splitter = splitter.split_documents(docs)
    print(f"청킹 완료: {len(docs_splitter)} 청크")

    # Embedding model
    print("임베딩 모델 로딩 중...")
    # Change 'mps' to 'cuda' for NVIDIA GPUs or 'cpu' if you don't have GPU
    device = "cpu"  # 기본값으로 CPU 사용
    try:
        import torch

        if torch.cuda.is_available():
            device = "cuda"
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            device = "mps"
    except ImportError:
        pass

    print(f"사용 중인 디바이스: {device}")

    hf_embeddings = HuggingFaceEmbeddings(
        model_name="intfloat/multilingual-e5-large-instruct",
        model_kwargs={"device": device},
        encode_kwargs={"normalize_embeddings": True},
    )

    # Vector store
    print("벡터 스토어 생성 중...")
    vectorstore = FAISS.from_documents(
        documents=docs_splitter,
        embedding=hf_embeddings,
    )
    print("벡터 스토어 생성 완료")

    # Retriever
    retriever = vectorstore.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 5},
    )

    # LLM
    print("LLM 초기화 중...")
    llm = ChatAnthropic(
        model="claude-3-haiku-20240307",
        temperature=0,
        streaming=True,  # 🔥 스트리밍 활성화
    )
    print("초기화 완료!")

    return {
        "retriever": retriever,
        "llm": llm,
    }

모델 클래스

In [73]:
# Define categories for query classification
class Category(enum.Enum):
    DOCUMENT = "document"  # 문서 관련 질문
    GENERAL = "general"  # 일반적인 질문
    GREETING = "greeting"  # 인사말


# Pydantic model for structured output
class QueryClassification(BaseModel):
    """사용자 쿼리 분류 모델"""

    category: Category = Field(
        description="쿼리를 카테고리화 하세요. DOCUMENT(문서 관련 질문) / GENERAL(일반적인 질문) / GREETING(인사) 중에 하나로 구분하세요."
    )
    reasoning: str = Field(description="왜 이 카테고리를 선택했는지 설명하세요.")


# Define the state structure
class State(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    context: List[str]
    category: Optional[str]


# Router function for categorizing queries
def router(state: State) -> Dict[str, Any]:
    """사용자 쿼리를 카테고리로 분류하는 라우터"""
    print("쿼리 분류 중...")

    # Get the most recent user message
    user_message = state["messages"][-1].content

    # Create the router input
    router_input = f"""
    다음 사용자 쿼리를 분석하고 카테고리를 결정하세요.
    카테고리:
    - document: 문서 내용에 관한 질문 (예: "아주대학교에 대해 알려줘", "이 문서에서 중요한 내용은?")
    - general: 일반적인 질문으로, 문서와 관련이 없음 (예: "오늘 날씨 어때?", "파이썬이란?")
    - greeting: 인사말 (예: "안녕", "반가워", "뭐해?")
    
    쿼리: {user_message}
    """

    # Get LLM
    llm = rag_components["llm"]

    # Structured output with the classification model
    structured_llm = llm.with_structured_output(QueryClassification)

    # Get classification with Langfuse tracking
    classification = structured_llm.invoke(router_input)

    category = classification.category.value
    print(f"분류 결과: {category} (이유: {classification.reasoning})")

    return {"category": category}


# Conditional routing function
def route_by_category(state: State) -> Literal["document_qa", "general_qa", "greeting"]:
    """카테고리에 기반하여 다음 노드를 결정"""
    category = state.get("category", "").lower()

    if category == "document":
        return "document_qa"
    elif category == "general":
        return "general_qa"
    elif category == "greeting":
        return "greeting"
    else:
        # 기본값은 일반 질의응답
        return "general_qa"


# Define LangGraph nodes
def retrieve_documents(state: State) -> Dict[str, Any]:
    """문서에서 관련 내용 검색"""
    print("문서 검색 중...")

    # Get the most recent user message
    user_message = state["messages"][-1]

    # Retrieve documents
    retriever = rag_components["retriever"]
    docs = retriever.invoke(user_message.content)

    # Format documents
    formatted_docs = format_docs(docs)
    print(f"검색 완료: {len(docs)} 문서 찾음")

    # Return updated state
    return {"context": [formatted_docs]}


def document_qa(state: State) -> Dict[str, Any]:
    """문서 기반 질의응답"""
    print("문서 기반 응답 생성 중...")
    context = state["context"][0] if state["context"] else "문서 정보 없음"
    # Get user message
    user_message = state["messages"][-1].content

    # 2. 이전 대화들은 별도로 히스토리 구성
    history_messages = state["messages"][:-1]  # [사용자1, AI1, 사용자2, AI2, ...]
    formatted_history = ""
    for msg in history_messages:
        role = "사용자" if isinstance(msg, HumanMessage) else "AI"
        formatted_history += f"{role}: {msg.content}\n"

    # Create prompt
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """너는 친절한 한국어 AI 비서야. 
            제공된 문서 내용(context)과 이전 대화 내용을 참고해서 질문에 답해.
            반드시 한국어로만 대답하고, 문서에 없는 내용은 대답하지 말고 모른다고 해.
            
            참고 문서:
            {context}
            
            이전 대화:
            {chat_history}
            """,
            ),
            ("user", "{user_input}"),
        ]
    )

    # Invoke the LLM with Langfuse tracking
    llm = rag_components["llm"]

    # Create formatted message for LLM
    formatted_message = prompt.format_messages(
        context=context,  # ← 이 부분이 누락되어 있었음!
        chat_history=formatted_history,
        user_input=user_message,
    )

    # Get response from LLM with Langfuse tracking
    ai_response = llm.invoke(formatted_message)
    response_content = ai_response.content
    print("문서 기반 응답 생성 완료")

    # Return the assistant message
    return {"messages": [AIMessage(content=response_content)]}


def general_qa(state: State) -> Dict[str, Any]:
    """일반 질의응답"""
    print("일반 응답 생성 중...")

    # Get user message
    user_message = state["messages"][-1].content

    # 2. 이전 대화들은 별도로 히스토리 구성
    history_messages = state["messages"][:-1]  # [사용자1, AI1, 사용자2, AI2, ...]
    formatted_history = ""
    for msg in history_messages:
        role = "사용자" if isinstance(msg, HumanMessage) else "AI"
        formatted_history += f"{role}: {msg.content}\n"

    # Create prompt
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """너는 친절한 한국어 AI 비서야. 
            사용자의 질문에 대해 간결하고 정확하게 답변해. 한국어로 대답해.
            
            이전 대화:
            {chat_history}
            """,
            ),
            ("user", "{user_input}"),
        ]
    )

    # Invoke the LLM with Langfuse tracking
    llm = rag_components["llm"]

    # Create formatted message for LLM
    formatted_message = prompt.format_messages(
        user_input=user_message,  # ← 이 부분이 누락되어 있었음!
        chat_history=formatted_history,  # ← 이 부분이 누락되어 있었음!
    )

    # Get response from LLM with Langfuse tracking
    ai_response = llm.invoke(formatted_message)
    response_content = ai_response.content

    print("일반 응답 생성 완료")

    # Return the assistant message
    return {"messages": [AIMessage(content=response_content)]}


def greeting(state: State) -> Dict[str, Any]:
    """인사말에 응답"""
    print("인사 응답 생성 중...")

    # Get user message
    user_message = state["messages"][-1].content

    # 2. 이전 대화들은 별도로 히스토리 구성
    history_messages = state["messages"][:-1]  # [사용자1, AI1, 사용자2, AI2, ...]
    formatted_history = ""
    for msg in history_messages:
        role = "사용자" if isinstance(msg, HumanMessage) else "AI"
        formatted_history += f"{role}: {msg.content}\n"

    # Create prompt
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """너는 친절한 한국어 AI 비서야.
            사용자의 인사에 친근하고 따뜻하게 응답해. 간결하게 한국어로 대답해.

            이전 대화:
            {chat_history}
            """,
            ),
            ("user", "{user_input}"),  # ✅ 수정된 부분
        ]
    )

    # Invoke the LLM with Langfuse tracking
    llm = rag_components["llm"]  # 변수 변경하기
    # Create formatted message for LLM
    formatted_message = prompt.format_messages(
        user_input=user_message,
        chat_history=formatted_history,  # ← 이 부분이 누락되어 있었음!
    )

    # Get response from LLM with Langfuse tracking
    ai_response = llm.invoke(formatted_message)
    response_content = ai_response.content

    print("인사 응답 생성 완료")

    # Return the assistant message
    return {"messages": [AIMessage(content=response_content)]}

그래프 노드 함수 및 그래프 빌드 및 실행

In [74]:
conversation_history = []  # 전체 대화 히스토리 저장


# Run the graph with a given user input
def run_graph(user_input: str):
    global conversation_history

    # 새로운 사용자 메시지를 히스토리에 추가
    user_message = HumanMessage(content=user_input)
    conversation_history.append(user_message)

    """Run the graph with a user input and return the response"""
    # Create initial state with the user message
    initial_state = {
        "messages": conversation_history,  # 전체 대화 히스토리가 누적된 배열
        "context": [],
        "category": None,
    }

    # Run the graph and get the final state
    result = graph.invoke(initial_state)

    # Extract the AI response
    if "messages" in result and len(result["messages"]) > 1:
        # Get the assistant message (should be the last one)
        ai_msg = result["messages"][-1]
        if isinstance(ai_msg, AIMessage):
            # ✅ AI 응답을 conversation_history에 추가 (핵심 수정사항!)
            conversation_history.append(ai_msg)
            return ai_msg.content

    return "응답을 생성할 수 없습니다."


# Interactive chat interface
def interactive_chat():
    """Interactive chat interface for the RAG system"""
    print("=" * 60)
    print("🤖 LangGraph 라우팅 RAG 챗봇 시작 (멀티턴 대화 지원)")
    print("💡 '종료' 입력 시 대화를 끝냅니다.")
    print("💡 '히스토리' 입력 시 현재 대화 히스토리를 확인합니다.")
    print("=" * 60)

    try:
        while True:
            try:
                user_input = input("\n🙋 사용자: ").strip()

                if not user_input:
                    print("❗ 메시지를 입력해주세요.")
                    continue

                if user_input.lower() == "종료":
                    print("👋 채팅을 종료합니다!")
                    break

                # 히스토리 확인 명령어 추가
                if user_input.lower() == "히스토리":
                    print("\n=== 현재 대화 히스토리 ===")
                    for i, msg in enumerate(conversation_history):
                        msg_type = "사용자" if isinstance(msg, HumanMessage) else "AI"
                        content = (
                            msg.content[:100] + "..."
                            if len(msg.content) > 100
                            else msg.content
                        )
                        print(f"{i+1}. {msg_type}: {content}")
                    print(f"총 {len(conversation_history)}개 메시지")
                    print("=" * 30)
                    continue

                # Get response
                print("🤖 AI: ", end="", flush=True)
                response = run_graph(user_input)
                print(response)

            except KeyboardInterrupt:
                print("\n👋 채팅을 종료합니다!")
                break

    except Exception as e:
        print(f"오류 발생: {str(e)}")
        import traceback

        traceback.print_exc()


# Global variable for RAG components
rag_components = None
graph = None

# Main execution
if __name__ == "__main__":
    try:
        # Initialize RAG components
        print("RAG 컴포넌트 초기화 중...")
        rag_components = initialize_rag_components()

        # Build the LangGraph
        print("LangGraph 구성 중...")
        graph_builder = StateGraph(State)

        # Add nodes
        graph_builder.add_node("router", router)
        graph_builder.add_node("retrieve", retrieve_documents)
        graph_builder.add_node("document_qa", document_qa)
        graph_builder.add_node("general_qa", general_qa)
        graph_builder.add_node("greeting", greeting)

        # Add edges
        graph_builder.add_edge(START, "router")

        # Add conditional edges based on the category
        graph_builder.add_conditional_edges(
            "router",
            route_by_category,
            {
                "document_qa": "retrieve",
                "general_qa": "general_qa",
                "greeting": "greeting",
            },
        )

        # Connect retrieve to document_qa
        graph_builder.add_edge("retrieve", "document_qa")

        # Connect all output nodes to END
        graph_builder.add_edge("document_qa", END)
        graph_builder.add_edge("general_qa", END)
        graph_builder.add_edge("greeting", END)

        # Compile the graph
        graph = graph_builder.compile().with_config(
            config={"callbacks": [langfuse_handler]}
        )
        print("LangGraph 구성 완료!")

        # Try to visualize the graph if possible
        try:
            print("그래프 시각화 시도 중...")
            from IPython.display import Image, display

            display(Image(graph.get_graph().draw_mermaid_png()))
            print("그래프 시각화 완료!")
        except Exception as e:
            print(f"그래프 시각화 실패: {e}")

        # Start interactive chat
        interactive_chat()

    except Exception as e:
        print(f"초기화 중 오류 발생: {str(e)}")
        import traceback

        traceback.print_exc()

RAG 컴포넌트 초기화 중...
문서 로딩 중...
초기화 중 오류 발생: HTTP error: {"error":{"message":"API key suspended due to insufficient credit. Register your payment method at https://console.upstage.ai/billing to continue.","type":"invalid_request_error","param":"","code":"api_key_is_not_allowed"}}


Traceback (most recent call last):
  File "/Users/baegjonghun/.pyenv/versions/3.11.10/lib/python3.11/site-packages/langchain_upstage/document_parse_parsers.py", line 181, in _get_response
    response.raise_for_status()
  File "/Users/baegjonghun/.pyenv/versions/3.11.10/lib/python3.11/site-packages/requests/models.py", line 1026, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: https://api.upstage.ai/v1/document-ai/document-parse

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/var/folders/4n/tjmm0jpn36jdkkfvpw1g79zw0000gn/T/ipykernel_77504/2322629006.py", line 97, in <module>
    rag_components = initialize_rag_components()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/4n/tjmm0jpn36jdkkfvpw1g79zw0000gn/T/ipykernel_77504/3891573948.py", line 20, in initialize_rag_components
    docs = loader.load()
        