# Multi-Agent System for Customer Support Routing

A production-ready multi-agent orchestration system that intelligently routes customer inquiries to specialized RAG agents.

## System Architecture

1. **Orchestrator**: Classifies user intent (HR, Tech, Finance)
2. **Specialized RAG Agents**: Domain-specific agents with separate knowledge bases
3. **Langfuse Integration**: Full observability and tracing
4. **Evaluator Agent**: Automated quality scoring (Bonus)

## 1. Setup & Configuration

In [19]:
import os
import json
from pathlib import Path
from typing import Dict, List
from dotenv import load_dotenv

# LangChain 1.0+ imports
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import PromptTemplate
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Langfuse for observability
from langfuse import Langfuse, observe

# Load environment variables from .env file
load_dotenv()

# Verify .env file exists.
# Only the current working directory is checked here, so this is a hint for
# the user rather than a hard requirement: missing keys are caught later.
env_file = Path(".env")
if not env_file.exists():
    print("âš  WARNING: .env file not found!")
    print("   Please create a .env file with your API keys.")
    print("   See README.md for instructions.")
else:
    print("âœ“ .env file found")

# Initialize Langfuse. Keys come from the environment; the host falls back to
# the Langfuse cloud endpoint when LANGFUSE_HOST is not set.
langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
)

print("âœ“ Setup complete")
try:
    print(f"âœ“ Langfuse: {'Connected' if langfuse.auth_check() else 'Not connected'}")
except Exception:
    # Tracing is optional, so a failed auth check is reported and ignored.
    # `Exception` (not a bare `except`) keeps Ctrl-C / SystemExit working.
    print("âš  Langfuse connection issue (system will still work)")

âœ“ .env file found
âœ“ Setup complete
âœ“ Langfuse: Connected


## 2. Initialize Models & Configuration

In [20]:
# Filesystem layout: everything is resolved relative to the working directory.
BASE_DIR = Path(".")
DATA_DIR = BASE_DIR.joinpath("data")
CHROMA_DIR = BASE_DIR.joinpath("chroma_db")

# The API key must already be in the environment (the setup cell loads .env),
# so fail fast with instructions when it is missing or empty.
api_key = os.getenv("OPENAI_API_KEY")
if api_key is None or api_key == "":
    raise ValueError(
        "\n".join(
            [
                "ERROR: OPENAI_API_KEY not found!",
                "",
                "Please:",
                "1. Create a .env file in the project root",
                "2. Add: OPENAI_API_KEY=your-openrouter-api-key-here",
                "3. Make sure you ran Cell 2 (Setup & Configuration) first",
                "4. Get your API key from: https://openrouter.ai",
            ]
        )
    )

# Only the length is printed so the key itself never ends up in the output.
print(f"âœ“ API key loaded (length: {len(api_key)} characters)")

# Chat model, reached through the OpenRouter endpoint.
# temperature=0 is used for both classification and answering.
llm = ChatOpenAI(
    api_key=api_key,
    base_url="https://openrouter.ai/api/v1",
    model="openai/gpt-4o-mini",
    temperature=0,
)

# Embedding model, sent to the same OpenRouter endpoint with the same key.
embeddings = OpenAIEmbeddings(
    openai_api_key=api_key,
    openai_api_base="https://openrouter.ai/api/v1",
    model="text-embedding-3-small",
)

# Chunking used for every domain: 1000-character chunks overlapping by 200.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_overlap=200,
    chunk_size=1000,
)

print("âœ“ Models initialized")

âœ“ API key loaded (length: 73 characters)
âœ“ Models initialized


## 3. Load Documents & Create Vector Stores

In [21]:
@observe()
def load_domain_documents(domain: str) -> List[Document]:
    """Read every ``.txt`` file found under ``DATA_DIR/<domain>_docs``.

    Args:
        domain: Domain name used to build the directory name
            (``"hr"`` resolves to ``DATA_DIR / "hr_docs"``).

    Returns:
        The documents loaded recursively from that directory, or an empty
        list (after printing a warning) when the directory does not exist.
    """
    source_dir = DATA_DIR / f"{domain}_docs"

    if source_dir.exists():
        # "**/*.txt" makes the search recursive; each file is read by TextLoader.
        found = DirectoryLoader(
            str(source_dir),
            glob="**/*.txt",
            loader_cls=TextLoader,
        ).load()
        print(f"âœ“ Loaded {len(found)} documents from {domain}")
        return found

    print(f"âš  {source_dir} not found")
    return []

