Skip to content
This repository has been archived by the owner on Jun 10, 2021. It is now read-only.

Add periodic API #6

Merged
merged 4 commits into from Nov 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions metrics_statistics_msgs/CMakeLists.txt
Expand Up @@ -29,16 +29,16 @@ find_package(builtin_interfaces REQUIRED)
find_package(rosidl_default_generators REQUIRED)

set(msg_files
"msg/MetricsMessage.msg"
"msg/StatisticDataPoint.msg"
"msg/StatisticDataType.msg"
)
msg/MetricsMessage.msg
msg/StatisticDataPoint.msg
msg/StatisticDataType.msg
)

## Generate added messages and services with any dependencies listed here
rosidl_generate_interfaces(${PROJECT_NAME}
${msg_files}
DEPENDENCIES builtin_interfaces std_msgs
)
${msg_files}
DEPENDENCIES builtin_interfaces std_msgs
)

ament_export_dependencies(rosidl_default_runtime)

Expand Down
30 changes: 21 additions & 9 deletions system_metrics_collector/CMakeLists.txt
Expand Up @@ -28,12 +28,14 @@ find_package(rclcpp REQUIRED)
find_package(rcutils REQUIRED)
find_package(metrics_statistics_msgs REQUIRED)

set(SOURCES
src/moving_average_statistics/moving_average.hpp
add_library(system_metrics_collector SHARED
src/moving_average_statistics/moving_average.cpp
src/moving_average_statistics/types.hpp
src/system_metrics_collector/collector.hpp
src/system_metrics_collector/collector.cpp)
src/system_metrics_collector/collector.cpp
src/system_metrics_collector/periodic_measurement_node.cpp)

ament_target_dependencies(system_metrics_collector rclcpp rcpputils)
ament_export_libraries(system_metrics_collector)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand All @@ -42,12 +44,19 @@ if(BUILD_TESTING)
ament_lint_auto_find_test_dependencies()

ament_add_gtest(test_moving_average_statistics
test/moving_average_statistics/test_moving_average_statistics.cpp
${SOURCES})
test/moving_average_statistics/test_moving_average_statistics.cpp)
target_link_libraries(test_moving_average_statistics system_metrics_collector)
ament_target_dependencies(test_moving_average_statistics rclcpp)

ament_add_gtest(test_collector
test/system_metrics_collector/test_collector.cpp
${SOURCES})
test/system_metrics_collector/test_collector.cpp)
target_link_libraries(test_collector system_metrics_collector)
ament_target_dependencies(test_collector rclcpp)

ament_add_gtest(test_periodic_measurement_node
test/system_metrics_collector/test_periodic_measurement_node.cpp)
target_link_libraries(test_periodic_measurement_node system_metrics_collector)
ament_target_dependencies(test_periodic_measurement_node rclcpp)

install(TARGETS
test_moving_average_statistics
Expand All @@ -57,7 +66,10 @@ if(BUILD_TESTING)
test_collector
DESTINATION
lib/${PROJECT_NAME})

install(TARGETS
test_periodic_measurement_node
DESTINATION
lib/${PROJECT_NAME})
endif()

ament_package()
5 changes: 5 additions & 0 deletions system_metrics_collector/package.xml
Expand Up @@ -11,14 +11,19 @@

<build_depend>rclcpp</build_depend>
<build_depend>rcutils</build_depend>
<build_depend>rcpputils</build_depend>
<build_depend>metrics_statistics_msgs</build_depend>

<exec_depend>rclcpp</exec_depend>
<exec_depend>rcpputils</exec_depend>
<exec_depend>rcutils</exec_depend>
<exec_depend>metrics_statistics_msgs</exec_depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>rclcpp</test_depend>
<test_depend>rcpputils</test_depend>
<test_depend>rcutils</test_depend>

<export>
<build_type>ament_cmake</build_type>
Expand Down
Expand Up @@ -25,6 +25,7 @@

#include "types.hpp"

#include "rcpputils/thread_safety_annotations.hpp"

/**
* A class for calculating moving average statistics. This operates in constant memory and constant time. Note:
Expand All @@ -49,21 +50,21 @@ class MovingAverageStatistics
*
* @return The arithmetic mean of all data recorded, or NaN if the sample count is 0.
**/
double average() const;
double average() const RCPPUTILS_TSA_REQUIRES(mutex);

/**
* Returns the maximum value recorded. If size of list is zero, returns NaN.
*
* @return The maximum value recorded, or NaN if size of data is zero.
**/
double max() const;
double max() const RCPPUTILS_TSA_REQUIRES(mutex);

/**
* Returns the minimum value recorded. If size of list is zero, returns NaN.
*
* @return The minimum value recorded, or NaN if size of data is zero.
**/
double min() const;
double min() const RCPPUTILS_TSA_REQUIRES(mutex);

/**
* Returns the standard deviation (population) of all data recorded. If size of list is zero, returns NaN.
Expand All @@ -73,7 +74,7 @@ class MovingAverageStatistics
*
* @return The standard deviation (population) of all data recorded, or NaN if size of data is zero.
**/
double standardDeviation() const;
double standardDeviation() const RCPPUTILS_TSA_REQUIRES(mutex);

