Skip to content

Commit

Permalink
adapter: Fix views synchronizer bug
Browse files Browse the repository at this point in the history
Previously, the views synchronizer only checked the server for views for
queries that were in the "pending" state. This meant that if the
migration handler set a query's state to "dry run succeeded" before the
views synchronizer had a chance to check the server for a view, the query
would be stuck in the "dry run succeeded" state forever, even if a view
for the query did indeed exist already.

This commit fixes the issue by having the views synchronizer check the
server for views for queries in *either* the "pending" or "dry run
succeeded" states. In order to prevent the views synchronizer from
rechecking every query with status "dry run succeeded" over and over
again, a "cache" has been added to the views synchronizer to keep track
of which queries have already been checked.

Release-Note-Core: Fixed a bug where queries that already had caches
  were sometimes stuck in the `SHOW PROXIED QUERIES` view
Change-Id: Ie5faa100158fc80c906d8ad5cb897d8a02a07be9
  • Loading branch information
ethowitz committed May 6, 2024
1 parent d5c5daf commit e60957b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 4 deletions.
12 changes: 11 additions & 1 deletion readyset-adapter/src/migration_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,18 @@ impl MigrationHandler {
match controller.dry_run(changelist).await {
Ok(_) => {
self.start_time.remove(view_request);

// It's possible that the ViewsSynchronizer found an existing view for this query
// on the server while we were performing a dry run, in which case it would have
// updated the query's status to "successful". In this situation, we don't want to
// overwrite the "successful" status, so we only write the new "dry run succeeded"
// status if the query's status is still "pending"
self.query_status_cache
.update_query_migration_state(view_request, MigrationState::DryRunSucceeded);
.with_mut_migration_state(view_request, |status| {
if status.is_pending() {
*status = MigrationState::DryRunSucceeded;
}
});
}
Err(e) if e.caused_by_unsupported() => {
self.start_time.remove(view_request);
Expand Down
34 changes: 34 additions & 0 deletions readyset-adapter/src/query_status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,24 @@ impl QueryStatusCache {
}
}

/// Yields to the given function `f` a mutable reference to the migration state of the query
/// `q`. The primary purpose of this method is allow for atomic reads and writes of the
/// migration state of a query.
pub fn with_mut_migration_state<Q, F>(&self, q: &Q, f: F) -> bool
where
Q: QueryStatusKey,
F: Fn(&mut MigrationState),
{
q.with_mut_status(self, |maybe_query_status| {
if let Some(query_status) = maybe_query_status {
f(&mut query_status.migration_state);
true
} else {
false
}
})
}

/// Marks a query as dropped by the user.
///
/// NOTE: this should only be called after we successfully remove a View for this query. This is
Expand Down Expand Up @@ -785,6 +803,22 @@ impl QueryStatusCache {
.into()
}

/// Returns a list of queries whose migration states match `states`.
pub fn queries_with_statuses(&self, states: &[MigrationState]) -> QueryList {
let statuses = self.persistent_handle.statuses.read();
statuses
.iter()
.filter_map(|(_query_id, (query, status))| {
if states.contains(&status.migration_state) {
Some((query.clone(), status.clone()))
} else {
None
}
})
.collect::<Vec<(Query, QueryStatus)>>()
.into()
}

/// Returns a list of queries that have a state of [`QueryState::Successful`].
pub fn allow_list(&self) -> Vec<(QueryId, Arc<ViewCreateRequest>, QueryStatus)> {
self.persistent_handle.allow_list()
Expand Down
15 changes: 13 additions & 2 deletions readyset-adapter/src/views_synchronizer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashSet;
use std::sync::Arc;

use dataflow_expression::Dialect;
use nom_sql::{DialectDisplay, Relation};
use readyset_client::query::MigrationState;
use readyset_client::query::{MigrationState, Query};
use readyset_client::{ReadySetHandle, ViewCreateRequest};
use readyset_util::shared_cache::LocalCache;
use readyset_util::shutdown::ShutdownReceiver;
Expand All @@ -22,6 +23,7 @@ pub struct ViewsSynchronizer {
dialect: Dialect,
/// Global and thread-local cache of view endpoints and prepared statements.
view_name_cache: LocalCache<ViewCreateRequest, Relation>,
views_checked: HashSet<Query>,
}

impl ViewsSynchronizer {
Expand All @@ -38,6 +40,7 @@ impl ViewsSynchronizer {
poll_interval,
dialect,
view_name_cache,
views_checked: HashSet::new(),
}
}

Expand Down Expand Up @@ -71,8 +74,16 @@ impl ViewsSynchronizer {
debug!("Views synchronizer polling");
let queries = self
.query_status_cache
.pending_migration()
.queries_with_statuses(&[MigrationState::DryRunSucceeded, MigrationState::Pending])
.into_iter()
.filter(|(q, _)| {
if self.views_checked.contains(q) {
false
} else {
self.views_checked.insert(q.clone());
true
}
})
.filter_map(|(q, _)| {
q.into_parsed()
// once arc_unwrap_or_clone is stabilized, we can use that cleaner syntax
Expand Down
2 changes: 1 addition & 1 deletion readyset-clustertest/src/readyset_mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ async fn views_synchronize_between_deployments() {

// Eventually it should show up in adapter 1 too
eventually! {
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;");
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;").await.unwrap();
last_statement_destination(adapter_1.as_mysql_conn().unwrap()).await == QueryDestination::Readyset
}

Expand Down

0 comments on commit e60957b

Please sign in to comment.