In [1]:
# %pip install -U opensearch-py==2.3.1
# %pip install -U boto3==1.34.87 
# %pip install -U retrying==1.3.4

In [2]:
import json
import os
import boto3
import pprint
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss
import random
from retrying import retry
# Resource-name suffix, randomized so repeated runs are less likely to clash.
suffix = random.randrange(200, 1900)

# Shared boto3 session; its region is reused when building clients and ARNs below.
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)

service = 'aoss'  # service name used when SigV4-signing OpenSearch Serverless requests
bucket_name = "test-kb1" # replace it with your bucket name.
pp = pprint.PrettyPrinter(indent=2)

In [3]:
import boto3
import time
# Names for the OpenSearch Serverless collection and its vector index.
# NOTE: `index_name` is reassigned in a later cell before the index is actually created.
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"
aoss_client = boto3_session.client('opensearchserverless')

# Execution role for the knowledge base (helper from the local `utility` module);
# its ARN feeds the OSS policies and the knowledge-base definition.
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

In [4]:
# Create the encryption (security), network and data-access policies in OSS,
# then create the VECTORSEARCH collection they apply to.
encryption_policy, network_policy, access_policy = create_policies_in_oss(
    vector_store_name=vector_store_name,
    aoss_client=aoss_client,
    bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn,
)
collection = aoss_client.create_collection(
    name=vector_store_name,
    type='VECTORSEARCH',
)

In [5]:
pp.pprint(collection)


def _collection_status(collection_name):
    """Return the current status string of the OpenSearch Serverless collection `collection_name`."""
    details = aoss_client.batch_get_collection(names=[collection_name])['collectionDetails']
    return details[0]['status']


# Poll until the collection leaves CREATING instead of sleeping a fixed 10 s:
# a fixed sleep races collection provisioning, and the index-creation cell
# below needs a usable endpoint.
collection_status = _collection_status(vector_store_name)
while collection_status == 'CREATING':
    time.sleep(10)
    collection_status = _collection_status(vector_store_name)
if collection_status != 'ACTIVE':
    raise RuntimeError(f"Collection {vector_store_name!r} ended in status {collection_status!r}")

{ 'ResponseMetadata': { 'HTTPHeaders': { 'connection': 'keep-alive',
                                         'content-length': '314',
                                         'content-type': 'application/x-amz-json-1.0',
                                         'date': 'Fri, 19 Apr 2024 01:48:29 '
                                                 'GMT',
                                         'x-amzn-requestid': '86842a68-b4ec-4efd-abaf-0e32a2bf9fec'},
                        'HTTPStatusCode': 200,
                        'RequestId': '86842a68-b4ec-4efd-abaf-0e32a2bf9fec',
                        'RetryAttempts': 0},
  'createCollectionDetail': { 'arn': 'arn:aws:aoss:us-east-1:730335477319:collection/d6lhkzcpc5z2qh1xevoe',
                              'createdDate': 1713491309261,
                              'id': 'd6lhkzcpc5z2qh1xevoe',
                              'kmsKeyArn': 'auto',
                              'lastModifiedDate': 1713491309261,
                             

In [6]:
# OpenSearch Serverless endpoint host: <collection-id>.<region>.aoss.amazonaws.com
collection_id = collection['createCollectionDetail']['id']
host = '.'.join((collection_id, region_name, 'aoss.amazonaws.com'))
print(host)

d6lhkzcpc5z2qh1xevoe.us-east-1.aoss.amazonaws.com


In [7]:
# Create the OSS policy for this collection and attach it to the Bedrock execution role.
create_oss_policy_attach_bedrock_execution_role(
    collection_id=collection_id,
    bedrock_kb_execution_role=bedrock_kb_execution_role,
)

Opensearch serverless arn:  arn:aws:iam::730335477319:policy/AmazonBedrockOSSPolicyForKnowledgeBase_650


In [8]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
# Sign OpenSearch requests with the same boto3 session used by every other client.
credentials = boto3_session.get_credentials()
# `auth` is kept as an alias of `awsauth`, as before, in case later cells use it.
awsauth = auth = AWSV4SignerAuth(credentials, region_name, service)

# NOTE: this replaces the `index_name` assigned in an earlier cell
# (f"bedrock-sample-rag-index-{suffix}"); this value is the one used to create
# the index and to configure the knowledge base.
index_name = f"bedrock-sample-index-{suffix}"

# k-NN index mapping. Field names must match the knowledge base fieldMapping
# (vector / text / text-metadata).
body_json = {
    "settings": {
        "index.knn": "true"
    },
    "mappings": {
        "properties": {
            "vector": {
                "type": "knn_vector",
                # Must equal the embedding model's output size
                # (amazon.titan-embed-text-v1 produces 1536-dim vectors).
                "dimension": 1536,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    "space_type": "innerproduct",
                    "parameters": {
                        "ef_construction": 512,
                        "m": 16
                    },
                },
            },
            "text": {
                "type": "text"
            },
            "text-metadata": {
                "type": "text"
            }
        }
    }
}

# Build the OpenSearch client (SigV4-signed HTTPS requests to the collection endpoint).
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# It can take up to a minute for data access rules to be enforced.
time.sleep(60)

In [9]:
# Create the k-NN vector index from the mapping in body_json.
response = oss_client.indices.create(
    index=index_name,
    body=json.dumps(body_json),
)
print('\nCreating index:')
print(response)


Creating index:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'bedrock-sample-index-497'}


