Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a --max-wait-time option to ros2 topic pub #800

Merged
merged 13 commits into from
Feb 24, 2023
15 changes: 13 additions & 2 deletions ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def add_arguments(self, parser, cli_name):
help=(
'Wait until finding the specified number of matching subscriptions. '
'Defaults to 1 when using "-1"/"--once"/"--times", otherwise defaults to 0.'))
parser.add_argument(
'--max-wait-time', type=positive_float, default=None,
help=(
'This sets the maximum wait time if --wait-until-matching-subscriptions is set'))
arjo129 marked this conversation as resolved.
Show resolved Hide resolved
parser.add_argument(
'--keep-alive', metavar='N', type=positive_float, default=0.1,
help='Keep publishing node alive for N seconds after the last msg '
Expand Down Expand Up @@ -119,6 +123,7 @@ def main(args):
times,
args.wait_matching_subscriptions
if args.wait_matching_subscriptions is not None else int(times != 0),
args.max_wait_time,
arjo129 marked this conversation as resolved.
Show resolved Hide resolved
qos_profile,
args.keep_alive)

Expand All @@ -132,6 +137,7 @@ def publisher(
print_nth: int,
times: int,
wait_matching_subscriptions: int,
max_wait_time: float | None,
qos_profile: QoSProfile,
keep_alive: float,
) -> Optional[str]:
Expand All @@ -147,14 +153,19 @@ def publisher(
pub = node.create_publisher(msg_module, topic_name, qos_profile)

times_since_last_log = 0
total_wait_time = 0
while pub.get_subscription_count() < wait_matching_subscriptions:
# Print a message reporting we're waiting each 1s, check condition each 100ms.
if not times_since_last_log:
print(
f'Waiting for at least {wait_matching_subscriptions} matching subscription(s)...')
if max_wait_time is not None and max_wait_time <= total_wait_time:
return 'Timed out waiting for subscribers'
arjo129 marked this conversation as resolved.
Show resolved Hide resolved
times_since_last_log = (times_since_last_log + 1) % 10
time.sleep(0.1)

WAIT_TIME = 0.1
arjo129 marked this conversation as resolved.
Show resolved Hide resolved
time.sleep(WAIT_TIME)
total_wait_time += WAIT_TIME

msg = msg_module()
try:
timestamp_fields = set_message_fields(
Expand Down
26 changes: 26 additions & 0 deletions ros2topic/test/test_echo_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,32 @@ def message_callback(msg):
finally:
# Cleanup
self.node.destroy_subscription(subscription)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_basic(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
cmd=(['ros2', 'topic', 'pub', '-t', '5', '--max-wait-time', '1', '/clitest/topic/pub_times',
'std_msgs/String', 'data: hello']),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
assert command.wait_for_shutdown(timeout=2)
arjo129 marked this conversation as resolved.
Show resolved Hide resolved
assert launch_testing.tools.expect_output(
expected_lines=[
'Waiting for at least 1 matching subscription(s)...',
'Waiting for at least 1 matching subscription(s)...'
'Timed out waiting for subscribers'
],
text=command.output,
strict=True)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_times(self, launch_service, proc_info, proc_output):
Expand Down