# Lab 3. Peak Load Manager

## Introduction

In this notebook, we show you how to create your third and final sub-agent with Amazon Bedrock Agents.

This agent identifies non-essential processes that can be shifted to off-peak hours and redistributes the grid allocation.

This agent can also provide energy-saving tips by searching videos indexed in Amazon Bedrock Knowledge Bases, using insights generated with [Amazon Bedrock Data Automation (BDA)](https://aws.amazon.com/bedrock/bda/).
BDA automates the generation of useful insights from unstructured multimodal content such as documents, images, audio, and video for your AI-powered applications.

The following diagram shows the part of the architecture built in this module.

![Architecture](img/peak_load_agent.png)

## Setup

Before you start, make sure the notebook kernel is set to Python 3, then run the following cells to upgrade boto3 to the latest version (1.37.4 or later).

If the upgrade fails, return to [notebook 1](../1-energy-forecast/1_forecasting_agent.ipynb) and run the Setup block again.

In [None]:
# Install latest dependencies
!python3 -m pip install --upgrade "boto3>=1.37.4"

In [None]:
!pip freeze | grep boto3

In [None]:
!pip uninstall -y numpy
!pip install "numpy<2.0"

Get your workshop ID / resource suffix. If not found, return to [notebook 1](../1-energy-forecast/1_forecasting_agent.ipynb) and run the Setup block again.

In [None]:
import os

def get_workshop_id():
    workshop_id_file = '../.workshop_id'
    if os.path.exists(workshop_id_file):
        with open(workshop_id_file, 'r') as f:
            return f.read().strip()
    else:
        return None
    
workshop_id = get_workshop_id()

if workshop_id is None:
    # Do not build resource names with a "None" suffix
    print("No workshop ID found. Please run the Setup script in notebook 1.")
else:
    resource_suffix = workshop_id
    print("Your resource suffix is", resource_suffix)

## Creating Agent

In this section, we declare global variables that act as helpers throughout the notebook, and we start creating our third agent.

In [None]:
import boto3
import sagemaker
import os
import json, uuid
import random
import time
sts_client = boto3.client('sts')
boto_session = boto3.session.Session()
region = boto_session.region_name

session = sagemaker.Session()
bucket_name = session.default_bucket()

account_id = sts_client.get_caller_identity()["Account"]

agent_foundation_model = [
    'anthropic.claude-3-5-sonnet-20240620-v1:0',
    'anthropic.claude-3-sonnet-20240229-v1:0',
    'anthropic.claude-3-haiku-20240307-v1:0',
]


In [None]:
peak_agent_name = f"peak-agent-{resource_suffix}"

peak_lambda_name = f"fn-peak-agent-{resource_suffix}"

peak_agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{peak_agent_name}'

dynamodb_table = f"{peak_agent_name}-table"
dynamodb_pk = "customer_id"
dynamodb_sk = "item_id"

dynamoDB_args = [dynamodb_table, dynamodb_pk, dynamodb_sk]


### Importing helper functions

In the following cell, we add the current and parent directories to the Python path so that the `utils/bedrock_agent_helper.py` module can be found, and then import its helper class `AgentsForAmazonBedrock`.

This module contains helper classes designed to make the lab experience smoother.

All interactions with Amazon Bedrock Agents are handled by these classes.

The following methods are used in this lab:

In `bedrock_agent_helper.py` (class `AgentsForAmazonBedrock`):
- `create_agent`: Create a new agent and its IAM roles
- `add_action_group_with_lambda`: Create a Lambda function and add it as an action group to a previously created agent
- `associate_kb_with_agent`: Associate a knowledge base with an agent
- `create_agent_alias`: Create an alias for the agent
- `invoke`: Invoke the agent
- `load_dynamodb` and `query_dynamodb`: Load items into, and query, the agent's DynamoDB table
- `get_agent_arn_by_name`: Look up an agent's ARN by its name

In [None]:
import sys

sys.path.insert(0, ".")
sys.path.insert(1, "..")

from utils.bedrock_agent_helper import (
    AgentsForAmazonBedrock
)
agents = AgentsForAmazonBedrock()

## Creating the Peak Load Manager Agent
Create the Peak Load Manager agent, with an action group that handles resource allocation and non-essential process detection.

For this agent, we use the following instructions:
```
You are a Peak Load Manager Bot that optimizes energy consumption patterns
by analyzing IoT device data, video information from Knowledge Bases and process schedules.

Your capabilities include:
1. Retrieving data from IoT devices and Knowledge Bases
2. Identifying non-essential loads during peak hours and reallocating them to other schedules
3. Recommending schedule adjustments
4. Identify energy waste patterns inside residential areas and recommend energy saving tips

Response style:
- Be precise and analytical
- Use clear, practical language
- Focus on actionable recommendations
- Support suggestions with data
- Be concise yet thorough
- Do not request information that can be retrieved from IoT devices
```

We will also make the following tools available to the agent:
- `detect_peak`: detect consumption peak during current month
- `detect_non_essential_processes`: detect non-essential processes that are causing the peaks
- `redistribute_allocation`: reduce/increase allocated quota for a specific item during current month
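
To make the three tools concrete before they are wired to DynamoDB, here is a minimal in-memory sketch of what each one does. It assumes items shaped like the ones the Lambda function later in this notebook reads (`customer_id`, `item_id`, `peak`, `essential`, `quota`, with the flags stored as the strings `"True"`/`"False"`). The sample items are made up for illustration and are not the lab's dataset.

```python
# In-memory sketch of the three tools (no AWS calls). Sample items are hypothetical.
from typing import Dict, List

Item = Dict[str, str]

def detect_peak(items: List[Item], customer_id: str) -> List[Item]:
    # Same idea as the Lambda: match the partition key, then filter on peak == "True"
    return [i for i in items if i["customer_id"] == customer_id and i.get("peak") == "True"]

def detect_non_essential_processes(items: List[Item], customer_id: str) -> List[Item]:
    # Same idea as the Lambda: match the partition key, then filter on essential == "False"
    return [i for i in items if i["customer_id"] == customer_id and i.get("essential") == "False"]

def redistribute_allocation(items: List[Item], customer_id: str, item_id: str, quota: str) -> str:
    # Updates the quota of the item keyed by (customer_id, item_id).
    # Unlike DynamoDB's update_item, this sketch does not create missing items.
    for i in items:
        if i["customer_id"] == customer_id and i["item_id"] == item_id:
            i["quota"] = quota
    return "Item {} has been updated. New quota: {}".format(item_id, quota)

sample_items = [
    {"customer_id": "1", "item_id": "1", "peak": "True", "essential": "True", "quota": "50"},
    {"customer_id": "1", "item_id": "2", "peak": "True", "essential": "False", "quota": "30"},
    {"customer_id": "2", "item_id": "5", "peak": "False", "essential": "False", "quota": "80"},
]

print([i["item_id"] for i in detect_peak(sample_items, "1")])                     # ['1', '2']
print([i["item_id"] for i in detect_non_essential_processes(sample_items, "1")])  # ['2']
print(redistribute_allocation(sample_items, "2", "5", "100"))
```

The real tools run the same filters as DynamoDB queries, so the agent only ever sees the items belonging to the customer ID it passes in.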

In [None]:
peak_agent = agents.create_agent(
    peak_agent_name,
    """You are a peak load manager bot. 
    You can retrieve information from IoT devices and Knowledge Bases, 
    identify processes and their peak energy consumption, and suggest shifts to off-peak hours.
    """,
    """You are a Peak Load Manager Bot that optimizes energy consumption patterns
by analyzing IoT device data, video information from Knowledge Bases and process schedules.

Your capabilities include:
1. Retrieving data from IoT devices and Knowledge Bases
2. Identifying non-essential loads during peak hours and reallocating them to other schedules
3. Recommending schedule adjustments
4. Identify energy waste patterns inside residential areas and recommend energy saving tips

Response style:
- Be precise and analytical
- Use clear, practical language
- Focus on actionable recommendations
- Support suggestions with data
- Be concise yet thorough
- Do not request information that can be retrieved from IoT devices
    """,
    agent_foundation_model
)

peak_agent

## Creating BDA project
To start a BDA job, you need a BDA project, which organizes both standard and custom output configurations. This project is reusable, allowing you to apply the same configuration to process multiple video/audio files that share the same settings.

In [None]:
bda_client = boto3.client('bedrock-data-automation', region_name=region)
bda_runtime_client = boto3.client('bedrock-data-automation-runtime', region_name=region)
s3_client = boto3.client('s3', region_name=region)

kb_bucket_name = f'peak-load-kb-datasource-{account_id}-{resource_suffix}'

# us-east-1 rejects an explicit LocationConstraint; other regions need one
if region == 'us-east-1':
    s3_client.create_bucket(Bucket=kb_bucket_name)
else:
    s3_client.create_bucket(
        Bucket=kb_bucket_name,
        CreateBucketConfiguration={'LocationConstraint': region}
    )

bucket_name_input = f's3://{bucket_name}/bda/input'      # BDA input path
bucket_name_output = f's3://{bucket_name}/bda/output'    # BDA output path

In [None]:
project_name= f'bda-kb-project-{resource_suffix}'

# delete project if it already exists
projects_existing = [project for project in bda_client.list_data_automation_projects(projectStageFilter='ALL')["projects"] if project["projectName"] == project_name]
if len(projects_existing) >0:
    print(f"Deleting existing project: {projects_existing[0]}")
    bda_client.delete_data_automation_project(projectArn=projects_existing[0]["projectArn"])

In [None]:
response = bda_client.create_data_automation_project(
    projectName=project_name,
    projectDescription='BDA video processing project',
    projectStage='DEVELOPMENT',
    standardOutputConfiguration={
        "video": {
            "extraction": {
                "category": {
                    "state": "ENABLED",
                    "types": ["CONTENT_MODERATION", "TEXT_DETECTION", "TRANSCRIPT"]
                },
                "boundingBox": {"state": "ENABLED"}
            },
            "generativeField": {
                "state": "ENABLED",
                "types": ["VIDEO_SUMMARY", "CHAPTER_SUMMARY", "IAB"]
            }
        },
        "audio": {
            "extraction": {
                "category": {
                    "state": "ENABLED", 
                    "types": ["AUDIO_CONTENT_MODERATION", "TOPIC_CONTENT_MODERATION", "TRANSCRIPT"]
                }
            },
            "generativeField": {
                "state": "ENABLED",
                "types": ["AUDIO_SUMMARY", "TOPIC_SUMMARY", "IAB"]
            }
        }
    }
)


In [None]:
kb_project_arn = response.get("projectArn")
print("BDA kb project ARN:", kb_project_arn)

### Start BDA tasks
We will now invoke the BDA API to process the video files. The next cells download two sample videos, upload them to S3, and start one asynchronous BDA job per video. You need to provide the ARN of the BDA project created above and an S3 location where BDA stores the output results.

For the complete API reference for invoking an asynchronous BDA task, refer to this [document](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-data-automation-runtime/client/invoke_data_automation_async.html).

In [None]:
import os
import boto3
import requests
from pathlib import Path

# Create video directory if it doesn't exist
video_dir = "./video"
Path(video_dir).mkdir(parents=True, exist_ok=True)

# List of video URLs and their target filenames
videos = [
    ("IMG_4814.MOV", "https://ws-assets-prod-iad-r-pdx-f3b3f9f1a7d6a3d0.s3.us-west-2.amazonaws.com/1031afa5-be84-4a6a-9886-4e19ce67b9c2/video/IMG_4814.MOV"),
    ("IMG_4813.MOV", "https://ws-assets-prod-iad-r-pdx-f3b3f9f1a7d6a3d0.s3.us-west-2.amazonaws.com/1031afa5-be84-4a6a-9886-4e19ce67b9c2/video/IMG_4813.MOV")
]

# Download the videos
for filename, url in videos:
    output_path = os.path.join(video_dir, filename)
    print(f"Downloading {filename}...")
    
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        print(f"Successfully downloaded {filename}")
    else:
        print(f"Failed to download {filename}")


In [None]:
time.sleep(15)
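
The next cell polls `get_data_automation_status` every five seconds until the job reaches `Success`, `ServiceError`, or `ClientError`, with no upper bound on how long it waits. If you reuse that pattern, a small generic helper with a timeout keeps a stuck job from blocking the notebook forever. This is a sketch: `wait_for_status` is a hypothetical name, not part of the lab's helpers, and the 15-minute default timeout is an arbitrary choice.

```python
import time
from typing import Callable, Iterable

def wait_for_status(get_status: Callable[[], str],
                    terminal_states: Iterable[str] = ("Success", "ServiceError", "ClientError"),
                    interval: float = 5.0,
                    timeout: float = 900.0) -> str:
    """Call get_status() until it returns a terminal state; raise TimeoutError otherwise."""
    terminal = set(terminal_states)
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Status still '{status}' after {timeout} seconds")
        time.sleep(interval)

# Local demo with a fake status source (no AWS call)
fake_statuses = iter(["Created", "InProgress", "Success"])
final_status = wait_for_status(lambda: next(fake_statuses), interval=0)
print(final_status)  # Success
```

Against BDA, you would pass something like `lambda: bda_runtime_client.get_data_automation_status(invocationArn=invocation_video_arn)["status"]` and still check whether the returned status is `Success` before reading the output.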

In [None]:
# Upload each video to S3, process it with BDA, and copy the BDA output to the KB bucket
import os
import time
from datetime import datetime
from IPython.display import clear_output

video_path = "./video"

for root, dirs, files in os.walk(video_path):
    for file in files:
        file_to_upload = os.path.join(root, file)
        file_input = f'bda/input/video/{file}'

        print(f"Uploading file {file_to_upload} to {bucket_name}")
        s3_client.upload_file(file_to_upload, bucket_name, file_input)

        output_name = 'bda/output/'
        # Start an asynchronous BDA job for this video
        response_vid = bda_runtime_client.invoke_data_automation_async(
            inputConfiguration={'s3Uri': f"s3://{bucket_name}/{file_input}"},
            outputConfiguration={'s3Uri': f"s3://{bucket_name}/{output_name}"},
            dataAutomationProfileArn=f'arn:aws:bedrock:{region}:{account_id}:data-automation-profile/us.data-automation-v1',
            dataAutomationConfiguration={
                'dataAutomationProjectArn': kb_project_arn,
                'stage': 'DEVELOPMENT'
            })

        invocation_video_arn = response_vid.get("invocationArn")
        print("BDA video task started:", invocation_video_arn)

        # Poll the job status every 5 seconds until it reaches a terminal state
        statusVideo, status_vid_response = None, None
        while statusVideo not in ["Success", "ServiceError", "ClientError"]:
            status_vid_response = bda_runtime_client.get_data_automation_status(
                invocationArn=invocation_video_arn
            )
            statusVideo = status_vid_response.get("status")
            clear_output(wait=True)
            print(f"{datetime.now().strftime('%H:%M:%S')} : BDA kb video task: {statusVideo}")
            time.sleep(5)

        # Stop here instead of trying to download output that was never produced
        if statusVideo != "Success":
            raise RuntimeError(f"BDA job for {file} ended with status {statusVideo}")

        output_vid_config = status_vid_response.get("outputConfiguration", {}).get("s3Uri")
        print("Output configuration file:", output_vid_config)

        # Build the key of the standard output (result.json) from the job_metadata.json URI
        out_vid_loc = output_vid_config.split("/job_metadata.json", 1)[0].split(bucket_name + "/")[1]
        out_vid_loc += "/0/standard_output/0/result.json"
        print(out_vid_loc)

        # Download the BDA result and copy it to the knowledge base bucket
        local_file = f'result_vid_{file}.json'
        kb_file = f'data/result_vid_{file}_kb.json'
        s3_client.download_file(bucket_name, out_vid_loc, local_file)

        print(f"Uploading file {local_file} to KB bucket {kb_bucket_name}")
        s3_client.upload_file(local_file, kb_bucket_name, kb_file )
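
The cell above derives the `result.json` key by splitting the `job_metadata.json` URI on the bucket name, which would break if the bucket name also appeared later in the key. A slightly more defensive sketch parses the URI with `urllib.parse.urlparse` instead. `bda_result_location` is a hypothetical helper, and the `0/standard_output/0/result.json` suffix is simply the layout the cell above already assumes; it is not derived from BDA documentation here.

```python
from urllib.parse import urlparse

def bda_result_location(job_metadata_uri: str) -> tuple:
    """Return (bucket, key) of the standard-output result.json for a BDA job.

    Assumes the layout used in the upload cell: result.json lives at
    <job prefix>/0/standard_output/0/result.json, next to job_metadata.json.
    """
    parsed = urlparse(job_metadata_uri)
    if parsed.scheme != "s3" or not parsed.path.endswith("/job_metadata.json"):
        raise ValueError(f"Not a BDA job_metadata.json URI: {job_metadata_uri}")
    job_prefix = parsed.path.lstrip("/")[: -len("/job_metadata.json")]
    return parsed.netloc, f"{job_prefix}/0/standard_output/0/result.json"

bucket, key = bda_result_location("s3://example-bucket/bda/output/1234-abcd/job_metadata.json")
print(bucket, key)
# example-bucket bda/output/1234-abcd/0/standard_output/0/result.json
```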
        
        


In [None]:
# Make the shared `utils` package importable: add the notebook's parent
# directory, and that directory's parent, to sys.path
parent_dir = os.path.dirname(os.path.abspath(''))
grandparent_dir = os.path.dirname(parent_dir)

sys.path.insert(0, parent_dir)
sys.path.insert(1, grandparent_dir)

from utils.knowledge_base import BedrockKnowledgeBase

knowledge_base_name = 'peak-load-kb-video'
knowledge_base_description = "Knowledge Base containing peak load video data that show various energy consumption patterns inside a house"

data = [{"type": "S3", "bucket_name": kb_bucket_name}]

# For multimodal RAG, instantiate BedrockKnowledgeBase with multi_modal=True and choose a parser
knowledge_base = BedrockKnowledgeBase(
    kb_name=f'{knowledge_base_name}_{resource_suffix}',
    kb_description=knowledge_base_description,
    data_sources=data,
    multi_modal=True,
    parser='BEDROCK_DATA_AUTOMATION',  # alternative: 'BEDROCK_FOUNDATION_MODEL'
    chunking_strategy="FIXED_SIZE",
    suffix=f'{resource_suffix}-f'
)
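
The knowledge base above is created with `chunking_strategy="FIXED_SIZE"`. As a rough illustration of what fixed-size chunking with overlap means, the sketch below splits text into windows of `max_words` words, where consecutive windows share `overlap_pct` percent of their words. It is an approximation for intuition only: Bedrock Knowledge Bases count tokens rather than whitespace-separated words (its fixed-size configuration is a maximum token count plus an overlap percentage), and the values used here are arbitrary, not the `BedrockKnowledgeBase` helper's defaults.

```python
from typing import List

def fixed_size_chunks(text: str, max_words: int = 300, overlap_pct: int = 20) -> List[str]:
    """Split text into windows of max_words words; neighbours overlap by overlap_pct percent."""
    if max_words <= 0 or not 0 <= overlap_pct < 100:
        raise ValueError("max_words must be positive and overlap_pct in [0, 100)")
    words = text.split()
    overlap = max_words * overlap_pct // 100
    step = max_words - overlap  # always >= 1 because overlap_pct < 100
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # the last window already reaches the end of the text
    return chunks

sample_text = " ".join(f"w{i}" for i in range(10))
for chunk in fixed_size_chunks(sample_text, max_words=4, overlap_pct=50):
    print(chunk)
# w0 w1 w2 w3
# w2 w3 w4 w5
# w4 w5 w6 w7
# w6 w7 w8 w9
```

The overlap means that a sentence cut at a chunk boundary still appears whole in at least one neighbouring chunk, at the cost of storing some text twice.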

### Start the Knowledge Base ingestion job

In [None]:
# ensure that the kb is available
time.sleep(30)

# sync knowledge base
knowledge_base.start_ingestion_job()

time.sleep(30)
# keep the kb_id for invocation later in the invoke request
kb_id = knowledge_base.get_knowledge_base_id()
%store kb_id

### Associating Knowledge Base to agent

In [None]:
# Associate knowledge base
kb_response = agents.associate_kb_with_agent(
    agent_id=peak_agent[0],
    description="This knowledge base contains relevant information for the agent to find energy waste pattern inside a household",
    kb_id=kb_id
)

time.sleep(30)

## Creating Action Group

In this section, we create an action group to handle peak management and associate it with our agent. First, we write the Lambda function that executes the agent's actions. Next, we define the actions available to the agent using function details. As with the previous agents, you could instead define the available actions with an OpenAPI schema.

### Creating the Lambda function
First, let's write the Lambda function code.

In [None]:
%%writefile peak_load.py
import os
import boto3
import json
import random

from boto3.dynamodb.conditions import Key, Attr

dynamodb_resource = boto3.resource('dynamodb')
dynamodb_table = os.getenv('dynamodb_table')
dynamodb_pk = os.getenv('dynamodb_pk')
dynamodb_sk = os.getenv('dynamodb_sk')

def get_named_parameter(event, name):
    return next(item for item in event['parameters'] if item['name'] == name)['value']
    
def populate_function_response(event, response_body):
    return {'response': {'actionGroup': event['actionGroup'], 'function': event['function'],
                'functionResponse': {'responseBody': {'TEXT': {'body': str(response_body)}}}}}

def put_dynamodb(table_name, item):
    table = dynamodb_resource.Table(table_name)
    
    resp = table.update_item(
        Key={'customer_id': item['customer_id'],
             'item_id': item['item_id']},
        UpdateExpression='SET #attr1 = :val1',
        ExpressionAttributeNames={'#attr1': 'quota'},
        ExpressionAttributeValues={':val1':  item['quota']}
    )
    return resp

def read_dynamodb(
    table_name: str, 
    pk_field: str,
    pk_value: str,
    sk_field: str=None, 
    sk_value: str=None,
    attr_key: str=None,
    attr_val: str=None
):
    try:

        table = dynamodb_resource.Table(table_name)
        # Create expression
        if sk_field:
            key_expression = Key(pk_field).eq(pk_value) & Key(sk_field).eq(sk_value)
        else:
            key_expression = Key(pk_field).eq(pk_value)

        if attr_key:
            attr_expression = Attr(attr_key).eq(attr_val)
            query_data = table.query(
                KeyConditionExpression=key_expression,
                FilterExpression=attr_expression
            )
        else:
            query_data = table.query(
                KeyConditionExpression=key_expression
            )
        
        return query_data['Items']
    except Exception as e:
        # Log the error and return an empty list so the agent still receives a response
        print(f'Error querying table {table_name}: {e}')
        return []


def detect_peak(customer_id):
    return read_dynamodb(dynamodb_table, 
                         dynamodb_pk, 
                         customer_id, 
                         attr_key="peak", attr_val="True")

def detect_non_essential_processes(customer_id):
    return read_dynamodb(dynamodb_table, 
                         dynamodb_pk, 
                         customer_id,
                         attr_key="essential", attr_val="False")

                
def redistribute_allocation(customer_id, item_id, quota):
    item = {
        'customer_id': customer_id,
        'item_id': item_id,
        'quota': quota
    }
    resp = put_dynamodb(dynamodb_table, item)
    return "Item {} has been updated. New quota: {}".format(item_id, quota)


def lambda_handler(event, context):
    print(event)
    
    # name of the function that should be invoked
    function = event.get('function', '')

    # parameters to invoke function with
    parameters = event.get('parameters', [])
    
    customer_id = get_named_parameter(event, "customer_id")

    if function == 'detect_peak':    
        result = detect_peak(customer_id)
    elif function == 'detect_non_essential_processes':    
        result = detect_non_essential_processes(customer_id)
    elif function == 'redistribute_allocation':    
        item_id = get_named_parameter(event, "item_id")
        quota = get_named_parameter(event, "quota")
        result = redistribute_allocation(customer_id, item_id, quota)
    else:
        result = f"Error, function '{function}' not recognized"

    response = populate_function_response(event, result)
    print(response)
    return response
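
Because the Lambda function talks to DynamoDB, it can be handy to check its request/response contract locally before deploying. The sketch below copies `get_named_parameter` and `populate_function_response` from `peak_load.py` and replaces the DynamoDB-backed `redistribute_allocation` with a stub, so it runs without AWS credentials. The sample event is hypothetical and contains only the fields the handler reads; the events Bedrock Agents actually send carry additional fields (for example, the session ID and the user's input text).

```python
# Helpers copied from peak_load.py
def get_named_parameter(event, name):
    return next(item for item in event['parameters'] if item['name'] == name)['value']

def populate_function_response(event, response_body):
    return {'response': {'actionGroup': event['actionGroup'], 'function': event['function'],
                         'functionResponse': {'responseBody': {'TEXT': {'body': str(response_body)}}}}}

# Hypothetical event: parameters arrive as a list of {name, type, value} entries
sample_event = {
    "actionGroup": "peak_load_actions",
    "function": "redistribute_allocation",
    "parameters": [
        {"name": "customer_id", "type": "string", "value": "2"},
        {"name": "item_id", "type": "string", "value": "5"},
        {"name": "quota", "type": "string", "value": "100"},
    ],
}

# Stand-in for the DynamoDB-backed redistribute_allocation (no AWS calls)
def fake_redistribute_allocation(customer_id, item_id, quota):
    return "Item {} has been updated. New quota: {}".format(item_id, quota)

result = fake_redistribute_allocation(
    get_named_parameter(sample_event, "customer_id"),
    get_named_parameter(sample_event, "item_id"),
    get_named_parameter(sample_event, "quota"),
)
fn_response = populate_function_response(sample_event, result)
print(fn_response["response"]["functionResponse"]["responseBody"]["TEXT"]["body"])
# Item 5 has been updated. New quota: 100
```

Once the function is deployed, you could send the same `sample_event` to it with `boto3.client("lambda").invoke(FunctionName=peak_lambda_name, Payload=json.dumps(sample_event))`; note that this updates the real DynamoDB table.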

### Defining available actions
Now it's time to define the actions the agent can take.

In [None]:
functions_def = [
    {
        "name": "detect_peak",
        "description": """detect consumption peak during current month""",
        "parameters": {
                        "customer_id": {
                            "description": "The ID of the customer",
                            "required": True,
                            "type": "string"
                        }
                    }
    },
    {
        "name": "detect_non_essential_processes",
        "description": """detect non-essential processes that are causing the peaks""",
        "parameters": {
                        "customer_id": {
                            "description": "The ID of the customer",
                            "required": True,
                            "type": "string"
                        }
                    }
    },
    {
        "name": "redistribute_allocation",
        "description": """reduce/increase allocated quota for a specific item during current month""",
        "parameters": {
                        "customer_id": {
                            "description": "The ID of the customer",
                            "required": True,
                            "type": "string"
                        },
                        "item_id": {
                            "description": "Item that will be updated",
                            "required": True,
                            "type": "string"
                        },
                        "quota": {
                            "description": "new quota",
                            "required": True,
                            "type": "string"
                        }
                    }
    }
]
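
Each entry in `functions_def` follows the function-details format: a `name`, a `description`, and a `parameters` map from each parameter name to its `description`, `type`, and `required` flag. A quick local sanity check can catch typos before the action group is created. The checker below is our own sketch, not part of the Bedrock API, and the set of allowed types reflects the parameter types we understand Bedrock accepts (`string`, `number`, `integer`, `boolean`, `array`), so verify it against the API reference if you rely on it.

```python
from typing import Any, Dict, List

ALLOWED_TYPES = {"string", "number", "integer", "boolean", "array"}  # assumption, see above

def check_function_defs(defs: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable problems; an empty list means no issues were found."""
    problems = []
    seen = set()
    for fn in defs:
        name = fn.get("name", "")
        if not name or name in seen:
            problems.append(f"missing or duplicate function name: {name!r}")
        seen.add(name)
        if not fn.get("description"):
            problems.append(f"{name}: missing description")
        for pname, spec in fn.get("parameters", {}).items():
            if spec.get("type") not in ALLOWED_TYPES:
                problems.append(f"{name}.{pname}: unsupported type {spec.get('type')!r}")
            if not spec.get("description"):
                problems.append(f"{name}.{pname}: missing description")
            if not isinstance(spec.get("required"), bool):
                problems.append(f"{name}.{pname}: 'required' should be True or False")
    return problems

example_defs = [
    {"name": "detect_peak", "description": "detect consumption peak during current month",
     "parameters": {"customer_id": {"description": "The ID of the customer",
                                    "required": True, "type": "string"}}},
    # Deliberately broken entry: duplicate name, empty description, bad type and flag
    {"name": "detect_peak", "description": "",
     "parameters": {"quota": {"description": "new quota", "required": "yes", "type": "str"}}},
]
for problem in check_function_defs(example_defs):
    print(problem)
```

In this notebook, `check_function_defs(functions_def)` should return an empty list.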

### Associating action group to agent
Finally, we can associate a new action group with our previously created agent.

In [None]:
resp = agents.add_action_group_with_lambda(
    agent_name=peak_agent_name,
    lambda_function_name=peak_lambda_name,
    source_code_file="peak_load.py",
    agent_functions=functions_def,
    agent_action_group_name="peak_load_actions",
    agent_action_group_description="Function to get usage, peaks, redistribution for a user",
    dynamo_args=dynamoDB_args
)

## Loading data to DynamoDB

Now that we've created our agent, let's load some generated sample data into DynamoDB. This gives the agent realistic data to work with when it performs actions.

In [None]:
with open("3_peak_sample_data.json") as f:
    table_items = [json.loads(line) for line in f]

agents.load_dynamodb(dynamodb_table, table_items)
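
The cell above reads `3_peak_sample_data.json` as JSON Lines (one JSON object per line). If you adapt it to your own data, two details are worth knowing: a blank line makes `json.loads` fail, and the boto3 DynamoDB resource rejects Python `float` values (it expects `Decimal`). A minimal sketch that handles both follows; we have not checked whether `agents.load_dynamodb` already converts floats, and the sample records are made up.

```python
import io
import json
from decimal import Decimal
from typing import IO, Any, Dict, List

def read_json_lines(fp: IO[str]) -> List[Dict[str, Any]]:
    """Parse one JSON object per line, skipping blank lines and decoding floats as Decimal."""
    items = []
    for line in fp:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines instead of failing in json.loads
        items.append(json.loads(line, parse_float=Decimal))
    return items

# Made-up records; the real file's fields may differ
sample = io.StringIO(
    '{"customer_id": "1", "item_id": "1", "quota": 12.5}\n'
    '\n'
    '{"customer_id": "1", "item_id": "2", "quota": 30}\n'
)
rows = read_json_lines(sample)
print(len(rows), repr(rows[0]["quota"]), repr(rows[1]["quota"]))  # 2 Decimal('12.5') 30
```

In the notebook, this would replace the list comprehension: `with open("3_peak_sample_data.json") as f: table_items = read_json_lines(f)`. Integers stay `int`, which DynamoDB accepts.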

Check that the data was loaded into DynamoDB:

In [None]:
resp = agents.query_dynamodb(dynamodb_table, dynamodb_pk, '1', dynamodb_sk, "1")
resp

## Testing Agent

Now, let's run some tests on the agent we just created to make sure it's working. To do so, we use the test alias `TSTALIASID`, which lets you invoke the draft version of your agent.

### Testing non-essential process detection
First, let's ask a question related to non-essential process detection.

In [None]:
%%time
response = agents.invoke(
    "What's causing my peak load? My id is 2", 
    peak_agent[0], enable_trace=True
)
print("====================")
print(response)
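
`agents.invoke` wraps the Bedrock Agents runtime call for you. If you call the agent directly, the `bedrock-agent-runtime` client's `invoke_agent` returns the answer as an event stream under `completion`: text arrives in `chunk` events as bytes and, with `enableTrace=True`, trace events arrive alongside them. The sketch below shows one way to assemble the final text. `collect_completion` is a hypothetical helper, we have not checked how `agents.invoke` implements this internally, and it is demonstrated on a fake stream so it runs without AWS access.

```python
from typing import Iterable, Mapping

def collect_completion(event_stream: Iterable[Mapping]) -> str:
    """Concatenate the text of all 'chunk' events; ignore trace and other events."""
    parts = []
    for event in event_stream:
        chunk = event.get("chunk")
        if chunk and "bytes" in chunk:
            parts.append(chunk["bytes"].decode("utf-8"))
    return "".join(parts)

# Fake stream shaped like invoke_agent's "completion" events (contents are made up)
fake_stream = [
    {"trace": {"trace": {}}},
    {"chunk": {"bytes": b"Item 2 is a non-essential load "}},
    {"chunk": {"bytes": b"during peak hours."}},
]
print(collect_completion(fake_stream))  # Item 2 is a non-essential load during peak hours.

# With real credentials (not executed here):
# import uuid, boto3
# runtime = boto3.client("bedrock-agent-runtime")
# resp = runtime.invoke_agent(agentId=peak_agent[0], agentAliasId="TSTALIASID",
#                             sessionId=str(uuid.uuid4()),
#                             inputText="What's causing my peak load? My id is 2",
#                             enableTrace=True)
# print(collect_completion(resp["completion"]))
```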

### Testing load optimization
Next, let's ask the agent to optimize consumption.

In [None]:
%%time
response = agents.invoke(
    "Is it possible to optimize my consumption? My id is 1", 
    peak_agent[0], enable_trace=True
)
print("====================")
print(response)

### Testing load relocation
Finally, let's ask the agent to reallocate a quota.

In [None]:
%%time
response = agents.invoke(
    """Is it possible to change quota allocation? My id is 2, my item is 5 and new quota is 100""", 
    peak_agent[0], enable_trace=True
)
print("====================")
print(response)

The variables needed by the next notebooks are stored with `%store` at the end of this notebook, after the agent alias is created.

## Testing Agent with KB

The following cell can take a while to finish, because the agent has to retrieve the relevant video content from the knowledge base. Please be patient.

In [None]:
%%time
time.sleep(30)
response = agents.invoke(
    #"give me a list of videos related with lamps", 
    "what energy efficiency advice can you give based on the videos showing lamps in the living room, please include the original knowledge base source",
    #"what energy efficiency advice can you give based on the videos related fridge",
    peak_agent[0], 
    enable_trace=True,
    #end_session=True
)
print("====================")
print(response)

### Play the energy consumption video clip cited by the agent

In [None]:
# Extract the cited video path and timestamps from the response and play the clip
from utils.knowledge_base_operators import play_video_from_bedrock_response

play_video_from_bedrock_response(response)

## Create alias

As you can see, you can use your agent with the `TSTALIASID` alias to complete tasks.
However, for multi-agent collaboration, you are expected to test your agent first and only use it once it is fully functional.
Therefore, to use an agent as a sub-agent in a multi-agent collaboration, you first need to create an agent alias and connect it to a new version.

Since we've tested and validated our agent, let's now create an alias for it:

In [None]:
peak_agent_alias_id, peak_agent_alias_arn = agents.create_agent_alias(
    peak_agent[0], 'v1'
)
peak_agent_id = peak_agent[0]

In [None]:
peak_agent_arn = agents.get_agent_arn_by_name(peak_agent_name)
peak_dynamodb = dynamodb_table
peak_kb = knowledge_base_name

%store peak_agent_arn
%store peak_agent_alias_arn
%store peak_agent_alias_id
%store peak_lambda_name
%store peak_agent_name
%store peak_agent_id
%store peak_dynamodb
%store peak_kb

In [None]:
peak_agent_arn, peak_agent_alias_arn, peak_agent_alias_id

## Next Steps
Congratulations! We've now created all of our sub-agents. Next, we will create the supervisor agent that orchestrates the sub-agents.