### 1.  Create an OpenSearch Collection

In [7]:

import boto3
import random
import json

# Random numeric suffix appended to the names of the resources created below
# (randint's upper bound is inclusive, so this is the range 200..899).
suffix = random.randint(200, 899)
print(suffix)


382


In [8]:
boto3_session = boto3.session.Session()

region_name = boto3_session.region_name
iam_client = boto3_session.client('iam')

# A single STS call, made through the same session as every other client,
# supplies both the account id and the caller's ARN.
caller_identity = boto3_session.client('sts').get_caller_identity()
account_number = caller_identity.get('Account')
identity = caller_identity['Arn']

In [9]:
def create_execution_role():
    """Create the IAM role that Amazon Bedrock may assume for the knowledge base.

    The role is named ``ExecutionRoleForKnowledgeBase_<suffix>``. It trusts the
    ``bedrock.amazonaws.com`` service principal and gets the AWS-managed
    ``AmazonS3FullAccess`` and ``AmazonSageMakerFullAccess`` policies attached.

    Returns:
        The raw ``iam.create_role`` response; role details are under ``"Role"``.
    """
    role_name = f'ExecutionRoleForKnowledgeBase_{suffix}'

    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "bedrock.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }

    created = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description='Amazon Bedrock Knowledge Base Execution Role for accessing OSS and S3',
        MaxSessionDuration=3600,
    )

    # NOTE: these are broad, account-wide managed policies. That is acceptable
    # for a demo; scope them down for anything long-lived.
    attached_role_name = created["Role"]["RoleName"]
    for managed_policy_arn in (
        'arn:aws:iam::aws:policy/AmazonS3FullAccess',
        'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess',
    ):
        iam_client.attach_role_policy(
            RoleName=attached_role_name,
            PolicyArn=managed_policy_arn,
        )
    return created


kb_execution_role = create_execution_role()
kb_execution_role_arn = kb_execution_role['Role']['Arn']

In [10]:
def create_policies_in_oss(collection_name, aoss_client, exec_role_arn):
    """Create the encryption, network and data-access policies for a collection.

    Args:
        collection_name: Name of the OpenSearch Serverless collection the
            policies apply to.
        aoss_client: boto3 ``opensearchserverless`` client used to create them.
        exec_role_arn: ARN of the knowledge-base execution role. It is listed
            as a data-access principal next to the caller's own identity.

    Returns:
        The three raw API responses as
        ``(encryption_policy, network_policy, access_policy)``.
    """
    collection_resource = 'collection/' + collection_name
    index_resource = 'index/' + collection_name + '/*'

    caller_arn = boto3.client('sts').get_caller_identity()['Arn']

    # Encryption policy: encrypt the collection with an AWS-owned key.
    encryption_doc = {
        'Rules': [{'Resource': [collection_resource], 'ResourceType': 'collection'}],
        'AWSOwnedKey': True,
    }
    encryption_policy = aoss_client.create_security_policy(
        name=f"sample-rag-sp-{suffix}",
        policy=json.dumps(encryption_doc),
        type='encryption',
    )

    # Network policy: allow public access to the collection.
    network_doc = [
        {
            'Rules': [{'Resource': [collection_resource], 'ResourceType': 'collection'}],
            'AllowFromPublic': True,
        }
    ]
    network_policy = aoss_client.create_security_policy(
        name=f"sample-rag-np-{suffix}",
        policy=json.dumps(network_doc),
        type='network',
    )

    # Data-access policy: collection-item and index/document permissions for
    # both the caller and the execution role.
    collection_rule = {
        'Resource': [collection_resource],
        'Permission': [
            'aoss:CreateCollectionItems',
            'aoss:DeleteCollectionItems',
            'aoss:UpdateCollectionItems',
            'aoss:DescribeCollectionItems',
        ],
        'ResourceType': 'collection',
    }
    index_rule = {
        'Resource': [index_resource],
        'Permission': [
            'aoss:CreateIndex',
            'aoss:DeleteIndex',
            'aoss:UpdateIndex',
            'aoss:DescribeIndex',
            'aoss:ReadDocument',
            'aoss:WriteDocument',
        ],
        'ResourceType': 'index',
    }
    access_doc = [
        {
            'Rules': [collection_rule, index_rule],
            'Principal': [caller_arn, exec_role_arn],
            'Description': 'Easy data policy',
        }
    ]
    access_policy = aoss_client.create_access_policy(
        name=f'sample-rag-ap-{suffix}',
        policy=json.dumps(access_doc),
        type='data',
    )

    return encryption_policy, network_policy, access_policy

