In [1]:
%pip install -U opensearch-py
%pip install -U boto3
%pip install -U retrying

Collecting opensearch-py
  Using cached opensearch_py-2.6.0-py2.py3-none-any.whl.metadata (7.0 kB)
Collecting Events (from opensearch-py)
  Using cached Events-0.5-py3-none-any.whl.metadata (3.9 kB)
Using cached opensearch_py-2.6.0-py2.py3-none-any.whl (311 kB)
Using cached Events-0.5-py3-none-any.whl (6.8 kB)
Installing collected packages: Events, opensearch-py
Successfully installed Events-0.5 opensearch-py-2.6.0
Note: you may need to restart the kernel to use updated packages.
Collecting boto3
  Downloading boto3-1.34.115-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.35.0,>=1.34.115 (from boto3)
  Downloading botocore-1.34.115-py3-none-any.whl.metadata (5.7 kB)
Downloading boto3-1.34.115-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m18.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.34.115-py3-none-any.whl (12.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m 

In [1]:
import warnings
# Suppress every Python warning for the rest of the kernel session.
# NOTE(review): this also hides deprecation warnings from the libraries used below — confirm that is intended.
warnings.filterwarnings('ignore')

In [2]:
import json
import os
import boto3
from botocore.exceptions import ClientError
import pprint
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss, interactive_sleep
import random
from retrying import retry
# Random numeric suffix (200-899) appended to the resource names created below;
# presumably it keeps names from separate notebook runs distinct.
suffix = random.randrange(200, 900)

# One shared session so that every client uses the same credentials and region.
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
if not region_name:
    # Without a region the bucket name and the OpenSearch host built later
    # would silently contain the text "None".
    raise ValueError(
        "No AWS region is configured for the default boto3 session; "
        "set AWS_DEFAULT_REGION or configure a default region before running this notebook."
    )

sts_client = boto3_session.client('sts')
s3_client = boto3_session.client('s3')
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)

# Service name used when SigV4-signing requests to the OpenSearch Serverless collection.
service = 'aoss'

account_id = sts_client.get_caller_identity()["Account"]
s3_suffix = f"{region_name}-{account_id}"
bucket_name = f'bedrock-kb-{s3_suffix}' # replace it with your bucket name.

# Shared pretty-printer for API responses.
pp = pprint.PrettyPrinter(indent=2)

In [3]:
# Display the S3 bucket name that the following cells check for and use as the data source.
print(bucket_name)

bedrock-kb-us-east-1-298241842438


In [4]:
# Check if bucket exists, and if not create S3 bucket for knowledge base data source
try:
    s3_client.head_bucket(Bucket=bucket_name)
    print(f'Bucket {bucket_name} Exists')
except ClientError as e:
    # Only a "not found" answer means the bucket should be created. Any other
    # error (for example 403 when the name belongs to another account) cannot
    # be fixed by create_bucket, so surface it instead of masking it.
    error_code = e.response.get('Error', {}).get('Code')
    if error_code not in ('404', 'NoSuchBucket', 'NotFound'):
        raise
    print(f'Creating bucket {bucket_name}')
    if region_name in (None, 'us-east-1'):
        # us-east-1 is the default location and must not be passed as a LocationConstraint.
        s3bucket = s3_client.create_bucket(Bucket=bucket_name)
    else:
        # Every other region has to be named explicitly or the request is rejected.
        s3bucket = s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region_name},
        )

Bucket bedrock-kb-us-east-1-298241842438 Exists


In [5]:
# Create vector store

import boto3
import time

# Control-plane client for OpenSearch Serverless (collections and policies).
aoss_client = boto3_session.client('opensearchserverless')

# Resource names carry the run suffix.
vector_store_name = f'bedrock-sample-rag-{suffix}'
# NOTE(review): index_name is assigned again, with a different prefix, in the
# index-mapping cell before it is first used.
index_name = f"bedrock-sample-rag-index-{suffix}"

# Execution role for the knowledge base; its ARN is passed to the policies below
# and later to the knowledge base itself.
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

# create security, network and data access policies within OSS
encryption_policy, network_policy, access_policy = create_policies_in_oss(
    vector_store_name=vector_store_name,
    aoss_client=aoss_client,
    bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn,
)

# Request the vector-search collection; the response is kept for its id and ARN.
collection = aoss_client.create_collection(
    name=vector_store_name,
    type='VECTORSEARCH',
)

pp.pprint(collection)

