In [1]:
from langchain_aws import ChatBedrockConverse
from langchain.schema import HumanMessage, SystemMessage
import boto3
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from opensearchpy import OpenSearch, RequestsHttpConnection
import os
import boto3
import json
import sys
from langchain.schema import BaseRetriever, Document
from typing import List, Optional, Dict, Tuple
from langchain_core.documents import Document
import fitz
import concurrent.futures
from langchain_aws import BedrockEmbeddings
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain.prompts.prompt import PromptTemplate
from operator import itemgetter
from langchain_core.runnables import RunnableLambda
from pydantic import BaseModel, Field


In [2]:
# Bedrock runtime client, passed to the embeddings wrapper below.
bedrock_client = boto3.client(
    service_name='bedrock-runtime',
    region_name='us-east-1'  # replace with your region
)
# LangChain embeddings wrapper around the Titan text embedding model.
# save_chunk() uses this object to embed each chunk before indexing it.
bedrock_embeddings = BedrockEmbeddings(
    model_id="amazon.titan-embed-text-v2:0",  # You can choose other models like "cohere.embed-english-v3"
    region_name="us-east-1",  # Change to your AWS region
    client=bedrock_client  # Optional: provide your own boto3 client
)


In [3]:
# Convert text into an embedding vector
def get_embedding(text):
    """Embed ``text`` with the Titan text embedding model on Amazon Bedrock.

    A new boto3 session and ``bedrock-runtime`` client (region ``us-east-1``)
    are created on every call.

    Args:
        text: The text to embed; sent as the ``inputText`` field of the request.

    Returns:
        The value of the ``embedding`` field of the model's JSON response.
    """
    runtime = boto3.Session().client(
        service_name='bedrock-runtime',
        region_name='us-east-1',  # Specify a region
    )
    request_body = json.dumps({"inputText": text})

    raw_response = runtime.invoke_model(
        body=request_body,
        modelId="amazon.titan-embed-text-v2:0",
        accept="application/json",
        contentType="application/json",
    )

    # The response body is a stream; read it fully before decoding the JSON.
    parsed = json.loads(raw_response['body'].read())
    return parsed['embedding']

In [4]:
# Create an index in OpenSearch (roughly the equivalent of a SQL table)
def define_index(opensearch_client, index_name):
    """Create a k-NN enabled index with a nori text analyzer.

    The index holds the chunk text (``document``), its embedding
    (``doc_vector``, 1024 dimensions, HNSW / faiss / l2) and the ``source``,
    ``page`` and ``total_pages`` metadata fields.

    Args:
        opensearch_client: Client exposing ``indices.create(index=..., body=...)``.
        index_name: Name of the index to create.

    Returns:
        None. The outcome is printed; any exception raised while creating the
        index is caught and printed rather than propagated.
    """
    # Text analysis: nori tokenizer plus a part-of-speech filter with the stop tags listed below.
    analysis_config = {
        "analyzer": {
            "nori_analyzer": {
                "tokenizer": "nori_tokenizer",
                "filter": ["nori_stop", "lowercase"],
            }
        },
        "filter": {
            "nori_stop": {
                "type": "nori_part_of_speech",
                "stoptags": ["J", "JKS", "JKB", "JKO", "JKG", "JKC", "JKV", "JKQ", "JX", "JC"],
            }
        },
    }

    # Vector field definition for the chunk embeddings.
    vector_field = {
        "type": "knn_vector",
        "dimension": 1024,
        "method": {
            "name": "hnsw",
            "space_type": "l2",
            "engine": "faiss",
            "parameters": {"ef_construction": 128, "m": 16},
        },
    }

    field_mappings = {
        "document": {"type": "text", "analyzer": "nori_analyzer"},
        "doc_vector": vector_field,
        "source": {"type": "keyword"},  # Use keyword for exact matching
        "page": {"type": "integer"},
        "total_pages": {"type": "integer"},
    }

    index_settings = {
        "settings": {
            "index": {
                "knn": True,
                "knn.algo_param.ef_search": 100,
                "number_of_shards": 3,
                "number_of_replicas": 2,
                "analysis": analysis_config,
            }
        },
        "mappings": {"properties": field_mappings},
    }

    try:
        response = opensearch_client.indices.create(index=index_name, body=index_settings)
        print("Index created successfully:", response)
    except Exception as e:
        print("Error creating index:", e)

