Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added count matching api and intra-process subscriber count #628

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_publisher ${PROJECT_NAME})
endif()
ament_add_gtest(test_publisher_subscription_count_api test/test_publisher_subscription_count_api.cpp)
if(TARGET test_publisher_subscription_count_api)
target_include_directories(test_publisher_subscription_count_api PUBLIC
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}
${rosidl_typesupport_cpp_INCLUDE_DIRS}
)
target_link_libraries(test_publisher_subscription_count_api ${PROJECT_NAME})
endif()
ament_add_gtest(test_rate test/test_rate.cpp)
if(TARGET test_rate)
target_include_directories(test_rate PUBLIC
Expand Down Expand Up @@ -290,6 +300,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_subscription ${PROJECT_NAME})
endif()
ament_add_gtest(test_subscription_publisher_count_api test/test_subscription_publisher_count_api.cpp)
if(TARGET test_subscription_publisher_count_api)
target_include_directories(test_subscription_publisher_count_api PUBLIC
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}
${rosidl_typesupport_cpp_INCLUDE_DIRS}
)
target_link_libraries(test_subscription_publisher_count_api ${PROJECT_NAME})
endif()
find_package(test_msgs REQUIRED)
ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp)
if(TARGET test_subscription_traits)
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ class IntraProcessManager
bool
matches_any_publishers(const rmw_gid_t * id) const;

/// Return the number of intraprocess subscriptions to a topic, given the publisher id.
RCLCPP_PUBLIC
size_t
get_subscription_count(uint64_t intra_process_publisher_id) const;

private:
RCLCPP_PUBLIC
static uint64_t
Expand Down
23 changes: 23 additions & 0 deletions rclcpp/include/rclcpp/intra_process_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class IntraProcessManagerImplBase
virtual bool
matches_any_publishers(const rmw_gid_t * id) const = 0;

virtual size_t
get_subscription_count(uint64_t intra_process_publisher_id) const = 0;

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase)
};
Expand Down Expand Up @@ -248,6 +251,26 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase
return false;
}

size_t
get_subscription_count(uint64_t intra_process_publisher_id) const
{
auto publisher_it = publishers_.find(intra_process_publisher_id);
if (publisher_it == publishers_.end()) {
// Publisher is either invalid or no longer exists.
return 0;
}
auto publisher = publisher_it->second.publisher.lock();
if (!publisher) {
throw std::runtime_error("publisher has unexpectedly gone out of scope");
}
auto sub_map_it = subscription_ids_by_topic_.find(publisher->get_topic_name());
if (sub_map_it == subscription_ids_by_topic_.end()) {
// No intraprocess subscribers
return 0;
}
return sub_map_it->second.size();
}

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerImpl)

Expand Down
30 changes: 29 additions & 1 deletion rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ namespace node_interfaces
class NodeTopicsInterface;
}

namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}

class PublisherBase
{
friend ::rclcpp::node_interfaces::NodeTopicsInterface;
Expand Down Expand Up @@ -107,6 +116,18 @@ class PublisherBase
const rcl_publisher_t *
get_publisher_handle() const;

/// Get subscription count
/** \return The number of subscriptions. */
RCLCPP_PUBLIC
size_t
get_subscription_count() const;

/// Get intraprocess subscription count
/** \return The number of intraprocess subscriptions. */
RCLCPP_PUBLIC
size_t
get_intra_process_subscription_count() const;

/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
Expand All @@ -128,13 +149,16 @@ class PublisherBase
operator==(const rmw_gid_t * gid) const;

using StoreMessageCallbackT = std::function<uint64_t(uint64_t, void *, const std::type_info &)>;
using IntraProcessManagerSharedPtr =
std::shared_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
hidmic marked this conversation as resolved.
Show resolved Hide resolved

/// Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options);

protected:
Expand All @@ -143,6 +167,10 @@ class PublisherBase
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
hidmic marked this conversation as resolved.
Show resolved Hide resolved
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_publisher_id_;
StoreMessageCallbackT store_intra_process_message_;

Expand Down
27 changes: 26 additions & 1 deletion rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ namespace node_interfaces
class NodeTopicsInterface;
} // namespace node_interfaces

namespace intra_process_manager
{
/**
* NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp.
* SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class.
*/
class IntraProcessManager;
}

/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
/// specializations of Subscription, among other things.
class SubscriptionBase
Expand Down Expand Up @@ -129,11 +138,23 @@ class SubscriptionBase
bool
is_serialized() const;

/// Get matching publisher count
/** \return The number of publishers on this topic. */
RCLCPP_PUBLIC
size_t
get_publisher_count() const;

protected:
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_;
std::shared_ptr<rcl_subscription_t> subscription_handle_;
std::shared_ptr<rcl_node_t> node_handle_;

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool use_intra_process_;
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_subscription_id_;

private:
RCLCPP_DISABLE_COPY(SubscriptionBase)

Expand Down Expand Up @@ -272,10 +293,13 @@ class Subscription : public SubscriptionBase
using MatchesAnyPublishersCallbackType = std::function<bool (const rmw_gid_t *)>;

