Skip to content

Commit

Permalink
Support throttling serialized graph publisher (#204)
Browse files Browse the repository at this point in the history
* Change sensor proc from gtest to gmock target
* Move ThrottledCallback to fuse_core
* Support generic callbacks in ThrottledCallback
* Throttle graph publishing
* Overload getPositiveParam for ros::Duration
* Use getPositiveParam for ros::Duration parameters
  • Loading branch information
efernandez committed Nov 30, 2020
1 parent 8c3e3c7 commit 51feecf
Show file tree
Hide file tree
Showing 21 changed files with 145 additions and 94 deletions.
16 changes: 16 additions & 0 deletions fuse_core/CMakeLists.txt
Expand Up @@ -427,6 +427,22 @@ if(CATKIN_ENABLE_TESTING)
CXX_STANDARD_REQUIRED YES
)

# Throttle callback test
add_rostest_gtest(
test_throttled_callback
test/throttled_callback.test
test/test_throttled_callback.cpp
)
target_link_libraries(test_throttled_callback
${PROJECT_NAME}
${catkin_LIBRARIES}
)
set_target_properties(test_throttled_callback
PROPERTIES
CXX_STANDARD 14
CXX_STANDARD_REQUIRED YES
)

# Util tests
catkin_add_gtest(test_util
test/test_util.cpp
Expand Down
17 changes: 17 additions & 0 deletions fuse_core/include/fuse_core/parameter.h
Expand Up @@ -92,6 +92,23 @@ void getPositiveParam(const ros::NodeHandle& node_handle, const std::string& par
}
}

/**
* @brief Helper function that loads positive duration values from the parameter server
*
* @param[in] node_handle - The node handle used to load the parameter
* @param[in] parameter_name - The parameter name to load
* @param[in, out] default_value - A default value to use if the provided parameter name does not exist. As output it
* has the loaded (or default) value
* @param[in] strict - Whether to check the loaded value is strictly positive or not, i.e. whether 0 is accepted or not
*/
inline void getPositiveParam(const ros::NodeHandle& node_handle, const std::string& parameter_name,
ros::Duration& default_value, const bool strict = true)
{
double default_value_sec = default_value.toSec();
getPositiveParam(node_handle, parameter_name, default_value_sec, strict);
default_value.fromSec(default_value_sec);
}

/**
* @brief Helper function that loads a covariance matrix diagonal vector from the parameter server and checks the size
* and the values are invalid, i.e. they are positive.
Expand Down
Expand Up @@ -31,40 +31,34 @@
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef FUSE_MODELS_COMMON_THROTTLED_CALLBACK_H
#define FUSE_MODELS_COMMON_THROTTLED_CALLBACK_H
#ifndef FUSE_CORE_THROTTLED_CALLBACK_H
#define FUSE_CORE_THROTTLED_CALLBACK_H

#include <ros/subscriber.h>

#include <functional>
#include <utility>


namespace fuse_models
{

namespace common
namespace fuse_core
{

/**
* @brief A throttled callback that encapsulates the logic to throttle a message callback so it is only called after a
* given period in seconds (or more). The dropped messages can optionally be received in a dropped callback, that could
* be used to count the number of messages dropped.
* @brief A throttled callback that encapsulates the logic to throttle a callback so it is only called after a given
* period in seconds (or more). The dropped calls can optionally be received in a dropped callback, that could be used
* to count the number of calls dropped.
*
* @tparam M The message the callback receives
* @tparam Callback The std::function callback
*/
template <class M>
template <class Callback>
class ThrottledCallback
{
public:
using MessageConstPtr = typename M::ConstPtr;
using Callback = std::function<void(const MessageConstPtr&)>;

/**
* @brief Constructor
*
* @param[in] keep_callback The callback to call when the message is kept, i.e. not dropped. Defaults to nullptr
* @param[in] drop_callback The callback to call when the message is dropped because of the throttling. Defaults to
* nullptr
* @param[in] keep_callback The callback to call when kept, i.e. not dropped. Defaults to nullptr
* @param[in] drop_callback The callback to call when dropped because of the throttling. Defaults to nullptr
* @param[in] throttle_period The throttling period duration in seconds. Defaults to 0.0, i.e. no throttling
* @param[in] use_wall_time Whether to use ros::WallTime or not. Defaults to false
*/
Expand Down Expand Up @@ -149,12 +143,13 @@ class ThrottledCallback
}

