diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index ad0f8f3dd47..be526ba04a9 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -349,6 +349,28 @@ func startHandlingForkChoice( } } + if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() { + log.Info("Flushing in-memory state") + if err := cfg.hd.FlushNextForkState(tx); err != nil { + return nil, err + } + cfg.hd.BeaconRequestList.Remove(requestId) + rawdb.WriteForkchoiceHead(tx, forkChoice.HeadBlockHash) + canonical, err := safeAndFinalizedBlocksAreCanonical(forkChoice, s, tx, cfg) + if err != nil { + log.Warn(fmt.Sprintf("[%s] Fork choice err", s.LogPrefix()), "err", err) + return nil, err + } + if canonical { + cfg.hd.SetPendingPayloadHash(headerHash) + return nil, nil + } else { + return &privateapi.PayloadStatus{ + CriticalError: &privateapi.InvalidForkchoiceStateErr, + }, nil + } + } + cfg.hd.UpdateTopSeenHeightPoS(headerNumber) forkingPoint := uint64(0) if headerNumber > 0 { @@ -362,15 +384,6 @@ func startHandlingForkChoice( } } - if cfg.memoryOverlay && headerHash == cfg.hd.GetNextForkHash() { - log.Info("Flushing in-memory state") - if err := cfg.hd.FlushNextForkState(tx); err != nil { - return nil, err - } - cfg.hd.SetPendingPayloadHash(headerHash) - return nil, nil - } - log.Info(fmt.Sprintf("[%s] Fork choice re-org", s.LogPrefix()), "headerNumber", headerNumber, "forkingPoint", forkingPoint) if requestStatus == engineapi.New { @@ -571,14 +584,14 @@ func verifyAndSaveNewPoSHeader( // TODO(yperbasis): considered non-canonical because some missing headers were downloaded but not canonized // Or it's not a problem because forkChoice is updated frequently? if cfg.memoryOverlay { - status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload) + status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, false, cfg.execPayload) if criticalError != nil { return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError } success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED return &privateapi.PayloadStatus{ Status: status, - LatestValidHash: currentHeadHash, + LatestValidHash: latestValidHash, ValidationError: validationError, }, success, nil } @@ -587,14 +600,14 @@ func verifyAndSaveNewPoSHeader( } if cfg.memoryOverlay && (cfg.hd.GetNextForkHash() == (common.Hash{}) || header.ParentHash == cfg.hd.GetNextForkHash()) { - status, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload) + status, latestValidHash, validationError, criticalError := cfg.hd.ValidatePayload(tx, header, body, true, cfg.execPayload) if criticalError != nil { return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError } success = status == remote.EngineStatus_VALID || status == remote.EngineStatus_ACCEPTED return &privateapi.PayloadStatus{ Status: status, - LatestValidHash: currentHeadHash, + LatestValidHash: latestValidHash, ValidationError: validationError, }, success, nil } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 2fe3b984551..759ba81be44 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -1093,10 +1093,16 @@ func abs64(n int64) uint64 { return uint64(n) } -func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, validationError error, criticalError error) { +func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, store bool, execPayload func(kv.RwTx, *types.Header, *types.RawBody, uint64, []*types.Header, []*types.RawBody) error) (status remote.EngineStatus, latestValidHash common.Hash, validationError error, criticalError error) { hd.lock.Lock() defer hd.lock.Unlock() maxDepth := uint64(16) + + currentHeight := rawdb.ReadCurrentBlockNumber(tx) + if currentHeight == nil { + criticalError = fmt.Errorf("could not read block number.") + return + } if store { // If it is a continuation of the canonical chain we can stack it up. if hd.nextForkState == nil { @@ -1105,17 +1111,17 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body hd.nextForkState.UpdateTxn(tx) } hd.nextForkHash = header.Hash() - status = remote.EngineStatus_VALID // Let's assemble the side fork chain if we have others building. validationError = execPayload(hd.nextForkState, header, body, 0, nil, nil) if validationError != nil { status = remote.EngineStatus_INVALID + latestValidHash = header.ParentHash + return } - return - } - currentHeight := rawdb.ReadCurrentBlockNumber(tx) - if currentHeight == nil { - criticalError = fmt.Errorf("could not read block number.") + status = remote.EngineStatus_VALID + latestValidHash = header.Hash() + hd.sideForksBlock[latestValidHash] = sideForkBlock{header, body} + hd.cleanupOutdateSideForks(*currentHeight, maxDepth) return } // if the block is not in range of MAX_DEPTH from head then we do not validate it. @@ -1123,8 +1129,6 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body status = remote.EngineStatus_ACCEPTED return } - // if it is not canonical we validate it as a side fork. - batch := memdb.NewMemoryBatch(tx) // Let's assemble the side fork backwards var foundCanonical bool currentHash := header.ParentHash @@ -1155,17 +1159,26 @@ func (hd *HeaderDownload) ValidatePayload(tx kv.RwTx, header *types.Header, body } hd.sideForksBlock[header.Hash()] = sideForkBlock{header, body} status = remote.EngineStatus_VALID + // if it is not canonical we validate it as a side fork. + batch := memdb.NewMemoryBatch(tx) + defer batch.Close() validationError = execPayload(batch, header, body, unwindPoint, headersChain, bodiesChain) + latestValidHash = header.Hash() if validationError != nil { + latestValidHash = header.ParentHash status = remote.EngineStatus_INVALID } // After the we finished executing, we clean up old forks + hd.cleanupOutdateSideForks(*currentHeight, maxDepth) + return +} + +func (hd *HeaderDownload) cleanupOutdateSideForks(currentHeight uint64, maxDepth uint64) { for hash, sb := range hd.sideForksBlock { - if abs64(int64(*currentHeight)-sb.header.Number.Int64()) > maxDepth { + if abs64(int64(currentHeight)-sb.header.Number.Int64()) > maxDepth { delete(hd.sideForksBlock, hash) } } - return } func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error { @@ -1174,6 +1187,11 @@ func (hd *HeaderDownload) FlushNextForkState(tx kv.RwTx) error { if err := hd.nextForkState.Flush(tx); err != nil { return err } + // If the side fork hash is now becoming canonical we can clean up. + if _, ok := hd.sideForksBlock[hd.nextForkHash]; ok { + delete(hd.sideForksBlock, hd.nextForkHash) + } + hd.nextForkState.Close() hd.nextForkHash = common.Hash{} hd.nextForkState = nil return nil