{ 'ResponseMetadata': { 'HTTPHeaders': { 'connection': 'keep-alive',
                                         'content-length': '314',
                                         'content-type': 'application/x-amz-json-1.0',
                                         'date': 'Thu, 30 May 2024 17:27:08 '
                                                 'GMT',
                                         'x-amzn-requestid': '37010073-11d1-4958-b596-9ae0065e900a'},
                        'HTTPStatusCode': 200,
                        'RequestId': '37010073-11d1-4958-b596-9ae0065e900a',
                        'RetryAttempts': 0},
  'createCollectionDetail': { 'arn': 'arn:aws:aoss:us-east-1:298241842438:collection/2apv8rhxaz8wpegt2r3d',
                              'createdDate': 1717090028524,
                              'id': '2apv8rhxaz8wpegt2r3d',
                              'kmsKeyArn': 'auto',
                              'lastModifiedDate': 1717090028524,
                             

In [6]:
# Get the OpenSearch serverless collection URL
# The host is "<collection id>.<region>.aoss.amazonaws.com".
collection_id = collection['createCollectionDetail']['id']
host = '.'.join([collection_id, region_name, 'aoss.amazonaws.com'])
print(host)

2apv8rhxaz8wpegt2r3d.us-east-1.aoss.amazonaws.com


In [7]:
# wait for collection creation
# This can take couple of minutes to finish
response = aoss_client.batch_get_collection(names=[vector_store_name])
# Periodically check collection status
while response['collectionDetails'][0]['status'] == 'CREATING':
    print('Creating collection...')
    interactive_sleep(30)
    response = aoss_client.batch_get_collection(names=[vector_store_name])

# Leaving the CREATING state does not imply success: stop here unless the
# collection is ACTIVE, so that later cells do not run against a failed collection.
collection_status = response['collectionDetails'][0]['status']
if collection_status != 'ACTIVE':
    pp.pprint(response["collectionDetails"])
    raise RuntimeError(
        f"Collection {vector_store_name} ended in status {collection_status} instead of ACTIVE"
    )

print('\nCollection successfully created:')
pp.pprint(response["collectionDetails"])

Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................
Creating collection...
Done!.........................

Collection successfully created:
[ { 'arn': 'arn:aws:aoss:us-east-1:298241842438:collection/2apv8rhxaz8wpegt2r3d',
    'collectionEndpoint': 'https://2apv8rhxaz8wpegt2r3d.us-east-1.aoss.amazonaws.com',
    'createdDate': 1717090028524,
    'dashboardEndpoint': 'https://2apv8rhxaz8wpegt2r3d.us-east-1.aoss.amazonaws.com/_dashboards',
    'id': '2apv8rhxaz8wpegt2r3d',
    'kmsKeyArn': 'auto',
    'lastModifiedDate': 1717090326940,
    'name': 'bedrock-sample

In [8]:
# create opensearch serverless access policy and attach it to Bedrock execution role
try:
    create_oss_policy_attach_bedrock_execution_role(
        collection_id=collection_id,
        bedrock_kb_execution_role=bedrock_kb_execution_role,
    )
    # It can take up to a minute for data access rules to be enforced
    interactive_sleep(60)
except Exception as policy_error:
    # NOTE(review): every exception is reported as "already exists"; the actual
    # cause is only visible in the exception printed right after the message.
    print("Policy already exists")
    pp.pprint(policy_error)

Opensearch serverless arn:  arn:aws:iam::298241842438:policy/AmazonBedrockOSSPolicyForKnowledgeBase_257
Done!.......................................................


In [9]:
# Create the vector index in Opensearch serverless, with the knn_vector field index mapping, specifying the dimension size, name and engine.
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError

# Sign requests for the 'aoss' service with credentials from a fresh default session.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region_name, service)
awsauth = auth

# Name of the vector index; this is the value used by every later cell.
index_name = f"bedrock-sample-index-{suffix}"

# Index definition: k-NN enabled, one vector field plus two text fields.
body_json = {
    "settings": {
        "index.knn": "true",
        "number_of_shards": 1,
        "knn.algo_param.ef_search": 512,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            "vector": {
                "type": "knn_vector",
                # presumably the output size of the embedding model configured later — confirm
                "dimension": 1536,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    "space_type": "l2",
                },
            },
            "text": {"type": "text"},
            "text-metadata": {"type": "text"},
        },
    },
}

# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    connection_class=RequestsHttpConnection,
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    timeout=300,
)

In [10]:
# Create index
try:
    # The index definition is sent as a JSON string.
    index_body = json.dumps(body_json)
    response = oss_client.indices.create(index=index_name, body=index_body)
    print('\nCreating index:')
    pp.pprint(response)

    # index creation can take up to a minute
    interactive_sleep(60)
except RequestError as e:
    # A rejected request is reported, not re-raised, so the notebook keeps running.
    # you can delete the index if its already exists
    # oss_client.indices.delete(index=index_name)
    print(f'Error while trying to create the index, with error {e.error}\nyou may unmark the delete above to delete, and recreate the index')


Creating index:
{ 'acknowledged': True,
  'index': 'bedrock-sample-index-218',
  'shards_acknowledged': True}
Done!.......................................................


In [11]:
#Upload files manually
# No upload code is present in this cell; the documents to ingest are expected to be
# placed in the bucket outside the notebook.
# NOTE(review): this re-creates the S3 client already built in the setup cell, and it is
# not referenced again in the visible cells.

s3_client = boto3.client("s3")

In [12]:
# Vector store settings: the collection and index to use, and how the index
# fields map to vector / text / metadata.
opensearchServerlessConfiguration = {
    "collectionArn": collection["createCollectionDetail"]["arn"],
    "vectorIndexName": index_name,
    "fieldMapping": {
        "vectorField": "vector",
        "textField": "text",
        "metadataField": "text-metadata",
    },
}

# Ingest strategy - How to ingest data from the data source
# NOTE(review): this dict is not passed to create_data_source in the visible cells.
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 256,
        "overlapPercentage": 20,
    },
}

