Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid constantly triggering stageloop when using Engine API #4797

Merged
merged 5 commits into from
Jul 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/eth_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestEthSubscribe(t *testing.T) {
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed

ctx := context.Background()
backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false)
backendServer := privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false)
backendClient := direct.NewEthBackendClientDirect(backendServer)
backend := rpcservices.NewRemoteBackend(backendClient, m.DB, snapshotsync.NewBlockReader())
ff := rpchelper.New(ctx, backend, nil, nil, func() {})
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/rpcdaemontest/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()

remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false))
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{})
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon22/rpcdaemontest/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()

remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false))
remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, false))
txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer)
txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{})
Expand Down
3 changes: 1 addition & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere

// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.sentriesClient.Hd.BeaconRequestList, backend.sentriesClient.Hd.PayloadStatusCh,
assembleBlockPOS, config.Miner.EnabledPOS)
blockReader, chainConfig, assembleBlockPOS, backend.sentriesClient.Hd, config.Miner.EnabledPOS)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)

if stack.Config().PrivateApiAddr != "" {
Expand Down
108 changes: 16 additions & 92 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func HeadersPOS(
cfg.hd.ClearPendingPayloadHash()
cfg.hd.SetPendingPayloadStatus(nil)

var payloadStatus *privateapi.PayloadStatus
var payloadStatus *engineapi.PayloadStatus
if forkChoiceInsteadOfNewPayload {
payloadStatus, err = startHandlingForkChoice(forkChoiceMessage, requestStatus, requestId, s, u, ctx, tx, cfg, headerInserter)
} else {
Expand All @@ -190,7 +190,7 @@ func HeadersPOS(

if err != nil {
if requestStatus == engineapi.New {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err}
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: err}
}
return err
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func startHandlingForkChoice(
tx kv.RwTx,
cfg HeadersCfg,
headerInserter *headerdownload.HeaderInserter,
) (*privateapi.PayloadStatus, error) {
) (*engineapi.PayloadStatus, error) {
if cfg.memoryOverlay {
defer cfg.forkValidator.ClearWithUnwind(tx, cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer)
}
Expand All @@ -274,27 +274,17 @@ func startHandlingForkChoice(
return nil, err
}
if canonical {
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: currentHeadHash,
}, nil
} else {
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}, nil
}
}

bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Fork choice bad head block", s.LogPrefix()), "headerHash", headerHash)
cfg.hd.BeaconRequestList.Remove(requestId)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: lastValidHash,
}, nil
}

// Header itself may already be in the snapshots, if CL starts off at much earlier state than Erigon
header, err := cfg.blockReader.HeaderByHash(ctx, tx, headerHash)
if err != nil {
Expand All @@ -307,33 +297,12 @@ func startHandlingForkChoice(
log.Info(fmt.Sprintf("[%s] Fork choice missing header with hash %x", s.LogPrefix(), headerHash))
cfg.hd.SetPoSDownloaderTip(headerHash)
schedulePoSDownload(requestId, headerHash, 0 /* header height is unknown, setting to 0 */, s, cfg)
return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}

cfg.hd.BeaconRequestList.Remove(requestId)

headerNumber := header.Number.Uint64()
// If header is canonical, then no reorgs are required
canonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
if err != nil {
log.Warn(fmt.Sprintf("[%s] Fork choice err (reading canonical hash of %d)", s.LogPrefix(), headerNumber), "err", err)
cfg.hd.BeaconRequestList.Remove(requestId)
return nil, err
}

if headerHash == canonicalHash {
log.Info(fmt.Sprintf("[%s] Fork choice on previously known block", s.LogPrefix()))
cfg.hd.BeaconRequestList.Remove(requestId)
// Per the Engine API spec:
// Client software MAY skip an update of the forkchoice state and MUST NOT begin a payload build process
// if forkchoiceState.headBlockHash references an ancestor of the head of canonical chain.
// In the case of such an event, client software MUST return
// {payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash, validationError: null}, payloadId: null}.
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: headerHash,
}, nil
}