# Load documents for all domains.
# Each name maps to DATA_DIR/<name>_docs; a missing directory yields an empty
# list here rather than an error. The three lists are consumed by
# create_vector_store, which rejects empty input.
hr_docs = load_domain_documents("hr")
tech_docs = load_domain_documents("tech")
finance_docs = load_domain_documents("finance")

# One-line overview of how many source files were found per domain.
print(f"\nðŸ“Š Summary: HR={len(hr_docs)}, Tech={len(tech_docs)}, Finance={len(finance_docs)}")

âœ“ Loaded 10 documents from hr
âœ“ Loaded 10 documents from tech
âœ“ Loaded 10 documents from finance

ðŸ“Š Summary: HR=10, Tech=10, Finance=10


In [22]:
@observe()
def create_vector_store(documents: List[Document], domain: str) -> Chroma:
    """Chunk, embed and persist ``documents`` as the Chroma store for ``domain``.

    The store is written under ``CHROMA_DIR/<domain>``. Every chunk is stored
    under a deterministic ID derived from the domain, the chunk's position,
    its ``source`` metadata and its text. Re-running this function on
    unchanged documents therefore rewrites the same entries instead of
    appending a second copy of every chunk to the persisted collection
    (which is what happens with auto-generated random IDs).

    Entries written by earlier runs under other IDs are not removed.

    Args:
        documents: Source documents to index; must not be empty.
        domain: Domain name; used as the sub-directory name.

    Returns:
        The populated Chroma vector store.

    Raises:
        ValueError: If ``documents`` is empty.
    """
    import hashlib

    if not documents:
        raise ValueError(f"No documents for {domain}")

    # Split into chunks
    chunks = text_splitter.split_documents(documents)
    print(f"âœ“ Created {len(chunks)} chunks for {domain}")

    # Stable IDs. The position is part of the hash so that two chunks with
    # identical text still get distinct IDs within one batch.
    chunk_ids = [
        hashlib.sha256(
            "|".join(
                [
                    domain,
                    str(position),
                    str(chunk.metadata.get("source", "")),
                    chunk.page_content,
                ]
            ).encode("utf-8")
        ).hexdigest()
        for position, chunk in enumerate(chunks)
    ]

    # Create vector store
    persist_path = str(CHROMA_DIR / domain)
    vector_store = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        ids=chunk_ids,
        persist_directory=persist_path
    )

    return vector_store

# Create vector stores.
# One persisted store per domain under CHROMA_DIR/<domain>. create_vector_store
# raises ValueError for an empty document list, so a domain whose data
# directory was missing stops the cell at the corresponding line.
hr_vector_store = create_vector_store(hr_docs, "hr")
tech_vector_store = create_vector_store(tech_docs, "tech")
finance_vector_store = create_vector_store(finance_docs, "finance")

print("\nâœ“ All vector stores created")

âœ“ Created 20 chunks for hr
âœ“ Created 24 chunks for tech
âœ“ Created 29 chunks for finance

âœ“ All vector stores created


## 4. Create Specialized RAG Agents

In [23]:
# Domain-specific prompt templates.
# All three share one layout (role line, retrieved context, question, answer
# instruction) and differ only in the role line and the closing instruction.
# "{context}" and "{question}" are left as placeholders for PromptTemplate.
PROMPTS = {
    domain: "\n".join(
        [
            role,
            "",
            "Context:",
            "{context}",
            "",
            "Question: {question}",
            "",
            instruction,
            "Answer:",
        ]
    )
    for domain, role, instruction in (
        (
            "HR",
            "You are a helpful HR assistant. Answer questions based on company HR documentation.",
            "Provide a clear, accurate answer based on the documentation.",
        ),
        (
            "Tech",
            "You are a helpful IT support assistant. Answer technical questions based on company IT documentation.",
            "Provide a clear technical answer with specific steps when relevant.",
        ),
        (
            "Finance",
            "You are a helpful finance assistant. Answer financial questions based on company finance documentation.",
            "Provide a precise financial answer with accurate numbers and procedures.",
        ),
    )
}

