Skip to content

Commit

Permalink
Add test for nonunique node names
Browse files Browse the repository at this point in the history
Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp committed Mar 10, 2020
1 parent 899881e commit 7b6f22c
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 3 deletions.
4 changes: 4 additions & 0 deletions ros2node/ros2node/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
from rclpy.node import HIDDEN_NODE_PREFIX
from ros2cli.node.strategy import NodeStrategy

INFO_NONUNIQUE_WARNING_TEMPLATE = (
'There are {num_nodes} nodes in the graph with the exact name "{node_name}". '
'You are seeing information about only one of them.'
)

NodeName = namedtuple('NodeName', ('name', 'namespace', 'full_name'))
TopicInfo = namedtuple('Topic', ('name', 'types'))
Expand Down
8 changes: 5 additions & 3 deletions ros2node/ros2node/verb/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys

from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
Expand All @@ -22,6 +23,7 @@
from ros2node.api import get_service_client_info
from ros2node.api import get_service_server_info
from ros2node.api import get_subscriber_info
from ros2node.api import INFO_NONUNIQUE_WARNING_TEMPLATE
from ros2node.api import NodeNameCompleter
from ros2node.verb import VerbExtension

Expand All @@ -48,9 +50,9 @@ def main(self, *, args):
node_names = get_node_names(node=node, include_hidden_nodes=args.include_hidden)
count = [n.full_name for n in node_names].count(args.node_name)
if count > 1:
print('# WARNING: There are {} nodes in the graph with this exact name "{}". '
'You are seeing information about only one of them.'.format(
count, args.node_name))
print(
INFO_NONUNIQUE_WARNING_TEMPLATE.format(num_nodes=count, node_name=args.node_name),
file=sys.stderr)
if count > 0:
with DirectNode(args) as node:
print(args.node_name)
Expand Down
125 changes: 125 additions & 0 deletions ros2node/test/test_cli_duplicate_node_names.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import os
import sys
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction

from launch_ros.actions import Node

import launch_testing
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

from rmw_implementation import get_available_rmw_implementations
from ros2node.api import INFO_NONUNIQUE_WARNING_TEMPLATE


@pytest.mark.rostest
@launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations())
def generate_test_description(rmw_implementation, ready_fn):
path_to_complex_node_script = os.path.join(
os.path.dirname(__file__), 'fixtures', 'complex_node.py'
)
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
return LaunchDescription([
# Always restart daemon to isolate tests.
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
# Add test fixture actions.
Node(
node_executable=sys.executable,
arguments=[path_to_complex_node_script],
node_name='complex_node',
additional_env=additional_env,
),
Node(
node_executable=sys.executable,
arguments=[path_to_complex_node_script],
node_name='complex_node',
additional_env=additional_env,
),
Node(
node_executable=sys.executable,
arguments=[path_to_complex_node_script],
node_name='complex_node_2',
additional_env=additional_env,
),
OpaqueFunction(function=lambda context: ready_fn()),
],
additional_env=additional_env
)
]
),
])


class TestROS2NodeCLIWithDuplicateNodeNames(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
@contextlib.contextmanager
def launch_node_command(self, arguments):
node_command_action = ExecuteProcess(
cmd=['ros2', 'node', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
},
name='ros2node-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, node_command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
# ignore launch_ros and ros2cli daemon nodes
filtered_patterns=['.*launch_ros.*', '.*ros2cli.*'],
filtered_rmw_implementation=rmw_implementation
)
) as node_command:
yield node_command
cls.launch_node_command = launch_node_command

@launch_testing.markers.retry_on_failure(times=5, delay=1)
def test_info_warning(self):
with self.launch_node_command(arguments=['info', '/complex_node']) as node_command:
assert node_command.wait_for_shutdown(timeout=20)
assert node_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
INFO_NONUNIQUE_WARNING_TEMPLATE.format(num_nodes='2', node_name='/complex_node')
],
text=node_command.output, strict=False
), 'Output does not match:\n' + node_command.output

0 comments on commit 7b6f22c

Please sign in to comment.