Skip to content

Commit

Permalink
async log entries fetch for forwarding
Browse files Browse the repository at this point in the history
Signed-off-by: LintianShi <lintian.shi@pingcap.com>
  • Loading branch information
LintianShi committed Sep 29, 2022
1 parent d0834f5 commit 2fa5d20
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 73 deletions.
147 changes: 74 additions & 73 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,63 @@ impl<T: Storage> RaftCore<T> {
true
}

fn send_forward(
&mut self,
from: u64,
commit: u64,
commit_term: u64,
forward: &Forward,
msgs: &mut Vec<Message>,
) {
let mut m = Message::default();
m.to = forward.to;
m.from = from;
m.commit = commit;
m.commit_term = commit_term;
m.set_msg_type(MessageType::MsgAppend);
// Fetch log entries from the forward.index to the last index of log.
if self
.raft_log
.match_term(forward.get_index(), forward.get_log_term())
{
let ents = self.raft_log.entries(
forward.get_index() + 1,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendForward {
from,
commit,
commit_term,
term: self.term,
forward: forward.clone(),
}),
);

match ents {
Ok(ents) => {
m.index = forward.get_index();
m.log_term = forward.get_log_term();
m.set_entries(ents.into());
self.send(m, msgs);
}
Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {}
_ => {
// Forward MsgAppend with empty entries in order to update commit
// or trigger decrementing next_idx.
m.index = forward.get_index();
m.log_term = forward.get_log_term();
self.send(m, msgs);
warn!(
self.logger,
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
forward.get_index(),
forward.get_log_term(),
forward.get_to()
);
}
}
}
}

// send_heartbeat sends an empty MsgAppend
fn send_heartbeat(
&mut self,
Expand Down Expand Up @@ -904,6 +961,12 @@ impl<T: Storage> Raft<T> {
.for_each(|(id, pr)| core.send_append(*id, pr, msgs));
}

/// Forwards an append RPC from the leader to the given peer.
pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) {
self.r
.send_forward(from, commit, commit_term, forward, &mut self.msgs);
}

/// Broadcasts heartbeats to all the followers if it's leader.
pub fn ping(&mut self) {
if self.state == StateRole::Leader {
Expand Down Expand Up @@ -2541,59 +2604,20 @@ impl<T: Storage> Raft<T> {
// If the agent fails to append entries from the leader,
// the agent cannot forward MsgAppend.
for forward in m.get_forwards() {
// Fetch log entries from the forward.index to the last index of log.
if self
.raft_log
.match_term(forward.get_index(), forward.get_log_term())
{
let ents = self.raft_log.entries(
forward.get_index() + 1,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendAppend {
to: forward.get_to(),
term: m.term,
aggressively: false,
}),
);

match ents {
Ok(ents) => {
let mut m_append = Message::default();
m_append.to = forward.get_to();
m_append.from = m.get_from();
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
m_append.set_entries(ents.into());
m_append.commit = m.get_commit();
m_append.commit_term = m.get_commit_term();
self.r.send(m_append, &mut self.msgs);
}
Err(_) => {
self.dummy_forward(m, forward);
warn!(
self.logger,
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
forward.get_index(),
forward.get_log_term(),
forward.get_to()
);
}
}
} else {
self.dummy_forward(m, forward);
warn!(
self.logger,
"The agent's log does not match with index {} log term {} in forward message to peer {}.",
forward.get_index(),
forward.get_log_term(),
forward.get_to()
);
}
self.r
.send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs);
}
} else {
for forward in m.get_forwards() {
self.dummy_forward(m, forward);
let mut m_append = Message::default();
m_append.to = forward.to;
m_append.from = m.from;
m_append.commit = m.commit;
m_append.commit_term = m.commit_term;
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
self.r.send(m_append, &mut self.msgs);
}
info!(
self.logger,
Expand Down Expand Up @@ -2937,29 +2961,6 @@ impl<T: Storage> Raft<T> {
self.lead_transferee = None;
}

// Forward MsgAppend with empty entries in order to update commit
// or trigger decrementing next_idx.
fn dummy_forward(&mut self, m: &Message, forward: &Forward) {
let mut m_append = Message::default();
m_append.to = forward.get_to();
m_append.from = m.get_from();
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
m_append.commit = m.get_commit();
m_append.commit_term = m.get_commit_term();

info!(
self.logger,
"The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \
to peer {id}",
msg_log_term = forward.log_term,
msg_index = forward.index,
id = forward.to;
);
self.r.send(m_append, &mut self.msgs);
}

fn send_request_snapshot(&mut self) {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgAppendResponse);
Expand Down
18 changes: 18 additions & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,24 @@ impl<T: Storage> RawNode<T> {
self.raft.send_append(to)
}
}
GetEntriesFor::SendForward {
from,
commit,
commit_term,
term,
forward,
} => {
if self.raft.term != term || self.raft.state == StateRole::Leader {
// term or leadership has changed
// this peer is not the agent
return;
}
if self.raft.prs().get(forward.to).is_none() {
// the peer has been removed, do nothing
return;
}
self.raft.send_forward(from, commit, commit_term, &forward);
}
GetEntriesFor::Empty(can_async) if can_async => {}
_ => panic!("shouldn't call callback on non-async context"),
}
Expand Down
14 changes: 14 additions & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl GetEntriesContext {
pub fn can_async(&self) -> bool {
match self.0 {
GetEntriesFor::SendAppend { .. } => true,
GetEntriesFor::SendForward { .. } => true,
GetEntriesFor::Empty(can_async) => can_async,
_ => false,
}
Expand All @@ -87,6 +88,19 @@ pub(crate) enum GetEntriesFor {
/// whether to exhaust all the entries
aggressively: bool,
},
// for forwarding entries to followers
SendForward {
/// the peer id from which the entries are forwarded
from: u64,
/// the commit index in MsgGroupbroadcast
commit: u64,
/// the commit term in MsgGroupbroadcast
commit_term: u64,
/// the term when the request is issued
term: u64,
/// the forward information in MsgGroupbroadcast
forward: Forward,
},
// for getting committed entries in a ready
GenReady,
// for getting entries to check pending conf when transferring leader
Expand Down

0 comments on commit 2fa5d20

Please sign in to comment.