## Prerequisites

To execute this tutorial you will need:
* Python 3.10+
* AWS credentials
* Amazon Bedrock AgentCore SDK
* Strands Agents
* Docker running

In [None]:
%pip install --force-reinstall -U -r requirements.txt --quiet

## Configure Inbound Auth with a Custom JWT IdP

You will need:
- The IdP discovery URL (OIDC .well-known endpoint)
- Allowed clients or audience (see `CUSTOM_IDP_ALLOWED_CLIENTS` / `JWT_AUDIENCE`)
- A valid JWT for a test user

Tip: put these in `.env` so the notebook can load them.

If your JWT uses `aud` and does not include `client_id`, set `JWT_AUDIENCE` in `.env` so the runtime uses `allowedAudience`.


In [None]:
import sys
import os
import subprocess
from boto3.session import Session
from dotenv import dotenv_values, load_dotenv
print(os.getcwd())
vals = dotenv_values(".env")
os.environ["CUSTOM_IDP_DISCOVERY_URL"] = vals["CUSTOM_IDP_DISCOVERY_URL"]
os.environ["CUSTOM_IDP_ALLOWED_CLIENTS"] = vals["CUSTOM_IDP_ALLOWED_CLIENTS"]
os.environ["CUSTOM_IDP_BEARER_TOKEN"] = vals["CUSTOM_IDP_BEARER_TOKEN"]
if vals.get("GOOGLE_CLIENT_ID"):
    os.environ["GOOGLE_CLIENT_ID"] = vals["GOOGLE_CLIENT_ID"]
if vals.get("GOOGLE_CLIENT_SECRET"):
    os.environ["GOOGLE_CLIENT_SECRET"] = vals["GOOGLE_CLIENT_SECRET"]
if vals.get("JWT_AUDIENCE"):
    os.environ["JWT_AUDIENCE"] = vals["JWT_AUDIENCE"]
# Get the current notebook's directory
current_dir = os.path.dirname(os.path.abspath('__file__' if '__file__' in globals() else '.'))

# Set your custom IdP config here or via environment variables
CUSTOM_IDP_DISCOVERY_URL = os.environ.get(
    "CUSTOM_IDP_DISCOVERY_URL",
    "<https://your-idp/.well-known/openid-configuration>"
)
CUSTOM_IDP_ALLOWED_CLIENTS = os.environ.get(
    "CUSTOM_IDP_ALLOWED_CLIENTS",
    "<client-id-or-audience>"
)
CUSTOM_IDP_BEARER_TOKEN = os.environ.get(
    "CUSTOM_IDP_BEARER_TOKEN",
    "<paste-jwt-here>"
)

# Normalize allowed clients to a list
CUSTOM_IDP_ALLOWED_CLIENTS = [c.strip() for c in CUSTOM_IDP_ALLOWED_CLIENTS.split(",") if c.strip()]

boto_session = Session()
region = boto_session.region_name
print(f"Region: {region}")
print(f"CUSTOM_IDP_DISCOVERY_URL: {CUSTOM_IDP_DISCOVERY_URL}")
print(f"CUSTOM_IDP_ALLOWED_CLIENTS: {CUSTOM_IDP_ALLOWED_CLIENTS}")


# Outbound Auth

Outbound Auth allows agents and the AgentCore Gateway to securely access AWS resources and third-party services on behalf of users who have been authenticated and authorized during Inbound Auth. To integrate authorization with an AWS resource or third-party service, it's necessary to configure both Inbound Auth and Outbound Auth.

With just-enough access and secure permission delegation supported by AgentCore Identity, agents can seamlessly and securely access AWS resources and third-party tools such as GitHub, Google, Salesforce, and Slack. Agents can perform actions on these services either on behalf of users or independently, provided there is pre-authorized user consent. Additionally, you can reduce consent fatigue using a secure token vault and create streamlined AI agent experiences.

## Outbound Authentication Configuration

First, you register your client application with third-party providers and then create an Outbound Auth. You specify how you want to validate access to the AWS resource or third-party service or AgentCore Gateway targets. You can use OAuth 2LO/3LO or API keys. With OAuth, you can select from providers that AgentCore Identity provides. In which case you enter the configuration details for the providers from AgentCore Identity. Alternatively, you can supply details for a custom provider.

