Skip to content

Commit

Permalink
Add support use_sim_time for ros2 topic hz/bw/pub. (#754) (#777)
Browse files Browse the repository at this point in the history
* Add support use_sim_time for ros2 topic hz/bw/pub.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

* Add testcase.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

* Update code.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

* Update code again.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

* Add warning log.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

* Revert STEADY_TIME.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

* Fix flake8 warnings.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

* Update code.

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>
Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>

Signed-off-by: Lei Liu <Lei.Liu.AP@sony.com>
Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
Co-authored-by: Lei Liu <64953129+llapx@users.noreply.github.com>
  • Loading branch information
fujitatomoya and llapx committed Nov 22, 2022
1 parent 74bd3eb commit 1cf39d4
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 4 deletions.
1 change: 1 addition & 0 deletions ros2topic/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<test_depend>launch_testing_ros</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>python3-pytest-timeout</test_depend>
<test_depend>rosgraph_msgs</test_depend>
<test_depend>std_msgs</test_depend>
<test_depend>test_msgs</test_depend>

Expand Down
17 changes: 13 additions & 4 deletions ros2topic/ros2topic/verb/bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
# https://github.com/ros/ros_comm/blob/6e5016f4b2266d8a60c9a1e163c4928b8fc7115e/tools/rostopic/src/rostopic/__init__.py

from argparse import ArgumentTypeError
import sys
import threading
import time
import traceback

import rclpy
from rclpy.qos import qos_profile_sensor_data
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import get_msg_class
from ros2topic.api import TopicNameCompleter
Expand Down Expand Up @@ -79,6 +80,7 @@ def add_arguments(self, parser, cli_name):
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
help='maximum window size, in # of messages, for calculating rate '
f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW')
add_direct_node_arguments(parser)

def main(self, *, args):
with DirectNode(args) as node:
Expand All @@ -93,12 +95,14 @@ def __init__(self, node, window_size):
self.sizes = []
self.times = []
self.window_size = window_size
self.use_sim_time = node.get_parameter('use_sim_time').value
self.clock = node.get_clock()

def callback(self, data):
"""Execute ros sub callback."""
with self.lock:
try:
t = time.monotonic()
t = self.clock.now()
self.times.append(t)
# TODO(yechun1): Subscribing to the msgs and calculate the length may be
# inefficiency. To optimize here if found better solution.
Expand All @@ -117,11 +121,16 @@ def print_bw(self):
return
with self.lock:
n = len(self.times)
tn = time.monotonic()
tn = self.clock.now()
t0 = self.times[0]
if tn <= t0:
print('WARNING: time is reset!', file=sys.stderr)
self.times = []
self.sizes = []
return None, None, None, None, None

total = sum(self.sizes)
bytes_per_s = total / (tn - t0)
bytes_per_s = total / ((tn.nanoseconds - t0.nanoseconds) * 1.e-9)
mean = total / n

# min and max
Expand Down
2 changes: 2 additions & 0 deletions ros2topic/ros2topic/verb/hz.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from rclpy.clock import Clock
from rclpy.clock import ClockType
from rclpy.qos import qos_profile_sensor_data
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import get_msg_class
from ros2topic.api import TopicNameCompleter
Expand Down Expand Up @@ -82,6 +83,7 @@ def add_arguments(self, parser, cli_name):
dest='use_wtime', default=False, action='store_true',
help='calculates rate using wall time which can be helpful'
' when clock is not published during simulation')
add_direct_node_arguments(parser)

def main(self, *, args):
return main(args)
Expand Down
2 changes: 2 additions & 0 deletions ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import profile_configure_short_keys
from ros2topic.api import TopicMessagePrototypeCompleter
Expand Down Expand Up @@ -215,6 +216,7 @@ def add_arguments(self, parser, cli_name):
help='Quality of service durability setting to publish with '
'(overrides durability value of --qos-profile option, default: {})'
.format(default_profile.durability.short_key))
add_direct_node_arguments(parser)

def main(self, *, args):
return main(args)
Expand Down
259 changes: 259 additions & 0 deletions ros2topic/test/test_use_sim_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
# Copyright 2022 Sony Group Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import re
import sys
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess

import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.utilities import get_rmw_implementation_identifier

from rosgraph_msgs.msg import Clock
from std_msgs.msg import String


# Skip cli tests on Windows while they exhibit pathological behavior
# https://github.com/ros2/build_farmer/issues/248
if sys.platform.startswith('win'):
pytest.skip(
'CLI tests can block for a pathological amount of time on Windows.',
allow_module_level=True)


TEST_NODE = 'cli_use_sim_time_test_node'
TEST_NAMESPACE = 'cli_use_sim_time'


@pytest.mark.rostest
@launch_testing.markers.keep_alive
def generate_test_description():
return LaunchDescription([
# Always restart daemon to isolate tests.
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
launch_testing.actions.ReadyToTest()
],
)
]
)
])


