-
Notifications
You must be signed in to change notification settings - Fork 402
/
reverifyworker.go
114 lines (95 loc) · 3.19 KB
/
reverifyworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package audit
import (
"context"
"time"
"go.uber.org/zap"
"storj.io/common/sync2"
)
// ReverifyWorker processes reverifications (retrying piece audits against nodes that timed out
// during a Verification).
type ReverifyWorker struct {
	log        *zap.Logger
	queue      ReverifyQueue // source of pending reverification jobs
	reverifier *Reverifier   // performs the actual piecewise audit
	reporter   Reporter      // records outcomes and removes finished jobs from the queue
	Loop       *sync2.Cycle  // exported so callers can trigger/pause the processing cycle

	concurrency   int           // max number of jobs processed in parallel (ReverifyWorkerConcurrency)
	retryInterval time.Duration // per-job claim window; also the timeout for working a single job
}
// NewReverifyWorker creates a new ReverifyWorker.
//
// The worker pulls jobs from queue, audits them with reverifier, and records
// results through reporter. Cycle interval, concurrency, and the retry
// interval are taken from config.
func NewReverifyWorker(log *zap.Logger, queue ReverifyQueue, reverifier *Reverifier, reporter Reporter, config Config) *ReverifyWorker {
	worker := &ReverifyWorker{
		log:           log,
		queue:         queue,
		reverifier:    reverifier,
		reporter:      reporter,
		Loop:          sync2.NewCycle(config.QueueInterval),
		concurrency:   config.ReverifyWorkerConcurrency,
		retryInterval: config.ReverificationRetryInterval,
	}
	return worker
}
// Run runs a ReverifyWorker. It blocks until ctx is canceled or the cycle is
// closed, invoking process on each tick.
func (worker *ReverifyWorker) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	return worker.Loop.Run(ctx, func(ctx context.Context) error {
		// Log processing failures instead of returning them, so a transient
		// error does not stop the cycle loop.
		if processErr := worker.process(ctx); processErr != nil {
			worker.log.Error("failure processing reverify queue", zap.Error(Error.Wrap(processErr)))
		}
		return nil
	})
}
// process drains the reverify queue, running up to worker.concurrency jobs in
// parallel. It returns nil when the queue is empty, and otherwise returns the
// first error encountered while fetching a job.
func (worker *ReverifyWorker) process(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	limiter := sync2.NewLimiter(worker.concurrency)
	// Wait for all in-flight jobs before returning, so no goroutine outlives
	// this call.
	defer limiter.Wait()

	for {
		// We start the timeout clock _before_ pulling the next job from
		// the queue. This gives us the best chance of having this worker
		// terminate and get cleaned up before another reverification
		// worker tries to take over the job.
		//
		// (If another worker does take over the job before this worker
		// has been cleaned up, it is ok; the downside should only be
		// duplication of work and monkit stats.)
		//
		// Note: this shadows the outer ctx for the rest of the iteration;
		// each iteration derives its timeout from the original ctx.
		ctx, cancel := context.WithTimeout(ctx, worker.retryInterval)
		reverifyJob, err := worker.queue.GetNextJob(ctx, worker.retryInterval)
		if err != nil {
			cancel()
			// An empty queue is the normal termination condition, not a failure.
			if ErrEmptyQueue.Has(err) {
				return nil
			}
			return err
		}

		started := limiter.Go(ctx, func() {
			defer cancel()

			logger := worker.log.With(
				zap.Stringer("Segment StreamID", reverifyJob.Locator.StreamID),
				zap.Uint64("Segment Position", reverifyJob.Locator.Position.Encode()),
				zap.Stringer("Node ID", reverifyJob.Locator.NodeID),
				zap.Int("Piece Number", reverifyJob.Locator.PieceNum))
			worker.work(ctx, logger, reverifyJob)
		})
		if !started {
			// limiter.Go only refuses when ctx is already done; report why.
			cancel()
			return ctx.Err()
		}
	}
}
// work performs a single piecewise audit for job and records the result.
// Failures to record the outcome are logged; the job then stays in the queue
// and will be retried on a later pass.
func (worker *ReverifyWorker) work(ctx context.Context, logger *zap.Logger, job *ReverificationJob) {
	defer mon.Task()(&ctx)(nil)

	logger.Debug("beginning piecewise audit")
	outcome, reputation := worker.reverifier.ReverifyPiece(ctx, logger, &job.Locator)
	logger.Debug("piecewise audit complete", zap.Int("outcome", int(outcome)))

	if err := worker.reporter.RecordReverificationResult(ctx, job, outcome, reputation); err != nil {
		logger.Error("finished with audit, but failed to remove entry from queue", zap.Error(err))
	}
}
// Close halts the worker.
//
// It stops the cycle loop; Run then returns once the current iteration
// finishes. Close never fails and always returns nil.
func (worker *ReverifyWorker) Close() error {
	worker.Loop.Close()
	return nil
}