satellite/metabase: add TestingBatchInsertSegments
Convenience method to insert many segments into the DB.

Change-Id: Ibce3110b68a5437a878d7b5f23f95a97ae9bde0c
mniewrzal authored and Storj Robot committed Apr 9, 2024
1 parent daf8f74 commit 6073ca7
Showing 7 changed files with 189 additions and 39 deletions.
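For context, here is a minimal sketch of how the new helper can be used from a metabase test. The test name, batch size, and import paths (assuming the standard storj.io/storj module layout) are illustrative; metabasetest.Run, RandObjectStream, DefaultRawSegment, and TestingBatchInsertSegments are the helpers this change relies on, and the actual round-trip test added by the commit is in satellite/metabase/raw_test.go below.

package metabase_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"storj.io/common/testcontext"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/metabase/metabasetest"
)

// Hypothetical usage sketch: build many segments for one stream and insert
// them with a single call instead of committing objects segment by segment.
func TestInsertManySegments(t *testing.T) {
	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
		obj := metabasetest.RandObjectStream()

		segments := make([]metabase.RawSegment, 1000)
		for i := range segments {
			segments[i] = metabasetest.DefaultRawSegment(obj, metabase.SegmentPosition{
				Part:  0,
				Index: uint32(i),
			})
		}

		// The helper does no validation of the segments; it bulk-inserts them as given.
		require.NoError(t, db.TestingBatchInsertSegments(ctx, segments))
	})
}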
7 changes: 6 additions & 1 deletion satellite/metabase/adapter.go
@@ -6,6 +6,8 @@ package metabase
import (
"context"

"go.uber.org/zap"

"storj.io/common/tagsql"
)

@@ -15,11 +17,14 @@ type Adapter interface {
BeginObjectNextVersion(context.Context, BeginObjectNextVersion, *Object) error
GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error
TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error
TestingBatchInsertSegments(ctx context.Context, segments []RawSegment) (err error)
}

// PostgresAdapter uses Cockroach related SQL queries.
type PostgresAdapter struct {
db tagsql.DB
log *zap.Logger
db tagsql.DB
aliasCache *NodeAliasCache
}

var _ Adapter = &PostgresAdapter{}
5 changes: 5 additions & 0 deletions satellite/metabase/aliascache.go
@@ -199,6 +199,11 @@ func (cache *NodeAliasCache) EnsurePiecesToAliases(ctx context.Context, pieces P
return aliasPieces, nil
}

// reset resets the cache to its initial state.
func (cache *NodeAliasCache) reset() {
cache.latest.Store(NewNodeAliasMap(nil))
}

// ConvertAliasesToPieces converts alias pieces to pieces.
func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) {
return cache.convertAliasesToPieces(ctx, aliasPieces, make(Pieces, len(aliasPieces)))
10 changes: 7 additions & 3 deletions satellite/metabase/db.go
@@ -98,12 +98,16 @@ func Open(ctx context.Context, log *zap.Logger, connstr string, config Config) (
switch impl {
case dbutil.Postgres:
db.adapters = append(db.adapters, &PostgresAdapter{
db: rawdb,
log: log,
db: rawdb,
aliasCache: db.aliasCache,
})
case dbutil.Cockroach:
db.adapters = append(db.adapters, &CockroachAdapter{
PostgresAdapter{
db: rawdb,
log: log,
db: rawdb,
aliasCache: db.aliasCache,
},
})
default:
@@ -158,7 +162,7 @@ func (db *DB) DestroyTables(ctx context.Context) error {
DROP TABLE IF EXISTS metabase_versions;
DROP SEQUENCE IF EXISTS node_alias_seq;
`)
db.aliasCache = NewNodeAliasCache(db)
db.aliasCache.reset()
return Error.Wrap(err)
}

48 changes: 15 additions & 33 deletions satellite/metabase/loop_test.go
@@ -499,42 +499,27 @@ func TestIterateLoopSegments(t *testing.T) {
numberOfSegments := 5

committed := metabasetest.RandObjectStream()
expectedObject := metabasetest.CreateObject(ctx, t, db, committed, byte(numberOfSegments))
expected := make([]metabase.LoopSegmentEntry, numberOfSegments)
expectedRaw := make([]metabase.RawSegment, numberOfSegments)
for i := 0; i < numberOfSegments; i++ {
entry := metabase.LoopSegmentEntry{
StreamID: committed.StreamID,
Position: metabase.SegmentPosition{0, uint32(i)},
RootPieceID: storj.PieceID{1},
Pieces: metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
CreatedAt: now,
EncryptedSize: 1024,
PlainSize: 512,
PlainOffset: int64(i) * 512,
Redundancy: metabasetest.DefaultRedundancy,
AliasPieces: metabase.AliasPieces([]metabase.AliasPiece{
{Alias: 1},
}),
}
expected[i] = entry
expectedRaw[i] = metabase.RawSegment{
StreamID: entry.StreamID,
Position: entry.Position,
RootPieceID: entry.RootPieceID,
Pieces: entry.Pieces,
CreatedAt: entry.CreatedAt,
EncryptedSize: entry.EncryptedSize,
PlainSize: entry.PlainSize,
PlainOffset: entry.PlainOffset,
Redundancy: entry.Redundancy,

EncryptedKey: []byte{3},
EncryptedKeyNonce: []byte{4},
EncryptedETag: []byte{5},
rawSegment := metabasetest.DefaultRawSegment(committed, metabase.SegmentPosition{0, uint32(i)})
expected[i] = metabase.LoopSegmentEntry{
StreamID: rawSegment.StreamID,
Position: rawSegment.Position,
RootPieceID: rawSegment.RootPieceID,
Pieces: rawSegment.Pieces,
CreatedAt: rawSegment.CreatedAt,
EncryptedSize: rawSegment.EncryptedSize,
PlainSize: rawSegment.PlainSize,
PlainOffset: rawSegment.PlainOffset,
Redundancy: rawSegment.Redundancy,
}
expectedRaw[i] = rawSegment
}

err := db.TestingBatchInsertSegments(ctx, expectedRaw)
require.NoError(t, err)

{ // less segments than limit
limit := 10
metabasetest.IterateLoopSegments{
@@ -572,9 +557,6 @@ }
}

metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(expectedObject),
},
Segments: expectedRaw,
}.Check(ctx, t, db)
})
3 changes: 2 additions & 1 deletion satellite/metabase/metabasetest/test.go
@@ -414,7 +414,8 @@ func (step IterateLoopSegments) Check(ctx *testcontext.Context, t testing.TB, db
}
return bytes.Compare(step.Result[i].StreamID[:], step.Result[j].StreamID[:]) < 0
})
diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
// ignore AliasPieces because we won't always be able to predict node aliases for tests
diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.IgnoreFields(metabase.LoopSegmentEntry{}, "AliasPieces"))
require.Zero(t, diff)
}

129 changes: 128 additions & 1 deletion satellite/metabase/raw.go
@@ -109,7 +109,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
WITH ignore_full_scan_for_test AS (SELECT 1) DELETE FROM node_aliases;
WITH ignore_full_scan_for_test AS (SELECT 1) SELECT setval('node_alias_seq', 1, false);
`)
db.aliasCache = NewNodeAliasCache(db)
db.aliasCache.reset()
return Error.Wrap(err)
}

@@ -349,3 +349,130 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
}
return segs, nil
}

// TestingBatchInsertSegments batch inserts segments for testing.
// This implementation does no verification on the correctness of segments.
func (db *DB) TestingBatchInsertSegments(ctx context.Context, segments []RawSegment) (err error) {
return db.ChooseAdapter(uuid.UUID{}).TestingBatchInsertSegments(ctx, segments)
}

// TestingBatchInsertSegments implements postgres adapter.
func (p *PostgresAdapter) TestingBatchInsertSegments(ctx context.Context, segments []RawSegment) (err error) {
const maxRowsPerCopy = 250000

minLength := len(segments)
if maxRowsPerCopy < minLength {
minLength = maxRowsPerCopy
}

aliases := make([]AliasPieces, 0, minLength)
return Error.Wrap(pgxutil.Conn(ctx, p.db,
func(conn *pgx.Conn) error {
progress, total := 0, len(segments)
for len(segments) > 0 {
batch := segments
if len(batch) > maxRowsPerCopy {
batch = batch[:maxRowsPerCopy]
}
segments = segments[len(batch):]

aliases = aliases[:len(batch)]
for i, segment := range batch {
aliases[i], err = p.aliasCache.EnsurePiecesToAliases(ctx, segment.Pieces)
if err != nil {
return err
}
}

source := newCopyFromRawSegments(batch, aliases)
_, err := conn.CopyFrom(ctx, pgx.Identifier{"segments"}, source.Columns(), source)
if err != nil {
return err
}

progress += len(batch)
p.log.Info("batch insert", zap.Int("progress", progress), zap.Int("total", total))
}
return err
}))
}

type copyFromRawSegments struct {
idx int
rows []RawSegment
aliases []AliasPieces
row []any
}

func newCopyFromRawSegments(rows []RawSegment, aliases []AliasPieces) *copyFromRawSegments {
return &copyFromRawSegments{
rows: rows,
aliases: aliases,
idx: -1,
}
}

func (ctr *copyFromRawSegments) Next() bool {
ctr.idx++
return ctr.idx < len(ctr.rows)
}

func (ctr *copyFromRawSegments) Columns() []string {
return []string{
"stream_id",
"position",

"created_at",
"repaired_at",
"expires_at",

"root_piece_id",
"encrypted_key_nonce",
"encrypted_key",
"encrypted_etag",

"encrypted_size",
"plain_size",
"plain_offset",

"redundancy",
"inline_data",
"remote_alias_pieces",
"placement",
}
}

func (ctr *copyFromRawSegments) Values() ([]any, error) {
obj := &ctr.rows[ctr.idx]
aliases := &ctr.aliases[ctr.idx]

aliasPieces, err := aliases.Bytes()
if err != nil {
return nil, err
}
ctr.row = append(ctr.row[:0],
obj.StreamID.Bytes(),
obj.Position.Encode(),

obj.CreatedAt,
obj.RepairedAt,
obj.ExpiresAt,

obj.RootPieceID.Bytes(),
obj.EncryptedKeyNonce,
obj.EncryptedKey,
obj.EncryptedETag,

obj.EncryptedSize,
obj.PlainSize,
obj.PlainOffset,

redundancyScheme{&obj.Redundancy},
obj.InlineData,
aliasPieces,
obj.Placement,
)
return ctr.row, nil
}

func (ctr *copyFromRawSegments) Err() error { return nil }
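A note on the copy source defined above: copyFromRawSegments feeds conn.CopyFrom, which streams rows into the segments table via the COPY protocol in chunks of at most maxRowsPerCopy rows, and its Next/Values/Err methods are exactly what pgx expects from a row source. As an illustrative sketch (assuming the same pgx package already imported by raw.go), the contract can be stated as a compile-time assertion:

// Compile-time assertion (illustrative, not part of this commit): the copy
// source must implement pgx.CopyFromSource, the interface consumed by
// conn.CopyFrom in TestingBatchInsertSegments above.
var _ pgx.CopyFromSource = (*copyFromRawSegments)(nil)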
26 changes: 26 additions & 0 deletions satellite/metabase/raw_test.go
@@ -5,7 +5,10 @@ package metabase_test

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"

"storj.io/common/testcontext"
@@ -44,3 +47,26 @@ func TestTestingBatchInsertObjects_RoundTrip(t *testing.T) {
require.Equal(t, validObjects, insertedObjects)
})
}

func TestTestingBatchInsertSegments(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()

rawSegments := make([]metabase.RawSegment, 10)
for i := range rawSegments {
rawSegments[i] = metabasetest.DefaultRawSegment(obj, metabase.SegmentPosition{
Part: 1,
Index: uint32(i) + 1,
})
}

err := db.TestingBatchInsertSegments(ctx, rawSegments)
require.NoError(t, err)

insertedSegments, err := db.TestingAllSegments(ctx)
require.NoError(t, err)

require.Zero(t, cmp.Diff(rawSegments, metabasetest.SegmentsToRaw(insertedSegments),
cmpopts.EquateApproxTime(1*time.Second)))
})
}
