Skip to content

Commit

Permalink
Implement matched event (#645)
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Mar 22, 2023
1 parent 926b3a1 commit f13df4c
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ class RMWPublisherEvent final : public EventListenerInterface
eprosima::fastdds::dds::QosPolicyId_t last_policy_id,
uint32_t total_count, uint32_t total_count_change);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void update_matched(
int32_t total_count,
int32_t total_count_change,
int32_t current_count,
int32_t current_count_change);

private:
CustomPublisherInfo * publisher_info_ = nullptr;

Expand All @@ -178,6 +185,12 @@ class RMWPublisherEvent final : public EventListenerInterface
eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

bool matched_changes_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

eprosima::fastdds::dds::PublicationMatchedStatus matched_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

void trigger_event(rmw_event_type_t event_type);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ class RMWSubscriptionEvent final : public EventListenerInterface
eprosima::fastdds::dds::QosPolicyId_t last_policy_id,
uint32_t total_count, uint32_t total_count_change);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void update_matched(
int32_t total_count,
int32_t total_count_change,
int32_t current_count,
int32_t current_count_change);

private:
CustomSubscriberInfo * subscriber_info_ = nullptr;

Expand Down Expand Up @@ -223,6 +230,12 @@ class RMWSubscriptionEvent final : public EventListenerInterface
eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

bool matched_changes_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(
publishers_mutex_);

Expand All @@ -233,6 +246,8 @@ class RMWSubscriptionEvent final : public EventListenerInterface
std::mutex on_new_message_m_;

mutable std::mutex publishers_mutex_;

void trigger_event(rmw_event_type_t event_type);
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
63 changes: 62 additions & 1 deletion rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ void CustomDataWriterListener::on_publication_matched(
} else if (status.current_count_change == -1) {
publisher_event_->untrack_unique_subscription(
eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle));
} else {
return;
}

publisher_event_->update_matched(
status.total_count,
status.total_count_change,
status.current_count,
status.current_count_change);
}


Expand Down Expand Up @@ -82,7 +90,8 @@ RMWPublisherEvent::RMWPublisherEvent(CustomPublisherInfo * info)
: publisher_info_(info),
deadline_changed_(false),
liveliness_changed_(false),
incompatible_qos_changed_(false)
incompatible_qos_changed_(false),
matched_changes_(false)
{
}

Expand Down Expand Up @@ -158,6 +167,30 @@ bool RMWPublisherEvent::take_event(
inconsistent_topic_status_.total_count_change = 0;
}
break;
case RMW_EVENT_PUBLICATION_MATCHED:
{
auto rmw_data = static_cast<rmw_matched_status_t *>(event_info);

if (matched_changes_) {
rmw_data->total_count = static_cast<size_t>(matched_status_.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status_.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status_.current_count);
rmw_data->current_count_change = matched_status_.current_count_change;
matched_changes_ = false;
} else {
eprosima::fastdds::dds::PublicationMatchedStatus matched_status;
publisher_info_->data_writer_->get_publication_matched_status(matched_status);

rmw_data->total_count = static_cast<size_t>(matched_status.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status.current_count);
rmw_data->current_count_change = matched_status.current_count_change;
}

matched_status_.total_count_change = 0;
matched_status_.current_count_change = 0;
}
break;
default:
return false;
}
Expand Down Expand Up @@ -212,6 +245,16 @@ void RMWPublisherEvent::set_on_new_event_callback(
inconsistent_topic_status_.total_count_change = 0;
}
break;
case RMW_EVENT_PUBLICATION_MATCHED:
{
if (matched_status_.total_count_change > 0) {
callback(user_data, matched_status_.total_count_change);
publisher_info_->data_writer_->get_publication_matched_status(matched_status_);
matched_status_.total_count_change = 0;
matched_status_.current_count_change = 0;
}
}
break;
default:
break;
}
Expand Down Expand Up @@ -307,6 +350,24 @@ void RMWPublisherEvent::update_inconsistent_topic(uint32_t total_count, uint32_t
trigger_event(RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE);
}

void RMWPublisherEvent::update_matched(
int32_t total_count,
int32_t total_count_change,
int32_t current_count,
int32_t current_count_change)
{
std::lock_guard<std::mutex> lock(on_new_event_m_);

matched_status_.total_count = total_count;
matched_status_.total_count_change += total_count_change;
matched_status_.current_count = current_count;
matched_status_.current_count_change += current_count_change;

matched_changes_ = true;

trigger_event(RMW_EVENT_PUBLICATION_MATCHED);
}

