Skip to content

Commit

Permalink
Zero copy api (#864)
Browse files Browse the repository at this point in the history
* loaned message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

wip

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

loaned message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* use publisher allocator

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* use unique_ptr inside LoanedMessage

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* can_loan_messages for subscription_base

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* use correct rcl_publish function

* default move constructor not allowed in gcc

Signed-off-by: Knese Karsten (CR/RTC-HMI4) <karsten.knese@us.bosch.com>

*  remove get_instance and make LoanedMessage constructor public

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* some more api doc

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* rebase ontop of master

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* warn when not being able to loan message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* first draft loaned_message_sequence

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* rmw_take_loaned_message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* address review comments

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* introduce rmw_publish_loaned_message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* const correct publish

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* placement new

Signed-off-by: Knese Karsten <karsten@openrobotics.org>

* disable deleter for callback

Signed-off-by: Knese Karsten <karsten@openrobotics.org>

* print info once

Signed-off-by: Knese Karsten <karsten@openrobotics.org>

* uncrustify

Signed-off-by: Karsten Knese <karsten@openrobotics.org>
  • Loading branch information
Karsten1987 committed Oct 18, 2019
1 parent 658f207 commit ed0bd16
Show file tree
Hide file tree
Showing 10 changed files with 460 additions and 8 deletions.
7 changes: 7 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ if(BUILD_TESTING)
"rosidl_typesupport_cpp"
)
endif()

ament_add_gtest(test_loaned_message test/test_loaned_message.cpp)
ament_target_dependencies(test_loaned_message
"test_msgs"
)
target_link_libraries(test_loaned_message ${PROJECT_NAME})