/// Implemenation detail.
// TODO(ivanpauno): This can be moved to the base class. No reason to be here.
// Also get_intra_process_message_callback_ and matches_any_intra_process_publishers_.
void setup_intra_process(
uint64_t intra_process_subscription_id,
GetMessageCallbackType get_message_callback,
MatchesAnyPublishersCallbackType matches_any_publisher_callback,
IntraProcessManagerWeakPtr weak_ipm,
const rcl_subscription_options_t & intra_process_options)
{
std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
Expand All @@ -302,6 +326,8 @@ class Subscription : public SubscriptionBase
intra_process_subscription_id_ = intra_process_subscription_id;
get_intra_process_message_callback_ = get_message_callback;
matches_any_intra_process_publishers_ = matches_any_publisher_callback;
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
}

/// Implemenation detail.
Expand All @@ -323,7 +349,6 @@ class Subscription : public SubscriptionBase

GetMessageCallbackType get_intra_process_message_callback_;
MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
uint64_t intra_process_subscription_id_;
};

} // namespace rclcpp
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/subscription_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ create_subscription_factory(
intra_process_subscription_id,
take_intra_process_message_func,
matches_any_publisher_func,
weak_ipm,
intra_process_options
);
};
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
return impl_->matches_any_publishers(id);
}

size_t
IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) const
{
return impl_->get_subscription_count(intra_process_publisher_id);
}

uint64_t
IntraProcessManager::get_next_unique_id()
{
Expand Down
1 change: 1 addition & 0 deletions rclcpp/src/rclcpp/node_interfaces/node_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ NodeTopics::create_publisher(
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
ipm,
publisher_options);
}

Expand Down
69 changes: 66 additions & 3 deletions rclcpp/src/rclcpp/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
Expand All @@ -42,7 +43,8 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
use_intra_process_(false), intra_process_publisher_id_(0),
store_intra_process_message_(nullptr)
{
rcl_ret_t ret = rcl_publisher_init(
&publisher_handle_,
Expand Down Expand Up @@ -94,6 +96,20 @@ PublisherBase::~PublisherBase()
rcl_get_error_string().str);
rcl_reset_error();
}

auto ipm = weak_ipm_.lock();

if (!use_intra_process_) {
return;
}
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivanpauno well, the publisher isn't necessarily using intra-process communication. We need another flag to know whether this should log an error or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is called in the destructor, it could happens that the ipm have already gone out of the scope first (I'm not sure, but it sounds reasonable). So, I don't think rising an error is a good idea.
If we want to raise an error, we need an extra flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivanpauno Looks like here we should be using the use_intra_process_ flag as well to know whether this should be treated as an error or not.

RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a publisher.");
return;
}
ipm->remove_publisher(intra_process_publisher_id_);
}

const char *
Expand Down Expand Up @@ -138,6 +154,49 @@ PublisherBase::get_publisher_handle() const
return &publisher_handle_;
}

size_t
PublisherBase::get_subscription_count() const
{
size_t inter_process_subscription_count = 0;

rmw_ret_t status = rcl_publisher_get_subscription_count(
&publisher_handle_,
&inter_process_subscription_count);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); /* next call will reset error message if not context */
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
/* publisher is invalid due to context being shutdown */
return 0;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count");
}
return inter_process_subscription_count;
}

size_t
PublisherBase::get_intra_process_subscription_count() const
{
auto ipm = weak_ipm_.lock();
if (!use_intra_process_) {
return 0;
}
if (!ipm) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivanpauno I'm not sure this is the right thing to do. Correct me if I'm wrong, but wouldn't ipm be nullptr if the publisher was not setup to use intra-process communication? Even if we think that throwing is better than returning 0, the exception message may be misleading.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes right, I will check again. Maybe adding an extra flag as commented before.

// TODO(ivanpauno): should this just return silently? Or maybe return with a warning?
// Same as wjwwood comment in publisher_factory create_shared_publish_callback.
// If we don't raise an error here, use_intra_process_ flag is unnecessary.
throw std::runtime_error(
"intra process subscriber count called after "
"destruction of intra process manager");
}
return ipm->get_subscription_count(intra_process_publisher_id_);
}

bool
PublisherBase::operator==(const rmw_gid_t & gid) const
{
Expand Down Expand Up @@ -168,7 +227,8 @@ PublisherBase::operator==(const rmw_gid_t * gid) const
void
PublisherBase::setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
StoreMessageCallbackT store_callback,
IntraProcessManagerSharedPtr ipm,
const rcl_publisher_options_t & intra_process_options)
{
const char * topic_name = this->get_topic_name();
Expand Down Expand Up @@ -199,7 +259,10 @@ PublisherBase::setup_intra_process(
}

intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = callback;
store_intra_process_message_ = store_callback;
weak_ipm_ = ipm;
use_intra_process_ = true;

// Life time of this object is tied to the publisher handle.
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(
&intra_process_publisher_handle_);
Expand Down
Loading