-
Notifications
You must be signed in to change notification settings - Fork 390
/
cache.go
363 lines (292 loc) · 11.7 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package overlay
import (
"context"
"errors"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/pkg/pb"
"storj.io/storj/pkg/storj"
"storj.io/storj/storage"
)
// OnlineWindow is the longest a node may go without a successful contact
// before it is considered offline (see NodeDossier.Online).
const OnlineWindow = time.Hour
var (
	// ErrEmptyNode is returned when the nodeID is empty (zero).
	ErrEmptyNode = errs.New("empty node ID")
	// ErrNodeNotFound is the error class for nodes that do not exist in the
	// database.
	ErrNodeNotFound = errs.Class("node not found")
	// ErrBucketNotFound is returned if a bucket is unable to be found in the
	// routing table.
	ErrBucketNotFound = errs.New("bucket not found")
	// ErrNotEnoughNodes is the error class used when selecting nodes with the
	// given parameters yields fewer nodes than requested.
	ErrNotEnoughNodes = errs.Class("not enough nodes")
	// OverlayError is the error class for general overlay errors.
	OverlayError = errs.Class("overlay error")
)
// DB implements the database for overlay.Cache.
//
// Cache forwards its persistence operations to these methods; most Cache
// methods are thin wrappers around the DB method of the same purpose.
type DB interface {
	// SelectStorageNodes looks up nodes based on criteria. count is the number
	// of nodes wanted; Cache.FindStorageNodesWithPreferences treats a result
	// shorter than count as "not enough nodes", not as a DB error.
	SelectStorageNodes(ctx context.Context, count int, criteria *NodeCriteria) ([]*pb.Node, error)
	// SelectNewStorageNodes looks up nodes based on new node criteria. count
	// has the same meaning as for SelectStorageNodes.
	SelectNewStorageNodes(ctx context.Context, count int, criteria *NewNodeCriteria) ([]*pb.Node, error)
	// Get looks up the node by nodeID.
	Get(ctx context.Context, nodeID storj.NodeID) (*NodeDossier, error)
	// GetAll looks up nodes based on the ids from the overlay cache.
	//
	// NOTE(review): Cache.OfflineNodes assumes the result is index-aligned
	// with nodeIDs and holds nil for IDs that were not found — confirm that
	// every implementation honors this.
	GetAll(ctx context.Context, nodeIDs storj.NodeIDList) ([]*NodeDossier, error)
	// List lists nodes starting from cursor; limit presumably caps the number
	// of nodes returned — TODO confirm.
	List(ctx context.Context, cursor storj.NodeID, limit int) ([]*NodeDossier, error)
	// Paginate will page through the database nodes, starting at offset.
	// NOTE(review): the bool result presumably reports whether more nodes
	// remain after this page — confirm against the implementation.
	Paginate(ctx context.Context, offset int64, limit int) ([]*NodeDossier, bool, error)
	// Update updates node information. Cache.Put only calls it with a node
	// whose Id is non-zero.
	Update(ctx context.Context, value *pb.Node) error
	// CreateStats initializes the stats for node.
	CreateStats(ctx context.Context, nodeID storj.NodeID, initial *NodeStats) (stats *NodeStats, err error)
	// FindInvalidNodes finds a subset of storagenodes that have stats below provided reputation requirements.
	FindInvalidNodes(ctx context.Context, nodeIDs storj.NodeIDList, maxStats *NodeStats) (invalid storj.NodeIDList, err error)
	// UpdateStats all parts of single storagenode's stats.
	UpdateStats(ctx context.Context, request *UpdateRequest) (stats *NodeStats, err error)
	// UpdateNodeInfo updates node dossier with info requested from the node itself like node type, email, wallet, capacity, and version.
	UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *pb.InfoResponse) (stats *NodeDossier, err error)
	// UpdateUptime updates a single storagenode's uptime stats. isUp reports
	// whether the latest contact attempt succeeded (ConnSuccess passes true,
	// ConnFailure passes false).
	UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool) (stats *NodeStats, err error)
}
// FindStorageNodesRequest defines easy request parameters.
//
// It is consumed by Cache.FindStorageNodesWithPreferences, which combines it
// with a NodeSelectionConfig to build NodeCriteria and NewNodeCriteria:
//   - MinimumRequiredNodes is the number of reputable nodes to select; when it
//     is not positive, RequestedCount is used instead.
//   - FreeBandwidth, FreeDisk and ExcludedNodes are copied into both criteria
//     (units of the Free* fields are not visible here — TODO confirm).
//   - NOTE(review): MinimumVersion is currently not read by
//     FindStorageNodesWithPreferences; it uses preferences.MinimumVersion.
type FindStorageNodesRequest struct {
	MinimumRequiredNodes int
	RequestedCount       int
	FreeBandwidth        int64
	FreeDisk             int64
	ExcludedNodes        []storj.NodeID
	MinimumVersion       string // semver or empty
}
// NodeCriteria are the requirements for selecting nodes.
//
// It is passed to DB.SelectStorageNodes when choosing reputable nodes.
// Cache.FindStorageNodesWithPreferences fills AuditCount with the larger of
// preferences.AuditCount and preferences.NewNodeAuditThreshold, and takes the
// ratio/count fields from the selection preferences. The ratios are presumably
// fractions in [0, 1] — TODO confirm.
type NodeCriteria struct {
	FreeBandwidth      int64
	FreeDisk           int64
	AuditCount         int64
	AuditSuccessRatio  float64
	UptimeCount        int64
	UptimeSuccessRatio float64
	Excluded           []storj.NodeID
	MinimumVersion     string // semver or empty
}
// NewNodeCriteria are the requirements for selecting new nodes.
//
// It is passed to DB.SelectNewStorageNodes. AuditThreshold is taken from
// preferences.NewNodeAuditThreshold; presumably nodes with fewer audits than
// this count as "new" — TODO confirm against the DB implementation.
type NewNodeCriteria struct {
	FreeBandwidth  int64
	FreeDisk       int64
	AuditThreshold int64
	Excluded       []storj.NodeID
	MinimumVersion string // semver or empty
}
// UpdateRequest is used to update a node status.
//
// It is forwarded unchanged by Cache.UpdateStats to DB.UpdateStats.
// AuditSuccess and IsUp presumably report the outcome of the latest audit and
// contact attempt for NodeID — TODO confirm.
type UpdateRequest struct {
	NodeID       storj.NodeID
	AuditSuccess bool
	IsUp         bool
}
// NodeDossier is the complete info that the satellite tracks for a storage node.
//
// It embeds the node's pb.Node and adds its type, operator, capacity and
// version information, plus the reputation statistics (NodeStats) whose
// contact timestamps NodeDossier.Online consults.
type NodeDossier struct {
	pb.Node
	Type       pb.NodeType
	Operator   pb.NodeOperator
	Capacity   pb.NodeCapacity
	Reputation NodeStats
	Version    pb.NodeVersion
}
// Online reports whether the node is considered online based on the collected
// statistics.
//
// A node is online when its last successful contact happened less than
// OnlineWindow ago and is more recent than its last failed contact. A node
// that was never contacted successfully (zero LastContactSuccess) is offline,
// because time.Since of the zero time saturates far above OnlineWindow.
func (node *NodeDossier) Online() bool {
	lastSuccess := node.Reputation.LastContactSuccess
	return time.Since(lastSuccess) < OnlineWindow &&
		lastSuccess.After(node.Reputation.LastContactFailure)
}
// NodeStats contains statistics about a node.
//
// NodeDossier.Online compares LastContactSuccess and LastContactFailure,
// presumably the times of the most recent successful and failed contact.
// NOTE(review): Latency90 is presumably a 90th-percentile latency and the
// *Ratio fields fractions in [0, 1]; units/ranges are not visible here —
// confirm before relying on them.
type NodeStats struct {
	Latency90          int64
	AuditSuccessRatio  float64
	AuditSuccessCount  int64
	AuditCount         int64
	UptimeRatio        float64
	UptimeSuccessCount int64
	UptimeCount        int64
	LastContactSuccess time.Time
	LastContactFailure time.Time
}
// Cache is used to store and handle node information.
//
// It is a thin layer over a DB: most methods validate their arguments and
// forward to the corresponding DB method. Construct it with NewCache.
type Cache struct {
	// log is the logger supplied to NewCache.
	log *zap.Logger
	// db performs all persistence for the cache.
	db DB
	// preferences is the default NodeSelectionConfig used by FindStorageNodes.
	preferences NodeSelectionConfig
}
// NewCache returns a new Cache that logs to log, persists through db and
// uses preferences as its default node selection configuration.
func NewCache(log *zap.Logger, db DB, preferences NodeSelectionConfig) *Cache {
	cache := new(Cache)
	cache.log = log
	cache.db = db
	cache.preferences = preferences
	return cache
}
// Close closes resources held by the cache.
//
// It does not close the underlying DB and currently always returns nil.
func (cache *Cache) Close() error {
	return nil
}
// Inspect is meant to list a limited number of items in the cache.
//
// It is not implemented yet: it always returns nil keys together with a
// "not implemented" error.
func (cache *Cache) Inspect(ctx context.Context) (storage.Keys, error) {
	// TODO: implement inspection tools
	notImplemented := errors.New("not implemented")
	return nil, notImplemented
}
// List returns nodes from the cache DB, starting from cursor and limited by
// limit as interpreted by DB.List.
func (cache *Cache) List(ctx context.Context, cursor storj.NodeID, limit int) (_ []*NodeDossier, err error) {
	defer mon.Task()(&ctx)(&err)

	nodes, err := cache.db.List(ctx, cursor, limit)
	return nodes, err
}
// Paginate returns a list of `limit` nodes starting at `offset`.
//
// The bool result is passed through from DB.Paginate (presumably "more nodes
// remain" — TODO confirm).
func (cache *Cache) Paginate(ctx context.Context, offset int64, limit int) (_ []*NodeDossier, _ bool, err error) {
	defer mon.Task()(&ctx)(&err)

	page, more, err := cache.db.Paginate(ctx, offset, limit)
	return page, more, err
}
// Get looks up the provided nodeID from the overlay cache.
//
// A zero nodeID is rejected with ErrEmptyNode without querying the DB.
func (cache *Cache) Get(ctx context.Context, nodeID storj.NodeID) (_ *NodeDossier, err error) {
	defer mon.Task()(&ctx)(&err)

	if !nodeID.IsZero() {
		return cache.db.Get(ctx, nodeID)
	}
	return nil, ErrEmptyNode
}
// OfflineNodes returns the indices (into nodes) of the nodes that are offline.
//
// A node counts as offline when the DB has no entry for it or when
// NodeDossier.Online reports false. An empty nodes list has no offline
// members, so it yields (nil, nil) without querying the DB.
func (cache *Cache) OfflineNodes(ctx context.Context, nodes []storj.NodeID) (offline []int, err error) {
	defer mon.Task()(&ctx)(&err)

	// GetAll rejects an empty id list with an error; short-circuit so callers
	// with nothing to check do not get a spurious failure.
	if len(nodes) == 0 {
		return nil, nil
	}

	// TODO: optimize
	// NOTE(review): assumes GetAll returns results index-aligned with nodes,
	// with nil for unknown IDs — confirm against the DB implementations.
	results, err := cache.GetAll(ctx, nodes)
	if err != nil {
		return nil, err
	}
	for i, dossier := range results {
		if dossier == nil || !dossier.Online() {
			offline = append(offline, i)
		}
	}
	return offline, nil
}
// FindStorageNodes searches the overlay network for nodes that meet the
// provided requirements, using the selection preferences the cache was
// created with.
func (cache *Cache) FindStorageNodes(ctx context.Context, req FindStorageNodesRequest) ([]*pb.Node, error) {
	defaults := &cache.preferences
	return cache.FindStorageNodesWithPreferences(ctx, req, defaults)
}
// FindStorageNodesWithPreferences searches the overlay network for nodes that
// meet the provided criteria.
//
// It selects two groups of nodes:
//   - reputable nodes: req.MinimumRequiredNodes of them (req.RequestedCount
//     when MinimumRequiredNodes is not positive), each with at least
//     max(preferences.AuditCount, preferences.NewNodeAuditThreshold) audits
//     and meeting the audit/uptime preferences;
//   - new nodes: preferences.NewNodePercentage of the reputable count
//     (truncated toward zero), selected with NewNodeAuditThreshold. The new
//     node query is skipped entirely when that count is not positive.
//
// The result lists the new nodes first, then the reputable ones. If fewer
// reputable nodes than requested are found, the nodes that were found are
// returned together with an ErrNotEnoughNodes error.
//
// Note: req.MinimumVersion is not consulted; both selections use
// preferences.MinimumVersion.
func (cache *Cache) FindStorageNodesWithPreferences(ctx context.Context, req FindStorageNodesRequest, preferences *NodeSelectionConfig) (_ []*pb.Node, err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO: verify logic
	// TODO: add sanity limits to requested node count
	// TODO: add sanity limits to excluded nodes

	reputableNodeCount := req.MinimumRequiredNodes
	if reputableNodeCount <= 0 {
		reputableNodeCount = req.RequestedCount
	}

	// Reputable nodes need at least NewNodeAuditThreshold audits, the same
	// threshold that is used for the new-node selection below.
	auditCount := preferences.AuditCount
	if auditCount < preferences.NewNodeAuditThreshold {
		auditCount = preferences.NewNodeAuditThreshold
	}

	reputableNodes, err := cache.db.SelectStorageNodes(ctx, reputableNodeCount, &NodeCriteria{
		FreeBandwidth:      req.FreeBandwidth,
		FreeDisk:           req.FreeDisk,
		AuditCount:         auditCount,
		AuditSuccessRatio:  preferences.AuditSuccessRatio,
		UptimeCount:        preferences.UptimeCount,
		UptimeSuccessRatio: preferences.UptimeRatio,
		Excluded:           req.ExcludedNodes,
		MinimumVersion:     preferences.MinimumVersion,
	})
	if err != nil {
		return nil, err
	}

	newNodeCount := int(float64(reputableNodeCount) * preferences.NewNodePercentage)

	// Only query for new nodes when at least one is wanted. A zero or negative
	// count (e.g. NewNodePercentage <= 0) would otherwise reach the DB as a
	// non-positive limit, whose meaning is backend-specific.
	var newNodes []*pb.Node
	if newNodeCount > 0 {
		newNodes, err = cache.db.SelectNewStorageNodes(ctx, newNodeCount, &NewNodeCriteria{
			FreeBandwidth:  req.FreeBandwidth,
			FreeDisk:       req.FreeDisk,
			AuditThreshold: preferences.NewNodeAuditThreshold,
			Excluded:       req.ExcludedNodes,
			MinimumVersion: preferences.MinimumVersion,
		})
		if err != nil {
			return nil, err
		}
	}

	nodes := make([]*pb.Node, 0, len(newNodes)+len(reputableNodes))
	nodes = append(nodes, newNodes...)
	nodes = append(nodes, reputableNodes...)

	if len(reputableNodes) < reputableNodeCount {
		return nodes, ErrNotEnoughNodes.New("requested %d found %d", reputableNodeCount, len(reputableNodes))
	}
	return nodes, nil
}
// GetAll looks up the provided ids from the overlay cache.
//
// An empty ids list is rejected with an OverlayError instead of being sent
// to the DB.
func (cache *Cache) GetAll(ctx context.Context, ids storj.NodeIDList) (_ []*NodeDossier, err error) {
	defer mon.Task()(&ctx)(&err)

	if len(ids) > 0 {
		return cache.db.GetAll(ctx, ids)
	}
	return nil, OverlayError.New("no ids provided")
}
// Put adds a node id and proto definition into the overlay cache via
// DB.Update.
//
// A zero nodeID is silently ignored and returns nil. Otherwise nodeID must
// equal value.Id, or an "invalid request" error is returned.
func (cache *Cache) Put(ctx context.Context, nodeID storj.NodeID, value pb.Node) (err error) {
	defer mon.Task()(&ctx)(&err)

	switch {
	case nodeID.IsZero():
		// A node without an ID (i.e. a bootstrap node) must not be added to
		// the routing table.
		return nil
	case nodeID != value.Id:
		return errors.New("invalid request")
	}
	return cache.db.Update(ctx, &value)
}
// Create adds a new stats entry for nodeID, seeded from initial, by
// delegating to DB.CreateStats.
func (cache *Cache) Create(ctx context.Context, nodeID storj.NodeID, initial *NodeStats) (stats *NodeStats, err error) {
	defer mon.Task()(&ctx)(&err)

	stats, err = cache.db.CreateStats(ctx, nodeID, initial)
	return stats, err
}
// FindInvalidNodes returns the subset of nodeIDs whose stats fall below the
// reputation requirements in maxStats, as determined by DB.FindInvalidNodes.
func (cache *Cache) FindInvalidNodes(ctx context.Context, nodeIDs storj.NodeIDList, maxStats *NodeStats) (invalid storj.NodeIDList, err error) {
	defer mon.Task()(&ctx)(&err)

	invalid, err = cache.db.FindInvalidNodes(ctx, nodeIDs, maxStats)
	return invalid, err
}
// UpdateStats updates all parts of a single storage node's stats as described
// by request, delegating to DB.UpdateStats.
func (cache *Cache) UpdateStats(ctx context.Context, request *UpdateRequest) (stats *NodeStats, err error) {
	defer mon.Task()(&ctx)(&err)

	stats, err = cache.db.UpdateStats(ctx, request)
	return stats, err
}
// UpdateNodeInfo updates the node dossier with info requested from the node
// itself (node type, email, wallet, capacity, version), delegating to
// DB.UpdateNodeInfo and returning the updated dossier.
func (cache *Cache) UpdateNodeInfo(ctx context.Context, node storj.NodeID, nodeInfo *pb.InfoResponse) (stats *NodeDossier, err error) {
	defer mon.Task()(&ctx)(&err)

	stats, err = cache.db.UpdateNodeInfo(ctx, node, nodeInfo)
	return stats, err
}
// UpdateUptime records one uptime check for nodeID (isUp reports whether the
// node was reachable) by delegating to DB.UpdateUptime.
func (cache *Cache) UpdateUptime(ctx context.Context, nodeID storj.NodeID, isUp bool) (stats *NodeStats, err error) {
	defer mon.Task()(&ctx)(&err)

	stats, err = cache.db.UpdateUptime(ctx, nodeID, isUp)
	return stats, err
}
// ConnFailure implements the Transport Observer `ConnFailure` function.
//
// It records a failed uptime check for node in the DB. A failure to record
// it is logged at debug level to the cache's logger and otherwise ignored,
// because the observer interface has no way to return it. failureError is
// currently not used. A nil node is ignored.
func (cache *Cache) ConnFailure(ctx context.Context, node *pb.Node, failureError error) {
	var err error
	defer mon.Task()(&ctx)(&err)

	// Without a node there is no ID to record the failure against.
	if node == nil {
		return
	}

	// TODO: Kademlia paper specifies 5 unsuccessful PINGs before removing the node
	// from our routing table, but this is the cache so maybe we want to treat
	// it differently.
	_, err = cache.db.UpdateUptime(ctx, node.Id, false)
	if err != nil {
		cache.log.Debug("error updating uptime for node",
			zap.String("node ID", node.Id.String()),
			zap.Error(err))
	}
}
// ConnSuccess implements the Transport Observer `ConnSuccess` function.
//
// It stores the node's latest information via Put and then records a
// successful uptime check in the DB. A failure in either step is logged at
// debug level to the cache's logger and does not prevent the other step; the
// combined error of both steps is reported to the monitoring task. A nil node
// is ignored.
func (cache *Cache) ConnSuccess(ctx context.Context, node *pb.Node) {
	var err error
	defer mon.Task()(&ctx)(&err)

	// Put dereferences node, so a nil node cannot be recorded.
	if node == nil {
		return
	}

	putErr := cache.Put(ctx, node.Id, *node)
	if putErr != nil {
		cache.log.Debug("error updating node connection info",
			zap.String("node ID", node.Id.String()),
			zap.Error(putErr))
	}

	_, uptimeErr := cache.db.UpdateUptime(ctx, node.Id, true)
	if uptimeErr != nil {
		cache.log.Debug("error updating uptime for node",
			zap.String("node ID", node.Id.String()),
			zap.Error(uptimeErr))
	}

	// Report both failures to monitoring rather than only the last one.
	err = errs.Combine(putErr, uptimeErr)
}