-
Notifications
You must be signed in to change notification settings - Fork 1k
/
rpc_topic_mappings.go
237 lines (203 loc) · 7.5 KB
/
rpc_topic_mappings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package p2p
import (
"reflect"
"github.com/pkg/errors"
p2ptypes "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v3/config/params"
types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
pb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
)
// SchemaVersionV1 specifies the schema version for our rpc protocol ID.
const SchemaVersionV1 = "/1"
// SchemaVersionV2 specifies the next schema version for our rpc protocol ID.
const SchemaVersionV2 = "/2"
// Specifies the protocol prefix for all our Req/Resp topics.
const protocolPrefix = "/eth2/beacon_chain/req"
// StatusMessageName specifies the name for the status message topic.
const StatusMessageName = "/status"
// GoodbyeMessageName specifies the name for the goodbye message topic.
const GoodbyeMessageName = "/goodbye"
// BeaconBlocksByRangeMessageName specifies the name for the beacon blocks by range message topic.
const BeaconBlocksByRangeMessageName = "/beacon_blocks_by_range"
// BeaconBlocksByRootsMessageName specifies the name for the beacon blocks by root message topic.
const BeaconBlocksByRootsMessageName = "/beacon_blocks_by_root"
// PingMessageName Specifies the name for the ping message topic.
const PingMessageName = "/ping"
// MetadataMessageName specifies the name for the metadata message topic.
const MetadataMessageName = "/metadata"
const (
// V1 RPC Topics
// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
RPCStatusTopicV1 = protocolPrefix + StatusMessageName + SchemaVersionV1
// RPCGoodByeTopicV1 defines the v1 topic for the goodbye rpc method.
RPCGoodByeTopicV1 = protocolPrefix + GoodbyeMessageName + SchemaVersionV1
// RPCBlocksByRangeTopicV1 defines v1 the topic for the blocks by range rpc method.
RPCBlocksByRangeTopicV1 = protocolPrefix + BeaconBlocksByRangeMessageName + SchemaVersionV1
// RPCBlocksByRootTopicV1 defines the v1 topic for the blocks by root rpc method.
RPCBlocksByRootTopicV1 = protocolPrefix + BeaconBlocksByRootsMessageName + SchemaVersionV1
// RPCPingTopicV1 defines the v1 topic for the ping rpc method.
RPCPingTopicV1 = protocolPrefix + PingMessageName + SchemaVersionV1
// RPCMetaDataTopicV1 defines the v1 topic for the metadata rpc method.
RPCMetaDataTopicV1 = protocolPrefix + MetadataMessageName + SchemaVersionV1
// V2 RPC Topics
// RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method.
RPCBlocksByRangeTopicV2 = protocolPrefix + BeaconBlocksByRangeMessageName + SchemaVersionV2
// RPCBlocksByRootTopicV2 defines the v2 topic for the blocks by root rpc method.
RPCBlocksByRootTopicV2 = protocolPrefix + BeaconBlocksByRootsMessageName + SchemaVersionV2
// RPCMetaDataTopicV2 defines the v2 topic for the metadata rpc method.
RPCMetaDataTopicV2 = protocolPrefix + MetadataMessageName + SchemaVersionV2
)
// RPC errors for topic parsing.
const (
invalidRPCMessageType = "provided message type doesn't have a registered mapping"
)
// RPCTopicMappings map the base message type to the rpc request.
var RPCTopicMappings = map[string]interface{}{
// RPC Status Message
RPCStatusTopicV1: new(pb.Status),
// RPC Goodbye Message
RPCGoodByeTopicV1: new(types.SSZUint64),
// RPC Block By Range Message
RPCBlocksByRangeTopicV1: new(pb.BeaconBlocksByRangeRequest),
RPCBlocksByRangeTopicV2: new(pb.BeaconBlocksByRangeRequest),
// RPC Block By Root Message
RPCBlocksByRootTopicV1: new(p2ptypes.BeaconBlockByRootsReq),
RPCBlocksByRootTopicV2: new(p2ptypes.BeaconBlockByRootsReq),
// RPC Ping Message
RPCPingTopicV1: new(types.SSZUint64),
// RPC Metadata Message
RPCMetaDataTopicV1: new(interface{}),
RPCMetaDataTopicV2: new(interface{}),
}
// Maps all registered protocol prefixes.
var protocolMapping = map[string]bool{
protocolPrefix: true,
}
// Maps all the protocol message names for the different rpc
// topics.
var messageMapping = map[string]bool{
StatusMessageName: true,
GoodbyeMessageName: true,
BeaconBlocksByRangeMessageName: true,
BeaconBlocksByRootsMessageName: true,
PingMessageName: true,
MetadataMessageName: true,
}
// Maps all the RPC messages which are to updated in altair.
var altairMapping = map[string]bool{
BeaconBlocksByRangeMessageName: true,
BeaconBlocksByRootsMessageName: true,
MetadataMessageName: true,
}
var versionMapping = map[string]bool{
SchemaVersionV1: true,
SchemaVersionV2: true,
}
// VerifyTopicMapping verifies that the topic and its accompanying
// message type is correct.
func VerifyTopicMapping(topic string, msg interface{}) error {
msgType, ok := RPCTopicMappings[topic]
if !ok {
return errors.New("rpc topic is not registered currently")
}
receivedType := reflect.TypeOf(msg)
registeredType := reflect.TypeOf(msgType)
typeMatches := registeredType.AssignableTo(receivedType)
if !typeMatches {
return errors.Errorf("accompanying message type is incorrect for topic: wanted %v but got %v",
registeredType.String(), receivedType.String())
}
return nil
}
// TopicDeconstructor splits the provided topic to its logical sub-sections.
// It is assumed all input topics will follow the specific schema:
// /protocol-prefix/message-name/schema-version/...
// For the purposes of deconstruction, only the first 3 components are
// relevant.
func TopicDeconstructor(topic string) (string, string, string, error) {
origTopic := topic
protPrefix := ""
message := ""
version := ""
// Iterate through all the relevant mappings to find the relevant prefixes,messages
// and version for this topic.
for k := range protocolMapping {
keyLen := len(k)
if keyLen > len(topic) {
continue
}
if topic[:keyLen] == k {
protPrefix = k
topic = topic[keyLen:]
}
}
if protPrefix == "" {
return "", "", "", errors.Errorf("unable to find a valid protocol prefix for %s", origTopic)
}
for k := range messageMapping {
keyLen := len(k)
if keyLen > len(topic) {
continue
}
if topic[:keyLen] == k {
message = k
topic = topic[keyLen:]
}
}
if message == "" {
return "", "", "", errors.Errorf("unable to find a valid message for %s", origTopic)
}
for k := range versionMapping {
keyLen := len(k)
if keyLen > len(topic) {
continue
}
if topic[:keyLen] == k {
version = k
topic = topic[keyLen:]
}
}
if version == "" {
return "", "", "", errors.Errorf("unable to find a valid schema version for %s", origTopic)
}
return protPrefix, message, version, nil
}
// RPCTopic is a type used to denote and represent a req/resp topic.
type RPCTopic string
// ProtocolPrefix returns the protocol prefix of the rpc topic.
func (r RPCTopic) ProtocolPrefix() string {
prefix, _, _, err := TopicDeconstructor(string(r))
if err != nil {
return ""
}
return prefix
}
// MessageType returns the message type of the rpc topic.
func (r RPCTopic) MessageType() string {
_, message, _, err := TopicDeconstructor(string(r))
if err != nil {
return ""
}
return message
}
// Version returns the schema version of the rpc topic.
func (r RPCTopic) Version() string {
_, _, version, err := TopicDeconstructor(string(r))
if err != nil {
return ""
}
return version
}
// TopicFromMessage constructs the rpc topic from the provided message
// type and epoch.
func TopicFromMessage(msg string, epoch types.Epoch) (string, error) {
if !messageMapping[msg] {
return "", errors.Errorf("%s: %s", invalidRPCMessageType, msg)
}
version := SchemaVersionV1
isAltair := epoch >= params.BeaconConfig().AltairForkEpoch
if isAltair && altairMapping[msg] {
version = SchemaVersionV2
}
return protocolPrefix + msg + version, nil
}