When a user wants access to an AWS resource or third-party service or AgentCore Gateway target, the Outbound Auth confirms that the access tokens provided by Incoming Auth are valid and if so, allows access to the resource.

<div style="text-align:center">
    <img src="images/outbound_auth.png" width="90%"/>
</div>


Here are the various parameters you can use with the @require_access_token decorator.


| Parameter Name      | Description                                                              |
|:--------------------|:-------------------------------------------------------------------------|
| provider_name       | The credential provider name                                             |
| into                | Parameter name to inject the token into                                  |
| scopes              | OAuth2 scopes to request                                                 |
| on_auth_url	      | Callback for handling authorization URLs                                 |
| auth_flow           | Authentication flow type ("M2M" or "USER_FEDERATION")                    |
| callback_url        | OAuth2 callback URL                                                      |
| force_authentication| Force re-authentication                                                  |
| token_poller        | Custom token poller implementation                                       |




# Hosting Strands Agents in Amazon Bedrock AgentCore Runtime

## Overview


In this tutorial we will develop a scheduling agent using Strands agents that can list the events from the users Google Calendar. We will configure a credential provider to help with credential management with Google. We can use the named provider for Google and modify our agent code to call the credential provider and use the access_token to get the users calendar events or schedule from Google.

### Tutorial Architecture

<div style="text-align:center">
    <img src="images/outbound_auth_3lo.png" width="90%"/>
</div>


### Tutorial Details

| Information         | Details                                                                  |
|:--------------------|:-------------------------------------------------------------------------|
| Tutorial type       | Conversational                                                           |
| Agent type          | Single                                                                   |
| Agentic Framework   | Strands Agents                                                           |
| LLM model           | Anthropic Claude Haiku 4.5                                              |
| Tutorial components | Hosting agent on AgentCore Runtime. Using Strands Agent and Claude Model |
| Tutorial vertical   | Cross-vertical                                                           |
| Example complexity  | Medium                                                                   |
| SDK used            | Amazon BedrockAgentCore Python SDK and boto3                             |
| Credential Provider | Type : OAuth2 - Google Provider                                          |


### Tutorial Key Features

* Hosting Agents on Amazon Bedrock AgentCore Runtime
* Using Claude models
* Using Strands Agents
* Using AgentCore egress Auth with OAuth2 Google credential provider.


In [None]:
# Confirm you have set the required values
if "<" in CUSTOM_IDP_DISCOVERY_URL or not CUSTOM_IDP_ALLOWED_CLIENTS:
    raise ValueError(
        "Please set CUSTOM_IDP_DISCOVERY_URL and CUSTOM_IDP_ALLOWED_CLIENTS before continuing."
    )

if "<" in CUSTOM_IDP_BEARER_TOKEN:
    print("CUSTOM_IDP_BEARER_TOKEN is still a placeholder; update it before invoking.")
else:
    print("Custom IdP JWT detected ✓")


In [None]:
# Quick JWT sanity check (no signature verification)
import base64
import json as _json
from datetime import datetime, timezone

token = CUSTOM_IDP_BEARER_TOKEN.strip()
parts = token.split(".")

def _b64url_decode(data: str) -> bytes:
    padding = '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(data + padding)

if "<" in token:
    raise ValueError("Set CUSTOM_IDP_BEARER_TOKEN before decoding.")
if len(parts) < 2:
    raise ValueError("Unexpected token format; expected a JWT.")

header_b64, payload_b64, *_ = parts
header = _json.loads(_b64url_decode(header_b64))
payload = _json.loads(_b64url_decode(payload_b64))
print('header:', header)
print('aud:', payload.get('aud'))

exp = payload.get('exp')
print('exp (epoch):', exp)
if exp:
    exp_dt = datetime.fromtimestamp(exp, tz=timezone.utc)
    now_dt = datetime.now(tz=timezone.utc)
    remaining = exp_dt - now_dt
    print(f"exp (utc): {exp_dt.isoformat()}")
    print(f"time remaining: {remaining}")
    if remaining.total_seconds() <= 0:
        raise ValueError("JWT is expired. Refresh the token and re-run this cell.")


