Skip to content

Commit

Permalink
satellite/metabase: IterateLoopSegments accepts ranges
Browse files Browse the repository at this point in the history
Fixes: #5207

Change-Id: I7872696068320987825de2d381f57ea503736e89
  • Loading branch information
Fadila82 authored and Storj Robot committed Oct 13, 2022
1 parent fac638f commit 35f74b7
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 8 deletions.
47 changes: 39 additions & 8 deletions satellite/metabase/loop.go
Expand Up @@ -5,6 +5,7 @@ package metabase

import (
"context"
"math"
"time"

"github.com/zeebo/errs"
Expand Down Expand Up @@ -225,13 +226,23 @@ type IterateLoopSegments struct {
BatchSize int
AsOfSystemTime time.Time
AsOfSystemInterval time.Duration
StartStreamID uuid.UUID
EndStreamID uuid.UUID
}

// Verify checks that the request fields of IterateLoopSegments are valid.
//
// A negative BatchSize is rejected. When EndStreamID is set (non-zero), a
// range whose end sorts before its start is rejected, as is a range whose
// start and end are the same stream ID. It returns nil when every check
// passes.
func (opts *IterateLoopSegments) Verify() error {
	if opts.BatchSize < 0 {
		return ErrInvalidRequest.New("BatchSize is negative")
	}

	// A zero EndStreamID is treated as "not set": the range checks below
	// only apply when an explicit end was supplied.
	if opts.EndStreamID.IsZero() {
		return nil
	}

	switch {
	case opts.EndStreamID.Less(opts.StartStreamID):
		return ErrInvalidRequest.New("EndStreamID is smaller than StartStreamID")
	case opts.StartStreamID == opts.EndStreamID:
		return ErrInvalidRequest.New("StartStreamID and EndStreamID must be different")
	default:
		return nil
	}
}

Expand All @@ -251,7 +262,21 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
batchSize: opts.BatchSize,

curIndex: 0,
cursor: loopSegmentIteratorCursor{},
cursor: loopSegmentIteratorCursor{
StartStreamID: opts.StartStreamID,
EndStreamID: opts.EndStreamID,
},
}

if !opts.StartStreamID.IsZero() {
// uses MaxInt32 instead of MaxUint32 because position is an int8 in db.
it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32}
}
if it.cursor.EndStreamID.IsZero() {
it.cursor.EndStreamID, err = maxUUID()
if err != nil {
return err
}
}

loopIteratorBatchSizeLimit.Ensure(&it.batchSize)
Expand Down Expand Up @@ -288,8 +313,9 @@ type loopSegmentIterator struct {
}

// loopSegmentIteratorCursor holds the bounds that loopSegmentIterator passes
// to its segment query.
//
// NOTE(review): this span comes from a diff rendering, so StreamID/Position
// look like the pre-change names that StartStreamID/StartPosition replace —
// confirm against the actual file before relying on all five fields.
type loopSegmentIteratorCursor struct {
StreamID uuid.UUID
Position SegmentPosition
// StartStreamID and StartPosition are the exclusive lower bound of the
// query ((stream_id, position) > (start, position)); Next advances them to
// the last item it returned.
StartStreamID uuid.UUID
StartPosition SegmentPosition
// EndStreamID is the inclusive upper bound on stream_id (stream_id <= end).
EndStreamID uuid.UUID
}

// Next returns true if there was another item and copy it in item.
Expand Down Expand Up @@ -329,8 +355,8 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry)
}

it.curIndex++
it.cursor.StreamID = item.StreamID
it.cursor.Position = item.Position
it.cursor.StartStreamID = item.StreamID
it.cursor.StartPosition = item.Position