In [5]:
# Load the PDF chunks into OpenSearch
def save_chunk(opensearh_client,  index_name,chunks):
    """Embed every chunk and index it, one document at a time.

    Args:
        opensearh_client: Client exposing ``index(index=..., body=..., id=...)``.
        index_name: Target index name.
        chunks: Iterable of objects with ``page_content`` and a ``metadata``
            mapping containing ``source``, ``page`` and ``total_pages``.

    Returns:
        None. Progress is printed for every 500th chunk; an exception raised by
        the index call is caught and printed, and the loop continues.
    """
    for position, chunk in enumerate(chunks):
        text = chunk.page_content
        # Embeddings come from the module-level bedrock_embeddings wrapper.
        embedding = bedrock_embeddings.embed_query(chunk.page_content)
        vector = [float(component) for component in embedding]
        meta = chunk.metadata

        # Prepare document for indexing
        record = {
            "document": text,
            "doc_vector": vector,
            "source": meta["source"],
            "page": meta["page"],
            "total_pages": meta["total_pages"],
        }
        try:
            # The id is derived from the chunk's position in `chunks`.
            opensearh_client.index(
                index=index_name,
                body=record,
                id=f"sagemaker_doc_{position}",
            )
            if position % 500 == 0:
                print(f"Document {position} indexed successfully")
        except Exception as e:
            print(f"Error indexing document {position}: {e}")


In [6]:
# Runs in parallel for speed: each call reads the text of one slice of the PDF's pages
def process_page_range(pdf_path, start_page, end_page):
    """Extract the text of a range of pages from a PDF.

    Args:
        pdf_path: Path of the PDF file to open.
        start_page: Zero-based index of the first page to read.
        end_page: Zero-based index one past the last page to read. Values
            beyond the end of the document are clamped to the page count.

    Returns:
        A list with one ``Document`` per page read. ``page_content`` is the
        page text; ``metadata`` holds ``source`` (the path), ``page``
        (1-based page number) and ``total_pages``.
    """
    doc = fitz.open(pdf_path)
    try:
        total_pages = doc.page_count

        batch_docs = []
        # Clamp the upper bound instead of breaking out of the loop mid-way.
        for i in range(start_page, min(end_page, total_pages)):
            text = doc[i].get_text()
            batch_docs.append(
                Document(
                    page_content=text,
                    metadata={
                        "source": pdf_path,
                        "page": i + 1,
                        "total_pages": total_pages,
                    },
                )
            )

        return batch_docs
    finally:
        # Always release the file handle, including when a page fails to parse;
        # the original never closed the document.
        doc.close()
    
# Read the PDF file
def load_pdf_parallel(pdf_path, batch_size=100, max_workers=4):
    """Load a PDF page by page, extracting batches of pages in worker processes.

    Args:
        pdf_path: Path of the PDF file.
        batch_size: Number of pages handed to each worker task.
        max_workers: Size of the process pool.

    Returns:
        The documents produced by ``process_page_range`` for all batches that
        succeeded, sorted by their ``page`` metadata. A batch that raises is
        reported with a printed message and its pages are omitted.
    """
    # Open once only to learn the page count; workers reopen the file themselves.
    pdf = fitz.open(pdf_path)
    total_pages = pdf.page_count
    pdf.close()
    print(f"PDF has {total_pages} pages in total")

    # (start, end) pairs, end exclusive, covering the whole document.
    batches = [
        (first, min(first + batch_size, total_pages))
        for first in range(0, total_pages, batch_size)
    ]

    collected = []

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
        # Remember which page range each future belongs to for the log messages.
        pending = {}
        for start, end in batches:
            task = pool.submit(process_page_range, pdf_path, start, end)
            pending[task] = (start, end)

        for finished in concurrent.futures.as_completed(pending):
            start, end = pending[finished]
            try:
                collected.extend(finished.result())
                print(f"Completed batch: pages {start+1} to {end}")
            except Exception as e:
                print(f"Error processing pages {start+1} to {end}: {e}")

    # Batches complete in arbitrary order, so restore page order at the end.
    return sorted(collected, key=lambda item: item.metadata["page"])

