Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Thread configuration prototype #2205

Open
wants to merge 5 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/executor_options.hpp
Expand Up @@ -15,6 +15,8 @@
#ifndef RCLCPP__EXECUTOR_OPTIONS_HPP_
#define RCLCPP__EXECUTOR_OPTIONS_HPP_

#include <string>

#include "rclcpp/context.hpp"
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand All @@ -36,6 +38,7 @@ struct ExecutorOptions
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
rclcpp::Context::SharedPtr context;
size_t max_conditions;
std::string name;
};

} // namespace rclcpp
Expand Down
42 changes: 39 additions & 3 deletions rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Expand Up @@ -18,14 +18,16 @@
#include <chrono>
#include <memory>
#include <mutex>
#include <vector>
#include <set>
#include <thread>
#include <unordered_map>
#include <optional>

#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcpputils/thread/thread_attribute.hpp"

namespace rclcpp
{
Expand All @@ -39,7 +41,7 @@ class MultiThreadedExecutor : public rclcpp::Executor

/// Constructor for MultiThreadedExecutor.
/**
* For the yield_before_execute option, when true std::this_thread::yield()
* For the yield_before_execute option, when true rcpputils::this_thread::yield()
* will be called after acquiring work (as an AnyExecutable) and
* releasing the spinning lock, but before executing the work.
* This is useful for reproducing some bugs related to taking work more than
Expand All @@ -48,7 +50,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
* \param options common options for all executors
* \param number_of_threads number of threads to have in the thread pool,
* the default 0 will use the number of cpu cores found (minimum of 2)
* \param yield_before_execute if true std::this_thread::yield() is called
* \param yield_before_execute if true rcpputils::this_thread::yield() is called
* \param timeout maximum time to wait
*/
RCLCPP_PUBLIC
Expand All @@ -58,6 +60,21 @@ class MultiThreadedExecutor : public rclcpp::Executor
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
explicit MultiThreadedExecutor(
size_t number_of_threads,
const rcpputils::ThreadAttribute & thread_attr,
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
explicit MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
const rcpputils::ThreadAttribute & thread_attr,
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
virtual ~MultiThreadedExecutor();

Expand All @@ -73,6 +90,16 @@ class MultiThreadedExecutor : public rclcpp::Executor
size_t
get_number_of_threads();

RCLCPP_PUBLIC
const std::optional<rcpputils::ThreadAttribute> &
get_thread_attribute() const
{
return thread_attr_;
}

RCLCPP_PUBLIC
static const char default_name[];

protected:
RCLCPP_PUBLIC
void
Expand All @@ -81,8 +108,17 @@ class MultiThreadedExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

RCLCPP_PUBLIC
explicit MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
std::optional<rcpputils::ThreadAttribute> thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds timeout);

std::mutex wait_mutex_;
size_t number_of_threads_;
std::optional<rcpputils::ThreadAttribute> thread_attr_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;
};
Expand Down
30 changes: 30 additions & 0 deletions rclcpp/include/rclcpp/executors/single_threaded_executor.hpp
Expand Up @@ -21,6 +21,7 @@
#include <cstdlib>
#include <memory>
#include <vector>
#include <optional>

#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
Expand All @@ -29,6 +30,7 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/rate.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcpputils/thread/thread_attribute.hpp"

namespace rclcpp
{
Expand All @@ -49,6 +51,11 @@ class SingleThreadedExecutor : public rclcpp::Executor
explicit SingleThreadedExecutor(
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());

RCLCPP_PUBLIC
explicit SingleThreadedExecutor(
const rclcpp::ExecutorOptions & options,
const rcpputils::ThreadAttribute & thread_attr);

/// Default destructor.
RCLCPP_PUBLIC
virtual ~SingleThreadedExecutor();
Expand All @@ -65,8 +72,31 @@ class SingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

RCLCPP_PUBLIC
bool has_thread_attribute() const
{
return thread_attr_.has_value();
}

RCLCPP_PUBLIC
const std::optional<rcpputils::ThreadAttribute> &
get_thread_attribute() const
{
return thread_attr_;
}

RCLCPP_PUBLIC
static const char default_name[];

protected:
RCLCPP_PUBLIC
void
run();

private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)

