# 두 개의 RAG (OpenSearch Index) 비교 샘플 코드

> 이 노트북은,
> - SageMaker Studio **`Data Science 3.0`** kernel 및 ml.m5.large 인스턴스에서 테스트되었습니다.
> - SageMaker Notebook **`conda_python3`** 에서 테스트되었습니다.


## 1. 환경 설정

In [None]:
# Automatically reload imported modules before executing each cell (IPython extension).
%load_ext autoreload
%autoreload 2

### OpenSearch 클러스터 정보 지정

In [None]:
import boto3
# Resolve the region of the default boto3 session and build an OpenSearch Service
# client for it. NOTE(review): `opensearch` is not referenced by later cells shown here.
region = boto3.Session().region_name
opensearch = boto3.client(service_name='opensearch', region_name=region)


In [None]:
# [Required] Adjust the OpenSearch settings below for your own environment.
# Every value can be overridden with an environment variable, so real credentials
# do not need to be hard-coded in (and committed with) this notebook.
# NOTE(review): the fallback password is a plaintext sample credential; prefer
# setting OPENSEARCH_USER_PASSWORD over editing it here.
import os

opensearch_user_id = os.environ.get('OPENSEARCH_USER_ID', 'raguser')
opensearch_user_password = os.environ.get('OPENSEARCH_USER_PASSWORD', 'Passw0rd1!')

domain_name = os.environ.get('OPENSEARCH_DOMAIN_NAME', 'ebp-poc-all')
opensearch_domain_endpoint = os.environ.get(
    'OPENSEARCH_DOMAIN_ENDPOINT',
    'https://search-ebp-poc-all-uw3oqtjpbgjeg4tbb5pf3ipnpi.us-west-2.es.amazonaws.com',
)

### Bedrock Client 생성

In [None]:
import boto3
import os
import json
from botocore.config import Config
import botocore 
from pprint import pprint
from termcolor import colored

# NOTE(review): `session` is not used by later cells shown here.
session = boto3.Session()

# Client configuration for the Bedrock runtime client: "standard" retry mode with
# up to 10 attempts. The region is taken from AWS_DEFAULT_REGION when it is set
# (you may also hard-code us-east-1 or us-west-2 here).
retry_config = Config(
    retries=dict(max_attempts=10, mode="standard"),
    region_name=os.environ.get("AWS_DEFAULT_REGION"),
)

# Claude 3 model used for answering. To try another version, e.g. Haiku:
#   modelId = "anthropic.claude-3-haiku-20240307-v1:0"
modelId = "anthropic.claude-3-sonnet-20240229-v1:0"
accept = contentType = "application/json"

# `bedrock` is the Bedrock management client (only used by the commented-out model
# listing below); `boto3_bedrock` is the runtime client handed to BedrockEmbeddings.
bedrock = boto3.client(service_name='bedrock')
boto3_bedrock = boto3.client(service_name='bedrock-runtime', config=retry_config)

# To list the on-demand foundation models available in this region:
# model_list = bedrock.list_foundation_models()
# result = [(fm["modelName"], fm["modelId"]) for fm in model_list["modelSummaries"]
#           if fm["inferenceTypesSupported"] == ["ON_DEMAND"]]
# pprint(result)

## 2. Titan Embedding 및 LLM 인 Claude-3 모델 로딩

### LLM 로딩 (Claude-v3 sonnet)

In [None]:
from langchain_community.chat_models import BedrockChat
from langchain_core.messages import HumanMessage
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

In [None]:
# Claude 3 chat model (id taken from `modelId`) that streams generated tokens to
# stdout. No `client` is passed here, unlike the embeddings model below.
llm_text = BedrockChat(
    model_id=modelId,
    model_kwargs=dict(
        anthropic_version="bedrock-2023-05-31",
        max_tokens=4096,
        temperature=0,
        top_k=350,
        top_p=0.999,
    ),
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
)
llm_text

### Embedding 모델 선택

In [None]:
from langchain_community.embeddings import BedrockEmbeddings

# Titan text-embedding model, called through the `boto3_bedrock` runtime client.
# To switch to Cohere use model_id="cohere.embed-multilingual-v3"; the vector
# dimension in the index body must then be changed as well.
llm_emb = BedrockEmbeddings(
    model_id="amazon.titan-embed-g1-text-02",
    client=boto3_bedrock,
)

