Skip to content

Commit

Permalink
satellite/metabase: split out TestingGetAllObjects for db adapters
Browse files Browse the repository at this point in the history
This includes a fix for TestingAllSegments to aggregate results from all
adapters, since we are making that the behavior for TestingAllObjects
and TestingGetState here.

Change-Id: I16410c37e8f2267c00eb9168c805cbfdf69103a2
  • Loading branch information
thepaul authored and Storj Robot committed May 6, 2024
1 parent 094d27b commit 6d475df
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 24 deletions.
1 change: 1 addition & 0 deletions satellite/metabase/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Adapter interface {
ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error)

TestingBatchInsertSegments(ctx context.Context, aliasCache *NodeAliasCache, segments []RawSegment) (err error)
TestingGetAllObjects(ctx context.Context) (_ []RawObject, err error)
TestingGetAllSegments(ctx context.Context, aliasCache *NodeAliasCache) (_ []RawSegment, err error)
TestingDeleteAll(ctx context.Context) (err error)
}
Expand Down
29 changes: 19 additions & 10 deletions satellite/metabase/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,18 @@ func (db *DB) BucketEmpty(ctx context.Context, opts BucketEmpty) (empty bool, er
func (db *DB) TestingAllObjects(ctx context.Context) (objects []Object, err error) {
defer mon.Task()(&ctx)(&err)

rawObjects, err := db.testingGetAllObjects(ctx)
if err != nil {
return nil, Error.Wrap(err)
var rawObjects []RawObject
for _, a := range db.adapters {
adapterObjects, err := a.TestingGetAllObjects(ctx)
if err != nil {
return nil, Error.Wrap(err)
}
rawObjects = append(rawObjects, adapterObjects...)
}

for _, o := range rawObjects {
objects = append(objects, Object(o))
sortRawObjects(rawObjects)
objects = make([]Object, len(rawObjects))
for i, o := range rawObjects {
objects[i] = Object(o)
}

return objects, nil
Expand All @@ -359,11 +364,15 @@ func (db *DB) TestingAllObjects(ctx context.Context) (objects []Object, err erro
func (db *DB) TestingAllSegments(ctx context.Context) (segments []Segment, err error) {
defer mon.Task()(&ctx)(&err)

rawSegments, err := db.ChooseAdapter(uuid.UUID{}).TestingGetAllSegments(ctx, db.aliasCache)
if err != nil {
return nil, Error.Wrap(err)
var rawSegments []RawSegment
for _, a := range db.adapters {
adapterSegments, err := a.TestingGetAllSegments(ctx, db.aliasCache)
if err != nil {
return nil, Error.Wrap(err)
}
rawSegments = append(rawSegments, adapterSegments...)
}

sortRawSegments(rawSegments)
for _, s := range rawSegments {
segments = append(segments, Segment(s))
}
Expand Down
51 changes: 37 additions & 14 deletions satellite/metabase/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package metabase
import (
"context"
"errors"
"sort"
"time"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -88,19 +89,40 @@ type RawState struct {
Segments []RawSegment
}

func sortRawObjects(objects []RawObject) {
sort.Slice(objects, func(i, j int) bool {
return objects[i].ObjectStream.Less(objects[j].ObjectStream)
})
}

func sortRawSegments(segments []RawSegment) {
sort.Slice(segments, func(i, j int) bool {
if segments[i].StreamID == segments[j].StreamID {
return segments[i].Position.Less(segments[j].Position)
}
return segments[i].StreamID.Less(segments[j].StreamID)
})
}

// TestingGetState returns the state of the database.
func (db *DB) TestingGetState(ctx context.Context) (_ *RawState, err error) {
state := &RawState{}

state.Objects, err = db.testingGetAllObjects(ctx)
if err != nil {
return nil, Error.New("GetState: %w", err)
}
for _, a := range db.adapters {
objects, err := a.TestingGetAllObjects(ctx)
if err != nil {
return nil, Error.New("GetState: %w", err)
}
state.Objects = append(state.Objects, objects...)

state.Segments, err = db.ChooseAdapter(uuid.UUID{}).TestingGetAllSegments(ctx, db.aliasCache)
if err != nil {
return nil, Error.New("GetState: %w", err)
segments, err := a.TestingGetAllSegments(ctx, db.aliasCache)
if err != nil {
return nil, Error.New("GetState: %w", err)
}
state.Segments = append(state.Segments, segments...)
}
sortRawObjects(state.Objects)
sortRawSegments(state.Segments)

return state, nil
}
Expand Down Expand Up @@ -137,15 +159,11 @@ func (s *SpannerAdapter) TestingDeleteAll(ctx context.Context) (err error) {
return Error.Wrap(err)
}

// testingGetAllObjects returns the state of the database.
func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) {
if db.db == nil {
// TODO: implement it with adapter and support Spanner.
return nil, nil
}
// TestingGetAllObjects returns the state of the database.
func (p *PostgresAdapter) TestingGetAllObjects(ctx context.Context) (_ []RawObject, err error) {
objs := []RawObject{}

rows, err := db.db.QueryContext(ctx, `
rows, err := p.db.QueryContext(ctx, `
WITH ignore_full_scan_for_test AS (SELECT 1)
SELECT
project_id, bucket_name, object_key, version, stream_id,
Expand Down Expand Up @@ -203,6 +221,11 @@ func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err erro
return objs, nil
}

// TestingGetAllObjects returns the state of the database.
func (s *SpannerAdapter) TestingGetAllObjects(ctx context.Context) (_ []RawObject, err error) {
return nil, nil
}

// TestingBatchInsertObjects batch inserts objects for testing.
// This implementation does no verification on the correctness of objects.
func (db *DB) TestingBatchInsertObjects(ctx context.Context, objects []RawObject) (err error) {
Expand Down

0 comments on commit 6d475df

Please sign in to comment.