Skip to content

Commit

Permalink
Create a default warning for qos incompatibility (#1051)
Browse files Browse the repository at this point in the history
Signed-off-by: Miaofei <miaofei@amazon.com>
  • Loading branch information
mm318 committed Apr 14, 2020
1 parent aaf8b38 commit 44fa4fe
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 8 deletions.
15 changes: 15 additions & 0 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ class Publisher : public PublisherBase
this->add_event_handler(
options_.event_callbacks.incompatible_qos_callback,
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSOfferedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
"This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS "
"events, you will not be notified when Publishers offer an incompatible "
"QoS profile to Subscriptions on the same topic.");
}
}
// Setup continues in the post construction method, post_init_setup().
}
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
event_handlers_.emplace_back(handler);
}

RCLCPP_PUBLIC
void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & info) const;

std::shared_ptr<rcl_node_t> rcl_node_handle_;

rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/publisher_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ struct PublisherOptionsBase
/// Callbacks for various events related to publishers.
PublisherEventCallbacks event_callbacks;

/// Whether or not to use default callbacks when user doesn't supply any in event_callbacks
bool use_default_callbacks = true;

/// Callback group in which the waitable items from the publisher should be placed.
std::shared_ptr<rclcpp::callback_group::CallbackGroup> callback_group;

Expand Down
8 changes: 6 additions & 2 deletions rclcpp/include/rclcpp/qos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
#ifndef RCLCPP__QOS_HPP_
#define RCLCPP__QOS_HPP_

#include <rmw/qos_profiles.h>
#include <rmw/types.h>
#include <string>

#include "rclcpp/duration.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rmw/incompatible_qos_events_statuses.h"
#include "rmw/qos_profiles.h"
#include "rmw/types.h"

namespace rclcpp
{

std::string qos_policy_name_from_kind(rmw_qos_policy_kind_t policy_kind);

/// QoS initialization values, cannot be created directly, use KeepAll or KeepLast instead.
struct RCLCPP_PUBLIC QoSInitialization
{
Expand Down
15 changes: 15 additions & 0 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,21 @@ class Subscription : public SubscriptionBase
this->add_event_handler(
options.event_callbacks.incompatible_qos_callback,
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
} else if (options_.use_default_callbacks) {
// Register default callback when not specified
try {
this->add_event_handler(
[this](QOSRequestedIncompatibleQoSInfo & info) {
this->default_incompatible_qos_callback(info);
},
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
} catch (UnsupportedEventTypeException & /*exc*/) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger(rcl_node_get_logger_name(node_handle_.get())),
"This rmw implementation does not support ON_REQUESTED_INCOMPATIBLE_QOS "
"events, you will not be notified when Subscriptions request an incompatible "
"QoS profile from Publishers on the same topic.");
}
}

// Setup intra process publishing if requested.
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
event_handlers_.emplace_back(handler);
}

RCLCPP_PUBLIC
void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & info) const;

RCLCPP_PUBLIC
bool
matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const;
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/subscription_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ struct SubscriptionOptionsBase
/// Callbacks for events related to this subscription.
SubscriptionEventCallbacks event_callbacks;

/// Whether or not to use default callbacks when user doesn't supply any in event_callbacks
bool use_default_callbacks = true;

/// True to ignore local publications.
bool ignore_local_publications = false;

Expand Down
11 changes: 11 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,14 @@ PublisherBase::setup_intra_process(
weak_ipm_ = ipm;
intra_process_is_enabled_ = true;
}

void
PublisherBase::default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo & event) const
{
std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
RCLCPP_WARN(
rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())),
"New subscription discovered on this topic, requesting incompatible QoS. "
"No messages will be sent to it. "
"Last incompatible policy: %s", policy_name.c_str());
}
24 changes: 23 additions & 1 deletion rclcpp/src/rclcpp/qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,33 @@

#include "rclcpp/qos.hpp"

#include <rmw/types.h>
#include <string>

#include "rmw/types.h"

namespace rclcpp
{

std::string qos_policy_name_from_kind(rmw_qos_policy_kind_t policy_kind)
{
switch (policy_kind) {
case RMW_QOS_POLICY_DURABILITY:
return "DURABILITY_QOS_POLICY";
case RMW_QOS_POLICY_DEADLINE:
return "DEADLINE_QOS_POLICY";
case RMW_QOS_POLICY_LIVELINESS:
return "LIVELINESS_QOS_POLICY";
case RMW_QOS_POLICY_RELIABILITY:
return "RELIABILITY_QOS_POLICY";
case RMW_QOS_POLICY_HISTORY:
return "HISTORY_QOS_POLICY";
case RMW_QOS_POLICY_LIFESPAN:
return "LIFESPAN_QOS_POLICY";
default:
return "INVALID_QOS_POLICY";
}
}

QoSInitialization::QoSInitialization(rmw_qos_history_policy_t history_policy_arg, size_t depth_arg)
: history_policy(history_policy_arg), depth(depth_arg)
{}
Expand Down
11 changes: 11 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ SubscriptionBase::get_intra_process_waitable() const
return ipm->get_subscription_intra_process(intra_process_subscription_id_);
}

