/
execution_context.go
139 lines (114 loc) · 3.08 KB
/
execution_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package state
import (
"bytes"
"fmt"
"github.com/okx/okbchain/libs/tendermint/libs/log"
"github.com/okx/okbchain/libs/tendermint/types"
"time"
)
// prerunContext holds the channels and bookkeeping used to execute a block
// ahead of time ("prerun") and to collect the outcome later.
type prerunContext struct {
	// prerunTx is set to true by prerunRoutine when the worker loop starts.
	prerunTx bool

	// taskChan carries tasks from notifyPrerun to prerunRoutine.
	taskChan chan *executionTask

	// taskResultChan is read by dequeueResult and drained by
	// flushPrerunResult. The sender is not part of this file's visible code.
	taskResultChan chan *executionTask

	// prerunTask is the task most recently queued by notifyPrerun. It is reset
	// to nil by stopPrerun and by getPrerunResult.
	prerunTask *executionTask

	// logger receives the Info and Error messages emitted by the methods below.
	logger log.Logger
}
// newPrerunContex returns a prerunContext that logs through logger. The task
// channel and the result channel are each created with a buffer of one; all
// other fields keep their zero value.
func newPrerunContex(logger log.Logger) *prerunContext {
	pc := new(prerunContext)
	pc.logger = logger
	pc.taskChan = make(chan *executionTask, 1)
	pc.taskResultChan = make(chan *executionTask, 1)
	return pc
}
// checkIndex writes an Info log entry containing height and the index of the
// current prerun task. The index is reported as 0 when no task is set.
func (pc *prerunContext) checkIndex(height int64) {
	prerunIndex := int64(0)
	if current := pc.prerunTask; current != nil {
		prerunIndex = current.index
	}
	pc.logger.Info("Not apply delta", "height", height, "prerunIndex", prerunIndex)
}
// flushPrerunResult discards every result that is currently ready on
// taskResultChan, dumping each one as it goes. It returns as soon as a receive
// would block, so it never waits for new results.
func (pc *prerunContext) flushPrerunResult() {
	for {
		select {
		case stale := <-pc.taskResultChan:
			stale.dump("Flush prerun result")
			continue
		default:
		}
		// Nothing was ready: the channel is drained.
		return
	}
}
// prerunRoutine sets prerunTx to true and then runs tasks received from
// taskChan one after another. It returns only when taskChan is closed.
func (pc *prerunContext) prerunRoutine() {
	pc.prerunTx = true
	for {
		next, open := <-pc.taskChan
		if !open {
			return
		}
		next.run()
	}
}
// dequeueResult blocks on taskResultChan until it receives the result that
// belongs to the current pc.prerunTask, and returns that task's responses,
// duration and error.
//
// Received tasks are skipped when they are marked stopped, or when their
// height or index differs from the current task's. The function panics if the
// matching task's block AppHash differs from the expected one, and returns
// (nil, 0, nil) if the channel is closed before a match arrives.
//
// pc.prerunTask is dereferenced for every non-stopped result, so it must be
// non-nil when this is called.
func (pc *prerunContext) dequeueResult() (*ABCIResponses, time.Duration, error) {
	want := pc.prerunTask
	for got := range pc.taskResultChan {
		got.dump("Got prerun result")

		// Evaluated left to right: a stopped task is skipped before the
		// expected task is ever touched.
		mismatch := got.stopped ||
			got.height != want.block.Height ||
			got.index != want.index
		if mismatch {
			continue
		}

		if !bytes.Equal(got.block.AppHash, want.block.AppHash) {
			// todo
			panic("wrong app hash")
		}
		return got.result.res, got.result.duration, got.result.err
	}
	return nil, 0, nil
}
// stopPrerun stops the outstanding prerun task if there is one, drains any
// results already buffered, and clears pc.prerunTask. It returns the index of
// the task it stopped, or 0 when no task was outstanding.
//
// When height is positive it must equal the height of the outstanding task's
// block; on a mismatch the task is dumped with a diagnostic message and the
// function panics without stopping the task or clearing any state.
func (pc *prerunContext) stopPrerun(height int64) (index int64) {
	// stop the existing prerun if any
	if running := pc.prerunTask; running != nil {
		// height <= 0 skips the sanity check entirely.
		if height > 0 && running.block.Height != height {
			msg := fmt.Sprintf(
				"Prerun sanity check failed. block.Height=%d, context.block.Height=%d",
				height,
				running.block.Height,
			)
			running.dump(msg)
			// todo
			panic("Prerun sanity check failed")
		}

		running.dump("Stopping prerun")
		running.stop()
		index = running.index
	}

	pc.flushPrerunResult()
	pc.prerunTask = nil
	return index
}
// notifyPrerun replaces the outstanding prerun with a new one for block.
//
// The previous task, if any, is stopped through stopPrerun (which panics when
// that task's block height differs from block.Height). The new task gets an
// index one greater than the stopped task's index, i.e. 1 when nothing was
// outstanding. It is recorded in pc.prerunTask and then sent on taskChan;
// the send blocks until the channel can accept it.
func (pc *prerunContext) notifyPrerun(blockExec *BlockExecutor, block *types.Block) {
	nextIndex := pc.stopPrerun(block.Height) + 1

	task := newExecutionTask(blockExec, block, nextIndex)
	pc.prerunTask = task
	task.dump("Notify prerun")

	// start a new one
	pc.taskChan <- task
}
// getPrerunResult returns the outcome of the outstanding prerun for block.
//
// It always logs the current prerun index first. With no outstanding task it
// returns zero values. Otherwise it waits for the task's result through
// dequeueResult and clears pc.prerunTask. If the hash of the block that was
// prerun differs from block's hash, the mismatch is logged and res is returned
// as nil, while duration and err are passed through as received.
func (pc *prerunContext) getPrerunResult(block *types.Block) (res *ABCIResponses, duration time.Duration, err error) {
	pc.checkIndex(block.Height)

	// No outstanding prerun task. NOTE(review): per the original author's
	// note this covers two situations — prerunTx is disabled, or we are in
	// fast-sync, where the block comes from BlockPool.AddBlock rather than
	// State.addProposalBlockPart and no prerun result is expected.
	pending := pc.prerunTask
	if pending == nil {
		return nil, 0, nil
	}

	// Capture the hash before waiting, since pc.prerunTask is cleared below.
	prerunHash := pending.block.Hash()
	res, duration, err = pc.dequeueResult()
	pc.prerunTask = nil

	// The result is only usable when it was computed for this exact block.
	if bytes.Equal(prerunHash, block.Hash()) {
		return res, duration, err
	}

	pc.logger.Error("unequal block hash between prerun and block",
		"prerun hash", prerunHash,
		"block hash", block.Hash())
	return nil, duration, err
}