-
Notifications
You must be signed in to change notification settings - Fork 45
/
storeboundary.go
75 lines (63 loc) · 1.73 KB
/
storeboundary.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package pipeline
import "sort"
type storeBoundary struct {
nextBoundary uint64
interval uint64
requestStopBlock uint64
stopBlockReached bool
}
func NewStoreBoundary(
interval uint64,
requestStartBlockNum uint64,
requestStopBlock uint64,
) *storeBoundary {
b := &storeBoundary{
interval: interval,
requestStopBlock: requestStopBlock,
}
b.InitNextBoundary(requestStartBlockNum)
return b
}
func (r *storeBoundary) OverBoundary(blockNum uint64) bool {
return blockNum >= r.nextBoundary
}
func (r *storeBoundary) BumpBoundary() {
if r.stopBlockReached {
panic("should not be calling bump when stop block has been reached")
}
r.nextBoundary = r.computeBoundaryBlock(r.nextBoundary)
}
func (r *storeBoundary) computeBoundaryBlock(atBlockNum uint64) uint64 {
v := atBlockNum % r.interval
w := atBlockNum - v
res := w + r.interval
return res
//return atBlockNum - atBlockNum%r.interval + r.interval
}
func (r *storeBoundary) InitNextBoundary(blockNum uint64) {
r.nextBoundary = r.computeBoundaryBlock(blockNum)
}
func (r *storeBoundary) GetStoreFlushRanges(isSubRequest bool, reqStopBlockNum uint64, blockNum uint64) []uint64 {
boundaries := map[uint64]bool{}
for r.OverBoundary(blockNum) {
boundaries[r.nextBoundary] = true
r.BumpBoundary()
if isBlockOverStopBlock(r.nextBoundary, reqStopBlockNum) {
break
}
}
if isSubRequest && isBlockOverStopBlock(blockNum, reqStopBlockNum) {
boundaries[reqStopBlockNum] = true
}
out := []uint64{}
for v := range boundaries {
out = append(out, v)
}
sort.Slice(out, func(i, j int) bool {
return out[i] < out[j]
})
return out
}
func isBlockOverStopBlock(currentBlock uint64, stopBlock uint64) bool {
return stopBlock != 0 && currentBlock >= stopBlock
}