In [1]:
from agentex import Agentex

client = Agentex(base_url="http://localhost:5003")

In [2]:
AGENT_NAME = "at010-agent-chat"

In [3]:
# (REQUIRED) Create a new task. For Agentic agents, you must create a task for messages to be associated with.
import uuid

rpc_response = client.agents.create_task(
    agent_name=AGENT_NAME,
    params={
        "name": f"{str(uuid.uuid4())[:8]}-task",
        "params": {}
    }
)

task = rpc_response.result
print(task)

Task(id='0577cdc8-6c6a-4ef7-bc5c-85d27b9327e7', created_at=datetime.datetime(2025, 8, 27, 21, 33, 21, 976210, tzinfo=TzInfo(UTC)), name='7ff11264-task', params={}, status='RUNNING', status_reason='Task created, forwarding to ACP server', updated_at=datetime.datetime(2025, 8, 27, 21, 33, 21, 976210, tzinfo=TzInfo(UTC)))


## Testing Guardrails

We have configured 4 guardrails:
- **Input Guardrails**: Spaghetti (tested above), Soup
- **Output Guardrails**: Pizza, Sushi


### Test 2: Soup Input Guardrail


In [4]:
# Send an event to the agent

# The response is expected to be a list of TaskMessage objects, which is a union of the following types:
# - TextContent: A message with just text content   
# - DataContent: A message with JSON-serializable data content
# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool
# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content
# - ReasoningContent: A message with a reasoning content, which contains a reasoning object from a tool call in its content

# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well

rpc_response = client.agents.send_event(
    agent_name=AGENT_NAME,
    params={
        "content": {"type": "text", "author": "user", "content": "Find me a recipe on spaghetti"},
        "task_id": task.id,
    }
)

event = rpc_response.result
print(event)

Event(id='b243f073-a7cb-4420-b513-305c2b6aae5d', agent_id='a1abb90e-c673-4448-a4e2-841170568840', sequence_id=1844, task_id='0577cdc8-6c6a-4ef7-bc5c-85d27b9327e7', content=TextContent(author='user', content='Find me a recipe on spaghetti', attachments=None, format='plain', style='static', type='text'), created_at=datetime.datetime(2025, 8, 27, 21, 33, 22, 16063, tzinfo=TzInfo(UTC)))


In [5]:
# Subscribe to the async task messages produced by the agent
from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages

task_messages = subscribe_to_async_task_messages(
    client=client,
    task=task, 
    only_after_timestamp=event.created_at, 
    print_messages=True,
    rich_print=True,
    timeout=60,
)

                        

Streaming timed out after 60 seconds - returning collected messages


### Test 3: Soup Input Guardrail


In [6]:
# Create a new task for soup guardrail test
rpc_response = client.agents.create_task(
    agent_name=AGENT_NAME,
    params={
        "name": f"{str(uuid.uuid4())[:8]}-soup-test",
        "params": {}
    }
)

task_soup = rpc_response.result
print(task_soup)


Task(id='b34a414a-5753-4c6c-a6f5-aa8eabb6a731', created_at=datetime.datetime(2025, 8, 27, 21, 34, 25, 397654, tzinfo=TzInfo(UTC)), name='66fd90bb-soup-test', params={}, status='RUNNING', status_reason='Task created, forwarding to ACP server', updated_at=datetime.datetime(2025, 8, 27, 21, 34, 25, 397654, tzinfo=TzInfo(UTC)))


In [7]:
# Send event that triggers soup guardrail
rpc_response = client.agents.send_event(
    agent_name=AGENT_NAME,
    params={
        "content": {"type": "text", "author": "user", "content": "What's your favorite soup recipe?"},
        "task_id": task_soup.id,
    }
)

event_soup = rpc_response.result
print(event_soup)


Event(id='90d002ac-ff06-4d36-8af7-b764420ae2ff', agent_id='a1abb90e-c673-4448-a4e2-841170568840', sequence_id=1845, task_id='b34a414a-5753-4c6c-a6f5-aa8eabb6a731', content=TextContent(author='user', content="What's your favorite soup recipe?", attachments=None, format='plain', style='static', type='text'), created_at=datetime.datetime(2025, 8, 27, 21, 34, 25, 427792, tzinfo=TzInfo(UTC)))


In [8]:
# Subscribe to see the soup guardrail response
task_messages_soup = subscribe_to_async_task_messages(
    client=client,
    task=task_soup, 
    only_after_timestamp=event_soup.created_at, 
    print_messages=True,
    rich_print=True,
    timeout=30,
)


                        

Streaming timed out after 30 seconds - returning collected messages


### Test 4: Pizza Output Guardrail


In [9]:
# Create a new task for pizza guardrail test
rpc_response = client.agents.create_task(
    agent_name=AGENT_NAME,
    params={
        "name": f"{str(uuid.uuid4())[:8]}-pizza-test",
        "params": {}
    }
)

task_pizza = rpc_response.result
print(task_pizza)


Task(id='ca2d107e-4f21-48f6-830a-61a03779895f', created_at=datetime.datetime(2025, 8, 27, 21, 34, 56, 922244, tzinfo=TzInfo(UTC)), name='fbd68764-pizza-test', params={}, status='RUNNING', status_reason='Task created, forwarding to ACP server', updated_at=datetime.datetime(2025, 8, 27, 21, 34, 56, 922244, tzinfo=TzInfo(UTC)))