std::optional<rcpputils::ThreadAttribute> thread_attr_;
};

} // namespace executors
Expand Down
96 changes: 90 additions & 6 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Expand Up @@ -14,30 +14,68 @@

#include "rclcpp/executors/multi_threaded_executor.hpp"

#include <algorithm>
#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <vector>

#include "rcpputils/scope_exit.hpp"
#include "rcpputils/thread.hpp"

#include "rclcpp/logging.hpp"
#include "rclcpp/utilities.hpp"

using rclcpp::executors::MultiThreadedExecutor;

const char MultiThreadedExecutor::default_name[] = "RCLCPP_EXECUTOR_MULTI_THREADED";

static std::optional<rcpputils::ThreadAttribute>
default_thread_attr(const rclcpp::ExecutorOptions & options);

MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: MultiThreadedExecutor(
options, number_of_threads, default_thread_attr(options),
yield_before_execute, next_exec_timeout) {}

MultiThreadedExecutor::MultiThreadedExecutor(
size_t number_of_threads,
rcpputils::ThreadAttribute const & thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: MultiThreadedExecutor(
rclcpp::ExecutorOptions(), number_of_threads, std::optional(thread_attr),
yield_before_execute, next_exec_timeout) {}

MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
rcpputils::ThreadAttribute const & thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: MultiThreadedExecutor(
options, number_of_threads, std::optional(thread_attr),
yield_before_execute, next_exec_timeout) {}

MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::ExecutorOptions & options,
size_t number_of_threads,
std::optional<rcpputils::ThreadAttribute> thread_attr,
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: rclcpp::Executor(options),
thread_attr_(std::move(thread_attr)),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout)
{
number_of_threads_ = number_of_threads > 0 ?
number_of_threads :
std::max(std::thread::hardware_concurrency(), 2U);
std::max(rcpputils::Thread::hardware_concurrency(), 2U);

if (number_of_threads_ == 1) {
RCLCPP_WARN(
Expand All @@ -56,17 +94,26 @@ MultiThreadedExecutor::spin()
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
std::vector<std::thread> threads;
std::vector<rcpputils::Thread> threads;
size_t thread_id = 0;
{

if (thread_attr_) {
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
for (; thread_id < number_of_threads_; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
threads.emplace_back(thread_attr_.value(), func);
}
} else {
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
}
}
run(thread_id);
}

run(thread_id);
for (auto & thread : threads) {
thread.join();
}
Expand Down Expand Up @@ -104,3 +151,40 @@ MultiThreadedExecutor::run(size_t this_thread_number)
any_exec.callback_group.reset();
}
}

std::optional<rcpputils::ThreadAttribute>
default_thread_attr(rclcpp::ExecutorOptions const & options)
{
const rcutils_thread_attrs_t * attrs = rcl_context_get_thread_attrs(
options.context->get_rcl_context().get());
if (!attrs) {
return std::nullopt;
}

std::string name;
bool name_specified = !options.name.empty();
if (name_specified) {
name = options.name;
} else {
name = MultiThreadedExecutor::default_name;
}

const rcutils_thread_attr_t * attrs_beg = attrs->attributes;
const rcutils_thread_attr_t * attrs_end = attrs->attributes + attrs->num_attributes;
const rcutils_thread_attr_t * attr = std::find_if(
attrs_beg, attrs_end,
[&](const auto & attr) {
return attr.tag == name;
});
if (attr != attrs_end) {
return rcpputils::ThreadAttribute(*attr);
} else {
if (name_specified) {
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"MultiThreadedExecutor is named \"%s\", but not found corresponding thread attribute.",
name.c_str());
}
return std::nullopt;
}
}