# 🔥 FlowForge: Client Meeting Prep Tool (CMPT) Chain Implementation

## A Comprehensive Tutorial on Building DAG-Based Chain Orchestration

---

## 📋 Table of Contents

### Part 1: Understanding FlowForge
1. [What is FlowForge?](#1-what-is-flowforge)
2. [What FlowForge Provides vs What CMPT Implements](#2-flowforge-vs-cmpt)
3. [Architecture Overview](#3-architecture-overview)

### Part 2: Setup & Configuration
4. [Imports and Dependencies](#4-imports)
5. [Data Models & Schemas](#5-data-models)
6. [Grid Configuration (Business Rules)](#6-grid-config)

### Part 3: FlowForge Core Concepts
7. [Initializing FlowForge](#7-init-forge)
8. [Registering Data Agents](#8-data-agents)
9. [Defining Chain Steps](#9-chain-steps)
10. [Creating the Chain](#10-chain-definition)

### Part 4: CMPT Pipeline Implementation
11. [Context Builder Steps](#11-context-builder)
12. [Content Prioritization Steps](#12-content-prioritization)
13. [Data Fetching (Parallel Execution)](#13-data-fetch)
14. [Response Builder Steps](#14-response-builder)
15. [LLM Integration](#15-llm-integration)

### Part 5: Execution & Debugging
16. [Chain Validation](#16-validation)
17. [DAG Visualization](#17-visualization)
18. [Running the Chain](#18-execution)
19. [Error Handling & Debugging](#19-error-handling)

### Part 6: Advanced Features
20. [Middleware Integration](#20-middleware)
21. [Additional Components from Old Code](#21-resources)
    - 21.1 [Metrics Validator](#21-1-metrics-validator)
    - 21.2 [Static Subquery Engine](#21-2-static-subquery-engine)
    - 21.3 [LLM Prompts](#21-3-llm-prompts)
    - 21.4 [Persona Extraction (LDAP & ZoomInfo)](#21-4-persona-extraction)
22. [Testing Strategies](#22-testing)
23. [Complete Migration Summary & Production Checklist](#23-summary)

---

<a id="1-what-is-flowforge"></a>
## 1. What is FlowForge?

**FlowForge** is a DAG-based Chain Orchestration Framework inspired by Dagster patterns. It provides:

| Feature | Description |
|---------|-------------|
| **Decorator-based API** | Clean `@forge.agent()`, `@forge.step()`, `@forge.chain()` syntax |
| **Automatic DAG Resolution** | Dependencies are resolved and parallel execution happens automatically |
| **Context Management** | Scoped storage (step/chain/global), token tracking |
| **Middleware Pipeline** | Extensible hooks for logging, caching, summarization |
| **MCP Integration** | Connect external MCP servers as agents |
| **Validation & Visualization** | Built-in chain validation and DAG visualization |
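
To make the decorator-based API more concrete, here is a minimal sketch of how a decorator can record a step and its dependencies in a registry. `MiniForge` and its attributes are hypothetical names used only for illustration; this is not FlowForge's implementation, just the general pattern such an API tends to rely on.

```python
from typing import Callable, Dict, List, Optional


class MiniForge:
    """Hypothetical toy registry; NOT the real FlowForge class."""

    def __init__(self) -> None:
        self.steps: Dict[str, Callable] = {}
        self.deps: Dict[str, List[str]] = {}

    def step(self, name: str, deps: Optional[List[str]] = None) -> Callable:
        """Return a decorator that registers the function under `name`."""
        def decorator(fn: Callable) -> Callable:
            self.steps[name] = fn
            self.deps[name] = list(deps or [])
            return fn  # the function itself is returned unchanged
        return decorator


mini = MiniForge()


@mini.step(name="prioritize_content")
def prioritize_content(ctx: dict) -> dict:
    return {"profile": "news_dominant"}


@mini.step(name="fetch_news_data", deps=["prioritize_content"])
def fetch_news_data(ctx: dict) -> list:
    return []


print(mini.deps)
```

Registering at decoration time means the full dependency graph is known before anything runs, which is what makes up-front validation and scheduling possible.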

### FlowForge Architecture Diagram

```
┌─────────────────────────────────────────────────────────────────────────────────────┐
│                                    FlowForge Core                                    │
├─────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                     │
│  ┌─────────────────┐   ┌─────────────────┐   ┌─────────────────┐                   │
│  │   Registries    │   │   DAG Executor  │   │ Context Manager │                   │
│  │                 │   │                 │   │                 │                   │
│  │ • AgentRegistry │   │ • Build DAG     │   │ • Step Scope    │                   │
│  │ • StepRegistry  │   │ • Resolve Deps  │   │ • Chain Scope   │                   │
│  │ • ChainRegistry │   │ • Parallel Exec │   │ • Token Track   │                   │
│  └─────────────────┘   └─────────────────┘   └─────────────────┘                   │
│           │                    │                      │                            │
│           └────────────────────┼──────────────────────┘                            │
│                                │                                                    │
│                    ┌───────────▼───────────┐                                       │
│                    │   Middleware Layer    │                                       │
│                    │  (Logging, Caching)   │                                       │
│                    └───────────────────────┘                                       │
└─────────────────────────────────────────────────────────────────────────────────────┘
```

<a id="2-flowforge-vs-cmpt"></a>
## 2. What FlowForge Provides vs What CMPT Implements

### 🔧 FlowForge Provides (Framework Layer)

| Component | What FlowForge Handles |
|-----------|------------------------|
| **DAG Execution Engine** | Automatic dependency resolution, topological sorting, parallel execution |
| **Step Registration** | `@forge.step()` decorator with deps, produces, timeout, retry |
| **Agent Registration** | `@forge.agent()` decorator for data source classes |
| **Chain Definition** | `@forge.chain()` decorator for defining execution pipelines |
| **Context Management** | `ChainContext` with scoped storage (STEP, CHAIN, GLOBAL) |
| **Resource Management** | Lifecycle management for DB connections, HTTP clients |
| **Validation** | `forge.check()` validates chains, dependencies, cycles |
| **Visualization** | `forge.graph()` generates ASCII/Mermaid DAG diagrams |
| **Error Handling** | fail_fast vs continue modes, retry logic |
| **Middleware** | Pre/post hooks for logging, caching, metrics |
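
As an illustration of what cycle validation involves, the sketch below uses Python's standard-library `graphlib` to detect a circular dependency. `find_cycle` is a hypothetical helper, and the step names in the cyclic example are made up; how `forge.check()` performs this check internally is not shown in this tutorial.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set


def find_cycle(deps: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return the nodes of one dependency cycle, or None if the graph is acyclic."""
    try:
        TopologicalSorter(deps).prepare()  # raises CycleError on a cycle
    except CycleError as exc:
        # The second exception argument is the cycle, first node repeated at the end.
        return list(exc.args[1])
    return None


valid = {"prioritize_content": set(), "fetch_news_data": {"prioritize_content"}}
cyclic = {"step_a": {"step_b"}, "step_b": {"step_a"}}

print(find_cycle(valid))
print(find_cycle(cyclic))
```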

---

### 📖 DAG Execution Engine Explained

**DAG** = **D**irected **A**cyclic **G**raph - a structure where tasks flow in one direction with no cycles.

The DAG Execution Engine does three things **automatically**:

| Capability | Description |
|------------|-------------|
| **1. Dependency Resolution** | Figures out which steps need which other steps to complete first |
| **2. Topological Sorting** | Orders steps so dependencies always run before dependents |
| **3. Parallel Execution** | Runs independent steps simultaneously to save time |

#### CMPT Example: Data Fetching

When you declare steps like this (only the decorators are shown; each one would decorate its own step function):

```python
@forge.step(name="fetch_news_data", deps=["prioritize_content"])
@forge.step(name="fetch_sec_data", deps=["prioritize_content"])  
@forge.step(name="fetch_earnings_data", deps=["prioritize_content"])
@forge.step(name="parse_agent_data", deps=["fetch_news_data", "fetch_sec_data", "fetch_earnings_data"])
```

**FlowForge automatically figures out:**

| Capability | What FlowForge Determines |
|------------|---------------------------|
| **Dependency Resolution** | `fetch_news_data`, `fetch_sec_data`, `fetch_earnings_data` all need `prioritize_content` first |
| **Topological Sort** | `prioritize_content` → `[fetch_news_data, fetch_sec_data, fetch_earnings_data]` → `parse_agent_data` |
| **Parallel Execution** | The three fetch steps depend only on `prioritize_content`, not on each other, so they can run **concurrently** |
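
The grouping into parallel stages can be reproduced with the standard-library `graphlib` module. This is a sketch of the general technique rather than FlowForge's scheduler; `execution_levels` is a hypothetical helper.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each step maps to the set of steps it depends on.
deps: Dict[str, Set[str]] = {
    "prioritize_content": set(),
    "fetch_news_data": {"prioritize_content"},
    "fetch_sec_data": {"prioritize_content"},
    "fetch_earnings_data": {"prioritize_content"},
    "parse_agent_data": {"fetch_news_data", "fetch_sec_data", "fetch_earnings_data"},
}


def execution_levels(graph: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into levels; steps within a level have no mutual dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    levels: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose deps are done
        levels.append(ready)
        sorter.done(*ready)
    return levels


for number, level in enumerate(execution_levels(deps), start=1):
    print(number, level)
```

The three fetch steps land in the same level, which is exactly the set of steps an executor is free to run at the same time.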

#### Visual Comparison: Sequential vs Parallel

**❌ Without FlowForge (Old Code - Sequential):**
```
prioritize_content    (1 sec)
       ↓
fetch_news_data       (2 sec)
       ↓
fetch_sec_data        (2 sec)
       ↓
fetch_earnings_data   (2 sec)
       ↓
parse_agent_data      (1 sec)
─────────────────────────────
Total: 8 seconds
```

**✅ With FlowForge (Automatic Parallelization):**
```
prioritize_content         (1 sec)
       ↓
┌──────┼──────┐
↓      ↓      ↓
fetch_ fetch_ fetch_        (2 sec - all three run simultaneously!)
news   sec    earnings
└──────┼──────┘
       ↓
parse_agent_data           (1 sec)
─────────────────────────────
Total: 4 seconds (half the time!)
```
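
The two totals follow from simple arithmetic: sequential time is the sum of all step durations, while parallel time is the sum of the slowest step in each stage. A small sketch using the illustrative durations from the diagrams (they are not measurements):

```python
# Illustrative durations in seconds, taken from the diagrams above.
durations = {
    "prioritize_content": 1,
    "fetch_news_data": 2,
    "fetch_sec_data": 2,
    "fetch_earnings_data": 2,
    "parse_agent_data": 1,
}

# Stages of the DAG: steps in the same inner list run concurrently.
levels = [
    ["prioritize_content"],
    ["fetch_news_data", "fetch_sec_data", "fetch_earnings_data"],
    ["parse_agent_data"],
]

sequential_total = sum(durations.values())
parallel_total = sum(max(durations[step] for step in level) for level in levels)

print(sequential_total, parallel_total)  # 8 4
```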

#### Another CMPT Example: LLM Calls

```python
@forge.step(name="generate_financial_metrics", deps=["build_prompts"])
@forge.step(name="generate_strategic_analysis", deps=["build_prompts"])
```

Both LLM calls depend only on `build_prompts` and not on each other, so FlowForge runs them **in parallel**:

```
build_prompts                    (0.5 sec)
       ↓
┌──────┴──────┐
↓             ↓
generate_     generate_          (10 sec - both run simultaneously!)
financial_    strategic_
metrics       analysis
─────────────────────────────────
Total: ~10 sec instead of 20+ sec
```

#### 💡 Key Insight

> **You don't write parallelization code.** You just declare dependencies with `deps=[]`, and FlowForge:
> 1. Builds the DAG automatically
> 2. Identifies which steps can run in parallel  
> 3. Manages the execution with asyncio
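
For intuition, this is roughly what stage-by-stage execution with asyncio looks like. It is a hedged sketch of the general pattern, not FlowForge's executor: `run_levels` and the three stand-in handlers are hypothetical, and the short sleeps merely simulate LLM latency.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, object]], Awaitable[object]]


async def run_levels(levels: List[List[str]], handlers: Dict[str, Handler]) -> Dict[str, object]:
    """Run each level's steps concurrently; the levels themselves run in order."""
    results: Dict[str, object] = {}
    for level in levels:
        outputs = await asyncio.gather(*(handlers[name](results) for name in level))
        results.update(zip(level, outputs))  # gather preserves input order
    return results


async def build_prompts(results):
    return "prompt"


async def generate_financial_metrics(results):
    await asyncio.sleep(0.01)  # stand-in for an LLM call
    return f"metrics from {results['build_prompts']}"


async def generate_strategic_analysis(results):
    await asyncio.sleep(0.01)  # stand-in for an LLM call
    return f"analysis from {results['build_prompts']}"


handlers = {
    "build_prompts": build_prompts,
    "generate_financial_metrics": generate_financial_metrics,
    "generate_strategic_analysis": generate_strategic_analysis,
}
levels = [["build_prompts"], ["generate_financial_metrics", "generate_strategic_analysis"]]

# In a notebook, use `await run_levels(levels, handlers)` instead of asyncio.run.
results = asyncio.run(run_levels(levels, handlers))
print(results)
```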

---

### 📊 CMPT Implements (Business Logic Layer)

| Component | What CMPT Chain Implements |
|-----------|----------------------------|
| **Data Agents** | NewsAgent, SECAgent, EarningsAgent (MCP integrations) |
| **Context Builder** | Company info extraction, temporal context, persona extraction |
| **Content Prioritization** | Temporal source prioritizer, subquery engine, topic ranker |
| **Response Builder** | Prompt construction, LLM calls, metrics validation |
| **Business Rules** | Grid config, priority profiles, earnings proximity rules |
| **Data Models** | FinancialMetricsResponse, StrategicAnalysisResponse, CitationDict |

### Visual Comparison: Framework vs Business Logic

```
┌───────────────────────────────────────────────────────────────────────┐
│                        CMPT CHAIN APPLICATION                         │
├───────────────────────────────────────────────────────────────────────┤
│  Business Logic Layer (CMPT Implements)                               │
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────┐              │
│  │ Data Agents │  │ Grid Config  │  │ LLM Prompts     │              │
│  │ • News      │  │ • Priorities │  │ • Metrics       │              │
│  │ • SEC       │  │ • Rules      │  │ • Analysis      │              │
│  │ • Earnings  │  │ • Topics     │  │ • Validation    │              │
│  └─────────────┘  └──────────────┘  └─────────────────┘              │
├───────────────────────────────────────────────────────────────────────┤
│  Framework Layer (FlowForge Provides)                                 │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌────────────┐      │
│  │ Decorators │  │ DAG Engine │  │ Context    │  │ Middleware │      │
│  │ @step      │  │ Parallel   │  │ Management │  │ Logging    │      │
│  │ @agent     │  │ Execution  │  │ Scopes     │  │ Caching    │      │
│  │ @chain     │  │ Retry      │  │ Tokens     │  │ Metrics    │      │
│  └────────────┘  └────────────┘  └────────────┘  └────────────┘      │
└───────────────────────────────────────────────────────────────────────┘
```

<a id="3-architecture-overview"></a>
## 3. CMPT Chain Architecture Overview

The CMPT pipeline follows this execution flow:

```
┌─────────────────────────────────────────────────────────────────────────────────────┐
│                     CLIENT MEETING PREP CHAIN SERVER                                │
├─────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                     │
│  ┌──────────────────────┐    ┌─────────────────────────┐    ┌────────────────────┐  │
│  │   CONTEXT BUILDER    │    │CONTENT PRIORITIZATION   │    │ RESPONSE BUILDER & │  │
│  │                      │    │       ENGINE            │    │    GENERATOR       │  │
│  │                      │    │                         │    │                    │  │
│  │ • Company Firm       │    │ • Temporal Source       │    │ • Agent Execution  │  │
│  │   Extractor          │    │   Prioritizer           │    │                    │  │
│  │                      │(2) │                         │(3) │ • Prompt Builder   │  │
│  │ • RBC Persona        │───▶│ • Subquery Engine       │───▶│                    │  │
│  │   Extractor          │    │                         │    │ • Response Builder │  │
│  │                      │    │ • Topic Ranker          │    │                    │  │
│  │ • Temporal Context   │    │                         │    │ • Validation       │  │
│  │   Extractor          │    │ • Grid Config           │    │                    │  │
│  └──────────────────────┘    └─────────────────────────┘    └─────────┬──────────┘  │
│                                                                        │ (5)        │
└────────────────────────────────────────────────────────────────────────┼────────────┘
                                                                         │
                         ┌───────────────────────────────────────────────┘
                         │ (4)
                         ▼
┌─────────────────────────────────────────────────────────────────────────────────────┐
│                                  DATA AGENTS                                        │
├────────────────────┬─────────────────────┬──────────────────────────────────────────┤
│    SEC Filing      │       News          │              Earnings                    │
│                    │                     │                                          │
│   >> RavenPack     │    >> RavenPack     │              FACTSET                     │
│                    │                     │                                          │
│ Last 8 quarters    │ 1 year (market cap) │              Latest                      │
│ (revenue),         │ 30 days (others)    │                                          │
│ Latest (Others)    │                     │                                          │
└────────────────────┴─────────────────────┴──────────────────────────────────────────┘
```

### Data Flow Sequence

1. **User Request → Context Builder**: User provides meeting details (client, date, attendees)
2. **Context Builder → Content Prioritization**: Extracted context flows to prioritization engine
3. **Content Prioritization → Response Builder**: Prioritized content with subqueries flows to generator
4. **Response Builder → Data Agents**: Agent execution queries external data sources (PARALLEL)
5. **Data Agents → Response Builder**: Retrieved data flows back for response building
6. **Response Builder → LLM**: Generate financial metrics and strategic analysis (PARALLEL)
7. **LLM → Response**: Final CMPT content returned to user

<a id="4-imports"></a>
## 4. Imports and Dependencies

Let's start by importing all necessary modules.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           IMPORTS & DEPENDENCIES
# ═══════════════════════════════════════════════════════════════════════════════

from __future__ import annotations

# Standard library
import asyncio
import logging
import os
import re
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional, Dict, List
from difflib import SequenceMatcher

# Third-party
from pydantic import BaseModel, Field
import httpx  # For HTTP requests

# FlowForge - The framework we're using
from flowforge import FlowForge
from flowforge.core.context import ChainContext, ContextScope

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("✅ All imports successful!")

<a id="5-data-models"></a>
## 5. Data Models & Schemas

These Pydantic models define the structure of our data and validate it at runtime.

### Key Models:
- **CMPTRequest**: Input request for the chain
- **CitationDict**: Source tracking for LLM extractions
- **FinancialMetricsResponse**: Financial data with citations
- **StrategicAnalysisResponse**: SWOT, investment thesis, etc.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           DATA MODELS & SCHEMAS
# ═══════════════════════════════════════════════════════════════════════════════

class ToolName(Enum):
    """
    Data agent tool names - maps to MCP tool endpoints.
    
    These are the three primary data sources for CMPT:
    - EARNINGS: Quarterly earnings transcripts from FACTSET
    - NEWS: Recent news articles from RavenPack
    - SEC: SEC filings (10-K, 10-Q) from RavenPack
    """
    EARNINGS_TOOL = "earnings_agent"
    NEWS_TOOL = "news_agent"
    SEC_TOOL = "SEC_agent"  # Note: capitalized to match old code; the forge agent in Section 8 is registered as "sec_agent"


class CMPTRequest(BaseModel):
    """
    Unified request model for CMPT chain.
    
    This matches the original ChainServerRequest from the old codebase.
    All fields are optional to support flexible input.
    """
    corporate_client_email: Optional[str] = None
    corporate_client_names: Optional[str] = None
    rbc_employee_email: Optional[str] = None
    meeting_datetime: Optional[str] = None
    corporate_company_name: Optional[str] = None
    verbose: bool = False


class CitationDict(BaseModel):
    """
    Structured citation with source tracking.
    
    This is CRITICAL for validation - every extracted metric must
    have a citation that can be verified against source documents.
    
    Example:
        {
            "source_agent": ["SEC_agent"],
            "source_content": ["Total revenues were $96,773 million..."],
            "reasoning": "Extracted from 10-Q filing dated 2025-10-20"
        }
    """
    source_agent: List[str] = Field(
        description="List of data agents used (SEC_agent, earnings_agent, news_agent)"
    )
    source_content: List[str] = Field(
        description="List of VERBATIM quotes from source chunks - DO NOT PARAPHRASE"
    )
    reasoning: str = Field(
        description="Explanation of extraction/calculation logic with actual numbers"
    )


class FinancialMetricsResponse(BaseModel):
    """
    Financial metrics extraction response with full citations.
    
    Each metric has:
    - Value field (float or None)
    - Date field (when applicable)
    - Citation field (expected for every metric, even null values; the schema
      types it as Optional, so Pydantic itself does not enforce this)
    
    Key metrics:
    - Revenue (current, YoY change, next year estimate)
    - EBITDA margin (with YoY change)
    - Stock price (with daily and YoY changes)
    - Market cap
    - Revenue growth trajectory (last 7 quarters)
    """
    # Revenue metrics
    current_annual_revenue: Optional[float] = None
    current_annual_revenue_date: Optional[str] = None
    current_annual_revenue_citation: Optional[CitationDict] = None
    current_annual_revenue_yoy_change: Optional[float] = None
    current_annual_revenue_yoy_change_citation: Optional[CitationDict] = None
    estimated_annual_revenue_next_year: Optional[float] = None
    estimated_annual_revenue_next_year_date: Optional[str] = None
    estimated_annual_revenue_next_year_citation: Optional[CitationDict] = None
    
    # EBITDA metrics
    ebitda_margin: Optional[float] = None
    ebitda_margin_citation: Optional[CitationDict] = None
    ebitda_margin_yoy_change: Optional[float] = None
    ebitda_margin_yoy_change_citation: Optional[CitationDict] = None
    
    # Stock metrics
    stock_price: Optional[float] = None
    stock_price_citation: Optional[CitationDict] = None
    stock_price_daily_change: Optional[float] = None
    stock_price_daily_change_percent: Optional[float] = None
    stock_price_yoy_change: Optional[float] = None
    stock_price_yoy_change_citation: Optional[CitationDict] = None
    
    # Market cap
    market_cap: Optional[float] = None
    market_cap_citation: Optional[CitationDict] = None
    market_cap_date: Optional[str] = None
    
    # Quarterly trajectory
    revenue_growth_trajectory: Optional[Dict[str, Optional[float]]] = None
    revenue_growth_trajectory_citation: Optional[CitationDict] = None


class StrategicAnalysisResponse(BaseModel):
    """
    Strategic analysis response for client meeting prep.
    
    Includes:
    - SWOT analysis (strengths, weaknesses, opportunities, threats)
    - Investment thesis
    - Key risks
    - Strategic opportunities
    - Recent developments (with dates and source URLs)
    """
    strength: List[str] = Field(description="4-6 key competitive strengths")
    weakness: List[str] = Field(description="4-6 key vulnerabilities")
    opportunity: List[str] = Field(description="4-6 growth opportunities")
    threat: List[str] = Field(description="4-6 external threats")
    
    investment_thesis: List[Dict[str, List[str]]] = Field(
        description="3-4 investment thesis points with subheadings and bullets"
    )
    
    key_risk_highlights: List[str] = Field(description="5-7 critical risks")
    
    strategic_opportunities: List[Dict[str, List[str]]] = Field(
        description="3-4 strategic opportunities for M&A, capital raising, etc."
    )
    
    recent_developments: List[Dict[str, Any]] = Field(
        description="4-6 recent developments with category, date, description, source_url"
    )
    
    sources: List[str] = Field(description="8-12 sources cited")


print("✅ Data models defined!")
print(f"   - ToolName enum with {len(ToolName)} agents")
print("   - CMPTRequest for chain input")
print("   - CitationDict for source tracking")
print(f"   - FinancialMetricsResponse with {len(FinancialMetricsResponse.model_fields)} fields")
print(f"   - StrategicAnalysisResponse with {len(StrategicAnalysisResponse.model_fields)} fields")
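
Because every citation is meant to be checkable against the source documents, a validator can confirm that each `source_content` quote really occurs in the retrieved chunks. The sketch below shows one simple way to do that, using exact matching after whitespace normalization. `unverified_quotes` and the sample chunk are hypothetical, and the tutorial's own Metrics Validator (Section 21.1) may work differently.

```python
import re
from typing import Any, Dict, List


def _normalize(text: str) -> str:
    """Collapse whitespace and lowercase so formatting differences don't matter."""
    return re.sub(r"\s+", " ", text).strip().lower()


def unverified_quotes(citation: Dict[str, Any], source_chunks: List[str]) -> List[str]:
    """Return the quotes in citation['source_content'] not found in any chunk."""
    chunks = [_normalize(chunk) for chunk in source_chunks]
    return [
        quote
        for quote in citation["source_content"]
        if not any(_normalize(quote) in chunk for chunk in chunks)
    ]


# Made-up sample data, for illustration only.
chunks = ["Total revenues were $96,773 million  for the quarter."]
citation = {
    "source_agent": ["SEC_agent"],
    "source_content": [
        "Total revenues were $96,773 million",  # verbatim quote
        "Revenue was about $97 billion",        # paraphrase, should be flagged
    ],
    "reasoning": "Illustrative only",
}

print(unverified_quotes(citation, chunks))
```

Exact matching is deliberately strict: it flags paraphrases, which is the behavior the "DO NOT PARAPHRASE" field description asks for.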

<a id="6-grid-config"></a>
## 6. Grid Configuration (Business Rules)

The Grid Configuration defines the business rules for content prioritization.

### Key Concepts:
- **Priority Profiles**: How much weight each data source gets (earnings vs news vs SEC)
- **Temporal Rules**: Rules that change priorities based on meeting date and earnings proximity
- **Topics**: Content prioritization topics for ranking

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           GRID CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════════

"""
GRID CONFIG: Business rules for content prioritization.

This is where the "intelligence" of the CMPT chain lives.
The rules determine which data sources get priority based on:
- Company type (public vs private)
- Proximity to earnings date
- Meeting date
"""

GRID_CONFIG = {
    # How many weeks before/after earnings to use "earnings_dominant" profile
    "earnings_proximity_weeks": 1,
    
    # Priority profiles - percentage weights for each data source
    "priority_profiles": {
        "earnings_dominant": {
            # When meeting is near earnings, prioritize earnings transcripts
            ToolName.EARNINGS_TOOL.value: 50,
            ToolName.NEWS_TOOL.value: 30,
            ToolName.SEC_TOOL.value: 20
        },
        "news_dominant": {
            # Default profile - news is most relevant for recent context
            ToolName.EARNINGS_TOOL.value: 20,
            ToolName.NEWS_TOOL.value: 60,
            ToolName.SEC_TOOL.value: 20
        }
    },
    
    # Temporal source prioritizer rules (evaluated in order, first match wins)
    "temporal_source_prioritizer": {
        "rules": [
            {
                # Rule 1: Non-public companies don't have SEC/earnings data
                "name": "non_public_company",
                "condition": lambda ctx: ctx["company_type"] not in ["PUB", "SUB"],
                "priority_profile": "news_dominant"
            },
            {
                # Rule 2: Public companies near earnings date
                "name": "earnings_proximity",
                "condition": lambda ctx: (
                    ctx["company_type"] in ["PUB", "SUB"] and
                    ctx.get("max_earnings_event_date") and
                    abs((ctx["max_earnings_event_date"] - ctx["meeting_date"]).days) <= ctx["window"]
                ),
                "priority_profile": "earnings_dominant"
            }
        ],
        "default_profile": "news_dominant"
    },
    
    # Topics for content prioritization/ranking
    "content_prioritization_topics": [
        "financial_performance",
        "strategic_initiatives",
        "market_position",
        "risk_factors",
        "management_changes"
    ]
}

print("✅ Grid configuration loaded!")
print(f"   - Earnings proximity window: {GRID_CONFIG['earnings_proximity_weeks']} weeks")
print(f"   - Priority profiles: {list(GRID_CONFIG['priority_profiles'].keys())}")
print(f"   - Temporal rules: {[r['name'] for r in GRID_CONFIG['temporal_source_prioritizer']['rules']]}")
print(f"   - Content topics: {GRID_CONFIG['content_prioritization_topics']}")
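
The "first match wins" evaluation of the temporal rules can be sketched as follows. `select_priority_profile` is a hypothetical helper, and the demo uses a trimmed copy of the rule structure so that it runs on its own. One loud assumption: the tutorial does not show how `ctx["window"]` is populated, so the sketch treats it as a number of days derived from `earnings_proximity_weeks * 7`.

```python
from datetime import date
from typing import Any, Dict


def select_priority_profile(prioritizer: Dict[str, Any], ctx: Dict[str, Any]) -> str:
    """Return the profile of the first rule whose condition holds, else the default."""
    for rule in prioritizer["rules"]:
        if rule["condition"](ctx):
            return rule["priority_profile"]
    return prioritizer["default_profile"]


# Trimmed copy of the temporal_source_prioritizer structure.
prioritizer = {
    "rules": [
        {
            "name": "non_public_company",
            "condition": lambda ctx: ctx["company_type"] not in ["PUB", "SUB"],
            "priority_profile": "news_dominant",
        },
        {
            "name": "earnings_proximity",
            "condition": lambda ctx: (
                ctx["company_type"] in ["PUB", "SUB"]
                and ctx.get("max_earnings_event_date")
                and abs((ctx["max_earnings_event_date"] - ctx["meeting_date"]).days) <= ctx["window"]
            ),
            "priority_profile": "earnings_dominant",
        },
    ],
    "default_profile": "news_dominant",
}

ctx = {
    "company_type": "PUB",
    "meeting_date": date(2025, 1, 10),
    "max_earnings_event_date": date(2025, 1, 14),
    "window": 1 * 7,  # ASSUMPTION: earnings_proximity_weeks converted to days
}

print(select_priority_profile(prioritizer, ctx))  # earnings_dominant
```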

<a id="7-init-forge"></a>
## 7. Initializing FlowForge

Now we create our FlowForge instance. This is the central orchestrator.

### Key Parameters:
- `name`: Identifier for this forge instance
- `version`: Version string for tracking
- `max_parallel`: Maximum number of steps executing concurrently (enforced with a semaphore)
- `default_timeout_ms`: Default per-step timeout, in milliseconds
- `isolated`: Whether to use isolated registries (prevents state bleed between tests)

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           INITIALIZE FLOWFORGE
# ═══════════════════════════════════════════════════════════════════════════════

"""
Create the FlowForge instance.

FlowForge is the central orchestrator that:
1. Registers agents, steps, and chains via decorators
2. Resolves dependencies and builds the DAG
3. Executes steps with parallel optimization
4. Manages context across step executions
5. Provides validation and visualization tools
"""

forge = FlowForge(
    name="cmpt_chain",           # Name of this forge instance
    version="2.0.0",             # Version for tracking
    max_parallel=10,             # Max concurrent step executions
    default_timeout_ms=60000,    # 60 second default timeout
    isolated=True,               # Use isolated registries (good for testing)
)

print("✅ FlowForge initialized!")
print(f"   - Name: {forge.name}")
print(f"   - Version: {forge.version}")
print(f"   - Isolated mode: {forge._isolated}")
print("\n📌 FlowForge is now ready to register agents, steps, and chains!")
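
To illustrate what `max_parallel` and `default_timeout_ms` typically mean in an asyncio-based executor, the sketch below caps concurrency with `asyncio.Semaphore` and bounds each task with `asyncio.wait_for`. `run_bounded` and the two demo steps are hypothetical; FlowForge's internals may differ.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(
    tasks: List[Callable[[], Awaitable[object]]],
    max_parallel: int,
    timeout_ms: int,
) -> List[object]:
    """Run coroutine factories with a concurrency cap and a per-task timeout."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(factory):
        async with semaphore:  # at most max_parallel bodies run at once
            return await asyncio.wait_for(factory(), timeout=timeout_ms / 1000)

    # return_exceptions=True keeps one timeout from aborting the other tasks
    return await asyncio.gather(*(guarded(t) for t in tasks), return_exceptions=True)


stats = {"running": 0, "peak": 0}


async def tracked_step():
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(0.01)
    stats["running"] -= 1
    return "ok"


async def slow_step():
    await asyncio.sleep(1)  # longer than the timeout below
    return "never returned"


# In a notebook, use `await run_bounded(...)` instead of asyncio.run.
outcomes = asyncio.run(
    run_bounded([tracked_step] * 5 + [slow_step], max_parallel=2, timeout_ms=100)
)
print(stats["peak"], outcomes)
```

The timed-out task shows up as an exception object in `outcomes` rather than stopping the run, which is one way a "continue" error mode can behave.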

<a id="8-data-agents"></a>
## 8. Registering Data Agents

Data Agents are classes that fetch data from external sources (MCP servers, APIs, etc.).

### FlowForge Agent Features:
- Registered with `@forge.agent()` decorator
- Can have name, group, description
- Retrieved via `forge.get_agent("name")`
- Typically expose async methods for data fetching

### CMPT Agents:
1. **NewsAgent**: Fetches news from RavenPack via MCP
2. **SECAgent**: Fetches SEC filings from RavenPack via MCP
3. **EarningsAgent**: Fetches earnings transcripts from FACTSET via MCP

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           DATA AGENTS
# ═══════════════════════════════════════════════════════════════════════════════

@forge.agent(name="news_agent", group="data", description="Fetches news articles via MCP")
class NewsAgent:
    """
    News data agent using MCP integration.
    
    Connects to RavenPack MCP server to fetch:
    - Recent news articles (last 30 days)
    - Market cap/stock price mentions (last 5 days)
    - Executive/leadership changes (last 30 days)
    - M&A activity
    """

    def __init__(self):
        # Configuration from environment variables
        self.server_url = os.getenv("NEWS_AGENT_MCP_URL")
        self.bearer_token = os.getenv("NEWS_AGENT_MCP_BEARER_TOKEN")
        self.tool_name = os.getenv("NEWS_AGENT_MCP_TOOL", "search_news")

    async def fetch(self, company_name: str, **kwargs) -> List[Dict]:
        """
        Fetch news for a company.
        
        Args:
            company_name: Name of the company to search
            
        Returns:
            List of news article chunks with metadata
        """
        subqueries = self._build_subqueries(company_name)
        results = []

        for subquery in subqueries:
            try:
                result = await self._execute_query(subquery)
                if result:
                    results.extend(result)
            except Exception as e:
                logger.error(f"News query failed: {e}")

        return results

    def _build_subqueries(self, company_name: str) -> List[Dict]:
        """
        Build news search subqueries.
        
        Creates targeted queries for different types of news:
        - Executive changes
        - M&A activity
        - Stock/market cap updates
        """
        now = datetime.now()  # Capture once so all three dates share one reference point
        end_date = now.strftime('%Y-%m-%d')
        month_ago = (now - timedelta(days=31)).strftime('%Y-%m-%d')
        five_days_ago = (now - timedelta(days=5)).strftime('%Y-%m-%d')

        return [
            {
                "search_query": f"{company_name} executive leadership changes",
                "topics": ["executive", "CEO", "CFO", "leadership"],
                "absolute_date_range": {"start_date": month_ago, "end_date": end_date}
            },
            {
                "search_query": f"{company_name} mergers acquisitions M&A",
                "topics": ["merger", "acquisition", "M&A", "deal"],
                "absolute_date_range": {"start_date": month_ago, "end_date": end_date}
            },
            {
                "search_query": f"{company_name} stock price market cap",
                "topics": ["market cap", "stock price", "valuation"],
                "absolute_date_range": {"start_date": five_days_ago, "end_date": end_date}
            },
        ]

    async def _execute_query(self, subquery: Dict) -> List[Dict]:
        """Execute MCP query - placeholder for real implementation"""
        # In production, this would use the MCP client:
        # async with streamablehttp_client(self.server_url, headers=headers) as (read_stream, write_stream, _):
        #     async with ClientSession(read_stream, write_stream) as session:
        #         await session.initialize()
        #         result = await session.call_tool(name=self.tool_name, arguments=subquery)
        logger.info(f"Executing news query: {subquery.get('search_query', '')[:50]}...")
        return []


@forge.agent(name="sec_agent", group="data", description="Fetches SEC filings via MCP")
class SECAgent:
    """
    SEC filings data agent.
    
    Connects to RavenPack MCP server to fetch:
    - 10-K annual reports
    - 10-Q quarterly reports
    - Last 8 quarters of revenue data
    - Balance sheet data
    """

    def __init__(self):
        self.server_url = os.getenv("SEC_AGENT_MCP_URL")
        self.bearer_token = os.getenv("SEC_AGENT_MCP_BEARER_TOKEN")
        self.tool_name = os.getenv("SEC_AGENT_MCP_TOOL", "search_filings")

    async def fetch(self, company_name: str, **kwargs) -> List[Dict]:
        """Fetch SEC filings for a company"""
        subqueries = self._build_subqueries(company_name)
        results = []

        for subquery in subqueries:
            try:
                result = await self._execute_query(subquery)
                if result:
                    results.extend(result)
            except Exception as e:
                logger.error(f"SEC query failed: {e}")

        return results

    def _build_subqueries(self, company_name: str) -> List[Dict]:
        """
        Build SEC search subqueries.
        
        Creates queries for:
        - Income statements (revenue, last 8 quarters)
        - Balance sheets (shares outstanding, latest)
        """
        return [
            {
                "reporting_entity": company_name,
                "search_queries": [
                    "consolidated statements of operations",
                    "statements of income",
                    "total revenue"
                ],
                "keywords": [
                    "net sales", "total revenue", "quarterly revenue",
                    "three months ended", "revenue"
                ],
                "get_latest": 8  # Last 8 quarters for trajectory
            },
            {
                "reporting_entity": company_name,
                "search_queries": [
                    "consolidated balance sheets",
                    "stockholders equity",
                    "common stock outstanding shares"
                ],
                "keywords": [
                    "outstanding shares", "common stock", "shares outstanding"
                ],
                "get_latest": 1  # Just latest for market cap calc
            }
        ]

    async def _execute_query(self, subquery: Dict) -> List[Dict]:
        """Execute MCP query - placeholder"""
        logger.info(f"Executing SEC query for: {subquery.get('reporting_entity', '')}")
        return []


@forge.agent(name="earnings_agent", group="data", description="Fetches earnings transcripts via MCP")
class EarningsAgent:
    """
    Earnings transcript data agent.
    
    Connects to FACTSET MCP server to fetch:
    - Quarterly earnings call transcripts
    - Management commentary
    - Forward guidance
    """

    def __init__(self):
        self.server_url = os.getenv("EARNINGS_AGENT_MCP_URL")
        self.bearer_token = os.getenv("EARNINGS_AGENT_MCP_BEARER_TOKEN")
        self.tool_name = os.getenv("EARNINGS_AGENT_MCP_TOOL", "search_earnings")

    async def fetch(
        self,
        company_name: str,
        fiscal_year: str,
        fiscal_quarter: str,
        **kwargs
    ) -> List[Dict]:
        """
        Fetch earnings transcripts for a company.
        
        Args:
            company_name: Company name
            fiscal_year: e.g., "2025"
            fiscal_quarter: e.g., "1", "2", "3", "4"
        """
        try:
            subquery = {
                "query": f"Give me the earnings transcript for {company_name} for fiscal year: {fiscal_year} and quarter: {fiscal_quarter}."
            }
            return await self._execute_query(subquery)
        except Exception as e:
            logger.error(f"Earnings query failed: {e}")
            return []

    async def _execute_query(self, subquery: Dict) -> List[Dict]:
        """Execute MCP query - placeholder"""
        logger.info(f"Executing earnings query: {subquery.get('query', '')[:50]}...")
        return []


print("✅ Data agents registered!")
print(f"   - Agents: {forge.list_agents()}")

<a id="9-chain-steps"></a>
## 9. Defining Chain Steps (Overview)

Steps are the building blocks of a chain. Each step:
- Takes a `ChainContext` as input
- Can read from and write to the context
- Declares its dependencies and what it produces
- Returns a result dictionary
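
To make this read/write contract concrete, here is a minimal sketch built on a stand-in context object. `SketchContext` is hypothetical and is not FlowForge's `ChainContext`; it only mirrors the `get(key, default)` and `set(key, value)` calls used in this tutorial, assumes `get` behaves like `dict.get`, and uses a synchronous step for brevity.

```python
from typing import Any, Dict


class SketchContext:
    """Hypothetical dict-backed stand-in for a chain context (not FlowForge's class)."""

    def __init__(self, initial: Dict[str, Any]) -> None:
        self._data: Dict[str, Any] = dict(initial)

    def get(self, key: str, default: Any = None) -> Any:
        # Assumed to behave like dict.get: the default applies only when the key is absent.
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


def extract_company_info(ctx: SketchContext) -> Dict[str, Any]:
    """A step reads its inputs from the context and writes what it produces."""
    name = ctx.get("company_name")
    info = {"company_name": name, "company_type": "PUB"} if name else None
    ctx.set("company_info", info)
    return {"company_info": info}


ctx = SketchContext({"company_name": "Acme Corp"})
result = extract_company_info(ctx)

# A stored None is returned as-is, so guard with `or {}` before calling .get() on it.
empty_ctx = SketchContext({})
extract_company_info(empty_ctx)
safe_info = empty_ctx.get("company_info", {}) or {}
```

Under that assumption, a default of `{}` does not protect against a key that was explicitly stored as `None`, which is why downstream steps may want the `or {}` guard.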

### FlowForge Step Decorator Parameters:

| Parameter | Description | Example |
|-----------|-------------|----------|
| `name` | Unique step identifier | `"extract_company_info"` |
| `deps` | List of dependency step names | `["build_context"]` |
| `produces` | Context keys this step produces | `["company_info"]` |
| `description` | Human-readable description | `"Extract company info"` |
| `group` | Step group for organization | `"context_builder"` |
| `timeout_ms` | Execution timeout in milliseconds | `30000` |
| `retry` | Number of retries on failure | `3` |
| `max_concurrency` | Limit parallel instances | `2` |
| `resources` | Resources to inject | `["db", "llm"]` |
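
As a rough illustration of how parameters like these could be consumed, the sketch below records step metadata in a registry and applies `timeout_ms` and `retry` at run time. The `step` decorator, `STEP_REGISTRY`, and `run_step` are hypothetical names and do not describe FlowForge's internals. The sketch also assumes `retry` counts retries after the first attempt, which this table does not spell out.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

# Hypothetical registry; FlowForge's internal bookkeeping may differ.
STEP_REGISTRY: Dict[str, Dict[str, Any]] = {}


def step(
    name: str,
    deps: Optional[List[str]] = None,
    produces: Optional[List[str]] = None,
    timeout_ms: Optional[int] = None,
    retry: int = 0,
) -> Callable:
    """Record step metadata and return the function unchanged."""

    def decorator(fn: Callable[..., Awaitable[Dict]]) -> Callable[..., Awaitable[Dict]]:
        STEP_REGISTRY[name] = {
            "fn": fn,
            "deps": list(deps or []),
            "produces": list(produces or []),
            "timeout_ms": timeout_ms,
            "retry": retry,
        }
        return fn

    return decorator


async def run_step(name: str, ctx: Dict[str, Any]) -> Dict:
    """Run one registered step, enforcing its timeout and retry budget."""
    meta = STEP_REGISTRY[name]
    timeout_s = meta["timeout_ms"] / 1000 if meta["timeout_ms"] else None
    attempts = meta["retry"] + 1  # assumption: first try plus `retry` retries
    last_error: Optional[BaseException] = None
    for _ in range(attempts):
        try:
            return await asyncio.wait_for(meta["fn"](ctx), timeout=timeout_s)
        except Exception as exc:  # also catches the timeout raised by wait_for
            last_error = exc
    raise RuntimeError(f"step {name!r} failed after {attempts} attempts") from last_error


calls = {"count": 0}


@step(name="flaky_step", produces=["value"], timeout_ms=1000, retry=2)
async def flaky_step(ctx: Dict[str, Any]) -> Dict:
    """Fail twice, then succeed, to exercise the retry loop."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ValueError("transient failure")
    return {"value": 42}


outcome = asyncio.run(run_step("flaky_step", {}))
```

Keeping the metadata separate from the function body is what lets an orchestrator build the DAG from `deps` without executing any step.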

<a id="11-context-builder"></a>
## 11. Context Builder Steps

The Context Builder extracts information needed for the pipeline:
1. **Company Info**: Ticker, company type, sector from foundation service
2. **Temporal Context**: Earnings calendar, fiscal year/quarter
3. **Persona Info** (optional): RBC employee and client personas
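
The temporal part of this comes down to two small calculations: mapping a date to a quarter and finding the earnings event closest to the meeting. The sketch below assumes calendar-year quarters (companies with an offset fiscal year would need a different mapping). The helper names and sample dates are made up for illustration.

```python
from datetime import date
from typing import List, Optional


def calendar_quarter(day: date) -> int:
    """Map months 1-3 to 1, 4-6 to 2, 7-9 to 3, and 10-12 to 4."""
    return (day.month - 1) // 3 + 1


def closest_event(meeting: date, events: List[date]) -> Optional[date]:
    """Return the event date nearest to the meeting date, or None if there are none."""
    if not events:
        return None
    return min(events, key=lambda event: abs((event - meeting).days))


# Illustrative dates only; real values would come from the earnings calendar service.
meeting = date(2025, 5, 20)
events = [date(2025, 2, 26), date(2025, 5, 28), date(2025, 8, 27)]
nearest = closest_event(meeting, events)
```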

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                       CONTEXT BUILDER STEPS
# ═══════════════════════════════════════════════════════════════════════════════

@forge.step(
    name="extract_company_info",
    produces=["company_info"],
    description="Extract company information from external API",
    timeout_ms=30000,
    group="context_builder"
)
async def extract_company_info(ctx: ChainContext) -> Dict:
    """
    Extract company information (ticker, type, etc.) from foundation service.
    
    This step:
    1. Reads company_name from context (provided in initial data)
    2. Calls the foundation company matches API
    3. Stores the result in context for downstream steps
    
    Equivalent to: ContextBuilderService.corporate_client_firm_extractor
    """
    company_name = ctx.get("company_name")

    if not company_name:
        logger.warning("No company name provided")
        return {"company_info": None}

    try:
        # In production, this would call the foundation service:
        # url = os.getenv('FOUNDATION_COMPANY_MATCHES')
        # async with httpx.AsyncClient(timeout=20.0) as client:  # leave TLS verification enabled
        #     response = await client.post(url, json={"company_name": company_name})
        #     result = response.json()
        #     if result.get("result", {}).get("matches"):
        #         company_info = result["result"]["matches"][0]

        # Placeholder response structure matching old code
        company_info = {
            "company_name": company_name,
            "ticker_symbol": None,
            "company_type": "PUB",  # PUB, PRIV, SUB
            "sector": None,
            "industry": None,
        }

        # Store in context for downstream steps
        ctx.set("company_info", company_info, scope=ContextScope.CHAIN)
        logger.info(f"Extracted company info for: {company_name}")

        return {"company_info": company_info}

    except Exception as e:
        logger.error(f"Company info extraction failed: {e}")
        return {"company_info": None, "error": str(e)}


@forge.step(
    name="extract_temporal_context",
    deps=["extract_company_info"],  # Depends on company info
    produces=["temporal_context"],
    description="Extract temporal context (earnings calendar) for the company",
    timeout_ms=30000,
    group="context_builder"
)
async def extract_temporal_context(ctx: ChainContext) -> Dict:
    """
    Extract temporal context including earnings calendar.
    
    Uses company info from previous step to:
    1. Look up upcoming earnings dates
    2. Determine fiscal year/quarter
    3. Find the closest earnings event to meeting date
    
    Equivalent to: ContextBuilderService.temporal_content_extractor
    """
    company_info = ctx.get("company_info") or {}  # tolerate a stored None from a failed upstream step
    company_name = company_info.get("company_name") or ctx.get("company_name")
    ticker = company_info.get("ticker_symbol")

    try:
        # In production:
        # url = os.getenv('FOUNDATION_EARNING_CALENDAR_URL')
        # payload = {"top_n": GRID_CONFIG["earnings_proximity_weeks"]}
        # if company_name: payload["company_name"] = company_name
        # elif ticker: payload["ticker_symbol"] = ticker
        # response = await client.post(url, json=payload)
        # result = response.json()
        # # Parse to find latest event_dt

        # Placeholder response
        temporal_context = {
            "event_dt": None,  # Next earnings date
            "fiscal_year": str(datetime.now().year),
            "fiscal_period": str((datetime.now().month - 1) // 3 + 1),
            "earnings_events": []
        }

        ctx.set("temporal_context", temporal_context, scope=ContextScope.CHAIN)
        logger.info(f"Extracted temporal context for: {company_name}")

        return {"temporal_context": temporal_context}

    except Exception as e:
        logger.error(f"Temporal context extraction failed: {e}")
        return {"temporal_context": None, "error": str(e)}


@forge.step(
    name="build_context",
    deps=["extract_company_info", "extract_temporal_context"],  # Fan-in point
    produces=["context_builder_output"],
    description="Combine all context builder outputs",
    group="context_builder"
)
async def build_context(ctx: ChainContext) -> Dict:
    """
    Combine all context builder outputs into a unified structure.
    
    This is the "fan-in" step that waits for all context extraction
    to complete and creates the final context builder output.
    
    Equivalent to: ContextBuilderService.execute
    """
    # Upstream steps report None on failure, so fall back to empty dicts
    company_info = ctx.get("company_info") or {}
    temporal_context = ctx.get("temporal_context") or {}
    meeting_date = ctx.get("meeting_date") or datetime.now().strftime("%Y-%m-%d")

    context_builder_output = {
        "company_info": company_info,
        "temporal_context": temporal_context,
        "meeting_date": meeting_date,
        "company_name": company_info.get("company_name") or ctx.get("company_name"),
        "company_type": company_info.get("company_type", "PUB"),
        # These would be populated if persona extraction was enabled
        "rbc_persona": None,
        "corporate_client_persona": None,
    }

    ctx.set("context_builder_output", context_builder_output, scope=ContextScope.CHAIN)

    return {"context_builder_output": context_builder_output}


print("✅ Context builder steps registered!")
print("   - Steps: extract_company_info -> extract_temporal_context -> build_context")

<a id="12-content-prioritization"></a>
## 12. Content Prioritization Steps

The Content Prioritization engine determines:
1. **Source Priorities**: Which data sources get more weight
2. **Subqueries**: What queries to send to each agent
3. **Topic Rankings**: Which topics are most important
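
Source priorities are chosen by a first-match-wins rule scan, which can be sketched as follows. `SKETCH_CONFIG`, its rule names, and every weight other than the 40/30/30 default are assumptions for illustration. The tutorial's real `GRID_CONFIG` may define different conditions, for example a one-sided earnings window.

```python
from datetime import datetime
from typing import Any, Dict


def near_earnings(c: Dict[str, Any]) -> bool:
    """True when the meeting falls within `window_days` of the earnings date."""
    if c["earnings_date"] is None:
        return False
    return abs((c["meeting_date"] - c["earnings_date"]).days) <= c["window_days"]


def is_private(c: Dict[str, Any]) -> bool:
    return c["company_type"] == "PRIV"


# Hypothetical config for illustration; not the tutorial's actual GRID_CONFIG.
SKETCH_CONFIG: Dict[str, Any] = {
    "priority_profiles": {
        "default": {"news": 40, "earnings": 30, "sec": 30},
        "earnings_dominant": {"news": 20, "earnings": 50, "sec": 30},
        "news_dominant": {"news": 60, "earnings": 20, "sec": 20},
    },
    "default_profile": "default",
    "rules": [
        {"name": "near_earnings", "condition": near_earnings, "priority_profile": "earnings_dominant"},
        {"name": "private_company", "condition": is_private, "priority_profile": "news_dominant"},
    ],
}


def pick_profile(rule_context: Dict[str, Any], config: Dict[str, Any]) -> str:
    """Return the profile name of the first rule whose condition holds."""
    for rule in config["rules"]:
        if rule["condition"](rule_context):
            return rule["priority_profile"]
    return config["default_profile"]


public_near = pick_profile(
    {
        "company_type": "PUB",
        "meeting_date": datetime(2025, 5, 20),
        "earnings_date": datetime(2025, 5, 23),
        "window_days": 7,
    },
    SKETCH_CONFIG,
)
weights = SKETCH_CONFIG["priority_profiles"][public_near]
```

Because the first matching rule wins, the order of the `rules` list is itself part of the business logic.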

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                    CONTENT PRIORITIZATION STEPS
# ═══════════════════════════════════════════════════════════════════════════════

@forge.step(
    name="temporal_source_prioritizer",
    deps=["build_context"],
    produces=["source_priorities"],
    description="Determine data source priorities based on temporal context",
    group="content_prioritization"
)
async def temporal_source_prioritizer(ctx: ChainContext) -> Dict:
    """
    Determine data source priorities based on meeting date and earnings proximity.
    
    This implements the business rules from GRID_CONFIG:
    - If meeting is near earnings date -> earnings_dominant profile
    - If company is private -> news_dominant profile
    - Otherwise -> default profile
    
    Equivalent to: ContentPrioritizationService.temporal_source_prioritizer
    """
    context_output = ctx.get("context_builder_output", {})
    temporal_context = context_output.get("temporal_context") or {}  # may be None if extraction failed
    meeting_date = context_output.get("meeting_date")
    company_type = context_output.get("company_type", "PUB")

    config = GRID_CONFIG
    earnings_proximity_weeks = config.get("earnings_proximity_weeks", 1)
    window = earnings_proximity_weeks * 7  # Convert to days

    # Parse dates
    meeting_datetime = datetime.strptime(meeting_date, "%Y-%m-%d") if meeting_date else datetime.now()
    max_earnings_date = None
    if temporal_context.get("event_dt"):
        max_earnings_date = datetime.strptime(temporal_context["event_dt"], "%Y-%m-%d")

    # Build rule context for evaluation
    rule_context = {
        "company_type": company_type,
        "meeting_date": meeting_datetime,
        "max_earnings_event_date": max_earnings_date,
        "window": window
    }

    # Evaluate rules from grid config
    prioritizer_config = config["temporal_source_prioritizer"]
    priority_profiles = config["priority_profiles"]
    default_profile = prioritizer_config["default_profile"]
    source_priorities = priority_profiles[default_profile]

    # Check each rule in order - first match wins
    for rule in prioritizer_config["rules"]:
        try:
            if callable(rule["condition"]) and rule["condition"](rule_context):
                profile_name = rule["priority_profile"]
                source_priorities = priority_profiles[profile_name]
                logger.info(f"Applied priority rule: {rule['name']} -> {profile_name}")
                break
        except Exception as e:
            logger.warning(f"Error evaluating rule '{rule.get('name')}': {e}")

    ctx.set("source_priorities", source_priorities, scope=ContextScope.CHAIN)

    return {"source_priorities": source_priorities}


@forge.step(
    name="generate_subqueries",
    deps=["build_context"],  # Same dep as prioritizer - can run in parallel!
    produces=["subqueries"],
    description="Generate subqueries for data agents",
    group="content_prioritization"
)
async def generate_subqueries(ctx: ChainContext) -> Dict:
    """
    Generate subqueries for each data agent.
    
    Uses the agent classes to build their specific subqueries
    based on the company context.
    
    Equivalent to: ContentPrioritizationService.subquery_engine
    """
    context_output = ctx.get("context_builder_output", {})
    company_name = context_output.get("company_name", "")
    temporal_context = context_output.get("temporal_context") or {}  # may be None if extraction failed

    fiscal_year = temporal_context.get("fiscal_year", str(datetime.now().year))
    fiscal_quarter = temporal_context.get("fiscal_period", "1")

    # Get agents and build their subqueries
    news_agent = forge.get_agent("news_agent")
    sec_agent = forge.get_agent("sec_agent")

    subqueries = {
        ToolName.NEWS_TOOL.value: news_agent._build_subqueries(company_name),
        ToolName.SEC_TOOL.value: sec_agent._build_subqueries(company_name),
        ToolName.EARNINGS_TOOL.value: [{
            "company_name": company_name,
            "fiscal_year": fiscal_year,
            "fiscal_quarter": fiscal_quarter
        }]
    }

    ctx.set("subqueries", subqueries, scope=ContextScope.CHAIN)

    return {"subqueries": subqueries}


@forge.step(
    name="prioritize_content",
    deps=["temporal_source_prioritizer", "generate_subqueries"],  # Fan-in
    produces=["content_prioritization_output"],
    description="Combine prioritization outputs",
    group="content_prioritization"
)
async def prioritize_content(ctx: ChainContext) -> Dict:
    """
    Combine all content prioritization outputs.
    
    Equivalent to: ContentPrioritizationService.execute
    """
    source_priorities = ctx.get("source_priorities", {})
    subqueries = ctx.get("subqueries", {})

    content_prioritization_output = {
        "temporal_source_prioritizer": source_priorities,
        "subqueries_from_engine": subqueries,
        "topic_ranker_result": GRID_CONFIG.get("content_prioritization_topics", [])
    }

    ctx.set("content_prioritization_output", content_prioritization_output, scope=ContextScope.CHAIN)

    return {"content_prioritization_output": content_prioritization_output}


print("✅ Content prioritization steps registered!")
print("   - temporal_source_prioritizer: Applies business rules")
print("   - generate_subqueries: Creates agent queries")
print("   - prioritize_content: Combines outputs")

<a id="13-data-fetch"></a>
## 13. Data Fetching Steps (Parallel Execution)

These steps fetch data from the external agents. All three depend only on `prioritize_content` and not on each other, so **FlowForge can execute them in parallel automatically.**

```
                    prioritize_content
                           │
           ┌───────────────┼───────────────┐
           │               │               │
           ▼               ▼               ▼
    fetch_news_data  fetch_sec_data  fetch_earnings_data
           │               │               │
           └───────────────┼───────────────┘
                           │
                           ▼
                    parse_agent_data
```
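
The fan-out/fan-in shape in this diagram corresponds to what `asyncio.gather` does for independent coroutines. The sketch below is not how FlowForge schedules steps internally. `fake_fetch` and `safe_fetch` are hypothetical stand-ins that sleep instead of calling an MCP server, and the `events` list only exists to show that all three fetches start before any of them finishes.

```python
import asyncio
from typing import Dict, List

events: List[str] = []


async def fake_fetch(source: str, fail: bool = False) -> List[Dict]:
    """Hypothetical stand-in for an agent's fetch(); sleeps instead of doing I/O."""
    events.append(f"start:{source}")
    await asyncio.sleep(0.01)
    events.append(f"end:{source}")
    if fail:
        raise RuntimeError(f"{source} unavailable")
    return [{"source": source, "text": f"{source} chunk"}]


async def safe_fetch(source: str, fail: bool = False) -> List[Dict]:
    """Mirror the tutorial's pattern: return an empty list instead of raising."""
    try:
        return await fake_fetch(source, fail)
    except Exception:
        return []


async def fetch_all() -> Dict[str, List[Dict]]:
    """Fan out to three sources concurrently, then fan in to one result dict."""
    news, sec, earnings = await asyncio.gather(
        safe_fetch("news"),
        safe_fetch("sec", fail=True),  # simulate one failing source
        safe_fetch("earnings"),
    )
    return {"news": news, "sec": sec, "earnings": earnings}


fetched = asyncio.run(fetch_all())
```

Catching errors inside each fetch keeps one failing source from cancelling the others, which matches the `try`/`except` pattern in the steps of this section.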

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                    DATA FETCHING STEPS (PARALLEL)
# ═══════════════════════════════════════════════════════════════════════════════

@forge.step(
    name="fetch_news_data",
    deps=["prioritize_content"],
    produces=["news_data"],
    description="Fetch news data from news agent",
    timeout_ms=60000,
    group="data_fetch"
)
async def fetch_news_data(ctx: ChainContext) -> Dict:
    """
    Fetch news data using the news agent.
    
    This step runs in PARALLEL with fetch_sec_data and fetch_earnings_data
    because they all share the same dependency.
    """
    context_output = ctx.get("context_builder_output", {})
    company_name = context_output.get("company_name", "")

    try:
        news_agent = forge.get_agent("news_agent")
        news_data = await news_agent.fetch(company_name)
        ctx.set("news_data", news_data, scope=ContextScope.CHAIN)
        logger.info(f"Fetched {len(news_data)} news items")
        return {"news_data": news_data}
    except Exception as e:
        logger.error(f"News data fetch failed: {e}")
        return {"news_data": [], "error": str(e)}


@forge.step(
    name="fetch_sec_data",
    deps=["prioritize_content"],  # Same dep = parallel execution!
    produces=["sec_data"],
    description="Fetch SEC filings data",
    timeout_ms=60000,
    group="data_fetch"
)
async def fetch_sec_data(ctx: ChainContext) -> Dict:
    """Fetch SEC filings data using the SEC agent"""
    context_output = ctx.get("context_builder_output", {})
    company_name = context_output.get("company_name", "")

    try:
        sec_agent = forge.get_agent("sec_agent")
        sec_data = await sec_agent.fetch(company_name)
        ctx.set("sec_data", sec_data, scope=ContextScope.CHAIN)
        logger.info(f"Fetched {len(sec_data)} SEC filings")
        return {"sec_data": sec_data}
    except Exception as e:
        logger.error(f"SEC data fetch failed: {e}")
        return {"sec_data": [], "error": str(e)}


@forge.step(
    name="fetch_earnings_data",
    deps=["prioritize_content"],  # Same dep = parallel execution!
    produces=["earnings_data"],
    description="Fetch earnings transcript data",
    timeout_ms=60000,
    group="data_fetch"
)
async def fetch_earnings_data(ctx: ChainContext) -> Dict:
    """Fetch earnings data using the earnings agent"""
    context_output = ctx.get("context_builder_output", {})
    temporal_context = context_output.get("temporal_context") or {}  # may be None if extraction failed
    company_name = context_output.get("company_name", "")

    fiscal_year = temporal_context.get("fiscal_year", str(datetime.now().year))
    fiscal_quarter = temporal_context.get("fiscal_period", "1")

    try:
        earnings_agent = forge.get_agent("earnings_agent")
        earnings_data = await earnings_agent.fetch(
            company_name,
            fiscal_year=fiscal_year,
            fiscal_quarter=fiscal_quarter
        )
        ctx.set("earnings_data", earnings_data, scope=ContextScope.CHAIN)
        logger.info(f"Fetched {len(earnings_data)} earnings items")
        return {"earnings_data": earnings_data}
    except Exception as e:
        logger.error(f"Earnings data fetch failed: {e}")
        return {"earnings_data": [], "error": str(e)}


print("✅ Data fetching steps registered!")
print("   - fetch_news_data, fetch_sec_data, fetch_earnings_data")
print("   - These will run in PARALLEL since they share the same dependency!")

<a id="14-response-builder"></a>
## 14. Response Builder Steps

The Response Builder takes the fetched data and:
1. Parses and formats the agent data
2. Builds LLM prompts
3. Calls the LLM for metrics and analysis (in parallel)
4. Validates the results
5. Builds the final response
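
For step 3, the LLM reply has to be turned into a typed structure before it can be validated. The sketch below shows one defensive way to do that with the standard library. `SketchMetrics` is a hypothetical stand-in for the tutorial's `FinancialMetricsResponse`, the field subset is chosen for brevity, and the sample reply is invented.

```python
import json
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class SketchMetrics:
    """Hypothetical stand-in for the tutorial's FinancialMetricsResponse model."""

    current_annual_revenue: Optional[float] = None
    ebitda_margin: Optional[float] = None
    stock_price: Optional[float] = None
    market_cap: Optional[float] = None


def parse_metrics(raw_json: str) -> SketchMetrics:
    """Parse an LLM's JSON reply, keeping only known numeric fields."""
    try:
        payload = json.loads(raw_json)
    except json.JSONDecodeError:
        return SketchMetrics()  # unparseable reply: every metric stays None
    if not isinstance(payload, dict):
        return SketchMetrics()
    known = {f.name for f in fields(SketchMetrics)}
    cleaned = {}
    for key, value in payload.items():
        # bool is a subclass of int, so exclude it explicitly.
        if key in known and isinstance(value, (int, float)) and not isinstance(value, bool):
            cleaned[key] = float(value)
    return SketchMetrics(**cleaned)


# Invented sample reply; "ceo" is an unknown key and is ignored.
reply = '{"current_annual_revenue": 1250000000, "ebitda_margin": 0.31, "ceo": "n/a"}'
metrics = parse_metrics(reply)
fallback = parse_metrics("not json")
```

Leaving missing metrics as `None` rather than guessing a value matches the placeholder response used later in this section.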

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                       RESPONSE BUILDER STEPS
# ═══════════════════════════════════════════════════════════════════════════════

@forge.step(
    name="parse_agent_data",
    deps=["fetch_news_data", "fetch_sec_data", "fetch_earnings_data"],  # Fan-in from parallel
    produces=["parsed_agent_data"],
    description="Parse and format data from all agents",
    group="response_builder"
)
async def parse_agent_data(ctx: ChainContext) -> Dict:
    """
    Parse and format data from all agents into a unified structure.
    
    This step formats the raw MCP responses into a readable format
    for the LLM prompts.
    
    Equivalent to: ResponseBuilderAndGenerator.context_parser
    """
    news_data = ctx.get("news_data", [])
    sec_data = ctx.get("sec_data", [])
    earnings_data = ctx.get("earnings_data", [])

    def format_chunks(data: List, agent_name: str) -> str:
        """
        Format data chunks into readable text for LLM.
        
        Output format:
        CHUNK-1
        
        METADATA
        key1: value1
        key2: value2
        
        CHUNK-CONTENT
        The actual text content...
        """
        if not data:
            return ""

        formatted_chunks = []
        for idx, item in enumerate(data, 1):
            if isinstance(item, dict):
                chunk_str = f"CHUNK-{idx}\n\n"
                chunk_str += "METADATA\n"

                # Extract metadata (everything except 'text')
                metadata = {k: v for k, v in item.items() if k != "text"}
                for k, v in metadata.items():
                    chunk_str += f"{k}: {v}\n"

                chunk_str += "\n\nCHUNK-CONTENT\n"
                chunk_str += item.get("text", str(item))
                formatted_chunks.append(chunk_str)

        return "\n\n\n".join(formatted_chunks)

    parsed_agent_data = {
        ToolName.NEWS_TOOL.value: format_chunks(news_data, "news"),
        ToolName.SEC_TOOL.value: format_chunks(sec_data, "sec"),
        ToolName.EARNINGS_TOOL.value: format_chunks(earnings_data, "earnings"),
    }

    ctx.set("parsed_agent_data", parsed_agent_data, scope=ContextScope.CHAIN)

    return {"parsed_agent_data": parsed_agent_data}


@forge.step(
    name="build_prompts",
    deps=["parse_agent_data"],
    produces=["prompts"],
    description="Build LLM prompts for metrics and analysis",
    group="response_builder"
)
async def build_prompts(ctx: ChainContext) -> Dict:
    """
    Build LLM prompts for financial metrics and strategic analysis.
    
    Creates two prompts:
    1. Financial metrics prompt (for extracting numbers)
    2. Strategic analysis prompt (for SWOT, thesis, etc.)
    
    Equivalent to: ResponseBuilderAndGenerator.prompt_builder
    """
    parsed_data = ctx.get("parsed_agent_data", {})
    context_output = ctx.get("context_builder_output", {})
    prioritization = ctx.get("content_prioritization_output", {})

    company_name = context_output.get("company_name", "Unknown Company")
    source_priorities = prioritization.get("temporal_source_prioritizer", {})

    # Build financial metrics prompt
    financial_metrics_prompt = f"""
## Task: Extract Financial Metrics for {company_name}

[SEC_AGENT]
{parsed_data.get(ToolName.SEC_TOOL.value, 'No SEC data available')}

[EARNINGS_AGENT]
{parsed_data.get(ToolName.EARNINGS_TOOL.value, 'No earnings data available')}

[NEWS_AGENT]
{parsed_data.get(ToolName.NEWS_TOOL.value, 'No news data available')}

Extract precise financial metrics with citations for each value.
"""

    # Build strategic analysis prompt with source weights
    news_pct = source_priorities.get(ToolName.NEWS_TOOL.value, 40)
    earnings_pct = source_priorities.get(ToolName.EARNINGS_TOOL.value, 30)
    sec_pct = source_priorities.get(ToolName.SEC_TOOL.value, 30)

    strategic_analysis_prompt = f"""
## Task: Strategic Analysis for {company_name}

[NEWS_AGENT] (Include {news_pct}% from this source)
{parsed_data.get(ToolName.NEWS_TOOL.value, 'No news data available')}

[EARNINGS_AGENT] (Include {earnings_pct}% from this source)
{parsed_data.get(ToolName.EARNINGS_TOOL.value, 'No earnings data available')}

[SEC_AGENT] (Include {sec_pct}% from this source)
{parsed_data.get(ToolName.SEC_TOOL.value, 'No SEC data available')}

Generate comprehensive strategic analysis including SWOT, investment thesis, and recent developments.
"""

    prompts = {
        "financial_metrics_prompt": financial_metrics_prompt,
        "strategic_analysis_prompt": strategic_analysis_prompt,
    }

    ctx.set("prompts", prompts, scope=ContextScope.CHAIN)

    return {"prompts": prompts}


print("✅ Response builder parsing steps registered!")
print("   - parse_agent_data: Formats raw data")
print("   - build_prompts: Creates LLM prompts")

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                       LLM GENERATION STEPS (PARALLEL)
# ═══════════════════════════════════════════════════════════════════════════════

@forge.step(
    name="generate_financial_metrics",
    deps=["build_prompts"],
    produces=["financial_metrics"],
    description="Generate financial metrics using LLM",
    timeout_ms=120000,  # 2 minute timeout for LLM calls
    group="response_builder"
)
async def generate_financial_metrics(ctx: ChainContext) -> Dict:
    """
    Generate financial metrics using structured LLM output.
    
    This step runs in PARALLEL with generate_strategic_analysis.
    
    In production, this would:
    1. Call the LLM gateway with the prompt
    2. Use structured output (JSON mode)
    3. Parse into FinancialMetricsResponse
    
    Equivalent to: ResponseBuilderAndGenerator.get_structured_response
    """
    prompts = ctx.get("prompts", {})
    prompt = prompts.get("financial_metrics_prompt", "")

    # In production:
    # from flowforge.services.llm_gateway import get_llm_client
    # client = get_llm_client()
    # response = await client.generate_async(
    #     prompt,
    #     system_prompt=FINANCIAL_METRICS_SYSTEM_PROMPT,
    #     response_format=FinancialMetricsResponse
    # )

    # Placeholder response
    financial_metrics = {
        "current_annual_revenue": None,
        "ebitda_margin": None,
        "stock_price": None,
        "market_cap": None,
        "revenue_growth_trajectory": None,
    }

    ctx.set("financial_metrics", financial_metrics, scope=ContextScope.CHAIN)

    return {"financial_metrics": financial_metrics}


@forge.step(
    name="generate_strategic_analysis",
    deps=["build_prompts"],  # Same dep = parallel with metrics!
    produces=["strategic_analysis"],
    description="Generate strategic analysis using LLM",
    timeout_ms=120000,
    group="response_builder"
)
async def generate_strategic_analysis(ctx: ChainContext) -> Dict:
    """
    Generate strategic analysis using structured LLM output.
    
    Runs in PARALLEL with generate_financial_metrics.
    """
    prompts = ctx.get("prompts", {})
    prompt = prompts.get("strategic_analysis_prompt", "")

    # Placeholder response
    strategic_analysis = {
        "strength": [],
        "weakness": [],
        "opportunity": [],
        "threat": [],
        "investment_thesis": [],
        "key_risk_highlights": [],
        "strategic_opportunities": [],
        "recent_developments": [],
        "sources": [],
    }

    ctx.set("strategic_analysis", strategic_analysis, scope=ContextScope.CHAIN)

    return {"strategic_analysis": strategic_analysis}


@forge.step(
    name="validate_metrics",
    deps=["generate_financial_metrics"],
    produces=["validation_results"],
    description="Validate extracted financial metrics against sources",
    group="response_builder"
)
async def validate_metrics(ctx: ChainContext) -> Dict:
    """
    Validate extracted financial metrics against source documents.
    
    This step:
    1. Checks that all citations are valid
    2. Verifies quotes exist in source documents
    3. Runs sanity checks on numeric values
    
    Equivalent to: MetricsValidator.validate_financial_metrics
    """
    financial_metrics = ctx.get("financial_metrics", {})
    parsed_data = ctx.get("parsed_agent_data", {})

    # In production, use MetricsValidator:
    # validation_results = MetricsValidator.validate_financial_metrics(
    #     FinancialMetricsResponse(**financial_metrics),
    #     parsed_data
    # )

    # Placeholder validation
    validation_results = {
        "validation_summary": {
            "total_fields_checked": 0,
            "fields_with_values": 0,
            "sources_verified": 0,
            "warnings_count": 0,
        },
        "field_validations": {},
        "warnings": [],
        "sanity_checks": {"passed": [], "failed": [], "warnings": []}
    }

    ctx.set("validation_results", validation_results, scope=ContextScope.CHAIN)

    return {"validation_results": validation_results}


@forge.step(
    name="build_response",
    deps=["generate_financial_metrics", "generate_strategic_analysis", "validate_metrics"],
    produces=["final_response"],
    description="Build final CMPT response",
    group="response_builder"
)
async def build_response(ctx: ChainContext) -> Dict:
    """
    Build the final CMPT response combining all outputs.
    
    This is the final step that assembles everything into
    the ChainServerResponse format.
    
    Equivalent to: ChainOrchestrator.execute_chain (final assembly)
    """
    context_output = ctx.get("context_builder_output", {})
    prioritization = ctx.get("content_prioritization_output", {})
    financial_metrics = ctx.get("financial_metrics", {})
    strategic_analysis = ctx.get("strategic_analysis", {})
    validation_results = ctx.get("validation_results", {})
    parsed_data = ctx.get("parsed_agent_data", {})

    final_response = {
        "context_builder": {
            "company_info": context_output.get("company_info"),
            "temporal_context": context_output.get("temporal_context"),
            "meeting_date": context_output.get("meeting_date"),
            "rbc_persona": context_output.get("rbc_persona"),
            "corporate_client_persona": context_output.get("corporate_client_persona"),
        },
        "content_prioritization": {
            "temporal_source_prioritizer": prioritization.get("temporal_source_prioritizer"),
            "subqueries_from_engine": prioritization.get("subqueries_from_engine"),
            "topic_ranker_result": prioritization.get("topic_ranker_result"),
        },
        "response_builder_and_generator": {
            "financial_metrics_result": financial_metrics,
            "strategic_analysis_result": strategic_analysis,
            "validation_results": validation_results,
            # Values are formatted strings, so count chunk markers rather than characters
            "parsed_data_agent_chunks": {k: v.count("CHUNK-CONTENT") for k, v in parsed_data.items()},
            "company_name": context_output.get("company_name"),
        },
    }

    ctx.set("final_response", final_response, scope=ContextScope.CHAIN)

    return {"final_response": final_response}


print("✅ LLM generation and final steps registered!")
print("   - generate_financial_metrics || generate_strategic_analysis (parallel)")
print("   - validate_metrics: Verifies citations")
print("   - build_response: Final assembly")
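
The `validate_metrics` step above is a placeholder. As a rough idea of what quote verification and a numeric sanity check could look like, here is a minimal sketch. The helper names, the normalization rule, and the margin bound are assumptions for illustration and are not taken from `MetricsValidator`. The sample texts are invented.

```python
import re
from typing import Dict, List


def normalize(text: str) -> str:
    """Lowercase and collapse whitespace so line breaks do not defeat matching."""
    return re.sub(r"\s+", " ", text).strip().lower()


def verify_quotes(quotes: Dict[str, str], sources: Dict[str, str]) -> Dict[str, List[str]]:
    """Map each field to the list of source names whose text contains its quote."""
    normalized_sources = {name: normalize(body) for name, body in sources.items()}
    found: Dict[str, List[str]] = {}
    for field_name, quote in quotes.items():
        needle = normalize(quote)
        found[field_name] = [
            name for name, body in normalized_sources.items() if needle and needle in body
        ]
    return found


def margin_is_plausible(margin: float) -> bool:
    """Illustrative sanity check: a margin given as a fraction is expected in [-1, 1]."""
    return -1.0 <= margin <= 1.0


# Invented sample data for illustration.
sources = {
    "sec": "Total revenue for the three months ended\nMarch 31 was $310 million.",
    "news": "Analysts expect continued growth.",
}
quotes = {
    "current_annual_revenue": "three months ended March 31 was $310 million",
    "stock_price": "shares closed at $42",
}
verified = verify_quotes(quotes, sources)
```

A field whose list comes back empty has a citation that could not be found in any source, which is the kind of result a validator would surface as a warning.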

<a id="10-chain-definition"></a>
## 10. Creating the Chain Definition

Now we define the chain that ties all the steps together. The `@forge.chain()` decorator registers the chain with FlowForge.
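
Before looking at the chain class, it can help to see what order the declared dependencies imply. The sketch below copies the `deps` from the `@forge.step` declarations in this tutorial and groups them into waves with the standard library's `graphlib`. This is an independent illustration, not FlowForge's scheduler, and `execution_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Step -> dependencies, copied from the @forge.step declarations in this tutorial.
DEPS: Dict[str, List[str]] = {
    "extract_company_info": [],
    "extract_temporal_context": ["extract_company_info"],
    "build_context": ["extract_company_info", "extract_temporal_context"],
    "temporal_source_prioritizer": ["build_context"],
    "generate_subqueries": ["build_context"],
    "prioritize_content": ["temporal_source_prioritizer", "generate_subqueries"],
    "fetch_news_data": ["prioritize_content"],
    "fetch_sec_data": ["prioritize_content"],
    "fetch_earnings_data": ["prioritize_content"],
    "parse_agent_data": ["fetch_news_data", "fetch_sec_data", "fetch_earnings_data"],
    "build_prompts": ["parse_agent_data"],
    "generate_financial_metrics": ["build_prompts"],
    "generate_strategic_analysis": ["build_prompts"],
    "validate_metrics": ["generate_financial_metrics"],
    "build_response": [
        "generate_financial_metrics",
        "generate_strategic_analysis",
        "validate_metrics",
    ],
}


def execution_waves(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group steps into waves; every step in a wave has all its deps finished."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(DEPS)
for number, wave in enumerate(waves, 1):
    print(f"wave {number}: {', '.join(wave)}")
```

Waves with more than one step (the prioritizer pair, the three fetches, and the two LLM calls) are the points where steps can run in parallel.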

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           CHAIN DEFINITION
# ═══════════════════════════════════════════════════════════════════════════════

@forge.chain(name="cmpt_pipeline", description="Client Meeting Prep Tool pipeline")
class CMPTPipeline:
    """
    Complete CMPT pipeline chain.
    
    This class defines the chain by listing all steps in execution order.
    FlowForge will:
    1. Resolve dependencies from step definitions
    2. Build the DAG
    3. Execute steps with automatic parallelization
    
    Execution Flow:
    ================
    
    1. Context Builder:
       extract_company_info → extract_temporal_context → build_context
       
    2. Content Prioritization (PARALLEL):
       temporal_source_prioritizer ┐
                                   ├→ prioritize_content
       generate_subqueries        ┘
       
    3. Data Fetch (PARALLEL):
       fetch_news_data    ┐
       fetch_sec_data     ├→ parse_agent_data
       fetch_earnings_data┘
       
    4. Response Builder:
       parse_agent_data → build_prompts
       
    5. LLM Generation (metrics and analysis run in PARALLEL):
       generate_financial_metrics → validate_metrics ┐
                                                     ├→ build_response
       generate_strategic_analysis ──────────────────┘
    """

    steps = [
        # Context Builder (sequential)
        "extract_company_info",
        "extract_temporal_context",
        "build_context",
        
        # Content Prioritization (prioritizer & subqueries run in parallel)
        "temporal_source_prioritizer",
        "generate_subqueries",
        "prioritize_content",
        
        # Data Fetch (all three run in parallel)
        "fetch_news_data",
        "fetch_sec_data",
        "fetch_earnings_data",
        
        # Response Builder
        "parse_agent_data",
        "build_prompts",
        
        # LLM Generation (metrics & analysis run in parallel)
        "generate_financial_metrics",
        "generate_strategic_analysis",
        
        # Validation & Final Response
        "validate_metrics",
        "build_response",
    ]

    # Error handling mode:
    # - "fail_fast": Stop on first error
    # - "continue": Try to continue with other steps
    error_handling = "continue"


print("✅ Chain definition registered!")
print(f"   - Chain name: cmpt_pipeline")
print(f"   - Total steps: {len(CMPTPipeline.steps)}")
print(f"   - Error handling: {CMPTPipeline.error_handling}")

<a id="16-validation"></a>
## 16. Chain Validation

FlowForge provides built-in validation to check:
- All steps exist
- Dependencies are resolvable
- No circular dependencies
- Chain structure is valid
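
The checks listed above can be sketched in plain Python. This is not FlowForge's implementation: `validate_chain` is a hypothetical helper that only mirrors the shape of the result used in this tutorial (`valid`, `errors`, `warnings`).

```python
from graphlib import CycleError, TopologicalSorter


def validate_chain(steps, deps):
    """Check a list of step names against a step -> dependencies mapping."""
    errors, warnings = [], []
    known = set(steps)

    # Every dependency must refer to a step that is part of the chain.
    for step in steps:
        for dep in deps.get(step, ()):
            if dep not in known:
                errors.append(f"{step}: unknown dependency '{dep}'")

    if len(known) != len(steps):
        warnings.append("duplicate step names in chain")

    # static_order() raises CycleError when the graph has a circular dependency.
    try:
        graph = {step: set(deps.get(step, ())) for step in steps}
        tuple(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        errors.append(f"circular dependency: {exc.args[1]}")

    return {"valid": not errors, "errors": errors, "warnings": warnings}


ok = validate_chain(["a", "b"], {"b": ["a"]})
cyclic = validate_chain(["a", "b"], {"a": ["b"], "b": ["a"]})
missing = validate_chain(["a"], {"a": ["z"]})
print(ok["valid"], cyclic["valid"], missing["errors"])
```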

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           CHAIN VALIDATION
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "=" * 60)
print("  CMPT Chain Validation")
print("=" * 60 + "\n")

# Validate the chain
validation_result = forge.check("cmpt_pipeline")

print("\n" + "=" * 60)
print("  Validation Summary")
print("=" * 60)
print(f"\nValid: {validation_result['valid']}")
print(f"Errors: {len(validation_result.get('errors', []))}")
print(f"Warnings: {len(validation_result.get('warnings', []))}")

<a id="17-visualization"></a>
## 17. DAG Visualization

FlowForge can generate visual representations of the chain DAG in:
- ASCII art (for terminal)
- Mermaid.js (for documentation/web)
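
For readers unfamiliar with Mermaid, the sketch below shows roughly what such output looks like and how it can be produced from a dependency map. The `to_mermaid` function and the `deps` mapping are illustrative assumptions; the text FlowForge actually emits may differ.

```python
def to_mermaid(deps):
    """Render a step -> dependencies mapping as a Mermaid flowchart definition."""
    lines = ["graph TD"]
    for step in sorted(deps):
        parents = sorted(deps[step])
        if not parents:
            lines.append(f"    {step}")  # root node with no incoming edge
        for parent in parents:
            lines.append(f"    {parent} --> {step}")
    return "\n".join(lines)


# Hypothetical subset of the CMPT steps.
deps = {
    "build_context": set(),
    "fetch_news_data": {"build_context"},
    "fetch_sec_data": {"build_context"},
    "parse_agent_data": {"fetch_news_data", "fetch_sec_data"},
}

mermaid = to_mermaid(deps)
print(mermaid)
```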

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           DAG VISUALIZATION
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "=" * 60)
print("  CMPT Chain DAG (ASCII)")
print("=" * 60 + "\n")

# Generate ASCII visualization
forge.graph("cmpt_pipeline", format="ascii")

In [None]:
# Mermaid visualization (can be rendered in Jupyter or documentation)
print("\n" + "=" * 60)
print("  CMPT Chain DAG (Mermaid)")
print("=" * 60 + "\n")

mermaid_output = forge.graph("cmpt_pipeline", format="mermaid")

# If you have mermaid rendering enabled in Jupyter:
# from IPython.display import display, Markdown
# display(Markdown(f"```mermaid\n{mermaid_output}\n```"))

<a id="18-execution"></a>
## 18. Running the Chain

Now let's execute the chain with sample data.
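
The cell below uses top-level `await`, which works in Jupyter/IPython but not in a plain Python script. Outside a notebook, the usual pattern is to wrap the call in a coroutine and hand it to `asyncio.run()`. The sketch uses a stand-in `fake_launch` coroutine (an assumption, not a FlowForge function) so it runs on its own.

```python
import asyncio


async def fake_launch(chain_name, data):
    """Stand-in for an async chain launch; returns a result-like dict."""
    await asyncio.sleep(0)  # yield to the event loop, as a real launch would
    return {"success": True, "chain": chain_name, "context": dict(data)}


async def main():
    return await fake_launch("cmpt_pipeline", {"company_name": "Apple Inc."})


# In a script there is no running event loop, so asyncio.run() creates one.
script_result = asyncio.run(main())
print(script_result["success"], script_result["context"])
```

Inside a notebook, keep the plain `await` form: calling `asyncio.run()` there raises a `RuntimeError` because an event loop is already running.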

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           CHAIN EXECUTION
# ═══════════════════════════════════════════════════════════════════════════════

async def run_cmpt_chain(
    company_name: str,
    meeting_date: Optional[str] = None,
    client_email: Optional[str] = None,
    verbose: bool = False,
) -> Dict:
    """
    Convenience function to run the CMPT chain.
    
    Args:
        company_name: Name of the company to analyze
        meeting_date: Meeting date (YYYY-MM-DD format)
        client_email: Client email address
        verbose: Enable verbose logging
    
    Returns:
        Final CMPT response dictionary
    """
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    data = {
        "company_name": company_name,
        "meeting_date": meeting_date or datetime.now().strftime("%Y-%m-%d"),
        "client_email": client_email,
    }

    result = await forge.launch("cmpt_pipeline", data=data)
    return result


# Execute the chain
print("\n" + "=" * 60)
print("  Running CMPT Chain")
print("=" * 60 + "\n")

result = await run_cmpt_chain(
    company_name="Apple Inc.",
    meeting_date="2025-01-15",
    verbose=False,
)

print("\n" + "=" * 60)
print("  Execution Results")
print("=" * 60)
print(f"\nSuccess: {result.get('success', 'N/A')}")
print(f"Steps completed: {len(result.get('step_results', []))}")

if result.get('context'):
    ctx = result['context']
    print(f"Context keys: {list(ctx.keys())}")

<a id="19-error-handling"></a>
## 19. Error Handling & Debugging

FlowForge provides several mechanisms for error handling:
- `error_handling="fail_fast"`: Stop on first error
- `error_handling="continue"`: Try to continue with other steps
- `debug_callback`: Called after each step for debugging
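
The difference between the two modes can be illustrated with a small sequential runner. `run_steps` is a hypothetical stand-in and not how FlowForge is implemented; in particular, a real DAG executor in "continue" mode would presumably also have to decide what to do with steps that depend on a failed step.

```python
import asyncio
import time


async def run_steps(steps, error_handling="fail_fast", debug_callback=None):
    """Run (name, coroutine function) pairs in order and collect per-step results."""
    results = []
    for name, func in steps:
        start = time.perf_counter()
        try:
            output, error = await func(), None
        except Exception as exc:
            output, error = None, str(exc)
        result = {
            "step": name,
            "success": error is None,
            "output": output,
            "error": error,
            "duration_ms": (time.perf_counter() - start) * 1000,
        }
        results.append(result)
        if debug_callback is not None:
            debug_callback(name, result)
        if error is not None and error_handling == "fail_fast":
            break  # stop on first error
    return results


async def ok():
    return "done"


async def boom():
    raise ValueError("source unavailable")


steps = [("first", ok), ("second", boom), ("third", ok)]
fail_fast = asyncio.run(run_steps(steps, "fail_fast"))
keep_going = asyncio.run(run_steps(steps, "continue"))
print([r["step"] for r in fail_fast])
print([r["step"] for r in keep_going])
```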

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                       ERROR HANDLING & DEBUGGING
# ═══════════════════════════════════════════════════════════════════════════════

# Debug callback example
def debug_callback(ctx, step_name, result):
    """
    Called after each step completes.
    Useful for monitoring execution and debugging.
    """
    success = result.get('success', True)
    duration = result.get('duration_ms', 0)
    status = "✓" if success else "✗"
    print(f"  {status} {step_name}: {duration:.0f}ms")


# Run with debug callback
print("\n" + "=" * 60)
print("  Running with Debug Callback")
print("=" * 60 + "\n")

result = await forge.launch(
    "cmpt_pipeline",
    data={"company_name": "Microsoft", "meeting_date": "2025-02-01"},
    debug_callback=debug_callback
)

print(f"\nFinal result: {'Success' if result.get('success') else 'Failed'}")

<a id="20-middleware"></a>
## 20. Middleware Integration (Advanced)

FlowForge supports middleware for cross-cutting concerns like logging, caching, and metrics.
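
As a rough mental model, middleware hooks wrap each step: `before` hooks run, then the step, then `after` hooks. The sketch below is self-contained and makes several assumptions that are not confirmed by FlowForge: the `Ctx` stand-in, the `run_step` helper, and running `after` hooks in reverse registration order.

```python
import asyncio


class Ctx:
    """Tiny stand-in for a chain context with get/set (hypothetical)."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key, default=None):
        return self._data.get(key, default)


class RecordingMiddleware:
    """Records which hooks fired, in order."""

    def __init__(self):
        self.events = []

    async def before(self, ctx, step_name):
        self.events.append(("before", step_name))

    async def after(self, ctx, step_name, result):
        self.events.append(("after", step_name))


async def run_step(ctx, step_name, func, middlewares):
    """Call before hooks, the step itself, then after hooks in reverse order."""
    for mw in middlewares:
        await mw.before(ctx, step_name)
    result = await func(ctx)
    for mw in reversed(middlewares):
        await mw.after(ctx, step_name, result)
    return result


async def build_context(ctx):
    ctx.set("company", "Apple Inc.")
    return {"success": True}


mw = RecordingMiddleware()
step_result = asyncio.run(run_step(Ctx(), "build_context", build_context, [mw]))
print(mw.events)
```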

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           MIDDLEWARE EXAMPLE
# ═══════════════════════════════════════════════════════════════════════════════

# Example: Timing middleware
import time


class TimingMiddleware:
    """
    Middleware that tracks execution time of each step.
    """
    def __init__(self):
        self.timings = {}
    
    async def before(self, ctx, step_name):
        """Called before step execution"""
        # perf_counter() is monotonic, which suits duration measurement better than time.time()
        ctx.set(f"_timing_{step_name}_start", time.perf_counter())
    
    async def after(self, ctx, step_name, result):
        """Called after step execution"""
        start = ctx.get(f"_timing_{step_name}_start")
        if start is not None:
            duration = time.perf_counter() - start
            self.timings[step_name] = duration
            logger.info(f"Step {step_name} took {duration:.2f}s")


# Register middleware
# timing_middleware = TimingMiddleware()
# forge.use(timing_middleware)

print("✅ Middleware example defined!")
print("   Note: In production, register with forge.use(middleware)")

<a id="21-resources"></a>
## 21. Additional Components from Old Code

The following sections document additional components from the original CMPT implementation that can be integrated as needed.

---

<a id="21-1-metrics-validator"></a>
### 21.1 Metrics Validator (validation_utils.py)

The MetricsValidator validates extracted financial metrics against source documents to ensure LLM outputs are grounded.
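
The core idea of grounding can be shown with a deliberately strict, standalone sketch: a quote counts as grounded only if every number in it appears in the source and the normalized quote is a substring of the normalized source. The helper names `numbers_in` and `is_grounded` are illustrative assumptions; the validator in this section is more lenient because it also accepts partial and word-overlap matches.

```python
import re


def numbers_in(text):
    """Extract numbers such as 1,234.5 or 96.77 from text as floats."""
    matches = re.findall(r"\d+(?:,\d{3})*(?:\.\d+)?", text)
    return [float(m.replace(",", "")) for m in matches]


def normalize(text):
    """Lowercase and collapse whitespace so formatting differences do not matter."""
    return " ".join(text.lower().split())


def is_grounded(quote, source):
    """True if all numbers in the quote occur in the source and the quote appears verbatim."""
    quote_clean, source_clean = normalize(quote), normalize(source)
    if not quote_clean:
        return False
    source_numbers = numbers_in(source)
    for number in numbers_in(quote):
        if not any(abs(number - candidate) < 0.01 for candidate in source_numbers):
            return False  # a number in the quote is absent from the source
    return quote_clean in source_clean


source = "Total net sales were $94,930 million for the quarter, up 6% year over year."
good = is_grounded("total net sales were $94,930 million", source)
bad = is_grounded("total net sales were $95,930 million", source)
print(good, bad)
```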

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                    METRICS VALIDATOR (From validation_utils.py)
# ═══════════════════════════════════════════════════════════════════════════════

class MetricsValidator:
    """
    Validates extracted financial metrics against source chunks.
    
    This is CRITICAL for production - ensures that LLM extractions are 
    actually grounded in the source documents and not hallucinated.
    
    Key Validations:
    1. Citation has required fields (source_agent, source_content, reasoning)
    2. Quoted text actually exists in source documents
    3. Referenced agent names are valid
    4. Numeric values pass sanity checks
    """
    
    @staticmethod
    def validate_financial_metrics(
        metrics,  # FinancialMetricsResponse object
        source_chunks: Dict[str, str]
    ) -> Dict[str, Any]:
        """
        Validate extracted financial metrics against source chunks.
        
        Args:
            metrics: FinancialMetricsResponse object with extracted data
            source_chunks: Dictionary with agent names as keys and combined content as values
                          e.g., {"news_agent": "...", "earnings_agent": "...", "SEC_agent": "..."}
        
        Returns:
            Dictionary with validation results
        """
        validation_results = {
            "validation_summary": {
                "total_fields_checked": 0,
                "fields_with_values": 0,
                "fields_with_sources": 0,
                "sources_verified": 0,
                "sources_not_found": 0,
                "warnings_count": 0
            },
            "field_validations": {},
            "warnings": []
        }
        
        # Critical fields that require source validation
        critical_fields = [
            ("current_annual_revenue", "current_annual_revenue_citation"),
            ("current_annual_revenue_yoy_change", "current_annual_revenue_yoy_change_citation"),
            ("estimated_annual_revenue_next_year", "estimated_annual_revenue_next_year_citation"),
            ("ebitda_margin", "ebitda_margin_citation"),
            ("ebitda_margin_yoy_change", "ebitda_margin_yoy_change_citation"),
            ("stock_price", "stock_price_citation"),
            ("stock_price_yoy_change", "stock_price_yoy_change_citation"),
            ("market_cap", "market_cap_citation"),
            ("revenue_growth_trajectory", "revenue_growth_trajectory_citation")
        ]
        
        for value_field, citation_field in critical_fields:
            validation_results["validation_summary"]["total_fields_checked"] += 1
            
            # Support both dict-style and attribute-style (e.g., model object) metrics
            if isinstance(metrics, dict):
                field_value = metrics.get(value_field)
                citation = metrics.get(citation_field)
            else:
                field_value = getattr(metrics, value_field, None)
                citation = getattr(metrics, citation_field, None)
            
            field_validation = {
                "has_value": field_value is not None,
                "has_citation": citation is not None,
                "citation_verified": False,
                "extracted_numbers": [],
                "issues": []
            }
            
            if field_value is not None:
                validation_results["validation_summary"]["fields_with_values"] += 1
            
            if citation is not None:
                validation_results["validation_summary"]["fields_with_sources"] += 1
                
                # Extract citation data
                if isinstance(citation, dict):
                    source_agents = citation.get('source_agent', [])
                    source_contents = citation.get('source_content', [])
                    reasoning = citation.get('reasoning', '')
                elif hasattr(citation, 'source_agent'):
                    source_agents = citation.source_agent
                    source_contents = citation.source_content
                    reasoning = citation.reasoning
                else:
                    field_validation["issues"].append("Invalid citation format")
                    validation_results["field_validations"][value_field] = field_validation
                    continue
                
                # Validate citation completeness
                if not source_agents or not source_contents or not reasoning:
                    field_validation["issues"].append("Citation missing required fields")
                else:
                    # Verify quotes exist in source documents
                    verified_quotes = 0
                    for i, quote in enumerate(source_contents):
                        agent_name = source_agents[i] if i < len(source_agents) else source_agents[0]
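                        if agent_name in source_chunks:
                            if MetricsValidator._verify_citation_in_sources(quote, source_chunks[agent_name]):
                                verified_quotes += 1
                            else:
                                field_validation["issues"].append(f"Quote #{i+1} not found in {agent_name}")
                        else:
                            # Record citations that reference an agent with no source content
                            field_validation["issues"].append(f"Quote #{i+1} cites unknown agent '{agent_name}'")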
                    
                    if verified_quotes == len(source_contents) and verified_quotes > 0:
                        validation_results["validation_summary"]["sources_verified"] += 1
                        field_validation["citation_verified"] = True
                    else:
                        validation_results["validation_summary"]["sources_not_found"] += 1
            
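            # Surface per-field issues as warnings so warnings_count and the report reflect them
            validation_results["warnings"].extend(
                f"{value_field}: {issue}" for issue in field_validation["issues"]
            )
            validation_results["field_validations"][value_field] = field_validation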
        
        # Run sanity checks
        validation_results["sanity_checks"] = MetricsValidator._run_sanity_checks(metrics)
        validation_results["validation_summary"]["warnings_count"] = len(validation_results["warnings"])
        
        return validation_results
    
    @staticmethod
    def _verify_citation_in_sources(citation: str, sources: str) -> bool:
        """Check if citation exists in sources with fuzzy matching AND exact number matching"""
        if not citation or not sources:
            return False
        
        # Extract numbers from both
        citation_numbers = MetricsValidator._extract_numbers_from_text(citation)
        sources_numbers = MetricsValidator._extract_numbers_from_text(sources)
        
        # All citation numbers must exist in sources
        if citation_numbers:
            for num in citation_numbers:
                if not any(abs(num - src_num) < 0.01 for src_num in sources_numbers):
                    return False
        
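        # Contiguous-substring match: accept if any window spanning 70% of the
        # normalized citation appears verbatim in the normalized sources
        citation_clean = ' '.join(citation.lower().strip().split())
        sources_clean = ' '.join(sources.lower().split())
        if not citation_clean:
            return False  # whitespace-only citation would otherwise match trivially
        
        min_match_length = max(1, int(len(citation_clean) * 0.70))
        for i in range(len(citation_clean) - min_match_length + 1):
            substring = citation_clean[i:i + min_match_length]
            if substring in sources_clean:
                return True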
        
        # Word overlap fallback (75% threshold)
        citation_words = set(citation_clean.split())
        sources_words = set(sources_clean.split())
        if len(citation_words) > 0:
            overlap = len(citation_words & sources_words) / len(citation_words)
            if overlap >= 0.75:
                return True
        
        return False
    
    @staticmethod
    def _extract_numbers_from_text(text: str) -> List[float]:
        """Extract all numbers from text"""
        pattern = r'\$?\d+(?:,\d{3})*(?:\.\d+)?'
        matches = re.findall(pattern, text)
        numbers = []
        for match in matches:
            try:
                clean_number = match.replace('$', '').replace(',', '')
                numbers.append(float(clean_number))
            except ValueError:
                continue
        return numbers
    
    @staticmethod
    def _run_sanity_checks(metrics) -> Dict[str, Any]:
        """Run sanity checks on extracted metrics"""
        checks = {"passed": [], "failed": [], "warnings": []}
        
        # Get metric value (works with dict or object)
        def get_val(name):
            if isinstance(metrics, dict):
                return metrics.get(name)
            return getattr(metrics, name, None)
        
        # Revenue should be positive
        revenue = get_val("current_annual_revenue")
        if revenue is not None:
            if revenue > 0:
                checks["passed"].append("Revenue is positive")
            else:
                checks["failed"].append(f"Revenue is non-positive: {revenue}")
        
        # EBITDA margin should be between -100 and 100
        ebitda = get_val("ebitda_margin")
        if ebitda is not None:
            if -100 <= ebitda <= 100:
                checks["passed"].append("EBITDA margin in reasonable range")
            else:
                checks["failed"].append(f"EBITDA margin out of range: {ebitda}%")
        
        # Stock price should be positive
        stock = get_val("stock_price")
        if stock is not None:
            if stock > 0:
                checks["passed"].append("Stock price is positive")
            else:
                checks["failed"].append(f"Stock price is non-positive: {stock}")
        
        return checks
    
    @staticmethod
    def print_validation_report(validation_results: Dict[str, Any]) -> None:
        """Pretty print validation results"""
        print("\n" + "=" * 80)
        print("FINANCIAL METRICS VALIDATION REPORT")
        print("=" * 80)
        
        summary = validation_results["validation_summary"]
        print(f"\n📊 SUMMARY:")
        print(f"  Total fields checked: {summary['total_fields_checked']}")
        print(f"  Fields with values: {summary['fields_with_values']}")
        print(f"  Fields with source citations: {summary['fields_with_sources']}")
        print(f"  Citations verified in sources: {summary['sources_verified']}")
        print(f"  Citations NOT found: {summary['sources_not_found']}")
        
        if validation_results["warnings"]:
            print(f"\n⚠️ WARNINGS ({len(validation_results['warnings'])}):")
            for warning in validation_results["warnings"][:5]:
                print(f"  - {warning}")
        
        sanity = validation_results["sanity_checks"]
        print(f"\n✓ SANITY CHECKS: {len(sanity['passed'])} passed, {len(sanity['failed'])} failed")
        print("=" * 80 + "\n")


print("✅ MetricsValidator class defined!")
print("   - validate_financial_metrics(): Main validation method")
print("   - _verify_citation_in_sources(): Fuzzy quote matching")
print("   - _run_sanity_checks(): Numeric value sanity checks")
print("   - print_validation_report(): Pretty print results")

<a id="21-2-static-subquery-engine"></a>
### 21.2 Static Subquery Engine

The Static Subquery Engine generates pre-defined subqueries for each data agent. This is used when you need consistent, structured queries rather than dynamic LLM-generated ones.
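
One detail such fixed queries need is the quarter preceding a given fiscal quarter, including the rollover from Q1 to Q4 of the prior year. A minimal sketch of that calculation follows; `previous_quarter` is a hypothetical helper, shown only to make the rollover rule explicit.

```python
def previous_quarter(fiscal_year: str, fiscal_quarter: str) -> tuple[str, str]:
    """Return (fiscal_year, fiscal_quarter) for the quarter before the given one."""
    year, quarter = int(fiscal_year), int(fiscal_quarter)
    if not 1 <= quarter <= 4:
        raise ValueError(f"fiscal_quarter must be 1-4, got {fiscal_quarter!r}")
    if quarter == 1:
        return str(year - 1), "4"  # Q1 rolls back to Q4 of the prior fiscal year
    return str(year), str(quarter - 1)


print(previous_quarter("2025", "1"))
print(previous_quarter("2025", "3"))
```

Keeping the rule in one function avoids repeating the `fiscal_quarter == "1"` condition for both the year and the quarter fields.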

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                    STATIC SUBQUERY ENGINE (From static_subquery_engine.py)
# ═══════════════════════════════════════════════════════════════════════════════

class StaticSubqueryEngine:
    """
    Generates static, pre-defined subqueries for data agents.
    
    This provides consistent, structured queries that can be used when:
    - You want deterministic query generation
    - You don't need dynamic LLM-generated queries
    - You want to ensure specific data coverage
    
    Subqueries are organized by agent type:
    - SEC_agent: Financial statements, balance sheet queries
    - news_agent: Recent news, executive changes, M&A
    - earnings_agent: Earnings transcript queries
    """
    
    @staticmethod
    def get_subquery_arguments(
        company_name: str,
        fiscal_year: str,
        fiscal_quarter: str
    ) -> Dict[str, List[Dict]]:
        """
        Generate subquery arguments for all data agents.
        
        Args:
            company_name: Company to query
            fiscal_year: e.g., "2025"
            fiscal_quarter: e.g., "1", "2", "3", "4"
            
        Returns:
            Dictionary with agent names as keys and list of subqueries as values
        """
        # Take a single timestamp so all date windows are computed from the same instant
        now = datetime.now()
        end_date = now.strftime('%Y-%m-%d')
        month_ago = (now - timedelta(days=31)).strftime('%Y-%m-%d')
        five_days_ago = (now - timedelta(days=5)).strftime('%Y-%m-%d')
        
        return {
            # SEC Filing Subqueries
            ToolName.SEC_TOOL.value: [
                {
                    "reporting_entity": company_name,
                    "search_queries": [
                        "consolidated statements of operations",
                        "statements of income",
                        "total revenue"
                    ],
                    "keywords": [
                        "net sales", "total revenue", "quarterly revenue",
                        "three months ended", "revenue", "fiscal year"
                    ],
                    "get_latest": 8,  # Last 8 quarters for trajectory
                    "description": "Income statement data for revenue metrics"
                },
                {
                    "reporting_entity": company_name,
                    "search_queries": [
                        "consolidated balance sheets",
                        "stockholders equity",
                        "common stock outstanding shares"
                    ],
                    "keywords": [
                        "outstanding shares", "common stock", "shares outstanding",
                        "total stockholders equity"
                    ],
                    "get_latest": 1,  # Latest for market cap calculation
                    "description": "Balance sheet for shares outstanding"
                },
                {
                    "reporting_entity": company_name,
                    "search_queries": [
                        "operating income",
                        "depreciation and amortization",
                        "EBITDA"
                    ],
                    "keywords": [
                        "operating income", "depreciation", "amortization",
                        "D&A", "operating expenses"
                    ],
                    "get_latest": 2,
                    "description": "Data for EBITDA margin calculation"
                }
            ],
            
            # News Agent Subqueries
            ToolName.NEWS_TOOL.value: [
                {
                    "search_query": f"{company_name} executive leadership CEO CFO changes",
                    "topics": ["executive", "CEO", "CFO", "leadership", "management"],
                    "absolute_date_range": {"start_date": month_ago, "end_date": end_date},
                    "description": "Executive and leadership changes"
                },
                {
                    "search_query": f"{company_name} mergers acquisitions M&A deal",
                    "topics": ["merger", "acquisition", "M&A", "deal", "buyout"],
                    "absolute_date_range": {"start_date": month_ago, "end_date": end_date},
                    "description": "M&A activity"
                },
                {
                    "search_query": f"{company_name} stock price market cap valuation",
                    "topics": ["market cap", "stock price", "valuation", "shares"],
                    "absolute_date_range": {"start_date": five_days_ago, "end_date": end_date},
                    "description": "Recent stock price and market cap"
                },
                {
                    "search_query": f"{company_name} strategy growth expansion",
                    "topics": ["strategy", "growth", "expansion", "investment"],
                    "absolute_date_range": {"start_date": month_ago, "end_date": end_date},
                    "description": "Strategic developments"
                },
                {
                    "search_query": f"{company_name} risk regulatory lawsuit",
                    "topics": ["risk", "regulatory", "lawsuit", "compliance", "SEC"],
                    "absolute_date_range": {"start_date": month_ago, "end_date": end_date},
                    "description": "Risk and regulatory news"
                }
            ],
            
            # Earnings Agent Subqueries
            ToolName.EARNINGS_TOOL.value: [
                {
                    "company_name": company_name,
                    "fiscal_year": fiscal_year,
                    "fiscal_quarter": fiscal_quarter,
                    "query": f"Give me the earnings transcript for {company_name} for fiscal year: {fiscal_year} and quarter: {fiscal_quarter}.",
                    "description": "Latest earnings transcript"
                },
                {
                    "company_name": company_name,
                    "fiscal_year": str(int(fiscal_year) - 1) if fiscal_quarter == "1" else fiscal_year,
                    "fiscal_quarter": "4" if fiscal_quarter == "1" else str(int(fiscal_quarter) - 1),
                    "query": f"Give me the previous quarter earnings transcript for {company_name}.",
                    "description": "Prior quarter for quarter-over-quarter comparison"
                }
            ]
        }
    
    @staticmethod
    def get_subquery_count(subqueries: Dict[str, List[Dict]]) -> Dict[str, int]:
        """Get count of subqueries per agent"""
        return {agent: len(queries) for agent, queries in subqueries.items()}


# Example usage
print("✅ StaticSubqueryEngine class defined!")

# Demo
demo_subqueries = StaticSubqueryEngine.get_subquery_arguments("Apple Inc.", "2025", "1")
counts = StaticSubqueryEngine.get_subquery_count(demo_subqueries)
print(f"\n📊 Subquery counts for 'Apple Inc.':")
for agent, count in counts.items():
    print(f"   - {agent}: {count} subqueries")

<a id="21-3-llm-prompts"></a>
### 21.3 LLM Prompts (From llm_prompts.py)

These are the detailed system prompts used for LLM extraction. They provide extensive guidance on:
- What to extract
- How to format citations
- Calculation methods for derived metrics
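
The templates in this section are filled with `str.format`, which has one pitfall worth knowing before editing them: any literal `{` or `}` in the template (for example, a JSON sample) must be doubled, while braces inside the substituted values are inserted verbatim. A small sketch with a made-up template (`TEMPLATE` and `fill` are illustrative, not part of the original code):

```python
# Hypothetical template: named placeholders plus a literal JSON example.
TEMPLATE = """Analyze {COMPANY_NAME}.
Return JSON shaped like {{"revenue": 96.773, "citation": null}}.
[SEC_AGENT]
{SEC_AGENT_CONTENT}
"""


def fill(template: str, **values: str) -> str:
    """Fill named placeholders; doubled braces in the template become literal braces."""
    return template.format(**values)


prompt = fill(
    TEMPLATE,
    COMPANY_NAME="Apple Inc.",
    # Braces inside a substituted value are not re-interpreted by format().
    SEC_AGENT_CONTENT="Filing text with {braces} is inserted verbatim.",
)
print(prompt)
```

If a prompt template ever gains an unescaped JSON example, `format()` raises `KeyError` or `ValueError`, so escaping is worth checking whenever the prompts change.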

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                    LLM PROMPTS (From llm_prompts.py)
# ═══════════════════════════════════════════════════════════════════════════════

# Data templates for prompts (filled with agent content)
DATA_FOR_FINANCIAL_METRICS_PROMPT = """
[SEC_AGENT]
Agent Guidance:
- Primary source for all financial metrics
- Extract precise numbers from financial statements
- Look for year-over-year comparisons in financial statements
{SEC_AGENT_CONTENT}

[EARNINGS_AGENT]
Agent Guidance:
- Use for forward guidance and quarterly commentary
- Extract YoY growth rates mentioned in management discussion
{EARNINGS_AGENT_CONTENT}

[NEWS_AGENT]
Agent Guidance:
- Extract current stock price and today's price movement if mentioned in recent articles
- Look for daily price changes (e.g., "stock up $1.39 or 0.56% today")
{NEWS_AGENT_CONTENT}
"""

DATA_FOR_STRATEGIC_ANALYSIS_PROMPT = """
[NEWS_AGENT]
Agent Guidance:
- Include {NEWS_percentage}% of the response from this agent's chunks
- Extract date, category, and source URL for each development
{NEWS_AGENT_CONTENT}

[EARNINGS_AGENT]
Agent Guidance:
- Include {EARNINGS_percentage}% of the response from this agent's chunks
{EARNINGS_AGENT_CONTENT}

[SEC_AGENT]
Agent Guidance:
- Include {SEC_percentage}% of the response from this agent's chunks
{SEC_AGENT_CONTENT}
"""

# Main prompt for financial metrics extraction
FINANCIAL_METRICS_SYSTEM_PROMPT = """
## Task: Extract Financial Metrics

You are analyzing financial data for {COMPANY_NAME}. Extract precise financial metrics from SEC filings and earnings reports.

## CRITICAL JSON FORMAT REQUIREMENT:
**For ALL numeric fields (float type):**
- Return actual numbers (e.g., 96.773, 25.5, 1.39) OR the JSON null value
- NEVER return strings like "<UNKNOWN>", "N/A", "null", "None", or any text placeholders
- If data is genuinely missing after exhausting all sources, return null (JSON null, not the string "null")

## CRITICAL: Source Attribution (REQUIRED FOR VERIFICATION)
For ALL numeric fields, you MUST provide citations as dictionaries with these keys:
- **source_agent**: List of agent names (e.g., ["SEC_agent", "earnings_agent"])
- **source_content**: List of VERBATIM quotes from those agents (COPY-PASTE EXACTLY - DO NOT PARAPHRASE)
- **reasoning**: String explaining your extraction/calculation logic

**IMPORTANT CITATION RULES:**
1. **VERBATIM QUOTES ONLY**: Copy-paste the EXACT text from source documents into source_content
2. **QUOTE LENGTH**: Each quote should be 20-100 words (enough context to verify)
3. **SHOW YOUR WORK**: For calculated metrics (like EBITDA margin), include actual calculation in reasoning

## Extraction Guidelines:

**Revenue Metrics (REQUIRED):**
- Current annual revenue: Most recent 10-K or annualized 10-Q
- Year-over-year change: Calculate or extract YoY % change in revenue
- Next year estimate: Company guidance or extrapolate from growth trends

**EBITDA Margin (REQUIRED - CALCULATE IF NEEDED):**
- First, search for explicit "EBITDA" in SEC_AGENT
- If not found, calculate: (Operating Income + Depreciation + Amortization) / Revenue × 100
- MUST show calculation in reasoning

**Stock Price (REQUIRED - MULTIPLE SOURCES):**
1. Check NEWS_AGENT for recent price mentions
2. Check SEC filing cover pages for "Class A Common Stock" price
3. Extract daily change in both dollars and percentage
4. Extract year-over-year change

**Market Cap (CALCULATE IF NEEDED):**
- Formula: Stock Price × Outstanding Shares
- Find "Outstanding Shares" in SEC_AGENT

**Revenue Growth Trajectory (REQUIRED):**
- Build dictionary of last 7 quarterly revenues
- Use fiscal quarter notation: "Q1 FY2026", "Q3 FY2025"

## Data Source Strategy:
- SEC_AGENT: Financial statements, balance sheet, cash flow
- EARNINGS_AGENT: Management commentary, guidance, Q&A mentions
- NEWS_AGENT: Recent price action, market cap references, daily stock movements

**Your goal: Maximize data extraction with VERBATIM, VERIFIABLE citations.**
"""

# Main prompt for strategic analysis
STRATEGIC_ANALYSIS_SYSTEM_PROMPT = """
## Task: Strategic Analysis for Client Meeting

You are preparing a strategic briefing for {COMPANY_NAME} for an RBC Capital Markets client meeting.

**CRITICAL: You must fill ALL fields in the response schema. Do not skip any fields.**

## Required Output Structure (ALL FIELDS MANDATORY):

### 1. SWOT Analysis (strength, weakness, opportunity, threat)
- 4-6 bullets each
- Each bullet: 15-25 words, specific, data-backed

### 2. Investment Thesis (investment_thesis field - REQUIRED)
**Format**: List of 3-4 dictionaries, each with:
- Key: One subheading (e.g., "Growth Drivers", "Competitive Moat")
- Value: List of 2-4 bullet points (15-30 words each)

### 3. Key Risk Highlights (key_risk_highlights field)
- 5-7 critical risks
- Each bullet: 15-30 words with impact and timeline

### 4. Strategic Opportunities (strategic_opportunities field - REQUIRED)
**Format**: List of 3-4 dictionaries with:
- Key: Category (e.g., "M&A Advisory", "Capital Raising")
- Value: List of 2-3 specific opportunities (15-30 words each)

### 5. Recent Developments (recent_developments field - REQUIRED)
**Format**: List of 4-6 dictionaries with:
- **category**: ONE of: "News", "M&A", "Management", "Company", "Industry"
- **header**: 5-10 word title/summary
- **date**: Date in format "MMM DD YYYY"
- **description**: 20-40 words describing what happened
- **source_url**: Full URL to source

### 6. Sources (sources field)
- 8-12 sources cited with URLs
"""


def build_financial_metrics_prompt(
    company_name: str,
    sec_content: str,
    earnings_content: str,
    news_content: str
) -> str:
    """Build the complete financial metrics prompt"""
    data_section = DATA_FOR_FINANCIAL_METRICS_PROMPT.format(
        SEC_AGENT_CONTENT=sec_content or "No SEC data available",
        EARNINGS_AGENT_CONTENT=earnings_content or "No earnings data available",
        NEWS_AGENT_CONTENT=news_content or "No news data available"
    )
    
    return FINANCIAL_METRICS_SYSTEM_PROMPT.format(
        COMPANY_NAME=company_name
    ) + "\n\n## Source Data:\n" + data_section


def build_strategic_analysis_prompt(
    company_name: str,
    news_content: str,
    earnings_content: str,
    sec_content: str,
    news_pct: int = 40,
    earnings_pct: int = 30,
    sec_pct: int = 30
) -> str:
    """Build the complete strategic analysis prompt"""
    data_section = DATA_FOR_STRATEGIC_ANALYSIS_PROMPT.format(
        NEWS_AGENT_CONTENT=news_content or "No news data available",
        EARNINGS_AGENT_CONTENT=earnings_content or "No earnings data available",
        SEC_AGENT_CONTENT=sec_content or "No SEC data available",
        NEWS_percentage=news_pct,
        EARNINGS_percentage=earnings_pct,
        SEC_percentage=sec_pct
    )
    
    return STRATEGIC_ANALYSIS_SYSTEM_PROMPT.format(
        COMPANY_NAME=company_name
    ) + "\n\n## Source Data:\n" + data_section


print("✅ LLM Prompts defined!")
print("   - DATA_FOR_FINANCIAL_METRICS_PROMPT: Data template")
print("   - DATA_FOR_STRATEGIC_ANALYSIS_PROMPT: Data template with source weights")
print("   - FINANCIAL_METRICS_SYSTEM_PROMPT: Full extraction instructions")
print("   - STRATEGIC_ANALYSIS_SYSTEM_PROMPT: Full SWOT/thesis instructions")
print("   - build_financial_metrics_prompt(): Helper to build complete prompt")
print("   - build_strategic_analysis_prompt(): Helper to build complete prompt")
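
The `recent_developments` format in the strategic analysis prompt is strict enough to check mechanically once the LLM responds. The sketch below shows one way to do that. `validate_recent_development` and its constants are hypothetical helpers, not part of the original code; the allowed categories, the date format, and the word-count bounds are copied from the prompt text above. Date parsing with `%b` assumes an English locale.

```python
from datetime import datetime
from typing import Any, Dict, List

# Values copied from the prompt's "Recent Developments" section
ALLOWED_CATEGORIES = {"News", "M&A", "Management", "Company", "Industry"}
REQUIRED_FIELDS = ("category", "header", "date", "description", "source_url")


def validate_recent_development(item: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one recent_developments entry."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if not item.get(name)]
    if problems:
        return problems  # remaining checks need every field present

    if item["category"] not in ALLOWED_CATEGORIES:
        problems.append(f"unknown category: {item['category']}")

    try:
        datetime.strptime(item["date"], "%b %d %Y")  # e.g. "Feb 15 2025"
    except ValueError:
        problems.append(f"date not in 'MMM DD YYYY' format: {item['date']}")

    header_words = len(item["header"].split())
    if not 5 <= header_words <= 10:
        problems.append(f"header has {header_words} words, expected 5-10")

    description_words = len(item["description"].split())
    if not 20 <= description_words <= 40:
        problems.append(f"description has {description_words} words, expected 20-40")

    if not item["source_url"].startswith(("http://", "https://")):
        problems.append("source_url is not a full URL")

    return problems
```

An empty list means the entry matches the format; a non-empty list can be logged or used to trigger a retry of the LLM call.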

<a id="21-4-persona-extraction"></a>
### 21.4 Persona Extraction Services (LDAP & ZoomInfo)

The original CMPT chain includes optional persona extraction for:
- **RBC Persona**: Employee information from internal LDAP
- **Corporate Client Persona**: Client information from ZoomInfo

These are marked as "deferred" in the original architecture but can be enabled for enhanced personalization.

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#              PERSONA EXTRACTION SERVICES (LDAP & ZoomInfo)
# ═══════════════════════════════════════════════════════════════════════════════

"""
Persona Extraction: Optional components for enhanced personalization.

These services can be integrated as additional steps in the Context Builder
to provide persona information about meeting attendees.

Architecture:
- LDAPService: Looks up RBC employee info by email (internal)
- ZoomInfoService: Looks up external client info (corporate clients)

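LDAPService implements LookupServiceInterface. ZoomInfoService does not subclass it
and instead exposes get_user_details_from_zoominfo() and lookup_by_names().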
"""

# Base interface that both services implement
class LookupServiceInterface:
    """Interface for persona lookup services"""
    def lookup(self, identifiers: List[str]) -> List[Dict[str, Any]]:
        """Lookup profiles by identifiers (email, name, etc.)"""
        raise NotImplementedError


class LDAPService(LookupServiceInterface):
    """
    LDAP Service for RBC employee persona extraction.
    
    Looks up employee information from internal LDAP directory.
    
    Usage:
        ldap_service = LDAPService(LDAPEmailStrategy())
        profiles = ldap_service.lookup(["employee@rbc.com"])
    """
    
    def __init__(self, strategy=None):
        """
        Initialize with a lookup strategy.
        
        Args:
            strategy: Strategy for LDAP lookup (email, name, etc.)
        """
        self.strategy = strategy
    
    def lookup(self, identifiers: List[str]) -> List[Dict[str, Any]]:
        """
        Lookup employee profiles from LDAP.
        
        Returns:
            List of employee profiles with:
            - name: Full name
            - email: Email address
            - title: Job title
            - department: Department name
            - division: Division name
            - manager: Manager name
        """
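        # Placeholder - in production this would connect to LDAP
        # Example using ldap3 library (base_dn comes from configuration, email from identifiers):
        # from ldap3 import Server, Connection, SUBTREE
        # from ldap3.utils.conv import escape_filter_chars
        # server = Server(os.getenv('LDAP_SERVER'))
        # conn = Connection(server, user=os.getenv('LDAP_USER'), password=os.getenv('LDAP_PASS'), auto_bind=True)
        # Escape the email so special characters cannot alter the search filter
        # conn.search(base_dn, f'(mail={escape_filter_chars(email)})', SUBTREE, attributes=['cn', 'title', 'department'])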
        
        logger.info(f"LDAP lookup for: {identifiers}")
        return []


class ZoomInfoService:
    """
    ZoomInfo Service for corporate client persona extraction.
    
    Looks up external client information from ZoomInfo API.
    
    Features:
    - Email-based lookup: Direct profile match
    - Name-based lookup: Fuzzy match with company ranking
    """
    
    def __init__(self):
        self.api_url = os.getenv("ZOOMINFO_API_URL")
        self.api_key = os.getenv("ZOOMINFO_API_KEY")
    
    def get_user_details_from_zoominfo(self, email: str = None, name: str = None):
        """
        Get user details from ZoomInfo API.
        
        Args:
            email: Email address for direct lookup
            name: Name for fuzzy lookup
            
        Returns:
            ZoomInfo API response
        """
        # Placeholder - in production this would call ZoomInfo API
        # headers = {"Authorization": f"Bearer {self.api_key}"}
        # if email:
        #     response = requests.get(f"{self.api_url}/person/search", params={"email": email}, headers=headers)
        # elif name:
        #     response = requests.get(f"{self.api_url}/person/search", params={"fullName": name}, headers=headers)
        
        logger.info(f"ZoomInfo lookup for: {email or name}")
        
        # Return mock response structure
        class MockResponse:
            status_code = 200
            def json(self):
                return {"data": []}
        return MockResponse()
    
    def lookup_by_names(self, names_string: str) -> List[Dict[str, Any]]:
        """
        Lookup profiles by name string.
        
        Args:
            names_string: Comma-separated names
            
        Returns:
            List of matching profiles
        """
        # Skip empty entries (e.g. from a trailing comma) so no blank lookups are sent
        names = [n.strip() for n in names_string.split(",") if n.strip()]
        results = []
        for name in names:
            response = self.get_user_details_from_zoominfo(name=name)
            if response.status_code == 200:
                data = response.json().get("data", [])
                results.extend(data)
        return results


def rank_profiles_by_company(profiles: List[Dict], company_name: str) -> List[Dict]:
    """
    Rank ZoomInfo profiles by similarity to target company.
    
    When searching by name, multiple profiles may be returned.
    This function ranks them by company match quality.
    
    Args:
        profiles: List of ZoomInfo profiles
        company_name: Target company to match
        
    Returns:
        Profiles sorted by company similarity (highest first)
    """
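    # Local import keeps this helper runnable even if the imports cell omitted it
    from difflib import SequenceMatcher

    if not profiles or not company_name:
        return profiles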
    
    for profile in profiles:
        company_from_profile = profile.get('company', '') or profile.get('department', '')
        if company_from_profile:
            similarity = SequenceMatcher(
                None, 
                company_name.lower(), 
                company_from_profile.lower()
            ).ratio()
            profile['company_similarity_score'] = similarity
        else:
            profile['company_similarity_score'] = 0.0
    
    return sorted(profiles, key=lambda x: x.get('company_similarity_score', 0), reverse=True)


# Example FlowForge steps for persona extraction (optional)
# These can be added to the chain if persona extraction is needed.
# Note: LDAPEmailStrategy is not defined in this cell; define or import it before uncommenting.

# @forge.step(
#     name="extract_rbc_persona",
#     deps=["extract_company_info"],
#     produces=["rbc_persona"],
#     description="Extract RBC employee persona from LDAP",
#     timeout_ms=10000,
#     group="context_builder"
# )
# async def extract_rbc_persona(ctx: ChainContext) -> Dict:
#     """Extract RBC employee persona using LDAP"""
#     rbc_email = ctx.get("rbc_employee_email")
#     if not rbc_email:
#         return {"rbc_persona": None}
#     
#     ldap_service = LDAPService(LDAPEmailStrategy())
#     profiles = ldap_service.lookup([rbc_email])
#     persona = profiles[0] if profiles else None
#     ctx.set("rbc_persona", persona, scope=ContextScope.CHAIN)
#     return {"rbc_persona": persona}


# @forge.step(
#     name="extract_client_persona",
#     deps=["extract_company_info"],
#     produces=["corporate_client_persona"],
#     description="Extract corporate client persona from ZoomInfo",
#     timeout_ms=15000,
#     group="context_builder"
# )
# async def extract_client_persona(ctx: ChainContext) -> Dict:
#     """Extract corporate client persona using ZoomInfo"""
#     client_email = ctx.get("client_email")
#     client_names = ctx.get("client_names")
#     company_name = ctx.get("company_name")
#     
#     zoom_service = ZoomInfoService()
#     
#     if client_email:
#         response = zoom_service.get_user_details_from_zoominfo(email=client_email)
#         profiles = response.json().get("data", []) if response.status_code == 200 else []
#     elif client_names and company_name:
#         profiles = zoom_service.lookup_by_names(client_names)
#         profiles = rank_profiles_by_company(profiles, company_name)
#     else:
#         profiles = []
#     
#     persona = profiles[0] if profiles else None
#     ctx.set("corporate_client_persona", persona, scope=ContextScope.CHAIN)
#     return {"corporate_client_persona": persona}


print("✅ Persona extraction services defined!")
print("   - LookupServiceInterface: Base interface")
print("   - LDAPService: RBC employee lookup")
print("   - ZoomInfoService: External client lookup")
print("   - rank_profiles_by_company(): Company matching helper")
print("\n📌 Note: Persona extraction steps are commented out.")
print("   Uncomment and add to chain.steps to enable.")
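
`rank_profiles_by_company()` writes `company_similarity_score` into the dictionaries it receives, which can surprise callers that reuse the raw ZoomInfo results. The sketch below shows a non-mutating variant with the same scoring rule. `rank_profiles_copy` is a hypothetical name, and the sample profiles are made up for illustration.

```python
from difflib import SequenceMatcher
from typing import Any, Dict, List


def rank_profiles_copy(profiles: List[Dict[str, Any]], company_name: str) -> List[Dict[str, Any]]:
    """Same ranking as rank_profiles_by_company, but the input dicts stay untouched."""
    target = company_name.lower()
    scored = []
    for profile in profiles:
        # Fall back to 'department' when 'company' is missing, as the original helper does
        candidate = (profile.get("company") or profile.get("department") or "").lower()
        score = SequenceMatcher(None, target, candidate).ratio() if candidate else 0.0
        # Build a new dict instead of modifying the caller's profile
        scored.append({**profile, "company_similarity_score": score})
    return sorted(scored, key=lambda p: p["company_similarity_score"], reverse=True)


# Made-up sample data for illustration only
profiles = [
    {"name": "A. Example", "company": "Acme Holdings"},
    {"name": "B. Example", "company": "Acme Corp"},
    {"name": "C. Example"},  # no company information at all
]
ranked = rank_profiles_copy(profiles, "Acme Corp")
print([p["name"] for p in ranked])
```

The exact match ranks first, the partial match second, and the profile without company data last with a score of 0.0.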

<a id="22-testing"></a>
## 22. Testing Strategies

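The cell below sketches three test shapes for the CMPT chain: a unit test that calls one step function directly, an integration test that runs the chain on an isolated forge instance, and a structural check with `forge.check()`.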

In [None]:
# ═══════════════════════════════════════════════════════════════════════════════
#                           TESTING STRATEGIES
# ═══════════════════════════════════════════════════════════════════════════════

"""
FlowForge Testing Patterns:

1. Unit Testing: Test individual steps in isolation
2. Integration Testing: Test full chain with mocked agents
3. Validation Testing: Use forge.check() for structural validation
4. End-to-End Testing: Full chain with real/mock external services
"""

# Example: Unit test for a step
async def test_extract_company_info():
    """Unit test example for extract_company_info step"""
    from flowforge.core.context import ChainContext, ContextManager
    
    # Create isolated context
    ctx = ContextManager().create_context("test_chain")
    ctx.set("company_name", "Test Corp")
    
    # Run the step function directly
    result = await extract_company_info(ctx)
    
    # Assertions
    assert "company_info" in result
    assert result["company_info"]["company_name"] == "Test Corp"
    print("✅ Unit test passed!")


# Example: Integration test with mocked services
async def test_cmpt_integration():
    """Integration test using FlowForge's isolated mode"""
    
    # Create isolated forge instance (prevents state bleed)
    test_forge = FlowForge(name="cmpt_test", isolated=True)
    
    # Register steps (same as main implementation)
    # ... (steps would be registered here)
    
    # Run chain with test data
    result = await test_forge.launch(
        "cmpt_pipeline",
        data={
            "company_name": "Apple Inc",
            "meeting_date": "2025-02-15"
        }
    )
    
    # Assertions
    assert result.get("success") is True
    print("✅ Integration test passed!")


# Validation test
def test_chain_validation():
    """Test chain structure using forge.check()"""
    validation = forge.check("cmpt_pipeline")
    
    assert validation["valid"] is True, f"Validation failed: {validation.get('errors')}"
    assert len(validation.get("errors", [])) == 0
    print("✅ Validation test passed!")


# Run validation test
test_chain_validation()

print("\n📋 Testing strategies available:")
print("   - test_extract_company_info(): Unit test example")
print("   - test_cmpt_integration(): Integration test example")
print("   - test_chain_validation(): Structural validation test")
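
The two async tests above are defined but never awaited. In a notebook cell they can be run with `await test_extract_company_info()`; in a plain script they need an event loop. The sketch below shows the script form using only the standard library. `FakeContext` and `toy_extract_company_info` are hypothetical stand-ins, so the example runs without FlowForge installed; they are not FlowForge's `ChainContext` or the real step.

```python
import asyncio
from typing import Any, Dict


class FakeContext:
    """Hypothetical stand-in exposing only the get/set calls the steps use."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def set(self, key: str, value: Any, scope: Any = None) -> None:
        # scope is accepted for signature compatibility and ignored here
        self._data[key] = value


async def toy_extract_company_info(ctx: FakeContext) -> Dict[str, Any]:
    """Toy step that only demonstrates the shape of a step under test."""
    info = {"company_name": ctx.get("company_name")}
    ctx.set("company_info", info)
    return {"company_info": info}


async def test_toy_step() -> None:
    ctx = FakeContext()
    ctx.set("company_name", "Test Corp")
    result = await toy_extract_company_info(ctx)
    # Check both the return value and the side effect on the context
    assert result["company_info"]["company_name"] == "Test Corp"
    assert ctx.get("company_info") == result["company_info"]


# In a script; inside a notebook cell use `await test_toy_step()` instead
asyncio.run(test_toy_step())
```

With `pytest`, the same test function would typically be collected through an async plugin rather than calling `asyncio.run` by hand.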

---

<a id="23-summary"></a>
## 📚 Complete Summary: Old Code → FlowForge Migration

### What Was Migrated from `requirements_and_old_code.MD`:

| Old Code Component | FlowForge Implementation | Section |
|--------------------|--------------------------|---------|
| `ChainServerRequest` | `CMPTRequest` model | §5 |
| `FinancialMetricsResponse`, `StrategicAnalysisResponse` | Pydantic models with CitationDict | §5 |
| `grid_config.py` (GRID) | `GRID_CONFIG` dictionary | §6 |
| `ContextBuilderService.corporate_client_firm_extractor` | `extract_company_info` step | §11 |
| `ContextBuilderService.temporal_content_extractor` | `extract_temporal_context` step | §11 |
| `ContentPrioritizationService.temporal_source_prioritizer` | `temporal_source_prioritizer` step | §12 |
| `StaticSubqueryEngine.get_subquery_arguments` | `generate_subqueries` step + `StaticSubqueryEngine` class | §12, §21.2 |
| `ResponseBuilderAndGenerator.context_parser` | `parse_agent_data` step | §14 |
| `ResponseBuilderAndGenerator.prompt_builder` | `build_prompts` step | §14 |
| `ResponseBuilderAndGenerator.get_structured_response` | `generate_financial_metrics`, `generate_strategic_analysis` steps | §14 |
| `MetricsValidator.validate_financial_metrics` | `validate_metrics` step + `MetricsValidator` class | §14, §21.1 |
| `ChainOrchestrator.execute_chain` | `CMPTPipeline` chain + `build_response` step | §10 |
| `llm_prompts.py` | LLM prompt templates and builders | §21.3 |
| `ldap_service.py`, `zoom_info_service.py` | `LDAPService`, `ZoomInfoService` classes | §21.4 |

### FlowForge Benefits Over Old Code:

| Feature | Old Code | FlowForge |
|---------|----------|-----------|
| **Execution** | Sequential service calls | Automatic DAG parallelization |
| **Dependencies** | Manual orchestration | Declarative `deps=[]` |
| **Context** | Passed between functions | `ChainContext` with scoped storage |
| **Error Handling** | `try`/`except` in each service | Configurable `fail_fast` vs `continue` |
| **Validation** | Manual | Built-in `forge.check()` |
| **Visualization** | None | ASCII/Mermaid DAG diagrams |
| **Testing** | Per-service tests | Isolated forge instances |

### Pipeline Execution Comparison:

**Old Code (Sequential):**
```
ContextBuilder.execute() → ContentPrioritization.execute() → ResponseBuilder.execute()
```

**FlowForge (Parallel where possible):**
```
extract_company_info → extract_temporal_context → build_context
                                                       ↓
              ┌──────────────────────────────────────────────────────────────┐
              ↓                                                              ↓
temporal_source_prioritizer                                    generate_subqueries
              └──────────────────────────┬───────────────────────────────────┘
                                         ↓
                                  prioritize_content
                                         ↓
           ┌─────────────────────────────┼─────────────────────────────┐
           ↓                             ↓                             ↓
    fetch_news_data               fetch_sec_data              fetch_earnings_data
           └─────────────────────────────┼─────────────────────────────┘
                                         ↓
                                  parse_agent_data → build_prompts
                                                          ↓
                           ┌──────────────────────────────┴──────────────────────────────┐
                           ↓                                                             ↓
               generate_financial_metrics                              generate_strategic_analysis
                           ↓                                                             │
                    validate_metrics ────────────────────────────────────────────────────┤
                                                                                         ↓
                                                                                  build_response
```
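
To see where the diagram allows parallel work, the dependency edges can be grouped into execution levels. The sketch below does this with the standard library's `graphlib`. It illustrates the idea only and makes no claim about how FlowForge schedules steps internally. The `DEPS` mapping is transcribed from the diagram above, and `execution_levels` is a hypothetical helper.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# step -> steps it depends on, transcribed from the diagram above
DEPS: Dict[str, List[str]] = {
    "extract_company_info": [],
    "extract_temporal_context": ["extract_company_info"],
    "build_context": ["extract_temporal_context"],
    "temporal_source_prioritizer": ["build_context"],
    "generate_subqueries": ["build_context"],
    "prioritize_content": ["temporal_source_prioritizer", "generate_subqueries"],
    "fetch_news_data": ["prioritize_content"],
    "fetch_sec_data": ["prioritize_content"],
    "fetch_earnings_data": ["prioritize_content"],
    "parse_agent_data": ["fetch_news_data", "fetch_sec_data", "fetch_earnings_data"],
    "build_prompts": ["parse_agent_data"],
    "generate_financial_metrics": ["build_prompts"],
    "generate_strategic_analysis": ["build_prompts"],
    "validate_metrics": ["generate_financial_metrics"],
    "build_response": ["validate_metrics", "generate_strategic_analysis"],
}


def execution_levels(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group steps into levels; steps within one level have no mutual dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything whose deps are all done
        levels.append(ready)
        sorter.done(*ready)
    return levels


for index, level in enumerate(execution_levels(DEPS)):
    print(index, level)
```

Levels with more than one entry (the two prioritization steps, the three fetch steps, and the two generation steps) are the places where concurrent execution is possible.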

---

## 🚀 Production Deployment Checklist

### Environment Variables Required:
```bash
# Foundation Services
FOUNDATION_COMPANY_MATCHES=https://...
FOUNDATION_EARNING_CALENDAR_URL=https://...

# MCP Agents
NEWS_AGENT_MCP_URL=https://...
NEWS_AGENT_MCP_BEARER_TOKEN=...
SEC_AGENT_MCP_URL=https://...
SEC_AGENT_MCP_BEARER_TOKEN=...
EARNINGS_AGENT_MCP_URL=https://...
EARNINGS_AGENT_MCP_BEARER_TOKEN=...

# LLM Gateway
LLM_SERVER_URL=https://...
LLM_OAUTH_ENDPOINT=https://...
LLM_CLIENT_ID=...
LLM_CLIENT_SECRET=...

# Optional: Persona Services
LDAP_SERVER=...
LDAP_USER=...
LDAP_PASS=...
ZOOMINFO_API_URL=...
ZOOMINFO_API_KEY=...
```
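
A startup check can catch a missing variable before the first chain run fails halfway through. The sketch below is one possible check. `REQUIRED_ENV_VARS` lists the non-optional names from the block above, and `missing_env_vars` is a hypothetical helper.

```python
import os
from typing import List, Mapping

# Non-optional variable names from the checklist above
REQUIRED_ENV_VARS = [
    "FOUNDATION_COMPANY_MATCHES",
    "FOUNDATION_EARNING_CALENDAR_URL",
    "NEWS_AGENT_MCP_URL",
    "NEWS_AGENT_MCP_BEARER_TOKEN",
    "SEC_AGENT_MCP_URL",
    "SEC_AGENT_MCP_BEARER_TOKEN",
    "EARNINGS_AGENT_MCP_URL",
    "EARNINGS_AGENT_MCP_BEARER_TOKEN",
    "LLM_SERVER_URL",
    "LLM_OAUTH_ENDPOINT",
    "LLM_CLIENT_ID",
    "LLM_CLIENT_SECRET",
]


def missing_env_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variables that are unset or empty in env."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing required environment variables:", ", ".join(missing))
```

Passing the environment as a parameter keeps the check easy to test with a plain dictionary.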

### Production Enhancements:
1. **Replace placeholder `_execute_query` methods** with real MCP client calls
2. **Connect LLM Gateway** for metrics/analysis generation
3. **Enable caching middleware** for repeated queries
4. **Add monitoring/metrics** middleware
5. **Configure retry/timeout** per step based on SLAs
6. **Enable persona extraction** if needed
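
For item 5, the general shape of a per-call timeout with retries can be sketched in plain `asyncio`. This is not FlowForge's retry mechanism; `call_with_retry` and its parameters are hypothetical, and which exceptions count as retryable is an assumption that depends on the real MCP and LLM clients.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    timeout_s: float,
    max_attempts: int = 3,
    backoff_s: float = 0.5,
) -> T:
    """Await make_call() with a per-attempt timeout, retrying on failure."""
    last_error: Exception = RuntimeError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            # A fresh awaitable is created for every attempt
            return await asyncio.wait_for(make_call(), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < max_attempts:
                # Exponential backoff: backoff_s, 2*backoff_s, 4*backoff_s, ...
                await asyncio.sleep(backoff_s * 2 ** (attempt - 1))
    raise last_error
```

Inside a step, a fetch could then be wrapped as `await call_with_retry(lambda: client.fetch(query), timeout_s=10)`, where `client.fetch` stands for whatever coroutine the real agent exposes.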

---

## 📖 Reference: FlowForge API Quick Guide

| API | Description | Example |
|-----|-------------|---------|
| `FlowForge(name, ...)` | Create forge instance | `forge = FlowForge(name="cmpt")` |
| `@forge.agent(name)` | Register data agent class | `@forge.agent(name="news_agent")` |
| `@forge.step(name, deps, produces)` | Register chain step | `@forge.step(name="fetch", deps=["build"])` |
| `@forge.chain(name)` | Register chain definition | `@forge.chain(name="pipeline")` |
| `forge.check(chain_name)` | Validate chain | `forge.check("pipeline")` |
| `forge.graph(chain_name)` | Visualize DAG | `forge.graph("pipeline", format="ascii")` |
| `forge.launch(chain_name, data)` | Execute chain | `await forge.launch("pipeline", data={...})` |
| `forge.get_agent(name)` | Get registered agent | `agent = forge.get_agent("news_agent")` |
| `forge.list_agents()` | List all agents | `forge.list_agents()` |
| `ctx.get(key)` | Read from context | `value = ctx.get("company_info")` |
| `ctx.set(key, value, scope)` | Write to context | `ctx.set("data", val, scope=ContextScope.CHAIN)` |

---

**End of Tutorial**

This notebook provides a complete implementation of the CMPT chain using FlowForge, migrating all components from the original `requirements_and_old_code.MD` file.