Skip to content

Commit

Permalink
Allow GenericPublisher / GenericSubscription to take a QoS profile (r…
Browse files Browse the repository at this point in the history
…e-do #337) (#340)

* Add QoS to the GenericSubscription / GenericPublisher APIs

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp committed Mar 31, 2020
1 parent 28a6aa8 commit d15dd97
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 25 deletions.
17 changes: 12 additions & 5 deletions rosbag2_transport/CMakeLists.txt
Expand Up @@ -52,6 +52,7 @@ ament_target_dependencies(${PROJECT_NAME}
rosbag2_compression
rosbag2_cpp
shared_queues_vendor
yaml_cpp_vendor
)

include(cmake/configure_python.cmake)
Expand Down Expand Up @@ -126,22 +127,25 @@ if(BUILD_TESTING)
endif()

ament_add_gmock(test_rosbag2_node
test/rosbag2_transport/test_rosbag2_node.cpp
src/rosbag2_transport/generic_publisher.cpp
src/rosbag2_transport/generic_subscription.cpp
src/rosbag2_transport/qos.cpp
src/rosbag2_transport/rosbag2_node.cpp
test/rosbag2_transport/test_rosbag2_node.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET test_rosbag2_node)
target_include_directories(test_rosbag2_node
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
$<INSTALL_INTERFACE:include>)
ament_target_dependencies(test_rosbag2_node
ament_index_cpp
rosbag2_cpp
rclcpp
rosbag2_cpp
rosbag2_test_common
test_msgs
rosbag2_test_common)
yaml_cpp_vendor)
endif()

ament_add_gmock(test_formatter
Expand Down Expand Up @@ -179,15 +183,18 @@ if(BUILD_TESTING)
endif()

ament_add_gmock(test_qos
src/rosbag2_transport/qos.cpp
test/rosbag2_transport/test_qos.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
${SKIP_TEST})
if(TARGET test_qos)
target_link_libraries(test_qos rosbag2_transport)
target_include_directories(test_qos
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>)
ament_target_dependencies(test_qos rosbag2_test_common)
ament_target_dependencies(test_qos
rosbag2_test_common
yaml_cpp_vendor)
endif()

endif()
Expand Down
17 changes: 14 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/generic_publisher.cpp
Expand Up @@ -17,14 +17,25 @@
#include <memory>
#include <string>

namespace
{
rcl_publisher_options_t rosbag2_get_publisher_options(const rclcpp::QoS & qos)
{
auto options = rcl_publisher_get_default_options();
options.qos = qos.get_rmw_qos_profile();
return options;
}
} // unnamed namespace

namespace rosbag2_transport
{

GenericPublisher::GenericPublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support)
: rclcpp::PublisherBase(node_base, topic, type_support, rcl_publisher_get_default_options())
const rosidl_message_type_support_t & type_support,
const std::string & topic_name,
const rclcpp::QoS & qos)
: rclcpp::PublisherBase(node_base, topic_name, type_support, rosbag2_get_publisher_options(qos))
{}

void GenericPublisher::publish(std::shared_ptr<rmw_serialized_message_t> message)
Expand Down
5 changes: 3 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/generic_publisher.hpp
Expand Up @@ -28,8 +28,9 @@ class GenericPublisher : public rclcpp::PublisherBase
public:
GenericPublisher(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rosidl_message_type_support_t & type_support);
const rosidl_message_type_support_t & type_support,
const std::string & topic_name,
const rclcpp::QoS & qos);

virtual ~GenericPublisher() = default;

Expand Down
13 changes: 12 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/generic_subscription.cpp
Expand Up @@ -22,19 +22,30 @@

#include "rosbag2_transport/logging.hpp"

namespace
{
rcl_subscription_options_t rosbag2_get_subscription_options(const rclcpp::QoS & qos)
{
auto options = rcl_subscription_get_default_options();
options.qos = qos.get_rmw_qos_profile();
return options;
}
} // unnamed namespace

