In [22]:
# API 키를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv

# API 키 정보 로드
load_dotenv()

True

# Agentic RAG

In [23]:
# 적재된 Pinecone DB에서 데이터 불러오기
import os
from pinecone import Pinecone
from langchain_pinecone import PineconeVectorStore
from dotenv import load_dotenv
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from typing import Annotated, List, Dict, Any, TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langgraph.prebuilt.tool_node import ToolNode
from langchain_core.tools import create_retriever_tool
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate

# langsmith 추적
load_dotenv()

# API키 불러오기
open_api_key = os.environ['OPENAI_API_KEY']
pinecone_api = os.environ['PINECONE_API_KEY']

# OpenAI 임베딩 생성
embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",  # OpenAI의 임베딩 모델 사용
    api_key=open_api_key)

# 인덱스 가져오기
pc = Pinecone(api_key=pinecone_api)
index_name = "quickstart"
index = pc.Index(index_name)

# PineconeVectorStore를 사용하여 벡터 스토어 생성
vectorstore = PineconeVectorStore(index=index, 
                                  embedding=embeddings, 
                                  text_key="page_content")

# 1) pinecone retriever 설정
pinecone_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})

# 2) reranker 설정
# reranker = HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-v2-m3")
# compressor = CrossEncoderReranker(model=reranker, 
#                                     top_n=5)
# retriever = ContextualCompressionRetriever(base_retriever=pinecone_retriever, 
#                                           base_compressor=compressor)

In [24]:
# Pinecone 문서 구조 확인
sample_docs = pinecone_retriever.invoke("test query")  # 임의의 테스트 쿼리로 문서 검색
for doc in sample_docs:
    print("Document Structure:")
    print("Page Content:", doc.page_content)
    print("Metadata:", doc.metadata)
    print("-" * 50)

Document Structure:
Page Content: - - - - -
Metadata: {'author': 'j o u r n a l h o m e p a g e : w w w . e l s e v i e r . c o m', 'category': ['Perioperative', 'PediatricSpecific'], 'id': 194.0, 'page': 7.0, 'paper_title': 'Anaesthesia Critical Care & Pain Medicine', 'source': '1-s2.0-S2352556824000626-main.pdf', 'summary': 'Tranexamic acid reduces blood loss in various surgeries without increasing complications, recommended for orthopedic and cardiac surgeries. Limited data for colorectal surgery.', 'type': 'list', 'year': 2055.0}
--------------------------------------------------
Document Structure:
Page Content: 3.
Metadata: {'author': 'a Children’s Hospital, Rotterdam, the Netherlands\nDepartment of Neonatal and Pediatric Intensive Care, Division of Neonatology, Erasmus MC - Sophia\nb Children’s Hospital, Rotterdam, the Netherlands\nDepartment of Anesthesiology, Erasmus MC - Sophia\nc Children’s Hospital, Rotterdam, the Netherlands\nDepartment of Pediatric Surgery, Erasmus MC - S

## Tool 정의

In [26]:
from langchain_core.tools import create_retriever_tool
from langchain_core.prompts import PromptTemplate

retriever_tool = create_retriever_tool(
    pinecone_retriever,
    name="pediatric_guideline_tool",
    description="Retrieve information from pediatric anesthesia guideline documents stored in Pinecone.",
    document_prompt=PromptTemplate.from_template(
        "<document>\n<context>{page_content}</context>\n<metadata><source>{source}</source><page>{page}</page><category>{category}</category><year>{year}</year><summary>{summary}</summary></metadata>\n</document>"
    )
)

## State 정의

In [27]:
# --- 0. 상태 정의 ---
class AgenticState(TypedDict):
    question: Annotated[str, "User question"]
    memory_hit: bool
    answers: Annotated[List[str], "Generated answers"]
    clarified_question: str
    clarification_needed: bool
    subqueries: Annotated[List[str], "Decomposed sub-queries"]
    retrieval_needed: bool
    documents: Annotated[List[Dict], "Retrieved documents"]
    memory: Annotated[List[Dict[str, str]], "Memory store"]
    meta_filter: Dict[str, Any]
    tool_calls: Annotated[List[Dict], "Tool calls"]

# Nodes

In [None]:
# --- 1. agent Node ---
# Agentic RAG의 시작점으로, LLM이 질문을 분석해 도구 호출(retriever_tool) 여부를 결정.
# bind_functions로 도구를 바인딩하고, function_call을 통해 도구 호출 여부를 확인.
# 도구가 필요하면 tool_calls와 retrieval_needed를 설정, 그렇지 않으면 직접 답변 생성.

def agent_node(state: AgenticState):
    llm = ChatOpenAI(model="gpt-4o", temperature=0).bind_tools(tools=[retriever_tool])  # bind_functions -> bind_tools
    question = state.get("clarified_question") or state["question"]
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert in pediatric anesthesia. Analyze the question and either call a tool or generate an answer directly."),
        ("human", "{question}")
    ]).format(question=question)
    response = llm.invoke(prompt)
    tool_calls = response.additional_kwargs.get("tool_calls", []) if hasattr(response, "additional_kwargs") else []
    if tool_calls:
        return {"tool_calls": tool_calls, "retrieval_needed": True}  # 모든 tool_calls 반환
    return {"answers": [response.content], "retrieval_needed": False}