## Configure Google for OAuth2 (User Consent)

You will need a Google Cloud project with the Calendar API enabled and an OAuth client ID/secret.
Summary:
- Create a project and enable Google Calendar API
- Configure OAuth consent screen
- Create OAuth client (Web application)
- Add your Google account as a test user (if External)
- Capture Client ID and Client Secret


## OAuth2 Authorization URL Session Binding Process

Session binding ensures the OAuth consent is tied to the same user who initiated the request.

High-level flow:
1. Agent asks AgentCore Identity for an authorization URL.
2. User completes consent with the provider (Google).
3. Provider redirects to your callback with `session_id`.
4. Your callback calls `CompleteResourceTokenAuth` with the user identifier.

For local development, this notebook uses `oauth2_callback_server.py` on `localhost:9090`.
It stores the user token identifier and completes the session binding.


**Note on the callback server**: The local callback server in this notebook is a minimal
sample and is not production-ready. See `CALLBACK_SERVER_NOTES.md` for details.


In [None]:
import os
print("GOOGLE_CLIENT_ID set:", os.environ.get("GOOGLE_CLIENT_ID"))
print("GOOGLE_CLIENT_SECRET set:", os.environ.get("GOOGLE_CLIENT_SECRET"))
print("id len:", len(os.environ.get("GOOGLE_CLIENT_ID","")))
print("secret len:", len(os.environ.get("GOOGLE_CLIENT_SECRET","")))
print("id repr:", repr(os.environ.get("GOOGLE_CLIENT_ID","")[:5]))
print("secret repr:", repr(os.environ.get("GOOGLE_CLIENT_SECRET","")[:5]))

In [None]:
from bedrock_agentcore.services.identity import IdentityClient

identity_client = IdentityClient(region)
print (os.environ.get("GOOGLE_CLIENT_ID"))
google_client_id = (os.environ.get("GOOGLE_CLIENT_ID") or "").strip()
google_client_secret = (os.environ.get("GOOGLE_CLIENT_SECRET") or "").strip()

print("GOOGLE_CLIENT_ID set:", bool(google_client_id))
print("GOOGLE_CLIENT_SECRET set:", bool(google_client_secret))

if not google_client_id or not google_client_secret:
    raise ValueError("Set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET in your .env before continuing.")

# Configure Google OAuth2 provider - On-Behalf-Of User
google_provider = identity_client.create_oauth2_credential_provider({
    "name": "google-cal-provider",
    "credentialProviderVendor": "GoogleOauth2",
    "oauth2ProviderConfigInput": {
        "googleOauth2ProviderConfig": {
            "clientId": google_client_id,
            "clientSecret": google_client_secret,
        }
    }
})
print(google_provider)
print("\n")
print(f"callbackUrl: {google_provider['callbackUrl']}")


