-
Notifications
You must be signed in to change notification settings - Fork 211
/
pubsub.go
318 lines (277 loc) · 10 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
package pubsub
import (
"context"
"errors"
"fmt"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/hash"
"github.com/spacemeshos/go-spacemesh/log"
p2pmetrics "github.com/spacemeshos/go-spacemesh/p2p/metrics"
)
func init() {
pubsub.GossipSubD = 6
pubsub.GossipSubDscore = 4
pubsub.GossipSubDout = 3
pubsub.GossipSubDlo = 4
pubsub.GossipSubDhi = 8
pubsub.GossipSubDlazy = 8
pubsub.GossipSubDirectConnectInitialDelay = 30 * time.Second
pubsub.GossipSubIWantFollowupTime = 5 * time.Second
pubsub.GossipSubHistoryLength = 10
pubsub.GossipSubGossipFactor = 0.1
}
const (
// score thresholds.
// see details https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#score-thresholds
// GossipScoreThreshold when a peer's score drops below this threshold, no gossip is emitted towards that peer
// and gossip from that peer is ignored.
GossipScoreThreshold = -500
// PublishScoreThreshold when a peer's score drops below this threshold, self published messages are not propagated
// towards this peer when (flood) publishing.
PublishScoreThreshold = -1000
// GraylistScoreThreshold when a peer's score drops below this threshold, the peer is graylisted and its RPCs are ignored.
GraylistScoreThreshold = -2500
// AcceptPXScoreThreshold when a peer sends us PX information with a prune, we only accept it and connect to the
// supplied peers if the originating peer's score exceeds this threshold.
AcceptPXScoreThreshold = 1000
// OpportunisticGraftScoreThreshold when the median peer score in the mesh drops below this value, the router
// may select more peers with score above the median to opportunistically graft on the mesh.
OpportunisticGraftScoreThreshold = 3.5
// AtxProtocol is the protocol id for ATXs.
AtxProtocol = "ax1"
// ProposalProtocol is the protocol id for block proposals.
ProposalProtocol = "pp1"
// TxProtocol iis the protocol id for transactions.
TxProtocol = "tx1"
// HareProtocol is the protocol id for hare messages.
HareProtocol = "hr1"
// BlockCertify is the protocol id for block certification.
BlockCertify = "bc1"
// BeaconProtocol is used currently only for recording metrics, but
// potentially will become used as an actual protocol if we decide to merge
// the beacon protocols.
// https://github.com/spacemeshos/go-spacemesh/issues/4207
BeaconProtocol = "b1"
// BeaconWeakCoinProtocol is the protocol id for beacon weak coin.
BeaconWeakCoinProtocol = "bw1"
// BeaconProposalProtocol is the protocol id for beacon proposals.
BeaconProposalProtocol = "bp1"
// BeaconFirstVotesProtocol is the protocol id for beacon first vote.
BeaconFirstVotesProtocol = "bf1"
// BeaconFollowingVotesProtocol is the protocol id for beacon following votes.
BeaconFollowingVotesProtocol = "bo1"
MalfeasanceProof = "mp1"
)
// DefaultConfig for PubSub.
func DefaultConfig() Config {
return Config{Flood: true, QueueSize: 10000, Throttle: 10000}
}
// Config for PubSub.
type Config struct {
Flood bool
IsBootnode bool
Bootnodes []peer.AddrInfo
// Direct peers should be configured on both ends.
Direct []peer.AddrInfo
MaxMessageSize int
QueueSize int
Throttle int
}
// New creates PubSub instance.
func New(ctx context.Context, logger log.Log, h host.Host, cfg Config) (*PubSub, error) {
// TODO(dshulyak) refactor code to accept options
opts := getOptions(cfg)
ps, err := pubsub.NewGossipSub(ctx, h, opts...)
if err != nil {
return nil, fmt.Errorf("failed to initialize gossipsub instance: %w", err)
}
return &PubSub{
logger: logger,
pubsub: ps,
topics: map[string]*pubsub.Topic{},
host: h,
}, nil
}
//go:generate mockgen -typed -package=mocks -destination=./mocks/publisher.go -source=./pubsub.go
// Publisher interface for publishing messages.
type Publisher interface {
Publish(context.Context, string, []byte) error
}
// Subscriber is an interface for subcribing to messages.
type Subscriber interface {
Register(string, GossipHandler, ...ValidatorOpt)
}
type ValidatorOpt = pubsub.ValidatorOpt
var (
WithValidatorInline = pubsub.WithValidatorInline
WithValidatorConcurrency = pubsub.WithValidatorConcurrency
)
// PublishSubsciber common interface for publisher and subscribing.
type PublishSubsciber interface {
Publisher
Subscriber
}
// GossipHandler is a function that is for receiving p2p messages.
type GossipHandler = func(context.Context, peer.ID, []byte) error
// SyncHandler is a function that is for receiving synced data.
type SyncHandler = func(context.Context, types.Hash32, peer.ID, []byte) error
// ErrValidationReject is returned by a GossipHandler to indicate that the
// pubsub validation result is ValidationReject. ValidationAccept is indicated
// by a nil error and ValidationIgnore is indicated by any error that is not a
// ErrValidationReject.
var ErrValidationReject = errors.New("validation reject")
// ChainGossipHandler helper to chain multiple GossipHandler together. Called synchronously and in the order.
func ChainGossipHandler(handlers ...GossipHandler) GossipHandler {
return func(ctx context.Context, pid peer.ID, msg []byte) error {
for _, h := range handlers {
if err := h(ctx, pid, msg); err != nil {
return err
}
}
return nil
}
}
// DropPeerOnValidationReject wraps a gossip handler to provide a handler that drops a
// peer if the wrapped handler returns ErrValidationReject.
func DropPeerOnValidationReject(handler GossipHandler, h host.Host, logger log.Log) GossipHandler {
return func(ctx context.Context, peer peer.ID, data []byte) error {
err := handler(ctx, peer, data)
if errors.Is(err, ErrValidationReject) {
p2pmetrics.DroppedConnectionsValidationReject.Inc()
err := h.Network().ClosePeer(peer)
if err != nil {
logger.With().Debug("failed to close peer",
log.String("peer", peer.ShortString()),
log.Err(err),
)
}
}
return err
}
}
func DropPeerOnSyncValidationReject(handler SyncHandler, h host.Host, logger log.Log) SyncHandler {
return func(ctx context.Context, hash types.Hash32, peer peer.ID, data []byte) error {
err := handler(ctx, hash, peer, data)
if errors.Is(err, ErrValidationReject) {
p2pmetrics.DroppedConnectionsValidationReject.Inc()
err := h.Network().ClosePeer(peer)
if err != nil {
logger.With().Debug("failed to close peer",
log.String("peer", peer.ShortString()),
log.Err(err),
)
}
}
return err
}
}
func msgID(msg *pubsubpb.Message) string {
hasher := hash.New()
if msg.Topic != nil {
hasher.Write([]byte(*msg.Topic))
}
hasher.Write(msg.Data)
return string(hasher.Sum(nil))
}
func getOptions(cfg Config) []pubsub.Option {
boots := map[peer.ID]struct{}{}
for _, addr := range cfg.Bootnodes {
boots[addr.ID] = struct{}{}
}
options := []pubsub.Option{
// Gossipsubv1.1 configuration
pubsub.WithFloodPublish(cfg.Flood),
pubsub.WithDirectPeers(cfg.Direct),
pubsub.WithMessageIdFn(msgID),
pubsub.WithNoAuthor(),
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithPeerOutboundQueueSize(8192),
pubsub.WithValidateQueueSize(cfg.QueueSize),
pubsub.WithValidateThrottle(cfg.Throttle),
pubsub.WithRawTracer(p2pmetrics.NewGoSIPCollector()),
pubsub.WithPeerScore(
&pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 {
_, exist := boots[p]
if exist && !cfg.IsBootnode {
return 10000
}
return 0
},
AppSpecificWeight: 1,
// TODO: consider setting IP co-location threshold before applying penalties
// P7: behavioral penalties, decay after 1hr
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),
DecayInterval: pubsub.DefaultDecayInterval,
DecayToZero: pubsub.DefaultDecayToZero,
// this retains non-positive scores for 6 hours
RetainScore: 6 * time.Hour,
Topics: map[string]*pubsub.TopicScoreParams{
AtxProtocol: defaultTopicParam(),
ProposalProtocol: defaultTopicParam(),
},
// TODO: add TopicScoreParams
},
&pubsub.PeerScoreThresholds{
GossipThreshold: GossipScoreThreshold,
PublishThreshold: PublishScoreThreshold,
GraylistThreshold: GraylistScoreThreshold,
AcceptPXThreshold: AcceptPXScoreThreshold,
OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold,
},
),
// TODO: add peer scoring debugging with WithPeerScoreInspect
}
if cfg.MaxMessageSize != 0 {
options = append(options, pubsub.WithMaxMessageSize(cfg.MaxMessageSize))
}
// enable Peer eXchange on bootstrappers
if cfg.IsBootnode {
// turn off the mesh for bootnodes -- only do gossip and PX
pubsub.GossipSubD = 0
pubsub.GossipSubDscore = 0
pubsub.GossipSubDlo = 0
pubsub.GossipSubDhi = 0
pubsub.GossipSubDout = 0
pubsub.GossipSubDlazy = 64
pubsub.GossipSubGossipFactor = 0.25
pubsub.GossipSubPruneBackoff = 5 * time.Minute
// turn on PX
options = append(options, pubsub.WithPeerExchange(true))
}
return options
}
func defaultTopicParam() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100
// 1 tick per second, maxes at 1 after 1 hour
TimeInMeshWeight: 0.00027, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,
// deliveries decay after 1 hour, cap at 1000
FirstMessageDeliveriesWeight: 5, // max value is 500
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
FirstMessageDeliveriesCap: 100,
// TODO: consider mesh delivery failure when the network grows and traffic becomes significant
// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}
}
func castResult(err error) string {
switch {
case err == nil:
return "accept"
case errors.Is(err, ErrValidationReject):
return "reject"
default:
return "ignore"
}
}