# Building the AWSome Event Agent with Amazon Bedrock
---

## Overview

This notebook walks through building a complete **Agentic RAG system** that can:
- Answer questions about AWS re:Invent sessions using a Knowledge Base
- Remember user preferences and conversation history
- Provide personalized recommendations
- Run securely at scale with authentication

---

## Architecture Overview

<div style="text-align:left">
    <img src="architecture_images/image_1.png" width="80%"/>
</div>

---

## Chapters

1. **Chapter 1: The Agent** - Create an intelligent agent
2. **Chapter 2: The Knowledge Base** - Store and retrieve session information
3. **Chapter 3: The Memory** - Add long-term memory for personalization
4. **Chapter 4: Runtime** - Deploy securely at scale
5. **Chapter 5: Identity** - Add authentication with Cognito

---

## Prerequisites

**Software Requirements:**
- Python 3.10 or newer
- Jupyter notebook environment or compatible IDE
- pip package manager

**AWS Requirements:**
- AWS credentials configured with appropriate permissions for:
  - Amazon Bedrock (model access and Knowledge Base)
  - Amazon S3 (bucket creation and object storage)
  - Amazon S3 Vectors (vector bucket and index creation)
  - Amazon Cognito (user pool management)
  - AWS IAM (role and policy management)
  - AgentCore Memory (create, read, write)
  - AgentCore Runtime (deploy agents)
  - Amazon ECR (container registry)

**Knowledge Requirements:**
- Basic understanding of vector databases
- Familiarity with AWS services
- Understanding of RAG (Retrieval-Augmented Generation) concepts
- Understanding of AI agents and LLMs
- Familiarity with memory systems

## Setup: Install Dependencies

In [None]:
%pip install -qUr requirements.txt

In [None]:
import os
import json
import uuid
import time
import boto3
import logging
import pandas as pd
import requests
from io import StringIO
from datetime import datetime
from utils import create_agentcore_role
from IPython.display import Markdown, display

from botocore.exceptions import ClientError

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger("awsome-event-agent")

# Configuration
REGION = os.getenv("AWS_REGION", "us-west-2")
UNIQUE_ID = str(uuid.uuid4())[:8]
MODEL_ID = "us.anthropic.claude-haiku-4-5-20251001-v1:0"
EMBEDDING_MODEL = "amazon.titan-embed-text-v2:0"

print("=" * 70)
print("🚀 AWSOME EVENT AGENT - COMPLETE WORKSHOP")
print("=" * 70)
print(f"\n✅ Configuration:")
print(f"   Region: {REGION}")
print(f"   Unique ID: {UNIQUE_ID}")
print(f"   Model: {MODEL_ID}")
print(f"   Embeddings: {EMBEDDING_MODEL}")

---

# Chapter 1: The Agent

## Overview

In this chapter, we'll create an intelligent agent using the **Strands** framework.

### What We'll Build

1. **Simple Agent**: Basic conversational agent

### What is Strands?

Strands is an agentic framework that provides:
- **Model Abstraction**: Work with any LLM (Bedrock, OpenAI, etc.)
- **Tool Integration**: Easy function calling for agents
- **Hook System**: Lifecycle events for custom logic
- **State Management**: Persistent agent state
- **Streaming Support**: Real-time response streaming

<div style="text-align:left">
    <img src="architecture_images/image_2.png" width="80%"/>
</div>


## Step 1.1: Create a Simple Agent

Let's start by creating a basic conversational agent without any tools or memory.

**Agent Components:**
- **Model**: BedrockModel wrapping Claude Haiku 4.5
- **System Prompt**: Defines the agent's role and behavior
- **No Tools Yet**: Agent can only chat, not retrieve information

**Expected Behavior:**
When asked about re:Invent sessions, the agent won't have access to the data and will say so.

In [None]:
print("\n" + "=" * 70)
print("🤖 CREATING SIMPLE AGENT")
print("=" * 70)

from strands import Agent
from strands.models import BedrockModel

# Create model
model = BedrockModel(model_id=MODEL_ID)

# Create simple agent
simple_agent = Agent(
    model=model, system_prompt="You're an intelligent event assistant."
)

print("\n✅ Simple agent created")

# Test it
print("\n📝 Testing simple agent...\n")
response = simple_agent("Which session is good to learn about security with AI Agents?")

print("\n⚠️ Notice: The agent doesn't have access to session data yet!")

---

# Chapter 2: The Knowledge Base

## Overview

In this chapter, we'll create an **Amazon Bedrock Knowledge Base** with AWS re:Invent session data.

### What is a Knowledge Base?

Amazon Bedrock Knowledge Base is a fully-managed RAG (Retrieval-Augmented Generation) solution that:
- **Automates Ingestion**: Processes and chunks documents automatically
- **Generates Embeddings**: Uses foundation models to create vector representations
- **Stores in Vector DB**: Manages vector storage and indexing
- **Provides Search API**: Simple interface for semantic search
- **Scales Automatically**: No infrastructure management required

<div style="text-align:left">
    <img src="architecture_images/image_4.png" width="80%"/>
</div>

### What We'll Do

1. Download re:Invent 2024 session data (583 sessions)
2. Create S3 bucket and upload documents
3. Create S3 Vector Store and Index
4. Create Knowledge Base with automatic ingestion
5. Test retrieval
6. Agent with Tools: Add Knowledge Base search capability
7. Test RAG: See the agent use the KB to answer questions

## Step 2.1: Download re:Invent Session Data

We'll download the AWS re:Invent 2024 session catalog from a public source. This dataset contains:
- **583 sessions** covering various AWS services and topics
- Session codes, types, titles, and descriptions
- Topics ranging from AI/ML to security, databases, and more

### Data Source

The data comes from a curated TSV (Tab-Separated Values) file that includes:
- **Session Code**: Unique identifier (e.g., "AIM301", "SEC401")
- **Session Type**: Category (e.g., "Breakout session", "Workshop", "Chalk talk")
- **Session Title**: Descriptive name of the session
- **Session Description**: Detailed overview of session content
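
To make the file layout concrete, here is a minimal sketch of parsing a TSV with these four columns using only the standard library. The two sample rows and the `parse_sessions` helper are invented for illustration; they are not taken from the real catalog or from this notebook's code.

```python
import csv
from io import StringIO

# Hypothetical sample rows; only the four column names come from the dataset description.
SAMPLE_TSV = (
    "Session Code\tSession Type\tSession Title\tSession Description\n"
    "AIM301\tBreakout session\tExample AI title\tExample description text.\n"
    "SEC401\tWorkshop\tExample security title\tAnother example description.\n"
)


def parse_sessions(tsv_text: str) -> list:
    """Parse tab-separated session rows into a list of dicts keyed by column name."""
    reader = csv.DictReader(StringIO(tsv_text), delimiter="\t")
    return [dict(row) for row in reader]


sessions = parse_sessions(SAMPLE_TSV)
print(sessions[0]["Session Code"], "-", sessions[0]["Session Type"])
```

The notebook itself uses `pandas.read_csv(..., sep="\t")`, which produces the same column names as DataFrame columns.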

In [None]:
print("\n" + "=" * 70)
print("📥 DOWNLOADING RE:INVENT 2024 SESSION DATA")
print("=" * 70)

# Download session data
url = "https://gist.githubusercontent.com/timothyjrogers/1b239edc65ebaeb239676992247ddad5/raw/reinvent-2024-sessions.tsv"
response = requests.get(url, timeout=30)  # fail fast instead of hanging on a stalled connection
response.raise_for_status()

# Parse TSV data
df = pd.read_csv(StringIO(response.text), sep="\t")

print(f"\n✅ Downloaded {len(df)} sessions")
print(f"\nColumns: {list(df.columns)}")
print(f"\nSample session:")
print(df.iloc[0][["Session Code", "Session Type", "Session Title"]].to_dict())

## Step 2.2: Prepare Documents for Knowledge Base

Now we'll transform the raw session data into individual text documents optimized for vector search.

### Document Structure

Each session will be converted into a structured text file containing:
```
Session Code: AIM301
Session Type: Breakout session
Title: Building production-ready AI applications

Description:
[Full session description...]

---
Category: AWS re:Invent 2024
Type: Breakout session
Session ID: AIM301
```

### Why Individual Files?

Creating one file per session allows:
- **Granular Retrieval**: Each session can be independently matched
- **Clear Attribution**: Easy to track which document contains which info
- **Flexible Updates**: Can add/remove sessions without reprocessing all
- **Optimal Chunking**: Bedrock can chunk each session appropriately

In [None]:
print("\n" + "=" * 70)
print("üìù PREPARING SESSION DOCUMENTS")
print("=" * 70)

# Create documents directory
docs_dir = "reinvent_sessions_docs"
os.makedirs(docs_dir, exist_ok=True)

# Create one text file per session
session_files = []
for idx, row in df.iterrows():
    session_doc = f"""Session Code: {row["Session Code"]}
Session Type: {row["Session Type"]}
Title: {row["Session Title"]}

Description:
{row["Session Description"]}

---
Category: AWS re:Invent 2024
Type: {row["Session Type"]}
Session ID: {row["Session Code"]}
"""

    filename = f"{row['Session Code'].replace(':', '-')}.txt"
    filepath = os.path.join(docs_dir, filename)
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(session_doc)

    session_files.append(filename)

    if (idx + 1) % 100 == 0:
        print(f"   Created {idx + 1}/{len(df)} documents...")

print(f"\n✅ Created {len(session_files)} document files")
print(f"   Location: {docs_dir}/")

## Step 2.3: Create S3 Bucket and Upload Documents

We'll create an Amazon S3 bucket and upload our prepared documents. This bucket serves as the source for Bedrock Knowledge Base ingestion.

### S3 Bucket Configuration

The bucket will be created with:
- **Region-specific location**: Ensures low latency for Bedrock access
- **Standard storage class**: Cost-effective for knowledge base documents
- **Plain text content type**: Optimized for text processing

### Upload Process

We'll upload all 583 documents to a `sessions/` prefix in the bucket. This organization:
- Keeps documents logically grouped
- Enables easy data source configuration
- Allows for future expansion with additional document types

> ⚠️ **Bucket Naming**: S3 bucket names must be globally unique. We append an 8-character UUID fragment to make name collisions unlikely.
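
As a rough illustration of the naming constraint, the sketch below generates a suffixed name and checks it against a subset of the S3 general-purpose bucket naming rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starting and ending with a letter or digit). The helper names are hypothetical, and other rules such as reserved prefixes and IP-address-like names are deliberately not checked.

```python
import re
import uuid

# Subset of the bucket naming rules; see the lead-in for what is not covered.
_BUCKET_NAME_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def is_plausible_bucket_name(name: str) -> bool:
    """Return True if the name passes the basic length and character checks."""
    return bool(_BUCKET_NAME_RE.fullmatch(name)) and ".." not in name


def make_bucket_name(prefix: str) -> str:
    """Append 8 lowercase hex characters from a random UUID to the prefix."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}"


candidate = make_bucket_name("bedrock-agentcore-kb")
print(candidate, is_plausible_bucket_name(candidate))
```

A random suffix only lowers the chance of a collision. If the name is already taken by another account, `create_bucket` still fails and a new suffix is needed.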

In [None]:
print("\n" + "=" * 70)
print("📦 CREATING S3 BUCKET AND UPLOADING DOCUMENTS")
print("=" * 70)

# Initialize clients
s3_client = boto3.client("s3", region_name=REGION)
sts_client = boto3.client("sts", region_name=REGION)
account_id = sts_client.get_caller_identity()["Account"]

BUCKET_NAME = f"bedrock-agentcore-kb-{UNIQUE_ID}"

# Create S3 bucket
print(f"\n📦 Creating S3 bucket: {BUCKET_NAME}")
try:
    if REGION == "us-east-1":
        s3_client.create_bucket(Bucket=BUCKET_NAME)
    else:
        s3_client.create_bucket(
            Bucket=BUCKET_NAME, CreateBucketConfiguration={"LocationConstraint": REGION}
        )
    print(f"✅ Bucket created")
except s3_client.exceptions.BucketAlreadyOwnedByYou:
    print(f"✅ Bucket already exists")

# Upload documents
print(f"\n📤 Uploading {len(session_files)} documents...")
uploaded_count = 0
for filename in session_files:
    filepath = os.path.join(docs_dir, filename)
    s3_key = f"sessions/{filename}"

    with open(filepath, "rb") as f:
        s3_client.put_object(
            Bucket=BUCKET_NAME, Key=s3_key, Body=f, ContentType="text/plain"
        )

    uploaded_count += 1
    if uploaded_count % 100 == 0:
        print(f"   Uploaded {uploaded_count}/{len(session_files)} files...")

print(f"\n✅ Uploaded {uploaded_count} documents to s3://{BUCKET_NAME}/sessions/")

## Step 2.4: Create IAM Role for Knowledge Base

The Knowledge Base needs an IAM role to access AWS resources. This role will have permissions to:
- Read documents from the S3 bucket
- Invoke the Titan Embeddings model
- Manage the S3 Vectors index

### Trust Policy

The trust policy allows:
- `bedrock.amazonaws.com` to assume this role
- Ensures only Bedrock service can use these permissions
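
As an optional hardening step that this notebook does not apply, a trust policy can also restrict which account and which resources may trigger the role assumption. The sketch below uses the global condition keys `aws:SourceAccount` and `aws:SourceArn`; the `build_kb_trust_policy` helper, the example account ID, and the wildcard knowledge-base ARN pattern are assumptions made for illustration.

```python
import json


def build_kb_trust_policy(account_id: str, region: str) -> dict:
    """Trust policy for bedrock.amazonaws.com, limited to one account's knowledge bases."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "bedrock.amazonaws.com"},
                "Action": "sts:AssumeRole",
                "Condition": {
                    # Only requests made on behalf of this account
                    "StringEquals": {"aws:SourceAccount": account_id},
                    # Only requests originating from a knowledge base in this region
                    "ArnLike": {
                        "aws:SourceArn": (
                            f"arn:aws:bedrock:{region}:{account_id}:knowledge-base/*"
                        )
                    },
                },
            }
        ],
    }


# Placeholder account ID, not a real account
example_policy = build_kb_trust_policy("123456789012", "us-west-2")
print(json.dumps(example_policy, indent=2))
```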

### Permissions Policy

The policy grants access to:
- **S3 Operations**: GetObject, ListBucket
- **Bedrock Models**: InvokeModel for embeddings
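- **S3 Vectors**: Full CRUD operations on vector buckets and indexes

> 💡 **Security**: The S3 and model permissions are scoped to this workshop's bucket and embedding model. The `s3vectors:*` grant on every vector bucket in the account is broader than strict least privilege; narrow it to the specific vector bucket for production use.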

In [None]:
print("\n" + "=" * 70)
print("🔐 CREATING IAM ROLE FOR KNOWLEDGE BASE")
print("=" * 70)

iam_client = boto3.client("iam", region_name=REGION)
kb_role_name = f"BedrockKBRole_{UNIQUE_ID}"

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "bedrock.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

try:
    create_role_response = iam_client.create_role(
        RoleName=kb_role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="Role for Bedrock Knowledge Base",
    )
    kb_role_arn = create_role_response["Role"]["Arn"]
    print(f"\n✅ Created role: {kb_role_arn}")
except iam_client.exceptions.EntityAlreadyExistsException:
    kb_role_arn = f"arn:aws:iam::{account_id}:role/{kb_role_name}"
    print(f"\n✅ Role already exists: {kb_role_arn}")

# Attach policy
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [
                f"arn:aws:s3:::{BUCKET_NAME}",
                f"arn:aws:s3:::{BUCKET_NAME}/*",
            ],
        },
        {
            "Effect": "Allow",
            "Action": "bedrock:InvokeModel",
            "Resource": f"arn:aws:bedrock:{REGION}::foundation-model/{EMBEDDING_MODEL}",
        },
        {
            "Effect": "Allow",
            "Action": ["s3vectors:*"],
            "Resource": f"arn:aws:s3vectors:{REGION}:{account_id}:bucket/*",
        },
    ],
}

iam_client.put_role_policy(
    RoleName=kb_role_name,
    PolicyName=f"BedrockKBPolicy_{UNIQUE_ID}",
    PolicyDocument=json.dumps(policy_document),
)
print(f"✅ Attached policy to role")

print("\n⏳ Waiting for IAM role to propagate (30 seconds)...")
time.sleep(30)

## Step 2.5: Create S3 Vector Store and Index

### What is S3 Vectors?

Amazon S3 Vectors is the first cloud object store with native support to store and query vectors, delivering purpose-built, cost-optimized vector storage for AI agents, AI inference, and semantic search of your content stored in Amazon S3. It scales automatically without cluster management and integrates natively with Amazon Bedrock Knowledge Bases.

### Vector Store (Vector Bucket)

The S3 Vector Store, also called a vector bucket, is a specialized S3 bucket optimized for vector operations. It's fully managed by AWS with no infrastructure to maintain, supports multiple indexes per bucket, and scales automatically based on usage.

### Vector Index Configuration

The vector index stores and searches embeddings with three key settings:

- **Dimension (1024)**: Matches Titan Embeddings V2 output and captures rich semantic information from the model.

- **Distance Metric (Cosine)**: Measures similarity by comparing vector direction rather than magnitude, which is ideal for text embeddings.

- **Data Type (Float32)**: Balances precision and storage efficiency for semantic search operations.
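
To see why cosine distance suits text embeddings, the following sketch computes cosine similarity in plain Python. It is a toy illustration on short hand-written vectors, not how S3 Vectors implements search; the function names are hypothetical.

```python
import math


def cosine_similarity(a, b) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


def cosine_distance(a, b) -> float:
    """Distance form: 0 for the same direction, 1 for orthogonal vectors."""
    return 1.0 - cosine_similarity(a, b)


# Same direction, different magnitude: similarity stays close to 1
same_direction = cosine_similarity([1.0, 2.0], [10.0, 20.0])
# Orthogonal vectors: similarity is 0
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
print(same_direction, orthogonal)
```

Scaling a vector does not change its similarity to another vector, which is the "direction rather than magnitude" property described above.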

In [None]:
print("\n" + "=" * 70)
print("📊 CREATING S3 VECTOR STORE AND INDEX")
print("=" * 70)

s3vectors_client = boto3.client("s3vectors", region_name=REGION)
vector_store_name = f"bedrock-kb-{UNIQUE_ID}"
vector_index_name = f"reinvent-sessions-index-{UNIQUE_ID}"

# Create vector bucket
print(f"\n📊 Creating S3 Vector Store: {vector_store_name}")
try:
    s3vectors_client.create_vector_bucket(vectorBucketName=vector_store_name)
    print(f"✅ Vector bucket created")
except Exception as e:
    # Most likely the bucket already exists; the lookup below fails loudly if it does not
    print(f"⚠️ Vector bucket not created (it may already exist): {e}")

# Get vector bucket ARN
bucket_response = s3vectors_client.get_vector_bucket(vectorBucketName=vector_store_name)
vector_store_arn = bucket_response["vectorBucket"]["vectorBucketArn"]
print(f"   ARN: {vector_store_arn}")

# Create vector index
print(f"\n📊 Creating Vector Index: {vector_index_name}")
try:
    s3vectors_client.create_index(
        vectorBucketName=vector_store_name,
        indexName=vector_index_name,
        dimension=1024,  # Titan Embeddings V2
        distanceMetric="cosine",
        dataType="float32",
        metadataConfiguration={"nonFilterableMetadataKeys": ["AMAZON_BEDROCK_TEXT"]},
    )
    print(f"✅ Vector index created")
except Exception as e:
    # Most likely the index already exists; the ARN lookup below fails loudly if it does not
    print(f"⚠️ Vector index not created (it may already exist): {e}")

# Get index ARN
index_response = s3vectors_client.list_indexes(vectorBucketName=vector_store_name)
vector_index_arn = index_response["indexes"][0]["indexArn"]
print(f"   ARN: {vector_index_arn}")

## Step 2.6: Create Knowledge Base and Ingest Data

### Knowledge Base Orchestration

<div style="text-align:left">
    <img src="architecture_images/image_3.png" width="80%"/>
</div>

Now we create the Bedrock Knowledge Base that orchestrates the entire RAG pipeline.

**Knowledge Base Flow**:
1. **Document Ingestion**: Reads documents from Amazon S3
2. **Text Extraction**: Parses document content
3. **Chunking**: Splits documents into optimal segments (300 tokens, 20% overlap)
4. **Embedding**: Generates vectors using Titan V2
5. **Indexing**: Stores vectors in S3 Vectors

**Configuration Details**:
- **Type**: VECTOR (for semantic search)
- **Embedding Model**: Titan Embeddings V2
- **Storage**: S3 Vectors (serverless)
- **Dimensions**: 1024
- **Data Type**: FLOAT32

### Chunking Strategy

**Fixed-Size Chunking**:
- **Max Tokens**: 300 per chunk
- **Overlap**: 20% between consecutive chunks
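
The effect of these two settings can be sketched with a small sliding-window function. This is an approximation for intuition only: it treats list items as tokens, whereas Bedrock uses its own tokenizer and boundary handling, and the `chunk_tokens` helper is hypothetical.

```python
def chunk_tokens(tokens, max_tokens: int = 300, overlap_percentage: int = 20):
    """Split a token list into fixed-size chunks that share an overlapping tail."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    if not 0 <= overlap_percentage < 100:
        raise ValueError("overlap_percentage must be in [0, 100)")

    overlap = max_tokens * overlap_percentage // 100  # 60 tokens for 300 at 20%
    step = max_tokens - overlap                       # each chunk starts 240 tokens later

    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_tokens])
        if start + max_tokens >= len(tokens):
            break  # the current chunk already reaches the end of the input
    return chunks


# Integers stand in for tokens so the overlap is easy to inspect
example_chunks = chunk_tokens(list(range(700)))
print([len(chunk) for chunk in example_chunks])
```

With these settings, the last 60 tokens of one chunk reappear as the first 60 tokens of the next, so a sentence that straddles a boundary is still retrievable from at least one chunk.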

In [None]:
print("\n" + "=" * 70)
print("📚 CREATING KNOWLEDGE BASE")
print("=" * 70)

bedrock_agent_client = boto3.client("bedrock-agent", region_name=REGION)
KB_NAME = f"ReInventSessionsKB_{UNIQUE_ID}"

# Create Knowledge Base
print(f"\n📚 Creating Knowledge Base: {KB_NAME}")
kb_response = bedrock_agent_client.create_knowledge_base(
    name=KB_NAME,
    description="AWS re:Invent 2024 sessions knowledge base",
    roleArn=kb_role_arn,
    knowledgeBaseConfiguration={
        "type": "VECTOR",
        "vectorKnowledgeBaseConfiguration": {
            "embeddingModelArn": f"arn:aws:bedrock:{REGION}::foundation-model/{EMBEDDING_MODEL}",
            "embeddingModelConfiguration": {
                "bedrockEmbeddingModelConfiguration": {
                    "dimensions": 1024,
                    "embeddingDataType": "FLOAT32",
                }
            },
        },
    },
    storageConfiguration={
        "type": "S3_VECTORS",
        "s3VectorsConfiguration": {"indexArn": vector_index_arn},
    },
)

KB_ID = kb_response["knowledgeBase"]["knowledgeBaseId"]
print(f"✅ Knowledge Base created: {KB_ID}")

# Wait for KB to be ready
print("\n⏳ Waiting for Knowledge Base to be ACTIVE...")
while True:
    kb_status = bedrock_agent_client.get_knowledge_base(knowledgeBaseId=KB_ID)
    status = kb_status["knowledgeBase"]["status"]
    if status == "ACTIVE":
        print(f"✅ Knowledge Base is ACTIVE")
        break
    elif status == "FAILED":
        failures = kb_status["knowledgeBase"].get("failureReasons", [])
        # Stop here: later cells cannot work without an ACTIVE Knowledge Base
        raise RuntimeError(f"❌ Knowledge Base creation failed: {failures}")
    print(f"   Status: {status}", end="\r")
    time.sleep(5)

### Step 2.6.1: Create Data Source and Start Ingestion

The final step in Knowledge Base setup is creating a data source and ingesting our documents.

**Data Source Configuration**:
- **Type**: S3
- **Bucket**: Our previously created bucket
- **Chunking**: Fixed-size with 20% overlap
- **Automatic**: Bedrock handles parsing and embedding

**Ingestion Process**:
1. Scan S3 bucket (583 files)
2. Parse each text file
3. Chunk into 300-token segments
4. Generate embeddings (Titan V2)
5. Store in S3 Vectors index
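
Both the Knowledge Base wait and the ingestion wait poll a status until it reaches a terminal value. A reusable helper with a timeout is one way to structure that pattern; the sketch below is an assumption about how it could be written, not code from this workshop. The status fetcher is passed in as a callable so the helper can be exercised without AWS access.

```python
import time
from typing import Callable, Iterable


def wait_for_status(
    get_status: Callable[[], str],
    success: str = "COMPLETE",
    failure: Iterable[str] = ("FAILED",),
    interval_seconds: float = 15.0,
    timeout_seconds: float = 1800.0,
) -> str:
    """Poll get_status() until it returns success, a failure value, or time runs out."""
    failure_states = set(failure)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status == success:
            return status
        if status in failure_states:
            raise RuntimeError(f"Operation ended with status {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout_seconds} seconds")
        time.sleep(interval_seconds)


# Simulated status sequence standing in for repeated get_ingestion_job calls
fake_statuses = iter(["STARTING", "IN_PROGRESS", "COMPLETE"])
final_status = wait_for_status(lambda: next(fake_statuses), interval_seconds=0)
print(final_status)
```

In the notebook, `get_status` could wrap the `get_ingestion_job` call and return `job_status["ingestionJob"]["status"]`.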

In [None]:
print("\n" + "=" * 70)
print("📁 CREATING DATA SOURCE AND INGESTING DATA")
print("=" * 70)

# Create Data Source
print(f"\n📁 Creating data source...")
ds_response = bedrock_agent_client.create_data_source(
    knowledgeBaseId=KB_ID,
    name=f"ReInventSessions_DataSource_{UNIQUE_ID}",
    description="S3 data source with re:Invent session documents",
    dataSourceConfiguration={
        "type": "S3",
        "s3Configuration": {"bucketArn": f"arn:aws:s3:::{BUCKET_NAME}"},
    },
    vectorIngestionConfiguration={
        "chunkingConfiguration": {
            "chunkingStrategy": "FIXED_SIZE",
            "fixedSizeChunkingConfiguration": {
                "maxTokens": 300,
                "overlapPercentage": 20,
            },
        }
    },
)

data_source_id = ds_response["dataSource"]["dataSourceId"]
print(f"✅ Data source created: {data_source_id}")

# Start ingestion
print("\n🔄 Starting data ingestion...")
ingestion_response = bedrock_agent_client.start_ingestion_job(
    knowledgeBaseId=KB_ID, dataSourceId=data_source_id
)

ingestion_job_id = ingestion_response["ingestionJob"]["ingestionJobId"]
print(f"   Ingestion job started: {ingestion_job_id}")

# Wait for ingestion
print("⏳ Waiting for ingestion to complete (this may take a few minutes)...")
while True:
    job_status = bedrock_agent_client.get_ingestion_job(
        knowledgeBaseId=KB_ID,
        dataSourceId=data_source_id,
        ingestionJobId=ingestion_job_id,
    )
    status = job_status["ingestionJob"]["status"]
    if status == "COMPLETE":
        stats = job_status["ingestionJob"].get("statistics", {})
        print(f"\n✅ Ingestion complete!")
        print(f"   Documents processed: {stats.get('numberOfDocumentsScanned', 0)}")
        break
    elif status == "FAILED":
        failures = job_status["ingestionJob"].get("failureReasons", [])
        # Stop here: retrieval in later cells would return nothing useful
        raise RuntimeError(f"❌ Ingestion failed: {failures}")
    print(f"   Status: {status}", end="\r")
    time.sleep(15)

## Step 2.7: Test Knowledge Base Retrieval

Let's test the Knowledge Base retrieval to ensure it's working correctly.

### Retrieval API

The `retrieve` API:
- **Embeds the query**: Converts text to vector using same model (Titan V2)
- **Searches vectors**: Finds most similar chunks using cosine similarity
- **Returns results**: Top K chunks with relevance scores
- **Includes metadata**: Source documents and locations
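
The shape of a `retrieve` response can be illustrated with a hand-written example. The sketch below walks a mock response and pulls out the score, source URI, and a text snippet for each result; the sample values and the `summarize_results` helper are invented, and only the field names (`retrievalResults`, `content.text`, `location.s3Location.uri`, `score`) reflect the API response structure.

```python
def summarize_results(response: dict, snippet_chars: int = 80) -> list:
    """Flatten retrieval results into rank, score, source URI, and a short snippet."""
    summaries = []
    for rank, item in enumerate(response.get("retrievalResults", []), start=1):
        text = item.get("content", {}).get("text", "")
        uri = item.get("location", {}).get("s3Location", {}).get("uri", "")
        summaries.append(
            {
                "rank": rank,
                "score": item.get("score", 0.0),
                "source": uri,
                "snippet": text[:snippet_chars],
            }
        )
    return summaries


# Mock response with made-up values, shaped like a retrieve() result
mock_response = {
    "retrievalResults": [
        {
            "content": {"text": "Session Code: SEC000\nTitle: Example security session"},
            "location": {
                "type": "S3",
                "s3Location": {"uri": "s3://example-bucket/sessions/SEC000.txt"},
            },
            "score": 0.71,
        },
        {
            "content": {"text": "Session Code: AIM000\nTitle: Example AI session"},
            "location": {
                "type": "S3",
                "s3Location": {"uri": "s3://example-bucket/sessions/AIM000.txt"},
            },
            "score": 0.64,
        },
    ]
}

for row in summarize_results(mock_response):
    print(row["rank"], row["score"], row["source"])
```

Keeping the source URI alongside each chunk makes it possible to cite which session document an answer came from.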

In [None]:
print("\n" + "=" * 70)
print("🧪 TESTING KNOWLEDGE BASE RETRIEVAL")
print("=" * 70)

bedrock_agent_runtime = boto3.client("bedrock-agent-runtime", region_name=REGION)

# Test query
test_query = "What sessions are about generative AI and security?"
print(f"\n🔍 Query: {test_query}")

response = bedrock_agent_runtime.retrieve(
    knowledgeBaseId=KB_ID,
    retrievalQuery={"text": test_query},
    retrievalConfiguration={"vectorSearchConfiguration": {"numberOfResults": 3}},
)

print(f"\n✅ Found {len(response['retrievalResults'])} results:\n")
for idx, result in enumerate(response["retrievalResults"], 1):
    content = result["content"]["text"]
    score = result["score"]
    print(f"{idx}. Score: {score:.3f}")
    print(f"   {content[:200]}...\n")

## Step 2.8: Create Knowledge Base Search Tool

Now we'll create a Strands tool that allows our agent to search the Knowledge Base.

### What are Strands Tools?

Tools are Python functions that agents can call. The `@tool` decorator builds a tool specification from the function: type hints define the parameter schema, and the docstring supplies the natural-language description. The agent automatically decides when to invoke tools based on the conversation context.

### Tool Design

Our `search_reinvent_sessions` tool takes a query string and optional max_results parameter (default 5). It calls the Bedrock Knowledge Base retrieve API, extracts relevant chunks with similarity scores, and formats them as a list of dictionaries containing rank, content, and score for the agent to use.

The agent automatically calls this tool when users ask about re:Invent sessions, request recommendations, or need specific technical information related to session keywords.
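
To show how type hints and a docstring can become a machine-readable description, here is a small sketch built on the standard `inspect` module. It illustrates the general idea only and is not how Strands implements `@tool`; `describe_function` and `find_sessions` are hypothetical names.

```python
import inspect
from typing import get_type_hints

# Minimal mapping from Python types to JSON Schema type names
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe_function(fn) -> dict:
    """Build a simple schema-like description from a function's signature and docstring."""
    hints = get_type_hints(fn)
    properties = {}
    required = []
    for name, param in inspect.signature(fn).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value means the caller must supply it
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {"type": "object", "properties": properties, "required": required},
    }


def find_sessions(query: str, max_results: int = 5) -> list:
    """Find sessions matching a query."""
    return []


spec = describe_function(find_sessions)
print(spec)
```

Because the model only sees the generated description, a precise docstring and accurate type hints directly affect how reliably the agent picks and calls the tool.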


In [None]:
print("\n" + "=" * 70)
print("🔧 CREATING KNOWLEDGE BASE SEARCH TOOL")
print("=" * 70)

from strands import tool
from typing import List, Dict, Any


@tool
def search_reinvent_sessions(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """
    Search AWS re:Invent sessions from a knowledge base using semantic search.

    Use this tool when users ask for recommendations about AWS re:Invent sessions,
    or when you want to find relevant sessions based on their interests.

    Args:
        query: The search query describing what kind of sessions to find
        max_results: Maximum number of results to return (default: 5, max: 10)

    Returns:
        List of relevant re:Invent sessions with their details
    """
    try:
        logger.info(f"🔍 Searching KB with query: {query}")

        # Call the Knowledge Base retrieve API
        response = bedrock_agent_runtime.retrieve(
            knowledgeBaseId=KB_ID,
            retrievalQuery={"text": query},
            retrievalConfiguration={
                "vectorSearchConfiguration": {"numberOfResults": min(max_results, 10)}
            },
        )

        # Extract and format results
        results = []
        for idx, item in enumerate(response.get("retrievalResults", []), 1):
            result = {
                "rank": idx,
                "content": item.get("content", {}).get("text", ""),
                "score": item.get("score", 0.0),
            }
            results.append(result)

        logger.info(f"✅ Found {len(results)} re:Invent sessions")
        return results

    except Exception as e:
        logger.error(f"❌ Error searching KB: {e}")
        return [{"error": f"Failed to search knowledge base: {str(e)}"}]


print("\n✅ Knowledge Base search tool created")

---

# Chapter 3: The Memory

## Overview

In this chapter, we'll add **short-term memory** and **long-term memory** to our agent using AgentCore Memory.

### What is AgentCore Memory?

AgentCore Memory is a fully-managed service that provides persistent storage for agent memories across sessions. 

**Key Capabilities**:
- **Multi-Strategy**: Supports UserPreference, Semantic, and Episodic memory
- **Actor-Scoped**: Memories isolated per user (actor)
- **Automatic Processing**: Background analysis and extraction
- **Namespace Organization**: Logical separation of memory types
- **Cross-Session**: Persists beyond conversation history

### Memory Architecture

<div style="text-align:left">
    <img src="architecture_images/image_5.png" width="80%"/>
</div>

### Memory Strategies

**UserPreferenceMemoryStrategy** (What we'll use):
- **Purpose**: Automatically learn user preferences from conversations
- **Storage**: `/users/{actorId}/preferences` namespace
- **Content**: Communication style, interests, background, preferences
- **Processing**: LLM-based analysis of conversation context
- **Automatic**: No explicit commands needed

**How It Works**:
```
User: "I'm a Python developer interested in AI"
  ↓
Agent responds naturally
  ↓
UserPreference Strategy (background):
  - Analyzes conversation
  - Extracts: "User is a Python developer"
  - Extracts: "User is interested in AI"
  - Stores in /users/{actorId}/preferences
  ↓
Next conversation:
  - Agent loads preferences
  - Personalizes responses
  - Recommends Python/AI content
```
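
The `/users/{actorId}/preferences` path in the flow above is a template that is resolved per user. The sketch below imitates that isolation with an in-memory dictionary; it is a toy model for intuition, not the AgentCore Memory API, and the class and function names are hypothetical.

```python
from collections import defaultdict

NAMESPACE_TEMPLATE = "/users/{actorId}/preferences"


def resolve_namespace(template: str, actor_id: str) -> str:
    """Substitute the actor ID into the namespace template."""
    return template.replace("{actorId}", actor_id)


class InMemoryPreferenceStore:
    """Toy store that keeps each actor's preferences under its own namespace."""

    def __init__(self) -> None:
        self._records = defaultdict(list)

    def add(self, actor_id: str, preference: str) -> None:
        namespace = resolve_namespace(NAMESPACE_TEMPLATE, actor_id)
        self._records[namespace].append(preference)

    def get_preferences(self, actor_id: str) -> list:
        namespace = resolve_namespace(NAMESPACE_TEMPLATE, actor_id)
        return list(self._records.get(namespace, []))


store = InMemoryPreferenceStore()
store.add("alice", "User is a Python developer")
store.add("alice", "User is interested in AI")
print(store.get_preferences("alice"))
print(store.get_preferences("bob"))
```

Because every read and write goes through the resolved namespace, one actor's preferences never appear in another actor's results.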

> 💡 **Key Insight**: UserPreference learning happens automatically without explicit "remember" commands, making interactions feel natural.

## Step 3.1: Create AgentCore Memory

Let's create the memory resource with the UserPreference strategy.

### Memory Configuration

**Name**: `EventAgentMemory_{UNIQUE_ID}`
- Unique identifier for this memory resource
- Allows multiple memory resources per account

**Strategies**: UserPreference
- Can add more strategies later (Semantic, Episodic, etc.)
- Each strategy has its own namespace

**Namespaces**: `/users/{actorId}/preferences`
- `{actorId}` is dynamically replaced with user ID
- Ensures user data isolation
- Enables multi-tenant applications

**Event Expiry**: 30 days
- How long to keep raw conversation events
- Extracted preferences persist longer
- Adjust based on your data retention needs

> ⏱️ **Creation Time**: Memory resource creation takes 2-3 minutes as AWS provisions the infrastructure.

In [None]:
print("\n" + "=" * 70)
print("🧠 CREATING AGENTCORE MEMORY")
print("=" * 70)

from bedrock_agentcore.memory.constants import StrategyType
from bedrock_agentcore_starter_toolkit.operations.memory.manager import MemoryManager
from bedrock_agentcore.memory.session import MemorySessionManager

# Initialize Memory Manager
memory_manager = MemoryManager(region_name=REGION)

# Create memory with UserPreference strategy
print("\n🧠 Creating memory with UserPreference strategy...")
print("   This takes 2-3 minutes...\n")

memory = memory_manager.get_or_create_memory(
    name=f"EventAgentMemory_{UNIQUE_ID}",
    strategies=[
        {
            StrategyType.USER_PREFERENCE.value: {
                "name": "UserPreferences",
                "namespaces": ["/users/{actorId}/preferences"],
                "description": "Captures customer preferences and behavior",
            }
        }
    ],
    description="Memory for Event Agent with user preferences",
    event_expiry_days=30,
)

MEMORY_ID = memory.get("id")
print(f"\n✅ Memory created successfully!")
print(f"   Memory ID: {MEMORY_ID}")
print(f"   Status: {memory.get('status')}")
print(f"   Namespace: /users/{{actorId}}/preferences")

## Step 3.2: Create Memory Hook Provider

### Understanding Strands Hooks

<div style="text-align:left">
    <img src="architecture_images/image_6.png" width="80%"/>
</div>

Hooks are lifecycle events in the Strands framework that allow you to inject custom logic:

**Available Hook Events:**
- **AgentInitializedEvent**: When agent starts (load context)
- **MessageAddedEvent**: When message is added (save to memory)
- **ToolExecutedEvent**: After tool runs (log results)
- **RequestEnd**: When cycle completes

<div style="text-align:left">
    <img src="architecture_images/image_7.png" width="80%"/>
</div>
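
The callback pattern behind hooks can be sketched in a few lines. The registry below is a toy stand-in written for this explanation: it borrows the `add_callback` method name used later in this chapter, but the event classes and the `dispatch` method are hypothetical and do not reproduce the Strands implementation.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class AgentStarted:
    """Toy event fired once when an agent is created."""
    agent_name: str


@dataclass
class MessageAdded:
    """Toy event fired whenever a message joins the conversation."""
    role: str
    text: str


class ToyHookRegistry:
    """Maps event types to the callbacks registered for them."""

    def __init__(self) -> None:
        self._callbacks = defaultdict(list)

    def add_callback(self, event_type, callback) -> None:
        self._callbacks[event_type].append(callback)

    def dispatch(self, event) -> None:
        # Call every callback registered for this exact event type, in order
        for callback in self._callbacks.get(type(event), []):
            callback(event)


log = []
registry = ToyHookRegistry()
registry.add_callback(AgentStarted, lambda e: log.append(f"load context for {e.agent_name}"))
registry.add_callback(MessageAdded, lambda e: log.append(f"save {e.role} message"))

registry.dispatch(AgentStarted("event-agent"))
registry.dispatch(MessageAdded("user", "Hello"))
print(log)
```

A hook provider is then simply an object that registers a group of related callbacks in one place, which is the role `MemoryHookProvider` plays in the next cell.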

In [None]:
print("\n" + "=" * 70)
print("🔗 CREATING MEMORY HOOK PROVIDER")
print("=" * 70)

from bedrock_agentcore.memory.constants import ConversationalMessage, MessageRole
from strands.hooks import (
    HookProvider,
    HookRegistry,
    AgentInitializedEvent,
    MessageAddedEvent,
)


class MemoryHookProvider(HookProvider):
    """Custom hook provider to integrate with AgentCore Memory"""

    def __init__(self):
        logger.info(f"Initializing MemoryHookProvider")
        self.memory_session_manager = MemorySessionManager(MEMORY_ID, REGION)

    def on_agent_initialized(self, event: AgentInitializedEvent):
        """Load user preferences when agent starts"""
        logger.info("🎯 Agent initialization - loading preferences")

        actor_id = event.agent.state.get("actor_id")

        if not actor_id:
            logger.warning("Missing actor_id")
            return

        try:
            # Retrieve user preferences from LTM
            preferences = self.memory_session_manager.search_long_term_memories(
                namespace_prefix=f"/users/{actor_id}/preferences",
                query="What are the user's preferences, interests, and background?",
                top_k=5,
            )

            if preferences:
                logger.info(f"✅ Loaded {len(preferences)} preferences")
                pref_messages = []
                for pref in preferences:
                    pref_text = pref.get("content", {}).get("text", "")
                    if pref_text:
                        try:
                            pref_json = json.loads(pref_text)
                            pref_messages.append(
                                f"- {pref_json.get('preference', pref_text)}"
                            )
                        except (json.JSONDecodeError, AttributeError):
                            # Not JSON, or JSON that is not an object: use the raw text
                            pref_messages.append(f"- {pref_text}")

                if pref_messages:
                    context = "\n".join(pref_messages)
                    event.agent.system_prompt += f" **User Preferences:** {context}"
                    logger.info("✅ Added preferences to system prompt")
            else:
                logger.info("ℹ️ No preferences found yet")

        except Exception as e:
            logger.error(f"❌ Error loading preferences: {e}")

    def on_message_added(self, event: MessageAddedEvent):
        """Store messages in memory"""
        logger.info("💬 Message added - storing in memory")

        actor_id = event.agent.state.get("actor_id")
        session_id = event.agent.state.get("session_id")

        if not all([actor_id, session_id]):
            logger.warning("Missing required state values")
            return

        try:
            messages = event.agent.messages
            last_message = messages[-1]
            message_content = str(last_message.get("content", ""))
            if last_message["role"] == "user":
                message_role = MessageRole.USER
            elif last_message["role"] == "assistant":
                message_role = MessageRole.ASSISTANT
            else:
                # No MessageRole mapping for this role, so skip it
                logger.warning(f"Skipping message with role: {last_message['role']}")
                return

            self.memory_session_manager.add_turns(
                actor_id=actor_id,
                session_id=session_id,
                messages=[ConversationalMessage(message_content, message_role)],
            )
            logger.info("✅ Message stored")

        except Exception as e:
            logger.error(f"❌ Error storing message: {e}")

    def register_hooks(self, registry: HookRegistry):
        """Register hooks with the agent"""
        registry.add_callback(AgentInitializedEvent, self.on_agent_initialized)
        registry.add_callback(MessageAddedEvent, self.on_message_added)


print("\n✅ MemoryHookProvider class defined")

## Step 3.3: Create Agent with Memory and RAG

Now we'll create an agent that combines memory hooks and RAG tools.

### Agent Configuration

**Components**:
- **Model**: Claude Haiku 4.5 via BedrockModel
- **Hooks**: MemoryHookProvider for automatic memory management
- **Tools**: search_reinvent_sessions for Knowledge Base access
- **System Prompt**: Instructions for behavior
- **State**: actor_id and session_id for memory scoping

### State Management

The agent state carries two values, which the hooks use to load and save the correct memories:
- **actor_id**: User identifier (supplied by Cognito in Chapter 5)
- **session_id**: Conversation identifier (supplied by the Runtime in Chapter 4)

### Expected Behavior

When this agent runs:
1. **Initialization**: Loads any existing preferences
2. **User Message**: Stored in memory
3. **Tool Use**: Searches KB when needed
4. **Agent Response**: Stored in memory for history
5. **Background**: The UserPreference strategy analyzes the conversation to extract preferences

> 💡 **First Run**: On the first conversation, preferences will be empty. They'll be learned from this interaction.
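
To make the ordering above concrete, here is a minimal, self-contained sketch of the callback pattern the hooks rely on. `MiniRegistry`, `Initialized`, and `MessageAdded` are hypothetical stand-ins written for this illustration; they are not the Strands classes, and the real `HookRegistry` may dispatch events differently.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Initialized:
    """Stand-in for an 'agent initialized' event (hypothetical)."""
    actor_id: str


@dataclass
class MessageAdded:
    """Stand-in for a 'message added' event (hypothetical)."""
    role: str
    text: str


class MiniRegistry:
    """Toy registry: maps an event type to the callbacks registered for it."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def add_callback(self, event_type, callback):
        self._callbacks[event_type].append(callback)

    def emit(self, event):
        # Call every callback registered for this exact event type
        for callback in self._callbacks[type(event)]:
            callback(event)


log = []
registry = MiniRegistry()
registry.add_callback(Initialized, lambda e: log.append(f"load preferences for {e.actor_id}"))
registry.add_callback(MessageAdded, lambda e: log.append(f"store {e.role} message"))

# Simulate one turn: initialization, user message, assistant reply
registry.emit(Initialized(actor_id="user-123"))
registry.emit(MessageAdded(role="user", text="Hi"))
registry.emit(MessageAdded(role="assistant", text="Hello!"))

print(log)
```

The point of the pattern is that the agent code never calls memory functions directly; it only emits events, and the provider decides what to do with them.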

In [None]:
print("\n" + "=" * 70)
print("🤖 CREATING AGENT WITH MEMORY AND RAG")
print("=" * 70)

# Generate test IDs
ACTOR_ID = f"user-{UNIQUE_ID}"
session_id_1 = f"session-1-{str(uuid.uuid4())[:8]}"

print(f"\n📋 Configuration:")
print(f"   Actor ID: {ACTOR_ID}")
print(f"   Session ID: {session_id_1}")
print(f"   Memory ID: {MEMORY_ID}")

# Create memory hook
memory_hook = MemoryHookProvider()

# Create agent with memory and RAG
memory_rag_agent = Agent(
    model=model,
    hooks=[memory_hook],
    tools=[search_reinvent_sessions],
    system_prompt="""You're an intelligent event assistant. You can retrieve session information using your tools.""",
    state={"actor_id": ACTOR_ID, "session_id": session_id_1},
)

print("\n✅ Memory-enabled RAG agent created")

## Step 3.4: Test Memory - Session 1

Let's test the memory system by sharing personal information.

In [None]:
print("=" * 70)
print("🧪 TEST 1: Background Learning (UserPreference Memory)")
print("=" * 70)
print("\n📝 Sharing personal information...")
print("   Expected: Agent responds naturally, background learning occurs\n")

response = memory_rag_agent(
    """Hi! Let me tell you about myself:
    - My name is Alice
    - I work as a software engineer at TechCorp
    - My favorite programming language is Python
    - I'm interested in cloud architecture, AI, and security
    - I prefer hands-on technical sessions over high-level overviews
    """
)

print("\n✅ Message stored in short-term memory")
print("⏳ UserPreference strategy will analyze this in the background...")

## Step 3.5: Verify Long-Term Memory Extraction

Let's wait for the UserPreference strategy to process the conversation and check what preferences were extracted.

### Background Processing

The long-term memory UserPreference strategy works in five stages:
1. **Monitors**: Watches for new conversation turns
2. **Analyzes**: Uses LLM to extract preferences
3. **Structures**: Formats as JSON with categories
4. **Stores**: Saves to long-term memory namespace
5. **Indexes**: Makes searchable for future retrieval

### Verification Method

We'll use the `list_long_term_memory_records` API to:
- Query the user's preference namespace
- Retrieve all stored preferences
- Display the extracted JSON structure

> ⏱️ **Wait Time**: We wait 40 seconds to give processing time to complete. Extraction is asynchronous, so it may occasionally need longer.

In [None]:
print("\n⏳ Waiting for UserPreference strategy to process (40 seconds)...")
time.sleep(40)

# Check for extracted preferences
print("\n🔍 Checking for extracted preferences...")
memory_session_manager = MemorySessionManager(MEMORY_ID, REGION)
try:
    response = memory_session_manager.list_long_term_memory_records(
        namespace_prefix=f"/users/{ACTOR_ID}/preferences", max_results=10
    )

    if response:
        print(f"\n✅ Found {len(response)} preferences!\n")
        for idx, pref in enumerate(response, 1):
            content = pref.get("content", {}).get("text", "")
            print(f"{idx}. {content}…")
    else:
        print("ℹ️ No preferences extracted yet (may need more time)")

except Exception as e:
    print(f"⚠️ Could not retrieve preferences: {e}")

## Step 3.6: Test Cross-Session Memory

Now for the crucial test: can the agent remember preferences in a completely new session?

### What Makes This Test Important

**New Session = No Chat History**:
- Different session_id
- No messages array
- Clean slate for conversation
- But same actor_id (same user)

**Memory Persistence**:
- Preferences stored in LTM (not chat history)
- Loaded via `on_agent_initialized` hook
- Injected into system prompt
- Agent "remembers" without explicit history
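
The injection step can be sketched in isolation. The function below mirrors the logic in `on_agent_initialized`, but `build_preference_block` is a hypothetical helper name and the sample records are invented for illustration; the record shape (`content.text`, optionally JSON with a `preference` key) is taken from the hook code above.

```python
import json


def build_preference_block(records):
    """Turn memory records into a Markdown block for the system prompt."""
    lines = []
    for record in records:
        text = record.get("content", {}).get("text", "")
        if not text:
            continue
        try:
            parsed = json.loads(text)
            # Prefer the structured 'preference' field when present
            lines.append(f"- {parsed.get('preference', text)}")
        except (json.JSONDecodeError, AttributeError):
            # Plain text, or JSON that is not an object: use it as-is
            lines.append(f"- {text}")
    if not lines:
        return ""
    return "\n\n**User Preferences:**\n" + "\n".join(lines)


# Sample records invented for illustration
sample_records = [
    {"content": {"text": json.dumps({"preference": "Prefers hands-on technical sessions"})}},
    {"content": {"text": "Interested in security"}},
    {"content": {}},
]

base_prompt = "You're an intelligent assistant."
prompt = base_prompt + build_preference_block(sample_records)
print(prompt)
```

Because the preferences live in the system prompt rather than in the message list, a new session with an empty history still starts with the user's context.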

In [None]:
print("\n" + "=" * 70)
print("🧪 SESSION 2: New Session, Same User")
print("=" * 70)

# Create NEW session
session_id_2 = f"session-2-{str(uuid.uuid4())[:8]}"
print(f"\n🔄 Creating NEW agent with NEW session ID: {session_id_2}\n")

# Create completely new agent instance
new_memory_hook = MemoryHookProvider()
new_memory_rag_agent = Agent(
    model=model,
    hooks=[new_memory_hook],
    tools=[search_reinvent_sessions],
    system_prompt="""You're an intelligent assistant. You can retrieve session information using your tools. When recommending sessions use the top 3.
    """,
    state={
        "actor_id": ACTOR_ID,  # Same user
        "session_id": session_id_2,  # Different session
    },
)

print("✅ New agent created (will load preferences from LTM)\n")

# Ask about previous information
print("📝 Asking about information from previous session...\n")

response = new_memory_rag_agent("Which session should I attend at re:Invent?")
display(Markdown(f"**Agent:** {response.message['content'][0]['text']}"))

print("\n✅ Agent retrieved preferences from long-term memory!")
print("   This is NOT from chat history - it's from the UserPreference strategy")

print("\n" + "=" * 70)
print("✅ CHAPTER 3 COMPLETE: Memory is working!")
print("=" * 70)

---

# Chapter 4: Runtime

## Overview

In this chapter, we'll deploy our agent to **AgentCore Runtime** for secure, scalable execution.

<div style="text-align:left">
    <img src="architecture_images/image_8.png" width="80%"/>
</div>

### Why AgentCore Runtime?

**Production Benefits**:
- **Isolation**: Each session runs in isolated microVM
- **Scalability**: Automatic scaling based on load
- **Security**: Controlled execution environment
- **Production-ready**: Built for enterprise workloads
- **Managed Infrastructure**: No servers to maintain
- **Built-in Monitoring**: CloudWatch logs and X-Ray traces

### Deployment Process

<div style="text-align:left">
    <img src="architecture_images/image_9.png" width="80%"/>
</div>

**Steps We'll Follow**:
1. Create agent Python file for deployment
2. Create IAM execution role with comprehensive permissions
3. Configure runtime deployment settings
4. Launch to cloud (CodeBuild creates container)
5. Test deployed agent

## Step 4.1: Create Agent File for Runtime

We need to create a standalone Python file that can run in the AgentCore Runtime environment.

### Key Differences from the Local Agent

**Runtime Requirements**:
- Must be a standalone Python file
- Uses `@app.entrypoint` decorator
- Receives payload and context objects
- Returns string response
- Handles environment variables

### Runtime Context Object

The `context` parameter provides:
- **session_id**: Unique identifier for conversation
- **request_headers**: HTTP headers (for authentication)
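
As a rough sketch of how a handler reads these fields, the example below uses a `SimpleNamespace` as a stand-in for the real context object. The `handle` function and the fallback for missing headers are assumptions made for illustration; the actual context class in the Runtime SDK may expose more attributes or behave differently.

```python
from types import SimpleNamespace


def handle(payload, context):
    """Toy handler showing which fields are read from payload and context."""
    prompt = payload.get("prompt")
    if not prompt:
        return "Error: Missing 'prompt' field"
    # Assumption: request_headers may be None when no headers are forwarded
    headers = getattr(context, "request_headers", None) or {}
    has_auth = "Authorization" in headers
    return f"session={context.session_id} authenticated={has_auth} prompt={prompt}"


# Stand-in for the context object the Runtime would pass in
fake_context = SimpleNamespace(session_id="session-abc", request_headers=None)
print(handle({"prompt": "Hello"}, fake_context))
print(handle({}, fake_context))
```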

### Environment Variables

The agent needs these injected:
- `MODEL_ID`: LLM model to use
- `MEMORY_ID`: Memory resource identifier
- `KB_ID`: Knowledge Base identifier
- `AWS_REGION`: AWS region
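
Since a missing variable would otherwise surface only as a confusing error on the first request, one option is to validate all four at startup. This is a minimal sketch; `load_config` and `REQUIRED_VARS` are hypothetical names, and the example values are placeholders rather than real resource identifiers.

```python
import os

REQUIRED_VARS = ("MODEL_ID", "MEMORY_ID", "KB_ID", "AWS_REGION")


def load_config(env=os.environ):
    """Return the required settings, or raise listing every missing variable."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


# Example with an explicit mapping instead of the real environment
example_env = {
    "MODEL_ID": "example-model-id",
    "MEMORY_ID": "example-memory-id",
    "KB_ID": "example-kb-id",
    "AWS_REGION": "us-west-2",
}
config = load_config(example_env)
print(config["AWS_REGION"])
```

Reporting every missing name at once avoids a fix-one-redeploy-repeat cycle, which matters when each deployment takes a few minutes.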

In [None]:
%%writefile event_agent.py
import os
import json
import boto3
import logging
from strands import Agent, tool
from typing import Dict, Any, List
from strands.models import BedrockModel
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from bedrock_agentcore.memory.session import MemorySessionManager
from strands.hooks import AgentInitializedEvent, HookProvider, HookRegistry, MessageAddedEvent
from bedrock_agentcore.memory.constants import StrategyType, ConversationalMessage, MessageRole

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger("event-agent")

# Initialize the agent core app
app = BedrockAgentCoreApp()

# Environment variables
MODEL_ID = os.getenv('MODEL_ID')
MEMORY_ID = os.getenv('MEMORY_ID')
REGION = os.getenv('AWS_REGION')
KB_ID = os.getenv('KB_ID')

# Global instances
agent = None
bedrock_agent_runtime = None

class MemoryHookProvider(HookProvider):
    """Custom hook provider to integrate with AgentCore Memory"""
    
    def __init__(self):
        logger.info(f"Initializing MemoryHookProvider")
        self.memory_session_manager = MemorySessionManager(MEMORY_ID, REGION)
    
    def on_agent_initialized(self, event: AgentInitializedEvent):
        """Load user preferences when agent starts"""
        logger.info("🎯 Agent initialization - loading preferences")
        
        actor_id = event.agent.state.get("actor_id")
        
        if not actor_id:
            logger.warning("Missing actor_id")
            return
        
        try:
            # Retrieve user preferences from LTM
            preferences = self.memory_session_manager.search_long_term_memories(
                namespace_prefix=f"/users/{actor_id}/preferences",
                query="What are the user's preferences, interests, and background?",
                top_k=5
            )
            
            if preferences:
                logger.info(f"✅ Loaded {len(preferences)} preferences")
                pref_messages = []
                for pref in preferences:
                    pref_text = pref.get('content', {}).get('text', '')
                    if pref_text:
                        try:
                            pref_json = json.loads(pref_text)
                            pref_messages.append(f"- {pref_json.get('preference', pref_text)}")
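                        except (json.JSONDecodeError, AttributeError):
                            # Not JSON, or JSON that is not an object: keep the raw text
                            pref_messages.append(f"- {pref_text}")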
                
                if pref_messages:
                    context = "\n".join(pref_messages)
                    event.agent.system_prompt += f"\n\n**User Preferences:**\n{context}"
                    logger.info("✅ Added preferences to system prompt")
            else:
                logger.info("ℹ️ No preferences found yet")

        except Exception as e:
            logger.error(f"❌ Error loading preferences: {e}")
    
    def on_message_added(self, event: MessageAddedEvent):
        """Store messages in memory"""
        logger.info("💬 Message added - storing in memory")
        
        actor_id = event.agent.state.get("actor_id")
        session_id = event.agent.state.get("session_id")
        
        if not all([actor_id, session_id]):
            logger.warning("Missing required state values")
            return
        
        try:
            messages = event.agent.messages
            last_message = messages[-1]
            message_content = str(last_message.get("content", ""))
            if last_message["role"] == "user":
                message_role = MessageRole.USER
            elif last_message["role"] == "assistant":
                message_role = MessageRole.ASSISTANT
            else:
                # No MessageRole mapping for this role, so skip it
                logger.warning(f"Skipping message with role: {last_message['role']}")
                return

            self.memory_session_manager.add_turns(
                actor_id=actor_id,
                session_id=session_id,
                messages=[ConversationalMessage(message_content, message_role)]
            )
            logger.info("✅ Message stored")

        except Exception as e:
            logger.error(f"❌ Error storing message: {e}")
    
    def register_hooks(self, registry: HookRegistry):
        """Register hooks with the agent"""
        registry.add_callback(AgentInitializedEvent, self.on_agent_initialized)
        registry.add_callback(MessageAddedEvent, self.on_message_added)

@tool
def search_reinvent_sessions(query: str) -> List[Dict[str, Any]]:
    """
    Search AWS re:Invent sessions from a knowledge base.
    
    Args:
        query: Search query for sessions
    
    Returns:
        List of relevant sessions
    """
    global bedrock_agent_runtime, KB_ID
    
    try:
        if not bedrock_agent_runtime:
            bedrock_agent_runtime = boto3.client('bedrock-agent-runtime', region_name=REGION)
        
        response = bedrock_agent_runtime.retrieve(
            knowledgeBaseId=KB_ID,
            retrievalQuery={'text': query},
            retrievalConfiguration={
                'vectorSearchConfiguration': {
                    'numberOfResults': 5
                }
            }
        )
        
        results = []
        for idx, item in enumerate(response.get('retrievalResults', []), 1):
            result = {
                'rank': idx,
                'content': item.get('content', {}).get('text', ''),
                'score': item.get('score', 0.0)
            }
            results.append(result)
        
        return results
        
    except Exception as e:
        logger.error(f"Error searching KB: {e}")
        return [{"error": str(e)}]

def initialize_agent(actor_id, session_id):
    """Initialize the agent"""
    global agent
    
    model = BedrockModel(model_id=MODEL_ID)
    memory_hook = MemoryHookProvider()
    
    agent = Agent(
        model=model,
        hooks=[memory_hook],
        tools=[search_reinvent_sessions],
        system_prompt="""You're an intelligent event assistant with long-term memory and knowledge retrieval.
        
        Use search_reinvent_sessions to find session information.
        If you get enough information in your first search don't do additional tool calls.
        Remember user preferences and provide personalized recommendations.
        """,
        state={
            "actor_id": actor_id,
            "session_id": session_id
        }
    )

@app.entrypoint
def runtime_agent(payload, context):
    """
    Main entry point for the runtime agent
    """
    global agent
    
    user_input = payload.get("prompt")
    actor_id = payload.get("actor_id")
    session_id = context.session_id
    
    if not user_input:
        return "Error: Missing 'prompt' field"
    
    # Initialize agent on first request
    if agent is None:
        initialize_agent(actor_id, session_id)
            
    # Invoke agent
    response = agent(user_input)
    return response.message['content'][0]['text']

if __name__ == "__main__":
    app.run()

## Step 4.2: Create Execution Role

The Runtime needs comprehensive IAM permissions to operate.

> 💡 **Helper Function**: We use a utility function to create the role with all needed permissions.

> ⏱️ **Propagation**: IAM changes are eventually consistent, so we wait about 30 seconds for the new role to propagate before using it.

In [None]:
print("\n" + "=" * 70)
print("🔐 CREATING EXECUTION ROLE FOR RUNTIME")
print("=" * 70)

agent_name = f"event_agent_{UNIQUE_ID}"
execution_role_arn = create_agentcore_role(agent_name=agent_name, region=REGION)

print(f"✅ Attached execution policy")

print("\n⏳ Waiting for role propagation (30 seconds)...")
time.sleep(30)

## Step 4.3: Configure and Launch Runtime

Now we'll configure the deployment and launch to Bedrock AgentCore Runtime.

### Configuration Parameters

- `entrypoint`: Python file with agent code ("event_agent.py")
- `execution_role`: IAM role ARN for permissions
- `region`: AWS region for deployment
- `agent_name`: Unique identifier
- `auto_create_ecr`: Automatically create container registry (True)
- `requirements_file`: Python dependencies ("requirements.txt")
- `non_interactive`: Prevent prompts in notebooks (True)
- `memory_mode`: "NO_MEMORY" (we created our own memory resource)

### Environment Variables

We inject configuration via environment variables:
- `MEMORY_ID`: Memory resource to use
- `KB_ID`: Knowledge Base to query
- `MODEL_ID`: LLM model identifier
- `AWS_REGION`: AWS region

> 💡 **Monitoring**: The toolkit provides real-time progress updates.
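
Waiting for a deployment is a polling problem, and a loop without a deadline can hang a notebook indefinitely. The sketch below shows one way to bound the wait and to raise on terminal failure states. `wait_for_status` is a hypothetical helper, the `READY` and `*_FAILED` names are the ones this notebook checks for, and the `CREATING` value in the simulation is an assumed intermediate status used only so the example runs without AWS.

```python
import time


def wait_for_status(get_status, ready="READY",
                    failed=("CREATE_FAILED", "DELETE_FAILED", "UPDATE_FAILED"),
                    timeout=600, interval=10):
    """Poll get_status() until it returns `ready`; raise on failure or timeout."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == ready:
            return status
        if status in failed:
            raise RuntimeError(f"Deployment failed: {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout} seconds")
        time.sleep(interval)


# Simulated status sequence so the example runs without AWS
statuses = iter(["CREATING", "CREATING", "READY"])
result = wait_for_status(lambda: next(statuses), interval=0)
print(result)
```

In this notebook, the status callable could be `lambda: agentcore_runtime.status().endpoint["status"]`, matching how the status is read in the launch cell.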

In [None]:
print("\n" + "=" * 70)
print("🚀 CONFIGURING AND LAUNCHING RUNTIME")
print("=" * 70)

from bedrock_agentcore_starter_toolkit import Runtime

agentcore_runtime = Runtime()

# Configure
print(f"\n⚙️ Configuring runtime agent: {agent_name}")
agentcore_runtime.configure(
    entrypoint="event_agent.py",
    execution_role=execution_role_arn["Role"]["Arn"],
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=REGION,
    agent_name=agent_name,
    non_interactive=True,
    memory_mode="NO_MEMORY",  # We're using our own custom memory
)
print("✅ Runtime configured")

# Launch
print("\n🚀 Launching runtime (this takes ~2-3 minutes)...")
agentcore_runtime.launch(
    env_vars={
        "MEMORY_ID": MEMORY_ID,
        "KB_ID": KB_ID,
        "MODEL_ID": MODEL_ID,
        "AWS_REGION": REGION,
    }
)
print("✅ Runtime launched")

# Wait for ready
print("\n⏳ Waiting for runtime to be READY...")
while True:
    status_response = agentcore_runtime.status()
    status = status_response.endpoint["status"]
    if status == "READY":
        print(f"\n✅ Runtime is READY!")
        break
    elif status in ["CREATE_FAILED", "DELETE_FAILED", "UPDATE_FAILED"]:
        print(f"\n❌ Runtime deployment failed: {status}")
        break
    print(f"   Status: {status}", end="\r")
    time.sleep(10)

## Step 4.4: Test Runtime Agent

Let's test our deployed agent to ensure it's working correctly.

### Session Management

The Runtime automatically:
- Generates `session_id` if not provided
- Maintains session context across requests
- Isolates sessions from each other
- Enables conversation continuity

> 💡 **Same actor_id**: We use the same actor_id from earlier so the agent can access the preferences we stored.
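
When you do supply your own `session_id`, its length matters. The sketch below assumes the Runtime enforces a minimum session ID length of 33 characters, which is consistent with the slice lengths used in this notebook's test cells; verify that number against the current API reference before relying on it. `make_session_id` and `MIN_SESSION_ID_LENGTH` are hypothetical names.

```python
import uuid

MIN_SESSION_ID_LENGTH = 33  # assumed minimum enforced by the Runtime


def make_session_id(prefix):
    """Build a session ID from a prefix plus a full UUID."""
    session_id = f"{prefix}-{uuid.uuid4()}"
    # A UUID string is 36 characters, so this is always long enough;
    # the check documents the requirement and guards future edits.
    if len(session_id) < MIN_SESSION_ID_LENGTH:
        raise ValueError(f"Session ID must be at least {MIN_SESSION_ID_LENGTH} characters")
    return session_id


session_id = make_session_id("runtime-test")
print(session_id, len(session_id))
```

Using the whole UUID instead of a hand-counted slice removes the need to keep the prefix length and slice length in sync.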

In [None]:
print("\n" + "=" * 70)
print("🧪 TESTING RUNTIME AGENT")
print("=" * 70)

# Test invocation
test_session_id = f"runtime-test-{str(uuid.uuid4())[:20]}"

print(f"\n📝 Invoking runtime agent...\n")
response = agentcore_runtime.invoke(
    {"prompt": "What sessions would you recommend?", "actor_id": ACTOR_ID},
    session_id=test_session_id,
)

response_text = "".join(response["response"]).strip('"')
response_text = response_text.replace(
    "\\n", "\n"
)  # Convert literal \n to actual newlines
display(Markdown(response_text))

print("\n✅ Runtime agent is working!")
print("\n" + "=" * 70)
print("✅ CHAPTER 4 COMPLETE: Agent deployed to Runtime!")
print("=" * 70)

---

# Chapter 5: Identity

## Overview

In this final chapter, we'll add **authentication** using AgentCore Identity and Amazon Cognito.

### Why AgentCore Identity?

**Production Requirements**:
- **Authentication**: Verify who the user is
- **Authorization**: Control access to the agent
- **User Isolation**: Each user gets their own memory namespace
- **Security**: Token-based authentication

<div style="text-align:left">
    <img src="architecture_images/image_10.png" width="80%"/>
</div>

### What We'll Do

1. Create Cognito User Pool
2. Create test user with password
3. Update agent code to verify tokens
4. Reconfigure Runtime with authentication
5. Test authenticated agent

## Step 5.1: Create Cognito User Pool

Amazon Cognito provides user management and authentication.

### What is Amazon Cognito?

Cognito is AWS's managed authentication service that provides:
- **User Management**: Create, authenticate, and manage users
- **OAuth 2.0 Support**: Industry-standard authentication
- **JWT Tokens**: Secure, stateless authentication tokens
- **MFA Support**: Multi-factor authentication (optional)
- **Social Login**: Google, Facebook, Apple (not used here)
- **SAML**: Enterprise identity federation

### JWT Token Structure

**Access Token Claims**:
```json
{
  "sub": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "token_use": "access",
  "scope": "openid profile",
  "auth_time": 1234567890,
  "iss": "https://cognito-idp.us-west-2.amazonaws.com/us-west-2_AbCdEfGhI",
  "exp": 1234571490,
  "iat": 1234567890,
  "client_id": "1234567890abcdefghijk",
  "username": "testuser"
}
```

In this example, `sub` is the user ID and `exp` falls one hour after `iat`.
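
To see where these claims live inside a token, the sketch below decodes the payload segment of a JWT using only the standard library. It deliberately does NOT verify the signature, so it is suitable for inspection and debugging only, never for authentication. The token is a fake built inline for illustration, and `b64url` and `peek_claims` are hypothetical helper names.

```python
import base64
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def peek_claims(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying the signature."""
    payload_segment = token.split(".")[1]
    # Restore the padding that base64url encoding strips
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# Build a fake, unsigned token purely for illustration
header = b64url(json.dumps({"alg": "RS256", "typ": "JWT"}).encode())
payload = b64url(json.dumps({
    "sub": "a1b2c3d4",
    "token_use": "access",
    "iat": 1234567890,
    "exp": 1234571490,
}).encode())
fake_token = f"{header}.{payload}.fake-signature"

claims = peek_claims(fake_token)
print(claims["sub"], claims["exp"] - claims["iat"])
```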

### What Gets Created

**User Pool**:
- Container for user identities
- Password policies
- Token configuration

**App Client**:
- OAuth configuration
- Token settings
- Allowed flows

**Test User**:
- Username: `testuser`
- Password: `TempPassword123!`
- Status: Confirmed

> 💡 **Production Tip**: In production, integrate with your existing identity provider or implement user registration flows.

In [None]:
print("\n" + "=" * 70)
print("🔐 CREATING COGNITO USER POOL")
print("=" * 70)

from utils import setup_cognito_user_pool

print("\n🔐 Setting up Cognito...")
cognito_config = setup_cognito_user_pool(region=REGION)

print("\n✅ Cognito setup completed!")
print(f"   Pool ID: {cognito_config['pool_id']}")
print(f"   Client ID: {cognito_config['client_id']}")
print(f"   Discovery URL: {cognito_config['discovery_url']}")
print(f"   Test user: testuser / TempPassword123!")

## Step 5.2: Update Agent File with Token Validation

Now we'll create an updated agent file that validates JWT tokens.

### Code Changes

**Updated Entrypoint**:
```python
@app.entrypoint
def runtime_agent(payload, context):
    # Extract token from header
    auth_header = context.request_headers.get('Authorization')
    
    # Validate and get user ID
    actor_id = get_user_sub(auth_header, REGION, COGNITO_USER_POOL)
    
    # Use actor_id for memory operations
    ...
```

> ⚠️ **Important**: The agent never sees the password - only validated tokens.
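
Extracting the token from the header is a small step, but it fails badly if the header is missing or uses another scheme. The following is a minimal sketch of a defensive version; `extract_bearer_token` is a hypothetical helper, not part of any library used in this notebook.

```python
def extract_bearer_token(auth_header):
    """Return the token from an 'Authorization: Bearer <token>' header value."""
    if not auth_header:
        raise ValueError("Missing Authorization header")
    # Split the scheme from the credentials at the first space
    scheme, _, token = auth_header.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        raise ValueError("Authorization header must use the Bearer scheme")
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))
```

Compared with slicing off the first seven characters, this rejects `None`, empty values, and non-Bearer schemes with a clear error instead of passing a mangled string to the JWT library.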

In [None]:
%%writefile event_agent_with_auth.py
import os
import jwt
import json
import boto3
import logging
from strands import Agent, tool
from jwt import PyJWKClient
from typing import Dict, Any, List
from strands.models import BedrockModel
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from bedrock_agentcore.memory.session import MemorySessionManager
from strands.hooks import AgentInitializedEvent, HookProvider, HookRegistry, MessageAddedEvent
from bedrock_agentcore.memory.constants import StrategyType, ConversationalMessage, MessageRole


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("event-agent")

app = BedrockAgentCoreApp()

MODEL_ID = os.getenv('MODEL_ID')
MEMORY_ID = os.getenv('MEMORY_ID')
COGNITO_USER_POOL = os.getenv('COGNITO_USER_POOL')
REGION = os.getenv('AWS_REGION')
KB_ID = os.getenv('KB_ID')

agent = None
bedrock_agent_runtime = None

class MemoryHookProvider(HookProvider):
    """Custom hook provider to integrate with AgentCore Memory"""
    
    def __init__(self):
        logger.info(f"Initializing MemoryHookProvider")
        self.memory_session_manager = MemorySessionManager(MEMORY_ID, REGION)
    
    def on_agent_initialized(self, event: AgentInitializedEvent):
        """Load user preferences when agent starts"""
        logger.info("🎯 Agent initialization - loading preferences")
        
        actor_id = event.agent.state.get("actor_id")
        
        if not actor_id:
            logger.warning("Missing actor_id")
            return
        
        try:
            # Retrieve user preferences from LTM
            preferences = self.memory_session_manager.search_long_term_memories(
                namespace_prefix=f"/users/{actor_id}/preferences",
                query="What are the user's preferences, interests, and background?",
                top_k=5
            )
            
            if preferences:
                logger.info(f"✅ Loaded {len(preferences)} preferences")
                pref_messages = []
                for pref in preferences:
                    pref_text = pref.get('content', {}).get('text', '')
                    if pref_text:
                        try:
                            pref_json = json.loads(pref_text)
                            pref_messages.append(f"- {pref_json.get('preference', pref_text)}")
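                        except (json.JSONDecodeError, AttributeError):
                            # Not JSON, or JSON that is not an object: keep the raw text
                            pref_messages.append(f"- {pref_text}")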
                
                if pref_messages:
                    context = "\n".join(pref_messages)
                    event.agent.system_prompt += f"\n\n**User Preferences:**\n{context}"
                    logger.info("✅ Added preferences to system prompt")
            else:
                logger.info("ℹ️ No preferences found yet")

        except Exception as e:
            logger.error(f"❌ Error loading preferences: {e}")
    
    def on_message_added(self, event: MessageAddedEvent):
        """Store messages in memory"""
        logger.info("💬 Message added - storing in memory")
        
        actor_id = event.agent.state.get("actor_id")
        session_id = event.agent.state.get("session_id")
        
        if not all([actor_id, session_id]):
            logger.warning("Missing required state values")
            return
        
        try:
            messages = event.agent.messages
            last_message = messages[-1]
            message_content = str(last_message.get("content", ""))
            if last_message["role"] == "user":
                message_role = MessageRole.USER
            elif last_message["role"] == "assistant":
                message_role = MessageRole.ASSISTANT
            else:
                # No MessageRole mapping for this role, so skip it
                logger.warning(f"Skipping message with role: {last_message['role']}")
                return

            self.memory_session_manager.add_turns(
                actor_id=actor_id,
                session_id=session_id,
                messages=[ConversationalMessage(message_content, message_role)]
            )
            logger.info("✅ Message stored")

        except Exception as e:
            logger.error(f"❌ Error storing message: {e}")
    
    def register_hooks(self, registry: HookRegistry):
        """Register hooks with the agent"""
        registry.add_callback(AgentInitializedEvent, self.on_agent_initialized)
        registry.add_callback(MessageAddedEvent, self.on_message_added)

@tool
def search_reinvent_sessions(query: str) -> List[Dict[str, Any]]:
    global bedrock_agent_runtime, KB_ID
    
    try:
        if not bedrock_agent_runtime:
            bedrock_agent_runtime = boto3.client('bedrock-agent-runtime', region_name=REGION)
        
        response = bedrock_agent_runtime.retrieve(
            knowledgeBaseId=KB_ID,
            retrievalQuery={'text': query},
            retrievalConfiguration={
                'vectorSearchConfiguration': {
                    'numberOfResults': 5
                }
            }
        )
        
        results = []
        for idx, item in enumerate(response.get('retrievalResults', []), 1):
            results.append({
                'rank': idx,
                'content': item.get('content', {}).get('text', ''),
                'score': item.get('score', 0.0)
            })
        
        return results
    except Exception as e:
        return [{"error": str(e)}]

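def get_user_sub(auth_header: str, region: str, user_pool_id: str) -> str:
    """Validate a Cognito access token and return the user's ID (`sub` claim)"""
    if not auth_header or not auth_header.startswith("Bearer "):
        raise jwt.InvalidTokenError("Missing or malformed Authorization header")
    access_token = auth_header[len("Bearer "):]  # Remove 'Bearer ' prefix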
    jwks_url = f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}/.well-known/jwks.json"
    jwks_client = PyJWKClient(jwks_url)
    signing_key = jwks_client.get_signing_key_from_jwt(access_token)

    decoded = jwt.decode(
        access_token,
        signing_key.key,
        algorithms=["RS256"],
        issuer=f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}",
        options={"require": ["exp", "iat", "iss", "token_use"]}
    )

    if decoded.get("token_use") != "access":
        raise jwt.InvalidTokenError("Token is not an access token")

    return decoded["sub"]

def initialize_agent(actor_id, session_id):
    global agent
    
    model = BedrockModel(model_id=MODEL_ID)
    memory_hook = MemoryHookProvider()
    
    agent = Agent(
        model=model,
        hooks=[memory_hook],
        tools=[search_reinvent_sessions],
        system_prompt="""You're an intelligent event assistant with long-term memory.
        Use search_reinvent_sessions to find session information.
        If you get enough information in your first search don't do additional tool calls.
        Provide personalized recommendations based on user preferences.
        """,
        state={
            "actor_id": actor_id,
            "session_id": session_id
        }
    )

@app.entrypoint
def runtime_agent(payload, context):
    global agent
    
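    # Extract user ID from Cognito token
    headers = context.request_headers or {}
    try:
        actor_id = get_user_sub(headers.get('Authorization'), REGION, COGNITO_USER_POOL)
    except (jwt.PyJWTError, TypeError) as e:
        # Missing header or failed validation: reject instead of crashing
        logger.warning(f"Token validation failed: {e}")
        return "Error: Invalid or missing access token"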
    
    user_input = payload.get("prompt")
    session_id = context.session_id
    
    if not user_input:
        return "Error: Missing 'prompt' field"
    
    if agent is None:
        initialize_agent(actor_id, session_id)
    
    response = agent(user_input)
    return response.message['content'][0]['text']

if __name__ == "__main__":
    app.run()

## Step 5.3: Reconfigure Runtime with Authentication

Now we'll reconfigure the Runtime to use the authenticated agent.

### Runtime Authorizer Configuration

**customJWTAuthorizer**:
```python
{
    "customJWTAuthorizer": {
        "discoveryUrl": "https://cognito-idp.<region>.amazonaws.com/<pool-id>/.well-known/openid-configuration",
        "allowedClients": ["client-id-1", "client-id-2"]
    }
}
```

**What This Does**:
- Runtime validates JWT automatically
- Rejects invalid or expired tokens
- Only allows tokens from specified client IDs
- Forwards valid tokens to agent code
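
The authorizer dictionary can be assembled from the Cognito identifiers rather than typed by hand. This is a small sketch: `build_authorizer_config` is a hypothetical helper, the URL pattern is the one shown above, and the identifiers in the example are the placeholder values from the sample token, not real resources.

```python
def build_authorizer_config(region, user_pool_id, client_ids):
    """Assemble the customJWTAuthorizer dict from Cognito identifiers."""
    discovery_url = (
        f"https://cognito-idp.{region}.amazonaws.com/"
        f"{user_pool_id}/.well-known/openid-configuration"
    )
    return {
        "customJWTAuthorizer": {
            "discoveryUrl": discovery_url,
            "allowedClients": list(client_ids),
        }
    }


# Placeholder identifiers, not real resources
config = build_authorizer_config(
    "us-west-2", "us-west-2_AbCdEfGhI", ["1234567890abcdefghijk"]
)
print(config["customJWTAuthorizer"]["discoveryUrl"])
```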

### Request Header Configuration

**requestHeaderAllowlist**:
```python
{"requestHeaderAllowlist": ["Authorization"]}
```

**Why This Matters**:
- By default, the Runtime filters out all request headers
- The agent code needs the Authorization header
- The allowlist makes it available via `context.request_headers`
- Security: only explicitly allowed headers are passed through
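
Conceptually, an allowlist is a filter applied before the request reaches your code. The sketch below illustrates the idea only; it is not the Runtime's implementation, `filter_headers` is a hypothetical function, and the case-insensitive match is an assumption based on how HTTP header names are generally treated.

```python
def filter_headers(headers, allowlist):
    """Keep only headers whose names appear in the allowlist (case-insensitive)."""
    allowed = {name.lower() for name in allowlist}
    return {name: value for name, value in headers.items() if name.lower() in allowed}


incoming = {
    "Authorization": "Bearer abc.def.ghi",
    "X-Debug": "true",
    "Cookie": "session=123",
}
forwarded = filter_headers(incoming, ["Authorization"])
print(forwarded)
```

With an empty allowlist nothing is forwarded, which matches the default behavior described above.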

### Updated Configuration

**New Parameters**:
- `entrypoint`: "event_agent_with_auth.py" (updated file)
- `request_header_configuration`: Forward Authorization header
- `authorizer_configuration`: Cognito JWT validation

**New Environment Variable**:
- `COGNITO_USER_POOL`: Pool ID for token validation

> 💡 **Two-Layer Security**: The Runtime validates tokens, and the agent code validates them again for defense in depth.

In [None]:
print("\n" + "=" * 70)
print("🔐 RECONFIGURING RUNTIME WITH AUTHENTICATION")
print("=" * 70)

auth_agent_name = f"auth_event_agent_{UNIQUE_ID}"

# Configure with Cognito authorizer
print(f"\n⚙️ Configuring authenticated runtime...")
agentcore_runtime.configure(
    entrypoint="event_agent_with_auth.py",
    execution_role=execution_role_arn["Role"]["Arn"],
    auto_create_ecr=True,
    requirements_file="requirements.txt",
    region=REGION,
    agent_name=auth_agent_name,
    non_interactive=True,
    memory_mode="NO_MEMORY",
    request_header_configuration={"requestHeaderAllowlist": ["Authorization"]},
    authorizer_configuration={
        "customJWTAuthorizer": {
            "discoveryUrl": cognito_config["discovery_url"],
            "allowedClients": [cognito_config["client_id"]],
        }
    },
)
print("✅ Configured with Cognito authentication")

# Launch
print("\n🚀 Launching authenticated runtime...")
agentcore_runtime.launch(
    env_vars={
        "MEMORY_ID": MEMORY_ID,
        "COGNITO_USER_POOL": cognito_config["pool_id"],
        "KB_ID": KB_ID,
        "MODEL_ID": MODEL_ID,
        "AWS_REGION": REGION,
    }
)

# Wait for ready
print("\n⏳ Waiting for authenticated runtime to be READY...")
while True:
    status_response_auth = agentcore_runtime.status()
    status = status_response_auth.endpoint["status"]
    if status == "READY":
        print("\n✅ Authenticated runtime is READY!")
        break
    elif status in ["CREATE_FAILED", "DELETE_FAILED", "UPDATE_FAILED"]:
        print(f"\n❌ Deployment failed: {status}")
        break
    print(f"   Status: {status}", end="\r")
    time.sleep(10)
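
The loop above polls until the runtime reports `READY` or a failed state, so it keeps running if the status never changes. A bounded variant might look like the sketch below. `wait_until_ready`, `timeout_s`, and `poll_s` are hypothetical names, and the 15-minute default is an arbitrary assumption, not a documented limit.

```python
import time
from typing import Callable

FAILED_STATES = {"CREATE_FAILED", "DELETE_FAILED", "UPDATE_FAILED"}


def wait_until_ready(
    get_status: Callable[[], str], timeout_s: float = 900, poll_s: float = 10
) -> str:
    """Poll get_status() until READY; raise on a failed state or on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status == "READY":
            return status
        if status in FAILED_STATES:
            raise RuntimeError(f"Deployment failed: {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Runtime still {status} after {timeout_s}s")
        time.sleep(poll_s)
```

In this notebook the status callable could be `lambda: agentcore_runtime.status().endpoint["status"]`, matching the loop above. Raising on failure also stops later cells from running against a runtime that never became ready.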

## Step 5.4: Test Authenticated Agent

Finally, let's test the fully authenticated agent.

In [None]:
print("\n" + "=" * 70)
print("🧪 TESTING AUTHENTICATED AGENT")
print("=" * 70)

auth_session_id = f"auth-test-{str(uuid.uuid4())[:25]}"

print("\n📝 Invoking with authentication...\n")
response = agentcore_runtime.invoke(
    {"prompt": "What sessions would you recommend for me based on AI and Security?"},
    session_id=auth_session_id,
    bearer_token=cognito_config["bearer_token"],
)

response_text = json.loads(response["response"])
display(Markdown(f"**Agent Response:**\n\n{response_text}"))

print("\n✅ Authenticated agent is working!")
print("   User was authenticated via Cognito")
print("   User ID was extracted from token")
print("   Memory is isolated per user")

print("\n" + "=" * 70)
print("✅ CHAPTER 5 COMPLETE: Authentication is working!")
print("=" * 70)

---

# 🎉 Notebook Complete!


## Final Architecture

<div style="text-align:left">
    <img src="architecture_images/image_1.png" width="80%"/>
</div>

---

## What We Built

Congratulations! You've built a complete **Advanced Agentic RAG System** with:

### ✅ Chapter 1: The Agent
- Strands-based intelligent agent
- Tool integration framework
- Conversational capabilities

### ✅ Chapter 2: The Knowledge Base
- S3 bucket with 583 re:Invent session documents
- S3 Vectors for efficient vector storage
- Bedrock Knowledge Base with automatic ingestion
- Semantic search capability
- Custom RAG tool for agent

### ✅ Chapter 3: The Memory
- AgentCore Memory with short-term and long-term storage
- UserPreference strategy for automatic learning
- Strands hooks for lifecycle control
- Cross-session memory persistence
- Background preference extraction

### ✅ Chapter 4: Runtime
- Containerized agent deployment
- AgentCore Runtime for scalability
- Isolated session execution
- Production-ready infrastructure
- CloudWatch logs and X-Ray tracing

### ✅ Chapter 5: Identity
- Amazon Cognito authentication
- Token-based authorization
- User isolation
- Secure access control
- JWT validation

---

## Congratulations! 🎉

You've successfully built and deployed:
- ✅ Intelligent agent with Strands framework
- ✅ RAG with Knowledge Base and S3 Vectors
- ✅ Long-term memory with automatic learning
- ✅ Production deployment on AgentCore Runtime
- ✅ Secure authentication with Cognito
- ✅ Cross-session memory persistence
- ✅ Personalized recommendations


## Cleanup Resources

To avoid ongoing charges, remember to clean up the resources.

> ⚠️ **Important**: Only run these cleanup cells when you're completely done with the workshop.
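
The cleanup cell below wraps each deletion in its own `try`/`except` so that one failure does not stop the rest. The same pattern can be sketched as a small runner that also reports which steps failed. `run_cleanup_steps` is a hypothetical helper, not part of `utils` or the notebook.

```python
from typing import Callable, List, Tuple


def run_cleanup_steps(steps: List[Tuple[str, Callable[[], None]]]) -> List[str]:
    """Run every cleanup step in order and return the labels of steps that failed."""
    failed = []
    for number, (label, action) in enumerate(steps, start=1):
        try:
            action()
            print(f"{number}. Deleted {label}")
        except Exception as exc:  # keep going so later resources are still removed
            print(f"{number}. Error deleting {label}: {exc}")
            failed.append(label)
    return failed
```

A non-empty return value means some resources probably still exist and may need to be removed manually.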

In [None]:
# Only run this cell if you want to delete all resources
from utils import empty_and_delete_bucket

print("\n" + "=" * 70)
print("🗑️  CLEANUP: Deleting Resources")
print("=" * 70)

# 1. Delete the AgentCore Runtimes
print("\n1. Deleting AgentCore Runtimes...")
try:
    agentcore_control_client = boto3.client(
        "bedrock-agentcore-control", region_name=REGION
    )

    # Delete first runtime
    runtime_delete_response = agentcore_control_client.delete_agent_runtime(
        agentRuntimeId=status_response.config.agent_id,
    )
    print(f"   ✅ Deleted AgentCore Runtime: {status_response.config.agent_id}")

    # Delete authenticated runtime
    runtime_delete_response = agentcore_control_client.delete_agent_runtime(
        agentRuntimeId=status_response_auth.config.agent_id,
    )
    print(f"   ✅ Deleted AgentCore Runtime: {status_response_auth.config.agent_id}")
except Exception as e:
    print(f"   ⚠️ Error deleting AgentCore Runtime: {e}")

# 2. Delete the ECR repository
print("\n2. Deleting ECR Repository...")
try:
    ecr_client = boto3.client("ecr", region_name=REGION)
    repository_name = status_response.config.ecr_repository.split("/")[1]

    response = ecr_client.delete_repository(
        repositoryName=repository_name,
        force=True,  # Force deletion even if it contains images
    )
    print(f"   ✅ Deleted ECR repository: {repository_name}")
except Exception as e:
    print(f"   ⚠️ Error deleting ECR repository: {e}")

# 3. Delete the memory resource
print("\n3. Deleting Memory Resource...")
try:
    memory_manager.delete_memory(memory_id=MEMORY_ID)
    print(f"   ✅ Deleted memory resource: {MEMORY_ID}")
except Exception as e:
    print(f"   ⚠️ Error deleting memory resource: {e}")

# 4. Delete the Cognito User Pool and associated resources
print("\n4. Deleting Cognito User Pool...")
try:
    cognito_client = boto3.client("cognito-idp", region_name=REGION)
    pool_id = cognito_config["pool_id"]

    # Delete the user pool
    cognito_client.delete_user_pool(UserPoolId=pool_id)
    print(f"   ✅ Deleted Cognito User Pool: {pool_id}")

except Exception as e:
    print(f"   ⚠️ Error deleting Cognito resources: {e}")

# 5. Delete IAM roles
print("\n5. Deleting IAM Roles...")


def delete_iam_role(role_arn):
    """Delete IAM role and all its policies"""
    try:
        iam_client = boto3.client("iam", region_name=REGION)
        role_name = role_arn.split("/")[-1]

        # Detach managed policies
        attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)
        for policy in attached_policies.get("AttachedPolicies", []):
            iam_client.detach_role_policy(
                RoleName=role_name, PolicyArn=policy["PolicyArn"]
            )

        # Delete inline policies
        inline_policies = iam_client.list_role_policies(RoleName=role_name)
        for policy_name in inline_policies.get("PolicyNames", []):
            iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)

        # Delete role
        iam_client.delete_role(RoleName=role_name)
        print(f"   ✅ Deleted IAM role: {role_name}")
    except Exception as e:
        print(f"   ⚠️ Error deleting IAM role: {e}")


delete_iam_role(execution_role_arn["Role"]["Arn"])
delete_iam_role(kb_role_arn)

# 6. Delete Knowledge Base
print("\n6. Deleting Knowledge Base...")
try:
    bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=KB_ID)
    print(f"   ✅ Knowledge Base deleted: {KB_ID}")
except Exception as e:
    print(f"   ⚠️ Error deleting Knowledge Base: {e}")

# 7. Delete Vector Store
print("\n7. Deleting Vector Store...")
try:
    s3vectors_client.delete_index(
        vectorBucketName=vector_store_name, indexName=vector_index_name
    )
    print(f"   ✅ S3 Vector Index deleted: {vector_index_name}")
except Exception as e:
    print(f"   ⚠️ Error deleting Vector Index: {e}")

try:
    s3vectors_client.delete_vector_bucket_policy(vectorBucketName=vector_store_name)
    print("   ✅ S3 Vector Store policy deleted")
except Exception as e:
    print(f"   ⚠️ Error deleting Vector Store policy: {e}")

try:
    s3vectors_client.delete_vector_bucket(vectorBucketName=vector_store_name)
    print(f"   ✅ S3 Vector Store deleted: {vector_store_name}")
except Exception as e:
    print(f"   ⚠️ Error deleting Vector Store: {e}")

# 8. Delete S3 Bucket
print("\n8. Deleting S3 Bucket...")
try:
    empty_and_delete_bucket(BUCKET_NAME)
    print(f"   ✅ S3 Bucket emptied and deleted: {BUCKET_NAME}")
except Exception as e:
    print(f"   ⚠️ Error emptying and deleting S3 Bucket: {e}")

print("\n" + "=" * 70)
print("✅ CLEANUP COMPLETE")
print("=" * 70)
print("\nCleanup finished. Review any warnings above for resources that still need manual deletion.")