# Transform OpenAPI APIs into MCP tools using Bedrock AgentCore Gateway

## Overview
Customers can bring OpenAPI spec in JSON or YAML and transform the apis into MCP tools using Bedrock AgentCore Gateway. 

The Gateway workflow involves the following steps to connect your agents to external tools:
* **Create the tools for your Gateway** - Define your tools using schemas such as OpenAPI specifications for REST APIs. The OpenAPI specifications are then parsed by Amazon Bedrock AgentCore for creating the Gateway.
* **Create a Gateway endpoint** - Create the gateway that will serve as the MCP entry point with inbound authentication.
* **Add targets to your Gateway** - Configure the OpenAPI targets that define how the gateway routes requests to specific tools. All the APIs that part of OpenAPI file will become an MCP-compatible tool, and will be made available through your Gateway endpoint URL. Configure outbound authorization for each OpenAPI Gateway target. 
* **Update your agent code** - Connect your agent to the Gateway endpoint to access all configured tools through the unified MCP interface.

![How does it work](images/openapi-gateway-apikey.png)

### Tutorial Details


| Information          | Details                                                   |
|:---------------------|:----------------------------------------------------------|
| Tutorial type        | Interactive                                               |
| AgentCore components | AgentCore Gateway, AgentCore Identity                     |
| Agentic Framework    | Strands Agents                                            |
| Gateway Target type  | OpenAPI                                                   |
| Agent                | Finance Agent                                        |
| Inbound Auth IdP     | Amazon Cognito                                            |
| Outbound Auth        | API Key                                                   |
| LLM model            | Anthropic Claude Sonnet 3.7 Inference profile              |
| Tutorial components  | Creating AgentCore Gateway and Invoking AgentCore Gateway |
| Tutorial vertical    | Cross-vertical                                            |
| Example complexity   | Easy                                                      |
| SDK used             | boto3 , AgentCore starter kit                             |

In the first part of the tutorial we will create some AmazonCore Gateway targets

### Tutorial Architecture
In this tutorial we will transform operations defined in OpenAPI yaml/json file into MCP tools and host it in Bedrock AgentCore Gateway.
The solution uses Strands Agent using Amazon Bedrock models.
In our example we will use a strands agent which will invoke Agentcore gateway to use the tools exposed by Intrinio API

## Prerequisites

To execute this tutorial you will need:
* Jupyter notebook (Python kernel)
* uv
* AWS credentials
* Amazon Cognito

In [None]:
!pip install --force-reinstall -U -r requirements.txt --quiet

In [None]:
# Set some environment variables
import os
os.environ['AWS_DEFAULT_REGION'] = os.environ.get('AWS_REGION', 'us-east-1')
BUCKET_NAME='madagentcoreworkshop'
FILE_NAME='intrinio-api-schema.json'
OBJECT_KEY='openapi_3_spec.json'
API_KEY='mad-intrinio-key'
 
 

In [None]:
import os
import sys

# Get the directory of the current script
if '__file__' in globals():
    current_dir = os.path.dirname(os.path.abspath(__file__))
else:
    current_dir = os.getcwd()  # Fallback if __file__ is not defined (e.g., Jupyter)

# Navigate to the directory containing utils.py (one level up)
utils_dir = os.path.abspath(os.path.join(current_dir, '../..'))

# Add to sys.path
sys.path.insert(0, utils_dir)

# Now you can import utils
import utils

In [None]:
#### Create an IAM role for the Gateway to assume
import utils

agentcore_gateway_iam_role = utils.create_agentcore_gateway_role("sample-APIgateway")
print("Agentcore gateway role ARN: ", agentcore_gateway_iam_role['Role']['Arn'])

# Use the Yahoo OAuth Authorizer and Create Gateway

In [None]:
from bedrock_agentcore_starter_toolkit.operations.gateway.client import GatewayClient

# # Initialize the Gateway client
client = GatewayClient(region_name=os.environ['AWS_DEFAULT_REGION'])

 

In [None]:
authorizer_configuration={'customJWTAuthorizer': {'allowedAudience': ['idb2b.finance.agentic-test'],
  'discoveryUrl': 'https://id-uat.b2b.yahooincapis.com/zts/v1/.well-known/openid-configuration'}}

In [None]:
gateway = client.create_mcp_gateway(
    # name=none, # the name of the Gateway - if you don't set one, one will be generated.
    role_arn=agentcore_gateway_iam_role['Role']['Arn'], # the role arn that the Gateway will use - if you don't set one, one will be created.
    authorizer_config=authorizer_configuration, # Variable from inbound authorization setup steps. Contains the OAuth authorizer details for authorizing callers to your Gateway (MCP only supports OAuth).
    enable_semantic_search=True # enable semantic search.

)
gateway_id=gateway['gatewayId']
gateway_url=gateway['gatewayUrl']

