Skip to content

Commit

Permalink
metrics: Add cache_name label
Browse files Browse the repository at this point in the history
This commit adds a new `cache_name` label to a number of domain-related
metrics. This will allow us to group by the cache name when plotting
these metrics, making it much easier to determine whether the domains in
a cache are experiencing congestion due to slow replays.

Release-Note-Core: Added a new `cache_name` label to various metrics to
  improve debuggability
Change-Id: I44a4fd6760704ac0b9175990df745e2cc63a7325
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6850
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
ethan-readyset committed Feb 6, 2024
1 parent b08683d commit 511d6bc
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 70 deletions.
10 changes: 10 additions & 0 deletions readyset-client/src/metrics/mod.rs
Expand Up @@ -32,6 +32,7 @@ pub mod recorded {
/// | shard | The shard the replay miss is recorded in |
/// | miss_in | The LocalNodeIndex of the data flow node where the miss occurred |
/// | needed_for | The client tag of the request that the replay is required for. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_REPLAY_MISSES: &str = "readyset_domain.replay_misses";

/// Histogram: The time in microseconds that a domain spends
Expand Down Expand Up @@ -67,6 +68,7 @@ pub mod recorded {
/// | domain | The index of the domain the replay miss is recorded in. |
/// | shard | The shard the replay miss is recorded in. |
/// | tag | The client tag of the request that the replay is required for. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_REPLAY_TIME: &str = "readyset_domain.handle_replay_time";

/// Counter: The total time in microseconds that a domain spends
Expand All @@ -78,6 +80,7 @@ pub mod recorded {
/// | domain | The index of the domain the replay miss is recorded in. |
/// | shard | The shard the replay miss is recorded in. |
/// | tag | The client tag of the request that the replay is required for. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_TOTAL_REPLAY_TIME: &str = "readyset_domain.total_handle_replay_time";

/// Histogram: The time in microseconds spent handling a reader replay
Expand All @@ -90,6 +93,7 @@ pub mod recorded {
/// | domain | The index of the domain the reader replay request is recorded in. |
/// | shard | The shard the reader replay request is recorded in. |
/// | node | The LocalNodeIndex of the reader node handling the packet. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_READER_REPLAY_REQUEST_TIME: &str =
"readyset_domain.reader_replay_request_time_us";

Expand All @@ -103,6 +107,7 @@ pub mod recorded {
/// | domain | The index of the domain the reader replay request is recorded in. |
/// | shard | The shard the reader replay request is recorded in. |
/// | node | The LocalNodeIndex of the reader node handling the packet. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_READER_TOTAL_REPLAY_REQUEST_TIME: &str =
"readyset_domain.reader_total_replay_request_time_us";

Expand All @@ -115,6 +120,7 @@ pub mod recorded {
/// | domain | The index of the domain the replay request is recorded in. |
/// | shard | The shard the replay request is recorded in. |
/// | tag | The client tag of the request that the replay is required for. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_SEED_REPLAY_TIME: &str = "readyset_domain.seed_replay_time_us";

/// Counter: The total time in microseconds that a domain spends
Expand All @@ -126,6 +132,7 @@ pub mod recorded {
/// | domain | The index of the domain the replay request is recorded in. |
/// | shard | The shard the replay request is recorded in. |
/// | tag | The client tag of the request that the replay is required for. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_TOTAL_SEED_REPLAY_TIME: &str = "readyset_domain.total_seed_replay_time_us";

/// Histogram: The time in microseconds that a domain spawning a state
Expand Down Expand Up @@ -186,6 +193,7 @@ pub mod recorded {
/// | domain | The index of the domain the replay request is recorded in. |
/// | shard | The shard the replay request is recorded in. |
/// | tag | The client tag of the request that the Finish packet is required for. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_FINISH_REPLAY_TIME: &str = "readyset_domain.finish_replay_time_us";

/// Counter: The total time in microseconds that a domain spends
Expand All @@ -197,6 +205,7 @@ pub mod recorded {
/// | domain | The index of the domain the replay request is recorded in. |
/// | shard | The shard the replay request is recorded in. |
/// | tag | The client tag of the request that the Finish packet is required for. |
/// | cache_name | The name of the cache associated with this replay. |
pub const DOMAIN_TOTAL_FINISH_REPLAY_TIME: &str = "readyset_domain.total_finish_replay_time_us";

/// Histogram: The amount of time spent handling an eviction
Expand Down Expand Up @@ -280,6 +289,7 @@ pub mod recorded {
/// | table_name | The name of the base table. |
/// | shard | The shard of the base table the lookup is requested in. |
/// | node | The LocalNodeIndex of the base table node handling the packet. |
/// | cache_name | The name of the cache associated with this replay. |
pub const BASE_TABLE_LOOKUP_REQUESTS: &str = "readyset_base_table.lookup_requests";

/// Counter: The number of packets dropped by an egress node.
Expand Down
17 changes: 9 additions & 8 deletions readyset-dataflow/src/backlog/mod.rs
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use ahash::RandomState;
use common::SizeOf;
use dataflow_expression::{PostLookup, ReaderProcessing};
use nom_sql::Relation;
use reader_map::{EvictionQuantity, EvictionStrategy};
use readyset_client::consistency::Timestamp;
use readyset_client::results::SharedResults;
Expand All @@ -23,7 +24,7 @@ pub type ReaderUpdatedNotifier = tokio::sync::broadcast::Receiver<ReaderNotifica
pub(crate) type ReaderUpdatedSender = tokio::sync::broadcast::Sender<ReaderNotification>;

pub(crate) trait Trigger =
Fn(&mut dyn Iterator<Item = KeyComparison>) -> bool + 'static + Send + Sync;
Fn(&mut dyn Iterator<Item = KeyComparison>, Relation) -> bool + 'static + Send + Sync;

/// Allocate a new end-user facing result table.
///
Expand Down Expand Up @@ -499,7 +500,7 @@ impl std::fmt::Debug for SingleReadHandle {

impl SingleReadHandle {
/// Trigger a replay of a missing key from a partially materialized view.
pub fn trigger<I>(&self, keys: I) -> bool
pub fn trigger<I>(&self, keys: I, name: Relation) -> bool
where
I: Iterator<Item = KeyComparison>,
{
Expand All @@ -511,7 +512,7 @@ impl SingleReadHandle {
let mut it = keys;

// trigger a replay to populate
(*self.trigger.as_ref().unwrap())(&mut it)
(*self.trigger.as_ref().unwrap())(&mut it, name)
}

/// Returns None if this handle is not ready, Some(true) if this handle fully contains the given
Expand Down Expand Up @@ -749,7 +750,7 @@ mod tests {
let (r, mut w) = new_partial(
1,
Index::hash_map(vec![0]),
|_: &mut dyn Iterator<Item = KeyComparison>| true,
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
);
Expand All @@ -774,7 +775,7 @@ mod tests {
let (r, mut w) = new_partial(
1,
Index::hash_map(vec![0]),
|_: &mut dyn Iterator<Item = KeyComparison>| true,
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
);
Expand All @@ -793,7 +794,7 @@ mod tests {
let (r, mut w) = new_partial(
1,
Index::btree_map(vec![0]),
|_: &mut dyn Iterator<Item = KeyComparison>| true,
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
);
Expand Down Expand Up @@ -823,7 +824,7 @@ mod tests {
let (r, mut w) = new_partial(
1,
Index::btree_map(vec![0]),
|_: &mut dyn Iterator<Item = KeyComparison>| true,
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
);
Expand All @@ -844,7 +845,7 @@ mod tests {
let (r, mut w) = new_partial(
1,
Index::btree_map(vec![0]),
|_: &mut dyn Iterator<Item = KeyComparison>| true,
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
);
Expand Down
63 changes: 48 additions & 15 deletions readyset-dataflow/src/domain/domain_metrics.rs
Expand Up @@ -11,11 +11,11 @@ use metrics::{
register_counter, register_gauge, register_histogram, Counter, Gauge, Histogram, Label,
SharedString,
};
use readyset_client::internal::ReplicaAddress;
use nom_sql::Relation;
use readyset_client::metrics::recorded;
use strum::{EnumCount, IntoEnumIterator};

use crate::domain::{LocalNodeIndex, Tag};
use crate::domain::{LocalNodeIndex, ReplicaAddress, Tag};
use crate::{NodeMap, Packet, PacketDiscriminants};

/// Contains handles to the various metrics collected for a domain.
Expand Down Expand Up @@ -165,7 +165,7 @@ impl DomainMetrics {
}
}

pub(super) fn rec_replay_time(&mut self, tag: Tag, time: Duration) {
pub(super) fn rec_replay_time(&mut self, tag: Tag, cache_name: &Relation, time: Duration) {
if let Some((ctr, histo)) = self.total_replay_time.get(&tag) {
ctr.increment(time.as_micros() as u64);
histo.record(time.as_micros() as f64);
Expand All @@ -174,14 +174,16 @@ impl DomainMetrics {
recorded::DOMAIN_TOTAL_REPLAY_TIME,
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"tag" => tag.to_string()
"tag" => tag.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

let histo = register_histogram!(
recorded::DOMAIN_REPLAY_TIME,
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"tag" => tag.to_string()
"tag" => tag.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

ctr.increment(time.as_micros() as u64);
Expand All @@ -191,7 +193,7 @@ impl DomainMetrics {
}
}

pub(super) fn rec_seed_replay_time(&mut self, tag: Tag, time: Duration) {
pub(super) fn rec_seed_replay_time(&mut self, tag: Tag, cache_name: &Relation, time: Duration) {
if let Some((ctr, histo)) = self.seed_replay_time.get(&tag) {
ctr.increment(time.as_micros() as u64);
histo.record(time.as_micros() as f64);
Expand All @@ -200,14 +202,16 @@ impl DomainMetrics {
recorded::DOMAIN_TOTAL_SEED_REPLAY_TIME,
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"tag" => tag.to_string()
"tag" => tag.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

let histo = register_histogram!(
recorded::DOMAIN_SEED_REPLAY_TIME,
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"tag" => tag.to_string()
"tag" => tag.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

ctr.increment(time.as_micros() as u64);
Expand All @@ -217,7 +221,12 @@ impl DomainMetrics {
}
}

pub(super) fn rec_finish_replay_time(&mut self, tag: Tag, time: Duration) {
pub(super) fn rec_finish_replay_time(
&mut self,
tag: Tag,
cache_name: &Relation,
time: Duration,
) {
if let Some((ctr, histo)) = self.finish_replay_time.get(&tag) {
ctr.increment(time.as_micros() as u64);
histo.record(time.as_micros() as f64);
Expand All @@ -226,14 +235,16 @@ impl DomainMetrics {
recorded::DOMAIN_TOTAL_FINISH_REPLAY_TIME,
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"tag" => tag.to_string()
"tag" => tag.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

let histo = register_histogram!(
recorded::DOMAIN_FINISH_REPLAY_TIME,
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"tag" => tag.to_string()
"tag" => tag.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

ctr.increment(time.as_micros() as u64);
Expand Down Expand Up @@ -276,7 +287,12 @@ impl DomainMetrics {
}
}

pub(super) fn rec_reader_replay_time(&mut self, node: LocalNodeIndex, time: Duration) {
pub(super) fn rec_reader_replay_time(
&mut self,
node: LocalNodeIndex,
cache_name: &Relation,
time: Duration,
) {
if let Some((ctr, histo)) = self.reader_replay_request_time.get(node) {
ctr.increment(time.as_micros() as u64);
histo.record(time.as_micros() as f64);
Expand All @@ -286,13 +302,15 @@ impl DomainMetrics {
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"node" => node.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

let histo = register_histogram!(
recorded::DOMAIN_READER_REPLAY_REQUEST_TIME,
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"node" => node.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

ctr.increment(time.as_micros() as u64);
Expand All @@ -302,7 +320,13 @@ impl DomainMetrics {
}
}

pub(super) fn inc_replay_misses(&mut self, miss_in: LocalNodeIndex, needed_for: Tag, n: usize) {
pub(super) fn inc_replay_misses(
&mut self,
miss_in: LocalNodeIndex,
needed_for: Tag,
cache_name: &Relation,
n: usize,
) {
if let Some(ctr) = self.replay_misses.get(&(miss_in, needed_for)) {
ctr.increment(n as u64);
} else {
Expand All @@ -311,7 +335,8 @@ impl DomainMetrics {
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"miss_in" => miss_in.id().to_string(),
"needed_for" => needed_for.to_string()
"needed_for" => needed_for.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);

ctr.increment(n as u64);
Expand All @@ -324,7 +349,7 @@ impl DomainMetrics {
self.packets_sent[discriminant as usize].increment(1);
}

pub(super) fn inc_base_table_lookups(&mut self, node: LocalNodeIndex) {
pub(super) fn inc_base_table_lookups(&mut self, node: LocalNodeIndex, cache_name: &Relation) {
if let Some(ctr) = self.base_table_lookups.get(node) {
ctr.increment(1);
} else {
Expand All @@ -333,6 +358,7 @@ impl DomainMetrics {
"domain" => self.index.clone(),
"shard" => self.shard.clone(),
"node" => node.to_string(),
"cache_name" => cache_name_to_string(cache_name)
);
ctr.increment(1);
self.base_table_lookups.insert(node, ctr);
Expand All @@ -359,3 +385,10 @@ impl DomainMetrics {
}
}
}

/// Renders `cache_name` as a plain, unquoted string suitable for use as a
/// metric label value (via `display_unquoted()`).
///
/// NOTE: this is intended solely for building metric labels; anything
/// user-facing should go through the normal (quoted) display path.
fn cache_name_to_string(cache_name: &Relation) -> String {
    let unquoted = cache_name.display_unquoted();
    unquoted.to_string()
}

0 comments on commit 511d6bc

Please sign in to comment.