# Lab 3. Peak Load Manager

## はじめに

このノートブックでは、Amazon Bedrock Agents で 3 番目で最後のサブエージェントを作成する方法を説明します。

このエージェントは、オフピーク時間に移行できる重要でないプロセスを識別し、グリッド割り当てを再分配します。

以下は、このモジュール上に構築されるアーキテクチャの一部を表しています。

![アーキテクチャ](img/peak_laod_agent.png)

## セットアップ

boto3 のバージョンが最新であることを確認してください。

そうでない場合は、[notebook 1](../1-energy-forecast/1_forecasting_agent.ipynb) を返さず、セットアップ ブロックを再度実行してください。

In [None]:
!pip freeze | grep boto3

## エージェントの作成

このセクションでは、ノートブック全体でヘルパーとして機能するグローバル変数を宣言し、2 番目のエージェントの作成を開始します。

In [None]:
import boto3
import json

sts_client = boto3.client('sts')
session = boto3.session.Session()

account_id = sts_client.get_caller_identity()["Account"]
region = session.region_name
account_id_suffix = account_id[:3]
agent_suffix = f"{region}-{account_id_suffix}"

agent_foundation_model = [
    'anthropic.claude-3-haiku-20240307-v1:0',
    'anthropic.claude-3-sonnet-20240229-v1:0',
    'anthropic.claude-3-5-sonnet-20240620-v1:0'
]

In [None]:
peak_agent_name = f"peak-agent-{agent_suffix}"

peak_lambda_name = f"fn-peak-agent-{agent_suffix}"

peak_agent_role_name = f'AmazonBedrockExecutionRoleForAgents_{peak_agent_name}'

dynamodb_table = f"{peak_agent_name}-table"
dynamodb_pk = "customer_id"
dynamodb_sk = "item_id"

dynamoDB_args = [dynamodb_table, dynamodb_pk, dynamodb_sk]


### ヘルパー関数のインポート

次のセクションでは、Python パスに `bedrock_agent_helper.py` を追加して、ファイルを認識してその機能を呼び出すことができるようにします。

次に、ヘルパー クラス `bedrock_agent_helper.py` をインポートします。

これらのファイルには、ラボをスムーズに実行することに重点を置いたヘルパー クラスが含まれています。

Bedrock とのすべてのやり取りは、これらのクラスによって処理されます。

このラボで呼び出すメソッドは次のとおりです。

`agents.py` の場合:
- `create_agent`: 新しいエージェントとそれぞれの IAM ロールを作成します
- `add_action_group_with_lambda`: Lambda 関数を作成し、以前に作成したエージェントのアクション グループとして追加します
- `create_agent_alias`: このエージェントのエイリアスを作成します
- `invoke`: エージェントを実行します

In [None]:
import sys

sys.path.insert(0, ".")
sys.path.insert(1, "..")

from utils.bedrock_agent_helper import (
    AgentsForAmazonBedrock
)
agents = AgentsForAmazonBedrock()

## エージェントの作成
リソース割り当てと重要でないプロセスの検出を処理するアクション グループを持つ Peak Load Manager エージェントを作成します。

このエージェントでは、次の手順を使用します:
```
IoT デバイス データとプロセス スケジュールを分析してエネルギー消費パターンを最適化する Peak Load Manager ボットです。

必要な機能は次のとおりです:
1. IoT デバイスからデータを取得する
2. ピーク時の重要でない負荷を特定し、他のスケジュールに再割り当てする
3. スケジュール調整を推奨する

応答スタイル:
- 正確かつ分析的である
- 明確で実用的な言葉を使用する
- 実行可能な推奨事項に焦点を当てる
- データを使用して提案をサポートする
- 簡潔でありながら徹底的である
- IoT デバイスから取得できる情報を要求しない
```

また、エージェントが利用できるツールとして次のツールを用意します:
- `detect_peak`: 当月の消費ピークを検出する
- `detect_non_essential_processes`: ピークの原因となっている重要でないプロセスを検出する
- `redistribute_allocation`: 当月の特定の項目に割り当てられたクォータを減らす/増やす

In [None]:
peak_agent = agents.create_agent(
    peak_agent_name,
    """You are a peak load manager bot. 
    You can retrieve information from IoT devices, 
    identify process and their peak energy consumption and suggest shifts to off-peak hours.
    """,
    """You are a Peak Load Manager Bot that optimizes energy consumption patterns
by analyzing IoT device data and process schedules.

Your capabilities include:
1. Retrieving data from IoT devices
2. Identifying non-essential loads during peak hours and reallocating them to other schedules
3. Recommending schedule adjustments

Response style:
- Be precise and analytical
- Use clear, practical language
- Focus on actionable recommendations
- Support suggestions with data
- Be concise yet thorough
- Do not request information that can be retrieved from IoT devices
    """,
    agent_foundation_model
)

