Skip to content

Commit

Permalink
Merge pull request #15333 from vbotbuildovich/backport-pr-15294-v23.2…
Browse files Browse the repository at this point in the history
….x-315

[v23.2.x] Made leader_id and term changes atomic
  • Loading branch information
piyushredpanda committed Dec 14, 2023
2 parents 5c0a87b + 8c8cce4 commit b4de333
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
25 changes: 15 additions & 10 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1830,24 +1830,19 @@ consensus::do_append_entries(append_entries_request&& r) {
if (request_metadata.term > _term) {
vlog(
_ctxlog.debug,
"Append entries request term:{} is greater than current: {}. "
"Setting "
"new term",
"Append entries request term: {} is greater than current: {}. "
"Setting new term",
request_metadata.term,
_term);
_term = request_metadata.term;
_voted_for = {};
maybe_update_leader(r.source_node());

return do_append_entries(std::move(r));
}

// raft.pdf:If AppendEntries RPC received from new leader: convert to
// follower (§5.2)
_vstate = vote_state::follower;
if (unlikely(_leader_id != r.source_node())) {
_leader_id = r.source_node();
_follower_reply.broadcast();
trigger_leadership_notification();
}
maybe_update_leader(r.source_node());

// raft.pdf: Reply false if log doesn’t contain an entry at
// prevLogIndex whose term matches prevLogTerm (§5.3)
Expand Down Expand Up @@ -2030,6 +2025,14 @@ consensus::do_append_entries(append_entries_request&& r) {
});
}

void consensus::maybe_update_leader(vnode request_node) {
if (unlikely(_leader_id != request_node)) {
_leader_id = request_node;
_follower_reply.broadcast();
trigger_leadership_notification();
}
}

ss::future<install_snapshot_reply>
consensus::install_snapshot(install_snapshot_request&& r) {
return with_gate(_bg, [this, r = std::move(r)]() mutable {
Expand Down Expand Up @@ -2179,8 +2182,10 @@ consensus::do_install_snapshot(install_snapshot_request r) {
_term = r.term;
_voted_for = {};
do_step_down("install_snapshot_term_greater");
maybe_update_leader(r.source_node());
co_return co_await do_install_snapshot(std::move(r));
}
maybe_update_leader(r.source_node());

// Create new snapshot file if first chunk (offset is 0) (§7.2)
if (r.file_offset == 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/raft/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ class consensus {
*/
ss::future<> maybe_flush_log(size_t threshold_bytes);

inline void maybe_update_leader(vnode request_node);

private:
friend replicate_entries_stm;
friend vote_stm;
Expand Down

0 comments on commit b4de333

Please sign in to comment.