## Update the callback url on the Google/OAuth 2.0 client. 
Navigate back to the [Google Developer Console](https://console.developers.google.com/)

1. Select the Project in Google Developer Console
    1.    Select the project created earlier.
2. Update the callback uri.
    1.    Go to APIs & Services from the left hand menu and then select  > Credentials.
    2.    Click the client under "OAuth 2.0 Client IDs"
    3.    Under "Authorised redirect URIs", enter the callback url from the previous step. The callback url was printed so you can easily copy it form the previous step.
    4.    Click Save.

## Preparing your agent for deployment on AgentCore Runtime

### Strands Agent with a model hosted on Amazon Bedrock
Here is a Strands agent code that includes the following :
1. Creates a new tool called "get_calendar_events_today", to get the events from your Google Calendar for today
2. Uses the Credential provider created in the previous step to fetch the access_token from Google. This includes the user consent flow where the consent is sent to the user for approval as part of the 3LO flow.
3. The Strands agent calls the tool for any user requests related to the users agenda.

In [None]:
# Get the OAuth2 callback URL based on the current environment (notebook/SageMaker)
# This is evaluated HERE in the notebook, not in the agent container
from oauth2_callback_server import get_oauth2_callback_url
oauth2_callback_url_for_agent = get_oauth2_callback_url()

print(f"Callback URL for agent (determined from notebook environment): {oauth2_callback_url_for_agent}")

# Write strands_claude_google_3lo.py with the callback URL embedded as a string literal
google_agent_code = f'''import os
import datetime
import json
import asyncio
import traceback
from typing import Dict, Any, Optional, AsyncGenerator

from strands import Agent, tool
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from bedrock_agentcore.identity.auth import requires_access_token

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Environment configuration
os.environ["STRANDS_OTEL_ENABLE_CONSOLE_EXPORT"] = "true"
os.environ["OTEL_PYTHON_EXCLUDED_URLS"] = "/ping,/invocations"

# Required OAuth2 scope for Google Calendar API
SCOPES = ["https://www.googleapis.com/auth/calendar.readonly"]

# Global variable to store the access token
google_access_token: Optional[str] = None


@tool(
    name="Get_calendar_events_today",
    description="Retrieves the calendar events for the day from your Google Calendar"
)
def get_calendar_events_today() -> str:
    """
    Retrieve calendar events for today from Google Calendar.
    
    Returns:
        str: JSON string containing events or error information
    """
    global google_access_token
    
    # Check if we already have a token
    if not google_access_token:
        return json.dumps({{
            "auth_required": True,
            "message": "Google Calendar authentication is required. Please wait while we set up the authorization.",
            "events": []
        }})
    
    # Create credentials from the provided access token
    creds = Credentials(token=google_access_token, scopes=SCOPES)
    try:
        service = build("calendar", "v3", credentials=creds)
        
        # Calculate today's time range
        today_start = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        today_end = today_start.replace(hour=23, minute=59, second=59)
        
        # Format with CDT timezone (-05:00)
        time_min = today_start.strftime('%Y-%m-%dT00:00:00-05:00')
        time_max = today_end.strftime('%Y-%m-%dT23:59:59-05:00')
        
        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=time_min,
                timeMax=time_max,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )
        events = events_result.get("items", [])

        if not events:
            return json.dumps({{"events": []}})

        return json.dumps({{"events": events}})
        
    except HttpError as error:
        error_message = str(error)
        return json.dumps({{"error": error_message, "events": []}})
    except Exception as e:
        error_message = str(e)
        return json.dumps({{"error": error_message, "events": []}})


# Initialize the agent with tools and your preferred model choice
agent = Agent(
    model="global.anthropic.claude-haiku-4-5-20251001-v1:0",
    tools=[get_calendar_events_today]
)

# Initialize app and streaming queue
app = BedrockAgentCoreApp()


class StreamingQueue:
    """A queue for streaming responses asynchronously."""
    
    def __init__(self):
        self.finished = False
        self.queue = asyncio.Queue()
        
    async def put(self, item: str) -> None:
        """Add an item to the queue."""
        await self.queue.put(item)

    async def finish(self) -> None:
        """Mark the queue as finished."""
        self.finished = True
        await self.queue.put(None)

    async def stream(self) -> AsyncGenerator[str, None]:
        """Stream items from the queue."""
        while True:
            item = await self.queue.get()
            if item is None and self.finished:
                break
            yield item


queue = StreamingQueue()


async def on_auth_url(url: str) -> None:
    """Handle authorization URL callback."""
    print(f"Authorization url: {{url}}")
    await queue.put(f"Authorization url: {{url}}")


async def agent_task(user_message: str) -> None:
    """
    Execute the agent task with authentication handling.
    
    Args:
        user_message: The user's input message
    """
    try:
        await queue.put("Begin agent execution")
        
        # Call the agent first to see if it needs authentication
        response = agent(user_message)
        
        # Extract text content from the response structure
        response_text = ""
        if isinstance(response.message, dict):
            content = response.message.get('content', [])
            if isinstance(content, list):
                for item in content:
                    if isinstance(item, dict) and 'text' in item:
                        response_text += item['text']
        else:
            response_text = str(response.message)
        
        # Check if the response indicates authentication is required
        auth_keywords = [
            "authentication", "authorize", "authorization", "auth", 
            "sign in", "login", "access", "permission", "credential",
            "need authentication", "requires authentication"
        ]
        needs_auth = any(keyword.lower() in response_text.lower() for keyword in auth_keywords)
       
        if needs_auth:
            await queue.put("Authentication required for Google Calendar access. Starting authorization flow...")
            
            # Trigger the 3LO authentication flow
            try:
                global google_access_token
                google_access_token = await need_token_3lo_async(access_token='')
                await queue.put("Authentication successful! Retrying calendar request...")
                
                # Retry the agent call now that we have authentication
                response = agent(user_message)
            except Exception as auth_error:
                print(f"auth_error: {{auth_error}}")
                await queue.put(f"Authentication failed: {{str(auth_error)}}")
        
        await queue.put(response.message)
        await queue.put("End agent execution")
    except Exception as e:
        await queue.put(f"Error: {{str(e)}}")
    finally:
        await queue.finish()


@requires_access_token(
    provider_name="google-cal-provider",
    scopes=SCOPES,
    auth_flow='USER_FEDERATION',
    on_auth_url=on_auth_url,
    force_authentication=True,
    callback_url="{oauth2_callback_url_for_agent}"  # ← Callback URL determined from notebook environment
)
async def need_token_3lo_async(*, access_token: str) -> str:
    """
    Handle 3-legged OAuth token retrieval.
    
    Args:
        access_token: The OAuth access token
        
    Returns:
        str: The access token
    """
    global google_access_token
    google_access_token = access_token
    return access_token


@app.entrypoint
async def agent_invocation(payload: Dict[str, Any]) -> AsyncGenerator[str, None]:
    """
    Main entrypoint for agent invocations.
    
    Args:
        payload: The request payload containing the prompt
        
    Yields:
        str: Streaming response items
    """
    user_message = payload.get(
        "prompt", 
        "No prompt found in input, please guide customer to create a json payload with prompt key"
    )
    
    # Create and start the agent task
    task = asyncio.create_task(agent_task(user_message))
    
    # Return the stream, but ensure the task runs concurrently
    async def stream_with_task() -> AsyncGenerator[str, None]:
        # Stream results as they come
        async for item in queue.stream():
            yield item
        
        # Ensure the task completes
        await task
    
    return stream_with_task()


if __name__ == "__main__":
    app.run()
'''

# Write the file
with open("strands_claude_google_3lo.py", "w") as f:
    f.write(google_agent_code)

print("✅ strands_claude_google_3lo.py written successfully with embedded callback URL")

## Deploying the agent to AgentCore Runtime
The CreateAgentRuntime operation supports comprehensive configuration options, letting you specify container images, environment variables and encryption settings. You can also configure protocol settings (HTTP, MCP) and authorization mechanisms to control how your clients communicate with the agent.

Note: Operations best practice is to package code as container and push to ECR using CI/CD pipelines and IaC

In this tutorial can will the Amazon Bedrock AgentCode Python SDK to easily package your artifacts and deploy them to AgentCore runtime.


### Configure AgentCore Runtime deployment

Next we will use our starter toolkit to configure the AgentCore Runtime deployment with an entrypoint, the execution role we just created and a requirements file. We will also configure the starter kit to auto create the Amazon ECR repository on launch.

During the configure step, your docker file will be generated based on your application code

Note : The authorizer_configuration is configured for Inbound Auth with your custom JWT IdP.

<div style="text-align:left">
    <img src="images/configure.png" width="40%"/>
</div>

In [None]:
from bedrock_agentcore_starter_toolkit.notebook.runtime.bedrock_agentcore import Runtime

discovery_url = CUSTOM_IDP_DISCOVERY_URL
allowed_clients = CUSTOM_IDP_ALLOWED_CLIENTS
jwt_audience = os.environ.get("JWT_AUDIENCE")

authorizer_configuration = {"customJWTAuthorizer": {"discoveryUrl": discovery_url}}
if jwt_audience:
    authorizer_configuration["customJWTAuthorizer"]["allowedAudience"] = [jwt_audience]
else:
    authorizer_configuration["customJWTAuthorizer"]["allowedClients"] = allowed_clients

agentcore_runtime = Runtime()

response = agentcore_runtime.configure(
    entrypoint="strands_claude_google_3lo.py",
    auto_create_execution_role=True,
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=region,
    memory_mode="NO_MEMORY",
    agent_name="strands_agent_google_3lo",
    authorizer_configuration=authorizer_configuration,
)
print(response)


## Review the AgentCore configuration

In [None]:
!cat .bedrock_agentcore.yaml

### Launching agent to AgentCore Runtime

Now that we've got a docker file, let's launch the agent to the AgentCore Runtime. This will create the Amazon ECR repository and the AgentCore Runtime

<div style="text-align:left">
    <img src="images/launch.png" width="75%"/>
</div>

In [None]:
from oauth2_callback_server import get_oauth2_callback_url

# Deploy the agent to AgentCore Runtime and get deployment details
launch_result = agentcore_runtime.launch()
print(launch_result)

if launch_result.agent_id:
    # Extract the workload name from the deployed agent's ID for identity management
    workload_name = launch_result.agent_id
    # Retrieve the current workload identity configuration from AgentCore Identity
    workload_identity = identity_client.get_workload_identity(name=workload_name)
    # Extract existing OAuth2 callback URLs that are already registered for this workload
    allowed_resource_oauth_2_return_urls = workload_identity.get("allowedResourceOauth2ReturnUrls") or []
    # Get the local OAuth2 callback server URL for session binding (localhost:9090/oauth2/callback)
    oauth2_callback_url = get_oauth2_callback_url()
    print(f"Updating workload {workload_name} with callback url {oauth2_callback_url}")

    # Register the local callback URL with the workload identity to enable OAuth2 session binding
    updated_workload_identity = identity_client.update_workload_identity(
        name=workload_name,
        allowed_resource_oauth_2_return_urls=[*allowed_resource_oauth_2_return_urls, oauth2_callback_url],
    )
    print(updated_workload_identity)

#### Add extra required policies to auto-created role

If you are using this on a new account where model has not been accessed before, you will need to add extra required policies to the auto-created role to allow the agent to access the model.

In [None]:
import json
import boto3
agentcore_control_client = boto3.client(
    'bedrock-agentcore-control',
    region_name=region
)

runtime_response = agentcore_control_client.get_agent_runtime(
    agentRuntimeId=launch_result.agent_id
)
runtime_role = runtime_response['roleArn']

policies_to_add = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "BedrockModelAccess",
            "Effect": "Allow",
            "Action": [
                "aws-marketplace:ViewSubscriptions",
                "aws-marketplace:Subscribe"
            ],
            "Resource": "*"
        }
    ]
}
iam_client = boto3.client(
    'iam',
    region_name=region
)

