From 75923b91419d7848cdf8238e5e28e15ead824588 Mon Sep 17 00:00:00 2001 From: giuliorebuffo Date: Fri, 22 Jul 2022 23:01:30 +0200 Subject: [PATCH 1/5] avoid constantly triggering stageloop when using Engine API --- cmd/rpcdaemon/rpcdaemontest/test_util.go | 2 +- eth/backend.go | 2 +- eth/stagedsync/stage_headers.go | 108 ++--------- ethdb/privateapi/ethbackend.go | 172 +++++++++++++----- turbo/engineapi/request_list.go | 11 ++ turbo/stages/headerdownload/header_algos.go | 5 +- .../headerdownload/header_data_struct.go | 23 ++- turbo/stages/mock_sentry.go | 2 +- turbo/stages/stageloop.go | 7 +- 9 files changed, 176 insertions(+), 156 deletions(-) diff --git a/cmd/rpcdaemon/rpcdaemontest/test_util.go b/cmd/rpcdaemon/rpcdaemontest/test_util.go index ee53c28a891..56e184cc2af 100644 --- a/cmd/rpcdaemon/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon/rpcdaemontest/test_util.go @@ -292,7 +292,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{}) diff --git a/eth/backend.go b/eth/backend.go index 293f3989309..02a6207a361 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -412,7 +412,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere // Initialize ethbackend ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, blockReader, chainConfig, backend.sentriesClient.Hd.BeaconRequestList, backend.sentriesClient.Hd.PayloadStatusCh, - assembleBlockPOS, config.Miner.EnabledPOS) + assembleBlockPOS, backend.sentriesClient.Hd, config.Miner.EnabledPOS) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) if stack.Config().PrivateApiAddr != "" { diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e8a4b7f863b..8e96b5b9bf6 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -180,7 +180,7 @@ func HeadersPOS( cfg.hd.ClearPendingPayloadHash() cfg.hd.SetPendingPayloadStatus(nil) - var payloadStatus *privateapi.PayloadStatus + var payloadStatus *engineapi.PayloadStatus if forkChoiceInsteadOfNewPayload { payloadStatus, err = startHandlingForkChoice(forkChoiceMessage, requestStatus, requestId, s, u, ctx, tx, cfg, headerInserter) } else { @@ -190,7 +190,7 @@ func HeadersPOS( if err != nil { if requestStatus == engineapi.New { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err} } return err } @@ -257,7 +257,7 @@ func startHandlingForkChoice( tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter, -) (*privateapi.PayloadStatus, error) { +) (*engineapi.PayloadStatus, error) { if cfg.memoryOverlay { defer cfg.forkValidator.ClearWithUnwind(tx, cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer) } @@ -274,27 +274,17 @@ func startHandlingForkChoice( return nil, err } if canonical { - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ Status: remote.EngineStatus_VALID, LatestValidHash: currentHeadHash, }, nil } else { - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ CriticalError: &privateapi.InvalidForkchoiceStateErr, }, nil } } - bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash) - if bad { - log.Warn(fmt.Sprintf("[%s] Fork choice bad head block", s.LogPrefix()), "headerHash", headerHash) - cfg.hd.BeaconRequestList.Remove(requestId) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: lastValidHash, - }, nil - } - // Header itself may already be in the snapshots, if CL starts off at much earlier state than Erigon header, err := cfg.blockReader.HeaderByHash(ctx, tx, headerHash) if err != nil { @@ -307,33 +297,12 @@ func startHandlingForkChoice( log.Info(fmt.Sprintf("[%s] Fork choice missing header with hash %x", s.LogPrefix(), headerHash)) cfg.hd.SetPoSDownloaderTip(headerHash) schedulePoSDownload(requestId, headerHash, 0 /* header height is unknown, setting to 0 */, s, cfg) - return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } cfg.hd.BeaconRequestList.Remove(requestId) headerNumber := header.Number.Uint64() - // If header is canonical, then no reorgs are required - canonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber) - if err != nil { - log.Warn(fmt.Sprintf("[%s] Fork choice err (reading canonical hash of %d)", s.LogPrefix(), headerNumber), "err", err) - cfg.hd.BeaconRequestList.Remove(requestId) - return nil, err - } - - if headerHash == canonicalHash { - log.Info(fmt.Sprintf("[%s] Fork choice on previously known block", s.LogPrefix())) - cfg.hd.BeaconRequestList.Remove(requestId) - // Per the Engine API spec: - // Client software MAY skip an update of the forkchoice state and MUST NOT begin a payload build process - // if forkchoiceState.headBlockHash references an ancestor of the head of canonical chain. - // In the case of such an event, client software MUST return - // {payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash, validationError: null}, payloadId: null}. - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_VALID, - LatestValidHash: headerHash, - }, nil - } if cfg.memoryOverlay && headerHash == cfg.forkValidator.ExtendingForkHeadHash() { log.Info("Flushing in-memory state") @@ -350,7 +319,7 @@ func startHandlingForkChoice( cfg.hd.SetPendingPayloadHash(headerHash) return nil, nil } else { - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ CriticalError: &privateapi.InvalidForkchoiceStateErr, }, nil } @@ -369,7 +338,7 @@ func startHandlingForkChoice( // TODO(yperbasis): what if some bodies are missing and we have to download them? cfg.hd.SetPendingPayloadHash(headerHash) } else { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} } } @@ -418,7 +387,7 @@ func finishHandlingForkChoice( if !canonical { if cfg.hd.GetPendingPayloadHash() != (common.Hash{}) { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{ CriticalError: &privateapi.InvalidForkchoiceStateErr, } } @@ -438,7 +407,7 @@ func handleNewPayload( tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter, -) (*privateapi.PayloadStatus, error) { +) (*engineapi.PayloadStatus, error) { header := block.Header() headerNumber := header.Number.Uint64() headerHash := block.Hash() @@ -446,40 +415,6 @@ func handleNewPayload( log.Debug(fmt.Sprintf("[%s] Handling new payload", s.LogPrefix()), "height", headerNumber, "hash", headerHash) cfg.hd.UpdateTopSeenHeightPoS(headerNumber) - existingCanonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber) - if err != nil { - log.Warn(fmt.Sprintf("[%s] New payload err", s.LogPrefix()), "err", err) - cfg.hd.BeaconRequestList.Remove(requestId) - return nil, err - } - - if existingCanonicalHash != (common.Hash{}) && headerHash == existingCanonicalHash { - log.Info(fmt.Sprintf("[%s] New payload: previously received valid header %d", s.LogPrefix(), headerNumber)) - cfg.hd.BeaconRequestList.Remove(requestId) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_VALID, - LatestValidHash: headerHash, - }, nil - } - - bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash) - if bad { - log.Warn(fmt.Sprintf("[%s] Previously known bad block", s.LogPrefix()), "height", headerNumber, "hash", headerHash) - } else { - bad, lastValidHash = cfg.hd.IsBadHeaderPoS(header.ParentHash) - if bad { - log.Warn(fmt.Sprintf("[%s] Previously known bad parent", s.LogPrefix()), "height", headerNumber, "hash", headerHash, "parentHash", header.ParentHash) - } - } - if bad { - cfg.hd.BeaconRequestList.Remove(requestId) - cfg.hd.ReportBadHeaderPoS(headerHash, lastValidHash) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: lastValidHash, - }, nil - } - parent, err := cfg.blockReader.HeaderByHash(ctx, tx, header.ParentHash) if err != nil { return nil, err @@ -488,18 +423,7 @@ func handleNewPayload( log.Info(fmt.Sprintf("[%s] New payload missing parent", s.LogPrefix())) cfg.hd.SetPoSDownloaderTip(headerHash) schedulePoSDownload(requestId, header.ParentHash, headerNumber-1, s, cfg) - return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil - } - - if headerNumber != parent.Number.Uint64()+1 { - log.Warn(fmt.Sprintf("[%s] Invalid block number", s.LogPrefix()), "headerNumber", headerNumber, "parentNumber", parent.Number.Uint64()) - cfg.hd.BeaconRequestList.Remove(requestId) - cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash) - return &privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: header.ParentHash, - ValidationError: errors.New("invalid block number"), - }, nil + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } cfg.hd.BeaconRequestList.Remove(requestId) @@ -526,7 +450,7 @@ func verifyAndSaveNewPoSHeader( cfg HeadersCfg, block *types.Block, headerInserter *headerdownload.HeaderInserter, -) (response *privateapi.PayloadStatus, success bool, err error) { +) (response *engineapi.PayloadStatus, success bool, err error) { header := block.Header() headerNumber := header.Number.Uint64() headerHash := block.Hash() @@ -534,7 +458,7 @@ func verifyAndSaveNewPoSHeader( if verificationErr := cfg.hd.VerifyHeader(header); verificationErr != nil { log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "err", verificationErr) cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash) - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ Status: remote.EngineStatus_INVALID, LatestValidHash: header.ParentHash, ValidationError: verificationErr, @@ -565,7 +489,7 @@ func verifyAndSaveNewPoSHeader( } else if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil { return nil, false, err } - return &privateapi.PayloadStatus{ + return &engineapi.PayloadStatus{ Status: status, LatestValidHash: latestValidHash, ValidationError: validationError, @@ -578,7 +502,7 @@ func verifyAndSaveNewPoSHeader( if !canExtendCanonical { log.Info("Side chain", "parentHash", header.ParentHash, "currentHead", currentHeadHash) - return &privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil + return &engineapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil } // OK, we're on the canonical chain @@ -725,7 +649,7 @@ func forkingPoint( func handleInterrupt(interrupt engineapi.Interrupt, cfg HeadersCfg, tx kv.RwTx, headerInserter *headerdownload.HeaderInserter, useExternalTx bool) (bool, error) { if interrupt != engineapi.None { if interrupt == engineapi.Stopping { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: errors.New("server is stopping")} + cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: errors.New("server is stopping")} } if interrupt == engineapi.Synced { verifyAndSaveDownloadedPoSHeaders(tx, cfg, headerInserter) diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index f4abc963acf..84132ff86c6 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -24,6 +24,7 @@ import ( "github.com/ledgerwatch/erigon/turbo/builder" "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" "google.golang.org/protobuf/types/known/emptypb" @@ -58,11 +59,12 @@ type EthBackendServer struct { // Send Beacon Chain requests to staged sync requestList *engineapi.RequestList // Replies to newPayload & forkchoice requests - statusCh <-chan PayloadStatus + statusCh <-chan engineapi.PayloadStatus builderFunc builder.BlockBuilderFunc proposing bool lock sync.Mutex // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time logsFilter *LogsFilterAggregator + hd *headerdownload.HeaderDownload } type EthBackend interface { @@ -73,23 +75,13 @@ type EthBackend interface { Peers(ctx context.Context) (*remote.PeersReply, error) } -// This is the status of a newly execute block. -// Hash: Block hash -// Status: block's status -type PayloadStatus struct { - Status remote.EngineStatus - LatestValidHash common.Hash - ValidationError error - CriticalError error -} - func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *Events, blockReader services.BlockAndTxnReader, - config *params.ChainConfig, requestList *engineapi.RequestList, statusCh <-chan PayloadStatus, - builderFunc builder.BlockBuilderFunc, proposing bool, + config *params.ChainConfig, requestList *engineapi.RequestList, statusCh <-chan engineapi.PayloadStatus, + builderFunc builder.BlockBuilderFunc, hd *headerdownload.HeaderDownload, proposing bool, ) *EthBackendServer { s := &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config, requestList: requestList, statusCh: statusCh, builders: make(map[uint64]*builder.BlockBuilder), - builderFunc: builderFunc, proposing: proposing, logsFilter: NewLogsFilterAggregator(events), + builderFunc: builderFunc, proposing: proposing, logsFilter: NewLogsFilterAggregator(events), hd: hd, } ch, clean := s.events.AddLogsSubscription() @@ -244,7 +236,7 @@ func (s *EthBackendServer) Block(ctx context.Context, req *remote.BlockRequest) return &remote.BlockReply{BlockRlp: blockRlp, Senders: sendersBytes}, nil } -func convertPayloadStatus(payloadStatus *PayloadStatus) *remote.EnginePayloadStatus { +func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.EnginePayloadStatus { reply := remote.EnginePayloadStatus{Status: payloadStatus.Status} if payloadStatus.LatestValidHash != (common.Hash{}) { reply.LatestValidHash = gointerfaces.ConvertHashToH256(payloadStatus.LatestValidHash) @@ -333,7 +325,8 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E }, nil } block := types.NewBlockFromStorage(blockHash, &header, transactions, nil) - + // Lock database access + s.lock.Lock() tx, err := s.db.BeginRo(ctx) if err != nil { return nil, err @@ -344,12 +337,22 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E if err != nil { return nil, err } + + tx.Rollback() + s.lock.Unlock() + if parentTd != nil && parentTd.Cmp(s.config.TerminalTotalDifficulty) < 0 { log.Warn("[NewPayload] TTD not reached yet", "height", header.Number, "hash", common.Hash(blockHash)) return &remote.EnginePayloadStatus{Status: remote.EngineStatus_INVALID, LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{})}, nil } - tx.Rollback() + possibleStatus, err := s.getPayloadStatusFromHashIfPossible(blockHash, req.BlockNumber, header.ParentHash, true) + if err != nil { + return nil, err + } + if possibleStatus != nil { + return convertPayloadStatus(possibleStatus), nil + } // If another payload is already commissioned then we just reply with syncing if s.stageLoopIsBusy() { // We are still syncing a commissioned payload @@ -361,12 +364,6 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E return &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } - // Lock the thread (We modify shared resources). - log.Debug("[NewPayload] acquiring lock") - s.lock.Lock() - defer s.lock.Unlock() - log.Debug("[NewPayload] lock acquired") - log.Debug("[NewPayload] sending block", "height", header.Number, "hash", common.Hash(blockHash)) s.requestList.AddPayloadRequest(block) @@ -380,6 +377,93 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E return convertPayloadStatus(&payloadStatus), nil } +// Check if we can make out a status from the payload hash/head hash. +func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.Hash, blockNumber uint64, parentHash common.Hash, newPayload bool) (*engineapi.PayloadStatus, error) { + s.lock.Lock() + defer s.lock.Unlock() + var prefix string + if newPayload { + prefix = "NewPayload" + } else { + prefix = "ForkChoiceUpdated" + } + tx, err := s.db.BeginRo(s.ctx) + if err != nil { + return nil, err + } + defer tx.Rollback() + + header, err := rawdb.ReadHeaderByHash(tx, blockHash) + if err != nil { + return nil, err + } + var parent *types.Header + if newPayload { + parent, err = rawdb.ReadHeaderByHash(tx, parentHash) + } else if header != nil { + parent, err = rawdb.ReadHeaderByHash(tx, header.ParentHash) + } + if err != nil { + return nil, err + } + + if newPayload && parent != nil && blockNumber != parent.Number.Uint64()+1 { + log.Warn(fmt.Sprintf("[%s] Invalid block number", prefix), "headerNumber", blockNumber, "parentNumber", parent.Number.Uint64()) + s.hd.ReportBadHeaderPoS(blockHash, parent.Hash()) + return &engineapi.PayloadStatus{ + Status: remote.EngineStatus_INVALID, + LatestValidHash: parent.Hash(), + ValidationError: errors.New("invalid block number"), + }, nil + } + // Check if we already determined if the hash is attributed to a previously received invalid header. + bad, lastValidHash := s.hd.IsBadHeaderPoS(blockHash) + if bad { + log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash) + } else if !newPayload { + bad, lastValidHash = s.hd.IsBadHeaderPoS(parentHash) + if bad { + log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash, "parentHash", parentHash) + } + } + if bad { + s.hd.ReportBadHeaderPoS(blockHash, lastValidHash) + return &engineapi.PayloadStatus{Status: remote.EngineStatus_INVALID, LatestValidHash: lastValidHash}, nil + } + + // If header is already validated or has a missing parent, you can either return VALID or SYNCING. + if newPayload { + if header != nil { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil + } + + if parent == nil && s.hd.PosStatus() == headerdownload.Syncing { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil + } + + return nil, nil + } + + if header == nil { + if s.hd.PosStatus() == headerdownload.Syncing { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil + + } + return nil, nil + } + + headHash := rawdb.ReadHeadBlockHash(tx) + canonicalHash, err := rawdb.ReadCanonicalHash(tx, header.Number.Uint64()) + if err != nil { + return nil, err + } + + if blockHash != headHash && canonicalHash == blockHash { + return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil + } + return nil, nil +} + // EngineGetPayloadV1 retrieves previously assembled payload (Validators only) func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.EngineGetPayloadRequest) (*types2.ExecutionPayload, error) { if !s.proposing { @@ -446,11 +530,14 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r FinalizedBlockHash: gointerfaces.ConvertH256ToHash(req.ForkchoiceState.FinalizedBlockHash), } + s.lock.Lock() tx1, err := s.db.BeginRo(ctx) if err != nil { return nil, err } defer tx1.Rollback() + s.lock.Unlock() + td, err := rawdb.ReadTdByHash(tx1, forkChoice.HeadBlockHash) tx1.Rollback() if err != nil { @@ -463,31 +550,32 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r }, nil } - if s.stageLoopIsBusy() { - log.Debug("[ForkChoiceUpdated] stage loop is busy") - return &remote.EngineForkChoiceUpdatedReply{ - PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, - }, nil + status, err := s.getPayloadStatusFromHashIfPossible(forkChoice.HeadBlockHash, 0, common.Hash{}, false) + if err != nil { + return nil, err } + if status == nil { + if s.stageLoopIsBusy() { + log.Debug("[ForkChoiceUpdated] stage loop is busy") + return &remote.EngineForkChoiceUpdatedReply{ + PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, + }, nil + } + log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash) + s.requestList.AddForkChoiceRequest(&forkChoice) - log.Debug("[ForkChoiceUpdated] acquiring lock") - s.lock.Lock() - defer s.lock.Unlock() - log.Debug("[ForkChoiceUpdated] lock acquired") - - log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash) - s.requestList.AddForkChoiceRequest(&forkChoice) - - status := <-s.statusCh - log.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status) + statusRef := <-s.statusCh + status = &statusRef + log.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status) - if status.CriticalError != nil { - return nil, status.CriticalError + if status.CriticalError != nil { + return nil, status.CriticalError + } } // No need for payload building if req.PayloadAttributes == nil || status.Status != remote.EngineStatus_VALID { - return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(&status)}, nil + return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(status)}, nil } if !s.proposing { @@ -514,7 +602,7 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r log.Warn("Skipping payload building because forkchoiceState.headBlockHash is not the head of the canonical chain", "forkChoice.HeadBlockHash", forkChoice.HeadBlockHash, "headHeader.Hash", headHeader.Hash()) - return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(&status)}, nil + return &remote.EngineForkChoiceUpdatedReply{PayloadStatus: convertPayloadStatus(status)}, nil } if headHeader.Time >= req.PayloadAttributes.Timestamp { diff --git a/turbo/engineapi/request_list.go b/turbo/engineapi/request_list.go index 11a2bc0ba13..455a38825c0 100644 --- a/turbo/engineapi/request_list.go +++ b/turbo/engineapi/request_list.go @@ -6,10 +6,21 @@ import ( "github.com/emirpasic/gods/maps/treemap" + "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/types" ) +// This is the status of a newly execute block. +// Hash: Block hash +// Status: block's status +type PayloadStatus struct { + Status remote.EngineStatus + LatestValidHash common.Hash + ValidationError error + CriticalError error +} + // The message we are going to send to the stage sync in ForkchoiceUpdated type ForkChoiceMessage struct { HeadBlockHash common.Hash diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 1f9d9f83808..53cea0779bc 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -26,7 +26,6 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/engineapi" @@ -1134,13 +1133,13 @@ func (hd *HeaderDownload) ClearPendingPayloadHash() { hd.pendingPayloadHash = common.Hash{} } -func (hd *HeaderDownload) GetPendingPayloadStatus() *privateapi.PayloadStatus { +func (hd *HeaderDownload) GetPendingPayloadStatus() *engineapi.PayloadStatus { hd.lock.RLock() defer hd.lock.RUnlock() return hd.pendingPayloadStatus } -func (hd *HeaderDownload) SetPendingPayloadStatus(response *privateapi.PayloadStatus) { +func (hd *HeaderDownload) SetPendingPayloadStatus(response *engineapi.PayloadStatus) { hd.lock.Lock() defer hd.lock.Unlock() hd.pendingPayloadStatus = response diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index 1a097fcf28b..a858f1554fa 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -12,7 +12,6 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/services" @@ -305,16 +304,16 @@ type HeaderDownload struct { requestId int posAnchor *Anchor posStatus SyncStatus - posSync bool // Whether the chain is syncing in the PoS mode - headersCollector *etl.Collector // ETL collector for headers - BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync - PayloadStatusCh chan privateapi.PayloadStatus // Responses (validation/execution status) - pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh - pendingPayloadStatus *privateapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh - unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind - unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash - posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w - badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor + posSync bool // Whether the chain is syncing in the PoS mode + headersCollector *etl.Collector // ETL collector for headers + BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync + PayloadStatusCh chan engineapi.PayloadStatus // Responses (validation/execution status) + pendingPayloadHash common.Hash // Header whose status we still should send to PayloadStatusCh + pendingPayloadStatus *engineapi.PayloadStatus // Alternatively, there can be an already prepared response to send to PayloadStatusCh + unsettledForkChoice *engineapi.ForkChoiceMessage // Forkchoice to process after unwind + unsettledHeadHeight uint64 // Height of unsettledForkChoice.headBlockHash + posDownloaderTip common.Hash // See https://hackmd.io/GDc0maGsQeKfP8o2C7L52w + badPoSHeaders map[common.Hash]common.Hash // Invalid Tip -> Last Valid Ancestor } // HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header @@ -343,7 +342,7 @@ func NewHeaderDownload( DeliveryNotify: make(chan struct{}, 1), QuitPoWMining: make(chan struct{}), BeaconRequestList: engineapi.NewRequestList(), - PayloadStatusCh: make(chan privateapi.PayloadStatus, 1), + PayloadStatusCh: make(chan engineapi.PayloadStatus, 1), headerReader: headerReader, badPoSHeaders: make(map[common.Hash]common.Hash), } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 774e98fab39..134bdc48ff2 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -530,7 +530,7 @@ func (ms *MockSentry) SendForkChoiceRequest(message *engineapi.ForkChoiceMessage ms.sentriesClient.Hd.BeaconRequestList.AddForkChoiceRequest(message) } -func (ms *MockSentry) ReceivePayloadStatus() privateapi.PayloadStatus { +func (ms *MockSentry) ReceivePayloadStatus() engineapi.PayloadStatus { return <-ms.sentriesClient.Hd.PayloadStatusCh } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 5280dc8b7e3..e2552990af9 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -23,7 +23,6 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/p2p" "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/services" @@ -35,13 +34,13 @@ import ( func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.Hash, err error) { if pendingPayloadStatus := hd.GetPendingPayloadStatus(); pendingPayloadStatus != nil { if err != nil { - hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err} } else { hd.PayloadStatusCh <- *pendingPayloadStatus } } else if pendingPayloadHash := hd.GetPendingPayloadHash(); pendingPayloadHash != (common.Hash{}) { if err != nil { - hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err} } else { var status remote.EngineStatus if headBlockHash == pendingPayloadHash { @@ -50,7 +49,7 @@ func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.H log.Warn("Failed to execute pending payload", "pendingPayload", pendingPayloadHash, "headBlock", headBlockHash) status = remote.EngineStatus_INVALID } - hd.PayloadStatusCh <- privateapi.PayloadStatus{ + hd.PayloadStatusCh <- engineapi.PayloadStatus{ Status: status, LatestValidHash: headBlockHash, } From 78e82ceac36f89dabebb5daf785a554e7b884989 Mon Sep 17 00:00:00 2001 From: giuliorebuffo Date: Sat, 23 Jul 2022 13:18:12 +0200 Subject: [PATCH 2/5] fix lint + test --- cmd/rpcdaemon/commands/eth_subscribe_test.go | 2 +- cmd/rpcdaemon22/rpcdaemontest/test_util.go | 2 +- ethdb/privateapi/engine_test.go | 22 ++++++++++---------- ethdb/privateapi/ethbackend.go | 4 ++++ turbo/stages/sentry_mock_test.go | 9 ++++---- 5 files changed, 21 insertions(+), 18 deletions(-) diff --git a/cmd/rpcdaemon/commands/eth_subscribe_test.go b/cmd/rpcdaemon/commands/eth_subscribe_test.go index b7c8531e3d4..f68ed5108f1 100644 --- a/cmd/rpcdaemon/commands/eth_subscribe_test.go +++ b/cmd/rpcdaemon/commands/eth_subscribe_test.go @@ -40,7 +40,7 @@ func TestEthSubscribe(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed ctx := context.Background() - backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false) + backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, false) backendClient := direct.NewEthBackendClientDirect(backendServer) backend := rpcservices.NewRemoteBackend(backendClient, m.DB, snapshotsync.NewBlockReader()) ff := rpchelper.New(ctx, backend, nil, nil, func() {}) diff --git a/cmd/rpcdaemon22/rpcdaemontest/test_util.go b/cmd/rpcdaemon22/rpcdaemontest/test_util.go index ad73c3faad4..a0cb856fb38 100644 --- a/cmd/rpcdaemon22/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon22/rpcdaemontest/test_util.go @@ -293,7 +293,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{}) diff --git a/ethdb/privateapi/engine_test.go b/ethdb/privateapi/engine_test.go index 42903819ad9..78d02621c26 100644 --- a/ethdb/privateapi/engine_test.go +++ b/ethdb/privateapi/engine_test.go @@ -90,10 +90,10 @@ func TestMockDownloadRequest(t *testing.T) { makeTestDb(ctx, db) beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + statusCh := make(chan engineapi.PayloadStatus) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, nil, false) var err error var reply *remote.EnginePayloadStatus @@ -105,7 +105,7 @@ func TestMockDownloadRequest(t *testing.T) { }() beaconRequestList.WaitForRequest(true) - statusCh <- PayloadStatus{Status: remote.EngineStatus_SYNCING} + statusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} <-done require.NoError(err) require.Equal(reply.Status, remote.EngineStatus_SYNCING) @@ -149,10 +149,10 @@ func TestMockValidExecution(t *testing.T) { makeTestDb(ctx, db) beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + statusCh := make(chan engineapi.PayloadStatus) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, nil, false) var err error var reply *remote.EnginePayloadStatus @@ -165,7 +165,7 @@ func TestMockValidExecution(t *testing.T) { beaconRequestList.WaitForRequest(true) - statusCh <- PayloadStatus{ + statusCh <- engineapi.PayloadStatus{ Status: remote.EngineStatus_VALID, LatestValidHash: payload3Hash, } @@ -185,10 +185,10 @@ func TestMockInvalidExecution(t *testing.T) { makeTestDb(ctx, db) beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + statusCh := make(chan engineapi.PayloadStatus) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, nil, false) var err error var reply *remote.EnginePayloadStatus @@ -201,7 +201,7 @@ func TestMockInvalidExecution(t *testing.T) { beaconRequestList.WaitForRequest(true) // Simulate invalid status - statusCh <- PayloadStatus{ + statusCh <- engineapi.PayloadStatus{ Status: remote.EngineStatus_INVALID, LatestValidHash: startingHeadHash, } @@ -221,10 +221,10 @@ func TestNoTTD(t *testing.T) { makeTestDb(ctx, db) beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan PayloadStatus) + statusCh := make(chan engineapi.PayloadStatus) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, beaconRequestList, statusCh, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, beaconRequestList, statusCh, nil, nil, false) var err error diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index 84132ff86c6..4649c09a6cb 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -381,6 +381,9 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.Hash, blockNumber uint64, parentHash common.Hash, newPayload bool) (*engineapi.PayloadStatus, error) { s.lock.Lock() defer s.lock.Unlock() + if s.hd == nil { + return nil, nil + } var prefix string if newPayload { prefix = "NewPayload" @@ -461,6 +464,7 @@ func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.H if blockHash != headHash && canonicalHash == blockHash { return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil } + return nil, nil } diff --git a/turbo/stages/sentry_mock_test.go b/turbo/stages/sentry_mock_test.go index bd8e552c1be..6e72276dea8 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/turbo/stages/sentry_mock_test.go @@ -669,11 +669,10 @@ func TestPoSSyncWithInvalidHeader(t *testing.T) { FinalizedBlockHash: invalidTip.Hash(), } m.SendForkChoiceRequest(&forkChoiceMessage) - headBlockHash, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) + _, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) require.NoError(t, err) - stages.SendPayloadStatus(m.HeaderDownload(), headBlockHash, err) - payloadStatus2 := m.ReceivePayloadStatus() - require.Equal(t, remote.EngineStatus_INVALID, payloadStatus2.Status) - assert.Equal(t, lastValidHeader.Hash(), payloadStatus2.LatestValidHash) + bad, lastValidHash := m.HeaderDownload().IsBadHeaderPoS(invalidTip.Hash()) + assert.True(t, bad) + assert.Equal(t, lastValidHash, lastValidHeader.Hash()) } From a8d31fedce1374432d6cc2a1b8f4db4a87c1d1af Mon Sep 17 00:00:00 2001 From: giuliorebuffo Date: Sat, 23 Jul 2022 17:06:03 +0200 Subject: [PATCH 3/5] fixed comments --- cmd/rpcdaemon/commands/eth_subscribe_test.go | 2 +- cmd/rpcdaemon/rpcdaemontest/test_util.go | 2 +- eth/backend.go | 3 +- ethdb/privateapi/engine_test.go | 33 +++++++++---------- ethdb/privateapi/ethbackend.go | 34 ++++++++------------ 5 files changed, 32 insertions(+), 42 deletions(-) diff --git a/cmd/rpcdaemon/commands/eth_subscribe_test.go b/cmd/rpcdaemon/commands/eth_subscribe_test.go index f68ed5108f1..24f38220c45 100644 --- a/cmd/rpcdaemon/commands/eth_subscribe_test.go +++ b/cmd/rpcdaemon/commands/eth_subscribe_test.go @@ -40,7 +40,7 @@ func TestEthSubscribe(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed ctx := context.Background() - backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, false) + backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false) backendClient := direct.NewEthBackendClientDirect(backendServer) backend := rpcservices.NewRemoteBackend(backendClient, m.DB, snapshotsync.NewBlockReader()) ff := rpchelper.New(ctx, backend, nil, nil, func() {}) diff --git a/cmd/rpcdaemon/rpcdaemontest/test_util.go b/cmd/rpcdaemon/rpcdaemontest/test_util.go index 56e184cc2af..f54a8429b29 100644 --- a/cmd/rpcdaemon/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon/rpcdaemontest/test_util.go @@ -292,7 +292,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{}) diff --git a/eth/backend.go b/eth/backend.go index 02a6207a361..89cd9332846 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -411,8 +411,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere // Initialize ethbackend ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, - blockReader, chainConfig, backend.sentriesClient.Hd.BeaconRequestList, backend.sentriesClient.Hd.PayloadStatusCh, - assembleBlockPOS, backend.sentriesClient.Hd, config.Miner.EnabledPOS) + blockReader, chainConfig, assembleBlockPOS, backend.sentriesClient.Hd, config.Miner.EnabledPOS) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) if stack.Config().PrivateApiAddr != "" { diff --git a/ethdb/privateapi/engine_test.go b/ethdb/privateapi/engine_test.go index 78d02621c26..74b0eab1efa 100644 --- a/ethdb/privateapi/engine_test.go +++ b/ethdb/privateapi/engine_test.go @@ -13,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/engineapi" + "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" "github.com/stretchr/testify/require" ) @@ -89,11 +90,10 @@ func TestMockDownloadRequest(t *testing.T) { require := require.New(t) makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan engineapi.PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false) var err error var reply *remote.EnginePayloadStatus @@ -104,8 +104,8 @@ func TestMockDownloadRequest(t *testing.T) { done <- true }() - beaconRequestList.WaitForRequest(true) - statusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} + hd.BeaconRequestList.WaitForRequest(true) + hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} <-done require.NoError(err) require.Equal(reply.Status, remote.EngineStatus_SYNCING) @@ -148,11 +148,10 @@ func TestMockValidExecution(t *testing.T) { makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan engineapi.PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false) var err error var reply *remote.EnginePayloadStatus @@ -163,9 +162,9 @@ func TestMockValidExecution(t *testing.T) { done <- true }() - beaconRequestList.WaitForRequest(true) + hd.BeaconRequestList.WaitForRequest(true) - statusCh <- engineapi.PayloadStatus{ + hd.PayloadStatusCh <- engineapi.PayloadStatus{ Status: remote.EngineStatus_VALID, LatestValidHash: payload3Hash, } @@ -184,11 +183,10 @@ func TestMockInvalidExecution(t *testing.T) { makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan engineapi.PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false) var err error var reply *remote.EnginePayloadStatus @@ -199,9 +197,9 @@ func TestMockInvalidExecution(t *testing.T) { done <- true }() - beaconRequestList.WaitForRequest(true) + hd.BeaconRequestList.WaitForRequest(true) // Simulate invalid status - statusCh <- engineapi.PayloadStatus{ + hd.PayloadStatusCh <- engineapi.PayloadStatus{ Status: remote.EngineStatus_INVALID, LatestValidHash: startingHeadHash, } @@ -220,11 +218,10 @@ func TestNoTTD(t *testing.T) { makeTestDb(ctx, db) - beaconRequestList := engineapi.NewRequestList() - statusCh := make(chan engineapi.PayloadStatus) + hd := headerdownload.NewHeaderDownload(0, 0, nil, nil) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, beaconRequestList, statusCh, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, nil, hd, false) var err error diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index 4649c09a6cb..c3df5fa5ba6 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -56,10 +56,7 @@ type EthBackendServer struct { // Block proposing for proof-of-stake payloadId uint64 builders map[uint64]*builder.BlockBuilder - // Send Beacon Chain requests to staged sync - requestList *engineapi.RequestList - // Replies to newPayload & forkchoice requests - statusCh <-chan engineapi.PayloadStatus + builderFunc builder.BlockBuilderFunc proposing bool lock sync.Mutex // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time @@ -76,11 +73,10 @@ type EthBackend interface { } func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *Events, blockReader services.BlockAndTxnReader, - config *params.ChainConfig, requestList *engineapi.RequestList, statusCh <-chan engineapi.PayloadStatus, - builderFunc builder.BlockBuilderFunc, hd *headerdownload.HeaderDownload, proposing bool, + config *params.ChainConfig, builderFunc builder.BlockBuilderFunc, hd *headerdownload.HeaderDownload, proposing bool, ) *EthBackendServer { s := &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config, - requestList: requestList, statusCh: statusCh, builders: make(map[uint64]*builder.BlockBuilder), + builders: make(map[uint64]*builder.BlockBuilder), builderFunc: builderFunc, proposing: proposing, logsFilter: NewLogsFilterAggregator(events), hd: hd, } @@ -249,7 +245,7 @@ func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.Engine func (s *EthBackendServer) stageLoopIsBusy() bool { for i := 0; i < 20; i++ { - if !s.requestList.IsWaiting() { + if !s.hd.BeaconRequestList.IsWaiting() { // This might happen, for example, in the following scenario: // 1) CL sends NewPayload and immediately after that ForkChoiceUpdated. // 2) We happily process NewPayload and stage loop is at the end. @@ -261,7 +257,7 @@ func (s *EthBackendServer) stageLoopIsBusy() bool { time.Sleep(5 * time.Millisecond) } } - return !s.requestList.IsWaiting() + return !s.hd.BeaconRequestList.IsWaiting() } // EngineNewPayloadV1 validates and possibly executes payload @@ -325,8 +321,7 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E }, nil } block := types.NewBlockFromStorage(blockHash, &header, transactions, nil) - // Lock database access - s.lock.Lock() + tx, err := s.db.BeginRo(ctx) if err != nil { return nil, err @@ -339,7 +334,6 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E } tx.Rollback() - s.lock.Unlock() if parentTd != nil && parentTd.Cmp(s.config.TerminalTotalDifficulty) < 0 { log.Warn("[NewPayload] TTD not reached yet", "height", header.Number, "hash", common.Hash(blockHash)) @@ -363,11 +357,13 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E log.Debug("[NewPayload] stage loop is busy") return &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } + s.lock.Lock() + defer s.lock.Unlock() log.Debug("[NewPayload] sending block", "height", header.Number, "hash", common.Hash(blockHash)) - s.requestList.AddPayloadRequest(block) + s.hd.BeaconRequestList.AddPayloadRequest(block) - payloadStatus := <-s.statusCh + payloadStatus := <-s.hd.PayloadStatusCh log.Debug("[NewPayload] got reply", "payloadStatus", payloadStatus) if payloadStatus.CriticalError != nil { @@ -379,8 +375,6 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E // Check if we can make out a status from the payload hash/head hash. func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.Hash, blockNumber uint64, parentHash common.Hash, newPayload bool) (*engineapi.PayloadStatus, error) { - s.lock.Lock() - defer s.lock.Unlock() if s.hd == nil { return nil, nil } @@ -534,13 +528,11 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r FinalizedBlockHash: gointerfaces.ConvertH256ToHash(req.ForkchoiceState.FinalizedBlockHash), } - s.lock.Lock() tx1, err := s.db.BeginRo(ctx) if err != nil { return nil, err } defer tx1.Rollback() - s.lock.Unlock() td, err := rawdb.ReadTdByHash(tx1, forkChoice.HeadBlockHash) tx1.Rollback() @@ -558,6 +550,8 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r if err != nil { return nil, err } + s.lock.Lock() + defer s.lock.Unlock() if status == nil { if s.stageLoopIsBusy() { log.Debug("[ForkChoiceUpdated] stage loop is busy") @@ -566,9 +560,9 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r }, nil } log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash) - s.requestList.AddForkChoiceRequest(&forkChoice) + s.hd.BeaconRequestList.AddForkChoiceRequest(&forkChoice) - statusRef := <-s.statusCh + statusRef := <-s.hd.PayloadStatusCh status = &statusRef log.Debug("[ForkChoiceUpdated] got reply", "payloadStatus", status) From 05b2709d8cba813b0ddff69d63b0789a02f797a0 Mon Sep 17 00:00:00 2001 From: giuliorebuffo Date: Sat, 23 Jul 2022 17:31:01 +0200 Subject: [PATCH 4/5] ops --- cmd/rpcdaemon22/rpcdaemontest/test_util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/rpcdaemon22/rpcdaemontest/test_util.go b/cmd/rpcdaemon22/rpcdaemontest/test_util.go index a0cb856fb38..a2d6c1eb143 100644 --- a/cmd/rpcdaemon22/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon22/rpcdaemontest/test_util.go @@ -293,7 +293,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{}) From bfd9107b0d6124709502e0fe6becde1107736626 Mon Sep 17 00:00:00 2001 From: giuliorebuffo Date: Sat, 23 Jul 2022 18:32:20 +0200 Subject: [PATCH 5/5] little fixes here and there --- ethdb/privateapi/ethbackend.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index c3df5fa5ba6..6e1c22b2dfd 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -397,8 +397,14 @@ func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.H var parent *types.Header if newPayload { parent, err = rawdb.ReadHeaderByHash(tx, parentHash) - } else if header != nil { - parent, err = rawdb.ReadHeaderByHash(tx, header.ParentHash) + } + if err != nil { + return nil, err + } + + var canonicalHash common.Hash + if header != nil { + canonicalHash, err = rawdb.ReadCanonicalHash(tx, header.Number.Uint64()) } if err != nil { return nil, err @@ -417,7 +423,7 @@ func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.H bad, lastValidHash := s.hd.IsBadHeaderPoS(blockHash) if bad { log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash) - } else if !newPayload { + } else if newPayload { bad, lastValidHash = s.hd.IsBadHeaderPoS(parentHash) if bad { log.Warn(fmt.Sprintf("[%s] Previously known bad block", prefix), "hash", blockHash, "parentHash", parentHash) @@ -430,7 +436,7 @@ func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.H // If header is already validated or has a missing parent, you can either return VALID or SYNCING. if newPayload { - if header != nil { + if header != nil && canonicalHash == blockHash { return &engineapi.PayloadStatus{Status: remote.EngineStatus_VALID, LatestValidHash: blockHash}, nil } @@ -450,7 +456,6 @@ func (s *EthBackendServer) getPayloadStatusFromHashIfPossible(blockHash common.H } headHash := rawdb.ReadHeadBlockHash(tx) - canonicalHash, err := rawdb.ReadCanonicalHash(tx, header.Number.Uint64()) if err != nil { return nil, err } @@ -550,8 +555,6 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r if err != nil { return nil, err } - s.lock.Lock() - defer s.lock.Unlock() if status == nil { if s.stageLoopIsBusy() { log.Debug("[ForkChoiceUpdated] stage loop is busy") @@ -559,6 +562,9 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, }, nil } + s.lock.Lock() + defer s.lock.Unlock() + log.Debug("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoice.HeadBlockHash) s.hd.BeaconRequestList.AddForkChoiceRequest(&forkChoice) @@ -569,6 +575,9 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r if status.CriticalError != nil { return nil, status.CriticalError } + } else { + s.lock.Lock() + defer s.lock.Unlock() } // No need for payload building