diff --git a/fuse_core/CMakeLists.txt b/fuse_core/CMakeLists.txt index a8a1c2e33..51e33adc5 100644 --- a/fuse_core/CMakeLists.txt +++ b/fuse_core/CMakeLists.txt @@ -427,6 +427,22 @@ if(CATKIN_ENABLE_TESTING) CXX_STANDARD_REQUIRED YES ) + # Throttle callback test + add_rostest_gtest( + test_throttled_callback + test/throttled_callback.test + test/test_throttled_callback.cpp + ) + target_link_libraries(test_throttled_callback + ${PROJECT_NAME} + ${catkin_LIBRARIES} + ) + set_target_properties(test_throttled_callback + PROPERTIES + CXX_STANDARD 14 + CXX_STANDARD_REQUIRED YES + ) + # Util tests catkin_add_gtest(test_util test/test_util.cpp diff --git a/fuse_core/include/fuse_core/parameter.h b/fuse_core/include/fuse_core/parameter.h index 3ecd02c97..dd8d6f9b9 100644 --- a/fuse_core/include/fuse_core/parameter.h +++ b/fuse_core/include/fuse_core/parameter.h @@ -92,6 +92,23 @@ void getPositiveParam(const ros::NodeHandle& node_handle, const std::string& par } } +/** + * @brief Helper function that loads positive duration values from the parameter server + * + * @param[in] node_handle - The node handle used to load the parameter + * @param[in] parameter_name - The parameter name to load + * @param[in, out] default_value - A default value to use if the provided parameter name does not exist. As output it + * has the loaded (or default) value + * @param[in] strict - Whether to check the loaded value is strictly positive or not, i.e. whether 0 is accepted or not + */ +inline void getPositiveParam(const ros::NodeHandle& node_handle, const std::string& parameter_name, + ros::Duration& default_value, const bool strict = true) +{ + double default_value_sec = default_value.toSec(); + getPositiveParam(node_handle, parameter_name, default_value_sec, strict); + default_value.fromSec(default_value_sec); +} + /** * @brief Helper function that loads a covariance matrix diagonal vector from the parameter server and checks the size * and the values are invalid, i.e. they are positive. diff --git a/fuse_models/include/fuse_models/common/throttled_callback.h b/fuse_core/include/fuse_core/throttled_callback.h similarity index 77% rename from fuse_models/include/fuse_models/common/throttled_callback.h rename to fuse_core/include/fuse_core/throttled_callback.h index 34a6b15f9..4cccad7f3 100644 --- a/fuse_models/include/fuse_models/common/throttled_callback.h +++ b/fuse_core/include/fuse_core/throttled_callback.h @@ -31,40 +31,34 @@ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ -#ifndef FUSE_MODELS_COMMON_THROTTLED_CALLBACK_H -#define FUSE_MODELS_COMMON_THROTTLED_CALLBACK_H +#ifndef FUSE_CORE_THROTTLED_CALLBACK_H +#define FUSE_CORE_THROTTLED_CALLBACK_H #include #include +#include -namespace fuse_models -{ - -namespace common +namespace fuse_core { /** - * @brief A throttled callback that encapsulates the logic to throttle a message callback so it is only called after a - * given period in seconds (or more). The dropped messages can optionally be received in a dropped callback, that could - * be used to count the number of messages dropped. + * @brief A throttled callback that encapsulates the logic to throttle a callback so it is only called after a given + * period in seconds (or more). The dropped calls can optionally be received in a dropped callback, that could be used + * to count the number of calls dropped. * - * @tparam M The message the callback receives + * @tparam Callback The std::function callback */ -template +template class ThrottledCallback { public: - using MessageConstPtr = typename M::ConstPtr; - using Callback = std::function; - /** * @brief Constructor * - * @param[in] keep_callback The callback to call when the message is kept, i.e. not dropped. Defaults to nullptr - * @param[in] drop_callback The callback to call when the message is dropped because of the throttling. Defaults to - * nullptr + * @param[in] keep_callback The callback to call when kept, i.e. not dropped. Defaults to nullptr + * @param[in] drop_callback The callback to call when dropped because of the throttling. Defaults to nullptr * @param[in] throttle_period The throttling period duration in seconds. Defaults to 0.0, i.e. no throttling * @param[in] use_wall_time Whether to use ros::WallTime or not. Defaults to false */ @@ -149,12 +143,13 @@ class ThrottledCallback } /** - * @brief Message callback that throttles the calls to the keep callback provided. When the message is dropped because + * @brief Callback that throttles the calls to the keep callback provided. When dropped because * of throttling, the drop callback is called instead. * - * @param[in] message The input message + * @param[in] args The input arguments */ - void callback(const MessageConstPtr& message) + template + void callback(Args&&... args) { // Keep the callback if: // @@ -166,7 +161,7 @@ class ThrottledCallback { if (keep_callback_) { - keep_callback_(message); + keep_callback_(std::forward(args)...); } if (last_called_time_.isZero()) @@ -180,21 +175,38 @@ class ThrottledCallback } else if (drop_callback_) { - drop_callback_(message); + drop_callback_(std::forward(args)...); } } + /** + * @brief Operator() that simply calls the callback() method forwarding the input arguments + * + * @param[in] args The input arguments + */ + template + void operator()(Args&&... args) + { + callback(std::forward(args)...); + } + private: - Callback keep_callback_; //!< The callback to call when the message is kept, i.e. not dropped - Callback drop_callback_; //!< The callback to call when the message is dropped because of throttling + Callback keep_callback_; //!< The callback to call when kept, i.e. not dropped + Callback drop_callback_; //!< The callback to call when dropped because of throttling ros::Duration throttle_period_; //!< The throttling period duration in seconds bool use_wall_time_; // +using ThrottledMessageCallback = ThrottledCallback>; -} // namespace fuse_models +} // namespace fuse_core -#endif // FUSE_MODELS_COMMON_THROTTLED_CALLBACK_H +#endif // FUSE_CORE_THROTTLED_CALLBACK_H diff --git a/fuse_models/test/test_throttled_callback.cpp b/fuse_core/test/test_throttled_callback.cpp similarity index 95% rename from fuse_models/test/test_throttled_callback.cpp rename to fuse_core/test/test_throttled_callback.cpp index f63827833..181bd70a4 100644 --- a/fuse_models/test/test_throttled_callback.cpp +++ b/fuse_core/test/test_throttled_callback.cpp @@ -31,7 +31,7 @@ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ -#include +#include #include #include @@ -94,8 +94,8 @@ class PointPublisher }; /** - * @brief A dummy point sensor model that uses a fuse_models::common::ThrottledCallback with a - * keep and drop callback. + * @brief A dummy point sensor model that uses a fuse_core::ThrottledMessageCallback with a keep + * and drop callback. * * The callbacks simply count the number of times they are called, for testing purposes. The keep callback also caches * the last message received, also for testing purposes. @@ -112,7 +112,8 @@ class PointSensorModel : throttled_callback_(std::bind(&PointSensorModel::keepCallback, this, std::placeholders::_1), std::bind(&PointSensorModel::dropCallback, this, std::placeholders::_1), throttle_period) { - subscriber_ = node_handle_.subscribe("point", 10, &PointThrottledCallback::callback, &throttled_callback_); + subscriber_ = node_handle_.subscribe( + "point", 10, &PointThrottledCallback::callback, &throttled_callback_); } /** @@ -170,7 +171,7 @@ class PointSensorModel ros::NodeHandle node_handle_; //!< The node handle ros::Subscriber subscriber_; //!< The subscriber - using PointThrottledCallback = fuse_models::common::ThrottledCallback; + using PointThrottledCallback = fuse_core::ThrottledMessageCallback; PointThrottledCallback throttled_callback_; //!< The throttled callback size_t kept_messages_{ 0 }; //!< Messages kept diff --git a/fuse_core/test/throttled_callback.test b/fuse_core/test/throttled_callback.test new file mode 100644 index 000000000..b0596970b --- /dev/null +++ b/fuse_core/test/throttled_callback.test @@ -0,0 +1,4 @@ + + + + diff --git a/fuse_models/CMakeLists.txt b/fuse_models/CMakeLists.txt index abec584f2..f46a26c88 100644 --- a/fuse_models/CMakeLists.txt +++ b/fuse_models/CMakeLists.txt @@ -244,7 +244,7 @@ if(CATKIN_ENABLE_TESTING) ) # Other tests - catkin_add_gtest( + catkin_add_gmock( test_sensor_proc test/test_sensor_proc.cpp ) @@ -258,21 +258,6 @@ if(CATKIN_ENABLE_TESTING) CXX_STANDARD_REQUIRED YES ) - add_rostest_gtest( - test_throttled_callback - test/throttled_callback.test - test/test_throttled_callback.cpp - ) - target_link_libraries(test_throttled_callback - ${PROJECT_NAME} - ${catkin_LIBRARIES} - ) - set_target_properties(test_throttled_callback - PROPERTIES - CXX_STANDARD 14 - CXX_STANDARD_REQUIRED YES - ) - # Benchmarks find_package(benchmark QUIET) diff --git a/fuse_models/include/fuse_models/acceleration_2d.h b/fuse_models/include/fuse_models/acceleration_2d.h index c477ef7b6..9b060d4ba 100644 --- a/fuse_models/include/fuse_models/acceleration_2d.h +++ b/fuse_models/include/fuse_models/acceleration_2d.h @@ -35,9 +35,9 @@ #define FUSE_MODELS_ACCELERATION_2D_H #include -#include #include +#include #include #include @@ -117,7 +117,7 @@ class Acceleration2D : public fuse_core::AsyncSensorModel ros::Subscriber subscriber_; - using AccelerationThrottledCallback = common::ThrottledCallback; + using AccelerationThrottledCallback = fuse_core::ThrottledMessageCallback; AccelerationThrottledCallback throttled_callback_; }; diff --git a/fuse_models/include/fuse_models/imu_2d.h b/fuse_models/include/fuse_models/imu_2d.h index 332b0d536..ab58dfcd1 100644 --- a/fuse_models/include/fuse_models/imu_2d.h +++ b/fuse_models/include/fuse_models/imu_2d.h @@ -35,7 +35,7 @@ #define FUSE_MODELS_IMU_2D_H #include -#include +#include #include #include @@ -137,7 +137,7 @@ class Imu2D : public fuse_core::AsyncSensorModel ros::Subscriber subscriber_; - using ImuThrottledCallback = common::ThrottledCallback; + using ImuThrottledCallback = fuse_core::ThrottledMessageCallback; ImuThrottledCallback throttled_callback_; }; diff --git a/fuse_models/include/fuse_models/odometry_2d.h b/fuse_models/include/fuse_models/odometry_2d.h index 3c86c98bf..061d3615a 100644 --- a/fuse_models/include/fuse_models/odometry_2d.h +++ b/fuse_models/include/fuse_models/odometry_2d.h @@ -35,7 +35,7 @@ #define FUSE_MODELS_ODOMETRY_2D_H #include -#include +#include #include #include @@ -133,7 +133,7 @@ class Odometry2D : public fuse_core::AsyncSensorModel ros::Subscriber subscriber_; - using OdometryThrottledCallback = common::ThrottledCallback; + using OdometryThrottledCallback = fuse_core::ThrottledMessageCallback; OdometryThrottledCallback throttled_callback_; }; diff --git a/fuse_models/include/fuse_models/pose_2d.h b/fuse_models/include/fuse_models/pose_2d.h index 9761139da..5f4738126 100644 --- a/fuse_models/include/fuse_models/pose_2d.h +++ b/fuse_models/include/fuse_models/pose_2d.h @@ -35,9 +35,9 @@ #define FUSE_MODELS_POSE_2D_H #include -#include #include +#include #include #include @@ -122,7 +122,7 @@ class Pose2D : public fuse_core::AsyncSensorModel ros::Subscriber subscriber_; - using PoseThrottledCallback = common::ThrottledCallback; + using PoseThrottledCallback = fuse_core::ThrottledMessageCallback; PoseThrottledCallback throttled_callback_; }; diff --git a/fuse_models/include/fuse_models/twist_2d.h b/fuse_models/include/fuse_models/twist_2d.h index 9528ca816..650d8993b 100644 --- a/fuse_models/include/fuse_models/twist_2d.h +++ b/fuse_models/include/fuse_models/twist_2d.h @@ -35,7 +35,7 @@ #define FUSE_MODELS_TWIST_2D_H #include -#include +#include #include #include @@ -111,7 +111,7 @@ class Twist2D : public fuse_core::AsyncSensorModel ros::Subscriber subscriber_; - using TwistThrottledCallback = common::ThrottledCallback; + using TwistThrottledCallback = fuse_core::ThrottledMessageCallback; TwistThrottledCallback throttled_callback_; }; diff --git a/fuse_models/src/acceleration_2d.cpp b/fuse_models/src/acceleration_2d.cpp index eabd61824..162ac03c4 100644 --- a/fuse_models/src/acceleration_2d.cpp +++ b/fuse_models/src/acceleration_2d.cpp @@ -77,8 +77,8 @@ void Acceleration2D::onStart() { if (!params_.indices.empty()) { - subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size, - &AccelerationThrottledCallback::callback, &throttled_callback_); + subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), + params_.queue_size, &AccelerationThrottledCallback::callback, &throttled_callback_); } } diff --git a/fuse_models/src/imu_2d.cpp b/fuse_models/src/imu_2d.cpp index 48c5d98f0..8b039dde3 100644 --- a/fuse_models/src/imu_2d.cpp +++ b/fuse_models/src/imu_2d.cpp @@ -88,8 +88,8 @@ void Imu2D::onStart() !params_.angular_velocity_indices.empty()) { previous_pose_.reset(); - subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size, - &ImuThrottledCallback::callback, &throttled_callback_); + subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size, + &ImuThrottledCallback::callback, &throttled_callback_); } } diff --git a/fuse_models/src/odometry_2d.cpp b/fuse_models/src/odometry_2d.cpp index fc9ec5fc9..71116af8c 100644 --- a/fuse_models/src/odometry_2d.cpp +++ b/fuse_models/src/odometry_2d.cpp @@ -89,8 +89,8 @@ void Odometry2D::onStart() !params_.angular_velocity_indices.empty()) { previous_pose_.reset(); - subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size, - &OdometryThrottledCallback::callback, &throttled_callback_); + subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size, + &OdometryThrottledCallback::callback, &throttled_callback_); } } diff --git a/fuse_models/src/pose_2d.cpp b/fuse_models/src/pose_2d.cpp index 111d89711..42134ef8d 100644 --- a/fuse_models/src/pose_2d.cpp +++ b/fuse_models/src/pose_2d.cpp @@ -79,8 +79,8 @@ void Pose2D::onStart() if (!params_.position_indices.empty() || !params_.orientation_indices.empty()) { - subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size, - &PoseThrottledCallback::callback, &throttled_callback_); + subscriber_ = node_handle_.subscribe( + ros::names::resolve(params_.topic), params_.queue_size, &PoseThrottledCallback::callback, &throttled_callback_); } } diff --git a/fuse_models/src/twist_2d.cpp b/fuse_models/src/twist_2d.cpp index d213f4016..3a7d646b9 100644 --- a/fuse_models/src/twist_2d.cpp +++ b/fuse_models/src/twist_2d.cpp @@ -79,8 +79,8 @@ void Twist2D::onStart() if (!params_.linear_indices.empty() || !params_.angular_indices.empty()) { - subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), params_.queue_size, - &TwistThrottledCallback::callback, &throttled_callback_); + subscriber_ = node_handle_.subscribe(ros::names::resolve(params_.topic), + params_.queue_size, &TwistThrottledCallback::callback, &throttled_callback_); } } diff --git a/fuse_models/test/throttled_callback.test b/fuse_models/test/throttled_callback.test deleted file mode 100644 index fcc494f9b..000000000 --- a/fuse_models/test/throttled_callback.test +++ /dev/null @@ -1,4 +0,0 @@ - - - - diff --git a/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.h b/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.h index 003d707cb..cc92ee4c8 100644 --- a/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.h +++ b/fuse_optimizers/include/fuse_optimizers/batch_optimizer_params.h @@ -93,14 +93,10 @@ struct BatchOptimizerParams } else { - double optimization_period_sec{ optimization_period.toSec() }; - fuse_core::getPositiveParam(nh, "optimization_period", optimization_period_sec); - optimization_period.fromSec(optimization_period_sec); + fuse_core::getPositiveParam(nh, "optimization_period", optimization_period); } - double transaction_timeout_sec{ transaction_timeout.toSec() }; - fuse_core::getPositiveParam(nh, "transaction_timeout", transaction_timeout_sec); - transaction_timeout.fromSec(transaction_timeout_sec); + fuse_core::getPositiveParam(nh, "transaction_timeout", transaction_timeout); fuse_core::loadSolverOptionsFromROS(ros::NodeHandle(nh, "solver_options"), solver_options); } diff --git a/fuse_optimizers/include/fuse_optimizers/fixed_lag_smoother_params.h b/fuse_optimizers/include/fuse_optimizers/fixed_lag_smoother_params.h index 8484d1c40..a9c0972c1 100644 --- a/fuse_optimizers/include/fuse_optimizers/fixed_lag_smoother_params.h +++ b/fuse_optimizers/include/fuse_optimizers/fixed_lag_smoother_params.h @@ -95,9 +95,7 @@ struct FixedLagSmootherParams void loadFromROS(const ros::NodeHandle& nh) { // Read settings from the parameter server - double lag_duration_sec{ lag_duration.toSec() }; - fuse_core::getPositiveParam(nh, "lag_duration", lag_duration_sec); - lag_duration.fromSec(lag_duration_sec); + fuse_core::getPositiveParam(nh, "lag_duration", lag_duration); if (nh.hasParam("optimization_frequency")) { @@ -107,16 +105,12 @@ struct FixedLagSmootherParams } else { - double optimization_period_sec{ optimization_period.toSec() }; - fuse_core::getPositiveParam(nh, "optimization_period", optimization_period_sec); - optimization_period.fromSec(optimization_period_sec); + fuse_core::getPositiveParam(nh, "optimization_period", optimization_period); } nh.getParam("reset_service", reset_service); - double transaction_timeout_sec{ transaction_timeout.toSec() }; - fuse_core::getPositiveParam(nh, "transaction_timeout", transaction_timeout_sec); - transaction_timeout.fromSec(transaction_timeout_sec); + fuse_core::getPositiveParam(nh, "transaction_timeout", transaction_timeout); fuse_core::loadSolverOptionsFromROS(ros::NodeHandle(nh, "solver_options"), solver_options); } diff --git a/fuse_publishers/include/fuse_publishers/serialized_publisher.h b/fuse_publishers/include/fuse_publishers/serialized_publisher.h index 6bc56e3d5..8068509d0 100644 --- a/fuse_publishers/include/fuse_publishers/serialized_publisher.h +++ b/fuse_publishers/include/fuse_publishers/serialized_publisher.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -81,9 +82,21 @@ class SerializedPublisher : public fuse_core::AsyncPublisher fuse_core::Graph::ConstSharedPtr graph) override; protected: - std::string frame_id_; //!< The name of the frame for this path + /** + * @brief Publish the serialized graph + * + * @param[in] graph A read-only pointer to the graph object, allowing queries to be performed whenever needed + * @param[in] stamp A ros::Time stamp used for the serialized graph message published + */ + void graphPublisherCallback(fuse_core::Graph::ConstSharedPtr graph, const ros::Time& stamp) const; + + std::string frame_id_; //!< The name of the frame for the serialized graph and transaction messages published ros::Publisher graph_publisher_; ros::Publisher transaction_publisher_; + + using GraphPublisherCallback = std::function; + using GraphPublisherThrottledCallback = fuse_core::ThrottledCallback; + GraphPublisherThrottledCallback graph_publisher_throttled_callback_; //!< The graph publisher throttled callback }; } // namespace fuse_publishers diff --git a/fuse_publishers/src/serialized_publisher.cpp b/fuse_publishers/src/serialized_publisher.cpp index dc0ed959b..85ecb9f44 100644 --- a/fuse_publishers/src/serialized_publisher.cpp +++ b/fuse_publishers/src/serialized_publisher.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -52,7 +53,9 @@ namespace fuse_publishers SerializedPublisher::SerializedPublisher() : fuse_core::AsyncPublisher(1), - frame_id_("map") + frame_id_("map"), + graph_publisher_throttled_callback_( + std::bind(&SerializedPublisher::graphPublisherCallback, this, std::placeholders::_1, std::placeholders::_2)) { } @@ -64,6 +67,15 @@ void SerializedPublisher::onInit() bool latch = false; private_node_handle_.getParam("latch", latch); + ros::Duration graph_throttle_period{ 0.0 }; + fuse_core::getPositiveParam(private_node_handle_, "graph_throttle_period", graph_throttle_period, false); + + bool graph_throttle_use_wall_time{ false }; + private_node_handle_.getParam("graph_throttle_use_wall_time", graph_throttle_use_wall_time); + + graph_publisher_throttled_callback_.setThrottlePeriod(graph_throttle_period); + graph_publisher_throttled_callback_.setUseWallTime(graph_throttle_use_wall_time); + // Advertise the topics graph_publisher_ = private_node_handle_.advertise("graph", 1, latch); transaction_publisher_ = private_node_handle_.advertise("transaction", 1, latch); @@ -76,11 +88,7 @@ void SerializedPublisher::notifyCallback( const auto& stamp = transaction->stamp(); if (graph_publisher_.getNumSubscribers() > 0) { - fuse_msgs::SerializedGraph msg; - msg.header.stamp = stamp; - msg.header.frame_id = frame_id_; - fuse_core::serializeGraph(*graph, msg); - graph_publisher_.publish(msg); + graph_publisher_throttled_callback_(graph, stamp); } if (transaction_publisher_.getNumSubscribers() > 0) @@ -93,4 +101,13 @@ void SerializedPublisher::notifyCallback( } } +void SerializedPublisher::graphPublisherCallback(fuse_core::Graph::ConstSharedPtr graph, const ros::Time& stamp) const +{ + fuse_msgs::SerializedGraph msg; + msg.header.stamp = stamp; + msg.header.frame_id = frame_id_; + fuse_core::serializeGraph(*graph, msg); + graph_publisher_.publish(msg); +} + } // namespace fuse_publishers