diff --git a/Cargo.lock b/Cargo.lock index ef586274cf..833f0fa52a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android_system_properties" version = "0.1.4" @@ -2060,6 +2066,16 @@ dependencies = [ "ahash 0.8.3", ] +[[package]] +name = "hashbrown" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +dependencies = [ + "ahash 0.8.3", + "allocator-api2", +] + [[package]] name = "hdrhistogram" version = "7.5.2" @@ -2705,6 +2721,15 @@ dependencies = [ "hashbrown 0.13.1", ] +[[package]] +name = "lru" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" +dependencies = [ + "hashbrown 0.14.1", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -4392,6 +4417,7 @@ dependencies = [ "indexmap", "itertools", "lazy_static", + "lru 0.12.0", "metrics", "metrics-exporter-prometheus", "metrics-util", diff --git a/readyset-adapter/Cargo.toml b/readyset-adapter/Cargo.toml index dd0c496df8..7bedc19a65 100644 --- a/readyset-adapter/Cargo.toml +++ b/readyset-adapter/Cargo.toml @@ -63,6 +63,7 @@ readyset-sql-passes = { path = "../readyset-sql-passes" } readyset-version = { path = "../readyset-version" } health-reporter = { path = "../health-reporter" } database-utils = { path = "../database-utils" } +lru = "0.12.0" [dev-dependencies] proptest = "1.0.0" @@ -78,4 +79,4 @@ harness = false [features] ryw = [] -failure_injection = ["fail/failpoints"] \ No newline at end of file +failure_injection = ["fail/failpoints"] diff --git a/readyset-adapter/src/query_status_cache.rs 
b/readyset-adapter/src/query_status_cache.rs index 4ee2d5c679..d8e4ef2507 100644 --- a/readyset-adapter/src/query_status_cache.rs +++ b/readyset-adapter/src/query_status_cache.rs @@ -11,12 +11,16 @@ use anyhow::anyhow; use clap::ValueEnum; use dashmap::mapref::entry::Entry; use dashmap::DashMap; +use lru::LruCache; +use parking_lot::RwLock; use readyset_client::query::*; use readyset_client::ViewCreateRequest; use readyset_data::DfValue; use readyset_util::hash::hash; use tracing::error; +const NUM_PERSISTED_QUERIES: usize = 100_000; + /// A metadata cache for all queries that have been processed by this /// adapter. Thread-safe. #[derive(Debug)] @@ -44,50 +48,60 @@ pub struct QueryStatusCache { enable_experimental_placeholder_inlining: bool, } -#[derive(Debug, Default)] +#[derive(Debug)] /// A handle to persistent metadata for all queries that have been processed by this adapter. pub struct PersistentStatusCacheHandle { - /// A Thread-safe hash map that holds the full [`Query`] as well as its associated - /// [`QueryStatus`]. The `QueryStatus` must match the one in - /// [`QueryStatusCache::id_to_status`]. - statuses: DashMap, + /// An [`LruCache`] that holds the full [`Query`] as well as its associated + /// [`QueryStatus`] for a fixed number of queries. + statuses: RwLock>, /// List of pending inlined migrations. Contains the query to be inlined, and the sets of /// parameters to use for inlining. 
pending_inlined_migrations: DashMap>>, } +impl Default for PersistentStatusCacheHandle { + fn default() -> Self { + Self { + statuses: RwLock::new(LruCache::new( + NUM_PERSISTED_QUERIES + .try_into() + .expect("num persisted queries is not zero"), + )), + pending_inlined_migrations: Default::default(), + } + } +} + impl PersistentStatusCacheHandle { fn insert_with_status(&self, q: Query, id: QueryId, status: QueryStatus) { - self.statuses.insert(id, (q, status)); + let mut statuses = self.statuses.write(); + statuses.put(id, (q, status)); } fn allow_list(&self) -> Vec<(QueryId, Arc, QueryStatus)> { - self.statuses + let statuses = self.statuses.read(); + statuses .iter() - .filter_map(|entry| { - let (query_id, (query, status)) = entry.pair(); - match query { - Query::Parsed(view) => { - if status.is_successful() { - Some((*query_id, view.clone(), status.clone())) - } else { - None - } + .filter_map(|(query_id, (query, status))| match query { + Query::Parsed(view) => { + if status.is_successful() { + Some((*query_id, view.clone(), status.clone())) + } else { + None } - Query::ParseFailed(_) => None, } + Query::ParseFailed(_) => None, }) .collect::>() } fn deny_list(&self, style: MigrationStyle) -> Vec { + let statuses = self.statuses.read(); match style { - MigrationStyle::Async | MigrationStyle::InRequestPath => self - .statuses + MigrationStyle::Async | MigrationStyle::InRequestPath => statuses .iter() - .filter_map(|entry| { - let (query_id, (query, status)) = entry.pair(); + .filter_map(|(query_id, (query, status))| { if status.is_unsupported() || status.is_dropped() { Some(DeniedQuery { id: *query_id, @@ -99,11 +113,9 @@ impl PersistentStatusCacheHandle { } }) .collect::>(), - MigrationStyle::Explicit => self - .statuses + MigrationStyle::Explicit => statuses .iter() - .filter_map(|entry| { - let (query_id, (query, status)) = entry.pair(); + .filter_map(|(query_id, (query, status))| { if status.is_denied() { Some(DeniedQuery { id: *query_id, @@ -175,8 +187,8 
@@ impl QueryStatusKey for ViewCreateRequest { // Since this is potentially mutating, we need to apply F to both the in-memory and the // persistent version of the status. f(cache.id_to_status.get_mut(&id).as_deref_mut()); - let mut persistent_status = cache.persistent_handle.statuses.get_mut(&id); - let transformed_status = persistent_status.as_deref_mut().map(|(_, status)| status); + let mut statuses = cache.persistent_handle.statuses.write(); + let transformed_status = statuses.get_mut(&id).map(|(_, status)| status); f(transformed_status) } } @@ -200,8 +212,8 @@ impl QueryStatusKey for String { // persistent version of the status. f(cache.id_to_status.get_mut(&id).as_deref_mut()); - let mut persistent_status = cache.persistent_handle.statuses.get_mut(&id); - let transformed_status = persistent_status.as_deref_mut().map(|(_, status)| status); + let mut statuses = cache.persistent_handle.statuses.write(); + let transformed_status = statuses.get_mut(&id).map(|(_, status)| status); f(transformed_status) } } @@ -292,28 +304,7 @@ impl QueryStatusCache { let query_state = self.id_to_status.get(&id); match query_state { - Some(s) => { - debug_assert!( - self.persistent_handle - .statuses - .get(&id) - .map(|entry| entry.value().0.clone()) - .expect("query not found") - == q.clone().into(), - "mismatch between calculated and cached id/query" - ); - debug_assert!( - self.persistent_handle - .statuses - .get(&id) - .map(|entry| entry.value().1.clone()) - .expect("query not found") - == *s, - "mismatch between calculated and cached id/status" - ); - - (id, s.value().migration_state.clone()) - } + Some(s) => (id, s.value().migration_state.clone()), None => self.insert(q.clone()), } } @@ -604,12 +595,11 @@ impl QueryStatusCache { v.migration_state = MigrationState::Pending; v.always = false; }); - self.persistent_handle - .statuses + let mut statuses = self.persistent_handle.statuses.write(); + statuses .iter_mut() - .filter(|entry| entry.value().1.is_successful()) - 
.for_each(|mut entry| { - let (_, status) = entry.pair_mut().1; + .filter(|(_query_id, (_query, status))| status.is_successful()) + .for_each(|(_query_id, (_query, ref mut status))| { status.migration_state = MigrationState::Pending; status.always = false; }); @@ -697,11 +687,10 @@ impl QueryStatusCache { /// /// Does not include any queries that require inlining. pub fn pending_migration(&self) -> QueryList { - self.persistent_handle - .statuses + let statuses = self.persistent_handle.statuses.read(); + statuses .iter() - .filter_map(|entry| { - let (_query_id, (query, status)) = entry.pair(); + .filter_map(|(_query_id, (query, status))| { if status.is_pending() { Some((query.clone(), status.clone())) } else { @@ -725,10 +714,8 @@ impl QueryStatusCache { /// Returns a query given a query hash pub fn query(&self, id: &str) -> Option { let id = QueryId::new(u64::from_str_radix(id.strip_prefix("q_")?, 16).ok()?); - self.persistent_handle - .statuses - .get(&id) - .map(|entry| entry.value().0.clone()) + let statuses = self.persistent_handle.statuses.read(); + statuses.peek(&id).map(|(query, _status)| query.clone()) } } @@ -794,23 +781,15 @@ mod tests { cache.insert(q1.clone()); - assert!(cache - .persistent_handle - .statuses + let mut statuses = cache.persistent_handle.statuses.write(); + assert!(statuses .iter() - .map(|entry| entry.value().0.clone()) + .map(|(_, (q, _))| q.clone()) .any(|q| q == q1.clone().into())); - assert!(cache - .persistent_handle - .statuses - .insert(id, (q1.into(), status.clone())) - .is_some()); + assert!(statuses.put(id, (q1.into(), status.clone())).is_some()); - assert_eq!( - cache.persistent_handle.statuses.get(&id).unwrap().value().1, - status - ); + assert_eq!(statuses.get(&id).unwrap().1, status); } #[test] @@ -822,23 +801,15 @@ mod tests { cache.insert(q1.clone()); - assert!(cache - .persistent_handle - .statuses + let mut statuses = cache.persistent_handle.statuses.write(); + assert!(statuses .iter() - .map(|entry| 
entry.value().0.clone()) + .map(|(_, (q, _))| q.clone()) .any(|q| q == q1.clone().into())); - assert!(cache - .persistent_handle - .statuses - .insert(id, (q1.into(), status.clone())) - .is_some()); + assert!(statuses.put(id, (q1.into(), status.clone())).is_some()); - assert_eq!( - cache.persistent_handle.statuses.get(&id).unwrap().value().1, - status - ); + assert_eq!(statuses.get(&id).unwrap().1, status); } #[test]