In [None]:
# Install the notebook's Python dependencies (IPython shell escape, not plain Python).
!pip install -r requirements.txt


In [None]:
from langchain_aws import ChatBedrockConverse
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.prompts import BasePromptTemplate
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from opensearchpy import OpenSearch, RequestsHttpConnection
import os
import boto3
import json
import sys
from typing import List, Optional, Dict, Tuple
from langchain_core.documents import Document
from pypdf import PdfReader
import concurrent.futures
from langchain_aws import BedrockEmbeddings
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import PromptTemplate
from operator import itemgetter
from langchain_core.runnables import RunnableLambda
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
import torch


In [2]:
# Load the sentence-embedding model once; every get_embedding call reuses it.
device = "cpu"  # run embedding inference on the CPU
embedding_model = SentenceTransformer(
    "all-MiniLM-L6-v2",
    device=device,
)

In [3]:

# Convert text into an embedding vector.
def get_embedding(text):
    """Encode ``text`` with the module-level ``embedding_model``.

    Args:
        text: Input forwarded unchanged to ``embedding_model.encode``.

    Returns:
        Whatever ``embedding_model.encode`` returns for ``text``. For a single
        string this notebook observes a 384-element vector.
    """
    return embedding_model.encode(text)


In [4]:
# Sanity check: the embedding length must equal the `dimension` of the
# `doc_vector` knn_vector field configured in define_index (384).
len(get_embedding("안녕하세요?"))

384

In [5]:
#opensearch에 index(sql table과 유사) 생성
def define_index(opensearch_client, index_name):
    index_settings = {
        "settings": {
            "index": {  
                "knn": True,                          
                "knn.algo_param.ef_search": 100,      
                "number_of_shards": 3,                
                "number_of_replicas": 2, 
                "analysis": {
                    "analyzer": {
                        "nori_analyzer": {
                            "tokenizer": "nori_tokenizer",
                            "filter": ["nori_stop", "lowercase"]
                        }
                    },
                    "filter": {
                        "nori_stop": {
                            "type": "nori_part_of_speech",
                            "stoptags": ["J", "JKS", "JKB", "JKO", "JKG", "JKC", "JKV", "JKQ", "JX", "JC"]
                        }
                    }
                }
            }
        },   
        "mappings": {
            "properties": {
                "document": {
                    "type": "text",
                    "analyzer": "nori_analyzer"
                },
                "doc_vector": {
                    "type": "knn_vector",
                    "dimension": 384,
                    "method": {
                        "name": "hnsw",   
                        "space_type": "l2",
                        "engine": "faiss",
                        "parameters": {
                            "ef_construction": 128,
                            "m": 16
                        }
                    }
                },
                "source": {
                    "type": "keyword"  # Use keyword for exact matching
                },
                "page": {
                    "type": "integer"
                },
                "total_pages": {
                    "type": "integer"
                }
            }
        }
    }
    try:
        response = opensearch_client.indices.create(
            index=index_name,
            body=index_settings
        )
        print("Index created successfully:", response)
        
    except Exception as e:
        print("Error creating index:", e)

In [6]:
# Index chunked PDF text into OpenSearch (text + embedding + page metadata).
def save_chunk(opensearh_client, index_name, chunks, start_index=0):
    """Embed each chunk and index it as one OpenSearch document.

    Args:
        opensearh_client: Connected OpenSearch client (parameter name kept
            unchanged for backward compatibility with keyword callers).
        index_name: Target index, presumably created by ``define_index``.
        chunks: Sequence of LangChain ``Document`` objects whose metadata
            contain ``source``, ``page`` and ``total_pages``.
        start_index: Offset for document ids. Ids are
            ``sagemaker_doc_<start_index + i>``. Pass the slice start (e.g.
            ``1000`` for ``split_docs[1000:2500]``) so ids line up with chunk
            positions, and so indexing another slice does not overwrite
            documents that share the same position within their slice.

    Errors from individual ``index`` calls are printed and that chunk is skipped.
    A progress line is printed every 500 successful documents.
    """
    for position, chunk in enumerate(chunks, start=start_index):
        raw_embedding = get_embedding(chunk.page_content)
        # Cast to plain Python floats (e.g. instead of numpy scalars) for the request body.
        doc_vector = [float(val) for val in raw_embedding]

        body = {
            "document": chunk.page_content,
            "doc_vector": doc_vector,
            "source": chunk.metadata["source"],
            "page": chunk.metadata["page"],
            "total_pages": chunk.metadata["total_pages"],
        }
        try:
            opensearh_client.index(
                index=index_name,
                body=body,
                id=f"sagemaker_doc_{position}",
            )
        except Exception as e:
            print(f"Error indexing document {position}: {e}")
            continue
        if position % 500 == 0:
            print(f"Document {position} indexed successfully")


