Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement deliver message kind #2168

Merged
merged 2 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/generic_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase
options.to_rcl_subscription_options(qos),
options.event_callbacks,
options.use_default_callbacks,
SubscriptionType::SERIALIZED_MESSAGE),
DeliveredMessageKind::SERIALIZED_MESSAGE),
callback_(callback),
ts_lib_(ts_lib)
{}
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class Subscription : public SubscriptionBase
// NOTE(methylDragon): Passing these args separately is necessary for event binding
options.event_callbacks,
options.use_default_callbacks,
callback.is_serialized_message_callback() ? SubscriptionType::SERIALIZED_MESSAGE : SubscriptionType::ROS_MESSAGE), // NOLINT
callback.is_serialized_message_callback() ? DeliveredMessageKind::SERIALIZED_MESSAGE : DeliveredMessageKind::ROS_MESSAGE), // NOLINT
any_callback_(callback),
options_(options),
message_memory_strategy_(message_memory_strategy)
Expand Down
35 changes: 24 additions & 11 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,25 @@ namespace experimental
class IntraProcessManager;
} // namespace experimental

enum class SubscriptionType : uint8_t
/// The kind of message that the subscription delivers in its callback, used by the executor
/**
* This enum needs to exist because the callback handle is not accessible to the executor's scope.
*
* "Kind" is used since what is being delivered is a category of messages, for example, there are
* different ROS message types that can be delivered, but they're all ROS messages.
*
* As a concrete example, all of the following callbacks will be considered ROS_MESSAGE for
* DeliveredMessageKind:
* - void callback(const std_msgs::msg::String &)
* - void callback(const std::string &) // type adaption
* - void callback(std::unique_ptr<std_msgs::msg::String>)
*/
enum class DeliveredMessageKind : uint8_t
{
INVALID = 0, // The subscription type is most likely uninitialized
ROS_MESSAGE = 1, // take message as ROS message and handle as ROS message
SERIALIZED_MESSAGE = 2, // take message as serialized and handle as serialized
DYNAMIC_MESSAGE_DIRECT = 3, // take message as DynamicMessage and handle as DynamicMessage
DYNAMIC_MESSAGE_FROM_SERIALIZED = 4 // take message as serialized and handle as DynamicMessage
INVALID = 0,
ROS_MESSAGE = 1, // The subscription delivers a ROS message to its callback
SERIALIZED_MESSAGE = 2, // The subscription delivers a serialized message to its callback
DYNAMIC_MESSAGE = 3, // The subscription delivers a dynamic message to its callback
Comment on lines -68 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked @methylDragon to change this, since I believe the difference between a dynamic message delivered from the middleware or by wrapping a serialized message from the middleware should be encapsulated in the subscription, that is to say, neither the executor nor the user creating the subscription should care how the dynamic message is delivered. That also lead to the discussion of the name of this enum and how it was a bit ambiguous, e.g. normal vs generic subscription or lifecycle vs normal, "type" was just overloaded too much.

Also, we discussed a use case where the user wanted the dynamic message to be delivered by wrapping a serialized message always, even if the middleware supported directly returning one, but then we concluded the user could do this themselves by asking for a serialize message in their callback and creating a dynamic message from that themselves. And if it becomes a common pattern we want to put into our API, it should be a separate option form this enum.

};

/// Virtual base class for subscriptions. This pattern allows us to iterate over different template
Expand All @@ -88,7 +100,8 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
* \param[in] type_support_handle rosidl type support struct, for the Message type of the topic.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] subscription_options Options for the subscription.
* \param[in] subscription_type Enum flag to change how the message will be received and delivered
* \param[in] delivered_message_kind Enum flag to change how the message will be received and
* delivered
*/
RCLCPP_PUBLIC
SubscriptionBase(
Expand All @@ -98,7 +111,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
const rcl_subscription_options_t & subscription_options,
const SubscriptionEventCallbacks & event_callbacks,
bool use_default_callbacks,
SubscriptionType subscription_type = SubscriptionType::ROS_MESSAGE);
DeliveredMessageKind delivered_message_kind = DeliveredMessageKind::ROS_MESSAGE);

