From 65fab7ec2f95071b3d5e64f339448ea0c31ed1d7 Mon Sep 17 00:00:00 2001 From: Georg Semmler Date: Fri, 7 Nov 2025 09:49:24 +0100 Subject: [PATCH] Keep track of mysql statements to close This change refactors how we close uncached mysql statements. Instead of trying to issue the close request as part of the same SQL query, we now just keep track of whether a statement need to be closed or not and execute the closing as first operation in the next query execution. The large advantage of this approach is that we sidestep any async drop/cancelation related problems. The disadvantage is that we keep that statement open for a bit longer. I cannot see how we ever would have more than one statement in there per connection and we also keep the cached statements around, so this shouldn't cause any problems in practice. Fix #269 --- CHANGELOG.md | 4 + Cargo.toml | 2 +- src/mysql/mod.rs | 188 ++++++++++++++++++++++++----------------------- 3 files changed, 103 insertions(+), 91 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02da95d..779ded5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ for Rust libraries in [RFC #1105](https://github.com/rust-lang/rfcs/blob/master/ ## [Unreleased] +## [0.7.4] - 2025-11-07 + +* Fixed an issue with dropping uncached mysql statements + ## [0.7.3] - 2025-10-05 * Another docs.rs build fix diff --git a/Cargo.toml b/Cargo.toml index 6a3350e..f6809cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "diesel-async" -version = "0.7.3" +version = "0.7.4" authors = ["Georg Semmler "] edition = "2021" autotests = false diff --git a/src/mysql/mod.rs b/src/mysql/mod.rs index 9d4cc89..5972d9f 100644 --- a/src/mysql/mod.rs +++ b/src/mysql/mod.rs @@ -13,7 +13,7 @@ use diesel::result::{ConnectionError, ConnectionResult}; use diesel::QueryResult; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::stream; +use futures_core::Stream; use futures_util::{FutureExt, StreamExt, TryStreamExt}; use mysql_async::prelude::Queryable; use mysql_async::{Opts, OptsBuilder, Statement}; @@ -36,6 +36,7 @@ pub struct AsyncMysqlConnection { stmt_cache: StatementCache, transaction_manager: AnsiTransactionManager, instrumentation: DynInstrumentation, + stmt_to_free: Vec, } impl SimpleAsyncConnection for AsyncMysqlConnection { @@ -81,48 +82,7 @@ impl AsyncConnectionCore for AsyncMysqlConnection { + 'query, { self.with_prepared_statement(source.as_query(), |conn, stmt, binds| async move { - let stmt_for_exec = match stmt { - MaybeCached::Cached(ref s) => (*s).clone(), - MaybeCached::CannotCache(ref s) => s.clone(), - _ => unreachable!( - "Diesel has only two variants here at the time of writing.\n\ - If you ever see this error message please open in issue in the diesel-async issue tracker" - ), - }; - - let (tx, rx) = futures_channel::mpsc::channel(0); - - let yielder = async move { - let r = Self::poll_result_stream(conn, stmt_for_exec, binds, tx).await; - // We need to close any non-cached statement explicitly here as otherwise - // we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26 - // for details - // - // This might be problematic for cases where the stream is dropped before the end is reached - // - // Such behaviour might happen if users: - // * Just drop the future/stream after polling at least once (timeouts!!) - // * Users only fetch a fixed number of elements from the stream - // - // For now there is not really a good solution to this problem as this would require something like async drop - // (and even with async drop that would be really hard to solve due to the involved lifetimes) - if let MaybeCached::CannotCache(stmt) = stmt { - conn.close(stmt).await.map_err(ErrorHelper)?; - } - r - }; - - let fake_stream = stream::once(yielder).filter_map(|e: QueryResult<()>| async move { - if let Err(e) = e { - Some(Err(e)) - } else { - None - } - }); - - let stream = stream::select(fake_stream, rx).boxed(); - - Ok(stream) + Ok(Self::poll_result_stream(conn, stmt, binds).await?.boxed()) }) .boxed() } @@ -139,20 +99,6 @@ impl AsyncConnectionCore for AsyncMysqlConnection { self.with_prepared_statement(source, |conn, stmt, binds| async move { let params = mysql_async::Params::try_from(binds)?; conn.exec_drop(&*stmt, params).await.map_err(ErrorHelper)?; - // We need to close any non-cached statement explicitly here as otherwise - // we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26 - // for details - // - // This might be problematic for cases where the stream is dropped before the end is reached - // - // Such behaviour might happen if users: - // * Just drop the future after polling at least once (timeouts!!) - // - // For now there is not really a good solution to this problem as this would require something like async drop - // (and even with async drop that would be really hard to solve due to the involved lifetimes) - if let MaybeCached::CannotCache(stmt) = stmt { - conn.close(stmt).await.map_err(ErrorHelper)?; - } conn.affected_rows() .try_into() .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e))) @@ -244,6 +190,7 @@ impl AsyncMysqlConnection { stmt_cache: StatementCache::new(), transaction_manager: AnsiTransactionManager::default(), instrumentation: DynInstrumentation::default_instrumentation(), + stmt_to_free: Vec::new(), }; for stmt in CONNECTION_SETUP_QUERIES { @@ -290,6 +237,7 @@ impl AsyncMysqlConnection { ref mut stmt_cache, ref mut transaction_manager, ref mut instrumentation, + ref mut stmt_to_free, .. } = self; @@ -299,6 +247,19 @@ impl AsyncMysqlConnection { let query_id = T::query_id(); async move { + // We need to close any non-cached statement explicitly here as otherwise + // we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26 + // and https://github.com/weiznich/diesel_async/issues/269 for details + // + // We remember these statements from the last run as there is currenly no relaible way to + // run this as destruction step after the execution finished. Users might abort polling the future, etc + // + // The overhead for this is keeping one additional statement open until the connection is used + // next, so you would need to have `max_prepared_stmt_count - 1` other statements open for this to cause issues. + // This is hopefully not a problem in practice + for stmt in std::mem::take(stmt_to_free) { + conn.close(stmt).await.map_err(ErrorHelper)?; + } let RawBytesBindCollector { metadata, binds, .. } = bind_collector?; @@ -320,6 +281,10 @@ impl AsyncMysqlConnection { &mut **instrumentation, ) .await?; + // for any not cached statement we need to remember to close them on the next connection usage + if let MaybeCached::CannotCache(stmt) = &stmt { + stmt_to_free.push(stmt.clone()); + } callback(conn, stmt, ToSqlHelper { metadata, binds }).await }; let r = update_transaction_manager_status(inner.await, transaction_manager); @@ -332,21 +297,31 @@ impl AsyncMysqlConnection { .boxed() } - async fn poll_result_stream( - conn: &mut mysql_async::Conn, - stmt_for_exec: mysql_async::Statement, + async fn poll_result_stream<'conn>( + conn: &'conn mut mysql_async::Conn, + stmt: MaybeCached<'_, mysql_async::Statement>, binds: ToSqlHelper, - mut tx: futures_channel::mpsc::Sender>, - ) -> QueryResult<()> { - use futures_util::sink::SinkExt; + ) -> QueryResult> + Send + use<'conn>> { let params = mysql_async::Params::try_from(binds)?; + let stmt_for_exec = match stmt { + MaybeCached::Cached(ref s) => { + (*s).clone() + }, + MaybeCached::CannotCache(ref s) => { + s.clone() + }, + _ => unreachable!( + "Diesel has only two variants here at the time of writing.\n\ + If you ever see this error message please open in issue in the diesel-async issue tracker" + ), + }; let res = conn .exec_iter(stmt_for_exec, params) .await .map_err(ErrorHelper)?; - let mut stream = res + let stream = res .stream_and_drop::() .await .map_err(ErrorHelper)? @@ -357,14 +332,7 @@ impl AsyncMysqlConnection { })? .map_err(|e| diesel::result::Error::from(ErrorHelper(e))); - while let Some(row) = stream.next().await { - let row = row?; - tx.send(Ok(row)) - .await - .map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))?; - } - - Ok(()) + Ok(stream) } async fn establish_connection_inner( @@ -384,6 +352,7 @@ impl AsyncMysqlConnection { stmt_cache: StatementCache::new(), transaction_manager: AnsiTransactionManager::default(), instrumentation: DynInstrumentation::none(), + stmt_to_free: Vec::new(), }) } } @@ -404,39 +373,78 @@ mod tests { } include!("../doctest_setup.rs"); + const STMT_COUNT: usize = 16382 + 1000; + + #[derive(Queryable)] + #[expect(dead_code, reason = "used for the test as loading target")] + struct User { + id: i32, + name: String, + } + #[tokio::test] - async fn check_statements_are_dropped() { + async fn check_cached_statements_are_dropped() { use self::schema::users; let mut conn = establish_connection().await; - // we cannot set a lower limit here without admin privileges - // which makes this test really slow - let stmt_count = 16382 + 10; - for i in 0..stmt_count { - diesel::insert_into(users::table) - .values(Some(users::name.eq(format!("User{i}")))) - .execute(&mut conn) + for _i in 0..STMT_COUNT { + users::table + .select(users::id) + .load::(&mut conn) .await .unwrap(); } + } - #[derive(QueryableByName)] - #[diesel(table_name = users)] - #[allow(dead_code)] - struct User { - id: i32, - name: String, - } + #[tokio::test] + async fn check_uncached_statements_are_dropped() { + use self::schema::users; + + let mut conn = establish_connection().await; - for i in 0..stmt_count { - diesel::sql_query("SELECT id, name FROM users WHERE name = ?") - .bind::(format!("User{i}")) + for _i in 0..STMT_COUNT { + users::table + .filter(users::dsl::id.eq_any(&[1, 2])) .load::(&mut conn) .await .unwrap(); } } + + #[tokio::test] + async fn check_cached_statements_are_dropped_get_result() { + use self::schema::users; + use diesel::OptionalExtension; + + let mut conn = establish_connection().await; + + for _i in 0..STMT_COUNT { + users::table + .select(users::id) + .get_result::(&mut conn) + .await + .optional() + .unwrap(); + } + } + + #[tokio::test] + async fn check_uncached_statements_are_dropped_get_result() { + use self::schema::users; + use diesel::OptionalExtension; + + let mut conn = establish_connection().await; + + for _i in 0..STMT_COUNT { + users::table + .filter(users::dsl::id.eq_any(&[1, 2])) + .get_result::(&mut conn) + .await + .optional() + .unwrap(); + } + } } impl QueryFragmentForCachedStatement for QueryFragmentHelper {