diff --git a/satellite/metabase/adapter.go b/satellite/metabase/adapter.go
index 8d2c0100597e..f4fd6db6ba2d 100644
--- a/satellite/metabase/adapter.go
+++ b/satellite/metabase/adapter.go
@@ -6,6 +6,8 @@ package metabase
 import (
 	"context"
 
+	"go.uber.org/zap"
+
 	"storj.io/common/tagsql"
 )
 
@@ -15,11 +17,14 @@ type Adapter interface {
 	BeginObjectNextVersion(context.Context, BeginObjectNextVersion, *Object) error
 	GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error
 	TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error
+	TestingBatchInsertSegments(ctx context.Context, segments []RawSegment) (err error)
 }
 
 // PostgresAdapter uses Cockroach related SQL queries.
 type PostgresAdapter struct {
-	db tagsql.DB
+	log        *zap.Logger
+	db         tagsql.DB
+	aliasCache *NodeAliasCache
 }
 
 var _ Adapter = &PostgresAdapter{}
diff --git a/satellite/metabase/aliascache.go b/satellite/metabase/aliascache.go
index 67b7d4680ef6..c54f8c30fa2e 100644
--- a/satellite/metabase/aliascache.go
+++ b/satellite/metabase/aliascache.go
@@ -199,6 +199,11 @@ func (cache *NodeAliasCache) EnsurePiecesToAliases(ctx context.Context, pieces P
 	return aliasPieces, nil
 }
 
+// reset resets the cache to its initial state.
+func (cache *NodeAliasCache) reset() {
+	cache.latest.Store(NewNodeAliasMap(nil))
+}
+
 // ConvertAliasesToPieces converts alias pieces to pieces.
 func (cache *NodeAliasCache) ConvertAliasesToPieces(ctx context.Context, aliasPieces AliasPieces) (_ Pieces, err error) {
 	return cache.convertAliasesToPieces(ctx, aliasPieces, make(Pieces, len(aliasPieces)))
diff --git a/satellite/metabase/db.go b/satellite/metabase/db.go
index 4a7a9fe2d46f..1b64c97bfe7e 100644
--- a/satellite/metabase/db.go
+++ b/satellite/metabase/db.go
@@ -98,12 +98,16 @@ func Open(ctx context.Context, log *zap.Logger, connstr string, config Config) (
 	switch impl {
 	case dbutil.Postgres:
 		db.adapters = append(db.adapters, &PostgresAdapter{
-			db: rawdb,
+			log:        log,
+			db:         rawdb,
+			aliasCache: db.aliasCache,
 		})
 	case dbutil.Cockroach:
 		db.adapters = append(db.adapters, &CockroachAdapter{
 			PostgresAdapter{
-				db: rawdb,
+				log:        log,
+				db:         rawdb,
+				aliasCache: db.aliasCache,
 			},
 		})
 	default:
@@ -158,7 +162,7 @@ func (db *DB) DestroyTables(ctx context.Context) error {
 		DROP TABLE IF EXISTS metabase_versions;
 		DROP SEQUENCE IF EXISTS node_alias_seq;
 	`)
-	db.aliasCache = NewNodeAliasCache(db)
+	db.aliasCache.reset()
 	return Error.Wrap(err)
 }
 
diff --git a/satellite/metabase/loop_test.go b/satellite/metabase/loop_test.go
index 33eb84ed907a..bcdb10c35ade 100644
--- a/satellite/metabase/loop_test.go
+++ b/satellite/metabase/loop_test.go
@@ -499,42 +499,27 @@ func TestIterateLoopSegments(t *testing.T) {
 		numberOfSegments := 5
 		committed := metabasetest.RandObjectStream()
-		expectedObject := metabasetest.CreateObject(ctx, t, db, committed, byte(numberOfSegments))
 
 		expected := make([]metabase.LoopSegmentEntry, numberOfSegments)
 		expectedRaw := make([]metabase.RawSegment, numberOfSegments)
 		for i := 0; i < numberOfSegments; i++ {
-			entry := metabase.LoopSegmentEntry{
-				StreamID:      committed.StreamID,
-				Position:      metabase.SegmentPosition{0, uint32(i)},
-				RootPieceID:   storj.PieceID{1},
-				Pieces:        metabase.Pieces{{Number: 0, StorageNode: storj.NodeID{2}}},
-				CreatedAt:     now,
-				EncryptedSize: 1024,
-				PlainSize:     512,
-				PlainOffset:   int64(i) * 512,
-				Redundancy:    metabasetest.DefaultRedundancy,
-				AliasPieces: metabase.AliasPieces([]metabase.AliasPiece{
-					{Alias: 1},
-				}),
-			}
-			expected[i] = entry
-			expectedRaw[i] = metabase.RawSegment{
-				StreamID:      entry.StreamID,
-				Position:      entry.Position,
-				RootPieceID:   entry.RootPieceID,
-				Pieces:        entry.Pieces,
-				CreatedAt:     entry.CreatedAt,
-				EncryptedSize: entry.EncryptedSize,
-				PlainSize:     entry.PlainSize,
-				PlainOffset:   entry.PlainOffset,
-				Redundancy:    entry.Redundancy,
-
-				EncryptedKey:      []byte{3},
-				EncryptedKeyNonce: []byte{4},
-				EncryptedETag:     []byte{5},
+			rawSegment := metabasetest.DefaultRawSegment(committed, metabase.SegmentPosition{0, uint32(i)})
+			expected[i] = metabase.LoopSegmentEntry{
+				StreamID:      rawSegment.StreamID,
+				Position:      rawSegment.Position,
+				RootPieceID:   rawSegment.RootPieceID,
+				Pieces:        rawSegment.Pieces,
+				CreatedAt:     rawSegment.CreatedAt,
+				EncryptedSize: rawSegment.EncryptedSize,
+				PlainSize:     rawSegment.PlainSize,
+				PlainOffset:   rawSegment.PlainOffset,
+				Redundancy:    rawSegment.Redundancy,
 			}
+			expectedRaw[i] = rawSegment
 		}
 
+		err := db.TestingBatchInsertSegments(ctx, expectedRaw)
+		require.NoError(t, err)
+
 		{ // less segments than limit
 			limit := 10
 			metabasetest.IterateLoopSegments{
@@ -572,9 +557,6 @@
 		}
 
 		metabasetest.Verify{
-			Objects: []metabase.RawObject{
-				metabase.RawObject(expectedObject),
-			},
 			Segments: expectedRaw,
 		}.Check(ctx, t, db)
 	})
diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go
index 7fbf96062a88..994f4feb27f8 100644
--- a/satellite/metabase/metabasetest/test.go
+++ b/satellite/metabase/metabasetest/test.go
@@ -414,7 +414,8 @@ func (step IterateLoopSegments) Check(ctx *testcontext.Context, t testing.TB, db
 		}
 		return bytes.Compare(step.Result[i].StreamID[:], step.Result[j].StreamID[:]) < 0
 	})
-	diff := cmp.Diff(step.Result, result, DefaultTimeDiff())
+	// ignore AliasPieces because we won't always be able to predict node aliases in tests
+	diff := cmp.Diff(step.Result, result, DefaultTimeDiff(), cmpopts.IgnoreFields(metabase.LoopSegmentEntry{}, "AliasPieces"))
 	require.Zero(t, diff)
 }
 
diff --git a/satellite/metabase/raw.go b/satellite/metabase/raw.go
index 3cc116babadf..5ee92c8549e5 100644
--- a/satellite/metabase/raw.go
+++ b/satellite/metabase/raw.go
@@ -109,7 +109,7 @@ func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
 	WITH ignore_full_scan_for_test AS (SELECT 1) DELETE FROM node_aliases;
 	WITH ignore_full_scan_for_test AS (SELECT 1) SELECT setval('node_alias_seq', 1, false);
 	`)
-	db.aliasCache = NewNodeAliasCache(db)
+	db.aliasCache.reset()
 	return Error.Wrap(err)
 }
 
@@ -349,3 +349,130 @@ func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err er
 	}
 	return segs, nil
 }
+
+// TestingBatchInsertSegments batch inserts segments for testing.
+// This implementation does not verify the correctness of the segments.
+func (db *DB) TestingBatchInsertSegments(ctx context.Context, segments []RawSegment) (err error) {
+	return db.ChooseAdapter(uuid.UUID{}).TestingBatchInsertSegments(ctx, segments)
+}
+
+// TestingBatchInsertSegments implements the postgres adapter.
+func (p *PostgresAdapter) TestingBatchInsertSegments(ctx context.Context, segments []RawSegment) (err error) {
+	const maxRowsPerCopy = 250000
+
+	minLength := len(segments)
+	if maxRowsPerCopy < minLength {
+		minLength = maxRowsPerCopy
+	}
+
+	aliases := make([]AliasPieces, 0, minLength)
+	return Error.Wrap(pgxutil.Conn(ctx, p.db,
+		func(conn *pgx.Conn) error {
+			progress, total := 0, len(segments)
+			for len(segments) > 0 {
+				batch := segments
+				if len(batch) > maxRowsPerCopy {
+					batch = batch[:maxRowsPerCopy]
+				}
+				segments = segments[len(batch):]
+
+				aliases = aliases[:len(batch)]
+				for i, segment := range batch {
+					aliases[i], err = p.aliasCache.EnsurePiecesToAliases(ctx, segment.Pieces)
+					if err != nil {
+						return err
+					}
+				}
+
+				source := newCopyFromRawSegments(batch, aliases)
+				_, err := conn.CopyFrom(ctx, pgx.Identifier{"segments"}, source.Columns(), source)
+				if err != nil {
+					return err
+				}
+
+				progress += len(batch)
+				p.log.Info("batch insert", zap.Int("progress", progress), zap.Int("total", total))
+			}
+			return err
+		}))
+}
+
+type copyFromRawSegments struct {
+	idx     int
+	rows    []RawSegment
+	aliases []AliasPieces
+	row     []any
+}
+
+func newCopyFromRawSegments(rows []RawSegment, aliases []AliasPieces) *copyFromRawSegments {
+	return &copyFromRawSegments{
+		rows:    rows,
+		aliases: aliases,
+		idx:     -1,
+	}
+}
+
+func (ctr *copyFromRawSegments) Next() bool {
+	ctr.idx++
+	return ctr.idx < len(ctr.rows)
+}
+
+func (ctr *copyFromRawSegments) Columns() []string {
+	return []string{
+		"stream_id",
+		"position",
+
+		"created_at",
+		"repaired_at",
+		"expires_at",
+
+		"root_piece_id",
+		"encrypted_key_nonce",
+		"encrypted_key",
+		"encrypted_etag",
+
+		"encrypted_size",
+		"plain_size",
+		"plain_offset",
+
+		"redundancy",
+		"inline_data",
+		"remote_alias_pieces",
+		"placement",
+	}
+}
+
+func (ctr *copyFromRawSegments) Values() ([]any, error) {
+	obj := &ctr.rows[ctr.idx]
+	aliases := &ctr.aliases[ctr.idx]
+
+	aliasPieces, err := aliases.Bytes()
+	if err != nil {
+		return nil, err
+	}
+	ctr.row = append(ctr.row[:0],
+		obj.StreamID.Bytes(),
+		obj.Position.Encode(),
+
+		obj.CreatedAt,
+		obj.RepairedAt,
+		obj.ExpiresAt,
+
+		obj.RootPieceID.Bytes(),
+		obj.EncryptedKeyNonce,
+		obj.EncryptedKey,
+		obj.EncryptedETag,
+
+		obj.EncryptedSize,
+		obj.PlainSize,
+		obj.PlainOffset,
+
+		redundancyScheme{&obj.Redundancy},
+		obj.InlineData,
+		aliasPieces,
+		obj.Placement,
+	)
+	return ctr.row, nil
+}
+
+func (ctr *copyFromRawSegments) Err() error { return nil }
diff --git a/satellite/metabase/raw_test.go b/satellite/metabase/raw_test.go
index 83ca9223de0c..54a839f7b1e8 100644
--- a/satellite/metabase/raw_test.go
+++ b/satellite/metabase/raw_test.go
@@ -5,7 +5,10 @@ package metabase_test
 
 import (
 	"testing"
+	"time"
 
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/stretchr/testify/require"
 
 	"storj.io/common/testcontext"
@@ -44,3 +47,26 @@ func TestTestingBatchInsertObjects_RoundTrip(t *testing.T) {
 		require.Equal(t, validObjects, insertedObjects)
 	})
 }
+
+func TestTestingBatchInsertSegments(t *testing.T) {
+	metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
+		obj := metabasetest.RandObjectStream()
+
+		rawSegments := make([]metabase.RawSegment, 10)
+		for i := range rawSegments {
+			rawSegments[i] = metabasetest.DefaultRawSegment(obj, metabase.SegmentPosition{
+				Part:  1,
+				Index: uint32(i) + 1,
+			})
+		}
+
+		err := db.TestingBatchInsertSegments(ctx, rawSegments)
+		require.NoError(t, err)
+
+		insertedSegments, err := db.TestingAllSegments(ctx)
+		require.NoError(t, err)
+
+		require.Zero(t, cmp.Diff(rawSegments, metabasetest.SegmentsToRaw(insertedSegments),
+			cmpopts.EquateApproxTime(1*time.Second)))
+	})
+}
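
Note on the pattern used above: `conn.CopyFrom` streams rows over the COPY protocol from any value implementing pgx's `CopyFromSource` interface (`Next`, `Values`, `Err`), which is exactly what `copyFromRawSegments` provides; reusing `ctr.row` between `Values` calls also avoids a per-row allocation. The following is a minimal, self-contained sketch of the same technique; the `widgets` table, its columns, and the connection string are hypothetical stand-ins for illustration, not part of this change.

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

// widgetSource implements pgx.CopyFromSource the same way
// copyFromRawSegments does: idx starts at -1, Next advances it,
// and Values returns the row at the current index.
type widgetSource struct {
	idx  int
	rows [][]any
}

func (s *widgetSource) Next() bool             { s.idx++; return s.idx < len(s.rows) }
func (s *widgetSource) Values() ([]any, error) { return s.rows[s.idx], nil }
func (s *widgetSource) Err() error             { return nil }

func main() {
	ctx := context.Background()

	// Hypothetical connection string; adjust for a real database.
	conn, err := pgx.Connect(ctx, "postgres://localhost/example")
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = conn.Close(ctx) }()

	source := &widgetSource{
		idx:  -1,
		rows: [][]any{{int64(1), "a"}, {int64(2), "b"}},
	}

	// A single COPY streams every row the source yields, which is much
	// cheaper than row-by-row INSERTs for large batches.
	copied, err := conn.CopyFrom(ctx, pgx.Identifier{"widgets"}, []string{"id", "name"}, source)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("copied %d rows", copied)
}

For rows already materialized in memory, pgx's built-in `pgx.CopyFromRows` helper would do the same job; a custom source such as `copyFromRawSegments` pays off when each row needs per-row work (here, converting pieces to node aliases) and a scratch slice can be reused between `Values` calls.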