metrics: Accurate duration tracing of storage/scheduler message handling #8403

Closed · wants to merge 1 commit
11 changes: 4 additions & 7 deletions components/raftstore/src/store/fsm/apply.rs
@@ -3086,13 +3086,14 @@ where
apply_ctx: &mut ApplyContext<EK, W>,
msgs: &mut Vec<Msg<EK>>,
) {
- let mut channel_timer = None;
+ let mut wait_time_recorded = false;
let mut drainer = msgs.drain(..);
loop {
match drainer.next() {
Some(Msg::Apply { start, apply }) => {
- if channel_timer.is_none() {
- channel_timer = Some(start);
+ if !wait_time_recorded {
+ APPLY_TASK_WAIT_TIME_HISTOGRAM.observe(duration_to_sec(start.elapsed()));
+ wait_time_recorded = true;
}
self.handle_apply(apply_ctx, apply);
if let Some(ref mut state) = self.delegate.yield_state {
@@ -3118,10 +3119,6 @@ where
None => break,
}
}
- if let Some(timer) = channel_timer {
- let elapsed = duration_to_sec(timer.elapsed());
- APPLY_TASK_WAIT_TIME_HISTOGRAM.observe(elapsed);
- }
}
}
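This hunk is the behavioral fix the PR title refers to: the old code stashed the first message's `start` in `channel_timer` and only fed the histogram after the whole drain loop had finished, so the recorded "wait" also included the time spent in `handle_apply` for every message in the batch. The new code observes `start.elapsed()` the moment the first `Apply` message is dequeued, which is pure channel wait. A minimal sketch of the corrected pattern, with illustrative names rather than TiKV's:

```rust
use std::time::Instant;

struct Task {
    enqueued_at: Instant, // stamped by the sender when the task is queued
}

fn drain_and_handle(tasks: Vec<Task>, observe_wait_secs: impl Fn(f64)) {
    let mut wait_time_recorded = false;
    for task in tasks {
        if !wait_time_recorded {
            // Observed here, the elapsed time is pure queue wait; observing
            // after the loop would fold all handling time into the "wait" metric.
            observe_wait_secs(task.enqueued_at.elapsed().as_secs_f64());
            wait_time_recorded = true;
        }
        // ... handle the task ...
    }
}
```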

4 changes: 3 additions & 1 deletion src/server/gc_worker/gc_worker.rs
@@ -816,6 +816,7 @@ mod tests {
TestEngineBuilder, WriteData,
};
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::metrics::CommandKind;
use crate::storage::{txn::commands, Engine, Storage, TestStorageBuilder};

use super::*;
@@ -880,6 +881,7 @@
ctx: &Context,
mut batch: WriteData,
callback: EngineCallback<()>,
tag: Option<CommandKind>,
) -> EngineResult<()> {
batch.modifies.iter_mut().for_each(|modify| match modify {
Modify::Delete(_, ref mut key) => {
@@ -893,7 +895,7 @@
*end_key = Key::from_encoded(keys::data_end_key(end_key.as_encoded()));
}
});
- self.0.async_write(ctx, batch, callback)
+ self.0.async_write(ctx, batch, callback, tag)
}

fn async_snapshot(
26 changes: 25 additions & 1 deletion src/server/raftkv.rs
@@ -22,6 +22,7 @@ use crate::storage::kv::{
ErrorInner as KvErrorInner, Iterator as EngineIterator, Modify, ScanMode,
Snapshot as EngineSnapshot, WriteData,
};

use crate::storage::{self, kv};
use raftstore::errors::Error as RaftServerError;
use raftstore::router::RaftStoreRouter;
@@ -30,6 +31,10 @@ use raftstore::store::{RegionIterator, RegionSnapshot};
use tikv_util::time::Instant;
use tikv_util::time::ThreadReadId;

use crate::storage::metrics::CommandKind;
use crate::storage::metrics::ASYNC_WRITE_DURATIONS_VEC;
use crate::storage::metrics::PRE_ASYNC_WRITE_DURATIONS_VEC;

quick_error! {
#[derive(Debug)]
pub enum Error {
@@ -313,7 +318,14 @@ impl<S: RaftStoreRouter<RocksEngine>> Engine for RaftKv<S> {
write_modifies(&self.engine, modifies)
}

- fn async_write(&self, ctx: &Context, batch: WriteData, cb: Callback<()>) -> kv::Result<()> {
+ fn async_write(
+ &self,
+ ctx: &Context,
+ batch: WriteData,
+ cb: Callback<()>,
+ tag: Option<CommandKind>,
+ ) -> kv::Result<()> {
+ let pre_begin_instant = Instant::now_coarse();
fail_point!("raftkv_async_write");
if batch.modifies.is_empty() {
return Err(KvError::from(KvErrorInner::EmptyRequest));
@@ -356,6 +368,12 @@ impl<S: RaftStoreRouter<RocksEngine>> Engine for RaftKv<S> {
}

ASYNC_REQUESTS_COUNTER_VEC.write.all.inc();

if let Some(tag) = tag {
PRE_ASYNC_WRITE_DURATIONS_VEC
.get(tag)
.observe(pre_begin_instant.elapsed_secs());
}
let begin_instant = Instant::now_coarse();

self.exec_write_requests(
@@ -368,6 +386,12 @@ impl<S: RaftStoreRouter<RocksEngine>> Engine for RaftKv<S> {
ASYNC_REQUESTS_DURATIONS_VEC
.write
.observe(begin_instant.elapsed_secs());
if let Some(tag) = tag {
ASYNC_WRITE_DURATIONS_VEC
.get(tag)
.observe(begin_instant.elapsed_secs());

Member:
Can the value calculated at L387 be reused? Or how about just keeping one?

Contributor Author:
L387 doesn't record the duration by operation type (prewrite, commit, etc.), which is why we need a new metric here.

The idea of removing the original metric crossed my mind, but it's not nice to simply remove it because some panels depend on it. We could keep both until the new panel fully replaces the original one and the original metric is no longer needed.

Member:
How about reusing the metrics and adding extra dimensions?

Contributor Author (@innerr, Aug 27, 2020):
That will also make the original panel malfunction.

Member:
Why? I think Prometheus is OK to be queried with fewer dimensions.

Contributor Author:
> Why? I think Prometheus is OK to be queried with fewer dimensions.

That requires rewriting the PromQL; until that happens, the panel will show errors.

What I mean to do is two steps (the first step may last for a while and span more than one PR, which is why I split it into two):
1. Improve the metrics, as in this PR, without touching any old metrics, so the old panels keep working.
2. Improve the panels, and remove the old metrics at the same time.

}

fail_point!("raftkv_async_write_finish");
cb((cb_ctx, Ok(())))
}
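This change brackets `async_write` with two coarse timestamps: `pre_begin_instant` measures the preparation work before the write is handed to raft, and `begin_instant` measures the raft write itself, observed inside the completion callback. On the review question above: the same `begin_instant.elapsed_secs()` is indeed computed twice, once for the untagged histogram and once for the tagged one. A sketch of the hoist the reviewer suggests (not the code in this PR):

```rust
// Compute the elapsed time once and feed both the untagged
// and the tagged histogram.
let elapsed = begin_instant.elapsed_secs();
ASYNC_REQUESTS_DURATIONS_VEC.write.observe(elapsed);
if let Some(tag) = tag {
    ASYNC_WRITE_DURATIONS_VEC.get(tag).observe(elapsed);
}
```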
2 changes: 2 additions & 0 deletions src/storage/kv/btree_engine.rs
@@ -17,6 +17,7 @@ use crate::storage::kv::{
ErrorInner as EngineErrorInner, Iterator, Modify, Result as EngineResult, ScanMode, Snapshot,
WriteData,
};
use crate::storage::metrics::CommandKind;
use tikv_util::time::ThreadReadId;

type RwLockTree = RwLock<BTreeMap<Key, Value>>;
@@ -90,6 +91,7 @@ impl Engine for BTreeEngine {
_ctx: &Context,
batch: WriteData,
cb: EngineCallback<()>,
_tag: Option<CommandKind>,
) -> EngineResult<()> {
if batch.modifies.is_empty() {
return Err(EngineError::from(EngineErrorInner::EmptyRequest));
11 changes: 9 additions & 2 deletions src/storage/kv/mod.rs
@@ -11,6 +11,7 @@ use std::fmt;
use std::time::Duration;
use std::{error, ptr, result};

use crate::storage::metrics::CommandKind;
use engine_rocks::RocksTablePropertiesCollection;
use engine_traits::{CfName, CF_DEFAULT};
use engine_traits::{IterOptions, KvEngine as LocalEngine, ReadOptions};
@@ -111,11 +112,17 @@ pub trait Engine: Send + Clone + 'static {
cb: Callback<Self::Snap>,
) -> Result<()>;

- fn async_write(&self, ctx: &Context, batch: WriteData, callback: Callback<()>) -> Result<()>;
+ fn async_write(
+ &self,
+ ctx: &Context,
+ batch: WriteData,
+ callback: Callback<()>,
+ tag: Option<CommandKind>,
+ ) -> Result<()>;

fn write(&self, ctx: &Context, batch: WriteData) -> Result<()> {
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
- match wait_op!(|cb| self.async_write(ctx, batch, cb), timeout) {
+ match wait_op!(|cb| self.async_write(ctx, batch, cb, None), timeout) {
Some((_, res)) => res,
None => Err(Error::from(ErrorInner::Timeout(timeout))),
}
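With the widened trait signature, call sites that know which command they are serving can thread a `CommandKind` through so the write lands in the per-kind histograms; generic callers, like the blocking `write` wrapper above, pass `None`. A hedged sketch of a tagged call site, mirroring the callback shape used in src/storage/mod.rs below (the `prewrite` variant name is assumed for illustration):

```rust
// Sketch only: a caller that knows its command kind tags the write.
engine.async_write(
    &ctx,
    batch,
    Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
    Some(CommandKind::prewrite), // pass None when the kind is unknown
)?;
```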
9 changes: 8 additions & 1 deletion src/storage/kv/rocksdb_engine.rs
@@ -20,6 +20,7 @@ use tempfile::{Builder, TempDir};
use txn_types::{Key, Value};

use crate::storage::config::BlockCacheConfig;
use crate::storage::metrics::CommandKind;
use tikv_util::escape;
use tikv_util::time::ThreadReadId;
use tikv_util::worker::{Runnable, Scheduler, Worker};
@@ -290,7 +291,13 @@ impl Engine for RocksEngine {
write_modifies(&self.engines.kv, modifies)
}

- fn async_write(&self, _: &Context, batch: WriteData, cb: Callback<()>) -> Result<()> {
+ fn async_write(
+ &self,
+ _: &Context,
+ batch: WriteData,
+ cb: Callback<()>,
+ _tag: Option<CommandKind>,
+ ) -> Result<()> {
fail_point!("rockskv_async_write", |_| Err(box_err!("write failed")));

if batch.modifies.is_empty() {
123 changes: 123 additions & 0 deletions src/storage/metrics.rs
@@ -213,6 +213,42 @@ make_auto_flush_static_metric! {
pub struct SchedCommandPriCounterVec: LocalIntCounter {
"priority" => CommandPriority,
}

pub struct SchedWaitDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct AsyncWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct PreAsyncWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedAsyncSnapshotDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedWaitForThreadDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedProcessBeforeWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedExecCallbackDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedPostHandleDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedPostWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}
}

impl Into<GcKeysCF> for ServerGcKeysCF {
@@ -349,4 +385,91 @@ lazy_static! {
"Counter of request exceed bound"
)
.unwrap();
pub static ref SCHED_WAIT_HISTOGRAM: HistogramVec = register_histogram_vec!(
"tikv_scheduler_wait_for_process_duration_seconds",
"Bucketed histogram of duration of wait-for-process",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_WAIT_HISTOGRAM_VEC: SchedWaitDurationVec =
auto_flush_from!(SCHED_WAIT_HISTOGRAM, SchedWaitDurationVec);
pub static ref ASYNC_WRITE_DURATIONS_VEC: AsyncWriteDurationVec =
auto_flush_from!(ASYNC_WRITE_DURATIONS, AsyncWriteDurationVec);
pub static ref ASYNC_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_storage_engine_async_write_duration_seconds",
"Bucketed histogram of successful async-write requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref PRE_ASYNC_WRITE_DURATIONS_VEC: PreAsyncWriteDurationVec =
auto_flush_from!(PRE_ASYNC_WRITE_DURATIONS, PreAsyncWriteDurationVec);
pub static ref PRE_ASYNC_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_storage_engine_pre_async_write_duration_seconds",
"Bucketed histogram of duration of pre-async-write requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_ASYNC_SNAPSHOT_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_async_snapshot_duration_seconds",
"Bucketed histogram of scheduler before-process duration.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_ASYNC_SNAPSHOT_DURATIONS_VEC: SchedAsyncSnapshotDurationVec = auto_flush_from!(
SCHED_ASYNC_SNAPSHOT_DURATIONS,
SchedAsyncSnapshotDurationVec
);
pub static ref SCHED_WAIT_FOR_THREAD_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_wait_for_thread_duration_seconds",
"Bucketed histogram of scheduler wait-for-thread duration.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_WAIT_FOR_THREAD_DURATIONS_VEC: SchedWaitForThreadDurationVec = auto_flush_from!(
SCHED_WAIT_FOR_THREAD_DURATIONS,
SchedWaitForThreadDurationVec
);
pub static ref SCHED_PROCESS_BEFORE_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_process_before_write_duration_seconds",
"Bucketed histogram of scheduler process-before-write duration.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_PROCESS_BEFORE_WRITE_DURATIONS_VEC: SchedProcessBeforeWriteDurationVec = auto_flush_from!(
SCHED_PROCESS_BEFORE_WRITE_DURATIONS,
SchedProcessBeforeWriteDurationVec
);
pub static ref SCHED_EXEC_CALLBACK_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_exec_callback_duration_seconds",
"Bucketed histogram of scheduler executing callback.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_EXEC_CALLBACK_DURATIONS_VEC: SchedExecCallbackDurationVec =
auto_flush_from!(SCHED_EXEC_CALLBACK_DURATIONS, SchedExecCallbackDurationVec);
pub static ref SCHED_POST_HANDLE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_post_handle_duration_seconds",
"Bucketed histogram of scheduler post-handle processing requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_POST_HANDLE_DURATIONS_VEC: SchedPostHandleDurationVec =
auto_flush_from!(SCHED_POST_HANDLE_DURATIONS, SchedPostHandleDurationVec);
pub static ref SCHED_POST_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_post_write_duration_seconds",
"Bucketed histogram of scheduler post-write processing requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_POST_WRITE_DURATIONS_VEC: SchedPostWriteDurationVec =
auto_flush_from!(SCHED_POST_WRITE_DURATIONS, SchedPostWriteDurationVec);
}
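Each new metric comes as a pair: a plain `HistogramVec` registered under its Prometheus name, plus an auto-flush static wrapper generated above by `make_auto_flush_static_metric!`, keyed by `CommandKind` to avoid label lookups on the hot path. All share the same buckets, starting at 0.5 ms and doubling 20 times. A hedged sketch of how such a pair is used (the `commit` variant name is assumed for illustration):

```rust
// Hedged sketch: observing a tagged scheduler wait duration.
// Assumes this sits in a module that can see the metrics above.
use tikv_util::time::Instant;

fn observe_sched_wait(tag: CommandKind) {
    let begin = Instant::now_coarse();
    // ... the scheduler work being measured ...
    SCHED_WAIT_HISTOGRAM_VEC
        .get(tag) // auto-flush vec, same .get(tag) pattern as in raftkv.rs above
        .observe(begin.elapsed_secs());
}
```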
8 changes: 7 additions & 1 deletion src/storage/mod.rs
@@ -32,7 +32,7 @@ pub use self::{
};

use crate::read_pool::{ReadPool, ReadPoolHandle};
- use crate::storage::metrics::CommandKind;
+ pub use crate::storage::metrics::CommandKind;
use crate::storage::{
config::Config,
kv::{with_tls_engine, Modify, WriteData},
@@ -716,6 +716,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(modifies),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.delete_range.inc();
Ok(())
@@ -940,6 +941,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
value,
)]),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_put.inc();
Ok(())
@@ -969,6 +971,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(modifies),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_batch_put.inc();
Ok(())
@@ -991,6 +994,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
Key::from_encoded(key),
)]),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_delete.inc();
Ok(())
@@ -1021,6 +1025,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(vec![Modify::DeleteRange(cf, start_key, end_key, false)]),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_delete_range.inc();
Ok(())
@@ -1045,6 +1050,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(modifies),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_batch_delete.inc();
Ok(())