Skip to content

Commit

Permalink
add test_info_topic_verbose()
Browse files Browse the repository at this point in the history
Signed-off-by: Miaofei <miaofei@amazon.com>
  • Loading branch information
mm318 committed Dec 31, 2019
1 parent c3bd37b commit f035b9f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
10 changes: 5 additions & 5 deletions ros2topic/ros2topic/verb/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@

def print_topic_info(topic_name, get_topic_info_func):
for info in get_topic_info_func(topic_name):
print('\nNode Name: %s' % info['node_name'])
print('Node Namespace: %s' % info['node_namespace'])
print('Topic Type: %s' % info['topic_type'])
print('Node name: %s' % info['node_name'])
print('Node namespace: %s' % info['node_namespace'])
print('Topic type: %s' % info['topic_type'])
print('GID: %s' % '.'.join(format(x, '02x') for x in info['gid']))
print('QoS Profile:')
print('QoS profile:')
qos_profile = info['qos_profile']
print(' Reliability: %s' % QoSReliabilityPolicy(qos_profile['reliability']).name)
print(' Durability: %s' % QoSDurabilityPolicy(qos_profile['durability']).name)
print(' Lifespan: %d nanoseconds' % qos_profile['lifespan'].nanoseconds)
print(' Deadline: %d nanoseconds' % qos_profile['deadline'].nanoseconds)
print(' Liveliness: %s' % QoSLivelinessPolicy(qos_profile['liveliness']).name)
print(' Liveliness Lease Duration: %d nanoseconds' %
print(' Liveliness lease duration: %d nanoseconds\n' %
qos_profile['liveliness_lease_duration'].nanoseconds)

class InfoVerb(VerbExtension):
Expand Down
29 changes: 29 additions & 0 deletions ros2topic/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,35 @@ def test_topic_info(self):
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_topic_info_verbose(self):
with self.launch_topic_command(arguments=['info', '--verbose', '/chatter']) as topic_command:
assert topic_command.wait_for_shutdown(timeout=10)
assert topic_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'Type: std_msgs/msg/String',
'',
'Publisher count: 1',
'',
re.compile(r'Node name: \w+'),
'Node namespace: /',
'Topic type: std_msgs/msg/String',
re.compile(r'GID: [\w\.]+'),
'QoS profile:',
re.compile(r' Reliability: RMW_QOS_POLICY_RELIABILITY_\w+'),
re.compile(r' Durability: RMW_QOS_POLICY_DURABILITY_\w+'),
re.compile(r' Lifespan: \d+ nanoseconds'),
re.compile(r' Deadline: \d+ nanoseconds'),
re.compile(r' Liveliness: RMW_QOS_POLICY_LIVELINESS_\w+'),
re.compile(r' Liveliness lease duration: \d+ nanoseconds'),
'',
'Subscription count: 0'
],
text=topic_command.output,
strict=True
)

def test_info_on_unknown_topic(self):
with self.launch_topic_command(arguments=['info', '/unknown_topic']) as topic_command:
assert topic_command.wait_for_shutdown(timeout=10)
Expand Down

0 comments on commit f035b9f

Please sign in to comment.