Merge pull request #202 from mmaslankaprv/acks-all-optimizations
Optimized `acks=-1` produce tail latency
mmaslankaprv committed Dec 3, 2020
2 parents d85515b + 89b0ab5 commit 15e67a1
Showing 5 changed files with 66 additions and 19 deletions.
6 changes: 6 additions & 0 deletions src/v/config/configuration.cc
@@ -282,6 +282,12 @@ configuration::configuration()
       "follower",
       required::no,
       5s)
+  , replicate_request_debounce_timeout_ms(
+      *this,
+      "replicate_request_debounce_timeout_ms",
+      "Max duration before dispatching batched replicate requests",
+      required::no,
+      4ms)
   , reclaim_min_size(
       *this,
       "reclaim_min_size",
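For context, the new option is an ordinary duration-typed `property` with a 4 ms default. A minimal, self-contained sketch of the same idea (`raft_tuning` is a hypothetical stand-in; the real `config_store` plumbing is elided, and consensus.cc reads the real value via `config::shard_local_cfg().replicate_request_debounce_timeout_ms()`):

```cpp
#include <chrono>

// Hypothetical stand-in for the config_store-backed property added in
// this commit: a duration option with the commit's 4ms default.
struct raft_tuning {
    // Max time a replicate request may wait in the batcher before a
    // flush is dispatched.
    std::chrono::milliseconds replicate_request_debounce_timeout_ms{4};
};

int main() {
    raft_tuning cfg;
    return cfg.replicate_request_debounce_timeout_ms.count() == 4 ? 0 : 1;
}
```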
1 change: 1 addition & 0 deletions src/v/config/configuration.h
@@ -83,6 +83,7 @@ struct configuration final : public config_store {
     property<std::chrono::milliseconds> kafka_group_recovery_timeout_ms;
     property<std::chrono::milliseconds> replicate_append_timeout_ms;
     property<std::chrono::milliseconds> recovery_append_timeout_ms;
+    property<std::chrono::milliseconds> replicate_request_debounce_timeout_ms;

     property<size_t> reclaim_min_size;
     property<size_t> reclaim_max_size;
34 changes: 29 additions & 5 deletions src/v/raft/consensus.cc
@@ -55,7 +55,8 @@ consensus::consensus(
   , _client_protocol(client)
   , _leader_notification(std::move(cb))
   , _fstats({})
-  , _batcher(this)
+  , _batcher(
+      this, config::shard_local_cfg().replicate_request_debounce_timeout_ms())
   , _event_manager(this)
   , _ctxlog(group, _log.config().ntp())
   , _replicate_append_timeout(
@@ -186,17 +187,40 @@ consensus::success_reply consensus::update_follower_index(
       "Append entries response sent to wrong group: {}, current group: {}",
       reply.group));
     }
-    if (seq < idx.last_received_seq) {
+    if (
+      seq < idx.last_received_seq
+      && reply.last_dirty_log_index < _log.offsets().dirty_offset) {
         vlog(
           _ctxlog.trace,
-          "ignoring reordered reply from node {} - last: {} current: {} ",
+          "ignoring reordered reply {} from node {} - last: {} current: {} ",
+          reply,
           reply.node_id,
           idx.last_received_seq,
           seq);
         return success_reply::no;
     }
-    // only update for in order sequences
-    idx.last_received_seq = seq;
+    /**
+     * Even though we allow some of the reordered responses to be processed,
+     * we do not want them to update the last received response sequence.
+     * Doing so could lead to processing one of the responses that were
+     * reordered and should be discarded.
+     *
+     * example:
+     * assumptions:
+     *
+     * - [ seq: ... , lo: ... ] denotes a response with a given sequence (seq)
+     *   containing information about the last log offset of a follower (lo)
+     *
+     * - requests are processed from left to right
+     *
+     * [ seq: 100, lo: 10 ][ seq: 97, lo: 11 ][ seq: 98, lo: 9 ]
+     *
+     * In this case we want to accept the request with seq 100, but not the
+     * one with seq 98; updating last_received_seq unconditionally would
+     * cause the request with seq 98 to be accepted when it should be
+     * rejected.
+     */
+    idx.last_received_seq = std::max(seq, idx.last_received_seq);
+
     // check preconditions for processing the reply
     if (!is_leader()) {
         vlog(_ctxlog.debug, "ignoring append entries reply, not leader");
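A minimal sketch of the acceptance rule this hunk introduces, using plain types rather than Redpanda's (`follower_index`, the offsets, and the leader's dirty offset are simplified stand-ins): a reordered reply is dropped only when it is also stale relative to the leader's dirty offset, and the sequence watermark only ever moves forward.

```cpp
#include <algorithm>
#include <cstdint>

struct follower_index {
    uint64_t last_received_seq = 0;
};

bool accept_reply(
  follower_index& idx,
  uint64_t seq,                  // sequence assigned when the request was sent
  int64_t reply_dirty_offset,    // follower's last dirty log offset
  int64_t leader_dirty_offset) { // leader's current dirty offset
    // Reject only replies that are BOTH reordered and stale.
    if (
      seq < idx.last_received_seq
      && reply_dirty_offset < leader_dirty_offset) {
        return false;
    }
    // Never move the watermark backwards: after accepting seq 100, letting a
    // reordered seq 97 reset it would later admit the stale seq 98.
    idx.last_received_seq = std::max(seq, idx.last_received_seq);
    return true;
}
```

With the example from the comment above (seq 100, then 97, then 98), seq 100 is accepted, seq 97 gets through only if its offset is current, and seq 98 is rejected because the watermark already sits at 100.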
35 changes: 23 additions & 12 deletions src/v/raft/replicate_batcher.cc
@@ -25,18 +25,23 @@

 namespace raft {
 using namespace std::chrono_literals; // NOLINT
-replicate_batcher::replicate_batcher(consensus* ptr, size_t cache_size)
+replicate_batcher::replicate_batcher(
+  consensus* ptr,
+  std::chrono::milliseconds debounce_duration,
+  size_t cache_size)
   : _ptr(ptr)
+  , _debounce_duration(debounce_duration)
   , _max_batch_size(cache_size) {
-    _flush_timer.set_callback([this] {
-        (void)ss::with_gate(_ptr->_bg, [this] {
-            // background block further caching too
-            return _lock.with([this] { return flush(); });
-        }).handle_exception_type([this](const ss::gate_closed_exception&) {
-            vlog(
-              _ptr->_ctxlog.debug,
-              "Gate closed while flushing replicate requests");
-        });
+    _flush_timer.set_callback([this] { dispatch_background_flush(); });
 }

+void replicate_batcher::dispatch_background_flush() {
+    (void)ss::with_gate(_ptr->_bg, [this] {
+        // background block further caching too
+        return _lock.with([this] { return flush(); });
+    }).handle_exception_type([this](const ss::gate_closed_exception&) {
+        vlog(
+          _ptr->_ctxlog.debug, "Gate closed while flushing replicate requests");
+    });
+}
+
@@ -46,8 +51,14 @@ replicate_batcher::replicate(model::record_batch_reader&& r) {
     .with(
       [this, r = std::move(r)]() mutable { return do_cache(std::move(r)); })
     .then([this](item_ptr i) {
-        if (_pending_bytes < _max_batch_size || !_flush_timer.armed()) {
-            _flush_timer.rearm(clock_type::now() + 4ms);
+        if (_pending_bytes >= _max_batch_size) {
+            _flush_timer.cancel();
+            dispatch_background_flush();
+            return i->_promise.get_future();
+        }
+
+        if (!_flush_timer.armed()) {
+            _flush_timer.rearm(clock_type::now() + _debounce_duration);
         }
         return i->_promise.get_future();
     });
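The behavioral change in `replicate()` boils down to: flush immediately once pending bytes reach the cap, otherwise arm the debounce timer only if it is not already running (previously the timer was rearmed on every cached request with a hard-coded 4 ms). A Seastar-free sketch of that policy, where `flush_now` and `arm_timer` are hypothetical stand-ins for the gate-guarded background flush and `_flush_timer`:

```cpp
#include <chrono>
#include <cstddef>
#include <functional>

class debounced_flusher {
public:
    debounced_flusher(std::chrono::milliseconds debounce, size_t max_bytes)
      : _debounce(debounce)
      , _max_bytes(max_bytes) {}

    void on_cached(
      size_t bytes,
      const std::function<void()>& flush_now,
      const std::function<void(std::chrono::milliseconds)>& arm_timer) {
        _pending_bytes += bytes;
        if (_pending_bytes >= _max_bytes) {
            // Cap reached: skip the debounce window and flush right away.
            _pending_bytes = 0;
            _armed = false; // the real code cancels the pending timer
            flush_now();
            return;
        }
        if (!_armed) {
            // First request of a batch opens the debounce window once;
            // later requests ride along instead of pushing the deadline out.
            _armed = true;
            arm_timer(_debounce);
        }
    }

private:
    std::chrono::milliseconds _debounce;
    size_t _max_bytes;
    size_t _pending_bytes{0};
    bool _armed{false};
};
```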
9 changes: 7 additions & 2 deletions src/v/raft/replicate_batcher.h
@@ -14,6 +14,7 @@
 #include "model/record_batch_reader.h"
 #include "outcome.h"
 #include "raft/types.h"
+#include "units.h"
 #include "utils/mutex.h"

 #include <absl/container/flat_hash_map.h>
@@ -29,10 +30,12 @@ class replicate_batcher {
     };
     using item_ptr = ss::lw_shared_ptr<item>;
     // 1MB default size
-    static constexpr size_t default_batch_bytes = 1024 * 1024;
+    static constexpr size_t default_batch_bytes = 1_MiB;

     explicit replicate_batcher(
-      consensus* ptr, size_t cache_size = default_batch_bytes);
+      consensus* ptr,
+      std::chrono::milliseconds debounce_duration,
+      size_t cache_size = default_batch_bytes);

     replicate_batcher(replicate_batcher&&) noexcept = default;
     replicate_batcher& operator=(replicate_batcher&&) noexcept = delete;
@@ -55,8 +58,10 @@

 private:
     ss::future<item_ptr> do_cache(model::record_batch_reader&&);
+    void dispatch_background_flush();

     consensus* _ptr;
+    std::chrono::milliseconds _debounce_duration;
     size_t _max_batch_size{default_batch_bytes};
     size_t _pending_bytes{0};
     timer_type _flush_timer;
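The header change makes the debounce duration a required constructor argument while the cache size keeps its 1 MiB default (now written as `1_MiB` via `units.h`). A self-contained sketch of the changed contract (`replicate_batcher_like` is a stand-in, not the real class):

```cpp
#include <chrono>
#include <cstddef>

// Stand-in mirroring the new constructor shape: debounce_duration is a
// required second parameter; cache_size keeps its 1 MiB default.
struct replicate_batcher_like {
    static constexpr size_t default_batch_bytes = 1024 * 1024; // 1_MiB
    explicit replicate_batcher_like(
      std::chrono::milliseconds debounce_duration,
      size_t cache_size = default_batch_bytes)
      : debounce(debounce_duration)
      , cache(cache_size) {}

    std::chrono::milliseconds debounce;
    size_t cache;
};

int main() {
    using namespace std::chrono_literals;
    replicate_batcher_like b(4ms); // matches the new config default
    return b.cache == 1024 * 1024 ? 0 : 1;
}
```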
