# RAG pipeline 구축하기 3


Rag pipeline을 만들고 각 함수를 손쉽게 사용해보자

이번 포스팅에는 Retrieval를 통해 VectorDB의 정보를 잘 검색하여 가져오고 

그 내용으로 LLM이 질문에 대한 답변을 생성하는 부분을 추가해본다.


## 0. Setting 

.env 파일을 만들어 API키들을 넣어준다. 

나중에 Ollema를 사용해서 API 없이 local에서 작동가능한 LLM을 사용해 보자.



```bash
OPENAI_API_KEY='sk-proj-5m5haMMQ0Sgkctb7Udixxxxx'


```

In [1]:
# API KEY를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv
# API KEY 정보로드
load_dotenv()

True

## 1. RAG pipeline - Chat 

In [2]:
# langchain_openai에서 ChatOpenAI(LLM)과 OpenAIEmbeddings(임베딩모델: text를 vector화하는 모델)을 load
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# PDF파일등 데이터를 Chroma형식의 vectorDB에 저장하고 리트리버가 수집한 데이터에 접근하기 위해 Chroma를 load
from langchain_community.vectorstores import Chroma

# 우리가 만든 config.py에서 모델등 옵션들을 수정
from utils.config import config, metadata_field_info

# 추가 데이터를 업로드하기 위해 file을 다큐먼트로 만들고 (convert_file_to_documents) 다큐먼트를 자르는 (split_document) 함수를 load합니다.
from utils.update import convert_file_to_documents

from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers.self_query.base import SelfQueryRetriever
# Ensemble retriever
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
import pickle

## 먼저 아래는 이전 게시물에서 Retriever만 있는 RAGPipeline이다.

In [3]:
class Ragpipeline:
    def __init__(self):
        # chatGPT API를 통해 llm 모델 로드
        self.llm = ChatOpenAI(
            model=config["llm_predictor"]["model_name"],  # chatgpt 모델 이름
            temperature=config["llm_predictor"]["temperature"],  # 창의성 0~1
        )
        
        # 초기화 리스트들 
        self.vector_store   = self.init_vectorDB()                  # 1. RAG가 접근할 vectorDB를 초기화합니다.
        self.retriever      = self.init_retriever()                 # 2. LLM이 질문에 대한 답변을 생성하기 전 질문에 관련된 컨텐츠 기반 답변을 생성하기 위해, 컨텐츠를 검색해 찾는 리트리버를 초기화 합니다. 
        self.bm25_retriever = self.init_bm25_retriever()
        self.ensemble_retriever = self.init_ensemble_retriever()
        self.mq_ensemble_retriever = self.init_mq_ensemble_retriever()

    def init_vectorDB(self, persist_directory=config["chroma"]["persist_dir"]):
        """vectorDB 설정"""
        embeddings = OpenAIEmbeddings(model=config["embed_model"]["model_name"])  # VectorDB에 저장될 데이터를 임베딩할 모델을 선언합니다.
        vector_store = Chroma(
            persist_directory=persist_directory,  # 기존에 vectordb가 있으면 해당 위치의 vectordb를 load하고 없으면 새로 생성합니다.
            embedding_function=embeddings,                      # 새롭게 데이터가 vectordb에 넣어질때 사용할 임베딩 방식을 정합니다, 저희는 위에서 선언한 embeddings를 사용합니다.
            collection_name = 'india',                          # india라는 이름을 정해줌으로써 나중에 vector store 관리 가능 
            collection_metadata = {'hnsw:space': 'cosine'},     # cosine 말고 l2 가 default / collection_metadata를 통해 유사도 검색에 사용될 공간('hnsw:space')을 'cosine'으로 지정하여, 코사인 유사도를 사용
        )
        return vector_store

    def init_retriever(self):            
        # base retriever 3 
        retriever = self.vector_store.as_retriever(
            search_type="mmr",                                              # mmr 검색 방법으로 
            search_kwargs={'fetch_k': 10, "k": 5, 'lambda_mult': 0.4},      # 상위 10개의 관련 context에서 최종 5개를 추리고 'lambda_mult'는 관련성과 다양성 사이의 균형을 조정하는 파라메타 default 값이 0.5
        )
        return retriever
    
    def init_bm25_retriever(self):
        all_docs = pickle.load(open(config["pkl_path"], 'rb'))
        bm25_retriever = BM25Retriever.from_documents(all_docs)
        bm25_retriever.k = 1                                            # BM25Retriever의 검색 결과 개수를 1로 설정합니다.
        return bm25_retriever
    
    def init_ensemble_retriever(self):
        ensemble_retriever = EnsembleRetriever(
            retrievers=[self.bm25_retriever, self.retriever],
            weights=[0.4, 0.6],
            search_type=config["ensemble_search_type"],  # mmr
        )
        return ensemble_retriever
    
    # 멀티쿼리 - 앙상블
    def init_mq_ensemble_retriever(self):
        mq_ensemble_retriever = MultiQueryRetriever.from_llm(
            llm=self.llm,
            retriever=self.ensemble_retriever
        )
        return mq_ensemble_retriever
    


