# Creating a Knowledge Base for a RAG-Powered App

This notebook walks through building a data pipeline that ingests documents, typically stored in Amazon S3, into a knowledge base backed by a vector database such as Amazon OpenSearch Service Serverless (AOSS). Once ingested, the documents are available for lookup when a query is received.


## Data Ingestion Workflow

1. **Create Amazon Bedrock Knowledge Base Execution Role**
   - Set up a role with necessary policies to access data from your S3 bucket and write embeddings into Amazon OpenSearch Service Serverless (AOSS). AOSS is a vector database.

2. **Create an Empty OpenSearch Serverless Index**
   - Initialize an index within Amazon OpenSearch Service Serverless where embeddings will be stored.

3. **Download Documents**
   - Retrieve documents from their source location (e.g., S3 bucket) to prepare for ingestion.

4. **Create Amazon Bedrock Knowledge Base**
   - Establish a knowledge base using Amazon Bedrock to facilitate intelligent data handling and retrieval.

5. **Set Up Data Source in Knowledge Base**
   - Configure a data source within the knowledge base to connect directly to your Amazon S3 repository.

6. **Initiate Ingestion Job Using KB APIs**
   - Use the Amazon Bedrock APIs to start an ingestion job that will:
     - Retrieve data from S3 and divide it into chunks.
     - Convert these chunks into embeddings using the Amazon Titan Embeddings model.
     - Store the embeddings in Amazon OpenSearch Service Serverless (AOSS).

7. **Build Question Answering Application**
   - Once data is integrated into the Bedrock Knowledge Base, leverage the provided Knowledge Base APIs to construct a question answering application.
   - Use accompanying notebooks in this folder to facilitate application development.

## Prerequisites

1. **AWS IAM Permissions**:
   - Permissions to create and delete IAM roles are required.
   
2. **Amazon S3 Permissions**:
   - Permissions to create, update, and delete S3 buckets are required.
   
3. **Amazon Bedrock Access**:
   - Ensure access to Amazon Bedrock for creating knowledge bases and managing data ingestion.
   
4. **Amazon OpenSearch Service Serverless (AOSS) Access**:
   - Permissions to access Amazon OpenSearch Service Serverless are necessary for creating and updating indexes.

In SageMaker Studio, add the following managed policies to your role: 
1. IAMFullAccess
2. AWSLambda_FullAccess
3. AmazonS3FullAccess
4. AmazonBedrockFullAccess
5. Custom policy for Amazon OpenSearch Serverless such as:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "*"
        }
    ]
}

## Setup to connect with Amazon Bedrock

In [None]:
%pip install -U opensearch-py==2.3.1
%pip install -U boto3==1.33.2
%pip install -U retrying==1.3.4

In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
import json
import os
import boto3
from botocore.exceptions import ClientError
import pprint
from utility import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss, interactive_sleep
import random
from retrying import retry

In [None]:
suffix = random.randrange(200, 900)

sts_client = boto3.client('sts')
boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)
service = 'aoss'
s3_client = boto3.client('s3')
account_id = sts_client.get_caller_identity()["Account"]
s3_suffix = f"{region_name}-{account_id}"
bucket_name = f'bedrock-kb-{s3_suffix}' # replace with the bucket name.
pp = pprint.PrettyPrinter(indent=2)

In [None]:
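# Create the S3 bucket for the knowledge base data source if it does not exist yet
try:
    s3_client.head_bucket(Bucket=bucket_name)
    print(f'Bucket {bucket_name} exists')
except ClientError as e:
    # head_bucket reports a missing bucket as 404; re-raise anything else (e.g. 403)
    if e.response['Error']['Code'] not in ('404', 'NoSuchBucket'):
        raise
    print(f'Creating bucket {bucket_name}')
    if region_name == 'us-east-1':
        # us-east-1 rejects an explicit LocationConstraint
        s3bucket = s3_client.create_bucket(Bucket=bucket_name)
    else:
        s3bucket = s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region_name}
        )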

## Create a vector store - OpenSearch Serverless index
### Step 1 - Create OSS policies and collection

OpenSearch Serverless lets you search and analyze large datasets without provisioning or managing the underlying infrastructure.

An OpenSearch Serverless collection is a group of OpenSearch indexes that together support a specific workload or use case.

Make sure to clean up these resources when you are done, because keeping documents in an OSS index incurs significant costs.
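
The cleanup mentioned above could be sketched roughly as follows. This is a minimal sketch, not part of the original notebook: `cleanup_resources` and the `policy_names` dictionary are hypothetical names, and the policy names themselves are whatever the `create_policies_in_oss` helper created in your account. The boto3 delete calls are passed in through the clients, so the function can be tried against stand-ins before it is pointed at real resources. It does not remove the S3 bucket or the IAM role.

```python
def cleanup_resources(aoss_client, bedrock_agent_client, *, collection_id,
                      policy_names, kb_id=None, data_source_id=None):
    """Delete the billable resources, roughly in reverse order of creation.

    policy_names is a hypothetical dict of the form
    {"encryption": "<name>", "network": "<name>", "data": "<name>"}.
    """
    # Remove the data source and the knowledge base first, if they were created
    if kb_id and data_source_id:
        bedrock_agent_client.delete_data_source(
            knowledgeBaseId=kb_id, dataSourceId=data_source_id)
    if kb_id:
        bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=kb_id)

    # Deleting the collection also removes the vector index stored in it
    aoss_client.delete_collection(id=collection_id)

    # Finally remove the data access, network and encryption policies
    aoss_client.delete_access_policy(name=policy_names["data"], type="data")
    for policy_type in ("network", "encryption"):
        aoss_client.delete_security_policy(
            name=policy_names[policy_type], type=policy_type)
```

In this notebook the call would take `aoss_client`, `bedrock_agent_client`, `collection_id`, and `kb["knowledgeBaseId"]` once those exist.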

In [None]:
import boto3
import time
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"
aoss_client = boto3_session.client('opensearchserverless')
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

In [None]:
# create security, network and data access policies within OSS
encryption_policy, network_policy, access_policy = create_policies_in_oss(vector_store_name=vector_store_name,
                       aoss_client=aoss_client,
                       bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn)
collection = aoss_client.create_collection(name=vector_store_name,type='VECTORSEARCH')

In [None]:
# Get the OpenSearch serverless collection URL
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print(host)

In [None]:
# Wait for collection creation.
# This can take a couple of minutes to finish.
response = aoss_client.batch_get_collection(names=[vector_store_name])
# Periodically check collection status
while response['collectionDetails'][0]['status'] == 'CREATING':
    print('Creating collection...')
    interactive_sleep(30)
    response = aoss_client.batch_get_collection(names=[vector_store_name])
# Report the final status instead of assuming success (it can also be FAILED)
status = response['collectionDetails'][0]['status']
print(f'\nCollection status: {status}')
pp.pprint(response["collectionDetails"])
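
If you want the wait to stop on a failed collection or after a fixed time, a small polling helper along these lines may help. It is only a sketch: `wait_for_collection` and its parameters are hypothetical, and the status lookup is passed in as a callable so the helper does not depend on a live AWS client.

```python
import time


def wait_for_collection(get_status, *, timeout_s=600, poll_s=30, sleep=time.sleep):
    """Poll get_status() until the collection is ACTIVE.

    Raises RuntimeError on any status other than CREATING or ACTIVE,
    and TimeoutError if the collection is still CREATING after timeout_s.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status == "ACTIVE":
            return status
        if status != "CREATING":
            raise RuntimeError(f"Collection entered unexpected status: {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Collection still CREATING after {timeout_s}s")
        sleep(poll_s)


# Possible usage with the client from this notebook:
# wait_for_collection(
#     lambda: aoss_client.batch_get_collection(
#         names=[vector_store_name])["collectionDetails"][0]["status"])
```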


In [None]:
# create opensearch serverless access policy and attach it to Bedrock execution role
try:
    create_oss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                    bedrock_kb_execution_role=bedrock_kb_execution_role)
    # It can take up to a minute for data access rules to be enforced
    interactive_sleep(60)
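except Exception as e:
    # Most often the policy already exists from an earlier run; print the error to confirm
    print("Could not create or attach the policy (it may already exist):")
    pp.pprint(e)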


### Step 2 - Create vector index

In [None]:
# Create the vector index in Opensearch serverless, with the knn_vector field index mapping, specifying the dimension size, name and engine.
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, RequestError
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region_name, service)

# This overrides the index name set earlier; it is the name used from here on
index_name = f"bedrock-sample-index-{suffix}"
body_json = {
    "settings": {
        "index.knn": "true",
        "number_of_shards": 1,
        "knn.algo_param.ef_search": 512,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            "vector": {
                "type": "knn_vector",
                "dimension": 1536,
                "method": {
                    "name": "hnsw",
                    "engine": "faiss",
                    "space_type": "l2"
                },
            },
            "text": {
                "type": "text"
            },
            "text-metadata": {
                "type": "text"
            }
        }
    }
}

# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

In [None]:
# Create index
try:
    response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
    print('\nCreating index:')
    pp.pprint(response)

    # index creation can take up to a minute
    interactive_sleep(60)
except RequestError as e:
    # You can delete the index if it already exists
    # oss_client.indices.delete(index=index_name)
    print(f'Error while trying to create the index, with error {e.error}\nYou may uncomment the delete call above to delete and recreate the index')
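
The `dimension` in the index mapping has to match the output size of the embedding model; `amazon.titan-embed-text-v1` produces 1536-dimensional vectors, which is why the mapping uses 1536. A small guard such as the following can catch a mismatch before ingestion. It is a sketch under assumptions: `check_vector_dimension` and the `EMBEDDING_DIMENSIONS` lookup table are hypothetical and only cover the model used in this notebook.

```python
# Assumed lookup table for this sketch; extend it if you switch models.
EMBEDDING_DIMENSIONS = {
    "amazon.titan-embed-text-v1": 1536,
}


def check_vector_dimension(index_body, embedding_model_arn, vector_field="vector"):
    """Return the mapped dimension, or raise if it disagrees with the model."""
    # The model id is the last path segment of the foundation-model ARN
    model_id = embedding_model_arn.rsplit("/", 1)[-1]
    if model_id not in EMBEDDING_DIMENSIONS:
        raise KeyError(f"No known dimension for model {model_id}")
    expected = EMBEDDING_DIMENSIONS[model_id]
    actual = index_body["mappings"]["properties"][vector_field]["dimension"]
    if actual != expected:
        raise ValueError(
            f"Index maps {actual} dimensions but {model_id} produces {expected}")
    return actual
```

With the names from this notebook, the call would be `check_vector_dimension(body_json, embeddingModelArn)` once both are defined.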
    

## Download data to ingest into the knowledge base

In [None]:
# Download and prepare dataset
!mkdir -p ./data

from urllib.request import urlretrieve
urls = [
    'https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-ug.pdf',
    'https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-userguide.pdf',
    'https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/dynamodb-dg.pdf.pdf',
    'https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/index.html.pdf',
    'https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/aurora-ug.pdf',
    'https://docs.aws.amazon.com/lambda/latest/dg/lambda-dg.pdf',
    'https://docs.aws.amazon.com/vpc/latest/userguide/vpc-ug.pdf',
    'https://docs.aws.amazon.com/athena/latest/ug/athena-ug.pdf',
    'https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-dg.pdf',
    'https://docs.aws.amazon.com/bedrock/latest/userguide/bedrock-ug.pdf',
    'https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AmazonCloudFront_DevGuide.pdf',
    'https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/acw-ug.pdf',
    'https://docs.aws.amazon.com/codewhisperer/latest/userguide/user-guide.pdf',
    'https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-dg.pdf#cognito-user-identity-pools',
    'https://docs.aws.amazon.com/documentdb/latest/developerguide/developerguide.pdf',
    'https://docs.aws.amazon.com/pdfs/neptune/latest/userguide/neptune-ug.pdf#intro'

]

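# One filename per URL, in the same order as `urls`
filenames = [
    'AmazonEC2UserGuide.pdf',
    'AmazonS3UserGuide.pdf',
    'AmazonDynamodbDeveloperGuide.pdf',
    'AmazonRDSUserGuide.pdf',  # added: the RDS URL had no matching filename
    'AmazonAuroraUserGuide.pdf',
    'AmazonLambda.pdf',
    'AmazonVPC.pdf',
    'AmazonAthena.pdf',
    'AmazonAPIGateway.pdf',
    'AmazonBedrock.pdf',
    'AmazonCloudFront.pdf',
    'AmazonCloudWatch.pdf',
    'Amazoncodewhisperer.pdf',
    'Amazoncognito.pdf',
    'Amazondocumentdb.pdf',
    'neptune.pdf'
]

data_root = "./data/"

# Fail early if the two lists ever get out of sync
assert len(urls) == len(filenames), "urls and filenames must have the same length"

for url, filename in zip(urls, filenames):
    file_path = data_root + filename
    urlretrieve(url, file_path)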
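
Because `urlretrieve` saves whatever the server returns, a redirect or error page can end up on disk under a `.pdf` name. A quick check like the one below, which looks for the `%PDF-` header that PDF files normally begin with, can flag such files before they are uploaded. This is a sketch; `find_invalid_pdfs` is a hypothetical helper, not part of the original notebook.

```python
from pathlib import Path


def find_invalid_pdfs(directory):
    """Return the names of *.pdf files that do not start with the %PDF- header."""
    invalid = []
    for path in sorted(Path(directory).glob("*.pdf")):
        with path.open("rb") as handle:
            if handle.read(5) != b"%PDF-":
                invalid.append(path.name)
    return invalid


# Possible usage after the download loop:
# print(find_invalid_pdfs("./data/"))
```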

## Upload data to S3 bucket data source

In [None]:
# Upload data to s3 to the bucket that was configured as a data source to the knowledge base
s3_client = boto3.client("s3")
def uploadDirectory(path, bucket_name):
    # Walk the directory tree and upload every file, using the bare file name as the object key
    for root, dirs, files in os.walk(path):
        for file in files:
            s3_client.upload_file(os.path.join(root, file), bucket_name, file)

uploadDirectory(data_root, bucket_name)
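
The upload above uses only the file name as the S3 object key, so two files with the same name in different sub-directories would overwrite each other. If your data folder is nested, one option is to derive the key from the path relative to the root. The helper below is a hypothetical sketch (`iter_upload_targets` is not part of the original notebook) that only computes the pairs and leaves the upload call to you.

```python
import os


def iter_upload_targets(root_dir, key_prefix=""):
    """Yield (local_path, s3_key) pairs, keeping sub-directories in the key."""
    for current_dir, _dirs, files in os.walk(root_dir):
        for file_name in sorted(files):
            local_path = os.path.join(current_dir, file_name)
            relative = os.path.relpath(local_path, root_dir)
            # S3 keys use forward slashes regardless of the local OS
            yield local_path, key_prefix + relative.replace(os.sep, "/")


# Possible usage with the client from this notebook:
# for local_path, key in iter_upload_targets(data_root):
#     s3_client.upload_file(local_path, bucket_name, key)
```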

## Create Knowledge Base

1. **Configure OpenSearch Serverless**:
   - Set up the configuration parameters including collection ARN, index name, vector field, text field, and metadata field for OpenSearch Serverless.

2. **Define Chunking Strategy**:
   - Specify the chunking strategy configuration to divide documents into chunks based on a predefined chunk size.

3. **Configure S3 Integration**:
   - Establish the S3 configuration settings, which will later be used to create the data source object.

4. **Specify Titan Embeddings Model ARN**:
   - Define the ARN (Amazon Resource Name) for the Titan embeddings model, which will generate embeddings for each text chunk during the ingestion process.
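
To make the fixed-size chunking strategy in step 2 more concrete, here is a rough illustration of how a maximum chunk size combined with an overlap percentage turns a token sequence into overlapping windows. This is a simplified sketch, not Bedrock's implementation: `fixed_size_chunks` is a hypothetical function, and the service's actual tokenizer and boundary handling may differ.

```python
def fixed_size_chunks(tokens, max_tokens=512, overlap_percentage=20):
    """Split a token list into windows of max_tokens that overlap by a percentage."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")
    # Number of tokens shared between two consecutive chunks
    overlap = max_tokens * overlap_percentage // 100
    step = max_tokens - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # the last window already reached the end of the input
    return chunks


# Ten tokens, windows of five, 20% overlap (one shared token per boundary)
print(fixed_size_chunks(list(range(10)), max_tokens=5, overlap_percentage=20))
# prints [[0, 1, 2, 3, 4], [4, 5, 6, 7, 8], [8, 9]]
```

With the values used in this notebook (512 tokens, 20% overlap), each chunk would share about 102 tokens with its predecessor under this simplified scheme.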

In [None]:
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

# Ingest strategy - How to ingest data from the data source
chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

# The data source to ingest documents from, into the OpenSearch serverless knowledge base index
s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    # "inclusionPrefixes":["*.*"] # you can use this if you want to create a KB using data within s3 prefixes.
}

# The embedding model used by Bedrock to embed ingested documents, and realtime prompts
embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

name = f"bedrock-sample-knowledge-base-{suffix}"
description = "AWS service documentation knowledge base."
roleArn = bedrock_kb_execution_role_arn

Next, pass opensearchServerlessConfiguration and embeddingModelArn to the create_knowledge_base method, which creates the knowledge base. chunkingStrategyConfiguration and s3Configuration are used afterwards, when the data source is created.

In [None]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]
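
The `@retry` decorator above retries the call up to 7 times with a random wait of 1 to 2 seconds between attempts, presumably to ride out the short delay before a newly created IAM role and access policy become usable. For readers unfamiliar with the `retrying` package, the behaviour is roughly equivalent to this standard-library sketch; `call_with_retries` is a hypothetical helper and not how `retrying` is implemented internally.

```python
import random
import time


def call_with_retries(func, *, attempts=7, wait_min_s=1.0, wait_max_s=2.0,
                      sleep=time.sleep):
    """Call func() until it succeeds, re-raising the last error after `attempts` tries."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the original exception
            # Random wait between attempts, mirroring wait_random_min/max
            sleep(random.uniform(wait_min_s, wait_max_s))
```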

In [None]:
try:
    kb = create_knowledge_base_func()
    pp.pprint(kb)
except Exception as err:
    print(f"{err=}, {type(err)=}")
    raise  # later cells need `kb`, so stop here instead of continuing without it

In [None]:
# Get KnowledgeBase 
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

Create a data source that links to the previously established knowledge base. Once the data source is configured, we can proceed with ingesting the documents.

In [None]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)

In [None]:
# Get DataSource 
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

To start ingestion, we launch an ingestion job for the knowledge base (KB) and data source created earlier. The job retrieves documents from the data source, extracts their text, splits it into chunks according to the configured chunk size, generates an embedding for each chunk with the Titan embeddings model, and stores the embeddings in the vector database, OpenSearch Serverless (OSS).

In [None]:
# Start an ingestion job
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

In [None]:
job = start_job_response["ingestionJob"]
pp.pprint(job)

In [None]:
# Poll the ingestion job until it reaches a terminal state.
# Checking for FAILED as well avoids looping forever when ingestion fails.
while job['status'] not in ('COMPLETE', 'FAILED'):
    interactive_sleep(30)
    get_job_response = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
    )
    job = get_job_response["ingestionJob"]

pp.pprint(job)
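
Once the job has finished, its `statistics` and `failureReasons` fields show how many documents were scanned, indexed, or rejected. The helper below condenses them into a short summary. It is a sketch: `summarize_ingestion_job` is a hypothetical function, and it falls back to zero or an empty list when a field is absent from the response.

```python
def summarize_ingestion_job(job):
    """Condense an ingestion job description into a few headline numbers."""
    stats = job.get("statistics", {})
    return {
        "status": job.get("status"),
        "scanned": stats.get("numberOfDocumentsScanned", 0),
        # New and modified documents both end up (re)indexed
        "indexed": stats.get("numberOfNewDocumentsIndexed", 0)
                   + stats.get("numberOfModifiedDocumentsIndexed", 0),
        "failed": stats.get("numberOfDocumentsFailed", 0),
        "failure_reasons": job.get("failureReasons", []),
    }


# Possible usage with the job polled above:
# pp.pprint(summarize_ingestion_job(job))
```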

In [None]:
# Print the knowledge base Id in bedrock, that corresponds to the Opensearch index in the collection we created before, we will use it for the invocation later
kb_id = kb["knowledgeBaseId"]
pp.pprint(kb_id)

In [None]:
# keep the kb_id for invocation later in the invoke request
%store kb_id
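
As a quick check that ingestion worked, the stored `kb_id` can be used with the Retrieve API of the `bedrock-agent-runtime` client, which returns the chunks most similar to a query. The following is a minimal sketch: `retrieve_chunks` is a hypothetical helper, and the client is passed in so the function can also be exercised with a stand-in object.

```python
def retrieve_chunks(runtime_client, kb_id, query, max_results=3):
    """Return (score, text) pairs for the chunks most relevant to the query."""
    response = runtime_client.retrieve(
        knowledgeBaseId=kb_id,
        retrievalQuery={"text": query},
        retrievalConfiguration={
            "vectorSearchConfiguration": {"numberOfResults": max_results}
        },
    )
    return [(result.get("score"), result["content"]["text"])
            for result in response["retrievalResults"]]


# Possible usage with the session from this notebook:
# runtime_client = boto3_session.client("bedrock-agent-runtime", region_name=region_name)
# for score, text in retrieve_chunks(runtime_client, kb_id, "What is Amazon S3?"):
#     print(score, text[:200])
```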