From 0c7ccd29bcf8492bbb80399e51d4fb743f1a574b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 21 Apr 2026 16:35:24 -0300 Subject: [PATCH] refactor(storage): dedupe PayloadBuffer proofs by subsumption on push MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Maintain each `PayloadBuffer` entry as an antichain under the subset relation on participants. On push: - skip if incoming participants are a subset (incl. equal) of any existing proof (adds no coverage); - otherwise remove existing proofs whose participants are a strict subset of the incoming one, then insert. This generalises the prior equality-only dedup and eliminates redundant proofs that otherwise accumulate after aggregation produces a superset of its child proofs, or when a block embeds a proof subsumed by one already in `known_payloads`. Validator coverage is monotonic across pushes, so `update_head`'s reads of `known_payloads` are never temporarily degraded. Affects every insert path transparently — `apply_aggregated_group`, `on_gossip_aggregated_attestation`, `on_block_core`, and `promote_new_aggregated_payloads` — without touching their call sites. --- crates/storage/src/store.rs | 258 ++++++++++++++++++++++++++++++++++-- 1 file changed, 250 insertions(+), 8 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index c15efe0..0c22b05 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -126,18 +126,45 @@ impl PayloadBuffer { } } - /// Insert proofs for an attestation, FIFO-evicting oldest data_roots when total proofs reach capacity. + /// Insert a proof, maintaining the antichain invariant per data_root. + /// + /// Each data_root entry holds proofs whose participant sets are pairwise + /// incomparable under the subset relation. On push: + /// + /// - If the incoming proof's participants are a subset (incl. 
equal) of + /// any existing proof, the incoming proof is redundant and skipped. + /// - Otherwise, any existing proof whose participants are a strict subset + /// of the incoming proof's is removed before inserting. + /// + /// This subsumes exact-equality dedup and keeps buffers bounded when an + /// aggregator produces a proof that supersedes prior children. + /// + /// FIFO-evicts oldest data_roots when `total_proofs` exceeds capacity. fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) { let (data_root, att_data) = hashed.into_parts(); + let new_set: HashSet<u64> = proof.participant_indices().collect(); + if let Some(entry) = self.data.get_mut(&data_root) { - // Skip duplicate proofs (same participants) - if entry - .proofs - .iter() - .any(|p| p.participants == proof.participants) - { - return; + let mut to_remove: Vec<usize> = Vec::new(); + for (i, p) in entry.proofs.iter().enumerate() { + let existing_set: HashSet<u64> = p.participant_indices().collect(); + // Incoming is subsumed by an existing proof (incl. equal) — skip. + if new_set.is_subset(&existing_set) { + return; + } + // Existing is a strict subset of incoming — mark for removal. + // (Non-strict equality was handled by the check above.) + if existing_set.is_subset(&new_set) { + to_remove.push(i); + } + } + + // Remove subsumed proofs (reverse order so earlier indices stay valid). + for i in to_remove.into_iter().rev() { + entry.proofs.swap_remove(i); + self.total_proofs -= 1; } + entry.proofs.push(proof); + self.total_proofs += 1; } else { @@ -1628,6 +1655,17 @@ mod tests { AggregatedSignatureProof::empty(bits) } + /// Create a proof with bits set for every validator in `vids`. 
+ fn make_proof_for_validators(vids: &[u64]) -> AggregatedSignatureProof { + use ethlambda_types::attestation::AggregationBits; + let max = vids.iter().copied().max().unwrap_or(0) as usize; + let mut bits = AggregationBits::with_length(max + 1).unwrap(); + for &v in vids { + bits.set(v as usize, true).unwrap(); + } + AggregatedSignatureProof::empty(bits) + } + fn make_att_data(slot: u64) -> AttestationData { AttestationData { slot, @@ -1751,6 +1789,210 @@ mod tests { assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1); } + #[test] + fn payload_buffer_push_superset_removes_strict_subset() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2, 3]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + let kept: HashSet<u64> = buf.data[&data_root].proofs[0] + .participant_indices() + .collect(); + assert_eq!(kept, HashSet::from([1, 2, 3])); + } + + #[test] + fn payload_buffer_push_subset_is_skipped() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2, 3]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + let kept: HashSet<u64> = buf.data[&data_root].proofs[0] + .participant_indices() + .collect(); + assert_eq!(kept, HashSet::from([1, 2, 3])); + } + + #[test] + fn payload_buffer_push_equal_participants_is_skipped() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + 
make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + } + + #[test] + fn payload_buffer_push_incomparable_proofs_coexist() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[3, 4]), + ); + + assert_eq!(buf.total_proofs, 2); + assert_eq!(buf.data[&data_root].proofs.len(), 2); + } + + #[test] + fn payload_buffer_push_superset_absorbs_multiple_subsets() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + // Three pairwise-incomparable singletons: all retained. + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[2]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[3]), + ); + assert_eq!(buf.total_proofs, 3); + + // Superset push absorbs all three at once. + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2, 3]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + // `order` still contains the single entry. 
+ assert_eq!(buf.order.len(), 1); + assert_eq!(buf.order.front().copied(), Some(data_root)); + } + + #[test] + fn payload_buffer_push_mixed_kept_and_removed() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[5, 6]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2, 3]), + ); + + assert_eq!(buf.total_proofs, 2); + + let sets: HashSet<Vec<u64>> = buf.data[&data_root] + .proofs + .iter() + .map(|p| { + let mut v: Vec<u64> = p.participant_indices().collect(); + v.sort_unstable(); + v + }) + .collect(); + assert!(sets.contains(&vec![5, 6])); + assert!(sets.contains(&vec![1, 2, 3])); + } + + #[test] + fn payload_buffer_push_cross_data_root_independence() { + let mut buf = PayloadBuffer::new(10); + let data_a = make_att_data(1); + let data_b = make_att_data(2); + let root_a = data_a.hash_tree_root(); + let root_b = data_b.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_a), + make_proof_for_validators(&[1, 2, 3]), + ); + buf.push( + HashedAttestationData::new(data_b), + make_proof_for_validators(&[1, 2]), + ); + + // Different data_roots → no cross-entry subsumption. 
+ assert_eq!(buf.total_proofs, 2); + assert_eq!(buf.data[&root_a].proofs.len(), 1); + assert_eq!(buf.data[&root_b].proofs.len(), 1); + } + + #[test] + fn payload_buffer_push_fifo_eviction_uses_total_proofs() { + let mut buf = PayloadBuffer::new(2); + let data_a = make_att_data(1); + let data_b = make_att_data(2); + let data_c = make_att_data(3); + let root_a = data_a.hash_tree_root(); + let root_c = data_c.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_a), + make_proof_for_validators(&[1]), + ); + buf.push( + HashedAttestationData::new(data_b), + make_proof_for_validators(&[2, 3]), + ); + // Pushing a third distinct data_root brings total_proofs to 3, over + // capacity, triggering FIFO eviction of the oldest entry (root_a). + buf.push( + HashedAttestationData::new(data_c), + make_proof_for_validators(&[4]), + ); + + assert!(!buf.data.contains_key(&root_a)); + assert!(buf.data.contains_key(&root_c)); + assert_eq!(buf.total_proofs, 2); + } + // ============ GossipSignatureBuffer Tests ============ fn make_dummy_sig() -> ValidatorSignature {