In [4]:
pipeline = Ragpipeline()

## 이제, 이전 게시물에서 만든 RAGpipeline 이후에 chat 부분을 추가해준다.

utils에 prompt.py에 프롬프트를 작성해줍니다.

In [3]:
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.schema import HumanMessage

from utils.redis_utils import save_message_to_redis, get_messages_from_redis
from utils.prompt import *

In [6]:
# 한 이유: 현재 대화가 이어지도록 하면서 검색기가 가져온 정보들을 바탕으로 대화 할 수 있게 하기 위해서
def init_chat_chain(self):
    # 참고 링크 : https://python.langchain.com/v0.1/docs/modules/chains/
    
    # 1. 이어지는 대화가 되도록 대화기록과 체인
    history_aware_retriever = create_history_aware_retriever(           # llm과 retriever, 그리고 prompt를 연결한다. 이 체인은 대화 기록을 가져온 다음 이를 사용하여 기본 검색기에 전달되는 검색어를 생성합니다.
        self.llm, self.retriever, contextualize_q_prompt                # contextualize_q_prompt의 주요 목표는 사용자의 질문을 이해하기 쉽게 다시 작성하는 것입니다. 전반적으로 대화 맥락을 이해합니다.
    )
    
    # 2. 문서들의 내용을 답변할 수 있도록 리트리버와 체인
    question_answer_chain = create_stuff_documents_chain(               # prompt와 llm을 연결하여 나중에 리트리버가 제공하는 문서의 내용들과 연결될 준비를 시켜놓습니다. 
        self.llm, qa_prompt)
    
    #. 1번과 2번을 서로 체인
    rag_chat_chain = create_retrieval_chain(                            # 이 체인은 사용자 쿼리를 받아 retriever에게 전달되어 관련 문서를 가져옵니다. 그런 다음 해당 문서(및 원래 입력)가 LLM으로 전달되어 응답을 생성합니다.
        history_aware_retriever, question_answer_chain)
    
    print("[초기화] RAG chain 초기화 완료")
    return rag_chat_chain

