# Build an application that uses RAG with Amazon Bedrock Knowledge Bases

> *Complete all steps in the us-east-1 (N. Virginia) Region.*
> *This notebook can be used with the **`conda_tensorflow2_p310`** kernel in SageMaker Notebooks.*

---
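Make sure you have requested model access in Amazon Bedrock for every model this notebook uses: Claude 3 Sonnet, Claude Instant, and Amazon Titan Text Embeddings (`amazon.titan-embed-text-v1`).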

After completing the steps, be sure to perform the clean-up steps at the end to avoid unnecessary charges.
Total cost should be a few dollars if everything is terminated afterwards.
Be sure to delete the SageMaker notebook at the end if it is no longer needed.

---

### Install Required Libraries
Install the required libraries: boto3, the AWS SDK for Python, which we use to interact with Bedrock; opensearch-py, the Python client used to interact with OpenSearch; and retrying, which we use to retry knowledge base creation.
You can ignore any errors printed during installation.

In [None]:
# Pin the library versions this notebook was written against (-U replaces any
# other installed version). opensearch-py builds the vector index, boto3 makes
# all AWS API calls, and retrying provides the @retry decorator used when
# creating the knowledge base.
%pip install -U opensearch-py==2.3.1
%pip install -U boto3==1.36.3
%pip install -U retrying==1.3.4

In [None]:
# Restart the kernel so the freshly installed package versions are loaded.
# Injecting `Jupyter.notebook.kernel.restart()` through an HTML <script> only
# works in the classic Notebook UI; JupyterLab (the SageMaker default) never
# runs that script, so the kernel silently kept the old packages. Asking the
# kernel to shut down with restart=True works in both front ends.
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

### Create the S3 Bucket
Import the required libraries and set our variables.
Create an S3 bucket to store our custom data. Be sure to change the bucket name (the `s3_suffix` value): S3 bucket names must be globally unique.

In [None]:
import json
import os
import boto3
from botocore.exceptions import ClientError
import pprint
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss, interactive_sleep
import random
from retrying import retry
import time
import warnings
# Keep notebook output readable by silencing library warnings.
warnings.filterwarnings('ignore')

# Random suffix so repeated runs create uniquely named AWS resources.
suffix = random.randrange(200, 900)

# The session's configured region drives every regional client and ARN.
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name

# AWS service clients used throughout the notebook.
sts_client = boto3.client('sts')
s3_client = boto3.client('s3')
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)

# Service name used when SigV4-signing OpenSearch Serverless requests.
service = 'aoss'

# Bucket holding the knowledge-base source documents.
s3_suffix = f"{region_name}-faye-123"  # replace with your name and some random numbers.
bucket_name = f'bedrock-kb-{s3_suffix}'  # replace it with your bucket name.

pp = pprint.PrettyPrinter(indent=2)

In [None]:
# Create S3 bucket for the knowledge base data source (skipped if it already exists).
try:
    s3_client.head_bucket(Bucket=bucket_name)
    print(f'Bucket {bucket_name} Exists')
except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code')
    # head_bucket reports a missing bucket as 404. Anything else (e.g. 403: the
    # name is owned by another account or access is denied) means creating the
    # bucket cannot succeed, so surface the real error instead of masking it.
    if error_code not in ('404', 'NoSuchBucket', 'NotFound'):
        raise
    print(f'Bucket name:{bucket_name}')
    create_kwargs = {'Bucket': bucket_name}
    # us-east-1 is S3's default location and rejects an explicit
    # LocationConstraint; every other region requires one.
    if region_name and region_name != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region_name}
    s3bucket = s3_client.create_bucket(**create_kwargs)

### Upload Data to the S3 Bucket
Upload the `Human Resources Policy.docx` file to the S3 bucket you just created. You'll find it here:
https://github.com/pluralsight-cloud/amazon-bedrock-building-genai-applications/tree/main/RAG-Demo

### Create the OSS Collection
Create the OpenSearch Serverless collection which is a container for OpenSearch vector indexes.

In [None]:
# OpenSearch Serverless control-plane client.
aoss_client = boto3_session.client('opensearchserverless')

# Collection and vector-index names; the random suffix keeps runs distinct.
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"

# Execution role for the knowledge base, created by the local `utility` helper.
# NOTE(review): presumably grants access to bucket_name - confirm in utility.py.
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
role_details = bedrock_kb_execution_role['Role']
bedrock_kb_execution_role_arn = role_details['Arn']

In [None]:
# Create the encryption, network and data-access policies OpenSearch Serverless
# needs (via the local `utility` helper), then create the vector-search collection.
oss_policies = create_policies_in_oss(
    vector_store_name=vector_store_name,
    aoss_client=aoss_client,
    bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn,
)
encryption_policy, network_policy, access_policy = oss_policies
collection = aoss_client.create_collection(name=vector_store_name, type='VECTORSEARCH')

In [None]:
# Build the collection endpoint host: <collection-id>.<region>.aoss.amazonaws.com
collection_id = collection['createCollectionDetail']['id']
host = '.'.join((collection_id, region_name, 'aoss.amazonaws.com'))
print(host)

In [None]:
# Wait for collection creation to complete
# This can take a few minutes
response = aoss_client.batch_get_collection(names=[vector_store_name])
# Periodically check collection status until it leaves CREATING
while response['collectionDetails'][0]['status'] == 'CREATING':
    print('Creating collection...')
    interactive_sleep(30)
    response = aoss_client.batch_get_collection(names=[vector_store_name])

# The loop also exits on FAILED; don't report that as success or go on to
# build an index against a collection that never became usable.
collection_status = response['collectionDetails'][0]['status']
if collection_status != 'ACTIVE':
    pp.pprint(response["collectionDetails"])
    raise RuntimeError(f'Collection {vector_store_name} ended in status {collection_status}')

print('\nCollection successfully created:')
pp.pprint(response["collectionDetails"])

In [None]:
# create opensearch serverless access policy and attach it to Bedrock execution role
try:
    create_oss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                    bedrock_kb_execution_role=bedrock_kb_execution_role)
except Exception as e:
    # Best-effort: on a re-run the policy typically exists already, but this
    # handler also catches unrelated failures, so show the actual error rather
    # than asserting the policy exists.
    print("Could not create/attach the policy (it may already exist):")
    pp.pprint(e)
else:
    # It can take up to a minute for data access rules to be enforced
    interactive_sleep(60)

### Create the OSS Vector Index
This index will contain the vector embeddings (numerical representations) of our data, so that Bedrock can make sense of our data and understand the meaning it contains.

In [None]:
# Configure the OpenSearch Serverless vector index
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError

# SigV4-sign requests for the 'aoss' service with the same session (and so the
# same credentials) that created the other AWS clients in this notebook.
credentials = boto3_session.get_credentials()
awsauth = AWSV4SignerAuth(credentials, region_name, service)

# index_name is defined once, next to vector_store_name, and reused here; it is
# no longer reassigned with a different naming scheme.

# Index settings and mappings. The field names must match the knowledge base's
# fieldMapping (vector / text / text-metadata), and "dimension" must equal the
# output size of the embedding model (1536 for amazon.titan-embed-text-v1).
body_json = {
    "settings": {
        "index.knn": "true",
        "number_of_shards": 1,
        "knn.algo_param.ef_search": 512,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            "vector": {
                "type": "knn_vector",
                "dimension": 1536,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    "space_type": "l2",
                },
            },
            "text": {"type": "text"},
            "text-metadata": {"type": "text"},
        }
    },
}

# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)

In [None]:
# Create the vector index. OpenSearch signals failures (for example, the index
# already existing) with RequestError, which is reported rather than raised.
try:
    response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
except RequestError as e:
    print(f'Error while trying to create the index, with error {e.error}\n delete, and recreate the index')
else:
    print('\nCreating index:')
    pp.pprint(response)
    # index creation can take up to a minute
    interactive_sleep(60)

### Create The Knowledge Base and Ingest Data 
After uploading your data to the private S3 bucket, create the Knowledge Base.
Configure the Bedrock Knowledge Base to use the OpenSearch Serverless vector index.
The data source will be our S3 bucket.

In [None]:
# --- Vector store: the OpenSearch Serverless collection and index ---
# Field names must line up with the mappings of the vector index.
opensearchServerlessConfiguration = dict(
    collectionArn=collection["createCollectionDetail"]['arn'],
    vectorIndexName=index_name,
    fieldMapping=dict(
        vectorField="vector",
        textField="text",
        metadataField="text-metadata",
    ),
)

# --- Ingestion: split documents into fixed 512-token chunks, 20% overlap ---
chunkingStrategyConfiguration = dict(
    chunkingStrategy="FIXED_SIZE",
    fixedSizeChunkingConfiguration=dict(maxTokens=512, overlapPercentage=20),
)

# --- Data source: the S3 bucket holding the documents ---
s3Configuration = dict(bucketArn=f"arn:aws:s3:::{bucket_name}")

# --- Embedding model used for both ingested documents and live queries ---
embedding_model_id = "amazon.titan-embed-text-v1"
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/{embedding_model_id}"

# --- Knowledge base identity ---
name = f"bedrock-sample-knowledge-base-{suffix}"
description = "My custom data knowledge base."
roleArn = bedrock_kb_execution_role_arn

In [None]:
# Defining the function that will create the Knowledge Base
from retrying import retry


@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=7)
def create_knowledge_base_func():
    """Create the Bedrock knowledge base and return its description dict.

    Any exception triggers a retry: up to 7 attempts with a random 1-2 s pause
    between them (presumably to ride out propagation delay of the newly
    created role and OpenSearch policies).
    """
    kb_config = {
        "type": "VECTOR",
        "vectorKnowledgeBaseConfiguration": {"embeddingModelArn": embeddingModelArn},
    }
    storage_config = {
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": opensearchServerlessConfiguration,
    }
    kb_response = bedrock_agent_client.create_knowledge_base(
        name=name,
        description=description,
        roleArn=roleArn,
        knowledgeBaseConfiguration=kb_config,
        storageConfiguration=storage_config,
    )
    return kb_response["knowledgeBase"]

In [None]:
# Creating the Knowledge Base
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")
    # Re-raise: every later cell needs `kb`; swallowing the error only defers
    # the failure to a confusing NameError further down.
    raise

In [None]:
# Attach the S3 bucket to the knowledge base as its data source, using the
# fixed-size chunking strategy for ingestion.
knowledge_base_id = kb['knowledgeBaseId']
create_ds_response = bedrock_agent_client.create_data_source(
    name=name,
    description=description,
    knowledgeBaseId=knowledge_base_id,
    dataSourceConfiguration={"type": "S3", "s3Configuration": s3Configuration},
    vectorIngestionConfiguration={"chunkingConfiguration": chunkingStrategyConfiguration},
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)

In [None]:
# Kick off an ingestion job that reads the S3 data source into the knowledge base
start_job_response = bedrock_agent_client.start_ingestion_job(
    knowledgeBaseId=kb['knowledgeBaseId'],
    dataSourceId=ds["dataSourceId"],
)

In [None]:
# Show the ingestion job returned by start_ingestion_job; `job` is also the
# starting state for the monitoring loop.
pp.pprint(job := start_job_response["ingestionJob"])

In [None]:
# Monitor the progress of the ingestion job.
# Previously the sleep sat outside the loop (busy-polling the API) and only
# COMPLETE ended it, so a FAILED or STOPPED job looped forever.
terminal_statuses = {'COMPLETE', 'FAILED', 'STOPPED'}
while job['status'] not in terminal_statuses:
    interactive_sleep(40)
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb['knowledgeBaseId'],
        dataSourceId=ds["dataSourceId"],
        ingestionJobId=job["ingestionJobId"],
    )
    job = get_job_response["ingestionJob"]
pp.pprint(job)

if job['status'] != 'COMPLETE':
    raise RuntimeError(f"Ingestion job {job['ingestionJobId']} ended with status {job['status']}")

In [None]:
# Print the knowledge base Id to use for invocation later
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)
# store the kb_id to use later in the invoke request
# (%store is IPython's storemagic; it persists kb_id so another notebook or a
# restarted kernel can reload it with `%store -r kb_id`)
%store kb_id

### Test the knowledge Base Using RetrieveAndGenerate API
RetrieveAndGenerate is a Bedrock API call that queries a knowledge base and generates a response based on the retrieved data and using the specified model.

In [None]:
# Runtime client for RetrieveAndGenerate calls against the knowledge base
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime", region_name=region_name)

# [display name, Bedrock model ID] pairs; each query is answered by both models
# so their behaviour can be compared.
# NOTE(review): anthropic.claude-instant-v1 is a legacy model - confirm it is
# still available for RetrieveAndGenerate in your region.
claude_model_ids = [
    ["Claude 3 Sonnet", "anthropic.claude-3-sonnet-20240229-v1:0"],
    ["Claude Instant", "anthropic.claude-instant-v1"],
]

In [None]:
# Define our query function, using the RetrieveAndGenerate API call
def ask_bedrock_llm_with_knowledge_base(query: str, model_arn: str, kb_id: str) -> dict:
    """Retrieve context from a knowledge base and generate an answer with a model.

    Args:
        query: Natural-language question sent as the input text.
        model_arn: ARN of the Bedrock foundation model that writes the answer.
        kb_id: ID of the knowledge base to retrieve context from.

    Returns:
        The full RetrieveAndGenerate response dict, not just the answer text
        (the previous ``-> str`` annotation was wrong). Callers read
        ``response['output']['text']`` for the answer and
        ``response['citations']`` for the retrieved source passages.
    """
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={
            'text': query
        },
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                'modelArn': model_arn
            }
        },
    )

    return response

In [None]:
# Run a query - the model should be able to find an answer in the knowledge base
query = "What is the parental leave policy at Bob's Pizza?"

for model_id in claude_model_ids:
    display_name, bedrock_model_id = model_id[0], model_id[1]
    model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{bedrock_model_id}'
    response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
    generated_text = response['output']['text']
    citations = response["citations"]
    # Flatten the text of every retrieved passage across all citations, in order.
    contexts = [
        reference["content"]["text"]
        for citation in citations
        for reference in citation["retrievedReferences"]
    ]
    print(f"---------- Generated using {display_name}:")
    pp.pprint(generated_text)
    print(f'---------- The citations for the response generated by {display_name}:')
    pp.pprint(contexts)
    print()

In [None]:
# Test using a query it should not be able to answer. The model should return that it does not know.
# You can try alternative queries to check for hallucinations
query = "What is the sick leave policy at Bob's Pizza?"

for model_id in claude_model_ids:
    display_name, bedrock_model_id = model_id[0], model_id[1]
    model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{bedrock_model_id}'
    response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
    generated_text = response['output']['text']
    citations = response["citations"]
    # Whatever passages were retrieved, even if they don't answer the question.
    contexts = [
        reference["content"]["text"]
        for citation in citations
        for reference in citation["retrievedReferences"]
    ]
    print(f"---------- Generated using {display_name}:")
    pp.pprint(generated_text)
    print(f'---------- The citations for the response generated by {display_name}:')
    pp.pprint(contexts)
    print()

### Run The Following To Clean Up Knowledge Base, OSS, S3 and IAM

In [None]:
# Recreate the bedrock-agent client used by the clean-up calls that follow
bedrock_agent_client = boto3_session.client(service_name='bedrock-agent', region_name=region_name)

In [None]:
# clean up Knowledge Base: Bedrock resources first, then the vector store,
# then the OpenSearch Serverless policies that protected it.
knowledge_base_id = kb['knowledgeBaseId']
bedrock_agent_client.delete_data_source(dataSourceId=ds["dataSourceId"], knowledgeBaseId=knowledge_base_id)
bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)

oss_client.indices.delete(index=index_name)
aoss_client.delete_collection(id=collection_id)

aoss_client.delete_access_policy(type="data", name=access_policy['accessPolicyDetail']['name'])
for policy_type, policy in (("network", network_policy), ("encryption", encryption_policy)):
    aoss_client.delete_security_policy(type=policy_type, name=policy['securityPolicyDetail']['name'])

In [None]:
# clean up IAM
# delete_iam_role_and_policies comes from the local `utility` module and takes
# no arguments. NOTE(review): presumably it removes the execution role and the
# policies created by create_bedrock_execution_role /
# create_oss_policy_attach_bedrock_execution_role - verify against utility.py.
from utility import delete_iam_role_and_policies
delete_iam_role_and_policies()

In [None]:
# clean up S3: a bucket must be empty before it can be deleted.
# A single list_objects call returns at most 1000 keys, which left larger
# buckets non-empty and made delete_bucket fail; page through every object.
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
    keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if keys:
        # delete_objects accepts up to 1000 keys, matching the maximum page size.
        s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': keys, 'Quiet': True})
s3_client.delete_bucket(Bucket=bucket_name)

### If you no longer need the SageMaker notebook, remember to delete it!