/**
* Return a StatisticData object, containing average, minimum, maximum, standard deviation (population),
Expand Down Expand Up @@ -106,12 +107,11 @@ class MovingAverageStatistics

private:
mutable std::mutex mutex;
// cached values below
double average_ = 0;
double min_ = std::numeric_limits<double>::max();
double max_ = std::numeric_limits<double>::min();
double sum_of_square_diff_from_mean_ = 0;
uint64_t count_ = 0;
double average_ = 0 RCPPUTILS_TSA_GUARDED_BY(mutex);
double min_ = std::numeric_limits<double>::max() RCPPUTILS_TSA_GUARDED_BY(mutex);
double max_ = std::numeric_limits<double>::min() RCPPUTILS_TSA_GUARDED_BY(mutex);
double sum_of_square_diff_from_mean_ = 0 RCPPUTILS_TSA_GUARDED_BY(mutex);
uint64_t count_ = 0 RCPPUTILS_TSA_GUARDED_BY(mutex);
};

#endif // MOVING_AVERAGE_STATISTICS__MOVING_AVERAGE_HPP_
Expand Up @@ -12,14 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "collector.hpp"

#include <mutex>
#include <sstream>
#include <string>

#include "../../src/moving_average_statistics/moving_average.hpp"
#include "../../src/moving_average_statistics/types.hpp"

#include "collector.hpp"
bool Collector::start()
{
std::unique_lock<std::mutex> ulock(mutex);
if (started_) {
return false;
}
started_ = true;
return setupStart();
}

bool Collector::stop()
{
bool ret = false;
{
std::unique_lock<std::mutex> ulock(mutex);
if (!started_) {
return false;
}
started_ = false;

ret = setupStop();
}
clearCurrentMeasurements();
return ret;
}

void Collector::acceptData(double measurement)
void Collector::acceptData(const double measurement)
{
collected_data_.addMeasurement(measurement);
}
Expand All @@ -33,3 +61,17 @@ void Collector::clearCurrentMeasurements()
{
collected_data_.reset();
}

bool Collector::isStarted() const
{
std::unique_lock<std::mutex> ulock(mutex);
return started_;
}

std::string Collector::getStatusString() const
{
std::stringstream ss;
ss << "started=" << (isStarted() ? "true" : "false") <<
", " << statisticsDataToString(getStatisticsResults());
return ss.str();
}
Expand Up @@ -15,11 +15,16 @@
#ifndef SYSTEM_METRICS_COLLECTOR__COLLECTOR_HPP_
#define SYSTEM_METRICS_COLLECTOR__COLLECTOR_HPP_

#include <mutex>
#include <string>

#include "../moving_average_statistics/moving_average.hpp"
#include "../moving_average_statistics/types.hpp"

#include "rcpputils/thread_safety_annotations.hpp"

/**
* Simple wrapping class in order to collect observed data and generate statistics for the given observations.
* Simple class in order to collect observed data and generate statistics for the given observations.
*/
class Collector
{
Expand All @@ -28,26 +33,30 @@ class Collector
virtual ~Collector() = default;

/**
* Start collecting data. Meant to be called after construction.
* Start collecting data. Meant to be called after construction. Note: this locks the recursive mutex class
* member 'mutex'.
*
* @return true if started, false if an error occured
* @return true if started, false if an error occurred
*/
virtual bool start() = 0;
bool start();

/**
* Stop collecting data. Meant to be a teardown method (before destruction, but should place the
* class in a restartable state, i.e., start can be called to be able to resume collection.
*
* This calls clearCurrentMeasurements.
*
* @return true if stopped, false if an error occurred
*/
virtual bool stop() = 0;
bool stop();

/**
* Add an observed measurement.
* Add an observed measurement. This aggregates the measurement and calculates statistics
* via the moving_average class.
*
* @param the measurement observed
*/
virtual void acceptData(double measurement);
virtual void acceptData(const double measurement);

/**
* Return the statistics for all of the observed data.
Expand All @@ -61,9 +70,42 @@ class Collector
*/
virtual void clearCurrentMeasurements();

/**
* Return true is start has been called, false otherwise.
*
* @return the started state of this collector
*/
bool isStarted() const;

/**
* Return a pretty printed status representation of this class
*
* @return a string detailing the current status
*/
virtual std::string getStatusString() const;

// todo @dabonnie uptime (once start has been called)

private:
/**
* Override in order to perform necessary starting steps.
*
* @return true if setup was successful, false otherwise.
*/
virtual bool setupStart() = 0 RCPPUTILS_TSA_REQUIRES(mutex);

/**
* Override in order to perform necessary teardown.
*
* @return true if teardown was successful, false otherwise.
*/
virtual bool setupStop() = 0 RCPPUTILS_TSA_REQUIRES(mutex);

mutable std::mutex mutex;

MovingAverageStatistics collected_data_;
bool started_{false};

bool started_{false} RCPPUTILS_TSA_GUARDED_BY(mutex);
};


Expand Down
@@ -0,0 +1,66 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


#include "periodic_measurement_node.hpp"

#include <chrono>
#include <string>

#include "rclcpp/rclcpp.hpp"


PeriodicMeasurementNode::PeriodicMeasurementNode(
const std::string & name,
const std::chrono::milliseconds measurement_period,
const std::string & publishing_topic)
: Node(name),
measurement_period_(measurement_period),
publishing_topic_(publishing_topic),
measurement_timer_(nullptr)
{}

bool PeriodicMeasurementNode::setupStart()
{
if (!measurement_timer_) {
RCLCPP_DEBUG(this->get_logger(), "creating timer");

measurement_timer_ = this->create_wall_timer(
measurement_period_,
std::bind(&PeriodicMeasurementNode::periodicMeasurement, this));

} else {
RCLCPP_WARN(this->get_logger(), "setupStart: measurement_timer_ already exists!");
}
return true;
}

bool PeriodicMeasurementNode::setupStop()
{
if (measurement_timer_) {
measurement_timer_->cancel();
measurement_timer_.reset();
}
return true;
}

std::string PeriodicMeasurementNode::getStatusString() const
{
std::stringstream ss;
ss << "name=" << get_name() <<
", measurement_period=" << std::to_string(measurement_period_.count()) << "ms" <<
", publishing_topic=" << publishing_topic_ <<
", " << Collector::getStatusString();
return ss.str();
}