satellite/metabase: spanner segments loop implementation
Change-Id: I5023e29dfca99008c72aeb5e8818ac5b6840fbe4
mniewrzal authored and Storj Robot committed Apr 19, 2024
1 parent ed6bf03 commit 8dacb7b
Showing 5 changed files with 293 additions and 95 deletions.
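For orientation, a minimal usage sketch of the segments-loop API this commit touches (editorial addition, not part of the commit; based only on the signatures visible in the diff below, with an illustrative batch size and helper name):

package example

import (
	"context"

	"storj.io/storj/satellite/metabase"
)

// iterateAllSegments walks every segment in the metabase in batches.
// The callback receives an iterator backed by whichever adapter
// (Postgres, Cockroach, or Spanner) the metabase was opened with.
func iterateAllSegments(ctx context.Context, db *metabase.DB) error {
	return db.IterateLoopSegments(ctx, metabase.IterateLoopSegments{
		BatchSize: 2500, // illustrative value
	}, func(ctx context.Context, segments metabase.LoopSegmentsIterator) error {
		var entry metabase.LoopSegmentEntry
		for segments.Next(ctx, &entry) {
			// process entry.StreamID, entry.Position, entry.Pieces, ...
		}
		return nil
	})
}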
8 changes: 5 additions & 3 deletions satellite/metabase/adapter.go
@@ -8,6 +8,7 @@ import (

"go.uber.org/zap"

"storj.io/common/dbutil"
"storj.io/common/tagsql"
)

@@ -16,6 +17,7 @@
type Adapter interface {
BeginObjectNextVersion(context.Context, BeginObjectNextVersion, *Object) error
GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error
IterateLoopSegments(ctx context.Context, aliasCache *NodeAliasCache, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) error
TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error

EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) error
@@ -28,9 +30,9 @@

// PostgresAdapter uses Postgres related SQL queries.
type PostgresAdapter struct {
-log *zap.Logger
-db tagsql.DB
-aliasCache *NodeAliasCache
+log *zap.Logger
+db tagsql.DB
+impl dbutil.Implementation
}

var _ Adapter = &PostgresAdapter{}
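A note on the var _ Adapter = &PostgresAdapter{} line above: it is Go's compile-time interface check, so adding IterateLoopSegments to the Adapter interface forces every adapter touched by this commit to implement the new method before the package compiles. The idiom in isolation (illustrative types):

package example

type Iterator interface{ Next() bool }

type stub struct{}

func (stub) Next() bool { return false }

// This declaration fails to compile if stub ever stops satisfying Iterator.
var _ Iterator = stub{}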
12 changes: 6 additions & 6 deletions satellite/metabase/db.go
@@ -98,16 +98,16 @@ func Open(ctx context.Context, log *zap.Logger, connstr string, config Config, e
switch impl {
case dbutil.Postgres:
db.adapters = append(db.adapters, &PostgresAdapter{
-log: log,
-db: rawdb,
-aliasCache: db.aliasCache,
+log: log,
+db: rawdb,
+impl: impl,
})
case dbutil.Cockroach:
db.adapters = append(db.adapters, &CockroachAdapter{
PostgresAdapter{
-log: log,
-db: rawdb,
-aliasCache: db.aliasCache,
+log: log,
+db: rawdb,
+impl: impl,
},
})
default:
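The db.go change above swaps the adapters' aliasCache field for a dbutil.Implementation, letting the shared Postgres/Cockroach query builder emit dialect-specific SQL. A hedged sketch of the idea, assuming AsOfSystemInterval renders a CockroachDB "AS OF SYSTEM TIME" clause and an empty string on plain Postgres (behavior inferred from its use in loop.go below):

package example

import (
	"time"

	"storj.io/common/dbutil"
)

// segmentsQuery builds a batch query that is valid on both backends:
// on CockroachDB the interval expands to an AS OF SYSTEM TIME clause,
// on Postgres it expands to nothing.
func segmentsQuery(impl dbutil.Implementation, interval time.Duration) string {
	return `SELECT stream_id, position FROM segments ` +
		impl.AsOfSystemInterval(interval) +
		` WHERE (stream_id, position) > ($1, $2) ORDER BY (stream_id, position) LIMIT $3`
}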
226 changes: 208 additions & 18 deletions satellite/metabase/loop.go
@@ -5,10 +5,13 @@ package metabase

import (
"context"
"errors"
"math"
"time"

"cloud.google.com/go/spanner"
"github.com/zeebo/errs"
"google.golang.org/api/iterator"

"storj.io/common/storj"
"storj.io/common/tagsql"
@@ -63,7 +66,8 @@ func (db *DB) IterateLoopObjects(ctx context.Context, opts IterateLoopObjects, f
}

it := &loopIterator{
-db: db,
+db: db,
+aliasCache: db.aliasCache,

batchSize: opts.BatchSize,

@@ -103,7 +107,8 @@ type loopIterator struct {
cursor loopIterateCursor

// failErr is set when either scan or next query fails during iteration.
-failErr error
+failErr error
+aliasCache *NodeAliasCache
}

type loopIterateCursor struct {
@@ -257,8 +262,14 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,

loopIteratorBatchSizeLimit.Ensure(&opts.BatchSize)

-it := &loopSegmentIterator{
-db: db,
+return db.ChooseAdapter(uuid.UUID{}).IterateLoopSegments(ctx, db.aliasCache, opts, fn)
+}
+
+// IterateLoopSegments implements Adapter.
+func (p *PostgresAdapter) IterateLoopSegments(ctx context.Context, aliasCache *NodeAliasCache, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) (err error) {
+it := &postgresLoopSegmentIterator{
+db: p,
+aliasCache: aliasCache,

asOfSystemTime: opts.AsOfSystemTime,
asOfSystemInterval: opts.AsOfSystemInterval,
@@ -295,9 +306,16 @@ func (db *DB) IterateLoopSegments(ctx context.Context, opts IterateLoopSegments,
return fn(ctx, it)
}

-// loopSegmentIterator enables iteration of all segments in metabase.
-type loopSegmentIterator struct {
-db *DB
+type loopSegmentIteratorCursor struct {
+StartStreamID uuid.UUID
+StartPosition SegmentPosition
+EndStreamID uuid.UUID
+}
+
+// postgresLoopSegmentIterator enables iteration of all segments in metabase.
+type postgresLoopSegmentIterator struct {
+db *PostgresAdapter
+aliasCache *NodeAliasCache

batchSize int
// batchPieces are reused between result pages to reduce memory consumption
@@ -314,14 +332,8 @@ type loopSegmentIterator struct {
failErr error
}

-type loopSegmentIteratorCursor struct {
-StartStreamID uuid.UUID
-StartPosition SegmentPosition
-EndStreamID uuid.UUID
-}

// Next returns true if there was another item and copies it into item.
-func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
+func (it *postgresLoopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
next := it.curRows.Next()
if !next {
if it.curIndex < it.batchSize {
@@ -363,7 +375,7 @@ func (it *loopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry)
return true
}

-func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
+func (it *postgresLoopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows, err error) {
defer mon.Task()(&ctx)(&err)

return it.db.db.QueryContext(ctx, `
@@ -377,7 +389,7 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows,
remote_alias_pieces,
placement
FROM segments
-`+it.db.asOfTime(it.asOfSystemTime, it.asOfSystemInterval)+`
+`+it.db.impl.AsOfSystemInterval(it.asOfSystemInterval)+`
WHERE
(stream_id, position) > ($1, $2) AND stream_id <= $4
ORDER BY (stream_id, position) ASC
@@ -388,7 +400,7 @@ func (it *loopSegmentIterator) doNextQuery(ctx context.Context) (_ tagsql.Rows,
}

// scanItem scans doNextQuery results into LoopSegmentEntry.
-func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) error {
+func (it *postgresLoopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) error {
err := it.curRows.Scan(
&item.StreamID, &item.Position,
&item.CreatedAt, &item.ExpiresAt, &item.RepairedAt,
@@ -410,10 +422,188 @@ func (it *loopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) error {
it.batchPieces[it.curIndex] = it.batchPieces[it.curIndex][:len(item.AliasPieces)]
}

-item.Pieces, err = it.db.aliasCache.convertAliasesToPieces(ctx, item.AliasPieces, it.batchPieces[it.curIndex])
+item.Pieces, err = it.aliasCache.convertAliasesToPieces(ctx, item.AliasPieces, it.batchPieces[it.curIndex])
if err != nil {
return Error.New("failed to convert aliases to pieces: %w", err)
}

return nil
}
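The batchPieces bookkeeping above (and the TODO in the Spanner iterator below) is a buffer-reuse pattern: keep one Pieces buffer per slot of the batch and re-slice it instead of allocating a fresh slice for every scanned row. A hedged sketch with illustrative names:

package example

import "storj.io/storj/satellite/metabase"

// reusablePieces returns a buffer of exactly need elements, reusing the
// slot's previous allocation whenever its capacity suffices.
func reusablePieces(buffers []metabase.Pieces, slot, need int) metabase.Pieces {
	if cap(buffers[slot]) < need {
		buffers[slot] = make(metabase.Pieces, need)
	}
	buffers[slot] = buffers[slot][:need]
	return buffers[slot]
}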

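// spannerLoopSegmentIterator enables iteration of all segments in metabase, backed by Cloud Spanner.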
type spannerLoopSegmentIterator struct {
db *SpannerAdapter

batchSize int
// TODO(spanner): it would be nice to have this at some point.
// batchPieces are reused between result pages to reduce memory consumption
// batchPieces []Pieces

curIndex int
curRows *spanner.RowIterator
curRow *spanner.Row
cursor loopSegmentIteratorCursor

// failErr is set when either scan or next query fails during iteration.
failErr error
aliasCache *NodeAliasCache
}

// Next returns true if there was another item and copies it into item.
func (it *spannerLoopSegmentIterator) Next(ctx context.Context, item *LoopSegmentEntry) bool {
var err error
it.curRow, err = it.curRows.Next()
next := !errors.Is(err, iterator.Done)
if err != nil && next {
it.failErr = errs.Combine(it.failErr, err)
return false
}

if !next {
if it.curIndex < it.batchSize {
return false
}

rows := it.doNextQuery(ctx)

it.curRows.Stop()

it.curRows = rows
it.curIndex = 0

it.curRow, err = it.curRows.Next()
if err != nil {
if !errors.Is(err, iterator.Done) {
it.failErr = errs.Combine(it.failErr, err)
}
return false
}
}

err = it.scanItem(ctx, item)
if err != nil {
it.failErr = errs.Combine(it.failErr, err)
return false
}

it.curIndex++
it.cursor.StartStreamID = item.StreamID
it.cursor.StartPosition = item.Position
return true
}

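// doNextQuery issues the query for the next batch of segments. Spanner SQL has no
// composite (tuple) comparison, so the keyset condition (stream_id, position) > (cursor)
// is written out explicitly in the WHERE clause.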
func (it *spannerLoopSegmentIterator) doNextQuery(ctx context.Context) (_ *spanner.RowIterator) {
stmt := spanner.Statement{
SQL: `
SELECT
stream_id, position,
created_at, expires_at, repaired_at,
root_piece_id,
encrypted_size,
plain_offset, plain_size,
redundancy,
remote_alias_pieces,
placement
FROM segments
WHERE
(stream_id > @streamid OR (stream_id = @streamid AND position > @position)) AND stream_id <= @endstreamid
ORDER BY stream_id ASC, position ASC
LIMIT @batchsize
`,
Params: map[string]interface{}{
"streamid": it.cursor.StartStreamID.Bytes(),
"position": int64(it.cursor.StartPosition.Encode()),
"endstreamid": it.cursor.EndStreamID.Bytes(),
"batchsize": it.batchSize,
}}
return it.db.client.Single().Query(ctx, stmt)
}

// IterateLoopSegments implements Adapter.
func (s *SpannerAdapter) IterateLoopSegments(ctx context.Context, aliasCache *NodeAliasCache, opts IterateLoopSegments, fn func(context.Context, LoopSegmentsIterator) error) (err error) {
it := &spannerLoopSegmentIterator{
db: s,
aliasCache: aliasCache,

batchSize: opts.BatchSize,

curIndex: 0,
cursor: loopSegmentIteratorCursor{
StartStreamID: opts.StartStreamID,
EndStreamID: opts.EndStreamID,
},
}

if !opts.StartStreamID.IsZero() {
// uses MaxInt32 instead of MaxUint32 because the encoded position must fit in the db's signed 64-bit (int8) column.
it.cursor.StartPosition = SegmentPosition{math.MaxInt32, math.MaxInt32}
}
if it.cursor.EndStreamID.IsZero() {
it.cursor.EndStreamID = uuid.Max()
}

it.curRows = it.doNextQuery(ctx)

defer func() {
it.curRows.Stop()
}()

return fn(ctx, it)
}
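For context on the MaxInt32 cap above, a hedged sketch of the position encoding the cursor parameters rely on (mirroring what SegmentPosition.Encode is understood to do; the helpers below are illustrative, not the metabase implementation):

package example

// A segment position packs a part number and an index into one integer:
// the part in the high 32 bits, the index in the low 32 bits. Capping the
// part at math.MaxInt32 keeps the encoded value inside the positive range
// of the database's signed 64-bit column.
func encodePosition(part, index uint32) uint64 {
	return uint64(part)<<32 | uint64(index)
}

func decodePosition(encoded uint64) (part, index uint32) {
	return uint32(encoded >> 32), uint32(encoded)
}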

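// scanItem scans the current Spanner row into LoopSegmentEntry.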
func (it *spannerLoopSegmentIterator) scanItem(ctx context.Context, item *LoopSegmentEntry) (err error) {
var position int64
var createdAt time.Time
var repairedAt, expiresAt spanner.NullTime
var encryptedSize, plainOffset, plainSize, redundancy, placement int64
var streamID, rootPieceID, remoteAliasPieces []byte
if err := it.curRow.Columns(&streamID, &position,
&createdAt, &repairedAt, &expiresAt,
&rootPieceID,
&encryptedSize, &plainOffset, &plainSize,
&redundancy,
&remoteAliasPieces,
&placement,
); err != nil {
return Error.New("failed to scan segment: %w", err)
}

item.StreamID, err = uuid.FromBytes(streamID)
if err != nil {
return Error.New("failed to scan segment: %w", err)
}
item.Position = SegmentPositionFromEncoded(uint64(position))
item.CreatedAt = createdAt
item.RootPieceID, err = storj.PieceIDFromBytes(rootPieceID)
if err != nil {
return Error.New("failed to scan segment: %w", err)
}
if repairedAt.Valid {
item.RepairedAt = &repairedAt.Time
}
if expiresAt.Valid {
item.ExpiresAt = &expiresAt.Time
}
item.EncryptedSize = int32(encryptedSize)
item.PlainOffset = plainOffset
item.PlainSize = int32(plainSize)
rs := redundancyScheme{RedundancyScheme: &storj.RedundancyScheme{}}
if err := rs.Scan(redundancy); err != nil {
return Error.New("failed to scan segment: %w", err)
}
item.Redundancy = *rs.RedundancyScheme

aliasPieces := AliasPieces{}
err = aliasPieces.SetBytes(remoteAliasPieces)
if err != nil {
return Error.New("failed to scan segment: %w", err)
}
item.AliasPieces = aliasPieces

item.Placement = storj.PlacementConstraint(placement)
item.Pieces, err = it.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Error.New("failed to scan segment: %w", err)
}

return nil
}
