Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[erigon2.2] State reconstitution prototype #4508

Merged
merged 32 commits into from
Jul 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 9 additions & 359 deletions cmd/hack/hack.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion cmd/rpcdaemon22/commands/eth_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"sort"
"time"

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv"
Expand Down Expand Up @@ -69,6 +70,7 @@ func (api *BaseAPI) getReceipts(ctx context.Context, tx kv.Tx, chainConfig *para

// GetLogs implements eth_getLogs. Returns an array of logs matching a given filter object.
func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([]*types.Log, error) {
start := time.Now()
var begin, end uint64
logs := []*types.Log{}

Expand Down Expand Up @@ -215,7 +217,8 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
}
logs = append(logs, filtered...)
}

stats := api._agg.GetAndResetStats()
log.Info("Finished", "duration", time.Since(start), "history queries", stats.HistoryQueries, "ef search duration", stats.EfSearchTime)
return logs, nil
}

Expand Down
211 changes: 116 additions & 95 deletions cmd/state/commands/erigon22.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,12 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
}
defer historyTx.Rollback()
stateDbPath := path.Join(datadir, "statedb")
if block == 0 {
if _, err = os.Stat(stateDbPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(stateDbPath); err != nil {
if _, err = os.Stat(stateDbPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(stateDbPath); err != nil {
return err
}
db, err2 := kv2.NewMDBX(logger).Path(stateDbPath).WriteMap().Open()
if err2 != nil {
Expand All @@ -96,18 +94,6 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
defer db.Close()

aggPath := filepath.Join(datadir, "erigon22")
if block == 0 {
if _, err = os.Stat(aggPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(aggPath); err != nil {
return err
}
if err = os.Mkdir(aggPath, os.ModePerm); err != nil {
return err
}
}

var rwTx kv.RwTx
defer func() {
Expand All @@ -124,9 +110,11 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
return fmt.Errorf("create aggregator: %w", err3)
}
defer agg.Close()
startTxNum := agg.EndTxNumMinimax()
fmt.Printf("Max txNum in files: %d\n", startTxNum)

interrupt := false
if block == 0 {
if startTxNum == 0 {
_, genesisIbs, err := genesis.ToBlock()
if err != nil {
return err
Expand Down Expand Up @@ -176,6 +164,16 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
engine := initConsensusEngine(chainConfig, logger, allSnapshots)

getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
if err != nil {
panic(err)
}
return h
}
readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx}
writeWrapper := &WriterWrapper22{w: agg}

for !interrupt {
blockNum++
trace = traceBlock > 0 && blockNum == uint64(traceBlock)
Expand All @@ -192,40 +190,12 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
log.Info("history: block is nil", "block", blockNum)
break
}
if blockNum <= block {
// Skip that block, but increase txNum
txNum += uint64(len(b.Transactions())) + 2 // Pre and Post block transaction
continue
}
agg.SetTx(rwTx)
agg.SetTxNum(txNum)

readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx, blockNum: blockNum}
writeWrapper := &WriterWrapper22{w: agg, blockNum: blockNum}
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
if err != nil {
panic(err)
}
return h
}

txNum++ // Pre-block transaction
agg.SetTxNum(txNum)

if txNum, _, err = processBlock22(trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil {
if txNum, _, err = processBlock22(startTxNum, trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil {
return fmt.Errorf("processing block %d: %w", blockNum, err)
}
agg.SetTxNum(txNum)
if err := agg.FinishTx(); err != nil {
return fmt.Errorf("failed to finish tx: %w", err)
}
if trace {
fmt.Printf("FinishTx called for %d block %d\n", txNum, blockNum)
}

txNum++ // Post-block transaction
agg.SetTxNum(txNum)

// Check for interrupts
select {
Expand Down Expand Up @@ -255,6 +225,7 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
}
}
agg.SetTx(rwTx)
readWrapper.roTx = rwTx
}
}

Expand Down Expand Up @@ -303,7 +274,7 @@ func (s *stat22) delta(aStats libstate.FilesStats, blockNum uint64) *stat22 {
return s
}

func processBlock22(trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *WriterWrapper22, chainConfig *params.ChainConfig,
func processBlock22(startTxNum uint64, trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *WriterWrapper22, chainConfig *params.ChainConfig,
engine consensus.Engine, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config,
) (uint64, types.Receipts, error) {
defer blockExecutionTimer.UpdateDuration(time.Now())
Expand All @@ -313,82 +284,119 @@ func processBlock22(trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *Writ
gp := new(core.GasPool).AddGas(block.GasLimit())
usedGas := new(uint64)
var receipts types.Receipts
daoBlock := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
rules := chainConfig.Rules(block.NumberU64())
txNum := txNumStart
ww.w.SetTxNum(txNum)
rw.blockNum = block.NumberU64()
ww.blockNum = block.NumberU64()

for i, tx := range block.Transactions() {
daoFork := txNum >= startTxNum && chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
if daoFork {
ibs := state.New(rw)
if daoBlock {
misc.ApplyDAOHardFork(ibs)
daoBlock = false
// TODO Actually add tracing to the DAO related accounts
misc.ApplyDAOHardFork(ibs)
if err := ibs.FinalizeTx(rules, ww); err != nil {
return 0, nil, err
}
ibs.Prepare(tx.Hash(), block.Hash(), i)
ct := NewCallTracer()
vmConfig.Tracer = ct
receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil)
if err != nil {
return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err)
if err := ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish daoFork failed: %w", err)
}
for from := range ct.froms {
if err := ww.w.AddTraceFrom(from[:]); err != nil {
return 0, nil, err
}

txNum++ // Pre-block transaction
ww.w.SetTxNum(txNum)

for i, tx := range block.Transactions() {
if txNum >= startTxNum {
ibs := state.New(rw)
ibs.Prepare(tx.Hash(), block.Hash(), i)
ct := NewCallTracer()
vmConfig.Tracer = ct
receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil)
if err != nil {
return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err)
}
}
for to := range ct.tos {
if err := ww.w.AddTraceTo(to[:]); err != nil {
return 0, nil, err
for from := range ct.froms {
if err := ww.w.AddTraceFrom(from[:]); err != nil {
return 0, nil, err
}
}
}
receipts = append(receipts, receipt)
for _, log := range receipt.Logs {
if err = ww.w.AddLogAddr(log.Address[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err)
for to := range ct.tos {
if err := ww.w.AddTraceTo(to[:]); err != nil {
return 0, nil, err
}
}
for _, topic := range log.Topics {
if err = ww.w.AddLogTopic(topic[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err)
receipts = append(receipts, receipt)
for _, log := range receipt.Logs {
if err = ww.w.AddLogAddr(log.Address[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err)
}
for _, topic := range log.Topics {
if err = ww.w.AddLogTopic(topic[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err)
}
}
}
}
if err = ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err)
}
if trace {
fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash())
if err = ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err)
}
if trace {
fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash())
}
}
txNum++
ww.w.SetTxNum(txNum)
}

ibs := state.New(rw)
if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding coinbase trace: %w", err)
}
for _, uncle := range block.Uncles() {
if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding uncle trace: %w", err)
if txNum >= startTxNum && chainConfig.IsByzantium(block.NumberU64()) {
receiptSha := types.DeriveSha(receipts)
if receiptSha != block.ReceiptHash() {
fmt.Printf("mismatched receipt headers for block %d\n", block.NumberU64())
for j, receipt := range receipts {
fmt.Printf("tx %d, used gas: %d\n", j, receipt.GasUsed)
}
}
}

// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
if _, _, _, err := engine.FinalizeAndAssemble(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil, nil); err != nil {
return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err)
}
if txNum >= startTxNum {
ibs := state.New(rw)
if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding coinbase trace: %w", err)
}
for _, uncle := range block.Uncles() {
if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding uncle trace: %w", err)
}
}

// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
if _, _, err := engine.Finalize(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil); err != nil {
return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err)
}

if err := ibs.CommitBlock(rules, ww); err != nil {
return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err)
}

if err := ibs.CommitBlock(rules, ww); err != nil {
return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err)
if err := ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("failed to finish tx: %w", err)
}
if trace {
fmt.Printf("FinishTx called for %d block %d\n", txNum, block.NumberU64())
}
}

