In [1]:
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
sys.path.append(os.path.abspath(os.path.join(os.path.join(os.getcwd(), '..'), '..')))

from vectordb.vectorstore import VectorDB
from schemas.models import get_milvus_settings
from langchain_community.embeddings.openai import OpenAIEmbeddings

### retriver 객체 생성

In [2]:
vector_db = VectorDB(settings=get_milvus_settings(), embedding=OpenAIEmbeddings())
collection = vector_db._get_vectorstore("document_embeddings")
retriever = collection.as_retriever(search_kwargs={"k": 4})

  vector_db = VectorDB(settings=get_milvus_settings(), embedding=OpenAIEmbeddings())


[ 2024-10-16 10:23:48.899514 ] : connect_to_milvus()
[ 2024-10-16 10:23:49.028470 ] : get_vectorstore()


  vectorstore = Milvus(


### LLM 객체 생성

In [3]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)

In [4]:
def make_history_summary(conversation_summary:str, conversation_history:list):

    recent_history = "\n".join([f"{msg['role']}: {msg['content']}" for msg in conversation_history]) 

    # 조건에 따라 텍스트 구성
    history_summary = ""
    if conversation_summary:
        history_summary += f"과거 대화 내역 요약: {conversation_summary}\n\n"
    if recent_history:
        history_summary += f"최근 대화 내역: {recent_history}\n\n"

    return history_summary

conversation_summary = "당신의 이름은 신강식이라고 합니다. AI는 반갑다고 인사합니다."
conversation_history = []

history = make_history_summary(conversation_summary, conversation_history)
print(history)

과거 대화 내역 요약: 당신의 이름은 신강식이라고 합니다. AI는 반갑다고 인사합니다.




### 1. Router

In [5]:
from langchain_core.prompts import (
    ChatPromptTemplate,
)

from typing import List

from llms.tools.structured_model import RouteQuery

def router(llm:ChatOpenAI,  history:str, question:str)->RouteQuery:

    router_system_msg = """당신은 사용자 질문을 vectorstore, common, web으로 라우팅하는 전문가입니다.\nvectorstore에는 금융분야 마이데이터에 대한 기술 가이드 및 표준 API 규격 문서가 포함되어 있습니다.\n해당 주제에 대한 질문은 vectorstore를 사용합니다.\n일반적인 대화는 common을 사용합니다.\n반면 웹 검색이 필요로 하다고 판단되면 web을 사용합니다.\n사용자의 질문을 우선으로 평가하며 필요한 경우에 과거 대화 기록을 참조하세요."""

    router_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", router_system_msg),
            ("human", f"{history}\n\n질문: {question}"),
        ]
    )

    router_chain = router_prompt | llm.with_structured_output(RouteQuery)

    return router_chain.invoke({"history":history,"question": question})

In [6]:
router(llm, "반갑다", history)

RouteQuery(datasource='common')

## Common

### Common Generator

In [61]:
from langchain_core.output_parsers import StrOutputParser

def common(llm:ChatOpenAI, history:str, question:str)->str:

    common_system_msg = """당신은 질문에 대한 답변을 주는 챗봇입니다."""

    common_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", common_system_msg),
            ("human", "{history} \n\n 사용자의 질문: \n\n {question}"),
        ]
    )

    common_chain = common_prompt | llm | StrOutputParser()

    return common_chain.invoke({"history": history, "question": question})

In [62]:
common(llm, history, "내 이름이 뭐야?")

'당신의 이름은 신강식입니다.'

## RAG

### Retrieval Grader

In [7]:
from llms.tools.structured_model import GradeDocuments


def retrieval_grader(llm:ChatOpenAI, document:str, question:str)->GradeDocuments:

    retrieval_grader_system_msg = """당신은 검색된 문서와 사용자 질문의 관련성을 평가하는 채점자입니다.\n문서에 사용자 질문과 관련된 키워드 또는 의미론적 의미가 포함된 경우 관련성이 있는 것으로 등급을 매깁니다.\n엄격한 테스트일 필요는 없습니다. 목표는 잘못된 검색을 걸러내는 것입니다.\n'yes' 또는 'no'를 사용하여 문서와 질문의 관련성을 평가합니다."""

    retrieval_grader_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", retrieval_grader_system_msg),
            ("human", "검색된 문서: \n\n {document} \n\n 사용자 질문: {question}"),
        ]
    )

    retrieval_grader_chain = retrieval_grader_prompt | llm.with_structured_output(GradeDocuments)

    return retrieval_grader_chain.invoke({"document": document, "question": question})

