Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement matched event #2105

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/event_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ using QOSOfferedIncompatibleQoSInfo = rmw_offered_qos_incompatible_event_status_
using QOSRequestedIncompatibleQoSInfo = rmw_requested_qos_incompatible_event_status_t;

using IncompatibleTypeInfo = rmw_incompatible_type_status_t;
using MatchedInfo = rmw_matched_status_t;

using QOSDeadlineRequestedCallbackType = std::function<void (QOSDeadlineRequestedInfo &)>;
using QOSDeadlineOfferedCallbackType = std::function<void (QOSDeadlineOfferedInfo &)>;
Expand All @@ -58,6 +59,8 @@ using QOSRequestedIncompatibleQoSCallbackType =
std::function<void (QOSRequestedIncompatibleQoSInfo &)>;

using IncompatibleTypeCallbackType = std::function<void (IncompatibleTypeInfo &)>;
using PublisherMatchedCallbackType = std::function<void (MatchedInfo &)>;
using SubscriptionMatchedCallbackType = std::function<void (MatchedInfo &)>;

/// Contains callbacks for various types of events a Publisher can receive from the middleware.
struct PublisherEventCallbacks
Expand All @@ -66,6 +69,7 @@ struct PublisherEventCallbacks
QOSLivelinessLostCallbackType liveliness_callback;
QOSOfferedIncompatibleQoSCallbackType incompatible_qos_callback;
IncompatibleTypeCallbackType incompatible_type_callback;
PublisherMatchedCallbackType matched_callback;
};

/// Contains callbacks for non-message events that a Subscription can receive from the middleware.
Expand All @@ -76,6 +80,7 @@ struct SubscriptionEventCallbacks
QOSRequestedIncompatibleQoSCallbackType incompatible_qos_callback;
QOSMessageLostCallbackType message_lost_callback;
IncompatibleTypeCallbackType incompatible_type_callback;
SubscriptionMatchedCallbackType matched_callback;
};

class UnsupportedEventTypeException : public exceptions::RCLErrorBase, public std::runtime_error
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ PublisherBase::bind_event_callbacks(
rclcpp::get_logger("rclcpp"),
"Failed to add event handler for incompatible type; wrong callback type");
}
if (event_callbacks.matched_callback) {
this->add_event_handler(
event_callbacks.matched_callback,
RCL_PUBLISHER_MATCHED);
}
}

size_t
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ SubscriptionBase::bind_event_callbacks(
event_callbacks.message_lost_callback,
RCL_SUBSCRIPTION_MESSAGE_LOST);
}
if (event_callbacks.matched_callback) {
this->add_event_handler(
event_callbacks.matched_callback,
RCL_SUBSCRIPTION_MATCHED);
}
}

const char *
Expand Down
26 changes: 16 additions & 10 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -444,17 +444,23 @@ function(test_generic_pubsub_for_rmw_implementation)
endif()
endfunction()
call_for_each_rmw_implementation(test_generic_pubsub_for_rmw_implementation)
ament_add_gtest(test_qos_event test_qos_event.cpp)
if(TARGET test_qos_event)
ament_target_dependencies(test_qos_event
"rmw"
"test_msgs"
)
target_link_libraries(test_qos_event
${PROJECT_NAME}
mimick

function(test_qos_event_for_rmw_implementation)
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})
ament_add_gmock(test_qos_event${target_suffix} test_qos_event.cpp
ENV ${rmw_implementation_env_var}
)
endif()
if(TARGET test_qos_event${target_suffix})
target_link_libraries(test_qos_event${target_suffix} ${PROJECT_NAME} mimick)
ament_target_dependencies(test_qos_event${target_suffix}
"rmw"
"rosidl_typesupport_cpp"
"test_msgs"
)
endif()
endfunction()
call_for_each_rmw_implementation(test_qos_event_for_rmw_implementation)

ament_add_gmock(test_qos_overriding_options test_qos_overriding_options.cpp)
if(TARGET test_qos_overriding_options)
target_link_libraries(test_qos_overriding_options
Expand Down
190 changes: 190 additions & 0 deletions rclcpp/test/rclcpp/test_qos_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <functional>
#include <future>
Expand Down Expand Up @@ -313,6 +314,11 @@ TEST_F(TestQosEvent, add_to_wait_set) {

TEST_F(TestQosEvent, test_on_new_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

auto offered_deadline = rclcpp::Duration(std::chrono::milliseconds(1));
auto requested_deadline = rclcpp::Duration(std::chrono::milliseconds(2));

Expand Down Expand Up @@ -354,6 +360,11 @@ TEST_F(TestQosEvent, test_on_new_event_callback)

TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

auto pub = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
auto dummy_cb = [](size_t count_events) {(void)count_events;};
Expand All @@ -376,6 +387,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
EXPECT_NO_THROW(
pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS));

EXPECT_NO_THROW(
pub->set_on_new_qos_event_callback(dummy_cb, RCL_PUBLISHER_MATCHED));

EXPECT_NO_THROW(
pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_MATCHED));

EXPECT_NO_THROW(
sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED));

