---
## üîß Installation and Setup

First, let's install all the required packages for our MCP server.

In [1]:
# Install required packages
!pip install flask flask-cors requests termcolor pyngrok -q

print("‚úÖ All packages installed successfully!")
print("üì¶ Installed: Flask (web server), Flask-CORS (cross-origin support), requests (HTTP client), termcolor (colored output), pyngrok (tunneling)")


‚úÖ All packages installed successfully!
üì¶ Installed: Flask (web server), Flask-CORS (cross-origin support), requests (HTTP client), termcolor (colored output), pyngrok (tunneling)


---
## üóÑÔ∏è Database Setup


In [3]:
!python "/content/database_setup (1).py"

Connected to database: support.db
Tables created successfully!
Triggers created successfully!

DATABASE SCHEMA

CUSTOMERS TABLE:
------------------------------------------------------------
  id              INTEGER     
  name            TEXT       NOT NULL 
  email           TEXT        
  phone           TEXT        
  status          TEXT       NOT NULL DEFAULT 'active'
  created_at      TIMESTAMP   DEFAULT CURRENT_TIMESTAMP
  updated_at      TIMESTAMP   DEFAULT CURRENT_TIMESTAMP

TICKETS TABLE:
------------------------------------------------------------
  id              INTEGER     
  customer_id     INTEGER    NOT NULL 
  issue           TEXT       NOT NULL 
  status          TEXT       NOT NULL DEFAULT 'open'
  priority        TEXT       NOT NULL DEFAULT 'medium'
  created_at      DATETIME    DEFAULT CURRENT_TIMESTAMP

FOREIGN KEYS:
------------------------------------------------------------
  tickets.customer_id -> customers.id

Would you like to insert sample data? (y/n): y

In [4]:
DB_PATH = '/content/support.db'

In [6]:
import sqlite3
import json
from datetime import datetime
from typing import Optional, Dict, List, Any

In [7]:
def get_db_connection():
    """Create a database connection with row factory for dict-like access."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # This allows us to access columns by name
    return conn

def row_to_dict(row: sqlite3.Row) -> Dict[str, Any]:
    """Convert a SQLite row to a dictionary."""
    return {key: row[key] for key in row.keys()}

# ==================== READ OPERATIONS ====================

def get_customer(customer_id: int) -> Dict[str, Any]:
    """
    Retrieve a specific customer by ID.

    Args:
        customer_id: The unique ID of the customer

    Returns:
        Dict containing customer data or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()

        if row:
            return {
                'success': True,
                'customer': row_to_dict(row)
            }
        else:
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }

def list_customers(status: Optional[str] = None, limit: int = 10) -> Dict[str, Any]:
    """
    List all customers, optionally filtered by status.

    Args:
        status: Optional filter - 'active', 'disabled', or None for all

    Returns:
        Dict containing list of customers or error message
    """
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        if status:
            if status not in ['active', 'disabled']:
                return {
                    'success': False,
                    'error': 'Status must be "active" or "disabled"'
                }
            cursor.execute('SELECT * FROM customers WHERE status = ? ORDER BY name LIMIT ?', (status,limit))
        else:
            cursor.execute('SELECT * FROM customers ORDER BY name LIMIT ?', (limit,))

        rows = cursor.fetchall()
        conn.close()

        customers = [row_to_dict(row) for row in rows]

        return {
            'success': True,
            'count': len(customers),
            'customers': customers
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }


# ==================== Consumer Tools ====================

