Skip to content

Commit

Permalink
adapter: Make SHOW CACHES persistent
Browse files Browse the repository at this point in the history
Previously, `SHOW CACHES` used the query status cache as its source of
truth. This meant that after a restart, `SHOW CACHES` would not reflect
any caches that existed on the server until after the adapter received a
query against that cache.

This commit has `SHOW CACHES` use the expression registry as its source
of truth, which is what `EXPLAIN CACHES` uses. This enables the adapter
to display an accurate, up-to-date list of caches via `SHOW CACHES`,
even after a restart.

Refs: REA-3516, REA-3349
Release-Note-Core: Added the ability for the `SHOW CACHES` SQL extension
  to display an up-to-date list of caches, even after a restart
Change-Id: I822dd2cc1f061eb4ae22c9e0bc13534eb9c142be
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6546
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
ethan-readyset committed Jan 4, 2024
1 parent 23f8d31 commit dc16282
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 62 deletions.
33 changes: 15 additions & 18 deletions readyset-adapter/src/backend.rs
Expand Up @@ -1993,13 +1993,14 @@ where
/// Responds to a `SHOW CACHES` query
async fn show_caches(
&mut self,
query_id: &Option<String>,
query_id: Option<&str>,
) -> ReadySetResult<noria_connector::QueryResult<'static>> {
let mut queries = self.state.query_status_cache.allow_list();
let mut views = self.noria.verbose_views().await?;