void RMWPublisherEvent::trigger_event(rmw_event_type_t event_type)
{
if (on_new_event_cb_[event_type]) {
Expand Down
99 changes: 72 additions & 27 deletions rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ CustomDataReaderListener::on_subscription_matched(
} else if (info.current_count_change == -1) {
subscription_event_->untrack_unique_publisher(
eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
} else {
return;
}

subscription_event_->update_matched(
info.total_count,
info.total_count_change,
info.current_count,
info.current_count_change);
}

void
Expand Down Expand Up @@ -102,7 +110,8 @@ RMWSubscriptionEvent::RMWSubscriptionEvent(CustomSubscriberInfo * info)
deadline_changed_(false),
liveliness_changed_(false),
sample_lost_changed_(false),
incompatible_qos_changed_(false)
incompatible_qos_changed_(false),
matched_changes_(false)
{
}

Expand Down Expand Up @@ -193,6 +202,29 @@ bool RMWSubscriptionEvent::take_event(
inconsistent_topic_status_.total_count_change = 0;
}
break;
case RMW_EVENT_SUBSCRIPTION_MATCHED:
{
auto rmw_data = static_cast<rmw_matched_status_t *>(event_info);

if (matched_changes_) {
rmw_data->total_count = static_cast<size_t>(matched_status_.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status_.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status_.current_count);
rmw_data->current_count_change = matched_status_.current_count_change;
matched_changes_ = false;
} else {
eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status;
subscriber_info_->data_reader_->get_subscription_matched_status(matched_status);

rmw_data->total_count = static_cast<size_t>(matched_status.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status.current_count);
rmw_data->current_count_change = matched_status.current_count_change;
}
matched_status_.total_count_change = 0;
matched_status_.current_count_change = 0;
}
break;
default:
return false;
}
Expand Down Expand Up @@ -270,6 +302,15 @@ void RMWSubscriptionEvent::set_on_new_event_callback(
}
}
break;
case RMW_EVENT_SUBSCRIPTION_MATCHED:
{
if (matched_status_.total_count_change > 0) {
callback(user_data, matched_status_.total_count_change);
subscriber_info_->data_reader_->get_subscription_matched_status(matched_status_);
matched_status_.total_count_change = 0;
matched_status_.current_count_change = 0;
}
}
default:
break;
}
Expand Down Expand Up @@ -368,12 +409,7 @@ void RMWSubscriptionEvent::update_requested_deadline_missed(

deadline_changed_ = true;

if (on_new_event_cb_[RMW_EVENT_REQUESTED_DEADLINE_MISSED]) {
on_new_event_cb_[RMW_EVENT_REQUESTED_DEADLINE_MISSED](user_data_[
RMW_EVENT_REQUESTED_DEADLINE_MISSED], 1);
}

event_guard[RMW_EVENT_REQUESTED_DEADLINE_MISSED].set_trigger_value(true);
trigger_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED);
}

void RMWSubscriptionEvent::update_liveliness_changed(
Expand All @@ -391,11 +427,7 @@ void RMWSubscriptionEvent::update_liveliness_changed(

liveliness_changed_ = true;

if (on_new_event_cb_[RMW_EVENT_LIVELINESS_CHANGED]) {
on_new_event_cb_[RMW_EVENT_LIVELINESS_CHANGED](user_data_[RMW_EVENT_LIVELINESS_CHANGED], 1);
}

event_guard[RMW_EVENT_LIVELINESS_CHANGED].set_trigger_value(true);
trigger_event(RMW_EVENT_LIVELINESS_CHANGED);
}

void RMWSubscriptionEvent::update_sample_lost(uint32_t total_count, uint32_t total_count_change)
Expand All @@ -409,11 +441,7 @@ void RMWSubscriptionEvent::update_sample_lost(uint32_t total_count, uint32_t tot

sample_lost_changed_ = true;

if (on_new_event_cb_[RMW_EVENT_MESSAGE_LOST]) {
on_new_event_cb_[RMW_EVENT_MESSAGE_LOST](user_data_[RMW_EVENT_MESSAGE_LOST], 1);
}

event_guard[RMW_EVENT_MESSAGE_LOST].set_trigger_value(true);
trigger_event(RMW_EVENT_MESSAGE_LOST);
}

void RMWSubscriptionEvent::update_requested_incompatible_qos(
Expand All @@ -430,12 +458,7 @@ void RMWSubscriptionEvent::update_requested_incompatible_qos(

incompatible_qos_changed_ = true;

if (on_new_event_cb_[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE]) {
on_new_event_cb_[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE](user_data_[
RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE], 1);
}

event_guard[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE].set_trigger_value(true);
trigger_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE);
}

void RMWSubscriptionEvent::update_inconsistent_topic(
Expand All @@ -450,10 +473,32 @@ void RMWSubscriptionEvent::update_inconsistent_topic(

inconsistent_topic_changed_ = true;

if (on_new_event_cb_[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE]) {
on_new_event_cb_[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE](
user_data_[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE], 1);
trigger_event(RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE);
}

void RMWSubscriptionEvent::update_matched(
int32_t total_count,
int32_t total_count_change,
int32_t current_count,
int32_t current_count_change)
{
std::lock_guard<std::mutex> lock(on_new_event_m_);

matched_status_.total_count = total_count;
matched_status_.total_count_change += total_count_change;
matched_status_.current_count = current_count;
matched_status_.current_count_change += current_count_change;

matched_changes_ = true;

trigger_event(RMW_EVENT_SUBSCRIPTION_MATCHED);
}

void RMWSubscriptionEvent::trigger_event(rmw_event_type_t event_type)
{
if (on_new_event_cb_[event_type]) {
on_new_event_cb_[event_type](user_data_[event_type], 1);
}

event_guard[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE].set_trigger_value(true);
event_guard[event_type].set_trigger_value(true);
}
8 changes: 8 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ static const std::unordered_set<rmw_event_type_t> g_rmw_event_type_set{
RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE,
RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE,
RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE,
RMW_EVENT_SUBSCRIPTION_MATCHED,
RMW_EVENT_PUBLICATION_MATCHED
};

namespace rmw_fastrtps_shared_cpp
Expand Down Expand Up @@ -70,6 +72,12 @@ eprosima::fastdds::dds::StatusMask rmw_event_to_dds_statusmask(
case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE:
ret_statusmask = eprosima::fastdds::dds::StatusMask::inconsistent_topic();
break;
case RMW_EVENT_SUBSCRIPTION_MATCHED:
ret_statusmask = eprosima::fastdds::dds::StatusMask::subscription_matched();
break;
case RMW_EVENT_PUBLICATION_MATCHED:
ret_statusmask = eprosima::fastdds::dds::StatusMask::publication_matched();
break;
default:
break;
}
Expand Down

0 comments on commit f13df4c

Please sign in to comment.