Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): use mdbx_txn_reset to time out transactions #6850

Merged
merged 22 commits into from Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/node-core/src/metrics/prometheus_exporter.rs
Expand Up @@ -102,6 +102,10 @@ where
describe_gauge!("db.table_pages", "The number of database pages for a table");
describe_gauge!("db.table_entries", "The number of entries for a table");
describe_gauge!("db.freelist", "The number of pages on the freelist");
describe_gauge!(
"db.timed_out_not_aborted_transactions",
"Number of timed out transactions that were not aborted by the user yet"
);
process.describe();
describe_memory_stats();
describe_io_stats();
Expand Down
16 changes: 12 additions & 4 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Expand Up @@ -123,17 +123,19 @@ impl Database for DatabaseEnv {
type TXMut = tx::Tx<RW>;

fn tx(&self) -> Result<Self::TX, DatabaseError> {
Ok(Tx::new_with_metrics(
Tx::new_with_metrics(
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.metrics.as_ref().cloned(),
))
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}

fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
Ok(Tx::new_with_metrics(
Tx::new_with_metrics(
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
self.metrics.as_ref().cloned(),
))
)
.map_err(|e| DatabaseError::InitTx(e.into()))
}
}

Expand Down Expand Up @@ -202,6 +204,12 @@ impl DatabaseMetrics for DatabaseEnv {
metrics.push(("db.freelist", freelist as f64, vec![]));
}

metrics.push((
"db.timed_out_not_aborted_transactions",
self.timed_out_not_aborted_transactions() as f64,
vec![],
));

metrics
}
}
Expand Down
24 changes: 13 additions & 11 deletions crates/storage/db/src/implementation/mdbx/tx.rs
Expand Up @@ -53,14 +53,16 @@ impl<K: TransactionKind> Tx<K> {
pub fn new_with_metrics(
inner: Transaction<K>,
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
) -> Self {
let metrics_handler = env_metrics.map(|env_metrics| {
let handler = MetricsHandler::<K>::new(inner.id(), env_metrics);
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
handler.log_transaction_opened();
handler
});
Self::new_inner(inner, metrics_handler)
) -> reth_libmdbx::Result<Self> {
let metrics_handler = env_metrics
.map(|env_metrics| {
let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
handler.log_transaction_opened();
Ok(handler)
})
.transpose()?;
Ok(Self::new_inner(inner, metrics_handler))
}

#[inline]
Expand All @@ -76,8 +78,8 @@ impl<K: TransactionKind> Tx<K> {
}

/// Gets this transaction ID.
pub fn id(&self) -> u64 {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id)
pub fn id(&self) -> reth_libmdbx::Result<u64> {
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
}

/// Gets a table database handle if it exists, otherwise creates it.
Expand Down Expand Up @@ -437,7 +439,7 @@ mod tests {

assert_eq!(
tx.get::<tables::Transactions>(0).err(),
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.into()))
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
); // Transaction is timeout-ed
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
// Backtrace is recorded
Expand Down
26 changes: 14 additions & 12 deletions crates/storage/libmdbx-rs/benches/cursor.rs
Expand Up @@ -78,27 +78,29 @@ fn bench_get_seq_raw(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);

let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi();
let _txn = env.begin_ro_txn().unwrap();
let txn = _txn.txn();
let txn = env.begin_ro_txn().unwrap();

let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let mut cursor: *mut MDBX_cursor = ptr::null_mut();

c.bench_function("bench_get_seq_raw", |b| {
b.iter(|| unsafe {
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;
txn.txn_execute(|txn| {
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;

while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}

black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
})
.unwrap();
})
});
}
Expand Down
5 changes: 3 additions & 2 deletions crates/storage/libmdbx-rs/benches/transaction.rs
Expand Up @@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {

c.bench_function("bench_get_rand_raw", |b| {
b.iter(|| unsafe {
txn.with_raw_tx_ptr(|txn| {
txn.txn_execute(|txn| {
let mut i: size_t = 0;
for key in &keys {
key_val.iov_len = key.len() as size_t;
Expand All @@ -57,7 +57,8 @@ fn bench_get_rand_raw(c: &mut Criterion) {
i += key_val.iov_len;
}
black_box(i);
});
})
.unwrap();
})
});
}
Expand Down
79 changes: 43 additions & 36 deletions crates/storage/libmdbx-rs/src/cursor.rs
@@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result},
error::{mdbx_result, Error, Result},
flags::*,
mdbx_try_optional,
transaction::{TransactionKind, RW},
Expand Down Expand Up @@ -30,26 +30,26 @@ where
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
unsafe {
mdbx_result_with_tx_kind::<K>(
txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)),
txn.txn(),
txn.env().txn_manager(),
)?;
txn.txn_execute(|txn_ptr| {
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
})??;
}
Ok(Self { txn, cursor })
}

