diff --git a/rclcpp/include/rclcpp/event_handler.hpp b/rclcpp/include/rclcpp/event_handler.hpp index af5d8c08dc..3f41de469c 100644 --- a/rclcpp/include/rclcpp/event_handler.hpp +++ b/rclcpp/include/rclcpp/event_handler.hpp @@ -47,6 +47,7 @@ using QOSOfferedIncompatibleQoSInfo = rmw_offered_qos_incompatible_event_status_ using QOSRequestedIncompatibleQoSInfo = rmw_requested_qos_incompatible_event_status_t; using IncompatibleTypeInfo = rmw_incompatible_type_status_t; +using MatchedInfo = rmw_matched_status_t; using QOSDeadlineRequestedCallbackType = std::function; using QOSDeadlineOfferedCallbackType = std::function; @@ -58,6 +59,8 @@ using QOSRequestedIncompatibleQoSCallbackType = std::function; using IncompatibleTypeCallbackType = std::function; +using PublisherMatchedCallbackType = std::function; +using SubscriptionMatchedCallbackType = std::function; /// Contains callbacks for various types of events a Publisher can receive from the middleware. struct PublisherEventCallbacks @@ -66,6 +69,7 @@ struct PublisherEventCallbacks QOSLivelinessLostCallbackType liveliness_callback; QOSOfferedIncompatibleQoSCallbackType incompatible_qos_callback; IncompatibleTypeCallbackType incompatible_type_callback; + PublisherMatchedCallbackType matched_callback; }; /// Contains callbacks for non-message events that a Subscription can receive from the middleware. @@ -76,6 +80,7 @@ struct SubscriptionEventCallbacks QOSRequestedIncompatibleQoSCallbackType incompatible_qos_callback; QOSMessageLostCallbackType message_lost_callback; IncompatibleTypeCallbackType incompatible_type_callback; + SubscriptionMatchedCallbackType matched_callback; }; class UnsupportedEventTypeException : public exceptions::RCLErrorBase, public std::runtime_error diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index a96a6ee791..698db2d559 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -183,6 +183,11 @@ PublisherBase::bind_event_callbacks( rclcpp::get_logger("rclcpp"), "Failed to add event handler for incompatible type; wrong callback type"); } + if (event_callbacks.matched_callback) { + this->add_event_handler( + event_callbacks.matched_callback, + RCL_PUBLISHER_MATCHED); + } } size_t diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 2302876338..e95cb4ac49 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -161,6 +161,11 @@ SubscriptionBase::bind_event_callbacks( event_callbacks.message_lost_callback, RCL_SUBSCRIPTION_MESSAGE_LOST); } + if (event_callbacks.matched_callback) { + this->add_event_handler( + event_callbacks.matched_callback, + RCL_SUBSCRIPTION_MATCHED); + } } const char * diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 0399f3ae11..d4da759c02 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -444,17 +444,23 @@ function(test_generic_pubsub_for_rmw_implementation) endif() endfunction() call_for_each_rmw_implementation(test_generic_pubsub_for_rmw_implementation) -ament_add_gtest(test_qos_event test_qos_event.cpp) -if(TARGET test_qos_event) - ament_target_dependencies(test_qos_event - "rmw" - "test_msgs" - ) - target_link_libraries(test_qos_event - ${PROJECT_NAME} - mimick + +function(test_qos_event_for_rmw_implementation) + set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation}) + ament_add_gmock(test_qos_event${target_suffix} test_qos_event.cpp + ENV ${rmw_implementation_env_var} ) -endif() + if(TARGET test_qos_event${target_suffix}) + target_link_libraries(test_qos_event${target_suffix} ${PROJECT_NAME} mimick) + ament_target_dependencies(test_qos_event${target_suffix} + "rmw" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() +endfunction() +call_for_each_rmw_implementation(test_qos_event_for_rmw_implementation) + ament_add_gmock(test_qos_overriding_options test_qos_overriding_options.cpp) if(TARGET test_qos_overriding_options) target_link_libraries(test_qos_overriding_options diff --git a/rclcpp/test/rclcpp/test_qos_event.cpp b/rclcpp/test/rclcpp/test_qos_event.cpp index 9c4c839a86..6b522d7ea2 100644 --- a/rclcpp/test/rclcpp/test_qos_event.cpp +++ b/rclcpp/test/rclcpp/test_qos_event.cpp @@ -14,6 +14,7 @@ #include +#include #include #include #include @@ -313,6 +314,11 @@ TEST_F(TestQosEvent, add_to_wait_set) { TEST_F(TestQosEvent, test_on_new_event_callback) { + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + auto offered_deadline = rclcpp::Duration(std::chrono::milliseconds(1)); auto requested_deadline = rclcpp::Duration(std::chrono::milliseconds(2)); @@ -354,6 +360,11 @@ TEST_F(TestQosEvent, test_on_new_event_callback) TEST_F(TestQosEvent, test_invalid_on_new_event_callback) { + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + auto pub = node->create_publisher(topic_name, 10); auto sub = node->create_subscription(topic_name, 10, message_callback); auto dummy_cb = [](size_t count_events) {(void)count_events;}; @@ -376,6 +387,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback) EXPECT_NO_THROW( pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS)); + EXPECT_NO_THROW( + pub->set_on_new_qos_event_callback(dummy_cb, RCL_PUBLISHER_MATCHED)); + + EXPECT_NO_THROW( + pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_MATCHED)); + EXPECT_NO_THROW( sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED)); @@ -394,6 +411,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback) EXPECT_NO_THROW( sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS)); + EXPECT_NO_THROW( + sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_MATCHED)); + + EXPECT_NO_THROW( + sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_MATCHED)); + std::function invalid_cb; rclcpp::SubscriptionOptions sub_options; @@ -413,3 +436,170 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback) pub->set_on_new_qos_event_callback(invalid_cb, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED), std::invalid_argument); } + +TEST_F(TestQosEvent, test_pub_matched_event_by_set_event_callback) +{ + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + std::atomic_size_t matched_count = 0; + + rclcpp::PublisherOptions pub_options; + pub_options.event_callbacks.matched_callback = [](auto) {}; + auto pub = node->create_publisher( + topic_name, 10, pub_options); + + auto matched_event_callback = [&matched_count](size_t count) { + matched_count += count; + }; + + pub->set_on_new_qos_event_callback(matched_event_callback, RCL_PUBLISHER_MATCHED); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(node->get_node_base_interface()); + + const auto timeout = std::chrono::milliseconds(200); + + { + auto sub1 = node->create_subscription(topic_name, 10, message_callback); + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(1)); + + { + auto sub2 = node->create_subscription( + topic_name, 10, message_callback); + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(2)); + } + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(3)); + } + + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(4)); +} + +TEST_F(TestQosEvent, test_sub_matched_event_by_set_event_callback) +{ + // rmw_connextdds doesn't support rmw_event_set_callback() interface + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + std::atomic_size_t matched_count = 0; + + rclcpp::SubscriptionOptions sub_options; + sub_options.event_callbacks.matched_callback = [](auto) {}; + auto sub = node->create_subscription( + topic_name, 10, message_callback, sub_options); + + auto matched_event_callback = [&matched_count](size_t count) { + matched_count += count; + }; + + sub->set_on_new_qos_event_callback(matched_event_callback, RCL_SUBSCRIPTION_MATCHED); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(node->get_node_base_interface()); + + const auto timeout = std::chrono::milliseconds(200); + + { + auto pub1 = node->create_publisher(topic_name, 10); + + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(1)); + + { + auto pub2 = node->create_publisher(topic_name, 10); + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(2)); + } + + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(3)); + } + + ex.spin_some(timeout); + EXPECT_EQ(matched_count, static_cast(4)); +} + +TEST_F(TestQosEvent, test_pub_matched_event_by_option_event_callback) +{ + rmw_matched_status_t matched_expected_result; + + rclcpp::PublisherOptions pub_options; + pub_options.event_callbacks.matched_callback = + [&matched_expected_result](rmw_matched_status_t & s) { + EXPECT_EQ(s.total_count, matched_expected_result.total_count); + EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change); + EXPECT_EQ(s.current_count, matched_expected_result.current_count); + EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change); + }; + + auto pub = node->create_publisher( + topic_name, 10, pub_options); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(node->get_node_base_interface()); + + // Create a connected subscription + matched_expected_result.total_count = 1; + matched_expected_result.total_count_change = 1; + matched_expected_result.current_count = 1; + matched_expected_result.current_count_change = 1; + + const auto timeout = std::chrono::milliseconds(200); + + { + auto sub = node->create_subscription(topic_name, 10, message_callback); + ex.spin_some(timeout); + + // destroy a connected subscription + matched_expected_result.total_count = 1; + matched_expected_result.total_count_change = 0; + matched_expected_result.current_count = 0; + matched_expected_result.current_count_change = -1; + } + ex.spin_some(timeout); +} + +TEST_F(TestQosEvent, test_sub_matched_event_by_option_event_callback) +{ + rmw_matched_status_t matched_expected_result; + + rclcpp::SubscriptionOptions sub_options; + sub_options.event_callbacks.matched_callback = + [&matched_expected_result](rmw_matched_status_t & s) { + EXPECT_EQ(s.total_count, matched_expected_result.total_count); + EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change); + EXPECT_EQ(s.current_count, matched_expected_result.current_count); + EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change); + }; + auto sub = node->create_subscription( + topic_name, 10, message_callback, sub_options); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(node->get_node_base_interface()); + + // Create a connected publisher + matched_expected_result.total_count = 1; + matched_expected_result.total_count_change = 1; + matched_expected_result.current_count = 1; + matched_expected_result.current_count_change = 1; + + const auto timeout = std::chrono::milliseconds(200); + { + auto pub1 = node->create_publisher(topic_name, 10); + ex.spin_some(timeout); + + // destroy a connected publisher + matched_expected_result.total_count = 1; + matched_expected_result.total_count_change = 0; + matched_expected_result.current_count = 0; + matched_expected_result.current_count_change = -1; + } + ex.spin_some(timeout); +}