/
buffer.go
132 lines (116 loc) · 3.35 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (c) 2019 IoTeX Foundation
// This is an alpha (internal) release and is not suitable for production. This source code is provided 'as is' and no
// warranties are given as to title or non-infringement, merchantability or fitness for purpose and, to the extent
// permitted by law, all liability for your use of the code is disclaimed. This source code is governed by Apache
// License 2.0 that can be found in the LICENSE file.
package blocksync
import (
"sync"
"go.uber.org/zap"
"github.com/iotexproject/iotex-core/pkg/log"
)
// blockBuffer is used to keep in-coming block in order.
type blockBuffer struct {
	// mu guards blockQueues against concurrent access.
	mu sync.RWMutex
	// blockQueues maps a block height to the queue of candidate blocks
	// received for that height.
	blockQueues map[uint64]*uniQueue
	// bufferSize is the maximum number of heights kept beyond the tip;
	// AddBlock rejects blocks above tipHeight+bufferSize.
	bufferSize uint64
	// intervalSize caps the length of each interval returned by
	// GetBlocksIntervalsToSync.
	intervalSize uint64
}
// syncBlocksInterval specifies an inclusive [Start, End] range of block
// heights to request from peers.
type syncBlocksInterval struct {
	Start uint64
	End   uint64
}
// newBlockBuffer creates an empty blockBuffer that accepts at most bufferSize
// heights beyond the tip and splits sync requests into intervals of
// intervalSize heights.
func newBlockBuffer(bufferSize, intervalSize uint64) *blockBuffer {
	buf := &blockBuffer{
		blockQueues:  make(map[uint64]*uniQueue),
		bufferSize:   bufferSize,
		intervalSize: intervalSize,
	}
	return buf
}
// Delete removes the queue buffered at the given height and returns every
// block it contained, or nil when nothing is buffered at that height.
func (b *blockBuffer) Delete(height uint64) []*peerBlock {
	b.mu.Lock()
	defer b.mu.Unlock()
	q, exist := b.blockQueues[height]
	if !exist {
		return nil
	}
	delete(b.blockQueues, height)
	return q.dequeAll()
}
// Cleanup drops every queue at or below the given height, but only once the
// buffer has grown past twice its nominal size. Surviving entries are copied
// into a freshly allocated map (rather than deleted in place) so the old
// map's oversized buckets can be garbage collected.
func (b *blockBuffer) Cleanup(height uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	n := len(b.blockQueues)
	if n <= int(b.bufferSize)*2 {
		return
	}
	log.L().Warn("blockBuffer is leaking memory.", zap.Int("bufferSize", n))
	kept := make(map[uint64]*uniQueue)
	for h, q := range b.blockQueues {
		if h > height {
			kept[h] = q
		}
	}
	b.blockQueues = kept
}
// AddBlock tries to put given block into buffer and flush buffer into blockchain.
// It returns whether the block was accepted and, on acceptance, the block's
// height; on rejection it returns the highest acceptable height when the block
// is too far ahead, or 0 when the block is already at or below the tip.
func (b *blockBuffer) AddBlock(tipHeight uint64, blk *peerBlock) (bool, uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	height := blk.block.Height()
	switch {
	case height <= tipHeight:
		// stale: already on chain
		return false, 0
	case height > tipHeight+b.bufferSize:
		// beyond the buffer window; report the max height we would accept
		return false, tipHeight + b.bufferSize
	}
	q, exist := b.blockQueues[height]
	if !exist {
		q = newUniQueue()
		b.blockQueues[height] = q
	}
	q.enque(blk)
	return true, height
}
// GetBlocksIntervalsToSync returns the intervals of block heights in
// (confirmedHeight, targetHeight] that are not yet buffered, each interval
// covering at most intervalSize heights.
func (b *blockBuffer) GetBlocksIntervalsToSync(confirmedHeight uint64, targetHeight uint64) []syncBlocksInterval {
	b.mu.RLock()
	defer b.mu.RUnlock()
	// The sync range shouldn't go beyond tip height + buffer size to avoid being too aggressive
	if hi := confirmedHeight + b.bufferSize; targetHeight > hi {
		targetHeight = hi
	}
	// The sync range should at least contain one interval to speculatively fetch missing blocks
	if lo := confirmedHeight + b.intervalSize; targetHeight < lo {
		targetHeight = lo
	}
	var (
		intervals []syncBlocksInterval
		begin     uint64 // first height of the interval being built
		open      bool   // whether an interval is currently open
		count     uint64 // number of heights accumulated in the open interval
	)
	for h := confirmedHeight + 1; h <= targetHeight; h++ {
		if _, buffered := b.blockQueues[h]; buffered {
			// A buffered height ends the current interval (if any) at h-1.
			if open {
				intervals = append(intervals, syncBlocksInterval{Start: begin, End: h - 1})
				open = false
				count = 0
			}
			continue
		}
		if !open {
			begin = h
			open = true
		}
		count++
		// Close the interval once it reaches the configured size.
		if count >= b.intervalSize {
			intervals = append(intervals, syncBlocksInterval{Start: begin, End: h})
			open = false
			count = 0
		}
	}
	// Close the trailing interval, if one is still open.
	if open {
		intervals = append(intervals, syncBlocksInterval{Start: begin, End: targetHeight})
	}
	return intervals
}