raftstore: do not update pending_cross_snap when peer is applying snapshot #3873

Merged
15 commits merged on Dec 18, 2018
23 changes: 13 additions & 10 deletions components/test_raftstore/src/transport_simulate.rs
@@ -26,7 +26,7 @@ use tikv::raftstore::store::{Msg as StoreMsg, SignificantMsg, Transport};
use tikv::raftstore::{Error, Result};
use tikv::server::transport::*;
use tikv::server::StoreAddrResolver;
use tikv::util::collections::{HashMap, HashSet};
use tikv::util::collections::HashSet;
use tikv::util::{transport, Either, HandyRwLock};

pub trait Channel<M>: Send + Clone {
@@ -458,7 +458,9 @@ impl Filter<RaftMessage> for SnapshotFilter {
pub struct CollectSnapshotFilter {
dropped: AtomicBool,
stale: AtomicBool,
pending_msg: Mutex<HashMap<u64, StoreMsg>>,
exists: Mutex<HashSet<u64>>,
// Keep pending messages in arrival order
pending_msg: Mutex<Vec<StoreMsg>>,
pending_count_sender: Mutex<Sender<usize>>,
}
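
As an aside, here is a minimal sketch (not part of this diff; the type and names below are hypothetical) of the pattern the filter switches to: a `HashSet` deduplicates by peer id, while a `Vec` keeps pending messages in arrival order, which the previous `HashMap` did not guarantee.

```rust
use std::collections::HashSet;

// Minimal sketch of the ordered-dedup pattern: remember which keys were seen,
// but keep the items themselves in arrival order.
struct OrderedPending<T> {
    seen: HashSet<u64>,
    items: Vec<T>,
}

impl<T> OrderedPending<T> {
    fn new() -> Self {
        OrderedPending {
            seen: HashSet::new(),
            items: Vec::new(),
        }
    }

    // Returns true if the item was kept (first time this key is seen).
    fn push(&mut self, key: u64, item: T) -> bool {
        if self.seen.insert(key) {
            self.items.push(item);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut pending = OrderedPending::new();
    assert!(pending.push(2, "snapshot from peer 2"));
    assert!(pending.push(1, "snapshot from peer 1"));
    assert!(!pending.push(2, "duplicate from peer 2")); // dropped
    assert_eq!(
        pending.items,
        vec!["snapshot from peer 2", "snapshot from peer 1"]
    );
}
```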

@@ -467,7 +469,8 @@ impl CollectSnapshotFilter {
CollectSnapshotFilter {
dropped: AtomicBool::new(false),
stale: AtomicBool::new(false),
pending_msg: Mutex::new(HashMap::default()),
pending_msg: Mutex::new(Vec::new()),
exists: Mutex::new(HashSet::default()),
pending_count_sender: Mutex::new(sender),
}
}
@@ -481,27 +484,27 @@ impl Filter<StoreMsg> for CollectSnapshotFilter {
let mut to_send = vec![];
let mut pending_msg = self.pending_msg.lock().unwrap();
for m in msgs.drain(..) {
let (is_pending, from_peer_id) = match m {
let is_pending = match m {
StoreMsg::RaftMessage(ref msg) => {
if msg.get_message().get_msg_type() == MessageType::MsgSnapshot {
let from_peer_id = msg.get_from_peer().get_id();
if pending_msg.contains_key(&from_peer_id) {
if self.exists.lock().unwrap().contains(&from_peer_id) {
// Drop this snapshot message directly since it's from a seen peer
continue;
} else {
// Collect the snapshot from an unseen peer
(true, from_peer_id)
true
}
} else {
(false, 0)
false
}
}
_ => (false, 0),
_ => false,
};
if is_pending {
self.dropped
.compare_and_swap(false, true, Ordering::Relaxed);
pending_msg.insert(from_peer_id, m);
pending_msg.push(m);
let sender = self.pending_count_sender.lock().unwrap();
sender.send(pending_msg.len()).unwrap();
} else {
@@ -512,7 +515,7 @@ impl Filter<StoreMsg> for CollectSnapshotFilter {
if pending_msg.len() > 1 {
self.dropped
.compare_and_swap(true, false, Ordering::Relaxed);
msgs.extend(pending_msg.drain().map(|(_, v)| v));
msgs.extend(pending_msg.drain(..));
self.stale.compare_and_swap(false, true, Ordering::Relaxed);
}
msgs.extend(to_send);
4 changes: 3 additions & 1 deletion src/pd/pd.rs
@@ -183,7 +183,9 @@ impl Display for Task {
Task::ReadStats { ref read_stats } => {
write!(f, "get the read statistics {:?}", read_stats)
}
Task::DestroyPeer { ref region_id } => write!(f, "destroy peer {}", region_id),
Task::DestroyPeer { ref region_id } => {
write!(f, "destroy peer of region {}", region_id)
}
}
}
}
25 changes: 21 additions & 4 deletions src/raftstore/store/fsm/peer.rs
@@ -464,6 +464,11 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let merge_target = msg.get_merge_target();
let target_region_id = merge_target.get_id();

// When a message carrying a merge target is received, it indicates that the source peer
// on this store is stale and the peers on other stores have already been merged. The epoch
// in the merge target is the state of the target peer at the time the source peer was merged.
// So here we check the target peer on this store to decide whether the source peer should
// be destroyed or should wait to catch up logs.
if let Some(epoch) = self.pending_cross_snap.get(&target_region_id).or_else(|| {
self.region_peers
.get(&target_region_id)
@@ -475,11 +480,13 @@ impl<T: Transport, C: PdClient> Store<T, C> {
target_region_id,
epoch
);
// So the target peer has moved on, we should let it go.
// The target peer will move on, namely, it will apply a snapshot generated after the merge,
// so destroy the source peer.
if epoch.get_version() > merge_target.get_region_epoch().get_version() {
return Ok(true);
}
// Wait till it catching up logs.
// The target peer's version is unchanged, so the source peer is able to merge into the target
// peer once it has caught up the PrepareMerge log. Wait till it has caught up logs.
return Ok(false);
}
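
To make the decision above concrete, here is a small standalone sketch (hypothetical function names, not the actual raftstore code): the version recorded locally for the target region is compared with the version carried in the merge target to choose between destroying the source peer and letting it wait to catch up logs.

```rust
// Hypothetical sketch of the decision: `local_target_version` is the version recorded
// for the target region on this store (from `pending_cross_snap` or the live peer),
// and `merge_target_version` comes from the merge target carried in the message.
fn source_peer_action(local_target_version: u64, merge_target_version: u64) -> &'static str {
    if local_target_version > merge_target_version {
        // The target peer has already moved past the merge (e.g. it will apply a
        // post-merge snapshot), so the stale source peer can be destroyed.
        "destroy source peer"
    } else {
        // The target peer is still at the pre-merge version, so the source peer
        // should wait and catch up the PrepareMerge log to finish the merge itself.
        "wait to catch up logs"
    }
}

fn main() {
    assert_eq!(source_peer_action(5, 4), "destroy source peer");
    assert_eq!(source_peer_action(4, 4), "wait to catch up logs");
}
```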

@@ -603,8 +610,15 @@ impl<T: Transport, C: PdClient> Store<T, C> {
.map(|r| r.to_owned());
if let Some(exist_region) = r {
info!("region overlapped {:?}, {:?}", exist_region, snap_region);
self.pending_cross_snap
.insert(region_id, snap_region.get_region_epoch().to_owned());
let peer = &self.region_peers[&region_id];
// In some extreme cases, a new snapshot may be received while another snapshot is still being applied.
// If the snapshot being applied was generated before the merge and the new one was generated after the
// merge, updating `pending_cross_snap` here may cause the source peer to destroy itself improperly.
// So don't update `pending_cross_snap` here if the peer is applying a snapshot.
if !(peer.is_applying_snapshot() || peer.has_pending_snapshot()) {
self.pending_cross_snap
.insert(region_id, snap_region.get_region_epoch().to_owned());
}
self.raft_metrics.message_dropped.region_overlap += 1;
return Ok(Some(key));
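
A minimal sketch (hypothetical helper, not in the PR) of the guard added in this hunk: the overlapping snapshot's epoch is recorded in `pending_cross_snap` only when the existing peer is neither applying nor waiting to apply a snapshot.

```rust
// Hypothetical sketch: decide whether it is safe to record the overlapping
// snapshot's epoch for this region.
fn can_record_pending_cross_snap(is_applying_snapshot: bool, has_pending_snapshot: bool) -> bool {
    // If an older (pre-merge) snapshot is still being applied or is queued, recording
    // the newer (post-merge) epoch now could make the merge source peer destroy
    // itself improperly, so skip the update in that case.
    !(is_applying_snapshot || has_pending_snapshot)
}

fn main() {
    assert!(can_record_pending_cross_snap(false, false));
    assert!(!can_record_pending_cross_snap(true, false));
    assert!(!can_record_pending_cross_snap(false, true));
}
```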
}
Expand Down Expand Up @@ -642,6 +656,9 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}

pub fn on_raft_ready(&mut self) {
// Only enable the fail point when the store id is equal to 3, which is
// the id of the slow store in tests.
fail_point!("on_raft_ready", self.tag == "[store 3]", |_| {});
let t = SlowTimer::new();
let pending_count = self.pending_raft_groups.len();
let previous_ready_metrics = self.raft_metrics.ready.clone();
110 changes: 110 additions & 0 deletions tests/failpoints/cases/test_merge.rs
@@ -12,18 +12,21 @@
// limitations under the License.

use std::sync::Arc;
use std::thread;
use std::time::*;

use fail;
use futures::Future;

use kvproto::raft_serverpb::{PeerState, RegionLocalState};
use raft::eraftpb::MessageType;

use test_raftstore::*;
use tikv::pd::PdClient;
use tikv::raftstore::store::keys;
use tikv::raftstore::store::Peekable;
use tikv::storage::CF_RAFT;
use tikv::util::config::*;

/// Test if merge is rollback as expected.
#[test]
@@ -257,3 +260,110 @@ fn test_node_merge_recover_snapshot() {
cluster.must_transfer_leader(1, new_peer(3, 3));
cluster.must_put(b"k40", b"v5");
}

#[test]
fn test_node_merge_multiple_snapshots() {
use std::sync::mpsc;
use tikv::util::HandyRwLock;

let mut cluster = new_node_cluster(0, 3);
configure_for_merge(&mut cluster);
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();
// Make log GC happen quickly to trigger snapshots easily.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(10);
cluster.cfg.raft_store.raft_log_gc_count_limit = 10;
cluster.cfg.raft_store.merge_max_log_gap = 9;
cluster.run();

cluster.must_put(b"k1", b"v1");
cluster.must_put(b"k3", b"v3");

let region = pd_client.get_region(b"k1").unwrap();
cluster.must_split(&region, b"k2");
let left = pd_client.get_region(b"k1").unwrap();
let right = pd_client.get_region(b"k3").unwrap();

let target_leader = right
.get_peers()
.iter()
.find(|p| p.get_store_id() == 1)
.unwrap()
.clone();
cluster.must_transfer_leader(right.get_id(), target_leader);
let target_leader = left
.get_peers()
.iter()
.find(|p| p.get_store_id() == 2)
.unwrap()
.clone();
cluster.must_transfer_leader(left.get_id(), target_leader);
must_get_equal(&cluster.get_engine(1), b"k3", b"v3");

// So the cluster becomes:
// left region: 1 2(leader) I 3
// right region: 1(leader) 2 I 3
// I means isolation (here it just means 3 cannot receive append logs).
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(right.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(left.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));

// Add a collect snapshot filter; it delays snapshots until multiple snapshots from different peers have been collected.
let (tx, _rx) = mpsc::channel();
cluster
.sim
.wl()
.add_recv_filter(3, box CollectSnapshotFilter::new(tx));
// Write some data to trigger a snapshot of right region.
for i in 200..210 {
let key = format!("k{}", i);
let value = format!("v{}", i);
cluster.must_put(key.as_bytes(), value.as_bytes());
}
// Wait for the snapshot to be generated and sent.
thread::sleep(Duration::from_millis(100));

// Merge the left and right regions. Due to isolation, the regions on store 3 are not merged yet.
pd_client.must_merge(left.get_id(), right.get_id());
thread::sleep(Duration::from_millis(200));

// Let the peer of the right region on store 3 respond to append messages so that a new snapshot is
// triggered: one snapshot is generated before the merge, the other after the merge.
// The old and new snapshot messages are then received and handled in one tick,
// so `pending_cross_snap` may be updated improperly and make the merge source peer destroy itself.
// Block raftstore for a while here so that it does not apply the snapshot or receive new logs yet.
fail::cfg("on_raft_ready", "sleep(100)").unwrap();
cluster.clear_send_filters();
thread::sleep(Duration::from_millis(150));
// Filter messages again to make sure the peer on store 3 cannot catch up the PrepareMerge log.
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(left.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(right.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));
// Since the filter is added again, there is no need to block raftstore anymore.
fail::cfg("on_raft_ready", "off").unwrap();

// Wait some time to let an already merged peer on store 1 or store 2 notify
// the peer of the left region on store 3 that it is stale, and then that peer will check
// `pending_cross_snap`.
thread::sleep(Duration::from_millis(200));

cluster.must_put(b"k9", b"v9");
// Let the follower receive the new logs, then commit the merge.
cluster.clear_send_filters();
must_get_equal(&cluster.get_engine(3), b"k9", b"v9");
}