Expand All @@ -394,6 +411,12 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
EXPECT_NO_THROW(
sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS));

EXPECT_NO_THROW(
sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_MATCHED));

EXPECT_NO_THROW(
sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_MATCHED));

std::function<void(size_t)> invalid_cb;

rclcpp::SubscriptionOptions sub_options;
Expand All @@ -413,3 +436,170 @@ TEST_F(TestQosEvent, test_invalid_on_new_event_callback)
pub->set_on_new_qos_event_callback(invalid_cb, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED),
std::invalid_argument);
}

TEST_F(TestQosEvent, test_pub_matched_event_by_set_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

std::atomic_size_t matched_count = 0;

rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback = [](auto) {};
auto pub = node->create_publisher<test_msgs::msg::Empty>(
topic_name, 10, pub_options);

auto matched_event_callback = [&matched_count](size_t count) {
matched_count += count;
};

pub->set_on_new_qos_event_callback(matched_event_callback, RCL_PUBLISHER_MATCHED);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

const auto timeout = std::chrono::milliseconds(200);

{
auto sub1 = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(1));

{
auto sub2 = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback);
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(2));
}
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(3));
}

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(4));
}

TEST_F(TestQosEvent, test_sub_matched_event_by_set_event_callback)
{
// rmw_connextdds doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
}

std::atomic_size_t matched_count = 0;

rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.matched_callback = [](auto) {};
auto sub = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, sub_options);

auto matched_event_callback = [&matched_count](size_t count) {
matched_count += count;
};

sub->set_on_new_qos_event_callback(matched_event_callback, RCL_SUBSCRIPTION_MATCHED);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

const auto timeout = std::chrono::milliseconds(200);

{
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(1));

{
auto pub2 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(2));
}

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(3));
}

ex.spin_some(timeout);
EXPECT_EQ(matched_count, static_cast<size_t>(4));
}

TEST_F(TestQosEvent, test_pub_matched_event_by_option_event_callback)
{
rmw_matched_status_t matched_expected_result;

rclcpp::PublisherOptions pub_options;
pub_options.event_callbacks.matched_callback =
[&matched_expected_result](rmw_matched_status_t & s) {
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
};

auto pub = node->create_publisher<test_msgs::msg::Empty>(
topic_name, 10, pub_options);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

// Create a connected subscription
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 1;
matched_expected_result.current_count = 1;
matched_expected_result.current_count_change = 1;

const auto timeout = std::chrono::milliseconds(200);

{
auto sub = node->create_subscription<test_msgs::msg::Empty>(topic_name, 10, message_callback);
ex.spin_some(timeout);

// destroy a connected subscription
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 0;
matched_expected_result.current_count = 0;
matched_expected_result.current_count_change = -1;
}
ex.spin_some(timeout);
}

TEST_F(TestQosEvent, test_sub_matched_event_by_option_event_callback)
{
rmw_matched_status_t matched_expected_result;

rclcpp::SubscriptionOptions sub_options;
sub_options.event_callbacks.matched_callback =
[&matched_expected_result](rmw_matched_status_t & s) {
EXPECT_EQ(s.total_count, matched_expected_result.total_count);
EXPECT_EQ(s.total_count_change, matched_expected_result.total_count_change);
EXPECT_EQ(s.current_count, matched_expected_result.current_count);
EXPECT_EQ(s.current_count_change, matched_expected_result.current_count_change);
};
auto sub = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, sub_options);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

// Create a connected publisher
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 1;
matched_expected_result.current_count = 1;
matched_expected_result.current_count_change = 1;

const auto timeout = std::chrono::milliseconds(200);
{
auto pub1 = node->create_publisher<test_msgs::msg::Empty>(topic_name, 10);
ex.spin_some(timeout);

// destroy a connected publisher
matched_expected_result.total_count = 1;
matched_expected_result.total_count_change = 0;
matched_expected_result.current_count = 0;
matched_expected_result.current_count_change = -1;
}
ex.spin_some(timeout);
}