In [1]:
import requests
import json
from termcolor import colored

## Please make sure make that run the python mcp_server.py first to initialize the server.

In [2]:
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 5000
SERVER_URL = f"http://{SERVER_HOST}:{SERVER_PORT}"

In [4]:
def send_mcp_message(method: str, params: dict = None, message_id: int = 1):
    """
    Send an MCP message to the server (via SSE) and parse the response.
    """
    message = {
        "jsonrpc": "2.0",
        "id": message_id,
        "method": method
    }
    if params:
        message["params"] = params

    print(colored("\nüì§ Sending MCP Request:", "cyan", attrs=["bold"]))
    print(colored(json.dumps(message, indent=2), "cyan"))

    try:
        response = requests.post(
            f"{SERVER_URL}/mcp",
            json=message,
            stream=True,
            timeout=10
        )

        for line in response.iter_lines():
            if not line:
                continue

            line = line.decode("utf-8")
            if line.startswith("data: "):
                payload = json.loads(line[6:])  # remove 'data:'

                print(colored("\nüì• Received MCP Response:", "green", attrs=["bold"]))
                print(colored(json.dumps(payload, indent=2), "green"))
                return payload

    except Exception as e:
        print(colored(f"\n‚ùå Error: {e}", "red"))
        return None



## TEST 1: MCP INITIALIZATION

