diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 91ea91a7c3..ec89ebc5ef 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -118,6 +118,13 @@ class SubscriptionIntraProcess return nullptr; } } + + if (this->buffer_->has_data()) { + // If there is data still to be processed, indicate to the + // executor or waitset by triggering the guard condition. + this->trigger_guard_condition(); + } + return std::static_pointer_cast( std::make_shared>( std::pair( diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1fa2cbb4dd..eb6652f19b 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -593,3 +593,106 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) { rclcpp::shutdown(); } + +template +class TestIntraprocessExecutors : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + node = std::make_shared("node", test_name.str()); + + callback_count = 0; + + const std::string topic_name = std::string("topic_") + test_name.str(); + + rclcpp::PublisherOptions po; + po.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + publisher = node->create_publisher(topic_name, rclcpp::QoS(1), po); + + auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) { + this->callback_count.fetch_add(1); + }; + + rclcpp::SubscriptionOptions so; + so.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + subscription = + node->create_subscription( + topic_name, rclcpp::QoS(kNumMessages), std::move(callback), so); + } + + void TearDown() + { + publisher.reset(); + subscription.reset(); + node.reset(); + } + + const size_t kNumMessages = 100; + + rclcpp::Node::SharedPtr node; + rclcpp::Publisher::SharedPtr publisher; + rclcpp::Subscription::SharedPtr subscription; + std::atomic_int callback_count; +}; + +TYPED_TEST_SUITE(TestIntraprocessExecutors, ExecutorTypes, ExecutorTypeNames); + +TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) { + // This tests that executors will continue to service intraprocess subscriptions in the case + // that publishers aren't continuing to publish. + // This was previously broken in that intraprocess guard conditions were only triggered on + // publish and the test was added to prevent future regressions. + const size_t kNumMessages = 100; + + using ExecutorType = TypeParam; + ExecutorType executor; + executor.add_node(this->node); + + EXPECT_EQ(0, this->callback_count.load()); + this->publisher->publish(test_msgs::msg::Empty()); + + // Wait for up to 5 seconds for the first message to come available. + const std::chrono::milliseconds sleep_per_loop(10); + int loops = 0; + while (1u != this->callback_count.load() && loops < 500) { + rclcpp::sleep_for(sleep_per_loop); + executor.spin_some(); + loops++; + } + EXPECT_EQ(1u, this->callback_count.load()); + + // reset counter + this->callback_count.store(0); + + for (size_t ii = 0; ii < kNumMessages; ++ii) { + this->publisher->publish(test_msgs::msg::Empty()); + } + + // Fire a timer every 10ms up to 5 seconds waiting for subscriptions to be read. + loops = 0; + auto timer = this->node->create_wall_timer( + std::chrono::milliseconds(10), [this, &executor, &loops, &kNumMessages]() { + loops++; + if (kNumMessages == this->callback_count.load() || + loops == 500) + { + executor.cancel(); + } + }); + executor.spin(); + EXPECT_EQ(kNumMessages, this->callback_count.load()); +}