txNum++ // Post-block transaction
ww.w.SetTxNum(txNum)

return txNum, receipts, nil
}

// Implements StateReader and StateWriter
type ReaderWrapper22 struct {
blockNum uint64
roTx kv.Tx
r *libstate.Aggregator
blockNum uint64
}

type WriterWrapper22 struct {
Expand Down Expand Up @@ -431,6 +439,9 @@ func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Ac
if incBytes > 0 {
a.Incarnation = bytesToUint64(enc[pos : pos+incBytes])
}
if rw.blockNum == 10264901 {
fmt.Printf("block %d ReadAccount [%x] => {Balance: %d, Nonce: %d}\n", rw.blockNum, address, &a.Balance, a.Nonce)
}
return &a, nil
}

Expand All @@ -439,9 +450,15 @@ func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnatio
if err != nil {
return nil, err
}
if rw.blockNum == 10264901 {
fmt.Printf("block %d ReadStorage [%x] [%x] => [%x]\n", rw.blockNum, address, *key, enc)
}
if enc == nil {
return nil, nil
}
if len(enc) == 1 && enc[0] == 0 {
return nil, nil
}
return enc, nil
}

Expand Down Expand Up @@ -541,6 +558,10 @@ func (ww *WriterWrapper22) DeleteAccount(address common.Address, original *accou
}

func (ww *WriterWrapper22) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
trace := fmt.Sprintf("%x", address) == "000000000000006f6502b7f2bbac8c30a3f67e9a"
if trace {
fmt.Printf("block %d WriteAccountStorage [%x] [%x] => [%x]\n", ww.blockNum, address, *key, value.Bytes())
}
if err := ww.w.WriteAccountStorage(address.Bytes(), key.Bytes(), value.Bytes()); err != nil {
return err
}
Expand Down
20 changes: 14 additions & 6 deletions cmd/state/commands/history22.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
readWrapper.SetTrace(blockNum == uint64(traceBlock))
}
writeWrapper := state.NewNoopWriter()
txNum++ // Pre block transaction
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
if err != nil {
Expand Down Expand Up @@ -227,15 +226,24 @@ func runHistory22(trace bool, blockNum, txNumStart uint64, hw *state.HistoryRead
gp := new(core.GasPool).AddGas(block.GasLimit())
usedGas := new(uint64)
var receipts types.Receipts
daoBlock := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
rules := chainConfig.Rules(block.NumberU64())
txNum := txNumStart
hw.SetTxNum(txNum)
daoFork := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
if daoFork {
ibs := state.New(hw)
misc.ApplyDAOHardFork(ibs)
if err := ibs.FinalizeTx(rules, ww); err != nil {
return 0, nil, err
}
if err := hw.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish dao fork failed: %w", err)
}
}
txNum++ // Pre block transaction
for i, tx := range block.Transactions() {
hw.SetTxNum(txNum)
ibs := state.New(hw)
if daoBlock {
misc.ApplyDAOHardFork(ibs)
daoBlock = false
}
ibs.Prepare(tx.Hash(), block.Hash(), i)
receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil)
if err != nil {
Expand Down
Loading