## LangChain + RAG Fusion + GPT-4o Python Project: Easy AI/Chat for your Docs
* https://medium.com/gitconnected/langchain-rag-fusion-gpt-4o-python-project-easy-ai-chat-for-your-docs-1b802f889318

![](./img/rag-fusion.jpg)

In [50]:
from operator import itemgetter

from langchain.load import dumps, loads
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import Chroma
from langchain_core.documents.base import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import TokenTextSplitter
from langchain.storage import LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings

In [63]:
LLM_MODEL_OPENAI = "gpt-4o"
EMBEDDING_MODEL = "text-embedding-3-small"

TOP_K = 5
MAX_DOCS_FOR_CONTEXT = 8
DOCUMENT_PDF = "./data/gs.pdf"

from dotenv import load_dotenv
load_dotenv()

True

In [53]:
def load_and_split_document(pdf: str) -> list[Document]:
    raw_documents = PyPDFLoader(pdf).load()
    text_splitter = TokenTextSplitter(chunk_size=2048, chunk_overlap=24)
    documents = text_splitter.split_documents(raw_documents)
    print("Original document: ", len(documents), " docs")
    return documents

In [54]:
def create_retriever(search_type: str, kwargs: dict) -> BaseRetriever:
    documents = load_and_split_document(DOCUMENT_PDF)

    embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL)
    store = LocalFileStore("./data/cache/")
    cached_embedder = CacheBackedEmbeddings.from_bytes_store(
        embeddings,
        store,
        namespace=embeddings.model
    )
    vectordb = Chroma.from_documents(documents, cached_embedder)
    retriever = vectordb.as_retriever(
        search_type=search_type,
        search_kwargs=kwargs,
    )
    return retriever

다음으로, 우리는 각 쿼리에 대해 유사한 문서를 찾기 위해 벡터 검색을 수행하고, 그런 다음 이러한 문서들을 재정렬합니다. 재정렬을 위해 우리는 유사도 순위에 의존하는 Reciprocal Rank Fusion (RRF)을 사용합니다.

Reciprocal Rank Fusion에서 문서 d의 점수 𝑅𝑅𝐹(𝑑)은 문서 𝑑의 순위인 𝑟𝑎𝑛𝑘 𝑖(𝑑)와 하이퍼파라미터 k를 기반으로 계산됩니다.

하이퍼파라미터 k는 Reciprocal Rank Fusion (RRF) 점수에 영향을 미칩니다. k 값이 높을수록 점수 곡선이 평평해져 낮은 순위의 문서들이 더 큰 영향을 받게 됩니다. 일반적으로 k=60이 자주 사용되며, 나는 현재 분석에서 이 값을 채택하고 있습니다.

Reciprocal Rank Fusion을 계산하는 함수는 reciprocal_rank_fusion이라고 합니다. 테스트 목적으로 RRF 점수를 표시하지만, 문서 조각의 내용만이 LLM에게 맥락으로 전달되어야 하기 때문에 반환하는 것은 문서 조각 목록뿐입니다.

네 개의 유사한 쿼리 각각이 다섯 개의 조각을 검색하기 때문에, 총 조각 수는 최대 20개에 이를 수 있습니다. 그러나 이는 과도한 맥락을 생성하므로 상위 순위의 조각들만 사용됩니다. 맥락으로 전달되는 문서의 최대 수, MAX_DOCS_FOR_CONTEXT는 8로 설정되어 있습니다.

