Skip to content

Commit

Permalink
fix clippy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
markcty committed Nov 7, 2022
1 parent 29fab99 commit 15f9687
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 29 deletions.
2 changes: 1 addition & 1 deletion curp/src/channel/key_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<K, M> MpscKeysMessage<K, M> {
impl<K: Eq + Hash + Clone + ConflictCheck, M> KeyBasedChannel<MpscKeysMessage<K, M>> {
/// Insert successors and predecessors info
fn insert_graph(&mut self, new_km: MpscKeysMessage<K, M>) {
let predecessor_cnt = self
let predecessor_cnt: u64 = self
.successor
.iter_mut()
.filter_map(|(k, v)| {
Expand Down
4 changes: 4 additions & 0 deletions curp/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ impl CommitResponse {
}

impl AppendEntriesRequest {
/// Create a new heartbeat request
pub(crate) fn new_heart_beat(
term: TermNum,
leader_id: u64,
Expand All @@ -323,6 +324,7 @@ impl AppendEntriesRequest {
}
}

/// Get log entries
pub(crate) fn entries<C: Command>(&self) -> bincode::Result<Vec<LogEntry<C>>> {
self.entries
.iter()
Expand All @@ -332,13 +334,15 @@ impl AppendEntriesRequest {
}

impl AppendEntriesResponse {
/// Create a new rejected response
pub(crate) fn new_reject(term: TermNum) -> Self {
Self {
term,
success: false,
}
}

/// Create a new accepted response
pub(crate) fn new_accept(term: TermNum) -> Self {
Self {
term,
Expand Down
45 changes: 24 additions & 21 deletions curp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ where
&self,
request: tonic::Request<AppendEntriesRequest>,
) -> Result<tonic::Response<AppendEntriesResponse>, tonic::Status> {
self.inner.append_entries(request).await
self.inner.append_entries(request)
}
}

Expand Down Expand Up @@ -226,6 +226,7 @@ pub struct Protocol<C: Command + 'static, CE: CommandExecutor<C> + 'static> {
cmd_board: Arc<Mutex<HashMap<ProposeId, CmdBoardValue>>>,
}

/// State of the server
pub(crate) struct State<C: Command + 'static> {
/// Role of the server
pub(crate) role: ServerRole,
Expand All @@ -246,7 +247,7 @@ pub(crate) struct State<C: Command + 'static> {

impl<C: Command + 'static> State<C> {
/// Init server state
pub(crate) fn new(role: ServerRole, term: TermNum, others: &Vec<String>) -> Self {
pub(crate) fn new(role: ServerRole, term: TermNum, others: &[String]) -> Self {
let mut next_index = HashMap::new();
let mut match_index = HashMap::new();
for other in others.iter() {
Expand All @@ -256,7 +257,7 @@ impl<C: Command + 'static> State<C> {
Self {
role,
term,
log: vec![LogEntry::new(0, &vec![], EntryStatus::Synced)], // a fake log[0] will simplify the boundary check significantly
log: vec![LogEntry::new(0, &[], EntryStatus::Synced)], // a fake log[0] will simplify the boundary check significantly
commit_index: 0,
next_index, // TODO: next_index should be initialized upon becoming a leader
match_index,
Expand All @@ -269,12 +270,13 @@ impl<C: Command + 'static> State<C> {
}

/// Last log index
#[allow(clippy::integer_arithmetic)] // log.len() >= 1 because we have a fake log[0]
pub(crate) fn last_log_index(&self) -> usize {
self.log.len() - 1 // this should not overflow since we have a fake log[0] that should never be deleted
self.log.len() - 1
}

/// Last log term
#[allow(dead_code)]
#[allow(dead_code, clippy::integer_arithmetic, clippy::indexing_slicing)] // log.len() >= 1 because we have a fake log[0]
pub(crate) fn last_log_term(&self) -> TermNum {
self.log[self.log.len() - 1].term
}
Expand Down Expand Up @@ -763,23 +765,21 @@ impl<C: 'static + Command, CE: 'static + CommandExecutor<C>> Protocol<C, CE> {
&& matches!(log[index.wrapping_sub(1)].status(), &EntryStatus::UnSynced))
{
Ok(CommitResponse::new_prev_not_ready(local_len.numeric_cast()))
} else if cr.term() < term {
Ok(CommitResponse::new_wrong_term(term))
} else if index < local_len && log[index].term() > term {
Ok(CommitResponse::new_wrong_term(log[index].term()))
} else {
if cr.term() < term {
Ok(CommitResponse::new_wrong_term(term))
} else if index < local_len && log[index].term() > term {
Ok(CommitResponse::new_wrong_term(log[index].term()))
if index < local_len {
log[index] = LogEntry::new(cr.term(), &cmds, EntryStatus::Synced);
} else {
if index < local_len {
log[index] = LogEntry::new(cr.term(), &cmds, EntryStatus::Synced);
} else {
log.push(LogEntry::new(cr.term(), &cmds, EntryStatus::Synced));
}
log.push(LogEntry::new(cr.term(), &cmds, EntryStatus::Synced));
}

for id in cmds.iter().map(|c| c.id()) {
let _ignore = Self::spec_remove_cmd(&Arc::clone(&self.spec), id);
}
Ok(CommitResponse::new_committed())
for id in cmds.iter().map(|c| c.id()) {
let _ignore = Self::spec_remove_cmd(&Arc::clone(&self.spec), id);
}
Ok(CommitResponse::new_committed())
}
});

Expand Down Expand Up @@ -809,7 +809,8 @@ impl<C: 'static + Command, CE: 'static + CommandExecutor<C>> Protocol<C, CE> {
})
}

async fn append_entries(
/// Handle `AppendEntries` requests
fn append_entries(
&self,
request: tonic::Request<AppendEntriesRequest>,
) -> Result<tonic::Response<AppendEntriesResponse>, tonic::Status> {
Expand All @@ -827,8 +828,10 @@ impl<C: 'static + Command, CE: 'static + CommandExecutor<C>> Protocol<C, CE> {
}

// check if previous log index match leader's one
if state.last_log_index() < req.prev_log_index.numeric_cast()
|| state.log[req.prev_log_index as usize].term != req.prev_log_term
if state
.log
.get(req.prev_log_index.numeric_cast::<usize>())
.map_or(false, |entry| entry.term == req.prev_log_term)
{
return Ok(tonic::Response::new(AppendEntriesResponse::new_reject(
state.term,
Expand Down
19 changes: 13 additions & 6 deletions curp/src/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,15 @@ impl<C: Command + 'static> SyncManager<C> {
}
}

/// Send heartbeat to a server
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)] // log.len() >= 1 because we have a fake log[0], indexing of `next_index` or `match_index` won't panic because we created an entry when initializing the server state
async fn heartbeat(connect: Arc<Connect>, state: Arc<RwLock<State<C>>>) {
let (term, commit_index, prev_log_index, prev_log_term) = state.map_read(|state| {
let (term, commit_index, prev_log_index, prev_log_term) = state.map_read(|st| {
(
state.term,
state.commit_index,
state.next_index[&connect.addr] - 1,
state.log[state.next_index[&connect.addr] - 1].term,
st.term,
st.commit_index,
st.next_index[&connect.addr] - 1,
st.log[st.next_index[&connect.addr] - 1].term,
)
});
debug!("heartbeat sent to {}", connect.addr);
Expand All @@ -344,11 +346,16 @@ impl<C: Command + 'static> SyncManager<C> {
let mut state = state.write();
if res.success {
if let Some(index) = state.match_index.get_mut(&connect.addr) {
*index = max(*index, prev_log_index)
*index = max(*index, prev_log_index);
} else {
unreachable!("no match_index for server {}", connect.addr);
}
} else {
#[allow(clippy::collapsible_else_if)] // This way the code is clearer
if let Some(index) = state.next_index.get_mut(&connect.addr) {
*index -= 1;
} else {
unreachable!("no next_index for server {}", connect.addr);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion curp/tests/synced_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn synced_propose() {
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
(TestCommandResult::GetResult("".to_owned()), 0)
(TestCommandResult::GetResult("".to_owned()), 1) // log[0] is a fake one
);

for _ in 0..3 {
Expand Down

0 comments on commit 15f9687

Please sign in to comment.