Skip to content

Commit

Permalink
[erigon2.2] State reconstitution prototype (#4508)
Browse files Browse the repository at this point in the history
* reconstitution

* Add history access without state function

* More on state reconstitution

* More on state recon

* More on state recon

* More

* More

* support dao fork

* More on state reconstitution

* Update to erigon-lib

* More

* Added genesis block and filling with history

* update

* Genesis works

* Start on parallel

* Preparation for parallel reconstitution, stats for EfSearch

* continue with parallel work

* Fix history reader

* Remove time measurements

* Fixes

* Fixes and UX improvements

* Fixes

* More tracing

* More fixes

* More fixes

* Fix code size

* Update to latest erigon-lib

* Fix for dao fork

* Remove hacks

* Update to erigon-lib, fix lint

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
  • Loading branch information
3 people committed Jul 2, 2022
1 parent 84c3cdc commit 8599dce
Show file tree
Hide file tree
Showing 13 changed files with 1,401 additions and 479 deletions.
368 changes: 9 additions & 359 deletions cmd/hack/hack.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion cmd/rpcdaemon22/commands/eth_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"sort"
"time"

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv"
Expand Down Expand Up @@ -69,6 +70,7 @@ func (api *BaseAPI) getReceipts(ctx context.Context, tx kv.Tx, chainConfig *para

// GetLogs implements eth_getLogs. Returns an array of logs matching a given filter object.
func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([]*types.Log, error) {
start := time.Now()
var begin, end uint64
logs := []*types.Log{}

Expand Down Expand Up @@ -215,7 +217,8 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
}
logs = append(logs, filtered...)
}

stats := api._agg.GetAndResetStats()
log.Info("Finished", "duration", time.Since(start), "history queries", stats.HistoryQueries, "ef search duration", stats.EfSearchTime)
return logs, nil
}

