Skip to content
5 changes: 5 additions & 0 deletions validator_client/src/block_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,17 +407,22 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
)
.await?;

let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES);
let signed_block = self_ref
.validator_store
.sign_block::<Payload>(*validator_pubkey_ref, block, current_slot)
.await
.map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?;
let signing_time_ms =
Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis();

info!(
log,
"Publishing signed block";
"slot" => slot.as_u64(),
"signing_time_ms" => signing_time_ms,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice addition!

);

// Publish block with first available beacon node.
self.beacon_nodes
.first_success(
Expand Down
189 changes: 163 additions & 26 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use crate::{
};
use environment::RuntimeContext;
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
use futures::future::join_all;
use futures::{stream, StreamExt};
use parking_lot::RwLock;
use safe_arith::ArithError;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use sync::poll_sync_committee_duties;
use sync::SyncDutiesMap;
use tokio::{sync::mpsc::Sender, time::sleep};
Expand All @@ -40,6 +41,14 @@ const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2;
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;

/// Compute attestation selection proofs this many slots before they are required.
///
/// At start-up selection proofs will be computed with less lookahead out of necessity.
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;

/// Fraction of a slot at which selection proof signing should happen (2 means half way).
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;

/// Minimum number of validators for which we auto-enable per-validator metrics.
/// For validators greater than this value, we need to manually set the `enable-per-validator-metrics`
/// flag in the cli to enable collection of per validator metrics.
Expand Down Expand Up @@ -71,7 +80,7 @@ pub struct DutyAndProof {

impl DutyAndProof {
/// Instantiate `Self`, computing the selection proof as well.
pub async fn new<T: SlotClock + 'static, E: EthSpec>(
pub async fn new_with_selection_proof<T: SlotClock + 'static, E: EthSpec>(
duty: AttesterData,
validator_store: &ValidatorStore<T, E>,
spec: &ChainSpec,
Expand Down Expand Up @@ -99,6 +108,14 @@ impl DutyAndProof {
selection_proof,
})
}

/// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
pub fn new_without_selection_proof(duty: AttesterData) -> Self {
Self {
duty,
selection_proof: None,
}
}
}

/// To assist with readability, the dependent root for attester/proposer duties.
Expand Down Expand Up @@ -471,7 +488,7 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
/// 3. Push out any attestation subnet subscriptions to the BN.
/// 4. Prune old entries from `duties_service.attesters`.
async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
duties_service: &Arc<DutiesService<T, E>>,
) -> Result<(), Error> {
let current_epoch_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
Expand Down Expand Up @@ -634,7 +651,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
/// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and
/// store them in `duties_service.attesters`.
async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
duties_service: &DutiesService<T, E>,
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>,
Expand Down Expand Up @@ -742,31 +759,16 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
"num_new_duties" => new_duties.len(),
);

// Produce the `DutyAndProof` messages in parallel.
let duty_and_proof_results = join_all(new_duties.into_iter().map(|duty| {
DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec)
}))
.await;

// Update the duties service with the new `DutyAndProof` messages.
let mut attesters = duties_service.attesters.write();
let mut already_warned = Some(());
for result in duty_and_proof_results {
let duty_and_proof = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(e) => {
error!(
log,
"Failed to produce duty and proof";
"error" => ?e,
"msg" => "may impair attestation duties"
);
// Do not abort the entire batch for a single failure.
continue;
}
};
for duty in &new_duties {
let attester_map = attesters.entry(duty.pubkey).or_default();

let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
// Create initial entries in the map without selection proofs. We'll compute them in the
// background later to avoid creating a thundering herd of signing threads whenever new
// duties are computed.
let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone());

if let Some((prior_dependent_root, _)) =
attester_map.insert(epoch, (dependent_root, duty_and_proof))
Expand All @@ -785,9 +787,144 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
}
drop(attesters);

// Spawn the background task to compute selection proofs.
let subservice = duties_service.clone();
duties_service.context.executor.spawn(
async move {
fill_in_selection_proofs(subservice, new_duties, dependent_root).await;
},
"duties_service_selection_proofs_background",
);

