Skip to content

Commit

Permalink
raftstore: speed up conf change (#6421)
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Lee <busyjaylee@gmail.com>
  • Loading branch information
BusyJay committed Jan 7, 2020
1 parent e5568d4 commit 3d6258b
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/import/kv_service.rs
Expand Up @@ -17,7 +17,7 @@ use tikv_util::time::Instant;
use super::client::*;
use super::metrics::*;
use super::service::*;
use super::{Config, Error, KVImporter, error_inc};
use super::{error_inc, Config, Error, KVImporter};

#[derive(Clone)]
pub struct ImportKVService {
Expand Down
2 changes: 1 addition & 1 deletion src/import/metrics.rs
Expand Up @@ -88,7 +88,7 @@ lazy_static! {
"Counter of wait store available",
&["store_id"]
)
.unwrap();
.unwrap();
pub static ref IMPORTER_DOWNLOAD_DURATION: HistogramVec = register_histogram_vec!(
"tikv_import_download_duration",
"Bucketed histogram of importer download duration",
Expand Down
2 changes: 1 addition & 1 deletion src/import/mod.rs
Expand Up @@ -39,7 +39,7 @@ mod sst_service;
pub mod test_helpers;

pub use self::config::Config;
pub use self::errors::{Error, Result, error_inc};
pub use self::errors::{error_inc, Error, Result};
pub use self::kv_importer::KVImporter;
pub use self::kv_server::ImportKVServer;
pub use self::kv_service::ImportKVService;
Expand Down
2 changes: 1 addition & 1 deletion src/import/sst_importer.rs
Expand Up @@ -21,8 +21,8 @@ use engine::rocks::{IngestExternalFileOptions, SeekKey, SstReader, SstWriter, DB
use engine::CF_WRITE;
use external_storage::{create_storage, url_of_backend};

use super::{Error, Result};
use super::metrics::*;
use super::{Error, Result};
use crate::raftstore::store::keys;

/// SSTImporter manages SST files that are waiting for ingesting.
Expand Down
2 changes: 1 addition & 1 deletion src/import/sst_service.rs
Expand Up @@ -21,7 +21,7 @@ use tikv_util::time::Instant;
use super::import_mode::*;
use super::metrics::*;
use super::service::*;
use super::{Config, Error, SSTImporter, error_inc};
use super::{error_inc, Config, Error, SSTImporter};

/// ImportSSTService provides tikv-server with the ability to ingest SST files.
///
Expand Down
8 changes: 8 additions & 0 deletions src/raftstore/store/fsm/peer.rs
Expand Up @@ -358,6 +358,11 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.fsm.group_state = GroupState::Chaos;
self.register_raft_base_tick();
}
CasualMessage::SnapshotGenerated => {
// Resume snapshot handling again to avoid waiting another heartbeat.
self.fsm.peer.ping();
self.fsm.has_ready = true;
}
}
}

Expand Down Expand Up @@ -1440,6 +1445,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let id = peer.get_id();
self.fsm.peer.peer_heartbeats.insert(id, now);
if self.fsm.peer.is_leader() {
// Speed up snapshot instead of waiting another heartbeat.
self.fsm.peer.ping();
self.fsm.has_ready = true;
self.fsm.peer.peers_start_pending_time.push((id, now));
}
self.fsm.peer.recent_conf_change_time = now;
Expand Down
3 changes: 2 additions & 1 deletion src/raftstore/store/fsm/store.rs
Expand Up @@ -1083,8 +1083,9 @@ impl RaftBatchSystem {
cfg.snap_apply_batch_size.0 as usize,
cfg.use_delete_range,
cfg.clean_stale_peer_delay.0,
self.router(),
);
let timer = RegionRunner::new_timer();
let timer = region_runner.new_timer();
box_try!(workers.region_worker.start_with_timer(region_runner, timer));

let raftlog_gc_runner = RaftlogGcRunner::new(None);
Expand Down
3 changes: 3 additions & 0 deletions src/raftstore/store/msg.rs
Expand Up @@ -209,6 +209,8 @@ pub enum CasualMessage {
ClearRegionSize,
/// Indicate a target region is overlapped.
RegionOverlapped,
/// Notifies that a new snapshot has been generated.
SnapshotGenerated,
}

impl fmt::Debug for CasualMessage {
Expand Down Expand Up @@ -250,6 +252,7 @@ impl fmt::Debug for CasualMessage {
"clear region size"
},
CasualMessage::RegionOverlapped => write!(fmt, "RegionOverlapped"),
CasualMessage::SnapshotGenerated => write!(fmt, "SnapshotGenerated"),
}
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/raftstore/store/peer_storage.rs
Expand Up @@ -1955,7 +1955,15 @@ mod tests {
let mut worker = Worker::new("region-worker");
let sched = worker.scheduler();
let mut s = new_storage_from_ents(sched.clone(), &td, &ents);
let runner = RegionRunner::new(s.engines.clone(), mgr, 0, true, Duration::from_secs(0));
let (router, _) = mpsc::sync_channel(100);
let runner = RegionRunner::new(
s.engines.clone(),
mgr,
0,
true,
Duration::from_secs(0),
router,
);
worker.start(runner).unwrap();
let snap = s.snapshot();
let unavailable = RaftError::Store(StorageError::SnapshotTemporarilyUnavailable);
Expand Down Expand Up @@ -2260,12 +2268,14 @@ mod tests {
let mut worker = Worker::new("snap-manager");
let sched = worker.scheduler();
let s1 = new_storage_from_ents(sched.clone(), &td1, &ents);
let (router, _) = mpsc::sync_channel(100);
let runner = RegionRunner::new(
s1.engines.clone(),
mgr.clone(),
0,
true,
Duration::from_secs(0),
router,
);
worker.start(runner).unwrap();
assert!(s1.snapshot().is_err());
Expand Down
52 changes: 40 additions & 12 deletions src/raftstore/store/worker/region.rs
Expand Up @@ -22,8 +22,9 @@ use crate::raftstore::store::peer_storage::{
JOB_STATUS_PENDING, JOB_STATUS_RUNNING,
};
use crate::raftstore::store::snap::{plain_file_used, Error, Result, SNAPSHOT_CFS};
use crate::raftstore::store::transport::CasualRouter;
use crate::raftstore::store::{
self, check_abort, keys, ApplyOptions, SnapEntry, SnapKey, SnapManager,
self, check_abort, keys, ApplyOptions, CasualMessage, SnapEntry, SnapKey, SnapManager,
};
use tikv_util::threadpool::{DefaultContext, ThreadPool, ThreadPoolBuilder};
use tikv_util::time;
Expand Down Expand Up @@ -203,16 +204,17 @@ impl PendingDeleteRanges {
}

#[derive(Clone)]
struct SnapContext {
struct SnapContext<R> {
engines: Engines,
batch_size: usize,
mgr: SnapManager,
use_delete_range: bool,
clean_stale_peer_delay: Duration,
pending_delete_ranges: PendingDeleteRanges,
router: R,
}

impl SnapContext {
impl<R: CasualRouter> SnapContext<R> {
/// Generates the snapshot of the Region.
fn generate_snap(
&self,
Expand All @@ -238,6 +240,10 @@ impl SnapContext {
"err" => %e,
);
}
// The error can be ignored as snapshot will be sent in next heartbeat in the end.
let _ = self
.router
.send(region_id, CasualMessage::SnapshotGenerated);
Ok(())
}

Expand Down Expand Up @@ -519,23 +525,24 @@ impl SnapContext {
}
}

pub struct Runner {
pub struct Runner<R> {
pool: ThreadPool<DefaultContext>,
ctx: SnapContext,
ctx: SnapContext<R>,

// we may delay some apply tasks if level 0 files to write stall threshold,
// pending_applies records all delayed apply task, and will check again later
pending_applies: VecDeque<Task>,
}

impl Runner {
impl<R: CasualRouter> Runner<R> {
pub fn new(
engines: Engines,
mgr: SnapManager,
batch_size: usize,
use_delete_range: bool,
clean_stale_peer_delay: Duration,
) -> Runner {
router: R,
) -> Runner<R> {
Runner {
pool: ThreadPoolBuilder::with_default_factory(thd_name!("snap-generator"))
.thread_count(GENERATE_POOL_SIZE)
Expand All @@ -547,12 +554,13 @@ impl Runner {
use_delete_range,
clean_stale_peer_delay,
pending_delete_ranges: PendingDeleteRanges::default(),
router,
},
pending_applies: VecDeque::new(),
}
}

pub fn new_timer() -> Timer<Event> {
pub fn new_timer(&self) -> Timer<Event> {
let mut timer = Timer::new(2);
timer.add_task(
Duration::from_millis(PENDING_APPLY_CHECK_INTERVAL),
Expand Down Expand Up @@ -581,7 +589,10 @@ impl Runner {
}
}

impl Runnable<Task> for Runner {
impl<R> Runnable<Task> for Runner<R>
where
R: CasualRouter + Send + Clone + 'static,
{
fn run(&mut self, task: Task) {
match task {
Task::Gen {
Expand Down Expand Up @@ -639,7 +650,10 @@ pub enum Event {
CheckApply,
}

impl RunnableWithTimer<Task, Event> for Runner {
impl<R> RunnableWithTimer<Task, Event> for Runner<R>
where
R: CasualRouter + Send + Clone + 'static,
{
fn on_timeout(&mut self, timer: &mut Timer<Event>, event: Event) {
match event {
Event::CheckApply => {
Expand Down Expand Up @@ -671,7 +685,7 @@ mod tests {
use crate::raftstore::store::peer_storage::JOB_STATUS_PENDING;
use crate::raftstore::store::snap::tests::get_test_db_for_regions;
use crate::raftstore::store::worker::RegionRunner;
use crate::raftstore::store::{keys, SnapKey, SnapManager};
use crate::raftstore::store::{keys, CasualMessage, SnapKey, SnapManager};
use engine::rocks;
use engine::rocks::{ColumnFamilyOptions, Snapshot, Writable, WriteBatch};
use engine::Engines;
Expand Down Expand Up @@ -808,7 +822,15 @@ mod tests {
let mgr = SnapManager::new(snap_dir.path().to_str().unwrap(), None);
let mut worker = Worker::new("snap-manager");
let sched = worker.scheduler();
let runner = RegionRunner::new(engines.clone(), mgr, 0, true, Duration::from_secs(0));
let (router, receiver) = mpsc::sync_channel(1);
let runner = RegionRunner::new(
engines.clone(),
mgr,
0,
true,
Duration::from_secs(0),
router,
);
let mut timer = Timer::new(1);
timer.add_task(Duration::from_millis(100), Event::CheckApply);
worker.start_with_timer(runner, timer).unwrap();
Expand All @@ -825,6 +847,12 @@ mod tests {
})
.unwrap();
let s1 = rx.recv().unwrap();
match receiver.recv() {
Ok((region_id, CasualMessage::SnapshotGenerated)) => {
assert_eq!(region_id, id);
}
msg => panic!("expected SnapshotGenerated, but got {:?}", msg),
}
let data = s1.get_data();
let key = SnapKey::from_snap(&s1).unwrap();
let mgr = SnapManager::new(snap_dir.path().to_str().unwrap(), None);
Expand Down
21 changes: 20 additions & 1 deletion tests/integrations/raftstore/test_conf_change.rs
Expand Up @@ -3,7 +3,7 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::*;

use futures::Future;

Expand Down Expand Up @@ -897,3 +897,22 @@ where
let admin_req = new_admin_request(region_id, &epoch, conf_change);
cluster.call_command_on_leader(admin_req, Duration::from_secs(3))
}

/// Tests if conf change relies on heartbeat.
#[test]
fn test_conf_change_fast() {
let mut cluster = new_server_cluster(0, 3);
// Sets heartbeat timeout to more than 5 seconds. It also changes the election timeout,
// but it's OK as the cluster starts with only one peer, it will campaigns immediately.
configure_for_lease_read(&mut cluster, Some(5000), None);
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();
let r1 = cluster.run_conf_change();
cluster.must_put(b"k1", b"v1");
let timer = Instant::now();
// If conf change relies on heartbeat, it will take more than 5 seconds to finish,
// hence it must timeout.
pd_client.must_add_peer(r1, new_peer(2, 2));
must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
assert!(timer.elapsed() < Duration::from_secs(5));
}

0 comments on commit 3d6258b

Please sign in to comment.