### RAG Generator

In [88]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.documents import Document

def rag(llm:ChatOpenAI,  history:str, documents:List[Document], question:str) -> str:

    rag_system_msg = """당신은 주어진 질문에 대한 답변을 하는 전문가입니다.\n다음 검색된 문서를 사용하여 질문에 답하세요.\n답을 모르면 모른다고 말하세요.\n필요할 경우에만 과거 대화 기록을 참조하세요.\n답변은 최대한 상세하게 작성되어야 합니다.\n마지막에 참조한 문서의 이름과 페이지를 언급하세요."""

    rag_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", rag_system_msg),
            ("human", "{history} \n\n 검색된 문서: \n\n {documents} \n\n 사용자 질문: {question}"),
        ]
    )

    rag_chain = rag_prompt | llm | StrOutputParser()

    return rag_chain.invoke({"history": history, "documents": documents, "question": question})

### Web RAG Generator

In [98]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.documents import Document

def web_rag(llm:ChatOpenAI,  history:str, documents:List[Document], question:str) -> str:

    web_rag_system_msg = """당신은 주어진 질문에 대한 답변을 하는 전문가입니다.\n제시된 문서는 웹 검색을 통해 얻은 문서입니다.\n이를 사용하여 질문에 답하세요.\n답변은 최대한 상세하게 작성되어야 합니다.\n답을 모르면 모른다고 말하세요.\n필요할 경우에만 과거 대화 기록을 참조하세요.\n마지막에 참조한 웹 사이트의 정보를 언급하고 링크를 제시하세요."""

    web_rag_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", web_rag_system_msg),
            ("human", "{history} \n\n 검색된 문서: \n\n {documents} \n\n 사용자 질문: {question}"),
        ]
    )

    rag_chain = web_rag_prompt | llm | StrOutputParser()

    return rag_chain.invoke({"history": history, "documents": documents, "question": question})

## Hallucination

### Hallucination Grader

In [179]:
from llms.tools.structured_model import GradeHallucinations


def hallucination_grader(llm:ChatOpenAI, history:str, documents:List[Document], generation:str, hallucinate_cnt:int)->GradeHallucinations:

    if hallucinate_cnt > 3:
        return GradeHallucinations(multiple_score="infinite")
    
    hallucination_system_msg = """검색된 문서를 사실이라고 보고, 사실에 근거하여 답변을 생성하고있는지 여부를 평가하는 등급입니다.\n'yes', 'no'를 사용하여 평가합니다.\n'yes'는 답이 사실에 근거하는 것을 의미합니다.\n'no'는 답변이 문서에 기반하지 않음을 의미합니다.\n 단, 너무 엄격할 필요는 없기에 문서와 답변의 의미가 일치하면 'yes'로 평가합니다.\n필요할 경우에만 과거 대화 이력을 참조하세요."""

    hallucination_grader_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", hallucination_system_msg),
            ("human", "{history}\n\n 검색된 문서(사실): \n\n {documents} \n\n LLM의 답변: {generation}"),
        ]
    )

    hallucination_grader_chain = hallucination_grader_prompt | llm.with_structured_output(GradeHallucinations)

    return hallucination_grader_chain.invoke({"hallucinate_cnt": hallucinate_cnt,"history": history
                                        , "documents": documents, "generation": generation })

### Answer Grader

In [10]:
from llms.tools.structured_model import GradeAnswer


def answer_grader(llm:ChatOpenAI, history:str, question:str, generation:str)->GradeAnswer:

    answer_grader_system_msg = """당신은 답변이 질문을 해결하는지 여부를 평가하는 채점자입니다.\n'yes' 또는 'no'를 사용하여 평가하며, 'yes'는 답이 문제를 해결한다는 의미입니다.\n필요할 경우에만 과거 대화 이력을 참조하세요."""

    answer_grader_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", answer_grader_system_msg),
            ("human", "{history} \n\n 사용자의 질문: \n\n {question} \n\n LLM의 답변: {generation}"),
        ]
    )

    answer_grader_chain = answer_grader_prompt | llm.with_structured_output(GradeAnswer)

    return answer_grader_chain.invoke({"history": history, "question": question, "generation": generation})

