Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added proper cleanup when we get notified of new height #4753

Merged
merged 4 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ func startHandlingForkChoice(
cfg HeadersCfg,
headerInserter *headerdownload.HeaderInserter,
) (*privateapi.PayloadStatus, error) {
headerHash := forkChoice.HeadBlockHash
log.Debug(fmt.Sprintf("[%s] Handling fork choice", s.LogPrefix()), "headerHash", headerHash)
if cfg.memoryOverlay {
defer cfg.forkValidator.Clear(tx)
defer cfg.forkValidator.ClearWithUnwind(tx)
}
headerHash := forkChoice.HeadBlockHash
log.Debug(fmt.Sprintf("[%s] Handling fork choice", s.LogPrefix()), "headerHash", headerHash)

currentHeadHash := rawdb.ReadHeadHeaderHash(tx)
if currentHeadHash == headerHash { // no-op
Expand Down Expand Up @@ -578,10 +578,11 @@ func verifyAndSaveNewPoSHeader(
forkingHash, err := cfg.blockReader.CanonicalHash(ctx, tx, forkingPoint)

canExtendCanonical := forkingHash == currentHeadHash
canExtendFork := cfg.forkValidator.ExtendingForkHeadHash() == (common.Hash{}) || header.ParentHash == cfg.forkValidator.ExtendingForkHeadHash()

if cfg.memoryOverlay && (canExtendFork || header.ParentHash != currentHeadHash) {
status, latestValidHash, validationError, criticalError := cfg.forkValidator.ValidatePayload(tx, header, body, header.ParentHash == currentHeadHash /* extendCanonical */)
if cfg.memoryOverlay {
extendingHash := cfg.forkValidator.ExtendingForkHeadHash()
extendCanonical := (extendingHash == common.Hash{} && header.ParentHash == currentHeadHash) || extendingHash == header.ParentHash
status, latestValidHash, validationError, criticalError := cfg.forkValidator.ValidatePayload(tx, header, body, extendCanonical)
if criticalError != nil {
return nil, false, criticalError
}
Expand Down Expand Up @@ -665,6 +666,8 @@ func schedulePoSDownload(
}

func verifyAndSaveDownloadedPoSHeaders(tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter) {
defer cfg.forkValidator.Clear()

var lastValidHash common.Hash
var badChainError error
headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
Expand Down
25 changes: 22 additions & 3 deletions turbo/engineapi/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func (fv *ForkValidator) ExtendingForkHeadHash() common.Hash {
// NotifyCurrentHeight is to be called at the end of the stage cycle and repressent the last processed block.
func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
fv.currentHeight = currentHeight
// If the head changed,e previous assumptions on head are incorrect now.
if fv.extendingFork != nil {
fv.extendingFork.Rollback()
}
fv.extendingFork = nil
fv.extendingForkHeadHash = common.Hash{}
}

// FlushExtendingFork flush the current extending fork if fcu chooses its head hash as the its forkchoice.
Expand Down Expand Up @@ -169,7 +175,16 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body
// Clear wipes out current extending fork data, this method is called after fcu is called,
// because fcu decides what the head is and after the call is done all the non-chosed forks are
// to be considered obsolete.
func (fv *ForkValidator) Clear(tx kv.RwTx) {
func (fv *ForkValidator) Clear() {
if fv.extendingFork != nil {
fv.extendingFork.Rollback()
}
fv.extendingForkHeadHash = common.Hash{}
fv.extendingFork = nil
}

// Clear wipes out current extending fork data and notify txpool.
func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx) {
sb, ok := fv.sideForksBlock[fv.extendingForkHeadHash]
// If we did not flush the fork state, then we need to notify the txpool through unwind.
if fv.extendingFork != nil && fv.extendingForkHeadHash != (common.Hash{}) && ok {
Expand All @@ -180,8 +195,7 @@ func (fv *ForkValidator) Clear(tx kv.RwTx) {
}
fv.extendingFork.Rollback()
}
fv.extendingForkHeadHash = common.Hash{}
fv.extendingFork = nil
fv.Clear()
}

// validateAndStorePayload validate and store a payload fork chain if such chain results valid.
Expand All @@ -191,6 +205,11 @@ func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Heade
if validationError != nil {
latestValidHash = header.ParentHash
status = remote.EngineStatus_INVALID
if fv.extendingFork != nil {
fv.extendingFork.Rollback()
fv.extendingFork = nil
}
fv.extendingForkHeadHash = common.Hash{}
return
}
// If we do not have the body we can recover it from the batch.
Expand Down