Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature request (launch_testing): Check if a topic is publishing messages, with a timeout #272

Closed
adityapande-1995 opened this issue Sep 15, 2021 · 2 comments · Fixed by #274
Assignees

Comments

@adityapande-1995
Copy link
Contributor

Feature request

Feature description

It would be nice to have a way to check if a topic is publishing messages (with a timeout).
THis suggestion came up in #263

Implementation considerations

One way to do that can be :

from threading import Event, Thread
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import pytest
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


@pytest.mark.launch_test
def generate_test_description():
    return launch.LaunchDescription([
        launch_ros.actions.Node(
            package='demo_nodes_cpp',
            executable='talker',
            name='demo_node_1',
        ),

        launch_testing.actions.ReadyToTest()
    ])


class TestFixture(unittest.TestCase):

    def test_check_if_msgs_published(self, proc_output):
        rclpy.init()
        node = MakeTestNode('test_node')
        node.start_subscriber()
        msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
        assert msgs_received_flag, 'Did not receive msgs !'
        rclpy.shutdown()


class MakeTestNode(Node):

    def __init__(self, name='test_node'):
        super().__init__(name)
        self.msg_event_object = Event()

    def start_subscriber(self):
        # Create a subscriber
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.subscriber_callback,
            10
        )

        # Add a spin thread
        self.ros_spin_thread = Thread(target=lambda node: rclpy.spin(node), args=(self,))
        self.ros_spin_thread.start()

    def subscriber_callback(self, data):
        self.msg_event_object.set()

@adityapande-1995 adityapande-1995 transferred this issue from ros2/launch Sep 30, 2021
@jacobperron
Copy link
Member

IMO, ideally syntax would look like this:

def test_foo(self):
    wait_for_topics = WaitForTopics(['foo', 'bar', 'baz'])
    assert wait_for_topics.wait(timeout=5.0)
    ...

However, we need an instance to a ROS node (and context). Maybe we can use the existing ROSAdapter class, e.g.:

class WaitForTopics:
   def __init__(self, *, topics):
       self.__ros_adapter = ROSAdapter()
       self.__ros_adapter.start()

    def __del__(self):
       self.__ros_adapter.shutdown()

@ivanpauno @wjwwood, thoughts?

@adityapande-1995
Copy link
Contributor Author

Relevant PR : #274

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants