-
Notifications
You must be signed in to change notification settings - Fork 1k
/
service.go
494 lines (435 loc) · 14.7 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
// Package p2p defines the network protocol implementation for Ethereum consensus
// used by beacon nodes, including peer discovery using discv5, gossipsub
// using libp2p, and handling peer lifecycles and handshakes.
package p2p
import (
"context"
"crypto/ecdsa"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/async"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/config/params"
leakybucket "github.com/prysmaticlabs/prysm/v3/container/leaky-bucket"
prysmnetwork "github.com/prysmaticlabs/prysm/v3/network"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/v3/runtime"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// Compile-time assertion that *Service implements runtime.Service.
var _ runtime.Service = (*Service)(nil)

var (
	// pollingPeriod is how long we wait between polls of the current peer-limit
	// status once we are at our peer limit and have stopped looking for new peers.
	pollingPeriod = 6 * time.Second

	// refreshRate is the ENR refresh interval: half a slot, i.e. twice per slot.
	refreshRate = slots.DivideSlotBy(2)

	// maxDialTimeout bounds a single outbound peer dial. It reuses the network
	// configuration's RespTimeout.
	maxDialTimeout = params.BeaconNetworkConfig().RespTimeout
)

const (
	// maxBadResponses is how many bad responses a peer may give before we stop
	// talking to it.
	maxBadResponses = 5

	// pubsubQueueSize sizes both the gossipsub validation queue and the outbound
	// message queue.
	pubsubQueueSize = 600
)
// Service for managing peer to peer (p2p) networking.
//
// A Service is constructed by NewService, brought up by Start, and shut down by
// Stop (which cancels ctx). Field notes below describe where each field is set
// or read within this file.
type Service struct {
	// started is set by Start once discovery setup has succeeded and cleared by Stop.
	started bool
	// isPreGenesis is true from construction until Start has finished waiting for
	// state initialization; Status reports healthy while it is set.
	isPreGenesis bool
	// pingMethod is registered via AddPingMethod and called for each connected
	// peer by pingPeers; nil until registered.
	pingMethod func(ctx context.Context, id peer.ID) error
	// cancel cancels ctx; Stop calls it.
	cancel context.CancelFunc
	cfg *Config
	// peers tracks peer status and scoring; built in NewService.
	peers *peers.Status
	// addrFilter is produced by configureFilter(cfg) in NewService.
	addrFilter *multiaddr.Filters
	// ipLimiter is a leaky-bucket collector created in NewService from ipLimit/ipBurst.
	ipLimiter *leakybucket.Collector
	// privKey is the node key from privKey(cfg); it is used to build the libp2p
	// host options and to start discv5.
	privKey *ecdsa.PrivateKey
	// metaData is the local node metadata; Metadata returns a copy of it.
	metaData metadata.Metadata
	// pubsub is the gossipsub router created in NewService.
	pubsub *pubsub.PubSub
	// joinedTopics maps topic strings to joined topics. NOTE(review): presumably
	// guarded by joinedTopicsLock; its users are not visible here.
	joinedTopics map[string]*pubsub.Topic
	joinedTopicsLock sync.Mutex
	// subnetsLock holds one RWMutex per subnet key (presumably the subnet index —
	// TODO confirm against callers).
	subnetsLock map[uint64]*sync.RWMutex
	subnetsLockLock sync.Mutex // Lock access to subnetsLock
	// initializationLock serializes awaitStateInitialized.
	initializationLock sync.Mutex
	// dv5Listener is the discv5 listener; nil when discovery is disabled or has
	// not been started.
	dv5Listener Listener
	// startupErr records a failure during Start's discovery setup; Status returns it.
	startupErr error
	// stateNotifier provides the state feed awaited for the Initialized event.
	stateNotifier statefeed.Notifier
	// ctx is the service-lifetime context derived in NewService.
	ctx context.Context
	host host.Host
	// genesisTime and genesisValidatorsRoot are filled in from the state
	// Initialized event; isInitialized requires a non-zero time and a 32-byte root.
	genesisTime time.Time
	genesisValidatorsRoot []byte
	// activeValidatorCount is not read or written in this part of the file.
	// NOTE(review): confirm which code maintains it.
	activeValidatorCount uint64
}
// NewService initializes a new p2p service compatible with shared.Service interface. No
// connections are made until the Start function is called during the service registry startup.
//
// Construction derives a cancellable context from ctx and parses cfg.BootstrapNodeAddr
// into cfg.Discv5BootStrapAddr (this mutates the caller's cfg). It then loads the node's
// private key and metadata, builds the address filter, IP limiter, libp2p host and
// gossipsub router, and sets up peer-status tracking.
//
// If any step fails, the error is logged and returned. Before returning, the derived
// context is cancelled and any libp2p host already created is closed. The caller never
// receives a *Service it could Stop, so nothing else would release them.
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
	var err error
	ctx, cancel := context.WithCancel(ctx)
	s := &Service{
		ctx:           ctx,
		stateNotifier: cfg.StateNotifier,
		cancel:        cancel,
		cfg:           cfg,
		isPreGenesis:  true,
		joinedTopics:  make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
		subnetsLock:   make(map[uint64]*sync.RWMutex),
	}

	// On success, cancellation is handled by Stop(). On failure, release the
	// context and any host created so far to avoid leaking them.
	succeeded := false
	defer func() {
		if succeeded {
			return
		}
		if s.host != nil {
			if closeErr := s.host.Close(); closeErr != nil {
				log.WithError(closeErr).Debug("Could not close p2p host after failed initialization")
			}
		}
		cancel()
	}()

	dv5Nodes := parseBootStrapAddrs(s.cfg.BootstrapNodeAddr)
	cfg.Discv5BootStrapAddr = dv5Nodes

	ipAddr := prysmnetwork.IPAddr()
	s.privKey, err = privKey(s.cfg)
	if err != nil {
		log.WithError(err).Error("Failed to generate p2p private key")
		return nil, err
	}
	s.metaData, err = metaDataFromConfig(s.cfg)
	if err != nil {
		log.WithError(err).Error("Failed to create peer metadata")
		return nil, err
	}
	s.addrFilter, err = configureFilter(s.cfg)
	if err != nil {
		log.WithError(err).Error("Failed to create address filter")
		return nil, err
	}
	s.ipLimiter = leakybucket.NewCollector(ipLimit, ipBurst, 30*time.Second, true /* deleteEmptyBuckets */)

	opts := s.buildOptions(ipAddr, s.privKey)
	h, err := libp2p.New(opts...)
	if err != nil {
		log.WithError(err).Error("Failed to create p2p host")
		return nil, err
	}
	s.host = h
	s.host.RemoveStreamHandler(identify.IDDelta)

	// Gossipsub registration is done before we add in any new peers
	// due to libp2p's gossipsub implementation not taking into
	// account previously added peers when creating the gossipsub
	// object.
	psOpts := s.pubsubOptions()
	// Set the pubsub global parameters that we require.
	setPubSubParameters()
	// Reinitialize them in the event we are running a custom config.
	attestationSubnetCount = params.BeaconNetworkConfig().AttestationSubnetCount
	syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount

	gs, err := pubsub.NewGossipSub(s.ctx, s.host, psOpts...)
	if err != nil {
		log.WithError(err).Error("Failed to start pubsub")
		return nil, err
	}
	s.pubsub = gs

	s.peers = peers.NewStatus(ctx, &peers.StatusConfig{
		PeerLimit: int(s.cfg.MaxPeers),
		ScorerParams: &scorers.Config{
			BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
				Threshold:     maxBadResponses,
				DecayInterval: time.Hour,
			},
		},
	})

	// Initialize Data maps.
	types.InitializeDataMaps()

	succeeded = true
	return s, nil
}
// Start the p2p service.
//
// Start first blocks in awaitStateInitialized until genesis data is available
// (or the service context ends), then:
//   - dials the relay node, if one is configured, and keeps it in the watch list;
//   - unless discovery is disabled, starts the discv5 listener, dials the
//     configured bootnodes and launches listenForNewNodes in the background;
//   - dials any configured static peers;
//   - refreshes the ENR, raises message size limits when past Bellatrix, and
//     schedules periodic tasks bound to the service context;
//   - logs the listen/external addresses and starts the fork watcher.
//
// A discovery or bootnode setup failure is recorded in s.startupErr and aborts
// startup before the service is marked as started.
func (s *Service) Start() {
	if s.started {
		log.Error("Attempted to start p2p service when it was already started")
		return
	}

	// Waits until the state is initialized via an event feed.
	// Used for fork-related data when connecting peers.
	s.awaitStateInitialized()
	s.isPreGenesis = false

	var peersToWatch []string
	if s.cfg.RelayNodeAddr != "" {
		peersToWatch = append(peersToWatch, s.cfg.RelayNodeAddr)
		if err := dialRelayNode(s.ctx, s.host, s.cfg.RelayNodeAddr); err != nil {
			log.WithError(err).Error("Could not dial relay node")
		}
	}

	if !s.cfg.NoDiscovery {
		ipAddr := prysmnetwork.IPAddr()
		listener, err := s.startDiscoveryV5(
			ipAddr,
			s.privKey,
		)
		if err != nil {
			log.WithError(err).Fatal("Failed to start discovery")
			// logrus' Fatal normally exits the process; these lines only run when the
			// logger's exit handler has been overridden.
			s.startupErr = err
			return
		}
		// Store the listener before anything else can fail so Stop() closes it
		// even if bootnode setup below aborts startup.
		s.dv5Listener = listener
		if err := s.connectToBootnodes(); err != nil {
			log.WithError(err).Error("Could not connect to bootnodes")
			s.startupErr = err
			return
		}
		go s.listenForNewNodes()
	}

	s.started = true

	if len(s.cfg.StaticPeers) > 0 {
		addrs, err := PeersFromStringAddrs(s.cfg.StaticPeers)
		if err != nil {
			log.WithError(err).Error("Could not connect to static peer")
		}
		s.connectWithAllPeers(addrs)
	}

	// Initialize metadata according to the
	// current epoch.
	s.RefreshENR()

	// if the current epoch is beyond bellatrix, increase the
	// MaxGossipSize and MaxChunkSize to 10Mb.
	s.increaseMaxMessageSizesForBellatrix()

	// Periodic functions.
	async.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() {
		ensurePeerConnections(s.ctx, s.host, peersToWatch...)
	})
	async.RunEvery(s.ctx, 30*time.Minute, s.Peers().Prune)
	async.RunEvery(s.ctx, params.BeaconNetworkConfig().RespTimeout, s.updateMetrics)
	async.RunEvery(s.ctx, refreshRate, s.RefreshENR)
	async.RunEvery(s.ctx, 1*time.Minute, func() {
		log.WithFields(logrus.Fields{
			"inbound":     len(s.peers.InboundConnected()),
			"outbound":    len(s.peers.OutboundConnected()),
			"activePeers": len(s.peers.Active()),
		}).Info("Peer summary")
	})

	multiAddrs := s.host.Network().ListenAddresses()
	logIPAddr(s.host.ID(), multiAddrs...)

	p2pHostAddress := s.cfg.HostAddress
	p2pTCPPort := s.cfg.TCPPort

	if p2pHostAddress != "" {
		logExternalIPAddr(s.host.ID(), p2pHostAddress, p2pTCPPort)
		verifyConnectivity(p2pHostAddress, p2pTCPPort, "tcp")
	}

	p2pHostDNS := s.cfg.HostDNS
	if p2pHostDNS != "" {
		logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
	}
	go s.forkWatcher()
}
// Stop shuts the p2p service down. It clears the started flag, closes the
// discv5 listener if one was created, and cancels the service context as it
// returns. It always returns nil.
//
// NOTE(review): the libp2p host is not closed here, so existing peer
// connections are not explicitly torn down by this method.
func (s *Service) Stop() error {
	defer s.cancel()

	s.started = false
	if listener := s.dv5Listener; listener != nil {
		listener.Close()
	}
	return nil
}
// Status of the p2p service. Will return an error if the service is considered unhealthy to
// indicate that this node should not serve traffic until the issue has been resolved.
//
// While the service is still pre-genesis it is always reported healthy. After that, the
// checks run in order: a recorded startup error is returned first, then "not running"
// if the service is not started, then "no genesis time set" if genesis is unknown.
func (s *Service) Status() error {
	if s.isPreGenesis {
		return nil
	}
	// Start records startupErr and returns before setting started, so this has to
	// be checked ahead of !started or the real failure would never be surfaced.
	if s.startupErr != nil {
		return s.startupErr
	}
	if !s.started {
		return errors.New("not running")
	}
	if s.genesisTime.IsZero() {
		return errors.New("no genesis time set")
	}
	return nil
}
// Started reports the service's started flag. Start sets it once discovery
// setup succeeds, and Stop clears it.
func (s *Service) Started() bool {
	isRunning := s.started
	return isRunning
}
// Encoding returns the configured networking encoding: a fresh SSZ network
// encoder on every call. The receiver is unused, so it is left unnamed.
func (*Service) Encoding() encoder.NetworkEncoding {
	return &encoder.SszNetworkEncoder{}
}
// PubSub exposes the gossipsub router that NewService created for this service.
func (s *Service) PubSub() *pubsub.PubSub {
	router := s.pubsub
	return router
}
// Host exposes the libp2p host backing this service.
func (s *Service) Host() host.Host {
	h := s.host
	return h
}
// SetStreamHandler registers handler for the protocol named by topic on the
// underlying libp2p host. It simply forwards to host.Host.SetStreamHandler.
func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler) {
	protoID := protocol.ID(topic)
	s.host.SetStreamHandler(protoID, handler)
}
// PeerID reports the local node's libp2p peer ID, as given by the host.
func (s *Service) PeerID() peer.ID {
	h := s.host
	return h.ID()
}
// Disconnect closes every connection the host's network holds to pid and
// returns any error reported while doing so.
func (s *Service) Disconnect(pid peer.ID) error {
	nw := s.host.Network()
	return nw.ClosePeer(pid)
}
// Connect dials the peer described by pi through the libp2p host, using the
// service context for the dial.
func (s *Service) Connect(pi peer.AddrInfo) error {
	h := s.host
	return h.Connect(s.ctx, pi)
}
// Peers exposes the peer status tracker built in NewService.
func (s *Service) Peers() *peers.Status {
	status := s.peers
	return status
}
// ENR returns the local node's current ENR record, taken from the discv5
// listener. It returns nil when no listener exists (e.g. discovery disabled).
func (s *Service) ENR() *enr.Record {
	if listener := s.dv5Listener; listener != nil {
		return listener.Self().Record()
	}
	return nil
}
// DiscoveryAddresses converts the local discv5 node's addresses into UDP
// multiaddresses. Without a discv5 listener it returns (nil, nil).
func (s *Service) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
	if listener := s.dv5Listener; listener != nil {
		return convertToUdpMultiAddr(listener.Self())
	}
	return nil, nil
}
// Metadata hands out a copy of the local node's metadata, so callers cannot
// modify the service's own value through the result.
func (s *Service) Metadata() metadata.Metadata {
	md := s.metaData
	return md.Copy()
}
// MetadataSeq reports the sequence number of the local node's metadata.
func (s *Service) MetadataSeq() uint64 {
	md := s.metaData
	return md.SequenceNumber()
}
// AddPingMethod registers the metadata ping RPC that pingPeers invokes for each
// connected peer, so that it can be used to refresh ENR. A later call replaces
// the earlier function.
func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) {
	s.pingMethod = reqFunc
}
// pingPeers fires the registered ping method at every connected peer, each in
// its own goroutine, and logs failures at debug level. It does nothing when no
// ping method has been registered.
func (s *Service) pingPeers() {
	if s.pingMethod != nil {
		for _, target := range s.peers.Connected() {
			go func(pid peer.ID) {
				err := s.pingMethod(s.ctx, pid)
				if err == nil {
					return
				}
				log.WithField("peer", pid).WithError(err).Debug("Failed to ping peer")
			}(target)
		}
	}
}
// Waits for the beacon state to be initialized, important
// for initializing the p2p service as p2p needs to be aware
// of genesis information for peering.
//
// It returns immediately if genesis data is already known. Otherwise it subscribes
// to the state feed and blocks until a statefeed.Initialized event arrives (storing
// the genesis time and validators root and warming the fork-digest cache) or until
// the service context is done. A payload of the wrong type is fatal.
func (s *Service) awaitStateInitialized() {
	s.initializationLock.Lock()
	defer s.initializationLock.Unlock()

	if s.isInitialized() {
		return
	}

	stateChannel := make(chan *feed.Event, 1)
	stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
	// Unsubscribe exactly once, whether via the explicit call on the fatal path
	// or via the deferred call.
	unsubscribed := false
	cleanup := func() {
		if unsubscribed {
			return
		}
		unsubscribed = true
		stateSub.Unsubscribe()
	}
	defer cleanup()

	for {
		select {
		case event := <-stateChannel:
			if event.Type != statefeed.Initialized {
				continue
			}
			data, ok := event.Data.(*statefeed.InitializedData)
			if !ok {
				// log.Fatalf will prevent defer from being called
				cleanup()
				// Report the received payload's type: data itself is nil here.
				log.Fatalf("Received wrong data over state initialized feed: %T", event.Data)
				// Only reached if the logger's exit handler is overridden; do not go
				// on to dereference the nil data below.
				return
			}
			s.genesisTime = data.StartTime
			s.genesisValidatorsRoot = data.GenesisValidatorsRoot
			_, err := s.currentForkDigest() // initialize fork digest cache
			if err != nil {
				log.WithError(err).Error("Could not initialize fork digest")
			}
			return
		case <-s.ctx.Done():
			log.Debug("Context closed, exiting goroutine")
			return
		}
	}
}
// connectWithAllPeers converts multiAddrs into peer address infos and dials
// each one in its own goroutine. Conversion errors are logged and abort the
// whole batch. Individual dial failures are only logged at trace level.
func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr) {
	infos, convErr := peer.AddrInfosFromP2pAddrs(multiAddrs...)
	if convErr != nil {
		log.WithError(convErr).Error("Could not convert to peer address info's from multiaddresses")
		return
	}

	dial := func(target peer.AddrInfo) {
		if dialErr := s.connectWithPeer(s.ctx, target); dialErr != nil {
			log.WithError(dialErr).Tracef("Could not connect with peer %s", target.String())
		}
	}
	for _, target := range infos {
		// Dials run concurrently so a slow peer does not hold up the others.
		go dial(target)
	}
}
// connectWithPeer dials a single peer, traced under "p2p.connectWithPeer".
// Dialing ourselves is a no-op. Peers already marked bad are refused with an
// error. Each dial is bounded by maxDialTimeout. A failed dial bumps the peer's
// bad-responses score before the error is returned.
func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error {
	ctx, span := trace.StartSpan(ctx, "p2p.connectWithPeer")
	defer span.End()

	switch {
	case info.ID == s.host.ID():
		return nil
	case s.Peers().IsBad(info.ID):
		return errors.New("refused to connect to bad peer")
	}

	dialCtx, cancelDial := context.WithTimeout(ctx, maxDialTimeout)
	defer cancelDial()

	dialErr := s.host.Connect(dialCtx, info)
	if dialErr != nil {
		s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
	}
	return dialErr
}
// connectToBootnodes parses every configured discv5 bootstrap ENR and dials the
// ones that advertise a TCP port. Nodes without a "tcp" entry are skipped
// silently. Any other failure to read the entry is logged and the node skipped.
// A parse failure aborts with that error before anything is dialed.
func (s *Service) connectToBootnodes() error {
	bootAddrs := s.cfg.Discv5BootStrapAddr
	dialable := make([]*enode.Node, 0, len(bootAddrs))
	for _, raw := range bootAddrs {
		node, parseErr := enode.Parse(enode.ValidSchemes, raw)
		if parseErr != nil {
			return parseErr
		}

		// Only bootnodes with a TCP port set are dialed.
		loadErr := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP)))
		switch {
		case loadErr == nil:
			dialable = append(dialable, node)
		case !enr.IsNotFound(loadErr):
			log.WithError(loadErr).Error("Could not retrieve tcp port")
		}
	}

	s.connectWithAllPeers(convertToMultiAddr(dialable))
	return nil
}
// isInitialized reports whether genesis information is known: the genesis time
// must be non-zero and the genesis validators root exactly 32 bytes long.
// Discovery and pubsub validation depend on both.
func (s *Service) isInitialized() bool {
	if s.genesisTime.IsZero() {
		return false
	}
	return len(s.genesisValidatorsRoot) == 32
}
// increaseMaxMessageSizesForBellatrix derives the current epoch from the genesis
// time. When that epoch is at or past the configured BellatrixForkEpoch, it
// switches the encoder's max gossip and chunk sizes to their Bellatrix values
// (assumed to be the 1 Mb -> 10 Mb increase — confirm in the encoder package).
func (s *Service) increaseMaxMessageSizesForBellatrix() {
	epoch := slots.ToEpoch(slots.Since(s.genesisTime))
	if epoch < params.BeaconConfig().BellatrixForkEpoch {
		return
	}
	encoder.SetMaxGossipSizeForBellatrix()
	encoder.SetMaxChunkSizeForBellatrix()
}