Skip to content

Commit

Permalink
Options-based create_publisher and create_subscription interfaces
Browse files Browse the repository at this point in the history
Introduce new Options structs for creating publishers and subscribers. Deprecate existing interfaces for checking in CI how often they are used.

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Apr 5, 2019
1 parent ed21cf4 commit 4d10549
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 51 deletions.
9 changes: 9 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ if(BUILD_TESTING)
)
target_link_libraries(test_publisher ${PROJECT_NAME})
endif()
ament_add_gtest(test_pub_sub_option_interface test/test_pub_sub_option_interface.cpp)
if(TARGET test_pub_sub_option_interface)
ament_target_dependencies(test_pub_sub_option_interface
test_msgs
)
target_link_libraries(test_pub_sub_option_interface
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_publisher_subscription_count_api test/test_publisher_subscription_count_api.cpp)
if(TARGET test_publisher_subscription_count_api)
ament_target_dependencies(test_publisher_subscription_count_api
Expand Down
34 changes: 34 additions & 0 deletions rclcpp/include/rclcpp/intra_process_setting.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__INTRA_PROCESS_SETTING_HPP_
#define RCLCPP__INTRA_PROCESS_SETTING_HPP_

namespace rclcpp
{

/// Used as argument in create_publisher and create_subscriber.
enum class IntraProcessSetting
{
/// Explicitly enable intraprocess comm at publisher/subscription level.
Enable,
/// Explicitly disable intraprocess comm at publisher/subscription level.
Disable,
/// Take intraprocess configuration from the node.
NodeDefault
};

} // namespace rclcpp

#endif // RCLCPP__INTRA_PROCESS_SETTING_HPP_
63 changes: 52 additions & 11 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@
#include "rclcpp/node_interfaces/node_waitables_interface.hpp"
#include "rclcpp/parameter.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/timer.hpp"
Expand All @@ -64,17 +66,6 @@
namespace rclcpp
{

/// Used as argument in create_publisher and create_subscriber.
enum class IntraProcessSetting
{
/// Explicitly enable intraprocess comm at publisher/subscription level.
Enable,
/// Explicitly disable intraprocess comm at publisher/subscription level.
Disable,
/// Take intraprocess configuration from the node.
NodeDefault
};

/// Node is the single point of entry for creating publishers and subscribers.
class Node : public std::enable_shared_from_this<Node>
{
Expand Down Expand Up @@ -150,6 +141,23 @@ class Node : public std::enable_shared_from_this<Node>
const std::vector<rclcpp::callback_group::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] qos_history_depth The depth of the publisher message queue.
* \param[in] options Additional options for the created Publisher.
* \return Shared pointer to the created publisher.
*/
template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name,
size_t qos_history_depth,
const PublisherOptions<Alloc> & options = PublisherOptions<Alloc>());

/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
Expand All @@ -160,6 +168,7 @@ class Node : public std::enable_shared_from_this<Node>
template<
typename MessageT, typename Alloc = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
[[deprecated]]
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name, size_t qos_history_depth,
Expand All @@ -176,13 +185,43 @@ class Node : public std::enable_shared_from_this<Node>
template<
typename MessageT, typename Alloc = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
[[deprecated]]
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
std::shared_ptr<Alloc> allocator = nullptr,
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault);

/// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] callback The user-defined callback function to receive a message
* \param[in] qos_history_depth The depth of the subscription's incoming message queue.
* \param[in] options Additional options for the creation of the Subscription.
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay):
Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround.
*/
template<
typename MessageT,
typename CallbackT,
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
const SubscriptionOptions<Alloc> & options = SubscriptionOptions<Alloc>(),
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr);

/// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
Expand All @@ -204,6 +243,7 @@ class Node : public std::enable_shared_from_this<Node>
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
[[deprecated]]
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
Expand Down Expand Up @@ -238,6 +278,7 @@ class Node : public std::enable_shared_from_this<Node>
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
[[deprecated]]
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
Expand Down
124 changes: 84 additions & 40 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,6 @@
namespace rclcpp
{

template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
if (!allocator) {
allocator = std::make_shared<Alloc>();
}
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;
return this->create_publisher<MessageT, Alloc, PublisherT>(topic_name, qos,
allocator, use_intra_process_comm);
}

