Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl BlockChainServer {
// this the aggregator never sees its own validator's signature in
// gossip_signatures and it is excluded from aggregated proofs.
if self.is_aggregator {
let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation)
let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation, true)
.inspect_err(|err| {
warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed")
});
Expand Down Expand Up @@ -471,11 +471,7 @@ impl BlockChainServer {
}

fn on_gossip_attestation(&mut self, attestation: &SignedAttestation) {
if !self.is_aggregator {
warn!("Received unaggregated attestation but node is not an aggregator");
return;
}
let _ = store::on_gossip_attestation(&mut self.store, attestation)
let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator)
.inspect_err(|err| warn!(%err, "Failed to process gossiped attestation"));
}

Expand Down
18 changes: 12 additions & 6 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,14 @@ pub fn on_tick(

/// Process a gossiped attestation with signature verification.
///
/// Verifies the validator's XMSS signature and stores it for later aggregation
/// at interval 2. Only aggregator nodes receive unaggregated gossip attestations.
/// Every subscriber validates the attestation data and verifies the XMSS
/// signature so invalid messages get caught at the edge. Only aggregators
/// store the signature for later aggregation at interval 2; non-aggregators
/// drop it after verification.
pub fn on_gossip_attestation(
store: &mut Store,
signed_attestation: &SignedAttestation,
is_aggregator: bool,
) -> Result<(), StoreError> {
let validator_id = signed_attestation.validator_id;
let attestation = Attestation {
Expand Down Expand Up @@ -570,10 +573,13 @@ pub fn on_gossip_attestation(
}
metrics::inc_pq_sig_attestation_signatures_valid();

// Store gossip signature unconditionally for later aggregation at interval 2.
// Subnet filtering is handled at the P2P subscription layer.
store.insert_gossip_signature(hashed, validator_id, signature);
metrics::update_gossip_signatures(store.gossip_signatures_count());
// Only aggregators persist the signature for later aggregation at
// interval 2. Non-aggregators drop the validated attestation — they
// still participate in the mesh so peers see the message propagate.
if is_aggregator {
store.insert_gossip_signature(hashed, validator_id, signature);
metrics::update_gossip_signatures(store.gossip_signatures_count());
}

metrics::inc_attestations_valid(1);

Expand Down
60 changes: 26 additions & 34 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,48 +206,40 @@ pub fn build_swarm(
.subscribe(&aggregation_topic)
.unwrap();

// Aggregators subscribe to attestation subnets to receive unaggregated
// attestations. Non-aggregators don't need to subscribe; they publish via
// gossipsub fanout.
// Subscribe to attestation subnets per leanSpec (`src/lean_spec/__main__.py`):
// every validator subscribes to its own subnet for mesh health; aggregators
// additionally subscribe to explicit `aggregate_subnet_ids` and fall back to
// subnet 0 when they have no validators of their own.
let validator_subnets: HashSet<u64> = config
.validator_ids
.iter()
.map(|vid| vid % config.attestation_committee_count)
.collect();

// The committee metric should reflect validator membership only, not
// aggregator-only subscriptions.
let metric_subnet = validator_subnets.iter().copied().min().unwrap_or(0);
metrics::set_attestation_committee_subnet(metric_subnet);

let mut subscription_subnets = validator_subnets;
if config.is_aggregator {
let mut aggregate_subnets: HashSet<u64> = config
.validator_ids
.iter()
.map(|vid| vid % config.attestation_committee_count)
.collect();
if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
aggregate_subnets.extend(explicit_ids);
}
// Aggregator with no validators and no explicit subnets: fallback to subnet 0
if aggregate_subnets.is_empty() {
aggregate_subnets.insert(0);
subscription_subnets.extend(explicit_ids);
}
Comment on lines 225 to 228
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Fallback to subnet 0 fires before explicit IDs are merged

The empty-check on subscription_subnets runs before aggregate_subnet_ids are folded in, so an aggregator configured with no validator_ids but with explicit aggregate_subnet_ids (e.g. [5]) will subscribe to both subnet 0 and subnet 5 instead of just subnet 5. The old code deferred the fallback until after extending with explicit IDs, so it only triggered when the set was still empty after both sources were considered. Moving the explicit-ID extension before the fallback check restores that behaviour:

Suggested change
if config.is_aggregator {
let mut aggregate_subnets: HashSet<u64> = config
.validator_ids
.iter()
.map(|vid| vid % config.attestation_committee_count)
.collect();
if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
aggregate_subnets.extend(explicit_ids);
}
// Aggregator with no validators and no explicit subnets: fallback to subnet 0
if aggregate_subnets.is_empty() {
aggregate_subnets.insert(0);
if subscription_subnets.is_empty() {
subscription_subnets.insert(0);
}
for &subnet_id in &aggregate_subnets {
let topic = attestation_subnet_topic(subnet_id);
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
info!(subnet_id, "Subscribed to attestation subnet");
if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
subscription_subnets.extend(explicit_ids);
}
if config.is_aggregator {
if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
subscription_subnets.extend(explicit_ids);
}
// Aggregator with no validators and no explicit subnets: fallback to subnet 0
if subscription_subnets.is_empty() {
subscription_subnets.insert(0);
}
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/p2p/src/lib.rs
Line: 225-231

Comment:
**Fallback to subnet 0 fires before explicit IDs are merged**

The empty-check on `subscription_subnets` runs before `aggregate_subnet_ids` are folded in, so an aggregator configured with no `validator_ids` but with explicit `aggregate_subnet_ids` (e.g. `[5]`) will subscribe to **both** subnet 0 and subnet 5 instead of just subnet 5. The old code deferred the fallback until after extending with explicit IDs, so it only triggered when the set was still empty after both sources were considered. Moving the explicit-ID extension before the fallback check restores that behaviour:

```suggestion
    if config.is_aggregator {
        if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
            subscription_subnets.extend(explicit_ids);
        }
        // Aggregator with no validators and no explicit subnets: fallback to subnet 0
        if subscription_subnets.is_empty() {
            subscription_subnets.insert(0);
        }
    }
```

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good bot

for &subnet_id in &aggregate_subnets {
let topic = attestation_subnet_topic(subnet_id);
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
info!(subnet_id, "Subscribed to attestation subnet");
// Fall back to subnet 0 only when the aggregator has no validators
// and no explicit subnets — otherwise leave the set as configured.
if subscription_subnets.is_empty() {
subscription_subnets.insert(0);
}
}

// Build topic cache (avoids string allocation on every publish).
// Includes validator subnets and any explicit aggregate_subnet_ids.
let mut attestation_topics: HashMap<u64, libp2p::gossipsub::IdentTopic> = HashMap::new();
for &vid in &config.validator_ids {
let subnet_id = vid % config.attestation_committee_count;
attestation_topics
.entry(subnet_id)
.or_insert_with(|| attestation_subnet_topic(subnet_id));
for &subnet_id in &subscription_subnets {
let topic = attestation_subnet_topic(subnet_id);
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
info!(subnet_id, "Subscribed to attestation subnet");
attestation_topics.insert(subnet_id, topic);
}
if let Some(ref explicit_ids) = config.aggregate_subnet_ids {
for &subnet_id in explicit_ids {
attestation_topics
.entry(subnet_id)
.or_insert_with(|| attestation_subnet_topic(subnet_id));
}
}

let metric_subnet = attestation_topics.keys().copied().min().unwrap_or(0);
metrics::set_attestation_committee_subnet(metric_subnet);

info!(socket=%config.listening_socket, "P2P node started");

Expand Down
Loading