Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Humble] Use Fast-DDS Waitsets instead of listeners (backport #619) #633

Merged
merged 1 commit into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);
} // namespace rmw_fastrtps_cpp

#endif // RMW_FASTRTPS_CPP__SUBSCRIPTION_HPP_
26 changes: 16 additions & 10 deletions rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
9 changes: 7 additions & 2 deletions rmw_fastrtps_cpp/src/publisher.cpp
Expand Up @@ -269,17 +269,22 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Expand Up @@ -324,13 +324,17 @@ rmw_create_client(
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->response_reader_) {
RMW_SET_ERROR_MSG("create_client() failed to create response DataReader");
return nullptr;
}

info->response_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -370,17 +374,22 @@ rmw_create_client(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
request_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->request_writer_) {
RMW_SET_ERROR_MSG("create_client() failed to create request DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->request_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Expand Up @@ -323,13 +323,17 @@ rmw_create_service(
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->request_reader_) {
RMW_SET_ERROR_MSG("create_service() failed to create request DataReader");
return nullptr;
}

info->request_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -373,17 +377,22 @@ rmw_create_service(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
response_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->response_writer_) {
RMW_SET_ERROR_MSG("create_service() failed to create response DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->response_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Expand Up @@ -77,8 +77,7 @@ rmw_create_subscription(
topic_name,
qos_policies,
subscription_options,
false, // use no keyed topic
true); // create subscription listener
false); // use no keyed topic
if (!subscription) {
return nullptr;
}
Expand Down
27 changes: 16 additions & 11 deletions rmw_fastrtps_cpp/src/subscription.cpp
Expand Up @@ -65,8 +65,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener)
bool keyed)
{
/////
// Check input parameters
Expand Down Expand Up @@ -164,7 +163,8 @@ create_subscription(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, dds_participant]()
{
delete info->listener_;
if (info->type_support_) {
dds_participant->unregister_type(info->type_support_.get_type_name());
Expand Down Expand Up @@ -208,12 +208,10 @@ create_subscription(

/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}
info->listener_ = new (std::nothrow) SubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}

/////
Expand Down Expand Up @@ -297,9 +295,14 @@ create_subscription(
return nullptr;
}

// Initialize DataReader's StatusCondition to be notified when new data is available
info->data_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
[subscriber, info]()
{
subscriber->delete_datareader(info->data_reader_);
});

Expand All @@ -316,7 +319,8 @@ create_subscription(
return nullptr;
}
auto cleanup_rmw_subscription = rcpputils::make_scope_exit(
[rmw_subscription]() {
[rmw_subscription]()
{
rmw_free(const_cast<char *>(rmw_subscription->topic_name));
rmw_subscription_free(rmw_subscription);
});
Expand Down Expand Up @@ -345,4 +349,5 @@ create_subscription(
info->subscription_gid_.data);
return rmw_subscription;
}

} // namespace rmw_fastrtps_cpp
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);

} // namespace rmw_fastrtps_dynamic_cpp

Expand Down
26 changes: 16 additions & 10 deletions rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Expand Up @@ -278,13 +278,17 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down