Skip to content

Commit

Permalink
adapter: Limit the number of stored queries in QSC
Browse files Browse the repository at this point in the history
This is a temporary measure in order to limit the memory usage of the
query status cache by changing the persistent handle (which still isn't
actually persistent yet, but will be) to use an LRUCache as its
underlying store with a limit of 100_000. The id_to_status map is able
to "remember" the query status for more than this number of queries, but
we will only keep the full query for 100_000 queries.

The main goal of this is to remove this as a variable in our locust
testing before the fully persistent refactor can be finished.

Change-Id: I9d39590c3e797db4c0ad40244ad47fce32cb0e51
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6178
Reviewed-by: Jason Brown <jason.b@readyset.io>
Tested-by: Buildkite CI
  • Loading branch information
lukoktonos committed Oct 9, 2023
1 parent 2a565b3 commit 456eb17
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 91 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion readyset-adapter/Cargo.toml
Expand Up @@ -63,6 +63,7 @@ readyset-sql-passes = { path = "../readyset-sql-passes" }
readyset-version = { path = "../readyset-version" }
health-reporter = { path = "../health-reporter" }
database-utils = { path = "../database-utils" }
lru = "0.12.0"

[dev-dependencies]
proptest = "1.0.0"
Expand All @@ -78,4 +79,4 @@ harness = false

[features]
ryw = []
failure_injection = ["fail/failpoints"]
failure_injection = ["fail/failpoints"]
151 changes: 61 additions & 90 deletions readyset-adapter/src/query_status_cache.rs
Expand Up @@ -11,12 +11,16 @@ use anyhow::anyhow;
use clap::ValueEnum;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use lru::LruCache;
use parking_lot::RwLock;
use readyset_client::query::*;
use readyset_client::ViewCreateRequest;
use readyset_data::DfValue;
use readyset_util::hash::hash;
use tracing::error;

const NUM_PERSISTED_QUERIES: usize = 100_000;

/// A metadata cache for all queries that have been processed by this
/// adapter. Thread-safe.
#[derive(Debug)]
Expand Down Expand Up @@ -44,50 +48,60 @@ pub struct QueryStatusCache {
enable_experimental_placeholder_inlining: bool,
}

#[derive(Debug, Default)]
#[derive(Debug)]
/// A handle to persistent metadata for all queries that have been processed by this adapter.
pub struct PersistentStatusCacheHandle {
/// A Thread-safe hash map that holds the full [`Query`] as well as its associated
/// [`QueryStatus`]. The `QueryStatus` must match the one in
/// [`QueryStatusCache::id_to_status`].
statuses: DashMap<QueryId, (Query, QueryStatus), ahash::RandomState>,
/// An [`LRUCache`] that holds the full [`Query`] as well as its associated
/// [`QueryStatus`] for a fixed number of queries.
statuses: RwLock<LruCache<QueryId, (Query, QueryStatus)>>,

/// List of pending inlined migrations. Contains the query to be inlined, and the sets of
/// parameters to use for inlining.
pending_inlined_migrations: DashMap<ViewCreateRequest, HashSet<Vec<DfValue>>>,
}

impl Default for PersistentStatusCacheHandle {
fn default() -> Self {
Self {
statuses: RwLock::new(LruCache::new(
NUM_PERSISTED_QUERIES
.try_into()
.expect("num persisted queries is not zero"),
)),
pending_inlined_migrations: Default::default(),
}
}
}

