Skip to content

Commit

Permalink
Adapter: Support SHOW READYSET MIGRATION STATUS
Browse files Browse the repository at this point in the history
Adds a SQL extension to query the status of a migration initiated by
CREATE CACHE CONCURRENTLY.

Release-Note-Core: Adds a `SHOW READYSET MIGRATION STATUS <id>` SQL
  extension to query the status of a migration initiated via `CREATE
  CACHE CONCURRENTLY`. This command will return the status of the
  migration, which is either "Completed", "Pending", or "Failed with
  error: <error>". Executing this statement with an id that has already
  returned a non-pending status (Completed or Failed) will lead to
  undefined behavior.
Change-Id: I28357581ce2c4785b9a1ec1148217d02910184cb
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5759
Tested-by: Buildkite CI
Reviewed-by: Aspen Smith <aspen@readyset.io>
  • Loading branch information
Dan Wilbanks committed Aug 22, 2023
1 parent 81340d8 commit 1ce1282
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 5 deletions.
24 changes: 24 additions & 0 deletions nom-sql/src/show.rs
Expand Up @@ -21,6 +21,7 @@ pub enum ShowStatement {
CachedQueries(Option<QueryID>),
ProxiedQueries(Option<QueryID>),
ReadySetStatus,
ReadySetMigrationStatus(u64),
ReadySetVersion,
ReadySetTables,
}
Expand All @@ -47,6 +48,7 @@ impl ShowStatement {
}
}
Self::ReadySetStatus => write!(f, "READYSET STATUS"),
Self::ReadySetMigrationStatus(id) => write!(f, "READYSET MIGRATION STATUS {}", id),
Self::ReadySetVersion => write!(f, "READYSET VERSION"),
Self::ReadySetTables => write!(f, "READYSET TABLES"),
}
Expand Down Expand Up @@ -90,13 +92,26 @@ fn proxied_queries(
}
}

/// Parses READYSET MIGRATION STATUS <u64_id>
pub fn readyset_migration_status(i: LocatedSpan<&[u8]>) -> NomSqlResult<&[u8], ShowStatement> {
let (i, _) = tag_no_case("readyset")(i)?;
let (i, _) = whitespace1(i)?;
let (i, _) = tag_no_case("migration")(i)?;
let (i, _) = whitespace1(i)?;
let (i, _) = tag_no_case("status")(i)?;
let (i, _) = whitespace1(i)?;
let (i, id) = nom::character::complete::u64(i)?;
Ok((i, ShowStatement::ReadySetMigrationStatus(id)))
}

pub fn show(dialect: Dialect) -> impl Fn(LocatedSpan<&[u8]>) -> NomSqlResult<&[u8], ShowStatement> {
move |i| {
let (i, _) = tag_no_case("show")(i)?;
let (i, _) = whitespace1(i)?;
let (i, statement) = alt((
cached_queries(dialect),
proxied_queries(dialect),
readyset_migration_status,
value(
ShowStatement::ReadySetStatus,
tuple((tag_no_case("readyset"), whitespace1, tag_no_case("status"))),
Expand Down Expand Up @@ -373,4 +388,13 @@ mod tests {
let res = test_parse!(show(Dialect::MySQL), b"SHOW READYSET TABLES");
assert_eq!(res, ShowStatement::ReadySetTables);
}

#[test]
fn show_readyset_migration_status() {
let res = test_parse!(
show(Dialect::MySQL),
b"SHOW\t READYSET\t MIGRATION\t STATUS\t 123456"
);
assert_eq!(res, ShowStatement::ReadySetMigrationStatus(123456))
}
}
3 changes: 3 additions & 0 deletions readyset-adapter/src/backend.rs
Expand Up @@ -1903,6 +1903,9 @@ where
SqlQuery::Show(ShowStatement::ReadySetStatus) => {
self.noria.readyset_status(&self.authority).await
}
SqlQuery::Show(ShowStatement::ReadySetMigrationStatus(id)) => {
self.noria.migration_status(*id).await
}
SqlQuery::Show(ShowStatement::ReadySetVersion) => readyset_version(),
SqlQuery::Show(ShowStatement::ReadySetTables) => self.noria.table_statuses().await,
SqlQuery::Show(ShowStatement::ProxiedQueries(q_id)) => {
Expand Down
19 changes: 19 additions & 0 deletions readyset-adapter/src/backend/noria_connector.rs
Expand Up @@ -984,6 +984,25 @@ impl NoriaConnector {
))
}

/// Query the status of a pending migration identified by the given `migration_id`. Once the
/// function returns a result (completed or an error), calling again with the same id will lead
/// to undefined behavior.
pub(crate) async fn migration_status(
&mut self,
id: u64,
) -> ReadySetResult<QueryResult<'static>> {
let status = noria_await!(
self.inner.get_mut()?,
self.inner.get_mut()?.noria.migration_status(id)
)?
.to_string();
Ok(QueryResult::Meta(vec![(
"Migration Status".to_string(),
status,
)
.into()]))
}

pub(crate) async fn table_statuses(&mut self) -> ReadySetResult<QueryResult<'static>> {
let statuses = noria_await!(
self.inner.get_mut()?,
Expand Down
8 changes: 8 additions & 0 deletions readyset-client/src/controller.rs
Expand Up @@ -701,6 +701,14 @@ impl ReadySetHandle {
}
}

/// Query the status of a pending migration identified by the given `migration_id`.
pub fn migration_status(
&mut self,
migration_id: u64,
) -> impl Future<Output = ReadySetResult<MigrationStatus>> + '_ {
self.rpc::<_, MigrationStatus>("migration_status", migration_id, self.migration_timeout)
}

/// Asynchronous version of extend_recipe(). The Controller should immediately return an ID that
/// can be used to query the migration status.
pub fn extend_recipe_async(
Expand Down
16 changes: 15 additions & 1 deletion readyset-client/src/recipe/mod.rs
Expand Up @@ -3,7 +3,9 @@
pub mod changelist;

use std::borrow::Cow;
use std::fmt::Display;

use readyset_errors::ReadySetError;
use serde::{Deserialize, Serialize};

pub use crate::recipe::changelist::ChangeList;
Expand Down Expand Up @@ -59,14 +61,26 @@ pub enum ExtendRecipeResult {
}

/// The status of an actively running migration
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum MigrationStatus {
/// The migration has completed
Done,
/// The migration has completed with an error
Failed(ReadySetError),
/// The migration has not yet completed
Pending,
}

impl Display for MigrationStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MigrationStatus::Done => f.write_str("Completed"),
MigrationStatus::Failed(e) => write!(f, "Failed with error {e}"),
MigrationStatus::Pending => f.write_str("Pending"),
}
}
}

