metrics: improve duration tracing to make it accurate
Signed-off-by: Liu Cong <innerr@gmail.com>
innerr committed Aug 28, 2020
1 parent 934d9d8 commit 04b6c13
Showing 11 changed files with 236 additions and 25 deletions.
11 changes: 4 additions & 7 deletions components/raftstore/src/store/fsm/apply.rs
@@ -3086,13 +3086,14 @@ where
apply_ctx: &mut ApplyContext<EK, W>,
msgs: &mut Vec<Msg<EK>>,
) {
let mut channel_timer = None;
let mut wait_time_recorded = false;
let mut drainer = msgs.drain(..);
loop {
match drainer.next() {
Some(Msg::Apply { start, apply }) => {
if channel_timer.is_none() {
channel_timer = Some(start);
if !wait_time_recorded {
APPLY_TASK_WAIT_TIME_HISTOGRAM.observe(duration_to_sec(start.elapsed()));
wait_time_recorded = true;
}
self.handle_apply(apply_ctx, apply);
if let Some(ref mut state) = self.delegate.yield_state {
@@ -3118,10 +3119,6 @@ where
None => break,
}
}
if let Some(timer) = channel_timer {
let elapsed = duration_to_sec(timer.elapsed());
APPLY_TASK_WAIT_TIME_HISTOGRAM.observe(elapsed);
}
}
}
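For context, not part of the diff: the old code fed APPLY_TASK_WAIT_TIME_HISTOGRAM once per drained batch, after the loop and therefore after handle_apply had already run, so the recorded "wait" also included apply time; the new code records it the moment the first Apply message is dequeued. A minimal std-only sketch of the difference (illustrative names, no tikv_util dependency):

    use std::time::Instant;

    // Old shape: the sample is taken after the batch is handled,
    // so it includes apply time on top of the channel wait.
    fn observed_wait_old(enqueued_at: Instant, handle_batch: impl FnOnce()) -> f64 {
        handle_batch();
        enqueued_at.elapsed().as_secs_f64()
    }

    // New shape: the sample is taken as soon as the first message is dequeued,
    // so it measures only the time spent waiting in the channel.
    fn observed_wait_new(enqueued_at: Instant, handle_batch: impl FnOnce()) -> f64 {
        let wait = enqueued_at.elapsed().as_secs_f64();
        handle_batch();
        wait
    }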

4 changes: 3 additions & 1 deletion src/server/gc_worker/gc_worker.rs
@@ -816,6 +816,7 @@ mod tests {
TestEngineBuilder, WriteData,
};
use crate::storage::lock_manager::DummyLockManager;
use crate::storage::metrics::CommandKind;
use crate::storage::{txn::commands, Engine, Storage, TestStorageBuilder};

use super::*;
@@ -880,6 +881,7 @@
ctx: &Context,
mut batch: WriteData,
callback: EngineCallback<()>,
tag: Option<CommandKind>,
) -> EngineResult<()> {
batch.modifies.iter_mut().for_each(|modify| match modify {
Modify::Delete(_, ref mut key) => {
@@ -893,7 +895,7 @@
*end_key = Key::from_encoded(keys::data_end_key(end_key.as_encoded()));
}
});
self.0.async_write(ctx, batch, callback)
self.0.async_write(ctx, batch, callback, tag)
}

fn async_snapshot(
26 changes: 25 additions & 1 deletion src/server/raftkv.rs
@@ -22,6 +22,7 @@ use crate::storage::kv::{
ErrorInner as KvErrorInner, Iterator as EngineIterator, Modify, ScanMode,
Snapshot as EngineSnapshot, WriteData,
};

use crate::storage::{self, kv};
use raftstore::errors::Error as RaftServerError;
use raftstore::router::RaftStoreRouter;
@@ -30,6 +31,10 @@ use raftstore::store::{RegionIterator, RegionSnapshot};
use tikv_util::time::Instant;
use tikv_util::time::ThreadReadId;

use crate::storage::metrics::CommandKind;
use crate::storage::metrics::ASYNC_WRITE_DURATIONS_VEC;
use crate::storage::metrics::PRE_ASYNC_WRITE_DURATIONS_VEC;

quick_error! {
#[derive(Debug)]
pub enum Error {
@@ -313,7 +318,14 @@ impl<S: RaftStoreRouter<RocksEngine>> Engine for RaftKv<S> {
write_modifies(&self.engine, modifies)
}

fn async_write(&self, ctx: &Context, batch: WriteData, cb: Callback<()>) -> kv::Result<()> {
fn async_write(
&self,
ctx: &Context,
batch: WriteData,
cb: Callback<()>,
tag: Option<CommandKind>,
) -> kv::Result<()> {
let pre_begin_instant = Instant::now_coarse();
fail_point!("raftkv_async_write");
if batch.modifies.is_empty() {
return Err(KvError::from(KvErrorInner::EmptyRequest));
@@ -356,6 +368,12 @@ impl<S: RaftStoreRouter<RocksEngine>> Engine for RaftKv<S> {
}

ASYNC_REQUESTS_COUNTER_VEC.write.all.inc();

if let Some(tag) = tag {
PRE_ASYNC_WRITE_DURATIONS_VEC
.get(tag)
.observe(pre_begin_instant.elapsed_secs());
}
let begin_instant = Instant::now_coarse();

self.exec_write_requests(
@@ -368,6 +386,12 @@ impl<S: RaftStoreRouter<RocksEngine>> Engine for RaftKv<S> {
ASYNC_REQUESTS_DURATIONS_VEC
.write
.observe(begin_instant.elapsed_secs());
if let Some(tag) = tag {
ASYNC_WRITE_DURATIONS_VEC
.get(tag)
.observe(begin_instant.elapsed_secs());
}

fail_point!("raftkv_async_write_finish");
cb((cb_ctx, Ok(())))
}
2 changes: 2 additions & 0 deletions src/storage/kv/btree_engine.rs
@@ -17,6 +17,7 @@ use crate::storage::kv::{
ErrorInner as EngineErrorInner, Iterator, Modify, Result as EngineResult, ScanMode, Snapshot,
WriteData,
};
use crate::storage::metrics::CommandKind;
use tikv_util::time::ThreadReadId;

type RwLockTree = RwLock<BTreeMap<Key, Value>>;
@@ -90,6 +91,7 @@ impl Engine for BTreeEngine {
_ctx: &Context,
batch: WriteData,
cb: EngineCallback<()>,
_tag: Option<CommandKind>,
) -> EngineResult<()> {
if batch.modifies.is_empty() {
return Err(EngineError::from(EngineErrorInner::EmptyRequest));
11 changes: 9 additions & 2 deletions src/storage/kv/mod.rs
@@ -11,6 +11,7 @@ use std::fmt;
use std::time::Duration;
use std::{error, ptr, result};

use crate::storage::metrics::CommandKind;
use engine_rocks::RocksTablePropertiesCollection;
use engine_traits::{CfName, CF_DEFAULT};
use engine_traits::{IterOptions, KvEngine as LocalEngine, ReadOptions};
@@ -111,11 +112,17 @@ pub trait Engine: Send + Clone + 'static {
cb: Callback<Self::Snap>,
) -> Result<()>;

fn async_write(&self, ctx: &Context, batch: WriteData, callback: Callback<()>) -> Result<()>;
fn async_write(
&self,
ctx: &Context,
batch: WriteData,
callback: Callback<()>,
tag: Option<CommandKind>,
) -> Result<()>;

fn write(&self, ctx: &Context, batch: WriteData) -> Result<()> {
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
match wait_op!(|cb| self.async_write(ctx, batch, cb), timeout) {
match wait_op!(|cb| self.async_write(ctx, batch, cb, None), timeout) {
Some((_, res)) => res,
None => Err(Error::from(ErrorInner::Timeout(timeout))),
}
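For context, not part of the diff: a caller that knows its command kind can pass Some(tag) so RaftKv records per-kind pre-write and write durations, while paths without a kind, such as the raw commands in src/storage/mod.rs further down, pass None. A hypothetical caller sketch; the import paths and the CommandKind::prewrite variant are assumptions based on this revision of the code:

    use crate::storage::kv::{Callback, Engine, Result, WriteData};
    use crate::storage::metrics::CommandKind;
    use kvproto::kvrpcpb::Context;

    // Hypothetical helper: forwards a write with an explicit command tag
    // so the per-kind duration histograms get a sample.
    fn tagged_async_write<E: Engine>(
        engine: &E,
        ctx: &Context,
        batch: WriteData,
        cb: Callback<()>,
    ) -> Result<()> {
        engine.async_write(ctx, batch, cb, Some(CommandKind::prewrite))
    }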
9 changes: 8 additions & 1 deletion src/storage/kv/rocksdb_engine.rs
@@ -20,6 +20,7 @@ use tempfile::{Builder, TempDir};
use txn_types::{Key, Value};

use crate::storage::config::BlockCacheConfig;
use crate::storage::metrics::CommandKind;
use tikv_util::escape;
use tikv_util::time::ThreadReadId;
use tikv_util::worker::{Runnable, Scheduler, Worker};
@@ -290,7 +291,13 @@ impl Engine for RocksEngine {
write_modifies(&self.engines.kv, modifies)
}

fn async_write(&self, _: &Context, batch: WriteData, cb: Callback<()>) -> Result<()> {
fn async_write(
&self,
_: &Context,
batch: WriteData,
cb: Callback<()>,
_tag: Option<CommandKind>,
) -> Result<()> {
fail_point!("rockskv_async_write", |_| Err(box_err!("write failed")));

if batch.modifies.is_empty() {
123 changes: 123 additions & 0 deletions src/storage/metrics.rs
@@ -213,6 +213,42 @@ make_auto_flush_static_metric! {
pub struct SchedCommandPriCounterVec: LocalIntCounter {
"priority" => CommandPriority,
}

pub struct SchedWaitDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct AsyncWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct PreAsyncWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedAsyncSnapshotDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedWaitForThreadDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedProcessBeforeWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedExecCallbackDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedPostHandleDurationVec: LocalHistogram {
"type" => CommandKind,
}

pub struct SchedPostWriteDurationVec: LocalHistogram {
"type" => CommandKind,
}
}

impl Into<GcKeysCF> for ServerGcKeysCF {
@@ -349,4 +385,91 @@ lazy_static! {
"Counter of request exceed bound"
)
.unwrap();
pub static ref SCHED_WAIT_HISTOGRAM: HistogramVec = register_histogram_vec!(
"tikv_scheduler_wait_for_process_duration_seconds",
"Bucketed histogram of duration of wait-for-process",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_WAIT_HISTOGRAM_VEC: SchedWaitDurationVec =
auto_flush_from!(SCHED_WAIT_HISTOGRAM, SchedWaitDurationVec);
pub static ref ASYNC_WRITE_DURATIONS_VEC: AsyncWriteDurationVec =
auto_flush_from!(ASYNC_WRITE_DURATIONS, AsyncWriteDurationVec);
pub static ref ASYNC_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_storage_engine_async_write_duration_seconds",
"Bucketed histogram of successful async-write requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref PRE_ASYNC_WRITE_DURATIONS_VEC: PreAsyncWriteDurationVec =
auto_flush_from!(PRE_ASYNC_WRITE_DURATIONS, PreAsyncWriteDurationVec);
pub static ref PRE_ASYNC_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_storage_engine_pre_async_write_duration_seconds",
"Bucketed histogram of duration of pre-async-write requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_ASYNC_SNAPSHOT_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_async_snapshot_duration_seconds",
"Bucketed histogram of scheduler before-process duration.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_ASYNC_SNAPSHOT_DURATIONS_VEC: SchedAsyncSnapshotDurationVec = auto_flush_from!(
SCHED_ASYNC_SNAPSHOT_DURATIONS,
SchedAsyncSnapshotDurationVec
);
pub static ref SCHED_WAIT_FOR_THREAD_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_wait_for_thread_duration_seconds",
"Bucketed histogram of scheduler wait-for-thread duration.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_WAIT_FOR_THREAD_DURATIONS_VEC: SchedWaitForThreadDurationVec = auto_flush_from!(
SCHED_WAIT_FOR_THREAD_DURATIONS,
SchedWaitForThreadDurationVec
);
pub static ref SCHED_PROCESS_BEFORE_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_process_before_write_duration_seconds",
"Bucketed histogram of scheduler process-before-write duration.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_PROCESS_BEFORE_WRITE_DURATIONS_VEC: SchedProcessBeforeWriteDurationVec = auto_flush_from!(
SCHED_PROCESS_BEFORE_WRITE_DURATIONS,
SchedProcessBeforeWriteDurationVec
);
pub static ref SCHED_EXEC_CALLBACK_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_exec_callback_duration_seconds",
"Bucketed histogram of scheduler executing callback.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_EXEC_CALLBACK_DURATIONS_VEC: SchedExecCallbackDurationVec =
auto_flush_from!(SCHED_EXEC_CALLBACK_DURATIONS, SchedExecCallbackDurationVec);
pub static ref SCHED_POST_HANDLE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_post_handle_duration_seconds",
"Bucketed histogram of scheduler post-handle processing requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_POST_HANDLE_DURATIONS_VEC: SchedPostHandleDurationVec =
auto_flush_from!(SCHED_POST_HANDLE_DURATIONS, SchedPostHandleDurationVec);
pub static ref SCHED_POST_WRITE_DURATIONS: HistogramVec = register_histogram_vec!(
"tikv_scheduler_post_write_duration_seconds",
"Bucketed histogram of scheduler post-write processing requests.",
&["type"],
exponential_buckets(0.0005, 2.0, 20).unwrap()
)
.unwrap();
pub static ref SCHED_POST_WRITE_DURATIONS_VEC: SchedPostWriteDurationVec =
auto_flush_from!(SCHED_POST_WRITE_DURATIONS, SchedPostWriteDurationVec);
}
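For context, not part of the diff: each new auto-flush vec is fed the same way the raftkv.rs hunk above feeds ASYNC_WRITE_DURATIONS_VEC, via get(tag).observe(seconds). A sketch of the intended call pattern; CommandKind::commit is an assumed variant and the helper name is illustrative:

    use crate::storage::metrics::{CommandKind, SCHED_POST_WRITE_DURATIONS_VEC};
    use tikv_util::time::Instant;

    // Time a closure and record the duration under the given command kind.
    fn record_post_write<T>(tag: CommandKind, work: impl FnOnce() -> T) -> T {
        let begin = Instant::now_coarse();
        let out = work();
        SCHED_POST_WRITE_DURATIONS_VEC
            .get(tag)
            .observe(begin.elapsed_secs());
        out
    }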
8 changes: 7 additions & 1 deletion src/storage/mod.rs
@@ -32,7 +32,7 @@ pub use self::{
};

use crate::read_pool::{ReadPool, ReadPoolHandle};
use crate::storage::metrics::CommandKind;
pub use crate::storage::metrics::CommandKind;
use crate::storage::{
config::Config,
kv::{with_tls_engine, Modify, WriteData},
@@ -716,6 +716,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(modifies),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.delete_range.inc();
Ok(())
@@ -940,6 +941,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
value,
)]),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_put.inc();
Ok(())
@@ -969,6 +971,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(modifies),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_batch_put.inc();
Ok(())
@@ -991,6 +994,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
Key::from_encoded(key),
)]),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_delete.inc();
Ok(())
@@ -1021,6 +1025,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(vec![Modify::DeleteRange(cf, start_key, end_key, false)]),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_delete_range.inc();
Ok(())
@@ -1045,6 +1050,7 @@ impl<E: Engine, L: LockManager> Storage<E, L> {
&ctx,
WriteData::from_modifies(modifies),
Box::new(|(_, res): (_, kv::Result<_>)| callback(res.map_err(Error::from))),
None,
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_batch_delete.inc();
Ok(())