/**
* @brief Message callback that throttles the calls to the keep callback provided. When the message is dropped because
* @brief Callback that throttles the calls to the keep callback provided. When dropped because
* of throttling, the drop callback is called instead.
*
* @param[in] message The input message
* @param[in] args The input arguments
*/
void callback(const MessageConstPtr& message)
template <class... Args>
void callback(Args&&... args)
{
// Keep the callback if:
//
Expand All @@ -166,7 +161,7 @@ class ThrottledCallback
{
if (keep_callback_)
{
keep_callback_(message);
keep_callback_(std::forward<Args>(args)...);
}

if (last_called_time_.isZero())
Expand All @@ -180,21 +175,38 @@ class ThrottledCallback
}
else if (drop_callback_)
{
drop_callback_(message);
drop_callback_(std::forward<Args>(args)...);
}
}

/**
* @brief Operator() that simply calls the callback() method forwarding the input arguments
*
* @param[in] args The input arguments
*/
template <class... Args>
void operator()(Args&&... args)
{
callback(std::forward<Args>(args)...);
}

private:
Callback keep_callback_; //!< The callback to call when the message is kept, i.e. not dropped
Callback drop_callback_; //!< The callback to call when the message is dropped because of throttling
Callback keep_callback_; //!< The callback to call when kept, i.e. not dropped
Callback drop_callback_; //!< The callback to call when dropped because of throttling
ros::Duration throttle_period_; //!< The throttling period duration in seconds
bool use_wall_time_; //<! The flag to indicate whether to use ros::WallTime or not

ros::Time last_called_time_; //!< The last time the keep callback was called
};

} // namespace common
/**
* @brief Throttled callback for ROS messages
*
* @tparam M The ROS message type, which should have the M::ConstPtr nested type
*/
template <class M>
using ThrottledMessageCallback = ThrottledCallback<std::function<void(const typename M::ConstPtr&)>>;

} // namespace fuse_models
} // namespace fuse_core

#endif // FUSE_MODELS_COMMON_THROTTLED_CALLBACK_H
#endif // FUSE_CORE_THROTTLED_CALLBACK_H
Expand Up @@ -31,7 +31,7 @@
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <fuse_models/common/throttled_callback.h>
#include <fuse_core/throttled_callback.h>
#include <geometry_msgs/Point.h>
#include <ros/ros.h>

Expand Down Expand Up @@ -94,8 +94,8 @@ class PointPublisher
};

