Skip to content

Commit

Permalink
raftstore-v2: offload checkpoint during split (#14646)
Browse files Browse the repository at this point in the history
close #14711

offload checkpoint during split

Signed-off-by: SpadeA-Tang <u6748471@anu.edu.au>
  • Loading branch information
SpadeA-Tang committed May 9, 2023
1 parent 39de299 commit 5ce3a6b
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 77 deletions.
19 changes: 15 additions & 4 deletions components/raftstore-v2/src/batch/store.rs
Expand Up @@ -46,7 +46,7 @@ use tikv_util::{
sys::SysQuota,
time::{duration_to_sec, Instant as TiInstant},
timer::SteadyTimer,
worker::{LazyWorker, Scheduler, Worker},
worker::{Builder, LazyWorker, Scheduler, Worker},
yatp_pool::{DefaultTicker, FuturePool, YatpPoolBuilder},
Either,
};
Expand All @@ -57,7 +57,7 @@ use crate::{
operation::{SharedReadTablet, MERGE_IN_PROGRESS_PREFIX, MERGE_SOURCE_PREFIX, SPLIT_PREFIX},
raft::Storage,
router::{PeerMsg, PeerTick, StoreMsg},
worker::{pd, tablet},
worker::{checkpoint, pd, tablet},
Error, Result,
};

Expand Down Expand Up @@ -496,6 +496,7 @@ pub struct Schedulers<EK: KvEngine, ER: RaftEngine> {
pub read: Scheduler<ReadTask<EK>>,
pub pd: Scheduler<pd::Task>,
pub tablet: Scheduler<tablet::Task<EK>>,
pub checkpoint: Scheduler<checkpoint::Task>,
pub write: WriteSenders<EK, ER>,

// Following is not maintained by raftstore itself.
Expand All @@ -518,6 +519,7 @@ struct Workers<EK: KvEngine, ER: RaftEngine> {
async_read: Worker,
pd: LazyWorker<pd::Task>,
tablet: Worker,
checkpoint: Worker,
async_write: StoreWriters<EK, ER>,
purge: Option<Worker>,

Expand All @@ -527,10 +529,12 @@ struct Workers<EK: KvEngine, ER: RaftEngine> {

impl<EK: KvEngine, ER: RaftEngine> Workers<EK, ER> {
fn new(background: Worker, pd: LazyWorker<pd::Task>, purge: Option<Worker>) -> Self {
let checkpoint = Builder::new("checkpoint-worker").thread_count(2).create();
Self {
async_read: Worker::new("async-read-worker"),
pd,
tablet: Worker::new("tablet-worker"),
checkpoint,
async_write: StoreWriters::new(None),
purge,
background,
Expand All @@ -542,6 +546,7 @@ impl<EK: KvEngine, ER: RaftEngine> Workers<EK, ER> {
self.async_read.stop();
self.pd.stop();
self.tablet.stop();
self.checkpoint.stop();
if let Some(w) = self.purge {
w.stop();
}
Expand Down Expand Up @@ -653,7 +658,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
),
);

let tablet_gc_scheduler = workers.tablet.start_with_timer(
let tablet_scheduler = workers.tablet.start_with_timer(
"tablet-worker",
tablet::Runner::new(
tablet_registry.clone(),
Expand All @@ -662,10 +667,16 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
),
);

let checkpoint_scheduler = workers.checkpoint.start(
"checkpoint-worker",
checkpoint::Runner::new(self.logger.clone(), tablet_registry.clone()),
);

let schedulers = Schedulers {
read: read_scheduler,
pd: workers.pd.scheduler(),
tablet: tablet_gc_scheduler,
tablet: tablet_scheduler,
checkpoint: checkpoint_scheduler,
write: workers.async_write.senders(),
split_check: split_check_scheduler,
};
Expand Down
3 changes: 3 additions & 0 deletions components/raftstore-v2/src/fsm/apply.rs
Expand Up @@ -27,6 +27,7 @@ use crate::{
operation::{CatchUpLogs, DataTrace},
raft::Apply,
router::{ApplyRes, ApplyTask, PeerMsg},
worker::checkpoint,
};

/// A trait for reporting apply result.
Expand Down Expand Up @@ -77,6 +78,7 @@ impl<EK: KvEngine, R> ApplyFsm<EK, R> {
res_reporter: R,
tablet_registry: TabletRegistry<EK>,
read_scheduler: Scheduler<ReadTask<EK>>,
checkpoint_scheduler: Scheduler<checkpoint::Task>,
flush_state: Arc<FlushState>,
log_recovery: Option<Box<DataTrace>>,
applied_term: u64,
Expand All @@ -99,6 +101,7 @@ impl<EK: KvEngine, R> ApplyFsm<EK, R> {
buckets,
sst_importer,
coprocessor_host,
checkpoint_scheduler,
logger,
);
(
Expand Down

0 comments on commit 5ce3a6b

Please sign in to comment.