In [44]:
import json

# agent_node 테스트 (수정)
def test_agent_node():
    # 테스트용 상태
    test_state = {"question": "What is the risk of VTE?", "memory": []}
    
    # agent_node 실행
    result = agent_node(test_state)
    
    # 결과 출력
    print("Agent Node Result:", result)
    if "tool_calls" in result:
        print("Tool Calls Detected:", result["tool_calls"])
        # tool_calls 내부 구조에 따라 쿼리 추출
        if result["tool_calls"]:
            tool_call = result["tool_calls"][0]
            if "function" in tool_call and "arguments" in tool_call["function"]:
                arguments_str = tool_call["function"]["arguments"]
                try:
                    arguments = json.loads(arguments_str)  # JSON 파싱
                    query = arguments.get("query", "No query found")
                    print("Query in Tool Call:", query)
                except json.JSONDecodeError:
                    print("Failed to parse arguments:", arguments_str)
            else:
                print("Query not found in expected structure. Tool Call Details:", tool_call)
        print("Retrieval Needed:", result["retrieval_needed"])
    else:
        print("Direct Answer Generated:", result["answers"])
        print("Retrieval Needed:", result["retrieval_needed"])

# 테스트 실행
test_agent_node()

Agent Node Result: {'tool_calls': [{'id': 'call_X3jfl076yA1sCWXAYuZs0fZu', 'function': {'arguments': '{"query":"risk of VTE in pediatric anesthesia"}', 'name': 'pediatric_guideline_tool'}, 'type': 'function'}], 'retrieval_needed': True}
Tool Calls Detected: [{'id': 'call_X3jfl076yA1sCWXAYuZs0fZu', 'function': {'arguments': '{"query":"risk of VTE in pediatric anesthesia"}', 'name': 'pediatric_guideline_tool'}, 'type': 'function'}]
Query in Tool Call: risk of VTE in pediatric anesthesia
Retrieval Needed: True


In [45]:
# --- 2. Memory Lookup Node ---
# 이전에 동일하거나 유사한 질문에 대한 답변이 메모리에 있는지 확인. NLP로 유사도 계산.

from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# 임베딩 모델 초기화
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

def check_memory(state: AgenticState):
    question = state["question"]
    memory = state.get("memory", [])
    
    # 메모리가 비어 있으면 바로 종료
    if not memory:
        return {"memory_hit": False}
    
    # 질문 임베딩 생성
    question_embedding = embedding_model.encode(question, convert_to_tensor=True)
    
    # 메모리 항목 임베딩 생성 및 유사도 계산
    max_similarity = 0.0
    best_match = None
    for entry in memory:
        memory_embedding = embedding_model.encode(entry["question"], convert_to_tensor=True)
        similarity = cosine_similarity(question_embedding.reshape(1, -1), memory_embedding.reshape(1, -1))[0][0]
        if similarity > max_similarity:
            max_similarity = similarity
            best_match = entry
    
    # 유사도 임계값 (0.85 이상이면 유사 질문으로 간주)
    threshold = 0.85
    if max_similarity >= threshold and best_match:
        return {"answers": [best_match["answer"]], "memory_hit": True, "similarity_score": max_similarity}
    return {"memory_hit": False, "similarity_score": max_similarity}

# 테스트
test_state = {"question": "What is the risk of VTE?", "memory": [{"question": "소아마취 시 프로포폴과 케타민 사용 시 주의점", "answer": "호흡억제 주의 필요"}]}
result = check_memory(test_state)
print("Check Memory Result:", result)

The installed version of bitsandbytes was compiled without GPU support. 8-bit optimizers, 8-bit multiplication, and GPU quantization are unavailable.


Check Memory Result: {'memory_hit': False, 'similarity_score': 0.073943645}


In [None]:
# --- 3. Clarification Node (질문이 불명확하면 사용자 입력 요청) ---
# 질문이 명확한지 판단하고, 명확하지 않으면 추가 정보를 요청.