@observe()
def create_rag_chain(vector_store: Chroma, domain: str):
    """Assemble the retrieval-augmented answer chain for one domain.

    Args:
        vector_store: Store the retriever searches.
        domain: Key into ``PROMPTS`` selecting the prompt template.

    Returns:
        A ``(chain, retriever)`` tuple. ``chain`` takes the question string,
        fills the prompt with the retrieved text and the question, calls the
        LLM and returns the reply as a string. ``retriever`` is the same
        retriever the chain uses, returned so callers can fetch the sources.
    """
    def format_docs(docs):
        # Retrieved chunks are concatenated with a blank line between them.
        return "\n\n".join([doc.page_content for doc in docs])

    # The retriever returns the 4 closest chunks per query.
    retriever = vector_store.as_retriever(search_kwargs={"k": 4})
    prompt = PromptTemplate.from_template(PROMPTS[domain])

    # The incoming question feeds both branches: it is passed through
    # unchanged as "question" and used as the search query for "context".
    chain_inputs = {
        "context": retriever | format_docs,
        "question": RunnablePassthrough(),
    }
    chain = chain_inputs | prompt | llm | StrOutputParser()

    return chain, retriever

# Create RAG chains for each domain.
# The second argument is looked up in PROMPTS, so it must be one of its keys
# ("HR", "Tech", "Finance"). Each call returns the answer chain together with
# the retriever it was built on.
hr_chain, hr_retriever = create_rag_chain(hr_vector_store, "HR")
tech_chain, tech_retriever = create_rag_chain(tech_vector_store, "Tech")
finance_chain, finance_retriever = create_rag_chain(finance_vector_store, "Finance")

print("âœ“ All RAG agents created")

âœ“ All RAG agents created


## 5. Orchestrator: Intent Classification & Routing

In [24]:
def _normalize_intent(raw: str) -> str:
    """Map a raw model reply onto one of ``"HR"``, ``"Tech"`` or ``"Finance"``.

    Matching is done on whole words, not substrings, so that letters inside
    unrelated words (the "IT" in "BENEFITS", the "HR" in "THREE") cannot
    select a category.
    """
    import re

    tokens = re.findall(r"[A-Z]+", raw.upper())

    # Pass 1: explicit category words, in the order the model wrote them.
    for token in tokens:
        if token == "HR":
            return "HR"
        if token.startswith("TECH"):
            return "Tech"
        if token.startswith("FINANC"):
            return "Finance"

    # Pass 2: a stand-alone "IT" is accepted as an alias for Tech, but only
    # when no category word was found, because it is also the pronoun "it"
    # (e.g. "It is Finance").
    if "IT" in tokens:
        return "Tech"

    return "HR"  # Default fallback


@observe()
def classify_intent(query: str) -> str:
    """Classify a user query as HR, Tech, or Finance using the LLM.

    Args:
        query: The user's question, inserted verbatim into the prompt.

    Returns:
        ``"HR"``, ``"Tech"`` or ``"Finance"``. Replies that name none of the
        categories fall back to ``"HR"``.
    """
    prompt = f"""Classify this query into one category: HR, Tech, or Finance.

Categories:
- HR: Benefits, leave, recruitment, payroll, performance reviews
- Tech: IT support, software, hardware, network, troubleshooting
- Finance: Expenses, budgets, invoices, accounting, taxes

Query: {query}

Respond with ONLY one word: HR, Tech, or Finance
Classification:"""

    response = llm.invoke(prompt)
    classification = response.content.strip()

    # The model is asked for a single word but may add text around it;
    # normalise whatever came back to one of the three routing keys.
    return _normalize_intent(classification)

print("âœ“ Intent classifier ready")

âœ“ Intent classifier ready


In [25]:
@observe()
def route_to_agent(query: str, intent: str) -> Dict:
    """Answer ``query`` with the RAG agent registered for ``intent``.

    Args:
        query: The user's question.
        intent: ``"HR"``, ``"Tech"`` or ``"Finance"``; any other value is
            served by the HR agent.

    Returns:
        A dict with ``"intent"`` (the value passed in, unchanged),
        ``"answer"`` (the chain's reply) and ``"sources"`` (the documents
        the retriever returns for the query).
    """
    registry = {
        "HR": (hr_chain, hr_retriever),
        "Tech": (tech_chain, tech_retriever),
        "Finance": (finance_chain, finance_retriever)
    }

    try:
        chain, retriever = registry[intent]
    except KeyError:
        # Unknown intent: fall back to the HR agent.
        chain, retriever = registry["HR"]

    # NOTE: the chain retrieves internally to build its context; the retriever
    # is invoked a second time here only to expose the sources to the caller.
    # Retrievers are called through .invoke() (not .get_relevant_documents()).
    return {
        "intent": intent,
        "answer": chain.invoke(query),
        "sources": retriever.invoke(query),
    }

print("âœ“ Routing function ready")

