Skip to content

Commit

Permalink
enabled static executor and added add callback group feature
Browse files Browse the repository at this point in the history
  • Loading branch information
peterpena committed Jul 20, 2020
1 parent 4d13604 commit 6a37e96
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 2 deletions.
2 changes: 1 addition & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/duration.cpp
src/rclcpp/event.cpp
src/rclcpp/exceptions/exceptions.cpp
# src/rclcpp/executable_list.cpp
src/rclcpp/executable_list.cpp
src/rclcpp/executor.cpp
src/rclcpp/executors.cpp
src/rclcpp/expand_topic_or_service_name.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ class StaticExecutorEntitiesCollector final
size_t
get_number_of_ready_guard_conditions() override;

RCLCPP_PUBLIC
void
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr);

/**
* \sa rclcpp::Executor::add_node()
* \throw std::runtime_error if node was already added
Expand Down Expand Up @@ -223,6 +229,11 @@ class StaticExecutorEntitiesCollector final
/// Memory strategy: an interface for handling user-defined memory allocation strategies.
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>> WeakCallbackGroupsToNodesMap;
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_;

/// List of weak nodes registered in the static executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

RCLCPP_PUBLIC
void
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify) override;

/// Add a node to the executor.
/**
* An executor can have zero or more nodes which provide work during `spin` functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void
StaticExecutorEntitiesCollector::fill_memory_strategy()
{
memory_strategy_->clear_handles();
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_groups_to_nodes_);

// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) {
Expand Down Expand Up @@ -236,6 +236,20 @@ StaticExecutorEntitiesCollector::add_node(
guard_conditions_.push_back(node_ptr->get_notify_guard_condition());
}

void
StaticExecutorEntitiesCollector::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
auto insert_info = weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr));
group_ptr->allow_executor_to_add().store(false);
bool was_inserted = insert_info.second;
if (!was_inserted) {
throw std::runtime_error("Callback group was already added to executor.");
}
}

bool
StaticExecutorEntitiesCollector::remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Expand Down
31 changes: 31 additions & 0 deletions rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@ StaticSingleThreadedExecutor::spin()
}
}

void
StaticSingleThreadedExecutor::add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify)
{
// If the node already has an executor
std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic();
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}

if (notify &&
node_ptr->get_associated_with_executor_atomic().exchange(true)) {
// Interrupt waiting to handle new node
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string().str);
}
}
entities_collector_->add_callback_group(group_ptr, node_ptr);
}

void
StaticSingleThreadedExecutor::add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
Expand All @@ -67,6 +89,15 @@ StaticSingleThreadedExecutor::add_node(
}

entities_collector_->add_node(node_ptr);

std::for_each(
node_ptr->get_callback_groups().begin(), node_ptr->get_callback_groups().end(),
[this, node_ptr, notify] (rclcpp::CallbackGroup::WeakPtr weak_group){
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load()) {
add_callback_group(group_ptr, node_ptr, notify);
}
});
}

void
Expand Down

0 comments on commit 6a37e96

Please sign in to comment.