namespace rosbag2_transport
{

GenericSubscription::GenericSubscription(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & ts,
const std::string & topic_name,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback)
: SubscriptionBase(
node_base,
ts,
topic_name,
rcl_subscription_get_default_options(),
rosbag2_get_subscription_options(qos),
true),
default_allocator_(rcutils_get_default_allocator()),
callback_(callback)
Expand Down
Expand Up @@ -49,6 +49,7 @@ class GenericSubscription : public rclcpp::SubscriptionBase
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const rosidl_message_type_support_t & ts,
const std::string & topic_name,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback);

// Same as create_serialized_message() as the subscription is to serialized_messages only
Expand Down
4 changes: 3 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/player.cpp
Expand Up @@ -32,6 +32,7 @@

#include "rosbag2_transport/logging.hpp"

#include "qos.hpp"
#include "rosbag2_node.hpp"
#include "replayable_message.hpp"

Expand Down Expand Up @@ -158,7 +159,8 @@ void Player::prepare_publishers()
for (const auto & topic : topics) {
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(topic.name, topic.type)));
topic.name, rosbag2_transport_->create_generic_publisher(
topic.name, topic.type, Rosbag2QoS{})));
}
}

Expand Down
7 changes: 1 addition & 6 deletions rosbag2_transport/src/rosbag2_transport/qos.hpp
Expand Up @@ -30,16 +30,14 @@
# pragma warning(pop)
#endif

#include "rosbag2_transport/visibility_control.hpp"

