Skip to content

Commit

Permalink
satellite/metabase/rangedloop: stream affinity for test provider
Browse files Browse the repository at this point in the history
Some observers assume that they will observe all the segments for a
given stream, and that they will observe those segments in a sequential
stream over one or more iterations.

This change updates the range provider from rangedlooptest to provide
these guarantees.

The change also removes the Mock suffix from the provider/splitter types
since the package name (rangedlooptest) implies that the type is a test
double.

Change-Id: I927c409807e305787abcde57427baac22f663eaa
  • Loading branch information
azdagron authored and Storj Robot committed Dec 9, 2022
1 parent ba7d2c2 commit 633ab8d
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 70 deletions.
126 changes: 126 additions & 0 deletions satellite/metabase/rangedloop/rangedlooptest/provider.go
@@ -0,0 +1,126 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package rangedlooptest

import (
"context"
"fmt"
"math"
"sort"

"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)

// Compile-time check that *RangeSplitter implements rangedloop.RangeSplitter.
var _ rangedloop.RangeSplitter = (*RangeSplitter)(nil)

// RangeSplitter is a test double for rangedloop.RangeSplitter that serves
// segments from an in-memory slice.
type RangeSplitter struct {
// Segments is the complete set of segments that CreateRanges distributes
// across the segment providers it returns.
Segments []segmentloop.Segment
}

// Compile-time check that *SegmentProvider implements rangedloop.SegmentProvider.
var _ rangedloop.SegmentProvider = (*SegmentProvider)(nil)

// SegmentProvider is a test double for rangedloop.SegmentProvider that
// iterates over segments held in an in-memory slice.
type SegmentProvider struct {
// Segments are the segments handed out by Iterate, in slice order.
Segments []segmentloop.Segment

// batchSize is the maximum number of segments passed to the Iterate
// callback in a single call.
batchSize int
}

// CreateRanges distributes the splitter's segments over nRanges segment
// providers, each of which iterates in batches of at most batchSize.
//
// Segments are first grouped by stream so that all segments of one stream end
// up in the same provider; the streams (not the individual segments) are then
// divided as evenly as possible between the providers. Trailing providers may
// be empty when there are fewer streams than ranges. The returned error is
// always nil.
func (m *RangeSplitter) CreateRanges(nRanges int, batchSize int) ([]rangedloop.SegmentProvider, error) {
	// A stream must never straddle two providers, so partition whole streams.
	byStream := streamsFromSegments(m.Segments)
	total := len(byStream)

	// Streams per provider, rounded up so that nRanges providers cover all of them.
	perRange := int(math.Ceil(float64(total) / float64(nRanges)))

	providers := []rangedloop.SegmentProvider{}
	for idx := 0; idx < nRanges; idx++ {
		// Clamp both bounds: once the streams run out the window is empty.
		start := min(idx*perRange, total)
		stop := min(start+perRange, total)

		provider := &SegmentProvider{
			Segments:  segmentsFromStreams(byStream[start:stop]),
			batchSize: batchSize,
		}
		providers = append(providers, provider)
	}

	return providers, nil
}

// Iterate calls fn with consecutive batches of at most batchSize segments, in
// the order the segments are stored in the provider. Iteration stops at, and
// returns, the first error reported by fn. A provider without segments never
// calls fn and returns nil.
//
// An error is returned when there are segments to iterate but batchSize is
// not positive: a zero batch size would never advance the offset and a
// negative one would slice out of range.
func (m *SegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
	if len(m.Segments) == 0 {
		return nil
	}
	if m.batchSize <= 0 {
		return fmt.Errorf("rangedlooptest: batch size must be positive, got %d", m.batchSize)
	}

	for offset := 0; offset < len(m.Segments); offset += m.batchSize {
		// The final batch may be shorter than batchSize.
		end := min(offset+m.batchSize, len(m.Segments))
		if err := fn(m.Segments[offset:end]); err != nil {
			return err
		}
	}

	return nil
}

// min returns the smaller of x and y.
func min(x, y int) int {
	if x < y {
		return x
	}
	return y
}

// streamsFromSegments groups segments into streams. The result holds one
// slice per distinct stream ID; streams are ordered by StreamID.Compare and
// the segments inside each stream by Position.Less. The input slice is not
// modified. The result is nil when there are no segments.
func streamsFromSegments(segments []segmentloop.Segment) [][]segmentloop.Segment {
	// Sort a copy so the caller's slice keeps its original order.
	segments = append([]segmentloop.Segment(nil), segments...)
	sort.Slice(segments, func(i int, j int) bool {
		idcmp := segments[i].StreamID.Compare(segments[j].StreamID)
		switch {
		case idcmp < 0:
			return true
		case idcmp > 0:
			return false
		default:
			// Same stream: fall back to the position within the stream.
			return segments[i].Position.Less(segments[j].Position)
		}
	})

	// Break up the sorted segments into streams.
	var streams [][]segmentloop.Segment
	var stream []segmentloop.Segment
	for _, segment := range segments {
		if len(stream) > 0 && stream[0].StreamID != segment.StreamID {
			// Stream ID changed; push and reset stream.
			streams = append(streams, stream)
			stream = nil
		}
		stream = append(stream, segment)
	}

	// Append the last stream (it is empty only if there were no segments).
	if len(stream) > 0 {
		streams = append(streams, stream)
	}

	return streams
}

// segmentsFromStreams flattens streams into a single slice, keeping both the
// order of the streams and the order of the segments within each stream. The
// result is nil when the streams contain no segments.
func segmentsFromStreams(streams [][]segmentloop.Segment) []segmentloop.Segment {
	var flat []segmentloop.Segment
	for i := range streams {
		flat = append(flat, streams[i]...)
	}
	return flat
}
144 changes: 144 additions & 0 deletions satellite/metabase/rangedloop/rangedlooptest/provider_test.go
@@ -0,0 +1,144 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package rangedlooptest

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/segmentloop"
)

