Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore: skip empty callback #4682

Merged
merged 10 commits into from
May 28, 2019
33 changes: 22 additions & 11 deletions src/raftstore/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ pub struct PendingCmdQueue {
}

impl PendingCmdQueue {
fn pop_normal(&mut self, term: u64) -> Option<PendingCmd> {
fn pop_normal(&mut self, index: u64, term: u64) -> Option<PendingCmd> {
self.normals.pop_front().and_then(|cmd| {
if self.normals.capacity() > SHRINK_PENDING_CMD_QUEUE_CAP
&& self.normals.len() < SHRINK_PENDING_CMD_QUEUE_CAP
{
self.normals.shrink_to_fit();
}
if cmd.term > term {
if (cmd.term, cmd.index) > (term, index) {
self.normals.push_front(cmd);
return None;
}
Expand Down Expand Up @@ -784,14 +784,17 @@ impl ApplyDelegate {
return self.process_raft_cmd(apply_ctx, index, term, cmd);
}

// when a peer become leader, it will send an empty entry.
let mut state = self.apply_state.clone();
state.set_applied_index(index);
self.apply_state = state;
self.applied_index_term = term;
assert!(term > 0);
while let Some(mut cmd) = self.pending_cmds.pop_normal(term - 1) {
// apprently, all the callbacks whose term is less than entry's term are stale.

// 1. When a peer become leader, it will send an empty entry.
// 2. When a leader tries to read index during transferring leader,
// it will also propose an empty entry. But that entry will not contain
// any associated callback. So no need to clear callback.
while let Some(mut cmd) = self.pending_cmds.pop_normal(std::u64::MAX, term - 1) {
apply_ctx
.cbs
.last_mut()
Expand Down Expand Up @@ -842,13 +845,21 @@ impl ApplyDelegate {
}
return None;
}
while let Some(mut head) = self.pending_cmds.pop_normal(term) {
if head.index == index && head.term == term {
return Some(head.cb.take().unwrap());
while let Some(mut head) = self.pending_cmds.pop_normal(index, term) {
if head.term == term {
if head.index == index {
return Some(head.cb.take().unwrap());
} else {
panic!(
"{} unexpected callback at term {}, found index {}, expected {}",
self.tag, term, head.index, index
);
}
} else {
// Because of the lack of original RaftCmdRequest, we skip calling
// coprocessor here.
notify_stale_command(region_id, peer_id, self.term, head);
}
// Because of the lack of original RaftCmdRequest, we skip calling
// coprocessor here.
notify_stale_command(region_id, peer_id, self.term, head);
}
None
}
Expand Down
7 changes: 7 additions & 0 deletions src/raftstore/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ impl Callback {
other => panic!("expect Callback::Read(..), got {:?}", other),
}
}

pub fn is_none(&self) -> bool {
match self {
Callback::None => true,
_ => false,
}
}
}

impl fmt::Debug for Callback {
Expand Down
7 changes: 5 additions & 2 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1559,8 +1559,11 @@ impl Peer {
fn post_propose(&mut self, mut meta: ProposalMeta, is_conf_change: bool, cb: Callback) {
// Try to renew leader lease on every consistent read/write request.
meta.renew_lease_time = Some(monotonic_raw_now());
let p = Proposal::new(is_conf_change, meta.index, meta.term, cb);
self.apply_proposals.push(p);

if !cb.is_none() {
let p = Proposal::new(is_conf_change, meta.index, meta.term, cb);
self.apply_proposals.push(p);
}

self.proposals.push(meta);
}
Expand Down
20 changes: 20 additions & 0 deletions tests/integrations/raftstore/test_lease_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,26 @@ fn test_node_callback_when_destroyed() {
);
}

/// Test if the callback proposed by read index is cleared correctly.
#[test]
fn test_lease_read_callback_destroy() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it fail without your change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Only server cluster can fake sending message successfully in raftstore layer.
let mut cluster = new_server_cluster(0, 3);
// Increase the Raft tick interval to make this test case running reliably.
let election_timeout = configure_for_lease_read(&mut cluster, Some(50), None);
cluster.run();
cluster.must_transfer_leader(1, new_peer(1, 1));
cluster.must_put(b"k1", b"v1");
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
// Isolate the target peer to make transfer leader fail.
cluster.add_send_filter(IsolationFilterFactory::new(3));
cluster.transfer_leader(1, new_peer(3, 3));
thread::sleep(election_timeout * 2);
// Trigger ReadIndex on the leader.
assert_eq!(cluster.must_get(b"k1"), Some(b"v1".to_vec()));
cluster.must_put(b"k2", b"v2");
}

#[test]
fn test_read_index_when_transfer_leader() {
let mut cluster = new_node_cluster(0, 3);
Expand Down