diff --git a/Cargo.lock b/Cargo.lock index a4a8430d7..d551ea028 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3827,6 +3827,7 @@ dependencies = [ "log", "magicblock-accounts-db", "magicblock-core", + "magicblock-metrics", "num-format", "num_cpus", "prost 0.11.9", diff --git a/magicblock-ledger/Cargo.toml b/magicblock-ledger/Cargo.toml index c2302637b..a704ee296 100644 --- a/magicblock-ledger/Cargo.toml +++ b/magicblock-ledger/Cargo.toml @@ -20,6 +20,7 @@ prost = { workspace = true } serde = { workspace = true } magicblock-accounts-db = { workspace = true } magicblock-core = { workspace = true } +magicblock-metrics = { workspace = true } scc = { workspace = true } solana-account-decoder = { workspace = true } solana-measure = { workspace = true } diff --git a/magicblock-ledger/src/database/ledger_column.rs b/magicblock-ledger/src/database/ledger_column.rs index 0965989ba..218ce0a5e 100644 --- a/magicblock-ledger/src/database/ledger_column.rs +++ b/magicblock-ledger/src/database/ledger_column.rs @@ -254,6 +254,15 @@ where write_batch.delete::(key); } + pub fn delete_range( + &self, + from: C::Index, + to: C::Index, + ) -> LedgerResult<()> { + self.backend + .delete_range_cf(self.handle(), C::key(from), C::key(to)) + } + pub fn delete_range_in_batch( &self, write_batch: &mut WriteBatch, diff --git a/magicblock-ledger/src/database/rocks_db.rs b/magicblock-ledger/src/database/rocks_db.rs index a84cfec6f..319fda9da 100644 --- a/magicblock-ledger/src/database/rocks_db.rs +++ b/magicblock-ledger/src/database/rocks_db.rs @@ -121,6 +121,16 @@ impl Rocks { Ok(()) } + pub fn delete_range_cf>( + &self, + cf: &ColumnFamily, + from: K, + to: K, + ) -> LedgerResult<()> { + self.db.delete_range_cf(cf, from, to)?; + Ok(()) + } + /// Delete files whose slot range is within \[`from`, `to`\]. pub fn delete_file_in_range_cf( &self, diff --git a/magicblock-ledger/src/ledger_truncator.rs b/magicblock-ledger/src/ledger_truncator.rs index 98e7e72e1..a3317918a 100644 --- a/magicblock-ledger/src/ledger_truncator.rs +++ b/magicblock-ledger/src/ledger_truncator.rs @@ -1,22 +1,29 @@ use std::{ cmp::min, - ops::ControlFlow, - sync::Arc, + sync::{atomic::Ordering, Arc}, thread::{self, JoinHandle}, time::Duration, }; use log::{error, info, warn}; +use magicblock_metrics::metrics::{ + observe_ledger_truncator_delete, start_ledger_truncator_compaction_timer, + HistogramTimer, +}; use solana_measure::measure::Measure; use tokio::{runtime::Builder, time::interval}; use tokio_util::sync::CancellationToken; use crate::{ - database::columns::{ - AddressSignatures, Blockhash, Blocktime, PerfSamples, SlotSignatures, - Transaction, TransactionMemos, TransactionStatus, + database::{ + columns, + columns::{ + AddressSignatures, Blockhash, Blocktime, PerfSamples, + SlotSignatures, Transaction, TransactionMemos, TransactionStatus, + }, }, errors::LedgerResult, + store::api::HasColumn, Ledger, }; @@ -48,9 +55,6 @@ impl LedgerTrunctationWorker { } pub async fn run(self) { - self.ledger - .initialize_lowest_cleanup_slot() - .expect("Lowest cleanup slot initialization"); let mut interval = interval(self.truncation_time_interval); loop { tokio::select! { @@ -93,7 +97,7 @@ impl LedgerTrunctationWorker { from_slot, to_slot, self.cancellation_token.clone(), - ), + )?, None => warn!("Could not estimate truncation range"), } } @@ -102,6 +106,7 @@ impl LedgerTrunctationWorker { } /// Truncates ledger that is over desired size + /// We rely on present `CompactionFilter` to delete all data pub fn truncate_fat_ledger( &self, current_ledger_size: u64, @@ -136,7 +141,10 @@ impl LedgerTrunctationWorker { "Fat truncation: truncating up to(inclusive): {}", truncate_to_slot ); + self.ledger.set_lowest_cleanup_slot(truncate_to_slot); + Self::delete_slots(&self.ledger, 0, truncate_to_slot)?; + if let Err(err) = self.ledger.flush() { // We will still compact error!("Failed to flush: {}", err); @@ -151,6 +159,73 @@ impl LedgerTrunctationWorker { Ok(()) } + /// Inserts tombstones in slot-ordered columns for range [from; to] inclusive + /// This is a cheap operation since delete_range inserts one range tombstone + /// NOTE: this doesn't cover all the columns, we rely on CompactionFilter to clean the rest + fn delete_slots( + ledger: &Arc, + from_slot: u64, + to_slot: u64, + ) -> LedgerResult<()> { + observe_ledger_truncator_delete(|| { + let start = from_slot; + let end = to_slot + 1; + ledger.delete_range_cf::(start, end)?; + >::with_column( + ledger.as_ref(), + |column| { + column.try_decrease_entry_counter(end - start); + }, + ); + + ledger.delete_range_cf::(start, end)?; + >::with_column( + ledger.as_ref(), + |column| { + column.try_decrease_entry_counter(end - start); + }, + ); + + ledger.delete_range_cf::(start, end)?; + >::with_column( + ledger.as_ref(), + |column| { + column.try_decrease_entry_counter(end - start); + }, + ); + + // Can cheaply delete SlotSignatures as well + // NOTE: we need to clean (to_slot, u32::MAX) + // since range is exclusive at the end we use (to_slot + 1, 0) + ledger.delete_range_cf::( + (from_slot, 0), + (to_slot + 1, 0), + )?; + + // Reset counters for other columns + // where we can't know how many el-ts gets deleted + macro_rules! reset_entry_counter { + ($ledger:expr, $cf_ty:ty $(,)?) => { + >::with_column( + $ledger.as_ref(), + |column| { + column + .entry_counter + .store(columns::DIRTY_COUNT, Ordering::Relaxed); + }, + ); + }; + } + reset_entry_counter!(ledger, SlotSignatures); + reset_entry_counter!(ledger, TransactionStatus); + reset_entry_counter!(ledger, AddressSignatures); + reset_entry_counter!(ledger, Transaction); + reset_entry_counter!(ledger, TransactionMemos); + + Ok(()) + }) + } + /// Returns range to truncate [from_slot, to_slot] fn estimate_truncation_range( &self, @@ -196,53 +271,27 @@ impl LedgerTrunctationWorker { /// Utility function for splitting truncation into smaller chunks /// Cleans slots [from_slot; to_slot] inclusive range + /// We rely on present `CompactionFilter` to delete all data pub fn truncate_slot_range( ledger: &Arc, from_slot: u64, to_slot: u64, cancellation_token: CancellationToken, - ) { - // In order not to torture RocksDB's WriteBatch we split large tasks into chunks - const SINGLE_TRUNCATION_LIMIT: usize = 300; - + ) -> LedgerResult<()> { if to_slot < from_slot { warn!("LedgerTruncator: Nani?"); - return; + return Ok(()); } info!( "LedgerTruncator: truncating slot range [{from_slot}; {to_slot}]" ); - let ledger_copy = ledger.clone(); - (from_slot..=to_slot) - .step_by(SINGLE_TRUNCATION_LIMIT) - .try_for_each(|cur_from_slot| { - if cancellation_token.is_cancelled() { - return ControlFlow::Break(()); - } - - let num_slots_to_truncate = min( - to_slot - cur_from_slot + 1, - SINGLE_TRUNCATION_LIMIT as u64, - ); - let truncate_to_slot = - cur_from_slot + num_slots_to_truncate - 1; - - if let Err(err) = ledger_copy - .delete_slot_range(cur_from_slot, truncate_to_slot) - { - warn!( - "Failed to truncate slots {}-{}: {}", - cur_from_slot, truncate_to_slot, err - ); - } - - ControlFlow::Continue(()) - }); + ledger.set_lowest_cleanup_slot(to_slot); + Self::delete_slots(ledger, from_slot, to_slot)?; // Flush memtables with tombstones prior to compaction - if let Err(err) = ledger_copy.flush() { + if let Err(err) = ledger.flush() { error!("Failed to flush ledger: {err}"); } Self::compact_slot_range( @@ -251,6 +300,7 @@ impl LedgerTrunctationWorker { to_slot, cancellation_token, ); + Ok(()) } /// Synchronous utility function that triggers and awaits compaction on all the columns @@ -278,16 +328,19 @@ impl LedgerTrunctationWorker { struct CompactionMeasure { measure: Measure, + _histogram_timer: HistogramTimer, } impl Drop for CompactionMeasure { fn drop(&mut self) { self.measure.stop(); + // histogram_timer - records on HistogramTimer::drop info!("Manual compaction took: {}", self.measure); } } let _measure = CompactionMeasure { measure: Measure::start("Manual compaction"), + _histogram_timer: start_ledger_truncator_compaction_timer(), }; let start = from_slot; @@ -316,7 +369,7 @@ impl LedgerTrunctationWorker { compact_cf_or_return!( ledger, cancellation_token, - (Some((from_slot, u32::MIN)), Some((to_slot + 1, u32::MAX))), + (Some((from_slot, u32::MIN)), Some((to_slot + 1, 0))), SlotSignatures ); compact_cf_or_return!( diff --git a/magicblock-ledger/src/store/api.rs b/magicblock-ledger/src/store/api.rs index b1ba01dd8..00a762c99 100644 --- a/magicblock-ledger/src/store/api.rs +++ b/magicblock-ledger/src/store/api.rs @@ -174,6 +174,7 @@ impl Ledger { let (slot, blockhash) = ledger.get_max_blockhash()?; let time = ledger.get_block_time(slot)?.unwrap_or_default(); ledger.latest_block.store(slot, blockhash, time); + ledger.initialize_lowest_cleanup_slot()?; Ok(ledger) } @@ -1216,112 +1217,16 @@ impl Ledger { } } - /// Permanently removes ledger data for slots in the inclusive range `[from_slot, to_slot]`. - /// # Note: - /// - This is a destructive operation that cannot be undone - /// - Requires exclusive access to the lowest cleanup slot tracker - /// - All deletions are atomic (either all succeed or none do) - pub fn delete_slot_range( + pub fn delete_range_cf( &self, - from_slot: Slot, - to_slot: Slot, - ) -> LedgerResult<()> { - self.set_lowest_cleanup_slot(to_slot); - - let mut batch = self.db.batch(); - let num_deleted_slots = to_slot + 1 - from_slot; - self.blocktime_cf.delete_range_in_batch( - &mut batch, - from_slot, - to_slot + 1, - ); - self.blockhash_cf.delete_range_in_batch( - &mut batch, - from_slot, - to_slot + 1, - ); - self.perf_samples_cf.delete_range_in_batch( - &mut batch, - from_slot, - to_slot + 1, - ); - - let mut slot_signatures_deleted = 0; - let mut transaction_status_deleted = 0; - let mut transactions_deleted = 0; - let mut transaction_memos_deleted = 0; - let mut address_signatures_deleted = 0; - self.slot_signatures_cf - .iter(IteratorMode::From( - (from_slot, u32::MIN), - IteratorDirection::Forward, - ))? - .take_while(|((slot, _), _)| slot <= &to_slot) - .try_for_each(|((slot, transaction_index), raw_signature)| { - self.slot_signatures_cf - .delete_in_batch(&mut batch, (slot, transaction_index)); - slot_signatures_deleted += 1; - - let signature = Signature::try_from(raw_signature.as_ref())?; - self.transaction_status_cf - .delete_in_batch(&mut batch, (signature, slot)); - transaction_status_deleted += 1; - - self.transaction_cf - .delete_in_batch(&mut batch, (signature, slot)); - transactions_deleted += 1; - - self.transaction_memos_cf - .delete_in_batch(&mut batch, (signature, slot)); - transaction_memos_deleted += 1; - - let transaction = self - .transaction_cf - .get_protobuf((signature, slot))? - .map(VersionedTransaction::from) - .ok_or(LedgerError::TransactionNotFound)?; - - transaction.message.static_account_keys().iter().for_each( - |address| { - self.address_signatures_cf.delete_in_batch( - &mut batch, - (*address, slot, transaction_index, signature), - ); - address_signatures_deleted += 1; - }, - ); - - // TODO(edwin): add AccountModData cleanup - Ok::<_, LedgerError>(()) - })?; - - self.db.write(batch)?; - - self.blocktime_cf - .try_decrease_entry_counter(num_deleted_slots); - self.blockhash_cf - .try_decrease_entry_counter(num_deleted_slots); - self.perf_samples_cf - .try_decrease_entry_counter(num_deleted_slots); - self.slot_signatures_cf - .try_decrease_entry_counter(slot_signatures_deleted); - self.transaction_status_cf - .try_decrease_entry_counter(transaction_status_deleted); - self.transaction_cf - .try_decrease_entry_counter(transactions_deleted); - self.transaction_memos_cf - .try_decrease_entry_counter(transaction_memos_deleted); - self.address_signatures_cf - .try_decrease_entry_counter(address_signatures_deleted); - - // To not spend time querying DB for value we set drop the counter - // This shouldn't happen very often due to rarity of actual truncations. - self.transaction_successful_status_count - .store(DIRTY_COUNT, Ordering::Release); - self.transaction_failed_status_count - .store(DIRTY_COUNT, Ordering::Release); - - Ok(()) + from: C::Index, + to: C::Index, + ) -> LedgerResult<()> + where + C: Column + ColumnName, + Self: HasColumn, + { + >::column(self).delete_range(from, to) } pub fn compact_slot_range_cf( @@ -1373,6 +1278,39 @@ impl Ledger { } } +pub trait HasColumn +where + C: Column + ColumnName, +{ + fn column(&self) -> &LedgerColumn; + fn with_column(&self, f: F) -> R + where + F: FnOnce(&LedgerColumn) -> R, + { + f(self.column()) + } +} + +macro_rules! impl_has_column { + ($cf_ty:ident, $field:ident) => { + impl HasColumn for Ledger { + fn column(&self) -> &LedgerColumn { + &self.$field + } + } + }; +} + +impl_has_column!(TransactionStatus, transaction_status_cf); +impl_has_column!(AddressSignatures, address_signatures_cf); +impl_has_column!(SlotSignatures, slot_signatures_cf); +impl_has_column!(Blocktime, blocktime_cf); +impl_has_column!(Blockhash, blockhash_cf); +impl_has_column!(Transaction, transaction_cf); +impl_has_column!(TransactionMemos, transaction_memos_cf); +impl_has_column!(PerfSamples, perf_samples_cf); +impl_has_column!(AccountModDatas, account_mod_datas_cf); + // ----------------- // Tests // ----------------- @@ -2474,90 +2412,4 @@ mod tests { assert_eq!(sig_info_dos.memo, Some("Test Dos Memo".to_string())); } } - - #[test] - fn test_truncate_slots() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let store = Ledger::open(ledger_path.path()).unwrap(); - - // Create test data - let slots_to_delete = [10, 15]; - let slots_to_preserve = [20]; - let test_data: Vec<_> = slots_to_delete - .iter() - .chain(slots_to_preserve.iter()) - .map(|&slot| { - let sig = Signature::new_unique(); - let (tx, sanitized) = - create_confirmed_transaction(slot, 5, Some(100), None); - (sig, slot, tx, sanitized) - }) - .collect(); - - // Write data to ledger - test_data.iter().for_each(|(sig, slot, tx, sanitized)| { - store - .write_transaction( - *sig, - *slot, - sanitized.clone(), - tx.tx_with_meta.get_status_meta().unwrap(), - ) - .unwrap(); - store.write_block(*slot, 100, Hash::new_unique()).unwrap(); - store - .write_transaction_memos( - sig, - *slot, - format!("Memo for slot {}", slot), - ) - .unwrap(); - }); - - // Truncate slots 10-15 (should remove first two entries) - assert!(store - .delete_slot_range( - *slots_to_delete.first().unwrap(), - *slots_to_delete.last().unwrap() - ) - .is_ok()); - - // Consistency checks - let (to_delete, to_preserve) = - test_data.split_at(slots_to_delete.len()); - to_delete.iter().for_each(|(sig, slot, _, _)| { - assert!(store - .transaction_cf - .get_protobuf((*sig, *slot)) - .unwrap() - .is_none()); - assert!(store - .transaction_status_cf - .get_protobuf((*sig, *slot)) - .unwrap() - .is_none()); - assert!(store.blocktime_cf.get(*slot).unwrap().is_none()); - assert!(store - .read_transaction_memos(*sig, *slot) - .unwrap() - .is_none()); - }); - to_preserve.iter().for_each(|(sig, slot, _, _)| { - assert!(store - .transaction_cf - .get_protobuf((*sig, *slot)) - .unwrap() - .is_some()); - assert!(store - .transaction_status_cf - .get_protobuf((*sig, *slot)) - .unwrap() - .is_some()); - assert!(store.blocktime_cf.get(*slot).unwrap().is_some()); - assert_eq!( - store.read_transaction_memos(*sig, *slot).unwrap(), - Some(format!("Memo for slot {}", slot)) - ); - }); - } } diff --git a/magicblock-metrics/src/metrics/mod.rs b/magicblock-metrics/src/metrics/mod.rs index 52e607578..e1a872217 100644 --- a/magicblock-metrics/src/metrics/mod.rs +++ b/magicblock-metrics/src/metrics/mod.rs @@ -99,6 +99,25 @@ lazy_static::lazy_static! { SECONDS_1_9.iter()).cloned().collect() ), ).unwrap(); + pub static ref LEDGER_TRUNCATOR_COMPACTION_SECONDS: Histogram = Histogram::with_opts( + HistogramOpts::new( + "ledger_truncator_compaction_seconds", + "Time taken to compact rocksdb columns" + ) + .buckets( + vec![10.0, 30.0, (30 * 60) as f64, (60 * 60) as f64, (3 * 60 * 60) as f64 ] + ), + ).unwrap(); + pub static ref LEDGER_TRUNCATOR_DELETE_SECONDS: Histogram = Histogram::with_opts( + HistogramOpts::new( + "ledger_truncator_delete_seconds", + "Time taken to delete rocksdb slot ranges" + ) + .buckets( + vec![0.1, 1.0, 5.0, 10.0, 30.0] + ), + ).unwrap(); + // ----------------- // Accounts @@ -356,6 +375,8 @@ pub(crate) fn register() { register!(LEDGER_PERF_SAMPLES_GAUGE); register!(LEDGER_ACCOUNT_MOD_DATA_GAUGE); register!(LEDGER_COLUMNS_COUNT_DURATION_SECONDS); + register!(LEDGER_TRUNCATOR_COMPACTION_SECONDS); + register!(LEDGER_TRUNCATOR_DELETE_SECONDS); register!(ACCOUNTS_SIZE_GAUGE); register!(ACCOUNTS_COUNT_GAUGE); register!(PENDING_ACCOUNT_CLONES_GAUGE); @@ -454,6 +475,14 @@ where LEDGER_COLUMNS_COUNT_DURATION_SECONDS.observe_closure_duration(f) } +pub fn start_ledger_truncator_compaction_timer() -> HistogramTimer { + LEDGER_TRUNCATOR_COMPACTION_SECONDS.start_timer() +} + +pub fn observe_ledger_truncator_delete T>(f: F) -> T { + LEDGER_TRUNCATOR_DELETE_SECONDS.observe_closure_duration(f) +} + pub fn set_accounts_size(value: i64) { ACCOUNTS_SIZE_GAUGE.set(value) } diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index ab8729f0c..3e2050035 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -3605,7 +3605,7 @@ dependencies = [ "solana-rpc", "solana-rpc-client", "solana-sdk", - "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=e480fa2)", + "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=86c2cb0f)", "solana-transaction", "tempfile", "thiserror 1.0.69", @@ -3783,6 +3783,7 @@ dependencies = [ "log", "magicblock-accounts-db", "magicblock-core", + "magicblock-metrics", "num-format", "num_cpus", "prost", @@ -3794,7 +3795,7 @@ dependencies = [ "solana-metrics", "solana-sdk", "solana-storage-proto 0.3.1", - "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=e480fa2)", + "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=86c2cb0f)", "solana-timings", "solana-transaction-status", "thiserror 1.0.69", @@ -3861,7 +3862,7 @@ dependencies = [ "solana-pubkey", "solana-rent-collector", "solana-sdk-ids", - "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=e480fa2)", + "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=86c2cb0f)", "solana-svm-transaction", "solana-system-program", "solana-transaction", @@ -3938,7 +3939,7 @@ dependencies = [ "solana-program", "solana-pubsub-client", "solana-sdk", - "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=e480fa2)", + "solana-svm 2.2.1 (git+https://github.com/magicblock-labs/magicblock-svm.git?rev=86c2cb0f)", "solana-timings", "thiserror 1.0.69", "tokio", @@ -9125,7 +9126,7 @@ dependencies = [ [[package]] name = "solana-svm" version = "2.2.1" -source = "git+https://github.com/magicblock-labs/magicblock-svm.git?rev=e480fa2#e480fa202f0680476b51b2d41210667ffc241bf4" +source = "git+https://github.com/magicblock-labs/magicblock-svm.git?rev=86c2cb0f#86c2cb0f2d92c6e92757e951a8f27757fef3aaea" dependencies = [ "ahash 0.8.12", "log",