fn new_at_position(other: &Self) -> Result<Self> {
unsafe {
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());
other.txn.txn_execute(|_| {
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());

let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);

let s = Self { txn: other.txn.clone(), cursor };
let s = Self { txn: other.txn.clone(), cursor };

mdbx_result_with_tx_kind::<K>(res, s.txn.txn(), s.txn.env().txn_manager())?;
mdbx_result(res)?;

Ok(s)
Ok(s)
})?
}
}

Expand Down Expand Up @@ -95,11 +95,12 @@ where
let key_ptr = key_val.iov_base;
let data_ptr = data_val.iov_base;
self.txn.txn_execute(|txn| {
let v = mdbx_result_with_tx_kind::<K>(
ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op),
txn,
self.txn.env().txn_manager(),
)?;
let v = mdbx_result(ffi::mdbx_cursor_get(
self.cursor,
&mut key_val,
&mut data_val,
op,
))?;
assert_ne!(data_ptr, data_val.iov_base);
let key_out = {
// MDBX wrote in new key
Expand All @@ -111,7 +112,7 @@ where
};
let data_out = Value::decode_val::<K>(txn, data_val)?;
Ok((key_out, data_out, v))
})
})?
}
}

Expand Down Expand Up @@ -444,7 +445,7 @@ impl Cursor<RW> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| {
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
})
})?
})?;

Ok(())
Expand All @@ -458,7 +459,7 @@ impl Cursor<RW> {
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))?
})?;

Ok(())
Expand All @@ -470,7 +471,7 @@ where
K: TransactionKind,
{
fn clone(&self) -> Self {
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap())
Self::new_at_position(self).unwrap()
}
}

Expand All @@ -488,7 +489,7 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) });
}
}

Expand Down Expand Up @@ -564,7 +565,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
cursor.txn.txn_execute(|txn| {
let result = cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
Expand All @@ -583,7 +584,11 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
})
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
}
}
Self::Err(err) => err.take().map(Err),
Expand Down Expand Up @@ -655,7 +660,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
cursor.txn.txn_execute(|txn| {
let result = cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
Expand All @@ -674,7 +679,11 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
})
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
}
}
Iter::Err(err) => err.take().map(Err),
Expand Down Expand Up @@ -752,17 +761,15 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);

cursor.txn.txn_execute(|_| {
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };

(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
})
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };

(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
})
}
IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))),
Expand Down
10 changes: 3 additions & 7 deletions crates/storage/libmdbx-rs/src/database.rs
@@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result_with_tx_kind, Result},
error::{mdbx_result, Result},
transaction::TransactionKind,
Environment, Transaction,
};
Expand Down Expand Up @@ -31,12 +31,8 @@ impl Database {
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
let mut dbi: ffi::MDBX_dbi = 0;
txn.txn_execute(|txn_ptr| {
mdbx_result_with_tx_kind::<K>(
unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) },
txn_ptr,
txn.env().txn_manager(),
)
})?;
mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) })
})??;
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
}

Expand Down
6 changes: 6 additions & 0 deletions crates/storage/libmdbx-rs/src/environment.rs
Expand Up @@ -88,6 +88,12 @@ impl Environment {
&self.inner.txn_manager
}

/// Returns the number of timed out transactions that were not aborted by the user yet.
#[cfg(feature = "read-tx-timeouts")]
pub fn timed_out_not_aborted_transactions(&self) -> usize {
self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
}

/// Create a read-only transaction for use with the environment.
#[inline]
pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
Expand Down