From ab1d1435ba558c2bfe466bcbabc6f0d0a6d62bba Mon Sep 17 00:00:00 2001 From: Gleb Pomykalov Date: Tue, 2 Jul 2024 23:17:40 +0200 Subject: [PATCH 1/4] feat: add begin_custom --- sqlx-core/src/acquire.rs | 22 ++++++++++++++++++++ sqlx-core/src/any/connection/backend.rs | 6 ++++++ sqlx-core/src/any/transaction.rs | 7 +++++++ sqlx-core/src/pool/connection.rs | 7 +++++++ sqlx-core/src/transaction.rs | 27 +++++++++++++++++++++++++ sqlx-mysql/src/any.rs | 5 +++++ sqlx-mysql/src/transaction.rs | 14 ++++++++++++- sqlx-postgres/src/any.rs | 5 +++++ sqlx-postgres/src/transaction.rs | 16 ++++++++++++++- sqlx-sqlite/src/any.rs | 5 +++++ sqlx-sqlite/src/transaction.rs | 7 ++++++- 11 files changed, 118 insertions(+), 3 deletions(-) diff --git a/sqlx-core/src/acquire.rs b/sqlx-core/src/acquire.rs index c9d7fb215c..51d83f4d05 100644 --- a/sqlx-core/src/acquire.rs +++ b/sqlx-core/src/acquire.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use crate::database::Database; use crate::error::Error; use crate::pool::{MaybePoolConnection, Pool, PoolConnection}; @@ -78,6 +79,8 @@ pub trait Acquire<'c> { fn acquire(self) -> BoxFuture<'c, Result>; fn begin(self) -> BoxFuture<'c, Result, Error>>; + + fn begin_custom(self, sql: Cow<'static, str>) -> BoxFuture<'c, Result, Error>>; } impl<'a, DB: Database> Acquire<'a> for &'_ Pool { @@ -96,6 +99,14 @@ impl<'a, DB: Database> Acquire<'a> for &'_ Pool { Transaction::begin(MaybePoolConnection::PoolConnection(conn.await?)).await }) } + + fn begin_custom(self, sql: Cow<'static, str>) -> BoxFuture<'a, Result, Error>> { + let conn = self.acquire(); + + Box::pin(async move { + Transaction::begin_custom(MaybePoolConnection::PoolConnection(conn.await?), sql).await + }) + } } #[macro_export] @@ -123,6 +134,17 @@ macro_rules! impl_acquire { > { $crate::transaction::Transaction::begin(self) } + + #[inline] + fn begin_custom( + self, + stmt: std::borrow::Cow<'static, str>, + ) -> futures_core::future::BoxFuture< + 'c, + Result<$crate::transaction::Transaction<'c, $DB>, $crate::error::Error>, + > { + $crate::transaction::Transaction::begin_custom(self, stmt) + } } }; } diff --git a/sqlx-core/src/any/connection/backend.rs b/sqlx-core/src/any/connection/backend.rs index 8dff4ba29f..59f15cf1a4 100644 --- a/sqlx-core/src/any/connection/backend.rs +++ b/sqlx-core/src/any/connection/backend.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use crate::any::{Any, AnyArguments, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo}; use crate::describe::Describe; use either::Either; @@ -30,6 +31,11 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static { /// Returns a [`Transaction`] for controlling and tracking the new transaction. fn begin(&mut self) -> BoxFuture<'_, crate::Result<()>>; + /// Begin a new transaction or establish a savepoint within the active transaction. + /// + /// Returns a [`Transaction`] for controlling and tracking the new transaction. + fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, crate::Result<()>>; + fn commit(&mut self) -> BoxFuture<'_, crate::Result<()>>; fn rollback(&mut self) -> BoxFuture<'_, crate::Result<()>>; diff --git a/sqlx-core/src/any/transaction.rs b/sqlx-core/src/any/transaction.rs index fce4175626..9f854cf288 100644 --- a/sqlx-core/src/any/transaction.rs +++ b/sqlx-core/src/any/transaction.rs @@ -1,6 +1,8 @@ +use std::borrow::Cow; use futures_util::future::BoxFuture; use crate::any::{Any, AnyConnection}; +use crate::database::Database; use crate::error::Error; use crate::transaction::TransactionManager; @@ -13,6 +15,11 @@ impl TransactionManager for AnyTransactionManager { conn.backend.begin() } + fn begin_custom<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + conn.backend.begin_custom(sql) + } + + fn commit(conn: &mut AnyConnection) -> BoxFuture<'_, Result<(), Error>> { conn.backend.commit() } diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index c1e163c704..c26973c222 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::fmt::{self, Debug, Formatter}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -12,6 +13,8 @@ use crate::error::Error; use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner}; use crate::pool::options::PoolConnectionMetadata; use std::future::Future; +use futures_core::future::BoxFuture; +use crate::transaction::Transaction; /// A connection managed by a [`Pool`][crate::pool::Pool]. /// @@ -159,6 +162,10 @@ impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection) -> BoxFuture<'c, Result, Error>> { + crate::transaction::Transaction::begin_custom(&mut **self, sql) + } } /// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from. diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index 0516b6adc3..afea1f549b 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -20,6 +20,11 @@ pub trait TransactionManager { conn: &mut ::Connection, ) -> BoxFuture<'_, Result<(), Error>>; + fn begin_custom<'a>( + conn: &'a mut ::Connection, + sql: Cow<'static, str>, + ) -> BoxFuture<'a, Result<(), Error>>; + /// Commit the active transaction or release the most recent savepoint. fn commit( conn: &mut ::Connection, @@ -78,6 +83,24 @@ where }) } + #[doc(hidden)] + pub fn begin_custom( + conn: impl Into>, + sql: Cow<'static, str> + ) -> BoxFuture<'c, Result> { + let mut conn = conn.into(); + + Box::pin(async move { + DB::TransactionManager::begin_custom(&mut conn, sql).await?; + + Ok(Self { + connection: conn, + open: true, + }) + }) + } + + /// Commits this transaction or savepoint. pub async fn commit(mut self) -> Result<(), Error> { DB::TransactionManager::commit(&mut self.connection).await?; @@ -221,6 +244,10 @@ impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<' fn begin(self) -> BoxFuture<'t, Result, Error>> { Transaction::begin(&mut **self) } + + fn begin_custom(self, sql: Cow<'static, str>) -> BoxFuture<'t, Result, Error>> { + Transaction::begin_custom(&mut **self, sql) + } } impl<'c, DB> Drop for Transaction<'c, DB> diff --git a/sqlx-mysql/src/any.rs b/sqlx-mysql/src/any.rs index 404c408a9e..933a07ae0a 100644 --- a/sqlx-mysql/src/any.rs +++ b/sqlx-mysql/src/any.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use crate::protocol::text::ColumnType; use crate::{ MySql, MySqlColumn, MySqlConnectOptions, MySqlConnection, MySqlQueryResult, MySqlRow, @@ -40,6 +41,10 @@ impl AnyConnectionBackend for MySqlConnection { MySqlTransactionManager::begin(self) } + fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { + MySqlTransactionManager::begin_custom(self, sql) + } + fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { MySqlTransactionManager::commit(self) } diff --git a/sqlx-mysql/src/transaction.rs b/sqlx-mysql/src/transaction.rs index 731bdb5750..ede11b8315 100644 --- a/sqlx-mysql/src/transaction.rs +++ b/sqlx-mysql/src/transaction.rs @@ -1,5 +1,6 @@ +use std::borrow::Cow; use futures_core::future::BoxFuture; - +use sqlx_core::database::Database; use crate::connection::Waiting; use crate::error::Error; use crate::executor::Executor; @@ -25,6 +26,17 @@ impl TransactionManager for MySqlTransactionManager { }) } + fn begin_custom<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + Box::pin(async move { + let depth = conn.transaction_depth; + + conn.execute(&*sql).await?; + conn.transaction_depth = depth + 1; + + Ok(()) + }) + } + fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { let depth = conn.transaction_depth; diff --git a/sqlx-postgres/src/any.rs b/sqlx-postgres/src/any.rs index 542e7f007b..80a738f7e9 100644 --- a/sqlx-postgres/src/any.rs +++ b/sqlx-postgres/src/any.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use crate::{ Either, PgColumn, PgConnectOptions, PgConnection, PgQueryResult, PgRow, PgTransactionManager, PgTypeInfo, Postgres, @@ -39,6 +40,10 @@ impl AnyConnectionBackend for PgConnection { PgTransactionManager::begin(self) } + fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { + PgTransactionManager::begin_custom(self, sql) + } + fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { PgTransactionManager::commit(self) } diff --git a/sqlx-postgres/src/transaction.rs b/sqlx-postgres/src/transaction.rs index 02028624e1..b8cb140ed7 100644 --- a/sqlx-postgres/src/transaction.rs +++ b/sqlx-postgres/src/transaction.rs @@ -1,5 +1,6 @@ +use std::borrow::Cow; use futures_core::future::BoxFuture; - +use sqlx_core::database::Database; use crate::error::Error; use crate::executor::Executor; @@ -26,6 +27,19 @@ impl TransactionManager for PgTransactionManager { }) } + fn begin_custom<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + Box::pin(async move { + let rollback = Rollback::new(conn); + rollback.conn.queue_simple_query(&sql); + rollback.conn.transaction_depth += 1; + rollback.conn.wait_until_ready().await?; + rollback.defuse(); + + Ok(()) + }) + } + + fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { if conn.transaction_depth > 0 { diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index dc84f66598..590a43e9ee 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use crate::{ Either, Sqlite, SqliteArgumentValue, SqliteArguments, SqliteColumn, SqliteConnectOptions, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteTransactionManager, SqliteTypeInfo, @@ -41,6 +42,10 @@ impl AnyConnectionBackend for SqliteConnection { SqliteTransactionManager::begin(self) } + fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { + todo!() + } + fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { SqliteTransactionManager::commit(self) } diff --git a/sqlx-sqlite/src/transaction.rs b/sqlx-sqlite/src/transaction.rs index 24eaca51b1..ffebd09096 100644 --- a/sqlx-sqlite/src/transaction.rs +++ b/sqlx-sqlite/src/transaction.rs @@ -1,5 +1,6 @@ +use std::borrow::Cow; use futures_core::future::BoxFuture; - +use sqlx_core::database::Database; use crate::{Sqlite, SqliteConnection}; use sqlx_core::error::Error; use sqlx_core::transaction::TransactionManager; @@ -14,6 +15,10 @@ impl TransactionManager for SqliteTransactionManager { Box::pin(conn.worker.begin()) } + fn begin_custom<'a>(_conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + unimplemented!() + } + fn commit(conn: &mut SqliteConnection) -> BoxFuture<'_, Result<(), Error>> { Box::pin(conn.worker.commit()) } From 44e53bdc7c157780537932816f1098d858650bc1 Mon Sep 17 00:00:00 2001 From: Gleb Pomykalov Date: Tue, 9 Jul 2024 20:42:31 +0200 Subject: [PATCH 2/4] begin_custom -> begin_with --- sqlx-core/src/acquire.rs | 10 +++++----- sqlx-core/src/any/connection/backend.rs | 2 +- sqlx-core/src/any/transaction.rs | 4 ++-- sqlx-core/src/pool/connection.rs | 4 ++-- sqlx-core/src/transaction.rs | 10 +++++----- sqlx-mysql/src/any.rs | 4 ++-- sqlx-mysql/src/transaction.rs | 2 +- sqlx-postgres/src/any.rs | 4 ++-- sqlx-postgres/src/transaction.rs | 2 +- sqlx-sqlite/src/any.rs | 2 +- sqlx-sqlite/src/transaction.rs | 2 +- 11 files changed, 23 insertions(+), 23 deletions(-) diff --git a/sqlx-core/src/acquire.rs b/sqlx-core/src/acquire.rs index 51d83f4d05..dc71c29e4b 100644 --- a/sqlx-core/src/acquire.rs +++ b/sqlx-core/src/acquire.rs @@ -80,7 +80,7 @@ pub trait Acquire<'c> { fn begin(self) -> BoxFuture<'c, Result, Error>>; - fn begin_custom(self, sql: Cow<'static, str>) -> BoxFuture<'c, Result, Error>>; + fn begin_with(self, sql: Cow<'static, str>) -> BoxFuture<'c, Result, Error>>; } impl<'a, DB: Database> Acquire<'a> for &'_ Pool { @@ -100,11 +100,11 @@ impl<'a, DB: Database> Acquire<'a> for &'_ Pool { }) } - fn begin_custom(self, sql: Cow<'static, str>) -> BoxFuture<'a, Result, Error>> { + fn begin_with(self, sql: Cow<'static, str>) -> BoxFuture<'a, Result, Error>> { let conn = self.acquire(); Box::pin(async move { - Transaction::begin_custom(MaybePoolConnection::PoolConnection(conn.await?), sql).await + Transaction::begin_with(MaybePoolConnection::PoolConnection(conn.await?), sql).await }) } } @@ -136,14 +136,14 @@ macro_rules! impl_acquire { } #[inline] - fn begin_custom( + fn begin_with( self, stmt: std::borrow::Cow<'static, str>, ) -> futures_core::future::BoxFuture< 'c, Result<$crate::transaction::Transaction<'c, $DB>, $crate::error::Error>, > { - $crate::transaction::Transaction::begin_custom(self, stmt) + $crate::transaction::Transaction::begin_with(self, stmt) } } }; diff --git a/sqlx-core/src/any/connection/backend.rs b/sqlx-core/src/any/connection/backend.rs index 59f15cf1a4..0410fd262a 100644 --- a/sqlx-core/src/any/connection/backend.rs +++ b/sqlx-core/src/any/connection/backend.rs @@ -34,7 +34,7 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static { /// Begin a new transaction or establish a savepoint within the active transaction. /// /// Returns a [`Transaction`] for controlling and tracking the new transaction. - fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, crate::Result<()>>; + fn begin_with(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, crate::Result<()>>; fn commit(&mut self) -> BoxFuture<'_, crate::Result<()>>; diff --git a/sqlx-core/src/any/transaction.rs b/sqlx-core/src/any/transaction.rs index 9f854cf288..90c0c337ac 100644 --- a/sqlx-core/src/any/transaction.rs +++ b/sqlx-core/src/any/transaction.rs @@ -15,8 +15,8 @@ impl TransactionManager for AnyTransactionManager { conn.backend.begin() } - fn begin_custom<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { - conn.backend.begin_custom(sql) + fn begin_with<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + conn.backend.begin_with(sql) } diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index c26973c222..4bc929f632 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -163,8 +163,8 @@ impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection) -> BoxFuture<'c, Result, Error>> { - crate::transaction::Transaction::begin_custom(&mut **self, sql) + fn begin_with(self, sql: Cow<'static, str>) -> BoxFuture<'c, Result, Error>> { + crate::transaction::Transaction::begin_with(&mut **self, sql) } } diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index afea1f549b..625b8366d9 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -20,7 +20,7 @@ pub trait TransactionManager { conn: &mut ::Connection, ) -> BoxFuture<'_, Result<(), Error>>; - fn begin_custom<'a>( + fn begin_with<'a>( conn: &'a mut ::Connection, sql: Cow<'static, str>, ) -> BoxFuture<'a, Result<(), Error>>; @@ -84,14 +84,14 @@ where } #[doc(hidden)] - pub fn begin_custom( + pub fn begin_with( conn: impl Into>, sql: Cow<'static, str> ) -> BoxFuture<'c, Result> { let mut conn = conn.into(); Box::pin(async move { - DB::TransactionManager::begin_custom(&mut conn, sql).await?; + DB::TransactionManager::begin_with(&mut conn, sql).await?; Ok(Self { connection: conn, @@ -245,8 +245,8 @@ impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<' Transaction::begin(&mut **self) } - fn begin_custom(self, sql: Cow<'static, str>) -> BoxFuture<'t, Result, Error>> { - Transaction::begin_custom(&mut **self, sql) + fn begin_with(self, sql: Cow<'static, str>) -> BoxFuture<'t, Result, Error>> { + Transaction::begin_with(&mut **self, sql) } } diff --git a/sqlx-mysql/src/any.rs b/sqlx-mysql/src/any.rs index 933a07ae0a..fed8969e3e 100644 --- a/sqlx-mysql/src/any.rs +++ b/sqlx-mysql/src/any.rs @@ -41,8 +41,8 @@ impl AnyConnectionBackend for MySqlConnection { MySqlTransactionManager::begin(self) } - fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { - MySqlTransactionManager::begin_custom(self, sql) + fn begin_with(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { + MySqlTransactionManager::begin_with(self, sql) } fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { diff --git a/sqlx-mysql/src/transaction.rs b/sqlx-mysql/src/transaction.rs index ede11b8315..e19636406c 100644 --- a/sqlx-mysql/src/transaction.rs +++ b/sqlx-mysql/src/transaction.rs @@ -26,7 +26,7 @@ impl TransactionManager for MySqlTransactionManager { }) } - fn begin_custom<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + fn begin_with<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { Box::pin(async move { let depth = conn.transaction_depth; diff --git a/sqlx-postgres/src/any.rs b/sqlx-postgres/src/any.rs index 80a738f7e9..aac3d66176 100644 --- a/sqlx-postgres/src/any.rs +++ b/sqlx-postgres/src/any.rs @@ -40,8 +40,8 @@ impl AnyConnectionBackend for PgConnection { PgTransactionManager::begin(self) } - fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { - PgTransactionManager::begin_custom(self, sql) + fn begin_with(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { + PgTransactionManager::begin_with(self, sql) } fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { diff --git a/sqlx-postgres/src/transaction.rs b/sqlx-postgres/src/transaction.rs index b8cb140ed7..ebcbb976d2 100644 --- a/sqlx-postgres/src/transaction.rs +++ b/sqlx-postgres/src/transaction.rs @@ -27,7 +27,7 @@ impl TransactionManager for PgTransactionManager { }) } - fn begin_custom<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + fn begin_with<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { Box::pin(async move { let rollback = Rollback::new(conn); rollback.conn.queue_simple_query(&sql); diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 590a43e9ee..a79df52d3d 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -42,7 +42,7 @@ impl AnyConnectionBackend for SqliteConnection { SqliteTransactionManager::begin(self) } - fn begin_custom(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { + fn begin_with(&mut self, sql: Cow<'static, str>) -> BoxFuture<'_, sqlx_core::Result<()>> { todo!() } diff --git a/sqlx-sqlite/src/transaction.rs b/sqlx-sqlite/src/transaction.rs index ffebd09096..7eab1d1554 100644 --- a/sqlx-sqlite/src/transaction.rs +++ b/sqlx-sqlite/src/transaction.rs @@ -15,7 +15,7 @@ impl TransactionManager for SqliteTransactionManager { Box::pin(conn.worker.begin()) } - fn begin_custom<'a>(_conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + fn begin_with<'a>(_conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { unimplemented!() } From 019f194cd23e4b9773e35bf479412a4f1acf9115 Mon Sep 17 00:00:00 2001 From: Gleb Pomykalov Date: Tue, 9 Jul 2024 20:58:00 +0200 Subject: [PATCH 3/4] wip --- sqlx-core/src/acquire.rs | 20 ++++++++++++++------ sqlx-core/src/any/transaction.rs | 13 +++++++++---- sqlx-core/src/pool/connection.rs | 9 ++++++--- sqlx-core/src/transaction.rs | 23 +++++++++++++++-------- sqlx-mysql/src/transaction.rs | 16 +++++++++++----- sqlx-postgres/src/transaction.rs | 17 +++++++++++------ sqlx-sqlite/src/transaction.rs | 12 +++++++++--- 7 files changed, 75 insertions(+), 35 deletions(-) diff --git a/sqlx-core/src/acquire.rs b/sqlx-core/src/acquire.rs index dc71c29e4b..59bfd822f2 100644 --- a/sqlx-core/src/acquire.rs +++ b/sqlx-core/src/acquire.rs @@ -1,7 +1,7 @@ -use std::borrow::Cow; use crate::database::Database; use crate::error::Error; use crate::pool::{MaybePoolConnection, Pool, PoolConnection}; +use std::borrow::Cow; use crate::transaction::Transaction; use futures_core::future::BoxFuture; @@ -80,7 +80,9 @@ pub trait Acquire<'c> { fn begin(self) -> BoxFuture<'c, Result, Error>>; - fn begin_with(self, sql: Cow<'static, str>) -> BoxFuture<'c, Result, Error>>; + fn begin_with(self, sql: S) -> BoxFuture<'c, Result, Error>> + where + S: Into> + Send + 'c; } impl<'a, DB: Database> Acquire<'a> for &'_ Pool { @@ -100,7 +102,10 @@ impl<'a, DB: Database> Acquire<'a> for &'_ Pool { }) } - fn begin_with(self, sql: Cow<'static, str>) -> BoxFuture<'a, Result, Error>> { + fn begin_with(self, sql: S) -> BoxFuture<'a, Result, Error>> + where + S: Into> + Send + 'a, + { let conn = self.acquire(); Box::pin(async move { @@ -136,13 +141,16 @@ macro_rules! impl_acquire { } #[inline] - fn begin_with( + fn begin_with( self, - stmt: std::borrow::Cow<'static, str>, + stmt: S, ) -> futures_core::future::BoxFuture< 'c, Result<$crate::transaction::Transaction<'c, $DB>, $crate::error::Error>, - > { + > + where + S: Into> + Send + 'c, + { $crate::transaction::Transaction::begin_with(self, stmt) } } diff --git a/sqlx-core/src/any/transaction.rs b/sqlx-core/src/any/transaction.rs index 90c0c337ac..e6dc75dfd2 100644 --- a/sqlx-core/src/any/transaction.rs +++ b/sqlx-core/src/any/transaction.rs @@ -1,5 +1,5 @@ -use std::borrow::Cow; use futures_util::future::BoxFuture; +use std::borrow::Cow; use crate::any::{Any, AnyConnection}; use crate::database::Database; @@ -15,11 +15,16 @@ impl TransactionManager for AnyTransactionManager { conn.backend.begin() } - fn begin_with<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { - conn.backend.begin_with(sql) + fn begin_with<'a, S>( + conn: &'a mut ::Connection, + sql: S, + ) -> BoxFuture<'a, Result<(), Error>> + where + S: Into> + Send + 'a, + { + conn.backend.begin_with(sql.into()) } - fn commit(conn: &mut AnyConnection) -> BoxFuture<'_, Result<(), Error>> { conn.backend.commit() } diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 4bc929f632..e55a952a9f 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -12,9 +12,9 @@ use crate::error::Error; use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner}; use crate::pool::options::PoolConnectionMetadata; -use std::future::Future; -use futures_core::future::BoxFuture; use crate::transaction::Transaction; +use futures_core::future::BoxFuture; +use std::future::Future; /// A connection managed by a [`Pool`][crate::pool::Pool]. /// @@ -163,7 +163,10 @@ impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection) -> BoxFuture<'c, Result, Error>> { + fn begin_with(self, sql: S) -> BoxFuture<'c, Result, Error>> + where + S: Into> + Send + 'c, + { crate::transaction::Transaction::begin_with(&mut **self, sql) } } diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index 625b8366d9..69da21a589 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -20,10 +20,12 @@ pub trait TransactionManager { conn: &mut ::Connection, ) -> BoxFuture<'_, Result<(), Error>>; - fn begin_with<'a>( + fn begin_with<'a, S>( conn: &'a mut ::Connection, - sql: Cow<'static, str>, - ) -> BoxFuture<'a, Result<(), Error>>; + sql: S, + ) -> BoxFuture<'a, Result<(), Error>> + where + S: Into> + Send + 'a; /// Commit the active transaction or release the most recent savepoint. fn commit( @@ -84,10 +86,13 @@ where } #[doc(hidden)] - pub fn begin_with( + pub fn begin_with( conn: impl Into>, - sql: Cow<'static, str> - ) -> BoxFuture<'c, Result> { + sql: S, + ) -> BoxFuture<'c, Result> + where + S: Into> + Send + 'c, + { let mut conn = conn.into(); Box::pin(async move { @@ -100,7 +105,6 @@ where }) } - /// Commits this transaction or savepoint. pub async fn commit(mut self) -> Result<(), Error> { DB::TransactionManager::commit(&mut self.connection).await?; @@ -245,7 +249,10 @@ impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<' Transaction::begin(&mut **self) } - fn begin_with(self, sql: Cow<'static, str>) -> BoxFuture<'t, Result, Error>> { + fn begin_with(self, sql: S) -> BoxFuture<'t, Result, Error>> + where + S: Into> + Send + 't, + { Transaction::begin_with(&mut **self, sql) } } diff --git a/sqlx-mysql/src/transaction.rs b/sqlx-mysql/src/transaction.rs index e19636406c..f2d9892490 100644 --- a/sqlx-mysql/src/transaction.rs +++ b/sqlx-mysql/src/transaction.rs @@ -1,11 +1,11 @@ -use std::borrow::Cow; -use futures_core::future::BoxFuture; -use sqlx_core::database::Database; use crate::connection::Waiting; use crate::error::Error; use crate::executor::Executor; use crate::protocol::text::Query; use crate::{MySql, MySqlConnection}; +use futures_core::future::BoxFuture; +use sqlx_core::database::Database; +use std::borrow::Cow; pub(crate) use sqlx_core::transaction::*; @@ -26,11 +26,17 @@ impl TransactionManager for MySqlTransactionManager { }) } - fn begin_with<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + fn begin_with<'a, S>( + conn: &'a mut ::Connection, + sql: S, + ) -> BoxFuture<'a, Result<(), Error>> + where + S: Into> + Send + 'a, + { Box::pin(async move { let depth = conn.transaction_depth; - conn.execute(&*sql).await?; + conn.execute(&*sql.into()).await?; conn.transaction_depth = depth + 1; Ok(()) diff --git a/sqlx-postgres/src/transaction.rs b/sqlx-postgres/src/transaction.rs index ebcbb976d2..b6d4d4f29c 100644 --- a/sqlx-postgres/src/transaction.rs +++ b/sqlx-postgres/src/transaction.rs @@ -1,8 +1,8 @@ -use std::borrow::Cow; -use futures_core::future::BoxFuture; -use sqlx_core::database::Database; use crate::error::Error; use crate::executor::Executor; +use futures_core::future::BoxFuture; +use sqlx_core::database::Database; +use std::borrow::Cow; use crate::{PgConnection, Postgres}; @@ -27,10 +27,16 @@ impl TransactionManager for PgTransactionManager { }) } - fn begin_with<'a>(conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + fn begin_with<'a, S>( + conn: &'a mut ::Connection, + sql: S, + ) -> BoxFuture<'a, Result<(), Error>> + where + S: Into> + Send + 'a, + { Box::pin(async move { let rollback = Rollback::new(conn); - rollback.conn.queue_simple_query(&sql); + rollback.conn.queue_simple_query(&sql.into()); rollback.conn.transaction_depth += 1; rollback.conn.wait_until_ready().await?; rollback.defuse(); @@ -39,7 +45,6 @@ impl TransactionManager for PgTransactionManager { }) } - fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { if conn.transaction_depth > 0 { diff --git a/sqlx-sqlite/src/transaction.rs b/sqlx-sqlite/src/transaction.rs index 7eab1d1554..28f24aafd4 100644 --- a/sqlx-sqlite/src/transaction.rs +++ b/sqlx-sqlite/src/transaction.rs @@ -1,9 +1,9 @@ -use std::borrow::Cow; +use crate::{Sqlite, SqliteConnection}; use futures_core::future::BoxFuture; use sqlx_core::database::Database; -use crate::{Sqlite, SqliteConnection}; use sqlx_core::error::Error; use sqlx_core::transaction::TransactionManager; +use std::borrow::Cow; /// Implementation of [`TransactionManager`] for SQLite. pub struct SqliteTransactionManager; @@ -15,7 +15,13 @@ impl TransactionManager for SqliteTransactionManager { Box::pin(conn.worker.begin()) } - fn begin_with<'a>(_conn: &'a mut ::Connection, sql: Cow<'static, str>) -> BoxFuture<'a, Result<(), Error>> { + fn begin_with<'a, S>( + _conn: &'a mut ::Connection, + sql: S, + ) -> BoxFuture<'a, Result<(), Error>> + where + S: Into> + Send + 'a, + { unimplemented!() } From d47f13c6c993e97f966b40ef1cb185290171a6c8 Mon Sep 17 00:00:00 2001 From: Gleb Pomykalov Date: Tue, 9 Jul 2024 20:58:10 +0200 Subject: [PATCH 4/4] wip --- sqlx-core/src/any/connection/backend.rs | 2 +- sqlx-mysql/src/any.rs | 2 +- sqlx-postgres/src/any.rs | 2 +- sqlx-sqlite/src/any.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sqlx-core/src/any/connection/backend.rs b/sqlx-core/src/any/connection/backend.rs index 0410fd262a..5ae70158bd 100644 --- a/sqlx-core/src/any/connection/backend.rs +++ b/sqlx-core/src/any/connection/backend.rs @@ -1,9 +1,9 @@ -use std::borrow::Cow; use crate::any::{Any, AnyArguments, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo}; use crate::describe::Describe; use either::Either; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; +use std::borrow::Cow; use std::fmt::Debug; pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static { diff --git a/sqlx-mysql/src/any.rs b/sqlx-mysql/src/any.rs index fed8969e3e..4318c0bee0 100644 --- a/sqlx-mysql/src/any.rs +++ b/sqlx-mysql/src/any.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use crate::protocol::text::ColumnType; use crate::{ MySql, MySqlColumn, MySqlConnectOptions, MySqlConnection, MySqlQueryResult, MySqlRow, @@ -17,6 +16,7 @@ use sqlx_core::database::Database; use sqlx_core::describe::Describe; use sqlx_core::executor::Executor; use sqlx_core::transaction::TransactionManager; +use std::borrow::Cow; sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql); diff --git a/sqlx-postgres/src/any.rs b/sqlx-postgres/src/any.rs index aac3d66176..abad18a064 100644 --- a/sqlx-postgres/src/any.rs +++ b/sqlx-postgres/src/any.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use crate::{ Either, PgColumn, PgConnectOptions, PgConnection, PgQueryResult, PgRow, PgTransactionManager, PgTypeInfo, Postgres, @@ -6,6 +5,7 @@ use crate::{ use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_util::{StreamExt, TryFutureExt, TryStreamExt}; +use std::borrow::Cow; pub use sqlx_core::any::*; diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index a79df52d3d..c52f865a6a 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use crate::{ Either, Sqlite, SqliteArgumentValue, SqliteArguments, SqliteColumn, SqliteConnectOptions, SqliteConnection, SqliteQueryResult, SqliteRow, SqliteTransactionManager, SqliteTypeInfo, @@ -6,6 +5,7 @@ use crate::{ use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_util::{StreamExt, TryFutureExt, TryStreamExt}; +use std::borrow::Cow; use sqlx_core::any::{ Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,