Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <rmw/types.h>

#include <chrono>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
Expand All @@ -29,7 +31,9 @@
#include "rclcpp/context.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "tracetools/tracetools.h"

Expand Down Expand Up @@ -70,22 +74,25 @@ class SubscriptionIntraProcess
using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr;
using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr;
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
using StatsHandlerFn = std::function<void(const rmw_message_info_t &, const rclcpp::Time &)>;

SubscriptionIntraProcess(
AnySubscriptionCallback<MessageT, Alloc> callback,
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
rclcpp::IntraProcessBufferType buffer_type,
StatsHandlerFn stats_handler = nullptr)
: SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
SubscribedTypeDeleter, ROSMessageType>(
std::make_shared<SubscribedTypeAlloc>(*allocator),
context,
topic_name,
qos_profile,
buffer_type),
any_callback_(callback)
any_callback_(callback),
stats_handler_(std::move(stats_handler))
{
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_callback_added,
Expand Down Expand Up @@ -197,6 +204,18 @@ class SubscriptionIntraProcess
msg_info.publisher_gid = {0, {0}};
msg_info.from_intra_process = true;

const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now());
if (stats_handler_) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger("rclcpp"),
"Intra-process communication does not support accurate message age statistics");
// Set source_timestamp to "now" so that message_age reports 0ms rather than
// an invalid value taken from an un-initialised timestamp. IPC delivery
// has little/no transport latency by definition, so near-zero age is expected.
msg_info.source_timestamp = nanos.time_since_epoch().count();
}

auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
data);

Expand All @@ -208,9 +227,14 @@ class SubscriptionIntraProcess
any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
}
shared_ptr.reset();

if (stats_handler_) {
stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count()));
}
}

AnySubscriptionCallback<MessageT, Alloc> any_callback_;
StatsHandlerFn stats_handler_;
};

} // namespace experimental
Expand Down
15 changes: 14 additions & 1 deletion rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ class Subscription : public SubscriptionBase
ROSMessageT,
AllocatorT>;

// Build a type-erased stats handler to avoid a circular include chain
// via publisher.hpp and callback_group.hpp
typename SubscriptionIntraProcessT::StatsHandlerFn stats_handler = nullptr;
if (subscription_topic_statistics) {
stats_handler =
[subscription_topic_statistics](
const rmw_message_info_t & info, const rclcpp::Time & time)
{
subscription_topic_statistics->handle_message(info, time);
};
}

// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto context = node_base->get_context();
subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
Expand All @@ -172,7 +184,8 @@ class Subscription : public SubscriptionBase
context,
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
qos_profile,
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback),
std::move(stats_handler));
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_init,
static_cast<const void *>(get_subscription_handle().get()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ class PublisherNode : public rclcpp::Node
public:
PublisherNode(
const std::string & name, const std::string & topic,
const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100})
: Node(name)
const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100},
bool use_intra_process_comms = false)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process_comms))
{
publisher_ = create_publisher<MessageT>(topic, 10);
publish_timer_ = this->create_wall_timer(
Expand Down Expand Up @@ -181,8 +182,9 @@ class SubscriberWithTopicStatistics : public rclcpp::Node
public:
SubscriberWithTopicStatistics(
const std::string & name, const std::string & topic,
std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod)
: Node(name)
std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod,
bool use_intra_process_comms = false)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(use_intra_process_comms))
{
// Manually enable topic statistics via options
auto options = rclcpp::SubscriptionOptions();
Expand All @@ -195,7 +197,7 @@ class SubscriberWithTopicStatistics : public rclcpp::Node
subscription_ = create_subscription<MessageT,
std::function<void(typename MessageT::UniquePtr)>>(
topic,
rclcpp::QoS(rclcpp::KeepAll()),
use_intra_process_comms ? rclcpp::QoS(10) : rclcpp::QoS(rclcpp::KeepAll()),
callback,
options);
}
Expand Down Expand Up @@ -421,3 +423,58 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window
}
}
}

/**
* Test topic statistics are collected when use_intra_process_comms is enabled.
* This validates a fix for ros2/rclcpp#2911 where IPC subscriptions never called the
* stat handler causing all statistics to report NaN values.
* Also verifies message_age is non-NaN, validating that source_timestamp is set correctly.
*/
TEST_F(TestSubscriptionTopicStatisticsFixture, test_stats_with_intra_process_comms)
{
auto empty_publisher = std::make_shared<PublisherNode<Empty>>(
kTestPubNodeName,
kTestSubStatsEmptyTopic,
std::chrono::milliseconds{100},
true);

auto statistics_listener = std::make_shared<rclcpp::topic_statistics::MetricsMessageSubscriber>(
"test_ipc_stats_listener",
"/statistics",
kNumExpectedMessages);

auto empty_subscriber = std::make_shared<SubscriberWithTopicStatistics<Empty>>(
kTestSubNodeName,
kTestSubStatsEmptyTopic,
defaultStatisticsPublishPeriod,
true);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(empty_publisher);
ex.add_node(statistics_listener);
ex.add_node(empty_subscriber);

ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout);

const auto received_messages = statistics_listener->GetReceivedMessages();
EXPECT_EQ(kNumExpectedMessages, received_messages.size());

uint64_t message_age_count{0};
uint64_t message_period_count{0};

for (const auto & msg : received_messages) {
if (msg.metrics_source == kMessageAgeSourceLabel) {
message_age_count++;
// Verify message_age stats are non-NaN to validates source_timestamp fix
for (const auto & stats_point : msg.statistics) {
EXPECT_FALSE(std::isnan(stats_point.data));
}
}
if (msg.metrics_source == kMessagePeriodSourceLabel) {
message_period_count++;
}
}

EXPECT_EQ(kNumExpectedMessageAgeMessages, message_age_count);
EXPECT_EQ(kNumExpectedMessagePeriodMessages, message_period_count);
}