diff --git a/packages/common/metrics/src/providers.rs b/packages/common/metrics/src/providers.rs index f1ffa16f23..aac9a1b092 100644 --- a/packages/common/metrics/src/providers.rs +++ b/packages/common/metrics/src/providers.rs @@ -72,7 +72,7 @@ pub fn set_sampler_ratio(ratio: f64) -> anyhow::Result<()> { } fn resource() -> Resource { - let mut resource = Resource::builder() + let resource = Resource::builder() .with_service_name(rivet_env::service_name()) .with_schema_url( [KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION"))], diff --git a/packages/common/universaldb/src/driver/postgres/database.rs b/packages/common/universaldb/src/driver/postgres/database.rs index 9dc0cb37e2..7cae89e26d 100644 --- a/packages/common/universaldb/src/driver/postgres/database.rs +++ b/packages/common/universaldb/src/driver/postgres/database.rs @@ -1,5 +1,8 @@ use std::{ - sync::{Arc, Mutex}, + sync::{ + Arc, + atomic::{AtomicI32, Ordering}, + }, time::Duration, }; @@ -23,7 +26,7 @@ const GC_INTERVAL: Duration = Duration::from_secs(5); pub struct PostgresDatabaseDriver { pool: Arc, - max_retries: Arc>, + max_retries: AtomicI32, gc_handle: JoinHandle<()>, } @@ -162,7 +165,7 @@ impl PostgresDatabaseDriver { Ok(PostgresDatabaseDriver { pool: Arc::new(pool), - max_retries: Arc::new(Mutex::new(100)), + max_retries: AtomicI32::new(100), gc_handle, }) } @@ -182,7 +185,7 @@ impl DatabaseDriver for PostgresDatabaseDriver { ) -> BoxFut<'a, Result> { Box::pin(async move { let mut maybe_committed = MaybeCommitted(false); - let max_retries = *self.max_retries.lock().unwrap(); + let max_retries = self.max_retries.load(Ordering::SeqCst); for attempt in 0..max_retries { let tx = self.create_trx()?; @@ -227,7 +230,7 @@ impl DatabaseDriver for PostgresDatabaseDriver { fn set_option(&self, opt: DatabaseOption) -> Result<()> { match opt { DatabaseOption::TransactionRetryLimit(limit) => { - *self.max_retries.lock().unwrap() = limit; + self.max_retries.store(limit, Ordering::SeqCst); Ok(()) } } diff --git a/packages/common/universaldb/src/driver/postgres/transaction.rs b/packages/common/universaldb/src/driver/postgres/transaction.rs index 99a3df1e59..39c8da48b9 100644 --- a/packages/common/universaldb/src/driver/postgres/transaction.rs +++ b/packages/common/universaldb/src/driver/postgres/transaction.rs @@ -223,7 +223,6 @@ impl TransactionDriver for PostgresTransactionDriver { let (operations, conflict_ranges) = self.operations.consume(); - // We have operations but no transaction - create one just for commit let tx_sender = self.ensure_transaction().await?; // Send commit command @@ -250,8 +249,8 @@ impl TransactionDriver for PostgresTransactionDriver { self.operations.clear_all(); self.committed.store(false, Ordering::SeqCst); - // Note: We can't reset the transaction once it's created - // The transaction task will continue running + // Replace tx sender to get a new txn version + self.tx_sender = OnceCell::new(); } fn cancel(&self) { diff --git a/packages/common/universaldb/src/driver/rocksdb/conflict_range_tracker.rs b/packages/common/universaldb/src/driver/rocksdb/conflict_range_tracker.rs deleted file mode 100644 index 9872516b50..0000000000 --- a/packages/common/universaldb/src/driver/rocksdb/conflict_range_tracker.rs +++ /dev/null @@ -1,353 +0,0 @@ -// TODO: Revise to work like postgres (add conflict ranges at commit) - -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - -use anyhow::Result; - -use crate::error::DatabaseError; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct TransactionId(u64); - -impl TransactionId { - pub fn new() -> Self { - use std::sync::atomic::{AtomicU64, Ordering}; - static COUNTER: AtomicU64 = AtomicU64::new(0); - TransactionId(COUNTER.fetch_add(1, Ordering::SeqCst)) - } -} - -#[derive(Debug, Clone)] -struct ConflictRange { - begin: Vec, - end: Vec, - is_write: bool, -} - -impl ConflictRange { - fn overlaps(&self, other: &ConflictRange) -> bool { - // Ranges overlap if begin1 < end2 and begin2 < end1 - self.begin < other.end && other.begin < self.end - } - - fn conflicts_with(&self, other: &ConflictRange) -> bool { - // Two ranges conflict if they overlap and at least one is a write - self.overlaps(other) && (self.is_write || other.is_write) - } -} - -#[derive(Clone)] -pub struct ConflictRangeTracker { - // Map from transaction ID to its held conflict ranges - transaction_ranges: Arc>>>, -} - -impl ConflictRangeTracker { - pub fn new() -> Self { - ConflictRangeTracker { - transaction_ranges: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Check if a range conflicts with any existing ranges from other transactions - pub fn check_conflict( - &self, - tx_id: TransactionId, - begin: &[u8], - end: &[u8], - is_write: bool, - ) -> Result<()> { - let new_range = ConflictRange { - begin: begin.to_vec(), - end: end.to_vec(), - is_write, - }; - - // Check against all other transactions' ranges - for (other_tx_id, ranges) in &*self.transaction_ranges.read().unwrap() { - if *other_tx_id == tx_id { - // Skip our own ranges - continue; - } - - for existing_range in ranges { - if new_range.conflicts_with(existing_range) { - // Found a conflict - return retryable error - return Err(DatabaseError::NotCommitted.into()); - } - } - } - - Ok(()) - } - - /// Add a conflict range for a transaction - pub fn add_range( - &self, - tx_id: TransactionId, - begin: &[u8], - end: &[u8], - is_write: bool, - ) -> Result<()> { - // First check for conflicts - self.check_conflict(tx_id, begin, end, is_write)?; - - let new_range = ConflictRange { - begin: begin.to_vec(), - end: end.to_vec(), - is_write, - }; - - self.transaction_ranges - .write() - .unwrap() - .entry(tx_id) - .or_insert_with(Vec::new) - .push(new_range); - - Ok(()) - } - - /// Release all conflict ranges for a transaction - pub fn release_transaction(&self, tx_id: TransactionId) { - self.transaction_ranges.write().unwrap().remove(&tx_id); - } - - /// Get all ranges held by a transaction (for debugging) - pub fn get_transaction_ranges(&self, tx_id: TransactionId) -> Vec<(Vec, Vec, bool)> { - self.transaction_ranges - .read() - .unwrap() - .get(&tx_id) - .map(|ranges| { - ranges - .iter() - .map(|r| (r.begin.clone(), r.end.clone(), r.is_write)) - .collect() - }) - .unwrap_or_default() - } - - /// Clear all conflict ranges (for testing) - pub fn clear_all(&self) { - self.transaction_ranges.write().unwrap().clear(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_no_conflict_different_ranges() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add range for tx1 - tracker - .add_range(tx1, b"a", b"b", false) - .expect("Should add first range"); - - // Add non-overlapping range for tx2 - tracker - .add_range(tx2, b"c", b"d", false) - .expect("Should add non-overlapping range"); - } - - #[test] - fn test_read_read_no_conflict() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add read range for tx1 - tracker - .add_range(tx1, b"a", b"c", false) - .expect("Should add first read range"); - - // Add overlapping read range for tx2 - should not conflict - tracker - .add_range(tx2, b"b", b"d", false) - .expect("Should add overlapping read range"); - } - - #[test] - fn test_read_write_conflict() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add read range for tx1 - tracker - .add_range(tx1, b"a", b"c", false) - .expect("Should add read range"); - - // Try to add overlapping write range for tx2 - should conflict - let result = tracker.add_range(tx2, b"b", b"d", true); - assert!(result.is_err()); - // Check for conflict error - if let Err(e) = result { - assert!(matches!( - e.downcast::().unwrap(), - DatabaseError::NotCommitted - )); - } - } - - #[test] - fn test_write_write_conflict() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add write range for tx1 - tracker - .add_range(tx1, b"a", b"c", true) - .expect("Should add first write range"); - - // Try to add overlapping write range for tx2 - should conflict - let result = tracker.add_range(tx2, b"b", b"d", true); - assert!(result.is_err()); - // Check for conflict error - if let Err(e) = result { - assert!(matches!( - e.downcast::().unwrap(), - DatabaseError::NotCommitted - )); - } - } - - #[test] - fn test_release_transaction() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add write range for tx1 - tracker - .add_range(tx1, b"a", b"c", true) - .expect("Should add write range"); - - // Try to add overlapping range for tx2 - should conflict - let result = tracker.add_range(tx2, b"b", b"d", true); - assert!(result.is_err()); - - // Release tx1's ranges - tracker.release_transaction(tx1); - - // Now tx2 should be able to add the range - tracker - .add_range(tx2, b"b", b"d", true) - .expect("Should add range after release"); - } - - #[test] - fn test_write_read_conflict() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add write range for tx1 - tracker - .add_range(tx1, b"a", b"c", true) - .expect("Should add write range"); - - // Try to add overlapping read range for tx2 - should conflict - let result = tracker.add_range(tx2, b"b", b"d", false); - assert!(result.is_err()); - // Check for conflict error - if let Err(e) = result { - assert!(matches!( - e.downcast::().unwrap(), - DatabaseError::NotCommitted - )); - } - } - - #[test] - fn test_same_transaction_no_conflict() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - - // Add write range for tx1 - tracker - .add_range(tx1, b"a", b"c", true) - .expect("Should add write range"); - - // Add overlapping write range for same transaction - should not conflict - tracker - .add_range(tx1, b"b", b"d", true) - .expect("Should add overlapping range for same transaction"); - } - - #[test] - fn test_exact_boundary_no_conflict() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add range [a, b) for tx1 - tracker - .add_range(tx1, b"a", b"b", true) - .expect("Should add first range"); - - // Add range [b, c) for tx2 - should not conflict (adjacent ranges) - tracker - .add_range(tx2, b"b", b"c", true) - .expect("Should add adjacent range"); - } - - #[test] - fn test_clear_all() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - let tx2 = TransactionId::new(); - - // Add write range for tx1 - tracker - .add_range(tx1, b"a", b"c", true) - .expect("Should add write range"); - - // Try to add overlapping range for tx2 - should conflict - let result = tracker.add_range(tx2, b"b", b"d", true); - assert!(result.is_err()); - - // Clear all ranges - tracker.clear_all(); - - // Now tx2 should be able to add the range - tracker - .add_range(tx2, b"b", b"d", true) - .expect("Should add range after clear_all"); - } - - #[test] - fn test_get_transaction_ranges() { - let tracker = ConflictRangeTracker::new(); - let tx1 = TransactionId::new(); - - // Add multiple ranges for tx1 - tracker - .add_range(tx1, b"a", b"b", false) - .expect("Should add read range"); - tracker - .add_range(tx1, b"c", b"d", true) - .expect("Should add write range"); - - // Get ranges for tx1 - let ranges = tracker.get_transaction_ranges(tx1); - assert_eq!(ranges.len(), 2); - - // Check first range (read) - assert_eq!(ranges[0].0, b"a"); - assert_eq!(ranges[0].1, b"b"); - assert_eq!(ranges[0].2, false); - - // Check second range (write) - assert_eq!(ranges[1].0, b"c"); - assert_eq!(ranges[1].1, b"d"); - assert_eq!(ranges[1].2, true); - } -} diff --git a/packages/common/universaldb/src/driver/rocksdb/database.rs b/packages/common/universaldb/src/driver/rocksdb/database.rs index ff4d3b36b3..01fb740d72 100644 --- a/packages/common/universaldb/src/driver/rocksdb/database.rs +++ b/packages/common/universaldb/src/driver/rocksdb/database.rs @@ -1,6 +1,10 @@ use std::{ path::PathBuf, - sync::{Arc, Mutex}, + sync::{ + Arc, + atomic::{AtomicI32, Ordering}, + }, + time::Duration, }; use anyhow::{Context, Result}; @@ -14,12 +18,16 @@ use crate::{ utils::{MaybeCommitted, calculate_tx_retry_backoff}, }; -use super::{conflict_range_tracker::ConflictRangeTracker, transaction::RocksDbTransactionDriver}; +use super::{ + transaction::RocksDbTransactionDriver, transaction_conflict_tracker::TransactionConflictTracker, +}; + +const TXN_TIMEOUT: Duration = Duration::from_secs(5); pub struct RocksDbDatabaseDriver { db: Arc, - max_retries: Arc>, - conflict_tracker: ConflictRangeTracker, + max_retries: AtomicI32, + txn_conflict_tracker: TransactionConflictTracker, } impl RocksDbDatabaseDriver { @@ -42,8 +50,8 @@ impl RocksDbDatabaseDriver { Ok(RocksDbDatabaseDriver { db: Arc::new(db), - max_retries: Arc::new(Mutex::new(100)), - conflict_tracker: ConflictRangeTracker::new(), + max_retries: AtomicI32::new(100), + txn_conflict_tracker: TransactionConflictTracker::new(), }) } } @@ -52,7 +60,7 @@ impl DatabaseDriver for RocksDbDatabaseDriver { fn create_trx(&self) -> Result { Ok(Transaction::new(Arc::new(RocksDbTransactionDriver::new( self.db.clone(), - self.conflict_tracker.clone(), + self.txn_conflict_tracker.clone(), )))) } @@ -62,7 +70,7 @@ impl DatabaseDriver for RocksDbDatabaseDriver { ) -> BoxFut<'a, Result> { Box::pin(async move { let mut maybe_committed = MaybeCommitted(false); - let max_retries = *self.max_retries.lock().unwrap(); + let max_retries = self.max_retries.load(Ordering::SeqCst); for attempt in 0..max_retries { let tx = self.create_trx()?; @@ -70,13 +78,15 @@ impl DatabaseDriver for RocksDbDatabaseDriver { retryable.maybe_committed = maybe_committed; // Execute transaction - let error = match closure(retryable.clone()).await { - Ok(res) => match retryable.inner.driver.commit_ref().await { - Ok(_) => return Ok(res), - Err(e) => e, - }, - Err(e) => e, - }; + let error = + match tokio::time::timeout(TXN_TIMEOUT, closure(retryable.clone())).await { + Ok(Ok(res)) => match retryable.inner.driver.commit_ref().await { + Ok(_) => return Ok(res), + Err(e) => e, + }, + Ok(Err(e)) => e, + Err(_) => anyhow::Error::from(DatabaseError::TransactionTooOld), + }; let chain = error .chain() @@ -105,7 +115,7 @@ impl DatabaseDriver for RocksDbDatabaseDriver { fn set_option(&self, opt: DatabaseOption) -> Result<()> { match opt { DatabaseOption::TransactionRetryLimit(limit) => { - *self.max_retries.lock().unwrap() = limit; + self.max_retries.store(limit, Ordering::SeqCst); Ok(()) } } diff --git a/packages/common/universaldb/src/driver/rocksdb/mod.rs b/packages/common/universaldb/src/driver/rocksdb/mod.rs index 296445bdb3..a24bd72603 100644 --- a/packages/common/universaldb/src/driver/rocksdb/mod.rs +++ b/packages/common/universaldb/src/driver/rocksdb/mod.rs @@ -1,6 +1,6 @@ -pub mod conflict_range_tracker; mod database; mod transaction; +mod transaction_conflict_tracker; mod transaction_task; pub use database::RocksDbDatabaseDriver; diff --git a/packages/common/universaldb/src/driver/rocksdb/transaction.rs b/packages/common/universaldb/src/driver/rocksdb/transaction.rs index f37525e19e..6090266b88 100644 --- a/packages/common/universaldb/src/driver/rocksdb/transaction.rs +++ b/packages/common/universaldb/src/driver/rocksdb/transaction.rs @@ -17,12 +17,12 @@ use crate::{ options::{ConflictRangeType, MutationType}, range_option::RangeOption, tx_ops::TransactionOperations, - utils::{IsolationLevel, end_of_key_range}, + utils::IsolationLevel, value::{Slice, Value, Values}, }; use super::{ - conflict_range_tracker::{ConflictRangeTracker, TransactionId}, + transaction_conflict_tracker::TransactionConflictTracker, transaction_task::{TransactionCommand, TransactionTask}, }; @@ -31,26 +31,24 @@ pub struct RocksDbTransactionDriver { operations: TransactionOperations, committed: AtomicBool, tx_sender: OnceCell>, - conflict_tracker: ConflictRangeTracker, - tx_id: TransactionId, -} - -impl Drop for RocksDbTransactionDriver { - fn drop(&mut self) { - // Release all conflict ranges when the transaction is dropped - self.conflict_tracker.release_transaction(self.tx_id); - } + txn_conflict_tracker: TransactionConflictTracker, + start_version: u64, } impl RocksDbTransactionDriver { - pub fn new(db: Arc, conflict_tracker: ConflictRangeTracker) -> Self { + pub fn new( + db: Arc, + txn_conflict_tracker: TransactionConflictTracker, + ) -> Self { + let start_version = txn_conflict_tracker.next_global_version(); + RocksDbTransactionDriver { db, operations: TransactionOperations::default(), committed: AtomicBool::new(false), tx_sender: OnceCell::new(), - conflict_tracker, - tx_id: TransactionId::new(), + txn_conflict_tracker, + start_version, } } @@ -61,7 +59,11 @@ impl RocksDbTransactionDriver { let (sender, receiver) = mpsc::channel(100); // Spawn the transaction task - let task = TransactionTask::new(self.db.clone(), receiver); + let task = TransactionTask::new( + self.db.clone(), + self.txn_conflict_tracker.clone(), + receiver, + ); tokio::spawn(task.run()); anyhow::Ok(sender) @@ -82,36 +84,26 @@ impl TransactionDriver for RocksDbTransactionDriver { isolation_level: IsolationLevel, ) -> Pin>> + Send + 'a>> { let key = key.to_vec(); + Box::pin(async move { self.operations .get_with_callback(&key, isolation_level, || async { - if let IsolationLevel::Serializable = isolation_level { - self.conflict_tracker.add_range( - self.tx_id, - &key, - &end_of_key_range(&key), - false, // is_write = false for reads - )?; - } - let tx_sender = self.ensure_transaction().await?; // Send query command let (response_tx, response_rx) = oneshot::channel(); tx_sender .send(TransactionCommand::Get { - key: key.to_vec(), + key: key.clone(), response: response_tx, }) .await - .context("failed to send transaction command")?; + .context("failed to send rocksdb transaction command")?; // Wait for response - let value = response_rx + response_rx .await - .context("failed to receive transaction response")??; - - Ok(value) + .context("failed to receive rocksdb response")? }) .await }) @@ -131,15 +123,6 @@ impl TransactionDriver for RocksDbTransactionDriver { self.operations .get_key(&selector, isolation_level, || async { - if let IsolationLevel::Serializable = isolation_level { - self.conflict_tracker.add_range( - self.tx_id, - &key, - &end_of_key_range(&key), - false, // is_write = false for reads - )?; - } - let tx_sender = self.ensure_transaction().await?; // Send query command @@ -152,12 +135,12 @@ impl TransactionDriver for RocksDbTransactionDriver { response: response_tx, }) .await - .context("failed to send commit command")?; + .context("failed to send rocksdb transaction command")?; // Wait for response let result_key = response_rx .await - .context("failed to receive key selector response")??; + .context("failed to receive rocksdb key selector response")??; // Return the key if found, or empty vector if not Ok(result_key.unwrap_or_else(Slice::new)) @@ -169,55 +152,46 @@ impl TransactionDriver for RocksDbTransactionDriver { fn get_range<'a>( &'a self, opt: &RangeOption<'a>, - iteration: usize, + _iteration: usize, isolation_level: IsolationLevel, ) -> Pin> + Send + 'a>> { - // Extract fields from RangeOption for the async closure let opt = opt.clone(); - let begin_selector = opt.begin.clone(); - let end_selector = opt.end.clone(); - let limit = opt.limit; - let reverse = opt.reverse; Box::pin(async move { + let begin = opt.begin.key().to_vec(); + let begin_or_equal = opt.begin.or_equal(); + let begin_offset = opt.begin.offset(); + let end = opt.end.key().to_vec(); + let end_or_equal = opt.end.or_equal(); + let end_offset = opt.end.offset(); + let limit = opt.limit; + let reverse = opt.reverse; + self.operations .get_range(&opt, isolation_level, || async { - if let IsolationLevel::Serializable = isolation_level { - // Add read conflict range for this range (using raw keys, conservative) - self.conflict_tracker.add_range( - self.tx_id, - begin_selector.key(), - end_selector.key(), - false, // is_write = false for reads - )?; - } - let tx_sender = self.ensure_transaction().await?; - // Send query command with selector info + // Send query command let (response_tx, response_rx) = oneshot::channel(); tx_sender .send(TransactionCommand::GetRange { - begin_key: begin_selector.key().to_vec(), - begin_or_equal: begin_selector.or_equal(), - begin_offset: begin_selector.offset(), - end_key: end_selector.key().to_vec(), - end_or_equal: end_selector.or_equal(), - end_offset: end_selector.offset(), + begin: begin.clone(), + begin_or_equal, + begin_offset, + end: end.clone(), + end_or_equal, + end_offset, limit, reverse, - iteration, response: response_tx, }) .await - .context("failed to send transaction command")?; + .context("failed to send rocksdb transaction command")?; // Wait for response - let values = response_rx + response_rx .await - .context("failed to receive range response")??; - - Ok(values) + .context("failed to receive rocksdb range response")? }) .await }) @@ -245,35 +219,14 @@ impl TransactionDriver for RocksDbTransactionDriver { } fn set(&self, key: &[u8], value: &[u8]) { - // Add write conflict range for this range - let _ = self.conflict_tracker.add_range( - self.tx_id, - key, - &end_of_key_range(&key), - true, // is_write = true for writes - ); - self.operations.set(key, value); } fn clear(&self, key: &[u8]) { - // Add write conflict range for this range - let _ = self.conflict_tracker.add_range( - self.tx_id, - key, - &end_of_key_range(&key), - true, // is_write = true for writes - ); - self.operations.clear(key); } fn clear_range(&self, begin: &[u8], end: &[u8]) { - // Add write conflict range for this range - let _ = self.conflict_tracker.add_range( - self.tx_id, begin, end, true, // is_write = true for writes - ); - self.operations.clear_range(begin, end); } @@ -284,55 +237,43 @@ impl TransactionDriver for RocksDbTransactionDriver { } self.committed.store(true, Ordering::SeqCst); - let (operations, _conflict_ranges) = self.operations.consume(); + let (operations, conflict_ranges) = self.operations.consume(); - // Get the transaction sender let tx_sender = self.ensure_transaction().await?; - // Send commit command with operations and conflict ranges + // Send commit command let (response_tx, response_rx) = oneshot::channel(); tx_sender .send(TransactionCommand::Commit { + start_version: self.start_version, operations, + conflict_ranges, response: response_tx, }) .await - .context("failed to send commit command")?; + .context("failed to send rocksdb transaction command")?; - // Wait for response - let result = response_rx + // Wait for commit response + response_rx .await - .context("failed to receive commit response")?; - - // Release conflict ranges after successful commit - if result.is_ok() { - self.conflict_tracker.release_transaction(self.tx_id); - } + .context("failed to receive rocksdb commit response")??; - result + Ok(()) }) } fn reset(&mut self) { - // Release any existing conflict ranges - self.conflict_tracker.release_transaction(self.tx_id); - - // Generate a new transaction ID for the reset transaction - self.tx_id = TransactionId::new(); - self.operations.clear_all(); - // Clear the transaction senders to reset connections - self.tx_sender = OnceCell::new(); + self.committed.store(false, Ordering::SeqCst); + + self.start_version = self.txn_conflict_tracker.next_global_version(); } fn cancel(&self) { - // Release all conflict ranges for this transaction - self.conflict_tracker.release_transaction(self.tx_id); + self.operations.clear_all(); + self.committed.store(true, Ordering::SeqCst); // Prevent future commits - // Send cancel command to both transaction tasks if they exist - if let Some(tx_sender) = self.tx_sender.get() { - let _ = tx_sender.try_send(TransactionCommand::Cancel); - } + // Tx sender will be stopped back when dropped } fn add_conflict_range( @@ -341,14 +282,6 @@ impl TransactionDriver for RocksDbTransactionDriver { end: &[u8], conflict_type: ConflictRangeType, ) -> Result<()> { - let is_write = match conflict_type { - ConflictRangeType::Write => true, - ConflictRangeType::Read => false, - }; - - self.conflict_tracker - .add_range(self.tx_id, begin, end, is_write)?; - self.operations .add_conflict_range(begin, end, conflict_type); @@ -375,12 +308,12 @@ impl TransactionDriver for RocksDbTransactionDriver { response: response_tx, }) .await - .context("failed to send commit command")?; + .context("failed to send rocksdb command")?; // Wait for response let size = response_rx .await - .context("failed to receive size response")??; + .context("failed to receive rocksdb size response")??; Ok(size) }) @@ -393,32 +326,29 @@ impl TransactionDriver for RocksDbTransactionDriver { } self.committed.store(true, Ordering::SeqCst); - let (operations, _conflict_ranges) = self.operations.consume(); + let (operations, conflict_ranges) = self.operations.consume(); - // Get the transaction sender + // We have operations but no transaction - create one just for commit let tx_sender = self.ensure_transaction().await?; - // Send commit command with operations + // Send commit command let (response_tx, response_rx) = oneshot::channel(); tx_sender .send(TransactionCommand::Commit { + start_version: self.start_version, operations, + conflict_ranges, response: response_tx, }) .await - .context("failed to send commit command")?; + .context("failed to send rocksdb transaction command")?; - // Wait for response - let result = response_rx + // Wait for commit response + response_rx .await - .context("failed to receive commit response")?; - - // Release conflict ranges after successful commit - if result.is_ok() { - self.conflict_tracker.release_transaction(self.tx_id); - } + .context("failed to receive rocksdb commit response")??; - result.map(|_| ()) + Ok(()) }) } } diff --git a/packages/common/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs b/packages/common/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs new file mode 100644 index 0000000000..1c949a887d --- /dev/null +++ b/packages/common/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs @@ -0,0 +1,94 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, Instant}, +}; + +use tokio::sync::Mutex; + +use crate::options::ConflictRangeType; + +// Transactions cannot live longer than 5 seconds so we don't need to store transaction conflicts longer than +// that +const TXN_CONFLICT_TTL: Duration = Duration::from_secs(10); + +struct PreviousTransaction { + insert_instant: Instant, + start_version: u64, + commit_version: u64, + conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, +} + +#[derive(Clone)] +pub struct TransactionConflictTracker { + // NOTE: We use a mutex because we need to lock reads across all active txns. This could be optimized to + // only lock txns that have overlapping ranges with the currently checking one, but its a small + // optimization because most txns are going to be very recent and this only stores the last 10 seconds of + // txns. + txns: Arc>>, + global_version: Arc, +} + +impl TransactionConflictTracker { + pub fn new() -> Self { + TransactionConflictTracker { + txns: Arc::new(Mutex::new(Vec::new())), + global_version: Arc::new(AtomicU64::new(0)), + } + } + + /// Each number returned is unique. + pub fn next_global_version(&self) -> u64 { + self.global_version.fetch_add(1, Ordering::SeqCst) + } + + pub async fn check_and_insert( + &self, + txn1_start_version: u64, + txn1_conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, + ) -> bool { + let mut txns = self.txns.lock().await; + let txn1_commit_version = self.next_global_version(); + + // Prune old entries + txns.retain(|txn| txn.insert_instant.elapsed() < TXN_CONFLICT_TTL); + + for txn2 in &*txns { + // Check txn versions overlap + if txn1_start_version > txn2.start_version && txn1_start_version < txn2.commit_version { + for (cr1_start, cr1_end, cr1_type) in &txn1_conflict_ranges { + for (cr2_start, cr2_end, cr2_type) in &txn2.conflict_ranges { + // Check conflict ranges overlap + if cr1_start < cr2_end && cr2_start < cr1_end && cr1_type != cr2_type { + return true; + } + } + } + } + } + + // If no conflicts were detected, save txn data + txns.push(PreviousTransaction { + insert_instant: Instant::now(), + start_version: txn1_start_version, + commit_version: txn1_commit_version, + conflict_ranges: txn1_conflict_ranges, + }); + + false + } + + pub async fn remove(&self, txn_start_version: u64) { + let mut txns = self.txns.lock().await; + + if let Some(i) = txns + .iter() + .enumerate() + .find_map(|(i, txn)| (txn.start_version == txn_start_version).then_some(i)) + { + txns.remove(i); + } + } +} diff --git a/packages/common/universaldb/src/driver/rocksdb/transaction_task.rs b/packages/common/universaldb/src/driver/rocksdb/transaction_task.rs index 52e0debc0d..e43fbe6a1d 100644 --- a/packages/common/universaldb/src/driver/rocksdb/transaction_task.rs +++ b/packages/common/universaldb/src/driver/rocksdb/transaction_task.rs @@ -6,10 +6,12 @@ use rocksdb::{ }; use tokio::sync::{mpsc, oneshot}; +use super::transaction_conflict_tracker::TransactionConflictTracker; use crate::{ atomic::apply_atomic_op, error::DatabaseError, key_selector::KeySelector, + options::ConflictRangeType, tx_ops::Operation, value::{KeyValue, Slice, Values}, versionstamp::substitute_versionstamp_if_incomplete, @@ -27,19 +29,20 @@ pub enum TransactionCommand { response: oneshot::Sender>>, }, GetRange { - begin_key: Vec, + begin: Vec, begin_or_equal: bool, begin_offset: i32, - end_key: Vec, + end: Vec, end_or_equal: bool, end_offset: i32, limit: Option, reverse: bool, - iteration: usize, response: oneshot::Sender>, }, Commit { + start_version: u64, operations: Vec, + conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, response: oneshot::Sender>, }, GetEstimatedRangeSize { @@ -47,20 +50,27 @@ pub enum TransactionCommand { end: Vec, response: oneshot::Sender>, }, - Cancel, } +// This task may be used for multiple rocksdb txns, in contrast to how postgres is written. This is solely to +// save on spawning new tasks. pub struct TransactionTask { db: Arc, + txn_conflict_tracker: TransactionConflictTracker, receiver: mpsc::Receiver, } impl TransactionTask { pub fn new( db: Arc, + txn_conflict_tracker: TransactionConflictTracker, receiver: mpsc::Receiver, ) -> Self { - TransactionTask { db, receiver } + TransactionTask { + db, + txn_conflict_tracker, + receiver, + } } pub async fn run(mut self) { @@ -80,37 +90,39 @@ impl TransactionTask { let _ = response.send(result); } TransactionCommand::GetRange { - begin_key, + begin, begin_or_equal, begin_offset, - end_key, + end, end_or_equal, end_offset, limit, reverse, - iteration, response, } => { let result = self .handle_get_range( - begin_key, + begin, begin_or_equal, begin_offset, - end_key, + end, end_or_equal, end_offset, limit, reverse, - iteration, ) .await; let _ = response.send(result); } TransactionCommand::Commit { + start_version, operations, + conflict_ranges, response, } => { - let result = self.handle_commit(operations).await; + let result = self + .handle_commit(start_version, operations, conflict_ranges) + .await; let _ = response.send(result); } TransactionCommand::GetEstimatedRangeSize { @@ -121,10 +133,6 @@ impl TransactionTask { let result = self.handle_get_estimated_range_size(&begin, &end).await; let _ = response.send(result); } - TransactionCommand::Cancel => { - // Exit the task - break; - } } } } @@ -297,7 +305,12 @@ impl TransactionTask { } } - async fn handle_commit(&mut self, operations: Vec) -> Result<()> { + async fn handle_commit( + &mut self, + start_version: u64, + operations: Vec, + conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, + ) -> Result<()> { // Create a new transaction for this commit let txn = self.create_transaction(); @@ -361,18 +374,25 @@ impl TransactionTask { } } - // Note: RocksDB doesn't natively support conflict ranges like FoundationDB - // We would need to implement custom conflict detection here if needed - // For now, we'll rely on OptimisticTransactionDB's built-in conflict detection + if self + .txn_conflict_tracker + .check_and_insert(start_version, conflict_ranges) + .await + { + return Err(DatabaseError::NotCommitted.into()); + } // Commit the transaction (this consumes txn) match txn.commit() { Ok(_) => Ok(()), Err(e) => { + // If the txn failed due to a rocksdb error, remove it from the conflict tracker + self.txn_conflict_tracker.remove(start_version).await; + let err_str = e.to_string(); // Check if this is a conflict error - if err_str.contains("conflict") { + if err_str.contains("conflict") || err_str.contains("Resource busy") { // Return retryable error Err(DatabaseError::NotCommitted.into()) } else { @@ -384,26 +404,25 @@ impl TransactionTask { async fn handle_get_range( &mut self, - begin_key: Vec, + begin: Vec, begin_or_equal: bool, begin_offset: i32, - end_key: Vec, + end: Vec, end_or_equal: bool, end_offset: i32, limit: Option, reverse: bool, - _iteration: usize, ) -> Result { let txn = self.create_transaction(); let read_opts = ReadOptions::default(); // Resolve the begin selector let resolved_begin = - self.resolve_key_selector_for_range(&txn, &begin_key, begin_or_equal, begin_offset)?; + self.resolve_key_selector_for_range(&txn, &begin, begin_or_equal, begin_offset)?; // Resolve the end selector let resolved_end = - self.resolve_key_selector_for_range(&txn, &end_key, end_or_equal, end_offset)?; + self.resolve_key_selector_for_range(&txn, &end, end_or_equal, end_offset)?; // Now execute the range query with resolved keys let iter = txn.iterator_opt( diff --git a/packages/common/universaldb/tests/rocksdb.rs b/packages/common/universaldb/tests/rocksdb.rs new file mode 100644 index 0000000000..814382c2ef --- /dev/null +++ b/packages/common/universaldb/tests/rocksdb.rs @@ -0,0 +1,146 @@ +use std::{path::Path, sync::Arc}; + +use anyhow::Context; +use futures_util::StreamExt; +use rivet_test_deps_docker::TestDatabase; +use rocksdb::{OptimisticTransactionDB, Options, WriteOptions}; +use universaldb::{Database, utils::IsolationLevel::*}; +use uuid::Uuid; + +#[tokio::test] +async fn rocksdb_native() { + let _ = tracing_subscriber::fmt() + .with_env_filter("debug") + .with_test_writer() + .try_init(); + + let db_path = Path::new("/tmp/foobar-db"); + std::fs::create_dir_all(&db_path) + .context("failed to create database directory") + .unwrap(); + + // Configure RocksDB options + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_max_open_files(10000); + opts.set_keep_log_file_num(10); + opts.set_max_total_wal_size(64 * 1024 * 1024); // 64MB + + // Open the OptimisticTransactionDB + tracing::debug!(path=%db_path.display(), "opening rocksdb"); + let db: Arc = Arc::new( + OptimisticTransactionDB::open(&opts, db_path) + .context("failed to open rocksdb") + .unwrap(), + ); + + let mut handles = Vec::new(); + + for _ in 0..64 { + let db = db.clone(); + let write_opts = WriteOptions::default(); + let txn_opts = rocksdb::OptimisticTransactionOptions::default(); + + let handle = tokio::spawn(async move { + for attempt in 0..300 { + let txn = db.transaction_opt(&write_opts, &txn_opts); + + let key = vec![1, 2, 3]; + let value = vec![4, 5, 6]; + + txn.get(&key).unwrap(); + txn.put(&key, &value).unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Execute transaction + let Err(error) = txn.commit() else { + tracing::info!("success"); + return; + }; + + let err_str = error.to_string(); + if err_str.contains("conflict") || err_str.contains("Resource busy") { + tracing::warn!(?error, "conflict"); + let backoff_ms = calculate_tx_retry_backoff(attempt); + tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await; + continue; + } + + Err::<(), _>(error).unwrap(); + } + }); + + handles.push(handle); + } + + futures_util::stream::iter(handles) + .buffer_unordered(64) + .for_each(|result| async { + if let Err(err) = result { + tracing::error!(?err, "task failed"); + } + }) + .await; +} + +#[tokio::test] +async fn rocksdb_udb() { + let _ = tracing_subscriber::fmt() + .with_env_filter("debug") + .with_test_writer() + .try_init(); + + let test_id = Uuid::new_v4(); + let (db_config, _docker_config) = TestDatabase::FileSystem.config(test_id, 1).await.unwrap(); + + let rivet_config::config::Database::FileSystem(fs_config) = db_config else { + unreachable!() + }; + + let driver = universaldb::driver::RocksDbDatabaseDriver::new(fs_config.path) + .await + .unwrap(); + let db = Database::new(Arc::new(driver)); + + let mut handles = Vec::new(); + + for _ in 0..64 { + let db = db.clone(); + + let handle = tokio::spawn(async move { + db.run(|tx| async move { + let key = vec![1, 2, 3]; + let value = vec![4, 5, 6]; + + tx.get(&key, Serializable).await.unwrap(); + tx.set(&key, &value); + + Ok(()) + }) + .await + .unwrap(); + + tracing::info!("success"); + }); + + handles.push(handle); + } + + futures_util::stream::iter(handles) + .buffer_unordered(1024) + .for_each(|result| async { + if let Err(err) = result { + tracing::error!(?err, "task failed"); + } + }) + .await; +} + +pub fn calculate_tx_retry_backoff(attempt: usize) -> u64 { + let base_backoff_ms = 2_u64.pow((attempt as u32).min(10)) * 10; + + let jitter_ms = rand::random::() % 100; + + base_backoff_ms + jitter_ms +} diff --git a/scripts/run/engine-rocksdb.sh b/scripts/run/engine-rocksdb.sh new file mode 100755 index 0000000000..572101f0b8 --- /dev/null +++ b/scripts/run/engine-rocksdb.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" + +cd "${REPO_ROOT}" + +RUST_LOG=debug \ +cargo run --bin rivet-engine -- start "$@" diff --git a/sdks/typescript/test-runner/src/index.ts b/sdks/typescript/test-runner/src/index.ts index c0ff0f396a..57e6f78580 100644 --- a/sdks/typescript/test-runner/src/index.ts +++ b/sdks/typescript/test-runner/src/index.ts @@ -20,7 +20,7 @@ const RIVET_RUNNER_VERSION = process.env.RIVET_RUNNER_VERSION const RIVET_RUNNER_TOTAL_SLOTS = process.env.RIVET_RUNNER_TOTAL_SLOTS ? Number(process.env.RIVET_RUNNER_TOTAL_SLOTS) : 100; -const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://localhost:6420"; +const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://127.0.0.1:6420"; const RIVET_TOKEN = process.env.RIVET_TOKEN ?? "dev"; const AUTOSTART_SERVER = process.env.NO_AUTOSTART_SERVER == undefined; const AUTOSTART_RUNNER = process.env.NO_AUTOSTART_RUNNER == undefined;