-
Notifications
You must be signed in to change notification settings - Fork 387
/
endpoint.go
365 lines (312 loc) · 12 KB
/
endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"bytes"
"context"
"io"
"sort"
"time"
"github.com/skyrings/skyring-common/tools/uuid"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/identity"
"storj.io/common/pb"
"storj.io/common/rpc/rpcstatus"
"storj.io/common/signing"
"storj.io/common/storj"
)
// DB implements saving order after receiving from storage node
//
// architecture: Database
type DB interface {
// CreateSerialInfo creates serial number entry in database.
CreateSerialInfo(ctx context.Context, serialNumber storj.SerialNumber, bucketID []byte, limitExpiration time.Time) error
// UseSerialNumber creates a used serial number entry in database from an
// existing serial number.
// It returns the bucket ID associated to serialNumber.
UseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) ([]byte, error)
// UnuseSerialNumber removes pair serial number -> storage node id from database
UnuseSerialNumber(ctx context.Context, serialNumber storj.SerialNumber, storageNodeID storj.NodeID) error
// DeleteExpiredSerials deletes all expired serials in serial_number, used_serials, and consumed_serials table.
DeleteExpiredSerials(ctx context.Context, now time.Time) (_ int, err error)
// DeleteExpiredConsumedSerials deletes all expired serials in the consumed_serials table.
DeleteExpiredConsumedSerials(ctx context.Context, now time.Time) (_ int, err error)
// UpdateBucketBandwidthAllocation updates 'allocated' bandwidth for given bucket
UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateBucketBandwidthSettle updates 'settled' bandwidth for given bucket
UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateBucketBandwidthInline updates 'inline' bandwidth for given bucket
UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error
// UpdateStoragenodeBandwidthSettle updates 'settled' bandwidth for given storage node
UpdateStoragenodeBandwidthSettle(ctx context.Context, storageNode storj.NodeID, action pb.PieceAction, amount int64, intervalStart time.Time) error
// GetBucketBandwidth gets total bucket bandwidth from period of time
GetBucketBandwidth(ctx context.Context, projectID uuid.UUID, bucketName []byte, from, to time.Time) (int64, error)
// GetStorageNodeBandwidth gets total storage node bandwidth from period of time
GetStorageNodeBandwidth(ctx context.Context, nodeID storj.NodeID, from, to time.Time) (int64, error)
// ProcessOrders takes a list of order requests and processes them in a batch
ProcessOrders(ctx context.Context, requests []*ProcessOrderRequest) (responses []*ProcessOrderResponse, err error)
// WithTransaction runs the callback and provides it with a Transaction.
WithTransaction(ctx context.Context, cb func(ctx context.Context, tx Transaction) error) error
// WithQueue TODO: DOCS
WithQueue(ctx context.Context, cb func(ctx context.Context, queue Queue) error) error
}
// Transaction represents a database transaction but with higher level actions.
type Transaction interface {
// UpdateBucketBandwidthBatch updates all the bandwidth rollups in the database
UpdateBucketBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []BucketBandwidthRollup) error
// UpdateStoragenodeBandwidthBatch updates all the bandwidth rollups in the database
UpdateStoragenodeBandwidthBatch(ctx context.Context, intervalStart time.Time, rollups []StoragenodeBandwidthRollup) error
// CreateConsumedSerialsBatch TODO: DOCS
CreateConsumedSerialsBatch(ctx context.Context, consumedSerials []ConsumedSerial) (err error)
// HasConsumedSerial TODO: DOCS
HasConsumedSerial(ctx context.Context, nodeID storj.NodeID, serialNumber storj.SerialNumber) (bool, error)
}
// Queue TODO: DOCS
type Queue interface {
// GetPendingSerialsBatch TODO: DOCS
GetPendingSerialsBatch(ctx context.Context, size int) ([]PendingSerial, error)
}
// ConsumedSerial TODO: DOCS
type ConsumedSerial struct {
NodeID storj.NodeID
SerialNumber storj.SerialNumber
ExpiresAt time.Time
}
// PendingSerial is a serial number reported by a storagenode waiting to be
// settled
type PendingSerial struct {
NodeID storj.NodeID
BucketID []byte
Action uint
SerialNumber storj.SerialNumber
ExpiresAt time.Time
Settled uint64
}
var (
// Error the default orders errs class
Error = errs.Class("orders error")
// ErrUsingSerialNumber error class for serial number
ErrUsingSerialNumber = errs.Class("serial number")
mon = monkit.Package()
)
// BucketBandwidthRollup contains all the info needed for a bucket bandwidth rollup
type BucketBandwidthRollup struct {
ProjectID uuid.UUID
BucketName string
Action pb.PieceAction
Inline int64
Allocated int64
Settled int64
}
// SortBucketBandwidthRollups sorts the rollups
func SortBucketBandwidthRollups(rollups []BucketBandwidthRollup) {
sort.SliceStable(rollups, func(i, j int) bool {
uuidCompare := bytes.Compare(rollups[i].ProjectID[:], rollups[j].ProjectID[:])
switch {
case uuidCompare == -1:
return true
case uuidCompare == 1:
return false
case rollups[i].BucketName < rollups[j].BucketName:
return true
case rollups[i].BucketName > rollups[j].BucketName:
return false
case rollups[i].Action < rollups[j].Action:
return true
case rollups[i].Action > rollups[j].Action:
return false
default:
return false
}
})
}
// StoragenodeBandwidthRollup contains all the info needed for a storagenode bandwidth rollup
type StoragenodeBandwidthRollup struct {
NodeID storj.NodeID
Action pb.PieceAction
Allocated int64
Settled int64
}
// SortStoragenodeBandwidthRollups sorts the rollups
func SortStoragenodeBandwidthRollups(rollups []StoragenodeBandwidthRollup) {
sort.SliceStable(rollups, func(i, j int) bool {
nodeCompare := bytes.Compare(rollups[i].NodeID.Bytes(), rollups[j].NodeID.Bytes())
switch {
case nodeCompare == -1:
return true
case nodeCompare == 1:
return false
case rollups[i].Action < rollups[j].Action:
return true
case rollups[i].Action > rollups[j].Action:
return false
default:
return false
}
})
}
// ProcessOrderRequest for batch order processing
type ProcessOrderRequest struct {
Order *pb.Order
OrderLimit *pb.OrderLimit
}
// ProcessOrderResponse for batch order processing responses
type ProcessOrderResponse struct {
SerialNumber storj.SerialNumber
Status pb.SettlementResponse_Status
}
// Endpoint for orders receiving
//
// architecture: Endpoint
type Endpoint struct {
log *zap.Logger
satelliteSignee signing.Signee
DB DB
settlementBatchSize int
}
// drpcEndpoint wraps streaming methods so that they can be used with drpc
type drpcEndpoint struct{ *Endpoint }
// DRPC returns a DRPC form of the endpoint.
func (endpoint *Endpoint) DRPC() pb.DRPCOrdersServer { return &drpcEndpoint{Endpoint: endpoint} }
// NewEndpoint new orders receiving endpoint
func NewEndpoint(log *zap.Logger, satelliteSignee signing.Signee, db DB, settlementBatchSize int) *Endpoint {
return &Endpoint{
log: log,
satelliteSignee: satelliteSignee,
DB: db,
settlementBatchSize: settlementBatchSize,
}
}
func monitoredSettlementStreamReceive(ctx context.Context, stream settlementStream) (_ *pb.SettlementRequest, err error) {
defer mon.Task()(&ctx)(&err)
return stream.Recv()
}
func monitoredSettlementStreamSend(ctx context.Context, stream settlementStream, resp *pb.SettlementResponse) (err error) {
defer mon.Task()(&ctx)(&err)
switch resp.Status {
case pb.SettlementResponse_ACCEPTED:
mon.Event("settlement_response_accepted")
case pb.SettlementResponse_REJECTED:
mon.Event("settlement_response_rejected")
default:
mon.Event("settlement_response_unknown")
}
return stream.Send(resp)
}
// Settlement receives orders and handles them in batches
func (endpoint *Endpoint) Settlement(stream pb.Orders_SettlementServer) (err error) {
return endpoint.doSettlement(stream)
}
// Settlement receives orders and handles them in batches
func (endpoint *drpcEndpoint) Settlement(stream pb.DRPCOrders_SettlementStream) (err error) {
return endpoint.doSettlement(stream)
}
// settlementStream is the minimum interface required to perform settlements.
type settlementStream interface {
Context() context.Context
Send(*pb.SettlementResponse) error
Recv() (*pb.SettlementRequest, error)
}
// doSettlement receives orders and handles them in batches
func (endpoint *Endpoint) doSettlement(stream settlementStream) (err error) {
ctx := stream.Context()
defer mon.Task()(&ctx)(&err)
peer, err := identity.PeerIdentityFromContext(ctx)
if err != nil {
return rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}
formatError := func(err error) error {
if err == io.EOF {
return nil
}
return rpcstatus.Error(rpcstatus.Unknown, err.Error())
}
log := endpoint.log.Named(peer.ID.String())
log.Debug("Settlement")
requests := make([]*ProcessOrderRequest, 0, endpoint.settlementBatchSize)
defer func() {
if len(requests) > 0 {
err = errs.Combine(err, endpoint.processOrders(ctx, stream, requests))
if err != nil {
err = formatError(err)
}
}
}()
for {
request, err := monitoredSettlementStreamReceive(ctx, stream)
if err != nil {
return formatError(err)
}
if request == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "request missing")
}
if request.Limit == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "order limit missing")
}
if request.Order == nil {
return rpcstatus.Error(rpcstatus.InvalidArgument, "order missing")
}
orderLimit := request.Limit
order := request.Order
if orderLimit.StorageNodeId != peer.ID {
return rpcstatus.Error(rpcstatus.Unauthenticated, "only specified storage node can settle order")
}
rejectErr := func() error {
// satellite verifies that it signed the order limit
if err := signing.VerifyOrderLimitSignature(ctx, endpoint.satelliteSignee, orderLimit); err != nil {
return Error.New("unable to verify order limit")
}
// satellite verifies that the order signature matches pub key in order limit
if err := signing.VerifyUplinkOrderSignature(ctx, orderLimit.UplinkPublicKey, order); err != nil {
return Error.New("unable to verify order")
}
// TODO should this reject or just error ??
if orderLimit.SerialNumber != order.SerialNumber {
return Error.New("invalid serial number")
}
if orderLimit.OrderExpiration.Before(time.Now()) {
return Error.New("order limit expired")
}
return nil
}()
if rejectErr != nil {
log.Debug("order limit/order verification failed", zap.Stringer("serial", orderLimit.SerialNumber), zap.Error(rejectErr))
err := monitoredSettlementStreamSend(ctx, stream, &pb.SettlementResponse{
SerialNumber: orderLimit.SerialNumber,
Status: pb.SettlementResponse_REJECTED,
})
if err != nil {
return formatError(err)
}
continue
}
requests = append(requests, &ProcessOrderRequest{Order: order, OrderLimit: orderLimit})
if len(requests) >= endpoint.settlementBatchSize {
err = endpoint.processOrders(ctx, stream, requests)
requests = requests[:0]
if err != nil {
return formatError(err)
}
}
}
}
func (endpoint *Endpoint) processOrders(ctx context.Context, stream settlementStream, requests []*ProcessOrderRequest) (err error) {
defer mon.Task()(&ctx)(&err)
responses, err := endpoint.DB.ProcessOrders(ctx, requests)
if err != nil {
return err
}
for _, response := range responses {
r := &pb.SettlementResponse{
SerialNumber: response.SerialNumber,
Status: response.Status,
}
err = monitoredSettlementStreamSend(ctx, stream, r)
if err != nil {
return err
}
}
return nil
}