diff --git a/server.py b/server.py index 2ac0029..278f7da 100644 --- a/server.py +++ b/server.py @@ -2011,6 +2011,45 @@ def get_actions() -> dict: dict: Contains list of all active actions, or a message string if no actions are found. """ + # Check if required service is available + required_services = ["/rosapi/action_servers"] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_get_actions", + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "warning": "Cannot check service availability", + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "warning": "Action listing not supported by this rosbridge/rosapi version", + "compatibility": { + "issue": "Required action services are not available", + "missing_services": missing_services, + "required_services": required_services, + "available_services": [s for s in available_services if "action" in s], + "suggestion": "This rosbridge version doesn't support action listing services", + }, + } + # rosbridge service call to get action list message = { "op": "call_service", @@ -2065,6 +2104,47 @@ def get_action_type(action: str) -> dict: if not action or not action.strip(): return {"error": "Action name cannot be empty"} + # Check if required service is available + required_services = ["/rosapi/interfaces"] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_get_action_type", + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "warning": "Cannot check service availability", + "action": action, + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "warning": "Action type resolution not supported by this rosbridge/rosapi version", + "action": action, + "compatibility": { + "issue": "Required services are not available", + "missing_services": missing_services, + "required_services": required_services, + "available_services": [s for s in available_services if "interface" in s], + "suggestion": "This rosbridge version doesn't support interface listing services", + }, + } + # Since there's no direct action_type service, we'll derive it from known patterns # or use a mapping approach for common actions @@ -2111,7 +2191,16 @@ def get_action_type(action: str) -> dict: "suggestion": "This action might not be available or use a different naming pattern", } - return {"error": f"Failed to get type for action {action}"} + return { + "error": f"Failed to get type for action {action}", + "action": action, + "compatibility": { + "issue": "Failed to retrieve interfaces from rosapi", + "required_services": ["/rosapi/interfaces"], + "suggestion": "Ensure rosbridge is running and rosapi is available", + "note": "Action type resolution requires /rosapi/interfaces service", + }, + } @mcp.tool( @@ -2135,6 +2224,57 @@ def get_action_details(action_type: str) -> dict: if not action_type or not action_type.strip(): return {"error": "Action type cannot be empty"} + # Check if required action detail services are available + required_services = [ + "/rosapi/action_goal_details", + "/rosapi/action_result_details", + "/rosapi/action_feedback_details", + ] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_action_details", + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "error": "Failed to check service availability", + "action_type": action_type, + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "error": f"Action details for {action_type} not found", + "action_type": action_type, + "compatibility": { + "issue": "Required action detail services are not available", + "missing_services": missing_services, + "required_services": required_services, + "available_services": [s for s in available_services if "action" in s], + "suggestions": [ + "Use get_actions() to list available actions", + "Use get_action_type() to get action type from action name", + "Action details may not be exposed by this rosbridge/rosapi version", + "Consider subscribing to action topics directly for live message inspection", + ], + "note": "Action detail services (/rosapi/action_*_details) are not part of standard rosapi", + }, + } + result = {"action_type": action_type, "goal": {}, "result": {}, "feedback": {}} # Get goal, result, and feedback details in a single WebSocket context @@ -2413,6 +2553,50 @@ def inspect_all_actions() -> dict: dict: Contains detailed information about all actions, including action names, types, and server information. """ + # Check if required action services are available + required_services = ["/rosapi/action_servers"] + + with ws_manager: + # Get available services to check compatibility + services_message = { + "op": "call_service", + "service": "/rosapi/services", + "type": "rosapi/Services", + "args": {}, + "id": "check_services_for_inspect_actions", + } + + services_response = ws_manager.request(services_message) + if not services_response or not isinstance(services_response, dict): + return { + "error": "Failed to check service availability", + "compatibility": { + "issue": "Cannot determine available services", + "required_services": required_services, + "suggestion": "Ensure rosbridge is running and rosapi is available", + }, + } + + available_services = services_response.get("values", {}).get("services", []) + missing_services = [svc for svc in required_services if svc not in available_services] + + if missing_services: + return { + "error": "Action inspection not supported by this rosbridge/rosapi version", + "compatibility": { + "issue": "Required action services are not available", + "missing_services": missing_services, + "required_services": required_services, + "available_services": [s for s in available_services if "action" in s], + "suggestions": [ + "This rosbridge version doesn't support action inspection services", + "Use get_actions() to list available actions", + "Consider upgrading rosbridge or using a different implementation", + ], + "note": "Action inspection requires /rosapi/action_servers service", + }, + } + # First get all actions actions_message = { "op": "call_service",