## Step-1: Import dependencies and create boto3 clients

This notebook accompanies the AWS blog post below; refer to it for a full description of each step:
[Dive deep into vector data stores using Amazon Bedrock Knowledge Bases](https://aws.amazon.com/blogs/machine-learning/dive-deep-into-vector-data-stores-using-amazon-bedrock-knowledge-bases/)

In [None]:
%pip install opensearch-py
%pip install retrying

from urllib.request import urlretrieve
import json
import os
import boto3
import random
import time
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError
credentials = boto3.Session().get_credentials()
service = 'aoss'
suffix = random.randrange(200, 900)
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
iam_client = boto3_session.client('iam')
account_number = boto3.client('sts').get_caller_identity().get('Account')
identity = boto3.client('sts').get_caller_identity()['Arn']
s3_client = boto3.client("s3", region_name=region_name)
aoss_client = boto3_session.client('opensearchserverless')
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime', region_name=region_name)
rds = boto3.client('rds', region_name=region_name)
# Create Secrets Manager client to retrieve secret values
secrets_manager = boto3.client('secretsmanager', region_name=region_name)
# Create RDS Data Client to run queries against Aurora PostgreSQL Database
rds_data_client = boto3.client('rds-data', region_name=region_name)
awsauth = auth = AWSV4SignerAuth(credentials, region_name, service)

## Step-2: Create an Amazon S3 bucket to hold the source files for the vector databases
- Set up an Amazon S3 bucket. This step is optional if you already have a bucket you can use. If you need to create a bucket to hold the source data for the vector databases, the sample code below shows how. Follow your organization’s best practices and guidelines when creating a new Amazon S3 bucket.

In [None]:
# Set the bucket name
bucket_name = "bedrock-kb-blogpost-986127"

if region_name != 'us-east-1':
    # Outside us-east-1, S3 requires the region to be passed as a LocationConstraint
    response = s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={
            'LocationConstraint': region_name
        }
    )
else:
    # us-east-1 is the default region and does not accept a LocationConstraint
    response = s3_client.create_bucket(
        Bucket=bucket_name
    )
# Print the response and validate that the value of HTTPStatusCode is 200
print(response)
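
S3 bucket names are shared globally, so the hard-coded name above may already be taken or may not meet the naming rules. As a minimal sketch, assuming a hypothetical helper `is_valid_bucket_name` that covers only the core rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starting and ending with a letter or digit; no adjacent dots), a candidate name could be checked before calling `create_bucket`:

```python
import re

# Hypothetical helper (not part of the blog post). It checks only the core
# S3 naming rules, not every restriction listed in the S3 documentation.
_BUCKET_NAME_RE = re.compile(r'^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$')


def is_valid_bucket_name(name: str) -> bool:
    """Return True if the name passes the basic length and character checks."""
    if not _BUCKET_NAME_RE.match(name):
        return False
    # Adjacent periods are not allowed in bucket names
    return '..' not in name
```

A passing check does not guarantee the name is available; `create_bucket` can still fail if another account already owns it.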

## Step-3: Import sample source data and metadata into S3 as input for the vector databases

In [None]:
# Use the Amazon shareholder letters as the dataset to load into the vector databases for this blog post
urls = [
    'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf'
]

# Define standard file names which will be used when loading data to Amazon S3
filenames = [
    'AMZN-2022-Shareholder-Letter.pdf',
    'AMZN-2021-Shareholder-Letter.pdf',
    'AMZN-2020-Shareholder-Letter.pdf',
    'AMZN-2019-Shareholder-Letter.pdf'
]

# Create local temporary directory to download files, before uploading to Amazon S3
!mkdir -p ./data

# Assign the local directory path to a Python variable
local_data_path = "./data/"

# Assign S3 bucket name to a python variable. This was created in Step-2 above.
# This bucket will be used as source for vector databases and uploading source files.
data_s3_bucket = bucket_name

# Define the S3 prefix within the bucket under which files are uploaded
data_s3_prefix = 'shareholder_newsletter'

# Download file to local_data_path
for idx, url in enumerate(urls):
    file_path = local_data_path + filenames[idx]
    urlretrieve(url, file_path)

# define metadata corresponding to Shareholder letters
metadata_2022 = {
    "metadataAttributes": {
        "company": "Amazon",
        "document_type": "Shareholder Letter",
        "year": 2022
    }
}


metadata_2021 = {
    "metadataAttributes": {
        "company": "Amazon",
        "document_type": "Shareholder Letter",
        "year": 2021
    }
}

metadata_2020 = {
    "metadataAttributes": {
        "company": "Amazon",
        "document_type": "Shareholder Letter",
        "year": 2020
    }
}

metadata_2019 = {
    "metadataAttributes": {
        "company": "Amazon",
        "document_type": "Shareholder Letter",
        "year": 2019
    }
}

# Create metadata files in local_data_path which will be uploaded to Amazon S3.
# Each file is named <source file>.metadata.json so that it is paired with its PDF.
for year, metadata in (
    (2022, metadata_2022),
    (2021, metadata_2021),
    (2020, metadata_2020),
    (2019, metadata_2019),
):
    metadata_path = f"{local_data_path}AMZN-{year}-Shareholder-Letter.pdf.metadata.json"
    # The with block closes the file, so no explicit f.close() is needed
    with open(metadata_path, "w") as f:
        json.dump(metadata, f)
    
# Upload files to Amazon S3
def uploadDirectory(path, bucket_name):
    # Walk the local directory and upload every file under the S3 prefix
    for root, dirs, files in os.walk(path):
        for file in files:
            key = data_s3_prefix + '/' + file
            s3_client.upload_file(os.path.join(root, file), bucket_name, key)

uploadDirectory(local_data_path, data_s3_bucket)

# Delete files from local directory
!rm -r ./data/
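
Each PDF is paired with a metadata file named `<source file>.metadata.json`, so a missing or misnamed metadata file means that document is ingested without its attributes. As a minimal sketch, assuming a hypothetical helper `missing_metadata_files` that is run on the local directory before the upload, the pairing could be checked like this:

```python
import os
import tempfile


def missing_metadata_files(directory, extension=".pdf", sidecar_suffix=".metadata.json"):
    """Return the source files in `directory` that have no metadata sidecar file."""
    names = set(os.listdir(directory))
    return sorted(
        name for name in names
        if name.endswith(extension) and name + sidecar_suffix not in names
    )


# Small demonstration in a temporary directory (file names are made up)
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.pdf", "a.pdf.metadata.json", "b.pdf"):
        open(os.path.join(tmp, name), "w").close()
    demo_missing = missing_metadata_files(tmp)
```

In the notebook, the call would be `missing_metadata_files(local_data_path)`, placed before `uploadDirectory` and before the local files are deleted.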

## Step-4: Define functions for IAM Policy / Role for creating Bedrock Knowledge Bases
- Define Function which will be used to create execution role for Amazon Bedrock
- Define Functions which will be used to attach policies related to Amazon OpenSearch and Amazon Aurora to Amazon Bedrock execution role

In [None]:
encryption_policy_name = f"bedrock-sample-rag-sp-{suffix}"
network_policy_name = f"bedrock-sample-rag-np-{suffix}"
access_policy_name = f'bedrock-sample-rag-ap-{suffix}'
bedrock_execution_role_name = f'AmazonBedrockExecutionRoleForKnowledgeBase_{suffix}'
fm_policy_name = f'AmazonBedrockFoundationModelPolicyForKnowledgeBase_{suffix}'
s3_policy_name = f'AmazonBedrockS3PolicyForKnowledgeBase_{suffix}'
oss_policy_name = f'AmazonBedrockOSSPolicyForKnowledgeBase_{suffix}'
rds_policy_name = f'AmazonBedrockRDSPolicyForKnowledgeBase_{suffix}'
aurora_policy_name = f'AmazonBedrockAuroraPolicyForKnowledgeBase_{suffix}'
oss_vector_store_name = f'os-shareholder-letter-{suffix}'
oss_index_name = "os_shareholder_letter"
aurora_vector_db_cluster = f'aurora-shareholder-letter-{suffix}'
aurora_vector_db_instance = f'aurora-shareholder-letter-instance-{suffix}'
aurora_database_name = 'vectordb'
aurora_schema_name = 'bedrock_kb'
aurora_table_name = 'aurora_shareholder_letter'


def create_bedrock_execution_role(bucket_name):
    foundation_model_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "bedrock:InvokeModel",
                ],
                "Resource": [
                    f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v2:0" 
                ]
            }
        ]
    }

    s3_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket"
                ],
                "Resource": [f'arn:aws:s3:::{bucket_name}', f'arn:aws:s3:::{bucket_name}/*'],
                "Condition": {
                    "StringEquals": {
                        "aws:ResourceAccount": f"{account_number}"
                    }
                }
            }
        ]
    }

    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "bedrock.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    
    
    # create policies based on the policy documents
    fm_policy = iam_client.create_policy(
        PolicyName=fm_policy_name,
        PolicyDocument=json.dumps(foundation_model_policy_document),
        Description='Policy for accessing foundation model',
    )

    s3_policy = iam_client.create_policy(
        PolicyName=s3_policy_name,
        PolicyDocument=json.dumps(s3_policy_document),
        Description='Policy for reading documents from s3')

    # create bedrock execution role
    bedrock_kb_execution_role = iam_client.create_role(
        RoleName=bedrock_execution_role_name,
        AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
        Description='Amazon Bedrock Knowledge Base Execution Role for accessing OSS and S3',
        MaxSessionDuration=3600
    )

    # fetch arn of the policies and role created above
    bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']
    s3_policy_arn = s3_policy["Policy"]["Arn"]
    fm_policy_arn = fm_policy["Policy"]["Arn"]

    # attach policies to Amazon Bedrock execution role
    iam_client.attach_role_policy(
        RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
        PolicyArn=fm_policy_arn
    )
    iam_client.attach_role_policy(
        RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
        PolicyArn=s3_policy_arn
    )
    return bedrock_kb_execution_role


def create_oss_policy_attach_bedrock_execution_role(collection_id, bedrock_kb_execution_role):
    # define oss policy document
    oss_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "aoss:APIAccessAll"
                ],
                "Resource": [
                    f"arn:aws:aoss:{region_name}:{account_number}:collection/{collection_id}"
                ]
            }
        ]
    }
    oss_policy = iam_client.create_policy(
        PolicyName=oss_policy_name,
        PolicyDocument=json.dumps(oss_policy_document),
        Description='Policy for accessing opensearch serverless',
    )
    oss_policy_arn = oss_policy["Policy"]["Arn"]
    print("Opensearch serverless arn: ", oss_policy_arn)

    iam_client.attach_role_policy(
        RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
        PolicyArn=oss_policy_arn
    )
    return None


def create_policies_in_oss(vector_store_name, aoss_client, bedrock_kb_execution_role_arn):
    encryption_policy = aoss_client.create_security_policy(
        name=encryption_policy_name,
        policy=json.dumps(
            {
                'Rules': [{'Resource': ['collection/' + vector_store_name],
                           'ResourceType': 'collection'}],
                'AWSOwnedKey': True
            }),
        type='encryption'
    )

    network_policy = aoss_client.create_security_policy(
        name=network_policy_name,
        policy=json.dumps(
            [
                {'Rules': [{'Resource': ['collection/' + vector_store_name],
                            'ResourceType': 'collection'}],
                 'AllowFromPublic': True}
            ]),
        type='network'
    )
    access_policy = aoss_client.create_access_policy(
        name=access_policy_name,
        policy=json.dumps(
            [
                {
                    'Rules': [
                        {
                            'Resource': ['collection/' + vector_store_name],
                            'Permission': [
                                'aoss:CreateCollectionItems',
                                'aoss:DeleteCollectionItems',
                                'aoss:UpdateCollectionItems',
                                'aoss:DescribeCollectionItems'],
                            'ResourceType': 'collection'
                        },
                        {
                            'Resource': ['index/' + vector_store_name + '/*'],
                            'Permission': [
                                'aoss:CreateIndex',
                                'aoss:DeleteIndex',
                                'aoss:UpdateIndex',
                                'aoss:DescribeIndex',
                                'aoss:ReadDocument',
                                'aoss:WriteDocument'],
                            'ResourceType': 'index'
                        }],
                    'Principal': [identity, bedrock_kb_execution_role_arn],
                    'Description': 'Easy data policy'}
            ]),
        type='data'
    )
    return encryption_policy, network_policy, access_policy


def create_rds_policy_attach_bedrock_execution_role(db_cluster_arn, aurora_db_secret_arn, bedrock_kb_execution_role):
    # define rds policy document
    rds_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "rds-data:ExecuteStatement",
                    "rds:DescribeDBClusters",
                    "rds-data:BatchExecuteStatement"
                ],
                "Resource": [
                    db_cluster_arn
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret"
                ],
                "Resource": [
                    aurora_db_secret_arn
                ]
            }
        ]
    }
    rds_policy = iam_client.create_policy(
        PolicyName=rds_policy_name,
        PolicyDocument=json.dumps(rds_policy_document),
        Description='Policy for accessing RDS Aurora Database',
    )
    rds_policy_arn = rds_policy["Policy"]["Arn"]
    print("RDS Aurora Policy arn: ", rds_policy_arn)

    iam_client.attach_role_policy(
        RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
        PolicyArn=rds_policy_arn
    )
    return None

## Step-5: Create Execution Role for Amazon Bedrock
- This role will be used as execution role while creating Amazon Bedrock Knowledge base

In [None]:
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=data_s3_bucket)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

## Step-6: Create Amazon OpenSearch Vector Collection
- This will be used in Amazon Bedrock Knowledge bases

In [None]:
# create security, network and data access policies within OSS
encryption_policy, network_policy, access_policy = create_policies_in_oss(vector_store_name=oss_vector_store_name,
                       aoss_client=aoss_client,
                       bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn)

# Create OpenSearch Serverless Vector Collection
collection = aoss_client.create_collection(name=oss_vector_store_name,type='VECTORSEARCH')

# Get the OpenSearch serverless collection URL
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)

# wait for collection creation
# This can take a couple of minutes to finish
response = aoss_client.batch_get_collection(names=[oss_vector_store_name])
# Periodically check collection status
while (response['collectionDetails'][0]['status']) == 'CREATING':
    print('Creating collection...')
    time.sleep(30)
    response = aoss_client.batch_get_collection(names=[oss_vector_store_name])
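# Report the final status, since the loop also exits if creation failed
print('\nCollection status:', response['collectionDetails'][0]['status'])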


# create opensearch serverless access policy and attach it to Bedrock execution role
try:
    create_oss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                    bedrock_kb_execution_role=bedrock_kb_execution_role)
    # It can take up to a minute for data access rules to be enforced
    time.sleep(60)
except Exception as e:
    # A common cause is that the policy already exists from an earlier run
    print("Could not create or attach the OpenSearch Serverless policy (it may already exist):")
    print(e)

## Step-7: Create Index in Amazon OpenSearch Vector Collection
- This index will be managed via Bedrock Knowledge bases

In [None]:
body_json = {
   "settings": {
      "index.knn": "true",
       "number_of_shards": 1,
       "knn.algo_param.ef_search": 512,
       "number_of_replicas": 0,
   },
   "mappings": {
      "properties": {
         "vector": {
            "type": "knn_vector",
            "dimension": 1024,
             "method": {
                 "name": "hnsw",
                 "engine": "faiss",
                 "space_type": "l2"
             },
         },
         "text": {
            "type": "text"
         },
         "text-metadata": {
            "type": "text"
         }
      }
   }
}

# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

# Create index
try:
    response = oss_client.indices.create(index=oss_index_name, body=json.dumps(body_json))
    print('Creating index:')
    # index creation can take up to a minute
    time.sleep(60)
    print('Index Creation Completed:')
except RequestError as e:
    # you can delete the index if it already exists
    # oss_client.indices.delete(index=oss_index_name)
    print(f'Error while trying to create the index, with error {e.error}\nyou may uncomment the delete above to delete and recreate the index')
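
The `dimension` in the index mapping has to match the length of the vectors produced by the embedding model; the knowledge base in Step-8 uses `amazon.titan-embed-text-v2:0`, whose default output length is 1024. As a minimal sketch, assuming a hypothetical helper `check_vector_dimension` and a mapping shaped like `body_json`, the two could be compared before the index is created:

```python
def check_vector_dimension(index_body, vector_field="vector", expected_dimension=1024):
    """Raise ValueError if the mapped vector dimension differs from the expected one."""
    properties = index_body["mappings"]["properties"]
    actual = properties[vector_field]["dimension"]
    if actual != expected_dimension:
        raise ValueError(
            f"Index field '{vector_field}' has dimension {actual}, "
            f"but the embedding model is expected to return {expected_dimension}"
        )
    return actual


# Trimmed-down mapping for illustration; the notebook would pass body_json instead
sample_body = {
    "mappings": {
        "properties": {
            "vector": {"type": "knn_vector", "dimension": 1024}
        }
    }
}
```

The same check applies to the `vector(1024)` column of the Aurora table created later, since both stores receive embeddings from the same model.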

## Step-8: Create Knowledgebase in Amazon Bedrock pointing to Amazon OpenSearch Vector Collection and Index created in Step-6 and Step-7

In [None]:
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": oss_index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

# The embedding model used by Bedrock to embed ingested documents, and realtime prompts
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v2:0"

name = f"kb-os-shareholder-letter-{suffix}"
description = "Amazon shareholder letter knowledge base."
roleArn = bedrock_kb_execution_role_arn

# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]


try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")
    # Stop here: the cells below need `kb`, which is undefined if creation failed
    raise
    
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

print(f'OpenSearch Knowledge Response: {get_kb_response}')

## Step-9: Create Datasource for Knowledgebase created in step-8

In [None]:
# Ingest strategy - How to ingest data from the data source
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

# The data source to ingest documents from, into the OpenSearch serverless knowledge base index
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{data_s3_bucket}",
    "inclusionPrefixes": [f"{data_s3_prefix}"] # you can use this if you want to create a KB using data within s3 prefixes.
    }
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = f'{name}-{data_s3_bucket}',
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]

ds
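
To give a feel for what `maxTokens` and `overlapPercentage` mean, here is a toy fixed-size chunker. It is only an illustration under stated assumptions: it treats whitespace-separated words as tokens and uses a hypothetical function name, `fixed_size_chunks`; the tokenizer and exact overlap handling used by Bedrock may differ.

```python
def fixed_size_chunks(text, max_tokens=512, overlap_percentage=20):
    """Split text into windows of at most max_tokens words that share an overlap."""
    tokens = text.split()
    # Number of tokens repeated at the start of each following chunk
    overlap = int(max_tokens * overlap_percentage / 100)
    # Advance by the non-overlapping part; never less than one token
    step = max(max_tokens - overlap, 1)
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + max_tokens]))
        # Stop once a window has reached the end of the text
        if start + max_tokens >= len(tokens):
            break
    return chunks
```

With the settings above (512 tokens, 20 percent), each chunk in this sketch would repeat roughly the last 102 tokens of the previous one, which helps keep sentences that straddle a boundary retrievable from either side.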

## Step-10: Start Ingestion Job for Amazon Bedrock Knowledge base pointing to Amazon OpenSearch
- To generate vector embeddings for data in Amazon S3

In [None]:
ingest_jobs=[]
# Start an ingestion job
try:
    start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])
    job = start_job_response["ingestionJob"]
    print(f"ingestion job started successfully\n")

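    # Poll until the job reaches a terminal state, pausing between checks
    while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
        time.sleep(30)
        get_job_response = bedrock_agent_client.get_ingestion_job(
            knowledgeBaseId = kb['knowledgeBaseId'],
            dataSourceId = ds["dataSourceId"],
            ingestionJobId = job["ingestionJobId"]
        )
        job = get_job_response["ingestionJob"]

    if job['status'] == 'COMPLETE':
        print("ingestion job completed successfully\n")
    else:
        print(f"ingestion job ended with status {job['status']}\n")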

except Exception as e:
    print(f"Couldn't start job.\n")
    print(e)
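
Ingestion is asynchronous, so the notebook has to poll until the job finishes. As a minimal sketch, assuming a hypothetical helper `wait_for_status` that takes any zero-argument function returning the current status, the polling can be written once with a pause between checks, a failure state, and a timeout:

```python
import time


def wait_for_status(fetch_status, success=("COMPLETE",), failure=("FAILED",),
                    delay=30, timeout=3600, sleep=time.sleep):
    """Call fetch_status() every `delay` seconds until a terminal status is seen."""
    waited = 0
    while True:
        status = fetch_status()
        if status in success:
            return status
        if status in failure:
            raise RuntimeError(f"Job ended with status {status}")
        if waited >= timeout:
            raise TimeoutError(f"Still {status} after {waited} seconds")
        # Pause before the next check so the API is not called in a tight loop
        sleep(delay)
        waited += delay
```

In the notebook, `fetch_status` could be a lambda that calls `bedrock_agent_client.get_ingestion_job(...)` and returns `["ingestionJob"]["status"]`. The `sleep` parameter exists only so the helper can be exercised without real waiting.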

## Step-11: Create Aurora Database instance
- This code example creates a managed database instance; you can create a Serverless instance instead
- <span style='color:red'>Identify the “security group Id” and subnet IDs for your VPC before running the step below, and use them to replace the placeholder values of vpc_security_group_ids and SubnetIds:</span>

In [None]:
# Define database instance parameters
db_instance_identifier = aurora_vector_db_instance
db_cluster_identifier = aurora_vector_db_cluster
engine = 'aurora-postgresql'
db_name = aurora_database_name
db_instance_class = 'db.r6g.2xlarge'
master_username = 'postgres'
# Get Security Group Id(s); to replicate the blog post steps, you can use the one associated with the default VPC
vpc_security_group_ids = ['sg-XXXXXXX']
subnet_group_name = 'vectordbsubnetgroup'

response = rds.create_db_subnet_group(
    DBSubnetGroupName=subnet_group_name,
    DBSubnetGroupDescription='Subnet Group for Blogpost Aurora PostgreSql Database Cluster',
    # Get Subnet IDs; to replicate the blog post steps, you can use the ones associated with the default VPC
    SubnetIds=[
        'subnet-XXXXXXX',
        'subnet-XXXXXXX',
        'subnet-XXXXXXX',
        'subnet-XXXXXXX',
        'subnet-XXXXXXX',
        'subnet-XXXXXXX'
    ]
)

# Create the Aurora cluster
response = rds.create_db_cluster(
    DBClusterIdentifier=db_cluster_identifier,
    Engine=engine,
    MasterUsername=master_username,
    ManageMasterUserPassword=True,
    DBSubnetGroupName=subnet_group_name,
    VpcSecurityGroupIds=vpc_security_group_ids,
    DatabaseName=db_name,
    EnableHttpEndpoint=True
)

# Create the Aurora instance
response = rds.create_db_instance(
    DBInstanceIdentifier=db_instance_identifier,
    DBInstanceClass=db_instance_class,
    Engine=engine,
    DBClusterIdentifier=db_cluster_identifier
)

## Step-12: Create vector extension, schema, and vector table in the Aurora Database
- <span style='color:red'>Confirm that the Aurora database created in the previous step has reached the Available status before running the cell below</span>
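
Instead of checking the console by hand, the availability check can be done in code. As a minimal sketch, assuming a hypothetical wrapper `wait_for_aurora_instance` around the boto3 RDS waiter `db_instance_available`, the notebook could block until the instance is ready:

```python
def wait_for_aurora_instance(rds_client, instance_id, delay=30, max_attempts=60):
    """Block until the DB instance reports the 'available' status.

    rds_client is expected to be a boto3 RDS client, such as the `rds`
    client created in Step-1.
    """
    waiter = rds_client.get_waiter("db_instance_available")
    waiter.wait(
        DBInstanceIdentifier=instance_id,
        # Check every `delay` seconds, giving up after `max_attempts` checks
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
```

In the notebook, the call would be `wait_for_aurora_instance(rds, db_instance_identifier)`. The default values of 30 seconds and 60 attempts are assumptions, not figures from the blog post.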

In [None]:
# Get the Secrets Manager secret ARN (created automatically with the DB cluster) and the DB cluster ARN

describe_db_clusters_response = rds.describe_db_clusters(
    DBClusterIdentifier=db_cluster_identifier,
    IncludeShared=False
)

aurora_db_secret_arn = describe_db_clusters_response['DBClusters'][0]['MasterUserSecret']['SecretArn']
db_cluster_arn = describe_db_clusters_response['DBClusters'][0]['DBClusterArn']


# Create Vector Extension in Aurora PostgreSQL Database which will be used in table creation
vector_extension_create_response = rds_data_client.execute_statement(
    resourceArn=db_cluster_arn,
    secretArn=aurora_db_secret_arn,
    sql='CREATE EXTENSION IF NOT EXISTS vector',
    database=db_name
)

# Create Schema in Aurora PostgreSQL database
schema_create_response = rds_data_client.execute_statement(
    resourceArn=db_cluster_arn,
    secretArn=aurora_db_secret_arn,
    sql='CREATE SCHEMA IF NOT EXISTS bedrock_integration',
    database=db_name
)

# Create the table which stores the vector embeddings corresponding to the shareholder letters
table_create_response = rds_data_client.execute_statement(
    resourceArn=db_cluster_arn,
    secretArn=aurora_db_secret_arn,
    sql='CREATE TABLE IF NOT EXISTS bedrock_integration.share_holder_letter_kb(id uuid PRIMARY KEY, embedding vector(1024), chunks text, metadata json, company varchar(100), document_type varchar(100), year int)',
    database=db_name
)

# Check the status of queries
vector_extension_create_status = 'Success' if vector_extension_create_response['ResponseMetadata']['HTTPStatusCode'] == 200 else 'Fail'
schema_create_status = 'Success' if schema_create_response['ResponseMetadata']['HTTPStatusCode'] == 200 else 'Fail'
# Use a separate status variable so the table creation response is not overwritten
table_create_status = 'Success' if table_create_response['ResponseMetadata']['HTTPStatusCode'] == 200 else 'Fail'

# Print the status of queries
print(f"Create Vector Extension Status: {vector_extension_create_status}")
print(f"Create Schema Status: {schema_create_status}")
print(f"Create Table Status: {table_create_status}")
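
The schema and table names are written directly into the SQL strings above, and the same names must be repeated in the knowledge base configuration. As a minimal sketch, assuming hypothetical helpers `checked_identifier` and `build_table_ddl`, the `CREATE TABLE` statement could be built from variables while rejecting anything that is not a plain identifier:

```python
import re

# Accept only lowercase letters, digits, and underscores (an assumed, strict rule)
_IDENTIFIER_RE = re.compile(r'^[a-z_][a-z0-9_]*$')


def checked_identifier(name):
    """Return name unchanged if it is a plain SQL identifier, else raise ValueError."""
    if not _IDENTIFIER_RE.match(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name


def build_table_ddl(schema, table, dimension=1024):
    """Build the CREATE TABLE statement used for the vector table."""
    schema = checked_identifier(schema)
    table = checked_identifier(table)
    return (
        f"CREATE TABLE IF NOT EXISTS {schema}.{table}("
        f"id uuid PRIMARY KEY, embedding vector({int(dimension)}), chunks text, "
        "metadata json, company varchar(100), document_type varchar(100), year int)"
    )
```

Calling `build_table_ddl('bedrock_integration', 'share_holder_letter_kb')` yields the same statement as the cell above, and the two names can then be reused for `tableName` in the knowledge base configuration.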

## Step-13: Create Knowledgebase in Amazon Bedrock pointing to Amazon Aurora Database and Table created in Step-11 and Step-12

In [None]:
# Attach RDS-related permissions to the Bedrock Knowledge Base role

create_rds_policy_attach_bedrock_execution_role(db_cluster_arn, aurora_db_secret_arn, bedrock_kb_execution_role)

# Define RDS Configuration for Knowledge bases
rdsConfiguration = {
            'credentialsSecretArn': aurora_db_secret_arn,
            'databaseName': db_name,
            'fieldMapping': {
                'metadataField': 'metadata',
                'primaryKeyField': 'id',
                'textField': 'chunks',
                'vectorField': 'embedding'
            },
            'resourceArn': db_cluster_arn,
            'tableName': 'bedrock_integration.share_holder_letter_kb'
        }


# The embedding model used by Bedrock to embed ingested documents, and realtime prompts
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v2:0"

name = f"kb-aurora-shareholder-letter-{suffix}"
description = "Amazon shareholder letter Aurora PG Vector knowledge base."
roleArn = bedrock_kb_execution_role_arn

# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_rds_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "RDS",
            "rdsConfiguration":rdsConfiguration
        }
    )
    return create_rds_kb_response["knowledgeBase"]

try:
    rds_kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")
    # Stop here: the cells below need `rds_kb`, which is undefined if creation failed
    raise
    
# Get KnowledgeBase 
get_rds_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = rds_kb['knowledgeBaseId'])

print(f'RDS Aurora Knowledge Response: {get_rds_kb_response}')

## Step-14: Create Datasource for Knowledgebase created in Step-13

In [None]:
# Ingest strategy - How to ingest data from the data source
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

# The data source to ingest documents from, into the Aurora PostgreSQL knowledge base table
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{data_s3_bucket}",
    "inclusionPrefixes": [f"{data_s3_prefix}"] # you can use this if you want to create a KB using data within s3 prefixes.
    }
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = f'{name}-{data_s3_bucket}',
    description = description,
    knowledgeBaseId = rds_kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]

ds

## Step-15: Start Ingestion Job for Amazon Bedrock Knowledge base pointing to Amazon Aurora PgVector table
- To generate vector embeddings for data in Amazon S3

In [None]:
ingest_jobs=[]
# Start an ingestion job
try:
    start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = rds_kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])
    job = start_job_response["ingestionJob"]
    print(f"ingestion job started successfully\n")

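    # Poll until the job reaches a terminal state, pausing between checks
    while job['status'] not in ('COMPLETE', 'FAILED', 'STOPPED'):
        time.sleep(30)
        get_job_response = bedrock_agent_client.get_ingestion_job(
            knowledgeBaseId = rds_kb['knowledgeBaseId'],
            dataSourceId = ds["dataSourceId"],
            ingestionJobId = job["ingestionJobId"]
        )
        job = get_job_response["ingestionJob"]

    if job['status'] == 'COMPLETE':
        print("ingestion job completed successfully\n")
    else:
        print(f"ingestion job ended with status {job['status']}\n")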

except Exception as e:
    print(f"Couldn't start ingestion job.\n")
    print(e)

## Step-16: Retrieve Data from Amazon OpenSearch Vector Database's Index

In [None]:
query = "What is Amazon doing in the field of generative AI?"

relevant_documents_os = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=kb['knowledgeBaseId'],
    retrievalConfiguration= {
        'vectorSearchConfiguration': {
            'numberOfResults': 3 # fetch the top 3 chunks that most closely match the query
        }
    }
)

relevant_documents_os["retrievalResults"]
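
The raw `retrievalResults` list is verbose. As a minimal sketch, assuming a hypothetical helper `summarize_results` and assuming each result carries `content.text`, `score`, and `location.s3Location.uri` keys, the output can be condensed to one row per chunk:

```python
def summarize_results(retrieval_results, max_chars=120):
    """Reduce each retrieval result to its score, source URI, and a short snippet."""
    rows = []
    for result in retrieval_results:
        # Use .get() throughout so a missing key yields None or "" instead of an error
        text = result.get("content", {}).get("text", "")
        uri = result.get("location", {}).get("s3Location", {}).get("uri")
        rows.append({
            "score": result.get("score"),
            "source": uri,
            "snippet": text[:max_chars],
        })
    return rows
```

In the notebook, `summarize_results(relevant_documents_os["retrievalResults"])` would give a compact view, and the same call works for the Aurora results in Step-17, which makes the two stores easier to compare side by side.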

## Step-17: Retrieve Data from Amazon Aurora PG vector table

In [None]:
query = "What is Amazon doing in the field of generative AI?"

relevant_documents_rds = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=rds_kb['knowledgeBaseId'],
    retrievalConfiguration= {
        'vectorSearchConfiguration': {
            'numberOfResults': 3 # fetch the top 3 chunks that most closely match the query
        }
    }
)

relevant_documents_rds["retrievalResults"]
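
The metadata uploaded in Step-3 (`company`, `document_type`, `year`) can also be used to narrow a search. As a minimal sketch, assuming a hypothetical helper `build_retrieval_configuration` and assuming the metadata was ingested along with the documents, a `filter` with an `equals` condition can be added to the vector search configuration:

```python
def build_retrieval_configuration(number_of_results=3, year=None):
    """Build a retrievalConfiguration dict, optionally restricted to one year."""
    vector_config = {"numberOfResults": number_of_results}
    if year is not None:
        # Only return chunks whose 'year' metadata attribute equals the given value
        vector_config["filter"] = {"equals": {"key": "year", "value": year}}
    return {"vectorSearchConfiguration": vector_config}
```

The result would be passed as `retrievalConfiguration=build_retrieval_configuration(year=2022)` in either `retrieve` call, which should limit the matches to the 2022 shareholder letter.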