-------------------

## 3. OpenSearch 벡터 Index 생성

### OpenSearch 클라이언트 생성
OpenSearch 클라이언트를 생성합니다. 각 프로젝트의 인덱스는 아래 셀에서 이미 존재하면 삭제한 뒤 다시 생성합니다.

In [None]:
from opensearchpy import OpenSearch, RequestsHttpConnection
# opensearch-py client authenticated with the master username/password,
# connecting over HTTPS (port 443) with certificate verification enabled.
http_auth = (opensearch_user_id, opensearch_user_password)
os_client = OpenSearch(
    hosts=[dict(host=opensearch_domain_endpoint.replace("https://", ""), port=443)],
    http_auth=http_auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

### Project-1을 위한 index 생성

In [None]:
# Start from a clean slate: drop the Project-1 index if it already exists.
index_name_1 = "project-1-index"
exists = os_client.indices.exists(index=index_name_1)

if not exists:
    print("Index does not exist")
else:
    os_client.indices.delete(index=index_name_1)
    print("Index is deleted")

In [None]:
## The field names metadata, text and vector_field are the names designated by LangChain.
### The vector dimension must match the embedding model (Titan: 1536, Cohere: 1024).
import json

# Read the index definition as UTF-8 explicitly (JSON text is UTF-8 per RFC 8259)
# instead of relying on the platform's default locale encoding.
with open('index_body_simple.json', 'r', encoding='utf-8') as f:
    index_body = json.load(f)

print(json.dumps(index_body, indent=2))


In [None]:
# Create the Project-1 index from the body loaded from index_body_simple.json.
os_client.indices.create(index=index_name_1, body=index_body)

In [None]:
%%time
from langchain_community.vectorstores import OpenSearchVectorSearch

# LangChain vector store bound to the Project-1 index; documents added through it
# are embedded with `llm_emb`.
vector_db_1 = OpenSearchVectorSearch(
    opensearch_url=opensearch_domain_endpoint,
    index_name=index_name_1,
    http_auth=http_auth,
    embedding_function=llm_emb,
)

### Project-2를 위한 index 생성

In [None]:
# Start from a clean slate: drop the Project-2 index if it already exists.
index_name_2 = "project-2-index"
exists = os_client.indices.exists(index=index_name_2)

if not exists:
    print("Index does not exist")
else:
    os_client.indices.delete(index=index_name_2)
    print("Index is deleted")

In [None]:
## The field names metadata, text and vector_field are the names designated by LangChain.
### The vector dimension must match the embedding model (Titan: 1536, Cohere: 1024).
import json

# Read the index definition as UTF-8 explicitly (JSON text is UTF-8 per RFC 8259)
# instead of relying on the platform's default locale encoding.
with open('index_body_simple.json', 'r', encoding='utf-8') as f:
    index_body = json.load(f)

print(json.dumps(index_body, indent=2))


In [None]:
# Create the Project-2 index from the body loaded from index_body_simple.json.
os_client.indices.create(index=index_name_2, body=index_body)

In [None]:
%%time
from langchain_community.vectorstores import OpenSearchVectorSearch

# LangChain vector store bound to the Project-2 index; documents added through it
# are embedded with `llm_emb`.
vector_db_2 = OpenSearchVectorSearch(
    opensearch_url=opensearch_domain_endpoint,
    index_name=index_name_2,
    http_auth=http_auth,
    embedding_function=llm_emb,
)

## 4. 데이터 준비


In [None]:
# Install PyPDF (needed by PyPDFLoader, which read_pdf uses below).
!pip install pypdf

In [None]:
import time
from langchain_community.document_loaders import PDFPlumberLoader
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.document_loaders import PyPDFium2Loader

# PyPDF 임포트
from langchain_community.document_loaders import PyPDFLoader

from langchain_core.documents import Document

# from llmsherpa.readers import LayoutPDFReader

#### Project-1 에 대한 PDF 문서 폴더 지정

In [None]:
import glob


# Project-1 document folder
data_path_1 = './data/project-1/*'
# Keep only *.pdf entries (case-insensitive) and sort them so the list is
# deterministic. Any other file or sub-folder would otherwise be handed to
# PyPDFLoader in read_pdf, which expects a PDF.
pdf_list_1 = sorted(
    path for path in glob.glob(data_path_1) if path.lower().endswith('.pdf')
)
pdf_list_1

#### Project-2에 대한 PDF 문서 폴더 지정

In [None]:
import glob

# Project-2 document folder
data_path_2 = './data/project-2/*'
# Keep only *.pdf entries (case-insensitive) and sort them so the list is
# deterministic. Any other file or sub-folder would otherwise be handed to
# PyPDFLoader in read_pdf, which expects a PDF.
pdf_list_2 = sorted(
    path for path in glob.glob(data_path_2) if path.lower().endswith('.pdf')
)
pdf_list_2

In [None]:
from multiprocessing.pool import ThreadPool
from multiprocessing import  Manager

import pdfplumber

In [None]:
import re

# "(cid:N)" placeholders found in extracted PDF text; compiled once, not per call.
_CID_PATTERN = re.compile(r'\(cid:(\d+)\)')


def prune_text(text, current_pdf_file):
    """Replace every ``(cid:N)`` placeholder in *text* with ``chr(N)``.

    Each replacement is reported with a print so the source PDF can be checked.
    NOTE(review): N is presumably a font-specific glyph id, so ``chr(N)`` is a
    best-effort guess rather than a guaranteed correct character.

    Args:
        text: Text extracted from a PDF.
        current_pdf_file: Path of the PDF, used only in the printed message.

    Returns:
        *text* with placeholders replaced. Placeholders whose number is not a
        valid code point are removed (replaced with an empty string).
    """
    def replace_cid(match):
        print(f"Please check PDF file {current_pdf_file} : {match}")
        try:
            return chr(int(match.group(1)))
        except (ValueError, OverflowError):
            # Out of the Unicode range (or too many digits for int()): drop it.
            # Narrowed from a bare `except:` so KeyboardInterrupt etc. propagate.
            return ''

    return _CID_PATTERN.sub(replace_cid, text)

In [None]:
from datetime import datetime

def read_pdf(param):
    """Load one PDF, clean its text, and add each chunk to a vector store.

    Args:
        param: ``(vector_db, current_pdf_file)`` -- the vector store to write
            into and the path of the PDF to read. Packed into one tuple so the
            function can be passed directly to ``pool.map``.

    Every chunk returned by ``PyPDFLoader.load_and_split()`` whose text is at
    least 20 characters long after ``prune_text`` is stored as a ``Document``
    with newlines replaced by spaces. Its metadata holds the file name
    (``source``), the file-name prefix before the first ``_`` (``type``), and
    the time the chunk was built (``timestamp``).
    """
    vector_db, current_pdf_file = param[0], param[1]
    print(f"current_pdf_file : {current_pdf_file}")
    # basename instead of split('/') so OS-native path separators also work.
    source_name = os.path.basename(current_pdf_file)
    type_name = source_name.split('_')[0]

    loader = PyPDFLoader(current_pdf_file)
    # load_and_split() applies a text splitter, so items may be sub-page chunks.
    for piece in loader.load_and_split():
        page_text = piece.page_content
        pruned_text = prune_text(page_text, current_pdf_file) if page_text else ""
        if len(pruned_text) < 20:  # arbitrarily keep only texts of 20+ characters
            continue
        chunk = Document(
            page_content=pruned_text.replace('\n', ' '),
            metadata={
                "source": source_name,
                "type": type_name,
                "timestamp": datetime.now()
            }
        )
        vector_db.add_documents([chunk])

### Project-1 문서를 OpenSearch 인덱스에 적재 (임베딩 생성 및 인덱싱)

In [None]:
# Index every Project-1 PDF into vector_db_1, one read_pdf call per file.
# Threads are used because read_pdf spends much of its time in network calls
# (Bedrock embeddings, OpenSearch writes). Verified working on ml.m5.xlarge.
# (The unused multiprocessing Manager()/result_dict were removed: they only
# spawned an extra server process.)
param = [(vector_db_1, current_pdf_file) for current_pdf_file in pdf_list_1]

# At most one thread per PDF and per CPU, but never zero. The former
# `len(pdf_list_1) % os.cpu_count()` could pick 1 thread for many files, or 0 on a
# 1-CPU host (ThreadPool raises ValueError). os.cpu_count() may return None.
num_processes = max(1, min(len(pdf_list_1), os.cpu_count() or 1))

print(f"num of process : {num_processes}")

with ThreadPool(processes=num_processes) as pool:
    pool.map(read_pdf, param)
    pool.close()
    pool.join()

### Project-1 을 위한 OpenSearch 인덱스 구성 확인

In [None]:
# Fetch the Project-1 index definition from OpenSearch and pretty-print it.
index_info_1 = os_client.indices.get(index=index_name_1)
print(json.dumps(index_info_1, indent=2))

### Project-2 문서를 OpenSearch 인덱스에 적재 (임베딩 생성 및 인덱싱)

In [None]:
# Index every Project-2 PDF into vector_db_2, one read_pdf call per file.
# Threads are used because read_pdf spends much of its time in network calls
# (Bedrock embeddings, OpenSearch writes). Verified working on ml.m5.xlarge.
# (The unused multiprocessing Manager()/result_dict were removed: they only
# spawned an extra server process.)
param = [(vector_db_2, current_pdf_file) for current_pdf_file in pdf_list_2]

# At most one thread per PDF and per CPU, but never zero. The former
# `len(pdf_list_2) % os.cpu_count()` could pick 1 thread for many files, or 0 on a
# 1-CPU host (ThreadPool raises ValueError). os.cpu_count() may return None.
num_processes = max(1, min(len(pdf_list_2), os.cpu_count() or 1))

print(f"num of process : {num_processes}")

with ThreadPool(processes=num_processes) as pool:
    pool.map(read_pdf, param)
    pool.close()
    pool.join()

### Project-2 를 위한 OpenSearch 인덱스 구성 확인

In [None]:
# Fetch the Project-2 index definition from OpenSearch and pretty-print it.
index_info_2 = os_client.indices.get(index=index_name_2)
print(json.dumps(index_info_2, indent=2))

## 5. Hybrid Search 함수 정의

In [None]:
from opensearch_dsl import Search
from langchain.retrievers import EnsembleRetriever
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.chains.question_answering import load_qa_chain
# from utils.rag import run_RetrievalQA, show_context_used

In [None]:
from langchain.schema import BaseRetriever
from typing import Any, Dict, List, Optional, List, Tuple
from langchain.callbacks.manager import CallbackManagerForRetrieverRun

# lexical (keyword) search based retriever (using Amazon OpenSearch)
class OpenSearchLexicalSearchRetriever(BaseRetriever):
    """Keyword retriever that runs an OpenSearch full-text ``match`` query.

    The query matches the ``text`` field (any term may match, ``operator: "or"``)
    and each hit becomes a LangChain ``Document``. Its metadata is the stored
    ``metadata`` field plus the OpenSearch document ``id``.

    Attributes:
        os_client: opensearch-py client used to execute the search.
        index_name: Name of the index to search.
        k: Maximum number of documents to return.
        filter: Extra clauses for the bool query's ``filter`` section.

    Note: ``k`` and ``filter`` are one-shot settings. After every query they are
    reset to ``3`` and ``[]``; call ``update_search_params`` before a query to
    use other values.
    """

    os_client: Any
    index_name: str
    # Explicit annotations make these regular pydantic fields (pydantic v2
    # rejects un-annotated class attributes on models).
    k: int = 3
    filter: List = []

    def normalize_search_results(self, search_results):
        """Divide every hit's ``_score`` by the top score, in place.

        The best hit ends up with a score of 1.0. Results without hits, or
        whose ``max_score`` is missing/None/0, are returned unchanged instead
        of raising ``TypeError``/``ZeroDivisionError``.
        """
        hits = search_results["hits"]["hits"]
        max_score = search_results["hits"].get("max_score")
        if not hits or not max_score:
            return search_results
        max_score = float(max_score)
        for hit in hits:
            hit["_score"] = float(hit["_score"]) / max_score
        search_results["hits"]["max_score"] = hits[0]["_score"]
        return search_results

    def update_search_params(self, **kwargs):
        """Set ``k``, ``filter`` and optionally ``index_name`` for the next query.

        Missing ``k``/``filter`` fall back to 3 and []; a missing ``index_name``
        keeps the current one.
        """
        self.k = kwargs.get("k", 3)
        self.filter = kwargs.get("filter", [])
        self.index_name = kwargs.get("index_name", self.index_name)

    def _reset_search_params(self, ):
        """Restore the default ``k`` (3) and ``filter`` ([])."""
        self.k = 3
        self.filter = []

    def query_lexical(self, query, filter=None, k=5):
        """Build the OpenSearch request body for a keyword search.

        Args:
            query: Free text matched against the ``text`` field.
            filter: Optional list of bool ``filter`` clauses. The list is
                copied, so the caller's list is never modified.
            k: Number of hits to request (``size``).

        Returns:
            A request body dict for ``os_client.search(body=...)``.
        """
        # Previously the caller's list was put into the body and then extended
        # with itself, duplicating every clause and mutating the caller's list.
        filter_clauses = list(filter) if filter else []
        return {
            "size": k,
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                "text": {
                                    "query": query,
                                    "operator": "or"
                                }
                            }
                        }
                    ],
                    "filter": filter_clauses
                }
            }
        }

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun) -> List[Document]:
        """Run the keyword query and return up to ``k`` matching documents."""
        # Capture k before the parameters are reset below. The original sliced
        # with self.k *after* the reset, silently capping results at 3.
        k = self.k
        search_body = self.query_lexical(query=query, filter=self.filter, k=k)

        search_results = self.os_client.search(
            body=search_body,
            index=self.index_name
        )

        results = []
        if search_results["hits"]["hits"]:
            search_results = self.normalize_search_results(search_results)
            for res in search_results["hits"]["hits"]:
                source = res["_source"]
                # Copy so adding "id" does not modify the raw response; tolerate
                # documents stored without a metadata field.
                metadata = dict(source.get("metadata") or {})
                metadata["id"] = res["_id"]
                results.append(
                    Document(page_content=source["text"], metadata=metadata)
                )

        self._reset_search_params()

        return results[:k]