# Transforming Intrinio Open APIs into MCP tools using Bedrock AgentCore Gateway

We will use Intrinio APIs to expose as MCP tools. We will use Intrinio API key to configure the credentials provider for creating the OpenAPI target.

In [None]:
import boto3
import json
from pprint import pprint
from botocore.config import Config
import boto3
from botocore.exceptions import ClientError

client = boto3.client('secretsmanager', region_name=os.environ['AWS_DEFAULT_REGION'])
response = client.get_secret_value(SecretId=API_KEY)
secret_dict = json.loads(response['SecretString'])
secret_value = list(secret_dict.values())[0]
acps = boto3.client(service_name="bedrock-agentcore-control")

try:
    response= acps.create_api_key_credential_provider(
        name="IntrinioAPIKey",
        apiKey=secret_value,  
    )
except Exception as e:

    print(e)
    

    response = acps.get_api_key_credential_provider(
        name="IntrinioAPIKey"
    )
credentialProviderARN = response['credentialProviderArn']
pprint(f"Egress Credentials provider ARN, {credentialProviderARN}")

#### If you see an error as below
#### "An error occurred (ValidationException) when calling the CreateApiKeyCredentialProvider operation: Credential provider with #### name: IntrinioAPIKey already exists
#### ('Egress Credentials provider ARN, '
#### 'arn:aws:bedrock-agentcore:xxxxxxxxxx:token-vault/default/apikeycredentialprovider/IntrinioAPIKey')"
####  ignore the error. This means the credential provider is created by other users

# Create an OpenAPI target 

#### We will use a S3 bucket to store the OpenAPI spec from Intrinio

In [None]:

openapi_s3_uri = f's3://{BUCKET_NAME}/{OBJECT_KEY}'
print(f'Uploaded object S3 URI: {openapi_s3_uri}')


#### Configure outbound auth and Create the gateway target

In [None]:
gateway_client = boto3.client('bedrock-agentcore-control', region_name = os.environ['AWS_DEFAULT_REGION'])

# S3 Uri for OpenAPI spec file
Intrinio_openapi_s3_target_config = {
    "mcp": {
          "openApiSchema": {
              "s3": {
                  "uri": openapi_s3_uri
              }
          }
      }
}

# API Key credentials provider configuration
api_key_credential_config = [
    {
        "credentialProviderType" : "API_KEY", 
        "credentialProvider": {
            "apiKeyCredentialProvider": {
                    "credentialParameterName": "api_key", # Replace this with the name of the api key name expected by the respective API provider. For passing token in the header, use "Authorization"
                    "providerArn": credentialProviderARN,
                    "credentialLocation":"QUERY_PARAMETER", # Location of api key. Possible values are "HEADER" and "QUERY_PARAMETER".
                    #"credentialPrefix": " " # Prefix for the token. Valid values are "Basic". Applies only for tokens.
            }
        }
    }
  ]

targetname='DemoOpenAPITargetS3Intrinio'
response = gateway_client.create_gateway_target(
    gatewayIdentifier=gateway_id,
    name=targetname,
    description='OpenAPI Target with S3Uri using SDK',
    targetConfiguration=Intrinio_openapi_s3_target_config,
    credentialProviderConfigurations=api_key_credential_config)

# Calling Bedrock AgentCore Gateway from a Strands Agent

The Strands agent seamlessly integrates with AWS tools through the Bedrock AgentCore Gateway, which implements the Model Context Protocol (MCP) specification. This integration enables secure, standardized communication between AI agents and AWS services.

At its core, the Bedrock AgentCore Gateway serves as a protocol-compliant Gateway that exposes fundamental MCP APIs: ListTools and InvokeTools. These APIs allow any MCP-compliant client or SDK to discover and interact with available tools in a secure, standardized way. When the Strands agent needs to access AWS services, it communicates with the Gateway using these MCP-standardized endpoints.

The Gateway's implementation adheres strictly to the (MCP Authorization specification)[https://modelcontextprotocol.org/specification/draft/basic/authorization], ensuring robust security and access control. This means that every tool invocation by the Strands agent goes through authorization step, maintaining security while enabling powerful functionality.

For example, when the Strands agent needs to access MCP tools, it first calls ListTools to discover available tools, then uses InvokeTools to execute specific actions. The Gateway handles all the necessary security validations, protocol translations, and service interactions, making the entire process seamless and secure.

