Skip to content

Commit

Permalink
Fix bug that a callback not reached (#1640)
Browse files Browse the repository at this point in the history
* Add a test case

a subscriber on a new executor with a callback group triggered to receive a message

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* fix flaky and not to use spin_some

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* update comment

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* update for not using anti-pattern source code

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* add a notify guard condition for callback group

Co-authored-by: William Woodall <william@osrfoundation.org>
Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* Notify guard condition of Node not to be used in Executor

it is only for the waitset of GraphListener

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* put code in the try catch

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* defer to create guard condition

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* use context directly for the create function

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* cpplint

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* fix that some case might call add_node after shutdown

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* nitpick and fix some potential bug

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* add sanity check as some case might not create notify guard condition after shutdown

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* Cleanup includes.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

* remove destroy method

make a callback group can only create one guard condition

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* remove limitation that guard condition can not be re-created in callback group

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

Signed-off-by: Chen Lihui <lihui.chen@sony.com>
Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
Co-authored-by: William Woodall <william@osrfoundation.org>
Co-authored-by: Chris Lalancette <clalancette@openrobotics.org>
  • Loading branch information
3 people committed Oct 26, 2022
1 parent 85c0af4 commit d119157
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 51 deletions.
22 changes: 21 additions & 1 deletion rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
#define RCLCPP__CALLBACK_GROUP_HPP_

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rclcpp/client.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription_base.hpp"
Expand Down Expand Up @@ -95,6 +98,10 @@ class CallbackGroup
CallbackGroupType group_type,
bool automatically_add_to_executor_with_node = true);

/// Default destructor.
RCLCPP_PUBLIC
~CallbackGroup();

template<typename Function>
rclcpp::SubscriptionBase::SharedPtr
find_subscription_ptrs_if(Function func) const
Expand Down Expand Up @@ -171,6 +178,16 @@ class CallbackGroup
bool
automatically_add_to_executor_with_node() const;

/// Defer creating the notify guard condition and return it.
RCLCPP_PUBLIC
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr);

/// Trigger the notify guard condition.
RCLCPP_PUBLIC
void
trigger_notify_guard_condition();

protected:
RCLCPP_DISABLE_COPY(CallbackGroup)

Expand Down Expand Up @@ -213,6 +230,9 @@ class CallbackGroup
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
std::atomic_bool can_be_taken_from_;
const bool automatically_add_to_executor_with_node_;
// defer the creation of the guard condition
std::shared_ptr<rclcpp::GuardCondition> notify_guard_condition_ = nullptr;
std::recursive_mutex notify_guard_condition_mutex_;

private:
template<typename TypeT, typename Function>
Expand Down
12 changes: 6 additions & 6 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,14 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
/// maps callback groups to guard conditions
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap
Expand Down
45 changes: 43 additions & 2 deletions rclcpp/src/rclcpp/callback_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rclcpp/callback_group.hpp"
#include <algorithm>
#include <atomic>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>

#include <vector>
#include "rclcpp/callback_group.hpp"
#include "rclcpp/client.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/waitable.hpp"

using rclcpp::CallbackGroup;
using rclcpp::CallbackGroupType;
Expand All @@ -27,6 +37,10 @@ CallbackGroup::CallbackGroup(
automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node)
{}

CallbackGroup::~CallbackGroup()
{
trigger_notify_guard_condition();
}

std::atomic_bool &
CallbackGroup::can_be_taken_from()
Expand Down Expand Up @@ -97,6 +111,33 @@ CallbackGroup::automatically_add_to_executor_with_node() const
return automatically_add_to_executor_with_node_;
}

rclcpp::GuardCondition::SharedPtr
CallbackGroup::get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr)
{
std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
if (notify_guard_condition_ && context_ptr != notify_guard_condition_->get_context()) {
if (associated_with_executor_) {
trigger_notify_guard_condition();
}
notify_guard_condition_ = nullptr;
}

if (!notify_guard_condition_) {
notify_guard_condition_ = std::make_shared<rclcpp::GuardCondition>(context_ptr);
}

return notify_guard_condition_;
}

