Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dnae adas/serialized ipm #973

Open
wants to merge 45 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
9b73845
added "SerializationBase" and "Serialization" to convert a ROS2 messa…
Jan 20, 2020
c8162ba
added "SerializedContainer" as container of rcl_serialized_message_t,…
Jan 20, 2020
f3aab4f
extended "IntraProcessManager" for serialized messages:
Jan 20, 2020
9f1c10f
extended "SubscriptionIntraProcess" for serialized communication by:
Jan 20, 2020
a300c2f
added secon communication channel for intra process communication for…
Jan 20, 2020
86bd3a3
extended "Publisher" for serialized messages:
Jan 20, 2020
9d7ef52
extended "Publisher" for serialized messages:
Jan 20, 2020
c8522b3
updated "LifcecylePublisher" for serialized messages
Jan 20, 2020
29fea92
extended "create_publisher" and "create_publisher_factory" to pass me…
Jan 20, 2020
f9ef2c2
extended "Subscription" for serialized messages:
Jan 20, 2020
56b520e
extended "create_subscription" and "create_subscriptioncreate_publish…
Jan 20, 2020
0292142
added unit test for serialized intra process communication
Jan 20, 2020
5faf8f4
changed include notation
Jan 27, 2020
a3db006
beautified error output
Jan 27, 2020
d7a84ea
changed timing
Jan 27, 2020
af14ee7
* fixed code style
Jan 28, 2020
0c83e49
updated test_intra_process_communication:
Jan 28, 2020
bc228ee
uncrustified file
Jan 28, 2020
4c80594
fixed code style
Jan 30, 2020
2ddbe32
added missing member from rebase
Apr 17, 2020
4e1e744
removed unnecessary dependency
Apr 17, 2020
d8f1da9
adapted to multi waitables for ipm
Apr 17, 2020
d74c4ed
removed unnecessary include
Apr 17, 2020
d4536b3
updated to rclcpp::ok
Apr 17, 2020
7cff1aa
Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
DensoADAS Apr 17, 2020
657d9a0
Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
DensoADAS Apr 17, 2020
9b65b7c
* default deleter
Apr 17, 2020
18181f5
changes due to review of PR
Apr 17, 2020
4c7506f
uncrustify
Apr 17, 2020
2cefdca
added backwards compatibility for publishing serialized messages
Apr 17, 2020
2147026
initializing variables
Apr 17, 2020
9b436fa
fix include guards
Apr 17, 2020
a2fc8bd
enabled publishing of rcl_serialized_message_t
Apr 17, 2020
b482164
moved serialization and serialized message out of "experimental"
Apr 17, 2020
93dc826
added original constructor to LifcyclePublisher
Apr 17, 2020
35fd490
fixed year in header
Apr 17, 2020
5472d36
added comment
Apr 17, 2020
ab73a62
updated loop signature
Apr 17, 2020
4fa5f09
fixed memory loss
Apr 17, 2020
ff7a81f
reuse constructor
Apr 17, 2020
408343f
renamed variable
Apr 17, 2020
511476c
fixed check
Apr 17, 2020
1f8226a
line break
Apr 17, 2020
2411642
fixed include order
Apr 17, 2020
43a8446
renamed overloaded functions:
Apr 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@ if(BUILD_TESTING)
"test_msgs"
)
target_link_libraries(test_loaned_message ${PROJECT_NAME})
ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp
TIMEOUT 120)
if(TARGET test_intra_process_communication)
ament_target_dependencies(test_intra_process_communication
"rcl"
"rcl_interfaces"
"rmw"
"rosidl_typesupport_cpp"
"test_msgs"
)
endif()
target_link_libraries(test_intra_process_communication ${PROJECT_NAME})

ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240)
if(TARGET test_node)
Expand Down
27 changes: 25 additions & 2 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ template<
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeT>
std::shared_ptr<PublisherT>
create_publisher(
create_generic_publisher(
NodeT & node,
const std::string & topic_name,
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
Expand All @@ -56,7 +57,7 @@ create_publisher(
// Create the publisher.
auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options, type_support),
qos
);

Expand All @@ -66,6 +67,28 @@ create_publisher(
return std::dynamic_pointer_cast<PublisherT>(pub);
}

template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeT>
std::shared_ptr<PublisherT>
create_publisher(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do I need this overload if both methods are templated by MessageT? The call to const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(); could be done in the original function or even within the create_publisher_factory function, couldn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this allows an instantion with type "rcl_serialized_message_t" for generic publishers without previously known message type (e.g. for rosbag2)

Example:
auto publisher = rclcpp::create_publisher<rcl_serialized_message_t>( node, "/topic", *rosidl_typesupport_cpp::get_message_type_support_handle<test_msgs::msg::Strings>(), rclcpp::QoS(10));

NodeT & node,
const std::string & topic_name,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
)
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();

return create_generic_publisher<MessageT, AllocatorT, PublisherT, NodeT>(
node, topic_name, type_support,
qos, options);
}

} // namespace rclcpp

