# 지식 문서 업로드 기능 구현 (약 20분 소요)

In [20]:
import boto3
%store -r opensearch_user_id opensearch_user_password domain_name opensearch_domain_endpoint

try:
    opensearch_user_id
    opensearch_user_password
    domain_name
    opensearch_domain_endpoint
   
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] Run 00_setup notebook first or Create Your Own OpenSearch Domain")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

In [2]:
%load_ext autoreload
%autoreload 2

In [18]:
import re
import json
import pdfplumber
from datetime import datetime
from langchain_core.documents import Document

### PDF 문서 처리방식 확인

In [16]:
pdf_list = ['./data/sample1.pdf']

In [43]:
# PDF에서 cid를 추출해서 ASCII 문자로 변환
def text_pruner(text, current_pdf_file):
    def replace_cid(match):
        ascii_num = int(match.group(1))
        try:
            return chr(ascii_num)
        except:
            return ''  # 변환 실패 시 빈 문자열로 처리
    cid_pattern = re.compile(r'\(cid:(\d+)\)')
    return re.sub(cid_pattern, replace_cid, text)

# PDF 파일 처리 함수
def process_pdf(pdf_file):
    print(f"Processing PDF file: {pdf_file}")
    docs = []
    source_name = pdf_file.split('/')[-1]
    type_name = source_name.split(' ')[-1].replace('.pdf', '')

    with pdfplumber.open(pdf_file) as pdf:
        # 페이지 단위 Chunking (단순화 된 방법)
        for page_number, page in enumerate(pdf.pages, start=1):
            page_text = page.extract_text()
            if page_text:
                pruned_text = text_pruner(page_text, pdf_file)
            else:
                pruned_text = ""
            # 텍스트 길이가 20 이상인 경우에만 Documnet로 저장
            if len(pruned_text) >= 20:  
                doc = Document(
                    page_content=pruned_text.replace('\n', ' '),
                    metadata={
                        "source": source_name,
                        "type": type_name,
                        "timestamp": datetime.now().isoformat()
                    }
                )
                docs.append(doc)
    if docs:
        load_document(docs)

def load_document(docs):
    # 동작방식을 확인하기 위해, 출력만 진행 (업데이트 예정)
    print("sample doc: ", docs[0])
    print("number of docs", len(docs))    

for pdf_file in pdf_list:
    process_pdf(pdf_file)


Processing PDF file: ./data/sample1.pdf
sample doc:  page_content='해외여행보험 보통약관 제1조(보험계약의 성립) ①보험계약은 보험계약자의 청약과 보험회사의 승낙으로 이루어집니 다.(이하 "보험계약"은 "계약","보험계약자"는 "계약자","보험회사"는 "회사"라 합니다) ② 회사는 계약의 청약을 받고 보험료 전액 또는 제1회 보험료(일정기간 단위의 분할보험료) 를 받은 경우에는 청약일(진단계약의 경우에는 진단일)로부터 30일 이내에 승낙 또는 거절의 통지를 하며 통지가 없으면 승낙한 것으로 봅니다. ③ 회사가 청약을 승낙한 때에는 지체없이 보험증권을 계약자에게 교부하여 드리며, 청약을 거절할 경우에는 거절통지와 함께 받은 금액을 계약자에게 돌려드립니다. ④ 이미 성립한 계약을 연장하거나 변경하는 경우에는 회사는 보험증권에 그 사실을 기재하거 나 서면으로 알림으로써 보험증권의 교부에 대신할 수 있습니다. 제2조(약관교부 및 설명의무 등) ① 회사는 계약을 체결할 때 계약자에게 보험약관을 드리고 그 약관의 중요한 내용을 설명하여 드립니다. ② 회사가 제1항에 의해 제공될 약관을 계약자에게 전달하지 아니하거나 약관의 중요한 내용 을 설명하지 아니한 때 또는 계약체결시 계약자가 청약서에 자필서명(날인을 포함합니다)을 하지 아니한 때에는 계약자는 계약일로부터 1월 이내에 계약을 취소할 수 있습니다. ③ 제2항에 따라 계약이 취소된 경우에는 회사는 이미 납입한 보험료를 계약자에게 돌려 드리 며, 보험료를 받은 기간에 대하여 보험개발원이 공시하는 정기예금이율로 계산한 금액을 더 하여 지급합니다. 제3조(보험료) ① 보험료는 다른 약정이 없으면 보험기간이 시작되기 전에 내어야 합니다. ② 다른 약정이 없으면 보험기간이 시작된 후라도 보험료를 받기 전에 생긴 손해는 보상하여 드리지 아니합니다. 제4조(회사의 책임의 시기 및 종기) ① 회사의 책임은 보험기간의 첫날 오후 4시에 시작하여 마지막날 오후 4시에 끝납니다. 그러나,

