diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 895afa4f336..a0a75dbb0d4 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -146,9 +146,13 @@ impl BlockCache { pub fn delete_blobs(&mut self, block_root: &Hash256) { let _ = self.blob_cache.pop(block_root); } + pub fn delete_data_columns(&mut self, block_root: &Hash256) { + let _ = self.data_column_cache.pop(block_root); + } pub fn delete(&mut self, block_root: &Hash256) { - let _ = self.block_cache.pop(block_root); - let _ = self.blob_cache.pop(block_root); + self.delete_block(block_root); + self.delete_blobs(block_root); + self.delete_data_columns(block_root); } } @@ -2553,6 +2557,16 @@ impl, Cold: ItemStore> HotColdDB .collect() } + /// Fetch all possible data column keys for a given `block_root`. + /// + /// Unlike `get_data_column_keys`, these keys are not necessarily all present in the database, + /// as the node's custody requirements may mean that only a subset is stored. + pub fn get_all_data_column_keys(&self, block_root: Hash256) -> Vec> { + (0..E::number_of_columns() as u64) + .map(|column_index| get_data_column_key(&block_root, &column_index)) + .collect() + } + /// Fetch a single data_column for a given block from the store. pub fn get_data_column( &self, @@ -3228,13 +3242,14 @@ impl, Cold: ItemStore> HotColdDB return Err(HotColdDBError::BlobPruneLogicError.into()); }; - // Start pruning from the epoch of the oldest blob stored. - // The start epoch is inclusive (blobs in this epoch will be pruned). + // The start epoch is not necessarily iterated back to, but is used for deciding whether we + // should attempt pruning. We could probably refactor it out eventually (while reducing our + // dependence on BlobInfo). let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()); // Prune blobs up until the `data_availability_boundary - margin` or the split // slot's epoch, whichever is older. 
We can't prune blobs newer than the split. - // The end epoch is also inclusive (blobs in this epoch will be pruned). + // The end epoch is inclusive (blobs in this epoch will be pruned). let split = self.get_split_info(); let end_epoch = std::cmp::min( data_availability_boundary - margin_epochs - 1, @@ -3257,20 +3272,30 @@ impl, Cold: ItemStore> HotColdDB return Ok(()); } - // Sanity checks. - let anchor = self.get_anchor_info(); - if oldest_blob_slot < anchor.oldest_block_slot { - error!( - %oldest_blob_slot, - oldest_block_slot = %anchor.oldest_block_slot, - "Oldest blob is older than oldest block" + // Iterate blocks backwards from the `end_epoch` (usually the data availability boundary). + let Some((end_block_root, _)) = self + .forwards_block_roots_iterator_until(end_slot, end_slot, || { + self.get_hot_state(&split.state_root, true)? + .ok_or(HotColdDBError::MissingSplitState( + split.state_root, + split.slot, + )) + .map(|state| (state, split.state_root)) + .map_err(Into::into) + })? + .next() + .transpose()? + else { + // Can't prune blobs if we don't know the block at `end_slot`. This is expected if we + // have checkpoint synced and haven't backfilled to the DA boundary yet. + debug!( + %end_epoch, + %data_availability_boundary, + "No blobs to prune" ); - return Err(HotColdDBError::BlobPruneLogicError.into()); - } - - // Iterate block roots forwards from the oldest blob slot. + return Ok(()); + }; debug!( - %start_epoch, %end_epoch, %data_availability_boundary, "Pruning blobs" @@ -3279,48 +3304,77 @@ impl, Cold: ItemStore> HotColdDB // We collect block roots of deleted blobs in memory. Even for 10y of blob history this // vec won't go beyond 1GB. We can probably optimise this out eventually. let mut removed_block_roots = vec![]; + let mut blobs_db_ops = vec![]; + + // Iterate blocks backwards until we reach a block for which we've already pruned + // blobs/columns. 
+ for tuple in ParentRootBlockIterator::new(self, end_block_root) { + let (block_root, blinded_block) = tuple?; + let slot = blinded_block.slot(); + + // If the block has no blobs we can't tell if they've been pruned, and there is nothing + // to prune, so we just skip. + if !blinded_block.message().body().has_blobs() { + continue; + } - let remove_blob_if = |blobs_bytes: &[u8]| { - let blobs = Vec::from_ssz_bytes(blobs_bytes)?; - let Some(blob): Option<&Arc>> = blobs.first() else { - return Ok(false); - }; - - if blob.slot() <= end_slot { - // Store the block root so we can delete from the blob cache - removed_block_roots.push(blob.block_root()); - // Delete from the on-disk db - return Ok(true); + // Check if we have blobs or columns stored. If not, we assume pruning has already + // reached this point. + let (db_column, db_keys) = if blinded_block.fork_name_unchecked().fulu_enabled() { + ( + DBColumn::BeaconDataColumn, + self.get_all_data_column_keys(block_root), + ) + } else { + (DBColumn::BeaconBlob, vec![block_root.as_slice().to_vec()]) }; - Ok(false) - }; - self.blobs_db - .delete_if(DBColumn::BeaconBlob, remove_blob_if)?; - - if self.spec.is_peer_das_enabled_for_epoch(start_epoch) { - let remove_data_column_if = |blobs_bytes: &[u8]| { - let data_column: DataColumnSidecar = - DataColumnSidecar::from_ssz_bytes(blobs_bytes)?; - - if data_column.slot() <= end_slot { - return Ok(true); - }; - - Ok(false) - }; + // For data columns, consider a block pruned if ALL column indices are absent. + // In future we might want to refactor this to read the data column indices that *exist* + // from the DB, which could be slightly more efficient than checking existence for every + // possible column. + let mut data_stored_for_block = false; + for db_key in db_keys { + if self.blobs_db.key_exists(db_column, &db_key)? 
{ + data_stored_for_block = true; + blobs_db_ops.push(KeyValueStoreOp::DeleteKey(db_column, db_key)); + } + } - self.blobs_db - .delete_if(DBColumn::BeaconDataColumn, remove_data_column_if)?; + if data_stored_for_block { + debug!( + ?block_root, + %slot, + "Pruning blobs or columns for block" + ); + removed_block_roots.push(block_root); + } else { + debug!( + %slot, + ?block_root, + "Reached slot with blobs or columns already pruned" + ); + break; + } } // Remove deleted blobs from the cache. if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) { for block_root in removed_block_roots { block_cache.delete_blobs(&block_root); + block_cache.delete_data_columns(&block_root); } } + // Remove from disk. + if !blobs_db_ops.is_empty() { + debug!( + num_deleted = blobs_db_ops.len(), + "Deleting blobs and data columns from disk" + ); + self.blobs_db.do_atomically(blobs_db_ops)?; + } + self.update_blob_or_data_column_info(start_epoch, end_slot, blob_info, data_column_info)?; debug!("Blob pruning complete");