peak_agent

## アクション グループの作成

このセッションでは、ピーク管理を処理するアクション グループを作成し、それをエージェントに関連付けます。そのためには、まずエージェントのアクションを実行する Lambda 関数コードを作成します。次に、関数の詳細を使用してエージェントが実行できるアクションを定義します。前のエージェントと同様に、OpenAPI スキーマを使用して利用可能なアクションを定義することもできます。

#### Lambda 関数の作成
まず、Lambda 関数を作成しましょう

In [None]:
%%writefile peak_load.py
import os
import boto3
import json
import random

from boto3.dynamodb.conditions import Key, Attr

dynamodb_resource = boto3.resource('dynamodb')
dynamodb_table = os.getenv('dynamodb_table')
dynamodb_pk = os.getenv('dynamodb_pk')
dynamodb_sk = os.getenv('dynamodb_sk')

def get_named_parameter(event, name):
    return next(item for item in event['parameters'] if item['name'] == name)['value']
    
def populate_function_response(event, response_body):
    return {'response': {'actionGroup': event['actionGroup'], 'function': event['function'],
                'functionResponse': {'responseBody': {'TEXT': {'body': str(response_body)}}}}}

def put_dynamodb(table_name, item):
    table = dynamodb_resource.Table(table_name)
    
    resp = table.update_item(
        Key={'customer_id': item['customer_id'],
             'item_id': item['item_id']},
        UpdateExpression='SET #attr1 = :val1',
        ExpressionAttributeNames={'#attr1': 'quota'},
        ExpressionAttributeValues={':val1':  item['quota']}
    )
    return resp

def read_dynamodb(
    table_name: str, 
    pk_field: str,
    pk_value: str,
    sk_field: str=None, 
    sk_value: str=None,
    attr_key: str=None,
    attr_val: str=None
):
    try:

        table = dynamodb_resource.Table(table_name)
        # Create expression
        if sk_field:
            key_expression = Key(pk_field).eq(pk_value) & Key(sk_field).eq(sk_value)
        else:
            key_expression = Key(pk_field).eq(pk_value)

        if attr_key:
            attr_expression = Attr(attr_key).eq(attr_val)
            query_data = table.query(
                KeyConditionExpression=key_expression,
                FilterExpression=attr_expression
            )
        else:
            query_data = table.query(
                KeyConditionExpression=key_expression
            )
        
        return query_data['Items']
    except Exception:
        print(f'Error querying table: {table_name}.')


def detect_peak(customer_id):
    return read_dynamodb(dynamodb_table, 
                         dynamodb_pk, 
                         customer_id, 
                         attr_key="peak", attr_val="True")

def detect_non_essential_processes(customer_id):
    return read_dynamodb(dynamodb_table, 
                         dynamodb_pk, 
                         customer_id,
                         attr_key="essential", attr_val="False")

                
def redistribute_allocation(customer_id, item_id, quota):
    item = {
        'customer_id': customer_id,
        'item_id': item_id,
        'quota': quota
    }
    resp = put_dynamodb(dynamodb_table, item)
    return "Item {} has been updated. New quota: {}".format(item_id, quota)


def lambda_handler(event, context):
    print(event)
    
    # name of the function that should be invoked
    function = event.get('function', '')

    # parameters to invoke function with
    parameters = event.get('parameters', [])
    
    customer_id = get_named_parameter(event, "customer_id")

    if function == 'detect_peak':    
        result = detect_peak(customer_id)
    elif function == 'detect_non_essential_processes':    
        result = detect_non_essential_processes(customer_id)
    elif function == 'redistribute_allocation':    
        item_id = get_named_parameter(event, "item_id")
        quota = get_named_parameter(event, "quota")
        result = redistribute_allocation(customer_id, item_id, quota)
    else:
        result = f"Error, function '{function}' not recognized"

    response = populate_function_response(event, result)
    print(response)
    return response

### 利用可能なアクションの定義
エージェントが実行できるアクションを定義します

