-
Notifications
You must be signed in to change notification settings - Fork 176
/
distributor.go
95 lines (78 loc) · 3.51 KB
/
distributor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package pubsub
import (
"github.com/onflow/flow-go/consensus/hotstuff"
)
// Distributor distributes notifications to a list of consumers (event consumers).
// It is assembled from three embedded distributors (follower, communicator and
// participant), whose exported methods are promoted onto Distributor.
//
// It allows thread-safe subscription of multiple consumers to events.
type Distributor struct {
	*FollowerDistributor
	*CommunicatorDistributor
	*ParticipantDistributor
}

// Compile-time check that *Distributor implements hotstuff.Consumer.
var _ hotstuff.Consumer = (*Distributor)(nil)
// NewDistributor creates a Distributor whose embedded follower, communicator
// and participant distributors are each freshly constructed.
func NewDistributor() *Distributor {
	dist := new(Distributor)
	dist.FollowerDistributor = NewFollowerDistributor()
	dist.CommunicatorDistributor = NewCommunicatorDistributor()
	dist.ParticipantDistributor = NewParticipantDistributor()
	return dist
}
// AddConsumer subscribes `consumer` to the Distributor by registering it, in
// turn, with the embedded follower, communicator and participant distributors.
func (d *Distributor) AddConsumer(consumer hotstuff.Consumer) {
	d.FollowerDistributor.AddFollowerConsumer(consumer)
	d.CommunicatorDistributor.AddCommunicatorConsumer(consumer)
	d.ParticipantDistributor.AddParticipantConsumer(consumer)
}
// FollowerDistributor ingests consensus follower events and distributes them to consumers.
// It is assembled from an embedded ProposalViolationDistributor and an embedded
// FinalizationDistributor, whose exported methods are promoted onto it.
// It allows thread-safe subscription of multiple consumers to events.
type FollowerDistributor struct {
	*ProposalViolationDistributor
	*FinalizationDistributor
}

// Compile-time check that *FollowerDistributor implements hotstuff.FollowerConsumer.
var _ hotstuff.FollowerConsumer = (*FollowerDistributor)(nil)
// NewFollowerDistributor creates a FollowerDistributor with a freshly
// constructed proposal-violation distributor and finalization distributor.
func NewFollowerDistributor() *FollowerDistributor {
	dist := new(FollowerDistributor)
	// NOTE(review): the constructor is named NewProtocolViolationDistributor while
	// the field type is ProposalViolationDistributor; presumably a leftover from a
	// rename. Confirm against the constructor's definition.
	dist.ProposalViolationDistributor = NewProtocolViolationDistributor()
	dist.FinalizationDistributor = NewFinalizationDistributor()
	return dist
}
// AddFollowerConsumer registers the input `consumer` to be notified on
// `hotstuff.FollowerConsumer` events: it is added first to the embedded
// finalization distributor and then to the proposal-violation distributor.
func (d *FollowerDistributor) AddFollowerConsumer(consumer hotstuff.FollowerConsumer) {
	finalization, violations := d.FinalizationDistributor, d.ProposalViolationDistributor
	finalization.AddFinalizationConsumer(consumer)
	violations.AddProposalViolationConsumer(consumer)
}
// TimeoutAggregationDistributor ingests timeout aggregation events and distributes them to consumers.
// It is assembled from an embedded TimeoutAggregationViolationDistributor and an
// embedded TimeoutCollectorDistributor, whose exported methods are promoted onto it.
// It allows thread-safe subscription of multiple consumers to events.
type TimeoutAggregationDistributor struct {
	*TimeoutAggregationViolationDistributor
	*TimeoutCollectorDistributor
}

// Compile-time check that *TimeoutAggregationDistributor implements
// hotstuff.TimeoutAggregationConsumer.
var _ hotstuff.TimeoutAggregationConsumer = (*TimeoutAggregationDistributor)(nil)
// NewTimeoutAggregationDistributor creates a TimeoutAggregationDistributor with
// a freshly constructed violation distributor and timeout collector distributor.
func NewTimeoutAggregationDistributor() *TimeoutAggregationDistributor {
	dist := new(TimeoutAggregationDistributor)
	dist.TimeoutAggregationViolationDistributor = NewTimeoutAggregationViolationDistributor()
	dist.TimeoutCollectorDistributor = NewTimeoutCollectorDistributor()
	return dist
}
// AddTimeoutAggregationConsumer registers the input `consumer` with both
// embedded distributors: first the timeout-aggregation violation distributor,
// then the timeout collector distributor.
func (d *TimeoutAggregationDistributor) AddTimeoutAggregationConsumer(consumer hotstuff.TimeoutAggregationConsumer) {
	violations, collector := d.TimeoutAggregationViolationDistributor, d.TimeoutCollectorDistributor
	violations.AddTimeoutAggregationViolationConsumer(consumer)
	collector.AddTimeoutCollectorConsumer(consumer)
}
// VoteAggregationDistributor ingests vote aggregation events and distributes them to consumers.
// It is assembled from an embedded VoteAggregationViolationDistributor and an
// embedded VoteCollectorDistributor, whose exported methods are promoted onto it.
// It allows thread-safe subscription of multiple consumers to events.
type VoteAggregationDistributor struct {
	*VoteAggregationViolationDistributor
	*VoteCollectorDistributor
}

// Compile-time check that *VoteAggregationDistributor implements
// hotstuff.VoteAggregationConsumer.
var _ hotstuff.VoteAggregationConsumer = (*VoteAggregationDistributor)(nil)
// NewVoteAggregationDistributor creates a VoteAggregationDistributor with a
// freshly constructed violation distributor and vote collector distributor.
func NewVoteAggregationDistributor() *VoteAggregationDistributor {
	dist := new(VoteAggregationDistributor)
	dist.VoteAggregationViolationDistributor = NewVoteAggregationViolationDistributor()
	// NOTE(review): the VoteCollectorDistributor field is populated through a
	// constructor named NewQCCreatedDistributor; presumably a legacy name.
	// Confirm against the constructor's definition.
	dist.VoteCollectorDistributor = NewQCCreatedDistributor()
	return dist
}
// AddVoteAggregationConsumer registers the input `consumer` with both embedded
// distributors: first the vote-aggregation violation distributor, then the
// vote collector distributor.
func (d *VoteAggregationDistributor) AddVoteAggregationConsumer(consumer hotstuff.VoteAggregationConsumer) {
	violations, collector := d.VoteAggregationViolationDistributor, d.VoteCollectorDistributor
	violations.AddVoteAggregationViolationConsumer(consumer)
	collector.AddVoteCollectorConsumer(consumer)
}