-
Notifications
You must be signed in to change notification settings - Fork 402
/
main.go
347 lines (290 loc) · 10.2 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package main
import (
"context"
"encoding/csv"
"encoding/hex"
"errors"
"os"
"strings"
"github.com/spf13/cobra"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/fpath"
"storj.io/common/peertls/tlsopts"
"storj.io/common/rpc"
"storj.io/common/signing"
"storj.io/common/uuid"
"storj.io/private/cfgstruct"
"storj.io/private/process"
"storj.io/storj/private/revocation"
"storj.io/storj/satellite"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/storj/satellite/overlay"
"storj.io/storj/satellite/satellitedb"
)
// Satellite defines satellite configuration.
type Satellite struct {
Database string `help:"satellite database connection string" releaseDefault:"postgres://" devDefault:"postgres://"`
satellite.Config
}
var (
rootCmd = &cobra.Command{
Use: "segment-verify",
Short: "segment-verify",
}
runCmd = &cobra.Command{
Use: "run",
Short: "commands to process segments",
}
rangeCmd = &cobra.Command{
Use: "range",
Short: "runs the command on a range of segments",
RunE: verifySegments,
}
bucketsCmd = &cobra.Command{
Use: "buckets",
Short: "runs the command on segments from specified buckets",
RunE: verifySegments,
}
readCSVCmd = &cobra.Command{
Use: "read-csv",
Short: "runs the command on segments from an input CSV file",
RunE: verifySegments,
}
summarizeCmd = &cobra.Command{
Use: "summarize-log",
Short: "summarizes verification log",
Args: cobra.ExactArgs(1),
RunE: summarizeVerificationLog,
}
nodeCheckCmd = &cobra.Command{
Use: "node-check",
Short: "checks segments for too many duplicate or unvetted nodes",
RunE: verifySegmentsNodeCheck,
}
satelliteCfg Satellite
rangeCfg RangeConfig
bucketsCfg BucketConfig
readCSVCfg ReadCSVConfig
nodeCheckCfg NodeCheckConfig
confDir string
identityDir string
)
func init() {
defaultConfDir := fpath.ApplicationDir("storj", "satellite")
defaultIdentityDir := fpath.ApplicationDir("storj", "identity", "satellite")
cfgstruct.SetupFlag(zap.L(), rootCmd, &confDir, "config-dir", defaultConfDir, "main directory for satellite configuration")
cfgstruct.SetupFlag(zap.L(), rootCmd, &identityDir, "identity-dir", defaultIdentityDir, "main directory for satellite identity credentials")
defaults := cfgstruct.DefaultsFlag(rootCmd)
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(summarizeCmd)
rootCmd.AddCommand(nodeCheckCmd)
runCmd.AddCommand(rangeCmd)
runCmd.AddCommand(bucketsCmd)
runCmd.AddCommand(readCSVCmd)
process.Bind(runCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &rangeCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &bucketsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(readCSVCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(readCSVCmd, &readCSVCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeCheckCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(nodeCheckCmd, &nodeCheckCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
}
// RangeConfig defines configuration for verifying segment existence.
type RangeConfig struct {
Service ServiceConfig
Verify VerifierConfig
Low string `help:"hex lowest segment id prefix to verify"`
High string `help:"hex highest segment id prefix to verify (excluded)"`
}
// BucketConfig defines configuration for verifying segment existence within a list of buckets.
type BucketConfig struct {
Service ServiceConfig
Verify VerifierConfig
BucketsCSV string `help:"csv file of project_id,bucket_name of buckets to verify" default:""`
}
// ReadCSVConfig defines configuration for verifying existence of specific segments.
type ReadCSVConfig struct {
Service ServiceConfig
Verify VerifierConfig
InputFile string `help:"csv file of segment_id,position for segments to verify"`
}
func verifySegments(cmd *cobra.Command, args []string) error {
ctx, _ := process.Ctx(cmd)
log := zap.L()
// open default satellite database
db, err := satellitedb.Open(ctx, log.Named("db"), satelliteCfg.Database, satellitedb.Options{
ApplicationName: "segment-verify",
SaveRollupBatchSize: satelliteCfg.Tally.SaveRollupBatchSize,
ReadRollupBatchSize: satelliteCfg.Tally.ReadRollupBatchSize,
})
if err != nil {
return errs.New("Error starting master database on satellite: %+v", err)
}
defer func() {
err = errs.Combine(err, db.Close())
}()
// open metabase
metabaseDB, err := metabase.Open(ctx, log.Named("metabase"), satelliteCfg.Metainfo.DatabaseURL,
satelliteCfg.Config.Metainfo.Metabase("segment-verify"))
if err != nil {
return Error.Wrap(err)
}
defer func() { _ = metabaseDB.Close() }()
// check whether satellite and metabase versions match
versionErr := db.CheckVersion(ctx)
if versionErr != nil {
log.Error("versions skewed", zap.Error(versionErr))
return Error.Wrap(versionErr)
}
versionErr = metabaseDB.CheckVersion(ctx)
if versionErr != nil {
log.Error("versions skewed", zap.Error(versionErr))
return Error.Wrap(versionErr)
}
// setup dialer
identity, err := satelliteCfg.Identity.Load()
if err != nil {
log.Error("Failed to load identity.", zap.Error(err))
return errs.New("Failed to load identity: %+v", err)
}
revocationDB, err := revocation.OpenDBFromCfg(ctx, satelliteCfg.Server.Config)
if err != nil {
return errs.New("Error creating revocation database: %+v", err)
}
defer func() {
err = errs.Combine(err, revocationDB.Close())
}()
tlsOptions, err := tlsopts.NewOptions(identity, satelliteCfg.Server.Config, revocationDB)
if err != nil {
return Error.Wrap(err)
}
dialer := rpc.NewDefaultDialer(tlsOptions)
// setup dependencies for verification
overlay, err := overlay.NewService(log.Named("overlay"), db.OverlayCache(), db.NodeEvents(), "", "", satelliteCfg.Overlay)
if err != nil {
return Error.Wrap(err)
}
ordersService, err := orders.NewService(log.Named("orders"), signing.SignerFromFullIdentity(identity), overlay, orders.NewNoopDB(), satelliteCfg.Orders)
if err != nil {
return Error.Wrap(err)
}
var (
verifyConfig VerifierConfig
serviceConfig ServiceConfig
commandFunc func(ctx context.Context, service *Service) error
)
switch cmd.Name() {
case "range":
verifyConfig = rangeCfg.Verify
serviceConfig = rangeCfg.Service
commandFunc = func(ctx context.Context, service *Service) error {
return verifySegmentsRange(ctx, service, rangeCfg)
}
case "buckets":
verifyConfig = bucketsCfg.Verify
serviceConfig = bucketsCfg.Service
commandFunc = func(ctx context.Context, service *Service) error {
return verifySegmentsBuckets(ctx, service, bucketsCfg)
}
case "read-csv":
verifyConfig = readCSVCfg.Verify
serviceConfig = readCSVCfg.Service
commandFunc = func(ctx context.Context, service *Service) error {
return verifySegmentsCSV(ctx, service, readCSVCfg)
}
default:
return errors.New("unknown command: " + cmd.Name())
}
// setup verifier
verifier := NewVerifier(log.Named("verifier"), dialer, ordersService, verifyConfig)
service, err := NewService(log.Named("service"), metabaseDB, verifier, overlay, serviceConfig)
if err != nil {
return Error.Wrap(err)
}
verifier.reportPiece = service.problemPieces.Write
defer func() { err = errs.Combine(err, service.Close()) }()
log.Debug("starting", zap.Any("config", service.config), zap.String("command", cmd.Name()))
return commandFunc(ctx, service)
}
func verifySegmentsRange(ctx context.Context, service *Service, rangeCfg RangeConfig) error {
// parse arguments
var low, high uuid.UUID
lowBytes, err := hex.DecodeString(rangeCfg.Low)
if err != nil {
return Error.Wrap(err)
}
highBytes, err := hex.DecodeString(rangeCfg.High)
if err != nil {
return Error.Wrap(err)
}
copy(low[:], lowBytes)
copy(high[:], highBytes)
if high.IsZero() {
return Error.New("high argument not specified")
}
return service.ProcessRange(ctx, low, high)
}
func verifySegmentsBuckets(ctx context.Context, service *Service, bucketCfg BucketConfig) error {
if bucketsCfg.BucketsCSV == "" {
return Error.New("bucket list file path not provided")
}
bucketList, err := service.ParseBucketFile(bucketsCfg.BucketsCSV)
if err != nil {
return Error.Wrap(err)
}
return service.ProcessBuckets(ctx, bucketList.Buckets)
}
func verifySegmentsCSV(ctx context.Context, service *Service, readCSVCfg ReadCSVConfig) (err error) {
if readCSVCfg.InputFile == "" {
return Error.New("input CSV file not provided")
}
segmentSource, err := OpenSegmentCSVFile(readCSVCfg.InputFile)
if err != nil {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, segmentSource.Close()) }()
return service.ProcessSegmentsFromCSV(ctx, segmentSource)
}
func main() {
process.Exec(rootCmd)
}
// ParseBucketFile parses a csv file containing project_id and bucket names.
func (service *Service) ParseBucketFile(path string) (_ BucketList, err error) {
csvFile, err := os.Open(path)
if err != nil {
return BucketList{}, err
}
defer func() {
err = errs.Combine(err, csvFile.Close())
}()
csvReader := csv.NewReader(csvFile)
allEntries, err := csvReader.ReadAll()
if err != nil {
return BucketList{}, err
}
bucketList := BucketList{}
for _, entry := range allEntries {
if len(entry) < 2 {
return BucketList{}, Error.New("unable to parse buckets file: %w", err)
}
projectId, err := projectIdFromCompactString(strings.TrimSpace(entry[0]))
if err != nil {
return BucketList{}, Error.New("unable to parse buckets file: %w", err)
}
bucketList.Add(projectId, strings.TrimSpace(entry[1]))
}
return bucketList, nil
}
func projectIdFromCompactString(s string) (uuid.UUID, error) {
decoded, err := hex.DecodeString(s)
if err != nil {
return uuid.UUID{}, Error.New("invalid string")
}
return uuid.FromBytes(decoded)
}