In [10]:
# Vector store for the KB: the OpenSearch Serverless collection/index created above,
# with the same field names as the index mapping.
opensearchServerlessConfiguration = {
    "collectionArn": collection["createCollectionDetail"]['arn'],
    "vectorIndexName": index_name,
    "fieldMapping": {
        "vectorField": "vector",
        "textField": "text",
        "metadataField": "text-metadata",
    },
}

# Split documents into fixed-size chunks of at most 512 tokens with 20% overlap.
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {"maxTokens": 512, "overlapPercentage": 20},
}

# S3 bucket holding the source documents.
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    # "inclusionPrefixes": ["<prefix>/"],  # optional: only ingest objects under these S3 key prefixes
}

embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

# Knowledge-base identity and the IAM role it runs as.
name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Multi file Knowledge base."
roleArn = bedrock_kb_execution_role_arn

In [11]:
# Attach Bedrock full access to the KB execution role.

# IAM client from the same session as the other clients in this notebook.
iam = boto3_session.client('iam')
# The role name is the last path segment of the role ARN.
role_name = roleArn.split('/')[-1]
# AWS-managed policy.
# NOTE(review): AmazonBedrockFullAccess is very broad for a KB execution role;
# consider a least-privilege policy scoped to the embedding model instead.
policy_arn = 'arn:aws:iam::aws:policy/AmazonBedrockFullAccess'
# Attach the policy to the role; the API response is the cell's displayed output.
iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

{'ResponseMetadata': {'RequestId': '95c3fca4-6031-4e86-b8e0-b5d68f8c61ba',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 19 Apr 2024 01:49:40 GMT',
   'x-amzn-requestid': '95c3fca4-6031-4e86-b8e0-b5d68f8c61ba',
   'content-type': 'text/xml',
   'content-length': '212'},
  'RetryAttempts': 0}}

In [12]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=7)
def create_knowledge_base_func():
    """Create the Bedrock knowledge base and return its description.

    The call is attempted up to 7 times, waiting a random 1-2 s between
    attempts, on any exception.
    NOTE(review): presumably the retries absorb IAM/OSS policy propagation
    delay right after the role and policies are created — confirm.

    Returns:
        The ``knowledgeBase`` dict from the ``create_knowledge_base`` response.
    """
    kb_config = {
        "type": "VECTOR",
        "vectorKnowledgeBaseConfiguration": {"embeddingModelArn": embeddingModelArn},
    }
    storage_config = {
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": opensearchServerlessConfiguration,
    }
    api_response = bedrock_agent_client.create_knowledge_base(
        name=name,
        description=description,
        roleArn=roleArn,
        knowledgeBaseConfiguration=kb_config,
        storageConfiguration=storage_config,
    )
    return api_response["knowledgeBase"]

In [13]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")
    # Re-raise: every later cell needs `kb`, so continuing would only surface
    # a confusing NameError further down instead of the real cause.
    raise

In [14]:
# Show the knowledge-base record returned by create_knowledge_base.
pp.pprint(kb)

