Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ path = "../rpaths"

[dependencies]
anyhow = "1.0"
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "51de79fe02b334899be5d5fd8b469f9d140ea887" }
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "7944dafc8a36dc6e20a1405eca59d04662de2bb7" }
async-trait = "0.1.56"
base64 = "0.13.0"
bb8 = "0.8.0"
Expand Down
54 changes: 17 additions & 37 deletions nexus/src/db/collection_attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::cte_utils::{
QueryFromClause, QuerySqlType, TableDefaultWhereClause,
};
use super::pool::DbConnection;
use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError};
use async_bb8_diesel::{AsyncRunQueryDsl, PoolError};
use diesel::associations::HasTable;
use diesel::expression::{AsExpression, Expression};
use diesel::helper_types::*;
Expand Down Expand Up @@ -301,12 +301,6 @@ where
pub type AsyncAttachToCollectionResult<ResourceType, C> =
Result<(C, ResourceType), AttachError<ResourceType, C, PoolError>>;

/// Result of [`AttachToCollectionStatement`] when executed synchronously
pub type SyncAttachToCollectionResult<ResourceType, C> = Result<
(C, ResourceType),
AttachError<ResourceType, C, diesel::result::Error>,
>;

/// Errors returned by [`AttachToCollectionStatement`].
#[derive(Debug)]
pub enum AttachError<ResourceType, C, E> {
Expand Down Expand Up @@ -338,9 +332,10 @@ where
AttachToCollectionStatement<ResourceType, V, C>: Send,
{
/// Issues the CTE asynchronously and parses the result.
pub async fn attach_and_get_result_async(
pub async fn attach_and_get_result_async<ConnErr>(
self,
pool: &bb8::Pool<ConnectionManager<DbConnection>>,
conn: &(impl async_bb8_diesel::AsyncConnection<DbConnection, ConnErr>
+ Sync),
Comment on lines +337 to +338
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically all of our non-transaction code acts on a "bb8::Pool". However, the transaction_async method expects to act on an async_bb8_diesel::Connection.

Fortunately, async_bb8_diesel::AsyncConnection is implemented on both objects, so if we operate on that interface, we can interop with either the pool or a connection object in a transaction context. This pattern of impl AsyncConnection is used in a few spots in this patch, and is how I expect we could make code "generic to being run in a txn" in the future.

Unfortunately, the errors are different - we only propagate "pool-related errors" if you're actually acting on a pool - so that is made generic here too (see: ConnErr). If being generic over errors is only a pain, and not particularly useful, we could consider just propagating a PoolError everywhere (it's a superset of the other errors that could be returned from async_bb8_diesel).

) -> AsyncAttachToCollectionResult<ResourceType, C>
where
// We require this bound to ensure that "Self" is runnable as query.
Expand All @@ -349,33 +344,17 @@ where
DbConnection,
RawOutput<ResourceType, C>,
>,
ConnErr: From<diesel::result::Error> + Send + 'static,
PoolError: From<ConnErr>,
{
self.get_result_async::<RawOutput<ResourceType, C>>(pool)
self.get_result_async::<RawOutput<ResourceType, C>>(conn)
.await
// If the database returns an error, propagate it right away.
.map_err(AttachError::DatabaseError)
.map_err(|e| AttachError::DatabaseError(PoolError::from(e)))
// Otherwise, parse the output to determine if the CTE succeeded.
.and_then(Self::parse_result)
}

/// Issues the CTE synchronously and parses the result.
pub fn attach_and_get_result(
self,
conn: &mut DbConnection,
) -> SyncAttachToCollectionResult<ResourceType, C>
where
// We require this bound to ensure that "Self" is runnable as query.
Self: query_methods::LoadQuery<
'static,
DbConnection,
RawOutput<ResourceType, C>,
>,
{
self.get_result::<RawOutput<ResourceType, C>>(conn)
.map_err(AttachError::DatabaseError)
.and_then(Self::parse_result)
}

fn parse_result<E>(
result: RawOutput<ResourceType, C>,
) -> Result<(C, ResourceType), AttachError<ResourceType, C, E>> {
Expand Down Expand Up @@ -1012,16 +991,17 @@ mod test {
.set(resource::dsl::collection_id.eq(collection_id)),
);

type TxnError = TransactionError<
AttachError<Resource, Collection, diesel::result::Error>,
>;
type TxnError =
TransactionError<AttachError<Resource, Collection, PoolError>>;
let result = pool
.pool()
.transaction(move |conn| {
attach_query.attach_and_get_result(conn).map_err(|e| match e {
AttachError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
})
.transaction_async(|conn| async move {
attach_query.attach_and_get_result_async(&conn).await.map_err(
|e| match e {
AttachError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
},
)
})
.await;

Expand Down
88 changes: 6 additions & 82 deletions nexus/src/db/collection_detach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::cte_utils::{
QueryFromClause, QuerySqlType,
};
use super::pool::DbConnection;
use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError};
use async_bb8_diesel::{AsyncRunQueryDsl, PoolError};
use diesel::associations::HasTable;
use diesel::expression::{AsExpression, Expression};
use diesel::helper_types::*;
Expand Down Expand Up @@ -232,10 +232,6 @@ where
pub type AsyncDetachFromCollectionResult<ResourceType, C> =
Result<ResourceType, DetachError<ResourceType, C, PoolError>>;

/// Result of [`DetachFromCollectionStatement`] when executed synchronously
pub type SyncDetachFromCollectionResult<ResourceType, C> =
Result<ResourceType, DetachError<ResourceType, C, diesel::result::Error>>;

/// Errors returned by [`DetachFromCollectionStatement`].
#[derive(Debug)]
pub enum DetachError<ResourceType, C, E> {
Expand Down Expand Up @@ -269,7 +265,8 @@ where
/// Issues the CTE asynchronously and parses the result.
pub async fn detach_and_get_result_async(
self,
pool: &bb8::Pool<ConnectionManager<DbConnection>>,
conn: &(impl async_bb8_diesel::AsyncConnection<DbConnection, PoolError>
+ Sync),
) -> AsyncDetachFromCollectionResult<ResourceType, C>
where
// We require this bound to ensure that "Self" is runnable as query.
Expand All @@ -279,32 +276,14 @@ where
RawOutput<ResourceType, C>,
>,
{
self.get_result_async::<RawOutput<ResourceType, C>>(pool)
self.get_result_async::<RawOutput<ResourceType, C>>(conn)
.await
// If the database returns an error, propagate it right away.
.map_err(DetachError::DatabaseError)
// Otherwise, parse the output to determine if the CTE succeeded.
.and_then(Self::parse_result)
}

/// Issues the CTE synchronously and parses the result.
pub fn detach_and_get_result(
self,
conn: &mut DbConnection,
) -> SyncDetachFromCollectionResult<ResourceType, C>
where
// We require this bound to ensure that "Self" is runnable as query.
Self: query_methods::LoadQuery<
'static,
DbConnection,
RawOutput<ResourceType, C>,
>,
{
self.get_result::<RawOutput<ResourceType, C>>(conn)
.map_err(DetachError::DatabaseError)
.and_then(Self::parse_result)
}

fn parse_result<E>(
result: RawOutput<ResourceType, C>,
) -> Result<ResourceType, DetachError<ResourceType, C, E>> {
Expand Down Expand Up @@ -502,12 +481,8 @@ where
mod test {
use super::*;
use crate::db::collection_attach::DatastoreAttachTarget;
use crate::db::{
self, error::TransactionError, identity::Resource as IdentityResource,
};
use async_bb8_diesel::{
AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
};
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{AsyncRunQueryDsl, AsyncSimpleConnection};
use chrono::Utc;
use db_macros::Resource;
use diesel::expression_methods::ExpressionMethods;
Expand Down Expand Up @@ -899,57 +874,6 @@ mod test {
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_detach_once_synchronous() {
let logctx = dev::test_setup_log("test_detach_once_synchronous");
let mut db = test_setup_database(&logctx.log).await;
let cfg = db::Config { url: db.pg_config().clone() };
let pool = db::Pool::new(&cfg);

setup_db(&pool).await;

let collection_id = uuid::Uuid::new_v4();
let resource_id = uuid::Uuid::new_v4();

// Create the collection and resource.
let _collection =
insert_collection(collection_id, "collection", &pool).await;
let _resource = insert_resource(resource_id, "resource", &pool).await;
attach_resource(collection_id, resource_id, &pool).await;

// Detach the resource from the collection.
let detach_query = Collection::detach_resource(
collection_id,
resource_id,
collection::table.into_boxed(),
resource::table.into_boxed(),
diesel::update(resource::table)
.set(resource::dsl::collection_id.eq(Option::<Uuid>::None)),
);

type TxnError = TransactionError<
DetachError<Resource, Collection, diesel::result::Error>,
>;
let result = pool
.pool()
.transaction(move |conn| {
detach_query.detach_and_get_result(conn).map_err(|e| match e {
DetachError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
})
})
.await;

// "detach_and_get_result" should return the "detached" resource.
let returned_resource = result.expect("Detach should have worked");
assert!(returned_resource.collection_id.is_none());
// The returned values should be the latest value in the DB.
assert_eq!(returned_resource, get_resource(resource_id, &pool).await);

db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_detach_while_already_detached() {
let logctx = dev::test_setup_log("test_detach_while_already_detached");
Expand Down
48 changes: 17 additions & 31 deletions nexus/src/db/collection_detach_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::cte_utils::{
QueryFromClause, QuerySqlType,
};
use super::pool::DbConnection;
use async_bb8_diesel::{AsyncRunQueryDsl, ConnectionManager, PoolError};
use async_bb8_diesel::{AsyncRunQueryDsl, PoolError};
use diesel::associations::HasTable;
use diesel::expression::{AsExpression, Expression};
use diesel::helper_types::*;
Expand Down Expand Up @@ -243,10 +243,6 @@ where
pub type AsyncDetachManyFromCollectionResult<C> =
Result<C, DetachManyError<C, PoolError>>;

/// Result of [`DetachManyFromCollectionStatement`] when executed synchronously
pub type SyncDetachManyFromCollectionResult<C> =
Result<C, DetachManyError<C, diesel::result::Error>>;

/// Errors returned by [`DetachManyFromCollectionStatement`].
#[derive(Debug)]
pub enum DetachManyError<C, E> {
Expand Down Expand Up @@ -277,36 +273,25 @@ where
DetachManyFromCollectionStatement<ResourceType, VC, VR, C>: Send,
{
/// Issues the CTE asynchronously and parses the result.
pub async fn detach_and_get_result_async(
pub async fn detach_and_get_result_async<ConnErr>(
self,
pool: &bb8::Pool<ConnectionManager<DbConnection>>,
conn: &(impl async_bb8_diesel::AsyncConnection<DbConnection, ConnErr>
+ Sync),
) -> AsyncDetachManyFromCollectionResult<C>
where
// We require this bound to ensure that "Self" is runnable as query.
Self: query_methods::LoadQuery<'static, DbConnection, RawOutput<C>>,
ConnErr: From<diesel::result::Error> + Send + 'static,
PoolError: From<ConnErr>,
{
self.get_result_async::<RawOutput<C>>(pool)
self.get_result_async::<RawOutput<C>>(conn)
.await
// If the database returns an error, propagate it right away.
.map_err(DetachManyError::DatabaseError)
.map_err(|e| DetachManyError::DatabaseError(PoolError::from(e)))
// Otherwise, parse the output to determine if the CTE succeeded.
.and_then(Self::parse_result)
}

/// Issues the CTE synchronously and parses the result.
pub fn detach_and_get_result(
self,
conn: &mut DbConnection,
) -> SyncDetachManyFromCollectionResult<C>
where
// We require this bound to ensure that "Self" is runnable as query.
Self: query_methods::LoadQuery<'static, DbConnection, RawOutput<C>>,
{
self.get_result::<RawOutput<C>>(conn)
.map_err(DetachManyError::DatabaseError)
.and_then(Self::parse_result)
}

fn parse_result<E>(
result: RawOutput<C>,
) -> Result<C, DetachManyError<C, E>> {
Expand Down Expand Up @@ -926,16 +911,17 @@ mod test {
.set(resource::dsl::collection_id.eq(Option::<Uuid>::None)),
);

type TxnError = TransactionError<
DetachManyError<Collection, diesel::result::Error>,
>;
type TxnError =
TransactionError<DetachManyError<Collection, PoolError>>;
let result = pool
.pool()
.transaction(move |conn| {
detach_query.detach_and_get_result(conn).map_err(|e| match e {
DetachManyError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
})
.transaction_async(|conn| async move {
detach_query.detach_and_get_result_async(&conn).await.map_err(
|e| match e {
DetachManyError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
},
)
})
.await;

Expand Down
Loading