def clarification(state: AgenticState):
    question = state.get("clarified_question") or state["question"]
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    clarify_prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert in pediatric anesthesia. Determine if the user's question is clear and specific."),
        ("human", "Question: '{question}'\nIs this question clear and specific? Respond with 'yes' or 'no' only.")
    ]).format(question=question)
    res = llm.invoke(clarify_prompt).content.strip().lower()
    if res == "no":
        clar_prompt = ChatPromptTemplate.from_messages([
            ("system", "Provide a concise request for additional information to clarify the question."),
            ("human", "Question: '{question}'\nWhat additional information is needed? Respond in one sentence.")
        ]).format(question=question)
        clar_msg = llm.invoke(clar_prompt).content.strip()
        return {"clarification_needed": True, "clarify_msg": clar_msg}
    return {"clarification_needed": False, "clarified_question": question}

In [9]:
# --- 4. Clarification Response Node (사용자 추가 입력 받아서 state에 반영) ---
### 수정해야함.
def clarification_response(state: AgenticState):
    # 실제 프론트/채팅 인터페이스라면 사용자 입력을 받는 자리!
    # 여기선 예시로 그냥 state["clarified_question"]을 그대로 사용하거나,
    # 실제 상황에서는 input() 등으로 받아서 업데이트 가능.
    clarified = state.get("clarified_question") or state["question"]
    return {"clarified_question": clarified}

In [10]:
# --- 5. Additional information required? (LLM만으로 답변 가능?) ---
def answerable_by_llm(state: AgenticState):
    question = state.get("clarified_question") or state["question"]
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert in pediatric anesthesia. Determine if external document retrieval is needed."),
        ("human", "Can you answer this question with high confidence without external documents?\nQuestion: {question}\nRespond with 'yes' or 'no' only.")
    ]).format(question=question)
    res = llm.invoke(prompt).content.strip().lower()
    return {"retrieval_needed": res == "no"}

* 수정해야함.

In [11]:
# --- 6. Query Routing Node (메타필터/도구선택 분리) ---
### 수정해야함.
def query_routing(state: AgenticState):
    question = state.get("clarified_question") or state["question"]
    meta_filter = {}
    if any(keyword in question.lower() for keyword in ["drug", "약물", "medication"]):
        meta_filter["metadata"] = {"category": "drug"}
    elif any(keyword in question.lower() for keyword in ["recent", "최신", "최근"]):
        meta_filter["metadata"] = {"year": ">=2020"}
    return {"meta_filter": meta_filter}

In [12]:
# --- 7. Sub-query 분해 노드 ---
def decompose_query(state: AgenticState):
    question = state.get("clarified_question") or state["question"]
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Decompose the question into LLM-friendly sub-queries, one per line. If not a compound question, return the original question."),
        ("human", "Question: {question}")
    ]).format(question=question)
    response = llm.invoke(prompt).content.strip().split('\n')
    subqueries = [q.strip() for q in response if q.strip()]
    return {"subqueries": subqueries}

In [13]:
# --- 8. Retrieval Node (ToolNode) ---
retriever_tool_node = ToolNode(tools=[retriever_tool])

In [14]:
#  --- 9. 문서 relevance 체크 노드 ---
def check_relevance(state: AgenticState):
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    docs = state.get("documents", [])
    subqueries = state["subqueries"]
    for doc, subquery in zip(docs, subqueries):
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are an expert in pediatric anesthesia. Assess document relevance."),
            ("human", "Document: {doc}\nQuestion: {subquery}\nIs this document relevant to the question? Respond with 'yes' or 'no' only.")
        ]).format(doc=doc["page_content"], subquery=subquery)
        res = llm.invoke(prompt).content.strip().lower()
        if res != "yes":
            return "retry"
    return "proceed"

