raftstore: speed up conf change (#6421) (#6432)
Signed-off-by: Jay Lee <busyjaylee@gmail.com>
BusyJay committed Feb 14, 2020
1 parent 2f02ee7 commit 5ef9d0b
Showing 6 changed files with 84 additions and 15 deletions.
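In short, the change wires a CasualRouter into the snapshot worker so that, as soon as a snapshot has been generated, the worker sends CasualMessage::SnapshotGenerated back to the peer, and the peer (as well as the leader right after a conf change adds a new peer) calls ping() immediately instead of waiting for the next heartbeat tick. Below is a minimal, self-contained sketch of this notification pattern; the names RegionWorker and Msg are simplified stand-ins for illustration, not the actual TiKV types.

use std::sync::mpsc;

// Simplified stand-in for CasualMessage; only the variant relevant here.
#[derive(Debug)]
enum Msg {
    SnapshotGenerated,
}

// Stand-in for the region worker: it holds a sender (the "router") and
// notifies the peer as soon as a snapshot is ready, instead of letting the
// peer wait for the next heartbeat.
struct RegionWorker {
    router: mpsc::SyncSender<(u64, Msg)>,
}

impl RegionWorker {
    fn generate_snap(&self, region_id: u64) {
        // ... snapshot generation work would happen here ...
        // The send error can be ignored: the peer would still learn about
        // the snapshot from a later heartbeat.
        let _ = self.router.send((region_id, Msg::SnapshotGenerated));
    }
}

fn main() {
    let (router, receiver) = mpsc::sync_channel(1);
    let worker = RegionWorker { router };
    worker.generate_snap(1);
    // On the peer side, receiving the message triggers an immediate ping
    // instead of waiting for the next heartbeat tick.
    match receiver.recv() {
        Ok((region_id, Msg::SnapshotGenerated)) => {
            println!("region {}: snapshot ready, ping followers now", region_id)
        }
        Err(e) => panic!("worker dropped: {}", e),
    }
}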
8 changes: 8 additions & 0 deletions src/raftstore/store/fsm/peer.rs
@@ -358,6 +358,11 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.fsm.group_state = GroupState::Chaos;
self.register_raft_base_tick();
}
CasualMessage::SnapshotGenerated => {
// Resume snapshot handling again to avoid waiting another heartbeat.
self.fsm.peer.ping();
self.fsm.has_ready = true;
}
}
}

@@ -1437,6 +1442,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let id = peer.get_id();
self.fsm.peer.peer_heartbeats.insert(id, now);
if self.fsm.peer.is_leader() {
// Speed up snapshot instead of waiting another heartbeat.
self.fsm.peer.ping();
self.fsm.has_ready = true;
self.fsm.peer.peers_start_pending_time.push((id, now));
}
self.fsm.peer.recent_conf_change_time = now;
3 changes: 2 additions & 1 deletion src/raftstore/store/fsm/store.rs
@@ -1083,8 +1083,9 @@ impl RaftBatchSystem {
cfg.snap_apply_batch_size.0 as usize,
cfg.use_delete_range,
cfg.clean_stale_peer_delay.0,
self.router(),
);
let timer = RegionRunner::new_timer();
let timer = region_runner.new_timer();
box_try!(workers.region_worker.start_with_timer(region_runner, timer));

let raftlog_gc_runner = RaftlogGcRunner::new(None);
3 changes: 3 additions & 0 deletions src/raftstore/store/msg.rs
@@ -209,6 +209,8 @@ pub enum CasualMessage {
ClearRegionSize,
/// Indicate a target region is overlapped.
RegionOverlapped,
/// Notifies that a new snapshot has been generated.
SnapshotGenerated,
}

impl fmt::Debug for CasualMessage {
@@ -250,6 +252,7 @@ impl fmt::Debug for CasualMessage {
"clear region size"
},
CasualMessage::RegionOverlapped => write!(fmt, "RegionOverlapped"),
CasualMessage::SnapshotGenerated => write!(fmt, "SnapshotGenerated"),
}
}
}
12 changes: 11 additions & 1 deletion src/raftstore/store/peer_storage.rs
@@ -1955,7 +1955,15 @@ mod tests {
let mut worker = Worker::new("region-worker");
let sched = worker.scheduler();
let mut s = new_storage_from_ents(sched.clone(), &td, &ents);
let runner = RegionRunner::new(s.engines.clone(), mgr, 0, true, Duration::from_secs(0));
let (router, _) = mpsc::sync_channel(100);
let runner = RegionRunner::new(
s.engines.clone(),
mgr,
0,
true,
Duration::from_secs(0),
router,
);
worker.start(runner).unwrap();
let snap = s.snapshot();
let unavailable = RaftError::Store(StorageError::SnapshotTemporarilyUnavailable);
@@ -2260,12 +2268,14 @@ mod tests {
let mut worker = Worker::new("snap-manager");
let sched = worker.scheduler();
let s1 = new_storage_from_ents(sched.clone(), &td1, &ents);
let (router, _) = mpsc::sync_channel(100);
let runner = RegionRunner::new(
s1.engines.clone(),
mgr.clone(),
0,
true,
Duration::from_secs(0),
router,
);
worker.start(runner).unwrap();
assert!(s1.snapshot().is_err());
52 changes: 40 additions & 12 deletions src/raftstore/store/worker/region.rs
@@ -22,8 +22,9 @@ use crate::raftstore::store::peer_storage::{
JOB_STATUS_PENDING, JOB_STATUS_RUNNING,
};
use crate::raftstore::store::snap::{plain_file_used, Error, Result, SNAPSHOT_CFS};
use crate::raftstore::store::transport::CasualRouter;
use crate::raftstore::store::{
self, check_abort, keys, ApplyOptions, SnapEntry, SnapKey, SnapManager,
self, check_abort, keys, ApplyOptions, CasualMessage, SnapEntry, SnapKey, SnapManager,
};
use tikv_util::threadpool::{DefaultContext, ThreadPool, ThreadPoolBuilder};
use tikv_util::time;
@@ -203,16 +204,17 @@ impl PendingDeleteRanges {
}

#[derive(Clone)]
struct SnapContext {
struct SnapContext<R> {
engines: Engines,
batch_size: usize,
mgr: SnapManager,
use_delete_range: bool,
clean_stale_peer_delay: Duration,
pending_delete_ranges: PendingDeleteRanges,
router: R,
}

impl SnapContext {
impl<R: CasualRouter> SnapContext<R> {
/// Generates the snapshot of the Region.
fn generate_snap(
&self,
@@ -238,6 +240,10 @@ impl SnapContext {
"err" => %e,
);
}
// The error can be ignored, as the snapshot will be sent in the next heartbeat eventually.
let _ = self
.router
.send(region_id, CasualMessage::SnapshotGenerated);
Ok(())
}

@@ -519,23 +525,24 @@ impl SnapContext {
}
}

pub struct Runner {
pub struct Runner<R> {
pool: ThreadPool<DefaultContext>,
ctx: SnapContext,
ctx: SnapContext<R>,

// We may delay some apply tasks if the number of level-0 files reaches the write stall threshold;
// pending_applies records all delayed apply tasks, which will be checked again later.
pending_applies: VecDeque<Task>,
}

impl Runner {
impl<R: CasualRouter> Runner<R> {
pub fn new(
engines: Engines,
mgr: SnapManager,
batch_size: usize,
use_delete_range: bool,
clean_stale_peer_delay: Duration,
) -> Runner {
router: R,
) -> Runner<R> {
Runner {
pool: ThreadPoolBuilder::with_default_factory(thd_name!("snap-generator"))
.thread_count(GENERATE_POOL_SIZE)
@@ -547,12 +554,13 @@ impl Runner {
use_delete_range,
clean_stale_peer_delay,
pending_delete_ranges: PendingDeleteRanges::default(),
router,
},
pending_applies: VecDeque::new(),
}
}

pub fn new_timer() -> Timer<Event> {
pub fn new_timer(&self) -> Timer<Event> {
let mut timer = Timer::new(2);
timer.add_task(
Duration::from_millis(PENDING_APPLY_CHECK_INTERVAL),
@@ -581,7 +589,10 @@ impl Runner {
}
}

impl Runnable<Task> for Runner {
impl<R> Runnable<Task> for Runner<R>
where
R: CasualRouter + Send + Clone + 'static,
{
fn run(&mut self, task: Task) {
match task {
Task::Gen {
@@ -639,7 +650,10 @@ pub enum Event {
CheckApply,
}

impl RunnableWithTimer<Task, Event> for Runner {
impl<R> RunnableWithTimer<Task, Event> for Runner<R>
where
R: CasualRouter + Send + Clone + 'static,
{
fn on_timeout(&mut self, timer: &mut Timer<Event>, event: Event) {
match event {
Event::CheckApply => {
@@ -671,7 +685,7 @@ mod tests {
use crate::raftstore::store::peer_storage::JOB_STATUS_PENDING;
use crate::raftstore::store::snap::tests::get_test_db_for_regions;
use crate::raftstore::store::worker::RegionRunner;
use crate::raftstore::store::{keys, SnapKey, SnapManager};
use crate::raftstore::store::{keys, CasualMessage, SnapKey, SnapManager};
use engine::rocks;
use engine::rocks::{ColumnFamilyOptions, Snapshot, Writable, WriteBatch};
use engine::Engines;
@@ -808,7 +822,15 @@ mod tests {
let mgr = SnapManager::new(snap_dir.path().to_str().unwrap(), None);
let mut worker = Worker::new("snap-manager");
let sched = worker.scheduler();
let runner = RegionRunner::new(engines.clone(), mgr, 0, true, Duration::from_secs(0));
let (router, receiver) = mpsc::sync_channel(1);
let runner = RegionRunner::new(
engines.clone(),
mgr,
0,
true,
Duration::from_secs(0),
router,
);
let mut timer = Timer::new(1);
timer.add_task(Duration::from_millis(100), Event::CheckApply);
worker.start_with_timer(runner, timer).unwrap();
@@ -825,6 +847,12 @@
})
.unwrap();
let s1 = rx.recv().unwrap();
match receiver.recv() {
Ok((region_id, CasualMessage::SnapshotGenerated)) => {
assert_eq!(region_id, id);
}
msg => panic!("expected SnapshotGenerated, but got {:?}", msg),
}
let data = s1.get_data();
let key = SnapKey::from_snap(&s1).unwrap();
let mgr = SnapManager::new(snap_dir.path().to_str().unwrap(), None);
21 changes: 20 additions & 1 deletion tests/integrations/raftstore/test_conf_change.rs
@@ -3,7 +3,7 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::*;

use futures::Future;

@@ -897,3 +897,22 @@ where
let admin_req = new_admin_request(region_id, &epoch, conf_change);
cluster.call_command_on_leader(admin_req, Duration::from_secs(3))
}

/// Tests that conf change does not have to wait for a heartbeat to make progress.
#[test]
fn test_conf_change_fast() {
let mut cluster = new_server_cluster(0, 3);
// Set the heartbeat timeout to more than 5 seconds. This also changes the election timeout,
// but that's OK: the cluster starts with only one peer, so it will campaign immediately.
configure_for_lease_read(&mut cluster, Some(5000), None);
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();
let r1 = cluster.run_conf_change();
cluster.must_put(b"k1", b"v1");
let timer = Instant::now();
// If the conf change relied on a heartbeat, it would take more than 5 seconds to finish,
// hence the elapsed-time assertion below would fail.
pd_client.must_add_peer(r1, new_peer(2, 2));
must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
assert!(timer.elapsed() < Duration::from_secs(5));
}
