Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clear statistics after window reset (#1531) #1535

Merged
merged 11 commits into from
Feb 5, 2021
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ create_subscription(
auto sub_call_back = [weak_subscription_topic_stats]() {
auto subscription_topic_stats = weak_subscription_topic_stats.lock();
if (subscription_topic_stats) {
subscription_topic_stats->publish_message();
subscription_topic_stats->publish_message_and_reset_measurements();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class SubscriptionTopicStatistics
/**
* This method acquires a lock to prevent race conditions to collectors list.
*/
virtual void publish_message()
virtual void publish_message_and_reset_measurements()
{
std::vector<MetricsMessage> msgs;
rclcpp::Time window_end{get_current_nanoseconds_since_epoch()};
Expand All @@ -136,6 +136,7 @@ class SubscriptionTopicStatistics
std::lock_guard<std::mutex> lock(mutex_);
for (auto & collector : subscriber_statistics_collectors_) {
const auto collected_stats = collector->GetStatisticsResults();
collector->ClearCurrentMeasurements();

auto message = libstatistics_collector::collector::GenerateStatisticMessage(
node_name_,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

namespace
{
constexpr const std::chrono::seconds defaultStatisticsPublishPeriod{1};
constexpr const char kTestPubNodeName[]{"test_pub_stats_node"};
constexpr const char kTestSubNodeName[]{"test_sub_stats_node"};
constexpr const char kTestSubStatsTopic[]{"/test_sub_stats_topic"};
Expand All @@ -53,10 +54,17 @@ constexpr const char kTestTopicStatisticsTopic[]{"/test_topic_statistics_topic"}
constexpr const char kMessageAgeSourceLabel[]{"message_age"};
constexpr const char kMessagePeriodSourceLabel[]{"message_period"};
constexpr const uint64_t kNoSamples{0};
constexpr const std::chrono::seconds kTestDuration{10};
constexpr const uint64_t kNumExpectedMessages{8};
constexpr const uint64_t kNumExpectedMessageAgeMessages{kNumExpectedMessages / 2};
constexpr const uint64_t kNumExpectedMessagePeriodMessages{kNumExpectedMessages / 2};
constexpr const std::chrono::seconds kTestTimeout{10};
constexpr const uint64_t kNumExpectedWindows{4};
constexpr const uint64_t kNumExpectedMessages{kNumExpectedWindows * 2};
constexpr const uint64_t kNumExpectedMessageAgeMessages{kNumExpectedWindows};
constexpr const uint64_t kNumExpectedMessagePeriodMessages{kNumExpectedWindows};
constexpr const std::chrono::seconds kUnstableMessageAgeWindowDuration{
defaultStatisticsPublishPeriod * (kNumExpectedWindows / 2)};
// kUnstableMessageAgeWindowDuration can take following value.
// Min: defaultStatisticsPublishPeriod * 2
// Max: defaultStatisticsPublishPeriod * (kNumExpectedWindows - 2)
constexpr const std::chrono::seconds kUnstableMessageAgeOffset{std::chrono::seconds{1}};
} // namespace

using rclcpp::msg::MessageWithHeader;
Expand Down Expand Up @@ -161,6 +169,49 @@ class MessageWithHeaderPublisher : public rclcpp::Node
std::uniform_int_distribution<uint32_t> uniform_dist_;
};

/**
* TransitionMessageStamp publisher node : used to publish MessageWithHeader with `header` value set
* The message age results change during the test.
*/

class TransitionMessageStampPublisher : public rclcpp::Node
{
public:
TransitionMessageStampPublisher(
const std::string & name, const std::string & topic,
const std::chrono::seconds transition_duration, const std::chrono::seconds message_age_offset,
const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100})
: Node(name), transition_duration_(transition_duration), message_age_offset_(message_age_offset)
{
publisher_ = create_publisher<MessageWithHeader>(topic, 10);
publish_timer_ = this->create_wall_timer(publish_period, [this]() {this->publish_message();});
start_time_ = this->now();
}

private:
void publish_message()
{
auto msg = MessageWithHeader{};
auto now = this->now();
auto elapsed_time = now - start_time_;
if (elapsed_time < transition_duration_) {
// Apply only to the topic statistics in the first half
// Subtract offset so message_age is always >= offset.
msg.header.stamp = now - message_age_offset_;
} else {
msg.header.stamp = now;
}
publisher_->publish(msg);
}

std::chrono::seconds transition_duration_;
std::chrono::seconds message_age_offset_;
rclcpp::Time start_time_;

rclcpp::Publisher<MessageWithHeader>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr publish_timer_;
};

/**
* Empty subscriber node: used to create subscriber topic statistics requirements
*/
Expand Down Expand Up @@ -202,6 +253,7 @@ class MessageWithHeaderSubscriber : public rclcpp::Node
// manually enable topic statistics via options
auto options = rclcpp::SubscriptionOptions();
options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable;
options.topic_stats_options.publish_period = defaultStatisticsPublishPeriod;

auto callback = [](MessageWithHeader::UniquePtr msg) {
(void) msg;
Expand Down Expand Up @@ -382,7 +434,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no
// Spin and get future
ex.spin_until_future_complete(
statistics_listener->GetFuture(),
kTestDuration);
kTestTimeout);

// Compare message counts, sample count should be the same as published and received count
EXPECT_EQ(kNumExpectedMessages, statistics_listener->GetNumberOfMessagesReceived());
Expand Down Expand Up @@ -447,7 +499,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_wi
// Spin and get future
ex.spin_until_future_complete(
statistics_listener->GetFuture(),
kTestDuration);
kTestTimeout);

// Compare message counts, sample count should be the same as published and received count
EXPECT_EQ(kNumExpectedMessages, statistics_listener->GetNumberOfMessagesReceived());
Expand Down Expand Up @@ -476,3 +528,61 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_wi
check_if_statistic_message_is_populated(msg);
}
}

TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window_reset)
{
// Create a MessageWithHeader publisher
auto msg_with_header_publisher = std::make_shared<TransitionMessageStampPublisher>(
kTestPubNodeName, kTestSubStatsTopic, kUnstableMessageAgeWindowDuration,
kUnstableMessageAgeOffset);

// msg_with_header_subscriber has a topic statistics instance as part of its
// subscription this will listen to and generate statistics
auto msg_with_header_subscriber =
std::make_shared<MessageWithHeaderSubscriber>(kTestSubNodeName, kTestSubStatsTopic);
// Create a listener for topic statistics messages
auto statistics_listener = std::make_shared<rclcpp::topic_statistics::MetricsMessageSubscriber>(
"test_receive_stats_include_window_reset", "/statistics", kNumExpectedMessages);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(msg_with_header_publisher);
ex.add_node(statistics_listener);
ex.add_node(msg_with_header_subscriber);

// Spin and get future
ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout);

const auto received_messages = statistics_listener->GetReceivedMessages();
mm318 marked this conversation as resolved.
Show resolved Hide resolved
EXPECT_EQ(kNumExpectedMessages, received_messages.size());

auto message_age_offset =
std::chrono::duration<double, std::milli>(kUnstableMessageAgeOffset).count();

// Check that the first statistic contains the offset inside of its window
auto head_message = received_messages[0];
for (const auto & stats_point : head_message.statistics) {
const auto type = stats_point.data_type;
switch (type) {
case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM:
case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM:
EXPECT_GE(stats_point.data, message_age_offset);
break;
default:
break;
}
}

// Check that the last statistic does not contain the offset outside of its window
auto tail_message = received_messages[received_messages.size() - 1];
for (const auto & stats_point : tail_message.statistics) {
const auto type = stats_point.data_type;
switch (type) {
case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM:
case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM:
EXPECT_LT(stats_point.data, message_age_offset);
break;
default:
break;
}
}
}