From 815c7c198ebdbcc28042b829b7f94a79d005cfd9 Mon Sep 17 00:00:00 2001 From: Isaac Horvath Date: Sat, 13 Sep 2025 13:09:25 -0400 Subject: [PATCH 1/3] add mysql cancel token --- src/lib.rs | 3 +++ src/mysql/cancel_token.rs | 37 ++++++++++++++++++++++++++++++++ src/mysql/mod.rs | 10 +++++++++ tests/lib.rs | 45 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+) create mode 100644 src/mysql/cancel_token.rs diff --git a/src/lib.rs b/src/lib.rs index 943e19f..be22a3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,6 +128,9 @@ mod transaction_manager; #[cfg(feature = "mysql")] #[doc(inline)] pub use self::mysql::AsyncMysqlConnection; +#[cfg(feature = "mysql")] +#[doc(inline)] +pub use self::mysql::CancelToken; #[cfg(feature = "postgres")] #[doc(inline)] pub use self::pg::AsyncPgConnection; diff --git a/src/mysql/cancel_token.rs b/src/mysql/cancel_token.rs new file mode 100644 index 0000000..1e39dab --- /dev/null +++ b/src/mysql/cancel_token.rs @@ -0,0 +1,37 @@ +use mysql_async::prelude::Query; +use mysql_async::{Opts, OptsBuilder}; + +use crate::mysql::error_helper::ErrorHelper; + +/// The capability to request cancellation of in-progress queries on a +/// connection. +#[derive(Clone)] +pub struct CancelToken { + pub(crate) opts: Opts, + pub(crate) kill_id: u32, +} + +impl CancelToken { + /// Attempts to cancel the in-progress query on the connection associated + /// with this `CancelToken`. + /// + /// The server provides no information about whether a cancellation attempt was successful or not. An error will + /// only be returned if the client was unable to connect to the database. + /// + /// Cancellation is inherently racy. There is no guarantee that the + /// cancellation request will reach the server before the query terminates + /// normally, or that the connection associated with this token is still + /// active. + pub async fn cancel_query(&self) -> Result<(), diesel::result::ConnectionError> { + let builder = OptsBuilder::from_opts(self.opts.clone()); + + let conn = mysql_async::Conn::new(builder).await.map_err(ErrorHelper)?; + + format!("KILL QUERY {};", self.kill_id) + .ignore(conn) + .await + .map_err(ErrorHelper)?; + + Ok(()) + } +} diff --git a/src/mysql/mod.rs b/src/mysql/mod.rs index 1d44650..fd5feaa 100644 --- a/src/mysql/mod.rs +++ b/src/mysql/mod.rs @@ -19,10 +19,12 @@ use mysql_async::prelude::Queryable; use mysql_async::{Opts, OptsBuilder, Statement}; use std::future::Future; +mod cancel_token; mod error_helper; mod row; mod serialize; +pub use self::cancel_token::CancelToken; use self::error_helper::ErrorHelper; use self::row::MysqlRow; use self::serialize::ToSqlHelper; @@ -254,6 +256,14 @@ impl AsyncMysqlConnection { Ok(conn) } + /// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client. + pub fn cancel_token(&self) -> CancelToken { + let kill_id = self.conn.id(); + let opts = self.conn.opts().clone(); + + CancelToken { kill_id, opts } + } + fn with_prepared_statement<'conn, T, F, R>( &'conn mut self, query: T, diff --git a/tests/lib.rs b/tests/lib.rs index 67b0c27..bb0960f 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -167,6 +167,51 @@ async fn postgres_cancel_token() { } } +#[cfg(feature = "mysql")] +#[tokio::test] +async fn mysql_cancel_token() { + use std::time::Duration; + + use diesel::result::{DatabaseErrorKind, Error}; + + let (sender, receiver) = tokio::sync::oneshot::channel(); + + // execute a long-running query on a separate thread + let task = tokio::spawn(async move { + let conn = &mut connection().await; + let token = conn.cancel_token(); + + // send the token back to the main thread via a oneshot channel + sender + .send(token) + .unwrap_or_else(|_| panic!("couldn't send token")); + + diesel::dsl::sql::("SELECT SLEEP(5)") + .load::(conn) + .await + }); + + // wait for the cancellation token to be sent + if let Ok(token) = receiver.await { + // give the query time to start before invoking the token + tokio::time::sleep(Duration::from_millis(500)).await; + token.cancel_query().await.unwrap(); + } + + // make sure the query task resulted in a cancellation error or a return value of 1: + match task.await.unwrap() { + Err(e) => match e { + Error::DatabaseError(DatabaseErrorKind::Unknown, v) + if v.message() == "Query execution was interrupted" => {} + _ => panic!("unexpected error: {:?}", e), + }, + Ok(r) => match r[0] { + 1 => {} + _ => panic!(""), + }, + } +} + #[cfg(feature = "postgres")] async fn setup(connection: &mut TestConnection) { diesel::sql_query( From bdc7fe92cee044a72815c00ec3331b0d419ff8c1 Mon Sep 17 00:00:00 2001 From: Isaac Horvath Date: Sat, 13 Sep 2025 13:54:50 -0400 Subject: [PATCH 2/3] fix test failure output --- tests/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lib.rs b/tests/lib.rs index bb0960f..8fa5fc6 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -207,7 +207,7 @@ async fn mysql_cancel_token() { }, Ok(r) => match r[0] { 1 => {} - _ => panic!(""), + _ => panic!("query completed successfully without cancellation"), }, } } From ce9b84a8d0f6a959ce8fa53411d24d376247617c Mon Sep 17 00:00:00 2001 From: Georg Semmler Date: Thu, 2 Oct 2025 08:04:10 +0200 Subject: [PATCH 3/3] Minor improvements --- src/lib.rs | 2 +- src/mysql/cancel_token.rs | 6 ++--- src/mysql/mod.rs | 6 ++--- tests/lib.rs | 47 ++++++++++++++++++++------------------- 4 files changed, 31 insertions(+), 30 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index be22a3e..4ae68a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -130,7 +130,7 @@ mod transaction_manager; pub use self::mysql::AsyncMysqlConnection; #[cfg(feature = "mysql")] #[doc(inline)] -pub use self::mysql::CancelToken; +pub use self::mysql::MysqlCancelToken; #[cfg(feature = "postgres")] #[doc(inline)] pub use self::pg::AsyncPgConnection; diff --git a/src/mysql/cancel_token.rs b/src/mysql/cancel_token.rs index 1e39dab..8f2c945 100644 --- a/src/mysql/cancel_token.rs +++ b/src/mysql/cancel_token.rs @@ -6,12 +6,12 @@ use crate::mysql::error_helper::ErrorHelper; /// The capability to request cancellation of in-progress queries on a /// connection. #[derive(Clone)] -pub struct CancelToken { +pub struct MysqlCancelToken { pub(crate) opts: Opts, pub(crate) kill_id: u32, } -impl CancelToken { +impl MysqlCancelToken { /// Attempts to cancel the in-progress query on the connection associated /// with this `CancelToken`. /// @@ -22,7 +22,7 @@ impl CancelToken { /// cancellation request will reach the server before the query terminates /// normally, or that the connection associated with this token is still /// active. - pub async fn cancel_query(&self) -> Result<(), diesel::result::ConnectionError> { + pub async fn cancel_query(&self) -> diesel::result::ConnectionResult<()> { let builder = OptsBuilder::from_opts(self.opts.clone()); let conn = mysql_async::Conn::new(builder).await.map_err(ErrorHelper)?; diff --git a/src/mysql/mod.rs b/src/mysql/mod.rs index fd5feaa..9d4cc89 100644 --- a/src/mysql/mod.rs +++ b/src/mysql/mod.rs @@ -24,7 +24,7 @@ mod error_helper; mod row; mod serialize; -pub use self::cancel_token::CancelToken; +pub use self::cancel_token::MysqlCancelToken; use self::error_helper::ErrorHelper; use self::row::MysqlRow; use self::serialize::ToSqlHelper; @@ -257,11 +257,11 @@ impl AsyncMysqlConnection { } /// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client. - pub fn cancel_token(&self) -> CancelToken { + pub fn cancel_token(&self) -> MysqlCancelToken { let kill_id = self.conn.id(); let opts = self.conn.opts().clone(); - CancelToken { kill_id, opts } + MysqlCancelToken { kill_id, opts } } fn with_prepared_statement<'conn, T, F, R>( diff --git a/tests/lib.rs b/tests/lib.rs index 8fa5fc6..80162a5 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -170,14 +170,13 @@ async fn postgres_cancel_token() { #[cfg(feature = "mysql")] #[tokio::test] async fn mysql_cancel_token() { - use std::time::Duration; - use diesel::result::{DatabaseErrorKind, Error}; + use std::time::Duration; let (sender, receiver) = tokio::sync::oneshot::channel(); - // execute a long-running query on a separate thread - let task = tokio::spawn(async move { + // execute a long-running query in a separate future + let query_future = async move { let conn = &mut connection().await; let token = conn.cancel_token(); @@ -187,28 +186,30 @@ async fn mysql_cancel_token() { .unwrap_or_else(|_| panic!("couldn't send token")); diesel::dsl::sql::("SELECT SLEEP(5)") - .load::(conn) + .get_result::(conn) .await - }); - - // wait for the cancellation token to be sent - if let Ok(token) = receiver.await { - // give the query time to start before invoking the token - tokio::time::sleep(Duration::from_millis(500)).await; - token.cancel_query().await.unwrap(); - } + }; + let cancel_future = async move { + // wait for the cancellation token to be sent + if let Ok(token) = receiver.await { + // give the query time to start before invoking the token + tokio::time::sleep(Duration::from_millis(500)).await; + token.cancel_query().await.unwrap(); + } else { + panic!("Failed to receive cancel token"); + } + }; + + let (task, _) = tokio::join!(query_future, cancel_future); // make sure the query task resulted in a cancellation error or a return value of 1: - match task.await.unwrap() { - Err(e) => match e { - Error::DatabaseError(DatabaseErrorKind::Unknown, v) - if v.message() == "Query execution was interrupted" => {} - _ => panic!("unexpected error: {:?}", e), - }, - Ok(r) => match r[0] { - 1 => {} - _ => panic!("query completed successfully without cancellation"), - }, + match task { + Err(Error::DatabaseError(DatabaseErrorKind::Unknown, v)) + if v.message() == "Query execution was interrupted" => {} + Err(e) => panic!("unexpected error: {:?}", e), + // mysql 8.4 returns 1 from a canceled sleep instead of an error + Ok(1) => {} + Ok(_) => panic!("query completed successfully without cancellation"), } }