In [7]:
# For speed, pages are processed in parallel: this reads one contiguous page range.
def process_page_range(pdf_path, start_page, end_page):
    """Extract the text of pages ``[start_page, end_page)`` from a PDF.

    Args:
        pdf_path: Path to the PDF file.
        start_page: First page index (0-based, inclusive).
        end_page: End page index (0-based, exclusive); clamped to the page count.

    Returns:
        A list of LangChain ``Document`` objects, one per page, with metadata
        ``source`` (the path), ``page`` (1-based page number) and
        ``total_pages``.
    """
    # PyMuPDF is not imported by the notebook's import cell, so import it here.
    # This also keeps the function self-contained inside worker processes.
    import fitz

    # The context manager closes the file even if text extraction raises.
    with fitz.open(pdf_path) as doc:
        total_pages = doc.page_count
        batch_docs = []
        for i in range(start_page, min(end_page, total_pages)):
            batch_docs.append(
                Document(
                    page_content=doc[i].get_text(),
                    metadata={
                        "source": pdf_path,
                        "page": i + 1,
                        "total_pages": total_pages,
                    },
                )
            )
    return batch_docs
    
# Read a PDF by splitting it into page batches processed in parallel.
def load_pdf_parallel(pdf_path, batch_size=100, max_workers=4):
    """Load every page of a PDF as a LangChain ``Document`` using worker processes.

    Args:
        pdf_path: Path to the PDF file.
        batch_size: Number of pages handed to each ``process_page_range`` call.
        max_workers: Size of the process pool.

    Returns:
        Per-page ``Document`` objects sorted by 1-based page number. Pages in a
        batch that raised are reported and omitted (best-effort).
    """
    # PyMuPDF is not imported by the notebook's import cell, so import it here.
    import fitz

    with fitz.open(pdf_path) as doc:
        total_pages = doc.page_count
    print(f"PDF has {total_pages} pages in total")

    batches = [
        (start_page, min(start_page + batch_size, total_pages))
        for start_page in range(0, total_pages, batch_size)
    ]

    all_docs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_batch = {
            executor.submit(process_page_range, pdf_path, start, end): (start, end)
            for start, end in batches
        }
        for future in concurrent.futures.as_completed(future_to_batch):
            start, end = future_to_batch[future]
            try:
                batch_docs = future.result()
            except Exception as e:
                print(f"Error processing pages {start+1} to {end}: {e}")
                continue
            all_docs.extend(batch_docs)
            print(f"Completed batch: pages {start+1} to {end}")

    # as_completed yields batches in completion order, so restore page order.
    all_docs.sort(key=lambda d: d.metadata["page"])
    return all_docs

In [21]:
# Create the OpenSearch client and verify the connection.
# Credentials come from the environment (or an interactive prompt) instead of
# being hard-coded, so the master password is not committed with the notebook.
# NOTE(review): the password previously embedded here should be rotated.
import getpass

OPENSEARCH_HOST = os.environ.get(
    "OPENSEARCH_HOST",
    "search-bank-opensearch-domain-hz673yoao6ld43n3atan6uqvgu.ap-northeast-2.es.amazonaws.com",
)
OPENSEARCH_USER = os.environ.get("OPENSEARCH_USER", "Bankadmin")
OPENSEARCH_PASSWORD = os.environ.get("OPENSEARCH_PASSWORD") or getpass.getpass(
    "OpenSearch password: "
)

