From af661a9459d997f09b8f514daa25e46b00dce9b8 Mon Sep 17 00:00:00 2001 From: Andrew Ashikhmin <34320705+yperbasis@users.noreply.github.com> Date: Thu, 14 Jul 2022 10:03:15 +0100 Subject: [PATCH] Fix canExtendCanonical when some headers are downloaded (#4709) * Fix canExtendCanonical when some headers are downloaded * Restore original logic for forkValidator.ValidatePayload * Check FCU status --- eth/stagedsync/stage_headers.go | 58 +++++++++++++++++++++----------- turbo/stages/sentry_mock_test.go | 3 ++ 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 498ae67ec20..8dfbb1551b5 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -193,7 +193,7 @@ func HeadersPOS( var payloadStatus *privateapi.PayloadStatus var err error if forkChoiceInsteadOfNewPayload { - payloadStatus, err = startHandlingForkChoice(forkChoiceMessage, requestStatus, requestId, s, u, ctx, tx, cfg, headerInserter, cfg.blockReader) + payloadStatus, err = startHandlingForkChoice(forkChoiceMessage, requestStatus, requestId, s, u, ctx, tx, cfg, headerInserter) } else { payloadStatus, err = handleNewPayload(payloadMessage, requestStatus, requestId, s, ctx, tx, cfg, headerInserter) } @@ -267,7 +267,6 @@ func startHandlingForkChoice( tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter, - headerReader services.HeaderReader, ) (*privateapi.PayloadStatus, error) { headerHash := forkChoice.HeadBlockHash log.Debug(fmt.Sprintf("[%s] Handling fork choice", s.LogPrefix()), "headerHash", headerHash) @@ -307,7 +306,7 @@ func startHandlingForkChoice( } // Header itself may already be in the snapshots, if CL starts off at much earlier state than Erigon - header, err := headerReader.HeaderByHash(ctx, tx, headerHash) + header, err := cfg.blockReader.HeaderByHash(ctx, tx, headerHash) if err != nil { log.Warn(fmt.Sprintf("[%s] Fork choice err (reading header by hash %x)", s.LogPrefix(), headerHash), "err", err) cfg.hd.BeaconRequestList.Remove(requestId) @@ -369,16 +368,9 @@ func startHandlingForkChoice( } cfg.hd.UpdateTopSeenHeightPoS(headerNumber) - forkingPoint := uint64(0) - if headerNumber > 0 { - parent, err := headerReader.Header(ctx, tx, header.ParentHash, headerNumber-1) - if err != nil { - return nil, err - } - forkingPoint, err = headerInserter.ForkingPoint(tx, header, parent) - if err != nil { - return nil, err - } + forkingPoint, err := forkingPoint(ctx, tx, headerInserter, cfg.blockReader, header) + if err != nil { + return nil, err } log.Info(fmt.Sprintf("[%s] Fork choice re-org", s.LogPrefix()), "headerNumber", headerNumber, "forkingPoint", forkingPoint) @@ -549,7 +541,7 @@ func handleNewPayload( } log.Debug(fmt.Sprintf("[%s] New payload begin verification", s.LogPrefix())) - response, success, err := verifyAndSaveNewPoSHeader(requestStatus, s, tx, cfg, header, payloadMessage.Body, headerInserter) + response, success, err := verifyAndSaveNewPoSHeader(requestStatus, s, ctx, tx, cfg, header, payloadMessage.Body, headerInserter) log.Debug(fmt.Sprintf("[%s] New payload verification ended", s.LogPrefix()), "success", success, "err", err) if err != nil || !success { return response, err @@ -566,6 +558,7 @@ func handleNewPayload( func verifyAndSaveNewPoSHeader( requestStatus engineapi.RequestStatus, s *StageState, + ctx context.Context, tx kv.RwTx, cfg HeadersCfg, header *types.Header, @@ -586,17 +579,24 @@ func verifyAndSaveNewPoSHeader( } currentHeadHash := rawdb.ReadHeadHeaderHash(tx) - canExtendCanonical := header.ParentHash == currentHeadHash + + forkingPoint, err := forkingPoint(ctx, tx, headerInserter, cfg.blockReader, header) + if err != nil { + return nil, false, err + } + forkingHash, err := cfg.blockReader.CanonicalHash(ctx, tx, forkingPoint) + + canExtendCanonical := forkingHash == currentHeadHash canExtendFork := cfg.forkValidator.ExtendingForkHeadHash() == (common.Hash{}) || header.ParentHash == cfg.forkValidator.ExtendingForkHeadHash() - if cfg.memoryOverlay && (canExtendFork || !canExtendCanonical) { - status, latestValidHash, validationError, criticalError := cfg.forkValidator.ValidatePayload(tx, header, body, canExtendCanonical) + if cfg.memoryOverlay && (canExtendFork || header.ParentHash != currentHeadHash) { + status, latestValidHash, validationError, criticalError := cfg.forkValidator.ValidatePayload(tx, header, body, header.ParentHash == currentHeadHash /* extendCanonical */) if criticalError != nil { - return &privateapi.PayloadStatus{CriticalError: criticalError}, false, criticalError + return nil, false, criticalError } success = validationError == nil if !success { - log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "err", validationError) + log.Warn("Validation failed for header", "hash", headerHash, "height", headerNumber, "err", validationError) cfg.hd.ReportBadHeaderPoS(headerHash, latestValidHash) } else if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil { return nil, false, err @@ -613,7 +613,7 @@ func verifyAndSaveNewPoSHeader( } if !canExtendCanonical { - log.Info("Side chain or something weird", "parentHash", header.ParentHash, "currentHead", currentHeadHash) + log.Info("Side chain", "parentHash", header.ParentHash, "currentHead", currentHeadHash) return &privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil } @@ -708,6 +708,24 @@ func verifyAndSaveDownloadedPoSHeaders(tx kv.RwTx, cfg HeadersCfg, headerInserte cfg.hd.SetPosStatus(headerdownload.Idle) } +func forkingPoint( + ctx context.Context, + tx kv.RwTx, + headerInserter *headerdownload.HeaderInserter, + headerReader services.HeaderReader, + header *types.Header, +) (uint64, error) { + headerNumber := header.Number.Uint64() + if headerNumber == 0 { + return 0, nil + } + parent, err := headerReader.Header(ctx, tx, header.ParentHash, headerNumber-1) + if err != nil { + return 0, err + } + return headerInserter.ForkingPoint(tx, header, parent) +} + // HeadersPOW progresses Headers stage for Proof-of-Work headers func HeadersPOW( s *StageState, diff --git a/turbo/stages/sentry_mock_test.go b/turbo/stages/sentry_mock_test.go index f0a9c136efa..9ab7015e06b 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/turbo/stages/sentry_mock_test.go @@ -605,6 +605,7 @@ func TestPoSDownloader(t *testing.T) { headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) + assert.Equal(t, chain.TopBlock.Hash(), headBlockHash) // Point forkChoice to the head forkChoiceMessage := engineapi.ForkChoiceMessage{ @@ -617,6 +618,8 @@ func TestPoSDownloader(t *testing.T) { require.NoError(t, err) stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) + payloadStatus = m.ReceivePayloadStatus() + assert.Equal(t, remote.EngineStatus_VALID, payloadStatus.Status) assert.Equal(t, chain.TopBlock.Hash(), headBlockHash) }