From 5250abf9f467e1110ff34d725be2bc193820c273 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 16:21:37 -0400 Subject: [PATCH 1/5] feat: add begin_ro_sync_multi and begin_ro_unsync_multi to Environment Open N read-only transactions guaranteed to share the same MVCC snapshot, enabling safe parallel iteration over large tables. Uses an optimistic open-and-verify loop with a hardcoded 16-retry limit, returning MdbxError::SnapshotDivergence if retries are exhausted. ENG-2135 Co-Authored-By: Claude Opus 4.6 (1M context) --- src/error.rs | 7 +++ src/sys/environment.rs | 86 +++++++++++++++++++++++++++++ tests/multi_ro.rs | 120 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 213 insertions(+) create mode 100644 tests/multi_ro.rs diff --git a/src/error.rs b/src/error.rs index a640414..edc2c98 100644 --- a/src/error.rs +++ b/src/error.rs @@ -185,6 +185,12 @@ pub enum MdbxError { /// Operation requires DUP_FIXED flag on database. #[error("operation requires DUP_FIXED flag on database")] RequiresDupFixed, + /// Failed to open multiple read transactions on the same MVCC snapshot. + /// + /// Concurrent write pressure prevented acquiring a consistent set of + /// read transactions within the retry limit. + #[error("failed to acquire consistent MVCC snapshot across multiple read transactions")] + SnapshotDivergence, } impl MdbxError { @@ -263,6 +269,7 @@ impl MdbxError { Self::BotchedTransaction => -96001, Self::RequiresDupSort => -96002, Self::RequiresDupFixed => -96003, + Self::SnapshotDivergence => -96004, Self::Permission => ffi::MDBX_EPERM, Self::Other(err_code) => *err_code, } diff --git a/src/sys/environment.rs b/src/sys/environment.rs index 7ce6926..7f463c8 100644 --- a/src/sys/environment.rs +++ b/src/sys/environment.rs @@ -116,6 +116,92 @@ impl Environment { RwTxUnsync::begin(self.clone()) } + /// Open `n` read-only synchronized transactions guaranteed to share the + /// same MVCC snapshot. 
+ /// + /// This enables safe parallel iteration over large tables using multiple + /// cursors on separate threads without risking snapshot divergence. + /// + /// Uses an optimistic open-and-verify loop: transactions are opened + /// sequentially, then their snapshot IDs are compared. If a writer + /// commits between opens causing divergence, all transactions are + /// dropped and the process retries. Returns + /// [`MdbxError::SnapshotDivergence`] if retries are exhausted. + /// + /// # Examples + /// + /// ```no_run + /// # use signet_libmdbx::{Environment, Geometry, MdbxResult}; + /// # use std::path::Path; + /// # fn main() -> MdbxResult<()> { + /// let env = Environment::builder() + /// .set_geometry(Geometry { + /// size: Some(0..(1024 * 1024 * 1024)), + /// ..Default::default() + /// }) + /// .open(Path::new("/tmp/my_database"))?; + /// + /// // Open 4 read transactions on the same snapshot + /// let txns = env.begin_ro_sync_multi(4)?; + /// // All transactions see the same data + /// # Ok(()) + /// # } + /// ``` + pub fn begin_ro_sync_multi(&self, n: usize) -> MdbxResult> { + self.begin_ro_multi(n, Self::begin_ro_sync, |tx| tx.id()) + } + + /// Open `n` read-only unsynchronized transactions guaranteed to share the + /// same MVCC snapshot. + /// + /// This is the `!Sync` counterpart to [`begin_ro_sync_multi`]. The + /// returned transactions cannot be shared between threads, but offer + /// ~30% lower overhead per operation. + /// + /// See [`begin_ro_sync_multi`] for details on the optimistic retry + /// behavior. + /// + /// [`begin_ro_sync_multi`]: Self::begin_ro_sync_multi + pub fn begin_ro_unsync_multi(&self, n: usize) -> MdbxResult> { + self.begin_ro_multi(n, Self::begin_ro_unsync, |tx| tx.id()) + } + + /// Maximum retry count for the optimistic snapshot-matching loop in + /// [`begin_ro_sync_multi`] and [`begin_ro_unsync_multi`]. 
+ const MAX_MULTI_RETRIES: usize = 16; + + /// Open `n` read-only transactions guaranteed to share the same MVCC + /// snapshot. + /// + /// This is the generic implementation backing both + /// [`begin_ro_sync_multi`] and [`begin_ro_unsync_multi`]. + /// + /// [`begin_ro_sync_multi`]: Self::begin_ro_sync_multi + /// [`begin_ro_unsync_multi`]: Self::begin_ro_unsync_multi + fn begin_ro_multi( + &self, + n: usize, + begin: fn(&Self) -> MdbxResult, + id: fn(&T) -> MdbxResult, + ) -> MdbxResult> { + if n <= 1 { + return (n == 1) + .then(|| begin(self)) + .map_or_else(|| Ok(Vec::new()), |r| r.map(|t| vec![t])); + } + + for _ in 0..Self::MAX_MULTI_RETRIES { + let txns: Vec = (0..n).map(|_| begin(self)).collect::>()?; + + let first_id = id(&txns[0])?; + if txns[1..].iter().all(|tx| id(tx) == Ok(first_id)) { + return Ok(txns); + } + } + + Err(MdbxError::SnapshotDivergence) + } + /// Returns a raw pointer to the underlying MDBX environment. /// /// The caller **must** ensure that the pointer is never dereferenced after the environment has diff --git a/tests/multi_ro.rs b/tests/multi_ro.rs new file mode 100644 index 0000000..c8b6b00 --- /dev/null +++ b/tests/multi_ro.rs @@ -0,0 +1,120 @@ +#![allow(missing_docs)] +use signet_libmdbx::*; +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + thread, +}; +use tempfile::tempdir; + +#[test] +fn begin_ro_sync_multi_all_same_snapshot() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = env.begin_rw_sync().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"k", b"v", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txns = env.begin_ro_sync_multi(4).unwrap(); + assert_eq!(txns.len(), 4); + + let ids: Vec = txns.iter().map(|tx| tx.id().unwrap()).collect(); + assert!(ids.windows(2).all(|w| w[0] == w[1])); +} + +#[test] +fn begin_ro_unsync_multi_all_same_snapshot() { + let dir = tempdir().unwrap(); + let env = 
Environment::builder().open(dir.path()).unwrap(); + + let txn = env.begin_rw_sync().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"k", b"v", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txns = env.begin_ro_unsync_multi(4).unwrap(); + assert_eq!(txns.len(), 4); + + let ids: Vec = txns.iter().map(|tx| tx.id().unwrap()).collect(); + assert!(ids.windows(2).all(|w| w[0] == w[1])); +} + +#[test] +fn begin_ro_sync_multi_zero_returns_empty() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_sync_multi(0).unwrap(); + assert!(txns.is_empty()); +} + +#[test] +fn begin_ro_unsync_multi_zero_returns_empty() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_unsync_multi(0).unwrap(); + assert!(txns.is_empty()); +} + +#[test] +fn begin_ro_sync_multi_one_returns_single() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_sync_multi(1).unwrap(); + assert_eq!(txns.len(), 1); +} + +#[test] +fn begin_ro_unsync_multi_one_returns_single() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_unsync_multi(1).unwrap(); + assert_eq!(txns.len(), 1); +} + +#[test] +fn begin_ro_sync_multi_consistent_under_write_pressure() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + // Seed the database + let txn = env.begin_rw_sync().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"k", b"v", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let stop = Arc::new(AtomicBool::new(false)); + + // Writer thread: commit as fast as possible to create snapshot churn + let writer = { + let env = env.clone(); + let stop = Arc::clone(&stop); + thread::spawn(move || { + 
let mut i = 0u64; + while !stop.load(Ordering::Relaxed) { + let txn = env.begin_rw_sync().unwrap(); + let db = txn.open_db(None).unwrap(); + txn.put(db, b"counter", i.to_le_bytes(), WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + i += 1; + } + }) + }; + + // Open multi-txn sets concurrently with writer + for _ in 0..20 { + let txns = env.begin_ro_sync_multi(4).unwrap(); + let ids: Vec = txns.iter().map(|tx| tx.id().unwrap()).collect(); + assert!(ids.windows(2).all(|w| w[0] == w[1]), "snapshot divergence detected: {ids:?}"); + } + + stop.store(true, Ordering::Relaxed); + writer.join().unwrap(); +} From 207bb88fd78b72c255bd1365edf268d124f8f943 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 16:33:47 -0400 Subject: [PATCH 2/5] fix: resolve rustdoc links on MAX_MULTI_RETRIES constant Co-Authored-By: Claude Opus 4.6 (1M context) --- src/sys/environment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sys/environment.rs b/src/sys/environment.rs index 7f463c8..dea32ab 100644 --- a/src/sys/environment.rs +++ b/src/sys/environment.rs @@ -167,7 +167,7 @@ impl Environment { } /// Maximum retry count for the optimistic snapshot-matching loop in - /// [`begin_ro_sync_multi`] and [`begin_ro_unsync_multi`]. + /// [`Self::begin_ro_sync_multi`] and [`Self::begin_ro_unsync_multi`]. const MAX_MULTI_RETRIES: usize = 16; /// Open `n` read-only transactions guaranteed to share the same MVCC From be39cb3bac3f6d9d7f1e994263c0f3377f1dfe37 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 12:04:56 -0400 Subject: [PATCH 3/5] refactor: move MAX_MULTI_RETRIES to module level, split n<=1 cases Address PR feedback: move the constant to the top of the file with other module-level items, and replace the chained then/map_or_else with explicit n==0 and n==1 branches for readability. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- src/sys/environment.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/sys/environment.rs b/src/sys/environment.rs index dea32ab..b5d6b8e 100644 --- a/src/sys/environment.rs +++ b/src/sys/environment.rs @@ -18,6 +18,11 @@ use std::{ time::Duration, }; +/// Maximum retry count for the optimistic snapshot-matching loop in +/// [`Environment::begin_ro_sync_multi`] and +/// [`Environment::begin_ro_unsync_multi`]. +const MAX_MULTI_RETRIES: usize = 16; + /// An environment supports multiple databases, all residing in the same shared-memory map. /// /// Accessing the environment is thread-safe. @@ -166,10 +171,6 @@ impl Environment { self.begin_ro_multi(n, Self::begin_ro_unsync, |tx| tx.id()) } - /// Maximum retry count for the optimistic snapshot-matching loop in - /// [`Self::begin_ro_sync_multi`] and [`Self::begin_ro_unsync_multi`]. - const MAX_MULTI_RETRIES: usize = 16; - /// Open `n` read-only transactions guaranteed to share the same MVCC /// snapshot. 
/// @@ -184,13 +185,14 @@ impl Environment { begin: fn(&Self) -> MdbxResult, id: fn(&T) -> MdbxResult, ) -> MdbxResult> { - if n <= 1 { - return (n == 1) - .then(|| begin(self)) - .map_or_else(|| Ok(Vec::new()), |r| r.map(|t| vec![t])); + if n == 0 { + return Ok(Vec::new()); + } + if n == 1 { + return begin(self).map(|t| vec![t]); } - for _ in 0..Self::MAX_MULTI_RETRIES { + for _ in 0..MAX_MULTI_RETRIES { let txns: Vec = (0..n).map(|_| begin(self)).collect::>()?; let first_id = id(&txns[0])?; From 2be968f6689e4fc5e3a55abfd79508f72895c1e9 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 12:42:52 -0400 Subject: [PATCH 4/5] refactor: use windows(2) for snapshot ID comparison Co-Authored-By: Claude Opus 4.6 (1M context) --- src/sys/environment.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sys/environment.rs b/src/sys/environment.rs index b5d6b8e..b96de1b 100644 --- a/src/sys/environment.rs +++ b/src/sys/environment.rs @@ -195,8 +195,7 @@ impl Environment { for _ in 0..MAX_MULTI_RETRIES { let txns: Vec = (0..n).map(|_| begin(self)).collect::>()?; - let first_id = id(&txns[0])?; - if txns[1..].iter().all(|tx| id(tx) == Ok(first_id)) { + if txns.windows(2).all(|w| id(&w[0]) == id(&w[1])) { return Ok(txns); } } From d1cb912c21c16c9d502624e23e556b3fa8f7184c Mon Sep 17 00:00:00 2001 From: James Date: Fri, 10 Apr 2026 11:05:24 -0400 Subject: [PATCH 5/5] fix: handle mdbx_txn_id returning 0 as BadTxn error The C function mdbx_txn_id returns 0 for invalid transactions (null pointer or bad signature). Tx::id() now returns Err(BadTxn) instead of silently returning Ok(0). Also fixes begin_ro_multi snapshot comparison to propagate id errors rather than treating matching Err values as a valid snapshot match. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- src/sys/environment.rs | 4 +++- src/tx/impl.rs | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/sys/environment.rs b/src/sys/environment.rs index b96de1b..21a4a0c 100644 --- a/src/sys/environment.rs +++ b/src/sys/environment.rs @@ -195,7 +195,9 @@ impl Environment { for _ in 0..MAX_MULTI_RETRIES { let txns: Vec = (0..n).map(|_| begin(self)).collect::>()?; - if txns.windows(2).all(|w| id(&w[0]) == id(&w[1])) { + // Read every id up front so a failing `id` call is returned to the caller; only a real mismatch retries. + let ids = txns.iter().map(id).collect::<MdbxResult<Vec<_>>>()?; + if ids.windows(2).all(|w| w[0] == w[1]) { return Ok(txns); } } diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 70e3513..c8839f2 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -135,7 +135,10 @@ where /// Returns the transaction id. #[inline(always)] pub fn id(&self) -> MdbxResult { - self.with_txn_ptr(|txn_ptr| Ok(unsafe { ffi::mdbx_txn_id(txn_ptr) })) + self.with_txn_ptr(|txn_ptr| { + let id = unsafe { ffi::mdbx_txn_id(txn_ptr) }; + if id == 0 { Err(MdbxError::BadTxn) } else { Ok(id) } + }) } /// Gets an item from a database.