Skip to content

Commit

Permalink
satellite/metainfo: support CopyObject source object custom version
Browse files Browse the repository at this point in the history
We should be able to do copy object operation on version different than
latest. This change wires ObjectVersion from CopyObject request into
metabase code to be able to choose specific version of source copy object.

Change-Id: Ib41a26d3de5ba5b0cce93ea03835e1e5a6c4d18a
  • Loading branch information
mniewrzal authored and Storj Robot committed Jan 25, 2024
1 parent 3772788 commit 0269ed3
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 10 deletions.
3 changes: 2 additions & 1 deletion satellite/metabase/copy_object.go
Expand Up @@ -22,6 +22,7 @@ type BeginCopyObjectResult BeginMoveCopyResults
// BeginCopyObject holds all data needed begin copy object method.
type BeginCopyObject struct {
ObjectLocation
Version Version

// VerifyLimits holds a callback by which the caller can interrupt the copy
// if it turns out the copy would exceed a limit.
Expand All @@ -30,7 +31,7 @@ type BeginCopyObject struct {

// BeginCopyObject collects all data needed to begin object copy procedure.
func (db *DB) BeginCopyObject(ctx context.Context, opts BeginCopyObject) (_ BeginCopyObjectResult, err error) {
result, err := db.beginMoveCopyObject(ctx, opts.ObjectLocation, CopySegmentLimit, opts.VerifyLimits)
result, err := db.beginMoveCopyObject(ctx, opts.ObjectLocation, opts.Version, CopySegmentLimit, opts.VerifyLimits)
if err != nil {
return BeginCopyObjectResult{}, err
}
Expand Down
85 changes: 85 additions & 0 deletions satellite/metabase/copy_object_test.go
Expand Up @@ -83,6 +83,32 @@ func TestBeginCopyObject(t *testing.T) {
Segments: expectedRawSegments,
}.Check(ctx, t, db)
})

t.Run("begin copy object multiple versions", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

obj := obj
obj.Version = 1
obj1 := metabasetest.CreateObject(ctx, t, db, obj, 0)
obj.Version = 2
obj2 := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)
obj.Version = 3
obj3 := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)

for _, object := range []metabase.Object{obj1, obj2, obj3} {
metabasetest.BeginCopyObject{
Opts: metabase.BeginCopyObject{
ObjectLocation: object.Location(),
Version: object.Version,
},
Result: metabase.BeginCopyObjectResult{
StreamID: object.StreamID,
Version: object.Version,
EncryptionParameters: object.Encryption,
},
}.Check(ctx, t, db)
}
})
})
}

Expand Down Expand Up @@ -1453,5 +1479,64 @@ func TestFinishCopyObject(t *testing.T) {
},
}.Check(ctx, t, db)
})

