Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move NumSpeculativeProcs from module variable to function parameter #931

Merged
merged 1 commit into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 33 additions & 35 deletions core/blockstm/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ type ExecVersionView struct {
sender common.Address
}

var NumSpeculativeProcs int = 8

func SetProcs(specProcs int) {
NumSpeculativeProcs = specProcs
}

func (ev *ExecVersionView) Execute() (er ExecResult) {
er.ver = ev.ver
if er.err = ev.et.Execute(ev.mvh, ev.ver.Incarnation); er.err != nil {
Expand Down Expand Up @@ -180,6 +174,9 @@ type ParallelExecutor struct {
// Stores the execution statistics for the last incarnation of each task
stats map[int]ExecutionStat

// Number of workers that execute transactions speculatively
numSpeculativeProcs int

statsMutex sync.Mutex

// Channel for tasks that should be prioritized
Expand Down Expand Up @@ -255,7 +252,7 @@ type ExecutionStat struct {
Worker int
}

func NewParallelExecutor(tasks []ExecTask, profile bool, metadata bool) *ParallelExecutor {
func NewParallelExecutor(tasks []ExecTask, profile bool, metadata bool, numProcs int) *ParallelExecutor {
numTasks := len(tasks)

var resultQueue SafeQueue
Expand All @@ -271,27 +268,28 @@ func NewParallelExecutor(tasks []ExecTask, profile bool, metadata bool) *Paralle
}

pe := &ParallelExecutor{
tasks: tasks,
stats: make(map[int]ExecutionStat, numTasks),
chTasks: make(chan ExecVersionView, numTasks),
chSpeculativeTasks: make(chan struct{}, numTasks),
chSettle: make(chan int, numTasks),
chResults: make(chan struct{}, numTasks),
specTaskQueue: specTaskQueue,
resultQueue: resultQueue,
lastSettled: -1,
skipCheck: make(map[int]bool),
execTasks: makeStatusManager(numTasks),
validateTasks: makeStatusManager(0),
diagExecSuccess: make([]int, numTasks),
diagExecAbort: make([]int, numTasks),
mvh: MakeMVHashMap(),
lastTxIO: MakeTxnInputOutput(numTasks),
txIncarnations: make([]int, numTasks),
estimateDeps: make(map[int][]int),
preValidated: make(map[int]bool),
begin: time.Now(),
profile: profile,
tasks: tasks,
numSpeculativeProcs: numProcs,
stats: make(map[int]ExecutionStat, numTasks),
chTasks: make(chan ExecVersionView, numTasks),
chSpeculativeTasks: make(chan struct{}, numTasks),
chSettle: make(chan int, numTasks),
chResults: make(chan struct{}, numTasks),
specTaskQueue: specTaskQueue,
resultQueue: resultQueue,
lastSettled: -1,
skipCheck: make(map[int]bool),
execTasks: makeStatusManager(numTasks),
validateTasks: makeStatusManager(0),
diagExecSuccess: make([]int, numTasks),
diagExecAbort: make([]int, numTasks),
mvh: MakeMVHashMap(),
lastTxIO: MakeTxnInputOutput(numTasks),
txIncarnations: make([]int, numTasks),
estimateDeps: make(map[int][]int),
preValidated: make(map[int]bool),
begin: time.Now(),
profile: profile,
}

return pe
Expand Down Expand Up @@ -329,10 +327,10 @@ func (pe *ParallelExecutor) Prepare() error {
}
}

pe.workerWg.Add(NumSpeculativeProcs + numGoProcs)
pe.workerWg.Add(pe.numSpeculativeProcs + numGoProcs)

// Launch workers that execute transactions
for i := 0; i < NumSpeculativeProcs+numGoProcs; i++ {
for i := 0; i < pe.numSpeculativeProcs+numGoProcs; i++ {
go func(procNum int) {
defer pe.workerWg.Done()

Expand Down Expand Up @@ -366,7 +364,7 @@ func (pe *ParallelExecutor) Prepare() error {
}
}

if procNum < NumSpeculativeProcs {
if procNum < pe.numSpeculativeProcs {
for range pe.chSpeculativeTasks {
doWork(pe.specTaskQueue.Pop().(ExecVersionView))
}
Expand Down Expand Up @@ -597,12 +595,12 @@ func (pe *ParallelExecutor) Step(res *ExecResult) (result ParallelExecutionResul

type PropertyCheck func(*ParallelExecutor) error

func executeParallelWithCheck(tasks []ExecTask, profile bool, check PropertyCheck, metadata bool, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
func executeParallelWithCheck(tasks []ExecTask, profile bool, check PropertyCheck, metadata bool, numProcs int, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
if len(tasks) == 0 {
return ParallelExecutionResult{MakeTxnInputOutput(len(tasks)), nil, nil, nil}, nil
}

pe := NewParallelExecutor(tasks, profile, metadata)
pe := NewParallelExecutor(tasks, profile, metadata, numProcs)
err = pe.Prepare()

if err != nil {
Expand Down Expand Up @@ -636,6 +634,6 @@ func executeParallelWithCheck(tasks []ExecTask, profile bool, check PropertyChec
return
}

func ExecuteParallel(tasks []ExecTask, profile bool, metadata bool, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
return executeParallelWithCheck(tasks, profile, nil, metadata, interruptCtx)
func ExecuteParallel(tasks []ExecTask, profile bool, metadata bool, numProcs int, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
return executeParallelWithCheck(tasks, profile, nil, metadata, numProcs, interruptCtx)
}
10 changes: 6 additions & 4 deletions core/blockstm/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

type OpType int

var numProcs = 8

const readType = 0
const writeType = 1
const otherType = 2
Expand Down Expand Up @@ -425,7 +427,7 @@ func runParallel(t *testing.T, tasks []ExecTask, validation PropertyCheck, metad
profile := false

start := time.Now()
result, err := executeParallelWithCheck(tasks, false, validation, metadata, nil)
result, err := executeParallelWithCheck(tasks, false, validation, metadata, numProcs, nil)

if result.Deps != nil && profile {
result.Deps.Report(*result.Stats, func(str string) { fmt.Println(str) })
Expand Down Expand Up @@ -458,7 +460,7 @@ func runParallel(t *testing.T, tasks []ExecTask, validation PropertyCheck, metad
func runParallelGetMetadata(t *testing.T, tasks []ExecTask, validation PropertyCheck) map[int]map[int]bool {
t.Helper()

res, err := executeParallelWithCheck(tasks, true, validation, false, nil)
res, err := executeParallelWithCheck(tasks, true, validation, false, numProcs, nil)

assert.NoError(t, err, "error occur during parallel execution")

Expand Down Expand Up @@ -943,7 +945,7 @@ func TestBreakFromCircularDependency(t *testing.T) {
cancel()

// This should not hang
_, err := ExecuteParallel(tasks, false, true, ctx)
_, err := ExecuteParallel(tasks, false, true, numProcs, ctx)

if err == nil {
t.Error("Expected cancel error")
Expand Down Expand Up @@ -976,7 +978,7 @@ func TestBreakFromPartialCircularDependency(t *testing.T) {
cancel()

// This should not hang
_, err := ExecuteParallel(tasks, false, true, ctx)
_, err := ExecuteParallel(tasks, false, true, numProcs, ctx)

if err == nil {
t.Error("Expected cancel error")
Expand Down
6 changes: 2 additions & 4 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@
// transactions failed to execute due to insufficient gas it will return an error.
// nolint:gocognit
func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
blockstm.SetProcs(cfg.ParallelSpeculativeProcesses)

var (
receipts types.Receipts
header = block.Header()
Expand Down Expand Up @@ -364,7 +362,7 @@
backupStateDB := statedb.Copy()

profile := false
result, err := blockstm.ExecuteParallel(tasks, profile, metadata, interruptCtx)
result, err := blockstm.ExecuteParallel(tasks, profile, metadata, cfg.ParallelSpeculativeProcesses, interruptCtx)

if err == nil && profile && result.Deps != nil {
_, weight := result.Deps.LongestPath(*result.Stats)
Expand Down Expand Up @@ -398,7 +396,7 @@
t.totalUsedGas = usedGas
}

_, err = blockstm.ExecuteParallel(tasks, false, metadata, interruptCtx)
_, err = blockstm.ExecuteParallel(tasks, false, metadata, cfg.ParallelSpeculativeProcesses, interruptCtx)

Check warning on line 399 in core/parallel_state_processor.go

View check run for this annotation

Codecov / codecov/patch

core/parallel_state_processor.go#L399

Added line #L399 was not covered by tests

break
}
Expand Down