Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Added PoS download validation when applicable (ledgerwatch#4728)
Browse files Browse the repository at this point in the history
* added incomplete version of PoS download validation

* fixed stuff
  • Loading branch information
Giulio2002 committed Jul 17, 2022
1 parent e044014 commit 8a754cd
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 23 deletions.
16 changes: 8 additions & 8 deletions eth/stagedsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,23 +242,23 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc
},
},
{
ID: stages.BlockHashes,
Description: "Write block hashes",
ID: stages.Bodies,
Description: "Download block bodies",
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
return SpawnBlockHashStage(s, tx, blockHashCfg, ctx)
return BodiesForward(s, u, ctx, tx, bodies, false, false)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
return UnwindBlockHashStage(u, tx, blockHashCfg, ctx)
return UnwindBodiesStage(u, tx, bodies, ctx)
},
},
{
ID: stages.Bodies,
Description: "Download block bodies",
ID: stages.BlockHashes,
Description: "Write block hashes",
Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error {
return nil
return SpawnBlockHashStage(s, tx, blockHashCfg, ctx)
},
Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error {
return UnwindBodiesStage(u, tx, bodies, ctx)
return UnwindBlockHashStage(u, tx, blockHashCfg, ctx)
},
},
{
Expand Down
25 changes: 22 additions & 3 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,16 +666,32 @@ func schedulePoSDownload(

func verifyAndSaveDownloadedPoSHeaders(tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter) {
var lastValidHash common.Hash

var badChainError error
headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
var h types.Header
if err := rlp.DecodeBytes(value, &h); err != nil {
return err
}
if badChainError != nil {
cfg.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash)
return nil
}
lastValidHash = h.ParentHash
if err := cfg.hd.VerifyHeader(&h); err != nil {
log.Warn("Verification failed for header", "hash", h.Hash(), "height", h.Number.Uint64(), "err", err)
return err
badChainError = err
cfg.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash)
return nil
}
// Validate state if possible (bodies will be retrieved through body download)
_, _, validationError, criticalError := cfg.forkValidator.ValidatePayload(tx, &h, nil, false)
if criticalError != nil {
return criticalError
}
if validationError != nil {
badChainError = validationError
cfg.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash)
return nil
}
return headerInserter.FeedHeaderPoS(tx, &h, h.Hash())
}
Expand All @@ -686,7 +702,10 @@ func verifyAndSaveDownloadedPoSHeaders(tx kv.RwTx, cfg HeadersCfg, headerInserte
},
})

if err != nil {
if err != nil || badChainError != nil {
if err == nil {
err = badChainError
}
log.Warn("Removing beacon request due to", "err", err, "requestId", cfg.hd.RequestId())
cfg.hd.BeaconRequestList.Remove(cfg.hd.RequestId())
cfg.hd.ReportBadHeaderPoS(cfg.hd.PoSDownloaderTip(), lastValidHash)
Expand Down
20 changes: 20 additions & 0 deletions eth/stagedsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,26 @@ func (s *Sync) StageState(stage stages.SyncStage, tx kv.Tx, db kv.RoDB) (*StageS
return &StageState{s, stage, blockNum}, nil
}

// RunUnwind performs only the unwind half of a sync cycle. When an unwind
// point has been scheduled (s.unwindPoint != nil), it walks s.unwindOrder,
// invoking Unwind on every enabled stage that defines one, then records the
// completed unwind, clears the pending unwind state and bad-block marker,
// and resets the current stage to the first stage in the pipeline.
// It is a no-op returning nil when no unwind point is set.
func (s *Sync) RunUnwind(db kv.RwDB, tx kv.RwTx) error {
	if s.unwindPoint == nil {
		// Nothing scheduled to unwind.
		return nil
	}
	for _, stage := range s.unwindOrder {
		// Skip absent, disabled, or unwind-less stages.
		if stage == nil || stage.Disabled || stage.Unwind == nil {
			continue
		}
		if err := s.unwindStage(false, stage, db, tx); err != nil {
			return err
		}
	}
	// Remember what was unwound and clear the request.
	s.prevUnwindPoint = s.unwindPoint
	s.unwindPoint = nil
	s.badBlock = common.Hash{}
	return s.SetCurrentStage(s.stages[0].ID)
}
func (s *Sync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error {
s.prevUnwindPoint = nil
s.timings = s.timings[:0]
Expand Down
27 changes: 25 additions & 2 deletions turbo/engineapi/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
package engineapi

import (
"bytes"

"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -177,7 +180,6 @@ func (fv *ForkValidator) Clear(tx kv.RwTx) {
}
fv.extendingFork.Rollback()
}
// Clean all data relative to txpool
fv.extendingForkHeadHash = common.Hash{}
fv.extendingFork = nil
}
Expand All @@ -191,8 +193,29 @@ func (fv *ForkValidator) validateAndStorePayload(tx kv.RwTx, header *types.Heade
status = remote.EngineStatus_INVALID
return
}
// If we do not have the body we can recover it from the batch.
if body == nil {
var bodyWithTxs *types.Body
bodyWithTxs, criticalError = rawdb.ReadBodyWithTransactions(tx, header.Hash(), header.Number.Uint64())
if criticalError != nil {
return
}
var encodedTxs [][]byte
buf := bytes.NewBuffer(nil)
for _, tx := range bodyWithTxs.Transactions {
buf.Reset()
if criticalError = rlp.Encode(buf, tx); criticalError != nil {
return
}
encodedTxs = append(encodedTxs, common.CopyBytes(buf.Bytes()))
}
fv.sideForksBlock[header.Hash()] = forkSegment{header, &types.RawBody{
Transactions: encodedTxs,
}}
} else {
fv.sideForksBlock[header.Hash()] = forkSegment{header, body}
}
status = remote.EngineStatus_VALID
fv.sideForksBlock[header.Hash()] = forkSegment{header, body}
return
}

Expand Down
23 changes: 13 additions & 10 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, h
if unwindPoint > 0 {
// Run it through the unwind
stateSync.UnwindTo(unwindPoint, common.Hash{})
if err = stateSync.Run(nil, batch, false); err != nil {
if err = stateSync.RunUnwind(nil, batch); err != nil {
return err
}
// Once we unwound we can start constructing the chain (assumption: len(headersChain) == len(bodiesChain))
Expand All @@ -282,17 +282,13 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, h
}
}
// If we did not specify header or body we stop here
if header == nil || body == nil {
if header == nil {
return nil
}
// Setup
height := header.Number.Uint64()
hash := header.Hash()
// Prepare memory state for block execution
if err = rawdb.WriteRawBodyIfNotExists(batch, hash, height, body); err != nil {
return err
}

rawdb.WriteHeader(batch, header)
if err = rawdb.WriteHeaderNumber(batch, hash, height); err != nil {
return err
Expand All @@ -309,11 +305,18 @@ func StateStep(ctx context.Context, batch kv.RwTx, stateSync *stagedsync.Sync, h
if err = stages.SaveStageProgress(batch, stages.Headers, height); err != nil {
return err
}

if err = stages.SaveStageProgress(batch, stages.Bodies, height); err != nil {
return err
if body != nil {
if err = stages.SaveStageProgress(batch, stages.Bodies, height); err != nil {
return err
}
if err = rawdb.WriteRawBodyIfNotExists(batch, hash, height, body); err != nil {
return err
}
} else {
if err = stages.SaveStageProgress(batch, stages.Bodies, height-1); err != nil {
return err
}
}

// Run state sync
if err = stateSync.Run(nil, batch, false); err != nil {
return err
Expand Down

0 comments on commit 8a754cd

Please sign in to comment.