#endif // RCLCPP__CREATE_PUBLISHER_HPP_
42 changes: 40 additions & 2 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ template<
>,
typename NodeT>
typename std::shared_ptr<SubscriptionT>
create_subscription(
create_generic_subscription(
NodeT && node,
const std::string & topic_name,
const rosidl_message_type_support_t & type_support,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
Expand All @@ -67,7 +68,8 @@ create_subscription(
auto factory = rclcpp::create_subscription_factory<MessageT>(
std::forward<CallbackT>(callback),
options,
msg_mem_strat
msg_mem_strat,
type_support
);

auto sub = node_topics->create_subscription(topic_name, factory, qos);
Expand All @@ -76,6 +78,42 @@ create_subscription(
return std::dynamic_pointer_cast<SubscriptionT>(sub);
}

template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename CallbackMessageT =
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>,
typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT,
AllocatorT
>,
typename NodeT>
typename std::shared_ptr<SubscriptionT>
create_subscription(
Karsten1987 marked this conversation as resolved.
Show resolved Hide resolved
NodeT && node,
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options = (
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT>()
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
MessageMemoryStrategyT::create_default()
)
)
{
const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();

return
create_generic_subscription<
MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT
>(
std::forward<NodeT>(node), topic_name, type_support, qos, std::forward<CallbackT>(
callback), options, msg_mem_strat);
}

} // namespace rclcpp

#endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_
10 changes: 8 additions & 2 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ class IntraProcessManager
* In addition this generates a unique intra process id for the subscription.
*
* \param subscription the SubscriptionIntraProcess to register.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the subscription's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
add_subscription(
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription,
const bool is_serialized = false);

/// Unregister a subscription using the subscription's unique id.
/**
Expand All @@ -134,11 +137,12 @@ class IntraProcessManager
* In addition this generates a unique intra process id for the publisher.
*
* \param publisher publisher to be registered with the manager.
* \param is_serialized true if the buffer expects serialized messages
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
RCLCPP_PUBLIC
uint64_t
add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false);

/// Unregister a publisher using the publisher's unique id.
/**
Expand Down Expand Up @@ -311,6 +315,7 @@ class IntraProcessManager
rmw_qos_profile_t qos;
const char * topic_name;
bool use_take_shared_method;
bool is_serialized;
};

struct PublisherInfo
Expand All @@ -320,6 +325,7 @@ class IntraProcessManager
rclcpp::PublisherBase::WeakPtr publisher;
rmw_qos_profile_t qos;
const char * topic_name;
bool is_serialized;
};

struct SplittedSubscriptions
Expand Down
127 changes: 120 additions & 7 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/create_intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
Expand All @@ -37,6 +39,8 @@ namespace rclcpp
namespace experimental
{

class SerializedMessage;

template<
typename MessageT,
typename Alloc = std::allocator<void>,
Expand All @@ -51,6 +55,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
using CallbackMessageAllocTraits = allocator::AllocRebind<CallbackMessageT, Alloc>;
using CallbackMessageAlloc = typename CallbackMessageAllocTraits::allocator_type;
using CallbackMessageUniquePtr = std::unique_ptr<CallbackMessageT>;
using CallbackMessageSharedPtr = std::shared_ptr<CallbackMessageT>;

using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
MessageT,
Expand All @@ -64,11 +72,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
rmw_qos_profile_t qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
rclcpp::IntraProcessBufferType buffer_type,
std::shared_ptr<SerializationBase> serializer)
: SubscriptionIntraProcessBase(topic_name, qos_profile),
any_callback_(callback)
any_callback_(callback), serializer_(serializer)
{
if (!std::is_same<MessageT, CallbackMessageT>::value) {
if (!std::is_same<MessageT, CallbackMessageT>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value &&
!std::is_same<CallbackMessageT, rcl_serialized_message_t>::value)
{
throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
}

Expand Down Expand Up @@ -142,18 +154,47 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
(void)ret;
}

// convert from ROS2 message to rcl_serialized_message_t (serilizatino needed)
template<typename T>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
typename std::enable_if<
std::is_same<T, rcl_serialized_message_t>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
}

rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;

ConstMessageSharedPtr msg = buffer_->consume_shared();
auto serialized_msg =
serializer_->serialize_message(reinterpret_cast<const void *>(msg.get()));

if (nullptr == serialized_msg) {
throw std::runtime_error("Subscription intra-process could not serialize message");
}

if (any_callback_.use_take_shared_method()) {
any_callback_.dispatch_intra_process(serialized_msg, msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
"messages does not support unique pointers.");
}
}

// forward from ROS2 message to ROS2 message (same type)
template<class T>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
typename std::enable_if<
!std::is_same<T, rcl_serialized_message_t>::value &&
!std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
rmw_message_info_t msg_info;
rmw_message_info_t msg_info = {};
msg_info.publisher_gid = {0, {0}};
msg_info.from_intra_process = true;

Expand All @@ -166,8 +207,80 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
}
}

// forward from rcl_serialized_message_t to SerializationMessage (no conversion needed)
template<typename T>
typename std::enable_if<
std::is_same<T, rcl_serialized_message_t>::value &&
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;

if (any_callback_.use_take_shared_method()) {
ConstMessageSharedPtr msg = buffer_->consume_shared();
if (msg == nullptr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (msg == nullptr) {
if (nullptr == msg) {

throw std::runtime_error("Subscription intra-process could not get serialized message");
}
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
throw std::runtime_error(
"Subscription intra-process for serialized "
"messages does not support unique pointers.");
}
}

// convert from rcl_serialized_message_t to ROS2 message (deserialization needed)
template<class T>
typename std::enable_if<
!std::is_same<T, rcl_serialized_message_t>::value &&
std::is_same<MessageT, rclcpp::SerializedMessage>::value,
void>::type
execute_impl()
{
if (nullptr == serializer_) {
throw std::runtime_error("Subscription intra-process can't handle unserialized messages");
}

ConstMessageSharedPtr serialized_container = buffer_->consume_shared();
if (nullptr == serialized_container) {
throw std::runtime_error("Subscription intra-process could not get serialized message");
}

rmw_message_info_t msg_info = {};
msg_info.from_intra_process = true;

if (any_callback_.use_take_shared_method()) {
CallbackMessageSharedPtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(msg, msg_info);
} else {
CallbackMessageUniquePtr msg = construct_unique();
serializer_->deserialize_message(
serialized_container.get(),
reinterpret_cast<void *>(msg.get()));
any_callback_.dispatch_intra_process(std::move(msg), msg_info);
}
}

CallbackMessageUniquePtr construct_unique()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a (inline) free function (anonymous namespace) which takes the allocator as a first argument.

{
CallbackMessageUniquePtr unique_msg;
auto ptr = CallbackMessageAllocTraits::allocate(*message_allocator_.get(), 1);
CallbackMessageAllocTraits::construct(*message_allocator_.get(), ptr);
unique_msg = CallbackMessageUniquePtr(ptr);

return unique_msg;
}

AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
BufferUniquePtr buffer_;
std::shared_ptr<SerializationBase> serializer_;
std::shared_ptr<CallbackMessageAlloc> message_allocator_ =
std::make_shared<CallbackMessageAlloc>();
};

} // namespace experimental
Expand Down
Loading