-
Notifications
You must be signed in to change notification settings - Fork 402
/
service.go
141 lines (118 loc) · 3.88 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package audit
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/storj/internal/memory"
"storj.io/storj/internal/sync2"
"storj.io/storj/pkg/identity"
"storj.io/storj/pkg/storj"
"storj.io/storj/pkg/transport"
"storj.io/storj/satellite/metainfo"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
)
var (
	// Error is the default errs class for the audit package, created with the
	// class name "audit error".
	Error = errs.Class("audit error")
)
// Config contains configurable values for the audit service.
//
// Each field carries a `help` description and a `default` value in its struct
// tags (presumably consumed by the configuration loader — confirm). NewService
// hands these values to the components it builds.
type Config struct {
	// MaxRetriesStatDB is passed through to NewReporter.
	MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`
	// Interval is the period given to sync2.NewCycle for the service Loop.
	Interval time.Duration `help:"how frequently segments are audited" default:"30s"`
	// MinBytesPerSecond is passed through to NewVerifier.
	MinBytesPerSecond memory.Size `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B"`
	// MinDownloadTimeout is passed through to NewVerifier.
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"25s"`
	// MaxReverifyCount is converted to int32 in NewService and passed to
	// NewReporter.
	MaxReverifyCount int `help:"limit above which we consider an audit is failed" default:"3"`
}
// Service helps coordinate Cursor and Verifier to run the audit process
// continuously. It is built by NewService, driven by Run and stopped by Close.
type Service struct {
	log *zap.Logger

	// Cursor supplies the next stripe to audit via NextStripe.
	Cursor *Cursor
	// Verifier runs the Reverify and Verify passes over a stripe.
	Verifier *Verifier
	// Reporter records the reports produced by Verifier (RecordAudits).
	Reporter reporter
	// Loop schedules process; NewService configures it with Config.Interval.
	// NOTE(review): held by value and initialized by dereferencing
	// sync2.NewCycle's result, which copies the Cycle — confirm sync2.Cycle is
	// safe to copy before it is started.
	Loop sync2.Cycle
}
// NewService instantiates a Service with access to a Cursor and Verifier.
//
// The Verifier and Reporter receive child loggers named "audit:verifier" and
// "audit:reporter", and the Loop cycles every config.Interval.
//
// An error is returned if config.MaxReverifyCount cannot be represented as an
// int32, which is the type NewReporter expects. Such values used to wrap
// around silently and yield a different, possibly negative, limit.
func NewService(log *zap.Logger, config Config, metainfo *metainfo.Service,
	orders *orders.Service, transport transport.Client, overlay *overlay.Cache,
	containment Containment, identity *identity.FullIdentity) (*Service, error) {
	// Round-trip through int32 to detect truncation without importing math.
	maxReverifyCount := int32(config.MaxReverifyCount)
	if int(maxReverifyCount) != config.MaxReverifyCount {
		return nil, Error.New("max reverify count %d does not fit in int32", config.MaxReverifyCount)
	}

	return &Service{
		log:      log,
		Cursor:   NewCursor(metainfo),
		Verifier: NewVerifier(log.Named("audit:verifier"), metainfo, transport, overlay, containment, orders, identity, config.MinBytesPerSecond, config.MinDownloadTimeout),
		Reporter: NewReporter(log.Named("audit:reporter"), overlay, containment, config.MaxRetriesStatDB, maxReverifyCount),
		Loop:     *sync2.NewCycle(config.Interval),
	}, nil
}
// Run runs the auditing service by handing a per-cycle callback to
// service.Loop.Run and returning whatever that call returns.
//
// The callback calls process and logs any error under the message "process".
// It then returns nil regardless, presumably so that a single failed audit
// does not end the loop. NOTE(review): confirm against sync2.Cycle semantics.
func (service *Service) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	service.log.Info("Audit cron is starting up")

	auditOnce := func(ctx context.Context) error {
		if processErr := service.process(ctx); processErr != nil {
			service.log.Error("process", zap.Error(processErr))
		}
		return nil
	}

	return service.Loop.Run(ctx, auditOnce)
}
// Close halts the audit loop by closing service.Loop. It always returns nil.
func (service *Service) Close() error {
	loop := &service.Loop
	loop.Close()
	return nil
}
// process picks a stripe from the cursor and verifies its correctness.
//
// It calls Cursor.NextStripe until a stripe comes back. A cursor error is
// returned immediately. If the cursor reports no further stripes without
// producing one, process returns nil and audits nothing.
//
// Once a stripe is chosen, two passes run back to back:
//  1. Verifier.Reverify, whose report goes to Reporter.RecordAudits.
//  2. Verifier.Verify, skipping every node named in the first report
//     (successes, offlines, fails and pending audits). Its report also goes to
//     Reporter.RecordAudits.
//
// Verifier and reporter errors do not stop the remaining steps; they are
// collected and returned together.
func (service *Service) process(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	var stripe *Stripe
	for stripe == nil {
		next, more, cursorErr := service.Cursor.NextStripe(ctx)
		switch {
		case cursorErr != nil:
			return cursorErr
		case next != nil:
			stripe = next
		case !more:
			return nil
		}
	}

	var errlist errs.Group
	keep := func(e error) {
		if e != nil {
			errlist.Add(e)
		}
	}

	reverified, reverifyErr := service.Verifier.Reverify(ctx, stripe)
	keep(reverifyErr)

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	_, recordErr := service.Reporter.RecordAudits(ctx, reverified)
	keep(recordErr)

	// Nodes already handled during reverification are excluded from Verify.
	skip := make(map[storj.NodeID]bool)
	if reverified != nil {
		for _, ids := range [][]storj.NodeID{reverified.Successes, reverified.Offlines, reverified.Fails} {
			for _, id := range ids {
				skip[id] = true
			}
		}
		for _, pending := range reverified.PendingAudits {
			skip[pending.NodeID] = true
		}
	}

	verified, verifyErr := service.Verifier.Verify(ctx, stripe, skip)
	keep(verifyErr)

	// TODO(moby) we need to decide if we want to do something with nodes that the reporter failed to update
	_, recordErr = service.Reporter.RecordAudits(ctx, verified)
	keep(recordErr)

	return errlist.Err()
}