Skip to content

Commit

Permalink
async log entries fetch for forwarding
Browse files Browse the repository at this point in the history
Signed-off-by: LintianShi <lintian.shi@pingcap.com>
  • Loading branch information
LintianShi committed Sep 25, 2022
1 parent 1fcdb47 commit eed70bc
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 96 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ pub use raw_node::is_empty_snap;
pub use raw_node::{LightReady, Peer, RawNode, Ready, SnapshotStatus};
pub use read_only::{ReadOnlyOption, ReadState};
pub use status::Status;
pub use storage::{GetEntriesContext, RaftState, Storage};
pub use storage::{GetEntriesContext, GetEntriesFor, RaftState, Storage};
pub use tracker::{Inflights, Progress, ProgressState, ProgressTracker};
pub use util::majority;

Expand Down
147 changes: 74 additions & 73 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,63 @@ impl<T: Storage> RaftCore<T> {
true
}

fn send_forward(
&mut self,
from: u64,
commit: u64,
commit_term: u64,
forward: &Forward,
msgs: &mut Vec<Message>,
) {
let mut m = Message::default();
m.to = forward.to;
m.from = from;
m.commit = commit;
m.commit_term = commit_term;
m.set_msg_type(MessageType::MsgAppend);
// Fetch log entries from the forward.index to the last index of log.
if self
.raft_log
.match_term(forward.get_index(), forward.get_log_term())
{
let ents = self.raft_log.entries(
forward.get_index() + 1,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendForward {
from,
commit,
commit_term,
term: self.term,
forward: forward.clone(),
}),
);

match ents {
Ok(ents) => {
m.index = forward.get_index();
m.log_term = forward.get_log_term();
m.set_entries(ents.into());
self.send(m, msgs);
}
Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {}
_ => {
// Forward MsgAppend with empty entries in order to update commit
// or trigger decrementing next_idx.
m.index = forward.get_index();
m.log_term = forward.get_log_term();
self.send(m, msgs);
warn!(
self.logger,
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
forward.get_index(),
forward.get_log_term(),
forward.get_to()
);
}
}
}
}

// send_heartbeat sends an empty MsgAppend
fn send_heartbeat(
&mut self,
Expand Down Expand Up @@ -904,6 +961,12 @@ impl<T: Storage> Raft<T> {
.for_each(|(id, pr)| core.send_append(*id, pr, msgs));
}

/// Forwards an append RPC from the leader to the given peer.
pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) {
self.r
.send_forward(from, commit, commit_term, forward, &mut self.msgs);
}

