Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanpauno committed Feb 13, 2019
1 parent 5b8bae0 commit 3d92375
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 37 deletions.
8 changes: 4 additions & 4 deletions rclcpp/include/rclcpp/intra_process_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class IntraProcessManagerImplBase
matches_any_publishers(const rmw_gid_t * id) const = 0;

virtual size_t
get_subscription_count(uint64_t intra_process_publisher_id_) const = 0;
get_subscription_count(uint64_t intra_process_publisher_id) const = 0;

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase)
Expand Down Expand Up @@ -252,9 +252,9 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase
}

size_t
get_subscription_count(uint64_t id) const
get_subscription_count(uint64_t intra_process_publisher_id) const
{
auto publisher_it = publishers_.find(id);
auto publisher_it = publishers_.find(intra_process_publisher_id);
if (publisher_it == publishers_.end()) {
// Publisher is either invalid or no longer exists.
return 0;
Expand All @@ -265,7 +265,7 @@ class IntraProcessManagerImpl : public IntraProcessManagerImplBase
}
auto sub_map_it = subscription_ids_by_topic_.find(publisher->get_topic_name());
if (sub_map_it == subscription_ids_by_topic_.end()) {
// No intraprocess subscriberes
// No intraprocess subscribers
return 0;
}
return sub_map_it->second.size();
Expand Down
14 changes: 6 additions & 8 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,16 @@ class Publisher : public PublisherBase
virtual void
publish(std::unique_ptr<MessageT, MessageDeleter> & msg)
{
size_t inter_process_subscritpion_count;
size_t intra_process_subscritpion_count = 0;

inter_process_subscritpion_count = this->get_subscription_count();
size_t inter_process_subscription_count = this->get_subscription_count();
size_t intra_process_subscription_count = 0;

if (get_intra_process_subscription_count_) {
intra_process_subscritpion_count = \
intra_process_subscription_count =
get_intra_process_subscription_count_(intra_process_publisher_id_);
}

if (!get_intra_process_subscription_count_ ||
inter_process_subscritpion_count > intra_process_subscritpion_count)
inter_process_subscription_count > intra_process_subscription_count)
{
this->do_inter_process_publish(msg.get());
}
Expand All @@ -230,11 +228,11 @@ class Publisher : public PublisherBase
ipm.message_sequence = message_seq;
auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); /* next call will reset error message if not context */
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
/* publisher is invalid due to context being shutdown */
// publisher is invalid due to context being shutdown
return;
}
}
Expand Down
33 changes: 33 additions & 0 deletions rclcpp/include/rclcpp/publisher_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ struct PublisherFactory
rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm)>;

SharedPublishCallbackFactoryFunction create_shared_publish_callback;

using GetIntraProcessSubscriberCountCallbackT = std::function<
rclcpp::PublisherBase::GetIntraProcessSubscriberCountCallbackT(
rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm)>;

GetIntraProcessSubscriberCountCallbackT create_intra_process_subscription_count_callback;
};

/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>.
Expand Down Expand Up @@ -139,6 +145,33 @@ create_publisher_factory(std::shared_ptr<Alloc> allocator)
return shared_publish_callback;
};

// function to create a intraprocess subscriber count callback std::function
using GetIntraProcessSubscriberCountCallbackT =
rclcpp::PublisherBase::GetIntraProcessSubscriberCountCallbackT;
factory.create_intra_process_subscription_count_callback =
[](rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm)
-> GetIntraProcessSubscriberCountCallbackT
{
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;

// this function is called on each call to publish(), when checking if
// interprocess publishing is needed or not.
auto shared_publish_callback =
[weak_ipm](uint64_t publisher_id) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(ivanpauno): same as wjwwood comment above in create_shared_publish_callback
throw std::runtime_error(
"intra process subscriber count called after "
"destruction of intra process manager");
}
return ipm->get_subscription_count(publisher_id);
};

return shared_publish_callback;
};

// return the factory now that it is populated
return factory;
}
Expand Down
5 changes: 1 addition & 4 deletions rclcpp/src/rclcpp/node_interfaces/node_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ NodeTopics::create_publisher(
publisher_factory.add_publisher_to_intra_process_manager(ipm.get(), publisher);
// Create a function to be called when publisher to do the intra process publish.
auto shared_publish_callback = publisher_factory.create_shared_publish_callback(ipm);
auto count_callback = std::bind(
&rclcpp::intra_process_manager::IntraProcessManager::get_subscription_count,
ipm.get(),
std::placeholders::_1);
auto count_callback = publisher_factory.create_intra_process_subscription_count_callback(ipm);
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/src/rclcpp/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ PublisherBase::get_publisher_handle() const
size_t
PublisherBase::get_subscription_count() const
{
size_t inter_process_subscritpion_count = 0;
size_t inter_process_subscription_count = 0;

auto status = rcl_publisher_get_subscription_count(
&publisher_handle_,
&inter_process_subscritpion_count);
&inter_process_subscription_count);

if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); /* next call will reset error message if not context */
Expand All @@ -161,7 +161,7 @@ PublisherBase::get_subscription_count() const
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to get get subscription count");
}
return inter_process_subscritpion_count;
return inter_process_subscription_count;
}

