Handle extend_recipe failure for cache restoration
This commit handles, and adds tests for, the cases where `create cache` or
`drop cache` may fail. We remove the DDL request entry if we encounter
those failure scenarios.

Release-Note-Core: ReadySet will now automatically restore caches even
  if there is a backwards-incompatible upgrade that invalidates the
  persisted dataflow graph.

Change-Id: I3fa0d262911cb148bd9f5421b278a3d1e73b60d7
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6326
Tested-by: Buildkite CI
Reviewed-by: Jason Brown <jason.b@readyset.io>
lukoktonos committed Nov 9, 2023
1 parent f73108e commit 3679fb7
Showing 5 changed files with 156 additions and 17 deletions.
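
The backend.rs change below follows a record-then-attempt-then-withdraw pattern: the cache DDL request is persisted in the authority before the statement runs, and if the statement fails, removal of the persisted request is retried so a bad statement is not replayed after an upgrade. The following toy sketch is not part of the commit and only shows that ordering; the authority, the error type, and the helper names are simplified stand-ins for the real add_cache_ddl_request, remove_cache_ddl_request, and retry_with_exponential_backoff.

use std::time::Duration;
use tokio::time::sleep;

// Toy stand-in for the authority: just a list of persisted DDL requests.
#[derive(Default)]
struct ToyAuthority {
    cache_ddl_requests: Vec<String>,
}

impl ToyAuthority {
    async fn add_cache_ddl_request(&mut self, stmt: String) {
        self.cache_ddl_requests.push(stmt);
    }
    async fn remove_cache_ddl_request(&mut self, stmt: &str) -> Result<(), ()> {
        self.cache_ddl_requests.retain(|s| s != stmt);
        Ok(())
    }
}

async fn create_cache(stmt: &str, authority: &mut ToyAuthority) -> Result<(), String> {
    // 1. Record the intent to create this cache so it can be restored later.
    authority.add_cache_ddl_request(stmt.to_string()).await;

    // 2. Pretend extend_recipe rejected the statement.
    let res: Result<(), String> = Err(format!("could not create cache for {stmt}"));

    // 3. On failure, withdraw the recorded intent, retrying with backoff (the
    //    real code calls retry_with_exponential_backoff with 5 retries and a
    //    1 ms base delay).
    if res.is_err() {
        for attempt in 0..5 {
            if authority.remove_cache_ddl_request(stmt).await.is_ok() {
                break;
            }
            sleep(Duration::from_millis(1 << attempt)).await;
        }
    }
    res
}

#[tokio::main]
async fn main() {
    let mut authority = ToyAuthority::default();
    let res = create_cache("CREATE CACHE q FROM SELECT 1", &mut authority).await;
    assert!(res.is_err());
    // The failed request is not left behind to be replayed after an upgrade.
    assert!(authority.cache_ddl_requests.is_empty());
}
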
70 changes: 62 additions & 8 deletions readyset-adapter/src/backend.rs
@@ -112,7 +112,7 @@ use crate::query_handler::SetBehavior;
use crate::query_status_cache::QueryStatusCache;
use crate::rewrite::ProcessedQueryParams;
pub use crate::upstream_database::UpstreamPrepare;
use crate::utils::create_dummy_column;
use crate::utils::{create_dummy_column, retry_with_exponential_backoff};
use crate::{create_dummy_schema, rewrite, QueryHandler, UpstreamDatabase, UpstreamDestination};

pub mod noria_connector;
@@ -2047,18 +2047,47 @@ where
trace!("No telemetry sender. not sending metric for CREATE CACHE");
}

if let Some(unparsed_create_cache_statement) = unparsed_create_cache_statement {
let ddl_req = if let Some(unparsed_create_cache_statement) =
unparsed_create_cache_statement
{
let ddl_req = CacheDDLRequest {
unparsed_stmt: unparsed_create_cache_statement.clone(),
schema_search_path: self.noria.schema_search_path().to_owned(),
dialect: self.settings.dialect.into(),
};

self.authority.add_cache_ddl_request(ddl_req).await?;
}
self.authority
.add_cache_ddl_request(ddl_req.clone())
.await?;
Some(ddl_req)
} else {
None
};

self.create_cached_query(name.as_ref(), stmt, search_path, *always, *concurrently)
.await
let res = self
.create_cached_query(name.as_ref(), stmt, search_path, *always, *concurrently)
.await;
// The extend_recipe may have failed, in which case we should remove our intention
// to create this cache. Extend recipe waits a bit and then returns an
// Ok(ExtendRecipeResult::Pending) if it is still creating a cache in the
// background, so we don't remove the ddl request for timeouts.
if res.is_err() {
if let Some(ddl_req) = ddl_req {
let remove_res = retry_with_exponential_backoff(
async || {
let ddl_req = ddl_req.clone();
self.authority.remove_cache_ddl_request(ddl_req).await
},
5,
Duration::from_millis(1),
)
.await;
if remove_res.is_err() {
error!("Failed to remove stored 'create cache' request. It will be re-run if there is a backwards incompatible upgrade.");
}
}
}
res
}
SqlQuery::DropCache(drop_cache) => {
let ddl_req = CacheDDLRequest {
@@ -2068,9 +2097,34 @@ where
schema_search_path: vec![],
dialect: self.settings.dialect.into(),
};
self.authority.add_cache_ddl_request(ddl_req).await?;
self.authority
.add_cache_ddl_request(ddl_req.clone())
.await?;
let DropCacheStatement { name } = drop_cache;
self.drop_cached_query(name).await
let res = self.drop_cached_query(name).await;
// `drop_cached_query` may return an Err, but if the cache fails to be dropped for
// certain reasons, we can also see an Ok(Delete) here with num_rows_deleted set to
// 0.
if res.is_err()
|| matches!(
res,
Ok(noria_connector::QueryResult::Delete { num_rows_deleted }) if num_rows_deleted < 1
)
{
let remove_res = retry_with_exponential_backoff(
async || {
let ddl_req = ddl_req.clone();
self.authority.remove_cache_ddl_request(ddl_req).await
},
5,
Duration::from_millis(1),
)
.await;
if remove_res.is_err() {
error!("Failed to remove stored 'drop cache' request. It will be re-run if there is a backwards incompatible upgrade");
}
}
res
}
SqlQuery::DropAllCaches(_) => self.drop_all_caches().await,
SqlQuery::Show(ShowStatement::CachedQueries(query_id)) => {
1 change: 0 additions & 1 deletion readyset-adapter/src/utils.rs
@@ -648,7 +648,6 @@ use tokio::time::{sleep, Duration};
///
/// This function returns a `Result` indicating whether the operation was successful (`Ok(T)`)
/// or not (`Err(E)`) after the maximum number of retries has been reached.
#[allow(dead_code)]
pub(crate) async fn retry_with_exponential_backoff<F, Fut, T, E>(
mut operation: F,
max_retries: usize,
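
The body of retry_with_exponential_backoff is unchanged by this commit (only the now-unneeded `#[allow(dead_code)]` is removed), so it does not appear in the hunk above. Going by the doc comment and the call sites in backend.rs (an async closure, a maximum of 5 retries, and a 1 ms base delay), a helper with that shape could look roughly like the sketch below; the parameter names and the simple doubling schedule are assumptions, not the actual implementation.

use std::future::Future;
use std::time::Duration;

use tokio::time::sleep;

/// Sketch: run `operation` until it succeeds or `max_retries` attempts have
/// failed, doubling the wait between attempts starting from `base_delay`.
async fn retry_with_exponential_backoff<F, Fut, T, E>(
    mut operation: F,
    max_retries: usize,
    base_delay: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = base_delay;
    let mut attempt = 0;
    loop {
        match operation().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt + 1 >= max_retries => return Err(err),
            Err(_) => {
                attempt += 1;
                sleep(delay).await;
                delay *= 2; // exponential backoff
            }
        }
    }
}
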
2 changes: 2 additions & 0 deletions readyset-client/src/failpoints/mod.rs
@@ -27,3 +27,5 @@ pub const POSTGRES_NEXT_WAL_EVENT: &str = "postgres-next-wal-event";
pub const START_INNER_POSTGRES: &str = "start-inner-postgres";
/// Imitate a backwards incompatible deserialization from controller state
pub const LOAD_CONTROLLER_STATE: &str = "load-controller-state";
/// Injects a failpoint at the beginning of DfState::extend_recipe
pub const EXTEND_RECIPE: &str = "extend-recipe";
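
This new EXTEND_RECIPE constant is driven from the tests below through the `fail` crate, whose action strings chain counted tasks: in "2*off->1*sleep(6000)", the first two hits of the failpoint do nothing and the third sleeps for 6000 ms, longer than the extend_recipe synchronous wait. A minimal sketch of arming and clearing the failpoint from a test follows; the helper function names are illustrative.

use readyset_client::failpoints;

/// Make the third hit of the extend-recipe failpoint stall for six seconds;
/// earlier hits pass through untouched.
fn arm_extend_recipe_delay() {
    fail::cfg(failpoints::EXTEND_RECIPE, "2*off->1*sleep(6000)")
        .expect("failed to configure failpoint");
}

/// Clear the failpoint so later tests in the same process are unaffected.
fn disarm_extend_recipe_delay() {
    fail::remove(failpoints::EXTEND_RECIPE);
}
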
96 changes: 88 additions & 8 deletions readyset-psql/tests/fallback.rs
@@ -873,7 +873,7 @@ async fn insert_enum_value_appended_after_create_table() {
// this doesn't have to be the same table we later insert into)
// - Specifically insert the enum value that was added in the ALTER TYPE statement
// - Insert using a parameter, not a hardcoded query (hence the use of `query_raw` here)
// This turned out to be caused by an interation with a client library that cached types from
// This turned out to be caused by an integration with a client library that cached types from
// upstream queries and didn't update the cached definitions after the type was altered.
let params: Vec<DfValue> = vec!["b".into()];
client
@@ -2067,7 +2067,11 @@ async fn recreate_replication_slot() {
mod failure_injection_tests {
// TODO: move the above cfg failure_injection tests into this mod

use std::sync::Arc;

use readyset_client::consensus::{Authority, AuthorityControl, CacheDDLRequest};
use readyset_client::failpoints;
use readyset_data::Dialect;
use readyset_errors::ReadySetError;
use tracing::debug;

@@ -2081,7 +2085,12 @@ mod failure_injection_tests {
async fn setup_reload_controller_state_test(
prefix: &str,
queries: &[&str],
) -> (tokio_postgres::Config, Handle, ShutdownSender) {
) -> (
tokio_postgres::Config,
Handle,
Arc<Authority>,
ShutdownSender,
) {
readyset_tracing::init_test_logging();

let (config, handle, authority, shutdown_tx) =
@@ -2090,7 +2099,7 @@ mod failure_injection_tests {
let conn = connect(config).await;
for query in queries {
debug!(%query, "Running Query");
let _res = conn.simple_query(query).await.expect("query failed");
let _res = conn.simple_query(query).await;
// give it some time to propagate
sleep().await;
}
@@ -2110,10 +2119,12 @@ mod failure_injection_tests {
// Stop the server and start a new one
shutdown_tx.shutdown().await;
drop(handle);
sleep().await;

let (config, handle, _authority, shutdown_tx) =
let (config, handle, authority, shutdown_tx) =
setup_standalone_with_authority(prefix, Some(authority), true, false).await;
(config, handle, shutdown_tx)
sleep().await;
(config, handle, authority, shutdown_tx)
}

#[tokio::test(flavor = "multi_thread")]
@@ -2123,7 +2134,7 @@ mod failure_injection_tests {
"CREATE TABLE users (id INT PRIMARY KEY, name TEXT);",
"CREATE CACHE test_query FROM SELECT * FROM users;",
];
let (_config, mut handle, shutdown_tx) =
let (_config, mut handle, _authority, shutdown_tx) =
setup_reload_controller_state_test("caches_recreated", &queries).await;

let queries = handle.views().await.unwrap();
@@ -2141,7 +2152,7 @@ mod failure_injection_tests {
"CREATE CACHE cached_query FROM SELECT * FROM users where id = 1;",
"DROP CACHE dropped_query",
];
let (_config, mut handle, shutdown_tx) =
let (_config, mut handle, _authority, shutdown_tx) =
setup_reload_controller_state_test("caches_not_recreated", &queries).await;

let queries = handle.views().await.unwrap();
@@ -2162,10 +2173,79 @@ mod failure_injection_tests {
"CREATE CACHE cached_query FROM SELECT * FROM users",
];

let (_config, mut handle, _shutdown_tx) =
let (_config, mut handle, _authority, _shutdown_tx) =
setup_reload_controller_state_test("caches_dropped_then_recreated", &queries).await;
let queries = handle.views().await.unwrap();
assert!(!queries.contains_key(&"dropped_query".into()));
assert!(queries.contains_key(&"cached_query".into()));
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn caches_added_if_extend_recipe_times_out() {
let queries = [
"CREATE TABLE users (id INT PRIMARY KEY, name TEXT);",
"CREATE CACHE test_query FROM SELECT * FROM users;",
];

// This is set to be larger than EXTEND_RECIPE_MAX_SYNC_TIME, which is 5 seconds
// The `create cache` is the 3rd extend_recipe run
fail::cfg(failpoints::EXTEND_RECIPE, "2*off->1*sleep(6000)").expect("failed at failing");

let (_config, mut handle, authority, shutdown_tx) =
setup_reload_controller_state_test("extend_recipe_timeout", &queries).await;

let cache_ddl_requests = authority.cache_ddl_requests().await.unwrap();
let expected = CacheDDLRequest {
unparsed_stmt: "CREATE CACHE test_query FROM SELECT * FROM users;".to_string(),
schema_search_path: vec!["postgres".into(), "public".into()],
dialect: Dialect::DEFAULT_POSTGRESQL,
};
assert_eq!(expected, *cache_ddl_requests.get(0).unwrap());

let queries = handle.views().await.unwrap();
assert!(queries.contains_key(&"test_query".into()));

shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn create_cache_not_added_if_extend_recipe_fails() {
let queries = [
"CREATE TABLE users (id INT PRIMARY KEY, name TEXT);",
"CREATE CACHE test_query FROM SELECT * FROM idontexist;",
];

let (_config, mut handle, authority, shutdown_tx) =
setup_reload_controller_state_test("extend_recipe_create_fail", &queries).await;

let cache_ddl_requests = authority.cache_ddl_requests().await.unwrap();
assert!(cache_ddl_requests.is_empty());

let queries = handle.views().await.unwrap();
assert!(queries.is_empty());

shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn drop_cache_not_added_if_drop_fails() {
let queries = [
"CREATE TABLE users (id INT PRIMARY KEY, name TEXT);",
"DROP CACHE idontexist;",
];

let (_config, mut handle, authority, shutdown_tx) =
setup_reload_controller_state_test("extend_recipe_drop_fail", &queries).await;

let cache_ddl_requests = authority.cache_ddl_requests().await.unwrap();
assert!(cache_ddl_requests.is_empty());

let queries = handle.views().await.unwrap();
assert!(queries.is_empty());

shutdown_tx.shutdown().await;
}
}
4 changes: 4 additions & 0 deletions readyset-server/src/controller/state.rs
@@ -27,6 +27,7 @@ use dataflow::{
BaseTableState, DomainBuilder, DomainConfig, DomainRequest, NodeMap, Packet,
PersistenceParameters, Sharding,
};
use failpoint_macros::set_failpoint;
use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
use futures::{FutureExt, TryFutureExt, TryStream};
use metrics::{gauge, histogram};
Expand All @@ -40,6 +41,8 @@ use readyset_client::builders::{
use readyset_client::consensus::{Authority, AuthorityControl};
use readyset_client::debug::info::{GraphInfo, MaterializationInfo, NodeSize};
use readyset_client::debug::stats::{DomainStats, GraphStats, NodeStats};
#[cfg(feature = "failure_injection")]
use readyset_client::failpoints;
use readyset_client::internal::{MaterializationStatus, ReplicaAddress};
use readyset_client::metrics::recorded;
use readyset_client::recipe::changelist::{Change, ChangeList};
@@ -1559,6 +1562,7 @@ impl DfState {
recipe_spec: ExtendRecipeSpec<'_>,
dry_run: bool,
) -> Result<(), ReadySetError> {
set_failpoint!(failpoints::EXTEND_RECIPE);
// Drop recipes from the replicator that we have already processed.
if let (Some(new), Some(current)) = (
&recipe_spec.replication_offset,
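
The set_failpoint! macro itself is not part of this diff; the cfg-gated import of readyset_client::failpoints suggests the call expands to nothing unless the server is built with the failure_injection feature. Purely as an illustration (the real macro lives in the failpoint_macros crate), a hand-rolled approximation using the `fail` crate directly might look like this:

// Illustrative only: present when failure injection is compiled in,
// a no-op otherwise.
#[cfg(feature = "failure_injection")]
macro_rules! set_failpoint_sketch {
    ($name:expr) => {
        fail::fail_point!($name)
    };
}

#[cfg(not(feature = "failure_injection"))]
macro_rules! set_failpoint_sketch {
    ($name:expr) => {}; // expands to nothing when the feature is disabled
}
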