This architectural approach means that any client or SDK that implements the MCP specification can interact with AWS services through the Gateway, making it a versatile and future-proof solution for AI agent integrations.

# Request the access token from Athenz B2B for inbound authorization

In [None]:
def get_private_key():

    secret_name = "AthenzPrivateKey"
    region_name = "us-east-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    secret_client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = secret_client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return secret

In [None]:
import base64, json, time
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature


def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).decode().rstrip("=")

def generate_jwt_token():

    # JWT header and payload
    header = {
        "alg": "ES256",
        "typ": "JWT",
        "kid": "v0"
    }
    payload = {
        "iss": "idb2b.finance.agentic-test.217f928e-f18b-4123-9682-dd320fc1fcb4",
        "sub": "idb2b.finance.agentic-test.217f928e-f18b-4123-9682-dd320fc1fcb4",
        "aud": "https://id-uat.b2b.yahooincapis.com/zts/v1", 
        "exp": int(time.time()) + 10 * 60 * 60  # 10 minutes
    }
    
    # Encode header and payload
    encoded_header = b64url(json.dumps(header, separators=(",", ":")).encode())
    encoded_payload = b64url(json.dumps(payload, separators=(",", ":")).encode())
    signing_input = f"{encoded_header}.{encoded_payload}".encode()
    private_key_str = get_private_key()
    print(private_key_str)
    private_key = serialization.load_pem_private_key(bytearray(private_key_str, "UTF-8"), password=None)
#    private_key = serialization.load_pem_private_key(private_key_str.encode('utf-8'),password=None)
    
    # Load EC private key
# with open("./client_private_key.pem", "rb") as key_file:
#      private_key = serialization.load_pem_private_key(key_file.read(), password=None)
    
    # Sign and convert DER → raw (r||s)
    der_signature = private_key.sign(signing_input, ec.ECDSA(hashes.SHA256()))
    r, s = decode_dss_signature(der_signature)
    r_bytes = r.to_bytes(32, byteorder="big")
    s_bytes = s.to_bytes(32, byteorder="big")
    raw_signature = r_bytes + s_bytes
    print ("private key successful")
    
    # Encode raw signature to base64url
    encoded_signature = b64url(raw_signature)
    
    # Final JWT
    jwt_token = f"{encoded_header}.{encoded_payload}.{encoded_signature}"
    print("JWT Client Assertion:\n", jwt_token)
    return jwt_token

In [None]:
def get_access_token():
    url = "https://id-uat.b2b.yahooincapis.com/zts/v1/oauth2/token"
    payload = {
        "grant_type": "client_credentials",
        "scope": "agent",
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        "client_assertion": generate_jwt_token()
    }
    
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    
    response = requests.post(url, data=payload, headers=headers)
    
#    print("Status code:", response.status_code)
#  print("Response body:", response.text)
    response_json=response.json()
    access_token = response_json.get("access_token")
    return access_token
    

In [None]:
import requests
from bedrock_agentcore_starter_toolkit.operations.gateway.client import GatewayClient

# Initialize the Gateway client
gateway_client_toolkit = GatewayClient(region_name=os.environ['AWS_DEFAULT_REGION'])

access_token =  get_access_token()


In [None]:
access_token

# Finance agent will use Bedrock AgentCore Gateway to retrive information from MCP tools

In [None]:
from strands.models import BedrockModel
from mcp.client.streamable_http import streamablehttp_client 
from strands.tools.mcp.mcp_client import MCPClient
from strands import Agent

def create_streamable_http_transport():
    return streamablehttp_client(gateway_url,headers={"Authorization": f"Bearer {access_token}"})

mcp_client = MCPClient(create_streamable_http_transport)

## The IAM group/user/ configured in ~/.aws/credentials should have access to Bedrock model
yourmodel = BedrockModel(
    model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    temperature=0.7
)

In [None]:
from strands import Agent
import logging


SYSTEM_PROMPT="You are a Financial Agent. You can use various tools available to you to get the financial and company information for a company" \
"Use the company name or ticker within the prompt and pass it as a required parametr or identifier to the tools. Identify the required parameters or identifiers" \
"Sometimes tag is a required parameter to the tool . use your judgement to derive a possible value for the tag from the prompt" 
# Configure the root strands logger. Change it to DEBUG if you are debugging the issue
logging.getLogger("strands").setLevel(logging.INFO)

# Add a handler to see the logs
logging.basicConfig(
    format="%(levelname)s | %(name)s | %(message)s", 
    handlers=[logging.StreamHandler()]
)

