Skip to content

Commit

Permalink
Merge "reactor: stop using signals for task_quota timer"
Browse files Browse the repository at this point in the history
"Signals do not scale on large multicores due to a process-wide lock, and
since we want to schedule at sub-task-quota latencies, the problem will
only get worse.  Fix by switching to a timerfd+thread.

In the future, we may reduce the number of context switches by having a
thread update more than one reactor's g_need_preempt, but that is not done
now."

* timer-signal:
  reactor: convert task quota signal to a timerfd+thread
  reactor: simplify itimerspec calculations by using the new helpers
  posix: add helpers for `struct timespec` and `struct itimerspec`
  • Loading branch information
avikivity committed Jan 1, 2017
2 parents bb4711f + f03ebd3 commit c175cb5
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 29 deletions.
64 changes: 63 additions & 1 deletion core/posix.hh
Expand Up @@ -16,7 +16,7 @@
* under the License.
*/
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
* Copyright (C) 2016 ScyllaDB
*/

#ifndef FILE_DESC_HH_
Expand All @@ -41,8 +41,18 @@
#include <pthread.h>
#include <signal.h>
#include <memory>
#include <chrono>
#include "net/socket_defs.hh"

/// \file
/// \defgroup posix-support POSIX Support
///
/// Mostly-internal APIs to provide C++ glue for the underlying POSIX platform;
/// but can be used by the application when they don't block.
///
/// \addtogroup posix-support
/// @{

inline void throw_system_error_on(bool condition, const char* what_arg = "");

template <typename T>
Expand Down Expand Up @@ -306,6 +316,55 @@ private:
file_desc(int fd) : _fd(fd) {}
};


namespace seastar {

namespace posix {

/// Converts a duration value to a `timespec`
///
/// The result is always normalized so that `tv_nsec` lies in
/// `[0, 999999999]`, as POSIX requires. For a negative duration this means
/// `tv_sec` is rounded towards negative infinity and `tv_nsec` holds the
/// non-negative remainder (e.g. -1.5s becomes `{-2, 500000000}`).
///
/// \param d a duration value to convert to the POSIX `timespec` format
/// \return `d` as a `timespec` value
template <typename Rep, typename Period>
struct timespec
to_timespec(std::chrono::duration<Rep, Period> d) {
    constexpr std::chrono::nanoseconds::rep ns_per_sec = 1000000000;
    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(d).count();
    auto sec = ns / ns_per_sec;
    auto nsec = ns % ns_per_sec;
    // C++ division truncates towards zero, so a negative duration leaves a
    // negative remainder; borrow one second to bring tv_nsec back in range.
    if (nsec < 0) {
        nsec += ns_per_sec;
        --sec;
    }
    struct timespec ts {};
    ts.tv_sec = sec;
    ts.tv_nsec = nsec;
    return ts;
}

/// Builds an `itimerspec` from a relative first-expiration delay and a period
///
/// \param base delay until the first expiration, measured from the current time
/// \param interval period with which the timer is re-armed after each expiration
/// \return an `itimerspec` whose `it_value` is `base` and whose `it_interval` is `interval`
template <typename Rep1, typename Period1, typename Rep2, typename Period2>
struct itimerspec
to_relative_itimerspec(std::chrono::duration<Rep1, Period1> base, std::chrono::duration<Rep2, Period2> interval) {
    struct itimerspec spec {};
    spec.it_value = to_timespec(base);
    spec.it_interval = to_timespec(interval);
    return spec;
}


/// Builds an `itimerspec` from an absolute expiration time and a period
///
/// The first expiration is expressed as the time elapsed since the epoch of
/// the time point's clock.
///
/// \param base base time for the timer; must use the same clock as the timer
/// \param interval period for re-arming the timer
/// \return `base` and `interval` converted to an `itimerspec`
template <typename Clock, class Duration, class Rep, class Period>
struct itimerspec
to_absolute_itimerspec(std::chrono::time_point<Clock, Duration> base, std::chrono::duration<Rep, Period> interval) {
    const auto since_epoch = base.time_since_epoch();
    return to_relative_itimerspec(since_epoch, interval);
}

}

}

