In [1]:
from langchain.chat_models import ChatOpenAI # ChatGPT API를 사용하기 위한 채팅 모델
from langchain.document_loaders import UnstructuredFileLoader # 비정형 문서를 로드하기 위한 로더
from langchain.text_splitter import CharacterTextSplitter # 긴 문서를 작은 청크로 분할하기 위한 스플리터
from langchain.embeddings import OpenAIEmbeddings, CacheBackedEmbeddings # 텍스트를 벡터로 변환하기 위한 임베딩
from langchain.vectorstores import FAISS # 벡터 유사도 검색을 위한 벡터 데이터베이스
from langchain.storage import LocalFileStore # 임베딩 캐시를 로컬에 저장하기 위한 스토리지
from langchain.prompts import ChatPromptTemplate # 프롬프트 템플릿 생성을 위한 유틸리티
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda # 체인 구성을 위한 유틸리티

# ChatGPT 모델 초기화 (temperature가 낮을수록 일관된 답변)
llm = ChatOpenAI(
    temperature=0.1,
)

# 임베딩 캐시를 저장할 로컬 디렉토리 설정
cache_dir = LocalFileStore("./.cache/")

# 문서 분할을 위한 스플리터 설정
# - tiktoken 토크나이저 사용
# - 각 청크는 최대 600토큰
# - 청크 간 100토큰 중복 허용 (문맥 유지를 위함)
splitter = CharacterTextSplitter.from_tiktoken_encoder(
    separator="\n",
    chunk_size=600,
    chunk_overlap=100,
)

# 문서 파일 로드
loader = UnstructuredFileLoader("./files/document.txt")

# 문서를 청크로 분할
docs = loader.load_and_split(text_splitter=splitter)

# OpenAI 임베딩 모델 초기화
embeddings = OpenAIEmbeddings()

# 임베딩 결과를 캐시하여 재사용할 수 있도록 설정
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(embeddings, cache_dir)

# 분할된 문서들을 벡터화하여 FAISS 인덱스에 저장
vectorstore = FAISS.from_documents(docs, cached_embeddings)

# 검색을 위한 리트리버 생성
retriever = vectorstore.as_retriever()

print(retriever)

# 각 문서 청크를 처리하기 위한 프롬프트 템플릿 정의
map_doc_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """
            Use the following portion of a long document to see if any of the text is relevant to answer the question.
            Return any relevant text verbatim.
            If there is no relevant text, return : ''
            -------
            {context}
            """,
        ),
        ("human", "{question}"),
    ]
)

# 프롬프트와 LLM을 연결하여 각 문서 처리를 위한 체인 생성
map_doc_chain = map_doc_prompt | llm
 
# 검색된 문서들을 처리하는 함수
def map_docs(inputs):
    documents = inputs["documents"]  # 검색된 문서들
    question = inputs["question"]    # 사용자 질문
    # 각 문서에 대해 관련 내용을 추출하고 결합
    # return 값은 context 값으로 들어간다.
    result = "\n\n".join(
        map_doc_chain.invoke(
            {"context": doc.page_content, "question": question}
        ).content
        for doc in documents # 배열값을 개별 실행
    )
    print(result)
    return result

# 문서 검색과 매핑을 연결하는 체인 구성
# RunnableLambda : chain 내에서 함수를 호출하기 위한 유틸리티
map_chain = {
    "documents": retriever,  # 문서 검색
    "question": RunnablePassthrough(),  # 질문을 그대로 전달
} | RunnableLambda(map_docs)

# 최종 답변 생성을 위한 프롬프트 템플릿
final_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """
            Given the following extracted parts of a long document and a question, create a final answer. 
            If you don't know the answer, just say that you don't know. Don't try to make up an answer.
            ------
            {context}
            """,
        ),
        ("human", "{question}"),
    ]
)

# 전체 파이프라인 구성:
# 1. 문서 검색 및 관련 내용 추출 (map_chain) -> return 값은 context 값으로 들어간다.
# 2. 최종 프롬프트 생성
# 3. LLM을 통한 답변 생성
chain = {"context": map_chain, "question": RunnablePassthrough()} | final_prompt | llm

# 질문 실행
chain.invoke("How many ministries are mentioned")

# MapReduce 동작 원리
# 1. retriever : documents 들을 입력
# 2. llm을 통해 질문과 상관있는 documents 들을 검색
# 3. 생성된 응답들을 전부 합침.
# 4. 합쳐진 응답으로 부터 최종 결과 재검색



tags=['FAISS', 'CacheBackedEmbeddings'] vectorstore=<langchain.vectorstores.faiss.FAISS object at 0x16b28a9d0>
: The text mentions the Ministry of Love.

: Two ministries are mentioned in the text: the Ministry of Plenty and the Ministry of Truth.

: ''


: ''


AIMessage(content='Three ministries are mentioned in the text: the Ministry of Love, the Ministry of Plenty, and the Ministry of Truth.')