if cfg.memoryOverlay && headerHash == cfg.forkValidator.ExtendingForkHeadHash() {
log.Info("Flushing in-memory state")
Expand All @@ -350,7 +319,7 @@ func startHandlingForkChoice(
cfg.hd.SetPendingPayloadHash(headerHash)
return nil, nil
} else {
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}, nil
}
Expand All @@ -369,7 +338,7 @@ func startHandlingForkChoice(
// TODO(yperbasis): what if some bodies are missing and we have to download them?
cfg.hd.SetPendingPayloadHash(headerHash)
} else {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}
}
}

Expand Down Expand Up @@ -418,7 +387,7 @@ func finishHandlingForkChoice(

if !canonical {
if cfg.hd.GetPendingPayloadHash() != (common.Hash{}) {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{
CriticalError: &privateapi.InvalidForkchoiceStateErr,
}
}
Expand All @@ -438,48 +407,14 @@ func handleNewPayload(
tx kv.RwTx,
cfg HeadersCfg,
headerInserter *headerdownload.HeaderInserter,
) (*privateapi.PayloadStatus, error) {
) (*engineapi.PayloadStatus, error) {
header := block.Header()
headerNumber := header.Number.Uint64()
headerHash := block.Hash()

log.Debug(fmt.Sprintf("[%s] Handling new payload", s.LogPrefix()), "height", headerNumber, "hash", headerHash)
cfg.hd.UpdateTopSeenHeightPoS(headerNumber)

existingCanonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
if err != nil {
log.Warn(fmt.Sprintf("[%s] New payload err", s.LogPrefix()), "err", err)
cfg.hd.BeaconRequestList.Remove(requestId)
return nil, err
}

if existingCanonicalHash != (common.Hash{}) && headerHash == existingCanonicalHash {
log.Info(fmt.Sprintf("[%s] New payload: previously received valid header %d", s.LogPrefix(), headerNumber))
cfg.hd.BeaconRequestList.Remove(requestId)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: headerHash,
}, nil
}

bad, lastValidHash := cfg.hd.IsBadHeaderPoS(headerHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Previously known bad block", s.LogPrefix()), "height", headerNumber, "hash", headerHash)
} else {
bad, lastValidHash = cfg.hd.IsBadHeaderPoS(header.ParentHash)
if bad {
log.Warn(fmt.Sprintf("[%s] Previously known bad parent", s.LogPrefix()), "height", headerNumber, "hash", headerHash, "parentHash", header.ParentHash)
}
}
if bad {
cfg.hd.BeaconRequestList.Remove(requestId)
cfg.hd.ReportBadHeaderPoS(headerHash, lastValidHash)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: lastValidHash,
}, nil
}

parent, err := cfg.blockReader.HeaderByHash(ctx, tx, header.ParentHash)
if err != nil {
return nil, err
Expand All @@ -488,18 +423,7 @@ func handleNewPayload(
log.Info(fmt.Sprintf("[%s] New payload missing parent", s.LogPrefix()))
cfg.hd.SetPoSDownloaderTip(headerHash)
schedulePoSDownload(requestId, header.ParentHash, headerNumber-1, s, cfg)
return &privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}

if headerNumber != parent.Number.Uint64()+1 {
log.Warn(fmt.Sprintf("[%s] Invalid block number", s.LogPrefix()), "headerNumber", headerNumber, "parentNumber", parent.Number.Uint64())
cfg.hd.BeaconRequestList.Remove(requestId)
cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash)
return &privateapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: header.ParentHash,
ValidationError: errors.New("invalid block number"),
}, nil
return &engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}, nil
}