RCLCPP_LOCAL
inline
std::string
Expand All @@ -82,15 +66,19 @@ extend_name_with_sub_namespace(const std::string & name, const std::string & sub
template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
std::shared_ptr<Alloc> allocator, IntraProcessSetting use_intra_process_comm)
const std::string & topic_name,
size_t qos_history_depth,
const PublisherOptions<Alloc> & options)
{
std::shared_ptr<Alloc> allocator = options.allocator;
if (!allocator) {
allocator = std::make_shared<Alloc>();
}
rmw_qos_profile_t qos_profile = options.qos_profile;
qos_profile.depth = qos_history_depth;

bool use_intra_process;
switch (use_intra_process_comm) {
switch (options.use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
Expand All @@ -113,6 +101,35 @@ Node::create_publisher(
allocator);
}

template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
PublisherOptions<Alloc> pub_options;
pub_options.allocator = allocator;
pub_options.use_intra_process_comm = use_intra_process_comm;
return this->create_publisher<MessageT, Alloc, PublisherT>(
topic_name, qos_history_depth, pub_options);
}


template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
std::shared_ptr<Alloc> allocator, IntraProcessSetting use_intra_process_comm)
{
PublisherOptions<Alloc> pub_options;
pub_options.qos_profile = qos_profile;
pub_options.allocator = allocator;
pub_options.use_intra_process_comm = use_intra_process_comm;
return this->create_publisher<MessageT, Alloc, PublisherT>(
topic_name, qos_profile.depth, pub_options);
}

template<
typename MessageT,
typename CallbackT,
Expand All @@ -122,28 +139,29 @@ std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
size_t qos_history_depth,
const SubscriptionOptions<Alloc> & options,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
msg_mem_strat)
{
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;

std::shared_ptr<Alloc> allocator = options.allocator;
if (!allocator) {
allocator = std::make_shared<Alloc>();
}

rmw_qos_profile_t qos_profile = options.qos_profile;
qos_profile.depth = qos_history_depth;

if (!msg_mem_strat) {
using rclcpp::message_memory_strategy::MessageMemoryStrategy;
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default();
}

bool use_intra_process;
switch (use_intra_process_comm) {
switch (options.use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
Expand All @@ -163,8 +181,8 @@ Node::create_subscription(
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback),
qos_profile,
group,
ignore_local_publications,
options.callback_group,
options.ignore_local_publications,
use_intra_process,
msg_mem_strat,
allocator);
Expand All @@ -179,7 +197,7 @@ std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
Expand All @@ -188,18 +206,44 @@ Node::create_subscription(
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;
SubscriptionOptions<Alloc> sub_options;
sub_options.qos_profile = qos_profile;
sub_options.callback_group = group;
sub_options.ignore_local_publications = ignore_local_publications;
sub_options.allocator = allocator;
sub_options.use_intra_process_comm = use_intra_process_comm;

return this->create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
topic_name, std::forward<CallbackT>(callback), qos_profile.depth, sub_options, msg_mem_strat);
}

return this->create_subscription<MessageT>(
topic_name,
std::forward<CallbackT>(callback),
qos,
group,
ignore_local_publications,
msg_mem_strat,
allocator,
use_intra_process_comm);

template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
SubscriptionOptions<Alloc> sub_options;
sub_options.callback_group = group;
sub_options.ignore_local_publications = ignore_local_publications;
sub_options.allocator = allocator;
sub_options.use_intra_process_comm = use_intra_process_comm;

return this->create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
topic_name, std::forward<CallbackT>(callback), qos_history_depth, sub_options, msg_mem_strat);
}

template<typename DurationRepT, typename DurationT, typename CallbackT>
Expand Down
Loading

0 comments on commit 4d10549

Please sign in to comment.