## Transform

### Transform Query

In [59]:
from langchain_core.output_parsers import StrOutputParser

def re_write(llm:ChatOpenAI, history:str, question:str)->str:

    re_write_system_msg = """당신은 입력 질문을 금융 마이데이터 관점에서 최적화하여 변환하는 질문 재작성자입니다.\n사용자의 질문의 의도/의미를 추론해 보세요.\n최종 출력은 원래 질문과 동일한 의미를 가지면서 금융 마이데이터의 관점에서 더 나은 질문을 생성해야 합니다.\n필요할 경우에만 과거 대화 이력을 참조하세요."""

    re_write_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", re_write_system_msg),
            ("human", "{history} \n\n 사용자의 초기 질문: \n\n {question}\n개선된 질문:"),
        ]
    )

    re_write_chain = re_write_prompt | llm | StrOutputParser()

    return re_write_chain.invoke({"history": history, "question": question})

In [60]:
re_write(llm, history, "토큰 만료시 어떻게해")

'토큰 만료 시에는 어떤 조치를 취해야 하나요?'

### Web Transform Query

In [57]:
from langchain_core.output_parsers import StrOutputParser

def re_write_for_web(llm:ChatOpenAI, history:str, question:str)->str:

    re_write_for_web_system_msg = """당신은 입력 질문을 금융 마이데이터에 대한 내용을 포함하고, 웹 검색에 맞게 최적화하여 변환하는 질문 재작성자입니다.\n금융 마이데이터에 대한 내용을 더해 질문을 개선하세요.\n웹 검색을 위한 질문은 반드시 명사구로만 작성되어야 합니다.\n아래 예시를 참조하세요.\n[예시]\n1. 사용자의 초기 질문: 포켓몬고 어떻게 설치해? -> 개선된 질문: 포켓몬고 설치 방법\n2. 사용자의 초기 질문: 흑백요리사에서 가장 인기있는 요리는 무엇인가요?-> 개선된 질문: 흑백 요리사 인기 요리\n\n필요할 경우에만 과거 대화 이력을 참조하세요."""

    re_write_for_web_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", re_write_for_web_system_msg),
            ("human", "{history} \n\n 사용자의 초기 질문: \n\n {question}\n개선된 질문:"),
        ]
    )

    re_write_for_web_chain = re_write_for_web_prompt | llm | StrOutputParser()

    return re_write_for_web_chain.invoke({"history": history, "question": question})

In [58]:
re_write_for_web(llm, history, "API 스펙 중 aNS는 어떤 것을 뜻해?")

'금융 마이데이터에서 API 스펙 중 aNS는 무엇을 의미하나요?'

### Web Search

In [115]:
from langchain_community.tools.tavily_search import TavilySearchResults

# tavily_api_key를 환경 변수에서 가져오기
web_search_tool = TavilySearchResults(k=3)

### Define Graph State

In [195]:
from typing import List

from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    history: str
    generation: str
    documents: List[str]
    hallucinate_cnt: int

In [202]:
state = GraphState()
state["history"] = history
state["question"] = "x-api-tran-id에 대해 알려주세요."

# Define Graph Flow

## Retrieve

In [203]:
def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("****retrieve****")
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

In [204]:
result = retrieve(state)
print(result)