with mcp_client:
    # Call the listTools 
    tools = mcp_client.list_tools_sync()
    # Create an Agent with the model and tools
    agent = Agent(model=yourmodel,tools=tools, system_prompt=SYSTEM_PROMPT) ## you can replace with any model you like
    print(f"Tools loaded in the agent are {agent.tool_names}")
    # print(f"Tools configuration in the agent are {agent.tool_config}")
    # Invoke the agent with the sample prompt. This will only invoke  MCP listTools and retrieve the list of tools the LLM has access to. The below does not actually call any tool.
    # agent("Hi , can you list all tools available to you")
    agent("get company information for Nvidia")
    agent("get company financial information for Apple")
    # Invoke the agent with sample prompt, invoke the tool and display the response
    #Call the MCP tool explicitly. The MCP Tool name and arguments must match with your AWS Lambda function or the OpenAPI/Smithy API
    # result = client.call_tool_sync(
    # tool_use_id="get-intrinio_tools_1", # You can replace this with unique identifier. 
    # name=targetname+"___getCompanyFundamentals", # This is the tool name based on AWS Lambda target types. This will change based on the target name
    # arguments={"ver": "1.0","feedtype": "json"}
    #)
    #Print the MCP Tool response
    #print(f"Tool Call result: {result['content'][0]['text']}")


# Strands Agents with AgentCore Memory (Short-Term Memory)


## Introduction

This tutorial demonstrates how to enhance the strands agent we just built with AgentCore **short-term memory** (Raw events). The agent remembers recent conversations in the session using `get_last_k_turns` and can continue conversations seamlessly when user returns.


### Tutorial Details

| Information         | Details                                                                          |
|:--------------------|:---------------------------------------------------------------------------------|
| Tutorial type       | Short Term Conversational                                                        |
| Agent type          | Finance Agent                                                                   |
| Agentic Framework   | Strands Agents                                                                   |
| LLM model           | Anthropic Claude Sonnet 3.7                                                      |
| Tutorial components | AgentCore Short-term Memory, AgentInitializedEvent and MessageAddedEvent hooks   |
| Example complexity  | Beginner                                                                         |

You'll learn to:
- Use short-term memory for conversation continuity
- Retrieve last K conversation turns
- Initialize agents with conversation history

## Architecture
<div style="text-align:left">
    <img src="EnhanceWithMemory.png" width="65%" />
</div>

## Prerequisites

- Python 3.10+
- AWS credentials with AgentCore Memory permissions
- AgentCore Memory role ARN
- Access to Amazon Bedrock models

Let's get started by setting up our environment!

In [None]:
import logging
from datetime import datetime

# Setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("finance-agent")

In [None]:
# Imports
from strands.hooks import AgentInitializedEvent, HookProvider, HookRegistry, MessageAddedEvent
from bedrock_agentcore.memory import MemoryClient

# Configuration
os.environ['AWS_DEFAULT_REGION'] = os.environ.get('AWS_REGION', 'us-east-1')
ACTOR_ID = "user_123" # It can be any unique identifier (AgentID, User ID, etc.)
SESSION_ID = "personal_session_001" # Unique session identifier


In [None]:
from lab_helpers.utils import get_ssm_parameter, put_ssm_parameter  
memory_client = MemoryClient(region_name=os.environ['AWS_DEFAULT_REGION'])
memory_name = "FinanceAgentMemory"

def create_or_get_memory_resource():
    try:
        memory_id = get_ssm_parameter("/app/financeagent/agentcore/memory_id")
        memory_client.gmcp_client.get_memory(memoryId=memory_id)
        return memory_id
    except:
        try:
            print("Creating AgentCore Memory resources. This will take 2-3 minutes...")
            print("While we wait, let's understand what's happening behind the scenes:")
            print("• Setting up managed vector databases for semantic search")
            print("• Configuring memory extraction pipelines")
            print("• Provisioning secure, multi-tenant storage")
            print("• Establishing namespace isolation for customer data")
            # *** AGENTCORE MEMORY USAGE *** - Create memory resource with semantic strategy
            response = memory_client.create_memory_and_wait(
                name=memory_name,
                description="short term  memory for finance agent",
                strategies=[],
                event_expiry_days=90,          # Memories expire after 90 days
            )
            memory_id = response["id"]
            try:
                put_ssm_parameter("/app/financeagent/agentcore/memory_id", memory_id)
            except:
                raise
            return memory_id
        except Exception as e:
            print(f"Failed to create memory resource: {e}")
            return None

In [None]:
memory_id = create_or_get_memory_resource()
if memory_id:
    print("✅ AgentCore Memory created successfully!")
    print(f"Memory ID: {memory_id}")
