From 511d6bcac8efc7243c30183e68e2d62dfe758184 Mon Sep 17 00:00:00 2001 From: Ethan Donowitz Date: Sun, 4 Feb 2024 11:57:02 -0500 Subject: [PATCH] metrics: Add cache_name label This commit adds a new `cache_name` label to a number of domain-related metrics. This will allow us to group by the cache name when plotting these metrics, making it much easier to determine whether the domains in a cache are experiencing congestion due to slow replays. Release-Note-Core: Added a new `cache_name` label to various metrics to improve debuggability Change-Id: I44a4fd6760704ac0b9175990df745e2cc63a7325 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6850 Tested-by: Buildkite CI Reviewed-by: Luke Osborne --- readyset-client/src/metrics/mod.rs | 10 ++ readyset-dataflow/src/backlog/mod.rs | 17 +- .../src/domain/domain_metrics.rs | 63 +++++-- readyset-dataflow/src/domain/mod.rs | 162 +++++++++++++----- .../src/node/special/packet_filter.rs | 2 + readyset-dataflow/src/payload.rs | 16 +- readyset-server/src/worker/readers.rs | 10 +- readyset-server/src/worker/replica.rs | 6 +- 8 files changed, 216 insertions(+), 70 deletions(-) diff --git a/readyset-client/src/metrics/mod.rs b/readyset-client/src/metrics/mod.rs index 9fb28439cf..ae16bd4a37 100644 --- a/readyset-client/src/metrics/mod.rs +++ b/readyset-client/src/metrics/mod.rs @@ -32,6 +32,7 @@ pub mod recorded { /// | shard | The shard the replay miss is recorded in | /// | miss_in | The LocalNodeIndex of the data flow node where the miss occurred | /// | needed_for | The client tag of the request that the replay is required for. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_REPLAY_MISSES: &str = "readyset_domain.replay_misses"; /// Histogram: The time in microseconds that a domain spends @@ -67,6 +68,7 @@ pub mod recorded { /// | domain | The index of the domain the replay miss is recorded in. | /// | shard | The shard the replay miss is recorded in. | /// | tag | The client tag of the request that the replay is required for. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_REPLAY_TIME: &str = "readyset_domain.handle_replay_time"; /// Counter: The total time in microseconds that a domain spends @@ -78,6 +80,7 @@ pub mod recorded { /// | domain | The index of the domain the replay miss is recorded in. | /// | shard | The shard the replay miss is recorded in. | /// | tag | The client tag of the request that the replay is required for. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_TOTAL_REPLAY_TIME: &str = "readyset_domain.total_handle_replay_time"; /// Histogram: The time in microseconds spent handling a reader replay @@ -90,6 +93,7 @@ pub mod recorded { /// | domain | The index of the domain the reader replay request is recorded in. | /// | shard | The shard the reader replay request is recorded in. | /// | node | The LocalNodeIndex of the reader node handling the packet. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_READER_REPLAY_REQUEST_TIME: &str = "readyset_domain.reader_replay_request_time_us"; @@ -103,6 +107,7 @@ pub mod recorded { /// | domain | The index of the domain the reader replay request is recorded in. | /// | shard | The shard the reader replay request is recorded in. | /// | node | The LocalNodeIndex of the reader node handling the packet. | + /// | cache_name | The name of the cache associated with this replay. 
pub const DOMAIN_READER_TOTAL_REPLAY_REQUEST_TIME: &str = "readyset_domain.reader_total_replay_request_time_us"; @@ -115,6 +120,7 @@ pub mod recorded { /// | domain | The index of the domain the replay request is recorded in. | /// | shard |The shard the replay request is recorded in. | /// | tag | The client tag of the request that the replay is required for. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_SEED_REPLAY_TIME: &str = "readyset_domain.seed_replay_time_us"; /// Counter: The total time in microseconds that a domain spends @@ -126,6 +132,7 @@ pub mod recorded { /// | domain | The index of the domain the replay request is recorded in. | /// | shard |The shard the replay request is recorded in. | /// | tag | The client tag of the request that the replay is required for. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_TOTAL_SEED_REPLAY_TIME: &str = "readyset_domain.total_seed_replay_time_us"; /// Histogram: The time in microseconds that a domain spawning a state @@ -186,6 +193,7 @@ pub mod recorded { /// | domain | The index of the domain the replay request is recorded in. | /// | shard | The shard the replay request is recorded in. | /// | tag | The client tag of the request that the Finish packet is required for. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_FINISH_REPLAY_TIME: &str = "readyset_domain.finish_replay_time_us"; /// Counter: The total time in microseconds that a domain spends @@ -197,6 +205,7 @@ pub mod recorded { /// | domain | The index of the domain the replay request is recorded in. | /// | shard | The shard the replay request is recorded in. | /// | tag | The client tag of the request that the Finish packet is required for. | + /// | cache_name | The name of the cache associated with this replay. pub const DOMAIN_TOTAL_FINISH_REPLAY_TIME: &str = "readyset_domain.total_finish_replay_time_us"; /// Histogram: The amount of time spent handling an eviction @@ -280,6 +289,7 @@ pub mod recorded { /// | table_name | The name of the base table. | /// | shard | The shard of the base table the lookup is requested in. | /// | node | The LocalNodeIndex of the base table node handling the packet. | + /// | cache_name | The name of the cache associated with this replay. pub const BASE_TABLE_LOOKUP_REQUESTS: &str = "readyset_base_table.lookup_requests"; /// Counter: The number of packets dropped by an egress node. diff --git a/readyset-dataflow/src/backlog/mod.rs b/readyset-dataflow/src/backlog/mod.rs index ec90c20053..12a7e151cd 100644 --- a/readyset-dataflow/src/backlog/mod.rs +++ b/readyset-dataflow/src/backlog/mod.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use ahash::RandomState; use common::SizeOf; use dataflow_expression::{PostLookup, ReaderProcessing}; +use nom_sql::Relation; use reader_map::{EvictionQuantity, EvictionStrategy}; use readyset_client::consistency::Timestamp; use readyset_client::results::SharedResults; @@ -23,7 +24,7 @@ pub type ReaderUpdatedNotifier = tokio::sync::broadcast::Receiver; pub(crate) trait Trigger = - Fn(&mut dyn Iterator) -> bool + 'static + Send + Sync; + Fn(&mut dyn Iterator, Relation) -> bool + 'static + Send + Sync; /// Allocate a new end-user facing result table. /// @@ -499,7 +500,7 @@ impl std::fmt::Debug for SingleReadHandle { impl SingleReadHandle { /// Trigger a replay of a missing key from a partially materialized view. 
- pub fn trigger(&self, keys: I) -> bool + pub fn trigger(&self, keys: I, name: Relation) -> bool where I: Iterator, { @@ -511,7 +512,7 @@ impl SingleReadHandle { let mut it = keys; // trigger a replay to populate - (*self.trigger.as_ref().unwrap())(&mut it) + (*self.trigger.as_ref().unwrap())(&mut it, name) } /// Returns None if this handle is not ready, Some(true) if this handle fully contains the given @@ -749,7 +750,7 @@ mod tests { let (r, mut w) = new_partial( 1, Index::hash_map(vec![0]), - |_: &mut dyn Iterator| true, + |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), ); @@ -774,7 +775,7 @@ mod tests { let (r, mut w) = new_partial( 1, Index::hash_map(vec![0]), - |_: &mut dyn Iterator| true, + |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), ); @@ -793,7 +794,7 @@ mod tests { let (r, mut w) = new_partial( 1, Index::btree_map(vec![0]), - |_: &mut dyn Iterator| true, + |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), ); @@ -823,7 +824,7 @@ mod tests { let (r, mut w) = new_partial( 1, Index::btree_map(vec![0]), - |_: &mut dyn Iterator| true, + |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), ); @@ -844,7 +845,7 @@ mod tests { let (r, mut w) = new_partial( 1, Index::btree_map(vec![0]), - |_: &mut dyn Iterator| true, + |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), ); diff --git a/readyset-dataflow/src/domain/domain_metrics.rs b/readyset-dataflow/src/domain/domain_metrics.rs index 5324e7bddf..a5b2bfc392 100644 --- a/readyset-dataflow/src/domain/domain_metrics.rs +++ b/readyset-dataflow/src/domain/domain_metrics.rs @@ -11,11 +11,11 @@ use metrics::{ register_counter, register_gauge, register_histogram, Counter, Gauge, Histogram, Label, SharedString, }; -use readyset_client::internal::ReplicaAddress; +use nom_sql::Relation; use readyset_client::metrics::recorded; use strum::{EnumCount, IntoEnumIterator}; -use crate::domain::{LocalNodeIndex, Tag}; +use crate::domain::{LocalNodeIndex, ReplicaAddress, Tag}; use crate::{NodeMap, Packet, PacketDiscriminants}; /// Contains handles to the various metrics collected for a domain. 
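[Editor's note, not part of the patch] The `Trigger` change in `readyset-dataflow/src/backlog/mod.rs` above means the closure a domain installs now receives the cache's `Relation` alongside the missed keys, so the replay request it forwards can carry a `cache_name` for metric labels. The standalone sketch below illustrates the shape of such a closure under simplified assumptions: `Key` stands in for `KeyComparison`, `CacheName` for `nom_sql::Relation`, and an mpsc channel for the domain's packet channel.

```rust
// Illustrative sketch only -- not part of the patch.
use std::sync::mpsc;

type Key = Vec<u64>;      // stand-in for KeyComparison
type CacheName = String;  // stand-in for nom_sql::Relation

fn main() {
    let (tx, rx) = mpsc::channel::<(Vec<Key>, CacheName)>();

    // Mirrors the revised `Trigger` alias: the closure now receives the cache name
    // alongside the missed keys. (The real alias also requires Send + Sync + 'static.)
    let trigger = move |misses: &mut dyn Iterator<Item = Key>, cache_name: CacheName| -> bool {
        let misses: Vec<Key> = misses.collect();
        if misses.is_empty() {
            return true;
        }
        // `false` means the request could not be forwarded (e.g. shutdown), which is
        // how callers of `SingleReadHandle::trigger` interpret the return value.
        tx.send((misses, cache_name)).is_ok()
    };

    let mut keys = vec![vec![1u64], vec![2]].into_iter();
    let keys_dyn: &mut dyn Iterator<Item = Key> = &mut keys;
    assert!(trigger(keys_dyn, "my_cache".to_string()));
    assert_eq!(rx.recv().unwrap().1, "my_cache");
}
```

In the patch itself, the closure built in the domain plays this role: it packages the misses together with the `cache_name` and forwards them as a `Packet::RequestReaderReplay`.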
@@ -165,7 +165,7 @@ impl DomainMetrics { } } - pub(super) fn rec_replay_time(&mut self, tag: Tag, time: Duration) { + pub(super) fn rec_replay_time(&mut self, tag: Tag, cache_name: &Relation, time: Duration) { if let Some((ctr, histo)) = self.total_replay_time.get(&tag) { ctr.increment(time.as_micros() as u64); histo.record(time.as_micros() as f64); @@ -174,14 +174,16 @@ impl DomainMetrics { recorded::DOMAIN_TOTAL_REPLAY_TIME, "domain" => self.index.clone(), "shard" => self.shard.clone(), - "tag" => tag.to_string() + "tag" => tag.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); let histo = register_histogram!( recorded::DOMAIN_REPLAY_TIME, "domain" => self.index.clone(), "shard" => self.shard.clone(), - "tag" => tag.to_string() + "tag" => tag.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); ctr.increment(time.as_micros() as u64); @@ -191,7 +193,7 @@ impl DomainMetrics { } } - pub(super) fn rec_seed_replay_time(&mut self, tag: Tag, time: Duration) { + pub(super) fn rec_seed_replay_time(&mut self, tag: Tag, cache_name: &Relation, time: Duration) { if let Some((ctr, histo)) = self.seed_replay_time.get(&tag) { ctr.increment(time.as_micros() as u64); histo.record(time.as_micros() as f64); @@ -200,14 +202,16 @@ impl DomainMetrics { recorded::DOMAIN_TOTAL_SEED_REPLAY_TIME, "domain" => self.index.clone(), "shard" => self.shard.clone(), - "tag" => tag.to_string() + "tag" => tag.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); let histo = register_histogram!( recorded::DOMAIN_SEED_REPLAY_TIME, "domain" => self.index.clone(), "shard" => self.shard.clone(), - "tag" => tag.to_string() + "tag" => tag.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); ctr.increment(time.as_micros() as u64); @@ -217,7 +221,12 @@ impl DomainMetrics { } } - pub(super) fn rec_finish_replay_time(&mut self, tag: Tag, time: Duration) { + pub(super) fn rec_finish_replay_time( + &mut self, + tag: Tag, + cache_name: &Relation, + time: Duration, + ) { if let Some((ctr, histo)) = self.finish_replay_time.get(&tag) { ctr.increment(time.as_micros() as u64); histo.record(time.as_micros() as f64); @@ -226,14 +235,16 @@ impl DomainMetrics { recorded::DOMAIN_TOTAL_FINISH_REPLAY_TIME, "domain" => self.index.clone(), "shard" => self.shard.clone(), - "tag" => tag.to_string() + "tag" => tag.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); let histo = register_histogram!( recorded::DOMAIN_FINISH_REPLAY_TIME, "domain" => self.index.clone(), "shard" => self.shard.clone(), - "tag" => tag.to_string() + "tag" => tag.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); ctr.increment(time.as_micros() as u64); @@ -276,7 +287,12 @@ impl DomainMetrics { } } - pub(super) fn rec_reader_replay_time(&mut self, node: LocalNodeIndex, time: Duration) { + pub(super) fn rec_reader_replay_time( + &mut self, + node: LocalNodeIndex, + cache_name: &Relation, + time: Duration, + ) { if let Some((ctr, histo)) = self.reader_replay_request_time.get(node) { ctr.increment(time.as_micros() as u64); histo.record(time.as_micros() as f64); @@ -286,6 +302,7 @@ impl DomainMetrics { "domain" => self.index.clone(), "shard" => self.shard.clone(), "node" => node.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); let histo = register_histogram!( @@ -293,6 +310,7 @@ impl DomainMetrics { "domain" => self.index.clone(), "shard" => self.shard.clone(), "node" => node.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); ctr.increment(time.as_micros() as u64); 
@@ -302,7 +320,13 @@ impl DomainMetrics { } } - pub(super) fn inc_replay_misses(&mut self, miss_in: LocalNodeIndex, needed_for: Tag, n: usize) { + pub(super) fn inc_replay_misses( + &mut self, + miss_in: LocalNodeIndex, + needed_for: Tag, + cache_name: &Relation, + n: usize, + ) { if let Some(ctr) = self.replay_misses.get(&(miss_in, needed_for)) { ctr.increment(n as u64); } else { @@ -311,7 +335,8 @@ impl DomainMetrics { "domain" => self.index.clone(), "shard" => self.shard.clone(), "miss_in" => miss_in.id().to_string(), - "needed_for" => needed_for.to_string() + "needed_for" => needed_for.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); ctr.increment(n as u64); @@ -324,7 +349,7 @@ impl DomainMetrics { self.packets_sent[discriminant as usize].increment(1); } - pub(super) fn inc_base_table_lookups(&mut self, node: LocalNodeIndex) { + pub(super) fn inc_base_table_lookups(&mut self, node: LocalNodeIndex, cache_name: &Relation) { if let Some(ctr) = self.base_table_lookups.get(node) { ctr.increment(1); } else { @@ -333,6 +358,7 @@ impl DomainMetrics { "domain" => self.index.clone(), "shard" => self.shard.clone(), "node" => node.to_string(), + "cache_name" => cache_name_to_string(cache_name) ); ctr.increment(1); self.base_table_lookups.insert(node, ctr); @@ -359,3 +385,10 @@ impl DomainMetrics { } } } + +/// Converts the given cache_name to a string by invoking `display_unquoted()`. This method should +/// only be used for converting cache names to strings for the purposes of including them as metric +/// labels. +fn cache_name_to_string(cache_name: &Relation) -> String { + cache_name.display_unquoted().to_string() +} diff --git a/readyset-dataflow/src/domain/mod.rs b/readyset-dataflow/src/domain/mod.rs index 306ae2feae..82e52613ad 100644 --- a/readyset-dataflow/src/domain/mod.rs +++ b/readyset-dataflow/src/domain/mod.rs @@ -24,6 +24,7 @@ use futures_util::stream::StreamExt; use futures_util::TryFutureExt; pub use internal::{DomainIndex, ReplicaAddress}; use merging_interval_tree::IntervalTreeSet; +use nom_sql::Relation; use petgraph::graph::NodeIndex; use readyset_alloc::StdThreadBuildWrapper; use readyset_client::debug::info::KeyCount; @@ -55,6 +56,9 @@ use crate::prelude::*; use crate::processing::ColumnMiss; use crate::{backlog, DomainRequest, Readers}; +/// A stub for the cache name used for domain metrics that are emitted during a migration. 
+const MIGRATION_CACHE_NAME_STUB: &str = "migration"; + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct Config { /// If set to `true`, the metric tracking the in-memory size of materialized state will be @@ -791,6 +795,7 @@ impl Domain { miss_columns: &[usize], dst: Destination, target: Target, + cache_name: Relation, ) -> Result<(), ReadySetError> { let miss_index = Index::new(IndexType::best_for_keys(&miss_keys), miss_columns.to_vec()); // the cloned is a bit sad; self.request_partial_replay doesn't use @@ -847,11 +852,12 @@ impl Domain { unishard: true, // local replays are necessarily single-shard requesting_shard: self.shard(), requesting_replica: self.replica(), + cache_name: cache_name.clone(), }); continue; } - self.send_partial_replay_request(tag, miss_keys.clone())?; + self.send_partial_replay_request(tag, miss_keys.clone(), cache_name.clone())?; } Ok(()) @@ -867,6 +873,7 @@ impl Domain { requesting_shard: usize, requesting_replica: usize, needed_for: Tag, + cache_name: Relation, ) -> ReadySetResult<()> { use std::collections::hash_map::Entry; use std::ops::AddAssign; @@ -875,7 +882,7 @@ impl Domain { let mut w = self.waiting.remove(miss_in).unwrap_or_default(); self.metrics - .inc_replay_misses(miss_in, needed_for, missed_keys.len()); + .inc_replay_misses(miss_in, needed_for, &cache_name, missed_keys.len()); let is_generated = self .replay_paths @@ -978,7 +985,13 @@ impl Domain { self.waiting.insert(miss_in, w); for ((target, columns), keys) in needed_replays { - self.find_tags_and_replay(keys, &columns, Destination(miss_in), target)? + self.find_tags_and_replay( + keys, + &columns, + Destination(miss_in), + target, + cache_name.clone(), + )? } Ok(()) @@ -993,6 +1006,7 @@ impl Domain { &mut self, tag: Tag, keys: Vec, + cache_name: Relation, ) -> ReadySetResult<()> { let requesting_shard = self.shard(); let requesting_replica = self.replica(); @@ -1027,6 +1041,7 @@ impl Domain { keys: keys.clone(), // sad to clone here requesting_shard, requesting_replica, + cache_name: cache_name.clone(), }) .is_err() { @@ -1051,6 +1066,7 @@ impl Domain { unishard: true, // only one option, so only one path requesting_shard, requesting_replica, + cache_name, }) .is_err() { @@ -1075,6 +1091,7 @@ impl Domain { unishard: true, // !ask_all, so only one path requesting_shard, requesting_replica, + cache_name: cache_name.clone(), }) .is_err() { @@ -1645,9 +1662,15 @@ impl Domain { } let replica = self.replica(); + + struct Misses { + misses: Vec, + cache_name: Relation, + } + let txs = (0..num_shards) .map(|shard| -> ReadySetResult<_> { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); let sender = self .channel_coordinator .builder_for(&ReplicaAddress { @@ -1661,9 +1684,10 @@ impl Domain { tokio::spawn( UnboundedReceiverStream::new(rx) .map(move |misses| Packet::RequestReaderReplay { - keys: misses, + keys: misses.misses, cols: cols.clone(), node, + cache_name: misses.cache_name, }) .map(Ok) .forward(sender) @@ -1687,14 +1711,14 @@ impl Domain { let (r_part, w_part) = backlog::new_partial( num_columns, index, - move |misses: &mut dyn Iterator| { + move |misses: &mut dyn Iterator, cache_name| { if num_shards == 1 { let misses = misses.collect::>(); if misses.is_empty() { return true; } #[allow(clippy::indexing_slicing)] // just checked len is 1 - txs[0].send(misses).is_ok() + txs[0].send(Misses { misses, cache_name }).is_ok() } else { let mut per_shard = HashMap::new(); for miss in misses { @@ -1712,7 +1736,12 
@@ impl Domain { per_shard.into_iter().all(|(shard, keys)| { #[allow(clippy::indexing_slicing)] // we know txs.len() is equal to num_shards - txs[shard].send(keys).is_ok() + txs[shard] + .send(Misses { + misses: keys, + cache_name: cache_name.clone(), + }) + .is_ok() }) } }, @@ -1965,6 +1994,7 @@ impl Domain { replicas: replicas.clone(), }, data: Vec::::new().into(), + cache_name: MIGRATION_CACHE_NAME_STUB.into(), }); let added_cols = self.ingress_inject.get(from).cloned(); @@ -2042,6 +2072,7 @@ impl Domain { replicas: replicas.clone(), }, data: chunk, + cache_name: MIGRATION_CACHE_NAME_STUB.into(), }; trace!(num = i, len, "sending batch"); @@ -2066,6 +2097,7 @@ impl Domain { replicas: replicas.clone(), }, data: Default::default(), + cache_name: MIGRATION_CACHE_NAME_STUB.into(), }) { warn!(%error, "replayer noticed domain shutdown"); } @@ -2378,12 +2410,19 @@ impl Domain { self.total_forward_time.stop(); self.metrics.rec_forward_time(src, dst, start.elapsed()); } - Packet::ReplayPiece { tag, .. } => { + Packet::ReplayPiece { + ref tag, + ref cache_name, + .. + } => { let start = time::Instant::now(); + let cache_name = cache_name.clone(); + let tag = *tag; self.total_replay_time.start(); self.handle_replay(m, executor)?; self.total_replay_time.stop(); - self.metrics.rec_replay_time(tag, start.elapsed()); + self.metrics + .rec_replay_time(tag, &cache_name, start.elapsed()); } Packet::Evict(req) => { self.handle_eviction(req, executor)?; @@ -2400,6 +2439,7 @@ impl Domain { mut keys, cols, node, + cache_name, } => { let start = time::Instant::now(); self.total_replay_time.start(); @@ -2458,39 +2498,40 @@ impl Domain { // Destination and target are the same since readers can't generate columns Destination(node), Target(node), + cache_name.clone(), )?; } self.total_replay_time.stop(); - self.metrics.rec_reader_replay_time(node, start.elapsed()); + self.metrics + .rec_reader_replay_time(node, &cache_name, start.elapsed()); } Packet::RequestPartialReplay { tag, - keys, - unishard, - requesting_shard, - requesting_replica, + ref keys, + ref cache_name, + .. 
} => { trace!(%tag, ?keys, "got replay request"); let start = time::Instant::now(); + let cache_name = cache_name.clone(); self.total_replay_time.start(); - self.seed_all( - tag, - requesting_shard, - requesting_replica, - keys.into_iter().collect(), - unishard, - executor, - )?; + self.seed_all(m, executor)?; self.total_replay_time.stop(); - self.metrics.rec_seed_replay_time(tag, start.elapsed()); + self.metrics + .rec_seed_replay_time(tag, &cache_name, start.elapsed()); } - Packet::Finish(tag, ni) => { + Packet::Finish { + tag, + node, + cache_name, + } => { let start = time::Instant::now(); self.total_replay_time.start(); - self.finish_replay(tag, ni, executor)?; + self.finish_replay(tag, node, &cache_name, executor)?; self.total_replay_time.stop(); - self.metrics.rec_finish_replay_time(tag, start.elapsed()); + self.metrics + .rec_finish_replay_time(tag, &cache_name, start.elapsed()); } Packet::Spin => { // spinning as instructed @@ -2663,15 +2704,31 @@ impl Domain { } } - fn seed_all( - &mut self, - tag: Tag, - requesting_shard: usize, - requesting_replica: usize, - keys: HashSet, - single_shard: bool, - ex: &mut dyn Executor, - ) -> Result<(), ReadySetError> { + fn seed_all(&mut self, packet: Packet, ex: &mut dyn Executor) -> Result<(), ReadySetError> { + let (tag, keys, unishard, requesting_shard, requesting_replica, cache_name) = + if let Packet::RequestPartialReplay { + tag, + keys, + unishard, + requesting_shard, + requesting_replica, + cache_name, + } = packet + { + ( + tag, + keys, + unishard, + requesting_shard, + requesting_replica, + cache_name, + ) + } else { + internal!() + }; + + let keys: HashSet = keys.into_iter().collect(); + #[allow(clippy::indexing_slicing)] // tag came from an internal data structure that guarantees it's present let (source, index, path) = match &self.replay_paths[tag] { @@ -2709,7 +2766,8 @@ impl Domain { if let Some(node) = self.nodes.get(*source) { if node.borrow().is_base() { - self.metrics.inc_base_table_lookups(*source); + self.metrics + .inc_base_table_lookups(*source, &cache_name.clone()); } } @@ -2748,10 +2806,11 @@ impl Domain { // same for range queries was a whole bug that eta had to spend like 2 // hours tracking down, only to find it was as simple as this. replay_keys, - single_shard, + unishard, requesting_shard, requesting_replica, tag, + cache_name.clone(), )?; } @@ -2769,11 +2828,12 @@ impl Domain { tag, context: ReplayPieceContext::Partial { for_keys: found_keys, - unishard: single_shard, // if we are the only source, only one path + unishard, // if we are the only source, only one path requesting_shard, requesting_replica, }, data: records.into(), + cache_name, }, ex, )?; @@ -2784,6 +2844,12 @@ impl Domain { #[allow(clippy::cognitive_complexity)] fn handle_replay(&mut self, m: Packet, ex: &mut dyn Executor) -> ReadySetResult<()> { + let cache_name = if let Packet::ReplayPiece { ref cache_name, .. } = m { + cache_name.clone() + } else { + internal!() + }; + let tag = m .tag() .ok_or_else(|| internal_err!("handle_replay called on an invalid message"))?; @@ -2841,9 +2907,9 @@ impl Domain { Packet::ReplayPiece { tag, link, - data, context, + .. } => (tag, link, data, context), _ => internal!(), }; @@ -2954,6 +3020,7 @@ impl Domain { tag, data, context, + cache_name: cache_name.clone(), }); macro_rules! 
replay_context { @@ -3595,6 +3662,7 @@ impl Domain { next_replay.requesting_shard, next_replay.requesting_replica, next_replay.tag, + cache_name.clone(), )?; } @@ -3690,6 +3758,7 @@ impl Domain { keys, requesting_shard, requesting_replica, + cache_name: cache_name.clone(), }); } @@ -3717,7 +3786,11 @@ impl Domain { // but this allows finish_replay to dispatch into the node by // overriding replaying_to. self.not_ready.remove(&dst); - self.delayed_for_self.push_back(Packet::Finish(tag, dst)); + self.delayed_for_self.push_back(Packet::Finish { + tag, + node: dst, + cache_name: cache_name.clone(), + }); } } Ok(()) @@ -3727,6 +3800,7 @@ impl Domain { &mut self, tag: Tag, node: LocalNodeIndex, + cache_name: &Relation, ex: &mut dyn Executor, ) -> Result<(), ReadySetError> { let mut was = mem::replace(&mut self.mode, DomainMode::Forwarding); @@ -3819,7 +3893,11 @@ impl Domain { } } else { // we're not done -- inject a request to continue handling buffered things - self.delayed_for_self.push_back(Packet::Finish(tag, node)); + self.delayed_for_self.push_back(Packet::Finish { + tag, + node, + cache_name: cache_name.clone(), + }); Ok(()) } } diff --git a/readyset-dataflow/src/node/special/packet_filter.rs b/readyset-dataflow/src/node/special/packet_filter.rs index 047907fdfe..86ac27751a 100644 --- a/readyset-dataflow/src/node/special/packet_filter.rs +++ b/readyset-dataflow/src/node/special/packet_filter.rs @@ -289,6 +289,7 @@ mod test { requesting_replica: 0, unishard: false, }, + cache_name: "test".into(), }; let mut packet_filter = PacketFilter::default(); @@ -611,6 +612,7 @@ mod test { tag: Tag::new(1), data: Default::default(), context, + cache_name: "test".into(), } } } diff --git a/readyset-dataflow/src/payload.rs b/readyset-dataflow/src/payload.rs index 51430e60d9..1b086bfcbc 100644 --- a/readyset-dataflow/src/payload.rs +++ b/readyset-dataflow/src/payload.rs @@ -3,6 +3,7 @@ use std::fmt::{self, Display}; use dataflow_state::MaterializedNodeState; use itertools::Itertools; +use nom_sql::Relation; use readyset_client::{self, KeyComparison, PacketData, PacketTrace}; use readyset_data::DfType; use serde::{Deserialize, Serialize}; @@ -465,13 +466,20 @@ pub enum Packet { tag: Tag, data: Records, context: ReplayPieceContext, + /// The cache name associated with the replay. Only used for metric labels. + cache_name: Relation, }, // Trigger an eviction as specified by the to the [`EvictRequest`]. Evict(EvictRequest), // Internal control - Finish(Tag, LocalNodeIndex), + Finish { + tag: Tag, + node: LocalNodeIndex, + /// The cache name associated with the replay. Only used for metric labels. + cache_name: Relation, + }, // Control messages /// Ask domain (nicely) to replay a particular set of keys. @@ -481,6 +489,8 @@ pub enum Packet { unishard: bool, requesting_shard: usize, requesting_replica: usize, + /// The cache name associated with the replay. Only used for metric labels. + cache_name: Relation, }, /// Ask domain (nicely) to replay a particular set of keys into a Reader. @@ -488,6 +498,8 @@ pub enum Packet { node: LocalNodeIndex, cols: Vec, keys: Vec, + /// The cache name associated with the replay. Only used for metric labels. + cache_name: Relation, }, /// A packet used solely to drive the event loop forward. 
@@ -626,11 +638,13 @@ impl Packet { tag, ref data, ref context, + ref cache_name, } => Packet::ReplayPiece { link, tag, data: data.clone(), context: context.clone(), + cache_name: cache_name.clone(), }, Packet::Timestamp { ref timestamp, diff --git a/readyset-server/src/worker/readers.rs b/readyset-server/src/worker/readers.rs index 91d0eab569..543c817e0e 100644 --- a/readyset-server/src/worker/readers.rs +++ b/readyset-server/src/worker/readers.rs @@ -241,7 +241,10 @@ impl ReadRequestHandler { // Trigger backfills for all the keys we missed on, regardless of a consistency hit/miss if !keys_to_replay.is_empty() { - reader.trigger(keys_to_replay.into_iter().map(|k| k.into_owned())); + reader.trigger( + keys_to_replay.into_iter().map(|k| k.into_owned()), + target.name.clone(), + ); } let read = BlockingRead { @@ -540,7 +543,10 @@ impl BlockingRead { self.eviction_epoch = cur_eviction_epoch; // Retrigger all un-read keys. Its possible they could have been filled and then // evicted again without us reading it. - if !reader.trigger(still_waiting.into_iter().map(|v| v.into_owned())) { + if !reader.trigger( + still_waiting.into_iter().map(|v| v.into_owned()), + self.target.name.clone(), + ) { // server is shutting down and won't do the backfill return Poll::Ready(Err(ReadySetError::ServerShuttingDown)); } diff --git a/readyset-server/src/worker/replica.rs b/readyset-server/src/worker/replica.rs index c80a8eb992..92981a1739 100644 --- a/readyset-server/src/worker/replica.rs +++ b/readyset-server/src/worker/replica.rs @@ -118,7 +118,9 @@ fn flatten_request_reader_replay( let mut i = 0; while i < packets.len() { match packets.get_mut(i) { - Some(Packet::RequestReaderReplay { node, cols, keys }) if *node == n && *cols == c => { + Some(Packet::RequestReaderReplay { + node, cols, keys, .. + }) if *node == n && *cols == c => { unique_keys.extend(keys.drain(..)); packets.remove(i); } @@ -435,7 +437,7 @@ impl Replica { // After processing we need to ack timestamp and input messages from base connections.iter_mut().find(|(t, _)| *t == *token).map(|(_, conn)| (*tag, conn)) } - Packet::RequestReaderReplay { node, cols, keys } => { + Packet::RequestReaderReplay { node, cols, keys, .. } => { // We want to batch multiple reader replay requests into a single call while // deduplicating non unique keys let mut unique_keys: HashSet<_> = keys.drain(..).collect();
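[Editor's note, not part of the patch] For context on how the new label reaches the metrics backend: `DomainMetrics` registers one `Counter`/`Histogram` pair per label set and memoizes the handles, so `cache_name` is baked in once at registration time and the per-packet path only touches a cached handle. Below is a minimal standalone sketch of that register-once / record-many pattern, assuming the same `metrics`-crate macros imported in `domain_metrics.rs`; the `ReplayTimers` type, the `u32` tag, and the `q_cache_users` name are hypothetical, and metric names are written as literals rather than the `recorded::` constants.

```rust
// Minimal sketch of the register-once / record-many pattern used by DomainMetrics.
// Requires the `metrics` crate; without an installed recorder these calls are no-ops.
use std::collections::HashMap;
use std::time::Duration;

use metrics::{register_counter, register_histogram, Counter, Histogram};

#[derive(Default)]
struct ReplayTimers {
    // Keyed by (tag, cache_name); both become labels on the registered handles.
    handles: HashMap<(u32, String), (Counter, Histogram)>,
}

impl ReplayTimers {
    fn record(&mut self, tag: u32, cache_name: &str, time: Duration) {
        let (ctr, histo) = self
            .handles
            .entry((tag, cache_name.to_owned()))
            .or_insert_with(|| {
                // Labels are attached exactly once, when the handle is first created.
                let ctr = register_counter!(
                    "readyset_domain.total_handle_replay_time",
                    "tag" => tag.to_string(),
                    "cache_name" => cache_name.to_owned()
                );
                let histo = register_histogram!(
                    "readyset_domain.handle_replay_time",
                    "tag" => tag.to_string(),
                    "cache_name" => cache_name.to_owned()
                );
                (ctr, histo)
            });
        // Hot path: no label construction, just the cached handles.
        ctr.increment(time.as_micros() as u64);
        histo.record(time.as_micros() as f64);
    }
}

fn main() {
    let mut timers = ReplayTimers::default();
    timers.record(1, "q_cache_users", Duration::from_micros(250));
}
```

The memoization mirrors `rec_replay_time` and its siblings in the diff, which check their maps before calling `register_counter!`/`register_histogram!`, so adding the `cache_name` label does not add work to the steady-state replay path.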