def update_customer(customer_id: int, name: Optional[str] = None,
                   email: Optional[str] = None, phone: Optional[str] = None) -> Dict[str, Any]:
    """
    Update customer information.

    Args:
        customer_id: The unique ID of the customer to update
        name: New name (optional)
        email: New email (optional)
        phone: New phone (optional)

    Returns:
        Dict containing updated customer data or error message
    """
    try:
        # Check if customer exists
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        if not cursor.fetchone():
            conn.close()
            return {
                'success': False,
                'error': f'Customer with ID {customer_id} not found'
            }

        # Build update query dynamically based on provided fields
        updates = []
        params = []

        if name is not None:
            updates.append('name = ?')
            params.append(name.strip())
        if email is not None:
            updates.append('email = ?')
            params.append(email)
        if phone is not None:
            updates.append('phone = ?')
            params.append(phone)

        if not updates:
            conn.close()
            return {
                'success': False,
                'error': 'No fields to update'
            }

        # Always update the updated_at timestamp
        updates.append('updated_at = CURRENT_TIMESTAMP')
        params.append(customer_id)

        update_clause = ', '.join(updates)
        query = f'UPDATE customers SET {update_clause} WHERE id = ?'
        cursor.execute(query, params)
        conn.commit()

        # Fetch updated customer
        cursor.execute('SELECT * FROM customers WHERE id = ?', (customer_id,))
        row = cursor.fetchone()
        conn.close()

        return {
            'success': True,
            'message': f'Customer {customer_id} updated successfully',
            'customer': row_to_dict(row)
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Database error: {str(e)}'
        }
# ==================== Ticket Tools ====================

def create_ticket(customer_id: int, issue: str, priority: str = "medium") -> Dict[str, Any]:
    """Create a support ticket for a customer."""
    try:
        if priority not in ["low", "medium", "high"]:
            return {"success": False, "error": "Priority must be low, medium, or high"}

        conn = get_db_connection()
        cur = conn.cursor()

        # verify customer
        cur.execute("SELECT id FROM customers WHERE id = ?", (customer_id,))
        if not cur.fetchone():
            conn.close()
            return {"success": False, "error": f"Customer {customer_id} not found"}

        cur.execute(
            "INSERT INTO tickets (customer_id, issue, priority) VALUES (?, ?, ?)",
            (customer_id, issue, priority)
        )
        conn.commit()
        ticket_id = cur.lastrowid

        cur.execute("SELECT * FROM tickets WHERE id = ?", (ticket_id,))
        row = cur.fetchone()
        conn.close()

        return {"success": True, "ticket": row_to_dict(row)}
    except Exception as e:
        return {"success": False, "error": str(e)}

def get_customer_history(customer_id: int) -> Dict[str, Any]:
    """Retrieve all tickets for a given customer."""
    try:
        conn = get_db_connection()
        cur = conn.cursor()
        cur.execute("SELECT * FROM tickets WHERE customer_id = ? ORDER BY created_at DESC", (customer_id,))
        rows = cur.fetchall()
        conn.close()
        return {"success": True, "count": len(rows), "tickets": [row_to_dict(r) for r in rows]}
    except Exception as e:
        return {"success": False, "error": str(e)}

# Test the functions
print("‚úÖ Customer management functions defined successfully!")
print("\nüìã Available functions:")
print("   - get_customer(customer_id) - uses customers.id")
print("   - list_customers(status, limit) - uses customers.status")
print("   - update_customer(customer_id, data) - uses customers fields")
print("   - create_ticket(customer_id, issue, priority) - uses tickets fields")
print("   - get_customer_history(customer_id) - uses tickets.customer_id")




if __name__ == "__main__":
    print("‚úÖ MCP Tool Interface Ready")
    print("\nüß™ Testing Tools:")
    print(get_customer(1))
    print(list_customers("active", 3))
    print(update_customer(1, email="new_email@example.com"))
    print(create_ticket(1, "Refund request for overcharge", "high"))
    print(get_customer_history(1))

‚úÖ Customer management functions defined successfully!

üìã Available functions:
   - get_customer(customer_id) - uses customers.id
   - list_customers(status, limit) - uses customers.status
   - update_customer(customer_id, data) - uses customers fields
   - create_ticket(customer_id, issue, priority) - uses tickets fields
   - get_customer_history(customer_id) - uses tickets.customer_id
‚úÖ MCP Tool Interface Ready

