# Build a Scalable Multi-Agent AI System with Flyte

Welcome to this hands-on tutorial on building intelligent, scalable AI agents!

## 🎯 What We'll Build

A **production-ready multi-agent system** that:
- Routes tasks intelligently using an LLM-powered planner
- Executes independent tasks **in parallel** for speed
- Handles complex **dependency chains** automatically
- Scales horizontally using Flyte's distributed execution

Think of it as a **smart task coordinator** that knows when to fan out work and when to wait for dependencies.

## 📚 Learning Objectives

By the end of this workshop, you'll understand:

1. **Agent Design Patterns** - How to build modular, specialized AI agents
2. **Dynamic Task Planning** - Using LLMs to create execution plans with dependencies
3. **Parallel Execution** - The "fanout" pattern for concurrent task execution
4. **Result Propagation** - Passing outputs from one agent to dependent agents
5. **Production Deployment** - Scaling agents with Flyte's distributed architecture

---

## 🏗️ System Architecture

```
User Request
    ↓
┌─────────────────┐
│ Planner Agent   │ ← Analyzes request, creates execution plan
└────────┬────────┘
         │
         ├─→ Step 0: math (deps: [])     ┐
         ├─→ Step 1: string (deps: [])   ├─ Wave 1: Parallel execution
         ├─→ Step 2: search (deps: [])   ┘
         │
         └─→ Step 3: code (deps: [0,1,2]) ← Wave 2: Waits for 0,1,2
```

**Key Components:**
- 🧠 **Planner Agent**: Routes tasks & identifies dependencies
- 🔧 **Specialist Agents**: Math, String, Web Search, Code
- 🎯 **Orchestrator**: Executes plan with parallel fanout
- 🔗 **Dependency Engine**: Passes results between agents

---

## ⚙️ Setup and Configuration

In [1]:
# Helper: render a local file with syntax highlighting inside the notebook
def print_code_file(file_path):
    from pygments import highlight
    from pygments.lexers import PythonLexer
    from pygments.formatters import HtmlFormatter
    from IPython.display import HTML, display

    # Read the file and close the handle promptly
    with open(file_path) as f:
        code = f.read()
    formatter = HtmlFormatter(style="one-dark", noclasses=True)  # any Pygments style name works
    html = highlight(code, PythonLexer(), formatter)
    display(HTML(html))

In [2]:
print_code_file("requirements.txt")

**What's happening in `config.py`:**

The `config.py` file printed below sets up:
- 📦 **Base Environment** (`base_env`): Shared Docker image + secrets for all agents
- 🔑 **API Keys**: OpenAI credentials loaded from environment
- 🐳 **Container Image**: Debian base with Python dependencies

**Why this matters:** Each agent runs in its own Flyte task, but they share this base configuration. This means you can easily scale specific agents independently while maintaining consistent dependencies.

In [35]:
print_code_file("config.py")

---

## 🛠️ Utility Functions: The Glue That Holds Everything Together

Before we dive into agents and tools, let's look at the infrastructure that makes our system work:

### 1️⃣ **Decorators** - Making Agent/Tool Creation Easy

Our decorator system provides framework-like capabilities:
- `@agent("name")` - Registers agents in a global registry
- `@tool(agent="name")` - Associates tools with specific agents
- Automatic discovery and routing

**Think of it like Flask routes or FastAPI endpoints** - decorators make complex registration simple.
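A minimal sketch of how such registration decorators could work. The registry names `AGENT_REGISTRY` and `TOOL_REGISTRY` are hypothetical, and the real `utils/decorators.py` may be organized differently:

```python
from collections import defaultdict
from typing import Callable

# Hypothetical registries; the real utils/decorators.py may be organized differently.
AGENT_REGISTRY: dict[str, Callable] = {}
TOOL_REGISTRY: dict[str, dict[str, Callable]] = defaultdict(dict)


def agent(name: str):
    """Register the decorated function as the agent called `name`."""
    def decorator(fn: Callable) -> Callable:
        AGENT_REGISTRY[name] = fn
        return fn  # return the function unchanged so it stays callable
    return decorator


def tool(agent: str):
    """Register the decorated function as a tool owned by `agent`."""
    def decorator(fn: Callable) -> Callable:
        TOOL_REGISTRY[agent][fn.__name__] = fn
        return fn
    return decorator


@tool(agent="math")
def add(a, b):
    """Add two numbers"""
    return a + b


@agent("math")
def math_agent(task: str) -> str:
    return f"handled: {task}"


# Lookup by name is what makes automatic discovery and routing possible.
print(TOOL_REGISTRY["math"]["add"](2, 3))
```

Because each decorator returns the original function, decorated tools remain directly callable and testable outside the registry.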

### 2️⃣ **Plan Executor** - The Agent's Brain

This is where the magic happens:
- Takes a user task (e.g., "Calculate 5 + 3")
- Asks an LLM to create a JSON plan with tool calls
- Executes each tool step-by-step
- Handles the "previous" result pattern for chaining

**Key Innovation:** Few-shot prompting ensures the LLM returns valid JSON tool plans.
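The step-by-step execution can be sketched as follows. This is an illustration under assumptions, not the contents of `utils/plan_executor.py`: the `TOOLS` table and `run_plan` function are hypothetical, the LLM call is replaced by a hard-coded plan, and the convention that the literal string `"previous"` stands for the prior step's result is assumed.

```python
import asyncio
import json


# Hypothetical tool table; in the tutorial, tools are found via the @tool registry.
async def add(a, b):
    return a + b


async def multiply(a, b):
    return a * b


TOOLS = {"add": add, "multiply": multiply}


async def run_plan(plan_json: str):
    """Execute a JSON tool plan step by step and return the last result."""
    plan = json.loads(plan_json)
    previous = None
    for step in plan:
        fn = TOOLS[step["tool"]]  # KeyError if the plan names an unknown tool
        # Assumed convention: the string "previous" stands for the prior step's result.
        args = [previous if arg == "previous" else arg for arg in step["args"]]
        previous = await fn(*args)
    return previous


# Hard-coded stand-in for the plan an LLM would generate.
PLAN = json.dumps([
    {"tool": "add", "args": [2, 3], "reasoning": "Adding 2 and 3"},
    {"tool": "multiply", "args": ["previous", 4], "reasoning": "Scale the sum"},
])

result = asyncio.run(run_plan(PLAN))
print(result)  # → 20
```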

### 3️⃣ **Logger** - Observability

Tracks what's happening for debugging and monitoring.

Let's examine the code:

In [36]:
print_code_file("utils/decorators.py")

In [37]:
print_code_file("utils/plan_executor.py")

---

## 🔧 Tools: Giving Agents Capabilities

Tools are **the actions agents can take**. Each tool is:
- An `async` function for performance
- Decorated with `@tool(agent="...")` for automatic registration
- Traced with `@flyte.trace` for observability

**Architecture Pattern:**
```python
@tool(agent="math")
@flyte.trace
async def add(a, b):
    """Add two numbers"""
    return a + b
```

When an agent needs to solve a task, the LLM generates a plan like:
```json
[
  {"tool": "add", "args": [2, 3], "reasoning": "Adding 2 and 3"}
]
```

The plan executor finds the `add` tool and calls it with the arguments.

**Why async?** Async tools can run concurrently while they wait on I/O, which enables parallel tool execution and better resource utilization.
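To make the benefit concrete, here is a small sketch using only the standard library. `slow_tool` is a hypothetical stand-in for an I/O-bound tool such as a web request; it is not part of the tutorial's code.

```python
import asyncio
import time


async def slow_tool(name: str, delay: float) -> str:
    """Stand-in for an I/O-bound tool such as a web request."""
    await asyncio.sleep(delay)
    return name


async def main():
    start = time.perf_counter()
    # Both coroutines wait concurrently, so total time is close to the longest delay,
    # not the sum of the two delays.
    results = await asyncio.gather(slow_tool("a", 0.3), slow_tool("b", 0.3))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

`asyncio.gather()` returns results in the order the awaitables were passed, regardless of which one finishes first.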

Let's look at each agent's toolkit:

### Math Tools

**Capabilities:** Basic arithmetic, exponents, factorials

These are the building blocks for mathematical reasoning. Simple, focused, composable.

In [41]:
print_code_file("tools/math_tools.py")


### String Tools

**Capabilities:** Word counting, letter counting, text analysis

Useful for natural language processing tasks.

In [42]:
print_code_file("tools/string_tools.py")


### Web Search Tools

**Capabilities:** DuckDuckGo search, webpage content fetching

Gives agents access to real-time information from the web. Notice the adjustable parameters (region, time filter, etc.) - this lets the LLM customize searches.

In [43]:
print_code_file("tools/web_search_tools.py")


### Code Execution Tools

**Capabilities:** Safe Python execution in a sandboxed environment

**Security first:** Restricted namespace allows only safe modules (math, json, re, etc.). No file I/O, no network access.

This enables agents to write and run code dynamically - powerful for complex computations!
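A rough sketch of what a restricted namespace can look like. The allow-lists and the `run_restricted` helper are assumptions for illustration; the real `tools/code_tools.py` may differ. Note that limiting builtins this way reduces the attack surface but is not a complete security boundary on its own, so container-level isolation still matters.

```python
import json
import math
import re

# Assumed allow-lists; the real tools/code_tools.py may expose a different set.
SAFE_MODULES = {"math": math, "json": json, "re": re}
SAFE_BUILTINS = {
    "range": range, "len": len, "sum": sum, "min": min, "max": max,
    "abs": abs, "list": list, "dict": dict, "str": str, "int": int,
    "float": float, "enumerate": enumerate, "sorted": sorted,
}


def run_restricted(code: str):
    """Run `code` with a reduced namespace and return its `result` variable."""
    namespace = {"__builtins__": SAFE_BUILTINS, **SAFE_MODULES}
    # Neither `open` nor `__import__` is exposed, so file I/O and imports raise errors.
    exec(code, namespace)
    return namespace.get("result")


fib_code = "a,b=0,1\nresult=[]\nfor i in range(10):\n  result.append(a)\n  a,b=b,a+b"
print(run_restricted(fib_code))  # → [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```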

In [44]:
print_code_file("tools/code_tools.py")

---

## 🤖 Agents: Specialists That Use Tools to Solve Problems

Each agent follows the same pattern:

### Agent Anatomy:
```python
@dataclass
class MathAgentResult:
    """Structured output - type-safe and serializable"""
    final_result: str
    steps: str
    error: str = ""

env = base_env  # Shared Flyte environment

@env.task              # Makes this a Flyte task (containerized, scalable)
@agent("math")         # Registers in agent registry
async def math_agent(task: str) -> MathAgentResult:
    """
    1. Receives a task (e.g., "Calculate 5 factorial")
    2. Asks LLM to create a tool execution plan
    3. Executes the plan using registered math tools
    4. Returns structured result
    """
    result = await execute_plan(task, agent="math", system_msg=...)
    return MathAgentResult(final_result=result["final_result"], ...)
```

### Key Design Decisions:

**Why Flyte tasks?**
- Each agent runs in its own container
- Independent scaling (e.g., 10 math agents, 2 web search agents)
- Resource isolation and monitoring

**Why dataclasses?**
- Type-safe outputs (no dict key errors)
- Flyte-native serialization (no pickle issues)
- Clear contracts between agents
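As a small illustration of the "clear contract" point, the sketch below round-trips the result dataclass through plain JSON using only the standard library. This is a stand-in for whatever serialization Flyte performs internally, which is not shown here.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class MathAgentResult:
    final_result: str
    steps: str
    error: str = ""


result = MathAgentResult(final_result="120", steps="factorial(5)")
payload = json.dumps(asdict(result))  # plain JSON text, no pickle involved
restored = MathAgentResult(**json.loads(payload))
print(restored == result)  # → True
```

A misspelled field such as `MathAgentResult(final="120", steps="")` fails immediately with a `TypeError`, whereas a misspelled dict key would only surface later when read.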

**Why async?**
- Parallel tool execution
- Non-blocking I/O for web searches
- Better resource utilization

Let's examine each specialist agent:

### Math Agent

**Specialty:** Arithmetic operations, exponents, factorials

**How it works:**
1. User asks: "Calculate 5 factorial"
2. LLM generates: `[{"tool": "factorial", "args": [5], "reasoning": "..."}]`
3. Tool executes: `5! = 120`
4. Returns: `MathAgentResult(final_result="120", ...)`

Simple, focused, reliable.

In [None]:
print_code_file("agents/math_agent.py")

### String Agent

**Specialty:** Text analysis and manipulation

**Example:**
- Task: "Count words in 'The quick brown fox'"
- Tool plan: `[{"tool": "word_count", "args": ["The quick brown fox"]}]`
- Result: `"4"`

Notice how agents can handle multi-step tasks by chaining tools.

In [46]:
print_code_file("agents/string_agent.py")

### Web Search Agent

**Specialty:** Real-time information retrieval

**Example:**
- Task: "Search for Python async tutorials"
- Tool plan:
  ```json
  [
    {"tool": "duck_duck_go", "args": ["Python async tutorial", 5, "us-en", ...]},
    {"tool": "fetch_webpage", "args": ["<url_from_results>", 3000]}
  ]
  ```
- Result: Search results + webpage content

**Note:** This agent can chain tools - search first, then fetch details from results.

In [47]:
print_code_file("agents/web_search_agent.py")


### Code Agent

**Specialty:** Dynamic Python code generation and execution

**Example:**
- Task: "Calculate the first 10 Fibonacci numbers"
- Tool plan:
  ```json
  [{
    "tool": "execute_python",
    "args": ["a,b=0,1\nresult=[]\nfor i in range(10):\n  result.append(a)\n  a,b=b,a+b", 5, "Fibonacci"],
    "reasoning": "Generate Fibonacci sequence"
  }]
  ```
- Result: `[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]`

**Power move:** The LLM writes Python code, which is then executed safely. This enables complex computations beyond simple tool calls!

In [48]:
print_code_file("agents/code_agent.py")


### Planner Agent - The Brain of the System 🧠

**This is where the magic happens!**

The Planner Agent is unique - instead of using tools, it **coordinates other agents**:

#### What It Does:
1. **Analyzes** the user's request
2. **Identifies** which agents are needed
3. **Determines dependencies** between steps
4. **Creates an execution plan** with parallel opportunities

#### Example:

**User Request:**
```
"Calculate 2+3 and 5+6, then add those results together"
```

**Planner's Output:**
```python
PlannerDecision(steps=[
    AgentStep(agent="math", task="Calculate 2+3", dependencies=[]),      # Step 0
    AgentStep(agent="math", task="Calculate 5+6", dependencies=[]),      # Step 1
    AgentStep(agent="math", task="Add results", dependencies=[0, 1])     # Step 2
])
```

**Key Insight:** Steps 0 and 1 have `dependencies=[]`, so they can run **in parallel**. Step 2 depends on both, so it waits and receives their results.

#### The Prompt Engineering:

The planner uses **detailed examples** to teach the LLM about dependencies:
- Independent tasks → empty dependencies `[]`
- Sequential tasks → dependency indices `[0]`, `[0, 1]`, etc.
- The LLM learns to identify parallelization opportunities

This is **dynamic DAG generation** - the execution graph is created at runtime based on the user's natural language request!
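Since the plan comes from an LLM at runtime, it is worth validating before execution. The following sketch parses a JSON plan into the `AgentStep` and `PlannerDecision` shapes used above and checks the dependency indices. The JSON layout and the `parse_plan` helper are assumptions, and the LLM response is replaced by a hard-coded string.

```python
import json
from dataclasses import dataclass, field


@dataclass
class AgentStep:
    agent: str
    task: str
    dependencies: list[int] = field(default_factory=list)


@dataclass
class PlannerDecision:
    steps: list[AgentStep]


def parse_plan(raw: str) -> PlannerDecision:
    """Parse planner JSON; reject dependencies on unknown steps or on the step itself."""
    steps = [AgentStep(**item) for item in json.loads(raw)]
    for index, step in enumerate(steps):
        for dep in step.dependencies:
            if not 0 <= dep < len(steps) or dep == index:
                raise ValueError(f"step {index} has invalid dependency {dep}")
    return PlannerDecision(steps=steps)


# Hard-coded stand-in for the LLM response (assumed JSON shape).
raw_plan = json.dumps([
    {"agent": "math", "task": "Calculate 2+3", "dependencies": []},
    {"agent": "math", "task": "Calculate 5+6", "dependencies": []},
    {"agent": "math", "task": "Add results", "dependencies": [0, 1]},
])
decision = parse_plan(raw_plan)
print([step.dependencies for step in decision.steps])  # → [[], [], [0, 1]]
```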

Let's see the code:

---

## 🎯 The Orchestrator: Bringing It All Together

This is the **execution engine** that makes parallel, dependency-aware execution possible.

### How It Works:

#### 1. **Call the Planner**
```python
planner_decision = await planner_agent(user_request)
# Returns: PlannerDecision with steps and dependencies
```

#### 2. **Build Execution Waves**
```python
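while pending_steps:
    # Find steps whose dependencies have all completed
    # (assumes pending_steps maps step index -> step)
    ready_steps = {i: s for i, s in pending_steps.items()
                   if all(d in completed_results for d in s.dependencies)}

    # Execute them ALL IN PARALLEL
    results = await asyncio.gather(*[execute(s) for s in ready_steps.values()])

    # Record results and retire the finished steps
    completed_results.update(zip(ready_steps, results))
    for i in ready_steps:
        del pending_steps[i]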
```

#### 3. **Pass Results to Dependent Steps**

When a step has dependencies, we augment its task:
```python
if step.dependencies:
    context = "Context from previous steps:\n"
    for dep_id in step.dependencies:
        context += f"Step {dep_id}: {completed_results[dep_id]}\n"
    
    task = context + f"\nYour task: {step.task}"
```
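Putting the wave loop and result propagation together, a self-contained sketch might look like this. It is not the code in `flyte_dynamic.py`: `echo_agent` is a hypothetical stand-in for real agent calls, and `build_task` and `orchestrate` are illustrative names. The empty-wave check shows one way circular dependencies could be detected.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class AgentStep:
    agent: str
    task: str
    dependencies: list[int] = field(default_factory=list)


def build_task(step: AgentStep, completed_results: dict) -> str:
    """Prefix the task with the results of the steps it depends on."""
    if not step.dependencies:
        return step.task
    context = "Context from previous steps:\n"
    for dep_id in step.dependencies:
        context += f"Step {dep_id}: {completed_results[dep_id]}\n"
    return context + f"\nYour task: {step.task}"


async def echo_agent(task: str) -> str:
    """Hypothetical stand-in for a real agent: reports how many prompt lines it got."""
    await asyncio.sleep(0)  # yield control, as a real agent would while waiting on I/O
    return f"{len(task.splitlines())} line(s) received"


async def orchestrate(steps: list[AgentStep]):
    completed_results: dict[int, str] = {}
    pending = set(range(len(steps)))
    waves: list[list[int]] = []
    while pending:
        ready = sorted(i for i in pending
                       if all(d in completed_results for d in steps[i].dependencies))
        if not ready:  # nothing can run: the remaining steps wait on each other
            raise ValueError(f"circular dependencies among steps {sorted(pending)}")
        outputs = await asyncio.gather(
            *[echo_agent(build_task(steps[i], completed_results)) for i in ready])
        completed_results.update(zip(ready, outputs))
        pending.difference_update(ready)
        waves.append(ready)
    return waves, completed_results


plan = [
    AgentStep("math", "Calculate 2+3"),
    AgentStep("math", "Calculate 5+6"),
    AgentStep("math", "Add results", dependencies=[0, 1]),
]
waves, results = asyncio.run(orchestrate(plan))
print(waves)  # → [[0, 1], [2]]
```

Step 2 receives a five-line prompt (a header, two context lines, a blank line, and the task), which confirms that both earlier results were injected before it ran.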

### Visual Example:

```
User: "Calculate 2+3 and 5+6, then add results"

Planner → [Step 0: 2+3 (deps:[]), Step 1: 5+6 (deps:[]), Step 2: add (deps:[0,1])]

╔═══════════════════════════════════════╗
║           WAVE 1 (Parallel)           ║
╠═══════════════════════════════════════╣
║  Step 0: math_agent("Calculate 2+3")  ║ → Result: "5"
║  Step 1: math_agent("Calculate 5+6")  ║ → Result: "11"
╚═══════════════════════════════════════╝
              ↓ (both complete)
╔═══════════════════════════════════════╗
║            WAVE 2 (Sequential)        ║
╠═══════════════════════════════════════╣
║  Step 2: math_agent(                  ║
║    "Context:                          ║
║     Step 0: 5                         ║
║     Step 1: 11                        ║
║                                       ║
║     Your task: Add results"           ║
║  )                                    ║ → Result: "16"
╚═══════════════════════════════════════╝
```

### Key Features:

✅ **Automatic Parallelization** - Uses `asyncio.gather()` for fanout  
✅ **Dependency Resolution** - Tracks which steps are ready  
✅ **Result Propagation** - Injects previous results into dependent tasks  
✅ **Error Handling** - Gracefully handles circular dependencies  
✅ **Observable** - Full logging of each wave and step  

This is essentially a **mini workflow engine** built on Flyte's distributed execution primitives!

Let's see the code:

In [40]:
print_code_file("flyte_dynamic.py")

---

## 🚀 Running the Workflow

The workflow supports two execution modes:

### 🌐 Remote Execution (Production Mode)
```bash
python flyte_dynamic.py
```
**What happens:**
- Uses `flyte.init_from_config()` to connect to your Flyte cluster
- Each agent runs in its own containerized task
- Parallel execution happens across multiple workers
- Full observability in Flyte UI with execution graphs
- Scalable and production-ready

**When to use:** Production deployments, large workloads, distributed execution

---

### 💻 Local Execution (Development Mode)
```bash
python flyte_dynamic.py --local
```
**What happens:**
- Uses `flyte.init()` for in-process execution
- All agents run locally (no cluster needed)
- Still uses async/parallelization via asyncio
- Faster iteration for development and testing
- Same code paths, different execution backend

**When to use:** Local development, quick testing, debugging
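One way a script could switch between the two modes is a simple command-line flag. This sketch covers only the flag handling; `parse_mode` and `init_backend` are hypothetical helpers, and the Flyte calls are returned as strings so the example runs without Flyte installed.

```python
import argparse


def parse_mode(argv: list[str]) -> str:
    """Return "local" when --local is passed, otherwise "remote"."""
    parser = argparse.ArgumentParser(description="Run the multi-agent workflow")
    parser.add_argument("--local", action="store_true",
                        help="run in-process without a cluster")
    args = parser.parse_args(argv)
    return "local" if args.local else "remote"


def init_backend(mode: str) -> str:
    # In a real script these branches would call flyte.init() or
    # flyte.init_from_config(); here they only name the call that would be made.
    if mode == "local":
        return "flyte.init()"
    return "flyte.init_from_config()"


print(init_backend(parse_mode(["--local"])))  # → flyte.init()
print(init_backend(parse_mode([])))           # → flyte.init_from_config()
```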

---

### 📝 Example Test Prompts

Try these in the notebook or by editing `flyte_dynamic.py`:

#### Simple:
```python
"Calculate 5 factorial"
```

#### Parallel Execution:
```python
"Calculate 2 plus 3 and 5 plus 6, then add those results together"
# → Wave 1: [2+3, 5+6] in parallel
# → Wave 2: [add 5 + 11]
```

#### Mixed Agents:
```python
"Calculate 10 times 5 and count words in 'Hello World', then multiply the word count by the calculation result"
# → Wave 1: [math: 10*5, string: count] in parallel
# → Wave 2: [math: 50 * 2 = 100]
```

#### Complex Multi-Agent:
```python
"Calculate 5 factorial, count letters in 'hello', and search for 'Flyte', then write Python code to combine them"
# → Wave 1: [math, string, web_search] all in parallel
# → Wave 2: [code agent receives all 3 results]
```

Let's run it!

In [None]:
!python flyte_dynamic.py

---

## 🎓 Key Takeaways

Congratulations! You've just explored a production-ready multi-agent system. Here's what makes it special:

### Architecture Highlights:

1. **🧩 Modular Design**
   - Each agent is self-contained with its own tools
   - Easy to add new agents (just create file + import in planner)
   - Clear separation of concerns

2. **⚡ Parallel Execution**
   - Automatic fanout for independent tasks
   - Uses `asyncio.gather()` for concurrency
   - Leverages Flyte's distributed execution

3. **🔗 Dependency Management**
   - LLM determines dependencies at runtime
   - Results automatically passed to dependent steps
   - Circular dependency detection

4. **🎯 Smart Routing**
   - Planner analyzes requests in natural language
   - Creates optimal execution plans
   - No hardcoded workflows!

5. **📊 Production Ready**
   - Type-safe with dataclasses
   - Observable with Flyte UI
   - Scalable across multiple workers
   - Error handling and logging

### What You Can Build:

- **Data pipelines** with conditional branching
- **Research assistants** that search + analyze + summarize in parallel
- **Code generation systems** that plan → implement → test concurrently
- **Multi-modal applications** combining vision, text, and code agents

### Next Steps:

1. **Add your own agents** - Create specialized agents for your domain
2. **Extend tools** - Add more capabilities (database access, API calls, etc.)
3. **Improve prompting** - Fine-tune the planner for better dependency detection
4. **Add memory** - Implement cross-execution context
5. **Deploy** - Scale to production with Flyte's cluster management

---

## 🙏 Thank You!

Questions? Try experimenting with different prompts and watch how the system adapts!

**Remember:** The power is in the **dynamic DAG generation**. The LLM creates the execution plan, and Flyte executes it efficiently. That's the magic! ✨