diff --git a/cmd/rpcdaemon/commands/eth_subscribe_test.go b/cmd/rpcdaemon/commands/eth_subscribe_test.go index b7c8531e3d4..24f38220c45 100644 --- a/cmd/rpcdaemon/commands/eth_subscribe_test.go +++ b/cmd/rpcdaemon/commands/eth_subscribe_test.go @@ -40,7 +40,7 @@ func TestEthSubscribe(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed ctx := context.Background() - backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false) + backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false) backendClient := direct.NewEthBackendClientDirect(backendServer) backend := rpcservices.NewRemoteBackend(backendClient, m.DB, snapshotsync.NewBlockReader()) ff := rpchelper.New(ctx, backend, nil, nil, func() {}) diff --git a/cmd/rpcdaemon/rpcdaemontest/test_util.go b/cmd/rpcdaemon/rpcdaemontest/test_util.go index ee53c28a891..f54a8429b29 100644 --- a/cmd/rpcdaemon/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon/rpcdaemontest/test_util.go @@ -292,7 +292,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{}) diff --git a/cmd/rpcdaemon22/rpcdaemontest/test_util.go b/cmd/rpcdaemon22/rpcdaemontest/test_util.go index ad73c3faad4..a2d6c1eb143 100644 --- a/cmd/rpcdaemon22/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon22/rpcdaemontest/test_util.go @@ -293,7 +293,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{}) diff --git a/eth/backend.go b/eth/backend.go index 293f3989309..89cd9332846 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -411,8 +411,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere // Initialize ethbackend ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, - blockReader, chainConfig, backend.sentriesClient.Hd.BeaconRequestList, backend.sentriesClient.Hd.PayloadStatusCh, - assembleBlockPOS, config.Miner.EnabledPOS) + blockReader, chainConfig, assembleBlockPOS, backend.sentriesClient.Hd, config.Miner.EnabledPOS) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) if stack.Config().PrivateApiAddr != "" { diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e8a4b7f863b..8e96b5b9bf6 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -180,7 +180,7 @@ func HeadersPOS( cfg.hd.ClearPendingPayloadHash() cfg.hd.SetPendingPayloadStatus(nil) - var payloadStatus *privateapi.PayloadStatus + var payloadStatus *engineapi.PayloadStatus if forkChoiceInsteadOfNewPayload { payloadStatus, err = startHandlingForkChoice(forkChoiceMessage, requestStatus, requestId, s, u, ctx, tx, cfg, headerInserter) } else { @@ -190,7 +190,7 @@ func HeadersPOS( if err != nil { if requestStatus == engineapi.New { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err} } return err } @@ -257,7 +257,7 @@ func startHandlingForkChoice( tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter, -) (*privateapi.PayloadStatus, error) { +) (*engineapi.PayloadStatus, error) { if cfg.memoryOverlay { defer cfg.forkValidator.ClearWithUnwind(tx, cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer) } @@ -274,27 +274,17 @@ func startHandlingForkChoice( return nil, err } if canonical { - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ Status: remote.EngineStatus_VALID, LatestValidHash: currentHeadHash, }, nil } else { - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ CriticalError: &privateapi.InvalidForkchoiceStateErr, }, nil } } - bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash) - if bad { - log.Warn(fmt.Sprintf("[%s] Fork choice bad head block", s.LogPrefix()), "headerHash", headerHash) - cfg.hd.BeaconRequestList.Remove(requestId) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: lastValidHash, - }, nil - } - // Header itself may already be in the snapshots, if CL starts off at much earlier state than Erigon header, err := cfg.blockReader.HeaderByHash(ctx, tx, headerHash) if err != nil { @@ -307,33 +297,12 @@ func startHandlingForkChoice( log.Info(fmt.Sprintf("[%s] Fork choice missing header with hash %x", s.LogPrefix(), headerHash)) cfg.hd.SetPoSDownloaderTip(headerHash) schedulePoSDownload(requestId, headerHash, 0 /* header height is unknown, setting to 0 */, s, cfg) - return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } cfg.hd.BeaconRequestList.Remove(requestId) headerNumber := header.Number.Uint64() - // If header is canonical, then no reorgs are required - canonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber) - if err != nil { - log.Warn(fmt.Sprintf("[%s] Fork choice err (reading canonical hash of %d)", s.LogPrefix(), headerNumber), "err", err) - cfg.hd.BeaconRequestList.Remove(requestId) - return nil, err - } - - if headerHash == canonicalHash { - log.Info(fmt.Sprintf("[%s] Fork choice on previously known block", s.LogPrefix())) - cfg.hd.BeaconRequestList.Remove(requestId) - // Per the Engine API spec: - // Client software MAY skip an update of the forkchoice state and MUST NOT begin a payload build process - // if forkchoiceState.headBlockHash references an ancestor of the head of canonical chain. - // In the case of such an event, client software MUST return - // {payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash, validationError: null}, payloadId: null}. - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_VALID, - LatestValidHash: headerHash, - }, nil - } if cfg.memoryOverlay && headerHash == cfg.forkValidator.ExtendingForkHeadHash() { log.Info("Flushing in-memory state") @@ -350,7 +319,7 @@ func startHandlingForkChoice( cfg.hd.SetPendingPayloadHash(headerHash) return nil, nil } else { - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ CriticalError: &privateapi.InvalidForkchoiceStateErr, }, nil } @@ -369,7 +338,7 @@ func startHandlingForkChoice( // TODO(yperbasis): what if some bodies are missing and we have to download them? cfg.hd.SetPendingPayloadHash(headerHash) } else { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} } } @@ -418,7 +387,7 @@ func finishHandlingForkChoice( if !canonical { if cfg.hd.GetPendingPayloadHash() != (common.Hash{}) { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{ CriticalError: &privateapi.InvalidForkchoiceStateErr, } } @@ -438,7 +407,7 @@ func handleNewPayload( tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter, -) (*privateapi.PayloadStatus, error) { +) (*engineapi.PayloadStatus, error) { header := block.Header() headerNumber := header.Number.Uint64() headerHash := block.Hash() @@ -446,40 +415,6 @@ func handleNewPayload( log.Debug(fmt.Sprintf("[%s] Handling new payload", s.LogPrefix()), "height", headerNumber, "hash", headerHash) cfg.hd.UpdateTopSeenHeightPoS(headerNumber) - existingCanonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber) - if err != nil { - log.Warn(fmt.Sprintf("[%s] New payload err", s.LogPrefix()), "err", err) - cfg.hd.BeaconRequestList.Remove(requestId) - return nil, err - } - - if existingCanonicalHash != (common.Hash{}) && headerHash == existingCanonicalHash { - log.Info(fmt.Sprintf("[%s] New payload: previously received valid header %d", s.LogPrefix(), headerNumber)) - cfg.hd.BeaconRequestList.Remove(requestId) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_VALID, - LatestValidHash: headerHash, - }, nil - } - - bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash) - if bad { - log.Warn(fmt.Sprintf("[%s] Previously known bad block", s.LogPrefix()), "height", headerNumber, "hash", headerHash) - } else { - bad, lastValidHash = cfg.hd.IsBadHeaderPoS(header.ParentHash) - if bad { - log.Warn(fmt.Sprintf("[%s] Previously known bad parent", s.LogPrefix()), "height", headerNumber, "hash", headerHash, "parentHash", header.ParentHash) - } - } - if bad { - cfg.hd.BeaconRequestList.Remove(requestId) - cfg.hd.ReportBadHeaderPoS(headerHash, lastValidHash) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: lastValidHash, - }, nil - } - parent, err := cfg.blockReader.HeaderByHash(ctx, tx, header.ParentHash) if err != nil { return nil, err @@ -488,18 +423,7 @@ func handleNewPayload( log.Info(fmt.Sprintf("[%s] New payload missing parent", s.LogPrefix())) cfg.hd.SetPoSDownloaderTip(headerHash) schedulePoSDownload(requestId, header.ParentHash, headerNumber-1, s, cfg) - return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil - } - - if headerNumber != parent.Number.Uint64()+1 { - log.Warn(fmt.Sprintf("[%s] Invalid block number", s.LogPrefix()), "headerNumber", headerNumber, "parentNumber", parent.Number.Uint64()) - cfg.hd.BeaconRequestList.Remove(requestId) - cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: header.ParentHash, - ValidationError: errors.New("invalid block number"), - }, nil + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } cfg.hd.BeaconRequestList.Remove(requestId) @@ -526,7 +450,7 @@ func verifyAndSaveNewPoSHeader( cfg HeadersCfg, block *types.Block, headerInserter *headerdownload.HeaderInserter, -) (response *privateapi.PayloadStatus, success bool, err error) { +) (response *engineapi.PayloadStatus, success bool, err error) { header := block.Header() headerNumber := header.Number.Uint64() headerHash := block.Hash() @@ -534,7 +458,7 @@ func verifyAndSaveNewPoSHeader( if verificationErr := cfg.hd.VerifyHeader(header); verificationErr != nil { log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "err", verificationErr) cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash) - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ Status: remote.EngineStatus_INVALID, LatestValidHash: header.ParentHash, ValidationError: verificationErr, @@ -565,7 +489,7 @@ func verifyAndSaveNewPoSHeader( } else if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil { return nil, false, err } - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ Status: status, LatestValidHash: latestValidHash, ValidationError: validationError, @@ -578,7 +502,7 @@ func verifyAndSaveNewPoSHeader( if !canExtendCanonical { log.Info("Side chain", "parentHash", header.ParentHash, "currentHead", currentHeadHash) - return &privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil + return &engineapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil } // OK, we're on the canonical chain @@ -725,7 +649,7 @@ func forkingPoint( func handleInterrupt(interrupt engineapi.Interrupt, cfg HeadersCfg, tx kv.RwTx, headerInserter *headerdownload.HeaderInserter, useExternalTx bool) (bool, error) { if interrupt != engineapi.None { if interrupt == engineapi.Stopping { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: errors.New("server is stopping")} + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: errors.New("server is stopping")} } if interrupt == engineapi.Synced { verifyAndSaveDownloadedPoSHeaders(tx, cfg, headerInserter) diff --git a/ethdb/privateapi/engine_test.go b/ethdb/privateapi/engine_test.go index 42903819ad9..74b0eab1efa 100644 --- a/ethdb/privateapi/engine_test.go +++ b/ethdb/privateapi/engine_test.go @@ -13,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/engineapi" + "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" "github.com/stretchr/testify/require" ) @@ -89,11 +90,10 @@ func TestMockDownloadRequest(t *testing.T) { require := require.New(t) makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false) var err error var reply *remote.EnginePayloadStatus @@ -104,8 +104,8 @@ func TestMockDownloadRequest(t *testing.T) { done <- true }() - beaconRequestList.WaitForRequest(true) - statusCh <- PayloadStatus{Status: remote.EngineStatus_SYNCING} + hd.BeaconRequestList.WaitForRequest(true) + hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} <-done require.NoError(err) require.Equal(reply.Status, remote.EngineStatus_SYNCING) @@ -148,11 +148,10 @@ func TestMockValidExecution(t *testing.T) { makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false) var err error var reply *remote.EnginePayloadStatus @@ -163,9 +162,9 @@ func TestMockValidExecution(t *testing.T) { done <- true }() - beaconRequestList.WaitForRequest(true) + hd.BeaconRequestList.WaitForRequest(true) - statusCh <- PayloadStatus{ + hd.PayloadStatusCh <- engineapi.PayloadStatus{ Status: remote.EngineStatus_VALID, LatestValidHash: payload3Hash, } @@ -184,11 +183,10 @@ func TestMockInvalidExecution(t *testing.T) { makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false) var err error var reply *remote.EnginePayloadStatus @@ -199,9 +197,9 @@ func TestMockInvalidExecution(t *testing.T) { done <- true }() - beaconRequestList.WaitForRequest(true) + hd.BeaconRequestList.WaitForRequest(true) // Simulate invalid status - statusCh <- PayloadStatus{ + hd.PayloadStatusCh <- engineapi.PayloadStatus{ Status: remote.EngineStatus_INVALID, LatestValidHash: startingHeadHash, } @@ -220,11 +218,10 @@ func TestNoTTD(t *testing.T) { makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, nil, hd, false) var err error diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index f4abc963acf..6e1c22b2dfd 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -24,6 +24,7 @@ import ( "github.com/ledgerwatch/erigon/turbo/builder" "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" "google.golang.org/protobuf/types/known/emptypb" @@ -55,14 +56,12 @@ type EthBackendServer struct { // Block proposing for proof-of-stake payloadId uint64 builders map[uint64]*builder.BlockBuilder - // Send Beacon Chain requests to staged sync - requestList *engineapi.RequestList - // Replies to newPayload & forkchoice requests - statusCh <-chan PayloadStatus + builderFunc builder.BlockBuilderFunc proposing bool lock sync.Mutex // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time logsFilter *LogsFilterAggregator + hd *headerdownload.HeaderDownload } type EthBackend interface { @@ -73,23 +72,12 @@ type EthBackend interface { Peers(ctx context.Context) (*remote.PeersReply, error) } -// This is the status of a newly execute block. -// Hash: Block hash -// Status: block's status -type PayloadStatus struct { - Status remote.EngineStatus - LatestValidHash common.Hash - ValidationError error - CriticalError error -} - func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *Events, blockReader services.BlockAndTxnReader, - config *params.ChainConfig, requestList *engineapi.RequestList, statusCh <-chan PayloadStatus, - builderFunc builder.BlockBuilderFunc, proposing bool, + config *params.ChainConfig, builderFunc builder.BlockBuilderFunc, hd *headerdownload.HeaderDownload, proposing bool, ) *EthBackendServer { s := &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config, - requestList: requestList, statusCh: statusCh, builders: make(map[uint64]*builder.BlockBuilder), - builderFunc: builderFunc, proposing: proposing, logsFilter: NewLogsFilterAggregator(events), + builders: make(map[uint64]*builder.BlockBuilder), + builderFunc: builderFunc, proposing: proposing, logsFilter: NewLogsFilterAggregator(events), hd: hd, } ch, clean := s.events.AddLogsSubscription() @@ -244,7 +232,7 @@ func (s *EthBackendServer) Block(ctx context.Context, req *remote.BlockRequest) return &remote.BlockReply{BlockRlp: blockRlp, Senders: sendersBytes}, nil } -func convertPayloadStatus(payloadStatus *PayloadStatus) *remote.EnginePayloadStatus { +func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.EnginePayloadStatus { reply := remote.EnginePayloadStatus{Status: payloadStatus.Status} if payloadStatus.LatestValidHash != (common.Hash{}) { reply.LatestValidHash = gointerfaces.ConvertHashToH256(payloadStatus.LatestValidHash) @@ -257,7 +245,7 @@ func convertPayloadStatus(payloadStatus *PayloadStatus) *remote.EnginePayloadSta func (s *EthBackendServer) stageLoopIsBusy() bool { for i := 0; i < 20; i++ { - if !s.requestList.IsWaiting() { + if !s.hd.BeaconRequestList.IsWaiting() { // This might happen, for example, in the following scenario: // 1) CL sends NewPayload and immediately after that ForkChoiceUpdated. // 2) We happily process NewPayload and stage loop is at the end. @@ -269,7 +257,7 @@ func (s *EthBackendServer) stageLoopIsBusy() bool { time.Sleep(5 * time.Millisecond) } } - return !s.requestList.IsWaiting() + return !s.hd.BeaconRequestList.IsWaiting() } // EngineNewPayloadV1 validates and possibly executes payload @@ -344,12 +332,21 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E if err != nil { return nil, err } + + tx.Rollback() + if parentTd != nil && parentTd.Cmp(s.config.TerminalTotalDifficulty) < 0 { log.Warn("[NewPayload] TTD not reached yet", "height", header.Number, "hash", common.Hash(blockHash)) return &remote.EnginePayloadStatus{Status: remote.EngineStatus_INVALID, LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{})}, nil } - tx.Rollback() + possibleStatus, err := s.getPayloadStatusFromHashIfPossible(blockHash, req.BlockNumber, header.ParentHash, true) + if err != nil { + return nil, err + } + if possibleStatus != nil { + return convertPayloadStatus(possibleStatus), nil + } // If another payload is already commissioned then we just reply with syncing if s.stageLoopIsBusy() { // We are still syncing a commissioned payload @@ -360,17 +357,13 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E log.Debug("[NewPayload] stage loop is busy") return &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } - - // Lock the thread (We modify shared resources). - log.Debug("[NewPayload] acquiring lock") s.lock.Lock() defer s.lock.Unlock() - log.Debug("[NewPayload] lock acquired") log.Debug("[NewPayload] sending block", "height", header.Number, "hash", common.Hash(blockHash)) - s.requestList.AddPayloadRequest(block) + s.hd.BeaconRequestList.AddPayloadRequest(block) - payloadStatus := <-s.statusCh + payloadStatus := <-s.hd.PayloadStatusCh log.Debug("[NewPayload] got reply", "payloadStatus", payloadStatus) if payloadStatus.CriticalError != nil { @@ -380,6 +373,100 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E return convertPayloadStatus(&payloadStatus), nil } +// Check if we can make out a status from the payload hash/head hash. +func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.Hash, blockNumber uint64, parentHash common.Hash, newPayload bool) (*engineapi.PayloadStatus, error) { + if s.hd == nil { + return nil, nil + } + var prefix string + if newPayload { + prefix = "NewPayload" + } else { + prefix = "ForkChoiceUpdated" + } + tx, err := s.db.BeginRo(s.ctx) + if err != nil { + return nil, err + } + defer tx.Rollback() + + header, err := rawdb.ReadHeaderByHash(tx, blockHash) + if err != nil { + return nil, err + } + var parent *types.Header + if newPayload { + parent, err = rawdb.ReadHeaderByHash(tx, parentHash) + } + if err != nil { + return nil, err + } + + var canonicalHash common.Hash + if header != nil { + canonicalHash, err = rawdb.ReadCanonicalHash(tx, header.Number.Uint64()) + } + if err != nil { + return nil, err + } + + if newPayload && parent != nil && blockNumber != parent.Number.Uint64()+1 { + log.Warn(fmt.Sprintf("[%s] Invalid block number", prefix), "headerNumber", blockNumber, "parentNumber", parent.Number.Uint64()) + s.hd.ReportBadHeaderPoS(blockHash, parent.Hash()) + return &engineapi.PayloadStatus{ + Status: remote.EngineStatus_INVALID, + LatestValidHash: parent.Hash(), + ValidationError: errors.New("invalid block number"), + }, nil + } + // Check if we already determined if the hash is attributed to a previously received invalid header. + bad, lastValidHash := s.hd.IsBadHeaderPoS(blockHash) + if bad { + log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash) + } else if newPayload { + bad, lastValidHash = s.hd.IsBadHeaderPoS(parentHash) + if bad { + log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash, "parentHash", parentHash) + } + } + if bad { + s.hd.ReportBadHeaderPoS(blockHash, lastValidHash) + return &engineapi.PayloadStatus{Status: remote.EngineStatus_INVALID, LatestValidHash: lastValidHash}, nil + } + + // If header is already validated or has a missing parent, you can either return VALID or SYNCING. + if newPayload { + if header != nil && canonicalHash == blockHash { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil + } + + if parent == nil && s.hd.PosStatus() == headerdownload.Syncing { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil + } + + return nil, nil + } + + if header == nil { + if s.hd.PosStatus() == headerdownload.Syncing { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil + + } + return nil, nil + } + + headHash := rawdb.ReadHeadBlockHash(tx) + if err != nil { + return nil, err + } + + if blockHash != headHash && canonicalHash == blockHash { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil + } + + return nil, nil +} + // EngineGetPayloadV1 retrieves previously assembled payload (Validators only) func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.EngineGetPayloadRequest) (*types2.ExecutionPayload, error) { if !s.proposing { @@ -451,6 +538,7 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r return nil, err } defer tx1.Rollback() + td, err := rawdb.ReadTdByHash(tx1, forkChoice.HeadBlockHash) tx1.Rollback() if err != nil { @@ -463,31 +551,38 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r }, nil } - if s.stageLoopIsBusy() { - log.Debug("[ForkChoiceUpdated] stage loop is busy") - return &remote.EngineForkChoiceUpdatedReply{ - PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, - }, nil + status, err := s.getPayloadStatusFromHashIfPossible(forkChoice.HeadBlockHash, 0, common.Hash{}, false) + if err != nil { + return nil, err } + if status == nil { + if s.stageLoopIsBusy() { + log.Debug("[ForkChoiceUpdated] stage loop is busy") + return &remote.EngineForkChoiceUpdatedReply{ + PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, + }, nil + } + s.lock.Lock() + defer s.lock.Unlock() - log.Debug("[ForkChoiceUpdated] acquiring lock") - s.lock.Lock() - defer s.lock.Unlock() - log.Debug("[ForkChoiceUpdated] lock acquired") - - log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash) - s.requestList.AddForkChoiceRequest(&forkChoice) + log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash) + s.hd.BeaconRequestList.AddForkChoiceRequest(&forkChoice) - status := <-s.statusCh - log.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status) + statusRef := <-s.hd.PayloadStatusCh + status = &statusRef + log.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status) - if status.CriticalError != nil { - return nil, status.CriticalError + if status.CriticalError != nil { + return nil, status.CriticalError + } + } else { + s.lock.Lock() + defer s.lock.Unlock() } // No need for payload building if req.PayloadAttributes == nil || status.Status != remote.EngineStatus_VALID { - return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(&status)}, nil + return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(status)}, nil } if !s.proposing { @@ -514,7 +609,7 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r log.Warn("Skipping payload building because forkchoiceState.headBlockHash is not the head of the canonical chain", "forkChoice.HeadBlockHash", forkChoice.HeadBlockHash, "headHeader.Hash", headHeader.Hash()) - return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(&status)}, nil + return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(status)}, nil } if headHeader.Time >= req.PayloadAttributes.Timestamp { diff --git a/turbo/engineapi/request_list.go b/turbo/engineapi/request_list.go index 11a2bc0ba13..455a38825c0 100644 --- a/turbo/engineapi/request_list.go +++ b/turbo/engineapi/request_list.go @@ -6,10 +6,21 @@ import ( "github.com/emirpasic/gods/maps/treemap" + "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/types" ) +// This is the status of a newly execute block. +// Hash: Block hash +// Status: block's status +type PayloadStatus struct { + Status remote.EngineStatus + LatestValidHash common.Hash + ValidationError error + CriticalError error +} + // The message we are going to send to the stage sync in ForkchoiceUpdated type ForkChoiceMessage struct { HeadBlockHash common.Hash diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 1f9d9f83808..53cea0779bc 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -26,7 +26,6 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/engineapi" @@ -1134,13 +1133,13 @@ func (hd *HeaderDownload) ClearPendingPayloadHash() { hd.pendingPayloadHash = common.Hash{} } -func (hd *HeaderDownload) GetPendingPayloadStatus() *privateapi.PayloadStatus { +func (hd *HeaderDownload) GetPendingPayloadStatus() *engineapi.PayloadStatus { hd.lock.RLock() defer hd.lock.RUnlock() return hd.pendingPayloadStatus } -func (hd *HeaderDownload) SetPendingPayloadStatus(response *privateapi.PayloadStatus) { +func (hd *HeaderDownload) SetPendingPayloadStatus(response *engineapi.PayloadStatus) { hd.lock.Lock() defer hd.lock.Unlock() hd.pendingPayloadStatus = response diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index 1a097fcf28b..a858f1554fa 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -12,7 +12,6 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/services" @@ -305,16 +304,16 @@ type HeaderDownload struct { requestId int posAnchor *Anchor posStatus SyncStatus - posSync bool // Whether the chain is syncing in the PoS mode - headersCollector *etl.Collector // ETL collector for headers - BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync - PayloadStatusCh chan privateapi.PayloadStatus // Responses (validation/execution status) - pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh - pendingPayloadStatus *privateapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh - unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind - unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash - posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w - badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor + posSync bool // Whether the chain is syncing in the PoS mode + headersCollector *etl.Collector // ETL collector for headers + BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync + PayloadStatusCh chan engineapi.PayloadStatus // Responses (validation/execution status) + pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh + pendingPayloadStatus *engineapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh + unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind + unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash + posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w + badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor } // HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header @@ -343,7 +342,7 @@ func NewHeaderDownload( DeliveryNotify: make(chan struct{}, 1), QuitPoWMining: make(chan struct{}), BeaconRequestList: engineapi.NewRequestList(), - PayloadStatusCh: make(chan privateapi.PayloadStatus, 1), + PayloadStatusCh: make(chan engineapi.PayloadStatus, 1), headerReader: headerReader, badPoSHeaders: make(map[common.Hash]common.Hash), } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 774e98fab39..134bdc48ff2 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -530,7 +530,7 @@ func (ms *MockSentry) SendForkChoiceRequest(message *engineapi.ForkChoiceMessage ms.sentriesClient.Hd.BeaconRequestList.AddForkChoiceRequest(message) } -func (ms *MockSentry) ReceivePayloadStatus() privateapi.PayloadStatus { +func (ms *MockSentry) ReceivePayloadStatus() engineapi.PayloadStatus { return <-ms.sentriesClient.Hd.PayloadStatusCh } diff --git a/turbo/stages/sentry_mock_test.go b/turbo/stages/sentry_mock_test.go index bd8e552c1be..6e72276dea8 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/turbo/stages/sentry_mock_test.go @@ -669,11 +669,10 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) { FinalizedBlockHash: invalidTip.Hash(), } m.SendForkChoiceRequest(&forkChoiceMessage) - headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) + _, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) require.NoError(t, err) - stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) - payloadStatus2 := m.ReceivePayloadStatus() - require.Equal(t, remote.EngineStatus_INVALID, payloadStatus2.Status) - assert.Equal(t, lastValidHeader.Hash(), payloadStatus2.LatestValidHash) + bad, lastValidHash := m.HeaderDownload().IsBadHeaderPoS(invalidTip.Hash()) + assert.True(t, bad) + assert.Equal(t, lastValidHash, lastValidHeader.Hash()) } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 5280dc8b7e3..e2552990af9 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -23,7 +23,6 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/p2p" "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/services" @@ -35,13 +34,13 @@ import ( func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.Hash, err error) { if pendingPayloadStatus := hd.GetPendingPayloadStatus(); pendingPayloadStatus != nil { if err != nil { - hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err} } else { hd.PayloadStatusCh <- *pendingPayloadStatus } } else if pendingPayloadHash := hd.GetPendingPayloadHash(); pendingPayloadHash != (common.Hash{}) { if err != nil { - hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err} } else { var status remote.EngineStatus if headBlockHash == pendingPayloadHash { @@ -50,7 +49,7 @@ func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.H log.Warn("Failed to execute pending payload", "pendingPayload", pendingPayloadHash, "headBlock", headBlockHash) status = remote.EngineStatus_INVALID } - hd.PayloadStatusCh <- privateapi.PayloadStatus{ + hd.PayloadStatusCh <- engineapi.PayloadStatus{ Status: status, LatestValidHash: headBlockHash, }