From 8599dceec7c6e74126de1b63b9a4538f8435aad6 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Sat, 2 Jul 2022 20:48:42 +0100 Subject: [PATCH] [erigon2.2] State reconstitution prototype (#4508) * reconstitution * Add history access without state function * More on state reconstitution * More on state recon * More on state recon * More * More * support dao fork * More on state reconstitution * Update to erigon-lib * More * Added genesis block and filling with history * update * Genesis works * Start on parallel * Preparation for parallel reconstitution, stats for EfSearch * continue with parallel work * Fix history reader * Remove time measurements * Fixes * Fixes and UX improvements * Fixes * More tracing * More fixes * More fixes * Fix code size * Update to latest erigon-lib * Fix for dao fork * Remove hacks * Update to erigon-lib, fix lint Co-authored-by: Alexey Sharp Co-authored-by: Alex Sharp --- cmd/hack/hack.go | 368 +-------- cmd/rpcdaemon22/commands/eth_receipts.go | 5 +- cmd/state/commands/erigon22.go | 211 ++--- cmd/state/commands/history22.go | 20 +- cmd/state/commands/state_recon.go | 775 ++++++++++++++++++ ...istoryReader22.go => history_reader_22.go} | 6 +- core/state/history_reader_nostate.go | 210 +++++ core/state/intra_block_state.go | 8 +- core/state/state_object.go | 11 +- core/state/state_recon_writer.go | 257 ++++++ go.mod | 2 +- go.sum | 4 +- turbo/snapshotsync/block_reader.go | 3 + 13 files changed, 1401 insertions(+), 479 deletions(-) create mode 100644 cmd/state/commands/state_recon.go rename core/state/{HistoryReader22.go => history_reader_22.go} (97%) create mode 100644 core/state/history_reader_nostate.go create mode 100644 core/state/state_recon_writer.go diff --git a/cmd/hack/hack.go b/cmd/hack/hack.go index ce4fbf4ae3b..8434f6b3ebe 100644 --- a/cmd/hack/hack.go +++ b/cmd/hack/hack.go @@ -13,10 +13,8 @@ import ( _ "net/http/pprof" //nolint:gosec "os" "path/filepath" - "regexp" "runtime/pprof" "sort" - "strconv" "strings" "time" @@ -25,8 +23,6 @@ import ( "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon-lib/recsplit" - "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" "golang.org/x/exp/slices" hackdb "github.com/ledgerwatch/erigon/cmd/hack/db" @@ -994,180 +990,6 @@ func devTx(chaindata string) error { return nil } -func mainnetGenesis() error { - g := core.DefaultGenesisBlock() - _, _, err := g.ToBlock() - if err != nil { - return err - } - return nil -} - -func junkdb() error { - dir, err := os.MkdirTemp(".", "junk") - if err != nil { - return fmt.Errorf("creating temp dir for db size test: %w", err) - } - //defer os.RemoveAll(dir) - oneBucketCfg := make(kv.TableCfg) - oneBucketCfg["t"] = kv.TableCfgItem{} - var db kv.RwDB - db, err = mdbx.NewMDBX(log.New()).Path(dir).WithTablessCfg(func(kv.TableCfg) kv.TableCfg { - return oneBucketCfg - }).Open() - if err != nil { - return fmt.Errorf("opening database: %w", err) - } - defer db.Close() - for i := 0; i < 1_000_000; i++ { - if err = db.Update(context.Background(), func(tx kv.RwTx) error { - c, e := tx.RwCursor("t") - if e != nil { - return e - } - defer c.Close() - for j := 0; j < 1_000_000_000; j++ { - var b [8]byte - binary.BigEndian.PutUint64(b[:], uint64(i*1_000_000_000+j)) - if e = c.Append(b[:], b[:]); e != nil { - return e - } - } - return nil - }); err != nil { - return err - } - log.Info("Appended records", "bln", i+1) - } - return nil -} - -func histStats() error { - files, err := os.ReadDir(".") - if err != nil { - return err - } - endBlockMap := map[uint64]struct{}{} - pageMap := map[string]map[uint64]uint64{} - keys := []string{"ahistory", "shistory", "abitmap", "sbitmap"} - for _, k := range keys { - pageMap[k] = map[uint64]uint64{} - } - re := regexp.MustCompile(`(ahistory|shistory|abitmap|sbitmap).([0-9]+).txt`) - for _, f := range files { - name := f.Name() - subs := re.FindStringSubmatch(name) - if len(subs) != 3 { - if len(subs) != 0 { - log.Warn("File ignored by changes scan, more than 3 submatches", "name", name, "submatches", len(subs)) - } - continue - } - var endBlock uint64 - if endBlock, err = strconv.ParseUint(subs[2], 10, 64); err != nil { - return err - } - endBlockMap[endBlock] = struct{}{} - var ff *os.File - if ff, err = os.Open(name); err != nil { - return err - } - scanner := bufio.NewScanner(ff) - // Skip 5 lines - for i := 0; i < 5; i++ { - scanner.Scan() - } - var totalPages uint64 - for i := 0; i < 3; i++ { - scanner.Scan() - line := scanner.Text() - p := strings.Index(line, ": ") - var pages uint64 - if pages, err = strconv.ParseUint(line[p+2:], 10, 64); err != nil { - return err - } - totalPages += pages - } - pageMap[subs[1]][endBlock] = totalPages - ff.Close() - } - var endBlocks []uint64 - for endBlock := range endBlockMap { - endBlocks = append(endBlocks, endBlock) - } - slices.Sort(endBlocks) - var lastEndBlock uint64 - fmt.Printf("endBlock,%s\n", strings.Join(keys, ",")) - for _, endBlock := range endBlocks { - fmt.Printf("%d", endBlock) - for _, k := range keys { - if lastEndBlock == 0 { - fmt.Printf(",%.3f", float64(pageMap[k][endBlock])/256.0/1024.0) - } else { - fmt.Printf(",%.3f", float64(pageMap[k][endBlock]-pageMap[k][lastEndBlock])/256.0/1024.0) - } - } - fmt.Printf("\n") - lastEndBlock = endBlock - } - return nil -} - -func histStat1(chaindata string) error { - files, err := os.ReadDir(chaindata) - if err != nil { - return err - } - endBlockMap := map[uint64]struct{}{} - sizeMap := map[string]map[uint64]uint64{} - keys := []string{"ahistory", "shistory", "chistory", "abitmap", "sbitmap", "cbitmap"} - for _, k := range keys { - sizeMap[k] = map[uint64]uint64{} - } - re := regexp.MustCompile(fmt.Sprintf("(%s).([0-9]+)-([0-9]+).(dat|idx)", strings.Join(keys, "|"))) - for _, f := range files { - name := f.Name() - subs := re.FindStringSubmatch(name) - if len(subs) != 5 { - if len(subs) != 0 { - log.Warn("File ignored by changes scan, more than 5 submatches", "name", name, "submatches", len(subs)) - } - continue - } - var startBlock uint64 - if startBlock, err = strconv.ParseUint(subs[2], 10, 64); err != nil { - return err - } - var endBlock uint64 - if endBlock, err = strconv.ParseUint(subs[3], 10, 64); err != nil { - return err - } - if endBlock-startBlock < 499_999 { - continue - } - endBlockMap[endBlock] = struct{}{} - if fileInfo, err1 := os.Stat(filepath.Join(chaindata, name)); err1 == nil { - sizeMap[subs[1]][endBlock] += uint64(fileInfo.Size()) - } else { - return err1 - } - } - var endBlocks []uint64 - for endBlock := range endBlockMap { - endBlocks = append(endBlocks, endBlock) - } - slices.Sort(endBlocks) - fmt.Printf("endBlock,%s\n", strings.Join(keys, ",")) - for _, endBlock := range endBlocks { - fmt.Printf("%d", endBlock) - for _, k := range keys { - fmt.Printf(",%.3f", float64(sizeMap[k][endBlock])/1024.0/1024.0/1024.0) - } - fmt.Printf("\n") - } - return nil -} - func chainConfig(name string) error { var chainConfig *params.ChainConfig switch name { @@ -1270,81 +1092,6 @@ func findPrefix(chaindata string) error { return nil } -func readEf(file string, addr []byte) error { - datPath := file + ".dat" - idxPath := file + ".idx" - index, err := recsplit.OpenIndex(idxPath) - if err != nil { - return err - } - defer index.Close() - decomp, err := compress.NewDecompressor(datPath) - if err != nil { - return err - } - defer decomp.Close() - indexReader := recsplit.NewIndexReader(index) - offset := indexReader.Lookup(addr) - g := decomp.MakeGetter() - g.Reset(offset) - word, _ := g.Next(nil) - fmt.Printf("%x\n", word) - word, _ = g.NextUncompressed() - ef, _ := eliasfano32.ReadEliasFano(word) - it := ef.Iterator() - line := 0 - for it.HasNext() { - fmt.Printf("%d ", it.Next()) - line++ - if line%20 == 0 { - fmt.Printf("\n") - } - } - fmt.Printf("\n") - return nil -} - -func readBodies(file string) error { - decomp, err := compress.NewDecompressor(file) - if err != nil { - return err - } - defer decomp.Close() - gg := decomp.MakeGetter() - buf, _ := gg.Next(nil) - firstBody := &types.BodyForStorage{} - if err = rlp.DecodeBytes(buf, firstBody); err != nil { - return err - } - //var blockFrom uint64 = 12300000 - //var blockTo uint64 = 12400000 - firstTxID := firstBody.BaseTxId - - lastBody := new(types.BodyForStorage) - i := uint64(0) - for gg.HasNext() { - i++ - //if i == blockTo-blockFrom-1 { - //fmt.Printf("lastBody\n") - buf, _ = gg.Next(buf[:0]) - if err = rlp.DecodeBytes(buf, lastBody); err != nil { - return err - } - //if gg.HasNext() { - // panic(1) - //} - //} else { - if gg.HasNext() { - gg.Skip() - } - //} - } - expectedCount := lastBody.BaseTxId + uint64(lastBody.TxAmount) - firstBody.BaseTxId - fmt.Printf("i=%d, firstBody=%v, lastBody=%v, firstTxID=%d, expectedCount=%d\n", i, firstBody, lastBody, firstTxID, expectedCount) - - return nil -} - func findLogs(chaindata string, block uint64, blockTotal uint64) error { db := mdbx.MustOpen(chaindata) defer db.Close() @@ -1416,106 +1163,21 @@ func findLogs(chaindata string, block uint64, blockTotal uint64) error { return nil } -func decompress(chaindata string) error { - dir := filepath.Join(chaindata, "erigon22") - files, err := os.ReadDir(dir) - if err != nil { - return err - } - for _, f := range files { - name := f.Name() - if !strings.HasSuffix(name, ".dat") { - continue - } - if err = decompressAll(dir, filepath.Join(dir, name), strings.Contains(name, "code")); err != nil { - return err - } - } - // Re-read directory - files, err = os.ReadDir(dir) - if err != nil { - return err - } - for _, f := range files { - name := f.Name() - if !strings.HasSuffix(name, ".d") { - continue - } - if err = os.Rename(filepath.Join(dir, name), filepath.Join(dir, name[:len(name)-2])); err != nil { - return err - } - } - return nil -} - -func decompressAll(dir string, filename string, onlyKeys bool) error { - fmt.Printf("decompress file %s, onlyKeys=%t\n", filename, onlyKeys) +func iterate(filename string) error { d, err := compress.NewDecompressor(filename) if err != nil { return err } defer d.Close() - newDatPath := filename + ".d" - comp, err := compress.NewCompressor(context.Background(), "comp", newDatPath, dir, compress.MinPatternScore, 1, log.LvlInfo) - if err != nil { - return err - } - defer comp.Close() - idxPath := filename[:len(filename)-3] + "idx" - idx, err := recsplit.OpenIndex(idxPath) - if err != nil { - return err - } - defer idx.Close() g := d.MakeGetter() - var isKey bool - var word []byte + var buf, bufv []byte for g.HasNext() { - word, _ = g.Next(word[:0]) - if onlyKeys && !isKey { - if err := comp.AddWord(word); err != nil { - return err - } - } else { - if err := comp.AddUncompressedWord(word); err != nil { - return err - } + buf, _ = g.Next(buf[:0]) + bufv, _ = g.Next(bufv[:0]) + s := fmt.Sprintf("%x", buf) + if strings.HasPrefix(s, "000000000000006f6502b7f2bbac8c30a3f67e9a") { + fmt.Printf("%s [%x]\n", s, bufv) } - isKey = !isKey - } - if err = comp.Compress(); err != nil { - return err - } - comp.Close() - offsets := idx.ExtractOffsets() - newD, err := compress.NewDecompressor(newDatPath) - if err != nil { - return err - } - defer newD.Close() - newG := newD.MakeGetter() - g.Reset(0) - offset := uint64(0) - newOffset := uint64(0) - for g.HasNext() { - offsets[offset] = newOffset - offset = g.Skip() - newOffset = newG.Skip() - } - newIdxPath := idxPath + ".d" - f, err := os.Create(newIdxPath) - if err != nil { - return err - } - w := bufio.NewWriter(f) - if err = idx.RewriteWithOffsets(w, offsets); err != nil { - return err - } - if err = w.Flush(); err != nil { - return err - } - if err = f.Close(); err != nil { - return err } return nil } @@ -1636,26 +1298,14 @@ func main() { case "devTx": err = devTx(*chaindata) - case "mainnetGenesis": - err = mainnetGenesis() - case "junkdb": - err = junkdb() - case "histStats": - err = histStats() - case "histStat1": - err = histStat1(*chaindata) case "chainConfig": err = chainConfig(*name) case "findPrefix": err = findPrefix(*chaindata) - case "readEf": - err = readEf(*chaindata, common.FromHex(*account)) - case "readBodies": - err = readBodies(*chaindata) case "findLogs": err = findLogs(*chaindata, uint64(*block), uint64(*blockTotal)) - case "decompress": - err = decompress(*chaindata) + case "iterate": + err = iterate(*chaindata) } if err != nil { diff --git a/cmd/rpcdaemon22/commands/eth_receipts.go b/cmd/rpcdaemon22/commands/eth_receipts.go index 3a49b77b49a..0128a6b1ab8 100644 --- a/cmd/rpcdaemon22/commands/eth_receipts.go +++ b/cmd/rpcdaemon22/commands/eth_receipts.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "sort" + "time" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/kv" @@ -69,6 +70,7 @@ func (api *BaseAPI) getReceipts(ctx context.Context, tx kv.Tx, chainConfig *para // GetLogs implements eth_getLogs. Returns an array of logs matching a given filter object. func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([]*types.Log, error) { + start := time.Now() var begin, end uint64 logs := []*types.Log{} @@ -215,7 +217,8 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([ } logs = append(logs, filtered...) } - + stats := api._agg.GetAndResetStats() + log.Info("Finished", "duration", time.Since(start), "history queries", stats.HistoryQueries, "ef search duration", stats.EfSearchTime) return logs, nil } diff --git a/cmd/state/commands/erigon22.go b/cmd/state/commands/erigon22.go index bae4512818c..754c11cdd7f 100644 --- a/cmd/state/commands/erigon22.go +++ b/cmd/state/commands/erigon22.go @@ -80,14 +80,12 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log } defer historyTx.Rollback() stateDbPath := path.Join(datadir, "statedb") - if block == 0 { - if _, err = os.Stat(stateDbPath); err != nil { - if !errors.Is(err, os.ErrNotExist) { - return err - } - } else if err = os.RemoveAll(stateDbPath); err != nil { + if _, err = os.Stat(stateDbPath); err != nil { + if !errors.Is(err, os.ErrNotExist) { return err } + } else if err = os.RemoveAll(stateDbPath); err != nil { + return err } db, err2 := kv2.NewMDBX(logger).Path(stateDbPath).WriteMap().Open() if err2 != nil { @@ -96,18 +94,6 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log defer db.Close() aggPath := filepath.Join(datadir, "erigon22") - if block == 0 { - if _, err = os.Stat(aggPath); err != nil { - if !errors.Is(err, os.ErrNotExist) { - return err - } - } else if err = os.RemoveAll(aggPath); err != nil { - return err - } - if err = os.Mkdir(aggPath, os.ModePerm); err != nil { - return err - } - } var rwTx kv.RwTx defer func() { @@ -124,9 +110,11 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log return fmt.Errorf("create aggregator: %w", err3) } defer agg.Close() + startTxNum := agg.EndTxNumMinimax() + fmt.Printf("Max txNum in files: %d\n", startTxNum) interrupt := false - if block == 0 { + if startTxNum == 0 { _, genesisIbs, err := genesis.ToBlock() if err != nil { return err @@ -176,6 +164,16 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) engine := initConsensusEngine(chainConfig, logger, allSnapshots) + getHeader := func(hash common.Hash, number uint64) *types.Header { + h, err := blockReader.Header(ctx, historyTx, hash, number) + if err != nil { + panic(err) + } + return h + } + readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx} + writeWrapper := &WriterWrapper22{w: agg} + for !interrupt { blockNum++ trace = traceBlock > 0 && blockNum == uint64(traceBlock) @@ -192,40 +190,12 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log log.Info("history: block is nil", "block", blockNum) break } - if blockNum <= block { - // Skip that block, but increase txNum - txNum += uint64(len(b.Transactions())) + 2 // Pre and Post block transaction - continue - } agg.SetTx(rwTx) agg.SetTxNum(txNum) - readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx, blockNum: blockNum} - writeWrapper := &WriterWrapper22{w: agg, blockNum: blockNum} - getHeader := func(hash common.Hash, number uint64) *types.Header { - h, err := blockReader.Header(ctx, historyTx, hash, number) - if err != nil { - panic(err) - } - return h - } - - txNum++ // Pre-block transaction - agg.SetTxNum(txNum) - - if txNum, _, err = processBlock22(trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil { + if txNum, _, err = processBlock22(startTxNum, trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil { return fmt.Errorf("processing block %d: %w", blockNum, err) } - agg.SetTxNum(txNum) - if err := agg.FinishTx(); err != nil { - return fmt.Errorf("failed to finish tx: %w", err) - } - if trace { - fmt.Printf("FinishTx called for %d block %d\n", txNum, blockNum) - } - - txNum++ // Post-block transaction - agg.SetTxNum(txNum) // Check for interrupts select { @@ -255,6 +225,7 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log } } agg.SetTx(rwTx) + readWrapper.roTx = rwTx } } @@ -303,7 +274,7 @@ func (s *stat22) delta(aStats libstate.FilesStats, blockNum uint64) *stat22 { return s } -func processBlock22(trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *WriterWrapper22, chainConfig *params.ChainConfig, +func processBlock22(startTxNum uint64, trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *WriterWrapper22, chainConfig *params.ChainConfig, engine consensus.Engine, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config, ) (uint64, types.Receipts, error) { defer blockExecutionTimer.UpdateDuration(time.Now()) @@ -313,82 +284,119 @@ func processBlock22(trace bool, txNumStart uint64, rw *ReaderWrapper22, ww *Writ gp := new(core.GasPool).AddGas(block.GasLimit()) usedGas := new(uint64) var receipts types.Receipts - daoBlock := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0 rules := chainConfig.Rules(block.NumberU64()) txNum := txNumStart ww.w.SetTxNum(txNum) + rw.blockNum = block.NumberU64() + ww.blockNum = block.NumberU64() - for i, tx := range block.Transactions() { + daoFork := txNum >= startTxNum && chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0 + if daoFork { ibs := state.New(rw) - if daoBlock { - misc.ApplyDAOHardFork(ibs) - daoBlock = false + // TODO Actually add tracing to the DAO related accounts + misc.ApplyDAOHardFork(ibs) + if err := ibs.FinalizeTx(rules, ww); err != nil { + return 0, nil, err } - ibs.Prepare(tx.Hash(), block.Hash(), i) - ct := NewCallTracer() - vmConfig.Tracer = ct - receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil) - if err != nil { - return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err) + if err := ww.w.FinishTx(); err != nil { + return 0, nil, fmt.Errorf("finish daoFork failed: %w", err) } - for from := range ct.froms { - if err := ww.w.AddTraceFrom(from[:]); err != nil { - return 0, nil, err + } + + txNum++ // Pre-block transaction + ww.w.SetTxNum(txNum) + + for i, tx := range block.Transactions() { + if txNum >= startTxNum { + ibs := state.New(rw) + ibs.Prepare(tx.Hash(), block.Hash(), i) + ct := NewCallTracer() + vmConfig.Tracer = ct + receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil) + if err != nil { + return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err) } - } - for to := range ct.tos { - if err := ww.w.AddTraceTo(to[:]); err != nil { - return 0, nil, err + for from := range ct.froms { + if err := ww.w.AddTraceFrom(from[:]); err != nil { + return 0, nil, err + } } - } - receipts = append(receipts, receipt) - for _, log := range receipt.Logs { - if err = ww.w.AddLogAddr(log.Address[:]); err != nil { - return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err) + for to := range ct.tos { + if err := ww.w.AddTraceTo(to[:]); err != nil { + return 0, nil, err + } } - for _, topic := range log.Topics { - if err = ww.w.AddLogTopic(topic[:]); err != nil { - return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err) + receipts = append(receipts, receipt) + for _, log := range receipt.Logs { + if err = ww.w.AddLogAddr(log.Address[:]); err != nil { + return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err) + } + for _, topic := range log.Topics { + if err = ww.w.AddLogTopic(topic[:]); err != nil { + return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err) + } } } - } - if err = ww.w.FinishTx(); err != nil { - return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err) - } - if trace { - fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash()) + if err = ww.w.FinishTx(); err != nil { + return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err) + } + if trace { + fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash()) + } } txNum++ ww.w.SetTxNum(txNum) } - ibs := state.New(rw) - if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil { - return 0, nil, fmt.Errorf("adding coinbase trace: %w", err) - } - for _, uncle := range block.Uncles() { - if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil { - return 0, nil, fmt.Errorf("adding uncle trace: %w", err) + if txNum >= startTxNum && chainConfig.IsByzantium(block.NumberU64()) { + receiptSha := types.DeriveSha(receipts) + if receiptSha != block.ReceiptHash() { + fmt.Printf("mismatched receipt headers for block %d\n", block.NumberU64()) + for j, receipt := range receipts { + fmt.Printf("tx %d, used gas: %d\n", j, receipt.GasUsed) + } } } - // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) - if _, _, _, err := engine.FinalizeAndAssemble(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil, nil); err != nil { - return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err) - } + if txNum >= startTxNum { + ibs := state.New(rw) + if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil { + return 0, nil, fmt.Errorf("adding coinbase trace: %w", err) + } + for _, uncle := range block.Uncles() { + if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil { + return 0, nil, fmt.Errorf("adding uncle trace: %w", err) + } + } + + // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) + if _, _, err := engine.Finalize(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil); err != nil { + return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err) + } + + if err := ibs.CommitBlock(rules, ww); err != nil { + return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err) + } - if err := ibs.CommitBlock(rules, ww); err != nil { - return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err) + if err := ww.w.FinishTx(); err != nil { + return 0, nil, fmt.Errorf("failed to finish tx: %w", err) + } + if trace { + fmt.Printf("FinishTx called for %d block %d\n", txNum, block.NumberU64()) + } } + txNum++ // Post-block transaction + ww.w.SetTxNum(txNum) + return txNum, receipts, nil } // Implements StateReader and StateWriter type ReaderWrapper22 struct { - blockNum uint64 roTx kv.Tx r *libstate.Aggregator + blockNum uint64 } type WriterWrapper22 struct { @@ -431,6 +439,9 @@ func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Ac if incBytes > 0 { a.Incarnation = bytesToUint64(enc[pos : pos+incBytes]) } + if rw.blockNum == 10264901 { + fmt.Printf("block %d ReadAccount [%x] => {Balance: %d, Nonce: %d}\n", rw.blockNum, address, &a.Balance, a.Nonce) + } return &a, nil } @@ -439,9 +450,15 @@ func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnatio if err != nil { return nil, err } + if rw.blockNum == 10264901 { + fmt.Printf("block %d ReadStorage [%x] [%x] => [%x]\n", rw.blockNum, address, *key, enc) + } if enc == nil { return nil, nil } + if len(enc) == 1 && enc[0] == 0 { + return nil, nil + } return enc, nil } @@ -541,6 +558,10 @@ func (ww *WriterWrapper22) DeleteAccount(address common.Address, original *accou } func (ww *WriterWrapper22) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error { + trace := fmt.Sprintf("%x", address) == "000000000000006f6502b7f2bbac8c30a3f67e9a" + if trace { + fmt.Printf("block %d WriteAccountStorage [%x] [%x] => [%x]\n", ww.blockNum, address, *key, value.Bytes()) + } if err := ww.w.WriteAccountStorage(address.Bytes(), key.Bytes(), value.Bytes()); err != nil { return err } diff --git a/cmd/state/commands/history22.go b/cmd/state/commands/history22.go index f8b30069496..a7ecf4d8ad9 100644 --- a/cmd/state/commands/history22.go +++ b/cmd/state/commands/history22.go @@ -175,7 +175,6 @@ func History22(genesis *core.Genesis, logger log.Logger) error { readWrapper.SetTrace(blockNum == uint64(traceBlock)) } writeWrapper := state.NewNoopWriter() - txNum++ // Pre block transaction getHeader := func(hash common.Hash, number uint64) *types.Header { h, err := blockReader.Header(ctx, historyTx, hash, number) if err != nil { @@ -227,15 +226,24 @@ func runHistory22(trace bool, blockNum, txNumStart uint64, hw *state.HistoryRead gp := new(core.GasPool).AddGas(block.GasLimit()) usedGas := new(uint64) var receipts types.Receipts - daoBlock := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0 + rules := chainConfig.Rules(block.NumberU64()) txNum := txNumStart + hw.SetTxNum(txNum) + daoFork := chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0 + if daoFork { + ibs := state.New(hw) + misc.ApplyDAOHardFork(ibs) + if err := ibs.FinalizeTx(rules, ww); err != nil { + return 0, nil, err + } + if err := hw.FinishTx(); err != nil { + return 0, nil, fmt.Errorf("finish dao fork failed: %w", err) + } + } + txNum++ // Pre block transaction for i, tx := range block.Transactions() { hw.SetTxNum(txNum) ibs := state.New(hw) - if daoBlock { - misc.ApplyDAOHardFork(ibs) - daoBlock = false - } ibs.Prepare(tx.Hash(), block.Hash(), i) receipt, _, err := core.ApplyTransaction(chainConfig, core.GetHashFn(header, getHeader), engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil) if err != nil { diff --git a/cmd/state/commands/state_recon.go b/cmd/state/commands/state_recon.go new file mode 100644 index 00000000000..501ec1420a4 --- /dev/null +++ b/cmd/state/commands/state_recon.go @@ -0,0 +1,775 @@ +package commands + +import ( + "context" + "errors" + "fmt" + "math/big" + "os" + "os/signal" + "path" + "path/filepath" + "runtime" + "sort" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/RoaringBitmap/roaring/roaring64" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" + libstate "github.com/ledgerwatch/erigon-lib/state" + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/common/dbutils" + "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/consensus/misc" + "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/state" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/core/types/accounts" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/crypto" + "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/log/v3" + "github.com/spf13/cobra" +) + +func init() { + withBlock(reconCmd) + withDataDir(reconCmd) + rootCmd.AddCommand(reconCmd) +} + +var reconCmd = &cobra.Command{ + Use: "recon", + Short: "Exerimental command to reconstitute the state from state history at given block", + RunE: func(cmd *cobra.Command, args []string) error { + logger := log.New() + return Recon(genesis, logger) + }, +} + +type ReconWorker struct { + lock sync.Locker + wg *sync.WaitGroup + rs *state.ReconState + blockReader services.FullBlockReader + allSnapshots *snapshotsync.RoSnapshots + stateWriter *state.StateReconWriter + stateReader *state.HistoryReaderNoState + firstBlock bool + lastBlockNum uint64 + lastBlockHash common.Hash + lastHeader *types.Header + lastRules *params.Rules + getHeader func(hash common.Hash, number uint64) *types.Header + ctx context.Context + engine consensus.Engine + txNums []uint64 + chainConfig *params.ChainConfig + logger log.Logger + genesis *core.Genesis +} + +func NewReconWorker(lock sync.Locker, wg *sync.WaitGroup, rs *state.ReconState, + a *libstate.Aggregator, blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots, + txNums []uint64, chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis, +) *ReconWorker { + ac := a.MakeContext() + return &ReconWorker{ + lock: lock, + wg: wg, + rs: rs, + blockReader: blockReader, + allSnapshots: allSnapshots, + ctx: context.Background(), + stateWriter: state.NewStateReconWriter(ac, rs), + stateReader: state.NewHistoryReaderNoState(ac, rs), + txNums: txNums, + chainConfig: chainConfig, + logger: logger, + genesis: genesis, + } +} + +func (rw *ReconWorker) SetTx(tx kv.Tx) { + rw.stateReader.SetTx(tx) +} + +func (rw *ReconWorker) run() { + defer rw.wg.Done() + rw.firstBlock = true + rw.getHeader = func(hash common.Hash, number uint64) *types.Header { + h, err := rw.blockReader.Header(rw.ctx, nil, hash, number) + if err != nil { + panic(err) + } + return h + } + rw.engine = initConsensusEngine(rw.chainConfig, rw.logger, rw.allSnapshots) + for txNum, ok := rw.rs.Schedule(); ok; txNum, ok = rw.rs.Schedule() { + rw.runTxNum(txNum) + } +} + +func (rw *ReconWorker) runTxNum(txNum uint64) { + rw.lock.Lock() + defer rw.lock.Unlock() + rw.stateReader.SetTxNum(txNum) + rw.stateReader.ResetError() + rw.stateWriter.SetTxNum(txNum) + noop := state.NewNoopWriter() + // Find block number + blockNum := uint64(sort.Search(len(rw.txNums), func(i int) bool { + return rw.txNums[i] > txNum + })) + if rw.firstBlock || blockNum != rw.lastBlockNum { + var err error + if rw.lastHeader, err = rw.blockReader.HeaderByNumber(rw.ctx, nil, blockNum); err != nil { + panic(err) + } + rw.lastBlockNum = blockNum + rw.lastBlockHash = rw.lastHeader.Hash() + rw.lastRules = rw.chainConfig.Rules(blockNum) + rw.firstBlock = false + } + var startTxNum uint64 + if blockNum > 0 { + startTxNum = rw.txNums[blockNum-1] + } + ibs := state.New(rw.stateReader) + daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == blockNum && txNum == rw.txNums[blockNum-1] + var err error + if blockNum == 0 { + //fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txNum, blockNum) + // Genesis block + _, ibs, err = rw.genesis.ToBlock() + if err != nil { + panic(err) + } + } else if daoForkTx { + //fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txNum, blockNum) + misc.ApplyDAOHardFork(ibs) + if err := ibs.FinalizeTx(rw.lastRules, noop); err != nil { + panic(err) + } + } else if txNum+1 == rw.txNums[blockNum] { + //fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txNum, blockNum) + // End of block transaction in a block + block, _, err := rw.blockReader.BlockWithSenders(rw.ctx, nil, rw.lastBlockHash, blockNum) + if err != nil { + panic(err) + } + if _, _, err := rw.engine.Finalize(rw.chainConfig, rw.lastHeader, ibs, block.Transactions(), block.Uncles(), nil /* receipts */, nil, nil, nil); err != nil { + panic(fmt.Errorf("finalize of block %d failed: %w", blockNum, err)) + } + } else { + txIndex := txNum - startTxNum - 1 + //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txNum, blockNum, txIndex) + txn, err := rw.blockReader.TxnByIdxInBlock(rw.ctx, nil, blockNum, int(txIndex)) + if err != nil { + panic(err) + } + txHash := txn.Hash() + gp := new(core.GasPool).AddGas(txn.GetGas()) + //fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData()) + usedGas := new(uint64) + vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, blockNum)} + contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil } + ibs.Prepare(txHash, rw.lastBlockHash, int(txIndex)) + _, _, err = core.ApplyTransaction(rw.chainConfig, core.GetHashFn(rw.lastHeader, rw.getHeader), rw.engine, nil, gp, ibs, noop, rw.lastHeader, txn, usedGas, vmConfig, contractHasTEVM) + if err != nil { + panic(fmt.Errorf("could not apply tx %d [%x] failed: %w", txIndex, txHash, err)) + } + } + if dependency, ok := rw.stateReader.ReadError(); ok { + //fmt.Printf("rollback %d\n", txNum) + rw.rs.RollbackTxNum(txNum, dependency) + } else { + if err = ibs.CommitBlock(rw.lastRules, rw.stateWriter); err != nil { + panic(err) + } + //fmt.Printf("commit %d\n", txNum) + rw.rs.CommitTxNum(txNum) + } +} + +type FillWorker struct { + txNum uint64 + doneCount *uint64 + rs *state.ReconState + ac *libstate.AggregatorContext + fromKey, toKey []byte + currentKey []byte + bitmap roaring64.Bitmap + total uint64 + progress uint64 +} + +func NewFillWorker(txNum uint64, doneCount *uint64, rs *state.ReconState, a *libstate.Aggregator, fromKey, toKey []byte) *FillWorker { + fw := &FillWorker{ + txNum: txNum, + doneCount: doneCount, + rs: rs, + ac: a.MakeContext(), + fromKey: fromKey, + toKey: toKey, + } + return fw +} + +func (fw *FillWorker) Total() uint64 { + return atomic.LoadUint64(&fw.total) +} + +func (fw *FillWorker) Progress() uint64 { + return atomic.LoadUint64(&fw.progress) +} + +func (fw *FillWorker) fillAccounts() { + defer func() { + atomic.AddUint64(fw.doneCount, 1) + }() + it := fw.ac.IterateAccountsHistory(fw.fromKey, fw.toKey, fw.txNum) + atomic.StoreUint64(&fw.total, it.Total()) + for it.HasNext() { + key, val, progress := it.Next() + atomic.StoreUint64(&fw.progress, progress) + fw.currentKey = key + if len(val) > 0 { + var a accounts.Account + a.Reset() + pos := 0 + nonceBytes := int(val[pos]) + pos++ + if nonceBytes > 0 { + a.Nonce = bytesToUint64(val[pos : pos+nonceBytes]) + pos += nonceBytes + } + balanceBytes := int(val[pos]) + pos++ + if balanceBytes > 0 { + a.Balance.SetBytes(val[pos : pos+balanceBytes]) + pos += balanceBytes + } + codeHashBytes := int(val[pos]) + pos++ + if codeHashBytes > 0 { + copy(a.CodeHash[:], val[pos:pos+codeHashBytes]) + pos += codeHashBytes + } + incBytes := int(val[pos]) + pos++ + if incBytes > 0 { + a.Incarnation = bytesToUint64(val[pos : pos+incBytes]) + } + value := make([]byte, a.EncodingLengthForStorage()) + a.EncodeForStorage(value) + fw.rs.Put(kv.PlainState, key, value) + //fmt.Printf("Account [%x]=>{Balance: %d, Nonce: %d, Root: %x, CodeHash: %x}\n", key, &a.Balance, a.Nonce, a.Root, a.CodeHash) + } + } +} + +func (fw *FillWorker) fillStorage() { + defer func() { + atomic.AddUint64(fw.doneCount, 1) + }() + it := fw.ac.IterateStorageHistory(fw.fromKey, fw.toKey, fw.txNum) + atomic.StoreUint64(&fw.total, it.Total()) + for it.HasNext() { + key, val, progress := it.Next() + atomic.StoreUint64(&fw.progress, progress) + fw.currentKey = key + compositeKey := dbutils.PlainGenerateCompositeStorageKey(key[:20], state.FirstContractIncarnation, key[20:]) + if len(val) > 0 { + if len(val) > 1 || val[0] != 0 { + fw.rs.Put(kv.PlainState, compositeKey, val) + } + //fmt.Printf("Storage [%x] => [%x]\n", compositeKey, val) + } + } +} + +func (fw *FillWorker) fillCode() { + defer func() { + atomic.AddUint64(fw.doneCount, 1) + }() + it := fw.ac.IterateCodeHistory(fw.fromKey, fw.toKey, fw.txNum) + atomic.StoreUint64(&fw.total, it.Total()) + for it.HasNext() { + key, val, progress := it.Next() + atomic.StoreUint64(&fw.progress, progress) + fw.currentKey = key + compositeKey := dbutils.PlainGenerateStoragePrefix(key, state.FirstContractIncarnation) + if len(val) > 0 { + if len(val) > 1 || val[0] != 0 { + codeHash := crypto.Keccak256(val) + fw.rs.Put(kv.Code, codeHash[:], val) + fw.rs.Put(kv.PlainContractCode, compositeKey, codeHash[:]) + } + //fmt.Printf("Code [%x] => [%x]\n", compositeKey, val) + } + } +} + +func (fw *FillWorker) ResetProgress() { + fw.total = 0 + fw.progress = 0 +} + +func (fw *FillWorker) bitmapAccounts() { + defer func() { + atomic.AddUint64(fw.doneCount, 1) + }() + it := fw.ac.IterateAccountsReconTxs(fw.fromKey, fw.toKey, fw.txNum) + atomic.StoreUint64(&fw.total, it.Total()) + for it.HasNext() { + txNum, progress := it.Next() + atomic.StoreUint64(&fw.progress, progress) + fw.bitmap.Add(txNum) + } +} + +func (fw *FillWorker) bitmapStorage() { + defer func() { + atomic.AddUint64(fw.doneCount, 1) + }() + it := fw.ac.IterateStorageReconTxs(fw.fromKey, fw.toKey, fw.txNum) + atomic.StoreUint64(&fw.total, it.Total()) + for it.HasNext() { + txNum, progress := it.Next() + atomic.StoreUint64(&fw.progress, progress) + fw.bitmap.Add(txNum) + } +} + +func (fw *FillWorker) bitmapCode() { + defer func() { + atomic.AddUint64(fw.doneCount, 1) + }() + it := fw.ac.IterateCodeReconTxs(fw.fromKey, fw.toKey, fw.txNum) + atomic.StoreUint64(&fw.total, it.Total()) + for it.HasNext() { + txNum, progress := it.Next() + atomic.StoreUint64(&fw.progress, progress) + fw.bitmap.Add(txNum) + } +} + +func Recon(genesis *core.Genesis, logger log.Logger) error { + sigs := make(chan os.Signal, 1) + interruptCh := make(chan bool, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigs + interruptCh <- true + }() + ctx := context.Background() + aggPath := filepath.Join(datadir, "erigon23") + agg, err := libstate.NewAggregator(aggPath, AggregationStep) + if err != nil { + return fmt.Errorf("create history: %w", err) + } + defer agg.Close() + reconDbPath := path.Join(datadir, "recondb") + if _, err = os.Stat(reconDbPath); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + } else if err = os.RemoveAll(reconDbPath); err != nil { + return err + } + db, err := kv2.NewMDBX(logger).Path(reconDbPath).WriteMap().Open() + if err != nil { + return err + } + var blockReader services.FullBlockReader + var allSnapshots *snapshotsync.RoSnapshots + allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) + defer allSnapshots.Close() + if err := allSnapshots.Reopen(); err != nil { + return fmt.Errorf("reopen snapshot segments: %w", err) + } + blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) + // Compute mapping blockNum -> last TxNum in that block + txNums := make([]uint64, allSnapshots.BlocksAvailable()+1) + if err = allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error { + for _, b := range bs { + if err = b.Iterate(func(blockNum, baseTxNum, txAmount uint64) { + txNums[blockNum] = baseTxNum + txAmount + }); err != nil { + return err + } + } + return nil + }); err != nil { + return fmt.Errorf("build txNum => blockNum mapping: %w", err) + } + endTxNumMinimax := agg.EndTxNumMinimax() + fmt.Printf("Max txNum in files: %d\n", endTxNumMinimax) + blockNum := uint64(sort.Search(len(txNums), func(i int) bool { + return txNums[i] > endTxNumMinimax + })) + if blockNum == uint64(len(txNums)) { + return fmt.Errorf("mininmax txNum not found in snapshot blocks: %d", endTxNumMinimax) + } + if blockNum == 0 { + return fmt.Errorf("not enough transactions in the history data") + } + if block+1 > blockNum { + return fmt.Errorf("specified block %d which is higher than available %d", block, blockNum) + } + fmt.Printf("Max blockNum = %d\n", blockNum) + blockNum = block + 1 + txNum := txNums[blockNum-1] + fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum) + workerCount := runtime.NumCPU() + var wg sync.WaitGroup + rs := state.NewReconState() + var fromKey, toKey []byte + bigCount := big.NewInt(int64(workerCount)) + bigStep := big.NewInt(0x100000000) + bigStep.Div(bigStep, bigCount) + bigCurrent := big.NewInt(0) + fillWorkers := make([]*FillWorker, workerCount) + var doneCount uint64 + for i := 0; i < workerCount; i++ { + fromKey = toKey + if i == workerCount-1 { + toKey = nil + } else { + bigCurrent.Add(bigCurrent, bigStep) + toKey = make([]byte, 4) + bigCurrent.FillBytes(toKey) + } + //fmt.Printf("%d) Fill worker [%x] - [%x]\n", i, fromKey, toKey) + fillWorkers[i] = NewFillWorker(txNum, &doneCount, rs, agg, fromKey, toKey) + } + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + doneCount = 0 + for i := 0; i < workerCount; i++ { + fillWorkers[i].ResetProgress() + go fillWorkers[i].bitmapAccounts() + } + for atomic.LoadUint64(&doneCount) < uint64(workerCount) { + select { + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + var p float64 + for i := 0; i < workerCount; i++ { + if total := fillWorkers[i].Total(); total > 0 { + p += float64(fillWorkers[i].Progress()) / float64(total) + } + } + p *= 100.0 + log.Info("Scan accounts history", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", p), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + } + } + doneCount = 0 + for i := 0; i < workerCount; i++ { + fillWorkers[i].ResetProgress() + go fillWorkers[i].bitmapStorage() + } + for atomic.LoadUint64(&doneCount) < uint64(workerCount) { + select { + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + var p float64 + for i := 0; i < workerCount; i++ { + if total := fillWorkers[i].Total(); total > 0 { + p += float64(fillWorkers[i].Progress()) / float64(total) + } + } + p *= 100.0 + log.Info("Scan storage history", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", p), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + } + } + doneCount = 0 + for i := 0; i < workerCount; i++ { + fillWorkers[i].ResetProgress() + go fillWorkers[i].bitmapCode() + } + for atomic.LoadUint64(&doneCount) < uint64(workerCount) { + select { + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + var p float64 + for i := 0; i < workerCount; i++ { + if total := fillWorkers[i].Total(); total > 0 { + p += float64(fillWorkers[i].Progress()) / float64(total) + } + } + p *= 100.0 + log.Info("Scan code history", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", p), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + } + } + var bitmap roaring64.Bitmap + for i := 0; i < workerCount; i++ { + bitmap.Or(&fillWorkers[i].bitmap) + } + log.Info("Ready to replay", "transactions", bitmap.GetCardinality(), "out of", txNum) + rs.SetWorkBitmap(&bitmap) + var lock sync.RWMutex + reconWorkers := make([]*ReconWorker, workerCount) + roTxs := make([]kv.Tx, workerCount) + defer func() { + for i := 0; i < workerCount; i++ { + if roTxs[i] != nil { + roTxs[i].Rollback() + } + } + }() + for i := 0; i < workerCount; i++ { + roTxs[i], err = db.BeginRo(ctx) + if err != nil { + return err + } + } + for i := 0; i < workerCount; i++ { + reconWorkers[i] = NewReconWorker(lock.RLocker(), &wg, rs, agg, blockReader, allSnapshots, txNums, chainConfig, logger, genesis) + reconWorkers[i].SetTx(roTxs[i]) + } + wg.Add(workerCount) + count := uint64(0) + rollbackCount := uint64(0) + total := bitmap.GetCardinality() + for i := 0; i < workerCount; i++ { + go reconWorkers[i].run() + } + commitThreshold := uint64(256 * 1024 * 1024) + prevCount := uint64(0) + prevRollbackCount := uint64(0) + prevTime := time.Now() + for count < total { + select { + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + sizeEstimate := rs.SizeEstimate() + count = rs.DoneCount() + rollbackCount = rs.RollbackCount() + currentTime := time.Now() + interval := currentTime.Sub(prevTime) + speedTx := float64(count-prevCount) / (float64(interval) / float64(time.Second)) + progress := 100.0 * float64(count) / float64(total) + var repeatRatio float64 + if count > prevCount { + repeatRatio = 100.0 * float64(rollbackCount-prevRollbackCount) / float64(count-prevCount) + } + prevTime = currentTime + prevCount = count + prevRollbackCount = rollbackCount + log.Info("State reconstitution", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", progress), "tx/s", fmt.Sprintf("%.1f", speedTx), "repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio), "buffer", libcommon.ByteCount(sizeEstimate), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + if sizeEstimate >= commitThreshold { + err := func() error { + lock.Lock() + defer lock.Unlock() + for i := 0; i < workerCount; i++ { + roTxs[i].Rollback() + } + rwTx, err := db.BeginRw(ctx) + if err != nil { + return err + } + if err = rs.Flush(rwTx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + for i := 0; i < workerCount; i++ { + if roTxs[i], err = db.BeginRo(ctx); err != nil { + return err + } + reconWorkers[i].SetTx(roTxs[i]) + } + return nil + }() + if err != nil { + panic(err) + } + } + } + } + wg.Wait() + for i := 0; i < workerCount; i++ { + roTxs[i].Rollback() + } + rwTx, err := db.BeginRw(ctx) + if err != nil { + return err + } + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + if err = rs.Flush(rwTx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + doneCount = 0 + for i := 0; i < workerCount; i++ { + fillWorkers[i].ResetProgress() + go fillWorkers[i].fillAccounts() + } + for atomic.LoadUint64(&doneCount) < uint64(workerCount) { + select { + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + sizeEstimate := rs.SizeEstimate() + var p float64 + for i := 0; i < workerCount; i++ { + if total := fillWorkers[i].Total(); total > 0 { + p += float64(fillWorkers[i].Progress()) / float64(total) + } + } + p *= 100.0 + log.Info("Filling accounts", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", p), "buffer", libcommon.ByteCount(sizeEstimate), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + if sizeEstimate >= commitThreshold { + flushStart := time.Now() + rwTx, err := db.BeginRw(ctx) + if err != nil { + return err + } + if err = rs.Flush(rwTx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + log.Info("Flush buffer", "duration", time.Since(flushStart)) + } + } + } + doneCount = 0 + for i := 0; i < workerCount; i++ { + fillWorkers[i].ResetProgress() + go fillWorkers[i].fillStorage() + } + for atomic.LoadUint64(&doneCount) < uint64(workerCount) { + select { + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + sizeEstimate := rs.SizeEstimate() + var p float64 + for i := 0; i < workerCount; i++ { + if total := fillWorkers[i].Total(); total > 0 { + p += float64(fillWorkers[i].Progress()) / float64(total) + } + } + p *= 100.0 + log.Info("Filling storage", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", p), "buffer", libcommon.ByteCount(sizeEstimate), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + if sizeEstimate >= commitThreshold { + flushStart := time.Now() + rwTx, err := db.BeginRw(ctx) + if err != nil { + return err + } + if err = rs.Flush(rwTx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + log.Info("Flush buffer", "duration", time.Since(flushStart)) + } + } + } + doneCount = 0 + for i := 0; i < workerCount; i++ { + fillWorkers[i].ResetProgress() + go fillWorkers[i].fillCode() + } + for atomic.LoadUint64(&doneCount) < uint64(workerCount) { + select { + case <-logEvery.C: + var m runtime.MemStats + libcommon.ReadMemStats(&m) + sizeEstimate := rs.SizeEstimate() + var p float64 + for i := 0; i < workerCount; i++ { + if total := fillWorkers[i].Total(); total > 0 { + p += float64(fillWorkers[i].Progress()) / float64(total) + } + } + p *= 100.0 + log.Info("Filling code", "workers", workerCount, "progress", fmt.Sprintf("%.2f%%", p), "buffer", libcommon.ByteCount(sizeEstimate), + "alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys), + ) + if sizeEstimate >= commitThreshold { + flushStart := time.Now() + rwTx, err := db.BeginRw(ctx) + if err != nil { + return err + } + if err = rs.Flush(rwTx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + log.Info("Flush buffer", "duration", time.Since(flushStart)) + } + } + } + rwTx, err = db.BeginRw(ctx) + if err != nil { + return err + } + if err = rs.Flush(rwTx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + if rwTx, err = db.BeginRw(ctx); err != nil { + return err + } + log.Info("Computing hashed state") + tmpDir := filepath.Join(datadir, "tmp") + if err = stagedsync.PromoteHashedStateCleanly("recon", rwTx, stagedsync.StageHashStateCfg(db, tmpDir), ctx); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + if rwTx, err = db.BeginRw(ctx); err != nil { + return err + } + if _, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader), common.Hash{}, make(chan struct{}, 1)); err != nil { + return err + } + if err = rwTx.Commit(); err != nil { + return err + } + return nil +} diff --git a/core/state/HistoryReader22.go b/core/state/history_reader_22.go similarity index 97% rename from core/state/HistoryReader22.go rename to core/state/history_reader_22.go index 647dfe25886..be89d3bf43b 100644 --- a/core/state/HistoryReader22.go +++ b/core/state/history_reader_22.go @@ -126,8 +126,10 @@ func (hr *HistoryReader22) ReadAccountStorage(address common.Address, incarnatio } func (hr *HistoryReader22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) { - if err := hr.ri.ReadAccountCode(address.Bytes()); err != nil { - return nil, err + if hr.ri != nil { + if err := hr.ri.ReadAccountCode(address.Bytes()); err != nil { + return nil, err + } } enc, err := hr.a.ReadAccountCodeBeforeTxNum(address.Bytes(), hr.txNum, nil /* roTx */) if err != nil { diff --git a/core/state/history_reader_nostate.go b/core/state/history_reader_nostate.go new file mode 100644 index 00000000000..d1e270bec3d --- /dev/null +++ b/core/state/history_reader_nostate.go @@ -0,0 +1,210 @@ +package state + +import ( + "fmt" + + "github.com/ledgerwatch/erigon-lib/kv" + libstate "github.com/ledgerwatch/erigon-lib/state" + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/common/dbutils" + "github.com/ledgerwatch/erigon/core/types/accounts" +) + +type RequiredStateError struct { + StateTxNum uint64 +} + +func (r *RequiredStateError) Error() string { + return fmt.Sprintf("required state at txNum %d", r.StateTxNum) +} + +// Implements StateReader and StateWriter +type HistoryReaderNoState struct { + ac *libstate.AggregatorContext + tx kv.Tx + txNum uint64 + trace bool + rs *ReconState + readError bool + stateTxNum uint64 +} + +func NewHistoryReaderNoState(ac *libstate.AggregatorContext, rs *ReconState) *HistoryReaderNoState { + return &HistoryReaderNoState{ac: ac, rs: rs} +} + +func (hr *HistoryReaderNoState) SetTxNum(txNum uint64) { + hr.txNum = txNum +} + +func (hr *HistoryReaderNoState) SetTx(tx kv.Tx) { + hr.tx = tx +} + +func (hr *HistoryReaderNoState) SetTrace(trace bool) { + hr.trace = trace +} + +func (hr *HistoryReaderNoState) ReadAccountData(address common.Address) (*accounts.Account, error) { + enc, noState, stateTxNum, err := hr.ac.ReadAccountDataNoState(address.Bytes(), hr.txNum) + if err != nil { + return nil, err + } + if !noState { + if !hr.rs.Done(stateTxNum) { + hr.readError = true + hr.stateTxNum = stateTxNum + return nil, &RequiredStateError{StateTxNum: stateTxNum} + } + enc = hr.rs.Get(kv.PlainState, address.Bytes()) + if enc == nil { + enc, err = hr.tx.GetOne(kv.PlainState, address.Bytes()) + if err != nil { + return nil, err + } + if enc == nil { + return nil, nil + } + } + var a accounts.Account + if err = a.DecodeForStorage(enc); err != nil { + return nil, err + } + if hr.trace { + fmt.Printf("ReadAccountData [%x] => [nonce: %d, balance: %d, codeHash: %x], noState=%t, stateTxNum=%d, txNum: %d\n", address, a.Nonce, &a.Balance, a.CodeHash, noState, stateTxNum, hr.txNum) + } + return &a, nil + } + if len(enc) == 0 { + if hr.trace { + fmt.Printf("ReadAccountData [%x] => [], noState=%t, stateTxNum=%d, txNum: %d\n", address, noState, stateTxNum, hr.txNum) + } + return nil, nil + } + var a accounts.Account + a.Reset() + pos := 0 + nonceBytes := int(enc[pos]) + pos++ + if nonceBytes > 0 { + a.Nonce = bytesToUint64(enc[pos : pos+nonceBytes]) + pos += nonceBytes + } + balanceBytes := int(enc[pos]) + pos++ + if balanceBytes > 0 { + a.Balance.SetBytes(enc[pos : pos+balanceBytes]) + pos += balanceBytes + } + codeHashBytes := int(enc[pos]) + pos++ + if codeHashBytes > 0 { + copy(a.CodeHash[:], enc[pos:pos+codeHashBytes]) + pos += codeHashBytes + } + incBytes := int(enc[pos]) + pos++ + if incBytes > 0 { + a.Incarnation = bytesToUint64(enc[pos : pos+incBytes]) + } + if hr.trace { + fmt.Printf("ReadAccountData [%x] => [nonce: %d, balance: %d, codeHash: %x], noState=%t, stateTxNum=%d, txNum: %d\n", address, a.Nonce, &a.Balance, a.CodeHash, noState, stateTxNum, hr.txNum) + } + return &a, nil +} + +func (hr *HistoryReaderNoState) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) { + enc, noState, stateTxNum, err := hr.ac.ReadAccountStorageNoState(address.Bytes(), key.Bytes(), hr.txNum) + if err != nil { + return nil, err + } + if !noState { + if !hr.rs.Done(stateTxNum) { + hr.readError = true + hr.stateTxNum = stateTxNum + return nil, &RequiredStateError{StateTxNum: stateTxNum} + } + compositeKey := dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), FirstContractIncarnation, key.Bytes()) + enc = hr.rs.Get(kv.PlainState, compositeKey) + if enc == nil { + enc, err = hr.tx.GetOne(kv.PlainState, compositeKey) + if err != nil { + return nil, err + } + } + } + if hr.trace { + if enc == nil { + fmt.Printf("ReadAccountStorage [%x] [%x] => [], txNum: %d\n", address, key.Bytes(), hr.txNum) + } else { + fmt.Printf("ReadAccountStorage [%x] [%x] => [%x], txNum: %d\n", address, key.Bytes(), enc, hr.txNum) + } + } + if enc == nil { + return nil, nil + } + return enc, nil +} + +func (hr *HistoryReaderNoState) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) { + enc, noState, stateTxNum, err := hr.ac.ReadAccountCodeNoState(address.Bytes(), hr.txNum) + if err != nil { + return nil, err + } + if !noState { + if !hr.rs.Done(stateTxNum) { + hr.readError = true + hr.stateTxNum = stateTxNum + return nil, &RequiredStateError{StateTxNum: stateTxNum} + } + enc = hr.rs.Get(kv.Code, codeHash.Bytes()) + if enc == nil { + enc, err = hr.tx.GetOne(kv.Code, codeHash.Bytes()) + if err != nil { + return nil, err + } + } + } + if hr.trace { + fmt.Printf("ReadAccountCode [%x] => [%x], noState=%t, stateTxNum=%d, txNum: %d\n", address, enc, noState, stateTxNum, hr.txNum) + } + return enc, nil +} + +func (hr *HistoryReaderNoState) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) { + size, noState, stateTxNum, err := hr.ac.ReadAccountCodeSizeNoState(address.Bytes(), hr.txNum) + if err != nil { + return 0, err + } + if !noState { + if !hr.rs.Done(stateTxNum) { + hr.readError = true + hr.stateTxNum = stateTxNum + return 0, &RequiredStateError{StateTxNum: stateTxNum} + } + enc := hr.rs.Get(kv.Code, codeHash.Bytes()) + if enc == nil { + enc, err = hr.tx.GetOne(kv.Code, codeHash.Bytes()) + if err != nil { + return 0, err + } + } + size = len(enc) + } + if hr.trace { + fmt.Printf("ReadAccountCodeSize [%x] => [%d], txNum: %d\n", address, size, hr.txNum) + } + return size, nil +} + +func (hr *HistoryReaderNoState) ReadAccountIncarnation(address common.Address) (uint64, error) { + return 0, nil +} + +func (hr *HistoryReaderNoState) ResetError() { + hr.readError = false +} + +func (hr *HistoryReaderNoState) ReadError() (uint64, bool) { + return hr.stateTxNum, hr.readError +} diff --git a/core/state/intra_block_state.go b/core/state/intra_block_state.go index 43c98f608a2..d061ee91fe8 100644 --- a/core/state/intra_block_state.go +++ b/core/state/intra_block_state.go @@ -70,7 +70,7 @@ type IntraBlockState struct { // unable to deal with database-level errors. Any error that occurs // during a database read is memoized here and will eventually be returned // by IntraBlockState.Commit. - dbErr error + savedErr error // The refund counter, also used by state transitioning. refund uint64 @@ -115,13 +115,13 @@ func (sdb *IntraBlockState) SetTrace(trace bool) { // setErrorUnsafe sets error but should be called in medhods that already have locks func (sdb *IntraBlockState) setErrorUnsafe(err error) { - if sdb.dbErr == nil { - sdb.dbErr = err + if sdb.savedErr == nil { + sdb.savedErr = err } } func (sdb *IntraBlockState) Error() error { - return sdb.dbErr + return sdb.savedErr } // Reset clears out all ephemeral state objects from the state db, but keeps diff --git a/core/state/state_object.go b/core/state/state_object.go index 7547a97fda9..ffdcde3e770 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -69,13 +69,6 @@ type stateObject struct { original accounts.Account db *IntraBlockState - // DB error. - // State objects are used by the consensus core and VM which are - // unable to deal with database-level errors. Any error that occurs - // during a database read is memoized here and will eventually be returned - // by IntraBlockState.Commit. - dbErr error - // Write caches. //trie Trie // storage trie, which becomes non-nil on first access code Code // contract bytecode, which gets set when code is loaded @@ -134,8 +127,8 @@ func (so *stateObject) EncodeRLP(w io.Writer) error { // setError remembers the first non-nil error it is called with. func (so *stateObject) setError(err error) { - if so.dbErr == nil { - so.dbErr = err + if so.db.savedErr == nil { + so.db.savedErr = err } } diff --git a/core/state/state_recon_writer.go b/core/state/state_recon_writer.go new file mode 100644 index 00000000000..c3893b6545e --- /dev/null +++ b/core/state/state_recon_writer.go @@ -0,0 +1,257 @@ +package state + +import ( + //"fmt" + + "container/heap" + "github.com/RoaringBitmap/roaring/roaring64" + "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon-lib/kv" + libstate "github.com/ledgerwatch/erigon-lib/state" + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/common/dbutils" + "github.com/ledgerwatch/erigon/core/types/accounts" + "golang.org/x/exp/constraints" + "sync" +) + +type theap[T constraints.Ordered] []T + +func (h theap[T]) Len() int { + return len(h) +} + +func (h theap[T]) Less(i, j int) bool { + return h[i] < h[j] +} + +func (h theap[T]) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *theap[T]) Push(a interface{}) { + *h = append(*h, a.(T)) +} + +func (h *theap[T]) Pop() interface{} { + c := *h + *h = c[:len(c)-1] + return c[len(c)-1] +} + +// ReconState is the accumulator of changes to the state +type ReconState struct { + lock sync.RWMutex + workIterator roaring64.IntPeekable64 + doneBitmap roaring64.Bitmap + triggers map[uint64][]uint64 + queue theap[uint64] + changes map[string]map[string][]byte + sizeEstimate uint64 + rollbackCount uint64 +} + +func NewReconState() *ReconState { + rs := &ReconState{ + triggers: map[uint64][]uint64{}, + changes: map[string]map[string][]byte{}, + } + return rs +} + +func (rs *ReconState) SetWorkBitmap(workBitmap *roaring64.Bitmap) { + rs.workIterator = workBitmap.Iterator() +} + +func (rs *ReconState) Put(table string, key, val []byte) { + rs.lock.Lock() + defer rs.lock.Unlock() + t, ok := rs.changes[table] + if !ok { + t = map[string][]byte{} + rs.changes[table] = t + } + t[string(key)] = val + rs.sizeEstimate += uint64(len(key)) + uint64(len(val)) +} + +func (rs *ReconState) Delete(table string, key []byte) { + rs.lock.Lock() + defer rs.lock.Unlock() + t, ok := rs.changes[table] + if !ok { + t = map[string][]byte{} + rs.changes[table] = t + } + t[string(key)] = nil + rs.sizeEstimate += uint64(len(key)) +} + +func (rs *ReconState) Get(table string, key []byte) []byte { + rs.lock.RLock() + defer rs.lock.RUnlock() + t, ok := rs.changes[table] + if !ok { + return nil + } + return t[string(key)] +} + +func (rs *ReconState) Flush(rwTx kv.RwTx) error { + rs.lock.Lock() + defer rs.lock.Unlock() + for table, t := range rs.changes { + for ks, val := range t { + if len(val) == 0 { + if err := rwTx.Delete(table, []byte(ks), nil); err != nil { + return err + } + } else { + if err := rwTx.Put(table, []byte(ks), val); err != nil { + return err + } + } + } + } + rs.changes = map[string]map[string][]byte{} + rs.sizeEstimate = 0 + return nil +} + +func (rs *ReconState) Schedule() (uint64, bool) { + rs.lock.Lock() + defer rs.lock.Unlock() + for rs.queue.Len() < 16 && rs.workIterator.HasNext() { + heap.Push(&rs.queue, rs.workIterator.Next()) + } + if rs.queue.Len() > 0 { + return heap.Pop(&rs.queue).(uint64), true + } + return 0, false +} + +func (rs *ReconState) CommitTxNum(txNum uint64) { + rs.lock.Lock() + defer rs.lock.Unlock() + if tt, ok := rs.triggers[txNum]; ok { + for _, t := range tt { + heap.Push(&rs.queue, t) + } + delete(rs.triggers, txNum) + } + rs.doneBitmap.Add(txNum) +} + +func (rs *ReconState) RollbackTxNum(txNum, dependency uint64) { + rs.lock.Lock() + defer rs.lock.Unlock() + if rs.doneBitmap.Contains(dependency) { + heap.Push(&rs.queue, txNum) + } else { + tt, _ := rs.triggers[dependency] + tt = append(tt, txNum) + rs.triggers[dependency] = tt + } + rs.rollbackCount++ +} + +func (rs *ReconState) Done(txNum uint64) bool { + rs.lock.RLock() + defer rs.lock.RUnlock() + return rs.doneBitmap.Contains(txNum) +} + +func (rs *ReconState) DoneCount() uint64 { + rs.lock.RLock() + defer rs.lock.RUnlock() + return rs.doneBitmap.GetCardinality() +} + +func (rs *ReconState) RollbackCount() uint64 { + rs.lock.RLock() + defer rs.lock.RUnlock() + return rs.rollbackCount +} + +func (rs *ReconState) SizeEstimate() uint64 { + rs.lock.RLock() + defer rs.lock.RUnlock() + return rs.sizeEstimate +} + +type StateReconWriter struct { + ac *libstate.AggregatorContext + rs *ReconState + txNum uint64 +} + +func NewStateReconWriter(ac *libstate.AggregatorContext, rs *ReconState) *StateReconWriter { + return &StateReconWriter{ + ac: ac, + rs: rs, + } +} + +func (w *StateReconWriter) SetTxNum(txNum uint64) { + w.txNum = txNum +} + +func (w *StateReconWriter) UpdateAccountData(address common.Address, original, account *accounts.Account) error { + found, txNum := w.ac.MaxAccountsTxNum(address.Bytes()) + if !found { + return nil + } + if txNum != w.txNum { + //fmt.Printf("no change account [%x] txNum = %d\n", address, txNum) + return nil + } + value := make([]byte, account.EncodingLengthForStorage()) + account.EncodeForStorage(value) + //fmt.Printf("account [%x]=>{Balance: %d, Nonce: %d, Root: %x, CodeHash: %x} txNum: %d\n", address, &account.Balance, account.Nonce, account.Root, account.CodeHash, w.txNum) + w.rs.Put(kv.PlainState, address[:], value) + return nil +} + +func (w *StateReconWriter) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error { + found, txNum := w.ac.MaxCodeTxNum(address.Bytes()) + if !found { + return nil + } + if txNum != w.txNum { + //fmt.Printf("no change code [%x] txNum = %d\n", address, txNum) + return nil + } + w.rs.Put(kv.Code, codeHash[:], code) + if len(code) > 0 { + //fmt.Printf("code [%x] => [%x] CodeHash: %x, txNum: %d\n", address, code, codeHash, w.txNum) + w.rs.Put(kv.PlainContractCode, dbutils.PlainGenerateStoragePrefix(address[:], FirstContractIncarnation), codeHash[:]) + } + return nil +} + +func (w *StateReconWriter) DeleteAccount(address common.Address, original *accounts.Account) error { + return nil +} + +func (w *StateReconWriter) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error { + found, txNum := w.ac.MaxStorageTxNum(address.Bytes(), key.Bytes()) + if !found { + //fmt.Printf("no found storage [%x] [%x]\n", address, *key) + return nil + } + if txNum != w.txNum { + //fmt.Printf("no change storage [%x] [%x] txNum = %d\n", address, *key, txNum) + return nil + } + v := value.Bytes() + if len(v) != 0 { + //fmt.Printf("storage [%x] [%x] => [%x], txNum: %d\n", address, *key, v, w.txNum) + compositeKey := dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), FirstContractIncarnation, key.Bytes()) + w.rs.Put(kv.PlainState, compositeKey, v) + } + return nil +} + +func (w *StateReconWriter) CreateContract(address common.Address) error { + return nil +} diff --git a/go.mod b/go.mod index 415f1852b12..d2360950a28 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21 + github.com/ledgerwatch/erigon-lib v0.0.0-20220702183834-707a89842d6b github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6 diff --git a/go.sum b/go.sum index 982d284196a..45f0c922d41 100644 --- a/go.sum +++ b/go.sum @@ -383,8 +383,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21 h1:mZAojUAtvuvFLS8sumuYlZrHKGvkjTBxA6fvvujT/Kc= -github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21/go.mod h1:7sQ5B5m54zoo7RVRVukH3YZCYVrCC+BmwDBD+9KyTrE= +github.com/ledgerwatch/erigon-lib v0.0.0-20220702183834-707a89842d6b h1:jxk2V9PBN9z2FQIL2SAV3V1wq01RUPz2kgzSqaCZmJQ= +github.com/ledgerwatch/erigon-lib v0.0.0-20220702183834-707a89842d6b/go.mod h1:7sQ5B5m54zoo7RVRVukH3YZCYVrCC+BmwDBD+9KyTrE= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc= github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index c5c5ef2d365..c35e22b99fd 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -313,6 +313,9 @@ func (back *BlockReaderWithSnapshots) Header(ctx context.Context, tx kv.Getter, } return nil }) + if err != nil { + return h, err + } if ok { return h, nil }