diff --git a/src/import/kv_service.rs b/src/import/kv_service.rs index 45049e674bc..e047e1ae18c 100644 --- a/src/import/kv_service.rs +++ b/src/import/kv_service.rs @@ -17,7 +17,7 @@ use tikv_util::time::Instant; use super::client::*; use super::metrics::*; use super::service::*; -use super::{Config, Error, KVImporter, error_inc}; +use super::{error_inc, Config, Error, KVImporter}; #[derive(Clone)] pub struct ImportKVService { diff --git a/src/import/metrics.rs b/src/import/metrics.rs index c054848f86b..0855f3a59ab 100644 --- a/src/import/metrics.rs +++ b/src/import/metrics.rs @@ -88,7 +88,7 @@ lazy_static! { "Counter of wait store available", &["store_id"] ) - .unwrap(); + .unwrap(); pub static ref IMPORTER_DOWNLOAD_DURATION: HistogramVec = register_histogram_vec!( "tikv_import_download_duration", "Bucketed histogram of importer download duration", diff --git a/src/import/mod.rs b/src/import/mod.rs index f0f3e096837..a7fce65903e 100644 --- a/src/import/mod.rs +++ b/src/import/mod.rs @@ -39,7 +39,7 @@ mod sst_service; pub mod test_helpers; pub use self::config::Config; -pub use self::errors::{Error, Result, error_inc}; +pub use self::errors::{error_inc, Error, Result}; pub use self::kv_importer::KVImporter; pub use self::kv_server::ImportKVServer; pub use self::kv_service::ImportKVService; diff --git a/src/import/sst_importer.rs b/src/import/sst_importer.rs index 6d618c81ec9..38e2bd6e87f 100644 --- a/src/import/sst_importer.rs +++ b/src/import/sst_importer.rs @@ -21,8 +21,8 @@ use engine::rocks::{IngestExternalFileOptions, SeekKey, SstReader, SstWriter, DB use engine::CF_WRITE; use external_storage::{create_storage, url_of_backend}; -use super::{Error, Result}; use super::metrics::*; +use super::{Error, Result}; use crate::raftstore::store::keys; /// SSTImporter manages SST files that are waiting for ingesting. 
diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index 020a7c61dd8..6097e363338 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -21,7 +21,7 @@ use tikv_util::time::Instant; use super::import_mode::*; use super::metrics::*; use super::service::*; -use super::{Config, Error, SSTImporter, error_inc}; +use super::{error_inc, Config, Error, SSTImporter}; /// ImportSSTService provides tikv-server with the ability to ingest SST files. /// diff --git a/src/raftstore/store/fsm/peer.rs b/src/raftstore/store/fsm/peer.rs index c0977c12808..17059feb3a7 100644 --- a/src/raftstore/store/fsm/peer.rs +++ b/src/raftstore/store/fsm/peer.rs @@ -358,6 +358,11 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { self.fsm.group_state = GroupState::Chaos; self.register_raft_base_tick(); } + CasualMessage::SnapshotGenerated => { + // Resume snapshot handling again to avoid waiting another heartbeat. + self.fsm.peer.ping(); + self.fsm.has_ready = true; + } } } @@ -1440,6 +1445,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { let id = peer.get_id(); self.fsm.peer.peer_heartbeats.insert(id, now); if self.fsm.peer.is_leader() { + // Speed up snapshot instead of waiting another heartbeat. 
+ self.fsm.peer.ping(); + self.fsm.has_ready = true; self.fsm.peer.peers_start_pending_time.push((id, now)); } self.fsm.peer.recent_conf_change_time = now; diff --git a/src/raftstore/store/fsm/store.rs b/src/raftstore/store/fsm/store.rs index e8b2c7db314..19c04ed65bd 100644 --- a/src/raftstore/store/fsm/store.rs +++ b/src/raftstore/store/fsm/store.rs @@ -1083,8 +1083,9 @@ impl RaftBatchSystem { cfg.snap_apply_batch_size.0 as usize, cfg.use_delete_range, cfg.clean_stale_peer_delay.0, + self.router(), ); - let timer = RegionRunner::new_timer(); + let timer = region_runner.new_timer(); box_try!(workers.region_worker.start_with_timer(region_runner, timer)); let raftlog_gc_runner = RaftlogGcRunner::new(None); diff --git a/src/raftstore/store/msg.rs b/src/raftstore/store/msg.rs index d953e62862e..3250654aa70 100644 --- a/src/raftstore/store/msg.rs +++ b/src/raftstore/store/msg.rs @@ -209,6 +209,8 @@ pub enum CasualMessage { ClearRegionSize, /// Indicate a target region is overlapped. RegionOverlapped, + /// Notifies that a new snapshot has been generated. 
+ SnapshotGenerated, } impl fmt::Debug for CasualMessage { @@ -250,6 +252,7 @@ impl fmt::Debug for CasualMessage { "clear region size" }, CasualMessage::RegionOverlapped => write!(fmt, "RegionOverlapped"), + CasualMessage::SnapshotGenerated => write!(fmt, "SnapshotGenerated"), } } } diff --git a/src/raftstore/store/peer_storage.rs b/src/raftstore/store/peer_storage.rs index 8c334ba9ee0..be740b5bf69 100644 --- a/src/raftstore/store/peer_storage.rs +++ b/src/raftstore/store/peer_storage.rs @@ -1955,7 +1955,15 @@ mod tests { let mut worker = Worker::new("region-worker"); let sched = worker.scheduler(); let mut s = new_storage_from_ents(sched.clone(), &td, &ents); - let runner = RegionRunner::new(s.engines.clone(), mgr, 0, true, Duration::from_secs(0)); + let (router, _) = mpsc::sync_channel(100); + let runner = RegionRunner::new( + s.engines.clone(), + mgr, + 0, + true, + Duration::from_secs(0), + router, + ); worker.start(runner).unwrap(); let snap = s.snapshot(); let unavailable = RaftError::Store(StorageError::SnapshotTemporarilyUnavailable); @@ -2260,12 +2268,14 @@ mod tests { let mut worker = Worker::new("snap-manager"); let sched = worker.scheduler(); let s1 = new_storage_from_ents(sched.clone(), &td1, &ents); + let (router, _) = mpsc::sync_channel(100); let runner = RegionRunner::new( s1.engines.clone(), mgr.clone(), 0, true, Duration::from_secs(0), + router, ); worker.start(runner).unwrap(); assert!(s1.snapshot().is_err()); diff --git a/src/raftstore/store/worker/region.rs b/src/raftstore/store/worker/region.rs index 565bf3f25bd..9e2f21abf77 100644 --- a/src/raftstore/store/worker/region.rs +++ b/src/raftstore/store/worker/region.rs @@ -22,8 +22,9 @@ use crate::raftstore::store::peer_storage::{ JOB_STATUS_PENDING, JOB_STATUS_RUNNING, }; use crate::raftstore::store::snap::{plain_file_used, Error, Result, SNAPSHOT_CFS}; +use crate::raftstore::store::transport::CasualRouter; use crate::raftstore::store::{ - self, check_abort, keys, ApplyOptions, SnapEntry, 
SnapKey, SnapManager, + self, check_abort, keys, ApplyOptions, CasualMessage, SnapEntry, SnapKey, SnapManager, }; use tikv_util::threadpool::{DefaultContext, ThreadPool, ThreadPoolBuilder}; use tikv_util::time; @@ -203,16 +204,17 @@ impl PendingDeleteRanges { } #[derive(Clone)] -struct SnapContext { +struct SnapContext { engines: Engines, batch_size: usize, mgr: SnapManager, use_delete_range: bool, clean_stale_peer_delay: Duration, pending_delete_ranges: PendingDeleteRanges, + router: R, } -impl SnapContext { +impl SnapContext { /// Generates the snapshot of the Region. fn generate_snap( &self, @@ -238,6 +240,10 @@ impl SnapContext { "err" => %e, ); } + // The error can be ignored as snapshot will be sent in next heartbeat in the end. + let _ = self + .router + .send(region_id, CasualMessage::SnapshotGenerated); Ok(()) } @@ -519,23 +525,24 @@ impl SnapContext { } } -pub struct Runner { +pub struct Runner { pool: ThreadPool, - ctx: SnapContext, + ctx: SnapContext, // we may delay some apply tasks if level 0 files to write stall threshold, // pending_applies records all delayed apply task, and will check again later pending_applies: VecDeque, } -impl Runner { +impl Runner { pub fn new( engines: Engines, mgr: SnapManager, batch_size: usize, use_delete_range: bool, clean_stale_peer_delay: Duration, - ) -> Runner { + router: R, + ) -> Runner { Runner { pool: ThreadPoolBuilder::with_default_factory(thd_name!("snap-generator")) .thread_count(GENERATE_POOL_SIZE) @@ -547,12 +554,13 @@ impl Runner { use_delete_range, clean_stale_peer_delay, pending_delete_ranges: PendingDeleteRanges::default(), + router, }, pending_applies: VecDeque::new(), } } - pub fn new_timer() -> Timer { + pub fn new_timer(&self) -> Timer { let mut timer = Timer::new(2); timer.add_task( Duration::from_millis(PENDING_APPLY_CHECK_INTERVAL), @@ -581,7 +589,10 @@ impl Runner { } } -impl Runnable for Runner { +impl Runnable for Runner +where + R: CasualRouter + Send + Clone + 'static, +{ fn run(&mut self, 
task: Task) { match task { Task::Gen { @@ -639,7 +650,10 @@ pub enum Event { CheckApply, } -impl RunnableWithTimer for Runner { +impl RunnableWithTimer for Runner +where + R: CasualRouter + Send + Clone + 'static, +{ fn on_timeout(&mut self, timer: &mut Timer, event: Event) { match event { Event::CheckApply => { @@ -671,7 +685,7 @@ mod tests { use crate::raftstore::store::peer_storage::JOB_STATUS_PENDING; use crate::raftstore::store::snap::tests::get_test_db_for_regions; use crate::raftstore::store::worker::RegionRunner; - use crate::raftstore::store::{keys, SnapKey, SnapManager}; + use crate::raftstore::store::{keys, CasualMessage, SnapKey, SnapManager}; use engine::rocks; use engine::rocks::{ColumnFamilyOptions, Snapshot, Writable, WriteBatch}; use engine::Engines; @@ -808,7 +822,15 @@ mod tests { let mgr = SnapManager::new(snap_dir.path().to_str().unwrap(), None); let mut worker = Worker::new("snap-manager"); let sched = worker.scheduler(); - let runner = RegionRunner::new(engines.clone(), mgr, 0, true, Duration::from_secs(0)); + let (router, receiver) = mpsc::sync_channel(1); + let runner = RegionRunner::new( + engines.clone(), + mgr, + 0, + true, + Duration::from_secs(0), + router, + ); let mut timer = Timer::new(1); timer.add_task(Duration::from_millis(100), Event::CheckApply); worker.start_with_timer(runner, timer).unwrap(); @@ -825,6 +847,12 @@ mod tests { }) .unwrap(); let s1 = rx.recv().unwrap(); + match receiver.recv() { + Ok((region_id, CasualMessage::SnapshotGenerated)) => { + assert_eq!(region_id, id); + } + msg => panic!("expected SnapshotGenerated, but got {:?}", msg), + } let data = s1.get_data(); let key = SnapKey::from_snap(&s1).unwrap(); let mgr = SnapManager::new(snap_dir.path().to_str().unwrap(), None); diff --git a/tests/integrations/raftstore/test_conf_change.rs b/tests/integrations/raftstore/test_conf_change.rs index 94ef1d7afb9..8b0769d4823 100644 --- a/tests/integrations/raftstore/test_conf_change.rs +++ 
b/tests/integrations/raftstore/test_conf_change.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::thread; -use std::time::Duration; +use std::time::*; use futures::Future; @@ -897,3 +897,22 @@ where let admin_req = new_admin_request(region_id, &epoch, conf_change); cluster.call_command_on_leader(admin_req, Duration::from_secs(3)) } + +/// Tests if conf change relies on heartbeat. +#[test] +fn test_conf_change_fast() { + let mut cluster = new_server_cluster(0, 3); + // Sets heartbeat timeout to more than 5 seconds. It also changes the election timeout, + // but it's OK as the cluster starts with only one peer, it will campaign immediately. + configure_for_lease_read(&mut cluster, Some(5000), None); + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + let r1 = cluster.run_conf_change(); + cluster.must_put(b"k1", b"v1"); + let timer = Instant::now(); + // If conf change relies on heartbeat, it will take more than 5 seconds to finish, + // hence it must time out. + pd_client.must_add_peer(r1, new_peer(2, 2)); + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + assert!(timer.elapsed() < Duration::from_secs(5)); +}