In [7]:
# 한 이유: 사용자별 대화 세션을 관리해주고 이어서 대화가 될 수 있게 해주기 위해 
def chat_generation(self, question: str) -> dict:
    def get_session_history(session_id=None, user_email=None):
        session_id = session_id if session_id else self.current_session_id
        user_email = user_email if user_email else self.current_user_email

        if session_id not in self.session_histories:
            self.session_histories[session_id] = ChatMessageHistory()
            # Redis에서 세션 히스토리 불러오기
            history_messages = get_messages_from_redis(user_email, session_id)
            for message in history_messages:
                self.session_histories[session_id].add_message(HumanMessage(content=message))
                
        return self.session_histories[session_id]

    final_chain = self.chain

    # 특정 유형의 작업(체인)에 메시지 기록을 추가, 대화형 애플리케이션 또는 복잡한 데이터 처리 작업을 구현할 때 이전 메시지의 맥락을 유지해야 할 필요가 있을 때 유용
    conversational_rag_chain = RunnableWithMessageHistory(      
        final_chain,                                # 실행할 Runnable 객체
        get_session_history,                        # 세션 기록을 가져오는 함수
        input_messages_key="input",                 # 입력 메시지의 키
        history_messages_key="chat_history",        # 기록 메시지의 키
        output_messages_key="answer"                # 출력 메시지의 키 
    )
    response = conversational_rag_chain.invoke(
        {"input": question},
        config={"configurable": {"session_id": self.current_session_id}}            # 같은 session_id 를 입력하면 이전 대화 스레드의 내용을 가져오기 때문에 이어서 대화가 가능!
    )

    # Redis에 세션 히스토리 저장
    save_message_to_redis(self.current_user_email, self.current_session_id, question)
    save_message_to_redis(self.current_user_email, self.current_session_id, response["answer"])
    
    return response

### 위 내용을 이제 CLASS안에 넣어봅시다.

In [7]:
class Ragpipeline:
    def __init__(self):
        # chatGPT API를 통해 llm 모델 로드
        self.llm = ChatOpenAI(
            model=config["llm_predictor"]["model_name"],  # chatgpt 모델 이름
            temperature=config["llm_predictor"]["temperature"],  # 창의성 0~1
        )
        
        # 초기화 리스트들 
        self.vector_store   = self.init_vectorDB()                  
        self.retriever      = self.init_retriever()                
        self.bm25_retriever = self.init_bm25_retriever()
        self.ensemble_retriever = self.init_ensemble_retriever()
        self.mq_ensemble_retriever = self.init_mq_ensemble_retriever()
        self.chain          = self.init_chat_chain()
        
        self.session_histories = {}
        self.current_user_email = None
        self.current_session_id = None

    def init_vectorDB(self, persist_directory=config["chroma"]["persist_dir"]):
        """vectorDB 설정"""
        embeddings = OpenAIEmbeddings(model=config["embed_model"]["model_name"])  # VectorDB에 저장될 데이터를 임베딩할 모델을 선언합니다.
        vector_store = Chroma(
            persist_directory=persist_directory,  # 기존에 vectordb가 있으면 해당 위치의 vectordb를 load하고 없으면 새로 생성합니다.
            embedding_function=embeddings,                      # 새롭게 데이터가 vectordb에 넣어질때 사용할 임베딩 방식을 정합니다, 저희는 위에서 선언한 embeddings를 사용합니다.
            collection_name = 'india',                          # india라는 이름을 정해줌으로써 나중에 vector store 관리 가능 
            collection_metadata = {'hnsw:space': 'cosine'},     # cosine 말고 l2 가 default / collection_metadata를 통해 유사도 검색에 사용될 공간('hnsw:space')을 'cosine'으로 지정하여, 코사인 유사도를 사용
        )
        return vector_store

    def init_retriever(self):            
        # base retriever 3 
        retriever = self.vector_store.as_retriever(
            search_type="mmr",                                              # mmr 검색 방법으로 
            search_kwargs={'fetch_k': 5, "k": 2, 'lambda_mult': 0.4},      # 상위 10개의 관련 context에서 최종 5개를 추리고 'lambda_mult'는 관련성과 다양성 사이의 균형을 조정하는 파라메타 default 값이 0.5
        )
        return retriever
    
    def init_bm25_retriever(self):
        all_docs = pickle.load(open(config["pkl_path"], 'rb'))
        bm25_retriever = BM25Retriever.from_documents(all_docs)
        bm25_retriever.k = 1                                            # BM25Retriever의 검색 결과 개수를 1로 설정합니다.
        return bm25_retriever
    
    def init_ensemble_retriever(self):
        ensemble_retriever = EnsembleRetriever(
            retrievers=[self.bm25_retriever, self.retriever],
            weights=[0.4, 0.6],
            search_type=config["ensemble_search_type"],  # mmr
        )
        return ensemble_retriever
    
    # 멀티쿼리 - 앙상블
    def init_mq_ensemble_retriever(self):
        mq_ensemble_retriever = MultiQueryRetriever.from_llm(
            llm=self.llm,
            retriever=self.ensemble_retriever
        )
        return mq_ensemble_retriever
    
    def init_chat_chain(self):
        # 1. 이어지는 대화가 되도록 대화기록과 체인
        history_aware_retriever = create_history_aware_retriever(self.llm, self.mq_ensemble_retriever, contextualize_q_prompt)      # self.mq_ensemble_retriever
        # 2. 문서들의 내용을 답변할 수 있도록 리트리버와 체인
        question_answer_chain = create_stuff_documents_chain(self.llm, qa_prompt)
        # 3. 1과 2를 합침
        rag_chat_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
        
        return rag_chat_chain
    
    def chat_generation(self, question: str) -> dict:
        def get_session_history(session_id=None, user_email=None):
            session_id = session_id if session_id else self.current_session_id
            user_email = user_email if user_email else self.current_user_email

            if session_id not in self.session_histories:
                self.session_histories[session_id] = ChatMessageHistory()
                # Redis에서 세션 히스토리 불러오기
                history_messages = get_messages_from_redis(user_email, session_id)
                for message in history_messages:
                    self.session_histories[session_id].add_message(HumanMessage(content=message))
                    
            return self.session_histories[session_id]

        # 특정 유형의 작업(체인)에 메시지 기록을 추가, 대화형 애플리케이션 또는 복잡한 데이터 처리 작업을 구현할 때 이전 메시지의 맥락을 유지해야 할 필요가 있을 때 유용
        conversational_rag_chain = RunnableWithMessageHistory(      
            self.chain,                                 # 실행할 Runnable 객체
            get_session_history,                        # 세션 기록을 가져오는 함수
            input_messages_key="input",                 # 입력 메시지의 키
            history_messages_key="chat_history",        # 기록 메시지의 키
            output_messages_key="answer"                # 출력 메시지의 키 
        )
        response = conversational_rag_chain.invoke(
            {"input": question},
            config={"configurable": {"session_id": self.current_session_id}}            # 같은 session_id 를 입력하면 이전 대화 스레드의 내용을 가져오기 때문에 이어서 대화가 가능!
        )

        # Redis에 세션 히스토리 저장
        save_message_to_redis(self.current_user_email, self.current_session_id, question)
        save_message_to_redis(self.current_user_email, self.current_session_id, response["answer"])
        
        return response


