Skip to content
This repository has been archived by the owner on Jun 10, 2021. It is now read-only.

Commit

Permalink
Rebase on changes in interfaces, address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>
  • Loading branch information
Prajakta Gokhale committed Mar 20, 2020
1 parent 4f8520f commit e57a4d1
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 248 deletions.
Expand Up @@ -25,6 +25,11 @@ namespace topic_statistics_constants
constexpr const char kMsgAgeStatName[] = "message_age";
constexpr const char kMsgPeriodStatName[] = "message_period";
constexpr const char kMillisecondUnitName[] = "ms";

constexpr const char kCollectStatsTopicName[] = "collect_topic_name";
constexpr const char kDefaultCollectStatsTopicName[] = "/topic";

constexpr const char kPublishStatsTopicName[] = "publish_topic_name";
} // namespace topic_statistics_constants

} // namespace topic_statistics_collector
Expand Down
Expand Up @@ -16,65 +16,155 @@
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "rcl/time.h"
#include "rclcpp/rclcpp.hpp"
#include "rcpputils/asserts.hpp"

#include "received_message_age.hpp"
#include "received_message_period.hpp"
#include "subscriber_topic_statistics.hpp"
#include "system_metrics_collector/constants.hpp"
#include "system_metrics_collector/metrics_message_publisher.hpp"

using metrics_statistics_msgs::msg::MetricsMessage;
using std::placeholders::_1;

