cmd/tools/segment-verify: fix piece id derivation
Change-Id: Ib27fd8630e1e5a90060dff2a09c51f488960177f
egonelbre committed Oct 6, 2022
1 parent c8506cd commit ea4b302
Showing 6 changed files with 125 additions and 17 deletions.
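
What changed: the verifier used to request piece number 0 with a hard-coded size of 1 for every node, but a storage node only holds the piece whose number is recorded for it in the segment's alias pieces, and the piece ID the node is asked for is derived from that number. With the wrong number, the derived piece ID matches nothing the node stores, so healthy pieces can be reported as missing. This commit threads the node alias through Verify, looks up the node's actual piece number, and sizes the audit order limit by the segment's redundancy share size instead of hard-coding 1. A minimal, standalone sketch of the derivation (not part of the commit; it assumes the Derive helper and the testrand constructors from storj.io/common):

package main

import (
	"fmt"

	"storj.io/common/testrand"
)

func main() {
	root := testrand.PieceID() // root piece ID from the segment metadata
	node := testrand.NodeID()  // storage node being checked

	// A node stores the piece whose ID is derived from the root piece ID,
	// the node ID and the piece number assigned to that node. Asking a node
	// that holds, say, piece 3 for piece number 0 derives an ID it has
	// never stored.
	fmt.Println(root.Derive(node, 0) == root.Derive(node, 3)) // false
}
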
2 changes: 1 addition & 1 deletion cmd/tools/segment-verify/process.go
@@ -84,7 +84,7 @@ func (service *Service) VerifyBatches(ctx context.Context, batches []*Batch) err
ignoreThrottle := service.priorityNodes.Contains(batch.Alias)

limiter.Go(ctx, func() {
err := service.verifier.Verify(ctx, nodeURL, batch.Items, ignoreThrottle)
err := service.verifier.Verify(ctx, batch.Alias, nodeURL, batch.Items, ignoreThrottle)
if err != nil {
if ErrNodeOffline.Has(err) {
mu.Lock()
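
For context, VerifyBatches works on per-node batches, so the node alias passed to Verify above is already at hand. The shape implied by this call site is roughly the following; these declarations are an assumption for illustration, not copied from the package:

// Sketch of the types implied by the call site above (assumed; the real
// declarations live elsewhere in cmd/tools/segment-verify).
type Batch struct {
	Alias metabase.NodeAlias // storage node, addressed by its metabase alias
	Items []*Segment         // segments expected to have a piece on that node
}

type Segment struct {
	metabase.VerifySegment // stream ID, position, root piece ID, redundancy, alias pieces
	Status Status          // Found / NotFound / Retry counters filled in by the verifier
}
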
76 changes: 76 additions & 0 deletions cmd/tools/segment-verify/process_test.go
@@ -0,0 +1,76 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package main_test

import (
"io/ioutil"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"storj.io/common/memory"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
segmentverify "storj.io/storj/cmd/tools/segment-verify"
"storj.io/storj/private/testplanet"
)

func TestProcess(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]

verifier := segmentverify.NewVerifier(
planet.Log().Named("verifier"),
satellite.Dialer,
satellite.Orders.Service,
segmentverify.VerifierConfig{
PerPieceTimeout: time.Second,
OrderRetryThrottle: 500 * time.Millisecond,
RequestThrottle: 500 * time.Millisecond,
})

config := segmentverify.ServiceConfig{
NotFoundPath: ctx.File("not-found.csv"),
RetryPath: ctx.File("retry.csv"),
Check: 2,
BatchSize: 4,
Concurrency: 2,
}

service, err := segmentverify.NewService(
planet.Log().Named("process"),
satellite.Metabase.DB,
verifier,
satellite.Overlay.Service,
config)
require.NoError(t, err)

// upload some data
data := testrand.Bytes(8 * memory.KiB)
for _, up := range planet.Uplinks {
for i := 0; i < 10; i++ {
err := up.Upload(ctx, satellite, "bucket1", strconv.Itoa(i), data)
require.NoError(t, err)
}
}

err = service.ProcessRange(ctx, uuid.UUID{}, maxUUID)
require.NoError(t, err)

require.NoError(t, service.Close())

retryCSV, err := ioutil.ReadFile(config.RetryPath)
require.NoError(t, err)
require.Equal(t, "stream id,position,found,not found,retry\n", string(retryCSV))

notFoundCSV, err := ioutil.ReadFile(config.NotFoundPath)
require.NoError(t, err)
require.Equal(t, "stream id,position,found,not found,retry\n", string(notFoundCSV))
})
}
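
One note on the new test: maxUUID is not part of this diff; it is a package-level sentinel marking the top of the UUID range so that ProcessRange(ctx, uuid.UUID{}, maxUUID) covers every stream ID. A minimal sketch of such a sentinel, assuming only that uuid.UUID from storj.io/common/uuid is a 16-byte array:

// Sketch only; the real maxUUID is defined elsewhere in this package.
// An all-0xFF value is the largest possible UUID, so the range from the
// zero UUID up to it spans the whole stream ID space.
var maxUUID = uuid.UUID{
	0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
	0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
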
2 changes: 1 addition & 1 deletion cmd/tools/segment-verify/service.go
@@ -35,7 +35,7 @@ type Metabase interface {

// Verifier verifies a batch of segments.
type Verifier interface {
Verify(ctx context.Context, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) error
Verify(ctx context.Context, nodeAlias metabase.NodeAlias, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) error
}

// Overlay is used to fetch information about nodes.
2 changes: 1 addition & 1 deletion cmd/tools/segment-verify/service_test.go
@@ -337,7 +337,7 @@ type verifierMock struct {
processed map[storj.NodeID][]*segmentverify.Segment
}

func (v *verifierMock) Verify(ctx context.Context, target storj.NodeURL, segments []*segmentverify.Segment, _ bool) error {
func (v *verifierMock) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, segments []*segmentverify.Segment, _ bool) error {
v.mu.Lock()
if v.processed == nil {
v.processed = map[storj.NodeID][]*segmentverify.Segment{}
22 changes: 17 additions & 5 deletions cmd/tools/segment-verify/verify.go
@@ -16,6 +16,7 @@ import (
"storj.io/common/rpc/rpcstatus"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/orders"
"storj.io/uplink/private/piecestore"
)
@@ -60,7 +61,7 @@ func NewVerifier(log *zap.Logger, dialer rpc.Dialer, orders *orders.Service, con
}

// Verify a collection of segments by attempting to download a byte from each segment from the target node.
func (service *NodeVerifier) Verify(ctx context.Context, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) error {
func (service *NodeVerifier) Verify(ctx context.Context, alias metabase.NodeAlias, target storj.NodeURL, segments []*Segment, ignoreThrottle bool) error {
client, err := piecestore.Dial(ctx, service.dialer, target, piecestore.DefaultConfig)
if err != nil {
return ErrNodeOffline.Wrap(err)
@@ -69,7 +70,7 @@ func (service *NodeVerifier) Verify(ctx context.Context, target storj.NodeURL, s

for i, segment := range segments {
downloadStart := time.Now()
err := service.verifySegment(ctx, client, target, segment)
err := service.verifySegment(ctx, client, alias, target, segment)
if err != nil {
return Error.Wrap(err)
}
@@ -84,8 +85,10 @@ func (service *NodeVerifier) Verify(ctx context.Context, target storj.NodeURL, s
}

// verifySegment tries to verify the segment by downloading a single byte from the specified segment.
func (service *NodeVerifier) verifySegment(ctx context.Context, client *piecestore.Client, target storj.NodeURL, segment *Segment) error {
limit, piecePrivateKey, _, err := service.orders.CreateAuditOrderLimit(ctx, target.ID, 0, segment.RootPieceID, 1)
func (service *NodeVerifier) verifySegment(ctx context.Context, client *piecestore.Client, alias metabase.NodeAlias, target storj.NodeURL, segment *Segment) error {
pieceNum := findPieceNum(segment, alias)

limit, piecePrivateKey, _, err := service.orders.CreateAuditOrderLimit(ctx, target.ID, pieceNum, segment.RootPieceID, segment.Redundancy.ShareSize)
if err != nil {
service.log.Error("failed to create order limit",
zap.Stringer("retrying in", service.config.OrderRetryThrottle),
@@ -97,7 +100,7 @@ func (service *NodeVerifier) verifySegment(ctx context.Context, client *piecesto
return Error.Wrap(ctx.Err())
}

limit, piecePrivateKey, _, err = service.orders.CreateAuditOrderLimit(ctx, target.ID, 0, segment.RootPieceID, 1)
limit, piecePrivateKey, _, err = service.orders.CreateAuditOrderLimit(ctx, target.ID, pieceNum, segment.RootPieceID, segment.Redundancy.ShareSize)
if err != nil {
return Error.Wrap(err)
}
@@ -149,3 +152,12 @@ func (service *NodeVerifier) verifySegment(ctx context.Context, client *piecesto

return nil
}

func findPieceNum(segment *Segment, alias metabase.NodeAlias) uint16 {
for _, p := range segment.AliasPieces {
if p.Alias == alias {
return p.Number
}
}
panic("piece number not found")
}
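
findPieceNum panics when the segment has no piece recorded for the given alias. Failing loudly is reasonable here: the tool builds batches from the segments' alias pieces, so a miss would point at a bug in the batching rather than at a bad node. A non-panicking variant would look roughly like this; the function name is hypothetical and it is not part of the commit:

// findPieceNumOK is a hypothetical variant of findPieceNum that reports
// whether the node (identified by its alias) holds a piece of the segment
// at all, instead of panicking when it does not.
func findPieceNumOK(segment *Segment, alias metabase.NodeAlias) (uint16, bool) {
	for _, p := range segment.AliasPieces {
		if p.Alias == alias {
			return p.Number, true
		}
	}
	return 0, false
}
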
38 changes: 29 additions & 9 deletions cmd/tools/segment-verify/verify_test.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"storj.io/common/memory"
"storj.io/common/testcontext"
@@ -21,10 +22,15 @@ import (

func TestVerifier(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 1, UplinkCount: 1,
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: testplanet.ReconfigureRS(4, 4, 4, 4),
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
satellite := planet.Satellites[0]

snoCount := int32(len(planet.StorageNodes))

config := segmentverify.VerifierConfig{
PerPieceTimeout: time.Second,
OrderRetryThrottle: 500 * time.Millisecond,
@@ -56,18 +62,31 @@ func TestVerifier(t *testing.T) {
for _, raw := range result.Segments {
validSegments = append(validSegments, &segmentverify.Segment{
VerifySegment: raw,
Status: segmentverify.Status{Retry: 1},
Status: segmentverify.Status{Retry: snoCount},
})
}

// expect all segments are found on the node
err = service.Verify(ctx, planet.StorageNodes[0].NodeURL(), validSegments, true)
aliasMap, err := satellite.Metabase.DB.LatestNodesAliasMap(ctx)
require.NoError(t, err)

var g errgroup.Group
for _, node := range planet.StorageNodes {
node := node
alias, ok := aliasMap.Alias(node.ID())
require.True(t, ok)
g.Go(func() error {
return service.Verify(ctx, alias, node.NodeURL(), validSegments, true)
})
}
require.NoError(t, g.Wait())
for _, seg := range validSegments {
require.Equal(t, segmentverify.Status{Found: 1, NotFound: 0, Retry: 0}, seg.Status)
require.Equal(t, segmentverify.Status{Found: snoCount, NotFound: 0, Retry: 0}, seg.Status)
}

// segment not found
alias0, ok := aliasMap.Alias(planet.StorageNodes[0].ID())
require.True(t, ok)

validSegment0 := &segmentverify.Segment{
VerifySegment: result.Segments[0],
Status: segmentverify.Status{Retry: 1},
@@ -77,7 +96,8 @@ func TestVerifier(t *testing.T) {
StreamID: testrand.UUID(),
Position: metabase.SegmentPosition{},
RootPieceID: testrand.PieceID(),
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: 1}},
Redundancy: result.Segments[0].Redundancy,
AliasPieces: metabase.AliasPieces{{Number: 0, Alias: alias0}},
},
Status: segmentverify.Status{Retry: 1},
}
@@ -86,7 +106,7 @@ func TestVerifier(t *testing.T) {
Status: segmentverify.Status{Retry: 1},
}

err = service.Verify(ctx, planet.StorageNodes[0].NodeURL(),
err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(),
[]*segmentverify.Segment{validSegment0, missingSegment, validSegment1}, true)
require.NoError(t, err)
require.Equal(t, segmentverify.Status{Found: 1}, validSegment0.Status)
@@ -96,7 +116,7 @@ func TestVerifier(t *testing.T) {
// Test throttling
verifyStart := time.Now()
const throttleN = 5
err = service.Verify(ctx, planet.StorageNodes[0].NodeURL(), validSegments[:throttleN], false)
err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), validSegments[:throttleN], false)
require.NoError(t, err)
verifyDuration := time.Since(verifyStart)
require.Greater(t, verifyDuration, config.RequestThrottle*(throttleN-1))
@@ -106,7 +126,7 @@ func TestVerifier(t *testing.T) {
// node offline
err = planet.StopNodeAndUpdate(ctx, planet.StorageNodes[0])
require.NoError(t, err)
err = service.Verify(ctx, planet.StorageNodes[0].NodeURL(), validSegments, true)
err = service.Verify(ctx, alias0, planet.StorageNodes[0].NodeURL(), validSegments, true)
require.Error(t, err)
require.True(t, segmentverify.ErrNodeOffline.Has(err))
})
