Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 17 additions & 133 deletions src/lean_spec/subspecs/containers/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from collections.abc import Iterable
from collections.abc import Set as AbstractSet
from typing import TYPE_CHECKING

from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof
Expand All @@ -16,13 +15,13 @@
Uint64,
)

from ..attestation import AggregatedAttestation, AggregationBits, AttestationData
from ..attestation import AggregatedAttestation, AttestationData
from ..block import Block, BlockBody, BlockHeader
from ..block.types import AggregatedAttestations
from ..checkpoint import Checkpoint
from ..config import Config
from ..slot import Slot
from ..validator import ValidatorIndex, ValidatorIndices
from ..validator import ValidatorIndex
from .types import (
HistoricalBlockHashes,
JustificationRoots,
Expand All @@ -31,9 +30,6 @@
Validators,
)

if TYPE_CHECKING:
from lean_spec.subspecs.forkchoice import AttestationSignatureEntry


class State(Container):
"""The main consensus state object."""
Expand Down Expand Up @@ -692,7 +688,7 @@ def build_block(

found_entries = True

selected, _ = self._select_proofs_greedily(proofs)
selected, _ = AggregatedSignatureProof.select_greedily(proofs)
aggregated_signatures.extend(selected)
for proof in selected:
aggregated_attestations.append(
Expand Down Expand Up @@ -725,16 +721,23 @@ def build_block(

# Compact: merge all proofs sharing the same AttestationData into one
# using recursive children aggregation.
#
# During the fixed-point loop above, multiple proofs may have been
# selected for the same AttestationData across iterations. Group them
# and merge each group into a single recursive proof.
proof_groups: dict[AttestationData, list[AggregatedSignatureProof]] = {}
for att, sig in zip(aggregated_attestations, aggregated_signatures, strict=True):
proof_groups.setdefault(att.data, []).append(sig)

compacted_attestations: list[AggregatedAttestation] = []
compacted_signatures: list[AggregatedSignatureProof] = []
aggregated_attestations = []
aggregated_signatures = []
for att_data, proofs in proof_groups.items():
if len(proofs) == 1:
compacted_signatures.append(proofs[0])
sig = proofs[0]
else:
# Multiple proofs for the same data were aggregated separately.
# Merge them into one recursive proof using children-only
# aggregation (no new raw signatures).
children = [
(
proof,
Expand All @@ -745,24 +748,18 @@ def build_block(
)
for proof in proofs
]
merged = AggregatedSignatureProof.aggregate(
sig = AggregatedSignatureProof.aggregate(
xmss_participants=None,
children=children,
raw_xmss=[],
message=att_data.data_root_bytes(),
slot=att_data.slot,
)
compacted_signatures.append(merged)
compacted_attestations.append(
AggregatedAttestation(
aggregation_bits=compacted_signatures[-1].participants,
data=att_data,
)
aggregated_signatures.append(sig)
aggregated_attestations.append(
AggregatedAttestation(aggregation_bits=sig.participants, data=att_data)
)

aggregated_attestations = compacted_attestations
aggregated_signatures = compacted_signatures

# Create the final block with selected attestations.
final_block = Block(
slot=slot,
Expand All @@ -779,116 +776,3 @@ def build_block(
final_block = final_block.model_copy(update={"state_root": hash_tree_root(post_state)})

return final_block, post_state, aggregated_attestations, aggregated_signatures

@staticmethod
def _select_proofs_greedily(
    *proof_sets: set[AggregatedSignatureProof] | None,
) -> tuple[list[AggregatedSignatureProof], set[ValidatorIndex]]:
    """
    Greedy set-cover selection of signature proofs to maximize validator coverage.

    Proof sets are processed in argument order, so earlier sets take priority.
    Within one set, the proof contributing the most not-yet-covered validators
    is picked repeatedly until no remaining proof in that set contributes a
    new validator. Coverage accumulates across sets.

    Args:
        proof_sets: Candidate proof sets in priority order. Entries that are
            ``None`` or empty are skipped.

    Returns:
        Selected proofs (in selection order) and the set of covered validator indices.
    """
    selected: list[AggregatedSignatureProof] = []
    covered: set[ValidatorIndex] = set()

    for proofs in proof_sets:
        if not proofs:
            continue

        # Decode every proof's participants exactly once.
        #
        # The greedy loop below scores all remaining candidates on every
        # round, so decoding inside the scoring step would repeat the same
        # conversion for each candidate on each round.
        candidates = [
            (proof, frozenset(proof.participants.to_validator_indices())) for proof in proofs
        ]

        while candidates:
            # Score each candidate by how many new validators it would add.
            gains = [len(indices - covered) for _, indices in candidates]
            best_gain = max(gains)

            # Stop when no proof in this set adds new coverage.
            if best_gain == 0:
                break

            # list.index returns the first position holding the best score,
            # so ties resolve to the earliest candidate in iteration order.
            best_proof, best_indices = candidates.pop(gains.index(best_gain))
            selected.append(best_proof)
            covered |= best_indices

    return selected, covered

def aggregate(
    self,
    attestation_signatures: dict[AttestationData, set[AttestationSignatureEntry]] | None = None,
    new_payloads: dict[AttestationData, set[AggregatedSignatureProof]] | None = None,
    known_payloads: dict[AttestationData, set[AggregatedSignatureProof]] | None = None,
) -> list[tuple[AggregatedAttestation, AggregatedSignatureProof]]:
    """
    Combine gossip signatures and existing proofs into one proof per attestation data.

    Only attestation data present in the new payloads or in the gossip
    signatures is considered; known payloads are consulted solely as extra
    child proofs for those keys.

    Args:
        attestation_signatures: Raw XMSS signatures from gossip, keyed by attestation data.
        new_payloads: Aggregated proofs pending processing (child proofs).
        known_payloads: Known aggregated proofs already accepted.

    Returns:
        List of (attestation, proof) pairs, one per attestation data that was aggregated.
    """
    # Treat a missing pool exactly like an empty one.
    raw_pool = attestation_signatures if attestation_signatures else {}
    fresh_pool = new_payloads if new_payloads else {}
    accepted_pool = known_payloads if known_payloads else {}

    pairs: list[tuple[AggregatedAttestation, AggregatedSignatureProof]] = []

    # With no new payloads and no gossip signatures the union is empty,
    # the loop body never runs, and the empty list is returned.
    for data in fresh_pool.keys() | raw_pool.keys():
        # Step 1: pick child proofs greedily, new payloads ahead of known ones.
        child_proofs, covered = self._select_proofs_greedily(
            fresh_pool.get(data),
            accepted_pool.get(data),
        )

        # Step 2: keep gossip entries whose validator is not covered by a
        # child proof, ordered by validator index for deterministic output.
        uncovered = sorted(
            (entry for entry in raw_pool.get(data, set()) if entry.validator_id not in covered),
            key=lambda entry: entry.validator_id,
        )

        # Proceed only with at least one raw signature or two child proofs.
        if not (uncovered or len(child_proofs) >= 2):
            continue

        raw_xmss = [
            (self.validators[entry.validator_id].get_attestation_pubkey(), entry.signature)
            for entry in uncovered
        ]
        signer_ids = [entry.validator_id for entry in uncovered]
        xmss_participants = AggregationBits.from_validator_indices(
            ValidatorIndices(data=signer_ids)
        )

        # Step 3: pair each child proof with its participants' public keys,
        # looked up in this state's validator registry.
        children = []
        for child in child_proofs:
            child_pubkeys = [
                self.validators[vid].get_attestation_pubkey()
                for vid in child.participants.to_validator_indices()
            ]
            children.append((child, child_pubkeys))

        proof = AggregatedSignatureProof.aggregate(
            xmss_participants=xmss_participants,
            children=children,
            raw_xmss=raw_xmss,
            message=data.data_root_bytes(),
            slot=data.slot,
        )
        pairs.append(
            (AggregatedAttestation(aggregation_bits=proof.participants, data=data), proof)
        )

    return pairs
140 changes: 122 additions & 18 deletions src/lean_spec/subspecs/forkchoice/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
JUSTIFICATION_LOOKBACK_SLOTS,
)
from lean_spec.subspecs.containers import (
AggregationBits,
AttestationData,
Block,
Checkpoint,
Expand All @@ -28,6 +29,7 @@
from lean_spec.subspecs.containers.attestation.attestation import SignedAggregatedAttestation
from lean_spec.subspecs.containers.block import BlockLookup
from lean_spec.subspecs.containers.slot import Slot
from lean_spec.subspecs.containers.validator import ValidatorIndices
from lean_spec.subspecs.metrics import registry as metrics
from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.subspecs.xmss.aggregation import (
Expand Down Expand Up @@ -928,32 +930,134 @@ def update_safe_target(self) -> "Store":

def aggregate(self) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Aggregate committee signatures and payloads together.
Turn raw validator votes into compact aggregated attestations.

This method aggregates signatures from the attestation_signatures map.
Validators cast individual signatures over gossip. Before those
votes can influence fork choice or be included in a block, they
must be combined into compact cryptographic proofs.

The store holds three pools of attestation evidence:

- **Gossip signatures**: individual validator votes arriving in real-time.
- **New payloads**: aggregated proofs from the current round, not yet
committed to the chain.
- **Known payloads**: previously accepted proofs, reusable as building
blocks for deeper aggregation.

For each unique piece of attestation data the algorithm proceeds in three phases:

1. **Select** — greedily pick existing proofs that maximize
validator coverage (new before known).
2. **Fill** — collect raw gossip signatures for any validators
not yet covered.
3. **Aggregate** — delegate to the XMSS subspec to produce a
single cryptographic proof.

After aggregation the store is updated:

- Consumed gossip signatures are removed.
- Newly produced proofs are recorded for future reuse.

Returns:
Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation).
Updated store and the list of freshly produced signed attestations.
"""
head_state = self.states[self.head]

aggregated_results = head_state.aggregate(
attestation_signatures=self.attestation_signatures,
new_payloads=self.latest_new_aggregated_payloads,
known_payloads=self.latest_known_aggregated_payloads,
)
validators = self.states[self.head].validators
gossip_sigs = self.attestation_signatures
new = self.latest_new_aggregated_payloads
known = self.latest_known_aggregated_payloads

new_aggregates: list[SignedAggregatedAttestation] = []

# Only attestation data with a new payload or a raw gossip signature
# can trigger aggregation. Known payloads alone cannot — they exist
# only to help extend coverage when combined with fresh evidence.
for data in new.keys() | gossip_sigs.keys():
# Phase 1: Select
#
# Start with the cheapest option: reuse proofs that already
# cover many validators.
#
# Child proofs are aggregated signatures from prior rounds.
# Selecting them first keeps the final proof tree shallow
# and avoids redundant cryptographic work.
#
# New payloads go first because they represent uncommitted
# work — known payloads fill remaining gaps.
child_proofs, covered = AggregatedSignatureProof.select_greedily(
new.get(data), known.get(data)
)

# Phase 2: Fill
#
# For every validator not yet covered by a child proof,
# include its individual gossip signature.
#
# Sorting by validator index guarantees deterministic proof
# construction regardless of network arrival order.
raw_entries = [
(
e.validator_id,
validators[e.validator_id].get_attestation_pubkey(),
e.signature,
)
for e in sorted(gossip_sigs.get(data, set()), key=lambda e: e.validator_id)
if e.validator_id not in covered
]

# The XMSS layer enforces a minimum: either at least one raw
# signature, or at least two child proofs to merge.
#
# A lone child proof is already a valid proof — nothing to do.
if not raw_entries and len(child_proofs) < 2:
continue

# Encode the set of raw signers as a compact bitfield.
xmss_participants = AggregationBits.from_validator_indices(
ValidatorIndices(data=[vid for vid, _, _ in raw_entries])
)
raw_xmss = [(pk, sig) for _, pk, sig in raw_entries]

# Phase 3: Aggregate
#
# Build the recursive proof tree.
#
# Each child proof needs its participants' public keys so
# the XMSS prover can verify inner proofs while constructing
# the outer one.
children = [
(
child,
[
validators[vid].get_attestation_pubkey()
for vid in child.participants.to_validator_indices()
],
)
for child in child_proofs
]

# Hand everything to the XMSS subspec.
# Out comes a single proof covering all selected validators.
proof = AggregatedSignatureProof.aggregate(
xmss_participants=xmss_participants,
children=children,
raw_xmss=raw_xmss,
message=data.data_root_bytes(),
slot=data.slot,
)
new_aggregates.append(SignedAggregatedAttestation(data=data, proof=proof))

# ── Store bookkeeping ────────────────────────────────────────
#
# Record freshly produced proofs so future rounds can reuse them.
# Remove gossip signatures that were consumed by this aggregation.
new_aggregated_payloads: dict[AttestationData, set[AggregatedSignatureProof]] = {}
aggregated_attestation_data: set[AttestationData] = set()
for att, proof in aggregated_results:
aggregated_attestation_data.add(att.data)
new_aggregates.append(SignedAggregatedAttestation(data=att.data, proof=proof))
new_aggregated_payloads.setdefault(att.data, set()).add(proof)
for signed_att in new_aggregates:
new_aggregated_payloads.setdefault(signed_att.data, set()).add(signed_att.proof)

remaining_attestation_signatures = {
attestation_data: signatures
for attestation_data, signatures in self.attestation_signatures.items()
if attestation_data not in aggregated_attestation_data
data: sigs
for data, sigs in self.attestation_signatures.items()
if data not in new_aggregated_payloads
}

return self.model_copy(
Expand Down
Loading
Loading