Skip to content

Commit

Permalink
print attestation/aggregate drop notice once per slot (#2475)
Browse files Browse the repository at this point in the history
* add metrics for queue-related drops
* avoid importing beacon node conf in processor
  • Loading branch information
arnetheduck committed Apr 6, 2021
1 parent adec9d8 commit 10d99c1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
14 changes: 5 additions & 9 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import
./batch_validation,
../validators/validator_pool,
../beacon_node_types,
../beacon_clock, ../conf, ../ssz/sszdump
../beacon_clock, ../ssz/sszdump

# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received,
Expand Down Expand Up @@ -52,7 +52,7 @@ declareHistogram beacon_store_block_duration_seconds,

type
Eth2Processor* = object
config*: BeaconNodeConf
doppelGangerDetectionEnabled*: bool
getWallTime*: GetWallTimeFn

# Local sources of truth for validation
Expand Down Expand Up @@ -83,7 +83,7 @@ type
# ------------------------------------------------------------------------------

proc new*(T: type Eth2Processor,
config: BeaconNodeConf,
doppelGangerDetectionEnabled: bool,
verifQueues: ref VerifQueueManager,
chainDag: ChainDAGRef,
attestationPool: ref AttestationPool,
Expand All @@ -93,7 +93,7 @@ proc new*(T: type Eth2Processor,
rng: ref BrHmacDrbgContext,
getWallTime: GetWallTimeFn): ref Eth2Processor =
(ref Eth2Processor)(
config: config,
doppelGangerDetectionEnabled: doppelGangerDetectionEnabled,
getWallTime: getWallTime,
verifQueues: verifQueues,
chainDag: chainDag,
Expand Down Expand Up @@ -182,7 +182,7 @@ proc checkForPotentialDoppelganger(
validatorPubkey,
attestationSlot = attestationData.slot
doppelganger_detection_activated.inc()
if self.config.doppelgangerDetection:
if self.doppelgangerDetectionEnabled:
warn "We believe you are currently running another instance of the same validator. We've disconnected you from the network as this presents a significant slashing risk. Possible next steps are (a) making sure you've disconnected your validator from your old machine before restarting the client; and (b) running the client again with the gossip-slashing-protection option disabled, only if you are absolutely sure this is the only instance of your validator running, and reporting the issue at https://github.com/status-im/nimbus-eth2/issues."
quit QuitFailure

Expand Down Expand Up @@ -277,8 +277,6 @@ proc aggregateValidator*(

return ValidationResult.Accept

{.push raises: [Defect].}

proc attesterSlashingValidator*(
self: var Eth2Processor, attesterSlashing: AttesterSlashing):
ValidationResult =
Expand Down Expand Up @@ -323,5 +321,3 @@ proc voluntaryExitValidator*(
beacon_voluntary_exits_received.inc()

ValidationResult.Accept

{.pop.}
53 changes: 42 additions & 11 deletions beacon_chain/gossip_processing/gossip_to_consensus.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import
../spec/[crypto, datatypes, digest],
../consensus_object_pools/[block_clearance, blockchain_dag, attestation_pool],
./consensus_manager,
../beacon_node_types,
../beacon_clock, ../conf, ../ssz/sszdump
".."/[beacon_clock, beacon_node_types],
../ssz/sszdump

# Gossip Queue Manager
# ------------------------------------------------------------------------------
Expand All @@ -22,6 +22,12 @@ import
declareHistogram beacon_store_block_duration_seconds,
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]

declareCounter beacon_attestations_dropped_queue_full,
"Number of attestations dropped because queue is full"

declareCounter beacon_aggregates_dropped_queue_full,
"Number of aggregates dropped because queue is full"

type
SyncBlock* = object
blk*: SignedBeaconBlock
Expand Down Expand Up @@ -72,7 +78,11 @@ type
# is there a point to separate
# attestations & aggregates here?
attestationsQueue: AsyncQueue[AttestationEntry]
attestationsDropped: int
attestationsDropTime: tuple[afterGenesis: bool, slot: Slot]
aggregatesQueue: AsyncQueue[AggregateEntry]
aggregatesDropped: int
aggregatesDropTime: tuple[afterGenesis: bool, slot: Slot]

# Consumer
# ----------------------------------------------------------------
Expand All @@ -85,13 +95,14 @@ type
# ------------------------------------------------------------------------------

proc new*(T: type VerifQueueManager,
conf: BeaconNodeConf,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
consensusManager: ref ConsensusManager,
getWallTime: GetWallTimeFn): ref VerifQueueManager =
(ref VerifQueueManager)(
dumpEnabled: conf.dumpEnabled,
dumpDirInvalid: conf.dumpDirInvalid,
dumpDirIncoming: conf.dumpDirIncoming,
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
dumpDirIncoming: dumpDirIncoming,

getWallTime: getWallTime,

Expand All @@ -107,7 +118,9 @@ proc new*(T: type VerifQueueManager,
attestationsQueue: newAsyncQueue[AttestationEntry](
(TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),

consensusManager: consensusManager
consensusManager: consensusManager,
attestationsDropTime: getWallTime().toSlot(),
aggregatesDropTime: getWallTime().toSlot(),
)

# Sync callbacks
Expand Down Expand Up @@ -154,13 +167,22 @@ proc addAttestation*(self: var VerifQueueManager, att: Attestation, att_indices:
# Producer:
# - Gossip (when synced)
while self.attestationsQueue.full():
self.attestationsDropped += 1
beacon_attestations_dropped_queue_full.inc()

try:
notice "Queue full, dropping oldest attestation",
dropped = shortLog(self.attestationsQueue[0].v)
discard self.attestationsQueue.popFirstNoWait()
except AsyncQueueEmptyError as exc:
raiseAssert "If queue is full, we have at least one item! " & exc.msg

if self.attestationsDropped > 0:
let now = self.getWallTime().toSlot() # Print notice once per slot
if now != self.attestationsDropTime:
notice "Queue full, attestations dropped",
count = self.attestationsDropped
self.attestationsDropTime = now
self.attestationsDropped = 0

try:
self.attestationsQueue.addLastNoWait(
AttestationEntry(v: att, attesting_indices: att_indices))
Expand All @@ -175,13 +197,22 @@ proc addAggregate*(self: var VerifQueueManager, agg: SignedAggregateAndProof, at
# - Gossip (when synced)

while self.aggregatesQueue.full():
self.aggregatesDropped += 1
beacon_aggregates_dropped_queue_full.inc()

try:
notice "Queue full, dropping oldest aggregate",
dropped = shortLog(self.aggregatesQueue[0].v)
discard self.aggregatesQueue.popFirstNoWait()
except AsyncQueueEmptyError as exc:
raiseAssert "We just checked that queue is not full! " & exc.msg

if self.aggregatesDropped > 0:
let now = self.getWallTime().toSlot() # Print notice once per slot
if now != self.aggregatesDropTime:
notice "Queue full, aggregates dropped",
count = self.aggregatesDropped
self.aggregatesDropTime = now
self.aggregatesDropped = 0

try:
self.aggregatesQueue.addLastNoWait(AggregateEntry(
v: agg.message.aggregate,
Expand Down
5 changes: 3 additions & 2 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,11 @@ proc init*(T: type BeaconNode,
chainDag, attestationPool, quarantine
)
verifQueues = VerifQueueManager.new(
config, consensusManager,
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
consensusManager,
proc(): BeaconTime = beaconClock.now())
processor = Eth2Processor.new(
config,
config.doppelgangerDetection,
verifQueues,
chainDag, attestationPool, exitPool, validatorPool,
quarantine,
Expand Down

0 comments on commit 10d99c1

Please sign in to comment.