In [1]:
import os
import sys

# Source ROS 2 setup script
ros_setup = "/opt/ros/jazzy/setup.bash"

if os.path.exists(ros_setup):
    print(f"‚úÖ Sourcing ROS 2 from: {ros_setup}")
    os.system(f"source {ros_setup} && echo 'ROS_DISTRO=$ROS_DISTRO'")
else:
    print(f"‚ùå ROS 2 setup script not found: {ros_setup}")
    print("üí° Install ROS 2 Jazzy: sudo apt install ros-jazzy-desktop")
    sys.exit(1)

‚úÖ Sourcing ROS 2 from: /opt/ros/jazzy/setup.bash


sh: 1: source: not found


In [2]:
import os
import sys

# Path to ROS 2 setup script
ros_setup = "/opt/ros/jazzy/setup.bash"

# Check if file exists
if os.path.exists(ros_setup):
    print(f"‚úÖ Found ROS 2 setup script: {ros_setup}")
    
    # Use '.' instead of 'source' for compatibility
    result = os.system(f". {ros_setup} && echo 'ROS 2 sourced'")
    
    if result == 0:
        print("‚úÖ ROS 2 sourced successfully!")
        # Now test ROS 2
        os.system("ros2 --version")
        os.system("ros2 node list")
    else:
        print("‚ùå Failed to source ROS 2.")
else:
    print(f"‚ùå ROS 2 setup script not found: {ros_setup}")
    print("üí° Install ROS 2 Jazzy: sudo apt install ros-jazzy-desktop")
    sys.exit(1)

‚úÖ Found ROS 2 setup script: /opt/ros/jazzy/setup.bash
‚ùå Failed to source ROS 2.


sh: 1: /opt/ros/jazzy/setup.bash: Bad substitution
sh: 6: /opt/ros/jazzy/setup.bash: builtin: not found
sh: 11: .: cannot open /setup.sh: No such file


In [3]:
import subprocess
import sys
import tempfile
import os

# ----------------------------
# STEP 1: Define ROS 2 setup path
# ----------------------------
ROS_SETUP = "/opt/ros/jazzy/setup.bash"

# ----------------------------
# STEP 2: Test if setup file exists
# ----------------------------
if not os.path.exists(ROS_SETUP):
    print("‚ùå CRITICAL: ROS 2 Jazzy setup script not found.")
    print(f"   Expected at: {ROS_SETUP}")
    print("   üí° Install ROS 2 Jazzy: sudo apt install ros-jazzy-desktop")
    sys.exit(1)
else:
    print("‚úÖ ROS 2 Jazzy setup script found.")

