-
Notifications
You must be signed in to change notification settings - Fork 215
/
message.go
128 lines (108 loc) · 3.36 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package p2p
import (
"context"
"errors"
"fmt"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/p2pcrypto"
"github.com/spacemeshos/go-spacemesh/p2p/service"
)
type directProtocolMessage struct {
metadata service.P2PMetadata
sender p2pcrypto.PublicKey
data service.Data
}
func (pm directProtocolMessage) Metadata() service.P2PMetadata {
return pm.metadata
}
func (pm directProtocolMessage) Sender() p2pcrypto.PublicKey {
return pm.sender
}
func (pm directProtocolMessage) Data() service.Data {
return pm.data
}
func (pm directProtocolMessage) Bytes() []byte {
return pm.data.Bytes()
}
type gossipProtocolMessage struct {
sender p2pcrypto.PublicKey
ownMessage bool
data service.Data
validationChan chan service.MessageValidation
requestID string
}
func (pm gossipProtocolMessage) Sender() p2pcrypto.PublicKey {
return pm.sender // DirectSender
}
func (pm gossipProtocolMessage) IsOwnMessage() bool {
return pm.ownMessage
}
func (pm gossipProtocolMessage) Data() service.Data {
return pm.data
}
func (pm gossipProtocolMessage) RequestID() string {
return pm.requestID
}
func (pm gossipProtocolMessage) Bytes() []byte {
return pm.data.Bytes()
}
func (pm gossipProtocolMessage) ValidationCompletedChan() chan service.MessageValidation {
return pm.validationChan
}
func (pm gossipProtocolMessage) ReportValidation(ctx context.Context, protocol string) {
if pm.validationChan != nil {
// TODO(dshulyak) this definitely should not be logged in message data structure
log.AppLog.WithContext(ctx).With().Debug("reporting valid gossip message",
log.String("protocol", protocol),
log.String("requestId", pm.requestID),
log.FieldNamed("sender", pm.sender),
log.Int("validation_chan_len", len(pm.validationChan)))
pm.validationChan <- service.NewMessageValidation(pm.sender, pm.Bytes(), protocol, pm.requestID)
}
}
// ProtocolMessageMetadata is a general p2p message wrapper
type ProtocolMessageMetadata struct {
NextProtocol string
ClientVersion string
Timestamp int64
AuthPubkey []byte
NetworkID int32
}
// Payload holds either a byte array or a wrapped req-res message.
type Payload struct {
Payload []byte
Wrapped *service.DataMsgWrapper
}
// ProtocolMessage is a pair of metadata and a a payload.
type ProtocolMessage struct {
Metadata *ProtocolMessageMetadata
Payload *Payload
}
// CreatePayload is a helper function to format a payload for sending.
func CreatePayload(data service.Data) (*Payload, error) {
switch x := data.(type) {
case service.DataBytes:
if x.Payload == nil {
return nil, fmt.Errorf("unable to send empty payload")
}
return &Payload{Payload: x.Bytes()}, nil
case *service.DataMsgWrapper:
return &Payload{Wrapped: x}, nil
case nil:
return nil, fmt.Errorf("unable to send empty payload")
default:
}
return nil, fmt.Errorf("unable to determine payload type")
}
// ExtractData is a helper function to extract the payload data from a message payload.
func ExtractData(pm *Payload) (service.Data, error) {
var data service.Data
if payload := pm.Payload; payload != nil {
data = &service.DataBytes{Payload: payload}
} else if wrap := pm.Wrapped; wrap != nil {
data = &service.DataMsgWrapper{Req: wrap.Req, MsgType: wrap.MsgType, ReqID: wrap.ReqID, Payload: wrap.Payload}
} else {
return nil, errors.New("not valid data type")
}
return data, nil
}