
# Agno Workflows — Executive, Visual Guide (Mermaid-first)

**Goal:** a concise, visual, production-focused notebook on Agno **Workflows** — what they are, why they matter, and how to use them with **deterministic** control.  
Diagrams are in **Mermaid**, with minimal code only where useful.



## 1) What are Workflows?

Workflows orchestrate **Agents**, **Teams**, and **Functions** through an explicit series of **Steps**.  
Unlike free chat-style coordination, workflows run in a **predictable order** with **defined I/O**, making them ideal for production automation.

### High-level Flow
```mermaid
flowchart TD
    A[User / Trigger] --> B[Workflow]
    B --> C1[Step 1: Agent]
    C1 --> C2[Step 2: Function]
    C2 --> D{Condition?}
    D -- Yes --> E1[Branch A: Team]
    D -- No  --> E2[Branch B: Agent]
    E1 --> F[Optional Loop / Router]
    E2 --> F
    F --> G[Output + Artifacts]
```



## 2) Why use Workflows?

| Need | What Workflows Provide |
|---|---|
| Deterministic execution | Fixed step order, typed inputs/outputs, repeatable runs |
| Complex orchestration | Teams + Agents + Functions with branching, looping, parallelism |
| Reliability & audit | Session state, retries, event logs, clear dataflow |

**When to choose:** Use *Workflows* for **predictable pipelines**. Use *Teams* for **open-ended collaboration**.



## 3) Building Blocks

| Block | Purpose |
|---|---|
| **Workflow** | Top-level orchestrator |
| **Step** | Single executor (Agent / Team / Function) |
| **Parallel** | Run multiple steps concurrently |
| **Condition** | Run steps only if predicate is true |
| **Loop** | Repeat steps until a condition is met |
| **Router** | Choose next step(s) dynamically |

### Data flow for a Function Step
```mermaid
sequenceDiagram
    participant WF as Workflow
    participant S as Step (Function)
    WF->>S: StepInput{ input, previous_step_content, images, additional_data, ... }
    S-->>WF: StepOutput{ content, success, stop?, images?, metadata? }
```



## 4) Small, mixed example (Agent + Function + Agent)

> A tiny pipeline: research (team), custom preprocessor (function), planner (agent).  
> Keep steps independent; return **StepOutput(content=...)** from functions.


In [None]:

# Minimal didactic example — snippets only (may not run without Agno installed)
from agno.workflow import Step, Workflow, StepInput, StepOutput

# Pretend: existing 'research_team' and 'content_agent' are defined elsewhere.

def data_preprocessor(step_input: StepInput) -> StepOutput:
    # Normalize / enrich the prior step's content
    content = step_input.previous_step_content or ""
    return StepOutput(content=f"[Preprocessed]\n{content[:500]}")

workflow = Workflow(
    name="Mixed Execution Pipeline",
    steps=[
        # Step(name="Research", team=research_team),
        Step(name="Preprocess", executor=data_preprocessor),
        # Step(name="Plan Content", agent=content_agent),
    ]
)

# workflow.print_response("Analyze the competitive landscape for fintech startups", markdown=True)



## 5) Running Workflows

- **`run()`** returns a `WorkflowRunOutput` (non-streaming)  
- **`print_response()`** prints nicely (helper around `run()`)
- **`arun()`** is the async variant

```python
response = my_workflow.run(input="AI trends 2024", markdown=True)
# Async: await my_workflow.arun(input="AI trends 2024")
```



## 6) Streaming Responses & Events

Set `stream=True` to get events as they happen. Use `stream_events=True` for **all** workflow events.

```mermaid
flowchart LR
    S[Start] --> E1[WorkflowStarted]
    E1 --> P[StepStarted -> Executor events -> StepCompleted]
    P --> B{More Steps?}
    B -- Yes --> P
    B -- No --> E2[WorkflowCompleted]
```



### Core Event Types (summary)

| Category | Events |
|---|---|
| Core | `WorkflowStarted`, `WorkflowCompleted`, `WorkflowError` |
| Step | `StepStarted`, `StepCompleted`, `StepError` |
| Parallel | `ParallelExecutionStarted`, `ParallelExecutionCompleted` |
| Condition | `ConditionExecutionStarted`, `ConditionExecutionCompleted` |
| Loop | `LoopExecutionStarted`, `LoopIterationStartedEvent`, `LoopIterationCompletedEvent`, `LoopExecutionCompleted` |
| Router | `RouterExecutionStarted`, `RouterExecutionCompleted` |
| Functions | `StepOutput` (for custom function steps) |



## 7) Storing Events (observability)

- `store_events=True` captures full history for debugging/audit.  
- `events_to_skip=[...]` filters noisy events.

```python
debug_workflow = Workflow(name="Debug", store_events=True, steps=[...])
prod_workflow  = Workflow(name="Prod", store_events=True, events_to_skip=[...], steps=[...])
fast_workflow  = Workflow(name="Fast", store_events=False, steps=[...])
```



## 8) Input & Output (structured, type-safe)

