In [None]:
!pip install langgraph

ModularRAG 
[[https://arxiv.org/pdf/2407.21059]]

In [6]:
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain_postgres import PGVector
import os 
from dotenv import load_dotenv

load_dotenv()

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)    
embeddings = OpenAIEmbeddings()

vectorstore = PGVector(
    collection_name="chapter6_collection",
    embeddings=embeddings,
    connection=f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('PGVECTOR_HOST')}:{os.getenv('PGVECTOR_PORT')}/{os.getenv('POSTGRES_DB')}",
    use_jsonb=True,
)
retriever = vectorstore.as_retriever()


In [11]:
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

# 문서 검색 필요 여부 검사
class NeedRetrieval(BaseModel):
    need_retrieval: str = Field(description="문서 검색이 필요한지 여부(Y or N)")
    reason: str = Field(description="문서 검색이 필요하거나 필요하지 않은 이유")

structured_llm = llm.with_structured_output(NeedRetrieval)

system = """
당신은 사용자의 질문을 분석하여 문서 검색이 필요한지 여부를 판단하는 역할을 합니다.
다음과 같은 경우에는 문서 검색이 필요합니다:
- 사실 기반 정보를 요구하는 질문
- 특정 데이터나 통계를 요구하는 질문
- 역사적 사건이나 날짜에 관한 질문

다음과 같은 경우에는 문서 검색이 필요하지 않습니다:
- 일반적인 대화나 인사
- 개인적인 의견을 묻는 질문
- 창의적인 내용 생성 요청
- 간단한 계산이나 논리적 추론만 필요한 질문

질문을 분석하고 문서 검색이 필요한지 여부와 그 이유를 제공해주세요.
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "{question}")
])

need_retrieval_chain = prompt | structured_llm


In [None]:
need_retrieval_chain.invoke("안녕하세요?")

In [14]:
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

class GradeDocuments(BaseModel):
    score : int = Field(description="The score of the document(from 0 to 10)")

structured_llm = llm.with_structured_output(GradeDocuments)

system = """
당신은 문서와 질문을 받아서 문서의 정확도를 평가하는 역할을 합니다.
문서의 정확도를 평가하는 기준은 다음과 같습니다.
- 문서가 질문에 대해 정확하게 답변하였는가?
- 문서가 질문에 대해 중요한 내용을 포함하였는가?
- 문서가 질문에 대해 불필요한 내용을 포함하였는가?

문서의 정확도를 0부터 10까지의 점수로 평가해주세요.
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "문서 : {documents}"), 
    ("user", "질문 : {question}"),
])


grade_documents_chain = prompt | structured_llm 

In [None]:
grade_documents_chain.invoke({"documents": "일시불 결제 시 캐시백 해줍니다.", "question": "할부전환에 필요한 조건은 무엇인가?"})

In [121]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

from langchain_core.prompts import ChatPromptTemplate

system = """
당신은 신용카드사의 친절한 상담사 입니다.
사용자의 질문에 대해 친절하게 답변해주세요.
문서가 제공되는 경우 문서의 내용을 참고하여 답변해주세요.
"""
prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "질문: {question}"),
    ("user", "문서: {documents}")
])

def format_docs(docs):
    return "\n".join([f"문서 {i+1}: {doc.page_content}" for i, doc in enumerate(docs)]) 

generate_chain = prompt | llm | StrOutputParser()


In [122]:
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

class Groundedness(BaseModel):
    score: int = Field(description="문서의 근거성 점수(0부터 10까지)")
    explanation: str = Field(description="점수에 대한 설명")

structured_llm = llm.with_structured_output(Groundedness)

system = """
당신은 사용자의 질문과 LLM의 응답을 받아서 응답의 근거성(groundedness)을 평가하는 역할을 합니다.
근거성이란 응답이 사용자의 질문과 문서에 기반하여 작성되었는지를 의미합니다.

근거성을 평가하는 기준은 다음과 같습니다:
- 응답의 내용이 사용자의 질문과 문서에 명시적으로 언급되어 있는가?
- 응답이 사용자의 질문과 문서의 내용을 왜곡하거나 과장하지 않았는가?
- 응답이 사용자의 질문과 문서에 없는 내용을 추가하지 않았는가?

근거성을 0부터 10까지의 점수로 평가하고, 그 이유를 설명해주세요.
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "질문: {question}"),
    ("user", "문서: {documents}"),
    ("user", "응답: {response}"),
])

groundedness_chain = prompt | structured_llm


In [None]:
groundedness_chain.invoke(
    {"question": "할부전환에 필요한 조건은 무엇인가?",
     "documents": "일시불 결제 시 캐시백 해줍니다.", 
     "response": "1개월 이전 승인건만 가능합니다"}
)


In [124]:
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field

class QueryRewrite(BaseModel):
    rewritten_query: str = Field(description="재작성된 쿼리")
    explanation: str = Field(description="쿼리 재작성 이유 설명")

structured_llm = llm.with_structured_output(QueryRewrite)

system = """
당신은 사용자의 질문을 분석하고 더 효과적인 검색을 위해 질문을 재작성하는 역할을 합니다.
원래 질문의 의도를 유지하면서 검색 엔진에서 더 관련성 높은 결과를 얻을 수 있도록 질문을 재작성해주세요.

