-
Notifications
You must be signed in to change notification settings - Fork 402
/
worker.go
162 lines (136 loc) · 4.64 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/sync2"
)
// Error is the default audit errs class. In this file, Run uses it to wrap
// errors returned by process before logging them.
var Error = errs.Class("audit")
// Config contains configurable values for audit chore and workers.
//
// Each field's struct tag carries its help text and default value; some fields
// declare separate release, dev, and test defaults. Of these fields, NewWorker
// reads only QueueInterval and WorkerConcurrency.
type Config struct {
// MaxRetriesStatDB is the maximum number of attempts at updating a statdb batch.
MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`
// MinBytesPerSecond is the minimum acceptable transfer rate from storage nodes to the satellite.
MinBytesPerSecond memory.Size `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B" testDefault:"1.00 KB"`
// MinDownloadTimeout is the minimum time allowed for downloading a share before timing out.
MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
// MaxReverifyCount is the limit above which an audit is considered failed.
MaxReverifyCount int `help:"limit above which we consider an audit is failed" default:"3"`
// ChoreInterval is how often the reservoir chore runs.
// NOTE(review): "$TESTINTERVAL" looks like a placeholder substituted by the test setup — confirm.
ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
// QueueInterval is how often an empty audit queue is rechecked; NewWorker
// uses it as the interval of Worker.Loop.
QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
// Slots is the number of reservoir slots allotted per node.
Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
// WorkerConcurrency is the number of audits that may run at once; NewWorker
// uses it to size the worker's limiter.
WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`
}
// Worker takes segments from the audit queues and runs audits on them,
// recording the resulting reports.
type Worker struct {
log *zap.Logger
// queues supplies the queue of segments to audit.
queues *Queues
// verifier performs the Reverify and Verify steps of an audit.
verifier *Verifier
// reporter records the reports produced by the verifier.
reporter *Reporter
// Loop drives repeated processing of the queue; NewWorker creates it with
// Config.QueueInterval.
Loop *sync2.Cycle
// limiter runs the individual audits; NewWorker creates it with
// Config.WorkerConcurrency.
limiter *sync2.Limiter
}
// NewWorker builds a Worker from its collaborators and the audit config.
//
// The worker's Loop is created with config.QueueInterval and its limiter with
// config.WorkerConcurrency. The returned error is always nil.
func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error) {
	worker := new(Worker)

	worker.log = log
	worker.queues = queues
	worker.verifier = verifier
	worker.reporter = reporter

	worker.Loop = sync2.NewCycle(config.QueueInterval)
	worker.limiter = sync2.NewLimiter(config.WorkerConcurrency)

	return worker, nil
}
// Run runs audit service 2.0.
//
// It hands a processing callback to the worker's Loop and returns whatever
// Loop.Run returns. A failure of an individual processing pass is logged and
// swallowed (the callback always returns nil). Before Run returns, it waits on
// the limiter for audits that are still running.
func (worker *Worker) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Wait for all audits to run.
	defer worker.limiter.Wait()

	cycle := func(ctx context.Context) (err error) {
		defer mon.Task()(&ctx)(&err)

		if processErr := worker.process(ctx); processErr != nil {
			worker.log.Error("process", zap.Error(Error.Wrap(processErr)))
		}
		return nil
	}

	return worker.Loop.Run(ctx, cycle)
}
// Close halts the worker by closing its Loop. It always returns nil.
func (worker *Worker) Close() error {
	worker.Loop.Close()

	return nil
}
// process repeatedly removes an item from the queue and runs an audit.
//
// It first waits on the limiter for audits started by an earlier call, then
// hands each segment from the queue to the limiter. When the queue reports
// ErrEmptyQueue a fresh queue is fetched; process returns nil if that one is
// empty too, and keeps working otherwise. Any other queue error is returned
// unchanged. Failures of individual audits are logged, not returned.
//
// If the limiter does not start an audit, process stops and returns ctx.Err()
// rather than continuing to pop segments that would never be audited.
func (worker *Worker) process(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// get the current queue
	queue := worker.queues.Fetch()

	// Let audits from a previous pass finish before starting new ones.
	worker.limiter.Wait()

	for {
		segment, err := queue.Next()
		if err != nil {
			if ErrEmptyQueue.Has(err) {
				// get a new queue and return if empty; otherwise continue working.
				queue = worker.queues.Fetch()
				if queue.Size() == 0 {
					return nil
				}
				continue
			}
			return err
		}

		started := worker.limiter.Go(ctx, func() {
			err := worker.work(ctx, segment)
			if err != nil {
				worker.log.Error("audit failed",
					zap.String("Segment StreamID", segment.StreamID.String()),
					zap.Uint64("Segment Position", segment.Position.Encode()),
					zap.Error(err))
			}
		})
		if !started {
			// The limiter refused the job (e.g. ctx was cancelled). Without
			// this check the loop would keep draining the queue, silently
			// dropping every remaining segment.
			return ctx.Err()
		}
	}
}
// work audits a single segment in two phases and records the report of each.
//
// Nodes in containment mode are reverified first; every node named in that
// report is then skipped by the regular verification that follows. Errors from
// either phase, and from recording either report, are collected and returned
// together instead of aborting early.
func (worker *Worker) work(ctx context.Context, segment Segment) (err error) {
	defer mon.Task()(&ctx)(&err)

	var failures errs.Group

	// First, attempt to reverify nodes for this segment that are in containment mode.
	reverified, err := worker.verifier.Reverify(ctx, segment)
	if err != nil {
		failures.Add(err)
	}

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	if _, err = worker.reporter.RecordAudits(ctx, reverified); err != nil {
		failures.Add(err)
	}

	// Skip all reverified nodes in the next Verify step.
	skip := make(map[storj.NodeID]bool)
	exclude := func(id storj.NodeID) { skip[id] = true }

	for _, id := range reverified.Successes {
		exclude(id)
	}
	for _, id := range reverified.Offlines {
		exclude(id)
	}
	for _, id := range reverified.Fails {
		exclude(id)
	}
	for _, pendingAudit := range reverified.PendingAudits {
		exclude(pendingAudit.NodeID)
	}
	for _, id := range reverified.Unknown {
		exclude(id)
	}

	// Next, audit the remaining nodes that are not in containment mode.
	verified, err := worker.verifier.Verify(ctx, segment, skip)
	if err != nil {
		failures.Add(err)
	}

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	if _, err = worker.reporter.RecordAudits(ctx, verified); err != nil {
		failures.Add(err)
	}

	return failures.Err()
}