Skip to content

Commit ce9b84a

Browse files
committed
Minor improvements
1 parent bdc7fe9 commit ce9b84a

File tree

4 files changed

+31
-30
lines changed

4 files changed

+31
-30
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ mod transaction_manager;
130130
pub use self::mysql::AsyncMysqlConnection;
131131
#[cfg(feature = "mysql")]
132132
#[doc(inline)]
133-
pub use self::mysql::CancelToken;
133+
pub use self::mysql::MysqlCancelToken;
134134
#[cfg(feature = "postgres")]
135135
#[doc(inline)]
136136
pub use self::pg::AsyncPgConnection;

src/mysql/cancel_token.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use crate::mysql::error_helper::ErrorHelper;
66
/// The capability to request cancellation of in-progress queries on a
77
/// connection.
88
#[derive(Clone)]
9-
pub struct CancelToken {
9+
pub struct MysqlCancelToken {
1010
pub(crate) opts: Opts,
1111
pub(crate) kill_id: u32,
1212
}
1313

14-
impl CancelToken {
14+
impl MysqlCancelToken {
1515
/// Attempts to cancel the in-progress query on the connection associated
1616
/// with this `CancelToken`.
1717
///
@@ -22,7 +22,7 @@ impl CancelToken {
2222
/// cancellation request will reach the server before the query terminates
2323
/// normally, or that the connection associated with this token is still
2424
/// active.
25-
pub async fn cancel_query(&self) -> Result<(), diesel::result::ConnectionError> {
25+
pub async fn cancel_query(&self) -> diesel::result::ConnectionResult<()> {
2626
let builder = OptsBuilder::from_opts(self.opts.clone());
2727

2828
let conn = mysql_async::Conn::new(builder).await.map_err(ErrorHelper)?;

src/mysql/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ mod error_helper;
2424
mod row;
2525
mod serialize;
2626

27-
pub use self::cancel_token::CancelToken;
27+
pub use self::cancel_token::MysqlCancelToken;
2828
use self::error_helper::ErrorHelper;
2929
use self::row::MysqlRow;
3030
use self::serialize::ToSqlHelper;
@@ -257,11 +257,11 @@ impl AsyncMysqlConnection {
257257
}
258258

259259
/// Constructs a cancellation token that can later be used to request cancellation of a query running on the connection associated with this client.
260-
pub fn cancel_token(&self) -> CancelToken {
260+
pub fn cancel_token(&self) -> MysqlCancelToken {
261261
let kill_id = self.conn.id();
262262
let opts = self.conn.opts().clone();
263263

264-
CancelToken { kill_id, opts }
264+
MysqlCancelToken { kill_id, opts }
265265
}
266266

267267
fn with_prepared_statement<'conn, T, F, R>(

tests/lib.rs

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,13 @@ async fn postgres_cancel_token() {
170170
#[cfg(feature = "mysql")]
171171
#[tokio::test]
172172
async fn mysql_cancel_token() {
173-
use std::time::Duration;
174-
175173
use diesel::result::{DatabaseErrorKind, Error};
174+
use std::time::Duration;
176175

177176
let (sender, receiver) = tokio::sync::oneshot::channel();
178177

179-
// execute a long-running query on a separate thread
180-
let task = tokio::spawn(async move {
178+
// execute a long-running query in a separate future
179+
let query_future = async move {
181180
let conn = &mut connection().await;
182181
let token = conn.cancel_token();
183182

@@ -187,28 +186,30 @@ async fn mysql_cancel_token() {
187186
.unwrap_or_else(|_| panic!("couldn't send token"));
188187

189188
diesel::dsl::sql::<diesel::sql_types::Integer>("SELECT SLEEP(5)")
190-
.load::<i32>(conn)
189+
.get_result::<i32>(conn)
191190
.await
192-
});
193-
194-
// wait for the cancellation token to be sent
195-
if let Ok(token) = receiver.await {
196-
// give the query time to start before invoking the token
197-
tokio::time::sleep(Duration::from_millis(500)).await;
198-
token.cancel_query().await.unwrap();
199-
}
191+
};
192+
let cancel_future = async move {
193+
// wait for the cancellation token to be sent
194+
if let Ok(token) = receiver.await {
195+
// give the query time to start before invoking the token
196+
tokio::time::sleep(Duration::from_millis(500)).await;
197+
token.cancel_query().await.unwrap();
198+
} else {
199+
panic!("Failed to receive cancel token");
200+
}
201+
};
202+
203+
let (task, _) = tokio::join!(query_future, cancel_future);
200204

201205
// make sure the query task resulted in a cancellation error or a return value of 1:
202-
match task.await.unwrap() {
203-
Err(e) => match e {
204-
Error::DatabaseError(DatabaseErrorKind::Unknown, v)
205-
if v.message() == "Query execution was interrupted" => {}
206-
_ => panic!("unexpected error: {:?}", e),
207-
},
208-
Ok(r) => match r[0] {
209-
1 => {}
210-
_ => panic!("query completed successfully without cancellation"),
211-
},
206+
match task {
207+
Err(Error::DatabaseError(DatabaseErrorKind::Unknown, v))
208+
if v.message() == "Query execution was interrupted" => {}
209+
Err(e) => panic!("unexpected error: {:?}", e),
210+
// mysql 8.4 returns 1 from a canceled sleep instead of an error
211+
Ok(1) => {}
212+
Ok(_) => panic!("query completed successfully without cancellation"),
212213
}
213214
}
214215

0 commit comments

Comments
 (0)