# Multi-Agent Collaboration - DSL query use case

In [1]:
import logging
import boto3
import os
import json
import time
import zipfile
import subprocess
from textwrap import dedent

### Configure Logging

In [2]:
# -----------------------------------------------------------------------------
# Configure Logging
# -----------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

### Import Bedrock Agent utilities

In [3]:
from src.utils.bedrock_agent import Agent, SupervisorAgent, agents_helper, region, account_id

2025-02-07 16:53:39,779 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials


boto3 version: 1.36.9


### Define AWS clients

In [4]:
sts_client = boto3.client('sts')
session = boto3.session.Session()

account_id = sts_client.get_caller_identity()["Account"]
region = session.region_name
account_id_suffix = account_id[:3]
agent_suffix = f"{region}-{account_id_suffix}"

s3_client = boto3.client('s3', region_name=region)
bedrock_client = boto3.client('bedrock-runtime', region_name=region)
iam_client = boto3.client('iam', region_name=region)
lambda_client = boto3.client('lambda', region_name=region)

logger.info(f"Region: {region}")
logger.info(f"Account ID: {account_id}")
logger.info(f"Agent Suffix: {agent_suffix}")


2025-02-07 16:53:41,189 - __main__ - INFO - Region: us-west-2
2025-02-07 16:53:41,189 - __main__ - INFO - Account ID: 533267284022
2025-02-07 16:53:41,190 - __main__ - INFO - Agent Suffix: us-west-2-533


### Helper Functions

In [5]:
def create_iam_role(role_name: str) -> str:
    """
    Creates or retrieves an IAM Role with the necessary trust policy for Lambda.
    Attaches AWSLambdaBasicExecutionRole, and adds inline policies for OpenSearch 
    and AOSS access.

    :param role_name: Name of the IAM Role to create or retrieve.
    :return: ARN of the created or retrieved IAM Role.
    """
    logger.info(f"Creating or retrieving IAM Role: {role_name}")
    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "lambda.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

    try:
        role = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_policy_document)
        )
        logger.info(f"IAM Role {role_name} created.")
    except iam_client.exceptions.EntityAlreadyExistsException:
        logger.info(f"IAM Role {role_name} already exists. Retrieving existing role.")
        role = iam_client.get_role(RoleName=role_name)

    # Attach AWS Lambda execution policy
    iam_client.attach_role_policy(
        RoleName=role_name,
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
    )
    logger.info(f"Attached AWSLambdaBasicExecutionRole to {role_name}.")

    # Attach additional policies for OpenSearch access
    opensearch_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "es:Describe*",
                    "es:List*",
                    "es:Get*"
                ],
                "Resource": "*"
            }
        ]
    }
    opensearch_policy_name = f"{role_name}-OpenSearchPolicy"
    try:
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=opensearch_policy_name,
            PolicyDocument=json.dumps(opensearch_policy_document)
        )
        logger.info(f"Attached OpenSearch policy to IAM Role {role_name}.")
    except Exception as e:
        logger.error(f"Failed to attach OpenSearch policy to IAM Role {role_name}: {str(e)}")

    # Attach the new policy for aoss:APICall
    aoss_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "aoss:*"
                ],
                "Resource": "*"
            }
        ]
    }
    aoss_policy_name = f"{role_name}-AOSSPolicy"
    try:
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=aoss_policy_name,
            PolicyDocument=json.dumps(aoss_policy_document)
        )
        logger.info(f"Attached AOSS policy to IAM Role {role_name}.")
    except Exception as e:
        logger.error(f"Failed to attach AOSS policy to IAM Role {role_name}: {str(e)}")

    role_arn = role['Role']['Arn']

    # Wait for IAM role to propagate
    logger.info("Waiting 10 seconds for IAM role to propagate...")
    time.sleep(10)

    return role_arn


def create_lambda_package(source_file: str, zip_file_path: str, dependencies: list):
    """
    Packages a Lambda function and its dependencies into a single ZIP file.

    :param source_file: Path to the Lambda function source code.
    :param zip_file_path: Path to the ZIP file that will be created.
    :param dependencies: A list of Python packages required by the Lambda.
    """
    logger.info(f"Packaging Lambda function from {source_file} with dependencies {dependencies}")
    package_dir = "package"

    # Install dependencies to a local folder
    if not os.path.exists(package_dir):
        os.makedirs(package_dir)
    logger.info("Installing dependencies locally...")
    subprocess.run(
        f"pip install {' '.join(dependencies)} -t {package_dir}",
        shell=True,
        check=True
    )

    # Create ZIP file with dependencies and function
    logger.info(f"Creating Lambda deployment package: {zip_file_path}")
    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
        # Add dependencies
        for root, _, files in os.walk(package_dir):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, package_dir)
                zipf.write(file_path, arcname)

        # Add the Lambda function code
        zipf.write(source_file, os.path.basename(source_file))

    # Cleanup temporary package directory
    logger.info("Cleaning up temporary package directory...")
    subprocess.run(f"rm -rf {package_dir}", shell=True)
    logger.info("Lambda package created successfully.")


