diff --git a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp index 711b04998c..058ed45818 100644 --- a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp +++ b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp @@ -94,6 +94,8 @@ class SubscriptionTopicStatistics /// Handle a message received by the subscription to collect statistics. /** + * This method acquires a lock to prevent race conditions to collectors list. + * * \param received_message the message received by the subscription * \param now_nanoseconds current time in nanoseconds */ @@ -101,6 +103,7 @@ class SubscriptionTopicStatistics const CallbackMessageT & received_message, const rclcpp::Time now_nanoseconds) const { + std::lock_guard lock(mutex_); for (const auto & collector : subscriber_statistics_collectors_) { collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds()); } @@ -116,21 +119,32 @@ class SubscriptionTopicStatistics } /// Publish a populated MetricsStatisticsMessage. + /** + * This method acquires a lock to prevent race conditions to collectors list. + */ virtual void publish_message() { + std::vector msgs; rclcpp::Time window_end{get_current_nanoseconds_since_epoch()}; - for (auto & collector : subscriber_statistics_collectors_) { - const auto collected_stats = collector->GetStatisticsResults(); - - auto message = libstatistics_collector::collector::GenerateStatisticMessage( - node_name_, - collector->GetMetricName(), - collector->GetMetricUnit(), - window_start_, - window_end, - collected_stats); - publisher_->publish(message); + { + std::lock_guard lock(mutex_); + for (auto & collector : subscriber_statistics_collectors_) { + const auto collected_stats = collector->GetStatisticsResults(); + + auto message = libstatistics_collector::collector::GenerateStatisticMessage( + node_name_, + collector->GetMetricName(), + collector->GetMetricUnit(), + window_start_, + window_end, + collected_stats); + msgs.push_back(message); + } + } + + for (auto & msg : msgs) { + publisher_->publish(msg); } window_start_ = window_end; } @@ -138,11 +152,14 @@ class SubscriptionTopicStatistics protected: /// Return a vector of all the currently collected data. /** + * This method acquires a lock to prevent race conditions to collectors list. + * * \return a vector of all the collected data */ std::vector get_current_collector_data() const { std::vector data; + std::lock_guard lock(mutex_); for (const auto & collector : subscriber_statistics_collectors_) { data.push_back(collector->GetStatisticsResults()); } @@ -151,23 +168,35 @@ class SubscriptionTopicStatistics private: /// Construct and start all collectors and set window_start_. + /** + * This method acquires a lock to prevent race conditions to collectors list. + */ void bring_up() { auto received_message_period = std::make_unique(); received_message_period->Start(); - subscriber_statistics_collectors_.emplace_back(std::move(received_message_period)); + { + std::lock_guard lock(mutex_); + subscriber_statistics_collectors_.emplace_back(std::move(received_message_period)); + } window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch()); } /// Stop all collectors, clear measurements, stop publishing timer, and reset publisher. + /** + * This method acquires a lock to prevent race conditions to collectors list. + */ void tear_down() { - for (auto & collector : subscriber_statistics_collectors_) { - collector->Stop(); - } + { + std::lock_guard lock(mutex_); + for (auto & collector : subscriber_statistics_collectors_) { + collector->Stop(); + } - subscriber_statistics_collectors_.clear(); + subscriber_statistics_collectors_.clear(); + } if (publisher_timer_) { publisher_timer_->cancel(); @@ -187,6 +216,8 @@ class SubscriptionTopicStatistics return std::chrono::duration_cast(now.time_since_epoch()).count(); } + /// Mutex to protect the subsequence vectors + mutable std::mutex mutex_; /// Collection of statistics collectors std::vector> subscriber_statistics_collectors_{}; /// Node name used to generate topic statistics messages to be published