üß™ Testing Tools:
{'success': True, 'customer': {'id': 1, 'name': 'John Doe', 'email': 'john.doe@example.com', 'phone': '+1-555-0101', 'status': 'active', 'created_at': '2025-11-18 17:38:21', 'updated_at': '2025-11-18 17:38:21'}}
{'success': True, 'count': 3, 'customers': [{'id': 4, 'name': 'Alice Williams', 'email': 'alice.w@techcorp.com', 'phone': '+1-555-0104', 'status': 'active', 'created_at': '2025-11-18 17:38:21', 'updated_at': '2025-11-18 17:38:21'}, {'id': 5, 'name': 'Charlie Brown', 'email': 'charlie.brown@email.com', 'phone': '+1-555-0105', 'status': 'active

In [8]:
TOOL_MAP = {
    "get_customer": get_customer,
    "list_customers": list_customers,
    "update_customer": update_customer,
    "create_ticket": create_ticket,
    "get_customer_history": get_customer_history,
}

In [9]:
from flask import Flask, request, Response, jsonify
from flask_cors import CORS
import json
import threading
import time
from typing import Dict, Any, Generator

In [10]:

# ---------- Flask MCP server ----------

from threading import Thread
import time

app = Flask(__name__)
CORS(app)

@app.route("/mcp", methods=["POST"])
def mcp_endpoint():
    message = request.get_json()
    tool_name = message.get("params", {}).get("name")
    arguments = message.get("params", {}).get("arguments", {})

    if tool_name not in TOOL_MAP:
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}
        }
    else:
        result = TOOL_MAP[tool_name](**arguments)
        response = {
            "jsonrpc": "2.0",
            "id": message.get("id"),
            "result": {
                "content": [
                    {"type": "text", "text": json.dumps(result)}
                ]
            }
        }
    return jsonify(response)

@app.route("/health")
def health():
    return {"status": "healthy", "server": "customer-mcp", "version": "1.0.0"}

def start_mcp_server():
    def run():
        app.run(host="127.0.0.1", port=5000, debug=False, use_reloader=False)
    thread = Thread(target=run, daemon=True)
    thread.start()
    time.sleep(1)
    print("‚úÖ MCP server started on http://127.0.0.1:5000")

start_mcp_server()

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m


‚úÖ MCP server started on http://127.0.0.1:5000


# Define AgentÔºå System Architecture

In [11]:
import requests
import json
import re
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from termcolor import colored

In [12]:
MCP_URL = "http://127.0.0.1:5000/mcp"

# ---------- MCP client helper ----------

def mcp_call_tool(name: str, arguments: dict, message_id: int = 1) -> dict:
    """
    Generic helper to call an MCP tool using the JSON-RPC 2.0 format.
    Constructs the request payload, sends the POST request,
    prints the request/response for debugging, and parses the result.

    Args:
        name: Name of the MCP tool to call
        arguments: Dictionary of tool arguments
        message_id: Unique JSON-RPC message ID

    Returns:
        Parsed JSON result from the MCP server (dict)
    """
    payload = {
        "jsonrpc": "2.0",
        "id": message_id,
        "method": "tools/call",
        "params": {
            "name": name,
            "arguments": arguments
        }
    }

    # Debug print ‚Äî outgoing request
    print(colored(f"\n[MCP] ‚Üí tools/call: {name}", "cyan"))
    print(colored(json.dumps(payload, indent=2), "cyan"))

    # Send JSON-RPC request
    resp = requests.post(MCP_URL, json=payload)
    data = resp.json()

    # Debug print ‚Äî incoming response
    print(colored("[MCP] ‚Üê response", "green"))
    print(colored(json.dumps(data, indent=2), "green"))

    # Extract result text content
    if "result" in data:
        text = data["result"]["content"][0]["text"]
        return json.loads(text)

    # Error fallback
    return {"success": False, "error": data.get("error", {}).get("message", "Unknown error")}


In [17]:

# ---------- CustomerDataAgent ----------

class CustomerDataAgent:
    """
    Specialist agent responsible for interacting with the database via MCP.
    Handles customer retrieval, updates, ticket operations, and combined queries.
    """

    def get_customer(self, customer_id: int) -> Dict[str, Any]:
        """MCP wrapper for get_customer tool."""
        return mcp_call_tool("get_customer", {"customer_id": customer_id}, message_id=101)

    def list_customers(self, status: Optional[str] = None, limit: int = 10) -> Dict[str, Any]:
        """MCP wrapper for list_customers tool."""
        args = {"limit": limit}
        if status is not None:
            args["status"] = status
        return mcp_call_tool("list_customers", args, message_id=102)

    def update_customer(self, customer_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
        """MCP wrapper for update_customer tool."""
        args = {"customer_id": customer_id}
        args.update(data)
        return mcp_call_tool("update_customer", args, message_id=103)

    def create_ticket(self, customer_id: int, issue: str, priority: str = "medium") -> Dict[str, Any]:
        """MCP wrapper for create_ticket tool."""
        return mcp_call_tool(
            "create_ticket",
            {"customer_id": customer_id, "issue": issue, "priority": priority},
            message_id=104
        )

    def get_customer_history(self, customer_id: int) -> Dict[str, Any]:
        """MCP wrapper for get_customer_history tool."""
        return mcp_call_tool(
            "get_customer_history",
            {"customer_id": customer_id},
            message_id=105

        )
    def get_active_customers_with_open_tickets(self) -> Dict[str, Any]:
        """Áî®‰∫éÂ§çÊùÇÊü•ËØ¢ÔºöÊâÄÊúâ active customers + Ëá≥Â∞ë‰∏Ä‰∏™ open ticket„ÄÇ"""
        base = self.list_customers(status="active", limit=50)
        if not base["success"]:
            return base

        results = []
        for cust in base["customers"]:
            hist = self.get_customer_history(cust["id"])
            if not hist["success"]:
                continue
            open_tickets = [t for t in hist["tickets"] if t["status"] == "open"]
            if open_tickets:
                results.append({"customer": cust, "open_tickets": open_tickets})

        return {"success": True, "count": len(results), "items": results}


In [14]:
class SupportAgent:
    """
    Specialist agent focused on customer support logic.
    Does not access the database directly ‚Äî instead, it requests data
    from CustomerDataAgent when needed (A2A coordination).
    """

    def __init__(self, data_agent: CustomerDataAgent):
        self.data_agent = data_agent
        self.name = "SupportAgent"

    def handle_account_help(self, customer_id: int, logs: List[str]) -> str:
        """
        Scenario: Basic account help.
        Asks CustomerDataAgent to fetch customer info.
        """
        logs.append(f"{self.name}: Requesting DataAgent.get_customer({customer_id})")
        result = self.data_agent.get_customer(customer_id)
        if not result["success"]:
            return f"Unable to retrieve account info (ID {customer_id}): {result['error']}"

        cust = result["customer"]
        return f"I found your account: {cust['name']} (ID {cust['id']}), status: {cust['status']}."

    def handle_upgrade_request(self, customer_id: int, logs: List[str]) -> str:
        """
        Scenario: Coordinated request for account upgrade.
        Fetches customer data first, then provides a recommendation.
        """
        logs.append(f"{self.name}: Fetching customer info before upgrade suggestion")
        result = self.data_agent.get_customer(customer_id)
        if not result["success"]:
            return f"Unable to fetch your account info (ID {customer_id}): {result['error']}"
        cust = result["customer"]
        return (
            f"Hi {cust['name']}! Your current status is {cust['status']}.\n"
            "I can assist you in upgrading your plan based on usage and contract options."
        )

    def handle_billing_and_cancel(self, logs: List[str]) -> str:
        """
        Scenario: Multi-intent ‚Äî cancellation + billing issue.
        Demonstrates A2A negotiation.
        """
        logs.append(f"{self.name}: Detected cancellation + billing issue (multi-intent)")
        return (
            "I see that you want to cancel your subscription and also have a billing issue.\n"
            "I'll review your latest billing records and initiate a refund if needed before processing the cancellation."
        )

    def handle_escalation_refund(self, customer_id: int, logs: List[str]) -> str:
        """
        Scenario: Escalation ‚Äî urgent refund / double charge.
        Creates a high-priority ticket through CustomerDataAgent.
        """
        logs.append(f"{self.name}: Urgent refund request detected ‚Äî creating high-priority ticket")
        ticket = self.data_agent.create_ticket(
            customer_id=customer_id,
            issue="Double charge / urgent refund request",
            priority="high"
        )
        if not ticket["success"]:
            return f"Failed to create refund ticket: {ticket['error']}"

        t = ticket["ticket"]
        logs.append(f"{self.name}: High-priority ticket created: #{t['id']}")
        return (
            f"Sorry about the duplicate charge.\n"
            f"I‚Äôve created a high-priority refund ticket (ID {t['id']}). "
            "The billing team will contact you shortly."
        )

    def handle_ticket_report(self, logs: List[str]) -> str:
        """
        Scenario: Comprehensive multi-step query.
        Requires fetching:
        1. All active customers
        2. Their ticket histories
        3. Extracting those with open tickets
        """
        logs.append(f"{self.name}: Requesting DataAgent for active customers + open tickets report")
        result = self.data_agent.get_active_customers_with_open_tickets()
        if not result["success"]:
            return f"Failed to generate report: {result['error']}"

        if result["count"] == 0:
            return "All active customers currently have no open tickets. System health is good. ‚úÖ"

        lines = ["Here are active customers with open tickets:"]
        for item in result["items"]:
            cust = item["customer"]
            lines.append(f"\n- {cust['name']} (ID {cust['id']})")
            for t in item["open_tickets"]:
                lines.append(f"    ‚Ä¢ Ticket #{t['id']}: {t['issue']} [priority={t['priority']}]")
        return "\n".join(lines)

    def handle_update_email_and_history(self, customer_id: int, new_email: str, logs: List[str]) -> str:
        """
        Scenario: Multi-intent ‚Äî update email + view history.
        Requires:
        1. CustomerDataAgent.update_customer
        2. CustomerDataAgent.get_customer_history
        """
        logs.append(f"{self.name}: Updating email, then fetching ticket history")
        upd = self.data_agent.update_customer(customer_id, {"email": new_email})
        if not upd["success"]:
            return f"Email update failed: {upd['error']}"

        hist = self.data_agent.get_customer_history(customer_id)
        if not hist["success"]:
            return f"Email updated to {new_email}, but ticket history retrieval failed."

        lines = [f"Email updated to {new_email}. Here is your ticket history:"]
        for t in hist["tickets"]:
            lines.append(
                f"- Ticket #{t['id']}: {t['issue']} "
                f"[status={t['status']}, priority={t['priority']}]"
            )
        return "\n".join(lines)


In [15]:

# ---------- RouterAgent ----------

@dataclass
class RouterResult:
    """
    Container for router output:
    - original query
    - final answer text
    - A2A logs showing all inter-agent communication
    """
    query: str
    final_answer: str
    logs: List[str] = field(default_factory=list)


class RouterAgent:
    """
    Core orchestrator agent.
    Detects user intent, routes tasks to the appropriate specialist agents,
    coordinates multi-step flows, and synthesizes final answers.
    """

    def __init__(self, data_agent: CustomerDataAgent, support_agent: SupportAgent):
        self.data_agent = data_agent
        self.support_agent = support_agent
        self.name = "RouterAgent"

    def _extract_customer_id(self, text: str) -> Optional[int]:
        """Extracts numeric customer ID patterns like 'id 123' or 'customer 123'."""
        m = re.search(r"(?:id|customer)\s*(\d+)", text.lower())
        if m:
            return int(m.group(1))
        return None

    def _extract_email(self, text: str) -> Optional[str]:
        """Extracts email address patterns from free text."""
        m = re.search(r"[\w\.-]+@[\w\.-]+\.\w+", text)
        if m:
            return m.group(0)
        return None

    def handle_query(self, query: str) -> RouterResult:
        """
        Main routing logic.
        Detects the intent category and dispatches to the correct agent.
        All steps are logged for A2A transparency.
        """
        logs: List[str] = []
        logs.append(f"{self.name}: Received user query ‚Üí \"{query}\"")

        text = query.lower()
        cid = self._extract_customer_id(query)

        # Simple Query
        if "get customer information" in text and cid is not None:
            logs.append(f"{self.name}: Recognized as simple query (direct MCP call)")
            result = self.data_agent.get_customer(cid)
            if not result["success"]:
                answer = f"Unable to retrieve customer {cid}: {result['error']}"
            else:
                c = result["customer"]
                answer = (
                    f"Customer {cid} info:\n"
                    f"- Name: {c['name']}\n"
                    f"- Email: {c['email']}\n"
                    f"- Phone: {c['phone']}\n"
                    f"- Status: {c['status']}"
                )
            return RouterResult(query=query, final_answer=answer, logs=logs)

        # Scenario 1: Task allocation
        if "help with my account" in text and cid is not None:
            logs.append(f"{self.name}: Scenario 1 detected (account help + customer id)")
            answer = self.support_agent.handle_account_help(cid, logs)
            return RouterResult(query=query, final_answer=answer, logs=logs)

        # Scenario 2: Negotiation / multi-intent
        if "cancel" in text and "billing" in text:
            logs.append(f"{self.name}: Scenario 2 detected (cancel + billing)")
            answer = self.support_agent.handle_billing_and_cancel(logs)
            return RouterResult(query=query, final_answer=answer, logs=logs)

        # Scenario 3: Multi-step report
        if "active customers" in text and "open tickets" in text:
            logs.append(f"{self.name}: Scenario 3 detected (complex multi-step report)")
            answer = self.support_agent.handle_ticket_report(logs)
            return RouterResult(query=query, final_answer=answer, logs=logs)

        # Coordinated upgrade case
        if "upgrading my account" in text and cid is not None:
            logs.append(f"{self.name}: Coordinated upgrade detected")
            answer = self.support_agent.handle_upgrade_request(cid, logs)
            return RouterResult(query=query, final_answer=answer, logs=logs)

        # Escalation case
        if "charged twice" in text or "double charged" in text:
            logs.append(f"{self.name}: Urgent escalation detected (double charge)")
            cid = cid or 1  # fallback
            answer = self.support_agent.handle_escalation_refund(cid, logs)
            return RouterResult(query=query, final_answer=answer, logs=logs)

        # Multi-intent: email update + ticket history
        if "update my email" in text and "ticket history" in text:
            logs.append(f"{self.name}: Multi-intent detected (update email + history)")
            cid = cid or 1
            email = self._extract_email(query) or "new@email.com"
            answer = self.support_agent.handle_update_email_and_history(cid, email, logs)
            return RouterResult(query=query, final_answer=answer, logs=logs)

        # Default fallback
        logs.append(f"{self.name}: No matching intent found ‚Äî returning fallback message")
        return RouterResult(
            query=query,
            final_answer=(
                "I can help with account support, billing issues, upgrades, cancellations, "
                "refund requests, and ticket history. Please try rephrasing your request."
            ),
            logs=logs
        )

# Test Scenarios

In [18]:

data_agent = CustomerDataAgent()
support_agent = SupportAgent(data_agent=data_agent)
router = RouterAgent(data_agent=data_agent, support_agent=support_agent)

def run_test(query: str):
    print(colored("\n" + "="*100, "magenta"))
    print(colored(f"USER QUERY: {query}", "magenta", attrs=["bold"]))
    print(colored("="*100, "magenta"))

    result = router.handle_query(query)

    print(colored("\n--- A2A LOGS (Router ‚Üî DataAgent ‚Üî SupportAgent) ---", "cyan"))
    for line in result.logs:
        print("‚Ä¢", line)

    print(colored("\n--- FINAL ANSWER TO USER ---", "green", attrs=["bold"]))
    print(result.final_answer)
    print()

# 1) Simple Query
run_test("Get customer information for ID 5")

# 2) Scenario 1: Task Allocation
run_test("I need help with my account, customer ID 1")

# 3) Coordinated Query: upgrade
run_test("I'm customer 1 and need help upgrading my account")

# 4) Complex Query: active customers with open tickets
run_test("Show me all active customers who have open tickets")

# 5) Escalation
run_test("I've been charged twice, please refund immediately!")

# 6) Multi-Intent
run_test("Update my email to new@email.com and show my ticket history")


INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [18/Nov/2025 17:57:38] "POST /mcp HT


USER QUERY: Get customer information for ID 5

[MCP] ‚Üí tools/call: get_customer
{
  "jsonrpc": "2.0",
  "id": 101,
  "method": "tools/call",
  "params": {
    "name": "get_customer",
    "arguments": {
      "customer_id": 5
    }
  }
}
[MCP] ‚Üê response
{
  "id": 101,
  "jsonrpc": "2.0",
  "result": {
    "content": [
      {
        "text": "{\"success\": true, \"customer\": {\"id\": 5, \"name\": \"Charlie Brown\", \"email\": \"charlie.brown@email.com\", \"phone\": \"+1-555-0105\", \"status\": \"active\", \"created_at\": \"2025-11-18 17:38:21\", \"updated_at\": \"2025-11-18 17:38:21\"}}",
        "type": "text"
      }
    ]
  }
}

--- A2A LOGS (Router ‚Üî DataAgent ‚Üî SupportAgent) ---
‚Ä¢ RouterAgent: Received user query ‚Üí "Get customer information for ID 5"
‚Ä¢ RouterAgent: Recognized as simple query (direct MCP call)

--- FINAL ANSWER TO USER ---
Customer 5 info:
- Name: Charlie Brown
- Email: charlie.brown@email.com
- Phone: +1-555-0105
- Status: active


USER QUERY: I ne

In [25]:
!jupyter nbconvert --to html "/content/GenAI_HW5_simple.ipynb" --output project.html



[NbConvertApp] Converting notebook /content/GenAI_HW5_simple.ipynb to html
[NbConvertApp] Writing 421523 bytes to /content/project.html


**Conclusion**
Through this assignment, I gained a deep, hands-on understanding of how multi-agent systems can coordinate through structured message passing and how MCP (Model Context Protocol) enables clean separation between AI reasoning and external tool execution. Implementing a Router Agent, Customer Data Agent, and Support Agent forced me to think about agency specialization, intent routing, and how to design protocols for agent-to-agent communication. I also learned how important it is to build explicit logging for transparency‚Äîonce the system became more complex, multi-step workflows (such as querying a customer, creating a ticket, and generating a final answer) were only understandable because each agent logged when and why control was handed off. Integrating Gemini as the underlying LLM required designing prompts for deterministic behaviors, extracting structured signals, and building a consistent state flow across the agents.

The most challenging part was handling multi-step coordination. Scenarios such as billing escalation or ‚Äúactive customers with open tickets‚Äù required the Router Agent to break the request into subtasks, negotiate information between agents, and merge partial results into a coherent final message. MCP integration also introduced difficulties: JSON-RPC streaming, argument schemas, error propagation, and SQLite bindings required debugging and enforcing strict input/output formats. Additionally, running the MCP server inside Colab created concurrency issues (like port conflicts and lingering background threads). Working through these challenges helped me understand the practical engineering considerations behind AI-driven orchestration‚Äîespecially the importance of tool design, deterministic routing logic, and robust agent collaboration patterns