cmd/tools/segment-verify: adjust to SN Exists endpoint
Change-Id: I409aeae29aa87996f2a6047f976d215a69e9d7f5
profclems committed Dec 21, 2022
1 parent 9535444 commit cda1d67
Showing 6 changed files with 258 additions and 45 deletions.
2 changes: 2 additions & 0 deletions cmd/tools/segment-verify/README.md
@@ -22,6 +22,8 @@ There are few parameters for controlling the verification itself:
--verify.per-piece-timeout duration duration to wait per piece download (default 800ms)
# Just the regular dialing timeout.
--verify.dial-timeout duration how long to wait for a successful dial (default 2s)
# This specifies the minimum node version that implements the Exists endpoint.
--verify.version-with-exists string minimum storage node version with implemented Exists method (default "v1.69.2")
```

## Running the tool
38 changes: 36 additions & 2 deletions cmd/tools/segment-verify/process.go
@@ -83,15 +83,15 @@ func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) err
for _, batch := range batches {
batch := batch

nodeURL, err := service.convertAliasToNodeURL(ctx, batch.Alias)
info, err := service.GetNodeInfo(ctx, batch.Alias)
if err != nil {
return Error.Wrap(err)
}

ignoreThrottle := service.priorityNodes.Contains(batch.Alias)

limiter.Go(ctx, func() {
verifiedCount, err := service.verifier.Verify(ctx, batch.Alias, nodeURL, batch.Items, ignoreThrottle)
verifiedCount, err := service.verifier.Verify(ctx, batch.Alias, info.NodeURL, info.Version, batch.Items, ignoreThrottle)
if err != nil {
if ErrNodeOffline.Has(err) {
mu.Lock()
@@ -143,6 +143,9 @@ func (service *Service) convertAliasToNodeURL(ctx context.Context, alias metabas
return storj.NodeURL{}, Error.Wrap(err)
}

// TODO: single responsibility?
service.nodesVersionMap[alias] = info.Version.Version

nodeURL = storj.NodeURL{
ID: info.Id,
Address: info.Address.Address,
@@ -152,3 +155,34 @@
}
return nodeURL, nil
}

// NodeInfo contains node information.
type NodeInfo struct {
Version string
NodeURL storj.NodeURL
}

// GetNodeInfo retrieves node information, caching node versions to avoid repeated overlay lookups.
func (service *Service) GetNodeInfo(ctx context.Context, alias metabase.NodeAlias) (NodeInfo, error) {
nodeURL, err := service.convertAliasToNodeURL(ctx, alias)
if err != nil {
return NodeInfo{}, Error.Wrap(err)
}

version, ok := service.nodesVersionMap[alias]

if !ok {
info, err := service.overlay.Get(ctx, nodeURL.ID)
if err != nil {
return NodeInfo{}, Error.Wrap(err)
}

service.nodesVersionMap[alias] = info.Version.Version
version = info.Version.Version
}

return NodeInfo{
NodeURL: nodeURL,
Version: version,
}, nil
}
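
For illustration, the lookup above is a small memoization pattern: consult `nodesVersionMap` first and fall back to the overlay only on a miss, so each alias costs at most one overlay query. A standalone sketch (hypothetical `versionCache` type and `fetch` callback, not the commit's API):

```go
package main

import "fmt"

// versionCache memoizes node versions by alias, mirroring how
// Service.GetNodeInfo consults nodesVersionMap before calling overlay.Get.
type versionCache struct {
	byAlias map[int]string                  // alias -> version
	fetch   func(alias int) (string, error) // fallback lookup, e.g. the overlay
}

func (c *versionCache) get(alias int) (string, error) {
	if v, ok := c.byAlias[alias]; ok {
		return v, nil
	}
	v, err := c.fetch(alias)
	if err != nil {
		return "", err
	}
	c.byAlias[alias] = v
	return v, nil
}

func main() {
	lookups := 0
	cache := &versionCache{
		byAlias: map[int]string{},
		fetch: func(alias int) (string, error) {
			lookups++
			return "v1.70.0", nil
		},
	}

	v1, _ := cache.get(1)
	v2, _ := cache.get(1) // served from the map, no second lookup
	fmt.Println(v1, v2, "overlay lookups:", lookups)
}
```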
24 changes: 13 additions & 11 deletions cmd/tools/segment-verify/service.go
@@ -37,7 +37,7 @@ type Metabase interface {

// Verifier verifies a batch of segments.
type Verifier interface {
Verify(ctx context.Context, nodeAlias metabase.NodeAlias, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error)
Verify(ctx context.Context, nodeAlias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error)
}

// Overlay is used to fetch information about nodes.
@@ -88,12 +88,13 @@ type Service struct {
verifier Verifier
overlay Overlay

aliasMap *metabase.NodeAliasMap
aliasToNodeURL map[metabase.NodeAlias]storj.NodeURL
priorityNodes NodeAliasSet
onlineNodes NodeAliasSet
offlineCount map[metabase.NodeAlias]int
bucketList BucketList
aliasMap *metabase.NodeAliasMap
aliasToNodeURL map[metabase.NodeAlias]storj.NodeURL
priorityNodes NodeAliasSet
onlineNodes NodeAliasSet
offlineCount map[metabase.NodeAlias]int
bucketList BucketList
nodesVersionMap map[metabase.NodeAlias]string

// this is a callback so that problematic pieces can be reported as they are found,
// rather than being kept in a list which might grow unreasonably large.
@@ -129,10 +130,11 @@ func NewService(log *zap.Logger, metabaseDB Metabase, verifier Verifier, overlay
verifier: verifier,
overlay: overlay,

aliasToNodeURL: map[metabase.NodeAlias]storj.NodeURL{},
priorityNodes: NodeAliasSet{},
onlineNodes: NodeAliasSet{},
offlineCount: map[metabase.NodeAlias]int{},
aliasToNodeURL: map[metabase.NodeAlias]storj.NodeURL{},
priorityNodes: NodeAliasSet{},
onlineNodes: NodeAliasSet{},
offlineCount: map[metabase.NodeAlias]int{},
nodesVersionMap: map[metabase.NodeAlias]string{},

reportPiece: problemPieces.Write,
}, nil
2 changes: 1 addition & 1 deletion cmd/tools/segment-verify/service_test.go
@@ -471,7 +471,7 @@ type verifierMock struct {
processed map[storj.NodeID][]*segmentverify.Segment
}

func (v *verifierMock) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, segments []*segmentverify.Segment, _ bool) (int, error) {
func (v *verifierMock) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*segmentverify.Segment, _ bool) (int, error) {
v.mu.Lock()
if v.processed == nil {
v.processed = map[storj.NodeID][]*segmentverify.Segment{}
130 changes: 124 additions & 6 deletions cmd/tools/segment-verify/verify.go
@@ -8,10 +8,12 @@ import (
"io"
"time"

"github.com/blang/semver"
"github.com/zeebo/errs"
"go.uber.org/zap"

"storj.io/common/errs2"
"storj.io/common/pb"
"storj.io/common/rpc"
"storj.io/common/rpc/rpcpool"
"storj.io/common/rpc/rpcstatus"
@@ -26,13 +28,16 @@ import (
// ErrNodeOffline is returned when it was not possible to contact a node or the node was not responding.
var ErrNodeOffline = errs.Class("node offline")

var errWrongNodeVersion = errs.Class("wrong node version")

// VerifierConfig contains configurations for operation.
type VerifierConfig struct {
DialTimeout time.Duration `help:"how long to wait for a successful dial" default:"2s"`
PerPieceTimeout time.Duration `help:"duration to wait per piece download" default:"800ms"`
OrderRetryThrottle time.Duration `help:"how much to wait before retrying order creation" default:"50ms"`

RequestThrottle time.Duration `help:"minimum interval for sending out each request" default:"150ms"`
RequestThrottle time.Duration `help:"minimum interval for sending out each request" default:"150ms"`
VersionWithExists string `help:"minimum storage node version with implemented Exists method" default:"v1.69.2"`
}

// NodeVerifier implements segment verification by dialing nodes.
Expand All @@ -45,6 +50,8 @@ type NodeVerifier struct {
orders *orders.Service

reportPiece pieceReporterFunc

versionWithExists semver.Version
}

var _ Verifier = (*NodeVerifier)(nil)
@@ -62,16 +69,34 @@ func NewVerifier(log *zap.Logger, dialer rpc.Dialer, orders *orders.Service, con
IdleExpiration: 10 * time.Minute,
})

version, err := semver.ParseTolerant(config.VersionWithExists)
if err != nil {
log.Warn("invalid VersionWithExists", zap.String("VersionWithExists", config.VersionWithExists), zap.Error(err))
}

return &NodeVerifier{
log: log,
config: config,
dialer: configuredDialer,
orders: orders,
log: log,
config: config,
dialer: configuredDialer,
orders: orders,
versionWithExists: version,
}
}
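
As a standalone illustration (not part of the commit) of how the version gate behaves: `semver.ParseTolerant` accepts the leading `v` used in node version strings, and `GE` is the comparison that `VerifyWithExists` applies below.

```go
package main

import (
	"fmt"

	"github.com/blang/semver"
)

func main() {
	// ParseTolerant accepts the leading "v" used by storage node versions,
	// so the default "v1.69.2" parses without preprocessing.
	base, err := semver.ParseTolerant("v1.69.2")
	if err != nil {
		panic(err)
	}

	for _, raw := range []string{"v1.68.1", "v1.69.2", "v1.70.3"} {
		node, err := semver.ParseTolerant(raw)
		if err != nil {
			panic(err)
		}
		// GE mirrors the gate in VerifyWithExists: only nodes at or above
		// the configured base version are asked for the Exists endpoint.
		fmt.Printf("%s may use Exists: %v\n", raw, node.GE(base))
	}
}
```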

// Verify a collection of segments on the target node, using the Exists endpoint when supported and falling back to downloading a byte from each piece.
func (service *NodeVerifier) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error) {
func (service *NodeVerifier) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment, ignoreThrottle bool) (verifiedCount int, err error) {
verifiedCount, err = service.VerifyWithExists(ctx, alias, target, targetVersion, segments)
// if the Exists method is unimplemented or the node version is too old, fall back to download verification
if !errs2.IsRPC(err, rpcstatus.Unimplemented) && !errWrongNodeVersion.Has(err) {
return verifiedCount, err
}
if err != nil {
service.log.Debug("fallback to download method", zap.Error(err))
err = nil
}

service.log.Debug("verify segments by downloading pieces")

var client *piecestore.Client
defer func() {
if client != nil {
@@ -212,6 +237,99 @@ func findPieceNum(segment *Segment, alias metabase.NodeAlias) uint16 {
panic("piece number not found")
}

// VerifyWithExists verifies that the segments exist on the specified node by calling the piecestore Exists
// endpoint if the node version supports it.
func (service *NodeVerifier) VerifyWithExists(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, targetVersion string, segments []*Segment) (verifiedCount int, err error) {
if service.versionWithExists.EQ(semver.Version{}) || targetVersion == "" {
return 0, errWrongNodeVersion.New("missing node version or no base version defined")
}

nodeVersion, err := semver.ParseTolerant(targetVersion)
if err != nil {
return 0, errWrongNodeVersion.Wrap(err)
}

if !nodeVersion.GE(service.versionWithExists) {
return 0, errWrongNodeVersion.New("too old version")
}

service.log.Debug("verify segments using Exists method", zap.Stringer("node-id", target.ID))

var conn *rpc.Conn
var client pb.DRPCPiecestoreClient
defer func() {
if conn != nil {
_ = conn.Close()
}
}()

const maxDials = 2
dialCount := 0

for client == nil {
dialCount++
if dialCount > maxDials {
return 0, ErrNodeOffline.New("too many redials")
}

conn, err = service.dialer.DialNodeURL(rpcpool.WithForceDial(ctx), target)
if err != nil {
service.log.Info("failed to dial node",
zap.Stringer("node-id", target.ID),
zap.Error(err))
} else {
client = pb.NewDRPCPiecestoreClient(conn)
}
}

err = service.verifySegmentsWithExists(ctx, client, alias, target, segments)
if err != nil {
// we could not do the verification, for a reason that implies we won't be able
// to do any more
return 0, Error.Wrap(err)
}

return len(segments), nil
}

// verifySegmentsWithExists calls the Exists endpoint on the specified target node for each segment.
func (service *NodeVerifier) verifySegmentsWithExists(ctx context.Context, client pb.DRPCPiecestoreClient, alias metabase.NodeAlias, target storj.NodeURL, segments []*Segment) (err error) {
pieceIds := make([]storj.PieceID, 0, len(segments))

for _, segment := range segments {
pieceNum := findPieceNum(segment, alias)

pieceId := segment.RootPieceID.Derive(target.ID, int32(pieceNum))
pieceIds = append(pieceIds, pieceId)
}

response, err := client.Exists(ctx, &pb.ExistsRequest{
PieceIds: pieceIds,
})
if err != nil {
return Error.Wrap(err)
}

for index := range pieceIds {
if missing(index, response.Missing) {
segments[index].Status.MarkNotFound()
} else {
segments[index].Status.MarkFound()
}
}

return nil
}
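
The piece IDs sent to `Exists` are derived deterministically from the segment's root piece ID, the target node's ID, and the piece number, so no per-piece state needs to be stored. A minimal sketch using zero-value IDs (for illustration only; the signature of `Derive` is taken from its use above):

```go
package main

import (
	"fmt"

	"storj.io/common/storj"
)

func main() {
	// Zero-value IDs for illustration only; the verifier uses the segment's
	// RootPieceID and the target node's real ID.
	var root storj.PieceID
	var node storj.NodeID

	// Derive is deterministic: the same (root, node, pieceNum) triple always
	// yields the same piece ID, which is what lets the satellite ask a node
	// about specific pieces without storing per-piece identifiers.
	fmt.Println(root.Derive(node, 0))
	fmt.Println(root.Derive(node, 1))
}
```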

func missing(index int, missing []uint32) bool {
for _, m := range missing {
if uint32(index) == m {
return true
}
}
return false
}
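
To make the index mapping concrete: `response.Missing` contains indices into the request's `PieceIds` slice, so position `i` maps straight back to `segments[i]`. A runnable sketch with a copy of the helper and hypothetical indices:

```go
package main

import "fmt"

// missing is a copy of the helper above: it reports whether index
// appears in the Exists response's Missing slice.
func missing(index int, missingIndices []uint32) bool {
	for _, m := range missingIndices {
		if uint32(index) == m {
			return true
		}
	}
	return false
}

func main() {
	// Hypothetical response: pieces 1 and 3 of a 5-piece request were not found.
	missingIndices := []uint32{1, 3}
	for i := 0; i < 5; i++ {
		fmt.Printf("piece %d found: %v\n", i, !missing(i, missingIndices))
	}
}
```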

// rateLimiter limits the rate of some type of event. It acts like a token
// bucket, allowing for bursting, as long as the _average_ interval between
// events over the lifetime of the rateLimiter is less than or equal to the