쿼리 재작성 시 고려할 사항:
- 불필요한 단어나 문구 제거
- 핵심 키워드 강조
- 동의어나 관련 용어 추가
- 질문의 의도를 명확히 표현

재작성된 쿼리와 재작성 이유를 함께 제공해주세요.
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system),
    ("user", "원래 질문: {question}"),
])

query_rewrite_chain = prompt | structured_llm 


In [None]:
query_rewrite_chain.invoke({"question": "할부전환에 필요한 조건은 무엇인가?"})


In [126]:
from typing import Annotated, List
from typing_extensions import TypedDict

class GraphState(TypedDict):
    question : Annotated[str, "사용자의 질문"]
    documents: Annotated[List[str], "검색된 문서"]
    generation : Annotated[str, "LLM의 응답"]
    have_to_retrieve : Annotated[str, "문서 검색 필요 여부"]


##

## 노드 정의

In [127]:

def need_retrieval(state:GraphState) -> GraphState:
    print("NEED RETRIEVAL....")
    question = state["question"]
    have_to_retrieve = need_retrieval_chain.invoke({
        "question": question
    })
    return {'have_to_retrieve': have_to_retrieve.need_retrieval, 'documents':[]}

## 문서 검색
def retrieve(state:GraphState) -> GraphState:
    print("RETRIEVE....")
    question = state["question"]
    documents = retriever.invoke(question)
    return {'documents': documents}

## 문서 평가
def grade_documents(state:GraphState) -> GraphState:
    print("GRADE DOCUMENTS....")
    question = state["question"]
    documents = state["documents"]
    
    filtered_docs = []
    for d in documents:
        score = grade_documents_chain.invoke({
            "documents": d,
            "question": question
        })
        if score.score >= 6:
            filtered_docs.append(d)
    
    return {'documents': filtered_docs}

## 응답 생성
def generate(state:GraphState) -> GraphState:
    print("GENERATE....")
    question = state["question"]
    documents = state["documents"]

    generation = generate_chain.invoke({
        "question": question,
        "documents": format_docs(documents)
    })
    return {'generation': generation}

## 쿼리 재작성
def rewrite_query(state:GraphState) -> GraphState:
    print("REWRITE QUERY....")
    question = state["question"]
    rewritten_query = query_rewrite_chain.invoke({
        "question": question
    })
    return {'question': rewritten_query.rewritten_query}


## 조건부 엣지 정의

In [128]:

## 문서 검색 필요 여부 검사
def decide_need_retrieval(state:GraphState) -> GraphState:
    print("DECISION NEED RETRIEVAL....")
    have_to_retrieve = state["have_to_retrieve"]

    if have_to_retrieve == "Y":
        return 'retrieve'
    else:
        return 'generate'


def decide_groundedness(state:GraphState) -> GraphState:
    print("DECISION GROUNDEDNESS....")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    
    groundedness = groundedness_chain.invoke({
        "question": question,
        "documents": documents,
        "response": generation
    })
    if groundedness.score >= 5:
        return 'end'
    else:
        return 'rewrite_query'


## 그래프 생성

In [129]:
from langgraph.graph import StateGraph, START, END

workflow = StateGraph(GraphState)

workflow.add_node("need_retrieval", need_retrieval)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("rewrite_query", rewrite_query)

workflow.add_edge(START, "need_retrieval")
workflow.add_conditional_edges(
    "need_retrieval",
    decide_need_retrieval,
    {
        'retrieve': 'retrieve',
        'generate': 'generate'
    }
)
workflow.add_edge('retrieve', 'grade_documents')
workflow.add_edge('grade_documents', 'generate')

workflow.add_conditional_edges(
    'generate',
    decide_groundedness,
    {
        'end': END,
        'rewrite_query': 'rewrite_query'
    }
)

workflow.add_edge('rewrite_query', "need_retrieval")

app = workflow.compile()


In [None]:
print ( app.get_graph().draw_mermaid() )

In [None]:
from langchain_core.runnables import RunnableConfig
import uuid
config = RunnableConfig(recursion_limit=15, configurable={'thread_id':uuid.uuid4()})
response = app.invoke({'question' :"적립서비스가 제공되는 전월 실적 기준은 얼마인가요?"}, config=config)

In [None]:
print(response)

In [None]:
response