# Observability and human feedback collection for a RAG application using Knowledge Bases for Amazon Bedrock

### Context:
In the following example, we will use a `Knowledge Base for Amazon Bedrock` that you have already created and send each request and response to an `Amazon Kinesis Data Firehose` delivery stream. Firehose then applies a transformation that flattens the nested JSON and creates logical partitions in the data using the `call_type` variable, which makes the data easier to query later in a database.

The data transformation is performed by a `Transformation Lambda function` associated with the Amazon Kinesis Data Firehose. Because the transformation runs in Firehose rather than in your application, the transformation step adds no latency to your application. You can optionally disable the data flattening in the AWS Lambda function.
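To make the flattening step concrete, here is a minimal sketch of a Firehose transformation handler. It follows the standard Firehose record contract (base64-encoded `data`, and a `recordId`, `result`, and `data` per returned record), but it is not the function deployed by the CloudFormation template. The `flatten` helper, the `_` separator, and leaving lists untouched are assumptions made for illustration.

```python
import base64
import json


def flatten(obj, parent_key="", sep="_"):
    """Flatten nested dicts into one level, joining keys with `sep` (assumed separator)."""
    flat = {}
    for key, value in obj.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, new_key, sep))
        else:
            # Lists and scalars are kept as-is in this sketch
            flat[new_key] = value
    return flat


def lambda_handler(event, context):
    """Decode each Firehose record, flatten it, and return it re-encoded."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        line = json.dumps(flatten(payload)) + "\n"  # newline-delimited JSON
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(line.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```

A top-level field such as `call_type` passes through unchanged, so it stays available as a partitioning key.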

### Prerequisite
After setting up the required backend resources with the provided `CloudFormation template`, you can capture user requests together with custom metadata such as latency, time to first token, tags, model responses, citations, and any other custom identifiers you would like to add (e.g., user_id/customer_id). You can now test whether your observability architecture works as expected and measure the latency that this additional component adds to your application.

#### `Important Note`: 

##### 1. Fill in the `config.py` file with your AWS configuration before running the code.

##### 2. Make sure your boto3 version is at least `1.34.126`.

#### Section 1:

In the section below, we will go through the code that interacts with the Knowledge Base for Amazon Bedrock to retrieve and generate responses based on user questions. The code imports the necessary libraries and modules, including the AWS SDK (boto3) and the custom `observability` module, which contains the `BedrockLogs` class for logging, evaluation, and observability purposes.

It creates a boto3 session and a Bedrock Agent Runtime client, and imports configuration values from a separate `config` module. The `retrieve_and_generate` function takes a question and parameters as input, and calls the `bedrock_agent_runtime_client.retrieve_and_generate` method to retrieve and generate a response from the Amazon Bedrock Knowledge Base based on the provided question and configuration values.

The `main` function is decorated with `@bedrock_logs.watch`, which logs and tracks the function call for observability purposes. `@bedrock_logs.watch` tracks the first input argument, so you can pass a JSON object containing any metadata your use case needs and the observability solution will track it. Similarly, the observability solution tracks all returned values. You can also compute any custom metric, such as time to first token or time to last token, inside the decorated function and add it to the return statement so that it gets logged.
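The behavior described above can be illustrated with a small stand-in decorator. This is a sketch for intuition only and is not the implementation of `BedrockLogs.watch`: the `watch` function below, its `sink` parameter, and the record layout are all hypothetical.

```python
import functools
import uuid


def watch(call_type, sink):
    """Hypothetical stand-in for a logging decorator; `sink` is a list that collects records."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            sink.append({
                "call_type": call_type,
                "observation_id": str(uuid.uuid4()),
                "input": args[0] if args else None,  # only the first positional argument
                "output": result,                    # whatever the function returns
            })
            return result
        return wrapper
    return decorator


records = []


@watch(call_type="demo-call", sink=records)
def demo(metadata):
    # Values added to the first argument inside the function are visible in the record,
    # because the record holds a reference to the same dictionary.
    metadata["model_response"] = "stubbed response"
    return "stubbed answer"


answer = demo({"question": "What is tracked?", "userID": "User-1"})
```

In this sketch the record is written after the function returns, which is why metadata added inside the function body still ends up in the logged input.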

Inside the `main` function, the `retrieve_and_generate` function is called with the user's question and the generation configuration parameters. The full response from the Knowledge Base is stored in the `application_metadata` dictionary under the `model_response` key, and the generated response text is returned.

The code also includes a helper function `generate_web_session_id` to generate a random session ID, which can be used as a custom identifier for tracking purposes.

In [None]:
# Upgrade boto3 to at least the minimum required version.
# ! pip install 'boto3>=1.34.126' --upgrade

In [None]:
import boto3
import json
import time
from datetime import datetime
import pytz
import string
import random

# Custom Module:
from observability import BedrockLogs

# Import your configuration values
from config import (
    REGION, FIREHOSE_NAME, CRAWLER_NAME, MODEL_ARN, KB_ID, 
    APPLICATION_NAME, CUSTOM_TAG, GUARDRAIL_ID, GUARDRAIL_VERSION,
    MAX_TOKENS, TEMPERATURE, TOP_P
)

# Initialize BedrockLogs in Firehose mode with feedback variables:
# bedrock_logs = BedrockLogs(delivery_stream_name=FIREHOSE_NAME, 
#                            feedback_variables=True)

# Initialize BedrockLogs in Local mode with feedback variables:
bedrock_logs = BedrockLogs(delivery_stream_name='local', 
                           feedback_variables=True)

# Create AWS clients
boto3_session = boto3.session.Session()
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')

# Function to generate a response from the Amazon Bedrock Knowledge Base
def retrieve_and_generate(question, params):
    # Build the request once; the configuration is identical with or without a session
    request = {
        'input': {
            'text': question
        },
        'retrieveAndGenerateConfiguration': {
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': KB_ID,
                'modelArn': MODEL_ARN,
                'generationConfiguration': {
                    'guardrailConfiguration': {
                        'guardrailId': GUARDRAIL_ID,
                        'guardrailVersion': GUARDRAIL_VERSION
                    },
                    'inferenceConfig': {
                        'textInferenceConfig': {
                            'maxTokens': MAX_TOKENS,
                            'temperature': TEMPERATURE,
                            'topP': TOP_P
                        }
                    },
                },
            },
        },
    }

    # Continue an existing conversation only when the caller supplies a sessionId
    if 'sessionId' in params:
        request['sessionId'] = params['sessionId']

    return bedrock_agent_runtime_client.retrieve_and_generate(**request)

# dummy value to demonstrate custom metadata:
def generate_web_session_id(length=16):
    # Define the characters to be used in the session ID
    characters = string.ascii_letters + string.digits
    
    # Generate a random string of the specified length
    session_id = ''.join(random.choices(characters, k=length))
    
    return session_id

# Example of how to deploy and track observability or evaluation:
@bedrock_logs.watch(call_type='Retrieve-and-Generate-with-KB')
def main(application_metadata):
    params = {
        'guardrailId': GUARDRAIL_ID,
        'guardrailVersion': GUARDRAIL_VERSION,
        'maxTokens': MAX_TOKENS,
        'temperature': TEMPERATURE,
        'topP': TOP_P
    }

    response = retrieve_and_generate(application_metadata['question'], params)
    application_metadata['model_response'] = response
    return response['output']['text']

# Important note on sensitive data:
If you do not want the observability module to log sensitive information, move that information into a second, third, or later argument, depending on your design, because only the first argument is tracked. Here are two example use cases:

1. Where you want all input arguments to be a part of your logs:
```python
@bedrock_logs.watch(call_type='Retrieve-and-Generate-with-KB')
def main(json_of_data_to_be_track):
    # implement your metrics and functions
    return None
```
    
2. You do not want sensitive data or other custom data to become part of the logs:
```python
@bedrock_logs.watch(call_type='Retrieve-and-Generate-with-KB')
def main(json_of_data_to_be_track, untracked_data, untracked_data2, untracked_json):  # add further untracked arguments as needed
    # implement your metrics and functions
    return None
```
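One way to prepare the arguments for the second use case is to split an incoming payload before calling the decorated function. The helper below is a hypothetical sketch; `split_payload`, the `email` field, and the sample request are assumptions and not part of the observability module.

```python
def split_payload(payload, sensitive_keys):
    """Return (tracked, untracked) dictionaries based on a set of sensitive key names."""
    tracked = {k: v for k, v in payload.items() if k not in sensitive_keys}
    untracked = {k: v for k, v in payload.items() if k in sensitive_keys}
    return tracked, untracked


# Hypothetical request arriving from a front end
request = {
    "question": "What is Amazon doing in the field of generative AI?",
    "userID": "User-1",
    "email": "user@example.com",
}

tracked, untracked = split_payload(request, sensitive_keys={"email"})

# The tracked part would go first and the sensitive part second, for example:
# main(tracked, untracked)
```

Keeping the list of sensitive keys in one place makes it easier to review what can reach the logs.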

##### Track application logs or evaluation based on your use case:
In the cell below, we will call the decorated function `main()` and pass it a custom metadata JSON, `application_metadata`. You can add any custom value, tag, metric, or identifier that you want to be tracked here.

In [None]:
question = 'What is Amazon doing in the field of generative AI?'
# your custom metrics or identifiers, for example:
application_metadata = {
    'webSessionId': generate_web_session_id(),
    'userID': 'User-1',
    'applicationName': APPLICATION_NAME,
    'customTags': CUSTOM_TAG
}

# other custom metrics or identifiers
dt = datetime.fromtimestamp(time.time(), tz=pytz.utc)
application_metadata['request_time'] = dt.strftime('%Y-%m-%dT%H:%M:%SZ')
application_metadata['model_arn'] = MODEL_ARN
application_metadata['question'] = question

# Make the API call. The return signature depends on how BedrockLogs was initialized:
# 1: local mode with feedback variables enabled:
response, log, run_id, observation_id = main(application_metadata)

# 2: Firehose mode with feedback variables enabled:
# response, run_id, observation_id = main(application_metadata)

# 3: feedback variables disabled:
# response = main(application_metadata)

# printing the run_id and observation_id for reference:
print(f"run_id: {run_id},\nobservation_id: {observation_id}")
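Section 1 mentions logging custom metrics such as latency. A minimal sketch of one way to measure it is shown below, storing the value in the metadata dictionary the same way `main` stores `model_response`. The names `timed_call`, `fake_retrieve_and_generate`, `main_with_latency`, and the `latency_s` key are hypothetical, and the Knowledge Base call is replaced with a stub so the example runs without AWS access.

```python
import time


def timed_call(fn, *args, **kwargs):
    """Run `fn` and return (result, elapsed_seconds) using a monotonic clock."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def fake_retrieve_and_generate(question):
    # Hypothetical stand-in for the Knowledge Base call
    time.sleep(0.01)
    return {"output": {"text": f"answer to: {question}"}}


def main_with_latency(application_metadata):
    response, latency_s = timed_call(
        fake_retrieve_and_generate, application_metadata["question"]
    )
    # Stored on the first argument so a decorator that tracks it can pick it up
    application_metadata["latency_s"] = round(latency_s, 4)
    return response["output"]["text"]


metadata = {"question": "What is tracked?"}
text = main_with_latency(metadata)
```

In the notebook, the same pattern would wrap the real `retrieve_and_generate` call inside the decorated `main` function.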

#### Section 2: Collecting feedback for your GenAI application responses.

In this section, you use the `run_id` and `observation_id` returned by the `main` function to collect feedback on the responses from your end users or QA team. The code defines two functions, `observation_level_feedback` and `session_level_feedback`, both decorated with `@bedrock_logs.watch` to track the feedback collection process.

The `call_type` variable in the decorator is used to create logical partitions in the collected data. This allows you to separate the feedback data based on whether it is collected at the observation level or the session level, making it easier to analyze and process the feedback data later.

The `observation_level_feedback` function is designed to collect feedback at the observation level, which means feedback is associated with a specific `observation_id`. This function takes a dictionary as input, containing the `user_id`, `f_run_id` (the run_id associated with the feedback), `f_observation_id` (the observation_id associated with the feedback), and `actual_feedback` (the feedback itself, which can be a simple "Thumbs-up" or more detailed text).

The `session_level_feedback` function is designed to collect feedback at the session level, which means feedback is associated with a specific `run_id`. It takes a similar dictionary containing the `user_id`, `f_run_id`, and `actual_feedback`, without an `f_observation_id`.

When using the feedback mechanism, it is crucial to always pass the `run_id` and `observation_id` for which the feedback is being collected, as we do with `f_run_id` and `f_observation_id`. These identifiers act as keys for joining the various logically partitioned datasets, allowing you to associate the feedback with the specific response generated by your GenAI application.

The code demonstrates how the `observation_level_feedback` function can be called with a dictionary containing the necessary information, including a dummy `user_feedback` value of "Thumbs-up".

By collecting feedback at the observation or session level and using the `call_type` variable to create logical partitions, you can effectively organize and analyze the feedback data, enabling you to evaluate the performance and quality of the responses, identify areas for improvement, and refine the knowledge base or model accordingly.
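To illustrate how these identifiers act as join keys, the sketch below attaches observation-level feedback to logged responses in plain Python. The record layouts are assumptions: the `run_id` and `observation_id` field names on the response records are hypothetical, while `f_run_id`, `f_observation_id`, and `actual_feedback` follow the dictionaries used in this notebook.

```python
# Hypothetical logged responses (one partition)
observations = [
    {"run_id": "r1", "observation_id": "o1", "question": "Q1", "response": "A1"},
    {"run_id": "r1", "observation_id": "o2", "question": "Q2", "response": "A2"},
]

# Hypothetical observation-level feedback (another partition)
feedback = [
    {"f_run_id": "r1", "f_observation_id": "o2", "actual_feedback": "Thumbs-up"},
]


def join_feedback(observations, feedback):
    """Left-join feedback onto observations using (run_id, observation_id) as the key."""
    by_key = {
        (f["f_run_id"], f["f_observation_id"]): f["actual_feedback"]
        for f in feedback
    }
    return [
        {**obs, "actual_feedback": by_key.get((obs["run_id"], obs["observation_id"]))}
        for obs in observations
    ]


joined = join_feedback(observations, feedback)
```

Observations without feedback keep `actual_feedback` set to `None`, which mirrors a left join in SQL.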

In [None]:
@bedrock_logs.watch(call_type='observation-feedback')
def observation_level_feedback(feedback):
    pass

@bedrock_logs.watch(call_type='session-feedback')
def session_level_feedback(feedback):
    pass


# defining a dummy user_feedback:
user_feedback = 'Thumbs-up'

observation_feedback_from_front_end = {
    'user_id': 'User-1',
    'f_run_id': run_id,
    'f_observation_id': observation_id,
    'actual_feedback': user_feedback
}

# log observation-feedback as a separate dataset based on call_type
observation_level_feedback(observation_feedback_from_front_end)

In [None]:
user_feedback = 'Amazing - this is fast and an awesome way to help the customers!'
session_feedback_from_front_end = {
    'user_id': 'User-1',
    'f_run_id': run_id,
    'actual_feedback': user_feedback
}

# session-feedback
session_level_feedback(session_feedback_from_front_end)

### Next Steps:

1. Now that your data is available in Amazon S3, you can `optionally` trigger the `Glue Crawler` to help you with the creation of Amazon `Athena tables`. These Athena tables can be used to create amazing dashboards for analyzing and visualizing the collected data.

2. Using Athena and Amazon S3, you can perform detailed analysis for troubleshooting your application, response evaluation, or build analytical dashboards. The provided screenshots demonstrate how you can not only track metrics for your application but also incorporate any information logged via `@bedrock_logs.watch`, including custom data or metrics like latency, token metrics, cost-related metrics, and more.

3. Below is an example dashboard **(not a part of this solution)**. It serves as a basic example, showcasing the potential for visualizing and analyzing the collected data. By leveraging the power of Amazon Athena and S3, you can gain valuable insights into your GenAI application's performance, identify areas for improvement, and make data-driven decisions to enhance the overall user experience.

**Executive Summary**
![kb-observability-dashboard-example1.png](./images/kb-observability-dashboard-example1.png)
![kb-observability-dashboard-example2.png](./images/kb-observability-dashboard-example2.png)
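As a starting point for the analysis described in the next steps, the sketch below builds a SQL join between responses and observation-level feedback and submits it through the Athena `start_query_execution` API. The table names, column names, database name, and S3 output location are all hypothetical; the actual names depend on what the Glue Crawler creates in your account.

```python
def build_feedback_query(responses_table, feedback_table):
    """Build a SQL string joining responses to feedback (table and column names are assumed)."""
    return (
        "SELECT r.run_id, r.observation_id, f.actual_feedback "
        f"FROM {responses_table} r "
        f"LEFT JOIN {feedback_table} f "
        "ON r.run_id = f.f_run_id AND r.observation_id = f.f_observation_id"
    )


def run_query(athena_client, sql, database, output_location):
    """Submit a query to Athena and return its execution ID."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


# Example usage (requires AWS credentials; every name below is a placeholder):
# import boto3
# athena = boto3.client("athena")
# sql = build_feedback_query("kb_responses", "observation_feedback")
# query_id = run_query(athena, sql, "my_observability_db", "s3://my-bucket/athena-results/")
```

Passing the client in as a parameter keeps the function easy to test with a stub before running it against your account.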

# End