### [LangGraph를 활용한 RAG]

- (참고) 테디노트  
- https://console.upstage.ai/api-keys

In [None]:
# api key
from dotenv import load_dotenv

load_dotenv()

In [26]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_upstage import UpstageEmbeddings
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import PDFPlumberLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate
from operator import itemgetter
from typing import List

def load_documents(source_uris: List[str]):
    """문서 로드"""
    docs = []
    for source_uri in source_uris:
        loader = PDFPlumberLoader(source_uri)
        docs.extend(loader.load())
    return docs

def create_text_splitter(chunk_size=500, chunk_overlap=50):
    """텍스트를 분할하는 splitter 생성"""
    return RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

def split_documents(docs, text_splitter):
    """텍스트를 분할"""
    return text_splitter.split_documents(docs)

def create_embedding():
    """임베딩 생성"""
    return UpstageEmbeddings(model="solar-embedding-1-large")

def create_vectorstore(split_docs):
    """벡터스토어 생성"""
    return FAISS.from_documents(
        documents=split_docs, embedding=create_embedding()
    )

def create_retriever(vectorstore, k=5):
    """retriever 생성"""
    return vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": k})

def create_model():
    """모델 생성"""
    return ChatOpenAI(model_name="gpt-4-turbo", temperature=0)

# def create_prompt():
#     """프롬프트 생성"""
#     return hub.pull("teddynote/rag-korean-with-source")


def create_prompt():
    """ChatPromptTemplate 객체로 프롬프트 생성"""
    prompt = ChatPromptTemplate.from_messages([
        SystemMessagePromptTemplate.from_template(
            '당신은 질문-답변(Question-Answering)을 수행하는 친절한 AI 어시스턴트입니다. '
            '당신의 임무는 주어진 문맥(context)에서 주어진 질문(question)에 답하는 것입니다.\n'
            '검색된 다음 문맥(context)을 사용하여 질문(question)에 답하세요. '
            '만약, 주어진 문맥(context)에서 답을 찾을 수 없다면, '
            '답을 모른다면 `주어진 정보에서 질문에 대한 정보를 찾을 수 없습니다`라고 답하세요.\n'
            '기술적인 용어나 이름은 번역하지 않고 그대로 사용해 주세요. '
            '출처(page, source)를 답변에 포함하세요. 답변은 한글로 답변해 주세요.'
        ),
        HumanMessagePromptTemplate.from_template(
            '#Question: \n{question} \n\n#Context: \n{context} \n\n#Answer:'
        )
    ])
    return prompt


def format_docs(docs):
    """문서 포맷팅"""
    return "\n".join(docs)

def create_chain(source_uris):
    """체인 생성"""
    docs = load_documents(source_uris)
    text_splitter = create_text_splitter()
    split_docs = split_documents(docs, text_splitter)
    vectorstore = create_vectorstore(split_docs)
    retriever = create_retriever(vectorstore)
    model = create_model()
    prompt = create_prompt()
    
    chain = (
        {"question": itemgetter("question"), "context": itemgetter("context")}
        | prompt
        | model
        | StrOutputParser()
    )
    return chain


In [27]:
source_uris = ["SPRI_AI_Brief_2023년12월호_F.pdf"]

# 문서 로드
docs = load_documents(source_uris)

# 텍스트 분할기 생성 및 문서 분할
text_splitter = create_text_splitter()
split_docs = split_documents(docs, text_splitter)

# 벡터스토어 생성 및 리트리버 생성
vectorstore = create_vectorstore(split_docs)
pdf_retriever = create_retriever(vectorstore)

# 모델 및 프롬프트 생성
model = create_model()
prompt = create_prompt()

# 체인 생성
pdf_chain = (
    {"question": itemgetter("question"), "context": itemgetter("context")}
    | prompt
    | model
    | StrOutputParser()
)


In [None]:
pdf_chain

In [None]:
pdf_retriever

In [30]:
from typing import TypedDict


# GraphState 상태를 저장하는 용도로 사용합니다.
class GraphState(TypedDict):
    question: str  # 질문
    context: str  # 문서의 검색 결과
    answer: str  # 답변
    relevance: str  # 답변의 문서에 대한 관련성

In [31]:
def format_docs(docs):
    return "\n".join(
        [
            f"<document><content>{doc.page_content}</content><source>{doc.metadata['source']}</source><page>{int(doc.metadata['page'])+1}</page></document>"
            for doc in docs
        ]
    )


def format_searched_docs(docs):
    return "\n".join(
        [
            f"<document><content>{doc['content']}</content><source>{doc['url']}</source></document>"
            for doc in docs
        ]
    )



In [35]:
from langchain_upstage import UpstageGroundednessCheck
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_community.tools.tavily_search import TavilySearchResults

# 업스테이지 문서 관련성 체크 기능을 설정합니다. https://upstage.ai
upstage_ground_checker = UpstageGroundednessCheck()


# 문서에서 검색하여 관련성 있는 문서를 찾습니다.
def retrieve_document(state: GraphState) -> GraphState:
    # 문서에서 검색하여 관련성 있는 문서를 찾습니다.
    retrieved_docs = pdf_retriever.invoke(state["question"])

    # 검색된 문서를 형식화합니다.
    retrieved_docs = format_docs(retrieved_docs)

    # 검색된 문서를 context 키에 저장합니다.
    return GraphState(context=retrieved_docs)