In [33]:
# Create the OpenSearch client and verify the connection.
# Host and credentials are read from the environment (OPENSEARCH_HOST,
# OPENSEARCH_USER, OPENSEARCH_PASSWORD) so real secrets do not have to be
# written into the notebook. When a variable is unset, the original
# placeholder value is used.
opensearch_client = OpenSearch(
    hosts = [{'host': os.environ.get('OPENSEARCH_HOST', 'opensearch_url'), 'port': 443}],
    http_auth = (
        os.environ.get('OPENSEARCH_USER', 'user'),
        os.environ.get('OPENSEARCH_PASSWORD', 'password'),
    ),
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)
try:
    # info() is used purely as a connectivity check; failures are printed, not raised.
    response = opensearch_client.info()
    print("Successfully connected to OpenSearch")
    print(response)
except Exception as e:
    print(f"Connection failed: {e}")

Connection failed: ConnectionError(HTTPSConnectionPool(host='opensearch_url', port=443): Max retries exceeded with url: / (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7f902060cd10>: Failed to resolve 'opensearch_url' ([Errno -2] Name or service not known)"))) caused by: ConnectionError(HTTPSConnectionPool(host='opensearch_url', port=443): Max retries exceeded with url: / (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7f902060cd10>: Failed to resolve 'opensearch_url' ([Errno -2] Name or service not known)")))


In [8]:
# Load the PDF page by page; splitting into chunks happens in a later cell.
# `page` ends up holding the list of per-page Documents returned by load_pdf_parallel().
path="/home/jovyan/langtest/bigdata/data/s3-userguide.pdf"
page=load_pdf_parallel(path,max_workers=os.cpu_count())

PDF has 2716 pages in total
Completed batch: pages 101 to 200
Completed batch: pages 1 to 100
Completed batch: pages 201 to 300
Completed batch: pages 301 to 400
Completed batch: pages 401 to 500
Completed batch: pages 501 to 600
Completed batch: pages 601 to 700
Completed batch: pages 701 to 800
Completed batch: pages 801 to 900
Completed batch: pages 901 to 1000
Completed batch: pages 1001 to 1100
Completed batch: pages 1201 to 1300
Completed batch: pages 1101 to 1200
Completed batch: pages 1301 to 1400
Completed batch: pages 1401 to 1500
Completed batch: pages 1501 to 1600
Completed batch: pages 1601 to 1700
Completed batch: pages 1701 to 1800
Completed batch: pages 1801 to 1900
Completed batch: pages 1901 to 2000
Completed batch: pages 2001 to 2100
Completed batch: pages 2201 to 2300
Completed batch: pages 2101 to 2200
Completed batch: pages 2301 to 2400
Completed batch: pages 2401 to 2500
Completed batch: pages 2601 to 2700
Completed batch: pages 2501 to 2600
Completed batch: page

In [9]:
# Split the per-page documents into overlapping chunks.
# The returned result is a List[Document].
from langchain_text_splitters import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=100)
split_docs = text_splitter.split_documents(page)


In [10]:
# Report how many chunks the splitter produced
print("총 갯수 -> ",len(split_docs))

총 갯수 ->  7468


