diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 88915a9f0005..607767c66d06 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -19,11 +19,13 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -46,8 +48,11 @@ #include "db/extensions.hh" #include "utils/data_input.hh" #include "utils/crc.hh" +#include "utils/chunked_vector.hh" #include "utils/runtime.hh" #include "utils/flush_queue.hh" +#include "utils/overloaded_functor.hh" +#include "utils/waitable_counter.hh" #include "log.hh" #include "commitlog_entry.hh" #include "commitlog_extensions.hh" @@ -492,12 +497,82 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this + class message_queue { + seastar::condition_variable _signal; + seastar::circular_buffer _buffer; + bool _closed = false; + bool _signaled = false; + bool should_wakeup() noexcept { + return std::exchange(_signaled, false) || !empty() || _closed; + } + public: + void push(T&& t) { + _buffer.push_back(std::move(t)); + if (_signal.has_waiters()) { + _signal.signal(); + } + } + auto wait() noexcept { + return _signal.when(std::bind(&message_queue::should_wakeup, this)); + } + auto wait(message_clock::time_point until) noexcept { + return _signal.when(until, std::bind(&message_queue::should_wakeup, this)); + } + T& front() noexcept { + return _buffer.front(); + } + T pop() noexcept { + T t = std::move(_buffer.front()); + _buffer.pop_front(); + return t; + } + bool empty() const noexcept { + return _buffer.empty(); + } + size_t size() const noexcept { + return _buffer.size(); + } + void clear() noexcept { + _buffer.clear(); + } + void close() { + _closed = true; + _signal.broadcast(); + } + void signal() noexcept { + _signaled = true; + _signal.signal(); + } + bool closed() const { + return _closed; + } + }; + + struct shutdown_request {}; + + using message = std::variant< + shutdown_request + // to come... + >; + + static inline constexpr auto immediately = message_clock::time_point::min(); + static inline constexpr auto no_timeout = message_clock::time_point::max(); + + void send(message); + void wakeup(); + + private: class shutdown_marker{}; future<> clear_reserve_segments(); void abort_recycled_list(std::exception_ptr); + future<> message_loop(); + size_t max_request_controller_units() const; segment_id_type _ids = 0; std::vector _segments; @@ -513,6 +588,8 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this _disk_write_alignment; + message_queue _message_queue; + future<> _message_loop; }; future<> db::commitlog::segment_manager::named_file::open(open_flags flags, file_open_options opt, std::optional size_in) noexcept { @@ -1390,6 +1467,7 @@ db::commitlog::segment_manager::segment_manager(config c) , _recycled_segments(std::numeric_limits::max()) , _reserve_replenisher(make_ready_future<>()) , _background_sync(make_ready_future<>()) + , _message_loop(make_ready_future<>()) { assert(max_size > 0); assert(max_mutation_size < segment::multi_entry_size_magic); @@ -1403,6 +1481,40 @@ db::commitlog::segment_manager::segment_manager(config c) } } +void db::commitlog::segment_manager::send(message m) { + _message_queue.push(std::move(m)); +} + +void db::commitlog::segment_manager::wakeup() { + if (_message_queue.empty()) { + _message_queue.signal(); + } +} + +future<> db::commitlog::segment_manager::message_loop() { + bool shutdown = false; + + while (!shutdown || !_message_queue.empty()) { + auto next_wakeup = no_timeout; + auto now = message_clock::now(); + + // handling timeout exception with a "handle_exception" and just assuming + // it is this error, and not an actual one, since we don't sent exceptions + // through the cond/queue. + // handling it in a handle_exception handler prevents actual throw from + // occuring, which prevents unwarranted exception stalls here every N ms + co_await _message_queue.wait(next_wakeup).handle_exception([](auto ignored) {}); + + while (!_message_queue.empty()) { + auto m = _message_queue.pop(); + + std::visit(overloaded_functor { + [&](const shutdown_request&) { shutdown = true; }, + }, m); + } + } +} + size_t db::commitlog::segment_manager::max_request_controller_units() const { return max_mutation_size + db::commitlog::segment::default_size; } @@ -1480,6 +1592,7 @@ future<> db::commitlog::segment_manager::init() { _ids = replay_position(this_shard_id(), id).id; // always run the timer now, since we need to handle segment pre-alloc etc as well. _timer.set_callback(std::bind(&segment_manager::on_timer, this)); + _message_loop = message_loop(); auto delay = this_shard_id() * std::ceil(double(cfg.commitlog_sync_period_in_ms) / smp::count); clogger.trace("Delaying timer loop {} ms", delay); // We need to wait until we have scanned all other segments to actually start serving new @@ -2022,6 +2135,10 @@ future<> db::commitlog::segment_manager::shutdown() { } catch (...) { p = std::current_exception(); } + + send(shutdown_request{}); + co_await std::move(_message_loop); + // slight functional change from non-coroutine version: we propagate all/any // exceptions, not just the replenish one. if (p) {