-
Notifications
You must be signed in to change notification settings - Fork 402
/
service.go
301 lines (254 loc) · 8.35 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package contact
import (
"context"
"encoding/base64"
"math/rand"
"strings"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/spacemonkeygo/monkit/v3"
"github.com/spf13/pflag"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storagenode/trust"
)
var (
	// mon is this package's monkit scope; it backs the mon.Task and
	// mon.Meter calls in the service methods.
	mon = monkit.Package()

	// Error is the default error class for contact package.
	Error = errs.Class("contact")

	// errPingSatellite is the error class for failures while resolving,
	// dialing, or talking to a satellite (CheckIn and PingMe requests).
	errPingSatellite = errs.Class("ping satellite")
)

// initialBackOff is the first wait between failed check-in attempts in
// pingSatellite; the wait doubles after every failed attempt.
const initialBackOff = time.Second
// Config contains configurable values for contact service.
//
// Field order and struct tags are consumed by the config/flag binding
// machinery, so both are part of the external interface.
type Config struct {
	// ExternalAddress is the public address of the node (user-facing option).
	ExternalAddress string `user:"true" help:"the public address of the node, useful for nodes behind NAT" default:""`

	// Chore config values

	// Interval is how frequently the node contact chore runs.
	Interval time.Duration `help:"how frequently the node contact chore should run" releaseDefault:"1h" devDefault:"30s"`
	// Tags are the signed node tags, parsed from flag text by SignedTags.Set.
	// NOTE(review): the help text says "hex (base64)", but Set decodes
	// standard base64 only — consider correcting the wording.
	Tags SignedTags `help:"protobuf serialized signed node tags in hex (base64) format"`
}
// SignedTags represents base64 encoded signed tags.
//
// It implements pflag.Value: Set accepts a comma-separated list of
// base64-encoded, protobuf-serialized pb.SignedNodeTagSets and appends their
// tags, while String renders the combined tag sets back as one base64 string.
type SignedTags pb.SignedNodeTagSets
// Type implements pflag.Value interface.
//
// It always reports the fixed type name "signedtags".
func (u *SignedTags) Type() string {
	const typeName = "signedtags"
	return typeName
}
// String implements pflag.Value interface.
//
// A nil receiver yields "". Otherwise the tag sets are protobuf-serialized
// and returned in standard base64 encoding; if serialization fails, the
// marshal error's text is returned instead.
func (u *SignedTags) String() string {
	if u == nil {
		return ""
	}

	sets := pb.SignedNodeTagSets(*u)
	encoded, marshalErr := proto.Marshal(&sets)
	if marshalErr == nil {
		return base64.StdEncoding.EncodeToString(encoded)
	}
	return marshalErr.Error()
}
// Set implements flag.Value interface.
//
// s is a comma-separated list of parts. Each part is a standard-base64
// encoded, protobuf-serialized pb.SignedNodeTagSets, and its tags are
// appended to u.Tags. An empty s, or a nil receiver, is a no-op.
//
// On failure the returned error names the 1-based index and the text of the
// offending part (not the whole input), so the bad entry can be located.
func (u *SignedTags) Set(s string) error {
	// Check once up front rather than on every iteration. strings.Split("", ",")
	// yields [""], so an empty s still returns nil before any decoding.
	if s == "" || u == nil {
		return nil
	}

	for i, part := range strings.Split(s, ",") {
		raw, err := base64.StdEncoding.DecodeString(part)
		if err != nil {
			return errs.New("signed tag configuration #%d is not base64 encoded: %s", i+1, part)
		}

		// Use a fresh message per part so one part's state can never leak
		// into the next.
		var p pb.SignedNodeTagSets
		if err := proto.Unmarshal(raw, &p); err != nil {
			return errs.New("signed tag configuration #%d is not a pb.SignedNodeTagSets{}: %s", i+1, part)
		}
		u.Tags = append(u.Tags, p.Tags...)
	}
	return nil
}
// Compile-time assertion that *SignedTags satisfies pflag.Value.
var _ pflag.Value = (*SignedTags)(nil)
// NodeInfo contains information necessary for introducing storagenode to satellite.
type NodeInfo struct {
	// ID is this storage node's identity.
	ID storj.NodeID
	// Address is sent to satellites in both CheckIn and PingMe requests.
	Address string
	// Version is reported in CheckIn requests.
	Version pb.NodeVersion
	// Capacity is reported in CheckIn requests; Service.UpdateSelf replaces it.
	Capacity pb.NodeCapacity
	// Operator is reported in CheckIn requests.
	Operator pb.NodeOperator
	// NoiseKeyAttestation is forwarded as-is in CheckIn requests (may be nil).
	NoiseKeyAttestation *pb.NoiseKeyAttestation
	// DebounceLimit is sent in CheckIn requests after conversion to int32.
	DebounceLimit int
	// FastOpen sets the TCP_FASTOPEN_ENABLED feature bit in CheckIn requests.
	FastOpen bool
}
// Service is the contact service between storage nodes and satellites.
type Service struct {
	log *zap.Logger
	// rand shuffles the satellite order in RequestPingMeQUIC.
	// NOTE(review): a *rand.Rand built on rand.NewSource is not safe for
	// concurrent use — confirm RequestPingMeQUIC is never called concurrently.
	rand   *rand.Rand
	dialer rpc.Dialer

	// mu guards self; Local and UpdateSelf hold it while reading/writing.
	mu   sync.Mutex
	self NodeInfo

	// trust supplies the list of satellites and resolves their node URLs.
	trust *trust.Pool
	// quicStats records whether the satellite could reach this node via QUIC.
	quicStats *QUICStats

	// initialized is released by UpdateSelf.
	initialized sync2.Fence

	// tags are sent as SignedTags in every CheckIn request.
	tags *pb.SignedNodeTagSets
}
// NewService creates a new contact service.
//
// The service gets its own math/rand generator, seeded from the current wall
// clock, which it uses to randomize the order in which satellites are asked
// for a QUIC ping-back.
func NewService(log *zap.Logger, dialer rpc.Dialer, self NodeInfo, trust *trust.Pool, quicStats *QUICStats, tags *pb.SignedNodeTagSets) *Service {
	seed := time.Now().UnixNano()

	service := &Service{
		log:       log,
		dialer:    dialer,
		self:      self,
		trust:     trust,
		quicStats: quicStats,
		tags:      tags,
	}
	service.rand = rand.New(rand.NewSource(seed))

	return service
}
// PingSatellites attempts to ping all satellites in trusted list until backoff reaches maxInterval.
//
// Each trusted satellite is handled in its own goroutine, and the call blocks
// until every one of them has finished. pingSatellite currently logs its
// failures and returns nil, so the returned error is normally nil.
func (service *Service) PingSatellites(ctx context.Context, maxInterval time.Duration) (err error) {
	defer mon.Task()(&ctx)(&err)

	var group errgroup.Group
	for _, id := range service.trust.GetSatellites(ctx) {
		id := id // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
		group.Go(func() error {
			return service.pingSatellite(ctx, id, maxInterval)
		})
	}

	return group.Wait()
}
// pingSatellite checks in with one satellite, retrying with exponential
// backoff. The wait starts at initialBackOff and doubles after each failed
// attempt. Retrying stops when the doubled wait reaches maxInterval or when
// ctx is cancelled during a wait. Every failure is logged, and the function
// always returns nil.
func (service *Service) pingSatellite(ctx context.Context, satellite storj.NodeID, maxInterval time.Duration) error {
	backoff := initialBackOff

	for attempt := 1; ; attempt++ {
		mon.Meter("satellite_contact_request").Mark(1) //mon:locked

		pingErr := service.pingSatelliteOnce(ctx, satellite)
		if pingErr == nil {
			return nil
		}
		service.log.Error("ping satellite failed ", zap.Stringer("Satellite ID", satellite), zap.Int("attempts", attempt), zap.Error(pingErr))

		// sync2.Sleep reports false when ctx is cancelled before the wait ends.
		if slept := sync2.Sleep(ctx, backoff); !slept {
			service.log.Info("context cancelled", zap.Stringer("Satellite ID", satellite))
			return nil
		}

		backoff *= 2
		if backoff >= maxInterval {
			service.log.Info("retries timed out for this cycle", zap.Stringer("Satellite ID", satellite))
			return nil
		}
	}
}
// pingSatelliteOnce dials the satellite identified by id and performs a single
// CheckIn. The request carries this node's current self-information (from
// Local), its feature bits and its signed tags.
//
// Once the CheckIn call returns, the QUIC status is reset to false. If a
// response arrives, the status is then set from resp.PingNodeSuccessQuic. A
// response with PingNodeSuccess == false is returned as an errPingSatellite
// error. A successful response that still carries a PingErrorMessage is only
// logged as a warning.
//
// A nil response with a nil error is treated as a successful check-in. It is
// never dereferenced; the previous version read resp.PingErrorMessage outside
// its nil check and would panic in that case.
func (service *Service) pingSatelliteOnce(ctx context.Context, id storj.NodeID) (err error) {
	defer mon.Task()(&ctx, id)(&err)

	conn, err := service.dialSatellite(ctx, id)
	if err != nil {
		return errPingSatellite.Wrap(err)
	}
	defer func() { err = errs.Combine(err, conn.Close()) }()

	self := service.Local()

	var features uint64
	if self.FastOpen {
		features |= uint64(pb.NodeAddress_TCP_FASTOPEN_ENABLED)
	}

	resp, err := pb.NewDRPCNodeClient(conn).CheckIn(ctx, &pb.CheckInRequest{
		Address:             self.Address,
		Version:             &self.Version,
		Capacity:            &self.Capacity,
		Operator:            &self.Operator,
		NoiseKeyAttestation: self.NoiseKeyAttestation,
		DebounceLimit:       int32(self.DebounceLimit),
		Features:            features,
		SignedTags:          service.tags,
	})
	service.quicStats.SetStatus(false)
	if err != nil {
		return errPingSatellite.Wrap(err)
	}
	if resp == nil {
		// Nothing to inspect; do not dereference.
		return nil
	}

	service.quicStats.SetStatus(resp.PingNodeSuccessQuic)
	if !resp.PingNodeSuccess {
		return errPingSatellite.New("%s", resp.PingErrorMessage)
	}
	if resp.PingErrorMessage != "" {
		service.log.Warn("Your node is still considered to be online but encountered an error.", zap.Stringer("Satellite ID", id), zap.String("Error", resp.GetPingErrorMessage()))
	}
	return nil
}
// RequestPingMeQUIC sends pings request to satellite for a pingBack via QUIC.
//
// The trusted satellites are tried in random order until one accepts the
// PingMe request. The returned stats record whether the last attempt
// succeeded. With no trusted satellites, it returns nil stats and an error.
// If every satellite fails, it returns the stats together with an error
// quoting the last failure.
func (service *Service) RequestPingMeQUIC(ctx context.Context) (stats *QUICStats, err error) {
	defer mon.Task()(&ctx)(&err)

	stats = NewQUICStats(true)

	satellites := service.trust.GetSatellites(ctx)
	if len(satellites) == 0 {
		return nil, errPingSatellite.New("no trusted satellite available")
	}

	// Every storage node ships with the same default trusted list and most
	// operators keep it. Always starting from the first entry would
	// concentrate PingMe load on one satellite, so visit them in random
	// order instead.
	service.rand.Shuffle(len(satellites), func(a, b int) {
		satellites[a], satellites[b] = satellites[b], satellites[a]
	})

	for _, satellite := range satellites {
		if err = service.requestPingMeOnce(ctx, satellite); err == nil {
			stats.SetStatus(true)
			return stats, nil
		}

		// Record the failure, then fall through to the next satellite.
		stats.SetStatus(false)
		service.log.Warn("failed PingMe request to satellite", zap.Stringer("Satellite ID", satellite), zap.Error(err))
	}

	return stats, errPingSatellite.New("failed to ping storage node using QUIC: %q", err)
}
// requestPingMeOnce dials one satellite and asks it to ping this node back
// over QUIC at the node's current address. Dial and RPC failures are wrapped
// as errPingSatellite. Any error from closing the connection is combined into
// the result.
func (service *Service) requestPingMeOnce(ctx context.Context, satellite storj.NodeID) (err error) {
	defer mon.Task()(&ctx, satellite)(&err)

	conn, err := service.dialSatellite(ctx, satellite)
	if err != nil {
		return errPingSatellite.Wrap(err)
	}
	defer func() { err = errs.Combine(err, conn.Close()) }()

	request := &pb.PingMeRequest{
		Address:   service.Local().Address,
		Transport: pb.NodeTransport_QUIC_RPC,
	}
	client := pb.NewDRPCNodeClient(conn)
	if _, err = client.PingMe(ctx, request); err != nil {
		return errPingSatellite.Wrap(err)
	}
	return nil
}
// dialSatellite resolves the satellite's node URL through the trust pool and
// dials it. A resolution failure is wrapped as errPingSatellite; the dialer's
// result, including any error, is returned unchanged.
func (service *Service) dialSatellite(ctx context.Context, id storj.NodeID) (*rpc.Conn, error) {
	nodeurl, lookupErr := service.trust.GetNodeURL(ctx, id)
	if lookupErr == nil {
		return service.dialer.DialNodeURL(ctx, nodeurl)
	}
	return nil, errPingSatellite.Wrap(lookupErr)
}
// Local returns the storagenode info.
//
// The NodeInfo value is copied while service.mu is held. Pointer fields such
// as NoiseKeyAttestation are shared with the stored value, not deep-copied.
func (service *Service) Local() NodeInfo {
	service.mu.Lock()
	info := service.self
	service.mu.Unlock()
	return info
}
// UpdateSelf updates the local node with the capacity.
//
// A nil capacity leaves the stored capacity untouched. In every case the
// initialized fence is released while service.mu is still held.
func (service *Service) UpdateSelf(capacity *pb.NodeCapacity) {
	service.mu.Lock()
	defer service.mu.Unlock()
	// Deferred calls run LIFO, so Release runs before Unlock.
	defer service.initialized.Release()

	if capacity == nil {
		return
	}
	service.self.Capacity = *capacity
}