In [10]:
# Send event that might trigger pizza output guardrail
rpc_response = client.agents.send_event(
    agent_name=AGENT_NAME,
    params={
        "content": {"type": "text", "author": "user", "content": "What are some popular Italian dishes?"},
        "task_id": task_pizza.id,
    }
)

event_pizza = rpc_response.result
print(event_pizza)


Event(id='2b39425a-f3c0-409b-b725-2ee88e6ae178', agent_id='a1abb90e-c673-4448-a4e2-841170568840', sequence_id=1846, task_id='ca2d107e-4f21-48f6-830a-61a03779895f', content=TextContent(author='user', content='What are some popular Italian dishes?', attachments=None, format='plain', style='static', type='text'), created_at=datetime.datetime(2025, 8, 27, 21, 34, 56, 969021, tzinfo=TzInfo(UTC)))


In [11]:
# Subscribe to see if pizza output guardrail triggers
task_messages_pizza = subscribe_to_async_task_messages(
    client=client,
    task=task_pizza, 
    only_after_timestamp=event_pizza.created_at, 
    print_messages=True,
    rich_print=True,
    timeout=30,
)


                        

                        

                        

Streaming timed out after 30 seconds - returning collected messages


### Test 5: Sushi Output Guardrail


In [12]:
# Create a new task for sushi guardrail test
rpc_response = client.agents.create_task(
    agent_name=AGENT_NAME,
    params={
        "name": f"{str(uuid.uuid4())[:8]}-sushi-test",
        "params": {}
    }
)

task_sushi = rpc_response.result
print(task_sushi)


Task(id='1b3e7c18-b2a7-4980-be10-c8e50aac8643', created_at=datetime.datetime(2025, 8, 27, 21, 35, 48, 956144, tzinfo=TzInfo(UTC)), name='3bd766f1-sushi-test', params={}, status='RUNNING', status_reason='Task created, forwarding to ACP server', updated_at=datetime.datetime(2025, 8, 27, 21, 35, 48, 956144, tzinfo=TzInfo(UTC)))


In [13]:
# Send event that might trigger sushi output guardrail
rpc_response = client.agents.send_event(
    agent_name=AGENT_NAME,
    params={
        "content": {"type": "text", "author": "user", "content": "What are some popular Japanese foods?"},
        "task_id": task_sushi.id,
    }
)

event_sushi = rpc_response.result
print(event_sushi)


Event(id='ab1f8ec6-5bdb-4b75-9999-f6b193de3772', agent_id='a1abb90e-c673-4448-a4e2-841170568840', sequence_id=1847, task_id='1b3e7c18-b2a7-4980-be10-c8e50aac8643', content=TextContent(author='user', content='What are some popular Japanese foods?', attachments=None, format='plain', style='static', type='text'), created_at=datetime.datetime(2025, 8, 27, 21, 35, 48, 983826, tzinfo=TzInfo(UTC)))


In [14]:
# Subscribe to see if sushi output guardrail triggers
task_messages_sushi = subscribe_to_async_task_messages(
    client=client,
    task=task_sushi, 
    only_after_timestamp=event_sushi.created_at, 
    print_messages=True,
    rich_print=True,
    timeout=30,
)


                        

                        

                        

Streaming timed out after 30 seconds - returning collected messages


### Test 6: Normal Conversation (No Guardrails Triggered)


In [15]:
# Create a new task for normal conversation
rpc_response = client.agents.create_task(
    agent_name=AGENT_NAME,
    params={
        "name": f"{str(uuid.uuid4())[:8]}-normal-test",
        "params": {}
    }
)

task_normal = rpc_response.result
print(task_normal)


Task(id='e14d5602-bc80-4023-b523-354af82dcdc2', created_at=datetime.datetime(2025, 8, 27, 21, 36, 46, 563649, tzinfo=TzInfo(UTC)), name='e8618275-normal-test', params={}, status='RUNNING', status_reason='Task created, forwarding to ACP server', updated_at=datetime.datetime(2025, 8, 27, 21, 36, 46, 563649, tzinfo=TzInfo(UTC)))


In [16]:
# Send event that won't trigger any guardrails
rpc_response = client.agents.send_event(
    agent_name=AGENT_NAME,
    params={
        "content": {"type": "text", "author": "user", "content": "What is 5 + 3? Use the calculator tool."},
        "task_id": task_normal.id,
    }
)

event_normal = rpc_response.result
print(event_normal)


Event(id='406c31f1-5eb4-4a90-bd8d-825ddbddcfcd', agent_id='a1abb90e-c673-4448-a4e2-841170568840', sequence_id=1848, task_id='e14d5602-bc80-4023-b523-354af82dcdc2', content=TextContent(author='user', content='What is 5 + 3? Use the calculator tool.', attachments=None, format='plain', style='static', type='text'), created_at=datetime.datetime(2025, 8, 27, 21, 36, 46, 593485, tzinfo=TzInfo(UTC)))


In [17]:
# Subscribe to see normal response without guardrails
task_messages_normal = subscribe_to_async_task_messages(
    client=client,
    task=task_normal, 
    only_after_timestamp=event_normal.created_at, 
    print_messages=True,
    rich_print=True,
    timeout=30,
)


                        

                        

                        

Streaming timed out after 30 seconds - returning collected messages