impl PersistentStatusCacheHandle {
fn insert_with_status(&self, q: Query, id: QueryId, status: QueryStatus) {
self.statuses.insert(id, (q, status));
let mut statuses = self.statuses.write();
statuses.put(id, (q, status));
}

fn allow_list(&self) -> Vec<(QueryId, Arc<ViewCreateRequest>, QueryStatus)> {
self.statuses
let statuses = self.statuses.read();
statuses
.iter()
.filter_map(|entry| {
let (query_id, (query, status)) = entry.pair();
match query {
Query::Parsed(view) => {
if status.is_successful() {
Some((*query_id, view.clone(), status.clone()))
} else {
None
}
.filter_map(|(query_id, (query, status))| match query {
Query::Parsed(view) => {
if status.is_successful() {
Some((*query_id, view.clone(), status.clone()))
} else {
None
}
Query::ParseFailed(_) => None,
}
Query::ParseFailed(_) => None,
})
.collect::<Vec<_>>()
}

fn deny_list(&self, style: MigrationStyle) -> Vec<DeniedQuery> {
let statuses = self.statuses.read();
match style {
MigrationStyle::Async | MigrationStyle::InRequestPath => self
.statuses
MigrationStyle::Async | MigrationStyle::InRequestPath => statuses
.iter()
.filter_map(|entry| {
let (query_id, (query, status)) = entry.pair();
.filter_map(|(query_id, (query, status))| {
if status.is_unsupported() || status.is_dropped() {
Some(DeniedQuery {
id: *query_id,
Expand All @@ -99,11 +113,9 @@ impl PersistentStatusCacheHandle {
}
})
.collect::<Vec<_>>(),
MigrationStyle::Explicit => self
.statuses
MigrationStyle::Explicit => statuses
.iter()
.filter_map(|entry| {
let (query_id, (query, status)) = entry.pair();
.filter_map(|(query_id, (query, status))| {
if status.is_denied() {
Some(DeniedQuery {
id: *query_id,
Expand Down Expand Up @@ -175,8 +187,8 @@ impl QueryStatusKey for ViewCreateRequest {
// Since this is potentially mutating, we need to apply F to both the in-memory and the
// persistent version of the status.
f(cache.id_to_status.get_mut(&id).as_deref_mut());
let mut persistent_status = cache.persistent_handle.statuses.get_mut(&id);
let transformed_status = persistent_status.as_deref_mut().map(|(_, status)| status);
let mut statuses = cache.persistent_handle.statuses.write();
let transformed_status = statuses.get_mut(&id).map(|(_, status)| status);
f(transformed_status)
}
}
Expand All @@ -200,8 +212,8 @@ impl QueryStatusKey for String {
// persistent version of the status.
f(cache.id_to_status.get_mut(&id).as_deref_mut());

let mut persistent_status = cache.persistent_handle.statuses.get_mut(&id);
let transformed_status = persistent_status.as_deref_mut().map(|(_, status)| status);
let mut statuses = cache.persistent_handle.statuses.write();
let transformed_status = statuses.get_mut(&id).map(|(_, status)| status);
f(transformed_status)
}
}
Expand Down Expand Up @@ -292,28 +304,7 @@ impl QueryStatusCache {
let query_state = self.id_to_status.get(&id);

match query_state {
Some(s) => {
debug_assert!(
self.persistent_handle
.statuses
.get(&id)
.map(|entry| entry.value().0.clone())
.expect("query not found")
== q.clone().into(),
"mismatch between calculated and cached id/query"
);
debug_assert!(
self.persistent_handle
.statuses
.get(&id)
.map(|entry| entry.value().1.clone())
.expect("query not found")
== *s,
"mismatch between calculated and cached id/status"
);

(id, s.value().migration_state.clone())
}
Some(s) => (id, s.value().migration_state.clone()),
None => self.insert(q.clone()),
}
}
Expand Down Expand Up @@ -604,12 +595,11 @@ impl QueryStatusCache {
v.migration_state = MigrationState::Pending;
v.always = false;
});
self.persistent_handle
.statuses
let mut statuses = self.persistent_handle.statuses.write();
statuses
.iter_mut()
.filter(|entry| entry.value().1.is_successful())
.for_each(|mut entry| {
let (_, status) = entry.pair_mut().1;
.filter(|(_query_id, (_query, status))| status.is_successful())
.for_each(|(_query_id, (_query, ref mut status))| {
status.migration_state = MigrationState::Pending;
status.always = false;
});
Expand Down Expand Up @@ -697,11 +687,10 @@ impl QueryStatusCache {
///
/// Does not include any queries that require inlining.
pub fn pending_migration(&self) -> QueryList {
self.persistent_handle
.statuses
let statuses = self.persistent_handle.statuses.read();
statuses
.iter()
.filter_map(|entry| {
let (_query_id, (query, status)) = entry.pair();
.filter_map(|(_query_id, (query, status))| {
if status.is_pending() {
Some((query.clone(), status.clone()))
} else {
Expand All @@ -725,10 +714,8 @@ impl QueryStatusCache {
/// Returns a query given a query hash
pub fn query(&self, id: &str) -> Option<Query> {
let id = QueryId::new(u64::from_str_radix(id.strip_prefix("q_")?, 16).ok()?);
self.persistent_handle
.statuses
.get(&id)
.map(|entry| entry.value().0.clone())
let statuses = self.persistent_handle.statuses.read();
statuses.peek(&id).map(|(query, _status)| query.clone())
}
}

Expand Down Expand Up @@ -794,23 +781,15 @@ mod tests {

cache.insert(q1.clone());

assert!(cache
.persistent_handle
.statuses
let mut statuses = cache.persistent_handle.statuses.write();
assert!(statuses
.iter()
.map(|entry| entry.value().0.clone())
.map(|(_, (q, _))| q.clone())
.any(|q| q == q1.clone().into()));

assert!(cache
.persistent_handle
.statuses
.insert(id, (q1.into(), status.clone()))
.is_some());
assert!(statuses.put(id, (q1.into(), status.clone())).is_some());

assert_eq!(
cache.persistent_handle.statuses.get(&id).unwrap().value().1,
status
);
assert_eq!(statuses.get(&id).unwrap().1, status);
}

#[test]
Expand All @@ -822,23 +801,15 @@ mod tests {

cache.insert(q1.clone());

assert!(cache
.persistent_handle
.statuses
let mut statuses = cache.persistent_handle.statuses.write();
assert!(statuses
.iter()
.map(|entry| entry.value().0.clone())
.map(|(_, (q, _))| q.clone())
.any(|q| q == q1.clone().into()));

assert!(cache
.persistent_handle
.statuses
.insert(id, (q1.into(), status.clone()))
.is_some());
assert!(statuses.put(id, (q1.into(), status.clone())).is_some());

assert_eq!(
cache.persistent_handle.statuses.get(&id).unwrap().value().1,
status
);
assert_eq!(statuses.get(&id).unwrap().1, status);
}

#[test]
Expand Down

0 comments on commit 456eb17

Please sign in to comment.