metrics: Update node state metrics
Previously, we were emitting metrics for base table size, node state
size, partial state size, and reader state size using the node index,
domain, and shard as labels. These values have high cardinality, which
means there is a high cost associated with using these labels.

This commit does two things:
1. Removes the metrics that were being used to track node state and node
   partial state, since we cannot emit gauge metrics on a per-node basis
   without labeling the metrics with the node index; and
2. Updates the base table state size and reader state size gauge metrics
   to use the table name and reader node name, respectively, as labels.

Release-Note-Core: Simplified the metrics Readyset emits to keep track
  of internal state size
Change-Id: I3b7c589d73f3fa4bad7f8c2e69a403a3cbb34f2d
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6851
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
ethan-readyset committed Feb 6, 2024
1 parent 511d6bc commit f0d1228
Showing 4 changed files with 32 additions and 92 deletions.
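For illustration, the cardinality problem described in the commit message comes from putting per-node and per-domain identifiers into metric labels: every distinct label combination becomes its own time series, so the series count grows with the size of the dataflow graph. The sketch below contrasts the removed labeling scheme with the new one, using the same pre-0.22 gauge!(name, value, labels) macro form that appears in the diff; it is a condensed illustration rather than the exact removed code, and the two helper functions are hypothetical.

use metrics::gauge;

// Removed scheme (condensed): gauges labeled by node index, domain, and shard.
// Each distinct (node, domain, shard) combination is a separate time series,
// so cardinality scales with the number of dataflow nodes.
fn report_node_state_size(node_index: u64, domain: u64, shard: u64, bytes: u64) {
    gauge!(
        "readyset_node_state_size_bytes",
        bytes as f64,
        "node" => node_index.to_string(),
        "domain" => domain.to_string(),
        "shard" => shard.to_string(),
    );
}

// New scheme: one gauge per base table (and, analogously, one per reader node),
// labeled with the table name instead of internal indices.
fn report_base_table_size(table_name: &str, bytes: u64) {
    gauge!(
        "readyset_base_tables_estimated_size_bytes",
        bytes as f64,
        "table_name" => table_name.to_string(),
    );
}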
27 changes: 5 additions & 22 deletions readyset-client/src/metrics/mod.rs
@@ -238,38 +238,21 @@ pub mod recorded {
     /// decision and sending packets.
     pub const EVICTION_WORKER_EVICTION_TIME: &str = "readyset_eviction_worker.eviction_time_us";
 
-    /// Gauge: The amount of bytes required to store a dataflow node's state.
-    ///
-    /// | Tag | Description |
-    /// | --- | ----------- |
-    /// | node | The LocalNodeIndex of the dataflow node. |
-    pub const NODE_STATE_SIZE_BYTES: &str = "readyset_node_state_size_bytes";
-
-    /// Gauge: The sum of the amount of bytes used to store the dataflow node's
-    /// partial state within a domain.
+    /// Gauge: The sum of the amount of bytes used to store a node's reader state
+    /// within a domain.
     ///
     /// | Tag | Description |
     /// | --- | ----------- |
-    /// | domain | The index of the domain. |
-    /// | shard | The shard identifier of the domain. |
-    pub const DOMAIN_PARTIAL_STATE_SIZE_BYTES: &str = "readyset_domain.partial_state_size_bytes";
-
-    /// Gauge: The sum of the amount of bytes used to store a node's reader state
-    /// within a domain.
+    /// | name | The name of the reader node |
     pub const READER_STATE_SIZE_BYTES: &str = "readyset_reader_state_size_bytes";
 
     /// Gauge: The sum of the amount of bytes used to store a node's base tables
     /// on disk.
-    pub const ESTIMATED_BASE_TABLE_SIZE_BYTES: &str = "readyset_base_tables_estimated_size_bytes";
-
-    /// Gauge: The sum of a domain's total node state and reader state bytes.
     ///
     /// | Tag | Description |
     /// | --- | ----------- |
-    /// | domain | The index of the domain. |
-    /// | shard | The shard identifier of the domain. |
-    pub const DOMAIN_TOTAL_NODE_STATE_SIZE_BYTES: &str =
-        "readyset_domain.total_node_state_size_bytes";
+    /// | table_name | The name of the base table. |
+    pub const ESTIMATED_BASE_TABLE_SIZE_BYTES: &str = "readyset_base_tables_estimated_size_bytes";
 
     /// Counter: The number of HTTP requests received at the readyset-server, for either the
     /// controller or worker.
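To make the new tag tables concrete: assuming these gauges are scraped through a Prometheus-compatible exporter, the two relabeled metrics above would surface as one series per reader node name and one per base table name, roughly like the following (label values and numbers here are invented for illustration):

readyset_reader_state_size_bytes{name="q_employees_by_id"} 524288
readyset_base_tables_estimated_size_bytes{table_name="public.employees"} 1048576

This replaces the per-node-index series that the removed constants produced.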
56 changes: 13 additions & 43 deletions readyset-dataflow/src/domain/domain_metrics.rs
@@ -7,10 +7,7 @@ use std::collections::BTreeMap;
 use std::convert::TryInto;
 use std::time::Duration;
 
-use metrics::{
-    register_counter, register_gauge, register_histogram, Counter, Gauge, Histogram, Label,
-    SharedString,
-};
+use metrics::{gauge, register_counter, register_histogram, Counter, Histogram, SharedString};
 use nom_sql::Relation;
 use readyset_client::metrics::recorded;
 use strum::{EnumCount, IntoEnumIterator};
@@ -29,11 +26,6 @@ pub(super) struct DomainMetrics {
     eviction_time: Histogram,
     eviction_size: Histogram,
 
-    partial_state_size: Gauge,
-    reader_state_size: Gauge,
-    base_table_size: Gauge,
-    total_node_state_size: Gauge,
-
     packets_sent: [Counter; PacketDiscriminants::COUNT],
 
     // using a BTree to look up metrics by tag/node, BTree is faster than HashMap for u32/u64 keys
@@ -47,19 +39,13 @@ pub(super) struct DomainMetrics {
     reader_replay_request_time: NodeMap<(Counter, Histogram)>,
     chunked_replay_time: NodeMap<(Counter, Histogram)>,
     base_table_lookups: NodeMap<Counter>,
-    node_state_size: NodeMap<Gauge>,
 }
 
 impl DomainMetrics {
     pub(super) fn new(replica_address: ReplicaAddress) -> Self {
         let index: SharedString = replica_address.domain_index.index().to_string().into();
         let shard: SharedString = replica_address.shard.to_string().into();
 
-        let labels_with_domain_and_shard = vec![
-            Label::new("domain", index.clone()),
-            Label::new("shard", shard.clone()),
-        ];
-
         let packets_sent: Vec<Counter> = PacketDiscriminants::iter()
             .map(|d| {
                 let name: &'static str = d.into();
@@ -72,17 +58,6 @@ impl DomainMetrics {
             .collect();
 
         DomainMetrics {
-            partial_state_size: register_gauge!(
-                recorded::DOMAIN_PARTIAL_STATE_SIZE_BYTES,
-                labels_with_domain_and_shard.clone()
-            ),
-            reader_state_size: register_gauge!(recorded::READER_STATE_SIZE_BYTES, vec![]),
-            base_table_size: register_gauge!(recorded::ESTIMATED_BASE_TABLE_SIZE_BYTES, vec![]),
-            total_node_state_size: register_gauge!(
-                recorded::DOMAIN_TOTAL_NODE_STATE_SIZE_BYTES,
-                labels_with_domain_and_shard
-            ),
-
             eviction_requests: register_counter!(recorded::EVICTION_REQUESTS, vec![],),
             eviction_time: register_histogram!(recorded::EVICTION_TIME, vec![]),
             eviction_size: register_histogram!(recorded::EVICTION_FREED_MEMORY, vec![],),
@@ -96,7 +71,6 @@ impl DomainMetrics {
             packets_sent: packets_sent.try_into().ok().unwrap(),
             reader_replay_request_time: Default::default(),
            base_table_lookups: Default::default(),
-            node_state_size: Default::default(),
            shard,
            index,
        }
@@ -365,24 +339,20 @@ impl DomainMetrics {
         }
     }
 
-    pub(super) fn set_state_sizes(&self, partial: u64, reader: u64, base: u64, node: u64) {
-        self.partial_state_size.set(partial as f64);
-        self.reader_state_size.set(reader as f64);
-        self.base_table_size.set(base as f64);
-        self.total_node_state_size.set(node as f64);
+    pub(super) fn set_reader_state_size(&self, name: &Relation, size: u64) {
+        gauge!(
+            recorded::READER_STATE_SIZE_BYTES,
+            size as f64,
+            "name" => cache_name_to_string(name),
+        );
     }
 
-    pub(super) fn set_node_state_size(&mut self, node: LocalNodeIndex, size: u64) {
-        if let Some(gauge) = self.node_state_size.get(node) {
-            gauge.set(size as f64);
-        } else {
-            let gauge = register_gauge!(
-                recorded::NODE_STATE_SIZE_BYTES,
-                "node" => node.to_string(),
-            );
-            gauge.set(size as f64);
-            self.node_state_size.insert(node, gauge);
-        }
+    pub(super) fn set_base_table_size(&self, name: &Relation, size: u64) {
+        gauge!(
+            recorded::ESTIMATED_BASE_TABLE_SIZE_BYTES,
+            size as f64,
+            "table_name" => cache_name_to_string(name),
+        );
     }
 }
 
38 changes: 14 additions & 24 deletions readyset-dataflow/src/domain/mod.rs
@@ -1444,7 +1444,6 @@ impl Domain {
         };
         self.auxiliary_node_states.remove(node);
         self.reader_write_handles.remove(node);
-        self.metrics.set_node_state_size(node, 0);
         trace!(local = node.id(), "node removed");
     }
 
@@ -2299,10 +2298,6 @@ impl Domain {
                 let ret = (domain_stats, node_stats);
                 Ok(Some(bincode::serialize(&ret)?))
             }
-            DomainRequest::UpdateStateSize => {
-                self.update_state_sizes();
-                Ok(None)
-            }
             DomainRequest::RequestMinPersistedReplicationOffset => Ok(Some(bincode::serialize(
                 &self.min_persisted_replication_offset()?,
             )?)),
@@ -4399,6 +4394,9 @@ impl Domain {
                         reader_size += size;
                     }
                 }
+
+                self.metrics.set_reader_state_size(n.name(), size);
+
                 size
             } else {
                 // Not a reader, state is with domain
@@ -4411,31 +4409,23 @@ impl Domain {
            })
            .sum();
 
-        let Domain { state, metrics, .. } = self; // Help borrowchk
-        let total_node_state: u64 = state
-            .iter()
-            .map(|(ni, state)| {
-                let ret = state.deep_size_of();
-                metrics.set_node_state_size(ni, ret);
-                ret
-            })
-            .sum();
-
-        self.metrics.set_state_sizes(
-            total,
-            reader_size,
-            self.estimated_base_tables_size(),
-            total_node_state + reader_size,
-        );
-
         self.state_size.store(total as usize, Ordering::Release);
         // no response sent, as worker will read the atomic
     }
 
     pub fn estimated_base_tables_size(&self) -> u64 {
         self.state
-            .values()
-            .filter_map(|state| state.as_persistent().map(|s| s.deep_size_of()))
+            .iter()
+            .filter_map(|(ni, state)| {
+                state.as_persistent().map(|s| {
+                    let n = &*self.nodes.get(ni).unwrap().borrow();
+                    let size = s.deep_size_of();
+
+                    self.metrics.set_base_table_size(n.name(), size);
+
+                    size
+                })
+            })
             .sum()
     }
 
3 changes: 0 additions & 3 deletions readyset-dataflow/src/payload.rs
@@ -346,9 +346,6 @@ pub enum DomainRequest {
         state: PrepareStateKind,
     },
 
-    /// Ask domain to log its state size
-    UpdateStateSize,
-
     /// Inform domain about a new replay path.
     SetupReplayPath {
         tag: Tag,
