# AgentCore Self-Managed Memory Strategy 데모

이 노트북은 boto3를 사용하여 Amazon Bedrock AgentCore self-managed memory strategy를 설정하고 사용하는 방법을 보여줍니다. Self-managed memory strategy를 사용하면 대화 이벤트에 의해 트리거되는 메모리 추출 및 통합을 위한 커스텀 파이프라인을 생성할 수 있습니다.

## 작동 방식

1. 트리거 구성: 단기 메모리 이벤트를 기반으로 파이프라인을 호출하는 트리거 조건(메시지 수, 유휴 시간 초과, 토큰 수)을 정의합니다
2. 알림 수신: 트리거 조건이 충족되면 AgentCore가 SNS 토픽에 알림을 게시합니다
3. 페이로드 처리: AgentCore가 대화 데이터를 S3 버킷에 전달합니다
4. 메모리 레코드 추출 및 저장: 커스텀 파이프라인이 페이로드를 검색하고 메모리를 처리합니다

Self-managed memory strategy에 대한 자세한 내용은 [공식 AWS 문서](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-self-managed-strategies.html#use-self-managed-strategy)를 참조하세요.

## 설정 개요

이 데모는 다음을 수행합니다:
1. 필요한 AWS 인프라 생성 (S3, SNS, SQS, Lambda, IAM 역할)
2. Self-managed strategy를 사용하는 AgentCore 메모리 생성
3. 메모리 처리 파이프라인을 시연하기 위한 테스트 이벤트 생성
4. 저장된 메모리의 검색 및 사용을 시연하기 위한 Agent 생성
5. 완료 후 리소스 정리

## 설정 및 Import

In [None]:
!pip install -r requirements.txt --quiet

In [None]:
import boto3
import json
import time
import uuid
import os
from datetime import datetime
from aws_utils import AWSUtils  # AWS 리소스 관리를 위한 유틸리티 클래스

# Configure AWS region
region_name = 'us-east-1'  # Change to your preferred region
aws_utils = AWSUtils(region_name=region_name)

# Read Lambda function code
with open('lambda_function.py', 'r') as f:
    lambda_code = f.read()


## Step 1: 페이로드 전달을 위한 S3 버킷 생성

트리거 조건이 충족되면 AgentCore가 대화 페이로드를 전달할 S3 버킷을 생성합니다.

In [None]:
# Create S3 bucket with a unique name
bucket_name = aws_utils.create_s3_bucket('agentcore-memory-payloads')  # 고유한 이름으로 S3 버킷 생성
print(f"S3 bucket created: {bucket_name}")

## Step 2: 메모리 작업 알림을 위한 SNS 토픽 생성

AgentCore가 메모리 처리 파이프라인을 트리거할 때 알림을 받을 SNS 토픽을 생성합니다.

In [None]:
# Create SNS topic
sns_topic_name = f"agentcore-memory-notifications-{int(time.time())}"
sns_topic_arn = aws_utils.create_sns_topic(sns_topic_name)  # 알림을 위한 SNS 토픽 생성
print(f"SNS topic created: {sns_topic_arn}")

## Step 3: SNS 구독이 있는 SQS 큐 생성

SNS 토픽을 구독하는 SQS 큐를 생성합니다. 이 큐는 Lambda 함수를 트리거할 메모리 작업 알림을 받습니다.

In [None]:
# Create SQS queue and subscribe to SNS topic
queue_name = f"agentcore-memory-queue-{int(time.time())}"
queue_url, queue_arn = aws_utils.create_sqs_queue_with_sns_subscription(queue_name, sns_topic_arn)  # SQS 큐 생성 및 SNS 구독 설정
print(f"SQS queue created: {queue_url}")

## Step 4: IAM 역할 생성

두 개의 IAM 역할을 생성합니다:
1. AgentCore가 S3 및 SNS에 액세스하기 위한 역할
2. Lambda가 S3, SQS 및 AgentCore API에 액세스하기 위한 역할

In [None]:
# Create IAM role for AgentCore
agentcore_role_name = f"AgentCoreMemoryExecutionRole-{int(time.time())}"
agentcore_role_arn = aws_utils.create_iam_role_for_agentcore(
    agentcore_role_name, 
    bucket_name, 
    sns_topic_arn
)  # AgentCore가 S3와 SNS에 접근하기 위한 IAM 역할 생성
print(f"AgentCore IAM role created: {agentcore_role_arn}")

# Create IAM role for Lambda
lambda_role_name = f"LambdaMemoryProcessingRole-{int(time.time())}"
lambda_role_arn = aws_utils.create_iam_role_for_lambda(
    lambda_role_name, 
    bucket_name, 
    queue_arn
)  # Lambda가 S3, SQS, AgentCore API에 접근하기 위한 IAM 역할 생성
print(f"Lambda IAM role created: {lambda_role_arn}")

## Step 5: 메모리 처리를 위한 Lambda 함수 생성

SQS 메시지에 의해 트리거될 Lambda 함수를 생성합니다. 이 함수는 다음을 수행합니다:
1. S3에서 대화 페이로드 다운로드
2. Bedrock model을 사용하여 메모리 추출
3. 추출된 메모리를 AgentCore에 다시 저장

In [None]:
# Create Lambda function
function_name = f"agentcore-memory-processor-{int(time.time())}"
function_arn = aws_utils.create_lambda_function(
    function_name,
    lambda_role_arn,
    lambda_code
)  # 메모리 처리를 위한 Lambda 함수 생성
print(f"Lambda function created: {function_arn}")

# Add SQS trigger to Lambda
event_source_uuid = aws_utils.add_sqs_trigger_to_lambda(function_name, queue_arn)  # Lambda에 SQS 트리거 연결
print(f"SQS trigger added to Lambda: {event_source_uuid}")

## Step 6: Self-Managed Strategy를 사용하는 AgentCore 메모리 생성

설정한 인프라를 사용하는 self-managed strategy 구성으로 AgentCore 메모리를 생성합니다.

In [None]:

import importlib
import aws_utils
importlib.reload(aws_utils)  # aws_utils 모듈 리로드

# # Create a new instance of AWSUtils with the updated code
aws_utils = aws_utils.AWSUtils(region_name=region_name)

# Create memory with self-managed strategy
memory_name = f"SelfManageMemory{int(time.time())}"
memory_description = "Demo memory using self-managed strategy"

memory_id = aws_utils.create_memory_with_self_managed_strategy(
    memory_name=memory_name,
    memory_description=memory_description,
    role_arn=agentcore_role_arn,
    sns_topic_arn=sns_topic_arn,
    s3_bucket_name=bucket_name,
    message_trigger_count=3,  # Trigger after 3 messages
    token_trigger_count=500,  # Trigger after ~500 tokens
    idle_timeout=300,         # Trigger after 5 minutes of idle time
    historical_window_size=5  # Include 5 previous messages in context
)

print(f"Memory created: {memory_id}")
# print(f"Strategy ID: {strategy_id}")

In [None]:
def wait_for_memory_to_get_active(memory_id):
    response = aws_utils.agentcore_client_control.get_memory(
        memoryId = memory_id)

    # 메모리 상태가 ACTIVE가 될 때까지 대기
    while response['memory']['status'] != 'ACTIVE':
        time.sleep(30)
        response = aws_utils.agentcore_client_control.get_memory(
        memoryId = memory_id)
        print(f"Memory creation status: {response['memory']['status']}")
    return response['memory']['status']

wait_for_memory_to_get_active(memory_id=memory_id)

## Step 7: 메모리 파이프라인을 트리거하기 위한 테스트 이벤트 생성

이제 self-managed memory 파이프라인을 트리거하기 위한 테스트 이벤트를 생성해 보겠습니다. 메시지 트리거 수를 초과하기에 충분한 이벤트를 생성합니다.

In [None]:
actor_id = "test-user-123"

In [None]:
# Create test events
session_id = aws_utils.create_test_events(
    memory_id=memory_id,
    actor_id=actor_id,
    num_events=6  # This will exceed our message_trigger_count of 3
)  # 메시지 트리거 수를 초과하는 테스트 이벤트 생성

print(f"Created test events with session ID: {session_id}")

In [None]:
aws_utils.agentcore_client.list_events(
    memoryId = memory_id, 
    actorId = actor_id,
    sessionId = session_id )

## Step 8: 메모리 처리 대기

이제 메모리 처리 파이프라인이 실행될 때까지 기다려야 합니다. 여기에는 다음이 포함됩니다:
1. AgentCore가 트리거 조건 감지 (메시지 수 초과)
2. AgentCore가 SNS에 알림 게시
3. SNS가 SQS에 메시지 전달
4. SQS가 Lambda 함수 트리거
5. Lambda가 대화를 처리하고 메모리 저장

잠시 기다린 후 메모리가 생성되었는지 확인해 보겠습니다.

In [None]:
print("Waiting 30 seconds for memory processing to complete...")
time.sleep(30)

## Step 9: 메모리 레코드 확인

메모리 파이프라인이 메모리 레코드를 생성했는지 메모리를 검색하여 확인해 보겠습니다.

In [None]:
session_id

In [None]:
# List memory records
namespace=f"/interests/actor/{actor_id}/session/{session_id}/"
def list_memory_records(memory_id, namespace):
    try:
        response = aws_utils.agentcore_client.list_memory_records(
            memoryId=memory_id,
            namespace=namespace
        )
        print(f"Found {len(response.get('memoryRecordSummaries'))} memory records")
        
        # Display the search results
        for idx, result in enumerate(response.get("memoryRecordSummaries")):
            print(f"Memory: {idx}")
            print(f"Content: {result['content']['text']}")
    except Exception as e:
        print(f"Error searching memory: {e}")
list_memory_records(memory_id, namespace)

위의 레코드는 통합 로직을 추가하지 않았기 때문에 사용자 관심사의 반복을 보여줍니다. 따라서 반복이 있으며, self managed strategy를 제공하는 기능을 통해 추출 및 수집만 원하는지 정의할 수 있습니다. 이는 비즈니스 사용 사례에 따라 달라집니다.

In [None]:
# Search memory records
def retrieve_memory_records(memory_id, query, topK, namespace):
    try:
        response = aws_utils.agentcore_client.retrieve_memory_records(
            memoryId=memory_id,
            searchCriteria = {
            'searchQuery': query,
            'topK': topK  # 상위 K개의 관련 메모리 검색
        },
            namespace=namespace
        )
        print(f"Found {len(response.get('memoryRecordSummaries'))} memory records")
        
        # Display the search results
        for idx, result in enumerate(response.get('memoryRecordSummaries')):
            print(f"\nMemory Record {idx + 1}:")
            print(f"Content: {result['content']['text']}")
    except Exception as e:
        print(f"Error searching memory: {e}")

retrieve_memory_records(memory_id=memory_id, query="food choices for dinner", topK=5, namespace=namespace)

## Step 10: 다른 콘텐츠로 추가 테스트 이벤트 생성

다른 메모리 처리 사이클을 트리거하기 위해 다른 콘텐츠로 더 많은 테스트 이벤트를 생성해 보겠습니다.

In [None]:
# Create custom test events
session_id = str(uuid.uuid4())
actor_id = "test-user-456"

# Custom events with more specific information
test_events = [
    {
        "user": "I'm trying to eat healthier and have been exploring Mediterranean cuisine lately.",
        "assistant": "That's wonderful! Mediterranean food is both delicious and nutritious. What Mediterranean dishes have you tried so far?"
    },
    {
        "user": "I love Greek salads with feta cheese and olives, and I've been making homemade hummus.",
        "assistant": "Homemade hummus is fantastic! Do you prefer it with tahini or without? And what's your favorite way to serve it?"
    },
    {
        "user": "I always use tahini and like to serve it with fresh vegetables and pita bread. I'm also vegetarian, so I avoid meat.",
        "assistant": "Being vegetarian opens up so many Mediterranean options! Have you tried making stuffed grape leaves or lentil-based dishes?"
    },
    {
        "user": "Not yet, but I'd love to learn. I'm also allergic to shellfish, so I have to be careful with seafood dishes.",
        "assistant": "Good to know about the shellfish allergy. For vegetarian Mediterranean cooking, you might enjoy making moussaka with eggplant or trying some traditional Greek bean dishes. Would you like some recipe suggestions?"
    }
]

# Create events
for idx, event in enumerate(test_events):
    try:
        event_payload = [
            {
                'conversational': {
                    'content': {
                        'text': event['user']
                    },
                    'role': 'USER'
                }
            },
            {
                'conversational': {
                    'content': {
                        'text': event['assistant']
                    },
                    'role': 'ASSISTANT'
                }
            }
        ]

        aws_utils.agentcore_client.create_event(
            memoryId=memory_id,
            actorId=actor_id,
            sessionId=session_id,
            eventTimestamp=int(time.time()),
            payload=event_payload,
            clientToken=str(uuid.uuid4())  # 중복 방지를 위한 고유 토큰
        )

        print(f"Created event {idx+1}/{len(test_events)}")
        time.sleep(1)

    except Exception as e:
        print(f"Error creating test event: {e}")

print("\nWaiting 30 seconds for memory processing to complete...")
time.sleep(30)

## Step 11: 새로운 메모리 검색

이제 하이킹과 사용자의 개에 관련된 새로운 메모리를 검색해 보겠습니다.

In [None]:
# Search memory records for outdoor activities
namespace=f"/interests/actor/{actor_id}/session/{session_id}/"
retrieve_memory_records(memory_id=memory_id, query="dog pets golden retriever", topK=5, namespace=namespace)

## Step 12: Agent 생성

이 섹션에서는 hook을 통해 AgentCore Self-Managed Memory와 통합된 Strands agent를 사용하여 지능형 요리 어시스턴트를 구축하는 방법을 설명합니다. 이전 대화와 개인 취향을 기반으로 개인화된 레스토랑 추천을 제공하기 위해 사용자 음식 선호도, 식이 제한 및 식사 기록에 대한 장기 메모리에 중점을 둡니다



In [None]:
import logging
import json
from typing import Dict
from datetime import datetime
from botocore.exceptions import ClientError

# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("customer-support")

# Import required modules
from strands import Agent, tool
from strands.hooks import AfterInvocationEvent, HookProvider, HookRegistry, MessageAddedEvent
from ddgs import DDGS  # DuckDuckGo 검색 라이브러리
from bedrock_agentcore.memory import MemoryClient

# Initialize MemoryClient
client = MemoryClient(region_name=region_name)

## Step 13: Self-Managed Memory를 사용하는 요리 어시스턴트를 위한 Memory Hook Provider 생성

Hook은 agent의 실행 라이프사이클의 특정 지점에서 실행되는 특수 함수입니다. 커스텀 hook provider는 self-managed memory strategy를 활용하여 다음과 같이 요리 컨텍스트를 자동으로 관리합니다:

- **관련 음식 선호도 검색** self-managed memory 레코드에서
- **컨텍스트 정보 주입** 식이 제한, 요리 선호도 및 식사 기록에 대한 정보를 새 쿼리에 주입
- **식사 상호작용 저장** 배치 작업을 사용하여 향후 참조를 위해 저장

이는 다음과 같은 원활한 메모리 경험을 생성합니다:
- 각 쿼리를 처리하기 전에 저장된 음식 선호도를 자동으로 검색
- 식사 기록을 기반으로 컨텍스트 인식 레스토랑 추천 제공

Self-managed 접근 방식은 음식 선호도가 저장, 검색 및 사용되어 식사 추천 경험을 향상시키는 방법을 완전히 제어할 수 있게 합니다.


In [None]:
# Helper function to get namespaces from memory strategies list
def get_namespaces(mem_client: MemoryClient, memory_id: str) -> Dict:
    """Get namespace mapping for memory strategies."""
    strategies = mem_client.get_memory_strategies(memory_id)
    # 각 strategy의 type을 키로, namespace를 값으로 하는 딕셔너리 생성
    return {i["type"]: i["namespaces"][0] for i in strategies}

In [None]:
class CulinaryAssistantMemoryHooks(HookProvider):
    """Memory hooks for culinary assistant agent"""
    
    def __init__(self, memory_id: str, namespace: str):
        self.memory_id = memory_id
        self.namespace = namespace
    
    def retrieve_food_preferences(self, event: MessageAddedEvent):
        """Retrieve user food preferences before processing dining query"""
        messages = event.agent.messages
        # 마지막 메시지가 사용자 메시지이고 tool 결과가 아닌 경우
        if messages[-1]["role"] == "user" and "toolResult" not in messages[-1]["content"][0]:
            user_query = messages[-1]["content"][0]["text"]
            
            try:
                # Retrieve food preferences using direct API
                response = aws_utils.agentcore_client.retrieve_memory_records(
                    memoryId=self.memory_id,
                    searchCriteria={
                        'searchQuery': user_query,
                        'topK': 5  # 상위 5개의 관련 음식 선호도 검색
                    },
                    namespace=self.namespace
                )
                
                memory_records = response.get('memoryRecordSummaries', [])
                
                if memory_records:
                    # Format retrieved preferences
                    preferences_context = []
                    for record in memory_records:
                        content = record.get('content', {}).get('text', '').strip()
                        if content:
                            preferences_context.append(content)
                    
                    # Inject food preferences into the query
                    if preferences_context:
                        context_text = "\n".join(preferences_context)
                        original_text = messages[-1]["content"][0]["text"]
                        # 사용자 쿼리에 음식 선호도 컨텍스트 주입
                        messages[-1]["content"][0]["text"] = (
                            f"User Food Preferences:\n{context_text}\n\n{original_text}"
                        )
                        logger.info(f"Retrieved {len(preferences_context)} food preference records")
                
            except Exception as e:
                logger.error(f"Failed to retrieve food preferences: {e}")
    
    def save_dining_interaction(self, event: AfterInvocationEvent):
        """Save dining recommendation interaction after agent response"""
        try:
            messages = event.agent.messages
            if len(messages) >= 2 and messages[-1]["role"] == "assistant":
                # Get last user query and agent response
                user_query = None
                agent_response = None
                
                # 역순으로 메시지를 순회하여 마지막 사용자 쿼리와 agent 응답 추출
                for msg in reversed(messages):
                    if msg["role"] == "assistant" and not agent_response:
                        agent_response = msg["content"][0]["text"]
                    elif msg["role"] == "user" and not user_query and "toolResult" not in msg["content"][0]:
                        user_query = msg["content"][0]["text"]
                        break
                
                if user_query and agent_response:
                    # Save the interaction using direct API
                    interaction_content = f"Query: {user_query}\nRecommendation: {agent_response}"
                    
                    # You would use create_memory_record API here
                    # aws_utils.agentcore_client.create_memory_record(...)
                    
                    logger.info("Saved dining interaction to memory")
                    
        except Exception as e:
            logger.error(f"Failed to save dining interaction: {e}")
    
    def register_hooks(self, registry: HookRegistry) -> None:
        """Register culinary assistant memory hooks"""
        # MessageAddedEvent 발생 시 음식 선호도 검색 hook 등록
        registry.add_callback(MessageAddedEvent, self.retrieve_food_preferences)
        # AfterInvocationEvent 발생 시 식사 상호작용 저장 hook 등록
        registry.add_callback(AfterInvocationEvent, self.save_dining_interaction)
        logger.info("Culinary assistant memory hooks registered")

## Step 14: 요리 어시스턴트 Agent 생성

In [None]:
# Create memory hooks for culinary assistant
print(memory_id)
culinary_hooks = CulinaryAssistantMemoryHooks(memory_id, namespace)

# Create culinary assistant agent
culinary_agent = Agent(
    hooks=[culinary_hooks],  # 메모리 hook 등록
    model="global.anthropic.claude-haiku-4-5-20251001-v1:0",
    tools=[],  # Update these tools as needed
    state={"actor_id": actor_id, "session_id": session_id},  # agent 상태 정보
    system_prompt="""You are the Culinary Assistant, a sophisticated restaurant recommendation assistant.

PURPOSE:
- Help users discover restaurants based on their preferences
- Remember user preferences throughout the conversation
- Provide personalized dining recommendations

You have access to a Memory tool that enables you to:
- Store user preferences (dietary restrictions, favorite cuisines, budget preferences, etc.)
- Retrieve previously stored information to personalize recommendations"""
)

print("✅ Culinary assistant agent created with memory capabilities")

#### Agent가 준비되었습니다. 

### 요리 어시스턴트 시나리오를 테스트해 보겠습니다

In [None]:
response1 = culinary_agent("what are the food choices for Dinner?")
print(f"Support Agent: {response1}")

## Step 15: 리소스 정리

이제 불필요한 비용이 발생하지 않도록 생성한 모든 리소스를 정리하겠습니다.

In [None]:
# Clean up all resources
import importlib
import aws_utils
importlib.reload(aws_utils)  # aws_utils 모듈 리로드

# # Create a new instance of AWSUtils with the updated code
aws_utils = aws_utils.AWSUtils(region_name=region_name)

# # Clean up resources with auto-discovery
aws_utils.cleanup_resources(discover_resources=True)  # 생성된 모든 리소스 자동 탐지 및 정리
print("All resources have been cleaned up!")

## 요약

이 노트북에서는 다음 방법을 시연했습니다:

1. Self-managed memory에 필요한 AWS 인프라 설정
2. Self-managed strategy를 사용하는 AgentCore 메모리 생성
3. 메모리 처리를 위한 트리거 조건 구성
4. Lambda 기반 메모리 처리 파이프라인 구현
5. 샘플 대화로 메모리 시스템 테스트
6. 추출된 메모리 검색
7. Self managed memory를 테스트하기 위한 요리 agent 생성
8. 모든 리소스 정리

Self-managed memory strategy는 메모리 추출에 대한 완전한 제어를 제공하여 특정 사용 사례에 맞는 커스텀 파이프라인을 구축할 수 있게 합니다.