## Create a Knowledge Base with a Hierarchical Chunking Strategy

#### Concept

**Hierarchical Chunking**: Organizes your data into two nested levels of chunks, so that retrieval can match on small, precise segments while still returning the larger passage they belong to.

This structure lets your **RAG (Retrieval-Augmented Generation)** workflow navigate and retrieve information from complex, nested datasets. After documents are parsed, they are **chunked** at two levels, controlled by the **parent** and **child chunk sizes**.

- **Parent chunks (higher level)** represent larger segments, such as entire documents or sections.
- **Child chunks (lower level)** represent smaller segments, such as paragraphs or sentences.

The relationship between parent and child chunks is maintained, allowing for **efficient retrieval and navigation** of the corpus.
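
The parent/child relationship can be illustrated with a minimal sketch. It is not how Amazon Bedrock implements chunking internally: it assumes whitespace-separated words stand in for tokens, and the names `Chunk`, `split_tokens`, and `hierarchical_chunks` are hypothetical helpers introduced only for this illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Chunk:
    chunk_id: str
    text: str
    parent_id: Optional[str] = None  # None marks a parent chunk


def split_tokens(tokens, size, overlap=0):
    """Split a token list into windows of at most `size` tokens that share `overlap` tokens."""
    step = max(size - overlap, 1)
    windows = []
    for start in range(0, len(tokens), step):
        windows.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # the last window already reaches the end of the text
    return windows


def hierarchical_chunks(text, parent_size=1500, child_size=300, overlap=60):
    """Return (parents, children); every child records the ID of its parent."""
    tokens = text.split()  # assumption: whitespace "tokens", not a real tokenizer
    parents, children = [], []
    for p_idx, p_tokens in enumerate(split_tokens(tokens, parent_size)):
        parent = Chunk(f"p{p_idx}", " ".join(p_tokens))
        parents.append(parent)
        # Child chunks are cut from within a single parent, with overlap between neighbours
        for c_idx, c_tokens in enumerate(split_tokens(p_tokens, child_size, overlap)):
            children.append(Chunk(f"p{p_idx}-c{c_idx}", " ".join(c_tokens), parent.chunk_id))
    return parents, children
```

In this sketch only the child chunks would be embedded and searched; the `parent_id` on each child is what allows the larger parent text to be looked up afterwards.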

#### Benefits

- **Efficient Retrieval**: The hierarchical structure enables faster and more targeted retrieval of relevant information by first performing a **semantic search** on child chunks and then returning the parent chunk. By replacing child chunks with parent chunks, we provide **larger and more comprehensive context** to the foundation model (FM).
- **Context Preservation**: Organizing the corpus hierarchically helps maintain contextual relationships between chunks, ensuring more **coherent and contextually relevant** text generation.

> **Note:** In hierarchical chunking, **search is performed on child chunks**, but the corresponding **parent chunks** are returned. As a result, you may see **fewer search results** than you requested, since several matching child chunks can belong to the same parent.
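
The effect described in the note can be sketched in a few lines. This is an illustration of the idea only, not the service's actual logic; `parents_for_hits` and its arguments are hypothetical names, and the child hits are assumed to arrive sorted best-first.

```python
def parents_for_hits(child_hits, child_to_parent, parent_text):
    """Replace ranked child hits with their parent chunks, keeping each parent once.

    child_hits: list of (child_id, score) tuples, best match first.
    child_to_parent: dict mapping child_id -> parent_id.
    parent_text: dict mapping parent_id -> full parent chunk text.
    """
    seen = set()
    results = []
    for child_id, score in child_hits:
        parent_id = child_to_parent[child_id]
        if parent_id in seen:
            continue  # this parent is already in the results
        seen.add(parent_id)
        results.append({"parent_id": parent_id, "score": score, "text": parent_text[parent_id]})
    return results
```

With three child hits where two share a parent, the function returns two results, which is why a request for the top 3 can come back with fewer entries.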

#### Best Use Cases

Hierarchical chunking is best suited for **complex documents** with a nested or hierarchical structure, such as:
- **Technical manuals**
- **Legal documents**
- **Academic papers** with complex formatting and nested tables.


In [8]:
import json
with open("../variables.json", "r") as f:
    variables = json.load(f)

variables

{'accountNumber': '791677101579',
 'regionName': 'us-west-2',
 'collectionArn': 'arn:aws:aoss:us-west-2:791677101579:collection/u99a2f111uq506nobq6l',
 'collectionId': 'u99a2f111uq506nobq6l',
 'vectorIndexName': 'ws-index-',
 'bedrockExecutionRoleArn': 'arn:aws:iam::791677101579:role/advanced-rag-workshop-bedrock_execution_role-us-west-2',
 's3Bucket': '791677101579-us-west-2-advanced-rag-workshop',
 'kbFixedChunk': '2OLAU6UCAW',
 'kbSemanticChunk': 'SCMPE1YU8Y'}
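
Later cells in this notebook read `variables.json`, add the `kbHierarchicalChunk` key, and write the file back. That read-modify-write pattern could be factored into one small helper. The sketch below is an optional convenience, not part of the workshop code; `update_variables` is a hypothetical name.

```python
import json
from pathlib import Path


def update_variables(path, **updates):
    """Merge `updates` into the JSON file at `path`, preserving existing keys."""
    path = Path(path)
    try:
        data = json.loads(path.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        data = {}  # start fresh if the file is missing or not valid JSON
    data.update(updates)
    # default=str mirrors the notebook's handling of non-JSON values such as datetimes
    path.write_text(json.dumps(data, indent=4, default=str))
    return data
```

For example, `update_variables("../variables.json", kbHierarchicalChunk=kb_id)` would store the new knowledge base ID without touching the other fields.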

### 1. Create a Knowledge Base

In [9]:
# Import necessary libraries
from retrying import retry
import boto3

# Initialize the Bedrock agent client with the appropriate region
bedrock_agent = boto3.client("bedrock-agent", region_name=variables["regionName"])

# Helper function to create a knowledge base with retry mechanism
@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=3)
def create_knowledge_base_func(name, description, chunking_type):
    """
    Creates a knowledge base in Amazon Bedrock with OpenSearch Serverless storage configuration.
    
    Args:
    - name (str): The name of the knowledge base.
    - description (str): A brief description of the knowledge base.
    - chunking_type (str): The type of chunking to be used (e.g., 'fixed', 'hierarchical').
    
    Returns:
    - dict: The knowledge base details returned by the API call.
    """
    # Define the embedding model ARN used by Bedrock for document embedding
    embedding_model_arn = f"arn:aws:bedrock:{variables['regionName']}::foundation-model/amazon.titan-embed-text-v2:0"

    # Define OpenSearch Serverless configuration
    opensearch_serverless_configuration = {
        "collectionArn": variables["collectionArn"],  # ARN of the OpenSearch collection
        "vectorIndexName": variables["vectorIndexName"] + chunking_type,  # Vector index name based on chunking type
        "fieldMapping": {  # Field mapping for the OpenSearch index
            "vectorField": "vector",
            "textField": "text",
            "metadataField": "text-metadata"
        }
    }
    
    # Print the OpenSearch configuration for debugging purposes
    print(opensearch_serverless_configuration)
    
    try:
        # Call the Bedrock agent's create_knowledge_base API to create the knowledge base
        create_kb_response = bedrock_agent.create_knowledge_base(
            name=name,
            description=description,
            roleArn=variables["bedrockExecutionRoleArn"],  # ARN of the execution role
            knowledgeBaseConfiguration={
                "type": "VECTOR",  # Define the knowledge base as a vector knowledge base
                "vectorKnowledgeBaseConfiguration": {
                    "embeddingModelArn": embedding_model_arn  # Define the embedding model ARN
                }
            },
            storageConfiguration={
                "type": "OPENSEARCH_SERVERLESS",  # Using OpenSearch Serverless for storage
                "opensearchServerlessConfiguration": opensearch_serverless_configuration
            }
        )
        
        # Return the knowledge base details from the response
        return create_kb_response["knowledgeBase"]
    
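    except Exception as e:
        # Handle exceptions (e.g., API failures) and print the error message.
        # Because the exception is caught here and None is returned, the @retry
        # decorator never sees a failure and will not re-attempt the call.
        print(f"Error creating knowledge base: {e}")
        return None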

In [10]:
import boto3
import json

# Try to create a knowledge base, but handle the case where it returns None
kb = create_knowledge_base_func(
    name="advanced-rag-workshop-hierarchical-chunking",
    description="Knowledge base using Amazon OpenSearch Service as a vector store",
    chunking_type="hierarchical"
)

# Check if kb is None (meaning creation failed)
if kb is None:
    print("Knowledge Base creation returned None. Checking if it already exists...")
    
    # List all knowledge bases to find the one that already exists
    list_kb_response = bedrock_agent.list_knowledge_bases()
    
    # Look for a knowledge base with the desired name
    for existing_kb in list_kb_response.get('knowledgeBaseSummaries', []):
        if existing_kb['name'] == "advanced-rag-workshop-hierarchical-chunking":
            kb_id = existing_kb['knowledgeBaseId']
            print(f"Found existing knowledge base with ID: {kb_id}")
            
            # Get the details of the existing knowledge base
            get_kb_response = bedrock_agent.get_knowledge_base(knowledgeBaseId=kb_id)
            
            # Read existing variables to preserve other fields
            try:
                # Read existing variables
                with open("../variables.json", "r") as f:
                    existing_variables = json.load(f)
            except (FileNotFoundError, json.JSONDecodeError):
                # If file doesn't exist or is invalid JSON
                existing_variables = {}
            
            # Update only the hierarchical chunking value
            existing_variables["kbHierarchicalChunk"] = kb_id
                            
            # Write back all variables
            with open("../variables.json", "w") as f:
                json.dump(existing_variables, f, indent=4, default=str)
            
            # Print the retrieved knowledge base response
            print(f'OpenSearch Knowledge Response: {json.dumps(get_kb_response, indent=4, default=str)}')
            break        
    else:
        print("Could not find a knowledge base with the specified name.")
else:
    # KB was created successfully, proceed with original flow
    try:
        # Retrieve details of the newly created knowledge base
        get_kb_response = bedrock_agent.get_knowledge_base(knowledgeBaseId=kb['knowledgeBaseId'])

        # Read existing variables to preserve other fields
        try:
            with open("../variables.json", "r") as f:
                variables = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            pass  # Use existing variables dict

        # Update the variables dictionary with the new knowledge base ID
        variables["kbHierarchicalChunk"] = kb['knowledgeBaseId']

        # Save updated variables to a JSON file, handling datetime serialization
        with open("../variables.json", "w") as f:
            json.dump(variables, f, indent=4, default=str)

        # Print the retrieved knowledge base response in a readable format
        print(f'OpenSearch Knowledge Response: {json.dumps(get_kb_response, indent=4, default=str)}')
    except Exception as e:
        print(f"Error processing newly created knowledge base: {e}")

{'collectionArn': 'arn:aws:aoss:us-west-2:791677101579:collection/u99a2f111uq506nobq6l', 'vectorIndexName': 'ws-index-hierarchical', 'fieldMapping': {'vectorField': 'vector', 'textField': 'text', 'metadataField': 'text-metadata'}}
Error creating knowledge base: An error occurred (ConflictException) when calling the CreateKnowledgeBase operation: KnowledgeBase with name advanced-rag-workshop-hierarchical-chunking already exists.
Knowledge Base creation returned None. Checking if it already exists...
Found existing knowledge base with ID: UKZ63LEW5P
OpenSearch Knowledge Response: {
    "ResponseMetadata": {
        "RequestId": "2b61ce5e-bab1-401d-8c76-a320a96c124f",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
            "date": "Tue, 22 Apr 2025 19:25:49 GMT",
            "content-type": "application/json",
            "content-length": "968",
            "connection": "keep-alive",
            "x-amzn-requestid": "2b61ce5e-bab1-401d-8c76-a320a96c124f",
            "x-amz-a

### 2. Create Datasources for Knowledge Base

In [11]:
import time
import json
import boto3
from botocore.exceptions import ClientError

# First, retrieve the knowledge base ID by listing all KBs and finding the hierarchical one
print("Retrieving knowledge base ID for hierarchical chunking...")
list_kb_response = bedrock_agent.list_knowledge_bases()
kb_id = None

# Look for the hierarchical chunking knowledge base by name
for existing_kb in list_kb_response.get('knowledgeBaseSummaries', []):
    if existing_kb['name'] == "advanced-rag-workshop-hierarchical-chunking":
        kb_id = existing_kb['knowledgeBaseId']
        print(f"Found existing knowledge base with ID: {kb_id}")
        
        # Read existing variables to preserve other fields
        try:
            with open("../variables.json", "r") as f:
                variables = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            pass
            
        # Update variables with the hierarchical KB ID (if needed)
        variables["kbHierarchicalChunk"] = kb_id
        
        # Save updated variables
        with open("../variables.json", "w") as f:
            json.dump(variables, f, indent=4, default=str)
            
        break
else:
    print("Could not find the hierarchical chunking knowledge base.")

# Proceed only if we found the knowledge base ID
if kb_id:
    # Define the chunking strategy for ingestion using a hierarchical approach
    chunking_strategy_configuration = {
        "chunkingStrategy": "HIERARCHICAL",
        "hierarchicalChunkingConfiguration": {
            "levelConfigurations": [
                {"maxTokens": 1500},
                {"maxTokens": 300}
            ],
            "overlapTokens": 60
        }
    }

    # The data source to ingest documents from, with the data prefix
    s3_configuration = {
        "bucketArn": f"arn:aws:s3:::{variables['s3Bucket']}",
        "inclusionPrefixes": ["data"]  # Only include objects with the "data" prefix
    }

    data_source_name = "advanced-rag-example"

    # First, check if a data source with this name already exists in Bedrock
    try:
        # List all data sources for the knowledge base
        list_ds_response = bedrock_agent.list_data_sources(
            knowledgeBaseId=kb_id
        )
        
        # Check if our named data source exists
        existing_ds = None
        for ds in list_ds_response.get('dataSourceSummaries', []):
            if ds['name'] == data_source_name:
                existing_ds = ds
                break
        
        # If it exists, delete it
        if existing_ds:
            print(f"Found existing data source '{data_source_name}'. Deleting it...")
            bedrock_agent.delete_data_source(
                knowledgeBaseId=kb_id,
                dataSourceId=existing_ds["dataSourceId"]
            )
            print("Waiting for data source deletion to complete...")
            time.sleep(10)
            print("Data source deleted successfully.")
            
    except Exception as e:
        print(f"Error while checking or deleting data source: {e}")

    # Now create a new data source
    try:
        print(f"Creating new data source '{data_source_name}' with hierarchical chunking...")
        create_ds_response = bedrock_agent.create_data_source(
            name=data_source_name,
            description="A data source for Advanced RAG workshop",
            knowledgeBaseId=kb_id,
            dataSourceConfiguration={
                "type": "S3",
                "s3Configuration": s3_configuration
            },
            vectorIngestionConfiguration={
                "chunkingConfiguration": chunking_strategy_configuration
            }
        )
        
        # Store the created data source object
        ds_hierarchical_chunk = create_ds_response["dataSource"]
        print(f"Hierarchical chunking data source created successfully.")
        
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConflictException':
            print(f"Data source '{data_source_name}' still exists. Retrieving it...")
            # Get the existing data source
            list_ds_response = bedrock_agent.list_data_sources(
                knowledgeBaseId=kb_id
            )
            for ds in list_ds_response.get('dataSourceSummaries', []):
                if ds['name'] == data_source_name:
                    ds_hierarchical_chunk = ds
                    print(f"Retrieved existing data source: {ds['dataSourceId']}")
                    break
        else:
            raise

    # Print the data source information
    print(ds_hierarchical_chunk)
else:
    print("Cannot proceed without a valid knowledge base ID.")

Retrieving knowledge base ID for hierarchical chunking...
Found existing knowledge base with ID: UKZ63LEW5P
Found existing data source 'advanced-rag-example'. Deleting it...
Waiting for data source deletion to complete...
Data source deleted successfully.
Creating new data source 'advanced-rag-example' with hierarchical chunking...
Hierarchical chunking data source created successfully.
{'createdAt': datetime.datetime(2025, 4, 22, 19, 26, 0, 745165, tzinfo=tzlocal()), 'dataDeletionPolicy': 'DELETE', 'dataSourceConfiguration': {'s3Configuration': {'bucketArn': 'arn:aws:s3:::791677101579-us-west-2-advanced-rag-workshop', 'inclusionPrefixes': ['data']}, 'type': 'S3'}, 'dataSourceId': 'JLYHIOKNZ5', 'description': 'A data source for Advanced RAG workshop', 'knowledgeBaseId': 'UKZ63LEW5P', 'name': 'advanced-rag-example', 'status': 'AVAILABLE', 'updatedAt': datetime.datetime(2025, 4, 22, 19, 26, 0, 745165, tzinfo=tzlocal()), 'vectorIngestionConfiguration': {'chunkingConfiguration': {'chunking
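
The chunking configuration used above is a plain dictionary, so it can be built and sanity-checked before calling `create_data_source`. The sketch below assumes the first `levelConfigurations` entry is the parent level and the second the child level, as the values 1500 and 300 in the cell above suggest. The checks are local sanity checks chosen for this illustration, not documented service limits, and `hierarchical_chunking_config` is a hypothetical helper name.

```python
def hierarchical_chunking_config(parent_max_tokens=1500, child_max_tokens=300, overlap_tokens=60):
    """Build the chunkingConfiguration dict for a hierarchical data source."""
    # Assumption: a child chunk should be strictly smaller than its parent
    if child_max_tokens >= parent_max_tokens:
        raise ValueError("child_max_tokens should be smaller than parent_max_tokens")
    # Assumption: overlap should be positive and smaller than a child chunk
    if not 0 < overlap_tokens < child_max_tokens:
        raise ValueError("overlap_tokens should be between 1 and child_max_tokens - 1")
    return {
        "chunkingStrategy": "HIERARCHICAL",
        "hierarchicalChunkingConfiguration": {
            "levelConfigurations": [
                {"maxTokens": parent_max_tokens},  # parent (higher) level
                {"maxTokens": child_max_tokens},   # child (lower) level
            ],
            "overlapTokens": overlap_tokens,
        },
    }
```

Called with its defaults, the function returns the same dictionary as `chunking_strategy_configuration` in the cell above, which makes it easy to experiment with other parent and child sizes.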

### 3. Start Ingestion Job for Amazon Bedrock Knowledge Base pointing to Amazon OpenSearch

> **Note**: The ingestion process will take approximately 2-3 minutes to complete. During this time, the system is processing your documents by:
> 1. Extracting text from the source files
> 2. Chunking the content according to the defined strategy (Fixed / Semantic / Hierarchical / Custom)
> 3. Generating embeddings for each chunk
> 4. Storing the embeddings and associated metadata in the OpenSearch vector database
>
> You'll see status updates as the process progresses. Please wait for the "Job completed successfully" message before proceeding to the next step.

In [16]:
import time
import json

# Get the knowledge base ID from variables.json
try:
    with open("../variables.json", "r") as f:
        variables = json.load(f)
    kb_id = variables.get("kbHierarchicalChunk")
    
    if not kb_id:
        print("Knowledge base ID not found in variables.json")
except Exception as e:
    print(f"Error loading knowledge base ID: {e}")
    kb_id = None

# Start an ingestion job for the given data source and knowledge base
try:
    # Initiate the ingestion job and capture the response
    start_job_response = bedrock_agent.start_ingestion_job(
        knowledgeBaseId=kb_id,  # Use the retrieved knowledge base ID instead of kb['knowledgeBaseId']
        dataSourceId=ds_hierarchical_chunk["dataSourceId"]
    )
    job = start_job_response["ingestionJob"]
    print(f"Ingestion job started successfully\n")

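    # Monitor the ingestion job until it reaches a terminal state.
    # Checking only for 'COMPLETE' would loop forever if the job fails or is stopped.
    while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
        print("running...")
        time.sleep(10)
        # Check the status of the ingestion job
        get_job_response = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=kb_id,  # Use the retrieved knowledge base ID here too
            dataSourceId=ds_hierarchical_chunk["dataSourceId"],
            ingestionJobId=job["ingestionJobId"]
        )
        job = get_job_response["ingestionJob"]

    if job['status'] == 'COMPLETE':
        print("Job completed successfully\n")
    else:
        # Surface the reason instead of reporting success
        print(f"Job ended with status {job['status']}: {job.get('failureReasons', [])}\n")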

except Exception as e:
    print(f"Couldn't start job.\n")
    print(e)

Ingestion job started successfully

running...
Job completed successfully
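
Polling loops like the one above are easier to reason about when the terminal states and a timeout are explicit. The following is a minimal sketch of that pattern, not the workshop's code: `wait_for_job`, `TERMINAL_STATUSES`, and the 15-minute default timeout are assumptions made for illustration, and the status is supplied by a caller-provided function so the helper itself makes no AWS calls.

```python
import time

# Statuses after which an ingestion job will not change any further
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def wait_for_job(get_status, poll_seconds=10, timeout_seconds=900, sleep=time.sleep):
    """Poll `get_status()` until it returns a terminal status or the timeout elapses."""
    waited = 0
    status = get_status()
    while status not in TERMINAL_STATUSES:
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Possible usage with the objects defined in this notebook:
# final_status = wait_for_job(
#     lambda: bedrock_agent.get_ingestion_job(
#         knowledgeBaseId=kb_id,
#         dataSourceId=ds_hierarchical_chunk["dataSourceId"],
#         ingestionJobId=job["ingestionJobId"],
#     )["ingestionJob"]["status"]
# )
```

The caller can then branch on the returned status, treating anything other than `COMPLETE` as a failure to investigate.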



### 4. Retrieve

In [17]:
import boto3
import json

# Load the knowledge base ID from variables.json
try:
    with open("../variables.json", "r") as f:
        variables = json.load(f)
    kb_id = variables.get("kbHierarchicalChunk")  # Get the hierarchical kb ID
    
    if not kb_id:
        print("Knowledge base ID not found in variables.json")
except Exception as e:
    print(f"Error loading knowledge base ID: {e}")
    kb_id = None

# Initialize the Bedrock Agent Runtime client
bedrock_agent_runtime = boto3.client("bedrock-agent-runtime", region_name=variables["regionName"])

# Define the query for document retrieval
query = "What were net incomes of Amazon in 2022, 2023 and 2024?"

# Retrieve relevant documents from the knowledge base
relevant_documents_os = bedrock_agent_runtime.retrieve(
    retrievalQuery={
        'text': query  # Search query
    },
    knowledgeBaseId=kb_id,  # Use the retrieved knowledge base ID instead of kb['knowledgeBaseId']
    retrievalConfiguration={
        'vectorSearchConfiguration': {
            'numberOfResults': 3  # Limit to top 3 results
        }
    }
)

# Display the retrieved documents
print(json.dumps([i["content"]["text"] for i in relevant_documents_os["retrievalResults"]], indent=2))

[
  "CONSOLIDATED STATEMENTS OF COMPREHENSIVE INCOME (LOSS)     (in millions) Year Ended December 31,      2022 2023 2024     Net income (loss) $ (2,722) $ 30,425 $ 59,248 Other comprehensive income (loss):     Foreign currency translation adjustments, net of tax of $100, $(55), and $226 (2,586) 1,027 (3,333) Available-for-sale debt securities:     Change in net unrealized gains (losses), net of tax of $159, $(110), and $(2,086) (823) 366 6,339 Less: reclassification adjustment for losses (gains) included in \u201cOther income (expense), net,\u201d net of tax of $0, $(15), and $(2) 298 50 5     Net change (525) 416 6,344 Other, net of tax of $0, $(1), and $1 \u2014 4 (5)     Total other comprehensive income (loss) (3,111) 1,447 3,006 Comprehensive income (loss) $ (5,833) $ 31,872 $ 62,254     See accompanying notes to consolidated financial statements.     38Table of Contents     AMAZON.COM, INC. CONSOLIDATED BALANCE SHEETS     (in millions, except per share data) December 31,      202

> **Note**: After creating the knowledge base, you can explore its details and settings in the Amazon Bedrock console. This gives you a more visual interface to understand how the knowledge base is structured.
> 
> **[➡️ View your Knowledge Bases in the AWS Console](https://us-west-2.console.aws.amazon.com/bedrock/home?region=us-west-2#/knowledge-bases)**
>
> In the console, you can:
> - See all your knowledge bases in one place
> - View ingestion status and statistics
> - Test queries through the built-in chat interface
> - Modify settings and configurations
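
The retrieval cell in step 4 prints only the chunk text. When comparing chunking strategies, it can also help to see each result's relevance score and source document. The sketch below assumes the `retrieve` response carries `score` and `location.s3Location.uri` fields alongside `content.text`, and reads them defensively in case they are absent; `summarize_results` is a hypothetical helper name.

```python
def summarize_results(response, preview_chars=200):
    """Return one compact row per retrieval result: score, source URI, text preview."""
    rows = []
    for result in response.get("retrievalResults", []):
        text = result.get("content", {}).get("text", "")
        uri = result.get("location", {}).get("s3Location", {}).get("uri")
        rows.append({
            "score": result.get("score"),
            "source": uri,
            "preview": text[:preview_chars],  # parent chunks can be long, so truncate
        })
    return rows
```

For example, `summarize_results(relevant_documents_os)` would produce a short list that is easier to scan than the full parent chunk text.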