Skip to content

Commit

Permalink
VC: Add pruning slashing database. (#5551)
Browse files Browse the repository at this point in the history
* Add slashing database pruning to VC.
Fix GetBlockHeaderResponse object declaration (spec has been changed).

* Switch to getFinalizedBlockHeader instead.

* Fix proper sign.
Add statements.
Show pruning log statement only when pruning happens.

* Optimize and remove debugging helpers.
  • Loading branch information
cheatfate committed Nov 6, 2023
1 parent eb7c8b7 commit 49c8511
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 2 deletions.
1 change: 1 addition & 0 deletions beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type
DataRootEnclosedObject |
DataOptimisticObject |
DataVersionEnclosedObject |
DataOptimisticAndFinalizedObject |
GetBlockV2Response |
GetDistributedKeystoresResponse |
GetKeystoresResponse |
Expand Down
7 changes: 6 additions & 1 deletion beacon_chain/spec/eth2_apis/rest_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ type
data*: T
execution_optimistic*: Option[bool]

DataOptimisticAndFinalizedObject*[T] = object
data*: T
execution_optimistic*: Option[bool]
finalized*: Option[bool]

ForkedSignedBlockHeader* = object
message*: uint32 # message offset
signature*: ValidatorSig
Expand Down Expand Up @@ -515,7 +520,7 @@ type
GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation]
GetAttesterDutiesResponse* = DataRootEnclosedObject[seq[RestAttesterDuty]]
GetBlockAttestationsResponse* = DataEnclosedObject[seq[Attestation]]
GetBlockHeaderResponse* = DataOptimisticObject[RestBlockHeaderInfo]
GetBlockHeaderResponse* = DataOptimisticAndFinalizedObject[RestBlockHeaderInfo]
GetBlockHeadersResponse* = DataEnclosedObject[seq[RestBlockHeaderInfo]]
GetBlockRootResponse* = DataOptimisticObject[RestRoot]
GetDebugChainHeadsResponse* = DataEnclosedObject[seq[RestChainHead]]
Expand Down
89 changes: 89 additions & 0 deletions beacon_chain/validator_client/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2570,3 +2570,92 @@ proc getValidatorsLiveness*(
res

return GetValidatorsLivenessResponse(data: response)

proc getFinalizedBlockHeader*(
vc: ValidatorClientRef,
): Future[Opt[GetBlockHeaderResponse]] {.async.} =
const RequestName = "getFinalizedBlockHeader"

let
blockIdent = BlockIdent.init(BlockIdentType.Finalized)
resp = vc.onceToAll(RestPlainResponse,
SlotDuration,
ViableNodeStatus,
{BeaconNodeRole.Duties},
getBlockHeaderPlain(it, blockIdent))
case resp.status
of ApiOperation.Timeout:
debug "Unable to obtain finalized block header in time",
timeout = SlotDuration
return Opt.none(GetBlockHeaderResponse)
of ApiOperation.Interrupt:
debug "Finalized block header request was interrupted"
return Opt.none(GetBlockHeaderResponse)
of ApiOperation.Failure:
debug "Unexpected error happened while trying to get finalized block header"
return Opt.none(GetBlockHeaderResponse)
of ApiOperation.Success:
var oldestBlockHeader: GetBlockHeaderResponse
var oldestEpoch: Opt[Epoch]
for apiResponse in resp.data:
if apiResponse.data.isErr():
debug "Unable to get finalized block header",
endpoint = apiResponse.node, error = apiResponse.data.error
else:
let response = apiResponse.data.get()
case response.status
of 200:
let res = decodeBytes(GetBlockHeaderResponse,
response.data, response.contentType)
if res.isOk():
let
rdata = res.get()
epoch = rdata.data.header.message.slot.epoch()
if oldestEpoch.get(FAR_FUTURE_EPOCH) > epoch:
oldestEpoch = Opt.some(epoch)
oldestBlockHeader = rdata
else:
let failure = ApiNodeFailure.init(
ApiFailure.UnexpectedResponse, RequestName,
apiResponse.node, response.status, $res.error)
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseDecodeError, reason = getFailureReason(failure)
continue
of 400:
let failure = ApiNodeFailure.init(
ApiFailure.Invalid, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseInvalidError, reason = getFailureReason(failure)
continue
of 404:
let failure = ApiNodeFailure.init(
ApiFailure.NotFound, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseNotFoundError, reason = getFailureReason(failure)
continue
of 500:
let failure = ApiNodeFailure.init(
ApiFailure.Internal, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseInternalError, reason = getFailureReason(failure)
continue
else:
let failure = ApiNodeFailure.init(
ApiFailure.UnexpectedCode, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseUnexpectedError, reason = getFailureReason(failure)
continue

if oldestEpoch.isSome():
return Opt.some(oldestBlockHeader)
else:
return Opt.none(GetBlockHeaderResponse)
3 changes: 3 additions & 0 deletions beacon_chain/validator_client/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ type
DutiesServiceRef* = ref object of ClientServiceRef
pollingAttesterDutiesTask*: Future[void]
pollingSyncDutiesTask*: Future[void]
pruneSlashingDatabaseTask*: Future[void]
syncSubscriptionEpoch*: Opt[Epoch]
lastSlashingEpoch*: Opt[Epoch]

FallbackServiceRef* = ref object of ClientServiceRef
changesEvent*: AsyncEvent
Expand Down Expand Up @@ -229,6 +231,7 @@ type
blocksSeen*: Table[Slot, BlockDataItem]
rootsSeen*: Table[Eth2Digest, Slot]
processingDelay*: Opt[Duration]
finalizedEpoch*: Opt[Epoch]
rng*: ref HmacDrbgContext

ApiStrategyKind* {.pure.} = enum
Expand Down
75 changes: 74 additions & 1 deletion beacon_chain/validator_client/duties_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ logScope: service = ServiceName
type
DutiesServiceLoop* = enum
AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop,
ProposerPreparationLoop, ValidatorRegisterLoop, DynamicValidatorsLoop
ProposerPreparationLoop, ValidatorRegisterLoop, DynamicValidatorsLoop,
SlashPruningLoop

chronicles.formatIt(DutiesServiceLoop):
case it
Expand All @@ -30,6 +31,7 @@ chronicles.formatIt(DutiesServiceLoop):
of ProposerPreparationLoop: "proposer_prepare_loop"
of ValidatorRegisterLoop: "validator_register_loop"
of DynamicValidatorsLoop: "dynamic_validators_loop"
of SlashPruningLoop: "slashing_pruning_loop"

proc checkDuty(duty: RestAttesterDuty): bool =
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
Expand Down Expand Up @@ -677,6 +679,70 @@ proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} =
# Spawning new attestation duties task.
service.pollingSyncDutiesTask = service.pollForSyncCommitteeDuties()

proc getNextEpochMiddleSlot(vc: ValidatorClientRef): Slot =
let
middleSlot = Slot(SLOTS_PER_EPOCH div 2)
currentSlot = vc.beaconClock.now().slotOrZero()
slotInEpoch = currentSlot.since_epoch_start()

if slotInEpoch >= middleSlot:
(currentSlot.epoch + 1'u64).start_slot() + uint64(middleSlot)
else:
currentSlot + (uint64(middleSlot) - uint64(slotInEpoch))

proc pruneSlashingDatabase(service: DutiesServiceRef) {.async.} =
let
vc = service.client
currentSlot = vc.beaconClock.now().slotOrZero()
startTime = Moment.now()
blockHeader =
try:
await vc.getFinalizedBlockHeader()
except CancelledError as exc:
debug "Finalized block header request was interrupted",
slot = currentSlot
raise exc
except CatchableError as exc:
error "Unexpected error occured while requesting " &
"finalized block header", slot = currentSlot,
err_name = exc.name, err_msg = exc.msg
Opt.none(GetBlockHeaderResponse)
checkpointTime = Moment.now()
if blockHeader.isSome():
let epoch = blockHeader.get().data.header.message.slot.epoch
vc.finalizedEpoch = Opt.some(epoch)
if service.lastSlashingEpoch.get(FAR_FUTURE_EPOCH) != epoch:
vc.attachedValidators[]
.slashingProtection
.pruneAfterFinalization(epoch)
service.lastSlashingEpoch = Opt.some(epoch)
let finishTime = Moment.now()
debug "Slashing database has been pruned", slot = currentSlot,
epoch = currentSlot.epoch(),
finalized_epoch = epoch,
elapsed_time = (finishTime - startTime),
pruning_time = (finishTime - checkpointTime)

proc slashingDatabasePruningLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Slashing database pruning loop is waiting for initialization"
await allFutures(
vc.preGenesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
let slot = await vc.checkedWaitForSlot(vc.getNextEpochMiddleSlot(),
aggregateSlotOffset, false)
if slot.isNone():
continue

if not(isNil(service.pruneSlashingDatabaseTask)) and
not(service.pruneSlashingDatabaseTask.finished()):
await cancelAndWait(service.pruneSlashingDatabaseTask)
service.pruneSlashingDatabaseTask = service.pruneSlashingDatabase()

template checkAndRestart(serviceLoop: DutiesServiceLoop,
future: Future[void], body: untyped): untyped =
if future.finished():
Expand Down Expand Up @@ -715,6 +781,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
else:
debug "Dynamic validators update loop disabled"
@[]
slashPruningFut = service.slashingDatabasePruningLoop()
web3SignerUrls = vc.config.web3SignerUrls

while true:
Expand All @@ -729,6 +796,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
FutureBase(indicesFut),
FutureBase(syncFut),
FutureBase(prepareFut),
FutureBase(slashPruningFut)
]
for fut in dynamicFuts:
futures.add fut
Expand All @@ -749,6 +817,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
service.dynamicValidatorsLoop(
web3SignerUrls[i],
vc.config.web3signerUpdateInterval))
checkAndRestart(SlashPruningLoop, slashPruningFut,
service.slashingDatabasePruningLoop())
false
except CancelledError:
debug "Service interrupted"
Expand All @@ -774,6 +844,9 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
if not(isNil(service.pollingSyncDutiesTask)) and
not(service.pollingSyncDutiesTask.finished()):
pending.add(service.pollingSyncDutiesTask.cancelAndWait())
if not(isNil(service.pruneSlashingDatabaseTask)) and
not(service.pruneSlashingDatabaseTask.finished()):
pending.add(service.pruneSlashingDatabaseTask.cancelAndWait())
await allFutures(pending)
true
except CatchableError as exc:
Expand Down

0 comments on commit 49c8511

Please sign in to comment.