// r is the random source used to shuffle segments in the tests below. It is
// seeded from the wall clock, so the shuffle order differs between runs.
var r = rand.New(rand.NewSource(time.Now().Unix()))

// TestSplitter verifies that RangeSplitter.CreateRanges keeps all segments of
// a stream in a single provider, returns streams and their segments in sorted
// order regardless of input order, and that the providers never hand out more
// than batchSize segments per callback.
func TestSplitter(t *testing.T) {
	// mkseg builds a segment whose stream ID has streamID as its first byte.
	mkseg := func(streamID byte, pos uint64) segmentloop.Segment {
		return segmentloop.Segment{
			StreamID: uuid.UUID{0: streamID},
			Position: metabase.SegmentPositionFromEncoded(pos),
		}
	}

	// mkstream builds a stream of numSegments segments with distinct,
	// increasing positions. Distinct positions matter: with identical
	// segments the intermixed cases could not detect a splitter that returns
	// the segments of a stream in the wrong order.
	// NOTE(review): relies on positions encoded from increasing integers
	// comparing as increasing under Position.Less — confirm against metabase.
	mkstream := func(streamID byte, numSegments int) []segmentloop.Segment {
		var stream []segmentloop.Segment
		for i := 0; i < numSegments; i++ {
			stream = append(stream, mkseg(streamID, uint64(i)))
		}
		return stream
	}

	// intermix returns a shuffled copy of segments.
	intermix := func(segments []segmentloop.Segment) []segmentloop.Segment {
		segments = append([]segmentloop.Segment(nil), segments...)
		r.Shuffle(len(segments), func(i, j int) {
			segments[i], segments[j] = segments[j], segments[i]
		})
		return segments
	}

	// combine concatenates streams in the given order.
	combine := func(streams ...[]segmentloop.Segment) []segmentloop.Segment {
		return segmentsFromStreams(streams)
	}

	stream1 := mkstream(1, 3)
	stream2 := mkstream(2, 5)
	stream3 := mkstream(3, 1)
	stream4 := mkstream(4, 2)
	stream5 := mkstream(5, 4)

	for _, tt := range []struct {
		desc         string
		segments     []segmentloop.Segment
		numRanges    int
		expectRanges [][]segmentloop.Segment
	}{
		{
			desc:      "no segments",
			segments:  nil,
			numRanges: 2,
			expectRanges: [][]segmentloop.Segment{
				{},
				{},
			},
		},
		{
			desc:      "one stream over two ranges",
			segments:  stream1,
			numRanges: 2,
			expectRanges: [][]segmentloop.Segment{
				stream1,
				{},
			},
		},
		{
			desc:      "two streams over two ranges",
			segments:  combine(stream1, stream2),
			numRanges: 2,
			expectRanges: [][]segmentloop.Segment{
				stream1,
				stream2,
			},
		},
		{
			desc:      "three streams over two ranges",
			segments:  combine(stream1, stream2, stream3),
			numRanges: 2,
			expectRanges: [][]segmentloop.Segment{
				combine(stream1, stream2),
				stream3,
			},
		},
		{
			desc:      "three streams intermixed over two ranges",
			segments:  intermix(combine(stream1, stream2, stream3)),
			numRanges: 2,
			expectRanges: [][]segmentloop.Segment{
				combine(stream1, stream2),
				stream3,
			},
		},
		{
			desc:      "five streams intermixed over three ranges",
			segments:  intermix(combine(stream1, stream2, stream3, stream4, stream5)),
			numRanges: 3,
			expectRanges: [][]segmentloop.Segment{
				combine(stream1, stream2),
				combine(stream3, stream4),
				stream5,
			},
		},
	} {
		t.Run(tt.desc, func(t *testing.T) {
			const batchSize = 3

			splitter := RangeSplitter{Segments: tt.segments}

			providers, err := splitter.CreateRanges(tt.numRanges, batchSize)
			require.NoError(t, err)

			var actualRanges [][]segmentloop.Segment
			for _, provider := range providers {
				// Start from an empty (non-nil) slice so that a provider that
				// yields nothing compares equal to the {} expectations.
				rangeSegments := []segmentloop.Segment{}
				err := provider.Iterate(context.Background(), func(segments []segmentloop.Segment) error {
					if len(segments) > batchSize {
						return fmt.Errorf("iterated segments (%d) larger than batch size (%d)", len(segments), batchSize)
					}
					rangeSegments = append(rangeSegments, segments...)
					return nil
				})
				require.NoError(t, err)
				actualRanges = append(actualRanges, rangeSegments)
			}
			require.Equal(t, tt.expectRanges, actualRanges)
		})
	}
}
68 changes: 0 additions & 68 deletions satellite/metabase/rangedloop/rangedlooptest/providermock.go

This file was deleted.

2 changes: 1 addition & 1 deletion satellite/metabase/rangedloop/service_test.go
Expand Up @@ -47,7 +47,7 @@ func RunTest(t *testing.T, parallelism int, nSegments int, nObservers int) {
Parallelism: parallelism,
AsOfSystemInterval: 0,
},
&rangedlooptest.RangeSplitterMock{
&rangedlooptest.RangeSplitter{
Segments: make([]segmentloop.Segment, nSegments),
},
observers,
Expand Down
2 changes: 1 addition & 1 deletion satellite/rangedloop.go
Expand Up @@ -76,7 +76,7 @@ func NewRangedLoop(log *zap.Logger, full *identity.FullIdentity, db DB, metabase

{ // setup ranged loop
// TODO: replace with real segment provider
segments := &rangedlooptest.RangeSplitterMock{}
segments := &rangedlooptest.RangeSplitter{}
peer.RangedLoop.Service = rangedloop.NewService(log.Named("rangedloop"), config.RangedLoop, segments, nil)

peer.Services.Add(lifecycle.Item{
Expand Down

0 comments on commit 633ab8d

Please sign in to comment.