In [None]:
# QA prompt for the "stuff" chain: {context} receives the retrieved documents and
# {question} the user query. The Human:/Assistant: markers follow the legacy
# Anthropic text-completion prompt format.
prompt_template = """
\n\nHuman: Use the following pieces of context to provide a concise answer to the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.

{context}

Question: {question}

\n\nAssistant:"""


PROMPT = PromptTemplate(
    input_variables=["context", "question"],
    template=prompt_template,
)

In [None]:
# QA chain that puts all input documents into a single prompt ("stuff").
chain = load_qa_chain(
    llm=llm_text,
    prompt=PROMPT,
    chain_type="stuff",
    verbose=False,  # set to True to print the LLM answering process
)

In [None]:
# Common query sent to both the Project-1 and the Project-2 index
# ("Please explain in detail the average annual growth rate of primary energy supply.")

query = "1차에너지 공급 연평균 성장률에 대해서 상세히 알려주세요."

## 6. Project-1에 대한 쿼리 (하이브리드 서치 : Semantic + Lexical)

In [None]:
boolean_filter = list()  # no extra filter clauses for the Project-1 queries

In [None]:
# Index searched by the lexical retriever below (the semantic retriever takes its
# index from its vector store object instead).
index_name = index_name_1

In [None]:
# Semantic (vector similarity) retriever over the Project-1 index, returning 5 hits.
# Must use vector_db_1: the name `vector_db` is never defined at notebook level
# (it only exists as a local inside read_pdf), so the original cell raised NameError.
opensearch_semantic_retriever = vector_db_1.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "boolean_filter": boolean_filter
    }
)

