From 7a724995113a01273a649326db8ea3fd8a0bb840 Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Tue, 11 Apr 2023 08:45:35 -0500 Subject: [PATCH 1/4] Trigger the intraprocess guard condition with data If the intraprocess buffer still has data after taking, re-trigger the guard condition to ensure that the executor will continue to service it, even if incoming publications stop. Signed-off-by: Michael Carroll --- .../rclcpp/experimental/subscription_intra_process.hpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 91ea91a7c3..ec89ebc5ef 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -118,6 +118,13 @@ class SubscriptionIntraProcess return nullptr; } } + + if (this->buffer_->has_data()) { + // If there is data still to be processed, indicate to the + // executor or waitset by triggering the guard condition. + this->trigger_guard_condition(); + } + return std::static_pointer_cast( std::make_shared>( std::pair( From 0100d7365a2dca822e16e6183bcb792aaa4cb0dc Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Tue, 11 Apr 2023 09:40:14 -0500 Subject: [PATCH 2/4] Create test to prevent behavior regression Signed-off-by: Michael Carroll --- .../test/rclcpp/executors/test_executors.cpp | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1fa2cbb4dd..9758dc40fc 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -593,3 +593,95 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) { rclcpp::shutdown(); } + +template +class TestIntraprocessExecutors : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + node = std::make_shared("node", test_name.str()); + + callback_count = 0; + + const std::string topic_name = std::string("topic_") + test_name.str(); + + rclcpp::PublisherOptions po; + po.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + publisher = node->create_publisher(topic_name, rclcpp::QoS(1), po); + + auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) { + this->callback_count.fetch_add(1); + }; + + rclcpp::SubscriptionOptions so; + so.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + subscription = + node->create_subscription( + topic_name, rclcpp::QoS(kNumMessages), std::move(callback), so); + } + + void TearDown() + { + publisher.reset(); + subscription.reset(); + node.reset(); + } + + const size_t kNumMessages = 100; + + rclcpp::Node::SharedPtr node; + rclcpp::Publisher::SharedPtr publisher; + rclcpp::Subscription::SharedPtr subscription; + std::atomic_int callback_count; +}; + +TYPED_TEST_SUITE(TestIntraprocessExecutors, ExecutorTypes, ExecutorTypeNames); + +TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) { + // This tests that executors will continue to service intraprocess subscriptions in the case + // that publishers aren't continuing to publish. + // This was previously broken in that intraprocess guard conditions were only triggered on + // publish and the test was added to prevent future regressions. + const size_t kNumMessages = 100; + + using ExecutorType = TypeParam; + ExecutorType executor; + executor.add_node(this->node); + + EXPECT_EQ(0, this->callback_count.load()); + this->publisher->publish(test_msgs::msg::Empty()); + + const std::chrono::milliseconds sleep_per_loop(10); + while (1u != this->callback_count.load()) { + rclcpp::sleep_for(sleep_per_loop); + executor.spin_some(); + } + EXPECT_EQ(1u, this->callback_count.load()); + + // reset counter + this->callback_count.store(0); + + for (size_t ii = 0; ii < kNumMessages; ++ii) { + this->publisher->publish(test_msgs::msg::Empty()); + } + auto timer = this->node->create_wall_timer( + std::chrono::milliseconds(250), [&executor]() { + executor.cancel(); + }); + executor.spin(); + EXPECT_EQ(kNumMessages, this->callback_count.load()); +} From 2a518c98bbf0254c92629e959064850c85d6ca00 Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Tue, 11 Apr 2023 15:06:31 -0500 Subject: [PATCH 3/4] Add some escape hatch conditions to tests Signed-off-by: Michael Carroll --- rclcpp/test/rclcpp/executors/test_executors.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 9758dc40fc..da46a1fc5b 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -665,10 +665,13 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) { EXPECT_EQ(0, this->callback_count.load()); this->publisher->publish(test_msgs::msg::Empty()); + // Wait for up to 5 seconds for the first message to come available. const std::chrono::milliseconds sleep_per_loop(10); - while (1u != this->callback_count.load()) { + int loops = 0; + while (1u != this->callback_count.load() && loops < 500) { rclcpp::sleep_for(sleep_per_loop); executor.spin_some(); + loops++; } EXPECT_EQ(1u, this->callback_count.load()); @@ -678,9 +681,16 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) { for (size_t ii = 0; ii < kNumMessages; ++ii) { this->publisher->publish(test_msgs::msg::Empty()); } + + // Fire a timer every 10ms up to 5 seconds waiting for subscriptions to be read. + loops = 0; auto timer = this->node->create_wall_timer( - std::chrono::milliseconds(250), [&executor]() { - executor.cancel(); + std::chrono::milliseconds(10), [this, &executor, &loops]() { + loops++; + if (kNumMessages == this->callback_count.load() || + loops == 500) { + executor.cancel(); + } }); executor.spin(); EXPECT_EQ(kNumMessages, this->callback_count.load()); From 151aab83f4fb9ab04069af7e00416abf497cb06f Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Tue, 11 Apr 2023 17:51:38 -0500 Subject: [PATCH 4/4] Make MSVC happy Signed-off-by: Michael Carroll --- rclcpp/test/rclcpp/executors/test_executors.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index da46a1fc5b..eb6652f19b 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -685,10 +685,11 @@ TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) { // Fire a timer every 10ms up to 5 seconds waiting for subscriptions to be read. loops = 0; auto timer = this->node->create_wall_timer( - std::chrono::milliseconds(10), [this, &executor, &loops]() { + std::chrono::milliseconds(10), [this, &executor, &loops, &kNumMessages]() { loops++; if (kNumMessages == this->callback_count.load() || - loops == 500) { + loops == 500) + { executor.cancel(); } });