Skip to content

Commit

Permalink
Parallel: handle fixup & code review & enhancement
Browse files Browse the repository at this point in the history
No fundamental change, some improvements, include:
** Add a new type ParallelStateProcessor;
** move Parallel Config to BlockChain
** more precious ParallelNum set
** Add EnableParallelProcessor()
** remove panic()
** remove useless: redo flag,
** change waitChan from `chan int` to `chan struct {}` and communicate by close()
** dispatch policy: queue `from` ahead of `to`
** pre-allocate allLogs
** disable parallel processor is snapshot is not enabled
** others: rename...
  • Loading branch information
setunapo committed May 20, 2022
1 parent dd1ce37 commit 6d22e98
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 448 deletions.
46 changes: 31 additions & 15 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/big"
"os"
"path/filepath"
"runtime"
godebug "runtime/debug"
"strconv"
"strings"
Expand Down Expand Up @@ -804,17 +805,16 @@ var (
}
ParallelTxFlag = cli.BoolFlag{
Name: "parallel",
Usage: "Enable the experimental parallel transaction execution mode (default = false)",
Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)",
}
ParallelTxNumFlag = cli.IntFlag{
Name: "parallel.num",
Usage: "Number of slot for transaction execution, only valid in parallel mode (default: CPUNum - 1)",
Value: core.ParallelExecNum,
Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
}
ParallelTxQueueSizeFlag = cli.IntFlag{
Name: "parallel.queuesize",
Usage: "Max number of Tx that can be queued to a slot, only valid in parallel mode",
Value: core.MaxPendingQueueSize,
Usage: "Max number of Tx that can be queued to a slot, only valid in parallel mode (advanced option)",
Value: 20,
}

// Init network
Expand Down Expand Up @@ -1336,16 +1336,6 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(InsecureUnlockAllowedFlag.Name) {
cfg.InsecureUnlockAllowed = ctx.GlobalBool(InsecureUnlockAllowedFlag.Name)
}
if ctx.GlobalIsSet(ParallelTxFlag.Name) {
core.ParallelTxMode = true
}
if ctx.GlobalIsSet(ParallelTxNumFlag.Name) {
core.ParallelExecNum = ctx.GlobalInt(ParallelTxNumFlag.Name)
}
if ctx.GlobalIsSet(ParallelTxQueueSizeFlag.Name) {
core.MaxPendingQueueSize = ctx.GlobalInt(ParallelTxQueueSizeFlag.Name)
}

}

func setSmartCard(ctx *cli.Context, cfg *node.Config) {
Expand Down Expand Up @@ -1666,6 +1656,32 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
if ctx.GlobalIsSet(ParallelTxFlag.Name) {
cfg.ParallelTxMode = ctx.GlobalBool(ParallelTxFlag.Name)
// The best prallel num will be tuned later, we do a simple parallel num set here
numCpu := runtime.NumCPU()
var parallelNum int
if ctx.GlobalIsSet(ParallelTxNumFlag.Name) {
// first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed
parallelNum = ctx.GlobalInt(ParallelTxNumFlag.Name)
if parallelNum < 1 {
parallelNum = 1
}
} else if numCpu == 1 {
parallelNum = 1 // single CPU core
} else if numCpu < 10 {
parallelNum = numCpu - 1
} else {
parallelNum = 8 // we found concurrency 8 is slightly better than 15
}
cfg.ParallelTxNum = parallelNum
// set up queue size, it is an advanced option
if ctx.GlobalIsSet(ParallelTxQueueSizeFlag.Name) {
cfg.ParallelTxQueueSize = ctx.GlobalInt(ParallelTxQueueSizeFlag.Name)
} else {
cfg.ParallelTxQueueSize = 20 // default queue size, will be optimized
}
}
// Read the value from the flag no matter if it's set or not.
cfg.Preimages = ctx.GlobalBool(CachePreimagesFlag.Name)
if cfg.NoPruning && !cfg.Preimages {
Expand Down
44 changes: 22 additions & 22 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ var (

errInsertionInterrupted = errors.New("insertion is interrupted")
errStateRootVerificationFailed = errors.New("state root verification failed")
ParallelTxMode = false // parallel transaction execution
)

const (
Expand Down Expand Up @@ -241,12 +240,13 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
parallelExecution bool

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
Expand Down Expand Up @@ -311,9 +311,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine)
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)
if ParallelTxMode {
bc.processor.InitParallelOnce()
}

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
Expand Down Expand Up @@ -2114,15 +2111,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if err != nil {
return it.index, err
}

bc.updateHighestVerifiedHeader(block.Header())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
var followupInterrupt uint32
// For diff sync, it may fallback to full sync, so we still do prefetch
// parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel
if !ParallelTxMode {
if !bc.parallelExecution {
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
Expand All @@ -2136,16 +2132,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
statedb.EnablePipeCommit()
}
statedb.SetExpectedStateRoot(block.Root())

var receipts types.Receipts
var logs []*types.Log
var usedGas uint64
if ParallelTxMode {
statedb, receipts, logs, usedGas, err = bc.processor.ProcessParallel(block, statedb, bc.vmConfig)
} else {
statedb, receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig)
}

statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
atomic.StoreUint32(&followupInterrupt, 1)
activeState = statedb
if err != nil {
Expand Down Expand Up @@ -3122,3 +3109,16 @@ func EnablePersistDiff(limit uint64) BlockChainOption {
return chain
}
}

func EnableParallelProcessor(parallelNum int, queueSize int) BlockChainOption {
return func(chain *BlockChain) *BlockChain {
if chain.snaps == nil {
// disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie
log.Info("parallel processor is not enabled since snapshot is not enabled")
return chain
}
chain.parallelExecution = true
chain.processor = NewParallelStateProcessor(chain.Config(), chain, chain.engine, parallelNum, queueSize)
return chain
}
}
2 changes: 1 addition & 1 deletion core/state/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (ch createObjectChange) revert(s *StateDB) {
if s.parallel.isSlotDB {
delete(s.parallel.dirtiedStateObjectsInSlot, *ch.account)
} else {
s.deleteStateObjectFromStateDB(*ch.account)
s.deleteStateObj(*ch.account)
}
delete(s.stateObjectsDirty, *ch.account)
}
Expand Down
2 changes: 0 additions & 2 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ func (c Code) String() string {
return string(c) //strings.Join(Disassemble(c), " ")
}

type StorageKeys map[common.Hash]struct{}

type Storage map[common.Hash]common.Hash

func (s Storage) String() (str string) {
Expand Down
Loading

0 comments on commit 6d22e98

Please sign in to comment.