## Create a Knowledge Base with a Custom Chunking Strategy

#### Custom Chunking Logic with Lambda Functions in Amazon Bedrock

When creating a Knowledge Base (KB) for Amazon Bedrock, you can connect a Lambda function to specify your custom chunking logic. During the ingestion process, if a Lambda function is provided, the Knowledge Base will execute the Lambda function and store the input and output values in the specified intermediate S3 bucket.

#### Use Cases for Lambda Functions in KBs

- **Custom Chunking Logic:** Lambda functions can be used to implement custom logic for chunking documents during ingestion, enabling more control over how documents are divided into meaningful chunks.
- **Chunk-level Metadata Processing:** Lambda functions can also process chunked data, for example, by adding custom metadata at the chunk level, enriching the data for more advanced retrieval or analysis.

This allows for more flexibility and tailored handling of document data within the Knowledge Base, making it possible to apply unique chunking strategies and augment the data with specific metadata for improved search and retrieval.
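
The two use cases above can be sketched in a few lines. The notebook's `lambda_function.py` is not shown here, so this is only an illustration under stated assumptions, not the workshop's actual implementation: the helper names `chunk_text` and `transform_batch` and the `chunk_index` metadata key are hypothetical, and the `fileContents` / `contentBody` / `contentType` / `contentMetadata` field names are assumed to match the content-batch JSON that the Knowledge Base writes to the intermediate S3 bucket.

```python
def chunk_text(text, max_words=200, overlap=20):
    """Split text into windows of at most max_words words, overlapping by `overlap` words."""
    if max_words <= overlap:
        raise ValueError("max_words must be greater than overlap")
    words = text.split()
    step = max_words - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        # Stop once the current window already reaches the end of the text
        if start + max_words >= len(words):
            break
    return chunks


def transform_batch(batch, max_words=200, overlap=20):
    """Re-chunk every entry of a content batch and add chunk-level metadata.

    ASSUMPTION: `batch` has the shape {"fileContents": [{"contentBody": ...,
    "contentType": ..., "contentMetadata": {...}}]}.
    """
    out = []
    for item in batch.get("fileContents", []):
        pieces = chunk_text(item.get("contentBody", ""), max_words, overlap)
        for i, piece in enumerate(pieces):
            # Copy the original metadata and tag each chunk with its position
            metadata = dict(item.get("contentMetadata") or {})
            metadata["chunk_index"] = i  # hypothetical metadata key
            out.append({
                "contentType": item.get("contentType"),
                "contentMetadata": metadata,
                "contentBody": piece,
            })
    return {"fileContents": out}
```

In a real handler, each batch would be read from and written back to the intermediate S3 bucket around a call like `transform_batch`; that I/O is omitted here to keep the sketch self-contained.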


In [None]:
import json
with open("variables.json", "r") as f:
    variables = json.load(f)

variables

### 0. Create a Lambda function with custom chunking logic

In [None]:
# Import necessary libraries for handling files, time, and AWS resources
from io import BytesIO
import zipfile
import boto3
import time

# Create IAM client to interact with AWS IAM service
iam = boto3.client("iam", region_name=variables["regionName"])

# Define the IAM assume role policy for the Lambda function
assume_role_policy_document = {
    "Version": "2012-10-17",  # Policy version
    "Statement": [
        {
            "Effect": "Allow",  # Allow the action
            "Principal": {
                "Service": "lambda.amazonaws.com"  # Principal entity, Lambda service
            },
            "Action": "sts:AssumeRole"  # Action Lambda is allowed to perform
        }
    ]
}

# Convert the IAM assume role policy into JSON format
assume_role_policy_document_json = json.dumps(assume_role_policy_document)

# Create the IAM role for the Lambda function with the assume role policy
lambda_iam_role = iam.create_role(
    RoleName=f"advanced-rag-custom-chunk-{variables['regionName']}-role",  # Define a unique role name based on region
    AssumeRolePolicyDocument=assume_role_policy_document_json  # Attach the policy to the role
)

# Attach an S3 access policy to the newly created IAM role
iam.put_role_policy(
    RoleName=lambda_iam_role["Role"]["RoleName"],  # Specify the role name
    PolicyName="s3policy",  # Name of the policy to attach
    PolicyDocument=json.dumps(  # Define the policy document granting access to specific S3 resources
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",  # Allow these actions
                    "Action": [
                        "s3:GetObject",  # Allow getting objects from S3
                        "s3:ListBucket",  # Allow listing contents of the S3 bucket
                        "s3:PutObject"  # Allow putting objects into S3
                    ],
                    "Resource": [
                        f"arn:aws:s3:::{variables['s3Bucket']}-custom-chunk",  # S3 bucket ARN
                        f"arn:aws:s3:::{variables['s3Bucket']}-custom-chunk/*"  # S3 bucket objects ARN
                    ],
                    "Condition": {
                        "StringEquals": {
                            "aws:ResourceAccount": f"{variables['accountNumber']}"  # Restrict access to the specific AWS account
                        }
                    }
                }
            ]
        }
    )
)

# Prepare the Lambda function code by creating a ZIP file
s = BytesIO()  # Create an in-memory binary stream for the zip file
z = zipfile.ZipFile(s, 'w')  # Create a ZipFile object for writing
z.write("lambda_function.py")  # Add the lambda function code to the zip file
z.close()  # Close the ZipFile object
zip_content = s.getvalue()  # Get the content of the zip file

# Wait 10 seconds so the new IAM role can propagate before Lambda tries to use it
time.sleep(10)

# Create the Lambda function using the IAM role, timeout settings, and the ZIP content
lambda_function = boto3.client("lambda",
                               region_name=variables["regionName"]).create_function(
    FunctionName="advanced-rag-custom-chunk",  # Define the Lambda function name
    Runtime='python3.12',  # Specify the runtime environment (Python 3.12)
    Timeout=60,  # Set the timeout for the Lambda function (in seconds)
    Role=lambda_iam_role['Role']['Arn'],  # IAM role ARN for Lambda function execution
    Code={'ZipFile': zip_content},  # Provide the zipped Lambda function code
    Handler='lambda_function.lambda_handler'  # Define the handler function in the code
)

In [None]:
# Create an S3 client to interact with the AWS S3 service in the specified region
s3 = boto3.client("s3", region_name=variables["regionName"])

from botocore.exceptions import ClientError

bucket_name = variables["s3Bucket"] + "-custom-chunk"  # Intermediate bucket used by the Lambda function

try:
    # Check if the bucket already exists by sending a HEAD request to S3
    s3.head_bucket(Bucket=bucket_name)
    # If the bucket exists, print a message
    print(f"Bucket '{bucket_name}' already exists.")
except ClientError:
    # If the bucket does not exist (or is not accessible), create a new one.
    # us-east-1 rejects an explicit LocationConstraint, so omit it for that region.
    if variables["regionName"] == "us-east-1":
        s3.create_bucket(Bucket=bucket_name)
    else:
        s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={
            'LocationConstraint': variables["regionName"]})  # Specify the region for the new bucket
    # Print a message indicating the bucket has been created
    print(f"Bucket '{bucket_name}' created.")

### 1. Create a Knowledge Base

In [None]:
# Helper function definition
from retrying import retry  # Import retrying module to add retry logic
import boto3  # Import boto3 for AWS SDK to interact with AWS services

# Create a Bedrock agent client to interact with Amazon Bedrock service
bedrock_agent = boto3.client("bedrock-agent", region_name=variables["regionName"])

# Retry logic added to the function, which will retry the function 3 times with a random wait time between 1-2 seconds
@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=3)
def create_knowledge_base_func(name, description, chunking_type):
    # Define the embedding model ARN that will be used by Bedrock for embedding ingested documents
    embedding_model_arn = f"arn:aws:bedrock:{variables['regionName']}::foundation-model/amazon.titan-embed-text-v2:0"
    
    # Define OpenSearch Serverless configuration that includes the collection and vector index names
    opensearch_serverless_configuration = {
            "collectionArn": variables["collectionArn"],  # ARN of the OpenSearch collection
            "vectorIndexName": variables["vectorIndexName"] + chunking_type,  # Name of the vector index
            "fieldMapping": {
                "vectorField": "vector",  # Field name for the vector
                "textField": "text",      # Field name for the text
                "metadataField": "text-metadata"  # Field name for the metadata
            }
        }
    
    # Print the OpenSearch configuration for debugging purposes
    print(opensearch_serverless_configuration)
    
    # Call the Bedrock API to create the knowledge base with the specified configurations
    create_kb_response = bedrock_agent.create_knowledge_base(
        name=name,  # Name of the knowledge base
        description=description,  # Description of the knowledge base
        roleArn=variables["bedrockExecutionRoleArn"],  # ARN of the IAM role that Bedrock will use for execution
        knowledgeBaseConfiguration={
            "type": "VECTOR",  # Type of the knowledge base (VECTOR in this case)
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embedding_model_arn  # ARN of the embedding model used for the knowledge base
            }
        },
        storageConfiguration={
            "type": "OPENSEARCH_SERVERLESS",  # Type of the storage (using OpenSearch Serverless)
            "opensearchServerlessConfiguration": opensearch_serverless_configuration  # OpenSearch configuration
        }
    )
    
    # Return the created knowledge base details
    return create_kb_response["knowledgeBase"]

In [None]:
import boto3  # Import boto3 to interact with AWS services

# Create a knowledge base using the previously defined function
kb = create_knowledge_base_func(
    name="advanced-rag-workshop-custom-chunking",  # Name of the knowledge base
    description="Knowledge base using Amazon OpenSearch Service as a vector store",  # Description of the knowledge base
    chunking_type="custom"  # Custom chunking type for the knowledge base
)

# Get the details of the created knowledge base using the knowledgeBaseId
get_kb_response = bedrock_agent.get_knowledge_base(knowledgeBaseId=kb['knowledgeBaseId'])

# Write the variables and knowledge base ID to a JSON file for future use
with open("variables.json", "w") as f:
    # Save the variables along with the new knowledge base ID to the file
    json.dump({**variables, "kbCustomChunk": kb['knowledgeBaseId']}, f)

# Print the knowledge base details returned by Amazon Bedrock for debugging or logging purposes
print(f'Knowledge base response: {get_kb_response}')

### 2. Create a Data Source for the Knowledge Base

In [None]:
import time  # Import time to manage sleep intervals between actions

# Ingest strategy: Defines how to ingest data from the data source into the OpenSearch knowledge base
customTransformationConfiguration = {
    "intermediateStorage": {
        # Specifies the S3 location where intermediate data will be stored
        "s3Location": {
            "uri": f"s3://{variables['s3Bucket']}-custom-chunk/"  # Using the custom chunk bucket in S3
        }
    },
    "transformations": [
        {
            "transformationFunction": {
                # Specifies the Lambda function to apply for data transformation
                "transformationLambdaConfiguration": {
                    "lambdaArn": f"arn:aws:lambda:{variables['regionName']}:{variables['accountNumber']}:function:advanced-rag-custom-chunk"  # ARN of the Lambda function
                }
            },
            "stepToApply": "POST_CHUNKING"  # Defines when to apply the transformation function (after chunking)
        }
    ]
}

# Defines the S3 data source configuration for ingesting documents into OpenSearch
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{variables['s3Bucket']}",  # ARN for the S3 bucket
    # "inclusionPrefixes": ["shareholder_letters"]  # Optional: specify a prefix for the S3 objects to ingest
}

# Check if 'ds_custom_chunk' is already defined and delete if necessary
if 'ds_custom_chunk' in locals():
    try:
        # If the data source already exists, delete it to avoid conflicts
        bedrock_agent.delete_data_source(
            knowledgeBaseId = kb['knowledgeBaseId'],  # Knowledge base ID
            dataSourceId = ds_custom_chunk["dataSourceId"],  # Data source ID to delete
        )
        time.sleep(15)  # Wait for 15 seconds before proceeding
    except Exception as e:
        print(e)  # Print the error if any
        pass  # Continue without any action if there is an exception

# Create a new data source for the knowledge base
create_ds_response = bedrock_agent.create_data_source(
    name = f'advanced-rag-example',  # Name of the new data source
    description = "A data source for Advanced RAG workshop",  # Description of the data source
    knowledgeBaseId = kb['knowledgeBaseId'],  # Knowledge base ID where the data source is added
    dataSourceConfiguration = {
        "type": "S3",  # Specifies the data source type (S3 in this case)
        "s3Configuration": s3Configuration  # S3 configuration defined earlier
    },
    vectorIngestionConfiguration = {
        # Defines how vector ingestion is handled for this data source
        "chunkingConfiguration": {"chunkingStrategy": "NONE"},  # Disable built-in chunking; the Lambda function does the chunking
        "customTransformationConfiguration": customTransformationConfiguration  # Custom transformation configuration
    }
)

# Store the created data source information
ds_custom_chunk = create_ds_response["dataSource"]

# Return the created data source information
ds_custom_chunk

### 3. Start Ingestion Job for Amazon Bedrock Knowledge base pointing to Amazon OpenSearch

> **Note**: The ingestion process will take approximately 2-3 minutes to complete. During this time, the system is processing your documents by:
> 1. Extracting text from the source files
> 2. Chunking the content according to the defined strategy (Fixed / Semantic / Hierarchical / Custom)
> 3. Generating embeddings for each chunk
> 4. Storing the embeddings and associated metadata in the OpenSearch vector database
>
> The cell prints a message when the job starts. Please wait for the "job completed successfully" message before proceeding to the next step.
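
Waiting for an ingestion job is a polling problem: check the status, pause, and stop on any terminal state rather than only on success. The following is a minimal sketch of that pattern, not part of the workshop code. The helper name `wait_for_ingestion` and its parameters are hypothetical, and the set of terminal statuses (`COMPLETE`, `FAILED`, `STOPPED`) is an assumption about the values the ingestion job can report. The status lookup is passed in as a callable so the helper can be exercised without AWS access.

```python
import time

# ASSUMPTION: statuses after which the ingestion job will not change any more
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def wait_for_ingestion(get_status, poll_seconds=15, timeout_seconds=600, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or the timeout elapses."""
    waited = 0
    status = get_status()
    while status not in TERMINAL_STATUSES:
        if waited >= timeout_seconds:
            raise TimeoutError(f"ingestion still {status} after {waited}s")
        sleep(poll_seconds)  # Pause between polls instead of busy-waiting
        waited += poll_seconds
        status = get_status()
    return status


# Possible usage with the notebook's client (requires AWS access, so left commented out):
# final_status = wait_for_ingestion(
#     lambda: bedrock_agent.get_ingestion_job(
#         knowledgeBaseId=kb["knowledgeBaseId"],
#         dataSourceId=ds_custom_chunk["dataSourceId"],
#         ingestionJobId=job["ingestionJobId"],
#     )["ingestionJob"]["status"]
# )
```

Returning the final status, rather than assuming success, lets the caller decide how to react to a failed or stopped job.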

In [None]:
import time  # Import time to manage delays during job status checking

ingest_jobs = []  # Initialize a list to track ingestion jobs

# Start an ingestion job
try:
    # Start the ingestion job for the data source into the knowledge base
    start_job_response = bedrock_agent.start_ingestion_job(
        knowledgeBaseId = kb['knowledgeBaseId'],  # Knowledge base ID
        dataSourceId = ds_custom_chunk["dataSourceId"]  # Data source ID for ingestion
    )
    
    # Get the job details from the response
    job = start_job_response["ingestionJob"]
    print(f"ingestion job started successfully\n")  # Print message indicating job start

    # Poll the job status until it reaches a terminal state
    while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
        time.sleep(15)  # Wait 15 seconds between status checks

        # Retrieve the current status of the ingestion job
        get_job_response = bedrock_agent.get_ingestion_job(
            knowledgeBaseId = kb['knowledgeBaseId'],  # Knowledge base ID
            dataSourceId = ds_custom_chunk["dataSourceId"],  # Data source ID
            ingestionJobId = job["ingestionJobId"]  # Job ID to track
        )

        # Update the job status from the response
        job = get_job_response["ingestionJob"]

    if job['status'] == 'COMPLETE':
        print("job completed successfully\n")  # Print message indicating successful job completion
    else:
        print(f"job ended with status {job['status']}\n")  # Report a failed or stopped job

except Exception as e:
    # If there is any error, print an error message
    print(f"Couldn't start job.\n")
    print(e)  # Print the exception error message


### 4. Retrieve

In [None]:
import boto3

# Initialize the Bedrock agent runtime client
bedrock_agent_runtime = boto3.client("bedrock-agent-runtime", region_name=variables["regionName"])

# Query for relevant documents
query = "What are the three sub-tasks in question answering over knowledge bases?"  # Define the query to be searched

# Retrieve relevant documents based on the query from the knowledge base
relevant_documents_os = bedrock_agent_runtime.retrieve(
    retrievalQuery={
        'text': query  # Specify the query text to search for relevant documents
    },
    knowledgeBaseId=kb['knowledgeBaseId'],  # Provide the knowledge base ID to search in
    retrievalConfiguration={
        'vectorSearchConfiguration': {
            'numberOfResults': 3  # Limit the results to top 3 documents closely matching the query
        }
    }
)

# Return the relevant documents fetched
relevant_documents_os
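
The raw `retrieve` response is a nested dictionary, which is hard to scan. A small helper can flatten it to one row per retrieved chunk. This is a sketch only: the function name `summarize_results` is hypothetical, and it assumes each entry under `retrievalResults` carries `content.text`, `score`, and `location.s3Location.uri`, falling back to empty values when a field is missing. The sample response in the usage lines is made up for illustration.

```python
def summarize_results(response, max_chars=120):
    """Return (score, source_uri, text_preview) tuples, one per retrieved chunk."""
    rows = []
    for result in response.get("retrievalResults", []):
        text = result.get("content", {}).get("text", "")
        uri = result.get("location", {}).get("s3Location", {}).get("uri")
        # Truncate the chunk text so each row stays readable
        rows.append((result.get("score"), uri, text[:max_chars]))
    return rows


# Usage with a made-up response; in the notebook, pass relevant_documents_os instead
sample_response = {
    "retrievalResults": [
        {
            "content": {"text": "Question answering over knowledge bases involves several sub-tasks."},
            "location": {"type": "S3", "s3Location": {"uri": "s3://example-bucket/doc.pdf"}},
            "score": 0.71,
        }
    ]
}
for score, uri, preview in summarize_results(sample_response, max_chars=40):
    print(score, uri, preview)
```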

> **Note**: After creating the knowledge base, you can explore its details and settings in the Amazon Bedrock console. This gives you a more visual interface to understand how the knowledge base is structured.
> 
> **[➡️ View your Knowledge Bases in the AWS Console](https://us-west-2.console.aws.amazon.com/bedrock/home?region=us-west-2#/knowledge-bases)**
>
> In the console, you can:
> - See all your knowledge bases in one place
> - View ingestion status and statistics
> - Test queries through the built-in chat interface
> - Modify settings and configurations