Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Experiment in parallel execution (ledgerwatch#4652)
Browse files Browse the repository at this point in the history
* Restructure tx execution

* fixes

* Fixes and traces

* Tracing

* More tracing

* Drain the result channel

* Intermediate

* more efficient parallel exec

* Sorted buffer

* Fix results size

* fix for the recon

* Fix compilation

* Sort keys in Write and Read sets, fix compilation in rpcdaemon22

* Update to latest erigon-lib

* Update to erigon-lib

* Remove go.mod replace

* Update erigon-lib

* Update to erigon-lib main

* Fix lint

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
  • Loading branch information
3 people committed Jul 23, 2022
1 parent 1cb6be0 commit 81d106b
Show file tree
Hide file tree
Showing 12 changed files with 656 additions and 243 deletions.
12 changes: 7 additions & 5 deletions cmd/rpcdaemon22/commands/eth_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
txNumbers := roaring64.New()
txNumbers.AddRange(fromTxNum, toTxNum) // [min,max)

topicsBitmap, err := getTopicsBitmap(api._agg, tx, crit.Topics, fromTxNum, toTxNum)
ac := api._agg.MakeContext()

topicsBitmap, err := getTopicsBitmap(ac, tx, crit.Topics, fromTxNum, toTxNum)
if err != nil {
return nil, err
}
Expand All @@ -139,7 +141,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
var addrBitmap *roaring64.Bitmap
for _, addr := range crit.Addresses {
var bitmapForORing roaring64.Bitmap
it := api._agg.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
it := ac.LogAddrIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
for it.HasNext() {
bitmapForORing.Add(it.Next())
}
Expand All @@ -162,7 +164,7 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
var lastHeader *types.Header
var lastSigner *types.Signer
var lastRules *params.Rules
stateReader := state.NewHistoryReader22(api._agg, nil /* ReadIndices */)
stateReader := state.NewHistoryReader22(ac, nil /* ReadIndices */)
iter := txNumbers.Iterator()
for iter.HasNext() {
txNum := iter.Next()
Expand Down Expand Up @@ -233,12 +235,12 @@ func (api *APIImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([
// {{}, {B}} matches any topic in first position AND B in second position
// {{A}, {B}} matches topic A in first position AND B in second position
// {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position
func getTopicsBitmap(a *libstate.Aggregator, c kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
func getTopicsBitmap(ac *libstate.AggregatorContext, c kv.Tx, topics [][]common.Hash, from, to uint64) (*roaring64.Bitmap, error) {
var result *roaring64.Bitmap
for _, sub := range topics {
var bitmapForORing roaring64.Bitmap
for _, topic := range sub {
it := a.LogTopicIterator(topic.Bytes(), from, to, nil)
it := ac.LogTopicIterator(topic.Bytes(), from, to, nil)
for it.HasNext() {
bitmapForORing.Add(it.Next())
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/rpcdaemon22/commands/trace_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,11 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
allTxs roaring64.Bitmap
txsTo roaring64.Bitmap
)
ac := api._agg.MakeContext()

for _, addr := range req.FromAddress {
if addr != nil {
it := api._agg.TraceFromIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
it := ac.TraceFromIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
for it.HasNext() {
allTxs.Add(it.Next())
}
Expand All @@ -266,7 +267,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str

for _, addr := range req.ToAddress {
if addr != nil {
it := api._agg.TraceToIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
it := ac.TraceToIterator(addr.Bytes(), fromTxNum, toTxNum, nil)
for it.HasNext() {
txsTo.Add(it.Next())
}
Expand Down Expand Up @@ -319,7 +320,7 @@ func (api *TraceAPIImpl) Filter(ctx context.Context, req TraceFilterRequest, str
var lastHeader *types.Header
var lastSigner *types.Signer
var lastRules *params.Rules
stateReader := state.NewHistoryReader22(api._agg, nil /* ReadIndices */)
stateReader := state.NewHistoryReader22(ac, nil /* ReadIndices */)
noop := state.NewNoopWriter()
for it.HasNext() {
txNum := uint64(it.Next())
Expand Down
12 changes: 6 additions & 6 deletions cmd/state/commands/erigon22.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func Erigon22(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log
}
return h
}
readWrapper := &ReaderWrapper22{r: agg, roTx: rwTx}
readWrapper := &ReaderWrapper22{ac: agg.MakeContext(), roTx: rwTx}
writeWrapper := &WriterWrapper22{w: agg}

for !interrupt {
Expand Down Expand Up @@ -396,7 +396,7 @@ func processBlock22(startTxNum uint64, trace bool, txNumStart uint64, rw *Reader
// Implements StateReader and StateWriter
type ReaderWrapper22 struct {
roTx kv.Tx
r *libstate.Aggregator
ac *libstate.AggregatorContext
blockNum uint64
}

Expand All @@ -406,7 +406,7 @@ type WriterWrapper22 struct {
}

func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Account, error) {
enc, err := rw.r.ReadAccountData(address.Bytes(), rw.roTx)
enc, err := rw.ac.ReadAccountData(address.Bytes(), rw.roTx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -444,7 +444,7 @@ func (rw *ReaderWrapper22) ReadAccountData(address common.Address) (*accounts.Ac
}

func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
enc, err := rw.r.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx)
enc, err := rw.ac.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx)
if err != nil {
return nil, err
}
Expand All @@ -458,11 +458,11 @@ func (rw *ReaderWrapper22) ReadAccountStorage(address common.Address, incarnatio
}

func (rw *ReaderWrapper22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
return rw.r.ReadAccountCode(address.Bytes(), rw.roTx)
return rw.ac.ReadAccountCode(address.Bytes(), rw.roTx)
}

func (rw *ReaderWrapper22) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
return rw.r.ReadAccountCodeSize(address.Bytes(), rw.roTx)
return rw.ac.ReadAccountCodeSize(address.Bytes(), rw.roTx)
}

func (rw *ReaderWrapper22) ReadAccountIncarnation(address common.Address) (uint64, error) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/state/commands/history22.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
readWrapper := state.NewHistoryReader22(h.MakeContext(), ri)

for !interrupt {
select {
Expand Down Expand Up @@ -169,7 +170,6 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
txNum += uint64(len(b.Transactions())) + 2 // Pre and Post block transaction
continue
}
readWrapper := state.NewHistoryReader22(h, ri)
if traceBlock != 0 {
readWrapper.SetTrace(blockNum == uint64(traceBlock))
}
Expand Down
195 changes: 195 additions & 0 deletions cmd/state/commands/replay_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package commands

import (
"context"
"fmt"
"path"
"path/filepath"
"sort"

"github.com/ledgerwatch/erigon-lib/kv/memdb"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
)

var txhash string
var txnum uint64

func init() {
withDataDir(replayTxCmd)
rootCmd.AddCommand(replayTxCmd)
replayTxCmd.Flags().StringVar(&txhash, "txhash", "", "hash of the transaction to replay")
replayTxCmd.Flags().Uint64Var(&txnum, "txnum", 0, "tx num for replay")
}

var replayTxCmd = &cobra.Command{
Use: "replaytx",
Short: "Experimental command to replay a given transaction using only history",
RunE: func(cmd *cobra.Command, args []string) error {
return ReplayTx(genesis)
},
}

func ReplayTx(genesis *core.Genesis) error {
var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.Reopen(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
// Compute mapping blockNum -> last TxNum in that block
txNums := make([]uint64, allSnapshots.BlocksAvailable()+1)
if err := allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error {
for _, b := range bs {
if err := b.Iterate(func(blockNum, baseTxNum, txAmount uint64) {
txNums[blockNum] = baseTxNum + txAmount
}); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("build txNum => blockNum mapping: %w", err)
}
ctx := context.Background()
var txNum uint64
if txhash != "" {
txnHash := common.HexToHash(txhash)
fmt.Printf("Tx hash = [%x]\n", txnHash)
db := memdb.New()
roTx, err := db.BeginRo(ctx)
if err != nil {
return err
}
defer roTx.Rollback()
bn, ok, err := blockReader.TxnLookup(ctx, roTx, txnHash)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("transaction not found")
}
fmt.Printf("Found in block %d\n", bn)
var header *types.Header
if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil {
return err
}
blockHash := header.Hash()
b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, bn)
if err != nil {
return err
}
txs := b.Transactions()
var txIndex int
for txIndex = 0; txIndex < len(txs); txIndex++ {
if txs[txIndex].Hash() == txnHash {
fmt.Printf("txIndex = %d\n", txIndex)
break
}
}
txNum = txNums[bn-1] + 1 + uint64(txIndex)
} else {
txNum = txnum
}
fmt.Printf("txNum = %d\n", txNum)
aggPath := filepath.Join(datadir, "erigon23")
agg, err := libstate.NewAggregator(aggPath, AggregationStep)
if err != nil {
return fmt.Errorf("create history: %w", err)
}
defer agg.Close()
ac := agg.MakeContext()
workCh := make(chan state.TxTask)
rs := state.NewReconState(workCh)
if err = replayTxNum(ctx, allSnapshots, blockReader, txNum, txNums, rs, ac); err != nil {
return err
}
return nil
}

func replayTxNum(ctx context.Context, allSnapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader,
txNum uint64, txNums []uint64, rs *state.ReconState, ac *libstate.AggregatorContext,
) error {
bn := uint64(sort.Search(len(txNums), func(i int) bool {
return txNums[i] > txNum
}))
txIndex := int(txNum - txNums[bn-1] - 1)
fmt.Printf("bn=%d, txIndex=%d\n", bn, txIndex)
var header *types.Header
var err error
if header, err = blockReader.HeaderByNumber(ctx, nil, bn); err != nil {
return err
}
blockHash := header.Hash()
b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, bn)
if err != nil {
return err
}
txn := b.Transactions()[txIndex]
stateWriter := state.NewStateReconWriter(ac, rs)
stateReader := state.NewHistoryReaderNoState(ac, rs)
stateReader.SetTxNum(txNum)
stateWriter.SetTxNum(txNum)
noop := state.NewNoopWriter()
rules := chainConfig.Rules(bn)
for {
stateReader.ResetError()
ibs := state.New(stateReader)
gp := new(core.GasPool).AddGas(txn.GetGas())
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d, gas=%d, input=[%x]\n", txNum, blockNum, txIndex, txn.GetGas(), txn.GetData())
vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(chainConfig, bn)}
contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil }
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, nil, hash, number)
if err != nil {
panic(err)
}
return h
}
getHashFn := core.GetHashFn(header, getHeader)
logger := log.New()
engine := initConsensusEngine(chainConfig, logger, allSnapshots)
txnHash := txn.Hash()
blockContext := core.NewEVMBlockContext(header, getHashFn, engine, nil /* author */, contractHasTEVM)
ibs.Prepare(txnHash, blockHash, txIndex)
msg, err := txn.AsMessage(*types.MakeSigner(chainConfig, bn), header.BaseFee, rules)
if err != nil {
return err
}
txContext := core.NewEVMTxContext(msg)
vmenv := vm.NewEVM(blockContext, txContext, ibs, chainConfig, vmConfig)

_, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */)
if err != nil {
return fmt.Errorf("could not apply tx %d [%x] failed: %w", txIndex, txnHash, err)
}
if err = ibs.FinalizeTx(rules, noop); err != nil {
return err
}
if dependency, ok := stateReader.ReadError(); ok {
fmt.Printf("dependency %d on %d\n", txNum, dependency)
if err = replayTxNum(ctx, allSnapshots, blockReader, dependency, txNums, rs, ac); err != nil {
return err
}
} else {
if err = ibs.CommitBlock(rules, stateWriter); err != nil {
return err
}
break
}
}
rs.CommitTxNum(txNum)
fmt.Printf("commited %d\n", txNum)
return nil
}
Loading

0 comments on commit 81d106b

Please sign in to comment.