diff --git a/src/raft.rs b/src/raft.rs index 501a185fa..7b73d8a56 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -128,6 +128,9 @@ pub struct Raft { /// The list of messages. pub msgs: Vec, + /// The list of heartbeat messages. + pub heartbeats: Vec, + /// The leader id pub leader_id: u64, @@ -246,6 +249,7 @@ impl Raft { election_timeout: c.election_tick, votes: Default::default(), msgs: Default::default(), + heartbeats: Default::default(), leader_id: Default::default(), lead_transferee: None, term: Default::default(), @@ -710,10 +714,12 @@ impl Raft { m.set_msg_type(MessageType::MsgHeartbeat); let commit = cmp::min(pr.matched, self.raft_log.committed); m.commit = commit; + m.from = self.id; + m.term = self.term; if let Some(context) = ctx { m.context = context; } - self.send(m); + self.heartbeats.push(m); } /// Sends RPC, with entries to all peers that are not up-to-date @@ -2005,7 +2011,16 @@ impl Raft { to_send.to = m.from; to_send.context = m.take_context(); to_send.commit = self.raft_log.committed; - self.send(to_send); + if self.raft_log.committed > self.raft_log.store.last_index().unwrap() + { + // If there is some entries that has committed in memory but not persisted, the message + // shall not be sent until all entries before committed_index have been persisted. + self.send(to_send); + } else { + to_send.term = self.term; + to_send.from = self.id; + self.heartbeats.push(to_send); + } } fn handle_snapshot(&mut self, mut m: Message) {