opensearch_client = OpenSearch(
    hosts=[{"host": OPENSEARCH_HOST, "port": 443}],
    http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
try:
    response = opensearch_client.info()
    print("Successfully connected to OpenSearch")
    print(response)
except Exception as e:
    print(f"Connection failed: {e}")

Successfully connected to OpenSearch
{'name': '231d6b0ab6706e8eefe31db7bf507b69', 'cluster_name': '008971635601:bank-opensearch-domain', 'cluster_uuid': 'c3SpWUfdRC2_5LzPRUmGAA', 'version': {'number': '7.10.2', 'build_type': 'tar', 'build_hash': 'unknown', 'build_date': '2025-10-02T02:20:56.818914285Z', 'build_snapshot': False, 'lucene_version': '10.2.1', 'minimum_wire_compatibility_version': '2.19.0', 'minimum_index_compatibility_version': '2.0.0'}, 'tagline': 'The OpenSearch Project: https://opensearch.org/'}


In [37]:
index_name = "nori_test_index"

# Standalone test index: one Nori analyzer using "mixed" decompounding (emits a
# compound token and its parts), applied to both text fields.
index_body = {
    "settings": {
        "analysis": {
            "analyzer": {
                "nori_analyzer": {"type": "nori", "decompound_mode": "mixed"},
            },
        },
    },
    "mappings": {
        "properties": {
            field_name: {"type": "text", "analyzer": "nori_analyzer"}
            for field_name in ("title", "content")
        },
    },
}

In [39]:
# Drop the test index (if present) so the next cell can recreate it from scratch.
try:
    delete_response = opensearch_client.indices.delete(index=index_name)
    print(f"Index '{index_name}' deleted successfully: {delete_response}")
except Exception as delete_error:
    # Best-effort: the index does not exist yet on a first run.
    print(f"Failed to delete index '{index_name}': {delete_error}")

Index 's3_explain' deleted successfully: {'acknowledged': True}


In [40]:
# Create the test index using the Nori settings/mappings in `index_body`.

opensearch_client.indices.create(index=index_name, body=index_body)


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'nori_test_index'}

In [41]:
# 2. Sample documents (title + content) to index into the test index.
test_docs = [
    {"title": "한국어 형태소 분석", "content": "노리 분석기는 한국어를 잘 처리합니다"},
    {"title": "은행 업무 시스템", "content": "금융권에서 사용하는 검색 시스템입니다"},
    {"title": "OpenSearch 활용", "content": "엘라스틱서치 호환 검색엔진입니다"}
]


In [42]:
# Index the sample documents with ids 1..N, then refresh the index.
# OpenSearch search is near real-time: newly indexed documents only become
# searchable after a refresh. Without it, a search run right afterwards can
# miss them.
for doc_id, doc in enumerate(test_docs, start=1):
    opensearch_client.index(index=index_name, id=doc_id, body=doc)
opensearch_client.indices.refresh(index=index_name)



In [44]:
# Show how the Nori analyzer tokenizes each sample document's content.
for doc_no, sample in enumerate(test_docs, start=1):
    print(f"\n문서 {doc_no}: {sample['title']}")
    print(f"원문: {sample['content']}")

    analyzed = opensearch_client.indices.analyze(
        index=index_name,
        body={"analyzer": "nori_analyzer", "text": sample['content']},
    )
    joined_tokens = " | ".join(tok['token'] for tok in analyzed['tokens'])
    print(f"분석된 토큰: {joined_tokens}")







문서 1: 한국어 형태소 분석
원문: 노리 분석기는 한국어를 잘 처리합니다
분석된 토큰: 노리 | 분석기 | 분석 | 기 | 한국어 | 한국 | 어 | 처리

문서 2: 은행 업무 시스템
원문: 금융권에서 사용하는 검색 시스템입니다
분석된 토큰: 금융 | 사용 | 검색 | 시스템 | 입니다 | 이

문서 3: OpenSearch 활용
원문: 엘라스틱서치 호환 검색엔진입니다
분석된 토큰: 엘라스틱 | 서치 | 호환 | 검색 | 엔진 | 입니다 | 이


