Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions magicblock-ledger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ prost = { workspace = true }
serde = { workspace = true }
magicblock-accounts-db = { workspace = true }
magicblock-core = { workspace = true }
magicblock-metrics = { workspace = true }
scc = { workspace = true }
solana-account-decoder = { workspace = true }
solana-measure = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions magicblock-ledger/src/database/ledger_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ where
write_batch.delete::<C>(key);
}

pub fn delete_range(
&self,
from: C::Index,
to: C::Index,
) -> LedgerResult<()> {
self.backend
.delete_range_cf(self.handle(), C::key(from), C::key(to))
}

pub fn delete_range_in_batch(
&self,
write_batch: &mut WriteBatch,
Expand Down
10 changes: 10 additions & 0 deletions magicblock-ledger/src/database/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ impl Rocks {
Ok(())
}

pub fn delete_range_cf<K: AsRef<[u8]>>(
&self,
cf: &ColumnFamily,
from: K,
to: K,
) -> LedgerResult<()> {
self.db.delete_range_cf(cf, from, to)?;
Ok(())
}

/// Delete files whose slot range is within \[`from`, `to`\].
pub fn delete_file_in_range_cf(
&self,
Expand Down
137 changes: 95 additions & 42 deletions magicblock-ledger/src/ledger_truncator.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
use std::{
cmp::min,
ops::ControlFlow,
sync::Arc,
sync::{atomic::Ordering, Arc},
thread::{self, JoinHandle},
time::Duration,
};

use log::{error, info, warn};
use magicblock_metrics::metrics::{
observe_ledger_truncator_delete, start_ledger_truncator_compaction_timer,
HistogramTimer,
};
use solana_measure::measure::Measure;
use tokio::{runtime::Builder, time::interval};
use tokio_util::sync::CancellationToken;

use crate::{
database::columns::{
AddressSignatures, Blockhash, Blocktime, PerfSamples, SlotSignatures,
Transaction, TransactionMemos, TransactionStatus,
database::{
columns,
columns::{
AddressSignatures, Blockhash, Blocktime, PerfSamples,
SlotSignatures, Transaction, TransactionMemos, TransactionStatus,
},
},
errors::LedgerResult,
store::api::HasColumn,
Ledger,
};

Expand Down Expand Up @@ -48,9 +55,6 @@ impl LedgerTrunctationWorker {
}

pub async fn run(self) {
self.ledger
.initialize_lowest_cleanup_slot()
.expect("Lowest cleanup slot initialization");
let mut interval = interval(self.truncation_time_interval);
loop {
tokio::select! {
Expand Down Expand Up @@ -93,7 +97,7 @@ impl LedgerTrunctationWorker {
from_slot,
to_slot,
self.cancellation_token.clone(),
),
)?,
None => warn!("Could not estimate truncation range"),
}
}
Expand All @@ -102,6 +106,7 @@ impl LedgerTrunctationWorker {
}

/// Truncates ledger that is over desired size
/// We rely on present `CompactionFilter` to delete all data
pub fn truncate_fat_ledger(
&self,
current_ledger_size: u64,
Expand Down Expand Up @@ -136,7 +141,10 @@ impl LedgerTrunctationWorker {
"Fat truncation: truncating up to(inclusive): {}",
truncate_to_slot
);

self.ledger.set_lowest_cleanup_slot(truncate_to_slot);
Self::delete_slots(&self.ledger, 0, truncate_to_slot)?;

if let Err(err) = self.ledger.flush() {
// We will still compact
error!("Failed to flush: {}", err);
Expand All @@ -151,6 +159,73 @@ impl LedgerTrunctationWorker {
Ok(())
}

/// Inserts tombstones in slot-ordered columns for range [from; to] inclusive
/// This is a cheap operation since delete_range inserts one range tombstone
/// NOTE: this doesn't cover all the columns, we rely on CompactionFilter to clean the rest
fn delete_slots(
ledger: &Arc<Ledger>,
from_slot: u64,
to_slot: u64,
) -> LedgerResult<()> {
observe_ledger_truncator_delete(|| {
let start = from_slot;
let end = to_slot + 1;
ledger.delete_range_cf::<Blockhash>(start, end)?;
<Ledger as HasColumn<Blockhash>>::with_column(
ledger.as_ref(),
|column| {
column.try_decrease_entry_counter(end - start);
},
);

ledger.delete_range_cf::<Blocktime>(start, end)?;
<Ledger as HasColumn<Blocktime>>::with_column(
ledger.as_ref(),
|column| {
column.try_decrease_entry_counter(end - start);
},
);

ledger.delete_range_cf::<PerfSamples>(start, end)?;
<Ledger as HasColumn<PerfSamples>>::with_column(
ledger.as_ref(),
|column| {
column.try_decrease_entry_counter(end - start);
},
);

// Can cheaply delete SlotSignatures as well
// NOTE: we need to clean (to_slot, u32::MAX)
// since range is exclusive at the end we use (to_slot + 1, 0)
ledger.delete_range_cf::<SlotSignatures>(
(from_slot, 0),
(to_slot + 1, 0),
)?;

// Reset counters for other columns
// where we can't know how many el-ts gets deleted
macro_rules! reset_entry_counter {
($ledger:expr, $cf_ty:ty $(,)?) => {
<Ledger as HasColumn<$cf_ty>>::with_column(
$ledger.as_ref(),
|column| {
column
.entry_counter
.store(columns::DIRTY_COUNT, Ordering::Relaxed);
},
);
};
}
reset_entry_counter!(ledger, SlotSignatures);
reset_entry_counter!(ledger, TransactionStatus);
reset_entry_counter!(ledger, AddressSignatures);
reset_entry_counter!(ledger, Transaction);
reset_entry_counter!(ledger, TransactionMemos);

Ok(())
})
}

/// Returns range to truncate [from_slot, to_slot]
fn estimate_truncation_range(
&self,
Expand Down Expand Up @@ -196,53 +271,27 @@ impl LedgerTrunctationWorker {

/// Utility function for splitting truncation into smaller chunks
/// Cleans slots [from_slot; to_slot] inclusive range
/// We rely on present `CompactionFilter` to delete all data
pub fn truncate_slot_range(
ledger: &Arc<Ledger>,
from_slot: u64,
to_slot: u64,
cancellation_token: CancellationToken,
) {
// In order not to torture RocksDB's WriteBatch we split large tasks into chunks
const SINGLE_TRUNCATION_LIMIT: usize = 300;

) -> LedgerResult<()> {
if to_slot < from_slot {
warn!("LedgerTruncator: Nani?");
return;
return Ok(());
}

info!(
"LedgerTruncator: truncating slot range [{from_slot}; {to_slot}]"
);

let ledger_copy = ledger.clone();
(from_slot..=to_slot)
.step_by(SINGLE_TRUNCATION_LIMIT)
.try_for_each(|cur_from_slot| {
if cancellation_token.is_cancelled() {
return ControlFlow::Break(());
}

let num_slots_to_truncate = min(
to_slot - cur_from_slot + 1,
SINGLE_TRUNCATION_LIMIT as u64,
);
let truncate_to_slot =
cur_from_slot + num_slots_to_truncate - 1;

if let Err(err) = ledger_copy
.delete_slot_range(cur_from_slot, truncate_to_slot)
{
warn!(
"Failed to truncate slots {}-{}: {}",
cur_from_slot, truncate_to_slot, err
);
}

ControlFlow::Continue(())
});
ledger.set_lowest_cleanup_slot(to_slot);
Self::delete_slots(ledger, from_slot, to_slot)?;

// Flush memtables with tombstones prior to compaction
if let Err(err) = ledger_copy.flush() {
if let Err(err) = ledger.flush() {
error!("Failed to flush ledger: {err}");
}
Self::compact_slot_range(
Expand All @@ -251,6 +300,7 @@ impl LedgerTrunctationWorker {
to_slot,
cancellation_token,
);
Ok(())
}

/// Synchronous utility function that triggers and awaits compaction on all the columns
Expand Down Expand Up @@ -278,16 +328,19 @@ impl LedgerTrunctationWorker {

struct CompactionMeasure {
measure: Measure,
_histogram_timer: HistogramTimer,
}
impl Drop for CompactionMeasure {
fn drop(&mut self) {
self.measure.stop();
// histogram_timer - records on HistogramTimer::drop
info!("Manual compaction took: {}", self.measure);
}
}

let _measure = CompactionMeasure {
measure: Measure::start("Manual compaction"),
_histogram_timer: start_ledger_truncator_compaction_timer(),
};

let start = from_slot;
Expand Down Expand Up @@ -316,7 +369,7 @@ impl LedgerTrunctationWorker {
compact_cf_or_return!(
ledger,
cancellation_token,
(Some((from_slot, u32::MIN)), Some((to_slot + 1, u32::MAX))),
(Some((from_slot, u32::MIN)), Some((to_slot + 1, 0))),
SlotSignatures
);
compact_cf_or_return!(
Expand Down
Loading