# Lab 4: Building Multi-Agent Systems with Asynchronous Communication

## Introduction

Welcome to Lab 4! In this lab, we'll take our asynchronous agent architecture to the next level by implementing a multi-agent system. Building on the foundations established in previous labs, we'll create a collaborative workflow where specialized agents work together to accomplish complex tasks.

Real-world AI applications often require diverse expertise and capabilities that are difficult to encapsulate in a single agent. By distributing responsibilities across multiple specialized agents, we can create more robust, maintainable, and scalable systems.

By the end of this lab, you'll understand how to:
- Implement the "Agents as Tools" pattern for inter-agent communication
- Create specialized agents with distinct responsibilities and expertise
- Configure parent-child relationships between collaborating agents
- Manage task routing and result callbacks between agents
- Build a complete multi-agent workflow with state persistence

## The Multi-Agent Pattern

The multi-agent pattern addresses several challenges in complex AI systems:

1. **Specialization**: Each agent can focus on what it does best, with tailored instructions and tools
2. **Modularity**: Agents can be developed, tested, and deployed independently
3. **Scalability**: Different agent types can scale based on their specific workload
4. **Maintainability**: Changes to one agent's functionality don't require changes to others

Our implementation will focus on a two-agent system:
- **Post Generator Agent**: Creates engaging social media content
- **Evaluator Agent**: Assesses content against brand guidelines

These agents will communicate asynchronously through SQS queues, with each agent maintaining its state in DynamoDB while waiting for responses from other agents.
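
To make the hand-off concrete, here is a minimal in-memory sketch of one round trip. It is not the lab's implementation: two `deque` objects stand in for the SQS queues, a dict stands in for the DynamoDB table, and the function names and the toy approval rule are assumptions made for illustration. The message shapes follow the `new` and `existing` task structures used later in this lab.

```python
from collections import deque

# In-memory stand-ins (assumptions for illustration): deques play the role of
# the two SQS queues, and a dict plays the role of the DynamoDB memory table.
generator_queue, evaluator_queue = deque(), deque()
memory = {}  # keyed by (session_id, agent_name)


def generator_request_evaluation(session_id, post, tool_use_id):
    """Post generator: save state, hand the post to the evaluator, then stop."""
    memory[(session_id, "post-generator-agent")] = {"pending_tool_use": tool_use_id}
    evaluator_queue.append({
        "type": "new",
        "body": {"task": f"Evaluate this post: {post}"},
        "parent": {
            "agent_name": "post-generator-agent",
            "session_id": session_id,
            "tool_use_id": tool_use_id,
        },
    })


def evaluator_process_one():
    """Evaluator: take one task and report a verdict to the parent's queue."""
    task = evaluator_queue.popleft()
    parent = task["parent"]
    # Toy rule standing in for the real LLM-based evaluation
    verdict = "APPROVED" if len(task["body"]["task"]) < 250 else "REJECTED"
    generator_queue.append({
        "session_id": parent["session_id"],
        "type": "existing",
        "toolName": "evaluator_agent",
        "body": [{"toolResult": {
            "toolUseId": parent["tool_use_id"],
            "status": "success",
            "content": [{"text": verdict}],
        }}],
    })


def generator_resume():
    """Post generator: wake up on the callback and match it to the saved state."""
    result = generator_queue.popleft()
    state = memory[(result["session_id"], "post-generator-agent")]
    tool_result = result["body"][0]["toolResult"]
    if tool_result["toolUseId"] != state["pending_tool_use"]:
        raise ValueError("Callback does not match the pending tool call")
    return tool_result["content"][0]["text"]


generator_request_evaluation("session-1", "Ride a rainbow unicorn today!", "tool-123")
evaluator_process_one()
print(generator_resume())
```

Neither agent blocks while the other works: each one writes what it needs to resume, sends a message, and exits.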

## Architecture Components

Our multi-agent architecture builds on our previous work and adds these components:

1. **Evaluator Agent SQS Queue**:
   - Receives evaluation tasks from the post generator agent
   - Manages the queue of content to be evaluated

2. **Evaluator Agent Lambda Function**:
   - Contains the specialized evaluator agent code
   - Processes evaluation tasks from its SQS queue
   - Returns results to the requesting agent

Let's implement this architecture step by step to create a sophisticated system of collaborating agents that can handle complex workflows requiring different types of expertise.
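
Because the evaluator Lambda function is triggered by SQS, each task reaches the handler wrapped in a `Records` array, with the JSON payload in the record's `body`. The sketch below shows that unwrapping step on its own. The helper name `extract_tasks` and the sample event are assumptions for illustration; real SQS records carry more fields than shown here.

```python
import json


def extract_tasks(event):
    """Decode the JSON task carried in each SQS record of a Lambda event."""
    return [json.loads(record["body"]) for record in event.get("Records", [])]


# Hypothetical event, trimmed to the fields used here
sample_event = {
    "Records": [
        {
            "messageId": "example-message-id",
            "body": json.dumps({
                "type": "new",
                "body": {"task": "Evaluate: Meet our pink unicorn!"},
            }),
        }
    ]
}

tasks = extract_tasks(sample_event)
print(tasks[0]["body"]["task"])
```

The handler in this lab reads only the first record, which works because the event source mapping is configured with a batch size of 1.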

⚠️⚠️ Please run the [prerequisites](../lab_0/prerequisites.ipynb) before continuing with this lab, if you haven't done so already. ⚠️⚠️

This lab assumes that:
- The environment variable `PUBLISH_API_ENDPOINT` is set to the URL used to publish posts to the UniTok website.
- A valid `strands` AWS Lambda layer exists in the AWS account.
- A valid AWS profile exists, OR
- Valid AWS credentials are set up.

## Step 1: Setting Up Additional AWS Resources

First, we need to create the additional AWS resources required for our multi-agent architecture:

In [None]:
%env PUBLISH_API_ENDPOINT https://08pzccde2k.execute-api.us-east-1.amazonaws.com/prod/posts

In [None]:
# Import necessary libraries
import boto3
import json
import os
import time
from datetime import datetime

# Check for required environment variable
PUBLISH_API_ENDPOINT = os.environ.get("PUBLISH_API_ENDPOINT", None)
print(f"Using following API endpoint for publishing UniTok posts: {PUBLISH_API_ENDPOINT}")

if not PUBLISH_API_ENDPOINT:
    raise ValueError("PUBLISH_API_ENDPOINT environment variable is not set")

### Creating the SQS Queue

Let's create the `evaluator-agent-tasks` SQS queue:

In [None]:
# Create SQS queue for evaluator agent
import boto3
try:
    sqs = boto3.client('sqs')
    evaluator_queue_name = 'evaluator-agent-tasks'
    # Create Dead Letter Queue first
    dlq_response = sqs.create_queue(
        QueueName=f'{evaluator_queue_name}-dead-letter-queue',
        Attributes={
            'VisibilityTimeout': '300',
            'MessageRetentionPeriod': '1209600'  # 14 days
        }
    )

    # Get the DLQ ARN
    dlq_url = dlq_response['QueueUrl']
    dlq_attributes = sqs.get_queue_attributes(
        QueueUrl=dlq_url,
        AttributeNames=['QueueArn']
    )
    dlq_arn = dlq_attributes['Attributes']['QueueArn']

    # Create main queue with redrive policy
    evaluator_queue_response = sqs.create_queue(
        QueueName=evaluator_queue_name,
        Attributes={
            'VisibilityTimeout': '900',
            'MessageRetentionPeriod': '1209600',  # 14 days
            'ReceiveMessageWaitTimeSeconds': '20',
            'RedrivePolicy': f'{{"deadLetterTargetArn":"{dlq_arn}","maxReceiveCount":"3"}}'
        }
    )
    evaluator_queue_url = evaluator_queue_response['QueueUrl']
    evaluator_queue_arn = sqs.get_queue_attributes(
        QueueUrl=evaluator_queue_url,
        AttributeNames=['QueueArn']
    )['Attributes']['QueueArn']
    print(f"Queue created with URL: {evaluator_queue_url}")
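except Exception as e:
    print(f"Error creating SQS queues: {str(e)}")
    # Fall back to the existing queue; a failed lookup propagates to the caller
    evaluator_queue_url = sqs.get_queue_url(QueueName=evaluator_queue_name)['QueueUrl']
    # Later cells also need the queue ARN, so resolve it here too
    evaluator_queue_arn = sqs.get_queue_attributes(
        QueueUrl=evaluator_queue_url,
        AttributeNames=['QueueArn']
    )['Attributes']['QueueArn']
    print(f"Using existing SQS queue: {evaluator_queue_url}")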
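
The `RedrivePolicy` attribute is a JSON document passed as a string. Hand-written f-strings with escaped braces are easy to get wrong, so one alternative is to build the value with `json.dumps`. The sketch below assumes a helper named `build_redrive_policy` (not part of the lab code) and uses a placeholder ARN.

```python
import json


def build_redrive_policy(dlq_arn, max_receive_count=3):
    """Return the RedrivePolicy attribute value as a JSON string."""
    return json.dumps({
        "deadLetterTargetArn": dlq_arn,
        "maxReceiveCount": str(max_receive_count),
    })


# Placeholder ARN for illustration only
policy = build_redrive_policy(
    "arn:aws:sqs:us-east-1:123456789012:evaluator-agent-tasks-dead-letter-queue"
)
print(policy)
```

The returned string can be supplied as the `'RedrivePolicy'` value in the `Attributes` dict passed to `create_queue`.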


### Creating the Lambda Function

Now, let's prepare our Lambda function:

In [None]:
# Cell for setting up the Lambda function configuration
import boto3
import os

# Function configuration
function_name = 'EvaluatorAgent'
handler = 'index.lambda_handler'
runtime = 'python3.11'
architecture = 'arm64'
code_directory = '../functions/evaluator_agent/'
timeout = 900  # Timeout in seconds (the Lambda maximum)
memory_size = 128  # Default memory in MB

# Get the layer ARN - assuming StrandsLayer is already created
lambda_client = boto3.client('lambda')
layer_name = 'strands-agents-dependencies'

# Get the latest layer version
response = lambda_client.list_layer_versions(LayerName=layer_name)
layer_arn = response['LayerVersions'][0]['LayerVersionArn'] if response['LayerVersions'] else None

if not layer_arn:
    raise Exception(f"Layer {layer_name} not found")

# Get resource ARNs needed for policies
sqs_client = boto3.client('sqs')

post_generator_queue_url = sqs_client.get_queue_url(
    QueueName='post-generator-agent-tasks'
)['QueueUrl']
post_generator_queue_arn = sqs_client.get_queue_attributes(
    QueueUrl=post_generator_queue_url,
    AttributeNames=['QueueArn']
)['Attributes']['QueueArn']


# Environment variables
environment_variables = {
    'MEMORY_TABLE': 'AgentMemoryTable',
    'CALLBACK_SQS_URL': evaluator_queue_url,
    'POST_GENERATOR_AGENT_SQS_URL': post_generator_queue_url
}

# Create IAM policy document
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        # Basic Lambda execution
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        # SQS poll from own queue
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": evaluator_queue_arn
        },
        # SQS send to post generator agent queue
        {
            "Effect": "Allow",
            "Action": [
                "sqs:SendMessage"
            ],
            "Resource": post_generator_queue_arn
        },
        # DynamoDB permissions
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem"
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/AgentMemoryTable"
        },
        # Bedrock permissions
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": "*"
        }
    ]
}

print(f"Function configuration prepared for {function_name}")


### Lambda Handler Function

Now, let's create the Lambda handler function:
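
The handler in this section resumes a paused conversation when a task of type `existing` arrives: it loads the saved messages, drops the last one, and appends the incoming tool result as a `user` message. The sketch below isolates that step as a pure function. The name `resume_history` and the sample messages are assumptions for illustration; the handler itself does this inline in `prepare`.

```python
def resume_history(saved_messages, tool_result_body):
    """Rebuild the message history for a task of type `existing`."""
    if not saved_messages or len(saved_messages) <= 1:
        return []  # nothing usable was saved, so start a fresh conversation
    # Drop the last saved message, then append the incoming tool result
    return saved_messages[:-1] + [{"role": "user", "content": tool_result_body}]


# Hypothetical saved conversation ending in a placeholder tool result
saved = [
    {"role": "user", "content": [{"text": "Write a post"}]},
    {"role": "assistant", "content": [{"toolUse": {
        "toolUseId": "tool-123", "name": "evaluator_agent", "input": {}}}]},
    {"role": "user", "content": [{"toolResult": {
        "toolUseId": "tool-123", "status": "success",
        "content": [{"text": "placeholder"}]}}]},
]
callback_body = [{"toolResult": {
    "toolUseId": "tool-123", "status": "success",
    "content": [{"text": "APPROVED"}]}}]

history = resume_history(saved, callback_body)
print(len(history), history[-1]["content"][0]["toolResult"]["content"][0]["text"])
```

Keeping this logic free of AWS calls makes it easy to unit test separately from the DynamoDB load and save steps.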


In [None]:
!mkdir -p ../functions/evaluator_agent/

In [None]:
%%writefile ../functions/evaluator_agent/index.py
# Lambda function implementation of an async Strands Agent with multi-agent collaboration
import json
import uuid
import boto3
import logging
import os
from strands import Agent, tool
from strands.models import BedrockModel

import publish_evaluation

logger = logging.getLogger(__name__)

MEMORY_TABLE = os.environ.get('MEMORY_TABLE', 'AgentMemoryTable')
CALLBACK_SQS_URL = os.environ.get('CALLBACK_SQS_URL', None)
AGENT_NAME = 'evaluator-agent'

def save_to_agent_memory(session_id, messages, parent=None):
    # Store the messages under the session_id in the memory table
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(MEMORY_TABLE)
    logger.info(f"Saving {len(messages)} messages to agent memory for session_id {session_id}")
    agent_memory_object = {
        'session_id': session_id, 
        'agent_name': AGENT_NAME,
        'messages': messages,
    }
    if parent:
        agent_memory_object['parent'] = parent
    
    table.put_item(Item=agent_memory_object)
    return True

def load_from_agent_memory(session_id):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(MEMORY_TABLE)
    logger.info(f"Loading messages from agent memory for session_id {session_id}")
    # Load messages from agent memory of given session_id for this AGENT_NAME
    response = table.get_item(Key={'session_id': session_id, 'agent_name': AGENT_NAME})
    item = response.get('Item', {})
    messages = item.get('messages', [])
    parent = item.get('parent', None)
    logger.info(f"Loaded {len(messages)} messages from agent memory of {AGENT_NAME} for session_id: {session_id}")
    return messages, parent

def prepare(task) -> tuple:
    type = task.get('type', None)
    parent = task.get('parent',None)
    assert type is not None, "Task type is not specified"
    assert type in ["new", "existing"], "Task type is not supported, must be `new` or `existing`"
    logger.info(f"Preparing agent for {type} task")
    if type == "new":
        # Structure of a new task
        # {
        #     'type': 'new',
        #     'body': {
        #         'task': 'new task description',
        #     },
        #     'parent': {
        #         'agent_name': 'name of the agent who requested this task',
        #         'session_id': 'id of the session that the parent is carrying',
        #         'callback_sqs': 'SQS queue url to report the completion of the task',
        #         'tool_use_id': 'id of the tool that was initiated to call this agent'
        #     }
        # }
        task_body = task.get('body', None)
        assert task_body is not None, "Task body is not specified"
        task_description = task_body.get('task', None)
        assert task_description is not None, "Task description is not specified"

        if parent:
            session_id = parent['session_id']
            logger.info(f"Reusing parent session_id: {session_id}")
        else:
            # Create a new session_id UUID
            session_id = str(uuid.uuid4())
            logger.info(f"New session_id: {session_id}")
            parent = {
                'agent_name': AGENT_NAME,
                'session_id': session_id,
                'callback_sqs': CALLBACK_SQS_URL
            }
        # Create messages
        messages = []
        return session_id, messages, task_description, parent
    
    if type == "existing":
        #  Result of successful tool execution
        # {
        #     'session_id': 'id of the session',
        #     'type': 'existing',
        #     'toolName': 'name of the tool',
        #     'body': [{
        #         'toolResult': {
        #             'toolUseId': 'id of the tool that was used',
        #             'status': 'success|error',
        #             'content': [{'text': 'tool result content | error message'}]
        #         }
        #     }]
        # }

        session_id = task.get('session_id', None)
        assert session_id is not None, "Session ID is not specified"
        logger.info(f"Using existing session_id: {session_id}")
        # Load messages from agent memory
        messages, parent = load_from_agent_memory(session_id)
        if messages and len(messages) > 1:
            # Remove the last message from the messages
            messages = messages[:-1]
            # Append the tool result to the messages
            logger.info(f"Appending tool result to messages: {task.get('body', [{}])}")
            messages.append({
                "role": "user",
                "content": task.get('body', [{}])
            })
            return session_id, messages, "Continue", parent
        else:
            logger.info("No messages found in agent memory, starting a new conversation")
            return session_id, [], "Continue", parent
    logger.error(f"Unknown task type: {type}")
    return str(uuid.uuid4()), [], "Hello, how can you help?", parent

def lambda_handler(event, context):
    logger.info(f"Received event: {event}")
        
    # Even when processing a single message, AWS Lambda still wraps it in a Records array
    if not event.get('Records') or len(event['Records']) == 0:
        logger.error("No records found in the event")
        return {
            'statusCode': 400,
            'body': json.dumps('No SQS message records found in the event')
        }
    
    # Extract the first (and only) message
    record = json.loads(event['Records'][0]['body'])
    session_id, history, prompt, parent = prepare(record)

    system_prompt = """
    You are a specialized content evaluator for Unicorn Rentals, a company that offers unicorns for rent that kids and grown-ups can play with.

    Your task is to evaluate social media posts for UniTok, our unicorn-themed social media platform, and ensure they adhere to our brand guidelines.

    Brand Guidelines for Unicorn Rentals:
    1. Content must be family-friendly and positive
    2. Posts should highlight the magical experience of spending time with unicorns
    3. When mentioning our new color selection feature, it should be accurate (colors: pink, blue, purple, green, yellow, and rainbow)
    4. Our brand voice is magical, playful, and family-friendly
    5. Posts should be between 50-200 characters for optimal engagement
    6. Emojis should be used sparingly but effectively
    7. Content should appeal to our target audience: families with children, fantasy enthusiasts, and event planners

    For each post you evaluate:
    1. Check if it adheres to ALL brand guidelines
    2. Provide a clear APPROVED or REJECTED decision
    3. If rejected, provide specific feedback on what needs to be improved
    4. If approved, provide a brief explanation of why it meets our guidelines

    Your evaluation should be thorough but concise.
    Once evaluation is complete, publish your evaluations.
    """
    # Create model
    model = BedrockModel(
        model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        region_name="us-east-1"
    )

    # Create agent
    agent = Agent(
        system_prompt=system_prompt,
        model=model,
        tools=[publish_evaluation],
        messages=history,
    )
    
    result = agent(prompt, session_id=session_id, parent=parent)

    if result.state.get("stop_event_loop", False):
        logger.info("Agent needs to wait for tool result. Saving state and sleeping.")
    save_to_agent_memory(session_id, agent.messages, parent)

    logger.info(str(result))




In [None]:
%%writefile ../functions/evaluator_agent/publish_evaluation.py
import logging
import os
import boto3
import json
from typing import Any
from botocore.exceptions import ClientError
from strands.types.tools import ToolResult, ToolUse

# Initialize logging and set paths
logger = logging.getLogger(__name__)
POST_GENERATOR_AGENT_SQS_URL = os.environ.get("POST_GENERATOR_AGENT_SQS_URL", None)
TOOL_SPEC = {
    "name": "publish_evaluation",
    "description": "Report back the evaluation results",
    "inputSchema": {
        "json": {
            "type": "object",
            "properties": {
                "evaluation": {
                    "type": "string",
                    "description": "Thorough and concise evaluation report with APPROVED or REJECTED remarks along with feedback on areas of improvement"
                }
            },
            "required": ["evaluation"]
        }
    }
}

def publish_evaluation(tool: ToolUse, **kwargs: Any) -> ToolResult:
    tool_use_id = tool["toolUseId"]
    content = tool["input"]["evaluation"]
    request_state = kwargs.get("request_state", {})
    session_id = request_state.get('session_id', kwargs.get("session_id", None))
    parent = request_state.get('parent', kwargs.get("parent", None))
    logger.debug(f"Session ID: {session_id}")

    # Send an existing task to report to parent agent via SQS
    # Structure of an existing task
    #  Result of successful tool execution
    # {
    #     'session_id': 'id of the session',
    #     'type': 'existing',
    #     'toolName': 'name of the tool',
    #     'body': [{
    #         'toolResult': {
    #             'toolUseId': 'id of the tool that was used',
    #             'status': 'success|error',
    #             'content': [{'text': 'tool result content | error message'}]
    #         }
    #     }]
    # }
    message_body = {
        "session_id": session_id,
        "type": "existing",
        "toolName": "evaluator_agent",
        "body": [{
            'toolResult': {
                'toolUseId': parent['tool_use_id'],
                'status': 'success',
                'content': [{'text': content}]
            }
        }]
    }
    logger.info(f'Reporting evaluation for session_id {session_id} with the toolResult {message_body["body"][0]["toolResult"]}')
    sqs = boto3.client('sqs')
    sqs.send_message(
        QueueUrl=POST_GENERATOR_AGENT_SQS_URL,
        MessageBody=json.dumps(message_body),
        MessageAttributes={
            'session_id': {
                'StringValue': session_id,
                'DataType': 'String'
            },
            'tool_use_id': {
                'StringValue': parent['tool_use_id'],
                'DataType': 'String'
            }
        }
    )

    # Set the stop flag so that the agent can sleep and store its state in memory.
    request_state["stop_event_loop"] = True

    # Return success
    return {
        "toolUseId": tool_use_id,
        "status": "success",
        "content": [{"text": "Reported evaluation results to the requester"}]
    }

## Packaging and Deployment

Now, let's package our code and deploy it to AWS:

In [None]:
# Cell for packaging the Lambda function

import zipfile
import tempfile
import os
import shutil

def package_lambda_function(source_dir):
    """
    Package the Lambda function code into a zip file
    """
    if not os.path.exists(source_dir):
        raise FileNotFoundError(f"Source directory {source_dir} not found")
    
    # Create a temporary directory for packaging
    with tempfile.TemporaryDirectory() as temp_dir:
        zip_path = os.path.join(temp_dir, 'function.zip')
        
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # Walk through all files in the source directory
            for root, _, files in os.walk(source_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Calculate the path within the zip file (relative to source_dir)
                    arcname = os.path.relpath(file_path, source_dir)
                    zipf.write(file_path, arcname)
        
        # Read the zip file content
        with open(zip_path, 'rb') as zip_file:
            zip_content = zip_file.read()
        
        return zip_content

# Package the Lambda function
try:
    lambda_zip_content = package_lambda_function(code_directory)
    zip_size_kb = len(lambda_zip_content) / 1024
    print(f"Lambda function packaged successfully: {zip_size_kb:.2f} KB")
except Exception as e:
    print(f"Error packaging Lambda function: {str(e)}")
    raise
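
Since the handler is configured as `index.lambda_handler`, `index.py` has to sit at the root of the archive, which is why the packaging code stores paths relative to the source directory. The sketch below is one way to check an archive's layout before deploying. It builds its own small zip from placeholder files, and the helper names `zip_directory` and `archive_entries` are assumptions for illustration.

```python
import io
import os
import tempfile
import zipfile


def zip_directory(source_dir):
    """Zip a directory in memory, storing paths relative to source_dir."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(source_dir):
            for name in files:
                path = os.path.join(root, name)
                zipf.write(path, os.path.relpath(path, source_dir))
    return buffer.getvalue()


def archive_entries(zip_bytes):
    """List the file names stored in a zip archive held in memory."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zipf:
        return sorted(zipf.namelist())


# Build a throwaway directory with placeholder files and inspect the result
with tempfile.TemporaryDirectory() as demo_dir:
    for name in ("index.py", "publish_evaluation.py"):
        with open(os.path.join(demo_dir, name), "w") as f:
            f.write("# placeholder\n")
    entries = archive_entries(zip_directory(demo_dir))

print(entries)
```

The same `archive_entries` check could be run against `lambda_zip_content` from the cell above to confirm that `index.py` appears without a directory prefix.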


In [None]:
# Cell for deploying to AWS

import boto3
import json
import time

def create_iam_role(role_name, policy_document):
    """Create an IAM role for the Lambda function"""
    iam_client = boto3.client('iam')
    
    # Create the IAM role with basic Lambda execution trust policy
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole"
            }
        ]
    }
    
    try:
        response = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description=f"Role for {function_name} Lambda function"
        )
        role_arn = response['Role']['Arn']
        
        # Attach the inline policy to the role
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=f"{role_name}-policy",
            PolicyDocument=json.dumps(policy_document)
        )
        
        # Wait for role to propagate (IAM changes can take time to propagate)
        print("Waiting for IAM role to propagate...")
        time.sleep(10)
        
        return role_arn
    
    except iam_client.exceptions.EntityAlreadyExistsException:
        # If role already exists, get its ARN
        response = iam_client.get_role(RoleName=role_name)
        role_arn = response['Role']['Arn']
        
        # Update the policy
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=f"{role_name}-policy",
            PolicyDocument=json.dumps(policy_document)
        )
        
        return role_arn

def deploy_lambda_function(function_name, handler, runtime, role_arn, zip_content, 
                          layer_arns, environment_vars, architecture, timeout=900, memory_size=128):
    """Deploy the Lambda function to AWS"""
    lambda_client = boto3.client('lambda')
    
    try:
        # Check if function already exists
        try:
            lambda_client.get_function(FunctionName=function_name)
            # Function exists, update it
            print(f"Updating existing Lambda function: {function_name}")
            response = lambda_client.update_function_code(
                FunctionName=function_name,
                ZipFile=zip_content,
                # Architectures is accepted by update_function_code,
                # not by update_function_configuration
                Architectures=[architecture]
            )
            print("Waiting for the function code update to finish")
            lambda_client.get_waiter('function_updated').wait(FunctionName=function_name)
            # Update configuration
            lambda_client.update_function_configuration(
                FunctionName=function_name,
                Runtime=runtime,
                Role=role_arn,
                Handler=handler,
                Layers=layer_arns,
                Timeout=timeout,
                MemorySize=memory_size,
                Environment={
                    'Variables': environment_vars
                }
            )
            
        except lambda_client.exceptions.ResourceNotFoundException:
            # Function doesn't exist, create it
            print(f"Creating new Lambda function: {function_name}")
            response = lambda_client.create_function(
                FunctionName=function_name,
                Runtime=runtime,
                Role=role_arn,
                Handler=handler,
                Code={
                    'ZipFile': zip_content
                },
                Layers=layer_arns,
                Timeout=timeout,
                MemorySize=memory_size,
                Environment={
                    'Variables': environment_vars
                },
                Architectures=[architecture]
            )
        
        return response['FunctionArn']
    
    except Exception as e:
        print(f"Error deploying Lambda function: {str(e)}")
        raise

def create_or_update_event_source_mapping(function_name, queue_arn):
    """Create or update the SQS event source mapping for the Lambda function"""
    lambda_client = boto3.client('lambda')
    
    # List existing event source mappings
    response = lambda_client.list_event_source_mappings(
        FunctionName=function_name,
        EventSourceArn=queue_arn
    )
    
    if response['EventSourceMappings']:
        # Event source mapping exists, update it
        uuid = response['EventSourceMappings'][0]['UUID']
        print(f"Updating existing event source mapping: {uuid}")
        
        lambda_client.update_event_source_mapping(
            UUID=uuid,
            FunctionResponseTypes=['ReportBatchItemFailures'],
            BatchSize=1,
            MaximumBatchingWindowInSeconds=0
        )
    else:
        # Create new event source mapping
        print(f"Creating new event source mapping for queue: {queue_arn}")
        lambda_client.create_event_source_mapping(
            EventSourceArn=queue_arn,
            FunctionName=function_name,
            Enabled=True,
            BatchSize=1,
            MaximumBatchingWindowInSeconds=0,
            FunctionResponseTypes=['ReportBatchItemFailures']
        )

# Deploy the Lambda function
try:
    # Create IAM role for the Lambda function
    role_name = f"{function_name}-role"
    role_arn = create_iam_role(role_name, policy_document)
    print(f"IAM role created/updated: {role_arn}")
    
    # Deploy the Lambda function
    function_arn = deploy_lambda_function(
        function_name=function_name,
        handler=handler,
        runtime=runtime,
        role_arn=role_arn,
        zip_content=lambda_zip_content,
        layer_arns=[layer_arn],
        environment_vars=environment_variables,
        architecture=architecture,
        timeout=timeout,
        memory_size=memory_size
    )
    
    print(f"Lambda function deployed successfully: {function_arn}")
    
    # Create or update the SQS event source mapping
    create_or_update_event_source_mapping(function_name, evaluator_queue_arn)
    print(f"SQS event source mapping created/updated for queue: {evaluator_queue_arn}")
    
    # Get the function details
    lambda_client = boto3.client('lambda')
    function_details = lambda_client.get_function(FunctionName=function_name)
    print(f"Function details: {function_details['Configuration']['FunctionArn']}")
    
except Exception as e:
    print(f"Deployment failed: {str(e)}")
    raise


## Step 2: Updating the Post Generator Agent

Now, let's update our post generator agent to support multi-agent collaboration.

In [None]:
# Create directory for the updated post generator agent
!mkdir -p ../functions/post_generator_agent_v3/

The `post-generator-agent` Lambda function needs to be updated. In addition to `index.py`, we add a new file, `evaluator_agent.py`, which serves as the new tool for the agent.
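
As a rough sketch of what such a tool has to send, the snippet below builds a `new` task message for the evaluator, following the task structure documented in the handler comments. The helper name `build_evaluation_request`, the task wording, and the placeholder queue URL are assumptions for illustration, not the contents of the lab's `evaluator_agent.py`.

```python
import json


def build_evaluation_request(post, session_id, tool_use_id, callback_sqs_url):
    """Build the SQS message body for a `new` evaluator task (hypothetical helper)."""
    return json.dumps({
        "type": "new",
        "body": {"task": f"Evaluate this UniTok post: {post}"},
        "parent": {
            "agent_name": "post-generator-agent",
            "session_id": session_id,
            "callback_sqs": callback_sqs_url,
            "tool_use_id": tool_use_id,
        },
    })


# Placeholder values for illustration only
message_body = build_evaluation_request(
    post="Choose your unicorn color: pink, blue, or rainbow!",
    session_id="session-1",
    tool_use_id="tool-123",
    callback_sqs_url="https://sqs.us-east-1.amazonaws.com/123456789012/post-generator-agent-tasks",
)
print(message_body)
```

The `tool_use_id` in the `parent` block matters most: the evaluator echoes it back in its `toolResult`, which is how the post generator matches the callback to the pending tool call.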

In [None]:
%%writefile ../functions/post_generator_agent_v3/index.py
# Lambda function Implementation of an async Strands Agent that gets invoked via a task from SQS
import json
import uuid
import boto3
import logging
import os
import requests  # used by publish_post; assumed to be available via the Lambda layer
from strands import Agent, tool
from strands.models import BedrockModel
# Local imports
import human_approval
import evaluator_agent

logger = logging.getLogger(__name__)

MEMORY_TABLE = os.environ.get('MEMORY_TABLE', 'agent-memory-store')
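CALLBACK_SQS_URL = os.environ.get('CALLBACK_SQS_URL', None)
# Used by publish_post; assumes the function's environment sets this variable
PUBLISH_API_ENDPOINT = os.environ.get('PUBLISH_API_ENDPOINT', None)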
AGENT_NAME = 'post-generator-agent'

def save_to_agent_memory(session_id, messages, parent=None):
    # Store the messages under the session_id in the memory table
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(MEMORY_TABLE)
    logger.info(f"Saving {len(messages)} messages to agent memory for session_id {session_id}")
    agent_memory_object = {
        'session_id': session_id, 
        'agent_name': AGENT_NAME,
        'messages': messages,
    }
    if parent:
        agent_memory_object['parent'] = parent
    
    table.put_item(Item=agent_memory_object)
    return True

def load_from_agent_memory(session_id):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(MEMORY_TABLE)
    logger.info(f"Loading messages from {AGENT_NAME} memory for session_id {session_id}")
    # Load messages from agent memory of given session_id for this AGENT_NAME
    response = table.get_item(Key={'session_id': session_id, 'agent_name': AGENT_NAME})
    item = response.get('Item', {})
    messages = item.get('messages', [])
    parent = item.get('parent', None)
    logger.info(f"Loaded {len(messages)} messages from agent memory of {AGENT_NAME} for session_id: {session_id}")
    return messages, parent

def prepare(task) -> tuple:
    type = task.get('type', None)
    parent = task.get('parent', None)
    assert type is not None, "Task type is not specified"
    assert type in ["new", "existing"], "Task type is not supported, must be `new` or `existing`"
    logger.info(f"Preparing agent for {type} task")
    if type == "new":
        # Structure of a new task
        # {
        #     'type': 'new',
        #     'body': {
        #         'task': 'new task description',
        #     },
        #     'parent': { # Optional
        #         'agent_name': 'name of the agent who requested this task',
        #         'session_id': 'id of the session that the parent is carrying',
        #         'callback_sqs': 'SQS queue url to report the completion of the task',
        #         'tool_use_id': 'id of the tool that was initiated to call this agent'
        #     }
        # }
        task_body = task.get('body', None)
        assert task_body is not None, "Task body is not specified"
        task_description = task_body.get('task', None)
        assert task_description is not None, "Task description is not specified"

        if parent:
            session_id = parent.get('session_id', None)
            assert session_id is not None, "Session ID is not specified in parent"
            logger.info(f"Reusing parent session_id: {session_id}")
        else:
            # Create a new session_id UUID
            session_id = str(uuid.uuid4())
            logger.info(f"New session_id: {session_id}")
            parent = {
                'agent_name': AGENT_NAME,
                'session_id': session_id,
                'callback_sqs': CALLBACK_SQS_URL
            }
        # Create messages
        messages = []
        return session_id, messages, task_description, parent
    
    if type == "existing":
        #  Result of successful tool execution
        # {
        #     'session_id': 'id of the session',
        #     'type': 'existing',
        #     'toolName': 'name of the tool',
        #     'body': [{
        #         'toolResult': {
        #             'toolUseId': 'id of the tool that was used',
        #             'status': 'success|error',
        #             'content': [{'text': 'tool result content | error message'}]
        #         }
        #     }]
        # }

        session_id = task.get('session_id', None)
        assert session_id is not None, "Session ID is not specified"
        logger.info(f"Using existing session_id: {session_id}")
        # Load messages from agent memory
        messages, parent = load_from_agent_memory(session_id)
        if messages and len(messages) > 1:
            # Remove the last message from the messages
            messages = messages[:-1]
            # Append the tool result to the messages
            logger.info(f"Appending tool result to messages: {task.get('body', [{}])}")
            messages.append({
                "role": "user",
                "content": task.get('body', [{}])
            })
            return session_id, messages, "Continue", parent
        else:
            logger.info("No messages found in agent memory, starting a new conversation")
            return session_id, [], "Continue", parent
    logger.error(f"Unknown task type: {type}")
    return str(uuid.uuid4()), [], "Hello, how can you help?", parent

@tool
def publish_post(
    content: str, 
    author: str = "Unicorn Rentals", 
    unicorn_color: str = "rainbow", 
    image_url: str = None) -> str:
    """
    Publish a post to the UniTok social media platform.

    Args:
        content (str): The text content of the post.
        author (str, optional): The author of the post. Defaults to "Unicorn Rentals".
        unicorn_color (str, optional): The color of the unicorn. Choose from: pink, blue, purple, green, yellow, or rainbow. Defaults to "rainbow".
        image_url (str, optional): URL to an image to include with the post. Defaults to None.

    Returns:
        str: A message indicating the post was published successfully, or an error message.
    """

    # For this lab, we'll simulate posting to UniTok
    print(f"Publishing post to UniTok: {content}")
    print(f"Author: {author}")
    print(f"Unicorn Color: {unicorn_color}")
    if image_url:
        print(f"Image URL: {image_url}")
    post_data = {
        "content": content,
        "author": author,
        "unicornColor": unicorn_color
    }
    if image_url:
        post_data["imageUrl"] = image_url
    try:
        # Send the post to the API
        logger.info(f"Publishing post to UniTok: {post_data}")
        response = requests.post(PUBLISH_API_ENDPOINT, json=post_data)
        logger.info(f"Response from UniTok: {response}")
        # Check if the request was successful
        if response.status_code == 201:
            post_id = response.json().get("postId")
            logger.info(f"Post published successfully! Post ID: {post_id}")
            return f"Post published successfully! Post ID: {post_id}"
        else:
            logger.error(f"Failed to publish post. Status code: {response.status_code}, Response: {response.text}")
            return f"Failed to publish post. Status code: {response.status_code}, Response: {response.text}"
    except Exception as e:
        logger.error(f"Error publishing post with the exception {e}")
        return f"Error publishing post with the exception {e}"

def lambda_handler(event, context):
    logger.info(f"Received event: {event}")
        
    # Even when processing a single message, AWS Lambda still wraps it in a Records array
    if not event.get('Records') or len(event['Records']) == 0:
        logger.error("No records found in the event")
        return {
            'statusCode': 400,
            'body': json.dumps('No SQS message records found in the event')
        }
    
    # Extract the first (and only) message
    record = json.loads(event['Records'][0]['body'])
    session_id, history, prompt, parent = prepare(record)

    system_prompt = """
    You are a creative social media manager for Unicorn Rentals, a company that offers unicorns for rent that kids and grown-ups can play with.

    Your task is to create engaging social media posts for UniTok, our unicorn-themed social media platform.
    Before publishing, you must request evaluation of the post to ensure your content adheres to our brand guidelines.

    Process for creating and publishing posts:
    1. Generate a creative post based on the user's request
    2. Request evaluation to check if it meets our brand guidelines
    3. If the post is REJECTED, revise the post based on feedback and evaluate again
    4. Once the post is APPROVED, request human approval of the post
    5. If the human approves the post then publish it to our platform
    6. If the human denies the post, then restart the process

    Important information about Unicorn Rentals:
    - We offer unicorns in various colors: pink, blue, purple, green, yellow, and rainbow (our most popular)
    - Our new product feature allows customers to pick their favorite color unicorn to rent
    - Our target audience includes families with children, fantasy enthusiasts, and event planners
    - Our brand voice is magical, playful, and family-friendly

    When creating posts:
    - Keep content family-friendly and positive
    - Highlight the magical experience of spending time with unicorns
    - Mention the new color selection feature when appropriate
    - Use emojis sparingly but effectively
    - Keep posts between 50-200 characters for optimal engagement
    - If your posts are continuously being rejected by the evaluator and denied by humans, then stop after 3 tries

    Always show your thought process when creating posts, evaluating them, and making revisions.
    """
    # Create model
    model = BedrockModel(
        model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        region_name="us-east-1"
    )

    # Create agent
    agent = Agent(
        system_prompt=system_prompt,
        model=model,
        tools=[evaluator_agent, human_approval, publish_post],
        messages=history,
    )
    
    result = agent(prompt, session_id=session_id, parent=parent)

    if result.state.get("stop_event_loop", False):
        logger.info("Agent needs to wait for tool result. Saving state and sleeping.")
    save_to_agent_memory(session_id, agent.messages, parent)

    logger.info(str(result))
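
The `existing` branch of `prepare` is easier to reason about in isolation. The sketch below mirrors that branch under the assumption that the history is a plain list of message dicts and that the last stored message is the placeholder tool result written before the agent went to sleep. `resume_history` is a hypothetical helper name, and the sample messages are invented for illustration; neither is part of the lab code.

```python
from typing import Any


def resume_history(messages: list[dict[str, Any]], callback: dict[str, Any]) -> list[dict[str, Any]]:
    """Return a new history where the last message is replaced by the callback's tool result."""
    if not messages or len(messages) <= 1:
        # Nothing usable in memory: fall back to an empty history, as prepare() does.
        return []
    # Drop the last stored message (assumed to be the placeholder tool result) ...
    resumed = messages[:-1]
    # ... and append the real tool result as a user message.
    resumed.append({"role": "user", "content": callback.get("body", [{}])})
    return resumed


# Hypothetical sample data shaped like the message structure documented in prepare().
history = [
    {"role": "user", "content": [{"text": "Create a post"}]},
    {"role": "assistant", "content": [{"toolUse": {"toolUseId": "t-1", "name": "evaluator_agent", "input": {"content": "draft"}}}]},
    {"role": "user", "content": [{"toolResult": {"toolUseId": "t-1", "status": "success", "content": [{"text": "waiting"}]}}]},
]
callback = {
    "session_id": "s-1",
    "type": "existing",
    "toolName": "evaluator_agent",
    "body": [{"toolResult": {"toolUseId": "t-1", "status": "success", "content": [{"text": "APPROVED"}]}}],
}
resumed = resume_history(history, callback)
```

Because the slice creates a new list, the history loaded from memory is left untouched, which makes it easy to compare the two when debugging.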

In [None]:
%%writefile ../functions/post_generator_agent_v3/evaluator_agent.py
import logging
import os
import boto3
import json
from typing import Any
from botocore.exceptions import ClientError
from strands.types.tools import ToolResult, ToolUse

# Initialize logging and set paths
logger = logging.getLogger(__name__)
EVALUATOR_AGENT_SQS_URL = os.environ.get("EVALUATOR_AGENT_SQS_URL", None)

TOOL_SPEC = {
    "name": "evaluator_agent",
    "description": "Request evaluation of a social media post for adherence to Unicorn Rentals brand guidelines.",
    "inputSchema": {
        "json": {
            "type": "object",
            "properties": {
                "content": {
                    "type": "string",
                    "description": "The text content of the post to evaluate."
                }
            },
            "required": ["content"]
        }
    }
}

def evaluator_agent(tool: ToolUse, **kwargs: Any) -> ToolResult:
    tool_use_id = tool["toolUseId"]
    content = tool["input"]["content"]
    request_state = kwargs.get("request_state", {})
    session_id = request_state.get('session_id', kwargs.get("session_id", None))
    parent = request_state.get('parent', kwargs.get("parent", None))
    logger.debug(f"Session ID: {session_id}")

    # Send a new task to the evaluator agent via SQS
    # Structure of a new task
    # {
    #     'type': 'new',
    #     'body': {
    #         'task': 'new task description',
    #     },
    #     'parent': {
    #         'agent_name': 'name of the agent who requested this task',
    #         'session_id': 'id of the session that the parent is carrying',
    #         'callback_sqs': 'SQS queue url to report the completion of the task',
    #         'tool_use_id': 'id of the tool that was initiated to call this agent'
    #     }
    # }
    message_body = {
        "type": "new",
        "body": {
            "task": content
        }
    }
    if parent:
        message_body['parent'] = parent
        message_body['parent']['tool_use_id'] = tool_use_id
        
    sqs = boto3.client('sqs')
    sqs.send_message(
        QueueUrl=EVALUATOR_AGENT_SQS_URL,
        MessageBody=json.dumps(message_body),
        MessageAttributes={
            'session_id': {
                'StringValue': session_id,
                'DataType': 'String'
            },
            'tool_use_id': {
                'StringValue': tool_use_id,
                'DataType': 'String'
            }
        }
    )

    # Set the stop flag so that the agent can sleep and store its state in memory.
    request_state["stop_event_loop"] = True
    request_state["session_id"] = session_id
    
    # Return a placeholder success result; the actual evaluation arrives later via the callback queue
    return {
        "toolUseId": tool_use_id,
        "status": "success",
        "content": [{"text": "Requested evaluation from evaluator agent and waiting for response"}]
    }
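
The message that `evaluator_agent` places on the queue can be built and inspected without touching SQS. The following is a minimal sketch of that payload construction; `build_evaluation_task` is a hypothetical helper, and the queue URL and IDs are placeholder values. Unlike the tool above, which adds `tool_use_id` to the `parent` dict in place, this version works on a copy. That is a design choice for the sketch, not what the lab code does.

```python
import copy
import json
from typing import Any, Optional


def build_evaluation_task(content: str, tool_use_id: str, parent: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Build a 'new' task message for the evaluator agent."""
    message: dict[str, Any] = {"type": "new", "body": {"task": content}}
    if parent:
        # Copy the parent so the caller's dict is not modified.
        parent_copy = copy.deepcopy(parent)
        # The tool use id lets the parent match the callback to the pending tool call.
        parent_copy["tool_use_id"] = tool_use_id
        message["parent"] = parent_copy
    return message


# Placeholder values for illustration only.
parent = {
    "agent_name": "post_generator",
    "session_id": "s-1",
    "callback_sqs": "https://sqs.example/placeholder-queue",
}
task = build_evaluation_task("Rent a rainbow unicorn today!", "t-1", parent)
message_body = json.dumps(task)  # this string is what would be passed as MessageBody
```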

In [None]:
! cp ../functions/post_generator_agent_v2/human_approval.py ../functions/post_generator_agent_v3/human_approval.py

In [None]:
def get_api_id_by_name(api_name):
    """
    Get the API ID of an API Gateway by its name
    """
    import boto3
    # Create API Gateway client
    client = boto3.client('apigateway')
    # Get list of all REST APIs
    response = client.get_rest_apis()    
    # Find the API with the matching name
    for item in response['items']:
        if item['name'] == api_name:
            return item['id']
    # Handle pagination if there are many APIs
    while 'position' in response:
        response = client.get_rest_apis(position=response['position'])
        for item in response['items']:
            if item['name'] == api_name:
                return item['id']
    return None

In [None]:
# Fetch api_id of the approval API Gateway
api_id = get_api_id_by_name('ApprovalAPI')
region = boto3.session.Session().region_name
approval_api_endpoint = f"https://{api_id}.execute-api.{region}.amazonaws.com/dev/approval/"

# Get the SNS topic ARN for approval notifications (create_topic returns the existing topic's ARN if the topic already exists)
sns = boto3.client('sns')
topic_arn = sns.create_topic(Name='ApprovalNotificationTopic')['TopicArn']

# Package the Lambda function
post_generator_v3_zip = package_lambda_function('../functions/post_generator_agent_v3/')
print(f"Post Generator Agent V3 Lambda packaged: {len(post_generator_v3_zip) / 1024:.2f} KB")

# Update the IAM policy for the post generator agent
post_generator_role_name = 'PostGeneratorAgent-role'
post_generator_policy = {
    "Version": "2012-10-17",
    "Statement": [
        # Basic Lambda execution
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        # SQS poll from own queue
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:*:*:post-generator-agent-tasks"
        },
        # SQS send to evaluator agent queue
        {
            "Effect": "Allow",
            "Action": [
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:*:*:evaluator-agent-tasks"
        },
        # DynamoDB permissions
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem"
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/AgentMemoryTable"
        },
        # SNS permissions
        {
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": topic_arn
        },
        # Bedrock permissions
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": "*"
        }
    ]
}

# Update the IAM role policy
try:
    iam = boto3.client('iam')
    iam.put_role_policy(
        RoleName=post_generator_role_name,
        PolicyName=f"{post_generator_role_name}-policy",
        PolicyDocument=json.dumps(post_generator_policy)
    )
    print(f"Updated IAM role policy for {post_generator_role_name}")
except Exception as e:
    print(f"Error updating IAM role policy: {str(e)}")
    raise


In [None]:
# Deploy the updated post generator agent
lambda_client = boto3.client('lambda')

try:
    # Update the function
    print("Updating the function code for PostGeneratorAgent")
    lambda_client.update_function_code(
        FunctionName='PostGeneratorAgent',
        ZipFile=post_generator_v3_zip
    )
    print("Waiting 10 seconds for the function to be updated")
    time.sleep(10)  # Wait for the function to be updated
    print("Updating the configuration to add new environment variables.")
    # Update the configuration
    lambda_client.update_function_configuration(
        FunctionName='PostGeneratorAgent',
        Environment={
            'Variables': {
                'PUBLISH_API_ENDPOINT': os.environ.get('PUBLISH_API_ENDPOINT', ''),
                'APPROVAL_API_ENDPOINT': approval_api_endpoint,
                'TOPIC_ARN': topic_arn,
                'MEMORY_TABLE': 'AgentMemoryTable',
                'CALLBACK_SQS_URL': post_generator_queue_url,
                'EVALUATOR_AGENT_SQS_URL': evaluator_queue_url
            }
        }
    )
    
    print("Updated Lambda function: PostGeneratorAgent")
    
except Exception as e:
    print(f"Error updating Lambda function: {str(e)}")
    raise
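
The fixed 10-second sleep in the deployment cell is a simple way to let the code update settle before changing the configuration. As an alternative, boto3 provides a `function_updated` waiter for Lambda that polls until the update has finished. The sketch below shows one way to use it; `update_code_then_config` is a hypothetical helper name, and the client is passed in as a parameter so the function can be tried with any object that exposes the same methods.

```python
from typing import Any


def update_code_then_config(
    lambda_client: Any,
    function_name: str,
    zip_bytes: bytes,
    env_vars: dict[str, str],
) -> None:
    """Update code, wait for completion, then update the environment variables."""
    lambda_client.update_function_code(FunctionName=function_name, ZipFile=zip_bytes)
    # Poll until the code update is no longer in progress.
    lambda_client.get_waiter("function_updated").wait(FunctionName=function_name)
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Environment={"Variables": env_vars},
    )
    # Wait again so that later cells see the new configuration.
    lambda_client.get_waiter("function_updated").wait(FunctionName=function_name)


# Example call with a real client (not executed here):
# import boto3
# update_code_then_config(
#     boto3.client("lambda"),
#     "PostGeneratorAgent",
#     post_generator_v3_zip,
#     {"MEMORY_TABLE": "AgentMemoryTable"},
# )
```

Note that `update_function_configuration` replaces the whole set of environment variables, so in practice `env_vars` should contain every variable the function needs, as in the cell above.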

## Step 3: Testing the Multi-Agent Pattern

Now let's test our multi-agent implementation, in which the post generator delegates evaluation to the evaluator agent before requesting human approval.

#### Scenario 1: Accepted by Evaluator - No revision needed

In [None]:
sqs = boto3.client('sqs')

task_without_revision = "Create a post announcing our new feature that lets customers pick their favorite unicorn color."
task = {
  "type": "new",
  "body": {
    "task": task_without_revision
  }
}
sqs.send_message(
    QueueUrl=post_generator_queue_url,
    MessageBody=json.dumps(task)
)

#### Scenario 2: Rejected by Evaluator - Revision needed based on feedback

In [None]:
task_with_revision = "Create a post about our unicorns for an adult Halloween party. Make it really scary!"
task = {
  "type": "new",
  "body": {
    "task": task_with_revision
  }
}
sqs.send_message(
    QueueUrl=post_generator_queue_url,
    MessageBody=json.dumps(task)
)
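
Both scenarios run asynchronously, so nothing is returned to the notebook after `send_message`. One way to follow the agents' progress is to read the Lambda logs from CloudWatch Logs. The sketch below is a minimal example that assumes the function writes to the default log group `/aws/lambda/PostGeneratorAgent`; `recent_log_messages` is a hypothetical helper, and the client is passed in as a parameter.

```python
from typing import Any


def recent_log_messages(logs_client: Any, log_group: str, since_ms: int) -> list[str]:
    """Collect log messages written to a log group since the given epoch time in milliseconds."""
    messages: list[str] = []
    kwargs: dict[str, Any] = {"logGroupName": log_group, "startTime": since_ms}
    while True:
        page = logs_client.filter_log_events(**kwargs)
        messages.extend(event["message"] for event in page.get("events", []))
        token = page.get("nextToken")
        if not token:
            return messages
        # More results are available: request the next page.
        kwargs["nextToken"] = token


# Example call with a real client (not executed here):
# import time
# import boto3
# ten_minutes_ago = int(time.time() * 1000) - 10 * 60 * 1000
# for line in recent_log_messages(boto3.client("logs"), "/aws/lambda/PostGeneratorAgent", ten_minutes_ago):
#     print(line, end="")
```

In the second scenario, the log lines from `prepare` should show the same `session_id` being reused each time the evaluator's feedback arrives and the post generator resumes.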

## Viewing Posts on the UniTok Website

Now that we've created and published posts using our agent, let's see them on the UniTok website!

To view your posts:
1. Find the **UniTokUrl** value in the CDK deployment output from the prerequisites. It should look something like: `https://d123abc456def.cloudfront.net`
2. Open this URL in your web browser
3. You should see the posts that our agent has created and published, displayed in reverse chronological order

Each post shows:
- The content of the post
- The author (which we set to "Unicorn Rentals")
- The unicorn color (visualized with the appropriate color)
- The timestamp when the post was created
- The number of likes (starting at 0)

This demonstrates the end-to-end flow of our agent: it generates creative content based on our prompts, publishes it to the UniTok API, and then we can see the posts on the UniTok website.

## Conclusion

Congratulations on completing Lab 4! You've successfully implemented a multi-agent system where specialized agents collaborate asynchronously to accomplish complex tasks. This architecture represents a significant advancement in building sophisticated AI systems that can handle real-world workflows requiring diverse expertise.

Key achievements in this lab:

1. **Agent Specialization**: You've created two specialized agents with distinct responsibilities - a post generator that creates content and an evaluator that ensures quality and brand compliance.

2. **Asynchronous Communication**: You've implemented the "Agents as Tools" pattern, allowing agents to communicate through SQS queues without blocking or waiting for immediate responses.

3. **State Persistence**: Both agents maintain their state in DynamoDB, enabling them to pause execution while waiting for responses and resume exactly where they left off.

4. **Parent-Child Relationships**: You've configured a workflow where one agent can delegate tasks to another and receive results when processing is complete.

5. **Complete Multi-Agent Workflow**: The entire system now supports a sophisticated content creation process with built-in quality control and feedback loops.

This multi-agent pattern can be extended to support any number of specialized agents working together on complex tasks. As your system grows, you could add additional agents for tasks like:
- Image generation for social media posts
- Audience targeting and segmentation
- Performance analytics and optimization
- Content scheduling and distribution

By mastering this pattern, you now have the tools to build sophisticated AI systems that can tackle complex, real-world problems through collaboration between specialized components. This approach enables you to create more maintainable, scalable, and robust agent architectures that can evolve with your business needs.