## Using metadata filtering with Amazon Bedrock Agents and Knowledge Bases

Metadata filtering is a powerful feature that allows you to refine search results by pre-filtering the vector store based on custom metadata attributes. This approach narrows down the search space to the most relevant documents or passages, reducing noise and irrelevant information. 

In this notebook, we explore 3 different implementations of metadata filtering with Amazon Bedrock Agents.

#### Method 1. Explicit metadata filtering

In this method, the user creates a filter and provides it to the Amazon Bedrock agent in the `invoke_agent` API call. To change what is filtered, the user must manually edit the filter and pass it to the agent again.

Learn more [here](https://aws.amazon.com/blogs/machine-learning/amazon-bedrock-knowledge-bases-now-supports-metadata-filtering-to-improve-retrieval-accuracy/).

![arch1](./images/architecture_1.png)

#### Method 2. Intelligent metadata filtering custom approach

This method uses LLMs on Amazon Bedrock and Pydantic data models to dynamically extract metadata filters from natural language queries. The process begins when the user submits a query, which an LLM processes to extract relevant metadata. The extracted metadata is used to build a metadata filter, which is passed to the Amazon Bedrock agent together with the query. Finally, the generated response is returned to the user. The benefit of this method is that the filter does not need to be created manually and can change throughout a conversation according to context.


Learn more [here](https://aws.amazon.com/blogs/machine-learning/streamline-rag-applications-with-intelligent-metadata-filtering-using-amazon-bedrock/).

![arch2](./images/architecture_2.png)

#### Method 3. Implicit metadata filtering

This method is similar to Method 2, but uses a feature built into Amazon Bedrock Agents and Knowledge Bases, so you do not need to write your own Pydantic data models or entity-extraction LLM calls. Currently, this feature is available only for certain [FMs](https://docs.aws.amazon.com/bedrock/latest/userguide/what-is-bedrock.html).

![arch3](./images/architecture_3.png)

### Solution Overview

In this notebook, we demonstrate how to create a **sample dataset**, **Amazon Bedrock Knowledge Base**, and **Amazon Bedrock Agent**. 

Our sample dataset consists of 5 fictional employees of ExampleCompany, each with:
1. A .pdf file detailing their financial benefits. 
2. A .pdf file detailing their healthcare benefits.
3. Metadata attached to each .pdf file. The metadata attributes are `employee_name`, `role`, and `document_type`.

After setting up resources, we implement the **3 metadata filtering methods**. 

For each method, we are using the scenario: An employee named Alex Anderson is looking for information about his 401k. The information he is looking for is stored in a .pdf file with the metadata attributes `employee_name=Alex Anderson` and `document_type=finance`.

### Prerequisites
Before proceeding with this tutorial, make sure you have the following in place:

- **AWS account** – You should have an AWS account with access to Amazon Bedrock.
- **Model access** – Amazon Bedrock users need to request access to FMs before they’re available for use. For this solution, you need to enable access to the Amazon Titan Embeddings G1 – Text, Amazon Nova Lite, Anthropic's Claude Instant, and Anthropic's Claude 3.5 Sonnet v2.

### Set up the environment 
First, set up your environment with the necessary imports and Boto3 clients.

In [None]:
!pip install -q boto3 fpdf pydantic opensearch-py retrying termcolor rich reportlab

In [None]:
!pip install boto3==1.37.4

In [None]:
import boto3
import uuid
import textwrap
import os
import json
import sys

sys.path.insert(0, ".")
sys.path.insert(1, "./../../../")

from fpdf import FPDF
from typing import List, Optional
from pydantic import BaseModel, validator
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib.utils import simpleSplit
from src.utils.knowledge_base_helper import KnowledgeBasesForAmazonBedrock
from src.utils.bedrock_agent_helper import AgentsForAmazonBedrock

session = boto3.session.Session()
region = session.region_name
s3_client = boto3.client("s3", region_name=region)
sts_client = boto3.client("sts")
bedrock_runtime_client = boto3.client("bedrock-runtime", region_name=region)
bedrock_client = boto3.client("bedrock")
iam_client = boto3.client("iam")

### Prepare a dataset for Amazon Bedrock Knowledge Bases


Create an Amazon Simple Storage Service (Amazon S3) bucket to store the data.

In [None]:
account_id = sts_client.get_caller_identity()["Account"]
unique_suffix = uuid.uuid4().hex[:4]
bucket_name = f"bedrock-documents-{account_id}-{unique_suffix}"

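# Outside us-east-1, S3 requires an explicit LocationConstraint.
bucket_kwargs = {"Bucket": bucket_name}
if region != "us-east-1":
    bucket_kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
s3_client.create_bucket(**bucket_kwargs)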
print(f"Bucket '{bucket_name}' created successfully in {region}!")

Create local folders to store the data that will be generated.

In [None]:
!mkdir -p sample_documents/healthcare sample_documents/finance

Generate sample data for five employees of ExampleCompany. 

- Finance documents have the naming structure `finance-x.pdf`
- Healthcare documents have the naming structure `healthcare-x.pdf`
- Metadata files have the suffix `.metadata.json`


In [None]:
# Function to generate sample documents
MODEL_ID = "us.amazon.nova-lite-v1:0"


def generate_sample_document(prompt):
    system_list = [
        {
            "text": "You are helping to generate sample data. When the user provides you with a prompt, write a complete and realistic document for that prompt. "
        }
    ]

    message_list = [{"role": "user", "content": [{"text": prompt}]}]

    inf_params = {"maxTokens": 500, "topP": 0.9, "topK": 20, "temperature": 0.3}

    request_body = {
        "schemaVersion": "messages-v1",
        "messages": message_list,
        "system": system_list,
        "inferenceConfig": inf_params,
    }

    response = bedrock_runtime_client.invoke_model(
        modelId=MODEL_ID, body=json.dumps(request_body)
    )
    response_body = json.loads(response["body"].read().decode("utf-8"))
    response = response_body["output"]["message"]["content"][0]["text"]
    return response


# Function to convert multi-line text to PDF
def text_to_pdf(text, filename, file_path):
    c = canvas.Canvas(f"{file_path}/{filename}", pagesize=letter)
    width, height = letter
    y = height - 40

    lines = simpleSplit(text, "Helvetica", 12, width - 80)

    for line in lines:
        if y < 40:  # Check if we need a new page
            c.showPage()
            y = height - 40
        c.drawString(40, y, line)
        y -= 15  # Move down for next line

    c.save()
    print(f"Created {file_path}/{filename}")


# Function to create metadata
def generate_metadata(employee_name, role, document_type, filename, file_path):
    metadata = {
        "metadataAttributes": {
            "employee_name": employee_name,
            "role": role,
            "document_type": document_type,
        }
    }

    metadata_filename = f"{filename}.metadata.json"
    with open(f"{file_path}/{metadata_filename}", "w") as f:
        json.dump(metadata, f, indent=4)
    print(f"Created {file_path}/{metadata_filename}")

In [None]:
employee_list = [
    {"name": "Alex Anderson", "role": "independent_worker"},
    {"name": "Beth Baker", "role": "manager"},
    {"name": "Charlie Cook", "role": "independent_worker"},
    {"name": "Dave Duncan", "role": "manager"},
    {"name": "Emily Eaton", "role": "independent_worker"},
]

# Create sample healthcare documents and metadata
for index, employee in enumerate(employee_list):
    prompt = "Generate a healthcare benefits document for an employee of ExampleCompany. The document needs to be one page long and include sections on medical benefits, a dental plan, and a vision plan."
    text = generate_sample_document(prompt)

    # Include the .pdf extension so the file matches the documented naming scheme
    output_filename = f"healthcare-{index}.pdf"
    output_file_path = "sample_documents/healthcare"

    text_to_pdf(text, output_filename, output_file_path)
    generate_metadata(
        employee["name"],
        employee["role"],
        "healthcare",
        output_filename,
        output_file_path,
    )


# Create sample finance documents and metadata
for index, employee in enumerate(employee_list):
    prompt = "Generate a finance benefits document for an employee of ExampleCompany. The document needs to be one page long and include a 401k plan and stock options."
    text = generate_sample_document(prompt)

    # Include the .pdf extension so the file matches the documented naming scheme
    output_filename = f"finance-{index}.pdf"
    output_file_path = "sample_documents/finance"

    text_to_pdf(text, output_filename, output_file_path)
    generate_metadata(
        employee["name"],
        employee["role"],
        "finance",
        output_filename,
        output_file_path,
    )

### Create a knowledge base for Amazon Bedrock

The knowledge base connects the Amazon Bedrock model to your dataset for Retrieval Augmented Generation (RAG).

In [None]:
kb = KnowledgeBasesForAmazonBedrock()

kb_name = f"my-knowledge-base-{unique_suffix}"
kb_description = "Knowledge base for documents and metadata"
kb_bucket = bucket_name

kb_id, ds_id = kb.create_or_retrieve_knowledge_base(
    kb_name,
    kb_description,
    kb_bucket,
)

### Upload data to S3
Upload the data that you generated to the Amazon S3 bucket you created earlier. Healthcare documents and their metadata are stored under `s3://bedrock-documents-xxxxxxxxxxxx-xxxx/healthcare`. Finance documents and their metadata are stored under `s3://bedrock-documents-xxxxxxxxxxxx-xxxx/finance`.

In [None]:
def upload_folder_to_s3(local_path, bucket_name, s3_key):
    for filename in os.listdir(local_path):
        # Skip notebook artifacts such as .ipynb_checkpoints
        if ".ipynb" in filename:
            continue
        s3_client.upload_file(
            os.path.join(local_path, filename), bucket_name, f"{s3_key}/{filename}"
        )
        print(f"Uploaded {filename} to {bucket_name}/{s3_key}")

In [None]:
upload_folder_to_s3("sample_documents/healthcare/", bucket_name, "healthcare")
upload_folder_to_s3("sample_documents/finance/", bucket_name, "finance")
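
Amazon Bedrock Knowledge Bases pairs each source file with a metadata file named after the full source file name plus `.metadata.json`. The sketch below is one way to confirm that pairing before ingestion. The helper name `find_documents_without_metadata` and the sample listing are hypothetical and are not part of the notebook's helpers.

```python
from typing import Iterable, List

METADATA_SUFFIX = ".metadata.json"


def find_documents_without_metadata(file_names: Iterable[str]) -> List[str]:
    """Return source documents that have no '<name>.metadata.json' companion."""
    names = set(file_names)
    documents = sorted(n for n in names if not n.endswith(METADATA_SUFFIX))
    return [doc for doc in documents if f"{doc}{METADATA_SUFFIX}" not in names]


# Hypothetical folder listing: finance-1.pdf has no metadata file.
listing = [
    "finance-0.pdf",
    "finance-0.pdf.metadata.json",
    "finance-1.pdf",
]
print(find_documents_without_metadata(listing))
```

In this notebook the same check could be run on `os.listdir("sample_documents/finance")` and `os.listdir("sample_documents/healthcare")`. An empty list means every document has its metadata file.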

### Synchronize the dataset with the knowledge base

After you create the knowledge base and your data files and metadata files are in an Amazon S3 bucket, you can sync to ingest your data sources into the knowledge base.



In [None]:
kb.synchronize_data(kb_id, ds_id)
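
For readers who want to see what a sync involves at the API level, the following is a minimal sketch that starts an ingestion job and polls until it finishes, using the `start_ingestion_job` and `get_ingestion_job` operations of the `bedrock-agent` Boto3 client. Whether `kb.synchronize_data` is implemented this way is an assumption. The function name `run_ingestion_job` and its polling parameters are illustrative only.

```python
import time

TERMINAL_STATUSES = ("COMPLETE", "FAILED", "STOPPED")


def run_ingestion_job(bedrock_agent_client, kb_id, ds_id, poll_seconds=10.0, max_polls=60):
    """Start an ingestion job and poll until it reaches a terminal status."""
    job = bedrock_agent_client.start_ingestion_job(
        knowledgeBaseId=kb_id, dataSourceId=ds_id
    )["ingestionJob"]

    polls = 0
    while job["status"] not in TERMINAL_STATUSES:
        if polls >= max_polls:
            raise TimeoutError(f"Ingestion job {job['ingestionJobId']} did not finish")
        time.sleep(poll_seconds)
        job = bedrock_agent_client.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            ingestionJobId=job["ingestionJobId"],
        )["ingestionJob"]
        polls += 1
    return job


# Example call (requires AWS credentials, so it is left commented out):
# import boto3
# job = run_ingestion_job(boto3.client("bedrock-agent"), kb_id, ds_id)
# print(job["status"])
```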

### Create the Bedrock agent

In [None]:
agents = AgentsForAmazonBedrock()

agent_name = f"my-bedrock-agent-{unique_suffix}"
agent_description = "Agent for finance and healthcare documents"
agent_instructions = "You are a helpful chatbot for employees of a company. Answer in a polite and neutral tone."
agent_foundation_model = [
    "anthropic.claude-3-sonnet-20240229-v1:0",
    "anthropic.claude-3-5-sonnet-20240620-v1:0",
    "anthropic.claude-3-haiku-20240307-v1:0",
]

# CREATE AGENT
agent_id, agent_alias_id, agent_alias_arn = agents.create_agent(
    agent_name=agent_name,
    agent_description=agent_description,
    agent_instructions=agent_instructions,
    model_ids=agent_foundation_model,  # IDs of the foundation models this agent is allowed to use, the first one will be used
    # to create the agent, and the others will also be captured in the agent IAM role for future use
)

# WAIT FOR STATUS UPDATE
agents.wait_agent_status_update(agent_id=agent_id)

# PREPARE AGENT
agents.prepare(agent_name=agent_name)

# WAIT FOR STATUS UPDATE
agents.wait_agent_status_update(agent_id=agent_id)

### Associate the Bedrock agent with the knowledge base

In [None]:
agents.associate_kb_with_agent(agent_id, agent_description, kb_id)

Now that we have set up the Bedrock knowledge base and Bedrock agent, let's look at 3 different ways to implement metadata filtering.

### Method 1. Using explicit metadata filtering

![arch1](./images/architecture_1.png)

Construct the filters (the following are some examples):

In [None]:
single_filter = {"equals": {"key": "employee_name", "value": "Alex Anderson"}}

one_group_filter = {
    "andAll": [
        {"equals": {"key": "employee_name", "value": "Alex Anderson"}},
        {"equals": {"key": "document_type", "value": "finance"}},
    ]
}
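
Beyond `equals` and `andAll`, the knowledge base filter syntax also offers operators such as `in` and `orAll`, and a filter group may contain one nested group. The sketch below shows two such combinations for the sample dataset. The helper functions `equals` and `is_in` are hypothetical conveniences and are not part of any AWS SDK.

```python
def equals(key, value):
    """Build an 'equals' condition for one metadata attribute."""
    return {"equals": {"key": key, "value": value}}


def is_in(key, values):
    """Build an 'in' condition that matches any of the given values."""
    return {"in": {"key": key, "value": list(values)}}


# All of Alex Anderson's documents, whether finance or healthcare
alex_any_benefit = {
    "andAll": [
        equals("employee_name", "Alex Anderson"),
        is_in("document_type", ["finance", "healthcare"]),
    ]
}

# Finance documents belonging to either of two employees
finance_for_two = {
    "andAll": [
        equals("document_type", "finance"),
        {
            "orAll": [
                equals("employee_name", "Alex Anderson"),
                equals("employee_name", "Beth Baker"),
            ]
        },
    ]
}
```

Either dictionary can be passed as the `filter` value in `vectorSearchConfiguration` in the same way as `one_group_filter`.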

Pass the filter to `retrievalConfiguration`.

In [None]:
session_state = {
    "knowledgeBaseConfigurations": [
        {
            "knowledgeBaseId": kb_id,
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {"filter": one_group_filter}
            },
        }
    ]
}
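
Before involving the agent, it can be useful to check the filter against the knowledge base directly. The sketch below uses the `retrieve` operation of the `bedrock-agent-runtime` Boto3 client and then verifies that every returned chunk carries the expected metadata. The function names and the `top_k` default are illustrative assumptions.

```python
def retrieve_with_filter(runtime_client, kb_id, query, metadata_filter, top_k=5):
    """Query the knowledge base directly, applying a metadata filter."""
    response = runtime_client.retrieve(
        knowledgeBaseId=kb_id,
        retrievalQuery={"text": query},
        retrievalConfiguration={
            "vectorSearchConfiguration": {
                "numberOfResults": top_k,
                "filter": metadata_filter,
            }
        },
    )
    return response["retrievalResults"]


def results_match(results, expected):
    """True when every result has all expected metadata key/value pairs.

    Note that an empty result list also returns True.
    """
    return all(
        all(result.get("metadata", {}).get(key) == value for key, value in expected.items())
        for result in results
    )


# Example call (requires AWS credentials, so it is left commented out):
# import boto3
# runtime = boto3.client("bedrock-agent-runtime")
# results = retrieve_with_filter(runtime, kb_id, "401k match", one_group_filter)
# print(results_match(results, {"employee_name": "Alex Anderson", "document_type": "finance"}))
```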

Invoke the Amazon Bedrock agent and view the response. 

In this example, we have passed in a filter where `employee_name=Alex Anderson` and `document_type=finance`.

In [None]:
text = "How much will my employer match my 401k contribution?"
response = agents.invoke(
    input_text=text,
    agent_id=agent_id,
    agent_alias_id=agent_alias_id,
    session_state=session_state,
)
print(response)
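
The `agents.invoke` helper hides the underlying API call. As a rough sketch of what such a call looks like with Boto3 alone, the code below passes the session state to `invoke_agent` on the `bedrock-agent-runtime` client and joins the streamed `chunk` events into one string. How the helper is actually implemented is not shown in this notebook, so treat `invoke_agent_text` and `collect_completion` as hypothetical names.

```python
import uuid


def collect_completion(events):
    """Join the text of all 'chunk' events from an invoke_agent completion stream."""
    parts = []
    for event in events:
        chunk = event.get("chunk")
        if chunk and "bytes" in chunk:
            parts.append(chunk["bytes"].decode("utf-8"))
    return "".join(parts)


def invoke_agent_text(runtime_client, agent_id, agent_alias_id, text, session_state=None):
    """Invoke an agent once in a fresh session and return its full text answer."""
    kwargs = {
        "agentId": agent_id,
        "agentAliasId": agent_alias_id,
        "sessionId": str(uuid.uuid4()),
        "inputText": text,
    }
    if session_state is not None:
        kwargs["sessionState"] = session_state
    response = runtime_client.invoke_agent(**kwargs)
    return collect_completion(response["completion"])


# Example call (requires AWS credentials, so it is left commented out):
# import boto3
# runtime = boto3.client("bedrock-agent-runtime")
# print(invoke_agent_text(runtime, agent_id, agent_alias_id, text, session_state))
```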

### Method 2. Intelligent metadata filtering custom approach

![arch2](./images/architecture_2.png)

Define Pydantic models to validate and structure our extracted entities:

In [None]:
class Entity(BaseModel):
    # Default to None so a missing attribute does not fail validation
    employee_name: Optional[str] = None
    document_type: Optional[str] = None
    role: Optional[str] = None


class ExtractedEntities(BaseModel):
    entities: List[Entity]

    @validator("entities", pre=True)
    def remove_duplicates(cls, entities):
        unique_entities = []
        seen = set()
        for entity in entities:
            entity_tuple = tuple(sorted(entity.items()))
            if entity_tuple not in seen:
                seen.add(entity_tuple)
                unique_entities.append(dict(entity_tuple))
        return unique_entities
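
The deduplication performed by the validator can be tried on plain dictionaries, without Pydantic or a model call. The sketch below repeats the same logic in a standalone function. `dedupe_entities` and the sample input are illustrative only.

```python
def dedupe_entities(entities):
    """Drop entities whose key/value pairs duplicate an earlier entity."""
    unique, seen = [], set()
    for entity in entities:
        # Sorting the items makes the comparison independent of key order
        key = tuple(sorted(entity.items()))
        if key not in seen:
            seen.add(key)
            unique.append(dict(key))
    return unique


raw = [
    {"employee_name": "Alex Anderson", "document_type": "finance", "role": "unknown"},
    {"role": "unknown", "document_type": "finance", "employee_name": "Alex Anderson"},
    {"employee_name": "Beth Baker", "document_type": "healthcare", "role": "manager"},
]
print(len(dedupe_entities(raw)))
```

The first two entries differ only in key order, so only two entities remain.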

Next, define a tool for entity extraction with basic instructions and use it with Amazon Bedrock. Adapt the tool and attribute descriptions to your own use case, because the model relies on them to decide what to extract:

In [None]:
tools = [
    {
        "toolSpec": {
            "name": "extract_entities",
            "description": "Extract named entities from the text. If you are not 100% sure of the entity value, use 'unknown'.",
            "inputSchema": {
                "json": {
                    "type": "object",
                    "properties": {
                        "entities": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "employee_name": {
                                        "type": "string",
                                        "description": "The name of the user. There should be a first and last name.",
                                    },
                                    "document_type": {
                                        "type": "string",
                                        "description": "The type of document the user wants. It is either healthcare or finance.",
                                    },
                                    "role": {
                                        "type": "string",
                                        "description": "The type of role the user is. It is either manager or independent_worker.",
                                    },
                                },
                                "required": ["employee_name", "document_type", "role"],
                            },
                        }
                    },
                    "required": ["entities"],
                }
            },
        }
    }
]

MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"


def extract_entities(text):
    response = bedrock_runtime_client.converse(
        modelId=MODEL_ID,
        inferenceConfig={"temperature": 0, "maxTokens": 4000},
        toolConfig={"tools": tools},
        messages=[{"role": "user", "content": [{"text": text}]}],
    )

    json_entities = None
    for content in response["output"]["message"]["content"]:
        if "toolUse" in content and content["toolUse"]["name"] == "extract_entities":
            json_entities = content["toolUse"]["input"]
            break

    if json_entities:
        return ExtractedEntities.parse_obj(json_entities)
    else:
        print("No entities found in the response.")
        return None
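
With the tool configuration used above, the model decides on its own whether to call the tool. The Converse API also accepts a `toolChoice` field that can require a specific tool. The sketch below builds such a configuration. It assumes the selected model supports `toolChoice`, and `build_tool_config` is a hypothetical helper.

```python
def build_tool_config(tools, forced_tool_name=None):
    """Return a Converse toolConfig, optionally forcing one named tool."""
    config = {"tools": tools}
    if forced_tool_name is not None:
        known_names = {tool["toolSpec"]["name"] for tool in tools}
        if forced_tool_name not in known_names:
            raise ValueError(f"Unknown tool: {forced_tool_name}")
        config["toolChoice"] = {"tool": {"name": forced_tool_name}}
    return config


# Minimal stand-in for the tools list defined in the notebook
example_tools = [
    {
        "toolSpec": {
            "name": "extract_entities",
            "description": "Extract named entities from the text.",
            "inputSchema": {"json": {"type": "object", "properties": {}}},
        }
    }
]
print(build_tool_config(example_tools, "extract_entities")["toolChoice"])
```

The returned dictionary would be passed as `toolConfig` in the `converse` call in place of `{"tools": tools}`.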

Create a function to construct the metadata filter based on the extracted entities:

In [None]:
def construct_metadata_filter(extracted_entities):
    if not extracted_entities or not extracted_entities.entities:
        return None

    entity = extracted_entities.entities[0]
    metadata_filter = {"andAll": []}

    if entity.employee_name and entity.employee_name != "unknown":
        metadata_filter["andAll"].append(
            {"equals": {"key": "employee_name", "value": entity.employee_name}}
        )

    if entity.document_type and entity.document_type != "unknown":
        metadata_filter["andAll"].append(
            {"equals": {"key": "document_type", "value": entity.document_type}}
        )

    if entity.role and entity.role != "unknown":
        metadata_filter["andAll"].append(
            {"equals": {"key": "role", "value": entity.role}}
        )

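    conditions = metadata_filter["andAll"]
    if len(conditions) > 1:
        return metadata_filter
    # andAll needs at least two conditions; return a lone condition unwrapped
    return conditions[0] if conditions else None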
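
An `equals` condition only matches when the extracted value is identical to the stored metadata value, so a model answer such as "Independent Worker" would not match the stored `independent_worker`. One possible safeguard, sketched below, is to normalize extracted values against the known metadata values before building the filter. The `ALLOWED_VALUES` table and `normalize_value` function are assumptions for illustration, based on the values used in the sample dataset.

```python
ALLOWED_VALUES = {
    "document_type": {"finance", "healthcare"},
    "role": {"manager", "independent_worker"},
}


def normalize_value(key, value):
    """Map a model-extracted value onto a stored metadata value, or None."""
    if value is None:
        return None
    allowed = ALLOWED_VALUES.get(key)
    if allowed is None:
        # Free-form attribute such as employee_name: only trim whitespace
        return value.strip() or None
    candidate = value.strip().lower().replace(" ", "_")
    return candidate if candidate in allowed else None


print(normalize_value("role", "Independent Worker"))
print(normalize_value("document_type", "unknown"))
```

Values that normalize to `None`, including the `unknown` placeholder, can then simply be left out of the filter.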

Create a main function that processes the query and retrieves results:

In [None]:
def process_query(text):
    extracted_entities = extract_entities(text)
    metadata_filter = construct_metadata_filter(extracted_entities)

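    # Only include the filter when one was extracted; boto3 rejects "filter": None
    vector_search_config = {}
    if metadata_filter:
        vector_search_config["filter"] = metadata_filter

    session_state = {
        "knowledgeBaseConfigurations": [
            {
                "knowledgeBaseId": kb_id,
                "retrievalConfiguration": {
                    "vectorSearchConfiguration": vector_search_config
                },
            }
        ]
    }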
    response = agents.invoke(
        input_text=text,
        agent_id=agent_id,
        agent_alias_id=agent_alias_id,
        session_state=session_state,
    )
    return response

Invoke the Amazon Bedrock agent and view the response. 

In this example, the LLM dynamically extracts the metadata attributes `employee_name=Alex Anderson` and `document_type=finance` from the query, creates a filter using those attributes, and then passes the filter and query into the Amazon Bedrock agent.

In [None]:
text = "I am Alex Anderson. How much will my employer match my 401k contribution?"
response = process_query(text)
print(response)

### Method 3. Implicit metadata filtering
![arch3](./images/architecture_3.png)

For implicit filtering, the Amazon Bedrock Knowledge Base execution role needs invoke permissions for Anthropic's Claude 3.5 Sonnet v2. Add the policy to the role.

In [None]:
kb_role_arn = kb.get_kb(kb_id)["knowledgeBase"]["roleArn"]
kb_role = kb_role_arn.rsplit("/", 1)[1]

policy_name = "BedrockInferenceProfileAccess"
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:GetInferenceProfile",
                "bedrock:InvokeModel",
                "bedrock:ListInferenceProfiles",
            ],
            "Resource": [
                f"arn:aws:bedrock:us-east-1:{account_id}:inference-profile/us.anthropic.claude-3-5-sonnet-20241022-v2:0",
                "arn:aws:bedrock:us-east-1::foundation-model/*",
                f"arn:aws:bedrock:us-east-2:{account_id}:inference-profile/us.anthropic.claude-3-5-sonnet-20241022-v2:0",
                "arn:aws:bedrock:us-east-2::foundation-model/*",
                f"arn:aws:bedrock:us-west-2:{account_id}:inference-profile/us.anthropic.claude-3-5-sonnet-20241022-v2:0",
                "arn:aws:bedrock:us-west-2::foundation-model/*",
            ],
        }
    ],
}

response = iam_client.put_role_policy(
    RoleName=kb_role, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
)
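
The resource list in the policy repeats the same two ARN patterns for each Region. As a sketch, the list could be generated from a list of Regions instead of being written out by hand. `inference_profile_resources` is a hypothetical helper and the account ID in the example is a placeholder.

```python
def inference_profile_resources(account_id, profile_id, regions):
    """Build the inference-profile and foundation-model ARNs for each Region."""
    resources = []
    for region in regions:
        resources.append(
            f"arn:aws:bedrock:{region}:{account_id}:inference-profile/{profile_id}"
        )
        resources.append(f"arn:aws:bedrock:{region}::foundation-model/*")
    return resources


example = inference_profile_resources(
    "123456789012",  # placeholder account ID
    "us.anthropic.claude-3-5-sonnet-20241022-v2:0",
    ["us-east-1", "us-east-2", "us-west-2"],
)
for arn in example:
    print(arn)
```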

Create the conversation history. We supply the employee name through the conversation history because, in a realistic scenario, a user talking to a chatbot would expect it to already know basic information such as their name.

In a real deployment, the employee would be logged in to their company portal and the `user_name` value could be retrieved from the authentication system.

In [None]:
user_name = "Alex Anderson"  # get from authentication

conversation_history = [
    {
        "content": [
            {"text": f"Hi, I am {user_name}. Use my name to help with your search."}
        ],
        "role": "user",
    },
    {
        "content": [{"text": f"Hi {user_name}, how can I help you?"}],
        "role": "assistant",
    },
]

Construct the metadata attributes for the implicit filter:

In [None]:
metadata_attributes = [
    {
        "description": "The name of the user. There should be a first and last name.",
        "key": "employee_name",
        "type": "STRING",
    },
    {
        "description": "The type of information the user is looking for. The type should be either finance or healthcare.",
        "key": "document_type",
        "type": "STRING",
    },
    {
        "description": "The role of the user. The role should be either manager or independent_worker.",
        "key": "role",
        "type": "STRING",
    },
]
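
Each entry in the implicit filter schema needs a `key`, a `type`, and a `description`, and the type is expected to be one of `STRING`, `NUMBER`, `BOOLEAN`, or `STRING_LIST`. The sketch below checks a schema locally before it is sent with a request. `validate_metadata_attributes` is a hypothetical helper, not an AWS API.

```python
ALLOWED_TYPES = {"STRING", "NUMBER", "BOOLEAN", "STRING_LIST"}
REQUIRED_FIELDS = ("key", "type", "description")


def validate_metadata_attributes(attributes):
    """Return a list of problems; an empty list means the schema looks fine."""
    problems = []
    seen_keys = set()
    for index, attribute in enumerate(attributes):
        for field in REQUIRED_FIELDS:
            if not attribute.get(field):
                problems.append(f"attribute {index}: missing '{field}'")
        attr_type = attribute.get("type")
        if attr_type and attr_type not in ALLOWED_TYPES:
            problems.append(f"attribute {index}: unsupported type '{attr_type}'")
        key = attribute.get("key")
        if key in seen_keys:
            problems.append(f"attribute {index}: duplicate key '{key}'")
        if key:
            seen_keys.add(key)
    return problems


sample_schema = [
    {"key": "employee_name", "type": "STRING", "description": "The name of the user."},
    {"key": "document_type", "type": "STRING", "description": "finance or healthcare."},
]
print(validate_metadata_attributes(sample_schema))
```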

Pass the conversation history and metadata attributes:

In [None]:
session_state = {
    "conversationHistory": {"messages": conversation_history},
    "knowledgeBaseConfigurations": [
        {
            "knowledgeBaseId": kb_id,
            "retrievalConfiguration": {
                "vectorSearchConfiguration": {
                    "implicitFilterConfiguration": {
                        "metadataAttributes": metadata_attributes,
                        # Use the session Region instead of a hardcoded one
                        "modelArn": f"arn:aws:bedrock:{region}:{account_id}:inference-profile/us.anthropic.claude-3-5-sonnet-20241022-v2:0",
                    }
                }
            },
        }
    ],
}

Invoke the Amazon Bedrock agent and view the response. 

In this example, the Amazon Bedrock agent already knows the employee name due to its conversation history. The user query tells the Amazon Bedrock model to look for finance documents. Combining the conversation history and user query, the Amazon Bedrock model creates an implicit filter where `employee_name=Alex Anderson` and `document_type=finance`.

In [None]:
response = agents.invoke(
    input_text="How much will my employer match my 401k contribution?",
    agent_id=agent_id,
    agent_alias_id=agent_alias_id,
    session_state=session_state,
)
print(response)

### Clean up

Run the cells below to delete the resources created in this notebook.

Delete AWS resources:

In [None]:
iam_client.delete_role_policy(RoleName=kb_role, PolicyName=policy_name)
kb.delete_kb(kb_name=kb_name)
agents.delete_agent(agent_name=agent_name)

Delete local files and folders:

In [None]:
!rm -r sample_documents