From 73b028a5fd553519c24c8a36ce2bd8c96467f889 Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Wed, 20 Jul 2022 18:16:42 +0200 Subject: [PATCH] better payload cleanup (#4772) Co-authored-by: giuliorebuffo --- eth/stagedsync/stage_headers.go | 2 +- turbo/engineapi/fork_validator.go | 96 ++++++++++++++++++++++++++++++- 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 4b3a5224348..19a44499c1f 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -259,7 +259,7 @@ func startHandlingForkChoice( headerInserter *headerdownload.HeaderInserter, ) (*privateapi.PayloadStatus, error) { if cfg.memoryOverlay { - defer cfg.forkValidator.ClearWithUnwind(tx) + defer cfg.forkValidator.ClearWithUnwind(tx, cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer) } headerHash := forkChoice.HeadBlockHash log.Debug(fmt.Sprintf("[%s] Handling fork choice", s.LogPrefix()), "headerHash", headerHash) diff --git a/turbo/engineapi/fork_validator.go b/turbo/engineapi/fork_validator.go index 7baaba9c052..a1b4f03bbb6 100644 --- a/turbo/engineapi/fork_validator.go +++ b/turbo/engineapi/fork_validator.go @@ -15,15 +15,22 @@ package engineapi import ( "bytes" + "context" + "encoding/binary" "fmt" + "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/common/changeset" + "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/core/types/accounts" "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/log/v3" ) @@ -81,6 +88,89 @@ func (fv *ForkValidator) ExtendingForkHeadHash() common.Hash { return fv.extendingForkHeadHash } +func (fv *ForkValidator) rewindAccumulator(to uint64, accumulator *shards.Accumulator, c shards.StateChangeConsumer) error { + hash, err := rawdb.ReadCanonicalHash(fv.extendingFork, to) + if err != nil { + return fmt.Errorf("read canonical hash of unwind point: %w", err) + } + header := rawdb.ReadHeader(fv.extendingFork, hash, to) + if header == nil { + return fmt.Errorf("could not find header for block: %d", to) + } + + txs, err := rawdb.RawTransactionsRange(fv.extendingFork, to, to+1) + if err != nil { + return err + } + // Start the changes + accumulator.StartChange(to, hash, txs, true) + accChangesCursor, err := fv.extendingFork.CursorDupSort(kv.AccountChangeSet) + if err != nil { + return err + } + defer accChangesCursor.Close() + + storageChangesCursor, err := fv.extendingFork.CursorDupSort(kv.StorageChangeSet) + if err != nil { + return err + } + defer storageChangesCursor.Close() + + startingKey := dbutils.EncodeBlockNumber(to) + // Unwind notifications on accounts + for k, v, err := accChangesCursor.Seek(startingKey); k != nil; k, v, err = accChangesCursor.Next() { + if err != nil { + return err + } + _, dbKey, dbValue, err := changeset.FromDBFormat(k, v) + if err != nil { + return err + } + if len(dbValue) > 0 { + var acc accounts.Account + if err := acc.DecodeForStorage(dbValue); err != nil { + return err + } + // Fetch the code hash + var address common.Address + copy(address[:], dbKey) + if acc.Incarnation > 0 && acc.IsEmptyCodeHash() { + if codeHash, err2 := fv.extendingFork.GetOne(kv.PlainContractCode, dbutils.PlainGenerateStoragePrefix(address[:], acc.Incarnation)); err2 == nil { + copy(acc.CodeHash[:], codeHash) + } + } + + newV := make([]byte, acc.EncodingLengthForStorage()) + acc.EncodeForStorage(newV) + accumulator.ChangeAccount(address, acc.Incarnation, newV) + } else { + var address common.Address + copy(address[:], dbKey) + accumulator.DeleteAccount(address) + } + } + // Unwind notifications on storage + for k, v, err := storageChangesCursor.Seek(startingKey); k != nil; k, v, err = accChangesCursor.Next() { + if err != nil { + return err + } + _, dbKey, dbValue, err := changeset.FromDBFormat(k, v) + if err != nil { + return err + } + var address common.Address + var incarnation uint64 + var location common.Hash + copy(address[:], dbKey[:length.Addr]) + incarnation = binary.BigEndian.Uint64(dbKey[length.Addr:]) + copy(location[:], dbKey[length.Addr+length.Incarnation:]) + accumulator.ChangeStorage(address, incarnation, location, common.CopyBytes(dbValue)) + } + accumulator.SendAndReset(context.Background(), c, header.BaseFee.Uint64(), header.GasLimit) + log.Info("Transaction pool notified of discard side fork.") + return nil +} + // NotifyCurrentHeight is to be called at the end of the stage cycle and repressent the last processed block. func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) { fv.currentHeight = currentHeight @@ -191,14 +281,14 @@ func (fv *ForkValidator) Clear() { } // Clear wipes out current extending fork data and notify txpool. -func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx) { +func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx, accumulator *shards.Accumulator, c shards.StateChangeConsumer) { sb, ok := fv.sideForksBlock[fv.extendingForkHeadHash] // If we did not flush the fork state, then we need to notify the txpool through unwind. if fv.extendingFork != nil && fv.extendingForkHeadHash != (common.Hash{}) && ok { fv.extendingFork.UpdateTxn(tx) // this will call unwind of extending fork to notify txpool of reverting transactions. - if err := fv.validatePayload(fv.extendingFork, nil, nil, sb.header.Number.Uint64()-1, nil, nil); err != nil { - log.Warn("Could not clean payload", "err", err) + if err := fv.rewindAccumulator(sb.header.Number.Uint64()-1, accumulator, c); err != nil { + log.Warn("could not notify txpool of invalid side fork", "err", err) } fv.extendingFork.Rollback() }