Skip to content

Commit

Permalink
satellite/metabase/rangedloop: cancellation (#5364)
Browse files Browse the repository at this point in the history
Support interruption of the ranged segment loop through context.

Part of #5223

Change-Id: Iae0260e250f8ea33affed95c6592a1f42df384eb
  • Loading branch information
Erikvv committed Jan 5, 2023
1 parent 8c69ee6 commit 1d4411f
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 9 deletions.
2 changes: 1 addition & 1 deletion satellite/metabase/rangedloop/observerstats.go
Expand Up @@ -16,7 +16,7 @@ func sendObserverDurations(observerDurations []ObserverDuration) {
}

// completedObserverStatsInstance is initialized once
// so that hopefully there is never more than once objec instance per satellite process
// so that hopefully there is never more than once object instance per satellite process
// and statistics of different object instances don't clobber each other.
var completedObserverStatsInstance *completedObserverStats

Expand Down
78 changes: 78 additions & 0 deletions satellite/metabase/rangedloop/rangedlooptest/callbackobserver.go
@@ -0,0 +1,78 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package rangedlooptest

import (
"context"
"time"

"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)

var _ rangedloop.Observer = (*CallbackObserver)(nil)
var _ rangedloop.Partial = (*CallbackObserver)(nil)

// CallbackObserver can be used to easily attach logic to the ranged segment loop during tests.
type CallbackObserver struct {
OnProcess func(context.Context, []segmentloop.Segment) error
OnStart func(context.Context, time.Time) error
OnFork func(context.Context) (rangedloop.Partial, error)
OnJoin func(context.Context, rangedloop.Partial) error
OnFinish func(context.Context) error
}

// Start executes a callback at ranged segment loop start.
func (c *CallbackObserver) Start(ctx context.Context, time time.Time) error {
if c.OnStart == nil {
return nil
}

return c.OnStart(ctx, time)
}

// Fork executes a callback for every segment range at ranged segment loop fork stage.
func (c *CallbackObserver) Fork(ctx context.Context) (rangedloop.Partial, error) {
if c.OnFork == nil {
return c, nil
}

partial, err := c.OnFork(ctx)
if err != nil {
return nil, err
}

if partial == nil {
return c, nil
}

return partial, nil
}

// Join executes a callback for every segment range at ranged segment loop join stage.
func (c *CallbackObserver) Join(ctx context.Context, partial rangedloop.Partial) error {
if c.OnJoin == nil {
return nil
}

return c.OnJoin(ctx, partial)
}

// Finish executes a callback at ranged segment loop end.
func (c *CallbackObserver) Finish(ctx context.Context) error {
if c.OnFinish == nil {
return nil
}

return c.OnFinish(ctx)
}

// Process executes a callback for every batch of segment in the ranged segment loop.
func (c *CallbackObserver) Process(ctx context.Context, segments []segmentloop.Segment) error {
if c.OnProcess == nil {
return nil
}

return c.OnProcess(ctx, segments)
}
36 changes: 36 additions & 0 deletions satellite/metabase/rangedloop/rangedlooptest/infiniteprovider.go
@@ -0,0 +1,36 @@
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package rangedlooptest

import (
"context"

"storj.io/storj/satellite/metabase/rangedloop"
"storj.io/storj/satellite/metabase/segmentloop"
)

var _ rangedloop.RangeSplitter = (*InfiniteSegmentProvider)(nil)
var _ rangedloop.SegmentProvider = (*InfiniteSegmentProvider)(nil)

// InfiniteSegmentProvider allow to iterate indefinitely to test service cancellation.
type InfiniteSegmentProvider struct {
}

// CreateRanges splits the segments into equal ranges.
func (m *InfiniteSegmentProvider) CreateRanges(nRanges int, batchSize int) (segmentsProviders []rangedloop.SegmentProvider, err error) {
for i := 0; i < nRanges; i++ {
segmentsProviders = append(segmentsProviders, &InfiniteSegmentProvider{})
}
return segmentsProviders, nil
}

// Iterate allows to loop over the segments stored in the provider.
func (m *InfiniteSegmentProvider) Iterate(ctx context.Context, fn func([]segmentloop.Segment) error) error {
for {
err := fn(make([]segmentloop.Segment, 3))
if err != nil {
return err
}
}
}
26 changes: 18 additions & 8 deletions satellite/metabase/rangedloop/service.go
Expand Up @@ -144,15 +144,13 @@ func createGoroutineClosure(ctx context.Context, rangeProvider SegmentProvider,
defer mon.Task()(&ctx)(&err)

return rangeProvider.Iterate(ctx, func(segments []segmentloop.Segment) error {
for _, state := range states {
start := time.Now()
err := state.rangeObserver.Process(ctx, segments)
if err != nil {
return err
}
state.duration += time.Since(start)
// check for cancellation every segment batch
select {
case <-ctx.Done():
return ctx.Err()
default:
return processBatch(ctx, states, segments)
}
return nil
})
}
}
Expand Down Expand Up @@ -212,3 +210,15 @@ func finishObserver(ctx context.Context, state observerState) (ObserverDuration,
Observer: state.observer,
}, state.observer.Finish(ctx)
}

func processBatch(ctx context.Context, states []*rangeObserverState, segments []segmentloop.Segment) (err error) {
for _, state := range states {
start := time.Now()
err := state.rangeObserver.Process(ctx, segments)
if err != nil {
return err
}
state.duration += time.Since(start)
}
return nil
}
32 changes: 32 additions & 0 deletions satellite/metabase/rangedloop/service_test.go
Expand Up @@ -4,6 +4,7 @@
package rangedloop_test

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -117,3 +118,34 @@ func TestLoopDuration(t *testing.T) {
require.Equal(t, expectedSleep, observerDuration.Duration.Round(sleepIncrement))
}
}

func TestLoopCancellation(t *testing.T) {
parallelism := 2
batchSize := 1
ctx, cancel := context.WithCancel(testcontext.NewWithTimeout(t, time.Second*10))

observers := []rangedloop.Observer{
&rangedlooptest.CountObserver{},
&rangedlooptest.CallbackObserver{
OnProcess: func(ctx context.Context, segments []segmentloop.Segment) error {
// cancel from inside the loop, when it is certain that the loop has started
cancel()
return nil
},
},
}

loopService := rangedloop.NewService(
zaptest.NewLogger(t),
rangedloop.Config{
BatchSize: batchSize,
Parallelism: parallelism,
},
&rangedlooptest.InfiniteSegmentProvider{},
observers,
)

_, err := loopService.RunOnce(ctx)

require.ErrorIs(t, err, context.Canceled)
}

0 comments on commit 1d4411f

Please sign in to comment.