Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid using dds common public mutex directly #725

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ init_context_impl(
common_context->gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, participant_info->participant_->guid());
common_context->pub = publisher.get();
common_context->publish_callback = [](const rmw_publisher_t * pub, const void * msg) {
return rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
pub,
msg,
nullptr);
};
common_context->sub = subscription.get();
common_context->graph_guard_condition = graph_guard_condition.get();

Expand Down
49 changes: 12 additions & 37 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,43 +442,18 @@ rmw_create_client(
}
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
common_context->graph_cache.associate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);

rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != ret) {
common_context->graph_cache.dissociate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
common_context->graph_cache.dissociate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
return nullptr;
}
// Update graph
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());
rmw_ret_t rmw_ret = common_context->update_client_graph(
request_publisher_gid, response_subscriber_gid,
node->name, node->namespace_
);

if (RMW_RET_OK != rmw_ret) {
return nullptr;
}

cleanup_rmw_client.cancel();
Expand Down
40 changes: 17 additions & 23 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,33 +103,27 @@ rmw_create_publisher(
auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);

auto info = static_cast<const CustomPublisherInfo *>(publisher->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);

// Update graph
rmw_ret_t rmw_ret = common_context->update_publisher_graph(
info->publisher_gid,
node->name, node->namespace_
);

if (RMW_RET_OK != rmw_ret) {
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I almost feel like we should make this an RCPPUTILS_SCOPE_EXIT right after we successfully create the publisher above. That way it will be destroyed in all error paths. Thoughts?

(the same goes for the subscription and the dynamic versions below)

if (RMW_RET_OK != rmw_ret) {
rmw_error_state_t error_state = *rmw_get_error_state();
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher);
if (RMW_RET_OK != rmw_ret) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}

return publisher;
}

Expand Down
48 changes: 12 additions & 36 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,42 +442,18 @@ rmw_create_service(
}
memcpy(const_cast<char *>(rmw_service->service_name), service_name, strlen(service_name) + 1);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_reader_->guid());
common_context->graph_cache.associate_reader(
request_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_writer_->guid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
response_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != ret) {
common_context->graph_cache.dissociate_writer(
response_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
common_context->graph_cache.dissociate_reader(
request_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
return nullptr;
}
// Update graph
rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_reader_->guid());
rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_writer_->guid());
rmw_ret_t rmw_ret = common_context->update_service_graph(
request_subscriber_gid, response_publisher_gid,
node->name, node->namespace_
);

if (RMW_RET_OK != rmw_ret) {
return nullptr;
}

cleanup_rmw_service.cancel();
Expand Down
39 changes: 16 additions & 23 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,33 +97,26 @@ rmw_create_subscription(
auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
info->subscription_gid_, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
// Update graph
rmw_ret_t rmw_ret = common_context->update_subscriber_graph(
info->subscription_gid_,
node->name, node->namespace_
);

if (RMW_RET_OK != rmw_ret) {
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
rmw_ret = rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier, participant_info, subscription);
if (RMW_RET_OK != rmw_ret) {
rmw_error_state_t error_state = *rmw_get_error_state();
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_reader(
info->subscription_gid_, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier, participant_info, subscription);
if (RMW_RET_OK != rmw_ret) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}

info->node_ = node;
info->common_context_ = common_context;

Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ init_context_impl(
common_context->gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, participant_info->participant_->guid());
common_context->pub = publisher.get();
common_context->publish_callback = [](const rmw_publisher_t * pub, const void * msg) {
return rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
pub,
msg,
nullptr);
};
common_context->sub = subscription.get();
common_context->graph_guard_condition = graph_guard_condition.get();

Expand Down
47 changes: 13 additions & 34 deletions rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,40 +471,19 @@ rmw_create_client(
}
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
common_context->graph_cache.associate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);

rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
response_subscriber_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
common_context->graph_cache.dissociate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
common_context->graph_cache.dissociate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
return nullptr;
}
// Update graph
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());

rmw_ret_t rmw_ret = common_context->update_client_graph(
request_publisher_gid, response_subscriber_gid,
node->name, node->namespace_
);

if (RMW_RET_OK != rmw_ret) {
return nullptr;
}

cleanup_rmw_client.cancel();
Expand Down
40 changes: 17 additions & 23 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,33 +103,27 @@ rmw_create_publisher(
auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);

auto info = static_cast<const CustomPublisherInfo *>(publisher->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);

// Update graph
rmw_ret_t rmw_ret = common_context->update_publisher_graph(
info->publisher_gid,
node->name, node->namespace_
);

if (RMW_RET_OK != rmw_ret) {
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher);
if (RMW_RET_OK != rmw_ret) {
rmw_error_state_t error_state = *rmw_get_error_state();
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher);
if (RMW_RET_OK != rmw_ret) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}

return publisher;
}

Expand Down
46 changes: 12 additions & 34 deletions rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,40 +471,18 @@ rmw_create_service(
}
memcpy(const_cast<char *>(rmw_service->service_name), service_name, strlen(service_name) + 1);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_reader_->guid());
common_context->graph_cache.associate_reader(
request_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);

rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_writer_->guid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
response_publisher_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
common_context->graph_cache.dissociate_writer(
response_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
common_context->graph_cache.dissociate_reader(
request_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
return nullptr;
}
// Update graph
rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_reader_->guid());
rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_writer_->guid());
rmw_ret_t rmw_ret = common_context->update_service_graph(
request_subscriber_gid, response_publisher_gid,
node->name, node->namespace_
);

if (RMW_RET_OK != rmw_ret) {
return nullptr;
}

cleanup_rmw_service.cancel();
Expand Down