def create_lambda_function(function_name: str,
                           role_arn: str,
                           handler: str,
                           runtime: str,
                           zip_file_path: str,
                           region_name: str = region) -> dict:
    """
    Creates or updates an AWS Lambda function.

    :param function_name: Name of the Lambda function to create or update.
    :param role_arn: ARN of the IAM Role that Lambda will assume.
    :param handler: The function handler (e.g., 'index.lambda_handler').
    :param runtime: The Lambda runtime (e.g., 'python3.12').
    :param zip_file_path: Path to the ZIP file containing the Lambda code.
    :param region_name: AWS region where the Lambda will be created.
    :return: The response from the create_function or update_function_code API call.
    """
    logger.info(f"Creating/updating Lambda function: {function_name}")
    lambda_client = boto3.client('lambda', region_name=region_name)

    with open(zip_file_path, 'rb') as f:
        zip_content = f.read()

    try:
        response = lambda_client.create_function(
            FunctionName=function_name,
            Runtime=runtime,
            Role=role_arn,
            Handler=handler,
            Code={'ZipFile': zip_content},
            Description='Lambda function to execute DSL queries',
            Timeout=15,
            MemorySize=128,
            Publish=True
        )
        logger.info(f"Lambda function {function_name} created successfully.")
    except lambda_client.exceptions.ResourceConflictException:
        logger.info(f"Lambda function {function_name} already exists. Updating its code...")
        response = lambda_client.update_function_code(
            FunctionName=function_name,
            ZipFile=zip_content
        )
        logger.info(f"Lambda function {function_name} updated successfully.")

    return response


def add_resource_based_policy(function_name: str,
                              agent_ids: list,
                              region_name: str,
                              account_id: str):
    """
    Adds a resource-based policy to the specified Lambda function to allow invocation
    from one or more Bedrock agents.

    :param function_name: Name of the Lambda function.
    :param agent_ids: List of agent IDs permitted to invoke this Lambda.
    :param region_name: AWS region.
    :param account_id: AWS account ID.
    """
    logger.info(f"Adding resource-based policy to Lambda function {function_name} for agents: {agent_ids}")
    statement_id_prefix = "AllowExecutionFromBedrockAgent"
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": []
    }

    for agent_id in agent_ids:
        sid = f"{statement_id_prefix}_{agent_id}"
        policy_doc['Statement'].append({
            "Sid": sid,
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "lambda:InvokeFunction",
            "Resource": f"arn:aws:lambda:{region_name}:{account_id}:function:{function_name}",
            "Condition": {
                "ArnLike": {
                    "AWS:SourceArn": f"arn:aws:bedrock:{region_name}:{account_id}:agent/{agent_id}"
                }
            }
        })

    # Retrieve existing policy and remove any existing statements with the same prefix
    try:
        existing_policy = lambda_client.get_policy(FunctionName=function_name)
        existing_policy_doc = json.loads(existing_policy['Policy'])
        for stmt in existing_policy_doc['Statement']:
            if stmt['Sid'].startswith(statement_id_prefix):
                sid_to_remove = stmt['Sid']
                logger.info(f"Removing existing statement: {sid_to_remove}")
                lambda_client.remove_permission(
                    FunctionName=function_name,
                    StatementId=sid_to_remove
                )
    except lambda_client.exceptions.ResourceNotFoundException:
        logger.info(f"No existing policy found for Lambda function {function_name}.")
    except Exception as e:
        logger.error(f"Error retrieving/removing existing policy for {function_name}: {str(e)}")

    # Add new permissions
    for stmt in policy_doc['Statement']:
        sid_val = stmt['Sid']
        try:
            lambda_client.add_permission(
                FunctionName=function_name,
                StatementId=sid_val,
                Action=stmt['Action'],
                Principal=stmt['Principal']['Service'],
                SourceArn=stmt['Condition']['ArnLike']['AWS:SourceArn']
            )
            logger.info(f"Added permission for statement: {sid_val}")
        except Exception as e:
            logger.error(f"Failed to add resource-based policy for {function_name}, statement {sid_val}: {str(e)}")


## Main Execution

In [6]:
# Load Shipping Schema 
with open('schemas/ecom_shipping_schema.json', 'r') as file:
    ecom_shipping_schema = json.load(file)
ecom_shipping_schema_string = json.dumps(ecom_shipping_schema, indent=2)

# Agent foundation model 
agent_foundation_model = [
    "anthropic.claude-3-5-sonnet-20241022-v2:0"
]

# Force re-create default setting for Agent objects, but for now set to False
Agent.set_force_recreate_default(False)

In [7]:
"""
Main execution flow:
    1. Create an IAM Role for Lambda.
    2. Create/Update two Lambda functions (execute-dsl-query, execute-modified-dsl-query).
    3. Create DSL Query Agent & Query Fixer Agent referencing those Lambda functions.
    4. Retrieve the newly created agent IDs.
    5. Add resource-based policies to each Lambda function for those agent IDs.
    6. Create the Supervisor Agent to orchestrate both DSL Query and Query Fixer agents.
    7. Invoke the Supervisor Agent with a sample query.
    8. Delete the agents (cleanup).
"""
# -------------------------------------------------------------------------
# 1. Create (or retrieve) IAM Role for Lambda
# -------------------------------------------------------------------------
IAM_ROLE_NAME = f"LambdaExecutionRole-{agent_suffix}"
role_arn = create_iam_role(IAM_ROLE_NAME)


# -------------------------------------------------------------------------
# 2. Create the first Lambda (execute-dsl-query)
# -------------------------------------------------------------------------
DSL_QUERY_LAMBDA_NAME = f"execute-dsl-query-{agent_suffix}"
DSL_QUERY_LAMBDA_PATH = "src/lambda/execute_dsl_query.py"
DSL_QUERY_ZIP_PATH = "dsl_query_function.zip"

