-
Notifications
You must be signed in to change notification settings - Fork 175
/
aggregators.go
82 lines (71 loc) · 2.72 KB
/
aggregators.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package consensus
import (
"fmt"
"github.com/gammazero/workerpool"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/timeoutaggregator"
"github.com/onflow/flow-go/consensus/hotstuff/timeoutcollector"
"github.com/onflow/flow-go/consensus/hotstuff/voteaggregator"
"github.com/onflow/flow-go/consensus/hotstuff/votecollector"
"github.com/onflow/flow-go/module"
)
// NewVoteAggregator creates a new VoteAggregator and subscribes it to block
// finalization events published by distributor.
//
// A vote-collector factory is built from voteProcessorFactory.Create, and the
// resulting vote collectors are backed by a dedicated worker pool.
// lowestRetainedView is forwarded to both the vote collectors and the aggregator.
// On success, the aggregator's OnFinalizedBlock method is registered with
// distributor before the aggregator is returned.
//
// No error returns are expected during normal operations. If constructing the
// aggregator fails, the worker pool created here is stopped before the error is
// returned, so that a failed construction does not leave the pool running.
func NewVoteAggregator(
	log zerolog.Logger,
	hotstuffMetrics module.HotstuffMetrics,
	engineMetrics module.EngineMetrics,
	mempoolMetrics module.MempoolMetrics,
	lowestRetainedView uint64,
	notifier hotstuff.VoteAggregationConsumer,
	voteProcessorFactory hotstuff.VoteProcessorFactory,
	distributor *pubsub.FollowerDistributor,
) (hotstuff.VoteAggregator, error) {
	// voteCollectorWorkers is the size of the worker pool handed to the vote collectors.
	const voteCollectorWorkers = 4

	createCollectorFactoryMethod := votecollector.NewStateMachineFactory(log, notifier, voteProcessorFactory.Create)

	// Keep a handle to the pool so it can be shut down if construction fails below.
	workers := workerpool.New(voteCollectorWorkers)
	voteCollectors := voteaggregator.NewVoteCollectors(log, lowestRetainedView, workers, createCollectorFactoryMethod)

	// initialize the vote aggregator
	aggregator, err := voteaggregator.NewVoteAggregator(
		log,
		hotstuffMetrics,
		engineMetrics,
		mempoolMetrics,
		notifier,
		lowestRetainedView,
		voteCollectors,
	)
	if err != nil {
		// The collectors that received the pool are discarded on this path;
		// stop the pool here rather than leave it running with no owner.
		workers.Stop()
		return nil, fmt.Errorf("could not create vote aggregator: %w", err)
	}

	distributor.AddOnBlockFinalizedConsumer(aggregator.OnFinalizedBlock)

	return aggregator, nil
}
// NewTimeoutAggregator builds a TimeoutAggregator and registers it as a consumer
// on notifier.
//
// The timeout collectors are constructed with a collector factory derived from
// distributor and timeoutProcessorFactory. lowestRetainedView is forwarded to
// both the collectors and the aggregator.
//
// No error returns are expected during normal operations.
func NewTimeoutAggregator(log zerolog.Logger,
	hotstuffMetrics module.HotstuffMetrics,
	engineMetrics module.EngineMetrics,
	mempoolMetrics module.MempoolMetrics,
	notifier *pubsub.Distributor,
	timeoutProcessorFactory hotstuff.TimeoutProcessorFactory,
	distributor *pubsub.TimeoutAggregationDistributor,
	lowestRetainedView uint64,
) (hotstuff.TimeoutAggregator, error) {
	// Build the collectors; the factory is created first, as an argument.
	timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(
		log,
		hotstuffMetrics,
		lowestRetainedView,
		timeoutcollector.NewTimeoutCollectorFactory(log, distributor, timeoutProcessorFactory),
	)

	// Assemble the aggregator on top of the collectors.
	timeoutAggregator, err := timeoutaggregator.NewTimeoutAggregator(
		log,
		hotstuffMetrics,
		engineMetrics,
		mempoolMetrics,
		lowestRetainedView,
		timeoutCollectors,
	)
	if err != nil {
		return nil, fmt.Errorf("could not create timeout aggregator: %w", err)
	}

	// Register the aggregator as a consumer of the notifier's events.
	notifier.AddConsumer(timeoutAggregator)

	return timeoutAggregator, nil
}