Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): use mdbx_txn_reset to time out transactions #6850

Merged
merged 22 commits into from Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 38 additions & 10 deletions crates/storage/libmdbx-rs/src/transaction.rs
Expand Up @@ -6,7 +6,7 @@ use crate::{
txn_manager::{TxnManagerMessage, TxnPtr},
Cursor, Error, Stat, TableObject,
};
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
use ffi::{mdbx_txn_renew, MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
use indexmap::IndexSet;
use libc::{c_uint, c_void};
use parking_lot::Mutex;
Expand Down Expand Up @@ -320,7 +320,7 @@ where
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
self.txn.txn_execute(f)
self.txn.txn_execute_fail_on_timeout(f)
}
}

Expand All @@ -329,7 +329,9 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
let _ = self.txn_execute(|txn| {
// To be able to abort a timed out transaction, we need to renew it first.
// Hence the usage of `txn_execute_renew_on_timeout` here.
let _ = self.txn.txn_execute_renew_on_timeout(|txn| {
if !self.has_committed() {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
Expand Down Expand Up @@ -528,27 +530,53 @@ impl TransactionPtr {
Self { txn, lock: Arc::new(Mutex::new(())) }
}

// Returns `true` if the transaction is timed out.
//
// When transaction is timed out via `TxnManager`, it's actually reset using
// `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such
// transactions), and sets the `MDBX_TXN_FINISHED` flag.
fn is_timed_out(&self) -> bool {
(unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED) != 0
}

/// Executes the given closure once the lock on the transaction is acquired.
///
/// Returns the result of the closure or an error if the transaction is timed out.
#[inline]
pub(crate) fn txn_execute<F, T>(&self, f: F) -> Result<T>
pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
let _lck = self.lock.lock();

// When transaction is timed out via `TxnManager`, it's actually reset using
// `mdbx_txn_reset` that makes the transaction unusable and sets the
// `MDBX_TXN_FINISHED` flag.
//
// No race condition with the `TxnManager` timeouting our transaction is possible here,
// No race condition with the `TxnManager` timing out the transaction is possible here,
// because we're taking a lock for any actions on the transaction pointer, including a call
// to the `mdbx_txn_reset`.
if unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED != 0 {
if self.is_timed_out() {
return Err(Error::ReadTransactionTimeout)
}

Ok((f)(self.txn))
}

/// Executes the given closure once the lock on the transaction is acquired. If the tranasction
/// is timed out, it will be renewed first.
///
/// Returns the result of the closure or an error if the transaction renewal fails.
#[inline]
fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
let _lck = self.lock.lock();

// To be able to do any operations on the transaction, we need to renew it first.
if self.is_timed_out() {
mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
}

Ok((f)(self.txn))
}
}

/// Commit latencies info.
Expand Down
26 changes: 19 additions & 7 deletions crates/storage/libmdbx-rs/src/txn_manager.rs
Expand Up @@ -216,7 +216,7 @@ mod read_transactions {
let duration = now - *start;

if duration > self.max_duration {
let result = tx.txn_execute(|txn_ptr| {
let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
(
txn_ptr,
duration,
Expand Down Expand Up @@ -344,29 +344,41 @@ mod read_transactions {
assert!(!read_transactions.active.contains_key(&tx_ptr));
}

// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
// manager kills it, use it two times and observe the `Error::ReadTransactionTimeout`
// error. Also, ensure that the transaction pointer is not reused when opening a new
// read-only transaction.
{
// Create a read-only transaction and observe it's in the liist of active
shekhirin marked this conversation as resolved.
Show resolved Hide resolved
// transactions.
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = tx.txn() as usize;
assert!(read_transactions.active.contains_key(&tx_ptr));

// Wait until the transaction is timed out by the manager.
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);

// Ensure that the transaction is not in the list of active transactions anymore,
// and is in the list of timed out but not aborted transactions.
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

// Use the timed out transaction and observe the `Error::ReadTransactionTimeout`
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

let tx = env.begin_ro_txn().unwrap();
let new_tx_ptr = tx.txn() as usize;
// Ensure that the transaction pointer is not reused when opening a new read-only
// transaction.
let new_tx = env.begin_ro_txn().unwrap();
let new_tx_ptr = new_tx.txn() as usize;
assert!(read_transactions.active.contains_key(&new_tx_ptr));
assert_ne!(tx_ptr, new_tx_ptr);

// Drop the transaction and ensure that it's not in the list of timed out but not
// aborted transactions anymore.
drop(tx);
assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
}
}

Expand Down