
Commit

better payload cleanup (ledgerwatch#4772)
Co-authored-by: giuliorebuffo <giuliorebuffo@system76-pc.localdomain>
Giulio2002 and giuliorebuffo committed Jul 20, 2022
1 parent 9e8f625 commit 73b028a
Showing 2 changed files with 94 additions and 4 deletions.
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_headers.go
@@ -259,7 +259,7 @@ func startHandlingForkChoice(
headerInserter *headerdownload.HeaderInserter,
) (*privateapi.PayloadStatus, error) {
if cfg.memoryOverlay {
-	defer cfg.forkValidator.ClearWithUnwind(tx)
+	defer cfg.forkValidator.ClearWithUnwind(tx, cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer)
}
headerHash := forkChoice.HeadBlockHash
log.Debug(fmt.Sprintf("[%s] Handling fork choice", s.LogPrefix()), "headerHash", headerHash)
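The hunk above wires the stage's notification plumbing into the deferred cleanup. Below is a minimal, stand-alone sketch (stub types only, not erigon's real ones, and omitting the kv.RwTx handle the real signature takes) of the pattern it relies on: ClearWithUnwind is registered with defer as soon as fork-choice handling starts, so the in-memory fork state is cleaned up and the txpool notified on every return path, including early error returns.

package main

import "fmt"

// Stub stand-ins for erigon's types; the real ClearWithUnwind also takes a
// kv.RwTx, which this sketch omits.
type accumulator struct{}
type stateChangeConsumer struct{}

type forkValidator struct{}

func (fv *forkValidator) ClearWithUnwind(a *accumulator, c *stateChangeConsumer) {
	fmt.Println("in-memory fork cleared, txpool notified of reverted changes")
}

type notifications struct {
	Accumulator          *accumulator
	StateChangesConsumer *stateChangeConsumer
}

type headersCfg struct {
	memoryOverlay bool
	forkValidator *forkValidator
	notifications *notifications
}

func handleForkChoice(cfg headersCfg) error {
	if cfg.memoryOverlay {
		// Registered up front, so the cleanup runs on every return path below.
		defer cfg.forkValidator.ClearWithUnwind(cfg.notifications.Accumulator, cfg.notifications.StateChangesConsumer)
	}
	// ... fork-choice handling would go here ...
	return nil
}

func main() {
	cfg := headersCfg{
		memoryOverlay: true,
		forkValidator: &forkValidator{},
		notifications: &notifications{&accumulator{}, &stateChangeConsumer{}},
	}
	_ = handleForkChoice(cfg)
}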
96 changes: 93 additions & 3 deletions turbo/engineapi/fork_validator.go
@@ -15,15 +15,22 @@ package engineapi

import (
"bytes"
"context"
"encoding/binary"
"fmt"

"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/changeset"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/log/v3"
)

@@ -81,6 +88,89 @@ func (fv *ForkValidator) ExtendingForkHeadHash() common.Hash {
return fv.extendingForkHeadHash
}

func (fv *ForkValidator) rewindAccumulator(to uint64, accumulator *shards.Accumulator, c shards.StateChangeConsumer) error {
hash, err := rawdb.ReadCanonicalHash(fv.extendingFork, to)
if err != nil {
return fmt.Errorf("read canonical hash of unwind point: %w", err)
}
header := rawdb.ReadHeader(fv.extendingFork, hash, to)
if header == nil {
return fmt.Errorf("could not find header for block: %d", to)
}

txs, err := rawdb.RawTransactionsRange(fv.extendingFork, to, to+1)
if err != nil {
return err
}
// Start the changes
accumulator.StartChange(to, hash, txs, true)
accChangesCursor, err := fv.extendingFork.CursorDupSort(kv.AccountChangeSet)
if err != nil {
return err
}
defer accChangesCursor.Close()

storageChangesCursor, err := fv.extendingFork.CursorDupSort(kv.StorageChangeSet)
if err != nil {
return err
}
defer storageChangesCursor.Close()

startingKey := dbutils.EncodeBlockNumber(to)
// Unwind notifications on accounts
for k, v, err := accChangesCursor.Seek(startingKey); k != nil; k, v, err = accChangesCursor.Next() {
if err != nil {
return err
}
_, dbKey, dbValue, err := changeset.FromDBFormat(k, v)
if err != nil {
return err
}
if len(dbValue) > 0 {
var acc accounts.Account
if err := acc.DecodeForStorage(dbValue); err != nil {
return err
}
// Fetch the code hash
var address common.Address
copy(address[:], dbKey)
if acc.Incarnation > 0 && acc.IsEmptyCodeHash() {
if codeHash, err2 := fv.extendingFork.GetOne(kv.PlainContractCode, dbutils.PlainGenerateStoragePrefix(address[:], acc.Incarnation)); err2 == nil {
copy(acc.CodeHash[:], codeHash)
}
}

newV := make([]byte, acc.EncodingLengthForStorage())
acc.EncodeForStorage(newV)
accumulator.ChangeAccount(address, acc.Incarnation, newV)
} else {
var address common.Address
copy(address[:], dbKey)
accumulator.DeleteAccount(address)
}
}
// Unwind notifications on storage
for k, v, err := storageChangesCursor.Seek(startingKey); k != nil; k, v, err = storageChangesCursor.Next() {
if err != nil {
return err
}
_, dbKey, dbValue, err := changeset.FromDBFormat(k, v)
if err != nil {
return err
}
var address common.Address
var incarnation uint64
var location common.Hash
copy(address[:], dbKey[:length.Addr])
incarnation = binary.BigEndian.Uint64(dbKey[length.Addr:])
copy(location[:], dbKey[length.Addr+length.Incarnation:])
accumulator.ChangeStorage(address, incarnation, location, common.CopyBytes(dbValue))
}
accumulator.SendAndReset(context.Background(), c, header.BaseFee.Uint64(), header.GasLimit)
log.Info("Transaction pool notified of discard side fork.")
return nil
}

// NotifyCurrentHeight is to be called at the end of the stage cycle and represents the last processed block.
func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
fv.currentHeight = currentHeight
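The rewindAccumulator helper above walks the account and storage changesets of the in-memory fork from the unwind point and replays the reverts into the accumulator. For the storage entries it assumes the plain composite key layout of 20-byte address, 8-byte big-endian incarnation, and 32-byte storage location. The short, self-contained sketch below (a hypothetical helper, with local constants standing in for length.Addr, length.Incarnation, and length.Hash) shows that decoding step in isolation.

package main

import (
	"encoding/binary"
	"fmt"
)

const (
	addrLen        = 20 // stands in for length.Addr
	incarnationLen = 8  // stands in for length.Incarnation
	hashLen        = 32 // stands in for length.Hash
)

// decodeStorageKey splits a plain composite storage-changeset key into
// its address, incarnation and storage-location parts.
func decodeStorageKey(dbKey []byte) (addr [addrLen]byte, incarnation uint64, loc [hashLen]byte) {
	copy(addr[:], dbKey[:addrLen])
	incarnation = binary.BigEndian.Uint64(dbKey[addrLen : addrLen+incarnationLen])
	copy(loc[:], dbKey[addrLen+incarnationLen:])
	return addr, incarnation, loc
}

func main() {
	// Build a fake 60-byte key: address | incarnation | location.
	key := make([]byte, addrLen+incarnationLen+hashLen)
	key[0] = 0xde                                // first byte of the address
	binary.BigEndian.PutUint64(key[addrLen:], 1) // incarnation = 1
	key[addrLen+incarnationLen] = 0xbe           // first byte of the location
	addr, inc, loc := decodeStorageKey(key)
	fmt.Printf("addr=%x... incarnation=%d location=%x...\n", addr[:2], inc, loc[:2])
}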
@@ -191,14 +281,14 @@ func (fv *ForkValidator) Clear() {
}

// ClearWithUnwind wipes out current extending fork data and notifies the txpool.
-func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx) {
+func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx, accumulator *shards.Accumulator, c shards.StateChangeConsumer) {
sb, ok := fv.sideForksBlock[fv.extendingForkHeadHash]
// If we did not flush the fork state, then we need to notify the txpool through unwind.
if fv.extendingFork != nil && fv.extendingForkHeadHash != (common.Hash{}) && ok {
fv.extendingFork.UpdateTxn(tx)
// this will call unwind of extending fork to notify txpool of reverting transactions.
-	if err := fv.validatePayload(fv.extendingFork, nil, nil, sb.header.Number.Uint64()-1, nil, nil); err != nil {
-		log.Warn("Could not clean payload", "err", err)
+	if err := fv.rewindAccumulator(sb.header.Number.Uint64()-1, accumulator, c); err != nil {
+		log.Warn("could not notify txpool of invalid side fork", "err", err)
}
fv.extendingFork.Rollback()
}
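Taken together, ClearWithUnwind now reverts the side fork's state by accumulating the changes and sending one batch to the state-change consumer instead of re-running validatePayload. The stand-alone sketch below (stub types, not erigon's shards package, with a simplified SendAndReset signature) illustrates that accumulate-then-send flow: open a change batch at the unwind point, record the reverted account and storage writes, then hand the whole batch to the consumer, which is how the txpool learns about the discarded fork.

package main

import "fmt"

// change is a stand-in for one reverted state write.
type change struct {
	kind string
	key  string
}

// accumulator batches reverted writes until SendAndReset hands them off.
type accumulator struct{ pending []change }

func (a *accumulator) StartChange(unwindPoint uint64) { a.pending = a.pending[:0] }
func (a *accumulator) ChangeAccount(addr string)      { a.pending = append(a.pending, change{"account", addr}) }
func (a *accumulator) DeleteAccount(addr string)      { a.pending = append(a.pending, change{"delete", addr}) }
func (a *accumulator) ChangeStorage(addr, loc string) { a.pending = append(a.pending, change{"storage", addr + ":" + loc}) }

// consumer is the receiving side; in erigon this path ends at the txpool.
type consumer interface{ SendStateChanges(batch []change) }

type printConsumer struct{}

func (printConsumer) SendStateChanges(batch []change) {
	fmt.Println("txpool notified of", len(batch), "reverted changes")
}

// SendAndReset flushes the accumulated batch in one call and clears it.
func (a *accumulator) SendAndReset(c consumer) {
	c.SendStateChanges(a.pending)
	a.pending = nil
}

func main() {
	acc := &accumulator{}
	acc.StartChange(100) // block the side fork is unwound to
	acc.ChangeAccount("0xaa")
	acc.DeleteAccount("0xbb")
	acc.ChangeStorage("0xaa", "0x01")
	acc.SendAndReset(printConsumer{})
}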
