From 5af4cb28326fb0b01003b6a388782e4205f58765 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Wed, 9 Feb 2022 12:26:25 -0300 Subject: [PATCH 1/2] Add GuardCondition on trigger callback Signed-off-by: Mauro Passerino --- rclcpp/include/rclcpp/guard_condition.hpp | 16 ++++++++ rclcpp/src/rclcpp/guard_condition.cpp | 48 +++++++++++++++++++++-- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/guard_condition.hpp b/rclcpp/include/rclcpp/guard_condition.hpp index fce42f1d4b..f614294505 100644 --- a/rclcpp/include/rclcpp/guard_condition.hpp +++ b/rclcpp/include/rclcpp/guard_condition.hpp @@ -98,10 +98,26 @@ class GuardCondition bool exchange_in_use_by_wait_set_state(bool in_use_state); + /// Adds the guard condition to a waitset + /** + * This function is thread-safe. + * \param[in] wait_set pointer to a wait set where to add the guard condition + */ + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set); + + RCLCPP_PUBLIC + void + set_on_trigger_callback(std::function callback); + protected: rclcpp::Context::SharedPtr context_; rcl_guard_condition_t rcl_guard_condition_; std::atomic in_use_by_wait_set_{false}; + std::recursive_mutex reentrant_mutex_; + std::function on_trigger_callback_{nullptr}; + size_t unread_count_{0}; }; } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/guard_condition.cpp b/rclcpp/src/rclcpp/guard_condition.cpp index cae0c4ce5c..8efd541902 100644 --- a/rclcpp/src/rclcpp/guard_condition.cpp +++ b/rclcpp/src/rclcpp/guard_condition.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rclcpp/guard_condition.hpp" #include "rclcpp/exceptions.hpp" @@ -72,9 +74,16 @@ GuardCondition::get_rcl_guard_condition() const void GuardCondition::trigger() { - rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret); + std::lock_guard lock(reentrant_mutex_); + + if (on_trigger_callback_) { + on_trigger_callback_(1); + } else { + rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + unread_count_++; } } @@ -84,4 +93,37 @@ GuardCondition::exchange_in_use_by_wait_set_state(bool in_use_state) return in_use_by_wait_set_.exchange(in_use_state); } +void +GuardCondition::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + if (exchange_in_use_by_wait_set_state(true)) { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "guard condition already added to a wait set"); + } + rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &this->rcl_guard_condition_, NULL); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } +} + +void +GuardCondition::set_on_trigger_callback(std::function callback) +{ + std::lock_guard lock(reentrant_mutex_); + + if (callback) { + on_trigger_callback_ = callback; + + if (unread_count_) { + callback(unread_count_); + unread_count_ = 0; + } + return; + } + + on_trigger_callback_ = nullptr; +} + } // namespace rclcpp From 9d0ac6c46cea7bbd67e1f36f8e5df5c89e468f6b Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Wed, 9 Feb 2022 15:40:10 -0300 Subject: [PATCH 2/2] Add tests for new GuardCondition APIs Signed-off-by: Mauro Passerino --- rclcpp/include/rclcpp/guard_condition.hpp | 1 + rclcpp/src/rclcpp/guard_condition.cpp | 11 +++- rclcpp/test/rclcpp/test_guard_condition.cpp | 62 +++++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/guard_condition.hpp b/rclcpp/include/rclcpp/guard_condition.hpp index f614294505..f6f5af9586 100644 --- a/rclcpp/include/rclcpp/guard_condition.hpp +++ b/rclcpp/include/rclcpp/guard_condition.hpp @@ -118,6 +118,7 @@ class GuardCondition std::recursive_mutex reentrant_mutex_; std::function on_trigger_callback_{nullptr}; size_t unread_count_{0}; + rcl_wait_set_t * wait_set_{nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/guard_condition.cpp b/rclcpp/src/rclcpp/guard_condition.cpp index 8efd541902..ea68c78d73 100644 --- a/rclcpp/src/rclcpp/guard_condition.cpp +++ b/rclcpp/src/rclcpp/guard_condition.cpp @@ -96,11 +96,16 @@ GuardCondition::exchange_in_use_by_wait_set_state(bool in_use_state) void GuardCondition::add_to_wait_set(rcl_wait_set_t * wait_set) { + std::lock_guard lock(reentrant_mutex_); + if (exchange_in_use_by_wait_set_state(true)) { - RCLCPP_ERROR( - rclcpp::get_logger("rclcpp"), - "guard condition already added to a wait set"); + if (wait_set != wait_set_) { + throw std::runtime_error("guard condition has already been added to a wait set."); + } + } else { + wait_set_ = wait_set; } + rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &this->rcl_guard_condition_, NULL); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error( diff --git a/rclcpp/test/rclcpp/test_guard_condition.cpp b/rclcpp/test/rclcpp/test_guard_condition.cpp index 8100cf1c9b..481051ccf9 100644 --- a/rclcpp/test/rclcpp/test_guard_condition.cpp +++ b/rclcpp/test/rclcpp/test_guard_condition.cpp @@ -102,3 +102,65 @@ TEST_F(TestGuardCondition, trigger) { } } } + +/* + * Testing addition to a wait set + */ +TEST_F(TestGuardCondition, add_to_wait_set) { + { + { + auto gc = std::make_shared(); + + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_wait_set_add_guard_condition, RCL_RET_OK); + + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + EXPECT_NO_THROW(gc->add_to_wait_set(&wait_set)); + EXPECT_NO_THROW(gc->add_to_wait_set(&wait_set)); + + rcl_wait_set_t wait_set_2 = rcl_get_zero_initialized_wait_set(); + EXPECT_THROW(gc->add_to_wait_set(&wait_set_2), std::runtime_error); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_wait_set_add_guard_condition, RCL_RET_ERROR); + + auto gd = std::make_shared(); + EXPECT_THROW(gd->add_to_wait_set(nullptr), rclcpp::exceptions::RCLError); + } + } +} + +/* + * Testing set on trigger callback + */ +TEST_F(TestGuardCondition, set_on_trigger_callback) { + { + auto gc = std::make_shared(); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + gc->set_on_trigger_callback(increase_c1_cb); + + EXPECT_EQ(c1.load(), 0u); + EXPECT_NO_THROW(gc->trigger()); + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + gc->set_on_trigger_callback(increase_c2_cb); + + EXPECT_NO_THROW(gc->trigger()); + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + gc->set_on_trigger_callback(nullptr); + EXPECT_NO_THROW(gc->trigger()); + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + gc->set_on_trigger_callback(increase_c1_cb); + EXPECT_EQ(c1.load(), 2u); + } +}