Skip to content

Commit

Permalink
Convert test_verb_dump into launch test
Browse files Browse the repository at this point in the history
Fixes #480

The actual tests are the same, except with the use of launch_testing we ensure the CLI daemon
is restarted between tests. This follows a similar pattern as the other ros2cli tests.

I've left a TODO to test with all RMW implementations. I did not enable this now  since I do
not expect the tests to pass with all RMW implementations.
See #482

Signed-off-by: Jacob Perron <jacob@openrobotics.org>
  • Loading branch information
jacobperron committed Apr 13, 2020
1 parent 8e6e051 commit 5ba0bc0
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 79 deletions.
4 changes: 4 additions & 0 deletions ros2param/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
Expand Down
46 changes: 46 additions & 0 deletions ros2param/test/fixtures/parameter_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2020 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys

import rclpy
from rclpy.parameter import PARAMETER_SEPARATOR_STRING


def main(args=None):
rclpy.init(args=args)

node = rclpy.create_node('parameter_node')
node.declare_parameter('bool_param', True)
node.declare_parameter('int_param', 42)
node.declare_parameter('double_param', 1.23)
node.declare_parameter('str_param', 'Hello World')
node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING + 'str_param', 'foo')
node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
'bar' + PARAMETER_SEPARATOR_STRING +
'str_param', 'foobar')

try:
rclpy.spin(node)
except KeyboardInterrupt:
print('parameter node stopped cleanly')
except BaseException:
print('exception in parameter node:', file=sys.stderr)
raise
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
226 changes: 147 additions & 79 deletions ros2param/test/test_verb_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,75 +12,112 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from io import StringIO
import contextlib
import os
import sys
import tempfile
import threading
import time
import unittest
from unittest.mock import patch
import xmlrpc

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch_ros.actions import Node
import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools

import pytest

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.parameter import PARAMETER_SEPARATOR_STRING

from ros2cli import cli
from ros2cli.node.strategy import NodeStrategy

TEST_NODE = 'test_node'
TEST_NAMESPACE = '/foo'

TEST_TIMEOUT = 5.0

EXPECTED_PARAMETER_FILE = """\
test_node:
ros__parameters:
bool_param: true
double_param: 1.23
foo:
bar:
str_param: foobar
str_param: foo
int_param: 42
str_param: Hello World
use_sim_time: false
"""
EXPECTED_PARAMETER_FILE = (
f'{TEST_NODE}:\n'
' ros__parameters:\n'
' bool_param: true\n'
' double_param: 1.23\n'
' foo:\n'
' bar:\n'
' str_param: foobar\n'
' str_param: foo\n'
' int_param: 42\n'
' str_param: Hello World\n'
' use_sim_time: false\n'
)

# TODO(jacobperron): Enable testing with all RMW implementations when they pass
# @launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', ['rmw_fastrtps_cpp'])
def generate_test_description(rmw_implementation):
path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures')
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}

# Parameter node test fixture
path_to_parameter_node_script = os.path.join(path_to_fixtures, 'parameter_node.py')
parameter_node = Node(
node_executable=sys.executable,
name=TEST_NODE,
namespace=TEST_NAMESPACE,
arguments=[path_to_parameter_node_script],
additional_env=additional_env
)

return LaunchDescription([
# TODO(jacobperron): Provide a common RestartCliDaemon launch action in ros2cli
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
parameter_node,
launch_testing.actions.ReadyToTest(),
],
additional_env=additional_env
)
]
),
])


class TestVerbDump(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.context = rclpy.context.Context()
rclpy.init(context=cls.context)
cls.node = rclpy.create_node(
TEST_NODE, namespace=TEST_NAMESPACE, context=cls.context)

cls.executor = MultiThreadedExecutor(context=cls.context, num_threads=2)
cls.executor.add_node(cls.node)

cls.node.declare_parameter('bool_param', True)
cls.node.declare_parameter('int_param', 42)
cls.node.declare_parameter('double_param', 1.23)
cls.node.declare_parameter('str_param', 'Hello World')
cls.node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
'str_param', 'foo')
cls.node.declare_parameter('foo' + PARAMETER_SEPARATOR_STRING +
'bar' + PARAMETER_SEPARATOR_STRING +
'str_param', 'foobar')

