Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ownership of shutdown_guard_condition to executors/graph_listener #1404

Merged
merged 6 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 0 additions & 66 deletions rclcpp/include/rclcpp/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,67 +248,6 @@ class Context : public std::enable_shared_from_this<Context>
void
interrupt_all_sleep_for();

/// Get a handle to the guard condition which is triggered when interrupted.
/**
* This guard condition is triggered any time interrupt_all_wait_sets() is
* called, which may be called by the user, or shutdown().
* And in turn, shutdown() may be called by the user, the destructor of this
* context, or the signal handler if installed and shutdown_on_sigint is true
* for this context.
*
* The first time that this function is called for a given wait set a new guard
* condition will be created and returned; thereafter the same guard condition
* will be returned for the same wait set.
* This mechanism is designed to ensure that the same guard condition is not
* reused across wait sets (e.g., when using multiple executors in the same
* process).
* This method will throw an exception if initialization of the guard
* condition fails.
*
* The returned guard condition needs to be released with the
* release_interrupt_guard_condition() method in order to reclaim resources.
*
* \param[in] wait_set Pointer to the rcl_wait_set_t that will be using the
* resulting guard condition.
* \return Pointer to the guard condition.
* \throws anything rclcpp::exceptions::throw_from_rcl_error can throw.
*/
RCLCPP_PUBLIC
rcl_guard_condition_t *
get_interrupt_guard_condition(rcl_wait_set_t * wait_set);

/// Release the previously allocated guard condition which is triggered when interrupted.
/**
* If you previously called get_interrupt_guard_condition() for a given wait
* set to get a interrupt guard condition, then you should call
* release_interrupt_guard_condition() when you're done, to free that
* condition.
* Will throw an exception if get_interrupt_guard_condition() wasn't
* previously called for the given wait set.
*
* After calling this, the pointer returned by get_interrupt_guard_condition()
* for the given wait_set is invalid.
*
* \param[in] wait_set Pointer to the rcl_wait_set_t that was using the
* resulting guard condition.
* \throws anything rclcpp::exceptions::throw_from_rcl_error can throw.
* \throws std::runtime_error if a nonexistent wait set is trying to release sigint guard condition.
*/
RCLCPP_PUBLIC
void
release_interrupt_guard_condition(rcl_wait_set_t * wait_set);

/// Nothrow version of release_interrupt_guard_condition(), logs to RCLCPP_ERROR instead.
RCLCPP_PUBLIC
void
release_interrupt_guard_condition(rcl_wait_set_t * wait_set, const std::nothrow_t &) noexcept;

/// Interrupt any blocking executors, or wait sets associated with this context.
RCLCPP_PUBLIC
virtual
void
interrupt_all_wait_sets();

/// Return a singleton instance for the SubContext type, constructing one if necessary.
template<typename SubContext, typename ... Args>
std::shared_ptr<SubContext>
Expand Down Expand Up @@ -368,11 +307,6 @@ class Context : public std::enable_shared_from_this<Context>
/// Mutex for protecting the global condition variable.
std::mutex interrupt_mutex_;

/// Mutex to protect sigint_guard_cond_handles_.
std::mutex interrupt_guard_cond_handles_mutex_;
/// Guard conditions for interrupting of associated wait sets on interrupt_all_wait_sets().
std::unordered_map<rcl_wait_set_t *, rcl_guard_condition_t> interrupt_guard_cond_handles_;

/// Keep shared ownership of global vector of weak contexts
std::shared_ptr<WeakContextsWrapper> weak_contexts_;
};
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rcl/wait.h"

#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand Down Expand Up @@ -525,6 +526,8 @@ class Executor
/// Guard condition for signaling the rmw layer to wake up for special events.
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();

std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

Expand Down
14 changes: 8 additions & 6 deletions rclcpp/include/rclcpp/graph_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
{
public:
RCLCPP_PUBLIC
explicit GraphListener(std::shared_ptr<rclcpp::Context> parent_context);
explicit GraphListener(const rclcpp::Context::SharedPtr & parent_context);

RCLCPP_PUBLIC
virtual ~GraphListener();
Expand Down Expand Up @@ -161,19 +161,22 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
run_loop();

RCLCPP_PUBLIC
void init_wait_set();
void
init_wait_set();

RCLCPP_PUBLIC
void cleanup_wait_set();
void
cleanup_wait_set();

private:
RCLCPP_DISABLE_COPY(GraphListener)

/** \internal */
void
__shutdown(bool);
__shutdown();

rclcpp::Context::WeakPtr parent_context_;
std::weak_ptr<rclcpp::Context> weak_parent_context_;
std::shared_ptr<rcl_context_t> rcl_parent_context_;

std::thread listener_thread_;
bool is_started_;
Expand All @@ -185,7 +188,6 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
std::vector<rclcpp::node_interfaces::NodeGraphInterface *> node_graph_interfaces_;

rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rcl_guard_condition_t * shutdown_guard_condition_;
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
};