âœ“ Routing function ready


In [26]:
@observe()
def orchestrator(query: str) -> Dict:
    """Entry point: classify the query, route it, and package the response.

    Args:
        query: The user's question.

    Returns:
        A dict with the original ``"query"``, the ``"classified_intent"``,
        the agent's ``"answer"`` and ``"sources"``: the first 200 characters
        (followed by ``"..."``) of at most the first two retrieved documents.
    """
    intent = classify_intent(query)
    routed = route_to_agent(query, intent)
    answer = routed["answer"]

    # Only short previews of the top two sources are returned.
    previews = []
    for doc in routed["sources"][:2]:
        previews.append(doc.page_content[:200] + "...")

    return {
        "query": query,
        "classified_intent": intent,
        "answer": answer,
        "sources": previews,
    }

print("âœ“ Orchestrator ready")

âœ“ Orchestrator ready


## 6. Testing

In [27]:
# Load test queries from test_queries.json in the working directory; when the
# file does not exist, fall back to one built-in query per domain.
try:
    # Decode explicitly as UTF-8: without `encoding`, open() uses the platform
    # default, which can garble non-ASCII characters in the queries.
    with open("test_queries.json", "r", encoding="utf-8") as f:
        test_queries = json.load(f)
    print(f"âœ“ Loaded {len(test_queries)} test queries")
except FileNotFoundError:
    test_queries = [
        {"query": "How many vacation days do I get per year?", "expected_intent": "HR"},
        {"query": "My laptop won't connect to WiFi", "expected_intent": "Tech"},
        {"query": "What is the expense reimbursement process?", "expected_intent": "Finance"}
    ]
    print(f"âœ“ Using {len(test_queries)} default test queries")

âœ“ Loaded 15 test queries


In [28]:
# Test individual queries
def test_query(query_text: str, expected: str = None):
    """Run one query through the orchestrator and print a short report.

    Args:
        query_text: The question to send.
        expected: Optional expected intent; when given, it is printed and
            compared case-insensitively with the classified intent.

    Returns:
        The dict returned by ``orchestrator``.
    """
    divider = "=" * 60
    print(f"\n{divider}")
    print(f"Query: {query_text}")
    if expected:
        print(f"Expected: {expected}")

    result = orchestrator(query_text)
    classified = result['classified_intent']

    print(f"Classified: {classified}")
    if expected:
        if classified.upper() == expected.upper():
            match = "âœ“"
        else:
            match = "âœ—"
        print(f"Match: {match}")

    # Only the first 200 characters of the answer are shown.
    print(f"\nAnswer: {result['answer'][:200]}...")
    return result

# Quick test: one smoke query per domain, each with its expected intent.
# Every call prints its own report. The final call is the last expression of
# the cell, so its returned dict is what the notebook displays as the result.
test_query("How many vacation days do I get per year?", "HR")
test_query("My laptop won't connect to WiFi", "Tech")
test_query("What is the expense reimbursement process?", "Finance")


Query: How many vacation days do I get per year?
Expected: HR
Classified: HR
Match: âœ“

Answer: You are entitled to 20 days of annual leave per year as a full-time employee....

Query: My laptop won't connect to WiFi
Expected: Tech
Classified: Tech
Match: âœ“

Answer: If your laptop won't connect to WiFi, please follow these steps to troubleshoot the issue:

1. **Check Signal Strength**: Ensure that you are within range of the WiFi network. Look for the WiFi icon o...

Query: What is the expense reimbursement process?
Expected: Finance
Classified: Finance
Match: âœ“

Answer: The expense reimbursement process involves the following steps:

1. **Submission Timeline**: Submit your expense report within 30 days of incurring the expense.

2. **Expense Management Portal**: Use ...


