Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: Fix views synchronizer race #1243

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 8 additions & 20 deletions readyset-adapter/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ where
}

if only_supported {
queries.retain(|q| q.status.migration_state.is_supported());
queries.retain(|q| q.status.is_supported());
}

let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
Expand Down Expand Up @@ -2075,15 +2075,7 @@ where
let mut data = queries
.into_iter()
.map(|DeniedQuery { id, query, status }| {
let s = match status.migration_state {
MigrationState::DryRunSucceeded
| MigrationState::Successful
| MigrationState::Dropped => "yes",
MigrationState::Pending | MigrationState::Inlined(_) => "pending",
MigrationState::Unsupported => "unsupported",
}
.to_string();

let s = status.supported_state().to_string();
let mut row = vec![
DfValue::from(id.to_string()),
DfValue::from(Self::format_query_text(
Expand Down Expand Up @@ -2506,11 +2498,7 @@ where
event: &mut QueryExecutionEvent,
processed_query_params: ProcessedQueryParams,
) -> Result<QueryResult<'a, DB>, DB::Error> {
let mut status = status.unwrap_or(QueryStatus {
migration_state: MigrationState::Unsupported,
execution_info: None,
always: false,
});
let mut status = status.unwrap_or(QueryStatus::new(MigrationState::Unsupported));
let original_status = status.clone();
let did_work = if let Some(ref mut i) = status.execution_info {
i.reset_if_exceeded_recovery(
Expand All @@ -2524,9 +2512,9 @@ where
// Test several conditions to see if we should proxy
let upstream_exists = upstream.is_some();
let proxy_out_of_band = settings.migration_mode != MigrationMode::InRequestPath
&& status.migration_state != MigrationState::Successful;
&& status.migration_state() != &MigrationState::Successful;
let unsupported_or_dropped = matches!(
&status.migration_state,
status.migration_state(),
MigrationState::Unsupported | MigrationState::Dropped
);
let exceeded_network_failure = status
Expand Down Expand Up @@ -2568,7 +2556,7 @@ where
match noria_res {
Ok(noria_ok) => {
// We managed to select on ReadySet, good for us
status.migration_state = MigrationState::Successful;
status.set_migration_state(MigrationState::Successful);
if let Some(i) = status.execution_info.as_mut() {
i.execute_succeeded()
}
Expand All @@ -2591,9 +2579,9 @@ where
}

if noria_err.caused_by_view_not_found() {
status.migration_state = MigrationState::Pending;
status.set_migration_state(MigrationState::Pending);
} else if noria_err.caused_by_unsupported() {
status.migration_state = MigrationState::Unsupported;
status.set_migration_state(MigrationState::Unsupported);
};

let always = status.always;
Expand Down
1 change: 1 addition & 0 deletions readyset-adapter/src/backend/noria_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ impl NoriaConnector {
create_if_not_exist: bool,
override_schema_search_path: Option<Vec<SqlIdentifier>>,
) -> ReadySetResult<Relation> {
println!("get_view_name_cached");
let search_path =
override_schema_search_path.unwrap_or_else(|| self.schema_search_path().to_vec());
let view_request = ViewCreateRequest::new(q.clone(), search_path.clone());
Expand Down
18 changes: 5 additions & 13 deletions readyset-adapter/src/proxied_queries_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ impl ProxiedQueriesReporter {
TelemetryEvent::ProxiedQuery,
TelemetryBuilder::new()
.proxied_query(anon_q)
.migration_status(query.status.migration_state.to_string())
.migration_status(query.status.migration_state().to_string())
.build(),
))
};

match reported_queries.insert(query.id, query.status.migration_state.clone()) {
match reported_queries.insert(query.id, query.status.migration_state().clone()) {
Some(old_migration_state) => {
// Check whether we know of a new migration state for this query. Send an event if
// so
if old_migration_state != query.status.migration_state {
if &old_migration_state != query.status.migration_state() {
build_event()
} else {
None
Expand Down Expand Up @@ -105,11 +105,7 @@ mod tests {
query: Query::ParseFailed(Arc::new(
"this is easier than making a view create request".to_string(),
)),
status: QueryStatus {
migration_state: MigrationState::Pending,
execution_info: None,
always: false,
},
status: QueryStatus::new(MigrationState::Pending),
};
proxied_queries_reporter.report_query(&mut init_q).await;
let status = {
Expand All @@ -126,11 +122,7 @@ mod tests {
query: Query::ParseFailed(Arc::new(
"this is easier than making a view create request".to_string(),
)),
status: QueryStatus {
migration_state: MigrationState::Successful,
execution_info: None,
always: false,
},
status: QueryStatus::new(MigrationState::Successful),
};
proxied_queries_reporter.report_query(&mut updated_q).await;
let status = {
Expand Down
102 changes: 28 additions & 74 deletions readyset-adapter/src/query_status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl QueryStatusCache {
{
let q = q.into();
let status = QueryStatus::default_for_query(&q);
let migration_state = status.migration_state.clone();
let migration_state = status.migration_state().clone();
let id = self.insert_with_status(q, status);
(id, migration_state)
}
Expand All @@ -326,9 +326,9 @@ impl QueryStatusCache {
Query::Parsed { .. } => status,
Query::ParseFailed(_) => {
let mut status = status;
if status.migration_state != MigrationState::Unsupported {
if status.migration_state() != &MigrationState::Unsupported {
error!("Cannot set migration state to anything other than Unsupported for a Query::ParseFailed");
status.migration_state = MigrationState::Unsupported
status.set_migration_state(MigrationState::Unsupported);
}
status
}
Expand All @@ -355,7 +355,7 @@ impl QueryStatusCache {
let query_state = self.id_to_status.get(&id);

match query_state {
Some(s) => (id, s.value().migration_state.clone()),
Some(s) => (id, s.value().migration_state().clone()),
None => self.insert(q.clone()),
}
}
Expand All @@ -370,7 +370,7 @@ impl QueryStatusCache {
let id = q.query_id();
let query_state = self.id_to_status.get(&id);

(id, query_state.map(|s| s.value().migration_state.clone()))
(id, query_state.map(|s| s.value().migration_state().clone()))
}

/// This function returns the query status of a query. If the query does not exist
Expand Down Expand Up @@ -491,10 +491,10 @@ impl QueryStatusCache {
//
// `Inlined` queries may only be changed from `Inlined` to `Unsupported`.
if !matches!(
s.migration_state,
s.migration_state(),
MigrationState::Unsupported | MigrationState::Inlined(_)
) {
s.migration_state = MigrationState::Pending
s.set_migration_state(MigrationState::Pending);
}
false
}
Expand All @@ -504,14 +504,7 @@ impl QueryStatusCache {
});

if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: MigrationState::Pending,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Pending));
}
}

Expand All @@ -526,37 +519,15 @@ impl QueryStatusCache {
// Dropped should not be set manually
debug_assert!(!matches!(m, MigrationState::Dropped));

let should_insert = q.with_mut_status(self, |s| {
match s {
Some(s) => {
match s.migration_state {
// We do not support transitions from the `Unsupported` state, as we assume
// any `Unsupported` query will remain `Unsupported` for the duration of
// this process.
MigrationState::Unsupported => {}
// A query with an Inlined state can only transition to Unsupported.
MigrationState::Inlined(_) => {
if matches!(m, MigrationState::Unsupported) {
s.migration_state = MigrationState::Unsupported;
}
}
// All other state transitions are allowed.
_ => s.migration_state = m.clone(),
}
false
}
None => true,
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) => {
s.set_migration_state(m.clone());
false
}
None => true,
});
if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: m,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(m));
}
}

Expand All @@ -570,20 +541,13 @@ impl QueryStatusCache {
{
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) => {
s.migration_state = MigrationState::Dropped;
s.set_migration_state(MigrationState::Dropped);
false
}
None => true,
});
if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: MigrationState::Dropped,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Dropped));
}
}

Expand All @@ -592,20 +556,13 @@ impl QueryStatusCache {
pub fn unsupported_inlined_migration(&self, q: &ViewCreateRequest) {
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) => {
s.migration_state = MigrationState::Unsupported;
s.set_migration_state(MigrationState::Unsupported);
false
}
None => true,
});
if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: MigrationState::Unsupported,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Unsupported));
}
self.persistent_handle.pending_inlined_migrations.remove(q);
}
Expand All @@ -619,7 +576,7 @@ impl QueryStatusCache {
Q: QueryStatusKey,
{
q.with_mut_status(self, |s| match s {
Some(s) if s.migration_state != MigrationState::Unsupported => {
Some(s) if s.migration_state() != &MigrationState::Unsupported => {
s.always = always;
}
_ => {}
Expand All @@ -634,8 +591,8 @@ impl QueryStatusCache {
Q: QueryStatusKey,
{
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) if s.migration_state != MigrationState::Unsupported => {
s.migration_state = status.migration_state.clone();
Some(s) if s.migration_state() != &MigrationState::Unsupported => {
s.set_migration_state(status.migration_state().clone());
s.execution_info = status.execution_info.clone();
false
}
Expand All @@ -659,15 +616,15 @@ impl QueryStatusCache {
.iter_mut()
.filter(|v| v.is_successful())
.for_each(|mut v| {
v.migration_state = MigrationState::Pending;
v.set_migration_state(MigrationState::Pending);
v.always = false;
});
let mut statuses = self.persistent_handle.statuses.write();
statuses
.iter_mut()
.filter(|(_query_id, (_query, status))| status.is_successful())
.for_each(|(_query_id, (_query, ref mut status))| {
status.migration_state = MigrationState::Pending;
status.set_migration_state(MigrationState::Pending);
status.always = false;
});
}
Expand Down Expand Up @@ -726,11 +683,7 @@ impl QueryStatusCache {

// Then update the inlined state epoch for the query
query.with_mut_status(self, |s| {
if let Some(QueryStatus {
migration_state: MigrationState::Inlined(ref mut state),
..
}) = s
{
if let Some(state) = s.and_then(|s| s.as_inlined_mut()) {
state.epoch += 1;
}
})
Expand All @@ -746,7 +699,7 @@ impl QueryStatusCache {
// Get the placeholders that require inlining
let placeholders =
q.key()
.with_status(self, |s| match s.map(|s| &s.migration_state) {
.with_status(self, |s| match s.map(|s| s.migration_state()) {
Some(MigrationState::Inlined(InlinedState {
inlined_placeholders,
..
Expand All @@ -772,6 +725,7 @@ impl QueryStatusCache {
/// Does not include any queries that require inlining.
pub fn pending_migration(&self) -> QueryList {
let statuses = self.persistent_handle.statuses.read();

statuses
.iter()
.filter_map(|(_query_id, (query, status))| {
Expand Down Expand Up @@ -1172,8 +1126,8 @@ mod tests {
epoch: 1,
});
cache.update_query_migration_state(&q, inlined_state.clone());
let state = cache.query_status(&q).migration_state;
assert_eq!(state, inlined_state);
let state = cache.query_status(&q);
assert_eq!(state.migration_state(), &inlined_state);
assert_eq!(
cache
.persistent_handle
Expand Down