void
SubscriptionBase::default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo & event) const
{
std::string policy_name = qos_policy_name_from_kind(event.last_policy_kind);
RCLCPP_WARN(
rclcpp::get_logger(rcl_node_get_logger_name(node_handle_.get())),
"New publisher discovered on this topic, offering incompatible QoS. "
"No messages will be sent to it. "
"Last incompatible policy: %s", policy_name.c_str());
}

bool
SubscriptionBase::matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const
{
Expand Down
82 changes: 77 additions & 5 deletions rclcpp/test/test_qos_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@

#include <gtest/gtest.h>

#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "rcutils/logging.h"
#include "rmw/rmw.h"
#include "test_msgs/msg/empty.hpp"

Expand All @@ -31,9 +35,14 @@ class TestQosEvent : public ::testing::Test

void SetUp()
{
node = std::make_shared<rclcpp::Node>("test_qos_event", "/ns");
is_fastrtps =
std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") != std::string::npos;

node = std::make_shared<rclcpp::Node>("test_qos_event", "/ns");

message_callback = [node = node.get()](const test_msgs::msg::Empty::SharedPtr /*msg*/) {
RCLCPP_INFO(node->get_logger(), "Message received");
};
}

void TearDown()
Expand All @@ -44,6 +53,7 @@ class TestQosEvent : public ::testing::Test
static constexpr char topic_name[] = "test_topic";
rclcpp::Node::SharedPtr node;
bool is_fastrtps;
std::function<void(const test_msgs::msg::Empty::SharedPtr)> message_callback;
};

constexpr char TestQosEvent::topic_name[];
Expand Down Expand Up @@ -104,10 +114,6 @@ TEST_F(TestQosEvent, test_subscription_constructor)
{
rclcpp::SubscriptionOptions options;

auto message_callback = [node = node.get()](const test_msgs::msg::Empty::SharedPtr /*msg*/) {
RCLCPP_INFO(node->get_logger(), "Message received");
};

// options arg with no callbacks
auto subscription = node->create_subscription<test_msgs::msg::Empty>(
topic_name, 10, message_callback, options);
Expand Down Expand Up @@ -150,3 +156,69 @@ TEST_F(TestQosEvent, test_subscription_constructor)
EXPECT_TRUE(is_fastrtps);
}
}

/*
Testing construction of a subscriptions with QoS event callback functions.
*/
std::string * g_pub_log_msg;
std::string * g_sub_log_msg;
std::promise<void> * g_log_msgs_promise;
TEST_F(TestQosEvent, test_default_incompatible_qos_callbacks)
{
rcutils_logging_output_handler_t original_output_handler = rcutils_logging_get_output_handler();

std::string pub_log_msg;
std::string sub_log_msg;
std::promise<void> log_msgs_promise;
g_pub_log_msg = &pub_log_msg;
g_sub_log_msg = &sub_log_msg;
g_log_msgs_promise = &log_msgs_promise;
auto logger_callback = [](
const rcutils_log_location_t * /*location*/,
int /*level*/, const char * /*name*/, rcutils_time_point_value_t /*timestamp*/,
const char * format, va_list * args) -> void {
char buffer[1024];
vsnprintf(buffer, sizeof(buffer), format, *args);
const std::string msg = buffer;
if (msg.rfind("New subscription discovered", 0) == 0) {
*g_pub_log_msg = buffer;
} else if (msg.rfind("New publisher discovered", 0) == 0) {
*g_sub_log_msg = buffer;
}

if (!g_pub_log_msg->empty() && !g_sub_log_msg->empty()) {
g_log_msgs_promise->set_value();
}
};
rcutils_logging_set_output_handler(logger_callback);

std::shared_future<void> log_msgs_future = log_msgs_promise.get_future();

rclcpp::QoS qos_profile_publisher(10);
qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_VOLATILE);
auto publisher = node->create_publisher<test_msgs::msg::Empty>(
topic_name, qos_profile_publisher);

rclcpp::QoS qos_profile_subscription(10);
qos_profile_subscription.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL);
auto subscription = node->create_subscription<test_msgs::msg::Empty>(
topic_name, qos_profile_subscription, message_callback);

rclcpp::executors::SingleThreadedExecutor ex;
ex.add_node(node->get_node_base_interface());

ex.spin_until_future_complete(log_msgs_future, std::chrono::seconds(10));

if (!is_fastrtps) {
EXPECT_EQ(
"New subscription discovered on this topic, requesting incompatible QoS. "
"No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY",
pub_log_msg);
EXPECT_EQ(
"New publisher discovered on this topic, offering incompatible QoS. "
"No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY",
sub_log_msg);
}

rcutils_logging_set_output_handler(original_output_handler);
}

0 comments on commit 44fa4fe

Please sign in to comment.