Skip to content

Commit

Permalink
adapter: Sync view names with the recipe
Browse files Browse the repository at this point in the history
We were running into a bug where if a cache was created with a name,
Readyset was restarted, and a read against that cache was executed, the
"/view_builder" RPC couldn't find the view associated with the query.
This was happening because the cache was being added to the expression
registry with its name, but after Readyset restarts, the adapter doesn't
know anything about the name. When it invokes "/view_builder," it uses
the adapter query ID as the name in `ViewRequest`, but when the server
looks up that name in the expression registry, it doesn't find anything
(because the cache was added with a name other than the adapter query ID).

This commit fixes the issue by 1) updating the `/view_statuses` controller
RPC to return the names of the queries in the expression registry
instead of just booleans and 2) updating the views synchronizer to
update the view name cache used in the adapter with the names of the
views.

This also fixes an issue where different queries in the query status
cache map to the same server-rewritten query, and versions of the query
other than the version from which the cache was created could not be
used to query the cache. Now, the views synchronizer will map all of
these queries to the same view name.

Fixes: REA-3933
Release-Note-Core: Fixed an issue where caches were not queryable after
  restarts in certain cases
Change-Id: If55846bdec50023585a12f2693dbb7e0e9b28a4a
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6864
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
ethan-readyset committed Feb 20, 2024
1 parent c712538 commit a36d8da
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 77 deletions.
4 changes: 2 additions & 2 deletions build/docker/grafana/dashboards/perf_debugging.json
Expand Up @@ -749,7 +749,7 @@
"uid": "PBFE396EC0B189D67"
},
"exemplar": true,
"expr": "readyset_controller_rpc_view_statuses_num_queries{}",
"expr": "readyset_controller_rpc_view_names_num_queries{}",
"interval": "",
"legendFormat": "",
"queryType": "randomWalk",
Expand Down Expand Up @@ -1385,4 +1385,4 @@
"uid": "perf_debugging",
"version": 3,
"weekStart": ""
}
}
6 changes: 5 additions & 1 deletion readyset-adapter/src/backend.rs
Expand Up @@ -2235,7 +2235,11 @@ where
// If the QSC didn't have this query, check with the controller to see if a
// view already exists there
if migration_state.is_none()
&& self.noria.get_view_status(view_request.clone()).await?
&& self
.noria
.get_view_name(view_request.clone())
.await?
.is_some()
{
migration_state = Some(MigrationState::Successful);
}
Expand Down
81 changes: 52 additions & 29 deletions readyset-adapter/src/backend/noria_connector.rs
Expand Up @@ -1013,7 +1013,9 @@ impl NoriaConnector {
.map(|_| ())
}