if not os.path.exists(DSL_QUERY_LAMBDA_PATH):
    logger.error(f"Error: {DSL_QUERY_LAMBDA_PATH} does not exist.")

DEPENDENCIES = ["opensearch-py", "requests", "urllib3"]

# Package & create the Lambda
create_lambda_package(DSL_QUERY_LAMBDA_PATH, DSL_QUERY_ZIP_PATH, DEPENDENCIES)
create_lambda_function(
    function_name=DSL_QUERY_LAMBDA_NAME,
    role_arn=role_arn,
    handler="execute_dsl_query.lambda_handler",
    runtime="python3.12",
    zip_file_path=DSL_QUERY_ZIP_PATH
)
os.remove(DSL_QUERY_ZIP_PATH)


2025-02-07 16:53:41,213 - __main__ - INFO - Creating or retrieving IAM Role: LambdaExecutionRole-us-west-2-533
2025-02-07 16:53:41,554 - __main__ - INFO - IAM Role LambdaExecutionRole-us-west-2-533 already exists. Retrieving existing role.
2025-02-07 16:53:41,802 - __main__ - INFO - Attached AWSLambdaBasicExecutionRole to LambdaExecutionRole-us-west-2-533.
2025-02-07 16:53:41,940 - __main__ - INFO - Attached OpenSearch policy to IAM Role LambdaExecutionRole-us-west-2-533.
2025-02-07 16:53:42,066 - __main__ - INFO - Attached AOSS policy to IAM Role LambdaExecutionRole-us-west-2-533.
2025-02-07 16:53:42,068 - __main__ - INFO - Waiting 10 seconds for IAM role to propagate...
2025-02-07 16:53:52,074 - __main__ - INFO - Packaging Lambda function from src/lambda/execute_dsl_query.py with dependencies ['opensearch-py', 'requests', 'urllib3']
2025-02-07 16:53:52,077 - __main__ - INFO - Installing dependencies locally...


Collecting opensearch-py
  Using cached opensearch_py-2.8.0-py3-none-any.whl.metadata (6.9 kB)
Collecting requests
  Using cached requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting urllib3
  Using cached urllib3-2.3.0-py3-none-any.whl.metadata (6.5 kB)
Collecting python-dateutil (from opensearch-py)
  Using cached python_dateutil-2.9.0.post0-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting certifi>=2024.07.04 (from opensearch-py)
  Using cached certifi-2025.1.31-py3-none-any.whl.metadata (2.5 kB)
Collecting Events (from opensearch-py)
  Using cached Events-0.5-py3-none-any.whl.metadata (3.9 kB)
Collecting charset-normalizer<4,>=2 (from requests)
  Using cached charset_normalizer-3.4.1-cp312-cp312-macosx_10_13_universal2.whl.metadata (35 kB)
Collecting idna<4,>=2.5 (from requests)
  Using cached idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting six>=1.5 (from python-dateutil->opensearch-py)
  Using cached six-1.17.0-py2.py3-none-any.whl.metadata (1.7 kB)
