Skip to content

Commit

Permalink
satellite/metainfo: BeginCopyObject and FinishCopyObject
Browse files Browse the repository at this point in the history
Metainfo endpoints to use server-side copy.

Fixes #4475

Change-Id: Ided06aed9e6187d6d8f030e95dda019ba78fff95
  • Loading branch information
Fadila82 committed Feb 24, 2022
1 parent fbe2680 commit 2d4760f
Show file tree
Hide file tree
Showing 2 changed files with 334 additions and 0 deletions.
238 changes: 238 additions & 0 deletions satellite/metainfo/endpoint_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,244 @@ func (endpoint *Endpoint) FinishMoveObject(ctx context.Context, req *pb.ObjectFi
return &pb.ObjectFinishMoveResponse{}, nil
}

// Server side copy.

// BeginCopyObject begins copying object to different key.
func (endpoint *Endpoint) BeginCopyObject(ctx context.Context, req *pb.ObjectBeginCopyRequest) (resp *pb.ObjectBeginCopyResponse, err error) {
defer mon.Task()(&ctx)(&err)

err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}

now := time.Now()
keyInfo, err := endpoint.validateAuthN(ctx, req.Header,
verifyPermission{
action: macaroon.Action{
Op: macaroon.ActionRead,
Bucket: req.Bucket,
EncryptedPath: req.EncryptedObjectKey,
Time: now,
},
},
verifyPermission{
action: macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: req.NewBucket,
EncryptedPath: req.NewEncryptedObjectKey,
Time: now,
},
},
verifyPermission{
action: macaroon.Action{
Op: macaroon.ActionWrite,
Bucket: req.NewBucket,
EncryptedPath: req.NewEncryptedObjectKey,
Time: now,
},
},
)
if err != nil {
return nil, err
}

for _, bucket := range [][]byte{req.Bucket, req.NewBucket} {
err = endpoint.validateBucket(ctx, bucket)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}
}

// we are verifying existence of target bucket only because source bucket
// will be checked while quering source object
// TODO this needs to be optimized to avoid DB call on each request
newBucketPlacement, err := endpoint.buckets.GetBucketPlacement(ctx, req.NewBucket, keyInfo.ProjectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.NewBucket)
}
endpoint.log.Error("unable to check bucket", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}

// if source and target buckets are different, we need to check their geofencing configs
if !bytes.Equal(req.Bucket, req.NewBucket) {
oldBucketPlacement, err := endpoint.buckets.GetBucketPlacement(ctx, req.Bucket, keyInfo.ProjectID)
if err != nil {
if storj.ErrBucketNotFound.Has(err) {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "bucket not found: %s", req.Bucket)
}
endpoint.log.Error("unable to check bucket", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if oldBucketPlacement != newBucketPlacement {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "copying object to bucket with different placement policy is not (yet) supported")
}
}

result, err := endpoint.metabase.BeginCopyObject(ctx, metabase.BeginCopyObject{
ObjectLocation: metabase.ObjectLocation{
ProjectID: keyInfo.ProjectID,
BucketName: string(req.Bucket),
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
},
Version: metabase.DefaultVersion,
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}

response, err := convertBeginCopyObjectResults(result)
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}

satStreamID, err := endpoint.packStreamID(ctx, &internalpb.StreamID{
Bucket: req.Bucket,
EncryptedObjectKey: req.EncryptedObjectKey,
Version: int32(metabase.DefaultVersion),
StreamId: result.StreamID[:],
EncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite(result.EncryptionParameters.CipherSuite),
BlockSize: int64(result.EncryptionParameters.BlockSize),
},
Placement: int32(newBucketPlacement),
})
if err != nil {
endpoint.log.Error("internal", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}

response.StreamId = satStreamID
return response, nil
}

