From b09ed8c8e362a9201e9601218a9d5534748c6e4c Mon Sep 17 00:00:00 2001
From: tech4242 <5933291+tech4242@users.noreply.github.com>
Date: Tue, 5 Aug 2025 11:33:03 +0200
Subject: [PATCH] fix: adding servername fallback for stdio
---
frontend/src/components/LogTable/LogRow.vue | 94 +++++++++-
mcphawk/stdio_server_detector_fallback.py | 140 ++++++++++++++
mcphawk/wrapper.py | 14 +-
.../test_stdio_server_detector_fallback.py | 175 ++++++++++++++++++
4 files changed, 418 insertions(+), 5 deletions(-)
create mode 100644 mcphawk/stdio_server_detector_fallback.py
create mode 100644 tests/unit/test_stdio_server_detector_fallback.py
diff --git a/frontend/src/components/LogTable/LogRow.vue b/frontend/src/components/LogTable/LogRow.vue
index 5a9725e..9594965 100644
--- a/frontend/src/components/LogTable/LogRow.vue
+++ b/frontend/src/components/LogTable/LogRow.vue
@@ -60,8 +60,84 @@
+
+
+
+
+ Date:
+
+ {{ formatDate(log.timestamp) }}
+
+
+
+ Time:
+
+ {{ formatTimestamp(log.timestamp) }}
+
+
+
+ Type:
+
+
+
+ Message ID:
+
+ {{ messageId }}
+
+
+
+ Server:
+
+ {{ serverInfo.name }} v{{ serverInfo.version }}
+
+ -
+
+
+ Client:
+
+ {{ clientInfo.name }} v{{ clientInfo.version }}
+
+
+
+ Transport:
+
+ {{ formattedTransportType }}
+
+
+
+ Direction:
+
+ {{ log.src_ip }} {{ directionIcon }} {{ log.dst_ip }}
+
+
+
+ Ports:
+
+ {{ portInfo }}
+
+
+
+ PID:
+
+ {{ log.pid }}
+
+
+
+ Log ID:
+
+ {{ log.log_id }}
+
+
+
+
+
+
-
{{ formattedJson }}
+
JSON-RPC Message:
+
{{ formattedJson }}
@@ -161,4 +237,20 @@ const serverInfo = computed(() => {
return null
})
+const clientInfo = computed(() => {
+ if (!props.log.metadata) return null
+ try {
+ const meta = JSON.parse(props.log.metadata)
+ if (meta.client_name) {
+ return {
+ name: meta.client_name,
+ version: meta.client_version || ''
+ }
+ }
+ } catch {
+ // ignore
+ }
+ return null
+})
+
\ No newline at end of file
diff --git a/mcphawk/stdio_server_detector_fallback.py b/mcphawk/stdio_server_detector_fallback.py
new file mode 100644
index 0000000..d672e82
--- /dev/null
+++ b/mcphawk/stdio_server_detector_fallback.py
@@ -0,0 +1,140 @@
+"""Fallback server detection for stdio transport when initialize message is not available."""
+
+import re
+from pathlib import Path
+from typing import Optional
+
+
+def detect_server_from_command(command: list[str]) -> Optional[dict[str, str]]:
+ """
+ Try to detect MCP server info from the command being executed.
+
+ This is a fallback when we don't have initialize message info.
+
+ Args:
+ command: Command and arguments list
+
+ Returns:
+ Dict with 'name' and 'version' if detected, None otherwise
+ """
+ if not command:
+ return None
+
+ # Get the executable name
+ exe = command[0]
+ exe_name = Path(exe).name
+ exe_path = Path(exe).stem # without extension
+
+ # Check if running Python module with -m
+ if exe_name in ['python', 'python3', 'python3.exe', 'python.exe']:
+ # Look for -m module_name pattern
+ for i, arg in enumerate(command[1:], 1):
+ if arg == '-m' and i < len(command) - 1:
+ module = command[i + 1]
+
+ # Special case for mcphawk
+ if module == 'mcphawk' and i + 2 < len(command) and command[i + 2] == 'mcp':
+ return {'name': 'MCPHawk Query Server', 'version': 'unknown'}
+
+ # Extract name from module
+ name = extract_server_name(module)
+ if name:
+ return {'name': name, 'version': 'unknown'}
+
+ # Check executable name
+ name = extract_server_name(exe_path)
+ if name:
+ return {'name': name, 'version': 'unknown'}
+
+ # Check for .py files in arguments
+ for arg in command[1:]:
+ if arg.endswith('.py'):
+ script_name = Path(arg).stem
+ name = extract_server_name(script_name)
+ if name:
+ return {'name': name, 'version': 'unknown'}
+
+ return None
+
+
+def extract_server_name(text: str) -> Optional[str]:
+ """
+ Extract a human-readable server name from various text patterns.
+
+ Args:
+ text: Text to extract server name from (module name, exe name, etc)
+
+ Returns:
+ Human-readable server name or None
+ """
+ if not text or not isinstance(text, str):
+ return None
+
+ # Pattern 1: mcp-server-{name} or mcp_server_{name}
+ match = re.match(r'^mcp[-_]server[-_](.+)$', text, re.IGNORECASE)
+ if match:
+ name_part = match.group(1)
+ # Convert to title case, handling both - and _
+ words = re.split(r'[-_]', name_part)
+ return f"MCP {' '.join(word.capitalize() for word in words)} Server"
+
+ # Pattern 2: {name}-mcp-server or {name}_mcp_server
+ match = re.match(r'^(.+?)[-_]mcp[-_]server$', text, re.IGNORECASE)
+ if match:
+ name_part = match.group(1)
+ words = re.split(r'[-_]', name_part)
+ return f"{' '.join(word.capitalize() for word in words)} MCP Server"
+
+ # Pattern 3: mcp-{name} or mcp_{name} (but not mcp-server)
+ match = re.match(r'^mcp[-_](.+)$', text, re.IGNORECASE)
+ if match:
+ name_part = match.group(1)
+ # Skip if it's just "server" without additional parts
+ if name_part.lower() == 'server':
+ return None
+ words = re.split(r'[-_]', name_part)
+ return f"MCP {' '.join(word.capitalize() for word in words)}"
+
+ # Pattern 4: {name}-mcp or {name}_mcp
+ match = re.match(r'^(.+?)[-_]mcp$', text, re.IGNORECASE)
+ if match:
+ name_part = match.group(1)
+ words = re.split(r'[-_]', name_part)
+ return f"{' '.join(word.capitalize() for word in words)} MCP"
+
+ # Pattern 5: contains 'mcp' somewhere
+ if 'mcp' in text.lower():
+ # Clean up and format
+ words = re.split(r'[-_]', text)
+ # Filter out empty strings from split
+ words = [w for w in words if w]
+ formatted_words = []
+ for word in words:
+ if word.lower() == 'mcp':
+ formatted_words.append('MCP')
+ else:
+ formatted_words.append(word.capitalize())
+ return ' '.join(formatted_words)
+
+ return None
+
+
+def merge_server_info(
+ detected: Optional[dict[str, str]],
+ from_protocol: Optional[dict[str, str]]
+) -> Optional[dict[str, str]]:
+ """
+ Merge server info from command detection and protocol messages.
+
+ Protocol info takes precedence as it's more accurate.
+
+ Args:
+ detected: Server info detected from command
+ from_protocol: Server info from initialize response
+
+ Returns:
+ Merged server info or None
+ """
+ if from_protocol:
+ return from_protocol
+ return detected
diff --git a/mcphawk/wrapper.py b/mcphawk/wrapper.py
index 6c67db8..2c66918 100644
--- a/mcphawk/wrapper.py
+++ b/mcphawk/wrapper.py
@@ -15,6 +15,10 @@
from typing import Optional
from mcphawk.logger import log_message
+from mcphawk.stdio_server_detector_fallback import (
+ detect_server_from_command,
+ merge_server_info,
+)
from mcphawk.web.broadcaster import broadcast_new_log
logger = logging.getLogger(__name__)
@@ -30,6 +34,7 @@ def __init__(self, command: list[str], debug: bool = False):
self.running = False
self.server_info = None # Track server info from initialize response
self.client_info = None # Track client info from initialize request
+ self.server_info_fallback = detect_server_from_command(command) # Fallback detection
self.stdin_thread: Optional[threading.Thread] = None
self.stdout_thread: Optional[threading.Thread] = None
self.stderr_thread: Optional[threading.Thread] = None
@@ -237,10 +242,11 @@ def _log_jsonrpc_message(self, message: dict, direction: str):
"direction": direction
}
- # Add server info if we have it
- if self.server_info:
- metadata["server_name"] = self.server_info["name"]
- metadata["server_version"] = self.server_info["version"]
+ # Merge server info (protocol takes precedence over fallback)
+ merged_server_info = merge_server_info(self.server_info_fallback, self.server_info)
+ if merged_server_info:
+ metadata["server_name"] = merged_server_info["name"]
+ metadata["server_version"] = merged_server_info["version"]
# Add client info if we have it
if self.client_info:
diff --git a/tests/unit/test_stdio_server_detector_fallback.py b/tests/unit/test_stdio_server_detector_fallback.py
new file mode 100644
index 0000000..80652f3
--- /dev/null
+++ b/tests/unit/test_stdio_server_detector_fallback.py
@@ -0,0 +1,175 @@
+"""Unit tests for stdio server detection fallback."""
+
+
+from mcphawk.stdio_server_detector_fallback import (
+ detect_server_from_command,
+ extract_server_name,
+ merge_server_info,
+)
+
+
+class TestExtractServerName:
+ """Test the extract_server_name function."""
+
+ def test_mcp_server_pattern(self):
+ """Test mcp-server-{name} and mcp_server_{name} patterns."""
+ assert extract_server_name("mcp-server-filesystem") == "MCP Filesystem Server"
+ assert extract_server_name("mcp_server_filesystem") == "MCP Filesystem Server"
+ assert extract_server_name("mcp-server-git") == "MCP Git Server"
+ assert extract_server_name("mcp_server_github") == "MCP Github Server"
+ assert extract_server_name("MCP-SERVER-SQLITE") == "MCP Sqlite Server"
+ assert extract_server_name("mcp-server-brave-search") == "MCP Brave Search Server"
+ assert extract_server_name("mcp_server_multi_word_name") == "MCP Multi Word Name Server"
+
+ def test_name_mcp_server_pattern(self):
+ """Test {name}-mcp-server and {name}_mcp_server patterns."""
+ assert extract_server_name("filesystem-mcp-server") == "Filesystem MCP Server"
+ assert extract_server_name("my_custom_mcp_server") == "My Custom MCP Server"
+ assert extract_server_name("github-actions-mcp-server") == "Github Actions MCP Server"
+
+ def test_mcp_name_pattern(self):
+ """Test mcp-{name} and mcp_{name} patterns."""
+ assert extract_server_name("mcp-filesystem") == "MCP Filesystem"
+ assert extract_server_name("mcp_git") == "MCP Git"
+ assert extract_server_name("mcp-query") == "MCP Query"
+ # Should not match mcp-server (too generic)
+ assert extract_server_name("mcp-server") is None
+
+ def test_name_mcp_pattern(self):
+ """Test {name}-mcp and {name}_mcp patterns."""
+ assert extract_server_name("filesystem-mcp") == "Filesystem MCP"
+ assert extract_server_name("github_mcp") == "Github MCP"
+ assert extract_server_name("my-custom-mcp") == "My Custom MCP"
+
+ def test_contains_mcp(self):
+ """Test names that contain 'mcp' somewhere."""
+ assert extract_server_name("my-mcp-tool") == "My MCP Tool"
+ assert extract_server_name("tool_with_mcp_support") == "Tool With MCP Support"
+ assert extract_server_name("mcptools") == "Mcptools"
+
+ def test_no_mcp(self):
+ """Test names without 'mcp' return None."""
+ assert extract_server_name("regular-server") is None
+ assert extract_server_name("python-app") is None
+ assert extract_server_name("node") is None
+
+ def test_edge_cases(self):
+ """Test edge cases."""
+ assert extract_server_name("") is None
+ assert extract_server_name(None) is None
+ assert extract_server_name("mcp") == "MCP"
+ assert extract_server_name("MCP") == "MCP"
+ assert extract_server_name("_mcp_") == "MCP"
+
+
+class TestDetectServerFromCommand:
+ """Test the detect_server_from_command function."""
+
+ def test_empty_command(self):
+ """Test with empty or None command."""
+ assert detect_server_from_command([]) is None
+ assert detect_server_from_command(None) is None
+
+ def test_direct_executable(self):
+ """Test detection from direct executable names."""
+ assert detect_server_from_command(["mcp-server-filesystem"]) == {
+ "name": "MCP Filesystem Server",
+ "version": "unknown"
+ }
+ assert detect_server_from_command(["/usr/local/bin/mcp-server-git"]) == {
+ "name": "MCP Git Server",
+ "version": "unknown"
+ }
+ assert detect_server_from_command(["./mcp_server_github"]) == {
+ "name": "MCP Github Server",
+ "version": "unknown"
+ }
+
+ def test_python_module(self):
+ """Test detection from python -m module patterns."""
+ assert detect_server_from_command(["python", "-m", "mcp_server_filesystem"]) == {
+ "name": "MCP Filesystem Server",
+ "version": "unknown"
+ }
+ assert detect_server_from_command(["python3", "-m", "mcp-server-git"]) == {
+ "name": "MCP Git Server",
+ "version": "unknown"
+ }
+ assert detect_server_from_command(["python3.exe", "-m", "my_mcp_tool"]) == {
+ "name": "My MCP Tool",
+ "version": "unknown"
+ }
+
+ def test_mcphawk_special_case(self):
+ """Test special case for mcphawk mcp command."""
+ assert detect_server_from_command(["python", "-m", "mcphawk", "mcp"]) == {
+ "name": "MCPHawk Query Server",
+ "version": "unknown"
+ }
+ assert detect_server_from_command(["python3", "-m", "mcphawk", "mcp", "--transport", "stdio"]) == {
+ "name": "MCPHawk Query Server",
+ "version": "unknown"
+ }
+ # Without 'mcp' subcommand, should use regular detection
+ assert detect_server_from_command(["python", "-m", "mcphawk", "wrap"]) == {
+ "name": "Mcphawk",
+ "version": "unknown"
+ }
+
+ def test_python_script(self):
+ """Test detection from .py script names."""
+ assert detect_server_from_command(["python", "mcp-server-custom.py"]) == {
+ "name": "MCP Custom Server",
+ "version": "unknown"
+ }
+ assert detect_server_from_command(["python3", "/path/to/my_mcp_server.py", "--port", "8080"]) == {
+ "name": "My MCP Server",
+ "version": "unknown"
+ }
+ # Non-MCP scripts should return None
+ assert detect_server_from_command(["python", "regular_script.py"]) is None
+
+ def test_complex_commands(self):
+ """Test more complex command patterns."""
+ assert detect_server_from_command([
+ "/usr/bin/python3", "-u", "-m", "mcp_server_filesystem", "--path", "/tmp"
+ ]) == {
+ "name": "MCP Filesystem Server",
+ "version": "unknown"
+ }
+ assert detect_server_from_command([
+ "node", "/app/mcp-server-nodejs.js"
+ ]) is None # .js files not supported
+
+ def test_no_mcp_commands(self):
+ """Test commands without MCP should return None."""
+ assert detect_server_from_command(["ls", "-la"]) is None
+ assert detect_server_from_command(["python", "script.py"]) is None
+ assert detect_server_from_command(["node", "server.js"]) is None
+
+
+class TestMergeServerInfo:
+ """Test the merge_server_info function."""
+
+ def test_protocol_takes_precedence(self):
+ """Test that protocol info takes precedence over detected."""
+ detected = {"name": "Detected Server", "version": "unknown"}
+ from_protocol = {"name": "Protocol Server", "version": "1.0.0"}
+
+ assert merge_server_info(detected, from_protocol) == from_protocol
+
+ def test_fallback_to_detected(self):
+ """Test fallback to detected when no protocol info."""
+ detected = {"name": "Detected Server", "version": "unknown"}
+
+ assert merge_server_info(detected, None) == detected
+
+ def test_both_none(self):
+ """Test when both are None."""
+ assert merge_server_info(None, None) is None
+
+ def test_only_protocol(self):
+ """Test when only protocol info exists."""
+ from_protocol = {"name": "Protocol Server", "version": "1.0.0"}
+
+ assert merge_server_info(None, from_protocol) == from_protocol