In [None]:
# Keyword retriever over `index_name` (the Project-1 index), returning up to 3 hits.
opensearch_lexical_retriever = OpenSearchLexicalSearchRetriever(
    index_name=index_name,
    os_client=os_client,
    filter=boolean_filter,
    k=3,
)

In [None]:
# Hybrid retriever: fuses the lexical and semantic result lists with equal weights
# using rank fusion, where c is the rank constant.
# NOTE(review): confirm that EnsembleRetriever in the installed langchain version
# defines a `k` field; if it does not, k=5 may be silently ignored.
ensemble_retriever = EnsembleRetriever(
    weights=[0.5, 0.5],
    retrievers=[opensearch_lexical_retriever, opensearch_semantic_retriever],
    k=5,
    c=100,
)

In [None]:
# Answer the query from the hybrid-retrieved Project-1 documents. llm_text streams
# its tokens to stdout while generating; the full answer is printed again below.
answer_1 = chain.invoke(
    dict(
        input_documents=ensemble_retriever.get_relevant_documents(query),
        question=query,
    )
)

print("##############################")
print("query: \n", query)
print("answer: \n", answer_1['output_text'])

## 7. Project-2에 대한 쿼리 (하이브리드 서치 : Semantic + Lexical)

In [None]:
boolean_filter = list()  # no extra filter clauses for the Project-2 queries

