Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce heartbeat message #313

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,41 +545,42 @@ impl<T: Storage> Raft<T> {
is_batched
}

/// Sends RPC, with entries to the given peer.
pub fn send_append(&mut self, to: u64, pr: &mut Progress) {
/// Sends RPC, with entries to the given peer. Returns true if a message was sent.
pub fn send_append(&mut self, to: u64, pr: &mut Progress) -> bool {
if pr.is_paused() {
trace!(
self.logger,
"Skipping sending to {to}, it's paused",
to = to;
"progress" => ?pr,
);
return;
return false;
}
let mut m = Message::default();
m.to = to;
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
return false;
}
} else {
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
return false;
}
} else {
let mut ents = ents.unwrap();
if self.batch_append && self.try_batching(to, pr, &mut ents) {
return;
return true;
}
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents);
}
}
self.send(m);
true
}

// send_heartbeat sends an empty MsgAppend
Expand All @@ -606,10 +607,21 @@ impl<T: Storage> Raft<T> {
pub fn bcast_append(&mut self) {
let self_id = self.id;
let mut prs = self.take_prs();
let mut bcast_msg = true;
prs.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| self.send_append(*id, pr));
.for_each(|(id, pr)| {
if !self.send_append(*id, pr) && bcast_msg {
bcast_msg = false;
}
});
self.set_prs(prs);
// When there is no pending Readindex, MsgAppend will do the
// work of MsgHeartbeat, so we can reset heartbeat_elapsed
// here to reduce MsgHeartbeat
if bcast_msg && self.read_only.last_pending_request_ctx().is_none() {
self.heartbeat_elapsed = 0;
}
}

/// Broadcasts heartbeats to all the followers if it's leader.
Expand Down