From 9da5cd06e5a3f2d38ba330307f9f4c34e99b1696 Mon Sep 17 00:00:00 2001
From: eboasson
Date: Wed, 17 Jun 2020 21:09:52 +0200
Subject: [PATCH] Lost service responses (#183, #74) (#187)

* Block rmw_send_response if response reader unknown

The client checks using rmw_service_server_is_available whether the
request it sends will be delivered to the service, but that does not
imply that the (independent, as far as DDS is concerned) response
reader of the client has been discovered by the service. Usually that
will be the case, but there is no guarantee.

Ideally DDS would offer an interface that allows checking the reverse
discovery, but that does not yet exist in either the specification or
in Cyclone. This commit works around that by delaying publishing the
response until the number of request writers matches the number of
response readers.

Signed-off-by: Erik Boasson

* Change request headers to use rmw_request_id_t on the wire

Signed-off-by: Erik Boasson

* Precise check for matched client/service

Assign a unique identifier to each client/service on creation, add it
to the USER_DATA QoS of the reader and writer and use it for the
request ids. This allows:

* rmw_service_server_is_available to only return true once it has
  discovered a reader/writer pair of a single service (rather than a
  reader from some service and a writer from some service); and

* rmw_send_response to block until it has discovered the requesting
  client's response reader and to abandon the operation when the
  client has disappeared.

The USER_DATA is formatted in the same manner as the participant
USER_DATA; this uses the keys "serviceid" and "clientid".
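For illustration (values made up), a client's request writer and
response reader then carry USER_DATA of the form

  clientid=1.10.5b.ad.f7.32.a1.e3.0.0.0.1.0.0.0.7;

while a service's endpoints carry a corresponding "serviceid=...;"
entry, so the two halves of one client or service can be paired
precisely during discovery.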
This is still only a workaround for the lack of a mechanism in DDS to
ensure that the response reader has been discovered by the request
writer prior to sending the request.

Signed-off-by: Erik Boasson

* Address review comments

Signed-off-by: Erik Boasson

* Backwards compatibility

* Revert commit fb040c5db6c05be7698f05969f9bb48b8740f0fe to retain the
  old wire representation;

* Embed the publication_handle of the request inside rmw_request_id_t,
  possible because reverting to the old wire representation frees up
  enough space, and use this in rmw_send_response to check for the
  presence of the client's reader;

* Clients and services without a client/service id in the reader/writer
  user data are treated as fully matched at all times.

* Replace ERROR by FAILURE because ERROR clashes with a macro in
  windows.h

Signed-off-by: Erik Boasson

* Time out rmw_send_response after waiting 100ms for discovery

The discovery will eventually result in the client's reader being known
or its writer no longer being known, so a timeout is not necessary for
correctness. However, if it ever were to block for a longish time
(which is possible in the face of network failures), returning a
timeout to the caller is expected to result in less confusion.

Signed-off-by: Erik Boasson

* Make iterators "const auto &"

Signed-off-by: Erik Boasson

* Add TODO for eliminating rmw_send_response blocking

Signed-off-by: Erik Boasson
---
 rmw_cyclonedds_cpp/src/rmw_node.cpp | 315 +++++++++++++++++++++++++---
 1 file changed, 286 insertions(+), 29 deletions(-)

diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp
index 4b39ed7a..b4189493 100644
--- a/rmw_cyclonedds_cpp/src/rmw_node.cpp
+++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp
@@ -16,6 +16,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -75,6 +77,8 @@
 #include "serdata.hpp"
 #include "demangle.hpp"

+using namespace std::literals::chrono_literals;
+
 /* Security must be enabled when compiling and requires cyclone to support QOS property lists */
 #if DDS_HAS_SECURITY && DDS_HAS_PROPERTY_LIST_QOS
 #define RMW_SUPPORT_SECURITY 1
@@ -244,6 +248,7 @@ struct rmw_context_impl_t
   rmw_dds_common::Context common;
   dds_domainid_t domain_id;
   dds_entity_t ppant;
+  rmw_gid_t ppant_gid;

   /* handles for built-in topic readers */
   dds_entity_t rd_participant;
@@ -258,8 +263,12 @@
   size_t node_count{0};
   std::mutex initialization_mutex;

+  /* suffix for GUIDs to construct unique client/service ids
+     (protected by initialization_mutex) */
+  uint32_t client_service_id;
+
   rmw_context_impl_t()
-  : common(), domain_id(UINT32_MAX), ppant(0)
+  : common(), domain_id(UINT32_MAX), ppant(0), client_service_id(0)
   {
     /* destructor relies on these being initialized properly */
     common.thread_is_running.store(false);
@@ -309,10 +318,18 @@ struct CddsSubscription : CddsEntity
 {
   dds_entity_t rdcondh;
 };

+struct client_service_id_t
+{
+  // strangely, the writer_guid in an rmw_request_id_t is smaller than the identifier in
+  // an rmw_gid_t
+  uint8_t data[sizeof((reinterpret_cast<rmw_request_id_t *>(0))->writer_guid)];  // NOLINT
+};
+
 struct CddsCS
 {
   CddsPublisher * pub;
   CddsSubscription * sub;
+  client_service_id_t id;
 };

 struct CddsClient
@@ -490,6 +507,32 @@ static void get_entity_gid(dds_entity_t h, rmw_gid_t & gid)
   convert_guid_to_gid(guid, gid);
 }

+static std::map<std::string, std::vector<uint8_t>> parse_user_data(const dds_qos_t * qos)
+{
+  std::map<std::string, std::vector<uint8_t>> map;
+  void * ud;
+  size_t udsz;
+  if (dds_qget_userdata(qos, &ud, &udsz)) {
+    std::vector<uint8_t> udvec(static_cast<uint8_t *>(ud), static_cast<uint8_t *>(ud) + udsz);
+    dds_free(ud);
+    map = rmw::impl::cpp::parse_key_value(udvec);
+  }
+  return map;
+}
+
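+// Convenience wrapper: look up a single key in the USER_DATA QoS and return
+// its value as a string; false if there is no QoS or the key is absent.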
"enclave", enclave)) { + impl->common.graph_cache.add_participant(gid, enclave); } } dds_return_loan(reader, &raw, 1); @@ -925,6 +959,7 @@ rmw_context_impl_t::init(rmw_init_options_t * options) "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant"); return RMW_RET_ERROR; } + get_entity_gid(this->ppant, this->ppant_gid); /* Create readers for DDS built-in topics for monitoring discovery */ if ((this->rd_participant = @@ -3022,6 +3057,66 @@ extern "C" rmw_ret_t rmw_wait( /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// +using get_matched_endpoints_fn_t = dds_return_t (*)( + dds_entity_t h, + dds_instance_handle_t * xs, size_t nxs); +using BuiltinTopicEndpoint = std::unique_ptr>; + +static rmw_ret_t get_matched_endpoints( + dds_entity_t h, get_matched_endpoints_fn_t fn, std::vector & res) +{ + dds_return_t ret; + if ((ret = fn(h, res.data(), res.size())) < 0) { + return RMW_RET_ERROR; + } + while ((size_t) ret >= res.size()) { + // 128 is a completely arbitrary margin to reduce the risk of having to retry + // when matches are create/deleted in parallel + res.resize((size_t) ret + 128); + if ((ret = fn(h, res.data(), res.size())) < 0) { + return RMW_RET_ERROR; + } + } + res.resize((size_t) ret); + return RMW_RET_OK; +} + +static void free_builtintopic_endpoint(dds_builtintopic_endpoint_t * e) +{ + dds_delete_qos(e->qos); + dds_free(e->topic_name); + dds_free(e->type_name); + dds_free(e); +} + +static BuiltinTopicEndpoint get_matched_subscription_data( + dds_entity_t writer, dds_instance_handle_t readerih) +{ + BuiltinTopicEndpoint ep(dds_get_matched_subscription_data(writer, readerih), + free_builtintopic_endpoint); + return ep; +} + +static BuiltinTopicEndpoint get_matched_publication_data( + dds_entity_t reader, dds_instance_handle_t writerih) +{ + BuiltinTopicEndpoint ep(dds_get_matched_publication_data(reader, writerih), + free_builtintopic_endpoint); + return ep; +} + +static const std::string csid_to_string(const client_service_id_t & id) +{ + std::ostringstream os; + os << std::hex; + os << std::setw(2) << static_cast(id.data[0]); + for (size_t i = 1; i < sizeof(id.data); i++) { + os << "." 
+static const std::string csid_to_string(const client_service_id_t & id)
+{
+  std::ostringstream os;
+  os << std::hex;
+  os << std::setw(2) << static_cast<unsigned>(id.data[0]);
+  for (size_t i = 1; i < sizeof(id.data); i++) {
+    os << "." << static_cast<unsigned>(id.data[i]);
+  }
+  return os.str();
+}
+
 static rmw_ret_t rmw_take_response_request(
   CddsCS * cs, rmw_service_info_t * request_header,
   void * ros_data, bool * taken, dds_time_t * source_timestamp,
@@ -3036,9 +3131,16 @@ static rmw_ret_t rmw_take_response_request(
   void * wrap_ptr = static_cast<void *>(&wrap);
   while (dds_take(cs->sub->enth, &wrap_ptr, &info, 1, 1) == 1) {
     if (info.valid_data) {
-      memset(request_header, 0, sizeof(wrap.header));
-      assert(sizeof(wrap.header.guid) <= sizeof(request_header->request_id.writer_guid));
-      memcpy(request_header->request_id.writer_guid, &wrap.header.guid, sizeof(wrap.header.guid));
+      static_assert(
+        sizeof(request_header->request_id.writer_guid) ==
+        sizeof(wrap.header.guid) + sizeof(info.publication_handle),
+        "request header size assumptions not met");
+      memcpy(
+        static_cast<void *>(request_header->request_id.writer_guid),
+        static_cast<const void *>(&wrap.header.guid), sizeof(wrap.header.guid));
+      memcpy(
+        static_cast<void *>(request_header->request_id.writer_guid + sizeof(wrap.header.guid)),
+        static_cast<const void *>(&info.publication_handle), sizeof(info.publication_handle));
       request_header->request_id.sequence_number = wrap.header.seq;
       request_header->source_timestamp = info.source_timestamp;
       // TODO(iluetkeb) replace with real received timestamp when available in cyclone
@@ -3110,7 +3212,9 @@ extern "C" rmw_ret_t rmw_take_request(
 {
   RET_WRONG_IMPLID(service);
   auto info = static_cast<CddsService *>(service->data);
-  return rmw_take_response_request(&info->service, request_header, ros_request, taken, nullptr, 0);
+  return rmw_take_response_request(
+    &info->service, request_header, ros_request, taken, nullptr,
+    false);
 }

 static rmw_ret_t rmw_send_response_request(
@@ -3126,6 +3230,56 @@
   }
 }

+enum class client_present_t
+{
+  FAILURE,  // an error occurred when checking
+  MAYBE,    // reader not matched, writer still present
+  YES,      // reader matched
+  GONE      // neither reader nor writer
+};
+
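+// True iff the endpoint exists and its USER_DATA carries the given key with
+// exactly the value in "needle" (e.g. key "clientid", needle a client's id).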
+static bool check_client_service_endpoint(
+  const dds_builtintopic_endpoint_t * ep,
+  const std::string key, const std::string needle)
+{
+  if (ep != nullptr) {
+    std::string clientid;
+    get_user_data_key(ep->qos, key, clientid);
+    return clientid == needle;
+  }
+  return false;
+}
+
+static client_present_t check_for_response_reader(
+  const CddsCS & service,
+  const dds_instance_handle_t reqwrih)
+{
+  auto reqwr = get_matched_publication_data(service.sub->enth, reqwrih);
+  std::string clientid;
+  if (reqwr == nullptr) {
+    return client_present_t::GONE;
+  } else if (!get_user_data_key(reqwr->qos, "clientid", clientid)) {
+    // backwards-compatibility: a client without a client id, assume all is well
+    return client_present_t::YES;
+  } else {
+    // look for this client's reader: if we have matched it, all is well;
+    // if not, continue waiting
+    std::vector<dds_instance_handle_t> rds;
+    if (get_matched_endpoints(service.pub->enth, dds_get_matched_subscriptions, rds) < 0) {
+      RMW_SET_ERROR_MSG("rmw_send_response: failed to get reader/writer matches");
+      return client_present_t::FAILURE;
+    }
+    // if we have matched this client's reader, all is well
+    for (const auto & rdih : rds) {
+      auto rd = get_matched_subscription_data(service.pub->enth, rdih);
+      if (check_client_service_endpoint(rd.get(), "clientid", clientid)) {
+        return client_present_t::YES;
+      }
+    }
+    return client_present_t::MAYBE;
+  }
+}
+
 extern "C" rmw_ret_t rmw_send_response(
   const rmw_service_t * service,
   rmw_request_id_t * request_header, void * ros_response)
@@ -3135,9 +3289,43 @@
   RET_NULL(ros_response);
   CddsService * info = static_cast<CddsService *>(service->data);
   cdds_request_header_t header;
-  memcpy(&header.guid, request_header->writer_guid, sizeof(header.guid));
+  dds_instance_handle_t reqwrih;
+  static_assert(
+    sizeof(request_header->writer_guid) == sizeof(header.guid) + sizeof(reqwrih),
+    "request header size assumptions not met");
+  memcpy(
+    static_cast<void *>(&header.guid), static_cast<const void *>(request_header->writer_guid),
+    sizeof(header.guid));
+  memcpy(
+    static_cast<void *>(&reqwrih),
+    static_cast<const void *>(request_header->writer_guid + sizeof(header.guid)), sizeof(reqwrih));
   header.seq = request_header->sequence_number;
-  return rmw_send_response_request(&info->service, header, ros_response);
+  // Block until the response reader has been matched by the response writer (this is a
+  // workaround: rmw_service_server_is_available should keep returning false until this
+  // is a given).
+  // TODO(eboasson): rmw_service_server_is_available should block the request instead (#191)
+  client_present_t st;
+  std::chrono::system_clock::time_point tnow = std::chrono::system_clock::now();
+  std::chrono::system_clock::time_point tend = tnow + 100ms;
+  while ((st =
    check_for_response_reader(
+      info->service,
+      reqwrih)) == client_present_t::MAYBE && tnow < tend)
+  {
+    dds_sleepfor(DDS_MSECS(10));
+    tnow = std::chrono::system_clock::now();
+  }
+  switch (st) {
+    case client_present_t::FAILURE:
+      break;
+    case client_present_t::MAYBE:
+      return RMW_RET_TIMEOUT;
+    case client_present_t::YES:
+      return rmw_send_response_request(&info->service, header, ros_response);
+    case client_present_t::GONE:
+      return RMW_RET_OK;
+  }
+  return RMW_RET_ERROR;
 }

 extern "C" rmw_ret_t rmw_send_request(
@@ -3185,6 +3373,31 @@ static const rosidl_service_type_support_t * get_service_typesupport(
   }
 }

+static void get_unique_csid(const rmw_node_t * node, client_service_id_t & id)
+{
+  auto impl = node->context->impl;
+  static_assert(
+    sizeof(dds_guid_t) <= sizeof(id.data),
+    "client/service id assumed it can hold a DDSI GUID");
+  static_assert(
+    sizeof(dds_guid_t) <= sizeof((reinterpret_cast<rmw_gid_t *>(0))->data),
+    "client/service id assumes rmw_gid_t can hold a DDSI GUID");
+  uint32_t x;
+
+  {
+    std::lock_guard<std::mutex> guard(impl->initialization_mutex);
+    x = ++impl->client_service_id;
+  }
+
+  // construct id by taking the entity prefix (which is just the first 12
+  // bytes of the GID, which itself is just the GUID padded with 0's), then
+  // overwriting the entity id with the big-endian counter value
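+  // (e.g., a GID prefix of 1.10.5b.ad.f7.32.a1.e3.0.0.0.1 with counter value 7
+  // yields the id 1.10.5b.ad.f7.32.a1.e3.0.0.0.1.0.0.0.7; values illustrative)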
"serviceid=" : "clientid=") + csid_to_string( + cs->id) + std::string(";"); + dds_qset_userdata(qos, user_data.c_str(), user_data.size()); + } if ((pub->enth = dds_create_writer(node->context->impl->dds_pub, pubtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; @@ -3599,6 +3822,49 @@ static rmw_ret_t get_topic_name(dds_entity_t endpoint_handle, std::string & name } while (true); } +static rmw_ret_t check_for_service_reader_writer(const CddsCS & client, bool * is_available) +{ + std::vector rds, wrs; + assert(is_available != nullptr && !*is_available); + if (get_matched_endpoints(client.pub->enth, dds_get_matched_subscriptions, rds) < 0 || + get_matched_endpoints(client.sub->enth, dds_get_matched_publications, wrs) < 0) + { + RMW_SET_ERROR_MSG("rmw_service_server_is_available: failed to get reader/writer matches"); + return RMW_RET_ERROR; + } + // first extract all service ids from matched readers + std::set needles; + for (const auto & rdih : rds) { + auto rd = get_matched_subscription_data(client.pub->enth, rdih); + std::string serviceid; + if (rd && get_user_data_key(rd->qos, "serviceid", serviceid)) { + needles.insert(serviceid); + } + } + if (needles.empty()) { + // if no services advertising a serviceid have been matched, but there + // are matched request readers and response writers, then we fall back + // to the old method of simply requiring the existence of matches. + *is_available = !rds.empty() && !wrs.empty(); + } else { + // scan the writers to see if there is at least one response writer + // matching a discovered request reader + for (const auto & wrih : wrs) { + auto wr = get_matched_publication_data(client.sub->enth, wrih); + std::string serviceid; + if (wr && + get_user_data_key( + wr->qos, "serviceid", + serviceid) && needles.find(serviceid) != needles.end()) + { + *is_available = true; + break; + } + } + } + return RMW_RET_OK; +} + extern "C" rmw_ret_t rmw_service_server_is_available( const rmw_node_t * node, const rmw_client_t * client, @@ -3632,16 +3898,7 @@ extern "C" rmw_ret_t rmw_service_server_is_available( if (ret != RMW_RET_OK || 0 == number_of_response_publishers) { return ret; } - dds_publication_matched_status_t ps; - dds_subscription_matched_status_t cs; - if (dds_get_publication_matched_status(info->client.pub->enth, &ps) < 0 || - dds_get_subscription_matched_status(info->client.sub->enth, &cs) < 0) - { - RMW_SET_ERROR_MSG("rmw_service_server_is_available: get_..._matched_status failed"); - return RMW_RET_ERROR; - } - *is_available = ps.current_count > 0 && cs.current_count > 0; - return RMW_RET_OK; + return check_for_service_reader_writer(info->client, is_available); } extern "C" rmw_ret_t rmw_count_publishers(