t.Run("copy object from versioned source object", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

obj.Version = 1
sourceObject, sourceSegments := metabasetest.CreateTestObject{}.Run(ctx, t, db, obj, 1)
obj.Version = 2
latestObject := metabasetest.CreateObjectVersioned(ctx, t, db, obj, 0)

results, err := db.BeginCopyObject(ctx, metabase.BeginCopyObject{
ObjectLocation: sourceObject.Location(),
Version: sourceObject.Version,
})
require.NoError(t, err)

expectedCopiedObject := metabase.Object{
ObjectStream: metabase.ObjectStream{
ProjectID: sourceObject.ProjectID,
BucketName: sourceObject.BucketName,
ObjectKey: metabase.ObjectKey("new key"),
StreamID: testrand.UUID(),
Version: 1,
},
Status: metabase.CommittedVersioned,
SegmentCount: 1,

CreatedAt: time.Now(),
TotalPlainSize: sourceObject.TotalPlainSize,
TotalEncryptedSize: sourceObject.TotalEncryptedSize,
FixedSegmentSize: sourceObject.FixedSegmentSize,
Encryption: sourceObject.Encryption,
}

expectedTargetSegment := sourceSegments[0]
expectedTargetSegment.StreamID = expectedCopiedObject.StreamID
expectedTargetSegment.EncryptedETag = nil

metabasetest.FinishCopyObject{
Opts: metabase.FinishCopyObject{
ObjectStream: sourceObject.ObjectStream,
NewBucket: sourceObject.BucketName,
NewEncryptedObjectKey: metabase.ObjectKey("new key"),
NewStreamID: expectedCopiedObject.StreamID,
NewVersioned: true,

NewSegmentKeys: results.EncryptedKeysNonces,
},
Result: expectedCopiedObject,
}.Check(ctx, t, db)

metabasetest.Verify{
Objects: []metabase.RawObject{
metabase.RawObject(sourceObject),
metabase.RawObject(latestObject),
metabase.RawObject(expectedCopiedObject),
},
Segments: metabasetest.SegmentsToRaw(append(sourceSegments, expectedTargetSegment)),
}.Check(ctx, t, db)
})
})
}
23 changes: 14 additions & 9 deletions satellite/metabase/move_object.go
Expand Up @@ -43,7 +43,8 @@ type BeginMoveCopyResults struct {

// BeginMoveObject collects all data needed to begin object move procedure.
func (db *DB) BeginMoveObject(ctx context.Context, opts BeginMoveObject) (_ BeginMoveObjectResult, err error) {
result, err := db.beginMoveCopyObject(ctx, opts.ObjectLocation, MoveSegmentLimit, nil)
// TODO(ver) add support specifying move source object version
result, err := db.beginMoveCopyObject(ctx, opts.ObjectLocation, 0, MoveSegmentLimit, nil)
if err != nil {
return BeginMoveObjectResult{}, err
}
Expand All @@ -52,20 +53,24 @@ func (db *DB) BeginMoveObject(ctx context.Context, opts BeginMoveObject) (_ Begi
}

// beginMoveCopyObject collects all data needed to begin object move/copy procedure.
func (db *DB) beginMoveCopyObject(ctx context.Context, location ObjectLocation, segmentLimit int64, verifyLimits func(encryptedObjectSize int64, nSegments int64) error) (result BeginMoveCopyResults, err error) {
func (db *DB) beginMoveCopyObject(ctx context.Context, location ObjectLocation, version Version, segmentLimit int64, verifyLimits func(encryptedObjectSize int64, nSegments int64) error) (result BeginMoveCopyResults, err error) {
defer mon.Task()(&ctx)(&err)

if err := location.Verify(); err != nil {
return BeginMoveCopyResults{}, err
}

object, err := db.GetObjectLastCommitted(ctx, GetObjectLastCommitted{
ObjectLocation: ObjectLocation{
ProjectID: location.ProjectID,
BucketName: location.BucketName,
ObjectKey: location.ObjectKey,
},
})
var object Object
if version > 0 {
object, err = db.GetObjectExactVersion(ctx, GetObjectExactVersion{
ObjectLocation: location,
Version: version,
})
} else {
object, err = db.GetObjectLastCommitted(ctx, GetObjectLastCommitted{
ObjectLocation: location,
})
}
if err != nil {
return BeginMoveCopyResults{}, err
}
Expand Down
15 changes: 15 additions & 0 deletions satellite/metainfo/endpoint_object.go
Expand Up @@ -1998,6 +1998,10 @@ func (endpoint *Endpoint) BeginCopyObject(ctx context.Context, req *pb.ObjectBeg
}
}

if err := validateObjectVersion(req.ObjectVersion); err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

// if source and target buckets are different, we need to check their geofencing configs
if !bytes.Equal(req.Bucket, req.NewBucket) {
// TODO we may try to combine those two DB calls into single one
Expand All @@ -2022,12 +2026,23 @@ func (endpoint *Endpoint) BeginCopyObject(ctx context.Context, req *pb.ObjectBeg
}
}

var version metabase.Version
if len(req.ObjectVersion) != 0 {
var sv metabase.StreamVersionID
sv, err = metabase.StreamVersionIDFromBytes(req.ObjectVersion)
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
version = sv.Version()
}

result, err := endpoint.metabase.BeginCopyObject(ctx, metabase.BeginCopyObject{
ObjectLocation: metabase.ObjectLocation{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
},
Version: version,
VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
return endpoint.checkUploadLimitsForNewObject(ctx, keyInfo, encryptedObjectSize, nSegments)
},
Expand Down
46 changes: 46 additions & 0 deletions satellite/metainfo/endpoint_object_test.go
Expand Up @@ -3032,5 +3032,51 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) {
require.Error(t, err)
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
})

t.Run("begin copy object from older version", func(t *testing.T) {
defer ctx.Check(deleteBucket(bucketName))

require.NoError(t, createBucket(bucketName))
require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID))

objectKeyA := "test-object-a"
versionIDs := make([][]byte, 2)
for i := range versionIDs {
object, err := planet.Uplinks[0].UploadWithOptions(ctx, satelliteSys, bucketName, objectKeyA, testrand.Bytes(100), nil)
require.NoError(t, err)

versionIDs[i] = object.Version
}

objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
require.Len(t, objects, 2)

_, err = planet.Satellites[0].Metainfo.Endpoint.BeginCopyObject(ctx, &pb.BeginCopyObjectRequest{
Header: &pb.RequestHeader{ApiKey: apiKey},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: versionIDs[0],

NewBucket: []byte(bucketName),
NewEncryptedObjectKey: []byte("copied-object"),
})
require.NoError(t, err)

// NOT existing source version
nonExistingVersion := versionIDs[0]
nonExistingVersion[0]++
_, err = planet.Satellites[0].Metainfo.Endpoint.BeginCopyObject(ctx, &pb.BeginCopyObjectRequest{
Header: &pb.RequestHeader{ApiKey: apiKey},
Bucket: []byte(bucketName),
EncryptedObjectKey: []byte(objects[0].ObjectKey),
ObjectVersion: nonExistingVersion,

NewBucket: []byte(bucketName),
NewEncryptedObjectKey: []byte("copied-object"),
})
require.Error(t, err)
require.True(t, errs2.IsRPC(err, rpcstatus.NotFound))
})
})
}

0 comments on commit 0269ed3

Please sign in to comment.