Expand Down
211 changes: 116 additions & 95 deletions cmd/state/commands/erigon22.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,12 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
}
defer historyTx.Rollback()
stateDbPath := path.Join(datadir, "statedb")
if block == 0 {
if _, err = os.Stat(stateDbPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(stateDbPath); err != nil {
if _, err = os.Stat(stateDbPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(stateDbPath); err != nil {
return err
}
db, err2 := kv2.NewMDBX(logger).Path(stateDbPath).WriteMap().Open()
if err2 != nil {
Expand All @@ -96,18 +94,6 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
defer db.Close()

aggPath := filepath.Join(datadir, "erigon22")
if block == 0 {
if _, err = os.Stat(aggPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(aggPath); err != nil {
return err
}
if err = os.Mkdir(aggPath, os.ModePerm); err != nil {
return err
}
}

var rwTx kv.RwTx
defer func() {
Expand All @@ -124,9 +110,11 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
return fmt.Errorf("create aggregator: %w", err3)
}
defer agg.Close()
startTxNum := agg.EndTxNumMinimax()
fmt.Printf("Max txNum in files: %d\n", startTxNum)

interrupt := false
if block == 0 {
if startTxNum == 0 {
_, genesisIbs, err := genesis.ToBlock()
if err != nil {
return err
Expand Down Expand Up @@ -176,6 +164,16 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
engine := initConsensusEngine(chainConfig, logger, allSnapshots)

getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
if err != nil {
panic(err)
}
return h
}
readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx}
writeWrapper := &WriterWrapper22{w: agg}

for !interrupt {
blockNum++
trace = traceBlock > 0 && blockNum == uint64(traceBlock)
Expand All @@ -192,40 +190,12 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
log.Info("history: block is nil", "block", blockNum)
break
}
if blockNum <= block {
// Skip that block, but increase txNum
txNum += uint64(len(b.Transactions())) + 2 // Pre and Post block transaction
continue
}
agg.SetTx(rwTx)
agg.SetTxNum(txNum)

readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx, blockNum: blockNum}
writeWrapper := &WriterWrapper22{w: agg, blockNum: blockNum}
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
if err != nil {
panic(err)
}
return h
}

txNum++ // Pre-block transaction
agg.SetTxNum(txNum)

if txNum, _, err = processBlock22(trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil {
if txNum, _, err = processBlock22(startTxNum, trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil {
return fmt.Errorf("processing block %d: %w", blockNum, err)
}
agg.SetTxNum(txNum)
if err := agg.FinishTx(); err != nil {
return fmt.Errorf("failed to finish tx: %w", err)
}
if trace {
fmt.Printf("FinishTx called for %d block %d\n", txNum, blockNum)
}

txNum++ // Post-block transaction
agg.SetTxNum(txNum)

// Check for interrupts
select {
Expand Down Expand Up @@ -255,6 +225,7 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
}
}
agg.SetTx(rwTx)
readWrapper.roTx = rwTx
}
}

Expand Down Expand Up @@ -303,7 +274,7 @@ func (s *stat22) delta(aStats libstate.FilesStats, blockNum uint64) *stat22 {
return s
}

func processBlock22(trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *WriterWrapper22, chainConfig *params.ChainConfig,
func processBlock22(startTxNum uint64, trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *WriterWrapper22, chainConfig *params.ChainConfig,
engine consensus.Engine, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config,
) (uint64, types.Receipts, error) {
defer blockExecutionTimer.UpdateDuration(time.Now())
Expand All @@ -313,82 +284,119 @@ func processBlock22(trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *Writ
gp := new(core.GasPool).AddGas(block.GasLimit())
usedGas := new(uint64)
var receipts types.Receipts
daoBlock := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
rules := chainConfig.Rules(block.NumberU64())
txNum := txNumStart
ww.w.SetTxNum(txNum)
rw.blockNum = block.NumberU64()
ww.blockNum = block.NumberU64()

for i, tx := range block.Transactions() {
daoFork := txNum >= startTxNum && chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
if daoFork {
ibs := state.New(rw)
if daoBlock {
misc.ApplyDAOHardFork(ibs)
daoBlock = false
// TODO Actually add tracing to the DAO related accounts
misc.ApplyDAOHardFork(ibs)
if err := ibs.FinalizeTx(rules, ww); err != nil {
return 0, nil, err
}
ibs.Prepare(tx.Hash(), block.Hash(), i)
ct := NewCallTracer()
vmConfig.Tracer = ct
receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil)
if err != nil {
return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err)
if err := ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish daoFork failed: %w", err)
}
for from := range ct.froms {
if err := ww.w.AddTraceFrom(from[:]); err != nil {
return 0, nil, err
}

txNum++ // Pre-block transaction
ww.w.SetTxNum(txNum)

for i, tx := range block.Transactions() {
if txNum >= startTxNum {
ibs := state.New(rw)
ibs.Prepare(tx.Hash(), block.Hash(), i)
ct := NewCallTracer()
vmConfig.Tracer = ct
receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil)
if err != nil {
return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err)
}
}
for to := range ct.tos {
if err := ww.w.AddTraceTo(to[:]); err != nil {
return 0, nil, err
for from := range ct.froms {
if err := ww.w.AddTraceFrom(from[:]); err != nil {
return 0, nil, err
}
}
}
receipts = append(receipts, receipt)
for _, log := range receipt.Logs {
if err = ww.w.AddLogAddr(log.Address[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err)
for to := range ct.tos {
if err := ww.w.AddTraceTo(to[:]); err != nil {
return 0, nil, err
}
}
for _, topic := range log.Topics {
if err = ww.w.AddLogTopic(topic[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err)
receipts = append(receipts, receipt)
for _, log := range receipt.Logs {
if err = ww.w.AddLogAddr(log.Address[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err)
}
for _, topic := range log.Topics {
if err = ww.w.AddLogTopic(topic[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err)
}
}
}
}
if err = ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err)
}
if trace {
fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash())
if err = ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err)
}
if trace {
fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash())
}
}
txNum++
ww.w.SetTxNum(txNum)
}

ibs := state.New(rw)
if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding coinbase trace: %w", err)
}
for _, uncle := range block.Uncles() {
if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding uncle trace: %w", err)
if txNum >= startTxNum && chainConfig.IsByzantium(block.NumberU64()) {
receiptSha := types.DeriveSha(receipts)
if receiptSha != block.ReceiptHash() {
fmt.Printf("mismatched receipt headers for block %d\n", block.NumberU64())
for j, receipt := range receipts {
fmt.Printf("tx %d, used gas: %d\n", j, receipt.GasUsed)
}
}
}

// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
if _, _, _, err := engine.FinalizeAndAssemble(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil, nil); err != nil {
return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err)
}
if txNum >= startTxNum {
ibs := state.New(rw)
if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding coinbase trace: %w", err)
}
for _, uncle := range block.Uncles() {
if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding uncle trace: %w", err)
}
}

// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
if _, _, err := engine.Finalize(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil); err != nil {
return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err)
}

if err := ibs.CommitBlock(rules, ww); err != nil {
return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err)
}

if err := ibs.CommitBlock(rules, ww); err != nil {
return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err)
if err := ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("failed to finish tx: %w", err)
}
if trace {
fmt.Printf("FinishTx called for %d block %d\n", txNum, block.NumberU64())
}
}

