Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a --max-wait-time option to ros2 topic pub #800

Merged
merged 13 commits into from Feb 24, 2023
19 changes: 17 additions & 2 deletions ros2topic/ros2topic/verb/pub.py
Expand Up @@ -33,6 +33,7 @@
import yaml

MsgType = TypeVar('MsgType')
DEFAULT_WAIT_TIME = 0.1


def nonnegative_int(inval):
Expand Down Expand Up @@ -82,6 +83,11 @@ def add_arguments(self, parser, cli_name):
help=(
'Wait until finding the specified number of matching subscriptions. '
'Defaults to 1 when using "-1"/"--once"/"--times", otherwise defaults to 0.'))
parser.add_argument(
'--max-wait-time-secs', type=positive_float, default=None,
help=(
'This sets the maximum wait time in seconds if --wait-until-matching-subscriptions is set.'
'By default, this flag is not set meaning the subscriber will wait endlessly.'))
parser.add_argument(
'--keep-alive', metavar='N', type=positive_float, default=0.1,
help='Keep publishing node alive for N seconds after the last msg '
Expand All @@ -108,6 +114,9 @@ def main(args):
if args.once:
times = 1

if args.wait_matching_subscriptions is None and args.max_wait_time_secs is not None:
return '--max-wait-time-secs option is only effective with --wait-matching-subscriptions'

with DirectNode(args, node_name=args.node_name) as node:
return publisher(
node.node,
Expand All @@ -119,6 +128,7 @@ def main(args):
times,
args.wait_matching_subscriptions
if args.wait_matching_subscriptions is not None else int(times != 0),
args.max_wait_time_secs,
qos_profile,
args.keep_alive)

Expand All @@ -132,6 +142,7 @@ def publisher(
print_nth: int,
times: int,
wait_matching_subscriptions: int,
max_wait_time: float | None,
qos_profile: QoSProfile,
keep_alive: float,
) -> Optional[str]:
Expand All @@ -147,14 +158,18 @@ def publisher(
pub = node.create_publisher(msg_module, topic_name, qos_profile)

times_since_last_log = 0
total_wait_time = 0
while pub.get_subscription_count() < wait_matching_subscriptions:
# Print a message reporting we're waiting each 1s, check condition each 100ms.
if not times_since_last_log:
print(
f'Waiting for at least {wait_matching_subscriptions} matching subscription(s)...')
if max_wait_time is not None and max_wait_time <= total_wait_time:
return f'Timed out waiting for subscribers: Expected {wait_matching_subscriptions} subcribers but only got {pub.get_subscription_count()} subscribers'
times_since_last_log = (times_since_last_log + 1) % 10
time.sleep(0.1)

time.sleep(DEFAULT_WAIT_TIME)
total_wait_time += DEFAULT_WAIT_TIME

msg = msg_module()
try:
timestamp_fields = set_message_fields(
Expand Down
26 changes: 26 additions & 0 deletions ros2topic/test/test_echo_pub.py
Expand Up @@ -176,6 +176,32 @@ def message_callback(msg):
finally:
# Cleanup
self.node.destroy_subscription(subscription)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_basic(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
cmd=(['ros2', 'topic', 'pub', '-t', '5', '--max-wait-time-secs', '1', '/clitest/topic/pub_times',
'std_msgs/String', 'data: hello']),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
assert command.wait_for_shutdown(timeout=5)
assert launch_testing.tools.expect_output(
expected_lines=[
'Waiting for at least 1 matching subscription(s)...',
'Waiting for at least 1 matching subscription(s)...'
'Timed out waiting for subscribers: Expected 1 subcribers but only got 0 subscribers'
],
text=command.output,
strict=True)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_times(self, launch_service, proc_info, proc_output):
Expand Down