In [11]:
import time

collection_name = f'kb-collection-{suffix}'

aoss_client = boto3_session.client('opensearchserverless')

encryption_policy, network_policy, access_policy = create_policies_in_oss(
    collection_name=collection_name,
    aoss_client=aoss_client,
    exec_role_arn=kb_execution_role_arn,
)

collection = aoss_client.create_collection(name=collection_name, type='VECTORSEARCH')

# create_collection returns while the collection is still being provisioned.
# Wait until it is ACTIVE, because its endpoint is used to create the index later.
collection_status = collection['createCollectionDetail'].get('status', 'CREATING')
while collection_status != 'ACTIVE':
    if collection_status == 'FAILED':
        raise RuntimeError(f"OpenSearch Serverless collection {collection_name} failed to create")
    print(f"Collection status: {collection_status}; waiting...")
    time.sleep(30)
    collection_status = aoss_client.batch_get_collection(
        names=[collection_name]
    )['collectionDetails'][0]['status']
print(f"Collection {collection_name} is ACTIVE")

In [12]:
collection_id = collection['createCollectionDetail']['id']
# Collection endpoint host: <collection-id>.<region>.aoss.amazonaws.com
host = '.'.join((collection_id, region_name, 'aoss.amazonaws.com'))
print(host)

mydzqae4izivbnn8kmv9.us-east-1.aoss.amazonaws.com


In [13]:
def create_oss_policy_attach_execution_role(collection_id, exec_role):
    """Grant the execution role API access to one OpenSearch Serverless collection.

    Creates the customer-managed IAM policy
    ``AmazonOSSPolicyForKnowledgeBase_<suffix>``, which allows
    ``aoss:APIAccessAll`` on the given collection. It then prints the policy
    ARN and attaches the policy to ``exec_role``.

    Args:
        collection_id: Id of the OpenSearch Serverless collection.
        exec_role: The ``iam.create_role`` response describing the role.

    Returns:
        None.
    """
    collection_arn = f"arn:aws:aoss:{region_name}:{account_number}:collection/{collection_id}"
    policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["aoss:APIAccessAll"],
                "Resource": [collection_arn],
            }
        ],
    }

    created_policy = iam_client.create_policy(
        PolicyName=f'AmazonOSSPolicyForKnowledgeBase_{suffix}',
        PolicyDocument=json.dumps(policy_document),
        Description='Policy for accessing opensearch serverless',
    )
    policy_arn = created_policy["Policy"]["Arn"]
    print("Opensearch serverless arn: ", policy_arn)

    iam_client.attach_role_policy(
        RoleName=exec_role["Role"]["RoleName"],
        PolicyArn=policy_arn,
    )


create_oss_policy_attach_execution_role(collection_id=collection_id, exec_role=kb_execution_role)

Opensearch serverless arn:  arn:aws:iam::924155096146:policy/AmazonOSSPolicyForKnowledgeBase_382


### 2.  Create an OpenSearch Index

In [14]:
import json
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

# Build the OpenSearch (data-plane) client.
# Sign requests with the session's region. `host` was built from that same
# region, and the endpoint rejects SigV4 signatures scoped to another region.
region_name = boto3_session.region_name
service = "aoss"
credentials = boto3_session.get_credentials()
awsauth = auth = AWSV4SignerAuth(credentials, region_name, service)
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

# Index settings and mappings: a kNN vector field plus two text fields.
# NOTE(review): "dimension" must equal the embedding endpoint's output size (1536 assumed) — confirm.
body_json = {
    "settings": {
        "index.knn": "true"
    },
    "mappings": {
        "properties": {
            "vector": {
                "type": "knn_vector",
                "dimension": 1536
            },
            "text": {
                "type": "text"
            },
            "text-metadata": {
                "type": "text"
            }
        }
    }
}

