-
Notifications
You must be signed in to change notification settings - Fork 394
/
sender.go
234 lines (194 loc) · 6.09 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"context"
"io"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
monkit "gopkg.in/spacemonkeygo/monkit.v2"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/kademlia"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
)
var (
// OrderError represents errors with orders
OrderError = errs.Class("order")
mon = monkit.Package()
)
// Info contains full information about an order.
type Info struct {
Limit *pb.OrderLimit2
Order *pb.Order2
Uplink *identity.PeerIdentity
}
// ArchivedInfo contains full information about an archived order.
type ArchivedInfo struct {
Limit *pb.OrderLimit2
Order *pb.Order2
Uplink *identity.PeerIdentity
Status Status
ArchivedAt time.Time
}
// Status is the archival status of the order.
type Status byte
// Statuses for satellite responses.
const (
StatusUnsent Status = iota
StatusAccepted
StatusRejected
)
// DB implements storing orders for sending to the satellite.
type DB interface {
// Enqueue inserts order to the list of orders needing to be sent to the satellite.
Enqueue(ctx context.Context, info *Info) error
// ListUnsent returns orders that haven't been sent yet.
ListUnsent(ctx context.Context, limit int) ([]*Info, error)
// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite.
ListUnsentBySatellite(ctx context.Context) (map[storj.NodeID][]*Info, error)
// Archive marks order as being handled.
Archive(ctx context.Context, satellite storj.NodeID, serial storj.SerialNumber, status Status) error
// ListArchived returns orders that have been sent.
ListArchived(ctx context.Context, limit int) ([]*ArchivedInfo, error)
}
// SenderConfig defines configuration for sending orders.
type SenderConfig struct {
Interval time.Duration `help:"duration between sending" default:"1h0m0s"`
Timeout time.Duration `help:"timeout for sending" default:"1h0m0s"`
}
// Sender sends every interval unsent orders to the satellite.
type Sender struct {
log *zap.Logger
config SenderConfig
transport transport.Client
kademlia *kademlia.Kademlia
orders DB
Loop sync2.Cycle
}
// NewSender creates an order sender.
func NewSender(log *zap.Logger, transport transport.Client, kademlia *kademlia.Kademlia, orders DB, config SenderConfig) *Sender {
return &Sender{
log: log,
transport: transport,
kademlia: kademlia,
orders: orders,
config: config,
Loop: *sync2.NewCycle(config.Interval),
}
}
// Run sends orders on every interval to the appropriate satellites.
func (sender *Sender) Run(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
return sender.Loop.Run(ctx, sender.runOnce)
}
func (sender *Sender) runOnce(ctx context.Context) (err error) {
defer mon.Task()(&ctx)(&err)
sender.log.Debug("sending")
ordersBySatellite, err := sender.orders.ListUnsentBySatellite(ctx)
if err != nil {
sender.log.Error("listing orders", zap.Error(err))
return nil
}
if len(ordersBySatellite) > 0 {
var group errgroup.Group
ctx, cancel := context.WithTimeout(ctx, sender.config.Timeout)
defer cancel()
for satelliteID, orders := range ordersBySatellite {
satelliteID, orders := satelliteID, orders
group.Go(func() error {
sender.Settle(ctx, satelliteID, orders)
return nil
})
}
_ = group.Wait() // doesn't return errors
} else {
sender.log.Debug("no orders to send")
}
return nil
}
// Settle uploads orders to the satellite.
func (sender *Sender) Settle(ctx context.Context, satelliteID storj.NodeID, orders []*Info) {
log := sender.log.Named(satelliteID.String())
err := sender.settle(ctx, log, satelliteID, orders)
if err != nil {
log.Error("failed to settle orders", zap.Error(err))
}
}
func (sender *Sender) settle(ctx context.Context, log *zap.Logger, satelliteID storj.NodeID, orders []*Info) (err error) {
defer mon.Task()(&ctx)(&err)
log.Info("sending", zap.Int("count", len(orders)))
defer log.Info("finished")
satellite, err := sender.kademlia.FindNode(ctx, satelliteID)
if err != nil {
return OrderError.New("unable to find satellite on the network: %v", err)
}
conn, err := sender.transport.DialNode(ctx, &satellite)
if err != nil {
return OrderError.New("unable to connect to the satellite: %v", err)
}
defer func() {
if cerr := conn.Close(); cerr != nil {
err = errs.Combine(err, OrderError.New("failed to close connection: %v", err))
}
}()
client, err := pb.NewOrdersClient(conn).Settlement(ctx)
if err != nil {
return OrderError.New("failed to start settlement: %v", err)
}
var group errgroup.Group
group.Go(func() error {
for _, order := range orders {
err := client.Send(&pb.SettlementRequest{
Limit: order.Limit,
Order: order.Order,
})
if err != nil {
return err
}
}
return client.CloseSend()
})
var errList errs.Group
errHandle := func(cls errs.Class, format string, args ...interface{}) {
log.Sugar().Errorf(format, args...)
errList.Add(cls.New(format, args...))
}
for {
response, err := client.Recv()
if err != nil {
if err == io.EOF {
break
}
errHandle(OrderError, "failed to receive response: %v", err)
break
}
switch response.Status {
case pb.SettlementResponse_ACCEPTED:
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusAccepted)
if err != nil {
errHandle(OrderError, "failed to archive order as accepted: serial: %v, %v", response.SerialNumber, err)
}
case pb.SettlementResponse_REJECTED:
err = sender.orders.Archive(ctx, satelliteID, response.SerialNumber, StatusRejected)
if err != nil {
errHandle(OrderError, "failed to archive order as rejected: serial: %v, %v", response.SerialNumber, err)
}
default:
errHandle(OrderError, "unexpected response: %v", response.Status)
}
}
if err := group.Wait(); err != nil {
errHandle(OrderError, "sending aggreements returned an error: %v", err)
}
return errList.Err()
}
// Close stops the sending service.
func (sender *Sender) Close() error {
sender.Loop.Close()
return nil
}