Skip to content

Commit

Permalink
remove leader_id in RPC and other minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
markcty committed Nov 10, 2022
1 parent 3064e70 commit a3c58bc
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 63 deletions.
1 change: 0 additions & 1 deletion curp/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ message WaitSyncedResponse {

message AppendEntriesRequest {
uint64 term = 1;
uint64 leader_id = 2;
uint64 prev_log_index = 3;
uint64 prev_log_term = 4;
repeated bytes entries = 5;
Expand Down
2 changes: 0 additions & 2 deletions curp/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,13 @@ impl AppendEntriesRequest {
/// Create a new `append_entries` request
pub(crate) fn new<C: Command + Serialize>(
term: TermNum,
leader_id: u64,
prev_log_index: usize,
prev_log_term: TermNum,
entries: Vec<LogEntry<C>>,
leader_commit: usize,
) -> bincode::Result<Self> {
Ok(Self {
term,
leader_id,
prev_log_index: prev_log_index.numeric_cast(),
prev_log_term: prev_log_term.numeric_cast(),
entries: entries
Expand Down
11 changes: 5 additions & 6 deletions curp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,17 +725,16 @@ impl<C: 'static + Command, CE: 'static + CommandExecutor<C>> Protocol<C, CE> {
request: tonic::Request<AppendEntriesRequest>,
) -> Result<tonic::Response<AppendEntriesResponse>, tonic::Status> {
let req = request.into_inner();
debug!("append_entries received: term({}), leader({}), commit({}), prev_log_index({}), prev_log_term({}), {} entries",
req.term, req.leader_id, req.leader_commit, req.prev_log_index, req.prev_log_term, req.entries.len());
debug!("append_entries received: term({}), commit({}), prev_log_index({}), prev_log_term({}), {} entries",
req.term, req.leader_commit, req.prev_log_index, req.prev_log_term, req.entries.len());

let mut state = self.state.write();

// TODO: check leader's term

// remove inconsistencies
while state.last_log_index() > req.prev_log_index.numeric_cast() {
let _drop = state.log.pop();
}
#[allow(clippy::integer_arithmetic)] // TODO: overflow of log index should be prevented
state.log.truncate((req.prev_log_index + 1).numeric_cast());

// check if previous log index match leader's one
if state
Expand All @@ -760,7 +759,7 @@ impl<C: 'static + Command, CE: 'static + CommandExecutor<C>> Protocol<C, CE> {
if prev_commit_index != state.commit_index {
debug!("commit_index updated to {}", state.commit_index);
if let Err(e) = self.commit_trigger.send(()) {
error!("commit_trigger closed: {}", e);
error!("commit_trigger failed: {}", e);
}
}

Expand Down
108 changes: 54 additions & 54 deletions curp/src/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};

use clippy_utilities::NumericCast;
use futures::{stream::FuturesUnordered, StreamExt};
use parking_lot::lock_api::Mutex;
use parking_lot::lock_api::{Mutex, RwLockUpgradableReadGuard};
use parking_lot::{RawMutex, RwLock};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -152,7 +152,7 @@ impl<C: Command + 'static> SyncManager<C> {
let (ae_trigger, ae_trigger_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
// TODO: gracefully stop the background tasks
if self.state.read().is_leader() {
let _background_ping_handle = tokio::spawn(Self::bg_append_entries(
let _background_ae_handle = tokio::spawn(Self::bg_append_entries(
self.connects(),
Arc::clone(&self.state),
commit_trigger,
Expand All @@ -177,7 +177,7 @@ impl<C: Command + 'static> SyncManager<C> {
self.state.map_write(|mut state| {
state.log.push(LogEntry::new(term, &cmds));
if let Err(e) = ae_trigger.send(()) {
error!("ae_trigger closed: {}", e);
error!("ae_trigger failed: {}", e);
}

debug!(
Expand All @@ -188,12 +188,12 @@ impl<C: Command + 'static> SyncManager<C> {
}
}

/// Heartbeat Interval
/// Interval between sending `append_entries`
const APPEND_ENTRIES_INTERVAL: Duration = Duration::from_millis(150);
/// Heartbeat Timeout
/// `append_entries` request timeout
const APPEND_ENTRIES_TIMEOUT: Duration = Duration::from_millis(50);

/// Background ping, only work for the leader
/// Background `append_entries`, only works for the leader
async fn bg_append_entries(
connects: Vec<Arc<Connect>>,
state: Arc<RwLock<State<C>>>,
Expand Down Expand Up @@ -229,7 +229,7 @@ impl<C: Command + 'static> SyncManager<C> {
}
}

/// Send heartbeat to a server
/// Send `append_entries` to a server
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)] // log.len() >= 1 because we have a fake log[0], indexing of `next_index` or `match_index` won't panic because we created an entry when initializing the server state
async fn send_append_entries(
connect: Arc<Connect>,
Expand All @@ -239,22 +239,20 @@ impl<C: Command + 'static> SyncManager<C> {
) {
// prepare append_entries request args
#[allow(clippy::shadow_unrelated)] // clippy false positive
let (term, leader_id, prev_log_index, prev_log_term, entries, leader_commit) = state
.map_read(|state| {
let (term, prev_log_index, prev_log_term, entries, leader_commit) =
state.map_read(|state| {
let next_index = state.next_index[&connect.addr];
(
state.term,
0, // TODO: add leader_id
next_index - 1,
state.log[next_index - 1].term(),
state.log[next_index..].to_vec(),
state.commit_index,
)
});
let last_sent_index = prev_log_index + entries.len();
let last_sent_index = prev_log_index + entries.len(); // the index of the last log entry sent in the current request
let req = match AppendEntriesRequest::new(
term,
leader_id,
prev_log_index,
prev_log_term,
entries,
Expand All @@ -276,7 +274,7 @@ impl<C: Command + 'static> SyncManager<C> {
#[allow(clippy::unwrap_used)]
// indexing of `next_index` or `match_index` won't panic because we created an entry when initializing the server state
match resp {
Err(e) => warn!("Heartbeat error: {}", e),
Err(e) => warn!("append_entries error: {}", e),
Ok(resp) => {
let resp = resp.into_inner();
let mut state = state.write();
Expand All @@ -297,13 +295,13 @@ impl<C: Command + 'static> SyncManager<C> {
.count()
>= majority_cnt
{
debug!("commit_index updated to {last_sent_index}");
state.commit_index = last_sent_index;
debug!("commit_index updated to {last_sent_index}");
if let Err(e) = commit_trigger.send(()) {
error!("ae_trigger closed: {}", e);
error!("commit_trigger failed: {}", e);
}
if let Err(e) = ae_trigger.send(()) {
error!("ae_trigger closed: {}", e);
error!("ae_trigger failed: {}", e);
}
}
} else {
Expand All @@ -323,47 +321,49 @@ impl<C: Command + 'static> SyncManager<C> {
spec: Arc<Mutex<RawMutex, VecDeque<C>>>,
) {
while commit_notify.recv().await.is_some() {
if state.read().need_commit() {
let mut state = state.write();
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)]
// TODO: will last_applied overflow?
for i in (state.last_applied + 1)..=state.commit_index {
debug!("commit log[{}]", i);
for cmd in state.log[i].cmds().iter() {
// TODO: execution of leaders and followers should be merged
// leader commits a log by sending it through compl_chan
if state.is_leader() {
if comp_chan
.send(
cmd.keys(),
SyncCompleteMessage::new(i.numeric_cast(), Arc::clone(cmd)),
)
.is_err()
{
error!("The comp_chan is closed on the remote side");
break;
}
} else {
// followers execute commands directly
let cmd_executor = Arc::clone(&cmd_executor);
let cmd = Arc::clone(cmd);
let spec = Arc::clone(&spec);
let _handle = tokio::spawn(async move {
// TODO: execute command in parallel
// TODO: handle execution error
let _execute_result = cmd.execute(cmd_executor.as_ref()).await;
let _after_sync_result = cmd
.after_sync(cmd_executor.as_ref(), i.numeric_cast())
.await;
if Protocol::<C, CE>::spec_remove_cmd(&spec, cmd.id()).is_none() {
unreachable!("{:?} should be in the spec pool", cmd.id());
}
});
let state = state.upgradable_read();
if !state.need_commit() {
continue;
}

let mut state = RwLockUpgradableReadGuard::upgrade(state);
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)]
// TODO: overflow of log index should be prevented
for i in (state.last_applied + 1)..=state.commit_index {
for cmd in state.log[i].cmds().iter() {
// TODO: execution of leaders and followers should be merged
// leader commits a log by sending it through compl_chan
if state.is_leader() {
if comp_chan
.send(
cmd.keys(),
SyncCompleteMessage::new(i.numeric_cast(), Arc::clone(cmd)),
)
.is_err()
{
error!("The comp_chan is closed on the remote side");
break;
}
} else {
// followers execute commands directly
let cmd_executor = Arc::clone(&cmd_executor);
let cmd = Arc::clone(cmd);
let spec = Arc::clone(&spec);
let _handle = tokio::spawn(async move {
// TODO: execute command in parallel
// TODO: handle execution error
let _execute_result = cmd.execute(cmd_executor.as_ref()).await;
let _after_sync_result = cmd
.after_sync(cmd_executor.as_ref(), i.numeric_cast())
.await;
if Protocol::<C, CE>::spec_remove_cmd(&spec, cmd.id()).is_none() {
unreachable!("{:?} should be in the spec pool", cmd.id());
}
});
}
debug!("last_applied updated to {}", i);
state.last_applied = i;
}
state.last_applied = i;
debug!("log[{i}] committed, last_applied updated to {}", i);
}
}
info!("background apply stopped");
Expand Down

0 comments on commit a3c58bc

Please sign in to comment.