In [None]:
# Index searched by the lexical retriever below (the semantic retriever takes its
# index from its vector store object instead).
index_name = index_name_2

In [None]:
# Semantic (vector similarity) retriever over the Project-2 index, returning 5 hits.
# Must use vector_db_2: the name `vector_db` is never defined at notebook level
# (it only exists as a local inside read_pdf), so the original cell raised NameError.
opensearch_semantic_retriever = vector_db_2.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "boolean_filter": boolean_filter
    }
)

In [None]:
# Keyword retriever over `index_name` (the Project-2 index), returning up to 3 hits.
opensearch_lexical_retriever = OpenSearchLexicalSearchRetriever(
    index_name=index_name,
    os_client=os_client,
    filter=boolean_filter,
    k=3,
)

In [None]:
# Hybrid retriever: fuses the lexical and semantic result lists with equal weights
# using rank fusion, where c is the rank constant.
# NOTE(review): confirm that EnsembleRetriever in the installed langchain version
# defines a `k` field; if it does not, k=5 may be silently ignored.
ensemble_retriever = EnsembleRetriever(
    weights=[0.5, 0.5],
    retrievers=[opensearch_lexical_retriever, opensearch_semantic_retriever],
    k=5,
    c=100,
)

In [None]:
# Answer the query from the hybrid-retrieved Project-2 documents. llm_text streams
# its tokens to stdout while generating; the full answer is printed again below.
answer_2 = chain.invoke(
    dict(
        input_documents=ensemble_retriever.get_relevant_documents(query),
        question=query,
    )
)

