Skip to content

Commit

Permalink
add drop and sync committee metrics
Browse files Browse the repository at this point in the history
* use storeBlock for processing API blocks
* avoid double block dump
* count all gossip metrics at the same spot
* simplify block broadcast
  • Loading branch information
arnetheduck authored and zah committed Oct 20, 2021
1 parent 47343ff commit bf6ad41
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 157 deletions.
3 changes: 1 addition & 2 deletions beacon_chain/consensus_object_pools/block_clearance.nim
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ proc addResolvedBlock(
merge.TrustedSignedBeaconBlock,
parent: BlockRef, cache: var StateCache,
onBlockAdded: OnPhase0BlockAdded | OnAltairBlockAdded | OnMergeBlockAdded,
stateDataDur, sigVerifyDur,
stateVerifyDur: Duration
stateDataDur, sigVerifyDur, stateVerifyDur: Duration
) =
doAssert getStateField(state.data, slot) == trustedBlock.message.slot,
"state must match block"
Expand Down
10 changes: 5 additions & 5 deletions beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ proc addBlock*(
validationDur = Duration()) =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for the future in the
# BlockEntry to constrain their own processing
# There is no backpressure here - producers must wait for `resfut` to
# constrain their own processing
# Producers:
# - Gossip (when synced)
# - SyncManager (during sync)
Expand Down Expand Up @@ -144,11 +144,11 @@ proc dumpBlock*[T](
else:
discard

proc storeBlock(
proc storeBlock*(
self: var BlockProcessor,
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock |
merge.SignedBeaconBlock,
wallSlot: Slot): Result[void, BlockError] =
wallSlot: Slot): Result[BlockRef, BlockError] =
let
attestationPool = self.consensusManager.attestationPool

Expand All @@ -167,7 +167,7 @@ proc storeBlock(
# was pruned from the ForkChoice.
if blck.isErr:
return err(blck.error[1])
ok()
ok(blck.get())

# Event Loop
# ------------------------------------------------------------------------------
Expand Down
95 changes: 71 additions & 24 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,37 @@ import

# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received,
"Number of beacon chain attestations received by this peer"
"Number of valid unaggregated attestations processed by this node"
declareCounter beacon_attestations_dropped,
"Number of invalid unaggregated attestations dropped by this node", labels = ["reason"]
declareCounter beacon_aggregates_received,
"Number of beacon chain aggregate attestations received by this peer"
"Number of valid aggregated attestations processed by this node"
declareCounter beacon_aggregates_dropped,
"Number of invalid aggregated attestations dropped by this node", labels = ["reason"]
declareCounter beacon_blocks_received,
"Number of beacon chain blocks received by this peer"
"Number of valid blocks processed by this node"
declareCounter beacon_blocks_dropped,
"Number of invalid blocks dropped by this node", labels = ["reason"]
declareCounter beacon_attester_slashings_received,
"Number of beacon chain attester slashings received by this peer"
"Number of valid attester slashings processed by this node"
declareCounter beacon_attester_slashings_dropped,
"Number of invalid attester slashings dropped by this node", labels = ["reason"]
declareCounter beacon_proposer_slashings_received,
"Number of beacon chain proposer slashings received by this peer"
"Number of valid proposer slashings processed by this node"
declareCounter beacon_proposer_slashings_dropped,
"Number of invalid proposer slashings dropped by this node", labels = ["reason"]
declareCounter beacon_voluntary_exits_received,
"Number of beacon chain voluntary exits received by this peer"
"Number of valid voluntary exits processed by this node"
declareCounter beacon_voluntary_exits_dropped,
"Number of invalid voluntary exits dropped by this node", labels = ["reason"]
declareCounter beacon_sync_committee_messages_received,
"Number of valid sync committee messages processed by this node"
declareCounter beacon_sync_committee_messages_dropped,
"Number of invalid sync committee messages dropped by this node", labels = ["reason"]
declareCounter beacon_sync_committee_contributions_received,
"Number of valid sync committee contributions processed by this node"
declareCounter beacon_sync_committee_contributions_dropped,
"Number of invalid sync committee contributions dropped by this node", labels = ["reason"]

const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]

Expand All @@ -60,6 +80,19 @@ type
## across quick restarts.

Eth2Processor* = object
## The Eth2Processor is the entry point for untrusted message processing -
## when we receive messages from various sources, we pass them to the
## processor for validation and routing - the messages are generally
## validated, and if valid, passed on to the various pools, monitors and
## managers to update the state of the application.
##
## Block processing is special in that part of it is done in the
## `BlockProcessor` instead, via a special block processing queue.
##
## Each validating function generally will do a sanity check on the message
## whose purpose is to quickly filter out spam, then will (usually) delegate
## full validation to the proper manager - finally, metrics and monitoring
## are updated.
doppelGangerDetectionEnabled*: bool

# Local sources of truth for validation
Expand Down Expand Up @@ -158,27 +191,29 @@ proc blockValidator*(
# decoding at this stage, which may be significant
debug "Block received", delay

let blck = self.dag.isValidBeaconBlock(
let v = self.dag.isValidBeaconBlock(
self.quarantine, signedBlock, wallTime, {})

self.blockProcessor[].dumpBlock(signedBlock, blck)

if not blck.isOk:
return blck.error[0]

beacon_blocks_received.inc()
beacon_block_delay.observe(delay.toFloatSeconds())
if v.isErr:
self.blockProcessor[].dumpBlock(signedBlock, v)
beacon_blocks_dropped.inc(1, [$v.error[0]])
return v.error[0]

# Block passed validation - enqueue it for processing. The block processing
# queue is effectively unbounded as we use a freestanding task to enqueue
# the block - this is done so that when blocks arrive concurrently with
# sync, we don't lose the gossip blocks, but also don't block the gossip
# propagation of seemingly good blocks
trace "Block validated"

self.blockProcessor[].addBlock(
ForkedSignedBeaconBlock.init(signedBlock),
validationDur = self.getCurrentBeaconTime() - wallTime)

# Validator monitor registration for blocks is done by the processor
beacon_blocks_received.inc()
beacon_block_delay.observe(delay.toFloatSeconds())

ValidationResult.Accept

proc checkForPotentialDoppelganger(
Expand Down Expand Up @@ -235,21 +270,22 @@ proc attestationValidator*(
self.batchCrypto, attestation, wallTime, subnet_id, checkSignature)
if v.isErr():
debug "Dropping attestation", validationError = v.error
beacon_attestations_dropped.inc(1, [$v.error[0]])
return v.error[0]

# Due to async validation the wallSlot here might have changed
(afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot()

beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())

let (attestation_index, sig) = v.get()
let (attester_index, sig) = v.get()

self[].checkForPotentialDoppelganger(attestation, [attestation_index])
self[].checkForPotentialDoppelganger(attestation, [attester_index])

trace "Attestation validated"
self.attestationPool[].addAttestation(
attestation, [attestation_index], sig, wallSlot)
attestation, [attester_index], sig, wallSlot)

beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())

return ValidationResult.Accept

Expand Down Expand Up @@ -283,14 +319,12 @@ proc aggregateValidator*(
aggregator_index = signedAggregateAndProof.message.aggregator_index,
selection_proof = signedAggregateAndProof.message.selection_proof,
wallSlot
beacon_aggregates_dropped.inc(1, [$v.error[0]])
return v.error[0]

# Due to async validation the wallSlot here might have changed
(afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot()

beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(delay.toFloatSeconds())

let (attesting_indices, sig) = v.get()

self[].checkForPotentialDoppelganger(
Expand All @@ -303,6 +337,9 @@ proc aggregateValidator*(
self.attestationPool[].addAttestation(
signedAggregateAndProof.message.aggregate, attesting_indices, sig, wallSlot)

beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(delay.toFloatSeconds())

return ValidationResult.Accept

proc attesterSlashingValidator*(
Expand All @@ -314,6 +351,7 @@ proc attesterSlashingValidator*(
let v = self.exitPool[].validateAttesterSlashing(attesterSlashing)
if v.isErr:
debug "Dropping attester slashing", validationError = v.error
beacon_attester_slashings_dropped.inc(1, [$v.error[0]])
return v.error[0]

beacon_attester_slashings_received.inc()
Expand All @@ -329,6 +367,7 @@ proc proposerSlashingValidator*(
let v = self.exitPool[].validateProposerSlashing(proposerSlashing)
if v.isErr:
debug "Dropping proposer slashing", validationError = v.error
beacon_proposer_slashings_dropped.inc(1, [$v.error[0]])
return v.error[0]

beacon_proposer_slashings_received.inc()
Expand All @@ -344,6 +383,7 @@ proc voluntaryExitValidator*(
let v = self.exitPool[].validateVoluntaryExit(signedVoluntaryExit)
if v.isErr:
debug "Dropping voluntary exit", validationError = v.error
beacon_voluntary_exits_dropped.inc(1, [$v.error[0]])
return v.error[0]

beacon_voluntary_exits_received.inc()
Expand All @@ -369,11 +409,15 @@ proc syncCommitteeMsgValidator*(
let v = validateSyncCommitteeMessage(self.dag, self.syncCommitteeMsgPool,
syncCommitteeMsg, committeeIdx, wallTime,
checkSignature)
if v.isErr():
if v.isErr:
debug "Dropping sync committee message", validationError = v.error
beacon_sync_committee_messages_dropped.inc(1, [$v.error[0]])
return v.error[0]

trace "Sync committee message validated"

beacon_sync_committee_messages_received.inc()

ValidationResult.Accept

proc syncCommitteeContributionValidator*(
Expand Down Expand Up @@ -401,6 +445,9 @@ proc syncCommitteeContributionValidator*(
validationError = v.error,
selection_proof = contributionAndProof.message.selection_proof,
wallSlot
beacon_sync_committee_contributions_dropped.inc(1, [$v.error[0]])
return v.error[0]

beacon_sync_committee_contributions_received.inc()

ValidationResult.Accept
133 changes: 66 additions & 67 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -876,76 +876,75 @@ proc validateSignedContributionAndProof*(

syncCommitteeMsgPool.seenContributionByAuthor.incl msgKey

block:
# [REJECT] The aggregator's validator index is in the declared subcommittee
# of the current sync committee.
# i.e. state.validators[contribution_and_proof.aggregator_index].pubkey in
# get_sync_subcommittee_pubkeys(state, contribution.subcommittee_index).
let
epoch = msg.message.contribution.slot.epoch
fork = dag.forkAtEpoch(epoch)
genesisValidatorsRoot = dag.genesisValidatorsRoot
# [REJECT] The aggregator's validator index is in the declared subcommittee
# of the current sync committee.
# i.e. state.validators[contribution_and_proof.aggregator_index].pubkey in
# get_sync_subcommittee_pubkeys(state, contribution.subcommittee_index).
let
epoch = msg.message.contribution.slot.epoch
fork = dag.forkAtEpoch(epoch)
genesisValidatorsRoot = dag.genesisValidatorsRoot

# [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid
if not verify_signed_contribution_and_proof_signature(msg, fork,
genesisValidatorsRoot,
aggregatorPubKey.get()):
return errReject(
"SignedContributionAndProof: aggregator signature fails to verify")

# [REJECT] The contribution_and_proof.selection_proof is a valid signature of the
# SyncAggregatorSelectionData derived from the contribution by the validator with
# index contribution_and_proof.aggregator_index.
if not verify_selection_proof_signature(msg.message, fork,
genesisValidatorsRoot,
aggregatorPubKey.get()):
return errReject(
"SignedContributionAndProof: selection proof signature fails to verify")

# [REJECT] The aggregate signature is valid for the message beacon_block_root
# and aggregate pubkey derived from the participation info in aggregation_bits
# for the subcommittee specified by the contribution.subcommittee_index.
var
committeeAggKey {.noInit.}: AggregatePublicKey
initialized = false
mixedKeys = 0

for validatorPubKey in dag.syncCommitteeParticipants(
msg.message.contribution.slot + 1,
committeeIdx,
msg.message.contribution.aggregation_bits):
let validatorPubKey = validatorPubKey.loadWithCache.get
if not initialized:
initialized = true
committeeAggKey.init(validatorPubKey)
inc mixedKeys
else:
inc mixedKeys
committeeAggKey.aggregate(validatorPubKey)

# [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid
if not verify_signed_contribution_and_proof_signature(msg, fork,
genesisValidatorsRoot,
aggregatorPubKey.get()):
return errReject(
"SignedContributionAndProof: aggregator signature fails to verify")

# [REJECT] The contribution_and_proof.selection_proof is a valid signature of the
# SyncAggregatorSelectionData derived from the contribution by the validator with
# index contribution_and_proof.aggregator_index.
if not verify_selection_proof_signature(msg.message, fork,
genesisValidatorsRoot,
aggregatorPubKey.get()):
return errReject(
"SignedContributionAndProof: selection proof signature fails to verify")

# [REJECT] The aggregate signature is valid for the message beacon_block_root
# and aggregate pubkey derived from the participation info in aggregation_bits
# for the subcommittee specified by the contribution.subcommittee_index.
var
committeeAggKey {.noInit.}: AggregatePublicKey
initialized = false
mixedKeys = 0

for validatorPubKey in dag.syncCommitteeParticipants(
msg.message.contribution.slot + 1,
committeeIdx,
msg.message.contribution.aggregation_bits):
let validatorPubKey = validatorPubKey.loadWithCache.get
if not initialized:
initialized = true
committeeAggKey.init(validatorPubKey)
inc mixedKeys
else:
inc mixedKeys
committeeAggKey.aggregate(validatorPubKey)
if not initialized:
# [REJECT] The contribution has participants
# that is, any(contribution.aggregation_bits).
return errReject("SignedContributionAndProof: aggregation bits empty")

if not initialized:
# [REJECT] The contribution has participants
# that is, any(contribution.aggregation_bits).
return errReject("SignedContributionAndProof: aggregation bits empty")
let cookedSignature = msg.message.contribution.signature.load
if cookedSignature.isNone:
return errReject(
"SignedContributionAndProof: aggregate signature fails to load")

if checkSignature and
not verify_sync_committee_message_signature(
epoch, msg.message.contribution.beacon_block_root, fork,
genesisValidatorsRoot, committeeAggKey.finish, cookedSignature.get):
debug "failing_sync_contribution",
slot = msg.message.contribution.slot + 1,
subnet = committeeIdx,
participants = $(msg.message.contribution.aggregation_bits),
mixedKeys

let cookedSignature = msg.message.contribution.signature.load
if cookedSignature.isNone:
return errReject(
"SignedContributionAndProof: aggregate signature fails to load")
return errReject(
"SignedContributionAndProof: aggregate signature fails to verify")

if checkSignature and
not verify_sync_committee_message_signature(
epoch, msg.message.contribution.beacon_block_root, fork,
genesisValidatorsRoot, committeeAggKey.finish, cookedSignature.get):
debug "failing_sync_contribution",
slot = msg.message.contribution.slot + 1,
subnet_id = committeeIdx,
participants = $(msg.message.contribution.aggregation_bits),
mixedKeys

return errReject(
"SignedContributionAndProof: aggregate signature fails to verify")

syncCommitteeMsgPool[].addSyncContribution(msg, cookedSignature.get)
syncCommitteeMsgPool[].addSyncContribution(msg, cookedSignature.get)

ok()

0 comments on commit bf6ad41

Please sign in to comment.