-
Notifications
You must be signed in to change notification settings - Fork 390
/
observer.go
325 lines (273 loc) · 9 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package durability
import (
"context"
"fmt"
"time"
"github.com/jtolio/eventkit"
"github.com/zeebo/errs"
"golang.org/x/exp/slices"
"storj.io/common/storj"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/nodeselection"
"storj.io/storj/satellite/overlay"
)
// ek is the eventkit handle through which reportToEventkit emits its events.
var ek = eventkit.Package()
// HealthStat collects the availability conditions for one class (for example: nodes with the same owner).
//
// It keeps the smallest measurement recorded so far, together with one exemplar
// identifier that produced it. The zero value is ready to use and means
// "no measurement recorded yet".
type HealthStat struct {
	// minPlusOne is the smallest recorded measurement plus one.
	// Because 0 means uninitialized, we store the min +1.
	minPlusOne int
	// Exemplar is one example identifier which produced the current minimum.
	Exemplar string
}

// Update updates the stat with one measurement: number of pieces which are available even without the nodes of the selected class.
// Exemplar is one example identifier with such measurement. Useful to dig deeper, based on this one example.
//
// The measurement is stored only when the stat is still unused or when num is
// lower than the current minimum. Note that num == -1 cannot be represented:
// it is stored as 0, which is the "unused" marker.
func (h *HealthStat) Update(num int, exemplar string) {
	if h.minPlusOne == 0 || num < h.minPlusOne-1 {
		h.minPlusOne = num + 1
		h.Exemplar = exemplar
	}
}

// Merge can merge two stat to one, without losing information.
//
// A nil or unused stat is ignored. Otherwise the receiver takes over the
// minimum and exemplar of stat when the receiver is still unused, or when
// stat holds a lower minimum.
func (h *HealthStat) Merge(stat *HealthStat) {
	if stat == nil || stat.minPlusOne <= 0 {
		// nothing was recorded on the other side
		return
	}
	// an unused receiver must adopt the incoming value, otherwise the
	// measurement would be silently dropped
	if h.minPlusOne == 0 || stat.minPlusOne < h.minPlusOne {
		h.minPlusOne = stat.minPlusOne
		h.Exemplar = stat.Exemplar
	}
}

// Min returns the minimal number, or -1 when nothing has been recorded yet.
func (h *HealthStat) Min() int {
	return h.minPlusOne - 1
}

