Skip to content

Commit

Permalink
satellite/metabase: return object metadata with FinishCopyObject
Browse files Browse the repository at this point in the history
Updates metadata and metainfo to return object metadata with
FinishCopyObject request.

#4474

Change-Id: I32cba5c20a943272e9b5964df1b3d6463ad212dc
  • Loading branch information
mniewrzal authored and Fadila82 committed Feb 25, 2022
1 parent 332e6af commit c0297ba
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 90 deletions.
100 changes: 53 additions & 47 deletions satellite/metabase/copy_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,23 @@ func (finishCopy FinishCopyObject) Verify() error {
// FinishCopyObject accepts new encryption keys for copied object and insert the corresponding new object ObjectKey and segments EncryptedKey.
// TODO should be in one transaction.
// TODO handle the case when the source and destination encrypted object keys are the same.
func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (err error) {
func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (object Object, err error) {
defer mon.Task()(&ctx)(&err)

if err := opts.Verify(); err != nil {
return err
return Object{}, err
}

originalObject, err := db.GetObjectExactVersion(ctx, GetObjectExactVersion{
opts.Version,
opts.Location(),
})
if err != nil {
return errs.Wrap(err)
return Object{}, errs.Wrap(err)
}

if int(originalObject.SegmentCount) != len(opts.NewSegmentKeys) {
return ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys))
return Object{}, ErrInvalidRequest.New("wrong amount of segments keys received (received %d, need %d)", originalObject.SegmentCount, len(opts.NewSegmentKeys))
}

var newSegmentKeys struct {
Expand Down Expand Up @@ -219,33 +219,33 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (err
return nil
})
if err != nil {
return Error.New("unable to copy object: %w", err)
return Object{}, Error.New("unable to copy object: %w", err)
}

for index := range segments {
if newSegmentKeys.Positions[index] != int64(segments[index].Position.Encode()) {
return Error.New("missing new segment keys for segment %d", int64(segments[index].Position.Encode()))
return Object{}, Error.New("missing new segment keys for segment %d", int64(segments[index].Position.Encode()))
}
}

err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) {
// TODO we need to handle metadata correctly (copy from original object or replace)
_, err = db.db.ExecContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, status, segment_count,
encryption,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
zombie_deletion_deadline
) VALUES (
$1, $2, $3, $4, $5,
$6,`+committedStatus+`, $7,
$8,
$9, $10, $11,
$12, $13, $14, null
)
`, opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID,
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, status, segment_count,
encryption,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
zombie_deletion_deadline
) VALUES (
$1, $2, $3, $4, $5,
$6,`+committedStatus+`, $7,
$8,
$9, $10, $11,
$12, $13, $14, null
)`,
opts.ProjectID, opts.NewBucket, opts.NewEncryptedObjectKey, opts.Version, opts.NewStreamID,
originalObject.ExpiresAt, originalObject.SegmentCount,
encryptionParameters{&originalObject.Encryption},
originalObject.EncryptedMetadata, opts.NewEncryptedMetadataKeyNonce, opts.NewEncryptedMetadataKey,
Expand All @@ -260,24 +260,23 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (err

// TODO: optimize - we should do a bulk insert
for index, originalSegment := range segments {

_, err = db.db.ExecContext(ctx, `
INSERT INTO segments (
stream_id, position,
encrypted_key_nonce, encrypted_key,
root_piece_id, -- non-null constraint
redundancy,
encrypted_size, plain_offset, plain_size,
inline_data
) VALUES (
$1, $2,
$3, $4,
$5,
$6,
$7, $8, $9,
$10
)
`, opts.NewStreamID, originalSegment.Position.Encode(),
INSERT INTO segments (
stream_id, position,
encrypted_key_nonce, encrypted_key,
root_piece_id, -- non-null constraint
redundancy,
encrypted_size, plain_offset, plain_size,
inline_data
) VALUES (
$1, $2,
$3, $4,
$5,
$6,
$7, $8, $9,
$10
)
`, opts.NewStreamID, originalSegment.Position.Encode(),
newSegmentKeys.EncryptedKeyNonces[index], newSegmentKeys.EncryptedKeys[index],
originalSegment.RootPieceID,
redundancyScheme{&originalSegment.Redundancy},
Expand All @@ -287,26 +286,33 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (err
if err != nil {
return Error.New("unable to copy segment: %w", err)
}

}

// TODO : we need flatten references
_, err = db.db.ExecContext(ctx, `
INSERT INTO segment_copies (
stream_id, ancestor_stream_id
) VALUES (
$1, $2
)
`, opts.NewStreamID, originalObject.StreamID)
INSERT INTO segment_copies (
stream_id, ancestor_stream_id
) VALUES (
$1, $2
)
`, opts.NewStreamID, originalObject.StreamID)
if err != nil {
return Error.New("unable to copy object: %w", err)
}

return nil
})
if err != nil {
return err
return Object{}, err
}

