Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GraphCache for services & clients along with remaining graph introspection methods #90

Merged
merged 15 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 174 additions & 38 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@


///=============================================================================
// TODO(Yadunund): Since we reuse pub_count_ and sub_count_ for pub/sub and
// service/client consider more general names for these fields.
struct TopicStats
{
// The count of publishers or clients.
std::size_t pub_count_;

// The count of subscriptions or services.
std::size_t sub_count_;

// Constructor which initializes counters to 0.
Expand Down Expand Up @@ -69,8 +74,15 @@ struct GraphNode
using TopicDataMap = std::unordered_map<std::string, TopicDataPtr>;
// Map topic name to TopicDataMap
using TopicMap = std::unordered_map<std::string, TopicDataMap>;

// Entries for pub/sub.
TopicMap pubs_ = {};
TopicMap subs_ = {};

// Entires for service/client.
TopicMap clients_ = {};
TopicMap services_ = {};

};
using GraphNodePtr = std::shared_ptr<GraphNode>;

Expand All @@ -94,6 +106,10 @@ class GraphCache final
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types) const;

rmw_ret_t count_publishers(
const char * topic_name,
size_t * count) const;
Expand All @@ -102,6 +118,14 @@ class GraphCache final
const char * topic_name,
size_t * count) const;

rmw_ret_t count_services(
const char * service_name,
size_t * count) const;

rmw_ret_t count_clients(
const char * service_name,
size_t * count) const;

rmw_ret_t get_entity_names_and_types_by_node(
liveliness::EntityType entity_type,
rcutils_allocator_t * allocator,
Expand All @@ -117,6 +141,11 @@ class GraphCache final
bool no_demangle,
rmw_topic_endpoint_info_array_t * endpoints_info) const;

rmw_ret_t service_server_is_available(
const char * service_name,
const char * service_type,
bool * is_available);

