From 5ce3a6b78dbf498e4decbfcb5b0b7d1d7ea61dba Mon Sep 17 00:00:00 2001 From: Spade A <71589810+SpadeA-Tang@users.noreply.github.com> Date: Tue, 9 May 2023 14:30:58 +0800 Subject: [PATCH] raftstore-v2: offload checkpoint during split (#14646) close tikv/tikv#14711 offload checkpoint during split Signed-off-by: SpadeA-Tang --- components/raftstore-v2/src/batch/store.rs | 19 ++- components/raftstore-v2/src/fsm/apply.rs | 3 + .../src/operation/command/admin/split.rs | 155 ++++++++++-------- .../raftstore-v2/src/operation/command/mod.rs | 5 +- .../src/operation/query/capture.rs | 3 + components/raftstore-v2/src/raft/apply.rs | 10 ++ components/raftstore-v2/src/raft/storage.rs | 5 +- .../raftstore-v2/src/worker/checkpoint.rs | 145 ++++++++++++++++ components/raftstore-v2/src/worker/mod.rs | 1 + 9 files changed, 269 insertions(+), 77 deletions(-) create mode 100644 components/raftstore-v2/src/worker/checkpoint.rs diff --git a/components/raftstore-v2/src/batch/store.rs b/components/raftstore-v2/src/batch/store.rs index 1f6245cc010..9b01501ddd9 100644 --- a/components/raftstore-v2/src/batch/store.rs +++ b/components/raftstore-v2/src/batch/store.rs @@ -46,7 +46,7 @@ use tikv_util::{ sys::SysQuota, time::{duration_to_sec, Instant as TiInstant}, timer::SteadyTimer, - worker::{LazyWorker, Scheduler, Worker}, + worker::{Builder, LazyWorker, Scheduler, Worker}, yatp_pool::{DefaultTicker, FuturePool, YatpPoolBuilder}, Either, }; @@ -57,7 +57,7 @@ use crate::{ operation::{SharedReadTablet, MERGE_IN_PROGRESS_PREFIX, MERGE_SOURCE_PREFIX, SPLIT_PREFIX}, raft::Storage, router::{PeerMsg, PeerTick, StoreMsg}, - worker::{pd, tablet}, + worker::{checkpoint, pd, tablet}, Error, Result, }; @@ -496,6 +496,7 @@ pub struct Schedulers { pub read: Scheduler>, pub pd: Scheduler, pub tablet: Scheduler>, + pub checkpoint: Scheduler, pub write: WriteSenders, // Following is not maintained by raftstore itself. 
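The new checkpoint plumbing in store.rs follows tikv_util's usual worker pattern: build a named background worker (here with two threads), start a Runnable on it, and keep the returned Scheduler (stored in Schedulers::checkpoint) so other components can post tasks. The following stand-alone sketch shows that pattern using only the Builder/Worker/Scheduler/Runnable API this patch already imports; DemoTask, DemoRunner, and main are illustrative names, not part of the patch.

use std::fmt;

use tikv_util::worker::{Builder, Runnable, Scheduler};

enum DemoTask {
    Ping,
}

impl fmt::Display for DemoTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ping")
    }
}

struct DemoRunner;

impl Runnable for DemoRunner {
    type Task = DemoTask;

    fn run(&mut self, task: DemoTask) {
        match task {
            // Real work (e.g. creating a checkpoint) happens on the worker thread.
            DemoTask::Ping => println!("handled off the caller's thread"),
        }
    }
}

fn main() {
    // Same construction as Builder::new("checkpoint-worker").thread_count(2).create()
    // in Workers::new later in this diff.
    let worker = Builder::new("demo-worker").thread_count(2).create();
    // Same shape as workers.checkpoint.start("checkpoint-worker", checkpoint::Runner::new(..));
    // the returned scheduler is what ends up in the Schedulers struct.
    let scheduler: Scheduler<DemoTask> = worker.start("demo-worker", DemoRunner);
    scheduler.schedule(DemoTask::Ping).unwrap();
    worker.stop();
}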
@@ -518,6 +519,7 @@ struct Workers { async_read: Worker, pd: LazyWorker, tablet: Worker, + checkpoint: Worker, async_write: StoreWriters, purge: Option, @@ -527,10 +529,12 @@ struct Workers { impl Workers { fn new(background: Worker, pd: LazyWorker, purge: Option) -> Self { + let checkpoint = Builder::new("checkpoint-worker").thread_count(2).create(); Self { async_read: Worker::new("async-read-worker"), pd, tablet: Worker::new("tablet-worker"), + checkpoint, async_write: StoreWriters::new(None), purge, background, @@ -542,6 +546,7 @@ impl Workers { self.async_read.stop(); self.pd.stop(); self.tablet.stop(); + self.checkpoint.stop(); if let Some(w) = self.purge { w.stop(); } @@ -653,7 +658,7 @@ impl StoreSystem { ), ); - let tablet_gc_scheduler = workers.tablet.start_with_timer( + let tablet_scheduler = workers.tablet.start_with_timer( "tablet-worker", tablet::Runner::new( tablet_registry.clone(), @@ -662,10 +667,16 @@ impl StoreSystem { ), ); + let checkpoint_scheduler = workers.checkpoint.start( + "checkpoint-worker", + checkpoint::Runner::new(self.logger.clone(), tablet_registry.clone()), + ); + let schedulers = Schedulers { read: read_scheduler, pd: workers.pd.scheduler(), - tablet: tablet_gc_scheduler, + tablet: tablet_scheduler, + checkpoint: checkpoint_scheduler, write: workers.async_write.senders(), split_check: split_check_scheduler, }; diff --git a/components/raftstore-v2/src/fsm/apply.rs b/components/raftstore-v2/src/fsm/apply.rs index 08d7f7946ec..9a3bc753810 100644 --- a/components/raftstore-v2/src/fsm/apply.rs +++ b/components/raftstore-v2/src/fsm/apply.rs @@ -27,6 +27,7 @@ use crate::{ operation::{CatchUpLogs, DataTrace}, raft::Apply, router::{ApplyRes, ApplyTask, PeerMsg}, + worker::checkpoint, }; /// A trait for reporting apply result. @@ -77,6 +78,7 @@ impl ApplyFsm { res_reporter: R, tablet_registry: TabletRegistry, read_scheduler: Scheduler>, + checkpoint_scheduler: Scheduler, flush_state: Arc, log_recovery: Option>, applied_term: u64, @@ -99,6 +101,7 @@ impl ApplyFsm { buckets, sst_importer, coprocessor_host, + checkpoint_scheduler, logger, ); ( diff --git a/components/raftstore-v2/src/operation/command/admin/split.rs b/components/raftstore-v2/src/operation/command/admin/split.rs index 4c6fdad3aa2..cd2c8428a46 100644 --- a/components/raftstore-v2/src/operation/command/admin/split.rs +++ b/components/raftstore-v2/src/operation/command/admin/split.rs @@ -25,14 +25,19 @@ //! created by the store, and here init it using the data sent from the parent //! peer. 
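The rest of the diff implements the actual offload: instead of creating checkpoints inline on the apply path, apply packages the request into a checkpoint::Task carrying a oneshot sender, schedules it onto the new worker, and awaits the receiver. A minimal sketch of that pattern, assuming only the tikv_util Scheduler and futures oneshot APIs this diff already uses; DemoTask and offload_checkpoint are illustrative names, not the patch's code.

use std::{fmt, time::Duration};

use futures::channel::oneshot;
use tikv_util::worker::Scheduler;

// Stand-in for the new worker::checkpoint::Task; only the oneshot sender matters here.
enum DemoTask {
    Checkpoint { sender: oneshot::Sender<Duration> },
}

impl fmt::Display for DemoTask {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "demo checkpoint task")
    }
}

// Same shape as async_checkpoint() later in this diff: build a oneshot pair, hand
// the sender to the worker task, and await the receiver. The apply future is
// suspended (rather than an apply thread being blocked) until the worker replies
// with the time the checkpoint took.
async fn offload_checkpoint(scheduler: &Scheduler<DemoTask>) -> Duration {
    let (tx, rx) = oneshot::channel();
    scheduler
        .schedule_force(DemoTask::Checkpoint { sender: tx })
        .unwrap();
    rx.await.unwrap()
}

On the worker side, Runner::checkpoint ends with sender.send(now.saturating_elapsed()), which is what resolves this await; apply_batch_split then logs both the checkpoint duration and the total duration including rescheduling.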
-use std::{any::Any, borrow::Cow, cmp, path::PathBuf}; +use std::{ + any::Any, + borrow::Cow, + cmp, + path::{Path, PathBuf}, + time::Duration, +}; use collections::HashSet; use crossbeam::channel::SendError; -use engine_traits::{ - Checkpointer, KvEngine, RaftEngine, RaftLogBatch, TabletContext, TabletRegistry, -}; +use engine_traits::{KvEngine, RaftEngine, RaftLogBatch, TabletContext, TabletRegistry}; use fail::fail_point; +use futures::channel::oneshot; use kvproto::{ metapb::{self, Region, RegionEpoch}, pdpb::CheckPolicy, @@ -54,7 +59,7 @@ use raftstore::{ Result, }; use slog::{error, info, warn}; -use tikv_util::{log::SlogFormat, slog_panic, time::Instant}; +use tikv_util::{log::SlogFormat, slog_panic, time::Instant, worker::Scheduler}; use crate::{ batch::StoreContext, @@ -62,7 +67,7 @@ use crate::{ operation::{AdminCmdResult, SharedReadTablet}, raft::{Apply, Peer}, router::{CmdResChannel, PeerMsg, PeerTick, StoreMsg}, - worker::tablet, + worker::{checkpoint, tablet}, Error, }; @@ -370,7 +375,7 @@ impl Peer { } impl Apply { - pub fn apply_split( + pub async fn apply_split( &mut self, req: &AdminRequest, log_index: u64, @@ -388,10 +393,10 @@ impl Apply { // This method is executed only when there are unapplied entries after being // restarted. So there will be no callback, it's OK to return a response // that does not matched with its request. - self.apply_batch_split(req, log_index) + self.apply_batch_split(req, log_index).await } - pub fn apply_batch_split( + pub async fn apply_batch_split( &mut self, req: &AdminRequest, log_index: u64, @@ -469,65 +474,36 @@ impl Apply { // write batch self.flush(); - // todo(SpadeA): Here: we use a temporary solution that we use checkpoint API to - // clone new tablets. It may cause large jitter as we need to flush the - // memtable. And more what is more important is that after removing WAL, the API - // will never flush. - // We will freeze the memtable rather than flush it in the following PR. - let tablet = self.tablet().clone(); - let mut checkpointer = tablet.new_checkpointer().unwrap_or_else(|e| { - slog_panic!( - self.logger, - "fails to create checkpoint object"; - "error" => ?e - ) - }); - let now = Instant::now(); - let reg = self.tablet_registry(); - for new_region in ®ions { - let new_region_id = new_region.id; - if new_region_id == region_id { - continue; - } - - let split_temp_path = temp_split_path(reg, new_region_id); - checkpointer - .create_at(&split_temp_path, None, 0) - .unwrap_or_else(|e| { - slog_panic!( - self.logger, - "fails to create checkpoint"; - "path" => %split_temp_path.display(), - "error" => ?e - ) - }); - } + let split_region_ids = regions + .iter() + .map(|r| r.get_id()) + .filter(|id| id != ®ion_id) + .collect::>(); + let (_, _, cur_suffix) = self + .tablet_registry() + .parse_tablet_name(Path::new(self.tablet().path())) + .unwrap(); + let scheduler: _ = self.checkpoint_scheduler().clone(); + let checkpoint_duration = async_checkpoint( + &scheduler, + region_id, + split_region_ids, + cur_suffix, + log_index, + ) + .await; - let derived_path = self.tablet_registry().tablet_path(region_id, log_index); - // If it's recovered from restart, it's possible the target path exists already. - // And because checkpoint is atomic, so we don't need to worry about corruption. - // And it's also wrong to delete it and remake as it may has applied and flushed - // some data to the new checkpoint before being restarted. 
- if !derived_path.exists() { - checkpointer - .create_at(&derived_path, None, 0) - .unwrap_or_else(|e| { - slog_panic!( - self.logger, - "fails to create checkpoint"; - "path" => %derived_path.display(), - "error" => ?e - ) - }); - } + // It should equal to checkpoint_duration + the duration of rescheduling current + // apply peer let elapsed = now.saturating_elapsed(); // to be removed after when it's stable info!( self.logger, - "create checkpoint time consumes"; + "checkpoint done and resume batch split execution"; "region" => ?self.region(), - "duration" => ?elapsed + "checkpoint_duration" => ?checkpoint_duration, + "total_duration" => ?elapsed, ); let reg = self.tablet_registry(); @@ -560,6 +536,27 @@ impl Apply { } } +// asynchronously execute the checkpoint creation and return the duration spent +// by it +async fn async_checkpoint( + scheduler: &Scheduler, + parent_region: u64, + split_regions: Vec, + cur_suffix: u64, + log_index: u64, +) -> Duration { + let (tx, rx) = oneshot::channel(); + let task = checkpoint::Task::Checkpoint { + cur_suffix, + log_index, + parent_region, + split_regions, + sender: tx, + }; + scheduler.schedule_force(task).unwrap(); + rx.await.unwrap() +} + impl Peer { pub fn on_apply_res_split( &mut self, @@ -867,6 +864,7 @@ mod test { use engine_traits::{ FlushState, Peekable, TabletContext, TabletRegistry, WriteBatch, CF_DEFAULT, DATA_CFS, }; + use futures::executor::block_on; use kvproto::{ metapb::RegionEpoch, raft_cmdpb::{BatchSplitRequest, SplitRequest}, @@ -879,8 +877,9 @@ mod test { use slog::o; use tempfile::TempDir; use tikv_util::{ + defer, store::{new_learner_peer, new_peer}, - worker::dummy_scheduler, + worker::{dummy_scheduler, Worker}, }; use super::*; @@ -947,7 +946,8 @@ mod test { req.set_splits(splits); // Exec batch split - let (resp, apply_res) = apply.apply_batch_split(&req, log_index).unwrap(); + let (resp, apply_res) = + block_on(async { apply.apply_batch_split(&req, log_index).await }).unwrap(); let regions = resp.get_splits().get_regions(); assert!(regions.len() == region_boundries.len()); @@ -990,6 +990,11 @@ mod test { assert!(reg.tablet_factory().exists(&path)); } } + + let AdminCmdResult::SplitRegion(SplitResult { tablet, .. }) = apply_res else { panic!() }; + // update cache + let mut cache = apply.tablet_registry().get(parent_id).unwrap(); + cache.set(*tablet.downcast().unwrap()); } #[test] @@ -1020,6 +1025,13 @@ mod test { region_state.set_region(region.clone()); region_state.set_tablet_index(5); + let checkpoint_worker = Worker::new("checkpoint-worker"); + let checkpoint_scheduler = checkpoint_worker.start( + "checkpoint-worker", + checkpoint::Runner::new(logger.clone(), reg.clone()), + ); + defer!(checkpoint_worker.stop()); + let (read_scheduler, _rx) = dummy_scheduler(); let (reporter, _) = MockReporter::new(); let (_tmp_dir, importer) = create_tmp_importer(); @@ -1042,6 +1054,7 @@ mod test { None, importer, host, + checkpoint_scheduler, logger.clone(), ); @@ -1050,13 +1063,13 @@ mod test { splits.mut_requests().push(new_split_req(b"k1", 1, vec![])); let mut req = AdminRequest::default(); req.set_splits(splits.clone()); - let err = apply.apply_batch_split(&req, 0).unwrap_err(); + let err = block_on(async { apply.apply_batch_split(&req, 0).await }).unwrap_err(); // 3 followers are required. 
assert!(err.to_string().contains("invalid new peer id count")); splits.mut_requests().clear(); req.set_splits(splits.clone()); - let err = apply.apply_batch_split(&req, 6).unwrap_err(); + let err = block_on(async { apply.apply_batch_split(&req, 6).await }).unwrap_err(); // Empty requests should be rejected. assert!(err.to_string().contains("missing split requests")); @@ -1064,7 +1077,9 @@ mod test { .mut_requests() .push(new_split_req(b"k11", 1, vec![11, 12, 13])); req.set_splits(splits.clone()); - let resp = new_error(apply.apply_batch_split(&req, 0).unwrap_err()); + let resp = + new_error(block_on(async { apply.apply_batch_split(&req, 0).await }).unwrap_err()); + // Out of range keys should be rejected. assert!( resp.get_header().get_error().has_key_not_in_region(), @@ -1077,7 +1092,7 @@ mod test { .mut_requests() .push(new_split_req(b"", 1, vec![11, 12, 13])); req.set_splits(splits.clone()); - let err = apply.apply_batch_split(&req, 7).unwrap_err(); + let err = block_on(async { apply.apply_batch_split(&req, 7).await }).unwrap_err(); // Empty key will not in any region exclusively. assert!(err.to_string().contains("missing split key"), "{:?}", err); @@ -1089,7 +1104,7 @@ mod test { .mut_requests() .push(new_split_req(b"k1", 1, vec![11, 12, 13])); req.set_splits(splits.clone()); - let err = apply.apply_batch_split(&req, 8).unwrap_err(); + let err = block_on(async { apply.apply_batch_split(&req, 8).await }).unwrap_err(); // keys should be in ascend order. assert!( err.to_string().contains("invalid split request"), @@ -1105,7 +1120,7 @@ mod test { .mut_requests() .push(new_split_req(b"k2", 1, vec![11, 12])); req.set_splits(splits.clone()); - let err = apply.apply_batch_split(&req, 9).unwrap_err(); + let err = block_on(async { apply.apply_batch_split(&req, 9).await }).unwrap_err(); // All requests should be checked. 
assert!(err.to_string().contains("id count"), "{:?}", err); @@ -1223,7 +1238,7 @@ mod test { .mut_requests() .push(new_split_req(b"k05", 70, vec![71, 72, 73])); req.set_splits(splits); - apply.apply_batch_split(&req, 51).unwrap(); + block_on(async { apply.apply_batch_split(&req, 51).await }).unwrap(); assert!(apply.write_batch.is_none()); assert_eq!( apply diff --git a/components/raftstore-v2/src/operation/command/mod.rs b/components/raftstore-v2/src/operation/command/mod.rs index b9256f031fe..11ada3697c0 100644 --- a/components/raftstore-v2/src/operation/command/mod.rs +++ b/components/raftstore-v2/src/operation/command/mod.rs @@ -143,6 +143,7 @@ impl Peer { mailbox, store_ctx.tablet_registry.clone(), read_scheduler, + store_ctx.schedulers.checkpoint.clone(), self.flush_state().clone(), self.storage().apply_trace().log_recovery(), self.entry_storage().applied_term(), @@ -631,8 +632,8 @@ impl Apply { let admin_req = req.get_admin_request(); let (admin_resp, admin_result) = match req.get_admin_request().get_cmd_type() { AdminCmdType::CompactLog => self.apply_compact_log(admin_req, log_index)?, - AdminCmdType::Split => self.apply_split(admin_req, log_index)?, - AdminCmdType::BatchSplit => self.apply_batch_split(admin_req, log_index)?, + AdminCmdType::Split => self.apply_split(admin_req, log_index).await?, + AdminCmdType::BatchSplit => self.apply_batch_split(admin_req, log_index).await?, AdminCmdType::PrepareMerge => self.apply_prepare_merge(admin_req, log_index)?, AdminCmdType::CommitMerge => self.apply_commit_merge(admin_req, log_index).await?, AdminCmdType::RollbackMerge => unimplemented!(), diff --git a/components/raftstore-v2/src/operation/query/capture.rs b/components/raftstore-v2/src/operation/query/capture.rs index 5393dfacc98..c1a622cd1f9 100644 --- a/components/raftstore-v2/src/operation/query/capture.rs +++ b/components/raftstore-v2/src/operation/query/capture.rs @@ -309,6 +309,8 @@ mod test { let mut host = CoprocessorHost::::default(); host.registry .register_cmd_observer(0, BoxCmdObserver::new(ob)); + + let (dummy_scheduler, _) = dummy_scheduler(); let mut apply = Apply::new( &Config::default(), region @@ -327,6 +329,7 @@ mod test { None, importer, host, + dummy_scheduler, logger.clone(), ); diff --git a/components/raftstore-v2/src/raft/apply.rs b/components/raftstore-v2/src/raft/apply.rs index d32b8bdbb80..bf1c81e88c8 100644 --- a/components/raftstore-v2/src/raft/apply.rs +++ b/components/raftstore-v2/src/raft/apply.rs @@ -21,6 +21,7 @@ use tikv_util::{log::SlogFormat, worker::Scheduler}; use crate::{ operation::{AdminCmdResult, ApplyFlowControl, DataTrace}, router::CmdResChannel, + worker::checkpoint, }; pub(crate) struct Observe { @@ -71,6 +72,8 @@ pub struct Apply { observe: Observe, coprocessor_host: CoprocessorHost, + checkpoint_scheduler: Scheduler, + pub(crate) metrics: ApplyMetrics, pub(crate) logger: Logger, pub(crate) buckets: Option, @@ -91,6 +94,7 @@ impl Apply { buckets: Option, sst_importer: Arc, coprocessor_host: CoprocessorHost, + checkpoint_scheduler: Scheduler, logger: Logger, ) -> Self { let mut remote_tablet = tablet_registry @@ -123,6 +127,7 @@ impl Apply { metrics: ApplyMetrics::default(), buckets, sst_importer, + checkpoint_scheduler, observe: Observe { info: CmdObserveInfo::default(), level: ObserveLevel::None, @@ -308,4 +313,9 @@ impl Apply { pub fn coprocessor_host(&self) -> &CoprocessorHost { &self.coprocessor_host } + + #[inline] + pub fn checkpoint_scheduler(&self) -> &Scheduler { + &self.checkpoint_scheduler + } } diff --git 
a/components/raftstore-v2/src/raft/storage.rs b/components/raftstore-v2/src/raft/storage.rs index 7edf8c02f09..0572a933fd5 100644 --- a/components/raftstore-v2/src/raft/storage.rs +++ b/components/raftstore-v2/src/raft/storage.rs @@ -338,7 +338,7 @@ mod tests { }; use slog::o; use tempfile::TempDir; - use tikv_util::worker::Worker; + use tikv_util::worker::{dummy_scheduler, Worker}; use super::*; use crate::{ @@ -506,6 +506,8 @@ mod tests { state.set_region(region.clone()); let (_tmp_dir, importer) = create_tmp_importer(); let host = CoprocessorHost::::default(); + + let (dummy_scheduler, _) = dummy_scheduler(); // setup peer applyer let mut apply = Apply::new( &Config::default(), @@ -520,6 +522,7 @@ mod tests { None, importer, host, + dummy_scheduler, logger, ); diff --git a/components/raftstore-v2/src/worker/checkpoint.rs b/components/raftstore-v2/src/worker/checkpoint.rs new file mode 100644 index 00000000000..0cbc9a11a42 --- /dev/null +++ b/components/raftstore-v2/src/worker/checkpoint.rs @@ -0,0 +1,145 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{ + fmt::Display, + path::{Path, PathBuf}, + time::Duration, +}; + +use engine_traits::{Checkpointer, KvEngine, TabletRegistry}; +use futures::channel::oneshot::Sender; +use raftstore::store::RAFT_INIT_LOG_INDEX; +use slog::Logger; +use tikv_util::{slog_panic, time::Instant, worker::Runnable}; + +use crate::operation::SPLIT_PREFIX; + +pub enum Task { + Checkpoint { + // it is only used to assert + cur_suffix: u64, + log_index: u64, + parent_region: u64, + split_regions: Vec, + sender: Sender, + }, +} + +impl Display for Task { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Task::Checkpoint { + log_index, + parent_region, + split_regions, + .. + } => write!( + f, + "create checkpoint for batch split, parent region_id {}, source region_ids {:?}, log_index {}", + parent_region, split_regions, log_index, + ), + } + } +} + +pub struct Runner { + logger: Logger, + tablet_registry: TabletRegistry, +} + +pub fn temp_split_path(registry: &TabletRegistry, region_id: u64) -> PathBuf { + let tablet_name = registry.tablet_name(SPLIT_PREFIX, region_id, RAFT_INIT_LOG_INDEX); + registry.tablet_root().join(tablet_name) +} + +impl Runner { + pub fn new(logger: Logger, tablet_registry: TabletRegistry) -> Self { + Self { + logger, + tablet_registry, + } + } + + fn checkpoint( + &self, + parent_region: u64, + split_regions: Vec, + cur_suffix: u64, + log_index: u64, + sender: Sender, + ) { + let now = Instant::now(); + + let mut cache = self.tablet_registry.get(parent_region).unwrap(); + let tablet = cache.latest().unwrap(); + let (_, _, suffix) = self + .tablet_registry + .parse_tablet_name(Path::new(tablet.path())) + .unwrap(); + assert_eq!(cur_suffix, suffix); + + let mut checkpointer = tablet.new_checkpointer().unwrap_or_else(|e| { + slog_panic!( + self.logger, + "fails to create checkpoint object"; + "region_id" => parent_region, + "error" => ?e + ) + }); + + for id in split_regions { + let split_temp_path = temp_split_path(&self.tablet_registry, id); + checkpointer + .create_at(&split_temp_path, None, 0) + .unwrap_or_else(|e| { + slog_panic!( + self.logger, + "fails to create checkpoint"; + "region_id" => parent_region, + "path" => %split_temp_path.display(), + "error" => ?e + ) + }); + } + + let derived_path = self.tablet_registry.tablet_path(parent_region, log_index); + + // If it's recovered from restart, it's possible the target path exists already. 
+        // Because checkpoint creation is atomic, we don't need to worry about
+        // corruption. It's also wrong to delete and recreate it, as it may have
+        // applied and flushed some data to the new checkpoint before being restarted.
+        if !derived_path.exists() {
+            checkpointer
+                .create_at(&derived_path, None, 0)
+                .unwrap_or_else(|e| {
+                    slog_panic!(
+                        self.logger,
+                        "fails to create checkpoint";
+                        "region_id" => parent_region,
+                        "path" => %derived_path.display(),
+                        "error" => ?e
+                    )
+                });
+        }
+
+        sender.send(now.saturating_elapsed()).unwrap();
+    }
+}
+
+impl<EK: KvEngine> Runnable for Runner<EK> {
+    type Task = Task;
+
+    fn run(&mut self, task: Self::Task) {
+        match task {
+            Task::Checkpoint {
+                cur_suffix,
+                log_index,
+                parent_region,
+                split_regions,
+                sender,
+            } => {
+                self.checkpoint(parent_region, split_regions, cur_suffix, log_index, sender);
+            }
+        }
+    }
+}
diff --git a/components/raftstore-v2/src/worker/mod.rs b/components/raftstore-v2/src/worker/mod.rs
index 2fa7255afd3..b75525018d6 100644
--- a/components/raftstore-v2/src/worker/mod.rs
+++ b/components/raftstore-v2/src/worker/mod.rs
@@ -1,4 +1,5 @@
 // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
 
+pub mod checkpoint;
 pub mod pd;
 pub mod tablet;
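The checkpoint step itself uses only the two engine_traits calls that appear in the new worker: new_checkpointer() on the tablet and create_at() for each target directory. A simplified sketch of that creation, generic over the KvEngine and with expect() in place of the slog_panic! error handling; checkpoint_into is an illustrative helper name, not part of the patch, and the existence guard mirrors the one applied to the derived tablet path above.

use std::path::Path;

use engine_traits::{Checkpointer, KvEngine};

// Create a checkpoint of `tablet` at `target`, skipping a target that already
// exists: checkpoint creation is atomic, so an existing directory is a complete
// checkpoint left over from before a restart and must not be recreated.
fn checkpoint_into<EK: KvEngine>(tablet: &EK, target: &Path) {
    if target.exists() {
        return;
    }
    let mut checkpointer = tablet
        .new_checkpointer()
        .expect("failed to create checkpoint object");
    // The `None, 0` arguments mirror the create_at calls in the worker above.
    checkpointer
        .create_at(target, None, 0)
        .expect("failed to create checkpoint");
}

As the removed todo(SpadeA) comment in split.rs notes, checkpoint creation may currently need to flush the memtable and cause noticeable jitter, which is exactly why these calls are moved off the apply path onto the dedicated checkpoint worker.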