diff --git a/src/lib.rs b/src/lib.rs index 943e19f..4ae68a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,6 +128,9 @@ mod transaction_manager; #[cfg(feature = "mysql")] #[doc(inline)] pub use self::mysql::AsyncMysqlConnection; +#[cfg(feature = "mysql")] +#[doc(inline)] +pub use self::mysql::MysqlCancelToken; #[cfg(feature = "postgres")] #[doc(inline)] pub use self::pg::AsyncPgConnection; diff --git a/src/mysql/cancel_token.rs b/src/mysql/cancel_token.rs new file mode 100644 index 0000000..8f2c945 --- /dev/null +++ b/src/mysql/cancel_token.rs @@ -0,0 +1,37 @@ +use mysql_async::prelude::Query; +use mysql_async::{Opts, OptsBuilder}; + +use crate::mysql::error_helper::ErrorHelper; + +/// The capability to request cancellation of in-progress queries on a +/// connection. +#[derive(Clone)] +pub struct MysqlCancelToken { + pub(crate) opts: Opts, + pub(crate) kill_id: u32, +} + +impl MysqlCancelToken { + /// Attempts to cancel the in-progress query on the connection associated + /// with this `CancelToken`. + /// + /// The server provides no information about whether a cancellation attempt was successful or not. An error will + /// only be returned if the client was unable to connect to the database. + /// + /// Cancellation is inherently racy. There is no guarantee that the + /// cancellation request will reach the server before the query terminates + /// normally, or that the connection associated with this token is still + /// active. + pub async fn cancel_query(&self) -> diesel::result::ConnectionResult<()> { + let builder = OptsBuilder::from_opts(self.opts.clone()); + + let conn = mysql_async::Conn::new(builder).await.map_err(ErrorHelper)?; + + format!("KILL QUERY {};", self.kill_id) + .ignore(conn) + .await + .map_err(ErrorHelper)?; + + Ok(()) + } +} diff --git a/src/mysql/mod.rs b/src/mysql/mod.rs index 1d44650..9d4cc89 100644 --- a/src/mysql/mod.rs +++ b/src/mysql/mod.rs @@ -19,10 +19,12 @@ use mysql_async::prelude::Queryable; use mysql_async::{Opts, OptsBuilder, Statement}; use std::future::Future; +mod cancel_token; mod error_helper; mod row; mod serialize; +pub use self::cancel_token::MysqlCancelToken; use self::error_helper::ErrorHelper; use self::row::MysqlRow; use self::serialize::ToSqlHelper; @@ -254,6 +256,14 @@ impl AsyncMysqlConnection { Ok(conn) } + /// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client. + pub fn cancel_token(&self) -> MysqlCancelToken { + let kill_id = self.conn.id(); + let opts = self.conn.opts().clone(); + + MysqlCancelToken { kill_id, opts } + } + fn with_prepared_statement<'conn, T, F, R>( &'conn mut self, query: T, diff --git a/tests/lib.rs b/tests/lib.rs index 67b0c27..80162a5 100644 --- a/tests/lib.rs +++ b/tests/lib.rs @@ -167,6 +167,52 @@ async fn postgres_cancel_token() { } } +#[cfg(feature = "mysql")] +#[tokio::test] +async fn mysql_cancel_token() { + use diesel::result::{DatabaseErrorKind, Error}; + use std::time::Duration; + + let (sender, receiver) = tokio::sync::oneshot::channel(); + + // execute a long-running query in a separate future + let query_future = async move { + let conn = &mut connection().await; + let token = conn.cancel_token(); + + // send the token back to the main thread via a oneshot channel + sender + .send(token) + .unwrap_or_else(|_| panic!("couldn't send token")); + + diesel::dsl::sql::("SELECT SLEEP(5)") + .get_result::(conn) + .await + }; + let cancel_future = async move { + // wait for the cancellation token to be sent + if let Ok(token) = receiver.await { + // give the query time to start before invoking the token + tokio::time::sleep(Duration::from_millis(500)).await; + token.cancel_query().await.unwrap(); + } else { + panic!("Failed to receive cancel token"); + } + }; + + let (task, _) = tokio::join!(query_future, cancel_future); + + // make sure the query task resulted in a cancellation error or a return value of 1: + match task { + Err(Error::DatabaseError(DatabaseErrorKind::Unknown, v)) + if v.message() == "Query execution was interrupted" => {} + Err(e) => panic!("unexpected error: {:?}", e), + // mysql 8.4 returns 1 from a canceled sleep instead of an error + Ok(1) => {} + Ok(_) => panic!("query completed successfully without cancellation"), + } +} + #[cfg(feature = "postgres")] async fn setup(connection: &mut TestConnection) { diesel::sql_query(