response = iam_client.put_role_policy(
    PolicyDocument=json.dumps(policies_to_add),
    PolicyName="outbound_policies",
    RoleName=runtime_role.split("/")[1],
)

### Checking for the AgentCore Runtime Status
Now that we've deployed the AgentCore Runtime, let's check for it's deployment status

In [None]:
import time

status_response = agentcore_runtime.status()
status = status_response.endpoint['status']
end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']
while status not in end_status:
    time.sleep(10)
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']
    print(status)
print(f"Final status: {status}")

In [None]:
import urllib.parse

agent_arn = None
if 'launch_result' in globals() and getattr(launch_result, 'agent_arn', None):
    agent_arn = launch_result.agent_arn
else:
    status_response = agentcore_runtime.status()
    agent_arn = status_response.agent.get('agentRuntimeArn')

invoke_url = (
    f"https://bedrock-agentcore.{region}.amazonaws.com/runtimes/"
    f"{urllib.parse.quote(agent_arn, safe='')}/invocations"
)
print(f"Agent ARN: {agent_arn}")
print(f"Invoke URL: {invoke_url}")


### Start the local OAuth2 callback server

Start the callback server before invoking the runtime so it can handle the
OAuth redirect and complete session binding. Leave it running while you invoke.


In [None]:
from oauth2_callback_server import wait_for_oauth2_server_to_be_ready