/// Broadcasts heartbeats to all the followers if it's leader.
pub fn ping(&mut self) {
if self.state == StateRole::Leader {
Expand Down Expand Up @@ -2541,59 +2604,20 @@ impl<T: Storage> Raft<T> {
// If the agent fails to append entries from the leader,
// the agent cannot forward MsgAppend.
for forward in m.get_forwards() {
// Fetch log entries from the forward.index to the last index of log.
if self
.raft_log
.match_term(forward.get_index(), forward.get_log_term())
{
let ents = self.raft_log.entries(
forward.get_index() + 1,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendAppend {
to: forward.get_to(),
term: m.term,
aggressively: false,
}),
);

match ents {
Ok(ents) => {
let mut m_append = Message::default();
m_append.to = forward.get_to();
m_append.from = m.get_from();
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
m_append.set_entries(ents.into());
m_append.commit = m.get_commit();
m_append.commit_term = m.get_commit_term();
self.r.send(m_append, &mut self.msgs);
}
Err(_) => {
self.dummy_forward(m, forward);
warn!(
self.logger,
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
forward.get_index(),
forward.get_log_term(),
forward.get_to()
);
}
}
} else {
self.dummy_forward(m, forward);
warn!(
self.logger,
"The agent's log does not match with index {} log term {} in forward message to peer {}.",
forward.get_index(),
forward.get_log_term(),
forward.get_to()
);
}
self.r
.send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs);
}
} else {
for forward in m.get_forwards() {
self.dummy_forward(m, forward);
let mut m_append = Message::default();
m_append.to = forward.to;
m_append.from = m.from;
m_append.commit = m.commit;
m_append.commit_term = m.commit_term;
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
self.r.send(m_append, &mut self.msgs);
}
info!(
self.logger,
Expand Down Expand Up @@ -2937,29 +2961,6 @@ impl<T: Storage> Raft<T> {
self.lead_transferee = None;
}

// Forward MsgAppend with empty entries in order to update commit
// or trigger decrementing next_idx.
fn dummy_forward(&mut self, m: &Message, forward: &Forward) {
let mut m_append = Message::default();
m_append.to = forward.get_to();
m_append.from = m.get_from();
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
m_append.commit = m.get_commit();
m_append.commit_term = m.get_commit_term();

info!(
self.logger,
"The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \
to peer {id}",
msg_log_term = forward.log_term,
msg_index = forward.index,
id = forward.to;
);
self.r.send(m_append, &mut self.msgs);
}

fn send_request_snapshot(&mut self) {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgAppendResponse);
Expand Down
13 changes: 13 additions & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,19 @@ impl<T: Storage> RawNode<T> {
self.raft.send_append(to)
}
}
GetEntriesFor::SendForward {
from,
commit,
commit_term,
term,
forward,
} => {
if self.raft.term != term {
// term has changed
return;
}
self.raft.send_forward(from, commit, commit_term, &forward);
}
GetEntriesFor::Empty(can_async) if can_async => {}
_ => panic!("shouldn't call callback on non-async context"),
}
Expand Down
59 changes: 37 additions & 22 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,44 @@ impl RaftState {
}
}

/// Purpose of getting entries.
#[derive(Debug)]
pub enum GetEntriesFor {
/// For sending entries to followers.
SendAppend {
/// The peer id to which the entries are going to send.
to: u64,
/// The term when the request is issued.
term: u64,
/// Whether to exhaust all the entries.
aggressively: bool,
},
/// For forwarding entries to followers.
SendForward {
/// The peer id from which the entries are forwarded.
from: u64,
/// The commit index in MsgGroupbroadcast.
commit: u64,
/// The commit term in MsgGroupbroadcast.
commit_term: u64,
/// The term when the request is issued.
term: u64,
/// The forward information in MsgGroupbroadcast.
forward: Forward,
},
/// For getting committed entries in a ready.
GenReady,
/// For getting entries to check pending conf when transferring leader.
TransferLeader,
/// For getting entries to check pending conf when forwarding commit index by vote messages
CommitByVote,
/// It's not called by the raft itself.
Empty(bool),
}

/// Records the context of the caller who calls entries() of Storage trait.
#[derive(Debug)]
pub struct GetEntriesContext(pub(crate) GetEntriesFor);
pub struct GetEntriesContext(pub GetEntriesFor);

impl GetEntriesContext {
/// Used for callers out of raft. Caller can customize if it supports async.
Expand All @@ -70,33 +105,13 @@ impl GetEntriesContext {
pub fn can_async(&self) -> bool {
match self.0 {
GetEntriesFor::SendAppend { .. } => true,
GetEntriesFor::SendForward { .. } => true,
GetEntriesFor::Empty(can_async) => can_async,
_ => false,
}
}
}

#[derive(Debug)]
pub(crate) enum GetEntriesFor {
// for sending entries to followers
SendAppend {
/// the peer id to which the entries are going to send
to: u64,
/// the term when the request is issued
term: u64,
/// whether to exhaust all the entries
aggressively: bool,
},
// for getting committed entries in a ready
GenReady,
// for getting entries to check pending conf when transferring leader
TransferLeader,
// for getting entries to check pending conf when forwarding commit index by vote messages
CommitByVote,
// It's not called by the raft itself
Empty(bool),
}

/// Storage saves all the information about the current Raft implementation, including Raft Log,
/// commit index, the leader to vote for, etc.
///
Expand Down

0 comments on commit eed70bc

Please sign in to comment.