Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Final batch of examples #327

Merged
merged 17 commits into from Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions launch_testing/launch_testing_examples/README.md
Expand Up @@ -17,6 +17,23 @@ There might be situations where nodes, once launched, take some time to actually
We can simulate this using ``launch.actions.TimerAction``. This example shows one way to detect when a node has been launched.
We delay the launch by 5 seconds, and wait for the node to start with a timeout of 8 seconds.

### `check_multiple_node_launch_test.py`

```sh
launch_test test/examples/check_multiple_node_launch_test.py
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
```

This test launches multiple nodes, and checks if they were launched successfully using the ``WaitForNodes`` utility.
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

### `record_rosbag_launch_test.py`

```sh
launch_test test/examples/record_rosbag_launch_test.py
```

This test launches a ``talker`` node, records the topics to a ``rosbag`` and makes sure that the messages were recorded successfuly,
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
then deletes the bag file.

### `check_msgs_launch_test.py`

Usage:
Expand Down
@@ -0,0 +1,157 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import random
import string
import time
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from rclpy.node import Node


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
# Default number of nodes to be launched
n = 3
node_list = []
node_names = []

for i in range(n):
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
node_list.append(
launch_ros.actions.Node(
executable='talker',
package='demo_nodes_cpp',
name='demo_node_' + str(i)
)
)
node_names.append('demo_node_' + str(i))

node_list.append(launch_testing.actions.ReadyToTest())
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
return launch.LaunchDescription(node_list), {'node_list': node_names}


class TestFixture(unittest.TestCase):
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

def test_nodes_successful(self, node_list):
"""Check if all the nodes were launched correctly."""
time.sleep(1.0)
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

# Method 1
wait_for_nodes_1 = WaitForNodes(node_list, timeout=5.0)
assert wait_for_nodes_1.wait()
assert wait_for_nodes_1.get_nodes_not_found() == set()
wait_for_nodes_1.shutdown()

# Method 2
with WaitForNodes(node_list, timeout=5.0) as wait_for_nodes_2:
print('All nodes were found !')
assert wait_for_nodes_2.get_nodes_not_found() == set()

def test_nodes_unsuccessful(self, node_list):
"""Insert a invalid node name that should not exist."""
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
time.sleep(1.0)
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
invalid_node_list = node_list + ['invalid_node']

# Method 1
wait_for_nodes_1 = WaitForNodes(invalid_node_list, timeout=1.0)
assert not wait_for_nodes_1.wait()
assert wait_for_nodes_1.get_nodes_not_found() == {'invalid_node'}
wait_for_nodes_1.shutdown()

# Method 2
with pytest.raises(RuntimeError):
with WaitForNodes(invalid_node_list, timeout=1.0):
pass


# TODO (adityapande-1995): Move WaitForNodes implementation to launch_testing_ros
# after https://github.com/ros2/rclpy/issues/831 is resolved
class WaitForNodes:
"""
Wait to discover supplied nodes.

Example usage:
--------------
# Method 1, calling wait() and shutdown() manually
def method_1():
node_list = ['foo', 'bar']
wait_for_nodes = WaitForNodes(node_list, timeout=5.0)
assert wait_for_nodes.wait()
print('Nodes found!')
assert wait_for_nodes.get_nodes_not_found() == set()
wait_for_nodes.shutdown()

# Method 2, using the 'with' keyword
def method_2():
with WaitForNodes(['foo', 'bar'], timeout=5.0) as wait_for_nodes:
assert wait_for_nodes.get_nodes_not_found() == set()
print('Nodes found!')
"""

def __init__(self, node_names, timeout=5.0):
self.node_names = node_names
self.timeout = timeout
self.__ros_context = rclpy.Context()
rclpy.init(context=self.__ros_context)
self._prepare_node()

self.__expected_nodes_set = set(node_names)
self.__nodes_found = None

def _prepare_node(self):
self.__node_name = '_test_node_' +\
''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
self.__ros_node = Node(node_name=self.__node_name, context=self.__ros_context)

def wait(self):
start = time.time()
flag = False
print('Waiting for nodes')
while time.time() - start < self.timeout and not flag:
flag = all(name in self.__ros_node.get_node_names() for name in self.node_names)
time.sleep(0.3)

self.__nodes_found = set(self.__ros_node.get_node_names())
self.__nodes_found.remove(self.__node_name)
return flag

def shutdown(self):
self.__ros_node.destroy_node()
rclpy.shutdown(context=self.__ros_context)

def __enter__(self):
if not self.wait():
raise RuntimeError('Did not find all nodes !')

return self

def __exit__(self, exep_type, exep_value, trace):
if exep_type is not None:
raise Exception('Exception occured, value: ', exep_value)
self.shutdown()

def get_nodes_found(self):
return self.__nodes_found

def get_nodes_not_found(self):
return self.__expected_nodes_set - self.__nodes_found
@@ -0,0 +1,73 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import shutil
import tempfile
import time
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest

import yaml


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
rosbag_dir = tempfile.mkdtemp() + '/test_bag'

node_list = [
launch_ros.actions.Node(
executable='talker',
package='demo_nodes_cpp',
name='demo_node'
),
launch.actions.ExecuteProcess(
cmd=['ros2', 'bag', 'record', '-a', '-o', rosbag_dir],
output='screen'
),
launch_testing.actions.ReadyToTest()
]

return launch.LaunchDescription(node_list), {'rosbag_dir': rosbag_dir}


class TestFixture(unittest.TestCase):
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved

def test_delay(self):
"""Delay the shutdown of processes so that rosbag can record some messages."""
time.sleep(3)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a potential gap in launch_testing. It would be nice if there was a launch_testing feature that could delay the shutdown event. I know there is a KeepAlive action, but I don't know if that is exactly what we want here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just add a Timer action in the launch description with a 3s period that does nothing.
That will keep the launch service alive and delay the shutdown.

It's still kind of a hack though.



@launch_testing.post_shutdown_test()
class TestFixtureAfterShutdown(unittest.TestCase):

def test_rosbag_record(self, rosbag_dir):
"""Check if the rosbag2 recording was successful."""
with open(rosbag_dir + '/metadata.yaml', 'r') as file:
metadata = yaml.safe_load(file)
assert metadata['rosbag2_bagfile_information']['message_count'] > 0
print('The following topics received messages:')
for item in metadata['rosbag2_bagfile_information']['topics_with_message_count']:
print(item['topic_metadata']['name'], 'recieved ', item['message_count'],
' messages')

# Delete the rosbag directory
shutil.rmtree(rosbag_dir.replace('/test_bag', ''))
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 3 additions & 0 deletions launch_testing/launch_testing_examples/package.xml
Expand Up @@ -19,6 +19,9 @@
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>rclpy</test_depend>
<test_depend>ros2cli</test_depend>
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
<test_depend>ros2bag</test_depend>
<test_depend>rosbag2_transport</test_depend>
<test_depend>std_msgs</test_depend>
<test_depend>demo_nodes_cpp</test_depend>

Expand Down