# We need both the test node and 'dump'
# node to be able to spin
cls.exec_thread = threading.Thread(target=cls.executor.spin)
cls.exec_thread.start()

@classmethod
def tearDownClass(cls):
cls.executor.shutdown()
cls.node.destroy_node()
rclpy.shutdown(context=cls.context)
cls.exec_thread.join()
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):

@contextlib.contextmanager
def launch_param_dump_command(self, arguments):
param_dump_command_action = ExecuteProcess(
cmd=['ros2', 'param', 'dump', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
},
name='ros2param-dump-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, param_dump_command_action, proc_info, proc_output
) as param_dump_command:
yield param_dump_command
cls.launch_param_dump_command = launch_param_dump_command

def setUp(self):
# Ensure the daemon node is running and discovers the test node
Expand Down Expand Up @@ -109,52 +146,83 @@ def setUp(self):
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')

def _output_file(self):

ns = self.node.get_namespace()
name = self.node.get_name()
if '/' == ns:
fqn = ns + name
else:
fqn = ns + '/' + name
return fqn[1:].replace('/', '__') + '.yaml'
return f'{TEST_NAMESPACE}/{TEST_NODE}'[1:].replace('/', '__') + '.yaml'

def test_verb_dump_invalid_node(self):
assert cli.main(
argv=['param', 'dump', 'invalid_node']) == 'Node not found'

assert cli.main(
argv=['param', 'dump', 'invalid_ns/test_node']) == 'Node not found'
with self.launch_param_dump_command(arguments=['invalid_node']) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['Node not found'],
text=param_dump_command.output,
strict=True
)
with self.launch_param_dump_command(
arguments=[f'invalid_ns/{TEST_NODE}']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['Node not found'],
text=param_dump_command.output,
strict=True
)

def test_verb_dump_invalid_path(self):
assert cli.main(
argv=['param', 'dump', 'foo/test_node', '--output-dir', 'invalid_path']) \
== "'invalid_path' is not a valid directory."
with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', 'invalid_path']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=["'invalid_path' is not a valid directory."],
text=param_dump_command.output,
strict=True
)

def test_verb_dump(self):
with tempfile.TemporaryDirectory() as tmpdir:
assert cli.main(
argv=['param', 'dump', '/foo/test_node', '--output-dir', tmpdir]) is None

with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', tmpdir]
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[''],
text=param_dump_command.output,
strict=True
)
# Compare generated parameter file against expected
generated_param_file = os.path.join(tmpdir, self._output_file())
assert (open(generated_param_file, 'r').read() == EXPECTED_PARAMETER_FILE)

def test_verb_dump_print(self):
with patch('sys.stdout', new=StringIO()) as fake_stdout:
assert cli.main(
argv=['param', 'dump', 'foo/test_node', '--print']) is None

# Compare generated stdout against expected
assert fake_stdout.getvalue().strip() == EXPECTED_PARAMETER_FILE[:-1]

with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--print']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_text=EXPECTED_PARAMETER_FILE + '\n',
text=param_dump_command.output,
strict=True
)
# If both '--output-dir' and '--print' options are provided, ensure it only prints
# and no file is written
with tempfile.TemporaryDirectory() as tmpdir:
assert cli.main(
argv=['param', 'dump', 'foo/test_node', '--output-dir', tmpdir, '--print']) is None
with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--output-dir', tmpdir, '--print']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_text=EXPECTED_PARAMETER_FILE + '\n',
text=param_dump_command.output,
strict=True
)

not_generated_param_file = os.path.join(tmpdir, self._output_file())

with self.assertRaises(OSError) as context:
open(not_generated_param_file, 'r')

# Make sure the file was not create, thus '--print' did preempt
assert '[Errno 2] No such file or directory' in str(context.exception)

0 comments on commit 5ba0bc0

Please sign in to comment.