Ok(())
}

/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant.
async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
duties_service: Arc<DutiesService<T, E>>,
duties: Vec<AttesterData>,
dependent_root: Hash256,
) {
let log = duties_service.context.log();

// Sort duties by slot in a BTreeMap.
let mut duties_by_slot: BTreeMap<Slot, Vec<_>> = BTreeMap::new();

for duty in duties {
duties_by_slot.entry(duty.slot).or_default().push(duty);
}

// At halfway through each slot when nothing else is likely to be getting signed, sign a batch
// of selection proofs and insert them into the duties service `attesters` map.
let slot_clock = &duties_service.slot_clock;
let slot_offset = duties_service.slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM;

while !duties_by_slot.is_empty() {
if let Some(duration) = slot_clock.duration_to_next_slot() {
sleep(duration.saturating_sub(slot_offset)).await;

let Some(current_slot) = slot_clock.now() else {
continue;
};

let lookahead_slot = current_slot + SELECTION_PROOF_SLOT_LOOKAHEAD;

let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The split_off method on BTreeMap is really nice here.

std::mem::swap(&mut relevant_duties, &mut duties_by_slot);

let batch_size = relevant_duties.values().map(Vec::len).sum::<usize>();

if batch_size == 0 {
continue;
}

let timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTATION_SELECTION_PROOFS],
);

// Sign selection proofs (serially).
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
.then(|duty| async {
DutyAndProof::new_with_selection_proof(
duty,
&duties_service.validator_store,
&duties_service.spec,
)
.await
})
.collect::<Vec<_>>()
.await;

// Add to attesters store.
let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results {
let duty_and_proof = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(e) => {
error!(
log,
"Failed to produce duty and proof";
"error" => ?e,
"msg" => "may impair attestation duties"
);
// Do not abort the entire batch for a single failure.
continue;
}
};

let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = duty_and_proof.selection_proof else {
continue;
};

let (existing_dependent_root, existing_duty) = entry.get_mut();

if *existing_dependent_root == dependent_root {
// Replace existing proof.
existing_duty.selection_proof = Some(selection_proof);
} else {
// Our selection proofs are no longer relevant due to a reorg, abandon
// this entire background process.
debug!(
log,
"Stopping selection proof background task";
"reason" => "re-org"
);
return;
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((dependent_root, duty_and_proof));
}
}
}
drop(attesters);

let time_taken_ms =
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
debug!(
log,
"Computed attestation selection proofs";
"batch_size" => batch_size,
"lookahead_slot" => lookahead_slot,
"time_taken_ms" => time_taken_ms
);
} else {
// Just sleep for one slot if we are unable to read the system clock, this gives
// us an opportunity for the clock to eventually come good.
sleep(duties_service.slot_clock.slot_duration()).await;
}
}
}

/// Download the proposer duties for the current epoch and store them in `duties_service.proposers`.
/// If there are any proposer for this slot, send out a notification to the block proposers.
///
Expand Down
5 changes: 5 additions & 0 deletions validator_client/src/http_metrics/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get";
pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get";
pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post";
pub const UPDATE_PROPOSERS: &str = "update_proposers";
pub const ATTESTATION_SELECTION_PROOFS: &str = "attestation_selection_proofs";
pub const SUBSCRIPTIONS: &str = "subscriptions";
pub const LOCAL_KEYSTORE: &str = "local_keystore";
pub const WEB3SIGNER: &str = "web3signer";
Expand Down Expand Up @@ -177,6 +178,10 @@ lazy_static::lazy_static! {
"Duration to obtain a signature",
&["type"]
);
pub static ref BLOCK_SIGNING_TIMES: Result<Histogram> = try_create_histogram(
"vc_block_signing_times_seconds",
"Duration to obtain a signature for a block",
);

pub static ref ATTESTATION_DUTY: Result<IntGaugeVec> = try_create_int_gauge_vec(
"vc_attestation_duty_slot",
Expand Down