In [5]:
print(colored("="*60, "magenta"))
print(colored("TEST 1: MCP INITIALIZATION", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

init_response = send_mcp_message(
    method="initialize",
    params={
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "colab-test-client", "version": "1.0.0"},
    },
    message_id=1
)

if init_response and "result" in init_response:
    print(colored("\n‚úÖ Initialization successful!", "green", attrs=["bold"]))
    print("Protocol Version:", init_response["result"]["protocolVersion"])
    print("Server:", init_response["result"]["serverInfo"]["name"])
else:
    print(colored("\n‚ùå Initialization failed", "red", attrs=["bold"]))

[1m[35mTEST 1: MCP INITIALIZATION[0m
[1m[36m
üì§ Sending MCP Request:[0m
[36m{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {},
    "clientInfo": {
      "name": "colab-test-client",
      "version": "1.0.0"
    }
  }
}[0m
[1m[32m
üì• Received MCP Response:[0m
[32m{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "tools": {}
    },
    "serverInfo": {
      "name": "customer-management-server",
      "version": "1.0.0"
    }
  }
}[0m
[1m[32m
‚úÖ Initialization successful![0m
Protocol Version: 2024-11-05
Server: customer-management-server


## TEST 2: LIST AVAILABLE TOOLS

In [6]:
print(colored("="*60, "magenta"))
print(colored("TEST 2: LIST AVAILABLE TOOLS", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

tools_response = send_mcp_message(
    method="tools/list",
    message_id=2
)

if tools_response and "result" in tools_response:
    tools = tools_response["result"]["tools"]
    print(colored(f"\n‚úÖ Found {len(tools)} tools:", "green", attrs=["bold"]))
    for i, tool in enumerate(tools, start=1):
        print(colored(f"\n{i}. {tool['name']}", "yellow", attrs=["bold"]))
        print("   " + tool["description"])
else:
    print(colored("\n‚ùå Failed to list tools", "red"))

[1m[35mTEST 2: LIST AVAILABLE TOOLS[0m
[1m[36m
üì§ Sending MCP Request:[0m
[36m{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/list"
}[0m
[1m[32m
üì• Received MCP Response:[0m
[32m{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "tools": [
      {
        "name": "get_customer",
        "description": "Retrieve a specific customer by their ID.",
        "inputSchema": {
          "type": "object",
          "properties": {
            "customer_id": {
              "type": "integer",
              "description": "The unique ID of the customer"
            }
          },
          "required": [
            "customer_id"
          ]
        }
      },
      {
        "name": "list_customers",
        "description": "List all customers. Can filter by status.",
        "inputSchema": {
          "type": "object",
          "properties": {
            "status": {
              "type": "string",
              "enum": [
                "active",
                "disabled"


## TEST 3: GET CUSTOMER

In [None]:
print(colored("="*60, "magenta"))
print(colored("TEST 3: GET CUSTOMER", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

# Call the get_customer tool
customer_response = send_mcp_message(
    method="tools/call",
    params={
        "name": "get_customer",      # Tool name
        "arguments": {
            "customer_id": 1         # Tool arguments
        }
    },
    message_id=3
)

if customer_response and "result" in customer_response:
    # The result is in content[0].text as a JSON string
    content_text = customer_response["result"]["content"][0]["text"]
    content = json.loads(content_text)
    
    if content.get("success"):
        customer = content["customer"]
        print(colored(f"\n[OK] Found customer:", "green", attrs=["bold"]))
        print(f"   ID: {customer['id']}")
        print(f"   Name: {customer['name']}")
        print(f"   Email: {customer['email']}")
        print(f"   Status: {customer['status']}")
    else:
        print(colored(f"\n[FAIL] {content.get('error')}", "red"))
else:
    print(colored("\n[FAIL] No response received", "red"))

[1m[35mTEST 3: GET CUSTOMER[0m
[1m[36m
üì§ Sending MCP Request:[0m
[36m{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tools/call",
  "params": {
    "name": "get_customer",
    "arguments": {
      "customer_id": 1
    }
  }
}[0m
[1m[32m
üì• Received MCP Response:[0m
[32m{
  "jsonrpc": "2.0",
  "id": 3,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\n  \"success\": true,\n  \"customer\": {\n    \"id\": 1,\n    \"name\": \"John Doe\",\n    \"email\": \"john.doe@example.com\",\n    \"phone\": \"+1-555-0101\",\n    \"status\": \"active\",\n    \"created_at\": \"2025-11-30 01:43:29\",\n    \"updated_at\": \"2025-11-30 01:43:29\"\n  }\n}"
      }
    ]
  }
}[0m
[1m[32m
[OK] Found customer:[0m
   ID: 1
   Name: John Doe
   Email: john.doe@example.com
   Status: active
   Tier: N/A


## TEST 4: LIST ALL CUSTOMERS

In [8]:
print(colored("="*60, "magenta"))
print(colored("TEST 4: LIST ALL CUSTOMERS", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

# Call the list_customers tool
list_response = send_mcp_message(
    method="tools/call",
    params={
        "name": "list_customers",
        "arguments": {}              # Empty = list all customers
    },
    message_id=4
)

if list_response and "result" in list_response:
    content_text = list_response["result"]["content"][0]["text"]
    content = json.loads(content_text)
    
    if content.get("success"):
        customers = content["customers"]
        print(colored(f"\n[OK] Found {content['count']} customers:", "green", attrs=["bold"]))
        for c in customers:
            print(f"   - ID {c['id']}: {c['name']} ({c['email']}) - {c['status']}, {c.get('tier', 'N/A')}")
    else:
        print(colored(f"\n[FAIL] {content.get('error')}", "red"))
else:
    print(colored("\n[FAIL] No response received", "red"))

[1m[35mTEST 4: LIST ALL CUSTOMERS[0m
[1m[36m
üì§ Sending MCP Request:[0m
[36m{
  "jsonrpc": "2.0",
  "id": 4,
  "method": "tools/call",
  "params": {
    "name": "list_customers",
    "arguments": {}
  }
}[0m
[1m[32m
üì• Received MCP Response:[0m
[32m{
  "jsonrpc": "2.0",
  "id": 4,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\n  \"success\": true,\n  \"count\": 15,\n  \"customers\": [\n    {\n      \"id\": 4,\n      \"name\": \"Alice Williams\",\n      \"email\": \"alice.w@techcorp.com\",\n      \"phone\": \"+1-555-0104\",\n      \"status\": \"active\",\n      \"created_at\": \"2025-11-30 01:43:29\",\n      \"updated_at\": \"2025-11-30 01:43:29\"\n    },\n    {\n      \"id\": 3,\n      \"name\": \"Bob Johnson\",\n      \"email\": \"bob.johnson@example.com\",\n      \"phone\": \"+1-555-0103\",\n      \"status\": \"disabled\",\n      \"created_at\": \"2025-11-30 01:43:29\",\n      \"updated_at\": \"2025-11-30 01:43:29\"\n    },\n    {\n

## TEST 5: CREATE TICKET

In [9]:
print(colored("="*60, "magenta"))
print(colored("TEST 5: CREATE TICKET", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

# Call the create_ticket tool
ticket_response = send_mcp_message(
    method="tools/call",
    params={
        "name": "create_ticket",
        "arguments": {
            "customer_id": 1,                              # Required: customer ID
            "issue": "Cannot login to my account",         # Required: issue description
            "priority": "high"                             # Optional: low, medium, high, urgent
        }
    },
    message_id=5
)

if ticket_response and "result" in ticket_response:
    content_text = ticket_response["result"]["content"][0]["text"]
    content = json.loads(content_text)
    
    if content.get("success"):
        ticket = content["ticket"]
        print(colored(f"\n[OK] Ticket created:", "green", attrs=["bold"]))
        print(f"   Ticket ID: {ticket['id']}")
        print(f"   Customer ID: {ticket['customer_id']}")
        print(f"   Issue: {ticket['issue']}")
        print(f"   Priority: {ticket['priority']}")
        print(f"   Status: {ticket['status']}")
        print(f"   Created: {ticket['created_at']}")
    else:
        print(colored(f"\n[FAIL] {content.get('error')}", "red"))
else:
    print(colored("\n[FAIL] No response received", "red"))

[1m[35mTEST 5: CREATE TICKET[0m
[1m[36m
üì§ Sending MCP Request:[0m
[36m{
  "jsonrpc": "2.0",
  "id": 5,
  "method": "tools/call",
  "params": {
    "name": "create_ticket",
    "arguments": {
      "customer_id": 1,
      "issue": "Cannot login to my account",
      "priority": "high"
    }
  }
}[0m
[1m[32m
üì• Received MCP Response:[0m
[32m{
  "jsonrpc": "2.0",
  "id": 5,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\n  \"success\": true,\n  \"message\": \"Ticket #30 created\",\n  \"ticket\": {\n    \"id\": 30,\n    \"customer_id\": 1,\n    \"issue\": \"Cannot login to my account\",\n    \"status\": \"open\",\n    \"priority\": \"high\",\n    \"created_at\": \"2025-11-30 04:27:55\"\n  }\n}"
      }
    ]
  }
}[0m
[1m[32m
[OK] Ticket created:[0m
   Ticket ID: 30
   Customer ID: 1
   Issue: Cannot login to my account
   Priority: high
   Status: open
   Created: 2025-11-30 04:27:55


## TEST 6: UPDATE CUSTOMER

In [10]:
print(colored("="*60, "magenta"))
print(colored("TEST 6: UPDATE CUSTOMER", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

# Call the update_customer tool
update_response = send_mcp_message(
    method="tools/call",
    params={
        "name": "update_customer",
        "arguments": {
            "customer_id": 1,                    # Required: customer ID
            "data": {                            # Required: fields to update
                "name": "John Doe Updated",
                "email": "john.updated@example.com",
                "phone": "555-9999",
                "status": "active"               # "active" or "disabled"
            }
        }
    },
    message_id=6
)

if update_response and "result" in update_response:
    content_text = update_response["result"]["content"][0]["text"]
    content = json.loads(content_text)
    
    if content.get("success"):
        customer = content["customer"]
        print(colored(f"\n[OK] Customer updated:", "green", attrs=["bold"]))
        print(f"   ID: {customer['id']}")
        print(f"   Name: {customer['name']}")
        print(f"   Email: {customer['email']}")
        print(f"   Phone: {customer['phone']}")
        print(f"   Status: {customer['status']}")
        print(f"   Updated At: {customer['updated_at']}")
    else:
        print(colored(f"\n[FAIL] {content.get('error')}", "red"))
else:
    print(colored("\n[FAIL] No response received", "red"))

[1m[35mTEST 6: UPDATE CUSTOMER[0m
[1m[36m
üì§ Sending MCP Request:[0m
[36m{
  "jsonrpc": "2.0",
  "id": 6,
  "method": "tools/call",
  "params": {
    "name": "update_customer",
    "arguments": {
      "customer_id": 1,
      "data": {
        "name": "John Doe Updated",
        "email": "john.updated@example.com",
        "phone": "555-9999",
        "status": "active"
      }
    }
  }
}[0m
[1m[32m
üì• Received MCP Response:[0m
[32m{
  "jsonrpc": "2.0",
  "id": 6,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\n  \"success\": true,\n  \"message\": \"Customer 1 updated\",\n  \"customer\": {\n    \"id\": 1,\n    \"name\": \"John Doe Updated\",\n    \"email\": \"john.updated@example.com\",\n    \"phone\": \"555-9999\",\n    \"status\": \"active\",\n    \"created_at\": \"2025-11-30 01:43:29\",\n    \"updated_at\": \"2025-11-30 04:29:18\"\n  }\n}"
      }
    ]
  }
}[0m
[1m[32m
[OK] Customer updated:[0m
   ID: 1
   Name: John Doe Updat

## TEST 7: GET CUSTOMER HISTORY

In [11]:
print(colored("="*60, "magenta"))
print(colored("TEST 7: GET CUSTOMER HISTORY", "magenta", attrs=["bold"]))
print(colored("="*60, "magenta"))

# Call the get_customer_history tool
history_response = send_mcp_message(
    method="tools/call",
    params={
        "name": "get_customer_history",
        "arguments": {
            "customer_id": 1                     # Required: customer ID
        }
    },
    message_id=7
)

if history_response and "result" in history_response:
    content_text = history_response["result"]["content"][0]["text"]
    content = json.loads(content_text)
    
    if content.get("success"):
        customer = content["customer"]
        summary = content["summary"]
        tickets = content["tickets"]
        
        print(colored(f"\n[OK] Customer History:", "green", attrs=["bold"]))
        
        # Customer info
        print(colored("\n   Customer Info:", "yellow", attrs=["bold"]))
        print(f"   ID: {customer['id']}")
        print(f"   Name: {customer['name']}")
        print(f"   Email: {customer['email']}")
        print(f"   Status: {customer['status']}")
        
        # Summary
        print(colored("\n   Ticket Summary:", "yellow", attrs=["bold"]))
        print(f"   Total Tickets: {summary['total_tickets']}")
        print(f"   Open Tickets: {summary['open_tickets']}")
        print(f"   Resolved Tickets: {summary['resolved_tickets']}")
        
        # Ticket list
        if tickets:
            print(colored("\n   Ticket History:", "yellow", attrs=["bold"]))
            for t in tickets:
                print(f"   - #{t['id']}: {t['issue'][:50]}...")
                print(f"     Priority: {t['priority']} | Status: {t['status']} | Created: {t['created_at']}")
        else:
            print(colored("\n   No tickets found for this customer", "white"))
    else:
        print(colored(f"\n[FAIL] {content.get('error')}", "red"))
else:
    print(colored("\n[FAIL] No response received", "red"))

[1m[35mTEST 7: GET CUSTOMER HISTORY[0m
[1m[36m
üì§ Sending MCP Request:[0m
[36m{
  "jsonrpc": "2.0",
  "id": 7,
  "method": "tools/call",
  "params": {
    "name": "get_customer_history",
    "arguments": {
      "customer_id": 1
    }
  }
}[0m
[1m[32m
üì• Received MCP Response:[0m
[32m{
  "jsonrpc": "2.0",
  "id": 7,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\n  \"success\": true,\n  \"customer\": {\n    \"id\": 1,\n    \"name\": \"John Doe Updated\",\n    \"email\": \"john.updated@example.com\",\n    \"phone\": \"555-9999\",\n    \"status\": \"active\",\n    \"created_at\": \"2025-11-30 01:43:29\",\n    \"updated_at\": \"2025-11-30 04:29:18\"\n  },\n  \"summary\": {\n    \"total_tickets\": 3,\n    \"open_tickets\": 3,\n    \"resolved_tickets\": 0\n  },\n  \"tickets\": [\n    {\n      \"id\": 30,\n      \"customer_id\": 1,\n      \"issue\": \"Cannot login to my account\",\n      \"status\": \"open\",\n      \"priority\": \"high\",\n