Skip to content

Commit

Permalink
satellite/audit: implement rangedloop observer
Browse files Browse the repository at this point in the history
This change implements the ranged loop observer to replace the audit
chore that builds the audit queue.

The strategy employed by this change is to use a collector for each
segment range to build separate per-node segment reservoirs, which are
then merged during the join step.

In previous observer migrations, there were only a handful of tests so
the strategy was to duplicate them. In this package, there are dozens
of tests that utilize the chore. To reduce code churn and maintenance
burden until the chore is removed, this change introduces a helper that
runs tests under both the chore and observer, providing a pair of
functions that can be used to pause or run the queueing function.

#5232

Change-Id: I8bb4b4e55cf98b1aac9f26307e3a9a355cb3f506
  • Loading branch information
azdagron committed Jan 3, 2023
1 parent 9544a67 commit 590d443
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 156 deletions.
38 changes: 24 additions & 14 deletions satellite/audit/audit_test.go
Expand Up @@ -25,14 +25,14 @@ import (
// specified bucket are counted correctly for storage node audit bandwidth
// usage and the storage nodes will be paid for that.
func TestAuditOrderLimit(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit

audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)

now := time.Now()

Expand All @@ -46,7 +46,9 @@ func TestAuditOrderLimit(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)

audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)

queueSegment, err := audits.VerifyQueue.Next(ctx)
require.NoError(t, err)
require.False(t, queueSegment.StreamID.IsZero())
Expand Down Expand Up @@ -77,14 +79,14 @@ func TestAuditOrderLimit(t *testing.T) {

// Minimal test to verify that copies aren't audited.
func TestAuditSkipsRemoteCopies(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit

audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)

uplink := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)
Expand All @@ -105,7 +107,9 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
_, err = project.CopyObject(ctx, "testbucket", "testobj1", "testbucket", "copy", nil)
require.NoError(t, err)

audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)

queue := audits.VerifyQueue

auditSegments := make([]audit.Segment, 0, 2)
Expand All @@ -131,7 +135,9 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {
err = uplink.DeleteObject(ctx, satellite, "testbucket", "testobj2")
require.NoError(t, err)

audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)

queue = audits.VerifyQueue

// verify that the copy is being audited
Expand All @@ -146,14 +152,14 @@ func TestAuditSkipsRemoteCopies(t *testing.T) {

// Minimal test to verify that inline objects are not audited even if they are copies.
func TestAuditSkipsInlineCopies(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit

audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)

uplink := planet.Uplinks[0]
testData := testrand.Bytes(1 * memory.KiB)
Expand All @@ -170,7 +176,9 @@ func TestAuditSkipsInlineCopies(t *testing.T) {
_, err = project.CopyObject(ctx, "testbucket", "testobj1", "testbucket", "copy", nil)
require.NoError(t, err)

audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)

queue := audits.VerifyQueue
_, err = queue.Next(ctx)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)
Expand All @@ -181,7 +189,9 @@ func TestAuditSkipsInlineCopies(t *testing.T) {
err = uplink.DeleteObject(ctx, satellite, "testbucket", "testobj2")
require.NoError(t, err)

audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)

queue = audits.VerifyQueue
_, err = queue.Next(ctx)
require.Truef(t, audit.ErrEmptyQueue.Has(err), "unexpected error %v", err)
Expand Down
16 changes: 16 additions & 0 deletions satellite/audit/collector.go
Expand Up @@ -53,3 +53,19 @@ func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentl
func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error) {
return nil
}

// Process runs a batch of segments through the collector's per-node reservoir
// sampling for addition into the audit queue. Inline segments are skipped and
// every other segment is handed to RemoteSegment. The first error returned by
// RemoteSegment aborts the batch and is returned to the caller.
func (collector *Collector) Process(ctx context.Context, segments []segmentloop.Segment) (err error) {
	for i := range segments {
		// Hand RemoteSegment a pointer to a per-iteration copy. The reservoir
		// ends up dereferencing and copying the segment internally, but that
		// is not obvious from here, so never share one variable across
		// iterations.
		remote := segments[i]
		if remote.Inline() {
			continue
		}
		if err := collector.RemoteSegment(ctx, &remote); err != nil {
			return err
		}
	}
	return nil
}
60 changes: 60 additions & 0 deletions satellite/audit/common_test.go
@@ -0,0 +1,60 @@
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.

package audit_test

import (
"context"
"testing"

"go.uber.org/zap"

"storj.io/common/testcontext"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite"
)

// pauseQueueingFunc is the callback handed to a test so it can pause the
// audit queueing for the given satellite.
type pauseQueueingFunc = func(satellite *testplanet.Satellite)
// runQueueingOnceFunc is the callback handed to a test so it can run the
// audit queueing once for the given satellite; it reports any failure as an error.
type runQueueingOnceFunc = func(ctx context.Context, satellite *testplanet.Satellite) error