else:
    print("Memory resource not created. Try Again !")

In [None]:
class MemoryHookProvider(HookProvider):
    def __init__(self, memory_client: memory_client, memory_id: str, actor_id: str, session_id: str):
        self.memory_client = memory_client
        self.memory_id = memory_id
        self.actor_id = actor_id
        self.session_id = session_id
    def on_agent_initialized(self, event: AgentInitializedEvent):
        """Load recent conversation history when agent starts"""
        try:
            # Load the last 5 conversation turns from memory
            recent_turns = self.memory_client.get_last_k_turns(
                memory_id=self.memory_id,
                actor_id=self.actor_id,
                session_id=self.session_id,
                k=5
            )
            
            if recent_turns:
                # Format conversation history for context
                context_messages = []
                for turn in recent_turns:
                    for message in turn:
                        role = message['role']
                        content = message['content']['text']
                        context_messages.append(f"{role}: {content}")
                
                context = "\n".join(context_messages)
                # Add context to agent's system prompt.
                event.agent.system_prompt += f"\n\nRecent conversation:\n{context}"
                logger.info(f"✅ Loaded {len(recent_turns)} conversation turns")
                
        except Exception as e:
            logger.error(f"Memory load error: {e}")
    
    def on_message_added(self, event: MessageAddedEvent):
        MAX_LENGTH = 9000
        """Store messages in memory"""
        messages = event.agent.messages     
        if len(messages) > MAX_LENGTH:
            truncated_text = messages[:MAX_LENGTH]
        else:
            truncated_text = messages
        try:
            self.memory_client.create_event(
                memory_id=self.memory_id,
                actor_id=self.actor_id,
                session_id=self.session_id,
                messages=[(str(truncated_text[-1].get("content", "")), truncated_text[-1]["role"])]
            )
        except Exception as e:
            logger.error(f"Memory save error: {e}")
    
    def register_hooks(self, registry: HookRegistry):
        # Register memory hooks
        registry.add_callback(MessageAddedEvent, self.on_message_added)
        registry.add_callback(AgentInitializedEvent, self.on_agent_initialized)

In [None]:
def create_personal_agent():
    """Create personal agent with memory """
    agent = Agent(
        name="FinanceAssistant",
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",  # or your preferred model
        system_prompt=f""" You are a Financial Agent. You can use various tools available to you to get the financial and company information for a company
Use the company name or ticker within the prompt and pass it as a required parametr or identifier to the tools. Identify the required parameters or identifiers
Sometimes tag is a required parameter to the tool . use your judgement to derive a possible value for the tag from the prompt
        
       
        
       
        Today's date: {datetime.today().strftime('%Y-%m-%d')}
        Be friendly and professional.""",
        hooks=[MemoryHookProvider(memory_client, memory_id, ACTOR_ID, SESSION_ID)],
        tools=tools,
    )
    return agent

# Create agent
agent = create_personal_agent()
logger.info("✅ Personal agent created with memory ")

In [None]:
with mcp_client:
    agent("get company information for Nvidia")
    agent("My name is Alex and I'm interested in learning about company IBM.")
    agent("I'm particularly interested in machine learning applications.")

In [None]:
 # Create new agent instance (simulates user returning)
print("=== User Returns - New Session ===")
new_agent = create_personal_agent()

# Test memory continuity
print(f"User: What was my name again?")
print(f"Agent: ", end="")
with mcp_client:
    new_agent("What was my name again?")

    print(f"User: What was my last question?")
    print(f"Agent: ", end="")
    new_agent("what was my last question")

## View Stored Memory

In [None]:
# Check what's stored in memory
print("=== Memory Contents ===")
recent_turns = memory_client.get_last_k_turns(
    memory_id=memory_id,
    actor_id=ACTOR_ID,
    session_id=SESSION_ID,
    k=6 # Adjust k to see more or fewer turns
)

for i, turn in enumerate(recent_turns, 1):
    print(f"Turn {i}:")
    for message in turn:
        role = message['role']
        content = message['content']['text'][:100] + "..." if len(message['content']['text']) > 100 else message['content']['text']
        print(f"  {role}: {content}")
    print()

## Step 3: Deploy to Production - Use AgentCore Runtime with Observability

### Overview

In Steps 1 we scaled Finance Agent by centralizing tools through AgentCore Gateway with secure authentication. In Step 2  we enhanced the Agent with short term Memory. But the Agent was still runninng in local environment (notebook instance) . Now we'll complete the production journey by deploying our agent to AgentCore Runtime with comprehensive observability. This will transform our prototype into a production-ready system that can handle real-world traffic with full monitoring and automatic scaling.

