Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ for Rust libraries in [RFC #1105](https://github.com/rust-lang/rfcs/blob/master/

## [Unreleased]

## [0.7.4] - 2025-11-07

* Fixed an issue with dropping uncached mysql statements

## [0.7.3] - 2025-10-05

* Another docs.rs build fix
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "diesel-async"
version = "0.7.3"
version = "0.7.4"
authors = ["Georg Semmler <github@weiznich.de>"]
edition = "2021"
autotests = false
Expand Down
188 changes: 98 additions & 90 deletions src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use diesel::result::{ConnectionError, ConnectionResult};
use diesel::QueryResult;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::stream;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use mysql_async::prelude::Queryable;
use mysql_async::{Opts, OptsBuilder, Statement};
Expand All @@ -36,6 +36,7 @@ pub struct AsyncMysqlConnection {
stmt_cache: StatementCache<Mysql, Statement>,
transaction_manager: AnsiTransactionManager,
instrumentation: DynInstrumentation,
stmt_to_free: Vec<mysql_async::Statement>,
}

impl SimpleAsyncConnection for AsyncMysqlConnection {
Expand Down Expand Up @@ -81,48 +82,7 @@ impl AsyncConnectionCore for AsyncMysqlConnection {
+ 'query,
{
self.with_prepared_statement(source.as_query(), |conn, stmt, binds| async move {
let stmt_for_exec = match stmt {
MaybeCached::Cached(ref s) => (*s).clone(),
MaybeCached::CannotCache(ref s) => s.clone(),
_ => unreachable!(
"Diesel has only two variants here at the time of writing.\n\
If you ever see this error message please open in issue in the diesel-async issue tracker"
),
};

let (tx, rx) = futures_channel::mpsc::channel(0);

let yielder = async move {
let r = Self::poll_result_stream(conn, stmt_for_exec, binds, tx).await;
// We need to close any non-cached statement explicitly here as otherwise
// we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26
// for details
//
// This might be problematic for cases where the stream is dropped before the end is reached
//
// Such behaviour might happen if users:
// * Just drop the future/stream after polling at least once (timeouts!!)
// * Users only fetch a fixed number of elements from the stream
//
// For now there is not really a good solution to this problem as this would require something like async drop
// (and even with async drop that would be really hard to solve due to the involved lifetimes)
if let MaybeCached::CannotCache(stmt) = stmt {
conn.close(stmt).await.map_err(ErrorHelper)?;
}
r
};

let fake_stream = stream::once(yielder).filter_map(|e: QueryResult<()>| async move {
if let Err(e) = e {
Some(Err(e))
} else {
None
}
});

let stream = stream::select(fake_stream, rx).boxed();

Ok(stream)
Ok(Self::poll_result_stream(conn, stmt, binds).await?.boxed())
})
.boxed()
}
Expand All @@ -139,20 +99,6 @@ impl AsyncConnectionCore for AsyncMysqlConnection {
self.with_prepared_statement(source, |conn, stmt, binds| async move {
let params = mysql_async::Params::try_from(binds)?;
conn.exec_drop(&*stmt, params).await.map_err(ErrorHelper)?;
// We need to close any non-cached statement explicitly here as otherwise
// we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26
// for details
//
// This might be problematic for cases where the stream is dropped before the end is reached
//
// Such behaviour might happen if users:
// * Just drop the future after polling at least once (timeouts!!)
//
// For now there is not really a good solution to this problem as this would require something like async drop
// (and even with async drop that would be really hard to solve due to the involved lifetimes)
if let MaybeCached::CannotCache(stmt) = stmt {
conn.close(stmt).await.map_err(ErrorHelper)?;
}
conn.affected_rows()
.try_into()
.map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))
Expand Down Expand Up @@ -244,6 +190,7 @@ impl AsyncMysqlConnection {
stmt_cache: StatementCache::new(),
transaction_manager: AnsiTransactionManager::default(),
instrumentation: DynInstrumentation::default_instrumentation(),
stmt_to_free: Vec::new(),
};

for stmt in CONNECTION_SETUP_QUERIES {
Expand Down Expand Up @@ -290,6 +237,7 @@ impl AsyncMysqlConnection {
ref mut stmt_cache,
ref mut transaction_manager,
ref mut instrumentation,
ref mut stmt_to_free,
..
} = self;

Expand All @@ -299,6 +247,19 @@ impl AsyncMysqlConnection {
let query_id = T::query_id();

async move {
// We need to close any non-cached statement explicitly here as otherwise
// we might error out on too many open statements. See https://github.com/weiznich/diesel_async/issues/26
// and https://github.com/weiznich/diesel_async/issues/269 for details
//
// We remember these statements from the last run as there is currenly no relaible way to
// run this as destruction step after the execution finished. Users might abort polling the future, etc
//
// The overhead for this is keeping one additional statement open until the connection is used
// next, so you would need to have `max_prepared_stmt_count - 1` other statements open for this to cause issues.
// This is hopefully not a problem in practice
for stmt in std::mem::take(stmt_to_free) {
conn.close(stmt).await.map_err(ErrorHelper)?;
}
let RawBytesBindCollector {
metadata, binds, ..
} = bind_collector?;
Expand All @@ -320,6 +281,10 @@ impl AsyncMysqlConnection {
&mut **instrumentation,
)
.await?;
// for any not cached statement we need to remember to close them on the next connection usage
if let MaybeCached::CannotCache(stmt) = &stmt {
stmt_to_free.push(stmt.clone());
}
callback(conn, stmt, ToSqlHelper { metadata, binds }).await
};
let r = update_transaction_manager_status(inner.await, transaction_manager);
Expand All @@ -332,21 +297,31 @@ impl AsyncMysqlConnection {
.boxed()
}

async fn poll_result_stream(
conn: &mut mysql_async::Conn,
stmt_for_exec: mysql_async::Statement,
async fn poll_result_stream<'conn>(
conn: &'conn mut mysql_async::Conn,
stmt: MaybeCached<'_, mysql_async::Statement>,
binds: ToSqlHelper,
mut tx: futures_channel::mpsc::Sender<QueryResult<MysqlRow>>,
) -> QueryResult<()> {
use futures_util::sink::SinkExt;
) -> QueryResult<impl Stream<Item = QueryResult<MysqlRow>> + Send + use<'conn>> {
let params = mysql_async::Params::try_from(binds)?;
let stmt_for_exec = match stmt {
MaybeCached::Cached(ref s) => {
(*s).clone()
},
MaybeCached::CannotCache(ref s) => {
s.clone()
},
_ => unreachable!(
"Diesel has only two variants here at the time of writing.\n\
If you ever see this error message please open in issue in the diesel-async issue tracker"
),
};

let res = conn
.exec_iter(stmt_for_exec, params)
.await
.map_err(ErrorHelper)?;

let mut stream = res
let stream = res
.stream_and_drop::<MysqlRow>()
.await
.map_err(ErrorHelper)?
Expand All @@ -357,14 +332,7 @@ impl AsyncMysqlConnection {
})?
.map_err(|e| diesel::result::Error::from(ErrorHelper(e)));

while let Some(row) = stream.next().await {
let row = row?;
tx.send(Ok(row))
.await
.map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))?;
}

Ok(())
Ok(stream)
}

async fn establish_connection_inner(
Expand All @@ -384,6 +352,7 @@ impl AsyncMysqlConnection {
stmt_cache: StatementCache::new(),
transaction_manager: AnsiTransactionManager::default(),
instrumentation: DynInstrumentation::none(),
stmt_to_free: Vec::new(),
})
}
}
Expand All @@ -404,39 +373,78 @@ mod tests {
}
include!("../doctest_setup.rs");