// Unused returns true when stat is uninitialized (-1) and was not updated with any number.
func (h *HealthStat) Unused() bool {
	return h.minPlusOne == 0
}
// NodeClassifier identifies a risk class (for example an owner, or country) of the SelectedNode.
// The returned string is used as the class name: nodes yielding the same string share one class.
type NodeClassifier func(node *nodeselection.SelectedNode) string
// ReportConfig configures durability report.
type ReportConfig struct {
// Enabled toggles the durability report; according to its tag the default is true.
Enabled bool `help:"whether to enable durability report (rangedloop observer)" default:"true"`
}
// Report is a calculator (rangedloop.Observer) which checks the availability of pieces without certain nodes.
// It can answer the following question:
// 1. losing a given group of nodes (all nodes of one country or all nodes of one owner)...
// 2. what will be the lowest number of healthy pieces, checking all the segments.
//
// Example: we have one segment where 80 pieces are stored, but 42 of them are in Germany.
//
// in this case this reporter will return 38 for the class "country:DE" (assuming all the other segments are more lucky).
type Report struct {
// healthStat holds the merged result per class name; it is filled by Join and reported by Finish.
healthStat map[string]*HealthStat
// busFactor is the merged bus factor stat of all the forks, reported by Finish.
busFactor HealthStat
// classifier assigns a class name to each node (see classifyNodeAliases).
classifier NodeClassifier
// aliasMap is the node alias map loaded from the metabase in Start.
aliasMap *metabase.NodeAliasMap
// nodes contains the participating nodes loaded in Start, keyed by node ID.
nodes map[storj.NodeID]*nodeselection.SelectedNode
db overlay.DB
metabaseDB *metabase.DB
// reporter receives the results in Finish; NewDurability sets it to reportToEventkit.
reporter func(n time.Time, class string, value string, stat *HealthStat)
// reportThreshold is handed over to each fork created by Fork.
reportThreshold int
// busFactorThreshold is the threshold of the bus factor calculation, as passed to NewDurability.
busFactorThreshold int
// asOfSystemInterval is passed to the overlay when the participating nodes are loaded.
asOfSystemInterval time.Duration
// map between classes (like "country:hu" and integer IDs)
className map[classID]string
// contains the available classes for each node alias.
classified []classID
// maxPieceCount is the capacity of the bus factor scratch slice allocated for each fork.
maxPieceCount int
// class is the name of this report instance, returned by GetClass and used when reporting.
class string
}
// NewDurability creates the new instance.
//
// The returned Report starts with empty node and stat maps and reports
// through reportToEventkit; all the other settings are taken from the arguments.
func NewDurability(db overlay.DB, metabaseDB *metabase.DB, class string, classifier NodeClassifier, maxPieceCount int, reportThreshold int, busFactorThreshold int, asOfSystemInterval time.Duration) *Report {
	report := &Report{
		// defaults which do not depend on the arguments
		nodes:      map[storj.NodeID]*nodeselection.SelectedNode{},
		healthStat: map[string]*HealthStat{},
		reporter:   reportToEventkit,
	}

	// dependencies
	report.db = db
	report.metabaseDB = metabaseDB

	// classification
	report.class = class
	report.classifier = classifier

	// limits and thresholds
	report.maxPieceCount = maxPieceCount
	report.reportThreshold = reportThreshold
	report.busFactorThreshold = busFactorThreshold
	report.asOfSystemInterval = asOfSystemInterval

	return report
}
// Start implements rangedloop.Observer.
//
// It reloads the participating nodes and the latest node alias map, then
// clears the previous results and classifies the known node aliases.
func (c *Report) Start(ctx context.Context, startTime time.Time) error {
	participating, err := c.db.GetParticipatingNodes(ctx, -12*time.Hour, c.asOfSystemInterval)
	if err != nil {
		return errs.Wrap(err)
	}

	// index the nodes by ID, pointing into the returned slice
	byID := make(map[storj.NodeID]*nodeselection.SelectedNode)
	for i := range participating {
		node := &participating[i]
		byID[node.ID] = node
	}
	c.nodes = byID

	latest, err := c.metabaseDB.LatestNodesAliasMap(ctx)
	if err != nil {
		return errs.Wrap(err)
	}
	c.aliasMap = latest

	c.resetStat()
	c.classifyNodeAliases()
	return nil
}
// resetStat drops every collected result: the per-class stats and the bus factor.
func (c *Report) resetStat() {
	c.busFactor = HealthStat{}
	c.healthStat = map[string]*HealthStat{}
}
// classifyNodeAliases rebuilds the class name table and the alias -> class ID lookup.
//
// ID 0 is reserved for "unclassified"; aliases without a known node keep this ID.
// Every other class name gets the next free ID when it is first encountered.
func (c *Report) classifyNodeAliases() {
	const unclassified = "unclassified"

	ids := map[string]classID{unclassified: 0}
	c.className = map[classID]string{0: unclassified}
	c.classified = make([]classID, c.aliasMap.Max()+1)

	for _, n := range c.nodes {
		alias, known := c.aliasMap.Alias(n.ID)
		if !known {
			// no alias for this node: nothing to classify
			continue
		}

		name := c.classifier(n)
		id, seen := ids[name]
		if !seen {
			id = classID(len(ids))
			ids[name] = id
			c.className[id] = name
		}
		c.classified[alias] = id
	}
}
// Fork implements rangedloop.Observer.
//
// It creates an ObserverFork which shares the read-only lookups of the Report
// (alias map, nodes, alias classification) and gets its own stat and scratch
// slices, sized by the number of known classes and by maxPieceCount.
// It never returns an error.
func (c *Report) Fork(ctx context.Context) (rangedloop.Partial, error) {
	d := &ObserverFork{
		aliasMap:        c.aliasMap,
		nodes:           c.nodes,
		classifierCache: make([][]string, c.aliasMap.Max()+1),
		reportThreshold: c.reportThreshold,
		// without this the fork compares against 0 and the bus factor is always 0
		busFactorThreshold:     c.busFactorThreshold,
		healthStat:             make([]HealthStat, len(c.className)),
		controlledByClassCache: make([]int32, len(c.className)),
		busFactorCache:         make([]int32, 0, c.maxPieceCount),
		classified:             c.classified,
	}
	return d, nil
}
// Join implements rangedloop.Observer.
//
// It merges the results of one fork into the Report: the per-class stats are
// merged by class name, and the bus factor of the fork is applied to the
// bus factor of the Report. Stats which were never updated by the fork are skipped.
// It returns an error when partial is not an *ObserverFork.
func (c *Report) Join(ctx context.Context, partial rangedloop.Partial) (err error) {
	defer mon.Task()(&ctx)(&err)

	fork, ok := partial.(*ObserverFork)
	if !ok {
		return fmt.Errorf("durability: unexpected partial type %T", partial)
	}

	for cid := range fork.healthStat {
		stat := &fork.healthStat[cid]
		if stat.Unused() {
			continue
		}
		name := c.className[classID(cid)]
		existing, found := c.healthStat[name]
		if !found {
			c.healthStat[name] = &HealthStat{
				minPlusOne: stat.minPlusOne,
				Exemplar:   stat.Exemplar,
			}
			continue
		}
		existing.Merge(stat)
	}

	// A fork which processed no remote segment has an unused bus factor (Min() == -1).
	// Passing that to Update would reset the value collected from the earlier forks.
	if !fork.busFactor.Unused() {
		c.busFactor.Update(fork.busFactor.Min(), fork.busFactor.Exemplar)
	}
	return nil
}
// Finish implements rangedloop.Observer.
//
// It hands every collected per-class stat to the reporter, followed by the
// bus factor, all with the same report time. It always returns nil.
func (c *Report) Finish(ctx context.Context) error {
	now := time.Now()

	// one report per class value
	for value, lowest := range c.healthStat {
		c.reporter(now, c.class, value, lowest)
	}

	// the bus factor is reported under its own class, with the report class as value
	c.reporter(now, "bus_factor", c.class, &c.busFactor)
	return nil
}
// TestChangeReporter modifies the reporter for unit tests.
// The given function replaces the current reporter and is called by Finish with each result.
func (c *Report) TestChangeReporter(r func(n time.Time, class string, name string, stat *HealthStat)) {
c.reporter = r
}
// GetClass returns the class instance name (like last_net or country).
// This is the class value which was passed to NewDurability.
func (c *Report) GetClass() string {
return c.class
}
// classID is a fork level short identifier for each class.
// ID 0 is reserved for "unclassified"; the other IDs are assigned by classifyNodeAliases
// as new class names are encountered.
type classID int32
// ObserverFork is the durability calculator for each segment range.
type ObserverFork struct {
// controlledByClassCache is a per-class piece counter, indexed by classID.
// Process resets the used entries to zero after each segment, so it can be reused.
controlledByClassCache []int32
// healthStat holds the collected stat for each class, indexed by classID.
healthStat []HealthStat
// busFactor collects the lowest bus factor of the processed segments.
busFactor HealthStat
// aliasMap, nodes and classifierCache are set by Report.Fork; Process does not read them.
aliasMap *metabase.NodeAliasMap
nodes map[storj.NodeID]*nodeselection.SelectedNode
classifierCache [][]string
// busFactorCache is an empty scratch slice; Process appends the per-class piece counts of one segment to it.
busFactorCache []int32
// reportThreshold, when greater than zero, makes Process skip the stat update for values above it.
reportThreshold int
// busFactorThreshold is the number of pieces the biggest classes have to cover in the bus factor calculation.
busFactorThreshold int
// classified maps a node alias (used as index) to the classID of the node.
classified []classID
}
// Process implements rangedloop.Partial.
//
// For each remote (not inline) segment it counts the pieces stored on
// classified nodes, per class. Then, for every class which holds at least one piece:
//   - the number of pieces remaining without that class is recorded in the
//     stat of the class (unless a positive reportThreshold is exceeded),
//   - the piece count of the class takes part in the bus factor calculation.
//
// The bus factor of a segment is the number of biggest classes which are
// counted until their accumulated piece count reaches busFactorThreshold.
// It always returns nil.
func (c *ObserverFork) Process(ctx context.Context, segments []rangedloop.Segment) (err error) {
	perClass := c.controlledByClassCache

	for idx := range segments {
		segment := &segments[idx]
		if segment.Inline() {
			continue
		}

		healthy := 0
		for _, p := range segment.AliasPieces {
			if int(p.Alias) >= len(c.classified) {
				// this is a new node, but we can ignore it.
				// will be included in the next execution cycle.
				continue
			}
			// class 0 means the node was not classified: such pieces are not counted
			if cls := c.classified[p.Alias]; cls != 0 {
				healthy++
				perClass[cls]++
			}
		}

		groups := c.busFactorCache
		location := fmt.Sprintf("%s/%d", segment.StreamID, segment.Position.Encode())

		for id, pieces := range perClass {
			if pieces == 0 {
				continue
			}
			// clear the counter, it is reused for the next segment
			perClass[id] = 0
			groups = append(groups, pieces)

			remaining := healthy - int(pieces)
			if c.reportThreshold <= 0 || remaining <= c.reportThreshold {
				c.healthStat[id].Update(remaining, location)
			}
		}

		// biggest groups first
		slices.SortFunc[int32](groups, func(x int32, y int32) bool {
			return x > y
		})

		covered, factor := 0, 0
		for _, pieces := range groups {
			if covered >= c.busFactorThreshold {
				break
			}
			factor++
			covered += int(pieces)
		}
		c.busFactor.Update(factor, location)
	}
	return nil
}
// reportToEventkit emits one "durability" event with the class, the value (name),
// the exemplar, the report time and the minimum of the given stat.
func reportToEventkit(n time.Time, class string, name string, stat *HealthStat) {
	exemplar := stat.Exemplar
	lowest := int64(stat.Min())

	ek.Event(
		"durability",
		eventkit.String("class", class),
		eventkit.String("value", name),
		eventkit.String("exemplar", exemplar),
		eventkit.Timestamp("report_time", n),
		eventkit.Int64("min", lowest),
	)
}
// Compile-time checks: Report is a rangedloop.Observer and ObserverFork is a rangedloop.Partial.
var (
	_ rangedloop.Observer = (*Report)(nil)
	_ rangedloop.Partial  = (*ObserverFork)(nil)
)