In [None]:


# 4. Search test: full-text match query on the Nori-analyzed `content` field.
search_query = {
    "query": {
        "match": {
            "content": "한국어"
        }
    }
}

search_result = opensearch_client.search(index=index_name, body=search_query)
# hits.total.value is the number of matching documents.
print(f"\n검색 결과: {search_result['hits']['total']['value']}건")



In [22]:
# Load the PDF as one Document per page (chunking happens in a later cell).
path="/home/jovyan/langtest/langraph_work_shop/data/s3-userguide.pdf"
# `page` holds the list of per-page Documents; one worker process per CPU core.
page=load_pdf_parallel(path,max_workers=os.cpu_count())

PDF has 2716 pages in total


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Completed batch: pages 101 to 200
Completed batch: pages 1 to 100
Completed batch: pages 201 to 300
Completed batch: pages 301 to 400
Completed batch: pages 401 to 500
Completed batch: pages 501 to 600
Completed batch: pages 601 to 700
Completed batch: pages 701 to 800
Completed batch: pages 801 to 900
Completed batch: pages 901 to 1000
Completed batch: pages 1001 to 1100
Completed batch: pages 1101 to 1200
Completed batch: pages 1201 to 1300
Completed batch: pages 1301 to 1400
Completed batch: pages 1401 to 1500
Completed batch: pages 1501 to 1600
Completed batch: pages 1601 to 1700
Completed batch: pages 1701 to 1800
Completed batch: pages 1801 to 1900
Completed batch: pages 1901 to 2000
Completed batch: pages 2001 to 2100
Completed batch: pages 2101 to 2200
Completed batch: pages 2201 to 2300
Completed batch: pages 2301 to 2400
Completed batch: pages 2401 to 2500
Completed batch: pages 2501 to 2600
Completed batch: pages 2601 to 2700
Completed batch: pages 2701 to 2716


In [23]:
# Split the per-page Documents into overlapping chunks; the result is a List[Document].
from langchain_text_splitters import RecursiveCharacterTextSplitter
# chunk_size / chunk_overlap are measured by the splitter's length function (characters by default).
text_splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=100)
split_docs = text_splitter.split_documents(page)


In [24]:
# Print the total number of chunks produced by the splitter.
print("총 갯수 -> ",len(split_docs))

총 갯수 ->  7468


In [25]:
# Drop the 's3_explain' index (if present) so define_index can recreate it.
try:
    delete_response = opensearch_client.indices.delete(index='s3_explain')
    print(f"Index 's3_explain' deleted successfully: {delete_response}")
except Exception as delete_error:
    # Best-effort: the index does not exist yet on a first run.
    print(f"Failed to delete index 's3_explain': {delete_error}")

Index 's3_explain' deleted successfully: {'acknowledged': True}


In [26]:
# Create the hybrid (Nori full-text + k-NN vector) index used by the search functions.
define_index(opensearch_client, "s3_explain")

Index created successfully: {'acknowledged': True, 'shards_acknowledged': True, 'index': 's3_explain'}


In [27]:
# Index a subset of the chunks (split_docs positions 1000..2499).
# NOTE(review): save_chunk builds ids from each chunk's position within the
# slice it receives (sagemaker_doc_0, ...). Ids therefore do not match split_docs
# indices, and indexing another slice would overwrite these documents. Confirm
# this is intended.
save_chunk(opensearch_client, "s3_explain" ,  split_docs[1000:2500])

Document 0 indexed successfully
Document 500 indexed successfully
Document 1000 indexed successfully


