Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 250 additions & 8 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,45 @@ impl PayloadBuffer {
}
}

/// Insert proofs for an attestation, FIFO-evicting oldest data_roots when total proofs reach capacity.
/// Insert a proof, maintaining the antichain invariant per data_root.
///
/// Each data_root entry holds proofs whose participant sets are pairwise
/// incomparable under the subset relation. On push:
///
/// - If the incoming proof's participants are a subset (incl. equal) of
/// any existing proof, the incoming proof is redundant and skipped.
/// - Otherwise, any existing proof whose participants are a strict subset
/// of the incoming proof's is removed before inserting.
///
/// This subsumes exact-equality dedup and keeps buffers bounded when an
/// aggregator produces a proof that supersedes prior children.
///
/// FIFO-evicts oldest data_roots when `total_proofs` exceeds capacity.
fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) {
let (data_root, att_data) = hashed.into_parts();
let new_set: HashSet<u64> = proof.participant_indices().collect();
Comment on lines 144 to +145
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Unconditional new_set allocation for new data_roots

new_set is built before the if let branch, so it's allocated and populated even when data_root isn't in self.data (the else path), where it's never read. Moving the allocation inside the if let Some(entry) arm would avoid this work for fresh insertions.

Suggested change
let (data_root, att_data) = hashed.into_parts();
let new_set: HashSet<u64> = proof.participant_indices().collect();
let (data_root, att_data) = hashed.into_parts();

Then compute new_set at the start of the if let Some(entry) block only.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/storage/src/store.rs
Line: 144-145

Comment:
**Unconditional `new_set` allocation for new data_roots**

`new_set` is built before the `if let` branch, so it's allocated and populated even when `data_root` isn't in `self.data` (the `else` path), where it's never read. Moving the allocation inside the `if let Some(entry)` arm would avoid this work for fresh insertions.

```suggestion
        let (data_root, att_data) = hashed.into_parts();
```

Then compute `new_set` at the start of the `if let Some(entry)` block only.

How can I resolve this? If you propose a fix, please make it concise.


if let Some(entry) = self.data.get_mut(&data_root) {
// Skip duplicate proofs (same participants)
if entry
.proofs
.iter()
.any(|p| p.participants == proof.participants)
{
return;
let mut to_remove: Vec<usize> = Vec::new();
for (i, p) in entry.proofs.iter().enumerate() {
let existing_set: HashSet<u64> = p.participant_indices().collect();
// Incoming is subsumed by an existing proof (incl. equal) — skip.
if new_set.is_subset(&existing_set) {
return;
}
// Existing is a strict subset of incoming — mark for removal.
// (Non-strict equality was handled by the check above.)
if existing_set.is_subset(&new_set) {
to_remove.push(i);
}
}

// Remove subsumed proofs (reverse order so earlier indices stay valid).
for i in to_remove.into_iter().rev() {
entry.proofs.swap_remove(i);
self.total_proofs -= 1;
}

entry.proofs.push(proof);
self.total_proofs += 1;
} else {
Expand Down Expand Up @@ -1628,6 +1655,17 @@ mod tests {
AggregatedSignatureProof::empty(bits)
}

/// Build a proof whose aggregation bits are set for each validator in `vids`.
fn make_proof_for_validators(vids: &[u64]) -> AggregatedSignatureProof {
    use ethlambda_types::attestation::AggregationBits;
    // Size the bitfield to hold the largest index (length 1 when `vids` is empty).
    let len = vids.iter().copied().max().unwrap_or(0) as usize + 1;
    let mut bits = AggregationBits::with_length(len).unwrap();
    for &vid in vids {
        bits.set(vid as usize, true).unwrap();
    }
    AggregatedSignatureProof::empty(bits)
}

fn make_att_data(slot: u64) -> AttestationData {
AttestationData {
slot,
Expand Down Expand Up @@ -1751,6 +1789,210 @@ mod tests {
assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1);
}

#[test]
fn payload_buffer_push_superset_removes_strict_subset() {
    let mut buffer = PayloadBuffer::new(10);
    let att = make_att_data(1);
    let root = att.hash_tree_root();

    // A strict superset pushed second should evict the earlier subset proof.
    buffer.push(
        HashedAttestationData::new(att.clone()),
        make_proof_for_validators(&[1, 2]),
    );
    buffer.push(
        HashedAttestationData::new(att),
        make_proof_for_validators(&[1, 2, 3]),
    );

    assert_eq!(buffer.total_proofs, 1);
    assert_eq!(buffer.data[&root].proofs.len(), 1);
    let survivors: HashSet<u64> = buffer.data[&root].proofs[0]
        .participant_indices()
        .collect();
    assert_eq!(survivors, HashSet::from([1, 2, 3]));
}

#[test]
fn payload_buffer_push_subset_is_skipped() {
    let mut buffer = PayloadBuffer::new(10);
    let att = make_att_data(1);
    let root = att.hash_tree_root();

    // A subset pushed after its superset must be discarded on arrival.
    buffer.push(
        HashedAttestationData::new(att.clone()),
        make_proof_for_validators(&[1, 2, 3]),
    );
    buffer.push(
        HashedAttestationData::new(att),
        make_proof_for_validators(&[1, 2]),
    );

    assert_eq!(buffer.total_proofs, 1);
    assert_eq!(buffer.data[&root].proofs.len(), 1);
    let survivors: HashSet<u64> = buffer.data[&root].proofs[0]
        .participant_indices()
        .collect();
    assert_eq!(survivors, HashSet::from([1, 2, 3]));
}

#[test]
fn payload_buffer_push_equal_participants_is_skipped() {
    let mut buffer = PayloadBuffer::new(10);
    let att = make_att_data(1);
    let root = att.hash_tree_root();

    // Pushing the identical participant set twice is a no-op the second time.
    for _ in 0..2 {
        buffer.push(
            HashedAttestationData::new(att.clone()),
            make_proof_for_validators(&[1, 2]),
        );
    }

    assert_eq!(buffer.total_proofs, 1);
    assert_eq!(buffer.data[&root].proofs.len(), 1);
}

#[test]
fn payload_buffer_push_incomparable_proofs_coexist() {
    let mut buffer = PayloadBuffer::new(10);
    let att = make_att_data(1);
    let root = att.hash_tree_root();

    // Disjoint participant sets are incomparable, so both must be retained.
    buffer.push(
        HashedAttestationData::new(att.clone()),
        make_proof_for_validators(&[1, 2]),
    );
    buffer.push(
        HashedAttestationData::new(att),
        make_proof_for_validators(&[3, 4]),
    );

    assert_eq!(buffer.total_proofs, 2);
    assert_eq!(buffer.data[&root].proofs.len(), 2);
}

#[test]
fn payload_buffer_push_superset_absorbs_multiple_subsets() {
    let mut buffer = PayloadBuffer::new(10);
    let att = make_att_data(1);
    let root = att.hash_tree_root();

    // Three pairwise-incomparable singletons: all are retained.
    for vid in [1u64, 2, 3] {
        buffer.push(
            HashedAttestationData::new(att.clone()),
            make_proof_for_validators(&[vid]),
        );
    }
    assert_eq!(buffer.total_proofs, 3);

    // A single superset push absorbs every singleton at once.
    buffer.push(
        HashedAttestationData::new(att),
        make_proof_for_validators(&[1, 2, 3]),
    );

    assert_eq!(buffer.total_proofs, 1);
    assert_eq!(buffer.data[&root].proofs.len(), 1);
    // The FIFO queue still tracks exactly this one data_root.
    assert_eq!(buffer.order.len(), 1);
    assert_eq!(buffer.order.front().copied(), Some(root));
}

#[test]
fn payload_buffer_push_mixed_kept_and_removed() {
    let mut buffer = PayloadBuffer::new(10);
    let att = make_att_data(1);
    let root = att.hash_tree_root();

    buffer.push(
        HashedAttestationData::new(att.clone()),
        make_proof_for_validators(&[1, 2]),
    );
    buffer.push(
        HashedAttestationData::new(att.clone()),
        make_proof_for_validators(&[5, 6]),
    );
    buffer.push(
        HashedAttestationData::new(att),
        make_proof_for_validators(&[1, 2, 3]),
    );

    // {1,2} is subsumed by {1,2,3}; {5,6} is incomparable and survives.
    assert_eq!(buffer.total_proofs, 2);

    let remaining: HashSet<Vec<u64>> = buffer.data[&root]
        .proofs
        .iter()
        .map(|proof| {
            let mut indices: Vec<u64> = proof.participant_indices().collect();
            indices.sort_unstable();
            indices
        })
        .collect();
    assert!(remaining.contains(&vec![5, 6]));
    assert!(remaining.contains(&vec![1, 2, 3]));
}

#[test]
fn payload_buffer_push_cross_data_root_independence() {
    let mut buffer = PayloadBuffer::new(10);
    let first = make_att_data(1);
    let second = make_att_data(2);
    let first_root = first.hash_tree_root();
    let second_root = second.hash_tree_root();

    buffer.push(
        HashedAttestationData::new(first),
        make_proof_for_validators(&[1, 2, 3]),
    );
    buffer.push(
        HashedAttestationData::new(second),
        make_proof_for_validators(&[1, 2]),
    );

    // Subsumption only applies within a data_root, never across entries.
    assert_eq!(buffer.total_proofs, 2);
    assert_eq!(buffer.data[&first_root].proofs.len(), 1);
    assert_eq!(buffer.data[&second_root].proofs.len(), 1);
}

#[test]
fn payload_buffer_push_fifo_eviction_uses_total_proofs() {
    // Capacity counts proofs, not data_roots.
    let mut buffer = PayloadBuffer::new(2);
    let oldest = make_att_data(1);
    let middle = make_att_data(2);
    let newest = make_att_data(3);
    let oldest_root = oldest.hash_tree_root();
    let newest_root = newest.hash_tree_root();

    buffer.push(
        HashedAttestationData::new(oldest),
        make_proof_for_validators(&[1]),
    );
    buffer.push(
        HashedAttestationData::new(middle),
        make_proof_for_validators(&[2, 3]),
    );
    // Pushing a third distinct data_root brings total_proofs over capacity,
    // which FIFO-evicts the oldest entry.
    buffer.push(
        HashedAttestationData::new(newest),
        make_proof_for_validators(&[4]),
    );

    assert!(!buffer.data.contains_key(&oldest_root));
    assert!(buffer.data.contains_key(&newest_root));
    assert_eq!(buffer.total_proofs, 2);
}

// ============ GossipSignatureBuffer Tests ============

fn make_dummy_sig() -> ValidatorSignature {
Expand Down
Loading