Inlining: Automatically migrate queries requiring inlining
When we fail to look up a view for a query that has been cached but
requires inlining, we now report the miss to the QueryStatusCache so
that the MigrationHandler can asynchronously create an inlined
migration. The asynchronous migration is gated behind the
--experimental-placeholder-inlining flag.
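
For illustration, the hand-off can be modeled with the minimal sketch
below. The types are hypothetical simplifications; the real code calls
QueryStatusCache::view_not_found_for_query with a ViewCreateRequest,
and the MigrationHandler drains the cache on its own schedule:

    use std::collections::VecDeque;

    // Simplified stand-in for the shared QueryStatusCache.
    #[derive(Default)]
    struct QueryStatusCache {
        // Queries whose view lookup failed and that may need an inlined migration.
        pending_inline: VecDeque<String>,
    }

    impl QueryStatusCache {
        // Adapter side: record the miss when the view is not found.
        fn view_not_found_for_query(&mut self, query: String) {
            self.pending_inline.push_back(query);
        }

        // Migration handler side: pick up queries to migrate asynchronously.
        fn next_pending(&mut self) -> Option<String> {
            self.pending_inline.pop_front()
        }
    }

    fn main() {
        let mut cache = QueryStatusCache::default();
        cache.view_not_found_for_query("SELECT * FROM t WHERE x = ? LIMIT ?".into());
        assert!(cache.next_pending().is_some());
    }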

The QueryStatusCache stores the unsupported placeholders so that we know
which placeholders require inlining when we attempt to migrate. The
cache also stores an epoch that counts how many times the set of all
inlined caches for a query has changed.
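
A rough model of that per-query state (hypothetical simplified types;
the real definitions live in readyset-client and differ in detail):

    use std::collections::HashSet;

    struct InlinedState {
        // Positions of the placeholders that must be inlined with literals.
        unsupported_placeholders: Vec<usize>,
        // Literal value sets for which an inlined cache already exists.
        inlined_literal_sets: HashSet<Vec<String>>,
        // Bumped whenever the set of inlined caches for the query changes.
        epoch: u64,
    }

    impl InlinedState {
        fn record_inlined(&mut self, literals: Vec<String>) {
            // Only a genuinely new literal set changes the set of caches.
            if self.inlined_literal_sets.insert(literals) {
                self.epoch += 1;
            }
        }
    }

    fn main() {
        let mut state = InlinedState {
            unsupported_placeholders: vec![2, 3],
            inlined_literal_sets: HashSet::new(),
            epoch: 0,
        };
        state.record_inlined(vec!["10".into(), "0".into()]);
        assert_eq!(state.epoch, 1);
        // The same literals do not create a new cache, so the epoch is unchanged.
        state.record_inlined(vec!["10".into(), "0".into()]);
        assert_eq!(state.epoch, 1);
    }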

When the migration handler receives a new set of parameters to inline
into a query, it creates the inlined cache asynchronously and also
updates the cached Views stored by noria so that we do not have to
fetch them in the query request path.
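
The view-cache side of that update can be pictured with this toy
sketch; the actual entry point added in this commit is
NoriaConnector::update_view_cache, which refreshes the connector's
cached view after the inlined cache has been created:

    use std::collections::HashMap;

    // Hypothetical simplified view cache kept by the noria connector.
    #[derive(Default)]
    struct ViewCache {
        views: HashMap<String, String>, // query name -> view handle (simplified)
    }

    impl ViewCache {
        // Called by the migration handler right after an inlined cache is
        // created, so the execute path finds the view already cached instead
        // of fetching it from the controller on the first request.
        fn update_view(&mut self, query_name: &str, view: String) {
            self.views.insert(query_name.to_string(), view);
        }

        fn cached(&self, query_name: &str) -> Option<&String> {
            self.views.get(query_name)
        }
    }

    fn main() {
        let mut cache = ViewCache::default();
        cache.update_view("q_inlined_example", "view handle".into());
        assert!(cache.cached("q_inlined_example").is_some());
    }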

When an inlined query is executed, we must get the status of the query
from the QueryStatusCache to see if the epoch has advanced. If it has,
we request a new view from ReadySet, update the cached statement, and
prepare the statement against ReadySet if we have not already done so.
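
A minimal sketch of that epoch check on the execute path (hypothetical
simplified types; the real logic is in the adapter's execute path in
readyset-adapter/src/backend.rs, shown in the diff below):

    #[derive(PartialEq)]
    enum Prep {
        UpstreamOnly,
        Both,
    }

    struct CachedStatement {
        epoch: u64,
        prep: Prep,
    }

    // Returns true when the view cache should be refreshed and, if the
    // statement is only prepared upstream, prepared against ReadySet too.
    fn needs_refresh(cached: &mut CachedStatement, current_epoch: u64) -> bool {
        if current_epoch > cached.epoch {
            cached.epoch = current_epoch;
            return true;
        }
        false
    }

    fn main() {
        let mut stmt = CachedStatement { epoch: 0, prep: Prep::UpstreamOnly };
        assert!(needs_refresh(&mut stmt, 1)); // epoch advanced: fetch the new view
        if stmt.prep == Prep::UpstreamOnly {
            stmt.prep = Prep::Both; // now also prepared against ReadySet
        }
        assert!(!needs_refresh(&mut stmt, 1)); // no change: use the cached plan
    }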

Release-Note-Core: Added an experimental feature to automatically inline
  placeholders in a query with literal values when the placeholders are
  unsupported, so that the query can be run against ReadySet. This
  feature is enabled with the --experimental-placeholder-inlining flag.
  It is experimental because it could degrade the performance of the
  ReadySet instance if too many inlined instances of a query are
  created. It is recommended that you drop the cache of any query whose
  inlined placeholders have high cardinality.
Change-Id: Ia058b8373f8d61f38f8b095a0da4b7bf9786c85c
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4273
Tested-by: Buildkite CI
Reviewed-by: Fran Noriega <fran@readyset.io>
Dan Wilbanks committed May 2, 2023
1 parent 241f066 commit 4a31acd
Showing 12 changed files with 899 additions and 86 deletions.
148 changes: 122 additions & 26 deletions readyset-adapter/src/backend.rs
@@ -87,7 +87,7 @@ use nom_sql::{
use readyset_client::consistency::Timestamp;
use readyset_client::query::*;
use readyset_client::results::Results;
use readyset_client::{ColumnSchema, ViewCreateRequest};
use readyset_client::{ColumnSchema, PlaceholderIdx, ViewCreateRequest};
pub use readyset_client_metrics::QueryDestination;
use readyset_client_metrics::{recorded, EventType, QueryExecutionEvent, SqlQueryType};
use readyset_data::{DfType, DfValue};
@@ -99,6 +99,7 @@ use readyset_version::READYSET_VERSION;
use timestamp_service::client::{TimestampClient, WriteId, WriteKey};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, instrument, trace, warn};
use vec1::Vec1;

use crate::backend::noria_connector::ExecuteSelectContext;
use crate::query_handler::SetBehavior;
@@ -265,6 +266,7 @@ pub struct BackendBuilder {
query_max_failure_seconds: u64,
fallback_recovery_seconds: u64,
telemetry_sender: Option<TelemetrySender>,
enable_experimental_placeholder_inlining: bool,
}

impl Default for BackendBuilder {
@@ -285,6 +287,7 @@ impl Default for BackendBuilder {
query_max_failure_seconds: (i64::MAX / 1000) as u64,
fallback_recovery_seconds: 0,
telemetry_sender: None,
enable_experimental_placeholder_inlining: false,
}
}
}
@@ -333,6 +336,8 @@ impl BackendBuilder {
query_max_failure_duration: Duration::new(self.query_max_failure_seconds, 0),
query_log_ad_hoc_queries: self.query_log_ad_hoc_queries,
fallback_recovery_duration: Duration::new(self.fallback_recovery_seconds, 0),
enable_experimental_placeholder_inlining: self
.enable_experimental_placeholder_inlining,
},
telemetry_sender: self.telemetry_sender,
_query_handler: PhantomData,
@@ -414,6 +419,14 @@ impl BackendBuilder {
self.telemetry_sender = Some(telemetry_sender);
self
}

pub fn enable_experimental_placeholder_inlining(
mut self,
enable_experimental_placeholder_inlining: bool,
) -> Self {
self.enable_experimental_placeholder_inlining = enable_experimental_placeholder_inlining;
self
}
}

/// A [`CachedPreparedStatement`] stores the data needed for an immediate
@@ -472,6 +485,13 @@ where
false
}
}

/// Get a reference to the `ViewRequest` or return an error
fn as_view_request(&self) -> ReadySetResult<&ViewCreateRequest> {
self.view_request
.as_ref()
.ok_or_else(|| internal_err!("Expected ViewRequest for CachedPreparedStatement"))
}
}

pub struct Backend<DB, Handler>
@@ -546,6 +566,9 @@ struct BackendSettings {
/// repeatedly failed for query_max_failure_duration.
fallback_recovery_duration: Duration,
fail_invalidated_queries: bool,
/// Whether to automatically create inlined migrations for queries with unsupported
/// placeholders.
enable_experimental_placeholder_inlining: bool,
}

/// QueryInfo holds information regarding the last query that was sent along this connection
@@ -890,12 +913,11 @@ where
Some(Err(e)) => {
if e.caused_by_view_not_found() {
warn!(error = %e, "View not found during mirror_prepare()");
self.state.query_status_cache.update_query_migration_state(
self.state.query_status_cache.view_not_found_for_query(
&ViewCreateRequest::new(
select_meta.rewritten.clone(),
self.noria.schema_search_path().to_owned(),
),
MigrationState::Pending,
);
} else if e.caused_by_unsupported() {
self.state.query_status_cache.update_query_migration_state(
@@ -1286,20 +1308,28 @@ where
}
}

/// Attempts to migrate a query on noria, after it was marked as Successful in the cache. If the
/// migration is successful, the cached entry is marked as such and will attempt to resolve
/// noria first in the future
/// Attempts to migrate a query on noria, after
/// - the query was marked as `MigrationState::Successful` in the cache -or-
/// - the epoch stored in `MigrationState::Inlined` advanced but the query is not yet prepared
/// on noria.
///
/// If the migration is successful, the prepare result is updated with the noria result. If the
/// state was previously `MigrationState::Pending`, it is updated to
/// `MigrationState::Successful`.
///
/// Returns an error if the statement is already prepared on noria.
///
/// # Panics
///
/// If the cached entry is not of kind `PrepareResult::Upstream` or is not in the
/// `MigrationState::Pending` state
/// If the query is not in the `MigrationState::Pending` or `MigrationState::Inlined` state
async fn update_noria_prepare(
noria: &mut NoriaConnector,
cached_entry: &mut CachedPreparedStatement<DB>,
id: u32,
) -> ReadySetResult<()> {
debug_assert!(cached_entry.migration_state.is_pending());
debug_assert!(
cached_entry.migration_state.is_pending() || cached_entry.migration_state.is_inlined()
);

let upstream_prep: UpstreamPrepare<DB> = match &cached_entry.prep {
PrepareResult::Upstream(UpstreamPrepare { statement_id, meta }) => UpstreamPrepare {
Expand Down Expand Up @@ -1334,7 +1364,11 @@ where
// At this point we got a successful noria prepare, so we want to replace the Upstream
// result with a Both result
cached_entry.prep = PrepareResult::Both(noria_prep, upstream_prep);
cached_entry.migration_state = MigrationState::Successful;
// If the query was previously `Pending`, update to `Successful`. If it was inlined, we do
// not update the migration state.
if cached_entry.migration_state == MigrationState::Pending {
cached_entry.migration_state = MigrationState::Successful;
}

Ok(())
}
@@ -1393,23 +1427,60 @@ where
let noria = &mut self.noria;
let ticket = self.state.ticket.clone();

if cached_statement.migration_state.is_pending() {
// If the query is pending, check the query status cache to see if it is now successful.
//
// If the query is inlined, we have to check the epoch of the current state in the query
// status cache to see if we should prepare the statement again.
if cached_statement.migration_state.is_pending()
|| cached_statement.migration_state.is_inlined()
{
// We got a statement with a pending migration, we want to check if migration is
// finished by now
let new_migration_state = self
.state
.query_status_cache
.query_migration_state(
cached_statement
.view_request
.as_ref()
.expect("Pending must have view_request set"),
)
.query_migration_state(cached_statement.as_view_request()?)
.1;

if new_migration_state == MigrationState::Successful {
// Attempt to prepare on ReadySet
let _ = Self::update_noria_prepare(noria, cached_statement, id).await;
} else if let MigrationState::Inlined(new_state) = new_migration_state {
if let MigrationState::Inlined(ref old_state) = cached_statement.migration_state {
// if the epoch has advanced, then we've made changes to the inlined caches so
// we should refresh the view cache and prepare if necessary.
if new_state.epoch > old_state.epoch {
let view_request = cached_statement.as_view_request()?;
// Request a new view from ReadySet.
let updated_view_cache = noria
.update_view_cache(
&view_request.statement,
Some(view_request.schema_search_path.clone()),
false, // create_if_not_exists
true, // is_prepared
)
.await
.is_ok();
// If we got a new view from ReadySet and we have only prepared against
// upstream, prepare the statement against ReadySet.
//
// Update the migration state if we updated the view_cache and, if
// necessary, the PrepareResult.
if updated_view_cache
&& matches!(cached_statement.prep, PrepareResult::Upstream(_))
{
if Self::update_noria_prepare(noria, cached_statement, id)
.await
.is_ok()
{
cached_statement.migration_state =
MigrationState::Inlined(new_state);
}
} else if updated_view_cache {
cached_statement.migration_state = MigrationState::Inlined(new_state);
}
}
}
}
}

@@ -1445,6 +1516,12 @@ where
.map_err(Into::into)
}
PrepareResult::Upstream(prep) => {
// No inlined caches for this query exist if we are only prepared on upstream.
if cached_statement.migration_state.is_inlined() {
self.state
.query_status_cache
.inlined_cache_miss(cached_statement.as_view_request()?, params.to_vec())
}
Self::execute_upstream(upstream, prep, params, &mut event, false).await
}
PrepareResult::Both(.., uprep) if should_fallback => {
@@ -1478,15 +1555,16 @@ where
cached_statement.prep.make_upstream_only();
} else if e.caused_by_unsupported() {
// On an unsupported execute we update the query migration state to be unsupported.
//
// Must exist or we would not have executed the query against ReadySet.
#[allow(clippy::unwrap_used)]
self.state.query_status_cache.update_query_migration_state(
cached_statement.view_request.as_ref().unwrap(),
cached_statement.as_view_request()?,
MigrationState::Unsupported,
);
} else if matches!(e, ReadySetError::NoCacheForQuery) {
self.state
.query_status_cache
.inlined_cache_miss(cached_statement.as_view_request()?, params.to_vec())
}
}
};

self.last_query = event.destination.map(|d| QueryInfo {
destination: d,
@@ -1587,12 +1665,30 @@ where
}
// Now migrate the new query
rewrite::process_query(&mut stmt, self.noria.server_supports_pagination())?;
self.noria
let migration_state = match self
.noria
.handle_create_cached_query(name, &stmt, override_schema_search_path, always)
.await?;
.await
{
Ok(()) => MigrationState::Successful,
// If the query fails because it contains unsupported placeholders, then mark it as an
// inlined query in the query status cache.
Err(e) if let Some(placeholders) = e.unsupported_placeholders_cause() => {
#[allow(clippy::unwrap_used)] // converting from Vec1 back to Vec1
let placeholders = Vec1::try_from(placeholders.into_iter().map(|p| p as PlaceholderIdx).collect::<Vec<_>>()).unwrap();
if self.settings.enable_experimental_placeholder_inlining {
MigrationState::Inlined(InlinedState::from_placeholders(placeholders))
} else {
return Err(e);
}
}
Err(e) => {
return Err(e);
}
};
self.state.query_status_cache.update_query_migration_state(
&ViewCreateRequest::new(stmt.clone(), self.noria.schema_search_path().to_owned()),
MigrationState::Successful,
migration_state,
);
self.state.query_status_cache.always_attempt_readyset(
&ViewCreateRequest::new(stmt.clone(), self.noria.schema_search_path().to_owned()),
@@ -1680,7 +1776,7 @@ where
.map(|DeniedQuery { id, query, status }| {
let s = match status.migration_state {
MigrationState::DryRunSucceeded | MigrationState::Successful => "yes",
MigrationState::Pending => "pending",
MigrationState::Pending | MigrationState::Inlined(_) => "pending",
MigrationState::Unsupported => "unsupported",
}
.to_string();
29 changes: 27 additions & 2 deletions readyset-adapter/src/backend/noria_connector.rs
@@ -1062,7 +1062,7 @@ impl NoriaConnector {
async fn get_view(
&mut self,
q: &nom_sql::SelectStatement,
prepared: bool,
is_prepared: bool,
create_if_not_exist: bool,
override_schema_search_path: Option<Vec<SqlIdentifier>>,
) -> ReadySetResult<Relation> {
@@ -1075,7 +1075,7 @@ impl NoriaConnector {

// add the query to ReadySet
if create_if_not_exist {
if prepared {
if is_prepared {
info!(
query = %Sensitive(&q.display(self.parse_dialect)),
name = %qname.display_unquoted(),
@@ -1590,6 +1590,31 @@ impl NoriaConnector {
)?;
Ok(QueryResult::Empty)
}

/// Requests a view for the query from the controller. Invalidates the current entry in the view
/// cache, regardless of whether the view is marked as failed. Optionally creates a new
/// cache for the query.
pub async fn update_view_cache(
&mut self,
statement: &nom_sql::SelectStatement,
override_schema_search_path: Option<Vec<SqlIdentifier>>,
create_if_not_exists: bool,
is_prepared: bool,
) -> ReadySetResult<()> {
let qname = self
.get_view(
statement,
is_prepared,
create_if_not_exists,
override_schema_search_path,
)
.await?;

// Remove the view from failed_views if present, and request the view from the controller.
self.failed_views.remove(&qname);
self.inner.get_mut()?.get_noria_view(&qname, true).await?;
Ok(())
}
}

/// Verifies that there are no placeholder parameters in the given SELECT statement (i.e. ? or $N),