ament_add_gtest(test_node test/test_node.cpp)
if(TARGET test_node)
ament_target_dependencies(test_node
Expand Down
206 changes: 206 additions & 0 deletions rclcpp/include/rclcpp/loaned_message.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__LOANED_MESSAGE_HPP_
#define RCLCPP__LOANED_MESSAGE_HPP_

#include <memory>
#include <utility>

#include "rclcpp/logging.hpp"
#include "rclcpp/publisher_base.hpp"

#include "rcl/allocator.h"
#include "rcl/publisher.h"

namespace rclcpp
{

template<typename MessageT, typename AllocatorT = std::allocator<void>>
class LoanedMessage
{
using MessageAllocatorTraits = allocator::AllocRebind<MessageT, AllocatorT>;
using MessageAllocator = typename MessageAllocatorTraits::allocator_type;

public:
/// Constructor of the LoanedMessage class.
/**
* The constructor of this class allocates memory for a given message type
* and associates this with a given publisher.
*
* Given the publisher instance, a case differentiation is being performaned
* which decides whether the underlying middleware is able to allocate the appropriate
* memory for this message type or not.
* In the case that the middleware can not loan messages, the passed in allocator instance
* is being used to allocate the message within the scope of this class.
* Otherwise, the allocator is being ignored and the allocation is solely performaned
* in the underlying middleware with its appropriate allocation strategy.
* The need for this arises as the user code can be written explicitly targeting a middleware
* capable of loaning messages.
* However, this user code is ought to be usable even when dynamically linked against
* a middleware which doesn't support message loaning in which case the allocator will be used.
*
* \param pub rclcpp::Publisher instance to which the memory belongs
* \param allocator Allocator instance in case middleware can not allocate messages
*/
LoanedMessage(
const rclcpp::PublisherBase & pub,
std::allocator<MessageT> allocator)
: pub_(pub),
message_(nullptr),
message_allocator_(std::move(allocator))
{
if (pub_.can_loan_messages()) {
void * message_ptr = nullptr;
auto ret = rcl_borrow_loaned_message(
pub_.get_publisher_handle(),
rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
&message_ptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
message_ = static_cast<MessageT *>(message_ptr);
} else {
RCLCPP_INFO_ONCE(
rclcpp::get_logger("rclcpp"),
"Currently used middleware can't loan messages. Local allocator will be used.");
message_ = message_allocator_.allocate(1);
new (message_) MessageT();
}
}

/// Constructor of the LoanedMessage class.
/**
* The constructor of this class allocates memory for a given message type
* and associates this with a given publisher.
*
* Given the publisher instance, a case differentiation is being performaned
* which decides whether the underlying middleware is able to allocate the appropriate
* memory for this message type or not.
* In the case that the middleware can not loan messages, the passed in allocator instance
* is being used to allocate the message within the scope of this class.
* Otherwise, the allocator is being ignored and the allocation is solely performaned
* in the underlying middleware with its appropriate allocation strategy.
* The need for this arises as the user code can be written explicitly targeting a middleware
* capable of loaning messages.
* However, this user code is ought to be usable even when dynamically linked against
* a middleware which doesn't support message loaning in which case the allocator will be used.
*
* \param pub rclcpp::Publisher instance to which the memory belongs
* \param allocator Allocator instance in case middleware can not allocate messages
*/
LoanedMessage(
const rclcpp::PublisherBase * pub,
std::shared_ptr<std::allocator<MessageT>> allocator)
: LoanedMessage(*pub, *allocator)
{}

/// Move semantic for RVO
LoanedMessage(LoanedMessage<MessageT> && other)
: pub_(std::move(other.pub_)),
message_(std::move(other.message_)),
message_allocator_(std::move(other.message_allocator_))
{}

/// Destructor of the LoanedMessage class.
/**
* The destructor has the explicit task to return the allocated memory for its message
* instance.
* If the message was previously allocated via the middleware, the message is getting
* returned to the middleware to cleanly destroy the allocation.
* In the case that the local allocator instance was used, the same instance is then
* being used to destroy the allocated memory.
*
* The contract here is that the memory for this message is valid as long as this instance
* of the LoanedMessage class is alive.
*/
virtual ~LoanedMessage()
{
auto error_logger = rclcpp::get_logger("LoanedMessage");

if (message_ == nullptr) {
return;
}

if (pub_.can_loan_messages()) {
// return allocated memory to the middleware
auto ret =
rcl_return_loaned_message(pub_.get_publisher_handle(), message_);
if (ret != RCL_RET_OK) {
RCLCPP_ERROR(
error_logger, "rcl_deallocate_loaned_message failed: %s", rcl_get_error_string().str);
rcl_reset_error();
}
} else {
// call destructor before deallocating
message_->~MessageT();
message_allocator_.deallocate(message_, 1);
}
message_ = nullptr;
}

/// Validate if the message was correctly allocated.
/**
* The allocated memory might not be always consistent and valid.
* Reasons why this could fail is that an allocation step was failing,
* e.g. just like malloc could fail or a maximum amount of previously allocated
* messages is exceeded in which case the loaned messages have to be returned
* to the middleware prior to be able to allocate a new one.
*/
bool is_valid() const
{
return message_ != nullptr;
}

/// Access the ROS message instance.
/**
* A call to `get()` will return a mutable reference to the underlying ROS message instance.
* This allows a user to modify the content of the message prior to publishing it.
*
* If this reference is copied, the memory for this copy is no longer managed
* by the LoanedMessage instance and has to be cleanup individually.
*/
MessageT & get() const
{
return *message_;
}

/// Release ownership of the ROS message instance.
/**
* A call to `release()` will unmanage the memory for the ROS message.
* That means that the destructor of this class will not free the memory on scope exit.
*
* \return Raw pointer to the message instance.
*/
MessageT * release()
{
auto msg = message_;
message_ = nullptr;
return msg;
}

protected:
const rclcpp::PublisherBase & pub_;

MessageT * message_;

MessageAllocator message_allocator_;

/// Deleted copy constructor to preserve memory integrity.
LoanedMessage(const LoanedMessage<MessageT> & other) = delete;
};

} // namespace rclcpp

#endif // RCLCPP__LOANED_MESSAGE_HPP_
80 changes: 80 additions & 0 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
Expand All @@ -44,6 +45,9 @@
namespace rclcpp
{

template<typename MessageT, typename AllocatorT>
class LoanedMessage;

/// A publisher publishes messages of any type to a topic.
template<typename MessageT, typename AllocatorT = std::allocator<void>>
class Publisher : public PublisherBase
Expand Down Expand Up @@ -132,6 +136,26 @@ class Publisher : public PublisherBase
>::make_shared(size, this->get_allocator());
}

/// Loan memory for a ROS message from the middleware
/**
* If the middleware is capable of loaning memory for a ROS message instance,
* the loaned message will be directly allocated in the middleware.
* If not, the message allocator of this rclcpp::Publisher instance is being used.
*
* With a call to \sa `publish` the LoanedMessage instance is being returned to the middleware
* or free'd accordingly to the allocator.
* If the message is not being published but processed differently, the message has to be
* returned by calling \sa `return_loaned_message`.
* \sa rclcpp::LoanedMessage for details of the LoanedMessage class.
*
* \return LoanedMessage containing memory for a ROS message of type MessageT
*/
rclcpp::LoanedMessage<MessageT, AllocatorT>
loan_message()
{
return rclcpp::LoanedMessage<MessageT, AllocatorT>(this, this->get_allocator());
}

/// Send a message to the topic for this publisher.
/**
* This function is templated on the input message type, MessageT.
Expand Down Expand Up @@ -191,6 +215,41 @@ class Publisher : public PublisherBase
return this->do_serialized_publish(&serialized_msg);
}

/// Publish an instance of a LoanedMessage
/**
* When publishing a loaned message, the memory for this ROS instance will be deallocated
* after being published.
* The instance of the loaned message is no longer valid after this call.
*
* \param loaned_msg The LoanedMessage instance to be published.
*/
void
publish(rclcpp::LoanedMessage<MessageT, AllocatorT> && loaned_msg)
{
if (!loaned_msg.is_valid()) {
throw std::runtime_error("loaned message is not valid");
}
if (intra_process_is_enabled_) {
// TODO(Karsten1987): support loaned message passed by intraprocess
throw std::runtime_error("storing loaned messages in intra process is not supported yet");
}

// verify that publisher supports loaned messages
// TODO(Karsten1987): This case separation has to be done in rclcpp
// otherwise we have to ensure that every middleware implements
// `rmw_publish_loaned_message` explicitly the same way as `rmw_publish`
// by taking a copy of the ros message.
if (this->can_loan_messages()) {
// we release the ownership from the rclpp::LoanedMessage instance
// and let the middleware clean up the memory.
this->do_loaned_message_publish(loaned_msg.release());
} else {
// we don't release the ownership, let the middleware copy the ros message
// and thus the destructor of rclcpp::LoanedMessage cleans up the memory.
this->do_inter_process_publish(&loaned_msg.get());
}
}

std::shared_ptr<MessageAllocator>
get_allocator() const
{
Expand All @@ -202,6 +261,7 @@ class Publisher : public PublisherBase
do_inter_process_publish(const MessageT * msg)
{
auto status = rcl_publish(&publisher_handle_, msg, nullptr);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
Expand Down Expand Up @@ -252,6 +312,26 @@ class Publisher : public PublisherBase
}
}

void
do_loaned_message_publish(MessageT * msg)
{
auto status = rcl_publish_loaned_message(&publisher_handle_, msg, nullptr);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
}
}

uint64_t
store_intra_process_message(
uint64_t publisher_id,
Expand Down
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
rclcpp::QoS
get_actual_qos() const;

/// Check if publisher instance can loan messages.
/**
* Depending on the middleware and the message type, this will return true if the middleware
* can allocate a ROS message instance.
*/
RCLCPP_PUBLIC
bool
can_loan_messages() const;

/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.
Expand Down
Loading

0 comments on commit ed0bd16

Please sign in to comment.