pub(crate) async fn get_view_name(
/// Gets the view name for the given statement from the view name cache, querying the server
/// for the name if it is not cached.
pub(crate) async fn get_view_name_cached(
&mut self,
q: &nom_sql::SelectStatement,
is_prepared: bool,
Expand All @@ -1025,10 +1027,12 @@ impl NoriaConnector {
let view_request = ViewCreateRequest::new(q.clone(), search_path.clone());
self.view_name_cache
.get_mut_or_try_insert_with(&view_request, shared_cache::InsertMode::Shared, async {
let qname: Relation = QueryId::from_select(q, search_path.as_slice()).into();

// add the query to ReadySet
if create_if_not_exist {
// If create_if_not_exist is true, Readyset is configured to create caches on
// the fly. Even if a view already does exist for the given query we still want
// to send over a `CreateCache` change to ensure that new invalidating tables
// are being created if the schema search path has changed
let qname: Relation = QueryId::from_select(q, search_path.as_slice()).into();
if is_prepared {
info!(
query = %Sensitive(&q.display(self.parse_dialect)),
Expand Down Expand Up @@ -1063,28 +1067,46 @@ impl NoriaConnector {
error!(%error, "add query failed");
}

return Err(error);
Err(error)
} else {
Ok(qname)
}
} else {
match noria_await!(
// `create_if_not_exist` is false, we check to see if the server has a view for
// the given query
let qname = noria_await!(
self.inner.get_mut()?,
self.inner.get_mut()?.noria.view(qname.clone())
) {
Ok(view) => {
// We should not have an entry, but if we do it's safe to overwrite
// since we got this information from the controller.
self.inner
.get_mut()?
.views
.insert(qname.clone(), view)
.await;
}
Err(e) => {
return Err(e);
}
self.inner
.get_mut()?
.noria
.view_names(vec![view_request.clone()], self.dialect)
)?
.into_iter()
.nth(0)
.unwrap();

if let Some(qname) = qname {
// The server has the view, so we retrieve it and insert it into the view
// name cache
let view = noria_await!(
self.inner.get_mut()?,
self.inner.get_mut()?.noria.view(qname.clone())
)?;

self.inner
.get_mut()?
.views
.insert(qname.clone(), view)
.await;

Ok(qname)
} else {
// The server does not have a view for this query, so we return an error
Err(ReadySetError::ViewNotFoundForQuery {
statement: q.clone(),
})
}
}
Ok(qname)
})
.await
.cloned()
Expand Down Expand Up @@ -1398,7 +1420,7 @@ impl NoriaConnector {
// check if we already have this query prepared
trace!("select::access view");
let qname = self
.get_view_name(
.get_view_name_cached(
&statement,
true,
create_if_not_exist,
Expand Down Expand Up @@ -1481,7 +1503,7 @@ impl NoriaConnector {
processed_query_params,
} => {
let name = self
.get_view_name(&statement, false, create_if_missing, None)
.get_view_name_cached(&statement, false, create_if_missing, None)
.await?;
(
Cow::Owned(name),
Expand Down Expand Up @@ -1555,7 +1577,7 @@ impl NoriaConnector {
is_prepared: bool,
) -> ReadySetResult<()> {
let qname = self
.get_view_name(
.get_view_name_cached(
statement,
is_prepared,
create_if_not_exists,
Expand All @@ -1573,17 +1595,18 @@ impl NoriaConnector {
self.inner.inner.as_ref().map(|i| i.noria.clone())
}

/// Returns true if a view exists for the given query and false otherwise.
pub(crate) async fn get_view_status(
/// Queries the server for the view name if a view exists for the given query and `None`
/// otherwise.
pub(crate) async fn get_view_name(
&mut self,
query: ViewCreateRequest,
) -> ReadySetResult<bool> {
) -> ReadySetResult<Option<Relation>> {
self.inner
.get_mut()?
.noria
.view_statuses(vec![query], self.dialect)
.view_names(vec![query], self.dialect)
.await
.map(|statuses| statuses[0])
.map(|names| names.into_iter().nth(0).unwrap())
}
}

Expand Down
18 changes: 12 additions & 6 deletions readyset-adapter/src/views_synchronizer.rs
@@ -1,9 +1,10 @@
use std::sync::Arc;

use dataflow_expression::Dialect;
use nom_sql::DialectDisplay;
use nom_sql::{DialectDisplay, Relation};
use readyset_client::query::MigrationState;
use readyset_client::ReadySetHandle;
use readyset_client::{ReadySetHandle, ViewCreateRequest};
use readyset_util::shared_cache::LocalCache;
use readyset_util::shutdown::ShutdownReceiver;
use tokio::select;
use tracing::{debug, info, instrument, trace, warn};
Expand All @@ -19,6 +20,8 @@ pub struct ViewsSynchronizer {
poll_interval: std::time::Duration,
/// Dialect to pass to ReadySet to control the expression semantics used for all queries
dialect: Dialect,
/// Global and thread-local cache of view endpoints and prepared statements.
view_name_cache: LocalCache<ViewCreateRequest, Relation>,
}

impl ViewsSynchronizer {
Expand All @@ -27,12 +30,14 @@ impl ViewsSynchronizer {
query_status_cache: &'static QueryStatusCache,
poll_interval: std::time::Duration,
dialect: Dialect,
view_name_cache: LocalCache<ViewCreateRequest, Relation>,
) -> Self {
ViewsSynchronizer {
controller,
query_status_cache,
poll_interval,
dialect,
view_name_cache,
}
}

Expand Down Expand Up @@ -77,18 +82,19 @@ impl ViewsSynchronizer {

match self
.controller
.view_statuses(queries.clone(), self.dialect)
.view_names(queries.clone(), self.dialect)
.await
{
Ok(statuses) => {
for (query, migrated) in queries.into_iter().zip(statuses) {
for (query, name) in queries.into_iter().zip(statuses) {
trace!(
// FIXME(REA-2168): Use correct dialect.
query = %query.statement.display(nom_sql::Dialect::MySQL),
migrated,
name = ?name,
"Loaded query status from controller"
);
if migrated {
if let Some(name) = name {
self.view_name_cache.insert(query.clone(), name).await;
self.query_status_cache
.update_query_migration_state(&query, MigrationState::Successful)
}
Expand Down
2 changes: 2 additions & 0 deletions readyset-client-test-helpers/src/lib.rs
Expand Up @@ -266,12 +266,14 @@ impl TestBuilder {
let rh = handle.clone();
let expr_dialect = Dialect::DEFAULT_POSTGRESQL;
let shutdown_rx = shutdown_tx.subscribe();
let view_name_cache = view_name_cache.clone();
tokio::spawn(async move {
let mut views_synchronizer = ViewsSynchronizer::new(
rh,
query_status_cache,
std::time::Duration::from_secs(1),
expr_dialect,
view_name_cache.new_local(),
);
views_synchronizer.run(shutdown_rx).await
});
Expand Down
10 changes: 5 additions & 5 deletions readyset-client/src/controller.rs
Expand Up @@ -497,13 +497,13 @@ impl ReadySetHandle {
/// For each of the given list of queries, determine whether that query (or a semantically
/// equivalent query) has been created as a `View`.
///
/// To save on data, this returns a list of booleans corresponding to the provided list of
/// query, where each boolean is `true` if the query at the same position in the argument list
/// has been installed as a view.
view_statuses(
/// To save on data, this returns a list of `Option<Relation>` corresponding to the provided
/// list of queries, where each option is `Some(<query name>)` if the query at the same
/// position in the argument list has been installed as a view.
view_names(
queries: Vec<ViewCreateRequest>,
dialect: dataflow_expression::Dialect,
) -> Vec<bool>
) -> Vec<Option<Relation>>
);

/// Obtain a `View` that allows you to query the given external view.
Expand Down
6 changes: 3 additions & 3 deletions readyset-client/src/metrics/mod.rs
Expand Up @@ -325,9 +325,9 @@ pub mod recorded {
/// | path | The http path associated with the rpc request. |
pub const CONTROLLER_RPC_REQUEST_TIME: &str = "readyset_controller.rpc_request_time";

/// Gauge: The number of queries sent to the `/view_statuses` controller RPC.
pub const CONTROLLER_RPC_VIEW_STATUSES_NUM_QUERIES: &str =
"readyset_controller.rpc_view_statuses_num_queries";
/// Gauge: The number of queries sent to the `/view_names` controller RPC.
pub const CONTROLLER_RPC_VIEW_NAMES_NUM_QUERIES: &str =
"readyset_controller.rpc_view_names_num_queries";

/// Histgoram: Write propagation time from binlog to reader node. For each
/// input packet, this is recorded for each reader node that the packet
Expand Down
6 changes: 5 additions & 1 deletion readyset-errors/src/lib.rs
Expand Up @@ -7,7 +7,7 @@ use std::error::Error;
use std::io;

use derive_more::Display;
use nom_sql::Relation;
use nom_sql::{DialectDisplay, Relation, SelectStatement};
use petgraph::graph::NodeIndex;
use readyset_util::redacted::Sensitive;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -287,6 +287,10 @@ pub enum ReadySetError {
#[error("Could not find view {0}")]
ViewNotFound(String),

/// A view couldn't be found for the given query.
#[error("Could not find view for query {}", Sensitive(&statement.display(nom_sql::Dialect::MySQL)))]
ViewNotFoundForQuery { statement: SelectStatement },

/// A view couldn't be found in the given pool of worker.
#[error("Could not find view '{name}' in workers '{workers:?}'")]
ViewNotFoundInWorkers {
Expand Down

0 comments on commit a36d8da

Please sign in to comment.