In [28]:
# Lexical (BM25) search: the query is analyzed with nori_analyzer (Korean
# morphological analysis) and matched against the `document` field.
def simple_text_search(client, search_text, index_name="s3_explain", k=10):
    """Run a full-text match query and return scored Documents.

    Args:
        client: OpenSearch client.
        search_text: Raw query text.
        index_name: Index to search.
        k: Maximum number of hits to return.

    Returns:
        A list of ``(Document, score)`` tuples. Scores are divided by the
        response's max score (see ``normalize_search_results``). Each
        Document's ``page_content`` is the JSON-encoded ``_source`` and its
        metadata holds the hit ``id``. Returns ``[]`` when nothing matches and
        ``None`` if the request failed (the error is printed).
    """
    text_query = {
        "size": k,
        # The embedding is not needed for lexical results; keep responses small.
        "_source": {"excludes": ["doc_vector"]},
        "query": {
            "match": {
                "document": {
                    "query": search_text,
                    "analyzer": "nori_analyzer",
                }
            }
        },
    }
    try:
        response = client.search(index=index_name, body=text_query)

        documents = []
        if response.get("hits", {}).get("hits", []):
            search_results = normalize_search_results(response)
            for res in search_results["hits"]["hits"]:
                # NOTE: "table_summary_v" is not a field of this index's mapping;
                # the filter is kept for compatibility with other indexes.
                page_content = {
                    key: value
                    for key, value in res["_source"].items()
                    if key != "table_summary_v"
                }
                documents.append(
                    (
                        Document(
                            page_content=json.dumps(page_content, ensure_ascii=False),
                            metadata={"id": res["_id"]},
                        ),
                        res["_score"],
                    )
                )
        return documents

    except Exception as e:
        print(f"Search error: {str(e)}")
        return None


# BM25 scores have no fixed upper bound (they depend on the corpus and the
# query), so they are scaled to (0, 1] by dividing by the top score. This puts
# them on a comparable scale with the vector-search scores before fusion.
def normalize_search_results(search_results):
    """Divide every hit's ``_score`` by the response's ``max_score``, in place.

    Args:
        search_results: An OpenSearch search response dict.

    Returns:
        The same (mutated) response. If there are no hits, or ``max_score`` is
        missing/``None``/zero, the response is returned unchanged instead of
        raising.
    """
    hits = search_results["hits"]["hits"]
    max_score = search_results["hits"].get("max_score")
    if not hits or not max_score:
        return search_results

    max_score = float(max_score)
    for hit in hits:
        hit["_score"] = float(hit["_score"]) / max_score
    # Hits arrive sorted by score, so the first one carries the new maximum.
    search_results["hits"]["max_score"] = hits[0]["_score"]
    return search_results

In [29]:
# Vector (k-NN) similarity search over the `doc_vector` field.
def vector_search(client, embedding_vector, index_name="s3_explain", k=3):
    """Return the ``k`` chunks nearest to ``embedding_vector``.

    Args:
        client: OpenSearch client.
        embedding_vector: Query embedding as any iterable of numbers (e.g. the
            array returned by ``get_embedding`` or a plain list). It is
            converted to a list of Python floats, matching how ``save_chunk``
            stores vectors.
        index_name: Index to search.
        k: Number of neighbours requested and maximum number of hits returned.

    Returns:
        A list of ``(Document, score)`` tuples. Each Document's
        ``page_content`` is the JSON-encoded ``_source`` and its metadata holds
        the hit ``id``. Returns ``None`` if the request failed (the error is
        printed).
    """
    try:
        query_vector = [float(value) for value in embedding_vector]
        vector_query = {
            "size": k,
            # The stored vector is not needed in the results; keep responses small.
            "_source": {"excludes": ["doc_vector"]},
            "query": {
                "knn": {
                    "doc_vector": {
                        "vector": query_vector,
                        "k": k,
                    }
                }
            },
        }

        response = client.search(index=index_name, body=vector_query)

        documents = []
        for res in response["hits"]["hits"]:
            page_content = {
                key: value
                for key, value in res["_source"].items()
                if key != "doc_vector"
            }
            documents.append(
                (
                    Document(
                        page_content=json.dumps(page_content, ensure_ascii=False),
                        metadata={"id": res["_id"]},
                    ),
                    res["_score"],
                )
            )
        return documents

    except Exception as e:
        print(f"Search error: {str(e)}")
        return None

