Use Fast-DDS Waitsets instead of listeners #619

Merged (34 commits) on Aug 24, 2022

Conversation

@richiware (Contributor) commented Jun 27, 2022:

Since version 2.4.0, Fast-DDS supports Waitsets. This PR brings new Fast-DDS Waitsets into rmw_fastrtps. The main motivation is to fix bugs related to the current "waitset" mechanism implemented on top of Fast-DDS listeners, like #613.

List of functionalities to be supported:

  • Publication/subscription
  • Request/Response
  • Events
  • Event callback
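For context, here is a minimal sketch (not code from this PR) of the Fast-DDS 2.4+ Waitset API that the new implementation builds on: blocking on a WaitSet instead of reacting inside listener callbacks. Header paths and namespaces correspond to Fast-DDS 2.x and may differ in other versions.

#include <fastdds/dds/core/condition/WaitSet.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>

using namespace eprosima::fastdds::dds;

// Block until the reader has data available, or until a 5 second timeout.
void wait_for_data(DataReader * reader)
{
  // Enable only the status we care about on the reader's StatusCondition.
  StatusCondition & condition = reader->get_statuscondition();
  condition.set_enabled_statuses(StatusMask::data_available());

  WaitSet wait_set;
  wait_set.attach_condition(condition);

  ConditionSeq active_conditions;
  // Returns when the condition triggers or the timeout expires.
  wait_set.wait(active_conditions, eprosima::fastrtps::Duration_t(5, 0));
  if (!active_conditions.empty()) {
    // The reader's condition triggered: samples can now be take()n.
  }
}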

@audrow audrow changed the base branch from master to rolling June 28, 2022 14:22
@richiware richiware force-pushed the feature/fastdds-waitsets branch 3 times, most recently from d49bf2e to 8b23ff6 on July 1, 2022 06:39
@richiware richiware marked this pull request as ready for review July 1, 2022 06:40
@richiware (Contributor Author):

@clalancette Could you run a CI job against this PR, please?

@@ -191,6 +116,11 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
    info_->response_subscriber_matched_count_.store(publishers_.size());
  }

  size_t get_unread_responses()
  {
    return info_->response_reader_->get_unread_count();
@richiware (Contributor Author):

I don't recommend this. This change could count the same samples repeatedly if they were not read.

Contributor:

@richiware Please see this discussion from which I understand the correct implementation is to return the total number of unread changes.

Contributor:

It would be great to have some input from @alsora and/or @mauropasse on this, BTW

@mauropasse (Contributor):

@MiguelCompany @richiware our previous implementation relied simply on a counter to check unread events, instead of asking the DDS about unread messages. For example:

void on_data_available(eprosima::fastdds::dds::DataReader * reader)
{
  if (on_new_message_cb_) {
    on_new_message_cb_(user_data_, 1);
  } else {
    new_data_unread_count_++;
  }
}

// Then when setting the user callback:
void set_on_new_message_callback(const void * user_data, rmw_event_callback_t callback)
{
  if (new_data_unread_count_) {
    callback(user_data, new_data_unread_count_);
    new_data_unread_count_ = 0;
  }
}

The reason for keeping a separate new_data_unread_count_ instead of querying the DDS queue for unread samples is that this function (set_on_new_message_callback) is sometimes called twice in quick succession. With our approach:
First time: callback(user_data, new_data_unread_count_);
Second time: the callback is not called, since new_data_unread_count_ = 0.

With your approach here, calling info_->response_reader_->get_unread_count(); could lead to problems if set_on_new_message_callback is called many times in quick succession.
In our case, every call of callback(...) pushes events into a queue, which are then processed (messages are read). With your approach, the queue could end up with duplicated/triplicated/... events per unread sample if we didn't have time to process them between calls.
I'd stay with the approach taken in this PR: #625

Contributor:

@mauropasse The thing is that there is no guarantee that on_data_available is called once per sample.

Another thing to consider is the case where you have KEEP_LAST with a depth of 1. As explained in this comment, there is always a possibility that some samples notified to the callback will never actually be read by the application layer because of QoS settings (mainly KEEP_LAST history): the samples could be counted, but then be pushed out of the cache by newer samples before they can be take()n out of it.
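For illustration, a minimal sketch (not from this PR) of the QoS configuration where that overwrite can happen:

// With KEEP_LAST and depth 1, a second sample arriving before take() replaces
// the first one, so the number of samples notified to a callback can exceed
// the number of samples the application actually gets to read.
eprosima::fastdds::dds::DataReaderQos qos;
qos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS;
qos.history().depth = 1;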

@mauropasse (Contributor):

Regarding KEEP_LAST, we took that into account and capped new_data_unread_count_ at the QoS history depth.
In the case of multiple events but a single sample on the DDS queue (due to history = 1), we have means to mitigate it with a "bounded queue" on the executor, which checks the QoS depth before pushing new events into the queue.
With the normal queue, which does not perform this check, we simply get "no-op" takes (no messages taken).
Regarding on_data_available not being called once per sample: this issue would also affect the default SingleThreadedExecutor (the waitset is not woken since on_data_available is not called), and I don't have an answer for that right now. It has never happened to me before, and I didn't know there was no guarantee that on_data_available is called once per sample.

Member:

What about using

    return info_->response_reader_->get_unread_count(true);

(i.e. mark_as_read = true) and keeping a count here?

Does marking samples as read affect how take() works?
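A minimal sketch of that suggestion, assuming the get_unread_count(bool mark_as_read) overload of the Fast-DDS DataReader; the counter member and its reset are hypothetical details, not the PR's actual code:

size_t get_unread_responses()
{
  // Samples returned here are marked as read, so a second call right after
  // this one does not report the same samples again. The running total is
  // kept in a member counter, to be reset when the count is consumed.
  unread_count_ += info_->response_reader_->get_unread_count(true);  // mark_as_read = true
  return unread_count_;
}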

Member:

@richiware @MiguelCompany was this issue resolved?

Contributor:

Yes, it was. We did exactly what you mention, see here

RCPPUTILS_TSA_GUARDED_BY(
discovery_m_);

std::mutex discovery_m_;
Contributor:

I would move this up, before the declaration of subscriptions_.
I also think we should make this mutable, in case we need to take it inside a const method in the future.
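A minimal sketch of the suggested layout (illustrative only; which members this mutex actually guards is determined by the surrounding class):

// Declare the mutex before the members it protects, and make it mutable so it
// can also be locked from const member functions.
mutable std::mutex discovery_m_;

std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
  RCPPUTILS_TSA_GUARDED_BY(discovery_m_);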

@richiware (Contributor Author):

Done in ec7a576


std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
RCPPUTILS_TSA_GUARDED_BY(
Contributor:

Keep this on a single line. Applies to all the RCPPUTILS_TSA_GUARDED_BY( occurrences below.

@richiware (Contributor Author) commented Aug 12, 2022:

Done in ec7a576


std::atomic_bool deadline_changes_;
bool deadline_changes_;
Contributor:

Either keep this as an atomic_bool, or mark it with RCPPUTILS_TSA_GUARDED_BY.
Applies to all booleans below
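For illustration, the two options being suggested (a sketch only; the mutex name is taken from a later suggestion in this thread, not necessarily from this exact class):

// Option 1: keep the flag atomic so it can be accessed safely without a lock.
std::atomic_bool deadline_changes_{false};

// Option 2: a plain bool annotated for thread-safety analysis, so the compiler
// checks that every access happens while the protecting mutex is held.
bool deadline_changes_ RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);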

@richiware (Contributor Author):

Done in 850eeeb

Comment on lines 180 to 183
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);
clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);
Contributor:

Suggested change
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);
clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);
subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

@richiware (Contributor Author):

Done in ec7a576

private:
CustomSubscriberInfo * subscriber_info_ = nullptr;

bool deadline_changes_;
Contributor:

Either keep booleans as atomic_bool or add RCPPUTILS_TSA_GUARDED_BY

@richiware (Contributor Author):

Done in 850eeeb


  if (subscriptions) {
    for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
      void * data = subscriptions->subscribers[i];
      auto custom_subscriber_info = static_cast<CustomSubscriberInfo *>(data);
      custom_subscriber_info->listener_->attachCondition(conditionMutex, conditionVariable);
      no_has_wait |= (0 < custom_subscriber_info->data_reader_->get_unread_count());
Contributor:

I would use RETCODE_OK == data_reader_->get_first_untaken_info(), which means the reader has something to take.

Same for the rest of get_unread_count below
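A minimal sketch of that suggestion, following the style of the snippet above (the exact ReturnCode_t qualification is assumed to match the surrounding file):

// RETCODE_OK from get_first_untaken_info() means the reader still has at
// least one sample that can be take()n, without counting unread samples.
eprosima::fastdds::dds::SampleInfo info;
no_has_wait |=
  (ReturnCode_t::RETCODE_OK ==
  custom_subscriber_info->data_reader_->get_first_untaken_info(&info));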

@richiware (Contributor Author):

Done in 5ff6c5e

rmw_fastrtps_shared_cpp/src/rmw_wait.cpp (review thread resolved)
        if (!changed_statuses.is_active(eprosima::fastdds::dds::StatusMask::data_available())) {
          subscriptions->subscribers[i] = 0;
        }
      } else if (0 == custom_subscriber_info->data_reader_->get_unread_count()) {
Contributor:

Use get_first_untaken_info instead of get_unread_count

Applies to services and clients as well.

@richiware (Contributor Author) commented Aug 12, 2022:

Done in 5ff6c5e

rmw_fastrtps_shared_cpp/src/rmw_wait.cpp (review thread resolved)
return c == condition;
}))
{
if (!condition->get_trigger_value()) {
Contributor:

No need to check if the condition has been triggered while waiting. Just this check is enough.

@richiware (Contributor Author):

Done in 5ff6c5e

@MiguelCompany (Contributor):

@jsantiago-eProsima Please fix linters

eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(
discovery_m_);
RCPPUTILS_TSA_GUARDED_BY(discovery_m_);
Contributor:

Suggested change
RCPPUTILS_TSA_GUARDED_BY(discovery_m_);
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

@richiware (Contributor Author):

Done in 5ff6c5e

Comment on lines 129 to 132
    RCUTILS_SAFE_FWRITE_TO_STDERR( \
      RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__function__) ":" \
      RCUTILS_STRINGIFY(__LINE__) RCUTILS_STRINGIFY("failed to create waitset") \
      ": ros discovery info listener thread will shutdown ...\n");
Contributor:

Would be nice to have a TERMINATE_THREAD_WITH_RETURN, or remove the break; from TERMINATE_THREAD so it can be used inside and outside the loop. In the latter case I would rename it to something like LOG_THREAD_FATAL_ERROR
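A minimal sketch of the second option (the macro body is an assumption based on the snippet above, not the actual rmw_fastrtps macro):

// Hypothetical refactor: the logging part becomes reusable, and the caller
// decides whether to break out of the loop or return from the function.
#define LOG_THREAD_FATAL_ERROR(description) \
  RCUTILS_SAFE_FWRITE_TO_STDERR( \
    RCUTILS_STRINGIFY(__FILE__) ":" RCUTILS_STRINGIFY(__LINE__) ": " description \
    ": ros discovery info listener thread will shutdown ...\n")

// Inside the loop:   LOG_THREAD_FATAL_ERROR("failed to create waitset"); break;
// Outside the loop:  LOG_THREAD_FATAL_ERROR("failed to create waitset"); return;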

@MiguelCompany (Contributor) left a comment:

Changes look good to me.

It would be great if a maintainer took another look and launched a CI.

@EduPonz (Contributor) commented Jul 12, 2022:

Friendly ping for a CI run @clalancette @nuclearsandwich; I think this is highly relevant.

CC: @SteveMacenski

@alsora (Contributor) commented Jul 13, 2022:

CI:

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Windows Build Status

@alsora (Contributor) commented Jul 14, 2022:

New CI

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Windows Build Status

@MiguelCompany (Contributor):

@alsora According to our local testing, this should now be good to go.

@alsora (Contributor) commented Jul 15, 2022:

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Windows Build Status

@MiguelCompany (Contributor) left a comment:

LGTM, but DCO needs to be fixed

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
@richiware (Contributor Author):

@fujitatomoya @ivanpauno I think we've addressed all comments

@ivanpauno (Member) left a comment:

LGTM with green CI

@fujitatomoya (Collaborator):

@richiware thanks!

CI:

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Windows Build Status

@fujitatomoya (Collaborator) commented Aug 17, 2022:

CI:

  • Windows Build Status

@MiguelBarro (Contributor) commented Aug 24, 2022:

The failure on

launch_testing_examples.launch_testing_examples.check_multiple_nodes_launch_test.launch_testing_examples.check_multiple_nodes_launch_test

is unrelated to this pull request (it's flaky). In order to test it, we built ros2 rolling from sources in two Windows docker images (debug & release configurations). Those images are available here: debug and release.

Basically, the test launches three talker nodes and waits until all of them are discovered (oddly, it checks discovery twice).
The test fails only occasionally, and when it does it is due to a failure in node initialization (this can be checked in the trace):
the number of discovered nodes doesn't match the expected three, so the test fails.

The test can be run directly from the docker images with:

docker run --rm -t miguelbarroeprosima/rolling_release launch_test --package-name test_launch_testing C:\ros2-windows\src\ros2\examples\launch_testing\launch_testing_examples\launch_testing_examples\check_multiple_nodes_launch_test.py

I cannot explain why it shows up in two consecutive CI runs.
Maybe we should launch the Windows CI again to see whether it fails again.

@fujitatomoya (Collaborator):

The CI failure is unrelated (see #619 (comment)); I will go ahead and merge this.

@fujitatomoya (Collaborator):

@richiware @MiguelCompany should we backport this to humble?

@MiguelCompany (Contributor):

@richiware @MiguelCompany should we backport this to humble?

Yes. We should. Let's try:

@Mergifyio backport humble

MiguelCompany pushed a commit to eProsima/rmw_fastrtps that referenced this pull request Aug 30, 2022
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
@MiguelCompany MiguelCompany deleted the feature/fastdds-waitsets branch August 30, 2022 08:44
@MiguelCompany (Contributor):

@fujitatomoya I just created the backport to humble on #633

It is basically a cherry-pick of 1dcfe80 into humble

jefferyyjhsu pushed commits to jefferyyjhsu/rmw_fastrtps that referenced this pull request on Aug 30, Sep 1, and Sep 2, 2022
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
audrow pushed a commit that referenced this pull request Sep 7, 2022
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Ricardo González <ricardo@richiware.dev>
Labels: enhancement (New feature or request)
Projects: none yet
Development: successfully merging this pull request may close these issues: none yet
9 participants