diff --git a/readyset-adapter/src/migration_handler.rs b/readyset-adapter/src/migration_handler.rs index fffb041bc9..6e02b7c9fe 100644 --- a/readyset-adapter/src/migration_handler.rs +++ b/readyset-adapter/src/migration_handler.rs @@ -344,8 +344,18 @@ impl MigrationHandler { match controller.dry_run(changelist).await { Ok(_) => { self.start_time.remove(view_request); + + // It's possible that the ViewsSynchronizer found an existing view for this query + // on the server while we were performing a dry run, in which case it would have + // updated the query's status to "successful". In this situation, we don't want to + // overwrite the "successful" status, so we only write the new "dry run succeeded" + // status if the query's status is still "pending" self.query_status_cache - .update_query_migration_state(view_request, MigrationState::DryRunSucceeded); + .with_mut_migration_state(view_request, |status| { + if status.is_pending() { + *status = MigrationState::DryRunSucceeded; + } + }); } Err(e) if e.caused_by_unsupported() => { self.start_time.remove(view_request); diff --git a/readyset-adapter/src/query_status_cache.rs b/readyset-adapter/src/query_status_cache.rs index cfe01b26c2..e8c2107ef9 100644 --- a/readyset-adapter/src/query_status_cache.rs +++ b/readyset-adapter/src/query_status_cache.rs @@ -560,6 +560,24 @@ impl QueryStatusCache { } } + /// Yields to the given function `f` a mutable reference to the migration state of the query + /// `q`. The primary purpose of this method is allow for atomic reads and writes of the + /// migration state of a query. + pub fn with_mut_migration_state(&self, q: &Q, f: F) -> bool + where + Q: QueryStatusKey, + F: Fn(&mut MigrationState), + { + q.with_mut_status(self, |maybe_query_status| { + if let Some(query_status) = maybe_query_status { + f(&mut query_status.migration_state); + true + } else { + false + } + }) + } + /// Marks a query as dropped by the user. /// /// NOTE: this should only be called after we successfully remove a View for this query. This is @@ -785,6 +803,22 @@ impl QueryStatusCache { .into() } + /// Returns a list of queries whose migration states match `states`. + pub fn queries_with_statuses(&self, states: &[MigrationState]) -> QueryList { + let statuses = self.persistent_handle.statuses.read(); + statuses + .iter() + .filter_map(|(_query_id, (query, status))| { + if states.contains(&status.migration_state) { + Some((query.clone(), status.clone())) + } else { + None + } + }) + .collect::>() + .into() + } + /// Returns a list of queries that have a state of [`QueryState::Successful`]. pub fn allow_list(&self) -> Vec<(QueryId, Arc, QueryStatus)> { self.persistent_handle.allow_list() diff --git a/readyset-adapter/src/views_synchronizer.rs b/readyset-adapter/src/views_synchronizer.rs index 72ae3e0ad1..af15e99ddc 100644 --- a/readyset-adapter/src/views_synchronizer.rs +++ b/readyset-adapter/src/views_synchronizer.rs @@ -1,8 +1,9 @@ +use std::collections::HashSet; use std::sync::Arc; use dataflow_expression::Dialect; use nom_sql::{DialectDisplay, Relation}; -use readyset_client::query::MigrationState; +use readyset_client::query::{MigrationState, Query}; use readyset_client::{ReadySetHandle, ViewCreateRequest}; use readyset_util::shared_cache::LocalCache; use readyset_util::shutdown::ShutdownReceiver; @@ -22,6 +23,7 @@ pub struct ViewsSynchronizer { dialect: Dialect, /// Global and thread-local cache of view endpoints and prepared statements. view_name_cache: LocalCache, + views_checked: HashSet, } impl ViewsSynchronizer { @@ -38,6 +40,7 @@ impl ViewsSynchronizer { poll_interval, dialect, view_name_cache, + views_checked: HashSet::new(), } } @@ -71,8 +74,16 @@ impl ViewsSynchronizer { debug!("Views synchronizer polling"); let queries = self .query_status_cache - .pending_migration() + .queries_with_statuses(&[MigrationState::DryRunSucceeded, MigrationState::Pending]) .into_iter() + .filter(|(q, _)| { + if self.views_checked.contains(q) { + false + } else { + self.views_checked.insert(q.clone()); + true + } + }) .filter_map(|(q, _)| { q.into_parsed() // once arc_unwrap_or_clone is stabilized, we can use that cleaner syntax diff --git a/readyset-clustertest/src/readyset_mysql.rs b/readyset-clustertest/src/readyset_mysql.rs index 9406afda8e..e7814681bd 100644 --- a/readyset-clustertest/src/readyset_mysql.rs +++ b/readyset-clustertest/src/readyset_mysql.rs @@ -1599,7 +1599,7 @@ async fn views_synchronize_between_deployments() { // Eventually it should show up in adapter 1 too eventually! { - adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;"); + adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;").await.unwrap(); last_statement_destination(adapter_1.as_mysql_conn().unwrap()).await == QueryDestination::Readyset }