# ----------------------------
# STEP 3: Run full diagnostic in a sourced subshell
# ----------------------------
DIAGNOSTIC_SCRIPT = '''
import sys
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from example_interfaces.srv import AddTwoInts
import threading
import time

print("üß™ Running ROS 2 Jazzy Health Check...")

# Initialize ROS 2
try:
    rclpy.init()
    print("‚úÖ rclpy.init() succeeded")
except Exception as e:
    print(f"‚ùå rclpy.init() failed: {e}")
    sys.exit(1)

# ----------------------------
# Test 1: Create a simple node
# ----------------------------
try:
    node = Node("health_check_node")
    print("‚úÖ Created node: health_check_node")
    node.destroy_node()
except Exception as e:
    print(f"‚ùå Failed to create node: {e}")

# ----------------------------
# Test 2: Publisher and Subscriber
# ----------------------------
class PubSubTester(Node):
    def __init__(self):
        super().__init__('pubsub_tester')
        self.publisher = self.create_publisher(String, 'test_topic', 10)
        self.subscription = self.create_subscription(String, 'test_topic', self.listener_callback, 10)
        self.received = False

    def publish_test_message(self):
        msg = String()
        msg.data = "Hello from health check!"
        self.publisher.publish(msg)
        self.get_logger().info('üì§ Published test message')

    def listener_callback(self, msg):
        self.get_logger().info(f'üì• Received: "{msg.data}"')
        self.received = True

# Create tester node
tester = PubSubTester()

# Spin in background
def spin_node(n):
    rclpy.spin(n)

spin_thread = threading.Thread(target=spin_node, args=(tester,), daemon=True)
spin_thread.start()

# Wait for node to be ready
time.sleep(0.5)

# Publish and wait for response
tester.publish_test_message()
time.sleep(1)

if tester.received:
    print("‚úÖ Pub/Sub test passed!")
else:
    print("‚ùå Pub/Sub test failed: no message received")

tester.destroy_node()

# ----------------------------
# Test 3: Service Server and Client
# ----------------------------
class ServiceServer(Node):
    def __init__(self):
        super().__init__('service_server')
        self.srv = self.create_service(AddTwoInts, 'health_check_service', self.service_callback)

    def service_callback(self, request, response):
        response.sum = request.a + request.b
        self.get_logger().info(f'üßÆ Service called: {request.a} + {request.b} = {response.sum}')
        return response

class ServiceClient(Node):
    def __init__(self):
        super().__init__('service_client')
        self.client = self.create_client(AddTwoInts, 'health_check_service')
        while not self.client.wait_for_service(timeout_sec=1.0):
            pass

    def call_service(self, a, b):
        request = AddTwoInts.Request()
        request.a = a
        request.b = b
        future = self.client.call_async(request)
        rclpy.spin_until_future_complete(self, future)
        return future.result()

# Create server and client
server = ServiceServer()
client = ServiceClient()

# Spin server in background
server_thread = threading.Thread(target=spin_node, args=(server,), daemon=True)
server_thread.start()

time.sleep(0.5)

# Call service
try:
    result = client.call_service(3, 4)
    if result.sum == 7:
        print("‚úÖ Service test passed: 3 + 4 = 7")
    else:
        print(f"‚ùå Service test failed: expected 7, got {result.sum}")
except Exception as e:
    print(f"‚ùå Service call failed: {e}")

server.destroy_node()
client.destroy_node()

# ----------------------------
# Test 4: CLI tools
# ----------------------------
print("\nüîß Testing ROS 2 CLI tools...")
import subprocess

def test_cli(cmd):
    try:
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
        if result.returncode == 0:
            print(f"‚úÖ {cmd} ‚Üí Success")
            return True
        else:
            print(f"‚ùå {cmd} ‚Üí Failed: {result.stderr.strip()}")
            return False
    except Exception as e:
        print(f"‚ùå {cmd} ‚Üí Error: {e}")
        return False

test_cli

SyntaxError: incomplete input (588059413.py, line 25)

In [4]:
import subprocess
import sys
import tempfile
import os

# ----------------------------
# STEP 1: Define ROS 2 setup path
# ----------------------------
ROS_SETUP = "/opt/ros/jazzy/setup.bash"

# ----------------------------
# STEP 2: Test if setup file exists
# ----------------------------
if not os.path.exists(ROS_SETUP):
    print("‚ùå CRITICAL: ROS 2 Jazzy setup script not found.")
    print(f"   Expected at: {ROS_SETUP}")
    print("   üí° Install ROS 2 Jazzy: sudo apt install ros-jazzy-desktop")
    sys.exit(1)
else:
    print("‚úÖ ROS 2 Jazzy setup script found.")

# ----------------------------
# STEP 3: Run full diagnostic in a sourced subshell
# ----------------------------
DIAGNOSTIC_SCRIPT = '''
import sys
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from example_interfaces.srv import AddTwoInts
import threading
import time

print("üß™ Running ROS 2 Jazzy Health Check...")

# Initialize ROS 2
try:
    rclpy.init()
    print("‚úÖ rclpy.init() succeeded")
except Exception as e:
    print(f"‚ùå rclpy.init() failed: {e}")
    sys.exit(1)

# ----------------------------
# Test 1: Create a simple node
# ----------------------------
try:
    node = Node("health_check_node")
    print("‚úÖ Created node: health_check_node")
    node.destroy_node()
except Exception as e:
    print(f"‚ùå Failed to create node: {e}")

