# ⚙️ Module 9: Workflow Orchestration

**AI Agent Architectures Workshop - Day 2**

This notebook covers:
- Azure Durable Functions patterns for agent workflows (simulated locally with `asyncio`)
- Logic Apps for visual orchestration (summarized at the end)
- State machine logic and branching
- Adaptive RAG: intent-based routing across specialized agents

**Prerequisites:** Run `00_setup.ipynb` first.

In [None]:
!pip install openai python-dotenv --quiet

In [None]:
import asyncio
import inspect
import json
import os
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional

# =============================================================================
# GOOGLE COLAB SETUP - Add these secrets (click 🔑 icon):
#   - AZURE_OPENAI_KEY: Your API key
#   - AZURE_OPENAI_ENDPOINT: https://xxx.openai.azure.com/
#   - AZURE_OPENAI_DEPLOYMENT: Your model deployment name
# Outside Colab the same names are read from environment variables.
# If no key/endpoint is found, the notebook runs in DEMO MODE (client is None).
# =============================================================================

DEMO_MODE = False
client = None
MODEL_NAME = "gpt-4o"
AZURE_OPENAI_KEY = None
AZURE_OPENAI_ENDPOINT = None
_deployment = None

try:
    from google.colab import userdata
except ImportError:
    userdata = None  # not running in Colab; rely on environment variables

if userdata is not None:
    try:
        AZURE_OPENAI_KEY = userdata.get('AZURE_OPENAI_KEY')
        AZURE_OPENAI_ENDPOINT = userdata.get('AZURE_OPENAI_ENDPOINT')
    except Exception:
        # Missing or inaccessible secret: fall through to the environment lookup.
        pass
    try:
        _deployment = userdata.get('AZURE_OPENAI_DEPLOYMENT')
    except Exception:
        # The deployment secret is optional; the default MODEL_NAME is kept.
        pass

# Colab secrets take precedence; environment variables fill any gaps.
AZURE_OPENAI_KEY = AZURE_OPENAI_KEY or os.environ.get('AZURE_OPENAI_KEY')
AZURE_OPENAI_ENDPOINT = AZURE_OPENAI_ENDPOINT or os.environ.get('AZURE_OPENAI_ENDPOINT')
# Never let an empty/None deployment value overwrite the default model name.
MODEL_NAME = _deployment or os.environ.get('AZURE_OPENAI_DEPLOYMENT') or MODEL_NAME

if AZURE_OPENAI_KEY and AZURE_OPENAI_ENDPOINT:
    if not AZURE_OPENAI_ENDPOINT.startswith('http'):
        AZURE_OPENAI_ENDPOINT = 'https://' + AZURE_OPENAI_ENDPOINT
    print(f"‚úÖ Credentials loaded. Model: {MODEL_NAME}")
else:
    print("‚ö†Ô∏è Running in DEMO MODE")
    DEMO_MODE = True

if not DEMO_MODE:
    from openai import AzureOpenAI
    client = AzureOpenAI(
        api_key=AZURE_OPENAI_KEY,
        api_version="2024-06-01",
        azure_endpoint=AZURE_OPENAI_ENDPOINT
    )
    print("‚úÖ Client ready")

## 1. State Machine Pattern for Agent Workflows

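Model agent workflows as state machines for predictable execution: each stage may only move to an explicitly allowed next state, which also makes the workflow easy to audit.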

In [None]:
class LoanState(Enum):
    """Every state a loan application can be in.

    Member values are the lowercase identifiers written into audit notes.
    """

    # Entry point
    SUBMITTED = "submitted"
    # Automated processing stages
    DOCUMENT_REVIEW = "document_review"
    CREDIT_CHECK = "credit_check"
    UNDERWRITING = "underwriting"
    # Manual escalation
    HUMAN_REVIEW = "human_review"
    # Outcomes
    APPROVED = "approved"
    REJECTED = "rejected"
    # Waiting on additional information from the applicant
    PENDING_INFO = "pending_info"

@dataclass
class LoanApplication:
    """Mutable record of one loan application as it moves through the workflow.

    The orchestrator updates ``state``; the agents fill in ``credit_score``
    and ``risk_score``; transitions made with a note are appended to ``notes``.
    """

    id: str
    applicant_name: str
    amount: float
    state: LoanState = LoanState.SUBMITTED
    credit_score: Optional[int] = None
    risk_score: Optional[float] = None
    # default_factory gives each instance its own list and keeps the
    # annotation truthful (the field is never None after construction).
    notes: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Backward compatibility: callers that explicitly pass notes=None
        # still end up with an empty list.
        if self.notes is None:
            self.notes = []

class LoanWorkflowOrchestrator:
    """Guard every state change of a loan application against an allow-list.

    States that do not appear as keys in ``TRANSITIONS`` (APPROVED and
    REJECTED) have no outgoing moves, i.e. they are terminal.
    """

    # Adjacency list of legal moves: current state -> permitted next states.
    TRANSITIONS = {
        LoanState.SUBMITTED: [LoanState.DOCUMENT_REVIEW],
        LoanState.DOCUMENT_REVIEW: [LoanState.CREDIT_CHECK, LoanState.PENDING_INFO],
        LoanState.CREDIT_CHECK: [LoanState.UNDERWRITING, LoanState.REJECTED],
        LoanState.UNDERWRITING: [LoanState.APPROVED, LoanState.HUMAN_REVIEW, LoanState.REJECTED],
        LoanState.HUMAN_REVIEW: [LoanState.APPROVED, LoanState.REJECTED, LoanState.PENDING_INFO],
        LoanState.PENDING_INFO: [LoanState.DOCUMENT_REVIEW],
    }

    def can_transition(self, current: LoanState, target: LoanState) -> bool:
        """Return True when ``target`` is a permitted successor of ``current``."""
        permitted = self.TRANSITIONS.get(current, [])
        return target in permitted

    def transition(self, app: LoanApplication, target: LoanState, note: str = None):
        """Move ``app`` into ``target`` and optionally log ``note`` on it.

        Raises:
            ValueError: if ``TRANSITIONS`` does not allow the move.
        """
        source = app.state
        if not self.can_transition(source, target):
            raise ValueError(f"Invalid transition: {source} -> {target}")

        app.state = target
        if note:
            app.notes.append(f"[{source.value} -> {target.value}] {note}")
        print(f"   üìç {source.value} ‚Üí {target.value}")

print("‚úÖ State machine defined")

In [None]:
def agent_document_review(app: LoanApplication) -> dict:
    """Ask the LLM whether the application's documents look complete.

    Returns the parsed JSON reply, requested in the shape
    ``{"complete": bool, "missing": list, "notes": str}``. JSON mode
    guarantees valid JSON, not this exact schema. In DEMO MODE
    (``client is None``) a canned "complete" result is returned so the
    workflow can still be walked end to end.
    """
    if client is None:
        return {"complete": True, "missing": [], "notes": "[DEMO] Documents assumed complete."}

    # NOTE(review): the prompt carries only the applicant name and amount, not
    # any document contents, so the verdict is the model's guess.
    prompt = f"""Review this loan application for document completeness:
    Applicant: {app.applicant_name}
    Amount: ${app.amount:,.2f}
    
    Check for: ID verification, income proof, employment verification.
    Respond with JSON: {{"complete": true/false, "missing": [list], "notes": "string"}}"""

    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    # content can be None (e.g. a filtered reply); parse that as an empty
    # verdict so callers fall back to their "incomplete" default.
    return json.loads(response.choices[0].message.content or "{}")

def agent_credit_check(app: LoanApplication) -> dict:
    """Simulate a credit pull, then assess creditworthiness.

    Side effect: sets ``app.credit_score`` to a random integer in [580, 850].
    Returns the parsed JSON reply, requested in the shape
    ``{"approved": bool, "risk_level": "low/medium/high", "notes": str}``.
    In DEMO MODE (``client is None``), the verdict is derived locally from
    the score using the 650/720 thresholds from the underwriting criteria.
    """
    # Simulate credit check
    import random
    app.credit_score = random.randint(580, 850)

    if client is None:
        score = app.credit_score
        if score >= 720:
            risk_level = "low"
        elif score >= 650:
            risk_level = "medium"
        else:
            risk_level = "high"
        return {
            "approved": score >= 650,
            "risk_level": risk_level,
            "notes": f"[DEMO] Rule-based assessment of score {score}.",
        }

    prompt = f"""Analyze credit worthiness:
    Credit Score: {app.credit_score}
    Loan Amount: ${app.amount:,.2f}
    
    Respond with JSON: {{"approved": true/false, "risk_level": "low/medium/high", "notes": "string"}}"""

    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    # content can be None (e.g. a filtered reply); an empty verdict means
    # "not approved" for the caller.
    return json.loads(response.choices[0].message.content or "{}")

def _coerce_risk_score(value, default: float = 0.5) -> float:
    """Return ``value`` as a float clamped to [0, 1], or ``default`` if unusable."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        return default
    if score != score:  # NaN is the only float not equal to itself
        return default
    return min(1.0, max(0.0, score))


def _underwriting_rule(credit_score: Optional[int], amount: float) -> str:
    """Apply the underwriting criteria deterministically (used in DEMO MODE)."""
    if credit_score is None or credit_score < 650:
        return "reject"
    if credit_score >= 720 and amount <= 100_000:
        return "approve"
    return "human_review"


def agent_underwriting(app: LoanApplication) -> dict:
    """Make the underwriting decision for ``app``.

    Returns a dict with ``decision`` ("approve"/"human_review"/"reject" as
    requested), ``risk_score`` and ``reasoning``. Side effect: sets
    ``app.risk_score`` to the reported score coerced to a float in [0, 1]
    (0.5 when missing or unparseable); the returned dict carries the same
    coerced value. In DEMO MODE (``client is None``), the criteria are
    applied locally instead of calling the model.
    """
    if client is None:
        decision = _underwriting_rule(app.credit_score, app.amount)
        score = app.credit_score if app.credit_score is not None else 300
        result = {
            "decision": decision,
            # Illustrative only: linear map of a 300-850 score onto 1.0-0.0.
            "risk_score": round((850 - score) / 550, 2),
            "reasoning": f"[DEMO] Rule-based decision: {decision}.",
        }
    else:
        # The rules are listed reject-first with explicit precedence: as
        # originally written, Score < 650 with Amount > $100K matched both
        # "reject" and "human review".
        # NOTE(review): the model's decision is not cross-checked against
        # _underwriting_rule; consider enforcing the hard rules in code.
        prompt = f"""Make underwriting decision:
    Applicant: {app.applicant_name}
    Amount: ${app.amount:,.2f}
    Credit Score: {app.credit_score}
    
    Decision criteria (apply in this order; the first matching rule wins):
    - Auto-reject: Score < 650
    - Auto-approve: Score >= 720, Amount <= $100K
    - Human review: Score 650-719 OR Amount > $100K
    
    Respond with JSON: {{"decision": "approve/human_review/reject", "risk_score": 0.0-1.0, "reasoning": "string"}}"""

        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        result = json.loads(response.choices[0].message.content or "{}")

    # The model may return a string or an out-of-range number; store a float.
    app.risk_score = _coerce_risk_score(result.get("risk_score", 0.5))
    result["risk_score"] = app.risk_score
    return result

print("‚úÖ Agent functions defined")

In [None]:
def _is_affirmative(value) -> bool:
    """Interpret an LLM-supplied yes/no flag.

    ``bool("false")`` is True, so strings are parsed rather than truth-tested;
    only explicit affirmative strings count. Other types use ``bool()``.
    """
    if isinstance(value, str):
        return value.strip().lower() in {"true", "yes", "y", "1"}
    return bool(value)


def run_loan_workflow(app: LoanApplication) -> LoanApplication:
    """Drive ``app`` through document review, credit check and underwriting.

    Each stage calls its agent, then moves the application forward via
    ``LoanWorkflowOrchestrator``. The run stops early in PENDING_INFO when
    documents are incomplete, or in REJECTED when the credit check fails.
    Returns the same ``app`` object, mutated in place.
    """
    orchestrator = LoanWorkflowOrchestrator()

    print(f"\nüè¶ Processing Loan Application: {app.id}")
    print(f"   Applicant: {app.applicant_name}")
    print(f"   Amount: ${app.amount:,.2f}")
    print("\nüìã Workflow Execution:")

    # Stage 1: Document Review
    orchestrator.transition(app, LoanState.DOCUMENT_REVIEW)
    doc_result = agent_document_review(app)

    if not _is_affirmative(doc_result.get("complete", False)):
        orchestrator.transition(app, LoanState.PENDING_INFO,
                                f"Missing: {doc_result.get('missing', [])}")
        return app

    # Stage 2: Credit Check
    orchestrator.transition(app, LoanState.CREDIT_CHECK, doc_result.get("notes"))
    credit_result = agent_credit_check(app)

    if not _is_affirmative(credit_result.get("approved", False)):
        orchestrator.transition(app, LoanState.REJECTED,
                                f"Credit check failed: {credit_result.get('notes')}")
        return app

    # Stage 3: Underwriting
    orchestrator.transition(app, LoanState.UNDERWRITING,
                            f"Credit score: {app.credit_score}")
    uw_result = agent_underwriting(app)

    # Normalise case/separators ("Approve", "human review") before matching;
    # anything unrecognised falls through to REJECTED (fail closed).
    decision = (str(uw_result.get("decision", "reject"))
                .strip().lower().replace("-", "_").replace(" ", "_"))
    target = {
        "approve": LoanState.APPROVED,
        "human_review": LoanState.HUMAN_REVIEW,
    }.get(decision, LoanState.REJECTED)
    orchestrator.transition(app, target, uw_result.get("reasoning"))

    return app

# Test the workflow
test_app = LoanApplication(
    id="LOAN-2024-001",
    applicant_name="John Smith",
    amount=75000.00
)

result = run_loan_workflow(test_app)
print(f"\n‚úÖ Final State: {result.state.value}")
print(f"   Credit Score: {result.credit_score}")
print(f"   Risk Score: {result.risk_score}")

## 2. Azure Durable Functions Pattern

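Simulate the Durable Functions orchestration pattern locally (registered activities, retries with exponential backoff, fan-out/fan-in) using `asyncio`; no Azure resources are required.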

In [None]:
# Simulated Durable Functions pattern
# In production, this would use azure-functions-durable

class DurableOrchestrator:
    """Simulates Azure Durable Functions orchestration in-process with asyncio.

    Activities are callables (plain or ``async def``) registered by name and
    invoked with a single ``input_data`` argument. Every invocation is
    appended to ``execution_log`` as
    ``{"activity": name, "input": input_data, "output": result}``
    (``output`` is added only on success). Plain functions run in the event
    loop's default thread pool, so ``fan_out_fan_in`` executes them
    concurrently; such activities must therefore be safe to run in threads.
    """

    def __init__(self):
        self.activities = {}
        self.execution_log = []

    def register_activity(self, name: str, func):
        """Register ``func`` under ``name`` (re-registering replaces it)."""
        self.activities[name] = func

    async def call_activity(self, name: str, input_data: dict) -> dict:
        """Run one activity once and record its input/output.

        Raises:
            ValueError: if ``name`` has not been registered.
            Any exception raised by the activity itself is propagated.
        """
        if name not in self.activities:
            raise ValueError(f"Activity '{name}' not registered")

        func = self.activities[name]
        # Keep a handle on *this* call's log entry: during fan-out several
        # calls are in flight, so execution_log[-1] may belong to another one.
        entry = {"activity": name, "input": input_data}
        self.execution_log.append(entry)

        if inspect.iscoroutinefunction(func):
            result = await func(input_data)
        else:
            # Calling a blocking function (e.g. an LLM request) directly would
            # stall the event loop and serialize every "parallel" activity.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, func, input_data)

        entry["output"] = result
        return result

    async def call_activity_with_retry(self, name: str, input_data: dict,
                                       max_retries: int = 3,
                                       first_retry_delay: float = 1) -> dict:
        """Call an activity, retrying failures with exponential backoff.

        ``max_retries`` is the total number of attempts; at least one attempt
        is always made. The n-th retry waits
        ``first_retry_delay * 2 ** (n - 1)`` seconds. The last failure is
        re-raised.

        Raises:
            ValueError: immediately if ``name`` is not registered, since
                retrying cannot fix a missing registration.
        """
        if name not in self.activities:
            raise ValueError(f"Activity '{name}' not registered")

        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                return await self.call_activity(name, input_data)
            except Exception:
                if attempt == attempts - 1:
                    raise
                wait_time = first_retry_delay * 2 ** attempt
                print(f"   ‚ö†Ô∏è Retry {attempt + 1}/{attempts} after {wait_time}s")
                await asyncio.sleep(wait_time)

    async def fan_out_fan_in(self, activity_name: str, inputs: list) -> list:
        """Run ``activity_name`` once per input concurrently.

        Returns the results in the same order as ``inputs``; an empty
        ``inputs`` yields ``[]``.
        """
        tasks = [self.call_activity(activity_name, inp) for inp in inputs]
        return await asyncio.gather(*tasks)

# Define activity functions
def activity_extract_data(input_data: dict) -> dict:
    """Stub extractor: ignores ``input_data`` and reports a fixed field list."""
    field_names = ["name", "income", "employment"]
    return dict(extracted=True, fields=field_names)

def activity_validate_data(input_data: dict) -> dict:
    """Stub validator: always reports the input as valid with no errors."""
    error_list = []
    return dict(valid=True, errors=error_list)

def activity_call_agent(input_data: dict) -> dict:
    """Ask the LLM to analyse ``input_data``.

    Returns ``{"analysis": str}``, truncated to 100 characters. In DEMO MODE
    (``client is None``), a canned analysis naming the task is returned
    instead of calling the model.
    """
    if client is None:
        task = input_data.get("task", "analysis") if isinstance(input_data, dict) else "analysis"
        return {"analysis": f"[DEMO] Simulated {task} result."[:100]}

    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=[{"role": "user", "content": f"Analyze: {input_data}"}],
        max_tokens=100
    )
    # content may be None (e.g. a filtered reply); slicing None would raise.
    content = response.choices[0].message.content or ""
    return {"analysis": content[:100]}

print("‚úÖ Durable orchestrator defined")

In [None]:
async def loan_orchestration_workflow(application: dict):
    """Main orchestration workflow (simulates Durable Functions)"""
    orchestrator = DurableOrchestrator()
    
    # Register activities
    orchestrator.register_activity("extract_data", activity_extract_data)
    orchestrator.register_activity("validate_data", activity_validate_data)
    orchestrator.register_activity("call_agent", activity_call_agent)
    
    print("üîÑ Starting Durable Orchestration...")
    
    # Step 1: Extract data
    print("   üìÑ Extracting data...")
    extracted = await orchestrator.call_activity("extract_data", application)
    
    # Step 2: Validate data
    print("   ‚úÖ Validating data...")
    validated = await orchestrator.call_activity("validate_data", extracted)
    
    # Step 3: Fan-out to multiple agents (parallel)
    print("   üîÄ Fan-out: Calling multiple agents in parallel...")
    agent_inputs = [
        {"task": "credit_analysis", "data": extracted},
        {"task": "income_verification", "data": extracted},
        {"task": "employment_check", "data": extracted}
    ]
    agent_results = await orchestrator.fan_out_fan_in("call_agent", agent_inputs)
    
    # Step 4: Fan-in and synthesize
    print("   üîÄ Fan-in: Synthesizing results...")
    
    return {
        "status": "completed",
        "extracted": extracted,
        "validated": validated,
        "agent_results": agent_results,
        "execution_log": orchestrator.execution_log
    }

# Run the orchestration
test_application = {
    "id": "APP-001",
    "applicant": "Jane Doe",
    "amount": 50000
}

result = await loan_orchestration_workflow(test_application)
print(f"\n‚úÖ Orchestration complete!")
print(f"   Activities executed: {len(result['execution_log'])}")

## 3. Adaptive RAG with Multi-Agent

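Dynamically route queries to specialized agents based on their classified intent. Retrieval is omitted here for brevity; in a full adaptive-RAG system each agent would ground its answer in its own document index.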

In [None]:
class AdaptiveRAGRouter:
    """Routes queries to specialized agents based on classified intent.

    Each "agent" is a system prompt keyed by intent; no retrieval step is
    performed here. Intent comes from the LLM, or from a keyword heuristic
    in DEMO MODE (``client is None``).
    """

    # DEMO MODE only: word-prefix stems per intent, checked in this order.
    _DEMO_KEYWORDS = {
        "compliance": ("kyc", "aml", "regulat", "compliance"),
        "product": ("rate", "mortgage", "product", "interest"),
        "policy": ("limit", "policy", "procedure", "fee"),
    }

    def __init__(self):
        self.agents = {
            "policy": "You are a policy expert. Answer questions about banking policies and procedures.",
            "product": "You are a product specialist. Answer questions about banking products and rates.",
            "compliance": "You are a compliance officer. Answer questions about regulations and requirements.",
            "general": "You are a helpful banking assistant. Answer general banking questions."
        }

    def parse_intent(self, raw: Optional[str]) -> str:
        """Map a raw classifier reply onto a known agent key.

        Tolerates case, punctuation and markup (``"Policy."``,
        ``"**product**"``, ``"Category: compliance"``). Anything unrecognised,
        including None, becomes ``"general"``.
        """
        cleaned = "".join(ch if ch.isalpha() else " " for ch in (raw or "").lower())
        return next((word for word in cleaned.split() if word in self.agents), "general")

    def _demo_intent(self, query: str) -> str:
        """Keyword-based intent guess used when no LLM client is configured."""
        words = "".join(ch if ch.isalnum() else " " for ch in query.lower()).split()
        for intent, stems in self._DEMO_KEYWORDS.items():
            if any(word.startswith(stem) for word in words for stem in stems):
                return intent
        return "general"

    def classify_intent(self, query: str) -> str:
        """Classify query intent to route to appropriate agent"""
        if client is None:
            return self._demo_intent(query)

        classification_prompt = f"""Classify this banking query into one category:
- policy: Questions about bank policies, procedures, limits
- product: Questions about products, rates, features
- compliance: Questions about regulations, KYC, AML
- general: Other general questions

Query: {query}

Respond with just the category name."""

        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": classification_prompt}],
            max_tokens=10
        )

        # Models often add punctuation or markup; normalise before matching.
        return self.parse_intent(response.choices[0].message.content)

    def route_and_answer(self, query: str) -> dict:
        """Route ``query`` to the matching agent and return its answer.

        Returns ``{"query", "intent", "agent", "answer"}``; ``answer`` is
        always a string.
        """
        # Step 1: Classify intent
        intent = self.classify_intent(query)
        print(f"   üéØ Intent: {intent}")

        # Step 2: Route to specialized agent
        agent_prompt = self.agents[intent]

        if client is None:
            answer = f"[DEMO] The {intent} agent would answer: {query}"
        else:
            response = client.chat.completions.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": agent_prompt},
                    {"role": "user", "content": query}
                ]
            )
            # content can be None; callers slice the answer, so keep it a str.
            answer = response.choices[0].message.content or ""

        return {
            "query": query,
            "intent": intent,
            "agent": intent,
            "answer": answer
        }

# Test adaptive RAG
router = AdaptiveRAGRouter()

# Sample queries; they appear to target, in order, the policy, product,
# compliance and general categories listed in the classification prompt.
test_queries = [
    "What is the daily wire transfer limit?",
    "What are your current mortgage rates?",
    "What documents do I need for KYC verification?",
    "How do I reset my online banking password?"
]

print("=== Adaptive RAG Routing ===")
for query in test_queries:
    print(f"\n‚ùì Query: {query}")
    # Note: this rebinds the notebook-level `result` used by earlier cells.
    result = router.route_and_answer(query)
    # Only the first 150 characters of each answer are shown.
    print(f"   üí¨ Answer: {result['answer'][:150]}...")

## Summary

**Workflow Orchestration Patterns:**

| Pattern | Azure Service | Use Case |
|---------|--------------|----------|
| State Machine | Durable Functions | Loan processing |
| Fan-out/Fan-in | Durable Functions | Parallel agent calls |
| Event-driven | Logic Apps + Event Grid | Document processing |
| Adaptive RAG | Custom routing | Intent-based agents |

**Key Takeaways:**
- Use state machines for predictable, auditable workflows
- Durable Functions checkpoint orchestration progress automatically; retries are opt-in per activity via a retry policy (`call_activity_with_retry`)
- Fan-out/fan-in enables parallel agent execution
- Adaptive RAG routes to specialized agents for better accuracy