Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a --max-wait-time option to ros2 topic pub #800

Merged
merged 13 commits into from
Feb 24, 2023
3 changes: 2 additions & 1 deletion ros2topic/ros2topic/verb/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def add_arguments(self, parser, cli_name):
parser.add_argument(
'--once', action='store_true', help='Print the first message received and then exit.')
parser.add_argument(
'--timeout', metavar='N', type=positive_float, help='Set a timeout in seconds for waiting', default=None)
'--timeout', metavar='N', type=positive_float,
help='Set a timeout in seconds for waiting', default=None)
parser.add_argument(
'--include-message-info', '-i', action='store_true',
help='Shows the associated message info.')
Expand Down
20 changes: 19 additions & 1 deletion ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import yaml

MsgType = TypeVar('MsgType')
DEFAULT_WAIT_TIME = 0.1


def nonnegative_int(inval):
Expand Down Expand Up @@ -82,6 +83,12 @@ def add_arguments(self, parser, cli_name):
help=(
'Wait until finding the specified number of matching subscriptions. '
'Defaults to 1 when using "-1"/"--once"/"--times", otherwise defaults to 0.'))
parser.add_argument(
'--max-wait-time-secs', type=positive_float, default=None,
help=(
'This sets the maximum wait time in seconds if '
'--wait-until-matching-subscriptions is set. '
'By default, this flag is not set meaning the subscriber will wait endlessly.'))
parser.add_argument(
'--keep-alive', metavar='N', type=positive_float, default=0.1,
help='Keep publishing node alive for N seconds after the last msg '
Expand Down Expand Up @@ -119,6 +126,7 @@ def main(args):
times,
args.wait_matching_subscriptions
if args.wait_matching_subscriptions is not None else int(times != 0),
args.max_wait_time_secs,
qos_profile,
args.keep_alive)

Expand All @@ -132,6 +140,7 @@ def publisher(
print_nth: int,
times: int,
wait_matching_subscriptions: int,
max_wait_time: float | None,
qos_profile: QoSProfile,
keep_alive: float,
) -> Optional[str]:
Expand All @@ -146,14 +155,23 @@ def publisher(

pub = node.create_publisher(msg_module, topic_name, qos_profile)

if wait_matching_subscriptions == 0 and max_wait_time is not None:
return '--max-wait-time-secs option is only effective' \
' with --wait-matching-subscriptions, --once or --times'

times_since_last_log = 0
total_wait_time = 0
while pub.get_subscription_count() < wait_matching_subscriptions:
# Print a message reporting we're waiting each 1s, check condition each 100ms.
if not times_since_last_log:
print(
f'Waiting for at least {wait_matching_subscriptions} matching subscription(s)...')
if max_wait_time is not None and max_wait_time <= total_wait_time:
return f'Timed out waiting for subscribers: Expected {wait_matching_subscriptions}' \
f' subcribers but only got {pub.get_subscription_count()} subscribers'
times_since_last_log = (times_since_last_log + 1) % 10
time.sleep(0.1)
time.sleep(DEFAULT_WAIT_TIME)
total_wait_time += DEFAULT_WAIT_TIME

msg = msg_module()
try:
Expand Down
83 changes: 82 additions & 1 deletion ros2topic/test/test_echo_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,87 @@ def message_callback(msg):
# Cleanup
self.node.destroy_subscription(subscription)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_maxwait_no_subscribers(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
cmd=(['ros2', 'topic', 'pub', '-t', '5', '--max-wait-time-secs',
'1', '/clitest/topic/pub_times', 'std_msgs/String', 'data: hello']),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
assert command.wait_for_shutdown(timeout=5)
assert launch_testing.tools.expect_output(
expected_lines=[
'Waiting for at least 1 matching subscription(s)...',
'Waiting for at least 1 matching subscription(s)...',
'Timed out waiting for subscribers: Expected'
' 1 subcribers but only got 0 subscribers'
],
text=command.output,
strict=True)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_maxwait_malformed_arguments(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
cmd=(['ros2', 'topic', 'pub', '--max-wait-time-secs', '1',
'/clitest/topic/pub_times', 'std_msgs/String', 'data: hello']),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
assert command.wait_for_shutdown(timeout=5)
assert launch_testing.tools.expect_output(
expected_lines=[
'--max-wait-time-secs option is only effective with'
' --wait-matching-subscriptions, --once or --times'
],
text=command.output,
strict=True)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_maxwait_yields(self, launch_service, proc_info, proc_output):
topic = '/clitest/topic/pub/max_wait_timeout'
command_action = ExecuteProcess(
cmd=(['ros2', 'topic', 'pub', '-t', '1', '--max-wait-time-secs', '20', topic,
'std_msgs/String', 'data: hello']),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)

future = rclpy.task.Future()

def message_callback(msg):
"""If we receive one message, the test has succeeded."""
future.set_result(True)

self.node.create_subscription(String, topic, message_callback, 1)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
self.executor.spin_until_future_complete(future, timeout_sec=10)
assert future.done()
assert command.wait_for_shutdown(timeout=20)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_times(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
Expand Down Expand Up @@ -453,4 +534,4 @@ def test_echo_timeout(self, launch_service, proc_info, proc_output):
) as command:
# wait for command to shutdown on its own
assert command.wait_for_shutdown(timeout=5)
assert command.output == ""
assert command.output == ''