print("##############################")
print("query: \n", query)
print("answer: \n", answer_2['output_text'])

## 8. Project-1과 Project-2의 답변을 비교

In [None]:
context_1 = answer_1['output_text']
context_2 = answer_2['output_text']

# Example prompt that compares the two answers.
# Each answer is wrapped in its own tag, and the original question is included,
# so the model can tell where one answer ends and the other begins. Previously
# both full answers were spliced into the middle of a single sentence.
comp_prompt = f"""
다음 질문에 대한 두 답변 <answer_1>과 <answer_2>를 비교합니다.
<answer_1>은 Project-1 인덱스, <answer_2>는 Project-2 인덱스를 검색해서 얻은 답변입니다.
답변은 최대한 상세히 합니다. 모르는 내용을 말하지 않습니다.

<question>
{query}
</question>

<answer_1>
{context_1}
</answer_1>

<answer_2>
{context_2}
</answer_2>
"""

In [None]:
import os
from langchain_community.chat_models import BedrockChat

def get_text_response(input_content, model_id="anthropic.claude-3-sonnet-20240229-v1:0"):
    """Send *input_content* to a Bedrock chat model and return the reply text.

    Args:
        input_content: Prompt text, sent as a single human message.
        model_id: Bedrock model id; defaults to Claude 3 Sonnet, as before.

    Connection settings come from the BWB_PROFILE_NAME, BWB_REGION_NAME and
    BWB_ENDPOINT_URL environment variables. Unset variables are passed as None
    (presumably letting BedrockChat use its default AWS resolution -- confirm
    for the installed langchain_community version).
    """
    llm = BedrockChat(
        credentials_profile_name=os.environ.get("BWB_PROFILE_NAME"),
        region_name=os.environ.get("BWB_REGION_NAME"),
        endpoint_url=os.environ.get("BWB_ENDPOINT_URL"),
        model_id=model_id,
        model_kwargs={
            "max_tokens": 4096,
            "temperature": 0,
            "top_p": 0.01,
            "top_k": 0,
        }
    )
    # invoke() replaces the deprecated predict(); for a chat model it returns an
    # AIMessage whose .content is the reply text.
    return llm.invoke(input_content).content


if __name__ == "__main__":
    input_text = comp_prompt
    response_content = get_text_response(input_content=input_text)
    print(response_content)