# ----------------------------
# Test 2: Publisher and Subscriber
# ----------------------------
class PubSubTester(Node):
    def __init__(self):
        super().__init__('pubsub_tester')
        self.publisher = self.create_publisher(String, 'test_topic', 10)
        self.subscription = self.create_subscription(String, 'test_topic', self.listener_callback, 10)
        self.received = False

    def publish_test_message(self):
        msg = String()
        msg.data = "Hello from health check!"
        self.publisher.publish(msg)
        self.get_logger().info('üì§ Published test message')

    def listener_callback(self, msg):
        self.get_logger().info(f'üì• Received: "{msg.data}"')
        self.received = True

# Create tester node
tester = PubSubTester()

# Spin in background
def spin_node(n):
    rclpy.spin(n)

spin_thread = threading.Thread(target=spin_node, args=(tester,), daemon=True)
spin_thread.start()

# Wait for node to be ready
time.sleep(0.5)

# Publish and wait for response
tester.publish_test_message()
time.sleep(1)

if tester.received:
    print("‚úÖ Pub/Sub test passed!")
else:
    print("‚ùå Pub/Sub test failed: no message received")

tester.destroy_node()

# ----------------------------
# Test 3: Service Server and Client
# ----------------------------
class ServiceServer(Node):
    def __init__(self):
        super().__init__('service_server')
        self.srv = self.create_service(AddTwoInts, 'health_check_service', self.service_callback)

    def service_callback(self, request, response):
        response.sum = request.a + request.b
        self.get_logger().info(f'üßÆ Service called: {request.a} + {request.b} = {response.sum}')
        return response

class ServiceClient(Node):
    def __init__(self):
        super().__init__('service_client')
        self.client = self.create_client(AddTwoInts, 'health_check_service')
        while not self.client.wait_for_service(timeout_sec=1.0):
            pass

    def call_service(self, a, b):
        request = AddTwoInts.Request()
        request.a = a
        request.b = b
        future = self.client.call_async(request)
        rclpy.spin_until_future_complete(self, future)
        return future.result()

# Create server and client
server = ServiceServer()
client = ServiceClient()

# Spin server in background
server_thread = threading.Thread(target=spin_node, args=(server,), daemon=True)
server_thread.start()

time.sleep(0.5)

# Call service
try:
    result = client.call_service(3, 4)
    if result.sum == 7:
        print("‚úÖ Service test passed: 3 + 4 = 7")
    else:
        print(f"‚ùå Service test failed: expected 7, got {result.sum}")
except Exception as e:
    print(f"‚ùå Service call failed: {e}")

server.destroy_node()
client.destroy_node()

# ----------------------------
# Test 4: CLI tools
# ----------------------------
print("\nüîß Testing ROS 2 CLI tools...")
import subprocess

def test_cli(cmd):
    try:
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
        if result.returncode == 0:
            print(f"‚úÖ {cmd} ‚Üí Success")
            return True
        else:
            print(f"‚ùå {cmd} ‚Üí Failed: {result.stderr.strip()}")
            return False
    except Exception as e:
        print(f"‚ùå {cmd} ‚Üí Error: {e}")
        return False

test_cli("ros2 --version")
test_cli("ros2 node list")
test_cli("ros2 topic list")

# ----------------------------
# Cleanup
# ----------------------------
rclpy.shutdown()
print("\nüéâ ROS 2 Jazzy Health Check Complete!")
'''

# ----------------------------
# Write script to temp file and run in sourced shell
# ----------------------------
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
    f.write(DIAGNOSTIC_SCRIPT)
    temp_script = f.name

print("üöÄ Starting ROS 2 Jazzy Health Check...")

# Use '.' instead of 'source' for shell compatibility
cmd = f". {ROS_SETUP} && python3 {temp_script}"

result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

# Print output
if result.returncode == 0:
    print("‚úÖ HEALTH CHECK PASSED:\n")
    print(result.stdout)
else:
    print("‚ùå HEALTH CHECK FAILED:\n")
    print(result.stderr)

# Clean up
os.unlink(temp_script)

‚úÖ ROS 2 Jazzy setup script found.
üöÄ Starting ROS 2 Jazzy Health Check...
‚ùå HEALTH CHECK FAILED:

/bin/sh: 1: /opt/ros/jazzy/setup.bash: Bad substitution
/bin/sh: 6: /opt/ros/jazzy/setup.bash: builtin: not found
/bin/sh: 11: .: cannot open /setup.sh: No such file

