forked from 0xProject/0x-mesh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
message_handler.go
215 lines (197 loc) · 7.04 KB
/
message_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package core
import (
"context"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/encoding"
"github.com/0xProject/0x-mesh/meshdb"
"github.com/0xProject/0x-mesh/p2p"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/0xProject/0x-mesh/zeroex/ordervalidator"
"github.com/ethereum/go-ethereum/common"
log "github.com/sirupsen/logrus"
)
// Compile-time assertion that *App satisfies the p2p.MessageHandler interface.
// The build fails here if a required method is missing or has the wrong
// signature.
var _ p2p.MessageHandler = (*App)(nil)
// orderSelector holds the state used to pick which stored orders are shared
// each time GetMessagesToShare is called.
type orderSelector struct {
// nextOffset is the query offset at which the next call to
// GetMessagesToShare starts selecting orders. It is updated at the end of
// every successful selection.
nextOffset int
// db is the database whose Orders collection is queried for orders to share.
db *meshdb.MeshDB
}
// min returns the smaller of the two integers a and b. When both are equal
// that shared value is returned.
func min(a int, b int) int {
	if b <= a {
		return b
	}
	return a
}
// GetMessagesToShare returns up to max encoded messages to share with peers.
// It delegates the selection entirely to the app's orderSelector and passes
// that selector's results and error through unchanged.
func (app *App) GetMessagesToShare(max int) ([][]byte, error) {
	messages, err := app.orderSelector.GetMessagesToShare(max)
	return messages, err
}
// GetMessagesToShare selects stored orders that are not flagged as removed and
// returns each of them encoded as a raw message.
//
// Selection is round robin: it starts at the offset remembered from the
// previous call and takes up to max orders. If the end of the collection is
// reached before max orders were gathered, it wraps around and takes orders
// from the beginning, never going past the starting offset so that no order is
// picked twice within the same call. The offset for the next call is stored on
// the selector before returning.
//
// It returns (nil, nil) when there are no orders to share, and a nil slice
// together with the error when a database query or the encoding of an order
// fails.
func (orderSelector *orderSelector) GetMessagesToShare(max int) ([][]byte, error) {
	// The count and the selection queries all run against a single snapshot so
	// that they observe the same database state.
	snapshot, err := orderSelector.db.Orders.GetSnapshot()
	if err != nil {
		return nil, err
	}
	defer snapshot.Release()

	activeFilter := orderSelector.db.Orders.IsRemovedIndex.ValueFilter([]byte{0})
	total, err := snapshot.NewQuery(activeFilter).Count()
	if err != nil {
		return nil, err
	}
	if total == 0 {
		return nil, nil
	}

	// Clamp the remembered offset: the collection may have shrunk since the
	// previous call.
	start := min(orderSelector.nextOffset, total)
	var picked []*meshdb.Order
	if start < total {
		if err := snapshot.NewQuery(activeFilter).Offset(start).Max(max).Run(&picked); err != nil {
			return nil, err
		}
	}

	// wrapped is how many more orders may be taken from the front of the
	// collection: bounded by the room left under max and by the starting
	// offset, which keeps the two selections disjoint.
	wrapped := min(max-len(picked), start)
	if wrapped <= 0 {
		// No wrap-around happened. Advance by max and start over from the
		// beginning once the end of the collection has been reached.
		upcoming := orderSelector.nextOffset + max
		if upcoming >= total {
			upcoming = 0
		}
		orderSelector.nextOffset = upcoming
	} else {
		var fromFront []*meshdb.Order
		if err := snapshot.NewQuery(activeFilter).Offset(0).Max(wrapped).Run(&fromFront); err != nil {
			return nil, err
		}
		picked = append(picked, fromFront...)
		// The next call continues right after the orders taken from the front.
		orderSelector.nextOffset = wrapped
	}

	log.WithFields(map[string]interface{}{
		"maxNumberToShare":    max,
		"actualNumberToShare": len(picked),
	}).Trace("preparing to share orders with peers")

	// Convert every selected order into the message data format.
	encodedMessages := make([][]byte, 0, len(picked))
	for _, candidate := range picked {
		log.WithFields(map[string]interface{}{
			"order": candidate,
		}).Trace("selected order to share")
		raw, err := encoding.OrderToRawMessage(candidate.SignedOrder)
		if err != nil {
			return nil, err
		}
		encodedMessages = append(encodedMessages, raw)
	}
	return encodedMessages, nil
}
// HandleMessages processes a batch of messages received from peers. Every
// message is size-checked, schema-validated and decoded into a signed order;
// the decoded orders, deduplicated by order hash, are then handed to the order
// watcher for validation and storage. Peer score events are reported along the
// way:
//
//   - psInvalidMessage for a message that is too large, fails schema
//     validation or cannot be decoded, and for a rejected order whose status
//     is not one of the exempt statuses handled in the final loop.
//   - psValidMessage for the first message that carries a given order hash.
//   - psOrderStored for every accepted order that is flagged as new.
//
// An error is returned only when computing an order hash fails or when the
// order watcher returns an error; in both cases processing stops immediately.
func (app *App) HandleMessages(ctx context.Context, messages []*p2p.Message) error {
	// Phase 1: drop malformed messages and decode the remaining ones.
	decodedOrders := make([]*zeroex.SignedOrder, 0)
	sourceByHash := make(map[common.Hash]*p2p.Message)
	for _, incoming := range messages {
		if sizeErr := validateMessageSize(incoming); sizeErr != nil {
			log.WithFields(map[string]interface{}{
				"error":               sizeErr,
				"from":                incoming.From,
				"maxOrderSizeInBytes": constants.MaxOrderSizeInBytes,
				"actualSizeInBytes":   len(incoming.Data),
			}).Trace("received message that exceeds maximum size")
			app.handlePeerScoreEvent(incoming.From, psInvalidMessage)
			continue
		}

		schemaResult, schemaErr := app.schemaValidateMeshMessage(incoming.Data)
		if schemaErr != nil {
			log.WithFields(map[string]interface{}{
				"error": schemaErr,
				"from":  incoming.From,
			}).Trace("could not schema validate message")
			app.handlePeerScoreEvent(incoming.From, psInvalidMessage)
			continue
		}
		if !schemaResult.Valid() {
			violations := schemaResult.Errors()
			descriptions := make([]string, 0, len(violations))
			for _, violation := range violations {
				descriptions = append(descriptions, violation.String())
			}
			log.WithFields(map[string]interface{}{
				"errors": descriptions,
				"from":   incoming.From,
			}).Trace("order schema validation failed for message")
			app.handlePeerScoreEvent(incoming.From, psInvalidMessage)
			continue
		}

		signedOrder, decodeErr := encoding.RawMessageToOrder(incoming.Data)
		if decodeErr != nil {
			log.WithFields(map[string]interface{}{
				"error": decodeErr,
				"from":  incoming.From,
			}).Trace("could not decode received message")
			app.handlePeerScoreEvent(incoming.From, psInvalidMessage)
			continue
		}

		hash, hashErr := signedOrder.ComputeOrderHash()
		if hashErr != nil {
			return hashErr
		}
		// The same order can appear more than once in a batch. Only its first
		// occurrence is kept, and only that sender is remembered for it.
		if _, duplicate := sourceByHash[hash]; duplicate {
			continue
		}
		decodedOrders = append(decodedOrders, signedOrder)
		sourceByHash[hash] = incoming
		app.handlePeerScoreEvent(incoming.From, psValidMessage)
	}

	// Phase 2: let the order watcher validate the orders and store valid ones.
	results, err := app.orderWatcher.ValidateAndStoreValidOrders(ctx, decodedOrders, false, app.chainID)
	if err != nil {
		return err
	}

	// Phase 3: log newly accepted orders and credit the peers that sent them.
	for _, accepted := range results.Accepted {
		// An order that is not new is neither logged nor counted towards the
		// peer's score.
		if !accepted.IsNew {
			continue
		}
		source := sourceByHash[accepted.OrderHash]
		hashHex := accepted.OrderHash.Hex()
		peerName := source.From.String()
		log.WithFields(map[string]interface{}{
			"orderHash": hashHex,
			"from":      peerName,
		}).Info("received new valid order from peer")
		log.WithFields(map[string]interface{}{
			"order":     accepted.SignedOrder,
			"orderHash": hashHex,
			"from":      peerName,
		}).Trace("all fields for new valid order received from peer")
		app.handlePeerScoreEvent(source.From, psOrderStored)
	}

	// Phase 4: rejected orders are not stored, but most rejection statuses
	// still count against the peer that sent the order.
	for _, rejected := range results.Rejected {
		source := sourceByHash[rejected.OrderHash]
		log.WithFields(map[string]interface{}{
			"rejectedOrderInfo": rejected,
			"from":              source.From.String(),
		}).Trace("not storing rejected order received from peer")

		switch rejected.Status {
		case ordervalidator.ROInternalError,
			ordervalidator.ROEthRPCRequestFailed,
			ordervalidator.ROCoordinatorRequestFailed,
			ordervalidator.RODatabaseFullOfOrders:
			// These statuses might not be the peer's fault, so no score event
			// is reported for them.
			continue
		}
		app.handlePeerScoreEvent(source.From, psInvalidMessage)
	}
	return nil
}