/
broadcast.go
99 lines (81 loc) · 3.16 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package server
import (
"github.com/make-os/kit/params"
pushtypes "github.com/make-os/kit/remote/push/types"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/p2p"
)
// BroadcastNoteAndEndorsementFunc describes a function for broadcasting a push
// note and an endorsement of it.
type BroadcastNoteAndEndorsementFunc func(note pushtypes.PushNote) error
// BroadcastNoteAndEndorsement broadcasts a push note and an endorsement of it.
// The node has to be a top host to broadcast an endorsement.
func (sv *Server) BroadcastNoteAndEndorsement(note pushtypes.PushNote) error {
// Broadcast the push note to peers
sv.noteBroadcaster(note)
// Get the top hosts
topHosts, err := sv.logic.GetTicketManager().GetTopHosts(params.NumTopHostsLimit)
if err != nil {
return errors.Wrap(err, "failed to get top hosts")
}
// Exit with nil if node is not among the top hosts
if !topHosts.Has(sv.validatorKey.PubKey().MustBytes32()) {
return nil
}
// At this point, the node is a top host, create a signed endorsement
endorsement, err := sv.endorsementCreator(sv.validatorKey, note)
if err != nil {
return err
}
sv.log.Debug("Created a note endorsement", "NoteID", note.ID().String())
// Broadcast the endorsement
sv.endorsementBroadcaster(endorsement)
// Cache the Endorsement object as an endorsement of the PushNote so can use it
// to create a mempool-bound push transaction when enough endorsements are discovered.
sv.registerNoteEndorsement(note.ID().String(), endorsement)
// Attempt to create a PushTx and send to the transaction pool
sv.makePushTx(endorsement.NoteID.HexStr())
return nil
}
// BroadcastPushNoteFunc describes a function for broadcasting a push note
type BroadcastPushNoteFunc func(pushNote pushtypes.PushNote)
// broadcastPushNote broadcast push transaction to peers.
// It will not send to original sender of the push note.
func (sv *Server) broadcastPushNote(pushNote pushtypes.PushNote) {
for _, peer := range sv.Switch.Peers().List() {
bz, id := pushNote.BytesAndID()
if sv.isNoteSender(string(peer.ID()), id.String()) {
continue
}
if peer.Send(PushNoteReactorChannel, bz) {
sv.log.Debug("Sent push note to peer", "PeerID", peer.ID(), "ID", id)
}
}
}
// BroadcastEndorsementFunc describes a function for broadcasting endorsement
type BroadcastEndorsementFunc func(endorsement pushtypes.Endorsement)
// broadcastEndorsement sends out push endorsements (Endorsement) to peers
func (sv *Server) broadcastEndorsement(endorsement pushtypes.Endorsement) {
for _, peer := range sv.Switch.Peers().List() {
bz, id := endorsement.BytesAndID()
if sv.isEndorsementSender(string(peer.ID()), id.String()) {
continue
}
if peer.Send(PushEndReactorChannel, bz) {
sv.log.Debug("Sent push endorsement to peer", "PeerID", peer.ID(), "TxID", id)
}
}
}
// BroadcastMsg broadcast messages to peers
func (sv *Server) BroadcastMsg(ch byte, msg []byte) {
for _, peer := range sv.Switch.Peers().List() {
peer.Send(ch, msg)
}
}
// GetChannels implements Reactor.
func (sv *Server) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{ID: PushNoteReactorChannel, Priority: 5},
{ID: PushEndReactorChannel, Priority: 5},
}
}