# Lab 1. Data Analytics Agent

## Introduction

In this notebook, you will create your first sub-agent with [Amazon Bedrock Agents](https://aws.amazon.com/bedrock/agents/).

Amazon Bedrock Agents enable generative AI applications to execute multi-step business tasks using natural language.

In our first example, we will create a data analytics agent that translates users' natural language questions into data queries and returns information about financial transactions and projections.

The following diagram shows the part of the architecture that you will build in this module.

![Data Analytics Agent Architecture](img/analytics_agent.png)

In this example, we will enable our agent to use code interpretation capabilities to translate natural language questions into structured data queries and perform analysis on financial transaction data. We are also using [Amazon Bedrock Knowledge Bases](https://aws.amazon.com/bedrock/knowledge-bases/) to provide documentation about financial data analysis and query capabilities.

For completeness, note that we assume the financial projections have already been produced by an ML model, outside the scope of this agent.

## Setup

First, uninstall any existing boto3, botocore, and awscli packages, then install the latest dependencies with pip. Make sure you have the latest boto3 version for full capabilities.

In [None]:
!pip uninstall boto3 botocore awscli --yes

In [None]:
# Install latest dependencies
!python3 -m pip install --force-reinstall --no-cache -q -r ../requirements.txt

#### Restart kernel

If you have trouble using the latest multi-agent capabilities, uncomment the line below to restart the kernel so that the package updates take effect.

In [None]:
import IPython

# IPython.Application.instance().kernel.do_shutdown(True)

Check your boto3 version

In [None]:
!pip freeze | grep boto3
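
If you prefer to check the version from Python rather than through the shell, the standard library's `importlib.metadata` can report it. This is a minimal sketch; the helper name `installed_version` is hypothetical and not part of the lab's helper modules.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Print the versions of the two packages this lab reinstalls
for name in ("boto3", "botocore"):
    print(name, installed_version(name))
```

A result of `None` means the package is not visible to the current kernel, which usually suggests the kernel needs a restart after installation.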

#### Create Workshop ID

Next, you are going to create an ID for your resources that will be reused for the remainder of the workshop.

In [None]:
import uuid
import os
from pathlib import Path

def get_or_create_workshop_id():
    workshop_id_file = '../.workshop_id'
    
    if os.path.exists(workshop_id_file):
        with open(workshop_id_file, 'r') as f:
            return f.read().strip()
    else:
        workshop_id = str(uuid.uuid4())[:8]
        with open(workshop_id_file, 'w') as f:
            f.write(workshop_id)
        return workshop_id

workshop_id = get_or_create_workshop_id()
resource_suffix = f"{workshop_id}"
print("Your resource suffix is", resource_suffix)

## Creating Agent

In this section, we declare global variables that act as helpers throughout the notebook, and you start creating your first agent.

In [None]:
import boto3
import os
import json
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta

sts_client = boto3.client('sts')
session = boto3.session.Session()

account_id = sts_client.get_caller_identity()["Account"]
region = session.region_name

s3_client = boto3.client('s3', region)
bedrock_client = boto3.client('bedrock-runtime', region)

agent_foundation_model = [
    'anthropic.claude-3-5-sonnet-20240620-v1:0',
    'anthropic.claude-3-sonnet-20240229-v1:0',
    'anthropic.claude-3-haiku-20240307-v1:0'
]

curr_month = datetime.now()

In [None]:
analytics_agent_name = f"analytics-{resource_suffix}"

data_analytics_lambda_name = f"fn-analytics-agent-{resource_suffix}"

analytics_agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{analytics_agent_name}'

dynamodb_table = f"{analytics_agent_name}-table"
dynamodb_pk = "customer_id"
dynamodb_sk = "day"

dynamoDB_args = [dynamodb_table, dynamodb_pk, dynamodb_sk]

knowledge_base_name = f'{analytics_agent_name}-kb'

knowledge_base_description = "KB containing information on financial data analysis and query capabilities"
bucket_name = f'analytics-agent-kb-{account_id}-{resource_suffix}'


### Importing helper functions

In the following section, we add the parent directory to the Python path so that the helper modules `bedrock_agent_helper.py` and `knowledge_base_helper.py` can be imported.

These files contain helper classes designed to make the lab experience smoother. All interactions with Bedrock are handled by these classes.

You will invoke the following methods in this lab:

From `bedrock_agent_helper.py`:
- `create_agent`: Creates a new agent and its IAM roles
- `add_action_group_with_lambda`: Creates a Lambda function and adds it as an action group to a previously created agent
- `create_agent_alias`: Creates an alias for the agent
- `invoke`: Invokes the agent

From `knowledge_base_helper.py`:
- `create_or_retrieve_knowledge_base`: Creates a Knowledge Base on Amazon Bedrock if it doesn't exist, or returns information about the existing one.
- `synchronize_data`: Reads files from S3, converts the text into vectors, and adds them to the vector database.

In [None]:
import sys

sys.path.insert(0, ".")
sys.path.insert(1, "..")

from utils.bedrock_agent_helper import (
    AgentsForAmazonBedrock
)
from utils.knowledge_base_helper import (
    KnowledgeBasesForAmazonBedrock
)
agents = AgentsForAmazonBedrock()
kb = KnowledgeBasesForAmazonBedrock()

## Create and synchronize Knowledge Base

In this section, you will create an Amazon Bedrock Knowledge Base and ingest data into it.

This data contains basic information about how the projections process is done.

**This creation process can take several minutes.**

In [None]:
%%time
kb_id, ds_id = kb.create_or_retrieve_knowledge_base(
    knowledge_base_name,
    knowledge_base_description,
    bucket_name
)

print(f"Knowledge Base ID: {kb_id}")
print(f"Data Source ID: {ds_id}")

## Create Synthetic Data to Load on S3

Instead of getting data from elsewhere, you will generate it using an LLM on Amazon Bedrock.
The generated synthetic data will be uploaded to an S3 bucket and then added to an Amazon Bedrock Knowledge Base.

In [None]:
path = "kb_documents"

# Check whether the specified path exists or not
is_exist = os.path.exists(path)
if not is_exist:
   # Create a new directory if it does not exist
   os.makedirs(path)
   print("The {} directory was created!".format(path))
else:
   print("The {} directory already exists!".format(path))

Create helper functions to invoke an LLM on Bedrock and to write a local file:

In [None]:
def invoke_bedrock_generate_financial_files(prompt):
    message_list = []

    initial_message = {
        "role": "user",
        "content": [
            {
                "text": prompt
            }
        ],
    }

    message_list.append(initial_message)

    response = bedrock_client.converse(
        modelId=agent_foundation_model[0],
        messages=message_list,
        inferenceConfig={
            "maxTokens": 2048,
            "temperature": 0
        },
    )

    return response['output']['message']


def write_file(file_name, content):
    # Use a context manager so the file is closed even if the write fails
    with open(file_name, 'w') as f:
        f.write(content)

### Generating data prompt
Generate one file with projections information using the LLM.

In [None]:
text_generation_financial_instructions = '''
    You will act as a data scientist who specializes in financial data analysis using Python and data science libraries. You will generate a step-by-step guide on how to analyze financial transaction data and translate natural language questions into structured queries.

    This data has the following json structure:
    {
        "user_id": "1",
        "day": "2025/03/01",
        "transactionAmount": "1460.75",
        "type": "actual",
        "transactionType": "deposit"
    }

    Explain how to translate natural language questions into structured data queries for financial analysis. Provide a step-by-step guide with code samples for analyzing transaction patterns, identifying trends, and generating insights from financial data.

    Include explanations on how to interpret transaction patterns, identify anomalies, and extract meaningful insights from financial data to support decision-making.

    Answer only with the step-by-step guide. Avoid answering with affirmations like:
    "OK, they can generate it," or "Yes, please find following example."
    Be direct and reply only with the step-by-step guide.
'''

financial_analytics_file_name = 'financial-analytics-info.txt'

response_message = invoke_bedrock_generate_financial_files(
    text_generation_financial_instructions
)

print("Generated data to be stored in the KB:\n", response_message['content'][0]['text'])
write_file(
    '{}/{}'.format(path, financial_analytics_file_name),
    response_message['content'][0]['text']
)
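
The cell above reads only the first content block, `response_message['content'][0]['text']`. The `content` field of a Converse message is a list of blocks, so a slightly more defensive reader could join every text block instead. The following is a minimal sketch; `extract_text` and `sample_message` are hypothetical names used for illustration, and the sample data is made up rather than real model output.

```python
def extract_text(message):
    """Join the text of every content block in a Converse output message."""
    return "".join(
        block["text"] for block in message.get("content", []) if "text" in block
    )


# Hypothetical message shaped like response['output']['message']
sample_message = {
    "role": "assistant",
    "content": [
        {"text": "Step 1: load the data.\n"},
        {"text": "Step 2: group by type."},
    ],
}

print(extract_text(sample_message))
```

For the single-block responses this lab produces, the result is the same as indexing the first block directly.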

### Uploading data to S3
Define a helper that uploads the generated files to an Amazon S3 bucket.

In [None]:
def upload_directory(path, bucket_name):
    for root,dirs,files in os.walk(path):
        for file in files:
            file_to_upload = os.path.join(root,file)
            print(f"uploading file {file_to_upload} to {bucket_name}")
            s3_client.upload_file(file_to_upload,bucket_name,file)
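
Note that `upload_directory` uses the bare file name as the S3 object key, so two files with the same name in different subfolders would end up under the same key. That is harmless here because `kb_documents` holds a single file. If you later add nested folders, one possible variation is to derive the key from the path relative to the upload root. The sketch below only computes the keys and does not call S3; `list_upload_keys` is a hypothetical helper, not part of the lab code.

```python
import os
import tempfile


def list_upload_keys(path):
    """Map each local file under `path` to an S3 key that keeps its subfolder."""
    pairs = []
    for root, _dirs, files in os.walk(path):
        for file in files:
            local_path = os.path.join(root, file)
            # Use the path relative to `path`, with forward slashes, as the key
            key = os.path.relpath(local_path, path).replace(os.sep, "/")
            pairs.append((local_path, key))
    return sorted(pairs, key=lambda pair: pair[1])


# Demonstrate with a throwaway directory holding two files with the same name
with tempfile.TemporaryDirectory() as tmp:
    for year in ("2024", "2025"):
        os.makedirs(os.path.join(tmp, year))
        with open(os.path.join(tmp, year, "notes.txt"), "w") as f:
            f.write(year)
    demo_keys = [key for _local, key in list_upload_keys(tmp)]

print(demo_keys)
```

Each `(local_path, key)` pair could then be passed to `s3_client.upload_file(local_path, bucket_name, key)`.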

### Synchronizing Knowledge Base
Now let's upload the generated files to the S3 bucket and synchronize them with our knowledge base.

In [None]:
upload_directory("kb_documents", bucket_name)

# sync knowledge base
kb.synchronize_data(kb_id, ds_id)


In [None]:
kb_info = kb.get_kb(kb_id)
kb_arn = kb_info['knowledgeBase']['knowledgeBaseArn']

In [None]:
kb_config = {
    'kb_id': kb_id,
    'kb_instruction': """Access this knowledge base when needing to explain specific projections generation methodology."""
}

In [None]:
agent_description = """You are a transaction data projections bot.
You can retrieve historical transaction amounts, projected transactions, and amount statistics, and update a projection for a specific user"""

agent_instruction = """You are a financial data analytics assistant that helps users analyze their transaction data.

Your primary capabilities are:
1. Translating natural language questions into structured data queries
2. Providing information about historical transactions, projected transactions, and transaction statistics
3. Performing calculations and analysis on financial data
4. Explaining financial trends and patterns in simple terms

When bank operators ask questions about customer data, try to understand what they're looking for and translate their request into the appropriate query. For example:

User: "Show recent transactions for customer with ID 1"
You should use the get_historical_transactions function with the customer_id.

User: "What are the projected transactions for customer with ID 1 next month?"
You should use the get_projected_transactions function with the customer_id.

User: "How much did customer with ID 2 spend on withdrawals last month?"
You should use the get_historical_transactions function with the customer_id, then analyze the results to calculate the total amount of withdrawals.

User: "Can you analyze spending patterns for customer with ID 3?"
You should use the get_transaction_statistics function with the customer_id and provide insights on their transaction patterns.

When providing information, always be clear and concise. Explain financial concepts in simple terms and highlight important trends or patterns in the data. If you need to perform calculations, show your work so the user can understand how you arrived at your conclusions.

Remember that bank operators may need to explain financial concepts to customers, so avoid jargon when possible. If you need to use technical terms, provide brief explanations.

Your goal is to help bank operators analyze customer transaction data and provide insights through natural language interaction.

"""

analytics_agent = agents.create_agent(
    analytics_agent_name,
    agent_description,
    agent_instruction,
    agent_foundation_model,
    kb_arns=[kb_arn],
    code_interpretation=True
)

analytics_agent

### Associating knowledge base
Now that we've created the agent, let's associate the previously created knowledge base with it.

In [None]:
agents.associate_kb_with_agent(
    analytics_agent[0],
    kb_config['kb_instruction'],
    kb_config['kb_id']
)

### Creating Lambda

To enable the agent to execute tasks, we will create an AWS Lambda function that implements them. We will then provide this Lambda function to the agent's action group. You can find more information on how to use action groups to define actions that your agent can perform [here](https://docs.aws.amazon.com/bedrock/latest/userguide/agents-action-create.html).

In this block, we generate the Lambda function code:

In [None]:
%%writefile financial_analytics.py
import boto3
import json
import os

from boto3.dynamodb.conditions import Key, Attr
from datetime import datetime
from decimal import Decimal

dynamodb_resource = boto3.resource('dynamodb')
dynamodb_table = os.getenv('dynamodb_table')
dynamodb_pk = os.getenv('dynamodb_pk')
dynamodb_sk = os.getenv('dynamodb_sk')
truncated_month = datetime.today().replace(day=1, hour=0, minute=0, second=0, microsecond=0)


def get_named_parameter(event, name):
    return next(item for item in event['parameters'] if item['name'] == name)['value']
    
def populate_function_response(event, response_body):
    return {'response': {'actionGroup': event['actionGroup'], 'function': event['function'],
                'functionResponse': {'responseBody': {'TEXT': {'body': str(response_body)}}}}}

def trunc_datetime(month,year):
    return datetime.today().replace(year =int(year), month=int(month), day=1, hour=0, minute=0, second=0, microsecond=0)

def put_dynamodb(table_name, item):
    table = dynamodb_resource.Table(table_name)
    resp = table.put_item(Item=item)
    return resp

def read_dynamodb(
    table_name: str, 
    pk_field: str,
    pk_value: str,
    sk_field: str=None, 
    sk_value: str=None,
    attr_key: str=None,
    attr_val: str=None
):
    try:

        table = dynamodb_resource.Table(table_name)
        # Create expression
        if sk_field:
            key_expression = Key(pk_field).eq(pk_value) & Key(sk_field).eq(sk_value)
        else:
            key_expression = Key(pk_field).eq(pk_value)

        if attr_key:
            attr_expression = Attr(attr_key).eq(attr_val)
            query_data = table.query(
                KeyConditionExpression=key_expression,
                FilterExpression=attr_expression
            )
        else:
            query_data = table.query(
                KeyConditionExpression=key_expression
            )
        
        return query_data['Items']
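    except Exception as e:
        # Log the underlying error and return an empty list so callers always get a list
        print(f'Error querying table: {table_name}. {e}')
        return []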

def get_projected_transactions(user_id):
    return read_dynamodb(dynamodb_table, 
                         dynamodb_pk, 
                         user_id, 
                         attr_key="type", attr_val="projected")

def get_historical_transactions(user_id):
    return read_dynamodb(dynamodb_table, 
                         dynamodb_pk, 
                         user_id, 
                         attr_key="type", attr_val="actual")

def get_transaction_statistics(user_id):
    return read_dynamodb(dynamodb_table, 
                         dynamodb_pk, 
                         user_id, 
                         dynamodb_sk, 
                         truncated_month.strftime('%Y/%m/%d'))

def update_projections(user_id, month, year, amount):
    current_date = trunc_datetime(month, year)
    if  current_date >= truncated_month:
        item = {
            'customer_id': user_id,
            'day': current_date.strftime('%Y/%m/%d'),
            'transactionAmount': Decimal(amount),
            'type': 'projected'
        }
        put_dynamodb(dynamodb_table, item)
        return "Day: {} updated for user: {}".format(current_date.strftime('%Y/%m/%d'), user_id)
    else:
        return "You're trying to change a past date: {} for user: {}, which is not allowed".format(current_date.strftime('%Y/%m/%d'), user_id)

def lambda_handler(event, context):
    print(event)
    
    # name of the function that should be invoked
    function = event.get('function', '')

    # parameters to invoke function with
    parameters = event.get('parameters', [])
    user_id = get_named_parameter(event, "user_id")

    if function == 'get_projected_transactions':
        result = get_projected_transactions(user_id)
    elif function == 'get_historical_transactions':
        result = get_historical_transactions(user_id)
    elif function == 'get_transaction_statistics':
        result = get_transaction_statistics(user_id)
    elif function == 'update_projections':
        month = get_named_parameter(event, "month")
        year = get_named_parameter(event, "year")
        amount = get_named_parameter(event, "amount")
        result = update_projections(user_id, month, year, amount)
    else:
        result = f"Error, function '{function}' not recognized"

    response = populate_function_response(event, result)
    print(response)
    return response
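
Before deploying, it can help to exercise the event parsing and response shaping locally. The sketch below copies the two pure helpers from the Lambda code above and feeds them a hand-written event. The event is a hypothetical example trimmed to the fields the handler reads, and it assumes parameter values arrive as strings, which is consistent with the `int()` and `Decimal()` conversions in the handler. No AWS calls are made.

```python
def get_named_parameter(event, name):
    # Find the parameter entry with the given name and return its value
    return next(item for item in event["parameters"] if item["name"] == name)["value"]


def populate_function_response(event, response_body):
    # Wrap the result in the structure returned by the Lambda function above
    return {
        "response": {
            "actionGroup": event["actionGroup"],
            "function": event["function"],
            "functionResponse": {
                "responseBody": {"TEXT": {"body": str(response_body)}}
            },
        }
    }


# Hypothetical event, trimmed to the fields the handler reads
sample_event = {
    "actionGroup": "financial_analytics_actions",
    "function": "update_projections",
    "parameters": [
        {"name": "user_id", "type": "string", "value": "1"},
        {"name": "month", "type": "integer", "value": "7"},
        {"name": "year", "type": "integer", "value": "2030"},
        {"name": "amount", "type": "integer", "value": "50"},
    ],
}

month = get_named_parameter(sample_event, "month")
sample_response = populate_function_response(sample_event, "Day: 2030/07/01 updated for user: 1")
print(month)
print(sample_response)
```

Note that `get_named_parameter` raises `StopIteration` when the requested name is absent, so every function routed through `lambda_handler` must receive a `user_id` parameter.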

### Defining available actions

Next we will define the available actions that an agent can perform using [Function Details](https://docs.aws.amazon.com/bedrock/latest/userguide/agents-action-function.html). You can also do this task using OpenAPI Schemas, which can be very useful if you already have an OpenAPI schema available for your application.

When creating your function details, it is important to provide clear descriptions for each function and its parameters, because the agent depends on them to orchestrate the tasks correctly.

In [None]:
functions_def = [
    {
        "name": "get_projected_transactions",
        "description": """Gets the next 3 months transaction data projections""",
        "parameters": {
            "user_id": {
                "description": "Unique user identifier",
                "required": True,
                "type": "string"
            }
        }
    },
    {
        "name": "get_historical_transactions",
        "description": """Gets transaction data history to date""",
        "parameters": {
            "user_id": {
                "description": "Unique user identifier",
                "required": True,
                "type": "string"
            }
        }
    },
    {
        "name": "get_transaction_statistics",
        "description": """Gets current month amount analytics""",
        "parameters": {
            "user_id": {
                "description": "Unique user identifier",
                "required": True,
                "type": "string"
            }
        }
    },
    {
        "name": "update_projections",
        "description": """Updates the financial projections for a specific month""",
        "parameters": {
            "user_id": {
                "description": "Unique user identifier",
                "required": True,
                "type": "string"
            },
            "month": {
                "description": "Target update month. In the format MM",
                "required": True,
                "type": "integer"
            },
            "year": {
                "description": "Target update year. In the format YYYY",
                "required": True,
                "type": "integer"
            },
            "amount": {
                "description": "New transactions value",
                "required": True,
                "type": "integer"
            }
        }
    }
]
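
Because the agent relies on these descriptions, a quick sanity check before creating the action group can catch empty descriptions or mistyped parameter types. This is a minimal sketch: `check_function_defs` is a hypothetical helper, and the `ALLOWED_TYPES` set is an assumption based on the parameter types listed in the Bedrock function details documentation, so verify it against the current service documentation.

```python
# Assumed set of parameter types accepted by function details
ALLOWED_TYPES = {"string", "number", "integer", "boolean", "array"}


def check_function_defs(function_defs):
    """Return a list of human-readable problems found in the definitions."""
    problems = []
    for fn in function_defs:
        name = fn.get("name", "<unnamed>")
        if not fn.get("description", "").strip():
            problems.append(f"{name}: missing description")
        for param, spec in fn.get("parameters", {}).items():
            if not spec.get("description", "").strip():
                problems.append(f"{name}.{param}: missing description")
            if spec.get("type") not in ALLOWED_TYPES:
                problems.append(f"{name}.{param}: unexpected type {spec.get('type')!r}")
    return problems


# Hypothetical definitions: one well-formed, one with two mistakes
good_def = {
    "name": "get_historical_transactions",
    "description": "Gets transaction data history to date",
    "parameters": {
        "user_id": {"description": "Unique user identifier", "required": True, "type": "string"}
    },
}
bad_def = {
    "name": "bad_fn",
    "description": "",
    "parameters": {
        "amount": {"description": "New value", "required": True, "type": "decimal"}
    },
}

print(check_function_defs([good_def, bad_def]))
```

In the notebook, calling `check_function_defs(functions_def)` on the list defined above should return an empty list.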

### Creating action group and attaching to the agent
Now it's time to add this Lambda function and the function details as an action group for this agent and prepare it.

In [None]:
agents.add_action_group_with_lambda(
    agent_name=analytics_agent_name,
    lambda_function_name=data_analytics_lambda_name,
    source_code_file="financial_analytics.py",
    agent_functions=functions_def,
    agent_action_group_name="financial_analytics_actions",
    agent_action_group_description="Function to get amount projections for a user ",
    dynamo_args=dynamoDB_args
)

## Loading data to DynamoDB

Now that we've created our agent, let's load some generated data into DynamoDB. This gives the agent live data to work with when it performs actions.

In [None]:
agents.generate_fake_data_dynamodb()

In [None]:
with open("1_user_sample_data.json") as f:
    table_items = [json.loads(line) for line in f]
    
agents.load_dynamodb(dynamodb_table, table_items)

Test that the data was loaded into DynamoDB:

In [None]:
today_query = f"{curr_month.year}/{curr_month.month:02d}/01"

resp = agents.query_dynamodb(
    dynamodb_table, dynamodb_pk, '1', dynamodb_sk, today_query
)
resp
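
The query above works because the sort key `day` is stored as a zero-padded `YYYY/MM/DD` string pointing at the first day of the month, the same format the Lambda code builds with `strftime('%Y/%m/%d')`. Zero padding also means that plain string ordering matches chronological ordering. A small sketch illustrating this, with the hypothetical helper `month_sort_key`:

```python
from datetime import datetime


def month_sort_key(moment):
    """Format a datetime as the first-of-month sort key, e.g. '2025/03/01'."""
    return moment.replace(day=1).strftime("%Y/%m/%d")


# Keys built from dates in arbitrary order
keys = [
    month_sort_key(datetime(2025, 11, 15)),
    month_sort_key(datetime(2024, 12, 31)),
    month_sort_key(datetime(2025, 3, 17)),
]

# Sorting the strings gives the same order as sorting the dates
print(sorted(keys))
```

Without the zero padding, a key such as `2025/3/01` would sort after `2025/11/01`, and string comparisons would no longer follow calendar order.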

## Testing Data Analytics Agent

Now, let's run some tests on the agent we just created to make sure it's working. To do so, we will use the test alias `TSTALIASID`, which allows you to invoke the draft version of your agent.

### Testing get projected transactions
First let's test the get projected transactions action

In [None]:
%%time
response = agents.invoke(
    """can you show projected transactions for customer with ID 1? 
    How do they compare with their past transaction data? Customer ID is 1""", 
    analytics_agent[0], enable_trace=True
)
print("====================")
print(response)

In [None]:
time.sleep(60)

### Testing get historical transactions
Now we can test the historical transaction data and also use code interpretation to calculate the average transaction amounts by type

In [None]:
%%time
response = agents.invoke(
    "can you show past transaction amounts for customer with ID 1? What is their average spending during summer months?", 
    analytics_agent[0], enable_trace=True
)
print("====================")
print(response)

In [None]:
time.sleep(60)

In [None]:
analytics_agent[0]

### Testing knowledge base access
Now let's check the knowledge base access by asking a question about financial data analysis

In [None]:
%%time
response = agents.invoke(
    "What algorithm is used for projections?", 
    analytics_agent[0], enable_trace=True
)
print("====================")
print(response)

In [None]:
time.sleep(60)

### Testing transaction projections update
Now we can test the functionality to update the projected transactions

In [None]:
%%time

future_2m = curr_month + relativedelta(months=2)
future_2m_formatted = future_2m.strftime("%Y/%m")

response = agents.invoke(
    f"Can you update projected transactions for month {future_2m_formatted} for customer with ID 1? They will be travelling and their estimate will be 50.", 
    analytics_agent[0], enable_trace=True
)
print("====================")
print(response)

In [None]:
time.sleep(60)

### Confirming that projections were updated
After updating our projections, let's check that they were updated and plot a new graph.

In [None]:
%%time
response = agents.invoke(
    "Can you show projected transaction amounts month by month for customer with ID 1?", 
    analytics_agent[0], enable_trace=True
)
print("====================")
print(response)

In [None]:
time.sleep(60)

### Testing transaction statistics
Finally, let's test the transaction statistics functionality

In [None]:
%%time
response = agents.invoke(
    "can you show recent transactions for customer with ID 1?", 
    analytics_agent[0], enable_trace=True
)
print("====================")
print(response)

## Create alias

As you can see, you can use your agent with the `TSTALIASID` alias to complete tasks.
However, for multi-agent collaboration, you are expected to test your agent first and use it only once it is fully functional.
Therefore, to use an agent as a sub-agent in a multi-agent collaboration, you first need to create an agent alias and connect it to a new version.

Since we've tested and validated our agent, let's now create an alias for it:

In [None]:
analytics_agent_alias_id, analytics_agent_alias_arn = agents.create_agent_alias(
    analytics_agent[0], 'v1'
)

## Saving information
Let's store some variables to be used in the next notebooks.

In [None]:
analytics_agent_arn = agents.get_agent_arn_by_name(analytics_agent_name)
analytics_agent_id = analytics_agent[0]
data_analytics_kb = knowledge_base_name
data_analytics_dynamodb = dynamodb_table

%store analytics_agent_arn
%store analytics_agent_alias_arn
%store analytics_agent_alias_id
%store data_analytics_lambda_name
%store analytics_agent_name
%store analytics_agent_id
%store data_analytics_kb
%store data_analytics_dynamodb

In [None]:
analytics_agent_arn, analytics_agent_alias_arn, analytics_agent_alias_id

## Next Steps
Congratulations! We've now created a data analytics agent. Next, we will create our user insights agent.