Skip to content

Commit

Permalink
Merge pull request #16573 from vbotbuildovich/backport-pr-16560-v23.3…
Browse files Browse the repository at this point in the history
….x-242

[v23.3.x] Use confirmed term as a source of leader epoch
  • Loading branch information
mmaslankaprv committed Feb 20, 2024
2 parents 361ed48 + 025458e commit 78642d6
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
13 changes: 11 additions & 2 deletions src/v/kafka/server/replicated_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,18 @@ class replicated_partition final : public kafka::partition_proxy::impl {

ss::future<std::optional<model::offset>>
get_leader_epoch_last_offset(kafka::leader_epoch) const final;

/**
 * Returns the leader epoch reported to Kafka clients.
 *
 * Kafka clients use the leader epoch to determine whether a replica is up
 * to date with the leader and to detect truncation.
 *
 * The leader epoch is deliberately not derived from the plain Raft term:
 * the term is incremented every time a leader election starts, whereas the
 * leader epoch must only change after the state of the replica has been
 * determined. It is therefore derived from the confirmed term.
 */
kafka::leader_epoch leader_epoch() const final {
    // Use the confirmed term, not _partition->term(); an earlier return of
    // the plain term here made this statement unreachable.
    return leader_epoch_from_term(_partition->raft()->confirmed_term());
}

ss::future<error_code> validate_fetch_offset(
Expand Down
3 changes: 2 additions & 1 deletion src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,7 @@ consensus::do_append_entries(append_entries_request&& r) {
maybe_update_last_visible_index(last_visible);
_last_leader_visible_offset = std::max(
request_metadata.last_visible_index, _last_leader_visible_offset);

_confirmed_term = _term;
if (_follower_recovery_state) {
vlog(
_ctxlog.debug,
Expand Down Expand Up @@ -2095,6 +2095,7 @@ consensus::do_append_entries(append_entries_request&& r) {
maybe_update_last_visible_index(last_visible);
_last_leader_visible_offset = std::max(
m.last_visible_index, _last_leader_visible_offset);
_confirmed_term = _term;
return maybe_update_follower_commit_idx(model::offset(m.commit_index))
.then([this, m, ofs, target] {
if (_follower_recovery_state) {
Expand Down
26 changes: 15 additions & 11 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -782,17 +782,21 @@ class consensus {
// consensus state
model::offset _commit_index;
model::term_id _term;
// It's common to use raft log as a foundation for state machines:
// when a node becomes a leader it replays the log, reconstructs
// the state and becomes ready to serve the requests. However it is
// not enough for a node to become a leader, it should successfully
// replicate a new record to be sure that older records stored in
// the local log were actually replicated and do not constitute an
// artifact of the previously crashed leader. Redpanda uses a confi-
// guration batch for the initial replication to gain certainty. When
// commit index moves past the configuration batch _confirmed_term
// gets updated. So when _term==_confirmed_term it's safe to use
// local log to reconstruct the state.

/**
* A confirmed term is used to determine if the state of a replica is up to
* date after the leader election. Only once the confirmed term is equal to
* the current term can one reason about the Raft group state.
*
* On the leader the confirmed term is updated after the first successful
* replication of a batch subsequent to a leader election. After that
* replication succeeds, the leader is guaranteed to have up-to-date
* committed and visible offsets.
*
* On the follower the confirmed term is updated only when an append entries
* request from the current leader may be accepted and the follower may
* return success.
*/
model::term_id _confirmed_term;
model::offset _flushed_offset{};

Expand Down

0 comments on commit 78642d6

Please sign in to comment.