-
Notifications
You must be signed in to change notification settings - Fork 985
/
pubsub.go
186 lines (161 loc) · 6.02 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package p2p
import (
"context"
"encoding/hex"
"strings"
"time"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
pbrpc "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
)
// Gossipsub tuning constants for this package.
const (
// overlay parameters
gossipSubD = 8 // target number of peers in a topic's stable mesh
gossipSubDlo = 6 // low watermark for a topic's stable mesh
gossipSubDhi = 12 // high watermark for a topic's stable mesh
// gossip parameters
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
gossipSubMcacheGossip = 3 // number of windows to gossip about
gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs
// fanout ttl
gossipSubFanoutTTL = 60000000000 // TTL, in nanoseconds (60 seconds), for fanout maps of topics we have published to but are not subscribed to
// heartbeat interval
gossipSubHeartbeatInterval = 700 * time.Millisecond // time between two consecutive heartbeats
// misc
randomSubD = 6 // random gossip target
)
// errInvalidTopic is returned by ExtractGossipDigest when a topic does not
// start with gossipTopicPrefix or has no "/" after the fork digest.
var errInvalidTopic = errors.New("invalid topic format")
// digestLength is the expected size, in bytes, of the fork digest once it has
// been hex-decoded from a gossip topic.
const digestLength = 4
// gossipTopicPrefix is the prefix for any pubsub topic; the fork digest
// follows it directly.
const gossipTopicPrefix = "/eth2/"
// JoinTopic returns the handle for the given PubSub topic, joining the topic
// first when no handle is cached for it yet. The supplied options are only
// passed along when a join actually takes place. Access to the handle cache
// is serialized through joinedTopicsLock.
func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) {
	s.joinedTopicsLock.Lock()
	defer s.joinedTopicsLock.Unlock()

	// Reuse the cached handle when the topic was joined earlier.
	if handle, joined := s.joinedTopics[topic]; joined {
		return handle, nil
	}

	handle, err := s.pubsub.Join(topic, opts...)
	if err != nil {
		return nil, err
	}
	s.joinedTopics[topic] = handle
	return handle, nil
}
// LeaveTopic closes the handle of a previously joined topic and drops it from
// the set of joined topics. Leaving a topic that was never joined is a no-op.
// If closing the handle fails (for example because event handlers or
// subscriptions are still outstanding), the error is returned and the topic
// stays registered.
func (s *Service) LeaveTopic(topic string) error {
	s.joinedTopicsLock.Lock()
	defer s.joinedTopicsLock.Unlock()

	handle, joined := s.joinedTopics[topic]
	if !joined {
		return nil
	}
	if err := handle.Close(); err != nil {
		return err
	}
	delete(s.joinedTopics, topic)
	return nil
}
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
//
// Publishing is delayed until the topic has at least one peer, unless the
// MinimumSyncPeers flag is 0, in which case the message is published right
// away. While waiting, the peer list is re-checked every 100ms. If ctx is
// done before a peer shows up, the context error is returned, wrapped with
// the topic name. Errors from joining the topic or from the publish call
// itself are returned unchanged.
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
	topicHandle, err := s.JoinTopic(topic)
	if err != nil {
		return err
	}

	canPublish := func() bool {
		return len(topicHandle.ListPeers()) > 0 || flags.Get().MinimumSyncPeers == 0
	}

	// Fast path: publish immediately without allocating a ticker.
	if canPublish() {
		return topicHandle.Publish(ctx, data, opts...)
	}

	// Wait for at least 1 peer to be available to receive the published message.
	// The ticker is selected together with ctx.Done() so that cancellation is
	// noticed right away instead of only after a full sleep interval.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
		case <-ticker.C:
		}
		if canPublish() {
			return topicHandle.Publish(ctx, data, opts...)
		}
	}
}
// SubscribeToTopic joins (if necessary) the given PubSub topic and subscribes
// to it. It first blocks in awaitStateInitialized, then looks up the topic's
// score parameters; when parameters exist they are installed on the topic
// handle and logged before the subscription is created. Any error from
// joining, from the parameter lookup or from installing the parameters aborts
// the call.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
	// Genesis time and genesis validators root are required to subscribe.
	s.awaitStateInitialized()

	handle, joinErr := s.JoinTopic(topic)
	if joinErr != nil {
		return nil, joinErr
	}

	params, paramsErr := s.topicScoreParams(topic)
	if paramsErr != nil {
		return nil, paramsErr
	}
	if params == nil {
		// No scoring parameters for this topic: subscribe as-is.
		return handle.Subscribe(opts...)
	}

	if setErr := handle.SetScoreParams(params); setErr != nil {
		return nil, setErr
	}
	logGossipParameters(topic, params)
	return handle.Subscribe(opts...)
}
// peerInspector copies the gossip scoring data of every peer in peerMap into
// the gossip scorer of our peer handler: the overall score, the behaviour
// penalty and the per-topic scores converted by convertTopicScores.
func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
	for id, snapshot := range peerMap {
		gossipScorer := s.peers.Scorers().GossipScorer()
		topicScores := convertTopicScores(snapshot.Topics)
		gossipScorer.SetGossipData(id, snapshot.Score, snapshot.BehaviourPenalty, topicScores)
	}
}
// pubsubGossipParam returns the library's default gossipsub parameters with
// the mesh target, mesh low watermark, message-cache windows and heartbeat
// interval replaced by this package's constants. All other fields keep the
// library defaults.
func pubsubGossipParam() pubsub.GossipSubParams {
	params := pubsub.DefaultGossipSubParams()

	// Mesh size.
	params.D = gossipSubD
	params.Dlo = gossipSubDlo

	// Message cache windows.
	params.HistoryLength = gossipSubMcacheLen
	params.HistoryGossip = gossipSubMcacheGossip

	// Heartbeat timing.
	params.HeartbeatInterval = gossipSubHeartbeatInterval

	return params
}
// setPubSubParameters configures the duration of the pubsub message ID
// time-cache. Unfortunately this has to be set globally on the pubsub
// package rather than by instantiating it with a router instance.
func setPubSubParameters() {
	// Number of heartbeat intervals for which entries are retained.
	// NOTE(review): this duplicates the value of gossipSubSeenTTL; consider
	// using that constant here.
	const seenTTLHeartbeats = 550
	pubsub.TimeCacheDuration = seenTTLHeartbeats * gossipSubHeartbeatInterval
}
// convertTopicScores translates libp2p's per-topic score snapshots into the
// equivalent prysm protobuf messages, keyed by the same topic names.
// TimeInMesh is stored as whole milliseconds and the delivery counters are
// narrowed to float32.
func convertTopicScores(topicMap map[string]*pubsub.TopicScoreSnapshot) map[string]*pbrpc.TopicScoreSnapshot {
	converted := make(map[string]*pbrpc.TopicScoreSnapshot, len(topicMap))
	for name, src := range topicMap {
		dst := new(pbrpc.TopicScoreSnapshot)
		dst.TimeInMesh = uint64(src.TimeInMesh.Milliseconds())
		dst.FirstMessageDeliveries = float32(src.FirstMessageDeliveries)
		dst.MeshMessageDeliveries = float32(src.MeshMessageDeliveries)
		dst.InvalidMessageDeliveries = float32(src.InvalidMessageDeliveries)
		converted[name] = dst
	}
	return converted
}
// ExtractGossipDigest pulls the fork digest out of a gossip topic of the form
// /eth2/{fork-digest}/{topic} and returns it as a 4 byte array.
//
// errInvalidTopic is returned when the topic lacks the /eth2/ prefix or has
// no "/" after the digest. A digest that is not valid hex yields the decoder's
// error, and a digest that does not decode to exactly digestLength bytes
// yields a length error. The returned array is zeroed whenever an error is
// returned.
func ExtractGossipDigest(topic string) ([4]byte, error) {
	var none [4]byte

	// Ensure the topic prefix is correct.
	if !strings.HasPrefix(topic, gossipTopicPrefix) {
		return none, errInvalidTopic
	}
	remainder := topic[len(gossipTopicPrefix):]

	// The digest ends at the next separator; without one there is no topic suffix.
	sep := strings.IndexByte(remainder, '/')
	if sep < 0 {
		return none, errInvalidTopic
	}

	raw, err := hex.DecodeString(remainder[:sep])
	if err != nil {
		return none, err
	}
	if got := len(raw); got != digestLength {
		return none, errors.Errorf("invalid digest length wanted %d but got %d", digestLength, got)
	}
	return bytesutil.ToBytes4(raw), nil
}