# The data source to ingest documents from, into the OpenSearch serverless knowledge base index
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    # "inclusionPrefixes":["*.*"] # you can use this if you want to create a KB using data within s3 prefixes.
}

# The embedding model used by Bedrock to embed ingested documents, and realtime prompts
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

# Name, description and role shared by the knowledge base and data source calls below.
name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Knowledge base de productos de ferreteria"
roleArn = bedrock_kb_execution_role_arn

In [13]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    """Create the Bedrock knowledge base from the notebook-level configuration.

    Reads the module-level ``name``, ``description``, ``roleArn``,
    ``embeddingModelArn`` and ``opensearchServerlessConfiguration`` values.
    The ``retry`` decorator is configured for at most 7 attempts with a
    randomized wait between ``wait_random_min`` and ``wait_random_max``.

    Returns:
        The ``knowledgeBase`` entry of the ``create_knowledge_base`` response.
    """
    # Vector knowledge base whose embeddings come from the configured model.
    kb_configuration = {
        "type": "VECTOR",
        "vectorKnowledgeBaseConfiguration": {
            "embeddingModelArn": embeddingModelArn
        }
    }
    # Vectors are stored in the OpenSearch Serverless collection/index.
    storage_configuration = {
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": opensearchServerlessConfiguration
    }
    response = bedrock_agent_client.create_knowledge_base(
        name=name,
        description=description,
        roleArn=roleArn,
        knowledgeBaseConfiguration=kb_configuration,
        storageConfiguration=storage_configuration,
    )
    return response["knowledgeBase"]

In [14]:
# Create the knowledge base. On failure the error is printed and then re-raised:
# swallowing it would leave `kb` undefined (or stale from an earlier run) and the
# following cells would fail later with a far less useful error.
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")
    raise

In [15]:
# Get KnowledgeBase
# Look the knowledge base up again by the id returned at creation time.
get_kb_response = bedrock_agent_client.get_knowledge_base(
    knowledgeBaseId=kb['knowledgeBaseId'],
)

In [16]:
# Create a DataSource in KnowledgeBase
# The S3 bucket configured earlier becomes the data source; it reuses the
# knowledge base's name and description.
data_source_configuration = {
    "type": "S3",
    "s3Configuration": s3Configuration
}
create_ds_response = bedrock_agent_client.create_data_source(
    name=name,
    description=description,
    knowledgeBaseId=kb['knowledgeBaseId'],
    dataSourceConfiguration=data_source_configuration,
)
ds = create_ds_response["dataSource"]

In [17]:
# Get DataSource
# Left as the cell's last expression so the full response is displayed.
bedrock_agent_client.get_data_source(
    knowledgeBaseId=kb['knowledgeBaseId'],
    dataSourceId=ds["dataSourceId"],
)