In [8]:
pipeline = Ragpipeline()

In [9]:
question = '인도 통관 및 운송에 대해서 알려줘.'
answer = pipeline.chat_generation(question)

print(answer)

{'input': '인도 통관 및 운송에 대해서 알려줘.', 'chat_history': [HumanMessage(content='2024.07.31 16:36:28 - 인도의 통관 및 운송 절차는 다음과 같이 진행됩니다:\n\n### 1. 통관 절차\n\n#### 정식통관\n- **소요 시간**: 인도에서 일반적인 경우 통관에 소요되는 시간은 행정상 운송 수입의 경우 3~4 근무일이 소요됩니다.\n- **서류 준비**: 수입자는 필요한 서류를 준비하여 제출해야 합니다. 주요 서류로는 포장명세서(P/L), 상업송장(C/I), 선하증권(B/L) 등이 있습니다.\n- **수입신고**: 물품이 입항하면 보세구역에 적하되고, 이후 전자데이터 교환(EDI) 신고 또는 수작업 신고를 통해 수입신고를 합니다.\n- **검사 및 평가**: 인도 관세청의 수입요건, 관세 평가 등을 검사하는 Appraiser Section과 관련 서류 및 관세율을 재점검하는 Audit Section 과정을 거칩니다.\n- **관세 납부**: 모든 서류에 서명을 받은 후, Challan No.라는 인도 관세 납부를 위한 번호를 부여받게 됩니다. 이를 근거로 관세를 완납하면 물품을 반출할 수 있습니다.\n\n### 2. 운송 절차\n\n#### 내륙운송\n- **인프라 상태**: 인도는 도로와 항만 등 인프라가 낙후되어 있고, 운송수단도 노후되어 있습니다. 따라서 내륙운송 과정에서 사고가 발생할 수 있습니다.\n- **보험 가입**: 내륙운송보험에 가입하는 것이 권장됩니다. 이는 운송 중 발생할 수 있는 사고에 대비하기 위함입니다.\n\n#### 서류 작성\n- **포장명세서(P/L)와 상업송장(C/I)**: 수출하는 물품의 경우 포장명세서와 상업송장을 컨테이너별로 철저히 작성해야 합니다. 인도 세관은 실물 검사를 철저히 실시하며, 서류와 화물이 일치하지 않을 경우 통관이 불가능할 수 있습니다.\n\n### 3. 유의사항\n- **서류의 정확성**: 통관에 필요한 서류를 정확하게 작성하고 제출해야

In [12]:
answer.keys()

dict_keys(['input', 'chat_history', 'context', 'answer'])

In [10]:
answer['chat_history']

[HumanMessage(content='2024.07.31 16:36:28 - 인도의 통관 및 운송 절차는 다음과 같이 진행됩니다:\n\n### 1. 통관 절차\n\n#### 정식통관\n- **소요 시간**: 인도에서 일반적인 경우 통관에 소요되는 시간은 행정상 운송 수입의 경우 3~4 근무일이 소요됩니다.\n- **서류 준비**: 수입자는 필요한 서류를 준비하여 제출해야 합니다. 주요 서류로는 포장명세서(P/L), 상업송장(C/I), 선하증권(B/L) 등이 있습니다.\n- **수입신고**: 물품이 입항하면 보세구역에 적하되고, 이후 전자데이터 교환(EDI) 신고 또는 수작업 신고를 통해 수입신고를 합니다.\n- **검사 및 평가**: 인도 관세청의 수입요건, 관세 평가 등을 검사하는 Appraiser Section과 관련 서류 및 관세율을 재점검하는 Audit Section 과정을 거칩니다.\n- **관세 납부**: 모든 서류에 서명을 받은 후, Challan No.라는 인도 관세 납부를 위한 번호를 부여받게 됩니다. 이를 근거로 관세를 완납하면 물품을 반출할 수 있습니다.\n\n### 2. 운송 절차\n\n#### 내륙운송\n- **인프라 상태**: 인도는 도로와 항만 등 인프라가 낙후되어 있고, 운송수단도 노후되어 있습니다. 따라서 내륙운송 과정에서 사고가 발생할 수 있습니다.\n- **보험 가입**: 내륙운송보험에 가입하는 것이 권장됩니다. 이는 운송 중 발생할 수 있는 사고에 대비하기 위함입니다.\n\n#### 서류 작성\n- **포장명세서(P/L)와 상업송장(C/I)**: 수출하는 물품의 경우 포장명세서와 상업송장을 컨테이너별로 철저히 작성해야 합니다. 인도 세관은 실물 검사를 철저히 실시하며, 서류와 화물이 일치하지 않을 경우 통관이 불가능할 수 있습니다.\n\n### 3. 유의사항\n- **서류의 정확성**: 통관에 필요한 서류를 정확하게 작성하고 제출해야 합니다. 서류의 오류나 불일치로 인해 통관이 지연될 수 있습니다.\n- **사전 준비**

In [14]:
print(answer['answer'])

인도의 통관 및 운송 절차는 다음과 같이 진행됩니다:

### 1. 통관 절차

#### 정식통관
- **소요 시간**: 인도에서 일반적인 경우 통관에 소요되는 시간은 행정상 운송 수입의 경우 3~4 근무일, 항공 운송의 경우 약 1~2 근무일이 소요됩니다.
- **수입신고**: 물품이 입항하면 보세구역에 적하되고, 이후 전자데이터 교환(EDI) 신고 또는 수작업 신고를 통해 수입신고를 합니다.
- **검사 및 평가**: 인도 관세청의 수입요건, 관세 평가 등을 검사하는 Appraiser Section과 관련 서류 및 관세율을 재점검하는 Audit Section 과정을 거칩니다.
- **관세 납부**: 모든 서류에 서명을 받은 후, Challan No.라는 인도 관세 납부를 위한 번호를 부여받게 됩니다. 이를 근거로 관세를 완납하면 물품을 반출할 수 있습니다.
- **체화료**: 정식통관의 경우 인도 세관은 일반적으로 화물 도착 후 7일 이후부터 체화료(Demurrage Charge)를 부과하고, 컨테이너 운송의 경우 5일 이후부터, 항공운송의 경우 3일 이후부터 체화료가 부과됩니다.

#### 임시 통관
- **목적**: 인도에 들여온 품목을 사용하지 않고 24개월 이내에 다시 반출할 목적이 있는 경우 임시통관이 이루어집니다.

### 2. 운송 절차

#### 내륙운송
- **인프라 상태**: 인도는 도로와 항만 등 인프라가 낙후되어 있고, 운송수단도 노후되어 있습니다. 따라서 내륙운송 과정에서 사고가 발생할 수 있습니다.
- **보험 가입**: 내륙운송보험에 가입하는 것이 권장됩니다. 이는 운송 중 발생할 수 있는 사고에 대비하기 위함입니다.

#### 서류 작성
- **포장명세서(P/L)와 상업송장(C/I)**: 수출하는 물품의 경우 포장명세서와 상업송장을 컨테이너별로 철저히 작성해야 합니다. 인도 세관은 실물 검사를 철저히 실시하며, 서류와 화물이 일치하지 않을 경우 통관이 불가능할 수 있습니다.

### 3. 유의사항
- **서류의 정확성**: 통관

In [15]:
answer['context']

[Document(metadata={'category': '정책', 'page': 0, 'source': '[정책][제약산업정보포털][2019.04.08]인도 통관 및 운송.pdf', 'year': 2019}, page_content='5. 통관 및 운송\n \n \n가. 통관제도\n  \n \n통관 유형별 절차\n \n1) 정식통관 \n \n인도에서 일반적인 경우 통관에 소요되는 시간은 행정상 운송 수입의 경우 3~4 근무일, '),
 Document(metadata={'category': '정책', 'page': 0, 'source': '[정책][제약산업정보포털][2019.04.08]인도 통관 및 운송.pdf', 'year': 2019}, page_content='하면 모든 서류에 서명 받게 된다. 이후 Challan No.라는 인도 관세 납부를 위한 번호를 부여받게 되고, 이를 근거로 관세를\n완납하게 되면 물품을 반출할 수 있다. 또한, '),
 Document(metadata={'category': '정책', 'page': 1, 'source': '[정책][제약산업정보포털][2019.04.08]인도 통관 및 운송.pdf', 'year': 2019}, page_content='\n  ㅇ 통관 정보의 불투명: 인도의 관세 행정은 한국에 비해 그 정보 및 기준이 전반적으로 불투명하며, 실무선의 재량권이 크고 융통성 없\n는 처리 관행으로 인해 통관 애로가 자주'),
 Document(metadata={'source': '/Users/munseunghyeon/Downloads/big_project/승현_데이터수집/kotra_abroad_news_1th_.csv', 'url': 'https://dream.kotra.or.kr/user/extra/kotranews/bbs/linkView/jsp/Page.do?dataIdx=208409', 'date': '2023-11-28', 'category': '2. 경제 및 시장 분석', 'page_no': 13}, pag