****retrieve****
---RETRIEVE---
{'documents': [Document(metadata={'doc_name': 'api.pdf', 'page_number': 136, 'creation_time': 1729026884.7069488, 'modification_time': 1728916150.4709036, 'key': 'api.pdf_136_156500d074fa1950fa8e524eaaf46de8cda04f5cf83425300699796ec7991c48', 'pk': 453224949084362800}, page_content='x-api-tran-id\n거래고유번호\nY\nAN (25)\n거래고유번호 (첨부14 참조)\nx-api-type\nAPI 유형\nY\naNS (12)\n정기적/비정기적 전송 API 유형 (2.1-\uf000 및 \n3.3 참조)\nBody\norg_code\n기관코드\nY\naN (10)\n정보제공자 기관코드\n• 지원 API로부터 배포\naccount_num\n계좌번호\nY\naN (20)'), Document(metadata={'doc_name': 'api.pdf', 'page_number': 251, 'creation_time': 1729026884.7069488, 'modification_time': 1728916150.4709036, 'key': 'api.pdf_251_156500d074fa1950fa8e524eaaf46de8cda04f5cf83425300699796ec7991c48', 'pk': 453224949084363506}, page_content='x-api-tran-id\n거래고유번호\nY\nAN (25)\n거래고유번호 (첨부14 참조)\nx-api-type\nAPI 유형\nY\naNS (12)\n정기적/비정기적 전송 API 유형 (2.1-\uf000 및 \n3.3 참조)\nBody\norg_code\n기관코드\nY\naN (10)\n정보제공자 기관코드\n• 지원 API로부터 배포\n

In [83]:
state.update({"documents": result['documents']})

## Generate

### Common Generate

In [84]:
def generate(state):
    """
    Generate answer using gpt knowledge

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE(COMMON)---")
    question = state["question"]
    history = state["history"]

    generation = common(llm, history, question)
    # RAG generation
    return {"question": question, "history": history, "generation": generation}

In [94]:
result =generate(state)
print(result)

---GENERATE(COMMON)---
{'question': '토큰 발행이 만료되면 어떻게해?', 'history': '과거 대화 내역 요약: 당신의 이름은 신강식이라고 합니다. AI는 반갑다고 인사합니다.\n\n', 'generation': '토큰 발행이 만료되면 보통 새로운 토큰을 발급받아야 합니다. 만료된 토큰을 사용하면 인증이 실패할 수 있으므로 새로운 토큰을 발급받아 시스템에 다시 제공해야 합니다.'}


### RAG Generate

In [91]:
def generate_rag(state):
    """
    Generate answer using vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE RAG(VECTORSTORE)---")
    question = state["question"]
    documents = state["documents"]
    history = state["history"]
    hallucinate_cnt = state["hallucinate_cnt"]

    generation = rag(llm, history, documents, question)
    # RAG generation
    return {"documents": documents, "question": question, "history": history, "generation": generation, "hallucinate_cnt": hallucinate_cnt+1}

In [93]:
result = generate_rag(state)
print(result)

---GENERATE RAG(VECTORSTORE)---
{'documents': [Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_93501f35de66dc071a7b0d836c7118791795f0ab69d67cfe836dcb3487f0396c', 'pk': 453224949084361958}, page_content='1-5. 지원API 제공용 접근토큰의 유효기간을 1년 이내로 설정하여 발급\n1-6. 접근토큰 발급·재발급 시 전송요구 동의기간을 참고하여 유효기간을 알맞게 \n설정하여 발급\n2.발급관리\n2-1. 실제로 전송을 요구한 정보주체에 해당하는 접근토큰과 리프레시토큰을 발급\n2-2. 정보주체별로 중복된 접근토큰과 리프레시토큰이 발급되지 않도록 발급시 \n확인・관리'), Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_c37f12bf60d3e4890beab49c1ded8c16ca6cd666807225459749cdc2815979a8', 'pk': 453224949084361960}, page_content='토큰을 발급·관리\n3.갱신 및 \n폐기관리\n3-1. 전송요구 유효기간 동안 리프레시토큰을 무단 변경하지 않음 (갱신·삭제 등)\n3-2. 전송요구 변경에 따라 새로운 접근토큰과 리프레시토큰 발급 시 기존 접근\n토큰과 리프레시토큰을 즉시 폐기\n3-3. 전송요구 철회시 접근토큰과 리프레시토큰을 즉시 폐기\n4. 정보제공'), Document(metadata

### Web RAG Generate

In [101]:
def generate_web_rag(state):
    """
    Generate answer using web search results

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE RAG(WEB SEARCH)---")
    question = state["question"]
    documents = state["documents"]
    history = state["history"]

    generation = web_rag(llm, history, documents, question)
    # RAG generation
    return {"documents": documents, "question": question, "history": history, "generation": generation}

In [132]:
result = generate_web_rag(state)
print(result)

---GENERATE RAG(WEB SEARCH)---
{'documents': [Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_93501f35de66dc071a7b0d836c7118791795f0ab69d67cfe836dcb3487f0396c', 'pk': 453224949084361958}, page_content='1-5. 지원API 제공용 접근토큰의 유효기간을 1년 이내로 설정하여 발급\n1-6. 접근토큰 발급·재발급 시 전송요구 동의기간을 참고하여 유효기간을 알맞게 \n설정하여 발급\n2.발급관리\n2-1. 실제로 전송을 요구한 정보주체에 해당하는 접근토큰과 리프레시토큰을 발급\n2-2. 정보주체별로 중복된 접근토큰과 리프레시토큰이 발급되지 않도록 발급시 \n확인・관리'), Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_c37f12bf60d3e4890beab49c1ded8c16ca6cd666807225459749cdc2815979a8', 'pk': 453224949084361960}, page_content='토큰을 발급·관리\n3.갱신 및 \n폐기관리\n3-1. 전송요구 유효기간 동안 리프레시토큰을 무단 변경하지 않음 (갱신·삭제 등)\n3-2. 전송요구 변경에 따라 새로운 접근토큰과 리프레시토큰 발급 시 기존 접근\n토큰과 리프레시토큰을 즉시 폐기\n3-3. 전송요구 철회시 접근토큰과 리프레시토큰을 즉시 폐기\n4. 정보제공'), Document(metadata=

## Grade

### Grade Documents

In [198]:
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    for d in documents:
        score = retrieval_grader(llm, d.page_content,  question)
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question, "hallucinate_cnt": 0}

In [104]:
grade_documents(state)

---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---


{'documents': [Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_93501f35de66dc071a7b0d836c7118791795f0ab69d67cfe836dcb3487f0396c', 'pk': 453224949084361958}, page_content='1-5. 지원API 제공용 접근토큰의 유효기간을 1년 이내로 설정하여 발급\n1-6. 접근토큰 발급·재발급 시 전송요구 동의기간을 참고하여 유효기간을 알맞게 \n설정하여 발급\n2.발급관리\n2-1. 실제로 전송을 요구한 정보주체에 해당하는 접근토큰과 리프레시토큰을 발급\n2-2. 정보주체별로 중복된 접근토큰과 리프레시토큰이 발급되지 않도록 발급시 \n확인・관리'),
  Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_c37f12bf60d3e4890beab49c1ded8c16ca6cd666807225459749cdc2815979a8', 'pk': 453224949084361960}, page_content='토큰을 발급·관리\n3.갱신 및 \n폐기관리\n3-1. 전송요구 유효기간 동안 리프레시토큰을 무단 변경하지 않음 (갱신·삭제 등)\n3-2. 전송요구 변경에 따라 새로운 접근토큰과 리프레시토큰 발급 시 기존 접근\n토큰과 리프레시토큰을 즉시 폐기\n3-3. 전송요구 철회시 접근토큰과 리프레시토큰을 즉시 폐기\n4. 정보제공'),
  Document(metadata={'doc_name': 'api.pdf', 'pa

## Transform

### Transform Query

In [105]:
def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]
    history = state["history"]

    # Re-write question
    better_question = re_write(llm, history, question)
    return {"documents": documents, "question": better_question, "history": history}

In [108]:
result = transform_query(state)
print(result)

---TRANSFORM QUERY---
{'documents': [Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_93501f35de66dc071a7b0d836c7118791795f0ab69d67cfe836dcb3487f0396c', 'pk': 453224949084361958}, page_content='1-5. 지원API 제공용 접근토큰의 유효기간을 1년 이내로 설정하여 발급\n1-6. 접근토큰 발급·재발급 시 전송요구 동의기간을 참고하여 유효기간을 알맞게 \n설정하여 발급\n2.발급관리\n2-1. 실제로 전송을 요구한 정보주체에 해당하는 접근토큰과 리프레시토큰을 발급\n2-2. 정보주체별로 중복된 접근토큰과 리프레시토큰이 발급되지 않도록 발급시 \n확인・관리'), Document(metadata={'doc_name': 'guide.pdf', 'page_number': 134, 'creation_time': 1729026894.7537396, 'modification_time': 1728916150.4873421, 'key': 'guide.pdf_134_c37f12bf60d3e4890beab49c1ded8c16ca6cd666807225459749cdc2815979a8', 'pk': 453224949084361960}, page_content='토큰을 발급·관리\n3.갱신 및 \n폐기관리\n3-1. 전송요구 유효기간 동안 리프레시토큰을 무단 변경하지 않음 (갱신·삭제 등)\n3-2. 전송요구 변경에 따라 새로운 접근토큰과 리프레시토큰 발급 시 기존 접근\n토큰과 리프레시토큰을 즉시 폐기\n3-3. 전송요구 철회시 접근토큰과 리프레시토큰을 즉시 폐기\n4. 정보제공'), Document(metadata={'doc_nam

### Web Transform Query

In [110]:
def web_transform_query(state):
    """
    Transform the query to produce a better question for web search.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    history = state["history"]

    # Re-write question
    better_question = re_write_for_web(llm, history, question)
    return {"question": better_question, "history": history}

In [111]:
result = web_transform_query(state)
print(result)

---TRANSFORM QUERY---
{'question': '토큰 발행 만료 시 조치 방법', 'history': '과거 대화 내역 요약: 당신의 이름은 신강식이라고 합니다. AI는 반갑다고 인사합니다.\n\n'}


## Web

### Web Search

In [124]:
from langchain_core.documents import Document
from langchain_community.tools.tavily_search import TavilySearchResults


def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """

    print("---WEB SEARCH---")
    question = state["question"]

    # Web search
    web_search_tool = TavilySearchResults(max_results=3)
    docs = web_search_tool.invoke(question)
    web_results = []
    for d in docs:
        web_results.append(Document(page_content=d["content"], metadata={"url":d["url"]}))
    
    return {"documents": web_results, "question": question}

In [125]:
result = web_search(state)
print(result)

---WEB SEARCH---
{'documents': [Document(metadata={'url': 'https://academy.adriel.com/ko/토큰-만료는-왜-되나요'}, page_content='[데이터 연결] 토큰 만료는 왜 되나요? 토큰은 여러 이유로 만료가 될 수 있습니다. 토큰이 만료되면 데이터 소스를 재연결해야 합니다. <토큰 만료 원인> - 계정의 비밀 번호 등이 바뀐 경우 - 제공되는 api의 보안 상, 주기적으로 만료되는 경우'), Document(metadata={'url': 'https://www.toptut.com/ko/how-to-solve-oauth-token-expiration-issues-like-a-pro/'}, page_content='개발자, 기술 애호가 또는 단순히 토큰 만료 처리 방법에 대해 궁금한 점이 있는 분이라면 이 가이드를 통해 이 문제를 정면으로 해결할 수 있는 지식과 자신감을 얻으실 수 있습니다. 이 섹션에서는 새로 고침 토큰의 역할, 만료 시간 및 액세스 토큰 유효성에 대해 살펴보겠습니다. 액세스 토큰이 만료되려고 하면 애플리케이션은 새로 고침 토큰을 사용하여 사용자가 다시 로그인하지 않고도 새로운 액세스 토큰을 얻을 수 있습니다. 이 엔드포인트를 사용하면 이전에 받은 새로 고침 토큰을 사용하여 새 액세스 토큰을 얻을 수 있습니다. OAuth 토큰 만료 문제를 처리하는 것은 복잡할 수 있지만 만료 기호를 인식하고, 토큰 엔드포인트에 요청하고, 새로 고침 토큰을 활용하고, 새 액세스 토큰을 얻으면 이러한 문제를 원활하게 해결할 수 있습니다. 새로 고침 토큰 순환 및 클라이언트 암호 사용 외에도 일반적인 보안 모범 사례를 따르는 것이 중요합니다.'), Document(metadata={'url': 'https://m.blog.naver.com/songpha69/223007170550'}, page_content='토큰증권(ST Security Token) 분산원장 기술을 기반으로 디지털화한 증권. 

# Edge

### Route Question (web search or RAG or Common)

In [126]:
def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """
    print("****route_question****")
    print("1. ---ROUTE QUESTION---")
    question = state["question"]
    history = state["history"]
    source = router(llm, history, question)
    if source.datasource == "common":
        print("2. ---ROUTE QUESTION TO COMMON CHAT---")
        return "common"
    elif source.datasource == "vectorstore":
        print("2. ---ROUTE QUESTION TO RAG---")
        return "vectorstore"

In [127]:
route_question(state)

****route_question****
1. ---ROUTE QUESTION---
2. ---ROUTE QUESTION TO RAG---


'vectorstore'

### Decide to Generate

In [188]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """
    print("****decide_to_generate****")
    print("1. ---ASSESS GRADED DOCUMENTS---")
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print(
            "2. ---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    else:
        # We have relevant documents, so generate answer
        print("2. ---DECISION: GENERATE---")
        return "generate_rag"

In [189]:
decide_to_generate(state)

****decide_to_generate****
1. ---ASSESS GRADED DOCUMENTS---
2. ---DECISION: GENERATE---


'generate_rag'

### Check Hallucination

In [180]:
def check_hallucination(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """
    print("****check_hallucination****")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    history = state["history"]
    hallucinate_cnt = state["hallucinate_cnt"]

    score = hallucination_grader(llm, history, documents, generation, hallucinate_cnt)
    print(score)
    grade = score.multiple_score

    if grade == "yes":
        ## 끝
        return "useful"
    elif grade == "no":
        ## 재생성
        return "not useful"
    else:
        ## 무한루프
        return "infinite"

In [182]:
state.update({"generation": result['generation']})
state.update({"hallucinate_cnt": 4})
result_hallucination = check_hallucination(state)
print(result_hallucination)

****check_hallucination****
multiple_score='infinite'
infinite


### Compile Graph

In [199]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the nodes

## Retrieve
workflow.add_node("web_search", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents

## Generate
workflow.add_node("generate_rag", generate_rag)  # generatae
workflow.add_node("generate", generate)
workflow.add_node("generate_web_rag", generate_web_rag)  # generate_web_rag

## Develop
workflow.add_node("transform_query", transform_query)  # transform_query
workflow.add_node("web_transform_query", web_transform_query)  # web_transform_query

# Build graph
## Route
workflow.add_conditional_edges(
    START,
    route_question,
    {
        "common": "generate",
        "web": "web_search",
        "vectorstore": "retrieve",
    },
)

### Common
workflow.add_edge("generate", END)

### Self-reflective RAG
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate_rag": "generate_rag",
    },
)
workflow.add_edge("transform_query", "retrieve")

## Final Step
workflow.add_conditional_edges(
    "generate_rag",
    check_hallucination,
    {
        "useful": END,
        "not useful": "generate_rag",
        "infinite": "web_transform_query",
    },
)

### Web Search
workflow.add_edge("web_transform_query", "web_search")
workflow.add_edge("web_search", "generate_web_rag")
workflow.add_edge("generate_web_rag", END)

# Compile
app = workflow.compile()

### Test

In [201]:
from pprint import pprint


# Q: 토큰이 중복 발급되었을 경우 어떻게 되나요?

# Q: 정보 전송 요구 연장은 언제 가능한가요?

# Q: x-api-tran-id에 대해 알려주세요.

# Q: API 스펙 중 aNS는 어떤 것을 뜻하나요?

# Run
inputs = {
    "question": "x-api-tran-id에 대해 알려주세요.",
    "history": history,
}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

****route_question****
1. ---ROUTE QUESTION---
2. ---ROUTE QUESTION TO RAG---
****retrieve****
---RETRIEVE---
"Node 'retrieve':"
'\n---\n'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
****decide_to_generate****
1. ---ASSESS GRADED DOCUMENTS---
2. ---DECISION: GENERATE---
"Node 'grade_documents':"
'\n---\n'
---GENERATE RAG(VECTORSTORE)---
****check_hallucination****
multiple_score='no'
"Node 'generate_rag':"
'\n---\n'
---GENERATE RAG(VECTORSTORE)---
****check_hallucination****
multiple_score='no'
"Node 'generate_rag':"
'\n---\n'
---GENERATE RAG(VECTORSTORE)---
****check_hallucination****
multiple_score='no'
"Node 'generate_rag':"
'\n---\n'
---GENERATE RAG(VECTORSTORE)---
****check_hallucination****
multiple_score='infinite'
"Node 'generate_rag':"
'\n---\n'
---TRANSFORM QUERY---
"Node 'web_transform_query':"
'\n---\n'
---WEB SEARCH---
"Node 'web_search':"
'\n---\n'
--

In [206]:
type(app)

langgraph.graph.state.CompiledStateGraph

In [None]:
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """
    print("****grade_generation_v_documents_and_question****")
    print("1. ---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    history = state["history"]

    score = hallucination_grader(llm, history, documents, generation)
    grade = score.binary_score

    # Check hallucination
    if grade == "yes":
        print("2. ---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("3. ---GRADE GENERATION vs QUESTION---")
        score = answer_grader(llm, history, question, generation)
        grade = score.binary_score
        if grade == "yes":
            print("4. ---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("4. ---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("2. ---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"