diff --git a/core/lib/storage/src/db.rs b/core/lib/storage/src/db.rs index be8a1665bbe..3280183abf9 100644 --- a/core/lib/storage/src/db.rs +++ b/core/lib/storage/src/db.rs @@ -203,6 +203,16 @@ impl RocksDBInner { } } +impl Drop for RocksDBInner { + fn drop(&mut self) { + tracing::debug!( + "Canceling background compactions / flushes for DB `{}`", + self.db_name + ); + self.db.cancel_all_background_work(true); + } +} + /// Configuration for retries when RocksDB writes are stalled. #[derive(Debug, Clone, Copy)] pub struct StalledWritesRetries { @@ -448,13 +458,21 @@ impl RocksDB { let raw_batch_bytes = raw_batch.data().to_vec(); let mut retries = self.stalled_writes_retries.intervals(); + let mut stalled_write_reported = false; + let started_at = Instant::now(); loop { match self.write_inner(raw_batch) { - Ok(()) => return Ok(()), + Ok(()) => { + if stalled_write_reported { + METRICS.observe_stalled_write_duration(CF::DB_NAME, started_at.elapsed()); + } + return Ok(()); + } Err(err) => { let is_stalled_write = StalledWritesRetries::is_write_stall_error(&err); - if is_stalled_write { - METRICS.report_stalled_write(CF::DB_NAME); + if is_stalled_write && !stalled_write_reported { + METRICS.observe_stalled_write(CF::DB_NAME); + stalled_write_reported = true; } else { return Err(err); } @@ -552,12 +570,6 @@ impl RocksDB<()> { } } -impl Drop for RocksDB { - fn drop(&mut self) { - self.inner.db.cancel_all_background_work(true); - } -} - /// Empty struct used to register rocksdb instance #[derive(Debug)] struct RegistryEntry; diff --git a/core/lib/storage/src/metrics.rs b/core/lib/storage/src/metrics.rs index a8f4fb1e7b4..0c26bd749d5 100644 --- a/core/lib/storage/src/metrics.rs +++ b/core/lib/storage/src/metrics.rs @@ -6,6 +6,7 @@ use vise::{Buckets, Collector, Counter, EncodeLabelSet, Family, Gauge, Histogram use std::{ collections::HashMap, sync::{Mutex, Weak}, + time::Duration, }; use crate::db::RocksDBInner; @@ -41,8 +42,15 @@ pub(crate) struct RocksdbMetrics { /// Size of a serialized `WriteBatch` written to a RocksDB instance. #[metrics(buckets = BYTE_SIZE_BUCKETS)] write_batch_size: Family>, - /// Number of stalled writes for a RocksDB instance. + /// Number of independent stalled writes for a RocksDB instance. + // The counter is similar for the counter in `stalled_write_duration` histogram, but is reported earlier + // (immediately when stalled write is encountered, rather than when it's resolved). write_stalled: Family, + /// Total duration of a stalled writes instance for a RocksDB instance. Naturally, this only reports + /// stalled writes that were resolved in time (otherwise, the stall error is propagated, which + /// leads to a panic). + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + stalled_write_duration: Family>, } impl RocksdbMetrics { @@ -50,9 +58,17 @@ impl RocksdbMetrics { self.write_batch_size[&db.into()].observe(batch_size); } - pub(crate) fn report_stalled_write(&self, db: &'static str) { + pub(crate) fn observe_stalled_write(&self, db: &'static str) { self.write_stalled[&db.into()].inc(); } + + pub(crate) fn observe_stalled_write_duration( + &self, + db: &'static str, + stall_duration: Duration, + ) { + self.stalled_write_duration[&db.into()].observe(stall_duration); + } } #[vise::register]