In [11]:
# Drop the existing 's3_explain' index so the following cell can recreate it.
# A failure (for example, the index does not exist yet) is reported and ignored.
# The original second `except Exception` clause could never run because the
# first one already catches every Exception, so it has been removed.
try:
    delete_response = opensearch_client.indices.delete(index='s3_explain')
    print(f"Index 's3_explain' deleted successfully: {delete_response}")
except Exception as delete_error:
    print(f"Failed to delete index 's3_explain': {delete_error}")

Index 's3_explain' deleted successfully: {'acknowledged': True}


In [12]:
# Create the index
define_index(opensearch_client, "s3_explain")

Index created successfully: {'acknowledged': True, 'shards_acknowledged': True, 'index': 's3_explain'}


In [13]:
# Embed every chunk and index it into 's3_explain'
save_chunk(opensearch_client, "s3_explain" , split_docs)

Document 0 indexed successfully
Document 500 indexed successfully
Document 1000 indexed successfully
Document 1500 indexed successfully
Document 2000 indexed successfully
Document 2500 indexed successfully
Document 3000 indexed successfully
Document 3500 indexed successfully
Document 4000 indexed successfully
Document 4500 indexed successfully
Document 5000 indexed successfully
Document 5500 indexed successfully
Document 6000 indexed successfully
Document 6500 indexed successfully
Document 7000 indexed successfully


In [14]:
# BM25 search (text similarity measured after morphological analysis with nori_analyzer)
def simple_text_search(client, search_text, index_name="s3_explain", k=10):
    """Run a lexical ``match`` query against the ``document`` field.

    Args:
        client: Client exposing ``search(index=..., body=...)``.
        search_text: Query text, analyzed with ``nori_analyzer``.
        index_name: Index to search.
        k: Maximum number of hits to request.

    Returns:
        A list of ``(Document, score)`` tuples whose scores have been passed
        through ``normalize_search_results``; an empty list when there are no
        hits; ``None`` when any exception occurs (the error is printed).
    """
    try:
        match_clause = {
            "document": {
                "query": search_text,
                "analyzer": "nori_analyzer",
            }
        }
        text_query = {
            "size": k,  # Number of results to return
            "_source": {"excludes": ["doc_vector"]},  # Exclude vector field from results
            "query": {"match": match_clause},
        }

        response = client.search(index=index_name, body=text_query)

        # Nothing matched: skip normalization entirely.
        if not response.get("hits", {}).get("hits", []):
            return []

        normalized = normalize_search_results(response)
        documents = []
        for hit in normalized["hits"]["hits"]:
            fields = hit['_source']
            payload = {name: fields[name] for name in fields if name != "table_summary_v"}
            doc_meta = {"id": hit['_id']}
            hit_score = hit['_score']  # Normalized score of this hit
            # The stored fields are serialized to JSON and used as the page content.
            wrapped = Document(page_content=json.dumps(payload, ensure_ascii=False), metadata=doc_meta)
            documents.append((wrapped, hit_score))
        return documents

    except Exception as e:
        print(f"Search error: {str(e)}")
        return None

# BM25 scores range roughly from 0.1 to 15.0, so they must be normalized to 0~1
# (so they can later be compared with the vector-search scores)
def normalize_search_results(search_results):
    """Scale every hit's ``_score`` by the response's maximum score, in place.

    Args:
        search_results: Search response dict containing ``hits.hits`` (a list
            of hits with ``_score``) and, normally, ``hits.max_score``.

    Returns:
        The same ``search_results`` object. After normalization
        ``hits.max_score`` is set to the first hit's normalized score. The
        response is returned unchanged when there are no hits or when the
        maximum score is 0, since there is nothing meaningful to divide by.
    """
    hits = search_results["hits"]["hits"]
    if not hits:
        # Previously hits[0] / float(None) raised here on an empty result set.
        return search_results

    max_score = search_results["hits"].get("max_score")
    if max_score is None:
        # Fall back to the largest score among the hits themselves.
        max_score = max(float(hit["_score"]) for hit in hits)
    max_score = float(max_score)
    if max_score == 0:
        # Avoid ZeroDivisionError; all scores are already 0.
        return search_results

    for hit in hits:
        hit["_score"] = float(hit["_score"]) / max_score
    search_results["hits"]["max_score"] = hits[0]["_score"]
    search_results["hits"]["hits"] = hits
    return search_results