/// Destructor.
RCLCPP_PUBLIC
Expand Down Expand Up @@ -249,10 +262,10 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>

/// Return the type of the subscription.
/**
* \return `SubscriptionType`, which adjusts how messages are received and delivered.
* \return `DeliveredMessageKind`, which adjusts how messages are received and delivered.
*/
RCLCPP_PUBLIC
SubscriptionType
DeliveredMessageKind
get_subscription_type() const;

/// Get matching publisher count.
Expand Down Expand Up @@ -650,7 +663,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
RCLCPP_DISABLE_COPY(SubscriptionBase)

rosidl_message_type_support_t type_support_;
SubscriptionType subscription_type_;
DeliveredMessageKind delivered_message_type_;

std::atomic<bool> subscription_in_use_by_wait_set_{false};
std::atomic<bool> intra_process_subscription_waitable_in_use_by_wait_set_{false};
Expand Down
20 changes: 7 additions & 13 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,8 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
message_info.get_rmw_message_info().from_intra_process = false;

switch (subscription->get_subscription_type()) {
// Take ROS message
case rclcpp::SubscriptionType::ROS_MESSAGE:
// Deliver ROS message
case rclcpp::DeliveredMessageKind::ROS_MESSAGE:
{
if (subscription->can_loan_messages()) {
// This is the case where a loaned message is taken from the middleware via
Expand Down Expand Up @@ -659,8 +659,8 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
break;
}

// Take serialized message
case rclcpp::SubscriptionType::SERIALIZED_MESSAGE:
// Deliver serialized message
case rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE:
{
// This is the case where a copy of the serialized message is taken from
// the middleware via inter-process communication.
Expand All @@ -679,21 +679,15 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
}

// DYNAMIC SUBSCRIPTION ========================================================================
// Take dynamic message directly from the middleware
case rclcpp::SubscriptionType::DYNAMIC_MESSAGE_DIRECT:
{
throw std::runtime_error("Unimplemented");
}

// Take serialized and then convert to dynamic message
case rclcpp::SubscriptionType::DYNAMIC_MESSAGE_FROM_SERIALIZED:
// Deliver dynamic message
case rclcpp::DeliveredMessageKind::DYNAMIC_MESSAGE:
{
throw std::runtime_error("Unimplemented");
}

default:
{
throw std::runtime_error("Subscription type is not supported");
throw std::runtime_error("Delivered message kind is not supported");
}
}
return;
Expand Down
18 changes: 5 additions & 13 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,16 @@ SubscriptionBase::SubscriptionBase(
const rcl_subscription_options_t & subscription_options,
const SubscriptionEventCallbacks & event_callbacks,
bool use_default_callbacks,
SubscriptionType subscription_type)
DeliveredMessageKind delivered_message_kind)
: node_base_(node_base),
node_handle_(node_base_->get_shared_rcl_node_handle()),
node_logger_(rclcpp::get_node_logger(node_handle_.get())),
use_intra_process_(false),
intra_process_subscription_id_(0),
event_callbacks_(event_callbacks),
type_support_(type_support_handle),
subscription_type_(subscription_type)
delivered_message_type_(delivered_message_kind)
{
if (!rmw_feature_supported(RMW_MIDDLEWARE_CAN_TAKE_DYNAMIC_MESSAGE) &&
subscription_type == rclcpp::SubscriptionType::DYNAMIC_MESSAGE_DIRECT)
{
throw std::runtime_error(
"Cannot set subscription to take dynamic message directly, feature not supported in rmw"
);
}

auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs)
{
if (rcl_subscription_fini(rcl_subs, node_handle.get()) != RCL_RET_OK) {
Expand Down Expand Up @@ -269,13 +261,13 @@ SubscriptionBase::get_message_type_support_handle() const
bool
SubscriptionBase::is_serialized() const
{
return subscription_type_ == rclcpp::SubscriptionType::SERIALIZED_MESSAGE;
return delivered_message_type_ == rclcpp::DeliveredMessageKind::SERIALIZED_MESSAGE;
}

rclcpp::SubscriptionType
rclcpp::DeliveredMessageKind
SubscriptionBase::get_subscription_type() const
{
return subscription_type_;
return delivered_message_type_;
}

size_t
Expand Down