diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index d6b90cc815..cf2458de62 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -101,7 +101,7 @@ create_subscription( auto sub_call_back = [weak_subscription_topic_stats]() { auto subscription_topic_stats = weak_subscription_topic_stats.lock(); if (subscription_topic_stats) { - subscription_topic_stats->publish_message(); + subscription_topic_stats->publish_message_and_reset_measurements(); } }; diff --git a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp index 46155ca4e9..4b9221406f 100644 --- a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp +++ b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp @@ -127,7 +127,7 @@ class SubscriptionTopicStatistics /** * This method acquires a lock to prevent race conditions to collectors list. */ - virtual void publish_message() + virtual void publish_message_and_reset_measurements() { std::vector msgs; rclcpp::Time window_end{get_current_nanoseconds_since_epoch()}; @@ -136,6 +136,7 @@ class SubscriptionTopicStatistics std::lock_guard lock(mutex_); for (auto & collector : subscriber_statistics_collectors_) { const auto collected_stats = collector->GetStatisticsResults(); + collector->ClearCurrentMeasurements(); auto message = libstatistics_collector::collector::GenerateStatisticMessage( node_name_, diff --git a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp index eac1ed9b95..6c6d5e6e2a 100644 --- a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp @@ -45,6 +45,7 @@ namespace { +constexpr const std::chrono::seconds defaultStatisticsPublishPeriod{1}; constexpr const char kTestPubNodeName[]{"test_pub_stats_node"}; constexpr const char kTestSubNodeName[]{"test_sub_stats_node"}; constexpr const char kTestSubStatsTopic[]{"/test_sub_stats_topic"}; @@ -53,10 +54,17 @@ constexpr const char kTestTopicStatisticsTopic[]{"/test_topic_statistics_topic"} constexpr const char kMessageAgeSourceLabel[]{"message_age"}; constexpr const char kMessagePeriodSourceLabel[]{"message_period"}; constexpr const uint64_t kNoSamples{0}; -constexpr const std::chrono::seconds kTestDuration{10}; -constexpr const uint64_t kNumExpectedMessages{8}; -constexpr const uint64_t kNumExpectedMessageAgeMessages{kNumExpectedMessages / 2}; -constexpr const uint64_t kNumExpectedMessagePeriodMessages{kNumExpectedMessages / 2}; +constexpr const std::chrono::seconds kTestTimeout{10}; +constexpr const uint64_t kNumExpectedWindows{4}; +constexpr const uint64_t kNumExpectedMessages{kNumExpectedWindows * 2}; +constexpr const uint64_t kNumExpectedMessageAgeMessages{kNumExpectedWindows}; +constexpr const uint64_t kNumExpectedMessagePeriodMessages{kNumExpectedWindows}; +constexpr const std::chrono::seconds kUnstableMessageAgeWindowDuration{ + defaultStatisticsPublishPeriod * (kNumExpectedWindows / 2)}; +// kUnstableMessageAgeWindowDuration can take following value. +// Min: defaultStatisticsPublishPeriod * 2 +// Max: defaultStatisticsPublishPeriod * (kNumExpectedWindows - 2) +constexpr const std::chrono::seconds kUnstableMessageAgeOffset{std::chrono::seconds{1}}; } // namespace using rclcpp::msg::MessageWithHeader; @@ -161,6 +169,49 @@ class MessageWithHeaderPublisher : public rclcpp::Node std::uniform_int_distribution uniform_dist_; }; +/** + * TransitionMessageStamp publisher node : used to publish MessageWithHeader with `header` value set + * The message age results change during the test. + */ + +class TransitionMessageStampPublisher : public rclcpp::Node +{ +public: + TransitionMessageStampPublisher( + const std::string & name, const std::string & topic, + const std::chrono::seconds transition_duration, const std::chrono::seconds message_age_offset, + const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) + : Node(name), transition_duration_(transition_duration), message_age_offset_(message_age_offset) + { + publisher_ = create_publisher(topic, 10); + publish_timer_ = this->create_wall_timer(publish_period, [this]() {this->publish_message();}); + start_time_ = this->now(); + } + +private: + void publish_message() + { + auto msg = MessageWithHeader{}; + auto now = this->now(); + auto elapsed_time = now - start_time_; + if (elapsed_time < transition_duration_) { + // Apply only to the topic statistics in the first half + // Subtract offset so message_age is always >= offset. + msg.header.stamp = now - message_age_offset_; + } else { + msg.header.stamp = now; + } + publisher_->publish(msg); + } + + std::chrono::seconds transition_duration_; + std::chrono::seconds message_age_offset_; + rclcpp::Time start_time_; + + rclcpp::Publisher::SharedPtr publisher_; + rclcpp::TimerBase::SharedPtr publish_timer_; +}; + /** * Empty subscriber node: used to create subscriber topic statistics requirements */ @@ -202,6 +253,7 @@ class MessageWithHeaderSubscriber : public rclcpp::Node // manually enable topic statistics via options auto options = rclcpp::SubscriptionOptions(); options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; + options.topic_stats_options.publish_period = defaultStatisticsPublishPeriod; auto callback = [](MessageWithHeader::UniquePtr msg) { (void) msg; @@ -382,7 +434,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no // Spin and get future ex.spin_until_future_complete( statistics_listener->GetFuture(), - kTestDuration); + kTestTimeout); // Compare message counts, sample count should be the same as published and received count EXPECT_EQ(kNumExpectedMessages, statistics_listener->GetNumberOfMessagesReceived()); @@ -447,7 +499,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_wi // Spin and get future ex.spin_until_future_complete( statistics_listener->GetFuture(), - kTestDuration); + kTestTimeout); // Compare message counts, sample count should be the same as published and received count EXPECT_EQ(kNumExpectedMessages, statistics_listener->GetNumberOfMessagesReceived()); @@ -476,3 +528,61 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_wi check_if_statistic_message_is_populated(msg); } } + +TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window_reset) +{ + // Create a MessageWithHeader publisher + auto msg_with_header_publisher = std::make_shared( + kTestPubNodeName, kTestSubStatsTopic, kUnstableMessageAgeWindowDuration, + kUnstableMessageAgeOffset); + + // msg_with_header_subscriber has a topic statistics instance as part of its + // subscription this will listen to and generate statistics + auto msg_with_header_subscriber = + std::make_shared(kTestSubNodeName, kTestSubStatsTopic); + // Create a listener for topic statistics messages + auto statistics_listener = std::make_shared( + "test_receive_stats_include_window_reset", "/statistics", kNumExpectedMessages); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(msg_with_header_publisher); + ex.add_node(statistics_listener); + ex.add_node(msg_with_header_subscriber); + + // Spin and get future + ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout); + + const auto received_messages = statistics_listener->GetReceivedMessages(); + EXPECT_EQ(kNumExpectedMessages, received_messages.size()); + + auto message_age_offset = + std::chrono::duration(kUnstableMessageAgeOffset).count(); + + // Check that the first statistic contains the offset inside of its window + auto head_message = received_messages[0]; + for (const auto & stats_point : head_message.statistics) { + const auto type = stats_point.data_type; + switch (type) { + case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: + case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: + EXPECT_GE(stats_point.data, message_age_offset); + break; + default: + break; + } + } + + // Check that the last statistic does not contain the offset outside of its window + auto tail_message = received_messages[received_messages.size() - 1]; + for (const auto & stats_point : tail_message.statistics) { + const auto type = stats_point.data_type; + switch (type) { + case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: + case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: + EXPECT_LT(stats_point.data, message_age_offset); + break; + default: + break; + } + } +}