-
Notifications
You must be signed in to change notification settings - Fork 245
/
transport.go
173 lines (146 loc) · 5.11 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package datasync
import (
"context"
"crypto/ecdsa"
"errors"
"math"
"math/rand"
"time"
"github.com/golang/protobuf/proto"
"github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/transport"
"go.uber.org/zap"
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
)
const backoffInterval = 30
var errNotInitialized = errors.New("Datasync transport not initialized")
var DatasyncTicker = 300 * time.Millisecond
// It's easier to calculate nextEpoch if we consider seconds as a unit rather than
// 300 ms, so we multiply the result by the ratio
var offsetToSecond = uint64(time.Second / DatasyncTicker)
// payloadTagSize is the tag size for the protobuf.Payload message which is number of fields * 2 bytes
var payloadTagSize = 14
// timestampPayloadSize is the maximum size in bytes for the timestamp field (uint64)
var timestampPayloadSize = 10
// NodeTransport bridges datasync and the underlying messaging layer:
// incoming packets are fed in via AddPacket and consumed via Watch, while
// outgoing payloads are handed to the dispatch callback registered in Init.
type NodeTransport struct {
	// packets carries incoming packets from AddPacket to Watch (unbuffered,
	// so AddPacket blocks until a Watch caller is ready).
	packets chan transport.Packet
	// logger is set in Init; Send assumes it is non-nil once dispatch is set.
	logger *zap.Logger
	// maxMessageSize is the upper bound, in bytes, used when splitting an
	// outgoing payload into batches.
	maxMessageSize uint32
	// dispatch delivers one marshalled payload batch to a peer; it is nil
	// until Init is called, and Send fails fast in that case.
	dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error
}
// NewNodeTransport returns a NodeTransport with an unbuffered packet channel.
// Init must be called before Send can be used.
func NewNodeTransport() *NodeTransport {
	t := &NodeTransport{}
	t.packets = make(chan transport.Packet)
	return t
}
// Init wires the transport to its runtime dependencies: the dispatch
// callback used by Send, the maximum marshalled batch size, and a logger.
func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error, maxMessageSize uint32, logger *zap.Logger) {
	t.logger = logger
	t.maxMessageSize = maxMessageSize
	t.dispatch = dispatch
}
// AddPacket queues an incoming packet for the consumer blocked in Watch.
// The channel is unbuffered, so this blocks until the packet is received.
func (t *NodeTransport) AddPacket(packet transport.Packet) {
	t.packets <- packet
}
// Watch blocks until the next packet arrives via AddPacket and returns it.
func (t *NodeTransport) Watch() transport.Packet {
	packet := <-t.packets
	return packet
}
// Send splits payload into batches that fit within t.maxMessageSize and
// hands each one to the dispatch callback registered via Init.
// Per-batch failures are logged and skipped rather than returned, otherwise
// datasync would keep re-trying the send at each epoch; once initialized,
// Send always returns nil.
func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
	if t.dispatch == nil {
		return errNotInitialized
	}

	// The destination peer is the same for every batch, so convert it once
	// up front instead of once per batch.
	publicKey, err := datasyncpeer.IDToPublicKey(peer)
	if err != nil {
		t.logger.Error("failed to convert id to public key", zap.Error(err))
		return nil
	}

	for _, batch := range splitPayloadInBatches(&payload, int(t.maxMessageSize)) {
		if !batch.IsValid() {
			t.logger.Error("payload is invalid")
			continue
		}

		marshalledPayload, err := proto.Marshal(batch)
		if err != nil {
			t.logger.Error("failed to marshal payload", zap.Error(err))
			continue
		}

		// We don't return an error otherwise datasync will keep
		// re-trying sending at each epoch
		err = t.dispatch(context.Background(), publicKey, marshalledPayload, batch)
		if err != nil {
			t.logger.Error("failed to send message", zap.Error(err))
			continue
		}
	}
	return nil
}
// splitPayloadInBatches partitions payload into a slice of payloads whose
// estimated marshalled size each stays within maxSizeBytes. A single field
// that is too big on its own still gets emitted as its own batch.
//
// The size estimate is not going to be 100% accurate, but should be fine in
// most cases, and is faster than using proto.Size. Each byte-slice field is
// costed as its length plus one length-prefix byte; each message is costed
// as body + group id + timestamp plus two length-prefix bytes.
func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*protobuf.Payload {
	newPayload := &protobuf.Payload{}
	var response []*protobuf.Payload
	currentSize := payloadTagSize

	for _, ack := range payload.Acks {
		if len(ack)+currentSize+1 > maxSizeBytes {
			// We check if it's valid as it might be that the initial message
			// is too big, in this case we still batch it
			if newPayload.IsValid() {
				response = append(response, newPayload)
			}
			newPayload = &protobuf.Payload{Acks: [][]byte{ack}}
			currentSize = len(ack) + payloadTagSize + 1
		} else {
			newPayload.Acks = append(newPayload.Acks, ack)
			// Include the length-prefix byte so the running total matches
			// the overflow check above (previously it drifted low by one
			// byte per field, letting batches exceed maxSizeBytes).
			currentSize += len(ack) + 1
		}
	}

	for _, offer := range payload.Offers {
		if len(offer)+currentSize+1 > maxSizeBytes {
			if newPayload.IsValid() {
				response = append(response, newPayload)
			}
			newPayload = &protobuf.Payload{Offers: [][]byte{offer}}
			currentSize = len(offer) + payloadTagSize + 1
		} else {
			newPayload.Offers = append(newPayload.Offers, offer)
			currentSize += len(offer) + 1
		}
	}

	for _, request := range payload.Requests {
		if len(request)+currentSize+1 > maxSizeBytes {
			if newPayload.IsValid() {
				response = append(response, newPayload)
			}
			newPayload = &protobuf.Payload{Requests: [][]byte{request}}
			currentSize = len(request) + payloadTagSize + 1
		} else {
			newPayload.Requests = append(newPayload.Requests, request)
			currentSize += len(request) + 1
		}
	}

	for _, message := range payload.Messages {
		// We add the body size, the length field for payload, the length
		// field for group id, the length of timestamp, body and groupid
		if currentSize+1+1+timestampPayloadSize+len(message.Body)+len(message.GroupId) > maxSizeBytes {
			if newPayload.IsValid() {
				response = append(response, newPayload)
			}
			newPayload = &protobuf.Payload{Messages: []*protobuf.Message{message}}
			currentSize = timestampPayloadSize + len(message.Body) + len(message.GroupId) + payloadTagSize + 1 + 1
		} else {
			newPayload.Messages = append(newPayload.Messages, message)
			// As above: count the two length-prefix bytes the check counts.
			currentSize += len(message.Body) + len(message.GroupId) + timestampPayloadSize + 1 + 1
		}
	}

	// Flush whatever accumulated in the final partially-filled batch.
	if newPayload.IsValid() {
		response = append(response, newPayload)
	}
	return response
}
// CalculateSendTime calculates the next epoch at which a message should be
// sent, backing off exponentially in the number of previous sends (count).
// We randomize it a bit so that not all messages are sent on the same epoch.
// The backoff is expressed in seconds and converted to epochs via
// offsetToSecond; `time` is the current epoch.
func CalculateSendTime(count uint64, time int64) int64 {
	if count == 0 {
		// Guard against uint64 underflow: count-1 would wrap to MaxUint64,
		// making math.Exp2 return +Inf, and converting +Inf to uint64 is
		// undefined behavior per the Go spec.
		count = 1
	}
	return time + int64(uint64(math.Exp2(float64(count-1)))*backoffInterval*offsetToSecond) + int64(rand.Intn(30)) // nolint: gosec
}