class TestROS2TopicUseSimTime(unittest.TestCase):

def timer_callback(self):
msg = Clock()
msg.clock.sec = self.clock_sec
self.publisher.publish(msg)
self.clock_sec += 1

def setUp(self):
self.context = rclpy.context.Context()
rclpy.init(context=self.context)
self.node = rclpy.create_node(
TEST_NODE, namespace=TEST_NAMESPACE, context=self.context
)
self.publisher = self.node.create_publisher(Clock, '/clock', 10)
self.executor = SingleThreadedExecutor(context=self.context)
self.executor.add_node(self.node)

def tearDown(self):
self.node.destroy_node()
rclpy.shutdown(context=self.context)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_times(self, launch_service, proc_info, proc_output):
# test method:
# speed up sim time to half second, then expect the process
# will end at half time compare to system time (5s).
self.clock_sec = 0
# Let's speed up 2x fast
rate = 0.5
clock_timer = self.node.create_timer(rate, self.timer_callback)

try:
# pub 5 times with default rate of 1 hz.
command_action = ExecuteProcess(
cmd=['ros2', 'topic', 'pub', '-t', str(5), '-w', '0', '-s',
'/clitest/topic/pub', 'std_msgs/String', 'data: hello'],
additional_env={
'PYTHONUNBUFFERED': '1'
},
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
# The process will end up in around 2.5s (here we set 3s)
self.executor.spin_until_future_complete(
rclpy.task.Future(), timeout_sec=3
)
assert command.wait_for_shutdown(timeout=1)
assert launch_testing.tools.expect_output(
expected_lines=[
'publisher: beginning loop',
"publishing #1: std_msgs.msg.String(data='hello')",
'',
"publishing #2: std_msgs.msg.String(data='hello')",
'',
"publishing #3: std_msgs.msg.String(data='hello')",
'',
"publishing #4: std_msgs.msg.String(data='hello')",
'',
"publishing #5: std_msgs.msg.String(data='hello')",
'',
],
text=command.output,
strict=False
)

finally:
# Cleanup
self.node.destroy_timer(clock_timer)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_hz(self, launch_service, proc_info, proc_output):
# test method:
# the default publish rate is 1 hz, then we speed up the
# sim time to half second, then we expect the rate to 0.5 hz.
topic = '/clitest/topic/hz'
publisher = self.node.create_publisher(String, topic, 10)
assert publisher

self.pub_times = 10

def publish_message():
if self.pub_times == 0:
self.node.destroy_timer(publish_timer)
publisher.publish(String(data='hello'))
self.pub_times -= 1

publish_timer = self.node.create_timer(1.0, publish_message)

self.clock_sec = 0
# Let's speed up 2x fast
rate = 0.5
clock_timer = self.node.create_timer(rate, self.timer_callback)

try:
command_action = ExecuteProcess(
cmd=['ros2', 'topic', 'hz', '-s', topic],
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
# The future won't complete - we will hit the timeout
self.executor.spin_until_future_complete(
rclpy.task.Future(), timeout_sec=5
)
assert command.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
# without speed up, the average rate should be 1.0
re.compile(r'average rate: 0.500'),
], strict=False
), timeout=10), 'Echo CLI did not print expected message'
assert command.wait_for_shutdown(timeout=10)

finally:
# Cleanup
self.node.destroy_timer(publish_timer)
self.node.destroy_timer(clock_timer)
self.node.destroy_publisher(publisher)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_bw(self, launch_service, proc_info, proc_output):
# test method:
# the default average band width is 16 B/s, then we speed up the
# sim time to half second, then we expect the average band width
# to 8 B/s.
topic = '/clitest/topic/bw'
publisher = self.node.create_publisher(String, topic, 10)
assert publisher

self.pub_times = 10

def publish_message():
if self.pub_times == 0:
self.node.destroy_timer(publish_timer)
publisher.publish(String(data='hello'))
self.pub_times -= 1

publish_timer = self.node.create_timer(1.0, publish_message)

self.clock_sec = 0
# Let's speed up 2x fast
rate = 0.5
clock_timer = self.node.create_timer(rate, self.timer_callback)

try:
command_action = ExecuteProcess(
cmd=['ros2', 'topic', 'bw', '-s', '-w', '10', topic],
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
# The future won't complete - we will hit the timeout
self.executor.spin_until_future_complete(
rclpy.task.Future(), timeout_sec=10
)
assert command.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
# without speed up, the average band width should be 16 B
re.compile(r'8 B/s from \d+ messages'),
re.compile(r'\s*Message size mean: 16 B min: 16 B max: 16 B')
], strict=False
), timeout=10), 'Echo CLI did not print expected message'
assert command.wait_for_shutdown(timeout=5)

finally:
# Cleanup
self.node.destroy_timer(publish_timer)
self.node.destroy_timer(clock_timer)
self.node.destroy_publisher(publisher)

0 comments on commit 1cf39d4

Please sign in to comment.