diff --git a/CHANGELOG.md b/CHANGELOG.md index 02da95d..779ded5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ for Rust libraries in [RFC #1105](https://github.com/rust-lang/rfcs/blob/master/ ## [Unreleased] +## [0.7.4] - 2025-11-07 + +* Fixed an issue with dropping uncached mysql statements + ## [0.7.3] - 2025-10-05 * Another docs.rs build fix diff --git a/Cargo.toml b/Cargo.toml index 6a3350e..f6809cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "diesel-async" -version = "0.7.3" +version = "0.7.4" authors = ["Georg Semmler "] edition = "2021" autotests = false diff --git a/src/mysql/mod.rs b/src/mysql/mod.rs index 9d4cc89..5972d9f 100644 --- a/src/mysql/mod.rs +++ b/src/mysql/mod.rs @@ -13,7 +13,7 @@ use diesel::result::{ConnectionError, ConnectionResult}; use diesel::QueryResult; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::stream; +use futures_core::Stream; use futures_util::{FutureExt, StreamExt, TryStreamExt}; use mysql_async::prelude::Queryable; use mysql_async::{Opts, OptsBuilder, Statement}; @@ -36,6 +36,7 @@ pub struct AsyncMysqlConnection { stmt_cache: StatementCache, transaction_manager: AnsiTransactionManager, instrumentation: DynInstrumentation, + stmt_to_free: Vec, } impl SimpleAsyncConnection for AsyncMysqlConnection { @@ -81,48 +82,7 @@ impl AsyncConnectionCore for AsyncMysqlConnection { + 'query, { self.with_prepared_statement(source.as_query(), |conn, stmt, binds| async move { - let stmt_for_exec = match stmt { - MaybeCached::Cached(ref s) => (*s).clone(), - MaybeCached::CannotCache(ref s) => s.clone(), - _ => unreachable!( - "Diesel has only two variants here at the time of writing.\n\ - If you ever see this error message please open in issue in the diesel-async issue tracker" - ), - }; - - let (tx, rx) = futures_channel::mpsc::channel(0); - - let yielder = async move { - let r = Self::poll_result_stream(conn, stmt_for_exec, binds, tx).await; - // We need to close any non-cached statement explicitly here as otherwise - // we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26 - // for details - // - // This might be problematic for cases where the stream is dropped before the end is reached - // - // Such behaviour might happen if users: - // * Just drop the future/stream after polling at least once (timeouts!!) - // * Users only fetch a fixed number of elements from the stream - // - // For now there is not really a good solution to this problem as this would require something like async drop - // (and even with async drop that would be really hard to solve due to the involved lifetimes) - if let MaybeCached::CannotCache(stmt) = stmt { - conn.close(stmt).await.map_err(ErrorHelper)?; - } - r - }; - - let fake_stream = stream::once(yielder).filter_map(|e: QueryResult<()>| async move { - if let Err(e) = e { - Some(Err(e)) - } else { - None - } - }); - - let stream = stream::select(fake_stream, rx).boxed(); - - Ok(stream) + Ok(Self::poll_result_stream(conn, stmt, binds).await?.boxed()) }) .boxed() } @@ -139,20 +99,6 @@ impl AsyncConnectionCore for AsyncMysqlConnection { self.with_prepared_statement(source, |conn, stmt, binds| async move { let params = mysql_async::Params::try_from(binds)?; conn.exec_drop(&*stmt, params).await.map_err(ErrorHelper)?; - // We need to close any non-cached statement explicitly here as otherwise - // we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26 - // for details - // - // This might be problematic for cases where the stream is dropped before the end is reached - // - // Such behaviour might happen if users: - // * Just drop the future after polling at least once (timeouts!!) - // - // For now there is not really a good solution to this problem as this would require something like async drop - // (and even with async drop that would be really hard to solve due to the involved lifetimes) - if let MaybeCached::CannotCache(stmt) = stmt { - conn.close(stmt).await.map_err(ErrorHelper)?; - } conn.affected_rows() .try_into() .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e))) @@ -244,6 +190,7 @@ impl AsyncMysqlConnection { stmt_cache: StatementCache::new(), transaction_manager: AnsiTransactionManager::default(), instrumentation: DynInstrumentation::default_instrumentation(), + stmt_to_free: Vec::new(), }; for stmt in CONNECTION_SETUP_QUERIES { @@ -290,6 +237,7 @@ impl AsyncMysqlConnection { ref mut stmt_cache, ref mut transaction_manager, ref mut instrumentation, + ref mut stmt_to_free, .. } = self; @@ -299,6 +247,19 @@ impl AsyncMysqlConnection { let query_id = T::query_id(); async move { + // We need to close any non-cached statement explicitly here as otherwise + // we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26 + // and https://github.com/weiznich/diesel_async/issues/269 for details + // + // We remember these statements from the last run as there is currenly no relaible way to + // run this as destruction step after the execution finished. Users might abort polling the future, etc + // + // The overhead for this is keeping one additional statement open until the connection is used + // next, so you would need to have `max_prepared_stmt_count - 1` other statements open for this to cause issues. + // This is hopefully not a problem in practice + for stmt in std::mem::take(stmt_to_free) { + conn.close(stmt).await.map_err(ErrorHelper)?; + } let RawBytesBindCollector { metadata, binds, .. } = bind_collector?; @@ -320,6 +281,10 @@ impl AsyncMysqlConnection { &mut **instrumentation, ) .await?; + // for any not cached statement we need to remember to close them on the next connection usage + if let MaybeCached::CannotCache(stmt) = &stmt { + stmt_to_free.push(stmt.clone()); + } callback(conn, stmt, ToSqlHelper { metadata, binds }).await }; let r = update_transaction_manager_status(inner.await, transaction_manager); @@ -332,21 +297,31 @@ impl AsyncMysqlConnection { .boxed() } - async fn poll_result_stream( - conn: &mut mysql_async::Conn, - stmt_for_exec: mysql_async::Statement, + async fn poll_result_stream<'conn>( + conn: &'conn mut mysql_async::Conn, + stmt: MaybeCached<'_, mysql_async::Statement>, binds: ToSqlHelper, - mut tx: futures_channel::mpsc::Sender>, - ) -> QueryResult<()> { - use futures_util::sink::SinkExt; + ) -> QueryResult> + Send + use<'conn>> { let params = mysql_async::Params::try_from(binds)?; + let stmt_for_exec = match stmt { + MaybeCached::Cached(ref s) => { + (*s).clone() + }, + MaybeCached::CannotCache(ref s) => { + s.clone() + }, + _ => unreachable!( + "Diesel has only two variants here at the time of writing.\n\ + If you ever see this error message please open in issue in the diesel-async issue tracker" + ), + }; let res = conn .exec_iter(stmt_for_exec, params) .await .map_err(ErrorHelper)?; - let mut stream = res + let stream = res .stream_and_drop::() .await .map_err(ErrorHelper)? @@ -357,14 +332,7 @@ impl AsyncMysqlConnection { })? .map_err(|e| diesel::result::Error::from(ErrorHelper(e))); - while let Some(row) = stream.next().await { - let row = row?; - tx.send(Ok(row)) - .await - .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))?; - } - - Ok(()) + Ok(stream) } async fn establish_connection_inner( @@ -384,6 +352,7 @@ impl AsyncMysqlConnection { stmt_cache: StatementCache::new(), transaction_manager: AnsiTransactionManager::default(), instrumentation: DynInstrumentation::none(), + stmt_to_free: Vec::new(), }) } } @@ -404,39 +373,78 @@ mod tests { } include!("../doctest_setup.rs"); + const STMT_COUNT: usize = 16382 + 1000; + + #[derive(Queryable)] + #[expect(dead_code, reason = "used for the test as loading target")] + struct User { + id: i32, + name: String, + } + #[tokio::test] - async fn check_statements_are_dropped() { + async fn check_cached_statements_are_dropped() { use self::schema::users; let mut conn = establish_connection().await; - // we cannot set a lower limit here without admin privileges - // which makes this test really slow - let stmt_count = 16382 + 10; - for i in 0..stmt_count { - diesel::insert_into(users::table) - .values(Some(users::name.eq(format!("User{i}")))) - .execute(&mut conn) + for _i in 0..STMT_COUNT { + users::table + .select(users::id) + .load::(&mut conn) .await .unwrap(); } + } - #[derive(QueryableByName)] - #[diesel(table_name = users)] - #[allow(dead_code)] - struct User { - id: i32, - name: String, - } + #[tokio::test] + async fn check_uncached_statements_are_dropped() { + use self::schema::users; + + let mut conn = establish_connection().await; - for i in 0..stmt_count { - diesel::sql_query("SELECT id, name FROM users WHERE name = ?") - .bind::(format!("User{i}")) + for _i in 0..STMT_COUNT { + users::table + .filter(users::dsl::id.eq_any(&[1, 2])) .load::(&mut conn) .await .unwrap(); } } + + #[tokio::test] + async fn check_cached_statements_are_dropped_get_result() { + use self::schema::users; + use diesel::OptionalExtension; + + let mut conn = establish_connection().await; + + for _i in 0..STMT_COUNT { + users::table + .select(users::id) + .get_result::(&mut conn) + .await + .optional() + .unwrap(); + } + } + + #[tokio::test] + async fn check_uncached_statements_are_dropped_get_result() { + use self::schema::users; + use diesel::OptionalExtension; + + let mut conn = establish_connection().await; + + for _i in 0..STMT_COUNT { + users::table + .filter(users::dsl::id.eq_any(&[1, 2])) + .get_result::(&mut conn) + .await + .optional() + .unwrap(); + } + } } impl QueryFragmentForCachedStatement for QueryFragmentHelper {