void
CallbackGroup::trigger_notify_guard_condition()
{
std::lock_guard<std::recursive_mutex> lock(notify_guard_condition_mutex_);
if (notify_guard_condition_) {
notify_guard_condition_->trigger();
}
}

void
CallbackGroup::add_subscription(
const rclcpp::SubscriptionBase::SharedPtr subscription_ptr)
Expand Down
58 changes: 32 additions & 26 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ Executor::~Executor()
weak_groups_associated_with_executor_to_nodes_.clear();
weak_groups_to_nodes_associated_with_executor_.clear();
weak_groups_to_nodes_.clear();
for (const auto & pair : weak_nodes_to_guard_conditions_) {
for (const auto & pair : weak_groups_to_guard_conditions_) {
auto guard_condition = pair.second;
memory_strategy_->remove_guard_condition(guard_condition);
}
weak_nodes_to_guard_conditions_.clear();
weak_groups_to_guard_conditions_.clear();

// Finalize the wait set.
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
Expand Down Expand Up @@ -204,8 +204,7 @@ Executor::add_callback_group_to_map(
if (has_executor.exchange(true)) {
throw std::runtime_error("Callback group has already been added to an executor.");
}
bool is_new_node = !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) &&
!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_);

rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr;
auto insert_info =
weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr));
Expand All @@ -215,21 +214,24 @@ Executor::add_callback_group_to_map(
}
// Also add to the map that contains all callback groups
weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr));
if (is_new_node) {
const auto & gc = node_ptr->get_notify_guard_condition();
weak_nodes_to_guard_conditions_[node_ptr] = &gc;
if (notify) {
// Interrupt waiting to handle new node
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}

if (node_ptr->get_context()->is_valid()) {
auto callback_group_guard_condition =
group_ptr->get_notify_guard_condition(node_ptr->get_context());
weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get();
// Add the callback_group's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(*callback_group_guard_condition);
}

if (notify) {
// Interrupt waiting to handle new node
try {
interrupt_guard_condition_.trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
// Add the node's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(gc);
}
}

Expand Down Expand Up @@ -300,7 +302,12 @@ Executor::remove_callback_group_from_map(
if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) &&
!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_))
{
weak_nodes_to_guard_conditions_.erase(node_ptr);
auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr);
if (iter != weak_groups_to_guard_conditions_.end()) {
memory_strategy_->remove_guard_condition(iter->second);
}
weak_groups_to_guard_conditions_.erase(weak_group_ptr);

if (notify) {
try {
interrupt_guard_condition_.trigger();
Expand All @@ -310,7 +317,6 @@ Executor::remove_callback_group_from_map(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}
memory_strategy_->remove_guard_condition(&node_ptr->get_notify_guard_condition());
}
}

Expand Down Expand Up @@ -700,12 +706,6 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
auto weak_node_ptr = pair.second;
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
invalid_group_ptrs.push_back(weak_group_ptr);
auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr);
if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) {
auto guard_condition = node_guard_pair->second;
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
memory_strategy_->remove_guard_condition(guard_condition);
}
}
}
std::for_each(
Expand All @@ -721,6 +721,12 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
weak_groups_associated_with_executor_to_nodes_.erase(group_ptr);
}
auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr);
if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) {
auto guard_condition = callback_guard_pair->second;
weak_groups_to_guard_conditions_.erase(group_ptr);
memory_strategy_->remove_guard_condition(guard_condition);
}
weak_groups_to_nodes_.erase(group_ptr);
});
}
Expand Down
12 changes: 8 additions & 4 deletions rclcpp/src/rclcpp/node_interfaces/node_services.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ NodeServices::add_service(
// TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create service, group not in node.");
}
group->add_service(service_base_ptr);
} else {
node_base_->get_default_callback_group()->add_service(service_base_ptr);
group = node_base_->get_default_callback_group();
}

group->add_service(service_base_ptr);

