raftstore: add tick registry #4632

Merged
merged 5 commits on May 6, 2019
Changes from 3 commits

@@ -52,7 +52,7 @@ use crate::raftstore::store::worker::{
    CleanupSSTTask, ConsistencyCheckTask, RaftlogGcTask, ReadTask, RegionTask, SplitCheckTask,
};
use crate::raftstore::store::{
-    util, CasualMessage, Config, PeerMsg, PeerTick, RaftCommand, SignificantMsg, SnapKey,
+    util, CasualMessage, Config, PeerMsg, PeerTicks, RaftCommand, SignificantMsg, SnapKey,
    SnapshotDeleter, StoreMsg,
};

@@ -65,6 +65,8 @@ pub struct DestroyPeerJob {

pub struct PeerFsm {
    peer: Peer,
+    /// A registry for all scheduled ticks. This can avoid scheduling ticks twice accidentally.
+    tick_registry: PeerTicks,
    stopped: bool,
    has_ready: bool,
    mailbox: Option<BasicMailbox<PeerFsm>>,
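The new field is the core of the change: schedule_tick arms a delayed tick message only when the corresponding flag is not already set, and on_tick clears the flag before dispatching so the handler can re-register itself. Below is a minimal, self-contained sketch of that lifecycle; the u8 bitmask and the toy `scheduled` queue are illustrative stand-ins for the bitflags-backed PeerTicks and the router/future_poller machinery, not TiKV code.

// Simplified model of the tick registry; names and the toy scheduler are
// illustrative only.
const RAFT: u8 = 0b0000_0001;
const RAFT_LOG_GC: u8 = 0b0000_0010;

struct PeerModel {
    tick_registry: u8,  // stands in for PeerTicks
    scheduled: Vec<u8>, // stands in for delayed PeerMsg::Tick messages
}

impl PeerModel {
    fn schedule_tick(&mut self, tick: u8) {
        // Already pending: scheduling again would deliver the same tick twice.
        if self.tick_registry & tick != 0 {
            return;
        }
        self.tick_registry |= tick;
        self.scheduled.push(tick);
    }

    fn on_tick(&mut self, tick: u8) {
        // Clear the flag first so the handler below may re-register the tick.
        self.tick_registry &= !tick;
        match tick {
            RAFT => { /* raft base tick; typically re-registers itself */ }
            RAFT_LOG_GC => { /* raft log gc tick */ }
            _ => unreachable!(),
        }
    }
}

fn main() {
    let mut peer = PeerModel { tick_registry: 0, scheduled: Vec::new() };
    peer.schedule_tick(RAFT);
    peer.schedule_tick(RAFT); // deduplicated: only one delayed message is armed
    assert_eq!(peer.scheduled.len(), 1);
    peer.on_tick(RAFT);
    peer.schedule_tick(RAFT); // allowed again after the tick fired
    assert_eq!(peer.scheduled.len(), 2);
}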
@@ -123,6 +125,7 @@ impl PeerFsm {
            tx,
            Box::new(PeerFsm {
                peer: Peer::new(store_id, cfg, sched, engines, region, meta_peer)?,
+                tick_registry: PeerTicks::empty(),
                stopped: false,
                has_ready: false,
                mailbox: None,
@@ -157,6 +160,7 @@ impl PeerFsm {
            tx,
            Box::new(PeerFsm {
                peer: Peer::new(store_id, cfg, sched, engines, &region, peer)?,
+                tick_registry: PeerTicks::empty(),
                stopped: false,
                has_ready: false,
                mailbox: None,
@@ -318,17 +322,25 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
        }
    }

-    fn on_tick(&mut self, tick: PeerTick) {
+    fn on_tick(&mut self, tick: PeerTicks) {
        if self.fsm.stopped {
            return;
        }
+        trace!(
+            "tick";
+            "tick" => ?tick,
+            "peer_id" => self.fsm.peer_id(),
+            "region_id" => self.region_id(),
+        );
+        self.fsm.tick_registry.remove(tick);
        match tick {
-            PeerTick::Raft => self.on_raft_base_tick(),
-            PeerTick::RaftLogGc => self.on_raft_gc_log_tick(),
-            PeerTick::PdHeartbeat => self.on_pd_heartbeat_tick(),
-            PeerTick::SplitRegionCheck => self.on_split_region_check_tick(),
-            PeerTick::CheckMerge => self.on_check_merge(),
-            PeerTick::CheckPeerStaleState => self.on_check_peer_stale_state_tick(),
+            PeerTicks::RAFT => self.on_raft_base_tick(),
+            PeerTicks::RAFT_LOG_GC => self.on_raft_gc_log_tick(),
+            PeerTicks::PD_HEARTBEAT => self.on_pd_heartbeat_tick(),
+            PeerTicks::SPLIT_REGION_CHECK => self.on_split_region_check_tick(),
+            PeerTicks::CHECK_MERGE => self.on_check_merge(),
+            PeerTicks::CHECK_PEER_STALE_STATE => self.on_check_peer_stale_state_tick(),
+            _ => unreachable!(),
        }
    }

@@ -582,15 +594,27 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
    }

    #[inline]
-    fn schedule_tick(&self, tick: PeerTick, timeout: Duration) {
+    fn schedule_tick(&mut self, tick: PeerTicks, timeout: Duration) {
+        if self.fsm.tick_registry.contains(tick) {
+            return;
+        }
        if is_zero_duration(&timeout) {
            return;
        }
+        trace!(
+            "schedule tick";
+            "tick" => ?tick,
+            "timeout" => ?timeout,
+            "region_id" => self.region_id(),
+            "peer_id" => self.fsm.peer_id(),
+        );
+        self.fsm.tick_registry.insert(tick);
This conversation was marked as resolved by Connor1996

Connor1996 (Member) commented on May 6, 2019:

    what if it failed to get mailbox or schedule peer tick below, then the flag would be never removed

BusyJay (Author, Contributor) replied on May 6, 2019:

    The error can only happen when the node is shutting down. But it makes sense to keep registry clean.


        let region_id = self.region_id();
        let mb = match self.ctx.router.mailbox(region_id) {
            Some(mb) => mb,
            None => {
+                self.fsm.tick_registry.remove(tick);
                error!(
                    "failed to get mailbox";
                    "region_id" => self.fsm.region_id(),
@@ -608,7 +632,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
            .map(move |_| {
                fail_point!(
                    "on_raft_log_gc_tick_1",
-                    peer_id == 1 && tick == PeerTick::RaftLogGc,
+                    peer_id == 1 && tick == PeerTicks::RAFT_LOG_GC,
                    |_| unreachable!()
                );
                if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {

Connor1996 (Member) commented on May 6, 2019:

    maybe add a comment to indicate that it's fine to not clean registry here cause the error happens when the node is shutting down
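A sketch of how that suggestion might read in the force_send error branch shown above; the comment wording and the log call are hypothetical, not the PR's final code:

if let Err(e) = mb.force_send(PeerMsg::Tick(tick)) {
    // Hypothetical comment per the review: force_send only fails while the
    // node is shutting down, so the flag left behind in tick_registry is
    // harmless because this peer will never be ticked again.
    debug!(
        "failed to schedule peer tick";
        "region_id" => region_id,
        "tick" => ?tick,
        "err" => %e,
    );
}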

@@ -630,10 +654,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
        self.ctx.future_poller.spawn(f).unwrap();
    }

-    fn register_raft_base_tick(&self) {
+    fn register_raft_base_tick(&mut self) {
        // If we register raft base tick failed, the whole raft can't run correctly,
        // TODO: shutdown the store?
-        self.schedule_tick(PeerTick::Raft, self.ctx.cfg.raft_base_tick_interval.0)
+        self.schedule_tick(PeerTicks::RAFT, self.ctx.cfg.raft_base_tick_interval.0)
    }

    fn on_raft_base_tick(&mut self) {
@@ -1394,7 +1418,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
            panic!("{} original region should exists", self.fsm.peer.tag);
        }
        // It's not correct anymore, so set it to None to let split checker update it.
-        self.fsm.peer.approximate_size.take();
+        self.fsm.peer.approximate_size = None;
        let last_region_id = regions.last().unwrap().get_id();
        for new_region in regions {
            let new_region_id = new_region.get_id();
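A side note on this hunk: both forms clear the field, but Option::take() also returns the previous contents, which only matters when the old value is still needed. A small std-only illustration:

fn clear_by_assignment(slot: &mut Option<u64>) {
    // Equivalent effect when the old value is not needed.
    *slot = None;
}

fn main() {
    // Option::take() swaps in None and hands back the previous contents.
    let mut approximate_size: Option<u64> = Some(128);
    let old = approximate_size.take();
    assert_eq!(old, Some(128));
    assert!(approximate_size.is_none());

    // A plain assignment just drops whatever was there.
    approximate_size = Some(256);
    clear_by_assignment(&mut approximate_size);
    assert!(approximate_size.is_none());
}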
@@ -1492,9 +1516,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
        }
    }

-    fn register_merge_check_tick(&self) {
+    fn register_merge_check_tick(&mut self) {
        self.schedule_tick(
-            PeerTick::CheckMerge,
+            PeerTicks::CHECK_MERGE,
            self.ctx.cfg.merge_check_tick_interval.0,
        )
    }
@@ -2181,9 +2205,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
            .map(|(_, region_id)| meta.regions[region_id].to_owned())
    }

-    fn register_raft_gc_log_tick(&self) {
+    fn register_raft_gc_log_tick(&mut self) {
        self.schedule_tick(
-            PeerTick::RaftLogGc,
+            PeerTicks::RAFT_LOG_GC,
            self.ctx.cfg.raft_log_gc_tick_interval.0,
        )
    }
@@ -2295,9 +2319,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
        PEER_GC_RAFT_LOG_COUNTER.inc_by(total_gc_logs as i64);
    }

-    fn register_split_region_check_tick(&self) {
+    fn register_split_region_check_tick(&mut self) {
        self.schedule_tick(
-            PeerTick::SplitRegionCheck,
+            PeerTicks::SPLIT_REGION_CHECK,
            self.ctx.cfg.split_region_check_tick_interval.0,
        )
    }
@@ -2498,9 +2522,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
        self.fsm.peer.heartbeat_pd(self.ctx);
    }

-    fn register_pd_heartbeat_tick(&self) {
+    fn register_pd_heartbeat_tick(&mut self) {
        self.schedule_tick(
-            PeerTick::PdHeartbeat,
+            PeerTicks::PD_HEARTBEAT,
            self.ctx.cfg.pd_heartbeat_tick_interval.0,
        )
    }
@@ -2575,9 +2599,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
        }
    }

-    fn register_check_peer_stale_state_tick(&self) {
+    fn register_check_peer_stale_state_tick(&mut self) {
        self.schedule_tick(
-            PeerTick::CheckPeerStaleState,
+            PeerTicks::CHECK_PEER_STALE_STATE,
            self.ctx.cfg.peer_stale_state_check_interval.0,
        )
    }
@@ -24,7 +24,7 @@ pub use self::bootstrap::{
pub use self::config::Config;
pub use self::fsm::{new_compaction_listener, DestroyPeerJob, RaftRouter, StoreInfo};
pub use self::msg::{
-    Callback, CasualMessage, PeerMsg, PeerTick, RaftCommand, ReadCallback, ReadResponse,
+    Callback, CasualMessage, PeerMsg, PeerTicks, RaftCommand, ReadCallback, ReadResponse,
    SignificantMsg, StoreMsg, StoreTick, WriteCallback, WriteResponse,
};
pub use self::peer::{
pub use self::peer::{
@@ -84,26 +84,28 @@ impl fmt::Debug for Callback {
    }
}

-#[derive(Debug, Clone, Copy, PartialEq)]
-pub enum PeerTick {
-    Raft,
-    RaftLogGc,
-    SplitRegionCheck,
-    PdHeartbeat,
-    CheckMerge,
-    CheckPeerStaleState,
+bitflags! {
+    pub struct PeerTicks: u8 {
+        const RAFT = 0b00000001;
+        const RAFT_LOG_GC = 0b00000010;
+        const SPLIT_REGION_CHECK = 0b00000100;
+        const PD_HEARTBEAT = 0b00001000;
+        const CHECK_MERGE = 0b00010000;
+        const CHECK_PEER_STALE_STATE = 0b00100000;
+    }
}

-impl PeerTick {
+impl PeerTicks {
    #[inline]
    pub fn tag(self) -> &'static str {
        match self {
-            PeerTick::Raft => "raft",
-            PeerTick::RaftLogGc => "raft_log_gc",
-            PeerTick::SplitRegionCheck => "split_region_check",
-            PeerTick::PdHeartbeat => "pd_heartbeat",
-            PeerTick::CheckMerge => "check_merge",
-            PeerTick::CheckPeerStaleState => "check_peer_stale_state",
+            PeerTicks::RAFT => "raft",
+            PeerTicks::RAFT_LOG_GC => "raft_log_gc",
+            PeerTicks::SPLIT_REGION_CHECK => "split_region_check",
+            PeerTicks::PD_HEARTBEAT => "pd_heartbeat",
+            PeerTicks::CHECK_MERGE => "check_merge",
+            PeerTicks::CHECK_PEER_STALE_STATE => "check_peer_stale_state",
+            _ => unreachable!(),
        }
    }
}
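For context, the set operations that PeerFsm's tick_registry relies on come straight from the bitflags-generated type. A usage sketch, assuming the bitflags 1.x crate as a dependency (the type and flag names here are trimmed for brevity); it also shows why tag() needs a catch-all arm: a value may be a union of several flags, so a match over the named constants alone is not exhaustive.

// Assumes `bitflags = "1"` in Cargo.toml; a usage sketch, not TiKV code.
use bitflags::bitflags;

bitflags! {
    pub struct Ticks: u8 {
        const RAFT = 0b0000_0001;
        const RAFT_LOG_GC = 0b0000_0010;
    }
}

fn main() {
    let mut registry = Ticks::empty();

    registry.insert(Ticks::RAFT);
    assert!(registry.contains(Ticks::RAFT));

    // Inserting the same flag again is a no-op; the set semantics are what
    // make "schedule each tick kind at most once" cheap to enforce.
    registry.insert(Ticks::RAFT);
    assert_eq!(registry.bits(), 0b0000_0001);

    registry.remove(Ticks::RAFT);
    assert!(registry.is_empty());

    // A value can hold several flags at once, hence the `_ => unreachable!()`
    // arm in tag() for combinations that are never passed in practice.
    let both = Ticks::RAFT | Ticks::RAFT_LOG_GC;
    assert!(both.contains(Ticks::RAFT_LOG_GC));
}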
@@ -267,7 +269,7 @@ pub enum PeerMsg {
    RaftCommand(RaftCommand),
    /// Tick is periodical task. If target peer doesn't exist there is a potential
    /// that the raft node will not work anymore.
-    Tick(PeerTick),
+    Tick(PeerTicks),
    /// Result of applying committed entries. The message can't be lost.
    ApplyRes { res: ApplyTaskRes },
    /// Message that can't be lost but rarely created. If they are lost, real bad