Skip to content

Commit

Permalink
Port fuse_publishers and some upstream tests
Browse files Browse the repository at this point in the history
Signed-off-by: methylDragon <methylDragon@gmail.com>
  • Loading branch information
methylDragon committed Dec 20, 2022
1 parent 8f66478 commit cb6b42e
Show file tree
Hide file tree
Showing 31 changed files with 747 additions and 576 deletions.
5 changes: 5 additions & 0 deletions fuse_constraints/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ install(DIRECTORY include/
DESTINATION include/${PROJECT_NAME}
)

install(
DIRECTORY cmake
DESTINATION share/${PROJECT_NAME}
)

pluginlib_export_plugin_description_file(fuse_core fuse_plugins.xml)

ament_export_targets(${PROJECT_NAME}-export HAS_LIBRARY_TARGET)
Expand Down
1 change: 1 addition & 0 deletions fuse_constraints/suitesparse-extras.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_LIST_DIR})
find_package(SUITESPARSE REQUIRED COMPONENTS CCOLAMD)
17 changes: 13 additions & 4 deletions fuse_core/include/fuse_core/async_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <fuse_core/callback_wrapper.hpp>
#include <fuse_core/fuse_macros.hpp>
#include <fuse_core/graph.hpp>
#include <fuse_core/node_interfaces/node_interfaces.hpp>
#include <fuse_core/publisher.hpp>
#include <fuse_core/transaction.hpp>
#include <rclcpp/rclcpp.hpp>
Expand Down Expand Up @@ -84,7 +85,13 @@ class AsyncPublisher : public Publisher
* @param[in] name A unique name to give this plugin instance
* @throws runtime_error if already initialized
*/
void initialize(const std::string & name) override;
void initialize(
node_interfaces::NodeInterfaces<
node_interfaces::Base,
node_interfaces::Waitables
> interfaces,
const std::string & name,
size_t thread_count);

/**
* @brief Get the unique name of this publisher
Expand Down Expand Up @@ -151,11 +158,13 @@ class AsyncPublisher : public Publisher
std::shared_ptr<fuse_core::CallbackAdapter> callback_queue_;

std::string name_; //!< The unique name for this publisher instance
rclcpp::Node::SharedPtr node_; //!< The node for this publisher
rclcpp::CallbackGroup::SharedPtr cb_group_; //!< Internal re-entrant callback group

//! The node interfaces
node_interfaces::NodeInterfaces<node_interfaces::Base, node_interfaces::Waitables> interfaces_;

//! A single/multi-threaded executor assigned to the local callback queue
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
rclcpp::Executor::SharedPtr executor_;
size_t executor_thread_count_;
std::thread spinner_; //!< Internal thread for spinning the executor
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized
Expand All @@ -167,7 +176,7 @@ class AsyncPublisher : public Publisher
*
* @param[in] thread_count The number of threads used to service the local callback queue
*/
explicit AsyncPublisher(size_t thread_count = 1);
AsyncPublisher();

/**
* @brief Perform any required initialization for the publisher
Expand Down
3 changes: 2 additions & 1 deletion fuse_core/include/fuse_core/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class Publisher
*
* @param[in] name A unique name to give this plugin instance
*/
virtual void initialize(const std::string & name) = 0;
template<typename NodeT>
void initialize(NodeT interfaces, const std::string & name, size_t thread_count = 1);