### Accepted input forms
| Type | Example | When to use |
|---|---|---|
| String | `"Analyze AI trends"` | Simple prompts |
| Dict | `{"query": "AI", "sources": ["web"]}` | Key/value inputs |
| List | `["AI","ML"]` | Batch-ish, short lists |
| Pydantic | `ResearchRequest(topic="AI", depth=5)` | Validated, type-safe API surfaces |

> When forwarded to an Agent/Team, non-string input is serialized to text.


In [None]:

# Pydantic input (schema validation)
from pydantic import BaseModel, Field

class ResearchRequest(BaseModel):
    topic: str = Field(description="Research topic")
    depth: int = Field(description="Depth 1-10")
    sources: list[str] = Field(description="Preferred sources")

# workflow.print_response(message=ResearchRequest(topic="AI trends 2024", depth=8, sources=["academic","industry"]))



### Step-level Structured IO

Each agent can specify an `output_schema`. Function steps receive **typed** content from the previous step and should return `StepOutput(content=<typed model or string>)`.


In [None]:

# Example transformation in a custom function step
def transform(step_input: StepInput) -> StepOutput:
    prev = step_input.previous_step_content  # could be a Pydantic model
    # ...inspect/modify...
    return StepOutput(content=prev)  # or a new model / summarized text



## 9) Media Input & Processing

Workflows can pass **images/videos/audio** between steps; artifacts accumulate in the final `WorkflowRunOutput`.

```mermaid
flowchart TD
    A[Input: Text + Image] --> S1[Agent: Analyze Image]
    S1 -->|content + derived artifacts| S2[Agent: Research Related News]
    S2 --> Z[Final Output + All Artifacts]
```



## 10) Additional Data & Metadata

Use `additional_data` to pass **context/config** without polluting the message flow (e.g., user tier, SLA, priority). Access via `step_input.additional_data`.

```python
def planner(step_input: StepInput) -> StepOutput:
    priority = (step_input.additional_data or {}).get("priority","normal")
    # ...use alongside main input & previous_step_content...
    return StepOutput(content=f"Plan built. Priority={priority}")
```



## 11) Access Multiple Previous Steps

Retrieve any earlier content by **step name** or grab everything at once.

```python
def comprehensive_report(step_input: StepInput) -> StepOutput:
    hn = step_input.get_step_content("research_hackernews") or ""
    web = step_input.get_step_content("research_web") or ""
    all_prev = step_input.get_all_previous_content()
    return StepOutput(content=f"# Report\nHN: {hn[:300]}\nWeb: {web[:300]}")
```



### Visual: named lookups
```mermaid
flowchart LR
    A[research_hackernews] --> C[comprehensive_report]
    B[research_web] --> C
    C --> D[final_reasoning]
```



## 12) Early Stopping

Any step can halt execution by returning `StepOutput(stop=True)`.

```mermaid
flowchart TD
    S1[Security Scan] --> S2[Security Gate]
    S2 -->|Vulnerable| STOP(((Stop Workflow)))
    S2 -->|Safe| S3[Deploy]
    S3 --> S4[Post-Deploy Monitoring]
```


In [None]:

def security_gate(step_input: StepInput) -> StepOutput:
    result = (step_input.previous_step_content or "").upper()
    if "VULNERABLE" in result:
        return StepOutput(content="Security alert. Blocked.", stop=True)
    return StepOutput(content="Secure. Proceed.")



## 13) Workflow Cancellation

- For **background** runs, keep the `run_id` and call `workflow.cancel_run(run_id)`.
- Streaming cancellations emit `WorkflowCancelledEvent`.

```mermaid
sequenceDiagram
    participant App
    participant WF as Workflow
    App->>WF: arun(input=..., background=True)
    WF-->>App: run_id
    App->>WF: cancel_run(run_id)
    WF-->>App: cancelled=True
```



## 14) Workflow Tools (call a Workflow from an Agent/Team)

Give a workflow to an Agent via `WorkflowTools` so the agent can execute it as a tool.


In [None]:

# from agno.tools.workflow import WorkflowTools
# wf_tools = WorkflowTools(workflow=blog_post_workflow)
# agent = Agent(model=OpenAIChat(id="gpt-5-mini"), tools=[wf_tools])
# agent.print_response("Create a blog post on 'AI trends 2024'")



## 15) Background Execution (async)

Run long jobs **non-blocking** with `await workflow.arun(..., background=True)` and poll with `get_run(run_id)`.

```mermaid
flowchart LR
    A[Start Background] --> B[arun(background=True)]
    B --> C[(run_id)]
    C --> D[Poll get_run(run_id)]
    D -->|Completed| E[Retrieve Final Output]
```



## 16) Production Checklist

- [ ] Define explicit steps; keep each executor focused  
- [ ] Prefer **typed** inputs/outputs for stability  
- [ ] Capture events (`store_events=True`) in non-dev environments  
- [ ] Add **Conditions**, **Loops**, **Routers** where logic demands  
- [ ] Gate risky actions with **Early Stop** or pre-check steps  
- [ ] For long runs, use **background execution** + cancellation  



---

### TL;DR
Workflows give you **deterministic**, **repeatable**, and **observable** automation for Agent/Team systems. Start linear, then add **parallel**, **conditional**, **loop**, and **routing** patterns as your pipeline matures.