# LLM을 사용하여 답변을 생성합니다.
def llm_answer(state: GraphState) -> GraphState:
    question = state["question"]
    context = state["context"]

    # 체인을 호출하여 답변을 생성합니다.
    response = pdf_chain.invoke({"question": question, "context": context})

    return GraphState(answer=response)


def rewrite(state):
    question = state["question"]
    answer = state["answer"]
    context = state["context"]
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a professional prompt rewriter. Your task is to generate the question in order to get additional information that is now shown in the context."
                "Your generated question will be searched on the web to find relevant information.",
            ),
            (
                "human",
                "Rewrite the question to get additional information to get the answer."
                "\n\nHere is the initial question:\n ------- \n{question}\n ------- \n"
                "\n\nHere is the initial context:\n ------- \n{context}\n ------- \n"
                "\n\nHere is the initial answer to the question:\n ------- \n{answer}\n ------- \n"
                "\n\nFormulate an improved question in Korean:",
            ),
        ]
    )

    # Question rewriting model
    model = ChatOpenAI(temperature=0, model="gpt-4-turbo")

    chain = prompt | model | StrOutputParser()
    response = chain.invoke(
        {"question": question, "answer": answer, "context": context}
    )
    return GraphState(question=response)


def search_on_web(state: GraphState) -> GraphState:
    # 문서에서 검색하여 관련성 있는 문서를 찾습니다.
    search_tool = TavilySearchResults(max_results=5)
    search_result = search_tool.invoke({"query": state["question"]})

    # 검색된 문서를 형식화합니다.
    search_result = format_searched_docs(search_result)
    # 검색된 문서를 context 키에 저장합니다.
    return GraphState(
        context=search_result,
    )


def relevance_check(state: GraphState) -> GraphState:
    print("relevance_check", state)
    # 관련성 체크를 실행합니다. 결과: grounded, notGrounded, notSure
    response = upstage_ground_checker.run(
        {"context": state["context"], "answer": state["answer"]}
    )
    return GraphState(
        relevance=response, question=state["question"], answer=state["answer"]
    )


def is_relevant(state: GraphState) -> GraphState:
    return state["relevance"]

In [36]:
from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver

# langgraph.graph에서 StateGraph와 END를 가져옵니다.
workflow = StateGraph(GraphState)

# 노드들을 정의합니다.
workflow.add_node("retrieve", retrieve_document)  # 에이전트 노드를 추가합니다.
workflow.add_node("llm_answer", llm_answer)  # 정보 검색 노드를 추가합니다.
workflow.add_node(
    "relevance_check", relevance_check
)  # 답변의 문서에 대한 관련성 체크 노드를 추가합니다.
workflow.add_node("rewrite", rewrite)  # 질문을 재작성하는 노드를 추가합니다.
workflow.add_node("search_on_web", search_on_web)  # 웹 검색 노드를 추가합니다.

# 각 노드들을 연결합니다.
workflow.add_edge("retrieve", "llm_answer")  # 검색 -> 답변
workflow.add_edge("llm_answer", "relevance_check")  # 답변 -> 관련성 체크
workflow.add_edge("rewrite", "search_on_web")  # 재작성 -> 관련성 체크
workflow.add_edge("search_on_web", "llm_answer")  # 웹 검색 -> 답변


# 조건부 엣지를 추가합니다.
workflow.add_conditional_edges(
    "relevance_check",  # 관련성 체크 노드에서 나온 결과를 is_relevant 함수에 전달합니다.
    is_relevant,
    {
        "grounded": END,  # 관련성이 있으면 종료합니다.
        "notGrounded": "rewrite",  # 관련성이 없으면 다시 답변을 생성합니다.
        "notSure": "rewrite",  # 관련성 체크 결과가 모호하다면 다시 답변을 생성합니다.
    },
)


workflow.set_entry_point("retrieve")

memory = MemorySaver()

app = workflow.compile(checkpointer=memory)

In [None]:
import pprint
from langgraph.errors import GraphRecursionError
from langchain_core.runnables import RunnableConfig

config = RunnableConfig(
    recursion_limit=12, configurable={"thread_id": "CORRECTIVE-SEARCH-RAG"}
)

# AgentState 객체를 활용하여 질문을 입력합니다.
inputs = GraphState(
    question="생성형 AI 가우스를 만든 회사의 2023년도 매출액은 얼마인가요?"
)

# app.stream을 통해 입력된 메시지에 대한 출력을 스트리밍합니다.
try:
    for output in app.stream(inputs, config=config):
        # 출력된 결과에서 키와 값을 순회합니다.
        for key, value in output.items():
            # 노드의 이름과 해당 노드에서 나온 출력을 출력합니다.
            pprint.pprint(f"Output from node '{key}':")
            pprint.pprint("---")
            # 출력 값을 예쁘게 출력합니다.
            pprint.pprint(value, indent=2, width=80, depth=None)
        # 각 출력 사이에 구분선을 추가합니다.
        pprint.pprint("\n---\n")
except GraphRecursionError as e:
    pprint.pprint(f"Recursion limit reached: {e}")

In [None]:
print(output["relevance_check"]["question"])
print(output["relevance_check"]["answer"])
print(output["relevance_check"]["relevance"])