# Metadata filtering using Amazon Bedrock Knowledge Bases
This notebook walks through sample code for the metadata filtering feature of Amazon Bedrock Knowledge Bases.

With metadata filtering, you can improve search results by pre-filtering your retrievals from vector stores. 
For more details on this feature, please read this [blog](https://aws.amazon.com/blogs/machine-learning/amazon-bedrock-knowledge-bases-now-supports-metadata-filtering-to-improve-retrieval-accuracy/).

## 1. Import the needed libraries
The first step is to install the prerequisite packages.

In [None]:
%pip install --upgrade pip --quiet
%pip install -r ../requirements.txt --no-deps --quiet
%pip install -r ../requirements.txt --upgrade --quiet

In [None]:
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
import botocore
botocore.__version__

In [None]:
import os
import sys
import time
import boto3
import logging
import pprint
import json

# Set the path to import module
from pathlib import Path
current_path = Path().resolve()
current_path = current_path.parent
if str(current_path) not in sys.path:
    sys.path.append(str(current_path))
# Print sys.path to verify
# print(sys.path)

pp = pprint.PrettyPrinter(indent=2)

from utils.knowledge_base import BedrockKnowledgeBase

In [None]:
#Clients
s3_client = boto3.client('s3')
sts_client = boto3.client('sts')
session = boto3.session.Session()
region =  session.region_name
account_id = sts_client.get_caller_identity()["Account"]
bedrock_runtime_client = boto3.client('bedrock-runtime')
bedrock_agent_client = boto3.client('bedrock-agent')
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
region, account_id

In [None]:
import time

# Get the current timestamp
current_time = time.time()

# Format the timestamp as a string
timestamp_str = time.strftime("%Y%m%d%H%M%S", time.localtime(current_time))[-7:]
# Create the suffix using the timestamp
suffix = f"{timestamp_str}"
knowledge_base_name = 'metadata-filtering-kb'
knowledge_base_description = "Knowledge Base metadata filtering."
bucket_name = f'{knowledge_base_name}-{suffix}'
foundation_model = "us.anthropic.claude-3-5-sonnet-20240620-v1:0"

# Define data sources
data_source=[{"type": "S3", "bucket_name": bucket_name}]

## 2. Create knowledge bases with fixed chunking strategy
Let's start by creating an [Amazon Bedrock Knowledge Base](https://aws.amazon.com/bedrock/knowledge-bases/) to store video game data in CSV format. Knowledge Bases can integrate with different vector databases, including [Amazon OpenSearch Serverless](https://aws.amazon.com/opensearch-service/features/serverless/), [Amazon Aurora](https://aws.amazon.com/rds/aurora/), [Pinecone](http://app.pinecone.io/bedrock-integration), Redis Enterprise and MongoDB Atlas. For this example, we will integrate the knowledge base with Amazon OpenSearch Serverless. To do so, we will use the helper class `BedrockKnowledgeBase`, which creates the knowledge base and all of its prerequisites:
1. IAM roles and policies
2. S3 bucket
3. Amazon OpenSearch Serverless encryption, network and data access policies
4. Amazon OpenSearch Serverless collection
5. Amazon OpenSearch Serverless vector index
6. Knowledge base
7. Knowledge base data source

We will create a knowledge base using the fixed-size chunking strategy.

You can choose a different chunking strategy by changing the parameter value below:
```
"chunkingStrategy": "FIXED_SIZE | NONE | HIERARCHICAL | SEMANTIC"
```
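
For orientation, the strategy name corresponds to the `chunkingConfiguration` block accepted by the Bedrock `CreateDataSource` API. How the `BedrockKnowledgeBase` helper builds this block internally is not shown in this notebook, so the sketch below is only an assumption of what such a mapping could look like. The function name `build_chunking_configuration` is hypothetical and the numeric values are illustrative, not recommendations.

```python
def build_chunking_configuration(strategy: str) -> dict:
    """Return a chunkingConfiguration dict for the given strategy name.

    All numeric values are illustrative assumptions.
    """
    if strategy == "FIXED_SIZE":
        return {
            "chunkingStrategy": "FIXED_SIZE",
            "fixedSizeChunkingConfiguration": {
                "maxTokens": 300,
                "overlapPercentage": 20,
            },
        }
    if strategy == "HIERARCHICAL":
        return {
            "chunkingStrategy": "HIERARCHICAL",
            "hierarchicalChunkingConfiguration": {
                # Parent chunks first, then child chunks
                "levelConfigurations": [{"maxTokens": 1500}, {"maxTokens": 300}],
                "overlapTokens": 60,
            },
        }
    if strategy == "SEMANTIC":
        return {
            "chunkingStrategy": "SEMANTIC",
            "semanticChunkingConfiguration": {
                "maxTokens": 300,
                "bufferSize": 1,
                "breakpointPercentileThreshold": 95,
            },
        }
    if strategy == "NONE":
        # Each file is treated as a single chunk
        return {"chunkingStrategy": "NONE"}
    raise ValueError(f"Unknown chunking strategy: {strategy}")
```

If you create data sources directly with boto3, a dict like this would sit under `vectorIngestionConfiguration` as `chunkingConfiguration`.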

In [None]:
knowledge_base_metadata = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name}-{suffix}',
    kb_description=knowledge_base_description,
    data_sources=data_source, 
    chunking_strategy = "FIXED_SIZE", 
    suffix = suffix
)

### 2.1 Download video game dataset and upload it to Amazon S3

Now that we have created the knowledge base, let's populate it with the `video_games` dataset, which is downloaded from [here](https://aws-blogs-artifacts-public.s3.amazonaws.com/ML-16482/30_generated_video_game_records.zip). The dataset describes fictional video games, with a title, description, genre, year, publisher, and score for each game.

In [None]:
import os
import zipfile

# Download the zip file
!wget https://aws-blogs-artifacts-public.s3.amazonaws.com/ML-16482/30_generated_video_game_records.zip --no-check-certificate

# Unzip the CSV files - they are extracted into a folder named 'video_game'
with zipfile.ZipFile('./30_generated_video_game_records.zip', 'r') as zipf:
    csv_files = [x for x in zipf.infolist() if not x.filename.startswith('__MACOSX/') and x.filename.endswith('.csv')]
    for csv_file in csv_files:
        zipf.extract(csv_file, './')

#remove original zip file
# os.remove('./30_generated_video_game_records.zip')

Let's upload the video game data in the `video_game` folder to S3.

In [None]:
def upload_directory(path, bucket_name):
    # Walk the directory tree and upload every file, skipping macOS .DS_Store files
    for root, dirs, files in os.walk(path):
        for file in files:
            if not file.startswith('.DS_Store'):
                file_to_upload = os.path.join(root, file)
                print(f"uploading file {file_to_upload} to {bucket_name}")
                # The bare file name is used as the S3 object key
                s3_client.upload_file(file_to_upload, bucket_name, file)

upload_directory("video_game", bucket_name)

Now we start the ingestion job.

In [None]:
# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base_metadata.start_ingestion_job()
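
The helper's `start_ingestion_job()` may already wait for the job to finish; that is not visible from this notebook. If you drive ingestion directly with boto3 instead, a polling loop is usually more reliable than a fixed sleep. The sketch below assumes you already hold the knowledge base ID, data source ID and ingestion job ID. The function name `wait_for_ingestion_job` and its defaults are hypothetical.

```python
import time

# Statuses after which the job will not change any further
TERMINAL_STATUSES = {"COMPLETE", "FAILED", "STOPPED"}


def wait_for_ingestion_job(client, knowledge_base_id, data_source_id,
                           ingestion_job_id, poll_seconds=10, max_polls=60):
    """Poll a bedrock-agent client until the ingestion job reaches a terminal status."""
    for _ in range(max_polls):
        job = client.get_ingestion_job(
            knowledgeBaseId=knowledge_base_id,
            dataSourceId=data_source_id,
            ingestionJobId=ingestion_job_id,
        )["ingestionJob"]
        if job["status"] in TERMINAL_STATUSES:
            return job["status"]
        time.sleep(poll_seconds)
    raise TimeoutError("Ingestion job did not finish within the polling budget")
```

In this notebook, `client` would be `bedrock_agent_client`; the IDs would come from the responses of the calls that created the data source and started the job.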

Finally, we save the knowledge base ID to test the solution at a later stage.

In [None]:
kb_id_metadata = knowledge_base_metadata.get_knowledge_base_id()

### 2.2 Prepare metadata for ingestion

Now we generate a metadata file (`[filename].metadata.json`) for each source file ingested into the knowledge base, containing its Id, genres, year, publisher, and score. Below is an example of the metadata file attached to one of the source files.

```
{"metadataAttributes": 
    {
        "Id": "1.csv",
        "genres": "Open World",
        "year": 2023,
        "publisher": "Interstellar Games",
        "score": 8.5
    }
}
```

We upload the generated metadata files to S3, then run the ingestion job again so that the changes are reflected in the knowledge base.
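
Because each metadata file must carry the source file's name plus the `.metadata.json` suffix, a quick consistency check before syncing can catch missing sidecars. The following is a minimal sketch; the function name `missing_metadata_sidecars` is hypothetical and it only compares key names, not file contents.

```python
METADATA_SUFFIX = ".metadata.json"


def missing_metadata_sidecars(object_keys):
    """Return the source keys that have no matching '<key>.metadata.json' sidecar."""
    keys = set(object_keys)
    # Everything that is not itself a sidecar counts as a source file
    sources = [key for key in keys if not key.endswith(METADATA_SUFFIX)]
    return sorted(key for key in sources if key + METADATA_SUFFIX not in keys)
```

The keys could come from the local folder listing or from `s3_client.list_objects_v2(Bucket=bucket_name)`; an empty result means every source file has a sidecar.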

In [None]:
import csv
import json
import pandas as pd

def generate_metadata(data_dir, metadata_fields):
    """Write a `<name>.csv.metadata.json` sidecar for every CSV file in data_dir."""
    # Loop through all CSV files in the directory
    for name in os.listdir(data_dir):
        if not name.endswith(".csv"):
            continue
        filename = os.path.join(data_dir, name)

        # Read the CSV file; only the first row is used for metadata
        df = pd.read_csv(filename)
        # Use the file name as the record Id
        df["Id"] = name

        # Take the first row's value for each field, in the requested order
        metadata = {key: df[key].tolist()[0] for key in metadata_fields}

        # Write the JSON sidecar next to the source file
        with open(f"{filename}.metadata.json", "w") as f:
            json.dump({"metadataAttributes": metadata}, f)

In [None]:
data_dir = './video_game'
metadata_fields = ["Id", "genres", "year", "publisher", "score"]

generate_metadata(data_dir, metadata_fields)

In [None]:
# upload metadata file to S3
upload_directory("video_game", bucket_name)

In [None]:
# delete metadata files from local
data_dir = './video_game'
for filename in os.listdir(data_dir):
    filename= f'{data_dir}/{filename}'
    if filename.endswith(".csv.metadata.json"):
        os.remove(filename)

Now start the ingestion job again so that the knowledge base picks up the metadata files. The source documents are already in the S3 bucket, so we do not need to upload them again.

In [None]:
# ensure that the kb is available
time.sleep(30)
# sync knowledge base
knowledge_base_metadata.start_ingestion_job()

### 2.3 Compare retrieval with and without metadata filtering using the Retrieve API

Out of all the games stored in the knowledge base, suppose our users want to ask about games that meet conditions such as genre or score.

In [None]:
query = "What are the strategy games published after 2023 with score of at least 9.0?"

In [None]:
# Without metadata filter

response_without_mf = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=kb_id_metadata
)

print(f"Number of retrieved chunks: {len(response_without_mf['retrievalResults'])}")
pp.pprint(response_without_mf['retrievalResults'])

Here, we can observe that only one of the returned chunks actually answers the user's query, yet without the filter we had to retrieve several chunks.

Now, how can we do better with a metadata filter?

In [None]:
one_group_filter= {
    "andAll": [
        {
            "equals": {
                "key": "genres",
                "value": "Strategy"
            }
        },
        {
            "greaterThanOrEquals": {
                "key": "score",
                "value": 9.0
            }
        }
    ]
}
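
The query also says "published after 2023", which the filter above does not encode. As a sketch of how the same structure extends, the helper below builds the filter from parameters and optionally adds a `greaterThan` condition on `year`. The function name `build_game_filter` is hypothetical; the keys match the metadata fields generated earlier.

```python
def build_game_filter(genre, min_score, after_year=None):
    """Build an andAll metadata filter on genres, score and (optionally) year."""
    conditions = [
        {"equals": {"key": "genres", "value": genre}},
        {"greaterThanOrEquals": {"key": "score", "value": min_score}},
    ]
    if after_year is not None:
        # Strictly later than the given year
        conditions.append({"greaterThan": {"key": "year", "value": after_year}})
    return {"andAll": conditions}
```

Calling `build_game_filter("Strategy", 9.0)` reproduces `one_group_filter`, and `build_game_filter("Strategy", 9.0, after_year=2023)` adds the year condition. Whether the stricter filter still returns a chunk depends on the dataset.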

In [None]:
# With metadata filter

response_with_mf = bedrock_agent_runtime_client.retrieve(
    retrievalQuery= {
        'text': query
    },
    knowledgeBaseId=kb_id_metadata,
    retrievalConfiguration= {
        'vectorSearchConfiguration': {
            "filter": one_group_filter
        }
    }
)

print(f"Number of retrieved chunks: {len(response_with_mf['retrievalResults'])}")
pp.pprint(response_with_mf['retrievalResults'])

Now, we can observe that we only retrieved one chunk based on the conditions that we set in the filter. But why does this matter?
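
To compare the two responses without reading the full pretty-printed output, it can help to reduce each result to its score, source and metadata. The sketch below assumes S3-backed results as in this notebook; the function name `summarize_results` and the 80-character preview length are arbitrary choices.

```python
def summarize_results(retrieval_results, preview_chars=80):
    """Reduce Retrieve API results to score, source URI, metadata and a text preview."""
    rows = []
    for result in retrieval_results:
        # For S3 data sources the URI sits under location.s3Location
        uri = result.get("location", {}).get("s3Location", {}).get("uri")
        rows.append({
            "score": result.get("score"),
            "source": uri,
            "metadata": result.get("metadata", {}),
            "preview": result["content"]["text"][:preview_chars],
        })
    return rows
```

For example, `summarize_results(response_with_mf['retrievalResults'])` gives one compact row per retrieved chunk.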

### 2.4 Compare the difference in a customized RAG workflow that uses the Retrieve API

In [None]:
# fetch context from the response
def get_contexts(retrievalResults):
    contexts = []
    for retrievedResult in retrievalResults: 
        contexts.append(retrievedResult['content']['text'])
    return contexts

In [None]:
# Without metadata filter

contexts = get_contexts(response_without_mf['retrievalResults'])

prompt_without_mf = f"""Answer user's question based on the provided context.

<context>
{contexts}
</context>

<question>
{query}
</question>"""

model_id = "us.anthropic.claude-3-5-sonnet-20240620-v1:0"
messages = [
    {
        "role": "user",
        "content": [{"text": prompt_without_mf}],
    }
]
response = bedrock_runtime_client.converse(
    modelId=model_id,
    messages=messages,
    inferenceConfig={"maxTokens": 1024, "temperature": 0.5, "topP": 1},
)
response_text = response["output"]["message"]["content"][0]["text"]

print(response_text)

In [None]:
# With metadata filter

contexts = get_contexts(response_with_mf['retrievalResults'])

prompt_with_mf = f"""Answer user's question based on the provided context.

<context>
{contexts}
</context>

<question>
{query}
</question>"""

model_id = "us.anthropic.claude-3-5-sonnet-20240620-v1:0"
messages = [
    {
        "role": "user",
        "content": [{"text": prompt_with_mf}],
    }
]
response = bedrock_runtime_client.converse(
    modelId=model_id,
    messages=messages,
    inferenceConfig={"maxTokens": 1024, "temperature": 0.5, "topP": 1},
)
response_text = response["output"]["message"]["content"][0]["text"]

print(response_text)

Both came to the same conclusion, but note the difference in the length of the two prompts.
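
The difference can also be put into numbers. The sketch below measures characters and whitespace-separated words, which is only a rough proxy for tokens; the function names `prompt_size` and `size_reduction` are hypothetical.

```python
def prompt_size(prompt):
    """Return character and whitespace-separated word counts for a prompt."""
    return {"characters": len(prompt), "words": len(prompt.split())}


def size_reduction(before, after):
    """Percentage reduction in characters going from `before` to `after`."""
    if not before:
        return 0.0
    return round(100 * (len(before) - len(after)) / len(before), 1)
```

Here you would call `size_reduction(prompt_without_mf, prompt_with_mf)`. For exact token counts, the Converse API response also reports `response["usage"]["inputTokens"]`.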

In [None]:
print(f"Prompt with retrieval without metadata filter:\n\n{prompt_without_mf}")

In [None]:
print(f"Prompt with retrieval with metadata filter:\n\n{prompt_with_mf}")

### Conclusion

In more advanced use cases that involve tens of retrieved chunks to answer a complex question, a metadata filter can significantly improve the relevance and accuracy of the response by reducing the noise from unrelated chunks. To enable metadata filtering, you need a metadata file attached to each file indexed in the knowledge base. Additionally, you need to provide a query filter based on the metadata fields at retrieval time.

This feature is also commonly used for access control. For example, you can pass `user_role` as `admin` or `regular_user` at query time, based on the end user who is asking the question, and set the `user_role` metadata field on each file according to the desired permission level. If you have a proprietary document that only admins should access, you can set that file's `user_role` metadata field to `admin`. This way, only admins get answers from proprietary documents when querying the knowledge base, while regular users do not. Refer to this [AWS Blog](https://aws.amazon.com/blogs/machine-learning/access-control-for-vector-stores-using-metadata-filtering-with-knowledge-bases-for-amazon-bedrock/) for more details.
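
As a rough sketch of that access-control pattern, the helper below maps the caller's role to a metadata filter. It assumes every file carries a single `user_role` string of either `admin` or `regular_user`, and that admins may also read regular documents; the function name `build_access_filter` is hypothetical.

```python
def build_access_filter(user_role):
    """Return a metadata filter limiting retrieval to documents the role may read."""
    if user_role == "admin":
        # Assumption: admins can read both admin-only and regular documents
        return {"in": {"key": "user_role", "value": ["admin", "regular_user"]}}
    if user_role == "regular_user":
        return {"equals": {"key": "user_role", "value": "regular_user"}}
    raise ValueError(f"Unknown role: {user_role}")
```

The returned dict is passed as the `filter` in `vectorSearchConfiguration`, exactly like `one_group_filter` above. The role should be derived from the authenticated session on the server side, not taken from user input.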

In some cases, it is hard to manually set the metadata filter for every retrieval query when calling the APIs. Refer to the `autogenerated_metadata_filters` notebook and the `Implicit metadata filtering` field in the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/kb-test-config.html) for more details on automatically generating metadata filters at query time by defining your schema.
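
For a sense of what "defining your schema" can look like, the sketch below builds an implicit filter configuration for the metadata fields used in this notebook. The field names follow the `implicitFilterConfiguration` shape of the Retrieve API as I understand it, so check them against the linked documentation before relying on them. The descriptions and the function name `build_implicit_filter_configuration` are assumptions, and the model ARN must point to a model that supports this feature.

```python
def build_implicit_filter_configuration(model_arn):
    """Describe the metadata schema so a model can derive filters from the query."""
    return {
        "metadataAttributes": [
            {"key": "genres", "type": "STRING",
             "description": "Genre of the video game, for example Strategy"},
            {"key": "year", "type": "NUMBER",
             "description": "Year the video game was published"},
            {"key": "publisher", "type": "STRING",
             "description": "Name of the publisher of the video game"},
            {"key": "score", "type": "NUMBER",
             "description": "Review score of the video game"},
        ],
        # Model used to turn the natural-language query into a filter
        "modelArn": model_arn,
    }
```

The resulting dict would be passed as `implicitFilterConfiguration` inside `vectorSearchConfiguration`, in place of a hand-written `filter`.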