From 46585d5b4443b8e84bf1be286c1e10987eb4f7da Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Fri, 14 Feb 2020 21:14:25 +0800 Subject: [PATCH] change merge flow path Signed-off-by: Liqi Geng --- components/test_raftstore/src/pd.rs | 29 +-- src/pd/pd.rs | 6 +- src/raftstore/store/fsm/apply.rs | 94 +++----- src/raftstore/store/fsm/peer.rs | 308 +++++++-------------------- src/raftstore/store/fsm/store.rs | 18 +- src/raftstore/store/msg.rs | 22 +- src/raftstore/store/peer.rs | 78 ++++--- src/server/server.rs | 11 +- tests/failpoints/cases/test_merge.rs | 208 ++++++++++++++---- 9 files changed, 363 insertions(+), 411 deletions(-) diff --git a/components/test_raftstore/src/pd.rs b/components/test_raftstore/src/pd.rs index 569ce9d073dd..93829abf1c9a 100644 --- a/components/test_raftstore/src/pd.rs +++ b/components/test_raftstore/src/pd.rs @@ -860,25 +860,28 @@ impl TestPdClient { pub fn must_merge(&self, from: u64, target: u64) { self.merge_region(from, target); - for _ in 1..500 { - sleep_ms(10); - - if self.get_region_by_id(from).wait().unwrap().is_none() { - return; - } - } - - let region = self.get_region_by_id(from).wait().unwrap(); - if region.is_none() { - return; - } - panic!("region {:?} is still not merged.", region.unwrap()); + self.check_merged_timeout(from, Duration::from_secs(5)); } pub fn check_merged(&self, from: u64) -> bool { self.get_region_by_id(from).wait().unwrap().is_none() } + pub fn check_merged_timeout(&self, from: u64, duration: Duration) { + let timer = Instant::now(); + loop { + let region = self.get_region_by_id(from).wait().unwrap(); + if let Some(r) = region { + if timer.elapsed() > duration { + panic!("region {:?} is still not merged.", r); + } + } else { + return; + } + sleep_ms(10); + } + } + pub fn region_leader_must_be(&self, region_id: u64, peer: metapb::Peer) { for _ in 0..500 { sleep_ms(10); diff --git a/src/pd/pd.rs b/src/pd/pd.rs index 617102f92785..dda71790159a 100644 --- a/src/pd/pd.rs +++ b/src/pd/pd.rs @@ -29,7 +29,7 @@ use crate::raftstore::store::util::is_epoch_stale; use crate::raftstore::store::util::KeysInfoFormatter; use crate::raftstore::store::Callback; use crate::raftstore::store::StoreInfo; -use crate::raftstore::store::{CasualMessage, PeerMsg, RaftCommand, RaftRouter}; +use crate::raftstore::store::{CasualMessage, PeerMsg, RaftCommand, RaftRouter, SignificantMsg}; use crate::storage::FlowStatistics; use tikv_util::collections::HashMap; use tikv_util::metrics::ThreadInfoStatistics; @@ -981,9 +981,9 @@ fn send_admin_request( /// Sends merge fail message to gc merge source. 
fn send_merge_fail(router: &RaftRouter, source_region_id: u64, target: metapb::Peer) { let target_id = target.get_id(); - if let Err(e) = router.send( + if let Err(e) = router.force_send( source_region_id, - PeerMsg::CasualMessage(CasualMessage::MergeResult { + PeerMsg::SignificantMsg(SignificantMsg::MergeResult { target, stale: true, }), diff --git a/src/raftstore/store/fsm/apply.rs b/src/raftstore/store/fsm/apply.rs index 05a457b7ca61..85a8a047f1b2 100644 --- a/src/raftstore/store/fsm/apply.rs +++ b/src/raftstore/store/fsm/apply.rs @@ -34,7 +34,7 @@ use crate::import::SSTImporter; use crate::raftstore::coprocessor::CoprocessorHost; use crate::raftstore::store::fsm::{RaftPollerBuilder, RaftRouter}; use crate::raftstore::store::metrics::*; -use crate::raftstore::store::msg::{Callback, PeerMsg}; +use crate::raftstore::store::msg::{Callback, PeerMsg, SignificantMsg}; use crate::raftstore::store::peer::Peer; use crate::raftstore::store::peer_storage::{self, write_initial_apply_state, write_peer_state}; use crate::raftstore::store::util::check_region_epoch; @@ -175,7 +175,6 @@ pub enum ExecResult { region: Region, state: MergeState, }, - CatchUpLogs(CatchUpLogs), CommitMerge { region: Region, source: Region, @@ -954,8 +953,7 @@ impl ApplyDelegate { | ExecResult::VerifyHash { .. } | ExecResult::CompactLog { .. } | ExecResult::DeleteRange { .. } - | ExecResult::IngestSST { .. } - | ExecResult::CatchUpLogs { .. } => {} + | ExecResult::IngestSST { .. } => {} ExecResult::SplitRegion { ref derived, .. } => { self.region = derived.clone(); self.metrics.size_diff_hint = 0; @@ -1741,14 +1739,15 @@ impl ApplyDelegate { // The target peer should send missing log entries to the source peer. // // So, the merge process order would be: - // 1. `exec_commit_merge` in target apply worker - // 2. `catch_up_logs_for_merge` in source apply worker (check whether need to catch up logs) - // 3. `on_ready_catch_up_logs` in source raftstore - // 4. ... (raft append and apply logs) - // 5. `on_ready_prepare_merge` in source raftstore (means source region has finished applying all logs) - // 6. `catch_up_logs_for_merge` in source apply worker (destroy itself and send LogsUpToDate) - // 7. resume `exec_commit_merge` in target apply worker - // 8. `on_ready_commit_merge` in target raftstore + // 1. `exec_commit_merge` in target apply fsm and send `CatchUpLogs` to source peer fsm + // 2. `on_catch_up_logs_for_merge` in source peer fsm + // 3. if the source peer has already executed the corresponding `on_ready_prepare_merge`, set pending_remove and jump to step 6 + // 4. ... (raft append and apply logs) + // 5. `on_ready_prepare_merge` in source peer fsm and set pending_remove (means source region has finished applying all logs) + // 6. `logs_up_to_date_for_merge` in source apply fsm (destroy its apply fsm and send Noop to trigger the target apply fsm) + // 7. resume `exec_commit_merge` in target apply fsm + // 8. `on_ready_commit_merge` in target peer fsm and send `MergeResult` to source peer fsm + // 9. 
`on_merge_result` in source peer fsm (destroy itself) fn exec_commit_merge( &mut self, ctx: &mut ApplyContext, @@ -1789,15 +1788,16 @@ impl ApplyDelegate { "peer_id" => self.id(), "source_region_id" => source_region_id ); - - // Sends message to the source apply worker and pause `exec_commit_merge` process + fail_point!("before_handle_catch_up_logs_for_merge"); + // Sends message to the source peer fsm and pause `exec_commit_merge` process let logs_up_to_date = Arc::new(AtomicU64::new(0)); - let msg = Msg::CatchUpLogs(CatchUpLogs { + let msg = SignificantMsg::CatchUpLogs(CatchUpLogs { target_region_id: self.region_id(), merge: merge.to_owned(), logs_up_to_date: logs_up_to_date.clone(), }); - ctx.router.schedule_task(source_region_id, msg); + ctx.notifier + .notify(source_region_id, PeerMsg::SignificantMsg(msg)); return Ok(( AdminResponse::default(), ApplyResult::WaitMergeSource(logs_up_to_date), @@ -2241,8 +2241,8 @@ pub enum Msg { }, Registration(Registration), Proposal(RegionProposal), - CatchUpLogs(CatchUpLogs), - LogsUpToDate(u64), + LogsUpToDate(CatchUpLogs), + Noop, Destroy(Destroy), Snapshot(GenSnapTask), #[cfg(test)] @@ -2277,8 +2277,8 @@ impl Debug for Msg { Msg::Registration(ref r) => { write!(f, "[region {}] Reg {:?}", r.region.get_id(), r.apply_state) } - Msg::CatchUpLogs(cul) => write!(f, "{:?}", cul.merge), - Msg::LogsUpToDate(region_id) => write!(f, "[region {}] logs are updated", region_id), + Msg::LogsUpToDate(_) => write!(f, "logs are updated"), + Msg::Noop => write!(f, "noop"), Msg::Destroy(ref d) => write!(f, "[region {}] destroy", d.region_id), Msg::Snapshot(GenSnapTask { region_id, .. }) => { write!(f, "[region {}] requests a snapshot", region_id) @@ -2506,37 +2506,7 @@ impl ApplyFsm { true } - fn catch_up_logs_for_merge(&mut self, ctx: &mut ApplyContext, catch_up_logs: CatchUpLogs) { - if ctx.timer.is_none() { - ctx.timer = Some(SlowTimer::new()); - } - - // if it is already up to date, no need to catch up anymore - let apply_index = self.delegate.apply_state.get_applied_index(); - debug!( - "check catch up logs for merge"; - "apply_index" => apply_index, - "commit" => catch_up_logs.merge.get_commit(), - "region_id" => self.delegate.region_id(), - "peer_id" => self.delegate.id(), - ); - if apply_index < catch_up_logs.merge.get_commit() { - fail_point!("on_handle_catch_up_logs_for_merge"); - let mut res = VecDeque::new(); - // send logs to raftstore to append - res.push_back(ExecResult::CatchUpLogs(catch_up_logs)); - - // TODO: can we use `ctx.finish_for()` directly? is it safe here? - ctx.apply_res.push(ApplyRes { - region_id: self.delegate.region_id(), - apply_state: self.delegate.apply_state.clone(), - exec_res: res, - metrics: self.delegate.metrics.clone(), - applied_index_term: self.delegate.applied_index_term, - }); - return; - } - + fn logs_up_to_date_for_merge(&mut self, ctx: &mut ApplyContext, catch_up_logs: CatchUpLogs) { fail_point!("after_handle_catch_up_logs_for_merge"); fail_point!( "after_handle_catch_up_logs_for_merge_1000_1003", @@ -2545,18 +2515,20 @@ impl ApplyFsm { ); let region_id = self.delegate.region_id(); - self.destroy(ctx); - catch_up_logs - .logs_up_to_date - .store(region_id, Ordering::SeqCst); info!( "source logs are all applied now"; "region_id" => region_id, "peer_id" => self.delegate.id(), ); - + // The source peer fsm will be destroyed when the target peer executes `on_ready_commit_merge` + // and sends `merge result` to the source peer fsm. 
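+        // Note: only the source apply fsm is destroyed here. Storing the source region id
+        // into `logs_up_to_date` lets the paused target apply fsm (see `exec_commit_merge`)
+        // observe that all source logs are applied once the `Noop` below wakes it up.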
+ self.destroy(ctx); + catch_up_logs + .logs_up_to_date + .store(region_id, Ordering::SeqCst); + // To trigger the target apply fsm if let Some(mailbox) = ctx.router.mailbox(catch_up_logs.target_region_id) { - let _ = mailbox.force_send(Msg::LogsUpToDate(region_id)); + let _ = mailbox.force_send(Msg::Noop); } else { error!( "failed to get mailbox, are we shutting down?"; @@ -2610,8 +2582,8 @@ impl ApplyFsm { Some(Msg::Proposal(prop)) => self.handle_proposal(prop), Some(Msg::Registration(reg)) => self.handle_registration(reg), Some(Msg::Destroy(d)) => self.handle_destroy(apply_ctx, d), - Some(Msg::CatchUpLogs(cul)) => self.catch_up_logs_for_merge(apply_ctx, cul), - Some(Msg::LogsUpToDate(_)) => {} + Some(Msg::LogsUpToDate(cul)) => self.logs_up_to_date_for_merge(apply_ctx, cul), + Some(Msg::Noop) => {} Some(Msg::Snapshot(snap_task)) => self.handle_snapshot(apply_ctx, snap_task), #[cfg(test)] Some(Msg::Validate(_, f)) => f(&self.delegate), @@ -2792,7 +2764,7 @@ impl ApplyRouter { } return; } - Msg::Apply { .. } | Msg::Destroy(_) | Msg::LogsUpToDate(_) => { + Msg::Apply { .. } | Msg::Destroy(_) | Msg::Noop => { info!( "target region is not found, drop messages"; "region_id" => region_id @@ -2806,7 +2778,7 @@ impl ApplyRouter { ); return; } - Msg::CatchUpLogs(cul) => { + Msg::LogsUpToDate(cul) => { warn!( "region is removed before merged, are we shutting down?"; "region_id" => region_id, diff --git a/src/raftstore/store/fsm/peer.rs b/src/raftstore/store/fsm/peer.rs index 37e948ec2fba..34390770d9ef 100644 --- a/src/raftstore/store/fsm/peer.rs +++ b/src/raftstore/store/fsm/peer.rs @@ -3,7 +3,6 @@ use std::borrow::Cow; use std::collections::Bound::{Excluded, Included, Unbounded}; use std::collections::VecDeque; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use std::{cmp, u64}; @@ -19,8 +18,8 @@ use kvproto::import_sstpb::SSTMeta; use kvproto::metapb::{self, Region, RegionEpoch}; use kvproto::pdpb::CheckPolicy; use kvproto::raft_cmdpb::{ - AdminCmdType, AdminRequest, CmdType, CommitMergeRequest, RaftCmdRequest, RaftCmdResponse, - StatusCmdType, StatusResponse, + AdminCmdType, AdminRequest, CmdType, RaftCmdRequest, RaftCmdResponse, StatusCmdType, + StatusResponse, }; use kvproto::raft_serverpb::{ MergeState, PeerState, RaftMessage, RaftSnapshotData, RaftTruncatedState, RegionLocalState, @@ -44,7 +43,7 @@ use crate::raftstore::store::fsm::{ use crate::raftstore::store::keys::{self, enc_end_key, enc_start_key}; use crate::raftstore::store::metrics::*; use crate::raftstore::store::msg::Callback; -use crate::raftstore::store::peer::{ConsistencyState, Peer, StaleState, WaitApplyResultState}; +use crate::raftstore::store::peer::{ConsistencyState, Peer, StaleState}; use crate::raftstore::store::peer_storage::{ApplySnapResult, InvokeContext}; use crate::raftstore::store::transport::Transport; use crate::raftstore::store::util::KeysInfoFormatter; @@ -224,10 +223,6 @@ impl PeerFsm { pub fn schedule_applying_snapshot(&mut self) { self.peer.mut_store().schedule_applying_snapshot(); } - - pub fn have_pending_merge_apply_result(&self) -> bool { - self.peer.pending_merge_apply_result.is_some() - } } impl Fsm for PeerFsm { @@ -291,10 +286,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } PeerMsg::Tick(tick) => self.on_tick(tick), PeerMsg::ApplyRes { res } => { - if let Some(state) = self.fsm.peer.pending_merge_apply_result.as_mut() { - state.results.push(res); - continue; - } self.on_apply_res(res); } PeerMsg::SignificantMsg(msg) => 
self.on_significant_msg(msg), @@ -343,9 +334,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } => { self.on_schedule_half_split_region(®ion_epoch, policy); } - CasualMessage::MergeResult { target, stale } => { - self.on_merge_result(target, stale); - } CasualMessage::GcSnap { snaps } => { self.on_gc_snap(snaps); } @@ -389,9 +377,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } fn start(&mut self) { - if self.fsm.peer.pending_merge_state.is_some() { - self.notify_prepare_merge(); - } self.register_raft_base_tick(); self.register_raft_gc_log_tick(); self.register_pd_heartbeat_tick(); @@ -400,71 +385,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { self.on_check_merge(); } - fn notify_prepare_merge(&self) { - let region_id = self.region_id(); - let version = self.region().get_region_epoch().get_version(); - // If there is no merge lock for that key, insert one to let target peer know `PrepareMerge` - // is already executed. - let mut meta = self.ctx.store_meta.lock().unwrap(); - let (exist_version, ready_to_merge) = - match meta.merge_locks.insert(region_id, (version, None)) { - None => return, - Some((v, r)) => (v, r), - }; - if exist_version == version { - let ready_to_merge = ready_to_merge.unwrap(); - // Set `ready_to_merge` to true to indicate `PrepareMerge` is finished. - ready_to_merge.store(true, Ordering::SeqCst); - let state = self.fsm.peer.pending_merge_state.as_ref().unwrap(); - let target_region_id = state.get_target().get_id(); - // Send an empty message to target peer to make sure it will check `ready_to_merge` - self.ctx - .router - .force_send(target_region_id, PeerMsg::Noop) - .unwrap(); - } else if exist_version > version { - meta.merge_locks - .insert(region_id, (exist_version, ready_to_merge)); - } else { - panic!( - "{} expects version {} but got {}", - self.fsm.peer.tag, version, exist_version - ); - } - } - - pub fn resume_handling_pending_apply_result(&mut self) -> bool { - match self.fsm.peer.pending_merge_apply_result { - Some(ref state) => { - if !state.ready_to_merge.load(Ordering::SeqCst) { - return false; - } - } - None => panic!( - "{} doesn't have pending apply result, can't be resume.", - self.fsm.peer.tag - ), - } - - let mut pending_apply = self.fsm.peer.pending_merge_apply_result.take().unwrap(); - let mut drainer = pending_apply.results.drain(..); - while let Some(res) = drainer.next() { - debug!( - "resume handling apply result"; - "region_id" => self.region_id(), - "peer_id" => self.fsm.peer_id(), - "res" => ?res, - ); - self.on_apply_res(res); - // So meet another `CommitMerge` apply result needed to wait. 
- if let Some(state) = self.fsm.peer.pending_merge_apply_result.as_mut() { - state.results.extend(drainer); - return false; - } - } - true - } - fn on_gc_snap(&mut self, snaps: Vec<(SnapKey, bool)>) { let s = self.fsm.peer.get_store(); let compacted_idx = s.truncated_index(); @@ -579,6 +499,12 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } } } + SignificantMsg::MergeResult { target, stale } => { + self.on_merge_result(target, stale); + } + SignificantMsg::CatchUpLogs(catch_up_logs) => { + self.on_catch_up_logs_for_merge(catch_up_logs); + } } } @@ -820,15 +746,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { "peer_id" => self.fsm.peer_id(), "res" => ?res, ); - if let Some(ready_to_merge) = self.on_ready_result(&mut res.exec_res, &res.metrics) - { - // There is a `CommitMerge` needed to wait - self.fsm.peer.pending_merge_apply_result = Some(WaitApplyResultState { - results: vec![ApplyTaskRes::Apply(res)], - ready_to_merge, - }); - return; - } + self.on_ready_result(&mut res.exec_res, &res.metrics); if self.fsm.stopped { return; } @@ -1305,7 +1223,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { .router .force_send( region_id, - PeerMsg::CasualMessage(CasualMessage::MergeResult { + PeerMsg::SignificantMsg(SignificantMsg::MergeResult { target: self.fsm.peer.peer.clone(), stale: true, }), @@ -1318,10 +1236,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { fn handle_destroy_peer(&mut self, job: DestroyPeerJob) -> bool { if job.initialized { - // When initialized is true and async_remove is false, applyfsm doesn't need to - // send destroy msg to peerfsm because peerfsm has already destroyed. - // In this case, if applyfsm sends destroy msg, peerfsm may be destroyed twice - // because there are some msgs in channel so peerfsm still need to handle them (e.g. callback) + // When initialized is true and async_remove is false, apply fsm doesn't need to + // send destroy msg to peer fsm because peer fsm has already destroyed. + // In this case, if apply fsm sends destroy msg, peer fsm may be destroyed twice + // because there are some msgs in channel so peer fsm still need to handle them (e.g. callback) self.ctx.apply_router.schedule_task( job.region_id, ApplyTask::destroy(job.region_id, job.async_remove), @@ -1370,7 +1288,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } } } - meta.merge_locks.remove(®ion_id); // Destroy read delegates. 
if let Some(reader) = meta.readers.remove(®ion_id) { @@ -1410,10 +1327,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { .remove(&enc_end_key(self.fsm.peer.region())) .is_none() { - panic!("{} meta corruption detected", self.fsm.peer.tag,); + panic!("{} meta corruption detected", self.fsm.peer.tag); } if meta.regions.remove(®ion_id).is_none() && !merged_by_target { - panic!("{} meta corruption detected", self.fsm.peer.tag,) + panic!("{} meta corruption detected", self.fsm.peer.tag) } } @@ -1835,85 +1752,63 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { fn on_ready_prepare_merge(&mut self, region: metapb::Region, state: MergeState) { { let mut meta = self.ctx.store_meta.lock().unwrap(); - meta.set_region( - &self.ctx.coprocessor_host, - region.clone(), - &mut self.fsm.peer, - ); - } - let target = state.get_target().get_id(); - let commit = state.get_commit(); - self.fsm.peer.pending_merge_state = Some(state); - self.notify_prepare_merge(); - - if let Some(logs_up_to_date) = self.fsm.peer.catch_up_logs.take() { - // Send CatchUpLogs back to destroy source apply delegate, - // then it will send `LogsUpToDate` to target apply delegate. - let mut req = CommitMergeRequest::new(); - req.set_commit(commit); - self.ctx.apply_router.schedule_task( - region.get_id(), - ApplyTask::CatchUpLogs(CatchUpLogs { - target_region_id: target, - merge: req, - logs_up_to_date, - }), - ); - return; + meta.set_region(&self.ctx.coprocessor_host, region, &mut self.fsm.peer); } - self.on_check_merge(); - } - - // The `PrepareMerge` and `CommitMerge` is executed sequentially, but we cannot - // ensure the order to handle the apply results between different peers. So check - // the merge locks to ensure `on_ready_prepare_merge` is called. - fn check_merge_locks( - &self, - source: &metapb::Region, - meta: &mut StoreMeta, - ) -> Option> { - let source_region_id = source.get_id(); - let source_version = source.get_region_epoch().get_version(); - - if let Some((exist_version, ready_to_merge)) = meta.merge_locks.remove(&source_region_id) { - if exist_version == source_version { - assert!(ready_to_merge.is_none()); - // So `on_ready_prepare_merge` is executed. - return None; - } else if exist_version < source_version { - assert!( - ready_to_merge.is_none(), - "{} source region {} meets a commit merge before {} < {}", - self.fsm.peer.tag, - source_region_id, - exist_version, - source_version - ); - } else { - panic!( - "{} source region {} can't finished current merge: {} > {}", - self.fsm.peer.tag, source_region_id, exist_version, source_region_id + self.fsm.peer.pending_merge_state = Some(state); + let state = self.fsm.peer.pending_merge_state.as_ref().unwrap(); + + if let Some(ref catch_up_logs) = self.fsm.peer.catch_up_logs { + if state.get_commit() == catch_up_logs.merge.get_commit() { + assert_eq!(state.get_target().get_id(), catch_up_logs.target_region_id); + // Indicate that `on_catch_up_logs_for_merge` has already executed. + // Mark pending_remove because its apply fsm will be destroyed. + self.fsm.peer.pending_remove = true; + // Send CatchUpLogs back to destroy source apply fsm, + // then it will send `Noop` to trigger target apply fsm. + self.ctx.apply_router.schedule_task( + self.fsm.region_id(), + ApplyTask::LogsUpToDate(self.fsm.peer.catch_up_logs.take().unwrap()), ); + return; } } - // The corresponding `on_ready_prepare_merge` is not executed yet. - // Insert the lock, and `on_ready_prepare_merge` will check and use `ready_to_merge` - // to notify. 
- let ready_to_merge = Arc::new(AtomicBool::new(false)); - meta.merge_locks.insert( - source_region_id, - (source_version, Some(ready_to_merge.clone())), - ); - Some(ready_to_merge) + self.on_check_merge(); } - fn on_ready_catch_up_logs(&mut self, catch_up_logs: CatchUpLogs) { + fn on_catch_up_logs_for_merge(&mut self, mut catch_up_logs: CatchUpLogs) { let region_id = self.fsm.region_id(); assert_eq!(region_id, catch_up_logs.merge.get_source().get_id()); - // directly append these logs to raft log and then commit + if let Some(ref cul) = self.fsm.peer.catch_up_logs { + panic!( + "{} get catch_up_logs from {} but has already got from {}", + self.fsm.peer.tag, catch_up_logs.target_region_id, cul.target_region_id + ) + } + + if let Some(ref pending_merge_state) = self.fsm.peer.pending_merge_state { + if pending_merge_state.get_commit() == catch_up_logs.merge.get_commit() { + assert_eq!( + pending_merge_state.get_target().get_id(), + catch_up_logs.target_region_id + ); + // Indicate that `on_ready_prepare_merge` has already executed. + // Mark pending_remove because its apply fsm will be destroyed. + self.fsm.peer.pending_remove = true; + // Just for saving memory. + catch_up_logs.merge.clear_entries(); + // Send CatchUpLogs back to destroy source apply fsm, + // then it will send `Noop` to trigger target apply fsm. + self.ctx + .apply_router + .schedule_task(region_id, ApplyTask::LogsUpToDate(catch_up_logs)); + return; + } + } + + // Directly append these logs to raft log and then commit them. match self .fsm .peer @@ -1928,7 +1823,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { ); // Now it has some committed entries, so mark it to take `Ready` in next round. self.fsm.has_ready = true; - self.fsm.peer.catch_up_logs = Some(catch_up_logs.logs_up_to_date); } None => { info!( @@ -1936,28 +1830,17 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { "region_id" => region_id, "peer_id" => self.fsm.peer.peer_id(), ); - // Send CatchUpLogs back to destroy source apply delegate, - // then it will send `LogsUpToDate` to target apply delegate. - self.ctx - .apply_router - .schedule_task(region_id, ApplyTask::CatchUpLogs(catch_up_logs)); } } + // Just for saving memory. + catch_up_logs.merge.clear_entries(); + self.fsm.peer.catch_up_logs = Some(catch_up_logs); } - fn on_ready_commit_merge( - &mut self, - region: metapb::Region, - source: metapb::Region, - ) -> Option> { + fn on_ready_commit_merge(&mut self, region: metapb::Region, source: metapb::Region) { self.register_split_region_check_tick(); let mut meta = self.ctx.store_meta.lock().unwrap(); - let ready_to_merge = self.check_merge_locks(&source, &mut meta); - if ready_to_merge.is_some() { - return ready_to_merge; - } - let prev = meta.region_ranges.remove(&enc_end_key(&source)); assert_eq!(prev, Some(source.get_id())); let prev = if region.get_end_key() == source.get_end_key() { @@ -1977,6 +1860,8 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { meta.set_region(&self.ctx.coprocessor_host, region, &mut self.fsm.peer); let reader = meta.readers.remove(&source.get_id()).unwrap(); reader.mark_invalid(); + + drop(meta); // make approximate size and keys updated in time. // the reason why follower need to update is that there is a issue that after merge // and then transfer leader, the new leader may have stale size and keys. 
@@ -1991,13 +1876,15 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { ); self.fsm.peer.heartbeat_pd(self.ctx); } - if let Err(e) = self.ctx.router.send( + if let Err(e) = self.ctx.router.force_send( source.get_id(), - PeerMsg::CasualMessage(CasualMessage::MergeResult { + PeerMsg::SignificantMsg(SignificantMsg::MergeResult { target: self.fsm.peer.peer.clone(), stale: false, }), ) { + // TODO: need to remove "are we shutting down", it should panic + // if we are not in shut-down state info!( "failed to send merge result, are we shutting down?"; "region_id" => self.fsm.region_id(), @@ -2005,7 +1892,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { "err" => %e, ); } - None } /// Handle rollbacking Merge result. @@ -2028,33 +1914,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { ); } self.fsm.peer.pending_merge_state = None; - { + if let Some(r) = region { let mut meta = self.ctx.store_meta.lock().unwrap(); - if let Some(r) = region { - meta.set_region(&self.ctx.coprocessor_host, r, &mut self.fsm.peer); - } - let region = self.fsm.peer.region(); - let region_id = region.get_id(); - let source_version = region.get_region_epoch().get_version(); - if let Some((exist_version, ready_to_merge)) = meta.merge_locks.remove(®ion_id) { - if exist_version > source_version { - assert!( - ready_to_merge.is_some(), - "{} unexpected empty merge state at {}", - self.fsm.peer.tag, - exist_version - ); - meta.merge_locks - .insert(region_id, (exist_version, ready_to_merge)); - } else { - assert!( - ready_to_merge.is_none(), - "{} rollback a commit merge state at {}", - self.fsm.peer.tag, - exist_version - ); - } - } + meta.set_region(&self.ctx.coprocessor_host, r, &mut self.fsm.peer); } if self.fsm.peer.is_leader() { info!( @@ -2154,11 +2016,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { assert_eq!(prev, Some(prev_region)); } - fn on_ready_result( - &mut self, - exec_results: &mut VecDeque, - metrics: &ApplyMetrics, - ) -> Option> { + fn on_ready_result(&mut self, exec_results: &mut VecDeque, metrics: &ApplyMetrics) { // handle executing committed log results while let Some(result) = exec_results.pop_front() { match result { @@ -2170,18 +2028,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { self.on_ready_split_region(derived, regions) } ExecResult::PrepareMerge { region, state } => { - self.on_ready_prepare_merge(region, state); - } - ExecResult::CatchUpLogs(catch_up_logs) => { - self.on_ready_catch_up_logs(catch_up_logs); + self.on_ready_prepare_merge(region, state) } ExecResult::CommitMerge { region, source } => { - if let Some(ready_to_merge) = - self.on_ready_commit_merge(region.clone(), source.clone()) - { - exec_results.push_front(ExecResult::CommitMerge { region, source }); - return Some(ready_to_merge); - } + self.on_ready_commit_merge(region.clone(), source.clone()) } ExecResult::RollbackMerge { region, commit } => { self.on_ready_rollback_merge(commit, Some(region)) @@ -2204,8 +2054,6 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { self.ctx.store_stat.lock_cf_bytes_written += metrics.lock_cf_written_bytes; self.ctx.store_stat.engine_total_bytes_written += metrics.written_bytes; self.ctx.store_stat.engine_total_keys_written += metrics.written_keys; - - None } /// Check if a request is valid if it has valid prepare_merge/commit_merge proposal. 
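
Reviewer sketch, not part of the patch: the new source-side handshake above is symmetric. `on_ready_prepare_merge` and `on_catch_up_logs_for_merge` may run in either order, and whichever observes the other's state second (with matching commit indices) marks `pending_remove` and schedules `LogsUpToDate`. A minimal stand-alone illustration of that order independence, using simplified stand-in types rather than the real TiKV structs:

struct CatchUpLogs {
    target_region_id: u64,
    commit: u64,
}

#[derive(Default)]
struct SourcePeer {
    prepare_merge_commit: Option<u64>,   // set by on_ready_prepare_merge
    catch_up_logs: Option<CatchUpLogs>,  // set by on_catch_up_logs_for_merge
    pending_remove: bool,
}

impl SourcePeer {
    fn on_ready_prepare_merge(&mut self, commit: u64) {
        self.prepare_merge_commit = Some(commit);
        if self.catch_up_logs.as_ref().map_or(false, |c| c.commit == commit) {
            // `on_catch_up_logs_for_merge` already ran: finish the handshake here.
            self.pending_remove = true;
            self.schedule_logs_up_to_date();
        }
    }

    fn on_catch_up_logs_for_merge(&mut self, cul: CatchUpLogs) {
        let ready = self.prepare_merge_commit == Some(cul.commit);
        self.catch_up_logs = Some(cul);
        if ready {
            // `on_ready_prepare_merge` already ran: finish the handshake here.
            self.pending_remove = true;
            self.schedule_logs_up_to_date();
        }
        // Otherwise the missing logs are appended and applied first.
    }

    fn schedule_logs_up_to_date(&self) {
        // Stands in for sending ApplyTask::LogsUpToDate to the source apply fsm.
        let target = self.catch_up_logs.as_ref().unwrap().target_region_id;
        println!("schedule LogsUpToDate for target region {}", target);
    }
}

fn main() {
    // Either arrival order ends in the same state.
    let mut a = SourcePeer::default();
    a.on_ready_prepare_merge(10);
    a.on_catch_up_logs_for_merge(CatchUpLogs { target_region_id: 2, commit: 10 });

    let mut b = SourcePeer::default();
    b.on_catch_up_logs_for_merge(CatchUpLogs { target_region_id: 2, commit: 10 });
    b.on_ready_prepare_merge(10);
    assert!(a.pending_remove && b.pending_remove);
}
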
diff --git a/src/raftstore/store/fsm/store.rs b/src/raftstore/store/fsm/store.rs index 7566c2b6c8d2..5278da856acd 100644 --- a/src/raftstore/store/fsm/store.rs +++ b/src/raftstore/store/fsm/store.rs @@ -14,7 +14,7 @@ use kvproto::raft_serverpb::{PeerState, RaftMessage, RegionLocalState}; use raft::{Ready, StateRole}; use std::collections::BTreeMap; use std::collections::Bound::{Excluded, Included, Unbounded}; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::{mem, thread, u64}; @@ -95,11 +95,6 @@ pub struct StoreMeta { /// An inverse mapping of `pending_merge_targets` used to let source peer help target peer to clean up related entry. /// source_region_id -> target_region_id pub targets_map: HashMap, - /// In raftstore, the execute order of `PrepareMerge` and `CommitMerge` is not certain because of the messages - /// belongs two regions. To make them in order, `PrepareMerge` will set this structure and `CommitMerge` will retry - /// later if there is no related lock. - /// source_region_id -> (version, BiLock). - pub merge_locks: HashMap>)>, } impl StoreMeta { @@ -113,7 +108,6 @@ impl StoreMeta { pending_snapshot_regions: Vec::default(), pending_merge_targets: HashMap::default(), targets_map: HashMap::default(), - merge_locks: HashMap::default(), } } @@ -598,14 +592,6 @@ impl PollHandler for RaftPoller Option { let mut expected_msg_count = None; - if peer.have_pending_merge_apply_result() { - expected_msg_count = Some(peer.receiver.len()); - let mut delegate = PeerFsmDelegate::new(peer, &mut self.poll_ctx); - if !delegate.resume_handling_pending_apply_result() { - return expected_msg_count; - } - expected_msg_count = None; - } fail_point!( "pause_on_peer_collect_message", @@ -1399,7 +1385,7 @@ impl<'a, T: Transport, C: PdClient> StoreFsmDelegate<'a, T, C> { .router .force_send( id, - PeerMsg::CasualMessage(CasualMessage::MergeResult { + PeerMsg::SignificantMsg(SignificantMsg::MergeResult { target: target.clone(), stale: true, }), diff --git a/src/raftstore/store/msg.rs b/src/raftstore/store/msg.rs index 3250654aa700..9d9d8d04b515 100644 --- a/src/raftstore/store/msg.rs +++ b/src/raftstore/store/msg.rs @@ -11,6 +11,7 @@ use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse}; use kvproto::raft_serverpb::RaftMessage; use raft::SnapshotStatus; +use crate::raftstore::store::fsm::apply::CatchUpLogs; use crate::raftstore::store::fsm::apply::TaskRes as ApplyTaskRes; use crate::raftstore::store::util::KeysInfoFormatter; use crate::raftstore::store::SnapKey; @@ -142,7 +143,7 @@ impl StoreTick { /// Some significant messages sent to raftstore. Raftstore will dispatch these messages to Raft /// groups to update some important internal status. -#[derive(Debug, PartialEq)] +#[derive(Debug)] pub enum SignificantMsg { /// Reports whether the snapshot sending is successful or not. SnapshotStatus { @@ -158,6 +159,15 @@ pub enum SignificantMsg { region_id: u64, to_peer_id: u64, }, + /// Source region catch up logs for merging + CatchUpLogs(CatchUpLogs), + /// Result of the fact that the region is merged. + MergeResult { + target: metapb::Peer, + // True means it's a stale merge source. + // False means it came from target region. + stale: bool, + }, } /// Message that will be sent to a peer. @@ -196,11 +206,6 @@ pub enum CasualMessage { region_epoch: RegionEpoch, policy: CheckPolicy, }, - /// Result of querying pd whether a region is merged. 
- MergeResult { - target: metapb::Peer, - stale: bool, - }, /// Remove snapshot files in `snaps`. GcSnap { snaps: Vec<(SnapKey, bool)>, @@ -237,11 +242,6 @@ impl fmt::Debug for CasualMessage { write!(fmt, "compaction declined bytes {}", bytes) } CasualMessage::HalfSplitRegion { .. } => write!(fmt, "Half Split"), - CasualMessage::MergeResult { target, stale } => write! { - fmt, - "target: {:?}, successful: {}", - target, stale - }, CasualMessage::GcSnap { ref snaps } => write! { fmt, "gc snaps {:?}", diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs index 3edad99b2c91..74740dc86ddd 100644 --- a/src/raftstore/store/peer.rs +++ b/src/raftstore/store/peer.rs @@ -3,7 +3,6 @@ use std::cell::RefCell; use std::collections::Bound::{Excluded, Unbounded}; use std::collections::VecDeque; -use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::{atomic, Arc}; use std::time::{Duration, Instant}; use std::{cmp, mem, u64, usize}; @@ -31,9 +30,10 @@ use uuid::Uuid; use crate::pd::{PdTask, INVALID_ID}; use crate::raftstore::coprocessor::{CoprocessorHost, RegionChangeEvent}; +use crate::raftstore::store::fsm::apply::CatchUpLogs; use crate::raftstore::store::fsm::store::PollContext; use crate::raftstore::store::fsm::{ - apply, Apply, ApplyMetrics, ApplyTask, ApplyTaskRes, GroupState, Proposal, RegionProposal, + apply, Apply, ApplyMetrics, ApplyTask, GroupState, Proposal, RegionProposal, }; use crate::raftstore::store::keys::{self, enc_end_key, enc_start_key}; use crate::raftstore::store::worker::{ReadDelegate, ReadProgress, RegionTask}; @@ -106,9 +106,9 @@ bitflags! { // TODO: maybe declare it as protobuf struct is better. /// A bitmap contains some useful flags when dealing with `eraftpb::Entry`. pub struct ProposalContext: u8 { - const SYNC_LOG = 0b00000001; - const SPLIT = 0b00000010; - const PREPARE_MERGE = 0b00000100; + const SYNC_LOG = 0b0000_0001; + const SPLIT = 0b0000_0010; + const PREPARE_MERGE = 0b0000_0100; } } @@ -155,19 +155,6 @@ pub struct CheckTickResult { up_to_date: bool, } -/// A struct that stores the state to wait for `PrepareMerge` apply result. -/// -/// When handling the apply result of a `CommitMerge`, the source peer may have -/// not handle the apply result of the `PrepareMerge`, so the target peer has -/// to abort current handle process and wait for it asynchronously. -pub struct WaitApplyResultState { - /// The following apply results waiting to be handled, including the `CommitMerge`. - /// These will be handled once `ready_to_merge` is true. - pub results: Vec, - /// It is used by target peer to check whether the apply result of `PrepareMerge` is handled. - pub ready_to_merge: Arc, -} - pub struct Peer { /// The ID of the Region which this Peer belongs to. region_id: u64, @@ -194,6 +181,7 @@ pub struct Peer { /// If it fails to send messages to leader. pub leader_unreachable: bool, /// Whether this peer is destroyed asynchronously. + /// If it's true when merging, its data in storeMeta will be removed early by the target peer pub pending_remove: bool, /// If a snapshot is being applied asynchronously, messages should not be sent. pending_messages: Vec, @@ -237,10 +225,8 @@ pub struct Peer { last_committed_prepare_merge_idx: u64, /// The merge related state. It indicates this Peer is in merging. pub pending_merge_state: Option, - /// The state to wait for `PrepareMerge` apply result. 
- pub pending_merge_apply_result: Option, /// source region is catching up logs for merge - pub catch_up_logs: Option>, + pub catch_up_logs: Option, /// Write Statistics for PD to schedule hot spot. pub peer_stat: PeerStat, @@ -318,7 +304,6 @@ impl Peer { raft_log_size_hint: 0, leader_lease: Lease::new(cfg.raft_store_max_leader_lease()), pending_messages: vec![], - pending_merge_apply_result: None, peer_stat: PeerStat::default(), catch_up_logs: None, }; @@ -364,13 +349,13 @@ impl Peer { // Though the entries is empty, it is possible that one source peer has caught up the logs // but commit index is not updated. If Other source peers are already destroyed, so the raft // group will not make any progress, namely the source peer can not get the latest commit index anymore. - // Here update the commit index to let source apply rest uncommitted entires. - if merge.get_commit() > self.raft_group.raft.raft_log.committed { + // Here update the commit index to let source apply rest uncommitted entries. + return if merge.get_commit() > self.raft_group.raft.raft_log.committed { self.raft_group.raft.raft_log.commit_to(merge.get_commit()); - return Some(merge.get_commit()); + Some(merge.get_commit()) } else { - return None; - } + None + }; } let first = entries.first().unwrap(); // make sure message should be with index not smaller than committed @@ -409,12 +394,12 @@ impl Peer { ); return None; } - // If initialized is false, it implicitly means applyfsm does not exist now. + // If initialized is false, it implicitly means apply fsm does not exist now. let initialized = self.get_store().is_initialized(); - // If async_remove is true, it means peerfsm needs to be removed after its - // corresponding applyfsm was removed. - // If it is false, it means either applyfsm does not exist or there is no task - // in applyfsm so it's ok to remove peerfsm immediately. + // If async_remove is true, it means peer fsm needs to be removed after its + // corresponding apply fsm was removed. + // If it is false, it means either apply fsm does not exist or there is no task + // in apply fsm so it's ok to remove peer fsm immediately. 
let async_remove = if self.is_applying_snapshot() { if !self.mut_store().cancel_applying_snap() { info!( @@ -1106,6 +1091,15 @@ impl Peer { "peer_id" => self.peer.get_id(), ); + let before_handle_raft_ready_1003 = || { + fail_point!( + "before_handle_raft_ready_1003", + self.peer.get_id() == 1003 && self.is_leader(), + |_| {} + ); + }; + before_handle_raft_ready_1003(); + let mut ready = self.raft_group.ready_since(self.last_applying_idx); self.on_role_changed(ctx, &ready); @@ -1265,6 +1259,25 @@ impl Peer { merge_to_be_update = false; } } + + fail_point!( + "before_send_rollback_merge_1003", + if self.peer_id() != 1003 { + false + } else { + let index = entry.get_index(); + let data = entry.get_data(); + if data.is_empty() || entry.get_entry_type() != EntryType::EntryNormal { + false + } else { + let cmd: RaftCmdRequest = util::parse_data_at(data, index, &self.tag); + cmd.has_admin_request() + && cmd.get_admin_request().get_cmd_type() + == AdminCmdType::RollbackMerge + } + }, + |_| {} + ); } if !committed_entries.is_empty() { self.last_applying_idx = committed_entries.last().unwrap().get_index(); @@ -1277,6 +1290,7 @@ impl Peer { ctx.apply_router .schedule_task(self.region_id, ApplyTask::apply(apply)); } + fail_point!("after_send_to_apply_1003", self.peer_id() == 1003, |_| {}); } self.apply_reads(ctx, &ready); diff --git a/src/server/server.rs b/src/server/server.rs index c08fbb379ea5..96a21979866e 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -309,9 +309,14 @@ mod tests { } fn is_unreachable_to(msg: &SignificantMsg, region_id: u64, to_peer_id: u64) -> bool { - *msg == SignificantMsg::Unreachable { - region_id, - to_peer_id, + if let SignificantMsg::Unreachable { + region_id: r_id, + to_peer_id: p_id, + } = *msg + { + region_id == r_id && to_peer_id == p_id + } else { + false } } diff --git a/tests/failpoints/cases/test_merge.rs b/tests/failpoints/cases/test_merge.rs index 4936378525b3..cbd39559dbf9 100644 --- a/tests/failpoints/cases/test_merge.rs +++ b/tests/failpoints/cases/test_merge.rs @@ -6,7 +6,6 @@ use std::thread; use std::time::*; use fail; -use futures::Future; use kvproto::raft_serverpb::{PeerState, RegionLocalState}; use raft::eraftpb::MessageType; @@ -14,7 +13,7 @@ use raft::eraftpb::MessageType; use engine::*; use test_raftstore::*; use tikv::pd::PdClient; -use tikv::raftstore::store::keys; +use tikv::raftstore::store::{keys, Callback}; use tikv_util::config::*; use tikv_util::HandyRwLock; @@ -140,22 +139,7 @@ fn test_node_merge_restart() { cluster.start().unwrap(); // Wait till merge is finished. 
- let timer = Instant::now(); - loop { - if pd_client - .get_region_by_id(left.get_id()) - .wait() - .unwrap() - .is_none() - { - break; - } - - if timer.elapsed() > Duration::from_secs(5) { - panic!("region still not merged after 5 secs"); - } - sleep_ms(10); - } + pd_client.check_merged_timeout(left.get_id(), Duration::from_secs(5)); cluster.must_put(b"k4", b"v4"); @@ -357,9 +341,9 @@ fn test_node_merge_catch_up_logs_no_need() { thread::sleep(Duration::from_millis(100)); // let source region not merged - fail::cfg("on_handle_catch_up_logs_for_merge", "pause").unwrap(); + fail::cfg("before_handle_catch_up_logs_for_merge", "pause").unwrap(); fail::cfg("after_handle_catch_up_logs_for_merge", "pause").unwrap(); - // due to `on_handle_catch_up_logs_for_merge` failpoint, we already pass `apply_index < catch_up_logs.merge.get_commit()` + // due to `before_handle_catch_up_logs_for_merge` failpoint, we already pass `apply_index < catch_up_logs.merge.get_commit()` // so now can let apply index make progress. fail::remove("apply_after_prepare_merge"); @@ -368,7 +352,7 @@ fn test_node_merge_catch_up_logs_no_need() { thread::sleep(Duration::from_millis(50)); // let merge process continue - fail::remove("on_handle_catch_up_logs_for_merge"); + fail::remove("before_handle_catch_up_logs_for_merge"); fail::remove("after_handle_catch_up_logs_for_merge"); thread::sleep(Duration::from_millis(50)); @@ -549,19 +533,21 @@ fn test_node_merge_restart_after_apply_premerge_before_apply_compact_log() { configure_for_merge(&mut cluster); cluster.cfg.raft_store.merge_max_log_gap = 10; cluster.cfg.raft_store.raft_log_gc_count_limit = 11; - // rely on this config to trigger a compact log + // Rely on this config to trigger a compact log cluster.cfg.raft_store.raft_log_gc_size_limit = ReadableSize(1); cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10); + + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + cluster.run(); - // prevent gc_log_tick to propose a compact log + // Prevent gc_log_tick to propose a compact log let raft_gc_log_tick_fp = "on_raft_gc_log_tick"; fail::cfg(raft_gc_log_tick_fp, "return()").unwrap(); cluster.must_put(b"k1", b"v1"); cluster.must_put(b"k3", b"v3"); - let pd_client = Arc::clone(&cluster.pd_client); let region = pd_client.get_region(b"k1").unwrap(); - cluster.must_split(®ion, b"k2"); let left = pd_client.get_region(b"k1").unwrap(); @@ -569,46 +555,40 @@ fn test_node_merge_restart_after_apply_premerge_before_apply_compact_log() { let left_peer_1 = find_peer(&left, 1).cloned().unwrap(); cluster.must_transfer_leader(left.get_id(), left_peer_1); - // make log gap between store 1 and store 3, for min_index in preMerge + // Make log gap between store 1 and store 3, for min_index in preMerge cluster.add_send_filter(IsolationFilterFactory::new(3)); for i in 0..6 { cluster.must_put(format!("k1{}", i).as_bytes(), b"v1"); } - // prevent on_apply_res to update merge_state in Peer - // if not, almost everything cannot propose including compact log + // Prevent on_apply_res to update merge_state in Peer + // If not, almost everything cannot propose including compact log let on_apply_res_fp = "on_apply_res"; fail::cfg(on_apply_res_fp, "return()").unwrap(); - let merge = new_prepare_merge(right.clone()); - let req = new_admin_request(left.get_id(), left.get_region_epoch(), merge); - let resp = cluster - .call_command_on_leader(req, Duration::from_secs(3)) - .unwrap(); - if resp.get_header().has_error() { - panic!("response {:?} has 
error", resp); - } + cluster.try_merge(left.get_id(), right.get_id()); + cluster.clear_send_filters(); - // prevent apply fsm to apply compact log + // Prevent apply fsm to apply compact log let handle_apply_fp = "on_handle_apply"; fail::cfg(handle_apply_fp, "return()").unwrap(); let state1 = cluster.truncated_state(left.get_id(), 1); fail::remove(raft_gc_log_tick_fp); - // wait for compact log to be proposed and committed maybe + // Wait for compact log to be proposed and committed maybe sleep_ms(30); cluster.shutdown(); fail::remove(handle_apply_fp); fail::remove(on_apply_res_fp); - // prevent sched_merge_tick to propose CommitMerge + // Prevent sched_merge_tick to propose CommitMerge let schedule_merge_fp = "on_schedule_merge"; fail::cfg(schedule_merge_fp, "return()").unwrap(); cluster.start().unwrap(); - // wait for compact log to apply + // Wait for compact log to apply for _ in 0..50 { let state2 = cluster.truncated_state(left.get_id(), 1); if state1.get_index() != state2.get_index() { @@ -616,10 +596,154 @@ fn test_node_merge_restart_after_apply_premerge_before_apply_compact_log() { } sleep_ms(10); } - // can schedule merge now + // Now schedule merge fail::remove(schedule_merge_fp); - // propose to left region and wait for merge to succeed conveniently + pd_client.check_merged_timeout(left.get_id(), Duration::from_secs(5)); + cluster.must_put(b"k123", b"v2"); must_get_equal(&cluster.get_engine(3), b"k123", b"v2"); } + +/// Tests whether stale merge is rollback properly if it merge to the same target region again later. +#[test] +fn test_node_failed_merge_before_succeed_merge() { + let _guard = crate::setup(); + let mut cluster = new_node_cluster(0, 3); + configure_for_merge(&mut cluster); + cluster.cfg.raft_store.merge_max_log_gap = 30; + cluster.cfg.raft_store.store_max_batch_size = 1; + cluster.cfg.raft_store.store_pool_size = 2; + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + cluster.run(); + + for i in 0..10 { + cluster.must_put(format!("k{}", i).as_bytes(), b"v1"); + } + let region = pd_client.get_region(b"k1").unwrap(); + cluster.must_split(®ion, b"k5"); + + let left = pd_client.get_region(b"k1").unwrap(); + let mut right = pd_client.get_region(b"k5").unwrap(); + let left_peer_1 = find_peer(&left, 1).cloned().unwrap(); + cluster.must_transfer_leader(left.get_id(), left_peer_1.clone()); + + let left_peer_3 = find_peer(&left, 3).cloned().unwrap(); + assert_eq!(left_peer_3.get_id(), 1003); + + // Prevent sched_merge_tick to propose CommitMerge + let schedule_merge_fp = "on_schedule_merge"; + fail::cfg(schedule_merge_fp, "return()").unwrap(); + + // To minimize peers log gap for merging + cluster.must_put(b"k11", b"v2"); + must_get_equal(&cluster.get_engine(2), b"k11", b"v2"); + must_get_equal(&cluster.get_engine(3), b"k11", b"v2"); + // Make peer 1003 can't receive PrepareMerge and RollbackMerge log + cluster.add_send_filter(IsolationFilterFactory::new(3)); + + cluster.try_merge(left.get_id(), right.get_id()); + + // Change right region's epoch to make this merge failed + cluster.must_split(&right, b"k8"); + fail::remove(schedule_merge_fp); + // Wait for left region to rollback merge + cluster.must_put(b"k12", b"v2"); + // Prevent the `PrepareMerge` and `RollbackMerge` log sending to apply fsm after + // cleaning send filter. Since this method is just to check `RollbackMerge`, + // the `PrepareMerge` may escape, but it makes the best effort. 
+ let before_send_rollback_merge_1003_fp = "before_send_rollback_merge_1003"; + fail::cfg(before_send_rollback_merge_1003_fp, "pause").unwrap(); + cluster.clear_send_filters(); + + right = pd_client.get_region(b"k5").unwrap(); + let right_peer_1 = find_peer(&right, 1).cloned().unwrap(); + cluster.must_transfer_leader(right.get_id(), right_peer_1); + // Add some data for checking data integrity check at a later time + for i in 0..5 { + cluster.must_put(format!("k2{}", i).as_bytes(), b"v3"); + } + // Do a really succeed merge + pd_client.must_merge(left.get_id(), right.get_id()); + // Wait right region to send CatchUpLogs to left region. + sleep_ms(100); + // After executing CatchUpLogs in source peer fsm, the committed log will send + // to apply fsm in the end of this batch. So even the first `on_ready_prepare_merge` + // is executed after CatchUplogs, the latter committed logs is still sent to apply fsm + // if CatchUpLogs and `on_ready_prepare_merge` is in different batch. + // + // In this case, the data is complete because the wrong up-to-date msg from the + // first `on_ready_prepare_merge` is sent after all committed log. + // Sleep a while to wait apply fsm to send `on_ready_prepare_merge` to peer fsm. + let after_send_to_apply_1003_fp = "after_send_to_apply_1003"; + fail::cfg(after_send_to_apply_1003_fp, "sleep(300)").unwrap(); + + fail::remove(before_send_rollback_merge_1003_fp); + // Wait `after_send_to_apply_1003` timeout + sleep_ms(300); + fail::remove(after_send_to_apply_1003_fp); + // Check the data integrity + for i in 0..5 { + must_get_equal(&cluster.get_engine(3), format!("k2{}", i).as_bytes(), b"v3"); + } +} + +/// Tests whether the source peer is destroyed correctly when transferring leader during committing merge. +/// +/// In the previous merge flow, target peer deletes meta of source peer without marking it as pending remove. +/// If source peer becomes leader at the same time, it will panic due to corrupted meta. 
+#[test] +fn test_node_merge_transfer_leader() { + let _guard = crate::setup(); + let mut cluster = new_node_cluster(0, 3); + configure_for_merge(&mut cluster); + cluster.cfg.raft_store.store_max_batch_size = 1; + cluster.cfg.raft_store.store_pool_size = 2; + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + cluster.run(); + + let region = pd_client.get_region(b"k1").unwrap(); + cluster.must_split(®ion, b"k2"); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + let left = pd_client.get_region(b"k1").unwrap(); + let right = pd_client.get_region(b"k2").unwrap(); + + let left_peer_1 = find_peer(&left, 1).unwrap().to_owned(); + cluster.must_transfer_leader(left.get_id(), left_peer_1.clone()); + + let schedule_merge_fp = "on_schedule_merge"; + fail::cfg(schedule_merge_fp, "return()").unwrap(); + + cluster.try_merge(left.get_id(), right.get_id()); + + let left_peer_3 = find_peer(&left, 3).unwrap().to_owned(); + assert_eq!(left_peer_3.get_id(), 1003); + // Prevent peer 1003 to handle ready when it's leader + let before_handle_raft_ready_1003 = "before_handle_raft_ready_1003"; + fail::cfg(before_handle_raft_ready_1003, "pause").unwrap(); + + let epoch = cluster.get_region_epoch(left.get_id()); + let mut transfer_leader_req = + new_admin_request(left.get_id(), &epoch, new_transfer_leader_cmd(left_peer_3)); + transfer_leader_req.mut_header().set_peer(left_peer_1); + cluster + .sim + .rl() + .async_command_on_node(1, transfer_leader_req, Callback::None) + .unwrap(); + fail::remove(schedule_merge_fp); + + pd_client.check_merged_timeout(left.get_id(), Duration::from_secs(5)); + + fail::remove(before_handle_raft_ready_1003); + sleep_ms(100); + cluster.must_put(b"k4", b"v4"); + must_get_equal(&cluster.get_engine(3), b"k4", b"v4"); +}
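
The other half of the new flow is how the paused target apply fsm resumes `exec_commit_merge`: the source side publishes its region id through the shared `logs_up_to_date` counter and then nudges the target's mailbox with a `Noop`. Below is a self-contained, std-only sketch of that wake-up pattern; it is illustrative only, with a channel standing in for the fsm mailbox and made-up names rather than TiKV's actual types.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;

enum Msg {
    // Carries the shared flag the target waits on, like `CatchUpLogs.logs_up_to_date`.
    CatchUpLogs { logs_up_to_date: Arc<AtomicU64> },
    // Empty message whose only job is to wake the receiver, like `apply::Msg::Noop`.
    Noop,
}

fn source_fsm(rx: Receiver<Msg>, target_mailbox: Sender<Msg>, region_id: u64) {
    for msg in rx {
        if let Msg::CatchUpLogs { logs_up_to_date } = msg {
            // ... a real source peer would append and apply the missing logs here ...
            // Publish "my logs are up to date" by storing the source region id.
            logs_up_to_date.store(region_id, Ordering::SeqCst);
            // Wake the paused target so it re-checks the flag.
            let _ = target_mailbox.send(Msg::Noop);
        }
    }
}

fn main() {
    let (source_tx, source_rx) = channel();
    let (target_tx, target_rx) = channel();
    let source = thread::spawn(move || source_fsm(source_rx, target_tx, 1));

    // Target side: ask the source to catch up, then "pause" until the flag is published.
    let logs_up_to_date = Arc::new(AtomicU64::new(0));
    source_tx
        .send(Msg::CatchUpLogs {
            logs_up_to_date: logs_up_to_date.clone(),
        })
        .unwrap();
    for msg in target_rx {
        if let Msg::Noop = msg {
            let source_region = logs_up_to_date.load(Ordering::SeqCst);
            if source_region != 0 {
                println!("source region {} caught up, resume commit merge", source_region);
                break;
            }
        }
    }

    drop(source_tx);
    source.join().unwrap();
}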