# RAG langchain

In [1]:
from typing import List
from langchain_core.retrievers import BaseRetriever

from FlagEmbedding import BGEM3FlagModel
from pinecone import Pinecone, Index

from langchain_core.prompts import PromptTemplate

import torch
from transformers import pipeline, AutoTokenizer, AutoModel
from langchain_huggingface import HuggingFacePipeline

from langchain_core.runnables import RunnablePassthrough

2024-09-14 09:01:06.690805: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-14 09:01:06.690957: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-14 09:01:06.691015: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-14 09:01:06.702490: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
import os
from dotenv import load_dotenv

load_dotenv("../.env")

True

In [5]:
class HybridSearchRetriever(BaseRetriever):

    k: int
    """Number of top results to return"""
    embedding_model:BGEM3FlagModel
    
    pinecone_index:Index

    alpha:float

    def _get_relevant_documents(
        self, query: str
    ) -> List[str]:
        """Sync implementations for retriever."""
        user_query_emb = self.embedding_model.encode(query, return_dense=True, return_sparse=True, return_colbert_vecs=False) #dense, sparse 둘 다 반환함
        
        query_dense_vector = user_query_emb['dense_vecs'].tolist()
        user_query_sparse = user_query_emb['lexical_weights']
        query_sparse_vector = {
            'indices': [int(k) for k in user_query_sparse.keys() if k.isdigit()], #isdigit() 안하면 에러뜨더라
            'values': [float(v) for k, v in user_query_sparse.items() if k.isdigit()]
        }

        hdense, hsparse = self._hybrid_score_norm(query_dense_vector, query_sparse_vector, alpha=self.alpha)

        hybrid_query_response = self.pinecone_index.query(
            # namespace="example-namespace",
            top_k=3,
            vector=hdense,
            sparse_vector=hsparse,
            include_metadata=True,
        )
        
        documents = [
            f"{match['metadata']['answer_intro']}\n"
            f"{match['metadata']['answer_body']}\n"
            f"{match['metadata']['answer_conclusion']}"
            for match in hybrid_query_response['matches']
        ]
        return documents
    
    def _hybrid_score_norm(self, dense, sparse, alpha: float):
        """Hybrid score using a convex combination

        alpha * dense + (1 - alpha) * sparse

        Args:
            dense: Array of floats representing
            sparse: a dict of `indices` and `values`
            alpha: scale between 0 and 1
        """
        if alpha < 0 or alpha > 1:
            raise ValueError("Alpha must be between 0 and 1")
        hs = {
            'indices': sparse['indices'],
            'values':  [v * (1 - alpha) for v in sparse['values']]
        }
        return [v * alpha for v in dense], hs

In [8]:
import os

def load_model(model_name):
    model_path = os.path.join('models', model_name.replace('/', '_'))  # 'models/BAAI_bge-m3'
    # 모델이 로컬에 존재하는지 확인
    if not os.path.exists(model_path):
        # 모델과 토크나이저 다운로드 및 저장
        print(f"모델을 다운로드 중입니다: {model_name}")
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name)
        
        # 모델과 토크나이저 저장
        tokenizer.save_pretrained(model_path)
        model.save_pretrained(model_path)
        print(f"모델이 {model_path}에 저장되었습니다.")
    else:
        # 모델과 토크나이저 로드
        print(f"모델을 {model_path}에서 로드 중입니다.")
        tokenizer = AutoTokenizer.from_pretrained(model_path)
        model = AutoModel.from_pretrained(model_path)
        print(f"모델이 로드되었습니다.")
    return tokenizer, model

In [10]:
m3_tokenizer, m3_model = load_model('BAAI/bge-m3')

모델을 models/BAAI_bge-m3에서 로드 중입니다.
모델이 로드되었습니다.


In [12]:
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
# Connect to the existing Pinecone index
pinecone_index = pc.Index("health-care")

embedding_model = BGEM3FlagModel(m3_model,use_fp16=True) # Setting use_fp16 to True speeds up computation with a slight performance degradation
retriever = HybridSearchRetriever(k=3, embedding_model=embedding_model, pinecone_index=pinecone_index, alpha=0.75)

KeyError: 'PINECONE_API_KEY'

In [None]:
gemma_2_tokenizer, gemma_2_model = load_model('google/gemma-2-2b-it')

In [None]:
gen = pipeline(
    task='text-generation',
    model=gemma_2_model,
    tokenizer=gemma_2_tokenizer,
    device=0 if torch.cuda.is_available() else -1,
    )

llm = HuggingFacePipeline(pipeline=gen)

In [None]:
template = """당신은 질문-답변(Question-Answering)을 수행하는 친절한 AI 어시스턴트입니다. 당신의 임무는 주어진 문맥(context) 에서 주어진 질문(question) 에 답하는 것입니다.
검색된 다음 문맥(context) 을 사용하여 질문(question) 에 답하세요. 만약, 주어진 문맥(context) 에서 답을 찾을 수 없다면, 답을 모른다면 `주어진 정보에서 질문에 대한 정보를 찾을 수 없습니다` 라고 답하세요.
한글로 답변해 주세요. 단, 기술적인 용어나 이름은 번역하지 않고 그대로 사용해 주세요.

#Question:
{question}

#Context:
{context}

#Answer:"""

prev_chat = []

chat = [
    *prev_chat,
     { "role": "user", "content": template}
]

prompt_template = gemma_2_tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)

prompt = PromptTemplate(input_variables=["question", "context"], template=prompt_template)
prompt

In [None]:
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
)

In [None]:
def stream_response(response, return_output=False):
    answer = ""
    for token in response:
        answer += token
        print(token, end="", flush=True)
    if return_output:
        return answer
answer = rag_chain.invoke("갑자기 너무 배가 아파")
stream_response(answer)