New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Final batch of examples #327
Merged
Merged
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
45d0fd3
Added example: Record to rosbag2
adityapande-1995 41a80a5
Removed rosbag dependency
adityapande-1995 ad73723
ros2cli added
adityapande-1995 15f26e3
ros2bag added
adityapande-1995 4c3f958
demo_nodes talker node added
adityapande-1995 5d7b82c
Split test in 2 files, added WaitForNodes utility
adityapande-1995 73f3eb6
typo fix in docstring
adityapande-1995 ce92035
Updated readme
adityapande-1995 40d4a68
Launch script does not require cmdline arguments now
adityapande-1995 9a0f7af
Review changes
adityapande-1995 46aea7d
Delete rosbag in tearDownClass method
adityapande-1995 c547867
Windows bugfix
adityapande-1995 89c6ce3
time.sleep for windows
adityapande-1995 fbe86a1
Removed rosbag example
adityapande-1995 fb8ee51
Dont run test for windows
adityapande-1995 aeae1e9
Added comment
adityapande-1995 832706f
Added comment
adityapande-1995 File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
152 changes: 152 additions & 0 deletions
152
...sting/launch_testing_examples/launch_testing_examples/check_multiple_nodes_launch_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
# Copyright 2021 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
import random | ||
import string | ||
import time | ||
import unittest | ||
|
||
import launch | ||
import launch.actions | ||
import launch_ros.actions | ||
import launch_testing.actions | ||
import launch_testing.markers | ||
import pytest | ||
import rclpy | ||
from rclpy.node import Node | ||
|
||
|
||
@pytest.mark.launch_test | ||
@launch_testing.markers.keep_alive | ||
def generate_test_description(): | ||
launch_actions = [] | ||
node_names = [] | ||
|
||
for i in range(3): | ||
launch_actions.append( | ||
launch_ros.actions.Node( | ||
executable='talker', | ||
package='demo_nodes_cpp', | ||
name='demo_node_' + str(i) | ||
) | ||
) | ||
node_names.append('demo_node_' + str(i)) | ||
|
||
launch_actions.append(launch_testing.actions.ReadyToTest()) | ||
return launch.LaunchDescription(launch_actions), {'node_list': node_names} | ||
|
||
|
||
class CheckMultipleNodesLaunched(unittest.TestCase): | ||
|
||
def test_nodes_successful(self, node_list): | ||
"""Check if all the nodes were launched correctly.""" | ||
# Method 1 | ||
wait_for_nodes_1 = WaitForNodes(node_list, timeout=5.0) | ||
assert wait_for_nodes_1.wait() | ||
assert wait_for_nodes_1.get_nodes_not_found() == set() | ||
wait_for_nodes_1.shutdown() | ||
|
||
# Method 2 | ||
with WaitForNodes(node_list, timeout=5.0) as wait_for_nodes_2: | ||
print('All nodes were found !') | ||
assert wait_for_nodes_2.get_nodes_not_found() == set() | ||
|
||
def test_node_does_not_exist(self, node_list): | ||
"""Insert a invalid node name that should not exist.""" | ||
invalid_node_list = node_list + ['invalid_node'] | ||
|
||
# Method 1 | ||
wait_for_nodes_1 = WaitForNodes(invalid_node_list, timeout=1.0) | ||
assert not wait_for_nodes_1.wait() | ||
assert wait_for_nodes_1.get_nodes_not_found() == {'invalid_node'} | ||
wait_for_nodes_1.shutdown() | ||
|
||
# Method 2 | ||
with pytest.raises(RuntimeError): | ||
with WaitForNodes(invalid_node_list, timeout=1.0): | ||
pass | ||
|
||
|
||
# TODO (adityapande-1995): Move WaitForNodes implementation to launch_testing_ros | ||
# after https://github.com/ros2/rclpy/issues/831 is resolved | ||
class WaitForNodes: | ||
""" | ||
Wait to discover supplied nodes. | ||
|
||
Example usage: | ||
-------------- | ||
# Method 1, calling wait() and shutdown() manually | ||
def method_1(): | ||
node_list = ['foo', 'bar'] | ||
wait_for_nodes = WaitForNodes(node_list, timeout=5.0) | ||
assert wait_for_nodes.wait() | ||
print('Nodes found!') | ||
assert wait_for_nodes.get_nodes_not_found() == set() | ||
wait_for_nodes.shutdown() | ||
|
||
# Method 2, using the 'with' keyword | ||
def method_2(): | ||
with WaitForNodes(['foo', 'bar'], timeout=5.0) as wait_for_nodes: | ||
assert wait_for_nodes.get_nodes_not_found() == set() | ||
print('Nodes found!') | ||
""" | ||
|
||
def __init__(self, node_names, timeout=5.0): | ||
self.node_names = node_names | ||
self.timeout = timeout | ||
self.__ros_context = rclpy.Context() | ||
rclpy.init(context=self.__ros_context) | ||
self._prepare_node() | ||
|
||
self.__expected_nodes_set = set(node_names) | ||
self.__nodes_found = None | ||
|
||
def _prepare_node(self): | ||
self.__node_name = '_test_node_' +\ | ||
''.join(random.choices(string.ascii_uppercase + string.digits, k=10)) | ||
self.__ros_node = Node(node_name=self.__node_name, context=self.__ros_context) | ||
|
||
def wait(self): | ||
start = time.time() | ||
flag = False | ||
print('Waiting for nodes') | ||
while time.time() - start < self.timeout and not flag: | ||
flag = all(name in self.__ros_node.get_node_names() for name in self.node_names) | ||
time.sleep(0.3) | ||
|
||
self.__nodes_found = set(self.__ros_node.get_node_names()) | ||
self.__nodes_found.remove(self.__node_name) | ||
return flag | ||
|
||
def shutdown(self): | ||
self.__ros_node.destroy_node() | ||
rclpy.shutdown(context=self.__ros_context) | ||
|
||
def __enter__(self): | ||
if not self.wait(): | ||
raise RuntimeError('Did not find all nodes !') | ||
|
||
return self | ||
|
||
def __exit__(self, exep_type, exep_value, trace): | ||
if exep_type is not None: | ||
raise Exception('Exception occured, value: ', exep_value) | ||
self.shutdown() | ||
|
||
def get_nodes_found(self): | ||
return self.__nodes_found | ||
|
||
def get_nodes_not_found(self): | ||
return self.__expected_nodes_set - self.__nodes_found |
83 changes: 83 additions & 0 deletions
83
launch_testing/launch_testing_examples/launch_testing_examples/record_rosbag_launch_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
# Copyright 2021 Open Source Robotics Foundation, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
import os | ||
import shutil | ||
import tempfile | ||
import time | ||
import unittest | ||
|
||
import launch | ||
import launch.actions | ||
import launch_ros.actions | ||
import launch_testing.actions | ||
import launch_testing.markers | ||
import pytest | ||
|
||
import yaml | ||
|
||
|
||
@pytest.mark.launch_test | ||
@launch_testing.markers.keep_alive | ||
def generate_test_description(): | ||
rosbag_dir = os.path.join(tempfile.mkdtemp(), 'test_bag') | ||
|
||
node_list = [ | ||
launch_ros.actions.Node( | ||
executable='talker', | ||
package='demo_nodes_cpp', | ||
name='demo_node' | ||
), | ||
launch.actions.ExecuteProcess( | ||
cmd=['ros2', 'bag', 'record', '-a', '-o', rosbag_dir], | ||
output='screen' | ||
), | ||
launch_testing.actions.ReadyToTest() | ||
] | ||
|
||
return launch.LaunchDescription(node_list), {'rosbag_dir': rosbag_dir} | ||
|
||
|
||
class DelayShutdown(unittest.TestCase): | ||
|
||
def test_delay(self): | ||
"""Delay the shutdown of processes so that rosbag can record some messages.""" | ||
time.sleep(3) | ||
|
||
|
||
# TODO : Test fails on windows, to be fixed | ||
adityapande-1995 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if os.name != 'nt': | ||
@launch_testing.post_shutdown_test() | ||
class TestFixtureAfterShutdown(unittest.TestCase): | ||
|
||
rosbag_dir = None | ||
|
||
def test_rosbag_record(self, rosbag_dir): | ||
"""Check if the rosbag2 recording was successful.""" | ||
with open(os.path.join(rosbag_dir, 'metadata.yaml'), 'r') as file: | ||
metadata = yaml.safe_load(file) | ||
assert metadata['rosbag2_bagfile_information']['message_count'] > 0 | ||
print('The following topics received messages:') | ||
for item in metadata['rosbag2_bagfile_information']['topics_with_message_count']: | ||
print(item['topic_metadata']['name'], 'recieved ', item['message_count'], | ||
' messages') | ||
|
||
TestFixtureAfterShutdown.rosbag_dir = rosbag_dir | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
"""Delete the rosbag directory.""" | ||
print('Deleting ', cls.rosbag_dir) | ||
shutil.rmtree(cls.rosbag_dir.replace('test_bag', '')) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a potential gap in launch_testing. It would be nice if there was a launch_testing feature that could delay the shutdown event. I know there is a
KeepAlive
action, but I don't know if that is exactly what we want here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just add a
Timer
action in the launch description with a 3s period that does nothing.That will keep the launch service alive and delay the shutdown.
It's still kind of a hack though.