### OpenSearch를 지식 저장소로 활용

이제 지식 저장소로 OpenSearch를 활용하기 위해, 각 Document들을 보관합니다.

#### OpenSearch 클라이언트 생성

In [28]:
from opensearchpy import OpenSearch, RequestsHttpConnection

In [25]:
http_auth = (opensearch_user_id, opensearch_user_password)
os_client = OpenSearch(
            hosts=[
                {'host': opensearch_domain_endpoint.replace("https://", ""),
                 'port': 443
                }
            ],
            http_auth=http_auth, 
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )

#### OpenSearch에 `sample_pdf` 인덱스 생성

In [27]:
with open('index_template.json', 'r') as f:
    index_body = json.load(f)

index_name = "sample_pdf"
exists = os_client.indices.exists(index_name)

if exists:
    os_client.indices.delete(index=index_name)
    print("Existing index has been deleted. Create new one.")
else:
    print("Index does not exist, Create one.")

os_client.indices.create(index_name, body=index_body)

Existing index has been deleted. Create new one.


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'sample_pdf'}

In [33]:
from botocore.config import Config

In [34]:
region_name = 'us-east-1'
retry_config = Config(
        region_name=region_name,
        retries={
            "max_attempts": 10,
            "mode": "standard",
        },
    )
boto3_bedrock = boto3.client("bedrock-runtime", region_name=region_name, config=retry_config)

#### 벡터 임베딩 및 OpenSearch 벡터 저장/검색을 위한 클래스 생성

검색증강생성(RAG)의 자연어 기반 챗봇이 가능한 핵심 원리 중 하나는 벡터임베딩을 활용한 텍스트의 저장 및 검색입니다.

1. 자연어 텍스트를 벡터임베딩으로 변환해주는 Bedrock Embedding 모델을 정의하고,
2. OpenSearch에서 제공하는 벡터 저장/검색을 위한 클래스(`OpenSearchVectorSearch`)를 생성합니다.

In [35]:
from langchain.embeddings import BedrockEmbeddings
from langchain_community.vectorstores import OpenSearchVectorSearch

In [39]:
llmemb = BedrockEmbeddings(
    client=boto3_bedrock,
    model_id="amazon.titan-embed-g1-text-02"
)
dimension = 1536
print("Bedrock Embeddings Model Loaded")

vector_db = OpenSearchVectorSearch(
    index_name=index_name,
    opensearch_url=opensearch_domain_endpoint,
    embedding_function=llmemb,
    http_auth=http_auth,
)
print("OpenSearch Vector DB Client Created")

Bedrock Embeddings Model Loaded
OpenSearch Vector DB Client Created


#### 이제 문서 로드 함수 `load_document()`를 업데이트합니다

앞에서는 동작방식을 확인하기 위해, 문서를 print로만 출력하고 종료했었습니다.

In [44]:
def load_document(docs):
    vector_db.add_documents(docs)

for pdf_file in pdf_list:
    process_pdf(pdf_file)

Processing PDF file: ./data/sample1.pdf


실행이 끝난 후에 OpenSearch에서 `sample_pdf` 인덱스를 조회해보면, 텍스트와 이 텍스트의 문맥적 의미를 담는 벡터가 함께 저장된 것이 확인됩니다.

(아래 내용은 참고만 하셔도 됩니다)

<img src="./image/uploader-1.png">

### 파일 업로더를 Streamlit 애플리케이션에 통합

이제 Streamlit에서 업로드 된 파일이 자동으로 OpenSearch의 벡터 저장소에 보관되도록 처리하겠습니다.

In [114]:
%%writefile uploader.py
import os
import boto3
import re
import json
import pdfplumber
from datetime import datetime
from opensearchpy import OpenSearch, RequestsHttpConnection
from botocore.config import Config
from langchain_core.documents import Document
from langchain_community.embeddings import BedrockEmbeddings
from langchain_community.vectorstores import OpenSearchVectorSearch

opensearch_user_id = '{opensearch_user_id}'
opensearch_user_password = '{opensearch_user_password}'
domain_name = '{domain_name}'
opensearch_domain_endpoint = '{opensearch_domain_endpoint}'
index_name = 'sample_pdf'
region_name = 'us-east-1'


def setup_opensearch_client():
    http_auth = (opensearch_user_id, opensearch_user_password)
    os_client = OpenSearch(
                hosts=[{{'host': opensearch_domain_endpoint.replace("https://", ""), 'port': 443}}],
                http_auth=http_auth, 
                use_ssl=True,
                verify_certs=True,
                connection_class=RequestsHttpConnection
            )
    return os_client
    
os_client = setup_opensearch_client()