private:
/*
namespace_1:
Expand Down Expand Up @@ -146,8 +175,10 @@ class GraphCache final
// Map namespace to a map of <node_name, GraphNodePtr>.
NamespaceMap graph_ = {};

// Optimize topic lookups across the graph.
// Optimize pub/sub lookups across the graph.
GraphNode::TopicMap graph_topics_ = {};
// Optimize service/client lookups across the graph.
GraphNode::TopicMap graph_services_ = {};

mutable std::mutex graph_mutex_;
};
Expand Down
40 changes: 35 additions & 5 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ static const char PUB_STR[] = "MP";
static const char SUB_STR[] = "MS";
static const char SRV_STR[] = "SS";
static const char CLI_STR[] = "SC";
static const char SLASH_REPLACEMENT = '%';

static const std::unordered_map<EntityType, std::string> entity_to_str = {
{EntityType::Node, NODE_STR},
Expand Down Expand Up @@ -149,12 +150,13 @@ Entity::Entity(
token_ss << "/";
}
// Finally append node name.
token_ss << node_info_.name_;
token_ss << mangle_name(node_info_.name_);
// If this entity has a topic info, append it to the token.
if (topic_info_.has_value()) {
const auto & topic_info = this->topic_info_.value();
// Note: We don't append a leading "/" as we expect the ROS topic name to start with a "/".
token_ss << topic_info.name_ + "/" + topic_info.type_ + "/" + topic_info.qos_;
token_ss <<
"/" + mangle_name(topic_info.name_) + "/" + topic_info.type_ + "/" + topic_info.qos_;
}

this->keyexpr_ = token_ss.str();
Expand Down Expand Up @@ -255,15 +257,15 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
if (entity_it == str_to_entity.end()) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Received liveliness token with invalid entity.");
"Received liveliness token with invalid entity %s.", entity_str.c_str());
return std::nullopt;
}

EntityType entity_type = entity_it->second;
std::size_t domain_id = std::stoul(parts[1]);
std::string & id = parts[2];
std::string ns = parts[4] == "_" ? "/" : "/" + std::move(parts[4]);
std::string node_name = std::move(parts[5]);
std::string node_name = demangle_name(std::move(parts[5]));
std::optional<TopicInfo> topic_info = std::nullopt;

// Populate topic_info if we have a token for an entity other than a node.
Expand All @@ -275,7 +277,7 @@ std::optional<Entity> Entity::make(const std::string & keyexpr)
return std::nullopt;
}
topic_info = TopicInfo{
"/" + std::move(parts[6]),
demangle_name(std::move(parts[6])),
std::move(parts[7]),
std::move(parts[8])};
}
Expand Down Expand Up @@ -387,4 +389,32 @@ bool PublishToken::del(
return true;
}

///=============================================================================
std::string mangle_name(const std::string & input)
{
std::string output = "";
for (std::size_t i = 0; i < input.length(); ++i) {
if (input[i] == '/') {
output += SLASH_REPLACEMENT;
} else {
output += input[i];
}
}
return output;
}

///=============================================================================
std::string demangle_name(const std::string & input)
{
std::string output = "";
for (std::size_t i = 0; i < input.length(); ++i) {
if (input[i] == SLASH_REPLACEMENT) {
output += '/';
} else {
output += input[i];
}
}
return output;
}

} // namespace liveliness
6 changes: 6 additions & 0 deletions rmw_zenoh_cpp/src/detail/liveliness_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class PublishToken
const std::string & token);
};

/// Replace "/" instances with "%".
std::string mangle_name(const std::string & input);

/// Replace "%" instances with "/".
std::string demangle_name(const std::string & input);

} // namespace liveliness

#endif // DETAIL__LIVELINESS_UTILS_HPP_
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ struct rmw_service_data_t
z_owned_keyexpr_t keyexpr;
z_owned_queryable_t qable;

// Liveliness token for the service.
zc_owned_liveliness_token_t token;

const void * request_type_support_impl;
const void * response_type_support_impl;
const char * typesupport_identifier;
Expand All @@ -171,9 +174,11 @@ struct rmw_service_data_t
struct rmw_client_data_t
{
z_owned_keyexpr_t keyexpr;

z_owned_closure_reply_t zn_closure_reply;

// Liveliness token for the client.
zc_owned_liveliness_token_t token;

std::mutex message_mutex;
std::deque<z_owned_reply_t> replies;

Expand Down
14 changes: 10 additions & 4 deletions rmw_zenoh_cpp/src/rmw_get_service_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "detail/rmw_data_types.hpp"

#include "rmw/error_handling.h"
#include "rmw/get_service_names_and_types.h"

extern "C"
Expand All @@ -25,9 +27,13 @@ rmw_get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types)
{
static_cast<void>(node);
static_cast<void>(allocator);
static_cast<void>(service_names_and_types);
return RMW_RET_UNSUPPORTED;
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(node->context->impl, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(service_names_and_types, RMW_RET_INVALID_ARGUMENT);

return node->context->impl->graph_cache.get_service_names_and_types(
allocator, service_names_and_types);
}
} // extern "C"
4 changes: 0 additions & 4 deletions rmw_zenoh_cpp/src/rmw_get_topic_names_and_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@

#include "detail/rmw_data_types.hpp"

#include "rcutils/strdup.h"

#include "rmw/error_handling.h"
#include "rmw/get_topic_names_and_types.h"

#include "rcpputils/scope_exit.hpp"

extern "C"
{
///==============================================================================
Expand Down
7 changes: 6 additions & 1 deletion rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,12 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
"Sending Query '%s' to fetch discovery data...",
liveliness_str.c_str()
);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
// replies for the zc_liveliness_get() call. This is necessary as if the `bound`
// is too low, the channel may starve the zenoh executor of its threads which
// would lead to deadlocks when trying to receive replies and block the
// execution here.
z_owned_reply_channel_t channel = zc_reply_fifo_new(0);
zc_liveliness_get(
z_loan(context->impl->session), z_keyexpr(liveliness_str.c_str()),
z_move(channel.send), NULL);
Expand Down
Loading
Loading