[Amazon Bedrock AgentCore Runtime](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/agents-tools-runtime.html) is a secure, fully managed runtime that empowers organizations to deploy and scale AI agents in production, regardless of framework, protocol, or model choice. It provides enterprise-grade reliability, automatic scaling, and comprehensive monitoring capabilities.

**Workshop Journey:**

- **Step 1 (Done):** Create Agent Prototype with Gateway and Identity - Tools access across agents securely
- **Step 2 (Done):** Enhance with Memory - Added conversation context
- **Step 3(Current):** Deploy to Production - Used AgentCore Runtime with observability


### Why AgentCore Runtime & Production Deployment Matter

Current State (Steps 1 & 2): Agent runs locally with centralized tools but faces production challenges:

- Agent runs locally in a single session
- No comprehensive monitoring or debugging capabilities
- Cannot handle multiple concurrent users reliably

After this step , we will have a production-ready agent infrastructure with:

- Serverless auto-scaling to handle variable demand
- Comprehensive observability with traces, metrics, and logging
- Enterprise reliability with automatic error recovery
- Secure deployment with proper access controls
- Easy management through AWS console and APIs and support for real-world production workloads.


### Adding comprehensive observability with AgentCore Observability

Additionally, AgentCore Runtime integrates seamlessly with [AgentCore Observability](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/observability.html) to provide full visibility into your agent's behavior in production. AgentCore Observability automatically captures traces, metrics, and logs from your agent interactions, tool usage, and memory access patterns. In this lab we will see how AgentCore Runtime integrates with CloudWatch GenAI Observability to provide comprehensive monitoring and debugging capabilities.

For request tracing, AgentCore Observability captures the complete conversation flow including tool invocations, memory retrievals, and model interactions. For performance monitoring, it tracks response times, success rates, and resource utilization to help optimize your agent's performance.

During the observability flow, AgentCore Runtime automatically instruments your agent code and sends telemetry data to CloudWatch. You can then use CloudWatch dashboards and GenAI Observability features to analyze patterns, identify bottlenecks, and troubleshoot issues in real-time.

### Architecture for Step 3
<div style="text-align:left"> 
    <img src="Step3.png" width="75%"/> 
</div>

*Agent now runs in AgentCore Runtime with full observability through CloudWatch, serving production traffic with auto-scaling and comprehensive monitoring. Memory and Gateway integrations from previous labs remain fully functional in the production environment.*

### Key Features

- **Serverless Agent Deployment:** Transform your local agent into a scalable production service using AgentCore Runtime with minimal code changes
- **Comprehensive Observability:** Full request tracing, performance metrics, and debugging capabilities through CloudWatch GenAI Observability

### Prerequisites

- Python 3.12+
- AWS account with appropriate permissions
- Docker, Finch or Podman installed and running
- Amazon Bedrock AgentCore SDK
- Strands Agents framework

**Note**: You MUST enable [CloudWatch Transaction Search](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Enable-TransactionSearch.html) to be able to see AgentCore Observability traces in CloudWatch.


In [None]:
%%writefile finance_agent.py

import boto3
from strands.models import BedrockModel
from mcp.client.streamable_http import streamablehttp_client 
from strands.tools.mcp.mcp_client import MCPClient
from strands import Agent
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands import Agent
import logging
from datetime import datetime
import os
import requests

# Imports
from strands.hooks import AgentInitializedEvent, HookProvider, HookRegistry, MessageAddedEvent
from bedrock_agentcore.memory import MemoryClient

# Configuration
REGION = os.getenv('AWS_REGION', 'us-east-1') # AWS region for the agent
ACTOR_ID = "user_123" # It can be any unique identifier (AgentID, User ID, etc.)
SESSION_ID = "personal_session_001" # Unique session identifier
GATEWAY_URL='https://testgateway810d5a68-sbcqd21fmi.gateway.bedrock-agentcore.us-west-2.amazonaws.com/mcp'

# Setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("finance-agent")


from bedrock_agentcore_starter_toolkit.operations.gateway.client import GatewayClient
from typing import Dict, Any
from botocore.exceptions import ClientError
import uuid

# Configuration
os.environ['AWS_DEFAULT_REGION'] = os.environ.get('AWS_REGION', 'us-east-1')
ACTOR_ID = "user_123" # It can be any unique identifier (AgentID, User ID, etc.)
SESSION_ID = "personal_session_001" # Unique session identifier
# Initialize Memory Client
from lab_helpers.utils import get_ssm_parameter, put_ssm_parameter  
memory_client = MemoryClient(region_name=os.environ['AWS_DEFAULT_REGION'])
memory_name = "FinanceAgentMemory"

