Skip to content

Commit

Permalink
ensure removal of guard conditions of expired nodes from memory strat…
Browse files Browse the repository at this point in the history
…egy (#741)

* change memory strategy API from vector of nodes to list of nodes

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>

* store guard_condition of node in executor and ensure that it is removed from the memory strategy

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>

* add unit test

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
  • Loading branch information
dirk-thomas committed May 24, 2019
1 parent 131a11b commit 8553fbe
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 54 deletions.
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ class Executor
private:
RCLCPP_DISABLE_COPY(Executor)

std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
std::list<const rcl_guard_condition_t *> guard_conditions_;
};

} // namespace executor
Expand Down
30 changes: 15 additions & 15 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#ifndef RCLCPP__MEMORY_STRATEGY_HPP_
#define RCLCPP__MEMORY_STRATEGY_HPP_

#include <list>
#include <memory>
#include <vector>

#include "rcl/allocator.h"
#include "rcl/wait.h"
Expand All @@ -42,11 +42,11 @@ class RCLCPP_PUBLIC MemoryStrategy
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy)
using WeakNodeVector = std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>;
using WeakNodeList = std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>;

virtual ~MemoryStrategy() = default;

virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0;
virtual bool collect_entities(const WeakNodeList & weak_nodes) = 0;

virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0;
Expand All @@ -67,65 +67,65 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual void
get_next_subscription(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual void
get_next_service(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual void
get_next_client(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual void
get_next_waitable(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual rcl_allocator_t
get_allocator() = 0;

static rclcpp::SubscriptionBase::SharedPtr
get_subscription_by_handle(
std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::ServiceBase::SharedPtr
get_service_by_handle(
std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::ClientBase::SharedPtr
get_client_by_handle(
std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr group,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_service(
rclcpp::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_client(
rclcpp::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_waitable(
rclcpp::Waitable::SharedPtr waitable,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);
};

} // namespace memory_strategy
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
);
}

bool collect_entities(const WeakNodeVector & weak_nodes)
bool collect_entities(const WeakNodeList & weak_nodes)
{
bool has_invalid_weak_nodes = false;
for (auto & weak_node : weak_nodes) {
Expand Down Expand Up @@ -265,7 +265,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
virtual void
get_next_subscription(
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) {
Expand Down Expand Up @@ -309,7 +309,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
virtual void
get_next_service(
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
auto it = service_handles_.begin();
while (it != service_handles_.end()) {
Expand Down Expand Up @@ -342,7 +342,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

virtual void
get_next_client(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes)
get_next_client(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes)
{
auto it = client_handles_.begin();
while (it != client_handles_.end()) {
Expand Down Expand Up @@ -375,7 +375,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

virtual void
get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes)
get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes)
{
auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) {
Expand Down
50 changes: 31 additions & 19 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ Executor::~Executor()
}
}
weak_nodes_.clear();
for (auto & guard_condition : guard_conditions_) {
memory_strategy_->remove_guard_condition(guard_condition);
}
guard_conditions_.clear();

// Finalize the wait set.
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
Expand Down Expand Up @@ -128,6 +132,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
}
}
weak_nodes_.push_back(node_ptr);
guard_conditions_.push_back(node_ptr->get_notify_guard_condition());
if (notify) {
// Interrupt waiting to handle new node
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
Expand All @@ -148,17 +153,21 @@ void
Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
bool node_removed = false;
weak_nodes_.erase(
std::remove_if(
weak_nodes_.begin(), weak_nodes_.end(),
[&](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr & i)
{
bool matched = (i.lock() == node_ptr);
node_removed |= matched;
return matched;
{
auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) {
bool matched = (node_it->lock() == node_ptr);
if (matched) {
node_it = weak_nodes_.erase(node_it);
gc_it = guard_conditions_.erase(gc_it);
node_removed = true;
} else {
++node_it;
++gc_it;
}
)
);
}
}
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
if (notify) {
Expand Down Expand Up @@ -420,15 +429,18 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)

// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) {
weak_nodes_.erase(
remove_if(
weak_nodes_.begin(), weak_nodes_.end(),
[](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr i)
{
return i.expired();
}
)
);
auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) {
if (node_it->expired()) {
node_it = weak_nodes_.erase(node_it);
memory_strategy_->remove_guard_condition(*gc_it);
gc_it = guard_conditions_.erase(gc_it);
} else {
++node_it;
++gc_it;
}
}
}
// clear wait set
if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) {
Expand Down
16 changes: 8 additions & 8 deletions rclcpp/src/rclcpp/memory_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using rclcpp::memory_strategy::MemoryStrategy;
rclcpp::SubscriptionBase::SharedPtr
MemoryStrategy::get_subscription_by_handle(
std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand Down Expand Up @@ -51,7 +51,7 @@ MemoryStrategy::get_subscription_by_handle(
rclcpp::ServiceBase::SharedPtr
MemoryStrategy::get_service_by_handle(
std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -77,7 +77,7 @@ MemoryStrategy::get_service_by_handle(
rclcpp::ClientBase::SharedPtr
MemoryStrategy::get_client_by_handle(
std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -103,7 +103,7 @@ MemoryStrategy::get_client_by_handle(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
MemoryStrategy::get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr group,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
if (!group) {
return nullptr;
Expand All @@ -126,7 +126,7 @@ MemoryStrategy::get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -152,7 +152,7 @@ MemoryStrategy::get_group_by_subscription(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_service(
rclcpp::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -178,7 +178,7 @@ MemoryStrategy::get_group_by_service(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_client(
rclcpp::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -204,7 +204,7 @@ MemoryStrategy::get_group_by_client(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_waitable(
rclcpp::Waitable::SharedPtr waitable,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/test/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ TEST_F(TestExecutors, detachOnDestruction) {
EXPECT_NO_THROW(executor.add_node(node));
}
}

// Make sure that the executor can automatically remove expired nodes correctly
TEST_F(TestExecutors, addTemporaryNode) {
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(std::make_shared<rclcpp::Node>("temporary_node"));
EXPECT_NO_THROW(executor.spin_some());
}
12 changes: 6 additions & 6 deletions rclcpp/test/test_find_weak_nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ TEST_F(TestFindWeakNodes, allocator_strategy_with_weak_nodes) {
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
auto existing_node = rclcpp::Node::make_shared("existing_node");
auto dead_node = rclcpp::Node::make_shared("dead_node");
rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes;
rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes;
weak_nodes.push_back(existing_node->get_node_base_interface());
weak_nodes.push_back(dead_node->get_node_base_interface());

// AND
// Delete dead_node, creating a dangling pointer in weak_nodes
dead_node.reset();
ASSERT_FALSE(weak_nodes[0].expired());
ASSERT_TRUE(weak_nodes[1].expired());
ASSERT_FALSE(weak_nodes.front().expired());
ASSERT_TRUE(weak_nodes.back().expired());

// WHEN
bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes);
Expand All @@ -64,11 +64,11 @@ TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) {
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
auto existing_node1 = rclcpp::Node::make_shared("existing_node1");
auto existing_node2 = rclcpp::Node::make_shared("existing_node2");
rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes;
rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes;
weak_nodes.push_back(existing_node1->get_node_base_interface());
weak_nodes.push_back(existing_node2->get_node_base_interface());
ASSERT_FALSE(weak_nodes[0].expired());
ASSERT_FALSE(weak_nodes[1].expired());
ASSERT_FALSE(weak_nodes.front().expired());
ASSERT_FALSE(weak_nodes.back().expired());

// WHEN
bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes);
Expand Down

0 comments on commit 8553fbe

Please sign in to comment.