const STMT_COUNT: usize = 16382 + 1000;

#[derive(Queryable)]
#[expect(dead_code, reason = "used for the test as loading target")]
struct User {
id: i32,
name: String,
}

#[tokio::test]
async fn check_statements_are_dropped() {
async fn check_cached_statements_are_dropped() {
use self::schema::users;

let mut conn = establish_connection().await;
// we cannot set a lower limit here without admin privileges
// which makes this test really slow
let stmt_count = 16382 + 10;

for i in 0..stmt_count {
diesel::insert_into(users::table)
.values(Some(users::name.eq(format!("User{i}"))))
.execute(&mut conn)
for _i in 0..STMT_COUNT {
users::table
.select(users::id)
.load::<i32>(&mut conn)
.await
.unwrap();
}
}

#[derive(QueryableByName)]
#[diesel(table_name = users)]
#[allow(dead_code)]
struct User {
id: i32,
name: String,
}
#[tokio::test]
async fn check_uncached_statements_are_dropped() {
use self::schema::users;

let mut conn = establish_connection().await;

for i in 0..stmt_count {
diesel::sql_query("SELECT id, name FROM users WHERE name = ?")
.bind::<diesel::sql_types::Text, _>(format!("User{i}"))
for _i in 0..STMT_COUNT {
users::table
.filter(users::dsl::id.eq_any(&[1, 2]))
.load::<User>(&mut conn)
.await
.unwrap();
}
}

#[tokio::test]
async fn check_cached_statements_are_dropped_get_result() {
use self::schema::users;
use diesel::OptionalExtension;

let mut conn = establish_connection().await;

for _i in 0..STMT_COUNT {
users::table
.select(users::id)
.get_result::<i32>(&mut conn)
.await
.optional()
.unwrap();
}
}

#[tokio::test]
async fn check_uncached_statements_are_dropped_get_result() {
use self::schema::users;
use diesel::OptionalExtension;

let mut conn = establish_connection().await;

for _i in 0..STMT_COUNT {
users::table
.filter(users::dsl::id.eq_any(&[1, 2]))
.get_result::<User>(&mut conn)
.await
.optional()
.unwrap();
}
}
}

impl QueryFragmentForCachedStatement<Mysql> for QueryFragmentHelper {
Expand Down
Loading