raftstore-v2: add replay watch #14739

Merged
merged 8 commits into from May 25, 2023
Changes from 2 commits
9 changes: 7 additions & 2 deletions components/raftstore-v2/src/batch/store.rs
@@ -54,7 +54,9 @@ use time::Timespec;

use crate::{
fsm::{PeerFsm, PeerFsmDelegate, SenderFsmPair, StoreFsm, StoreFsmDelegate, StoreMeta},
operation::{SharedReadTablet, MERGE_IN_PROGRESS_PREFIX, MERGE_SOURCE_PREFIX, SPLIT_PREFIX},
operation::{
ReplayWatch, SharedReadTablet, MERGE_IN_PROGRESS_PREFIX, MERGE_SOURCE_PREFIX, SPLIT_PREFIX,
},
raft::Storage,
router::{PeerMsg, PeerTick, StoreMsg},
worker::{checkpoint, pd, tablet},
@@ -728,8 +730,11 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
router.register_all(mailboxes);

// Make sure Msg::Start is the first message each FSM received.
let watch = Arc::new(ReplayWatch::new(self.logger.clone()));
for addr in address {
router.force_send(addr, PeerMsg::Start).unwrap();
router
.force_send(addr, PeerMsg::Start(Some(watch.clone())))
.unwrap();
}
router.send_control(StoreMsg::Start).unwrap();
Ok(())
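The startup change above creates a single ReplayWatch, wraps it in an Arc, and hands a clone to every recovered peer through PeerMsg::Start. Because the completion log lives in the watch's Drop impl (shown in ready/mod.rs below), it fires exactly once, after the last peer releases its clone. Below is a minimal sketch of that shared-watch pattern; the Watch type, the thread-per-peer loop, and the println! in place of TiKV's slog logger are simplifications for illustration, not the PR's actual code.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Instant;

// Simplified stand-in for ReplayWatch: counts peers and reports once the
// last Arc clone is dropped.
struct Watch {
    paused: AtomicUsize,
    skipped: AtomicUsize,
    timer: Instant,
}

impl Watch {
    fn new() -> Self {
        Self {
            paused: AtomicUsize::new(0),
            skipped: AtomicUsize::new(0),
            timer: Instant::now(),
        }
    }
}

impl Drop for Watch {
    fn drop(&mut self) {
        // Runs exactly once, after every holder has released its clone.
        println!(
            "replay completed: paused={} skipped={} elapsed={:?}",
            self.paused.load(Ordering::Relaxed),
            self.skipped.load(Ordering::Relaxed),
            self.timer.elapsed()
        );
    }
}

fn main() {
    let watch = Arc::new(Watch::new());
    // Each "peer" holds a clone, mirroring PeerMsg::Start(Some(watch.clone()))
    // in the diff above.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let w = Arc::clone(&watch);
            std::thread::spawn(move || {
                if i % 2 == 0 {
                    w.paused.fetch_add(1, Ordering::Relaxed);
                } else {
                    w.skipped.fetch_add(1, Ordering::Relaxed);
                }
                // Dropping `w` here releases this peer's reference.
            })
        })
        .collect();
    drop(watch); // the "store" no longer needs its own reference
    for h in handles {
        h.join().unwrap();
    }
    // The completion line prints when the final clone is dropped.
}
```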
13 changes: 9 additions & 4 deletions components/raftstore-v2/src/fsm/peer.rs
@@ -2,7 +2,7 @@

//! This module contains the peer implementation for batch system.

use std::borrow::Cow;
use std::{borrow::Cow, sync::Arc};

use batch_system::{BasicMailbox, Fsm};
use crossbeam::channel::TryRecvError;
@@ -20,6 +20,7 @@ use tikv_util::{

use crate::{
batch::StoreContext,
operation::ReplayWatch,
raft::{Peer, Storage},
router::{PeerMsg, PeerTick, QueryResult},
Result,
@@ -187,8 +188,12 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
self.store_ctx.tick_batch[idx].ticks.push(cb);
}

fn on_start(&mut self) {
if !self.fsm.peer.maybe_pause_for_recovery(self.store_ctx) {
fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) {
if !self
.fsm
.peer
.maybe_pause_for_recovery(self.store_ctx, watch)
{
self.schedule_tick(PeerTick::Raft);
}
self.schedule_tick(PeerTick::SplitRegionCheck);
@@ -269,7 +274,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
PeerMsg::SplitInitFinish(region_id) => {
self.fsm.peer.on_split_init_finish(region_id)
}
PeerMsg::Start => self.on_start(),
PeerMsg::Start(w) => self.on_start(w),
PeerMsg::Noop => unimplemented!(),
PeerMsg::Persisted {
peer_id,
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/operation/command/mod.rs
@@ -405,7 +405,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
progress_to_be_updated,
);
self.try_compelete_recovery();
if !self.pause_for_recovery() && self.storage_mut().apply_trace_mut().should_flush() {
if !self.pause_for_replay() && self.storage_mut().apply_trace_mut().should_flush() {
if let Some(scheduler) = self.apply_scheduler() {
scheduler.send(ApplyTask::ManualFlush);
}
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/operation/life.rs
@@ -455,7 +455,7 @@ impl Store {
let mailbox = BasicMailbox::new(tx, fsm, ctx.router.state_cnt().clone());
if ctx
.router
.send_and_register(region_id, mailbox, PeerMsg::Start)
.send_and_register(region_id, mailbox, PeerMsg::Start(None))
.is_err()
{
panic!(
3 changes: 2 additions & 1 deletion components/raftstore-v2/src/operation/mod.rs
@@ -16,7 +16,8 @@ pub use command::{
};
pub use life::{AbnormalPeerContext, DestroyProgress, GcPeerContext};
pub use ready::{
write_initial_states, ApplyTrace, AsyncWriter, DataTrace, GenSnapTask, SnapState, StateStorage,
write_initial_states, ApplyTrace, AsyncWriter, DataTrace, GenSnapTask, ReplayWatch, SnapState,
StateStorage,
};

pub(crate) use self::{
75 changes: 69 additions & 6 deletions components/raftstore-v2/src/operation/ready/mod.rs
@@ -21,7 +21,15 @@ mod apply_trace;
mod async_writer;
mod snapshot;

use std::{cmp, time::Instant};
use std::{
cmp,
fmt::{self, Debug, Formatter},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Instant,
};

use engine_traits::{KvEngine, RaftEngine};
use error_code::ErrorCodeExt;
@@ -41,7 +49,7 @@ use raftstore::{
FetchedLogs, ReadProgress, Transport, WriteCallback, WriteTask,
},
};
use slog::{debug, error, info, trace, warn};
use slog::{debug, error, info, trace, warn, Logger};
use tikv_util::{
log::SlogFormat,
slog_panic,
@@ -65,6 +73,53 @@ use crate::{

const PAUSE_FOR_RECOVERY_GAP: u64 = 128;

pub struct ReplayWatch {
skipped: AtomicUsize,
paused: AtomicUsize,
logger: Logger,
timer: Instant,
}

impl Debug for ReplayWatch {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("ReplayWatch")
.field("skipped", &self.skipped)
.field("paused", &self.paused)
.field("logger", &self.logger)
.field("timer", &self.timer)
.finish()
}
}

impl ReplayWatch {
pub fn new(logger: Logger) -> Self {
Self {
skipped: AtomicUsize::new(0),
paused: AtomicUsize::new(0),
logger,
timer: Instant::now(),
}
}

pub fn record_skipped(&self) {
self.skipped.fetch_add(1, Ordering::Relaxed);
}

pub fn record_paused(&self) {
self.paused.fetch_add(1, Ordering::Relaxed);
}
}

impl Drop for ReplayWatch {
fn drop(&mut self) {
info!(self.logger,
"The raft log replay completed";
"skipped" => self.skipped.load(Ordering::Relaxed),
"paused" => self.paused.load(Ordering::Relaxed),
"elapsed" => ?self.timer.elapsed());
}
}

impl Store {
pub fn on_store_unreachable<EK, ER, T>(
&mut self,
@@ -115,7 +170,11 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
}

impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
pub fn maybe_pause_for_recovery<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) -> bool {
pub fn maybe_pause_for_recovery<T>(
&mut self,
store_ctx: &mut StoreContext<EK, ER, T>,
watch: Option<Arc<ReplayWatch>>,
) -> bool {
// The task needs to be scheduled even if the tablet may be replaced during
// recovery. Otherwise if there are merges during recovery, the FSM may
// be paused forever.
@@ -139,14 +198,18 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
// it may block for ever when there is unapplied conf change.
self.set_has_ready();
}
if committed_index > applied_index + PAUSE_FOR_RECOVERY_GAP {
if committed_index > applied_index + PAUSE_FOR_RECOVERY_GAP && let Some(w) = watch {
// If there are too many the missing logs, we need to skip ticking otherwise
// it may block the raftstore thread for a long time in reading logs for
// election timeout.
info!(self.logger, "pause for recovery"; "applied" => applied_index, "committed" => committed_index);
self.set_pause_for_recovery(true);
w.record_paused();
self.set_replay_watch(Some(w));
true
} else {
if let Some(w) = watch {
w.record_skipped();
Review thread on this line:

Member: Why is it called "skipped"?

Contributor Author: It's "skipped" the pause for replay as it's not needed.

Member: It is confusing. For "pause", it means the replay is paused. But for "skipped", it doesn't mean replay is skipped, it means "pause-for-replay" is skipped.

Contributor Author: Good point. It's renamed to "normal_peers" and "paused_peers", which seems better. @tabokie

}
false
}
}
@@ -189,7 +252,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
"from_peer_id" => msg.get_from_peer().get_id(),
"to_peer_id" => msg.get_to_peer().get_id(),
);
if self.pause_for_recovery() && msg.get_message().get_msg_type() == MessageType::MsgAppend {
if self.pause_for_replay() && msg.get_message().get_msg_type() == MessageType::MsgAppend {
ctx.raft_metrics.message_dropped.recovery.inc();
return;
}
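In maybe_pause_for_recovery above, a peer pauses only when the committed-to-applied gap exceeds PAUSE_FOR_RECOVERY_GAP (128) and a watch was handed over at startup; otherwise the peer is counted as skipped and raft ticks begin immediately. The sketch below isolates just that decision; it assumes the ReplayWatch from this diff is in scope, and the helper name and bare-integer arguments are made up for illustration. (Per the review thread above, the counters were later renamed to normal_peers/paused_peers; the sketch keeps the names used in this revision.)

```rust
const PAUSE_FOR_RECOVERY_GAP: u64 = 128;

/// Returns true if the peer should pause raft ticks until log apply catches up.
/// `watch` is only handed out during store startup; peers created afterwards
/// (PeerMsg::Start(None) in life.rs) never pause.
fn should_pause_for_replay(
    committed_index: u64,
    applied_index: u64,
    watch: Option<&ReplayWatch>,
) -> bool {
    match watch {
        Some(w) if committed_index > applied_index + PAUSE_FOR_RECOVERY_GAP => {
            // Large gap: pause so election-timeout ticks don't force long log reads.
            w.record_paused();
            true
        }
        Some(w) => {
            // Small or no gap: skip the pause-for-replay entirely.
            w.record_skipped();
            false
        }
        None => false,
    }
}
```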
20 changes: 10 additions & 10 deletions components/raftstore-v2/src/raft/peer.rs
@@ -35,8 +35,8 @@ use crate::{
fsm::ApplyScheduler,
operation::{
AbnormalPeerContext, AsyncWriter, BucketStatsInfo, CompactLogContext, DestroyProgress,
GcPeerContext, MergeContext, ProposalControl, SimpleWriteReqEncoder, SplitFlowControl,
TxnContext,
GcPeerContext, MergeContext, ProposalControl, ReplayWatch, SimpleWriteReqEncoder,
SplitFlowControl, TxnContext,
},
router::{ApplyTask, CmdResChannel, PeerTick, QueryResChannel},
Result,
@@ -76,7 +76,7 @@ pub struct Peer<EK: KvEngine, ER: RaftEngine> {
has_ready: bool,
/// Sometimes there is no ready at all, but we need to trigger async write.
has_extra_write: bool,
pause_for_recovery: bool,
replay_watch: Option<Arc<ReplayWatch>>,
/// Writer for persisting side effects asynchronously.
pub(crate) async_writer: AsyncWriter<EK, ER>,

@@ -179,7 +179,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
apply_scheduler: None,
has_ready: false,
has_extra_write: false,
pause_for_recovery: false,
replay_watch: None,
destroy_progress: DestroyProgress::None,
raft_group,
logger,
@@ -468,21 +468,21 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
}

#[inline]
pub fn set_pause_for_recovery(&mut self, pause: bool) {
self.pause_for_recovery = pause;
pub fn set_replay_watch(&mut self, watch: Option<Arc<ReplayWatch>>) {
self.replay_watch = watch;
}

#[inline]
pub fn pause_for_recovery(&self) -> bool {
self.pause_for_recovery
pub fn pause_for_replay(&self) -> bool {
self.replay_watch.is_some()
}

#[inline]
// we may have skipped scheduling raft tick when start due to noticable gap
// between commit index and apply index. We should scheduling it when raft log
// apply catches up.
pub fn try_compelete_recovery(&mut self) {
if self.pause_for_recovery()
if self.pause_for_replay()
&& self.storage().entry_storage().commit_index()
<= self.storage().entry_storage().applied_index()
{
@@ -491,7 +491,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
"recovery completed";
"apply_index" => self.storage().entry_storage().applied_index()
);
self.set_pause_for_recovery(false);
self.set_replay_watch(None);
// Flush to avoid recover again and again.
if let Some(scheduler) = self.apply_scheduler() {
scheduler.send(ApplyTask::ManualFlush);
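The raft/peer.rs change replaces the pause_for_recovery boolean with replay_watch: Option<Arc<ReplayWatch>>, so "paused" now simply means "still holding a watch reference", and clearing the option both resumes the peer and releases its share of the watch. A trimmed-down sketch of that state transition, assuming the ReplayWatch type from this diff; the PeerReplayState struct and its method names are hypothetical:

```rust
use std::sync::Arc;

// Hypothetical, trimmed-down peer state: only the fields relevant to replay.
struct PeerReplayState {
    replay_watch: Option<Arc<ReplayWatch>>,
    commit_index: u64,
    applied_index: u64,
}

impl PeerReplayState {
    fn pause_for_replay(&self) -> bool {
        self.replay_watch.is_some()
    }

    // Mirrors try_compelete_recovery: once apply has caught up with commit,
    // drop this peer's watch reference so ticks resume and the shared watch
    // can report completion when every paused peer has done the same.
    fn try_complete_replay(&mut self) {
        if self.pause_for_replay() && self.commit_index <= self.applied_index {
            self.replay_watch = None;
        }
    }
}
```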
5 changes: 3 additions & 2 deletions components/raftstore-v2/src/router/message.rs
@@ -1,6 +1,7 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

// #[PerformanceCriticalPath]
use std::sync::Arc;

use kvproto::{
import_sstpb::SstMeta,
@@ -21,7 +22,7 @@ use super::response_channel::{
QueryResSubscriber,
};
use crate::{
operation::{CatchUpLogs, RequestHalfSplit, RequestSplit, SplitInit},
operation::{CatchUpLogs, ReplayWatch, RequestHalfSplit, RequestSplit, SplitInit},
router::ApplyRes,
};

@@ -167,7 +168,7 @@ pub enum PeerMsg {
LogsFetched(FetchedLogs),
SnapshotGenerated(GenSnapRes),
/// Start the FSM.
Start,
Start(Option<Arc<ReplayWatch>>),
/// Messages from peer to peer in the same store
SplitInit(Box<SplitInit>),
SplitInitFinish(u64),
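To tie the two call sites together: store startup (batch/store.rs) sends PeerMsg::Start(Some(watch.clone())) for every recovered peer, while peers registered later (life.rs) get PeerMsg::Start(None) and never pause. A small hypothetical helper illustrating that split, assuming PeerMsg and ReplayWatch from this diff are in scope:

```rust
use std::sync::Arc;

// Hypothetical helper, not part of the PR: picks the Start payload per call site.
fn start_message_for(startup_watch: Option<&Arc<ReplayWatch>>) -> PeerMsg {
    match startup_watch {
        // Store startup: every recovered peer shares one watch.
        Some(w) => PeerMsg::Start(Some(Arc::clone(w))),
        // Peers created after startup: nothing to replay, never pause.
        None => PeerMsg::Start(None),
    }
}
```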