-
Notifications
You must be signed in to change notification settings - Fork 182
/
execution_context.go
133 lines (109 loc) · 2.78 KB
/
execution_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package state
import (
"bytes"
"fmt"
"github.com/okex/exchain/libs/tendermint/libs/log"
"github.com/okex/exchain/libs/tendermint/types"
)
type prerunContext struct {
prerunTx bool
taskChan chan *executionTask
taskResultChan chan *executionTask
prerunTask *executionTask
logger log.Logger
}
func newPrerunContex(logger log.Logger) *prerunContext {
return &prerunContext{
taskChan: make(chan *executionTask, 1),
taskResultChan: make(chan *executionTask, 1),
logger: logger,
}
}
func (pc *prerunContext) checkIndex(height int64) {
var index int64
if pc.prerunTask != nil {
index = pc.prerunTask.index
}
pc.logger.Info("Not apply delta", "height", height, "prerunIndex", index)
}
func (pc *prerunContext) flushPrerunResult() {
for {
select {
case task := <-pc.taskResultChan:
task.dump("Flush prerun result")
default:
return
}
}
}
func (pc *prerunContext) prerunRoutine() {
pc.prerunTx = true
for task := range pc.taskChan {
task.run()
}
}
func (pc *prerunContext) dequeueResult() (*ABCIResponses, error) {
expected := pc.prerunTask
for context := range pc.taskResultChan {
context.dump("Got prerun result")
if context.stopped {
continue
}
if context.height != expected.block.Height {
continue
}
if context.index != expected.index {
continue
}
if bytes.Equal(context.block.AppHash, expected.block.AppHash) {
return context.result.res, context.result.err
} else {
// todo
panic("wrong app hash")
}
}
return nil, nil
}
func (pc *prerunContext) stopPrerun(height int64) (index int64) {
task := pc.prerunTask
// stop the existing prerun if any
if task != nil {
if height > 0 && height != task.block.Height {
task.dump(fmt.Sprintf(
"Prerun sanity check failed. block.Height=%d, context.block.Height=%d",
height,
task.block.Height))
// todo
panic("Prerun sanity check failed")
}
task.dump("Stopping prerun")
task.stop()
index = task.index
}
pc.flushPrerunResult()
pc.prerunTask = nil
return index
}
func (pc *prerunContext) notifyPrerun(blockExec *BlockExecutor, block *types.Block) {
stoppedIndex := pc.stopPrerun(block.Height)
stoppedIndex++
pc.prerunTask = newExecutionTask(blockExec, block, stoppedIndex)
pc.prerunTask.dump("Notify prerun")
// start a new one
pc.taskChan <- pc.prerunTask
}
func (pc *prerunContext) getPrerunResult(height int64, fastSync bool) (res *ABCIResponses, err error) {
pc.checkIndex(height)
if fastSync {
pc.stopPrerun(height)
return
}
// blockExec.prerunContext == nil means:
// 1. prerunTx disabled
// 2. we are in fasy-sync: the block comes from BlockPool.AddBlock not State.addProposalBlockPart and no prerun result expected
if pc.prerunTask != nil {
res, err = pc.dequeueResult()
pc.prerunTask = nil
}
return
}