bool
Expand Down
100 changes: 82 additions & 18 deletions rclcpp/test/test_publisher_subscriber_count_api.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 Open Source Robotics Foundation, Inc.
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,7 +35,9 @@ class MyPublisher : public rclcpp::PublisherBase
}
};

class TestPublisher : public ::testing::Test
using rcl_interfaces::msg::IntraProcessMessage;

class TestPublisherSubscriberCount : public ::testing::Test
{
protected:
static void SetUpTestCase()
Expand All @@ -49,41 +51,103 @@ class TestPublisher : public ::testing::Test
"my_node",
"/ns",
rclcpp::NodeOptions().use_intra_process_comms(true));
publisher = node->create_publisher<IntraProcessMessage>("/topic");
}

void TearDown()
{
node.reset();
}

void test_common(rclcpp::NodeOptions options, const uint64_t intraprocess_count_results[2]);

rclcpp::Node::SharedPtr node;
std::shared_ptr<rclcpp::PublisherBase> publisher;
static std::chrono::milliseconds offset;
};

std::chrono::milliseconds TestPublisherSubscriberCount::offset = std::chrono::milliseconds(500);

void OnMessage(const rcl_interfaces::msg::IntraProcessMessage::SharedPtr msg)
{
(void)msg;
}

/*
Testing publisher subscriber count api, and internal process subscribers.
*/
TEST_F(TestPublisher, construction_and_destruction) {
using rcl_interfaces::msg::IntraProcessMessage;

auto publisher = node->create_publisher<IntraProcessMessage>("/topic");
void TestPublisherSubscriberCount::test_common(
rclcpp::NodeOptions options,
const uint64_t intraprocess_count_results[2])
{
EXPECT_EQ(publisher->get_subscription_count(), 0u);
EXPECT_EQ(
reinterpret_cast<MyPublisher *>(publisher.get())->get_intraprocess_subscription_count(), 0u);
{
auto sub = node->create_subscription<IntraProcessMessage>("/topic", &OnMessage);
rclcpp::sleep_for(offset);
EXPECT_EQ(publisher->get_subscription_count(), 1u);
EXPECT_EQ(
reinterpret_cast<MyPublisher *>(publisher.get())->get_intraprocess_subscription_count(),
intraprocess_count_results[0]);
{
rclcpp::Node::SharedPtr another_node = std::make_shared<rclcpp::Node>(
"another_node",
"/ns",
options);
auto another_sub =
another_node->create_subscription<IntraProcessMessage>("/topic", &OnMessage);

auto callback = &OnMessage;
auto sub = node->create_subscription<IntraProcessMessage>("/topic", callback);
EXPECT_EQ(publisher->get_subscription_count(), 1u);
rclcpp::sleep_for(offset);
EXPECT_EQ(publisher->get_subscription_count(), 2u);
EXPECT_EQ(
reinterpret_cast<MyPublisher *>(publisher.get())->get_intraprocess_subscription_count(),
intraprocess_count_results[1]);
}
}
/**
* Counts should be zero here, as all are subscriptions are out of scope.
* Subscriptions count checking is always preceeded with an sleep, as random failures had been
* detected without it. */
rclcpp::sleep_for(offset);
EXPECT_EQ(publisher->get_subscription_count(), 0u);
EXPECT_EQ(
reinterpret_cast<MyPublisher *>(publisher.get())->get_intraprocess_subscription_count(), 1u);
reinterpret_cast<MyPublisher *>(publisher.get())->get_intraprocess_subscription_count(),
0u);
}

rclcpp::Node::SharedPtr another_node = std::make_shared<rclcpp::Node>("another_node", "/ns");
auto another_sub = another_node->create_subscription<IntraProcessMessage>("/topic", callback);
EXPECT_EQ(publisher->get_subscription_count(), 2u);
EXPECT_EQ(
reinterpret_cast<MyPublisher *>(publisher.get())->get_intraprocess_subscription_count(), 1u);
/*
Testing publisher subscriber count api and internal process subscribers.
Two subscribers in same using intra_process comm.
*/
TEST_F(TestPublisherSubscriberCount, test_two_intra_comm) {
const uint64_t results[2] = {1u, 2u};
test_common(rclcpp::NodeOptions().use_intra_process_comms(true), results);
}

/*
Testing publisher subscriber count api and internal process subscribers.
Two subscribers, one using intra_process comm and the other not using it.
*/
TEST_F(TestPublisherSubscriberCount, test_one_intra_comm_one_inter_comm) {
const uint64_t results[2] = {1u, 1u};
test_common(rclcpp::NodeOptions().use_intra_process_comms(false), results);
}

/*
Testing publisher subscriber count api and internal process subscribers, with two contexts.
*/
TEST_F(TestPublisherSubscriberCount, test_with_two_contexts_both_using_intra) {
const uint64_t results[2] = {1u, 1u};
auto context = rclcpp::Context::make_shared();
context->init(0, nullptr);
test_common(
rclcpp::NodeOptions().context(context).use_intra_process_comms(true),
results);
}

TEST_F(TestPublisherSubscriberCount, test_with_two_contexts_one_using_intra) {
const uint64_t results[2] = {1u, 1u};
auto context = rclcpp::Context::make_shared();
context->init(0, nullptr);
test_common(
rclcpp::NodeOptions().context(context).use_intra_process_comms(false),
results);
}

0 comments on commit 3d92375

Please sign in to comment.