From f40805763e55d8efe2af7879fd6404de355b1d9d Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Fri, 28 Jul 2023 11:30:44 +0200 Subject: [PATCH] satellite/metabase: adjust segment commit to use pending_objects table Change is adjusting CommitSegment to check pending object existence in `pending_objects` or `objects` table. Satellite stream id is used to determine if we need to use `pending_objects` or `objects` table. General goal is to support both tables until `objects` table will be free from pending objects. Whenever it will be needed code will be supporting both tables at once. Part of https://github.com/storj/storj/issues/6046 Change-Id: I954444a53b4733ae6fc909420573242b02746787 --- satellite/metabase/commit.go | 220 +++++--- satellite/metabase/commit_test.go | 754 ++++++++++++++++++++++++- satellite/metainfo/endpoint_segment.go | 4 + 3 files changed, 880 insertions(+), 98 deletions(-) diff --git a/satellite/metabase/commit.go b/satellite/metabase/commit.go index 0a055e6411e8..027ea9c7eaf6 100644 --- a/satellite/metabase/commit.go +++ b/satellite/metabase/commit.go @@ -338,6 +338,8 @@ type CommitSegment struct { Pieces Pieces Placement storj.PlacementConstraint + + UsePendingObjectsTable bool } // CommitSegment commits segment to the database. @@ -378,47 +380,89 @@ func (db *DB) CommitSegment(ctx context.Context, opts CommitSegment) (err error) return Error.New("unable to convert pieces to aliases: %w", err) } + // second part will be removed when there will be no pending_objects in objects table. // Verify that object exists and is partial. - _, err = db.db.ExecContext(ctx, ` - INSERT INTO segments ( - stream_id, position, expires_at, - root_piece_id, encrypted_key_nonce, encrypted_key, - encrypted_size, plain_offset, plain_size, encrypted_etag, - redundancy, - remote_alias_pieces, - placement - ) VALUES ( - (SELECT stream_id - FROM objects WHERE - project_id = $12 AND - bucket_name = $13 AND - object_key = $14 AND - version = $15 AND - stream_id = $16 AND - status = `+pendingStatus+ - ` ), $1, $2, - $3, $4, $5, - $6, $7, $8, $9, - $10, - $11, - $17 - ) - ON CONFLICT(stream_id, position) - DO UPDATE SET - expires_at = $2, - root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, - encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, - redundancy = $10, - remote_alias_pieces = $11, - placement = $17 + if opts.UsePendingObjectsTable { + _, err = db.db.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, expires_at, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, encrypted_etag, + redundancy, + remote_alias_pieces, + placement + ) VALUES ( + (SELECT stream_id + FROM pending_objects WHERE + project_id = $12 AND + bucket_name = $13 AND + object_key = $14 AND + stream_id = $15 + ), $1, $2, + $3, $4, $5, + $6, $7, $8, $9, + $10, + $11, + $16 + ) + ON CONFLICT(stream_id, position) + DO UPDATE SET + expires_at = $2, + root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, + encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, + redundancy = $10, + remote_alias_pieces = $11, + placement = $16 `, opts.Position, opts.ExpiresAt, - opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, - opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, - redundancyScheme{&opts.Redundancy}, - aliasPieces, - opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, - opts.Placement, - ) + opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, + opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, + redundancyScheme{&opts.Redundancy}, + aliasPieces, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID, + opts.Placement, + ) + } else { + _, err = db.db.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, expires_at, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, encrypted_etag, + redundancy, + remote_alias_pieces, + placement + ) VALUES ( + (SELECT stream_id + FROM objects WHERE + project_id = $12 AND + bucket_name = $13 AND + object_key = $14 AND + version = $15 AND + stream_id = $16 AND + status = `+pendingStatus+ + ` ), $1, $2, + $3, $4, $5, + $6, $7, $8, $9, + $10, + $11, + $17 + ) + ON CONFLICT(stream_id, position) + DO UPDATE SET + expires_at = $2, + root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, + encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, + redundancy = $10, + remote_alias_pieces = $11, + placement = $17 + `, opts.Position, opts.ExpiresAt, + opts.RootPieceID, opts.EncryptedKeyNonce, opts.EncryptedKey, + opts.EncryptedSize, opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, + redundancyScheme{&opts.Redundancy}, + aliasPieces, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, + opts.Placement, + ) + } if err != nil { if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation { return ErrPendingObjectMissing.New("") @@ -448,6 +492,8 @@ type CommitInlineSegment struct { EncryptedETag []byte InlineData []byte + + UsePendingObjectsTable bool } // CommitInlineSegment commits inline segment to the database. @@ -472,39 +518,71 @@ func (db *DB) CommitInlineSegment(ctx context.Context, opts CommitInlineSegment) return ErrInvalidRequest.New("PlainOffset negative") } - // Verify that object exists and is partial. - _, err = db.db.ExecContext(ctx, ` - INSERT INTO segments ( - stream_id, position, expires_at, - root_piece_id, encrypted_key_nonce, encrypted_key, - encrypted_size, plain_offset, plain_size, encrypted_etag, - inline_data - ) VALUES ( - (SELECT stream_id - FROM objects WHERE - project_id = $11 AND - bucket_name = $12 AND - object_key = $13 AND - version = $14 AND - stream_id = $15 AND - status = `+pendingStatus+ - ` ), $1, $2, - $3, $4, $5, - $6, $7, $8, $9, - $10 - ) - ON CONFLICT(stream_id, position) - DO UPDATE SET - expires_at = $2, - root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, - encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, - inline_data = $10 + if opts.UsePendingObjectsTable { + _, err = db.db.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, expires_at, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, encrypted_etag, + inline_data + ) VALUES ( + (SELECT stream_id + FROM pending_objects WHERE + project_id = $11 AND + bucket_name = $12 AND + object_key = $13 AND + stream_id = $14 + ), $1, $2, + $3, $4, $5, + $6, $7, $8, $9, + $10 + ) + ON CONFLICT(stream_id, position) + DO UPDATE SET + expires_at = $2, + root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, + encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, + inline_data = $10 `, opts.Position, opts.ExpiresAt, - storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey, - len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, - opts.InlineData, - opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, - ) + storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey, + len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, + opts.InlineData, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID, + ) + } else { + _, err = db.db.ExecContext(ctx, ` + INSERT INTO segments ( + stream_id, position, expires_at, + root_piece_id, encrypted_key_nonce, encrypted_key, + encrypted_size, plain_offset, plain_size, encrypted_etag, + inline_data + ) VALUES ( + (SELECT stream_id + FROM objects WHERE + project_id = $11 AND + bucket_name = $12 AND + object_key = $13 AND + version = $14 AND + stream_id = $15 AND + status = `+pendingStatus+ + ` ), $1, $2, + $3, $4, $5, + $6, $7, $8, $9, + $10 + ) + ON CONFLICT(stream_id, position) + DO UPDATE SET + expires_at = $2, + root_piece_id = $3, encrypted_key_nonce = $4, encrypted_key = $5, + encrypted_size = $6, plain_offset = $7, plain_size = $8, encrypted_etag = $9, + inline_data = $10 + `, opts.Position, opts.ExpiresAt, + storj.PieceID{}, opts.EncryptedKeyNonce, opts.EncryptedKey, + len(opts.InlineData), opts.PlainOffset, opts.PlainSize, opts.EncryptedETag, + opts.InlineData, + opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID, + ) + } if err != nil { if code := pgerrcode.FromError(err); code == pgxerrcode.NotNullViolation { return ErrPendingObjectMissing.New("") diff --git a/satellite/metabase/commit_test.go b/satellite/metabase/commit_test.go index afaf47531a05..10bb53021a23 100644 --- a/satellite/metabase/commit_test.go +++ b/satellite/metabase/commit_test.go @@ -1137,6 +1137,8 @@ func TestBeginSegment(t *testing.T) { }.Check(ctx, t, db) }) + // TODO those test are copies of tests above with some adjustments to test pending_objects table. + // we will be able to delete those tests when we will start supporting only pending_objects table. t.Run("use pending objects table", func(t *testing.T) { obj.Version = metabase.NextVersion t.Run("pending object missing", func(t *testing.T) { @@ -1952,6 +1954,349 @@ func TestCommitSegment(t *testing.T) { }, }.Check(ctx, t, db) }) + + // TODO those test are copies of tests above with some adjustments to test pending_objects table. + // we will be able to delete those tests when we will start supporting only pending_objects table. + t.Run("use pending objects table", func(t *testing.T) { + obj.Version = metabase.NextVersion + t.Run("duplicate", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + zombieDeadline := now1.Add(24 * time.Hour) + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + metabasetest.BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now1, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("overwrite", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now1 := time.Now() + zombieDeadline := now1.Add(24 * time.Hour) + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + rootPieceID1 := testrand.PieceID() + rootPieceID2 := testrand.PieceID() + pieces1 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + pieces2 := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + metabasetest.BeginSegment{ + Opts: metabase.BeginSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID1, + Pieces: pieces1, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID1, + Pieces: pieces1, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + RootPieceID: rootPieceID2, + Pieces: pieces2, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now1, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + CreatedAt: now, + + RootPieceID: rootPieceID2, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces2, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("commit segment of object with expires at", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + now := time.Now() + expectedExpiresAt := now.Add(33 * time.Hour) + zombieDeadline := now.Add(24 * time.Hour) + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + ExpiresAt: &expectedExpiresAt, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + ExpiresAt: &expectedExpiresAt, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now, + ExpiresAt: &expectedExpiresAt, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + CreatedAt: now, + ExpiresAt: &expectedExpiresAt, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("commit segment of pending object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + rootPieceID := testrand.PieceID() + pieces := metabase.Pieces{{Number: 0, StorageNode: testrand.NodeID()}} + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + encryptedETag := testrand.Bytes(32) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + metabasetest.CommitSegment{ + Opts: metabase.CommitSegment{ + ObjectStream: obj, + RootPieceID: rootPieceID, + Pieces: pieces, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainSize: 512, + PlainOffset: 0, + Redundancy: metabasetest.DefaultRedundancy, + EncryptedETag: encryptedETag, + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + CreatedAt: now, + + RootPieceID: rootPieceID, + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + EncryptedSize: 1024, + PlainOffset: 0, + PlainSize: 512, + EncryptedETag: encryptedETag, + + Redundancy: metabasetest.DefaultRedundancy, + + Pieces: pieces, + }, + }, + }.Check(ctx, t, db) + }) + }) }) } @@ -2047,6 +2392,29 @@ func TestCommitInlineSegment(t *testing.T) { }.Check(ctx, t, db) }) + t.Run("commit inline segment of missing object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + }, + ErrClass: &metabase.ErrPendingObjectMissing, + }.Check(ctx, t, db) + + metabasetest.Verify{}.Check(ctx, t, db) + }) + t.Run("duplicate", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) @@ -2197,38 +2565,15 @@ func TestCommitInlineSegment(t *testing.T) { }.Check(ctx, t, db) }) - t.Run("commit inline segment of missing object", func(t *testing.T) { + t.Run("commit segment of committed object", func(t *testing.T) { defer metabasetest.DeleteAll{}.Check(ctx, t, db) encryptedKey := testrand.Bytes(32) encryptedKeyNonce := testrand.Bytes(32) - metabasetest.CommitInlineSegment{ - Opts: metabase.CommitInlineSegment{ - ObjectStream: obj, - InlineData: []byte{1, 2, 3}, - - EncryptedKey: encryptedKey, - EncryptedKeyNonce: encryptedKeyNonce, - - PlainSize: 512, - PlainOffset: 0, - }, - ErrClass: &metabase.ErrPendingObjectMissing, - }.Check(ctx, t, db) - - metabasetest.Verify{}.Check(ctx, t, db) - }) - - t.Run("commit segment of committed object", func(t *testing.T) { - defer metabasetest.DeleteAll{}.Check(ctx, t, db) - - encryptedKey := testrand.Bytes(32) - encryptedKeyNonce := testrand.Bytes(32) - - now := time.Now() - - metabasetest.CreateObject(ctx, t, db, obj, 0) + now := time.Now() + + metabasetest.CreateObject(ctx, t, db, obj, 0) metabasetest.CommitInlineSegment{ Opts: metabase.CommitInlineSegment{ ObjectStream: obj, @@ -2439,6 +2784,361 @@ func TestCommitInlineSegment(t *testing.T) { }, }.Check(ctx, t, db) }) + + // TODO those test are copies of tests above with some adjustments to test pending_objects table. + // we will be able to delete those tests when we will start supporting only pending_objects table. + t.Run("use pending objects table", func(t *testing.T) { + obj.Version = metabase.NextVersion + t.Run("duplicate", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + CreatedAt: now, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainOffset: 0, + PlainSize: 512, + + InlineData: []byte{1, 2, 3}, + EncryptedSize: 3, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("overwrite", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + InlineData: []byte{4, 5, 6}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + Position: metabase.SegmentPosition{Part: 0, Index: 0}, + CreatedAt: now, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainOffset: 0, + PlainSize: 512, + + InlineData: []byte{4, 5, 6}, + EncryptedSize: 3, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("commit empty segment of pending object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + encryptedETag := testrand.Bytes(32) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 0, + PlainOffset: 0, + EncryptedETag: encryptedETag, + + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + CreatedAt: now, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainOffset: 0, + PlainSize: 0, + + EncryptedSize: 0, + EncryptedETag: encryptedETag, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("commit segment of pending object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + encryptedETag := testrand.Bytes(32) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + EncryptedETag: encryptedETag, + + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + CreatedAt: now, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainOffset: 0, + PlainSize: 512, + + InlineData: []byte{1, 2, 3}, + EncryptedSize: 3, + EncryptedETag: encryptedETag, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("commit segment of object with expires at", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + encryptedKey := testrand.Bytes(32) + encryptedKeyNonce := testrand.Bytes(32) + encryptedETag := testrand.Bytes(32) + + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + expectedExpiresAt := now.Add(33 * time.Hour) + metabasetest.BeginObjectNextVersion{ + Opts: metabase.BeginObjectNextVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + ExpiresAt: &expectedExpiresAt, + UsePendingObjectsTable: true, + }, + Version: 1, + }.Check(ctx, t, db) + + metabasetest.CommitInlineSegment{ + Opts: metabase.CommitInlineSegment{ + ObjectStream: obj, + ExpiresAt: &expectedExpiresAt, + InlineData: []byte{1, 2, 3}, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainSize: 512, + PlainOffset: 0, + EncryptedETag: encryptedETag, + + UsePendingObjectsTable: true, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + PendingObjects: []metabase.RawPendingObject{ + { + PendingObjectStream: metabasetest.ObjectStreamToPending(obj), + CreatedAt: now, + ExpiresAt: &expectedExpiresAt, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + Segments: []metabase.RawSegment{ + { + StreamID: obj.StreamID, + CreatedAt: now, + ExpiresAt: &expectedExpiresAt, + + EncryptedKey: encryptedKey, + EncryptedKeyNonce: encryptedKeyNonce, + + PlainOffset: 0, + PlainSize: 512, + + InlineData: []byte{1, 2, 3}, + EncryptedSize: 3, + EncryptedETag: encryptedETag, + }, + }, + }.Check(ctx, t, db) + }) + }) }) } diff --git a/satellite/metainfo/endpoint_segment.go b/satellite/metainfo/endpoint_segment.go index f89ee33d635b..d9e6fc8c5238 100644 --- a/satellite/metainfo/endpoint_segment.go +++ b/satellite/metainfo/endpoint_segment.go @@ -373,6 +373,8 @@ func (endpoint *Endpoint) CommitSegment(ctx context.Context, req *pb.SegmentComm Redundancy: rs, Pieces: pieces, Placement: storj.PlacementConstraint(streamID.Placement), + + UsePendingObjectsTable: streamID.UsePendingObjectsTable, } err = endpoint.validateRemoteSegment(ctx, mbCommitSegment, originalLimits) @@ -488,6 +490,8 @@ func (endpoint *Endpoint) MakeInlineSegment(ctx context.Context, req *pb.Segment EncryptedETag: req.EncryptedETag, InlineData: req.EncryptedInlineData, + + UsePendingObjectsTable: streamID.UsePendingObjectsTable, }) if err != nil { return nil, endpoint.convertMetabaseErr(err)