Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved examples #324

Merged
merged 8 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions launch_testing/launch_testing_examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Launch testing examples

This package contains simple use cases for the ``launch`` and ``launch_testing`` packages.
These are designed to help beginners get started with these packages and help them understand the concepts.

## Examples

### `check_node_launch_test.py`

Usage:

```sh
launch_test launch_testing_examples/check_node_launch_test.py
```

There might be situations where nodes, once launched, take some time to actually start and we need to wait for the node to start to perform some action.
We can simulate this using ``launch.actions.TimerAction``. This example shows one way to detect when a node has been launched.
We delay the launch by 5 seconds, and wait for the node to start with a timeout of 8 seconds.

### `check_msgs_launch_test.py`

Usage:

```sh
launch_test launch_testing_examples/check_msgs_launch_test.py
```

Consider a problem statement where you need to launch a node and check if messages are published on a particular topic.
This example demonstrates how to do that, using a talker node.
It uses the ``Event`` object to end the test as soon as the first message is received on the chatter topic, with a timeout of 5 seconds.

### `set_param_launch_test.py`

Usage:

```sh
launch_test launch_testing_examples/set_param_launch_test.py
```

This example demonstrates how to launch a node, set a parameter in it and check if that was successful.

### `hello_world_launch_test.py`

Usage:

```sh
launch_test launch_testing_examples/hello_world_launch_test.py
```

This test is a simple example on how to use the ``launch_testing``.

It launches a process and asserts that it prints "hello_world" to ``stdout`` using ``proc_output.assertWaitFor()``.
Finally, it checks if the process exits normally (zero exit code).

The ``@launch_testing.markers.keep_alive`` decorator ensures that the launch process stays alive long enough for the tests to run.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

from threading import Event
from threading import Thread
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
path_to_test = os.path.dirname(__file__)

return launch.LaunchDescription([
launch_ros.actions.Node(
executable=sys.executable,
arguments=[os.path.join(path_to_test, 'talker.py')],
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
additional_env={'PYTHONUNBUFFERED': '1'},
name='demo_node_1',
),

launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_check_if_msgs_published(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
assert msgs_received_flag, 'Did not receive msgs !'
rclpy.shutdown()


class MakeTestNode(Node):
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, name='test_node'):
super().__init__(name)
self.msg_event_object = Event()

def start_subscriber(self):
# Create a subscriber
self.subscription = self.create_subscription(
String,
'chatter',
self.subscriber_callback,
10
)

# Add a spin thread
self.ros_spin_thread = Thread(target=lambda node: rclpy.spin(node), args=(self,))
self.ros_spin_thread.start()

def subscriber_callback(self, data):
self.msg_event_object.set()
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from rclpy.node import Node


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
path_to_test = os.path.dirname(__file__)

return launch.LaunchDescription([
launch.actions.TimerAction(
period=5.0,
actions=[
launch_ros.actions.Node(
executable=sys.executable,
arguments=[os.path.join(path_to_test, 'talker.py')],
additional_env={'PYTHONUNBUFFERED': '1'},
name='demo_node_1',
)
]),
launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_node_start(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
assert node.wait_for_node('demo_node_1', 8.0), 'Node not found !'
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)

def wait_for_node(self, node_name, timeout=8.0):
start = time.time()
flag = False
print('Waiting for node...')
while time.time() - start < timeout and not flag:
flag = node_name in self.get_node_names()
time.sleep(0.1)

return flag
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import launch
import launch.actions
import launch_testing.actions
import launch_testing.markers
import pytest


# This function specifies the processes to be run for our test
@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
"""Launch a simple process to print 'hello_world'."""
return launch.LaunchDescription([
# Launch a process to test
launch.actions.ExecuteProcess(
cmd=['echo', 'hello_world'],
shell=True
),
# Tell launch to start the test
launch_testing.actions.ReadyToTest()
])


# This is our test fixture. Each method is a test case.
# These run alongside the processes specified in generate_test_description()
class TestHelloWorldProcess(unittest.TestCase):

def test_read_stdout(self, proc_output):
"""Check if 'hello_world' was found in the stdout."""
# 'proc_output' is an object added automatically by the launch_testing framework.
# It captures the outputs of the processes launched in generate_test_description()
# Refer to the documentation for further details.
proc_output.assertWaitFor('hello_world', timeout=10, stream='stdout')


# These tests are run after the processes in generate_test_description() have shutdown.
@launch_testing.post_shutdown_test()
class TestHelloWorldShutdown(unittest.TestCase):

def test_exit_codes(self, proc_info):
"""Check if the processes exited normally."""
launch_testing.asserts.assertExitCodes(proc_info)
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.node import Node


class TestNode(Node):

def __init__(self):
super().__init__('parameter_blackboard')
self.declare_parameter('demo_parameter_1', False)


def main(args=None):
rclpy.init(args=args)

node = TestNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
from rcl_interfaces.srv import SetParameters
import rclpy
from rclpy.node import Node


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
path_to_test = os.path.dirname(__file__)

return launch.LaunchDescription([
launch_ros.actions.Node(
executable=sys.executable,
arguments=[os.path.join(path_to_test, 'parameter_blackboard.py')],
additional_env={'PYTHONUNBUFFERED': '1'},
name='demo_node_1',
),
launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_set_parameter(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
response = node.set_parameter(state=True)
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
assert response.successful, 'Could not set parameter!'
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)

def set_parameter(self, state=True, timeout=5.0):
parameters = [rclpy.Parameter('demo_parameter_1', value=state).to_parameter_msg()]

client = self.create_client(SetParameters, 'demo_node_1/set_parameters')
ready = client.wait_for_service(timeout_sec=timeout)
if not ready:
raise RuntimeError('Wait for service timed out')

request = SetParameters.Request()
request.parameters = parameters
future = client.call_async(request)
rclpy.spin_until_future_complete(self, future, timeout_sec=timeout)

response = future.result()
return response.results[0]
Loading