diff --git a/satellite/metabase/adapter.go b/satellite/metabase/adapter.go index 928941e4329a..d0d74552ca58 100644 --- a/satellite/metabase/adapter.go +++ b/satellite/metabase/adapter.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" + "storj.io/common/dbutil" "storj.io/common/tagsql" ) @@ -16,6 +17,7 @@ import ( type Adapter interface { BeginObjectNextVersion(context.Context, BeginObjectNextVersion, *Object) error GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error + IterateLoopSegments(ctx context.Context, aliasCache *NodeAliasCache, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) error TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) error @@ -28,9 +30,9 @@ type Adapter interface { // PostgresAdapter uses Cockroach related SQL queries. type PostgresAdapter struct { - log *zap.Logger - db tagsql.DB - aliasCache *NodeAliasCache + log *zap.Logger + db tagsql.DB + impl dbutil.Implementation } var _ Adapter = &PostgresAdapter{} diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go index d3751f6861a4..3dcb28a69fd0 100644 --- a/satellite/metabase/db.go +++ b/satellite/metabase/db.go @@ -98,16 +98,16 @@ func Open(ctx context.Context, log *zap.Logger, connstr string, config Config, e switch impl { case dbutil.Postgres: db.adapters = append(db.adapters, &PostgresAdapter{ - log: log, - db: rawdb, - aliasCache: db.aliasCache, + log: log, + db: rawdb, + impl: impl, }) case dbutil.Cockroach: db.adapters = append(db.adapters, &CockroachAdapter{ PostgresAdapter{ - log: log, - db: rawdb, - aliasCache: db.aliasCache, + log: log, + db: rawdb, + impl: impl, }, }) default: diff --git a/satellite/metabase/loop.go b/satellite/metabase/loop.go index 12cc20aeef93..fb1a7a62c689 100644 --- a/satellite/metabase/loop.go +++ b/satellite/metabase/loop.go @@ -5,10 +5,13 @@ package metabase import ( "context" + "errors" "math" "time" + "cloud.google.com/go/spanner" "github.com/zeebo/errs" + "google.golang.org/api/iterator" "storj.io/common/storj" "storj.io/common/tagsql" @@ -63,7 +66,8 @@ func (db *DB) IterateLoopObjects(ctx context.Context, opts IterateLoopObjects, f } it := &loopIterator{ - db: db, + db: db, + aliasCache: db.aliasCache, batchSize: opts.BatchSize, @@ -103,7 +107,8 @@ type loopIterator struct { cursor loopIterateCursor // failErr is set when either scan or next query fails during iteration. - failErr error + failErr error + aliasCache *NodeAliasCache } type loopIterateCursor struct { @@ -257,8 +262,14 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, loopIteratorBatchSizeLimit.Ensure(&opts.BatchSize) - it := &loopSegmentIterator{ - db: db, + return db.ChooseAdapter(uuid.UUID{}).IterateLoopSegments(ctx, db.aliasCache, opts, fn) +} + +// IterateLoopSegments implements Adapter. +func (p *PostgresAdapter) IterateLoopSegments(ctx context.Context, aliasCache *NodeAliasCache, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) (err error) { + it := &postgresLoopSegmentIterator{ + db: p, + aliasCache: aliasCache, asOfSystemTime: opts.AsOfSystemTime, asOfSystemInterval: opts.AsOfSystemInterval, @@ -295,9 +306,16 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments, return fn(ctx, it) } -// loopSegmentIterator enables iteration of all segments in metabase. 
-type loopSegmentIterator struct {
-	db *DB
+type loopSegmentIteratorCursor struct {
+	StartStreamID uuid.UUID
+	StartPosition SegmentPosition
+	EndStreamID uuid.UUID
+}
+
+// postgresLoopSegmentIterator enables iteration over all segments in the metabase.
+type postgresLoopSegmentIterator struct {
+	db *PostgresAdapter
+	aliasCache *NodeAliasCache
 
 	batchSize int
 	// batchPieces are reused between result pages to reduce memory consumption
@@ -314,14 +332,8 @@ type loopSegmentIterator struct {
 	failErr error
 }
 
-type loopSegmentIteratorCursor struct {
-	StartStreamID uuid.UUID
-	StartPosition SegmentPosition
-	EndStreamID uuid.UUID
-}
-
 // Next returns true if there was another item and copy it in item.
-func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
+func (it *postgresLoopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
 	next := it.curRows.Next()
 	if !next {
 		if it.curIndex < it.batchSize {
@@ -363,7 +375,7 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry)
 	return true
 }
 
-func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
+func (it *postgresLoopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
 	defer mon.Task()(&ctx)(&err)
 
 	return it.db.db.QueryContext(ctx, `
@@ -377,7 +389,7 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows,
 			remote_alias_pieces, placement
 		FROM segments
-		`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
+		`+it.db.impl.AsOfSystemInterval(it.asOfSystemInterval)+`
 		WHERE
 			(stream_id, position) > ($1, $2) AND stream_id <= $4
 		ORDER BY (stream_id, position) ASC
@@ -388,7 +400,7 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows,
 }
 
 // scanItem scans doNextQuery results into LoopSegmentEntry.
-func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) error {
+func (it *postgresLoopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) error {
 	err := it.curRows.Scan(
 		&item.StreamID, &item.Position,
 		&item.CreatedAt, &item.ExpiresAt, &item.RepairedAt,
@@ -410,10 +422,188 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEn
 		it.batchPieces[it.curIndex] = it.batchPieces[it.curIndex][:len(item.AliasPieces)]
 	}
 
-	item.Pieces, err = it.db.aliasCache.convertAliasesToPieces(ctx, item.AliasPieces, it.batchPieces[it.curIndex])
+	item.Pieces, err = it.aliasCache.convertAliasesToPieces(ctx, item.AliasPieces, it.batchPieces[it.curIndex])
 	if err != nil {
 		return Error.New("failed to convert aliases to pieces: %w", err)
 	}
 
 	return nil
 }
+
+type spannerLoopSegmentIterator struct {
+	db *SpannerAdapter
+
+	batchSize int
+	// TODO(spanner): would be nice to have this at some point:
+	// batchPieces are reused between result pages to reduce memory consumption
+	// batchPieces []Pieces
+
+	curIndex int
+	curRows *spanner.RowIterator
+	curRow *spanner.Row
+	cursor loopSegmentIteratorCursor
+
+	// failErr is set when either scan or next query fails during iteration.
+	failErr error
+	aliasCache *NodeAliasCache
+}
+
+// Next returns true if there was another item and copies it into item.
+func (it *spannerLoopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
+	var err error
+	it.curRow, err = it.curRows.Next()
+	next := !errors.Is(err, iterator.Done) // iterator.Done marks the end of the current batch, not a failure
+	if err != nil && next {
+		it.failErr = errs.Combine(it.failErr, err)
+		return false
+	}
+
+	if !next {
+		if it.curIndex < it.batchSize {
+			return false
+		}
+
+		rows := it.doNextQuery(ctx)
+
+		it.curRows.Stop()
+
+		it.curRows = rows
+		it.curIndex = 0
+
+		it.curRow, err = it.curRows.Next()
+		if err != nil {
+			if !errors.Is(err, iterator.Done) {
+				it.failErr = errs.Combine(it.failErr, err)
+			}
+			return false
+		}
+	}
+
+	err = it.scanItem(ctx, item)
+	if err != nil {
+		it.failErr = errs.Combine(it.failErr, err)
+		return false
+	}
+
+	it.curIndex++
+	it.cursor.StartStreamID = item.StreamID
+	it.cursor.StartPosition = item.Position
+	return true
+}
+
+func (it *spannerLoopSegmentIterator) doNextQuery(ctx context.Context) (_ *spanner.RowIterator) {
+	stmt := spanner.Statement{
+		SQL: `
+			SELECT
+				stream_id, position,
+				created_at, expires_at, repaired_at,
+				root_piece_id,
+				encrypted_size,
+				plain_offset, plain_size,
+				redundancy,
+				remote_alias_pieces,
+				placement
+			FROM segments
+			WHERE
+				(stream_id > @streamid OR (stream_id = @streamid AND position > @position)) AND stream_id <= @endstreamid
+			ORDER BY stream_id ASC, position ASC
+			LIMIT @batchsize
+		`,
+		Params: map[string]interface{}{
+			"streamid": it.cursor.StartStreamID.Bytes(),
+			"position": int64(it.cursor.StartPosition.Encode()),
+			"endstreamid": it.cursor.EndStreamID.Bytes(),
+			"batchsize": it.batchSize,
+		}}
+	return it.db.client.Single().Query(ctx, stmt)
+}
+
+// IterateLoopSegments implements Adapter.
+func (s *SpannerAdapter) IterateLoopSegments(ctx context.Context, aliasCache *NodeAliasCache, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) (err error) {
+	it := &spannerLoopSegmentIterator{
+		db: s,
+		aliasCache: aliasCache,
+
+		batchSize: opts.BatchSize,
+
+		curIndex: 0,
+		cursor: loopSegmentIteratorCursor{
+			StartStreamID: opts.StartStreamID,
+			EndStreamID: opts.EndStreamID,
+		},
+	}
+
+	if !opts.StartStreamID.IsZero() {
+		// uses MaxInt32 instead of MaxUint32 because position is stored as int8 (a signed 64-bit integer) in the db, so the encoded position must fit in int64.
+		it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32}
+	}
+	if it.cursor.EndStreamID.IsZero() {
+		it.cursor.EndStreamID = uuid.Max()
+	}
+
+	it.curRows = it.doNextQuery(ctx)
+
+	defer func() {
+		it.curRows.Stop()
+	}()
+
+	return fn(ctx, it)
+}
+
+func (it *spannerLoopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) (err error) {
+	var position int64
+	var createdAt time.Time
+	var repairedAt, expiresAt spanner.NullTime
+	var encryptedSize, plainOffset, plainSize, redundancy, placement int64
+	var streamID, rootPieceID, remoteAliasPieces []byte
+	if err := it.curRow.Columns(&streamID, &position,
+		&createdAt, &expiresAt, &repairedAt, // order must match doNextQuery's SELECT list
+		&rootPieceID,
+		&encryptedSize, &plainOffset, &plainSize,
+		&redundancy,
+		&remoteAliasPieces,
+		&placement,
+	); err != nil {
+		return Error.New("failed to scan segment: %w", err)
+	}
+
+	item.StreamID, err = uuid.FromBytes(streamID)
+	if err != nil {
+		return Error.New("failed to scan segment: %w", err)
+	}
+	item.Position = SegmentPositionFromEncoded(uint64(position))
+	item.CreatedAt = createdAt
+	item.RootPieceID, err = storj.PieceIDFromBytes(rootPieceID)
+	if err != nil {
+		return Error.New("failed to scan segment: %w", err)
+	}
+	if repairedAt.Valid {
+		item.RepairedAt = &repairedAt.Time
+	}
+	if expiresAt.Valid {
+		item.ExpiresAt = &expiresAt.Time
+	}
+	item.EncryptedSize = int32(encryptedSize)
+	item.PlainOffset = plainOffset
+	item.PlainSize = int32(plainSize)
+	rs := redundancyScheme{RedundancyScheme: &storj.RedundancyScheme{}}
+	if err := rs.Scan(redundancy); err != nil {
+		return Error.New("failed to scan segment: %w", err)
+	}
+	item.Redundancy = *rs.RedundancyScheme
+
+	aliasPieces := AliasPieces{}
+	err = aliasPieces.SetBytes(remoteAliasPieces)
+	if err != nil {
+		return Error.New("failed to scan segment: %w", err)
+	}
+	item.AliasPieces = aliasPieces
+
+	item.Placement = storj.PlacementConstraint(placement)
+	item.Pieces, err = it.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
+	if err != nil {
+		return Error.New("failed to scan segment: %w", err)
+	}
+
+	return nil
+}
diff --git a/satellite/metabase/loop_test.go b/satellite/metabase/loop_test.go
index bcdb10c35ade..198b45959484 100644
--- a/satellite/metabase/loop_test.go
+++ b/satellite/metabase/loop_test.go
@@ -494,73 +494,6 @@ func TestIterateLoopSegments(t *testing.T) {
 		}.Check(ctx, t, db)
 	})
 
-	t.Run("batch size", func(t *testing.T) {
-		defer metabasetest.DeleteAll{}.Check(ctx, t, db)
-		numberOfSegments := 5
-
-		committed := metabasetest.RandObjectStream()
-		expected := make([]metabase.LoopSegmentEntry, numberOfSegments)
-		expectedRaw := make([]metabase.RawSegment, numberOfSegments)
-		for i := 0; i < numberOfSegments; i++ {
-			rawSegment := metabasetest.DefaultRawSegment(committed, metabase.SegmentPosition{0, uint32(i)})
-			expected[i] = metabase.LoopSegmentEntry{
-				StreamID: rawSegment.StreamID,
-				Position: rawSegment.Position,
-				RootPieceID: rawSegment.RootPieceID,
-				Pieces: rawSegment.Pieces,
-				CreatedAt: rawSegment.CreatedAt,
-				EncryptedSize: rawSegment.EncryptedSize,
-				PlainSize: rawSegment.PlainSize,
-				PlainOffset: rawSegment.PlainOffset,
-				Redundancy: rawSegment.Redundancy,
-			}
-			expectedRaw[i] = rawSegment
-		}
-
-		err := db.TestingBatchInsertSegments(ctx, expectedRaw)
-		require.NoError(t, err)
-
-		{ // less segments than limit
-			limit := 10
-			metabasetest.IterateLoopSegments{
-				Opts: metabase.IterateLoopSegments{
-					BatchSize: limit,
-				},
-				Result: expected,
-			}.Check(ctx, t, db)
-
-			metabasetest.IterateLoopSegments{
-				Opts: metabase.IterateLoopSegments{
-					BatchSize: limit,
-					AsOfSystemTime: time.Now(),
-				},
-				Result: expected,
-			}.Check(ctx, t, db)
-		}
-
-		{ // more segments than limit
-			limit := 3
-			metabasetest.IterateLoopSegments{
-				Opts: metabase.IterateLoopSegments{
-					BatchSize: limit,
-				},
-				Result: expected,
-			}.Check(ctx, t, db)
-
-			metabasetest.IterateLoopSegments{
-				Opts: metabase.IterateLoopSegments{
-					BatchSize: limit,
-					AsOfSystemTime: time.Now(),
-				},
-				Result: expected,
-			}.Check(ctx, t, db)
-		}
-
-		metabasetest.Verify{
-			Segments: expectedRaw,
-		}.Check(ctx, t, db)
-	})
-
 	t.Run("streamID range", func(t *testing.T) {
 		defer metabasetest.DeleteAll{}.Check(ctx, t, db)
 
@@ -696,6 +629,79 @@ func TestIterateLoopSegments(t *testing.T) {
 	})
 }
 
+// TODO(spanner): only a single test was migrated to Spanner using TestingBatchInsertSegments,
+// because we need upload methods to run the rest of the tests without bigger modifications.
+func TestIterateLoopSegmentsWithSpanner(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+		t.Run("batch size", func(t *testing.T) {
+			defer metabasetest.DeleteAll{}.Check(ctx, t, db)
+			numberOfSegments := 5
+
+			committed := metabasetest.RandObjectStream()
+			expected := make([]metabase.LoopSegmentEntry, numberOfSegments)
+			expectedRaw := make([]metabase.RawSegment, numberOfSegments)
+			for i := 0; i < numberOfSegments; i++ {
+				rawSegment := metabasetest.DefaultRawSegment(committed, metabase.SegmentPosition{0, uint32(i)})
+				expected[i] = metabase.LoopSegmentEntry{
+					StreamID: rawSegment.StreamID,
+					Position: rawSegment.Position,
+					RootPieceID: rawSegment.RootPieceID,
+					Pieces: rawSegment.Pieces,
+					CreatedAt: rawSegment.CreatedAt,
+					EncryptedSize: rawSegment.EncryptedSize,
+					PlainSize: rawSegment.PlainSize,
+					PlainOffset: rawSegment.PlainOffset,
+					Redundancy: rawSegment.Redundancy,
+				}
+				expectedRaw[i] = rawSegment
+			}
+
+			err := db.TestingBatchInsertSegments(ctx, expectedRaw)
+			require.NoError(t, err)
+
+			{ // fewer segments than limit
+				limit := 10
+				metabasetest.IterateLoopSegments{
+					Opts: metabase.IterateLoopSegments{
+						BatchSize: limit,
+					},
+					Result: expected,
+				}.Check(ctx, t, db)
+
+				metabasetest.IterateLoopSegments{
+					Opts: metabase.IterateLoopSegments{
+						BatchSize: limit,
+						AsOfSystemTime: time.Now(),
+					},
+					Result: expected,
+				}.Check(ctx, t, db)
+			}
+
+			{ // more segments than limit
+				limit := 3
+				metabasetest.IterateLoopSegments{
+					Opts: metabase.IterateLoopSegments{
+						BatchSize: limit,
+					},
+					Result: expected,
+				}.Check(ctx, t, db)
+
+				metabasetest.IterateLoopSegments{
+					Opts: metabase.IterateLoopSegments{
+						BatchSize: limit,
+						AsOfSystemTime: time.Now(),
+					},
+					Result: expected,
+				}.Check(ctx, t, db)
+			}
+
+			metabasetest.Verify{
+				Segments: expectedRaw,
+			}.Check(ctx, t, db)
+		})
+	}, metabasetest.WithSpanner())
+}
+
 func loopObjectEntryFromRaw(m metabase.RawObject) metabase.LoopObjectEntry {
 	return metabase.LoopObjectEntry{
 		ObjectStream: m.ObjectStream,
diff --git a/satellite/metabase/raw.go b/satellite/metabase/raw.go
index cfc70d2816a6..0ddf8c60c040 100644
--- a/satellite/metabase/raw.go
+++ b/satellite/metabase/raw.go
@@ -490,7 +490,7 @@ func (p *PostgresAdapter) TestingBatchInsertSegments(ctx context.Context, aliasC
 		aliases = aliases[:len(batch)]
 		for i, segment := range batch {
-			aliases[i], err = p.aliasCache.EnsurePiecesToAliases(ctx, segment.Pieces)
+			aliases[i], err = aliasCache.EnsurePiecesToAliases(ctx, segment.Pieces)
 			if err != nil {
 				return err
 			}
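
For reviewers: below is a minimal usage sketch, not part of the patch, of the iterator callback that IterateLoopSegments now routes through the Adapter interface. It relies only on names visible in this diff (metabase.IterateLoopSegments, LoopSegmentsIterator, LoopSegmentEntry, BatchSize); the iterateAllSegments helper, the process consumer, and the batch size value are hypothetical stand-ins.

package example

import (
	"context"

	"storj.io/storj/satellite/metabase"
)

// process is a hypothetical consumer of a single segment entry.
func process(entry metabase.LoopSegmentEntry) {
	// entry.Pieces arrives already resolved from entry.AliasPieces via the
	// shared NodeAliasCache that DB now hands to whichever adapter it chose.
	_ = entry.Pieces
}

// iterateAllSegments shows the callback shape; db is assumed to come from metabase.Open.
func iterateAllSegments(ctx context.Context, db *metabase.DB) error {
	return db.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
		BatchSize: 2500, // rows fetched per page; each page is one LIMIT-ed query
	}, func(ctx context.Context, it metabase.LoopSegmentsIterator) error {
		var entry metabase.LoopSegmentEntry
		for it.Next(ctx, &entry) {
			process(entry)
		}
		return nil
	})
}

Design note: moving aliasCache from a PostgresAdapter field to an IterateLoopSegments (and TestingBatchInsertSegments) parameter keeps the adapters free of per-adapter state, so the Postgres, Cockroach, and Spanner implementations all share the single cache owned by DB.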