Skip to content

Commit

Permalink
raftstore: skip empty callback (tikv#4682)
Browse files Browse the repository at this point in the history
* add test cases

Signed-off-by: Jay Lee <busyjaylee@gmail.com>

* skip empty callback

Signed-off-by: Jay Lee <busyjaylee@gmail.com>

* address comment

Signed-off-by: Jay Lee <busyjaylee@gmail.com>
  • Loading branch information
BusyJay authored and kennytm committed May 28, 2019
1 parent 2d45009 commit e437a87
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 13 deletions.
33 changes: 22 additions & 11 deletions src/raftstore/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ pub struct PendingCmdQueue {
}

impl PendingCmdQueue {
fn pop_normal(&mut self, term: u64) -> Option<PendingCmd> {
fn pop_normal(&mut self, index: u64, term: u64) -> Option<PendingCmd> {
self.normals.pop_front().and_then(|cmd| {
if self.normals.capacity() > SHRINK_PENDING_CMD_QUEUE_CAP
&& self.normals.len() < SHRINK_PENDING_CMD_QUEUE_CAP
{
self.normals.shrink_to_fit();
}
if cmd.term > term {
if (cmd.term, cmd.index) > (term, index) {
self.normals.push_front(cmd);
return None;
}
Expand Down Expand Up @@ -784,14 +784,17 @@ impl ApplyDelegate {
return self.process_raft_cmd(apply_ctx, index, term, cmd);
}

// when a peer become leader, it will send an empty entry.
let mut state = self.apply_state.clone();
state.set_applied_index(index);
self.apply_state = state;
self.applied_index_term = term;
assert!(term > 0);
while let Some(mut cmd) = self.pending_cmds.pop_normal(term - 1) {
// apprently, all the callbacks whose term is less than entry's term are stale.

// 1. When a peer become leader, it will send an empty entry.
// 2. When a leader tries to read index during transferring leader,
// it will also propose an empty entry. But that entry will not contain
// any associated callback. So no need to clear callback.
while let Some(mut cmd) = self.pending_cmds.pop_normal(std::u64::MAX, term - 1) {
apply_ctx
.cbs
.last_mut()
Expand Down Expand Up @@ -842,13 +845,21 @@ impl ApplyDelegate {
}
return None;
}
while let Some(mut head) = self.pending_cmds.pop_normal(term) {
if head.index == index && head.term == term {
return Some(head.cb.take().unwrap());
while let Some(mut head) = self.pending_cmds.pop_normal(index, term) {
if head.term == term {
if head.index == index {
return Some(head.cb.take().unwrap());
} else {
panic!(
"{} unexpected callback at term {}, found index {}, expected {}",
self.tag, term, head.index, index
);
}
} else {
// Because of the lack of original RaftCmdRequest, we skip calling
// coprocessor here.
notify_stale_command(region_id, peer_id, self.term, head);
}
// Because of the lack of original RaftCmdRequest, we skip calling
// coprocessor here.
notify_stale_command(region_id, peer_id, self.term, head);
}
None
}
Expand Down
7 changes: 7 additions & 0 deletions src/raftstore/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ impl Callback {
other => panic!("expect Callback::Read(..), got {:?}", other),
}
}

pub fn is_none(&self) -> bool {
match self {
Callback::None => true,
_ => false,
}
}
}

impl fmt::Debug for Callback {
Expand Down
7 changes: 5 additions & 2 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1559,8 +1559,11 @@ impl Peer {
fn post_propose(&mut self, mut meta: ProposalMeta, is_conf_change: bool, cb: Callback) {
// Try to renew leader lease on every consistent read/write request.
meta.renew_lease_time = Some(monotonic_raw_now());
let p = Proposal::new(is_conf_change, meta.index, meta.term, cb);
self.apply_proposals.push(p);

if !cb.is_none() {
let p = Proposal::new(is_conf_change, meta.index, meta.term, cb);
self.apply_proposals.push(p);
}

self.proposals.push(meta);
}
Expand Down
20 changes: 20 additions & 0 deletions tests/integrations/raftstore/test_lease_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,26 @@ fn test_node_callback_when_destroyed() {
);
}

/// Test if the callback proposed by read index is cleared correctly.
#[test]
fn test_lease_read_callback_destroy() {
// Only server cluster can fake sending message successfully in raftstore layer.
let mut cluster = new_server_cluster(0, 3);
// Increase the Raft tick interval to make this test case running reliably.
let election_timeout = configure_for_lease_read(&mut cluster, Some(50), None);
cluster.run();
cluster.must_transfer_leader(1, new_peer(1, 1));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
// Isolate the target peer to make transfer leader fail.
cluster.add_send_filter(IsolationFilterFactory::new(3));
cluster.transfer_leader(1, new_peer(3, 3));
thread::sleep(election_timeout * 2);
// Trigger ReadIndex on the leader.
assert_eq!(cluster.must_get(b"k1"), Some(b"v1".to_vec()));
cluster.must_put(b"k2", b"v2");
}

#[test]
fn test_read_index_when_transfer_leader() {
let mut cluster = new_node_cluster(0, 3);
Expand Down

0 comments on commit e437a87

Please sign in to comment.