Skip to content

Commit

Permalink
raftstore-v2: add replay watch (#14739)
Browse files Browse the repository at this point in the history
ref #14173

This is part of the code from PR 14501. This change records the actual time taken to replay the raft log.

Signed-off-by: Qi Xu <tonyxuqqi@outlook.com>

Co-authored-by: Qi Xu <tonyxuqqi@outlook.com>
  • Loading branch information
tonyxuqqi and Qi Xu committed May 25, 2023
1 parent 1d25205 commit 7f9aaf7
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 29 deletions.
9 changes: 7 additions & 2 deletions components/raftstore-v2/src/batch/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ use time::Timespec;

use crate::{
fsm::{PeerFsm, PeerFsmDelegate, SenderFsmPair, StoreFsm, StoreFsmDelegate, StoreMeta},
operation::{SharedReadTablet, MERGE_IN_PROGRESS_PREFIX, MERGE_SOURCE_PREFIX, SPLIT_PREFIX},
operation::{
ReplayWatch, SharedReadTablet, MERGE_IN_PROGRESS_PREFIX, MERGE_SOURCE_PREFIX, SPLIT_PREFIX,
},
raft::Storage,
router::{PeerMsg, PeerTick, StoreMsg},
worker::{checkpoint, cleanup, pd, tablet},
Expand Down Expand Up @@ -747,8 +749,11 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
router.register_all(mailboxes);

// Make sure Msg::Start is the first message each FSM received.
let watch = Arc::new(ReplayWatch::new(self.logger.clone()));
for addr in address {
router.force_send(addr, PeerMsg::Start).unwrap();
router
.force_send(addr, PeerMsg::Start(Some(watch.clone())))
.unwrap();
}
router.send_control(StoreMsg::Start).unwrap();
Ok(())
Expand Down
9 changes: 5 additions & 4 deletions components/raftstore-v2/src/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

//! This module contains the peer implementation for batch system.

use std::borrow::Cow;
use std::{borrow::Cow, sync::Arc};

use batch_system::{BasicMailbox, Fsm};
use crossbeam::channel::TryRecvError;
Expand All @@ -20,6 +20,7 @@ use tikv_util::{

use crate::{
batch::StoreContext,
operation::ReplayWatch,
raft::{Peer, Storage},
router::{PeerMsg, PeerTick, QueryResult},
Result,
Expand Down Expand Up @@ -187,8 +188,8 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
self.store_ctx.tick_batch[idx].ticks.push(cb);
}

fn on_start(&mut self) {
if !self.fsm.peer.maybe_pause_for_recovery(self.store_ctx) {
fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) {
if !self.fsm.peer.maybe_pause_for_replay(self.store_ctx, watch) {
self.schedule_tick(PeerTick::Raft);
}
self.schedule_tick(PeerTick::SplitRegionCheck);
Expand Down Expand Up @@ -269,7 +270,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
PeerMsg::SplitInitFinish(region_id) => {
self.fsm.peer.on_split_init_finish(region_id)
}
PeerMsg::Start => self.on_start(),
PeerMsg::Start(w) => self.on_start(w),
PeerMsg::Noop => unimplemented!(),
PeerMsg::Persisted {
peer_id,
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/operation/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
progress_to_be_updated,
);
self.try_compelete_recovery();
if !self.pause_for_recovery() && self.storage_mut().apply_trace_mut().should_flush() {
if !self.pause_for_replay() && self.storage_mut().apply_trace_mut().should_flush() {
if let Some(scheduler) = self.apply_scheduler() {
scheduler.send(ApplyTask::ManualFlush);
}
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/operation/life.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ impl Store {
let mailbox = BasicMailbox::new(tx, fsm, ctx.router.state_cnt().clone());
if ctx
.router
.send_and_register(region_id, mailbox, PeerMsg::Start)
.send_and_register(region_id, mailbox, PeerMsg::Start(None))
.is_err()
{
panic!(
Expand Down
3 changes: 2 additions & 1 deletion components/raftstore-v2/src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ pub use command::{
};
pub use life::{AbnormalPeerContext, DestroyProgress, GcPeerContext};
pub use ready::{
write_initial_states, ApplyTrace, AsyncWriter, DataTrace, GenSnapTask, SnapState, StateStorage,
write_initial_states, ApplyTrace, AsyncWriter, DataTrace, GenSnapTask, ReplayWatch, SnapState,
StateStorage,
};

pub(crate) use self::{
Expand Down
85 changes: 77 additions & 8 deletions components/raftstore-v2/src/operation/ready/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ mod apply_trace;
mod async_writer;
mod snapshot;

use std::{cmp, time::Instant};
use std::{
cmp,
fmt::{self, Debug, Formatter},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Instant,
};

use engine_traits::{KvEngine, RaftEngine};
use error_code::ErrorCodeExt;
Expand All @@ -41,7 +49,7 @@ use raftstore::{
FetchedLogs, ReadProgress, Transport, WriteCallback, WriteTask,
},
};
use slog::{debug, error, info, trace, warn};
use slog::{debug, error, info, trace, warn, Logger};
use tikv_util::{
log::SlogFormat,
slog_panic,
Expand All @@ -63,7 +71,56 @@ use crate::{
worker::tablet,
};

const PAUSE_FOR_RECOVERY_GAP: u64 = 128;
const PAUSE_FOR_REPLAY_GAP: u64 = 128;

/// Collects per-peer counters and a start time for a raft log replay, and
/// reports them through `logger` when the value is dropped.
pub struct ReplayWatch {
/// Number of peers counted through `inc_normal_peer`.
normal_peers: AtomicUsize,
/// Number of peers counted through `inc_paused_peer`.
paused_peers: AtomicUsize,
/// Logger that receives the summary line emitted on drop.
logger: Logger,
/// Instant captured in `new`; the drop summary reports the time elapsed since.
timer: Instant,
}

impl Debug for ReplayWatch {
    /// Renders every field of the watch under the `ReplayWatch` struct name.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Borrowing destructure: a field added to the struct later fails to
        // compile here until it is also rendered below.
        let Self {
            normal_peers,
            paused_peers,
            logger,
            timer,
        } = self;

        let mut out = f.debug_struct("ReplayWatch");
        out.field("normal_peers", normal_peers);
        out.field("paused_peers", paused_peers);
        out.field("logger", logger);
        out.field("timer", timer);
        out.finish()
    }
}

impl ReplayWatch {
    /// Creates a watch with both peer counters at zero. The timer starts at
    /// the moment of this call.
    pub fn new(logger: Logger) -> Self {
        let started_at = Instant::now();
        Self {
            normal_peers: AtomicUsize::default(),
            paused_peers: AtomicUsize::default(),
            logger,
            timer: started_at,
        }
    }

    /// Adds one to the count of normal peers.
    pub fn inc_normal_peer(&self) {
        Self::bump(&self.normal_peers);
    }

    /// Adds one to the count of paused peers.
    pub fn inc_paused_peer(&self) {
        Self::bump(&self.paused_peers);
    }

    /// Increments `counter` by one with relaxed ordering.
    fn bump(counter: &AtomicUsize) {
        counter.fetch_add(1, Ordering::Relaxed);
    }
}

impl Drop for ReplayWatch {
fn drop(&mut self) {
info!(
self.logger,
"The raft log replay completed";
"normal_peers" => self.normal_peers.load(Ordering::Relaxed),
"paused_peers" => self.paused_peers.load(Ordering::Relaxed),
"elapsed" => ?self.timer.elapsed()
);
}
}

impl Store {
pub fn on_store_unreachable<EK, ER, T>(
Expand Down Expand Up @@ -115,7 +172,11 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn maybe_pause_for_recovery<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) -> bool {
pub fn maybe_pause_for_replay<T>(
&mut self,
store_ctx: &mut StoreContext<EK, ER, T>,
watch: Option<Arc<ReplayWatch>>,
) -> bool {
// The task needs to be scheduled even if the tablet may be replaced during
// recovery. Otherwise if there are merges during recovery, the FSM may
// be paused forever.
Expand All @@ -139,14 +200,22 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
// it may block for ever when there is unapplied conf change.
self.set_has_ready();
}
if committed_index > applied_index + PAUSE_FOR_RECOVERY_GAP {
if committed_index > applied_index + PAUSE_FOR_REPLAY_GAP {
// If there are too many missing logs, we need to skip ticking; otherwise
// it may block the raftstore thread for a long time in reading logs for
// election timeout.
info!(self.logger, "pause for recovery"; "applied" => applied_index, "committed" => committed_index);
self.set_pause_for_recovery(true);
info!(self.logger, "pause for replay"; "applied" => applied_index, "committed" => committed_index);

// when committed_index > applied_index + PAUSE_FOR_REPLAY_GAP, the peer must be
// created from StoreSystem on TiKV Start
let w = watch.unwrap();
w.inc_paused_peer();
self.set_replay_watch(Some(w));
true
} else {
if let Some(w) = watch {
w.inc_normal_peer();
}
false
}
}
Expand Down Expand Up @@ -189,7 +258,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
"from_peer_id" => msg.get_from_peer().get_id(),
"to_peer_id" => msg.get_to_peer().get_id(),
);
if self.pause_for_recovery() && msg.get_message().get_msg_type() == MessageType::MsgAppend {
if self.pause_for_replay() && msg.get_message().get_msg_type() == MessageType::MsgAppend {
ctx.raft_metrics.message_dropped.recovery.inc();
return;
}
Expand Down
20 changes: 10 additions & 10 deletions components/raftstore-v2/src/raft/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::{
fsm::ApplyScheduler,
operation::{
AbnormalPeerContext, AsyncWriter, BucketStatsInfo, CompactLogContext, DestroyProgress,
GcPeerContext, MergeContext, ProposalControl, SimpleWriteReqEncoder, SplitFlowControl,
TxnContext,
GcPeerContext, MergeContext, ProposalControl, ReplayWatch, SimpleWriteReqEncoder,
SplitFlowControl, TxnContext,
},
router::{ApplyTask, CmdResChannel, PeerTick, QueryResChannel},
Result,
Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct Peer<EK: KvEngine, ER: RaftEngine> {
has_ready: bool,
/// Sometimes there is no ready at all, but we need to trigger async write.
has_extra_write: bool,
pause_for_recovery: bool,
replay_watch: Option<Arc<ReplayWatch>>,
/// Writer for persisting side effects asynchronously.
pub(crate) async_writer: AsyncWriter<EK, ER>,

Expand Down Expand Up @@ -179,7 +179,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
apply_scheduler: None,
has_ready: false,
has_extra_write: false,
pause_for_recovery: false,
replay_watch: None,
destroy_progress: DestroyProgress::None,
raft_group,
logger,
Expand Down Expand Up @@ -473,21 +473,21 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
}

#[inline]
pub fn set_pause_for_recovery(&mut self, pause: bool) {
self.pause_for_recovery = pause;
pub fn set_replay_watch(&mut self, watch: Option<Arc<ReplayWatch>>) {
self.replay_watch = watch;
}

#[inline]
pub fn pause_for_recovery(&self) -> bool {
self.pause_for_recovery
pub fn pause_for_replay(&self) -> bool {
self.replay_watch.is_some()
}

#[inline]
// We may have skipped scheduling the raft tick at start due to a noticeable gap
// between commit index and apply index. We should schedule it when raft log
// apply catches up.
pub fn try_compelete_recovery(&mut self) {
if self.pause_for_recovery()
if self.pause_for_replay()
&& self.storage().entry_storage().commit_index()
<= self.storage().entry_storage().applied_index()
{
Expand All @@ -496,7 +496,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
"recovery completed";
"apply_index" => self.storage().entry_storage().applied_index()
);
self.set_pause_for_recovery(false);
self.set_replay_watch(None);
// Flush to avoid recover again and again.
if let Some(scheduler) = self.apply_scheduler() {
scheduler.send(ApplyTask::ManualFlush);
Expand Down
5 changes: 3 additions & 2 deletions components/raftstore-v2/src/router/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

// #[PerformanceCriticalPath]
use std::sync::Arc;

use kvproto::{
import_sstpb::SstMeta,
Expand All @@ -21,7 +22,7 @@ use super::response_channel::{
QueryResSubscriber,
};
use crate::{
operation::{CatchUpLogs, RequestHalfSplit, RequestSplit, SplitInit},
operation::{CatchUpLogs, ReplayWatch, RequestHalfSplit, RequestSplit, SplitInit},
router::ApplyRes,
};

Expand Down Expand Up @@ -169,7 +170,7 @@ pub enum PeerMsg {
LogsFetched(FetchedLogs),
SnapshotGenerated(GenSnapRes),
/// Start the FSM.
Start,
Start(Option<Arc<ReplayWatch>>),
/// Messages from peer to peer in the same store
SplitInit(Box<SplitInit>),
SplitInitFinish(u64),
Expand Down

0 comments on commit 7f9aaf7

Please sign in to comment.