Expand Down
70 changes: 0 additions & 70 deletions rclcpp/src/rclcpp/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ Context::shutdown(const std::string & reason)
}
// interrupt all blocking sleep_for() and all blocking executors or wait sets
this->interrupt_all_sleep_for();
this->interrupt_all_wait_sets();
// remove self from the global contexts
weak_contexts_->remove_context(this);
// shutdown logger
Expand Down Expand Up @@ -391,75 +390,6 @@ Context::interrupt_all_sleep_for()
interrupt_condition_variable_.notify_all();
}

rcl_guard_condition_t *
Context::get_interrupt_guard_condition(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::mutex> lock(interrupt_guard_cond_handles_mutex_);
auto kv = interrupt_guard_cond_handles_.find(wait_set);
if (kv != interrupt_guard_cond_handles_.end()) {
return &kv->second;
} else {
rcl_guard_condition_t handle = rcl_get_zero_initialized_guard_condition();
rcl_guard_condition_options_t options = rcl_guard_condition_get_default_options();
auto ret = rcl_guard_condition_init(&handle, this->get_rcl_context().get(), options);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't initialize guard condition");
}
interrupt_guard_cond_handles_.emplace(wait_set, handle);
return &interrupt_guard_cond_handles_[wait_set];
}
}

void
Context::release_interrupt_guard_condition(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::mutex> lock(interrupt_guard_cond_handles_mutex_);
auto kv = interrupt_guard_cond_handles_.find(wait_set);
if (kv != interrupt_guard_cond_handles_.end()) {
rcl_ret_t ret = rcl_guard_condition_fini(&kv->second);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to destroy sigint guard condition");
}
interrupt_guard_cond_handles_.erase(kv);
} else {
throw std::runtime_error("Tried to release sigint guard condition for nonexistent wait set");
}
}

void
Context::release_interrupt_guard_condition(
rcl_wait_set_t * wait_set,
const std::nothrow_t &) noexcept
{
try {
this->release_interrupt_guard_condition(wait_set);
} catch (const std::exception & exc) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"caught %s exception when releasing interrupt guard condition: %s",
rmw::impl::cpp::demangle(exc).c_str(), exc.what());
} catch (...) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"caught unknown exception when releasing interrupt guard condition");
}
}

void
Context::interrupt_all_wait_sets()
{
std::lock_guard<std::mutex> lock(interrupt_guard_cond_handles_mutex_);
for (auto & kv : interrupt_guard_cond_handles_) {
rcl_ret_t status = rcl_trigger_guard_condition(&(kv.second));
if (status != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to trigger guard condition in Context::interrupt_all_wait_sets(): %s",
rcl_get_error_string().str);
}
}
}

void
Context::clean_up()
{
Expand Down
25 changes: 16 additions & 9 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "rclcpp/exceptions.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/scope_exit.hpp"
Expand All @@ -42,28 +43,35 @@ using rclcpp::FutureReturnCode;

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
memory_strategy_(options.memory_strategy)
{
// Store the context for later use.
context_ = options.context;

rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
rcl_ret_t ret = rcl_guard_condition_init(
&interrupt_guard_condition_, options.context->get_rcl_context().get(), guard_condition_options);
&interrupt_guard_condition_, context_->get_rcl_context().get(), guard_condition_options);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "Failed to create interrupt guard condition in Executor constructor");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line isn't hit anymore.
The reason is that now we're creating two guard conditions, instead of one, and we're hitting the error in the first creation and not here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brawner Any suggestion to cover this line without adding a mock that fails in the second call to rcl_guard_condition_init?
I can apply the mentioned option, but IMO that's a bit "strange".

Maybe fault injection works for this, I will try to give it a try.

P.S.: I don't think that hitting this line matters, as overall coverage has improved.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P.S.: I don't think that hitting this line matters, as overall coverage has improved.

Yeah, that's my opinion too. If overall coverage is improving, then its OK to lose a line like this. If it becomes important, we can always revisit it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fault injection tools were intended for situations like this, but I think it's fine without.

}

context_->on_shutdown(
[weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
auto strong_gc = weak_gc.lock();
if (strong_gc) {
strong_gc->trigger();
}
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
});

// The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)

// Put the global ctrl-c guard condition in
memory_strategy_->add_guard_condition(options.context->get_interrupt_guard_condition(&wait_set_));
memory_strategy_->add_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition());

// Put the executor's guard condition in
memory_strategy_->add_guard_condition(&interrupt_guard_condition_);
rcl_allocator_t allocator = memory_strategy_->get_allocator();

// Store the context for later use.
context_ = options.context;

ret = rcl_wait_set_init(
&wait_set_,
0, 2, 0, 0, 0, 0,
Expand Down Expand Up @@ -129,8 +137,7 @@ Executor::~Executor()
rcl_reset_error();
}
// Remove and release the sigint guard condition
memory_strategy_->remove_guard_condition(context_->get_interrupt_guard_condition(&wait_set_));
context_->release_interrupt_guard_condition(&wait_set_, std::nothrow);
memory_strategy_->remove_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition());
}

std::vector<rclcpp::CallbackGroup::WeakPtr>
Expand Down
Loading