diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 71ae6981c..8f5676ad0 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -61,6 +61,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds explicit PubListener(CustomPublisherInfo * info) : deadline_changes_(false), liveliness_changes_(false), + incompatible_qos_changes_(false), conditionMutex_(nullptr), conditionVariable_(nullptr) { @@ -94,6 +95,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::DataWriter * writer, const eprosima::fastdds::dds::LivelinessLostStatus & status) final; + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_offered_incompatible_qos( + eprosima::fastdds::dds::DataWriter *, + const eprosima::fastdds::dds::OfferedIncompatibleQosStatus &) final; // EventListenerInterface implementation RMW_FASTRTPS_SHARED_CPP_PUBLIC @@ -141,6 +147,10 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::LivelinessLostStatus liveliness_lost_status_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::atomic_bool incompatible_qos_changes_; + eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); }; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 85a4ac540..eddc7bac2 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -70,6 +70,8 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds : data_(false), deadline_changes_(false), liveliness_changes_(false), + sample_lost_changes_(false), + incompatible_qos_changes_(false), conditionMutex_(nullptr), conditionVariable_(nullptr) { @@ -112,6 +114,18 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::DataReader *, const eprosima::fastrtps::LivelinessChangedStatus &) final; + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_sample_lost( + eprosima::fastdds::dds::DataReader *, + const eprosima::fastdds::dds::SampleLostStatus &) final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_requested_incompatible_qos( + eprosima::fastdds::dds::DataReader *, + const eprosima::fastdds::dds::RequestedIncompatibleQosStatus &) final; + // EventListenerInterface implementation RMW_FASTRTPS_SHARED_CPP_PUBLIC bool @@ -176,6 +190,14 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::atomic_bool sample_lost_changes_; + eprosima::fastdds::dds::SampleLostStatus sample_lost_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + std::atomic_bool incompatible_qos_changes_; + eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 840d2e830..1a14fb98a 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -13,8 +13,11 @@ // limitations under the License. #include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp" + #include "fastdds/dds/core/status/BaseStatus.hpp" #include "fastdds/dds/core/status/DeadlineMissedStatus.hpp" + +#include "event_helpers.hpp" #include "types/event_types.hpp" EventListenerInterface * @@ -60,6 +63,25 @@ void PubListener::on_liveliness_lost( liveliness_changes_.store(true, std::memory_order_relaxed); } +void PubListener::on_offered_incompatible_qos( + eprosima::fastdds::dds::DataWriter * /* writer */, + const eprosima::fastdds::dds::OfferedIncompatibleQosStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to incompatible_qos_status_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + incompatible_qos_status_.last_policy_id = status.last_policy_id; + incompatible_qos_status_.total_count = status.total_count; + // Accumulate deltas + incompatible_qos_status_.total_count_change += status.total_count_change; + + incompatible_qos_changes_.store(true, std::memory_order_relaxed); +} + bool PubListener::hasEvent(rmw_event_type_t event_type) const { assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); @@ -68,6 +90,8 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const return liveliness_changes_.load(std::memory_order_relaxed); case RMW_EVENT_OFFERED_DEADLINE_MISSED: return deadline_changes_.load(std::memory_order_relaxed); + case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: + return incompatible_qos_changes_.load(std::memory_order_relaxed); default: break; } @@ -97,6 +121,18 @@ bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) deadline_changes_.store(false, std::memory_order_relaxed); } break; + case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: + { + auto rmw_data = static_cast(event_info); + rmw_data->total_count = incompatible_qos_status_.total_count; + rmw_data->total_count_change = incompatible_qos_status_.total_count_change; + rmw_data->last_policy_kind = + rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( + incompatible_qos_status_.last_policy_id); + incompatible_qos_status_.total_count_change = 0; + incompatible_qos_changes_.store(false, std::memory_order_relaxed); + } + break; default: return false; } diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 17da1fa79..1703761b1 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -17,6 +17,7 @@ #include "fastdds/dds/core/status/DeadlineMissedStatus.hpp" #include "fastdds/dds/core/status/LivelinessChangedStatus.hpp" +#include "event_helpers.hpp" #include "types/event_types.hpp" EventListenerInterface * @@ -64,6 +65,43 @@ void SubListener::on_liveliness_changed( liveliness_changes_.store(true, std::memory_order_relaxed); } +void SubListener::on_sample_lost( + eprosima::fastdds::dds::DataReader * /* reader */, + const eprosima::fastdds::dds::SampleLostStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to sample_lost_status_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + sample_lost_status_.total_count = status.total_count; + // Accumulate deltas + sample_lost_status_.total_count_change += status.total_count_change; + + sample_lost_changes_.store(true, std::memory_order_relaxed); +} + +void SubListener::on_requested_incompatible_qos( + eprosima::fastdds::dds::DataReader * /* reader */, + const eprosima::fastdds::dds::RequestedIncompatibleQosStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to incompatible_qos_status_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + incompatible_qos_status_.last_policy_id = status.last_policy_id; + incompatible_qos_status_.total_count = status.total_count; + // Accumulate deltas + incompatible_qos_status_.total_count_change += status.total_count_change; + + incompatible_qos_changes_.store(true, std::memory_order_relaxed); +} + bool SubListener::hasEvent(rmw_event_type_t event_type) const { assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); @@ -72,6 +110,10 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const return liveliness_changes_.load(std::memory_order_relaxed); case RMW_EVENT_REQUESTED_DEADLINE_MISSED: return deadline_changes_.load(std::memory_order_relaxed); + case RMW_EVENT_MESSAGE_LOST: + return sample_lost_changes_.load(std::memory_order_relaxed); + case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: + return incompatible_qos_changes_.load(std::memory_order_relaxed); default: break; } @@ -104,6 +146,27 @@ bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) deadline_changes_.store(false, std::memory_order_relaxed); } break; + case RMW_EVENT_MESSAGE_LOST: + { + auto rmw_data = static_cast(event_info); + rmw_data->total_count = sample_lost_status_.total_count; + rmw_data->total_count_change = sample_lost_status_.total_count_change; + sample_lost_status_.total_count_change = 0; + sample_lost_changes_.store(false, std::memory_order_relaxed); + } + break; + case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: + { + auto rmw_data = static_cast(event_info); + rmw_data->total_count = incompatible_qos_status_.total_count; + rmw_data->total_count_change = incompatible_qos_status_.total_count_change; + rmw_data->last_policy_kind = + rmw_fastrtps_shared_cpp::internal::dds_qos_policy_to_rmw_qos_policy( + incompatible_qos_status_.last_policy_id); + incompatible_qos_status_.total_count_change = 0; + incompatible_qos_changes_.store(false, std::memory_order_relaxed); + } + break; default: return false; } diff --git a/rmw_fastrtps_shared_cpp/src/event_helpers.hpp b/rmw_fastrtps_shared_cpp/src/event_helpers.hpp new file mode 100644 index 000000000..e667f96ff --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/event_helpers.hpp @@ -0,0 +1,33 @@ +// Copyright 2022 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef EVENT_HELPERS_HPP_ +#define EVENT_HELPERS_HPP_ + +#include "fastdds/dds/core/policy/QosPolicies.hpp" + +#include "rmw/qos_policy_kind.h" + +namespace rmw_fastrtps_shared_cpp +{ +namespace internal +{ + +rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy( + eprosima::fastdds::dds::QosPolicyId_t policy_id); + +} // namespace internal +} // namespace rmw_fastrtps_shared_cpp + +#endif // EVENT_HELPERS_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 4544d3136..6705ef551 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -17,13 +17,17 @@ #include "rmw/impl/cpp/macros.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "event_helpers.hpp" #include "types/event_types.hpp" static const std::unordered_set g_rmw_event_type_set{ RMW_EVENT_LIVELINESS_CHANGED, RMW_EVENT_REQUESTED_DEADLINE_MISSED, RMW_EVENT_LIVELINESS_LOST, - RMW_EVENT_OFFERED_DEADLINE_MISSED + RMW_EVENT_OFFERED_DEADLINE_MISSED, + RMW_EVENT_MESSAGE_LOST, + RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, + RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE }; namespace rmw_fastrtps_shared_cpp @@ -36,6 +40,29 @@ bool is_event_supported(rmw_event_type_t event_type) return g_rmw_event_type_set.count(event_type) == 1; } +rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy( + eprosima::fastdds::dds::QosPolicyId_t policy_id) +{ + using eprosima::fastdds::dds::QosPolicyId_t; + + switch (policy_id) { + case QosPolicyId_t::DURABILITY_QOS_POLICY_ID: + return RMW_QOS_POLICY_DURABILITY; + case QosPolicyId_t::DEADLINE_QOS_POLICY_ID: + return RMW_QOS_POLICY_DEADLINE; + case QosPolicyId_t::LIVELINESS_QOS_POLICY_ID: + return RMW_QOS_POLICY_LIVELINESS; + case QosPolicyId_t::RELIABILITY_QOS_POLICY_ID: + return RMW_QOS_POLICY_RELIABILITY; + case QosPolicyId_t::HISTORY_QOS_POLICY_ID: + return RMW_QOS_POLICY_HISTORY; + case QosPolicyId_t::LIFESPAN_QOS_POLICY_ID: + return RMW_QOS_POLICY_LIFESPAN; + default: + return RMW_QOS_POLICY_INVALID; + } +} + } // namespace internal rmw_ret_t