diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 3c3f816..55a7f73 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -400,8 +400,8 @@ fn read_validator_keys( validator_keys.insert( idx, ValidatorKeyPair { - attestation_key, - proposal_key, + attestation_key: Some(attestation_key), + proposal_key: Some(proposal_key), }, ); } diff --git a/crates/blockchain/src/key_manager.rs b/crates/blockchain/src/key_manager.rs index 9a9b8b1..828ddf0 100644 --- a/crates/blockchain/src/key_manager.rs +++ b/crates/blockchain/src/key_manager.rs @@ -5,15 +5,28 @@ use ethlambda_types::{ primitives::{H256, HashTreeRoot as _}, signature::{ValidatorSecretKey, ValidatorSignature}, }; -use tracing::info; use crate::metrics; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum KeyRole { + Attestation, + Proposal, +} + /// Error types for KeyManager operations. #[derive(Debug, thiserror::Error)] pub enum KeyManagerError { #[error("Validator key not found for validator_id: {0}")] ValidatorKeyNotFound(u64), + #[error("Key unavailable for validator {0}")] + KeyUnavailable(u64), + #[error("Key not prepared for slot {slot} (validator {validator_id}, {role:?})")] + KeyNotPreparedForSlot { + validator_id: u64, + role: KeyRole, + slot: u32, + }, #[error("Signing error: {0}")] SigningError(String), #[error("Signature conversion error: {0}")] @@ -26,8 +39,8 @@ pub enum KeyManagerError { /// allowing the validator to sign both an attestation and a block proposal /// within the same slot. pub struct ValidatorKeyPair { - pub attestation_key: ValidatorSecretKey, - pub proposal_key: ValidatorSecretKey, + pub attestation_key: Option, + pub proposal_key: Option, } /// Manages validator secret keys for signing attestations and block proposals. @@ -35,7 +48,7 @@ pub struct ValidatorKeyPair { /// Each validator has two independent XMSS keys: one for attestation signing /// and one for block proposal signing. pub struct KeyManager { - keys: HashMap, + pub(crate) keys: HashMap, } impl KeyManager { @@ -79,29 +92,22 @@ impl KeyManager { .keys .get_mut(&validator_id) .ok_or(KeyManagerError::ValidatorKeyNotFound(validator_id))?; + let key = key_pair + .attestation_key + .as_ref() + .ok_or(KeyManagerError::KeyUnavailable(validator_id))?; - // Advance XMSS key preparation window if the slot is outside the current window. - // Each bottom tree covers 65,536 slots; the window holds 2 at a time. - // Multiple advances may be needed if the node was offline for an extended period. - if !key_pair.attestation_key.is_prepared_for(slot) { - info!(validator_id, slot, "Advancing XMSS key preparation window"); - while !key_pair.attestation_key.is_prepared_for(slot) { - let before = key_pair.attestation_key.get_prepared_interval(); - key_pair.attestation_key.advance_preparation(); - if key_pair.attestation_key.get_prepared_interval() == before { - return Err(KeyManagerError::SigningError(format!( - "XMSS key exhausted for validator {validator_id}: \ - slot {slot} is beyond the key's activation interval" - ))); - } - } + if !key.is_prepared_for(slot) { + return Err(KeyManagerError::KeyNotPreparedForSlot { + validator_id, + role: KeyRole::Attestation, + slot, + }); } let signature: ValidatorSignature = { let _timing = metrics::time_pq_sig_attestation_signing(); - key_pair - .attestation_key - .sign(slot, message) + key.sign(slot, message) .map_err(|e| KeyManagerError::SigningError(e.to_string())) }?; metrics::inc_pq_sig_attestation_signatures(); @@ -121,29 +127,20 @@ impl KeyManager { .keys .get_mut(&validator_id) .ok_or(KeyManagerError::ValidatorKeyNotFound(validator_id))?; + let key = key_pair + .proposal_key + .as_ref() + .ok_or(KeyManagerError::KeyUnavailable(validator_id))?; - // Advance XMSS key preparation window if the slot is outside the current window. - // Each bottom tree covers 65,536 slots; the window holds 2 at a time. - // Multiple advances may be needed if the node was offline for an extended period. - if !key_pair.proposal_key.is_prepared_for(slot) { - info!( + if !key.is_prepared_for(slot) { + return Err(KeyManagerError::KeyNotPreparedForSlot { validator_id, - slot, "Advancing XMSS proposal key preparation window" - ); - while !key_pair.proposal_key.is_prepared_for(slot) { - let before = key_pair.proposal_key.get_prepared_interval(); - key_pair.proposal_key.advance_preparation(); - if key_pair.proposal_key.get_prepared_interval() == before { - return Err(KeyManagerError::SigningError(format!( - "XMSS proposal key exhausted for validator {validator_id}: \ - slot {slot} is beyond the key's activation interval" - ))); - } - } + role: KeyRole::Proposal, + slot, + }); } - let signature: ValidatorSignature = key_pair - .proposal_key + let signature: ValidatorSignature = key .sign(slot, message) .map_err(|e| KeyManagerError::SigningError(e.to_string()))?; @@ -189,4 +186,26 @@ mod tests { Err(KeyManagerError::ValidatorKeyNotFound(123)) )); } + + #[test] + fn test_sign_returns_key_unavailable_when_field_is_none() { + let mut keys = HashMap::new(); + keys.insert( + 0, + ValidatorKeyPair { + attestation_key: None, + proposal_key: None, + }, + ); + let mut key_manager = KeyManager::new(keys); + + assert!(matches!( + key_manager.sign_with_attestation_key(0, 0, &H256::default()), + Err(KeyManagerError::KeyUnavailable(0)), + )); + assert!(matches!( + key_manager.sign_with_proposal_key(0, 0, &H256::default()), + Err(KeyManagerError::KeyUnavailable(0)), + )); + } } diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 5e47f2e..5a1dc0f 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -10,15 +10,17 @@ use ethlambda_types::{ attestation::{SignedAggregatedAttestation, SignedAttestation}, block::{BlockSignatures, SignedBlock}, primitives::{H256, HashTreeRoot as _}, + signature::ValidatorSecretKey, }; use crate::aggregation::{ AGGREGATION_DEADLINE, AggregateProduced, AggregationDeadline, AggregationDone, AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker, }; -use crate::key_manager::ValidatorKeyPair; +use crate::key_manager::{KeyManagerError, KeyRole, ValidatorKeyPair}; use spawned_concurrency::actor; use spawned_concurrency::error::ActorError; +use spawned_concurrency::message::Message; use spawned_concurrency::protocol; use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after}; use tokio_util::sync::CancellationToken; @@ -171,14 +173,14 @@ impl BlockChainServer { // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { - self.propose_block(slot, validator_id); + self.propose_block(slot, validator_id, ctx); } // Produce attestations at interval 1 (all validators including proposer). // Reuse the same snapshot so self-delivery decisions match the rest // of the tick. if interval == 1 { - self.produce_attestations(slot, is_aggregator); + self.produce_attestations(slot, is_aggregator, ctx); } // Update safe target slot metric (updated by store.on_tick at interval 3) @@ -254,7 +256,7 @@ impl BlockChainServer { .find(|&vid| is_proposer(vid, slot, num_validators)) } - fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) { + fn produce_attestations(&mut self, slot: u64, is_aggregator: bool, ctx: &Context) { let _timing = metrics::time_attestations_production(); // Produce attestation data once for all validators @@ -263,14 +265,24 @@ impl BlockChainServer { // For each registered validator, produce and publish attestation for validator_id in self.key_manager.validator_ids() { // Sign the attestation - let Ok(signature) = self + let signature = match self .key_manager .sign_attestation(validator_id, &attestation_data) - .inspect_err( - |err| error!(%slot, %validator_id, %err, "Failed to sign attestation"), - ) - else { - continue; + { + Ok(sig) => sig, + Err(KeyManagerError::KeyNotPreparedForSlot { + role, + slot: target_slot, + .. + }) => { + self.prepare_key_for_slot(validator_id, role, target_slot, ctx); + continue; + } + Err(KeyManagerError::KeyUnavailable(_)) => continue, + Err(err) => { + error!(%slot, %validator_id, %err, "Failed to sign attestation"); + continue; + } }; // Create signed attestation @@ -302,7 +314,7 @@ impl BlockChainServer { } /// Build and publish a block for the given slot and validator. - fn propose_block(&mut self, slot: u64, validator_id: u64) { + fn propose_block(&mut self, slot: u64, validator_id: u64, ctx: &Context) { info!(%slot, %validator_id, "We are the proposer for this slot"); let _timing = metrics::time_block_building(); @@ -318,14 +330,31 @@ impl BlockChainServer { // Sign the block root with the proposal key let block_root = block.hash_tree_root(); - let Ok(proposer_signature) = self - .key_manager - .sign_block_root(validator_id, slot as u32, &block_root) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to sign block root")) - else { - metrics::inc_block_building_failures(); - return; - }; + let proposer_signature = + match self + .key_manager + .sign_block_root(validator_id, slot as u32, &block_root) + { + Ok(sig) => sig, + Err(KeyManagerError::KeyNotPreparedForSlot { + role, + slot: target_slot, + .. + }) => { + self.prepare_key_for_slot(validator_id, role, target_slot, ctx); + metrics::inc_block_building_failures(); + return; + } + Err(KeyManagerError::KeyUnavailable(_)) => { + metrics::inc_block_building_failures(); + return; + } + Err(err) => { + error!(%slot, %validator_id, %err, "Failed to sign block root"); + metrics::inc_block_building_failures(); + return; + } + }; // Assemble SignedBlock let signed_block = SignedBlock { @@ -357,6 +386,39 @@ impl BlockChainServer { info!(%slot, %validator_id, "Published block"); } + /// Move the validator's key off the actor onto a `spawn_blocking` worker + /// that runs `advance_preparation` until the prepared window covers `target_slot`. + /// The worker sends back `KeyPreparedForSlot` for the actor + /// to restore the (possibly advanced) key. + fn prepare_key_for_slot( + &mut self, + validator_id: u64, + role: KeyRole, + target_slot: u32, + ctx: &Context, + ) { + let Some(key_pair) = self.key_manager.keys.get_mut(&validator_id) else { + return; + }; + let field = match role { + KeyRole::Attestation => &mut key_pair.attestation_key, + KeyRole::Proposal => &mut key_pair.proposal_key, + }; + let Some(key) = field.take() else { return }; + + info!(%validator_id, ?role, %target_slot, "Preparing XMSS key for slot in background"); + let actor_ref = ctx.actor_ref().clone(); + + tokio::task::spawn_blocking(move || { + let result = key.advance_until_prepared(target_slot); + let _ = actor_ref.send(KeyPreparedForSlot { + validator_id, + role, + key: result, + }); + }); + } + fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> { store::on_block(&mut self.store, signed_block)?; let head_slot = self.store.head_slot(); @@ -719,3 +781,40 @@ impl Handler for BlockChainServer { } } } + +/// Worker → actor result for a background XMSS key advance. +/// `key: None` means the activation interval was exhausted. +pub(crate) struct KeyPreparedForSlot { + pub validator_id: u64, + pub role: KeyRole, + pub key: Option, +} +impl Message for KeyPreparedForSlot { + type Result = (); +} + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: KeyPreparedForSlot, _ctx: &Context) { + let KeyPreparedForSlot { + validator_id, + role, + key, + } = msg; + let Some(key_pair) = self.key_manager.keys.get_mut(&validator_id) else { + return; + }; + match key { + Some(advanced) => { + info!(%validator_id, ?role, "XMSS key advance complete"); + match role { + KeyRole::Attestation => key_pair.attestation_key = Some(advanced), + KeyRole::Proposal => key_pair.proposal_key = Some(advanced), + } + } + None => error!( + %validator_id, ?role, + "XMSS key activation interval exhausted; validator can no longer sign with this key" + ), + } + } +} diff --git a/crates/common/types/src/signature.rs b/crates/common/types/src/signature.rs index 3587476..c95dec3 100644 --- a/crates/common/types/src/signature.rs +++ b/crates/common/types/src/signature.rs @@ -123,6 +123,20 @@ impl ValidatorSecretKey { pub fn advance_preparation(&mut self) { self.inner.advance_preparation(); } + + /// Advance the prepared window until it covers `target_slot`. Returns + /// `Some(self)` on success, or `None` if the activation interval is + /// exhausted (an `advance_preparation` call made no progress). + pub fn advance_until_prepared(mut self, target_slot: u32) -> Option { + while !self.is_prepared_for(target_slot) { + let before = self.get_prepared_interval(); + self.advance_preparation(); + if self.get_prepared_interval() == before { + return None; + } + } + Some(self) + } } #[cfg(test)] @@ -183,4 +197,19 @@ mod tests { result.err().map_or(String::new(), |e| e.to_string()) ); } + + #[test] + #[ignore = "slow: generates production-size XMSS key (~minutes)"] + fn test_advance_until_prepared_advances_then_detects_exhaustion() { + let key = generate_key_with_three_bottom_trees(); + // Initial window covers [0, 2*L). Target lives in tree 2 → one advance reaches it. + let target = 2 * LEAVES_PER_BOTTOM_TREE + 100; + let advanced = key.advance_until_prepared(target).expect("should advance"); + assert!(advanced.is_prepared_for(target)); + + // 3-tree key cannot reach beyond 3*L; from `advanced`, attempting to go + // further hits exhaustion (advance_preparation makes no progress). + let beyond_lifetime = 4 * LEAVES_PER_BOTTOM_TREE; + assert!(advanced.advance_until_prepared(beyond_lifetime).is_none()); + } }