Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ mod transaction_manager;
#[cfg(feature = "mysql")]
#[doc(inline)]
pub use self::mysql::AsyncMysqlConnection;
#[cfg(feature = "mysql")]
#[doc(inline)]
pub use self::mysql::MysqlCancelToken;
#[cfg(feature = "postgres")]
#[doc(inline)]
pub use self::pg::AsyncPgConnection;
Expand Down
37 changes: 37 additions & 0 deletions src/mysql/cancel_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use mysql_async::prelude::Query;
use mysql_async::{Opts, OptsBuilder};

use crate::mysql::error_helper::ErrorHelper;

/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone)]
pub struct MysqlCancelToken {
pub(crate) opts: Opts,
pub(crate) kill_id: u32,
}

impl MysqlCancelToken {
/// Attempts to cancel the in-progress query on the connection associated
/// with this `CancelToken`.
///
/// The server provides no information about whether a cancellation attempt was successful or not. An error will
/// only be returned if the client was unable to connect to the database.
///
/// Cancellation is inherently racy. There is no guarantee that the
/// cancellation request will reach the server before the query terminates
/// normally, or that the connection associated with this token is still
/// active.
pub async fn cancel_query(&self) -> diesel::result::ConnectionResult<()> {
let builder = OptsBuilder::from_opts(self.opts.clone());

let conn = mysql_async::Conn::new(builder).await.map_err(ErrorHelper)?;

format!("KILL QUERY {};", self.kill_id)
.ignore(conn)
.await
.map_err(ErrorHelper)?;

Ok(())
}
}
10 changes: 10 additions & 0 deletions src/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use mysql_async::prelude::Queryable;
use mysql_async::{Opts, OptsBuilder, Statement};
use std::future::Future;

mod cancel_token;
mod error_helper;
mod row;
mod serialize;

pub use self::cancel_token::MysqlCancelToken;
use self::error_helper::ErrorHelper;
use self::row::MysqlRow;
use self::serialize::ToSqlHelper;
Expand Down Expand Up @@ -254,6 +256,14 @@ impl AsyncMysqlConnection {
Ok(conn)
}

/// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client.
pub fn cancel_token(&self) -> MysqlCancelToken {
let kill_id = self.conn.id();
let opts = self.conn.opts().clone();

MysqlCancelToken { kill_id, opts }
}

fn with_prepared_statement<'conn, T, F, R>(
&'conn mut self,
query: T,
Expand Down
46 changes: 46 additions & 0 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,52 @@ async fn postgres_cancel_token() {
}
}

#[cfg(feature = "mysql")]
#[tokio::test]
async fn mysql_cancel_token() {
use diesel::result::{DatabaseErrorKind, Error};
use std::time::Duration;

let (sender, receiver) = tokio::sync::oneshot::channel();

// execute a long-running query in a separate future
let query_future = async move {
let conn = &mut connection().await;
let token = conn.cancel_token();

// send the token back to the main thread via a oneshot channel
sender
.send(token)
.unwrap_or_else(|_| panic!("couldn't send token"));

diesel::dsl::sql::<diesel::sql_types::Integer>("SELECT SLEEP(5)")
.get_result::<i32>(conn)
.await
};
let cancel_future = async move {
// wait for the cancellation token to be sent
if let Ok(token) = receiver.await {
// give the query time to start before invoking the token
tokio::time::sleep(Duration::from_millis(500)).await;
token.cancel_query().await.unwrap();
} else {
panic!("Failed to receive cancel token");
}
};

let (task, _) = tokio::join!(query_future, cancel_future);

// make sure the query task resulted in a cancellation error or a return value of 1:
match task {
Err(Error::DatabaseError(DatabaseErrorKind::Unknown, v))
if v.message() == "Query execution was interrupted" => {}
Err(e) => panic!("unexpected error: {:?}", e),
// mysql 8.4 returns 1 from a canceled sleep instead of an error
Ok(1) => {}
Ok(_) => panic!("query completed successfully without cancellation"),
}
}

#[cfg(feature = "postgres")]
async fn setup(connection: &mut TestConnection) {
diesel::sql_query(
Expand Down
Loading