Skip to content

Commit

Permalink
commitlog: Add a(n empty) message loop
Browse files Browse the repository at this point in the history
Does not do anything, but is the basis for eventual moving of
segment related tasks into it.
  • Loading branch information
Calle Wilund authored and elcallio committed Jul 6, 2022
1 parent 4d71e1c commit 3c5343c
Showing 1 changed file with 117 additions and 0 deletions.
117 changes: 117 additions & 0 deletions db/commitlog/commitlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
#include <unordered_set>
#include <exception>
#include <filesystem>
#include <ranges>

#include <seastar/core/align.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/file.hh>
#include <seastar/core/rwlock.hh>
Expand All @@ -46,8 +48,11 @@
#include "db/extensions.hh"
#include "utils/data_input.hh"
#include "utils/crc.hh"
#include "utils/chunked_vector.hh"
#include "utils/runtime.hh"
#include "utils/flush_queue.hh"
#include "utils/overloaded_functor.hh"
#include "utils/waitable_counter.hh"
#include "log.hh"
#include "commitlog_entry.hh"
#include "commitlog_extensions.hh"
Expand Down Expand Up @@ -492,12 +497,82 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_

void flush_segments(uint64_t size_to_remove);

using message_clock = clock_type;

template<typename T>
class message_queue {
seastar::condition_variable _signal;
seastar::circular_buffer<T> _buffer;
bool _closed = false;
bool _signaled = false;
bool should_wakeup() noexcept {
return std::exchange(_signaled, false) || !empty() || _closed;
}
public:
void push(T&& t) {
_buffer.push_back(std::move(t));
if (_signal.has_waiters()) {
_signal.signal();
}
}
auto wait() noexcept {
return _signal.when(std::bind(&message_queue::should_wakeup, this));
}
auto wait(message_clock::time_point until) noexcept {
return _signal.when(until, std::bind(&message_queue::should_wakeup, this));
}
T& front() noexcept {
return _buffer.front();
}
T pop() noexcept {
T t = std::move(_buffer.front());
_buffer.pop_front();
return t;
}
bool empty() const noexcept {
return _buffer.empty();
}
size_t size() const noexcept {
return _buffer.size();
}
void clear() noexcept {
_buffer.clear();
}
void close() {
_closed = true;
_signal.broadcast();
}
void signal() noexcept {
_signaled = true;
_signal.signal();
}
bool closed() const {
return _closed;
}
};

struct shutdown_request {};

using message = std::variant<
shutdown_request
// to come...
>;

static inline constexpr auto immediately = message_clock::time_point::min();
static inline constexpr auto no_timeout = message_clock::time_point::max();

void send(message);
void wakeup();


private:
class shutdown_marker{};

future<> clear_reserve_segments();
void abort_recycled_list(std::exception_ptr);

future<> message_loop();

size_t max_request_controller_units() const;
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
Expand All @@ -513,6 +588,8 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_
seastar::gate _gate;
uint64_t _new_counter = 0;
std::optional<size_t> _disk_write_alignment;
message_queue<message> _message_queue;
future<> _message_loop;
};

future<> db::commitlog::segment_manager::named_file::open(open_flags flags, file_open_options opt, std::optional<uint64_t> size_in) noexcept {
Expand Down Expand Up @@ -1390,6 +1467,7 @@ db::commitlog::segment_manager::segment_manager(config c)
, _recycled_segments(std::numeric_limits<size_t>::max())
, _reserve_replenisher(make_ready_future<>())
, _background_sync(make_ready_future<>())
, _message_loop(make_ready_future<>())
{
assert(max_size > 0);
assert(max_mutation_size < segment::multi_entry_size_magic);
Expand All @@ -1403,6 +1481,40 @@ db::commitlog::segment_manager::segment_manager(config c)
}
}

void db::commitlog::segment_manager::send(message m) {
_message_queue.push(std::move(m));
}

void db::commitlog::segment_manager::wakeup() {
if (_message_queue.empty()) {
_message_queue.signal();
}
}

future<> db::commitlog::segment_manager::message_loop() {
bool shutdown = false;

while (!shutdown || !_message_queue.empty()) {
auto next_wakeup = no_timeout;
auto now = message_clock::now();

// handling timeout exception with a "handle_exception" and just assuming
// it is this error, and not an actual one, since we don't sent exceptions
// through the cond/queue.
// handling it in a handle_exception handler prevents actual throw from
// occuring, which prevents unwarranted exception stalls here every N ms
co_await _message_queue.wait(next_wakeup).handle_exception([](auto ignored) {});

while (!_message_queue.empty()) {
auto m = _message_queue.pop();

std::visit(overloaded_functor {
[&](const shutdown_request&) { shutdown = true; },
}, m);
}
}
}

size_t db::commitlog::segment_manager::max_request_controller_units() const {
return max_mutation_size + db::commitlog::segment::default_size;
}
Expand Down Expand Up @@ -1480,6 +1592,7 @@ future<> db::commitlog::segment_manager::init() {
_ids = replay_position(this_shard_id(), id).id;
// always run the timer now, since we need to handle segment pre-alloc etc as well.
_timer.set_callback(std::bind(&segment_manager::on_timer, this));
_message_loop = message_loop();
auto delay = this_shard_id() * std::ceil(double(cfg.commitlog_sync_period_in_ms) / smp::count);
clogger.trace("Delaying timer loop {} ms", delay);
// We need to wait until we have scanned all other segments to actually start serving new
Expand Down Expand Up @@ -2022,6 +2135,10 @@ future<> db::commitlog::segment_manager::shutdown() {
} catch (...) {
p = std::current_exception();
}

send(shutdown_request{});
co_await std::move(_message_loop);

// slight functional change from non-coroutine version: we propagate all/any
// exceptions, not just the replenish one.
if (p) {
Expand Down

0 comments on commit 3c5343c

Please sign in to comment.