Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized payload cleanup #4772

Merged
merged 1 commit into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func startHandlingForkChoice(
headerInserter *headerdownload.HeaderInserter,
) (*privateapi.PayloadStatus, error) {
if cfg.memoryOverlay {
defer cfg.forkValidator.ClearWithUnwind(tx)
defer cfg.forkValidator.ClearWithUnwind(tx, cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer)
}
headerHash := forkChoice.HeadBlockHash
log.Debug(fmt.Sprintf("[%s] Handling fork choice", s.LogPrefix()), "headerHash", headerHash)
Expand Down
96 changes: 93 additions & 3 deletions turbo/engineapi/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@ package engineapi

import (
"bytes"
"context"
"encoding/binary"
"fmt"

"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/changeset"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -81,6 +88,89 @@ func (fv *ForkValidator) ExtendingForkHeadHash() common.Hash {
return fv.extendingForkHeadHash
}

func (fv *ForkValidator) rewindAccumulator(to uint64, accumulator *shards.Accumulator, c shards.StateChangeConsumer) error {
hash, err := rawdb.ReadCanonicalHash(fv.extendingFork, to)
if err != nil {
return fmt.Errorf("read canonical hash of unwind point: %w", err)
}
header := rawdb.ReadHeader(fv.extendingFork, hash, to)
if header == nil {
return fmt.Errorf("could not find header for block: %d", to)
}

txs, err := rawdb.RawTransactionsRange(fv.extendingFork, to, to+1)
if err != nil {
return err
}
// Start the changes
accumulator.StartChange(to, hash, txs, true)
accChangesCursor, err := fv.extendingFork.CursorDupSort(kv.AccountChangeSet)
if err != nil {
return err
}
defer accChangesCursor.Close()

storageChangesCursor, err := fv.extendingFork.CursorDupSort(kv.StorageChangeSet)
if err != nil {
return err
}
defer storageChangesCursor.Close()

startingKey := dbutils.EncodeBlockNumber(to)
// Unwind notifications on accounts
for k, v, err := accChangesCursor.Seek(startingKey); k != nil; k, v, err = accChangesCursor.Next() {
if err != nil {
return err
}
_, dbKey, dbValue, err := changeset.FromDBFormat(k, v)
if err != nil {
return err
}
if len(dbValue) > 0 {
var acc accounts.Account
if err := acc.DecodeForStorage(dbValue); err != nil {
return err
}
// Fetch the code hash
var address common.Address
copy(address[:], dbKey)
if acc.Incarnation > 0 && acc.IsEmptyCodeHash() {
if codeHash, err2 := fv.extendingFork.GetOne(kv.PlainContractCode, dbutils.PlainGenerateStoragePrefix(address[:], acc.Incarnation)); err2 == nil {
copy(acc.CodeHash[:], codeHash)
}
}

newV := make([]byte, acc.EncodingLengthForStorage())
acc.EncodeForStorage(newV)
accumulator.ChangeAccount(address, acc.Incarnation, newV)
} else {
var address common.Address
copy(address[:], dbKey)
accumulator.DeleteAccount(address)
}
}
// Unwind notifications on storage
for k, v, err := storageChangesCursor.Seek(startingKey); k != nil; k, v, err = accChangesCursor.Next() {
if err != nil {
return err
}
_, dbKey, dbValue, err := changeset.FromDBFormat(k, v)
if err != nil {
return err
}
var address common.Address
var incarnation uint64
var location common.Hash
copy(address[:], dbKey[:length.Addr])
incarnation = binary.BigEndian.Uint64(dbKey[length.Addr:])
copy(location[:], dbKey[length.Addr+length.Incarnation:])
accumulator.ChangeStorage(address, incarnation, location, common.CopyBytes(dbValue))
}
accumulator.SendAndReset(context.Background(), c, header.BaseFee.Uint64(), header.GasLimit)
log.Info("Transaction pool notified of discard side fork.")
return nil
}

// NotifyCurrentHeight is to be called at the end of the stage cycle and repressent the last processed block.
func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
fv.currentHeight = currentHeight
Expand Down Expand Up @@ -191,14 +281,14 @@ func (fv *ForkValidator) Clear() {
}

// Clear wipes out current extending fork data and notify txpool.
func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx) {
func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx, accumulator *shards.Accumulator, c shards.StateChangeConsumer) {
sb, ok := fv.sideForksBlock[fv.extendingForkHeadHash]
// If we did not flush the fork state, then we need to notify the txpool through unwind.
if fv.extendingFork != nil && fv.extendingForkHeadHash != (common.Hash{}) && ok {
fv.extendingFork.UpdateTxn(tx)
// this will call unwind of extending fork to notify txpool of reverting transactions.
if err := fv.validatePayload(fv.extendingFork, nil, nil, sb.header.Number.Uint64()-1, nil, nil); err != nil {
log.Warn("Could not clean payload", "err", err)
if err := fv.rewindAccumulator(sb.header.Number.Uint64()-1, accumulator, c); err != nil {
log.Warn("could not notify txpool of invalid side fork", "err", err)
}
fv.extendingFork.Rollback()
}
Expand Down