In [1]:
import os
import json
import yaml
import time
import boto3
import logging
from globals import *
from pathlib import Path
from botocore.exceptions import ClientError

In [2]:
# Setup logging
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

In [3]:
!pygmentize globals.py

[37m# global constants[39;49;00m[37m[39;49;00m
CONFIG_FILE: [36mstr[39;49;00m = [33m"[39;49;00m[33mconfig.yml[39;49;00m[33m"[39;49;00m[37m[39;49;00m
MODEL_ID_TOPROMPT_ID_MAPPING_FILE: [36mstr[39;49;00m = [33m"[39;49;00m[33m.model_id_to_prompt_id_mapping.json[39;49;00m[33m"[39;49;00m[37m[39;49;00m
[37m[39;49;00m
LAMBDA_DIR: [36mstr[39;49;00m = [33m"[39;49;00m[33mlambda[39;49;00m[33m"[39;49;00m[37m[39;49;00m
CONFIG_FILE: [36mstr[39;49;00m = [33m"[39;49;00m[33mconfig.yml[39;49;00m[33m"[39;49;00m[37m[39;49;00m
CODE_GEN_LAMBDA: [36mstr[39;49;00m = [33m"[39;49;00m[33mcode-gen[39;49;00m[33m"[39;49;00m[37m[39;49;00m
LAMBDA_ARN_FILE: [36mstr[39;49;00m = [33m"[39;49;00m[33m.lambda_arn[39;49;00m[33m"[39;49;00m[37m[39;49;00m


In [11]:
config = yaml.safe_load(Path(CONFIG_FILE).read_text())
logger.info(f"config=\n{json.dumps(config, indent=2)}")

[2025-01-19 01:50:12,428] p22070 {3811199622.py:2} INFO - config=
{
  "general": {
    "app_name": "code-gen-agent",
    "description": "Amazon Bedrock Agent for generating code for the USACO benchmark",
    "role_name": "CodeGenLambdaRole",
    "region": "us-east-1",
    "model_id": "amazon.nova-micro-v1:0",
    "agent_instructions": "Generate Python code for the USACO problems. You have access to a tool for generating the code.\n",
    "ttl": 1800
  },
  "action_group": {
    "name": "Generate_Python_code",
    "description": "Generates Python code by using foundation models"
  },
  "prompt_info": {
    "name": "USACO_{model_id}",
    "description": "Generate code for the USACO benchmark"
  },
  "prompt_templates": {
    "nova": {
      "models": [
        "amazon.nova-pro-v1:0",
        "amazon.nova-lite-v1:0",
        "amazon.nova-micro-v1:0"
      ],
      "text": "Please reply with a Python 3 solution to the below problem. Read the general instructions below that are applicable t

In [5]:
# fetch the current AWS region
region = config['general']['region']
# the region to be dynamically fetched
logger.info(f"Current AWS region: {region}")
bedrock_agent = boto3.client(service_name = "bedrock-agent", region_name = region)

[2025-01-19 01:48:28,646] p22070 {4102934896.py:4} INFO - Current AWS region: us-east-1
[2025-01-19 01:48:28,669] p22070 {credentials.py:1075} INFO - Found credentials from IAM Role: fmbench-orchestrator


In [6]:
role_name: str = config['general']['role_name']
account: str = boto3.client('sts').get_caller_identity()['Account']
role = f"arn:aws:iam::{account}:role/{role_name}"

In [7]:
agent_name = config['general']['app_name']
agent_description = config['general']['description']
agent_model_id = config['general']['model_id']
agent_instructions = config['general']['agent_instructions']
ttl = int(config['general']['ttl'])
response = bedrock_agent.create_agent(
                        agentName=agent_name,
                        agentResourceRoleArn=role,
                        description=agent_description.replace('\n', ''), # console doesn't like newlines for subsequent editing
                        idleSessionTTLInSeconds=ttl,
                        foundationModel=agent_model_id,
                        instruction=agent_instructions,
                    )
time.sleep(5)

ConflictException: An error occurred (ConflictException) when calling the CreateAgent operation: Could not perform Create operation, since the code-gen-agent (id: 44QLPZDCIK) with the same name code-gen-agent already exists

In [8]:
logger.info(json.dumps(response, indent=2, default=str))
agent_id = "44QLPZDCIK" #response['agent']['agentId']
logger.info(f"agent_id={agent_id}")

NameError: name 'response' is not defined

In [9]:
agent_id = "44QLPZDCIK"

In [12]:
lambda_arn = Path(LAMBDA_ARN_FILE).read_text().strip()
agent_action_group_name: str = config['action_group']['name']
agent_action_group_description: str = config['action_group']['description']

logger.info(f"lambda_arn for the action group is {lambda_arn}, agent_action_group_name={agent_action_group_name}")
response = bedrock_agent.create_agent_action_group(
                agentId=agent_id,
                agentVersion='DRAFT',
                actionGroupExecutor={'lambda': lambda_arn},
                actionGroupName=agent_action_group_name,
                apiSchema=dict(payload=Path(os.path.join(LAMBDA_DIR, "openapi.json")).read_text()),
                description=agent_action_group_description
                )
logger.info(f"create agent response={json.dumps(response, indent=2, default=str)}")
response = bedrock_agent.prepare_agent(
        agentId=agent_id
    )
time.sleep(5)

[2025-01-19 01:50:21,013] p22070 {1726923266.py:5} INFO - lambda_arn for the action group is arn:aws:lambda:us-east-1:015469603702:function:code-gen:9, agent_action_group_name=Generate_Python_code
[2025-01-19 01:50:21,144] p22070 {1726923266.py:14} INFO - create agent response={
  "ResponseMetadata": {
    "RequestId": "aedcdef4-1d5b-4f8f-8c12-c4443eef0644",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Sun, 19 Jan 2025 01:50:21 GMT",
      "content-type": "application/json",
      "content-length": "3012",
      "connection": "keep-alive",
      "x-amzn-requestid": "aedcdef4-1d5b-4f8f-8c12-c4443eef0644",
      "x-amz-apigw-id": "EnMOmEU7IAMEaew=",
      "x-amzn-trace-id": "Root=1-678c5a5d-104e0bc27d2e3acb0571beb1"
    },
    "RetryAttempts": 0
  },
  "agentActionGroup": {
    "actionGroupExecutor": {
      "lambda": "arn:aws:lambda:us-east-1:015469603702:function:code-gen:9"
    },
    "actionGroupId": "IKPFOLDNIF",
    "actionGroupName": "Generate_Python_code",
    "

In [21]:
timeout = 60
sleep_time = 5
while timeout > 0:
    response = bedrock_agent.get_agent(
            agentId=agent_id
        )
    agent_status = response['agent']['agentStatus']
    logger.info(f"agent_id={agent_id}, agentStatus={agent_status}")
    if agent_status == 'PREPARED':
        break
    
    time.sleep(sleep_time)
    timeout -= sleep_time

[2025-01-19 04:24:42,920] p22070 {2449327317.py:8} INFO - agent_id=44QLPZDCIK, agentStatus=PREPARED


In [25]:
input_text = "code to upload file to s3 using amazon.nova-micro-v1:0 model"
agent_alias_id = "TSTALIASID"
import uuid

session_id = str(uuid.uuid4())
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime', region_name=region)
enable_trace = True
session_state = {}
end_session = False
agent_resp = bedrock_agent_runtime_client.invoke_agent(
            inputText=input_text,
            agentId=agent_id,
            agentAliasId=agent_alias_id,
            sessionId=session_id,
            sessionState=session_state,
            enableTrace=enable_trace,
            endSession=end_session,
        )


In [28]:
print(f"Agent raw response: {json.dumps(agent_resp, indent=2, default=str)}")

# Return error message if invoke was unsuccessful
if agent_resp["ResponseMetadata"]["HTTPStatusCode"] != 200:
    _error_message = f"API Response was not 200: {agent_resp}"
    if enable_trace:
        print(_error_message)



Agent raw response: {
  "ResponseMetadata": {
    "RequestId": "ef9df5f3-4de8-4407-ad26-6ee26c6c295b",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Sun, 19 Jan 2025 04:34:49 GMT",
      "content-type": "application/vnd.amazon.eventstream",
      "transfer-encoding": "chunked",
      "connection": "keep-alive",
      "x-amzn-requestid": "ef9df5f3-4de8-4407-ad26-6ee26c6c295b",
      "x-amz-bedrock-agent-session-id": "f790bbe8-992b-46e9-a38a-d1d1b8e7dc29",
      "x-amzn-bedrock-agent-content-type": "application/json"
    },
    "RetryAttempts": 0
  },
  "contentType": "application/json",
  "sessionId": "f790bbe8-992b-46e9-a38a-d1d1b8e7dc29",
  "completion": "<botocore.eventstream.EventStream object at 0x7c7d89c7be00>"
}


In [44]:
# from IPython.display import display, Markdown
import matplotlib.pyplot as plt
# import matplotlib.image as mpimg

from termcolor import colored
from rich.console import Console
from rich.markdown import Markdown

def _make_fully_cited_answer(
            orig_agent_answer, event, enable_trace=False, trace_level="none"
    ):
        _citations = event.get("chunk", {}).get("attribution", {}).get("citations", [])
        if _citations:
            if enable_trace:
                print(
                    f"got {len(event['chunk']['attribution']['citations'])} citations \n"
                )
        else:
            return orig_agent_answer

        # remove <sources> tags to work around a bug
        _pattern = r"\n\n<sources>\n\d+\n</sources>\n\n"
        _cleaned_text = re.sub(_pattern, "", orig_agent_answer)
        _pattern = "<sources><REDACTED></sources>"
        _cleaned_text = re.sub(_pattern, "", _cleaned_text)
        _pattern = "<sources></sources>"
        _cleaned_text = re.sub(_pattern, "", _cleaned_text)

        _fully_cited_answer = ""
        _curr_citation_idx = 0

        for _citation in _citations:
            if enable_trace and trace_level == "all":
                print(f"full citation: {_citation}")

            _start = _citation["generatedResponsePart"]["textResponsePart"]["span"][
                         "start"
                     ] - (
                             _curr_citation_idx + 1
                     )  # +1
            _end = (
                    _citation["generatedResponsePart"]["textResponsePart"]["span"]["end"]
                    - (_curr_citation_idx + 2)
                    + 4
            )  # +2
            _refs = _citation.get("retrievedReferences", [])
            if len(_refs) > 0:
                _ref_url = (
                    _refs[0].get("location", {}).get("s3Location", {}).get("uri", "")
                )
            else:
                _ref_url = ""
                _fully_cited_answer = _cleaned_text
                break

            _fully_cited_answer += _cleaned_text[_start:_end] + " [" + _ref_url + "] "

            if _curr_citation_idx == 0:
                _answer_prefix = _cleaned_text[:_start]
                _fully_cited_answer = _answer_prefix + _fully_cited_answer

            _curr_citation_idx += 1

            if enable_trace and trace_level == "all":
                print(f"\n\ncitation {_curr_citation_idx}:")
                print(
                    f"got {len(_citation['retrievedReferences'])} retrieved references for this citation\n"
                )
                print(f"citation span... start: {_start}, end: {_end}")
                print(
                    f"citation based on span:====\n{_cleaned_text[_start:_end]}\n===="
                )
                print(f"citation url: {_ref_url}\n============")

        if enable_trace and trace_level == "all":
            print(
                f"\nfullly cited answer:*************\n{_fully_cited_answer}\n*************"
            )

        return _fully_cited_answer



def invoke(
            input_text: str,
            agent_id: str,
            agent_alias_id: str = "TSTALIASID",
            session_id: str = str(uuid.uuid1()),
            session_state: dict = {},
            enable_trace: bool = True,
            end_session: bool = False,
            trace_level: str = "all",
            multi_agent_names: dict = {},
    ):
        """Invokes an agent with a given input text, while optional parameters
        also let you leverage an agent session, or target a specific agent alias.

        Args:
            input_text (str): The text to be processed by the agent.
            agent_id (str): The ID of the agent to invoke.
            agent_alias_id (str, optional): The alias ID of the agent to invoke. Defaults to "TSTALIASID".
            session_id (str, optional): The ID of the session. Defaults to a new UUID.
            session_state (dict, optional): The state of the session. Defaults to an empty dict.
            enable_trace (bool, optional): Whether to enable trace. Defaults to False.
            end_session (bool, optional): Whether to end the session. Defaults to False.
            trace_level (str, optional): The level of trace. Defaults to "none". Possible values are "none", "all", "core".

        Returns:
            str: The answer from the agent.
        """
        # To upload files to the agent, for use in the sandbox use sessionState:
        import base64
        import datetime
        from pathlib import Path
        _time_before_call = datetime.datetime.now()

        _agent_resp = bedrock_agent_runtime_client.invoke_agent(
            inputText=input_text,
            agentId=agent_id,
            agentAliasId=agent_alias_id,
            sessionId=session_id,
            sessionState=session_state,
            enableTrace=enable_trace,
            endSession=end_session,
        )
        print(f"Agent raw response: {_agent_resp}")

        if enable_trace:
            if trace_level == "all":
                print(f"invokeAgent API response object: {_agent_resp}")
            else:
                print(
                    f"invokeAgent API request ID: {_agent_resp['ResponseMetadata']['RequestId']}"
                )
                print(f"invokeAgent API session ID: {session_id}")

        # Return error message if invoke was unsuccessful
        if _agent_resp["ResponseMetadata"]["HTTPStatusCode"] != 200:
            _error_message = f"API Response was not 200: {_agent_resp}"
            if enable_trace and trace_level == "all":
                print(_error_message)
            return _error_message

        _total_in_tokens = 0
        _total_out_tokens = 0
        _total_llm_calls = 0
        _orch_step = 0
        _sub_step = 0
        _time_before_orchestration = datetime.datetime.now()
        
        _agent_answer = ""
        _event_stream = _agent_resp['completion']

        # Create the directory using the sessionId
        import io
        from pathlib import Path
        from datetime import datetime
        now_as_str = str(datetime.now()).replace(" ", "_").replace(":", "_")
        dirname = "agent_response"
        session_directory_path = Path(dirname)
        if not os.path.exists(session_directory_path):
            os.makedirs(session_directory_path) 
        print(f"Session directory is: {session_directory_path}")
        #The response of this operation contains an EventStream member. 
        event_stream = _agent_resp["completion"]

        # When iterated the EventStream will yield events.
        event_ctr = 0
        chunk_ctr = 0
        file_ctr = 0
        image_ctr = 0
        other_file_ctr= 0
        final_code = ""

        try:
            _sub_agent_name = "<collab-name-not-yet-provided>"
            for _event in _event_stream:
                _sub_agent_alias_id = None 
                if 'files' in _event:
                    file_ctr += 1
                    print(f"event_ctr={event_ctr+1}, received file in event, file_ctr={file_ctr+1}")
                    files = _event['files']['files']
                    for i, file in enumerate(files):
                        #print(f"file={file}")
                        print(f"event_ctr={event_ctr+1}, file_ctr={file_ctr+1}, i={i}")
                        name = file['name']
                        type = file['type']
                        bytes_data = file['bytes']
                        
                        # It the file is a PNG image then we can display it...
                        if type == 'image/png':
                            image_ctr += 1
                            print(f"event_ctr={event_ctr+1}, file_ctr={file_ctr+1}, image_ctr={image_ctr+1}")
                            fname = os.path.join(session_directory_path, f"output_image_{event_ctr+1}.png")
                            print(f"fname is: {fname}")
                            Path(fname).write_bytes(bytes_data)
                            # Display PNG image using Matplotlib
                            img = plt.imread(io.BytesIO(bytes_data))
                            plt.figure(figsize=(10, 10))
                            plt.imshow(img)
                            plt.axis('off')
                            plt.title(name)
                            plt.show()
                            plt.close()
                            
                        # If the file is NOT a PNG then we save it to disk...
                        else:
                            other_file_ctr += 1
                            print(f"event_ctr={event_ctr+1}, file_ctr={file_ctr+1}, other_file_ctr={other_file_ctr+1}")

                            # Save other file types to local disk
                            unique_fname = Path(name).stem + "_" + now_as_str + Path(name).suffix
                            with open(unique_fname, 'wb') as f:
                                f.write(bytes_data)
                            print(f"File '{name}' as {unique_fname} saved to disk.")
                elif 'chunk' in _event:
                    _data = _event['chunk']['bytes']
                    _agent_answer = _data.decode('utf8')
                    _agent_answer = _make_fully_cited_answer(_agent_answer, _event, enable_trace, trace_level)

                if 'trace' in _event and enable_trace:
                    if trace_level == "all":
                        print('---')
                    else:
                        if 'callerChain' in _event['trace']:
                            if len(_event['trace']['callerChain']) > 1:
                                _sub_agent_alias_arn = _event['trace']['callerChain'][1]['agentAliasArn']
                                # get sub agent id by grabbing all text following the second '/' character
                                _sub_agent_alias_id = _sub_agent_alias_arn.split('/', 1)[1]
                                try:
                                    _sub_agent_name = multi_agent_names[_sub_agent_alias_id]
                                except:
                                    print("You haven't provided agents names. To do so provide a dictionary in the format {f'{agent_id}/{agent_alias_id}': f'{agent_name}'})")
                                    _sub_agent_name = "<not-yet-provided>"

                        # if 'collaboratorName' in _event['trace']:
                        #     _sub_agent_name = _event['trace']['collaboratorName'] 
                        # else:
                        #     _sub_agent_name = "<collab-name-not-yet-provided>"

                    if 'routingClassifierTrace' in _event['trace']['trace']:
                        _route = _event['trace']['trace']['routingClassifierTrace']

                        if 'modelInvocationInput' in _route:
                            _orch_step +=1 
                            print(colored(f"---- Step {_orch_step} ----", "green"))
                            _time_before_routing = datetime.datetime.now()
                            print(colored("Classifying request to immediately route to one collaborator if possible.", "blue"))

                        if 'modelInvocationOutput' in _route:
                            _llm_usage = _route['modelInvocationOutput']['metadata']['usage']
                            _in_tokens = _llm_usage['inputTokens']
                            _total_in_tokens += _in_tokens 

                            _out_tokens = _llm_usage['outputTokens']
                            _total_out_tokens += _out_tokens

                            _total_llm_calls += 1
                            _route_duration = datetime.datetime.now() - _time_before_routing

                            _raw_resp_str = _route['modelInvocationOutput']['rawResponse']['content']
                            _raw_resp = json.loads(_raw_resp_str)
                            _classification = _raw_resp['content'][0]['text'].replace('<a>', '').replace('</a>', '')

                            if _classification == UNDECIDABLE_CLASSIFICATION:
                                print(colored(f"Routing classifier did not find a matching collaborator. Reverting to 'SUPERVISOR' mode.", "magenta"))
                            elif _classification == 'keep_previous_agent':
                                print(colored(f"Continuing conversation with previous collaborator.", "magenta"))
                                # # since we replaced the typical orchestration step with a simple routing
                                # # classification, bump the step count.
                                # _orch_step += 1
                            else:
                                _sub_agent_name = _classification
                                print(colored(f"Routing classifier chose collaborator: '{_classification}'", "magenta"))
                                # # since we replaced the typical orchestration step with a simple routing
                                # # classification, bump the step count.
                                # _orch_step += 1
                            print(colored(f"Routing classifier took {_route_duration.total_seconds():,.1f}s, using {_in_tokens+_out_tokens} tokens (in: {_in_tokens}, out: {_out_tokens}).\n", "yellow"))

                    if 'failureTrace' in _event['trace']['trace']:
                        print(colored(f"Agent error: {_event['trace']['trace']['failureTrace']['failureReason']}", "red"))

                    if 'orchestrationTrace' in _event['trace']['trace']:
                        _orch = _event['trace']['trace']['orchestrationTrace']

                        if trace_level in ["core", "outline"]:
                            if "rationale" in _orch:
                                _rationale = _orch['rationale']
                                print(colored(f"{_rationale['text']}", "blue"))

                            if "invocationInput" in _orch:
                                # NOTE: when agent determines invocations should happen in parallel
                                # the trace objects for invocation input still come back one at a time.
                                _input = _orch['invocationInput']

                                if 'actionGroupInvocationInput' in _input:
                                    if trace_level == "outline":
                                        print(colored(f"Using tool: {_input['actionGroupInvocationInput']['function']}", "magenta"))
                                    else:
                                        print(f"Action group information: {_input['actionGroupInvocationInput']}")
                                        print(colored(f"Using tool: {_input['actionGroupInvocationInput']['function']} with these inputs:", "magenta"))
                                        if (len(_input['actionGroupInvocationInput']['parameters']) == 1) and (_input['actionGroupInvocationInput']['parameters'][0]['name'] == 'input_text'):
                                            print(colored(f"{_input['actionGroupInvocationInput']['parameters'][0]['value']}", "magenta"))
                                        else:
                                            print(colored(f"{_input['actionGroupInvocationInput']['parameters']}\n", "magenta"))

                                elif 'agentCollaboratorInvocationInput' in _input:
                                    _collab_name = _input['agentCollaboratorInvocationInput']['agentCollaboratorName']
                                    _sub_agent_name = _collab_name
                                    _collab_input_text = _input['agentCollaboratorInvocationInput']['input']['text']
                                    _collab_arn = _input['agentCollaboratorInvocationInput']['agentCollaboratorAliasArn']
                                    _collab_ids = _collab_arn.split('/', 1)[1]

                                    if trace_level == "outline":
                                        print(colored(f"Using sub-agent collaborator: '{_collab_name} [{_collab_ids}]'", "magenta"))
                                    else:
                                        print(colored(f"Using sub-agent collaborator: '{_collab_name} [{_collab_ids}]' passing input text:", "magenta"))
                                        print(colored(f"{_collab_input_text[0:TRACE_TRUNCATION_LENGTH]}\n", "magenta"))

                                elif 'codeInterpreterInvocationInput' in _input:
                                    if trace_level == "outline":
                                        print(colored(f"Using code interpreter", "magenta"))
                                    else:
                                        console = Console()
                                        _gen_code = _input['codeInterpreterInvocationInput']['code']
                                        _code = f"```python\n{_gen_code}\n```"
                                        if _code is not None:
                                            fname = os.path.join(session_directory_path, f"code_event_{event_ctr+1}.py")
                                            from pathlib import Path
                                            # remove bedrock agent specific code from here
                                            code = _code.replace("$BASE_PATH$/", "")
                                            final_code += "\n" + code
                                            Path(fname).write_text(code)
                                        console.print(Markdown(f"**Generated code**\n{_code}"))

                            if "observation" in _orch:
                                if trace_level == "core":
                                    _output = _orch['observation']
                                    if 'actionGroupInvocationOutput' in _output:
                                        tool_output = _output['actionGroupInvocationOutput']['text']
                                        print(colored("--tool outputs:", "magenta"))
                                        
                                        # Parse the tool_output to check if it's a generate_code response
                                        try:
                                            if "'original_generated_code':" in tool_output:  # This indicates it's from generate_code function
                                                safe_input = input_text.lower()
                                                safe_input = ''.join(c if c.isalnum() else '_' for c in safe_input)[:20]
                                                unique_id = str(uuid.uuid4()) 
                                                filename = f"code_event_{unique_id}_{safe_input}.py"
                                                fname = os.path.join(session_directory_path, filename)
                                                
                                                code_start = tool_output.find("'original_generated_code': '") + len("'original_generated_code': '")
                                                code_end = tool_output.find("'", code_start)
                                                code = tool_output[code_start:code_end]
                                                
                                                # handle code formatting
                                                code = code.replace("```python", "").replace("```", "")
                                                code = code.replace("\\\\\\$BASE_PATH\\\\\\$/", "")
                                                code = code.replace('\\n', '\n')
                                                code = code.replace('\\t', '\t')
                                                code = code.replace('\\"', '"')
                                                code = code.replace("\\'", "'")
                                                code = code.replace("\\\\", "\\")
                                                
                                                Path(fname).write_text(code)
                                                print(f"\nCode saved to: {fname}")
                                                console = Console()
                                                _code = f"```python\n{code}\n```"
                                                console.print(Markdown(f"\n**Generated code**\n{_code}"))
                                            else:
                                                print(f"Tool output:")
                                                print(colored(f"  {tool_output}", "magenta"))
                                    
                                        except Exception as e:
                                            print(f"Error saving to primary location: {e}")
                                            try:
                                                fallback_fname = os.path.join(os.getcwd(), filename)
                                                Path(fallback_fname).write_text(code)
                                                print(f"Code saved to fallback location: {fallback_fname}")
                                            except Exception as e2:
                                                print(f"Failed to save code even to fallback location: {e2}")
                                    if 'agentCollaboratorInvocationOutput' in _output:
                                        _collab_name = _output['agentCollaboratorInvocationOutput']['agentCollaboratorName']
                                        _collab_output_text = _output['agentCollaboratorInvocationOutput']['output']['text']
                                        print(colored(f"\n----sub-agent {_collab_name} output text:", "magenta"))
                                        if '\n' in _collab_output_text:
                                            for line in _collab_output_text.split('\n'):
                                                print(colored(f"  {line}", "magenta"))
                                        else:
                                            print(colored(f"  {_collab_output_text}", "magenta"))
                                        print()

                                    if 'finalResponse' in _output:
                                        final_response = _output['finalResponse']['text']
                                        print(colored("Final response:", "cyan"))
                                        if '\n' in final_response:
                                            for line in final_response.split('\n'):
                                                print(colored(f"  {line}", "cyan"))
                                        else:
                                            print(colored(f"  {final_response}", "cyan"))

                        # if 'modelInvocationInput' in _orch:
                        #     if _sub_agent_alias_id is not None:
                        #         _sub_step += 1
                        #         print(colored(f"---- Step {_orch_step}.{_sub_step} [using sub-agent name:{_sub_agent_name}, id:{_sub_agent_alias_id}] ----", "green"))
                        #     else:
                        #         _orch_step += 1
                        #         _sub_step = 0
                        #         print(colored(f"---- Step {_orch_step} ----", "green"))

                        if 'modelInvocationOutput' in _orch:
                            if _sub_agent_alias_id is not None:
                                _sub_step += 1
                                print(colored(f"---- Step {_orch_step}.{_sub_step} [using sub-agent name:{_sub_agent_name}, id:{_sub_agent_alias_id}] ----", "green"))
                            else:
                                _orch_step += 1
                                _sub_step = 0
                                print(colored(f"---- Step {_orch_step} ----", "green"))

                            _llm_usage = _orch['modelInvocationOutput']['metadata']['usage']
                            _in_tokens = _llm_usage['inputTokens']
                            _total_in_tokens += _in_tokens 

                            _out_tokens = _llm_usage['outputTokens']
                            _total_out_tokens += _out_tokens

                            _total_llm_calls += 1
                            import datetime
                            _orch_duration = datetime.datetime.now() - _time_before_orchestration

                            print(colored(f'Took {_orch_duration.total_seconds():,.1f}s, using {_in_tokens+_out_tokens} tokens (in: {_in_tokens}, out: {_out_tokens}) to complete prior action, observe, orchestrate.', "yellow"))

                            # restart the clock for next step/sub-step
                            _time_before_orchestration = datetime.datetime.now()

                    elif 'preProcessingTrace' in _event['trace']['trace']:
                        _pre = _event['trace']['trace']['preProcessingTrace']
                        if 'modelInvocationOutput' in _pre:
                            _llm_usage = _pre['modelInvocationOutput']['metadata']['usage']
                            _in_tokens = _llm_usage['inputTokens']
                            _total_in_tokens += _in_tokens 

                            _out_tokens = _llm_usage['outputTokens']
                            _total_out_tokens += _out_tokens

                            _total_llm_calls += 1

                            print(colored("Pre-processing trace, agent came up with an initial plan.", "yellow"))
                            print(colored(f'Used LLM tokens, in: {_in_tokens}, out: {_out_tokens}', "yellow"))

                    elif 'postProcessingTrace' in _event['trace']['trace']:
                        _post = _event['trace']['trace']['postProcessingTrace']
                        if 'modelInvocationOutput' in _post:
                            _llm_usage = _post['modelInvocationOutput']['metadata']['usage']
                            _in_tokens = _llm_usage['inputTokens']
                            _total_in_tokens += _in_tokens 

                            _out_tokens = _llm_usage['outputTokens']
                            _total_out_tokens += _out_tokens

                            _total_llm_calls += 1
                            print(colored("Agent post-processing complete.", "yellow"))
                            print(colored(f'Used LLM tokens, in: {_in_tokens}, out: {_out_tokens}', "yellow"))

                    if trace_level == "all":
                        print(json.dumps(_event['trace'], indent=2))

                if 'files' in _event.keys() and enable_trace:
                    console = Console()
                    files_event = _event['files']
                    console.print(Markdown("**Files**"))

                    files_list = files_event['files']
                    for this_file in files_list:
                        print(f"{this_file['name']} ({this_file['type']})")
                        file_bytes = this_file['bytes']

                        # save bytes to file, given the name of file and the bytes 
                        file_name = os.path.join('output', this_file['name'])
                        with open(file_name, 'wb') as f:
                            f.write(file_bytes)
                        # if this_file['type'] == 'image/png' or this_file['type'] == 'image/jpeg':
                        #     img = mpimg.imread(file_name)
                        #     # plt.imshow(img)
                        #     # plt.show()

            if enable_trace:
                duration = datetime.datetime.now() - _time_before_call

                if trace_level in ["core", "outline", "all"]:
                    print(colored(f"Agent made a total of {_total_llm_calls} LLM calls, " +\
                                  f"using {_total_in_tokens+_total_out_tokens} tokens " +\
                                  f"(in: {_total_in_tokens}, out: {_total_out_tokens})" +\
                                  f", and took {duration.total_seconds():,.1f} total seconds", "yellow"))

                if trace_level == "all":
                    print(f"Returning agent answer as: {_agent_answer}")
            # Create the file path by joining the session directory and the desired file name
            file_path = os.path.join(session_directory_path, "final_code.py")
            print(f"final code file path is : {file_path}")
            Path(file_path).write_text(_agent_answer)
            return _agent_answer
        
        except Exception as e:
            print(f"Caught exception while processing input to invokeAgent:\n")
            print(f"  for input text:\n{input_text}\n")
            print(f"  on agent: {agent_id}, alias: {agent_alias_id}")
            print(f"  request ID: {_agent_resp['ResponseMetadata']['RequestId']}, retries: {_agent_resp['ResponseMetadata']['RetryAttempts']}\n")
            print(f"Error: {e}")
            raise Exception("Unexpected exception: ", e)
        


In [45]:
input_text = "code to upload file to s3 using amazon.nova-micro-v1:0 model"
invoke(input_text, agent_id)

Agent raw response: {'ResponseMetadata': {'RequestId': 'ebcb7591-ea7d-4e69-8f0d-d3df15493012', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Sun, 19 Jan 2025 04:53:10 GMT', 'content-type': 'application/vnd.amazon.eventstream', 'transfer-encoding': 'chunked', 'connection': 'keep-alive', 'x-amzn-requestid': 'ebcb7591-ea7d-4e69-8f0d-d3df15493012', 'x-amz-bedrock-agent-session-id': '468ba380-d621-11ef-ad0d-16ffcb135d9f', 'x-amzn-bedrock-agent-content-type': 'application/json'}, 'RetryAttempts': 0}, 'contentType': 'application/json', 'sessionId': '468ba380-d621-11ef-ad0d-16ffcb135d9f', 'completion': <botocore.eventstream.EventStream object at 0x7c7d89dda150>}
invokeAgent API response object: {'ResponseMetadata': {'RequestId': 'ebcb7591-ea7d-4e69-8f0d-d3df15493012', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Sun, 19 Jan 2025 04:53:10 GMT', 'content-type': 'application/vnd.amazon.eventstream', 'transfer-encoding': 'chunked', 'connection': 'keep-alive', 'x-amzn-requestid': 'ebcb7591-ea7

'Here is the Python code to upload a file to S3 using the specified model:\n\n```python\nimport boto3\nimport sys\n\ndef upload_to_s3(file_name, bucket, object_name=None):\n    """\n    Upload a file to an S3 bucket\n\n    :param file_name: File to upload\n    :param bucket: Bucket to upload to\n    :param object_name: S3 object name. If not specified then file_name is used\n    :return: True if file was uploaded, else False\n    """\n    # Initialize the S3 client\n    s3_client = boto3.client(\'s3\')\n\n    # If S3 object_name was not specified, use file_name\n    if object_name is None:\n        object_name = file_name\n\n    # Upload the file\n    try:\n        s3_client.upload_file(file_name, bucket, object_name)\n        return True\n    except FileNotFoundError:\n        print("The file was not found", file=sys.stderr)\n        return False\n    except NoCredentialsError:\n        print("Credentials not available", file=sys.stderr)\n        return False\n\ndef main():\n    # Exa