Skip to content

Commit

Permalink
raftstore: Add slow log for peer and store msg (#16605) (#17035)
Browse files Browse the repository at this point in the history
ref #16600

Add slow log for peer and store msg

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: Connor1996 <zbk602423539@gmail.com>

Co-authored-by: Connor <zbk602423539@gmail.com>
Co-authored-by: Connor1996 <zbk602423539@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed May 28, 2024
1 parent 8fbb6d9 commit 1a2c88d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 14 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions components/raftstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ slog = { workspace = true }
slog-global = { workspace = true }
smallvec = "1.4"
sst_importer = { workspace = true }
strum = { version = "0.20", features = ["derive"] }
strum_macros = "0.24"
tempfile = "3.0"
thiserror = "1.0"
tidb_query_datatype = { workspace = true }
Expand Down
19 changes: 16 additions & 3 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use engine_traits::{Engines, KvEngine, RaftEngine, SstMetaInfo, WriteBatchExt, C
use error_code::ErrorCodeExt;
use fail::fail_point;
use futures::channel::mpsc::UnboundedSender;
use itertools::Itertools;
use keys::{self, enc_end_key, enc_start_key};
use kvproto::{
brpb::CheckAdminResponse,
Expand Down Expand Up @@ -49,13 +50,15 @@ use raft::{
GetEntriesContext, Progress, ReadState, SnapshotStatus, StateRole, INVALID_INDEX, NO_LIMIT,
};
use smallvec::SmallVec;
use strum::{EnumCount, VariantNames};
use tikv_alloc::trace::TraceEvent;
use tikv_util::{
box_err, debug, defer, error, escape, info, info_or_debug, is_zero_duration,
mpsc::{self, LooseBoundedSender, Receiver},
slow_log,
store::{find_peer, find_peer_by_id, is_learner, region_on_same_stores},
sys::disk::DiskUsage,
time::{monotonic_raw_now, Instant as TiInstant},
time::{monotonic_raw_now, Instant as TiInstant, SlowTimer},
trace, warn,
worker::{ScheduleError, Scheduler},
Either,
Expand Down Expand Up @@ -617,9 +620,12 @@ where
}

pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg<EK>>) {
let timer = TiInstant::now_coarse();
let timer = SlowTimer::from_millis(100);
let count = msgs.len();
#[allow(const_evaluatable_unchecked)]
let mut distribution = [0; PeerMsg::<EK>::COUNT];
for m in msgs.drain(..) {
distribution[m.discriminant()] += 1;
match m {
PeerMsg::RaftMessage(msg, sent_time) => {
if let Some(sent_time) = sent_time {
Expand Down Expand Up @@ -705,12 +711,19 @@ where
}
}
self.on_loop_finished();
slow_log!(
T timer,
"{} handle {} peer messages {:?}",
self.fsm.peer.tag,
count,
PeerMsg::<EK>::VARIANTS.iter().zip(distribution).filter(|(_, c)| *c > 0).format(", "),
);
self.ctx.raft_metrics.peer_msg_len.observe(count as f64);
self.ctx
.raft_metrics
.event_time
.peer_msg
.observe(timer.saturating_elapsed_secs());
.observe(timer.saturating_elapsed().as_secs_f64());
}

#[inline]
Expand Down
21 changes: 17 additions & 4 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use fail::fail_point;
use file_system::{IoType, WithIoType};
use futures::{compat::Future01CompatExt, FutureExt};
use grpcio_health::HealthService;
use itertools::Itertools;
use keys::{self, data_end_key, data_key, enc_end_key, enc_start_key};
use kvproto::{
metapb::{self, Region, RegionEpoch},
Expand All @@ -49,6 +50,7 @@ use resource_control::{channel::unbounded, ResourceGroupManager};
use resource_metering::CollectorRegHandle;
use service::service_manager::GrpcServiceManager;
use sst_importer::SstImporter;
use strum::{EnumCount, VariantNames};
use tikv_alloc::trace::TraceEvent;
use tikv_util::{
box_try,
Expand All @@ -61,7 +63,7 @@ use tikv_util::{
store::{find_peer, region_on_stores},
sys as sys_util,
sys::disk::{get_disk_status, DiskUsage},
time::{duration_to_sec, monotonic_raw_now, Instant as TiInstant},
time::{duration_to_sec, monotonic_raw_now, Instant as TiInstant, SlowTimer},
timer::SteadyTimer,
warn,
worker::{LazyWorker, Scheduler, Worker},
Expand Down Expand Up @@ -783,15 +785,19 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
.observe(duration_to_sec(elapsed));
slow_log!(
elapsed,
"[store {}] handle timeout {:?}",
"[store {}] handle tick {:?}",
self.fsm.store.id,
tick
);
}

fn handle_msgs(&mut self, msgs: &mut Vec<StoreMsg<EK>>) {
let timer = TiInstant::now_coarse();
let timer = SlowTimer::from_millis(100);
let count = msgs.len();
#[allow(const_evaluatable_unchecked)]
let mut distribution = [0; StoreMsg::<EK>::COUNT];
for m in msgs.drain(..) {
distribution[m.discriminant()] += 1;
match m {
StoreMsg::Tick(tick) => self.on_tick(tick),
StoreMsg::RaftMessage(msg) => {
Expand Down Expand Up @@ -844,11 +850,18 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
}
}
}
slow_log!(
T timer,
"[store {}] handle {} store messages {:?}",
self.fsm.store.id,
count,
StoreMsg::<EK>::VARIANTS.iter().zip(distribution).filter(|(_, c)| *c > 0).format(", "),
);
self.ctx
.raft_metrics
.event_time
.store_msg
.observe(timer.saturating_elapsed_secs());
.observe(timer.saturating_elapsed().as_secs_f64());
}

fn start(&mut self, store: metapb::Store) {
Expand Down
56 changes: 49 additions & 7 deletions components/raftstore/src/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use pd_client::BucketMeta;
use raft::SnapshotStatus;
use resource_control::ResourceMetered;
use smallvec::{smallvec, SmallVec};
use strum::{EnumCount, EnumVariantNames};
use tikv_util::{deadline::Deadline, escape, memory::HeapSize, time::Instant};
use tracker::{get_tls_tracker_token, TrackerToken};

Expand Down Expand Up @@ -748,11 +749,13 @@ pub struct InspectedRaftMessage {

/// Message that can be sent to a peer.
#[allow(clippy::large_enum_variant)]
#[derive(EnumCount, EnumVariantNames)]
#[repr(u8)]
pub enum PeerMsg<EK: KvEngine> {
/// Raft message is the message sent between raft nodes in the same
/// raft group. Messages need to be redirected to raftstore if target
/// peer doesn't exist.
RaftMessage(InspectedRaftMessage, Option<Instant>),
RaftMessage(InspectedRaftMessage, Option<Instant>) = 0,
/// Raft command is the command that is expected to be proposed by the
/// leader of the target raft group. If it's failed to be sent, callback
/// usually needs to be called before dropping in case of resource leak.
Expand Down Expand Up @@ -818,6 +821,23 @@ impl<EK: KvEngine> fmt::Debug for PeerMsg<EK> {
}

impl<EK: KvEngine> PeerMsg<EK> {
pub fn discriminant(&self) -> usize {
match self {
PeerMsg::RaftMessage(..) => 0,
PeerMsg::RaftCommand(_) => 1,
PeerMsg::Tick(_) => 2,
PeerMsg::SignificantMsg(_) => 3,
PeerMsg::ApplyRes { .. } => 4,
PeerMsg::Start => 5,
PeerMsg::Noop => 6,
PeerMsg::Persisted { .. } => 7,
PeerMsg::CasualMessage(_) => 8,
PeerMsg::HeartbeatPd => 9,
PeerMsg::UpdateReplicationMode => 10,
PeerMsg::Destroy(_) => 11,
}
}

/// For some specific kind of messages, it's actually acceptable if failed
/// to send it by `significant_send`. This function determine if the
/// current message is acceptable to fail.
Expand All @@ -829,6 +849,7 @@ impl<EK: KvEngine> PeerMsg<EK> {
}
}

#[derive(EnumCount, EnumVariantNames)]
pub enum StoreMsg<EK>
where
EK: KvEngine,
Expand Down Expand Up @@ -861,10 +882,6 @@ where
inspector: LatencyInspector,
},

/// Message only used for test.
#[cfg(any(test, feature = "testexport"))]
Validate(Box<dyn FnOnce(&crate::store::Config) + Send>),

UnsafeRecoveryReport(pdpb::StoreReport),
UnsafeRecoveryCreatePeer {
syncer: UnsafeRecoveryExecutePlanSyncer,
Expand All @@ -876,6 +893,10 @@ where
AwakenRegions {
abnormal_stores: Vec<u64>,
},

/// Message only used for test.
#[cfg(any(test, feature = "testexport"))]
Validate(Box<dyn FnOnce(&crate::store::Config) + Send>),
}

impl<EK: KvEngine> ResourceMetered for StoreMsg<EK> {}
Expand All @@ -901,8 +922,6 @@ where
),
StoreMsg::Tick(tick) => write!(fmt, "StoreTick {:?}", tick),
StoreMsg::Start { ref store } => write!(fmt, "Start store {:?}", store),
#[cfg(any(test, feature = "testexport"))]
StoreMsg::Validate(_) => write!(fmt, "Validate config"),
StoreMsg::UpdateReplicationMode(_) => write!(fmt, "UpdateReplicationMode"),
StoreMsg::LatencyInspect { .. } => write!(fmt, "LatencyInspect"),
StoreMsg::UnsafeRecoveryReport(..) => write!(fmt, "UnsafeRecoveryReport"),
Expand All @@ -911,6 +930,29 @@ where
}
StoreMsg::GcSnapshotFinish => write!(fmt, "GcSnapshotFinish"),
StoreMsg::AwakenRegions { .. } => write!(fmt, "AwakenRegions"),
#[cfg(any(test, feature = "testexport"))]
StoreMsg::Validate(_) => write!(fmt, "Validate config"),
}
}
}

impl<EK: KvEngine> StoreMsg<EK> {
    /// Returns a stable per-variant index for this message, used to
    /// bucket message counts (e.g. the `distribution` array that
    /// `handle_msgs` zips with `StoreMsg::VARIANTS` for the slow log).
    ///
    /// NOTE: the indices must follow the variant declaration order so
    /// they line up with the names reported by the `EnumVariantNames`
    /// derive; the test-only `Validate` variant is declared last for
    /// this reason.
    pub fn discriminant(&self) -> usize {
        match self {
            Self::RaftMessage(..) => 0,
            Self::StoreUnreachable { .. } => 1,
            Self::CompactedEvent(..) => 2,
            Self::ClearRegionSizeInRange { .. } => 3,
            Self::Tick(..) => 4,
            Self::Start { .. } => 5,
            Self::UpdateReplicationMode(..) => 6,
            Self::LatencyInspect { .. } => 7,
            Self::UnsafeRecoveryReport(..) => 8,
            Self::UnsafeRecoveryCreatePeer { .. } => 9,
            Self::GcSnapshotFinish => 10,
            Self::AwakenRegions { .. } => 11,
            #[cfg(any(test, feature = "testexport"))]
            Self::Validate(..) => 12,
        }
    }
}

0 comments on commit 1a2c88d

Please sign in to comment.