From a8aa437990bf10ffc69f4387a114a7658b7b6544 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 19 Jul 2022 18:22:23 +0800 Subject: [PATCH 01/27] just change post_exec Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 7 ++++--- components/raftstore/src/coprocessor/mod.rs | 12 ++++++++++-- components/raftstore/src/store/fsm/apply.rs | 15 ++++++++++++++- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index cd370e332e3..17a89292618 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -443,18 +443,19 @@ impl CoprocessorHost { /// It notifies observers side effects of this command before execution of the next command, /// including req/resp, apply state, modified region state, etc. /// Return true observers think a persistence is necessary. - pub fn post_exec( + pub fn post_exec<'a>( &self, region: &Region, cmd: &Cmd, apply_state: &RaftApplyState, region_state: &RegionState, + apply_ctx: &ApplyCtxInfo<'a> ) -> bool { let mut ctx = ObserverContext::new(region); if !cmd.response.has_admin_response() { for observer in &self.registry.query_observers { let observer = observer.observer.inner(); - if observer.post_exec_query(&mut ctx, cmd, apply_state, region_state) { + if observer.post_exec_query(&mut ctx, cmd, apply_state, region_state, apply_ctx) { return true; } } @@ -462,7 +463,7 @@ impl CoprocessorHost { } else { for observer in &self.registry.admin_observers { let observer = observer.observer.inner(); - if observer.post_exec_admin(&mut ctx, cmd, apply_state, region_state) { + if observer.post_exec_admin(&mut ctx, cmd, apply_state, region_state, apply_ctx) { return true; } } diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index b798c7577af..93dd492a20c 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -9,6 +9,7 @@ use std::{ vec::IntoIter, }; +use engine_traits::SstMetaInfo; use engine_traits::CfName; use kvproto::{ metapb::Region, @@ -62,6 +63,11 @@ pub struct ObserverContext<'a> { pub bypass: bool, } +pub struct ApplyCtxInfo<'a> { + pub delete_ssts: &'a mut Vec, + pub pending_clean_ssts: &'a mut Vec, +} + impl<'a> ObserverContext<'a> { pub fn new(region: &Region) -> ObserverContext<'_> { ObserverContext { @@ -101,12 +107,13 @@ pub trait AdminObserver: Coprocessor { /// Hook to call immediately after exec command /// Will be a special persistence after this exec if a observer returns true. - fn post_exec_admin( + fn post_exec_admin<'a>( &self, _: &mut ObserverContext<'_>, _: &Cmd, _: &RaftApplyState, _: &RegionState, + _: &ApplyCtxInfo<'a> ) -> bool { false } @@ -137,12 +144,13 @@ pub trait QueryObserver: Coprocessor { /// Hook to call immediately after exec command. /// Will be a special persistence after this exec if a observer returns true. - fn post_exec_query( + fn post_exec_query<'a>( &self, _: &mut ObserverContext<'_>, _: &Cmd, _: &RaftApplyState, _: &RegionState, + _: &ApplyCtxInfo<'a> ) -> bool { false } diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 16e039dd640..366973e57f5 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -93,6 +93,7 @@ use crate::{ }, Error, Result, }; +use crate::coprocessor::ApplyCtxInfo; const DEFAULT_APPLY_WB_SIZE: usize = 4 * 1024; const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024; @@ -404,6 +405,11 @@ where /// this entry will never apply again at first, then we can delete the ssts files. delete_ssts: Vec, + /// A self-defined engine may be slow to ingest ssts. + /// It may move some elements of `delete_ssts` into `pending_clean_ssts` to delay deletion. + /// Otherwise we may lost data. + pending_clean_ssts: Vec, + /// The priority of this Handler. priority: Priority, /// Whether to yield high-latency operation to low-priority handler. @@ -461,6 +467,7 @@ where perf_context: engine.get_perf_context(cfg.perf_level, PerfContextKind::RaftstoreApply), yield_duration: cfg.apply_yield_duration.0, delete_ssts: vec![], + pending_clean_ssts: vec![], store_id, pending_create_peers, priority, @@ -1305,6 +1312,10 @@ where self.applied_index_term = term; let cmd = Cmd::new(index, term, req.clone(), resp.clone()); + let apply_ctx_info = ApplyCtxInfo { + delete_ssts: &mut ctx.delete_ssts, + pending_clean_ssts: &mut ctx.pending_clean_ssts, + }; let should_write = ctx.host.post_exec( &self.region, &cmd, @@ -1323,6 +1334,7 @@ where _ => None, }, }, + &apply_ctx_info, ); if let ApplyResult::Res(ref exec_result) = exec_result { @@ -4946,12 +4958,13 @@ mod tests { } impl AdminObserver for ApplyObserver { - fn post_exec_admin( + fn post_exec_admin<'a>( &self, _: &mut ObserverContext<'_>, cmd: &Cmd, _: &RaftApplyState, region_state: &RegionState, + _: &ApplyCtxInfo<'a> ) -> bool { let request = cmd.request.get_admin_request(); match request.get_cmd_type() { From d539ca9b587f9249394a3667798f1a8dfce5609a Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 20 Jul 2022 10:41:12 +0800 Subject: [PATCH 02/27] add tests with comments Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 2 +- components/raftstore/src/coprocessor/mod.rs | 8 +- components/raftstore/src/store/fsm/apply.rs | 147 +++++++++++++++--- components/raftstore/src/store/util.rs | 4 + components/sst_importer/src/import_file.rs | 4 + 5 files changed, 141 insertions(+), 24 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 17a89292618..90b9c752b6b 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -449,7 +449,7 @@ impl CoprocessorHost { cmd: &Cmd, apply_state: &RaftApplyState, region_state: &RegionState, - apply_ctx: &ApplyCtxInfo<'a> + apply_ctx: &mut ApplyCtxInfo<'a>, ) -> bool { let mut ctx = ObserverContext::new(region); if !cmd.response.has_admin_response() { diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 93dd492a20c..847f1f5ddae 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -9,8 +9,7 @@ use std::{ vec::IntoIter, }; -use engine_traits::SstMetaInfo; -use engine_traits::CfName; +use engine_traits::{CfName, SstMetaInfo}; use kvproto::{ metapb::Region, pdpb::CheckPolicy, @@ -64,6 +63,7 @@ pub struct ObserverContext<'a> { } pub struct ApplyCtxInfo<'a> { + pub pending_handle_ssts: &'a mut Option>, pub delete_ssts: &'a mut Vec, pub pending_clean_ssts: &'a mut Vec, } @@ -113,7 +113,7 @@ pub trait AdminObserver: Coprocessor { _: &Cmd, _: &RaftApplyState, _: &RegionState, - _: &ApplyCtxInfo<'a> + _: &mut ApplyCtxInfo<'a>, ) -> bool { false } @@ -150,7 +150,7 @@ pub trait QueryObserver: Coprocessor { _: &Cmd, _: &RaftApplyState, _: &RegionState, - _: &ApplyCtxInfo<'a> + _: &mut ApplyCtxInfo<'a>, ) -> bool { false } diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 366973e57f5..1a475a70d16 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -73,7 +73,8 @@ use super::metrics::*; use crate::{ bytes_capacity, coprocessor::{ - Cmd, CmdBatch, CmdObserveInfo, CoprocessorHost, ObserveHandle, ObserveLevel, RegionState, + ApplyCtxInfo, Cmd, CmdBatch, CmdObserveInfo, CoprocessorHost, ObserveHandle, ObserveLevel, + RegionState, }, store::{ cmd_resp, @@ -93,7 +94,6 @@ use crate::{ }, Error, Result, }; -use crate::coprocessor::ApplyCtxInfo; const DEFAULT_APPLY_WB_SIZE: usize = 4 * 1024; const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024; @@ -1312,7 +1312,21 @@ where self.applied_index_term = term; let cmd = Cmd::new(index, term, req.clone(), resp.clone()); - let apply_ctx_info = ApplyCtxInfo { + println!("!!!! exec_result {:?}", exec_result); + let (modified_region, mut pending_handle_ssts) = match exec_result { + ApplyResult::Res(ref e) => match e { + ExecResult::SplitRegion { ref derived, .. } => (Some(derived.clone()), None), + ExecResult::PrepareMerge { ref region, .. } => (Some(region.clone()), None), + ExecResult::CommitMerge { ref region, .. } => (Some(region.clone()), None), + ExecResult::RollbackMerge { ref region, .. } => (Some(region.clone()), None), + ExecResult::IngestSst { ref ssts } => (None, Some(ssts.clone())), + _ => (None, None), + }, + _ => (None, None), + }; + println!("!!!! pending_handle_ssts {:?}", pending_handle_ssts); + let mut apply_ctx_info = ApplyCtxInfo { + pending_handle_ssts: &mut pending_handle_ssts, delete_ssts: &mut ctx.delete_ssts, pending_clean_ssts: &mut ctx.pending_clean_ssts, }; @@ -1323,19 +1337,20 @@ where &RegionState { peer_id: self.id(), pending_remove: self.pending_remove, - modified_region: match exec_result { - ApplyResult::Res(ref e) => match e { - ExecResult::SplitRegion { ref derived, .. } => Some(derived.clone()), - ExecResult::PrepareMerge { ref region, .. } => Some(region.clone()), - ExecResult::CommitMerge { ref region, .. } => Some(region.clone()), - ExecResult::RollbackMerge { ref region, .. } => Some(region.clone()), - _ => None, - }, - _ => None, - }, + modified_region, }, - &apply_ctx_info, + &mut apply_ctx_info, ); + println!("!!!! post post exec {:?}", exec_result); + match pending_handle_ssts { + None => (), + Some(mut v) => { + if !v.is_empty() { + // The sst is not handled by any of the `pre_post` observers. + ctx.delete_ssts.append(&mut v); + } + } + } if let ApplyResult::Res(ref exec_result) = exec_result { match *exec_result { @@ -1387,6 +1402,7 @@ where } } + println!("!!!! R"); (resp, exec_result, should_write) } @@ -1441,7 +1457,9 @@ where // Include region for epoch not match after merge may cause key not in range. let include_region = req.get_header().get_region_epoch().get_version() >= self.last_merge_version; + println!("!!! exec 1"); check_region_epoch(req, &self.region, include_region)?; + println!("!!! exec 2"); if req.has_admin_request() { self.exec_admin_cmd(ctx, req) } else { @@ -1548,6 +1566,7 @@ where let exec_res = if !ranges.is_empty() { ApplyResult::Res(ExecResult::DeleteRange { ranges }) } else if !ssts.is_empty() { + println!("!!!! sst"); #[cfg(feature = "failpoints")] { let mut dont_delete_ingested_sst_fp = || { @@ -1557,7 +1576,6 @@ where }; dont_delete_ingested_sst_fp(); } - ctx.delete_ssts.append(&mut ssts.clone()); ApplyResult::Res(ExecResult::IngestSst { ssts }) } else { ApplyResult::None @@ -1747,6 +1765,7 @@ where PEER_WRITE_CMD_COUNTER.ingest_sst.inc(); let sst = req.get_ingest_sst().get_sst(); + println!("!!! s1"); if let Err(e) = check_sst_for_ingestion(sst, &self.region) { error!(?e; "ingest fail"; @@ -1760,6 +1779,7 @@ where return Err(e); } + println!("!!! s2"); match ctx.importer.validate(sst) { Ok(meta_info) => { ctx.pending_ssts.push(meta_info.clone()); @@ -1772,6 +1792,7 @@ where } }; + println!("!!! s3"); Ok(()) } } @@ -4943,6 +4964,10 @@ mod tests { cmd_sink: Option>>>, filter_compact_log: Arc, filter_consistency_check: Arc, + delay_remove_ssts: Arc, + last_delete_sst_count: Arc, + last_pending_clean_sst_count: Arc, + last_pending_handle_sst_count: Arc, } impl Coprocessor for ApplyObserver {} @@ -4955,6 +4980,47 @@ mod tests { fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) { self.post_query_count.fetch_add(1, Ordering::SeqCst); } + + fn post_exec_query<'a>( + &self, + _: &mut ObserverContext<'_>, + _: &Cmd, + _: &RaftApplyState, + _: &RegionState, + apply_info: &mut ApplyCtxInfo<'a>, + ) -> bool { + match apply_info.pending_handle_ssts { + Some(v) => { + // If it is a ingest sst + let mut ssts = std::mem::take(v); + assert_ne!(ssts.len(), 0); + if self.delay_remove_ssts.load(Ordering::SeqCst) { + apply_info.pending_clean_ssts.append(&mut ssts); + } else { + apply_info.delete_ssts.append(&mut ssts); + } + } + None => (), + } + self.last_delete_sst_count + .store(apply_info.delete_ssts.len() as u64, Ordering::SeqCst); + self.last_pending_clean_sst_count + .store(apply_info.pending_clean_ssts.len() as u64, Ordering::SeqCst); + self.last_pending_handle_sst_count.store( + match apply_info.pending_handle_ssts { + Some(ref v) => v.len() as u64, + None => 0, + }, + Ordering::SeqCst, + ); + println!( + "!!!! Zzfsf {:?} {:?} {:?}", + self.last_delete_sst_count, + self.last_pending_clean_sst_count, + self.last_pending_handle_sst_count + ); + false + } } impl AdminObserver for ApplyObserver { @@ -4964,7 +5030,7 @@ mod tests { cmd: &Cmd, _: &RaftApplyState, region_state: &RegionState, - _: &ApplyCtxInfo<'a> + _: &mut ApplyCtxInfo<'a>, ) -> bool { let request = cmd.request.get_admin_request(); match request.get_cmd_type() { @@ -5633,11 +5699,13 @@ mod tests { #[test] fn test_exec_observer() { let (_path, engine) = create_tmp_engine("test-exec-observer"); - let (_import_dir, importer) = create_tmp_importer("test-exec-observer"); + let (import_dir, importer) = create_tmp_importer("test-exec-observer"); let mut host = CoprocessorHost::::default(); let obs = ApplyObserver::default(); host.registry .register_admin_observer(1, BoxAdminObserver::new(obs.clone())); + host.registry + .register_query_observer(1, BoxQueryObserver::new(obs.clone())); let (tx, rx) = mpsc::channel(); let (region_scheduler, _) = dummy_scheduler(); @@ -5651,7 +5719,7 @@ mod tests { sender, region_scheduler, coprocessor_host: host, - importer, + importer: importer.clone(), engine: engine.clone(), router: router.clone(), store_id: 1, @@ -5752,7 +5820,7 @@ mod tests { let apply_res = fetch_apply_res(&rx); assert_eq!(apply_res.apply_state.get_applied_index(), index_id); assert_eq!(apply_res.applied_index_term, 1); - let (_, r8) = if let ExecResult::SplitRegion { + let (r1, r8) = if let ExecResult::SplitRegion { regions, derived: _, new_split_regions: _, @@ -5783,6 +5851,47 @@ mod tests { .unwrap_or_default(); assert_eq!(apply_res.apply_state, state); + let r1_epoch = r1.get_region_epoch(); + index_id += 1; + let keys_count = 10; + println!("!!!!! zz {}", index_id); + + let mut kvs: Vec<(&[u8], &[u8])> = Vec::new(); + let mut keys: Vec = Vec::default(); + // for i in 0..keys_count { + // keys.push(format!("k{}", i)); + // } + // for i in 0..keys_count { + // kvs.push((keys.get(i).unwrap().as_bytes(), b"2")); + // } + kvs.push((b"k3", b"2")); + let sst_path = import_dir.path().join("test.sst"); + let (mut meta, data) = gen_sst_file_with_kvs(&sst_path, &kvs); + meta.set_region_id(1); + meta.set_region_epoch(r1_epoch.clone()); + let mut file = importer.create(&meta).unwrap(); + file.append(&data).unwrap(); + file.finish().unwrap(); + let src = sst_path.clone(); + let dst = file.get_import_path().save.to_str().unwrap(); + std::fs::copy(src.clone(), dst).unwrap(); + assert!(src.as_path().exists()); + println!("!!! cpy {:?} {:?}", src, dst); + let ingestsst = EntryBuilder::new(index_id, 1) + .ingest_sst(&meta) + .epoch(r1_epoch.get_conf_ver(), r1_epoch.get_version()) + .build(); + + obs.delay_remove_ssts.store(true, Ordering::SeqCst); + router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![]))); + let apply_res = fetch_apply_res(&rx); + let apply_res = fetch_apply_res(&rx); + assert_eq!(apply_res.exec_res.len(), 1); + assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0); + assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 0); + assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1); + println!("!!!!! zzz {:?}", obs.last_pending_clean_sst_count); + system.shutdown(); } diff --git a/components/raftstore/src/store/util.rs b/components/raftstore/src/store/util.rs index 75c620ac12c..7f229e4813e 100644 --- a/components/raftstore/src/store/util.rs +++ b/components/raftstore/src/store/util.rs @@ -261,6 +261,10 @@ pub fn check_region_epoch( } let from_epoch = req.get_header().get_region_epoch(); + println!( + "!!!! check_region_epoch {:?} {:?} {:?} {:?} {:?}", + from_epoch, region, check_conf_ver, check_ver, include_region + ); compare_region_epoch( from_epoch, region, diff --git a/components/sst_importer/src/import_file.rs b/components/sst_importer/src/import_file.rs index 7c02b058d1e..17865041c35 100644 --- a/components/sst_importer/src/import_file.rs +++ b/components/sst_importer/src/import_file.rs @@ -167,6 +167,10 @@ impl ImportFile { } Ok(()) } + + pub fn get_import_path(&self) -> &ImportPath { + &self.path + } } impl fmt::Debug for ImportFile { From 78d02ece52ee941cf5bc486866f4e904cc9330f5 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 20 Jul 2022 10:45:51 +0800 Subject: [PATCH 03/27] remove comments Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 26 --------------------- components/raftstore/src/store/util.rs | 4 ---- 2 files changed, 30 deletions(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 1a475a70d16..3f621ae9f6b 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1312,7 +1312,6 @@ where self.applied_index_term = term; let cmd = Cmd::new(index, term, req.clone(), resp.clone()); - println!("!!!! exec_result {:?}", exec_result); let (modified_region, mut pending_handle_ssts) = match exec_result { ApplyResult::Res(ref e) => match e { ExecResult::SplitRegion { ref derived, .. } => (Some(derived.clone()), None), @@ -1324,7 +1323,6 @@ where }, _ => (None, None), }; - println!("!!!! pending_handle_ssts {:?}", pending_handle_ssts); let mut apply_ctx_info = ApplyCtxInfo { pending_handle_ssts: &mut pending_handle_ssts, delete_ssts: &mut ctx.delete_ssts, @@ -1341,7 +1339,6 @@ where }, &mut apply_ctx_info, ); - println!("!!!! post post exec {:?}", exec_result); match pending_handle_ssts { None => (), Some(mut v) => { @@ -1402,7 +1399,6 @@ where } } - println!("!!!! R"); (resp, exec_result, should_write) } @@ -1457,9 +1453,7 @@ where // Include region for epoch not match after merge may cause key not in range. let include_region = req.get_header().get_region_epoch().get_version() >= self.last_merge_version; - println!("!!! exec 1"); check_region_epoch(req, &self.region, include_region)?; - println!("!!! exec 2"); if req.has_admin_request() { self.exec_admin_cmd(ctx, req) } else { @@ -1566,7 +1560,6 @@ where let exec_res = if !ranges.is_empty() { ApplyResult::Res(ExecResult::DeleteRange { ranges }) } else if !ssts.is_empty() { - println!("!!!! sst"); #[cfg(feature = "failpoints")] { let mut dont_delete_ingested_sst_fp = || { @@ -1765,7 +1758,6 @@ where PEER_WRITE_CMD_COUNTER.ingest_sst.inc(); let sst = req.get_ingest_sst().get_sst(); - println!("!!! s1"); if let Err(e) = check_sst_for_ingestion(sst, &self.region) { error!(?e; "ingest fail"; @@ -1779,7 +1771,6 @@ where return Err(e); } - println!("!!! s2"); match ctx.importer.validate(sst) { Ok(meta_info) => { ctx.pending_ssts.push(meta_info.clone()); @@ -1792,7 +1783,6 @@ where } }; - println!("!!! s3"); Ok(()) } } @@ -5013,12 +5003,6 @@ mod tests { }, Ordering::SeqCst, ); - println!( - "!!!! Zzfsf {:?} {:?} {:?}", - self.last_delete_sst_count, - self.last_pending_clean_sst_count, - self.last_pending_handle_sst_count - ); false } } @@ -5854,16 +5838,8 @@ mod tests { let r1_epoch = r1.get_region_epoch(); index_id += 1; let keys_count = 10; - println!("!!!!! zz {}", index_id); let mut kvs: Vec<(&[u8], &[u8])> = Vec::new(); - let mut keys: Vec = Vec::default(); - // for i in 0..keys_count { - // keys.push(format!("k{}", i)); - // } - // for i in 0..keys_count { - // kvs.push((keys.get(i).unwrap().as_bytes(), b"2")); - // } kvs.push((b"k3", b"2")); let sst_path = import_dir.path().join("test.sst"); let (mut meta, data) = gen_sst_file_with_kvs(&sst_path, &kvs); @@ -5876,7 +5852,6 @@ mod tests { let dst = file.get_import_path().save.to_str().unwrap(); std::fs::copy(src.clone(), dst).unwrap(); assert!(src.as_path().exists()); - println!("!!! cpy {:?} {:?}", src, dst); let ingestsst = EntryBuilder::new(index_id, 1) .ingest_sst(&meta) .epoch(r1_epoch.get_conf_ver(), r1_epoch.get_version()) @@ -5890,7 +5865,6 @@ mod tests { assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0); assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 0); assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1); - println!("!!!!! zzz {:?}", obs.last_pending_clean_sst_count); system.shutdown(); } diff --git a/components/raftstore/src/store/util.rs b/components/raftstore/src/store/util.rs index 7f229e4813e..75c620ac12c 100644 --- a/components/raftstore/src/store/util.rs +++ b/components/raftstore/src/store/util.rs @@ -261,10 +261,6 @@ pub fn check_region_epoch( } let from_epoch = req.get_header().get_region_epoch(); - println!( - "!!!! check_region_epoch {:?} {:?} {:?} {:?} {:?}", - from_epoch, region, check_conf_ver, check_ver, include_region - ); compare_region_epoch( from_epoch, region, From b1d70ea78376bee104ef41113bd8cc3cf9c8b8e9 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 20 Jul 2022 11:39:05 +0800 Subject: [PATCH 04/27] finish tests Signed-off-by: CalvinNeo --- components/raftstore/src/coprocessor/mod.rs | 14 +++++++------ components/raftstore/src/store/fsm/apply.rs | 23 +++++++++++++++++---- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 847f1f5ddae..403cc145530 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -62,12 +62,6 @@ pub struct ObserverContext<'a> { pub bypass: bool, } -pub struct ApplyCtxInfo<'a> { - pub pending_handle_ssts: &'a mut Option>, - pub delete_ssts: &'a mut Vec, - pub pending_clean_ssts: &'a mut Vec, -} - impl<'a> ObserverContext<'a> { pub fn new(region: &Region) -> ObserverContext<'_> { ObserverContext { @@ -81,12 +75,20 @@ impl<'a> ObserverContext<'a> { } } +/// Context of a region provided for observers. pub struct RegionState { pub peer_id: u64, pub pending_remove: bool, pub modified_region: Option, } +/// Context for exec observers of mutation to be applied to ApplyContext. +pub struct ApplyCtxInfo<'a> { + pub pending_handle_ssts: &'a mut Option>, + pub delete_ssts: &'a mut Vec, + pub pending_clean_ssts: &'a mut Vec, +} + pub trait AdminObserver: Coprocessor { /// Hook to call before proposing admin request. fn pre_propose_admin(&self, _: &mut ObserverContext<'_>, _: &mut AdminRequest) -> Result<()> { diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 3f621ae9f6b..496d23f985d 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5835,10 +5835,9 @@ mod tests { .unwrap_or_default(); assert_eq!(apply_res.apply_state, state); + // Phase 3: we test if we can delay deletion of some sst files. let r1_epoch = r1.get_region_epoch(); index_id += 1; - let keys_count = 10; - let mut kvs: Vec<(&[u8], &[u8])> = Vec::new(); kvs.push((b"k3", b"2")); let sst_path = import_dir.path().join("test.sst"); @@ -5858,14 +5857,30 @@ mod tests { .build(); obs.delay_remove_ssts.store(true, Ordering::SeqCst); - router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![]))); - let apply_res = fetch_apply_res(&rx); + router.schedule_task( + 1, + Msg::apply(apply(peer_id, 1, 1, vec![ingestsst.clone()], vec![])), + ); + fetch_apply_res(&rx); let apply_res = fetch_apply_res(&rx); assert_eq!(apply_res.exec_res.len(), 1); assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0); assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 0); assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1); + index_id += 1; + let ingestsst = EntryBuilder::new(index_id, 1) + .ingest_sst(&meta) + .epoch(r1_epoch.get_conf_ver(), r1_epoch.get_version()) + .build(); + obs.delay_remove_ssts.store(false, Ordering::SeqCst); + router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![]))); + let apply_res = fetch_apply_res(&rx); + assert_eq!(apply_res.exec_res.len(), 1); + assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0); + assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 1); + assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1); + system.shutdown(); } From e7267b9f8e559608e11c5e3c239cc168ed2a6419 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 20 Jul 2022 12:08:22 +0800 Subject: [PATCH 05/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 496d23f985d..7cf17b6fa86 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5849,8 +5849,8 @@ mod tests { file.finish().unwrap(); let src = sst_path.clone(); let dst = file.get_import_path().save.to_str().unwrap(); - std::fs::copy(src.clone(), dst).unwrap(); - assert!(src.as_path().exists()); + std::fs::copy(src, dst).unwrap(); + assert!(sst_path.as_path().exists()); let ingestsst = EntryBuilder::new(index_id, 1) .ingest_sst(&meta) .epoch(r1_epoch.get_conf_ver(), r1_epoch.get_version()) @@ -5859,7 +5859,7 @@ mod tests { obs.delay_remove_ssts.store(true, Ordering::SeqCst); router.schedule_task( 1, - Msg::apply(apply(peer_id, 1, 1, vec![ingestsst.clone()], vec![])), + Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![])), ); fetch_apply_res(&rx); let apply_res = fetch_apply_res(&rx); From bd7c37c19b326a77eeae7693779af015f04ebe94 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 20 Jul 2022 12:57:27 +0800 Subject: [PATCH 06/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 7cf17b6fa86..7eeaa94574b 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5857,10 +5857,7 @@ mod tests { .build(); obs.delay_remove_ssts.store(true, Ordering::SeqCst); - router.schedule_task( - 1, - Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![])), - ); + router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![]))); fetch_apply_res(&rx); let apply_res = fetch_apply_res(&rx); assert_eq!(apply_res.exec_res.len(), 1); From b414ab7c34433b68443313196121f2445852d619 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 20 Jul 2022 13:11:11 +0800 Subject: [PATCH 07/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 7eeaa94574b..e24456d35f7 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5838,8 +5838,7 @@ mod tests { // Phase 3: we test if we can delay deletion of some sst files. let r1_epoch = r1.get_region_epoch(); index_id += 1; - let mut kvs: Vec<(&[u8], &[u8])> = Vec::new(); - kvs.push((b"k3", b"2")); + let mut kvs: Vec<(&[u8], &[u8])> = vec![(b"k3", b"2")]; let sst_path = import_dir.path().join("test.sst"); let (mut meta, data) = gen_sst_file_with_kvs(&sst_path, &kvs); meta.set_region_id(1); From 843b1d24b38d4caa2226f716184e721d08e0286d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 20 Jul 2022 16:01:17 +0800 Subject: [PATCH 08/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index e24456d35f7..a4439da2bbf 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5838,7 +5838,7 @@ mod tests { // Phase 3: we test if we can delay deletion of some sst files. let r1_epoch = r1.get_region_epoch(); index_id += 1; - let mut kvs: Vec<(&[u8], &[u8])> = vec![(b"k3", b"2")]; + let kvs: Vec<(&[u8], &[u8])> = vec![(b"k3", b"2")]; let sst_path = import_dir.path().join("test.sst"); let (mut meta, data) = gen_sst_file_with_kvs(&sst_path, &kvs); meta.set_region_id(1); From 7b4285cf15ac49c1ff6dc1128c0a2bdd3f45cb14 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 1 Aug 2022 12:04:01 +0800 Subject: [PATCH 09/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/coprocessor/dispatcher.rs | 4 ++-- components/raftstore/src/store/fsm/apply.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index f95884fed77..5c4c1cccc16 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -446,13 +446,13 @@ impl CoprocessorHost { /// execution of the next command, including req/resp, apply state, /// modified region state, etc. Return true observers think a /// persistence is necessary. - pub fn post_exec<'a>( + pub fn post_exec( &self, region: &Region, cmd: &Cmd, apply_state: &RaftApplyState, region_state: &RegionState, - apply_ctx: &mut ApplyCtxInfo<'a>, + apply_ctx: &mut ApplyCtxInfo, ) -> bool { let mut ctx = ObserverContext::new(region); if !cmd.response.has_admin_response() { diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 66cd1b84722..904cf7d4f90 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5846,7 +5846,7 @@ mod tests { let apply_res = fetch_apply_res(&rx); assert_eq!(apply_res.apply_state.get_applied_index(), index_id); assert_eq!(apply_res.applied_term, 1); - let (_, r8) = if let ExecResult::SplitRegion { + let (r1, r8) = if let ExecResult::SplitRegion { regions, derived: _, new_split_regions: _, From a61fe066cbb5dae76cb4a63742c80d9f73c82730 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 1 Aug 2022 12:55:09 +0800 Subject: [PATCH 10/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/coprocessor/dispatcher.rs | 2 +- components/raftstore/src/coprocessor/mod.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 5c4c1cccc16..f3c6878ccb0 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -452,7 +452,7 @@ impl CoprocessorHost { cmd: &Cmd, apply_state: &RaftApplyState, region_state: &RegionState, - apply_ctx: &mut ApplyCtxInfo, + apply_ctx: &mut ApplyCtxInfo<'_>, ) -> bool { let mut ctx = ObserverContext::new(region); if !cmd.response.has_admin_response() { diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 993d50722b9..4c6b68727e1 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -117,13 +117,13 @@ pub trait AdminObserver: Coprocessor { /// Hook to call immediately after exec command /// Will be a special persistence after this exec if a observer returns /// true. - fn post_exec_admin<'a>( + fn post_exec_admin( &self, _: &mut ObserverContext<'_>, _: &Cmd, _: &RaftApplyState, _: &RegionState, - _: &mut ApplyCtxInfo<'a>, + _: &mut ApplyCtxInfo<'_>, ) -> bool { false } @@ -157,13 +157,13 @@ pub trait QueryObserver: Coprocessor { /// Hook to call immediately after exec command. /// Will be a special persistence after this exec if a observer returns /// true. - fn post_exec_query<'a>( + fn post_exec_query( &self, _: &mut ObserverContext<'_>, _: &Cmd, _: &RaftApplyState, _: &RegionState, - _: &mut ApplyCtxInfo<'a>, + _: &mut ApplyCtxInfo<'_>, ) -> bool { false } From 3a9d36a55c3ef7dfd4d6f8b90d75b104a0794900 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 4 Aug 2022 22:33:54 +0800 Subject: [PATCH 11/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 904cf7d4f90..04ffc2d7611 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5005,13 +5005,13 @@ mod tests { self.post_query_count.fetch_add(1, Ordering::SeqCst); } - fn post_exec_query<'a>( + fn post_exec_query( &self, _: &mut ObserverContext<'_>, _: &Cmd, _: &RaftApplyState, _: &RegionState, - apply_info: &mut ApplyCtxInfo<'a>, + apply_info: &mut ApplyCtxInfo<'_>, ) -> bool { match apply_info.pending_handle_ssts { Some(v) => { @@ -5042,7 +5042,7 @@ mod tests { } impl AdminObserver for ApplyObserver { - fn post_exec_admin<'a>( + fn post_exec_admin( &self, _: &mut ObserverContext<'_>, cmd: &Cmd, From 4bc4d38b8ae8bab3155e0656d7109cf715a6e1af Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 5 Aug 2022 10:01:42 +0800 Subject: [PATCH 12/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 04ffc2d7611..5de1522a8b2 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5048,7 +5048,7 @@ mod tests { cmd: &Cmd, _: &RaftApplyState, region_state: &RegionState, - _: &mut ApplyCtxInfo<'a>, + _: &mut ApplyCtxInfo, ) -> bool { let request = cmd.request.get_admin_request(); match request.get_cmd_type() { From 1d070acadebe75c6eb3c8e7b29dc4304b9fad882 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 5 Aug 2022 10:42:34 +0800 Subject: [PATCH 13/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 5de1522a8b2..295098cd2b8 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5048,7 +5048,7 @@ mod tests { cmd: &Cmd, _: &RaftApplyState, region_state: &RegionState, - _: &mut ApplyCtxInfo, + _: &mut ApplyCtxInfo<'_>, ) -> bool { let request = cmd.request.get_admin_request(); match request.get_cmd_type() { From 28113c589bfde1bf3e9ba51cad5f9e52fa6041fa Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 9 Aug 2022 19:21:43 +0800 Subject: [PATCH 14/27] add test Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 40 +++++++++++++++++++ components/raftstore/src/coprocessor/mod.rs | 1 + 2 files changed, 41 insertions(+) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index f3c6878ccb0..17bd601d247 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -678,6 +678,19 @@ mod tests { ctx.bypass = self.bypass.load(Ordering::SeqCst); false } + + fn post_exec_admin( + &self, + ctx: &mut ObserverContext<'_>, + _: &Cmd, + _: &RaftApplyState, + _: &RegionState, + _: &mut ApplyCtxInfo<'_>, + ) -> bool { + self.called.fetch_add(18, Ordering::SeqCst); + ctx.bypass = self.bypass.load(Ordering::SeqCst); + false + } } impl QueryObserver for TestCoprocessor { @@ -720,6 +733,19 @@ mod tests { self.called.fetch_add(14, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } + + fn post_exec_query( + &self, + ctx: &mut ObserverContext<'_>, + _: &Cmd, + _: &RaftApplyState, + _: &RegionState, + _: &mut ApplyCtxInfo<'_>, + ) -> bool { + self.called.fetch_add(17, Ordering::SeqCst); + ctx.bypass = self.bypass.load(Ordering::SeqCst); + false + } } impl RoleObserver for TestCoprocessor { @@ -860,6 +886,20 @@ mod tests { admin_req.set_admin_request(AdminRequest::default()); host.pre_exec(®ion, &admin_req, 0, 0); assert_all!([&ob.called], &[119]); // 16 + + let mut pending_handle_ssts = None; + let mut delete_ssts = vec![]; + let mut pending_clean_ssts = vec![]; + let mut info = ApplyCtxInfo { + pending_handle_ssts: &mut pending_handle_ssts, + pending_clean_ssts: &mut pending_clean_ssts, + delete_ssts: &mut delete_ssts, + }; + let apply_state = RaftApplyState::default(); + let region_state = RegionState::default(); + let cmd = Cmd::default(); + host.post_exec(®ion, &cmd, &apply_state, ®ion_state, &mut info); + assert_all!([&ob.called], &[136]); // 17 } #[test] diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 4c6b68727e1..3f25458237e 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -76,6 +76,7 @@ impl<'a> ObserverContext<'a> { } /// Context of a region provided for observers. +#[derive(Default, Clone)] pub struct RegionState { pub peer_id: u64, pub pending_remove: bool, From 5828b38841441caa4740e090c6d4f35ec1424f8a Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 12 Aug 2022 10:33:55 +0800 Subject: [PATCH 15/27] remove debug log Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 9d61f3cde46..f1a304d2cf0 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1251,7 +1251,6 @@ where .applied_batch .push(cmd_cb, cmd, &self.observe_info, self.region_id()); if should_write { - debug!("persist data and apply state"; "region_id" => self.region_id(), "peer_id" => self.id(), "state" => ?self.apply_state); apply_ctx.commit(self); } exec_result From a30af8cb41a90210594bfccc5102570df86ddfe3 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 12 Aug 2022 16:28:31 +0800 Subject: [PATCH 16/27] rename pending_clean_ssts to pending_delete_ssts Signed-off-by: CalvinNeo --- components/raftstore/src/coprocessor/dispatcher.rs | 4 ++-- components/raftstore/src/coprocessor/mod.rs | 2 +- components/raftstore/src/store/fsm/apply.rs | 12 ++++++------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 07282d255a0..193182a8e84 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -916,10 +916,10 @@ mod tests { let mut pending_handle_ssts = None; let mut delete_ssts = vec![]; - let mut pending_clean_ssts = vec![]; + let mut pending_delete_ssts = vec![]; let mut info = ApplyCtxInfo { pending_handle_ssts: &mut pending_handle_ssts, - pending_clean_ssts: &mut pending_clean_ssts, + pending_delete_ssts: &mut pending_delete_ssts, delete_ssts: &mut delete_ssts, }; let apply_state = RaftApplyState::default(); diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index 8d7f52dc31a..fcbfcfc98ff 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -87,7 +87,7 @@ pub struct RegionState { pub struct ApplyCtxInfo<'a> { pub pending_handle_ssts: &'a mut Option>, pub delete_ssts: &'a mut Vec, - pub pending_clean_ssts: &'a mut Vec, + pub pending_delete_ssts: &'a mut Vec, } pub trait AdminObserver: Coprocessor { diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index f1a304d2cf0..da6d790f21e 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -410,9 +410,9 @@ where delete_ssts: Vec, /// A self-defined engine may be slow to ingest ssts. - /// It may move some elements of `delete_ssts` into `pending_clean_ssts` to + /// It may move some elements of `delete_ssts` into `pending_delete_ssts` to /// delay deletion. Otherwise we may lost data. - pending_clean_ssts: Vec, + pending_delete_ssts: Vec, /// The priority of this Handler. priority: Priority, @@ -471,7 +471,7 @@ where perf_context: engine.get_perf_context(cfg.perf_level, PerfContextKind::RaftstoreApply), yield_duration: cfg.apply_yield_duration.0, delete_ssts: vec![], - pending_clean_ssts: vec![], + pending_delete_ssts: vec![], store_id, pending_create_peers, priority, @@ -1343,7 +1343,7 @@ where let mut apply_ctx_info = ApplyCtxInfo { pending_handle_ssts: &mut pending_handle_ssts, delete_ssts: &mut ctx.delete_ssts, - pending_clean_ssts: &mut ctx.pending_clean_ssts, + pending_delete_ssts: &mut ctx.pending_delete_ssts, }; let should_write = ctx.host.post_exec( &self.region, @@ -5018,7 +5018,7 @@ mod tests { let mut ssts = std::mem::take(v); assert_ne!(ssts.len(), 0); if self.delay_remove_ssts.load(Ordering::SeqCst) { - apply_info.pending_clean_ssts.append(&mut ssts); + apply_info.pending_delete_ssts.append(&mut ssts); } else { apply_info.delete_ssts.append(&mut ssts); } @@ -5028,7 +5028,7 @@ mod tests { self.last_delete_sst_count .store(apply_info.delete_ssts.len() as u64, Ordering::SeqCst); self.last_pending_clean_sst_count - .store(apply_info.pending_clean_ssts.len() as u64, Ordering::SeqCst); + .store(apply_info.pending_delete_ssts.len() as u64, Ordering::SeqCst); self.last_pending_handle_sst_count.store( match apply_info.pending_handle_ssts { Some(ref v) => v.len() as u64, From f49b30bf52357a0f8e70f2eb3b1bb7cb004ba609 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 12 Aug 2022 16:54:49 +0800 Subject: [PATCH 17/27] fmt Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index da6d790f21e..439e0907682 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -5027,8 +5027,10 @@ mod tests { } self.last_delete_sst_count .store(apply_info.delete_ssts.len() as u64, Ordering::SeqCst); - self.last_pending_clean_sst_count - .store(apply_info.pending_delete_ssts.len() as u64, Ordering::SeqCst); + self.last_pending_clean_sst_count.store( + apply_info.pending_delete_ssts.len() as u64, + Ordering::SeqCst, + ); self.last_pending_handle_sst_count.store( match apply_info.pending_handle_ssts { Some(ref v) => v.len() as u64, From 505f6a1c0f6d78297579af387d576784cabc7954 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 15 Aug 2022 13:26:29 +0800 Subject: [PATCH 18/27] c Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 439e0907682..0e4f72ce6df 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1360,7 +1360,10 @@ where None => (), Some(mut v) => { if !v.is_empty() { - // The sst is not handled by any of the `pre_post` observers. + // All elements in `pending_handle_ssts` should be moved into either + // `delete_ssts` or `pending_delete_ssts`, once handled by by any of the + // `post_exec` observers. So a non-empty + // `pending_handle_ssts` here indicates no `post_exec` handled. ctx.delete_ssts.append(&mut v); } } From 31e89788c450242833e6e74035d8807fde00c87f Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 01:42:30 +0800 Subject: [PATCH 19/27] fmt Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 0e4f72ce6df..18034d35ea6 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -4992,7 +4992,7 @@ mod tests { filter_consistency_check: Arc, delay_remove_ssts: Arc, last_delete_sst_count: Arc, - last_pending_clean_sst_count: Arc, + last_pending_delete_sst_count: Arc, last_pending_handle_sst_count: Arc, } @@ -5030,7 +5030,7 @@ mod tests { } self.last_delete_sst_count .store(apply_info.delete_ssts.len() as u64, Ordering::SeqCst); - self.last_pending_clean_sst_count.store( + self.last_pending_delete_sst_count.store( apply_info.pending_delete_ssts.len() as u64, Ordering::SeqCst, ); @@ -5908,7 +5908,7 @@ mod tests { assert_eq!(apply_res.exec_res.len(), 1); assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0); assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 0); - assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1); + assert_eq!(obs.last_pending_delete_sst_count.load(Ordering::SeqCst), 1); index_id += 1; let ingestsst = EntryBuilder::new(index_id, 1) @@ -5921,7 +5921,7 @@ mod tests { assert_eq!(apply_res.exec_res.len(), 1); assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0); assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 1); - assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1); + assert_eq!(obs.last_pending_delete_sst_count.load(Ordering::SeqCst), 1); system.shutdown(); } From 0aef69e585f924704000fb80452bc6c5414147ca Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 12:51:27 +0800 Subject: [PATCH 20/27] add metric Signed-off-by: CalvinNeo --- components/raftstore/src/store/fsm/apply.rs | 3 +++ components/raftstore/src/store/metrics.rs | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 58f1c5dc3b3..370146c6b9e 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1366,6 +1366,9 @@ where // `pending_handle_ssts` here indicates no `post_exec` handled. ctx.delete_ssts.append(&mut v); } + RAFT_APPLYING_SST_GAUGE + .with_label_values(&["pending_delete"]) + .set(ctx.pending_delete_ssts.len()); } } diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 9691d5be0db..587b9ad3af7 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -777,4 +777,10 @@ lazy_static! { .unwrap(); pub static ref RAFT_LOG_GC_SKIPPED: RaftLogGcSkippedVec = auto_flush_from!(RAFT_LOG_GC_SKIPPED_VEC, RaftLogGcSkippedVec); + + pub static ref RAFT_APPLYING_SST_GAUGE: IntGaugeVec = register_int_gauge_vec!( + "tikv_raft_applying_sst", + "Sum of applying sst.", + &["type"] + ).unwrap(); } From 8a24650f958d557e055913dcdaabeb60d81a866c Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 13:06:43 +0800 Subject: [PATCH 21/27] re test Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 90 ++++++++++++++----- 1 file changed, 67 insertions(+), 23 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 193182a8e84..94fd25c395e 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -657,6 +657,26 @@ mod tests { return_err: Arc, } + enum ObserverIndex { + PreProposeAdmin = 1, + PreApplyAdmin = 2, + PostApplyAdmin = 3, + PreProposeQuery = 4, + PreApplyQuery = 5, + PostApplyQuery = 6, + OnRoleChange = 7, + OnRegionChanged = 8, + ApplyPlainKvs = 9, + ApplySst = 10, + OnFlushAppliedCmdBatch = 13, + OnEmptyCmd = 14, + PreExecQuery = 15, + PreExecAdmin = 16, + PostExecQuery = 17, + PostExecAdmin = 18, + OnComputeEngineSize = 19, + } + impl Coprocessor for TestCoprocessor {} impl AdminObserver for TestCoprocessor { @@ -665,7 +685,8 @@ mod tests { ctx: &mut ObserverContext<'_>, _: &mut AdminRequest, ) -> Result<()> { - self.called.fetch_add(1, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PreApplyAdmin, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); if self.return_err.load(Ordering::SeqCst) { return Err(box_err!("error")); @@ -674,12 +695,14 @@ mod tests { } fn pre_apply_admin(&self, ctx: &mut ObserverContext<'_>, _: &AdminRequest) { - self.called.fetch_add(2, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PreApplyAdmin, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } fn post_apply_admin(&self, ctx: &mut ObserverContext<'_>, _: &AdminResponse) { - self.called.fetch_add(3, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PostApplyAdmin, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } @@ -690,7 +713,8 @@ mod tests { _: u64, _: u64, ) -> bool { - self.called.fetch_add(16, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PreExecAdmin, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } @@ -703,7 +727,8 @@ mod tests { _: &RegionState, _: &mut ApplyCtxInfo<'_>, ) -> bool { - self.called.fetch_add(18, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PostExecAdmin, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } @@ -715,7 +740,8 @@ mod tests { ctx: &mut ObserverContext<'_>, _: &mut Vec, ) -> Result<()> { - self.called.fetch_add(4, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PreProposeQuery, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); if self.return_err.load(Ordering::SeqCst) { return Err(box_err!("error")); @@ -724,12 +750,14 @@ mod tests { } fn pre_apply_query(&self, ctx: &mut ObserverContext<'_>, _: &[Request]) { - self.called.fetch_add(5, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PreApplyQuery, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } fn post_apply_query(&self, ctx: &mut ObserverContext<'_>, _: &Cmd) { - self.called.fetch_add(6, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PostApplyQuery, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } @@ -740,13 +768,15 @@ mod tests { _: u64, _: u64, ) -> bool { - self.called.fetch_add(15, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PreExecQuery, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } fn on_empty_cmd(&self, ctx: &mut ObserverContext<'_>, _index: u64, _term: u64) { - self.called.fetch_add(14, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::OnEmptyCmd, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } @@ -758,7 +788,8 @@ mod tests { _: &RegionState, _: &mut ApplyCtxInfo<'_>, ) -> bool { - self.called.fetch_add(17, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::PostExecQuery, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } @@ -766,13 +797,15 @@ mod tests { impl PdTaskObserver for TestCoprocessor { fn on_compute_engine_size(&self, _: &mut Option) { - self.called.fetch_add(19, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::OnComputeEngineSize, Ordering::SeqCst); } } impl RoleObserver for TestCoprocessor { fn on_role_change(&self, ctx: &mut ObserverContext<'_>, _: &RoleChange) { - self.called.fetch_add(7, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::OnRoleChange, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } } @@ -784,7 +817,8 @@ mod tests { _: RegionChangeEvent, _: StateRole, ) { - self.called.fetch_add(8, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::OnRegionChanged, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } } @@ -796,12 +830,14 @@ mod tests { _: CfName, _: &[(Vec, Vec)], ) { - self.called.fetch_add(9, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::ApplyPlainKvs, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } fn apply_sst(&self, ctx: &mut ObserverContext<'_>, _: CfName, _: &str) { - self.called.fetch_add(10, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::ApplySst, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } } @@ -813,7 +849,8 @@ mod tests { _: &mut Vec, _: &PanicEngine, ) { - self.called.fetch_add(13, Ordering::SeqCst); + self.called + .fetch_add(ObserverIndex::OnFlushAppliedCmdBatch, Ordering::SeqCst); } fn on_applied_current_term(&self, _: StateRole, _: &Region) {} } @@ -885,6 +922,7 @@ mod tests { host.post_apply_sst_from_snapshot(®ion, "default", ""); assert_all!([&ob.called], &[55]); + let mut index = 55; let observe_info = CmdObserveInfo::from_handle( ObserveHandle::new(), ObserveHandle::new(), @@ -893,26 +931,31 @@ mod tests { let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, Cmd::default()); host.on_flush_applied_cmd_batch(cb.level, vec![cb], &PanicEngine); - // `post_apply` + `on_flush_applied_cmd_batch` => 13 + 6 = 19 + index += ObserverIndex::PostApplyQuery; + index += ObserverIndex::OnFlushAppliedCmdBatch; assert_all!([&ob.called], &[74]); let mut empty_req = RaftCmdRequest::default(); empty_req.set_requests(vec![Request::default()].into()); host.on_empty_cmd(®ion, 0, 0); - assert_all!([&ob.called], &[88]); // 14 + index += ObserverIndex::OnEmptyCmd; + assert_all!([&ob.called], &[index]); let mut query_req = RaftCmdRequest::default(); query_req.set_requests(vec![Request::default()].into()); host.pre_exec(®ion, &query_req, 0, 0); - assert_all!([&ob.called], &[103]); // 15 + index += ObserverIndex::PreExecQuery; + assert_all!([&ob.called], &[103]); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); host.pre_exec(®ion, &admin_req, 0, 0); - assert_all!([&ob.called], &[119]); // 16 + index += ObserverIndex::PreExecAdmin; + assert_all!([&ob.called], &[119]); host.on_compute_engine_size(); - assert_all!([&ob.called], &[138]); // 19 + index += ObserverIndex::OnComputeEngineSize; + assert_all!([&ob.called], &[138]); let mut pending_handle_ssts = None; let mut delete_ssts = vec![]; @@ -926,7 +969,8 @@ mod tests { let region_state = RegionState::default(); let cmd = Cmd::default(); host.post_exec(®ion, &cmd, &apply_state, ®ion_state, &mut info); - assert_all!([&ob.called], &[155]); // 17 + index += ObserverIndex::PostExecQuery; + assert_all!([&ob.called], &[155]); } #[test] From 4d8633d069b76448bae3bf37b1eac0d7a7bca475 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 13:08:40 +0800 Subject: [PATCH 22/27] f Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 94fd25c395e..37756b233fb 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -911,18 +911,22 @@ mod tests { host.post_apply(®ion, &Cmd::new(0, 0, query_req, query_resp)); assert_all!([&ob.called], &[21]); + let mut index = 21; host.on_role_change(®ion, RoleChange::new(StateRole::Leader)); - assert_all!([&ob.called], &[28]); + index += ObserverIndex::OnRoleChange; + assert_all!([&ob.called], &[index]); host.on_region_changed(®ion, RegionChangeEvent::Create, StateRole::Follower); - assert_all!([&ob.called], &[36]); + index += ObserverIndex::OnRegionChanged; + assert_all!([&ob.called], &[index]); host.post_apply_plain_kvs_from_snapshot(®ion, "default", &[]); - assert_all!([&ob.called], &[45]); + index += ObserverIndex::ApplyPlainKvs; + assert_all!([&ob.called], &[index]); host.post_apply_sst_from_snapshot(®ion, "default", ""); - assert_all!([&ob.called], &[55]); + index += ObserverIndex::ApplySst; + assert_all!([&ob.called], &[index]); - let mut index = 55; let observe_info = CmdObserveInfo::from_handle( ObserveHandle::new(), ObserveHandle::new(), @@ -933,7 +937,7 @@ mod tests { host.on_flush_applied_cmd_batch(cb.level, vec![cb], &PanicEngine); index += ObserverIndex::PostApplyQuery; index += ObserverIndex::OnFlushAppliedCmdBatch; - assert_all!([&ob.called], &[74]); + assert_all!([&ob.called], &[index]); let mut empty_req = RaftCmdRequest::default(); empty_req.set_requests(vec![Request::default()].into()); @@ -945,17 +949,17 @@ mod tests { query_req.set_requests(vec![Request::default()].into()); host.pre_exec(®ion, &query_req, 0, 0); index += ObserverIndex::PreExecQuery; - assert_all!([&ob.called], &[103]); + assert_all!([&ob.called], &[index]); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); host.pre_exec(®ion, &admin_req, 0, 0); index += ObserverIndex::PreExecAdmin; - assert_all!([&ob.called], &[119]); + assert_all!([&ob.called], &[index]); host.on_compute_engine_size(); index += ObserverIndex::OnComputeEngineSize; - assert_all!([&ob.called], &[138]); + assert_all!([&ob.called], &[index]); let mut pending_handle_ssts = None; let mut delete_ssts = vec![]; @@ -970,7 +974,7 @@ mod tests { let cmd = Cmd::default(); host.post_exec(®ion, &cmd, &apply_state, ®ion_state, &mut info); index += ObserverIndex::PostExecQuery; - assert_all!([&ob.called], &[155]); + assert_all!([&ob.called], &[index]); } #[test] From b846e919ac3bd0b26ed2c2247467c00301e39513 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 13:11:30 +0800 Subject: [PATCH 23/27] f Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 37756b233fb..0c2e67e4025 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -889,29 +889,36 @@ mod tests { .register_region_change_observer(1, BoxRegionChangeObserver::new(ob.clone())); host.registry .register_cmd_observer(1, BoxCmdObserver::new(ob.clone())); + + let mut index = 0; let region = Region::default(); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); host.pre_propose(®ion, &mut admin_req).unwrap(); - assert_all!([&ob.called], &[1]); + index += ObserverIndex::PreProposeAdmin; + assert_all!([&ob.called], &[index]); host.pre_apply(®ion, &admin_req); - assert_all!([&ob.called], &[3]); + index += ObserverIndex::PreApplyAdmin; + assert_all!([&ob.called], &[index]); let mut admin_resp = RaftCmdResponse::default(); admin_resp.set_admin_response(AdminResponse::default()); host.post_apply(®ion, &Cmd::new(0, 0, admin_req, admin_resp)); - assert_all!([&ob.called], &[6]); + index += ObserverIndex::PostApplyAdmin; + assert_all!([&ob.called], &[index]); let mut query_req = RaftCmdRequest::default(); query_req.set_requests(vec![Request::default()].into()); host.pre_propose(®ion, &mut query_req).unwrap(); - assert_all!([&ob.called], &[10]); + index += ObserverIndex::PreProposeQuery; + assert_all!([&ob.called], &[index]); + index += ObserverIndex::PreApplyQuery; host.pre_apply(®ion, &query_req); - assert_all!([&ob.called], &[15]); + assert_all!([&ob.called], &[index]); let query_resp = RaftCmdResponse::default(); host.post_apply(®ion, &Cmd::new(0, 0, query_req, query_resp)); - assert_all!([&ob.called], &[21]); + index += ObserverIndex::PostApplyQuery; + assert_all!([&ob.called], &[index]); - let mut index = 21; host.on_role_change(®ion, RoleChange::new(StateRole::Leader)); index += ObserverIndex::OnRoleChange; assert_all!([&ob.called], &[index]); From fdea78ca23aad0553bc95857549d60f7e20dbabc Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 13:49:56 +0800 Subject: [PATCH 24/27] f Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 78 ++++++++++--------- components/raftstore/src/store/fsm/apply.rs | 2 +- 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 0c2e67e4025..3b051a7f412 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -686,7 +686,7 @@ mod tests { _: &mut AdminRequest, ) -> Result<()> { self.called - .fetch_add(ObserverIndex::PreApplyAdmin, Ordering::SeqCst); + .fetch_add(ObserverIndex::PreApplyAdmin as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); if self.return_err.load(Ordering::SeqCst) { return Err(box_err!("error")); @@ -696,13 +696,13 @@ mod tests { fn pre_apply_admin(&self, ctx: &mut ObserverContext<'_>, _: &AdminRequest) { self.called - .fetch_add(ObserverIndex::PreApplyAdmin, Ordering::SeqCst); + .fetch_add(ObserverIndex::PreApplyAdmin as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } fn post_apply_admin(&self, ctx: &mut ObserverContext<'_>, _: &AdminResponse) { self.called - .fetch_add(ObserverIndex::PostApplyAdmin, Ordering::SeqCst); + .fetch_add(ObserverIndex::PostApplyAdmin as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } @@ -714,7 +714,7 @@ mod tests { _: u64, ) -> bool { self.called - .fetch_add(ObserverIndex::PreExecAdmin, Ordering::SeqCst); + .fetch_add(ObserverIndex::PreExecAdmin as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } @@ -728,7 +728,7 @@ mod tests { _: &mut ApplyCtxInfo<'_>, ) -> bool { self.called - .fetch_add(ObserverIndex::PostExecAdmin, Ordering::SeqCst); + .fetch_add(ObserverIndex::PostExecAdmin as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } @@ -741,7 +741,7 @@ mod tests { _: &mut Vec, ) -> Result<()> { self.called - .fetch_add(ObserverIndex::PreProposeQuery, Ordering::SeqCst); + .fetch_add(ObserverIndex::PreProposeQuery as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); if self.return_err.load(Ordering::SeqCst) { return Err(box_err!("error")); @@ -751,13 +751,13 @@ mod tests { fn pre_apply_query(&self, ctx: &mut ObserverContext<'_>, _: &[Request]) { self.called - .fetch_add(ObserverIndex::PreApplyQuery, Ordering::SeqCst); + .fetch_add(ObserverIndex::PreApplyQuery as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } fn post_apply_query(&self, ctx: &mut ObserverContext<'_>, _: &Cmd) { self.called - .fetch_add(ObserverIndex::PostApplyQuery, Ordering::SeqCst); + .fetch_add(ObserverIndex::PostApplyQuery as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } @@ -769,14 +769,14 @@ mod tests { _: u64, ) -> bool { self.called - .fetch_add(ObserverIndex::PreExecQuery, Ordering::SeqCst); + .fetch_add(ObserverIndex::PreExecQuery as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } fn on_empty_cmd(&self, ctx: &mut ObserverContext<'_>, _index: u64, _term: u64) { self.called - .fetch_add(ObserverIndex::OnEmptyCmd, Ordering::SeqCst); + .fetch_add(ObserverIndex::OnEmptyCmd as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } @@ -789,7 +789,7 @@ mod tests { _: &mut ApplyCtxInfo<'_>, ) -> bool { self.called - .fetch_add(ObserverIndex::PostExecQuery, Ordering::SeqCst); + .fetch_add(ObserverIndex::PostExecQuery as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); false } @@ -797,15 +797,17 @@ mod tests { impl PdTaskObserver for TestCoprocessor { fn on_compute_engine_size(&self, _: &mut Option) { - self.called - .fetch_add(ObserverIndex::OnComputeEngineSize, Ordering::SeqCst); + self.called.fetch_add( + ObserverIndex::OnComputeEngineSize as usize, + Ordering::SeqCst, + ); } } impl RoleObserver for TestCoprocessor { fn on_role_change(&self, ctx: &mut ObserverContext<'_>, _: &RoleChange) { self.called - .fetch_add(ObserverIndex::OnRoleChange, Ordering::SeqCst); + .fetch_add(ObserverIndex::OnRoleChange as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } } @@ -818,7 +820,7 @@ mod tests { _: StateRole, ) { self.called - .fetch_add(ObserverIndex::OnRegionChanged, Ordering::SeqCst); + .fetch_add(ObserverIndex::OnRegionChanged as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } } @@ -831,13 +833,13 @@ mod tests { _: &[(Vec, Vec)], ) { self.called - .fetch_add(ObserverIndex::ApplyPlainKvs, Ordering::SeqCst); + .fetch_add(ObserverIndex::ApplyPlainKvs as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } fn apply_sst(&self, ctx: &mut ObserverContext<'_>, _: CfName, _: &str) { self.called - .fetch_add(ObserverIndex::ApplySst, Ordering::SeqCst); + .fetch_add(ObserverIndex::ApplySst as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); } } @@ -849,8 +851,10 @@ mod tests { _: &mut Vec, _: &PanicEngine, ) { - self.called - .fetch_add(ObserverIndex::OnFlushAppliedCmdBatch, Ordering::SeqCst); + self.called.fetch_add( + ObserverIndex::OnFlushAppliedCmdBatch as usize, + Ordering::SeqCst, + ); } fn on_applied_current_term(&self, _: StateRole, _: &Region) {} } @@ -890,48 +894,48 @@ mod tests { host.registry .register_cmd_observer(1, BoxCmdObserver::new(ob.clone())); - let mut index = 0; + let mut index: u64= 0; let region = Region::default(); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); host.pre_propose(®ion, &mut admin_req).unwrap(); - index += ObserverIndex::PreProposeAdmin; + index += ObserverIndex::PreProposeAdmin as u64; assert_all!([&ob.called], &[index]); host.pre_apply(®ion, &admin_req); - index += ObserverIndex::PreApplyAdmin; + index += ObserverIndex::PreApplyAdmin as u64; assert_all!([&ob.called], &[index]); let mut admin_resp = RaftCmdResponse::default(); admin_resp.set_admin_response(AdminResponse::default()); host.post_apply(®ion, &Cmd::new(0, 0, admin_req, admin_resp)); - index += ObserverIndex::PostApplyAdmin; + index += ObserverIndex::PostApplyAdmin as u64; assert_all!([&ob.called], &[index]); let mut query_req = RaftCmdRequest::default(); query_req.set_requests(vec![Request::default()].into()); host.pre_propose(®ion, &mut query_req).unwrap(); - index += ObserverIndex::PreProposeQuery; + index += ObserverIndex::PreProposeQuery as u64; assert_all!([&ob.called], &[index]); - index += ObserverIndex::PreApplyQuery; + index += ObserverIndex::PreApplyQuery as u64; host.pre_apply(®ion, &query_req); assert_all!([&ob.called], &[index]); let query_resp = RaftCmdResponse::default(); host.post_apply(®ion, &Cmd::new(0, 0, query_req, query_resp)); - index += ObserverIndex::PostApplyQuery; + index += ObserverIndex::PostApplyQuery as u64; assert_all!([&ob.called], &[index]); host.on_role_change(®ion, RoleChange::new(StateRole::Leader)); - index += ObserverIndex::OnRoleChange; + index += ObserverIndex::OnRoleChange as u64; assert_all!([&ob.called], &[index]); host.on_region_changed(®ion, RegionChangeEvent::Create, StateRole::Follower); - index += ObserverIndex::OnRegionChanged; + index += ObserverIndex::OnRegionChanged as u64; assert_all!([&ob.called], &[index]); host.post_apply_plain_kvs_from_snapshot(®ion, "default", &[]); - index += ObserverIndex::ApplyPlainKvs; + index += ObserverIndex::ApplyPlainKvs as u64; assert_all!([&ob.called], &[index]); host.post_apply_sst_from_snapshot(®ion, "default", ""); - index += ObserverIndex::ApplySst; + index += ObserverIndex::ApplySst as u64; assert_all!([&ob.called], &[index]); let observe_info = CmdObserveInfo::from_handle( @@ -942,30 +946,30 @@ mod tests { let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, Cmd::default()); host.on_flush_applied_cmd_batch(cb.level, vec![cb], &PanicEngine); - index += ObserverIndex::PostApplyQuery; - index += ObserverIndex::OnFlushAppliedCmdBatch; + index += ObserverIndex::PostApplyQuery as u64; + index += ObserverIndex::OnFlushAppliedCmdBatch as u64; assert_all!([&ob.called], &[index]); let mut empty_req = RaftCmdRequest::default(); empty_req.set_requests(vec![Request::default()].into()); host.on_empty_cmd(®ion, 0, 0); - index += ObserverIndex::OnEmptyCmd; + index += ObserverIndex::OnEmptyCmd as u64; assert_all!([&ob.called], &[index]); let mut query_req = RaftCmdRequest::default(); query_req.set_requests(vec![Request::default()].into()); host.pre_exec(®ion, &query_req, 0, 0); - index += ObserverIndex::PreExecQuery; + index += ObserverIndex::PreExecQuery as u64; assert_all!([&ob.called], &[index]); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); host.pre_exec(®ion, &admin_req, 0, 0); - index += ObserverIndex::PreExecAdmin; + index += ObserverIndex::PreExecAdmin as u64; assert_all!([&ob.called], &[index]); host.on_compute_engine_size(); - index += ObserverIndex::OnComputeEngineSize; + index += ObserverIndex::OnComputeEngineSize as u64; assert_all!([&ob.called], &[index]); let mut pending_handle_ssts = None; @@ -980,7 +984,7 @@ mod tests { let region_state = RegionState::default(); let cmd = Cmd::default(); host.post_exec(®ion, &cmd, &apply_state, ®ion_state, &mut info); - index += ObserverIndex::PostExecQuery; + index += ObserverIndex::PostExecQuery as u64; assert_all!([&ob.called], &[index]); } diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index 370146c6b9e..3f841e699bb 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1368,7 +1368,7 @@ where } RAFT_APPLYING_SST_GAUGE .with_label_values(&["pending_delete"]) - .set(ctx.pending_delete_ssts.len()); + .set(ctx.pending_delete_ssts.len() as i64); } } From 1f48a1a8f11f7cdb6eda37caa819fd6ce817d224 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 14:03:52 +0800 Subject: [PATCH 25/27] f Signed-off-by: CalvinNeo --- .../raftstore/src/coprocessor/dispatcher.rs | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 3b051a7f412..b36e99e6abd 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -894,48 +894,48 @@ mod tests { host.registry .register_cmd_observer(1, BoxCmdObserver::new(ob.clone())); - let mut index: u64= 0; + let mut index: usize= 0; let region = Region::default(); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); host.pre_propose(®ion, &mut admin_req).unwrap(); - index += ObserverIndex::PreProposeAdmin as u64; + index += ObserverIndex::PreProposeAdmin as usize; assert_all!([&ob.called], &[index]); host.pre_apply(®ion, &admin_req); - index += ObserverIndex::PreApplyAdmin as u64; + index += ObserverIndex::PreApplyAdmin as usize; assert_all!([&ob.called], &[index]); let mut admin_resp = RaftCmdResponse::default(); admin_resp.set_admin_response(AdminResponse::default()); host.post_apply(®ion, &Cmd::new(0, 0, admin_req, admin_resp)); - index += ObserverIndex::PostApplyAdmin as u64; + index += ObserverIndex::PostApplyAdmin as usize; assert_all!([&ob.called], &[index]); let mut query_req = RaftCmdRequest::default(); query_req.set_requests(vec![Request::default()].into()); host.pre_propose(®ion, &mut query_req).unwrap(); - index += ObserverIndex::PreProposeQuery as u64; + index += ObserverIndex::PreProposeQuery as usize; assert_all!([&ob.called], &[index]); - index += ObserverIndex::PreApplyQuery as u64; + index += ObserverIndex::PreApplyQuery as usize; host.pre_apply(®ion, &query_req); assert_all!([&ob.called], &[index]); let query_resp = RaftCmdResponse::default(); host.post_apply(®ion, &Cmd::new(0, 0, query_req, query_resp)); - index += ObserverIndex::PostApplyQuery as u64; + index += ObserverIndex::PostApplyQuery as usize; assert_all!([&ob.called], &[index]); host.on_role_change(®ion, RoleChange::new(StateRole::Leader)); - index += ObserverIndex::OnRoleChange as u64; + index += ObserverIndex::OnRoleChange as usize; assert_all!([&ob.called], &[index]); host.on_region_changed(®ion, RegionChangeEvent::Create, StateRole::Follower); - index += ObserverIndex::OnRegionChanged as u64; + index += ObserverIndex::OnRegionChanged as usize; assert_all!([&ob.called], &[index]); host.post_apply_plain_kvs_from_snapshot(®ion, "default", &[]); - index += ObserverIndex::ApplyPlainKvs as u64; + index += ObserverIndex::ApplyPlainKvs as usize; assert_all!([&ob.called], &[index]); host.post_apply_sst_from_snapshot(®ion, "default", ""); - index += ObserverIndex::ApplySst as u64; + index += ObserverIndex::ApplySst as usize; assert_all!([&ob.called], &[index]); let observe_info = CmdObserveInfo::from_handle( @@ -946,30 +946,30 @@ mod tests { let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, Cmd::default()); host.on_flush_applied_cmd_batch(cb.level, vec![cb], &PanicEngine); - index += ObserverIndex::PostApplyQuery as u64; - index += ObserverIndex::OnFlushAppliedCmdBatch as u64; + index += ObserverIndex::PostApplyQuery as usize; + index += ObserverIndex::OnFlushAppliedCmdBatch as usize; assert_all!([&ob.called], &[index]); let mut empty_req = RaftCmdRequest::default(); empty_req.set_requests(vec![Request::default()].into()); host.on_empty_cmd(®ion, 0, 0); - index += ObserverIndex::OnEmptyCmd as u64; + index += ObserverIndex::OnEmptyCmd as usize; assert_all!([&ob.called], &[index]); let mut query_req = RaftCmdRequest::default(); query_req.set_requests(vec![Request::default()].into()); host.pre_exec(®ion, &query_req, 0, 0); - index += ObserverIndex::PreExecQuery as u64; + index += ObserverIndex::PreExecQuery as usize; assert_all!([&ob.called], &[index]); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); host.pre_exec(®ion, &admin_req, 0, 0); - index += ObserverIndex::PreExecAdmin as u64; + index += ObserverIndex::PreExecAdmin as usize; assert_all!([&ob.called], &[index]); host.on_compute_engine_size(); - index += ObserverIndex::OnComputeEngineSize as u64; + index += ObserverIndex::OnComputeEngineSize as usize; assert_all!([&ob.called], &[index]); let mut pending_handle_ssts = None; @@ -984,7 +984,7 @@ mod tests { let region_state = RegionState::default(); let cmd = Cmd::default(); host.post_exec(®ion, &cmd, &apply_state, ®ion_state, &mut info); - index += ObserverIndex::PostExecQuery as u64; + index += ObserverIndex::PostExecQuery as usize; assert_all!([&ob.called], &[index]); } From cb3b07a243251c8b32b78ac76beee3df47d8c08c Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 14:18:34 +0800 Subject: [PATCH 26/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/coprocessor/dispatcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index b36e99e6abd..e667dd020da 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -894,7 +894,7 @@ mod tests { host.registry .register_cmd_observer(1, BoxCmdObserver::new(ob.clone())); - let mut index: usize= 0; + let mut index: usize = 0; let region = Region::default(); let mut admin_req = RaftCmdRequest::default(); admin_req.set_admin_request(AdminRequest::default()); From 08ec6127814c9fe344c6516222e161557c9225b6 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 16 Aug 2022 14:38:06 +0800 Subject: [PATCH 27/27] f Signed-off-by: CalvinNeo --- components/raftstore/src/coprocessor/dispatcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index e667dd020da..7eea973997b 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -686,7 +686,7 @@ mod tests { _: &mut AdminRequest, ) -> Result<()> { self.called - .fetch_add(ObserverIndex::PreApplyAdmin as usize, Ordering::SeqCst); + .fetch_add(ObserverIndex::PreProposeAdmin as usize, Ordering::SeqCst); ctx.bypass = self.bypass.load(Ordering::SeqCst); if self.return_err.load(Ordering::SeqCst) { return Err(box_err!("error"));