# OpenSearch를 통한 Simple RAG with LlamaIndex
>이 노트북은,
> - SageMaker Studio* **`Data Science 3.0`** kernel 및 ml.m5.large 인스턴스에서 테스트 되었습니다.
> - SageMaker Notebook **`conda_python3`** 에서 테스트 되었습니다.


여기서는 OpenSearch 가 설치된 것을 가정하고, 한글 형태소 분석기의 사용하는 법을 알려 드립니다.

---

### [중요]
- 이 노트북은 Bedrock Titan Embedding Model 을 기본으로 사용합니다. 
- 오픈 서치 서비스가 액티브 된 상태를 가정 합니다.
- 앞서 02_setup_openSearch_simple 노트북이 모두 완료가 되어 있어야 합니다.


---
## Ref: 
- [Amazon OpenSearch Service로 검색 구현하기](https://catalog.us-east-1.prod.workshops.aws/workshops/de4e38cb-a0d9-4ffe-a777-bf00d498fa49/ko-KR/indexing/blog-reindex)
- [OpenSearch Python Client](https://opensearch.org/docs/1.3/clients/python-high-level/)
- [OpenSearch Match, Multi-Match, and Match Phrase Queries](https://opster.com/guides/opensearch/opensearch-search-apis/opensearch-match-multi-match-and-match-phrase-queries/)
- OpenSearch Query 에서 Filter, Must, Should, Not Mush 에 대한 설명 입니다.
    - [OpenSearch Boolean Queries](https://opster.com/guides/opensearch/opensearch-search-apis/opensearch-boolean-queries/#:~:text=Boolean%20queries%20are%20used%20to,as%20terms%2C%20match%20and%20query_string.)
- [OpenSearch Query Description (한글)](https://esbook.kimjmin.net/05-search)


## 1. 환경 세팅

In [1]:
!pip install opensearch_dsl==2.1.0



In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import boto3
from utils.ssm import parameter_store

region=boto3.Session().region_name
pm = parameter_store(region)

In [4]:
opensearch_domain_endpoint = pm.get_params(
    key="opensearch_domain_endpoint",
    enc=False
)

opensearch_user_id = pm.get_params(
    key="opensearch_user_id",
    enc=False
)

opensearch_user_password = pm.get_params(
    key="opensearch_user_password",
    enc=True
)

In [5]:
opensearch_domain_endpoint = opensearch_domain_endpoint
opensearch_user_id = opensearch_user_id
opensearch_user_password = opensearch_user_password


### Bedrock Client 생성

In [6]:
import boto3
import os
import json
from botocore.config import Config
import botocore 
from pprint import pprint
from termcolor import colored

session = boto3.Session()

retry_config = Config(
    region_name=os.environ.get("AWS_DEFAULT_REGION", None),
    retries={
        "max_attempts": 10,
        "mode": "standard",
    },
)

#modelId = "anthropic.claude-3-5-sonnet-20240620-v1:0"  # Claude 3.5 Sonnet
modelId = "anthropic.claude-3-sonnet-20240229-v1:0" # Claude 3.0 Sonnet
accept = "application/json"
contentType = "application/json"

bedrock = boto3.client(service_name='bedrock')
boto3_bedrock = boto3.client(service_name='bedrock-runtime',config=retry_config)

model_list = bedrock.list_foundation_models()
result = [(fm_list["modelName"], fm_list["modelId"]) for fm_list in model_list["modelSummaries"] if fm_list['inferenceTypesSupported'] == ['ON_DEMAND']]
pprint(result)

[('Titan Text Large', 'amazon.titan-tg1-large'),
 ('Titan Image Generator G1', 'amazon.titan-image-generator-v1'),
 ('Titan Text G1 - Premier', 'amazon.titan-text-premier-v1:0'),
 ('Titan Text Embeddings v2', 'amazon.titan-embed-g1-text-02'),
 ('Titan Text G1 - Lite', 'amazon.titan-text-lite-v1'),
 ('Titan Text G1 - Express', 'amazon.titan-text-express-v1'),
 ('Titan Embeddings G1 - Text', 'amazon.titan-embed-text-v1'),
 ('Titan Text Embeddings V2', 'amazon.titan-embed-text-v2:0'),
 ('Titan Multimodal Embeddings G1', 'amazon.titan-embed-image-v1'),
 ('SDXL 1.0', 'stability.stable-diffusion-xl-v1'),
 ('J2 Grande Instruct', 'ai21.j2-grande-instruct'),
 ('J2 Jumbo Instruct', 'ai21.j2-jumbo-instruct'),
 ('Jurassic-2 Mid', 'ai21.j2-mid'),
 ('Jurassic-2 Mid', 'ai21.j2-mid-v1'),
 ('Jurassic-2 Ultra', 'ai21.j2-ultra'),
 ('Jurassic-2 Ultra', 'ai21.j2-ultra-v1'),
 ('Jamba-Instruct', 'ai21.jamba-instruct-v1:0'),
 ('Claude Instant', 'anthropic.claude-instant-v1'),
 ('Claude', 'anthropic.claude-v2:

## 2. Titan Embedding 및 LLM 인 Claude-3 sonnet 모델 로딩

### LLM 로딩 (Claude-v3 sonnet)

In [7]:
from langchain_aws import ChatBedrock
from langchain_core.messages import HumanMessage
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

In [8]:
llm_text = ChatBedrock(
    model_id=modelId,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    model_kwargs={
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 4096,
        "temperature" : 0,
        "top_k": 0,
        "top_p": 0.0
    }
)
llm_text

ChatBedrock(callbacks=[<langchain_core.callbacks.streaming_stdout.StreamingStdOutCallbackHandler object at 0x7f1cc0223a90>], client=<botocore.client.BedrockRuntime object at 0x7f1cc0269900>, region_name='us-east-1', model_id='anthropic.claude-3-sonnet-20240229-v1:0', model_kwargs={'anthropic_version': 'bedrock-2023-05-31', 'max_tokens': 4096, 'temperature': 0, 'top_k': 0, 'top_p': 0.0}, streaming=True)

In [9]:
prompt1 = "나는 인공지능 AI 보험 서비스입니다. 생명과 손해 보험의 차이에 대해 설명해 주세요."
messages = [
    HumanMessage(content=prompt1)
]

# messages = [
#     {"role": "user", "content": [{"type": "text", "text": prompt1}]},
# ]

response1 = llm_text.invoke(messages)

생명보험과 손해보험은 보험의 주요 유형으로 다음과 같은 차이점이 있습니다.

1. 보장 대상
- 생명보험은 사람의 생명과 관련된 위험을 보장합니다. 예를 들어 사망, 상해, 질병 등을 대상으로 합니다.
- 손해보험은 재산상의 손해나 배상책임을 보장합니다. 예를 들어 화재, 자동차사고, 배상책임 등을 대상으로 합니다.

2. 보험금 지급 사유
- 생명보험은 피보험자의 사망, 상해, 질병 등 인적 위험이 발생했을 때 보험금을 지급합니다.
- 손해보험은 재물의 손해나 제3자에 대한 배상책임이 발생했을 때 실제 입은 손해를 보상합니다.

3. 보험기간
- 생명보험은 일반적으로 장기계약이며, 종신보험의 경우 피보험자 종신까지 보장됩니다.
- 손해보험은 단기계약이 일반적이며, 1년 만기로 갱신하는 경우가 많습니다.

4. 보험료 산정 기준
- 생명보험료는 피보험자의 나이, 건강상태, 가입금액 등에 따라 결정됩니다.
- 손해보험료는 보험목적물의 가액, 위험률, 보상한도 등에 따라 결정됩니다.

요약하면 생명보험은 개인의 생명과 관련된 위험을 장기적으로 보장하고, 손해보험은 재산상 손해나 배상책임을 단기적으로 보상하는 것이 주요 차이점입니다.

### Embedding 모델 선택

In [10]:
from langchain_community.embeddings import BedrockEmbeddings

llm_emb = BedrockEmbeddings(
    client=boto3_bedrock,
    # model_id="cohere.embed-multilingual-v3"
    model_id="amazon.titan-embed-g1-text-02"
)

-------------------

## 3. OpenSearch 벡터 Index 생성
### 선수 조건
- 랭체인 오프서처 참고 자료
    - [Langchain Opensearch](https://python.langchain.com/docs/integrations/vectorstores/opensearch)

### 오픈 서치 인덱스 유무에 따라 삭제
오픈 서치에 해당 인덱스가 존재하면, 삭제 합니다. 

In [11]:
from opensearchpy import OpenSearch, RequestsHttpConnection
http_auth = (opensearch_user_id, opensearch_user_password)
os_client = OpenSearch(
            hosts=[
                {'host': opensearch_domain_endpoint.replace("https://", ""),
                 'port': 443
                }
            ],
            http_auth=http_auth, # Master username, Master password,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )

### index 생성

In [12]:
# 인덱스 이름

index_name = "complex-doc-index-32"

In [13]:

exists = os_client.indices.exists(index_name)

if exists:
    os_client.indices.delete(index=index_name)
    print("Index is deleted")
else:
    print("Index does not exist")

Index does not exist


In [14]:
## metadata, text, vector_field 의 네이밍은 langchain에서 지정된 이름
### model에 따라 dimension 사이즈 변경 필요 (Titan : 1536, Cohere : 1024)
import json

with open('index_body_simple.json', 'r') as f:
    index_body = json.load(f)

print(json.dumps(index_body, indent=2))


{
  "settings": {
    "index.knn": true,
    "index.knn.algo_param.ef_search": 512
  },
  "mappings": {
    "properties": {
      "metadata": {
        "properties": {
          "source": {
            "type": "keyword"
          },
          "type": {
            "type": "keyword"
          },
          "timestamp": {
            "type": "date"
          }
        }
      },
      "vector_field": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "engine": "faiss",
          "name": "hnsw",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          },
          "space_type": "l2"
        }
      }
    }
  }
}


In [15]:
os_client.indices.create(index_name, body=index_body)

{'acknowledged': True,
 'shards_acknowledged': True,
 'index': 'complex-doc-index-32'}

In [16]:
%%time
from langchain_community.vectorstores import OpenSearchVectorSearch

vector_db = OpenSearchVectorSearch(
    index_name=index_name,
    opensearch_url=opensearch_domain_endpoint,
    embedding_function=llm_emb,
    http_auth=http_auth, # http_auth
)

CPU times: user 2.1 ms, sys: 673 μs, total: 2.77 ms
Wall time: 2.42 ms


## 4. 데이터 준비


In [35]:
# LLama Index 라이브러리 설치

!pip install llama-index llama-index-vector-stores-opensearch

Collecting llama-index-vector-stores-opensearch
  Downloading llama_index_vector_stores_opensearch-0.1.12-py3-none-any.whl.metadata (729 bytes)
Downloading llama_index_vector_stores_opensearch-0.1.12-py3-none-any.whl (6.7 kB)
Installing collected packages: llama-index-vector-stores-opensearch
Successfully installed llama-index-vector-stores-opensearch-0.1.12


In [28]:
import pypdf
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

In [30]:
# PDF 파일이 있는 디렉토리 경로 지정 (파일 명이 아닌 디렉토리로 지정)
pdf_directory = "./rag_data/"

# PDF 파일 로드
documents = SimpleDirectoryReader(pdf_directory).load_data()

In [37]:
import time
from langchain_core.documents import Document
from datetime import datetime
import os
from multiprocessing.pool import ThreadPool
from multiprocessing import Manager

def process_document(param):
    vector_db, doc = param
    source_name = os.path.basename(doc.metadata['file_name'])
    type_name = source_name.split('_')[0]

    pruned_text = doc.text.replace('\n', ' ')

    if len(pruned_text) >= 20:  # 임의로 20자 이상인 텍스트만 처리
        chunk = Document(
            page_content=pruned_text,
            metadata={
                "source": source_name,
                "type": type_name,
                "timestamp": datetime.now(),
                "page_label": doc.metadata.get('page_label', '')
            }
        )
        vector_db.add_documents([chunk])



manager = Manager()
result_dict = manager.dict()

# documents 변수를 사용하여 파라미터 생성
param = [(vector_db, doc) for doc in documents]

num_processes = min(len(documents), os.cpu_count() - 1)
if num_processes < 1:
    num_processes = 1

print(f"Number of processes: {num_processes}")

with ThreadPool(processes=num_processes) as pool:
    pool.map(process_document, param)
    pool.close()
    pool.join()

print("Indexing completed.")

Number of processes: 7
Indexing completed.


### OpenSearch에 생성된 인덱스의 구성 확인

In [38]:
index_info = os_client.indices.get(index=index_name)
print(json.dumps(index_info, indent=2))

{
  "complex-doc-index-32": {
    "aliases": {},
    "mappings": {
      "properties": {
        "metadata": {
          "properties": {
            "page_label": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "source": {
              "type": "keyword"
            },
            "timestamp": {
              "type": "date"
            },
            "type": {
              "type": "keyword"
            }
          }
        },
        "text": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "vector_field": {
          "type": "knn_vector",
          "dimension": 1536,
          "method": {
            "engine": "faiss",
            "space_type": "l2",
            "name": "hnsw"

## 5. OpenSearch Hybrid 검색

OpenSearch Hybrid 는 아래와 같은 방식으로 작동합니다.
- (1) "Vector 서치" 하여 스코어를 얻은 후에 표준화를 하여 스코어를 구함. 
    - 전체 결과에서 가장 높은 스코어는 표준화 과정을 통하여 스코어가 1.0 이 됨.
- (2) Keyword 서치도 동일하게 함.
- (3) Reciprocal Rank Fusion (RRF) 기반 Re-rank
    - Paper: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
    - Desc: https://medium.com/@sowmiyajaganathan/hybrid-search-with-re-ranking-ff120c8a426d
    - **RRF의 경우 score가 아닌 ranking 정보를 활용, 때문에 score normalization이 필요 없음**

RRF는 langchain에서 "Ensemble Retriever" 이름으로 api를 제공합니다. 
- https://python.langchain.com/docs/modules/data_connection/retrievers/ensemble


### Ensemble retriever 정의
- https://python.langchain.com/docs/modules/data_connection/retrievers/ensemble
- RRF 방식만 지원
- Rank constant (param "c")
    - This value determines how much influence documents in individual result sets per query have over the final ranked result set. A higher value indicates that lower ranked documents have more influence. This value must be greater than or equal to 1. Defaults to 60.
    - 숫자 높을 수록 낮은 랭크의 문서가 더 중요시 된다

In [39]:
from opensearch_dsl import Search
from langchain.retrievers import EnsembleRetriever
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.chains.question_answering import load_qa_chain
# from utils.rag import run_RetrievalQA, show_context_used

In [40]:
from langchain.schema import BaseRetriever
from typing import Any, Dict, List, Optional, List, Tuple
from langchain.callbacks.manager import CallbackManagerForRetrieverRun

# lexical(keyword) search based (using Amazon OpenSearch)
class OpenSearchLexicalSearchRetriever(BaseRetriever):
    os_client: Any
    index_name: str
    k = 3
    filter = []

    def normalize_search_results(self, search_results):
        hits = (search_results["hits"]["hits"])
        max_score = float(search_results["hits"]["max_score"])
        for hit in hits:
            hit["_score"] = float(hit["_score"]) / max_score
        search_results["hits"]["max_score"] = hits[0]["_score"]
        search_results["hits"]["hits"] = hits
        return search_results

    def update_search_params(self, **kwargs):
        self.k = kwargs.get("k", 3)
        self.filter = kwargs.get("filter", [])
        self.index_name = kwargs.get("index_name", self.index_name)

    def _reset_search_params(self, ):
        self.k = 3
        self.filter = []
        
    def query_lexical(self, query, filter=[], k=5):
        QUERY_TEMPLATE = {
            "size": k,
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                "text": {
                                    "query": query,
                                    "operator":  "or"
                                }
                            }
                        }
                    ],
                    "filter": filter
                }
            }
        }
        
        if len(filter) > 0:
            QUERY_TEMPLATE["query"]["bool"]["filter"].extend(filter)
            
        return QUERY_TEMPLATE
    

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun) -> List[Document]:
        
        query = self.query_lexical(
            query=query,
            filter=self.filter,
            k=self.k
        )

        # print ("lexical search query: ")
        # print(query)
        
        search_results = self.os_client.search(
            body=query,
            index=self.index_name
        )

        results = []
        if search_results["hits"]["hits"]:
            search_results = self.normalize_search_results(search_results)
            for res in search_results["hits"]["hits"]:

                metadata = res["_source"]["metadata"]
                metadata["id"] = res["_id"]

                doc = Document(
                    page_content=res["_source"]["text"],
                    metadata=metadata
                )
                results.append((doc))

        self._reset_search_params()

        return results[:self.k]

In [41]:
prompt_template = """
\n\nHuman: 마지막에 질문 <question></question>에 대한 자세한 답변을 제공하기 위해 <context></context> 정보를 활용하세요.
모르는 내용은 답변을 지어내려고 하지 말고 모른다고 말하세요.

<context>
{context}
</context>

<question>{question}</question>

\n\nAssistant:"""


PROMPT = PromptTemplate(
    template=prompt_template,
    input_variables=["context", "question"]
)

In [42]:
chain = load_qa_chain(
    llm=llm_text,
    chain_type="stuff",
    prompt=PROMPT,
    verbose=False  # LLM 답변 과정을 출력하려면 True로 변경 합니다.
)

In [43]:
# 사용자 쿼리

query = """
관세법상 환급 대상에는 어떤것들이 있고, 언제까지 어떻게 청구해야 되는지 알려줘.

"""

### Hybrid Search 방식으로 RAG 검색

In [44]:
boolean_filter = []

In [45]:
opensearch_semantic_retriever = vector_db.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,
        "boolean_filter": boolean_filter
    }
)

In [46]:
opensearch_lexical_retriever = OpenSearchLexicalSearchRetriever(
    os_client=os_client,
    index_name=index_name,
    k=3,
    filter=boolean_filter
)

In [47]:
ensemble_retriever = EnsembleRetriever(
    retrievers=[opensearch_lexical_retriever, opensearch_semantic_retriever],
    weights=[0.5, 0.5],
    c=100,
    k=5
)

In [48]:
answer = chain.invoke(
    {
        "input_documents": ensemble_retriever.get_relevant_documents(query), 
        "question": query
    }
)

print("\n\n##############################")
print("query: \n", query)
print("answer: \n", answer['output_text'])

  warn_deprecated(


관세법상 환급 대상에는 다음과 같은 것들이 있습니다.

1. 과오납금의 환급
- 관세, 가산세, 강제징수비의 과오납금이 있는 경우 환급 대상이 됩니다.

2. 계약내용과 상이한 물품에 대한 환급
- 수입신고 물품이 계약내용과 상이하고 수입신고 당시 성질이나 형태가 변경되지 않은 경우, 수입신고수리일로부터 1년 이내에 보세구역에 반입하여 수출하거나 보세공장에 반입한 경우 환급 대상이 됩니다.

3. 수입한 상태 그대로 수출되는 자가사용물품에 대한 관세환급
- 개인의 자가사용물품을 수입한 상태 그대로 수출하는 경우로서 수입신고수리일부터 6개월 이내에 보세구역에 반입하여 수출하거나 세관장 확인 후 수출하는 경우 환급 대상이 됩니다.

4. 지정보세구역 장치물품의 멸실, 손상으로 인한 관세환급
- 수입신고 물품이 지정보세구역에 장치된 상태에서 재해로 멸실, 변질, 손상되어 가치가 감소한 경우 환급 대상이 됩니다.

환급청구권은 권리를 행사할 수 있는 날부터 5년 이내에 관세청장이 지정한 세관에 환급신청을 해야 합니다. 구체적인 기산일은 사유별로 다릅니다.

##############################
query: 
 
관세법상 환급 대상에는 어떤것들이 있고, 언제까지 어떻게 청구해야 되는지 알려줘.


answer: 
 관세법상 환급 대상에는 다음과 같은 것들이 있습니다.

1. 과오납금의 환급
- 관세, 가산세, 강제징수비의 과오납금이 있는 경우 환급 대상이 됩니다.

2. 계약내용과 상이한 물품에 대한 환급
- 수입신고 물품이 계약내용과 상이하고 수입신고 당시 성질이나 형태가 변경되지 않은 경우, 수입신고수리일로부터 1년 이내에 보세구역에 반입하여 수출하거나 보세공장에 반입한 경우 환급 대상이 됩니다.

3. 수입한 상태 그대로 수출되는 자가사용물품에 대한 관세환급
- 개인의 자가사용물품을 수입한 상태 그대로 수출하는 경우로서 수입신고수리일부터 6개월 이내에 보세구역에 반입하여 수출하거나 세관장 확인 후 수출하는 경우 환급 대상이 됩니다.

4. 지정보