In [15]:
# Vector similarity search
def vector_search(client, embedding_vector, index_name="s3_explain", k=3):
    """Run a k-NN query on the ``doc_vector`` field.

    Args:
        client: Client exposing ``search(index=..., body=...)``.
        embedding_vector: Query embedding to compare against ``doc_vector``.
        index_name: Index to search.
        k: Number of neighbours to request (also used as the result size).

    Returns:
        A list of ``(Document, score)`` tuples, one per hit, where the
        document's page content is the hit's ``_source`` serialized as JSON and
        ``score`` is the hit's ``_score`` as returned by the search (not
        rescaled here). Returns ``None`` when any exception occurs (the error
        is printed).
    """
    try:
        # KNN vector search query
        vector_query = {
            "size": k,
            "_source": {
                "excludes": ["doc_vector"]  # Exclude vector field from results
            },
            "query": {
                "knn": {
                    "doc_vector": {
                        "vector": embedding_vector,
                        "k": k
                    }
                }
            }
        }

        # Execute the search
        response = client.search(
            index=index_name,
            body=vector_query
        )

        documents = []
        for res in response["hits"]["hits"]:
            source = res['_source']
            # Drops a field literally named "vector" if present; the embedding
            # field of this notebook's index ("doc_vector") is already excluded
            # by the query's _source setting above.
            page_content = {key: source[key] for key in source if key != "vector"}
            metadata = {"id": res['_id']}  # Add metadata with a unique identifier
            score = res['_score']  # Get the match score from the search result
            documents.append((Document(page_content=json.dumps(page_content, ensure_ascii=False), metadata=metadata), score))
        # The original had a second, unreachable `return results` after this
        # line that referenced an undefined name; it has been removed.
        return documents

    except Exception as e:
        print(f"Search error: {str(e)}")
        return None

In [16]:
# Merge and rank the hits found through nori (lexical) search and through vector similarity search
def get_ensemble_results(doc_lists: List[List[Tuple["Document", float]]], weights: List[float], k: int = 5) -> List["Document"]:
    """Fuse several scored result lists into one ranking by weighted score sum.

    Args:
        doc_lists: One list of ``(document, score)`` tuples per retriever. An
            entry may be ``None`` or empty (the search helpers in this file
            return ``None`` on error); such entries contribute nothing.
        weights: One weight per entry of ``doc_lists``; each score is
            multiplied by its list's weight before being summed.
        k: Maximum number of documents to return.

    Returns:
        Up to ``k`` documents ordered by descending combined score. Documents
        are identified by ``metadata["id"]``, falling back to ``page_content``
        when no id is present; a document appearing in several lists has its
        weighted scores added together.
    """
    hybrid_score_dic: Dict[str, float] = {}
    doc_map: Dict[str, "Document"] = {}

    # Weight-based adjustment
    for doc_list, weight in zip(doc_lists, weights):
        if not doc_list:
            # A failed or empty search must not break the whole ranking.
            continue
        for doc, score in doc_list:
            doc_id = doc.metadata.get("id", doc.page_content)
            hybrid_score_dic[doc_id] = hybrid_score_dic.get(doc_id, 0.0) + score * weight
            doc_map[doc_id] = doc

    sorted_docs = sorted(hybrid_score_dic.items(), key=lambda x: x[1], reverse=True)
    return [doc_map[doc_id] for doc_id, _ in sorted_docs[:k]]

