Skip to content
This repository has been archived by the owner on Jun 10, 2021. It is now read-only.

Commit

Permalink
Add subscriber_topic_statistics class and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>
  • Loading branch information
Prajakta Gokhale committed Mar 18, 2020
1 parent e672ed0 commit 4f8520f
Show file tree
Hide file tree
Showing 4 changed files with 715 additions and 0 deletions.
10 changes: 10 additions & 0 deletions system_metrics_collector/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ add_library(system_metrics_collector SHARED
src/system_metrics_collector/proc_cpu_data.cpp
src/system_metrics_collector/proc_pid_cpu_data.cpp
src/system_metrics_collector/utilities.cpp
src/topic_statistics_collector/subscriber_topic_statistics.cpp
)
ament_target_dependencies(system_metrics_collector
message_filters
Expand Down Expand Up @@ -150,6 +151,11 @@ if(BUILD_TESTING)
target_link_libraries(test_received_message_age system_metrics_collector)
ament_target_dependencies(test_received_message_age rcl rclcpp message_filters std_msgs sensor_msgs)

ament_add_gtest(test_subscriber_topic_statistics
test/topic_statistics_collector/test_subscriber_topic_statistics.cpp)
target_link_libraries(test_subscriber_topic_statistics system_metrics_collector)
ament_target_dependencies(test_subscriber_topic_statistics rcl rclcpp sensor_msgs)

install(TARGETS
test_moving_average_statistics
DESTINATION
Expand All @@ -170,6 +176,10 @@ if(BUILD_TESTING)
test_received_message_age
DESTINATION
lib/${PROJECT_NAME})
install(TARGETS
test_subscriber_topic_statistics
DESTINATION
lib/${PROJECT_NAME})
endif()

# Install launch files
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <limits>
#include <memory>
#include <string>
#include <utility>

#include "rclcpp/rclcpp.hpp"

#include "subscriber_topic_statistics.hpp"
#include "system_metrics_collector/constants.hpp"
#include "system_metrics_collector/metrics_message_publisher.hpp"

using metrics_statistics_msgs::msg::MetricsMessage;
using std::placeholders::_1;

namespace topic_statistics_collector
{
template<typename T>
SubscriberTopicStatistics<T>::SubscriberTopicStatistics(
const std::string & node_name,
const rclcpp::NodeOptions & node_options,
const std::string & topic_name)
: rclcpp_lifecycle::LifecycleNode{node_name, node_options},
topic_name_(topic_name)
{
rcl_interfaces::msg::IntegerRange positive_range;
positive_range.from_value = 1;
positive_range.to_value =
std::numeric_limits<decltype(rcl_interfaces::msg::IntegerRange::to_value)>::max();
positive_range.step = 1;

rcl_interfaces::msg::ParameterDescriptor descriptor;
descriptor.read_only = true;
descriptor.integer_range.push_back(positive_range);

descriptor.description =
"The period in milliseconds between each published MetricsMessage.";
auto publish_period = declare_parameter(
system_metrics_collector::collector_node_constants::kPublishPeriodParam,
system_metrics_collector::collector_node_constants::kDefaultPublishPeriod.count(),
descriptor);
publish_period_ = std::chrono::milliseconds{publish_period};

statistics_collectors_.push_back(std::make_unique<ReceivedMessageAgeCollector<T>>());
statistics_collectors_.push_back(std::make_unique<ReceivedMessagePeriodCollector<T>>());

auto qos_options_ = rclcpp::QoS(rclcpp::KeepAll());
subscription_ = create_subscription<T>(
topic_name_,
qos_options_,
std::bind(&SubscriberTopicStatistics::CollectorCallback, this, _1),
subscription_options_);
}

template<typename T>
bool SubscriberTopicStatistics<T>::SetupStart()
{
assert(publish_timer_ == nullptr);

if (publisher_ == nullptr) {
publisher_ = create_publisher<MetricsMessage>(
system_metrics_collector::collector_node_constants::kStatisticsTopicName,
10 /*history_depth*/);
}

publisher_->on_activate();

RCLCPP_DEBUG(this->get_logger(), "SetupStart: creating publish_timer_");
publish_timer_ = this->create_wall_timer(
publish_period_, [this]() {
this->PublishStatisticMessage();
this->ClearCurrentMeasurements();
this->window_start_ = this->now();
});

window_start_ = now();

return true;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_activate(const rclcpp_lifecycle::State &)
{
RCLCPP_DEBUG(this->get_logger(), "on_activate");
const auto ret = Start();
return ret ? CallbackReturn::SUCCESS : CallbackReturn::ERROR;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_deactivate(const rclcpp_lifecycle::State & state)
{
RCLCPP_DEBUG(this->get_logger(), "on_deactivate");
const auto ret = Stop();
return ret ? CallbackReturn::SUCCESS : CallbackReturn::ERROR;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_shutdown(const rclcpp_lifecycle::State & state)
{
RCLCPP_DEBUG(this->get_logger(), "on_shutdown");
Stop();
publisher_.reset();
return CallbackReturn::SUCCESS;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_error(const rclcpp_lifecycle::State & state)
{
RCLCPP_DEBUG(this->get_logger(), "on_error");
Stop();
if (publisher_) {
publisher_.reset();
}
return CallbackReturn::SUCCESS;
}

template<typename T>
bool SubscriberTopicStatistics<T>::SetupStop()
{
assert(publish_timer_ != nullptr);
assert(publisher_ != nullptr);

publisher_->on_deactivate();

publish_timer_->cancel();
publish_timer_.reset();

return true;
}

template<typename T>
void SubscriberTopicStatistics<T>::CollectorCallback(const typename T::SharedPtr received_message)
{
for (const auto & collector : statistics_collectors_) {
collector->OnMessageReceived(*received_message);
}
}

template<typename T>
std::string SubscriberTopicStatistics<T>::GetStatusString() const
{
std::stringstream ss;
ss << "name=" << get_name() <<
", publishing_topic=" << (publisher_ ? publisher_->get_topic_name() : "") <<
", publish_period=" << std::to_string(publish_period_.count()) + "ms" <<
", " << Collector::GetStatusString();
return ss.str();
}

template<typename T>
void SubscriberTopicStatistics<T>::PublishStatisticMessage()
{
assert(publisher_ != nullptr);
assert(publisher_->is_activated());

const auto msg = system_metrics_collector::GenerateStatisticMessage(
get_name(),
GetMetricName(),
GetMetricUnit(),
window_start_,
now(),
GetStatisticsResults());
publisher_->publish(msg);
}

} // namespace topic_statistics_collector
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


#ifndef TOPIC_STATISTICS_COLLECTOR__SUBSCRIBER_TOPIC_STATISTICS_HPP_
#define TOPIC_STATISTICS_COLLECTOR__SUBSCRIBER_TOPIC_STATISTICS_HPP_

#include <chrono>
#include <memory>
#include <string>
#include <vector>

#include "rclcpp_lifecycle/lifecycle_node.hpp"
#include "rclcpp/rclcpp.hpp"

#include "received_message_age.hpp"
#include "received_message_period.hpp"
#include "system_metrics_collector/collector.hpp"
#include "system_metrics_collector/metrics_message_publisher.hpp"

namespace topic_statistics_collector
{

/**
* Class which makes periodic topic statistics measurements, using a ROS2 timer.
*/
template<typename T>
class SubscriberTopicStatistics : public system_metrics_collector::MetricsMessagePublisher,
public rclcpp_lifecycle::LifecycleNode,
public system_metrics_collector::Collector
{
public:
/**
* Constructs a SubscriberTopicStatistics node.
* The following parameter can be set via the rclcpp::NodeOptions:
* `publish_period`: the period at which metrics are published
*
* @param node_name the name of this node, it must be non-empty
* @param node_options the options (arguments, parameters, etc.) for this node
* @param topic_name the name of topic to compute statistics for
*/
SubscriberTopicStatistics(
const std::string & node_name,
const rclcpp::NodeOptions & node_options,
const std::string & topic_name);

virtual ~SubscriberTopicStatistics() = default;

/**
* Returns a pretty printed status representation of this class
*
* @return a string detailing the current status
*/
std::string GetStatusString() const override;

/**
* Starts the node.
*
* @param state input state unused
* @return CallbackReturn success if start returns true, error otherwise
*/
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn on_activate(
const rclcpp_lifecycle::State & state);

/**
* Stops the node.
*
* @param input state unused
* @return CallbackReturn success if start returns true, error otherwise
*/
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn on_deactivate(
const rclcpp_lifecycle::State & state);

/**
* Stops the node and performs cleanup.
*
* @param input state unused
* @return CallbackReturn success
*/
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn on_shutdown(
const rclcpp_lifecycle::State & state);

/**
* Stops the node and attempts to perform cleanup.
*
* @param input state unused
* @return CallbackReturn success
*/
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn on_error(
const rclcpp_lifecycle::State & previous_state);

/**
* Creates ROS2 timers and a publisher for periodically triggering measurements
* and publishing MetricsMessages
*
* @return if setup was successful
*/
bool SetupStart() override;

/**
* Stops the ROS2 timers that were created by SetupStart()
*
* @return if teardown was successful
*/
bool SetupStop() override;

/**
* LifecyclePublisher publisher that is activated on SetupStart and deactivated on SetupStop().
*/
rclcpp_lifecycle::LifecyclePublisher<metrics_statistics_msgs::msg::MetricsMessage>::SharedPtr
publisher_;

private:
/**
* Publishes the statistics derived from the collected measurements (this is to be called via a
* ROS2 timer per the publish_period)
*/
void PublishStatisticMessage() override;

/**
* Callback function to execute when messages on subscribed topics are received
*
* @param msg message received
**/
void CollectorCallback(const typename T::SharedPtr received_message);

/**
* Tracks the starting time of the statistics
*/
rclcpp::Time window_start_;

/**
* The period used to publish measurement data
*/
std::chrono::milliseconds publish_period_{0};

/**
* ROS2 timer used to publish measurement messages
*/
rclcpp::TimerBase::SharedPtr publish_timer_;

/**
* Subscriber to listen to incoming messages on a topic
*/
typename rclcpp::Subscription<T>::SharedPtr subscription_;

/**
* Subscription options to configure the subscriber
*/
rclcpp::SubscriptionOptions subscription_options_;

/**
* Topic name to compute statistics for
*/
const std::string topic_name_;

/**
* ROS2 message age calculator
*/
std::vector<std::unique_ptr<TopicStatisticsCollector<T>>> statistics_collectors_;
};

} // namespace topic_statistics_collector
#endif // TOPIC_STATISTICS_COLLECTOR__SUBSCRIBER_TOPIC_STATISTICS_HPP_

0 comments on commit 4f8520f

Please sign in to comment.