Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): race condition mdbx abort tx #6798

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ece6be5
Fix bug, race condition in MDBX bindings
emhane Feb 24, 2024
91bbeff
Use dashmap dep unconditionally
emhane Feb 26, 2024
e049933
Test abort rw transaction
emhane Feb 26, 2024
4ee6b0e
Test abort ro transaction
emhane Feb 26, 2024
a80094f
Move rw tests out of read timeouts module
emhane Feb 26, 2024
1def7ee
fixup! Move rw tests out of read timeouts module
emhane Feb 26, 2024
7490bde
Don't treat ReadTransactionAborted (already aborted) as an error to a…
emhane Feb 26, 2024
f1d211a
Add flags to TxnManagerMessage::Abort
emhane Feb 26, 2024
524cf49
Merge branch 'main' into emhane/race-condition-mdbx
emhane Feb 26, 2024
137ad26
Revert changes for abort write transaction
emhane Feb 27, 2024
b2636fc
Clean up abort ro timeout semantics
emhane Feb 27, 2024
63575ee
fixup! Revert changes for abort write transaction
emhane Feb 27, 2024
b5a7ee4
Add comment todo
emhane Feb 27, 2024
b74f818
Remove tests for rw txns that cannot be aborted in bg
emhane Feb 27, 2024
b46fdfd
Fix aborted ro tests
emhane Feb 27, 2024
6a91342
fixup! Revert changes for abort write transaction
emhane Feb 27, 2024
17055ff
Fix semantics for abort error on two layers: tx manager or mdbx
emhane Feb 27, 2024
7e1a884
Fix semantics
emhane Feb 27, 2024
14426b2
Update docs
emhane Feb 27, 2024
c732514
fixup! Fix semantics for abort error on two layers: tx manager or mdbx
emhane Feb 27, 2024
5b276c4
fixup! fixup! Fix semantics for abort error on two layers: tx manager…
emhane Feb 27, 2024
4f814ad
Merge interfaces aborted by user and aborted by bg thread
emhane Feb 27, 2024
a91af5b
Replace evict aborted after timeout with metrics for length of aborte…
emhane Feb 27, 2024
002e354
fixup! Replace evict aborted after timeout with metrics for length of…
emhane Feb 27, 2024
a54ced4
Merge branch 'main' into emhane/race-condition-mdbx
emhane Feb 27, 2024
a361128
Fix merge conflicts with main
emhane Feb 27, 2024
7886731
Only add to aborted set if aborted on timeout
emhane Feb 28, 2024
01f2af0
Add test for mdbx reassing txn ptr
emhane Feb 28, 2024
0cbeb37
Revert changes to manifest
emhane Feb 28, 2024
c38f8aa
Merge branch 'main' into emhane/race-condition-mdbx
emhane Feb 28, 2024
d939d39
Fix lint
emhane Feb 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/storage/libmdbx-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ ffi = { package = "reth-mdbx-sys", path = "./mdbx-sys" }
[target.'cfg(not(windows))'.dependencies]
libffi = "3.2.0"

# metrics
reth-metrics.workspace = true
metrics.workspace = true

[features]
default = []
return-borrowed = []
Expand Down
2 changes: 2 additions & 0 deletions crates/storage/libmdbx-rs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result<bool> {
}
}

