# Creating Multimodal RAG using Amazon Bedrock Knowledge Bases

Amazon Bedrock Knowledge Bases leverage Retrieval Augmented Generation (RAG), a technique that harnesses customer data stores to enhance responses generated by foundation models. Knowledge bases allow agents to access existing customer data repositories without extensive administrator overhead. To connect a knowledge base to your data, you specify an S3 bucket as the data source. By employing knowledge bases, applications gain enriched contextual information, streamlining development through a fully-managed RAG solution. This level of abstraction accelerates time-to-market by minimizing the effort of incorporating your data into agent functionality and it optimizes cost by negating the necessity for continuous model retraining to leverage private data.
Knowledge Base Preparation

## Use Case Description

We will imagine that we are a Customer Support Agent for AnyCompany Telecom who helps resolve customer issues. We have an AI assistant that we can use to provide quick resolutions to customer queries, ensuring a strong customer experience. This AI assistant uses RAG to retrieve relevant information to resolve connectivity issues, product questions, and more.

For this, we need a RAG system that holds the relevant customer support information. For this exercise, we will load some documents containing that information. The documents include not just text but also charts and graphs.


![images/mm-rag.png](../images/mm-rag.png)


### Setup

In [None]:
%pip install --upgrade -r ../requirements.txt -q

In [57]:
# AWS and API tools
import boto3
import sagemaker
from botocore.exceptions import ClientError

# Data handling tools
import json
import base64

# Display and formatting tools
from IPython.display import display, Image
import pprint

# System tools
import os

# Utilities
import random
import time
from retrying import retry

boto3_session = boto3.session.Session()
pp = pprint.PrettyPrinter(indent=2)





sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [58]:
# Get the region from the SageMaker session
region_name = sagemaker.Session().boto_region_name
print(f"Current AWS Region: {region_name}")

Current AWS Region: us-west-2


In [None]:
MICRO_MODEL_ID = "us.amazon.nova-micro-v1:0"
LITE_MODEL_ID = "us.amazon.nova-lite-v1:0"
PRO_MODEL_ID = "us.amazon.nova-pro-v1:0"
PREMIER_MODEL_ID = "us.amazon.nova-premier-v1:0"

In [None]:
account_id = boto3.client("sts").get_caller_identity().get("Account")
s3_client = boto3.client("s3")

bucket_name = f"mmu-workshop-{account_id}"
tmp_bucket_name = f"mmu-workshop-tmp-{account_id}"

r = s3_client.list_buckets(Prefix=bucket_name)
# Guard against an empty result so a missing bucket does not raise IndexError
if r["Buckets"] and r["Buckets"][0]["Name"].startswith(bucket_name):
    bucket_name = r["Buckets"][0]["Name"]
    s3_client.put_object(Bucket=bucket_name, Key="mm-data/")
    print(f"Successfully created mm-data/ folder in {bucket_name}")
    print(f"S3 URI for Data Source: s3://{bucket_name}/mm-data/")

r = s3_client.list_buckets(Prefix=tmp_bucket_name)
# Guard against an empty result so a missing bucket does not raise IndexError
if r["Buckets"] and r["Buckets"][0]["Name"].startswith(tmp_bucket_name):
    tmp_bucket_name = r["Buckets"][0]["Name"]
    print(f"S3 URI for Multimodal Storage: s3://{tmp_bucket_name}")
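
Bucket discovery by prefix can also be wrapped in a small helper that returns `None` when nothing matches. The sketch below is a minimal illustration that works on the dictionary shape returned by `list_buckets`; the names `find_bucket_by_prefix` and `s3_uri` are hypothetical and not part of the workshop code.

```python
from typing import Optional


def find_bucket_by_prefix(list_buckets_response: dict, prefix: str) -> Optional[str]:
    """Return the first bucket name starting with `prefix`, or None if absent."""
    for bucket in list_buckets_response.get("Buckets", []):
        name = bucket.get("Name", "")
        if name.startswith(prefix):
            return name
    return None


def s3_uri(bucket: str, key_prefix: str = "") -> str:
    """Build an s3:// URI for a bucket and an optional key prefix (hypothetical helper)."""
    key_prefix = key_prefix.strip("/")
    return f"s3://{bucket}/{key_prefix}/" if key_prefix else f"s3://{bucket}"
```

For example, `find_bucket_by_prefix(s3_client.list_buckets(), bucket_name)` could replace the direct indexing, with an explicit error message when the result is `None`.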




### 1. Create the KB

**Step 1:** Navigate to the [Amazon Bedrock > Knowledge base > Create knowledge base](https://console.aws.amazon.com/bedrock/home#/knowledge-bases/create-knowledge-base) console as shown:

![images/kb/kb-home.png](../images/kb/kb-home.png)

**Step 2**: Next, let's select Create Knowledge Base > Knowledge Base with vector store.

**Step 3**: Next, let's provide knowledge base details such as the KB name, description, etc.

In the illustration below, we provide:
- **KB Name**: `knowledge-base-customer-support`. Feel free to add a relevant description.
- **IAM Role**: Let the KB create an IAM role with all needed permissions.
- **DataSource**: Choose S3 as the data source, where we will add some documents to use as the RAG data source later in the notebook.

![images/kb/kb-setup.png](../images/kb/kb-setup.png)

**Step 4**: **Configure Data Source**

Now, let's configure the data source (S3, in this case). To do this, we need to provide the details described below:

- **S3 URI**: The S3 URI where the multimodal files of our dataset are located. Use `s3://mmu-workshop-*****/mm-data` as the bucket and sub-folder path.
- For **Parsing Strategy**, select **Foundation models as a parser** and then choose **Amazon Nova Lite** as model.

This means that Nova Lite will be used to parse the multimodal content and summarize the images before passing them to the generator model, for which we will use Amazon Nova Pro.

![images/kb/kb_config-1.png](../images/kb/kb-config-1.png)


![images/kb/kb_config-2](../images/kb/kb-config-2.png)

<div class="alert alert-block alert-info">
<b>Note: S3 URI for Data Source</b> - ⚠️ For the data source S3 location, choose the bucket where you will store the multimodal PDF files. If you are running this notebook as part of an AWS event using Workshop Studio, you should see a bucket with a name similar to mmu-workshop-********. Create a partition in it, such as `mm-data`, to keep our data separate.
The overall S3 URI then looks like:

    
  `s3://mmu-workshop-********/mm-data`
<br>
<b>Note: S3 URI for Multimodal Storage</b> - ⚠️ For the multimodal storage S3 location (shown below), we use a separate bucket where the parsed images will be stored. If you are running this in a workshop, you should see a bucket with a name similar to mmu-workshop-tmp-********.
The overall S3 URI then looks like:

    
  `s3://mmu-workshop-tmp-********`
</div>

**Step 5: Select Embedding Model and Configure Vector Store**

- Select the embedding model **Titan Text Embeddings V2**, which will be used to create vector representations of the multimodal data.
- **NOTE:** If you requested Bedrock model access only for Nova models in the first lab, make sure to also request access for Titan Text Embeddings V2 by [following these instructions](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access-modify.html).

- For **Vector Database**, select **OpenSearch Serverless**.

- For **Multimodal storage destination**, select the previously discovered S3 bucket for temporary files `mmu-workshop-tmp-******`.

![images/kb/storage-config.png](../images/kb/storage-config.png)

**Step 6: Review and Create the Knowledge Base**

Finally, let's review the details entered and click submit to create a multimodal Knowledge Base.

Once the Knowledge Base is created (this generally takes 4-5 mins), you will see a message on screen similar to the following image:

![images/kb/kb_successmsg.png](../images/kb/kb_successmsg.png)



### This overall process takes roughly 10 mins

### 2. Key Information

Once the Knowledge Base is created, let's note down the following key information in the cell below.

<div class="alert alert-block alert-info">
<b>Note:</b> Please make sure to add the Knowledge Base name, Knowledge Base ID, Data Source ID, and the name of the created IAM role in the cell below.
</div>

![images/kb/kb-id.png](../images/kb/kb-id.png)

![images/kb/data-source-id.png](../images/kb/data-source-id.png)

In [None]:
## ⚠️ ⚠️ replace below values with the created Knowledge Base and Data Source
kb_name = "" # add your knowledge base name here
kb_id = ""   # add your knowledge base id here
ds_id = ""  # add your data source id here
kb_iam_role_name = ""    #add your knowledge base iam role name here

Verify that all resources are created correctly and ready to be used:

In [None]:
bedrock_agent_client = boto3_session.client("bedrock-agent")
bedrock_agent_runtime_client = boto3.client("bedrock-agent-runtime")

get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)

get_ds_response = bedrock_agent_client.get_data_source(
    knowledgeBaseId=kb_id, dataSourceId=ds_id
)

%store kb_name
%store kb_id
%store ds_id
%store account_id
%store bucket_name
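
The two `get_*` calls above raise an error if an ID is wrong, but they do not by themselves report whether the resources are ready. A minimal sketch of an explicit status check follows; `check_kb_ready` is a hypothetical helper, and it assumes that `ACTIVE` (knowledge base) and `AVAILABLE` (data source) are the ready states to look for.

```python
def check_kb_ready(kb_response: dict, ds_response: dict) -> bool:
    """Return True when the knowledge base is ACTIVE and the data source is AVAILABLE."""
    kb_status = kb_response["knowledgeBase"]["status"]
    ds_status = ds_response["dataSource"]["status"]
    print(f"Knowledge base status: {kb_status}")
    print(f"Data source status: {ds_status}")
    # Assumption: these two values are the "ready" states for this lab
    return kb_status == "ACTIVE" and ds_status == "AVAILABLE"
```

In the notebook this could be called as `check_kb_ready(get_kb_response, get_ds_response)` before moving on to ingestion.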

### 3. Ingest the customer support information files into the S3 location 

### Using multiple S3 buckets 

1. **Data Source Input S3 Bucket**: This S3 bucket serves as the input for our data source, whose contents are indexed into a vector database on OpenSearch Serverless. For this we will use the pre-created bucket of the format
`mmu-workshop-<ACCOUNT_ID>-*****`

2. **Multimodal Storage Bucket**: This S3 bucket is used to write and read any images extracted from multimodal documents that need to be referenced when answering image-related questions. For this we will use a pre-created bucket of the format
`mmu-workshop-tmp-<ACCOUNT_ID>-*****`

**Note**: We will be syncing the `mm-data` partition. If you are syncing somewhere else, please make the appropriate modifications.

In [None]:
# Upload data to S3 to the bucket that was configured as a data source to the Knowledge Base

s3_client = boto3.client("s3")


def interactive_sleep(seconds: int):
    dots = ""
    for i in range(seconds):
        dots += "."
        print(dots, end="\r")
        time.sleep(1)


def uploadDirectory(path, bucket_name, s3_path):
    for root, dirs, files in os.walk(path):
        for file in files:
            local_file_path = os.path.join(root, file)
            # Build the key with forward slashes so it is a valid S3 key on any OS
            s3_key = os.path.join(s3_path, os.path.relpath(local_file_path, path)).replace(os.sep, "/")
            # Upload the file with the new S3 key
            s3_client.upload_file(local_file_path, bucket_name, s3_key)


uploadDirectory("mm-rag-docs", bucket_name, "mm-data")
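
Before uploading, it can help to preview which local files map to which S3 keys. The sketch below is one possible dry-run helper; `iter_upload_plan` is a hypothetical name, and it builds keys with `PurePosixPath` so they always use forward slashes.

```python
from pathlib import Path, PurePosixPath


def iter_upload_plan(local_dir: str, s3_prefix: str):
    """Yield (local_path, s3_key) pairs for every file under local_dir."""
    base = Path(local_dir)
    for path in sorted(base.rglob("*")):
        if path.is_file():
            relative = path.relative_to(base)
            # PurePosixPath joins with "/" regardless of the local operating system
            key = PurePosixPath(s3_prefix, *relative.parts)
            yield str(path), str(key)
```

For instance, `for local, key in iter_upload_plan("mm-rag-docs", "mm-data"): print(local, "->", key)` lists the planned uploads without touching S3.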

### 4. Sync the KB Data Source _[You can skip this cell and sync from the UI instead by clicking "Sync Data Source"]_

![images/kb/kb_sync.png](../images/kb/kb_sync.png)

This may take 5-7 mins to complete.

In [None]:
interactive_sleep(30)
ingest_jobs = []

# Start an ingestion job
try:
    start_job_response = bedrock_agent_client.start_ingestion_job(
        knowledgeBaseId=kb_id, dataSourceId=ds_id
    )
    job = start_job_response["ingestionJob"]
    job_id = job["ingestionJobId"]
    print(f"Ingestion job started successfully. {job_id=}")

    # Poll until the job reaches a terminal state, pausing between calls
    while job["status"] not in ["COMPLETE", "FAILED", "STOPPED"]:
        interactive_sleep(40)
        get_job_response = bedrock_agent_client.get_ingestion_job(
            knowledgeBaseId=kb_id, dataSourceId=ds_id, ingestionJobId=job_id
        )
        job = get_job_response["ingestionJob"]
    pp.pprint(job)
    ingest_jobs.append(job)
except Exception as e:
    print("Failed to start ingestion job!")
    print(e)
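
Polling an ingestion job can also be factored into a reusable function with a timeout, so the notebook cannot wait forever. This is a minimal sketch, not the workshop's own implementation: `wait_for_ingestion`, `poll_seconds`, and `timeout_seconds` are hypothetical names, and the client is passed in so the function can be exercised with a stub.

```python
import time


def wait_for_ingestion(client, kb_id, ds_id, job_id, poll_seconds=10, timeout_seconds=1800):
    """Poll get_ingestion_job until a terminal status or the timeout is reached."""
    terminal = {"COMPLETE", "FAILED", "STOPPED"}
    deadline = time.monotonic() + timeout_seconds
    while True:
        job = client.get_ingestion_job(
            knowledgeBaseId=kb_id, dataSourceId=ds_id, ingestionJobId=job_id
        )["ingestionJob"]
        if job["status"] in terminal:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Ingestion job {job_id} still {job['status']} after {timeout_seconds}s"
            )
        # Pause between polls to avoid hammering the API
        time.sleep(poll_seconds)
```

With the notebook's variables, a call would look like `wait_for_ingestion(bedrock_agent_client, kb_id, ds_id, job_id)`.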

### 5. Update the IAM Policy to include Amazon Nova as generator

In [None]:
def attach_policy_json_to_role(role_name, policy_name, policy_json):
    """
    Attaches a policy JSON directly to an IAM role.

    :param role_name: The name of the IAM role
    :param policy_name: The name to give the new policy
    :param policy_json: The policy document as a JSON string or dictionary
    :return: True if successful, False otherwise
    """
    try:
        # Create an IAM client
        iam_client = boto3.client("iam")

        # Ensure policy_json is a string
        if isinstance(policy_json, dict):
            policy_json = json.dumps(policy_json)

        # Create the policy
        response = iam_client.create_policy(
            PolicyName=policy_name, PolicyDocument=policy_json
        )

        # Get the ARN of the newly created policy
        policy_arn = response["Policy"]["Arn"]

        # Attach the policy to the role
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

        print(
            f"Successfully created policy {policy_name} and attached it to role {role_name}"
        )
        return True

    except Exception as e:
        print(f"Error attaching policy JSON to role: {str(e)}")
        return False


# Example usage
policy_name = "NovaProModelPolicy"
policy_json = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "BedrockInvokeModelStatement",
            "Effect": "Allow",
            "Action": ["bedrock:*"],
            "Resource": [
                f"arn:aws:bedrock:us-east-1::foundation-model/{PRO_MODEL_ID.removeprefix('us.')}",
                f"arn:aws:bedrock:us-west-2::foundation-model/{PRO_MODEL_ID.removeprefix('us.')}",
                f"arn:aws:bedrock:us-west-2:{account_id}:inference-profile/{PRO_MODEL_ID}",
            ],
        }
    ],
}

attach_policy_json_to_role(kb_iam_role_name, policy_name, policy_json)
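
The policy above allows `bedrock:*` on the listed resources, which is convenient for a workshop but broader than invocation needs. A narrower variant could be sketched as follows. The helper name `build_invoke_policy` is hypothetical, and the assumption that these three actions are sufficient for this lab should be verified against the IAM documentation before use.

```python
def build_invoke_policy(model_id: str, account_id: str, regions: list) -> dict:
    """Build a policy limited to model invocation for a single model.

    ASSUMPTION: the three actions below are enough for this lab; verify
    them for your own account before relying on this policy.
    """
    base_model_id = model_id.removeprefix("us.")  # requires Python 3.9+
    resources = [
        f"arn:aws:bedrock:{region}::foundation-model/{base_model_id}"
        for region in regions
    ]
    resources += [
        f"arn:aws:bedrock:{region}:{account_id}:inference-profile/{model_id}"
        for region in regions
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "BedrockInvokeModelStatement",
                "Effect": "Allow",
                "Action": [
                    "bedrock:InvokeModel",
                    "bedrock:InvokeModelWithResponseStream",
                    "bedrock:GetInferenceProfile",
                ],
                "Resource": resources,
            }
        ],
    }
```

The result can be passed to `attach_policy_json_to_role` in place of `policy_json`, for example with `build_invoke_policy(PRO_MODEL_ID, account_id, ["us-east-1", "us-west-2"])`.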

### 6. Test the KB with retrieve and generate

### Using RetrieveAndGenerate API
Behind the scenes, RetrieveAndGenerate API converts queries into embeddings, searches the knowledge base, and then augments the foundation model prompt with the search results as context information and returns the FM-generated response to the question. For multi-turn conversations, Knowledge Bases manage short-term memory of the conversation to provide more contextual results.

The output of the RetrieveAndGenerate API includes the generated response, source attribution as well as the retrieved text chunks.
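
For multi-turn conversations, the `sessionId` returned by one call can be passed back on the next call. The sketch below illustrates that pattern under stated assumptions: `ask_with_session` is a hypothetical helper, and the client is passed in as a parameter rather than taken from the notebook's globals.

```python
def ask_with_session(client, query, model_arn, kb_id, session_id=None):
    """Call retrieve_and_generate, reusing session_id when one is supplied.

    Returns a (generated_text, session_id) tuple so the caller can pass the
    session ID into the next turn.
    """
    request = {
        "input": {"text": query},
        "retrieveAndGenerateConfiguration": {
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,
            },
        },
    }
    # Only include sessionId on follow-up turns; the first call creates one
    if session_id:
        request["sessionId"] = session_id
    response = client.retrieve_and_generate(**request)
    return response["output"]["text"], response["sessionId"]
```

A follow-up question would then reuse the ID from the first turn: `answer, sid = ask_with_session(client, first_query, model_arn, kb_id)` followed by `ask_with_session(client, follow_up, model_arn, kb_id, session_id=sid)`.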

In [None]:
import io
import s3fs
import ipywidgets as widgets
from PIL import Image as PILImage


def ask_bedrock_llm_with_knowledge_base(query: str, model_arn: str, kb_id: str) -> dict:
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={"text": query},
        retrieveAndGenerateConfiguration={
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,
            },
        },
    )
    return response


fs = s3fs.S3FileSystem()


## Function to print retrieved response
def print_response(response):
    # The response contains the keys 'ResponseMetadata', 'citations', 'output', and 'sessionId'
    generated_text = response["output"]["text"]
    ref_ref_location_lst = []
    ref_ref_location_lst.append({"generated_text": generated_text})
    for num, chunk in enumerate(response["citations"]):
        ref_locations = []
        for i, ref in enumerate(chunk["retrievedReferences"]):
            data_dict = {
                "ref_location": ref["location"],
                "ref_metadata": ref["metadata"]["x-amz-bedrock-kb-source-uri"],
            }
            if "x-amz-bedrock-kb-byte-content-source" in ref["metadata"].keys():
                data_dict["ref_image"] = ref["metadata"][
                    "x-amz-bedrock-kb-byte-content-source"
                ]
            ref_locations.append(data_dict)
        ref_ref_location_lst.append({"chunk_details": ref_locations})
    return ref_ref_location_lst


def create_tree_widget(data, s3=None):
    s3 = s3 or s3fs.S3FileSystem(anon=False)
    main_accordion = widgets.Accordion()

    for i, item in enumerate(data):
        subchildren = []

        # Always add Generated Text first
        if "generated_text" in item:
            text_widget = widgets.Textarea(
                value=str(item["generated_text"]),
                disabled=True,
                layout=widgets.Layout(width="500px", height="200px"),
            )
            subchildren.append(text_widget)

        # Then add Chunk Details
        if "chunk_details" in item:
            chunk_accordion = widgets.Accordion()
            chunk_children = []

            for chunk in item["chunk_details"]:
                chunk_subchildren = []

                for key, value in chunk.items():
                    if (
                        key == "ref_image"
                        and isinstance(value, str)
                        and value.startswith("s3://")
                    ):
                        try:
                            with s3.open(value, "rb") as f:
                                img = PILImage.open(f).resize((400, 400))
                                img_byte_arr = io.BytesIO()
                                img.save(img_byte_arr, format="PNG")
                                img_widget = widgets.Image(
                                    value=img_byte_arr.getvalue(),
                                    format="png",
                                    width=400,
                                    height=400,
                                )
                            chunk_subchildren.append(img_widget)
                        except Exception as e:
                            chunk_subchildren.append(widgets.Label(f"Image Error: {e}"))
                    else:
                        chunk_subchildren.append(
                            widgets.Label(f"{key}: {json.dumps(value)}")
                        )

                chunk_item_accordion = widgets.Accordion(
                    children=tuple(chunk_subchildren)
                )
                for k, child in enumerate(chunk_subchildren):
                    chunk_item_accordion.set_title(k, list(chunk.keys())[k])

                chunk_children.append(chunk_item_accordion)

            chunk_accordion = widgets.Accordion(children=tuple(chunk_children))
            for j, child in enumerate(chunk_children):
                chunk_accordion.set_title(j, f"Chunk {j+1}")

            subchildren.append(chunk_accordion)

        # Create item accordion with correct titles
        item_accordion = widgets.Accordion(children=tuple(subchildren))
        item_accordion.set_title(0, "Generated Text")
        if len(subchildren) > 1:
            item_accordion.set_title(1, "Chunk Details")

        main_accordion.children += (item_accordion,)
        main_accordion.set_title(i, f"Item {i}")

    return main_accordion
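
When only the list of cited documents is needed, a lighter helper than the widget can be enough. The following is a small sketch; `unique_source_uris` is a hypothetical name, and it reads the same `x-amz-bedrock-kb-source-uri` metadata key used in `print_response`.

```python
def unique_source_uris(response: dict) -> list:
    """Collect de-duplicated source URIs from a response, in first-seen order."""
    seen = []
    for citation in response.get("citations", []):
        for ref in citation.get("retrievedReferences", []):
            uri = ref.get("metadata", {}).get("x-amz-bedrock-kb-source-uri")
            if uri and uri not in seen:
                seen.append(uri)
    return seen
```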

### Using Textual Information Retrieval

In [None]:
query = "My phone shows -82 dBm signal strength. Is this good enough for streaming video?"
model_arn = f"arn:aws:bedrock:{region_name}:{account_id}:inference-profile/{PRO_MODEL_ID}"

response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
response_data = print_response(response)
tree_widget = create_tree_widget(response_data)
display(tree_widget)

In [None]:
query = f"""I get full bars in my living room but can't make calls. In the bedroom I get one bar but calls work fine. 
Why is this happening?"""
model_arn = f"arn:aws:bedrock:{region_name}:{account_id}:inference-profile/{PRO_MODEL_ID}"

response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
response_data = print_response(response)
tree_widget = create_tree_widget(response_data)
display(tree_widget)

### Using Multimodal search to find information

In [None]:
query = "What are good places in my house to put my router?"

response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
response_data = print_response(response)
tree_widget = create_tree_widget(response_data)
display(tree_widget)

In [None]:
query = "My installer mounted my router high up on the wall - seems weird. Is this actually better?"

response = ask_bedrock_llm_with_knowledge_base(query, model_arn, kb_id)
response_data = print_response(response)
tree_widget = create_tree_widget(response_data)
display(tree_widget)

Now let's test a case that includes an image with the customer's query. For this, we adjust our previous function to include the image in the input payload.

In [None]:

#define function to encode images
def get_base64_encoded_value(media_path):
    """Convert media file to base64 encoded string.
    
    Args:
        media_path (str): Path to the media file
        
    Returns:
        str: Base64 encoded string
    """
    with open(media_path, "rb") as media_file:
        binary_data = media_file.read()
        base_64_encoded_data = base64.b64encode(binary_data)
        base64_string = base_64_encoded_data.decode("utf-8")
        return base64_string

# Define a function to call Bedrock with an image alongside the query
def ask_bedrock_llm_with_knowledge_base_mm(query: str, image_path: str, model_arn: str, kb_id: str) -> dict:
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        input={"text": query},
        retrieveAndGenerateConfiguration={
            "type": "KNOWLEDGE_BASE",
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,
            },
        },
    )

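    # NOTE: this Converse-style payload is built for illustration only. It is never
    # passed to retrieve_and_generate, so the call above sends just the text query.
    messages = [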
        {
            "role": "user",
            "content": [
                {
                    "image": {
                        "format": "png",
                        "source": {"bytes": get_base64_encoded_value(image_path)}
                    }
                },
                {"text": query}
            ]
        }
    ]
    return response
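
The `retrieve_and_generate` call above takes the query as text. One possible way to bring the photo into the answer, sketched here under assumptions, is a two-step flow: first ask a Nova model to describe the image through the Converse API, then append that description to the text query. The helper names `describe_image` and `build_augmented_query` are hypothetical, the prompt wording is an assumption, and the runtime client is passed in as a parameter.

```python
def describe_image(runtime_client, model_id: str, image_path: str, image_format: str = "png") -> str:
    """Ask a multimodal model for a short description of a local image."""
    with open(image_path, "rb") as f:
        image_bytes = f.read()  # raw bytes; the SDK handles the wire encoding
    response = runtime_client.converse(
        modelId=model_id,
        messages=[
            {
                "role": "user",
                "content": [
                    {"image": {"format": image_format, "source": {"bytes": image_bytes}}},
                    # Assumed prompt wording; adjust to your use case
                    {"text": "Describe this image in two or three sentences."},
                ],
            }
        ],
    )
    return response["output"]["message"]["content"][0]["text"]


def build_augmented_query(query: str, image_description: str) -> str:
    """Combine the customer's question with the image description."""
    return f"{query}\n\nDescription of the attached image: {image_description}"


# Possible usage in this notebook (not executed here):
# runtime = boto3.client("bedrock-runtime")
# description = describe_image(runtime, LITE_MODEL_ID, image_path)
# response = ask_bedrock_llm_with_knowledge_base(
#     build_augmented_query(query, description), model_arn, kb_id
# )
```

The trade-off of this design is that retrieval sees only a textual summary of the photo, so details the description omits cannot influence the search results.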


In [None]:
query = "This is the best I can do for my router setup given my skills. Can this work?"

# Path to the router photo
image_path = "images/wifi-router.png"

# Display the image
print("Router setup")
display(Image(filename=image_path, width=600))  # Adjust width as needed for clear visibility


In [None]:
response = ask_bedrock_llm_with_knowledge_base_mm(query, image_path, model_arn, kb_id)
response_data = print_response(response)
tree_widget = create_tree_widget(response_data)
display(tree_widget)

## Conclusion

In this lab, we explored how to build a multimodal Retrieval Augmented Generation (RAG) system using Amazon Bedrock Knowledge Bases. We created a customer support assistant that can:

1. **Process multimodal documents** - Our system ingested documents containing not just text, but also tables and images
2. **Leverage Amazon Nova models** - We used Nova Lite as a parser to extract and understand visual content and Nova Pro as the generator model
3. **Build a complete multimodal RAG pipeline** - From creating the knowledge base to configuring the vector store to testing with different query types

These capabilities are essential for creating AI assistants that can truly understand and reason about the rich multimodal world of business information.