raftstore: Remove future_poller pool and batch Ticks (#8457) (#8634)
* cherry pick #8457 to release-4.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
ti-srebot committed Sep 11, 2020
1 parent fff0582 commit da63556
Showing 7 changed files with 140 additions and 103 deletions.
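To make the change easier to follow before diving into the diff: the commit stops spawning one delayed future per peer per tick on a dedicated `future-poller` thread pool. Instead, each peer's `schedule_tick` only queues a boxed callback into a per-tick-kind `PeerTickBatch`, and the poller flushes all batches once per poll round, arming a single delay per tick kind. Below is a minimal, self-contained sketch of that pattern; all names are simplified stand-ins (plain indices instead of `PeerTicks` bitflags, a spawned thread instead of the poller's `SteadyTimer`), not TiKV's real API.

use std::mem;
use std::thread;
use std::time::Duration;

// Simplified stand-ins for PeerTicks / PeerTickBatch. The real code indexes
// by bitflags values and uses the poller's SteadyTimer, not a spawned thread.
const RAFT: usize = 0;
const RAFT_LOG_GC: usize = 1;
const TICK_KINDS: usize = 2;

#[derive(Default)]
struct PeerTickBatch {
    ticks: Vec<Box<dyn FnOnce() + Send>>,
    wait_duration: Duration,
}

struct PollContext {
    tick_batch: Vec<PeerTickBatch>,
}

impl PollContext {
    // schedule_tick now only queues a callback; nothing is spawned per peer.
    fn schedule_tick(&mut self, idx: usize, cb: Box<dyn FnOnce() + Send>) {
        self.tick_batch[idx].ticks.push(cb);
    }

    // At the end of a poll round, flush_ticks drains each non-empty batch and
    // arms ONE delay per tick kind instead of one delayed future per peer.
    fn flush_ticks(&mut self) {
        for idx in 0..TICK_KINDS {
            if self.tick_batch[idx].ticks.is_empty() {
                continue;
            }
            let ticks = mem::replace(&mut self.tick_batch[idx].ticks, vec![]);
            let wait = self.tick_batch[idx].wait_duration;
            thread::spawn(move || {
                thread::sleep(wait); // stand-in for timer.delay(wait)
                for tick in ticks {
                    tick(); // typically mailbox.force_send(PeerMsg::Tick(..))
                }
            });
        }
    }
}

fn main() {
    let mut ctx = PollContext {
        tick_batch: (0..TICK_KINDS).map(|_| PeerTickBatch::default()).collect(),
    };
    ctx.tick_batch[RAFT].wait_duration = Duration::from_millis(100);
    ctx.tick_batch[RAFT_LOG_GC].wait_duration = Duration::from_millis(200);

    // Three peers registering the same tick kind share a single timer.
    for peer_id in 0..3u64 {
        ctx.schedule_tick(RAFT, Box::new(move || println!("raft tick, peer {}", peer_id)));
    }
    ctx.flush_ticks();
    thread::sleep(Duration::from_millis(300)); // let the batched ticks fire
}

Running it prints three "raft tick" lines after a single 100 ms wait: the peers share one timer instead of owning one each, which is what lets the pool be removed.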
83 changes: 27 additions & 56 deletions components/raftstore/src/store/fsm/peer.rs
@@ -3,7 +3,7 @@
 use std::borrow::Cow;
 use std::collections::Bound::{Excluded, Included, Unbounded};
 use std::collections::VecDeque;
-use std::time::{Duration, Instant};
+use std::time::Instant;
 use std::{cmp, u64};
 
 use batch_system::{BasicMailbox, Fsm};
@@ -12,7 +12,6 @@ use engine_rocks::{Compat, RocksEngine, RocksSnapshot, WRITE_BATCH_MAX_KEYS};
 use engine_traits::CF_RAFT;
 use engine_traits::{KvEngine, Peekable};
 use error_code::ErrorCodeExt;
-use futures::Future;
 use kvproto::errorpb;
 use kvproto::import_sstpb::SstMeta;
 use kvproto::metapb::{self, Region, RegionEpoch};
@@ -849,17 +848,18 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     }
 
     #[inline]
-    fn schedule_tick(&mut self, tick: PeerTicks, timeout: Duration) {
+    fn schedule_tick(&mut self, tick: PeerTicks) {
         if self.fsm.tick_registry.contains(tick) {
             return;
         }
-        if is_zero_duration(&timeout) {
+        let idx = tick.bits() as usize;
+        if is_zero_duration(&self.ctx.tick_batch[idx].wait_duration) {
             return;
         }
         trace!(
             "schedule tick";
             "tick" => ?tick,
-            "timeout" => ?timeout,
+            "timeout" => ?self.ctx.tick_batch[idx].wait_duration,
             "region_id" => self.region_id(),
             "peer_id" => self.fsm.peer_id(),
         );
@@ -880,42 +880,27 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
             }
         };
         let peer_id = self.fsm.peer.peer_id();
-        let f = self
-            .ctx
-            .timer
-            .delay(timeout)
-            .map(move |_| {
-                fail_point!(
-                    "on_raft_log_gc_tick_1",
-                    peer_id == 1 && tick == PeerTicks::RAFT_LOG_GC,
-                    |_| unreachable!()
-                );
-                // This can happen only when the peer is about to be destroyed
-                // or the node is shutting down. So it's OK to not to clean up
-                // registry.
-                if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {
-                    info!(
-                        "failed to schedule peer tick";
-                        "region_id" => region_id,
-                        "peer_id" => peer_id,
-                        "tick" => ?tick,
-                        "err" => %e,
-                    );
-                }
-            })
-            .map_err(move |e| {
-                panic!(
-                    "[region {}] {} tick {:?} is lost due to timeout error: {:?}",
-                    region_id, peer_id, tick, e
-                );
-            });
-        self.ctx.future_poller.spawn(f).unwrap();
+        let cb = Box::new(move || {
+            // This can happen only when the peer is about to be destroyed
+            // or the node is shutting down. So it's OK to not to clean up
+            // registry.
+            if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {
+                debug!(
+                    "failed to schedule peer tick";
+                    "region_id" => region_id,
+                    "peer_id" => peer_id,
+                    "tick" => ?tick,
+                    "err" => %e,
+                );
+            }
+        });
+        self.ctx.tick_batch[idx].ticks.push(cb);
     }
 
     fn register_raft_base_tick(&mut self) {
         // If we register raft base tick failed, the whole raft can't run correctly,
         // TODO: shutdown the store?
-        self.schedule_tick(PeerTicks::RAFT, self.ctx.cfg.raft_base_tick_interval.0)
+        self.schedule_tick(PeerTicks::RAFT)
     }
 
     fn on_raft_base_tick(&mut self) {
@@ -1870,10 +1855,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     }
 
     fn register_merge_check_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::CHECK_MERGE,
-            self.ctx.cfg.merge_check_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::CHECK_MERGE)
     }
 
     /// Check if merge target region is staler than the local one in kv engine.
@@ -2699,19 +2681,17 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     }
 
     fn register_raft_gc_log_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::RAFT_LOG_GC,
-            self.ctx.cfg.raft_log_gc_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::RAFT_LOG_GC)
     }
 
     #[allow(clippy::if_same_then_else)]
     fn on_raft_gc_log_tick(&mut self) {
         if !self.fsm.peer.get_store().is_cache_empty() || !self.ctx.cfg.hibernate_regions {
             self.register_raft_gc_log_tick();
         }
-        debug_assert!(!self.fsm.stopped);
+        fail_point!("on_raft_log_gc_tick_1", self.fsm.peer_id() == 1, |_| {});
         fail_point!("on_raft_gc_log_tick", |_| {});
+        debug_assert!(!self.fsm.stopped);
 
         // As leader, we would not keep caches for the peers that didn't response heartbeat in the
         // last few seconds. That happens probably because another TiKV is down. In this case if we
@@ -2811,10 +2791,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     }
 
     fn register_split_region_check_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::SPLIT_REGION_CHECK,
-            self.ctx.cfg.split_region_check_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::SPLIT_REGION_CHECK)
    }
 
     fn on_split_region_check_tick(&mut self) {
@@ -3028,10 +3005,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     }
 
     fn register_pd_heartbeat_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::PD_HEARTBEAT,
-            self.ctx.cfg.pd_heartbeat_tick_interval.0,
-        )
+        self.schedule_tick(PeerTicks::PD_HEARTBEAT)
     }
 
     fn on_check_peer_stale_state_tick(&mut self) {
@@ -3129,10 +3103,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
     }
 
     fn register_check_peer_stale_state_tick(&mut self) {
-        self.schedule_tick(
-            PeerTicks::CHECK_PEER_STALE_STATE,
-            self.ctx.cfg.peer_stale_state_check_interval.0,
-        )
+        self.schedule_tick(PeerTicks::CHECK_PEER_STALE_STATE)
     }
 }

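The net effect on every `register_*_tick` helper above: the `Duration` argument is gone, and `schedule_tick` reads the interval from the poll context, still treating a zero interval as "tick disabled". A hedged mock of that new call shape (simplified types, not the real TiKV signatures):

use std::time::Duration;

struct TickBatch {
    wait_duration: Duration,
}

struct Ctx {
    tick_batch: Vec<TickBatch>,
}

// Mirrors the new contract: the caller names only the tick kind; the interval
// is read from ctx.tick_batch, and a zero duration disables the tick.
fn schedule_tick(ctx: &Ctx, idx: usize) {
    let timeout = ctx.tick_batch[idx].wait_duration;
    if timeout == Duration::from_secs(0) {
        return; // same contract as the old is_zero_duration(&timeout) check
    }
    println!("tick {} queued, fires in {:?}", idx, timeout);
}

fn main() {
    let ctx = Ctx {
        tick_batch: vec![
            TickBatch { wait_duration: Duration::from_millis(200) }, // e.g. RAFT
            TickBatch { wait_duration: Duration::from_secs(0) },     // disabled
        ],
    };
    schedule_tick(&ctx, 0); // queued
    schedule_tick(&ctx, 1); // silently skipped: interval is zero
}

When the configuration changes, only the shared `wait_duration` entries need refreshing (which is what `update_ticks_timeout` in store.rs below does); the call sites stay unchanged.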
77 changes: 63 additions & 14 deletions components/raftstore/src/store/fsm/store.rs
@@ -30,7 +30,6 @@ use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 use std::{mem, thread, u64};
 use time::{self, Timespec};
-use tokio_threadpool::{Sender as ThreadPoolSender, ThreadPool};
 
 use crate::coprocessor::split_observer::SplitObserver;
 use crate::coprocessor::{BoxAdminObserver, CoprocessorHost, RegionChangeEvent};
@@ -58,6 +57,7 @@ use crate::store::worker::{
     RaftlogGcRunner, RaftlogGcTask, ReadDelegate, RegionRunner, RegionTask, SplitCheckTask,
 };
 use crate::store::PdTask;
+use crate::store::PeerTicks;
 use crate::store::{
     util, Callback, CasualMessage, PeerMsg, RaftCommand, SignificantMsg, SnapManager, StoreMsg,
     StoreTick,
@@ -71,6 +71,7 @@ use pd_client::PdClient;
 use sst_importer::SSTImporter;
 use tikv_util::collections::{HashMap, HashSet};
 use tikv_util::config::{Tracker, VersionTrack};
+use tikv_util::future::poll_future_notify;
 use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
 use tikv_util::time::{duration_to_sec, Instant as TiInstant};
 use tikv_util::timer::SteadyTimer;
@@ -205,6 +206,21 @@ impl<E: KvEngine> RaftRouter<E> {
     }
 }
 
+#[derive(Default)]
+pub struct PeerTickBatch {
+    pub ticks: Vec<Box<dyn FnOnce() + Send>>,
+    pub wait_duration: Duration,
+}
+
+impl Clone for PeerTickBatch {
+    fn clone(&self) -> PeerTickBatch {
+        PeerTickBatch {
+            ticks: vec![],
+            wait_duration: self.wait_duration,
+        }
+    }
+}
+
 pub struct PollContext<T, C: 'static> {
     pub cfg: Config,
     pub store: metapb::Store,
@@ -219,7 +235,6 @@ pub struct PollContext<T, C: 'static> {
     pub router: RaftRouter<RocksEngine>,
     pub importer: Arc<SSTImporter>,
     pub store_meta: Arc<Mutex<StoreMeta>>,
-    pub future_poller: ThreadPoolSender,
     pub raft_metrics: RaftMetrics,
     pub snap_mgr: SnapManager,
     pub applying_snap_count: Arc<AtomicUsize>,
@@ -240,6 +255,7 @@ pub struct PollContext<T, C: 'static> {
     pub queued_snapshot: HashSet<u64>,
     pub current_time: Option<Timespec>,
     pub perf_context_statistics: PerfContextStatistics,
+    pub tick_batch: Vec<PeerTickBatch>,
     pub node_start_time: Option<TiInstant>,
 }

@@ -288,14 +304,29 @@ impl<T, C> PollContext<T, C> {
         }
         timeout
     }
+
+    pub fn update_ticks_timeout(&mut self) {
+        self.tick_batch[PeerTicks::RAFT.bits() as usize].wait_duration =
+            self.cfg.raft_base_tick_interval.0;
+        self.tick_batch[PeerTicks::RAFT_LOG_GC.bits() as usize].wait_duration =
+            self.cfg.raft_log_gc_tick_interval.0;
+        self.tick_batch[PeerTicks::PD_HEARTBEAT.bits() as usize].wait_duration =
+            self.cfg.pd_heartbeat_tick_interval.0;
+        self.tick_batch[PeerTicks::SPLIT_REGION_CHECK.bits() as usize].wait_duration =
+            self.cfg.split_region_check_tick_interval.0;
+        self.tick_batch[PeerTicks::CHECK_PEER_STALE_STATE.bits() as usize].wait_duration =
+            self.cfg.peer_stale_state_check_interval.0;
+        self.tick_batch[PeerTicks::CHECK_MERGE.bits() as usize].wait_duration =
+            self.cfg.merge_check_tick_interval.0;
+    }
 }
 
 impl<T: Transport, C> PollContext<T, C> {
     #[inline]
     fn schedule_store_tick(&self, tick: StoreTick, timeout: Duration) {
         if !is_zero_duration(&timeout) {
             let mb = self.router.control_mailbox();
-            let f = self
+            let delay = self
                 .timer
                 .delay(timeout)
                 .map(move |_| {
@@ -310,7 +341,7 @@ impl<T: Transport, C> PollContext<T, C> {
                 .map_err(move |e| {
                     panic!("tick {:?} is lost due to timeout error: {:?}", tick, e);
                 });
-            self.future_poller.spawn(f).unwrap();
+            poll_future_notify(delay);
         }
     }
 
@@ -626,6 +657,29 @@ impl<T: Transport, C: PdClient> RaftPoller<T, C> {
             self.poll_ctx.raft_metrics.ready.snapshot - self.previous_metrics.ready.snapshot
         );
     }
+
+    fn flush_ticks(&mut self) {
+        for t in PeerTicks::get_all_ticks() {
+            let idx = t.bits() as usize;
+            if self.poll_ctx.tick_batch[idx].ticks.is_empty() {
+                continue;
+            }
+            let peer_ticks = std::mem::replace(&mut self.poll_ctx.tick_batch[idx].ticks, vec![]);
+            let f = self
+                .poll_ctx
+                .timer
+                .delay(self.poll_ctx.tick_batch[idx].wait_duration)
+                .map(move |_| {
+                    for tick in peer_ticks {
+                        tick();
+                    }
+                })
+                .map_err(move |e| {
+                    panic!("tick is lost due to timeout error: {:?}", e);
+                });
+            poll_future_notify(f);
+        }
+    }
 }
 
 impl<T: Transport, C: PdClient> PollHandler<PeerFsm<RocksEngine>, StoreFsm> for RaftPoller<T, C> {
@@ -658,6 +712,7 @@ impl<T: Transport, C: PdClient> PollHandler<PeerFsm<RocksEngine>, StoreFsm> for
             _ => {}
         }
         self.poll_ctx.cfg = incoming.clone();
+        self.poll_ctx.update_ticks_timeout();
     }
 }
 
@@ -729,6 +784,7 @@ impl<T: Transport, C: PdClient> PollHandler<PeerFsm<RocksEngine>, StoreFsm> for
     }
 
     fn end(&mut self, peers: &mut [Box<PeerFsm<RocksEngine>>]) {
+        self.flush_ticks();
         if self.poll_ctx.has_ready {
             self.handle_raft_ready(peers);
         }
@@ -768,7 +824,6 @@ pub struct RaftPollerBuilder<T, C> {
     pub router: RaftRouter<RocksEngine>,
     pub importer: Arc<SSTImporter>,
     store_meta: Arc<Mutex<StoreMeta>>,
-    future_poller: ThreadPoolSender,
     snap_mgr: SnapManager,
     pub coprocessor_host: CoprocessorHost,
     trans: T,
@@ -953,7 +1008,7 @@ where
     type Handler = RaftPoller<T, C>;
 
     fn build(&mut self) -> RaftPoller<T, C> {
-        let ctx = PollContext {
+        let mut ctx = PollContext {
             cfg: self.cfg.value().clone(),
             store: self.store.clone(),
             pd_scheduler: self.pd_scheduler.clone(),
@@ -966,7 +1021,6 @@ where
             raftlog_gc_scheduler: self.raftlog_gc_scheduler.clone(),
             importer: self.importer.clone(),
             store_meta: self.store_meta.clone(),
-            future_poller: self.future_poller.clone(),
             raft_metrics: RaftMetrics::default(),
             snap_mgr: self.snap_mgr.clone(),
             applying_snap_count: self.applying_snap_count.clone(),
@@ -987,8 +1041,10 @@ where
             queued_snapshot: HashSet::default(),
             current_time: None,
             perf_context_statistics: PerfContextStatistics::new(self.cfg.value().perf_level),
+            tick_batch: vec![PeerTickBatch::default(); 256],
             node_start_time: Some(TiInstant::now_coarse()),
         };
+        ctx.update_ticks_timeout();
         let tag = format!("[store {}]", ctx.store.get_id());
         RaftPoller {
             tag: tag.clone(),
@@ -1013,7 +1069,6 @@ struct Workers {
     raftlog_gc_worker: Worker<RaftlogGcTask<RocksEngine>>,
     region_worker: Worker<RegionTask>,
     coprocessor_host: CoprocessorHost,
-    future_poller: ThreadPool,
 }
 
 pub struct RaftBatchSystem {
@@ -1065,10 +1120,6 @@ impl RaftBatchSystem {
             cleanup_worker: Worker::new("cleanup-worker"),
             raftlog_gc_worker: Worker::new("raft-gc-worker"),
             coprocessor_host,
-            future_poller: tokio_threadpool::Builder::new()
-                .name_prefix("future-poller")
-                .pool_size(cfg.value().future_poll_size)
-                .build(),
         };
         let mut builder = RaftPollerBuilder {
             cfg,
@@ -1090,7 +1141,6 @@ impl RaftBatchSystem {
             global_stat: GlobalStoreStat::default(),
             store_meta,
             applying_snap_count: Arc::new(AtomicUsize::new(0)),
-            future_poller: workers.future_poller.sender().clone(),
         };
         let region_peers = builder.init()?;
         let engine = RocksEngine::from_db(builder.engines.kv.clone());
@@ -1249,7 +1299,6 @@ impl RaftBatchSystem {
             }
         }
         workers.coprocessor_host.shutdown();
-        workers.future_poller.shutdown_now().wait().unwrap();
     }
 }
 
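One detail worth calling out from the hunks above: `tick_batch` is allocated with 256 slots because `schedule_tick` and `update_ticks_timeout` index it by a `PeerTicks` flag's raw `bits()` value (a sparse power of two), not by a compact `0..N` index. The flag values below are assumptions mirroring a typical bitflags declaration, not taken from this diff:

// Assumed PeerTicks bit values (one bit per tick kind, as in a bitflags set).
const RAFT: u8 = 1;
const RAFT_LOG_GC: u8 = 1 << 1;
const SPLIT_REGION_CHECK: u8 = 1 << 2;
const PD_HEARTBEAT: u8 = 1 << 3;
const CHECK_MERGE: u8 = 1 << 4;
const CHECK_PEER_STALE_STATE: u8 = 1 << 5;

fn main() {
    let ticks = [
        ("RAFT", RAFT),
        ("RAFT_LOG_GC", RAFT_LOG_GC),
        ("SPLIT_REGION_CHECK", SPLIT_REGION_CHECK),
        ("PD_HEARTBEAT", PD_HEARTBEAT),
        ("CHECK_MERGE", CHECK_MERGE),
        ("CHECK_PEER_STALE_STATE", CHECK_PEER_STALE_STATE),
    ];
    // Indices are sparse powers of two, so the Vec must span the full range
    // of possible bits() values; 256 slots cover any single u8 flag.
    for &(name, bits) in ticks.iter() {
        println!("{:<22} -> tick_batch[{}]", name, bits);
    }
}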
(Diff for the remaining 5 changed files is not shown here.)