def create_or_get_memory_resource():
    try:
        memory_id = get_ssm_parameter("/app/financeagent/agentcore/memory_id")
        memory_client.gmcp_client.get_memory(memoryId=memory_id)
        return memory_id
    except:
        try:
            print("Creating AgentCore Memory resources. This will take 2-3 minutes...")
            print("While we wait, let's understand what's happening behind the scenes:")
            print("• Setting up managed vector databases for semantic search")
            print("• Configuring memory extraction pipelines")
            print("• Provisioning secure, multi-tenant storage")
            print("• Establishing namespace isolation for customer data")
            # *** AGENTCORE MEMORY USAGE *** - Create memory resource with semantic strategy
            response = memory_client.create_memory_and_wait(
                name=memory_name,
                description="short term  memory for finance agent",
                strategies=[],
                event_expiry_days=90,          # Memories expire after 90 days
            )
            memory_id = response["id"]
            try:
                put_ssm_parameter("/app/financeagent/agentcore/memory_id", memory_id)
            except:
                raise
            return memory_id
        except Exception as e:
            print(f"Failed to create memory resource: {e}")
            return None
            
memory_id = create_or_get_memory_resource()
if memory_id:
    print("✅ AgentCore Memory created successfully!")
    print(f"Memory ID: {memory_id}")
else:
    print("Memory resource not created. Try Again !")
    
class MemoryHookProvider(HookProvider):
    def __init__(self, memory_client: memory_client, memory_id: str, actor_id: str, session_id: str):
        self.memory_client = memory_client
        self.memory_id = memory_id
        self.actor_id = actor_id
        self.session_id = session_id
    def on_agent_initialized(self, event: AgentInitializedEvent):
        """Load recent conversation history when agent starts"""
        try:
            # Load the last 5 conversation turns from memory
            recent_turns = self.memory_client.get_last_k_turns(
                memory_id=self.memory_id,
                actor_id=self.actor_id,
                session_id=self.session_id,
                k=5
            )
            
            if recent_turns:
                # Format conversation history for context
                context_messages = []
                for turn in recent_turns:
                    for message in turn:
                        role = message['role']
                        content = message['content']['text']
                        context_messages.append(f"{role}: {content}")
                
                context = "\n".join(context_messages)
                # Add context to agent's system prompt.
                event.agent.system_prompt += f"\n\nRecent conversation:\n{context}"
                logger.info(f"✅ Loaded {len(recent_turns)} conversation turns")
                
        except Exception as e:
            logger.error(f"Memory load error: {e}")
    
    def on_message_added(self, event: MessageAddedEvent):
        MAX_LENGTH = 9000
        """Store messages in memory"""
        messages = event.agent.messages     
        if len(messages) > MAX_LENGTH:
            truncated_text = messages[:MAX_LENGTH]
        else:
            truncated_text = messages
        try:
            self.memory_client.create_event(
                memory_id=self.memory_id,
                actor_id=self.actor_id,
                session_id=self.session_id,
                messages=[(str(truncated_text[-1].get("content", "")), truncated_text[-1]["role"])]
            )
        except Exception as e:
            logger.error(f"Memory save error: {e}")
    
    def register_hooks(self, registry: HookRegistry):
        # Register memory hooks
        registry.add_callback(MessageAddedEvent, self.on_message_added)

import base64, json, time
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature


def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).decode().rstrip("=")

def get_secret():

    secret_name = "AthenzPrivateKey"
    region_name = "us-west-2"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return secret

def generate_jwt_token():

    # JWT header and payload
    header = {
        "alg": "ES256",
        "typ": "JWT",
        "kid": "v0"
    }
    payload = {
        "iss": "idb2b.finance.agentic-test.217f928e-f18b-4123-9682-dd320fc1fcb4",
        "sub": "idb2b.finance.agentic-test.217f928e-f18b-4123-9682-dd320fc1fcb4",
        "aud": "https://id-uat.b2b.yahooincapis.com/zts/v1", 
        "exp": int(time.time()) + 10 * 60 * 60  # 10 minutes
    }
    
    # Encode header and payload
    encoded_header = b64url(json.dumps(header, separators=(",", ":")).encode())
    encoded_payload = b64url(json.dumps(payload, separators=(",", ":")).encode())
    signing_input = f"{encoded_header}.{encoded_payload}".encode()
    private_key_str = get_secret()
    private_key = serialization.load_pem_private_key(bytearray(private_key_str, "UTF-8"), password=None)
    # Load EC private key
  #  with open("./client_private_key.pem", "rb") as key_file:
#     private_key = serialization.load_pem_private_key(key_file.read(), password=None)
    print ("private key successful")
    # Sign and convert DER → raw (r||s)
    der_signature = private_key.sign(signing_input, ec.ECDSA(hashes.SHA256()))
    r, s = decode_dss_signature(der_signature)
    r_bytes = r.to_bytes(32, byteorder="big")
    s_bytes = s.to_bytes(32, byteorder="big")
    raw_signature = r_bytes + s_bytes
    
    # Encode raw signature to base64url
    encoded_signature = b64url(raw_signature)
    
    # Final JWT
    jwt_token = f"{encoded_header}.{encoded_payload}.{encoded_signature}"
    print("JWT Client Assertion:\n", jwt_token)
    return jwt_token

def get_access_token():
    url = "https://id-uat.b2b.yahooincapis.com/zts/v1/oauth2/token"
    payload = {
        "grant_type": "client_credentials",
        "scope": "agent",
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        "client_assertion": generate_jwt_token()
    }
    
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    
    response = requests.post(url, data=payload, headers=headers)
    
   
    response_json=response.json()
    access_token = response_json.get("access_token")
    print("access token: ", access_token)
    return access_token

def create_streamable_http_transport(access_token):
    return lambda: streamablehttp_client(GATEWAY_URL,headers={"Authorization": f"Bearer {access_token}"})

memory_id = create_or_get_memory_resource()
if memory_id:
    print("✅ AgentCore Memory created successfully!")
    print(f"Memory ID: {memory_id}")
else:
    print("Memory resource not created. Try Again !")

access_token =  get_access_token()

mcp_client = MCPClient(create_streamable_http_transport(access_token))
   
app = BedrockAgentCoreApp()

logger.info("✅ Personal agent created with memory ")

yourmodel = BedrockModel(
    model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    temperature=0.7
)
# Get tools once at startup
with mcp_client:
    tools = mcp_client.list_tools_sync()
    agent = Agent(
        name="PersonalAssistant",
        model=yourmodel,
        system_prompt=f"""You are a Financial Agent. You can use various tools available to you to get the financial and company information for a company
                        Use the company name or ticker within the prompt and pass it as a required parametr or identifier to the tools. Identify the required parameters or identifiers
                        Sometimes tag is a required parameter to the tool . use your judgement to derive a possible value for the tag from the prompt          
                        Today's date: {datetime.today().strftime('%Y-%m-%d')}
                        Be friendly and professional.""",
        hooks=[MemoryHookProvider(memory_client, memory_id, ACTOR_ID, SESSION_ID)],
        tools=tools,
    )    
app = BedrockAgentCoreApp() 
@app.entrypoint
def strands_agent_bedrock(payload):
    """
    Invoke the agent with a payload
    """    
    user_input = payload.get("prompt")
    print("User input:", user_input)
    with mcp_client:
        response = agent(user_input)
        return response.message['content'][0]['text']


if __name__ == "__main__":
    app.run()


In [None]:
%reload_ext autoreload 
from bedrock_agentcore_starter_toolkit import Runtime
from boto3.session import Session
from lab_helpers.utils import create_agentcore_runtime_execution_role 
boto_session = Session()
region = boto_session.region_name
import boto3

# Initialize the runtime toolkit
boto_session = boto3.session.Session()
region = boto_session.region_name


execution_role_arn = create_agentcore_runtime_execution_role()
agentcore_runtime = Runtime()
agent_name = "finance_agent_1"
response = agentcore_runtime.configure(
    entrypoint="finance_agent.py",
    execution_role=execution_role_arn,
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=region,
    agent_name=agent_name
)
response


In [None]:
launch_result = agentcore_runtime.launch()

In [None]:
import time
status_response = agentcore_runtime.status()
status = status_response.endpoint['status']
end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']
while status not in end_status:
    time.sleep(10)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']
    print(status)
status

In [None]:
invoke_response = agentcore_runtime.invoke({"prompt": "What is the company financials for Apple"})
invoke_response

In [None]:
invoke_response = agentcore_runtime.invoke({"prompt": "What was my last question"})
invoke_response

# Clean up
Additional resources are also created like IAM role, IAM Policies, Credentials provider, AWS Lambda functions, Cognito user pools, s3 buckets that you might need to manually delete as part of the clean up. This depends on the example you run.

## Delete the gateway (Optional)

In [None]:
import utils
utils.delete_gateway(gateway_client,gateway_id)
client.delete_memory_and_wait(memory_id)
logger.info(f"✅ Deleted memory: {memory_id}")