# Create the index.
index_name = f"kb-index-{suffix}"
response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))


### 4.  Create the Data Source: Upload the Documents to S3

In [None]:
import os
from urllib.request import urlretrieve

# S3 bucket names are global across all AWS accounts. Including the account
# number keeps this name from clashing with buckets that other people created
# using the same small random suffix.
bucket_name = f"kbbucket{suffix}-{account_number}"
s3 = boto3_session.resource('s3')

# Outside us-east-1, CreateBucket requires an explicit LocationConstraint.
bucket_region = boto3_session.region_name
if bucket_region and bucket_region != 'us-east-1':
    s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={'LocationConstraint': bucket_region},
    )
else:
    s3.create_bucket(Bucket=bucket_name)

data_root = "./data/"
# Pure-Python equivalent of `mkdir -p ./data`; works outside IPython too.
os.makedirs(data_root, exist_ok=True)

urls = [
    'https://s2.q4cdn.com/299287126/files/doc_financials/2023/ar/2022-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2022/ar/2021-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2021/ar/Amazon-2020-Shareholder-Letter-and-1997-Shareholder-Letter.pdf',
    'https://s2.q4cdn.com/299287126/files/doc_financials/2020/ar/2019-Shareholder-Letter.pdf'
]

filenames = [
    'AMZN-2022-Shareholder-Letter.pdf',
    'AMZN-2021-Shareholder-Letter.pdf',
    'AMZN-2020-Shareholder-Letter.pdf',
    'AMZN-2019-Shareholder-Letter.pdf'
]

# Download each letter into data_root under its local file name.
for url, filename in zip(urls, filenames):
    urlretrieve(url, os.path.join(data_root, filename))

s3_client = boto3_session.client("s3")
def uploadDirectory(path, bucket_name):
    """Upload every file under ``path`` (recursively) to ``bucket_name``.

    Each object key is just the file's base name. Files that share a name in
    different subdirectories therefore overwrite one another in the bucket.
    """
    for folder, _subdirs, names in os.walk(path):
        for name in names:
            local_path = os.path.join(folder, name)
            s3_client.upload_file(local_path, bucket_name, name)


uploadDirectory(data_root, bucket_name)

### 5. Load Data into OpenSearch

In [None]:
import os

# Name of the deployed SageMaker embedding endpoint. Set it here or through the
# EMBEDDING_ENDPOINT_NAME environment variable. An empty name identifies no
# endpoint, so embed_query() cannot succeed until it is filled in.
embedding_endpoint_name = os.environ.get("EMBEDDING_ENDPOINT_NAME", "")

In [None]:
def query_endpoint_with_json_payload(encoded_json, endpoint_name, content_type="application/json"):
    """Invoke a SageMaker endpoint and return the ``"embedding"`` field of its JSON reply.

    Args:
        encoded_json: Request body as bytes.
        endpoint_name: Name of the SageMaker endpoint to call.
        content_type: MIME type sent with the request.

    Returns:
        The value under ``"embedding"`` in the decoded response. It is
        presumably a list of vectors; confirm against the deployed model. A
        single-element result comes back as a new one-element list.
    """
    runtime = boto3.client("runtime.sagemaker")
    reply = runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Body=encoded_json,
    )
    payload = json.loads(reply['Body'].read().decode("utf-8"))
    embeddings = payload["embedding"]
    return [embeddings[0]] if len(embeddings) == 1 else embeddings

In [None]:
def embed_query(input_data):
    """Embed ``input_data`` with the SageMaker embedding endpoint.

    The input is wrapped as ``{"text_inputs": input_data}``, serialized to
    UTF-8 JSON bytes and sent to ``embedding_endpoint_name``. The endpoint's
    ``"embedding"`` result is returned.
    """
    request_body = json.dumps({"text_inputs": input_data}).encode("utf-8")
    return query_endpoint_with_json_payload(request_body, embedding_endpoint_name)

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import S3DirectoryLoader

# Load the documents from the bucket the PDFs were uploaded to earlier.
bucket = bucket_name

