Skip to content

Commit

Permalink
Mitigate discovery race condition between clients and services (#132)
Browse files Browse the repository at this point in the history
* Mitigate discovery race condition between clients and services.

Signed-off-by: Andrea Sorbini <asorbini@rti.com>
  • Loading branch information
asorbini committed Apr 3, 2024
1 parent b8c410c commit 7c95abb
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 46 deletions.
9 changes: 9 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#include "rmw/rmw.h"
#include "rosidl_typesupport_cpp/message_type_support.hpp"

#ifndef DDS_GUID_INITIALIZER
#define DDS_GUID_INITIALIZER DDS_GUID_DEFAULT
#endif /* DDS_GUID_INITIALIZER */

class RMW_Connext_MessageTypeSupport;
class RMW_Connext_Publisher;
class RMW_Connext_Subscriber;
Expand Down Expand Up @@ -292,4 +296,9 @@ rmw_connextdds_get_cft_filter_expression(
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * const options);

rmw_ret_t
rmw_connextdds_guid_to_instance_handle(
const struct DDS_GUID_t * const guid,
DDS_InstanceHandle_t * const instanceHandle);

#endif // RMW_CONNEXTDDS__DDS_API_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,13 @@
#define RMW_CONNEXT_LIMIT_KEEP_ALL_SAMPLES 1000
#endif /* RMW_CONNEXT_LIMIT_SAMPLES_MAX */

#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT
#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT 100000 /* 100ms */
#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_DEFAULT */

#ifndef RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE
#define RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE (31536000ull * 1000000ull) /* 1 year */
#endif /* RMW_CONNEXT_LIMIT_DEFAULT_BLOCKING_TIME_INFINITE */


#endif // RMW_CONNEXTDDS__RESOURCE_LIMITS_HPP_
107 changes: 102 additions & 5 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>

#include "rmw_connextdds/context.hpp"
#include "rmw_connextdds/type_support.hpp"
Expand Down Expand Up @@ -70,6 +71,28 @@ bool rmw_connextdds_find_string_in_list(
DDS_Duration_t rmw_connextdds_duration_from_ros_time(
const rmw_time_t * const ros_time);

class RMW_Connext_OrderedGid
{
public:
explicit RMW_Connext_OrderedGid(const rmw_gid_t & value)
: value(value)
{
}

bool operator<(const RMW_Connext_OrderedGid & other) const
{
return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) < 0;
}

bool operator==(const RMW_Connext_OrderedGid & other) const
{
return memcmp(value.data, other.value.data, RMW_GID_STORAGE_SIZE) == 0;
}

private:
rmw_gid_t value;
};

/******************************************************************************
* WaitSet wrapper
******************************************************************************/
Expand Down Expand Up @@ -157,7 +180,7 @@ class RMW_Connext_Publisher
RMW_Connext_WriteParams * const params);

rmw_ret_t
enable() const
enable()
{
if (DDS_RETCODE_OK !=
DDS_Entity_enable(
Expand All @@ -183,6 +206,8 @@ class RMW_Connext_Publisher
return RMW_RET_ERROR;
}

max_blocking_time = load_max_blocking_time();

return RMW_RET_OK;
}

Expand Down Expand Up @@ -215,6 +240,12 @@ class RMW_Connext_Publisher
DDS_SampleIdentity_t * const sample_identity,
DDS_SampleIdentity_t * const related_sample_identity);

rmw_ret_t
wait_for_subscription(
rmw_gid_t & reader_gid,
bool & unmatched,
rmw_gid_t & related_writer_gid);

DDS_Topic * dds_topic() const
{
return DDS_DataWriter_get_topic(this->dds_writer);
Expand All @@ -232,19 +263,82 @@ class RMW_Connext_Publisher
return DDS_Publisher_get_participant(pub);
}

void
push_related_endpoints(const rmw_gid_t & endpoint, const rmw_gid_t & related)
{
std::lock_guard<std::mutex> lock(matched_mutex);
known_endpoints.emplace(RMW_Connext_OrderedGid(endpoint), related);
known_endpoints.emplace(RMW_Connext_OrderedGid(related), endpoint);
}