return true
}
Expand All @@ -351,11 +377,11 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows,
FROM segments
`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
WHERE
(stream_id, position) > ($1, $2)
(stream_id, position) > ($1, $2) AND stream_id <= $4
ORDER BY (stream_id, position) ASC
LIMIT $3
`, it.cursor.StreamID, it.cursor.Position,
it.batchSize,
`, it.cursor.StartStreamID, it.cursor.StartPosition.Encode(),
it.batchSize, it.cursor.EndStreamID,
)
}

Expand Down Expand Up @@ -383,3 +409,8 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn

return nil
}

// maxUUID returns the UUID parsed from the all-"f" string
// "ffffffff-ffff-ffff-ffff-ffffffffffff", along with any parse error.
// IterateLoopSegments uses it as the end of the range when the caller did
// not set EndStreamID.
func maxUUID() (uuid.UUID, error) {
	return uuid.FromString("ffffffff-ffff-ffff-ffff-ffffffffffff")
}
199 changes: 199 additions & 0 deletions satellite/metabase/loop_test.go
Expand Up @@ -4,6 +4,7 @@
package metabase_test

import (
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -335,6 +336,46 @@ func TestIterateLoopSegments(t *testing.T) {
metabasetest.Verify{}.Check(ctx, t, db)
})

t.Run("Wrongly defined ranges", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

startStreamID, err := uuid.New()
require.NoError(t, err)

endStreamID, err := uuid.New()
require.NoError(t, err)

if startStreamID.Less(endStreamID) {
startStreamID, endStreamID = endStreamID, startStreamID
}

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: startStreamID,
EndStreamID: endStreamID,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "EndStreamID is smaller than StartStreamID",
}.Check(ctx, t, db)

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: startStreamID,
EndStreamID: startStreamID,
},
ErrClass: &metabase.ErrInvalidRequest,
ErrText: "StartStreamID and EndStreamID must be different",
}.Check(ctx, t, db)
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: startStreamID,
},
Result: nil,
}.Check(ctx, t, db)

metabasetest.Verify{}.Check(ctx, t, db)
})

t.Run("no segments", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

Expand All @@ -360,6 +401,34 @@ func TestIterateLoopSegments(t *testing.T) {
Result: nil,
}.Check(ctx, t, db)

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 10,
AsOfSystemTime: time.Now(),
},
Result: nil,
}.Check(ctx, t, db)

startStreamID, err := uuid.New()
require.NoError(t, err)

endStreamID, err := uuid.New()
require.NoError(t, err)

if endStreamID.Less(startStreamID) {
startStreamID, endStreamID = endStreamID, startStreamID
}

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 10,
AsOfSystemTime: time.Now(),
StartStreamID: startStreamID,
EndStreamID: endStreamID,
},
Result: nil,
}.Check(ctx, t, db)

metabasetest.Verify{}.Check(ctx, t, db)
})

Expand Down Expand Up @@ -505,6 +574,136 @@ func TestIterateLoopSegments(t *testing.T) {
Segments: expectedRaw,
}.Check(ctx, t, db)
})

t.Run("streamID range", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

numberOfObjects := 10
numberOfSegmentsPerObject := 3

expected := make([]metabase.LoopSegmentEntry, numberOfObjects*numberOfSegmentsPerObject)
expectedRaw := make([]metabase.RawSegment, numberOfObjects*numberOfSegmentsPerObject)
expectedObjects := make([]metabase.RawObject, numberOfObjects)

for i := 0; i < numberOfObjects; i++ {
committed := metabasetest.RandObjectStream()

expectedObjects[i] = metabase.RawObject(
metabasetest.CreateObject(ctx, t, db, committed, byte(numberOfSegmentsPerObject)))

for j := 0; j < numberOfSegmentsPerObject; j++ {

entry := metabase.LoopSegmentEntry{
StreamID: committed.StreamID,
Position: metabase.SegmentPosition{0, uint32(j)},
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
CreatedAt: now,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: int64(j) * 512,
Redundancy: metabasetest.DefaultRedundancy,
}
expected[i*numberOfSegmentsPerObject+j] = entry
expectedRaw[i*numberOfSegmentsPerObject+j] = metabase.RawSegment{
StreamID: entry.StreamID,
Position: entry.Position,
RootPieceID: entry.RootPieceID,
Pieces: entry.Pieces,
CreatedAt: entry.CreatedAt,
EncryptedSize: entry.EncryptedSize,
PlainSize: entry.PlainSize,
PlainOffset: entry.PlainOffset,
Redundancy: entry.Redundancy,

EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
}
}
}
sort.Slice(expected, func(i, j int) bool {
if expected[i].StreamID.Less(expected[j].StreamID) {
return true
}
if expected[i].StreamID == expected[j].StreamID {
return expected[i].Position.Less(expected[j].Position)
}
return false
})

sort.Slice(expectedObjects, func(i, j int) bool {
return expectedObjects[i].StreamID.Less(expectedObjects[j].StreamID)
})

{ // StartStreamID set
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: expectedObjects[0].StreamID,
},
Result: expected[numberOfSegmentsPerObject:],
}.Check(ctx, t, db)

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
StartStreamID: expectedObjects[0].StreamID,
BatchSize: 1,
},
Result: expected[numberOfSegmentsPerObject:],
}.Check(ctx, t, db)
}

{ // EndStreamID set
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
EndStreamID: expectedObjects[3].StreamID,
},
Result: expected[:4*numberOfSegmentsPerObject],
}.Check(ctx, t, db)

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 1,
EndStreamID: expectedObjects[3].StreamID,
},
Result: expected[:4*numberOfSegmentsPerObject],
}.Check(ctx, t, db)

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 1,
EndStreamID: expectedObjects[numberOfObjects-1].StreamID,
},
Result: expected,
}.Check(ctx, t, db)
}

{ // StartStreamID and EndStreamID set
metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
AsOfSystemTime: time.Now(),
StartStreamID: expectedObjects[0].StreamID,
EndStreamID: expectedObjects[5].StreamID,
},
Result: expected[numberOfSegmentsPerObject : 6*numberOfSegmentsPerObject],
}.Check(ctx, t, db)

metabasetest.IterateLoopSegments{
Opts: metabase.IterateLoopSegments{
BatchSize: 1,
AsOfSystemTime: time.Now(),
StartStreamID: expectedObjects[0].StreamID,
EndStreamID: expectedObjects[5].StreamID,
},
Result: expected[numberOfSegmentsPerObject : 6*numberOfSegmentsPerObject],
}.Check(ctx, t, db)
}

metabasetest.Verify{
Objects: expectedObjects,
Segments: expectedRaw,
}.Check(ctx, t, db)
})
})
}

Expand Down
3 changes: 3 additions & 0 deletions satellite/metabase/metabasetest/test.go
Expand Up @@ -399,6 +399,9 @@ func (step IterateLoopSegments) Check(ctx *testcontext.Context, t testing.TB, db
}

sort.Slice(step.Result, func(i, j int) bool {
if step.Result[i].StreamID == step.Result[j].StreamID {
return step.Result[i].Position.Less(step.Result[j].Position)
}
return bytes.Compare(step.Result[i].StreamID[:], step.Result[j].StreamID[:]) < 0
})
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
Expand Down

0 comments on commit 35f74b7

Please sign in to comment.