// testWithChoreAndObserver executes an audit test twice, as the subtests
// "Chore" and "Observer". Each subtest starts its own testplanet from
// planetConfig and passes the test a pair of callbacks: one to pause the
// queueing and one to run the queueing a single time.
//
// In the "Chore" subtest the callbacks drive the audit chore loop. In the
// "Observer" subtest the satellite is reconfigured with
// Audit.UseRangedLoop enabled (after any Reconfigure.Satellite hook already
// present in planetConfig has run), pausing is a no-op, and queueing is done
// by running the ranged loop service once.
func testWithChoreAndObserver(t *testing.T, planetConfig testplanet.Config, run func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc)) {
	t.Run("Chore", func(t *testing.T) {
		choreConfig := planetConfig

		pauseChore := func(sat *testplanet.Satellite) {
			sat.Audit.Chore.Loop.Pause()
		}
		triggerChore := func(ctx context.Context, sat *testplanet.Satellite) error {
			sat.Audit.Chore.Loop.TriggerWait()
			return nil
		}

		testplanet.Run(t, choreConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			t.Helper()
			run(t, ctx, planet, pauseChore, triggerChore)
		})
	})

	t.Run("Observer", func(t *testing.T) {
		observerConfig := planetConfig

		// Chain onto whatever satellite reconfiguration the caller supplied so
		// that the caller's settings are applied first and the ranged loop
		// flag is set last.
		callerReconfigure := observerConfig.Reconfigure.Satellite
		observerConfig.Reconfigure.Satellite = func(log *zap.Logger, index int, config *satellite.Config) {
			if callerReconfigure != nil {
				callerReconfigure(log, index, config)
			}
			config.Audit.UseRangedLoop = true
		}

		// Nothing is paused in this mode; the test triggers queueing
		// explicitly through runRangedLoop.
		pauseNothing := func(sat *testplanet.Satellite) {}
		runRangedLoop := func(ctx context.Context, sat *testplanet.Satellite) error {
			_, err := sat.RangedLoop.RangedLoop.Service.RunOnce(ctx)
			return err
		}

		testplanet.Run(t, observerConfig, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
			t.Helper()
			run(t, ctx, planet, pauseNothing, runRangedLoop)
		})
	})
}
20 changes: 12 additions & 8 deletions satellite/audit/getshare_test.go
Expand Up @@ -72,22 +72,24 @@ func reformVerifierWithMockConnector(t testing.TB, sat *testplanet.Satellite, mo
}

func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
testSatellite := planet.Satellites[0]
audits := testSatellite.Audit

audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(testSatellite)

ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)

err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
require.NoError(t, err)

audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, testSatellite)
require.NoError(t, err)

queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -126,22 +128,24 @@ func TestGetShareDoesNameLookupIfNecessary(t *testing.T) {
}

func TestGetSharePrefers(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
testSatellite := planet.Satellites[0]
audits := testSatellite.Audit

audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(testSatellite)

ul := planet.Uplinks[0]
testData := testrand.Bytes(8 * memory.KiB)

err := ul.Upload(ctx, testSatellite, "test.bucket", "some//path", testData)
require.NoError(t, err)

audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, testSatellite)
require.NoError(t, err)

queue := audits.VerifyQueue
queueSegment, err := queue.Next(ctx)
require.NoError(t, err)
Expand Down
14 changes: 8 additions & 6 deletions satellite/audit/integration_test.go
Expand Up @@ -20,19 +20,19 @@ import (
)

func TestChoreAndWorkerIntegration(t *testing.T) {
testplanet.Run(t, testplanet.Config{
testWithChoreAndObserver(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 5, UplinkCount: 1,
Reconfigure: testplanet.Reconfigure{
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
// disable reputation write cache so changes are immediate
config.Reputation.FlushInterval = 0
},
},
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet, pauseQueueing pauseQueueingFunc, runQueueingOnce runQueueingOnceFunc) {
satellite := planet.Satellites[0]
audits := satellite.Audit
audits.Worker.Loop.Pause()
audits.Chore.Loop.Pause()
pauseQueueing(satellite)

ul := planet.Uplinks[0]

Expand All @@ -44,11 +44,12 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
require.NoError(t, err)
}

audits.Chore.Loop.TriggerWait()
err := runQueueingOnce(ctx, satellite)
require.NoError(t, err)

queue := audits.VerifyQueue

uniqueSegments := make(map[audit.Segment]struct{})
var err error
var segment audit.Segment
var segmentCount int
for {
Expand All @@ -67,7 +68,8 @@ func TestChoreAndWorkerIntegration(t *testing.T) {
requireAuditQueueEmpty(ctx, t, audits.VerifyQueue)

// Repopulate the queue for the worker.
audits.Chore.Loop.TriggerWait()
err = runQueueingOnce(ctx, satellite)
require.NoError(t, err)

// Make sure the worker processes the audit queue.
audits.Worker.Loop.TriggerWait()
Expand Down

0 comments on commit 590d443

Please sign in to comment.