# Building a Multi-Agent Customer Support Triage System with Orkes Conductor

This notebook demonstrates the technical implementation of a multi-agent system using Orkes Conductor to handle customer support ticket triage.

## Architecture Overview

Our system consists of three specialized AI agents:
1. **Classifier Agent**: Analyzes ticket content to determine category, sentiment, and urgency
2. **Knowledge Agent**: Searches internal documentation for relevant solutions
3. **Escalation Agent**: Routes unresolved or high-priority tickets to human agents

We'll also implement a custom worker task to demonstrate how external services can integrate with the workflow.

## Step 1: Install Required Dependencies

First, let's install the Orkes Conductor Python SDK and other dependencies we'll need.

In [None]:
!pip install conductor-python openai requests python-dotenv

## Step 2: Import Libraries and Configure Conductor Client

In [None]:
import os
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
from conductor.client.http.models import Task, WorkflowDef, TaskDef
from conductor.client.worker.worker_task import WorkerTask
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete
from conductor.client.workflow.task.simple_task import SimpleTask
from conductor.client.workflow.task.switch_task import SwitchTask
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
import json

# Configure your Orkes Conductor instance
# Get these from https://orkes.io/ after signing up for free
configuration = Configuration(
    server_api_url="https://play.orkes.io/api",
    key_id="YOUR_KEY_ID",
    key_secret="YOUR_KEY_SECRET"
)

clients = OrkesClients(configuration)
workflow_executor = WorkflowExecutor(configuration)
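
Hard-coding credentials in a notebook makes them easy to leak. The sketch below reads the same three values from environment variables instead. The variable names (`CONDUCTOR_SERVER_URL`, `CONDUCTOR_AUTH_KEY`, `CONDUCTOR_AUTH_SECRET`) and the helper `load_conductor_settings` are assumptions made for this example, not something the notebook defines.

```python
import os


def load_conductor_settings(env=None):
    """Read Conductor connection settings from environment variables.

    The variable names are assumptions chosen for this sketch.
    """
    env = os.environ if env is None else env
    settings = {
        "server_api_url": env.get("CONDUCTOR_SERVER_URL", "https://play.orkes.io/api"),
        "key_id": env.get("CONDUCTOR_AUTH_KEY"),
        "key_secret": env.get("CONDUCTOR_AUTH_SECRET"),
    }
    # Fail early with a clear message instead of sending empty credentials.
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise RuntimeError(f"Missing Conductor settings: {', '.join(missing)}")
    return settings
```

The returned values can replace the literal strings in the `Configuration(...)` call above. Since `python-dotenv` is already installed in Step 1, calling its `load_dotenv()` first would populate `os.environ` from a local `.env` file.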

## Step 3: Define the Classifier Agent

The Classifier Agent uses an LLM to analyze incoming tickets and extract key information.

In [None]:
def create_classifier_agent():
    """
    Creates the classifier agent task that analyzes ticket content
    Returns: category, sentiment, urgency
    """
    classifier_prompt = """
    You are a customer support ticket classifier. Analyze the following support ticket and provide:
    1. Category (billing, technical, account, general)
    2. Sentiment (positive, neutral, negative, angry)
    3. Urgency (low, medium, high, critical)
    
    Ticket: ${workflow.input.ticket_content}
    
    Respond in JSON format:
    {
      "category": "<category>",
      "sentiment": "<sentiment>",
      "urgency": "<urgency>",
      "reasoning": "<brief explanation>"
    }
    """
    
    classifier_task = LlmChatComplete(
        task_ref_name="classify_ticket",
        llm_provider="openai",
        model="gpt-4",
        instructions=classifier_prompt,
        messages=[{
            "role": "user",
            "content": "${workflow.input.ticket_content}"
        }]
    )
    
    return classifier_task
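
LLM replies do not always match the requested schema, so it can help to validate the classifier's JSON before later tasks rely on it. The following is a minimal sketch; `parse_classification` and `ALLOWED` are hypothetical names, and the allowed labels are copied from the prompt above.

```python
import json

# Allowed labels, taken from the classifier prompt.
ALLOWED = {
    "category": {"billing", "technical", "account", "general"},
    "sentiment": {"positive", "neutral", "negative", "angry"},
    "urgency": {"low", "medium", "high", "critical"},
}


def parse_classification(raw):
    """Parse the classifier's JSON reply and validate each label."""
    data = json.loads(raw) if isinstance(raw, str) else dict(raw)
    result = {}
    for field, allowed in ALLOWED.items():
        # Normalise case and whitespace so "Billing" and "billing" match.
        value = str(data.get(field, "")).strip().lower()
        if value not in allowed:
            raise ValueError(f"Unexpected {field}: {value!r}")
        result[field] = value
    result["reasoning"] = str(data.get("reasoning", ""))
    return result
```

A reply that fails validation raises `ValueError`, which a caller could treat as a signal to retry the classification or to escalate the ticket directly.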

## Step 4: Define the Knowledge Agent

The Knowledge Agent searches internal documentation and past resolutions to find potential solutions.

In [None]:
def create_knowledge_agent():
    """
    Creates the knowledge search agent that looks for solutions
    in documentation, FAQs, and past tickets
    """
    knowledge_prompt = """
    You are a knowledge base expert. Based on the ticket classification, 
    search for relevant solutions and provide recommendations.
    
    Ticket Content: ${workflow.input.ticket_content}
    Category: ${classify_ticket.output.result.category}
    Urgency: ${classify_ticket.output.result.urgency}
    
    Provide:
    1. Relevant KB articles or documentation
    2. Suggested resolution steps
    3. Confidence level (0-100) that this resolves the issue
    
    Respond in JSON format:
    {
      "kb_articles": ["article_id_1", "article_id_2"],
      "resolution_steps": ["step1", "step2"],
      "confidence": <0-100>,
      "suggested_response": "<draft response to customer>"
    }
    """
    
    knowledge_task = LlmChatComplete(
        task_ref_name="search_knowledge",
        llm_provider="openai",
        model="gpt-4",
        instructions=knowledge_prompt,
        messages=[{
            "role": "user",
            "content": "Find solution for: ${workflow.input.ticket_content}"
        }]
    )
    
    return knowledge_task

## Step 5: Create Custom Worker Task for Notification

This worker task demonstrates how to implement custom business logic that runs outside of Conductor but is orchestrated by it. This worker will handle sending notifications to Slack or email when tickets are escalated.

In [None]:
from conductor.client.worker.worker_task import WorkerTask

@WorkerTask(task_definition_name='send_escalation_notification', domain='support', poll_interval=0.5)
def send_notification(task_input):
    """
    Custom worker that sends notifications when tickets are escalated.
    This worker polls Conductor for tasks and executes them.
    
    Args:
        task_input: Dictionary containing ticket details and escalation info
        
    Returns:
        Dictionary with notification status
    """
    ticket_id = task_input.get('ticket_id')
    urgency = task_input.get('urgency')
    category = task_input.get('category')
    assigned_to = task_input.get('assigned_to', 'support-team')
    
    print(f"🚨 Escalating ticket {ticket_id}")
    print(f"   Category: {category}")
    print(f"   Urgency: {urgency}")
    print(f"   Assigned to: {assigned_to}")
    
    # In production, you would send actual notifications here:
    # - Slack webhook
    # - Email via SendGrid/SES
    # - PagerDuty for critical issues
    # - Update ticketing system (Zendesk, Jira, etc.)
    
    notification_channels = []
    
    if urgency in ['critical', 'high']:
        notification_channels.extend(['slack', 'email', 'pagerduty'])
    else:
        notification_channels.append('email')
    
    # Simulate sending notifications
    results = {
        'notification_sent': True,
        'channels': notification_channels,
        'timestamp': '2025-01-15T10:30:00Z',  # placeholder; use the current UTC time in production
        'assigned_team': assigned_to
    }
    
    return results
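
As one possible way to replace the simulated notification, the sketch below builds a message for a Slack incoming webhook and posts it with the standard library. The helper names and the message wording are assumptions for illustration; the webhook URL would come from your own Slack configuration.

```python
import json
import urllib.request


def build_slack_payload(ticket_id, category, urgency, assigned_to):
    """Build the JSON body for a Slack incoming webhook."""
    text = (
        f"Ticket {ticket_id} escalated to {assigned_to} "
        f"(category: {category}, urgency: {urgency})"
    )
    return {"text": text}


def post_to_slack(webhook_url, payload, timeout=5):
    """POST the payload to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Keeping payload construction separate from the network call makes the message format easy to test without contacting Slack.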

## Step 6: Define the Escalation Logic

Based on the knowledge agent's confidence and the ticket urgency, we'll decide whether to auto-resolve or escalate.

In [None]:
def create_escalation_decision():
    """
    Creates the escalation-evaluation task and the notification task.
    Returns: (escalation_eval, notification_task)
    """
    # This is a simple task that evaluates escalation criteria
    escalation_eval = SimpleTask(
        task_def_name="evaluate_escalation",
        task_reference_name="eval_escalation"
    )
    
    # Worker task for sending notifications
    notification_task = SimpleTask(
        task_def_name="send_escalation_notification",
        task_reference_name="notify_team",
        inputs={
            'ticket_id': '${workflow.input.ticket_id}',
            'urgency': '${classify_ticket.output.result.urgency}',
            'category': '${classify_ticket.output.result.category}',
            'assigned_to': '${eval_escalation.output.assigned_team}'
        }
    )
    
    return escalation_eval, notification_task
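
The notebook does not show the logic behind `evaluate_escalation`. A minimal sketch of what such a worker might compute follows, assuming a confidence cut-off of 70 and a hypothetical category-to-team mapping; neither value comes from the notebook. The `assigned_team` key matches the `${eval_escalation.output.assigned_team}` reference used above.

```python
ESCALATION_URGENCIES = {"high", "critical"}
CONFIDENCE_THRESHOLD = 70  # assumed cut-off, tune for your data

# Hypothetical routing table; replace with your own teams.
TEAM_BY_CATEGORY = {
    "billing": "billing-team",
    "technical": "engineering-oncall",
    "account": "account-team",
}


def evaluate_escalation(confidence, urgency, category):
    """Decide whether a ticket should be escalated to a human."""
    try:
        confidence = float(confidence)
    except (TypeError, ValueError):
        confidence = 0.0  # treat an unparseable confidence as no confidence
    urgent = str(urgency).lower() in ESCALATION_URGENCIES
    escalate = urgent or confidence < CONFIDENCE_THRESHOLD
    team = TEAM_BY_CATEGORY.get(str(category).lower(), "support-team")
    return {"escalate": escalate, "assigned_team": team}
```

Urgent tickets escalate regardless of confidence, so a confident but wrong suggested answer is never sent automatically for a critical issue.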

## Step 7: Build the Complete Workflow

Now we'll assemble all agents into a cohesive workflow that orchestrates the entire triage process.

In [None]:
def create_support_triage_workflow():
    """
    Creates the complete multi-agent support triage workflow
    """
    workflow = ConductorWorkflow(
        name="customer_support_triage",
        version=1,
        executor=workflow_executor
    )
    
    # Add all tasks in sequence
    classifier = create_classifier_agent()
    knowledge = create_knowledge_agent()
    escalation_eval, notification = create_escalation_decision()
    
    # Build the workflow chain
    workflow >> classifier >> knowledge >> escalation_eval
    
    # Append the notification task. As written it runs for every ticket;
    # wrapping it in a SwitchTask would make it conditional on escalation.
    workflow >> notification
    
    return workflow

# Build the workflow definition (this call does not register it with the server)
support_workflow = create_support_triage_workflow()
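
As assembled, the `notify_team` task runs for every ticket. One way to make it conditional is a SWITCH task. The sketch below builds the JSON form of such a task as a plain Python dict so the structure is visible without the SDK. It assumes `eval_escalation` outputs a boolean `escalate` field, and the task name `escalation_switch` is hypothetical.

```python
import json


def build_escalation_switch(notify_task):
    """Return a SWITCH task definition that runs notify_task only on escalation."""
    return {
        "name": "escalation_switch",
        "taskReferenceName": "escalation_switch",
        "type": "SWITCH",
        # "value-param" matches the named input parameter against decisionCases keys.
        "evaluatorType": "value-param",
        "expression": "escalate",
        "inputParameters": {"escalate": "${eval_escalation.output.escalate}"},
        "decisionCases": {"true": [notify_task]},
        "defaultCase": [],  # nothing to do when the ticket is auto-resolved
    }


notify_task = {
    "name": "send_escalation_notification",
    "taskReferenceName": "notify_team",
    "type": "SIMPLE",
    "inputParameters": {
        "ticket_id": "${workflow.input.ticket_id}",
        "assigned_to": "${eval_escalation.output.assigned_team}",
    },
}

switch_definition = build_escalation_switch(notify_task)
print(json.dumps(switch_definition, indent=2))
```

The same structure could be expressed with the `SwitchTask` class imported in Step 2; the dict form is used here only to keep the example free of SDK calls.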

## Step 8: Register the Worker

The worker needs to be running to poll for tasks. In production, this would be a separate service.

In [None]:
from conductor.client.automator.task_handler import TaskHandler

# Create task handler and register the worker
task_handler = TaskHandler(
    workers=[send_notification],
    configuration=configuration,
    scan_for_annotated_workers=False
)

# Start polling for tasks (in production, this runs as a daemon)
# task_handler.start_processes()

print("✅ Worker registered and ready to process tasks")
print("   Task Definition: send_escalation_notification")
print("   Domain: support")
print("   Poll Interval: 0.5s")

## Step 9: Test the Workflow

Let's test our multi-agent system with sample support tickets.

In [None]:
# Sample test tickets
test_tickets = [
    {
        "ticket_id": "TKT-001",
        "ticket_content": "I can't log into my account. I've tried resetting my password but I'm not receiving the email."
    },
    {
        "ticket_id": "TKT-002",
        "ticket_content": "URGENT: Our production system is down and we're losing money every minute. This is critical!"
    },
    {
        "ticket_id": "TKT-003",
        "ticket_content": "Hi, I was charged twice for my subscription this month. Can you help?"
    }
]

# Execute workflows for each ticket
for ticket in test_tickets:
    print(f"\n{'='*60}")
    print(f"Processing: {ticket['ticket_id']}")
    print(f"{'='*60}")
    
    # Execute the workflow
    workflow_id = support_workflow.execute(
        workflow_input=ticket
    )
    
    print(f"\n✅ Workflow started with ID: {workflow_id}")
    print(f"   View in Conductor UI: https://play.orkes.io/execution/{workflow_id}")
    
    # In production, you would monitor workflow status:
    # workflow_status = workflow_executor.get_workflow(workflow_id)
    # print(f"   Status: {workflow_status.status}")
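
The commented-out status check can be turned into a simple polling loop. The sketch below is independent of the SDK: `fetch_status` is a hypothetical callable you supply that returns the workflow's status string, and the set of terminal statuses is an assumption based on Conductor's usual workflow states.

```python
import time

# Statuses after which a workflow will not make further progress (assumed set).
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED"}


def wait_for_workflow(fetch_status, workflow_id, timeout_s=60.0,
                      interval_s=1.0, sleep=time.sleep):
    """Poll fetch_status(workflow_id) until a terminal status or timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status(workflow_id)
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Workflow {workflow_id} still {status} after {timeout_s}s"
            )
        sleep(interval_s)
```

In this notebook, `fetch_status` could wrap the call shown in the comment above, for example `lambda wid: workflow_executor.get_workflow(wid).status`. The injectable `sleep` argument exists so the loop can be tested without waiting.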

## Step 10: Monitor and Debug

Orkes Conductor provides powerful monitoring capabilities. Here's how to check workflow execution.

In [None]:
def monitor_workflow(workflow_id):
    """
    Retrieve and display workflow execution details
    """
    workflow_client = clients.get_workflow_client()
    execution = workflow_client.get_workflow(workflow_id, include_tasks=True)
    
    print(f"\nWorkflow Status: {execution.status}")
    print(f"\nTask Execution Details:")
    print("-" * 60)
    
    for task in execution.tasks:
        print(f"\n📋 {task.task_type}: {task.reference_task_name}")
        print(f"   Status: {task.status}")
        print(f"   Start Time: {task.start_time}")
        print(f"   End Time: {task.end_time}")
        
        if task.output_data:
            print(f"   Output: {json.dumps(task.output_data, indent=2)[:200]}...")
    
    return execution

# Example usage:
# execution_details = monitor_workflow(workflow_id)
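
Beyond printing each task, it is often useful to see where the time went. The sketch below computes per-task durations and picks the slowest task. It assumes `start_time` and `end_time` are epoch milliseconds and that a value of 0 or `None` means "not set"; the helper names are hypothetical.

```python
def summarize_tasks(tasks):
    """Return one row per task with its reference name, status, and duration."""
    rows = []
    for task in tasks:
        start, end = task.start_time, task.end_time
        # Only compute a duration when both timestamps are set and ordered.
        duration_ms = end - start if start and end and end >= start else None
        rows.append({
            "ref": task.reference_task_name,
            "status": task.status,
            "duration_ms": duration_ms,
        })
    return rows


def slowest_task(tasks):
    """Return the row with the longest duration, or None if nothing finished."""
    finished = [row for row in summarize_tasks(tasks) if row["duration_ms"] is not None]
    return max(finished, key=lambda row: row["duration_ms"], default=None)
```

Passing `execution.tasks` from `monitor_workflow` to `slowest_task` would point at the step to look at first when a ticket takes too long to triage.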

## Summary

This implementation demonstrates:

1. **Multi-Agent Orchestration**: Three specialized AI agents (Classifier, Knowledge, Escalation) working together
2. **Custom Worker Integration**: A worker task that polls Conductor and executes external business logic
3. **Workflow Composition**: Using Conductor's SDK to chain tasks into a workflow, with room to add conditional routing
4. **Monitoring & Observability**: Built-in capabilities to track workflow execution

### Key Takeaways:

- **Separation of Concerns**: Each agent has a single, well-defined responsibility
- **Scalability**: Workers can be scaled independently based on load
- **Reliability**: Conductor handles retries, error handling, and task persistence
- **Flexibility**: Easy to add new agents or modify the workflow without changing agent code

### Next Steps:

1. Integrate with your actual LLM provider and knowledge base
2. Add error handling and retry policies
3. Implement real notification channels (Slack, Email, PagerDuty)
4. Set up monitoring and alerting
5. Deploy workers as containerized services
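
For the retry policies mentioned in step 2, retries and timeouts are configured on the task definition. The sketch below builds such a definition as a plain dict using Conductor's JSON field names; the helper `build_task_def` and all default values are assumptions for illustration.

```python
def build_task_def(name, retry_count=3, retry_delay_s=5,
                   timeout_s=60, response_timeout_s=30):
    """Build a task definition dict with retry and timeout settings."""
    # Conductor expects the response timeout to be shorter than the overall timeout.
    if response_timeout_s >= timeout_s:
        raise ValueError("response_timeout_s must be smaller than timeout_s")
    return {
        "name": name,
        "retryCount": retry_count,
        "retryLogic": "EXPONENTIAL_BACKOFF",
        "retryDelaySeconds": retry_delay_s,
        "timeoutSeconds": timeout_s,
        "responseTimeoutSeconds": response_timeout_s,
        "timeoutPolicy": "RETRY",
    }


notification_task_def = build_task_def("send_escalation_notification")
```

The resulting dict could be registered through the Conductor UI or the metadata API. Exponential backoff is chosen here because notification providers tend to recover from short outages, so spacing out retries avoids hammering them.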