In [None]:
functions_def = [
    {
        "name": "detect_peak",
        "description": """detect consumption peak during current month""",
        "parameters": {
                        "customer_id": {
                            "description": "The ID of the customer",
                            "required": True,
                            "type": "string"
                        }
                    }
    },
    {
        "name": "detect_non_essential_processes",
        "description": """detect non-essential processes that are causing the peaks""",
        "parameters": {
                        "customer_id": {
                            "description": "The ID of the customer",
                            "required": True,
                            "type": "string"
                        }
                    }
    },
    {
        "name": "redistribute_allocation",
        "description": """reduce/increase allocated quota for a specific 
                            item during current month""",
        "parameters": {
                        "customer_id": {
                            "description": "The ID of the customer",
                            "required": True,
                            "type": "string"
                        },
                        "item_id": {
                            "description": "Item that will be updated",
                            "required": True,
                            "type": "string"
                        },
                        "quota": {
                            "description": "new quota",
                            "required": True,
                            "type": "string"
                        }
                    }
    }
]

### アクション グループをエージェントに関連付ける
最後に、以前に作成したエージェントに新しいアクション グループを関連付けることができます

In [None]:
resp = agents.add_action_group_with_lambda(
    agent_name=peak_agent_name,
    lambda_function_name=peak_lambda_name,
    source_code_file="peak_load.py",
    agent_functions=functions_def,
    agent_action_group_name="peak_load_actions",
    agent_action_group_description="Function to get usage, peaks, redistribution for a user",
    dynamo_args=dynamoDB_args
)

## DynamoDB へのデータのロード

エージェントを作成したので、生成されたデータを DynamoDB にロードしましょう。これにより、エージェントはライブ データとやり取りしてアクションを実行できるようになります。

In [None]:
with open("3_peak_sample_data.json") as f:
    table_items = [json.loads(line) for line in f]

agents.load_dynamodb(dynamodb_table, table_items)

データが DynamoDB にロードされたことをテストする

In [None]:
resp = agents.query_dynamodb(dynamodb_table, dynamodb_pk, '1', dynamodb_sk, "1")
resp

## エージェントのテスト

では、作成したエージェントでテストをいくつか実行して、動作していることを確認しましょう。そのためには、テスト エイリアス `TSTALIASID` を使用します。これにより、エージェントのドラフト バージョンを呼び出すことができます。

### 非必須プロセス検出のテスト
まず、非必須プロセス検出に関連する質問をしてみましょう。

In [None]:
%%time
response = agents.invoke(
    "What's causing my peak load? My id is 2", 
    peak_agent[0], enable_trace=True
)
print("====================")
print(response)

### 負荷最適化のテスト
次に、エージェントに消費を最適化するように依頼します

In [None]:
%%time
response = agents.invoke(
    "Is it possible to optimize my consumption? My id is 1", 
    peak_agent[0], enable_trace=True
)
print("====================")
print(response)

### 負荷の再配置のテスト
最後に、エージェントにクォータの再配置を依頼します

In [None]:
%%time
response = agents.invoke(
    """Is it possible to change quota allocation? My id is 2, my item is 5 and new quota is 100""", 
    peak_agent[0], enable_trace=True
)
print("====================")
print(response)

## エイリアスの作成

ご覧のとおり、エージェントを `TSTALIASID` とともに使用してタスクを完了できます。
ただし、マルチエージェント コラボレーションの場合は、最初にエージェントをテストし、完全に機能するようになってからのみ使用することが想定されます。
したがって、マルチエージェント コラボレーションでエージェントをサブエージェントとして使用するには、まずエージェント エイリアスを作成し、それを新しいバージョンに接続する必要があります。

エージェントをテストして検証したので、次にそのエイリアスを作成しましょう。

In [None]:
peak_agent_alias_id, peak_agent_alias_arn = agents.create_agent_alias(
    peak_agent[0], 'v1'
)
peak_agent_id = peak_agent[0]

次のノートブックで使用する環境変数を保存します。

In [None]:
peak_agent_arn = agents.get_agent_arn_by_name(peak_agent_name)
peak_dynamodb = dynamodb_table

%store peak_agent_arn
%store peak_agent_alias_arn
%store peak_agent_alias_id
%store peak_lambda_name
%store peak_agent_name
%store peak_agent_id
%store peak_dynamodb

In [None]:
peak_agent_arn, peak_agent_alias_arn, peak_agent_alias_id

## 次のステップ
おめでとうございます! これでサブエージェントがすべて作成されました。次はサブエージェント間のオーケストレーションを行うスーパーバイザーエージェントを作成します。