Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/user-guide/concepts/multi-agent/graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,51 @@ async def run_graph():
result = asyncio.run(run_graph())
```

## Streaming Events

Graphs support real-time streaming of events during execution using [`stream_async`](../../../api-reference/multiagent.md#strands.multiagent.graph.Graph.stream_async). This provides visibility into node execution, parallel processing, and nested multi-agent systems.

```python
from strands import Agent
from strands.multiagent import GraphBuilder

# Create specialized agents
researcher = Agent(name="researcher", system_prompt="You are a research specialist...")
analyst = Agent(name="analyst", system_prompt="You are an analysis specialist...")

# Build the graph
builder = GraphBuilder()
builder.add_node(researcher, "research")
builder.add_node(analyst, "analysis")
builder.add_edge("research", "analysis")
builder.set_entry_point("research")
graph = builder.build()

# Stream events during execution
async for event in graph.stream_async("Research and analyze market trends"):
# Track node execution
if event.get("multi_agent_node_start"):
print(f"🔄 Node {event['node_id']} starting")

# Monitor agent events within nodes
elif event.get("multi_agent_node_stream"):
inner_event = event["event"]
if "data" in inner_event:
print(inner_event["data"], end="")

# Track node completion
elif event.get("multi_agent_node_stop"):
node_result = event["node_result"]
print(f"\n✅ Node {event['node_id']} completed in {node_result.execution_time}ms")

# Get final result
elif event.get("multi_agent_result"):
result = event["result"]
print(f"Graph completed: {result.status}")
```

For more details on multi-agent streaming including event types, parallel execution, and nested graphs, see [Multi-Agent Streaming](../streaming/multi-agent-streaming.md).

## Graph Results

When a Graph completes execution, it returns a [`GraphResult`](../../../api-reference/multiagent.md#strands.multiagent.graph.GraphResult) object with detailed information:
Expand Down
39 changes: 39 additions & 0 deletions docs/user-guide/concepts/multi-agent/swarm.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,45 @@ async def run_swarm():
result = asyncio.run(run_swarm())
```

## Streaming Events