// Notify the executor that a new service was created using the parent Node.
auto & node_gc = node_base_->get_notify_guard_condition();
try {
node_gc.trigger();
group->trigger_notify_guard_condition();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string("failed to notify wait set on service creation: ") + ex.what());
Expand All @@ -60,15 +62,17 @@ NodeServices::add_client(
// TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create client, group not in node.");
}
group->add_client(client_base_ptr);
} else {
node_base_->get_default_callback_group()->add_client(client_base_ptr);
group = node_base_->get_default_callback_group();
}

group->add_client(client_base_ptr);

// Notify the executor that a new client was created using the parent Node.
auto & node_gc = node_base_->get_notify_guard_condition();
try {
node_gc.trigger();
group->trigger_notify_guard_condition();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string("failed to notify wait set on client creation: ") + ex.what());
Expand Down
5 changes: 3 additions & 2 deletions rclcpp/src/rclcpp/node_interfaces/node_timers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ NodeTimers::add_timer(
// TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create timer, group not in node.");
}
callback_group->add_timer(timer);
} else {
node_base_->get_default_callback_group()->add_timer(timer);
callback_group = node_base_->get_default_callback_group();
}
callback_group->add_timer(timer);

auto & node_gc = node_base_->get_notify_guard_condition();
try {
node_gc.trigger();
callback_group->trigger_notify_guard_condition();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string("failed to notify wait set on timer creation: ") + ex.what());
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/src/rclcpp/node_interfaces/node_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ NodeTopics::add_publisher(
auto & node_gc = node_base_->get_notify_guard_condition();
try {
node_gc.trigger();
callback_group->trigger_notify_guard_condition();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string("failed to notify wait set on publisher creation: ") + ex.what());
Expand Down Expand Up @@ -121,6 +122,7 @@ NodeTopics::add_subscription(
auto & node_gc = node_base_->get_notify_guard_condition();
try {
node_gc.trigger();
callback_group->trigger_notify_guard_condition();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string("failed to notify wait set on subscription creation: ") + ex.what());
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ NodeWaitables::add_waitable(
// TODO(jacobperron): use custom exception
throw std::runtime_error("Cannot create waitable, group not in node.");
}
group->add_waitable(waitable_ptr);
} else {
node_base_->get_default_callback_group()->add_waitable(waitable_ptr);
group = node_base_->get_default_callback_group();
}

group->add_waitable(waitable_ptr);

// Notify the executor that a new waitable was created using the parent Node.
auto & node_gc = node_base_->get_notify_guard_condition();
try {
node_gc.trigger();
group->trigger_notify_guard_condition();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string("failed to notify wait set on waitable creation: ") + ex.what());
Expand Down
21 changes: 13 additions & 8 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,20 @@ if(TARGET test_create_subscription)
"test_msgs"
)
endif()
ament_add_gtest(test_add_callback_groups_to_executor
test_add_callback_groups_to_executor.cpp
TIMEOUT 120)
if(TARGET test_add_callback_groups_to_executor)
target_link_libraries(test_add_callback_groups_to_executor ${PROJECT_NAME})
ament_target_dependencies(test_add_callback_groups_to_executor
"test_msgs"
function(test_add_callback_groups_to_executor_for_rmw_implementation)
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})
ament_add_gmock(test_add_callback_groups_to_executor${target_suffix} test_add_callback_groups_to_executor.cpp
ENV ${rmw_implementation_env_var}
TIMEOUT 120
)
endif()
if(TARGET test_add_callback_groups_to_executor${target_suffix})
target_link_libraries(test_add_callback_groups_to_executor${target_suffix} ${PROJECT_NAME})
ament_target_dependencies(test_add_callback_groups_to_executor${target_suffix}
"test_msgs"
)
endif()
endfunction()
call_for_each_rmw_implementation(test_add_callback_groups_to_executor_for_rmw_implementation)
ament_add_gtest(test_expand_topic_or_service_name test_expand_topic_or_service_name.cpp)
if(TARGET test_expand_topic_or_service_name)
ament_target_dependencies(test_expand_topic_or_service_name
Expand Down
Loading

0 comments on commit d119157

Please sign in to comment.