Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Fast-DDS Waitsets instead of listeners #619

Merged
merged 34 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8f9b0e8
Initial commit
richiware Jun 24, 2022
6532c29
Working subscriptions and services
richiware Jun 27, 2022
db2ef41
Add event support
richiware Jun 27, 2022
905863e
Initial new event callback
richiware Jun 28, 2022
3413abe
Use guard_condition with event listeners
richiware Jun 28, 2022
7b9ca57
Remove unused functions
richiware Jun 28, 2022
2ee24cf
Fixing tests
richiware Jul 1, 2022
9a22e9e
Fixing uncrustify
richiware Jul 1, 2022
ac33f66
Simplify SubListener's get_unread_messages()
jsan-rt Jul 6, 2022
a64fc56
Simplified get_unread_requests() and get_unread_responses()
jsan-rt Jul 6, 2022
6de684e
Moved Waitset creation/destruction outside loop as suggested
jsan-rt Jul 11, 2022
dc5d78d
Renamed variable. Removed unneded checks. Replaced get_unread_count w…
jsan-rt Jul 11, 2022
ec7a576
Modified oneliners.
jsan-rt Jul 11, 2022
da3ade0
Cleaned more unneeded checks
jsan-rt Jul 11, 2022
850eeeb
Added RCPPUTILS_TSA_GUARDED_BY macros to previously atomic booleans
jsan-rt Jul 11, 2022
5ff6c5e
Fixed wrong mutex guard. Renamed and removed break; from TERMINATE_TH…
jsan-rt Jul 11, 2022
6735e87
Fix waiting events
richiware Jul 13, 2022
fe61133
Fixing crash
richiware Jul 13, 2022
67fc650
Fix linking error on Windows
richiware Jul 13, 2022
50dd0aa
Usage of new function get_unread_count(bool)
richiware Jul 14, 2022
6b8946a
Fix windows compilation linkage error
richiware Jul 15, 2022
cc8c3f3
Removed unneeded include. Restored some cleanup code. Added some comm…
jsan-rt Jul 13, 2022
9e8865f
Set nullptr after delete
jsan-rt Jul 13, 2022
cfea91c
Detach listener
jsan-rt Jul 13, 2022
07838b6
Fix creation of datareader for content filter
richiware Jul 15, 2022
b778478
Fix wrong usage of take_next_sample with read samples
richiware Jul 20, 2022
62336a2
Apply suggestions
richiware Jul 20, 2022
03f6b71
Apply suggestion
richiware Jul 21, 2022
e95e130
Fix rosbag2 failure tests
richiware Aug 11, 2022
68aacaa
Apply suggestions
richiware Aug 12, 2022
09cb19b
Change usage of StatusMask::operator<<
richiware Aug 12, 2022
c175f3a
Protecting a member
richiware Aug 12, 2022
8b1a886
Apply suggestions
richiware Aug 12, 2022
fccdddd
Fix extra space
richiware Aug 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);
} // namespace rmw_fastrtps_cpp

#endif // RMW_FASTRTPS_CPP__SUBSCRIPTION_HPP_
26 changes: 16 additions & 10 deletions rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
9 changes: 7 additions & 2 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,22 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,17 @@ rmw_create_client(
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->response_reader_) {
RMW_SET_ERROR_MSG("create_client() failed to create response DataReader");
return nullptr;
}

info->response_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -375,17 +379,22 @@ rmw_create_client(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
request_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->request_writer_) {
RMW_SET_ERROR_MSG("create_client() failed to create request DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->request_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,17 @@ rmw_create_service(
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->request_reader_) {
RMW_SET_ERROR_MSG("create_service() failed to create request DataReader");
return nullptr;
}

info->request_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -378,17 +382,22 @@ rmw_create_service(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
response_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->response_writer_) {
RMW_SET_ERROR_MSG("create_service() failed to create response DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->response_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ rmw_create_subscription(
topic_name,
&adapted_qos_policies,
subscription_options,
false, // use no keyed topic
true); // create subscription listener
false); // use no keyed topic
if (!subscription) {
return nullptr;
}
Expand Down
27 changes: 16 additions & 11 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener)
bool keyed)
{
/////
// Check input parameters
Expand Down Expand Up @@ -164,7 +163,8 @@ create_subscription(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, dds_participant]()
{
delete info->listener_;
if (info->type_support_) {
dds_participant->unregister_type(info->type_support_.get_type_name());
Expand Down Expand Up @@ -208,12 +208,10 @@ create_subscription(

/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}
info->listener_ = new (std::nothrow) SubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}

/////
Expand Down Expand Up @@ -297,9 +295,14 @@ create_subscription(
return nullptr;
}

// Initialize DataReader's StatusCondition to be notified when new data is available
info->data_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
[subscriber, info]()
{
subscriber->delete_datareader(info->data_reader_);
});

Expand All @@ -316,7 +319,8 @@ create_subscription(
return nullptr;
}
auto cleanup_rmw_subscription = rcpputils::make_scope_exit(
[rmw_subscription]() {
[rmw_subscription]()
{
rmw_free(const_cast<char *>(rmw_subscription->topic_name));
rmw_subscription_free(rmw_subscription);
});
Expand Down Expand Up @@ -345,4 +349,5 @@ create_subscription(
info->subscription_gid_.data);
return rmw_subscription;
}

} // namespace rmw_fastrtps_cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);

} // namespace rmw_fastrtps_dynamic_cpp

Expand Down
26 changes: 16 additions & 10 deletions rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
Loading