{'query': 'What is the expense reimbursement process?',
 'classified_intent': 'Finance',
 'answer': 'The expense reimbursement process involves the following steps:\n\n1. **Submission Timeline**: Submit your expense report within 30 days of incurring the expense.\n\n2. **Expense Management Portal**: Use the designated expense management portal to submit your expenses.\n\n3. **Receipt Requirements**: \n   - Original receipts, photos, or PDFs are required for all expenses over $25.\n   - Receipts must include the date, amount, vendor, and description of the expense.\n   - If a receipt is missing, an explanation is required.\n   - Credit card statements alone are not sufficient for reimbursement.\n   - For expenses in foreign currency, convert the amount to USD with the date of the transaction.\n\n4. **Approval Process**:\n   - Expenses under $100 require manager approval.\n   - Expenses between $100 and $1,000 require department head approval.\n   - Expenses over $1,000 require VP approv

In [29]:
# Run all test queries and report classification accuracy.
print("\n" + "="*60)
print("RUNNING ALL TEST QUERIES")
print("="*60)

results = []
for i, test_case in enumerate(test_queries, 1):
    # A test case is either a dict ({"query": ..., "expected_intent": ...})
    # or a bare query string. Only dicts have .get(), so branch on the type
    # before touching it; a bare string has no expected intent.
    if isinstance(test_case, dict):
        query = test_case.get("query", test_case)
        expected = test_case.get("expected_intent")
    else:
        query = test_case
        expected = None

    print(f"\n[{i}/{len(test_queries)}] {query}")
    result = orchestrator(query)

    # Cases without an expected intent count as correct here but are left
    # out of the accuracy figures below.
    correct = expected is None or result['classified_intent'].upper() == expected.upper()
    results.append({"query": query, "expected": expected, "classified": result['classified_intent'], "correct": correct})

# Summary: accuracy is computed only over cases that declare an expected intent.
print("\n" + "="*60)
print("TEST SUMMARY")
print("="*60)
correct_count = sum(1 for r in results if r['correct'] and r['expected'])
total_with_expected = sum(1 for r in results if r['expected'])
if total_with_expected > 0:
    accuracy = (correct_count / total_with_expected) * 100
    print(f"Correct: {correct_count}/{total_with_expected}")
    print(f"Accuracy: {accuracy:.1f}%")


RUNNING ALL TEST QUERIES

[1/15] How many vacation days do I get per year?

[2/15] My laptop won't connect to WiFi, what should I do?

[3/15] What is the expense reimbursement process?

[4/15] How do I reset my password?

[5/15] What is our 401(k) matching policy?

[6/15] When are invoices processed for payment?

[7/15] Can I work remotely?

[8/15] How do I access the company VPN?

[9/15] What is the budget approval process?

[10/15] How do I request time off?

[11/15] What software tools are available for project management?

[12/15] How do I submit an expense report?

[13/15] What is the performance review process?

[14/15] My email is not working, who should I contact?

[15/15] What are the purchase approval limits?

TEST SUMMARY
Correct: 15/15
Accuracy: 100.0%


## 7. Langfuse Verification

In [30]:
# Check the Langfuse credentials, then push any buffered traces.
try:
    connected = langfuse.auth_check()
except Exception as e:
    # A failed check is reported but does not stop the cell.
    print(f"âš  Langfuse error: {e}")
else:
    print(f"âœ“ Langfuse: {'Connected' if connected else 'Not connected'}")

# Flush is attempted regardless of the outcome of the check above.
langfuse.flush()
print("âœ“ Traces flushed")
print(f"\nðŸ“Š View traces at: {os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')}")

âœ“ Langfuse: Connected
âœ“ Traces flushed

ðŸ“Š View traces at: https://cloud.langfuse.com


## 8. Evaluator Agent (Bonus)

In [31]:
# The four dimensions the evaluator scores, in prompt order.
_EVAL_DIMENSIONS = ("relevance", "completeness", "accuracy", "clarity")
# Neutral score used whenever a dimension cannot be read from the reply.
_EVAL_DEFAULT_SCORE = 7


def _coerce_score(value):
    """Turn one raw score into a number in [1, 10], or the default score."""
    if isinstance(value, bool):
        return _EVAL_DEFAULT_SCORE
    try:
        number = float(value)
    except (TypeError, ValueError):
        return _EVAL_DEFAULT_SCORE
    if number != number:  # NaN never compares equal to itself
        return _EVAL_DEFAULT_SCORE
    number = min(max(number, 1.0), 10.0)
    # Whole numbers stay ints so they print as "8/10" rather than "8.0/10".
    return int(number) if number.is_integer() else number


def _parse_scores(content: str) -> Dict:
    """Extract one score per evaluation dimension from the model's reply."""
    import re

    raw = {}
    json_match = re.search(r'\{[^}]+\}', content)
    if json_match:
        try:
            decoded = json.loads(json_match.group())
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, dict):
            raw = {str(key).lower(): value for key, value in decoded.items()}
    else:
        # No JSON object: prefer the number that follows each dimension's
        # name, so list numbering ("1. Relevance: 8") is not read as a score.
        for dim in _EVAL_DIMENSIONS:
            labelled = re.search(rf'{dim}\D*?(\d+(?:\.\d+)?)', content, re.IGNORECASE)
            if labelled:
                raw[dim] = labelled.group(1)
        if not raw:
            # Last resort: take the first four numbers in order.
            raw = dict(zip(_EVAL_DIMENSIONS, re.findall(r'\d+', content)))

    # Always return exactly the four known dimensions with numeric values.
    return {dim: _coerce_score(raw.get(dim)) for dim in _EVAL_DIMENSIONS}


