Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): race condition mdbx abort tx #6798

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ece6be5
Fix bug, race condition in MDBX bindings
emhane Feb 24, 2024
91bbeff
Use dashmap dep unconditionally
emhane Feb 26, 2024
e049933
Test abort rw transaction
emhane Feb 26, 2024
4ee6b0e
Test abort ro transaction
emhane Feb 26, 2024
a80094f
Move rw tests out of read timeouts module
emhane Feb 26, 2024
1def7ee
fixup! Move rw tests out of read timeouts module
emhane Feb 26, 2024
7490bde
Don't treat ReadTransactionAborted (already aborted) as an error to a…
emhane Feb 26, 2024
f1d211a
Add flags to TxnManagerMessage::Abort
emhane Feb 26, 2024
524cf49
Merge branch 'main' into emhane/race-condition-mdbx
emhane Feb 26, 2024
137ad26
Revert changes for abort write transaction
emhane Feb 27, 2024
b2636fc
Clean up abort ro timeout semantics
emhane Feb 27, 2024
63575ee
fixup! Revert changes for abort write transaction
emhane Feb 27, 2024
b5a7ee4
Add comment todo
emhane Feb 27, 2024
b74f818
Remove tests for rw txns that cannot be aborted in bg
emhane Feb 27, 2024
b46fdfd
Fix aborted ro tests
emhane Feb 27, 2024
6a91342
fixup! Revert changes for abort write transaction
emhane Feb 27, 2024
17055ff
Fix semantics for abort error on two layers: tx manager or mdbx
emhane Feb 27, 2024
7e1a884
Fix semantics
emhane Feb 27, 2024
14426b2
Update docs
emhane Feb 27, 2024
c732514
fixup! Fix semantics for abort error on two layers: tx manager or mdbx
emhane Feb 27, 2024
5b276c4
fixup! fixup! Fix semantics for abort error on two layers: tx manager…
emhane Feb 27, 2024
4f814ad
Merge interfaces aborted by user and aborted by bg thread
emhane Feb 27, 2024
a91af5b
Replace evict aborted after timeout with metrics for length of aborte…
emhane Feb 27, 2024
002e354
fixup! Replace evict aborted after timeout with metrics for length of…
emhane Feb 27, 2024
a54ced4
Merge branch 'main' into emhane/race-condition-mdbx
emhane Feb 27, 2024
a361128
Fix merge conflicts with main
emhane Feb 27, 2024
7886731
Only add to aborted set if aborted on timeout
emhane Feb 28, 2024
01f2af0
Add test for mdbx reassing txn ptr
emhane Feb 28, 2024
0cbeb37
Revert changes to manifest
emhane Feb 28, 2024
c38f8aa
Merge branch 'main' into emhane/race-condition-mdbx
emhane Feb 28, 2024
d939d39
Fix lint
emhane Feb 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/storage/libmdbx-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ indexmap = "2"
libc = "0.2"
parking_lot.workspace = true
thiserror.workspace = true
dashmap = { version = "5.5.3", features = ["inline"], optional = true }
dashmap = { version = "5.5.3", features = ["inline"] }
tracing = { workspace = true, optional = true }

ffi = { package = "reth-mdbx-sys", path = "./mdbx-sys" }
Expand All @@ -33,7 +33,7 @@ libffi = "3.2.0"
[features]
default = []
return-borrowed = []
read-tx-timeouts = ["dashmap", "dashmap/inline", "tracing"]
read-tx-timeouts = ["tracing"]
emhane marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] }
Expand Down
9 changes: 5 additions & 4 deletions crates/storage/libmdbx-rs/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,16 +705,17 @@ impl EnvironmentBuilder {

#[cfg(feature = "read-tx-timeouts")]
let txn_manager = {
let mut txn_manager = TxnManager::new(EnvPtr(env));
let env_ptr = EnvPtr(env);
if let crate::MaxReadTransactionDuration::Set(duration) = self
.max_read_transaction_duration
.unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
DEFAULT_MAX_READ_TRANSACTION_DURATION,
))
{
txn_manager = txn_manager.with_max_read_transaction_duration(duration);
};
txn_manager
TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
} else {
TxnManager::new(env_ptr)
}
};

let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
Expand Down
6 changes: 5 additions & 1 deletion crates/storage/libmdbx-rs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ pub enum Error {
WriteTransactionUnsupportedInReadOnlyMode,
#[error("read transaction has been aborted by the transaction manager")]
ReadTransactionAborted,
#[error("write transaction has been aborted by the transaction manager")]
RWTransactionAborted,
/// Unknown error code.
#[error("unknown error code")]
Other(i32),
Expand Down Expand Up @@ -193,7 +195,9 @@ impl Error {
Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL,
Error::Access => ffi::MDBX_EACCESS,
Error::TooLarge => ffi::MDBX_TOO_LARGE,
Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN,
Error::BadSignature | Error::ReadTransactionAborted | Error::RWTransactionAborted => {
ffi::MDBX_EBADSIGN
}
Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS,
Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
Error::Other(err_code) => *err_code,
Expand Down
21 changes: 16 additions & 5 deletions crates/storage/libmdbx-rs/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,28 @@ where
if !self.has_committed() {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env.txn_manager().remove_active_read_transaction(txn);
{
self.env.txn_manager().remove_active_read_transaction(txn);

let new_aborted_ro =
self.env.txn_manager().add_aborted_read_transaction(txn);
if let Some(false) = new_aborted_ro {
return
}
}

unsafe {
ffi::mdbx_txn_abort(txn);
}
} else {
let (sender, rx) = sync_channel(0);
self.env
.txn_manager()
.send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
rx.recv().unwrap().unwrap();
self.env.txn_manager().send_message(TxnManagerMessage::Abort {
tx: TxnPtr(txn),
flags: K::OPEN_FLAGS,
sender,
});
let res = rx.recv().unwrap();
assert!(res == Err(Error::RWTransactionAborted) || res.is_ok())
}
}
})
Expand Down
Loading
Loading