In [15]:
# --- 10. 답변 생성 노드 ---
def generate_answer(state: AgenticState):
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    answers = []
    subqueries = state.get("subqueries", [state["question"]])
    docs = state.get("documents", [{"page_content": ""}] * len(subqueries))
    context = "\n\n".join([doc["page_content"] for doc in docs])
    for q, doc in zip(subqueries, docs):
        source = doc.get("metadata", {}).get("source", "unknown_document")
        page = doc.get("metadata", {}).get("page", "unknown")
        prompt = PromptTemplate.from_template(
            """You are an assistant specialized in question-answering tasks based on medical research papers.
Use the following pieces of retrieved context to answer the question. If you don't know the answer, simply say that you don't know.
Answer in Korean.

# Direction:
1. Understand the intent of the question and provide the most accurate answer.
2. Identify and select the most relevant content from the retrieved context that directly relates to the question.
3. Construct a concise and logical answer by rearranging the selected information into coherent paragraphs.
4. If there is no relevant context for the question, state: "I can't find an answer to that question in the materials I have."
5. Present your answer in a table of key points where applicable.
6. Include all sources and their corresponding whole page numbers in your answer.
7. Write your answer entirely in Korean.
8. Be as detailed as possible in your answer.
9. Begin your answer with "This answer is based on content found in the document **📚" and end with "**📌 [document_name]" — here, [document_name] should be replaced with the document_name from the metadata.
10. Page numbers should be whole numbers.

#Context: 
{context}

###

#Example Format:
**📚 문서에서 검색한 내용 기반 답변입니다**

(Detailed answer to the question)

**📌 출처**
- document_name, 192쪽
- document_name, 192쪽
- ...

###

#Question:
{q}

#Answer:"""
        )
        ans = llm.invoke(prompt.format(context=context, q=q)).content.strip()
        # Add source if answer is valid
        if "I can't find an answer" not in ans and "모름" not in ans:
            ans += f"\n\n**📌 출처**\n- {source}, {page}쪽"
        answers.append(ans)
    return {"answers": answers}

In [16]:
# --- 11. 답변 검토 노드 ---
def review_answer(state: AgenticState):
    for a in state["answers"]:
        if len(a) < 20 or "모름" in a or "I can't find an answer" in a:
            return "regen"
    return "accept"

In [17]:
# --- 12. Memory update ---
def update_memory(state: AgenticState):
    mem = state.get("memory", [])
    question = state.get("clarified_question") or state["question"]
    answer = "\n".join(state["answers"])
    if len(mem) > 10:  # Limit memory size
        mem.pop(0)
    mem.append({"question": question, "answer": answer})
    return {"memory": mem}

In [19]:
# --- 13. Format response ---
def format_response(state: AgenticState):
    return {"response": "\n\n".join(state["answers"])}

In [20]:
# --- 14. LangGraph StateGraph 연결 ---
builder = StateGraph(AgenticState)
builder.add_node("agent", agent_node)
builder.add_node("check_memory", check_memory)
builder.add_node("clarification", clarification)
builder.add_node("answerable_by_llm", answerable_by_llm)
builder.add_node("query_routing", query_routing)
builder.add_node("decompose_query", decompose_query)
builder.add_node("retrieve_docs", retriever_tool_node)
builder.add_node("check_relevance", check_relevance)
builder.add_node("generate_answer", generate_answer)
builder.add_node("review_answer", review_answer)
builder.add_node("update_memory", update_memory)
builder.add_node("format_response", format_response)

<langgraph.graph.state.StateGraph at 0x1c5e24f61e0>

In [None]:
# Edges
builder.set_entry_point("agent")
builder.add_conditional_edges(
    "agent",
    lambda s: "tool" if s.get("tool_calls") else "memory",
    {"tool": "retrieve_docs", "memory": "check_memory"}
)
builder.add_conditional_edges(
    "check_memory",
    lambda s: "hit" if s["memory_hit"] else "miss",
    {"hit": "format_response", "miss": "clarification"}
)
builder.add_conditional_edges(
    "clarification",
    lambda s: "clarify" if s["clarification_needed"] else "ok",
    {"clarify": "clarification_response", "ok": "answerable_by_llm"}
)
builder.add_conditional_edges(
    "answerable_by_llm",
    lambda s: "direct" if not s["retrieval_needed"] else "retrieval",
    {"direct": "generate_answer", "retrieval": "query_routing"}
)
builder.add_edge("query_routing", "decompose_query")
builder.add_edge("decompose_query", "retrieve_docs")
builder.add_conditional_edges(
    "retrieve_docs",
    lambda s: check_relevance(s),
    {"proceed": "generate_answer", "retry": "decompose_query"}
)
builder.add_edge("generate_answer", "review_answer")
builder.add_conditional_edges(
    "review_answer",
    lambda s: "accept" if all(len(a) > 20 and "모름" not in a and "I can't find an answer" not in a for a in s["answers"]) else "regen",
    {"accept": "update_memory", "regen": "generate_answer"}
)
builder.add_edge("update_memory", "format_response")
builder.add_edge("format_response", END)

app = builder.compile()

ValueError: At 'clarification' node, 'condition' branch found unknown target 'clarification_response'

In [None]:
# Example usage
if __name__ == "__main__":
    state = {"question": "소아마취 시 프로포폴과 케타민의 병용 주의사항은?", "memory": []}
    for step in app.stream(state):
        print(step)