namespace topic_statistics_collector
{
template<typename T>
SubscriberTopicStatistics<T>::SubscriberTopicStatistics(
const std::string & node_name,
const rclcpp::NodeOptions & node_options,
const std::string & topic_name)
: rclcpp_lifecycle::LifecycleNode{node_name, node_options},
topic_name_(topic_name)
static rcl_interfaces::msg::IntegerRange buildPositiveIntegerRange(
int64_t from,
int64_t to,
uint64_t step)
{
rcl_interfaces::msg::IntegerRange positive_range;
positive_range.from_value = 1;
positive_range.to_value =
std::numeric_limits<decltype(rcl_interfaces::msg::IntegerRange::to_value)>::max();
positive_range.step = 1;
positive_range.from_value = from;
positive_range.to_value = to;
positive_range.step = step;

rcl_interfaces::msg::ParameterDescriptor descriptor;
descriptor.read_only = true;
descriptor.integer_range.push_back(positive_range);
return positive_range;
}

static rcl_interfaces::msg::ParameterDescriptor buildPeriodParameterDescriptor(
const std::string & description)
{
const auto positive_range = buildPositiveIntegerRange(
1,
std::numeric_limits<decltype(rcl_interfaces::msg::IntegerRange::to_value)>::max(),
1
);

descriptor.description =
"The period in milliseconds between each published MetricsMessage.";
rcl_interfaces::msg::ParameterDescriptor period_descriptor;
period_descriptor.read_only = true;
period_descriptor.integer_range.push_back(positive_range);
period_descriptor.description = description;

return period_descriptor;
}

static rcl_interfaces::msg::ParameterDescriptor buildTopicParameterDescriptor(
const std::string & description)
{
rcl_interfaces::msg::ParameterDescriptor topic_descriptor;
topic_descriptor.read_only = true;

return topic_descriptor;
}

static void validateStringParam(const std::string & param)
{
if (param.empty()) {
std::stringstream ss;
ss << param << " node paramater cannot be empty";
throw std::invalid_argument{ss.str().c_str()};
}
}

static void initializeClock(rcl_clock_t & clock_)
{
auto allocator = rcl_get_default_allocator();
if (RCL_RET_OK != rcl_clock_init(RCL_STEADY_TIME, &clock_, &allocator)) {
throw std::runtime_error{"Failed to initialize clock"};
}
}

static rcl_time_point_value_t getCurrentTime(rcl_clock_t & clock)
{
rcl_time_point_value_t now_time_point;
if (RCL_RET_OK != rcl_clock_get_now(&clock, &now_time_point)) {
RCUTILS_LOG_ERROR_NAMED("SubscriberTopicStatistics", "Failed to get current time");
return 0;
}
return now_time_point;
}

template<typename T>
static void initializeCollectors(
std::vector<std::shared_ptr<TopicStatisticsCollector<T>>> & container)
{
container.push_back(std::make_shared<ReceivedMessageAgeCollector<T>>());
container.push_back(std::make_shared<ReceivedMessagePeriodCollector<T>>());
}

template<typename T>
SubscriberTopicStatisticsNode<T>::SubscriberTopicStatisticsNode(
const std::string & node_name,
const rclcpp::NodeOptions & node_options)
: rclcpp_lifecycle::LifecycleNode{node_name, node_options}
{
const auto publish_param_desc = buildPeriodParameterDescriptor(
"The period in milliseconds between each published MetricsMessage."
);
auto publish_period = declare_parameter(
system_metrics_collector::collector_node_constants::kPublishPeriodParam,
system_metrics_collector::collector_node_constants::kDefaultPublishPeriod.count(),
descriptor);
publish_param_desc);
publish_period_ = std::chrono::milliseconds{publish_period};

statistics_collectors_.push_back(std::make_unique<ReceivedMessageAgeCollector<T>>());
statistics_collectors_.push_back(std::make_unique<ReceivedMessagePeriodCollector<T>>());
const auto collect_topic_desc = buildTopicParameterDescriptor(
"The topic to subscribe to, for calculating topic statistics.");
collect_topic_name_ = declare_parameter(
topic_statistics_constants::kCollectStatsTopicName,
topic_statistics_constants::kDefaultCollectStatsTopicName,
collect_topic_desc);
validateStringParam(collect_topic_name_);

const auto publish_topic_desc = buildTopicParameterDescriptor(
"The topic to publish topic statistics to.");
publish_topic_name_ = declare_parameter(
topic_statistics_constants::kPublishStatsTopicName,
system_metrics_collector::collector_node_constants::kStatisticsTopicName,
publish_topic_desc);
validateStringParam(publish_topic_name_);

initializeClock(clock_);

auto qos_options_ = rclcpp::QoS(rclcpp::KeepAll());
subscription_ = create_subscription<T>(
topic_name_,
qos_options_,
std::bind(&SubscriberTopicStatistics::CollectorCallback, this, _1),
subscription_options_);
initializeCollectors(statistics_collectors_);

auto callback = [this](typename T::SharedPtr received_message) {
for (const auto & collector : statistics_collectors_) {
collector->OnMessageReceived(
*received_message,
getCurrentTime(clock_));
}
};
subscription_ = create_subscription<T>(collect_topic_name_, 10, callback);
}

template<typename T>
SubscriberTopicStatisticsNode<T>::~SubscriberTopicStatisticsNode()
{
if (RCL_RET_OK != rcl_clock_fini(&clock_)) {
RCUTILS_LOG_WARN("Failed to finalize clock");
}
}