/**
* @brief A dummy point sensor model that uses a fuse_models::common::ThrottledCallback<geometry_msgs::Point> with a
* keep and drop callback.
* @brief A dummy point sensor model that uses a fuse_core::ThrottledMessageCallback<geometry_msgs::Point> with a keep
* and drop callback.
*
* The callbacks simply count the number of times they are called, for testing purposes. The keep callback also caches
* the last message received, also for testing purposes.
Expand All @@ -112,7 +112,8 @@ class PointSensorModel
: throttled_callback_(std::bind(&PointSensorModel::keepCallback, this, std::placeholders::_1),
std::bind(&PointSensorModel::dropCallback, this, std::placeholders::_1), throttle_period)
{
subscriber_ = node_handle_.subscribe("point", 10, &PointThrottledCallback::callback, &throttled_callback_);
subscriber_ = node_handle_.subscribe<geometry_msgs::Point>(
"point", 10, &PointThrottledCallback::callback, &throttled_callback_);
}

/**
Expand Down Expand Up @@ -170,7 +171,7 @@ class PointSensorModel
ros::NodeHandle node_handle_; //!< The node handle
ros::Subscriber subscriber_; //!< The subscriber

using PointThrottledCallback = fuse_models::common::ThrottledCallback<geometry_msgs::Point>;
using PointThrottledCallback = fuse_core::ThrottledMessageCallback<geometry_msgs::Point>;
PointThrottledCallback throttled_callback_; //!< The throttled callback

size_t kept_messages_{ 0 }; //!< Messages kept
Expand Down
4 changes: 4 additions & 0 deletions fuse_core/test/throttled_callback.test
@@ -0,0 +1,4 @@
<?xml version="1.0"?>
<launch>
<test test-name="throttled_callback_test" pkg="fuse_core" type="test_throttled_callback" />
</launch>
17 changes: 1 addition & 16 deletions fuse_models/CMakeLists.txt
Expand Up @@ -244,7 +244,7 @@ if(CATKIN_ENABLE_TESTING)
)

# Other tests
catkin_add_gtest(
catkin_add_gmock(
test_sensor_proc
test/test_sensor_proc.cpp
)
Expand All @@ -258,21 +258,6 @@ if(CATKIN_ENABLE_TESTING)
CXX_STANDARD_REQUIRED YES
)

add_rostest_gtest(
test_throttled_callback
test/throttled_callback.test
test/test_throttled_callback.cpp
)
target_link_libraries(test_throttled_callback
${PROJECT_NAME}
${catkin_LIBRARIES}
)
set_target_properties(test_throttled_callback
PROPERTIES
CXX_STANDARD 14
CXX_STANDARD_REQUIRED YES
)

# Benchmarks
find_package(benchmark QUIET)

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/include/fuse_models/acceleration_2d.h
Expand Up @@ -35,9 +35,9 @@
#define FUSE_MODELS_ACCELERATION_2D_H

#include <fuse_models/parameters/acceleration_2d_params.h>
#include <fuse_models/common/throttled_callback.h>

#include <fuse_core/async_sensor_model.h>
#include <fuse_core/throttled_callback.h>
#include <fuse_core/uuid.h>

#include <geometry_msgs/AccelWithCovarianceStamped.h>
Expand Down Expand Up @@ -117,7 +117,7 @@ class Acceleration2D : public fuse_core::AsyncSensorModel

ros::Subscriber subscriber_;

using AccelerationThrottledCallback = common::ThrottledCallback<geometry_msgs::AccelWithCovarianceStamped>;
using AccelerationThrottledCallback = fuse_core::ThrottledMessageCallback<geometry_msgs::AccelWithCovarianceStamped>;
AccelerationThrottledCallback throttled_callback_;
};

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/include/fuse_models/imu_2d.h
Expand Up @@ -35,7 +35,7 @@
#define FUSE_MODELS_IMU_2D_H

#include <fuse_models/parameters/imu_2d_params.h>
#include <fuse_models/common/throttled_callback.h>
#include <fuse_core/throttled_callback.h>

#include <fuse_core/async_sensor_model.h>
#include <fuse_core/uuid.h>
Expand Down Expand Up @@ -137,7 +137,7 @@ class Imu2D : public fuse_core::AsyncSensorModel

ros::Subscriber subscriber_;

using ImuThrottledCallback = common::ThrottledCallback<sensor_msgs::Imu>;
using ImuThrottledCallback = fuse_core::ThrottledMessageCallback<sensor_msgs::Imu>;
ImuThrottledCallback throttled_callback_;
};

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/include/fuse_models/odometry_2d.h
Expand Up @@ -35,7 +35,7 @@
#define FUSE_MODELS_ODOMETRY_2D_H

#include <fuse_models/parameters/odometry_2d_params.h>
#include <fuse_models/common/throttled_callback.h>
#include <fuse_core/throttled_callback.h>

#include <fuse_core/async_sensor_model.h>
#include <fuse_core/uuid.h>
Expand Down Expand Up @@ -133,7 +133,7 @@ class Odometry2D : public fuse_core::AsyncSensorModel

ros::Subscriber subscriber_;

using OdometryThrottledCallback = common::ThrottledCallback<nav_msgs::Odometry>;
using OdometryThrottledCallback = fuse_core::ThrottledMessageCallback<nav_msgs::Odometry>;
OdometryThrottledCallback throttled_callback_;
};

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/include/fuse_models/pose_2d.h
Expand Up @@ -35,9 +35,9 @@
#define FUSE_MODELS_POSE_2D_H

#include <fuse_models/parameters/pose_2d_params.h>
#include <fuse_models/common/throttled_callback.h>

#include <fuse_core/async_sensor_model.h>
#include <fuse_core/throttled_callback.h>
#include <fuse_core/uuid.h>

#include <geometry_msgs/PoseWithCovarianceStamped.h>
Expand Down Expand Up @@ -122,7 +122,7 @@ class Pose2D : public fuse_core::AsyncSensorModel

ros::Subscriber subscriber_;

using PoseThrottledCallback = common::ThrottledCallback<geometry_msgs::PoseWithCovarianceStamped>;
using PoseThrottledCallback = fuse_core::ThrottledMessageCallback<geometry_msgs::PoseWithCovarianceStamped>;
PoseThrottledCallback throttled_callback_;
};

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/include/fuse_models/twist_2d.h
Expand Up @@ -35,7 +35,7 @@
#define FUSE_MODELS_TWIST_2D_H

#include <fuse_models/parameters/twist_2d_params.h>
#include <fuse_models/common/throttled_callback.h>
#include <fuse_core/throttled_callback.h>

#include <fuse_core/async_sensor_model.h>
#include <fuse_core/uuid.h>
Expand Down Expand Up @@ -111,7 +111,7 @@ class Twist2D : public fuse_core::AsyncSensorModel

ros::Subscriber subscriber_;

using TwistThrottledCallback = common::ThrottledCallback<geometry_msgs::TwistWithCovarianceStamped>;
using TwistThrottledCallback = fuse_core::ThrottledMessageCallback<geometry_msgs::TwistWithCovarianceStamped>;
TwistThrottledCallback throttled_callback_;
};

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/src/acceleration_2d.cpp
Expand Up @@ -77,8 +77,8 @@ void Acceleration2D::onStart()
{
if (!params_.indices.empty())
{
subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size,
&AccelerationThrottledCallback::callback, &throttled_callback_);
subscriber_ = node_handle_.subscribe<geometry_msgs::AccelWithCovarianceStamped>(ros::names::resolve(params_.topic),
params_.queue_size, &AccelerationThrottledCallback::callback, &throttled_callback_);
}
}

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/src/imu_2d.cpp
Expand Up @@ -88,8 +88,8 @@ void Imu2D::onStart()
!params_.angular_velocity_indices.empty())
{
previous_pose_.reset();
subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size,
&ImuThrottledCallback::callback, &throttled_callback_);
subscriber_ = node_handle_.subscribe<sensor_msgs::Imu>(ros::names::resolve(params_.topic), params_.queue_size,
&ImuThrottledCallback::callback, &throttled_callback_);
}
}

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/src/odometry_2d.cpp
Expand Up @@ -89,8 +89,8 @@ void Odometry2D::onStart()
!params_.angular_velocity_indices.empty())
{
previous_pose_.reset();
subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size,
&OdometryThrottledCallback::callback, &throttled_callback_);
subscriber_ = node_handle_.subscribe<nav_msgs::Odometry>(ros::names::resolve(params_.topic), params_.queue_size,
&OdometryThrottledCallback::callback, &throttled_callback_);
}
}

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/src/pose_2d.cpp
Expand Up @@ -79,8 +79,8 @@ void Pose2D::onStart()
if (!params_.position_indices.empty() ||
!params_.orientation_indices.empty())
{
subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size,
&PoseThrottledCallback::callback, &throttled_callback_);
subscriber_ = node_handle_.subscribe<geometry_msgs::PoseWithCovarianceStamped>(
ros::names::resolve(params_.topic), params_.queue_size, &PoseThrottledCallback::callback, &throttled_callback_);
}
}

Expand Down
4 changes: 2 additions & 2 deletions fuse_models/src/twist_2d.cpp
Expand Up @@ -79,8 +79,8 @@ void Twist2D::onStart()
if (!params_.linear_indices.empty() ||
!params_.angular_indices.empty())
{
subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size,
&TwistThrottledCallback::callback, &throttled_callback_);
subscriber_ = node_handle_.subscribe<geometry_msgs::TwistWithCovarianceStamped>(ros::names::resolve(params_.topic),
params_.queue_size, &TwistThrottledCallback::callback, &throttled_callback_);
}
}

Expand Down
4 changes: 0 additions & 4 deletions fuse_models/test/throttled_callback.test

This file was deleted.

0 comments on commit 51feecf

Please sign in to comment.