Description
Setup
Versions
- Rust: rustc 1.91.0 (f8297e351 2025-10-28)
- Diesel: 2.3.3
- Diesel_async: 0.7.3
- Database: MySQL
- Operating System: Linux
Feature Flags
- diesel: ["chrono", "serde_json"]
- diesel_async: ["mysql"]
Problem Description
When a query cannot be stored in the prepared statement cache (e.g. anything that uses .eq_any or raw SQL), the library correctly marks the statement as MaybeCached::CannotCache. However, when the query is executed with .get_result() or .first(), the statement is never closed on the underlying MySQL connection. The statement is closed correctly when the query is executed with .get_results() or .load().

There is a large comment in the relevant code section noting that this kind of bug may be possible (the future is dropped when only a fixed number of rows is read from the stream). In all of my testing, though, it is 100% reproducible on every call to .first(). That method is heavily used in our codebase, and most likely in every codebase that uses this library, so I believe this is an actual bug in the library that needs to be addressed in some way. One option might be to change the RunQueryDsl implementation of .get_result so that it works like the workaround in my test case: call .load() and return the first entry of the Vec, if it exists.

As these statements leak over time, MySQL eventually returns the Can't create more than max_prepared_stmt_count statements error, and it keeps doing so until the connections are closed and MySQL empties the cache.
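To make the leak and the proposed workaround concrete, here is a minimal synchronous model using only the standard library. It is not diesel_async code: `MockConn`, `Rows`, `first_leaky`, and `first_via_load` are hypothetical names, the iterator stands in for the async row stream, and `max_statements` stands in for MySQL's max_prepared_stmt_count. The one assumption carried over from the report is that a statement is closed only when its result stream is read to the end.

```rust
/// Hypothetical stand-in for a MySQL connection that counts open prepared statements.
struct MockConn {
    open_statements: usize,
    /// Models MySQL's max_prepared_stmt_count.
    max_statements: usize,
}

/// Models the "Can't create more than max_prepared_stmt_count statements" error.
#[derive(Debug, PartialEq)]
struct TooManyStatements;

/// Result rows for one statement. The statement is closed only when the
/// iterator is driven to its end, mirroring the behavior described above.
struct Rows<'a> {
    conn: &'a mut MockConn,
    remaining: std::vec::IntoIter<u32>,
    closed: bool,
}

impl Iterator for Rows<'_> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        let row = self.remaining.next();
        if row.is_none() && !self.closed {
            // End of the stream reached: close the statement exactly once.
            self.conn.open_statements -= 1;
            self.closed = true;
        }
        row
    }
}

impl MockConn {
    fn new(max_statements: usize) -> Self {
        MockConn { open_statements: 0, max_statements }
    }

    /// Prepares an uncacheable statement and returns its row stream.
    fn query(&mut self, data: Vec<u32>) -> Result<Rows<'_>, TooManyStatements> {
        if self.open_statements >= self.max_statements {
            return Err(TooManyStatements);
        }
        self.open_statements += 1;
        Ok(Rows { conn: self, remaining: data.into_iter(), closed: false })
    }
}

/// Models `.get_result()` / `.first()`: take one row and drop the stream.
fn first_leaky(conn: &mut MockConn, data: Vec<u32>) -> Result<Option<u32>, TooManyStatements> {
    let first = conn.query(data)?.next();
    Ok(first)
}

/// Models the workaround: drain every row (as `.load()` does), then take the first.
fn first_via_load(conn: &mut MockConn, data: Vec<u32>) -> Result<Option<u32>, TooManyStatements> {
    let all: Vec<u32> = conn.query(data)?.collect();
    Ok(all.into_iter().next())
}
```

In this model, each call to `first_leaky` leaves one statement open, so a connection created with `MockConn::new(3)` fails on the fourth call. `first_via_load` can be called any number of times and leaves `open_statements` at 0. The trade-off of the workaround is that every row is read and buffered even though only one is returned.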
What are you trying to accomplish?
N/A
What is the expected output?
Using .load().await?.get(0)
Executing MaybeCached::CannotCache statement: 159
Yielder for statement: 159
Yielder for statement: 159, finished polling result stream
Closing MaybeCached::CannotCache statement: 159
Using .get_result().await.optional()?
Executing MaybeCached::CannotCache statement: 160
Yielder for statement: 160
Yielder for statement: 160, finished polling result stream
Closing MaybeCached::CannotCache statement: 160
What is the actual output?
Using .load().await?.get(0)
Executing MaybeCached::CannotCache statement: 159
Yielder for statement: 159
Yielder for statement: 159, finished polling result stream
Closing MaybeCached::CannotCache statement: 159
Using .get_result().await.optional()?
Executing MaybeCached::CannotCache statement: 160
Yielder for statement: 160
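The missing "finished polling result stream" and "Closing" lines for statement 160 are consistent with the yielder future being dropped while it is still suspended, so the code after its await point never runs. The sketch below models that mechanism with the standard library only. It is not diesel_async code: `yielder`, `WaitForReceiver`, and `close_ran` are hypothetical names, and a flag stands in for the `conn.close(stmt).await` call.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for polling a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Future that is pending on its first poll and ready on the second.
/// It models the yielder waiting for the receiver to take a row.
struct WaitForReceiver {
    polled_once: bool,
}

impl Future for WaitForReceiver {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.polled_once {
            Poll::Ready(())
        } else {
            self.polled_once = true;
            Poll::Pending
        }
    }
}

/// Hypothetical stand-in for the yielder: hand over `rows` rows, then close.
async fn yielder(rows: usize, closed: Rc<Cell<bool>>) {
    for _ in 0..rows {
        WaitForReceiver { polled_once: false }.await;
    }
    // Stands in for `conn.close(stmt).await`. It is only reached if the
    // future is polled all the way to completion.
    closed.set(true);
}

/// Polls the yielder at most `max_polls` times, drops it, and reports
/// whether the close step ran.
fn close_ran(rows: usize, max_polls: usize) -> bool {
    let closed = Rc::new(Cell::new(false));
    let mut fut = Box::pin(yielder(rows, Rc::clone(&closed)));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    for _ in 0..max_polls {
        if fut.as_mut().poll(&mut cx).is_ready() {
            break;
        }
    }
    // Dropping a suspended future runs none of its remaining code.
    drop(fut);
    closed.get()
}
```

Here `close_ran(2, 1)` models a consumer that takes one row and drops the stream, and it returns `false`. `close_ran(2, 3)` models a consumer that drains the stream, and it returns `true`. Running cleanup from `Drop` is not a direct fix in the real code, because closing the statement is itself an async operation, which is the limitation the comment in the library source points out.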
Are you seeing any additional errors?
No
Steps to reproduce
I modified the AsyncMysqlConnection implementation to add some logging (which you can see in the above output):
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
where
    T: diesel::query_builder::AsQuery,
    T::Query: diesel::query_builder::QueryFragment<Self::Backend>
        + diesel::query_builder::QueryId
        + 'query,
{
    self.with_prepared_statement(source.as_query(), |conn, stmt, binds| async move {
        let (stmt_id, stmt_for_exec) = match stmt {
            MaybeCached::Cached(ref s) => {
                println!("Executing MaybeCached::Cached statement: {:?}", s.id());
                (s.id(), (*s).clone())
            },
            MaybeCached::CannotCache(ref s) => {
                println!("Executing MaybeCached::CannotCache statement: {:?}", s.id());
                (s.id(), s.clone())
            },
            _ => unreachable!(
                "Diesel has only two variants here at the time of writing.\n\
                If you ever see this error message please open in issue in the diesel-async issue tracker"
            ),
        };
        let (tx, rx) = futures_channel::mpsc::channel(0);
        let yielder = async move {
            println!("Yielder for statement: {:?}", stmt_id);
            let r = Self::poll_result_stream(conn, stmt_for_exec, binds, tx).await;
            println!("Yielder for statement: {:?}, finished polling result stream", stmt_id);
            // We need to close any non-cached statement explicitly here as otherwise
            // we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26
            // for details
            //
            // This might be problematic for cases where the stream is dropped before the end is reached
            //
            // Such behaviour might happen if users:
            // * Just drop the future/stream after polling at least once (timeouts!!)
            // * Users only fetch a fixed number of elements from the stream
            //
            // For now there is not really a good solution to this problem as this would require something like async drop
            // (and even with async drop that would be really hard to solve due to the involved lifetimes)
            if let MaybeCached::CannotCache(stmt) = stmt {
                println!("Closing MaybeCached::CannotCache statement: {:?}", stmt.id());
                conn.close(stmt).await.map_err(ErrorHelper)?;
            }
            r
        };
        let fake_stream = stream::once(yielder).filter_map(|e: QueryResult<()>| async move {
            if let Err(e) = e {
                Some(Err(e))
            } else {
                None
            }
        });
        let stream = stream::select(fake_stream, rx).boxed();
        Ok(stream)
    })
    .boxed()
}
And my simple test case to reproduce the issue:
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, QueryableByName, Identifiable, Selectable)]
#[diesel(table_name=users)]
pub struct User {
    pub id: u32,
    pub name: String,
}
table! {
    users {
        id -> Unsigned<Integer>,
        name -> VarChar,
    }
}
println!("Using .load().await?.get(0)");
let _user = users::table
    .filter(users::dsl::id.eq_any(&[1, 2]))
    .load::<User>(database_connection)
    .await?
    .get(0);
println!("Using .get_result().await.optional()?");
let _user = users::table
    .filter(users::dsl::id.eq_any(&[1, 2]))
    .get_result::<User>(database_connection)
    .await
    .optional()?;
Checklist
- I have already looked over the issue tracker for similar possible closed issues.
- This issue can be reproduced on Rust's stable channel. (Your issue will be closed if this is not the case)
- This issue can be reproduced without requiring a third party crate