diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index e03865cf..9ee4ee8e 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -198,7 +198,7 @@ impl BlockChainServer { // this the aggregator never sees its own validator's signature in // gossip_signatures and it is excluded from aggregated proofs. if self.is_aggregator { - let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation) + let _ = store::on_gossip_attestation(&mut self.store, &signed_attestation, true) .inspect_err(|err| { warn!(%slot, %validator_id, %err, "Self-delivery of attestation failed") }); @@ -471,11 +471,7 @@ impl BlockChainServer { } fn on_gossip_attestation(&mut self, attestation: &SignedAttestation) { - if !self.is_aggregator { - warn!("Received unaggregated attestation but node is not an aggregator"); - return; - } - let _ = store::on_gossip_attestation(&mut self.store, attestation) + let _ = store::on_gossip_attestation(&mut self.store, attestation, self.is_aggregator) .inspect_err(|err| warn!(%err, "Failed to process gossiped attestation")); } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 63905c53..04d3c0c6 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -528,11 +528,14 @@ pub fn on_tick( /// Process a gossiped attestation with signature verification. /// -/// Verifies the validator's XMSS signature and stores it for later aggregation -/// at interval 2. Only aggregator nodes receive unaggregated gossip attestations. +/// Every subscriber validates the attestation data and verifies the XMSS +/// signature so invalid messages get caught at the edge. Only aggregators +/// store the signature for later aggregation at interval 2; non-aggregators +/// drop it after verification. pub fn on_gossip_attestation( store: &mut Store, signed_attestation: &SignedAttestation, + is_aggregator: bool, ) -> Result<(), StoreError> { let validator_id = signed_attestation.validator_id; let attestation = Attestation { @@ -570,10 +573,13 @@ pub fn on_gossip_attestation( } metrics::inc_pq_sig_attestation_signatures_valid(); - // Store gossip signature unconditionally for later aggregation at interval 2. - // Subnet filtering is handled at the P2P subscription layer. - store.insert_gossip_signature(hashed, validator_id, signature); - metrics::update_gossip_signatures(store.gossip_signatures_count()); + // Only aggregators persist the signature for later aggregation at + // interval 2. Non-aggregators drop the validated attestation — they + // still participate in the mesh so peers see the message propagate. + if is_aggregator { + store.insert_gossip_signature(hashed, validator_id, signature); + metrics::update_gossip_signatures(store.gossip_signatures_count()); + } metrics::inc_attestations_valid(1); diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 134efe28..0298a54a 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -206,48 +206,40 @@ pub fn build_swarm( .subscribe(&aggregation_topic) .unwrap(); - // Aggregators subscribe to attestation subnets to receive unaggregated - // attestations. Non-aggregators don't need to subscribe; they publish via - // gossipsub fanout. + // Subscribe to attestation subnets per leanSpec (`src/lean_spec/__main__.py`): + // every validator subscribes to its own subnet for mesh health; aggregators + // additionally subscribe to explicit `aggregate_subnet_ids` and fall back to + // subnet 0 when they have no validators of their own. + let validator_subnets: HashSet = config + .validator_ids + .iter() + .map(|vid| vid % config.attestation_committee_count) + .collect(); + + // The committee metric should reflect validator membership only, not + // aggregator-only subscriptions. + let metric_subnet = validator_subnets.iter().copied().min().unwrap_or(0); + metrics::set_attestation_committee_subnet(metric_subnet); + + let mut subscription_subnets = validator_subnets; if config.is_aggregator { - let mut aggregate_subnets: HashSet = config - .validator_ids - .iter() - .map(|vid| vid % config.attestation_committee_count) - .collect(); if let Some(ref explicit_ids) = config.aggregate_subnet_ids { - aggregate_subnets.extend(explicit_ids); - } - // Aggregator with no validators and no explicit subnets: fallback to subnet 0 - if aggregate_subnets.is_empty() { - aggregate_subnets.insert(0); + subscription_subnets.extend(explicit_ids); } - for &subnet_id in &aggregate_subnets { - let topic = attestation_subnet_topic(subnet_id); - swarm.behaviour_mut().gossipsub.subscribe(&topic)?; - info!(subnet_id, "Subscribed to attestation subnet"); + // Fall back to subnet 0 only when the aggregator has no validators + // and no explicit subnets — otherwise leave the set as configured. + if subscription_subnets.is_empty() { + subscription_subnets.insert(0); } } - // Build topic cache (avoids string allocation on every publish). - // Includes validator subnets and any explicit aggregate_subnet_ids. let mut attestation_topics: HashMap = HashMap::new(); - for &vid in &config.validator_ids { - let subnet_id = vid % config.attestation_committee_count; - attestation_topics - .entry(subnet_id) - .or_insert_with(|| attestation_subnet_topic(subnet_id)); + for &subnet_id in &subscription_subnets { + let topic = attestation_subnet_topic(subnet_id); + swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + info!(subnet_id, "Subscribed to attestation subnet"); + attestation_topics.insert(subnet_id, topic); } - if let Some(ref explicit_ids) = config.aggregate_subnet_ids { - for &subnet_id in explicit_ids { - attestation_topics - .entry(subnet_id) - .or_insert_with(|| attestation_subnet_topic(subnet_id)); - } - } - - let metric_subnet = attestation_topics.keys().copied().min().unwrap_or(0); - metrics::set_attestation_committee_subnet(metric_subnet); info!(socket=%config.listening_socket, "P2P node started");