avoid using dds common public mutex directly #474

Merged

Changes from 1 commit
280 changes: 135 additions & 145 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
@@ -1757,26 +1757,22 @@ extern "C" rmw_node_t * rmw_create_node(
RET_ALLOC_X(node->namespace_, return nullptr);
memcpy(const_cast<char *>(node->namespace_), namespace_, strlen(namespace_) + 1);

{
// Though graph_cache methods are thread safe, both cache update and publishing have to also
// be atomic.
// If not, the following race condition is possible:
// node1-update-get-message / node2-update-get-message / node2-publish / node1-publish
// In that case, the last message published is not accurate.
auto common = &context->impl->common;
std::lock_guard<std::mutex> guard(common->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg =
common->graph_cache.add_node(common->gid, name, namespace_);
if (RMW_RET_OK != rmw_publish(
common->pub,
static_cast<void *>(&participant_msg),
nullptr))
{
// If publishing the message failed, we don't have to publish an update
// after removing it from the graph cache */
static_cast<void>(common->graph_cache.remove_node(common->gid, name, namespace_));
return nullptr;
}
// Though graph_cache methods are thread safe, both cache update and publishing have to also
// be atomic.
// If not, the following race condition is possible:
// node1-update-get-message / node2-update-get-message / node2-publish / node1-publish
// In that case, the last message published is not accurate.
auto common = &context->impl->common;
rmw_ret_t rmw_ret = common->update_node_graph(
name, namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
});
if (RMW_RET_OK != rmw_ret) {
return nullptr;
}

cleanup_node.cancel();
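
The point of this hunk is that rmw_create_node no longer touches node_update_mutex: the lock, the graph_cache.add_node call, the publish, and the rollback on failure all move behind a single update_node_graph call on the common context, with rmw_publish passed in as a callback because rmw_dds_common cannot depend on any particular implementation's publish function. A minimal sketch of what such a helper could look like, assuming the common context is rmw_dds_common::Context with the members the old code used (gid, pub, graph_cache, node_update_mutex), is shown below; the name update_node_graph_sketch, the PublishCallback alias, and the exact signature are illustrative, not the actual rmw_dds_common API. The destroy_node_graph call in the next hunk would presumably mirror this with graph_cache.remove_node and no rollback.

```cpp
#include <functional>
#include <mutex>

#include "rmw/rmw.h"
#include "rmw_dds_common/context.hpp"
#include "rmw_dds_common/msg/participant_entities_info.hpp"

// Assumed callback shape, matching the lambdas used throughout this diff.
using PublishCallback = std::function<rmw_ret_t (rmw_publisher_t *, void *)>;

// Illustrative helper: the locking and the cache/publish atomicity live here,
// so rmw_create_node never takes node_update_mutex directly.
rmw_ret_t update_node_graph_sketch(
  rmw_dds_common::Context & common,
  const char * name, const char * namespace_,
  const PublishCallback & publish_callback)
{
  std::lock_guard<std::mutex> guard(common.node_update_mutex);
  rmw_dds_common::msg::ParticipantEntitiesInfo msg =
    common.graph_cache.add_node(common.gid, name, namespace_);
  rmw_ret_t ret = publish_callback(common.pub, static_cast<void *>(&msg));
  if (RMW_RET_OK != ret) {
    // Publishing failed: roll the cache update back so the local graph state
    // stays consistent with what was actually announced on the wire.
    static_cast<void>(common.graph_cache.remove_node(common.gid, name, namespace_));
  }
  return ret;
}
```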
@@ -1798,19 +1794,20 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
auto node_impl = static_cast<CddsNode *>(node->data);

{
// Though graph_cache methods are thread safe, both cache update and publishing have to also
// be atomic.
// If not, the following race condition is possible:
// node1-update-get-message / node2-update-get-message / node2-publish / node1-publish
// In that case, the last message published is not accurate.
auto common = &node->context->impl->common;
std::lock_guard<std::mutex> guard(common->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg =
common->graph_cache.remove_node(common->gid, node->name, node->namespace_);
result_ret = rmw_publish(
common->pub, static_cast<void *>(&participant_msg), nullptr);
}
// Though graph_cache methods are thread safe, both cache update and publishing have to also
// be atomic.
// If not, the following race condition is possible:
// node1-update-get-message / node2-update-get-message / node2-publish / node1-publish
// In that case, the last message published is not accurate.
auto common = &node->context->impl->common;
result_ret = common->destroy_node_graph(
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
});

rmw_context_t * context = node->context;
rmw_free(const_cast<char *>(node->name));
@@ -2664,15 +2661,17 @@ extern "C" rmw_publisher_t * rmw_create_publisher(
// Update graph
auto common = &node->context->impl->common;
const auto cddspub = static_cast<const CddsPublisher *>(pub->data);
if (RMW_RET_OK != common->update_publisher_graph(
cddspub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
}))
{
std::lock_guard<std::mutex> guard(common->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.associate_writer(cddspub->gid, common->gid, node->name, node->namespace_);
if (RMW_RET_OK != rmw_publish(common->pub, static_cast<void *>(&msg), nullptr)) {
static_cast<void>(common->graph_cache.dissociate_writer(
cddspub->gid, common->gid, node->name, node->namespace_));
return nullptr;
}
return nullptr;
}

cleanup_publisher.cancel();
@@ -2957,21 +2956,21 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t *

rmw_ret_t ret = RMW_RET_OK;
rmw_error_state_t error_state;
{
auto common = &node->context->impl->common;
const auto cddspub = static_cast<const CddsPublisher *>(publisher->data);
std::lock_guard<std::mutex> guard(common->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.dissociate_writer(
cddspub->gid, common->gid, node->name,
node->namespace_);
rmw_ret_t publish_ret =
rmw_publish(common->pub, static_cast<void *>(&msg), nullptr);
if (RMW_RET_OK != publish_ret) {
error_state = *rmw_get_error_state();
ret = publish_ret;
rmw_reset_error();
}
auto common = &node->context->impl->common;
const auto cddspub = static_cast<const CddsPublisher *>(publisher->data);
rmw_ret_t publish_ret = common->destroy_publisher_graph(
cddspub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
});
if (RMW_RET_OK != publish_ret) {
error_state = *rmw_get_error_state();
ret = publish_ret;
rmw_reset_error();
}

rmw_ret_t inner_ret = destroy_publisher(publisher);
@@ -3204,16 +3203,16 @@ extern "C" rmw_subscription_t * rmw_create_subscription(
// Update graph
auto common = &node->context->impl->common;
const auto cddssub = static_cast<const CddsSubscription *>(sub->data);
std::lock_guard<std::mutex> guard(common->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.associate_reader(cddssub->gid, common->gid, node->name, node->namespace_);
if (RMW_RET_OK != rmw_publish(
common->pub,
static_cast<void *>(&msg),
nullptr))
if (RMW_RET_OK != common->update_subscriber_graph(
cddssub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
}))
{
static_cast<void>(common->graph_cache.dissociate_reader(
cddssub->gid, common->gid, node->name, node->namespace_));
return nullptr;
}

@@ -3327,20 +3326,21 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio
rmw_ret_t ret = RMW_RET_OK;
rmw_error_state_t error_state;
rmw_error_string_t error_string;
{
auto common = &node->context->impl->common;
const auto cddssub = static_cast<const CddsSubscription *>(subscription->data);
std::lock_guard<std::mutex> guard(common->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.dissociate_reader(
cddssub->gid, common->gid, node->name,
node->namespace_);
ret = rmw_publish(common->pub, static_cast<void *>(&msg), nullptr);
if (RMW_RET_OK != ret) {
error_state = *rmw_get_error_state();
error_string = rmw_get_error_string();
rmw_reset_error();
}
auto common = &node->context->impl->common;
const auto cddssub = static_cast<const CddsSubscription *>(subscription->data);
ret = common->destroy_publisher_graph(
cddssub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
});
if (RMW_RET_OK != ret) {
error_state = *rmw_get_error_state();
error_string = rmw_get_error_string();
rmw_reset_error();
}

rmw_ret_t local_ret = destroy_subscription(subscription);
@@ -5097,7 +5097,8 @@ static void rmw_fini_cs(CddsCS * cs)
dds_delete(cs->pub->enth);
}

static rmw_ret_t destroy_client(const rmw_node_t * node, rmw_client_t * client)
static rmw_ret_t destroy_client(
const rmw_node_t * node, rmw_client_t * client, bool destroy_graph = true)
Contributor:
Can you please describe why we need the destroy_graph boolean here? It isn't totally clear to me.

Author:
After using make_scope_exit in rmw_create_client, I removed destroy_client and moved its original implementation into rmw_destroy_client. Since destroy_client is only called once, the destroy_graph boolean flag is no longer needed.
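
For readers unfamiliar with the pattern mentioned above: a scope-exit guard registers the full cleanup right after the resource is created, every early return on a failure path lets the guard run, and only the success path cancels it, which is why a destroy_graph flag becomes unnecessary once creation failures no longer go through destroy_client. Below is a minimal, self-contained sketch of such a guard with an illustrative usage; it is not the PR's actual make_scope_exit implementation.

```cpp
#include <utility>

// Minimal scope-exit guard: runs the stored callable on destruction
// unless cancel() was called first.
template<typename Callable>
class ScopeExit
{
public:
  explicit ScopeExit(Callable callable)
  : callable_(std::move(callable)) {}

  ~ScopeExit()
  {
    if (armed_) {
      callable_();
    }
  }

  void cancel() {armed_ = false;}

private:
  Callable callable_;
  bool armed_{true};
};

template<typename Callable>
ScopeExit<Callable> make_scope_exit(Callable callable)
{
  return ScopeExit<Callable>(std::move(callable));
}

// Illustrative usage inside a create function (names are assumptions):
//
//   rmw_client_t * rmw_client = rmw_client_allocate();
//   auto cleanup_client = make_scope_exit(
//     [&]() {
//       // full teardown, including the graph update, lives here
//     });
//   if (RMW_RET_OK != common->update_client_graph(/* ... */)) {
//     return nullptr;  // cleanup_client fires on this early return
//   }
//   cleanup_client.cancel();  // success: keep the client, skip the cleanup
//   return rmw_client;
```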

{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
@@ -5114,23 +5115,21 @@ static rmw_ret_t destroy_client(const rmw_node_t * node, rmw_client_t * client)
auto info = static_cast<CddsClient *>(client->data);
clean_waitset_caches();

{
// Update graph
// Update graph
if (destroy_graph) {
auto common = &node->context->impl->common;
std::lock_guard<std::mutex> guard(common->node_update_mutex);
static_cast<void>(common->graph_cache.dissociate_writer(
info->client.pub->gid, common->gid,
node->name, node->namespace_));
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.dissociate_reader(
info->client.sub->gid, common->gid, node->name,
node->namespace_);
if (RMW_RET_OK != rmw_publish(
common->pub,
static_cast<void *>(&msg),
nullptr))
if (RMW_RET_OK != common->destroy_client_graph(
info->client.pub->gid,
info->client.sub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
}))
{
RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying service");
RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying client");
}
}

@@ -5163,32 +5162,28 @@ extern "C" rmw_client_t * rmw_create_client(
return nullptr;
}
rmw_client_t * rmw_client = rmw_client_allocate();
auto common = &node->context->impl->common;
RET_NULL_X(rmw_client, goto fail_client);
rmw_client->implementation_identifier = eclipse_cyclonedds_identifier;
rmw_client->data = info;
rmw_client->service_name = reinterpret_cast<const char *>(rmw_allocate(strlen(service_name) + 1));
RET_NULL_X(rmw_client->service_name, goto fail_service_name);
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

// Update graph
if (RMW_RET_OK != common->update_client_graph(
info->client.pub->gid,
info->client.sub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
}))
{
// Update graph
auto common = &node->context->impl->common;
std::lock_guard<std::mutex> guard(common->node_update_mutex);
static_cast<void>(common->graph_cache.associate_writer(
info->client.pub->gid, common->gid,
node->name, node->namespace_));
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.associate_reader(
info->client.sub->gid, common->gid, node->name,
node->namespace_);
if (RMW_RET_OK != rmw_publish(
common->pub,
static_cast<void *>(&msg),
nullptr))
{
static_cast<void>(destroy_client(node, rmw_client));
return nullptr;
}
static_cast<void>(destroy_client(node, rmw_client, /* destroy_graph = */ false));
return nullptr;
}

return rmw_client;
@@ -5205,7 +5200,8 @@ extern "C" rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client
return destroy_client(node, client);
}

static rmw_ret_t destroy_service(const rmw_node_t * node, rmw_service_t * service)
static rmw_ret_t destroy_service(
const rmw_node_t * node, rmw_service_t * service, bool destroy_graph = true)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
@@ -5222,21 +5218,19 @@ static rmw_ret_t destroy_service(const rmw_node_t * node, rmw_service_t * servic
auto info = static_cast<CddsService *>(service->data);
clean_waitset_caches();

{
if (destroy_graph) {
// Update graph
auto common = &node->context->impl->common;
std::lock_guard<std::mutex> guard(common->node_update_mutex);
static_cast<void>(common->graph_cache.dissociate_writer(
info->service.pub->gid, common->gid,
node->name, node->namespace_));
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.dissociate_reader(
info->service.sub->gid, common->gid, node->name,
node->namespace_);
if (RMW_RET_OK != rmw_publish(
common->pub,
static_cast<void *>(&msg),
nullptr))
if (RMW_RET_OK != common->destroy_service_graph(
info->service.sub->gid,
info->service.pub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
}))
{
RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying service");
}
@@ -5268,6 +5262,7 @@ extern "C" rmw_service_t * rmw_create_service(
return nullptr;
}
rmw_service_t * rmw_service = rmw_service_allocate();
auto common = &node->context->impl->common;
RET_NULL_X(rmw_service, goto fail_service);
rmw_service->implementation_identifier = eclipse_cyclonedds_identifier;
rmw_service->data = info;
@@ -5276,25 +5271,20 @@
RET_NULL_X(rmw_service->service_name, goto fail_service_name);
memcpy(const_cast<char *>(rmw_service->service_name), service_name, strlen(service_name) + 1);

// Update graph
if (RMW_RET_OK != common->update_service_graph(
info->service.sub->gid,
info->service.pub->gid,
node->name, node->namespace_,
[](rmw_publisher_t * pub, void * msg) {
return rmw_publish(
pub,
msg,
nullptr);
}))
{
// Update graph
auto common = &node->context->impl->common;
std::lock_guard<std::mutex> guard(common->node_update_mutex);
static_cast<void>(common->graph_cache.associate_writer(
info->service.pub->gid, common->gid,
node->name, node->namespace_));
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.associate_reader(
info->service.sub->gid, common->gid, node->name,
node->namespace_);
if (RMW_RET_OK != rmw_publish(
common->pub,
static_cast<void *>(&msg),
nullptr))
{
static_cast<void>(destroy_service(node, rmw_service));
return nullptr;
}
static_cast<void>(destroy_service(node, rmw_service, /* destroy_graph = */ false));
return nullptr;
}

return rmw_service;