## OpenSearch Hybrid 검색을 통한 RAG

#### 설정

In [13]:
import sys, os
module_path = ".."
sys.path.append(os.path.abspath(module_path))

### 1. Bedrock Client 생성

In [14]:
from utils import bedrock

boto3_bedrock = bedrock.get_bedrock_client(
  assumed_role=None,
  endpoint_url=None,
  region='us-east-1'
)

Create new client
  Using region: us-east-1
  Using profile: None
boto3 Bedrock client successfully created!
bedrock-runtime(https://bedrock-runtime.us-east-1.amazonaws.com)


### 2. Titan Embedding 및 LLM Claude-v2 모델 로딩

#### LLM 로딩

In [38]:
from langchain.llms import Bedrock
from langchain.callbacks.stdout import StdOutCallbackHandler

# Create Anthropic Model
llm_text = Bedrock(
  model_id = "anthropic.claude-v2",
  client = boto3_bedrock,
  model_kwargs={
    "max_tokens_to_sample": 512
  },
  streaming=True,
  callbacks=[StdOutCallbackHandler()]
)

#### Embedding Model 선택

In [16]:
from langchain.embeddings import BedrockEmbeddings

llm_emb = BedrockEmbeddings(
  model_id="amazon.titan-embed-text-v1",
  client=boto3_bedrock,
  region_name="us-east-1",
)

### 전 단계 02_1에서 생성한 reindex 한 것을 지워서 reindex 과정 반복

In [17]:
# opensearch info
host = "localhost"
port = 9200
opensearch_endpoint = f"https://{host}:{port}"
http_auth = ("admin", "admin")
index_name = "genai-demo-index-v1"

ca_certs_path = 'root-ca.pem'
# Optional client certificates if you don't want to use HTTP basic authentication.
client_cert_path = 'admin.pem'
client_key_path = 'admin-key.pem'

#### OpenSearch ReIndexig
- 기존 "index genai-demo-index-v1"을 "genai-demo-index-v1-with-tokenizer"로 
- nori tokenizer 추가

OpenSearch Client

In [18]:
from opensearchpy import OpenSearch

# Create the client with SSL/TLS enabled, but hostname verification disabled.
os_client = OpenSearch(
    hosts = [{'host': host, 'port': port}],
    http_compress = True, # enables gzip compression for request bodies
    http_auth = http_auth,
    client_cert = client_cert_path,
    client_key = client_key_path,
    use_ssl = True,
    verify_certs = True,
    ssl_assert_hostname = False,
    ssl_show_warn = False,
    ca_certs = ca_certs_path
)

In [20]:
from pprint import pprint

index_info = os_client.indices.get(index=index_name)
pprint(index_info)

{'genai-demo-index-v1': {'aliases': {},
                         'mappings': {'properties': {'metadata': {'properties': {'row': {'type': 'long'},
                                                                                 'source': {'fields': {'keyword': {'ignore_above': 256,
                                                                                                                   'type': 'keyword'}},
                                                                                            'type': 'text'},
                                                                                 'timestamp': {'type': 'float'},
                                                                                 'type': {'fields': {'keyword': {'ignore_above': 256,
                                                                                                                 'type': 'keyword'}},
                                                                                          't

In [8]:
new_index_name = f"{index_name}-with-tokenizer"
new_index_name

'genai-demo-index-v1-with-tokenizer'

In [9]:
# use nori tokenizer 사용
# analyzer_config = {
#   "tokenizer": "nori",
#   "tokenizer_type": "nori_tokenizer",
#   "char_filter": ["html_strip"],
#   "filter": ["nori_number", "nori_readingform", "lowercase"],
#   "decompound_mode": "mixed",
#   "discard_punctuation": "true"
# }

#### index 수정: 형태소 분석기 사용 enablement

In [10]:
# nori 형태소 분석기를 custom analyzer 등록
index_info[index_name]["settings"]["analysis"] = {
  "tokenizer": {
    "nori": {
      "type": "nori_tokenizer",
      "decompound_mode": "mixed",
      "discard_punctuation": "true"
    }
  },
  "analyzer": {
    "my_analyzer": {
      "type": "custom",
      "char_filter": ["html_strip"],
      "tokenizer": "nori",
      "filter": ["nori_number", "nori_readingform", "lowercase"]  # token filter
    }
  }
}

# analyzer가 적용될 칼럼에 맞추어 수정
index_info[index_name]["mappings"]["properties"]["text"]["analyzer"] = "my_analyzer"
index_info[index_name]['mappings']['properties']['text']['search_analyzer'] = "my_analyzer"

# index 설정 변경 없음
index_info[index_name]["settings"]["index"] = {
  "number_of_shards": "5",
  "knn.algo_param": {"ef_search": "512"},
  "knn": True,
  "number_of_replicas": "2"
}

# del index alias
# del index_info[index_name]["aliases"]
new_index_info = index_info[index_name]


In [11]:
pprint(new_index_info)

{'aliases': {},
 'mappings': {'properties': {'metadata': {'properties': {'row': {'type': 'long'},
                                                         'source': {'fields': {'keyword': {'ignore_above': 256,
                                                                                           'type': 'keyword'}},
                                                                    'type': 'text'},
                                                         'timestamp': {'type': 'float'},
                                                         'type': {'fields': {'keyword': {'ignore_above': 256,
                                                                                         'type': 'keyword'}},
                                                                  'type': 'text'}}},
                             'text': {'analyzer': 'my_analyzer',
                                      'fields': {'keyword': {'ignore_above': 256,
                                                    

In [12]:
from utils.opensearch import opensearch_utils

index_exists = opensearch_utils.check_if_index_exists(os_client, new_index_name)
if index_exists:
    opensearch_utils.delete_index(os_client, new_index_name)
else:
    print("Index does not exist")

index_name=genai-demo-index-v1-with-tokenizer, exists=False
Index does not exist


In [13]:
# create index
reponse = os_client.indices.create(new_index_name, body=new_index_info)
print(reponse)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'genai-demo-index-v1-with-tokenizer'}


In [15]:
# reindex
_reindex = {
  "source": {"index": index_name},
  "dest": {"index": new_index_name}
}
print(_reindex)
response = os_client.reindex(_reindex)
print(response)

{'source': {'index': 'genai-demo-index-v1'}, 'dest': {'index': 'genai-demo-index-v1-with-tokenizer'}}
{'took': 470, 'timed_out': False, 'total': 92, 'updated': 0, 'created': 92, 'deleted': 0, 'batches': 1, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}


## 3. LangChain OpenSearch VectorStore 생성

#### 새로 생성한 index name으로 변경

In [21]:
index_name = "genai-demo-index-v1-with-tokenizer"

In [22]:
from langchain.vectorstores import OpenSearchVectorSearch

vector_db = OpenSearchVectorSearch(
  opensearch_url=opensearch_endpoint,
  embedding_function=llm_emb,
  index_name =index_name,
  http_auth=http_auth,
  is_aoss=False,
  engine="faiss",
  space_type="l2",
  use_ssl=True,
  verify_certs=True,
  ssl_assert_hostname=False,
  ssl_show_warn=False,
  ca_certs=ca_certs_path
)


In [None]:
index_info = os_client.indices.get(index_name)
pprint(index_info)

## 4. OpenSearch "유사서치" 검색

In [24]:
import copy
from langchain.schema import Document
from langchain import PromptTemplate
from operator import itemgetter

### 1. OpenSearch Vector 검색

#### prompt template 생성

In [26]:
from utils.rag import run_RetrievalQA, show_context_used
from langchain.prompts import PromptTemplate

In [35]:
prompt_template = """

Human: Here is the context, inside <context></context> XML tags.

<context>
{context}
</context>

Only using the context as above, answer the following question with the rule as below:
    - Don't insert XML tag such as <context> and </context> when answering.
    - Write as much as you can
    - Be courteous and polite
    - Only answer the question if you can find the answer in the context with certainty.

Question:
{question}

If the answer is not in the context, just say "주어진 내용에서 관련 답변을 찾을 수 없습니다."


Assistant:"""

PROMPT = PromptTemplate(
  template=prompt_template, input_variables=["context", "question"]
)
pprint(PROMPT)

PromptTemplate(input_variables=['context', 'question'], template='\n\nHuman: Here is the context, inside <context></context> XML tags.\n\n<context>\n{context}\n</context>\n\nOnly using the context as above, answer the following question with the rule as below:\n    - Don\'t insert XML tag such as <context> and </context> when answering.\n    - Write as much as you can\n    - Be courteous and polite\n    - Only answer the question if you can find the answer in the context with certainty.\n\nQuestion:\n{question}\n\nIf the answer is not in the context, just say "주어진 내용에서 관련 답변을 찾을 수 없습니다."\n\n\nAssistant:')


#### 필터 생성

In [31]:
from utils.opensearch import opensearch_utils

filter01="홈페이지"
filter02="신한은행"

query="홈페이지 이용자아이디 여러 개 사용할 수 있나요?"

boolean_filter = opensearch_utils.get_filter(
  filter=[
    {"term": {"metadata.type": filter01}},
    {"term": {"metadata.source": filter02}},
  ]
)

pprint(boolean_filter)

{'bool': {'filter': [{'term': {'metadata.type': '홈페이지'}},
                     {'term': {'metadata.source': '신한은행'}}]}}


#### LangChain RetrievalQA를 통해 실행

In [None]:
from termcolor import colored
from pprint import pprint

result = run_RetrievalQA(
  query=query,
  boolean_filter=boolean_filter,
  llm=llm_text,
  prompt=PROMPT,
  vector_db=vector_db,
  verbose=True,
  k=5
)

print("#"*100)
print("query: ", query)
print("boolean_filter: ", boolean_filter)
print("#"*100)

print(colored("\n\n### Answer ###", "blue"))
print(result['result'])

print (colored("\n\n### Contexts ###", "green"))
show_context_used(result['source_documents'])

### 2. OpenSearch Keyword 검색

In [42]:
from utils.opensearch import opensearch_utils
from utils.rag import get_lexical_similar_docs
from langchain.chains.question_answering import load_qa_chain

In [44]:
filter01="홈페이지"
filter02="신한은행"

query = "홈페이지 이용자아이디 여러 개 사용할 수 있나요?"

search_keyword_result = get_lexical_similar_docs(
  query=query,
  minimum_should_match=25,
  filter=[
    {"term": {"metadata.type": filter01}},
    {"term": {"metadata.source": filter02}},
  ],
  os_client=os_client,
  index_name=index_name,
  k=5,
  hybrid=False
)
print(search_keyword_result)

lexical search query: 
{'query': {'bool': {'filter': [{'term': {'metadata.type': '홈페이지'}},
                               {'term': {'metadata.source': '신한은행'}}],
                    'must': [{'match': {'text': {'minimum_should_match': '25%',
                                                 'operator': 'or',
                                                 'query': '홈페이지 이용자아이디 여러 개 '
                                                          '사용할 수 있나요?'}}}]}},
 'size': 5}
[Document(page_content='ask: 홈페이지 이용자아이디 여러 개 사용할 수 있나요?\nInformation: 홈페이지 이용자 아이디는 개인의 경우 1인 1개만 이용 가능하고 기업의 경우에는 1개의 사업자번호당 사용자별로 이용자아이디를 복수로 사용할 수 있습니다. \r\n※ 개인사업자의 경우 개인과 기업 각각 이용자아이디를 발급하여 복수로 이용 가능합니다. \r\n기타 궁금하신 내용은 신한은행 고객센터 1599-8000로 문의하여 주시기 바랍니다.', metadata={'source': '신한은행', 'row': 21, 'type': '홈페이지', 'timestamp': 1701838934.879154, 'id': '618f6304-66fc-46a8-9322-330fa5707497'}), Document(page_content='ask: 영업점에서 인터넷뱅킹 가입하고 왔는데 홈페이지회원은 별도로 가입해야 하나요?\nInformation: 인터넷뱅킹 가입과 홈페이지 회원은 개별로 운영되고 있습니다. \r\n인

In [45]:
chain = load_qa_chain(
  llm=llm_text,
  chain_type="stuff",
  prompt=PROMPT,
  verbose=True
)

answer = chain.run(
  input_documents=search_keyword_result,
  question=query
)

print("##############################")
print("query: \n", query)
print("answer: \n", answer)



[1m> Entering new StuffDocumentsChain chain...[0m


[1m> Entering new LLMChain chain...[0m
Prompt after formatting:
[32;1m[1;3m

Human: Here is the context, inside <context></context> XML tags.

<context>
ask: 홈페이지 이용자아이디 여러 개 사용할 수 있나요?
Information: 홈페이지 이용자 아이디는 개인의 경우 1인 1개만 이용 가능하고 기업의 경우에는 1개의 사업자번호당 사용자별로 이용자아이디를 복수로 사용할 수 있습니다. 
※ 개인사업자의 경우 개인과 기업 각각 이용자아이디를 발급하여 복수로 이용 가능합니다. 
기타 궁금하신 내용은 신한은행 고객센터 1599-8000로 문의하여 주시기 바랍니다.

ask: 영업점에서 인터넷뱅킹 가입하고 왔는데 홈페이지회원은 별도로 가입해야 하나요?
Information: 인터넷뱅킹 가입과 홈페이지 회원은 개별로 운영되고 있습니다. 
인터넷뱅킹만 가입을 하셨다면 별도로 홈페이지에 회원가입 후 홈페이지 로그인이 가능합니다. 
※ 영업점에서 인터넷뱅킹 가입 시 전계좌조회서비스 가입하신 고객은 홈페이지 상에서 이용자비밀번호 등록 후 이용가능합니다. 
기타 궁금하신 내용은 신한은행 고객센터 1599-8000로 문의하여 주시기 바랍니다.

ask: 홈페이지 회원아이디를 변경하고 싶습니다.
Information: 회원탈퇴를 하시고, 사용하시고 싶은 아이디로 신규 회원가입신청을 하시면 됩니다. 다만, 사용하시고 싶은 아이디가 이미 다른 고객이 사용하고 있는 경우에는 사용하실 수 없습니다. 기타 문의는 콜센터 1599-8000번으로 문의 바랍니다.

ask: 홈페이지상에 제가 등록한 칭찬/불만/제안사항 조회할 수 있나요?
Information: 로그인 후 등록한 접수내용에 대해서 확인 가능합니다.
[경로 안내] 홈페이지 로그인 → (우측상단) 고객센터 →

### OpenSearch Hybrid 검색

In [46]:
from utils.rag import get_semantic_similar_docs, get_lexical_similar_docs, get_ensemble_results

In [51]:
def search_hybrid(**kwargs):
  assert "query" in kwargs, "Check your query"
  assert "vector_db" in kwargs, "Check your vector_db"
  assert "index_name" in kwargs, "Check your index_name"
  assert "os_client" in kwargs, "Check your os_client"

  verbose = kwargs.get("verbose", False)

  similar_docs_semantic = get_semantic_similar_docs(
    vector_db=vector_db,
    query=kwargs['query'],
    k=kwargs.get("k", 5),
    hybrid=True
  )

  similar_docs_keyword = get_lexical_similar_docs(
    query=kwargs["query"],
    minimum_should_match=kwargs.get("minimum_should_match", 0),
    filter=kwargs.get("filter", {}),
    index_name=kwargs['index_name'],
    os_client=kwargs['os_client'],
    k=kwargs.get("k", 5),
    hybrid=True
  )

  similar_docs_ensemble = get_ensemble_results(
    doc_lists=[similar_docs_semantic, similar_docs_keyword],
    weights=kwargs.get("ensemble_weights", [.5,.5]),
    algorithm=kwargs.get("fusion_algorithm", "RRF"),
    c=60,
    k=kwargs.get("k",5)
  )

  if verbose:
    print("##############################")
    print("similar_docs_semantic")
    print("##############################")
    print(similar_docs_semantic)

    print("##############################")
    print("similar_docs_keyword")
    print("##############################")
    print(similar_docs_keyword)

    print("##############################")
    print("similar_docs_ensemble")
    print("##############################")
    print(similar_docs_ensemble)

  similar_docs_ensemble = list(map(lambda x:x[0], similar_docs_ensemble))

  return similar_docs_ensemble

In [52]:
filter01="홈페이지"
filter02="신한은행"

query = "홈페이지 이용자아이디 여러 개 사용할 수 있나요?"

search_hybrid_result = search_hybrid(
  query=query,
  vector_db=vector_db,
  k=5,
  index_name=index_name,
  os_client=os_client,
  filter=[
    {"term": {"metadata.type": filter01}},
    {"term": {"metadata.source": filter02}},
  ],
  fusion_algorithm="RRF",
  ensemble_weights=[.5,.5],
  verbose=True
)

answer = chain.run(
  input_documents=search_hybrid_result,
  question=query
)

lexical search query: 
{'query': {'bool': {'filter': [{'term': {'metadata.type': '홈페이지'}},
                               {'term': {'metadata.source': '신한은행'}}],
                    'must': [{'match': {'text': {'minimum_should_match': '0%',
                                                 'operator': 'or',
                                                 'query': '홈페이지 이용자아이디 여러 개 '
                                                          '사용할 수 있나요?'}}}]}},
 'size': 5}
##############################
similar_docs_semantic
##############################
[(Document(page_content='ask: 홈페이지 이용자아이디 여러 개 사용할 수 있나요?\nInformation: 홈페이지 이용자 아이디는 개인의 경우 1인 1개만 이용 가능하고 기업의 경우에는 1개의 사업자번호당 사용자별로 이용자아이디를 복수로 사용할 수 있습니다. \r\n※ 개인사업자의 경우 개인과 기업 각각 이용자아이디를 발급하여 복수로 이용 가능합니다. \r\n기타 궁금하신 내용은 신한은행 고객센터 1599-8000로 문의하여 주시기 바랍니다.', metadata={'source': '신한은행', 'row': 21, 'type': '홈페이지', 'timestamp': 1701838934.879154}), 1.0), (Document(page_content='ask: 홈페이지 회원아이디를 변경하고 싶습니다.\nInformation: 회원탈퇴를 하시고, 사용하

In [53]:
print(f'question: {query}')
print(f'response: {answer}')

question: 홈페이지 이용자아이디 여러 개 사용할 수 있나요?
response:  네, 홈페이지 이용자아이디는 개인의 경우 1인 1개만 이용 가능하고 기업의 경우에는 1개의 사업자번호당 사용자별로 이용자아이디를 복수로 사용할 수 있습니다. 개인사업자의 경우 개인과 기업 각각 이용자아이디를 발급하여 복수로 이용 가능하다고 하셨습니다.
