diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 9667d94b..0594465c 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -1450,6 +1450,12 @@ rmw_context_impl_s::init(rmw_init_options_t * options, size_t domain_id) this->clean_up(); return RMW_RET_ERROR; } + this->common.publish_callback = [](const rmw_publisher_t * pub, const void * msg) { + return rmw_publish( + pub, + msg, + nullptr); + }; rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options(); subscription_options.ignore_local_publications = true; @@ -1512,6 +1518,9 @@ rmw_context_impl_t::clean_up() destroy_publisher(common.pub); common.pub = nullptr; } + if (common.publish_callback) { + common.publish_callback = nullptr; + } if (common.sub) { destroy_subscription(common.sub); common.sub = nullptr; @@ -1757,26 +1766,11 @@ extern "C" rmw_node_t * rmw_create_node( RET_ALLOC_X(node->namespace_, return nullptr); memcpy(const_cast(node->namespace_), namespace_, strlen(namespace_) + 1); - { - // Though graph_cache methods are thread safe, both cache update and publishing have to also - // be atomic. - // If not, the following race condition is possible: - // node1-update-get-message / node2-update-get-message / node2-publish / node1-publish - // In that case, the last message published is not accurate. - auto common = &context->impl->common; - std::lock_guard guard(common->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg = - common->graph_cache.add_node(common->gid, name, namespace_); - if (RMW_RET_OK != rmw_publish( - common->pub, - static_cast(&participant_msg), - nullptr)) - { - // If publishing the message failed, we don't have to publish an update - // after removing it from the graph cache */ - static_cast(common->graph_cache.remove_node(common->gid, name, namespace_)); - return nullptr; - } + auto common = &context->impl->common; + rmw_ret_t rmw_ret = common->add_node_graph( + name, namespace_); + if (RMW_RET_OK != rmw_ret) { + return nullptr; } cleanup_node.cancel(); @@ -1798,19 +1792,9 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node) return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); auto node_impl = static_cast(node->data); - { - // Though graph_cache methods are thread safe, both cache update and publishing have to also - // be atomic. - // If not, the following race condition is possible: - // node1-update-get-message / node2-update-get-message / node2-publish / node1-publish - // In that case, the last message published is not accurate. - auto common = &node->context->impl->common; - std::lock_guard guard(common->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo participant_msg = - common->graph_cache.remove_node(common->gid, node->name, node->namespace_); - result_ret = rmw_publish( - common->pub, static_cast(&participant_msg), nullptr); - } + auto common = &node->context->impl->common; + result_ret = common->remove_node_graph( + node->name, node->namespace_); rmw_context_t * context = node->context; rmw_free(const_cast(node->name)); @@ -2664,15 +2648,11 @@ extern "C" rmw_publisher_t * rmw_create_publisher( // Update graph auto common = &node->context->impl->common; const auto cddspub = static_cast(pub->data); + if (RMW_RET_OK != common->add_publisher_graph( + cddspub->gid, + node->name, node->namespace_)) { - std::lock_guard guard(common->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.associate_writer(cddspub->gid, common->gid, node->name, node->namespace_); - if (RMW_RET_OK != rmw_publish(common->pub, static_cast(&msg), nullptr)) { - static_cast(common->graph_cache.dissociate_writer( - cddspub->gid, common->gid, node->name, node->namespace_)); - return nullptr; - } + return nullptr; } cleanup_publisher.cancel(); @@ -2957,21 +2937,15 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * rmw_ret_t ret = RMW_RET_OK; rmw_error_state_t error_state; - { - auto common = &node->context->impl->common; - const auto cddspub = static_cast(publisher->data); - std::lock_guard guard(common->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.dissociate_writer( - cddspub->gid, common->gid, node->name, - node->namespace_); - rmw_ret_t publish_ret = - rmw_publish(common->pub, static_cast(&msg), nullptr); - if (RMW_RET_OK != publish_ret) { - error_state = *rmw_get_error_state(); - ret = publish_ret; - rmw_reset_error(); - } + auto common = &node->context->impl->common; + const auto cddspub = static_cast(publisher->data); + rmw_ret_t publish_ret = common->remove_publisher_graph( + cddspub->gid, + node->name, node->namespace_); + if (RMW_RET_OK != publish_ret) { + error_state = *rmw_get_error_state(); + ret = publish_ret; + rmw_reset_error(); } rmw_ret_t inner_ret = destroy_publisher(publisher); @@ -3204,16 +3178,10 @@ extern "C" rmw_subscription_t * rmw_create_subscription( // Update graph auto common = &node->context->impl->common; const auto cddssub = static_cast(sub->data); - std::lock_guard guard(common->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.associate_reader(cddssub->gid, common->gid, node->name, node->namespace_); - if (RMW_RET_OK != rmw_publish( - common->pub, - static_cast(&msg), - nullptr)) + if (RMW_RET_OK != common->add_subscriber_graph( + cddssub->gid, + node->name, node->namespace_)) { - static_cast(common->graph_cache.dissociate_reader( - cddssub->gid, common->gid, node->name, node->namespace_)); return nullptr; } @@ -3327,20 +3295,15 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio rmw_ret_t ret = RMW_RET_OK; rmw_error_state_t error_state; rmw_error_string_t error_string; - { - auto common = &node->context->impl->common; - const auto cddssub = static_cast(subscription->data); - std::lock_guard guard(common->node_update_mutex); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.dissociate_reader( - cddssub->gid, common->gid, node->name, - node->namespace_); - ret = rmw_publish(common->pub, static_cast(&msg), nullptr); - if (RMW_RET_OK != ret) { - error_state = *rmw_get_error_state(); - error_string = rmw_get_error_string(); - rmw_reset_error(); - } + auto common = &node->context->impl->common; + const auto cddssub = static_cast(subscription->data); + ret = common->remove_publisher_graph( + cddssub->gid, + node->name, node->namespace_); + if (RMW_RET_OK != ret) { + error_state = *rmw_get_error_state(); + error_string = rmw_get_error_string(); + rmw_reset_error(); } rmw_ret_t local_ret = destroy_subscription(subscription); @@ -5097,50 +5060,6 @@ static void rmw_fini_cs(CddsCS * cs) dds_delete(cs->pub->enth); } -static rmw_ret_t destroy_client(const rmw_node_t * node, rmw_client_t * client) -{ - RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - node, - node->implementation_identifier, - eclipse_cyclonedds_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - client, - client->implementation_identifier, - eclipse_cyclonedds_identifier, - return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - auto info = static_cast(client->data); - clean_waitset_caches(); - - { - // Update graph - auto common = &node->context->impl->common; - std::lock_guard guard(common->node_update_mutex); - static_cast(common->graph_cache.dissociate_writer( - info->client.pub->gid, common->gid, - node->name, node->namespace_)); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.dissociate_reader( - info->client.sub->gid, common->gid, node->name, - node->namespace_); - if (RMW_RET_OK != rmw_publish( - common->pub, - static_cast(&msg), - nullptr)) - { - RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying service"); - } - } - - rmw_fini_cs(&info->client); - delete info; - rmw_free(const_cast(client->service_name)); - rmw_client_free(client); - return RMW_RET_OK; -} - extern "C" rmw_client_t * rmw_create_client( const rmw_node_t * node, const rosidl_service_type_support_t * type_supports, @@ -5149,6 +5068,11 @@ extern "C" rmw_client_t * rmw_create_client( { RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr); CddsClient * info = new CddsClient(); + auto cleanup_info = rcpputils::make_scope_exit( + [info]() { + delete (info); + }); + #if REPORT_BLOCKED_REQUESTS info->lastcheck = 0; #endif @@ -5159,53 +5083,52 @@ extern "C" rmw_client_t * rmw_create_client( &info->client, &info->user_callback_data, node, type_supports, service_name, &adapted_qos_policies, false) != RMW_RET_OK) { - delete (info); return nullptr; } + auto cleanup_fini_cs = rcpputils::make_scope_exit( + [info]() { + rmw_fini_cs(&info->client); + }); + rmw_client_t * rmw_client = rmw_client_allocate(); - RET_NULL_X(rmw_client, goto fail_client); + if (!rmw_client) { + return nullptr; + } + auto cleanup_client = rcpputils::make_scope_exit( + [rmw_client]() { + rmw_client_free(rmw_client); + }); + + auto common = &node->context->impl->common; rmw_client->implementation_identifier = eclipse_cyclonedds_identifier; rmw_client->data = info; rmw_client->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); - RET_NULL_X(rmw_client->service_name, goto fail_service_name); + if (!rmw_client->service_name) { + return nullptr; + } + auto cleanup_service_name = rcpputils::make_scope_exit( + [rmw_client]() { + rmw_free(const_cast(rmw_client->service_name)); + }); memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); + // Update graph + if (RMW_RET_OK != common->add_client_graph( + info->client.pub->gid, + info->client.sub->gid, + node->name, node->namespace_)) { - // Update graph - auto common = &node->context->impl->common; - std::lock_guard guard(common->node_update_mutex); - static_cast(common->graph_cache.associate_writer( - info->client.pub->gid, common->gid, - node->name, node->namespace_)); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.associate_reader( - info->client.sub->gid, common->gid, node->name, - node->namespace_); - if (RMW_RET_OK != rmw_publish( - common->pub, - static_cast(&msg), - nullptr)) - { - static_cast(destroy_client(node, rmw_client)); - return nullptr; - } + return nullptr; } + cleanup_service_name.cancel(); + cleanup_client.cancel(); + cleanup_fini_cs.cancel(); + cleanup_info.cancel(); return rmw_client; -fail_service_name: - rmw_client_free(rmw_client); -fail_client: - rmw_fini_cs(&info->client); - delete info; - return nullptr; } extern "C" rmw_ret_t rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) -{ - return destroy_client(node, client); -} - -static rmw_ret_t destroy_service(const rmw_node_t * node, rmw_service_t * service) { RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( @@ -5213,39 +5136,29 @@ static rmw_ret_t destroy_service(const rmw_node_t * node, rmw_service_t * servic node->implementation_identifier, eclipse_cyclonedds_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_TYPE_IDENTIFIERS_MATCH( - service, - service->implementation_identifier, + client, + client->implementation_identifier, eclipse_cyclonedds_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); - auto info = static_cast(service->data); + auto info = static_cast(client->data); clean_waitset_caches(); + // Update graph + auto common = &node->context->impl->common; + if (RMW_RET_OK != common->remove_client_graph( + info->client.pub->gid, + info->client.sub->gid, + node->name, node->namespace_)) { - // Update graph - auto common = &node->context->impl->common; - std::lock_guard guard(common->node_update_mutex); - static_cast(common->graph_cache.dissociate_writer( - info->service.pub->gid, common->gid, - node->name, node->namespace_)); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.dissociate_reader( - info->service.sub->gid, common->gid, node->name, - node->namespace_); - if (RMW_RET_OK != rmw_publish( - common->pub, - static_cast(&msg), - nullptr)) - { - RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying service"); - } + RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying client"); } - rmw_fini_cs(&info->service); + rmw_fini_cs(&info->client); delete info; - rmw_free(const_cast(service->service_name)); - rmw_service_free(service); + rmw_free(const_cast(client->service_name)); + rmw_client_free(client); return RMW_RET_OK; } @@ -5257,6 +5170,10 @@ extern "C" rmw_service_t * rmw_create_service( { RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr); CddsService * info = new CddsService(); + auto cleanup_info = rcpputils::make_scope_exit( + [info]() { + delete (info); + }); rmw_qos_profile_t adapted_qos_policies = rmw_dds_common::qos_profile_update_best_available_for_services(*qos_policies); if ( @@ -5264,51 +5181,82 @@ extern "C" rmw_service_t * rmw_create_service( &info->service, &info->user_callback_data, node, type_supports, service_name, &adapted_qos_policies, true) != RMW_RET_OK) { - delete (info); return nullptr; } + auto cleanup_fini_cs = rcpputils::make_scope_exit( + [info]() { + rmw_fini_cs(&info->service); + }); rmw_service_t * rmw_service = rmw_service_allocate(); - RET_NULL_X(rmw_service, goto fail_service); + if (!rmw_service) { + return nullptr; + } + auto cleanup_service = rcpputils::make_scope_exit( + [rmw_service]() { + rmw_service_free(rmw_service); + }); + auto common = &node->context->impl->common; rmw_service->implementation_identifier = eclipse_cyclonedds_identifier; rmw_service->data = info; rmw_service->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); - RET_NULL_X(rmw_service->service_name, goto fail_service_name); + if (!rmw_service->service_name) { + return nullptr; + } + auto cleanup_service_name = rcpputils::make_scope_exit( + [rmw_service]() { + rmw_free(const_cast(rmw_service->service_name)); + }); memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); + // Update graph + if (RMW_RET_OK != common->add_service_graph( + info->service.sub->gid, + info->service.pub->gid, + node->name, node->namespace_)) { - // Update graph - auto common = &node->context->impl->common; - std::lock_guard guard(common->node_update_mutex); - static_cast(common->graph_cache.associate_writer( - info->service.pub->gid, common->gid, - node->name, node->namespace_)); - rmw_dds_common::msg::ParticipantEntitiesInfo msg = - common->graph_cache.associate_reader( - info->service.sub->gid, common->gid, node->name, - node->namespace_); - if (RMW_RET_OK != rmw_publish( - common->pub, - static_cast(&msg), - nullptr)) - { - static_cast(destroy_service(node, rmw_service)); - return nullptr; - } + return nullptr; } + cleanup_service_name.cancel(); + cleanup_service.cancel(); + cleanup_fini_cs.cancel(); + cleanup_info.cancel(); return rmw_service; -fail_service_name: - rmw_service_free(rmw_service); -fail_service: - rmw_fini_cs(&info->service); - delete info; - return nullptr; } extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) { - return destroy_service(node, service); + RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + node, + node->implementation_identifier, + eclipse_cyclonedds_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + eclipse_cyclonedds_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + auto info = static_cast(service->data); + clean_waitset_caches(); + + // Update graph + auto common = &node->context->impl->common; + if (RMW_RET_OK != common->remove_service_graph( + info->service.sub->gid, + info->service.pub->gid, + node->name, node->namespace_)) + { + RMW_SET_ERROR_MSG("failed to publish ParticipantEntitiesInfo when destroying service"); + } + + rmw_fini_cs(&info->service); + delete info; + rmw_free(const_cast(service->service_name)); + rmw_service_free(service); + return RMW_RET_OK; } /////////////////////////////////////////////////////////////////////////////////////////