# Amazon Bedrock Knowledge Bases - End-to-end example using multiple data sources

This notebook provides sample code for building an end-to-end RAG application with Amazon Bedrock Knowledge Bases, ingesting documents into the index from various data sources (S3, Confluence, SharePoint, Salesforce, and Web). Note that you can add up to 5 data sources.


#### Notebook Walkthrough

This notebook builds a data pipeline that ingests documents (typically stored in multiple data sources) into a knowledge base, i.e. a vector database such as Amazon OpenSearch Serverless (AOSS), so that they are available for lookup when a question is received.

- Load the documents into the knowledge base by connecting various data sources (S3, Confluence, SharePoint, Salesforce, and Web).
- Ingestion: the knowledge base splits the documents into smaller chunks (based on the selected chunking strategy), generates an embedding for each chunk, and stores them in the associated vector store.
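To make the chunking step concrete, here is a minimal sketch of fixed-size chunking with overlap. It is not Bedrock's implementation: Bedrock counts model tokens, whereas this sketch assumes whitespace-separated words as a stand-in for tokens, and `fixed_size_chunks` is a hypothetical helper name.

```python
def fixed_size_chunks(text: str, max_tokens: int, overlap_percentage: int) -> list[str]:
    """Split text into windows of at most `max_tokens` words that overlap by a percentage.

    Assumption: words approximate tokens; a real embedding pipeline counts model tokens.
    """
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")
    words = text.split()
    overlap = max_tokens * overlap_percentage // 100
    step = max_tokens - overlap  # always >= 1 because overlap < max_tokens
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_tokens]))
        if start + max_tokens >= len(words):
            break  # this window already reaches the end of the text
    return chunks

sample_text = " ".join(f"w{i}" for i in range(10))
for chunk in fixed_size_chunks(sample_text, max_tokens=4, overlap_percentage=50):
    print(chunk)
```

With a window of 4 words and 50% overlap, each chunk repeats the last 2 words of the previous one, so this prints four windows from `w0 w1 w2 w3` to `w6 w7 w8 w9`. The overlap keeps sentences that straddle a boundary retrievable from either chunk.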

<!-- ![data_ingestion.png](./images/data_ingestion.png) -->
<img src="./images/data_ingestion.png" width=50% height=20% />


#### Steps: 
- Create the Knowledge Base execution role with the policies needed to read data from the data sources (S3, Confluence, SharePoint, Salesforce, and Web) and to write embeddings into OSS.
- Create an empty OpenSearch Serverless index.
- Prerequisites:
    - For S3, create an S3 bucket (if it does not exist) and upload the data.
    - For the other data sources, refer to the prerequisites on the corresponding [AWS documentation page](https://docs.aws.amazon.com/bedrock/latest/userguide/data-source-connectors.html).
- Create the knowledge base.
- Create the data source(s) within the knowledge base.
- For each data source, start an ingestion job using the KB APIs. The job reads data from the data source, chunks it, converts the chunks into embeddings using the Amazon Titan Embeddings model, and stores these embeddings in AOSS, all without you having to build, deploy, and manage the data pipeline.
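The "empty OpenSearch Serverless index" step needs a k-NN vector mapping. The `BedrockKnowledgeBase` helper creates this for you; the sketch below only illustrates what such an index body might look like, assuming 1024-dimensional vectors (the default output size of Titan Text Embeddings V2), an HNSW graph on the `faiss` engine, and the placeholder field names `vector`, `text` and `text-metadata`. Whatever names you choose must match the field mapping you give the knowledge base.

```python
def knn_index_body(dimension: int = 1024,
                   vector_field: str = "vector",
                   text_field: str = "text",
                   metadata_field: str = "text-metadata") -> dict:
    """Build a request body for an OpenSearch k-NN index.

    Assumption: field names and HNSW/faiss settings are illustrative placeholders.
    """
    return {
        "settings": {"index.knn": True},
        "mappings": {
            "properties": {
                # Embedding vector; dimension must match the embedding model's output size
                vector_field: {
                    "type": "knn_vector",
                    "dimension": dimension,
                    "method": {"name": "hnsw", "engine": "faiss", "space_type": "l2"},
                },
                # Raw chunk text returned at retrieval time
                text_field: {"type": "text"},
                # Chunk metadata stored alongside the text; not searched directly
                metadata_field: {"type": "text", "index": False},
            }
        },
    }

body = knn_index_body()
print(body["mappings"]["properties"]["vector"]["dimension"])
# With opensearch-py, the body could be passed as:
# oss_client.indices.create(index="<index-name>", body=body)
```

If you request a smaller embedding size from the model, pass the same value as `dimension`; a mismatch causes writes to fail at ingestion time.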

Once the data is available in the Bedrock Knowledge Base, a question answering application can be built using the Knowledge Base APIs provided by Amazon Bedrock.



<div class="alert alert-block alert-info">
<b>Note:</b> Please make sure to enable model access for `amazon.nova-micro-v1:0`, `Anthropic Claude 3 Sonnet`, `amazon.titan-text-express-v1`, `anthropic.claude-3-haiku-20240307-v1:0`, and `Titan Text Embeddings V2` in the Amazon Bedrock console.
<br> -------------------------------------------------------------------------------------------------------------------------------------------------------   </br>
    
Please run the notebook cell by cell instead of using "Run All Cells" option.
</div>


## Setup
Before running the rest of this notebook, run the cells below to install the necessary libraries and connect to Bedrock.

In [1]:
%pip install --upgrade pip --quiet
%pip install -r ../requirements.txt --no-deps --quiet
%pip install -r ../requirements.txt --upgrade --quiet

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
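# restart the kernel so the upgraded packages are picked up
# (this JavaScript restart works in the classic Jupyter Notebook; in JupyterLab, restart from the Kernel menu)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")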

In [3]:
import warnings
warnings.filterwarnings('ignore')

In [4]:
import os
import sys
import time
import boto3
import logging
import pprint
import json

# Set the path to import module
from pathlib import Path
current_path = Path().resolve()
current_path = current_path.parent
if str(current_path) not in sys.path:
    sys.path.append(str(current_path))
# Print sys.path to verify
# print(sys.path)

from utils.knowledge_base import BedrockKnowledgeBase

In [5]:
#Clients
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session()
region = session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime') 
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id

('us-west-2', '183631345587')

In [6]:
import time

# Get the current timestamp
current_time = time.time()

# Format the timestamp as a string
timestamp_str = time.strftime("%Y%m%d%H%M%S", time.localtime(current_time))[-7:]
# Create the suffix using the timestamp
suffix = f"{timestamp_str}"

knowledge_base_name = f"bedrock-sample-knowledge-base-{suffix}"
knowledge_base_description = "Multi data source knowledge base."

bucket_name = f'{knowledge_base_name}-{account_id}'
intermediate_bucket_name = f'{knowledge_base_name}-intermediate-{account_id}'
foundation_model = "anthropic.claude-3-sonnet-20240229-v1:0"

In [7]:
print(boto3.__version__)

1.42.2


### Add multiple data sources to the Knowledge Base
You can add multiple data sources of different types (S3, Confluence, SharePoint, Salesforce, Web Crawler) to a single Knowledge Base. In this notebook, we test Knowledge Base creation with multiple data sources.

Each data source type has its own prerequisites; please refer to the AWS documentation for more information.

In [8]:
# For this notebook, we'll create a Knowledge Base with multiple data sources (1 S3 bucket, 1 Confluence page, 1 SharePoint site, 1 Salesforce site, 1 Web Crawler)

data_bucket_name = f'bedrock-kb-{suffix}-1' # replace it with your first bucket name.

## Below is a list of data sources: 1 S3 bucket, 1 Confluence, 1 SharePoint, 1 Salesforce and 1 Web Crawler connector.
## Please uncomment the data sources that you want to add and update the placeholder values accordingly.

data_sources=[
                {"type": "S3", "bucket_name": data_bucket_name},
                
                # {"type": "CONFLUENCE", "hostUrl": "https://example.atlassian.net", "authType": "BASIC",
                #  "credentialsSecretArn": f"arn:aws:secretsmanager:{region}:{account_id}:secret:<<your_secret_name>>"},

                # {"type": "SHAREPOINT", "tenantId": "888d0b57-69f1-4fb8-957f-e1f0bedf64de", "domain": "yourdomain",
                #   "authType": "OAUTH2_CLIENT_CREDENTIALS",
                #  "credentialsSecretArn": f"arn:aws:secretsmanager:{region}:{account_id}:secret:<<your_secret_name>>",
                #  "siteUrls": ["https://yourdomain.sharepoint.com/sites/mysite"]
                # },

                # {"type": "SALESFORCE", "hostUrl": "https://company.salesforce.com/", "authType": "OAUTH2_CLIENT_CREDENTIALS",
                #  "credentialsSecretArn": f"arn:aws:secretsmanager:{region}:{account_id}:secret:<<your_secret_name>>"
                # },

                # raw strings keep the regex backslashes intact
                # {"type": "WEB", "seedUrls": [{ "url": "https://www.examplesite.com"}],
                #  "inclusionFilters": [r"https://www\.examplesite\.com/.*\.html"],
                #  "exclusionFilters": [r"https://www\.examplesite\.com/contact-us\.html"]
                # }
            ]
                
pp = pprint.PrettyPrinter(indent=2)
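Since each connector needs different fields and a malformed secret ARN only fails later during creation, it can help to sanity-check the list first. This is a hypothetical `validate_data_sources` helper, not part of `BedrockKnowledgeBase`; the required keys below simply mirror the placeholders in the cell above and are an assumption, not an official schema.

```python
import re

MAX_DATA_SOURCES = 5  # limit stated at the top of this notebook

# Assumption: keys each data source dict uses in this notebook's placeholders
REQUIRED_KEYS = {
    "S3": {"bucket_name"},
    "CONFLUENCE": {"hostUrl", "authType", "credentialsSecretArn"},
    "SHAREPOINT": {"tenantId", "domain", "authType", "credentialsSecretArn", "siteUrls"},
    "SALESFORCE": {"hostUrl", "authType", "credentialsSecretArn"},
    "WEB": {"seedUrls"},
}

# Secrets Manager ARNs have the service name in the third field (any AWS partition)
SECRET_ARN_PATTERN = re.compile(r"arn:aws[a-z-]*:secretsmanager:")

def validate_data_sources(sources: list[dict]) -> list[str]:
    """Return human-readable problems; an empty list means the config looks sane."""
    problems = []
    if len(sources) > MAX_DATA_SOURCES:
        problems.append(f"{len(sources)} data sources given; at most {MAX_DATA_SOURCES} are allowed")
    for i, source in enumerate(sources):
        kind = source.get("type")
        if kind not in REQUIRED_KEYS:
            problems.append(f"source {i}: unknown type {kind!r}")
            continue
        missing = REQUIRED_KEYS[kind] - set(source)
        if missing:
            problems.append(f"source {i} ({kind}): missing {sorted(missing)}")
        arn = source.get("credentialsSecretArn")
        if arn is not None and not SECRET_ARN_PATTERN.match(arn):
            problems.append(f"source {i} ({kind}): credentialsSecretArn does not look like a Secrets Manager ARN")
    return problems

example = [{"type": "S3", "bucket_name": "my-bucket"}, {"type": "WEB"}]
print(validate_data_sources(example) or "data sources look OK")
```

In the notebook you would call `validate_data_sources(data_sources)` before creating the knowledge base; the example above flags the Web Crawler entry for its missing `seedUrls`.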

## Create Knowledge Base

In [9]:
knowledge_base = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name}',
    kb_description=knowledge_base_description,
    data_sources=data_sources,
    chunking_strategy = "FIXED_SIZE", 
    suffix = f'{suffix}-f'
)

Step 1 - Creating or retrieving S3 bucket(s) for Knowledge Base documents
['bedrock-kb-4113145-1']
buckets_to_check:  ['bedrock-kb-4113145-1']
Creating bucket bedrock-kb-4113145-1
Step 2 - Creating Knowledge Base Execution Role (AmazonBedrockExecutionRoleForKnowledgeBase_4113145-f) and Policies
Step 3a - Creating OSS encryption, network and data access policies
Step 3b - Creating OSS Collection (this step takes a couple of minutes to complete)
{ 'ResponseMetadata': { 'HTTPHeaders': { 'connection': 'keep-alive',
                                         'content-length': '320',
                                         'content-type': 'application/x-amz-json-1.0',
                                         'date': 'Thu, 04 Dec 2025 11:31:48 '
                                                 'GMT',
                                         'x-amzn-requestid': '2db76b78-2175-4d0e-8930-a9dbdcd9e830'},
                        'HTTPStatusCode': 200,
                        'RequestId': '2db76b78-

[2025-12-04 11:33:19,626] p1684 {base.py:258} INFO - PUT https://vbb3kl43y0rxux3k6yrh.us-west-2.aoss.amazonaws.com:443/bedrock-sample-rag-index-4113145-f [status:200 request:0.449s]



Creating index:
{ 'acknowledged': True,
  'index': 'bedrock-sample-rag-index-4113145-f',
  'shards_acknowledged': True}
Step 4 - Will create Lambda Function if chunking strategy selected as CUSTOM
Not creating lambda function as chunking strategy is FIXED_SIZE
Step 5 - Creating Knowledge Base
{ 'createdAt': datetime.datetime(2025, 12, 4, 11, 34, 19, 746267, tzinfo=tzlocal()),
  'description': 'Multi data source knowledge base.',
  'knowledgeBaseArn': 'arn:aws:bedrock:us-west-2:183631345587:knowledge-base/NJI6M4CD7I',
  'knowledgeBaseConfiguration': { 'type': 'VECTOR',
                                  'vectorKnowledgeBaseConfiguration': { 'embeddingModelArn': 'arn:aws:bedrock:us-west-2::foundation-model/amazon.titan-embed-text-v2:0'}},
  'knowledgeBaseId': 'NJI6M4CD7I',
  'name': 'bedrock-sample-knowledge-base-4113145',
  'roleArn': 'arn:aws:iam::183631345587:role/AmazonBedrockExecutionRoleForKnowledgeBase_4113145-f',
  'status': 'CREATING',
  'storageConfiguration': { 'opensearchServ
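The `chunking_strategy="FIXED_SIZE"` argument corresponds, at the API level, to the data source's `vectorIngestionConfiguration` in the Bedrock Agent `CreateDataSource` call. A sketch of that structure, assuming 300 max tokens and 20% overlap as illustrative values (the helper's actual values may differ):

```python
import json

def fixed_size_chunking_config(max_tokens: int = 300, overlap_percentage: int = 20) -> dict:
    """Build a vectorIngestionConfiguration for fixed-size chunking.

    Assumption: the default values are illustrative, not the helper's settings.
    """
    return {
        "chunkingConfiguration": {
            "chunkingStrategy": "FIXED_SIZE",
            "fixedSizeChunkingConfiguration": {
                "maxTokens": max_tokens,
                "overlapPercentage": overlap_percentage,
            },
        }
    }

print(json.dumps(fixed_size_chunking_config(), indent=2))
# With boto3, the dict would be passed as, e.g.:
# bedrock_agent_client.create_data_source(
#     knowledgeBaseId=kb_id, name="<data-source-name>",
#     dataSourceConfiguration=<connector config>,
#     vectorIngestionConfiguration=fixed_size_chunking_config())
```

Larger chunks give the model more context per hit but dilute the embedding; the overlap trades some storage for fewer facts split across chunk boundaries.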

### Prepare data to ingest into our knowledge base
We'll use the following data:
 - synthetic data stored in a local directory as the first data source

#### Upload data to S3 Bucket data source

In [10]:
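def upload_directory(path, bucket_name):
    """Upload every file under `path` to `bucket_name`, using paths relative to `path` as S3 keys."""
    for root, _dirs, files in os.walk(path):
        for file in files:
            file_to_upload = os.path.join(root, file)
            # relative key so files in different subdirectories don't overwrite each other
            object_key = os.path.relpath(file_to_upload, path).replace(os.sep, "/")
            print(f"uploading file {file_to_upload} to {bucket_name}")
            s3_client.upload_file(file_to_upload, bucket_name, object_key)

upload_directory("../synthetic_dataset", data_bucket_name)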

uploading file ../synthetic_dataset/bda.m4v to bedrock-kb-4113145-1
uploading file ../synthetic_dataset/NOTICE to bedrock-kb-4113145-1
uploading file ../synthetic_dataset/podcastdemo.mp3 to bedrock-kb-4113145-1
uploading file ../synthetic_dataset/README.md to bedrock-kb-4113145-1
uploading file ../synthetic_dataset/LICENSE to bedrock-kb-4113145-1
uploading file ../synthetic_dataset/octank_financial_10K.pdf to bedrock-kb-4113145-1


### Start ingestion job
Once the KB and data source(s) are created, we can start an ingestion job for each data source.
During the ingestion job, the KB fetches the documents in the data source, pre-processes them to extract text, chunks the text based on the selected chunking strategy, creates an embedding for each chunk, and writes the embeddings to the vector database, in this case OSS.

NOTE: Currently, you can only run one ingestion job at a time.
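If you start ingestion jobs yourself with the Bedrock Agent API instead of through the helper, you need to wait for each job to reach a terminal state before starting the next one. A minimal polling sketch: `wait_for_ingestion` is a hypothetical helper that takes a status callable so it can be tried without AWS, and the poll interval and timeout are arbitrary assumptions.

```python
import time
from typing import Callable

# Ingestion job states after which the status no longer changes
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}

def wait_for_ingestion(get_status: Callable[[], str],
                       poll_seconds: float = 10.0,
                       timeout_seconds: float = 1800.0) -> str:
    """Poll `get_status` until it returns a terminal status, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"ingestion job still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)

# Usage with the real client (ds_id and job_id are placeholders from start_ingestion_job):
# status = wait_for_ingestion(lambda: bedrock_agent_client.get_ingestion_job(
#     knowledgeBaseId=kb_id, dataSourceId=ds_id, ingestionJobId=job_id
# )["ingestionJob"]["status"])
```

Passing a callable keeps the waiting logic separate from the API call, so the same loop works for any data source.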

In [11]:
# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base.start_ingestion_job()

job 1 started successfully

{ 'dataSourceId': 'ZFCYEMVVO0',
  'failureReasons': [ '["Encountered error: Ignored 1 files as their file '
                      'format was not supported. [Files: '
                      's3://bedrock-kb-4113145-1/podcastdemo.mp3]. Call to '
                      'Customer Source did not succeed.","Encountered error: '
                      'Ignored 1 files as their file format was not supported. '
                      '[Files: s3://bedrock-kb-4113145-1/bda.m4v]. Call to '
                      'Customer Source did not succeed."]'],
  'ingestionJobId': 'SA09NYBRAT',
  'knowledgeBaseId': 'NJI6M4CD7I',
  'startedAt': datetime.datetime(2025, 12, 4, 11, 34, 53, 286828, tzinfo=tzlocal()),
  'statistics': { 'numberOfDocumentsDeleted': 0,
                  'numberOfDocumentsFailed': 2,
                  'numberOfDocumentsScanned': 6,
                  'numberOfMetadataDocumentsModified': 0,
                  'numberOfMetadataDocumentsScanned': 0,
               
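The job above skipped two files because their formats (`.mp3`, `.m4v`) are not supported. When many files fail, you may want to collect them programmatically. The sketch below pulls S3 URIs out of `failureReasons` with a regular expression; the message text is free-form rather than a documented contract, so treat this as a best-effort heuristic, and `failed_files` / `failed_extensions` are hypothetical helper names.

```python
import re
from pathlib import PurePosixPath

# Matches s3:// URIs up to whitespace, a closing bracket, a comma, or a double quote
S3_URI_PATTERN = re.compile(r's3://[^\s\],"]+')

def failed_files(failure_reasons: list[str]) -> list[str]:
    """Extract the S3 URIs mentioned in an ingestion job's failureReasons."""
    uris = []
    for reason in failure_reasons:
        uris.extend(S3_URI_PATTERN.findall(reason))
    return uris

def failed_extensions(failure_reasons: list[str]) -> set[str]:
    """Return the lower-cased file extensions of the files named in failureReasons."""
    return {PurePosixPath(uri).suffix.lower() for uri in failed_files(failure_reasons)}

# Reason text copied from the job output above
sample_reasons = ['["Encountered error: Ignored 1 files as their file format was not supported. '
                  '[Files: s3://bedrock-kb-4113145-1/podcastdemo.mp3]. Call to Customer Source did not succeed."]']
print(failed_files(sample_reasons))
print(failed_extensions(sample_reasons))
```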

In [12]:
# keep the kb_id for invocation later in the invoke request
kb_id = knowledge_base.get_knowledge_base_id()
%store kb_id

'NJI6M4CD7I'
Stored 'kb_id' (str)


### Test the Knowledge Base
Now that the Knowledge Base is available, we can test it using the [**retrieve**](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/retrieve.html) and [**retrieve_and_generate**](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agent-runtime/client/retrieve_and_generate.html) functions.

#### Testing Knowledge Base with Retrieve and Generate API

Let's first test the knowledge base using the retrieve and generate API. With this API, Bedrock takes care of retrieving the necessary references from the knowledge base and generating the final answer using a foundation model from Bedrock.

query = `Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019.`

The correct response for this query, according to the ground-truth QA pair, is:
```
The cash flow statement for Octank Financial in the year ended December 31, 2019 reveals the following:
- Cash generated from operating activities amounted to $710 million, which can be attributed to a $700 million profit and non-cash charges such as depreciation and amortization.
- Cash outflow from investing activities totaled $240 million, with major expenditures being the acquisition of property, plant, and equipment ($200 million) and marketable securities ($60 million), partially offset by the sale of property, plant, and equipment ($40 million) and maturing marketable securities ($20 million).
- Financing activities resulted in a cash inflow of $350 million, stemming from the issuance of common stock ($200 million) and long-term debt ($300 million), while common stock repurchases ($50 million) and long-term debt payments ($100 million) reduced the cash inflow.
Overall, Octank Financial experienced a net cash enhancement of $120 million in 2019, bringing their total cash and cash equivalents to $210 million.
```

In [13]:
query = "Provide a summary of consolidated statements of cash flows of Octank Financial for the fiscal years ended December 31, 2019?"

In [14]:
foundation_model = "anthropic.claude-3-haiku-20240307-v1:0"

response = bedrock_agent_runtime_client.retrieve_and_generate(
    input={
        "text": query
    },
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            'knowledgeBaseId': kb_id,
            "modelArn": "arn:aws:bedrock:{}::foundation-model/{}".format(region, foundation_model),
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "numberOfResults":5
                } 
            }
        }
    }
)

print(response['output']['text'],end='\n'*2)

According to the search results, the consolidated statements of cash flows for Octank Financial for the fiscal year ended December 31, 2019 show the following:

- Net cash provided by operating activities was $710 million.
- Net cash used in investing activities was $240 million, primarily due to purchases of property, plant, and equipment and marketable securities.
- Net cash provided by financing activities was $350 million, primarily due to proceeds from issuance of common stock and long-term debt.
- The net increase in cash and cash equivalents for the year was $120 million, with cash and cash equivalents at the end of the year totaling $210 million.
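Comparing the generated answer with the ground truth by eye works for a single question. A crude automated check, assuming the key facts are dollar amounts written as `$N million`, is to measure how many of the ground-truth figures also appear in the answer. `figure_recall` is a hypothetical helper, not a standard RAG evaluation metric, and it ignores everything except the numbers.

```python
import re

# Dollar amounts written like "$710 million" or "$1,100 million"
AMOUNT_PATTERN = re.compile(r"\$([\d,]+(?:\.\d+)?) million")

def dollar_figures(text: str) -> set[float]:
    """Return the set of '$N million' amounts mentioned in text."""
    return {float(m.replace(",", "")) for m in AMOUNT_PATTERN.findall(text)}

def figure_recall(answer: str, reference: str) -> float:
    """Fraction of the reference's dollar figures that also appear in the answer."""
    expected = dollar_figures(reference)
    if not expected:
        return 1.0  # nothing to check
    return len(expected & dollar_figures(answer)) / len(expected)

reference = ("Cash generated from operating activities amounted to $710 million. "
             "Cash outflow from investing activities totaled $240 million.")
answer = "Net cash provided by operating activities was $710 million."
print(figure_recall(answer, reference))
```

Here only one of the two reference figures appears in the answer, so it prints `0.5`. In the notebook you could call `figure_recall(response['output']['text'], ground_truth)`, where `ground_truth` is a variable you would define holding the reference text.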



As you can see, the retrieve and generate API returns the final response directly, and the printed text does not show which sources were used to generate it (they are returned separately, in the response's `citations` field). Let's now retrieve the source information from the knowledge base with the retrieve API.

#### Testing Knowledge Base with Retrieve API
If you need an extra layer of control, you can retrieve the chunks that best match your query using the retrieve API. In this setup, you can configure the desired number of results and control the final answer with your own application logic. The API returns the matching content, its source location (for example, the S3 URI), the relevance score, and the chunk metadata.

In [15]:
response_ret = bedrock_agent_runtime_client.retrieve(
    knowledgeBaseId=kb_id,
    retrievalConfiguration={
        "vectorSearchConfiguration": {
            "numberOfResults": 5,
        }
    },
    retrievalQuery={
        "text": "How many new positions were opened across Amazon's fulfillment and delivery network?"
    }
)

def response_print(retrieve_resp):
    # 'retrievalResults' is a list of results; each has content, location, score and metadata
    for num, chunk in enumerate(retrieve_resp['retrievalResults'], 1):
        print(f'Chunk {num}: ', chunk['content']['text'], end='\n'*2)
        print(f'Chunk {num} Location: ', chunk['location'], end='\n'*2)
        print(f'Chunk {num} Score: ', chunk['score'], end='\n'*2)
        print(f'Chunk {num} Metadata: ', chunk['metadata'], end='\n'*2)

response_print(response_ret)

Chunk 1:  20) | | Changes in operating assets and liabilities: | | | | | Accounts receivable | ($100) | ($80) | ($60) | | Inventory | ($150) | ($130) | ($110) | | Prepaid expenses and other current assets | $50 | $40 | $30 | | Accounts payable | $100 | $80 | $60 | | Accrued liabilities and other | $80 | $60 | $40 | | Net cash provided by operating activities | $1,100 | $880 | $710 | | **Cash flows from investing activities:** | | | | | Purchases of property, plant, and equipment | ($300) | ($250) | ($200) | | Proceeds from sales of property, plant, and equipment | 20 | 30 | 40 | | Purchases of marketable securities | ($100) | ($80) | ($60) | | Maturing marketable securities | 40 | 30 | 20 | | Net cash used in investing activities | ($360) | ($290) | ($240) | | **Cash flows from financing activities:** | | | | | Proceeds from issuance of common stock | 400 | 300 | 200 | | Repurchase of common stock | ($150) | ($100) | ($50) | | Proceeds from issuance of long-term debt | 500 | 400 | 300 
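With the raw chunks in hand, your application decides what reaches the model. A minimal sketch that drops low-scoring chunks and joins the top few into a numbered context block for your own prompt. The `min_score` threshold is an arbitrary placeholder: score ranges depend on the vector store and similarity metric, so calibrate it on your own data. `build_context` is a hypothetical helper, and the sample results below are made up.

```python
def build_context(retrieval_results: list[dict], min_score: float = 0.0, max_chunks: int = 3) -> str:
    """Join the highest-scoring chunk texts into a numbered context block for a prompt."""
    kept = [r for r in retrieval_results if r.get("score", 0.0) >= min_score]
    kept.sort(key=lambda r: r.get("score", 0.0), reverse=True)  # best matches first
    return "\n\n".join(
        f"[{i}] {r['content']['text']}" for i, r in enumerate(kept[:max_chunks], 1)
    )

# Made-up results with the same shape as response_ret["retrievalResults"]
sample_results = [
    {"content": {"text": "Net cash provided by operating activities: $710"}, "score": 0.62},
    {"content": {"text": "Unrelated license text"}, "score": 0.31},
    {"content": {"text": "Net cash used in investing activities: ($240)"}, "score": 0.58},
]
print(build_context(sample_results, min_score=0.5))
# In the notebook: build_context(response_ret["retrievalResults"], min_score=...)
```

Numbering the chunks lets the prompt ask the model to cite `[1]`, `[2]` and so on, which maps answers back to the retrieved locations.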

### Clean up
Please make sure to uncomment and run the cell below to delete all the resources.

In [16]:
# delete the knowledge base and its associated resources, including the S3 bucket and IAM roles/policies
print("===============================Deleting Knowledge Base and associated resources==============================\n")
# knowledge_base.delete_kb(delete_s3_bucket=True, delete_iam_roles_and_policies=True)