/// Returns the appropriate error. Removes the transaction from aborted transactions, if present,
/// as a side-effect.
#[cfg(feature = "read-tx-timeouts")]
#[inline]
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
Expand Down
2 changes: 2 additions & 0 deletions crates/storage/libmdbx-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use crate::{
},
error::{Error, Result},
flags::*,
metrics::*,
transaction::{CommitLatency, Transaction, TransactionKind, RO, RW},
};
pub mod ffi {
Expand All @@ -32,6 +33,7 @@ mod database;
mod environment;
mod error;
mod flags;
mod metrics;
mod transaction;
mod txn_manager;

Expand Down
10 changes: 10 additions & 0 deletions crates/storage/libmdbx-rs/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use metrics::Gauge;
use reth_metrics::Metrics;

#[derive(Metrics)]
#[metrics(scope = "storage.libmdbxrs.txn_manager")]
/// `TxnManager` metrics.
pub struct TxnManagerMetrics {
/// The number of aborted transactions that are currently tracked.
pub(super) aborted: Gauge,
}
19 changes: 14 additions & 5 deletions crates/storage/libmdbx-rs/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,26 @@ where
if !self.has_committed() {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env.txn_manager().remove_active_read_transaction(txn);
{
if let Some(true) = self.env.txn_manager().is_aborted_read_transaction(txn)
{
return
}

self.env.txn_manager().remove_active_read_transaction(txn);
}

unsafe {
ffi::mdbx_txn_abort(txn);
}
} else {
let (sender, rx) = sync_channel(0);
self.env
.txn_manager()
.send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
rx.recv().unwrap().unwrap();
self.env.txn_manager().send_message(TxnManagerMessage::Abort {
tx: TxnPtr(txn),
flags: K::OPEN_FLAGS,
sender,
});
rx.recv().unwrap().unwrap().unwrap();
}
}
})
Expand Down
119 changes: 91 additions & 28 deletions crates/storage/libmdbx-rs/src/txn_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
environment::EnvPtr,
error::{mdbx_result, Result},
CommitLatency,
CommitLatency, Error,
};
use std::{
ptr,
Expand All @@ -15,15 +15,19 @@ unsafe impl Sync for TxnPtr {}

pub(crate) enum TxnManagerMessage {
Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
Abort { tx: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<Result<bool>>> },
Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
}

/// Manages transactions by doing two things:
/// - Opening, aborting, and committing transactions using [TxnManager::send_message] with the
/// corresponding [TxnManagerMessage]
/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
/// `TxnManager::with_max_read_transaction_duration` is called)
/// - Opening, aborting, and committing transactions using [`TxnManager::send_message`] with the
/// corresponding [TxnManagerMessage]. This is mainly since MDBX requires that write transactions
/// are opened from the same thread.
/// - API for tracking active and aborted long-lived read transactions (if the `read-tx-timeouts`
/// feature is enabled and [`TxnManager::new_with_max_read_transaction_duration`] constructor is
/// used).
/// - Aborting long-lived read transactions in the background (if the `read-tx-timeouts` feature is
/// enabled and [`TxnManager::new_with_max_read_transaction_duration`] constructor is used).
#[derive(Debug)]
pub(crate) struct TxnManager {
sender: SyncSender<TxnManagerMessage>,
Expand Down Expand Up @@ -54,6 +58,8 @@ impl TxnManager {
fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
#[cfg(feature = "read-tx-timeouts")]
let read_transactions = self.read_transactions.clone();
#[cfg(feature = "read-tx-timeouts")]
use crate::transaction::TransactionKind;

std::thread::spawn(move || {
#[allow(clippy::redundant_locals)]
Expand All @@ -76,8 +82,6 @@ impl TxnManager {

#[cfg(feature = "read-tx-timeouts")]
{
use crate::transaction::TransactionKind;

if res.is_ok() && flags == crate::transaction::RO::OPEN_FLAGS {
if let Some(read_transactions) = &read_transactions {
read_transactions.add_active(txn);
Expand All @@ -87,13 +91,25 @@ impl TxnManager {

sender.send(res).unwrap();
}
TxnManagerMessage::Abort { tx, sender } => {
TxnManagerMessage::Abort { tx, flags, sender } => {
#[cfg(feature = "read-tx-timeouts")]
if let Some(read_transactions) = &read_transactions {
read_transactions.remove_active(tx.0);
{
if flags == crate::transaction::RO::OPEN_FLAGS {
if let Some(read_transactions) = &read_transactions {
if read_transactions.is_aborted(tx.0) {
sender
.send(Err(Error::ReadTransactionAborted))
.unwrap();
continue
}

read_transactions.remove_active(tx.0);
}
}
}

sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
let res = mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) });
sender.send(Ok(res)).unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
#[cfg(feature = "read-tx-timeouts")]
Expand Down Expand Up @@ -125,7 +141,10 @@ impl TxnManager {

#[cfg(feature = "read-tx-timeouts")]
mod read_transactions {
use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error};
use crate::{
environment::EnvPtr, error::mdbx_result, metrics::TxnManagerMetrics,
txn_manager::TxnManager, Error,
};
use dashmap::{DashMap, DashSet};
use std::{
sync::{mpsc::sync_channel, Arc},
Expand Down Expand Up @@ -169,6 +188,11 @@ mod read_transactions {
self.read_transactions.as_ref()?.remove_active(ptr)
}

/// Returns `Some(true)` if a transaction has already been aborted due to timeout.
pub(crate) fn is_aborted_read_transaction(&self, ptr: *mut ffi::MDBX_txn) -> Option<bool> {
Some(self.read_transactions.as_ref()?.is_aborted(ptr))
}

/// Removes a transaction from the list of aborted read transactions.
pub(crate) fn remove_aborted_read_transaction(
&self,
Expand All @@ -188,12 +212,14 @@ mod read_transactions {
/// We store `usize` instead of a raw pointer as a key, because pointers are not
/// comparable. The time of transaction opening is stored as a value.
active: DashMap<usize, Instant>,
/// List of read transactions aborted by the [ReadTransactions::start_monitor].
/// We keep them until user tries to abort the transaction, so we're able to report a nice
/// List of read transactions aborted by the [ReadTransactions::start_monitor]. We keep
/// them until the user tries to reuse, so we're able to report a nice
/// [Error::ReadTransactionAborted] error.
///
/// We store `usize` instead of a raw pointer, because pointers are not comparable.
aborted: DashSet<usize>,
#[doc(hidden)]
metrics: TxnManagerMetrics,
}

impl ReadTransactions {
Expand All @@ -211,9 +237,15 @@ mod read_transactions {
self.active.remove(&(ptr as usize))
}

/// Adds a new transaction to the list of aborted read transactions.
pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) {
self.aborted.insert(ptr as usize);
/// Adds a new transaction to the set of aborted read transactions. Returns `true` if the
/// transaction isn't already in the set.
fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) -> bool {
self.aborted.insert(ptr as usize)
}

/// Returns `true` if the transaction is in set of aborted transactions.
pub(super) fn is_aborted(&self, ptr: *mut ffi::MDBX_txn) -> bool {
self.aborted.contains(&(ptr as usize))
}

/// Removes a transaction from the list of aborted read transactions.
Expand All @@ -234,6 +266,7 @@ mod read_transactions {

loop {
let now = Instant::now();

let mut max_active_transaction_duration = None;

// Iterate through active read transactions and abort those that's open for
Expand All @@ -247,13 +280,13 @@ mod read_transactions {

// Add the transaction to the list of aborted transactions, so further
// usages report the correct error when the transaction is closed.
self.add_aborted(ptr);
_ = self.add_aborted(ptr);

// Abort the transaction
let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) });

// Add the transaction to `aborted_active`. We can't remove it instantly
// from the list of active transactions, because we iterate through it.
// Add the transaction to `aborted_active`. We can't remove it
// instantly from the list of active transactions, because we iterate
// through it.
aborted_active.push((ptr, duration, result.err()));
} else {
max_active_transaction_duration = Some(
Expand All @@ -268,13 +301,9 @@ mod read_transactions {
// Try deleting the transaction from the list of active transactions.
let was_in_active = self.remove_active(ptr).is_some();
if let Some(err) = err {
// If there was an error when aborting the transaction, we need to
// remove it from the list of aborted transactions, because otherwise it
// will stay there forever.
self.remove_aborted(ptr);
if was_in_active && err != Error::BadSignature {
// If the transaction was in the list of active transactions and the
// error code is not `EBADSIGN`, then user didn't abort it.
// If the transaction was in the list of active transactions and
// the user didn't abort it.
error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transactions");
}
} else {
Expand All @@ -300,6 +329,8 @@ mod read_transactions {
);
}

self.update_monitor_metrics();

// Sleep not more than `READ_TRANSACTIONS_CHECK_INTERVAL`, but at least until
// the closest deadline of an active read transaction
let duration_until_closest_deadline =
Expand All @@ -310,6 +341,10 @@ mod read_transactions {
}
});
}

fn update_monitor_metrics(&self) {
self.metrics.aborted.set(self.aborted.len() as f64)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -452,5 +487,33 @@ mod read_transactions {
assert!(!read_transactions.aborted.contains(&tx_ptr));
}
}

#[test]
fn txn_manager_abort_timed_out_transaction() {
const MAX_DURATION: Duration = Duration::from_secs(1);

let dir = tempdir().unwrap();
let env = Environment::builder()
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
.open(dir.path())
.unwrap();

assert!(env.txn_manager().read_transactions.is_some());

// Create a ro transaction. Abort it by letting it time out.
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = TxnPtr(tx.txn());

sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);

// Attempt to abort again throws error.
let (sender, rx) = sync_channel(0);
env.txn_manager().send_message(TxnManagerMessage::Abort {
tx: tx_ptr,
flags: RO::OPEN_FLAGS,
sender,
});
assert_eq!(Err(Error::ReadTransactionAborted), rx.recv().unwrap());
}
}
}
Loading