Skip to content

Commit

Permalink
avoid using dds common public mutex directly (#725)
Browse files Browse the repository at this point in the history
* call method to update graph instead of using mutex directly

* initialize publish_callback once

* use make_scope_exit to make it destroyed all error paths

Signed-off-by: Chen Lihui <lihui.chen@sony.com>
  • Loading branch information
Chen Lihui committed Nov 3, 2023
1 parent 5b27338 commit d518e06
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 353 deletions.
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ init_context_impl(
common_context->gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, participant_info->participant_->guid());
common_context->pub = publisher.get();
common_context->publish_callback = [](const rmw_publisher_t * pub, const void * msg) {
return rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
pub,
msg,
nullptr);
};
common_context->sub = subscription.get();
common_context->graph_guard_condition = graph_guard_condition.get();

Expand Down
45 changes: 9 additions & 36 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,43 +442,16 @@ rmw_create_client(
}
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

// Update graph
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());
if (RMW_RET_OK != common_context->add_client_graph(
request_publisher_gid, response_subscriber_gid,
node->name, node->namespace_))
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
common_context->graph_cache.associate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);

rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != ret) {
common_context->graph_cache.dissociate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
common_context->graph_cache.dissociate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
return nullptr;
}
return nullptr;
}

cleanup_rmw_client.cancel();
Expand Down
43 changes: 20 additions & 23 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/rmw.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_dds_common/qos.hpp"
Expand Down Expand Up @@ -99,37 +101,32 @@ rmw_create_publisher(
if (!publisher) {
return nullptr;
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);

auto info = static_cast<const CustomPublisherInfo *>(publisher->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
auto cleanup_publisher = rcpputils::make_scope_exit(
[participant_info, publisher]() {
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher);
if (RMW_RET_OK != rmw_ret) {
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher))
{
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
});

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomPublisherInfo *>(publisher->data);

// Update graph
if (RMW_RET_OK != common_context->add_publisher_graph(
info->publisher_gid,
node->name, node->namespace_))
{
return nullptr;
}

cleanup_publisher.cancel();
return publisher;
}

Expand Down
44 changes: 9 additions & 35 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,42 +442,16 @@ rmw_create_service(
}
memcpy(const_cast<char *>(rmw_service->service_name), service_name, strlen(service_name) + 1);

// Update graph
rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_reader_->guid());
rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_writer_->guid());
if (RMW_RET_OK != common_context->add_service_graph(
request_subscriber_gid, response_publisher_gid,
node->name, node->namespace_))
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t request_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_reader_->guid());
common_context->graph_cache.associate_reader(
request_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_gid_t response_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_writer_->guid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
response_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != ret) {
common_context->graph_cache.dissociate_writer(
response_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
common_context->graph_cache.dissociate_reader(
request_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
return nullptr;
}
return nullptr;
}

cleanup_rmw_service.cancel();
Expand Down
43 changes: 20 additions & 23 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/rmw.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw_dds_common/qos.hpp"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
Expand Down Expand Up @@ -93,40 +95,35 @@ rmw_create_subscription(
if (!subscription) {
return nullptr;
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
info->subscription_gid_, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
auto cleanup_subscription = rcpputils::make_scope_exit(
[participant_info, subscription]() {
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_reader(
info->subscription_gid_, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier, participant_info, subscription);
if (RMW_RET_OK != rmw_ret) {
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier, participant_info, subscription))
{
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
});

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);

// Update graph
if (RMW_RET_OK != common_context->add_subscriber_graph(
info->subscription_gid_,
node->name, node->namespace_))
{
return nullptr;
}

info->node_ = node;
info->common_context_ = common_context;

cleanup_subscription.cancel();
return subscription;
}

Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ init_context_impl(
common_context->gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, participant_info->participant_->guid());
common_context->pub = publisher.get();
common_context->publish_callback = [](const rmw_publisher_t * pub, const void * msg) {
return rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
pub,
msg,
nullptr);
};
common_context->sub = subscription.get();
common_context->graph_guard_condition = graph_guard_condition.get();

Expand Down
42 changes: 9 additions & 33 deletions rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,40 +471,16 @@ rmw_create_client(
}
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

// Update graph
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());
if (RMW_RET_OK != common_context->add_client_graph(
request_publisher_gid, response_subscriber_gid,
node->name, node->namespace_))
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t request_publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_writer_->guid());
common_context->graph_cache.associate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);

rmw_gid_t response_subscriber_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_reader_->guid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
response_subscriber_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
common_context->graph_cache.dissociate_reader(
response_subscriber_gid,
common_context->gid,
node->name,
node->namespace_);
common_context->graph_cache.dissociate_writer(
request_publisher_gid,
common_context->gid,
node->name,
node->namespace_);
return nullptr;
}
return nullptr;
}

cleanup_rmw_client.cancel();
Expand Down
43 changes: 20 additions & 23 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/rmw.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_dds_common/qos.hpp"
Expand Down Expand Up @@ -99,37 +101,32 @@ rmw_create_publisher(
if (!publisher) {
return nullptr;
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);

auto info = static_cast<const CustomPublisherInfo *>(publisher->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
auto cleanup_publisher = rcpputils::make_scope_exit(
[participant_info, publisher]() {
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher);
if (RMW_RET_OK != rmw_ret) {
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier, participant_info, publisher))
{
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
});

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomPublisherInfo *>(publisher->data);

// Update graph
if (RMW_RET_OK != common_context->add_publisher_graph(
info->publisher_gid,
node->name, node->namespace_))
{
return nullptr;
}

cleanup_publisher.cancel();
return publisher;
}

Expand Down

0 comments on commit d518e06

Please sign in to comment.