Skip to content

Commit

Permalink
Refactor rt double buffered list for easier comprehension and safety
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Lopez committed Sep 18, 2020
1 parent 160a7c7 commit 4c6a609
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 78 deletions.
Expand Up @@ -118,15 +118,15 @@ class ControllerManager : public rclcpp::Node

CONTROLLER_MANAGER_PUBLIC
controller_interface::return_type
activate() const;
activate();

CONTROLLER_MANAGER_PUBLIC
controller_interface::return_type
deactivate() const;
deactivate();

CONTROLLER_MANAGER_PUBLIC
controller_interface::return_type
cleanup() const;
cleanup();

protected:
CONTROLLER_MANAGER_PUBLIC
Expand Down Expand Up @@ -186,18 +186,71 @@ class ControllerManager : public rclcpp::Node
std::shared_ptr<rclcpp::Executor> executor_;
std::vector<ControllerLoaderInterfaceSharedPtr> loaders_;

/** \name Controllers List
* The controllers list is double-buffered to avoid needing to lock the
* real-time thread when switching controllers in the non-real-time thread.
*\{*/
/// Mutex protecting the current controllers list
std::recursive_mutex controllers_lock_;
std::vector<ControllerSpec> controllers_lists_[2];
/// The index of the current controllers list
int current_controllers_list_ = {0};
/// The index of the controllers list being used in the real-time thread.
int used_by_realtime_ = {-1};
/**
* @brief The RTControllerListWrapper class wraps a double-buffered list of controllers
* to avoid needing to lock the real-time thread when switching controllers in
* the non-real-time thread.
*
* There's always an "updated" list and an "outdated" one
* There's always an "used by rt" list and an "unused by rt" list
*
* The updated state changes on the switch_updated_list()
* The rt usage state changes on the get_used_by_rt_list()
*/
class RTControllerListWrapper
{
public:
/**
* @brief get_used_by_rt_list Makes the "updated" list the "used by rt" list
* @warning Should only be called by the RT thread, noone should modify the
* updated list while it's being used
* @return reference to the updated list
*/
std::vector<ControllerSpec> & get_used_by_rt_list();

/**
* @brief get_unused_list Waits until the "outdated" and "unused by rt"
* lists match and returns a reference to it
* This referenced list can be modified safely until switch_updated_controller_list()
* is called, at this point the RT thread may start using it at any time
* @param guard Guard needed to make sure the caller is the only one accessing the unused by rt list
*/
std::vector<ControllerSpec> & get_unused_list(
const std::lock_guard<std::recursive_mutex> & guard);

/**
* @brief get_updated_list Returns a const reference to the most updated list,
* @warning May or may not being used by the realtime thread, read-only reference for safety
* @param guard Guard needed to make sure the caller is the only one accessing the unused by rt list
*/
const std::vector<ControllerSpec> & get_updated_list(
const std::lock_guard<std::recursive_mutex> & guard) const;

/**
* @brief switch_updated_list Switches the "updated" and "outdated" lists, and waits
* until the RT thread is using the new "updated" list.
* @param guard Guard needed to make sure the caller is the only one accessing the unused by rt list
*/
void switch_updated_list(const std::lock_guard<std::recursive_mutex> & guard);


// Mutex protecting the controllers list
// must be acquired before using any list other than the "used by rt"
mutable std::recursive_mutex controllers_lock_;

private:
int get_other_list(int index) const;

void wait_until_rt_not_using(int index) const;

std::vector<ControllerSpec> controllers_lists_[2];
/// The index of the controller list with the most updated information
int updated_controllers_index_ = {0};
/// The index of the controllers list being used in the real-time thread.
int used_by_realtime_controllers_index_ = {-1};
};

