# Multi-Agent AI Campaign Planning System on AWS

Marketing campaigns often involve complex, multi-step planning processes that can benefit from AI automation. In this solution, we deploy a **multi-agent AI campaign planning system** on AWS. The system coordinates multiple specialized AI agents to take a marketing brief from input to a full campaign plan, using AWS’s serverless and AI services.

## Introduction

This notebook walks through the deployment step by step. Amazon Bedrock foundation models (e.g., Anthropic Claude, Amazon Titan) handle the AI tasks, and AWS Step Functions orchestrates the specialized agents, which run as Lambda functions. Supporting services provide the rest: S3 for storage, DynamoDB for state management, Kendra for knowledge retrieval, SNS for notifications, and IAM for secure roles.

## Architecture and Components

- **Amazon Bedrock Models (Claude, Titan)** – Generative AI for each agent.
- **AWS Lambda Functions (Agents 0–5)** – Input extractor, business interpreter, campaign planner, content generator, channel strategist, evaluator.
- **AWS Step Functions** – Orchestrates the workflow (Agent0 → Agent1 → ... → Agent5).
- **Amazon S3** – Stores campaign briefs and generated assets.
- **Amazon DynamoDB** – Persists intermediate and final campaign data (on-demand billing).
- **Amazon Kendra** – Knowledge base for RAG, indexing marketing documents.
- **Amazon SNS** – Notifications on success/failure.
- **AWS IAM** – Least-privilege roles for Lambdas and Step Functions.

## Setup Instructions (AWS Access & Environment)

Ensure you have AWS programmatic access with permissions to create IAM roles, Lambda functions, Step Functions state machines, S3 buckets, DynamoDB tables, Kendra indexes, and SNS topics. Avoid hard-coding secrets; use environment variables or AWS config files instead.

### AWS Credentials and Region


In [None]:
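import os
# boto3 resolves credentials through its default chain (environment variables,
# ~/.aws/credentials, or an attached IAM role), so no secrets are written here.
# To use environment variables, export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# in your shell before starting Jupyter rather than assigning literals in the notebook.
os.environ.setdefault('AWS_DEFAULT_REGION', 'us-east-1')  # Use a Bedrock & Kendra supported region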

## 1. Provision AWS Resources

### 1.1 Create S3 Buckets for Briefs and Assets


In [None]:
import boto3, random, string

region = os.environ['AWS_DEFAULT_REGION']
s3 = boto3.client('s3', region_name=region)

# Unique bucket names (modify suffix as needed)
suffix = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(6))
briefs_bucket = f"ai-campaign-briefs-{suffix}"
assets_bucket = f"ai-campaign-assets-{suffix}"

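for bucket in [briefs_bucket, assets_bucket]:
    try:
        # us-east-1 is the default location and rejects an explicit LocationConstraint
        if region == 'us-east-1':
            s3.create_bucket(Bucket=bucket)
        else:
            s3.create_bucket(
                Bucket=bucket,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
        print(f"Created bucket: {bucket}")
    except Exception as e:
        print(f"Bucket creation error: {e}")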

### 1.2 Upload Dummy Marketing Brief


In [None]:
import json

# Dummy marketing brief
campaign_brief = {
    "campaign_id": "CAMPAIGN123",
    "campaign_name": "GenZ Social Hype 2025",
    "target_audience": "Gen-Z",
    "product": "Tech Gadget X",
    "objectives": "Drive brand engagement and social buzz among Gen-Z users for Tech Gadget X launch.",
    "key_messages": "Trendy, innovative, sustainable; speak the Gen-Z lingo.",
    "channels": "social_media",
    "timeline": "Q3 2025",
    "budget": "50k USD"
}
brief_key = "sample_brief_genz.json"
s3.put_object(Bucket=briefs_bucket, Key=brief_key, Body=json.dumps(campaign_brief))
print(f"Uploaded dummy brief to s3://{briefs_bucket}/{brief_key}")

### 1.3 Create DynamoDB Table for Campaign Data


In [None]:
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = f"AI_CampaignData_{suffix}"

dynamodb.create_table(
    TableName=table_name,
    AttributeDefinitions=[
        {"AttributeName": "campaign_id", "AttributeType": "S"},
        {"AttributeName": "step", "AttributeType": "S"}
    ],
    KeySchema=[
        {"AttributeName": "campaign_id", "KeyType": "HASH"},
        {"AttributeName": "step", "KeyType": "RANGE"}
    ],
    BillingMode='PAY_PER_REQUEST'
)
dynamodb.get_waiter('table_exists').wait(TableName=table_name)
print(f"DynamoDB table created: {table_name}")
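
The composite key above (`campaign_id` as partition key, `step` as sort key) lets each agent store its output under its own step name. Here is a minimal sketch of how an agent might shape such an item in the low-level client format. The helper name `build_step_item`, the `output` attribute, and the step label `agent1_interpretation` are illustrative assumptions, not part of the deployed agents.

```python
import json

def build_step_item(campaign_id, step, payload):
    """Return an item in DynamoDB's low-level attribute-value format.

    Hypothetical helper: the 'output' attribute name is an assumption.
    """
    return {
        "campaign_id": {"S": campaign_id},  # partition key
        "step": {"S": step},                # sort key
        "output": {"S": json.dumps(payload, sort_keys=True)},  # agent result as JSON text
    }

item = build_step_item("CAMPAIGN123", "agent1_interpretation", {"audience": "Gen-Z"})
print(item["output"]["S"])
```

An agent could write the item with `dynamodb.put_item(TableName=table_name, Item=item)`, and a later agent could read every step of a campaign with a `query` on `campaign_id`.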

### 1.4 Set Up Amazon Kendra (Knowledge Base Index)


In [None]:
iam = boto3.client('iam')
kendra = boto3.client('kendra', region_name=region)

# Create IAM role for Kendra
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "kendra.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}
role_resp = iam.create_role(
    RoleName=f"KendraIndexRole-{suffix}",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
    Description="Role for Kendra index to access S3"
)
iam.attach_role_policy(
    RoleName=role_resp['Role']['RoleName'],
    PolicyArn="arn:aws:iam::aws:policy/AmazonKendraReadOnlyAccess"
)
iam.attach_role_policy(
    RoleName=role_resp['Role']['RoleName'],
    PolicyArn="arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
)
kendra_role_arn = role_resp['Role']['Arn']
print(f"Created Kendra IAM role: {kendra_role_arn}")

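import time

# Give IAM a few seconds to propagate the new role before Kendra assumes it
time.sleep(15)

# Create Kendra index
index_resp = kendra.create_index(
    Name=f"CampaignKnowledgeBase-{suffix}",
    RoleArn=kendra_role_arn,
    Edition="DEVELOPER_EDITION"
)
index_id = index_resp['Id']
print(f"Creating Kendra index (ID: {index_id})...")
# Poll describe_index instead of using a waiter (boto3 does not define Kendra
# waiters at the time of writing); index creation can take tens of minutes
while (status := kendra.describe_index(Id=index_id)['Status']) == 'CREATING':
    time.sleep(60)
print(f"Kendra index status: {status}")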

### 1.5 Add Dummy Documents to Kendra


In [None]:
# Ingest dummy documents into Kendra
documents = [
    {
        "Id": "doc1",
        "Title": "GenZ Social Media Trends 2024",
        "Blob": b"Gen Z audiences heavily use TikTok, Instagram, and YouTube. Authenticity and social causes drive engagement...",
        "ContentType": "PLAIN_TEXT"
    },
    {
        "Id": "doc2",
        "Title": "B2B SaaS Lead Gen Best Practices",
        "Blob": b"Enterprise marketing relies on LinkedIn, webinars, and whitepapers. Emphasize ROI, case studies...",
        "ContentType": "PLAIN_TEXT"
    }
]
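resp = kendra.batch_put_document(IndexId=index_id, Documents=documents)
# batch_put_document reports per-document failures instead of raising
failed = resp.get('FailedDocuments', [])
print(f"Submitted {len(documents) - len(failed)} documents to Kendra; {len(failed)} failed.")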
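
Once the documents are indexed, an agent can use Kendra for retrieval-augmented generation by querying the index and pasting the excerpts into its prompt. The sketch below shows one way to flatten a `kendra.query` response into a context string. The helper `format_kendra_context` is hypothetical, and the sample response is a hand-written stand-in that follows the `ResultItems` / `DocumentTitle` / `DocumentExcerpt` shape of the Query API.

```python
def format_kendra_context(response, max_items=3):
    """Turn a Kendra Query response into a plain-text context block for a prompt."""
    lines = []
    for item in response.get("ResultItems", [])[:max_items]:
        title = item.get("DocumentTitle", {}).get("Text", "Untitled")
        excerpt = item.get("DocumentExcerpt", {}).get("Text", "").strip()
        if excerpt:  # skip results that carry no excerpt text
            lines.append(f"[{title}] {excerpt}")
    return "\n".join(lines)

# Stand-in for kendra.query(IndexId=index_id, QueryText="Gen-Z social media trends")
sample_response = {
    "ResultItems": [
        {"DocumentTitle": {"Text": "GenZ Social Media Trends 2024"},
         "DocumentExcerpt": {"Text": "Gen Z audiences heavily use TikTok, Instagram, and YouTube."}}
    ]
}
print(format_kendra_context(sample_response))
```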

### 1.6 Create SNS Topic for Notifications


In [None]:
sns = boto3.client('sns', region_name=region)
topic_resp = sns.create_topic(Name=f"CampaignCompletionTopic-{suffix}")
topic_arn = topic_resp['TopicArn']
print(f"Created SNS topic: {topic_arn}")

### 1.7 Create IAM Roles for Lambda and Step Functions


In [None]:
# Lambda execution role
lambda_role = iam.create_role(
    RoleName=f"CampaignAgentLambdaRole-{suffix}",
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }),
    Description="Lambda execution role for AI campaign agents"
)
lambda_role_arn = lambda_role['Role']['Arn']
iam.attach_role_policy(
    RoleName=lambda_role['Role']['RoleName'],
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
)
print(f"Created Lambda IAM role: {lambda_role_arn}")

# Step Functions execution role
sf_role = iam.create_role(
    RoleName=f"StepFunctionRole-{suffix}",
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "states.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }),
    Description="Step Functions workflow role"
)
sf_role_arn = sf_role['Role']['Arn']
# Inline policy for the workflow role: invoke the agent Lambdas, publish to SNS, update DynamoDB
policy = {
    "Version": "2012-10-17",
    "Statement": [
        # Scope invocation to this deployment's agent functions rather than "*"
        {"Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": f"arn:aws:lambda:{region}:*:function:CampaignAgent*-{suffix}"},
        {"Effect": "Allow", "Action": "sns:Publish", "Resource": topic_arn},
        {"Effect": "Allow", "Action": "dynamodb:UpdateItem", "Resource": f"arn:aws:dynamodb:{region}:*:table/{table_name}"}
    ]
}
iam.put_role_policy(
    RoleName=sf_role['Role']['RoleName'],
    PolicyName="StepFuncInvokePolicy",
    PolicyDocument=json.dumps(policy)
)
print(f"Created Step Functions IAM role: {sf_role_arn}")
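
The Lambda role above carries only `AWSLambdaBasicExecutionRole`, which covers CloudWatch logging. Agents that call Bedrock, DynamoDB, S3, Kendra, or SNS would need further permissions. The sketch below builds one possible inline policy. The action list is an assumption about what the agents do (their code is not shown in this notebook), and `build_agent_policy` plus all the sample values are hypothetical.

```python
import json

def build_agent_policy(region, account_id, table_name, briefs_bucket,
                       assets_bucket, index_id, topic_arn):
    """Build an inline IAM policy document for the agent Lambdas (illustrative)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": "bedrock:InvokeModel",
             "Resource": f"arn:aws:bedrock:{region}::foundation-model/*"},
            {"Effect": "Allow",
             "Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query"],
             "Resource": f"arn:aws:dynamodb:{region}:{account_id}:table/{table_name}"},
            {"Effect": "Allow", "Action": "s3:GetObject",
             "Resource": f"arn:aws:s3:::{briefs_bucket}/*"},
            {"Effect": "Allow", "Action": "s3:PutObject",
             "Resource": f"arn:aws:s3:::{assets_bucket}/*"},
            {"Effect": "Allow", "Action": "kendra:Query",
             "Resource": f"arn:aws:kendra:{region}:{account_id}:index/{index_id}"},
            {"Effect": "Allow", "Action": "sns:Publish", "Resource": topic_arn},
        ],
    }

# Placeholder values; in the notebook you would pass region, table_name, etc.
agent_policy = build_agent_policy(
    "us-east-1", "123456789012", "AI_CampaignData_demo",
    "ai-campaign-briefs-demo", "ai-campaign-assets-demo",
    "example-index-id", "arn:aws:sns:us-east-1:123456789012:CampaignCompletionTopic-demo",
)
print(json.dumps(agent_policy, indent=2))
```

The resulting document could be attached with `iam.put_role_policy(RoleName=lambda_role['Role']['RoleName'], PolicyName="CampaignAgentAccess", PolicyDocument=json.dumps(agent_policy))`, where the policy name is again only a placeholder.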

### 1.8 Deploy Lambda Functions for Each Agent

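Below we package and deploy six agent Lambdas (Agent0 through Agent5) from inline Python source strings using boto3. The agent source strings in this cell are placeholders; replace each one with that agent's handler code (a module exposing `lambda_handler`) before running.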

In [None]:
import zipfile, io

lambda_client = boto3.client('lambda', region_name=region)
CLAUDE_MODEL = "anthropic.claude-v2"
TITAN_MODEL = "amazon.titan-text-express-v1"

def create_lambda_zip(code_str):
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w') as z:
        z.writestr('lambda_function.py', code_str)
    zip_buffer.seek(0)
    return zip_buffer.read()

# Define code for each agent (Agent0 to Agent5)
agent_codes = []

# Agent0: Input Extractor
agent0 = r"""<... full agent0 python code as per above ...>"""
agent_codes.append(agent0)

# Agent1: Business Interpreter
agent1 = r"""<... full agent1 python code as per above ...>"""
agent_codes.append(agent1)

# Agent2: Campaign Planner
agent2 = r"""<... full agent2 python code as per above ...>"""
agent_codes.append(agent2)

# Agent3: Content Generator
agent3 = r"""<... full agent3 python code as per above ...>"""
agent_codes.append(agent3)

# Agent4: Channel Strategist
agent4 = r"""<... full agent4 python code as per above ...>"""
agent_codes.append(agent4)

# Agent5: Evaluator
agent5 = r"""<... full agent5 python code as per above ...>"""
agent_codes.append(agent5)

# Deploy each Lambda
for i, code in enumerate(agent_codes):
    func_name = f"CampaignAgent{i}-{suffix}"
    zip_bytes = create_lambda_zip(code)
    try:
        lambda_client.create_function(
            FunctionName=func_name,
            Runtime='python3.12',  # a currently supported Lambda Python runtime
            Role=lambda_role_arn,
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': zip_bytes},
            Timeout=60,
            MemorySize=256,
            Environment={'Variables': {
                'DYNAMO_TABLE': table_name,
                'ASSETS_BUCKET': assets_bucket,
                'KENDRA_INDEX': index_id or "",
                'SNS_TOPIC': topic_arn
            }}
        )
        print(f"Created Lambda: {func_name}")
    except Exception as e:
        print(f"Error creating {func_name}: {e}")
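
Because the agent sources are elided in this notebook, the following is only a hypothetical sketch of the core of one agent: building a request in the Claude v2 text-completions format and reading the `completion` field from the response. The function names and the `FakeBedrock` stand-in are assumptions made so the example runs offline. A real agent would pass `boto3.client('bedrock-runtime')` instead.

```python
import io
import json

CLAUDE_MODEL = "anthropic.claude-v2"

def build_claude_body(instruction, max_tokens=500):
    """Build a request body in the Claude v2 text-completions format."""
    return json.dumps({
        "prompt": f"\n\nHuman: {instruction}\n\nAssistant:",
        "max_tokens_to_sample": max_tokens,
    })

def run_agent(bedrock_client, instruction):
    """Invoke the model and return the generated text.

    bedrock_client is expected to behave like boto3.client('bedrock-runtime').
    """
    response = bedrock_client.invoke_model(
        modelId=CLAUDE_MODEL,
        body=build_claude_body(instruction),
        contentType="application/json",
        accept="application/json",
    )
    return json.loads(response["body"].read())["completion"].strip()

class FakeBedrock:
    """Offline stand-in so the sketch runs without AWS access."""
    def invoke_model(self, **kwargs):
        prompt = json.loads(kwargs["body"])["prompt"]
        reply = {"completion": f" echo:{len(prompt)}"}
        return {"body": io.BytesIO(json.dumps(reply).encode("utf-8"))}

print(run_agent(FakeBedrock(), "Summarize the brief."))
```

Passing the client in as an argument keeps the model call separate from the Lambda plumbing, which makes the agent logic easy to exercise locally before deployment.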

### 1.9 Define Step Functions State Machine


In [None]:
import json

stepfunctions = boto3.client('stepfunctions', region_name=region)
account_id = boto3.client('sts').get_caller_identity()['Account']
lambda_arns = [f"arn:aws:lambda:{region}:{account_id}:function:CampaignAgent{i}-{suffix}" for i in range(6)]

states = {}
for i in range(6):
    state_name = f"Agent{i}Step"
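    states[state_name] = {
        "Type": "Task",
        "Resource": lambda_arns[i],
        # Forward the brief location with the ID so Agent0 can load the brief from S3
        "Parameters": {"campaign_id.$": "$.campaign_id", "brief_bucket.$": "$.brief_bucket", "brief_key.$": "$.brief_key"},
        # Discard the Lambda return value so the original input reaches the next state intact
        # (assumes agents exchange results through DynamoDB)
        "ResultPath": None,
        "Retry": [{"ErrorEquals": ["Lambda.ServiceException","Lambda.AWSLambdaException","Lambda.SdkClientException"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0}],
        "Catch": [{"ErrorEquals": ["States.ALL"], "ResultPath": "$.error", "Next": "FailureNotify"}],
        "Next": f"Agent{i+1}Step" if i < 5 else "NotifySuccess"
    }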

states["NotifySuccess"] = {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:sns:publish",
    "Parameters": {"TopicArn": topic_arn, "Message.$": "States.Format('Campaign {} completed.', $.campaign_id)", "Subject": "Campaign Success"},
    "End": True
}
states["FailureNotify"] = {
    "Type": "Task",
    "Resource": "arn:aws:states:::aws-sdk:sns:publish",
    "Parameters": {"TopicArn": topic_arn, "Message.$": "States.Format('Campaign {} failed: {}', $.campaign_id, $.error.Error)", "Subject": "Campaign Failed"},
    "End": True
}

definition = {"Comment": "Multi-agent AI Campaign Planning", "StartAt": "Agent0Step", "States": states}

sf_resp = stepfunctions.create_state_machine(
    name=f"CampaignPlannerWorkflow-{suffix}",
    definition=json.dumps(definition),
    roleArn=sf_role_arn,
    type='STANDARD'
)
print(f"Created State Machine: {sf_resp['stateMachineArn']}")
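
Since the definition is assembled programmatically, a quick local check that every transition points at a defined state can catch typos before `create_state_machine` rejects the definition. The helper `find_dangling_transitions` and the reduced two-agent example are illustrative assumptions. The example deliberately omits `FailureNotify` to show what a finding looks like.

```python
def find_dangling_transitions(definition):
    """Return (state, target) pairs whose Next or Catch target is not a defined state."""
    states = definition["States"]
    dangling = []
    if definition["StartAt"] not in states:
        dangling.append(("StartAt", definition["StartAt"]))
    for name, state in states.items():
        targets = [state["Next"]] if "Next" in state else []
        targets += [c["Next"] for c in state.get("Catch", [])]
        dangling += [(name, t) for t in targets if t not in states]
    return dangling

# Reduced two-agent example with the same shape as the definition above
example = {
    "StartAt": "Agent0Step",
    "States": {
        "Agent0Step": {"Type": "Task", "Next": "Agent1Step",
                       "Catch": [{"ErrorEquals": ["States.ALL"], "Next": "FailureNotify"}]},
        "Agent1Step": {"Type": "Task", "Next": "NotifySuccess"},
        "NotifySuccess": {"Type": "Task", "End": True},
    },
}
print(find_dangling_transitions(example))
```

Running the same function on the notebook's `definition` should return an empty list, because both notification states are defined there.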

## 2. Running the Multi-Agent Campaign Planner

Start a test execution using the dummy brief.

In [None]:
exec_resp = stepfunctions.start_execution(
    stateMachineArn=sf_resp['stateMachineArn'],
    input=json.dumps({
        "campaign_id": campaign_brief["campaign_id"],
        "brief_bucket": briefs_bucket,
        "brief_key": brief_key
    })
)
print(f"Started execution: {exec_resp['executionArn']}")
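
`start_execution` returns immediately, so the notebook needs to poll to learn the outcome. Below is a minimal polling sketch built on `describe_execution`. The helper name, the poll limits, and the `FakeStepFunctions` stand-in (used so the example runs without AWS) are assumptions.

```python
import time

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}

def wait_for_execution(sfn_client, execution_arn, poll_seconds=5, max_polls=120):
    """Poll describe_execution until the execution reaches a terminal status."""
    for _ in range(max_polls):
        status = sfn_client.describe_execution(executionArn=execution_arn)["status"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"Execution still running after {max_polls} polls")

class FakeStepFunctions:
    """Offline stand-in that reports RUNNING twice, then SUCCEEDED."""
    def __init__(self):
        self._statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])

    def describe_execution(self, executionArn):
        return {"status": next(self._statuses)}

print(wait_for_execution(FakeStepFunctions(), "arn:aws:states:example", poll_seconds=0))
```

In the notebook, the call would be `wait_for_execution(stepfunctions, exec_resp['executionArn'])`.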

## 3. Templates for Generative Outputs

**Gen-Z Content Template:**
```
Based on the campaign plan: '''{plan}''', generate 3 short social media posts in a casual, trendy voice with emojis and slang.
```

**Enterprise Content Template:**
```
Based on the campaign plan: '''{plan}''', generate 3 LinkedIn posts in a professional, authoritative tone, emphasizing ROI.
```
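
One way the content-generation agent might choose between these templates is to key on the brief's `target_audience` field and fill in `{plan}` with `str.format`. This is a sketch under assumptions: the `build_content_prompt` helper, the audience spellings it recognizes, and the fallback to the enterprise template are all illustrative choices.

```python
TEMPLATES = {
    "gen-z": ("Based on the campaign plan: '''{plan}''', generate 3 short social media "
              "posts in a casual, trendy voice with emojis and slang."),
    "enterprise": ("Based on the campaign plan: '''{plan}''', generate 3 LinkedIn posts "
                   "in a professional, authoritative tone, emphasizing ROI."),
}

def build_content_prompt(target_audience, plan):
    """Pick a template by audience (falling back to enterprise) and fill in the plan."""
    is_gen_z = target_audience.strip().lower() in {"gen-z", "genz", "gen z"}
    return TEMPLATES["gen-z" if is_gen_z else "enterprise"].format(plan=plan)

print(build_content_prompt("Gen-Z", "Launch Tech Gadget X on TikTok in Q3 2025"))
```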

## 4. Error Handling, Logging, and Security Considerations

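- **Retries & Catch:** Each agent task retries transient Lambda service errors up to three times with exponential backoff (2-second initial interval, backoff rate 2.0). Any remaining error is caught and routed to the `FailureNotify` state, which publishes to SNS.
- **Logging:** Each Lambda logs to CloudWatch. The Step Functions execution history records every state transition.
- **Security:** Each service assumes its own IAM role, and resource names reach the Lambdas through environment variables. Some policies in this notebook are broader than strict least privilege (for example `CloudWatchLogsFullAccess` on the Kendra role), so tighten them before production use.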
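
To make the retry settings concrete, the waits implied by `IntervalSeconds`, `MaxAttempts`, and `BackoffRate` can be computed directly: the first retry waits `IntervalSeconds`, and each later wait is multiplied by `BackoffRate`. The helper name `retry_delays` is an illustrative assumption.

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Seconds waited before each retry under Step Functions' backoff rule."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]

# Values from the Retry block in the state machine definition
print(retry_delays(2, 3, 2.0))  # [2.0, 4.0, 8.0]
```

With the notebook's settings, a persistently failing task therefore adds about 14 seconds of waiting before the `Catch` clause takes over.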

### Clean Up (Optional)
```python
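# Uncomment to delete resources
# stepfunctions.delete_state_machine(stateMachineArn=sf_resp['stateMachineArn'])
# for i in range(6): lambda_client.delete_function(FunctionName=f"CampaignAgent{i}-{suffix}")
# dynamodb.delete_table(TableName=table_name)
# sns.delete_topic(TopicArn=topic_arn)
# kendra.delete_index(Id=index_id)
# for b in [briefs_bucket, assets_bucket]:  # buckets must be empty before deletion
#     for obj in s3.list_objects_v2(Bucket=b).get('Contents', []): s3.delete_object(Bucket=b, Key=obj['Key'])
#     s3.delete_bucket(Bucket=b)
# # IAM roles can only be deleted after their managed and inline policies are removed
# for r in [lambda_role['Role']['RoleName'], sf_role['Role']['RoleName'], role_resp['Role']['RoleName']]:
#     for p in iam.list_attached_role_policies(RoleName=r)['AttachedPolicies']: iam.detach_role_policy(RoleName=r, PolicyArn=p['PolicyArn'])
#     for n in iam.list_role_policies(RoleName=r)['PolicyNames']: iam.delete_role_policy(RoleName=r, PolicyName=n)
#     iam.delete_role(RoleName=r)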
```

## Conclusion

This notebook deploys a multi-agent AI campaign planning system on AWS that combines generative AI with serverless orchestration. It accelerates campaign ideation by automating brief parsing, strategy planning, content generation, channel selection, and evaluation. Treat it as a starting point: supply the agent code, tighten the IAM policies, and review the generated output before relying on it in production.

## Sources

- AWS ML Blog: Orchestrate generative AI workflows with Bedrock & Step Functions.
- AWS Step Functions Documentation.
- Amazon Bedrock Developer Guide.
- Amazon Kendra Documentation.