Skip to content

Commit

Permalink
Add interface type filters to ros2 interface package (#765)
Browse files Browse the repository at this point in the history
* Add interface type filters to ros2 interface package

Signed-off-by: David V. Lu <davidvlu@gmail.com>
  • Loading branch information
DLu committed May 19, 2023
1 parent 1e7df98 commit 4b2b2fd
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
34 changes: 32 additions & 2 deletions ros2interface/ros2interface/verb/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,53 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import collections

from ros2interface.api import package_name_completer
from ros2interface.verb import VerbExtension
from rosidl_runtime_py import get_interfaces
from rosidl_runtime_py import get_action_interfaces, get_interfaces
from rosidl_runtime_py import get_message_interfaces, get_service_interfaces


class PackageVerb(VerbExtension):
"""Output a list of available interface types within one package."""

def add_arguments(self, parser, cli_name):
parser.add_argument(
'-m', '--only-msgs', action='store_true',
help='Print out only the message types')

parser.add_argument(
'-s', '--only-srvs', action='store_true',
help='Print out only the service types')

parser.add_argument(
'-a', '--only-actions', action='store_true',
help='Print out only the action types')

arg = parser.add_argument(
'package_name',
help="Name of the ROS package (e.g. 'example_interfaces')")
arg.completer = package_name_completer

def main(self, *, args):
interfaces = collections.defaultdict(list)
try:
interfaces = get_interfaces([args.package_name])
if not args.only_msgs and not args.only_srvs and not args.only_actions:
interfaces = get_interfaces([args.package_name])
else:
get_commands = []
if args.only_msgs:
get_commands.append(get_message_interfaces)
if args.only_srvs:
get_commands.append(get_service_interfaces)
if args.only_actions:
get_commands.append(get_action_interfaces)

for get_interface_cmd in get_commands:
pkg_interfaces = get_interface_cmd([args.package_name])
for package_name, interface_names in pkg_interfaces.items():
interfaces[package_name] += interface_names
except LookupError as e:
return str(e)
for package_name in sorted(interfaces):
Expand Down
48 changes: 48 additions & 0 deletions ros2interface/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,54 @@ def test_package_on_test_msgs(self):
)
assert all(interface in output_lines for interface in some_interfaces)

def test_package_on_test_msgs_only_msgs(self):
with self.launch_interface_command(
arguments=['package', 'test_msgs', '-m']
) as interface_command:
assert interface_command.wait_for_shutdown(timeout=5)
assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
output_lines = interface_command.output.splitlines()
assert launch_testing.tools.expect_output(
expected_lines=itertools.repeat(
re.compile(r'test_msgs/msg/[A-z0-9_]+'), len(output_lines)
),
lines=output_lines,
strict=True
)
assert all(interface in output_lines for interface in some_messages_from_test_msgs)

def test_package_on_test_msgs_only_srvs(self):
with self.launch_interface_command(
arguments=['package', 'test_msgs', '-s']
) as interface_command:
assert interface_command.wait_for_shutdown(timeout=5)
assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
output_lines = interface_command.output.splitlines()
assert launch_testing.tools.expect_output(
expected_lines=itertools.repeat(
re.compile(r'test_msgs/srv/[A-z0-9_]+'), len(output_lines)
),
lines=output_lines,
strict=True
)
assert all(interface in output_lines for interface in some_services_from_test_msgs)

def test_package_on_test_msgs_only_actions(self):
with self.launch_interface_command(
arguments=['package', 'test_msgs', '-a']
) as interface_command:
assert interface_command.wait_for_shutdown(timeout=5)
assert interface_command.exit_code == launch_testing.asserts.EXIT_OK
output_lines = interface_command.output.splitlines()
assert launch_testing.tools.expect_output(
expected_lines=itertools.repeat(
re.compile(r'test_msgs/action/[A-z0-9_]+'), len(output_lines)
),
lines=output_lines,
strict=True
)
assert all(interface in output_lines for interface in some_actions_from_test_msgs)

def test_packages(self):
with self.launch_interface_command(arguments=['packages']) as interface_command:
assert interface_command.wait_for_shutdown(timeout=2)
Expand Down

0 comments on commit 4b2b2fd

Please sign in to comment.