impl MigrationStatus {
/// Returns `true` if the migration status is [`Pending`].
///
Expand Down
57 changes: 57 additions & 0 deletions readyset-psql/tests/integration.rs
Expand Up @@ -1696,4 +1696,61 @@ mod multiple_create_and_drop {

shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn create_cache_concurrently() {
use tokio_postgres::{SimpleQueryMessage, SimpleQueryRow};
fn extract_single_value(mut rows: Vec<SimpleQueryMessage>) -> SimpleQueryRow {
match rows.swap_remove(0) {
SimpleQueryMessage::Row(r) => r,
_ => panic!(),
}
}

readyset_tracing::init_test_logging();
let (opts, _handle, shutdown_tx) = setup().await;
let conn = connect(opts).await;

conn.simple_query("CREATE TABLE t (a int)").await.unwrap();
let valid_cache = extract_single_value(
conn.simple_query("CREATE CACHE CONCURRENTLY FROM SELECT a FROM t")
.await
.unwrap(),
);

let invalid_cache = extract_single_value(
conn.simple_query("CREATE CACHE CONCURRENTLY FROM SELECT b FROM t")
.await
.unwrap(),
);

eventually!(
run_test: {
extract_single_value(
conn.simple_query(&format!(
"SHOW READYSET MIGRATION STATUS {}",
valid_cache.get(0).unwrap()
))
.await
.unwrap()
)
},
then_assert: |completed| assert_eq!(completed.get(0).unwrap(), "Completed")
);
eventually!(
run_test: {
extract_single_value(
conn.simple_query(&format!(
"SHOW READYSET MIGRATION STATUS {}",
invalid_cache.get(0).unwrap()
))
.await
.unwrap(),
)
},
then_assert: |failed| assert_eq!(&failed.get(0).unwrap()[..6], "Failed")
);

shutdown_tx.shutdown().await;
}
}
10 changes: 6 additions & 4 deletions readyset-server/src/controller/inner.rs
Expand Up @@ -550,17 +550,19 @@ impl Leader {
.ok_or_else(|| ReadySetError::UnknownMigration(migration_id))?;

match (&mut migration).now_or_never() {
None => ReadySetResult::Ok(MigrationStatus::Pending),
None => MigrationStatus::Pending,
Some(res) => {
// Migration is done, remove it from the map
// Note that this means that only one thread can poll on the status of a
// particular migration!
running_migrations.remove(migration_key);
res.map_err(|e| internal_err!("{e}"))??;
Ok(MigrationStatus::Done)
match res.map_err(|e| internal_err!("{e}"))? {
Ok(_) => MigrationStatus::Done,
Err(e) => MigrationStatus::Failed(e),
}
}
}
}?;
};
return_serialized!(ret)
}
(&Method::POST, "/remove_query") => {
Expand Down
1 change: 1 addition & 0 deletions readyset-sql-passes/src/anonymize.rs
Expand Up @@ -199,6 +199,7 @@ impl<'ast> VisitorMut<'ast> for AnonymizeVisitor<'_> {
| nom_sql::ShowStatement::CachedQueries(..)
| nom_sql::ShowStatement::ProxiedQueries(..)
| nom_sql::ShowStatement::ReadySetStatus
| nom_sql::ShowStatement::ReadySetMigrationStatus(..)
| nom_sql::ShowStatement::ReadySetVersion
| nom_sql::ShowStatement::ReadySetTables => {}
}
Expand Down

0 comments on commit 1ce1282

Please sign in to comment.