From cdfcb55873d143f94494ba0ad09c8e854f17552d Mon Sep 17 00:00:00 2001 From: LintianShi Date: Tue, 26 Jul 2022 22:05:50 +0800 Subject: [PATCH 01/21] Add new message type MsgGroupBroadcast and corresponding handler Signed-off-by: LintianShi --- proto/proto/eraftpb.proto | 13 ++++++ src/config.rs | 5 ++ src/raft.rs | 97 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 108 insertions(+), 7 deletions(-) diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index 1f7c71b2..c1f275d1 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -46,6 +46,17 @@ message Snapshot { SnapshotMetadata metadata = 2; } +// Forward is a type that tells the agent how to forward the MsgGroupBroadcast from the leader. +// +// Field to is the destination of forwarding. +// Field log_term and index is the previous entry of log entries that should be forwarded. +// Entries to be forwarded is the range (index, last_index]. +message Forward { + uint64 to = 1; + uint64 log_term = 2; + uint64 index = 3; +} + enum MessageType { MsgHup = 0; MsgBeat = 1; @@ -66,6 +77,7 @@ enum MessageType { MsgReadIndexResp = 16; MsgRequestPreVote = 17; MsgRequestPreVoteResponse = 18; + MsgGroupBroadcast = 19; } message Message { @@ -89,6 +101,7 @@ message Message { uint64 reject_hint = 11; bytes context = 12; uint64 priority = 14; + repeated Forward forwards = 16; } message HardState { diff --git a/src/config.rs b/src/config.rs index 392540db..3452d0d4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -67,6 +67,10 @@ pub struct Config { /// rejoins the cluster. pub pre_vote: bool, + /// Enables follower replication. + /// This reduces the across-AZ traffic of cloud deployment. + pub follower_replication: bool, + /// The range of election timeout. In some cases, we hope some nodes has less possibility /// to become leader. This configuration ensures that the randomized election_timeout /// will always be suit in [min_election_tick, max_election_tick). @@ -112,6 +116,7 @@ impl Default for Config { max_inflight_msgs: 256, check_quorum: false, pre_vote: false, + follower_replication: false, min_election_tick: 0, max_election_tick: 0, read_only_option: ReadOnlyOption::Safe, diff --git a/src/raft.rs b/src/raft.rs index fe397ed1..a1f841ad 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -236,6 +236,11 @@ pub struct RaftCore { /// Enable this if greater cluster stability is preferred over faster elections. pub pre_vote: bool, + // Enable follower replication. + // This enables data replication from a follower to other servers in the same available zone. + // Enable this for reducing across-AZ traffic of cloud deployment. + follower_replication: bool, + skip_bcast_commit: bool, batch_append: bool, @@ -337,6 +342,7 @@ impl Raft { promotable: false, check_quorum: c.check_quorum, pre_vote: c.pre_vote, + follower_replication: c.follower_replication, read_only: ReadOnly::new(c.read_only_option), heartbeat_timeout: c.heartbeat_tick, election_timeout: c.election_tick, @@ -455,6 +461,11 @@ impl Raft { hs } + /// Whether enable follower replication. + pub fn follower_replication(&self) -> bool { + self.follower_replication + } + /// Returns whether the current raft is in lease. 
pub fn in_lease(&self) -> bool { self.state == StateRole::Leader && self.check_quorum @@ -1372,6 +1383,7 @@ impl Raft { if m.get_msg_type() == MessageType::MsgAppend || m.get_msg_type() == MessageType::MsgHeartbeat || m.get_msg_type() == MessageType::MsgSnapshot + || m.get_msg_type() == MessageType::MsgGroupBroadcast { self.become_follower(m.term, m.from); } else { @@ -1381,7 +1393,8 @@ impl Raft { } else if m.term < self.term { if (self.check_quorum || self.pre_vote) && (m.get_msg_type() == MessageType::MsgHeartbeat - || m.get_msg_type() == MessageType::MsgAppend) + || m.get_msg_type() == MessageType::MsgAppend + || m.get_msg_type() == MessageType::MsgGroupBroadcast) { // We have received messages from a leader at a lower term. It is possible // that these messages were simply delayed in the network, but this could @@ -2314,6 +2327,11 @@ impl Raft { self.leader_id = m.from; self.handle_append_entries(&m); } + MessageType::MsgGroupBroadcast => { + self.election_elapsed = 0; + self.leader_id = m.from; + self.handle_group_broadcast(&m); + } MessageType::MsgHeartbeat => { self.election_elapsed = 0; self.leader_id = m.from; @@ -2425,13 +2443,14 @@ impl Raft { Err(Error::RequestSnapshotDropped) } - // TODO: revoke pub when there is a better way to test. - /// For a given message, append the entries to the log. - pub fn handle_append_entries(&mut self, m: &Message) { + /// Try to append entries, and return the append result. + /// Return true only if the entries in the message has been appended in the log successfully. + pub fn try_append_entries(&mut self, m: &Message) -> bool { if self.pending_request_snapshot != INVALID_INDEX { self.send_request_snapshot(); - return; + return false; } + if m.index < self.raft_log.committed { debug!( self.logger, @@ -2443,13 +2462,14 @@ impl Raft { to_send.index = self.raft_log.committed; to_send.commit = self.raft_log.committed; self.r.send(to_send, &mut self.msgs); - return; + return false; } let mut to_send = Message::default(); to_send.to = m.from; to_send.set_msg_type(MessageType::MsgAppendResponse); + let mut success = true; if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) @@ -2458,7 +2478,7 @@ impl Raft { } else { debug!( self.logger, - "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \ + "reject append [logterm: {msg_log_term}, index: {msg_index}] \ from {from}", msg_log_term = m.log_term, msg_index = m.index, @@ -2483,9 +2503,72 @@ impl Raft { to_send.reject = true; to_send.reject_hint = hint_index; to_send.log_term = hint_term.unwrap(); + success = false; } to_send.set_commit(self.raft_log.committed); self.r.send(to_send, &mut self.msgs); + success + } + + // TODO: revoke pub when there is a better way to test. + /// For a given message, append the entries to the log. + pub fn handle_append_entries(&mut self, m: &Message) { + self.try_append_entries(m); + } + + /// For a broadcast, append entries to local log and forward MsgAppend to other dest. + pub fn handle_group_broadcast(&mut self, m: &Message) { + if self.try_append_entries(m) { + // If the agent fails to append entries from the leader, + // the agent cannot forward MsgAppend. + for forward in m.get_forwards() { + // Fetch log entries from the forward.index to the last index of log. 
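+                // Entries to be forwarded cover the range (forward.index, last_index]
+                // (capped by max_msg_size), as documented on the Forward message.
+                // match_term below checks that the agent's entry at forward.index
+                // carries forward.log_term before anything is sent on the leader's behalf.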
+ if self + .raft_log + .match_term(forward.get_index(), forward.get_log_term()) + { + let ents = self.raft_log.entries( + forward.get_index() + 1, + self.max_msg_size, + GetEntriesContext(GetEntriesFor::SendAppend { + to: forward.get_to(), + term: m.term, + aggressively: false, + }), + ); + + let mut m_append = Message::default(); + m_append.to = forward.get_to(); + m_append.from = m.get_from(); + m_append.set_msg_type(MessageType::MsgAppend); + m_append.index = forward.get_index(); + m_append.log_term = forward.get_log_term(); + m_append.set_entries(ents.unwrap().into()); + m_append.commit = m.get_commit(); + m_append.commit_term = m.get_commit_term(); + self.r.send(m_append, &mut self.msgs) + } else { + warn!( + self.logger, + "The agent's log does not match with index {} log term {} in forward message to peer {}.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } + } + } else { + debug!( + self.logger, + "the agent rejects append [logterm: {msg_log_term}, index: {msg_index}] \ + from {from}", + msg_log_term = m.log_term, + msg_index = m.index, + from = m.from; + "index" => m.index, + "logterm" => ?self.raft_log.term(m.index), + ); + } } // TODO: revoke pub when there is a better way to test. From 40b6c967cbf17c34f2b2ef79d4c8eafd9f737652 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Wed, 31 Aug 2022 11:40:31 +0800 Subject: [PATCH 02/21] Add MsgGroupBroadcastResponse and corresponding handler Signed-off-by: LintianShi --- proto/proto/eraftpb.proto | 1 + src/raft.rs | 57 +++++++++++++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index c1f275d1..af5174e1 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -78,6 +78,7 @@ enum MessageType { MsgRequestPreVote = 17; MsgRequestPreVoteResponse = 18; MsgGroupBroadcast = 19; + MsgGroupBroadcastResponse = 20; } message Message { diff --git a/src/raft.rs b/src/raft.rs index a1f841ad..8fbd4041 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1823,6 +1823,20 @@ impl Raft { } } + fn handle_group_broadcast_response(&mut self, m: &Message) { + if m.reject { + // The agent failed to forward MsgAppend, so the leader re-sends it. + for forward in m.get_forwards() { + info!( + self.logger, + "The agent's index is {} while target peer's index is {}", + m.get_index(), + forward.get_index(); + ); + } + } + } + fn handle_heartbeat_response(&mut self, m: &Message) { // Update the node. Drop the value explicitly since we'll check the qourum after. 
let pr = match self.prs.get_mut(m.from) { @@ -2145,6 +2159,9 @@ impl Raft { MessageType::MsgAppendResponse => { self.handle_append_response(&m); } + MessageType::MsgGroupBroadcastResponse => { + self.handle_group_broadcast_response(&m); + } MessageType::MsgHeartbeatResponse => { self.handle_heartbeat_response(&m); } @@ -2271,6 +2288,11 @@ impl Raft { self.become_follower(m.term, m.from); self.handle_append_entries(&m); } + MessageType::MsgGroupBroadcast => { + debug_assert_eq!(self.term, m.term); + self.become_follower(m.term, m.from); + self.handle_group_broadcast(&m); + } MessageType::MsgHeartbeat => { debug_assert_eq!(self.term, m.term); self.become_follower(m.term, m.from); @@ -2537,16 +2559,29 @@ impl Raft { }), ); - let mut m_append = Message::default(); - m_append.to = forward.get_to(); - m_append.from = m.get_from(); - m_append.set_msg_type(MessageType::MsgAppend); - m_append.index = forward.get_index(); - m_append.log_term = forward.get_log_term(); - m_append.set_entries(ents.unwrap().into()); - m_append.commit = m.get_commit(); - m_append.commit_term = m.get_commit_term(); - self.r.send(m_append, &mut self.msgs) + match ents { + Ok(ents) => { + let mut m_append = Message::default(); + m_append.to = forward.get_to(); + m_append.from = m.get_from(); + m_append.set_msg_type(MessageType::MsgAppend); + m_append.index = forward.get_index(); + m_append.log_term = forward.get_log_term(); + m_append.set_entries(ents.into()); + m_append.commit = m.get_commit(); + m_append.commit_term = m.get_commit_term(); + self.r.send(m_append, &mut self.msgs); + } + Err(_) => { + warn!( + self.logger, + "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } + } } else { warn!( self.logger, @@ -2558,7 +2593,7 @@ impl Raft { } } } else { - debug!( + info!( self.logger, "the agent rejects append [logterm: {msg_log_term}, index: {msg_index}] \ from {from}", From 05059d19f5ad9b063b75c80e2dea943dcaeb5ea2 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Thu, 1 Sep 2022 13:56:48 +0800 Subject: [PATCH 03/21] Send MsgAppend with empty entries when agent cannot forward Signed-off-by: LintianShi --- src/raft.rs | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 8fbd4041..96b7e709 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -18,8 +18,8 @@ use std::cmp; use std::ops::{Deref, DerefMut}; use crate::eraftpb::{ - ConfChange, ConfChangeV2, ConfState, Entry, EntryType, HardState, Message, MessageType, - Snapshot, + ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Forward, HardState, Message, + MessageType, Snapshot, }; use protobuf::Message as _; use raft_proto::ConfChangeI; @@ -2573,6 +2573,7 @@ impl Raft { self.r.send(m_append, &mut self.msgs); } Err(_) => { + self.dummy_forward(m, forward); warn!( self.logger, "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", @@ -2583,6 +2584,7 @@ impl Raft { } } } else { + self.dummy_forward(m, forward); warn!( self.logger, "The agent's log does not match with index {} log term {} in forward message to peer {}.", @@ -2593,6 +2595,9 @@ impl Raft { } } } else { + for forward in m.get_forwards() { + self.dummy_forward(m, forward); + } info!( self.logger, "the agent rejects append [logterm: {msg_log_term}, index: {msg_index}] \ @@ -2935,6 +2940,29 @@ impl Raft { self.lead_transferee = None; } + // Forward MsgAppend with empty entries in order to update 
commit + // or trigger decrementing next_idx. + fn dummy_forward(&mut self, m: &Message, forward: &Forward) { + let mut m_append = Message::default(); + m_append.to = forward.get_to(); + m_append.from = m.get_from(); + m_append.set_msg_type(MessageType::MsgAppend); + m_append.index = forward.get_index(); + m_append.log_term = forward.get_log_term(); + m_append.commit = m.get_commit(); + m_append.commit_term = m.get_commit_term(); + + info!( + self.logger, + "The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \ + to peer {id}", + msg_log_term = forward.log_term, + msg_index = forward.index, + id = forward.to; + ); + self.r.send(m_append, &mut self.msgs); + } + fn send_request_snapshot(&mut self) { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); From 6ed64659f6cd1cd95840477c50f0f7a321637c90 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Wed, 10 Aug 2022 14:47:24 +0800 Subject: [PATCH 04/21] Inline function of querying the information of progress which supports agent selection Signed-off-by: LintianShi --- src/raft.rs | 12 ++++++++++++ src/tracker/progress.rs | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/src/raft.rs b/src/raft.rs index 96b7e709..97fc000e 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -3039,4 +3039,16 @@ impl Raft { pr.ins.set_cap(cap); } } + + // Whether this peer is active recently. + #[inline] + fn is_recent_active(&self, id: u64) -> bool { + self.prs().get(id).map_or(false, |pr| pr.recent_active) + } + + // Determine whether a progress is in Replicate state. + #[inline] + fn is_replicating(&self, id: u64) -> bool { + self.prs().get(id).map_or(false, |pr| pr.is_replicating()) + } } diff --git a/src/tracker/progress.rs b/src/tracker/progress.rs index 2f86f5a7..432e0d55 100644 --- a/src/tracker/progress.rs +++ b/src/tracker/progress.rs @@ -203,6 +203,12 @@ impl Progress { true } + /// Determine whether progress is in the Replicate state; + #[inline] + pub fn is_replicating(&self) -> bool { + self.state == ProgressState::Replicate + } + /// Determine whether progress is paused. #[inline] pub fn is_paused(&self) -> bool { From c5a6992441cb8acffa365b64c6760ea714861c7c Mon Sep 17 00:00:00 2001 From: LintianShi Date: Fri, 23 Sep 2022 18:17:21 +0800 Subject: [PATCH 05/21] async log entries fetch for forwarding Signed-off-by: LintianShi --- src/raft.rs | 158 ++++++++++++++++++++++++++---------------------- src/raw_node.rs | 18 ++++++ src/storage.rs | 14 +++++ 3 files changed, 117 insertions(+), 73 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 97fc000e..00e20926 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -847,6 +847,74 @@ impl RaftCore { true } + fn send_forward( + &mut self, + from: u64, + commit: u64, + commit_term: u64, + forward: &Forward, + msgs: &mut Vec, + ) { + let mut m = Message::default(); + m.to = forward.to; + m.from = from; + m.commit = commit; + m.commit_term = commit_term; + m.set_msg_type(MessageType::MsgAppend); + // Fetch log entries from the forward.index to the last index of log. 
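+        // The SendForward context allows the storage to fetch entries asynchronously:
+        // on LogTemporarilyUnavailable nothing is sent here, and the fetch callback in
+        // raw_node.rs (see the hunk below) re-issues send_forward once the entries are
+        // ready, after re-checking the term, the role, and that the peer still exists.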
+ if self + .raft_log + .match_term(forward.get_index(), forward.get_log_term()) + { + let ents = self.raft_log.entries( + forward.get_index() + 1, + self.max_msg_size, + GetEntriesContext(GetEntriesFor::SendForward { + from, + commit, + commit_term, + term: self.term, + forward: forward.clone(), + }), + ); + + match ents { + Ok(ents) => { + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + m.set_entries(ents.into()); + self.send(m, msgs); + } + Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {} + _ => { + // Forward MsgAppend with empty entries in order to update commit + // or trigger decrementing next_idx. + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + self.send(m, msgs); + warn!( + self.logger, + "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } + } + } else { + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + self.send(m, msgs); + warn!( + self.logger, + "index {}, log term {} in forward message to peer {}, do not match the agent's raft log.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } + } + // send_heartbeat sends an empty MsgAppend fn send_heartbeat( &mut self, @@ -907,6 +975,12 @@ impl Raft { .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); } + /// Forwards an append RPC from the leader to the given peer. + pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) { + self.r + .send_forward(from, commit, commit_term, forward, &mut self.msgs); + } + /// Broadcasts heartbeats to all the followers if it's leader. pub fn ping(&mut self) { if self.state == StateRole::Leader { @@ -2544,59 +2618,20 @@ impl Raft { // If the agent fails to append entries from the leader, // the agent cannot forward MsgAppend. for forward in m.get_forwards() { - // Fetch log entries from the forward.index to the last index of log. 
- if self - .raft_log - .match_term(forward.get_index(), forward.get_log_term()) - { - let ents = self.raft_log.entries( - forward.get_index() + 1, - self.max_msg_size, - GetEntriesContext(GetEntriesFor::SendAppend { - to: forward.get_to(), - term: m.term, - aggressively: false, - }), - ); - - match ents { - Ok(ents) => { - let mut m_append = Message::default(); - m_append.to = forward.get_to(); - m_append.from = m.get_from(); - m_append.set_msg_type(MessageType::MsgAppend); - m_append.index = forward.get_index(); - m_append.log_term = forward.get_log_term(); - m_append.set_entries(ents.into()); - m_append.commit = m.get_commit(); - m_append.commit_term = m.get_commit_term(); - self.r.send(m_append, &mut self.msgs); - } - Err(_) => { - self.dummy_forward(m, forward); - warn!( - self.logger, - "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", - forward.get_index(), - forward.get_log_term(), - forward.get_to() - ); - } - } - } else { - self.dummy_forward(m, forward); - warn!( - self.logger, - "The agent's log does not match with index {} log term {} in forward message to peer {}.", - forward.get_index(), - forward.get_log_term(), - forward.get_to() - ); - } + self.r + .send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs); } } else { for forward in m.get_forwards() { - self.dummy_forward(m, forward); + let mut m_append = Message::default(); + m_append.to = forward.to; + m_append.from = m.from; + m_append.commit = m.commit; + m_append.commit_term = m.commit_term; + m_append.set_msg_type(MessageType::MsgAppend); + m_append.index = forward.get_index(); + m_append.log_term = forward.get_log_term(); + self.r.send(m_append, &mut self.msgs); } info!( self.logger, @@ -2940,29 +2975,6 @@ impl Raft { self.lead_transferee = None; } - // Forward MsgAppend with empty entries in order to update commit - // or trigger decrementing next_idx. 
- fn dummy_forward(&mut self, m: &Message, forward: &Forward) { - let mut m_append = Message::default(); - m_append.to = forward.get_to(); - m_append.from = m.get_from(); - m_append.set_msg_type(MessageType::MsgAppend); - m_append.index = forward.get_index(); - m_append.log_term = forward.get_log_term(); - m_append.commit = m.get_commit(); - m_append.commit_term = m.get_commit_term(); - - info!( - self.logger, - "The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \ - to peer {id}", - msg_log_term = forward.log_term, - msg_index = forward.index, - id = forward.to; - ); - self.r.send(m_append, &mut self.msgs); - } - fn send_request_snapshot(&mut self) { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); diff --git a/src/raw_node.rs b/src/raw_node.rs index a15a1489..d315339a 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -439,6 +439,24 @@ impl RawNode { self.raft.send_append(to) } } + GetEntriesFor::SendForward { + from, + commit, + commit_term, + term, + forward, + } => { + if self.raft.term != term || self.raft.state == StateRole::Leader { + // term or leadership has changed + // this peer is not the agent + return; + } + if self.raft.prs().get(forward.to).is_none() { + // the peer has been removed, do nothing + return; + } + self.raft.send_forward(from, commit, commit_term, &forward); + } GetEntriesFor::Empty(can_async) if can_async => {} _ => panic!("shouldn't call callback on non-async context"), } diff --git a/src/storage.rs b/src/storage.rs index f9b71fab..e05779bb 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -70,6 +70,7 @@ impl GetEntriesContext { pub fn can_async(&self) -> bool { match self.0 { GetEntriesFor::SendAppend { .. } => true, + GetEntriesFor::SendForward { .. 
} => true, GetEntriesFor::Empty(can_async) => can_async, _ => false, } @@ -87,6 +88,19 @@ pub(crate) enum GetEntriesFor { /// whether to exhaust all the entries aggressively: bool, }, + // for forwarding entries to followers + SendForward { + /// the peer id from which the entries are forwarded + from: u64, + /// the commit index in MsgGroupbroadcast + commit: u64, + /// the commit term in MsgGroupbroadcast + commit_term: u64, + /// the term when the request is issued + term: u64, + /// the forward information in MsgGroupbroadcast + forward: Forward, + }, // for getting committed entries in a ready GenReady, // for getting entries to check pending conf when transferring leader From 62da866418c54d3d7ef65f48a5416d3de439558c Mon Sep 17 00:00:00 2001 From: LintianShi Date: Wed, 5 Oct 2022 18:53:53 +0800 Subject: [PATCH 06/21] integration test cases for MsgGroupBroadcast Signed-off-by: LintianShi --- harness/tests/integration_cases/test_raft.rs | 280 +++++++++++++++++++ harness/tests/test_util/mod.rs | 8 + 2 files changed, 288 insertions(+) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index dc9fd02f..c0597a53 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -1347,6 +1347,286 @@ fn test_handle_msg_append() { } } +#[test] +fn test_handle_groupbroadcast() { + let l = default_logger(); + let ng = |term, + log_term, + index, + commit, + ents: Option>, + forwards: Option>| { + let mut m = Message::default(); + m.set_msg_type(MessageType::MsgGroupBroadcast); + // always forward messages from peer id 100 in this test + m.from = 100; + m.term = term; + m.log_term = log_term; + m.index = index; + m.commit = commit; + if let Some(ets) = ents { + m.entries = ets.iter().map(|&(i, t)| empty_entry(t, i)).collect(); + } + if let Some(fwds) = forwards { + m.forwards = fwds + .iter() + .map(|&(to, t, i)| new_forward(to, t, i)) + .collect(); + } + m + }; + + let na = |to, term, log_term, index, commit, ents: Option>| { + let mut m = Message::default(); + m.set_msg_type(MessageType::MsgAppend); + m.to = to; + m.term = term; + m.log_term = log_term; + m.index = index; + m.commit = commit; + if let Some(ets) = ents { + m.entries = ets.iter().map(|&(i, t)| empty_entry(t, i)).collect(); + } + m + }; + + let valiadate_msg = |msgapp: &Message, w_msg: &Message, j: usize| { + if msgapp.msg_type != MessageType::MsgAppend { + panic!("#{}: msg_type should be MsgAppend", j); + } + if msgapp.from != 100 { + panic!("#{}: from = {}, want {}", j, msgapp.from, w_msg.from); + } + if msgapp.to != w_msg.to { + panic!("#{}: to = {}, want {}", j, msgapp.to, w_msg.to); + } + if msgapp.term != w_msg.term { + panic!("#{}: term = {}, want {}", j, msgapp.term, w_msg.term); + } + if msgapp.log_term != w_msg.log_term { + panic!( + "#{}: log_term = {}, want {}", + j, msgapp.log_term, w_msg.log_term + ); + } + if msgapp.index != w_msg.index { + panic!("#{}: index = {}, want {}", j, msgapp.index, w_msg.index); + } + if msgapp.commit != w_msg.commit { + panic!("#{}: commit = {}, want {}", j, msgapp.commit, w_msg.commit); + } + if msgapp.get_entries() != w_msg.get_entries() { + panic!( + "#{}: entries length = {}, want {}", + j, + msgapp.get_entries().len(), + w_msg.get_entries().len() + ); + } + true + }; + + let valiadate_tests = |mut tests: Vec<(Message, u64, u64, bool, Message, Message)>| { + for (j, (m, w_index, w_commit, w_reject, fwd1, fwd2)) in tests.drain(..).enumerate() { + let mut sm = new_test_raft_with_logs( + 1, 
+ vec![1], + 10, + 1, + MemStorage::new(), + &[empty_entry(1, 1), empty_entry(2, 2)], + &l, + ); + + sm.become_follower(2, INVALID_ID); + sm.handle_group_broadcast(&m); + if sm.raft_log.last_index() != w_index { + panic!( + "#{}: last_index = {}, want {}", + j, + sm.raft_log.last_index(), + w_index + ); + } + if sm.raft_log.committed != w_commit { + panic!( + "#{}: committed = {}, want {}", + j, sm.raft_log.committed, w_commit + ); + } + let msg = sm.read_messages(); + if msg.len() != 3 { + panic!("#{}: msg count = {}, want 3", j, msg.len()); + } + if msg[0].reject != w_reject { + panic!("#{}: reject = {}, want {}", j, msg[0].reject, w_reject); + } + + valiadate_msg(&msg[1], &fwd1, j); + valiadate_msg(&msg[2], &fwd2, j); + } + }; + + let tests = vec![ + // Ensure 1: + // If the agent fails to handle MsgAppend in MsgGroupBroadcast, the agent only forwards empty MsgAppend. + // Send empty MsgAppend even if the previous log in Forward matches. + // Because the agent cannot guarantee its raft log is up-to-date now. + ( + ng(2, 3, 2, 3, None, Some(vec![(200, 3, 2), (300, 1, 1)])), + 2, + 0, + true, + na(200, 2, 3, 2, 3, None), + na(300, 2, 1, 1, 3, None), + ), // previous log mismatch, + ( + ng(2, 3, 2, 3, None, Some(vec![(200, 3, 3), (300, 0, 0)])), + 2, + 0, + true, + na(200, 2, 3, 3, 3, None), + na(300, 2, 0, 0, 3, None), + ), // previous log mismatch, + ( + ng(2, 3, 3, 3, None, Some(vec![(200, 3, 2), (300, 1, 1)])), + 2, + 0, + true, + na(200, 2, 3, 2, 3, None), + na(300, 2, 1, 1, 3, None), + ), // previous log non-exist + ( + ng(2, 3, 3, 3, None, Some(vec![(200, 3, 3), (300, 0, 0)])), + 2, + 0, + true, + na(200, 2, 3, 3, 3, None), + na(300, 2, 0, 0, 3, None), + ), // previous log non-exist + // Ensure 2: + // If the agent appends or overwrites its local raft log successfully, + // it will forward MsgAppend according to previous log in Forward. + // The agent appends log entries in MsgGroupBroadcast. + ( + ng( + 2, + 2, + 2, + 3, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 2, 2), (300, 1, 1)]), + ), + 4, + 3, + false, + na(200, 2, 2, 2, 3, Some(vec![(3, 2), (4, 2)])), + na(300, 2, 1, 1, 3, Some(vec![(2, 2), (3, 2), (4, 2)])), + ), // previous log match + ( + ng( + 2, + 2, + 2, + 3, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 0, 0), (300, 2, 3)]), + ), + 4, + 3, + false, + na(200, 2, 0, 0, 3, Some(vec![(1, 1), (2, 2), (3, 2), (4, 2)])), + na(300, 2, 2, 3, 3, Some(vec![(4, 2)])), + ), // previous log match + ( + ng( + 2, + 2, + 2, + 4, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 1, 2), (300, 2, 1)]), + ), + 4, + 4, + false, + na(200, 2, 1, 2, 4, None), + na(300, 2, 2, 1, 4, None), + ), // previous log mismatch + ( + ng( + 2, + 2, + 2, + 3, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 2, 5), (300, 3, 6)]), + ), + 4, + 3, + false, + na(200, 2, 2, 5, 3, None), + na(300, 2, 3, 6, 3, None), + ), // previous log non-exist + // The agent overwrites log entries in MsgGroupBroadcast. 
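+        // The follower's initial log is [(index 1, term 1), (index 2, term 2)]; the
+        // broadcasts below carry entries overlapping that log, so maybe_append may
+        // truncate and overwrite it before the forwarded MsgAppends are packed.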
+ ( + ng( + 2, + 0, + 0, + 2, + Some(vec![(1, 2), (2, 2)]), + Some(vec![(200, 0, 0), (300, 1, 1)]), + ), + 2, + 2, + false, + na(200, 2, 0, 0, 2, Some(vec![(1, 2), (2, 2)])), + na(300, 2, 1, 1, 2, None), + ), + ( + ng( + 2, + 1, + 1, + 4, + Some(vec![(2, 2), (3, 2), (4, 2), (5, 2)]), + Some(vec![(200, 2, 4), (300, 1, 1)]), + ), + 5, + 4, + false, + na(200, 2, 2, 4, 4, Some(vec![(5, 2)])), + na(300, 2, 1, 1, 4, Some(vec![(2, 2), (3, 2), (4, 2), (5, 2)])), + ), + ( + ng(2, 1, 1, 1, None, Some(vec![(200, 0, 0), (300, 1, 1)])), + 2, + 1, + false, + na(200, 2, 0, 0, 1, Some(vec![(1, 1), (2, 2)])), + na(300, 2, 1, 1, 1, Some(vec![(2, 2)])), + ), + ( + ng( + 2, + 0, + 0, + 1, + Some(vec![(1, 2)]), + Some(vec![(200, 0, 0), (300, 1, 1)]), + ), + 1, + 1, + false, + na(200, 2, 0, 0, 1, Some(vec![(1, 2)])), + na(300, 2, 1, 1, 1, None), + ), + ]; + + valiadate_tests(tests); +} + // test_handle_heartbeat ensures that the follower commits to the commit in the message. #[test] fn test_handle_heartbeat() { diff --git a/harness/tests/test_util/mod.rs b/harness/tests/test_util/mod.rs index d7864ad5..ec1144cf 100644 --- a/harness/tests/test_util/mod.rs +++ b/harness/tests/test_util/mod.rs @@ -172,6 +172,14 @@ pub fn new_snapshot(index: u64, term: u64, voters: Vec) -> Snapshot { s } +pub fn new_forward(to: u64, term: u64, index: u64) -> Forward { + let mut f = Forward::default(); + f.to = to; + f.log_term = term; + f.index = index; + f +} + pub fn conf_change(ty: ConfChangeType, node_id: u64) -> ConfChange { let mut cc = ConfChange::default(); cc.node_id = node_id; From e6384816761619a19ddb9b739c18474963c6fb6d Mon Sep 17 00:00:00 2001 From: LintianShi Date: Wed, 12 Oct 2022 11:10:09 +0800 Subject: [PATCH 07/21] Refine the procedure of forwarding msgapp Signed-off-by: LintianShi --- src/raft.rs | 85 +++++++++++++++++++++++++++++------------------------ 1 file changed, 47 insertions(+), 38 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 00e20926..af4b1186 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -847,6 +847,7 @@ impl RaftCore { true } + // Pack MsgAppend according to forward info, and send it to target peer. fn send_forward( &mut self, from: u64, @@ -855,53 +856,21 @@ impl RaftCore { forward: &Forward, msgs: &mut Vec, ) { + // initialize MsgAppend let mut m = Message::default(); m.to = forward.to; m.from = from; m.commit = commit; m.commit_term = commit_term; m.set_msg_type(MessageType::MsgAppend); - // Fetch log entries from the forward.index to the last index of log. - if self + + // If log_term and index in forward info mismatch with agent's raft log, + // the agent just sends empty MsgAppend. + // Empty MsgAppend is only to update commit or trigger decrementing next index . + if !self .raft_log .match_term(forward.get_index(), forward.get_log_term()) { - let ents = self.raft_log.entries( - forward.get_index() + 1, - self.max_msg_size, - GetEntriesContext(GetEntriesFor::SendForward { - from, - commit, - commit_term, - term: self.term, - forward: forward.clone(), - }), - ); - - match ents { - Ok(ents) => { - m.index = forward.get_index(); - m.log_term = forward.get_log_term(); - m.set_entries(ents.into()); - self.send(m, msgs); - } - Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {} - _ => { - // Forward MsgAppend with empty entries in order to update commit - // or trigger decrementing next_idx. 
- m.index = forward.get_index(); - m.log_term = forward.get_log_term(); - self.send(m, msgs); - warn!( - self.logger, - "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", - forward.get_index(), - forward.get_log_term(), - forward.get_to() - ); - } - } - } else { m.index = forward.get_index(); m.log_term = forward.get_log_term(); self.send(m, msgs); @@ -912,6 +881,46 @@ impl RaftCore { forward.get_log_term(), forward.get_to() ); + return; + } + + // Fetch log entries from index in forward info to the last index of log. + let ents = self.raft_log.entries( + forward.get_index() + 1, + self.max_msg_size, + GetEntriesContext(GetEntriesFor::SendForward { + from, + commit, + commit_term, + term: self.term, + forward: forward.clone(), + }), + ); + + match ents { + Ok(ents) => { + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + m.set_entries(ents.into()); + self.send(m, msgs); + } + // TODO: Consider a better processing for temporary unavailable in async fetch, + // as current processing causes jitters of TPS. + // Temporarily the agent sends empty MsgAppend when log entries temporary unavailable. + _ => { + // If the agent fails to fetch log entries, send MsgAppend with empty entries + // in order to update commit, or trigger decrementing next_idx. + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + self.send(m, msgs); + warn!( + self.logger, + "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } } } From f605f68518213e07ffd6b377c39e8e0f0c0270b7 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Wed, 12 Oct 2022 14:07:14 +0800 Subject: [PATCH 08/21] Refine handle_group_broadcast and add some comments Signed-off-by: LintianShi --- src/raft.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index af4b1186..b5735478 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -2621,16 +2621,22 @@ impl Raft { self.try_append_entries(m); } - /// For a broadcast, append entries to local log and forward MsgAppend to other dest. + /// For a group broadcast, append entries to local log and forward MsgAppend to other dest. + /// The usage of group broadcast is in examples/follower_replication/main.rs. pub fn handle_group_broadcast(&mut self, m: &Message) { - if self.try_append_entries(m) { - // If the agent fails to append entries from the leader, - // the agent cannot forward MsgAppend. - for forward in m.get_forwards() { - self.r - .send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs); - } - } else { + // The agent should handle appending log entries in MsgGroupBroadcast firstly, in order to + // guarantee that agent's local raft log is identical to leader's log up through agent's last index. + // If the agent fails to append log entries, agent's log cannot be used for frowarding. + if !self.try_append_entries(m) { + // If the agent fails to append log entries, there are three cases. + // 1. The agent is pending request snapshot. To avoid dead loop, upper layer + // should only send MsgGroupBroadcast to peers which are in Replicate state. + // 2. Log entries in MsgGroupBroadcast is conflict with committed. + // 3. Log entries in MsgGroupBroadcast is conflict with agent's log. + // + // If the agent's raft log might be conflict with leader's raft log, + // it just sends empty MsgAppends to target peers. 
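+        // The empty MsgAppend still carries the forward's index/log_term and the
+        // leader's commit/commit_term, so each target can either advance its commit
+        // index or reject, letting the leader decrement next_idx and retry directly.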
+ for forward in m.get_forwards() { let mut m_append = Message::default(); m_append.to = forward.to; @@ -2652,6 +2658,13 @@ impl Raft { "index" => m.index, "logterm" => ?self.raft_log.term(m.index), ); + return; + } + + // Pack MsgAppend with local raft log and forward to target peers. + for forward in m.get_forwards() { + self.r + .send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs); } } From 72ad3e65fd8bc828f29fdb6f0a6f54f4c6d12c96 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Thu, 13 Oct 2022 15:57:06 +0800 Subject: [PATCH 09/21] Maintain broadcast group configuration in progress Signed-off-by: LintianShi --- src/raft.rs | 75 +++++++++++++++++++++++++++-------------- src/tracker/progress.rs | 5 +++ 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index b5735478..c26785a3 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -510,6 +510,31 @@ impl Raft { self.batch_append = batch_append; } + /// Assigns broadcast groups to peers. + /// + /// The tuple is (`peer_id`, `group_id`). `group_id` should be larger than 0. + /// + /// The group information is only stored in memory. So you need to configure + /// it every time a raft state machine is initialized or a snapshot is applied. + pub fn assign_broadcast_groups(&mut self, ids: &[(u64, u64)]) { + let prs = self.mut_prs(); + for (peer_id, group_id) in ids { + assert!(*group_id > 0); + if let Some(pr) = prs.get_mut(*peer_id) { + pr.broadcast_group_id = *group_id; + } else { + continue; + } + } + } + + /// Removes all broadcast group configurations. + pub fn clear_broadcast_group(&mut self) { + for (_, pr) in self.mut_prs().iter_mut() { + pr.broadcast_group_id = 0; + } + } + /// Configures group commit. /// /// If group commit is enabled, only logs replicated to at least two @@ -528,6 +553,31 @@ impl Raft { self.prs().group_commit() } + /// Checks whether the raft group is using group commit and consistent + /// over group. + /// + /// If it can't get a correct answer, `None` is returned. + pub fn check_group_commit_consistent(&mut self) -> Option { + if self.state != StateRole::Leader { + return None; + } + // Previous leader may have reach consistency already. + // + // check applied_index instead of committed_index to avoid pending conf change. + if !self.apply_to_current_term() { + return None; + } + let (index, use_group_commit) = self.mut_prs().maximal_committed_index(); + debug!( + self.logger, + "check group commit consistent"; + "index" => index, + "use_group_commit" => use_group_commit, + "committed" => self.raft_log.committed + ); + Some(use_group_commit && index == self.raft_log.committed) + } + /// Assigns groups to peers. /// /// The tuple is (`peer_id`, `group_id`). `group_id` should be larger than 0. @@ -556,31 +606,6 @@ impl Raft { } } - /// Checks whether the raft group is using group commit and consistent - /// over group. - /// - /// If it can't get a correct answer, `None` is returned. - pub fn check_group_commit_consistent(&mut self) -> Option { - if self.state != StateRole::Leader { - return None; - } - // Previous leader may have reach consistency already. - // - // check applied_index instead of committed_index to avoid pending conf change. 
- if !self.apply_to_current_term() { - return None; - } - let (index, use_group_commit) = self.mut_prs().maximal_committed_index(); - debug!( - self.logger, - "check group commit consistent"; - "index" => index, - "use_group_commit" => use_group_commit, - "committed" => self.raft_log.committed - ); - Some(use_group_commit && index == self.raft_log.committed) - } - /// Checks if logs are committed to its term. /// /// The check is useful usually when raft is leader. diff --git a/src/tracker/progress.rs b/src/tracker/progress.rs index 432e0d55..cedb88cd 100644 --- a/src/tracker/progress.rs +++ b/src/tracker/progress.rs @@ -51,6 +51,10 @@ pub struct Progress { /// Only logs replicated to different group will be committed if any group is configured. pub commit_group_id: u64, + /// Leader only replicates log entries to the agent of each group, + /// and the agent broadcasts logs within a group. + pub broadcast_group_id: u64, + /// Committed index in raft_log pub committed_index: u64, } @@ -68,6 +72,7 @@ impl Progress { recent_active: false, ins: Inflights::new(ins_size), commit_group_id: 0, + broadcast_group_id: 0, committed_index: 0, } } From 0a0110680de39697f5757dc506126e8bb9469048 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Fri, 14 Oct 2022 10:18:28 +0800 Subject: [PATCH 10/21] Group broadcast log entries in bcast_append when follower replication is enabled Signed-off-by: LintianShi --- src/raft.rs | 128 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 122 insertions(+), 6 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index c26785a3..54578c05 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -15,6 +15,7 @@ // limitations under the License. use std::cmp; +use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use crate::eraftpb::{ @@ -997,16 +998,131 @@ impl Raft { self.r.send_append_aggressively(to, pr, &mut self.msgs) } + fn select_agent_for_bcast_group(&self, group: &[usize], msgs: &[Message]) -> Option { + let mut agent_idx: Option = None; + for idx in group { + let peer_id = msgs[*idx].to; + let is_voter = self.prs().conf().voters().contains(peer_id); + // Agent must be voter and recently active. + if !is_voter || !self.is_recent_active(peer_id) { + continue; + } + agent_idx = Some(*idx); + } + agent_idx + } + + fn merge_append_group(&self, group: &[usize], msgs: &mut [Message], skip: &mut [bool]) { + // Do not need to merge if group size is less than two. + if group.len() < 2 { + return; + } + let agent_idx = self.select_agent_for_bcast_group(group, msgs); + // Return if no appropriate agent + if agent_idx.is_none() { + return; + } + + // Record forward information + let mut forwards: Vec = Vec::default(); + for idx in group { + if *idx == agent_idx.unwrap() { + // MsgAppend sent to the agent is changed to MsgGroupBroadcast. + let msg = &mut msgs[*idx]; + msg.set_msg_type(MessageType::MsgGroupBroadcast); + } else { + // MsgAppend sent to other peers in this group only reserve basic + // forward information. + let msg = &msgs[*idx]; + let forward = Forward { + to: msg.to, + log_term: msg.log_term, + index: msg.index, + ..Default::default() + }; + forwards.push(forward); + // Mark and skip this message later. + skip[*idx] = true; + } + } + // Attach forward information to MsgGroupbroadcast + let agent_msg = &mut msgs[agent_idx.unwrap()]; + agent_msg.set_forwards(forwards.into()); + } + /// Sends RPC, with entries to all peers that are not up-to-date /// according to the progress recorded in r.prs(). 
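+    ///
+    /// When follower replication is enabled and the leader knows its own broadcast
+    /// group, MsgAppends destined for peers of one remote group are merged into a
+    /// single MsgGroupBroadcast delivered to that group's agent, which fans the
+    /// entries out to the remaining peers of the group.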
pub fn bcast_append(&mut self) { let self_id = self.id; - let core = &mut self.r; - let msgs = &mut self.msgs; - self.prs - .iter_mut() - .filter(|&(id, _)| *id != self_id) - .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); + let mut msgs: Vec = Vec::default(); + { + // Messages are stored in a vector temporarily. + // They will be pushed to message queue later. + let core = &mut self.r; + self.prs + .iter_mut() + .filter(|&(id, _)| *id != self_id) + .for_each(|(id, pr)| core.send_append(*id, pr, &mut msgs)); + } + + // Use leader replication if follower replication is disabled or + // the broadcast group id of leader is unknown. Broadcast MsgAppend + // as normal. + let leader_group_id = self + .prs() + .get(self_id) + .map_or(0, |pr| pr.broadcast_group_id); + if !self.follower_replication() || leader_group_id != 0 { + self.msgs.append(&mut msgs); + return; + } + + // If follower replication is enabled, MsgAppends sent to the same broadcast group + // will be merge into a MsgGroupBroadcast. + // + // Record message that should be discarded after merging. + let mut skip = vec![false; msgs.len()]; + // Message group: + // broadcast group id -> {index of messages in msgs} + let mut msg_group: HashMap> = HashMap::default(); + + // Iterate messages generated by leader. + // Filter out messages that should be sent by follower replication, + // and group them by broadcast group id. + for (pos, msg) in msgs.iter().enumerate() { + // Only reserve MsgAppend sent to peers in replicating state + if msg.get_msg_type() != MessageType::MsgAppend || !self.is_replicating(msg.to) { + continue; + } + // Get the broadcast group id of target peer. + let group_id = self.prs().get(msg.to).map_or(0, |pr| pr.broadcast_group_id); + // Do not need merge if broadcast group id is unknown or in the same + // group with leader. Valid broadcast group id should be greater than 0. + if group_id == 0 || group_id != leader_group_id { + continue; + } + + // Group messages + if let Some(group) = msg_group.get_mut(&group_id) { + group.push(pos); + } else { + msg_group.insert(group_id, vec![pos]); + } + } + + // Merge MsgAppend in broadcast groups. + for (_, group) in msg_group { + self.merge_append_group(&group, &mut msgs, &mut skip); + } + + let mut idx: usize = 0; + for msg in msgs { + if !skip[idx] { + continue; + } + self.msgs.push(msg); + idx += 1 + } } /// Forwards an append RPC from the leader to the given peer. From 0cc4fa9613f3c624650967bacf5d9c2e29dce8bc Mon Sep 17 00:00:00 2001 From: LintianShi Date: Fri, 14 Oct 2022 11:02:04 +0800 Subject: [PATCH 11/21] Interface of querying whether two peers are in same broadcast group Signed-off-by: LintianShi --- src/raft.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/raft.rs b/src/raft.rs index 54578c05..626896cc 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -3215,6 +3215,16 @@ impl Raft { } } + /// Whether two peers are in the same broadcast group. + pub fn is_in_same_broadcast_group(&self, id: u64, id_other: u64) -> bool { + let group_id = self.prs().get(id).map_or(0, |pr| pr.broadcast_group_id); + let other_group_id = self + .prs() + .get(id_other) + .map_or(0, |pr| pr.broadcast_group_id); + group_id != 0 && other_group_id != 0 && group_id == other_group_id + } + // Whether this peer is active recently. 
#[inline] fn is_recent_active(&self, id: u64) -> bool { From c75d269e53af472b1259e151021e7eafedf78b9d Mon Sep 17 00:00:00 2001 From: LintianShi Date: Tue, 18 Oct 2022 13:16:16 +0800 Subject: [PATCH 12/21] Simplify message packing in fn send_forward Signed-off-by: LintianShi --- src/raft.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 626896cc..0c436991 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -886,6 +886,8 @@ impl RaftCore { let mut m = Message::default(); m.to = forward.to; m.from = from; + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); m.commit = commit; m.commit_term = commit_term; m.set_msg_type(MessageType::MsgAppend); @@ -897,8 +899,6 @@ impl RaftCore { .raft_log .match_term(forward.get_index(), forward.get_log_term()) { - m.index = forward.get_index(); - m.log_term = forward.get_log_term(); self.send(m, msgs); warn!( self.logger, @@ -925,8 +925,6 @@ impl RaftCore { match ents { Ok(ents) => { - m.index = forward.get_index(); - m.log_term = forward.get_log_term(); m.set_entries(ents.into()); self.send(m, msgs); } @@ -936,8 +934,6 @@ impl RaftCore { _ => { // If the agent fails to fetch log entries, send MsgAppend with empty entries // in order to update commit, or trigger decrementing next_idx. - m.index = forward.get_index(); - m.log_term = forward.get_log_term(); self.send(m, msgs); warn!( self.logger, From 20f033943f21f807d3abdd48ae3eb44890fd1308 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Tue, 18 Oct 2022 18:31:05 +0800 Subject: [PATCH 13/21] Group and filter messages directly Signed-off-by: LintianShi --- src/raft.rs | 159 ++++++++++++++++++++++++---------------------------- 1 file changed, 74 insertions(+), 85 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 0c436991..7d6b55e6 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -994,42 +994,42 @@ impl Raft { self.r.send_append_aggressively(to, pr, &mut self.msgs) } - fn select_agent_for_bcast_group(&self, group: &[usize], msgs: &[Message]) -> Option { - let mut agent_idx: Option = None; - for idx in group { - let peer_id = msgs[*idx].to; + // Find an appropriate agent. + // If found an appropriate one, mark the corresponding message's type as + // MsgGroupBroadcast and return true. If not found, return false. + fn select_agent_for_bcast_group(&self, msgs: &mut [Message]) -> bool { + for msg in msgs { + let peer_id = msg.to; let is_voter = self.prs().conf().voters().contains(peer_id); // Agent must be voter and recently active. if !is_voter || !self.is_recent_active(peer_id) { continue; } - agent_idx = Some(*idx); + msg.set_msg_type(MessageType::MsgGroupBroadcast); + return true; } - agent_idx + false } - fn merge_append_group(&self, group: &[usize], msgs: &mut [Message], skip: &mut [bool]) { + fn merge_msg_group(&mut self, mut group: Vec) { // Do not need to merge if group size is less than two. if group.len() < 2 { + self.msgs.append(&mut group); return; } - let agent_idx = self.select_agent_for_bcast_group(group, msgs); - // Return if no appropriate agent - if agent_idx.is_none() { + // Send messages directly if no appropriate agent in this broadcast group. + if !self.select_agent_for_bcast_group(&mut group) { + self.msgs.append(&mut group); return; } // Record forward information let mut forwards: Vec = Vec::default(); - for idx in group { - if *idx == agent_idx.unwrap() { - // MsgAppend sent to the agent is changed to MsgGroupBroadcast. 
- let msg = &mut msgs[*idx]; - msg.set_msg_type(MessageType::MsgGroupBroadcast); - } else { - // MsgAppend sent to other peers in this group only reserve basic - // forward information. - let msg = &msgs[*idx]; + let mut mark = 0; + for (idx, msg) in group.iter().enumerate() { + // MsgAppend sent to other peers in this group only reserve basic + // forward information. + if msg.get_msg_type() != MessageType::MsgGroupBroadcast { let forward = Forward { to: msg.to, log_term: msg.log_term, @@ -1037,87 +1037,82 @@ impl Raft { ..Default::default() }; forwards.push(forward); - // Mark and skip this message later. - skip[*idx] = true; + } else { + mark = idx; + } + } + // Attach forward information to MsgGroupbroadcast and send it. + group[mark].set_forwards(forwards.into()); + for msg in group { + if msg.get_msg_type() == MessageType::MsgGroupBroadcast { + self.msgs.push(msg); + return; } } - // Attach forward information to MsgGroupbroadcast - let agent_msg = &mut msgs[agent_idx.unwrap()]; - agent_msg.set_forwards(forwards.into()); } /// Sends RPC, with entries to all peers that are not up-to-date /// according to the progress recorded in r.prs(). pub fn bcast_append(&mut self) { let self_id = self.id; - let mut msgs: Vec = Vec::default(); - { - // Messages are stored in a vector temporarily. - // They will be pushed to message queue later. - let core = &mut self.r; - self.prs - .iter_mut() - .filter(|&(id, _)| *id != self_id) - .for_each(|(id, pr)| core.send_append(*id, pr, &mut msgs)); - } - - // Use leader replication if follower replication is disabled or - // the broadcast group id of leader is unknown. Broadcast MsgAppend - // as normal. let leader_group_id = self .prs() .get(self_id) .map_or(0, |pr| pr.broadcast_group_id); - if !self.follower_replication() || leader_group_id != 0 { - self.msgs.append(&mut msgs); + // Use leader replication if follower replication is disabled or + // the broadcast group id of leader is unknown. Broadcast MsgAppend + // as normal. + if !self.follower_replication() || leader_group_id == 0 { + let core = &mut self.r; + let msgs = &mut self.msgs; + self.prs + .iter_mut() + .filter(|&(id, _)| *id != self_id) + .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); return; } // If follower replication is enabled, MsgAppends sent to the same broadcast group // will be merge into a MsgGroupBroadcast. // - // Record message that should be discarded after merging. - let mut skip = vec![false; msgs.len()]; - // Message group: - // broadcast group id -> {index of messages in msgs} - let mut msg_group: HashMap> = HashMap::default(); - - // Iterate messages generated by leader. - // Filter out messages that should be sent by follower replication, - // and group them by broadcast group id. - for (pos, msg) in msgs.iter().enumerate() { - // Only reserve MsgAppend sent to peers in replicating state - if msg.get_msg_type() != MessageType::MsgAppend || !self.is_replicating(msg.to) { - continue; - } - // Get the broadcast group id of target peer. - let group_id = self.prs().get(msg.to).map_or(0, |pr| pr.broadcast_group_id); - // Do not need merge if broadcast group id is unknown or in the same - // group with leader. Valid broadcast group id should be greater than 0. - if group_id == 0 || group_id != leader_group_id { - continue; - } + // Messages that needs to be forwarded are stored in hashmap temporarily, + // and they are grouped by broadcast_group_id of progress. + // Messages in msg_group will be pushed to message queue later. 
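+        // Flow: leader-generated MsgAppends are collected per target first, the ones
+        // eligible for follower replication are bucketed by broadcast group id, and
+        // each bucket is later collapsed into one MsgGroupBroadcast for its agent.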
+ let mut msg_group: HashMap> = HashMap::default(); + let core = &mut self.r; + let msgs = &mut self.msgs; + self.prs + .iter_mut() + .filter(|&(id, _)| *id != self_id) + .for_each(|(id, pr)| { + let mut tmp_msgs = Vec::default(); + // Let messages be pushed into tmp_vec firstly. + core.send_append(*id, pr, &mut tmp_msgs); + for msg in tmp_msgs { + // Filter out messages that need to be forwarded into msg_group. + // Other messages are sent directly. + if pr.broadcast_group_id == leader_group_id + || msg.get_msg_type() != MessageType::MsgAppend + || !pr.is_replicating() + { + msgs.push(msg); + } else { + msg_group + .entry(pr.broadcast_group_id) + .or_default() + .push(msg); + } + } + }); - // Group messages - if let Some(group) = msg_group.get_mut(&group_id) { - group.push(pos); + // Merge messages in the same broadcast group and send them. + for (group_id, mut group) in msg_group.drain() { + // Double check: do not need to forward messages in leader's broadcast group. + if group_id == leader_group_id { + self.msgs.append(&mut group); } else { - msg_group.insert(group_id, vec![pos]); - } - } - - // Merge MsgAppend in broadcast groups. - for (_, group) in msg_group { - self.merge_append_group(&group, &mut msgs, &mut skip); - } - - let mut idx: usize = 0; - for msg in msgs { - if !skip[idx] { - continue; + self.merge_msg_group(group); } - self.msgs.push(msg); - idx += 1 } } @@ -3226,10 +3221,4 @@ impl Raft { fn is_recent_active(&self, id: u64) -> bool { self.prs().get(id).map_or(false, |pr| pr.recent_active) } - - // Determine whether a progress is in Replicate state. - #[inline] - fn is_replicating(&self, id: u64) -> bool { - self.prs().get(id).map_or(false, |pr| pr.is_replicating()) - } } From 60a80f2f522ab09232ce7093d6474de2216d8c22 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Thu, 20 Oct 2022 13:22:48 +0800 Subject: [PATCH 14/21] Simplfy bcast_append and merge_msg_group Signed-off-by: LintianShi --- src/raft.rs | 94 +++++++++++++++++++++----------------------------- src/tracker.rs | 4 +-- 2 files changed, 42 insertions(+), 56 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 7d6b55e6..f3e69452 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -994,42 +994,30 @@ impl Raft { self.r.send_append_aggressively(to, pr, &mut self.msgs) } - // Find an appropriate agent. - // If found an appropriate one, mark the corresponding message's type as - // MsgGroupBroadcast and return true. If not found, return false. - fn select_agent_for_bcast_group(&self, msgs: &mut [Message]) -> bool { - for msg in msgs { - let peer_id = msg.to; - let is_voter = self.prs().conf().voters().contains(peer_id); - // Agent must be voter and recently active. - if !is_voter || !self.is_recent_active(peer_id) { - continue; - } - msg.set_msg_type(MessageType::MsgGroupBroadcast); - return true; - } - false - } - - fn merge_msg_group(&mut self, mut group: Vec) { - // Do not need to merge if group size is less than two. - if group.len() < 2 { - self.msgs.append(&mut group); - return; - } - // Send messages directly if no appropriate agent in this broadcast group. - if !self.select_agent_for_bcast_group(&mut group) { - self.msgs.append(&mut group); + fn merge_msg_group(&mut self, mut group: Vec<(Message, bool)>) { + let mut need_merge = group.len() > 1; + let mut agent_msg_idx = None; + if need_merge { + // Find an appropriate agent. + // If found an appropriate one, return the index of agent's message. If not found, return None. 
+ agent_msg_idx = group
+ .iter()
+ .position(|(_, is_agent_candidate)| *is_agent_candidate);
+ need_merge = agent_msg_idx.is_some();
+ }
+ // Do not need to merge if group size is less than two. Or there is no appropriate agent.
+ if !need_merge {
+ self.msgs
+ .append(&mut group.into_iter().map(|(msg, _)| msg).collect());
return;
}

// Record forward information
let mut forwards: Vec<Forward> = Vec::default();
- let mut mark = 0;
- for (idx, msg) in group.iter().enumerate() {
+ for (idx, (msg, _)) in group.iter().enumerate() {
// MsgAppend sent to other peers in this group only reserve basic
// forward information.
- if msg.get_msg_type() != MessageType::MsgGroupBroadcast {
+ if idx != agent_msg_idx.unwrap() {
let forward = Forward {
to: msg.to,
log_term: msg.log_term,
@@ -1037,32 +1025,32 @@ impl Raft {
..Default::default()
};
forwards.push(forward);
- } else {
- mark = idx;
}
}
// Attach forward information to MsgGroupbroadcast and send it.
- group[mark].set_forwards(forwards.into());
- for msg in group {
- if msg.get_msg_type() == MessageType::MsgGroupBroadcast {
- self.msgs.push(msg);
- return;
- }
- }
+ let mut agent_msg = group.swap_remove(agent_msg_idx.unwrap()).0;
+ agent_msg.set_msg_type(MessageType::MsgGroupBroadcast);
+ agent_msg.set_forwards(forwards.into());
+ self.msgs.push(agent_msg);
}

/// Sends RPC, with entries to all peers that are not up-to-date
/// according to the progress recorded in r.prs().
pub fn bcast_append(&mut self) {
let self_id = self.id;
- let leader_group_id = self
- .prs()
- .get(self_id)
- .map_or(0, |pr| pr.broadcast_group_id);
+ let mut leader_group_id = 0;
// Use leader replication if follower replication is disabled or
// the broadcast group id of leader is unknown. Broadcast MsgAppend
// as normal.
- if !self.follower_replication() || leader_group_id == 0 {
+ let mut use_leader_replication = !self.follower_replication();
+ if !use_leader_replication {
+ leader_group_id = self
+ .prs()
+ .get(self_id)
+ .map_or(0, |pr| pr.broadcast_group_id);
+ use_leader_replication = leader_group_id == 0;
+ }
+ if use_leader_replication {
let core = &mut self.r;
let msgs = &mut self.msgs;
self.prs
@@ -1078,11 +1066,12 @@ impl Raft {
// Messages that needs to be forwarded are stored in hashmap temporarily,
// and they are grouped by broadcast_group_id of progress.
// Messages in msg_group will be pushed to message queue later.
- let mut msg_group: HashMap<u64, Vec<Message>> = HashMap::default();
+ let mut msg_group: HashMap<u64, Vec<(Message, bool)>> = HashMap::default();
let core = &mut self.r;
let msgs = &mut self.msgs;
- self.prs
- .iter_mut()
+ let prs = &mut self.prs.progress;
+ let conf = &self.prs.conf;
+ prs.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| {
let mut tmp_msgs = Vec::default();
@@ -1097,22 +1086,19 @@ impl Raft {
{
msgs.push(msg);
} else {
+ let peer_id = msg.to;
msg_group
.entry(pr.broadcast_group_id)
.or_default()
- .push(msg);
+ // The agent must be a voter and active recently.
+ .push((msg, pr.recent_active && conf.voters().contains(peer_id)));
}
}
});

// Merge messages in the same broadcast group and send them.
- for (group_id, mut group) in msg_group.drain() {
- // Double check: do not need to forward messages in leader's broadcast group.
- if group_id == leader_group_id {
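+ // The double check is unnecessary now: messages for the leader's own
+ // group are pushed to msgs directly above and never enter msg_group.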
- let mut agent_msg = group.swap_remove(agent_msg_idx.unwrap()).0;
- agent_msg.set_msg_type(MessageType::MsgGroupBroadcast);
- agent_msg.set_forwards(forwards.into());
- self.msgs.push(agent_msg);
- }
-
/// Sends RPC, with entries to all peers that are not up-to-date
/// according to the progress recorded in r.prs().
pub fn bcast_append(&mut self) {
@@ -1066,7 +1030,6 @@ impl Raft {
// Messages that needs to be forwarded are stored in hashmap temporarily,
// and they are grouped by broadcast_group_id of progress.
// Messages in msg_group will be pushed to message queue later.
- let mut msg_group: HashMap<u64, Vec<(Message, bool)>> = HashMap::default();
let core = &mut self.r;
let msgs = &mut self.msgs;
let prs = &mut self.prs.progress;
@@ -1087,7 +1050,7 @@ impl Raft {
msgs.push(msg);
} else {
let peer_id = msg.to;
- msg_group
+ core.msg_group
.entry(pr.broadcast_group_id)
.or_default()
// The agent must be a voter and active recently.
@@ -1097,8 +1060,41 @@ impl Raft {
});

// Merge messages in the same broadcast group and send them.
- for (_, group) in msg_group.drain() {
- self.merge_msg_group(group);
+ for (_, mut group) in core.msg_group.drain() {
+ let mut need_merge = group.len() > 1;
+ let mut agent_msg_idx = None;
+ if need_merge {
+ // If found an appropriate agent, return the index of agent's message. Otherwise, return None.
+ agent_msg_idx = group
+ .iter()
+ .position(|(_, is_agent_candidate)| *is_agent_candidate);
+ need_merge = agent_msg_idx.is_some();
+ }
+ // Do not need to merge if group size is less than two. Or there is no appropriate agent.
+ if !need_merge {
+ msgs.append(&mut group.into_iter().map(|(msg, _)| msg).collect());
+ return;
+ }
+
+ // Record forward information
+ let mut forwards: Vec<Forward> = Vec::default();
+ for (idx, (msg, _)) in group.iter().enumerate() {
+ // MsgAppend sent to other peers in this group only reserve basic forward information.
+ if idx != agent_msg_idx.unwrap() {
+ let forward = Forward {
+ to: msg.to,
+ log_term: msg.log_term,
+ index: msg.index,
+ ..Default::default()
+ };
+ forwards.push(forward);
+ }
+ }
+ // Attach forward information to MsgGroupbroadcast and send it.
+ let mut agent_msg = group.swap_remove(agent_msg_idx.unwrap()).0;
+ agent_msg.set_msg_type(MessageType::MsgGroupBroadcast);
+ agent_msg.set_forwards(forwards.into());
+ msgs.push(agent_msg);
}
}

From 6f67e2331daadd4c9132f14c3949df47b96ea041 Mon Sep 17 00:00:00 2001
From: LintianShi
Date: Fri, 21 Oct 2022 14:54:07 +0800
Subject: [PATCH 17/21] Fix a bug on merging message group

Signed-off-by: LintianShi
---
 src/raft.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/raft.rs b/src/raft.rs
index d77d602b..64fbbb0d 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -1073,7 +1073,7 @@ impl Raft {
// Do not need to merge if group size is less than two. Or there is no appropriate agent.
if !need_merge {
msgs.append(&mut group.into_iter().map(|(msg, _)| msg).collect());
- return;
+ continue;
}

// Record forward information

From ceeac5150e920f3697eb3de248c7629349f453b7 Mon Sep 17 00:00:00 2001
From: LintianShi
Date: Fri, 21 Oct 2022 16:42:30 +0800
Subject: [PATCH 18/21] Optimize generating messages

Signed-off-by: LintianShi
---
 src/raft.rs | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/raft.rs b/src/raft.rs
index 64fbbb0d..0fbd42a2 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -1040,21 +1040,22 @@ impl Raft {
let mut tmp_msgs = Vec::default();
// Let messages be pushed into tmp_vec firstly.
core.send_append(*id, pr, &mut tmp_msgs);
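+ // Eligibility for forwarding depends only on the peer's progress, so
+ // decide it once per peer here instead of re-checking every message.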
{ let mut need_merge = group.len() > 1; let mut agent_msg_idx = None; if need_merge { From 67b8f306a836d69ce77798f59d090f601e3879d9 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Thu, 3 Nov 2022 12:58:05 +0800 Subject: [PATCH 20/21] Async fetch for log entries in send_forward Signed-off-by: LintianShi --- src/raft.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 2585629b..9028d824 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -932,9 +932,9 @@ impl RaftCore { m.set_entries(ents.into()); self.send(m, msgs); } - // TODO: Consider a better processing for temporary unavailable in async fetch, - // as current processing causes jitters of TPS. - // Temporarily the agent sends empty MsgAppend when log entries temporary unavailable. + Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => { + // wait for storage to fetch entries asynchronously + } _ => { // If the agent fails to fetch log entries, send MsgAppend with empty entries // in order to update commit, or trigger decrementing next_idx. From 1e8b19f3f5ea64d9be2ffa78346cebd09f609635 Mon Sep 17 00:00:00 2001 From: LintianShi Date: Tue, 8 Nov 2022 18:28:09 +0800 Subject: [PATCH 21/21] Unit tests for follower replication in fn bcast_append Signed-off-by: LintianShi --- harness/tests/integration_cases/test_raft.rs | 202 +++++++++++++++++++ harness/tests/test_util/mod.rs | 20 ++ 2 files changed, 222 insertions(+) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index c0597a53..4bae14de 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -2953,6 +2953,208 @@ fn test_leader_append_response() { } } +// When follower replication is enabled, fn bcast_append's main control +// flow should merge MsgAppend in the same broadcast group correctly. +#[test] +fn test_bcast_append_with_follower_replication_success() { + let l = default_logger(); + // make a state machine with log.offset = 1000 + let offset = 1000u64; + let s = new_snapshot(offset, 1, vec![1, 2, 3, 4, 5, 6]); + let store = new_storage(); + store.wl().apply_snapshot(s).expect(""); + let mut sm = new_test_raft_with_follower_replication( + 1, + vec![1, 2, 3, 4, 5, 6], + 10, + 1, + store, + true, + &l, + ); + sm.term = 1; + + sm.become_candidate(); + sm.become_leader(); + // Assign broadcast group. + sm.assign_broadcast_groups(&vec![(1, 1), (2, 1), (3, 2), (4, 2), (5, 3), (6, 2)]); + for i in 0..6 { + let pr = sm.mut_prs().get_mut(i + 1).unwrap(); + // Make sure each peer is replicating and active recently. + pr.become_replicate(); + pr.recent_active = true; + // Advance progress to avoid snapshot. + pr.matched = offset; + pr.next_idx = offset + 1; + } + + for i in 0..10 { + let _ = sm.append_entry(&mut [empty_entry(0, offset + i + 1)]); + } + sm.persist(); + sm.bcast_append(); + + let msg = sm.read_messages(); + if msg.len() != 3 { + panic!("the number of msg is {}, want 3", msg.len()); + } + + for m in msg { + if m.to == 2 { + // Peer 2 is in leader's broadcast group, so use leader replication. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m.get_msg_type()); + } + } else if m.to == 4 { + // Peer 3, 4, 6 are merged. + // The agent can be peer 3 or 4 or 6. It does not affect correctness. 
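+ // msg_group is now a Vec: with only a handful of broadcast groups, a
+ // linear scan beats hashing, and drain(..) keeps the allocation for reuse.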
+ if m.get_msg_type() != MessageType::MsgGroupBroadcast { + panic!("To peer #{}, msg type = {:?}, want MsgGroupBroadcast", m.to, m.get_msg_type()); + } + if m.forwards.len() != 2 { + panic!("To peer #{}, the number of forwards = {:?}, want 2", m.to, m.forwards.len()); + } + if m.forwards[0].to != 6 || m.forwards[0].log_term != 1 || m.forwards[0].index != 1000 { + panic!("Forward info is wrong. {:?}", m.forwards[0]); + } + if m.forwards[1].to != 3 || m.forwards[1].log_term != 1 || m.forwards[1].index != 1000 { + panic!("Forward info is wrong. {:?}", m.forwards[1]); + } + } else if m.to == 5 { + // Peer 5 is the only peer in the broadcast group, so use leader replication. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m); + } + } else { + panic!("To peer #{}. Unexpected", m.to); + } + } +} + +// When follower replication is enabled, it may be degraded to leader replication +// in some situations. This test case checks those control flow. +#[test] +fn test_bcast_append_with_follower_replication_fail_1() { + let l = default_logger(); + // make a state machine with log.offset = 1000 + let offset = 1000u64; + let s = new_snapshot(offset, 1, vec![1, 2, 3, 4, 5, 6, 7]); + let store = new_storage(); + store.wl().apply_snapshot(s).expect(""); + let mut sm = new_test_raft_with_follower_replication( + 1, + vec![1, 2, 3, 4, 5, 6, 7], + 10, + 1, + store, + true, + &l, + ); + sm.term = 1; + + sm.become_candidate(); + sm.become_leader(); + // Assign broadcast group except peer 2. + sm.assign_broadcast_groups(&vec![(1, 1), (3, 2), (4, 2), (5, 3), (6, 3), (7, 4)]); + for i in 0..7 { + let pr = sm.mut_prs().get_mut(i + 1).unwrap(); + if i + 1 != 3 { + pr.become_replicate(); + } + if i + 1 != 5 && i + 1 != 6 { + pr.recent_active = true; + } else { + pr.recent_active = false; + } + if i + 1 != 7 { + pr.matched = offset; + pr.next_idx = offset + 1; + } + } + + for i in 0..10 { + let _ = sm.append_entry(&mut [empty_entry(0, offset + i + 1)]); + } + sm.persist(); + sm.bcast_append(); + + let msg = sm.read_messages(); + if msg.len() != 6 { + panic!("the number of msg is {}, want 6", msg.len()); + } + + // Peer 2 does not have group id. + // Peer 3 is not replicating. + // Peer 5 and 6 in group 3 but no valid agent. + // Peer 7 send MsgSnapshot which should be sent directly. + for m in msg { + if m.to == 7 { + // Message to peer 7 is MsgSnapshot. + if m.get_msg_type() != MessageType::MsgSnapshot { + panic!("To peer #{}, msg type = {:?}, want MsgSnapshot", m.to, m.get_msg_type()); + } + } else { + // Messages to peer 2-6 should be sent directly. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m); + } + } + } +} + +// When follower replication is enabled, it may be degraded to leader replication +// in some situations. This test case checks those control flow. +#[test] +fn test_bcast_append_with_follower_replication_fail_2() { + let l = default_logger(); + // make a state machine with log.offset = 1000 + let offset = 1000u64; + let s = new_snapshot(offset, 1, vec![1, 2, 3, 4, 5]); + let store = new_storage(); + store.wl().apply_snapshot(s).expect(""); + let mut sm = new_test_raft_with_follower_replication( + 1, + vec![1, 2, 3, 4, 5], + 10, + 1, + store, + true, + &l, + ); + sm.term = 1; + + sm.become_candidate(); + sm.become_leader(); + // Assign broadcast group except peer 1. 
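+ // A plain MsgAppend here would mean the group was never merged into a broadcast.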
+ sm.assign_broadcast_groups(&vec![(2, 2), (3, 2), (4, 3), (5, 3)]); + for i in 0..5 { + let pr = sm.mut_prs().get_mut(i + 1).unwrap(); + pr.become_replicate(); + pr.recent_active = true; + pr.matched = offset; + pr.next_idx = offset + 1; + } + + for i in 0..10 { + let _ = sm.append_entry(&mut [empty_entry(0, offset + i + 1)]); + } + sm.persist(); + sm.bcast_append(); + + let msg = sm.read_messages(); + if msg.len() != 4 { + panic!("the number of msg is {}, want 4", msg.len()); + } + + for m in msg { + // Messages to peer 2-5 should be sent directly since leader's + // group id is not assigned. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m); + } + } +} + // When the leader receives a heartbeat tick, it should // send a MsgApp with m.Index = 0, m.LogTerm=0 and empty entries. #[test] diff --git a/harness/tests/test_util/mod.rs b/harness/tests/test_util/mod.rs index ec1144cf..ccc34563 100644 --- a/harness/tests/test_util/mod.rs +++ b/harness/tests/test_util/mod.rs @@ -86,6 +86,26 @@ pub fn new_test_raft_with_prevote( new_test_raft_with_config(&config, storage, l) } +pub fn new_test_raft_with_follower_replication( + id: u64, + peers: Vec, + election: usize, + heartbeat: usize, + storage: MemStorage, + follower_replication: bool, + l: &Logger, +) -> Interface { + let mut config = new_test_config(id, election, heartbeat); + config.follower_replication = follower_replication; + if storage.initial_state().unwrap().initialized() && peers.is_empty() { + panic!("new_test_raft with empty peers on initialized store"); + } + if !peers.is_empty() && !storage.initial_state().unwrap().initialized() { + storage.initialize_with_conf_state((peers, vec![])); + } + new_test_raft_with_config(&config, storage, l) +} + pub fn new_test_raft_with_logs( id: u64, peers: Vec,
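+ // The leader's own group id stays 0 (unknown), which forces the fallback
+ // to leader replication even though follower replication is enabled.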