namespace rosbag2_transport
{
/// Simple wrapper around rclcpp::QoS to provide a default constructor for YAML deserialization.
class Rosbag2QoS : public rclcpp::QoS
{
public:
Rosbag2QoS()
: rclcpp::QoS(0) {} // 0 history depth is always overwritten on deserializing
: rclcpp::QoS(rmw_qos_profile_default.depth) {}
explicit Rosbag2QoS(const rclcpp::QoS & value)
: rclcpp::QoS(value) {}
};
Expand All @@ -57,10 +55,7 @@ struct convert<rmw_time_t>
template<>
struct convert<rosbag2_transport::Rosbag2QoS>
{
ROSBAG2_TRANSPORT_PUBLIC
static Node encode(const rosbag2_transport::Rosbag2QoS & qos);

ROSBAG2_TRANSPORT_PUBLIC
static bool decode(const Node & node, rosbag2_transport::Rosbag2QoS & qos);
};
} // namespace YAML
Expand Down
1 change: 1 addition & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Expand Up @@ -173,6 +173,7 @@ Recorder::create_subscription(
auto subscription = node_->create_generic_subscription(
topic_name,
topic_type,
Rosbag2QoS{},
[this, topic_name](std::shared_ptr<rmw_serialized_message_t> message) {
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = message;
Expand Down
8 changes: 5 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp
Expand Up @@ -38,26 +38,28 @@ Rosbag2Node::Rosbag2Node(const std::string & node_name)
{}

std::shared_ptr<GenericPublisher> Rosbag2Node::create_generic_publisher(
const std::string & topic, const std::string & type)
const std::string & topic, const std::string & type, const rclcpp::QoS & qos)
{
auto type_support = rosbag2_cpp::get_typesupport(type, "rosidl_typesupport_cpp");
return std::make_shared<GenericPublisher>(get_node_base_interface().get(), topic, *type_support);
return std::make_shared<GenericPublisher>(
get_node_base_interface().get(), *type_support, topic, qos);
}

std::shared_ptr<GenericSubscription> Rosbag2Node::create_generic_subscription(
const std::string & topic,
const std::string & type,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback)
{
auto type_support = rosbag2_cpp::get_typesupport(type, "rosidl_typesupport_cpp");

auto subscription = std::shared_ptr<GenericSubscription>();

try {
subscription = std::make_shared<GenericSubscription>(
get_node_base_interface().get(),
*type_support,
topic,
qos,
callback);

get_node_topics_interface()->add_subscription(subscription, nullptr);
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/rosbag2_node.hpp
Expand Up @@ -38,12 +38,13 @@ class Rosbag2Node : public rclcpp::Node

std::shared_ptr<GenericPublisher>
create_generic_publisher(
const std::string & topic, const std::string & type);
const std::string & topic, const std::string & type, const rclcpp::QoS & qos);

std::shared_ptr<GenericSubscription>
create_generic_subscription(
const std::string & topic,
const std::string & type,
const rclcpp::QoS & qos,
std::function<void(std::shared_ptr<rmw_serialized_message_t>)> callback);

std::unordered_map<std::string, std::string>
Expand Down
62 changes: 59 additions & 3 deletions rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp
Expand Up @@ -26,7 +26,8 @@
#include "test_msgs/message_fixtures.hpp"
#include "test_msgs/msg/basic_types.hpp"

#include "../../src/rosbag2_transport/rosbag2_node.hpp"
#include "qos.hpp"
#include "rosbag2_node.hpp"

using namespace ::testing; // NOLINT
using namespace rosbag2_test_common; // NOLINT
Expand Down Expand Up @@ -64,7 +65,7 @@ class RosBag2NodeFixture : public Test
std::vector<std::string> messages;
size_t counter = 0;
auto subscription = node_->create_generic_subscription(
topic_name, type,
topic_name, type, rosbag2_transport::Rosbag2QoS{},
[this, &counter, &messages](std::shared_ptr<rmw_serialized_message_t> message) {
auto string_message =
memory_management_.deserialize_message<test_msgs::msg::Strings>(message);
Expand All @@ -91,6 +92,20 @@ class RosBag2NodeFixture : public Test
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

template<typename Condition, typename Duration>
bool wait_for(const Condition & condition, const Duration & timeout)
{
using clock = std::chrono::system_clock;
auto start = clock::now();
while (!condition()) {
if ((clock::now() - start) > timeout) {
return false;
}
rclcpp::spin_some(node_);
}
return true;
}

MemoryManagement memory_management_;
std::shared_ptr<rosbag2_transport::Rosbag2Node> node_;
rclcpp::Node::SharedPtr publisher_node_;
Expand All @@ -105,7 +120,8 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work)
std::string topic_name = "string_topic";
std::string type = "test_msgs/Strings";

auto publisher = node_->create_generic_publisher(topic_name, type);
auto publisher = node_->create_generic_publisher(
topic_name, type, rosbag2_transport::Rosbag2QoS{});

auto subscriber_future_ = std::async(
std::launch::async, [this, topic_name, type] {
Expand All @@ -125,6 +141,46 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work)
EXPECT_THAT(subscribed_messages[0], StrEq("Hello World"));
}

TEST_F(RosBag2NodeFixture, generic_subscription_uses_qos)
{
// If the GenericSubscription does not use the provided QoS profile,
// its request will be incompatible with the Publisher's offer and no messages will be passed.
using namespace std::chrono_literals;
std::string topic_name = "string_topic";
std::string topic_type = "test_msgs/Strings";
rclcpp::QoS qos = rclcpp::SensorDataQoS();

auto publisher = node_->create_publisher<test_msgs::msg::Strings>(topic_name, qos);
auto subscription = node_->create_generic_subscription(
topic_name, topic_type, qos,
[](std::shared_ptr<rmw_serialized_message_t>/* message */) {});
auto connected = [publisher, subscription]() -> bool {
return publisher->get_subscription_count() && subscription->get_publisher_count();
};
// It normally takes < 20ms, 5s chosen as "a very long time"
ASSERT_TRUE(wait_for(connected, 5s));
}

TEST_F(RosBag2NodeFixture, generic_publisher_uses_qos)
{
// If the GenericPublisher does not use the provided QoS profile,
// its offer will be incompatible with the Subscription's request and no messages will be passed.
using namespace std::chrono_literals;
std::string topic_name = "string_topic";
std::string topic_type = "test_msgs/Strings";
rclcpp::QoS qos = rosbag2_transport::Rosbag2QoS().transient_local();

auto publisher = node_->create_generic_publisher(topic_name, topic_type, qos);
auto subscription = node_->create_subscription<test_msgs::msg::Strings>(
topic_name, qos,
[](std::shared_ptr<test_msgs::msg::Strings>/* message */) {});
auto connected = [publisher, subscription]() -> bool {
return publisher->get_subscription_count() && subscription->get_publisher_count();
};
// It normally takes < 20ms, 5s chosen as "a very long time"
ASSERT_TRUE(wait_for(connected, 5s));
}

TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_empty_if_topic_does_not_exist) {
create_publisher("string_topic");

Expand Down

0 comments on commit d15dd97

Please sign in to comment.