retry_config = Config(
    region_name=region_name,
    retries={{
        "max_attempts": 10,
        "mode": "standard",
    }},
)
boto3_bedrock = boto3.client("bedrock-runtime", region_name=region_name, config=retry_config)

llmemb = BedrockEmbeddings(
    client=boto3_bedrock,
    model_id="amazon.titan-embed-g1-text-02"
)

http_auth = (opensearch_user_id, opensearch_user_password)
vector_db = OpenSearchVectorSearch(
    index_name=index_name,
    opensearch_url=opensearch_domain_endpoint,
    embedding_function=llmemb,
    http_auth=http_auth,
)

# PDF에서 cid를 추출해서 ASCII 문자로 변환하는 함수
def text_pruner(text, current_pdf_file):
    def replace_cid(match):
        ascii_num = int(match.group(1))
        try:
            return chr(ascii_num)
        except:
            return ''  # 변환 실패 시 빈 문자열로 처리
    cid_pattern = re.compile(r'\(cid:(\d+)\)')
    return re.sub(cid_pattern, replace_cid, text)

# PDF 파일 처리 함수
def process_pdf(pdf_file):
    docs = []
    source_name = pdf_file.name
    type_name = source_name.split(' ')[-1].replace('.pdf', '')

    with pdfplumber.open(pdf_file) as pdf:
        for page_number, page in enumerate(pdf.pages, start=1):
            page_text = page.extract_text()
            if page_text:
                pruned_text = text_pruner(page_text, pdf_file)
            else:
                pruned_text = ""
            if len(pruned_text) >= 20:  
                doc = Document(
                    page_content=pruned_text.replace('\n', ' '),
                    metadata={{
                        "source": source_name,
                        "type": type_name,
                        "timestamp": datetime.now().isoformat()
                    }}
                )
                docs.append(doc)
    if docs:
        load_document(docs)

def load_document(docs):
    vector_db.add_documents(docs)
    print("Done")


# 인덱스 생성 및 관리
def manage_index(os_client):
    with open('index_template.json', 'r') as f:
        index_body = json.load(f)
        
    exists = os_client.indices.exists(index_name)
    if exists:
        os_client.indices.delete(index=index_name)
    else:
        os_client.indices.create(index_name, body=index_body)

def upload_and_process_file(uploaded_file):
    try:
        print("File uploaded:", uploaded_file.name)
        manage_index(os_client)
        process_pdf(uploaded_file)
        return True
    except Exception as e:
        print(f"Error processing file: {{e}}")
        return False
    

Overwriting uploader.py


In [115]:
file_path = 'uploader.py'  
with open(file_path, 'r') as file:
    content = file.read()

content = content.format(
    opensearch_user_id=opensearch_user_id,
    opensearch_user_password=opensearch_user_password,
    domain_name=domain_name,
    opensearch_domain_endpoint=opensearch_domain_endpoint
)

with open(file_path, 'w') as file:
    file.write(content)

In [116]:
%%writefile demo-app.py
from basic import get_conversation
import streamlit as st
from uploader import upload_and_process_file  # 추가된 부분

uploaded_file = st.file_uploader("파일을 업로드하세요", type=["pdf"])
prompt = st.text_input("프롬프트를 입력하세요.")
search_type = st.radio("Search Type", ["Basic", "Basic-RAG", "Hybrid-RAG", "Advanced-RAG"])

chat_box = st.empty()

if 'conversation' not in st.session_state or 'stream_handler' not in st.session_state:
    st.session_state.conversation, st.session_state.stream_handler = get_conversation(chat_box)

def search_documents(search_type: str, prompt: str):
    if search_type == "Basic":
        st.session_state.stream_handler.reset_accumulated_text()
        st.session_state.conversation.predict(input=prompt)
    elif search_type == "Basic-RAG":
        st.write(f"문서 기본 검색: {prompt}")
    elif search_type == "Hybrid-RAG":
        st.write(f"하이브리드 검색: {prompt}")
    elif search_type == "Advanced-RAG":
        st.write(f"고급 검색: {prompt}")

if st.button("검색"):
    search_documents(search_type, prompt)

if uploaded_file is not None:
    print("File uploaded:", uploaded_file.name)
    upload_and_process_file(uploaded_file)
else:
    print("No file uploaded.")

if uploaded_file is not None:
    res = upload_and_process_file(uploaded_file)
    if res:
        st.success("파일이 성공적으로 처리됐습니다.")
    else:
        st.error("파일 처리 중 오류가 발생했습니다.")

Overwriting demo-app.py


#### 이제 Streamlit 애플리케이션에서 파일을 업로드 했을 때 아래와 같이 표시됩니다

<img src="./image/uploader-2.png" width="800">