cfg.hd.BeaconRequestList.Remove(requestId)
Expand All @@ -526,15 +450,15 @@ func verifyAndSaveNewPoSHeader(
cfg HeadersCfg,
block *types.Block,
headerInserter *headerdownload.HeaderInserter,
) (response *privateapi.PayloadStatus, success bool, err error) {
) (response *engineapi.PayloadStatus, success bool, err error) {
header := block.Header()
headerNumber := header.Number.Uint64()
headerHash := block.Hash()

if verificationErr := cfg.hd.VerifyHeader(header); verificationErr != nil {
log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "err", verificationErr)
cfg.hd.ReportBadHeaderPoS(headerHash, header.ParentHash)
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: header.ParentHash,
ValidationError: verificationErr,
Expand Down Expand Up @@ -565,7 +489,7 @@ func verifyAndSaveNewPoSHeader(
} else if err := headerInserter.FeedHeaderPoS(tx, header, headerHash); err != nil {
return nil, false, err
}
return &privateapi.PayloadStatus{
return &engineapi.PayloadStatus{
Status: status,
LatestValidHash: latestValidHash,
ValidationError: validationError,
Expand All @@ -578,7 +502,7 @@ func verifyAndSaveNewPoSHeader(

if !canExtendCanonical {
log.Info("Side chain", "parentHash", header.ParentHash, "currentHead", currentHeadHash)
return &privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil
return &engineapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED}, true, nil
}

// OK, we're on the canonical chain
Expand Down Expand Up @@ -725,7 +649,7 @@ func forkingPoint(
func handleInterrupt(interrupt engineapi.Interrupt, cfg HeadersCfg, tx kv.RwTx, headerInserter *headerdownload.HeaderInserter, useExternalTx bool) (bool, error) {
if interrupt != engineapi.None {
if interrupt == engineapi.Stopping {
cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: errors.New("server is stopping")}
cfg.hd.PayloadStatusCh <- engineapi.PayloadStatus{CriticalError: errors.New("server is stopping")}
}
if interrupt == engineapi.Synced {
verifyAndSaveDownloadedPoSHeaders(tx, cfg, headerInserter)
Expand Down
33 changes: 15 additions & 18 deletions ethdb/privateapi/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/engineapi"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -89,11 +90,10 @@ func TestMockDownloadRequest(t *testing.T) {
require := require.New(t)

makeTestDb(ctx, db)
beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)

events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)

var err error
var reply *remote.EnginePayloadStatus
Expand All @@ -104,8 +104,8 @@ func TestMockDownloadRequest(t *testing.T) {
done <- true
}()

beaconRequestList.WaitForRequest(true)
statusCh <- PayloadStatus{Status: remote.EngineStatus_SYNCING}
hd.BeaconRequestList.WaitForRequest(true)
hd.PayloadStatusCh <- engineapi.PayloadStatus{Status: remote.EngineStatus_SYNCING}
<-done
require.NoError(err)
require.Equal(reply.Status, remote.EngineStatus_SYNCING)
Expand Down Expand Up @@ -148,11 +148,10 @@ func TestMockValidExecution(t *testing.T) {

makeTestDb(ctx, db)

beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)

events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)

var err error
var reply *remote.EnginePayloadStatus
Expand All @@ -163,9 +162,9 @@ func TestMockValidExecution(t *testing.T) {
done <- true
}()

beaconRequestList.WaitForRequest(true)
hd.BeaconRequestList.WaitForRequest(true)

statusCh <- PayloadStatus{
hd.PayloadStatusCh <- engineapi.PayloadStatus{
Status: remote.EngineStatus_VALID,
LatestValidHash: payload3Hash,
}
Expand All @@ -184,11 +183,10 @@ func TestMockInvalidExecution(t *testing.T) {

makeTestDb(ctx, db)

beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)

events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{TerminalTotalDifficulty: common.Big1}, nil, hd, false)

var err error
var reply *remote.EnginePayloadStatus
Expand All @@ -199,9 +197,9 @@ func TestMockInvalidExecution(t *testing.T) {
done <- true
}()

beaconRequestList.WaitForRequest(true)
hd.BeaconRequestList.WaitForRequest(true)
// Simulate invalid status
statusCh <- PayloadStatus{
hd.PayloadStatusCh <- engineapi.PayloadStatus{
Status: remote.EngineStatus_INVALID,
LatestValidHash: startingHeadHash,
}
Expand All @@ -220,11 +218,10 @@ func TestNoTTD(t *testing.T) {

makeTestDb(ctx, db)

beaconRequestList := engineapi.NewRequestList()
statusCh := make(chan PayloadStatus)
hd := headerdownload.NewHeaderDownload(0, 0, nil, nil)

events := NewEvents()
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{}, beaconRequestList, statusCh, nil, false)
backend := NewEthBackendServer(ctx, nil, db, events, nil, &params.ChainConfig{}, nil, hd, false)

var err error

Expand Down
Loading