Merge "enhance the I/O Scheduler with bandwidth and throughput limits…
Browse files Browse the repository at this point in the history
…" from Glauber

"
This is my new proposal for determining I/O Scheduler concurrency,
addressing a series of shortcomings that the current design has been
shown to display over the years.

Not yet included in this series is a version of iotune that calculates
the new parameters used to set up the disk under this proposal. That
will follow shortly after we agree on this.

Also not included, but a natural next step for this work, is allowing
all shards to dispatch I/O, reducing the inter-shard latency for the
remote dispatchers. For now, I think we can take the next step with the
same --num-io-queues parameter we have, and handle that issue later. We
could do that with a good old-fashioned lock, or with a lock-free
priority queue; that deserves a separate investigation. I am positive
that what is delivered here will already yield strong latency benefits,
while still allowing us to adjust the number of I/O queues manually.
"

* 'iosched-v2-v3' of github.com:glommer/seastar:
  io_queue: determine concurrency based on the disk's visible throughput
  utils: adjust indentation
  utils: allow parse_memory_size to interpret unitless numbers
  reactor: extend the fair queue with size and iops limits
  io_queues: move capacity planning out of resource manager
  io_queue: accept a configuration struct
  fair_queue: allow for passing more complex parameters
  fair_queue: encapsulate properties in a request descriptor
  reactor: pass a descriptor pointer to submit_io
avikivity committed Apr 18, 2018
2 parents 27d763b + 97cc0de commit 636424c
Showing 8 changed files with 321 additions and 96 deletions.
core/fair_queue.hh: 72 changes (55 additions & 17 deletions)
@@ -36,14 +36,22 @@
 
 namespace seastar {
 
+/// \brief describes a request that passes through the fair queue
+///
+/// \related fair_queue
+struct fair_queue_request_descriptor {
+    unsigned weight = 1; ///< the weight of this request for capacity purposes (IOPS).
+    unsigned size = 1;   ///< the effective size of this request
+};
+
 /// \addtogroup io-module
 /// @{
 
 /// \cond internal
 class priority_class {
     struct request {
         noncopyable_function<void()> func;
-        unsigned weight;
+        fair_queue_request_descriptor desc;
     };
     friend class fair_queue;
     uint32_t _shares = 0;
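
For illustration, a hypothetical caller could fill the new descriptor like this (a sketch only; the field semantics are as documented above):

    // Describe a single 128 KiB write: one weighted request toward the
    // IOPS limit, 128 * 1024 effective bytes toward the bytes limit.
    fair_queue_request_descriptor desc;
    desc.weight = 1;
    desc.size = 128 * 1024;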
@@ -95,6 +103,18 @@ using priority_class_ptr = lw_shared_ptr<priority_class>;
 /// them first, until balance is restored. This balancing is expected to happen within
 /// a certain time window that obeys an exponential decay.
 class fair_queue {
+public:
+    /// \brief Fair Queue configuration structure.
+    ///
+    /// Sets the operation parameters of a \ref fair_queue
+    /// \related fair_queue
+    struct config {
+        unsigned capacity = std::numeric_limits<unsigned>::max();
+        std::chrono::microseconds tau = std::chrono::milliseconds(100);
+        unsigned max_req_count = std::numeric_limits<unsigned>::max();
+        unsigned max_bytes_count = std::numeric_limits<unsigned>::max();
+    };
+private:
     friend priority_class;
 
     struct class_compare {
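
A sketch of constructing a queue with the new limits (the numbers are illustrative, not recommendations):

    // Allow at most 100 concurrent requests, 1000 weighted requests and
    // 128 MiB of request bytes in flight at any one time.
    fair_queue::config cfg;
    cfg.capacity = 100;
    cfg.max_req_count = 1000;
    cfg.max_bytes_count = 128 << 20;
    fair_queue fq(cfg);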
@@ -103,12 +123,13 @@ class fair_queue {
         }
     };
 
+    config _config;
     unsigned _requests_executing = 0;
+    unsigned _req_count_executing = 0;
+    unsigned _bytes_count_executing = 0;
     unsigned _requests_queued = 0;
-    unsigned _capacity;
     using clock_type = std::chrono::steady_clock::time_point;
     clock_type _base;
-    std::chrono::microseconds _tau;
     using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
     prioq _handles;
     std::unordered_set<priority_class_ptr> _all_classes;
@@ -134,23 +155,35 @@ class fair_queue {
     }
 
     void normalize_stats() {
-        auto time_delta = std::log(normalize_factor()) * _tau;
+        auto time_delta = std::log(normalize_factor()) * _config.tau;
         // time_delta is negative; and this may advance _base into the future
         _base -= std::chrono::duration_cast<clock_type::duration>(time_delta);
         for (auto& pc: _all_classes) {
            pc->_accumulated *= normalize_factor();
         }
     }
+
+    bool can_dispatch() const {
+        return _requests_queued &&
+               (_requests_executing < _config.capacity) &&
+               (_req_count_executing < _config.max_req_count) &&
+               (_bytes_count_executing < _config.max_bytes_count);
+    }
 public:
+    /// Constructs a fair queue with configuration parameters \c cfg.
+    ///
+    /// \param cfg an instance of the class \ref config
+    explicit fair_queue(config cfg)
+        : _config(std::move(cfg))
+        , _base(std::chrono::steady_clock::now())
+    {}
+
     /// Constructs a fair queue with a given \c capacity.
     ///
     /// \param capacity how many concurrent requests are allowed in this queue.
     /// \param tau the queue exponential decay parameter, as in exp(-1/tau * t)
     explicit fair_queue(unsigned capacity, std::chrono::microseconds tau = std::chrono::milliseconds(100))
-        : _capacity(capacity)
-        , _base(std::chrono::steady_clock::now())
-        , _tau(tau) {
-    }
+        : fair_queue(config{capacity, tau}) {}
 
     /// Registers a priority class against this fair queue.
     ///
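
Since the legacy constructor now just delegates to the config-based one, these two declarations build equivalent queues (usage sketch):

    // Explicit config vs. the legacy capacity constructor.
    fair_queue a(fair_queue::config{10, std::chrono::milliseconds(100)});
    fair_queue b(10); // delegates to the constructor above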
@@ -186,23 +219,26 @@ public:
 ///
 /// The user of this interface is supposed to call \ref notify_requests_finished when the
 /// request finishes executing - regardless of success or failure.
-    void queue(priority_class_ptr pc, unsigned weight, noncopyable_function<void()> func) {
+    void queue(priority_class_ptr pc, fair_queue_request_descriptor desc, noncopyable_function<void()> func) {
         // We need to return a future in this function on which the caller can wait.
         // Since we don't know which queue we will use to execute the next request - if ours or
         // someone else's, we need a separate promise at this point.
         push_priority_class(pc);
-        pc->_queue.push_back(priority_class::request{std::move(func), weight});
+        pc->_queue.push_back(priority_class::request{std::move(func), std::move(desc)});
         _requests_queued++;
     }
 
-    /// Notifies that \c finished requests finished
-    void notify_requests_finished(unsigned finished) {
-        _requests_executing -= finished;
+    /// Notifies that one request finished
+    /// \param desc an instance of the \c fair_queue_request_descriptor structure describing the request that just finished.
+    void notify_requests_finished(fair_queue_request_descriptor& desc) {
+        _requests_executing--;
+        _req_count_executing -= desc.weight;
+        _bytes_count_executing -= desc.size;
     }
 
     /// Try to execute new requests if there is capacity left in the queue.
     void dispatch_requests() {
-        while (_requests_queued && (_requests_executing < _capacity)) {
+        while (can_dispatch()) {
             priority_class_ptr h;
             do {
                 h = pop_priority_class();
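
A lifecycle sketch under the new interface, assuming fq is a fair_queue and pc a priority class already registered against it: the same descriptor passed to queue() is handed back on completion, so the in-flight counters balance out.

    fair_queue_request_descriptor desc{1 /* weight */, 4096 /* size */};
    fq.queue(pc, desc, [&fq, desc] () mutable {
        // ... submit the I/O here; once it completes:
        fq.notify_requests_finished(desc);
    });
    fq.dispatch_requests();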
@@ -211,17 +247,19 @@ public:
             auto req = std::move(h->_queue.front());
             h->_queue.pop_front();
             _requests_executing++;
+            _req_count_executing += req.desc.weight;
+            _bytes_count_executing += req.desc.size;
             _requests_queued--;
 
             auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
-            auto req_cost = float(req.weight) / h->_shares;
-            auto cost = expf(1.0f/_tau.count() * delta.count()) * req_cost;
+            auto req_cost = (float(req.desc.weight) / _config.max_req_count + float(req.desc.size) / _config.max_bytes_count) / h->_shares;
+            auto cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
             float next_accumulated = h->_accumulated + cost;
             while (std::isinf(next_accumulated)) {
                 normalize_stats();
                 // If we have renormalized, our time base will have changed. This should happen very infrequently
                 delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
-                cost = expf(1.0f/_tau.count() * delta.count()) * req_cost;
+                cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
                 next_accumulated = h->_accumulated + cost;
             }
             h->_accumulated = next_accumulated;
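
In other words, each dispatched request is now charged for the fraction it consumes of both budgets (IOPS and bytes), scaled down by its class's shares, with the exponential factor implementing the decaying time window described earlier. A standalone sketch of the computation (not Seastar code):

    #include <cmath>

    // Cost charged to a class when dispatching one request, mirroring the
    // accounting in dispatch_requests() above.
    float request_cost(unsigned weight, unsigned size,
                       unsigned max_req_count, unsigned max_bytes_count,
                       unsigned shares, float delta_us, float tau_us) {
        float req_cost = (float(weight) / max_req_count
                          + float(size) / max_bytes_count) / shares;
        return std::exp(delta_us / tau_us) * req_cost; // exp(t/tau) time scaling
    }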
