diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 7aaa5c3d..63d62ef3 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -115,7 +115,7 @@ fn update_safe_target(store: &mut Store) { /// Aggregate committee signatures at interval 2. /// /// Collects individual gossip signatures, aggregates them by attestation data, -/// and stores the resulting proofs in `LatestNewAggregatedPayloads`. +/// and stores the resulting proofs in the new aggregated payloads buffer. fn aggregate_committee_signatures(store: &mut Store) -> Vec { let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); if gossip_sigs.is_empty() { @@ -1067,7 +1067,7 @@ fn build_block( /// Select existing aggregated proofs for attestations to include in a block. /// /// Fresh gossip aggregation happens at interval 2 (`aggregate_committee_signatures`). -/// This function only selects from existing proofs in the `LatestKnownAggregatedPayloads` table +/// This function only selects from existing proofs in the known aggregated payloads buffer /// (proofs from previously received blocks and promoted gossip aggregations). /// /// Returns a list of (attestation, proof) pairs ready for block inclusion. 
diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 7b184d59..7f7d7a3c 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -16,12 +16,6 @@ pub enum Table { GossipSignatures, /// Attestation data indexed by tree hash root: H256 -> AttestationData AttestationDataByRoot, - /// Pending aggregated payloads (not yet active in fork choice): - /// SignatureKey -> Vec - LatestNewAggregatedPayloads, - /// Active aggregated payloads (counted in fork choice): - /// SignatureKey -> Vec - LatestKnownAggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -33,15 +27,13 @@ pub enum Table { } /// All table variants. -pub const ALL_TABLES: [Table; 10] = [ +pub const ALL_TABLES: [Table; 8] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, Table::GossipSignatures, Table::AttestationDataByRoot, - Table::LatestNewAggregatedPayloads, - Table::LatestKnownAggregatedPayloads, Table::Metadata, Table::LiveChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 45565f18..b1338052 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -18,8 +18,6 @@ fn cf_name(table: Table) -> &'static str { Table::States => "states", Table::GossipSignatures => "gossip_signatures", Table::AttestationDataByRoot => "attestation_data_by_root", - Table::LatestNewAggregatedPayloads => "latest_new_aggregated_payloads", - Table::LatestKnownAggregatedPayloads => "latest_known_aggregated_payloads", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 2f22db21..a69d39c2 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,5 +1,5 @@ -use std::collections::{HashMap, HashSet}; -use std::sync::{Arc, LazyLock}; +use 
std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::{Arc, LazyLock, Mutex}; /// The tree hash root of an empty block body. /// @@ -90,6 +90,69 @@ const _: () = assert!( "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" ); +/// Hard cap for the known aggregated payload buffer. +/// Matches Lantern's approach. With 9 validators, this holds +/// ~455 unique attestation messages (~30 min at 1/slot). +const AGGREGATED_PAYLOAD_CAP: usize = 4096; + +/// Hard cap for the new (pending) aggregated payload buffer. +/// Smaller than known since new payloads are drained every interval (~4s). +/// With 9 validators at 1 attestation/slot, one interval holds ~9 entries. +const NEW_PAYLOAD_CAP: usize = 512; + +/// Fixed-size circular buffer for aggregated payloads. +/// +/// Entries are evicted FIFO when the buffer reaches capacity. +/// This prevents unbounded memory growth when finalization stalls. +#[derive(Clone)] +struct PayloadBuffer { + entries: VecDeque<(SignatureKey, StoredAggregatedPayload)>, + capacity: usize, +} + +impl PayloadBuffer { + fn new(capacity: usize) -> Self { + Self { + entries: VecDeque::with_capacity(capacity), + capacity, + } + } + + /// Insert one entry, FIFO-evicting the oldest if at capacity. + fn push(&mut self, key: SignatureKey, payload: StoredAggregatedPayload) { + if self.entries.len() >= self.capacity { + self.entries.pop_front(); + } + self.entries.push_back((key, payload)); + } + + /// Insert multiple entries, FIFO-evicting as needed. + fn push_batch(&mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>) { + for (key, payload) in entries { + self.push(key, payload); + } + } + + /// Take all entries, leaving the buffer empty. + fn drain(&mut self) -> Vec<(SignatureKey, StoredAggregatedPayload)> { + self.entries.drain(..).collect() + } + + /// Group entries by key, preserving insertion order within each group. 
+ fn grouped(&self) -> HashMap<SignatureKey, Vec<StoredAggregatedPayload>> { + let mut map: HashMap<SignatureKey, Vec<StoredAggregatedPayload>> = HashMap::new(); + for (key, payload) in &self.entries { + map.entry(*key).or_default().push(payload.clone()); + } + map + } + + /// Return deduplicated keys. + fn unique_keys(&self) -> HashSet<SignatureKey> { + self.entries.iter().map(|(key, _)| *key).collect() + } +} + // ============ Key Encoding Helpers ============ /// Encode a SignatureKey (validator_id, root) to bytes. @@ -141,6 +204,8 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { #[derive(Clone)] pub struct Store { backend: Arc, + new_payloads: Arc<Mutex<PayloadBuffer>>, + known_payloads: Arc<Mutex<PayloadBuffer>>, } impl Store { @@ -276,7 +341,11 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - Self { backend } + Self { + backend, + new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), + known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), + } } // ============ Metadata Helpers ============ @@ -385,14 +454,9 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); - // Prune signatures, payloads, and attestation data for finalized slots + // Prune signatures and attestation data for finalized slots let pruned_sigs = self.prune_gossip_signatures(finalized.slot); let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); - self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, finalized.slot); - self.prune_aggregated_payload_table( - Table::LatestKnownAggregatedPayloads, - finalized.slot, - ); // Prune old states before blocks: state pruning uses headers for slot lookup let protected_roots = [finalized.root, self.latest_justified().root]; let pruned_states = self.prune_old_states(&protected_roots); @@ -505,42 +569,6 @@ impl Store { }) } - /// Prune an aggregated payload table (new or known) for slots <= finalized_slot.
- fn prune_aggregated_payload_table(&mut self, table: Table, finalized_slot: u64) { - let view = self.backend.begin_read().expect("read view"); - let mut updates = vec![]; - let mut deletes = vec![]; - - for (key_bytes, value_bytes) in view - .prefix_iterator(table, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Ok(mut payloads) = Vec::::from_ssz_bytes(&value_bytes) { - let original_len = payloads.len(); - payloads.retain(|p| p.slot > finalized_slot); - - if payloads.is_empty() { - deletes.push(key_bytes.to_vec()); - } else if payloads.len() < original_len { - updates.push((key_bytes.to_vec(), payloads.as_ssz_bytes())); - } - } - } - drop(view); - - if !updates.is_empty() || !deletes.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - if !updates.is_empty() { - batch.put_batch(table, updates).expect("put"); - } - if !deletes.is_empty() { - batch.delete_batch(table, deletes).expect("delete"); - } - batch.commit().expect("commit"); - } - } - /// Prune old states beyond the retention window. /// /// Keeps the most recent `STATES_TO_KEEP` states (by slot), plus any @@ -822,7 +850,8 @@ impl Store { /// Convenience: extract latest attestation per validator from known /// (fork-choice-active) aggregated payloads only. pub fn extract_latest_known_attestations(&self) -> HashMap { - self.extract_latest_attestations(self.iter_known_aggregated_payloads().map(|(key, _)| key)) + let keys = self.known_payloads.lock().unwrap().unique_keys(); + self.extract_latest_attestations(keys.into_iter()) } // ============ Known Aggregated Payloads ============ @@ -830,34 +859,37 @@ impl Store { // "Known" aggregated payloads are active in fork choice weight calculations. // Promoted from "new" payloads at specific intervals (0 with proposal, 4). - /// Iterates over all known aggregated payloads. + /// Iterates over all known aggregated payloads, grouped by key. 
pub fn iter_known_aggregated_payloads( &self, - ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> + '_ { - self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads) + ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> { + self.known_payloads.lock().unwrap().grouped().into_iter() } - /// Iterates over keys only from the known aggregated payloads table, - /// skipping value deserialization. - pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ { - self.iter_aggregated_payload_keys(Table::LatestKnownAggregatedPayloads) + /// Iterates over deduplicated keys from the known aggregated payloads. + pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> { + self.known_payloads + .lock() + .unwrap() + .unique_keys() + .into_iter() } - /// Insert an aggregated payload into the known (fork-choice-active) table. + /// Insert an aggregated payload into the known (fork-choice-active) buffer. pub fn insert_known_aggregated_payload( &mut self, key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.insert_aggregated_payload(Table::LatestKnownAggregatedPayloads, key, payload); + self.known_payloads.lock().unwrap().push(key, payload); } - /// Batch-insert multiple aggregated payloads into the known table in a single commit. + /// Batch-insert multiple aggregated payloads into the known buffer. pub fn insert_known_aggregated_payloads_batch( &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.insert_aggregated_payloads_batch(Table::LatestKnownAggregatedPayloads, entries); + self.known_payloads.lock().unwrap().push_batch(entries); } // ============ New Aggregated Payloads ============ @@ -865,34 +897,33 @@ impl Store { // "New" aggregated payloads are pending — not yet counted in fork choice. // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) aggregated payloads. + /// Iterates over all new (pending) aggregated payloads, grouped by key.
pub fn iter_new_aggregated_payloads( &self, - ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> + '_ { - self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads) + ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> { + self.new_payloads.lock().unwrap().grouped().into_iter() } - /// Iterates over keys only from the new aggregated payloads table, - /// skipping value deserialization. - pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ { - self.iter_aggregated_payload_keys(Table::LatestNewAggregatedPayloads) + /// Iterates over deduplicated keys from the new aggregated payloads. + pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> { + self.new_payloads.lock().unwrap().unique_keys().into_iter() } - /// Insert an aggregated payload into the new (pending) table. + /// Insert an aggregated payload into the new (pending) buffer. pub fn insert_new_aggregated_payload( &mut self, key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.insert_aggregated_payload(Table::LatestNewAggregatedPayloads, key, payload); + self.new_payloads.lock().unwrap().push(key, payload); } - /// Batch-insert multiple aggregated payloads into the new table in a single commit. + /// Batch-insert multiple aggregated payloads into the new buffer.
pub fn insert_new_aggregated_payloads_batch( &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.insert_aggregated_payloads_batch(Table::LatestNewAggregatedPayloads, entries); + self.new_payloads.lock().unwrap().push_batch(entries); } // ============ Pruning Helpers ============ @@ -930,132 +961,12 @@ impl Store { count } - // ============ Aggregated Payload Helpers ============ - - fn iter_aggregated_payloads( - &self, - table: Table, - ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let key = decode_signature_key(&k); - let payloads = - Vec::<StoredAggregatedPayload>::from_ssz_bytes(&v).expect("valid payloads"); - (key, payloads) - }) - .collect(); - entries.into_iter() - } - - fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator<Item = SignatureKey> { - let view = self.backend.begin_read().expect("read view"); - let keys: Vec<_> = view - .prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, _)| decode_signature_key(&k)) - .collect(); - keys.into_iter() - } - - fn insert_aggregated_payload( - &mut self, - table: Table, - key: SignatureKey, - payload: StoredAggregatedPayload, - ) { - self.insert_aggregated_payloads_batch(table, vec![(key, payload)]); - } - - /// Batch-insert multiple aggregated payloads in a single read-write-commit cycle. - /// Groups entries by key to correctly handle multiple payloads for the same key.
- fn insert_aggregated_payloads_batch( - &mut self, - table: Table, - entries: Vec<(SignatureKey, StoredAggregatedPayload)>, - ) { - if entries.is_empty() { - return; - } - - // Group entries by key to handle multiple payloads for the same key - let mut grouped: HashMap<Vec<u8>, Vec<StoredAggregatedPayload>> = HashMap::new(); - for (key, payload) in entries { - let encoded_key = encode_signature_key(&key); - grouped.entry(encoded_key).or_default().push(payload); - } - - let view = self.backend.begin_read().expect("read view"); - let mut batch_entries = Vec::new(); - - for (encoded_key, new_payloads) in grouped { - let mut payloads: Vec<StoredAggregatedPayload> = view - .get(table, &encoded_key) - .expect("get") - .map(|bytes| Vec::<StoredAggregatedPayload>::from_ssz_bytes(&bytes).expect("valid")) - .unwrap_or_default(); - payloads.extend(new_payloads); - batch_entries.push((encoded_key, payloads.as_ssz_bytes())); - } - drop(view); - - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .put_batch(table, batch_entries) - .expect("put aggregated payloads"); - batch.commit().expect("commit"); - } - /// Promotes all new aggregated payloads to known, making them active in fork choice. /// - /// Merges entries from `LatestNewAggregatedPayloads` into `LatestKnownAggregatedPayloads`, - /// appending to existing payload lists rather than overwriting them. + /// Drains the new buffer and pushes all entries into the known buffer.
pub fn promote_new_aggregated_payloads(&mut self) { - let view = self.backend.begin_read().expect("read view"); - let new_entries: Vec<(Vec<u8>, Vec<u8>)> = view - .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| (k.to_vec(), v.to_vec())) - .collect(); - - if new_entries.is_empty() { - drop(view); - return; - } - - // Merge new payloads with existing known payloads - let merged: Vec<(Vec<u8>, Vec<u8>)> = new_entries - .iter() - .map(|(key, new_bytes)| { - let new_payloads = - Vec::<StoredAggregatedPayload>::from_ssz_bytes(new_bytes).expect("valid"); - let mut known_payloads: Vec<StoredAggregatedPayload> = view - .get(Table::LatestKnownAggregatedPayloads, key) - .expect("get") - .map(|bytes| { - Vec::<StoredAggregatedPayload>::from_ssz_bytes(&bytes).expect("valid") - }) - .unwrap_or_default(); - known_payloads.extend(new_payloads); - (key.clone(), known_payloads.as_ssz_bytes()) - }) - .collect(); - drop(view); - - let keys_to_delete: Vec<_> = new_entries.into_iter().map(|(k, _)| k).collect(); - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::LatestNewAggregatedPayloads, keys_to_delete) - .expect("delete new aggregated payloads"); - batch - .put_batch(Table::LatestKnownAggregatedPayloads, merged) - .expect("put known aggregated payloads"); - batch.commit().expect("commit"); + let drained = self.new_payloads.lock().unwrap().drain(); + self.known_payloads.lock().unwrap().push_batch(drained); } /// Delete specific gossip signatures by key. @@ -1249,14 +1160,34 @@ mod tests { H256::from(bytes) } + impl Store { + /// Create a Store with an in-memory backend for tests. + fn test_store() -> Self { + let backend = Arc::new(InMemoryBackend::new()); + Self { + backend, + new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), + known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), + } + } + + /// Create a Store with a shared in-memory backend for tests that need + /// direct backend access.
+ fn test_store_with_backend(backend: Arc) -> Self { + Self { + backend, + new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), + known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), + } + } + } + // ============ Block Pruning Tests ============ #[test] fn prune_old_blocks_within_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - }; + let mut store = Store::test_store_with_backend(backend.clone()); // Insert exactly BLOCKS_TO_KEEP blocks for i in 0..BLOCKS_TO_KEEP as u64 { @@ -1278,9 +1209,7 @@ mod tests { #[test] fn prune_old_blocks_exceeding_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = BLOCKS_TO_KEEP + 10; for i in 0..total as u64 { @@ -1316,9 +1245,7 @@ mod tests { #[test] fn prune_old_blocks_preserves_protected() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = BLOCKS_TO_KEEP + 10; for i in 0..total as u64 { @@ -1359,9 +1286,7 @@ mod tests { #[test] fn prune_old_states_within_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - }; + let mut store = Store::test_store_with_backend(backend.clone()); // Insert STATES_TO_KEEP headers + states for i in 0..STATES_TO_KEEP as u64 { @@ -1380,9 +1305,7 @@ mod tests { #[test] fn prune_old_states_exceeding_retention() { let backend = Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = STATES_TO_KEEP + 5; for i in 0..total as u64 { @@ -1411,9 +1334,7 @@ mod tests { #[test] fn prune_old_states_preserves_protected() { let backend = 
Arc::new(InMemoryBackend::new()); - let mut store = Store { - backend: backend.clone(), - }; + let mut store = Store::test_store_with_backend(backend.clone()); let total = STATES_TO_KEEP + 5; for i in 0..total as u64 { @@ -1430,4 +1351,99 @@ mod tests { assert!(has_key(backend.as_ref(), Table::States, &finalized_root)); assert!(has_key(backend.as_ref(), Table::States, &justified_root)); } + + // ============ PayloadBuffer Tests ============ + + fn make_payload(slot: u64) -> StoredAggregatedPayload { + use ethlambda_types::attestation::AggregationBits; + use ethlambda_types::block::AggregatedSignatureProof; + + StoredAggregatedPayload { + slot, + proof: AggregatedSignatureProof::empty(AggregationBits::with_capacity(0).unwrap()), + } + } + + #[test] + fn payload_buffer_fifo_eviction() { + let mut buf = PayloadBuffer::new(3); + let key = (0u64, H256::ZERO); + + buf.push(key, make_payload(1)); + buf.push(key, make_payload(2)); + buf.push(key, make_payload(3)); + assert_eq!(buf.entries.len(), 3); + + // Pushing a 4th entry should evict the oldest (slot 1) + buf.push(key, make_payload(4)); + assert_eq!(buf.entries.len(), 3); + let slots: Vec = buf.entries.iter().map(|(_, p)| p.slot).collect(); + assert_eq!(slots, vec![2, 3, 4]); + } + + #[test] + fn payload_buffer_grouped_returns_correct_groups() { + let mut buf = PayloadBuffer::new(10); + let key_a = (0u64, H256::ZERO); + let key_b = (1u64, H256::ZERO); + + buf.push(key_a, make_payload(1)); + buf.push(key_b, make_payload(2)); + buf.push(key_a, make_payload(3)); + + let grouped = buf.grouped(); + assert_eq!(grouped.len(), 2); + assert_eq!(grouped[&key_a].len(), 2); + assert_eq!(grouped[&key_a][0].slot, 1); + assert_eq!(grouped[&key_a][1].slot, 3); + assert_eq!(grouped[&key_b].len(), 1); + assert_eq!(grouped[&key_b][0].slot, 2); + } + + #[test] + fn payload_buffer_drain_empties_buffer() { + let mut buf = PayloadBuffer::new(10); + let key = (0u64, H256::ZERO); + + buf.push(key, make_payload(1)); + buf.push(key, 
make_payload(2)); + + let drained = buf.drain(); + assert_eq!(drained.len(), 2); + assert!(buf.entries.is_empty()); + } + + #[test] + fn promote_moves_new_to_known() { + let mut store = Store::test_store(); + + let key = (0u64, H256::ZERO); + store.insert_new_aggregated_payload(key, make_payload(1)); + store.insert_new_aggregated_payload(key, make_payload(2)); + + assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 2); + assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 0); + + store.promote_new_aggregated_payloads(); + + assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 0); + assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 2); + } + + #[test] + fn cloned_store_shares_payload_buffers() { + let mut store = Store::test_store(); + let cloned = store.clone(); + + let key = (0u64, H256::ZERO); + store.insert_new_aggregated_payload(key, make_payload(1)); + + // Modification on original should be visible in clone + assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 1); + + store.promote_new_aggregated_payloads(); + + assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 0); + assert_eq!(cloned.known_payloads.lock().unwrap().entries.len(), 1); + } }