diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index dc9fd02f..4bae14de 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -1347,6 +1347,286 @@ fn test_handle_msg_append() { } } +#[test] +fn test_handle_groupbroadcast() { + let l = default_logger(); + let ng = |term, + log_term, + index, + commit, + ents: Option>, + forwards: Option>| { + let mut m = Message::default(); + m.set_msg_type(MessageType::MsgGroupBroadcast); + // always forward messages from peer id 100 in this test + m.from = 100; + m.term = term; + m.log_term = log_term; + m.index = index; + m.commit = commit; + if let Some(ets) = ents { + m.entries = ets.iter().map(|&(i, t)| empty_entry(t, i)).collect(); + } + if let Some(fwds) = forwards { + m.forwards = fwds + .iter() + .map(|&(to, t, i)| new_forward(to, t, i)) + .collect(); + } + m + }; + + let na = |to, term, log_term, index, commit, ents: Option>| { + let mut m = Message::default(); + m.set_msg_type(MessageType::MsgAppend); + m.to = to; + m.term = term; + m.log_term = log_term; + m.index = index; + m.commit = commit; + if let Some(ets) = ents { + m.entries = ets.iter().map(|&(i, t)| empty_entry(t, i)).collect(); + } + m + }; + + let valiadate_msg = |msgapp: &Message, w_msg: &Message, j: usize| { + if msgapp.msg_type != MessageType::MsgAppend { + panic!("#{}: msg_type should be MsgAppend", j); + } + if msgapp.from != 100 { + panic!("#{}: from = {}, want {}", j, msgapp.from, w_msg.from); + } + if msgapp.to != w_msg.to { + panic!("#{}: to = {}, want {}", j, msgapp.to, w_msg.to); + } + if msgapp.term != w_msg.term { + panic!("#{}: term = {}, want {}", j, msgapp.term, w_msg.term); + } + if msgapp.log_term != w_msg.log_term { + panic!( + "#{}: log_term = {}, want {}", + j, msgapp.log_term, w_msg.log_term + ); + } + if msgapp.index != w_msg.index { + panic!("#{}: index = {}, want {}", j, msgapp.index, w_msg.index); + } + if msgapp.commit != w_msg.commit { + panic!("#{}: commit = {}, want {}", j, msgapp.commit, w_msg.commit); + } + if msgapp.get_entries() != w_msg.get_entries() { + panic!( + "#{}: entries length = {}, want {}", + j, + msgapp.get_entries().len(), + w_msg.get_entries().len() + ); + } + true + }; + + let valiadate_tests = |mut tests: Vec<(Message, u64, u64, bool, Message, Message)>| { + for (j, (m, w_index, w_commit, w_reject, fwd1, fwd2)) in tests.drain(..).enumerate() { + let mut sm = new_test_raft_with_logs( + 1, + vec![1], + 10, + 1, + MemStorage::new(), + &[empty_entry(1, 1), empty_entry(2, 2)], + &l, + ); + + sm.become_follower(2, INVALID_ID); + sm.handle_group_broadcast(&m); + if sm.raft_log.last_index() != w_index { + panic!( + "#{}: last_index = {}, want {}", + j, + sm.raft_log.last_index(), + w_index + ); + } + if sm.raft_log.committed != w_commit { + panic!( + "#{}: committed = {}, want {}", + j, sm.raft_log.committed, w_commit + ); + } + let msg = sm.read_messages(); + if msg.len() != 3 { + panic!("#{}: msg count = {}, want 3", j, msg.len()); + } + if msg[0].reject != w_reject { + panic!("#{}: reject = {}, want {}", j, msg[0].reject, w_reject); + } + + valiadate_msg(&msg[1], &fwd1, j); + valiadate_msg(&msg[2], &fwd2, j); + } + }; + + let tests = vec![ + // Ensure 1: + // If the agent fails to handle MsgAppend in MsgGroupBroadcast, the agent only forwards empty MsgAppend. + // Send empty MsgAppend even if the previous log in Forward matches. + // Because the agent cannot guarantee its raft log is up-to-date now. 
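+        // Each case is (incoming broadcast msg, wanted last_index, wanted committed, wanted reject flag,
+        // expected MsgAppend forwarded to peer 200, expected MsgAppend forwarded to peer 300).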
+ ( + ng(2, 3, 2, 3, None, Some(vec![(200, 3, 2), (300, 1, 1)])), + 2, + 0, + true, + na(200, 2, 3, 2, 3, None), + na(300, 2, 1, 1, 3, None), + ), // previous log mismatch, + ( + ng(2, 3, 2, 3, None, Some(vec![(200, 3, 3), (300, 0, 0)])), + 2, + 0, + true, + na(200, 2, 3, 3, 3, None), + na(300, 2, 0, 0, 3, None), + ), // previous log mismatch, + ( + ng(2, 3, 3, 3, None, Some(vec![(200, 3, 2), (300, 1, 1)])), + 2, + 0, + true, + na(200, 2, 3, 2, 3, None), + na(300, 2, 1, 1, 3, None), + ), // previous log non-exist + ( + ng(2, 3, 3, 3, None, Some(vec![(200, 3, 3), (300, 0, 0)])), + 2, + 0, + true, + na(200, 2, 3, 3, 3, None), + na(300, 2, 0, 0, 3, None), + ), // previous log non-exist + // Ensure 2: + // If the agent appends or overwrites its local raft log successfully, + // it will forward MsgAppend according to previous log in Forward. + // The agent appends log entries in MsgGroupBroadcast. + ( + ng( + 2, + 2, + 2, + 3, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 2, 2), (300, 1, 1)]), + ), + 4, + 3, + false, + na(200, 2, 2, 2, 3, Some(vec![(3, 2), (4, 2)])), + na(300, 2, 1, 1, 3, Some(vec![(2, 2), (3, 2), (4, 2)])), + ), // previous log match + ( + ng( + 2, + 2, + 2, + 3, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 0, 0), (300, 2, 3)]), + ), + 4, + 3, + false, + na(200, 2, 0, 0, 3, Some(vec![(1, 1), (2, 2), (3, 2), (4, 2)])), + na(300, 2, 2, 3, 3, Some(vec![(4, 2)])), + ), // previous log match + ( + ng( + 2, + 2, + 2, + 4, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 1, 2), (300, 2, 1)]), + ), + 4, + 4, + false, + na(200, 2, 1, 2, 4, None), + na(300, 2, 2, 1, 4, None), + ), // previous log mismatch + ( + ng( + 2, + 2, + 2, + 3, + Some(vec![(3, 2), (4, 2)]), + Some(vec![(200, 2, 5), (300, 3, 6)]), + ), + 4, + 3, + false, + na(200, 2, 2, 5, 3, None), + na(300, 2, 3, 6, 3, None), + ), // previous log non-exist + // The agent overwrites log entries in MsgGroupBroadcast. + ( + ng( + 2, + 0, + 0, + 2, + Some(vec![(1, 2), (2, 2)]), + Some(vec![(200, 0, 0), (300, 1, 1)]), + ), + 2, + 2, + false, + na(200, 2, 0, 0, 2, Some(vec![(1, 2), (2, 2)])), + na(300, 2, 1, 1, 2, None), + ), + ( + ng( + 2, + 1, + 1, + 4, + Some(vec![(2, 2), (3, 2), (4, 2), (5, 2)]), + Some(vec![(200, 2, 4), (300, 1, 1)]), + ), + 5, + 4, + false, + na(200, 2, 2, 4, 4, Some(vec![(5, 2)])), + na(300, 2, 1, 1, 4, Some(vec![(2, 2), (3, 2), (4, 2), (5, 2)])), + ), + ( + ng(2, 1, 1, 1, None, Some(vec![(200, 0, 0), (300, 1, 1)])), + 2, + 1, + false, + na(200, 2, 0, 0, 1, Some(vec![(1, 1), (2, 2)])), + na(300, 2, 1, 1, 1, Some(vec![(2, 2)])), + ), + ( + ng( + 2, + 0, + 0, + 1, + Some(vec![(1, 2)]), + Some(vec![(200, 0, 0), (300, 1, 1)]), + ), + 1, + 1, + false, + na(200, 2, 0, 0, 1, Some(vec![(1, 2)])), + na(300, 2, 1, 1, 1, None), + ), + ]; + + valiadate_tests(tests); +} + // test_handle_heartbeat ensures that the follower commits to the commit in the message. #[test] fn test_handle_heartbeat() { @@ -2673,6 +2953,208 @@ fn test_leader_append_response() { } } +// When follower replication is enabled, fn bcast_append's main control +// flow should merge MsgAppend in the same broadcast group correctly. 
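+// Group layout used in this test: peers 1 and 2 share the leader's group, peers 3, 4 and 6
+// form one group served through a single agent, and peer 5 is alone in its group.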
+#[test] +fn test_bcast_append_with_follower_replication_success() { + let l = default_logger(); + // make a state machine with log.offset = 1000 + let offset = 1000u64; + let s = new_snapshot(offset, 1, vec![1, 2, 3, 4, 5, 6]); + let store = new_storage(); + store.wl().apply_snapshot(s).expect(""); + let mut sm = new_test_raft_with_follower_replication( + 1, + vec![1, 2, 3, 4, 5, 6], + 10, + 1, + store, + true, + &l, + ); + sm.term = 1; + + sm.become_candidate(); + sm.become_leader(); + // Assign broadcast group. + sm.assign_broadcast_groups(&vec![(1, 1), (2, 1), (3, 2), (4, 2), (5, 3), (6, 2)]); + for i in 0..6 { + let pr = sm.mut_prs().get_mut(i + 1).unwrap(); + // Make sure each peer is replicating and active recently. + pr.become_replicate(); + pr.recent_active = true; + // Advance progress to avoid snapshot. + pr.matched = offset; + pr.next_idx = offset + 1; + } + + for i in 0..10 { + let _ = sm.append_entry(&mut [empty_entry(0, offset + i + 1)]); + } + sm.persist(); + sm.bcast_append(); + + let msg = sm.read_messages(); + if msg.len() != 3 { + panic!("the number of msg is {}, want 3", msg.len()); + } + + for m in msg { + if m.to == 2 { + // Peer 2 is in leader's broadcast group, so use leader replication. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m.get_msg_type()); + } + } else if m.to == 4 { + // Peer 3, 4, 6 are merged. + // The agent can be peer 3 or 4 or 6. It does not affect correctness. + if m.get_msg_type() != MessageType::MsgGroupBroadcast { + panic!("To peer #{}, msg type = {:?}, want MsgGroupBroadcast", m.to, m.get_msg_type()); + } + if m.forwards.len() != 2 { + panic!("To peer #{}, the number of forwards = {:?}, want 2", m.to, m.forwards.len()); + } + if m.forwards[0].to != 6 || m.forwards[0].log_term != 1 || m.forwards[0].index != 1000 { + panic!("Forward info is wrong. {:?}", m.forwards[0]); + } + if m.forwards[1].to != 3 || m.forwards[1].log_term != 1 || m.forwards[1].index != 1000 { + panic!("Forward info is wrong. {:?}", m.forwards[1]); + } + } else if m.to == 5 { + // Peer 5 is the only peer in the broadcast group, so use leader replication. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m); + } + } else { + panic!("To peer #{}. Unexpected", m.to); + } + } +} + +// When follower replication is enabled, it may be degraded to leader replication +// in some situations. This test case checks those control flow. +#[test] +fn test_bcast_append_with_follower_replication_fail_1() { + let l = default_logger(); + // make a state machine with log.offset = 1000 + let offset = 1000u64; + let s = new_snapshot(offset, 1, vec![1, 2, 3, 4, 5, 6, 7]); + let store = new_storage(); + store.wl().apply_snapshot(s).expect(""); + let mut sm = new_test_raft_with_follower_replication( + 1, + vec![1, 2, 3, 4, 5, 6, 7], + 10, + 1, + store, + true, + &l, + ); + sm.term = 1; + + sm.become_candidate(); + sm.become_leader(); + // Assign broadcast group except peer 2. 
+ sm.assign_broadcast_groups(&vec![(1, 1), (3, 2), (4, 2), (5, 3), (6, 3), (7, 4)]); + for i in 0..7 { + let pr = sm.mut_prs().get_mut(i + 1).unwrap(); + if i + 1 != 3 { + pr.become_replicate(); + } + if i + 1 != 5 && i + 1 != 6 { + pr.recent_active = true; + } else { + pr.recent_active = false; + } + if i + 1 != 7 { + pr.matched = offset; + pr.next_idx = offset + 1; + } + } + + for i in 0..10 { + let _ = sm.append_entry(&mut [empty_entry(0, offset + i + 1)]); + } + sm.persist(); + sm.bcast_append(); + + let msg = sm.read_messages(); + if msg.len() != 6 { + panic!("the number of msg is {}, want 6", msg.len()); + } + + // Peer 2 does not have group id. + // Peer 3 is not replicating. + // Peer 5 and 6 in group 3 but no valid agent. + // Peer 7 send MsgSnapshot which should be sent directly. + for m in msg { + if m.to == 7 { + // Message to peer 7 is MsgSnapshot. + if m.get_msg_type() != MessageType::MsgSnapshot { + panic!("To peer #{}, msg type = {:?}, want MsgSnapshot", m.to, m.get_msg_type()); + } + } else { + // Messages to peer 2-6 should be sent directly. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m); + } + } + } +} + +// When follower replication is enabled, it may be degraded to leader replication +// in some situations. This test case checks those control flow. +#[test] +fn test_bcast_append_with_follower_replication_fail_2() { + let l = default_logger(); + // make a state machine with log.offset = 1000 + let offset = 1000u64; + let s = new_snapshot(offset, 1, vec![1, 2, 3, 4, 5]); + let store = new_storage(); + store.wl().apply_snapshot(s).expect(""); + let mut sm = new_test_raft_with_follower_replication( + 1, + vec![1, 2, 3, 4, 5], + 10, + 1, + store, + true, + &l, + ); + sm.term = 1; + + sm.become_candidate(); + sm.become_leader(); + // Assign broadcast group except peer 1. + sm.assign_broadcast_groups(&vec![(2, 2), (3, 2), (4, 3), (5, 3)]); + for i in 0..5 { + let pr = sm.mut_prs().get_mut(i + 1).unwrap(); + pr.become_replicate(); + pr.recent_active = true; + pr.matched = offset; + pr.next_idx = offset + 1; + } + + for i in 0..10 { + let _ = sm.append_entry(&mut [empty_entry(0, offset + i + 1)]); + } + sm.persist(); + sm.bcast_append(); + + let msg = sm.read_messages(); + if msg.len() != 4 { + panic!("the number of msg is {}, want 4", msg.len()); + } + + for m in msg { + // Messages to peer 2-5 should be sent directly since leader's + // group id is not assigned. + if m.get_msg_type() != MessageType::MsgAppend { + panic!("To peer #{}, msg type = {:?}, want MsgAppend", m.to, m); + } + } +} + // When the leader receives a heartbeat tick, it should // send a MsgApp with m.Index = 0, m.LogTerm=0 and empty entries. 
#[test] diff --git a/harness/tests/test_util/mod.rs b/harness/tests/test_util/mod.rs index d7864ad5..ccc34563 100644 --- a/harness/tests/test_util/mod.rs +++ b/harness/tests/test_util/mod.rs @@ -86,6 +86,26 @@ pub fn new_test_raft_with_prevote( new_test_raft_with_config(&config, storage, l) } +pub fn new_test_raft_with_follower_replication( + id: u64, + peers: Vec, + election: usize, + heartbeat: usize, + storage: MemStorage, + follower_replication: bool, + l: &Logger, +) -> Interface { + let mut config = new_test_config(id, election, heartbeat); + config.follower_replication = follower_replication; + if storage.initial_state().unwrap().initialized() && peers.is_empty() { + panic!("new_test_raft with empty peers on initialized store"); + } + if !peers.is_empty() && !storage.initial_state().unwrap().initialized() { + storage.initialize_with_conf_state((peers, vec![])); + } + new_test_raft_with_config(&config, storage, l) +} + pub fn new_test_raft_with_logs( id: u64, peers: Vec, @@ -172,6 +192,14 @@ pub fn new_snapshot(index: u64, term: u64, voters: Vec) -> Snapshot { s } +pub fn new_forward(to: u64, term: u64, index: u64) -> Forward { + let mut f = Forward::default(); + f.to = to; + f.log_term = term; + f.index = index; + f +} + pub fn conf_change(ty: ConfChangeType, node_id: u64) -> ConfChange { let mut cc = ConfChange::default(); cc.node_id = node_id; diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index 1f7c71b2..af5174e1 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -46,6 +46,17 @@ message Snapshot { SnapshotMetadata metadata = 2; } +// Forward is a type that tells the agent how to forward the MsgGroupBroadcast from the leader. +// +// Field to is the destination of forwarding. +// Field log_term and index is the previous entry of log entries that should be forwarded. +// Entries to be forwarded is the range (index, last_index]. +message Forward { + uint64 to = 1; + uint64 log_term = 2; + uint64 index = 3; +} + enum MessageType { MsgHup = 0; MsgBeat = 1; @@ -66,6 +77,8 @@ enum MessageType { MsgReadIndexResp = 16; MsgRequestPreVote = 17; MsgRequestPreVoteResponse = 18; + MsgGroupBroadcast = 19; + MsgGroupBroadcastResponse = 20; } message Message { @@ -89,6 +102,7 @@ message Message { uint64 reject_hint = 11; bytes context = 12; uint64 priority = 14; + repeated Forward forwards = 16; } message HardState { diff --git a/src/config.rs b/src/config.rs index 392540db..3452d0d4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -67,6 +67,10 @@ pub struct Config { /// rejoins the cluster. pub pre_vote: bool, + /// Enables follower replication. + /// This reduces the across-AZ traffic of cloud deployment. + pub follower_replication: bool, + /// The range of election timeout. In some cases, we hope some nodes has less possibility /// to become leader. This configuration ensures that the randomized election_timeout /// will always be suit in [min_election_tick, max_election_tick). 
@@ -112,6 +116,7 @@ impl Default for Config { max_inflight_msgs: 256, check_quorum: false, pre_vote: false, + follower_replication: false, min_election_tick: 0, max_election_tick: 0, read_only_option: ReadOnlyOption::Safe, diff --git a/src/raft.rs b/src/raft.rs index fe397ed1..9028d824 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -18,8 +18,8 @@ use std::cmp; use std::ops::{Deref, DerefMut}; use crate::eraftpb::{ - ConfChange, ConfChangeV2, ConfState, Entry, EntryType, HardState, Message, MessageType, - Snapshot, + ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Forward, HardState, Message, + MessageType, Snapshot, }; use protobuf::Message as _; use raft_proto::ConfChangeI; @@ -236,6 +236,11 @@ pub struct RaftCore { /// Enable this if greater cluster stability is preferred over faster elections. pub pre_vote: bool, + // Enable follower replication. + // This enables data replication from a follower to other servers in the same available zone. + // Enable this for reducing across-AZ traffic of cloud deployment. + follower_replication: bool, + skip_bcast_commit: bool, batch_append: bool, @@ -260,6 +265,10 @@ pub struct RaftCore { /// Max size per committed entries in a `Read`. pub(crate) max_committed_size_per_ready: u64, + + // Message group cache for follower replication. + // Since the number of groups is small, use vector instead of hashmap. + msg_group: Vec<(u64, Vec<(Message, bool)>)>, } /// A struct that represents the raft consensus itself. Stores details concerning the current @@ -337,6 +346,7 @@ impl Raft { promotable: false, check_quorum: c.check_quorum, pre_vote: c.pre_vote, + follower_replication: c.follower_replication, read_only: ReadOnly::new(c.read_only_option), heartbeat_timeout: c.heartbeat_tick, election_timeout: c.election_tick, @@ -360,6 +370,7 @@ impl Raft { last_log_tail_index: 0, }, max_committed_size_per_ready: c.max_committed_size_per_ready, + msg_group: Vec::default(), }, }; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; @@ -455,6 +466,11 @@ impl Raft { hs } + /// Whether enable follower replication. + pub fn follower_replication(&self) -> bool { + self.follower_replication + } + /// Returns whether the current raft is in lease. pub fn in_lease(&self) -> bool { self.state == StateRole::Leader && self.check_quorum @@ -499,6 +515,31 @@ impl Raft { self.batch_append = batch_append; } + /// Assigns broadcast groups to peers. + /// + /// The tuple is (`peer_id`, `group_id`). `group_id` should be larger than 0. + /// + /// The group information is only stored in memory. So you need to configure + /// it every time a raft state machine is initialized or a snapshot is applied. + pub fn assign_broadcast_groups(&mut self, ids: &[(u64, u64)]) { + let prs = self.mut_prs(); + for (peer_id, group_id) in ids { + assert!(*group_id > 0); + if let Some(pr) = prs.get_mut(*peer_id) { + pr.broadcast_group_id = *group_id; + } else { + continue; + } + } + } + + /// Removes all broadcast group configurations. + pub fn clear_broadcast_group(&mut self) { + for (_, pr) in self.mut_prs().iter_mut() { + pr.broadcast_group_id = 0; + } + } + /// Configures group commit. /// /// If group commit is enabled, only logs replicated to at least two @@ -517,6 +558,31 @@ impl Raft { self.prs().group_commit() } + /// Checks whether the raft group is using group commit and consistent + /// over group. + /// + /// If it can't get a correct answer, `None` is returned. 
+ pub fn check_group_commit_consistent(&mut self) -> Option { + if self.state != StateRole::Leader { + return None; + } + // Previous leader may have reach consistency already. + // + // check applied_index instead of committed_index to avoid pending conf change. + if !self.apply_to_current_term() { + return None; + } + let (index, use_group_commit) = self.mut_prs().maximal_committed_index(); + debug!( + self.logger, + "check group commit consistent"; + "index" => index, + "use_group_commit" => use_group_commit, + "committed" => self.raft_log.committed + ); + Some(use_group_commit && index == self.raft_log.committed) + } + /// Assigns groups to peers. /// /// The tuple is (`peer_id`, `group_id`). `group_id` should be larger than 0. @@ -545,31 +611,6 @@ impl Raft { } } - /// Checks whether the raft group is using group commit and consistent - /// over group. - /// - /// If it can't get a correct answer, `None` is returned. - pub fn check_group_commit_consistent(&mut self) -> Option { - if self.state != StateRole::Leader { - return None; - } - // Previous leader may have reach consistency already. - // - // check applied_index instead of committed_index to avoid pending conf change. - if !self.apply_to_current_term() { - return None; - } - let (index, use_group_commit) = self.mut_prs().maximal_committed_index(); - debug!( - self.logger, - "check group commit consistent"; - "index" => index, - "use_group_commit" => use_group_commit, - "committed" => self.raft_log.committed - ); - Some(use_group_commit && index == self.raft_log.committed) - } - /// Checks if logs are committed to its term. /// /// The check is useful usually when raft is leader. @@ -836,6 +877,79 @@ impl RaftCore { true } + // Pack MsgAppend according to forward info, and send it to target peer. + fn send_forward( + &mut self, + from: u64, + commit: u64, + commit_term: u64, + forward: &Forward, + msgs: &mut Vec, + ) { + // initialize MsgAppend + let mut m = Message::default(); + m.to = forward.to; + m.from = from; + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + m.commit = commit; + m.commit_term = commit_term; + m.set_msg_type(MessageType::MsgAppend); + + // If log_term and index in forward info mismatch with agent's raft log, + // the agent just sends empty MsgAppend. + // Empty MsgAppend is only to update commit or trigger decrementing next index . + if !self + .raft_log + .match_term(forward.get_index(), forward.get_log_term()) + { + self.send(m, msgs); + warn!( + self.logger, + "index {}, log term {} in forward message to peer {}, do not match the agent's raft log.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + return; + } + + // Fetch log entries from index in forward info to the last index of log. + let ents = self.raft_log.entries( + forward.get_index() + 1, + self.max_msg_size, + GetEntriesContext(GetEntriesFor::SendForward { + from, + commit, + commit_term, + term: self.term, + forward: forward.clone(), + }), + ); + + match ents { + Ok(ents) => { + m.set_entries(ents.into()); + self.send(m, msgs); + } + Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => { + // wait for storage to fetch entries asynchronously + } + _ => { + // If the agent fails to fetch log entries, send MsgAppend with empty entries + // in order to update commit, or trigger decrementing next_idx. 
+ self.send(m, msgs); + warn!( + self.logger, + "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } + } + } + // send_heartbeat sends an empty MsgAppend fn send_heartbeat( &mut self, @@ -888,12 +1002,118 @@ impl Raft { /// according to the progress recorded in r.prs(). pub fn bcast_append(&mut self) { let self_id = self.id; + let mut leader_group_id = 0; + // Use leader replication if follower replication is disabled or + // the broadcast group id of leader is unknown. Broadcast MsgAppend + // as normal. + let mut use_leader_replication = !self.follower_replication(); + if !use_leader_replication { + leader_group_id = self + .prs() + .get(self_id) + .map_or(0, |pr| pr.broadcast_group_id); + use_leader_replication = leader_group_id == 0; + } + if use_leader_replication { + let core = &mut self.r; + let msgs = &mut self.msgs; + self.prs + .iter_mut() + .filter(|&(id, _)| *id != self_id) + .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); + return; + } + + // If follower replication is enabled, MsgAppends sent to the same broadcast group + // will be merge into a MsgGroupBroadcast. + // + // Messages that needs to be forwarded are stored in cache temporarily, + // and they are grouped by broadcast_group_id of progress. + // Messages in msg_group will be pushed to message queue later. let core = &mut self.r; let msgs = &mut self.msgs; - self.prs - .iter_mut() + let prs = &mut self.prs.progress; + let conf = &self.prs.conf; + prs.iter_mut() .filter(|&(id, _)| *id != self_id) - .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); + .for_each(|(id, pr)| { + let mut tmp_msgs = Vec::default(); + // Let messages be pushed into tmp_vec firstly. + core.send_append(*id, pr, &mut tmp_msgs); + // Filter out messages that need to be forwarded into msg_group. Other messages + // are sent directly. + if pr.broadcast_group_id == leader_group_id || !pr.is_replicating() { + msgs.extend(tmp_msgs); + return; + } + let is_voter = conf.voters().contains(*id); + for msg in tmp_msgs { + if msg.get_msg_type() != MessageType::MsgAppend { + msgs.push(msg); + } else { + // Search the target group. + let mut group_idx = None; + for (idx, (group_id, _)) in core.msg_group.iter().enumerate() { + if *group_id == pr.broadcast_group_id { + group_idx = Some(idx); + break; + } + } + // The agent must be a voter and active recently. + let msg_forward = (msg, pr.recent_active && is_voter); + if let Some(idx) = group_idx { + core.msg_group[idx].1.push(msg_forward) + } else { + core.msg_group + .push((pr.broadcast_group_id, vec![msg_forward])); + } + } + } + }); + + // Merge messages in the same broadcast group and send them. + for (_, mut group) in core.msg_group.drain(..) { + let mut need_merge = group.len() > 1; + let mut agent_msg_idx = None; + if need_merge { + // If found an appropriate agent, return the index of agent's message. Otherwise, return None. + agent_msg_idx = group + .iter() + .position(|(_, is_agent_candidate)| *is_agent_candidate); + need_merge = agent_msg_idx.is_some(); + } + // Do not need to merge if group size is less than two. Or there is no appropriate agent. + if !need_merge { + msgs.append(&mut group.into_iter().map(|(msg, _)| msg).collect()); + continue; + } + + // Record forward information + let mut forwards: Vec = Vec::default(); + for (idx, (msg, _)) in group.iter().enumerate() { + // MsgAppend sent to other peers in this group only reserve basic forward information. 
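+                // The agent's own MsgAppend keeps its log entries and is converted into the
+                // MsgGroupBroadcast pushed after this loop.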
+ if idx != agent_msg_idx.unwrap() { + let forward = Forward { + to: msg.to, + log_term: msg.log_term, + index: msg.index, + ..Default::default() + }; + forwards.push(forward); + } + } + // Attach forward information to MsgGroupbroadcast and send it. + let mut agent_msg = group.swap_remove(agent_msg_idx.unwrap()).0; + agent_msg.set_msg_type(MessageType::MsgGroupBroadcast); + agent_msg.set_forwards(forwards.into()); + msgs.push(agent_msg); + } + } + + /// Forwards an append RPC from the leader to the given peer. + pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) { + self.r + .send_forward(from, commit, commit_term, forward, &mut self.msgs); } /// Broadcasts heartbeats to all the followers if it's leader. @@ -1372,6 +1592,7 @@ impl Raft { if m.get_msg_type() == MessageType::MsgAppend || m.get_msg_type() == MessageType::MsgHeartbeat || m.get_msg_type() == MessageType::MsgSnapshot + || m.get_msg_type() == MessageType::MsgGroupBroadcast { self.become_follower(m.term, m.from); } else { @@ -1381,7 +1602,8 @@ impl Raft { } else if m.term < self.term { if (self.check_quorum || self.pre_vote) && (m.get_msg_type() == MessageType::MsgHeartbeat - || m.get_msg_type() == MessageType::MsgAppend) + || m.get_msg_type() == MessageType::MsgAppend + || m.get_msg_type() == MessageType::MsgGroupBroadcast) { // We have received messages from a leader at a lower term. It is possible // that these messages were simply delayed in the network, but this could @@ -1810,6 +2032,20 @@ impl Raft { } } + fn handle_group_broadcast_response(&mut self, m: &Message) { + if m.reject { + // The agent failed to forward MsgAppend, so the leader re-sends it. + for forward in m.get_forwards() { + info!( + self.logger, + "The agent's index is {} while target peer's index is {}", + m.get_index(), + forward.get_index(); + ); + } + } + } + fn handle_heartbeat_response(&mut self, m: &Message) { // Update the node. Drop the value explicitly since we'll check the qourum after. let pr = match self.prs.get_mut(m.from) { @@ -2132,6 +2368,9 @@ impl Raft { MessageType::MsgAppendResponse => { self.handle_append_response(&m); } + MessageType::MsgGroupBroadcastResponse => { + self.handle_group_broadcast_response(&m); + } MessageType::MsgHeartbeatResponse => { self.handle_heartbeat_response(&m); } @@ -2258,6 +2497,11 @@ impl Raft { self.become_follower(m.term, m.from); self.handle_append_entries(&m); } + MessageType::MsgGroupBroadcast => { + debug_assert_eq!(self.term, m.term); + self.become_follower(m.term, m.from); + self.handle_group_broadcast(&m); + } MessageType::MsgHeartbeat => { debug_assert_eq!(self.term, m.term); self.become_follower(m.term, m.from); @@ -2314,6 +2558,11 @@ impl Raft { self.leader_id = m.from; self.handle_append_entries(&m); } + MessageType::MsgGroupBroadcast => { + self.election_elapsed = 0; + self.leader_id = m.from; + self.handle_group_broadcast(&m); + } MessageType::MsgHeartbeat => { self.election_elapsed = 0; self.leader_id = m.from; @@ -2425,13 +2674,14 @@ impl Raft { Err(Error::RequestSnapshotDropped) } - // TODO: revoke pub when there is a better way to test. - /// For a given message, append the entries to the log. - pub fn handle_append_entries(&mut self, m: &Message) { + /// Try to append entries, and return the append result. + /// Return true only if the entries in the message has been appended in the log successfully. 
+ pub fn try_append_entries(&mut self, m: &Message) -> bool { if self.pending_request_snapshot != INVALID_INDEX { self.send_request_snapshot(); - return; + return false; } + if m.index < self.raft_log.committed { debug!( self.logger, @@ -2443,13 +2693,14 @@ impl Raft { to_send.index = self.raft_log.committed; to_send.commit = self.raft_log.committed; self.r.send(to_send, &mut self.msgs); - return; + return false; } let mut to_send = Message::default(); to_send.to = m.from; to_send.set_msg_type(MessageType::MsgAppendResponse); + let mut success = true; if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) @@ -2458,7 +2709,7 @@ impl Raft { } else { debug!( self.logger, - "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \ + "reject append [logterm: {msg_log_term}, index: {msg_index}] \ from {from}", msg_log_term = m.log_term, msg_index = m.index, @@ -2483,9 +2734,64 @@ impl Raft { to_send.reject = true; to_send.reject_hint = hint_index; to_send.log_term = hint_term.unwrap(); + success = false; } to_send.set_commit(self.raft_log.committed); self.r.send(to_send, &mut self.msgs); + success + } + + // TODO: revoke pub when there is a better way to test. + /// For a given message, append the entries to the log. + pub fn handle_append_entries(&mut self, m: &Message) { + self.try_append_entries(m); + } + + /// For a group broadcast, append entries to local log and forward MsgAppend to other dest. + /// The usage of group broadcast is in examples/follower_replication/main.rs. + pub fn handle_group_broadcast(&mut self, m: &Message) { + // The agent should handle appending log entries in MsgGroupBroadcast firstly, in order to + // guarantee that agent's local raft log is identical to leader's log up through agent's last index. + // If the agent fails to append log entries, agent's log cannot be used for frowarding. + if !self.try_append_entries(m) { + // If the agent fails to append log entries, there are three cases. + // 1. The agent is pending request snapshot. To avoid dead loop, upper layer + // should only send MsgGroupBroadcast to peers which are in Replicate state. + // 2. Log entries in MsgGroupBroadcast is conflict with committed. + // 3. Log entries in MsgGroupBroadcast is conflict with agent's log. + // + // If the agent's raft log might be conflict with leader's raft log, + // it just sends empty MsgAppends to target peers. + + for forward in m.get_forwards() { + let mut m_append = Message::default(); + m_append.to = forward.to; + m_append.from = m.from; + m_append.commit = m.commit; + m_append.commit_term = m.commit_term; + m_append.set_msg_type(MessageType::MsgAppend); + m_append.index = forward.get_index(); + m_append.log_term = forward.get_log_term(); + self.r.send(m_append, &mut self.msgs); + } + info!( + self.logger, + "the agent rejects append [logterm: {msg_log_term}, index: {msg_index}] \ + from {from}", + msg_log_term = m.log_term, + msg_index = m.index, + from = m.from; + "index" => m.index, + "logterm" => ?self.raft_log.term(m.index), + ); + return; + } + + // Pack MsgAppend with local raft log and forward to target peers. + for forward in m.get_forwards() { + self.r + .send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs); + } } // TODO: revoke pub when there is a better way to test. @@ -2893,4 +3199,14 @@ impl Raft { pr.ins.set_cap(cap); } } + + /// Whether two peers are in the same broadcast group. 
+ pub fn is_in_same_broadcast_group(&self, id: u64, id_other: u64) -> bool { + let group_id = self.prs().get(id).map_or(0, |pr| pr.broadcast_group_id); + let other_group_id = self + .prs() + .get(id_other) + .map_or(0, |pr| pr.broadcast_group_id); + group_id != 0 && other_group_id != 0 && group_id == other_group_id + } } diff --git a/src/raw_node.rs b/src/raw_node.rs index a15a1489..d315339a 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -439,6 +439,24 @@ impl RawNode { self.raft.send_append(to) } } + GetEntriesFor::SendForward { + from, + commit, + commit_term, + term, + forward, + } => { + if self.raft.term != term || self.raft.state == StateRole::Leader { + // term or leadership has changed + // this peer is not the agent + return; + } + if self.raft.prs().get(forward.to).is_none() { + // the peer has been removed, do nothing + return; + } + self.raft.send_forward(from, commit, commit_term, &forward); + } GetEntriesFor::Empty(can_async) if can_async => {} _ => panic!("shouldn't call callback on non-async context"), } diff --git a/src/storage.rs b/src/storage.rs index f9b71fab..e05779bb 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -70,6 +70,7 @@ impl GetEntriesContext { pub fn can_async(&self) -> bool { match self.0 { GetEntriesFor::SendAppend { .. } => true, + GetEntriesFor::SendForward { .. } => true, GetEntriesFor::Empty(can_async) => can_async, _ => false, } @@ -87,6 +88,19 @@ pub(crate) enum GetEntriesFor { /// whether to exhaust all the entries aggressively: bool, }, + // for forwarding entries to followers + SendForward { + /// the peer id from which the entries are forwarded + from: u64, + /// the commit index in MsgGroupbroadcast + commit: u64, + /// the commit term in MsgGroupbroadcast + commit_term: u64, + /// the term when the request is issued + term: u64, + /// the forward information in MsgGroupbroadcast + forward: Forward, + }, // for getting committed entries in a ready GenReady, // for getting entries to check pending conf when transferring leader diff --git a/src/tracker.rs b/src/tracker.rs index 5424e7bf..1c6a4046 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -190,11 +190,11 @@ impl AckedIndexer for ProgressMap { /// which could be `Leader`, `Follower` and `Learner`. #[derive(Clone, Getters)] pub struct ProgressTracker { - progress: ProgressMap, + pub(crate) progress: ProgressMap, /// The current configuration state of the cluster. #[get = "pub"] - conf: Configuration, + pub(crate) conf: Configuration, #[doc(hidden)] #[get = "pub"] votes: HashMap, diff --git a/src/tracker/progress.rs b/src/tracker/progress.rs index 2f86f5a7..cedb88cd 100644 --- a/src/tracker/progress.rs +++ b/src/tracker/progress.rs @@ -51,6 +51,10 @@ pub struct Progress { /// Only logs replicated to different group will be committed if any group is configured. pub commit_group_id: u64, + /// Leader only replicates log entries to the agent of each group, + /// and the agent broadcasts logs within a group. + pub broadcast_group_id: u64, + /// Committed index in raft_log pub committed_index: u64, } @@ -68,6 +72,7 @@ impl Progress { recent_active: false, ins: Inflights::new(ins_size), commit_group_id: 0, + broadcast_group_id: 0, committed_index: 0, } } @@ -203,6 +208,12 @@ impl Progress { true } + /// Determine whether progress is in the Replicate state; + #[inline] + pub fn is_replicating(&self) -> bool { + self.state == ProgressState::Replicate + } + /// Determine whether progress is paused. #[inline] pub fn is_paused(&self) -> bool {
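A minimal usage sketch of the API this patch adds (not part of the diff): it enables `follower_replication` in `Config` and groups peers with `assign_broadcast_groups`. The three-peer layout and group ids below are illustrative only.

use raft::{default_logger, storage::MemStorage, Config, Raft};

fn main() {
    // Enable follower replication and describe a three-peer cluster (illustrative ids).
    let mut config = Config::new(1);
    config.election_tick = 10;
    config.heartbeat_tick = 1;
    config.follower_replication = true;
    let storage = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![]));
    let mut raft = Raft::new(&config, storage, &default_logger()).unwrap();
    // Group ids must be greater than 0; the mapping lives only in memory,
    // so it has to be reassigned after a restart or an applied snapshot.
    raft.assign_broadcast_groups(&[(1, 1), (2, 2), (3, 2)]);
    assert!(raft.is_in_same_broadcast_group(2, 3));
}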