diff --git a/query-engine/driver-adapters/src/async_js_function.rs b/query-engine/driver-adapters/src/async_js_function.rs index 5f535334ffb9..4926534f58b1 100644 --- a/query-engine/driver-adapters/src/async_js_function.rs +++ b/query-engine/driver-adapters/src/async_js_function.rs @@ -55,6 +55,10 @@ where .map_err(into_quaint_error)?; js_result.into() } + + pub(crate) fn as_raw(&self) -> &ThreadsafeFunction { + &self.threadsafe_fn + } } impl FromNapiValue for AsyncJsFunction diff --git a/query-engine/driver-adapters/src/proxy.rs b/query-engine/driver-adapters/src/proxy.rs index 642c2491757a..000203c0d228 100644 --- a/query-engine/driver-adapters/src/proxy.rs +++ b/query-engine/driver-adapters/src/proxy.rs @@ -1,12 +1,12 @@ use std::borrow::Cow; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use crate::async_js_function::AsyncJsFunction; use crate::conversion::JSArg; use crate::transaction::JsTransaction; use metrics::increment_gauge; use napi::bindgen_prelude::{FromNapiValue, ToNapiValue}; -use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction}; use napi::{JsObject, JsString}; use napi_derive::napi; use quaint::connector::ResultSet as QuaintResultSet; @@ -52,9 +52,8 @@ pub(crate) struct TransactionProxy { /// rollback transaction rollback: AsyncJsFunction<(), ()>, - /// dispose transaction, cleanup logic executed at the end of the transaction lifecycle - /// on drop. - dispose: ThreadsafeFunction<(), ErrorStrategy::Fatal>, + /// whether the transaction has already been committed or rolled back + closed: AtomicBool, } /// This result set is more convenient to be manipulated from both Rust and NodeJS. @@ -581,14 +580,13 @@ impl TransactionProxy { pub fn new(js_transaction: &JsObject) -> napi::Result { let commit = js_transaction.get_named_property("commit")?; let rollback = js_transaction.get_named_property("rollback")?; - let dispose = js_transaction.get_named_property("dispose")?; let options = js_transaction.get_named_property("options")?; Ok(Self { commit, rollback, - dispose, options, + closed: AtomicBool::new(false), }) } @@ -596,19 +594,36 @@ impl TransactionProxy { &self.options } + /// Commits the transaction via the driver adapter. + /// + /// Cancelation safety: [`TransactionProxy::closed`] is only set after the call to JS finishes, + /// so the destructor will ensure the transaction is closed even if the future is dropped. pub async fn commit(&self) -> quaint::Result<()> { - self.commit.call(()).await + let result = self.commit.call(()).await; + self.closed.swap(true, Ordering::Release); + result } + /// Rolls back the transaction via the driver adapter. + /// + /// Cancelation safety: [`TransactionProxy::closed`] is only set after the call to JS finishes, + /// so the destructor will ensure the transaction is closed even if the future is dropped. pub async fn rollback(&self) -> quaint::Result<()> { - self.rollback.call(()).await + let result = self.rollback.call(()).await; + self.closed.swap(true, Ordering::Release); + result } } impl Drop for TransactionProxy { fn drop(&mut self) { + if self.closed.swap(true, Ordering::Acquire) { + return; + } + _ = self - .dispose + .rollback + .as_raw() .call((), napi::threadsafe_function::ThreadsafeFunctionCallMode::NonBlocking); } }