Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 100 additions & 46 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,13 @@ impl<E: EthSpec> BlockCache<E> {
pub fn delete_blobs(&mut self, block_root: &Hash256) {
let _ = self.blob_cache.pop(block_root);
}
pub fn delete_data_columns(&mut self, block_root: &Hash256) {
let _ = self.data_column_cache.pop(block_root);
}
pub fn delete(&mut self, block_root: &Hash256) {
let _ = self.block_cache.pop(block_root);
let _ = self.blob_cache.pop(block_root);
self.delete_block(block_root);
self.delete_blobs(block_root);
self.delete_data_columns(block_root);
}
}

Expand Down Expand Up @@ -2553,6 +2557,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.collect()
}

/// Fetch all possible data column keys for a given `block_root`.
///
/// Unlike `get_data_column_keys`, these keys are not necessarily all present in the database,
/// due to the node's custody requirements many just store a subset.
pub fn get_all_data_column_keys(&self, block_root: Hash256) -> Vec<Vec<u8>> {
(0..E::number_of_columns() as u64)
.map(|column_index| get_data_column_key(&block_root, &column_index))
.collect()
}

/// Fetch a single data_column for a given block from the store.
pub fn get_data_column(
&self,
Expand Down Expand Up @@ -3228,13 +3242,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Err(HotColdDBError::BlobPruneLogicError.into());
};

// Start pruning from the epoch of the oldest blob stored.
// The start epoch is inclusive (blobs in this epoch will be pruned).
// The start epoch is not necessarily iterated back to, but is used for deciding whether we
// should attempt pruning. We could probably refactor it out eventually (while reducing our
// dependence on BlobInfo).
let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch());

// Prune blobs up until the `data_availability_boundary - margin` or the split
// slot's epoch, whichever is older. We can't prune blobs newer than the split.
// The end epoch is also inclusive (blobs in this epoch will be pruned).
// The end epoch is inclusive (blobs in this epoch will be pruned).
let split = self.get_split_info();
let end_epoch = std::cmp::min(
data_availability_boundary - margin_epochs - 1,
Expand All @@ -3257,20 +3272,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(());
}

// Sanity checks.
let anchor = self.get_anchor_info();
if oldest_blob_slot < anchor.oldest_block_slot {
error!(
%oldest_blob_slot,
oldest_block_slot = %anchor.oldest_block_slot,
"Oldest blob is older than oldest block"
// Iterate blocks backwards from the `end_epoch` (usually the data availability boundary).
let Some((end_block_root, _)) = self
.forwards_block_roots_iterator_until(end_slot, end_slot, || {
self.get_hot_state(&split.state_root, true)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))
.map(|state| (state, split.state_root))
.map_err(Into::into)
})?
.next()
.transpose()?
else {
// Can't prune blobs if we don't know the block at `end_slot`. This is expected if we
// have checkpoint synced and haven't backfilled to the DA boundary yet.
debug!(
%end_epoch,
%data_availability_boundary,
"No blobs to prune"
);
return Err(HotColdDBError::BlobPruneLogicError.into());
}

// Iterate block roots forwards from the oldest blob slot.
return Ok(());
};
debug!(
%start_epoch,
%end_epoch,
%data_availability_boundary,
"Pruning blobs"
Expand All @@ -3279,48 +3304,77 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// We collect block roots of deleted blobs in memory. Even for 10y of blob history this
// vec won't go beyond 1GB. We can probably optimise this out eventually.
let mut removed_block_roots = vec![];
let mut blobs_db_ops = vec![];

// Iterate blocks backwards until we reach a block for which we've already pruned
// blobs/columns.
for tuple in ParentRootBlockIterator::new(self, end_block_root) {
let (block_root, blinded_block) = tuple?;
let slot = blinded_block.slot();

// If the block has no blobs we can't tell if they've been pruned, and there is nothing
// to prune, so we just skip.
if !blinded_block.message().body().has_blobs() {
continue;
}

let remove_blob_if = |blobs_bytes: &[u8]| {
let blobs = Vec::from_ssz_bytes(blobs_bytes)?;
let Some(blob): Option<&Arc<BlobSidecar<E>>> = blobs.first() else {
return Ok(false);
};

if blob.slot() <= end_slot {
// Store the block root so we can delete from the blob cache
removed_block_roots.push(blob.block_root());
// Delete from the on-disk db
return Ok(true);
// Check if we have blobs or columns stored. If not, we assume pruning has already
// reached this point.
let (db_column, db_keys) = if blinded_block.fork_name_unchecked().fulu_enabled() {
(
DBColumn::BeaconDataColumn,
self.get_all_data_column_keys(block_root),
)
} else {
(DBColumn::BeaconBlob, vec![block_root.as_slice().to_vec()])
};
Ok(false)
};

self.blobs_db
.delete_if(DBColumn::BeaconBlob, remove_blob_if)?;

if self.spec.is_peer_das_enabled_for_epoch(start_epoch) {
let remove_data_column_if = |blobs_bytes: &[u8]| {
let data_column: DataColumnSidecar<E> =
DataColumnSidecar::from_ssz_bytes(blobs_bytes)?;

if data_column.slot() <= end_slot {
return Ok(true);
};

Ok(false)
};
// For data columns, consider a block pruned if ALL column indices are absent.
// In future we might want to refactor this to read the data column indices that *exist*
// from the DB, which could be slightly more efficient than checking existence for every
// possible column.
let mut data_stored_for_block = false;
for db_key in db_keys {
if self.blobs_db.key_exists(db_column, &db_key)? {
data_stored_for_block = true;
blobs_db_ops.push(KeyValueStoreOp::DeleteKey(db_column, db_key));
}
}

self.blobs_db
.delete_if(DBColumn::BeaconDataColumn, remove_data_column_if)?;
if data_stored_for_block {
debug!(
?block_root,
%slot,
"Pruning blobs or columns for block"
);
removed_block_roots.push(block_root);
} else {
debug!(
%slot,
?block_root,
"Reached slot with blobs or columns already pruned"
);
break;
}
}

// Remove deleted blobs from the cache.
if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) {
for block_root in removed_block_roots {
block_cache.delete_blobs(&block_root);
block_cache.delete_data_columns(&block_root);
}
}

// Remove from disk.
if !blobs_db_ops.is_empty() {
debug!(
num_deleted = blobs_db_ops.len(),
"Deleting blobs and data columns from disk"
);
self.blobs_db.do_atomically(blobs_db_ops)?;
}

self.update_blob_or_data_column_info(start_epoch, end_slot, blob_info, data_column_info)?;

debug!("Blob pruning complete");
Expand Down
Loading