/**
* @brief Get the unique name of this publisher
Expand Down
56 changes: 31 additions & 25 deletions fuse_core/src/async_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@
namespace fuse_core
{

AsyncPublisher::AsyncPublisher(size_t thread_count)
: name_("uninitialized"),
executor_thread_count_(thread_count)
AsyncPublisher::AsyncPublisher()
: name_("uninitialized")
{
}

Expand All @@ -48,43 +47,50 @@ AsyncPublisher::~AsyncPublisher()
internal_stop();
}

void AsyncPublisher::initialize(const std::string & name)
void AsyncPublisher::initialize(
node_interfaces::NodeInterfaces<
node_interfaces::Base,
node_interfaces::Waitables
> interfaces,
const std::string & name,
size_t thread_count)
{
executor_thread_count_ = thread_count;
interfaces_ = interfaces;

if (initialized_) {
throw std::runtime_error("Calling initialize on an already initialized AsyncPublisher!");
}

// Initialize internal state
name_ = name;
std::string node_namespace = "";

// TODO(CH3): Pass in the context or a node to get the context from
rclcpp::Context::SharedPtr ros_context = rclcpp::contexts::get_global_default_context();
auto node_options = rclcpp::NodeOptions();
node_options.context(ros_context); // set a context to generate the node in

// TODO(CH3): Potentially pass in the optimizer node instead of spinning a new one
node_ = rclcpp::Node::make_shared(name_, node_namespace, node_options);
name_ = name; // NOTE(methylDragon): Used in derived classes

auto context = interfaces_.get_node_base_interface()->get_context();
auto executor_options = rclcpp::ExecutorOptions();
executor_options.context = ros_context;
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options,
executor_thread_count_);
executor_options.context = context;

if (executor_thread_count_ == 1) {
executor_ = rclcpp::executors::SingleThreadedExecutor::make_shared(executor_options);
} else {
executor_ = rclcpp::executors::MultiThreadedExecutor::make_shared(
executor_options, executor_thread_count_);
}

callback_queue_ = std::make_shared<CallbackAdapter>(ros_context);
callback_queue_ =
std::make_shared<CallbackAdapter>(context);

// This callback group MUST be re-entrant in order to support parallelization
cb_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
node_->get_node_waitables_interface()->add_waitable(
callback_queue_, cb_group_);
cb_group_ = interfaces_.get_node_base_interface()->create_callback_group(
rclcpp::CallbackGroupType::Reentrant, false);
interfaces_.get_node_waitables_interface()->add_waitable(callback_queue_, cb_group_);

// Call the derived onInit() function to perform implementation-specific initialization
onInit();

// Make sure the executor will service the given node
// TODO(sloretz) add just the callback group here when using Optimizer's Node
executor_->add_node(node_);
// We can add this without any guards because the callback group was set to not get automatically
// added to executors
executor_->add_callback_group(cb_group_, interfaces_.get_node_base_interface());

// Start the executor
spinner_ = std::thread(
Expand Down Expand Up @@ -127,7 +133,7 @@ void AsyncPublisher::stop()
{
// Prefer to call onStop in executor's thread so downstream users don't have
// to worry about threads in ROS callbacks when there's only 1 thread.
if (node_->get_node_base_interface()->get_context()->is_valid()) {
if (interfaces_.get_node_base_interface()->get_context()->is_valid()) {
auto callback = std::make_shared<CallbackWrapper<void>>(
std::bind(&AsyncPublisher::onStop, this)
);
Expand Down
2 changes: 1 addition & 1 deletion fuse_core/test/launch_tests/test_parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ TEST_F(TestParameters, GetCovarianceDiagonalParam)
fuse_core::Matrix3d default_covariance = fuse_core::Matrix3d::Identity();
default_covariance *= default_variance;

// Load covariance matrix diagonal from the parameter server:
auto node = rclcpp::Node::make_shared("test_parameters_node");

// Load covariance matrix diagonal from the parameter server:
// A covariance diagonal with the expected size and valid should be the same as the expected one:
{
const std::string parameter_name{"covariance_diagonal"};
Expand Down
13 changes: 8 additions & 5 deletions fuse_core/test/test_async_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MyPublisher : public fuse_core::AsyncPublisher
{
public:
MyPublisher()
: fuse_core::AsyncPublisher(0),
: fuse_core::AsyncPublisher(),
callback_processed(false),
initialized(false)
{
Expand Down Expand Up @@ -87,24 +87,27 @@ class TestAsyncPublisher : public ::testing::Test
TEST_F(TestAsyncPublisher, OnInit)
{
for (int i = 0; i < 250; i++) {
auto node = rclcpp::Node::make_shared("test_async_pub_node");
MyPublisher publisher;
publisher.initialize("my_publisher_" + std::to_string(i));
publisher.initialize(node, "my_publisher_" + std::to_string(i), 0);
EXPECT_TRUE(publisher.initialized);
}
}

TEST_F(TestAsyncPublisher, DoubleInit)
{
auto node = rclcpp::Node::make_shared("test_async_pub_node");
MyPublisher publisher;
publisher.initialize("my_publisher");
publisher.initialize(node, "my_publisher", 0);
EXPECT_TRUE(publisher.initialized);
EXPECT_THROW(publisher.initialize("test"), std::runtime_error);
EXPECT_THROW(publisher.initialize(node, "test", 0), std::runtime_error);
}

TEST_F(TestAsyncPublisher, notifyCallback)
{
auto node = rclcpp::Node::make_shared("test_async_pub_node");
MyPublisher publisher;
publisher.initialize("my_publisher");
publisher.initialize(node, "my_publisher", 0);

// Execute the notify() method in this thread. This should push a call to
// MyPublisher::notifyCallback() into MyPublisher's callback queue, which will get executed by
Expand Down
2 changes: 1 addition & 1 deletion fuse_models/include/fuse_models/odometry_2d_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace fuse_models
* - tf, tf_static (tf2_msgs::TFMessage) Subscribes to tf data to obtain the requisite odom->base_link transform,
* but only if the world_frame_id is set to the value of the map_frame_id.
*/
class Odometry2DPublisher : public fuse_core::AsyncPublisher
class Odometry2DPublisher : public fuse_core::AsyncPublisher // TODO(methylDragon): Refactor this in the same way it was done in fuse_publishers
{
public:
FUSE_SMART_PTR_DEFINITIONS_WITH_EIGEN(Odometry2DPublisher)
Expand Down
Loading

0 comments on commit cb6b42e

Please sign in to comment.