Skip to content

Commit

Permalink
Remove Controller assertions and return errors instead (Materialize…
Browse files Browse the repository at this point in the history
…Inc#10704)

* Remove assertions and return errors instead

* Add Peek validation
  • Loading branch information
frankmcsherry authored and Andi Wang committed Apr 11, 2022
1 parent 7c1166f commit b646eb8
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 73 deletions.
36 changes: 23 additions & 13 deletions src/coord/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ impl Coordinator {
entry.id(),
(source_description, Antichain::from_elem(since_ts)),
)])
.await;
.await
.unwrap();
}
CatalogItem::Table(table) => {
self.persister
Expand Down Expand Up @@ -586,7 +587,8 @@ impl Coordinator {
entry.id(),
(source_description, Antichain::from_elem(since_ts)),
)])
.await;
.await
.unwrap();
}
CatalogItem::Index(_) => {
if BUILTINS.logs().any(|log| log.index_id == entry.id()) {
Expand Down Expand Up @@ -1588,7 +1590,8 @@ impl Coordinator {
self.persisted_table_allow_compaction(&index_since_updates);
self.dataflow_client
.allow_index_compaction(DEFAULT_COMPUTE_INSTANCE_ID, index_since_updates)
.await;
.await
.unwrap();
}

let source_since_updates: Vec<_> = self
Expand Down Expand Up @@ -2238,7 +2241,8 @@ impl Coordinator {
table_id,
(source_description, Antichain::from_elem(since_ts)),
)])
.await;
.await
.unwrap();
// Install the dataflow if so required.
if let Some(df) = df {
let frontiers = self.new_source_frontiers(
Expand Down Expand Up @@ -2339,7 +2343,8 @@ impl Coordinator {

self.dataflow_client
.create_sources(source_descriptions)
.await;
.await
.unwrap();
self.ship_dataflows(dfs).await;
Ok(ExecuteResponse::CreatedSource { existed: false })
}
Expand Down Expand Up @@ -4372,7 +4377,8 @@ impl Coordinator {
}
self.dataflow_client
.create_dataflows(DEFAULT_COMPUTE_INSTANCE_ID, dataflow_plans)
.await;
.await
.unwrap();
}

/// Finalizes a dataflow.
Expand Down Expand Up @@ -4678,11 +4684,13 @@ pub async fn serve(
.collect(),
log_logging: config.log_logging,
});
handle.block_on(
coord
.dataflow_client
.create_instance(DEFAULT_COMPUTE_INSTANCE_ID, logging),
);
handle
.block_on(
coord
.dataflow_client
.create_instance(DEFAULT_COMPUTE_INSTANCE_ID, logging),
)
.unwrap();
let bootstrap = handle.block_on(coord.bootstrap(builtin_table_updates));
let ok = bootstrap.is_ok();
bootstrap_tx.send(bootstrap).unwrap();
Expand Down Expand Up @@ -5071,7 +5079,8 @@ pub mod fast_path_peek {
// Very important: actually create the dataflow (here, so we can destructure).
self.dataflow_client
.create_dataflows(DEFAULT_COMPUTE_INSTANCE_ID, vec![dataflow])
.await;
.await
.unwrap();
// Create an identity MFP operator.
let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
map_filter_project
Expand Down Expand Up @@ -5119,7 +5128,8 @@ pub mod fast_path_peek {
finishing.clone(),
map_filter_project,
)
.await;
.await
.unwrap();

use futures::FutureExt;
use futures::StreamExt;
Expand Down
Loading

0 comments on commit b646eb8

Please sign in to comment.