oauth2_callback_server_cmd = [sys.executable, 'oauth2_callback_server.py', '--region', region]
oauth2_callback_server_process = subprocess.Popen(oauth2_callback_server_cmd)

if not wait_for_oauth2_server_to_be_ready():
    raise RuntimeError('OAuth2 callback server did not start in time.')
print('OAuth2 callback server is ready at http://localhost:9090')


### Invoking AgentCore Runtime

Invoke the runtime. The agent will emit an Authorization URL if it needs Google access.
Open that URL, complete consent, then re-run the invoke if needed.


In [None]:
from oauth2_callback_server import store_token_in_oauth2_callback_server
import json
import requests
import uuid

bearer_token = CUSTOM_IDP_BEARER_TOKEN
if "<" in bearer_token:
    raise ValueError("Set CUSTOM_IDP_BEARER_TOKEN to a valid JWT before invoking.")

store_token_in_oauth2_callback_server(bearer_token)

payload = {"prompt": "What is in my agenda for today? Highlight the main events!"}
headers = {
    "Authorization": f"Bearer {bearer_token}",
    "Content-Type": "application/json",
    "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": str(uuid.uuid4()),
}

response = requests.post(invoke_url, headers=headers, data=json.dumps(payload), stream=True, timeout=300)
print("status:", response.status_code)
print("content-type:", response.headers.get("content-type"))