In [17]:
# Hybrid search: fetch k hits through nori (lexical) search and k hits through
# vector similarity search, then fuse them by weighted score and keep the top k
def retrieval_augmented(query , k =10):
    """Retrieve the top ``k`` documents for ``query`` using hybrid search.

    Args:
        query: Natural-language question to search for.
        k: Number of hits requested from each search and returned in total.

    Returns:
        The documents selected by ``get_ensemble_results`` with weights 0.7
        (vector) and 0.3 (lexical).
    """
    # Embed the query
    raw_embedding = get_embedding(query)
    # Vector search results
    vector = vector_search(opensearch_client, raw_embedding, k=k)
    # Lexical search results
    lexical = simple_text_search(opensearch_client, query, k=k)

    # Both search helpers return None on error; substitute an empty list so a
    # single failed search degrades the result instead of raising TypeError.
    rerank_doc = get_ensemble_results(
        doc_lists=[vector or [], lexical or []],
        weights=[0.7, 0.3],
        k=k,
    )
    return rerank_doc
    

In [18]:
# Run a sample hybrid search and keep the retrieved documents
result =retrieval_augmented("Amazon S3 glacier에 대해 설명해줘")

In [19]:
# Display the retrieved documents
result

[Document(metadata={'id': 'sagemaker_doc_5760'}, page_content='{"document": "Amazon Simple Storage Service\\n사용 설명서\\nGlacier Instant Retrieval, S3 Glacier Flexible Retrieval 또는 Reduced Redundancy Storage를 스토\\n리지 클래스로 지정하여 객체를 덮어씁니다.\\nNote\\n복원된 객체에 대한 복사 작업은 Amazon S3 콘솔에서 S3 Glacier Flexible Retrieval 또는 S3 \\nGlacier Deep Archive 스토리지 클래스에 있는 객체에 대해 지원되지 않습니다. 이러한 유형\\n의 복사 작업에는\xa0AWS Command Line Interface(AWS CLI),\xa0AWS\xa0SDK 또는 REST API를 \\n사용하십시오.\\nS3 Glacier Flexible Retrieval 및 S3 Glacier Deep Archive 스토리지 클래스에 저장된 객체는 \\nAmazon S3를 통해서만 확인하고 사용할 수 있습니다. 별도의 Amazon S3 Glacier 서비스를 통해 사\\n용할 수는 없습니다.\\n이러한 객체는 Amazon S3 객체이므로 Amazon S3 콘솔 또는 Amazon S3 API를 통해서만 액세스할", "source": "/home/jovyan/langtest/bigdata/data/s3-userguide.pdf", "page": 2074, "total_pages": 2716}'),
 Document(metadata={'id': 'sagemaker_doc_5702'}, page_content='{"document": "Amazon Simple Storage Service\\n사용 설명서\\nImportant\\n모든 장기 데이터에 대해 Amazon S3 서비스 내의 S3 Glacier 스토리지 클래스를 사용하는 것\\n이 좋습니다.\\nAmaz

In [20]:
# Bedrock runtime client used by the chat model below.
# NOTE(review): this repeats the identical bedrock_client creation from the
# earlier setup cell and rebinds the same name; it looks redundant.
bedrock_client = boto3.client(
    service_name='bedrock-runtime',
    region_name='us-east-1'  # replace with your region
)

In [21]:
# Chat model shared by every chain in this notebook.
llm = ChatBedrockConverse(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    client=bedrock_client,
    temperature=0.7,
    max_tokens=2000
)

In [22]:
# Create the answer template: the model is told to answer {question} using only
# the text supplied in {document}.
template_lambda = """The following is a friendly conversation between a human and an AI. 
The AI is talkative and provides lots of specific details from its context. 
If the AI does not know the answer to a question, it truthfully says it does not know. 
The AI ONLY uses information contained in the "Relevant Information" section and does not hallucinate.

Relevant Information:
{document}

Conversation:
Human: {question}
AI:"""

# Create the prompt template
prompt_lambda = PromptTemplate(
    input_variables=["document", "question"], 
    template=template_lambda
)

In [23]:
# RAG chain where retrieval runs as a plain lambda when the input arrives.
# The leading dict builds the prompt variables from the input mapping:
# "document" calls retrieval_augmented() on the question, "question" passes the
# question through unchanged.
# NOTE(review): retrieval_augmented() returns a list of Document objects, so
# {document} is presumably filled with the string form of that list - confirm
# this is the intended prompt formatting.
chain_lambda_rag = (
    {
        "document": lambda x: retrieval_augmented(x["question"]),
        "question": itemgetter("question")
    }
    | prompt_lambda
    | llm
    | StrOutputParser()
)


In [24]:
# Ask the sample question through the lambda-based RAG chain
response = chain_lambda_rag.invoke({"question": "Amazon S3 glacier에 대해 설명해줘"})



In [25]:
# Display the generated answer
response

'Amazon S3 Glacier는 장기 데이터 저장 및 데이터 아카이브를 위한 저비용 스토리지 클래스입니다. S3 Glacier에는 세 가지 스토리지 클래스가 있습니다.\n\n1. S3 Glacier Instant Retrieval: 거의 액세스하지 않지만 밀리초 단위로 검색이 필요한 장기 데이터에 사용합니다. 실시간 액세스가 가능합니다.\n\n2. S3 Glacier Flexible Retrieval: 분 단위로 데이터의 일부를 검색해야 하는 아카이브에 사용합니다. 이 스토리지 클래스의 데이터는 아카이빙되며 실시간 액세스가 불가능합니다.\n\n3. S3 Glacier Deep Archive: 거의 액세스할 필요가 없는 데이터를 아카이브할 때 사용합니다. 이 스토리지 클래스의 데이터 또한 아카이빙되며 실시간 액세스가 불가능합니다.\n\nS3 Glacier 스토리지 클래스는 S3 Standard 스토리지 클래스와 동일한 내구성과 복원성을 제공하지만, 스토리지 비용이 더 낮습니다. S3 Glacier에 저장된 객체는 Amazon S3를 통해서만 액세스할 수 있으며, 별도의 Amazon S3 Glacier 서비스에서는 액세스할 수 없습니다.'

In [26]:
# Function executed inside the RunnableLambda
def get_document_runnable(data):
    """Print the incoming question and retrieve the documents for it.

    Args:
        data: The question text handed over by the chain.

    Returns:
        Whatever ``retrieval_augmented(data)`` returns.
    """
    print(data)
    retrieved_docs = retrieval_augmented(data)
    return retrieved_docs


In [27]:
# RAG chain where retrieval runs through a RunnableLambda when the input arrives.
# "document" extracts the question and pipes it into get_document_runnable();
# "question" passes the question through unchanged.
chain_runnable_rag = (
    {
        "document": itemgetter("question") | RunnableLambda(get_document_runnable),
        "question": itemgetter("question")
    }
    | prompt_lambda
    | llm
    | StrOutputParser()
)


In [28]:
# Ask the sample question through the RunnableLambda-based RAG chain
chain_runnable_rag.invoke({"question": "Amazon S3 glacier에 대해 설명해줘"})

Amazon S3 glacier에 대해 설명해줘


'Amazon S3 Glacier는 장기 데이터 보관을 위한 저비용 스토리지 클래스입니다. S3 Glacier에는 세 가지 스토리지 클래스가 있습니다.\n\n1. S3 Glacier Instant Retrieval: 거의 액세스하지 않고 밀리초 단위로 검색해야 하는 장기 데이터에 사용합니다. 이 클래스의 데이터는 실시간으로 액세스할 수 있습니다.\n\n2. S3 Glacier Flexible Retrieval: 분 단위로 데이터의 일부를 검색해야 하는 아카이브에 사용합니다. 이 클래스의 데이터는 아카이빙되어 실시간 액세스가 불가능합니다.\n\n3. S3 Glacier Deep Archive: 거의 액세스할 필요가 없는 데이터를 아카이브할 때 사용합니다. 이 클래스 또한 아카이빙되어 실시간 액세스가 불가능합니다.\n\nS3 Glacier 스토리지 클래스는 S3 Standard와 동일한 내구성과 복원성을 제공하지만, 스토리지 비용이 더 낮습니다. 데이터 액세스 빈도와 필요한 검색 속도에 따라 적절한 클래스를 선택하면 됩니다.'

In [29]:
# nested llm chain
def get_document_runnable_nested(data):
    """Print the question and bundle it with its retrieved documents.

    Args:
        data: The question text handed over by the chain.

    Returns:
        A dict with ``question`` (the input, unchanged) and ``document`` (the
        result of ``retrieval_augmented(data)``), matching the two variables
        of the summarization prompt.
    """
    print(data)
    prompt_inputs = {"question": data}
    prompt_inputs["document"] = retrieval_augmented(data)
    return prompt_inputs


In [30]:
# Prompt for the intermediate step of the nested chain: the model is asked to
# extract and summarize only the parts of {document} relevant to {question},
# without answering the question itself.
template_rerank = """You are an AI document processor that organizes and summarizes information.

TASK:
1. Analyze the user's question to understand what information is relevant
2. Review the provided document and identify sections that relate to the question
3. Extract ONLY the parts of the document that are relevant to the question
4. Organize these relevant parts into a clear, concise summary
5. Remove any redundant or irrelevant information
6. Format the output as a structured summary of key points
7. DO NOT answer the question or provide any additional information not in the document
8. DO NOT include your own analysis or opinions

Document to Process:
{document}

Question for Context (use only to determine relevance):
{question}

OUTPUT INSTRUCTIONS:
- Return ONLY the organized and summarized relevant information
- Do not include any introduction, explanation, or conclusion
- Do not address the question directly
- Format as bullet points or short paragraphs of key information

Organized Relevant Information:"""




# Create the prompt template
prompt_rerank = PromptTemplate(
    input_variables=["document", "question"], 
    template=template_rerank
)

In [31]:
# Two-stage (nested) chain.
# Inner chain: retrieve documents for the question, then have the LLM condense
# them with prompt_rerank into a string.
# Outer chain: feed that string as "document", together with the original
# question, into the answer prompt (prompt_lambda) and call the LLM again.
rerank_chain = (
    {
        "document": (
                    itemgetter("question") 
                    | RunnableLambda(get_document_runnable_nested) 
                    | prompt_rerank
                    | llm
                    | StrOutputParser()
                    ),
        "question": itemgetter("question")
    }
    | prompt_lambda
    | llm
    | StrOutputParser()
)


In [32]:
# Ask the sample question through the nested summarize-then-answer chain
rerank_chain.invoke({"question": "Amazon S3 glacier에 대해 설명해줘"})

Amazon S3 glacier에 대해 설명해줘


'Amazon S3 Glacier는 AWS의 저렴한 클라우드 스토리지 서비스입니다. 장기 데이터 보관 및 아카이브에 최적화되어 있습니다. 주요 특징은 다음과 같습니다:\n\n- S3 Glacier에는 세 가지 스토리지 클래스가 있습니다:\n\n1) S3 Glacier Instant Retrieval - 밀리초 단위로 데이터에 실시간 액세스가 가능합니다.\n\n2) S3 Glacier Flexible Retrieval - 분 단위로 일부 데이터 검색이 가능한 아카이브 데이터용입니다.\n\n3) S3 Glacier Deep Archive - 거의 액세스할 필요가 없는 아카이브 데이터용입니다.\n\n- 이러한 S3 Glacier 클래스들은 S3 Standard와 동일한 내구성과 복원성을 제공하지만, 스토리지 비용이 더 저렴합니다.\n\n- S3 Glacier의 데이터는 Amazon S3를 통해서만 액세스할 수 있습니다.\n\nS3 Glacier는 장기 보관이 필요한 데이터에 적합하며, 데이터 액세스 요구 사항에 따라 적절한 스토리지 클래스를 선택할 수 있습니다.'