Using cached op

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.16.1 requires botocore<1.35.89,>=1.35.74, but you have botocore 1.36.9 which is incompatible.
datasets 2.21.0 requires dill<0.3.9,>=0.3.0, but you have dill 0.3.9 which is incompatible.
datasets 2.21.0 requires fsspec[http]<=2024.6.1,>=2023.1.0, but you have fsspec 2024.12.0 which is incompatible.
awscli 1.34.8 requires botocore==1.35.8, but you have botocore 1.36.9 which is incompatible.
awscli 1.34.8 requires s3transfer<0.11.0,>=0.10.0, but you have s3transfer 0.11.2 which is incompatible.[0m[31m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
2025-02-07 16:53:53,148 - __main__ - INFO - C

Successfully installed Events-0.5 certifi-2025.1.31 charset-normalizer-3.4.1 idna-3.10 opensearch-py-2.8.0 python-dateutil-2.9.0.post0 requests-2.32.3 six-1.17.0 urllib3-2.3.0


2025-02-07 16:53:54,978 - __main__ - INFO - Lambda function execute-dsl-query-us-west-2-533 already exists. Updating its code...
2025-02-07 16:53:56,959 - __main__ - INFO - Lambda function execute-dsl-query-us-west-2-533 updated successfully.


In [8]:
# -------------------------------------------------------------------------
# 2(b). Create the second Lambda (execute-modified-dsl-query)
# -------------------------------------------------------------------------
MODIFIED_QUERY_LAMBDA_NAME = f"execute-modified-dsl-query-{agent_suffix}"
MODIFIED_QUERY_LAMBDA_PATH = "src/lambda/execute_modified_dsl_query.py"
MODIFIED_QUERY_ZIP_PATH = "modified_query_function.zip"

if not os.path.exists(MODIFIED_QUERY_LAMBDA_PATH):
    logger.error(f"Error: {MODIFIED_QUERY_LAMBDA_PATH} does not exist.")

create_lambda_package(MODIFIED_QUERY_LAMBDA_PATH, MODIFIED_QUERY_ZIP_PATH, DEPENDENCIES)
create_lambda_function(
    function_name=MODIFIED_QUERY_LAMBDA_NAME,
    role_arn=role_arn,
    handler="execute_modified_dsl_query.lambda_handler",
    runtime="python3.12",
    zip_file_path=MODIFIED_QUERY_ZIP_PATH
)
os.remove(MODIFIED_QUERY_ZIP_PATH)

2025-02-07 16:53:56,971 - __main__ - INFO - Packaging Lambda function from src/lambda/execute_modified_dsl_query.py with dependencies ['opensearch-py', 'requests', 'urllib3']
2025-02-07 16:53:56,972 - __main__ - INFO - Installing dependencies locally...


Collecting opensearch-py
  Using cached opensearch_py-2.8.0-py3-none-any.whl.metadata (6.9 kB)
Collecting requests
  Using cached requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting urllib3
  Using cached urllib3-2.3.0-py3-none-any.whl.metadata (6.5 kB)
Collecting python-dateutil (from opensearch-py)
  Using cached python_dateutil-2.9.0.post0-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting certifi>=2024.07.04 (from opensearch-py)
  Using cached certifi-2025.1.31-py3-none-any.whl.metadata (2.5 kB)
Collecting Events (from opensearch-py)
  Using cached Events-0.5-py3-none-any.whl.metadata (3.9 kB)
Collecting charset-normalizer<4,>=2 (from requests)
  Using cached charset_normalizer-3.4.1-cp312-cp312-macosx_10_13_universal2.whl.metadata (35 kB)
Collecting idna<4,>=2.5 (from requests)
  Using cached idna-3.10-py3-none-any.whl.metadata (10 kB)
Collecting six>=1.5 (from python-dateutil->opensearch-py)
  Using cached six-1.17.0-py2.py3-none-any.whl.metadata (1.7 kB)
Using cached op

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.16.1 requires botocore<1.35.89,>=1.35.74, but you have botocore 1.36.9 which is incompatible.
datasets 2.21.0 requires dill<0.3.9,>=0.3.0, but you have dill 0.3.9 which is incompatible.
datasets 2.21.0 requires fsspec[http]<=2024.6.1,>=2023.1.0, but you have fsspec 2024.12.0 which is incompatible.
awscli 1.34.8 requires botocore==1.35.8, but you have botocore 1.36.9 which is incompatible.
awscli 1.34.8 requires s3transfer<0.11.0,>=0.10.0, but you have s3transfer 0.11.2 which is incompatible.[0m[31m
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
2025-02-07 16:53:57,910 - __main__ - INFO - C

Successfully installed Events-0.5 certifi-2025.1.31 charset-normalizer-3.4.1 idna-3.10 opensearch-py-2.8.0 python-dateutil-2.9.0.post0 requests-2.32.3 six-1.17.0 urllib3-2.3.0


2025-02-07 16:53:59,962 - __main__ - INFO - Lambda function execute-modified-dsl-query-us-west-2-533 already exists. Updating its code...
2025-02-07 16:54:01,568 - __main__ - INFO - Lambda function execute-modified-dsl-query-us-west-2-533 updated successfully.


In [9]:

# -------------------------------------------------------------------------
# 3. Create the DSL Query Agent & Query Fixer Agent
#
#    Important: reference the just-created Lambda ARNs in "tool_code"
#    The actual ARN is "arn:aws:lambda:<REGION>:<ACCOUNT>:function:<FUNCTION_NAME>"
# -------------------------------------------------------------------------
dsl_query_lambda_arn = f"arn:aws:lambda:{region}:{account_id}:function:{DSL_QUERY_LAMBDA_NAME}"
modified_query_lambda_arn = f"arn:aws:lambda:{region}:{account_id}:function:{MODIFIED_QUERY_LAMBDA_NAME}"

logger.info("Creating DSL Query Agent...")
dsl_query_agent = Agent.direct_create(
    name=f"dsl-query-agent-{agent_suffix}",
    role="DSL Query Creator",
    goal="Create DSL queries for a given user query",
    instructions=f"""
    You are an expert in generating Query DSL for Elasticsearch-style queries. Your task is to convert a 
    given natural language user question into a well-structured Query DSL.
    
    ## Instructions:
    - Use the provided e-commerce shipping schema to construct the query.
    - Encapsulate the output in <json>...</json> tags.
    - Follow the syntax of the Query DSL strictly; do not introduce elements outside the provided schema.
    
    ## Query Construction Rules:
    - **Keyword fields** (carrier, status, country): Use `term` for exact matches or `prefix`/`wildcard` for partial matches.
    - **Text fields** (description, address): Use `match` queries to account for analyzed terms.
    - **Nested fields** (tracking): Always use `nested` queries.
    - **Date fields**: Use `range` queries with date math for filtering by date ranges.
    - **Aggregations**: When counting occurrences, use a 'terms' aggregation on the relevant keyword field to capture the exact values present (e.g., 'delivery.carrier').
    - Break down complex queries into smaller parts for accuracy.
    - Think step-by-step before constructing the query.


    ## Schema:
    {ecom_shipping_schema_string}

    ## Output Format:
    - Return only the generated Query DSL within <json>...</json> tags.
    - Do not include explanations, comments, or additional text.
    """,
    tool_code=dsl_query_lambda_arn,
    tool_defs=[
        {
            "name": "execute_dsl_query",
            "description": "Executes a given DSL query and returns the results",
            "parameters": {
                "dsl_query": {
                    "description": "The DSL query to execute",
                    "type": "string",
                    "required": True,
                }
            }
        }
    ]
)

logger.info("Creating Query Fixer Agent...")
query_fixer_agent = Agent.direct_create(
    name=f"query-fixer-agent-{agent_suffix}",
    role="Query Repair Specialist",
    goal="Fix and optimize failed DSL queries",
    instructions=f"""
    You are an expert query debugger and optimizer. Your tasks are:
    1. Analyze failed DSL queries from the query generator
    2. Diagnose errors using OpenSearch error messages
    3. Apply targeted fixes while maintaining original intent
    4. Optimize queries for better recall when results are empty
    5. Extract exact terms from 'terms' aggregations for accurate reporting.
    6. Identify alternative ways to answer queries when direct fields are missing.
    7. Recognize schema gaps and propose workarounds or schema modifications.

    ## Repair Strategies:
    - SYNTAX ERRORS: Fix formatting issues in nested queries/aggregations
    - FIELD ERRORS: Map invalid fields to valid schema equivalents
    - ZERO HITS: Apply query relaxation techniques:
        * Add wildcards to keyword searches
        * Expand date ranges
        * Reduce strictness of term matches
        * Add synonym expansion
    - Ensure queries include 'terms' aggregations to capture exact values

    ## Optimization Rules:
    - Maintain original query structure where possible
    - Prefer query-time fixes over rearchitecting
    - Document all modifications in revision notes
    - Limit query relaxation to 3 iterations
    - When results are found, check the 'terms' aggregation for the exact field values.
    - Report the exact terms from the data (e.g., use "DHL Express" if that's the stored value).

    ## Schema Gap Analysis & Alternative Solutions:
    - Creative Field Mapping: If direct fields are missing, use existing fields to infer answers.
      * Example: If "delivery duration" is not available, compute it using `delivered_time - out_for_delivery_time`.
    - Schema Enhancement: Identify missing fields required for full query support.
    - Derived Data Solutions:
      * If exact data isn’t available but can be computed, create scripted fields using painless scripting.
      * If no alternative exists, clearly state the required data and suggest schema modifications.



    ## Schema:
    {ecom_shipping_schema_string}

    ## Output Format:
    - Return modified query in <json> tags
    - Include revision notes and exact terms from aggregations in <notes> tags 
    """,
    tool_code=modified_query_lambda_arn,
    tool_defs=[
        {
            "name": "retry_query",
            "description": "Retries a modified version of the failed query",
            "parameters": {
                "modified_dsl_query": {
                    "description": "The corrected DSL query",
                    "type": "string",
                    "required": True
                },
                "revision_notes": {
                    "description": "Description of modifications made",
                    "type": "string",
                },
            }
        }
    ]
)



# -------------------------------------------------------------------------
kb_rag_agent = Agent.direct_create(
    name=f"kb-response-agent-{agent_suffix}",
    role="Knowledge Base Content Analyzer",
    goal="Analyze retrieved document content and generate well-structured responses",
    instructions="""
    You analyze pre-retrieved document content and generate clear, accurate responses.

    ## Response Rules:
    - Synthesize information from provided passages
    - Include relevant quotes with proper citations
    - Use consistent citation format [doc_id:para_num]
    - Maintain factual accuracy
    - Flag any inconsistencies between sources

    ## Output Format:
    Response should be structured as:
    1. Direct answer
    2. Supporting evidence
    3. Source citations
    4. Confidence level (High/Medium/Low)

    ## Quality Guidelines:
    - Prefer direct quotes for key information
    - Summarize when appropriate
    - Note any information gaps
    - Maintain neutral tone
    """,
    kb_descr="Use knowledge base to extract relevant information, analyze content across multiple documents, and generate accurate responses with proper citations. Focus on maintaining context and factual accuracy.",
    kb_id="5GADU65GNF",
    verbose=True
)



2025-02-07 16:54:01,589 - __main__ - INFO - Creating DSL Query Agent...
2025-02-07 16:54:02,137 - __main__ - INFO - Creating Query Fixer Agent...


In [10]:
# -------------------------------------------------------------------------
# 4. Retrieve the newly created Agent IDs
# -------------------------------------------------------------------------
logger.info("Retrieving DSL Query Agent ID...")
dsl_query_agent_id = agents_helper.get_agent_id_by_name(dsl_query_agent.name)
logger.info(f"DSL Query Agent ID: {dsl_query_agent_id}")

logger.info("Retrieving Query Fixer Agent ID...")
query_fixer_agent_id = agents_helper.get_agent_id_by_name(query_fixer_agent.name)
logger.info(f"Query Fixer Agent ID: {query_fixer_agent_id}")

logger.info("Retrieving KB Response Agent ID...")
kb_rag_agent_id = agents_helper.get_agent_id_by_name(kb_rag_agent.name)
logger.info(f"KB Response Agent ID: {kb_rag_agent_id}")


2025-02-07 16:54:02,833 - __main__ - INFO - Retrieving DSL Query Agent ID...
2025-02-07 16:54:02,911 - __main__ - INFO - DSL Query Agent ID: CSKABIRKLE
2025-02-07 16:54:02,912 - __main__ - INFO - Retrieving Query Fixer Agent ID...
2025-02-07 16:54:02,996 - __main__ - INFO - Query Fixer Agent ID: SGSWZUVNOB
2025-02-07 16:54:02,998 - __main__ - INFO - Retrieving KB Response Agent ID...
2025-02-07 16:54:03,079 - __main__ - INFO - KB Response Agent ID: PM73T3Y9BR


In [11]:
# -------------------------------------------------------------------------
# 5. Add resource-based policy to each Lambda so the Agents can invoke them
# -------------------------------------------------------------------------
add_resource_based_policy(DSL_QUERY_LAMBDA_NAME, [dsl_query_agent_id], region, account_id)
add_resource_based_policy(MODIFIED_QUERY_LAMBDA_NAME, [query_fixer_agent_id], region, account_id)

2025-02-07 16:54:03,091 - __main__ - INFO - Adding resource-based policy to Lambda function execute-dsl-query-us-west-2-533 for agents: ['CSKABIRKLE']
2025-02-07 16:54:03,216 - __main__ - INFO - Removing existing statement: AllowExecutionFromBedrockAgent_CSKABIRKLE
2025-02-07 16:54:03,392 - __main__ - INFO - Added permission for statement: AllowExecutionFromBedrockAgent_CSKABIRKLE
2025-02-07 16:54:03,393 - __main__ - INFO - Adding resource-based policy to Lambda function execute-modified-dsl-query-us-west-2-533 for agents: ['SGSWZUVNOB']
2025-02-07 16:54:03,438 - __main__ - INFO - Removing existing statement: AllowExecutionFromBedrockAgent_SGSWZUVNOB
2025-02-07 16:54:03,600 - __main__ - INFO - Added permission for statement: AllowExecutionFromBedrockAgent_SGSWZUVNOB


In [None]:
supervisor_agent = SupervisorAgent.direct_create(
    name=f"supervisor-agent-{agent_suffix}",
    role="Query Pipeline Orchestrator",
    collaboration_type="SUPERVISOR",
    collaborator_objects=[dsl_query_agent, query_fixer_agent, kb_rag_agent],
    collaborator_agents=[
        {
            "agent": dsl_query_agent.name,
            "instructions": dedent(f"""
                {dsl_query_agent.name} is responsible for generating the primary DSL query based on 
                the provided e-commerce shipping schema. Your task is to produce a precise Query DSL 
                encapsulated in <json>...</json> tags. Ensure the query strictly follows the schema 
                and DSL syntax without any additional commentary or explanations.
            """).strip(),
            "relay_conversation_history": "TO_COLLABORATOR"
        },
        {
            "agent": query_fixer_agent.name,
            "instructions": dedent(f"""
                Engage {query_fixer_agent.name} when any of the following conditions occur:
                1. The DSL query returns syntax or validation errors.
                2. The DSL query execution returns zero hits.
                3. The query requires optimization for improved recall.
                4. Alternative query solutions are needed due to schema limitations.
                
                Responsibilities:
                - Analyze error messages and the current query structure.
                - Apply targeted fixes that preserve the original query intent.
                - Implement query relaxation techniques (for example, adding wildcards, extending date ranges, or expanding term matches).
                - Identify and map alternative fields if direct schema fields are missing.
                - Suggest schema enhancements when appropriate.
                - Document all modifications with clear revision notes and output exact terms from aggregations.
                
                Return the corrected DSL query within <json>...</json> tags and include any revision notes within <notes>...</notes> tags.
            """),
            "relay_conversation_history": "TO_COLLABORATOR"
        },
        {
            "agent": kb_rag_agent.name,
            "instructions": dedent(f"""
                Engage {kb_rag_agent.name} to answer user questions that require analyzing the document content retrieved from executed queries.
                When search results are available, your task is to:
                
                1. Synthesize and validate the information from the provided passages.
                2. Generate a final response that includes a direct answer and supporting evidence with relevant quotes and citations.
                
                Your output must be clear, well-structured, and factually accurate to support decision-making.
            """),
            "relay_conversation_history": "TO_COLLABORATOR"
        }
    ],
    instructions=dedent(f"""
        High-Level Overview:
        Route user queries to the appropriate agent based on the type of answer required:
          - Structured Data Retrieval: If the query requires retrieving structured information from the e-commerce shipping data, it is routed to {dsl_query_agent.name}. If the DSL query returns errors or zero hits, the query is immediately routed to the Query Fixer Agent for reattempts.
          - Document Content Analysis: If the query requires a synthesized, in-depth answer derived from analyzing executed query results, it is routed to {kb_rag_agent.name}.
        
        Detailed Instructions:
        
        Route A: Structured Data Retrieval (DSL Query Agent + Query Fixer Agent)
        1. Initial Query Analysis:
           - Receive the user's natural language query.
           - Determine if the query requires structured data retrieval from the e-commerce shipping data.
           - Validate the query against the provided schema:
             {ecom_shipping_schema_string}
           - If the query qualifies, route it to {dsl_query_agent.name}.
        
        2. DSL Query Execution:
           - {dsl_query_agent.name} generates a Query DSL that is encapsulated in <json>...</json> tags and follows the provided schema.
        
        3. Error Handling & Retry:
           - Monitor the query execution results:
             a. If the DSL query returns syntax or validation errors, or if the result is zero hits, capture the error context.
             b. Immediately route the query, along with diagnostic details, to {query_fixer_agent.name}.
             c. {query_fixer_agent.name} applies targeted fixes and query relaxation techniques, then returns a modified DSL query.
             d. Validate the modified query; allow up to 3 retry attempts if necessary.
        
        4. Evaluation & Final Approval (for structured data queries):
           - Confirm that the final DSL query adheres to best practices (for example, proper nested queries, correct field types and mappings).
           - Maintain an audit trail of all query versions and modifications.
           - Generate an execution summary including:
             - Query versions attempted.
             - Reasons for modifications.
             - Performance metrics.
        
        General Aggregation Guidance:
           - If an aggregation returns an unexpectedly inflated count, verify whether the aggregation is counting nested or repeated values.
           - To accurately count unique items, use a cardinality aggregation on a unique field identifier rather than aggregating on fields that might contain duplicate entries.
        
        Route B: Document Content Analysis (KB Response Agent)
        1. Initial Query Analysis:
           - Receive the user's natural language query.
           - Determine if the query requires synthesizing and analyzing document content from executed queries.
           - If so, route the query to {kb_rag_agent.name}.
        
        2. KB Response Generation:
           - {kb_rag_agent.name} synthesizes and validates the information from the provided passages.
           - Generate a final response that includes:
             a. A direct answer.
             b. Supporting evidence with relevant quotes and citations.
             c. A confidence level.
        
        3. Final Response:
           - Deliver the final, well-structured answer to the user.
    """)
)


In [13]:
# Inspect attributes of the SupervisorAgent object
# print(dir(supervisor_agent))


In [14]:
# Retrieve the Supervisor Agent ID
logger.info("Retrieving Supervisor Agent ID...")
supervisor_agent_id = agents_helper.get_agent_id_by_name(supervisor_agent.name)
logger.info(f"Supervisor Agent ID: {supervisor_agent_id}")

2025-02-07 16:54:03,995 - __main__ - INFO - Retrieving Supervisor Agent ID...
2025-02-07 16:54:04,084 - __main__ - INFO - Supervisor Agent ID: QDMOLJO9TV


In [15]:
# Lets get the agent based on the ID
# supervisor_agentV2 = agents_helper.get_agent_by_id("KGTOVCVLKI")

In [16]:
# -------------------------------------------------------------------------
# 7. Invoke the Supervisor Agent with a sample query
# -------------------------------------------------------------------------
response = supervisor_agent.invoke(
    input_text="How many orders have been shipped by DHL?",
    session_id="12345",
    enable_trace=True,
    trace_level="core"
)
logger.info(f"Supervisor agent response: {response}")

invokeAgent API request ID: 56625616-cd32-4d45-b16e-80dbaf235165
invokeAgent API session ID: 12345
  agent id: QDMOLJO9TV, agent alias id: LXJCIDHVKO
[32m---- Step 1 ----[0m
[33mTook 3.6s, using 5659 tokens (in: 5513, out: 146) to complete prior action, observe, orchestrate.[0m
[34mThis query requires structured data retrieval to count orders shipped by DHL carrier. I'll route this to the DSL query agent to create an appropriate query.[0m
[35mUsing sub-agent collaborator: 'dsl-query-agent-us-west-2-533 [CSKABIRKLE/V6WM6PSNHT]' passing input text:[0m
[35mCreate a DSL query to count the number of orders where the delivery carrier is "DHL Express".
[0m
[32m---- Step 1.1 [using sub-agent name:dsl-query-agent-us-west-2-533, id:CSKABIRKLE/V6WM6PSNHT] ----[0m
[33mTook 7.4s, using 6048 tokens (in: 5708, out: 340) to complete prior action, observe, orchestrate.[0m
[34mThe user has requested the same DSL query once again. I'll create the query as requested, maintaining consistency

2025-02-07 16:55:20,846 - __main__ - INFO - Supervisor agent response: There is 1 order that has been shipped by DHL Express.


[32m---- Step 1.3 [using sub-agent name:dsl-query-agent-us-west-2-533, id:CSKABIRKLE/V6WM6PSNHT] ----[0m
[33mTook 3.5s, using 5916 tokens (in: 5812, out: 104) to complete prior action, observe, orchestrate.[0m
[34mLet me send this response to the user.[0m
[36mFinal response:
There is 1 order that has been shipped by DHL Express....[0m
[33mAgent made a total of 4 LLM calls, using 23986 tokens (in: 23198, out: 788), and took 21.3 total seconds[0m


In [17]:
# -------------------------------------------------------------------------
# 7. Invoke the Supervisor Agent with a sample query
# -------------------------------------------------------------------------
response = supervisor_agent.invoke(
    input_text="How many orders have been shipped by DHL?",
    session_id="12345",
    enable_trace=True,
    trace_level="all"
)
logger.info(f"Supervisor agent response: {response}")

invokeAgent API response object: {'ResponseMetadata': {'RequestId': '636198a7-7743-4f97-a3f8-d18819a55c37', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Sat, 08 Feb 2025 00:55:31 GMT', 'content-type': 'application/vnd.amazon.eventstream', 'transfer-encoding': 'chunked', 'connection': 'keep-alive', 'x-amzn-requestid': '636198a7-7743-4f97-a3f8-d18819a55c37', 'x-amz-bedrock-agent-session-id': '12345', 'x-amzn-bedrock-agent-content-type': 'application/json'}, 'RetryAttempts': 0}, 'contentType': 'application/json', 'sessionId': '12345', 'completion': <botocore.eventstream.EventStream object at 0x135467320>}
---
{
  "agentAliasId": "LXJCIDHVKO",
  "agentId": "QDMOLJO9TV",
  "agentVersion": "1",
  "callerChain": [
    {
      "agentAliasArn": "arn:aws:bedrock:us-west-2:533267284022:agent-alias/QDMOLJO9TV/LXJCIDHVKO"
    }
  ],
  "sessionId": "12345",
  "trace": {
    "orchestrationTrace": {
      "modelInvocationInput": {
        "inferenceConfiguration": {
          "maximumLength": 2048,

2025-02-07 16:55:44,463 - __main__ - INFO - Supervisor agent response: There is 1 order that has been shipped by DHL Express.


---
[32m---- Step 3 ----[0m
[33mTook 2.9s, using 6248 tokens (in: 6144, out: 104) to complete prior action, observe, orchestrate.[0m
{
  "agentAliasId": "LXJCIDHVKO",
  "agentId": "QDMOLJO9TV",
  "agentVersion": "1",
  "callerChain": [
    {
      "agentAliasArn": "arn:aws:bedrock:us-west-2:533267284022:agent-alias/QDMOLJO9TV/LXJCIDHVKO"
    }
  ],
  "sessionId": "12345",
  "trace": {
    "orchestrationTrace": {
      "modelInvocationOutput": {
        "metadata": {
          "usage": {
            "inputTokens": 6144,
            "outputTokens": 104
          }
        },
        "rawResponse": {
          "content": "{\"stop_sequence\":null,\"type\":\"message\",\"id\":\"msg_bdrk_01F7PBEhuEv1hbVutv8T4AMJ\",\"content\":[{\"name\":null,\"type\":\"text\",\"id\":null,\"source\":null,\"input\":null,\"is_error\":null,\"text\":\"<thinking>Let me send this response to the user.</thinking>\",\"content\":null,\"guardContent\":null,\"tool_use_id\":null},{\"name\":\"AgentCommunication__sendMe

invokeAgent API input parameters: input_text: How many orders have been shipped by DHL?, agent_id: QDMOLJO9TV, agent_alias_id: LXJCIDHVKO, session_id: 12345, session_state: {}, enable_trace: True, end_session: False, trace_level: all, multi_agent_names: {'CSKABIRKLE/V6WM6PSNHT': 'dsl-query-agent-us-west-2-533', 'SGSWZUVNOB/7KNLAUFPQC': 'query-fixer-agent-us-west-2-533', 'PM73T3Y9BR/GHH3JVSG1F': 'kb-response-agent-us-west-2-533', 'QDMOLJO9TV/LXJCIDHVKO': 'supervisor-agent-us-west-2-533'}


In [None]:
# if isinstance(response, dict):
#     print(f"Answer: {response['answer']}")
#     print(f"Duration: {response['metadata']['duration_seconds']}s")
#     print(f"Total tokens: {response['metadata']['total_tokens']}")
#     # Save trace data if needed
#     with open('trace.json', 'w') as f:
#         json.dump(response['trace'], f, indent=2)
# else:
#     print(response)  # Just print the answer for other trace_level values

In [19]:
response = supervisor_agent.invoke(
    input_text="What are the effects of Covid-19 on e-commerce?",
    session_id="1245",
    enable_trace=True,
    trace_level="core"
)

invokeAgent API input parameters: input_text: What are the effects of Covid-19 on e-commerce?, agent_id: QDMOLJO9TV, agent_alias_id: LXJCIDHVKO, session_id: 1245, session_state: {}, enable_trace: True, end_session: False, trace_level: core, multi_agent_names: {'CSKABIRKLE/V6WM6PSNHT': 'dsl-query-agent-us-west-2-533', 'SGSWZUVNOB/7KNLAUFPQC': 'query-fixer-agent-us-west-2-533', 'PM73T3Y9BR/GHH3JVSG1F': 'kb-response-agent-us-west-2-533', 'QDMOLJO9TV/LXJCIDHVKO': 'supervisor-agent-us-west-2-533'}
[33mTook 8.0s, using 2897 tokens.[0m
[34mThis question requires analyzing shipping data patterns and trends related to Covid-19. I should route this to both the DSL query agent to get relevant data and then to the KB response agent for a comprehensive analysis.

First, I'll request a query to analyze shipping patterns during the Covid-19 period (focusing on 2020-2021) and compare it with pre-Covid data.[0m
[33mTook 11.0s, using 2118 tokens.[0m
[34mTo analyze shipping patterns during the Cov

In [None]:
# response = supervisor_agent.invoke(
#     input_text="How many orders have recipients in Spain and were last updated during customs clearance after January 16, 2024?",
#     session_id="1245",
#     enable_trace=True,
#     trace_level="core"
# )

In [None]:
# response = supervisor_agent.invoke(
#     input_text="What are the temperature controlled packages delivered within 2 hours?",
#     session_id="1245",
#     enable_trace=True,
#     trace_level="core"
# )

In [None]:
# # -------------------------------------------------------------------------
# # 8. Cleanup: Delete the created agents
# # -------------------------------------------------------------------------
# logger.info("Deleting Supervisor Agent...")
# agents_helper.delete_agent(supervisor_agent.name, verbose=True)

# logger.info("Deleting DSL Query Agent...")
# agents_helper.delete_agent(dsl_query_agent.name, verbose=True)

# logger.info("Deleting Query Fixer Agent...")
# agents_helper.delete_agent(query_fixer_agent.name, verbose=True)

# logger.info("Deleting KB Response Agent...")
# agents_helper.delete_agent(kb_rag_agent.name, verbose=True)

# logger.info("All agents deleted. Script completed.")