In [30]:
# Fuse the Nori lexical results and the vector results into one ranking.
def get_ensemble_results(doc_lists: List[List[Tuple[Document, float]]], weights: List[float], k: int = 5) -> List[Document]:
    """Combine scored result lists by weighted score summation.

    Each document's fused score is the sum of ``weight * score`` over the lists
    it appears in. A document absent from a list contributes nothing for that
    list. Documents are identified by ``metadata["id"]``, falling back to
    ``page_content``.

    Args:
        doc_lists: One list of ``(Document, score)`` pairs per retriever. A
            ``None`` entry (what the search helpers return on error) is treated
            as an empty list instead of raising ``TypeError``.
        weights: Weight for each list, matched to ``doc_lists`` by position.
        k: Number of documents to return.

    Returns:
        Up to ``k`` documents, highest fused score first. Ties keep
        first-seen order.
    """
    hybrid_score_dic: Dict[str, float] = {}
    doc_map: Dict[str, Document] = {}

    for doc_list, weight in zip(doc_lists, weights):
        for doc, score in doc_list or []:
            doc_id = doc.metadata.get("id", doc.page_content)
            hybrid_score_dic[doc_id] = hybrid_score_dic.get(doc_id, 0.0) + score * weight
            doc_map[doc_id] = doc

    ranked = sorted(hybrid_score_dic.items(), key=lambda item: item[1], reverse=True)
    return [doc_map[doc_id] for doc_id, _ in ranked[:k]]

In [31]:
# Hybrid search: take k candidates from vector search and k from Nori (BM25)
# lexical search, fuse their weighted scores, and keep the top k.
def retrieval_augmented(query, k=10, weights=(0.7, 0.3)):
    """Run hybrid (vector + lexical) retrieval for ``query``.

    Args:
        query: Natural-language query text.
        k: Candidates fetched from each retriever and number of results returned.
        weights: ``(vector_weight, lexical_weight)`` used for score fusion.
            The default matches the previous hard-coded values.

    Returns:
        Up to ``k`` LangChain ``Document`` objects, best fused score first.
    """
    query_vector = get_embedding(query)
    vector_hits = vector_search(opensearch_client, query_vector, k=k)
    lexical_hits = simple_text_search(opensearch_client, query, k=k)

    # The search helpers return None on failure. Treat that as "no hits" so one
    # failing retriever degrades to the other's results instead of raising
    # during fusion.
    return get_ensemble_results(
        doc_lists=[vector_hits or [], lexical_hits or []],
        weights=list(weights),
        k=k,
    )
    

In [32]:
# Run a hybrid search for a sample question.
result =retrieval_augmented("Amazon S3 GetObject 에 대해 설명해줘")

In [33]:
# Display the fused results: Documents whose page_content is the JSON-encoded _source.
result

[Document(metadata={'id': 'sagemaker_doc_1017'}, page_content='{"document": "하고 필요한 데이터의 하위 집합만 검색할 수 있습니다. Amazon S3 Select를 사용하여 이 데이터를 필\\n데이터 쿼리 적용\\nAPI 버전 2006-03-01 656", "source": "/home/jovyan/langtest/langraph_work_shop/data/s3-userguide.pdf", "page": 674, "total_pages": 2716}'),
 Document(metadata={'id': 'sagemaker_doc_1282'}, page_content='{"document": "• 고객 제공 키를 사용한 서버 측 암호화(SSE-C)\\n• Amazon S3 콘솔에서 새 버킷을 생성할 때 기존 버킷 설정을 복사하는 옵션입니다.\\n디렉터리 버킷에서 지원되지 않는 Amazon S3 기능\\nAPI 버전 2006-03-01 766", "source": "/home/jovyan/langtest/langraph_work_shop/data/s3-userguide.pdf", "page": 784, "total_pages": 2716}'),
 Document(metadata={'id': 'sagemaker_doc_1366'}, page_content='{"document": "작업에 대한 자세한 내용은 Amazon S3 객체에 대한 대규모 배치 작업 수행을 참조하세요.\\nS3 Express One Zone에서 배치 작업 사용\\nAPI 버전 2006-03-01 796", "source": "/home/jovyan/langtest/langraph_work_shop/data/s3-userguide.pdf", "page": 814, "total_pages": 2716}'),
 Document(metadata={'id': 'sagemaker_doc_668'}, page_content='{"document":