Skip to content

Commit

Permalink
satellite/cmd: segment-verify verifies segments in given bucket list
Browse files Browse the repository at this point in the history
Provides the `segment-verify run buckets` command for verifying segments within a list of buckets.

Bucket list is a csv file with `project_id,bucket_name` to be checked.

https://github.com/storj/storj-private/issues/101

Change-Id: I3d25c27b56fcab4a6a1aebb6f87514d6c97de3ff
  • Loading branch information
Fadila82 authored and Storj Robot committed Dec 13, 2022
1 parent 021c98c commit 995f78d
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 10 deletions.
8 changes: 6 additions & 2 deletions cmd/tools/segment-verify/README.md
Expand Up @@ -25,7 +25,11 @@ There are few parameters for controlling the verification itself:
```

## Running the tool

- by specifying range boundaries:
```
segment-verify run range --low 00 --high ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff --config-dir ./satellite-config-dir
```
- by specifying buckets to be checked:
```
segment-verify run buckets --buckets-csv bucket.csv
```
89 changes: 87 additions & 2 deletions cmd/tools/segment-verify/main.go
Expand Up @@ -4,7 +4,12 @@
package main

import (
"context"
"encoding/csv"
"encoding/hex"
"errors"
"os"
"strings"

"github.com/spf13/cobra"
"github.com/zeebo/errs"
Expand Down Expand Up @@ -46,7 +51,13 @@ var (
rangeCmd = &cobra.Command{
Use: "range",
Short: "runs the command on a range of segments",
RunE: verifySegmentsRange,
RunE: verifySegments,
}

bucketsCmd = &cobra.Command{
Use: "buckets",
Short: "runs the command on segments from specified buckets",
RunE: verifySegments,
}

summarizeCmd = &cobra.Command{
Expand All @@ -58,6 +69,7 @@ var (

satelliteCfg Satellite
rangeCfg RangeConfig
bucketsCfg BucketConfig

confDir string
identityDir string
Expand All @@ -73,10 +85,13 @@ func init() {
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(summarizeCmd)
runCmd.AddCommand(rangeCmd)
runCmd.AddCommand(bucketsCmd)

process.Bind(runCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(rangeCmd, &rangeCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &satelliteCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
process.Bind(bucketsCmd, &bucketsCfg, defaults, cfgstruct.ConfDir(confDir), cfgstruct.IdentityDir(identityDir))
}

// RangeConfig defines configuration for verifying segment existence.
Expand All @@ -88,7 +103,16 @@ type RangeConfig struct {
High string `help:"hex highest segment id prefix to verify (excluded)"`
}

func verifySegmentsRange(cmd *cobra.Command, args []string) error {
// BucketConfig defines configuration for verifying segment existence within a list of buckets.
type BucketConfig struct {
	// Service configures batching, concurrency and output of the verification service.
	Service ServiceConfig
	// Verify configures how individual segments are checked against storage nodes.
	Verify VerifierConfig

	// BucketsCSV is the path to a csv file whose rows are `project_id,bucket_name`
	// pairs naming the buckets whose segments should be verified.
	BucketsCSV string `help:"csv file of project_id,bucket_name of buckets to verify" default:""`
}

func verifySegments(cmd *cobra.Command, args []string) error {

ctx, _ := process.Ctx(cmd)
log := zap.L()

Expand Down Expand Up @@ -166,7 +190,16 @@ func verifySegmentsRange(cmd *cobra.Command, args []string) error {
return Error.Wrap(err)
}
defer func() { err = errs.Combine(err, service.Close()) }()
if cmd.Name() == "range" {
return verifySegmentsRange(ctx, service, rangeCfg)
}
if cmd.Name() == "buckets" {
return verifySegmentsBuckets(ctx, service, bucketsCfg)
}
return errors.New("unknown commnand: " + cmd.Name())
}

func verifySegmentsRange(ctx context.Context, service *Service, rangeCfg RangeConfig) error {
// parse arguments
var low, high uuid.UUID

Expand All @@ -189,6 +222,58 @@ func verifySegmentsRange(cmd *cobra.Command, args []string) error {
return service.ProcessRange(ctx, low, high)
}

// verifySegmentsBuckets verifies the segments of every bucket listed in the
// csv file referenced by bucketCfg.BucketsCSV. It returns an error when the
// file path is missing, the file cannot be parsed, or processing fails.
func verifySegmentsBuckets(ctx context.Context, service *Service, bucketCfg BucketConfig) error {
	// Bug fix: use the bucketCfg parameter instead of the package-level
	// bucketsCfg variable, so the function honors whatever config it is given.
	if bucketCfg.BucketsCSV == "" {
		return Error.New("bucket list file path not provided")
	}

	bucketList, err := service.ParseBucketFile(bucketCfg.BucketsCSV)
	if err != nil {
		return Error.Wrap(err)
	}
	return service.ProcessBuckets(ctx, bucketList.Buckets)
}

// main runs the segment-verify command tree (run range / run buckets /
// summarize) through the satellite process framework.
func main() {
	process.Exec(rootCmd)
}

// ParseBucketFile parses a csv file containing project_id and bucket names.
// Each row must contain at least two fields: a hex-encoded project ID followed
// by a bucket name; surrounding whitespace is trimmed from both. Extra fields
// on a row are ignored.
func (service *Service) ParseBucketFile(path string) (_ BucketList, err error) {
	csvFile, err := os.Open(path)
	if err != nil {
		return BucketList{}, err
	}
	defer func() {
		// Combine so a close error does not mask an earlier parse error.
		err = errs.Combine(err, csvFile.Close())
	}()

	csvReader := csv.NewReader(csvFile)
	// Rows may legitimately vary in width; we enforce the 2-field minimum
	// ourselves below instead of letting the reader reject the whole file.
	csvReader.FieldsPerRecord = -1

	allEntries, err := csvReader.ReadAll()
	if err != nil {
		return BucketList{}, err
	}

	bucketList := BucketList{}
	for _, entry := range allEntries {
		if len(entry) < 2 {
			// Bug fix: the previous code wrapped a nil err here with %w,
			// producing a useless message. Report the malformed row instead.
			return BucketList{}, Error.New("unable to parse buckets file: expected project_id,bucket_name but row has %d field(s)", len(entry))
		}

		projectID, err := projectIdFromCompactString(strings.TrimSpace(entry[0]))
		if err != nil {
			return BucketList{}, Error.New("unable to parse buckets file: %w", err)
		}
		bucketList.Add(projectID, strings.TrimSpace(entry[1]))
	}
	return bucketList, nil
}

// projectIdFromCompactString decodes a compact (dash-less, hex-encoded)
// project ID string into a uuid.UUID.
func projectIdFromCompactString(s string) (uuid.UUID, error) {
	decoded, err := hex.DecodeString(s)
	if err != nil {
		// Include the offending value so a bad row in a large csv is easy
		// to locate; the previous message was just "invalid string".
		return uuid.UUID{}, Error.New("invalid project id %q: not hex encoded", s)
	}

	// uuid.FromBytes rejects inputs whose decoded length is not 16 bytes.
	return uuid.FromBytes(decoded)
}
116 changes: 116 additions & 0 deletions cmd/tools/segment-verify/service.go
Expand Up @@ -31,6 +31,7 @@ type Metabase interface {
LatestNodesAliasMap(ctx context.Context) (*metabase.NodeAliasMap, error)
GetSegmentByPosition(ctx context.Context, opts metabase.GetSegmentByPosition) (segment metabase.Segment, err error)
ListVerifySegments(ctx context.Context, opts metabase.ListVerifySegments) (result metabase.ListVerifySegmentsResult, err error)
ListBucketsStreamIDs(ctx context.Context, opts metabase.ListBucketsStreamIDs) (result metabase.ListBucketsStreamIDsResult, err error)
}

// Verifier verifies a batch of segments.
Expand Down Expand Up @@ -83,6 +84,7 @@ type Service struct {
priorityNodes NodeAliasSet
onlineNodes NodeAliasSet
offlineCount map[metabase.NodeAlias]int
bucketList BucketList
}

// NewService returns a new service for verifying segments.
Expand Down Expand Up @@ -216,6 +218,19 @@ func (service *Service) parseNodeFile(path string) (NodeAliasSet, error) {
return set, nil
}

// BucketList contains a list of buckets to check segments from.
type BucketList struct {
	Buckets []metabase.BucketLocation
}

// Add appends the bucket identified by projectID and bucketName to the list.
func (list *BucketList) Add(projectID uuid.UUID, bucketName string) {
	location := metabase.BucketLocation{
		ProjectID:  projectID,
		BucketName: bucketName,
	}
	list.Buckets = append(list.Buckets, location)
}

// ProcessRange processes segments between low and high uuid.UUID with the specified batchSize.
func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down Expand Up @@ -300,6 +315,107 @@ func (service *Service) ProcessRange(ctx context.Context, low, high uuid.UUID) (
}
}

// ProcessBuckets processes segments in buckets with the specified batchSize.
//
// It pages through the stream IDs belonging to the given buckets and, for each
// page, lists and verifies the corresponding segments batch by batch via
// ProcessSegments. It returns the first error encountered.
func (service *Service) ProcessBuckets(ctx context.Context, buckets []metabase.BucketLocation) (err error) {
	defer mon.Task()(&ctx)(&err)

	aliasMap, err := service.metabase.LatestNodesAliasMap(ctx)
	if err != nil {
		return Error.Wrap(err)
	}
	service.aliasMap = aliasMap

	err = service.loadOnlineNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	err = service.loadPriorityNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	err = service.applyIgnoreNodes(ctx)
	if err != nil {
		return Error.Wrap(err)
	}

	var progress int64

	cursorBucket := metabase.BucketLocation{}
	cursorStreamID := uuid.UUID{}
	cursorPosition := metabase.SegmentPosition{} // Convert to struct that contains the status.
	// Reuse the batch buffers across iterations to avoid per-batch allocations.
	segmentsData := make([]Segment, service.config.BatchSize)
	segments := make([]*Segment, service.config.BatchSize)
	for {
		listStreamIDsResult, err := service.metabase.ListBucketsStreamIDs(ctx, metabase.ListBucketsStreamIDs{
			BucketList: metabase.ListVerifyBucketList{
				// Bug fix: use the buckets argument. The previous code read
				// service.bucketList, which is never populated, so the
				// caller-supplied bucket list was silently ignored.
				Buckets: buckets,
			},
			CursorBucket:   cursorBucket,
			CursorStreamID: cursorStreamID,
			Limit:          service.config.BatchSize,

			AsOfSystemInterval: service.config.AsOfSystemInterval,
		})
		if err != nil {
			return Error.Wrap(err)
		}
		for {
			// TODO loop for this
			// NOTE(review): cursorStreamID/cursorPosition are shared between
			// the outer stream-ID listing and this inner segment listing and
			// are not reset per page — confirm this pagination is intended.
			result, err := service.metabase.ListVerifySegments(ctx, metabase.ListVerifySegments{
				StreamIDs:      listStreamIDsResult.StreamIDs,
				CursorStreamID: cursorStreamID,
				CursorPosition: cursorPosition,
				Limit:          service.config.BatchSize,

				AsOfSystemInterval: service.config.AsOfSystemInterval,
			})
			if err != nil {
				return Error.Wrap(err)
			}

			// All done?
			if len(result.Segments) == 0 {
				break
			}

			segmentsData = segmentsData[:len(result.Segments)]
			segments = segments[:len(result.Segments)]

			// Advance the cursor past the last segment of this batch.
			last := &result.Segments[len(result.Segments)-1]
			cursorStreamID, cursorPosition = last.StreamID, last.Position

			for i := range segments {
				segmentsData[i].VerifySegment = result.Segments[i]
				segments[i] = &segmentsData[i]
			}

			service.log.Info("processing segments",
				zap.Int64("progress", progress),
				zap.Int("count", len(segments)),
				zap.Stringer("first", segments[0].StreamID),
				zap.Stringer("last", segments[len(segments)-1].StreamID),
			)
			progress += int64(len(segments))

			// Process the data.
			err = service.ProcessSegments(ctx, segments)
			if err != nil {
				return Error.Wrap(err)
			}
		}

		// No more stream IDs means every bucket has been exhausted.
		if len(listStreamIDsResult.StreamIDs) == 0 {
			return nil
		}

		cursorBucket = listStreamIDsResult.LastBucket
		// TODO remove processed project_ids and bucket_names?
	}
}

// ProcessSegments processes a collection of segments.
func (service *Service) ProcessSegments(ctx context.Context, segments []*Segment) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down

0 comments on commit 995f78d

Please sign in to comment.