func convertBeginCopyObjectResults(result metabase.BeginCopyObjectResult) (*pb.ObjectBeginCopyResponse, error) {
keys := make([]*pb.EncryptedKeyAndNonce, len(result.EncryptedKeysNonces))
for i, key := range result.EncryptedKeysNonces {
var nonce storj.Nonce
var err error
if len(key.EncryptedKeyNonce) != 0 {
nonce, err = storj.NonceFromBytes(key.EncryptedKeyNonce)
if err != nil {
return nil, err
}
}

keys[i] = &pb.EncryptedKeyAndNonce{
Position: &pb.SegmentPosition{
PartNumber: int32(key.Position.Part),
Index: int32(key.Position.Index),
},
EncryptedKey: key.EncryptedKey,
EncryptedKeyNonce: nonce,
}
}

// TODO we need this becase of an uplink issue with how we are storing key and nonce
if result.EncryptedMetadataKey == nil {
streamMeta := &pb.StreamMeta{}
err := pb.Unmarshal(result.EncryptedMetadata, streamMeta)
if err != nil {
return nil, err
}
if streamMeta.LastSegmentMeta != nil {
result.EncryptedMetadataKey = streamMeta.LastSegmentMeta.EncryptedKey
result.EncryptedMetadataKeyNonce = streamMeta.LastSegmentMeta.KeyNonce
}
}

var metadataNonce storj.Nonce
var err error
if len(result.EncryptedMetadataKeyNonce) != 0 {
metadataNonce, err = storj.NonceFromBytes(result.EncryptedMetadataKeyNonce)
if err != nil {
return nil, err
}
}

return &pb.ObjectBeginCopyResponse{
EncryptedMetadataKey: result.EncryptedMetadataKey,
EncryptedMetadataKeyNonce: metadataNonce,
EncryptionParameters: &pb.EncryptionParameters{
CipherSuite: pb.CipherSuite(result.EncryptionParameters.CipherSuite),
BlockSize: int64(result.EncryptionParameters.BlockSize),
},
SegmentKeys: keys,
}, nil
}

// FinishCopyObject accepts new encryption keys for object copy and updates the corresponding object ObjectKey and segments EncryptedKey.
func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFinishCopyRequest) (resp *pb.ObjectFinishCopyResponse, err error) {
defer mon.Task()(&ctx)(&err)

err = endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())
if err != nil {
endpoint.log.Warn("unable to collect uplink version", zap.Error(err))
}

streamID, err := endpoint.unmarshalSatStreamID(ctx, req.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

keyInfo, err := endpoint.validateAuth(ctx, req.Header, macaroon.Action{
Op: macaroon.ActionWrite,
Time: time.Now(),
Bucket: req.NewBucket,
EncryptedPath: req.NewEncryptedMetadataKey,
})
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Unauthenticated, err.Error())
}

err = endpoint.validateBucket(ctx, req.NewBucket)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

exists, err := endpoint.buckets.HasBucket(ctx, req.NewBucket, keyInfo.ProjectID)
if err != nil {
endpoint.log.Error("unable to check bucket", zap.Error(err))
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
} else if !exists {
return nil, rpcstatus.Errorf(rpcstatus.NotFound, "target bucket not found: %s", req.NewBucket)
}

streamUUID, err := uuid.FromBytes(streamID.StreamId)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

newStreamID, err := uuid.New()
if err != nil {
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

err = endpoint.metabase.FinishCopyObject(ctx, metabase.FinishCopyObject{
ObjectStream: metabase.ObjectStream{
ProjectID: keyInfo.ProjectID,
BucketName: string(streamID.Bucket),
ObjectKey: metabase.ObjectKey(streamID.EncryptedObjectKey),
Version: metabase.DefaultVersion,
StreamID: streamUUID,
},
NewStreamID: newStreamID,
NewSegmentKeys: protobufkeysToMetabase(req.NewSegmentKeys),
NewBucket: string(req.NewBucket),
NewEncryptedObjectKey: req.NewEncryptedObjectKey,
NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce[:],
NewEncryptedMetadataKey: req.NewEncryptedMetadataKey,
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}

return &pb.ObjectFinishCopyResponse{}, nil
}

// protobufkeysToMetabase converts []*pb.EncryptedKeyAndNonce to []metabase.EncryptedKeyAndNonce.
func protobufkeysToMetabase(protoKeys []*pb.EncryptedKeyAndNonce) []metabase.EncryptedKeyAndNonce {
keys := make([]metabase.EncryptedKeyAndNonce, len(protoKeys))
Expand Down
96 changes: 96 additions & 0 deletions satellite/metainfo/endpoint_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1235,3 +1235,99 @@ func testDeleteObject(t *testing.T, createObject func(ctx context.Context, t *te
})
})
}