txNum++ // Post-block transaction
ww.w.SetTxNum(txNum)

return txNum, receipts, nil
}

// Implements StateReader and StateWriter
type ReaderWrapper22 struct {
blockNum uint64
roTx kv.Tx
r *libstate.Aggregator
blockNum uint64
}

type WriterWrapper22 struct {
Expand Down Expand Up @@ -431,6 +439,9 @@ func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Ac
if incBytes > 0 {
a.Incarnation = bytesToUint64(enc[pos : pos+incBytes])
}
if rw.blockNum == 10264901 {
fmt.Printf("block %d ReadAccount [%x] => {Balance: %d, Nonce: %d}\n", rw.blockNum, address, &a.Balance, a.Nonce)
}
return &a, nil
}

Expand All @@ -439,9 +450,15 @@ func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnatio
if err != nil {
return nil, err
}
if rw.blockNum == 10264901 {
fmt.Printf("block %d ReadStorage [%x] [%x] => [%x]\n", rw.blockNum, address, *key, enc)
}
if enc == nil {
return nil, nil
}
if len(enc) == 1 && enc[0] == 0 {
return nil, nil
}
return enc, nil
}

Expand Down Expand Up @@ -541,6 +558,10 @@ func (ww *WriterWrapper22) DeleteAccount(address common.Address, original *accou
}

func (ww *WriterWrapper22) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
trace := fmt.Sprintf("%x", address) == "000000000000006f6502b7f2bbac8c30a3f67e9a"
if trace {
fmt.Printf("block %d WriteAccountStorage [%x] [%x] => [%x]\n", ww.blockNum, address, *key, value.Bytes())
}
if err := ww.w.WriteAccountStorage(address.Bytes(), key.Bytes(), value.Bytes()); err != nil {
return err
}
Expand Down
20 changes: 14 additions & 6 deletions cmd/state/commands/history22.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
readWrapper.SetTrace(blockNum == uint64(traceBlock))
}
writeWrapper := state.NewNoopWriter()
txNum++ // Pre block transaction
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
if err != nil {
Expand Down Expand Up @@ -227,15 +226,24 @@ func runHistory22(trace bool, blockNum, txNumStart uint64, hw *state.HistoryRead
gp := new(core.GasPool).AddGas(block.GasLimit())
usedGas := new(uint64)
var receipts types.Receipts
daoBlock := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
rules := chainConfig.Rules(block.NumberU64())
txNum := txNumStart
hw.SetTxNum(txNum)
daoFork := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
if daoFork {
ibs := state.New(hw)
misc.ApplyDAOHardFork(ibs)
if err := ibs.FinalizeTx(rules, ww); err != nil {
return 0, nil, err
}
if err := hw.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish dao fork failed: %w", err)
}
}
txNum++ // Pre block transaction
for i, tx := range block.Transactions() {
hw.SetTxNum(txNum)
ibs := state.New(hw)
if daoBlock {
misc.ApplyDAOHardFork(ibs)
daoBlock = false
}
ibs.Prepare(tx.Hash(), block.Hash(), i)
receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil)
if err != nil {
Expand Down
Loading

0 comments on commit 8599dce

Please sign in to comment.