template<typename T>
bool SubscriberTopicStatistics<T>::SetupStart()
void SubscriberTopicStatisticsNode<T>::StartPublisher()
{
assert(publish_timer_ == nullptr);
rcpputils::check_true(publish_timer_ == nullptr);

if (publisher_ == nullptr) {
publisher_ = create_publisher<MetricsMessage>(
system_metrics_collector::collector_node_constants::kStatisticsTopicName,
10 /*history_depth*/);
publisher_ = create_publisher<MetricsMessage>(publish_topic_name_, 10);
}

publisher_->on_activate();
Expand All @@ -83,102 +173,99 @@ bool SubscriberTopicStatistics<T>::SetupStart()
publish_timer_ = this->create_wall_timer(
publish_period_, [this]() {
this->PublishStatisticMessage();
this->ClearCurrentMeasurements();
this->ClearCollectorMeasurements();
this->window_start_ = this->now();
});

window_start_ = now();

return true;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_activate(const rclcpp_lifecycle::State &)
SubscriberTopicStatisticsNode<T>::on_activate(const rclcpp_lifecycle::State &)
{
RCLCPP_DEBUG(this->get_logger(), "on_activate");
const auto ret = Start();
return ret ? CallbackReturn::SUCCESS : CallbackReturn::ERROR;
for (const auto & collector : statistics_collectors_) {
if (!collector->Start()) {
return CallbackReturn::ERROR;
}
}

StartPublisher();
return CallbackReturn::SUCCESS;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_deactivate(const rclcpp_lifecycle::State & state)
SubscriberTopicStatisticsNode<T>::on_deactivate(const rclcpp_lifecycle::State & state)
{
RCLCPP_DEBUG(this->get_logger(), "on_deactivate");
const auto ret = Stop();
return ret ? CallbackReturn::SUCCESS : CallbackReturn::ERROR;
for (const auto & collector : statistics_collectors_) {
if (!collector->Stop()) {
return CallbackReturn::ERROR;
}
}

StopPublisher();
return CallbackReturn::SUCCESS;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_shutdown(const rclcpp_lifecycle::State & state)
SubscriberTopicStatisticsNode<T>::on_shutdown(const rclcpp_lifecycle::State & state)
{
RCLCPP_DEBUG(this->get_logger(), "on_shutdown");
Stop();
publisher_.reset();

return CallbackReturn::SUCCESS;
}

template<typename T>
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn
SubscriberTopicStatistics<T>::on_error(const rclcpp_lifecycle::State & state)
SubscriberTopicStatisticsNode<T>::on_error(const rclcpp_lifecycle::State & state)
{
RCLCPP_DEBUG(this->get_logger(), "on_error");
Stop();
if (publisher_) {
if (publisher_ != nullptr) {
publisher_.reset();
}
return CallbackReturn::SUCCESS;
}

template<typename T>
bool SubscriberTopicStatistics<T>::SetupStop()
void SubscriberTopicStatisticsNode<T>::ClearCollectorMeasurements()
{
for (const auto & collector : statistics_collectors_) {
collector->ClearCurrentMeasurements();
}
}

template<typename T>
void SubscriberTopicStatisticsNode<T>::StopPublisher()
{
assert(publish_timer_ != nullptr);
assert(publisher_ != nullptr);
rcpputils::check_true(publish_timer_ != nullptr);
rcpputils::check_true(publisher_ != nullptr);

publisher_->on_deactivate();

publish_timer_->cancel();
publish_timer_.reset();

return true;
}

template<typename T>
void SubscriberTopicStatistics<T>::CollectorCallback(const typename T::SharedPtr received_message)
void SubscriberTopicStatisticsNode<T>::PublishStatisticMessage()
{
rcpputils::check_true(publisher_ != nullptr);
rcpputils::check_true(publisher_->is_activated());

for (const auto & collector : statistics_collectors_) {
collector->OnMessageReceived(*received_message);
const auto msg = system_metrics_collector::GenerateStatisticMessage(
get_name(),
collector->GetMetricName(),
collector->GetMetricUnit(),
window_start_,
now(),
collector->GetStatisticsResults());
publisher_->publish(msg);
}
}

template<typename T>
std::string SubscriberTopicStatistics<T>::GetStatusString() const
{
std::stringstream ss;
ss << "name=" << get_name() <<
", publishing_topic=" << (publisher_ ? publisher_->get_topic_name() : "") <<
", publish_period=" << std::to_string(publish_period_.count()) + "ms" <<
", " << Collector::GetStatusString();
return ss.str();
}

template<typename T>
void SubscriberTopicStatistics<T>::PublishStatisticMessage()
{
assert(publisher_ != nullptr);
assert(publisher_->is_activated());

const auto msg = system_metrics_collector::GenerateStatisticMessage(
get_name(),
GetMetricName(),
GetMetricUnit(),
window_start_,
now(),
GetStatisticsResults());
publisher_->publish(msg);
}

} // namespace topic_statistics_collector

0 comments on commit e57a4d1

Please sign in to comment.