Swarms support real-time streaming of events during execution using [`stream_async`](../../../api-reference/multiagent.md#strands.multiagent.swarm.Swarm.stream_async). This provides visibility into agent collaboration, handoffs, and autonomous coordination.

```python
from strands import Agent
from strands.multiagent import Swarm

# Create specialized agents
coordinator = Agent(name="coordinator", system_prompt="You coordinate tasks...")
specialist = Agent(name="specialist", system_prompt="You handle specialized work...")

# Create swarm
swarm = Swarm([coordinator, specialist])

# Stream events during execution
async for event in swarm.stream_async("Design and implement a REST API"):
# Track node execution
if event.get("multi_agent_node_start"):
print(f"🔄 Agent {event['node_id']} taking control")

# Monitor agent events
elif event.get("multi_agent_node_stream"):
inner_event = event["event"]
if "data" in inner_event:
print(inner_event["data"], end="")

# Track handoffs
elif event.get("multi_agent_handoff"):
print(f"\n🔀 Handoff: {event['from_node']} → {event['to_node']}")

# Get final result
elif event.get("multi_agent_result"):
result = event["result"]
print(f"\nSwarm completed: {result.status}")
```

For more details on multi-agent streaming including event types, handoff tracking, and nested swarms, see [Multi-Agent Streaming](../streaming/multi-agent-streaming.md).

## Swarm Results

When a Swarm completes execution, it returns a [`SwarmResult`](../../../api-reference/multiagent.md#strands.multiagent.swarm.SwarmResult) object with detailed information:
Expand Down
161 changes: 161 additions & 0 deletions docs/user-guide/concepts/streaming/multi-agent-streaming.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont know if we need a whole separate doc explaining how streaming works for graph and swarm. I think we can just include them in their respective docs, and maybe mention it here as well: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/streaming/overview/

Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Multi-Agent Streaming

Multi-agent systems (Graph and Swarm) support real-time streaming of events during execution, providing visibility into the orchestration process, node execution, and agent handoffs. This enables responsive UIs, real-time monitoring, and debugging of complex multi-agent workflows.

Both Graph and Swarm use the same `stream_async` method pattern as single agents, yielding events as execution progresses.

## Multi-Agent Event Types

In addition to the standard [streaming events](./overview.md#event-types), multi-agent systems emit coordination events:

- `multi_agent_node_start`: When a node begins execution
- `node_id`: Unique identifier for the node
- `node_type`: Type of node ("agent", "swarm", "graph")
- `multi_agent_node_stream`: Forwarded events from agents/multi-agents with node context
- `node_id`: Identifier of the node generating the event
- `event`: The original agent event, see [agent streaming events](./overview.md#event-types)
- `multi_agent_node_stop`: When a node completes execution
- `node_id`: Unique identifier for the node
- `node_result`: Complete NodeResult with execution details, metrics, and status
- `multi_agent_handoff`: When control is handed off between agents (Swarm only)
- `from_node`: Node ID handing off control
- `to_node`: Node ID receiving control
- `message`: Handoff message explaining the transfer
- `multi_agent_result`: Final multi-agent result
- `result`: The final GraphResult or SwarmResult

## Graph Streaming

Graphs stream events as nodes execute according to the graph structure. Events are emitted in real-time, including from parallel node execution.

```python
from strands import Agent
from strands.multiagent import GraphBuilder

# Create specialized agents
researcher = Agent(name="researcher", system_prompt="You are a research specialist...")
analyst = Agent(name="analyst", system_prompt="You are an analysis specialist...")

# Build the graph
builder = GraphBuilder()
builder.add_node(researcher, "research")
builder.add_node(analyst, "analysis")
builder.add_edge("research", "analysis")
builder.set_entry_point("research")
graph = builder.build()

# Stream events during execution
async for event in graph.stream_async("Research and analyze market trends"):
# Track node execution
if event.get("multi_agent_node_start"):
print(f"🔄 Node {event['node_id']} starting ({event['node_type']})")

# Monitor agent events within nodes
elif event.get("multi_agent_node_stream"):
inner_event = event["event"]

# Access nested agent events
if "data" in inner_event:
print(inner_event["data"], end="")

# Track node completion
elif event.get("multi_agent_node_stop"):
node_result = event["node_result"]
print(f"\n✅ Node {event['node_id']} completed in {node_result.execution_time}ms")

# Get final result
elif event.get("multi_agent_result"):
result = event["result"]
print(f"Graph completed: {result.status}")
```

## Swarm Streaming

Swarms stream events as agents collaborate and hand off tasks. Handoff events provide visibility into the autonomous coordination process.

```python
from strands import Agent
from strands.multiagent import Swarm

# Create specialized agents
coordinator = Agent(name="coordinator", system_prompt="You coordinate tasks...")
specialist = Agent(name="specialist", system_prompt="You handle specialized work...")

# Create swarm
swarm = Swarm([coordinator, specialist])

# Stream events during execution
async for event in swarm.stream_async("Design and implement a REST API"):
# Track node execution
if event.get("multi_agent_node_start"):
print(f"🔄 Agent {event['node_id']} taking control")

# Monitor agent events
elif event.get("multi_agent_node_stream"):
inner_event = event["event"]

if "data" in inner_event:
print(inner_event["data"], end="")

# Track handoffs
elif event.get("multi_agent_handoff"):
print(f"\n🔀 Handoff: {event['from_node']} → {event['to_node']}")

# Track node completion
elif event.get("multi_agent_node_stop"):
node_result = event["node_result"]
print(f"✅ Agent {event['node_id']} completed in {node_result.execution_time}ms")

# Get final result
elif event.get("multi_agent_result"):
result = event["result"]
print(f"Swarm completed: {result.status}")
```



## Accessing Node Results

Node stop events include the complete NodeResult with execution details:

```python
from strands import Agent
from strands.multiagent import GraphBuilder

# Create graph
agent_a = Agent(name="agent_a", system_prompt="You are agent A...")
agent_b = Agent(name="agent_b", system_prompt="You are agent B...")

builder = GraphBuilder()
builder.add_node(agent_a, "a")
builder.add_node(agent_b, "b")
builder.add_edge("a", "b")
graph = builder.build()

# Collect node results
node_results = {}

async for event in graph.stream_async("Process task"):
if event.get("multi_agent_node_stop"):
node_id = event["node_id"]
node_result = event["node_result"]

# Store result for analysis
node_results[node_id] = {
"status": node_result.status,
"execution_time": node_result.execution_time,
"tokens": node_result.accumulated_usage["totalTokens"],
"result": node_result.result
}

# Analyze results
for node_id, result in node_results.items():
print(f"{node_id}: {result['execution_time']}ms, {result['tokens']} tokens")
```

## Next Steps

- Learn about [Graph patterns](../multi-agent/graph.md) for structured workflows
- Explore [Swarm patterns](../multi-agent/swarm.md) for autonomous collaboration
- See [Async Iterators](./async-iterators.md) for async streaming patterns
- Review [Callback Handlers](./callback-handlers.md) for synchronous event processing
1 change: 1 addition & 0 deletions docs/user-guide/concepts/streaming/overview.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add the new types to the "Event types" section: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/streaming/overview/#event-types

This is the best place we have for knowing (what are all the events that can be emitted).

Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,5 @@ orchestrator("What is 3+3?")

- Learn about [Async Iterators](async-iterators.md) for asynchronous streaming
- Explore [Callback Handlers](callback-handlers.md) for synchronous event processing
- See [Multi-Agent Streaming](multi-agent-streaming.md) for Graph and Swarm streaming
- See the [Agent API Reference](../../../api-reference/agent.md) for complete method documentation
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ nav:
- Overview: user-guide/concepts/streaming/overview.md
- Async Iterators: user-guide/concepts/streaming/async-iterators.md
- Callback Handlers: user-guide/concepts/streaming/callback-handlers.md
- Multi-Agent Streaming: user-guide/concepts/streaming/multi-agent-streaming.md
- Multi-agent:
- Agent2Agent (A2A): user-guide/concepts/multi-agent/agent-to-agent.md
- Agents as Tools: user-guide/concepts/multi-agent/agents-as-tools.md
Expand Down