RTControllerListWrapper rt_controllers_wrapper_;
/// mutex copied from ROS1 Control, protects service callbacks
/// not needed if we're guaranteed that the callbacks don't come from multiple threads
std::mutex services_lock_;
Expand Down
170 changes: 106 additions & 64 deletions controller_manager/src/controller_manager.cpp
Expand Up @@ -93,17 +93,9 @@ controller_interface::ControllerInterfaceSharedPtr ControllerManager::load_contr
controller_interface::return_type ControllerManager::unload_controller(
const std::string & controller_name)
{
// get reference to controller list
int free_controllers_list = (current_controllers_list_ + 1) % 2;
while (rclcpp::ok() && free_controllers_list == used_by_realtime_) {
if (!rclcpp::ok()) {
return controller_interface::return_type::ERROR;
}
std::this_thread::sleep_for(std::chrono::microseconds(200));
}
std::vector<ControllerSpec>
& from = controllers_lists_[current_controllers_list_],
& to = controllers_lists_[free_controllers_list];
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);
std::vector<ControllerSpec> & to = rt_controllers_wrapper_.get_unused_list(guard);
const std::vector<ControllerSpec> & from = rt_controllers_wrapper_.get_updated_list(guard);
to.clear();

// Transfers the running controllers over, skipping the one to be removed and the running ones.
Expand Down Expand Up @@ -140,16 +132,11 @@ controller_interface::return_type ControllerManager::unload_controller(

// Destroys the old controllers list when the realtime thread is finished with it.
RCLCPP_DEBUG(get_logger(), "Realtime switches over to new controller list");
int former_current_controllers_list_ = current_controllers_list_;
current_controllers_list_ = free_controllers_list;
while (rclcpp::ok() && used_by_realtime_ == former_current_controllers_list_) {
if (!rclcpp::ok()) {
return controller_interface::return_type::ERROR;
}
std::this_thread::sleep_for(std::chrono::microseconds(200));
}
rt_controllers_wrapper_.switch_updated_list(guard);
std::vector<ControllerSpec> & new_unused_list = rt_controllers_wrapper_.get_unused_list(
guard);
RCLCPP_DEBUG(get_logger(), "Destruct controller");
from.clear();
new_unused_list.clear();
RCLCPP_DEBUG(get_logger(), "Destruct controller finished");

RCLCPP_DEBUG(get_logger(), "Successfully unloaded controller '%s'", controller_name.c_str());
Expand All @@ -158,7 +145,8 @@ controller_interface::return_type ControllerManager::unload_controller(

std::vector<ControllerSpec> ControllerManager::get_loaded_controllers() const
{
return controllers_lists_[current_controllers_list_];
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);
return rt_controllers_wrapper_.get_updated_list(guard);
}

void ControllerManager::register_controller_loader(ControllerLoaderInterfaceSharedPtr loader)
Expand Down Expand Up @@ -198,9 +186,6 @@ controller_interface::return_type ControllerManager::switch_controller(
RCLCPP_DEBUG(get_logger(), "- stopping controller '%s'", controller.c_str());
}

// lock controllers
std::lock_guard<std::recursive_mutex> guard(controllers_lock_);

controller_interface::ControllerInterface * ct;
// list all controllers to stop
for (const auto & controller : stop_controllers) {
Expand Down Expand Up @@ -263,7 +248,11 @@ controller_interface::return_type ControllerManager::switch_controller(
switch_stop_list_.clear();
#endif

const auto & controllers = controllers_lists_[current_controllers_list_];
// lock controllers
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);

const std::vector<ControllerSpec> & controllers =
rt_controllers_wrapper_.get_updated_list(guard);
for (const auto & controller : controllers) {
bool in_stop_list = false;
for (const auto & request : stop_request_) {
Expand Down Expand Up @@ -400,17 +389,11 @@ controller_interface::ControllerInterfaceSharedPtr
ControllerManager::add_controller_impl(
const ControllerSpec & controller)
{
// get reference to controller list
int free_controllers_list = (current_controllers_list_ + 1) % 2;
while (rclcpp::ok() && free_controllers_list == used_by_realtime_) {
if (!rclcpp::ok()) {
return nullptr;
}
std::this_thread::sleep_for(std::chrono::microseconds(200));
}
std::vector<ControllerSpec>
& from = controllers_lists_[current_controllers_list_],
& to = controllers_lists_[free_controllers_list];
// lock controllers
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);

std::vector<ControllerSpec> & to = rt_controllers_wrapper_.get_unused_list(guard);
const std::vector<ControllerSpec> & from = rt_controllers_wrapper_.get_updated_list(guard);
to.clear();

// Copy all controllers from the 'from' list to the 'to' list
Expand Down Expand Up @@ -441,26 +424,24 @@ ControllerManager::add_controller_impl(
to.emplace_back(controller);

// Destroys the old controllers list when the realtime thread is finished with it.
int former_current_controllers_list_ = current_controllers_list_;
current_controllers_list_ = free_controllers_list;
while (rclcpp::ok() && used_by_realtime_ == former_current_controllers_list_) {
if (!rclcpp::ok()) {
return nullptr;
}
std::this_thread::sleep_for(std::chrono::microseconds(200));
}
from.clear();
RCLCPP_DEBUG(get_logger(), "Realtime switches over to new controller list");
rt_controllers_wrapper_.switch_updated_list(guard);
RCLCPP_DEBUG(get_logger(), "Destruct controller");
std::vector<ControllerSpec> & new_unused_list = rt_controllers_wrapper_.get_unused_list(
guard);
new_unused_list.clear();
RCLCPP_DEBUG(get_logger(), "Destruct controller finished");

return to.back().c;
}

controller_interface::ControllerInterface * ControllerManager::get_controller_by_name(
const std::string & name)
{
// Lock recursive mutex in this context
std::lock_guard<std::recursive_mutex> guard(controllers_lock_);
// lock controllers
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);

for (const auto & controller : controllers_lists_[current_controllers_list_]) {
for (const auto & controller : rt_controllers_wrapper_.get_updated_list(guard)) {
if (controller.info.name == name) {
return controller.c.get();
}
Expand Down Expand Up @@ -605,9 +586,10 @@ void ControllerManager::list_controllers_srv_cb(
std::lock_guard<std::mutex> services_guard(services_lock_);
RCLCPP_DEBUG(get_logger(), "list controller service locked");

// lock controllers to get all names/types/states
std::lock_guard<std::recursive_mutex> controller_guard(controllers_lock_);
auto & controllers = controllers_lists_[current_controllers_list_];
// lock controllers
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);
const std::vector<ControllerSpec> & controllers =
rt_controllers_wrapper_.get_updated_list(guard);
response->controller.resize(controllers.size());

for (size_t i = 0; i < controllers.size(); ++i) {
Expand Down Expand Up @@ -685,8 +667,9 @@ void ControllerManager::reload_controller_libraries_service_cb(
std::vector<std::string> loaded_controllers, running_controllers;
get_controller_names(loaded_controllers);
{
std::lock_guard<std::recursive_mutex> guard(controllers_lock_);
for (const auto & controller : controllers_lists_[current_controllers_list_]) {
// lock controllers
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);
for (const auto & controller : rt_controllers_wrapper_.get_updated_list(guard)) {
if (isControllerRunning(*controller.c)) {
running_controllers.push_back(controller.info.name);
}
Expand Down Expand Up @@ -780,20 +763,23 @@ void ControllerManager::unload_controller_service_cb(

void ControllerManager::get_controller_names(std::vector<std::string> & names)
{
std::lock_guard<std::recursive_mutex> guard(controllers_lock_);
names.clear();
for (const auto & controller : controllers_lists_[current_controllers_list_]) {

// lock controllers
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);
for (const auto & controller : rt_controllers_wrapper_.get_updated_list(guard)) {
names.push_back(controller.info.name);
}
}

controller_interface::return_type
ControllerManager::update()
{
used_by_realtime_ = current_controllers_list_;
std::vector<ControllerSpec> & rt_controller_list =
rt_controllers_wrapper_.get_used_by_rt_list();

auto ret = controller_interface::return_type::SUCCESS;
for (auto loaded_controller : controllers_lists_[used_by_realtime_]) {
for (auto loaded_controller : rt_controller_list) {
// TODO(v-lopez) we could cache this information
// https://github.com/ros-controls/ros2_control/issues/153
if (isControllerRunning(*loaded_controller.c)) {
Expand Down Expand Up @@ -844,7 +830,9 @@ ControllerManager::configure()
&ControllerManager::unload_controller_service_cb, this, _1,
_2));

for (auto loaded_controller : controllers_lists_[current_controllers_list_]) {
// lock controllers
std::lock_guard<std::recursive_mutex> guard(rt_controllers_wrapper_.controllers_lock_);
for (auto loaded_controller : rt_controllers_wrapper_.get_updated_list(guard)) {
auto controller_state = loaded_controller.c->get_lifecycle_node()->configure();
if (controller_state.id() != lifecycle_msgs::msg::State::PRIMARY_STATE_INACTIVE) {
ret = controller_interface::return_type::ERROR;
Expand All @@ -855,10 +843,10 @@ ControllerManager::configure()
}

controller_interface::return_type
ControllerManager::activate() const
ControllerManager::activate()
{
auto ret = controller_interface::return_type::SUCCESS;
for (auto loaded_controller : controllers_lists_[current_controllers_list_]) {
for (auto loaded_controller : rt_controllers_wrapper_.get_used_by_rt_list()) {
auto controller_state = loaded_controller.c->get_lifecycle_node()->activate();
if (controller_state.id() != lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) {
ret = controller_interface::return_type::ERROR;
Expand All @@ -869,10 +857,10 @@ ControllerManager::activate() const
}

controller_interface::return_type
ControllerManager::deactivate() const
ControllerManager::deactivate()
{
auto ret = controller_interface::return_type::SUCCESS;
for (auto loaded_controller : controllers_lists_[current_controllers_list_]) {
for (auto loaded_controller : rt_controllers_wrapper_.get_used_by_rt_list()) {
auto controller_state = loaded_controller.c->get_lifecycle_node()->deactivate();
if (controller_state.id() != lifecycle_msgs::msg::State::PRIMARY_STATE_INACTIVE) {
ret = controller_interface::return_type::ERROR;
Expand All @@ -883,10 +871,10 @@ ControllerManager::deactivate() const
}

controller_interface::return_type
ControllerManager::cleanup() const
ControllerManager::cleanup()
{
auto ret = controller_interface::return_type::SUCCESS;
for (auto loaded_controller : controllers_lists_[current_controllers_list_]) {
for (auto loaded_controller : rt_controllers_wrapper_.get_used_by_rt_list()) {
auto controller_state = loaded_controller.c->get_lifecycle_node()->cleanup();
if (controller_state.id() != lifecycle_msgs::msg::State::PRIMARY_STATE_UNCONFIGURED) {
ret = controller_interface::return_type::ERROR;
Expand All @@ -896,4 +884,58 @@ ControllerManager::cleanup() const
return ret;
}

std::vector<ControllerSpec> &
ControllerManager::RTControllerListWrapper::get_used_by_rt_list()
{
used_by_realtime_controllers_index_ = updated_controllers_index_;
return controllers_lists_[used_by_realtime_controllers_index_];
}

std::vector<ControllerSpec> &
ControllerManager::RTControllerListWrapper::get_unused_list(
const std::lock_guard<std::recursive_mutex> &)
{
assert(controllers_lock_.try_lock());
controllers_lock_.unlock();
// Get the index to the outdated controller list
int free_controllers_list = get_other_list(updated_controllers_index_);

// Wait until the outdated controller list is not being used by the realtime thread
wait_until_rt_not_using(free_controllers_list);
return controllers_lists_[free_controllers_list];
}

const std::vector<ControllerSpec> & ControllerManager::RTControllerListWrapper::get_updated_list(
const std::lock_guard<std::recursive_mutex> &) const
{
assert(controllers_lock_.try_lock());
controllers_lock_.unlock();
return controllers_lists_[updated_controllers_index_];
}

void ControllerManager::RTControllerListWrapper::switch_updated_list(
const std::lock_guard<std::recursive_mutex> &)
{
assert(controllers_lock_.try_lock());
controllers_lock_.unlock();
int former_current_controllers_list_ = updated_controllers_index_;
updated_controllers_index_ = get_other_list(former_current_controllers_list_);
wait_until_rt_not_using(former_current_controllers_list_);
}

int ControllerManager::RTControllerListWrapper::get_other_list(int index) const
{
return (index + 1) % 2;
}

void ControllerManager::RTControllerListWrapper::wait_until_rt_not_using(int index) const
{
while (used_by_realtime_controllers_index_ == index) {
if (!rclcpp::ok()) {
throw std::runtime_error("rclcpp interrupted");
}
std::this_thread::sleep_for(std::chrono::microseconds(200));
}
}

} // namespace controller_manager

0 comments on commit 4c6a609

Please sign in to comment.