separate heartbeat and append_entries, each append_entry is responsible for only 1 log, will retry if failed
markcty committed Nov 11, 2022
1 parent eeb46de commit 85b8351
Showing 5 changed files with 148 additions and 83 deletions.
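The change, in short: when the leader appends a new log entry, it now pushes that entry's index onto an unbounded trigger channel; a background replication task picks up each index and sends exactly one log entry per append_entries, retrying until the follower acknowledges it, while a separate background task sends empty append_entries heartbeats on a fixed interval. The sketch below is a condensed, self-contained model of that split, not the actual curp code: the usize channel payload, the two-task split, and the timer mirror the diff, but every name, value, and println placeholder is illustrative only.

```rust
use std::time::Duration;
use tokio::sync::mpsc;

// Placeholder interval; the real code uses Self::APPEND_ENTRIES_INTERVAL.
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(150);

#[tokio::main]
async fn main() {
    // The trigger now carries the *index* of the newly appended log entry.
    let (ae_trigger, mut ae_trigger_rx) = mpsc::unbounded_channel::<usize>();

    // Replication task: driven only by the trigger, one log entry at a time.
    tokio::spawn(async move {
        while let Some(i) = ae_trigger_rx.recv().await {
            // In the real code this fans out to every follower, and each
            // per-follower task retries log[i] until it is acknowledged.
            println!("replicating log[{i}] until it succeeds");
        }
    });

    // Heartbeat task: driven only by a timer; entries are always empty.
    tokio::spawn(async {
        loop {
            tokio::time::sleep(HEARTBEAT_INTERVAL).await;
            println!("broadcasting heartbeat");
        }
    });

    // The sync manager appends a log entry and fires the trigger with its index.
    ae_trigger.send(1).expect("receiver alive");
    tokio::time::sleep(Duration::from_millis(400)).await;
}
```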
16 changes: 16 additions & 0 deletions curp/src/rpc.rs
@@ -197,6 +197,22 @@ impl AppendEntriesRequest {
})
}

/// Create a new `append_entries` heartbeat request
pub(crate) fn new_heartbeat(
term: TermNum,
prev_log_index: usize,
prev_log_term: TermNum,
leader_commit: usize,
) -> Self {
Self {
term,
prev_log_index: prev_log_index.numeric_cast(),
prev_log_term: prev_log_term.numeric_cast(),
entries: vec![],
leader_commit: leader_commit.numeric_cast(),
}
}

/// Get log entries
pub(crate) fn entries<C: Command>(&self) -> bincode::Result<Vec<LogEntry<C>>> {
self.entries
159 changes: 102 additions & 57 deletions curp/src/sync_manager.rs
@@ -149,14 +149,13 @@ impl<C: Command + 'static> SyncManager<C> {
spec: Arc<Mutex<RawMutex, VecDeque<C>>>,
) {
// notify when a broadcast of append_entries is needed immediately (a new log is received or committed)
let (ae_trigger, ae_trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
let (ae_trigger, ae_trigger_rx) = tokio::sync::mpsc::unbounded_channel::<usize>();
// TODO: gracefully stop the background tasks
if self.state.read().is_leader() {
let _background_ae_handle = tokio::spawn(Self::bg_append_entries(
self.connects(),
Arc::clone(&self.state),
commit_trigger,
ae_trigger.clone(),
ae_trigger_rx,
));
}
@@ -167,6 +166,8 @@
commit_trigger_rx,
spec,
));
let _background_heartbeat_handle =
tokio::spawn(Self::bg_heartbeat(self.connects(), Arc::clone(&self.state)));

loop {
let (cmds, term) = match self.fetch_sync_msgs().await {
@@ -176,13 +177,14 @@

self.state.map_write(|mut state| {
state.log.push(LogEntry::new(term, &cmds));
if let Err(e) = ae_trigger.send(()) {
if let Err(e) = ae_trigger.send(state.last_log_index()) {
error!("ae_trigger failed: {}", e);
}

debug!(
"received new log, index {:?}",
state.log.len().checked_sub(1)
"received new log, index {:?}, contains {} cmds",
state.log.len().checked_sub(1),
cmds.len()
);
});
}
@@ -198,27 +200,19 @@
connects: Vec<Arc<Connect>>,
state: Arc<RwLock<State<C>>>,
commit_trigger: UnboundedSender<()>,
ae_trigger_tx: UnboundedSender<()>,
mut ae_trigger_rx: UnboundedReceiver<()>,
mut ae_trigger_rx: UnboundedReceiver<usize>,
) {
#[allow(clippy::integer_arithmetic)] // tokio internal triggered
loop {
// wait for either timeout
tokio::select! {
_ = tokio::time::sleep(Self::APPEND_ENTRIES_INTERVAL) => {},
_ = ae_trigger_rx.recv() => {},
}

while let Some(i) = ae_trigger_rx.recv().await {
if state.read().is_leader() {
// send append_entries to each server in parallel
let rpcs: FuturesUnordered<_> = connects
.iter()
.map(|connect| {
Self::send_append_entries(
Self::send_log_until_succeed(
i,
Arc::clone(connect),
Arc::clone(&state),
commit_trigger.clone(),
ae_trigger_tx.clone(),
)
})
.collect();
@@ -229,28 +223,26 @@
}
}

/// Send `append_entries` to a server
/// Send `append_entries` containing a single log to a server
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)] // log.len() >= 1 because we have a fake log[0], indexing of `next_index` or `match_index` won't panic because we created an entry when initializing the server state
async fn send_append_entries(
async fn send_log_until_succeed(
i: usize,
connect: Arc<Connect>,
state: Arc<RwLock<State<C>>>,
commit_trigger: UnboundedSender<()>,
ae_trigger: UnboundedSender<()>,
) {
// prepare append_entries request args
#[allow(clippy::shadow_unrelated)] // clippy false positive
let (term, prev_log_index, prev_log_term, entries, leader_commit) =
state.map_read(|state| {
let next_index = state.next_index[&connect.addr];
(
state.term,
next_index - 1,
state.log[next_index - 1].term(),
state.log[next_index..].to_vec(),
i - 1,
state.log[i - 1].term(),
vec![state.log[i].clone()],
state.commit_index,
)
});
let last_sent_index = prev_log_index + entries.len(); // the index of the last log entry sent in the current request
let req = match AppendEntriesRequest::new(
term,
prev_log_index,
@@ -265,8 +257,89 @@
Ok(req) => req,
};

// send log[i] until succeed
loop {
debug!("append_entries sent to {}", connect.addr);
let resp = connect
.append_entries(req.clone(), Self::APPEND_ENTRIES_TIMEOUT)
.await;

#[allow(clippy::unwrap_used)]
// indexing of `next_index` or `match_index` won't panic because we created an entry when initializing the server state
match resp {
Err(e) => warn!("append_entries error: {}", e),
Ok(resp) => {
let resp = resp.into_inner();
// TODO: calibrate term: if response term is newer, stop retrying and convert to follower
if resp.success {
let mut state = state.write();
// update match_index and next_index
let match_index = state.match_index.get_mut(&connect.addr).unwrap();
if *match_index < i {
*match_index = i;
}
*state.next_index.get_mut(&connect.addr).unwrap() = *match_index + 1;

let min_replicated = (state.others.len() + 1) / 2;
// If the majority of servers has replicated the log, commit
if state.commit_index < i
&& state.log[i].term() == state.term
&& state
.others
.iter()
.filter(|addr| state.match_index[*addr] >= i)
.count()
>= min_replicated
{
state.commit_index = i;
debug!("commit_index updated to {i}");
if let Err(e) = commit_trigger.send(()) {
error!("commit_trigger failed: {}", e);
}
}
break;
}
}
}
}
}
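To make the majority arithmetic in send_log_until_succeed concrete, here is a worked example under the assumption of a five-node cluster, one leader plus four addresses in others; the numbers are illustrative only.

```rust
fn main() {
    // Assumed cluster: 1 leader + 4 followers listed in `others`.
    let others = 4_usize;
    let min_replicated = (others + 1) / 2; // = 2 followers
    assert_eq!(min_replicated, 2);
    // The leader's own copy plus 2 acknowledging followers is 3 of 5 servers
    // holding log[i], a strict majority, so commit_index may advance to i
    // and the commit trigger fires.
    assert!(2 * (1 + min_replicated) > others + 1);
}
```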

/// Background heartbeat, only works for the leader
async fn bg_heartbeat(connects: Vec<Arc<Connect>>, state: Arc<RwLock<State<C>>>) {
loop {
tokio::time::sleep(Self::APPEND_ENTRIES_INTERVAL).await;
if state.read().is_leader() {
// send append_entries to each server in parallel
let rpcs: FuturesUnordered<_> = connects
.iter()
.map(|connect| Self::send_heartbeat(Arc::clone(connect), Arc::clone(&state)))
.collect();
let _drop = tokio::spawn(async move {
let _drop: Vec<_> = rpcs.collect().await;
});
}
}
}

/// Send a heartbeat (an `append_entries` with no entries) to a server
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)] // log.len() >= 1 because we have a fake log[0], indexing of `next_index` or `match_index` won't panic because we created an entry when initializing the server state
async fn send_heartbeat(connect: Arc<Connect>, state: Arc<RwLock<State<C>>>) {
// prepare append_entries request args
#[allow(clippy::shadow_unrelated)] // clippy false positive
let (term, prev_log_index, prev_log_term, leader_commit) = state.map_read(|state| {
let next_index = state.next_index[&connect.addr];
(
state.term,
next_index - 1,
state.log[next_index - 1].term(),
state.commit_index,
)
});
let req =
AppendEntriesRequest::new_heartbeat(term, prev_log_index, prev_log_term, leader_commit);

// send append_entries request and receive response
debug!("append_entries sent to {}", connect.addr);
debug!("heartbeat sent to {}", connect.addr);
let resp = connect
.append_entries(req, Self::APPEND_ENTRIES_TIMEOUT)
.await;
@@ -277,37 +350,9 @@
Err(e) => warn!("append_entries error: {}", e),
Ok(resp) => {
let resp = resp.into_inner();
let mut state = state.write();
if resp.success && term == resp.term {
// update match_index and next_index
let match_index = state.match_index.get_mut(&connect.addr).unwrap();
if *match_index < last_sent_index {
*match_index = last_sent_index;
}
*state.next_index.get_mut(&connect.addr).unwrap() = *match_index + 1;

let min_replicated = (state.others.len() + 1) / 2;
// If the majority of servers has replicated the log, commit
if state.commit_index < last_sent_index
&& state.log[last_sent_index].term() == state.term
&& state
.others
.iter()
.filter(|addr| state.match_index[*addr] >= last_sent_index)
.count()
>= min_replicated
{
state.commit_index = last_sent_index;
debug!("commit_index updated to {last_sent_index}");
if let Err(e) = commit_trigger.send(()) {
error!("commit_trigger failed: {}", e);
}
if let Err(e) = ae_trigger.send(()) {
error!("ae_trigger failed: {}", e);
}
}
} else {
// TODO: update term
// TODO: calibrate term
if !resp.success {
let mut state = state.write();
*state.next_index.get_mut(&connect.addr).unwrap() -= 1;
}
}
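The next_index decrement in the failure branch of send_heartbeat is the usual Raft backtracking: each rejected append_entries moves prev_log_index one entry back until it reaches an index the follower's log agrees on, after which replication can resume from there. A small illustration of how that converges, with made-up indices:

```rust
fn main() {
    // Assumed scenario: the leader's next_index for a follower is 6, but the
    // follower's log only matches the leader's up to index 3.
    let mut next_index = 6_usize;
    let follower_matches_up_to = 3_usize;
    while next_index - 1 > follower_matches_up_to {
        next_index -= 1; // one decrement per rejected append_entries
    }
    // After two rejections, prev_log_index (next_index - 1) is 3, the follower
    // accepts, and entries from index 4 onward can be sent again.
    assert_eq!(next_index, 4);
}
```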
4 changes: 2 additions & 2 deletions curp/tests/common/mod.rs
@@ -135,8 +135,8 @@ pub async fn create_servers_client() -> (
"127.0.0.1:8767".to_owned(),
];

let (exe_tx, exe_rx) = mpsc::channel(10);
let (after_sync_tx, after_sync_rx) = mpsc::channel(10);
let (exe_tx, exe_rx) = mpsc::channel(100);
let (after_sync_tx, after_sync_rx) = mpsc::channel(100);

let exe_tx1 = exe_tx.clone();
let after_sync_tx1 = after_sync_tx.clone();
28 changes: 28 additions & 0 deletions curp/tests/concurrent_cmd.rs
@@ -0,0 +1,28 @@
use crate::common::{create_servers_client, TestCommand, TestCommandType};
use curp::cmd::ProposeId;
use std::time::Duration;
use tracing::debug;

mod common;

// #[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn concurrent_cmd() {
tracing_subscriber::fmt::init();
let (_exe_rx, _after_sync_rx, client) = create_servers_client().await;
debug!("started");

for i in 1..=20 {
let _res = client
.propose(TestCommand::new(
ProposeId::new(format!("id{i}")),
TestCommandType::Get,
vec![format!("id{i}")],
None,
))
.await;
}

// watch the log while doing sync
tokio::time::sleep(Duration::from_secs(1)).await;
}
24 changes: 0 additions & 24 deletions curp/tests/raft.rs

This file was deleted.
