diff --git a/src/error.rs b/src/error.rs index a640414..edc2c98 100644 --- a/src/error.rs +++ b/src/error.rs @@ -185,6 +185,12 @@ pub enum MdbxError { /// Operation requires DUP_FIXED flag on database. #[error("operation requires DUP_FIXED flag on database")] RequiresDupFixed, + /// Failed to open multiple read transactions on the same MVCC snapshot. + /// + /// Concurrent write pressure prevented acquiring a consistent set of + /// read transactions within the retry limit. + #[error("failed to acquire consistent MVCC snapshot across multiple read transactions")] + SnapshotDivergence, } impl MdbxError { @@ -263,6 +269,7 @@ impl MdbxError { Self::BotchedTransaction => -96001, Self::RequiresDupSort => -96002, Self::RequiresDupFixed => -96003, + Self::SnapshotDivergence => -96004, Self::Permission => ffi::MDBX_EPERM, Self::Other(err_code) => *err_code, } diff --git a/src/sys/environment.rs b/src/sys/environment.rs index 7ce6926..21a4a0c 100644 --- a/src/sys/environment.rs +++ b/src/sys/environment.rs @@ -18,6 +18,11 @@ use std::{ time::Duration, }; +/// Maximum retry count for the optimistic snapshot-matching loop in +/// [`Environment::begin_ro_sync_multi`] and +/// [`Environment::begin_ro_unsync_multi`]. +const MAX_MULTI_RETRIES: usize = 16; + /// An environment supports multiple databases, all residing in the same shared-memory map. /// /// Accessing the environment is thread-safe. @@ -116,6 +121,89 @@ impl Environment { RwTxUnsync::begin(self.clone()) } + /// Open `n` read-only synchronized transactions guaranteed to share the + /// same MVCC snapshot. + /// + /// This enables safe parallel iteration over large tables using multiple + /// cursors on separate threads without risking snapshot divergence. + /// + /// Uses an optimistic open-and-verify loop: transactions are opened + /// sequentially, then their snapshot IDs are compared. If a writer + /// commits between opens causing divergence, all transactions are + /// dropped and the process retries. Returns + /// [`MdbxError::SnapshotDivergence`] if retries are exhausted. + /// + /// # Examples + /// + /// ```no_run + /// # use signet_libmdbx::{Environment, Geometry, MdbxResult}; + /// # use std::path::Path; + /// # fn main() -> MdbxResult<()> { + /// let env = Environment::builder() + /// .set_geometry(Geometry { + /// size: Some(0..(1024 * 1024 * 1024)), + /// ..Default::default() + /// }) + /// .open(Path::new("/tmp/my_database"))?; + /// + /// // Open 4 read transactions on the same snapshot + /// let txns = env.begin_ro_sync_multi(4)?; + /// // All transactions see the same data + /// # Ok(()) + /// # } + /// ``` + pub fn begin_ro_sync_multi(&self, n: usize) -> MdbxResult> { + self.begin_ro_multi(n, Self::begin_ro_sync, |tx| tx.id()) + } + + /// Open `n` read-only unsynchronized transactions guaranteed to share the + /// same MVCC snapshot. + /// + /// This is the `!Sync` counterpart to [`begin_ro_sync_multi`]. The + /// returned transactions cannot be shared between threads, but offer + /// ~30% lower overhead per operation. + /// + /// See [`begin_ro_sync_multi`] for details on the optimistic retry + /// behavior. + /// + /// [`begin_ro_sync_multi`]: Self::begin_ro_sync_multi + pub fn begin_ro_unsync_multi(&self, n: usize) -> MdbxResult> { + self.begin_ro_multi(n, Self::begin_ro_unsync, |tx| tx.id()) + } + + /// Open `n` read-only transactions guaranteed to share the same MVCC + /// snapshot. + /// + /// This is the generic implementation backing both + /// [`begin_ro_sync_multi`] and [`begin_ro_unsync_multi`]. + /// + /// [`begin_ro_sync_multi`]: Self::begin_ro_sync_multi + /// [`begin_ro_unsync_multi`]: Self::begin_ro_unsync_multi + fn begin_ro_multi( + &self, + n: usize, + begin: fn(&Self) -> MdbxResult, + id: fn(&T) -> MdbxResult, + ) -> MdbxResult> { + if n == 0 { + return Ok(Vec::new()); + } + if n == 1 { + return begin(self).map(|t| vec![t]); + } + + for _ in 0..MAX_MULTI_RETRIES { + let txns: Vec = (0..n).map(|_| begin(self)).collect::>()?; + + let first = id(&txns[0])?; + if txns[1..].iter().all(|t| id(t) == Ok(first)) { + return Ok(txns); + } + } + + Err(MdbxError::SnapshotDivergence) + } + /// Returns a raw pointer to the underlying MDBX environment. /// /// The caller **must** ensure that the pointer is never dereferenced after the environment has diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 70e3513..c8839f2 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -135,7 +135,10 @@ where /// Returns the transaction id. #[inline(always)] pub fn id(&self) -> MdbxResult { - self.with_txn_ptr(|txn_ptr| Ok(unsafe { ffi::mdbx_txn_id(txn_ptr) })) + self.with_txn_ptr(|txn_ptr| { + let id = unsafe { ffi::mdbx_txn_id(txn_ptr) }; + if id == 0 { Err(MdbxError::BadTxn) } else { Ok(id) } + }) } /// Gets an item from a database. diff --git a/tests/multi_ro.rs b/tests/multi_ro.rs new file mode 100644 index 0000000..c8b6b00 --- /dev/null +++ b/tests/multi_ro.rs @@ -0,0 +1,120 @@ +#![allow(missing_docs)] +use signet_libmdbx::*; +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + thread, +}; +use tempfile::tempdir; + +#[test] +fn begin_ro_sync_multi_all_same_snapshot() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = env.begin_rw_sync().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"k", b"v", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txns = env.begin_ro_sync_multi(4).unwrap(); + assert_eq!(txns.len(), 4); + + let ids: Vec = txns.iter().map(|tx| tx.id().unwrap()).collect(); + assert!(ids.windows(2).all(|w| w[0] == w[1])); +} + +#[test] +fn begin_ro_unsync_multi_all_same_snapshot() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = env.begin_rw_sync().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"k", b"v", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txns = env.begin_ro_unsync_multi(4).unwrap(); + assert_eq!(txns.len(), 4); + + let ids: Vec = txns.iter().map(|tx| tx.id().unwrap()).collect(); + assert!(ids.windows(2).all(|w| w[0] == w[1])); +} + +#[test] +fn begin_ro_sync_multi_zero_returns_empty() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_sync_multi(0).unwrap(); + assert!(txns.is_empty()); +} + +#[test] +fn begin_ro_unsync_multi_zero_returns_empty() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_unsync_multi(0).unwrap(); + assert!(txns.is_empty()); +} + +#[test] +fn begin_ro_sync_multi_one_returns_single() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_sync_multi(1).unwrap(); + assert_eq!(txns.len(), 1); +} + +#[test] +fn begin_ro_unsync_multi_one_returns_single() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txns = env.begin_ro_unsync_multi(1).unwrap(); + assert_eq!(txns.len(), 1); +} + +#[test] +fn begin_ro_sync_multi_consistent_under_write_pressure() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + // Seed the database + let txn = env.begin_rw_sync().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"k", b"v", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let stop = Arc::new(AtomicBool::new(false)); + + // Writer thread: commit as fast as possible to create snapshot churn + let writer = { + let env = env.clone(); + let stop = Arc::clone(&stop); + thread::spawn(move || { + let mut i = 0u64; + while !stop.load(Ordering::Relaxed) { + let txn = env.begin_rw_sync().unwrap(); + let db = txn.open_db(None).unwrap(); + txn.put(db, b"counter", i.to_le_bytes(), WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + i += 1; + } + }) + }; + + // Open multi-txn sets concurrently with writer + for _ in 0..20 { + let txns = env.begin_ro_sync_multi(4).unwrap(); + let ids: Vec = txns.iter().map(|tx| tx.id().unwrap()).collect(); + assert!(ids.windows(2).all(|w| w[0] == w[1]), "snapshot divergence detected: {ids:?}"); + } + + stop.store(true, Ordering::Relaxed); + writer.join().unwrap(); +}