diff --git a/readyset-adapter/src/backend.rs b/readyset-adapter/src/backend.rs index 56709099fa..829dc91798 100644 --- a/readyset-adapter/src/backend.rs +++ b/readyset-adapter/src/backend.rs @@ -2046,7 +2046,7 @@ where } if only_supported { - queries.retain(|q| q.status.migration_state.is_supported()); + queries.retain(|q| q.status.is_supported()); } let select_schema = if let Some(handle) = self.metrics_handle.as_mut() { @@ -2075,15 +2075,7 @@ where let mut data = queries .into_iter() .map(|DeniedQuery { id, query, status }| { - let s = match status.migration_state { - MigrationState::DryRunSucceeded - | MigrationState::Successful - | MigrationState::Dropped => "yes", - MigrationState::Pending | MigrationState::Inlined(_) => "pending", - MigrationState::Unsupported => "unsupported", - } - .to_string(); - + let s = status.supported_state().to_string(); let mut row = vec![ DfValue::from(id.to_string()), DfValue::from(Self::format_query_text( @@ -2506,11 +2498,7 @@ where event: &mut QueryExecutionEvent, processed_query_params: ProcessedQueryParams, ) -> Result, DB::Error> { - let mut status = status.unwrap_or(QueryStatus { - migration_state: MigrationState::Unsupported, - execution_info: None, - always: false, - }); + let mut status = status.unwrap_or(QueryStatus::new(MigrationState::Unsupported)); let original_status = status.clone(); let did_work = if let Some(ref mut i) = status.execution_info { i.reset_if_exceeded_recovery( @@ -2524,9 +2512,9 @@ where // Test several conditions to see if we should proxy let upstream_exists = upstream.is_some(); let proxy_out_of_band = settings.migration_mode != MigrationMode::InRequestPath - && status.migration_state != MigrationState::Successful; + && status.migration_state() != &MigrationState::Successful; let unsupported_or_dropped = matches!( - &status.migration_state, + status.migration_state(), MigrationState::Unsupported | MigrationState::Dropped ); let exceeded_network_failure = status @@ -2568,7 +2556,7 @@ where match noria_res { Ok(noria_ok) => { // We managed to select on ReadySet, good for us - status.migration_state = MigrationState::Successful; + status.set_migration_state(MigrationState::Successful); if let Some(i) = status.execution_info.as_mut() { i.execute_succeeded() } @@ -2591,9 +2579,9 @@ where } if noria_err.caused_by_view_not_found() { - status.migration_state = MigrationState::Pending; + status.set_migration_state(MigrationState::Pending); } else if noria_err.caused_by_unsupported() { - status.migration_state = MigrationState::Unsupported; + status.set_migration_state(MigrationState::Unsupported); }; let always = status.always; diff --git a/readyset-adapter/src/backend/noria_connector.rs b/readyset-adapter/src/backend/noria_connector.rs index d9d5b45def..73875d2278 100644 --- a/readyset-adapter/src/backend/noria_connector.rs +++ b/readyset-adapter/src/backend/noria_connector.rs @@ -1023,6 +1023,7 @@ impl NoriaConnector { create_if_not_exist: bool, override_schema_search_path: Option>, ) -> ReadySetResult { + println!("get_view_name_cached"); let search_path = override_schema_search_path.unwrap_or_else(|| self.schema_search_path().to_vec()); let view_request = ViewCreateRequest::new(q.clone(), search_path.clone()); diff --git a/readyset-adapter/src/proxied_queries_reporter.rs b/readyset-adapter/src/proxied_queries_reporter.rs index 7ecca67739..a4ba949b4f 100644 --- a/readyset-adapter/src/proxied_queries_reporter.rs +++ b/readyset-adapter/src/proxied_queries_reporter.rs @@ -41,16 +41,16 @@ impl ProxiedQueriesReporter { TelemetryEvent::ProxiedQuery, TelemetryBuilder::new() .proxied_query(anon_q) - .migration_status(query.status.migration_state.to_string()) + .migration_status(query.status.migration_state().to_string()) .build(), )) }; - match reported_queries.insert(query.id, query.status.migration_state.clone()) { + match reported_queries.insert(query.id, query.status.migration_state().clone()) { Some(old_migration_state) => { // Check whether we know of a new migration state for this query. Send an event if // so - if old_migration_state != query.status.migration_state { + if &old_migration_state != query.status.migration_state() { build_event() } else { None @@ -105,11 +105,7 @@ mod tests { query: Query::ParseFailed(Arc::new( "this is easier than making a view create request".to_string(), )), - status: QueryStatus { - migration_state: MigrationState::Pending, - execution_info: None, - always: false, - }, + status: QueryStatus::new(MigrationState::Pending), }; proxied_queries_reporter.report_query(&mut init_q).await; let status = { @@ -126,11 +122,7 @@ mod tests { query: Query::ParseFailed(Arc::new( "this is easier than making a view create request".to_string(), )), - status: QueryStatus { - migration_state: MigrationState::Successful, - execution_info: None, - always: false, - }, + status: QueryStatus::new(MigrationState::Successful), }; proxied_queries_reporter.report_query(&mut updated_q).await; let status = { diff --git a/readyset-adapter/src/query_status_cache.rs b/readyset-adapter/src/query_status_cache.rs index cfe01b26c2..f5716cbcb7 100644 --- a/readyset-adapter/src/query_status_cache.rs +++ b/readyset-adapter/src/query_status_cache.rs @@ -310,7 +310,7 @@ impl QueryStatusCache { { let q = q.into(); let status = QueryStatus::default_for_query(&q); - let migration_state = status.migration_state.clone(); + let migration_state = status.migration_state().clone(); let id = self.insert_with_status(q, status); (id, migration_state) } @@ -326,9 +326,9 @@ impl QueryStatusCache { Query::Parsed { .. } => status, Query::ParseFailed(_) => { let mut status = status; - if status.migration_state != MigrationState::Unsupported { + if status.migration_state() != &MigrationState::Unsupported { error!("Cannot set migration state to anything other than Unsupported for a Query::ParseFailed"); - status.migration_state = MigrationState::Unsupported + status.set_migration_state(MigrationState::Unsupported); } status } @@ -355,7 +355,7 @@ impl QueryStatusCache { let query_state = self.id_to_status.get(&id); match query_state { - Some(s) => (id, s.value().migration_state.clone()), + Some(s) => (id, s.value().migration_state().clone()), None => self.insert(q.clone()), } } @@ -370,7 +370,7 @@ impl QueryStatusCache { let id = q.query_id(); let query_state = self.id_to_status.get(&id); - (id, query_state.map(|s| s.value().migration_state.clone())) + (id, query_state.map(|s| s.value().migration_state().clone())) } /// This function returns the query status of a query. If the query does not exist @@ -491,10 +491,10 @@ impl QueryStatusCache { // // `Inlined` queries may only be changed from `Inlined` to `Unsupported`. if !matches!( - s.migration_state, + s.migration_state(), MigrationState::Unsupported | MigrationState::Inlined(_) ) { - s.migration_state = MigrationState::Pending + s.set_migration_state(MigrationState::Pending); } false } @@ -504,14 +504,7 @@ impl QueryStatusCache { }); if should_insert { - self.insert_with_status( - q.clone(), - QueryStatus { - migration_state: MigrationState::Pending, - execution_info: None, - always: false, - }, - ); + self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Pending)); } } @@ -526,37 +519,15 @@ impl QueryStatusCache { // Dropped should not be set manually debug_assert!(!matches!(m, MigrationState::Dropped)); - let should_insert = q.with_mut_status(self, |s| { - match s { - Some(s) => { - match s.migration_state { - // We do not support transitions from the `Unsupported` state, as we assume - // any `Unsupported` query will remain `Unsupported` for the duration of - // this process. - MigrationState::Unsupported => {} - // A query with an Inlined state can only transition to Unsupported. - MigrationState::Inlined(_) => { - if matches!(m, MigrationState::Unsupported) { - s.migration_state = MigrationState::Unsupported; - } - } - // All other state transitions are allowed. - _ => s.migration_state = m.clone(), - } - false - } - None => true, + let should_insert = q.with_mut_status(self, |s| match s { + Some(s) => { + s.set_migration_state(m.clone()); + false } + None => true, }); if should_insert { - self.insert_with_status( - q.clone(), - QueryStatus { - migration_state: m, - execution_info: None, - always: false, - }, - ); + self.insert_with_status(q.clone(), QueryStatus::new(m)); } } @@ -570,20 +541,13 @@ impl QueryStatusCache { { let should_insert = q.with_mut_status(self, |s| match s { Some(s) => { - s.migration_state = MigrationState::Dropped; + s.set_migration_state(MigrationState::Dropped); false } None => true, }); if should_insert { - self.insert_with_status( - q.clone(), - QueryStatus { - migration_state: MigrationState::Dropped, - execution_info: None, - always: false, - }, - ); + self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Dropped)); } } @@ -592,20 +556,13 @@ impl QueryStatusCache { pub fn unsupported_inlined_migration(&self, q: &ViewCreateRequest) { let should_insert = q.with_mut_status(self, |s| match s { Some(s) => { - s.migration_state = MigrationState::Unsupported; + s.set_migration_state(MigrationState::Unsupported); false } None => true, }); if should_insert { - self.insert_with_status( - q.clone(), - QueryStatus { - migration_state: MigrationState::Unsupported, - execution_info: None, - always: false, - }, - ); + self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Unsupported)); } self.persistent_handle.pending_inlined_migrations.remove(q); } @@ -619,7 +576,7 @@ impl QueryStatusCache { Q: QueryStatusKey, { q.with_mut_status(self, |s| match s { - Some(s) if s.migration_state != MigrationState::Unsupported => { + Some(s) if s.migration_state() != &MigrationState::Unsupported => { s.always = always; } _ => {} @@ -634,8 +591,8 @@ impl QueryStatusCache { Q: QueryStatusKey, { let should_insert = q.with_mut_status(self, |s| match s { - Some(s) if s.migration_state != MigrationState::Unsupported => { - s.migration_state = status.migration_state.clone(); + Some(s) if s.migration_state() != &MigrationState::Unsupported => { + s.set_migration_state(status.migration_state().clone()); s.execution_info = status.execution_info.clone(); false } @@ -659,7 +616,7 @@ impl QueryStatusCache { .iter_mut() .filter(|v| v.is_successful()) .for_each(|mut v| { - v.migration_state = MigrationState::Pending; + v.set_migration_state(MigrationState::Pending); v.always = false; }); let mut statuses = self.persistent_handle.statuses.write(); @@ -667,7 +624,7 @@ impl QueryStatusCache { .iter_mut() .filter(|(_query_id, (_query, status))| status.is_successful()) .for_each(|(_query_id, (_query, ref mut status))| { - status.migration_state = MigrationState::Pending; + status.set_migration_state(MigrationState::Pending); status.always = false; }); } @@ -726,11 +683,7 @@ impl QueryStatusCache { // Then update the inlined state epoch for the query query.with_mut_status(self, |s| { - if let Some(QueryStatus { - migration_state: MigrationState::Inlined(ref mut state), - .. - }) = s - { + if let Some(state) = s.and_then(|s| s.as_inlined_mut()) { state.epoch += 1; } }) @@ -746,7 +699,7 @@ impl QueryStatusCache { // Get the placeholders that require inlining let placeholders = q.key() - .with_status(self, |s| match s.map(|s| &s.migration_state) { + .with_status(self, |s| match s.map(|s| s.migration_state()) { Some(MigrationState::Inlined(InlinedState { inlined_placeholders, .. @@ -772,6 +725,7 @@ impl QueryStatusCache { /// Does not include any queries that require inlining. pub fn pending_migration(&self) -> QueryList { let statuses = self.persistent_handle.statuses.read(); + statuses .iter() .filter_map(|(_query_id, (query, status))| { @@ -1172,8 +1126,8 @@ mod tests { epoch: 1, }); cache.update_query_migration_state(&q, inlined_state.clone()); - let state = cache.query_status(&q).migration_state; - assert_eq!(state, inlined_state); + let state = cache.query_status(&q); + assert_eq!(state.migration_state(), &inlined_state); assert_eq!( cache .persistent_handle diff --git a/readyset-client/src/query.rs b/readyset-client/src/query.rs index dea6b161b4..0b93f58564 100644 --- a/readyset-client/src/query.rs +++ b/readyset-client/src/query.rs @@ -220,7 +220,7 @@ impl Serialize for DeniedQuery { #[derive(Debug, Clone, PartialEq, Eq)] pub struct QueryStatus { /// The migration state of the query - pub migration_state: MigrationState, + migration_state: MigrationState, /// The execution info of the query, if any pub execution_info: Option, /// If we should always cache the query (never proxy to upstream) @@ -228,6 +228,16 @@ pub struct QueryStatus { } impl QueryStatus { + /// Constructs a QueryStatus with the given migration state, no migration state, + /// and always set to false + pub fn new(migration_state: MigrationState) -> Self { + Self { + migration_state, + execution_info: None, + always: false, + } + } + /// Constructs a QueryStatus with the default migration state for the query, no migration state, /// and always set to false pub fn default_for_query(query: &Query) -> Self { @@ -247,12 +257,46 @@ impl QueryStatus { } } + pub fn set_migration_state(&mut self, migration_state: MigrationState) { + if self.migration_state.can_transit_to(&migration_state) { + self.migration_state = migration_state; + } + } + + /// Returns a reference to the [`MigrationState`] + pub fn migration_state(&self) -> &MigrationState { + &self.migration_state + } + + pub fn supported_state(&self) -> SupportedState { + match &self.migration_state { + MigrationState::DryRunSucceeded + | MigrationState::Successful + | MigrationState::Dropped => SupportedState::Yes, + MigrationState::Pending | MigrationState::Inlined(_) => SupportedState::Pending, + MigrationState::Unsupported => SupportedState::Unsupported, + } + } + + /// Returns true if this query status represents a [pending][] query + /// + /// [pending]: MigrationState::Pending + #[must_use] + pub fn as_inlined_mut(&mut self) -> Option<&mut InlinedState> { + if let MigrationState::Inlined(ref mut state) = self.migration_state { + Some(state) + } else { + None + } + } + /// Returns true if this query status represents a [pending][] query /// /// [pending]: MigrationState::Pending #[must_use] pub fn is_pending(&self) -> bool { - self.migration_state == MigrationState::Pending + // TODO ethan is this necessary change? + self.migration_state.is_pending() } /// Returns true if this query status represents a [successfully migrated][] query @@ -271,11 +315,20 @@ impl QueryStatus { self.migration_state == MigrationState::Unsupported } + /// Returns true if this query status represents an [unsupported][] query + /// + /// [unsupported]: MigrationState::Unsupported + #[must_use] + pub fn is_supported(&self) -> bool { + self.migration_state.is_supported() + } + /// Returns true if this query status represents an [dropped][] query /// /// [unsupported]: MigrationState::Dropped #[must_use] pub fn is_dropped(&self) -> bool { + // TODO ethan see about making migration state internals private? self.migration_state == MigrationState::Dropped } @@ -360,6 +413,23 @@ impl InlinedState { } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum SupportedState { + Pending, + Yes, + Unsupported, +} + +impl fmt::Display for SupportedState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SupportedState::Pending => write!(f, "pending"), + SupportedState::Yes => write!(f, "yes"), + SupportedState::Unsupported => write!(f, "unsupported"), + } + } +} + /// Represents the current migration state of a given query. This state should be updated any time /// a migration is performed, or we learn that the migration state has changed, i.e. we receive a /// ViewNotFound error indicating a query is not migrated. @@ -416,6 +486,26 @@ impl MigrationState { MigrationState::Dropped | MigrationState::DryRunSucceeded | MigrationState::Successful ) } + + pub fn can_transit_to(&self, next_state: &MigrationState) -> bool { + use MigrationState::*; + + match self { + // Any state can trivially transit to itself + s if s == next_state => true, + // We do not support transitions from the `Unsupported` state, as we assume + // any `Unsupported` query will remain `Unsupported` for the duration of + // this process. + Unsupported => false, + // A query with an Inlined state can only transition to Unsupported. + Inlined(_) => matches!(next_state, Unsupported), + // A query that has a view cannot transition to DryRunSucceeded or Pending + Successful => matches!(next_state, Dropped | Pending), + DryRunSucceeded => matches!(next_state, Successful), + Dropped => matches!(next_state, Successful), + Pending => matches!(next_state, DryRunSucceeded | Unsupported | Successful), + } + } } impl Display for MigrationState { diff --git a/readyset-clustertest/src/readyset_mysql.rs b/readyset-clustertest/src/readyset_mysql.rs index 9406afda8e..0f2ccb85dc 100644 --- a/readyset-clustertest/src/readyset_mysql.rs +++ b/readyset-clustertest/src/readyset_mysql.rs @@ -1585,10 +1585,10 @@ async fn views_synchronize_between_deployments() { ); // Then create that query via adapter 0 - adapter_0 + eventually!(adapter_0 .query_drop("CREATE CACHE FROM SELECT * FROM t1;") .await - .unwrap(); + .is_ok()); // Ensure it's been successfully created in adapter 0 adapter_0.query_drop("SELECT * FROM t1;").await.unwrap();