diff --git a/Cargo.lock b/Cargo.lock index 408fec94e15..6c67f437d5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,7 +98,7 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" [[package]] name = "async-bb8-diesel" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/async-bb8-diesel?rev=51de79fe02b334899be5d5fd8b469f9d140ea887#51de79fe02b334899be5d5fd8b469f9d140ea887" +source = "git+https://github.com/oxidecomputer/async-bb8-diesel?rev=7944dafc8a36dc6e20a1405eca59d04662de2bb7#7944dafc8a36dc6e20a1405eca59d04662de2bb7" dependencies = [ "async-trait", "bb8", diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index de3c5086604..61fecaac7e3 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -9,7 +9,7 @@ path = "../rpaths" [dependencies] anyhow = "1.0" -async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "51de79fe02b334899be5d5fd8b469f9d140ea887" } +async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "7944dafc8a36dc6e20a1405eca59d04662de2bb7" } async-trait = "0.1.56" base64 = "0.13.0" bb8 = "0.8.0" diff --git a/nexus/src/db/collection_attach.rs b/nexus/src/db/collection_attach.rs index 10ad00b7254..84a4073df00 100644 --- a/nexus/src/db/collection_attach.rs +++ b/nexus/src/db/collection_attach.rs @@ -17,7 +17,7 @@ use super::cte_utils::{ QueryFromClause, QuerySqlType, TableDefaultWhereClause, }; use super::pool::DbConnection; -use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError}; +use async_bb8_diesel::{AsyncRunQueryDsl, PoolError}; use diesel::associations::HasTable; use diesel::expression::{AsExpression, Expression}; use diesel::helper_types::*; @@ -301,12 +301,6 @@ where pub type AsyncAttachToCollectionResult = Result<(C, ResourceType), AttachError>; -/// Result of [`AttachToCollectionStatement`] when executed synchronously -pub type SyncAttachToCollectionResult = Result< - (C, ResourceType), - AttachError, ->; - /// Errors returned by [`AttachToCollectionStatement`]. #[derive(Debug)] pub enum AttachError { @@ -338,9 +332,10 @@ where AttachToCollectionStatement: Send, { /// Issues the CTE asynchronously and parses the result. - pub async fn attach_and_get_result_async( + pub async fn attach_and_get_result_async( self, - pool: &bb8::Pool>, + conn: &(impl async_bb8_diesel::AsyncConnection + + Sync), ) -> AsyncAttachToCollectionResult where // We require this bound to ensure that "Self" is runnable as query. @@ -349,33 +344,17 @@ where DbConnection, RawOutput, >, + ConnErr: From + Send + 'static, + PoolError: From, { - self.get_result_async::>(pool) + self.get_result_async::>(conn) .await // If the database returns an error, propagate it right away. - .map_err(AttachError::DatabaseError) + .map_err(|e| AttachError::DatabaseError(PoolError::from(e))) // Otherwise, parse the output to determine if the CTE succeeded. .and_then(Self::parse_result) } - /// Issues the CTE synchronously and parses the result. - pub fn attach_and_get_result( - self, - conn: &mut DbConnection, - ) -> SyncAttachToCollectionResult - where - // We require this bound to ensure that "Self" is runnable as query. - Self: query_methods::LoadQuery< - 'static, - DbConnection, - RawOutput, - >, - { - self.get_result::>(conn) - .map_err(AttachError::DatabaseError) - .and_then(Self::parse_result) - } - fn parse_result( result: RawOutput, ) -> Result<(C, ResourceType), AttachError> { @@ -1012,16 +991,17 @@ mod test { .set(resource::dsl::collection_id.eq(collection_id)), ); - type TxnError = TransactionError< - AttachError, - >; + type TxnError = + TransactionError>; let result = pool .pool() - .transaction(move |conn| { - attach_query.attach_and_get_result(conn).map_err(|e| match e { - AttachError::DatabaseError(e) => TxnError::from(e), - e => TxnError::CustomError(e), - }) + .transaction_async(|conn| async move { + attach_query.attach_and_get_result_async(&conn).await.map_err( + |e| match e { + AttachError::DatabaseError(e) => TxnError::from(e), + e => TxnError::CustomError(e), + }, + ) }) .await; diff --git a/nexus/src/db/collection_detach.rs b/nexus/src/db/collection_detach.rs index 3ae384d74e9..c4ae237062a 100644 --- a/nexus/src/db/collection_detach.rs +++ b/nexus/src/db/collection_detach.rs @@ -16,7 +16,7 @@ use super::cte_utils::{ QueryFromClause, QuerySqlType, }; use super::pool::DbConnection; -use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError}; +use async_bb8_diesel::{AsyncRunQueryDsl, PoolError}; use diesel::associations::HasTable; use diesel::expression::{AsExpression, Expression}; use diesel::helper_types::*; @@ -232,10 +232,6 @@ where pub type AsyncDetachFromCollectionResult = Result>; -/// Result of [`DetachFromCollectionStatement`] when executed synchronously -pub type SyncDetachFromCollectionResult = - Result>; - /// Errors returned by [`DetachFromCollectionStatement`]. #[derive(Debug)] pub enum DetachError { @@ -269,7 +265,8 @@ where /// Issues the CTE asynchronously and parses the result. pub async fn detach_and_get_result_async( self, - pool: &bb8::Pool>, + conn: &(impl async_bb8_diesel::AsyncConnection + + Sync), ) -> AsyncDetachFromCollectionResult where // We require this bound to ensure that "Self" is runnable as query. @@ -279,7 +276,7 @@ where RawOutput, >, { - self.get_result_async::>(pool) + self.get_result_async::>(conn) .await // If the database returns an error, propagate it right away. .map_err(DetachError::DatabaseError) @@ -287,24 +284,6 @@ where .and_then(Self::parse_result) } - /// Issues the CTE synchronously and parses the result. - pub fn detach_and_get_result( - self, - conn: &mut DbConnection, - ) -> SyncDetachFromCollectionResult - where - // We require this bound to ensure that "Self" is runnable as query. - Self: query_methods::LoadQuery< - 'static, - DbConnection, - RawOutput, - >, - { - self.get_result::>(conn) - .map_err(DetachError::DatabaseError) - .and_then(Self::parse_result) - } - fn parse_result( result: RawOutput, ) -> Result> { @@ -502,12 +481,8 @@ where mod test { use super::*; use crate::db::collection_attach::DatastoreAttachTarget; - use crate::db::{ - self, error::TransactionError, identity::Resource as IdentityResource, - }; - use async_bb8_diesel::{ - AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection, - }; + use crate::db::{self, identity::Resource as IdentityResource}; + use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; use chrono::Utc; use db_macros::Resource; use diesel::expression_methods::ExpressionMethods; @@ -899,57 +874,6 @@ mod test { logctx.cleanup_successful(); } - #[tokio::test] - async fn test_detach_once_synchronous() { - let logctx = dev::test_setup_log("test_detach_once_synchronous"); - let mut db = test_setup_database(&logctx.log).await; - let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&cfg); - - setup_db(&pool).await; - - let collection_id = uuid::Uuid::new_v4(); - let resource_id = uuid::Uuid::new_v4(); - - // Create the collection and resource. - let _collection = - insert_collection(collection_id, "collection", &pool).await; - let _resource = insert_resource(resource_id, "resource", &pool).await; - attach_resource(collection_id, resource_id, &pool).await; - - // Detach the resource from the collection. - let detach_query = Collection::detach_resource( - collection_id, - resource_id, - collection::table.into_boxed(), - resource::table.into_boxed(), - diesel::update(resource::table) - .set(resource::dsl::collection_id.eq(Option::::None)), - ); - - type TxnError = TransactionError< - DetachError, - >; - let result = pool - .pool() - .transaction(move |conn| { - detach_query.detach_and_get_result(conn).map_err(|e| match e { - DetachError::DatabaseError(e) => TxnError::from(e), - e => TxnError::CustomError(e), - }) - }) - .await; - - // "detach_and_get_result" should return the "detached" resource. - let returned_resource = result.expect("Detach should have worked"); - assert!(returned_resource.collection_id.is_none()); - // The returned values should be the latest value in the DB. - assert_eq!(returned_resource, get_resource(resource_id, &pool).await); - - db.cleanup().await.unwrap(); - logctx.cleanup_successful(); - } - #[tokio::test] async fn test_detach_while_already_detached() { let logctx = dev::test_setup_log("test_detach_while_already_detached"); diff --git a/nexus/src/db/collection_detach_many.rs b/nexus/src/db/collection_detach_many.rs index 4288fc7b04f..caef0eaa53a 100644 --- a/nexus/src/db/collection_detach_many.rs +++ b/nexus/src/db/collection_detach_many.rs @@ -16,7 +16,7 @@ use super::cte_utils::{ QueryFromClause, QuerySqlType, }; use super::pool::DbConnection; -use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError}; +use async_bb8_diesel::{AsyncRunQueryDsl, PoolError}; use diesel::associations::HasTable; use diesel::expression::{AsExpression, Expression}; use diesel::helper_types::*; @@ -243,10 +243,6 @@ where pub type AsyncDetachManyFromCollectionResult = Result>; -/// Result of [`DetachManyFromCollectionStatement`] when executed synchronously -pub type SyncDetachManyFromCollectionResult = - Result>; - /// Errors returned by [`DetachManyFromCollectionStatement`]. #[derive(Debug)] pub enum DetachManyError { @@ -277,36 +273,25 @@ where DetachManyFromCollectionStatement: Send, { /// Issues the CTE asynchronously and parses the result. - pub async fn detach_and_get_result_async( + pub async fn detach_and_get_result_async( self, - pool: &bb8::Pool>, + conn: &(impl async_bb8_diesel::AsyncConnection + + Sync), ) -> AsyncDetachManyFromCollectionResult where // We require this bound to ensure that "Self" is runnable as query. Self: query_methods::LoadQuery<'static, DbConnection, RawOutput>, + ConnErr: From + Send + 'static, + PoolError: From, { - self.get_result_async::>(pool) + self.get_result_async::>(conn) .await // If the database returns an error, propagate it right away. - .map_err(DetachManyError::DatabaseError) + .map_err(|e| DetachManyError::DatabaseError(PoolError::from(e))) // Otherwise, parse the output to determine if the CTE succeeded. .and_then(Self::parse_result) } - /// Issues the CTE synchronously and parses the result. - pub fn detach_and_get_result( - self, - conn: &mut DbConnection, - ) -> SyncDetachManyFromCollectionResult - where - // We require this bound to ensure that "Self" is runnable as query. - Self: query_methods::LoadQuery<'static, DbConnection, RawOutput>, - { - self.get_result::>(conn) - .map_err(DetachManyError::DatabaseError) - .and_then(Self::parse_result) - } - fn parse_result( result: RawOutput, ) -> Result> { @@ -926,16 +911,17 @@ mod test { .set(resource::dsl::collection_id.eq(Option::::None)), ); - type TxnError = TransactionError< - DetachManyError, - >; + type TxnError = + TransactionError>; let result = pool .pool() - .transaction(move |conn| { - detach_query.detach_and_get_result(conn).map_err(|e| match e { - DetachManyError::DatabaseError(e) => TxnError::from(e), - e => TxnError::CustomError(e), - }) + .transaction_async(|conn| async move { + detach_query.detach_and_get_result_async(&conn).await.map_err( + |e| match e { + DetachManyError::DatabaseError(e) => TxnError::from(e), + e => TxnError::CustomError(e), + }, + ) }) .await; diff --git a/nexus/src/db/collection_insert.rs b/nexus/src/db/collection_insert.rs index e87c5670168..47733bdb1d8 100644 --- a/nexus/src/db/collection_insert.rs +++ b/nexus/src/db/collection_insert.rs @@ -10,9 +10,7 @@ //! 3) inserts the child resource row use super::pool::DbConnection; -use async_bb8_diesel::{ - AsyncRunQueryDsl, ConnectionError, ConnectionManager, PoolError, -}; +use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionError, PoolError}; use diesel::associations::HasTable; use diesel::helper_types::*; use diesel::pg::Pg; @@ -166,9 +164,6 @@ where /// Result of [`InsertIntoCollectionStatement`] when executed asynchronously pub type AsyncInsertIntoCollectionResult = Result; -/// Result of [`InsertIntoCollectionStatement`] when executed synchronously -pub type SyncInsertIntoCollectionResult = Result; - /// Errors returned by [`InsertIntoCollectionStatement`]. #[derive(Debug)] pub enum AsyncInsertError { @@ -178,15 +173,6 @@ pub enum AsyncInsertError { DatabaseError(PoolError), } -/// Errors returned by [`InsertIntoCollectionStatement`]. -#[derive(Debug)] -pub enum SyncInsertError { - /// The collection that the query was inserting into does not exist - CollectionNotFound, - /// Other database error - DatabaseError(diesel::result::Error), -} - impl InsertIntoCollectionStatement where ResourceType: 'static + Debug + Send + Selectable, @@ -202,17 +188,20 @@ where /// - Ok(new row) /// - Error(collection not found) /// - Error(other diesel error) - pub async fn insert_and_get_result_async( + pub async fn insert_and_get_result_async( self, - pool: &bb8::Pool>, + conn: &(impl async_bb8_diesel::AsyncConnection + + Sync), ) -> AsyncInsertIntoCollectionResult where // We require this bound to ensure that "Self" is runnable as query. Self: query_methods::LoadQuery<'static, DbConnection, ResourceType>, + ConnErr: From + Send + 'static, + PoolError: From, { - self.get_result_async::(pool) + self.get_result_async::(conn) .await - .map_err(Self::translate_async_error) + .map_err(|e| Self::translate_async_error(PoolError::from(e))) } /// Issues the CTE asynchronously and parses the result. @@ -221,53 +210,20 @@ where /// - Ok(Vec of new rows) /// - Error(collection not found) /// - Error(other diesel error) - pub async fn insert_and_get_results_async( + pub async fn insert_and_get_results_async( self, - pool: &bb8::Pool>, + conn: &(impl async_bb8_diesel::AsyncConnection + + Sync), ) -> AsyncInsertIntoCollectionResult> where // We require this bound to ensure that "Self" is runnable as query. Self: query_methods::LoadQuery<'static, DbConnection, ResourceType>, + ConnErr: From + Send + 'static, + PoolError: From, { - self.get_results_async::(pool) + self.get_results_async::(conn) .await - .map_err(Self::translate_async_error) - } - - /// Issues the CTE synchronously and parses the result. - /// - /// The three outcomes are: - /// - Ok(new row) - /// - Error(collection not found) - /// - Error(other diesel error) - pub fn insert_and_get_result( - self, - conn: &mut DbConnection, - ) -> SyncInsertIntoCollectionResult - where - // We require this bound to ensure that "Self" is runnable as query. - Self: query_methods::LoadQuery<'static, DbConnection, ResourceType>, - { - self.get_result::(conn) - .map_err(Self::translate_sync_error) - } - - /// Issues the CTE synchronously and parses the result. - /// - /// The three outcomes are: - /// - Ok(Vec of new rows) - /// - Error(collection not found) - /// - Error(other diesel error) - pub fn insert_and_get_results( - self, - conn: &mut DbConnection, - ) -> SyncInsertIntoCollectionResult> - where - // We require this bound to ensure that "Self" is runnable as query. - Self: query_methods::LoadQuery<'static, DbConnection, ResourceType>, - { - self.get_results::(conn) - .map_err(Self::translate_sync_error) + .map_err(|e| Self::translate_async_error(PoolError::from(e))) } /// Check for the intentional division by zero error @@ -298,16 +254,6 @@ where other => AsyncInsertError::DatabaseError(other), } } - - /// Translate from diesel errors into SyncInsertError, handling the - /// intentional division-by-zero error in the CTE. - fn translate_sync_error(err: diesel::result::Error) -> SyncInsertError { - if Self::error_is_division_by_zero(&err) { - SyncInsertError::CollectionNotFound - } else { - SyncInsertError::DatabaseError(err) - } - } } type SelectableSqlType = @@ -446,12 +392,8 @@ where #[cfg(test)] mod test { use super::*; - use crate::db::{ - self, error::TransactionError, identity::Resource as IdentityResource, - }; - use async_bb8_diesel::{ - AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection, - }; + use crate::db::{self, identity::Resource as IdentityResource}; + use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection}; use chrono::{DateTime, NaiveDateTime, Utc}; use db_macros::Resource; use diesel::expression_methods::ExpressionMethods; @@ -623,41 +565,6 @@ mod test { .await; assert!(matches!(insert, Err(AsyncInsertError::CollectionNotFound))); - let insert_query = Collection::insert_resource( - collection_id, - diesel::insert_into(resource::table).values(( - resource::dsl::id.eq(resource_id), - resource::dsl::name.eq("test"), - resource::dsl::description.eq("desc"), - resource::dsl::time_created.eq(Utc::now()), - resource::dsl::time_modified.eq(Utc::now()), - resource::dsl::collection_id.eq(collection_id), - )), - ); - - #[derive(Debug)] - enum CollectionError { - NotFound, - } - type TxnError = TransactionError; - - let result = pool - .pool() - .transaction(move |conn| { - insert_query.insert_and_get_result(conn).map_err(|e| match e { - SyncInsertError::CollectionNotFound => { - TxnError::CustomError(CollectionError::NotFound) - } - SyncInsertError::DatabaseError(e) => TxnError::from(e), - }) - }) - .await; - - assert!(matches!( - result, - Err(TxnError::CustomError(CollectionError::NotFound)) - )); - db.cleanup().await.unwrap(); logctx.cleanup_successful(); } diff --git a/nexus/src/db/datastore/device_auth.rs b/nexus/src/db/datastore/device_auth.rs index c9e221fd92a..62e54f23215 100644 --- a/nexus/src/db/datastore/device_auth.rs +++ b/nexus/src/db/datastore/device_auth.rs @@ -79,14 +79,16 @@ impl DataStore { self.pool_authorized(opctx) .await? - .transaction(move |conn| match delete_request.execute(conn)? { - 0 => { - Err(TxnError::CustomError(TokenGrantError::RequestNotFound)) + .transaction_async(|conn| async move { + match delete_request.execute_async(&conn).await? { + 0 => { + Err(TxnError::CustomError(TokenGrantError::RequestNotFound)) + } + 1 => Ok(insert_token.get_result_async(&conn).await?), + _ => Err(TxnError::CustomError( + TokenGrantError::TooManyRequests, + )), } - 1 => Ok(insert_token.get_result(conn)?), - _ => Err(TxnError::CustomError( - TokenGrantError::TooManyRequests, - )), }) .await .map_err(|e| match e { diff --git a/nexus/src/db/datastore/identity_provider.rs b/nexus/src/db/datastore/identity_provider.rs index 18b185102b3..ef7a3800090 100644 --- a/nexus/src/db/datastore/identity_provider.rs +++ b/nexus/src/db/datastore/identity_provider.rs @@ -53,7 +53,7 @@ impl DataStore { let name = provider.identity().name.to_string(); self.pool_authorized(opctx) .await? - .transaction(move |conn| { + .transaction_async(|conn| async move { // insert silo identity provider record with type Saml use db::schema::identity_provider::dsl as idp_dsl; diesel::insert_into(idp_dsl::identity_provider) @@ -69,14 +69,16 @@ impl DataStore { silo_id: provider.silo_id, provider_type: db::model::IdentityProviderType::Saml, }) - .execute(conn)?; + .execute_async(&conn) + .await?; // insert silo saml identity provider record use db::schema::saml_identity_provider::dsl; let result = diesel::insert_into(dsl::saml_identity_provider) .values(provider) .returning(db::model::SamlIdentityProvider::as_returning()) - .get_result(conn)?; + .get_result_async(&conn) + .await?; Ok(result) }) diff --git a/nexus/src/db/datastore/network_interface.rs b/nexus/src/db/datastore/network_interface.rs index b341cb612fd..45acdcb0f82 100644 --- a/nexus/src/db/datastore/network_interface.rs +++ b/nexus/src/db/datastore/network_interface.rs @@ -302,15 +302,18 @@ impl DataStore { #[derive(Debug)] enum NetworkInterfaceUpdateError { InstanceNotStopped, - FailedToUnsetPrimary(diesel::result::Error), + FailedToUnsetPrimary(async_bb8_diesel::ConnectionError), } type TxnError = TransactionError; let pool = self.pool_authorized(opctx).await?; if primary { - pool.transaction(move |conn| { - let instance_state = - instance_query.get_result(conn)?.runtime_state.state; + pool.transaction_async(|conn| async move { + let instance_state = instance_query + .get_result_async(&conn) + .await? + .runtime_state + .state; if instance_state != stopped { return Err(TxnError::CustomError( NetworkInterfaceUpdateError::InstanceNotStopped, @@ -318,8 +321,8 @@ impl DataStore { } // First, get the primary interface - let primary_interface = find_primary_query.get_result(conn)?; - + let primary_interface = + find_primary_query.get_result_async(&conn).await?; // If the target and primary are different, we need to toggle // the primary into a secondary. if primary_interface.identity.id != interface_id { @@ -327,7 +330,8 @@ impl DataStore { .filter(dsl::id.eq(primary_interface.identity.id)) .filter(dsl::time_deleted.is_null()) .set(dsl::is_primary.eq(false)) - .execute(conn) + .execute_async(&conn) + .await { return Err(TxnError::CustomError( NetworkInterfaceUpdateError::FailedToUnsetPrimary( @@ -338,7 +342,7 @@ impl DataStore { } // In any case, update the actual target - Ok(update_target_query.get_result(conn)?) + Ok(update_target_query.get_result_async(&conn).await?) }) } else { // In this case, we can just directly apply the updates. By @@ -346,15 +350,18 @@ impl DataStore { // be done there. The other columns always need to be updated, and // we're only hitting a single row. Note that we still need to // verify the instance is stopped. - pool.transaction(move |conn| { - let instance_state = - instance_query.get_result(conn)?.runtime_state.state; + pool.transaction_async(|conn| async move { + let instance_state = instance_query + .get_result_async(&conn) + .await? + .runtime_state + .state; if instance_state != stopped { return Err(TxnError::CustomError( NetworkInterfaceUpdateError::InstanceNotStopped, )); } - Ok(update_target_query.get_result(conn)?) + Ok(update_target_query.get_result_async(&conn).await?) }) } .await diff --git a/nexus/src/db/datastore/rack.rs b/nexus/src/db/datastore/rack.rs index 06d298a4261..415e7e83623 100644 --- a/nexus/src/db/datastore/rack.rs +++ b/nexus/src/db/datastore/rack.rs @@ -8,10 +8,8 @@ use super::DataStore; use crate::authz; use crate::context::OpContext; use crate::db; +use crate::db::collection_insert::AsyncInsertError; use crate::db::collection_insert::DatastoreCollection; -use crate::db::collection_insert::SyncInsertError; -use crate::db::error::public_error_from_diesel_create; -use crate::db::error::public_error_from_diesel_lookup; use crate::db::error::public_error_from_diesel_pool; use crate::db::error::ErrorHandler; use crate::db::error::TransactionError; @@ -22,6 +20,7 @@ use crate::db::model::Sled; use crate::db::pagination::paginated; use async_bb8_diesel::AsyncConnection; use async_bb8_diesel::AsyncRunQueryDsl; +use async_bb8_diesel::PoolError; use chrono::Utc; use diesel::prelude::*; use diesel::upsert::excluded; @@ -90,8 +89,8 @@ impl DataStore { #[derive(Debug)] enum RackInitError { - ServiceInsert { err: SyncInsertError, sled_id: Uuid, svc_id: Uuid }, - RackUpdate(diesel::result::Error), + ServiceInsert { err: AsyncInsertError, sled_id: Uuid, svc_id: Uuid }, + RackUpdate(PoolError), } type TxnError = TransactionError; @@ -99,14 +98,17 @@ impl DataStore { // the low-frequency of calls, this optimization has been deferred. self.pool_authorized(opctx) .await? - .transaction(move |conn| { + .transaction_async(|conn| async move { // Early exit if the rack has already been initialized. let rack = rack_dsl::rack .filter(rack_dsl::id.eq(rack_id)) .select(Rack::as_select()) - .get_result(conn) + .get_result_async(&conn) + .await .map_err(|e| { - TxnError::CustomError(RackInitError::RackUpdate(e)) + TxnError::CustomError(RackInitError::RackUpdate( + PoolError::from(e), + )) })?; if rack.initialized { return Ok(rack); @@ -130,7 +132,8 @@ impl DataStore { .eq(excluded(service_dsl::kind)), )), ) - .insert_and_get_result(conn) + .insert_and_get_result_async(&conn) + .await .map_err(|err| { TxnError::CustomError(RackInitError::ServiceInsert { err, @@ -146,9 +149,12 @@ impl DataStore { rack_dsl::time_modified.eq(Utc::now()), )) .returning(Rack::as_returning()) - .get_result::(conn) + .get_result_async::(&conn) + .await .map_err(|e| { - TxnError::CustomError(RackInitError::RackUpdate(e)) + TxnError::CustomError(RackInitError::RackUpdate( + PoolError::from(e), + )) }) }) .await @@ -158,25 +164,29 @@ impl DataStore { sled_id, svc_id, }) => match err { - SyncInsertError::CollectionNotFound => { + AsyncInsertError::CollectionNotFound => { Error::ObjectNotFound { type_name: ResourceType::Sled, lookup_type: LookupType::ById(sled_id), } } - SyncInsertError::DatabaseError(e) => { - public_error_from_diesel_create( + AsyncInsertError::DatabaseError(e) => { + public_error_from_diesel_pool( e, - ResourceType::Service, - &svc_id.to_string(), + ErrorHandler::Conflict( + ResourceType::Service, + &svc_id.to_string(), + ), ) } }, TxnError::CustomError(RackInitError::RackUpdate(err)) => { - public_error_from_diesel_lookup( + public_error_from_diesel_pool( err, - ResourceType::Rack, - &LookupType::ById(rack_id), + ErrorHandler::NotFoundByLookup( + ResourceType::Rack, + LookupType::ById(rack_id), + ), ) } TxnError::Pool(e) => { diff --git a/nexus/src/db/datastore/region.rs b/nexus/src/db/datastore/region.rs index ad354b7bb8e..acf14c8a4b0 100644 --- a/nexus/src/db/datastore/region.rs +++ b/nexus/src/db/datastore/region.rs @@ -147,7 +147,7 @@ impl DataStore { params.extent_size() / block_size.to_bytes() as i64; self.pool() - .transaction(move |conn| { + .transaction_async(|conn| async move { // First, for idempotency, check if regions are already // allocated to this disk. // @@ -155,7 +155,8 @@ impl DataStore { // datasets. let datasets_and_regions = Self::get_allocated_regions_query(volume_id) - .get_results::<(Dataset, Region)>(conn)?; + .get_results_async::<(Dataset, Region)>(&conn) + .await?; if !datasets_and_regions.is_empty() { return Ok(datasets_and_regions); } @@ -168,7 +169,8 @@ impl DataStore { // separately - this is all or nothing. let mut datasets: Vec = Self::get_allocatable_datasets_query() - .get_results::(conn)?; + .get_results_async::(&conn) + .await?; if datasets.len() < REGION_REDUNDANCY_THRESHOLD { return Err(TxnError::CustomError( @@ -196,7 +198,8 @@ impl DataStore { let regions = diesel::insert_into(region_dsl::region) .values(regions) .returning(Region::as_returning()) - .get_results(conn)?; + .get_results_async(&conn) + .await?; // Update size_used in the source datasets containing those // regions. @@ -211,7 +214,8 @@ impl DataStore { * region_dsl::extent_count, )) .nullable() - .get_result(conn)?; + .get_result_async(&conn) + .await?; let dataset_total_occupied_size: i64 = if let Some( dataset_total_occupied_size, @@ -240,7 +244,8 @@ impl DataStore { dataset_dsl::size_used .eq(dataset_total_occupied_size), ) - .execute(conn)?; + .execute_async(&conn) + .await?; // Update the results we'll send the caller dataset.size_used = Some(dataset_total_occupied_size); @@ -261,7 +266,8 @@ impl DataStore { .filter(dataset_dsl::time_deleted.is_null()) .select(diesel::dsl::sum(dataset_dsl::size_used)) .nullable() - .get_result(conn)?; + .get_result_async(&conn) + .await?; let zpool_total_occupied_size: u64 = if let Some( zpool_total_occupied_size, @@ -287,7 +293,8 @@ impl DataStore { let zpool = zpool_dsl::zpool .filter(zpool_dsl::id.eq(zpool_id)) .select(Zpool::as_returning()) - .get_result(conn)?; + .get_result_async(&conn) + .await?; // Does this go over the zpool's total size? if zpool.total_size.to_bytes() < zpool_total_occupied_size { diff --git a/nexus/src/db/datastore/role.rs b/nexus/src/db/datastore/role.rs index 03ce9ecb25e..f3d04ee343a 100644 --- a/nexus/src/db/datastore/role.rs +++ b/nexus/src/db/datastore/role.rs @@ -140,14 +140,15 @@ impl DataStore { // person attempt to access anything. self.pool_authorized(opctx).await? - .transaction(move |conn| { + .transaction_async(|conn| async move { let mut role_assignments = dsl::role_assignment .filter(dsl::identity_type.eq(identity_type.clone())) .filter(dsl::identity_id.eq(identity_id)) .filter(dsl::resource_type.eq(resource_type.to_string())) .filter(dsl::resource_id.eq(resource_id)) .select(RoleAssignment::as_select()) - .load::(conn)?; + .load_async::(&conn) + .await?; // Return the roles that a silo user has from their group memberships if identity_type == IdentityType::SiloUser { @@ -163,7 +164,8 @@ impl DataStore { .filter(dsl::resource_type.eq(resource_type.to_string())) .filter(dsl::resource_id.eq(resource_id)) .select(RoleAssignment::as_select()) - .load::(conn)?; + .load_async::(&conn) + .await?; role_assignments.append(&mut group_role_assignments); } @@ -259,9 +261,9 @@ impl DataStore { // then flip the resource over to using it. self.pool_authorized(opctx) .await? - .transaction(move |conn| { - delete_old_query.execute(conn)?; - Ok(insert_new_query.get_results(conn)?) + .transaction_async(|conn| async move { + delete_old_query.execute_async(&conn).await?; + Ok(insert_new_query.get_results_async(&conn).await?) }) .await .map_err(|e| match e { diff --git a/nexus/src/db/datastore/silo.rs b/nexus/src/db/datastore/silo.rs index 96324b83508..28bf42f12e0 100644 --- a/nexus/src/db/datastore/silo.rs +++ b/nexus/src/db/datastore/silo.rs @@ -132,18 +132,18 @@ impl DataStore { self.pool_authorized(opctx) .await? - .transaction(move |conn| { - let silo = silo_create_query.get_result(conn)?; + .transaction_async(|conn| async move { + let silo = silo_create_query.get_result_async(&conn).await?; if let Some(query) = silo_admin_group_ensure_query { - query.get_result(conn)?; + query.get_result_async(&conn).await?; } if let Some(queries) = silo_admin_group_role_assignment_queries { let (delete_old_query, insert_new_query) = queries; - delete_old_query.execute(conn)?; - insert_new_query.execute(conn)?; + delete_old_query.execute_async(&conn).await?; + insert_new_query.execute_async(&conn).await?; } Ok(silo) diff --git a/nexus/src/db/datastore/silo_group.rs b/nexus/src/db/datastore/silo_group.rs index 63855b9012f..dc13d8d6882 100644 --- a/nexus/src/db/datastore/silo_group.rs +++ b/nexus/src/db/datastore/silo_group.rs @@ -126,13 +126,14 @@ impl DataStore { self.pool_authorized(opctx) .await? - .transaction(move |conn| { + .transaction_async(|conn| async move { use db::schema::silo_group_membership::dsl; // Delete existing memberships for user diesel::delete(dsl::silo_group_membership) .filter(dsl::silo_user_id.eq(silo_user_id)) - .execute(conn)?; + .execute_async(&conn) + .await?; // Create new memberships for user let silo_group_memberships: Vec< @@ -147,7 +148,8 @@ impl DataStore { diesel::insert_into(dsl::silo_group_membership) .values(silo_group_memberships) - .execute(conn)?; + .execute_async(&conn) + .await?; Ok(()) }) @@ -173,7 +175,7 @@ impl DataStore { self.pool_authorized(opctx) .await? - .transaction(move |conn| { + .transaction_async(|conn| async move { use db::schema::silo_group_membership; // Don't delete groups that still have memberships @@ -185,7 +187,8 @@ impl DataStore { ) .select(SiloGroupMembership::as_returning()) .limit(1) - .load(conn)?; + .load_async(&conn) + .await?; if !group_memberships.is_empty() { return Err(TxnError::CustomError( @@ -199,7 +202,8 @@ impl DataStore { .filter(dsl::id.eq(group_id)) .filter(dsl::time_deleted.is_null()) .set(dsl::time_deleted.eq(Utc::now())) - .execute(conn)?; + .execute_async(&conn) + .await?; Ok(()) }) diff --git a/nexus/src/db/datastore/snapshot.rs b/nexus/src/db/datastore/snapshot.rs index 3b02e631cfa..23c3f3d7453 100644 --- a/nexus/src/db/datastore/snapshot.rs +++ b/nexus/src/db/datastore/snapshot.rs @@ -160,7 +160,7 @@ impl DataStore { self.pool_authorized(&opctx) .await? - .transaction(move |conn| { + .transaction_async(|conn| async move { use db::schema::snapshot::dsl; diesel::update(dsl::snapshot) @@ -169,9 +169,10 @@ impl DataStore { .filter(dsl::id.eq(snapshot_id)) .set(dsl::time_deleted.eq(now)) .check_if_exists::(snapshot_id) - .execute(conn)?; + .execute_async(&conn) + .await?; - volume_delete_query.execute(conn)?; + volume_delete_query.execute_async(&conn).await?; Ok(()) }) diff --git a/nexus/src/db/datastore/vpc.rs b/nexus/src/db/datastore/vpc.rs index 946207f65a3..8b0f5abbfbb 100644 --- a/nexus/src/db/datastore/vpc.rs +++ b/nexus/src/db/datastore/vpc.rs @@ -10,7 +10,6 @@ use crate::context::OpContext; use crate::db; use crate::db::collection_insert::AsyncInsertError; use crate::db::collection_insert::DatastoreCollection; -use crate::db::collection_insert::SyncInsertError; use crate::db::error::diesel_pool_result_optional; use crate::db::error::public_error_from_diesel_pool; use crate::db::error::ErrorHandler; @@ -289,22 +288,23 @@ impl DataStore { // legibility of CTEs via diesel right now. self.pool_authorized(opctx) .await? - .transaction(move |conn| { - delete_old_query.execute(conn)?; + .transaction_async(|conn| async move { + delete_old_query.execute_async(&conn).await?; // The generation count update on the vpc table row will take a // write lock on the row, ensuring that the vpc was not deleted // concurently. - insert_new_query.insert_and_get_results(conn).map_err(|e| { - match e { - SyncInsertError::CollectionNotFound => { + insert_new_query + .insert_and_get_results_async(&conn) + .await + .map_err(|e| match e { + AsyncInsertError::CollectionNotFound => { TxnError::CustomError( FirewallUpdateError::CollectionNotFound, ) } - SyncInsertError::DatabaseError(e) => e.into(), - } - }) + AsyncInsertError::DatabaseError(e) => e.into(), + }) }) .await .map_err(|e| match e { diff --git a/nexus/src/db/error.rs b/nexus/src/db/error.rs index dfaa1922e66..348daf2737d 100644 --- a/nexus/src/db/error.rs +++ b/nexus/src/db/error.rs @@ -37,6 +37,14 @@ impl From for TransactionError { } } +// Maps a "connection error" into a "pool error", which +// is already contained within the error type. +impl From for TransactionError { + fn from(err: async_bb8_diesel::ConnectionError) -> Self { + Self::Pool(PoolError::Connection(err)) + } +} + impl From for TransactionError { fn from(err: PublicError) -> Self { TransactionError::CustomError(err) diff --git a/nexus/src/db/explain.rs b/nexus/src/db/explain.rs index 7a449cd390d..aaeea598f22 100644 --- a/nexus/src/db/explain.rs +++ b/nexus/src/db/explain.rs @@ -115,7 +115,7 @@ mod test { use super::*; use crate::db; - use async_bb8_diesel::{AsyncConnection, AsyncSimpleConnection}; + use async_bb8_diesel::AsyncSimpleConnection; use diesel::SelectableHelper; use expectorate::assert_contents; use nexus_test_utils::db::test_setup_database; @@ -160,40 +160,6 @@ mod test { .unwrap(); } - // Tests the ".explain()" method in a synchronous context. - // - // This is often done when calling from transactions, which we demonstrate. - #[tokio::test] - async fn test_explain() { - let logctx = dev::test_setup_log("test_explain"); - let mut db = test_setup_database(&logctx.log).await; - let cfg = db::Config { url: db.pg_config().clone() }; - let pool = db::Pool::new(&cfg); - - create_schema(&pool).await; - - use schema::test_users::dsl; - pool.pool() - .transaction( - move |conn| -> Result<(), db::error::TransactionError<()>> { - let explanation = dsl::test_users - .filter(dsl::id.eq(Uuid::nil())) - .select(User::as_select()) - .explain(conn) - .unwrap(); - assert_contents( - "tests/output/test-explain-output", - &explanation, - ); - Ok(()) - }, - ) - .await - .unwrap(); - db.cleanup().await.unwrap(); - logctx.cleanup_successful(); - } - // Tests the ".explain_async()" method in an asynchronous context. #[tokio::test] async fn test_explain_async() {