Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 185 additions & 1 deletion server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,45 @@ def get_actions() -> dict:
dict: Contains list of all active actions,
or a message string if no actions are found.
"""
# Check if required service is available
required_services = ["/rosapi/action_servers"]

with ws_manager:
# Get available services to check compatibility
services_message = {
"op": "call_service",
"service": "/rosapi/services",
"type": "rosapi/Services",
"args": {},
"id": "check_services_for_get_actions",
}

services_response = ws_manager.request(services_message)
if not services_response or not isinstance(services_response, dict):
return {
"warning": "Cannot check service availability",
"compatibility": {
"issue": "Cannot determine available services",
"required_services": required_services,
"suggestion": "Ensure rosbridge is running and rosapi is available",
},
}

available_services = services_response.get("values", {}).get("services", [])
missing_services = [svc for svc in required_services if svc not in available_services]

if missing_services:
return {
"warning": "Action listing not supported by this rosbridge/rosapi version",
"compatibility": {
"issue": "Required action services are not available",
"missing_services": missing_services,
"required_services": required_services,
"available_services": [s for s in available_services if "action" in s],
"suggestion": "This rosbridge version doesn't support action listing services",
},
}

# rosbridge service call to get action list
message = {
"op": "call_service",
Expand Down Expand Up @@ -2065,6 +2104,47 @@ def get_action_type(action: str) -> dict:
if not action or not action.strip():
return {"error": "Action name cannot be empty"}

# Check if required service is available
required_services = ["/rosapi/interfaces"]

with ws_manager:
# Get available services to check compatibility
services_message = {
"op": "call_service",
"service": "/rosapi/services",
"type": "rosapi/Services",
"args": {},
"id": "check_services_for_get_action_type",
}

services_response = ws_manager.request(services_message)
if not services_response or not isinstance(services_response, dict):
return {
"warning": "Cannot check service availability",
"action": action,
"compatibility": {
"issue": "Cannot determine available services",
"required_services": required_services,
"suggestion": "Ensure rosbridge is running and rosapi is available",
},
}

available_services = services_response.get("values", {}).get("services", [])
missing_services = [svc for svc in required_services if svc not in available_services]

if missing_services:
return {
"warning": "Action type resolution not supported by this rosbridge/rosapi version",
"action": action,
"compatibility": {
"issue": "Required services are not available",
"missing_services": missing_services,
"required_services": required_services,
"available_services": [s for s in available_services if "interface" in s],
"suggestion": "This rosbridge version doesn't support interface listing services",
},
}

# Since there's no direct action_type service, we'll derive it from known patterns
# or use a mapping approach for common actions

Expand Down Expand Up @@ -2111,7 +2191,16 @@ def get_action_type(action: str) -> dict:
"suggestion": "This action might not be available or use a different naming pattern",
}

return {"error": f"Failed to get type for action {action}"}
return {
"error": f"Failed to get type for action {action}",
"action": action,
"compatibility": {
"issue": "Failed to retrieve interfaces from rosapi",
"required_services": ["/rosapi/interfaces"],
"suggestion": "Ensure rosbridge is running and rosapi is available",
"note": "Action type resolution requires /rosapi/interfaces service",
},
}


@mcp.tool(
Expand All @@ -2135,6 +2224,57 @@ def get_action_details(action_type: str) -> dict:
if not action_type or not action_type.strip():
return {"error": "Action type cannot be empty"}

# Check if required action detail services are available
required_services = [
"/rosapi/action_goal_details",
"/rosapi/action_result_details",
"/rosapi/action_feedback_details",
]

with ws_manager:
# Get available services to check compatibility
services_message = {
"op": "call_service",
"service": "/rosapi/services",
"type": "rosapi/Services",
"args": {},
"id": "check_services_for_action_details",
}

services_response = ws_manager.request(services_message)
if not services_response or not isinstance(services_response, dict):
return {
"error": "Failed to check service availability",
"action_type": action_type,
"compatibility": {
"issue": "Cannot determine available services",
"required_services": required_services,
"suggestion": "Ensure rosbridge is running and rosapi is available",
},
}

available_services = services_response.get("values", {}).get("services", [])
missing_services = [svc for svc in required_services if svc not in available_services]

if missing_services:
return {
"error": f"Action details for {action_type} not found",
"action_type": action_type,
"compatibility": {
"issue": "Required action detail services are not available",
"missing_services": missing_services,
"required_services": required_services,
"available_services": [s for s in available_services if "action" in s],
"suggestions": [
"Use get_actions() to list available actions",
"Use get_action_type() to get action type from action name",
"Action details may not be exposed by this rosbridge/rosapi version",
"Consider subscribing to action topics directly for live message inspection",
],
"note": "Action detail services (/rosapi/action_*_details) are not part of standard rosapi",
},
}

result = {"action_type": action_type, "goal": {}, "result": {}, "feedback": {}}

# Get goal, result, and feedback details in a single WebSocket context
Expand Down Expand Up @@ -2413,6 +2553,50 @@ def inspect_all_actions() -> dict:
dict: Contains detailed information about all actions,
including action names, types, and server information.
"""
# Check if required action services are available
required_services = ["/rosapi/action_servers"]

with ws_manager:
# Get available services to check compatibility
services_message = {
"op": "call_service",
"service": "/rosapi/services",
"type": "rosapi/Services",
"args": {},
"id": "check_services_for_inspect_actions",
}

services_response = ws_manager.request(services_message)
if not services_response or not isinstance(services_response, dict):
return {
"error": "Failed to check service availability",
"compatibility": {
"issue": "Cannot determine available services",
"required_services": required_services,
"suggestion": "Ensure rosbridge is running and rosapi is available",
},
}

available_services = services_response.get("values", {}).get("services", [])
missing_services = [svc for svc in required_services if svc not in available_services]

if missing_services:
return {
"error": "Action inspection not supported by this rosbridge/rosapi version",
"compatibility": {
"issue": "Required action services are not available",
"missing_services": missing_services,
"required_services": required_services,
"available_services": [s for s in available_services if "action" in s],
"suggestions": [
"This rosbridge version doesn't support action inspection services",
"Use get_actions() to list available actions",
"Consider upgrading rosbridge or using a different implementation",
],
"note": "Action inspection requires /rosapi/action_servers service",
},
}

# First get all actions
actions_message = {
"op": "call_service",
Expand Down