-
Notifications
You must be signed in to change notification settings - Fork 985
/
pubsub_filter.go
110 lines (98 loc) · 3.69 KB
/
pubsub_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package p2p
import (
"fmt"
"strings"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/network/forks"
)
var _ pubsub.SubscriptionFilter = (*Service)(nil)
// It is set at this limit to handle the possibility
// of double topic subscriptions at fork boundaries.
// -> 64 Attestation Subnets * 2.
// -> 4 Sync Committee Subnets * 2.
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2.
const pubsubSubscriptionRequestLimit = 200
// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {
if !s.isInitialized() {
return false
}
parts := strings.Split(topic, "/")
if len(parts) != 5 {
return false
}
// The topic must start with a slash, which means the first part will be empty.
if parts[0] != "" {
return false
}
if parts[1] != "eth2" {
return false
}
phase0ForkDigest, err := s.currentForkDigest()
if err != nil {
log.WithError(err).Error("Could not determine fork digest")
return false
}
altairForkDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine altair fork digest")
return false
}
bellatrixForkDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().BellatrixForkEpoch, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine Bellatrix fork digest")
return false
}
capellaForkDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine Capella fork digest")
return false
}
switch parts[2] {
case fmt.Sprintf("%x", phase0ForkDigest):
case fmt.Sprintf("%x", altairForkDigest):
case fmt.Sprintf("%x", bellatrixForkDigest):
case fmt.Sprintf("%x", capellaForkDigest):
default:
return false
}
if parts[4] != encoder.ProtocolSuffixSSZSnappy {
return false
}
// Check the incoming topic matches any topic mapping. This includes a check for part[3].
for gt := range gossipTopicMappings {
if _, err := scanfcheck(strings.Join(parts[0:4], "/"), gt); err == nil {
return true
}
}
return false
}
// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
// This method returns only the topics of interest and may return an error if the subscription
// request contains too many topics.
func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
if len(subs) > pubsubSubscriptionRequestLimit {
return nil, pubsub.ErrTooManySubscriptions
}
return pubsub.FilterSubscriptions(subs, s.CanSubscribe), nil
}
// scanfcheck uses fmt.Sscanf to check that a given string matches expected format. This method
// returns the number of formatting substitutions matched and error if the string does not match
// the expected format. Note: this method only accepts integer compatible formatting substitutions
// such as %d or %x.
func scanfcheck(input, format string) (int, error) {
var t int
// Sscanf requires argument pointers with the appropriate type to load the value from the input.
// This method only checks that the input conforms to the format, the arguments are not used and
// therefore we can reuse the same integer pointer.
var cnt = strings.Count(format, "%")
var args []interface{}
for i := 0; i < cnt; i++ {
args = append(args, &t)
}
return fmt.Sscanf(input, format, args...)
}