From e07914a4dc6d0cd33a222086a59299f7720d6d49 Mon Sep 17 00:00:00 2001 From: Stefano Date: Wed, 29 Oct 2025 08:53:07 -0500 Subject: [PATCH 1/3] Check rosapi services for action_details and throw error/warning if not available. --- server.py | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/server.py b/server.py index 2ac0029..2daa7d4 100644 --- a/server.py +++ b/server.py @@ -2135,6 +2135,57 @@ def get_action_details(action_type: str) -> dict: if not action_type or not action_type.strip(): return {"error": "Action type cannot be empty"} + # Check if required action detail services are available + required_services = [ + "/rosapi/action_goal_details", + "/rosapi/action_result_details", + "/rosapi/action_feedback_details" + ] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_action_details" + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "error": "Failed to check service availability", + "action_type": action_type, + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available" + } + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "error": f"Action details not supported by this rosbridge/rosapi version", + "action_type": action_type, + "compatibility": { + "issue": "Required action detail services are not available", + "missing_services": missing_services, + "available_services": [s for s in available_services if "action" in s], + "ros_version": "ROS 2 Humble detected", + "suggestions": [ + "This rosbridge version doesn't support action message detail services", + "Use get_actions() to list available actions", + "Use get_action_type() to get action type from action name", + "Consider upgrading rosbridge or using a different implementation" + ], + "note": "Action detail services (/rosapi/action_*_details) are not part of standard rosapi" + } + } + result = {"action_type": action_type, "goal": {}, "result": {}, "feedback": {}} # Get goal, result, and feedback details in a single WebSocket context From 30701618c7972da5de2f05cdd0d03dd5cba89903 Mon Sep 17 00:00:00 2001 From: Stefano Date: Wed, 29 Oct 2025 09:37:27 -0500 Subject: [PATCH 2/3] Improve return messages for action tools when services are not compatible --- server.py | 143 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 138 insertions(+), 5 deletions(-) diff --git a/server.py b/server.py index 2daa7d4..faefb6f 100644 --- a/server.py +++ b/server.py @@ -2011,6 +2011,45 @@ def get_actions() -> dict: dict: Contains list of all active actions, or a message string if no actions are found. """ + # Check if required service is available + required_services = ["/rosapi/action_servers"] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_get_actions" + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "warning": "Cannot check service availability", + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available" + } + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "warning": "Action listing not supported by this rosbridge/rosapi version", + "compatibility": { + "issue": "Required action services are not available", + "missing_services": missing_services, + "required_services": required_services, + "available_services": [s for s in available_services if "action" in s], + "suggestion": "This rosbridge version doesn't support action listing services" + } + } + # rosbridge service call to get action list message = { "op": "call_service", @@ -2065,6 +2104,47 @@ def get_action_type(action: str) -> dict: if not action or not action.strip(): return {"error": "Action name cannot be empty"} + # Check if required service is available + required_services = ["/rosapi/interfaces"] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_get_action_type" + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "warning": "Cannot check service availability", + "action": action, + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available" + } + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "warning": "Action type resolution not supported by this rosbridge/rosapi version", + "action": action, + "compatibility": { + "issue": "Required services are not available", + "missing_services": missing_services, + "required_services": required_services, + "available_services": [s for s in available_services if "interface" in s], + "suggestion": "This rosbridge version doesn't support interface listing services" + } + } + # Since there's no direct action_type service, we'll derive it from known patterns # or use a mapping approach for common actions @@ -2111,7 +2191,16 @@ def get_action_type(action: str) -> dict: "suggestion": "This action might not be available or use a different naming pattern", } - return {"error": f"Failed to get type for action {action}"} + return { + "error": f"Failed to get type for action {action}", + "action": action, + "compatibility": { + "issue": "Failed to retrieve interfaces from rosapi", + "required_services": ["/rosapi/interfaces"], + "suggestion": "Ensure rosbridge is running and rosapi is available", + "note": "Action type resolution requires /rosapi/interfaces service" + } + } @mcp.tool( @@ -2169,18 +2258,18 @@ def get_action_details(action_type: str) -> dict: if missing_services: return { - "error": f"Action details not supported by this rosbridge/rosapi version", + "error": f"Action details for {action_type} not found", "action_type": action_type, "compatibility": { "issue": "Required action detail services are not available", "missing_services": missing_services, + "required_services": required_services, "available_services": [s for s in available_services if "action" in s], - "ros_version": "ROS 2 Humble detected", "suggestions": [ - "This rosbridge version doesn't support action message detail services", "Use get_actions() to list available actions", "Use get_action_type() to get action type from action name", - "Consider upgrading rosbridge or using a different implementation" + "Action details may not be exposed by this rosbridge/rosapi version", + "Consider subscribing to action topics directly for live message inspection" ], "note": "Action detail services (/rosapi/action_*_details) are not part of standard rosapi" } @@ -2464,6 +2553,50 @@ def inspect_all_actions() -> dict: dict: Contains detailed information about all actions, including action names, types, and server information. """ + # Check if required action services are available + required_services = ["/rosapi/action_servers"] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_inspect_actions" + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "error": "Failed to check service availability", + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available" + } + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "error": "Action inspection not supported by this rosbridge/rosapi version", + "compatibility": { + "issue": "Required action services are not available", + "missing_services": missing_services, + "required_services": required_services, + "available_services": [s for s in available_services if "action" in s], + "suggestions": [ + "This rosbridge version doesn't support action inspection services", + "Use get_actions() to list available actions", + "Consider upgrading rosbridge or using a different implementation" + ], + "note": "Action inspection requires /rosapi/action_servers service" + } + } + # First get all actions actions_message = { "op": "call_service", From 3e3fa4093baa68c851481c484d978e1882334348 Mon Sep 17 00:00:00 2001 From: Stefano Date: Thu, 30 Oct 2025 09:22:59 -0500 Subject: [PATCH 3/3] ruff format --- server.py | 84 +++++++++++++++++++++++++++---------------------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/server.py b/server.py index faefb6f..278f7da 100644 --- a/server.py +++ b/server.py @@ -2013,7 +2013,7 @@ def get_actions() -> dict: """ # Check if required service is available required_services = ["/rosapi/action_servers"] - + with ws_manager: # Get available services to check compatibility services_message = { @@ -2021,9 +2021,9 @@ def get_actions() -> dict: "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, - "id": "check_services_for_get_actions" + "id": "check_services_for_get_actions", } - + services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { @@ -2031,13 +2031,13 @@ def get_actions() -> dict: "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, - "suggestion": "Ensure rosbridge is running and rosapi is available" - } + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, } - + available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] - + if missing_services: return { "warning": "Action listing not supported by this rosbridge/rosapi version", @@ -2046,8 +2046,8 @@ def get_actions() -> dict: "missing_services": missing_services, "required_services": required_services, "available_services": [s for s in available_services if "action" in s], - "suggestion": "This rosbridge version doesn't support action listing services" - } + "suggestion": "This rosbridge version doesn't support action listing services", + }, } # rosbridge service call to get action list @@ -2106,7 +2106,7 @@ def get_action_type(action: str) -> dict: # Check if required service is available required_services = ["/rosapi/interfaces"] - + with ws_manager: # Get available services to check compatibility services_message = { @@ -2114,9 +2114,9 @@ def get_action_type(action: str) -> dict: "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, - "id": "check_services_for_get_action_type" + "id": "check_services_for_get_action_type", } - + services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { @@ -2125,13 +2125,13 @@ def get_action_type(action: str) -> dict: "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, - "suggestion": "Ensure rosbridge is running and rosapi is available" - } + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, } - + available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] - + if missing_services: return { "warning": "Action type resolution not supported by this rosbridge/rosapi version", @@ -2141,8 +2141,8 @@ def get_action_type(action: str) -> dict: "missing_services": missing_services, "required_services": required_services, "available_services": [s for s in available_services if "interface" in s], - "suggestion": "This rosbridge version doesn't support interface listing services" - } + "suggestion": "This rosbridge version doesn't support interface listing services", + }, } # Since there's no direct action_type service, we'll derive it from known patterns @@ -2198,8 +2198,8 @@ def get_action_type(action: str) -> dict: "issue": "Failed to retrieve interfaces from rosapi", "required_services": ["/rosapi/interfaces"], "suggestion": "Ensure rosbridge is running and rosapi is available", - "note": "Action type resolution requires /rosapi/interfaces service" - } + "note": "Action type resolution requires /rosapi/interfaces service", + }, } @@ -2227,10 +2227,10 @@ def get_action_details(action_type: str) -> dict: # Check if required action detail services are available required_services = [ "/rosapi/action_goal_details", - "/rosapi/action_result_details", - "/rosapi/action_feedback_details" + "/rosapi/action_result_details", + "/rosapi/action_feedback_details", ] - + with ws_manager: # Get available services to check compatibility services_message = { @@ -2238,9 +2238,9 @@ def get_action_details(action_type: str) -> dict: "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, - "id": "check_services_for_action_details" + "id": "check_services_for_action_details", } - + services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { @@ -2249,13 +2249,13 @@ def get_action_details(action_type: str) -> dict: "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, - "suggestion": "Ensure rosbridge is running and rosapi is available" - } + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, } - + available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] - + if missing_services: return { "error": f"Action details for {action_type} not found", @@ -2269,10 +2269,10 @@ def get_action_details(action_type: str) -> dict: "Use get_actions() to list available actions", "Use get_action_type() to get action type from action name", "Action details may not be exposed by this rosbridge/rosapi version", - "Consider subscribing to action topics directly for live message inspection" + "Consider subscribing to action topics directly for live message inspection", ], - "note": "Action detail services (/rosapi/action_*_details) are not part of standard rosapi" - } + "note": "Action detail services (/rosapi/action_*_details) are not part of standard rosapi", + }, } result = {"action_type": action_type, "goal": {}, "result": {}, "feedback": {}} @@ -2555,7 +2555,7 @@ def inspect_all_actions() -> dict: """ # Check if required action services are available required_services = ["/rosapi/action_servers"] - + with ws_manager: # Get available services to check compatibility services_message = { @@ -2563,9 +2563,9 @@ def inspect_all_actions() -> dict: "service": "/rosapi/services", "type": "rosapi/Services", "args": {}, - "id": "check_services_for_inspect_actions" + "id": "check_services_for_inspect_actions", } - + services_response = ws_manager.request(services_message) if not services_response or not isinstance(services_response, dict): return { @@ -2573,13 +2573,13 @@ def inspect_all_actions() -> dict: "compatibility": { "issue": "Cannot determine available services", "required_services": required_services, - "suggestion": "Ensure rosbridge is running and rosapi is available" - } + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, } - + available_services = services_response.get("values", {}).get("services", []) missing_services = [svc for svc in required_services if svc not in available_services] - + if missing_services: return { "error": "Action inspection not supported by this rosbridge/rosapi version", @@ -2591,10 +2591,10 @@ def inspect_all_actions() -> dict: "suggestions": [ "This rosbridge version doesn't support action inspection services", "Use get_actions() to list available actions", - "Consider upgrading rosbridge or using a different implementation" + "Consider upgrading rosbridge or using a different implementation", ], - "note": "Action inspection requires /rosapi/action_servers service" - } + "note": "Action inspection requires /rosapi/action_servers service", + }, } # First get all actions