Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend the lifetime of rcl_publisher_t by holding onto the shared pointer. #1119

Merged
merged 3 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/loaned_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class LoanedMessage
if (pub_.can_loan_messages()) {
void * message_ptr = nullptr;
auto ret = rcl_borrow_loaned_message(
pub_.get_publisher_handle(),
pub_.get_publisher_handle().get(),
rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
&message_ptr);
if (RCL_RET_OK != ret) {
Expand Down Expand Up @@ -137,7 +137,7 @@ class LoanedMessage
if (pub_.can_loan_messages()) {
// return allocated memory to the middleware
auto ret =
rcl_return_loaned_message_from_publisher(pub_.get_publisher_handle(), message_);
rcl_return_loaned_message_from_publisher(pub_.get_publisher_handle().get(), message_);
if (ret != RCL_RET_OK) {
RCLCPP_ERROR(
error_logger, "rcl_deallocate_loaned_message failed: %s", rcl_get_error_string().str);
Expand Down
14 changes: 7 additions & 7 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ class Publisher : public PublisherBase
void
do_inter_process_publish(const MessageT & msg)
{
auto status = rcl_publish(&publisher_handle_, &msg, nullptr);
auto status = rcl_publish(publisher_handle_.get(), &msg, nullptr);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
Expand All @@ -303,7 +303,7 @@ class Publisher : public PublisherBase
// TODO(Karsten1987): support serialized message passed by intraprocess
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
}
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr);
auto status = rcl_publish_serialized_message(publisher_handle_.get(), serialized_msg, nullptr);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
}
Expand All @@ -312,12 +312,12 @@ class Publisher : public PublisherBase
void
do_loaned_message_publish(MessageT * msg)
{
auto status = rcl_publish_loaned_message(&publisher_handle_, msg, nullptr);
auto status = rcl_publish_loaned_message(publisher_handle_.get(), msg, nullptr);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
Expand Down
11 changes: 6 additions & 5 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
/// Get the rcl publisher handle.
/** \return The rcl publisher handle. */
RCLCPP_PUBLIC
rcl_publisher_t *
std::shared_ptr<rcl_publisher_t>
get_publisher_handle();

/// Get the rcl publisher handle.
/** \return The rcl publisher handle. */
RCLCPP_PUBLIC
const rcl_publisher_t *
std::shared_ptr<const rcl_publisher_t>
get_publisher_handle() const;

/// Get all the QoS event handlers associated with this publisher.
Expand Down Expand Up @@ -200,10 +200,11 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
const EventCallbackT & callback,
const rcl_publisher_event_type_t event_type)
{
auto handler = std::make_shared<QOSEventHandler<EventCallbackT>>(
auto handler = std::make_shared<QOSEventHandler<EventCallbackT,
std::shared_ptr<rcl_publisher_t>>>(
callback,
rcl_publisher_event_init,
&publisher_handle_,
publisher_handle_,
event_type);
event_handlers_.emplace_back(handler);
}
Expand All @@ -213,7 +214,7 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>

std::shared_ptr<rcl_node_t> rcl_node_handle_;

rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
std::shared_ptr<rcl_publisher_t> publisher_handle_;

std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;

Expand Down
8 changes: 5 additions & 3 deletions rclcpp/include/rclcpp/qos_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,21 @@ class QOSEventHandlerBase : public Waitable
size_t wait_set_event_index_;
};

template<typename EventCallbackT>
template<typename EventCallbackT, typename ParentHandleT>
class QOSEventHandler : public QOSEventHandlerBase
{
public:
template<typename InitFuncT, typename ParentHandleT, typename EventTypeEnum>
template<typename InitFuncT, typename EventTypeEnum>
QOSEventHandler(
const EventCallbackT & callback,
InitFuncT init_func,
ParentHandleT parent_handle,
EventTypeEnum event_type)
: event_callback_(callback)
{
parent_handle_ = parent_handle;
event_handle_ = rcl_get_zero_initialized_event();
rcl_ret_t ret = init_func(&event_handle_, parent_handle, event_type);
rcl_ret_t ret = init_func(&event_handle_, parent_handle.get(), event_type);
if (ret != RCL_RET_OK) {
if (ret == RCL_RET_UNSUPPORTED) {
UnsupportedEventTypeException exc(ret, rcl_get_error_state(), "Failed to initialize event");
Expand Down Expand Up @@ -148,6 +149,7 @@ class QOSEventHandler : public QOSEventHandlerBase
using EventCallbackInfoT = typename std::remove_reference<typename
rclcpp::function_traits::function_traits<EventCallbackT>::template argument_type<0>>::type;

ParentHandleT parent_handle_;
EventCallbackT event_callback_;
};

Expand Down
5 changes: 3 additions & 2 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
const EventCallbackT & callback,
const rcl_subscription_event_type_t event_type)
{
auto handler = std::make_shared<QOSEventHandler<EventCallbackT>>(
auto handler = std::make_shared<QOSEventHandler<EventCallbackT,
std::shared_ptr<rcl_subscription_t>>>(
callback,
rcl_subscription_event_init,
get_subscription_handle().get(),
get_subscription_handle(),
event_type);
qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(), false));
event_handlers_.emplace_back(handler);
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class Waitable
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Waitable)

RCLCPP_PUBLIC
virtual ~Waitable() = default;

/// Get the number of ready subscriptions
/**
* Returns a value of 0 by default.
Expand Down
55 changes: 32 additions & 23 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,24 @@ PublisherBase::PublisherBase(
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_is_enabled_(false), intra_process_publisher_id_(0)
{
auto custom_deleter = [node_handle = this->rcl_node_handle_](rcl_publisher_t * rcl_pub)
{
if (rcl_publisher_fini(rcl_pub, node_handle.get()) != RCL_RET_OK) {
RCLCPP_ERROR(
rclcpp::get_node_logger(node_handle.get()).get_child("rclcpp"),
"Error in destruction of rcl publisher handle: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
delete rcl_pub;
};

publisher_handle_ = std::shared_ptr<rcl_publisher_t>(
new rcl_publisher_t, custom_deleter);
*publisher_handle_.get() = rcl_get_zero_initialized_publisher();

rcl_ret_t ret = rcl_publisher_init(
&publisher_handle_,
publisher_handle_.get(),
rcl_node_handle_.get(),
&type_support,
topic.c_str(),
Expand All @@ -67,7 +83,7 @@ PublisherBase::PublisherBase(
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create publisher");
}
// Life time of this object is tied to the publisher handle.
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(&publisher_handle_);
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(publisher_handle_.get());
if (!publisher_rmw_handle) {
auto msg = std::string("failed to get rmw handle: ") + rcl_get_error_string().str;
rcl_reset_error();
Expand All @@ -85,14 +101,6 @@ PublisherBase::~PublisherBase()
// must fini the events before fini-ing the publisher
event_handlers_.clear();

if (rcl_publisher_fini(&publisher_handle_, rcl_node_handle_.get()) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Error in destruction of rcl publisher handle: %s",
rcl_get_error_string().str);
rcl_reset_error();
}

auto ipm = weak_ipm_.lock();

if (!intra_process_is_enabled_) {
Expand All @@ -102,7 +110,7 @@ PublisherBase::~PublisherBase()
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a publisher.");
"Intra process manager died before a publisher.");
return;
}
ipm->remove_publisher(intra_process_publisher_id_);
Expand All @@ -111,13 +119,14 @@ PublisherBase::~PublisherBase()
const char *
PublisherBase::get_topic_name() const
{
return rcl_publisher_get_topic_name(&publisher_handle_);
return rcl_publisher_get_topic_name(publisher_handle_.get());
}

size_t
PublisherBase::get_queue_size() const
{
const rcl_publisher_options_t * publisher_options = rcl_publisher_get_options(&publisher_handle_);
const rcl_publisher_options_t * publisher_options = rcl_publisher_get_options(
publisher_handle_.get());
if (!publisher_options) {
auto msg = std::string("failed to get publisher options: ") + rcl_get_error_string().str;
rcl_reset_error();
Expand All @@ -132,16 +141,16 @@ PublisherBase::get_gid() const
return rmw_gid_;
}

rcl_publisher_t *
std::shared_ptr<rcl_publisher_t>
PublisherBase::get_publisher_handle()
{
return &publisher_handle_;
return publisher_handle_;
}

const rcl_publisher_t *
std::shared_ptr<const rcl_publisher_t>
PublisherBase::get_publisher_handle() const
{
return &publisher_handle_;
return publisher_handle_;
}

const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
Expand All @@ -156,13 +165,13 @@ PublisherBase::get_subscription_count() const
size_t inter_process_subscription_count = 0;

rcl_ret_t status = rcl_publisher_get_subscription_count(
&publisher_handle_,
publisher_handle_.get(),
&inter_process_subscription_count);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); /* next call will reset error message if not context */
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
if (nullptr != context && !rcl_context_is_valid(context)) {
/* publisher is invalid due to context being shutdown */
return 0;
Expand Down Expand Up @@ -195,7 +204,7 @@ PublisherBase::get_intra_process_subscription_count() const
rclcpp::QoS
PublisherBase::get_actual_qos() const
{
const rmw_qos_profile_t * qos = rcl_publisher_get_actual_qos(&publisher_handle_);
const rmw_qos_profile_t * qos = rcl_publisher_get_actual_qos(publisher_handle_.get());
if (!qos) {
auto msg = std::string("failed to get qos settings: ") + rcl_get_error_string().str;
rcl_reset_error();
Expand All @@ -208,13 +217,13 @@ PublisherBase::get_actual_qos() const
bool
PublisherBase::assert_liveliness() const
{
return RCL_RET_OK == rcl_publisher_assert_liveliness(&publisher_handle_);
return RCL_RET_OK == rcl_publisher_assert_liveliness(publisher_handle_.get());
}

bool
PublisherBase::can_loan_messages() const
{
return rcl_publisher_can_loan_messages(&publisher_handle_);
return rcl_publisher_can_loan_messages(publisher_handle_.get());
}

bool
Expand Down