{ 'createdAt': datetime.datetime(2024, 4, 19, 1, 49, 42, 542095, tzinfo=tzlocal()),
  'description': 'Multi file Knowledge base.',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-east-1:730335477319:knowledge-base/K6W4UFQTNU',
  'knowledgeBaseConfiguration': { 'type': 'VECTOR',
                                  'vectorKnowledgeBaseConfiguration': { 'embeddingModelArn': 'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'}},
  'knowledgeBaseId': 'K6W4UFQTNU',
  'name': 'bedrock-sample-knowledge-base-497',
  'roleArn': 'arn:aws:iam::730335477319:role/AmazonBedrockExecutionRoleForKnowledgeBase_650',
  'status': 'CREATING',
  'storageConfiguration': { 'opensearchServerlessConfiguration': { 'collectionArn': 'arn:aws:aoss:us-east-1:730335477319:collection/d6lhkzcpc5z2qh1xevoe',
                                                                   'fieldMapping': { 'metadataField': 'text-metadata',
                                                                                    

In [15]:
# Get the KnowledgeBase and wait for it to finish creating: it is returned in
# CREATING state, and a data source is attached to it in the next step.
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb['knowledgeBaseId'])
while get_kb_response['knowledgeBase']['status'] == 'CREATING':
    time.sleep(5)
    get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb['knowledgeBaseId'])
kb_status = get_kb_response['knowledgeBase']['status']
if kb_status != 'ACTIVE':
    raise RuntimeError(f"Knowledge base {kb['knowledgeBaseId']} ended in status {kb_status!r}")

In [16]:
# Attach the S3 bucket to the knowledge base as a data source, chunked
# according to chunkingStrategyConfiguration.
create_ds_response = bedrock_agent_client.create_data_source(
    knowledgeBaseId=kb['knowledgeBaseId'],
    name=name,
    description=description,
    dataSourceConfiguration={"type": "S3", "s3Configuration": s3Configuration},
    vectorIngestionConfiguration={"chunkingConfiguration": chunkingStrategyConfiguration},
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)

{ 'createdAt': datetime.datetime(2024, 4, 19, 1, 49, 49, 288557, tzinfo=tzlocal()),
  'dataSourceConfiguration': { 's3Configuration': { 'bucketArn': 'arn:aws:s3:::test-kb1'},
                               'type': 'S3'},
  'dataSourceId': '0T3LXJI3U2',
  'description': 'Multi file Knowledge base.',
  'knowledgeBaseId': 'K6W4UFQTNU',
  'name': 'bedrock-sample-knowledge-base-497',
  'status': 'AVAILABLE',
  'updatedAt': datetime.datetime(2024, 4, 19, 1, 49, 49, 288557, tzinfo=tzlocal()),
  'vectorIngestionConfiguration': { 'chunkingConfiguration': { 'chunkingStrategy': 'FIXED_SIZE',
                                                               'fixedSizeChunkingConfiguration': { 'maxTokens': 512,
                                                                                                   'overlapPercentage': 20}}}}


In [17]:
# Fetch the data source (the response is not kept), then pause before starting ingestion.
bedrock_agent_client.get_data_source(
    knowledgeBaseId=kb['knowledgeBaseId'],
    dataSourceId=ds["dataSourceId"],
)
time.sleep(30)

In [18]:
# Start an ingestion job that syncs the S3 data source into the knowledge base.
start_job_response = bedrock_agent_client.start_ingestion_job(
    knowledgeBaseId=kb['knowledgeBaseId'],
    dataSourceId=ds["dataSourceId"],
)

In [19]:
# Unwrap the ingestion-job record (ids, status, statistics) and show it.
job = start_job_response["ingestionJob"]
pp.pprint(job)

{ 'dataSourceId': '0T3LXJI3U2',
  'ingestionJobId': 'LL1YQ0TLCA',
  'knowledgeBaseId': 'K6W4UFQTNU',
  'startedAt': datetime.datetime(2024, 4, 19, 1, 50, 19, 966735, tzinfo=tzlocal()),
  'statistics': { 'numberOfDocumentsDeleted': 0,
                  'numberOfDocumentsFailed': 0,
                  'numberOfDocumentsScanned': 0,
                  'numberOfMetadataDocumentsModified': 0,
                  'numberOfMetadataDocumentsScanned': 0,
                  'numberOfModifiedDocumentsIndexed': 0,
                  'numberOfNewDocumentsIndexed': 0},
  'status': 'STARTING',
  'updatedAt': datetime.datetime(2024, 4, 19, 1, 50, 19, 966735, tzinfo=tzlocal())}


In [20]:
# Poll the ingestion job until it reaches a terminal status.
# Sleep between polls so the API is not called in a tight loop, and also stop on
# FAILED/STOPPED: waiting only for COMPLETE would spin forever on a failed job.
TERMINAL_JOB_STATUSES = {'COMPLETE', 'FAILED', 'STOPPED'}
while job['status'] not in TERMINAL_JOB_STATUSES:
    time.sleep(10)
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=kb['knowledgeBaseId'],
        dataSourceId=ds["dataSourceId"],
        ingestionJobId=job["ingestionJobId"],
    )
    job = get_job_response["ingestionJob"]
pp.pprint(job)

if job['status'] != 'COMPLETE':
    raise RuntimeError(
        f"Ingestion job {job['ingestionJobId']} ended with status {job['status']!r}: "
        f"{job.get('failureReasons')}"
    )

# A COMPLETE job can still have documents that failed to ingest; make that visible.
job_stats = job.get('statistics', {})
if job_stats.get('numberOfDocumentsFailed'):
    print(f"Warning: {job_stats['numberOfDocumentsFailed']} of "
          f"{job_stats.get('numberOfDocumentsScanned')} documents failed to ingest.")

# NOTE(review): presumably gives the index time to reflect the new documents before querying — confirm.
time.sleep(40)

{ 'dataSourceId': '0T3LXJI3U2',
  'ingestionJobId': 'LL1YQ0TLCA',
  'knowledgeBaseId': 'K6W4UFQTNU',
  'startedAt': datetime.datetime(2024, 4, 19, 1, 50, 19, 966735, tzinfo=tzlocal()),
  'statistics': { 'numberOfDocumentsDeleted': 0,
                  'numberOfDocumentsFailed': 1718,
                  'numberOfDocumentsScanned': 2000,
                  'numberOfMetadataDocumentsModified': 0,
                  'numberOfMetadataDocumentsScanned': 2000,
                  'numberOfModifiedDocumentsIndexed': 0,
                  'numberOfNewDocumentsIndexed': 282},
  'status': 'COMPLETE',
  'updatedAt': datetime.datetime(2024, 4, 19, 1, 51, 29, 484909, tzinfo=tzlocal())}


In [21]:
# Keep the knowledge-base id in its own variable for the query cells and %store.
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)