func TestEndpoint_CopyObject(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]
satelliteSys := planet.Satellites[0]
uplnk := planet.Uplinks[0]

// upload a small inline object
err := uplnk.Upload(ctx, planet.Satellites[0], "testbucket", "testobject", testrand.Bytes(1*memory.KiB))
require.NoError(t, err)
objects, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
require.NoError(t, err)
require.Len(t, objects, 1)

getResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte("testbucket"),
EncryptedPath: []byte(objects[0].ObjectKey),
})
require.NoError(t, err)

testEncryptedMetadataNonce := testrand.Nonce()
// update the object metadata
beginResp, err := satelliteSys.API.Metainfo.Endpoint.BeginCopyObject(ctx, &pb.ObjectBeginCopyRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: getResp.Object.Bucket,
EncryptedObjectKey: getResp.Object.EncryptedPath,
NewBucket: []byte("testbucket"),
NewEncryptedObjectKey: []byte("newencryptedkey"),
})
require.NoError(t, err)
assert.Len(t, beginResp.SegmentKeys, 1)
assert.Equal(t, beginResp.EncryptedMetadataKey, objects[0].EncryptedMetadataEncryptedKey)
assert.Equal(t, beginResp.EncryptedMetadataKeyNonce.Bytes(), objects[0].EncryptedMetadataNonce)

segmentKeys := pb.EncryptedKeyAndNonce{
Position: beginResp.SegmentKeys[0].Position,
EncryptedKeyNonce: testrand.Nonce(),
EncryptedKey: []byte("newencryptedkey"),
}

_, err = satelliteSys.API.Metainfo.Endpoint.FinishCopyObject(ctx, &pb.ObjectFinishCopyRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
StreamId: getResp.Object.StreamId,
NewBucket: []byte("testbucket"),
NewEncryptedObjectKey: []byte("newobjectkey"),
NewEncryptedMetadataKeyNonce: testEncryptedMetadataNonce,
NewEncryptedMetadataKey: []byte("encryptedmetadatakey"),
NewSegmentKeys: []*pb.EncryptedKeyAndNonce{&segmentKeys},
})
require.NoError(t, err)

objectsAfterCopy, err := satelliteSys.API.Metainfo.Metabase.TestingAllObjects(ctx)
require.NoError(t, err)
require.Len(t, objectsAfterCopy, 2)

getCopyResp, err := satelliteSys.API.Metainfo.Endpoint.GetObject(ctx, &pb.ObjectGetRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
Bucket: []byte("testbucket"),
EncryptedPath: []byte("newobjectkey"),
})
require.NoError(t, err, objectsAfterCopy[1])
require.NotEqual(t, getResp.Object.StreamId, getCopyResp.Object.StreamId)
require.NotZero(t, getCopyResp.Object.StreamId)
require.Equal(t, getResp.Object.InlineSize, getCopyResp.Object.InlineSize)

// compare segments
originalSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
StreamId: getResp.Object.StreamId,
CursorPosition: segmentKeys.Position,
})
require.NoError(t, err)
copiedSegment, err := satelliteSys.API.Metainfo.Endpoint.DownloadSegment(ctx, &pb.SegmentDownloadRequest{
Header: &pb.RequestHeader{
ApiKey: apiKey.SerializeRaw(),
},
StreamId: getCopyResp.Object.StreamId,
CursorPosition: segmentKeys.Position,
})
require.NoError(t, err)
require.Equal(t, originalSegment.EncryptedInlineData, copiedSegment.EncryptedInlineData)
})
}

0 comments on commit 2d4760f

Please sign in to comment.