copyObject := originalObject
copyObject.StreamID = opts.NewStreamID
copyObject.ObjectKey = ObjectKey(opts.NewEncryptedObjectKey)
copyObject.EncryptedMetadataEncryptedKey = opts.NewEncryptedMetadataKey
copyObject.EncryptedMetadataNonce = opts.NewEncryptedMetadataKeyNonce

mon.Meter("finish_copy_object").Mark(1)

return nil
return copyObject, nil
}
49 changes: 49 additions & 0 deletions satellite/metabase/copy_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,55 @@ func TestFinishCopyObject(t *testing.T) {
}.Check(ctx, t, db)
})

t.Run("returned object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

objStream := metabasetest.RandObjectStream()
copyStream := metabasetest.RandObjectStream()
copyStream.ProjectID = objStream.ProjectID
copyStream.BucketName = objStream.BucketName

originalObj := metabasetest.CreateTestObject{
CommitObject: &metabase.CommitObject{
ObjectStream: objStream,
EncryptedMetadata: testrand.Bytes(64),
EncryptedMetadataNonce: testrand.Nonce().Bytes(),
EncryptedMetadataEncryptedKey: testrand.Bytes(265),
},
}.Run(ctx, t, db, objStream, 0)

expectedCopyObject := originalObj
expectedCopyObject.ObjectKey = copyStream.ObjectKey
expectedCopyObject.StreamID = copyStream.StreamID
expectedCopyObject.EncryptedMetadataEncryptedKey = testrand.Bytes(32)
expectedCopyObject.EncryptedMetadataNonce = testrand.Nonce().Bytes()

metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
ObjectStream: objStream,
NewBucket: copyStream.BucketName,
NewStreamID: copyStream.StreamID,
NewEncryptedObjectKey: []byte(copyStream.ObjectKey),
NewEncryptedMetadataKey: expectedCopyObject.EncryptedMetadataEncryptedKey,
NewEncryptedMetadataKeyNonce: expectedCopyObject.EncryptedMetadataNonce,
},
Result: expectedCopyObject,
}.Check(ctx, t, db)

metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(originalObj),
metabase.RawObject(expectedCopyObject),
},
Copies: []metabase.RawCopy{
{
StreamID: copyStream.StreamID,
AncestorStreamID: objStream.StreamID,
},
},
}.Check(ctx, t, db)
})

t.Run("finish copy object with existing metadata", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

Expand Down
68 changes: 32 additions & 36 deletions satellite/metabase/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
Expand Down Expand Up @@ -330,18 +332,16 @@ func TestGetSegmentByPosition(t *testing.T) {
newEncryptedMetadataKeyNonce := testrand.Nonce()
newEncryptedMetadataKey := testrand.Bytes(32)

metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
ObjectStream: obj.ObjectStream,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
},
ErrText: "",
}.Check(ctx, t, db)
_, err := db.FinishCopyObject(ctx, metabase.FinishCopyObject{
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
ObjectStream: obj.ObjectStream,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
})
require.NoError(t, err)

expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
Expand Down Expand Up @@ -521,18 +521,16 @@ func TestGetSegmentByPosition(t *testing.T) {
newEncryptedMetadataKeyNonce := testrand.Nonce()
newEncryptedMetadataKey := testrand.Bytes(32)

metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
ObjectStream: obj.ObjectStream,
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
},
ErrText: "",
}.Check(ctx, t, db)
_, err := db.FinishCopyObject(ctx, metabase.FinishCopyObject{
ObjectStream: obj.ObjectStream,
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
})
require.NoError(t, err)

expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
Expand Down Expand Up @@ -698,18 +696,16 @@ func TestGetSegmentByPosition(t *testing.T) {
newEncryptedMetadataKeyNonce := testrand.Nonce()
newEncryptedMetadataKey := testrand.Bytes(32)

metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
ObjectStream: obj.ObjectStream,
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
},
ErrText: "",
}.Check(ctx, t, db)
_, err := db.FinishCopyObject(ctx, metabase.FinishCopyObject{
ObjectStream: obj.ObjectStream,
NewStreamID: copyObjStream.StreamID,
NewBucket: copyObjStream.BucketName,
NewSegmentKeys: newEncryptedKeyNonces,
NewEncryptedObjectKey: []byte(copyObjStream.ObjectKey),
NewEncryptedMetadataKeyNonce: newEncryptedMetadataKeyNonce.Bytes(),
NewEncryptedMetadataKey: newEncryptedMetadataKey,
})
require.NoError(t, err)

expectedSegment := metabase.Segment{
StreamID: obj.StreamID,
Expand Down
9 changes: 5 additions & 4 deletions satellite/metabase/metabasetest/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
Expand Down Expand Up @@ -317,10 +319,9 @@ func (cc CreateObjectCopy) Run(ctx *testcontext.Context, t testing.TB, db *metab
NewEncryptedMetadataKey: testrand.Bytes(32),
}
}
FinishCopyObject{
Opts: *opts,
ErrText: "",
}.Check(ctx, t, db)

_, err := db.FinishCopyObject(ctx, *opts)
require.NoError(t, err)

copyObj := cc.OriginalObject
copyObj.StreamID = copyStream.StreamID
Expand Down
6 changes: 5 additions & 1 deletion satellite/metabase/metabasetest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,14 +675,18 @@ func (step BeginCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *me
// FinishCopyObject is for testing metabase.FinishCopyObject.
type FinishCopyObject struct {
Opts metabase.FinishCopyObject
Result metabase.Object
ErrClass *errs.Class
ErrText string
}

// Check runs the test.
func (step FinishCopyObject) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) {
err := db.FinishCopyObject(ctx, step.Opts)
result, err := db.FinishCopyObject(ctx, step.Opts)
checkError(t, err, step.ErrClass, step.ErrText)

diff := cmp.Diff(step.Result, result, cmpopts.EquateApproxTime(5*time.Second))
require.Zero(t, diff)
}

// GetProjectSegmentCount is for testing metabase.GetProjectSegmentCount.
Expand Down
13 changes: 11 additions & 2 deletions satellite/metainfo/endpoint_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -1870,7 +1870,7 @@ func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFi
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

err = endpoint.metabase.FinishCopyObject(ctx, metabase.FinishCopyObject{
object, err := endpoint.metabase.FinishCopyObject(ctx, metabase.FinishCopyObject{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
BucketName: string(streamID.Bucket),
Expand All @@ -1889,7 +1889,16 @@ func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFi
return nil, endpoint.convertMetabaseErr(err)
}

return &pb.ObjectFinishCopyResponse{}, nil
// we can return nil redundancy because this request won't be used for downloading
protoObject, err := endpoint.objectToProto(ctx, object, nil)
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}

return &pb.ObjectFinishCopyResponse{
Object: protoObject,
}, nil
}

// protobufkeysToMetabase converts []*pb.EncryptedKeyAndNonce to []metabase.EncryptedKeyAndNonce.
Expand Down

0 comments on commit c0297ba

Please sign in to comment.