Skip to content

Commit

Permalink
New IntraProcessManager capable of storing shared_ptr<const T> (#690)
Browse files Browse the repository at this point in the history
* Changed mapped_ring_buffer class to store both shared_ptr or unique_ptr

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Changed the IPM store and take methods

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Changed publish methods to take advantage of the new IPM

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Change how subscriptions handle intraprocess messages

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Modified publish method signatures

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Renamed 'publisher.cpp' and 'subscription.cpp' to 'publisher_base.cpp' and 'subscription_base.cpp'

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>

* Updated lifecycle_publisher publish methods

Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>
  • Loading branch information
ivanpauno committed Apr 30, 2019
1 parent d34fa60 commit 98f610c
Show file tree
Hide file tree
Showing 18 changed files with 1,093 additions and 765 deletions.
4 changes: 2 additions & 2 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/parameter_events_filter.cpp
src/rclcpp/parameter_map.cpp
src/rclcpp/parameter_service.cpp
src/rclcpp/publisher.cpp
src/rclcpp/publisher_base.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription.cpp
src/rclcpp/subscription_base.cpp
src/rclcpp/time.cpp
src/rclcpp/time_source.cpp
src/rclcpp/timer.cpp
Expand Down
38 changes: 29 additions & 9 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class AnySubscriptionCallback
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;

using SharedPtrCallback = std::function<void (const std::shared_ptr<MessageT>)>;
Expand Down Expand Up @@ -154,7 +155,6 @@ class AnySubscriptionCallback
void dispatch(
std::shared_ptr<MessageT> message, const rmw_message_info_t & message_info)
{
(void)message_info;
if (shared_ptr_callback_) {
shared_ptr_callback_(message);
} else if (shared_ptr_with_info_callback_) {
Expand All @@ -177,30 +177,50 @@ class AnySubscriptionCallback
}

void dispatch_intra_process(
MessageUniquePtr & message, const rmw_message_info_t & message_info)
ConstMessageSharedPtr message, const rmw_message_info_t & message_info)
{
if (const_shared_ptr_callback_) {
const_shared_ptr_callback_(message);
} else if (const_shared_ptr_with_info_callback_) {
const_shared_ptr_with_info_callback_(message, message_info);
} else {
if (unique_ptr_callback_ || unique_ptr_with_info_callback_ ||
shared_ptr_callback_ || shared_ptr_with_info_callback_)
{
throw std::runtime_error("unexpected dispatch_intra_process const shared "
"message call with no const shared_ptr callback");
} else {
throw std::runtime_error("unexpected message without any callback set");
}
}
}

void dispatch_intra_process(
MessageUniquePtr message, const rmw_message_info_t & message_info)
{
(void)message_info;
if (shared_ptr_callback_) {
typename std::shared_ptr<MessageT> shared_message = std::move(message);
shared_ptr_callback_(shared_message);
} else if (shared_ptr_with_info_callback_) {
typename std::shared_ptr<MessageT> shared_message = std::move(message);
shared_ptr_with_info_callback_(shared_message, message_info);
} else if (const_shared_ptr_callback_) {
typename std::shared_ptr<MessageT const> const_shared_message = std::move(message);
const_shared_ptr_callback_(const_shared_message);
} else if (const_shared_ptr_with_info_callback_) {
typename std::shared_ptr<MessageT const> const_shared_message = std::move(message);
const_shared_ptr_with_info_callback_(const_shared_message, message_info);
} else if (unique_ptr_callback_) {
unique_ptr_callback_(std::move(message));
} else if (unique_ptr_with_info_callback_) {
unique_ptr_with_info_callback_(std::move(message), message_info);
} else if (const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_) {
throw std::runtime_error("unexpected dispatch_intra_process unique message call"
" with const shared_ptr callback");
} else {
throw std::runtime_error("unexpected message without any callback set");
}
}

bool use_take_shared_method()
{
return const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_;
}

private:
std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_;
Expand Down
92 changes: 73 additions & 19 deletions rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <set>

#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager_impl.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
Expand Down Expand Up @@ -184,21 +185,11 @@ class IntraProcessManager
* \param buffer_size if 0 (default) a size is calculated based on the QoS.
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
template<typename MessageT, typename Alloc>
RCLCPP_PUBLIC
uint64_t
add_publisher(
typename Publisher<MessageT, Alloc>::SharedPtr publisher,
size_t buffer_size = 0)
{
auto id = IntraProcessManager::get_next_unique_id();
size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
auto mrb = mapped_ring_buffer::MappedRingBuffer<
MessageT,
typename Publisher<MessageT, Alloc>::MessageAlloc
>::make_shared(size, publisher->get_allocator());
impl_->add_publisher(id, publisher, mrb, size);
return id;
}
rclcpp::PublisherBase::SharedPtr publisher,
size_t buffer_size = 0);

/// Unregister a publisher using the publisher's unique id.
/**
Expand Down Expand Up @@ -241,13 +232,41 @@ class IntraProcessManager
* \param message the message that is being stored.
* \return the message sequence number.
*/
template<
typename MessageT, typename Alloc = std::allocator<void>>
uint64_t
store_intra_process_message(
uint64_t intra_process_publisher_id,
std::shared_ptr<const MessageT> message)
{
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
intra_process_publisher_id, message_seq);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
throw std::runtime_error("Typecast failed due to incorrect message type");
}

// Insert the message into the ring buffer using the message_seq to identify it.
bool did_replace = typed_buffer->push_and_replace(message_seq, message);
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.

impl_->store_intra_process_message(intra_process_publisher_id, message_seq);

// Return the message sequence which is sent to the subscription.
return message_seq;
}

template<
typename MessageT, typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
uint64_t
store_intra_process_message(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> & message)
std::unique_ptr<MessageT, Deleter> message)
{
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
Expand All @@ -260,7 +279,7 @@ class IntraProcessManager
}

// Insert the message into the ring buffer using the message_seq to identify it.
bool did_replace = typed_buffer->push_and_replace(message_seq, message);
bool did_replace = typed_buffer->push_and_replace(message_seq, std::move(message));
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.

Expand Down Expand Up @@ -334,10 +353,45 @@ class IntraProcessManager
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs_size) {
// There are more subscriptions to serve, return a copy.
typed_buffer->get_copy_at_key(message_sequence_number, message);
typed_buffer->get(message_sequence_number, message);
} else {
// This is the last one to be returned, transfer ownership.
typed_buffer->pop(message_sequence_number, message);
}
}

template<
typename MessageT, typename Alloc = std::allocator<void>>
void
take_intra_process_message(
uint64_t intra_process_publisher_id,
uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id,
std::shared_ptr<const MessageT> & message)
{
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
message = nullptr;

size_t target_subs_size = 0;
std::lock_guard<std::mutex> lock(take_mutex_);
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
intra_process_publisher_id,
message_sequence_number,
requesting_subscriptions_intra_process_id,
target_subs_size
);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
return;
}
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs_size) {
// There are more subscriptions to serve, return a copy.
typed_buffer->get(message_sequence_number, message);
} else {
// This is the last one to be returned, transfer ownership.
typed_buffer->pop_at_key(message_sequence_number, message);
typed_buffer->pop(message_sequence_number, message);
}
}

Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/intra_process_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@

#include "rclcpp/macros.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/subscription_base.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
Expand Down
Loading

0 comments on commit 98f610c

Please sign in to comment.