# Deploying a LangGraph ReAct Agent to AWS Bedrock AgentCore

This notebook demonstrates how to deploy a LangGraph ReAct agent to AWS Bedrock AgentCore using direct boto3 API calls. It also enables you to visualize the agent's decision-making process in the GenAI Observability dashboard in Amazon CloudWatch.

LangGraph is a library for building stateful, multi-actor applications with LLMs using graphs. It's particularly well-suited for implementing ReAct (Reasoning and Acting) style agents that can reason about a problem and take actions to solve it.

For more information, see the Amazon Bedrock AgentCore Developer Guide in the [AWS documentation](https://docs.aws.amazon.com/bedrock-agentcore/).

## Prerequisites

- Python 3.12 or later
- boto3 1.39 or later
- AWS CLI installed and configured with appropriate permissions

In addition, you should have the following available:
- An Amazon S3 bucket for packaging code
- An AWS IAM role for automation access
- Fully synced knowledge bases
- External functions for enterprise systems

## Step 0: Install required packages

Make sure the latest versions of the required packages are installed.
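One way to confirm the prerequisites before continuing is a small version check. The sketch below is illustrative only: the helper names `parse_version`, `meets_minimum`, and `check_prerequisites` are hypothetical and not part of the original notebook, and the minimum versions are taken from the Prerequisites list above.

```python
import re
import sys
from importlib import metadata


def parse_version(version):
    """Turn '1.39.2' into (1, 39, 2); stops at the first non-numeric component."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def meets_minimum(version, minimum):
    """Return True if a dotted version string is at least the `minimum` tuple."""
    return parse_version(version) >= tuple(minimum)


def check_prerequisites():
    """Print whether Python and boto3 satisfy the versions listed in Prerequisites."""
    python_version = ".".join(str(n) for n in sys.version_info[:3])
    print(f"Python {python_version}: ok={meets_minimum(python_version, (3, 12))}")
    try:
        boto3_version = metadata.version("boto3")
        print(f"boto3 {boto3_version}: ok={meets_minimum(boto3_version, (1, 39))}")
    except metadata.PackageNotFoundError:
        print("boto3 is not installed; install it with: pip install 'boto3>=1.39'")


check_prerequisites()
```

The check only reports; it does not install or upgrade anything, so it is safe to rerun at any point in the notebook.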

## Step 1: Import required libraries

Import all necessary Python libraries for AWS interactions and file handling.

In [30]:
!aws sso login --profile bedrock-demo

Attempting to automatically open the SSO authorization page in your default browser.
If the browser does not open, open the following URL:

https://oidc.us-east-1.amazonaws.com/authorize?response_type=code&client_id=5HFSsfzb_H2rFv2ZjBPM_nVzLWVhc3QtMQ&redirect_uri=http%3A%2F%2F127.0.0.1%3A50027%2Foauth%2Fcallback&state=3773008d-72f4-4cf4-87cb-4ad23e83bef2&code_challenge_method=S256&scopes=sso%3Aaccount%3Aaccess&code_challenge=aDI4ZmNFgIE30_4xPJj95YXyMPTGTgeHk1lzUg-92KA
Successfully logged into Start URL: https://d-9067a83728.awsapps.com/start/#


In [None]:
import os
import re
import sys
import json
import time
import uuid
import string
import random
import zipfile
import tempfile
import collections
import boto3
import subprocess
import botocore
from pathlib import Path

# Set AWS Profile for the entire notebook
os.environ['AWS_PROFILE'] = 'bedrock-demo'
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

# Get AWS credentials from CLI (works with SSO)
def get_aws_credentials():
    """Get AWS credentials from CLI that work with SSO"""
    try:
        # Get credentials using AWS CLI
        result = subprocess.run(
            ['aws', 'sts', 'get-caller-identity', '--profile', 'bedrock-demo'],
            capture_output=True, text=True, check=True
        )
        identity = json.loads(result.stdout)

        # Get session token
        creds_result = subprocess.run(
            ['aws', 'configure', 'export-credentials', '--profile', 'bedrock-demo'],
            capture_output=True, text=True, check=True
        )
        creds = json.loads(creds_result.stdout)

        # Set environment variables
        os.environ['AWS_ACCESS_KEY_ID'] = creds['AccessKeyId']
        os.environ['AWS_SECRET_ACCESS_KEY'] = creds['SecretAccessKey']
        os.environ['AWS_SESSION_TOKEN'] = creds['SessionToken']
        os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

        print(f"✅ AWS Credentials configured from SSO")
        print(f"   Account: <YOUR_ACCOUNT_ID>")
        print(f"   User: <YOUR_USER>")

        # Configure boto3 default session
        boto3.setup_default_session(
            aws_access_key_id=creds['AccessKeyId'],
            aws_secret_access_key=creds['SecretAccessKey'],
            aws_session_token=creds['SessionToken'],
            region_name='us-east-1'
        )

        return True

    except subprocess.CalledProcessError as e:
        print(f"❌ Failed to get AWS credentials: {e}")
        print("\nPlease run: aws sso login --profile bedrock-demo")
        return False
    except Exception as e:
        print(f"❌ Error setting up credentials: {e}")
        return False

# Get AWS credentials
if not get_aws_credentials():
    print("\n⚠️ Please login to AWS SSO and restart the kernel")
else:
    print("\n✅ Ready to proceed with the notebook")

# LaunchDarkly imports for configuration management
import ldclient
from ldclient import Context
from ldclient.config import Config as LDConfig
from ldai.client import LDAIClient

## Step 2: Set required parameters

Update the parameters below with your environment-specific details. These parameters are used throughout the notebook to create and configure the agent.

In [None]:
# LaunchDarkly Configuration
LAUNCHDARKLY_SDK_KEY = os.getenv('LAUNCHDARKLY_SDK_KEY', '<provide LaunchDarkly SDK Key>')
LAUNCHDARKLY_AGENT_KEY = os.getenv('LAUNCHDARKLY_AGENT_KEY', 'pet-store-agent')

# AWS Configuration (can be overridden by LaunchDarkly)
# These should be set in your .env file:
# AWS_ACCOUNT_ID=your-account-id
# AGENTCORE_ROLE_NAME=your-role-name
# BUILD_BUCKET_NAME=your-bucket-name

AWS_ACCOUNT_ID = os.getenv('AWS_ACCOUNT_ID', '<YOUR_ACCOUNT_ID>')
AGENTCORE_ROLE_NAME = os.getenv('AGENTCORE_ROLE_NAME', 'PetStoreAgentCoreExecutionRole')
BUILD_BUCKET_NAME = os.getenv('BUILD_BUCKET_NAME', 'petstore-build')

# Construct ARNs from environment variables
SolutionAccessRoleArn = f'arn:aws:iam::{AWS_ACCOUNT_ID}:role/{AGENTCORE_ROLE_NAME}'
CodeBucketForAutomationARN = f'arn:aws:s3:::{BUILD_BUCKET_NAME}'
Agent_Directory_Name = '.'

# LlamaIndex Configuration (will be pulled from LaunchDarkly)
LLAMAINDEX_STORAGE_DIR = './storage'  # Default, will be overridden by LaunchDarkly
LLAMAINDEX_DATA_DIR = './data'        # Default, will be overridden by LaunchDarkly

print(f"LaunchDarkly Agent Key: {LAUNCHDARKLY_AGENT_KEY}")
print(f"AWS Account ID: {'***' + AWS_ACCOUNT_ID[-4:] if len(AWS_ACCOUNT_ID) > 4 else '***'}")
print(f"Role: {AGENTCORE_ROLE_NAME}")
print(f"Bucket: {BUILD_BUCKET_NAME}")

## Step 3: Initialize LaunchDarkly and Fetch Configuration

Initialize the LaunchDarkly client and fetch the agent configuration dynamically. This replaces hardcoded values with feature flags.

In [33]:
def fetch_launchdarkly_config():
    """
    Fetch configuration from LaunchDarkly for the pet store agent.
    Returns a dictionary with all necessary configuration values.
    """
    config = {}
    
    try:
        # Initialize LaunchDarkly client
        ldclient.set_config(LDConfig(LAUNCHDARKLY_SDK_KEY))
        ld_client = ldclient.get()
        
        if not ld_client.is_initialized():
            print("⚠️ Warning: LaunchDarkly client failed to initialize. Using defaults.")
            return config
        
        # Create AI client wrapper
        ai_client = LDAIClient(ld_client)
        
        # Build context for configuration retrieval
        context = Context.builder("deployment-context").build()
        
        # Get agent configuration
        from ldai.client import AIAgentConfigDefault, ModelConfig, ProviderConfig
        
        default_agent = AIAgentConfigDefault(
            enabled=False,
            model=ModelConfig("ERROR_MODEL_NOT_CONFIGURED"),
            provider=ProviderConfig("ERROR_PROVIDER_NOT_CONFIGURED"),
            instructions="ERROR: LaunchDarkly AI Config not found or disabled.",
        )
        
        agent_config = ai_client.agent_config(
            LAUNCHDARKLY_AGENT_KEY,
            context,
            default_value=default_agent,
            variables={}
        )
        
        # Convert to dict to access nested values
        agent_dict = agent_config.to_dict() if hasattr(agent_config, 'to_dict') else {}
        
        # Extract configuration from agent config
        model_config = agent_dict.get("model", {})
        parameters = model_config.get("parameters", {})
        custom = model_config.get("custom", {})
        
        # Extract tool configurations to get Lambda function names
        tools = parameters.get("tools", [])
        lambda_functions = {}
        
        for tool in tools:
            if isinstance(tool, dict):
                tool_name = tool.get("name", "")
                tool_custom = tool.get("custom", {})
                
                # Extract Lambda function names from tool configs
                if "get_inventory" in tool_name:
                    lambda_functions["inventory_lambda"] = tool_custom.get("lambda_function_name", 
                                                                            "team-PetStoreInventoryManagementFunction")
                elif "get_user" in tool_name:
                    lambda_functions["user_lambda"] = tool_custom.get("lambda_function_name", 
                                                                      "team-PetStoreUserManagementFunction")
        
        # Build configuration dictionary
        config = {
            "aws_region": custom.get("aws_region", os.getenv("AWS_DEFAULT_REGION", "us-east-1")),
            "lambda_functions": lambda_functions,
            "llamaindex": {
                "storage_dir": custom.get("llamaindex", {}).get("storage_dir", LLAMAINDEX_STORAGE_DIR),
                "data_dir": custom.get("llamaindex", {}).get("data_dir", LLAMAINDEX_DATA_DIR),
                "chunk_size": custom.get("llamaindex", {}).get("chunk_size", 1024),
                "chunk_overlap": custom.get("llamaindex", {}).get("chunk_overlap", 200),
                "similarity_top_k": custom.get("llamaindex", {}).get("similarity_top_k", 5)
            },
            "model": {
                "name": agent_config.model.name,
                "provider": agent_config.provider.name,
                "temperature": parameters.get("temperature", 0.7),
                "max_tokens": parameters.get("max_tokens", 4096)
            },
            "tools_enabled": [tool.get("name") for tool in tools if isinstance(tool, dict) and tool.get("name")]
        }
        
        print("✅ Successfully fetched configuration from LaunchDarkly")
        print(f"   Model: {config['model']['name']} ({config['model']['provider']})")
        print(f"   AWS Region: {config['aws_region']}")
        print(f"   LlamaIndex Storage: {config['llamaindex']['storage_dir']}")
        print(f"   Enabled Tools: {', '.join(config['tools_enabled'])}")
        
    except Exception as e:
        print(f"❌ Error fetching LaunchDarkly config: {str(e)}")
        print("Using default configuration values")
        
        # Default configuration as fallback
        config = {
            "aws_region": os.getenv("AWS_DEFAULT_REGION", "us-east-1"),
            "lambda_functions": {},
            "llamaindex": {
                "storage_dir": LLAMAINDEX_STORAGE_DIR,
                "data_dir": LLAMAINDEX_DATA_DIR,
                "chunk_size": 1024,
                "chunk_overlap": 200,
                "similarity_top_k": 5
            },
            "model": {
                "name": "anthropic.claude-3-5-sonnet-20241022-v2:0",
                "provider": "bedrock",
                "temperature": 0.7,
                "max_tokens": 4096
            },
            "tools_enabled": ["retrieve_product_info", "retrieve_pet_care", "get_inventory", 
                             "get_user_by_email", "get_user_by_id"]
        }
    
    return config

# Fetch configuration from LaunchDarkly
agent_config = fetch_launchdarkly_config()

# Extract key values for use in the notebook
aws_region = agent_config.get("aws_region", "us-east-1")
llamaindex_config = agent_config.get("llamaindex", {})
lambda_functions = agent_config.get("lambda_functions", {})
tools_enabled = agent_config.get("tools_enabled", [])

✅ Successfully fetched configuration from LaunchDarkly
   Model: amazon.nova-pro-v1:0 (Bedrock)
   AWS Region: us-east-1
   LlamaIndex Storage: ./storage
   Enabled Tools: retrieve_product_info, get_inventory, retrieve_pet_care, get_user_by_email, get_user_by_id


## Step 4: Verify agent code requirements

We'll check that necessary files for the agent exist in the expected locations before proceeding with the deployment.

In [34]:
# Verify AgentCore entrypoint exists
agentcore_entrypoint_file = Path(f"{Agent_Directory_Name}/agentcore_entrypoint.py")
if agentcore_entrypoint_file.exists():
    print(f"AgentCore entrypoint found at {agentcore_entrypoint_file}")
else:
    print(f"AgentCore entrypoint not found at {agentcore_entrypoint_file}")

# Verify requirements file exists
requirements_file = Path(f"{Agent_Directory_Name}/requirements.txt")
if requirements_file.exists():
    print(f"Requirements file found at {requirements_file}")
else:
    print(f"Requirements file not found at {requirements_file}")

AgentCore entrypoint found at agentcore_entrypoint.py
Requirements file found at requirements.txt
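The checks above only print a message, so a missing file surfaces later as a failed build. A stricter variant, sketched here under the assumption that deployment should stop as soon as a required file is missing, raises an error instead. The function names `find_missing_files` and `require_agent_files` are hypothetical.

```python
from pathlib import Path


def find_missing_files(directory, required_names):
    """Return the required file names that do not exist under `directory`."""
    base = Path(directory)
    return [name for name in required_names if not (base / name).is_file()]


def require_agent_files(directory="."):
    """Raise FileNotFoundError listing every missing deployment file."""
    required = ["agentcore_entrypoint.py", "requirements.txt"]
    missing = find_missing_files(directory, required)
    if missing:
        raise FileNotFoundError(f"Missing in {directory}: {', '.join(missing)}")
```

In this notebook it could be called as `require_agent_files(Agent_Directory_Name)` before the Dockerfile is created.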


## Step 4 (continued): Create Dockerfile for Agent

We need to create a Dockerfile to package the agent for deployment to AgentCore Runtime. This Dockerfile will include all necessary dependencies and configuration.

In [35]:
def create_dockerfile():
    '''Create a Dockerfile for AgentCore Runtime with LaunchDarkly and LlamaIndex support'''
    dockerfile_content = f'''
FROM --platform=linux/arm64 public.ecr.aws/docker/library/python:3.12-slim-bookworm

WORKDIR /app

# Copy requirements and install dependencies
COPY {Agent_Directory_Name}/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install bedrock-agentcore
RUN pip install aws-opentelemetry-distro

# Copy agent code and tools
COPY {Agent_Directory_Name}/*.py ./

# Copy LlamaIndex storage directory if it exists
COPY {Agent_Directory_Name}/storage ./storage
COPY {Agent_Directory_Name}/data ./data

# Set default AWS region
ENV AWS_DEFAULT_REGION=${{AWS_DEFAULT_REGION}}

# LaunchDarkly Configuration
ENV LAUNCHDARKLY_SDK_KEY=${{LAUNCHDARKLY_SDK_KEY}}
ENV LAUNCHDARKLY_AGENT_KEY=${{LAUNCHDARKLY_AGENT_KEY}}

# LlamaIndex Configuration
ENV LLAMAINDEX_STORAGE_DIR=${{LLAMAINDEX_STORAGE_DIR}}
ENV LLAMAINDEX_DATA_DIR=${{LLAMAINDEX_DATA_DIR}}

# Optional Lambda Function Names (can be overridden by LaunchDarkly at runtime)
ENV INVENTORY_LAMBDA=${{INVENTORY_LAMBDA}}
ENV USER_LAMBDA=${{USER_LAMBDA}}

# Enable fallback tools for local testing
ENV ENABLE_FALLBACK_TOOLS=${{ENABLE_FALLBACK_TOOLS}}

# OpenTelemetry Configuration for AWS CloudWatch GenAI Observability
ENV OTEL_PYTHON_DISTRO=aws_distro
ENV OTEL_PYTHON_CONFIGURATOR=aws_configurator
ENV OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
ENV OTEL_TRACES_EXPORTER=otlp
ENV OTEL_EXPORTER_OTLP_LOGS_HEADERS=x-aws-log-group=agents/langgraph-agent-logs,x-aws-log-stream=default,x-aws-metric-namespace=agents
ENV OTEL_RESOURCE_ATTRIBUTES=service.name=langgraph-agent
ENV AGENT_OBSERVABILITY_ENABLED=true

# Expose the port that AgentCore Runtime expects
EXPOSE 8080

# Run the agent
CMD ["opentelemetry-instrument", "python", "agentcore_entrypoint.py"]
'''
    
    # Write the Dockerfile
    dockerfile_path = Path("Dockerfile")
    with open(dockerfile_path, 'w') as f:
        f.write(dockerfile_content)
    
    return dockerfile_path

# Create the Dockerfile
dockerfile_path = create_dockerfile()
print(f"Dockerfile created at {dockerfile_path}")

Dockerfile created at Dockerfile


## Step 5: Build Docker image using SageMaker Docker Build CLI

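Build the arm64 Docker image and push it to Amazon ECR. The helper functions below are modeled on the SageMaker Docker Build CLI: they upload the source to Amazon S3, run the build in a temporary AWS CodeBuild project, and can stream the build logs while the build runs.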

In [36]:
# Define required functions for building arm64 container image

Position = collections.namedtuple("Position", ["timestamp", "skip"])

def _log_stream(client, log_group, stream_name, position):
    start_time, skip = position
    next_token = None
    event_count = 1
    while event_count > 0:
        token_arg = {"nextToken": next_token} if next_token else {}
        response = client.get_log_events(
            logGroupName=log_group, logStreamName=stream_name,
            startTime=start_time, startFromHead=True, **token_arg
        )
        next_token = response["nextForwardToken"]
        events = response["events"]
        event_count = len(events)
        if event_count > skip:
            events = events[skip:]
            skip = 0
        else:
            skip = skip - event_count
            events = []
        for ev in events:
            ts, count = position
            if ev["timestamp"] == ts:
                position = Position(timestamp=ts, skip=count + 1)
            else:
                position = Position(timestamp=ev["timestamp"], skip=1)
            yield ev, position

def _logs_for_build(build_id, session, wait=False, poll=10):
    codebuild = session.client("codebuild")
    description = codebuild.batch_get_builds(ids=[build_id])["builds"][0]
    status = description["buildStatus"]
    log_group = description["logs"].get("groupName")
    stream_name = description["logs"].get("streamName")
    position = Position(timestamp=0, skip=0)
    config = botocore.config.Config(retries={"max_attempts": 15})
    client = session.client("logs", config=config)

    while log_group is None and status == "IN_PROGRESS":
        time.sleep(poll)
        description = codebuild.batch_get_builds(ids=[build_id])["builds"][0]
        log_group = description["logs"].get("groupName")
        stream_name = description["logs"].get("streamName")
        status = description["buildStatus"]

    last_describe_job_call = time.time()
    dot_printed = False
    dot = True

    while True:
        for event, position in _log_stream(client, log_group, stream_name, position):
            print(event["message"].rstrip())
            if dot:
                dot = False
                if dot_printed:
                    print()

        if not wait or status != "IN_PROGRESS":
            break

        time.sleep(poll)
        if dot:
            print(".", end="")
            sys.stdout.flush()
            dot_printed = True

        if time.time() - last_describe_job_call >= 30:
            description = codebuild.batch_get_builds(ids=[build_id])["builds"][0]
            status = description["buildStatus"]
            last_describe_job_call = time.time()
            if status != "IN_PROGRESS":
                print()
                break

def build_arm64_image(role, bucket, repository_name, verbose=False):
    # Use default session (which now has credentials from environment)
    session = boto3.Session()
    account_id = session.client("sts").get_caller_identity()["Account"]
    region = session.region_name or 'us-east-1'

    # Upload source code
    random_suffix = "".join(random.choices(string.ascii_letters, k=16))
    key = f"codebuild-{random_suffix}.zip"

    with tempfile.TemporaryFile() as tmp:
        with zipfile.ZipFile(tmp, "w") as zip:
            for dirname, _, filelist in os.walk("."):
                # Skip hidden directories and __pycache__
                if '/.git' in dirname or '__pycache__' in dirname or '.ipynb_checkpoints' in dirname:
                    continue
                for file in filelist:
                    # Skip hidden files and checkpoints
                    if not file.startswith('.'):
                        zip.write(f"{dirname}/{file}")

            # Read the buildspec from the test file we created
            with open('test_buildspec.yml', 'r') as f:
                buildspec_content = f.read()

            # Write buildspec to zip
            zip.writestr("buildspec.yml", buildspec_content)

        tmp.seek(0)
        session.client("s3").upload_fileobj(tmp, bucket, key)

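    # Create ECR repo; only an "already exists" error is treated as expected
    ecr = session.client("ecr")
    try:
        ecr.create_repository(repositoryName=repository_name)
        print(f"Created ECR repository {repository_name}")
    except ecr.exceptions.RepositoryAlreadyExistsException:
        print(f"ECR repository {repository_name} already exists")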

    # Create and run CodeBuild project
    project_name = f"build-{random_suffix}"
    codebuild = session.client("codebuild")

    codebuild.create_project(
        name=project_name,
        source={"type": "S3", "location": f"{bucket}/{key}"},
        artifacts={"type": "NO_ARTIFACTS"},
        environment={
            "type": "ARM_CONTAINER",
            "image": "aws/codebuild/amazonlinux2-aarch64-standard:3.0",
            "computeType": "BUILD_GENERAL1_SMALL",
            "environmentVariables": [
                {"name": "AWS_DEFAULT_REGION", "value": region},
                {"name": "AWS_ACCOUNT_ID", "value": account_id},
                {"name": "IMAGE_REPO_NAME", "value": repository_name},
                {"name": "IMAGE_TAG", "value": "latest"},
            ],
            "privilegedMode": True,
        },
        serviceRole=f"arn:aws:iam::{account_id}:role/{role}",
    )

    build_id = codebuild.start_build(projectName=project_name)["build"]["id"]
    print(f"Starting build {build_id} with verbose={verbose}")

    if verbose:
        # Stream logs using library implementation
        _logs_for_build(build_id, session, wait=True)
    else:
        # Just wait for completion without streaming logs
        while True:
            build_info = codebuild.batch_get_builds(ids=[build_id])["builds"][0]
            status = build_info["buildStatus"]
            if status != "IN_PROGRESS":
                break
            print(".", end="", flush=True)
            time.sleep(30)
        print()

    # Get final status
    build_info = codebuild.batch_get_builds(ids=[build_id])["builds"][0]
    status = build_info["buildStatus"]
    print(f"Build complete, status = {status}")

    # Cleanup
    codebuild.delete_project(name=project_name)
    session.client("s3").delete_object(Bucket=bucket, Key=key)

    if status == "SUCCEEDED":
        ecr_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository_name}:latest"
        print(f"Image URI: {ecr_uri}")
        return ecr_uri
    else:
        raise Exception(f"Build failed with status: {status}")

print("✅ Build functions defined successfully - using external buildspec file")

✅ Build functions defined successfully - using external buildspec file
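The build function reads a `test_buildspec.yml` file that this notebook does not show. What follows is a minimal sketch of what such a file might contain, not the author's actual buildspec: it assumes the build only needs to log in to ECR, build the image from the Dockerfile, and push it, using the environment variables that `build_arm64_image` sets on the CodeBuild project. The names `BUILDSPEC_TEMPLATE` and `write_buildspec` are hypothetical.

```python
from pathlib import Path

# Hypothetical buildspec content. AWS_ACCOUNT_ID, AWS_DEFAULT_REGION, IMAGE_REPO_NAME
# and IMAGE_TAG are the environment variables defined on the CodeBuild project.
BUILDSPEC_TEMPLATE = """\
version: 0.2
phases:
  pre_build:
    commands:
      - aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
  build:
    commands:
      - docker build -t $IMAGE_REPO_NAME:$IMAGE_TAG .
      - docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG
  post_build:
    commands:
      - docker push $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG
"""


def write_buildspec(path="test_buildspec.yml"):
    """Write the sketch buildspec to `path` and return the path."""
    target = Path(path)
    target.write_text(BUILDSPEC_TEMPLATE)
    return target
```

Calling `write_buildspec()` overwrites any existing `test_buildspec.yml`, so skip it if you already have a buildspec for your project.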


In [37]:
# Verify AWS credentials and build configuration
print("Build Configuration:")
print(f"  AWS Profile: bedrock-demo")
print(f"  Solution Role: {SolutionAccessRoleArn}")
print(f"  S3 Bucket: {CodeBucketForAutomationARN}")

# Kick start building arm64 container image
role_name = SolutionAccessRoleArn.split('/')[-1]
repository_name = "langgraph-agent-repo"

# Extract the bucket name; this also works when the value is already a bare bucket name
bucket_name = CodeBucketForAutomationARN.split(':::')[-1]

print(f"  Role Name: {role_name}")
print(f"  Bucket Name: {bucket_name}")
print(f"  ECR Repository: {repository_name}")
print()

# Build the Docker image with verbose output so that build errors are visible
try:
    ecr_uri = build_arm64_image(role_name, bucket_name, repository_name, verbose=True)

    # Verify the image exists in ECR
    try:
        ecr_client = boto3.client('ecr', region_name='us-east-1')
        response = ecr_client.describe_images(
            repositoryName=repository_name,
            imageIds=[{'imageTag': 'latest'}]
        )
        print(f"✅ Image verified in repository: {repository_name}")
    except Exception as e:
        print(f"⚠️ Warning verifying image: {str(e)}")

except Exception as e:
    print(f"❌ Build failed: {str(e)}")
    print("\nTroubleshooting:")
    print("1. Check the build logs above for specific errors")
    print("2. Common issues:")
    print("   - Missing files in the Docker build")
    print("   - S3 bucket permissions")
    print("   - IAM role permissions for CodeBuild")
    print("   - Docker image build errors")

Build Configuration:
  AWS Profile: bedrock-demo
  Solution Role: arn:aws:iam::955116512041:role/PetStoreAgentCoreExecutionRole
  S3 Bucket: arn:aws:s3:::petstore-build
  Role Name: PetStoreAgentCoreExecutionRole
  Bucket Name: petstore-build
  ECR Repository: langgraph-agent-repo

ECR repository langgraph-agent-repo already exists
Starting build build-DRkFeMDEilQeMhkK:dc79adfa-0467-4d9e-8976-695885663a03 with verbose=True
.[Container] 2026/01/14 04:35:52.264918 Running on CodeBuild On-demand

[Container] 2026/01/14 04:35:52.264995 Waiting for agent ping
[Container] 2026/01/14 04:35:52.365896 Waiting for DOWNLOAD_SOURCE
[Container] 2026/01/14 04:35:53.955142 Phase is DOWNLOAD_SOURCE
[Container] 2026/01/14 04:35:53.956095 CODEBUILD_SRC_DIR=/codebuild/output/src2889961814/src
[Container] 2026/01/14 04:35:53.956533 YAML location is /codebuild/output/src2889961814/src/buildspec.yml
[Container] 2026/01/14 04:35:53.958414 Setting HTTP client timeout to higher timeout for S3 source
[Container

## Step 6: Deploy AgentCore Runtime

Create the AgentCore Runtime using boto3 APIs with the Docker image we built in the previous steps.

In [38]:
# Create or Update AgentCore Runtime with LaunchDarkly configuration
existing_runtime = None
region = boto3.Session().region_name
agent_runtime_name = "LangGraphAgentCoreRuntime"
agentcore_control_client = boto3.client('bedrock-agentcore-control')

# Try to get existing agent runtime first
list_response = agentcore_control_client.list_agent_runtimes()
for runtime in list_response.get('agentRuntimes', []):
    if runtime['agentRuntimeName'] == agent_runtime_name:
        existing_runtime = runtime
        agent_runtime_id = existing_runtime['agentRuntimeId']
        agent_runtime_arn = existing_runtime['agentRuntimeArn']
        print(f"Found existing AgentCore Runtime ID: {agent_runtime_id}")

# Build environment variables from LaunchDarkly config
environment_variables = {
    "AWS_DEFAULT_REGION": aws_region,
    "LAUNCHDARKLY_SDK_KEY": LAUNCHDARKLY_SDK_KEY,
    "LAUNCHDARKLY_AGENT_KEY": LAUNCHDARKLY_AGENT_KEY,
    "LLAMAINDEX_STORAGE_DIR": llamaindex_config.get("storage_dir", "./storage"),
    "LLAMAINDEX_DATA_DIR": llamaindex_config.get("data_dir", "./data"),
    "ENABLE_FALLBACK_TOOLS": "false"  # Set to false in production
}

# Add Lambda function names if available from LaunchDarkly
if lambda_functions.get("inventory_lambda"):
    environment_variables["INVENTORY_LAMBDA"] = lambda_functions["inventory_lambda"]
if lambda_functions.get("user_lambda"):
    environment_variables["USER_LAMBDA"] = lambda_functions["user_lambda"]

print(f"Environment variables configured:")
for key, value in environment_variables.items():
    if "KEY" in key:
        print(f"  {key}: ***hidden***")
    else:
        print(f"  {key}: {value}")

if existing_runtime: # Update the existing runtime
    update_response = agentcore_control_client.update_agent_runtime(
        agentRuntimeId=agent_runtime_id,
        roleArn=SolutionAccessRoleArn,
        agentRuntimeArtifact={
            "containerConfiguration": {
                "containerUri": ecr_uri
            }
        },
        networkConfiguration={
            "networkMode": "PUBLIC"
        },
        environmentVariables=environment_variables,
        lifecycleConfiguration={
            "maxLifetime": 60
        }
    )
    print(f"Updated existing AgentCore Runtime")
else: # Create new runtime
    create_response = agentcore_control_client.create_agent_runtime(
        agentRuntimeName=agent_runtime_name,
        roleArn=SolutionAccessRoleArn,
        agentRuntimeArtifact={
            "containerConfiguration": {
                "containerUri": ecr_uri
            }
        },
        networkConfiguration={
            "networkMode": "PUBLIC"
        },
        environmentVariables=environment_variables,
        lifecycleConfiguration={
            "maxLifetime": 60
        }
    )
    agent_runtime_id = create_response['agentRuntimeId']
    agent_runtime_arn = create_response['agentRuntimeArn']
    print(f"Created new AgentCore Runtime ID: {agent_runtime_id}")

Found existing AgentCore Runtime ID: LangGraphAgentCoreRuntime-q81sPwHgst
Environment variables configured:
  AWS_DEFAULT_REGION: us-east-1
  LAUNCHDARKLY_SDK_KEY: ***hidden***
  LAUNCHDARKLY_AGENT_KEY: ***hidden***
  LLAMAINDEX_STORAGE_DIR: ./storage
  LLAMAINDEX_DATA_DIR: ./data
  ENABLE_FALLBACK_TOOLS: false
  INVENTORY_LAMBDA: team-PetStoreInventoryManagementFunction
  USER_LAMBDA: team-PetStoreUserManagementFunction
Updated existing AgentCore Runtime
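The lookup above reads only the first page returned by `list_agent_runtimes`, which may miss an existing runtime in accounts with many runtimes. The sketch below follows pages until the name is found. It assumes the response carries a `nextToken` field that can be passed back as a `nextToken` argument; check this against the API reference for your boto3 version. The helper name `find_runtime_by_name` is hypothetical.

```python
def find_runtime_by_name(client, runtime_name):
    """Return the runtime summary whose name matches, following nextToken pages."""
    kwargs = {}
    while True:
        page = client.list_agent_runtimes(**kwargs)
        for runtime in page.get("agentRuntimes", []):
            if runtime.get("agentRuntimeName") == runtime_name:
                return runtime
        token = page.get("nextToken")
        if not token:
            # No more pages and no match
            return None
        kwargs = {"nextToken": token}
```

With the client from this step, usage would look like `existing_runtime = find_runtime_by_name(agentcore_control_client, agent_runtime_name)`.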


In [39]:
def check_runtime_status(agent_runtime_id):
    """Check the status of the AgentCore Runtime"""
    response = agentcore_control_client.get_agent_runtime(
        agentRuntimeId=agent_runtime_id
    )
    return response['status']

# Wait for the runtime to be ready
print("Waiting for AgentCore Runtime to be ready...")
runtime_status = check_runtime_status(agent_runtime_id)
while runtime_status not in ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']:
    print(f"Runtime status: {runtime_status}")
    time.sleep(10)
    runtime_status = check_runtime_status(agent_runtime_id)
print(f"Runtime status: {runtime_status}")

Waiting for AgentCore Runtime to be ready...
Runtime status: UPDATING
Runtime status: READY
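The polling loop above runs until the runtime reaches a terminal status, so it never returns if the status stays in a transitional state. A bounded variant is sketched below. The function name `wait_for_status` and the default timeout of 600 seconds are assumptions for illustration, not values from the AgentCore documentation.

```python
import time


def wait_for_status(get_status, terminal_statuses, timeout_seconds=600,
                    poll_seconds=10, sleep=time.sleep):
    """Poll `get_status()` until it returns a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in terminal_statuses:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout_seconds} seconds")
        sleep(poll_seconds)
```

In this notebook it could wrap the existing helper, for example `wait_for_status(lambda: check_runtime_status(agent_runtime_id), {'READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED'})`. The `sleep` parameter exists only so the wait can be replaced in tests.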


## Step 7: Test Agent Runtime Deployment

Send a test prompt to the AgentCore Runtime to verify that the agent is live.

In [40]:
# Create a client for the AgentCore data plane
agentcore_client = boto3.client('bedrock-agentcore')

# Show current configuration being used
print("🔧 Agent Configuration:")
print(f"   Model: {agent_config['model']['name']} ({agent_config['model']['provider']})")
print(f"   Tools enabled: {', '.join(tools_enabled)}")
print(f"   LlamaIndex Storage: {llamaindex_config.get('storage_dir', './storage')}")
print()

# Test the AgentCore Runtime with sample queries
test_queries = [
    "What is the price of Doggy Delights?",
    "Tell me about your cat products",
    "How often should I bathe a Chihuahua?"
]

for query in test_queries:
    print(f"📝 Query: {query}")

    try:
        invoke_response = agentcore_client.invoke_agent_runtime(
            agentRuntimeArn=agent_runtime_arn,
            qualifier="DEFAULT",
            traceId=str(uuid.uuid4()),
            contentType="application/json",
            payload=json.dumps({
                "prompt": query,
                "user_id": "test_user",
                "subscription_status": "active"
            })
        )

        # Process the response
        if "text/event-stream" in invoke_response.get("contentType", ""):
            content = []
            for line in invoke_response["response"].iter_lines(chunk_size=1):
                if line:
                    line = line.decode("utf-8")
                    if line.startswith("data: "):
                        line = line[6:]
                        content.append(line)
            response_text = "\n".join(content)
        else:
            events = []
            for event in invoke_response.get("response", []):
                events.append(event)

            # Combine all events to fix truncation
            combined_content = ""
            for event in events:
                combined_content += event.decode("utf-8")

            response_text = json.loads(combined_content)

        print(f"✅ Response:")
        if isinstance(response_text, dict):
            print(json.dumps(response_text, indent=2))
        else:
            print(response_text)
        print("-" * 50)

    except Exception as e:
        print(f"❌ Error: {str(e)}")
        print("-" * 50)

print("\n✨ Agent is using LlamaIndex for retrieval - no Knowledge Base IDs required!")
print(f"   RAG tools available: retrieve_product_info, retrieve_pet_care")
print(f"   Lambda tools available: get_inventory, get_user_by_email, get_user_by_id")

🔧 Agent Configuration:
   Model: amazon.nova-pro-v1:0 (Bedrock)
   Tools enabled: retrieve_product_info, get_inventory, retrieve_pet_care, get_user_by_email, get_user_by_id
   LlamaIndex Storage: ./storage

📝 Query: What is the price of Doggy Delights?
✅ Response:
{
  "status": "Accept",
  "message": "Here is the price of Doggy Delights.",
  "customerType": "Guest",
  "items": [
    {
      "productId": "DD006",
      "price": 54.99,
      "quantity": 1,
      "bundleDiscount": 0,
      "total": 54.99,
      "replenishInventory": false
    }
  ],
  "shippingCost": 14.95,
  "petAdvice": "",
  "subtotal": 54.99,
  "additionalDiscount": 0,
  "total": 69.94
}
--------------------------------------------------
📝 Query: Tell me about your cat products
✅ Response:
{
  "status": "Accept",
  "message": "Here are our cat products:",
  "customerType": "Guest",
  "items": [
    {
      "productId": "KC015",
      "price": 129.99,
      "quantity": 1,
      "bundleDiscount": 0,
      "total": 129.9

## Step 8: Cleanup Resources (Optional)

To avoid incurring unnecessary charges, clean up the AWS resources created in this notebook by uncommenting the last line of the cell below.

In [41]:
def cleanup_resources():
    '''Clean up AWS resources created in this notebook.'''  
    # Delete the AgentCore Runtime
    try:
        agentcore_control_client.delete_agent_runtime(
            agentRuntimeId=agent_runtime_id
        )
        print(f"Initiated deletion of AgentCore Runtime: {agent_runtime_id}")
    except Exception as e:
        print(f"Error deleting AgentCore Runtime {agent_runtime_id}: {e}")
    
    # Delete the ECR repository
    try:
        ecr_client = boto3.client('ecr')
        ecr_client.delete_repository(
            repositoryName=repository_name,
            force=True  # Force deletion even if it contains images
        )
        print(f"Deleted ECR repository: {repository_name}")
    except Exception as e:
        print(f"Error deleting ECR repository {repository_name}: {e}")
    
    # Remove the Dockerfile
    try:
        if os.path.exists('Dockerfile'):
            os.remove('Dockerfile')
            print("Deleted Dockerfile")
    except Exception as e:
        print(f"Error deleting Dockerfile: {e}")

# Uncomment the line below to clean up resources
# cleanup_resources()

## Summary

In this notebook, we demonstrated how to:

1. Package the agent into a Docker container
2. Deploy the agent to AgentCore Runtime using direct boto3 API calls
3. Test the deployed agent through the AgentCore Runtime endpoint