## Load the docs
loader = S3DirectoryLoader(bucket)
docs = loader.load()

## Split into chunks of up to 1000 characters, overlapping by 100 characters.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100,
)
chunks = text_splitter.split_documents(docs)

## Convert each chunk into a vector embedding and load it into OpenSearch.
for chunk in chunks:
    chunk_text = chunk.page_content

    # embed_query already wraps its input in the endpoint's JSON request body,
    # so it gets the raw text. This is the same form query_docs embeds at
    # search time.
    embedding = embed_query(chunk_text)
    # The endpoint may return a list holding a single vector ([[...]]); the
    # knn_vector field needs the flat vector itself.
    if embedding and isinstance(embedding[0], list):
        embedding = embedding[0]

    index_document = {'vector': embedding, 'text': chunk_text}

    # Index through the opensearch-py data-plane client. aoss_client is the
    # boto3 control-plane client and has no document-indexing operation.
    response = oss_client.index(
        index=index_name,
        body=index_document,
        refresh=False,
    )

### 6.  Retrieve Data from the Vector Index and Generate an Answer

In [None]:
import os

# Name of the deployed SageMaker text-generation endpoint. Set it here or
# through the GENERATION_ENDPOINT_NAME environment variable. An empty name
# identifies no endpoint.
generation_endpoint_name = os.environ.get("GENERATION_ENDPOINT_NAME", "")

In [None]:
def query_docs(query: str, k: int = 3):
    """Embed ``query`` and return the ``k`` nearest chunks from the vector index.

    Args:
        query: Natural-language question to search for.
        k: Number of nearest neighbours requested. It is also used as the
            result ``size``.

    Returns:
        The raw OpenSearch search response; matches are under ``["hits"]["hits"]``.
    """
    query_embedding = embed_query(query)
    # The endpoint may return a list holding a single vector ([[...]]); the kNN
    # query needs the flat vector, in the same form that was indexed.
    if query_embedding and isinstance(query_embedding[0], list):
        query_embedding = query_embedding[0]

    # kNN lookup on the "vector" field defined in the index mapping. Metadata
    # filters could be added to this query.
    query_qna = {
        "size": k,
        "query": {
            "knn": {
                "vector": {
                    "vector": query_embedding,
                    "k": k,
                }
            }
        },
    }

    # Search through the opensearch-py data-plane client. The boto3 aoss_client
    # only manages collections and policies.
    relevant_documents = oss_client.search(
        body=query_qna,
        index=index_name,
    )
    return relevant_documents

In [None]:
def create_context_for_query(q: str) -> str:
    """Build a prompt context from the documents most similar to ``q``.

    Prints the query and retrieves the nearest documents with ``query_docs``.
    Returns the concatenation of each hit's ``_source["text"]``, with every
    entry followed by a newline.
    """
    print(f"query -> {q}")
    hits = query_docs(q)['hits']['hits']
    return "".join(f"{hit['_source']['text']}\n" for hit in hits)

In [None]:
query = "What is Amazon doing in the field of generative AI?"

context = create_context_for_query(query)

PROMPT_TEMPLATE = """

Answer the question asked in the <question> tag based only on the context provided in <context> tags. Do not include any preamble in your answer.
<context>
{}
</context>

<question>
{}
</question>
"""

prompt = PROMPT_TEMPLATE.format(context, query)

print(prompt)

# Send the prompt to the SageMaker text-generation endpoint.
# NOTE(review): the request/response shapes below assume a TGI-style container
# ({"inputs": ..., "parameters": ...} -> [{"generated_text": ...}]). Adjust them
# to whatever model is deployed at generation_endpoint_name.
sm_runtime = boto3_session.client("sagemaker-runtime")
generation_response = sm_runtime.invoke_endpoint(
    EndpointName=generation_endpoint_name,
    ContentType="application/json",
    Body=json.dumps({"inputs": prompt, "parameters": {"max_new_tokens": 512}}).encode("utf-8"),
)
generation_result = json.loads(generation_response["Body"].read().decode("utf-8"))
if isinstance(generation_result, list):
    generation_result = generation_result[0]
generated_text = generation_result["generated_text"]
print(generated_text)