void
on_publication_matched(
const DDS_PublicationMatchedStatus * const status)
{
std::lock_guard<std::mutex> lock(matched_mutex);
DDS_ReturnCode_t dds_rc =
DDS_DataWriter_get_matched_subscriptions(writer(), &matched_subscriptions);
if (DDS_RETCODE_OK != dds_rc) {
RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched subscriptions: dds_rc=%d", dds_rc)
}
if (status->current_count_change < 0) {
rmw_gid_t unmatched_gid;
rmw_connextdds_ih_to_gid(status->last_subscription_handle, unmatched_gid);
pop_related_endpoints(unmatched_gid);
}
matched_cv.notify_all();
}

void
on_related_subscription_matched(
RMW_Connext_Subscriber * const sub,
const DDS_SubscriptionMatchedStatus * const status)
{
UNUSED_ARG(sub);
std::lock_guard<std::mutex> lock(matched_mutex);
if (status->current_count_change < 0) {
rmw_gid_t unmatched_gid;
rmw_connextdds_ih_to_gid(status->last_publication_handle, unmatched_gid);
pop_related_endpoints(unmatched_gid);
}
matched_cv.notify_all();
}

rmw_context_impl_t * const ctx;

~RMW_Connext_Publisher();

private:
rmw_context_impl_t * ctx;
DDS_DataWriter * dds_writer;
RMW_Connext_MessageTypeSupport * type_support;
const bool created_topic;
rmw_gid_t ros_gid;
RMW_Connext_PublisherStatusCondition status_condition;
std::mutex matched_mutex;
std::condition_variable matched_cv;
std::chrono::microseconds max_blocking_time;
std::map<RMW_Connext_OrderedGid, rmw_gid_t> known_endpoints;
DDS_InstanceHandleSeq matched_subscriptions;

RMW_Connext_Publisher(
rmw_context_impl_t * const ctx,
DDS_DataWriter * const dds_writer,
RMW_Connext_MessageTypeSupport * const type_support,
const bool created_topic);

void
pop_related_endpoints(const rmw_gid_t & endpoint)
{
auto endpoint_entry = known_endpoints.find(RMW_Connext_OrderedGid(endpoint));
if (endpoint_entry == known_endpoints.end()) {
return;
}
known_endpoints.erase(RMW_Connext_OrderedGid(endpoint));
known_endpoints.erase(RMW_Connext_OrderedGid(endpoint_entry->second));
}

std::chrono::microseconds
load_max_blocking_time() const;
};

rmw_publisher_t *
Expand Down Expand Up @@ -286,7 +380,8 @@ class RMW_Connext_Subscriber
const bool intro_members_cpp = false,
std::string * const type_name = nullptr,
const char * const cft_name = nullptr,
const char * const cft_filter = nullptr);
const char * const cft_filter = nullptr,
RMW_Connext_Publisher * const related_pub = nullptr);

rmw_ret_t
finalize();
Expand Down Expand Up @@ -521,7 +616,7 @@ class RMW_Connext_Subscriber
const bool ignore_local;

private:
rmw_context_impl_t * ctx;
rmw_context_impl_t * const ctx;
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
Expand All @@ -535,6 +630,7 @@ class RMW_Connext_Subscriber
size_t loan_len;
size_t loan_next;
std::mutex loan_mutex;
RMW_Connext_Publisher * const related_pub;

RMW_Connext_Subscriber(
rmw_context_impl_t * const ctx,
Expand All @@ -545,7 +641,8 @@ class RMW_Connext_Subscriber
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);
const bool internal,
RMW_Connext_Publisher * const related_pub);

friend class RMW_Connext_SubscriberStatusCondition;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
DDS_DataReader * reader);

rmw_ret_t
install(RMW_Connext_Subscriber * const sub);
install(
RMW_Connext_Subscriber * const sub,
RMW_Connext_Publisher * const related_pub = nullptr);

void
on_requested_deadline_missed(
Expand Down Expand Up @@ -909,7 +911,8 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
DDS_SampleLostStatus status_sample_lost_last;
DDS_SubscriptionMatchedStatus status_matched_last;

RMW_Connext_Subscriber * sub;
RMW_Connext_Subscriber * sub{nullptr};
RMW_Connext_Publisher * related_pub{nullptr};

std::mutex new_data_event_mutex_;
rmw_event_callback_t new_data_event_cb_{nullptr};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct RMW_Connext_RequestReplyMessage
{
bool request;
rmw_gid_t gid;
rmw_gid_t writer_gid;
int64_t sn;
void * payload;
};
Expand Down
Loading

0 comments on commit 7c95abb

Please sign in to comment.