'K6W4UFQTNU'


In [22]:
# Persist kb_id in IPython's store so another notebook can restore it with `%store -r kb_id`.
%store kb_id

Stored 'kb_id' (str)


In [23]:
# Try out the KB using the RetrieveAndGenerate API.
bedrock_agent_runtime_client = boto3_session.client("bedrock-agent-runtime", region_name=region_name)
model_id = "anthropic.claude-3-haiku-20240307-v1:0"
# Build the model ARN from the session's region (it was hard-coded to us-east-1,
# which breaks when the notebook runs in any other region).
model_arn = f'arn:aws:bedrock:{region_name}::foundation-model/{model_id}'

In [24]:
query = "Tell me a breakfast pizza recipe that I can make under 30 minutes"

# Retrieve matching chunks from the KB and have the model answer from them.
response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={'text': query},
    retrieveAndGenerateConfiguration={
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {'knowledgeBaseId': kb_id, 'modelArn': model_arn},
    },
)

# The generated answer is under output.text.
generated_text = response['output']['text']
pp.pprint(generated_text)

('Based on the search results, there is a quick and easy breakfast pizza '
 'recipe that can be made in under 30 minutes. The recipe involves adding '
 'bread, cheese, and fruit to create a complete meal. The search results '
 'indicate this recipe is "really easy to make on a busy day" and "literally '
 'takes 5 minutes to make".')


In [25]:
# Print the source attribution/citations from the original documents to check that
# the generated response is grounded in the retrieved context.
citations = response["citations"]
contexts = [
    reference["content"]["text"]
    for citation in citations
    for reference in citation["retrievedReferences"]
]

pp.pprint(contexts)

[ 'really easy to make on a busy day.',
  'this literally takes 5 minutes to make and is a perfect lunch when you '
  'don’t have time or don’t feel like making anything. adding some bread, '
  'cheese and fruit will make a complete meal! for more healthy gluten-free, '
  'low-gi, pesco-vegetarian recipes, please visit '
  'www.innerharmonynutrition.com.']


In [26]:
# Retrieve API: fetch only the relevant context, without generating an answer.
relevant_documents = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id,
    retrievalQuery={'text': query},
    retrievalConfiguration={
        # Return the 3 results that match the query most closely.
        'vectorSearchConfiguration': {'numberOfResults': 3},
    },
)

In [27]:
# Show each retrieved chunk with its S3 location, metadata and relevance score.
pp.pprint(relevant_documents["retrievalResults"])

[ { 'content': { 'text': 'a quick, easy breakfast that taste good and is '
                         'healthy too!'},
    'location': {'s3Location': {'uri': 's3://test-kb1/1864.txt'}, 'type': 'S3'},
    'metadata': { 'minutes': '5',
                  'n_ingredients': '3',
                  'name': 'a breakfast yogurt parfait  granola',
                  'steps': "['place 1 / 3 cup of granola in a parfait glass', "
                           "'top with half of the yogurt', 'repeat layers', "
                           "'top with remaining granola']",
                  'x-amz-bedrock-kb-source-uri': 's3://test-kb1/1864.txt'},
    'score': 1.5535007},
  { 'content': {'text': 'a quick, easy bread to make.'},
    'location': {'s3Location': {'uri': 's3://test-kb1/769.txt'}, 'type': 'S3'},
    'metadata': { 'minutes': '55',
                  'n_ingredients': '3',
                  'name': '1 2 3 bread',
                  'steps': "['bake 50 minutes at 400 degrees']",
                  'x-amz-b