class posix_thread {
public:
class attr;
Expand Down Expand Up @@ -400,4 +459,7 @@ void pin_this_thread(unsigned cpu_id) {
auto r = pthread_setaffinity_np(pthread_self(), sizeof(cs), &cs);
assert(r == 0);
}

/// @}

#endif /* FILE_DESC_HH_ */
48 changes: 21 additions & 27 deletions core/reactor.cc
Expand Up @@ -231,10 +231,6 @@ inline int alarm_signal() {
return SIGRTMIN;
}

// Signal number for task-quota notifications: the real-time signal
// immediately following SIGRTMIN.
inline int task_quota_signal() {
    const int first_rt_signal = SIGRTMIN;
    return first_rt_signal + 1;
}

// Installs signal handler stack for current thread.
// The stack remains installed as long as the returned object is kept alive.
// When it goes out of scope the previous handler is restored.
Expand Down Expand Up @@ -266,10 +262,12 @@ reactor::reactor()
[&] { timer_thread_func(); }, sched::thread::attr().stack(4096).name("timer_thread").pin(sched::cpu::current()))
, _engine_thread(sched::thread::current())
#endif
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
, _cpu_started(0)
, _io_context(0)
, _io_context_available(max_aio)
, _reuseport(posix_reuseport_detect()) {
, _reuseport(posix_reuseport_detect())
, _task_quota_timer_thread(&reactor::task_quota_timer_thread_fn, this) {

seastar::thread_impl::init();
auto r = ::io_setup(max_aio, &_io_context);
Expand All @@ -288,11 +286,7 @@ reactor::reactor()
sev.sigev_signo = alarm_signal();
r = timer_create(CLOCK_MONOTONIC, &sev, &_steady_clock_timer);
assert(r >= 0);
sev.sigev_signo = task_quota_signal();
r = timer_create(CLOCK_MONOTONIC, &sev, &_task_quota_timer);
assert(r >= 0);
sigemptyset(&mask);
sigaddset(&mask, task_quota_signal());
r = ::pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
assert(r == 0);
#endif
Expand All @@ -304,7 +298,8 @@ reactor::reactor()
}

reactor::~reactor() {
timer_delete(_task_quota_timer);
_dying.store(true, std::memory_order_relaxed);
_task_quota_timer_thread.join();
timer_delete(_steady_clock_timer);
auto eraser = [](auto& list) {
while (!list.empty()) {
Expand All @@ -317,6 +312,18 @@ reactor::~reactor() {
eraser(_expired_manual_timers);
}

// Body of the task-quota timer thread.
//
// Repeatedly reads the expiration counter from the task-quota timerfd and,
// after each read, raises the reactor's preemption flag through
// _local_need_preempt. The loop ends once _dying is observed as set.
//
// NOTE(review): _dying is only re-checked after read() returns; presumably
// read() blocks until the timer fires, so shutdown may depend on the timer
// still being armed -- confirm against the destructor.
void
reactor::task_quota_timer_thread_fn() {
    for (;;) {
        if (_dying.load(std::memory_order_relaxed)) {
            break;
        }
        uint64_t expirations;
        _task_quota_timer.read(&expirations, sizeof(expirations));
        _local_need_preempt = true;
        // This runs on a separate thread but is guaranteed to share a core
        // with the reactor, so even a signal fence is more than required.
        std::atomic_signal_fence(std::memory_order_seq_cst);
    }
}

void
reactor::clear_task_quota(int) {
g_need_preempt = true;
Expand Down Expand Up @@ -2423,23 +2430,10 @@ int reactor::run() {
});
load_timer.arm_periodic(1s);

itimerspec its = {};
auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(_task_quota).count();
auto tv_nsec = nsec % 1'000'000'000;
auto tv_sec = nsec / 1'000'000'000;
its.it_value.tv_nsec = tv_nsec;
its.it_value.tv_sec = tv_sec;
its.it_interval = its.it_value;
auto r = timer_settime(_task_quota_timer, 0, &its, nullptr);
assert(r == 0);
itimerspec its = seastar::posix::to_relative_itimerspec(_task_quota, _task_quota);
_task_quota_timer.timerfd_settime(0, its);
auto& task_quote_itimerspec = its;

struct sigaction sa_task_quota = {};
sa_task_quota.sa_handler = &reactor::clear_task_quota;
sa_task_quota.sa_flags = SA_RESTART;
r = sigaction(task_quota_signal(), &sa_task_quota, nullptr);
assert(r == 0);

bool idle = false;

std::function<bool()> check_for_work = [this] () {
Expand Down Expand Up @@ -2493,11 +2487,11 @@ int reactor::run() {
if (idle_end - idle_start > _max_poll_time) {
// Turn off the task quota timer to avoid spurious wakeups
struct itimerspec zero_itimerspec = {};
timer_settime(_task_quota_timer, 0, &zero_itimerspec, nullptr);
_task_quota_timer.timerfd_settime(0, zero_itimerspec);
sleep();
// We may have slept for a while, so freshen idle_end
idle_end = steady_clock_type::now();
timer_settime(_task_quota_timer, 0, &task_quote_itimerspec, nullptr);
_task_quota_timer.timerfd_settime(0, task_quote_itimerspec);
}
} else {
// We previously ran pure_check_for_work(), might not actually have performed
Expand Down
6 changes: 5 additions & 1 deletion core/reactor.hh
Expand Up @@ -676,7 +676,7 @@ private:
promise<std::unique_ptr<network_stack>> _network_stack_ready_promise;
int _return = 0;
timer_t _steady_clock_timer = {};
timer_t _task_quota_timer = {};
file_desc _task_quota_timer;
promise<> _start_promise;
semaphore _cpu_started;
uint64_t _tasks_processed = 0;
Expand Down Expand Up @@ -730,6 +730,9 @@ private:
std::atomic<bool> _sleeping alignas(64);
pthread_t _thread_id alignas(64) = pthread_self();
bool _strict_o_direct = true;
bool& _local_need_preempt{g_need_preempt}; // for access from the _task_quota_timer_thread
std::thread _task_quota_timer_thread;
std::atomic<bool> _dying{false};
private:
static std::chrono::nanoseconds calculate_poll_time();
static void clear_task_quota(int);
Expand Down Expand Up @@ -782,6 +785,7 @@ private:

void run_tasks(circular_buffer<std::unique_ptr<task>>& tasks);
bool posix_reuseport_detect();
void task_quota_timer_thread_fn();
public:
static boost::program_options::options_description get_options_description();
reactor();
Expand Down

0 comments on commit c175cb5

Please sign in to comment.