{'ResponseMetadata': {'RequestId': 'ea3cfd4f-c624-4819-b947-ebb93afca798',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Thu, 30 May 2024 17:37:32 GMT',
   'content-type': 'application/json',
   'content-length': '438',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'ea3cfd4f-c624-4819-b947-ebb93afca798',
   'x-amz-apigw-id': 'YmHmgHTgIAMEITw=',
   'x-amzn-trace-id': 'Root=1-6658b95c-2403ede76b220d921bdf00ab'},
  'RetryAttempts': 0},
 'dataSource': {'createdAt': datetime.datetime(2024, 5, 30, 17, 37, 31, 9985, tzinfo=tzlocal()),
  'dataDeletionPolicy': 'DELETE',
  'dataSourceConfiguration': {'s3Configuration': {'bucketArn': 'arn:aws:s3:::bedrock-kb-us-east-1-298241842438'},
   'type': 'S3'},
  'dataSourceId': 'TLD7YQDV5U',
  'description': 'Knowledge base de productos de ferreteria',
  'knowledgeBaseId': 'RXPLHM7XDB',
  'name': 'bedrock-sample-knowledge-base-218',
  'status': 'AVAILABLE',
  'updatedAt': datetime.datetime(2024, 5, 30, 17, 37, 31, 9985, tzinfo=tzlocal())}}

In [18]:
# Start an ingestion job
# Starts ingestion from the data source into the knowledge base.
start_job_response = bedrock_agent_client.start_ingestion_job(
    knowledgeBaseId=kb['knowledgeBaseId'],
    dataSourceId=ds["dataSourceId"],
)

In [19]:
# Keep the job description from the start response; its status and id are read by the polling cell.
job = start_job_response["ingestionJob"]

In [20]:
# Get job
# Poll until the ingestion job reaches a terminal state. Waiting between polls
# avoids hammering the API, and treating FAILED/STOPPED as terminal prevents an
# endless loop when the job never becomes COMPLETE.
while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
    interactive_sleep(30)
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
    )
    job = get_job_response["ingestionJob"]
pp.pprint(job)

# Stop the notebook if ingestion did not succeed; the job printed above shows the details.
if job['status'] != 'COMPLETE':
    raise RuntimeError(
        f"Ingestion job {job['ingestionJobId']} ended with status {job['status']}"
    )

# Extra wait after completion, kept from the original cell.
interactive_sleep(40)

{ 'dataSourceId': 'TLD7YQDV5U',
  'ingestionJobId': 'X0DE0YRPRG',
  'knowledgeBaseId': 'RXPLHM7XDB',
  'startedAt': datetime.datetime(2024, 5, 30, 17, 37, 37, 581924, tzinfo=tzlocal()),
  'statistics': { 'numberOfDocumentsDeleted': 0,
                  'numberOfDocumentsFailed': 0,
                  'numberOfDocumentsScanned': 5000,
                  'numberOfMetadataDocumentsModified': 0,
                  'numberOfMetadataDocumentsScanned': 0,
                  'numberOfModifiedDocumentsIndexed': 0,
                  'numberOfNewDocumentsIndexed': 5000},
  'status': 'COMPLETE',
  'updatedAt': datetime.datetime(2024, 5, 30, 17, 47, 59, 273572, tzinfo=tzlocal())}
Done!...................................


In [21]:
# Keep the knowledge base Id in bedrock, that corresponds to the Opensearch index in the collection we created before, we will use it for the invocation later
# The value is persisted with the %store magic on the next line.
kb_id = kb["knowledgeBaseId"]
%store kb_id

Stored 'kb_id' (str)


In [23]:
# Re-create the Bedrock Agent client (same arguments as in the setup cell) before the cleanup calls below.
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)

In [24]:
# Clean up everything created above: the data source first, then the knowledge base,
# the vector index, the collection, and finally the collection's access, network and
# encryption policies.
# The calls run sequentially with no error handling, so a failure in one skips the rest.
# NOTE(review): presumably the policies must be deleted after the collection that uses them — confirm.
bedrock_agent_client.delete_data_source(dataSourceId = ds["dataSourceId"], knowledgeBaseId=kb['knowledgeBaseId'])
bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=kb['knowledgeBaseId'])
oss_client.indices.delete(index=index_name)
aoss_client.delete_collection(id=collection_id)
aoss_client.delete_access_policy(type="data", name=access_policy['accessPolicyDetail']['name'])
aoss_client.delete_security_policy(type="network", name=network_policy['securityPolicyDetail']['name'])
# Last expression of the cell: its response is what the notebook displays.
aoss_client.delete_security_policy(type="encryption", name=encryption_policy['securityPolicyDetail']['name'])

{'ResponseMetadata': {'RequestId': '25774035-2a3a-4f63-adbf-83181e43be25',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '25774035-2a3a-4f63-adbf-83181e43be25',
   'date': 'Wed, 29 May 2024 18:49:28 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '2',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

In [25]:
# Final cleanup step: call the helper from the local `utility` module; judging by its
# name it removes the IAM role and policies — confirm against utility.py.
from utility import delete_iam_role_and_policies
delete_iam_role_and_policies()

0