for raw in response.iter_lines():
  if raw:
      text = raw.decode("utf-8")
      print("LINE:", text)
      if text.startswith("data: "):
          print(text[6:])


In [None]:
# Sample Python client invoking the runtime directly (data-plane URL)
# For 3LO flows, ensure the callback server is running and user token is stored.
import json
import requests
import uuid

payload = {"prompt": "What is in my agenda for today? Highlight the main events!"}
headers = {
    "Authorization": f"Bearer {CUSTOM_IDP_BEARER_TOKEN}",
    "Content-Type": "application/json",
    "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": str(uuid.uuid4()),
}

resp = requests.post(invoke_url, headers=headers, data=json.dumps(payload), stream=True, timeout=300)
for line in resp.iter_lines():
    if not line:
        continue
    text = line.decode("utf-8")
    if text.startswith("data: "):
        print(text[6:])


In [None]:
# Sample Python client invoking via reverse proxy (proxy adds Authorization header)
import os
import json
import requests

PROXY_BASE_URL = os.environ.get("PROXY_BASE_URL", "https://agentid.ellinj.teleport.sh")

payload = {"prompt": "What is in my agenda for today? Highlight the main events!"}
headers = {
    "Content-Type": "application/json",
     "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": str(uuid.uuid4()),
}

resp = requests.post(f"{PROXY_BASE_URL}/invocations", headers=headers, data=json.dumps(payload), timeout=60)
print('status:', resp.status_code)
for line in resp.iter_lines():
    if not line:
        continue
    text = line.decode("utf-8")
    if text.startswith("data: "):
        print(text[6:])



## Cleanup (Optional)

- Stop the local OAuth2 callback server if it's running.
- Delete the AgentCore runtime and ECR repository created by this notebook.
- Uncomment and run the cells below to clean up.


In [None]:
# Stop the callback server (if started in this notebook)
try:
    oauth2_callback_server_process.terminate()
    oauth2_callback_server_process.wait(timeout=5)
    print('Callback server stopped.')
except NameError:
    print('Callback server was not started in this session.')
except Exception as exc:
    print(f'Failed to stop callback server: {exc}')


In [None]:
# Delete runtime and ECR repository
import boto3

agentcore_control_client = boto3.client('bedrock-agentcore-control', region_name=region)
ecr_client = boto3.client('ecr', region_name=region)

if 'launch_result' in globals() and getattr(launch_result, 'agent_id', None):
    agentcore_control_client.delete_agent_runtime(agentRuntimeId=launch_result.agent_id)
    print(f'Deleted runtime: {launch_result.agent_id}')
else:
    print('No launch_result.agent_id found; skipping runtime delete.')

if 'launch_result' in globals() and getattr(launch_result, 'ecr_uri', None):
    repo_name = launch_result.ecr_uri.split('/', 1)[1]
    ecr_client.delete_repository(repositoryName=repo_name, force=True)
    print(f'Deleted ECR repo: {repo_name}')
else:
    print('No launch_result.ecr_uri found; skipping ECR delete.')


## Congratulations!