Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure removal of guard conditions of expired nodes from memory strategy #741

Merged
merged 3 commits into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ class Executor
private:
RCLCPP_DISABLE_COPY(Executor)

std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
std::list<const rcl_guard_condition_t *> guard_conditions_;
};

} // namespace executor
Expand Down
30 changes: 15 additions & 15 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#ifndef RCLCPP__MEMORY_STRATEGY_HPP_
#define RCLCPP__MEMORY_STRATEGY_HPP_

#include <list>
#include <memory>
#include <vector>

#include "rcl/allocator.h"
#include "rcl/wait.h"
Expand All @@ -42,11 +42,11 @@ class RCLCPP_PUBLIC MemoryStrategy
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy)
using WeakNodeVector = std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>;
using WeakNodeList = std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>;

virtual ~MemoryStrategy() = default;

virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0;
virtual bool collect_entities(const WeakNodeList & weak_nodes) = 0;

virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0;
Expand All @@ -67,65 +67,65 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual void
get_next_subscription(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual void
get_next_service(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual void
get_next_client(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual void
get_next_waitable(
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;
const WeakNodeList & weak_nodes) = 0;

virtual rcl_allocator_t
get_allocator() = 0;

static rclcpp::SubscriptionBase::SharedPtr
get_subscription_by_handle(
std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::ServiceBase::SharedPtr
get_service_by_handle(
std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::ClientBase::SharedPtr
get_client_by_handle(
std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr group,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_service(
rclcpp::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_client(
rclcpp::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_waitable(
rclcpp::Waitable::SharedPtr waitable,
const WeakNodeVector & weak_nodes);
const WeakNodeList & weak_nodes);
};

} // namespace memory_strategy
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
);
}

bool collect_entities(const WeakNodeVector & weak_nodes)
bool collect_entities(const WeakNodeList & weak_nodes)
{
bool has_invalid_weak_nodes = false;
for (auto & weak_node : weak_nodes) {
Expand Down Expand Up @@ -265,7 +265,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
virtual void
get_next_subscription(
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) {
Expand Down Expand Up @@ -309,7 +309,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
virtual void
get_next_service(
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
auto it = service_handles_.begin();
while (it != service_handles_.end()) {
Expand Down Expand Up @@ -342,7 +342,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

virtual void
get_next_client(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes)
get_next_client(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes)
{
auto it = client_handles_.begin();
while (it != client_handles_.end()) {
Expand Down Expand Up @@ -375,7 +375,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

virtual void
get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes)
get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes)
{
auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) {
Expand Down
50 changes: 31 additions & 19 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ Executor::~Executor()
}
}
weak_nodes_.clear();
for (auto & guard_condition : guard_conditions_) {
memory_strategy_->remove_guard_condition(guard_condition);
}
guard_conditions_.clear();

// Finalize the wait set.
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
Expand Down Expand Up @@ -128,6 +132,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
}
}
weak_nodes_.push_back(node_ptr);
guard_conditions_.push_back(node_ptr->get_notify_guard_condition());
if (notify) {
// Interrupt waiting to handle new node
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
Expand All @@ -148,17 +153,21 @@ void
Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
bool node_removed = false;
weak_nodes_.erase(
std::remove_if(
weak_nodes_.begin(), weak_nodes_.end(),
[&](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr & i)
{
bool matched = (i.lock() == node_ptr);
node_removed |= matched;
return matched;
{
auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) {
bool matched = (node_it->lock() == node_ptr);
if (matched) {
node_it = weak_nodes_.erase(node_it);
gc_it = guard_conditions_.erase(gc_it);
node_removed = true;
} else {
++node_it;
++gc_it;
}
)
);
}
}
std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
if (notify) {
Expand Down Expand Up @@ -420,15 +429,18 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)

// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) {
weak_nodes_.erase(
remove_if(
weak_nodes_.begin(), weak_nodes_.end(),
[](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr i)
{
return i.expired();
}
)
);
auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) {
if (node_it->expired()) {
node_it = weak_nodes_.erase(node_it);
memory_strategy_->remove_guard_condition(*gc_it);
gc_it = guard_conditions_.erase(gc_it);
} else {
++node_it;
++gc_it;
}
}
}
// clear wait set
if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) {
Expand Down
16 changes: 8 additions & 8 deletions rclcpp/src/rclcpp/memory_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using rclcpp::memory_strategy::MemoryStrategy;
rclcpp::SubscriptionBase::SharedPtr
MemoryStrategy::get_subscription_by_handle(
std::shared_ptr<const rcl_subscription_t> subscriber_handle,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand Down Expand Up @@ -51,7 +51,7 @@ MemoryStrategy::get_subscription_by_handle(
rclcpp::ServiceBase::SharedPtr
MemoryStrategy::get_service_by_handle(
std::shared_ptr<const rcl_service_t> service_handle,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -77,7 +77,7 @@ MemoryStrategy::get_service_by_handle(
rclcpp::ClientBase::SharedPtr
MemoryStrategy::get_client_by_handle(
std::shared_ptr<const rcl_client_t> client_handle,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -103,7 +103,7 @@ MemoryStrategy::get_client_by_handle(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
MemoryStrategy::get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr group,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
if (!group) {
return nullptr;
Expand All @@ -126,7 +126,7 @@ MemoryStrategy::get_node_by_group(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_subscription(
rclcpp::SubscriptionBase::SharedPtr subscription,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -152,7 +152,7 @@ MemoryStrategy::get_group_by_subscription(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_service(
rclcpp::ServiceBase::SharedPtr service,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -178,7 +178,7 @@ MemoryStrategy::get_group_by_service(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_client(
rclcpp::ClientBase::SharedPtr client,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand All @@ -204,7 +204,7 @@ MemoryStrategy::get_group_by_client(
rclcpp::callback_group::CallbackGroup::SharedPtr
MemoryStrategy::get_group_by_waitable(
rclcpp::Waitable::SharedPtr waitable,
const WeakNodeVector & weak_nodes)
const WeakNodeList & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
Expand Down
12 changes: 6 additions & 6 deletions rclcpp/test/test_find_weak_nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ TEST_F(TestFindWeakNodes, allocator_strategy_with_weak_nodes) {
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
auto existing_node = rclcpp::Node::make_shared("existing_node");
auto dead_node = rclcpp::Node::make_shared("dead_node");
rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes;
rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes;
weak_nodes.push_back(existing_node->get_node_base_interface());
weak_nodes.push_back(dead_node->get_node_base_interface());

// AND
// Delete dead_node, creating a dangling pointer in weak_nodes
dead_node.reset();
ASSERT_FALSE(weak_nodes[0].expired());
ASSERT_TRUE(weak_nodes[1].expired());
ASSERT_FALSE(weak_nodes.front().expired());
ASSERT_TRUE(weak_nodes.back().expired());

// WHEN
bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes);
Expand All @@ -64,11 +64,11 @@ TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) {
rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
auto existing_node1 = rclcpp::Node::make_shared("existing_node1");
auto existing_node2 = rclcpp::Node::make_shared("existing_node2");
rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes;
rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes;
weak_nodes.push_back(existing_node1->get_node_base_interface());
weak_nodes.push_back(existing_node2->get_node_base_interface());
ASSERT_FALSE(weak_nodes[0].expired());
ASSERT_FALSE(weak_nodes[1].expired());
ASSERT_FALSE(weak_nodes.front().expired());
ASSERT_FALSE(weak_nodes.back().expired());

// WHEN
bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes);
Expand Down