Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore: allow exec observers delay deletion of applied ssts #13061

Merged
merged 34 commits into from Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a8aa437
just change post_exec
CalvinNeo Jul 19, 2022
d539ca9
add tests with comments
CalvinNeo Jul 20, 2022
78d02ec
remove comments
CalvinNeo Jul 20, 2022
b1d70ea
finish tests
CalvinNeo Jul 20, 2022
e7267b9
f
CalvinNeo Jul 20, 2022
bd7c37c
f
CalvinNeo Jul 20, 2022
b414ab7
f
CalvinNeo Jul 20, 2022
843b1d2
f
CalvinNeo Jul 20, 2022
0c9f89b
Merge branch 'master' into merge-ingest-sst-delete
CalvinNeo Jul 21, 2022
201b166
f
CalvinNeo Aug 1, 2022
d304625
Merge branch 'merge-ingest-sst-delete' of github.com:CalvinNeo/tidb-e…
CalvinNeo Aug 1, 2022
7b4285c
f
CalvinNeo Aug 1, 2022
a61fe06
f
CalvinNeo Aug 1, 2022
3a9d36a
f
CalvinNeo Aug 4, 2022
4bc4d38
f
CalvinNeo Aug 5, 2022
1d070ac
f
CalvinNeo Aug 5, 2022
d65a980
Merge branch 'master' into merge-ingest-sst-delete
CalvinNeo Aug 9, 2022
28113c5
add test
CalvinNeo Aug 9, 2022
9df3445
f
CalvinNeo Aug 9, 2022
5828b38
remove debug log
CalvinNeo Aug 12, 2022
a30af8c
rename pending_clean_ssts to pending_delete_ssts
CalvinNeo Aug 12, 2022
f49b30b
fmt
CalvinNeo Aug 12, 2022
505f6a1
c
CalvinNeo Aug 15, 2022
31e8978
fmt
CalvinNeo Aug 15, 2022
d8b7f56
Merge branch 'master' into merge-ingest-sst-delete
CalvinNeo Aug 15, 2022
0aef69e
add metric
CalvinNeo Aug 16, 2022
8a24650
re test
CalvinNeo Aug 16, 2022
4d8633d
f
CalvinNeo Aug 16, 2022
b846e91
f
CalvinNeo Aug 16, 2022
fdea78c
f
CalvinNeo Aug 16, 2022
1f48a1a
f
CalvinNeo Aug 16, 2022
cb3b07a
f
CalvinNeo Aug 16, 2022
08ec612
f
CalvinNeo Aug 16, 2022
e70a175
Merge branch 'master' into merge-ingest-sst-delete
ti-chi-bot Aug 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 43 additions & 2 deletions components/raftstore/src/coprocessor/dispatcher.rs
Expand Up @@ -459,20 +459,21 @@ impl<E: KvEngine> CoprocessorHost<E> {
cmd: &Cmd,
apply_state: &RaftApplyState,
region_state: &RegionState,
apply_ctx: &mut ApplyCtxInfo<'_>,
) -> bool {
let mut ctx = ObserverContext::new(region);
if !cmd.response.has_admin_response() {
for observer in &self.registry.query_observers {
let observer = observer.observer.inner();
if observer.post_exec_query(&mut ctx, cmd, apply_state, region_state) {
if observer.post_exec_query(&mut ctx, cmd, apply_state, region_state, apply_ctx) {
return true;
}
}
false
} else {
for observer in &self.registry.admin_observers {
let observer = observer.observer.inner();
if observer.post_exec_admin(&mut ctx, cmd, apply_state, region_state) {
if observer.post_exec_admin(&mut ctx, cmd, apply_state, region_state, apply_ctx) {
return true;
}
}
Expand Down Expand Up @@ -693,6 +694,19 @@ mod tests {
ctx.bypass = self.bypass.load(Ordering::SeqCst);
false
}

fn post_exec_admin(
&self,
ctx: &mut ObserverContext<'_>,
_: &Cmd,
_: &RaftApplyState,
_: &RegionState,
_: &mut ApplyCtxInfo<'_>,
) -> bool {
self.called.fetch_add(18, Ordering::SeqCst);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the meaning of 18 and all other numbers below

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test, we assign unique index for each observer functions so that we can test all of the observer functions we want to be called are called. All numbers below 18 are used for other functions, so we use 18 for post_exec.

ctx.bypass = self.bypass.load(Ordering::SeqCst);
false
}
}

impl QueryObserver for TestCoprocessor {
Expand Down Expand Up @@ -735,6 +749,19 @@ mod tests {
self.called.fetch_add(14, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
}

fn post_exec_query(
&self,
ctx: &mut ObserverContext<'_>,
_: &Cmd,
_: &RaftApplyState,
_: &RegionState,
_: &mut ApplyCtxInfo<'_>,
) -> bool {
self.called.fetch_add(17, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
false
}
}

impl PdTaskObserver for TestCoprocessor {
Expand Down Expand Up @@ -886,6 +913,20 @@ mod tests {

host.on_compute_engine_size();
assert_all!([&ob.called], &[138]); // 19

let mut pending_handle_ssts = None;
let mut delete_ssts = vec![];
let mut pending_delete_ssts = vec![];
let mut info = ApplyCtxInfo {
pending_handle_ssts: &mut pending_handle_ssts,
pending_delete_ssts: &mut pending_delete_ssts,
delete_ssts: &mut delete_ssts,
};
let apply_state = RaftApplyState::default();
let region_state = RegionState::default();
let cmd = Cmd::default();
host.post_exec(&region, &cmd, &apply_state, &region_state, &mut info);
assert_all!([&ob.called], &[155]); // 17
}

#[test]
Expand Down
13 changes: 12 additions & 1 deletion components/raftstore/src/coprocessor/mod.rs
Expand Up @@ -9,7 +9,7 @@ use std::{
vec::IntoIter,
};

use engine_traits::CfName;
use engine_traits::{CfName, SstMetaInfo};
use kvproto::{
metapb::Region,
pdpb::CheckPolicy,
Expand Down Expand Up @@ -75,12 +75,21 @@ impl<'a> ObserverContext<'a> {
}
}

/// Context of a region provided for observers.
#[derive(Default, Clone)]
pub struct RegionState {
pub peer_id: u64,
pub pending_remove: bool,
pub modified_region: Option<Region>,
}

/// Context for exec observers of mutation to be applied to ApplyContext.
pub struct ApplyCtxInfo<'a> {
pub pending_handle_ssts: &'a mut Option<Vec<SstMetaInfo>>,
pub delete_ssts: &'a mut Vec<SstMetaInfo>,
pub pending_delete_ssts: &'a mut Vec<SstMetaInfo>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where's the code that consumes pending_delete_ssts and delete the SST files?
How can we ensure that these SST files are finally deleted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In TiKV's view:
post_exec will consumes pending_delete_ssts, and it will return true to suggest a immediate write to db.
In TiFlash's view:
When all data referred in the ssts is write into TiFlash's storage, TiFlash will consume pending_delete_ssts and suggest a write.
If TiFlash&Proxy restart before persistence, the recovered applied_index is before IngestSst command. So we can replay this command, and thus ensuring we will not lost track of sst files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In TiKV's view: post_exec will consumes pending_delete_ssts, and it will return true to suggest a immediate write to db. In TiFlash's view: When all data referred in the ssts is write into TiFlash's storage, TiFlash will consume pending_delete_ssts and suggest a write. If TiFlash&Proxy restart before persistence, the recovered applied_index is before IngestSst command. So we can replay this command, and thus ensuring we will not lost track of sst files.

IC. Then is there any metrics for pending_delete_ssts? We need to have a way to figure this out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a tikv_raft_applying_sst added
image

}

pub trait AdminObserver: Coprocessor {
/// Hook to call before proposing admin request.
fn pre_propose_admin(&self, _: &mut ObserverContext<'_>, _: &mut AdminRequest) -> Result<()> {
Expand Down Expand Up @@ -115,6 +124,7 @@ pub trait AdminObserver: Coprocessor {
_: &Cmd,
_: &RaftApplyState,
_: &RegionState,
_: &mut ApplyCtxInfo<'_>,
) -> bool {
false
}
Expand Down Expand Up @@ -154,6 +164,7 @@ pub trait QueryObserver: Coprocessor {
_: &Cmd,
_: &RaftApplyState,
_: &RegionState,
_: &mut ApplyCtxInfo<'_>,
) -> bool {
false
}
Expand Down
143 changes: 127 additions & 16 deletions components/raftstore/src/store/fsm/apply.rs
Expand Up @@ -73,7 +73,8 @@ use super::metrics::*;
use crate::{
bytes_capacity,
coprocessor::{
Cmd, CmdBatch, CmdObserveInfo, CoprocessorHost, ObserveHandle, ObserveLevel, RegionState,
ApplyCtxInfo, Cmd, CmdBatch, CmdObserveInfo, CoprocessorHost, ObserveHandle, ObserveLevel,
RegionState,
},
store::{
cmd_resp,
Expand Down Expand Up @@ -408,6 +409,11 @@ where
/// never apply again at first, then we can delete the ssts files.
delete_ssts: Vec<SstMetaInfo>,

/// A self-defined engine may be slow to ingest ssts.
/// It may move some elements of `delete_ssts` into `pending_delete_ssts` to
/// delay deletion. Otherwise we may lost data.
pending_delete_ssts: Vec<SstMetaInfo>,

/// The priority of this Handler.
priority: Priority,
/// Whether to yield high-latency operation to low-priority handler.
Expand Down Expand Up @@ -465,6 +471,7 @@ where
perf_context: engine.get_perf_context(cfg.perf_level, PerfContextKind::RaftstoreApply),
yield_duration: cfg.apply_yield_duration.0,
delete_ssts: vec![],
pending_delete_ssts: vec![],
store_id,
pending_create_peers,
priority,
Expand Down Expand Up @@ -1244,7 +1251,6 @@ where
.applied_batch
.push(cmd_cb, cmd, &self.observe_info, self.region_id());
if should_write {
debug!("persist data and apply state"; "region_id" => self.region_id(), "peer_id" => self.id(), "state" => ?self.apply_state);
apply_ctx.commit(self);
}
exec_result
Expand Down Expand Up @@ -1323,25 +1329,45 @@ where
self.applied_term = term;

let cmd = Cmd::new(index, term, req.clone(), resp.clone());
let (modified_region, mut pending_handle_ssts) = match exec_result {
ApplyResult::Res(ref e) => match e {
ExecResult::SplitRegion { ref derived, .. } => (Some(derived.clone()), None),
ExecResult::PrepareMerge { ref region, .. } => (Some(region.clone()), None),
ExecResult::CommitMerge { ref region, .. } => (Some(region.clone()), None),
ExecResult::RollbackMerge { ref region, .. } => (Some(region.clone()), None),
ExecResult::IngestSst { ref ssts } => (None, Some(ssts.clone())),
_ => (None, None),
},
_ => (None, None),
};
let mut apply_ctx_info = ApplyCtxInfo {
pending_handle_ssts: &mut pending_handle_ssts,
delete_ssts: &mut ctx.delete_ssts,
pending_delete_ssts: &mut ctx.pending_delete_ssts,
};
let should_write = ctx.host.post_exec(
&self.region,
&cmd,
&self.apply_state,
&RegionState {
peer_id: self.id(),
pending_remove: self.pending_remove,
modified_region: match exec_result {
ApplyResult::Res(ref e) => match e {
ExecResult::SplitRegion { ref derived, .. } => Some(derived.clone()),
ExecResult::PrepareMerge { ref region, .. } => Some(region.clone()),
ExecResult::CommitMerge { ref region, .. } => Some(region.clone()),
ExecResult::RollbackMerge { ref region, .. } => Some(region.clone()),
_ => None,
},
_ => None,
},
modified_region,
},
&mut apply_ctx_info,
);
match pending_handle_ssts {
None => (),
Some(mut v) => {
if !v.is_empty() {
// All elements in `pending_handle_ssts` should be moved into either
// `delete_ssts` or `pending_delete_ssts`, once handled by by any of the
// `post_exec` observers. So a non-empty
// `pending_handle_ssts` here indicates no `post_exec` handled.
ctx.delete_ssts.append(&mut v);
}
}
}

if let ApplyResult::Res(ref exec_result) = exec_result {
match *exec_result {
Expand Down Expand Up @@ -1564,7 +1590,6 @@ where
};
dont_delete_ingested_sst_fp();
}
ctx.delete_ssts.append(&mut ssts.clone());
ApplyResult::Res(ExecResult::IngestSst { ssts })
} else {
ApplyResult::None
Expand Down Expand Up @@ -4965,6 +4990,10 @@ mod tests {
cmd_sink: Option<Arc<Mutex<Sender<CmdBatch>>>>,
filter_compact_log: Arc<AtomicBool>,
filter_consistency_check: Arc<AtomicBool>,
delay_remove_ssts: Arc<AtomicBool>,
last_delete_sst_count: Arc<AtomicU64>,
last_pending_clean_sst_count: Arc<AtomicU64>,
last_pending_handle_sst_count: Arc<AtomicU64>,
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
}

impl Coprocessor for ApplyObserver {}
Expand All @@ -4977,6 +5006,43 @@ mod tests {
fn post_apply_query(&self, _: &mut ObserverContext<'_>, _: &Cmd) {
self.post_query_count.fetch_add(1, Ordering::SeqCst);
}

fn post_exec_query(
&self,
_: &mut ObserverContext<'_>,
_: &Cmd,
_: &RaftApplyState,
_: &RegionState,
apply_info: &mut ApplyCtxInfo<'_>,
) -> bool {
match apply_info.pending_handle_ssts {
Some(v) => {
// If it is a ingest sst
let mut ssts = std::mem::take(v);
assert_ne!(ssts.len(), 0);
if self.delay_remove_ssts.load(Ordering::SeqCst) {
apply_info.pending_delete_ssts.append(&mut ssts);
} else {
apply_info.delete_ssts.append(&mut ssts);
}
}
None => (),
}
self.last_delete_sst_count
.store(apply_info.delete_ssts.len() as u64, Ordering::SeqCst);
self.last_pending_clean_sst_count.store(
apply_info.pending_delete_ssts.len() as u64,
Ordering::SeqCst,
);
self.last_pending_handle_sst_count.store(
match apply_info.pending_handle_ssts {
Some(ref v) => v.len() as u64,
None => 0,
},
Ordering::SeqCst,
);
false
}
}

impl AdminObserver for ApplyObserver {
Expand All @@ -4986,6 +5052,7 @@ mod tests {
cmd: &Cmd,
_: &RaftApplyState,
region_state: &RegionState,
_: &mut ApplyCtxInfo<'_>,
) -> bool {
let request = cmd.request.get_admin_request();
match request.get_cmd_type() {
Expand Down Expand Up @@ -5662,11 +5729,13 @@ mod tests {
#[test]
fn test_exec_observer() {
let (_path, engine) = create_tmp_engine("test-exec-observer");
let (_import_dir, importer) = create_tmp_importer("test-exec-observer");
let (import_dir, importer) = create_tmp_importer("test-exec-observer");
let mut host = CoprocessorHost::<KvTestEngine>::default();
let obs = ApplyObserver::default();
host.registry
.register_admin_observer(1, BoxAdminObserver::new(obs.clone()));
host.registry
.register_query_observer(1, BoxQueryObserver::new(obs.clone()));

let (tx, rx) = mpsc::channel();
let (region_scheduler, _) = dummy_scheduler();
Expand All @@ -5680,7 +5749,7 @@ mod tests {
sender,
region_scheduler,
coprocessor_host: host,
importer,
importer: importer.clone(),
engine: engine.clone(),
router: router.clone(),
store_id: 1,
Expand Down Expand Up @@ -5781,7 +5850,7 @@ mod tests {
let apply_res = fetch_apply_res(&rx);
assert_eq!(apply_res.apply_state.get_applied_index(), index_id);
assert_eq!(apply_res.applied_term, 1);
let (_, r8) = if let ExecResult::SplitRegion {
let (r1, r8) = if let ExecResult::SplitRegion {
regions,
derived: _,
new_split_regions: _,
Expand Down Expand Up @@ -5812,6 +5881,48 @@ mod tests {
.unwrap_or_default();
assert_eq!(apply_res.apply_state, state);

// Phase 3: we test if we can delay deletion of some sst files.
let r1_epoch = r1.get_region_epoch();
index_id += 1;
let kvs: Vec<(&[u8], &[u8])> = vec![(b"k3", b"2")];
let sst_path = import_dir.path().join("test.sst");
let (mut meta, data) = gen_sst_file_with_kvs(&sst_path, &kvs);
meta.set_region_id(1);
meta.set_region_epoch(r1_epoch.clone());
let mut file = importer.create(&meta).unwrap();
file.append(&data).unwrap();
file.finish().unwrap();
let src = sst_path.clone();
let dst = file.get_import_path().save.to_str().unwrap();
std::fs::copy(src, dst).unwrap();
assert!(sst_path.as_path().exists());
let ingestsst = EntryBuilder::new(index_id, 1)
.ingest_sst(&meta)
.epoch(r1_epoch.get_conf_ver(), r1_epoch.get_version())
.build();

obs.delay_remove_ssts.store(true, Ordering::SeqCst);
router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![])));
fetch_apply_res(&rx);
let apply_res = fetch_apply_res(&rx);
assert_eq!(apply_res.exec_res.len(), 1);
assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0);
assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 0);
assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1);

index_id += 1;
let ingestsst = EntryBuilder::new(index_id, 1)
.ingest_sst(&meta)
.epoch(r1_epoch.get_conf_ver(), r1_epoch.get_version())
.build();
obs.delay_remove_ssts.store(false, Ordering::SeqCst);
router.schedule_task(1, Msg::apply(apply(peer_id, 1, 1, vec![ingestsst], vec![])));
let apply_res = fetch_apply_res(&rx);
assert_eq!(apply_res.exec_res.len(), 1);
assert_eq!(obs.last_pending_handle_sst_count.load(Ordering::SeqCst), 0);
assert_eq!(obs.last_delete_sst_count.load(Ordering::SeqCst), 1);
assert_eq!(obs.last_pending_clean_sst_count.load(Ordering::SeqCst), 1);

system.shutdown();
}

Expand Down
4 changes: 4 additions & 0 deletions components/sst_importer/src/import_file.rs
Expand Up @@ -167,6 +167,10 @@ impl ImportFile {
}
Ok(())
}

pub fn get_import_path(&self) -> &ImportPath {
&self.path
}
}

impl fmt::Debug for ImportFile {
Expand Down