// Filter on query ID
if let Some(q_id) = query_id {
queries.retain(|(id, _, _)| id.to_string() == *q_id);
if let Some(unparsed_query_id) = query_id {
let query_id = unparsed_query_id.parse()?;
views.retain(|view| view.query_id == query_id);
}

let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
Expand All @@ -2018,17 +2019,12 @@ where

// Get the cache name for each query from the view cache
let mut results: Vec<Vec<DfValue>> = vec![];
for (id, view, status) in queries {
let mut row = vec![
id.to_string().into(),
self.noria
.get_view_name(&view.statement, false, false, None)
.await?
.display_unquoted()
.to_string()
.into(),
Self::format_query_text(view.statement.display(DB::SQL_DIALECT).to_string()).into(),
if status.always {
for view in views {
let mut row: Vec<DfValue> = vec![
view.query_id.to_string().into(),
view.name.display_unquoted().to_string().into(),
Self::format_query_text(view.display(DB::SQL_DIALECT).to_string()).into(),
if view.always {
"no fallback".into()
} else {
"fallback allowed".into()
Expand All @@ -2037,8 +2033,9 @@ where

// Append metrics if we have them
if let Some(handle) = self.metrics_handle.as_ref() {
let MetricsSummary { sample_count } =
handle.metrics_summary(id.to_string()).unwrap_or_default();
let MetricsSummary { sample_count } = handle
.metrics_summary(view.query_id.to_string())
.unwrap_or_default();
row.push(DfValue::from(format!("{sample_count}")));
}

Expand Down Expand Up @@ -2270,7 +2267,7 @@ where
trace!("No telemetry sender. not sending metric for SHOW CACHES");
}

self.show_caches(query_id).await
self.show_caches(query_id.as_deref()).await
}
SqlQuery::Show(ShowStatement::ReadySetStatus) => Ok(self
.status_reporter
Expand Down
8 changes: 6 additions & 2 deletions readyset-adapter/src/backend/noria_connector.rs
Expand Up @@ -13,6 +13,7 @@ use readyset_client::consistency::Timestamp;
use readyset_client::internal::LocalNodeIndex;
use readyset_client::query::QueryId;
use readyset_client::recipe::changelist::{Change, ChangeList, IntoChanges};
use readyset_client::recipe::CacheExpr;
use readyset_client::results::{ResultIterator, Results};
use readyset_client::{
ColumnSchema, GraphvizOptions, ReadQuery, ReaderAddress, ReaderHandle, ReadySetHandle,
Expand Down Expand Up @@ -539,9 +540,12 @@ impl NoriaConnector {
Ok(QueryResult::from_owned(schema, vec![Results::new(data)]))
}

pub(crate) async fn verbose_views(&mut self) -> ReadySetResult<Vec<CacheExpr>> {
self.inner.get_mut()?.noria.verbose_views().await
}

pub(crate) async fn list_create_cache_stmts(&mut self) -> ReadySetResult<Vec<String>> {
let noria = &mut self.inner.get_mut()?.noria;
Ok(noria
Ok(self
.verbose_views()
.await?
.into_iter()
Expand Down
6 changes: 3 additions & 3 deletions readyset-client/src/controller.rs
Expand Up @@ -8,7 +8,7 @@ use std::time::{Duration, Instant};

use futures_util::future;
use hyper::client::HttpConnector;
use nom_sql::{CreateCacheStatement, NonReplicatedRelation, Relation};
use nom_sql::{NonReplicatedRelation, Relation};
use parking_lot::RwLock;
use petgraph::graph::NodeIndex;
use readyset_errors::{
Expand All @@ -29,7 +29,7 @@ use crate::debug::stats;
use crate::internal::{DomainIndex, ReplicaAddress};
use crate::metrics::MetricsDump;
use crate::recipe::changelist::ChangeList;
use crate::recipe::{ExtendRecipeResult, ExtendRecipeSpec, MigrationStatus};
use crate::recipe::{CacheExpr, ExtendRecipeResult, ExtendRecipeSpec, MigrationStatus};
use crate::status::ReadySetControllerStatus;
use crate::table::{PersistencePoint, Table, TableBuilder, TableRpc};
use crate::view::{View, ViewBuilder, ViewRpc};
Expand Down Expand Up @@ -489,7 +489,7 @@ impl ReadySetHandle {
///
/// `Self::poll_ready` must have returned `Async::Ready` before you call
/// this method.
pub async fn verbose_views(&mut self) -> ReadySetResult<Vec<CreateCacheStatement>> {
pub async fn verbose_views(&mut self) -> ReadySetResult<Vec<CacheExpr>> {
self.simple_post_request("verbose_views").await
}

Expand Down
38 changes: 38 additions & 0 deletions readyset-client/src/recipe/mod.rs
Expand Up @@ -5,9 +5,12 @@ pub mod changelist;
use std::borrow::Cow;
use std::fmt::Display;

use nom_sql::{CacheInner, CreateCacheStatement, DialectDisplay, Relation, SelectStatement};
use readyset_errors::ReadySetError;
use readyset_util::fmt::fmt_with;
use serde::{Deserialize, Serialize};

use crate::query::QueryId;
pub use crate::recipe::changelist::ChangeList;
use crate::ReplicationOffset;

Expand Down Expand Up @@ -90,3 +93,38 @@ impl MigrationStatus {
matches!(self, Self::Pending)
}
}

/// The representation of a cache as it exists in the expression registry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheExpr {
pub name: Relation,
pub statement: SelectStatement,
pub always: bool,
pub query_id: QueryId,
}

impl From<CacheExpr> for CreateCacheStatement {
fn from(value: CacheExpr) -> Self {
CreateCacheStatement {
name: Some(value.name),
inner: Ok(CacheInner::Statement(Box::new(value.statement))),
always: value.always,
// CacheExpr represents a migrated query, and the below fields are not relevant for an
// already-migrated query
concurrently: false,
unparsed_create_cache_statement: None,
}
}
}

impl DialectDisplay for CacheExpr {
fn display(&self, dialect: nom_sql::Dialect) -> impl std::fmt::Display + '_ {
fmt_with(move |f| {
write!(
f,
"{}",
CreateCacheStatement::from(self.clone()).display(dialect)
)
})
}
}
2 changes: 1 addition & 1 deletion readyset-server/src/controller/sql/mod.rs
Expand Up @@ -30,7 +30,7 @@ use self::query_graph::to_query_graph;
pub(crate) use self::recipe::{ExprId, Recipe, Schema};
use self::registry::ExprRegistry;
use crate::controller::mir_to_flow::{mir_node_to_flow_parts, mir_query_to_flow_parts};
use crate::controller::sql::registry::RecipeExpr;
pub(crate) use crate::controller::sql::registry::RecipeExpr;
use crate::controller::Migration;
use crate::sql::mir::MirRemovalResult;
use crate::ReuseConfigType;
Expand Down
36 changes: 4 additions & 32 deletions readyset-server/src/controller/sql/recipe/mod.rs
@@ -1,9 +1,6 @@
use std::{fmt, str};

use nom_sql::{
CacheInner, CreateCacheStatement, CreateTableStatement, CreateViewStatement, Relation,
SelectStatement, SqlIdentifier, SqlQuery,
};
use nom_sql::{Relation, SelectStatement, SqlIdentifier};
use petgraph::graph::NodeIndex;
use readyset_client::recipe::changelist::ChangeList;
use readyset_client::ViewCreateRequest;
Expand Down Expand Up @@ -91,34 +88,9 @@ pub(crate) enum Schema<'a> {
}

impl Recipe {
/// Get the id associated with an alias
pub(crate) fn expression_by_alias(&self, alias: &Relation) -> Option<SqlQuery> {
let expr = self.inc.registry.get(alias).map(|e| match e {
RecipeExpr::Table { name, body, .. } => SqlQuery::CreateTable(CreateTableStatement {
if_not_exists: false,
table: name.clone(),
body: Ok(body.clone()),
options: Ok(vec![]),
}),
RecipeExpr::View { name, definition } => SqlQuery::CreateView(CreateViewStatement {
name: name.clone(),
or_replace: false,
fields: vec![],
definition: Ok(Box::new(definition.clone())),
}),
RecipeExpr::Cache {
name,
statement,
always,
..
} => SqlQuery::CreateCache(CreateCacheStatement {
name: Some(name.clone()),
inner: Ok(CacheInner::Statement(Box::new(statement.clone()))),
unparsed_create_cache_statement: None, // not relevant after migrating
always: *always,
concurrently: false, // concurrently not relevant after migrating
}),
});
/// Get the [`RecipeExpr`] associated with an alias
pub(crate) fn expression_by_alias(&self, alias: &Relation) -> Option<RecipeExpr> {
let expr = self.inc.registry.get(alias).cloned();
if expr.is_none() {
warn!(alias = %alias.display_unquoted(), "Query not found in expression registry");
}
Expand Down
21 changes: 15 additions & 6 deletions readyset-server/src/controller/state.rs
Expand Up @@ -31,7 +31,7 @@ use failpoint_macros::set_failpoint;
use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
use futures::{FutureExt, TryFutureExt, TryStream};
use metrics::{gauge, histogram};
use nom_sql::{CreateCacheStatement, NonReplicatedRelation, Relation, SqlIdentifier, SqlQuery};
use nom_sql::{NonReplicatedRelation, Relation, SqlIdentifier};
use petgraph::visit::{Bfs, IntoNodeReferences};
use petgraph::Direction;
use rand::Rng;
Expand All @@ -46,7 +46,7 @@ use readyset_client::failpoints;
use readyset_client::internal::{MaterializationStatus, ReplicaAddress};
use readyset_client::metrics::recorded;
use readyset_client::recipe::changelist::{Change, ChangeList};
use readyset_client::recipe::ExtendRecipeSpec;
use readyset_client::recipe::{CacheExpr, ExtendRecipeSpec};
use readyset_client::{
PersistencePoint, SingleKeyEviction, TableReplicationStatus, TableStatus, ViewCreateRequest,
ViewFilter, ViewRequest, ViewSchema,
Expand All @@ -69,7 +69,7 @@ use crate::controller::domain_handle::DomainHandle;
use crate::controller::migrate::materialization::Materializations;
use crate::controller::migrate::scheduling::Scheduler;
use crate::controller::migrate::{routing, DomainMigrationMode, DomainMigrationPlan, Migration};
use crate::controller::sql::Schema;
use crate::controller::sql::{RecipeExpr, Schema};
use crate::controller::{
schema, ControllerState, DomainPlacementRestriction, NodeRestrictionKey, Worker,
WorkerIdentifier,
Expand Down Expand Up @@ -264,7 +264,7 @@ impl DfState {
/// Get a map of all known views created from `CREATE CACHE` statements, mapping the name of the
/// view to a tuple of (`SelectStatement`, always) where always is a bool that indicates whether
/// the `CREATE CACHE` statement was created with the optional `ALWAYS` argument.
pub(super) fn verbose_views(&self) -> Vec<CreateCacheStatement> {
pub(super) fn verbose_views(&self) -> Vec<CacheExpr> {
self.ingredients
.externals(petgraph::EdgeDirection::Outgoing)
.filter_map(|n| {
Expand All @@ -281,8 +281,17 @@ impl DfState {

// Only return ingredients created from "CREATE CACHE"
match query {
// CacheInner::ID should have been expanded to CacheInner::Statement
SqlQuery::CreateCache(stmt) if stmt.inner.is_ok() => Some(stmt),
RecipeExpr::Cache {
name,
statement,
always,
query_id,
} => Some(CacheExpr {
name,
statement,
always,
query_id,
}),
_ => None,
}
} else {
Expand Down

0 comments on commit dc16282

Please sign in to comment.