-
Notifications
You must be signed in to change notification settings - Fork 177
/
vote_collectors.go
148 lines (127 loc) · 5.57 KB
/
vote_collectors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package voteaggregator
import (
"fmt"
"sync"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/mempool"
)
// NewCollectorFactoryMethod is a factory method to generate a VoteCollector for a concrete view.
// The supplied `workers` are handed to the collector so it can offload asynchronous work
// (presumably vote processing — confirm against the concrete factory implementation).
type NewCollectorFactoryMethod = func(view uint64, workers hotstuff.Workers) (hotstuff.VoteCollector, error)
// VoteCollectors implements management of multiple vote collectors indexed by view.
// Implements the hotstuff.VoteCollectors interface. Creating a VoteCollector for a
// particular view is lazy (instances are created on demand via `createCollector`).
// This structure is concurrently safe: all mutable state is guarded by `lock`.
type VoteCollectors struct {
	*component.ComponentManager
	log                zerolog.Logger
	lock               sync.RWMutex                      // guards lowestRetainedView and collectors
	lowestRetainedView uint64                            // lowest view, for which we still retain a VoteCollector and process votes
	collectors         map[uint64]hotstuff.VoteCollector // view -> VoteCollector
	workerPool         hotstuff.Workerpool               // for processing votes that are already cached in VoteCollectors and waiting for respective block
	createCollector    NewCollectorFactoryMethod         // factory method for creating collectors
}

var _ hotstuff.VoteCollectors = (*VoteCollectors)(nil)
func NewVoteCollectors(logger zerolog.Logger, lowestRetainedView uint64, workerPool hotstuff.Workerpool, factoryMethod NewCollectorFactoryMethod) *VoteCollectors {
// Component manager for wrapped worker pool
componentBuilder := component.NewComponentManagerBuilder()
componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
<-ctx.Done() // wait for parent context to signal shutdown
workerPool.StopWait() // wait till all workers exit
})
return &VoteCollectors{
log: logger,
ComponentManager: componentBuilder.Build(),
lowestRetainedView: lowestRetainedView,
collectors: make(map[uint64]hotstuff.VoteCollector),
workerPool: workerPool,
createCollector: factoryMethod,
}
}
// GetOrCreateCollector retrieves the hotstuff.VoteCollector for the specified
// view or creates one if none exists.
// - (collector, true, nil) if no collector can be found by the view, and a new collector was created.
// - (collector, false, nil) if the collector can be found by the view
// - (nil, false, error) if running into any exception creating the vote collector state machine
//
// Expected error returns during normal operations:
// - mempool.BelowPrunedThresholdError - in case view is lower than lowestRetainedView
func (v *VoteCollectors) GetOrCreateCollector(view uint64) (hotstuff.VoteCollector, bool, error) {
	cachedCollector, hasCachedCollector, err := v.getCollector(view)
	if err != nil {
		return nil, false, err
	}
	if hasCachedCollector {
		return cachedCollector, false, nil
	}

	// instantiate the collector outside the lock, so concurrent callers are not blocked
	// by potentially expensive construction work
	collector, err := v.createCollector(view, v.workerPool)
	if err != nil {
		return nil, false, fmt.Errorf("could not create vote collector for view %d: %w", view, err)
	}

	// Initial check showed that there was no collector. However, it's possible that after the
	// initial check but before acquiring the lock, the state changed concurrently: another
	// goroutine may have added the needed collector, or PruneUpToView may have advanced the
	// pruning threshold past `view`. Hence, re-check both conditions after acquiring the lock:
	v.lock.Lock()
	defer v.lock.Unlock()

	// Without this re-check, a concurrent prune racing with the creation above could let us
	// insert a collector for an already-pruned view, which subsequent prunes would never
	// remove (leaking the collector).
	if view < v.lowestRetainedView {
		return nil, false, mempool.NewBelowPrunedThresholdErrorf("cannot add collector for pruned view %d (lowest retained view %d)", view, v.lowestRetainedView)
	}

	clr, found := v.collectors[view]
	if found {
		return clr, false, nil
	}

	v.collectors[view] = collector
	return collector, true, nil
}
// getCollector looks up the hotstuff.VoteCollector for `view` in the local cache,
// in a concurrency-safe way. Views below the pruning threshold are rejected.
//
// Expected error returns during normal operations:
// - mempool.BelowPrunedThresholdError - in case view is lower than lowestRetainedView
func (v *VoteCollectors) getCollector(view uint64) (hotstuff.VoteCollector, bool, error) {
	v.lock.RLock()
	defer v.lock.RUnlock()

	// reject lookups for views that have already been pruned
	if view < v.lowestRetainedView {
		return nil, false, mempool.NewBelowPrunedThresholdErrorf("cannot retrieve collector for pruned view %d (lowest retained view %d)", view, v.lowestRetainedView)
	}

	collector, ok := v.collectors[view]
	return collector, ok, nil
}
// PruneUpToView prunes the vote collectors with views _below_ the given value, i.e.
// we only retain and process whose view is equal or larger than `lowestRetainedView`.
// If `lowestRetainedView` is smaller than the previous value, the previous value is
// kept and the method call is a NoOp.
func (v *VoteCollectors) PruneUpToView(lowestRetainedView uint64) {
v.lock.Lock()
defer v.lock.Unlock()
if v.lowestRetainedView >= lowestRetainedView {
return
}
if len(v.collectors) == 0 {
v.lowestRetainedView = lowestRetainedView
return
}
sizeBefore := len(v.collectors)
// to optimize the pruning of large view-ranges, we compare:
// * the number of views for which we have collectors: len(v.collectors)
// * the number of views that need to be pruned: view-v.lowestRetainedView
// We iterate over the dimension which is smaller.
if uint64(len(v.collectors)) < lowestRetainedView-v.lowestRetainedView {
for w := range v.collectors {
if w < lowestRetainedView {
delete(v.collectors, w)
}
}
} else {
for w := v.lowestRetainedView; w < lowestRetainedView; w++ {
delete(v.collectors, w)
}
}
from := v.lowestRetainedView
v.lowestRetainedView = lowestRetainedView
v.log.Debug().
Uint64("prior_lowest_retained_view", from).
Uint64("lowest_retained_view", lowestRetainedView).
Int("prior_size", sizeBefore).
Int("size", len(v.collectors)).
Msgf("pruned vote collectors")
}