Skip to content

Commit

Permalink
Merge pull request #14981 from vbotbuildovich/backport-pr-14929-v23.2…
Browse files Browse the repository at this point in the history
….x-287

[v23.2.x] Fixed handling Raft snapshot
  • Loading branch information
piyushredpanda committed Nov 18, 2023
2 parents 06c16e3 + bc7a296 commit 8ce99c2
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2255,6 +2255,7 @@ ss::future<install_snapshot_reply> consensus::finish_snapshot(
}

ss::future<> consensus::write_snapshot(write_snapshot_cfg cfg) {
auto holder = _bg.hold();
model::offset last_included_index = cfg.last_included_index;
bool updated = co_await _snapshot_lock
.with([this, cfg = std::move(cfg)]() mutable {
Expand Down
28 changes: 22 additions & 6 deletions src/v/raft/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@

#include "raft/state_machine.h"

#include "likely.h"
#include "model/fundamental.h"
#include "model/record_batch_reader.h"
#include "raft/consensus.h"
#include "ssx/future-util.h"
#include "storage/log.h"
#include "storage/record_batch_builder.h"

#include <exception>

namespace raft {

state_machine::state_machine(
Expand Down Expand Up @@ -124,17 +128,29 @@ ss::future<result<replicate_result>> state_machine::quorum_write_empty_batch(
});
});
}

ss::future<> state_machine::maybe_apply_raft_snapshot() {
// a loop here is required as install snapshot request may be processed by
// Raft while handling the other snapshot. In this case a new snapshot
// should be applied to the STM.

while (_next < _raft->start_offset()) {
try {
co_await handle_raft_snapshot();
} catch (...) {
const auto& e = std::current_exception();
if (!ssx::is_shutdown_exception(e)) {
vlog(_log.error, "Error applying Raft snapshot - {}", e);
}
std::rethrow_exception(e);
}
}
}
ss::future<> state_machine::apply() {
// wait until consensus commit index is >= _next
return _raft->events()
.wait(_next, model::no_timeout, _as)
.then([this] {
auto f = ss::now();
if (_next < _raft->start_offset()) {
f = handle_raft_snapshot();
}
return f.then([this] {
return maybe_apply_raft_snapshot().then([this] {
/**
* Raft make_reader method allows callers reading up to
* last_visible index. In order to make the STMs safe and working
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class state_machine {
friend batch_applicator;

ss::future<> apply();
ss::future<> maybe_apply_raft_snapshot();
bool stop_batch_applicator();

ss::io_priority_class _io_prio;
Expand Down

0 comments on commit 8ce99c2

Please sign in to comment.