diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 4b39ed7a..b4189493 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include #include @@ -75,6 +77,8 @@ #include "serdata.hpp" #include "demangle.hpp" +using namespace std::literals::chrono_literals; + /* Security must be enabled when compiling and requires cyclone to support QOS property lists */ #if DDS_HAS_SECURITY && DDS_HAS_PROPERTY_LIST_QOS #define RMW_SUPPORT_SECURITY 1 @@ -244,6 +248,7 @@ struct rmw_context_impl_t rmw_dds_common::Context common; dds_domainid_t domain_id; dds_entity_t ppant; + rmw_gid_t ppant_gid; /* handles for built-in topic readers */ dds_entity_t rd_participant; @@ -258,8 +263,12 @@ struct rmw_context_impl_t size_t node_count{0}; std::mutex initialization_mutex; + /* suffix for GUIDs to construct unique client/service ids + (protected by initialization_mutex) */ + uint32_t client_service_id; + rmw_context_impl_t() - : common(), domain_id(UINT32_MAX), ppant(0) + : common(), domain_id(UINT32_MAX), ppant(0), client_service_id(0) { /* destructor relies on these being initialized properly */ common.thread_is_running.store(false); @@ -309,10 +318,18 @@ struct CddsSubscription : CddsEntity dds_entity_t rdcondh; }; +struct client_service_id_t +{ + // strangely, the writer_guid in an rmw_request_id_t is smaller than the identifier in + // an rmw_gid_t + uint8_t data[sizeof((reinterpret_cast(0))->writer_guid)]; // NOLINT +}; + struct CddsCS { CddsPublisher * pub; CddsSubscription * sub; + client_service_id_t id; }; struct CddsClient @@ -490,6 +507,32 @@ static void get_entity_gid(dds_entity_t h, rmw_gid_t & gid) convert_guid_to_gid(guid, gid); } +static std::map> parse_user_data(const dds_qos_t * qos) +{ + std::map> map; + void * ud; + size_t udsz; + if (dds_qget_userdata(qos, &ud, &udsz)) { + std::vector udvec(static_cast(ud), static_cast(ud) + udsz); + dds_free(ud); + map = rmw::impl::cpp::parse_key_value(udvec); + } + return map; +} + +static bool get_user_data_key(const dds_qos_t * qos, const std::string key, std::string & value) +{ + if (qos != nullptr) { + auto map = parse_user_data(qos); + auto name_found = map.find(key); + if (name_found != map.end()) { + value = std::string(name_found->second.begin(), name_found->second.end()); + return true; + } + } + return false; +} + static void handle_ParticipantEntitiesInfo(dds_entity_t reader, void * arg) { static_cast(reader); @@ -516,18 +559,9 @@ static void handle_DCPSParticipant(dds_entity_t reader, void * arg) } else if (si.instance_state != DDS_ALIVE_INSTANCE_STATE) { impl->common.graph_cache.remove_participant(gid); } else if (si.valid_data) { - void * ud; - size_t udsz; - if (dds_qget_userdata(s->qos, &ud, &udsz)) { - std::vector udvec(static_cast(ud), static_cast(ud) + udsz); - dds_free(ud); - auto map = rmw::impl::cpp::parse_key_value(udvec); - auto name_found = map.find("enclave"); - if (name_found != map.end()) { - auto enclave = - std::string(name_found->second.begin(), name_found->second.end()); - impl->common.graph_cache.add_participant(gid, enclave); - } + std::string enclave; + if (get_user_data_key(s->qos, "enclave", enclave)) { + impl->common.graph_cache.add_participant(gid, enclave); } } dds_return_loan(reader, &raw, 1); @@ -925,6 +959,7 @@ rmw_context_impl_t::init(rmw_init_options_t * options) "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant"); return RMW_RET_ERROR; } + get_entity_gid(this->ppant, this->ppant_gid); /* Create readers for DDS built-in topics for monitoring discovery */ if ((this->rd_participant = @@ -3022,6 +3057,66 @@ extern "C" rmw_ret_t rmw_wait( /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// +using get_matched_endpoints_fn_t = dds_return_t (*)( + dds_entity_t h, + dds_instance_handle_t * xs, size_t nxs); +using BuiltinTopicEndpoint = std::unique_ptr>; + +static rmw_ret_t get_matched_endpoints( + dds_entity_t h, get_matched_endpoints_fn_t fn, std::vector & res) +{ + dds_return_t ret; + if ((ret = fn(h, res.data(), res.size())) < 0) { + return RMW_RET_ERROR; + } + while ((size_t) ret >= res.size()) { + // 128 is a completely arbitrary margin to reduce the risk of having to retry + // when matches are create/deleted in parallel + res.resize((size_t) ret + 128); + if ((ret = fn(h, res.data(), res.size())) < 0) { + return RMW_RET_ERROR; + } + } + res.resize((size_t) ret); + return RMW_RET_OK; +} + +static void free_builtintopic_endpoint(dds_builtintopic_endpoint_t * e) +{ + dds_delete_qos(e->qos); + dds_free(e->topic_name); + dds_free(e->type_name); + dds_free(e); +} + +static BuiltinTopicEndpoint get_matched_subscription_data( + dds_entity_t writer, dds_instance_handle_t readerih) +{ + BuiltinTopicEndpoint ep(dds_get_matched_subscription_data(writer, readerih), + free_builtintopic_endpoint); + return ep; +} + +static BuiltinTopicEndpoint get_matched_publication_data( + dds_entity_t reader, dds_instance_handle_t writerih) +{ + BuiltinTopicEndpoint ep(dds_get_matched_publication_data(reader, writerih), + free_builtintopic_endpoint); + return ep; +} + +static const std::string csid_to_string(const client_service_id_t & id) +{ + std::ostringstream os; + os << std::hex; + os << std::setw(2) << static_cast(id.data[0]); + for (size_t i = 1; i < sizeof(id.data); i++) { + os << "." << static_cast(id.data[i]); + } + return os.str(); +} + static rmw_ret_t rmw_take_response_request( CddsCS * cs, rmw_service_info_t * request_header, void * ros_data, bool * taken, dds_time_t * source_timestamp, @@ -3036,9 +3131,16 @@ static rmw_ret_t rmw_take_response_request( void * wrap_ptr = static_cast(&wrap); while (dds_take(cs->sub->enth, &wrap_ptr, &info, 1, 1) == 1) { if (info.valid_data) { - memset(request_header, 0, sizeof(wrap.header)); - assert(sizeof(wrap.header.guid) <= sizeof(request_header->request_id.writer_guid)); - memcpy(request_header->request_id.writer_guid, &wrap.header.guid, sizeof(wrap.header.guid)); + static_assert( + sizeof(request_header->request_id.writer_guid) == + sizeof(wrap.header.guid) + sizeof(info.publication_handle), + "request header size assumptions not met"); + memcpy( + static_cast(request_header->request_id.writer_guid), + static_cast(&wrap.header.guid), sizeof(wrap.header.guid)); + memcpy( + static_cast(request_header->request_id.writer_guid + sizeof(wrap.header.guid)), + static_cast(&info.publication_handle), sizeof(info.publication_handle)); request_header->request_id.sequence_number = wrap.header.seq; request_header->source_timestamp = info.source_timestamp; // TODO(iluetkeb) replace with real received timestamp when available in cyclone @@ -3110,7 +3212,9 @@ extern "C" rmw_ret_t rmw_take_request( { RET_WRONG_IMPLID(service); auto info = static_cast(service->data); - return rmw_take_response_request(&info->service, request_header, ros_request, taken, nullptr, 0); + return rmw_take_response_request( + &info->service, request_header, ros_request, taken, nullptr, + false); } static rmw_ret_t rmw_send_response_request( @@ -3126,6 +3230,56 @@ static rmw_ret_t rmw_send_response_request( } } +enum class client_present_t +{ + FAILURE, // an error occurred when checking + MAYBE, // reader not matched, writer still present + YES, // reader matched + GONE // neither reader nor writer +}; + +static bool check_client_service_endpoint( + const dds_builtintopic_endpoint_t * ep, + const std::string key, const std::string needle) +{ + if (ep != nullptr) { + std::string clientid; + get_user_data_key(ep->qos, key, clientid); + return clientid == needle; + } + return false; +} + +static client_present_t check_for_response_reader( + const CddsCS & service, + const dds_instance_handle_t reqwrih) +{ + auto reqwr = get_matched_publication_data(service.sub->enth, reqwrih); + std::string clientid; + if (reqwr == nullptr) { + return client_present_t::GONE; + } else if (!get_user_data_key(reqwr->qos, "clientid", clientid)) { + // backwards-compatibility: a client without a client id, assume all is well + return client_present_t::YES; + } else { + // look for this client's reader: if we have matched it, all is well; + // if not, continue waiting + std::vector rds; + if (get_matched_endpoints(service.pub->enth, dds_get_matched_subscriptions, rds) < 0) { + RMW_SET_ERROR_MSG("rmw_send_response: failed to get reader/writer matches"); + return client_present_t::FAILURE; + } + // if we have matched this client's reader, all is well + for (const auto & rdih : rds) { + auto rd = get_matched_subscription_data(service.pub->enth, rdih); + if (check_client_service_endpoint(rd.get(), "clientid", clientid)) { + return client_present_t::YES; + } + } + return client_present_t::MAYBE; + } +} + extern "C" rmw_ret_t rmw_send_response( const rmw_service_t * service, rmw_request_id_t * request_header, void * ros_response) @@ -3135,9 +3289,43 @@ extern "C" rmw_ret_t rmw_send_response( RET_NULL(ros_response); CddsService * info = static_cast(service->data); cdds_request_header_t header; - memcpy(&header.guid, request_header->writer_guid, sizeof(header.guid)); + dds_instance_handle_t reqwrih; + static_assert( + sizeof(request_header->writer_guid) == sizeof(header.guid) + sizeof(reqwrih), + "request header size assumptions not met"); + memcpy( + static_cast(&header.guid), static_cast(request_header->writer_guid), + sizeof(header.guid)); + memcpy( + static_cast(&reqwrih), + static_cast(request_header->writer_guid + sizeof(header.guid)), sizeof(reqwrih)); header.seq = request_header->sequence_number; - return rmw_send_response_request(&info->service, header, ros_response); + // Block until the response reader has been matched by the response writer (this is a + // workaround: rmw_service_server_is_available should keep returning false until this + // is a given). + // TODO(eboasson): rmw_service_server_is_available should block the request instead (#191) + client_present_t st; + std::chrono::system_clock::time_point tnow = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point tend = tnow + 100ms; + while ((st = + check_for_response_reader( + info->service, + reqwrih)) == client_present_t::MAYBE && tnow < tend) + { + dds_sleepfor(DDS_MSECS(10)); + tnow = std::chrono::system_clock::now(); + } + switch (st) { + case client_present_t::FAILURE: + break; + case client_present_t::MAYBE: + return RMW_RET_TIMEOUT; + case client_present_t::YES: + return rmw_send_response_request(&info->service, header, ros_response); + case client_present_t::GONE: + return RMW_RET_OK; + } + return RMW_RET_ERROR; } extern "C" rmw_ret_t rmw_send_request( @@ -3185,6 +3373,31 @@ static const rosidl_service_type_support_t * get_service_typesupport( } } +static void get_unique_csid(const rmw_node_t * node, client_service_id_t & id) +{ + auto impl = node->context->impl; + static_assert( + sizeof(dds_guid_t) <= sizeof(id.data), + "client/service id assumed it can hold a DDSI GUID"); + static_assert( + sizeof(dds_guid_t) <= sizeof((reinterpret_cast(0))->data), + "client/service id assumes rmw_gid_t can hold a DDSI GUID"); + uint32_t x; + + { + std::lock_guard guard(impl->initialization_mutex); + x = ++impl->client_service_id; + } + + // construct id by taking the entity prefix (which is just the first 12 + // bytes of the GID, which itself is just the GUID padded with 0's; then + // overwriting the entity id with the big-endian counter value + memcpy(id.data, impl->ppant_gid.data, 12); + for (size_t i = 0, s = 24; i < 4; i++, s -= 8) { + id.data[12 + i] = static_cast(x >> s); + } +} + static rmw_ret_t rmw_init_cs( CddsCS * cs, const rmw_node_t * node, const rosidl_service_type_support_t * type_supports, @@ -3262,6 +3475,16 @@ static rmw_ret_t rmw_init_cs( } dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + + // store a unique identifier for this client/service in the user + // data of the reader and writer so that we can always determine + // which pairs belong together + get_unique_csid(node, cs->id); + { + std::string user_data = std::string(is_service ? "serviceid=" : "clientid=") + csid_to_string( + cs->id) + std::string(";"); + dds_qset_userdata(qos, user_data.c_str(), user_data.size()); + } if ((pub->enth = dds_create_writer(node->context->impl->dds_pub, pubtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; @@ -3599,6 +3822,49 @@ static rmw_ret_t get_topic_name(dds_entity_t endpoint_handle, std::string & name } while (true); } +static rmw_ret_t check_for_service_reader_writer(const CddsCS & client, bool * is_available) +{ + std::vector rds, wrs; + assert(is_available != nullptr && !*is_available); + if (get_matched_endpoints(client.pub->enth, dds_get_matched_subscriptions, rds) < 0 || + get_matched_endpoints(client.sub->enth, dds_get_matched_publications, wrs) < 0) + { + RMW_SET_ERROR_MSG("rmw_service_server_is_available: failed to get reader/writer matches"); + return RMW_RET_ERROR; + } + // first extract all service ids from matched readers + std::set needles; + for (const auto & rdih : rds) { + auto rd = get_matched_subscription_data(client.pub->enth, rdih); + std::string serviceid; + if (rd && get_user_data_key(rd->qos, "serviceid", serviceid)) { + needles.insert(serviceid); + } + } + if (needles.empty()) { + // if no services advertising a serviceid have been matched, but there + // are matched request readers and response writers, then we fall back + // to the old method of simply requiring the existence of matches. + *is_available = !rds.empty() && !wrs.empty(); + } else { + // scan the writers to see if there is at least one response writer + // matching a discovered request reader + for (const auto & wrih : wrs) { + auto wr = get_matched_publication_data(client.sub->enth, wrih); + std::string serviceid; + if (wr && + get_user_data_key( + wr->qos, "serviceid", + serviceid) && needles.find(serviceid) != needles.end()) + { + *is_available = true; + break; + } + } + } + return RMW_RET_OK; +} + extern "C" rmw_ret_t rmw_service_server_is_available( const rmw_node_t * node, const rmw_client_t * client, @@ -3632,16 +3898,7 @@ extern "C" rmw_ret_t rmw_service_server_is_available( if (ret != RMW_RET_OK || 0 == number_of_response_publishers) { return ret; } - dds_publication_matched_status_t ps; - dds_subscription_matched_status_t cs; - if (dds_get_publication_matched_status(info->client.pub->enth, &ps) < 0 || - dds_get_subscription_matched_status(info->client.sub->enth, &cs) < 0) - { - RMW_SET_ERROR_MSG("rmw_service_server_is_available: get_..._matched_status failed"); - return RMW_RET_ERROR; - } - *is_available = ps.current_count > 0 && cs.current_count > 0; - return RMW_RET_OK; + return check_for_service_reader_writer(info->client, is_available); } extern "C" rmw_ret_t rmw_count_publishers(