Skip to content

Commit

Permalink
Merge branch 'rolling' into francocipollone/service_support_via_query…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
Yadunund committed Dec 28, 2023
2 parents e181435 + 10955d7 commit 336cd24
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 14 deletions.
108 changes: 108 additions & 0 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -683,3 +683,111 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node(

return RMW_RET_OK;
}


///=============================================================================
rmw_ret_t GraphCache::get_entities_info_by_topic(
liveliness::EntityType entity_type,
rcutils_allocator_t * allocator,
const char * topic_name,
bool no_demangle,
rmw_topic_endpoint_info_array_t * endpoints_info) const
{
static_cast<void>(no_demangle);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT);
RCUTILS_CHECK_ALLOCATOR_WITH_MSG(
allocator, "allocator argument is invalid", return RMW_RET_INVALID_ARGUMENT);

if (entity_type != EntityType::Publisher && entity_type != EntityType::Subscription) {
return RMW_RET_INVALID_ARGUMENT;
}

std::lock_guard<std::mutex> lock(graph_mutex_);

// Minor optimization to exit early if the topic does not exist in the graph.
if (graph_topics_.find(topic_name) == graph_topics_.end()) {
return RMW_RET_OK;
}
// TODO(Yadunund): Refactor graph_topics_ to map to a list of GraphNodePtr to
// avoid this expensive iteration.
std::size_t size = 0;
std::vector<GraphNodePtr> nodes = {};
for (NamespaceMap::const_iterator ns_it = graph_.begin(); ns_it != graph_.end(); ++ns_it) {
for (NodeMap::const_iterator node_it = ns_it->second.begin(); node_it != ns_it->second.end();
++node_it)
{
const GraphNode::TopicMap & entity_map =
entity_type == EntityType::Publisher ? node_it->second->pubs_ :
node_it->second->subs_;
GraphNode::TopicMap::const_iterator topic_it = entity_map.find(topic_name);
if (topic_it != entity_map.end()) {
nodes.push_back(node_it->second);
size += topic_it->second.size();
}
}
}


rmw_ret_t ret = rmw_topic_endpoint_info_array_init_with_size(
endpoints_info,
nodes.size(),
allocator);
if (RMW_RET_OK != ret) {
return ret;
}

auto cleanup_endpoints_info = rcpputils::make_scope_exit(
[endpoints_info, allocator] {
rmw_ret_t fail_ret = rmw_topic_endpoint_info_array_fini(
endpoints_info, allocator);
if (fail_ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR("failed to cleanup endpoints info during error handling");
}
});

for (std::size_t i = 0; i < nodes.size(); ++i) {
const GraphNode::TopicMap & entity_map =
entity_type == EntityType::Publisher ? nodes[i]->pubs_ :
nodes[i]->subs_;
const GraphNode::TopicDataMap & topic_data_map = entity_map.find(topic_name)->second;
for (const auto & [topic_data, _] : topic_data_map) {
rmw_topic_endpoint_info_t & endpoint_info = endpoints_info->info_array[i];
endpoint_info = rmw_get_zero_initialized_topic_endpoint_info();

ret = rmw_topic_endpoint_info_set_node_name(
&endpoint_info,
nodes[i]->name_.c_str(),
allocator);
if (RMW_RET_OK != ret) {
return ret;
}

ret = rmw_topic_endpoint_info_set_node_namespace(
&endpoint_info,
nodes[i]->ns_.c_str(),
allocator);
if (RMW_RET_OK != ret) {
return ret;
}

ret = rmw_topic_endpoint_info_set_topic_type(
&endpoint_info,
_demangle_if_ros_type(topic_data).c_str(),
allocator);
if (RMW_RET_OK != ret) {
return ret;
}

ret = rmw_topic_endpoint_info_set_endpoint_type(
&endpoint_info,
entity_type == EntityType::Publisher ? RMW_ENDPOINT_PUBLISHER : RMW_ENDPOINT_SUBSCRIPTION);
if (RMW_RET_OK != ret) {
return ret;
}
// TODO(Yadunund): Set type_hash, qos_profile, gid.
}
}

cleanup_endpoints_info.cancel();
return RMW_RET_OK;
}
8 changes: 8 additions & 0 deletions rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "rcutils/types.h"

#include "rmw/rmw.h"
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/names_and_types.h"


Expand Down Expand Up @@ -108,6 +109,13 @@ class GraphCache final
bool no_demangle,
rmw_names_and_types_t * names_and_types) const;

rmw_ret_t get_entities_info_by_topic(
liveliness::EntityType entity_type,
rcutils_allocator_t * allocator,
const char * topic_name,
bool no_demangle,
rmw_topic_endpoint_info_array_t * endpoints_info) const;

private:
/*
namespace_1:
Expand Down
45 changes: 33 additions & 12 deletions rmw_zenoh_cpp/src/rmw_get_topic_endpoint_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
// limitations under the License.


#include "detail/identifier.hpp"
#include "detail/liveliness_utils.hpp"
#include "detail/rmw_data_types.hpp"

#include "rmw/get_topic_endpoint_info.h"
#include "rmw/impl/cpp/macros.hpp"

extern "C"
{
Expand All @@ -27,12 +32,20 @@ rmw_get_publishers_info_by_topic(
bool no_mangle,
rmw_topic_endpoint_info_array_t * publishers_info)
{
static_cast<void>(node);
static_cast<void>(allocator);
static_cast<void>(topic_name);
static_cast<void>(no_mangle);
static_cast<void>(publishers_info);
return RMW_RET_OK;
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
return node->context->impl->graph_cache.get_entities_info_by_topic(
liveliness::EntityType::Publisher,
allocator,
topic_name,
no_mangle,
publishers_info);
}

///==============================================================================
Expand All @@ -45,11 +58,19 @@ rmw_get_subscriptions_info_by_topic(
bool no_mangle,
rmw_topic_endpoint_info_array_t * subscriptions_info)
{
static_cast<void>(node);
static_cast<void>(allocator);
static_cast<void>(topic_name);
static_cast<void>(no_mangle);
static_cast<void>(subscriptions_info);
return RMW_RET_OK;
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
rmw_zenoh_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
return node->context->impl->graph_cache.get_entities_info_by_topic(
liveliness::EntityType::Subscription,
allocator,
topic_name,
no_mangle,
subscriptions_info);
}
} // extern "C"
2 changes: 1 addition & 1 deletion zenoh_c_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ find_package(ament_cmake_vendor_package REQUIRED)

ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
VCS_VERSION master
VCS_VERSION 0.10.1-rc
)

# set(INSTALL_DIR "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}-prefix/install")
Expand Down
2 changes: 1 addition & 1 deletion zenohd_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ExternalProject_Add(
zenoh
PREFIX ${CMAKE_BINARY_DIR}/_cargo_deps
GIT_REPOSITORY https://github.com/eclipse-zenoh/zenoh.git
GIT_TAG master
GIT_TAG 0.10.1-rc
BUILD_COMMAND cargo build --package zenohd --release
CONFIGURE_COMMAND "" # Disable the configure step
INSTALL_COMMAND ""
Expand Down

0 comments on commit 336cd24

Please sign in to comment.