# Document Layout Aware Processing and Retrieval Augmented Generation

This notebook was tested on a SageMaker Studio Notebook with the `Data Science 3.0` kernel and an `ml.t3.xlarge` instance.

---
---

## Contents

1. [Objective](#Objective)
1. [Background](#Background-(Problem-Description-and-Approach))
1. [Document Extraction](#Document-Extraction)
1. [Document Processing](#Document-Processing)
1. [Document Chunking](#Document-Chunking)
1. [Indexing](#Indexing)
1. [RAG](#RAG)
1. [CleanUp](#CleanUp)
1. [Conclusion](#Conclusion)

---

## Objective

This example notebook guides you through using the Amazon Textract Layout feature, which extracts content from your document while preserving its layout and reading order. The Layout feature can detect the following elements:
- Titles
- Headers
- Sub-headers
- Text
- Tables
- Figures
- Lists
- Footers
- Page Numbers
- Key-Value pairs

Here is the Textract Layout feature applied to a page of the Amazon Sustainability Report, as shown in the Textract console:
<img src="images/amazonsus2022.jpg" width="1000"/>

The [Amazon Textract Textractor Library](https://aws-samples.github.io/amazon-textract-textractor/index.html) is a library that seamlessly works with Textract features to aid in document processing. You can start by checking out the [examples in the documentation.](https://aws-samples.github.io/amazon-textract-textractor/notebooks/layout_analysis_for_text_linearization.html)
This notebook utilizes the Textractor library to interact with Amazon Textract and interpret its response. It enriches the extracted document text with XML tags to delineate sections, facilitating layout-aware chunking and document indexing into a Vector Database (DB). This process aims to enhance Retrieval Augmented Generation (RAG) performance.

---

## Background (Problem Description and Approach)

- **Problem statement**: 
RAG serves as a technique aimed at enhancing the effectiveness of Large Language Models (LLMs) on lengthy textual content. While widely adopted, implementing RAG necessitates initial processing to extract and segment text into meaningful chunks, especially challenging for intricate assets like PDFs. Many document parsing approaches overlook layout semantics or use simplistic methods like fixed window carving, lacking awareness of document structure or elements. This can disrupt contextual continuity and diminish the performance of RAG systems. An optimal RAG input pipeline would intelligently divide PDF texts into vectorized segments aligned with layout and content semantics, preserving informational integrity for the LLM. In essence, a context-aware parsing phase is pivotal for enabling RAG techniques to realize their full potential, particularly when handling extensive or intricate documents.

- **Our approach**: 

<img src="images/txt layout-Page-2.jpg" width="800"/>

1. Upload multi-page document to Amazon S3.
2. Call the Amazon Textract `StartDocumentAnalysis` API to extract the document text, including layout and tables. The response provides structured text aligned with the original document formatting, along with a pandas table for each table detected in the document.
3. Enrich this extracted text further with XML tags indicating semantic sections, adding contextual metadata through the Textractor library.
4. The Textractor library extracts tables in plain text, maintaining their original layout. However, for easier processing and manipulation, it is advisable to convert them to CSV format. This method replaces the plain-text tables with their CSV counterparts obtained from Textract's Tables feature.
5. In this approach, the extracted text is segmented by document title sections, the highest level of a document's hierarchy. Each subsection within a title section is then chunked according to a maximum word threshold. The following describes how each type of subsection element is chunked:

    - **Tables:** Tables are chunked row by row until the maximum number of alphanumeric words is reached. For each table chunk, the column headers are added to the table along with the table header, typically the sentence or paragraph preceding the table in the document. This ensures that the information of the table is retained in each chunk.
    
    <img src="images/table chunkers.png" width="800" height=700/>
    
        To handle tables with merged cells, this solution first unmerges any merged cell ranges, then duplicates the original merged cell value into each of the corresponding individual cells after unmerging.
    
    <img src="images/complex-tables.png" width="800" height=700/>
    
    - **List:** Chunking lists found in documents can be challenging. Naive chunking methods often split list items by sentence or newline characters. However, only the first list chunk then typically contains the list title, which provides essential information about the list items. Consequently, subsequent list chunks lose their context. In this notebook, lists are chunked by their individual list items, and the header of the list is added to each list chunk so that the information of the list is preserved in every chunk.
    <img src="images/list chunker.png" width="800" height=700/>
    
    - **Section and subsection:** The structure of a document can generally be categorized into titles, sections, and paragraphs. A paragraph is typically the smallest unit of a document that conveys information independently, particularly within the context of a section or subsection header. In this method, text sections are chunked based on paragraphs, and the section header is added to each paragraph chunk (as well as tables and lists) within that section of the document.
    <img src="images/text chunks.png" width="800" height=700/>
    
6. Metadata is appended to each respective chunk during indexing, encompassing:
    - The entire CSV tables detected within the chunk.
    - The section header ID associated with the chunk.
    - The section title ID linked to the chunk.
    
    When retrieving a passage based on hybrid search (combining semantic and text matching), there's flexibility in the amount of content forwarded to the LLM. Some queries may necessitate additional information, allowing users to choose whether to send the corresponding chunk subsection or title section based on the specific use case.

 *Some chunks may exceed the fixed word count threshold because paragraphs are preserved and because of how complex tables are handled.
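
As a rough illustration of the table strategy described above, the following minimal sketch splits a CSV table row by row under a word budget and repeats the caption and column header in every chunk. The function name `chunk_table_rows`, the sample table, and the threshold are hypothetical and chosen for illustration only; the notebook's own chunking code is more involved.

```python
import re

def chunk_table_rows(caption, csv_table, max_words, sep="|"):
    """Split a CSV table into row-wise chunks, repeating caption and column header in each."""
    rows = [r for r in csv_table.split("\n") if r.strip()]
    header, body = rows[0], rows[1:]
    # Words contributed by the caption and column header that every chunk repeats
    base_words = len(re.findall(r"\w+", caption + " " + header))
    chunks, current, words = [], [], base_words
    for row in body:
        row_words = len(re.findall(r"\w+", row))
        # Close the current chunk once adding this row would exceed the budget
        if current and words + row_words > max_words:
            chunks.append("\n".join([caption, header] + current))
            current, words = [], base_words
        current.append(row)
        words += row_words
    if current:
        chunks.append("\n".join([caption, header] + current))
    return chunks

# Hypothetical sample table (pipe-separated, first row is the column header)
table = "Height|Min weight|Max weight\n60|90|150\n61|93|155\n62|96|160"
for chunk in chunk_table_rows("Build chart for adults", table, max_words=16):
    print(chunk, end="\n---\n")
```

Because the caption and column header count toward the budget, a chunk always contains at least one data row, so a very wide row can still push a chunk past the threshold.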

**Prerequisites:**
- [Amazon Bedrock model access](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html)
- [Deploy Embedding and Text Generation Large Language Models with SageMaker JumpStart](https://docs.aws.amazon.com/sagemaker/latest/dg/jumpstart-foundation-models-use.html)
- Amazon OpenSearch Cluster (Provisioned Cluster or Serverless):
    - [Create OpenSearch Service Domain](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html). This solution uses **IAM** as the master user for fine-grained access control. **NOTE:** This solution only works with Amazon OpenSearch Service version 2.11 and higher.
    OR
    - [Create OpenSearch Serverless Collection](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-getting-started.html)

## Step 1: Setup

Install required packages

In [None]:
%pip install --force-reinstall amazon-textract-textractor==1.7.11
%pip install inflect requests-aws4auth anthropic
%pip install -U opensearch-py==2.3.1
%pip install -U boto3==1.33.2
%pip install -U retrying==1.3.4

Restart the Kernel 


In [25]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [26]:
# Standard library
import base64
import io
import json
import os
import pprint
import random
import re
import time
import uuid
import warnings
from collections import OrderedDict

# Third-party
import boto3
import inflect
import openpyxl
import pandas as pd
from anthropic import Anthropic
from botocore.config import Config
from botocore.exceptions import ClientError
from openpyxl.cell import Cell
from openpyxl.worksheet.cell_range import CellRange
from PIL import Image
from retrying import retry
from textractor import Textractor
from textractor.data.constants import TextractFeatures
from textractor.visualizers.entitylist import EntityList

# Local helpers shipped alongside this notebook
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss

warnings.filterwarnings('ignore')

# Random numeric suffix
suffix = random.randrange(200, 900)

s3 = boto3.client("s3")
# Allow long-running model calls and retry transient failures
config = Config(read_timeout=600, retries=dict(max_attempts=5))
client = Anthropic()
bedrock_runtime = boto3.client(service_name='bedrock-runtime', region_name='us-west-2', config=config)

In [28]:
sts_client = boto3.client('sts')
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)
service = 'aoss'
s3_client = boto3.client('s3')
account_id = sts_client.get_caller_identity()["Account"]
s3_suffix = f"{region_name}-{account_id}"
bucket_name = f'textract-2-kb-ws-{s3_suffix}' # replace it with your bucket name.
pp = pprint.PrettyPrinter(indent=2)

In [29]:
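# us-east-1 is the default location and rejects an explicit LocationConstraint
if region_name == "us-east-1":
    s3bucket = s3_client.create_bucket(Bucket=bucket_name)
else:
    s3bucket = s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={ 'LocationConstraint': region_name }
    )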

Utility functions are provided for embedding generation using select models from Amazon SageMaker JumpStart and Amazon Bedrock. These models were chosen arbitrarily; you can swap in any model of your choice available on Bedrock, SageMaker JumpStart, or Hugging Face.\
**Replace the placeholder SageMaker endpoint names below with the endpoint names of your deployed models.**

In [30]:
# `model_dimension_mapping` maps each supported model name to its embedding dimension.
model_dimension_mapping={"titanv2":1024,"titanv1":1536,"bge":1024,"all-mini-lm":384,"e5":1024}

# SageMaker runtime client used to invoke the JumpStart embedding endpoints below.
SAGEMAKER = boto3.client("sagemaker-runtime")

def _get_emb_(passage, model):
    """
    This function takes a passage of text and a model name as input, and returns the corresponding text embedding.
    The function first checks the provided model name and then invokes the appropriate model or API to generate the text embedding.  
    After invoking the appropriate model or API, the function extracts the text embedding from the response and returns it.
    """

    if "titanv1" in model:
        response = bedrock_runtime.invoke_model(body=json.dumps({"inputText":passage}),
                                    modelId="amazon.titan-embed-text-v1", 
                                    accept="application/json", 
                                    contentType="application/json")

        response_body = json.loads(response.get('body').read())
        embedding=response_body['embedding']
    elif "titanv2" in model:
        response = bedrock_runtime.invoke_model(body=json.dumps({"inputText":passage,"dimensions":1024,"normalize":False}),
                                    modelId="amazon.titan-embed-text-v2:0", 
                                    accept="application/json", 
                                    contentType="application/json")

        response_body = json.loads(response.get('body').read())
        embedding=response_body['embedding']
    elif "all-mini-lm" in model:
        payload = {'text_inputs': [passage]}
        payload = json.dumps(payload).encode('utf-8')

        response = SAGEMAKER.invoke_endpoint(EndpointName="SAGEMAKER JUMPSTART ALL MINI LM V6 ENDPOINT", 
                                                    ContentType='application/json',  
                                                    Body=payload)

        model_predictions = json.loads(response['Body'].read())
        embedding = model_predictions['embedding'][0]
    elif "e5" in model:
        payload = {"text_inputs":[passage],"mode":"embedding"} #{'text_inputs': [passage]}
        payload = json.dumps(payload).encode('utf-8')
        response = SAGEMAKER.invoke_endpoint(EndpointName="SAGEMAKER JUMPSTART E5 ENDPOINT", 
                                                    ContentType='application/json',  
                                                    Body=payload)

        model_predictions = json.loads(response['Body'].read())
        embedding = model_predictions['embedding'][0]
    elif "bge" in model:
        payload = {"text_inputs":[passage],"mode":"embedding"} #{'text_inputs': [passage]}
        payload = json.dumps(payload).encode('utf-8')
        response = SAGEMAKER.invoke_endpoint(EndpointName="SAGEMAKER JUMPSTART BGE ENDPOINT", 
                                                    ContentType='application/json',  
                                                    Body=payload)

        model_predictions = json.loads(response['Body'].read())
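        embedding = model_predictions['embedding'][0]
    else:
        # Fail loudly instead of hitting an unbound `embedding` variable
        raise ValueError(f"Unsupported embedding model: {model}")
    return embedding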


Utility functions for running inference with Anthropic Claude models on Amazon Bedrock.

In [31]:
def bedrock_streemer(response):
    """Print a streamed Claude response as it arrives; return the full text and token counts."""
    stream = response.get('body')
    answer = ""
    # Default to 0 so the return below is safe if no invocation-metrics event arrives
    input_tokens, output_tokens = 0, 0
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if chunk:
                chunk_obj = json.loads(chunk.get('bytes').decode())
                # Text deltas carry the incremental model output
                if "delta" in chunk_obj:
                    delta = chunk_obj['delta']
                    if "text" in delta:
                        text = delta['text']
                        print(text, end="")
                        answer += str(text)
                # Invocation metrics, when present, report token usage
                if "amazon-bedrock-invocationMetrics" in chunk_obj:
                    metrics = chunk_obj['amazon-bedrock-invocationMetrics']
                    input_tokens = metrics['inputTokenCount']
                    output_tokens = metrics['outputTokenCount']
                    print(f"\nInput Tokens: {input_tokens}\nOutput Tokens: {output_tokens}")
    return answer, input_tokens, output_tokens

def bedrock_claude_(chat_history,system_message, prompt,model_id,image_path=None):
    content=[]
    if image_path:       
        if not isinstance(image_path, list):
            image_path=[image_path]      
        for img in image_path:
            s3 = boto3.client('s3')
            match = re.match("s3://(.+?)/(.+)", img)
            image_name=os.path.basename(img)
            _,ext=os.path.splitext(image_name)
            if "jpg" in ext: ext=".jpeg"                        
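            if match:
                bucket_name = match.group(1)
                key = match.group(2)    
                obj = s3.get_object(Bucket=bucket_name, Key=key)
                base_64_encoded_data = base64.b64encode(obj['Body'].read())
                base64_string = base_64_encoded_data.decode('utf-8')
            else:
                # Only S3 URIs are handled here; fail early rather than use an undefined image payload
                raise ValueError(f"Expected an s3:// URI for image_path, got: {img}")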
            content.extend([{"type":"text","text":image_name},{
              "type": "image",
              "source": {
                "type": "base64",
                "media_type": f"image/{ext.lower().replace('.','')}",
                "data": base64_string
              }
            }])
    
    content.append({
        "type": "text",
        "text": prompt
            })
    chat_history.append({"role": "user",
            "content": content})
    prompt = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1500,
        "temperature": 0.1,
        "system":system_message,
        "messages": chat_history
    }
    answer = ""
    prompt = json.dumps(prompt)
    response = bedrock_runtime.invoke_model_with_response_stream(body=prompt, modelId=model_id, accept="application/json", contentType="application/json")
    answer,input_tokens,output_tokens=bedrock_streemer(response) 
    return answer, input_tokens, output_tokens

def _invoke_bedrock_with_retries(current_chat, chat_template, question, model_id, image_path):
    max_retries = 5
    backoff_base = 2
    max_backoff = 3  # Maximum backoff time in seconds
    retries = 0

    while True:
        try:
            response,input_tokens,output_tokens = bedrock_claude_(current_chat, chat_template, question, model_id, image_path)
            return response,input_tokens,output_tokens
        except ClientError as e:
            # Retry throttling and stream errors with capped exponential backoff plus jitter
            if e.response['Error']['Code'] in ('ThrottlingException', 'ModelStreamErrorException') and retries < max_retries:
                sleep_time = min(max_backoff, backoff_base ** retries + random.uniform(0, 1))
                time.sleep(sleep_time)
                retries += 1
            else:
                # Retries exhausted or some other API error: re-raise
                raise
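
To see what the retry schedule above amounts to, here is a small standalone sketch that computes the sleep time for each attempt using the same formula (`min(max_backoff, backoff_base ** retries + jitter)`). The helper name `backoff_delays` and the injectable `rng` parameter are illustrative assumptions, not part of the notebook; `rng` stands in for `random.uniform(0, 1)` so the schedule can be inspected without randomness.

```python
import random

def backoff_delays(max_retries=5, backoff_base=2, max_backoff=3, rng=random.random):
    """Return the capped, jittered sleep time (in seconds) for each retry attempt."""
    return [min(max_backoff, backoff_base ** attempt + rng()) for attempt in range(max_retries)]

# With jitter disabled, the uncapped schedule would be 1, 2, 4, 8, 16 seconds
print(backoff_delays(rng=lambda: 0.0))
```

With `max_backoff = 3`, every attempt after the second waits the same three seconds, so the schedule is only briefly exponential. Raising the cap may be worth considering if throttling persists.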
                

## Document Extraction
We use a health insurance underwriting guide (`underwriting_guide.pdf`) as the example document. Using the Textractor library, we call the Amazon Textract `StartDocumentAnalysis` API, which starts an asynchronous job that extracts the document text and detects additional elements such as document layout and tables.\
Set **BUCKET** to the name of your Amazon S3 bucket (it defaults to the bucket created above).


In [32]:
BUCKET=bucket_name
extractor = Textractor(region_name="us-west-2")
file="underwriting_guide.pdf" #Change to file path either in S3 or Local
doc_id= os.path.basename(file)
file_name, ext = os.path.splitext(doc_id)  # use the base name only, so an s3:// URI does not end up inside the output path
if file.startswith("s3://"):
    document = extractor.start_document_analysis(
        file_source=file,
        features=[TextractFeatures.LAYOUT,TextractFeatures.TABLES],
        # client_request_token=doc_id,
        save_image=False,
        s3_output_path=f"s3://{BUCKET}/textract-output/{file_name}/"
  
    )
else:
    document = extractor.start_document_analysis(
        file_source=file,
        features=[TextractFeatures.LAYOUT,TextractFeatures.TABLES],
        # client_request_token=doc_id,
        save_image=False,
        s3_upload_path=f"s3://{BUCKET}",
        s3_output_path=f"s3://{BUCKET}/textract-output/{file_name}/"
    )

By leveraging the Textractor linearization function, we enhance the extracted content with XML tags while concealing certain page sections such as headers, footers, and non-essential images.

We opt to tag tables, lists, title sections, and sub-sections to facilitate the efficient identification and chunking of these document elements.
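
To illustrate why these tags make later processing easier, the sketch below pulls titles, lists, and tables out of a small tagged string with regular expressions. The sample text is hand-written to mimic the tag format configured in the next cell and is not real Textract output. The outer double-bracket markers such as `<<list>>` appear to serve as split points, so this sketch removes them first and then matches the inner tags.

```python
import re

# Hand-written sample mimicking the tagged output; not taken from a real document
sample = (
    "<titles><<title>><title>Build Charts </title><</title>>\n"
    "Intro sentence.\n"
    "<<list>><list>1. First item \n2. Second item </list><</list>>\n"
    "<tables><table>Height\tWeight\n60\t100</table>"
)

# Drop the outer <<...>> markers so only the inner element tags remain
inner = re.sub(r"<<[^>]+>>", "", sample)

titles = [t.strip() for t in re.findall(r"<title>(.*?)</title>", inner)]
lists = [
    [item.strip() for item in body.strip().split("\n")]
    for body in re.findall(r"<list>(.*?)</list>", inner, re.DOTALL)
]
tables = re.findall(r"<table>(.*?)</table>", inner, re.DOTALL)

print(titles)
print(lists)
print(tables)
```

Matching `<title>` directly on the raw string would also hit the `<title>` substring inside `<<title>>`, which is why the outer markers are stripped before the inner tags are searched.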

In [33]:
from textractor.data.text_linearization_config import TextLinearizationConfig

config = TextLinearizationConfig(
    hide_figure_layout=False,
    title_prefix="<titles><<title>><title>",
    title_suffix="</title><</title>>",
    hide_header_layout=True,
    section_header_prefix="<headers><<header>><header>",
    section_header_suffix="</header><</header>>",
    table_prefix="<tables><table>",
    table_suffix="</table>",
    list_layout_prefix="<<list>><list>",
    list_layout_suffix="</list><</list>>",
    hide_footer_layout=True,
    hide_page_num_layout=True,
)

print(document.pages[3].get_text(config=config))

Health

Underwriting
 Guide

8


<titles><<title>><title>Health Insurance Build Charts </title><</title>>

<<list>><list>1. If there has been weight loss of more than 20 pounds within one year, divide the loss in half and add it to current weight before entering into the table. 
2. A reduction in rating due to build will be considered once an insured loses enough to qualify for the lower rating and maintains the reduced weight for at least 6-12 months. 
3. Underweight can be more serious than overweight. Keep in mind that in certain people, because of small physical stature, an underweight condition is normal and perfectly healthy. 
4. Sudden weight loss without voluntary dieting is an ominous sign. 
5. Certain conditions require an additional rating because of the enhanced morbidity risk, e.g., hypertension and overweight build. 
6. The weight is in pounds. </list><</list>>



<tables><table>Height		Height MALE						Height MALE Height		Height MALE Height FEMALE					
Height MALE Height

## Document Processing

This code snippet comprises a Python function `split_list_items_` and a script segment that processes a document containing tables and text, converting tables into CSV format and maintaining the document structure with text and tables.

The function `split_list_items_` takes a string containing lists marked by the `<<list>><list>` and `</list><</list>>` tags. It keeps each tagged list together as a single item, splits the remaining text on newlines, and returns the resulting list of items.

The script segment following the function processes each page of the document. It identifies tables, converts them to CSV format, and wraps them with XML tags for identification. If lists are present in the document, the script utilizes the `split_list_items_` function to handle them. The processed content is stored in dictionaries for further use.

The `layout_table_to_excel` function exports a detected table to Excel format so that columns and rows spanning multiple cells in complex tables can be handled. It duplicates the value of each spanned (merged) cell across the cells it covers, which helps preserve the integrity of complex tables, and returns the result as a pandas DataFrame.

This script segment efficiently manages document content, ensuring tables are properly formatted while preserving the document's structure with text and lists. It serves to handle data extraction and processing tasks involving documents with mixed content types.
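
The merged-cell handling can be pictured with a plain Python grid. This is a simplified sketch of the idea only: the notebook itself works on an openpyxl worksheet, whereas the function `fill_merged_ranges`, the grid, and the `(top, left, bottom, right)` range format here are hypothetical stand-ins.

```python
def fill_merged_ranges(grid, merged_ranges):
    """Copy the top-left value of each merged range into every cell it spans."""
    filled = [row[:] for row in grid]  # work on a copy, leave the input untouched
    for top, left, bottom, right in merged_ranges:
        value = grid[top][left]
        for r in range(top, bottom + 1):
            for c in range(left, right + 1):
                filled[r][c] = value
    return filled

# A header cell "MALE" that spans two columns; the spanned cell is empty (None)
grid = [
    ["Height", "MALE", None],
    ["Height", "Min", "Max"],
    ["60", "100", "150"],
]
# One merged range covering row 0, columns 1..2 (zero-based, inclusive)
print(fill_merged_ranges(grid, [(0, 1, 0, 2)]))
```

After filling, every column carries its full header context, so a row-wise chunk of the table remains interpretable on its own.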

In [34]:
import numpy as np
def strip_newline(cell):
    """
    A utility function to strip leading and trailing whitespace, including newlines, from a cell.
    Empty cells (None) are returned as empty strings.
    Parameters:
    cell: The cell value.
    Returns:
    str: The cell value as a stripped string.
    """
    return "" if cell is None else str(cell).strip()

def layout_table_to_excel(document, ids,csv_seperator):    
    """
    Converts a table detected by Textract into a Pandas DataFrame,
    duplicating merged-cell values across the cells they span.

    Args:
        document: Textractor document containing the table
        ids: Index of the table in `document.tables`
        csv_seperator: Separator used when checking for an empty header row

    Returns: 
        Pandas DataFrame representation of the table
    """
    # save the table in excel format to preserve the structure of any merged cells
    buffer = io.BytesIO()    
    document.tables[ids].to_excel(buffer)
    buffer.seek(0)
    # Load workbook, get active worksheet
    wb = openpyxl.load_workbook(buffer)
    worksheet = wb.active
    # Unmerge cells, duplicate merged values to individual cells
    all_merged_cell_ranges: list[CellRange] = list(
            worksheet.merged_cells.ranges
        )
    for merged_cell_range in all_merged_cell_ranges:
        merged_cell: Cell = merged_cell_range.start_cell
        worksheet.unmerge_cells(range_string=merged_cell_range.coord)
        for row_index, col_index in merged_cell_range.cells:
            cell: Cell = worksheet.cell(row=row_index, column=col_index)
            cell.value = merged_cell.value
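    # Load the worksheet into a DataFrame and strip whitespace from every cell
    # (DataFrame.map requires pandas >= 2.1; on older versions use DataFrame.applymap)
    df = pd.DataFrame(worksheet.values)
    df=df.map(strip_newline)
    # Determine the header row: row 0, unless it is entirely empty and more rows follow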
    df0=df.to_csv(sep=csv_seperator,index=False, header=None)
    row_count=len([x for x in df0.split("\n") if x])
    if row_count>1:
        if not all(value.strip() == '' for value in df0.split("\n")[0].split(csv_seperator)): 
            row_count=1
    # attach table column names
    column_row=0 if row_count==1 else 1
    df.columns = df.iloc[column_row] 
    df = df[column_row+1:]
    return df

def split_list_items_(items):
    """
    Splits the given string into a list of items, handling nested lists.

    Parameters:
    items (str): The input string containing items and possibly nested lists.

    Returns:
    list: A list containing the items extracted from the input string.
    """
    parts = re.split("(<<list>><list>|</list><</list>>)", items)  
    output = []

    inside_list = False
    list_item = ""

    for p in parts:
        if p == "<<list>><list>":
            inside_list = True    
            list_item=p
        elif p == "</list><</list>>":
            inside_list = False
            list_item += p
            output.append(list_item)
            list_item = "" 
        elif inside_list:
            list_item += p.strip()
        else:
            output.extend(p.split('\n'))
    return output

In [35]:
import io
"""
This script processes a document containing tables and text. It converts the tables into CSV format 
and wraps them with XML tags for easy identification. The document structure with text and tables is maintained.
"""
csv_seperator="|" #"\t"
document_holder={}
table_page={}
count=0
# Whether to handle merged cells by duplicating merged value across corresponding individual cells
unmerge_span_cells=True 
# Loop through each page in the document
for ids,page in enumerate(document.pages):
    table_count=len([word for word in page.get_text(config=config).split() if "<tables><table>" in word]) # get the number of table in the extracted document page by header we set earlier
    assert table_count==len(page.tables) # check that number of tables per page is same as *tables extracted by textract TABLE feature
    content=page.get_text(config=config).split("<tables>")
    document_holder[ids]=[]    
    for idx,item in enumerate(content):
        if "<table>" in item:           
            if unmerge_span_cells:
                df=layout_table_to_excel(document, count,csv_seperator)
            else:
                df0=  document.tables[count].to_pandas(use_columns=False).to_csv(header=False, index=None,sep=csv_seperator)
                # Count the parsed rows to decide where the column headers are:
                # with a single row the header is row 0; otherwise it is row 0 unless that row is empty
                row_count=len([x for x in df0.split("\n") if x])
                # Check whether the first row in the CSV holds empty headers
                if row_count>1:
                    if not all(value.strip() == '' for value in df0.split("\n")[0].split(csv_seperator)): 
                        row_count=1
                df=pd.read_csv(io.StringIO(df0), sep=csv_seperator, 
                               header=0 if row_count==1 else 1, keep_default_na=False) # read table with the appropriate column headers
                df.rename(columns=lambda x: '' if str(x).startswith('Unnamed:') else x, inplace=True) 
            table=df.to_csv(index=None, sep=csv_seperator)

            if ids in table_page:
                table_page[ids].append(table)
            else:
                table_page[ids]=[table]
            # Extract table data and remaining content
            pattern = re.compile(r'<table>(.*?)(</table>)', re.DOTALL) 
            data=item
            table_match = re.search(pattern, data)
            table_data = table_match.group(1) if table_match else '' 
            remaining_content = data[table_match.end():] if table_match else data            
            content[idx]=f"<<table>><table>{table}</table><</table>>" ## attach xml tags to differentiate table from other text
            count+=1
            # Check for list items in remaining content
            if "<<list>>" in remaining_content:
                output=split_list_items_(remaining_content)
                output=[x.strip() for x in output if x.strip()]
                document_holder[ids].extend([content[idx]]+output)           
            else:
                document_holder[ids].extend([content[idx]]+[x.strip() for x in remaining_content.split('\n') if x.strip()]) # split other text by new line to be independent items in the python list.
        else:   
            # Check for list items and tables in remaining content
            if "<<list>>" in item and "<table>" not in item:   
                output=split_list_items_(item)
                output=[x.strip() for x in output if x.strip()]
                document_holder[ids].extend(output)
            else:
                document_holder[ids].extend([x.strip() for x in item.split("\n") if x.strip()])

Here we first flatten the per-page lists in `document_holder` into a single list and join its elements with newline characters. The resulting string is then split on the `<titles>` tag (the title-section hierarchy), producing one segment per title section. The function `sub_header_content_splitta` then splits a string on the `<<...>>` marker tags. Segments that contain a `<header>`, `<list>`, or `<table>` tag are kept whole, while all other segments are split into individual lines. The function returns the resulting list of segments.
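
The two-level split can be tried on a small hand-written string. In this sketch the sample text and the helper name `split_segments` are hypothetical; the helper follows the same idea as `sub_header_content_splitta` (tagged elements stay whole, plain text becomes one entry per line) but is not the notebook's function.

```python
import re

# Hypothetical tagged text in the same format the earlier cells produce
text = (
    "<titles><<title>><title>Title A</title><</title>>\n"
    "<headers><<header>><header>Section 1</header><</header>>\n"
    "First paragraph.\nSecond paragraph.\n"
    "<<list>><list>1. a\n2. b</list><</list>>\n"
    "<titles><<title>><title>Title B</title><</title>>\n"
    "Closing text."
)

def split_segments(section):
    """Keep tagged elements whole; break untagged text into one entry per line."""
    out = []
    for seg in re.split(r"<<[^>]+>>", section):
        if not seg.strip():
            continue
        if re.search(r"<(title|header|list|table)>", seg):
            out.append(seg)
        else:
            out.extend(x.strip() for x in seg.split("\n") if x.strip())
    return out

# Level 1: title sections; level 2: sub-sections introduced by "<headers>"
title_sections = [s for s in text.split("<titles>") if s.strip()]
parsed = [[split_segments(sub) for sub in sec.split("<headers>")] for sec in title_sections]
for section in parsed:
    print(section)
```

Keeping lists and tables as single entries at this stage lets the chunking step decide how to split them, rather than losing their boundaries to a newline split.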

In [36]:
# Flatten the nested list document_holder into a single list and join the flattened list by "\n"
flattened_list = [item for sublist in document_holder.values() for item in sublist]
result = "\n".join( flattened_list)
header_split=result.split("<titles>")

def sub_header_content_splitta(string):   
    """
    Splits the input string on the "<<...>>" marker tags and returns a list of segments.
    Segments containing "<header>", "<list>", or "<table>" are kept whole;
    all other segments are split into individual non-empty lines.

    Parameters:
    string (str): The input string to be processed.

    Returns:
    list: A list containing the segments of text extracted from the input string.
    """ 
    pattern = re.compile(r'<<[^>]+>>')
    segments = re.split(pattern, string)
    result = []
    for segment in segments:
        if segment.strip():
            if "<header>" not in segment and "<list>" not in segment and  "<table>" not in segment:
                segment=[x.strip() for x in segment.split('\n') if x.strip()]
                result.extend(segment)
            else:
                result.append(segment)
    return result


## Document Chunking
This cell iterates through the document by title section and chunks the content within each sub-section as follows:
- It uses the number of words as the chunking threshold.
- It looks for the XML tags to identify the different document content types.
    - It iterates through the sub-sections within a title section, identified by the **sub-section header**, and only chunks content within each sub-section. No chunk includes content from multiple sub-sections, even if the maximum word threshold has not been met.
    - If a table XML tag is found, it checks whether there is a sentence before the table (the heuristic employed here is that the sentence before a table is usually the table header) and uses it as the table header. It then splits the table by rows until the desired chunk size is reached and appends the corresponding section header to the table chunk.
    - If a list is found, it splits the list by items until the desired chunk size is reached. It employs the same heuristic as above and appends the list header to every list chunk.
    - For other text, it chunks by paragraph and appends the sub-section header to the corresponding chunks.
- A dictionary containing each complete sub-section is also stored to be used as metadata during indexing.
- The complete table found in each chunk is also stored for metadata purposes.
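
The list and paragraph rules above share one pattern: group items under a word budget and repeat the relevant header in every chunk. The following is a minimal sketch of that pattern under simplifying assumptions; the function `chunk_with_header`, the sample items, and the threshold are hypothetical, and the cell below handles many more cases.

```python
import re

def count_words(text):
    """Count alphanumeric words, the unit used for the chunking threshold."""
    return len(re.findall(r"\w+", text))

def chunk_with_header(header, items, max_words):
    """Group items into chunks under a word budget, prefixing each chunk with the header."""
    chunks, current, words = [], [], count_words(header)
    for item in items:
        # Start a new chunk once the next item would exceed the budget
        if current and words + count_words(item) > max_words:
            chunks.append([header] + current)
            current, words = [], count_words(header)
        current.append(item)
        words += count_words(item)
    if current:
        chunks.append([header] + current)
    return chunks

list_items = ["1. Weight is in pounds.", "2. Height is in inches.", "3. Ages 18 and over only."]
for chunk in chunk_with_header("Notes on the build chart:", list_items, max_words=16):
    print(chunk)
```

An item is never split, so a single long paragraph or list item forms its own chunk even if it exceeds `max_words`, which is consistent with the note that some chunks may go over the threshold.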

In [None]:
import re
import pandas as pd
from io import StringIO

max_words = 200
chunks = {}
table_header_dict={} 
chunk_header_mapping={}
list_header_dict={}

# iterate through each title section
for title_ids, items in enumerate(header_split):
    title_chunks = []
    current_chunk = []
    num_words = 0   
    table_header_dict[title_ids]={}
    chunk_header_mapping[title_ids]={}
    list_header_dict[title_ids]={}
    chunk_counter=0
    for item_ids,item in enumerate(items.split('<headers>')): #headers
        # print("".join(current_chunk).strip())
        lines=sub_header_content_splitta(item)             
        SECTION_HEADER=None 
        TITLES=None
        num_words = 0  
        for ids_line,line in enumerate(lines): #header lines  
            
            if line.strip():
                if "<title>" in line:   
                    TITLES=re.findall(r'<title>(.*?)</title>', line)[0].strip()
                    line=TITLES 
                    if re.sub(r'<[^>]+>', '', "".join(lines)).strip()==TITLES:
                        chunk_header_mapping[title_ids][chunk_counter]=lines
                        chunk_counter+=1
                if "<header>" in line:   
                    SECTION_HEADER=re.findall(r'<header>(.*?)</header>', line)[0].strip()
                    line=SECTION_HEADER    
                    first_header_portion=True
                next_num_words = num_words + len(re.findall(r'\w+', line))  

                if  "<table>" not in line and "<list>" not in line:
                    if next_num_words > max_words and "".join(current_chunk).strip()!=SECTION_HEADER and current_chunk and "".join(current_chunk).strip()!=TITLES:
                
                        if SECTION_HEADER :
                            if first_header_portion:
                                first_header_portion=False                                            
                            else:
                                current_chunk.insert(0, SECTION_HEADER.strip())                       
                        
                        title_chunks.append(current_chunk)                  
                        chunk_header_mapping[title_ids][chunk_counter]=lines
               
                        current_chunk = []
                        num_words = 0 
                        chunk_counter+=1
             
                    current_chunk.append(line)    
                    num_words += len(re.findall(r'\w+', line))

                """
                Goal is to segment out table items and chunk them intelligently.
                We chunk the table by rows, and to each chunk of the table we attach the table column headers
                and the table header, if any. This way we preserve the table information across all chunks.
                This helps semantic search: all the chunks relating to a table should appear in the
                top k=n responses, giving the LLM complete information on the table.
                """

                if "<table>" in line:
                    # Get table header which is usually line before table in document              
                    line_index=lines.index(line)
                    if line_index!=0 and "<table>" not in lines[line_index-1] and "<list>" not in lines[line_index-1]: # If the table is the first item on the page, or the previous item is a table or list, there is no separate header line (the header may be inside the table)
                        header=lines[line_index-1].replace("<header>","").replace("</header>","")
                    else:
                        header=""                   
              
                    table = line.split("<table>")[-1].split("</table>")[0] # get table from demarcators              
                    df=pd.read_csv(StringIO(table), sep=csv_seperator, keep_default_na=False,header=None) # StringIO is imported at the top of this cell
                    df.columns = df.iloc[0]
                    df = df[1:]
                    df.rename(columns=lambda x: '' if str(x).startswith('Unnamed:') else x, inplace=True)                    
                    table_chunks = []
                    curr_chunk = [df.columns.to_list()] #start current chunk with table column names    
                    words=len(re.findall(r'\w+', str(current_chunk)+" "+str(curr_chunk)))  
                    # Iterate through the rows in the table
                    for row in df.itertuples(index=False):
                        curr_chunk.append(row)         
                        words+=len(re.findall(r'\w+', str(row)))
                        if words > max_words:                        
                            if [x for x in table_header_dict[title_ids] if chunk_counter == x]:
                                table_header_dict[title_ids][chunk_counter].extend([header]+[table])
                            else:
                                table_header_dict[title_ids][chunk_counter]=[header]+[table]                            
                            tab_chunk="\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]]) # join the header and row lines into one CSV string
                            table_chunks.append(tab_chunk)
                            words = len(re.findall(r'\w+', str(curr_chunk[0]))) # set word count to word length of column header names
                            if header: #If header  attach header to table                         
                                if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): #check if header is in the chunk and remove to avoid duplicacy of header in chunk                        
                                    current_chunk.pop(-1)
                                # Append section header to table
                                if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())                             
                                current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[tab_chunk]) #enrich table header with ':'
                                title_chunks.append(current_chunk)                           
                        
                            else:
                                if SECTION_HEADER:
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())                                
                                current_chunk.extend([tab_chunk])
                                title_chunks.append(current_chunk)                        
                            chunk_header_mapping[title_ids][chunk_counter]=lines
                            chunk_counter+=1
                            num_words=0
                            current_chunk=[]
                            curr_chunk = [curr_chunk[0]]
                    
                    if curr_chunk != [df.columns.to_list()] and lines.index(line) == len(lines)-1: #if table chunk still remaining and table is last item in page append as last chunk
                        table_chunks.append("\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]]))
                        tab_chunk="\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]])                        
                        if [x for x in table_header_dict[title_ids] if chunk_counter == x]:
                            table_header_dict[title_ids][chunk_counter].extend([header]+[table])
                        else:
                            table_header_dict[title_ids][chunk_counter]=[header]+[table]   
                        
                        if header: 
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower():#check if header is in the chunk and remove to avoid duplicacy of header in chunk
                                current_chunk.pop(-1) 
                            if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                          
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[tab_chunk])
                            title_chunks.append(current_chunk)                   
                        else:
                            if SECTION_HEADER:
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                            
                            current_chunk.extend([tab_chunk])
                            title_chunks.append(current_chunk)             
                        chunk_header_mapping[title_ids][chunk_counter]=lines
                        chunk_counter+=1
                        num_words=0
                        current_chunk=[]
                    elif curr_chunk != [df.columns.to_list()] and lines.index(line) != len(lines)-1: #if table is not last item in page and max word threshold is not reached, carry it into the next loop iteration
                        table_chunks.append("\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]]))
                        tab_chunk="\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]])
                        
                        if [x for x in table_header_dict[title_ids] if chunk_counter == x]:
                            table_header_dict[title_ids][chunk_counter].extend([header]+[table])
                        else:
                            table_header_dict[title_ids][chunk_counter]=[header]+[table]                         
                        if header:               
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower():#check if header is in the chunk and remove to avoid duplicacy of header in chunk
                                current_chunk.pop(-1) 
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[tab_chunk])
                        else:
                            current_chunk.extend([tab_chunk])                  
                        num_words=words
                     

                """
                Goal is to segment out list items and chunk them intelligently.
                We chunk each list by its items, and to each list chunk we attach the list header
                to preserve the information of the list across chunks.
                This helps retrieval: a question pertaining to a list should have all list chunks within
                the topK=n responses.
                """

                if "<list>" in line:
                    # Get list header which is usually line before list in document
                    line_index=lines.index(line)
                    if line_index!=0 and "<table>" not in lines[line_index-1] and "<list>" not in lines[line_index-1]: #Check if table or list is the previous item on the page, then they wont be a header
                        header=lines[line_index-1].replace("<header>","").replace("</header>","")
                    else:
                        header=""           
                    list_pattern = re.compile(r'<list>(.*?)(?:</list>|$)', re.DOTALL)   ## Grab all list contents within the list xml tags        
                    list_match = re.search(list_pattern, line)
                    list_ = list_match.group(1)
                    list_lines=list_.split("\n")                

                    curr_chunk = []  
                    words=len(re.findall(r'\w+', str(current_chunk)))  #start word count from any existing chunk
                    # Iterate through the items in the list
                    for lyst_item in list_lines:
                        curr_chunk.append(lyst_item)         
                        words+=len(re.findall(r'\w+', lyst_item)) 
                        if words >= max_words: # 
                            if [x for x in list_header_dict[title_ids] if chunk_counter == x]:
                                list_header_dict[title_ids][chunk_counter].extend([header]+[list_])
                            else:
                                list_header_dict[title_ids][chunk_counter]=[header]+[list_]  
                            words=0     
                            list_chunk="\n".join(curr_chunk)
                            if header: # attach list header                       
                                if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower():#check if header is in the chunk and remove to avoid duplicacy of header in chunk                        
                                    current_chunk.pop(-1)  
                                # Append section content header to list
                                if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())
                                    
                                current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[list_chunk]) 
                                title_chunks.append(current_chunk)                          
                         
                            else:
                                if SECTION_HEADER:
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())
                                    
                                current_chunk.extend([list_chunk])
                                title_chunks.append(current_chunk)                            
                            chunk_header_mapping[title_ids][chunk_counter]=lines
                            chunk_counter+=1
                            num_words=0
                            current_chunk=[]
                            curr_chunk = []
                    if curr_chunk  and lines.index(line) == len(lines)-1: #if list chunk still remaining and list is last item in page append as last chunk
                        list_chunk="\n".join(curr_chunk)
                        if [x for x in list_header_dict[title_ids] if chunk_counter == x]:
                            list_header_dict[title_ids][chunk_counter].extend([header]+[list_])
                        else:
                            list_header_dict[title_ids][chunk_counter]=[header]+[list_]  
                        if header: 
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): #check if header is in the chunk and remove to avoid duplicacy of header in chunk
                                current_chunk.pop(-1)                            
                            if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                   
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[list_chunk])
                            title_chunks.append(current_chunk)                        
                        else:
                            if SECTION_HEADER:
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                   
                            current_chunk.extend([list_chunk])
                            title_chunks.append(current_chunk)                     
                        chunk_header_mapping[title_ids][chunk_counter]=lines
                        chunk_counter+=1
                        num_words=0
                        current_chunk=[]
                    elif curr_chunk and lines.index(line) != len(lines)-1: #if list is not last item in page and max word threshold is not reached, send to next loop          
                        list_chunk="\n".join(curr_chunk)
                        if [x for x in list_header_dict[title_ids] if chunk_counter == x]:
                            list_header_dict[title_ids][chunk_counter].extend([header]+[list_])
                        else:
                            list_header_dict[title_ids][chunk_counter]=[header]+[list_]  
                        if header:               
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower():#check if header is in the chunk and remove to avoid duplicacy of header in chunk
                                current_chunk.pop(-1) 
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[list_chunk])
                        else:
                            current_chunk.extend([list_chunk])                  
                        num_words=words


        if current_chunk and "".join(current_chunk).strip()!=SECTION_HEADER and "".join(current_chunk).strip()!=TITLES:
    
            if SECTION_HEADER:
                if first_header_portion:
                    first_header_portion=False
                else:
                    current_chunk.insert(0, SECTION_HEADER.strip())         
            title_chunks.append(current_chunk)
            chunk_header_mapping[title_ids][chunk_counter]=lines
            current_chunk=[]
            chunk_counter+=1
    if current_chunk:
  
        title_chunks.append(current_chunk) 
        chunk_header_mapping[title_ids][chunk_counter]=lines
    chunks[title_ids] = title_chunks
       
# Upload every chunk to S3 once, after all title sections have been processed
for title_id, title_chunks in chunks.items():
    for chunk_id, chunk in enumerate(title_chunks):
        # Create a key for the chunk file
        key = f'chunk/title_{title_id}/chunk_{chunk_id}.txt'

        # Convert the chunk list to a string
        chunk_str = '\n'.join(chunk)

        # Upload the chunk to S3
        s3.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=chunk_str.encode('utf-8')
        )
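
The table branch of the cell above is easier to follow in isolation. The sketch below is a pandas-free simplification, not the notebook's code: `chunk_table_rows` is a hypothetical helper, it assumes the first line of the table is the column header, and it treats every other line as one row. As in the cell above, a row is added first and the threshold is checked afterwards, so a chunk can end slightly over `max_words`.

```python
import re

def chunk_table_rows(table_text, max_words=200):
    """Split a delimited table into row groups, repeating the column header in each group."""
    lines = [line for line in table_text.splitlines() if line.strip()]
    header, rows = lines[0], lines[1:]
    header_words = len(re.findall(r'\w+', header))
    chunks = []
    current = [header]
    words = header_words
    for row in rows:
        current.append(row)
        words += len(re.findall(r'\w+', row))
        if words > max_words:
            # Threshold crossed: emit this group and start a new one with the header.
            chunks.append("\n".join(current))
            current = [header]
            words = header_words
    # Emit any remaining rows.
    if len(current) > 1:
        chunks.append("\n".join(current))
    return chunks

# Tiny synthetic table: a header plus six rows, two "words" per line.
demo_table = "Height|Weight\n" + "\n".join(f"{60 + i}|{120 + i}" for i in range(6))
demo_table_chunks = chunk_table_rows(demo_table, max_words=6)
for part in demo_table_chunks:
    print(part, end="\n\n")
```

Repeating the column header costs a few words per chunk, but it means each chunk can be interpreted without the rest of the table.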
    

In [38]:
# Print chunks per title section
for i, chunk in enumerate(chunks[2][:10], start=0):
    print(f'Chunk {i}:')
    for item in chunk:
        print(item)
    print('\n')

Chunk 0:
Health Insurance Build Charts
<title>Health Insurance Build Charts </title>:
1. If there has been weight loss of more than 20 pounds within one year, divide the loss in half and add it to current weight before entering into the table. 
2. A reduction in rating due to build will be considered once an insured loses enough to qualify for the lower rating and maintains the reduced weight for at least 6-12 months. 
3. Underweight can be more serious than overweight. Keep in mind that in certain people, because of small physical stature, an underweight condition is normal and perfectly healthy. 
4. Sudden weight loss without voluntary dieting is an ominous sign. 
5. Certain conditions require an additional rating because of the enhanced morbidity risk, e.g., hypertension and overweight build. 
6. The weight is in pounds.
Height|Height|MALE|MALE|MALE|MALE|MALE|MALE|Height|Height|FEMALE|FEMALE|FEMALE|FEMALE|FEMALE|FEMALE
F 
T E E|I 
H N C|20% for 
Weights than less|Weight Avg.|Premium

In [39]:
# List of title header sections document was split into
for x in chunk_header_mapping:
    if chunk_header_mapping[x]:
        try:
            title_pattern = re.compile(r'<title>(.*?)(?:</title>|$)', re.DOTALL)       
            title_match = re.search(title_pattern, chunk_header_mapping[x][0][0])
            title_ = title_match.group(1) if title_match else ""
            print(title_, end='\n')
        except Exception:
            continue


Preferred Underwriting Guidelines 
Health Insurance Build Charts 
Juvenile Build Charts 
Occupations Not Eligible for Health Insurance 
Non-Medical Guidelines 
Declinations 
Declinable Medications 
Common Medications/Therapeutic Use Reference 
Medical Underwriting Guidelines 
Health Underwriting Guide 18 
19 
20 
21 
22 
23 
24 
Health Underwriting Guide 26 
27 
28 
29 
30 
31 
Health Underwriting Guide 32 


Upload the section contents (title and headers for each chunk) to S3.

In [40]:
with open (f"{doc_id}.json", "w") as f:
    json.dump(chunk_header_mapping,f)
s3.upload_file(f"{doc_id}.json", BUCKET, f"{doc_id}.json")
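
One detail to keep in mind if this file is read back later: JSON object keys are always strings, so the integer title and chunk IDs used as keys in `chunk_header_mapping` come back as `"0"`, `"1"`, and so on. The sketch below shows the round trip on a small made-up mapping (not the real document); `restore_int_keys` is a hypothetical helper.

```python
import json

# Made-up miniature with the same shape as chunk_header_mapping:
# {title_id: {chunk_id: [lines of the sub-section]}}
sample_mapping = {
    0: {0: ["<title>Guide</title>", "Intro text"]},
    1: {0: ["<header>Scope</header>", "Body text"]},
}

# Serialize and parse again, as happens when the uploaded file is downloaded later.
round_tripped = json.loads(json.dumps(sample_mapping))
print(list(round_tripped.keys()))

def restore_int_keys(mapping):
    """Convert the string keys produced by JSON back to integers (hypothetical helper)."""
    return {
        int(title_id): {int(chunk_id): lines for chunk_id, lines in chunk_map.items()}
        for title_id, chunk_map in mapping.items()
    }

restored_mapping = restore_int_keys(round_tripped)
```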

## Indexing

In [41]:
import boto3
import time
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"
aoss_client = boto3_session.client('opensearchserverless')
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

In [42]:
# create security, network and data access policies within OSS
encryption_policy, network_policy, access_policy = create_policies_in_oss(vector_store_name=vector_store_name,
                       aoss_client=aoss_client,
                       bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn)
collection = aoss_client.create_collection(name=vector_store_name,type='VECTORSEARCH')

In [43]:
pp.pprint(collection)

{ 'ResponseMetadata': { 'HTTPHeaders': { 'connection': 'keep-alive',
                                         'content-length': '314',
                                         'content-type': 'application/x-amz-json-1.0',
                                         'date': 'Thu, 23 May 2024 14:32:07 '
                                                 'GMT',
                                         'x-amzn-requestid': '6efa0aed-898c-4e53-96f0-c12dcef57398'},
                        'HTTPStatusCode': 200,
                        'RequestId': '6efa0aed-898c-4e53-96f0-c12dcef57398',
                        'RetryAttempts': 0},
  'createCollectionDetail': { 'arn': 'arn:aws:aoss:us-west-2:533356244334:collection/m2lfe3wceui8dktho4b0',
                              'createdDate': 1716474727117,
                              'id': 'm2lfe3wceui8dktho4b0',
                              'kmsKeyArn': 'auto',
                              'lastModifiedDate': 1716474727117,
                             

In [44]:
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)

m2lfe3wceui8dktho4b0.us-west-2.aoss.amazonaws.com


In [45]:
# wait for collection creation
response = aoss_client.batch_get_collection(names=[vector_store_name])
# Periodically check collection status
while (response['collectionDetails'][0]['status']) == 'CREATING':
    print('Creating collection...')
    time.sleep(30)
    response = aoss_client.batch_get_collection(names=[vector_store_name])
print('\nCollection successfully created:')
print(response["collectionDetails"])

Creating collection...

Collection successfully created:
[{'arn': 'arn:aws:aoss:us-west-2:533356244334:collection/m2lfe3wceui8dktho4b0', 'collectionEndpoint': 'https://m2lfe3wceui8dktho4b0.us-west-2.aoss.amazonaws.com', 'createdDate': 1716474727117, 'dashboardEndpoint': 'https://m2lfe3wceui8dktho4b0.us-west-2.aoss.amazonaws.com/_dashboards', 'id': 'm2lfe3wceui8dktho4b0', 'kmsKeyArn': 'auto', 'lastModifiedDate': 1716474753816, 'name': 'bedrock-sample-rag-707', 'standbyReplicas': 'ENABLED', 'status': 'ACTIVE', 'type': 'VECTORSEARCH'}]
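
The polling loop above waits for as long as the collection stays in `CREATING`. A bounded variant might look like the sketch below. `wait_for_status` is a hypothetical helper, and the status source is a stand-in iterator rather than a real `batch_get_collection` call, so the example runs without AWS access.

```python
import time

def wait_for_status(get_status, pending=("CREATING",), timeout_s=600,
                    interval_s=30, sleep=time.sleep):
    """Poll get_status() until it leaves `pending`; raise TimeoutError after timeout_s seconds."""
    waited = 0
    status = get_status()
    while status in pending:
        if waited >= timeout_s:
            raise TimeoutError(f"still {status!r} after {waited}s")
        sleep(interval_s)
        waited += interval_s
        status = get_status()
    return status

# Stand-in for: aoss_client.batch_get_collection(names=[...])['collectionDetails'][0]['status']
fake_statuses = iter(["CREATING", "CREATING", "ACTIVE"])
final_status = wait_for_status(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

The caller still has to inspect the returned status, since a collection can also leave `CREATING` by failing.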


In [46]:
# create oss policy and attach it to Bedrock execution role
create_oss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                bedrock_kb_execution_role=bedrock_kb_execution_role)

Opensearch serverless arn:  arn:aws:iam::533356244334:policy/AmazonBedrockOSSPolicyForKnowledgeBase_871


In [47]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
credentials = boto3.Session().get_credentials()
awsauth = auth = AWSV4SignerAuth(credentials, region_name, service)

index_name = f"bedrock-sample-index-{suffix}"
body_json = {
   "settings": {
      "index.knn": "true",
       "number_of_shards": 1,
       "knn.algo_param.ef_search": 512,
       "number_of_replicas": 0,
   },
   "mappings": {
      "properties": {
         "vector": {
            "type": "knn_vector",
            "dimension": 1536,
             "method": {
                 "name": "hnsw",
                 "engine": "faiss"
             },
         },
         "text": {
            "type": "text"
         },
         "text-metadata": {
            "type": "text"
         }
      }
   }
}
# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# It can take up to a minute for data access rules to be enforced
time.sleep(60)
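
The `dimension` in the mapping above has to equal the length of the vectors produced by the embedding model. The value 1536 matches the output size of `amazon.titan-embed-text-v1`, which is used later in this notebook; a different embedding model would need a different value. A minimal sanity check, assuming a hypothetical helper `check_vector_dimension` and a trimmed-down copy of the index body:

```python
def check_vector_dimension(index_body, vector, field="vector"):
    """Raise ValueError if `vector` does not match the knn_vector dimension in the mapping."""
    expected = index_body["mappings"]["properties"][field]["dimension"]
    if len(vector) != expected:
        raise ValueError(f"expected {expected} dimensions, got {len(vector)}")
    return expected

# Trimmed-down copy of the index body, kept only for illustration.
sample_body = {
    "mappings": {
        "properties": {
            "vector": {"type": "knn_vector", "dimension": 1536},
        }
    }
}

# A zero vector stands in for a real embedding.
print(check_vector_dimension(sample_body, [0.0] * 1536))
```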

In [48]:
# Create index
response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
print('\nCreating index:')
print(response)
time.sleep(60) # index creation can take up to a minute


Creating index:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'bedrock-sample-index-707'}


In [49]:
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

chunkingStrategyConfiguration = {
    "chunkingStrategy": "NONE" #Set to no chunking since we are manually chunking based on the document
}

s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    "inclusionPrefixes":["chunk/"] # you can use this if you want to create a KB using data within s3 prefixes.
}

embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Insurance Underwriting KB."
roleArn = bedrock_kb_execution_role_arn


In [50]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]

In [51]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")

In [52]:
pp.pprint(kb)

{ 'createdAt': datetime.datetime(2024, 5, 23, 14, 34, 38, 853420, tzinfo=tzlocal()),
  'description': 'Insurance Underwriting KB.',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-west-2:533356244334:knowledge-base/A7WJNFHDI8',
  'knowledgeBaseConfiguration': { 'type': 'VECTOR',
                                  'vectorKnowledgeBaseConfiguration': { 'embeddingModelArn': 'arn:aws:bedrock:us-west-2::foundation-model/amazon.titan-embed-text-v1'}},
  'knowledgeBaseId': 'A7WJNFHDI8',
  'name': 'bedrock-sample-knowledge-base-707',
  'roleArn': 'arn:aws:iam::533356244334:role/AmazonBedrockExecutionRoleForKnowledgeBase_871',
  'status': 'CREATING',
  'storageConfiguration': { 'opensearchServerlessConfiguration': { 'collectionArn': 'arn:aws:aoss:us-west-2:533356244334:collection/m2lfe3wceui8dktho4b0',
                                                                   'fieldMapping': { 'metadataField': 'text-metadata',
                                                                                   

In [53]:
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

In [54]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)

{ 'createdAt': datetime.datetime(2024, 5, 23, 14, 34, 39, 450973, tzinfo=tzlocal()),
  'dataSourceConfiguration': { 's3Configuration': { 'bucketArn': 'arn:aws:s3:::textract-2-kb-ws-us-west-2-533356244334',
                                                    'inclusionPrefixes': [ 'chunk/']},
                               'type': 'S3'},
  'dataSourceId': 'QRVNWSBYEO',
  'description': 'Insurance Underwriting KB.',
  'knowledgeBaseId': 'A7WJNFHDI8',
  'name': 'bedrock-sample-knowledge-base-707',
  'status': 'AVAILABLE',
  'updatedAt': datetime.datetime(2024, 5, 23, 14, 34, 39, 450973, tzinfo=tzlocal()),
  'vectorIngestionConfiguration': { 'chunkingConfiguration': { 'chunkingStrategy': 'NONE'}}}


In [55]:
# Get DataSource 
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

{'ResponseMetadata': {'RequestId': '9494a12f-f4e7-49db-9ee0-69bda636b45c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 23 May 2024 14:34:39 GMT',
   'content-type': 'application/json',
   'content-length': '545',
   'connection': 'keep-alive',
   'x-amzn-requestid': '9494a12f-f4e7-49db-9ee0-69bda636b45c',
   'x-amz-apigw-id': 'YOoP9GLJvHcEmAA=',
   'x-amzn-trace-id': 'Root=1-664f53ff-3b2290673b45a2c04a3a0404'},
  'RetryAttempts': 0},
 'dataSource': {'knowledgeBaseId': 'A7WJNFHDI8',
  'dataSourceId': 'QRVNWSBYEO',
  'name': 'bedrock-sample-knowledge-base-707',
  'status': 'AVAILABLE',
  'description': 'Insurance Underwriting KB.',
  'dataSourceConfiguration': {'type': 'S3',
   's3Configuration': {'bucketArn': 'arn:aws:s3:::textract-2-kb-ws-us-west-2-533356244334',
    'inclusionPrefixes': ['chunk/']}},
  'vectorIngestionConfiguration': {'chunkingConfiguration': {'chunkingStrategy': 'NONE'}},
  'createdAt': datetime.datetime(2024, 5, 23, 14, 34, 39, 450973, tzinfo=tzlocal())

### Start ingestion job
Once the KB and data source are created, we can start the ingestion job.
During the ingestion job, the KB fetches the documents from the data source, pre-processes them to extract text, chunks them according to the configured chunking strategy, creates an embedding for each chunk, and writes the results to the vector database, in this case OSS. Because the chunking strategy here is `NONE`, each file under the `chunk/` prefix is treated as a single chunk.

In [63]:
# Start an ingestion job
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])
job = start_job_response["ingestionJob"]
pp.pprint(job)

# Poll the job until it reaches a terminal state
while job['status'] not in ('COMPLETE', 'FAILED'):
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
    )
    # Refresh the job inside the loop so the status check sees the latest state
    job = get_job_response["ingestionJob"]
    pp.pprint(job)
    time.sleep(40)
print("Print KB ID")
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)


Print KB ID
'A7WJNFHDI8'
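
The polling step can also be wrapped in a small helper that stops on failure and gives up after a timeout. This is a minimal sketch, not part of the original notebook: `wait_for_ingestion`, its parameters, and the choice of `COMPLETE` and `FAILED` as the terminal statuses are assumptions made for illustration. The status lookup is passed in as a callable so the helper can be exercised without AWS access.

```python
import time

# Assumption: these two statuses are treated as terminal for this sketch.
TERMINAL_STATUSES = {"COMPLETE", "FAILED"}


def wait_for_ingestion(fetch_job, poll_seconds=40, timeout_seconds=3600, sleep=time.sleep):
    """Call fetch_job() until the job is terminal or the timeout elapses.

    fetch_job is a hypothetical zero-argument callable returning a dict with
    a 'status' key, e.g. the 'ingestionJob' part of a get_ingestion_job response.
    """
    waited = 0
    while True:
        job = fetch_job()
        if job["status"] in TERMINAL_STATUSES:
            return job
        if waited >= timeout_seconds:
            raise TimeoutError(f"Ingestion job still {job['status']} after {waited}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Possible usage with the notebook's client (not executed here):
# job = wait_for_ingestion(lambda: bedrock_agent_client.get_ingestion_job(
#     knowledgeBaseId=kb["knowledgeBaseId"],
#     dataSourceId=ds["dataSourceId"],
#     ingestionJobId=job["ingestionJobId"])["ingestionJob"])
```

Returning the final job dict lets the caller inspect a `FAILED` status instead of looping indefinitely.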


In [64]:
%store kb_id

Stored 'kb_id' (str)


### Try out KB using Retrieve API
The Retrieve API returns the most relevant chunks, letting you either pass them directly to the model or first apply post-processing techniques such as re-ranking.

In [65]:
# try out KB using Retrieve
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime", region_name=region_name)
model_id = "anthropic.claude-v2" # try both Claude Instant and Claude v2; for Claude v2 use "anthropic.claude-v2"
model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{model_id}'

In [66]:
time.sleep(5)
query = "What is The Health Insurance Build Chart?" #Type your query here. 


# Retrieve API call that fetches only the relevant context.
relevant_documents = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=kb_id,
    retrievalConfiguration= {
        'vectorSearchConfiguration': {
            'numberOfResults': 3 # fetch the top 3 chunks that most closely match the query
        }
    }
)

In [81]:
passage = relevant_documents['retrievalResults']
contextList=[]
for content in passage:
    contextList.append(content['content']['text'])
    print(content['content']['text'])

Height|Height|MALE|MALE|MALE|MALE|MALE|MALE|Height|Height|FEMALE|FEMALE|FEMALE|FEMALE|FEMALE|FEMALE
6|7|152|223|295-319|320-339|340-360|361+|6|3|123|176|249-271|272-286|287-303|304+
6|8|158|228|302-327|328-347|348-368|369+|6|4|127|181|256-278|279-295|296-312|313+
The Health Insurance Build Chart is a guide to the rating action World Insurance Company will take regarding weight The percentage increases assume that there are no other impairments present. If other impairments are found, the judgment of the underwriter will determine what action will be taken. Weights greater than those in the chart will render an applicant uninsurable for health coverage.
The Health Insurance Build Chart is for use only with insureds and dependents age 15 or over. Cases involving overweight dependents under age 15 will be considered indi- vidually by the underwriter in consultation with the Medical Director.
Health Insurance Build Charts
:
1. If there has been weight loss of more than 20 pounds within one
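
Each entry in `retrievalResults` also carries a relevance `score`, which can be used as a light post-processing step before the chunks reach the model. The following is a minimal sketch under stated assumptions: `select_passages` and its parameters are hypothetical names, and the sample response uses made-up texts and scores purely for illustration.

```python
def select_passages(retrieve_response, min_score=0.0, limit=None):
    """Return (score, text) pairs from a Retrieve-style response, best first."""
    results = retrieve_response.get("retrievalResults", [])
    scored = [
        (item.get("score", 0.0), item["content"]["text"])
        for item in results
        if item.get("score", 0.0) >= min_score
    ]
    # Highest score first
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:limit] if limit is not None else scored


# Hypothetical response with invented scores, for illustration only.
sample_response = {
    "retrievalResults": [
        {"content": {"text": "table rows"}, "score": 0.41},
        {"content": {"text": "chart definition"}, "score": 0.78},
        {"content": {"text": "unrelated"}, "score": 0.12},
    ]
}
selected = select_passages(sample_response, min_score=0.3)
for score, text in selected:
    print(f"{score:.2f}  {text}")
```

A threshold like `min_score=0.3` is arbitrary here; a suitable value depends on the embedding model and the data.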

# RAG

#### RERANKING (OPTIONAL)

This section touches on reranking. It is written for a [BGE M3 model](https://huggingface.co/BAAI/bge-m3) deployed with SageMaker JumpStart: first deploy the model to an endpoint, then collect the endpoint name and substitute it into the code. Note that the code cell shown below instead loads the `cross-encoder/ms-marco-MiniLM-L-6-v2` cross-encoder locally through `sentence_transformers`.

**NOTE**: You can skip this section if you do not have a deployed model.

In [None]:
%pip install sentence_transformers

In [79]:
import numpy as np
from sentence_transformers import CrossEncoder

def rerank_cross_encoder(query, docs):
    cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    # Create pairs as strings
    pairs = [[query, doc_text] for doc_text in docs]

    # Predict scores for pairs
    scores = cross_encoder.predict(pairs)

    # Get the indices of the chunks with the highest scores
    n = 2  # Number of top chunks to keep
    top_indices = np.argsort(scores)[::-1][:n]

    # Put the chunks in a list
    reranked_docs = [docs[idx] for idx in top_indices]

    return reranked_docs

In [83]:
reranked_passages=rerank_cross_encoder(query,contextList)
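
The reranking step boils down to scoring each (query, passage) pair and keeping the top `n`. The sketch below separates that logic from the model so it can be tried without downloading one. It is an illustration only: `rerank`, `word_overlap`, and the sample passages are hypothetical, and the toy word-overlap scorer stands in for a real cross-encoder.

```python
def rerank(query, docs, score_fn, top_n=2):
    """Keep the top_n docs ranked by score_fn(query, doc), highest first."""
    scores = [score_fn(query, doc) for doc in docs]
    ranked = sorted(range(len(docs)), key=lambda i: scores[i], reverse=True)
    return [docs[i] for i in ranked[:top_n]]


def word_overlap(query, doc):
    """Toy scorer: number of distinct query words that also appear in the doc."""
    return len(set(query.lower().split()) & set(doc.lower().split()))


docs = [
    "height and weight table",
    "the build chart is a guide",
    "claims mailing address",
]
top = rerank("what is the build chart", docs, word_overlap, top_n=2)
print(top)
```

With a cross-encoder, `score_fn` could wrap a call that scores a single pair; scoring all pairs in one batch, as `rerank_cross_encoder` does, is usually faster.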

## Bedrock Anthropic LLM Inference

The prompt template has placeholders for the retrieved passages, inserted as `passages` under **document** tags, and for any standalone tables and lists found within each retrieved passage, inserted as `tab` under **additional_information** tags.\
Set the `csv_seperator` variable to the separator that was used during chunking. The default is the pipe character, "|".\
Anthropic Claude models (Claude 3 and 2) are used to generate a response to the user question.

In [84]:
csv_seperator="|"
prompt_template=f"""You are a helpful, obedient and truthful insurance underwriting assistance.

<document>
{reranked_passages}
</document>          


<instructions>
When providing your response based on the document:
1. Understand the question to know what is being asked of you.
2. Review the entire document provided and check if it contains relevant information to answer the question. Only pay attention to passages with relevant information.
3. Any tables provided within the document or additional information are delimited by {csv_seperator} character.
4. If the document is sufficient to answer the question, provide a comprehensive answer ENTIRELY based on the document provided. DO NOT make up answers not present in the document.
5. If the answer is not available in the document, say so.
</instructions>

Question: {query}
if able to answer:
    Include in your response before your answer:    
    <source>document or additional info tag(s) containing the relevant info</source>"""

print(f' Size of prompt token is {client.count_tokens(prompt_template)}')

 Size of prompt token is 768
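
Interpolating a Python list directly into an f-string inserts its `repr`, brackets and quotes included. One alternative is to join the passages explicitly before building the prompt. This is a minimal sketch, not the notebook's template: `format_passages`, `build_prompt`, and the `<passage index="…">` tags are assumptions introduced for illustration, and the instruction text is shortened.

```python
def format_passages(passages):
    """Wrap each passage in its own numbered tag instead of printing the list repr."""
    return "\n".join(
        f'<passage index="{i}">\n{text.strip()}\n</passage>'
        for i, text in enumerate(passages, start=1)
    )


def build_prompt(passages, query, csv_separator="|"):
    """Assemble a shortened prompt with the passages under <document> tags."""
    return (
        "<document>\n"
        f"{format_passages(passages)}\n"
        "</document>\n\n"
        f"Tables in the document are delimited by the {csv_separator} character.\n\n"
        f"Question: {query}"
    )


# Made-up passages, for illustration only.
prompt = build_prompt(["First passage. ", "A|B\n1|2"], "What is A?")
print(prompt)
```

Numbered tags also give the model something concrete to cite when it is asked to name its source.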


In [85]:
model_id="anthropic.claude-3-sonnet-20240229-v1:0" # alternatives: "anthropic.claude-v2", "anthropic.claude-3-haiku-20240307-v1:0"
model_response,input_tokens, output_tokens=_invoke_bedrock_with_retries([], "", prompt_template,model_id , [])

<source>['The Health Insurance Build Chart is a guide to the rating action World Insurance Company will take regarding weight The percentage increases assume that there are no other impairments present. If other impairments are found, the judgment of the underwriter will determine what action will be taken. Weights greater than those in the chart will render an applicant uninsurable for health coverage.', 'The Health Insurance Build Chart is for use only with insureds and dependents age 15 or over. Cases involving overweight dependents under age 15 will be considered indi- vidually by the underwriter in consultation with the Medical Director.']</source>

The Health Insurance Build Chart is a guide used by World Insurance Company to determine rating actions (such as percentage increases in premiums) based on an applicant's weight. The chart provides weight ranges for different heights and genders, and indicates the corresponding premium increase percentages. Weights greater than the max

# Conclusion

This notebook showcased the extraction of content from a document while maintaining its layout structure. We then processed and chunked the document so that the integrity of the information was preserved, and indexed the chunks together with their hierarchical metadata, which offers flexibility in information retrieval.

Finally, we conducted a RAG query and generated contextual answers.