In [55]:
def reciprocal_rank_fusion(results: list[list], k=60):
    fused_scores = {}
    for docs in results:
        for rank, doc in enumerate(docs):
            doc_str = dumps(doc)
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            fused_scores[doc_str] += 1 / (rank + k )
    
    reranked_results = [
        (loads(doc), score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

    print("Reranked documents: ", len(reranked_results))
    for doc in reranked_results:
        print('---')
        print('Docs: ', ' '.join(doc[0].page_content[:100].split()))
        print('RRF score: ', doc[1])

    return [x[0] for x in reranked_results[:MAX_DOCS_FOR_CONTEXT]]

다음 함수인 query_generator는 유사한 쿼리를 생성합니다. 이 함수는 원래의 쿼리인 original_query를 프롬프트에 삽입하고 LLM(언어 모델)에게 유사한 쿼리를 생성하도록 요청함으로써 작동합니다. 원래 코드에서는 단순히 'Generate multiple search queries related to: {original_query}'라는 프롬프트만 사용했습니다. 이 접근 방식은 비교적 광범위한 쿼리를 생성했습니다.

결과를 좁히기 위해, 나는 프롬프트에 다음과 같은 지시문을 추가하도록 업데이트했습니다: 'When creating queries, please refine or add closely related contextual information, without significantly altering the original query's meaning.'

In [56]:
def query_generator(original_query: dict) -> list[str]:
    query = original_query.get("query")

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant that generates multiple search queries based on a single input query."),
        ("user", "Generate mnultiple search queries related to: {original_query}. When creating queries, please refine or add closely related contextual information in Korean, without significantly altering the original query's meaning"),
        ("user", "OUTPUT (3 queries):"),
    ])

    model = ChatOpenAI(temperature=0, model=LLM_MODEL_OPENAI)

    query_generator_chain = (
        prompt | model | StrOutputParser() | (lambda x: x.split("\n"))
    )

    queries = query_generator_chain.invoke({"original_query": query})

    queries.insert(0, "0. " + query)

    print("Generated queries:\n", "\n".join(queries))

    return queries


벡터 검색은 다음과 같이 수행됩니다:

1. RunnableLambda(query_generator)는 앞서 설명한 대로 유사한 쿼리를 생성합니다.
2. retriever.map()을 사용하여 query_generator가 생성한 네 개의 유사한 쿼리와 원래 쿼리를 포함하여 각 쿼리에 대해 벡터 검색을 수행합니다. map() 함수는 각 쿼리에 대해 다섯 개의 조각을 검색합니다.
3. 마지막으로, reciprocal_rank_fusion을 적용하여 결과를 재정렬합니다. 이는 다음 섹션에서 자세히 다룰 예정입니다.

In [57]:
def rrf_retriever(query: str) -> list[Document]:
    retriever = create_retriever(search_type="similarity", kwargs={"k": TOP_K})

    chain = (
        {"query": itemgetter("query")}
        | RunnableLambda(query_generator)
        | retriever.map()
        | reciprocal_rank_fusion
    )

    result = chain.invoke({"query": query})
    return result

In [58]:
def query(query: str):
    model = ChatOpenAI(
        temperature=0,
        model=LLM_MODEL_OPENAI
    )
    prompt = PromptTemplate(
        template="""Please answer the [question] using only the following [information] in Korean. If there is no [information] available to answer the question, do not force an answer.

            Information: {context}

            Question: {question}
            Final answer:""",
        input_variables=["context", "question"]
    )
    chain = (
        {
            "context": itemgetter("question") | RunnableLambda(rrf_retriever),
            "question": itemgetter("question")
        }
        | RunnablePassthrough.assign(
            context=itemgetter("context")
        )
        | {
            "response": prompt | model | StrOutputParser(),
            "context": itemgetter("context"),
        }
    )

    result = chain.invoke({"question": query})

    return result

In [62]:
result = query("개인퇴직연금")
print('---\nAnswer:' + result['response'])

Original document:  11  docs
Generated queries:
 0. 개인퇴직연금
1. 개인퇴직연금 가입 방법 및 절차
2. 개인퇴직연금 세제 혜택과 절세 방법
3. 개인퇴직연금 운용 전략 및 추천 상품
Reranked documents:  3
---
Docs:  10/118. 상해보험 1)가입대상 : 전 직원 (임원상해보험 별도 가입) 2) 가입내용 : 全 직원의 불의의 사고 및 질병에 대비하기 위해 회사에서 일괄적으로
RRF score:  0.16137431484139575
---
Docs:  10. 주택 자금 대출 제도 1)대출 내용 : 본인의 거주 목적으로 주택 구입 혹은 임차 계약 시 대출 지원 (무주택자에 한함) 2)대출 한도 : 5천만원 3)대출 금리 :
RRF score:  0.08068715742069787
---
Docs:  구 분경조금 (월기본급 기준)휴가화환장례용품 장례인력 결혼본인 100% 5일○ 자녀 100% 2일○ 형제·자매(본인,배우자 ) 50% 1일 회갑본인/배우자 100% 1일 부모(본
RRF score:  0.08068715742069787
---
Answer:개인퇴직연금에 대한 정보는 다음과 같습니다:

1) 지원 내용 : 임직원의 노후 생활 안정을 위하여 개인퇴직연금 지원
2) 지원 금액 : 임직원이 적립하는 금액과 동일하게 회사 지원분 적립함
단, 최대 月 25만원 한도로 지원함