@observe()
def evaluate_response(query: str, answer: str, trace_id: str = None) -> Dict:
    """Ask the LLM to grade an answer and return per-dimension scores.

    Args:
        query: The user's question.
        answer: The answer to be graded.
        trace_id: Optional Langfuse trace ID. When given, each score and the
            overall score are also recorded on that trace (best effort).

    Returns:
        ``{"scores": {...}, "overall_score": float}`` where ``scores`` holds
        one number between 1 and 10 for each of relevance, completeness,
        accuracy and clarity (7 for any value that could not be parsed), and
        ``overall_score`` is their mean rounded to two decimals.
    """
    prompt = f"""Evaluate this Q&A pair and provide scores (1-10) for each dimension.

Query: {query}
Answer: {answer}

Dimensions:
1. Relevance: Does the answer address the query?
2. Completeness: Is the answer complete?
3. Accuracy: Is the answer factually correct?
4. Clarity: Is the answer clear and well-structured?

Provide scores as JSON: {{"relevance": X, "completeness": X, "accuracy": X, "clarity": X}}
JSON:"""

    response = llm.invoke(prompt)
    content = response.content.strip()

    scores = _parse_scores(content)
    overall = sum(scores.values()) / len(scores)

    # Record in Langfuse if trace_id provided
    if trace_id:
        # Langfuse SDK v3 names the client method `create_score`; older v2
        # clients call it `score`. Use whichever this client provides.
        record = getattr(langfuse, "create_score", None) or getattr(langfuse, "score", None)
        try:
            if record is None:
                raise AttributeError("Langfuse client has no score-recording method")
            for dim, score in scores.items():
                record(name=f"eval_{dim}", value=score, trace_id=trace_id)
            record(name="eval_overall", value=overall, trace_id=trace_id)
        except Exception as exc:
            # Recording is best effort: report the failure, keep the result.
            print(f"Warning: could not record evaluation scores in Langfuse: {exc}")

    return {"scores": scores, "overall_score": round(overall, 2)}

print("âœ“ Evaluator ready")

âœ“ Evaluator ready


In [32]:
@observe()
def orchestrator_with_evaluation(query: str) -> Dict:
    """Classify, route and answer a query, then grade the answer.

    Args:
        query: The user's question.

    Returns:
        The same fields ``orchestrator`` returns (``"query"``,
        ``"classified_intent"``, ``"answer"``, ``"sources"``) plus
        ``"evaluation"``, the result of ``evaluate_response``.
    """
    # No trace ID is looked up here; it is passed as None, so the evaluator
    # returns scores without recording them in Langfuse.
    trace_id = None

    intent = classify_intent(query)
    routed = route_to_agent(query, intent)
    answer = routed["answer"]

    evaluation = evaluate_response(query, answer, trace_id)

    # Short previews of at most the first two retrieved sources.
    source_previews = [
        doc.page_content[:200] + "..."
        for doc in routed["sources"][:2]
    ]

    return {
        "query": query,
        "classified_intent": intent,
        "answer": answer,
        "sources": source_previews,
        "evaluation": evaluation,
    }

# Test evaluator: run one HR question through the evaluating orchestrator and
# print the intent, the first 200 characters of the answer, each dimension's
# score and the overall score.
result = orchestrator_with_evaluation("How many vacation days do I get per year?")
print(f"Query: {result['query']}")
print(f"Intent: {result['classified_intent']}")
print(f"\nAnswer: {result['answer'][:200]}...")
print(f"\nEvaluation:")
# 'scores' maps dimension name -> score; 'overall_score' is their rounded mean.
for dim, score in result['evaluation']['scores'].items():
    print(f"  {dim}: {score}/10")
print(f"  Overall: {result['evaluation']['overall_score']}/10")

Query: How many vacation days do I get per year?
Intent: HR

Answer: You are entitled to 20 days of annual leave per year as a full-time employee....

Evaluation:
  relevance: 10/10
  completeness: 10/10
  accuracy: 10/10
  clarity: 10/10
  Overall: 10.0/10
