From 456eb1751febb84cc5143440a8638feedcd30f7b Mon Sep 17 00:00:00 2001 From: Luke Osborne Date: Mon, 9 Oct 2023 12:15:37 -0400 Subject: [PATCH] adapter: Limit the number of stored queries in QSC This is a temporary measure in order to limit the memory usage of the query status cache by changing the persistent handle (which still isn't actually persistent yet, but will be) to use an LRUCache as its underlying store with a limit of 100_000. The id_to_status map is able to "remember" the query status for more than this number of queries, but we will only keep the full query for 100_000 queries. The main goal of this is to remove this as a variable in our locust testing before the fully persistent refactor can be finished. Change-Id: I9d39590c3e797db4c0ad40244ad47fce32cb0e51 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6178 Reviewed-by: Jason Brown Tested-by: Buildkite CI --- Cargo.lock | 26 ++++ readyset-adapter/Cargo.toml | 3 +- readyset-adapter/src/query_status_cache.rs | 151 +++++++++------------ 3 files changed, 89 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef586274cf..833f0fa52a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android_system_properties" version = "0.1.4" @@ -2060,6 +2066,16 @@ dependencies = [ "ahash 0.8.3", ] +[[package]] +name = "hashbrown" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +dependencies = [ + "ahash 0.8.3", + "allocator-api2", +] + [[package]] name = "hdrhistogram" version = "7.5.2" @@ -2705,6 +2721,15 @@ dependencies = [ "hashbrown 0.13.1", ] +[[package]] +name = "lru" +version = "0.12.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" +dependencies = [ + "hashbrown 0.14.1", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -4392,6 +4417,7 @@ dependencies = [ "indexmap", "itertools", "lazy_static", + "lru 0.12.0", "metrics", "metrics-exporter-prometheus", "metrics-util", diff --git a/readyset-adapter/Cargo.toml b/readyset-adapter/Cargo.toml index dd0c496df8..7bedc19a65 100644 --- a/readyset-adapter/Cargo.toml +++ b/readyset-adapter/Cargo.toml @@ -63,6 +63,7 @@ readyset-sql-passes = { path = "../readyset-sql-passes" } readyset-version = { path = "../readyset-version" } health-reporter = { path = "../health-reporter" } database-utils = { path = "../database-utils" } +lru = "0.12.0" [dev-dependencies] proptest = "1.0.0" @@ -78,4 +79,4 @@ harness = false [features] ryw = [] -failure_injection = ["fail/failpoints"] \ No newline at end of file +failure_injection = ["fail/failpoints"] diff --git a/readyset-adapter/src/query_status_cache.rs b/readyset-adapter/src/query_status_cache.rs index 4ee2d5c679..d8e4ef2507 100644 --- a/readyset-adapter/src/query_status_cache.rs +++ b/readyset-adapter/src/query_status_cache.rs @@ -11,12 +11,16 @@ use anyhow::anyhow; use clap::ValueEnum; use dashmap::mapref::entry::Entry; use dashmap::DashMap; +use lru::LruCache; +use parking_lot::RwLock; use readyset_client::query::*; use readyset_client::ViewCreateRequest; use readyset_data::DfValue; use readyset_util::hash::hash; use tracing::error; +const NUM_PERSISTED_QUERIES: usize = 100_000; + /// A metadata cache for all queries that have been processed by this /// adapter. Thread-safe. #[derive(Debug)] @@ -44,50 +48,60 @@ pub struct QueryStatusCache { enable_experimental_placeholder_inlining: bool, } -#[derive(Debug, Default)] +#[derive(Debug)] /// A handle to persistent metadata for all queries that have been processed by this adapter. 
pub struct PersistentStatusCacheHandle { - /// A Thread-safe hash map that holds the full [`Query`] as well as its associated - /// [`QueryStatus`]. The `QueryStatus` must match the one in - /// [`QueryStatusCache::id_to_status`]. - statuses: DashMap, + /// An [`LruCache`] that holds the full [`Query`] as well as its associated + /// [`QueryStatus`] for a fixed number of queries. + statuses: RwLock>, /// List of pending inlined migrations. Contains the query to be inlined, and the sets of /// parameters to use for inlining. pending_inlined_migrations: DashMap>>, } +impl Default for PersistentStatusCacheHandle { + fn default() -> Self { + Self { + statuses: RwLock::new(LruCache::new( + NUM_PERSISTED_QUERIES + .try_into() + .expect("num persisted queries is not zero"), + )), + pending_inlined_migrations: Default::default(), + } + } +} + impl PersistentStatusCacheHandle { fn insert_with_status(&self, q: Query, id: QueryId, status: QueryStatus) { - self.statuses.insert(id, (q, status)); + let mut statuses = self.statuses.write(); + statuses.put(id, (q, status)); } fn allow_list(&self) -> Vec<(QueryId, Arc, QueryStatus)> { - self.statuses + let statuses = self.statuses.read(); + statuses .iter() - .filter_map(|entry| { - let (query_id, (query, status)) = entry.pair(); - match query { - Query::Parsed(view) => { - if status.is_successful() { - Some((*query_id, view.clone(), status.clone())) - } else { - None - } + .filter_map(|(query_id, (query, status))| match query { + Query::Parsed(view) => { + if status.is_successful() { + Some((*query_id, view.clone(), status.clone())) + } else { + None } - Query::ParseFailed(_) => None, } + Query::ParseFailed(_) => None, }) .collect::>() } fn deny_list(&self, style: MigrationStyle) -> Vec { + let statuses = self.statuses.read(); match style { - MigrationStyle::Async | MigrationStyle::InRequestPath => self - .statuses + MigrationStyle::Async | MigrationStyle::InRequestPath => statuses .iter() - .filter_map(|entry| { - let (query_id, 
(query, status)) = entry.pair(); + .filter_map(|(query_id, (query, status))| { if status.is_unsupported() || status.is_dropped() { Some(DeniedQuery { id: *query_id, @@ -99,11 +113,9 @@ impl PersistentStatusCacheHandle { } }) .collect::>(), - MigrationStyle::Explicit => self - .statuses + MigrationStyle::Explicit => statuses .iter() - .filter_map(|entry| { - let (query_id, (query, status)) = entry.pair(); + .filter_map(|(query_id, (query, status))| { if status.is_denied() { Some(DeniedQuery { id: *query_id, @@ -175,8 +187,8 @@ impl QueryStatusKey for ViewCreateRequest { // Since this is potentially mutating, we need to apply F to both the in-memory and the // persistent version of the status. f(cache.id_to_status.get_mut(&id).as_deref_mut()); - let mut persistent_status = cache.persistent_handle.statuses.get_mut(&id); - let transformed_status = persistent_status.as_deref_mut().map(|(_, status)| status); + let mut statuses = cache.persistent_handle.statuses.write(); + let transformed_status = statuses.get_mut(&id).map(|(_, status)| status); f(transformed_status) } } @@ -200,8 +212,8 @@ impl QueryStatusKey for String { // persistent version of the status. 
f(cache.id_to_status.get_mut(&id).as_deref_mut()); - let mut persistent_status = cache.persistent_handle.statuses.get_mut(&id); - let transformed_status = persistent_status.as_deref_mut().map(|(_, status)| status); + let mut statuses = cache.persistent_handle.statuses.write(); + let transformed_status = statuses.get_mut(&id).map(|(_, status)| status); f(transformed_status) } } @@ -292,28 +304,7 @@ impl QueryStatusCache { let query_state = self.id_to_status.get(&id); match query_state { - Some(s) => { - debug_assert!( - self.persistent_handle - .statuses - .get(&id) - .map(|entry| entry.value().0.clone()) - .expect("query not found") - == q.clone().into(), - "mismatch between calculated and cached id/query" - ); - debug_assert!( - self.persistent_handle - .statuses - .get(&id) - .map(|entry| entry.value().1.clone()) - .expect("query not found") - == *s, - "mismatch between calculated and cached id/status" - ); - - (id, s.value().migration_state.clone()) - } + Some(s) => (id, s.value().migration_state.clone()), None => self.insert(q.clone()), } } @@ -604,12 +595,11 @@ impl QueryStatusCache { v.migration_state = MigrationState::Pending; v.always = false; }); - self.persistent_handle - .statuses + let mut statuses = self.persistent_handle.statuses.write(); + statuses .iter_mut() - .filter(|entry| entry.value().1.is_successful()) - .for_each(|mut entry| { - let (_, status) = entry.pair_mut().1; + .filter(|(_query_id, (_query, status))| status.is_successful()) + .for_each(|(_query_id, (_query, ref mut status))| { status.migration_state = MigrationState::Pending; status.always = false; }); @@ -697,11 +687,10 @@ impl QueryStatusCache { /// /// Does not include any queries that require inlining. 
pub fn pending_migration(&self) -> QueryList { - self.persistent_handle - .statuses + let statuses = self.persistent_handle.statuses.read(); + statuses .iter() - .filter_map(|entry| { - let (_query_id, (query, status)) = entry.pair(); + .filter_map(|(_query_id, (query, status))| { if status.is_pending() { Some((query.clone(), status.clone())) } else { @@ -725,10 +714,8 @@ impl QueryStatusCache { /// Returns a query given a query hash pub fn query(&self, id: &str) -> Option { let id = QueryId::new(u64::from_str_radix(id.strip_prefix("q_")?, 16).ok()?); - self.persistent_handle - .statuses - .get(&id) - .map(|entry| entry.value().0.clone()) + let statuses = self.persistent_handle.statuses.read(); + statuses.peek(&id).map(|(query, _status)| query.clone()) } } @@ -794,23 +781,15 @@ mod tests { cache.insert(q1.clone()); - assert!(cache - .persistent_handle - .statuses + let mut statuses = cache.persistent_handle.statuses.write(); + assert!(statuses .iter() - .map(|entry| entry.value().0.clone()) + .map(|(_, (q, _))| q.clone()) .any(|q| q == q1.clone().into())); - assert!(cache - .persistent_handle - .statuses - .insert(id, (q1.into(), status.clone())) - .is_some()); + assert!(statuses.put(id, (q1.into(), status.clone())).is_some()); - assert_eq!( - cache.persistent_handle.statuses.get(&id).unwrap().value().1, - status - ); + assert_eq!(statuses.get(&id).unwrap().1, status); } #[test] @@ -822,23 +801,15 @@ mod tests { cache.insert(q1.clone()); - assert!(cache - .persistent_handle - .statuses + let mut statuses = cache.persistent_handle.statuses.write(); + assert!(statuses .iter() - .map(|entry| entry.value().0.clone()) + .map(|(_, (q, _))| q.clone()) .any(|q| q == q1.clone().into())); - assert!(cache - .persistent_handle - .statuses - .insert(id, (q1.into(), status.clone())) - .is_some()); + assert!(statuses.put(id, (q1.into(), status.clone())).is_some()); - assert_eq!( - cache.persistent_handle.statuses.get(&id).unwrap().value().1, - status - ); + 
assert_eq!(statuses.get(&id).unwrap().1, status); } #[test]